From d227b8ffc622c457c5e6fc4854bb1b24cc20c008 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mattias=20Finn=C3=A9?= Date: Wed, 3 Apr 2019 12:10:19 +0200 Subject: [PATCH] BlockBasedIndexPopulator properly handles being cancelled When cancelling merge of scan updates the scanComplete() method would then continue to write scan updates and end up failing because the single-block invariant would be broken. This can only happen when merge is aborted and therefore is merge is aborted so must the whole method be. --- .../schema/BlockBasedIndexPopulator.java | 5 ++ .../schema/BlockBasedIndexPopulatorTest.java | 51 ++++++++++++++++--- 2 files changed, 49 insertions(+), 7 deletions(-) diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/BlockBasedIndexPopulator.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/BlockBasedIndexPopulator.java index 8bcbc6c2287f0..59c6b2d8c0aa1 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/BlockBasedIndexPopulator.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/BlockBasedIndexPopulator.java @@ -247,6 +247,11 @@ public void scanCompleted( PhaseTracker phaseTracker ) throws IndexEntryConflict // don't merge and sort the external updates // Build the tree from the scan updates + if ( cancellation.cancelled() ) + { + // Do one additional check before starting to write to the tree + return; + } phaseTracker.enterPhase( PhaseTracker.Phase.BUILD ); File duplicatesFile = new File( storeFile.getParentFile(), storeFile.getName() + ".dup" ); try ( IndexKeyStorage indexKeyStorage = new IndexKeyStorage<>( fileSystem, duplicatesFile, bufferFactory, blockSize, layout ) ) diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/index/schema/BlockBasedIndexPopulatorTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/index/schema/BlockBasedIndexPopulatorTest.java index f9e37cbf1c3b6..e2ceeec960225 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/index/schema/BlockBasedIndexPopulatorTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/index/schema/BlockBasedIndexPopulatorTest.java @@ -29,6 +29,7 @@ import java.util.HashMap; import java.util.List; import java.util.concurrent.Future; +import java.util.function.LongPredicate; import org.neo4j.graphdb.mockfs.EphemeralFileSystemAbstraction; import org.neo4j.internal.kernel.api.schema.IndexProviderDescriptor; @@ -96,7 +97,7 @@ public void setup() public void shouldAwaitMergeToBeFullyAbortedBeforeLeavingCloseMethod() throws Exception { // given - TrappingMonitor monitor = new TrappingMonitor( false ); + TrappingMonitor monitor = new TrappingMonitor( ignore -> false ); BlockBasedIndexPopulator populator = instantiatePopulator( monitor, new LocalMemoryTracker() ); boolean closed = false; try @@ -126,11 +127,47 @@ public void shouldAwaitMergeToBeFullyAbortedBeforeLeavingCloseMethod() throws Ex } } + @Test + public void shouldHandleBeingAbortedWhileMerging() throws Exception + { + // given + TrappingMonitor monitor = new TrappingMonitor( numberOfBlocks -> numberOfBlocks == 2 ); + BlockBasedIndexPopulator populator = instantiatePopulator( monitor, new LocalMemoryTracker() ); + boolean closed = false; + try + { + populator.add( batchOfUpdates() ); + + // when starting to merge (in a separate thread) + Future mergeFuture = t2.execute( command( () -> populator.scanCompleted( nullInstance ) ) ); + // and waiting for merge to get going + monitor.barrier.await(); + monitor.barrier.release(); + monitor.mergeFinishedBarrier.awaitUninterruptibly(); + // calling close here should wait for the merge future, so that checking the merge future for "done" immediately afterwards must say true + Future closeFuture = t3.execute( command( () -> populator.close( false ) ) ); + t3.get().waitUntilWaiting(); + monitor.mergeFinishedBarrier.release(); + closeFuture.get(); + closed = true; + + // then let's make sure scanComplete was cancelled, not throwing exception or anything. + mergeFuture.get(); + } + finally + { + if ( !closed ) + { + populator.close( false ); + } + } + } + @Test public void shouldReportAccurateProgressThroughoutThePhases() throws Exception { // given - TrappingMonitor monitor = new TrappingMonitor( true ); + TrappingMonitor monitor = new TrappingMonitor( numberOfBlocks -> numberOfBlocks == 1 ); BlockBasedIndexPopulator populator = instantiatePopulator( monitor, new LocalMemoryTracker() ); try { @@ -189,7 +226,7 @@ public void shouldCorrectlyDecideToAwaitMergeDependingOnProgress() throws Throwa public void shouldDeleteDirectoryOnDrop() throws Exception { // given - TrappingMonitor monitor = new TrappingMonitor( false ); + TrappingMonitor monitor = new TrappingMonitor( ignore -> false ); BlockBasedIndexPopulator populator = instantiatePopulator( monitor, new LocalMemoryTracker() ); boolean closed = false; try @@ -341,11 +378,11 @@ private static class TrappingMonitor extends BlockStorage.Monitor.Adapter { private final Barrier.Control barrier = new Barrier.Control(); private final Barrier.Control mergeFinishedBarrier = new Barrier.Control(); - private final boolean alsoTrapAfterMergeCompleted; + private final LongPredicate trapForMergeIterationFinished; - TrappingMonitor( boolean alsoTrapAfterMergeCompleted ) + TrappingMonitor( LongPredicate trapForMergeIterationFinished ) { - this.alsoTrapAfterMergeCompleted = alsoTrapAfterMergeCompleted; + this.trapForMergeIterationFinished = trapForMergeIterationFinished; } @Override @@ -357,7 +394,7 @@ public void mergedBlocks( long resultingBlockSize, long resultingEntryCount, lon @Override public void mergeIterationFinished( long numberOfBlocksBefore, long numberOfBlocksAfter ) { - if ( numberOfBlocksAfter == 1 && alsoTrapAfterMergeCompleted ) + if ( trapForMergeIterationFinished.test( numberOfBlocksAfter ) ) { mergeFinishedBarrier.reached(); }