Skip to content

Commit

Permalink
BlockBasedIndexPopulator properly handles being cancelled
Browse files Browse the repository at this point in the history
When cancelling merge of scan updates the scanComplete() method would
then continue to write scan updates and end up failing because
the single-block invariant would be broken. This can only happen
when merge is aborted and therefore is merge is aborted so must
the whole method be.
  • Loading branch information
tinwelint committed Apr 16, 2019
1 parent 3224431 commit d227b8f
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 7 deletions.
Expand Up @@ -247,6 +247,11 @@ public void scanCompleted( PhaseTracker phaseTracker ) throws IndexEntryConflict
// don't merge and sort the external updates // don't merge and sort the external updates


// Build the tree from the scan updates // Build the tree from the scan updates
if ( cancellation.cancelled() )
{
// Do one additional check before starting to write to the tree
return;
}
phaseTracker.enterPhase( PhaseTracker.Phase.BUILD ); phaseTracker.enterPhase( PhaseTracker.Phase.BUILD );
File duplicatesFile = new File( storeFile.getParentFile(), storeFile.getName() + ".dup" ); File duplicatesFile = new File( storeFile.getParentFile(), storeFile.getName() + ".dup" );
try ( IndexKeyStorage<KEY> indexKeyStorage = new IndexKeyStorage<>( fileSystem, duplicatesFile, bufferFactory, blockSize, layout ) ) try ( IndexKeyStorage<KEY> indexKeyStorage = new IndexKeyStorage<>( fileSystem, duplicatesFile, bufferFactory, blockSize, layout ) )
Expand Down
Expand Up @@ -29,6 +29,7 @@
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.function.LongPredicate;


import org.neo4j.graphdb.mockfs.EphemeralFileSystemAbstraction; import org.neo4j.graphdb.mockfs.EphemeralFileSystemAbstraction;
import org.neo4j.internal.kernel.api.schema.IndexProviderDescriptor; import org.neo4j.internal.kernel.api.schema.IndexProviderDescriptor;
Expand Down Expand Up @@ -96,7 +97,7 @@ public void setup()
public void shouldAwaitMergeToBeFullyAbortedBeforeLeavingCloseMethod() throws Exception public void shouldAwaitMergeToBeFullyAbortedBeforeLeavingCloseMethod() throws Exception
{ {
// given // given
TrappingMonitor monitor = new TrappingMonitor( false ); TrappingMonitor monitor = new TrappingMonitor( ignore -> false );
BlockBasedIndexPopulator<GenericKey,NativeIndexValue> populator = instantiatePopulator( monitor, new LocalMemoryTracker() ); BlockBasedIndexPopulator<GenericKey,NativeIndexValue> populator = instantiatePopulator( monitor, new LocalMemoryTracker() );
boolean closed = false; boolean closed = false;
try try
Expand Down Expand Up @@ -126,11 +127,47 @@ public void shouldAwaitMergeToBeFullyAbortedBeforeLeavingCloseMethod() throws Ex
} }
} }


@Test
public void shouldHandleBeingAbortedWhileMerging() throws Exception
{
// given
TrappingMonitor monitor = new TrappingMonitor( numberOfBlocks -> numberOfBlocks == 2 );
BlockBasedIndexPopulator<GenericKey,NativeIndexValue> populator = instantiatePopulator( monitor, new LocalMemoryTracker() );
boolean closed = false;
try
{
populator.add( batchOfUpdates() );

// when starting to merge (in a separate thread)
Future<Object> mergeFuture = t2.execute( command( () -> populator.scanCompleted( nullInstance ) ) );
// and waiting for merge to get going
monitor.barrier.await();
monitor.barrier.release();
monitor.mergeFinishedBarrier.awaitUninterruptibly();
// calling close here should wait for the merge future, so that checking the merge future for "done" immediately afterwards must say true
Future<Object> closeFuture = t3.execute( command( () -> populator.close( false ) ) );
t3.get().waitUntilWaiting();
monitor.mergeFinishedBarrier.release();
closeFuture.get();
closed = true;

// then let's make sure scanComplete was cancelled, not throwing exception or anything.
mergeFuture.get();
}
finally
{
if ( !closed )
{
populator.close( false );
}
}
}

@Test @Test
public void shouldReportAccurateProgressThroughoutThePhases() throws Exception public void shouldReportAccurateProgressThroughoutThePhases() throws Exception
{ {
// given // given
TrappingMonitor monitor = new TrappingMonitor( true ); TrappingMonitor monitor = new TrappingMonitor( numberOfBlocks -> numberOfBlocks == 1 );
BlockBasedIndexPopulator<GenericKey,NativeIndexValue> populator = instantiatePopulator( monitor, new LocalMemoryTracker() ); BlockBasedIndexPopulator<GenericKey,NativeIndexValue> populator = instantiatePopulator( monitor, new LocalMemoryTracker() );
try try
{ {
Expand Down Expand Up @@ -189,7 +226,7 @@ public void shouldCorrectlyDecideToAwaitMergeDependingOnProgress() throws Throwa
public void shouldDeleteDirectoryOnDrop() throws Exception public void shouldDeleteDirectoryOnDrop() throws Exception
{ {
// given // given
TrappingMonitor monitor = new TrappingMonitor( false ); TrappingMonitor monitor = new TrappingMonitor( ignore -> false );
BlockBasedIndexPopulator<GenericKey,NativeIndexValue> populator = instantiatePopulator( monitor, new LocalMemoryTracker() ); BlockBasedIndexPopulator<GenericKey,NativeIndexValue> populator = instantiatePopulator( monitor, new LocalMemoryTracker() );
boolean closed = false; boolean closed = false;
try try
Expand Down Expand Up @@ -341,11 +378,11 @@ private static class TrappingMonitor extends BlockStorage.Monitor.Adapter
{ {
private final Barrier.Control barrier = new Barrier.Control(); private final Barrier.Control barrier = new Barrier.Control();
private final Barrier.Control mergeFinishedBarrier = new Barrier.Control(); private final Barrier.Control mergeFinishedBarrier = new Barrier.Control();
private final boolean alsoTrapAfterMergeCompleted; private final LongPredicate trapForMergeIterationFinished;


TrappingMonitor( boolean alsoTrapAfterMergeCompleted ) TrappingMonitor( LongPredicate trapForMergeIterationFinished )
{ {
this.alsoTrapAfterMergeCompleted = alsoTrapAfterMergeCompleted; this.trapForMergeIterationFinished = trapForMergeIterationFinished;
} }


@Override @Override
Expand All @@ -357,7 +394,7 @@ public void mergedBlocks( long resultingBlockSize, long resultingEntryCount, lon
@Override @Override
public void mergeIterationFinished( long numberOfBlocksBefore, long numberOfBlocksAfter ) public void mergeIterationFinished( long numberOfBlocksBefore, long numberOfBlocksAfter )
{ {
if ( numberOfBlocksAfter == 1 && alsoTrapAfterMergeCompleted ) if ( trapForMergeIterationFinished.test( numberOfBlocksAfter ) )
{ {
mergeFinishedBarrier.reached(); mergeFinishedBarrier.reached();
} }
Expand Down

0 comments on commit d227b8f

Please sign in to comment.