Skip to content

Commit

Permalink
Panic in case of any unexpected problems during queued transaction ap…
Browse files Browse the repository at this point in the history
…ply on slave.

Update handling of exceptions to always propagate unexpected exception to kernelPanic.
  • Loading branch information
MishaDemianenko committed Jun 30, 2016
1 parent 653b75d commit c245d5b
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 21 deletions.
Expand Up @@ -337,13 +337,13 @@ public void apply( CommandsToApply batch, TransactionApplicationMode mode ) thro
}
batch = batch.next();
}
catch ( Throwable cause )
{
databaseHealth.panic( cause );
throw cause;
}
}
}
catch ( Throwable cause )
{
databaseHealth.panic( cause );
throw cause;
}
}

/**
Expand All @@ -353,7 +353,7 @@ public void apply( CommandsToApply batch, TransactionApplicationMode mode ) thro
*
* After all transactions have been applied the appliers are closed.
*/
private BatchTransactionApplierFacade applier( TransactionApplicationMode mode )
protected BatchTransactionApplierFacade applier( TransactionApplicationMode mode )
{
ArrayList<BatchTransactionApplier> appliers = new ArrayList<>();
// Graph store application. The order of the decorated store appliers is irrelevant
Expand Down
Expand Up @@ -21,6 +21,7 @@

import java.io.File;
import java.util.Collections;
import java.util.function.Function;
import java.util.function.Supplier;

import org.neo4j.helpers.collection.Iterables;
Expand All @@ -31,27 +32,33 @@
import org.neo4j.kernel.api.index.SchemaIndexProvider;
import org.neo4j.kernel.api.labelscan.LabelScanStore;
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.api.BatchTransactionApplierFacade;
import org.neo4j.kernel.impl.api.KernelTransactionsSnapshot;
import org.neo4j.kernel.impl.api.LegacyIndexProviderLookup;
import org.neo4j.kernel.impl.api.index.IndexingService;
import org.neo4j.kernel.impl.api.scan.InMemoryLabelScanStore;
import org.neo4j.kernel.impl.api.scan.LabelScanStoreProvider;
import org.neo4j.kernel.impl.constraints.ConstraintSemantics;
import org.neo4j.kernel.impl.constraints.StandardConstraintSemantics;
import org.neo4j.kernel.impl.core.DatabasePanicEventGenerator;
import org.neo4j.kernel.impl.core.LabelTokenHolder;
import org.neo4j.kernel.impl.core.PropertyKeyTokenHolder;
import org.neo4j.kernel.impl.core.RelationshipTypeTokenHolder;
import org.neo4j.kernel.impl.index.IndexConfigStore;
import org.neo4j.kernel.impl.locking.LockService;
import org.neo4j.kernel.impl.locking.ReentrantLockService;
import org.neo4j.kernel.impl.store.id.IdGeneratorFactory;
import org.neo4j.kernel.impl.util.IdOrderingQueue;
import org.neo4j.kernel.impl.util.JobScheduler;
import org.neo4j.kernel.impl.util.Neo4jJobScheduler;
import org.neo4j.kernel.impl.util.SynchronizedArrayIdOrderingQueue;
import org.neo4j.kernel.internal.DatabaseHealth;
import org.neo4j.kernel.internal.KernelEventHandlers;
import org.neo4j.kernel.lifecycle.LifeSupport;
import org.neo4j.logging.LogProvider;
import org.neo4j.logging.NullLog;
import org.neo4j.logging.NullLogProvider;
import org.neo4j.storageengine.api.TransactionApplicationMode;
import org.neo4j.test.ExternalResource;
import org.neo4j.test.PageCacheRule;
import org.neo4j.test.impl.EphemeralIdGenerator;
Expand All @@ -64,7 +71,7 @@
* {@link PageCache}, which usually are managed by test rules themselves. That's why they are passed in
* when {@link #getWith(FileSystemAbstraction, PageCache) getting (constructing)} the engine. Further
* dependencies can be overridden in that returned builder as well.
*
* <p>
* Keep in mind that this rule must be created BEFORE {@link PageCacheRule} and any file system rule so that
* shutdown order gets correct.
*/
Expand All @@ -85,7 +92,8 @@ public Builder getWith( FileSystemAbstraction fs, PageCache pageCache )
}

private RecordStorageEngine get( FileSystemAbstraction fs, PageCache pageCache, LabelScanStore labelScanStore,
SchemaIndexProvider schemaIndexProvider, DatabaseHealth databaseHealth, File storeDirectory )
SchemaIndexProvider schemaIndexProvider, DatabaseHealth databaseHealth, File storeDirectory,
Function<BatchTransactionApplierFacade,BatchTransactionApplierFacade> transactionApplierTransformer )
{
if ( !fs.fileExists( storeDirectory ) && !fs.mkdir( storeDirectory ) )
{
Expand All @@ -100,14 +108,16 @@ private RecordStorageEngine get( FileSystemAbstraction fs, PageCache pageCache,
Config config = Config.defaults();
Supplier<KernelTransactionsSnapshot> txSnapshotSupplier =
() -> new KernelTransactionsSnapshot( Collections.emptySet() );
return life.add( new RecordStorageEngine( storeDirectory, config, idGeneratorFactory,
return life.add( new ExtendedRecordStorageEngine( storeDirectory, config, idGeneratorFactory,
IdReuseEligibility.ALWAYS, pageCache, fs, NullLogProvider.getInstance(),
mock( PropertyKeyTokenHolder.class ), mock( LabelTokenHolder.class ),
mock( RelationshipTypeTokenHolder.class ), () -> {}, new StandardConstraintSemantics(),
mock( RelationshipTypeTokenHolder.class ), () ->
{
}, new StandardConstraintSemantics(),
scheduler, mock( TokenNameLookup.class ), new ReentrantLockService(),
schemaIndexProvider, IndexingService.NO_MONITOR, databaseHealth,
labelScanStoreProvider, legacyIndexProviderLookup, indexConfigStore,
new SynchronizedArrayIdOrderingQueue( 20 ), txSnapshotSupplier ) );
new SynchronizedArrayIdOrderingQueue( 20 ), txSnapshotSupplier, transactionApplierTransformer ) );
}

@Override
Expand All @@ -126,6 +136,8 @@ public class Builder
new DatabasePanicEventGenerator( new KernelEventHandlers( NullLog.getInstance() ) ),
NullLog.getInstance() );
private File storeDirectory = new File( "/graph.db" );
private Function<BatchTransactionApplierFacade,BatchTransactionApplierFacade> transactionApplierTransformer =
applierFacade -> applierFacade;
private SchemaIndexProvider schemaIndexProvider = SchemaIndexProvider.NO_INDEX_PROVIDER;

public Builder( FileSystemAbstraction fs, PageCache pageCache )
Expand All @@ -140,6 +152,13 @@ public Builder labelScanStore( LabelScanStore labelScanStore )
return this;
}

public Builder transactionApplierTransformer(
Function<BatchTransactionApplierFacade,BatchTransactionApplierFacade> transactionApplierTransformer )
{
this.transactionApplierTransformer = transactionApplierTransformer;
return this;
}

public Builder indexProvider( SchemaIndexProvider schemaIndexProvider )
{
this.schemaIndexProvider = schemaIndexProvider;
Expand All @@ -162,7 +181,50 @@ public Builder storeDirectory( File storeDirectory )

public RecordStorageEngine build()
{
return get( fs, pageCache, labelScanStore, schemaIndexProvider, databaseHealth, storeDirectory );
return get( fs, pageCache, labelScanStore, schemaIndexProvider, databaseHealth, storeDirectory,
transactionApplierTransformer );
}

}

private class ExtendedRecordStorageEngine extends RecordStorageEngine
{

private final Function<BatchTransactionApplierFacade,BatchTransactionApplierFacade>
transactionApplierTransformer;

public ExtendedRecordStorageEngine( File storeDir, Config config,
IdGeneratorFactory idGeneratorFactory, IdReuseEligibility eligibleForReuse,
PageCache pageCache, FileSystemAbstraction fs, LogProvider logProvider,
PropertyKeyTokenHolder propertyKeyTokenHolder, LabelTokenHolder labelTokens,
RelationshipTypeTokenHolder relationshipTypeTokens, Runnable schemaStateChangeCallback,
ConstraintSemantics constraintSemantics, JobScheduler scheduler,
TokenNameLookup tokenNameLookup, LockService lockService,
SchemaIndexProvider indexProvider,
IndexingService.Monitor indexingServiceMonitor, DatabaseHealth databaseHealth,
LabelScanStoreProvider labelScanStoreProvider,
LegacyIndexProviderLookup legacyIndexProviderLookup,
IndexConfigStore indexConfigStore, IdOrderingQueue legacyIndexTransactionOrdering,
Supplier<KernelTransactionsSnapshot> transactionsSnapshotSupplier,
Function<BatchTransactionApplierFacade,BatchTransactionApplierFacade>
transactionApplierTransformer )
{
super( storeDir, config, idGeneratorFactory, eligibleForReuse, pageCache, fs, logProvider,
propertyKeyTokenHolder,
labelTokens, relationshipTypeTokens, schemaStateChangeCallback, constraintSemantics, scheduler,
tokenNameLookup, lockService, indexProvider, indexingServiceMonitor, databaseHealth,
labelScanStoreProvider,
legacyIndexProviderLookup, indexConfigStore, legacyIndexTransactionOrdering,
transactionsSnapshotSupplier );
this.transactionApplierTransformer = transactionApplierTransformer;
}


@Override
protected BatchTransactionApplierFacade applier( TransactionApplicationMode mode )
{
BatchTransactionApplierFacade recordEngineApplier = super.applier( mode );
return transactionApplierTransformer.apply( recordEngineApplier );
}
}
}
Expand Up @@ -32,6 +32,8 @@
import org.neo4j.io.pagecache.DelegatingPageCache;
import org.neo4j.io.pagecache.IOLimiter;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.kernel.impl.api.BatchTransactionApplier;
import org.neo4j.kernel.impl.api.BatchTransactionApplierFacade;
import org.neo4j.kernel.impl.api.CountsAccessor;
import org.neo4j.kernel.impl.api.TransactionToApply;
import org.neo4j.kernel.impl.store.UnderlyingStorageException;
Expand All @@ -40,6 +42,7 @@
import org.neo4j.kernel.impl.transaction.log.FakeCommitment;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.internal.DatabaseHealth;
import org.neo4j.storageengine.api.CommandsToApply;
import org.neo4j.storageengine.api.TransactionApplicationMode;
import org.neo4j.test.EphemeralFileSystemRule;
import org.neo4j.test.PageCacheRule;
Expand All @@ -61,6 +64,7 @@ public class RecordStorageEngineTest
private final RecordStorageEngineRule storageEngineRule = new RecordStorageEngineRule();
private final EphemeralFileSystemRule fsRule = new EphemeralFileSystemRule();
private final PageCacheRule pageCacheRule = new PageCacheRule();
private DatabaseHealth databaseHealth = mock( DatabaseHealth.class );

@Rule
public RuleChain ruleChain = RuleChain.outerRule( fsRule )
Expand All @@ -70,29 +74,52 @@ public class RecordStorageEngineTest
@Test( timeout = 30_000 )
public void shutdownRecordStorageEngineAfterFailedTransaction() throws Throwable
{
RecordStorageEngine engine =
storageEngineRule.getWith( fsRule.get(), pageCacheRule.getPageCache( fsRule.get() ) ).build();
RecordStorageEngine engine = buildRecordStorageEngine();
Exception applicationError = executeFailingTransaction( engine );
assertNotNull( applicationError );
}

@Test
public void panicOnExceptionDuringCommandsApply() throws Exception
{
IllegalStateException failure = new IllegalStateException( "Too many open files" );
RecordStorageEngine engine = storageEngineRule.getWith( fsRule.get(), pageCacheRule.getPageCache( fsRule.get() ) )
.databaseHealth( databaseHealth )
.transactionApplierTransformer( facade -> transactionApplierFacadeTransformer( facade, failure ) )
.build();
CommandsToApply commandsToApply = mock( CommandsToApply.class );

try
{
engine.apply( commandsToApply, TransactionApplicationMode.INTERNAL );
fail( "Exception expected" );
}
catch ( Exception exception )
{
assertSame( failure, Exceptions.rootCause( exception ) );
}

verify( databaseHealth ).panic( any( Throwable.class ) );
}

private static BatchTransactionApplierFacade transactionApplierFacadeTransformer(
BatchTransactionApplierFacade facade, Exception failure )
{
return new FailingBatchTransactionApplierFacade( failure, facade );
}

@Test
public void databasePanicIsRaisedWhenTxApplicationFails() throws Throwable
{
DatabaseHealth databaseHealth = mock( DatabaseHealth.class );
RecordStorageEngine engine =
storageEngineRule.getWith( fsRule.get(), pageCacheRule.getPageCache( fsRule.get() ) )
.databaseHealth( databaseHealth )
.build();
RecordStorageEngine engine = buildRecordStorageEngine();
Exception applicationError = executeFailingTransaction( engine );
verify( databaseHealth ).panic( applicationError );
}

@Test( timeout = 30_000 )
public void obtainCountsStoreResetterAfterFailedTransaction() throws Throwable
{
RecordStorageEngine engine =
storageEngineRule.getWith( fsRule.get(), pageCacheRule.getPageCache( fsRule.get() ) ).build();
RecordStorageEngine engine = buildRecordStorageEngine();
Exception applicationError = executeFailingTransaction( engine );
assertNotNull( applicationError );

Expand Down Expand Up @@ -126,6 +153,13 @@ public void flushAndForce( IOLimiter limiter ) throws IOException
assertThat( observedLimiter.get(), sameInstance( limiter ) );
}

private RecordStorageEngine buildRecordStorageEngine()
{
return storageEngineRule.getWith( fsRule.get(), pageCacheRule.getPageCache( fsRule.get() ) )
.databaseHealth( databaseHealth )
.build();
}

private Exception executeFailingTransaction( RecordStorageEngine engine ) throws IOException
{
Exception applicationError = new UnderlyingStorageException( "No space left on device" );
Expand Down Expand Up @@ -156,4 +190,22 @@ private static TransactionToApply newTransactionThatFailsWith( Exception error )
txToApply.commitment( commitment, txId );
return txToApply;
}

private static class FailingBatchTransactionApplierFacade extends BatchTransactionApplierFacade
{
private Exception failure;

FailingBatchTransactionApplierFacade( Exception failure, BatchTransactionApplier... appliers )
{
super( appliers );
this.failure = failure;
}

@Override
public void close() throws Exception
{
throw failure;
}
}

}

0 comments on commit c245d5b

Please sign in to comment.