Skip to content

Commit

Permalink
Use unique TripCountingRootCatchup for every SeekCursor
Browse files Browse the repository at this point in the history
  • Loading branch information
burqen committed Mar 13, 2019
1 parent 9a8b2f7 commit 8d4d067
Show file tree
Hide file tree
Showing 2 changed files with 151 additions and 46 deletions.
Expand Up @@ -30,6 +30,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

import org.neo4j.collection.primitive.Primitive;
import org.neo4j.collection.primitive.PrimitiveLongSet;
Expand Down Expand Up @@ -334,7 +335,7 @@ public void startupState( boolean clean )
/**
* Catchup for {@link SeekCursor} to become aware of new roots since it started.
*/
private final RootCatchup rootCatchup = new TripCountingRootCatchup( () -> root );
private final Supplier<RootCatchup> rootCatchupSupplier = () -> new TripCountingRootCatchup( () -> root );

/**
* Supplier of generation to readers. This supplier will actually very rarely be used, because normally
Expand Down Expand Up @@ -855,7 +856,7 @@ public RawCursor<Hit<KEY,VALUE>,IOException> seek( KEY fromInclusive, KEY toExcl

// Returns cursor which is now initiated with left-most leaf node for the specified range
return new SeekCursor<>( cursor, bTreeNode, fromInclusive, toExclusive, layout,
stableGeneration, unstableGeneration, generationSupplier, rootCatchup, rootGeneration,
stableGeneration, unstableGeneration, generationSupplier, rootCatchupSupplier.get(), rootGeneration,
exceptionDecorator, SeekCursor.DEFAULT_MAX_READ_AHEAD );
}

Expand Down
Expand Up @@ -40,6 +40,7 @@
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand All @@ -56,6 +57,7 @@
import org.neo4j.cursor.RawCursor;
import org.neo4j.function.ThrowingConsumer;
import org.neo4j.graphdb.mockfs.EphemeralFileSystemAbstraction;
import org.neo4j.helpers.Exceptions;
import org.neo4j.index.internal.gbptree.GBPTree.Monitor;
import org.neo4j.io.pagecache.DelegatingPageCache;
import org.neo4j.io.pagecache.DelegatingPagedFile;
Expand All @@ -74,7 +76,9 @@
import org.neo4j.test.rule.RandomRule;
import org.neo4j.test.rule.TestDirectory;
import org.neo4j.test.rule.fs.DefaultFileSystemRule;
import org.neo4j.util.FeatureToggles;

import static java.lang.Long.MAX_VALUE;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.core.AllOf.allOf;
import static org.junit.Assert.assertArrayEquals;
Expand Down Expand Up @@ -1308,7 +1312,7 @@ public void mustNotSeeUpdatesThatWasNotCheckpointed() throws Exception
try ( GBPTree<MutableLong, MutableLong> index = index().build() )
{
MutableLong from = new MutableLong( Long.MIN_VALUE );
MutableLong to = new MutableLong( Long.MAX_VALUE );
MutableLong to = new MutableLong( MAX_VALUE );
try ( RawCursor<Hit<MutableLong,MutableLong>,IOException> seek = index.seek( from, to ) )
{
assertFalse( seek.next() );
Expand All @@ -1334,7 +1338,7 @@ public void mustSeeUpdatesThatWasCheckpointed() throws Exception
try ( GBPTree<MutableLong, MutableLong> index = index().build() )
{
MutableLong from = new MutableLong( Long.MIN_VALUE );
MutableLong to = new MutableLong( Long.MAX_VALUE );
MutableLong to = new MutableLong( MAX_VALUE );
try ( RawCursor<Hit<MutableLong,MutableLong>,IOException> seek = index.seek( from, to ) )
{
assertTrue( seek.next() );
Expand Down Expand Up @@ -1594,7 +1598,7 @@ public void shouldThrowIllegalStateExceptionOnCallingNextAfterClose() throws Exc
}

RawCursor<Hit<MutableLong,MutableLong>,IOException> seek =
tree.seek( new MutableLong( 0 ), new MutableLong( Long.MAX_VALUE ) );
tree.seek( new MutableLong( 0 ), new MutableLong( MAX_VALUE ) );
assertTrue( seek.next() );
assertTrue( seek.next() );
seek.close();
Expand Down Expand Up @@ -1626,55 +1630,18 @@ public void mustThrowIfStuckInInfiniteRootCatchup() throws IOException
// When seekCursor comes back to the same corrupt child again and again it should eventually escape from that loop
// with an exception.

// A page cache tracer that we can use to see when tree has seen enough updates and to figure out on which page the child sits.
List<Long> trace = new ArrayList<>();
PageCursorTracer pageCursorTracer = new DefaultPageCursorTracer()
{
@Override
public PinEvent beginPin( boolean writeLock, long filePageId, PageSwapper swapper )
{
trace.add( filePageId );
return super.beginPin( writeLock, filePageId, swapper );
}
};
PageCursorTracerSupplier pageCursorTracerSupplier = () -> pageCursorTracer;
PageCache pageCache = createPageCache( DEFAULT_PAGE_SIZE, pageCursorTracerSupplier );
PageCache pageCache = pageCacheWithTrace( trace );

// Build a tree with root and two children.
try ( GBPTree<MutableLong,MutableLong> tree = index( pageCache ).build() )
{
// Insert data until we have a split in root
long count = 0;
for ( int i = 0; i < 100; i++ )
{
try ( Writer<MutableLong,MutableLong> writer = tree.writer() )
{
writer.put( new MutableLong( count ), new MutableLong( count ) );
count++;
}
trace.clear();
try ( RawCursor<Hit<MutableLong,MutableLong>,IOException> seek = tree.seek( new MutableLong( 0 ), new MutableLong( 0 ) ) )
{
seek.next();
}
if ( trace.size() > 1 )
{
// Now a child exist
break;
}
}
treeWithRootSplit( trace, tree );
long corruptChild = trace.get( 1 );

// Corrupt the child
long corruptChild = trace.get( trace.size() - 1 );
try ( PagedFile pagedFile = pageCache.map( indexFile, DEFAULT_PAGE_SIZE );
PageCursor cursor = pagedFile.io( 0, PF_SHARED_WRITE_LOCK ) )
{
assertTrue( cursor.next( corruptChild ) );
assertTrue( TreeNode.isLeaf( cursor ) );

// Make child look like freelist node
cursor.putByte( TreeNode.BYTE_POS_NODE_TYPE, TreeNode.NODE_TYPE_FREE_LIST_NODE );
}
corruptTheChild( pageCache, corruptChild );

// when seek end up in this corrupt child we should eventually fail with a tree inconsistency exception
try ( RawCursor<Hit<MutableLong,MutableLong>,IOException> seek = tree.seek( new MutableLong( 0 ), new MutableLong( 0 ) ) )
Expand All @@ -1692,6 +1659,143 @@ public PinEvent beginPin( boolean writeLock, long filePageId, PageSwapper swappe
}
}

/**
 * Two seekers running concurrently, each ending up in a different corrupt child, must both
 * eventually fail with a {@link TreeInconsistencyException}.
 */
@Test( timeout = 5_000L )
public void mustThrowIfStuckInInfiniteRootCatchupMultipleConcurrentSeekers() throws IOException, InterruptedException
{
    // Override the max trip count toggle for the duration of this test; it is always cleared in the finally block below.
    // NOTE(review): presumably chosen large enough that seekers sharing a single trip counter would disturb each other - confirm.
    FeatureToggles.set( TripCountingRootCatchup.class, TripCountingRootCatchup.MAX_TRIP_COUNT_NAME, 10000 );
    try
    {
        List<Long> trace = new ArrayList<>();
        PageCache pageCache = pageCacheWithTrace( trace );

        // Build a tree with root and two children.
        try ( GBPTree<MutableLong,MutableLong> tree = index( pageCache ).build() )
        {
            // Insert data until we have a split in root
            treeWithRootSplit( trace, tree );
            long leftChild = trace.get( 1 );
            long rightChild = trace.get( 2 );

            // Corrupt both children
            corruptTheChild( pageCache, leftChild );
            corruptTheChild( pageCache, rightChild );

            // When a seek ends up in a corrupt child we should eventually fail with a tree inconsistency exception,
            // even if we have multiple seekers that traverse different parts of the tree and both get stuck
            // in the start-from-root loop.
            ExecutorService executor = Executors.newFixedThreadPool( 2 );
            try
            {
                // Latch makes both seekers start at (roughly) the same time.
                CountDownLatch go = new CountDownLatch( 2 );
                Future<Object> execute1 = executor.submit( () ->
                {
                    go.countDown();
                    go.await();
                    try ( RawCursor<Hit<MutableLong,MutableLong>,IOException> seek = tree.seek( new MutableLong( 0 ), new MutableLong( 0 ) ) )
                    {
                        seek.next();
                    }
                    return null;
                } );

                Future<Object> execute2 = executor.submit( () ->
                {
                    go.countDown();
                    go.await();
                    try ( RawCursor<Hit<MutableLong,MutableLong>,IOException> seek = tree.seek( new MutableLong( MAX_VALUE ), new MutableLong( MAX_VALUE ) ) )
                    {
                        seek.next();
                    }
                    return null;
                } );

                assertFutureFailsWithTreeInconsistencyException( execute1 );
                assertFutureFailsWithTreeInconsistencyException( execute2 );
            }
            finally
            {
                // Release the worker threads before the tree is closed. Without this the pool's
                // non-daemon threads would outlive the test.
                executor.shutdownNow();
            }
        }
    }
    finally
    {
        FeatureToggles.clear( TripCountingRootCatchup.class, TripCountingRootCatchup.MAX_TRIP_COUNT_NAME );
    }
}

/**
 * Waits for the given future and asserts that it completed exceptionally with a
 * {@link TreeInconsistencyException} as the cause.
 *
 * @param execute1 future expected to fail.
 * @throws InterruptedException if interrupted while waiting for the future.
 */
private void assertFutureFailsWithTreeInconsistencyException( Future<Object> execute1 ) throws InterruptedException
{
    try
    {
        execute1.get();
    }
    catch ( ExecutionException failure )
    {
        Throwable rootOfFailure = failure.getCause();
        boolean isInconsistency = rootOfFailure instanceof TreeInconsistencyException;
        if ( !isInconsistency )
        {
            fail( "Expected cause to be " + TreeInconsistencyException.class + " but was " + Exceptions.stringify( rootOfFailure ) );
        }
        // Failed in the expected way (or fail() above has already thrown).
        return;
    }
    // Getting here means the future completed normally.
    fail( "Expected to fail" );
}

/**
 * Overwrites the node type byte of the given page, which is asserted to be a leaf,
 * so that it looks like a free-list node.
 *
 * @param pageCache page cache used to map the index file.
 * @param corruptChild id of the page to corrupt.
 * @throws IOException on failure to map the file or move the cursor.
 */
private void corruptTheChild( PageCache pageCache, long corruptChild ) throws IOException
{
    try ( PagedFile mappedIndex = pageCache.map( indexFile, DEFAULT_PAGE_SIZE );
          PageCursor writeCursor = mappedIndex.io( 0, PF_SHARED_WRITE_LOCK ) )
    {
        boolean positioned = writeCursor.next( corruptChild );
        assertTrue( positioned );
        boolean targetIsLeaf = TreeNode.isLeaf( writeCursor );
        assertTrue( targetIsLeaf );

        // Disguise the leaf as a free-list node
        writeCursor.putByte( TreeNode.BYTE_POS_NODE_TYPE, TreeNode.NODE_TYPE_FREE_LIST_NODE );
    }
}

/**
 * Puts consecutive keys into the tree, one writer session per key, until a point seek for key 0
 * pins more than one page. After that the trace is cleared and a seek over [0, MAX_VALUE) is
 * exhausted so that the trace reflects that traversal.
 * <p>
 * When split is done, trace contain:
 * trace.get( 0 ) - root
 * trace.get( 1 ) - leftChild
 * trace.get( 2 ) - rightChild
 */
private void treeWithRootSplit( List<Long> trace, GBPTree<MutableLong,MutableLong> tree ) throws IOException
{
    for ( long key = 0; ; key++ )
    {
        try ( Writer<MutableLong,MutableLong> writer = tree.writer() )
        {
            writer.put( new MutableLong( key ), new MutableLong( key ) );
        }

        // Probe with a point seek and look at how many pages it had to pin.
        trace.clear();
        try ( RawCursor<Hit<MutableLong,MutableLong>,IOException> probe = tree.seek( new MutableLong( 0 ), new MutableLong( 0 ) ) )
        {
            probe.next();
        }
        if ( trace.size() > 1 )
        {
            break;
        }
    }

    // Record the pages pinned when scanning the whole range.
    trace.clear();
    try ( RawCursor<Hit<MutableLong,MutableLong>,IOException> fullScan = tree.seek( new MutableLong( 0 ), new MutableLong( MAX_VALUE ) ) )
    {
        boolean hasMore;
        do
        {
            hasMore = fullScan.next();
        }
        while ( hasMore );
    }
}

/**
 * Creates a page cache whose page cursor tracer adds the id of every pinned page to the given list.
 * Tests use the trace to see when the tree has seen enough updates and to figure out on which page a child sits.
 *
 * @param trace list that receives the file page id of each pin.
 * @return page cache wired with the recording tracer.
 */
private PageCache pageCacheWithTrace( List<Long> trace )
{
    class RecordingPageCursorTracer extends DefaultPageCursorTracer
    {
        @Override
        public PinEvent beginPin( boolean writeLock, long filePageId, PageSwapper swapper )
        {
            trace.add( filePageId );
            return super.beginPin( writeLock, filePageId, swapper );
        }
    }

    PageCursorTracer recordingTracer = new RecordingPageCursorTracer();
    PageCursorTracerSupplier tracerSupplier = () -> recordingTracer;
    return createPageCache( DEFAULT_PAGE_SIZE, tracerSupplier );
}

private static class ControlledRecoveryCleanupWorkCollector extends RecoveryCleanupWorkCollector
{
Queue<CleanupJob> jobs = new LinkedList<>();
Expand Down

0 comments on commit 8d4d067

Please sign in to comment.