From c245d5b83be2a852378a97d9d18935d60e2decb0 Mon Sep 17 00:00:00 2001 From: Mikhaylo Demianenko Date: Thu, 30 Jun 2016 18:44:58 +0200 Subject: [PATCH] Panic in case of any unexpected problems during queued transaction apply on slave. Update handling of exceptions to always propagate unexpected exception to kernelPanic. --- .../recordstorage/RecordStorageEngine.java | 12 +-- .../RecordStorageEngineRule.java | 74 +++++++++++++++++-- .../RecordStorageEngineTest.java | 70 +++++++++++++++--- 3 files changed, 135 insertions(+), 21 deletions(-) diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/storageengine/impl/recordstorage/RecordStorageEngine.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/storageengine/impl/recordstorage/RecordStorageEngine.java index edd67047f8315..d75d89cf36e92 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/storageengine/impl/recordstorage/RecordStorageEngine.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/storageengine/impl/recordstorage/RecordStorageEngine.java @@ -337,13 +337,13 @@ public void apply( CommandsToApply batch, TransactionApplicationMode mode ) thro } batch = batch.next(); } - catch ( Throwable cause ) - { - databaseHealth.panic( cause ); - throw cause; - } } } + catch ( Throwable cause ) + { + databaseHealth.panic( cause ); + throw cause; + } } /** @@ -353,7 +353,7 @@ public void apply( CommandsToApply batch, TransactionApplicationMode mode ) thro * * After all transactions have been applied the appliers are closed. */ - private BatchTransactionApplierFacade applier( TransactionApplicationMode mode ) + protected BatchTransactionApplierFacade applier( TransactionApplicationMode mode ) { ArrayList appliers = new ArrayList<>(); // Graph store application. The order of the decorated store appliers is irrelevant diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/storageengine/impl/recordstorage/RecordStorageEngineRule.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/storageengine/impl/recordstorage/RecordStorageEngineRule.java index e6621c6511ac1..fe0ab453c2529 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/storageengine/impl/recordstorage/RecordStorageEngineRule.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/storageengine/impl/recordstorage/RecordStorageEngineRule.java @@ -21,6 +21,7 @@ import java.io.File; import java.util.Collections; +import java.util.function.Function; import java.util.function.Supplier; import org.neo4j.helpers.collection.Iterables; @@ -31,27 +32,33 @@ import org.neo4j.kernel.api.index.SchemaIndexProvider; import org.neo4j.kernel.api.labelscan.LabelScanStore; import org.neo4j.kernel.configuration.Config; +import org.neo4j.kernel.impl.api.BatchTransactionApplierFacade; import org.neo4j.kernel.impl.api.KernelTransactionsSnapshot; import org.neo4j.kernel.impl.api.LegacyIndexProviderLookup; import org.neo4j.kernel.impl.api.index.IndexingService; import org.neo4j.kernel.impl.api.scan.InMemoryLabelScanStore; import org.neo4j.kernel.impl.api.scan.LabelScanStoreProvider; +import org.neo4j.kernel.impl.constraints.ConstraintSemantics; import org.neo4j.kernel.impl.constraints.StandardConstraintSemantics; import org.neo4j.kernel.impl.core.DatabasePanicEventGenerator; import org.neo4j.kernel.impl.core.LabelTokenHolder; import org.neo4j.kernel.impl.core.PropertyKeyTokenHolder; import org.neo4j.kernel.impl.core.RelationshipTypeTokenHolder; import org.neo4j.kernel.impl.index.IndexConfigStore; +import org.neo4j.kernel.impl.locking.LockService; import org.neo4j.kernel.impl.locking.ReentrantLockService; import org.neo4j.kernel.impl.store.id.IdGeneratorFactory; +import org.neo4j.kernel.impl.util.IdOrderingQueue; import org.neo4j.kernel.impl.util.JobScheduler; import org.neo4j.kernel.impl.util.Neo4jJobScheduler; import org.neo4j.kernel.impl.util.SynchronizedArrayIdOrderingQueue; import org.neo4j.kernel.internal.DatabaseHealth; import org.neo4j.kernel.internal.KernelEventHandlers; import org.neo4j.kernel.lifecycle.LifeSupport; +import org.neo4j.logging.LogProvider; import org.neo4j.logging.NullLog; import org.neo4j.logging.NullLogProvider; +import org.neo4j.storageengine.api.TransactionApplicationMode; import org.neo4j.test.ExternalResource; import org.neo4j.test.PageCacheRule; import org.neo4j.test.impl.EphemeralIdGenerator; @@ -64,7 +71,7 @@ * {@link PageCache}, which usually are managed by test rules themselves. That's why they are passed in * when {@link #getWith(FileSystemAbstraction, PageCache) getting (constructing)} the engine. Further * dependencies can be overridden in that returned builder as well. - * + *

* Keep in mind that this rule must be created BEFORE {@link PageCacheRule} and any file system rule so that * shutdown order gets correct. */ @@ -85,7 +92,8 @@ public Builder getWith( FileSystemAbstraction fs, PageCache pageCache ) } private RecordStorageEngine get( FileSystemAbstraction fs, PageCache pageCache, LabelScanStore labelScanStore, - SchemaIndexProvider schemaIndexProvider, DatabaseHealth databaseHealth, File storeDirectory ) + SchemaIndexProvider schemaIndexProvider, DatabaseHealth databaseHealth, File storeDirectory, + Function transactionApplierTransformer ) { if ( !fs.fileExists( storeDirectory ) && !fs.mkdir( storeDirectory ) ) { @@ -100,14 +108,16 @@ private RecordStorageEngine get( FileSystemAbstraction fs, PageCache pageCache, Config config = Config.defaults(); Supplier txSnapshotSupplier = () -> new KernelTransactionsSnapshot( Collections.emptySet() ); - return life.add( new RecordStorageEngine( storeDirectory, config, idGeneratorFactory, + return life.add( new ExtendedRecordStorageEngine( storeDirectory, config, idGeneratorFactory, IdReuseEligibility.ALWAYS, pageCache, fs, NullLogProvider.getInstance(), mock( PropertyKeyTokenHolder.class ), mock( LabelTokenHolder.class ), - mock( RelationshipTypeTokenHolder.class ), () -> {}, new StandardConstraintSemantics(), + mock( RelationshipTypeTokenHolder.class ), () -> + { + }, new StandardConstraintSemantics(), scheduler, mock( TokenNameLookup.class ), new ReentrantLockService(), schemaIndexProvider, IndexingService.NO_MONITOR, databaseHealth, labelScanStoreProvider, legacyIndexProviderLookup, indexConfigStore, - new SynchronizedArrayIdOrderingQueue( 20 ), txSnapshotSupplier ) ); + new SynchronizedArrayIdOrderingQueue( 20 ), txSnapshotSupplier, transactionApplierTransformer ) ); } @Override @@ -126,6 +136,8 @@ public class Builder new DatabasePanicEventGenerator( new KernelEventHandlers( NullLog.getInstance() ) ), NullLog.getInstance() ); private File storeDirectory = new File( "/graph.db" ); + private Function transactionApplierTransformer = + applierFacade -> applierFacade; private SchemaIndexProvider schemaIndexProvider = SchemaIndexProvider.NO_INDEX_PROVIDER; public Builder( FileSystemAbstraction fs, PageCache pageCache ) @@ -140,6 +152,13 @@ public Builder labelScanStore( LabelScanStore labelScanStore ) return this; } + public Builder transactionApplierTransformer( + Function transactionApplierTransformer ) + { + this.transactionApplierTransformer = transactionApplierTransformer; + return this; + } + public Builder indexProvider( SchemaIndexProvider schemaIndexProvider ) { this.schemaIndexProvider = schemaIndexProvider; @@ -162,7 +181,50 @@ public Builder storeDirectory( File storeDirectory ) public RecordStorageEngine build() { - return get( fs, pageCache, labelScanStore, schemaIndexProvider, databaseHealth, storeDirectory ); + return get( fs, pageCache, labelScanStore, schemaIndexProvider, databaseHealth, storeDirectory, + transactionApplierTransformer ); + } + + } + + private class ExtendedRecordStorageEngine extends RecordStorageEngine + { + + private final Function + transactionApplierTransformer; + + public ExtendedRecordStorageEngine( File storeDir, Config config, + IdGeneratorFactory idGeneratorFactory, IdReuseEligibility eligibleForReuse, + PageCache pageCache, FileSystemAbstraction fs, LogProvider logProvider, + PropertyKeyTokenHolder propertyKeyTokenHolder, LabelTokenHolder labelTokens, + RelationshipTypeTokenHolder relationshipTypeTokens, Runnable schemaStateChangeCallback, + ConstraintSemantics constraintSemantics, JobScheduler scheduler, + TokenNameLookup tokenNameLookup, LockService lockService, + SchemaIndexProvider indexProvider, + IndexingService.Monitor indexingServiceMonitor, DatabaseHealth databaseHealth, + LabelScanStoreProvider labelScanStoreProvider, + LegacyIndexProviderLookup legacyIndexProviderLookup, + IndexConfigStore indexConfigStore, IdOrderingQueue legacyIndexTransactionOrdering, + Supplier transactionsSnapshotSupplier, + Function + transactionApplierTransformer ) + { + super( storeDir, config, idGeneratorFactory, eligibleForReuse, pageCache, fs, logProvider, + propertyKeyTokenHolder, + labelTokens, relationshipTypeTokens, schemaStateChangeCallback, constraintSemantics, scheduler, + tokenNameLookup, lockService, indexProvider, indexingServiceMonitor, databaseHealth, + labelScanStoreProvider, + legacyIndexProviderLookup, indexConfigStore, legacyIndexTransactionOrdering, + transactionsSnapshotSupplier ); + this.transactionApplierTransformer = transactionApplierTransformer; + } + + + @Override + protected BatchTransactionApplierFacade applier( TransactionApplicationMode mode ) + { + BatchTransactionApplierFacade recordEngineApplier = super.applier( mode ); + return transactionApplierTransformer.apply( recordEngineApplier ); } } } diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/storageengine/impl/recordstorage/RecordStorageEngineTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/storageengine/impl/recordstorage/RecordStorageEngineTest.java index 61d16053c8faf..308070adb50f3 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/storageengine/impl/recordstorage/RecordStorageEngineTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/storageengine/impl/recordstorage/RecordStorageEngineTest.java @@ -32,6 +32,8 @@ import org.neo4j.io.pagecache.DelegatingPageCache; import org.neo4j.io.pagecache.IOLimiter; import org.neo4j.io.pagecache.PageCache; +import org.neo4j.kernel.impl.api.BatchTransactionApplier; +import org.neo4j.kernel.impl.api.BatchTransactionApplierFacade; import org.neo4j.kernel.impl.api.CountsAccessor; import org.neo4j.kernel.impl.api.TransactionToApply; import org.neo4j.kernel.impl.store.UnderlyingStorageException; @@ -40,6 +42,7 @@ import org.neo4j.kernel.impl.transaction.log.FakeCommitment; import org.neo4j.kernel.impl.transaction.log.TransactionIdStore; import org.neo4j.kernel.internal.DatabaseHealth; +import org.neo4j.storageengine.api.CommandsToApply; import org.neo4j.storageengine.api.TransactionApplicationMode; import org.neo4j.test.EphemeralFileSystemRule; import org.neo4j.test.PageCacheRule; @@ -61,6 +64,7 @@ public class RecordStorageEngineTest private final RecordStorageEngineRule storageEngineRule = new RecordStorageEngineRule(); private final EphemeralFileSystemRule fsRule = new EphemeralFileSystemRule(); private final PageCacheRule pageCacheRule = new PageCacheRule(); + private DatabaseHealth databaseHealth = mock( DatabaseHealth.class ); @Rule public RuleChain ruleChain = RuleChain.outerRule( fsRule ) @@ -70,20 +74,44 @@ public class RecordStorageEngineTest @Test( timeout = 30_000 ) public void shutdownRecordStorageEngineAfterFailedTransaction() throws Throwable { - RecordStorageEngine engine = - storageEngineRule.getWith( fsRule.get(), pageCacheRule.getPageCache( fsRule.get() ) ).build(); + RecordStorageEngine engine = buildRecordStorageEngine(); Exception applicationError = executeFailingTransaction( engine ); assertNotNull( applicationError ); } + @Test + public void panicOnExceptionDuringCommandsApply() throws Exception + { + IllegalStateException failure = new IllegalStateException( "Too many open files" ); + RecordStorageEngine engine = storageEngineRule.getWith( fsRule.get(), pageCacheRule.getPageCache( fsRule.get() ) ) + .databaseHealth( databaseHealth ) + .transactionApplierTransformer( facade -> transactionApplierFacadeTransformer( facade, failure ) ) + .build(); + CommandsToApply commandsToApply = mock( CommandsToApply.class ); + + try + { + engine.apply( commandsToApply, TransactionApplicationMode.INTERNAL ); + fail( "Exception expected" ); + } + catch ( Exception exception ) + { + assertSame( failure, Exceptions.rootCause( exception ) ); + } + + verify( databaseHealth ).panic( any( Throwable.class ) ); + } + + private static BatchTransactionApplierFacade transactionApplierFacadeTransformer( + BatchTransactionApplierFacade facade, Exception failure ) + { + return new FailingBatchTransactionApplierFacade( failure, facade ); + } + @Test public void databasePanicIsRaisedWhenTxApplicationFails() throws Throwable { - DatabaseHealth databaseHealth = mock( DatabaseHealth.class ); - RecordStorageEngine engine = - storageEngineRule.getWith( fsRule.get(), pageCacheRule.getPageCache( fsRule.get() ) ) - .databaseHealth( databaseHealth ) - .build(); + RecordStorageEngine engine = buildRecordStorageEngine(); Exception applicationError = executeFailingTransaction( engine ); verify( databaseHealth ).panic( applicationError ); } @@ -91,8 +119,7 @@ public void databasePanicIsRaisedWhenTxApplicationFails() throws Throwable @Test( timeout = 30_000 ) public void obtainCountsStoreResetterAfterFailedTransaction() throws Throwable { - RecordStorageEngine engine = - storageEngineRule.getWith( fsRule.get(), pageCacheRule.getPageCache( fsRule.get() ) ).build(); + RecordStorageEngine engine = buildRecordStorageEngine(); Exception applicationError = executeFailingTransaction( engine ); assertNotNull( applicationError ); @@ -126,6 +153,13 @@ public void flushAndForce( IOLimiter limiter ) throws IOException assertThat( observedLimiter.get(), sameInstance( limiter ) ); } + private RecordStorageEngine buildRecordStorageEngine() + { + return storageEngineRule.getWith( fsRule.get(), pageCacheRule.getPageCache( fsRule.get() ) ) + .databaseHealth( databaseHealth ) + .build(); + } + private Exception executeFailingTransaction( RecordStorageEngine engine ) throws IOException { Exception applicationError = new UnderlyingStorageException( "No space left on device" ); @@ -156,4 +190,22 @@ private static TransactionToApply newTransactionThatFailsWith( Exception error ) txToApply.commitment( commitment, txId ); return txToApply; } + + private static class FailingBatchTransactionApplierFacade extends BatchTransactionApplierFacade + { + private Exception failure; + + FailingBatchTransactionApplierFacade( Exception failure, BatchTransactionApplier... appliers ) + { + super( appliers ); + this.failure = failure; + } + + @Override + public void close() throws Exception + { + throw failure; + } + } + }