Skip to content

Commit

Permalink
Improve kernel panic when applying transactions to the store
Browse files Browse the repository at this point in the history
Before the transaction commit process was in charge to raise a kernel
panic in case of failure in transaction application and since the
commit process is different between single and HA we had to duplicate
that logic in several places: TransactionRepresentationCommitProcess
and TransactionCommittingResponseUnpacker.

Now the responsibility for kernel panic is moved down into
TransactionRepresentationStoreApplier and IndexUpdatesValidator.
Moreover such a logic is not spread around multiple modules but it
lives only inside the kernel.
  • Loading branch information
davidegrohmann committed Oct 28, 2015
1 parent 43a9e9c commit 9753cb6
Show file tree
Hide file tree
Showing 21 changed files with 169 additions and 135 deletions.
Expand Up @@ -34,7 +34,6 @@
import org.neo4j.io.fs.DefaultFileSystemAbstraction;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.kernel.GraphDatabaseAPI;
import org.neo4j.kernel.KernelHealth;
import org.neo4j.kernel.api.ReadOperations;
import org.neo4j.kernel.api.direct.DirectStoreAccess;
import org.neo4j.kernel.api.exceptions.TransactionFailureException;
Expand Down Expand Up @@ -358,7 +357,6 @@ protected void applyTransaction( Transaction transaction ) throws TransactionFai
TransactionRepresentationCommitProcess commitProcess =
new TransactionRepresentationCommitProcess(
dependencyResolver.resolveDependency( LogicalTransactionStore.class ),
dependencyResolver.resolveDependency( KernelHealth.class ),
dependencyResolver.resolveDependency( NeoStoreProvider.class ).evaluate(),
dependencyResolver.resolveDependency( TransactionRepresentationStoreApplier.class ),
dependencyResolver.resolveDependency( IndexUpdatesValidator.class ) );
Expand Down
Expand Up @@ -226,8 +226,7 @@ public interface Dependencies
public static CommitProcessFactory defaultCommitProcessFactory = new CommitProcessFactory()
{
@Override
public TransactionCommitProcess create( LogicalTransactionStore logicalTransactionStore,
KernelHealth kernelHealth, NeoStore neoStore,
public TransactionCommitProcess create( LogicalTransactionStore logicalTransactionStore, NeoStore neoStore,
TransactionRepresentationStoreApplier storeApplier,
NeoStoreInjectedTransactionValidator txValidator,
IndexUpdatesValidator indexUpdatesValidator, Config config )
Expand All @@ -238,8 +237,8 @@ public TransactionCommitProcess create( LogicalTransactionStore logicalTransacti
}
else
{
return new TransactionRepresentationCommitProcess( logicalTransactionStore, kernelHealth,
neoStore, storeApplier, indexUpdatesValidator );
return new TransactionRepresentationCommitProcess( logicalTransactionStore, neoStore, storeApplier,
indexUpdatesValidator );
}
}
};
Expand Down
Expand Up @@ -744,8 +744,8 @@ private IndexingModule buildIndexing( Config config, JobScheduler scheduler, Sch
final IntegrityValidator integrityValidator = new IntegrityValidator( neoStore, indexingService );

final IndexUpdatesValidator indexUpdatesValidator = dependencies.satisfyDependency(
new OnlineIndexUpdatesValidator( neoStore, new PropertyLoader( neoStore ), indexingService,
IndexUpdateMode.ONLINE ) );
new OnlineIndexUpdatesValidator( neoStore, kernelHealth, new PropertyLoader( neoStore ),
indexingService, IndexUpdateMode.ONLINE ) );

// TODO Move to constructor
final LabelScanStore labelScanStore = dependencyResolver.resolveDependency( LabelScanStoreProvider.class,
Expand Down Expand Up @@ -846,7 +846,7 @@ private TransactionLogModule buildTransactionLogs( Config config, Logging loggin
new TransactionRepresentationStoreApplier(
indexingService, alwaysCreateNewWriter( labelScanStore ), neoStore,
cacheAccess, lockService, legacyIndexApplierLookup,
indexConfigStore, legacyIndexTransactionOrdering ) );
indexConfigStore, kernelHealth, legacyIndexTransactionOrdering ) );

final LogFile logFile = life.add( new PhysicalLogFile( fileSystemAbstraction, logFiles,
config.get( GraphDatabaseSettings.logical_log_rotation_threshold ), neoStore,
Expand Down Expand Up @@ -961,12 +961,11 @@ private void buildRecovery( final FileSystemAbstraction fileSystemAbstraction, C
legacyIndexApplierLookup, 1000 );
final RecoveryIndexingUpdatesValidator indexUpdatesValidator = new RecoveryIndexingUpdatesValidator( indexingService );
final TransactionRepresentationStoreApplier storeRecoverer =
new TransactionRepresentationStoreApplier(
indexingService, labelScanWriters, neoStore, cacheAccess, lockService,
legacyIndexApplierLookup, indexConfigStore, IdOrderingQueue.BYPASS );
new TransactionRepresentationStoreApplier( indexingService, labelScanWriters, neoStore, cacheAccess,
lockService, legacyIndexApplierLookup, indexConfigStore, kernelHealth, IdOrderingQueue.BYPASS );

RecoveryVisitor recoveryVisitor = new RecoveryVisitor( neoStore, storeRecoverer, indexUpdatesValidator,
recoveryVisitorMonitor );
RecoveryVisitor recoveryVisitor =
new RecoveryVisitor( neoStore, storeRecoverer, indexUpdatesValidator, recoveryVisitorMonitor );

LogEntryReader<ReadableVersionableLogChannel> logEntryReader = new VersionAwareLogEntryReader<>();
final Visitor<LogVersionedStoreChannel,IOException> logFileRecoverer =
Expand Down Expand Up @@ -1042,7 +1041,7 @@ private KernelModule buildKernel( IntegrityValidator integrityValidator,
PersistenceCache persistenceCache, SchemaIndexProviderMap schemaIndexProviderMap )
{
final TransactionCommitProcess transactionCommitProcess =
commitProcessFactory.create( logicalTransactionStore, kernelHealth, neoStore, storeApplier,
commitProcessFactory.create( logicalTransactionStore, neoStore, storeApplier,
new NeoStoreInjectedTransactionValidator( integrityValidator ), indexUpdatesValidator, config );

/*
Expand Down
Expand Up @@ -21,6 +21,7 @@

import java.io.IOException;

import org.neo4j.kernel.KernelHealth;
import org.neo4j.kernel.RecoveryLabelScanWriterProvider;
import org.neo4j.kernel.api.labelscan.LabelScanStore;
import org.neo4j.kernel.impl.api.index.IndexingService;
Expand All @@ -43,12 +44,13 @@ public class BatchingTransactionRepresentationStoreApplier extends TransactionRe
public BatchingTransactionRepresentationStoreApplier( IndexingService indexingService,
LabelScanStore labelScanStore, NeoStore neoStore, CacheAccessBackDoor cacheAccess,
LockService lockService, LegacyIndexApplierLookup legacyIndexProviderLookup,
IndexConfigStore indexConfigStore, IdOrderingQueue legacyIndexTransactionOrdering )
IndexConfigStore indexConfigStore, KernelHealth kernelHealth,
IdOrderingQueue legacyIndexTransactionOrdering )
{
this( indexingService, new RecoveryLabelScanWriterProvider( labelScanStore, 1000 ),
neoStore, cacheAccess, lockService,
new RecoveryLegacyIndexApplierLookup( legacyIndexProviderLookup, 1000 ),
indexConfigStore, legacyIndexTransactionOrdering );
indexConfigStore, kernelHealth, legacyIndexTransactionOrdering );
}

private BatchingTransactionRepresentationStoreApplier(
Expand All @@ -59,18 +61,27 @@ private BatchingTransactionRepresentationStoreApplier(
LockService lockService,
RecoveryLegacyIndexApplierLookup legacyIndexApplierLookup,
IndexConfigStore indexConfigStore,
KernelHealth kernelHealth,
IdOrderingQueue legacyIndexTransactionOrdering )
{
super( indexingService, labelScanWriterProvider, neoStore, cacheAccess, lockService, legacyIndexApplierLookup,
indexConfigStore, legacyIndexTransactionOrdering );
indexConfigStore, kernelHealth, legacyIndexTransactionOrdering );
this.labelScanWriterProvider = labelScanWriterProvider;
this.legacyIndexApplierLookup = legacyIndexApplierLookup;
}

public void closeBatch() throws IOException
{
labelScanWriterProvider.close();
legacyIndexApplierLookup.close();
indexingService.flushAll();
try
{
labelScanWriterProvider.close();
legacyIndexApplierLookup.close();
indexingService.flushAll();
}
catch ( Throwable ex )
{
health.panic( ex );
throw ex;
}
}
}
Expand Up @@ -19,7 +19,6 @@
*/
package org.neo4j.kernel.impl.api;

import org.neo4j.kernel.KernelHealth;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.api.index.IndexUpdatesValidator;
import org.neo4j.kernel.impl.store.NeoStore;
Expand All @@ -28,7 +27,7 @@

public interface CommitProcessFactory
{
TransactionCommitProcess create( LogicalTransactionStore logicalTransactionStore, KernelHealth kernelHealth,
TransactionCommitProcess create( LogicalTransactionStore logicalTransactionStore,
NeoStore neoStore, TransactionRepresentationStoreApplier storeApplier,
NeoStoreInjectedTransactionValidator txValidator,
IndexUpdatesValidator indexUpdatesValidator, Config config );
Expand Down
Expand Up @@ -19,7 +19,6 @@
*/
package org.neo4j.kernel.impl.api;

import org.neo4j.kernel.KernelHealth;
import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.api.exceptions.TransactionFailureException;
import org.neo4j.kernel.impl.api.index.IndexUpdatesValidator;
Expand All @@ -38,18 +37,16 @@
public class TransactionRepresentationCommitProcess implements TransactionCommitProcess
{
private final LogicalTransactionStore logicalTransactionStore;
private final KernelHealth kernelHealth;
private final TransactionIdStore transactionIdStore;
private final TransactionRepresentationStoreApplier storeApplier;
private final IndexUpdatesValidator indexUpdatesValidator;

public TransactionRepresentationCommitProcess( LogicalTransactionStore logicalTransactionStore,
KernelHealth kernelHealth, TransactionIdStore transactionIdStore,
TransactionRepresentationStoreApplier storeApplier, IndexUpdatesValidator indexUpdatesValidator )
TransactionIdStore transactionIdStore, TransactionRepresentationStoreApplier storeApplier,
IndexUpdatesValidator indexUpdatesValidator )
{
this.logicalTransactionStore = logicalTransactionStore;
this.transactionIdStore = transactionIdStore;
this.kernelHealth = kernelHealth;
this.storeApplier = storeApplier;
this.indexUpdatesValidator = indexUpdatesValidator;
}
Expand Down Expand Up @@ -89,7 +86,7 @@ private long appendToLog(
}
catch ( Throwable e )
{
throw exception( Status.Transaction.CouldNotWriteToLog, e,
throw new TransactionFailureException( Status.Transaction.CouldNotWriteToLog, e,
"Could not append transaction representation to log" );
}
commitEvent.setTransactionId( transactionId );
Expand All @@ -106,20 +103,14 @@ private void applyToStore(
storeApplier.apply( transaction, indexUpdates, locks, transactionId, mode );
}
// TODO catch different types of exceptions here, some which are OK
catch ( Throwable e )
catch ( Throwable cause )
{
throw exception( CouldNotCommit, e,
throw new TransactionFailureException( CouldNotCommit, cause,
"Could not apply the transaction to the store after written to log" );
}
finally
{
transactionIdStore.transactionClosed( transactionId );
}
}

private TransactionFailureException exception( Status status, Throwable cause, String message )
{
kernelHealth.panic( cause );
return new TransactionFailureException( status, cause, message );
}
}
Expand Up @@ -23,6 +23,7 @@

import org.neo4j.concurrent.WorkSync;
import org.neo4j.helpers.Provider;
import org.neo4j.kernel.KernelHealth;
import org.neo4j.kernel.impl.api.index.IndexingService;
import org.neo4j.kernel.impl.api.index.ValidatedIndexUpdates;
import org.neo4j.kernel.impl.core.CacheAccessBackDoor;
Expand Down Expand Up @@ -54,14 +55,16 @@ public class TransactionRepresentationStoreApplier
private final Provider<LabelScanWriter> labelScanWriters;
private final IndexConfigStore indexConfigStore;
private final LegacyIndexApplierLookup legacyIndexProviderLookup;
protected final KernelHealth health;
private final IdOrderingQueue legacyIndexTransactionOrdering;

private final WorkSync<Provider<LabelScanWriter>,IndexTransactionApplier.LabelUpdateWork> labelScanStoreSync;

public TransactionRepresentationStoreApplier(
IndexingService indexingService, Provider<LabelScanWriter> labelScanWriters, NeoStore neoStore,
CacheAccessBackDoor cacheAccess, LockService lockService, LegacyIndexApplierLookup legacyIndexProviderLookup,
IndexConfigStore indexConfigStore, IdOrderingQueue legacyIndexTransactionOrdering )
CacheAccessBackDoor cacheAccess, LockService lockService, LegacyIndexApplierLookup
legacyIndexProviderLookup,
IndexConfigStore indexConfigStore, KernelHealth health, IdOrderingQueue legacyIndexTransactionOrdering )
{
this.indexingService = indexingService;
this.labelScanWriters = labelScanWriters;
Expand All @@ -70,13 +73,13 @@ public TransactionRepresentationStoreApplier(
this.lockService = lockService;
this.legacyIndexProviderLookup = legacyIndexProviderLookup;
this.indexConfigStore = indexConfigStore;
this.health = health;
this.legacyIndexTransactionOrdering = legacyIndexTransactionOrdering;
labelScanStoreSync = new WorkSync<>( labelScanWriters );
}

public void apply( TransactionRepresentation representation, ValidatedIndexUpdates indexUpdates, LockGroup locks,
long transactionId, TransactionApplicationMode mode )
throws IOException
long transactionId, TransactionApplicationMode mode ) throws IOException
{
// Graph store application. The order of the decorated store appliers is irrelevant
NeoCommandHandler storeApplier = new NeoStoreTransactionApplier(
Expand Down Expand Up @@ -107,12 +110,17 @@ public void apply( TransactionRepresentation representation, ValidatedIndexUpdat
{
representation.accept( applier );
}
catch ( Throwable cause )
{
health.panic( cause );
throw cause;
}
}

private NeoCommandHandler getCountsStoreApplier( long transactionId, TransactionApplicationMode mode )
{
Optional<NeoCommandHandler> handlerOption = neoStore.getCounts().apply( transactionId )
.map( CountsStoreApplier.FACTORY );
Optional<NeoCommandHandler> handlerOption =
neoStore.getCounts().apply( transactionId ).map( CountsStoreApplier.FACTORY );
if ( mode == TransactionApplicationMode.RECOVERY )
{
handlerOption = handlerOption.or( NeoCommandHandler.EMPTY );
Expand All @@ -124,7 +132,6 @@ public TransactionRepresentationStoreApplier withLegacyIndexTransactionOrdering(
IdOrderingQueue legacyIndexTransactionOrdering )
{
return new TransactionRepresentationStoreApplier( indexingService, labelScanWriters, neoStore, cacheAccess,
lockService, legacyIndexProviderLookup, indexConfigStore,
legacyIndexTransactionOrdering );
lockService, legacyIndexProviderLookup, indexConfigStore, health, legacyIndexTransactionOrdering );
}
}
Expand Up @@ -21,6 +21,7 @@

import java.io.IOException;

import org.neo4j.kernel.KernelHealth;
import org.neo4j.kernel.api.index.NodePropertyUpdate;
import org.neo4j.kernel.impl.store.NeoStore;
import org.neo4j.kernel.impl.store.NodeStore;
Expand All @@ -34,19 +35,21 @@
* {@link org.neo4j.kernel.impl.transaction.command.Command}s in transaction state.
* It is done by inferring {@link org.neo4j.kernel.api.index.NodePropertyUpdate}s from commands and asking
* {@link org.neo4j.kernel.impl.api.index.IndexingService} to check those via
* {@link org.neo4j.kernel.impl.api.index.IndexingService#validate(Iterable)}.
* {@link org.neo4j.kernel.impl.api.index.IndexingService#validate(Iterable,IndexUpdateMode)}.
*/
public class OnlineIndexUpdatesValidator implements IndexUpdatesValidator
{
private final NodeStore nodeStore;
private final PropertyStore propertyStore;
private final PropertyLoader propertyLoader;
private final IndexingService indexing;
private final KernelHealth kernelHealth;
private final IndexUpdateMode updateMode;

public OnlineIndexUpdatesValidator( NeoStore neoStore, PropertyLoader propertyLoader, IndexingService indexing,
IndexUpdateMode updateMode )
public OnlineIndexUpdatesValidator( NeoStore neoStore, KernelHealth kernelHealth, PropertyLoader propertyLoader,
IndexingService indexing, IndexUpdateMode updateMode )
{
this.kernelHealth = kernelHealth;
this.updateMode = updateMode;
this.nodeStore = neoStore.getNodeStore();
this.propertyStore = neoStore.getPropertyStore();
Expand All @@ -55,11 +58,18 @@ public OnlineIndexUpdatesValidator( NeoStore neoStore, PropertyLoader propertyLo
}

@Override
public ValidatedIndexUpdates validate( TransactionRepresentation transaction )
throws IOException
public ValidatedIndexUpdates validate( TransactionRepresentation transaction ) throws IOException
{
NodePropertyCommandsExtractor extractor = new NodePropertyCommandsExtractor();
transaction.accept( extractor );
try
{
transaction.accept( extractor );
}
catch ( IOException cause )
{
kernelHealth.panic( cause );
throw cause;
}

if ( !extractor.containsAnyNodeOrPropertyUpdate() )
{
Expand Down
Expand Up @@ -30,8 +30,6 @@
*/
public interface TransactionRepresentation
{
public static final int NO_LOCK_SESSION = -1;

/**
* Accepts a visitor into the commands making up this transaction.
* @param visitor {@link Visitor} which will see the commands.
Expand Down
Expand Up @@ -308,6 +308,11 @@ private void forceLog( LogAppendEvent logAppendEvent ) throws IOException
{
force();
}
catch ( final Throwable panic )
{
kernelHealth.panic( panic );
throw panic;
}

unparkAll( links );
}
Expand Down

0 comments on commit 9753cb6

Please sign in to comment.