From 8d4d067022e8e48909b70446f8895d2990c9d51f Mon Sep 17 00:00:00 2001 From: Anton Persson Date: Tue, 12 Mar 2019 11:41:08 +0100 Subject: [PATCH] Use unique TripCountingRootCatchup for every SeekCursor --- .../neo4j/index/internal/gbptree/GBPTree.java | 5 +- .../index/internal/gbptree/GBPTreeTest.java | 192 ++++++++++++++---- 2 files changed, 151 insertions(+), 46 deletions(-) diff --git a/community/index/src/main/java/org/neo4j/index/internal/gbptree/GBPTree.java b/community/index/src/main/java/org/neo4j/index/internal/gbptree/GBPTree.java index 3c7d0ef269cc2..22fca57695ed4 100644 --- a/community/index/src/main/java/org/neo4j/index/internal/gbptree/GBPTree.java +++ b/community/index/src/main/java/org/neo4j/index/internal/gbptree/GBPTree.java @@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import java.util.function.LongSupplier; +import java.util.function.Supplier; import org.neo4j.collection.primitive.Primitive; import org.neo4j.collection.primitive.PrimitiveLongSet; @@ -334,7 +335,7 @@ public void startupState( boolean clean ) /** * Catchup for {@link SeekCursor} to become aware of new roots since it started. */ - private final RootCatchup rootCatchup = new TripCountingRootCatchup( () -> root ); + private final Supplier rootCatchupSupplier = () -> new TripCountingRootCatchup( () -> root ); /** * Supplier of generation to readers. 
This supplier will actually very rarely be used, because normally @@ -855,7 +856,7 @@ public RawCursor,IOException> seek( KEY fromInclusive, KEY toExcl // Returns cursor which is now initiated with left-most leaf node for the specified range return new SeekCursor<>( cursor, bTreeNode, fromInclusive, toExclusive, layout, - stableGeneration, unstableGeneration, generationSupplier, rootCatchup, rootGeneration, + stableGeneration, unstableGeneration, generationSupplier, rootCatchupSupplier.get(), rootGeneration, exceptionDecorator, SeekCursor.DEFAULT_MAX_READ_AHEAD ); } diff --git a/community/index/src/test/java/org/neo4j/index/internal/gbptree/GBPTreeTest.java b/community/index/src/test/java/org/neo4j/index/internal/gbptree/GBPTreeTest.java index 7c012ecbcfabe..67b9bfca0dc24 100644 --- a/community/index/src/test/java/org/neo4j/index/internal/gbptree/GBPTreeTest.java +++ b/community/index/src/test/java/org/neo4j/index/internal/gbptree/GBPTreeTest.java @@ -40,6 +40,7 @@ import java.util.List; import java.util.Queue; import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -56,6 +57,7 @@ import org.neo4j.cursor.RawCursor; import org.neo4j.function.ThrowingConsumer; import org.neo4j.graphdb.mockfs.EphemeralFileSystemAbstraction; +import org.neo4j.helpers.Exceptions; import org.neo4j.index.internal.gbptree.GBPTree.Monitor; import org.neo4j.io.pagecache.DelegatingPageCache; import org.neo4j.io.pagecache.DelegatingPagedFile; @@ -74,7 +76,9 @@ import org.neo4j.test.rule.RandomRule; import org.neo4j.test.rule.TestDirectory; import org.neo4j.test.rule.fs.DefaultFileSystemRule; +import org.neo4j.util.FeatureToggles; +import static java.lang.Long.MAX_VALUE; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.core.AllOf.allOf; import static org.junit.Assert.assertArrayEquals; @@ -1308,7 
+1312,7 @@ public void mustNotSeeUpdatesThatWasNotCheckpointed() throws Exception try ( GBPTree index = index().build() ) { MutableLong from = new MutableLong( Long.MIN_VALUE ); - MutableLong to = new MutableLong( Long.MAX_VALUE ); + MutableLong to = new MutableLong( MAX_VALUE ); try ( RawCursor,IOException> seek = index.seek( from, to ) ) { assertFalse( seek.next() ); @@ -1334,7 +1338,7 @@ public void mustSeeUpdatesThatWasCheckpointed() throws Exception try ( GBPTree index = index().build() ) { MutableLong from = new MutableLong( Long.MIN_VALUE ); - MutableLong to = new MutableLong( Long.MAX_VALUE ); + MutableLong to = new MutableLong( MAX_VALUE ); try ( RawCursor,IOException> seek = index.seek( from, to ) ) { assertTrue( seek.next() ); @@ -1594,7 +1598,7 @@ public void shouldThrowIllegalStateExceptionOnCallingNextAfterClose() throws Exc } RawCursor,IOException> seek = - tree.seek( new MutableLong( 0 ), new MutableLong( Long.MAX_VALUE ) ); + tree.seek( new MutableLong( 0 ), new MutableLong( MAX_VALUE ) ); assertTrue( seek.next() ); assertTrue( seek.next() ); seek.close(); @@ -1626,55 +1630,18 @@ public void mustThrowIfStuckInInfiniteRootCatchup() throws IOException // When seekCursor comes back to the same corrupt child again and again it should eventually escape from that loop // with an exception. - // A page cache tracer that we can use to see when tree has seen enough updates and to figure out on which page the child sits. 
List trace = new ArrayList<>(); - PageCursorTracer pageCursorTracer = new DefaultPageCursorTracer() - { - @Override - public PinEvent beginPin( boolean writeLock, long filePageId, PageSwapper swapper ) - { - trace.add( filePageId ); - return super.beginPin( writeLock, filePageId, swapper ); - } - }; - PageCursorTracerSupplier pageCursorTracerSupplier = () -> pageCursorTracer; - PageCache pageCache = createPageCache( DEFAULT_PAGE_SIZE, pageCursorTracerSupplier ); + PageCache pageCache = pageCacheWithTrace( trace ); // Build a tree with root and two children. try ( GBPTree tree = index( pageCache ).build() ) { // Insert data until we have a split in root - long count = 0; - for ( int i = 0; i < 100; i++ ) - { - try ( Writer writer = tree.writer() ) - { - writer.put( new MutableLong( count ), new MutableLong( count ) ); - count++; - } - trace.clear(); - try ( RawCursor,IOException> seek = tree.seek( new MutableLong( 0 ), new MutableLong( 0 ) ) ) - { - seek.next(); - } - if ( trace.size() > 1 ) - { - // Now a child exist - break; - } - } + treeWithRootSplit( trace, tree ); + long corruptChild = trace.get( 1 ); // Corrupt the child - long corruptChild = trace.get( trace.size() - 1 ); - try ( PagedFile pagedFile = pageCache.map( indexFile, DEFAULT_PAGE_SIZE ); - PageCursor cursor = pagedFile.io( 0, PF_SHARED_WRITE_LOCK ) ) - { - assertTrue( cursor.next( corruptChild ) ); - assertTrue( TreeNode.isLeaf( cursor ) ); - - // Make child look like freelist node - cursor.putByte( TreeNode.BYTE_POS_NODE_TYPE, TreeNode.NODE_TYPE_FREE_LIST_NODE ); - } + corruptTheChild( pageCache, corruptChild ); // when seek end up in this corrupt child we should eventually fail with a tree inconsistency exception try ( RawCursor,IOException> seek = tree.seek( new MutableLong( 0 ), new MutableLong( 0 ) ) ) @@ -1692,6 +1659,143 @@ public PinEvent beginPin( boolean writeLock, long filePageId, PageSwapper swappe } } + @Test( timeout = 5_000L ) + public void 
mustThrowIfStuckInInfiniteRootCatchupMultipleConcurrentSeekers() throws IOException, InterruptedException + { + FeatureToggles.set( TripCountingRootCatchup.class, TripCountingRootCatchup.MAX_TRIP_COUNT_NAME, 10000 ); + try + { + List trace = new ArrayList<>(); + PageCache pageCache = pageCacheWithTrace( trace ); + + // Build a tree with root and two children. + try ( GBPTree tree = index( pageCache ).build() ) + { + // Insert data until we have a split in root + treeWithRootSplit( trace, tree ); + long leftChild = trace.get( 1 ); + long rightChild = trace.get( 2 ); + + // Corrupt both children + corruptTheChild( pageCache, leftChild ); + corruptTheChild( pageCache, rightChild ); + + // When a seek ends up in a corrupt child it should eventually fail with a tree inconsistency exception, + // even if multiple seekers traverse different parts of the tree and all get stuck in the start-from-root loop. NOTE(review): the executor created below is never shut down in this test. + ExecutorService executor = Executors.newFixedThreadPool( 2 ); + CountDownLatch go = new CountDownLatch( 2 ); + Future execute1 = executor.submit( () -> + { + go.countDown(); + go.await(); + try ( RawCursor,IOException> seek = tree.seek( new MutableLong( 0 ), new MutableLong( 0 ) ) ) + { + seek.next(); + } + return null; + } ); + + Future execute2 = executor.submit( () -> + { + go.countDown(); + go.await(); + try ( RawCursor,IOException> seek = tree.seek( new MutableLong( MAX_VALUE ), new MutableLong( MAX_VALUE ) ) ) + { + seek.next(); + } + return null; + } ); + + assertFutureFailsWithTreeInconsistencyException( execute1 ); + assertFutureFailsWithTreeInconsistencyException( execute2 ); + } + } + finally + { + FeatureToggles.clear( TripCountingRootCatchup.class, TripCountingRootCatchup.MAX_TRIP_COUNT_NAME ); + } + } + + private void assertFutureFailsWithTreeInconsistencyException( Future execute1 ) throws InterruptedException + { + try + { + execute1.get(); + fail( "Expected to fail" ); + } + catch ( ExecutionException e ) + { + Throwable cause = e.getCause(); + if ( 
!(cause instanceof TreeInconsistencyException) ) + { + fail( "Expected cause to be " + TreeInconsistencyException.class + " but was " + Exceptions.stringify( cause ) ); + } + } + } + + private void corruptTheChild( PageCache pageCache, long corruptChild ) throws IOException + { + try ( PagedFile pagedFile = pageCache.map( indexFile, DEFAULT_PAGE_SIZE ); + PageCursor cursor = pagedFile.io( 0, PF_SHARED_WRITE_LOCK ) ) + { + assertTrue( cursor.next( corruptChild ) ); + assertTrue( TreeNode.isLeaf( cursor ) ); + + // Make the child look like a freelist node + cursor.putByte( TreeNode.BYTE_POS_NODE_TYPE, TreeNode.NODE_TYPE_FREE_LIST_NODE ); + } + } + + /** + * Inserts entries until a seek pins more than one page (i.e. the root has split). When the split is done, trace contains: + * trace.get( 0 ) - root + * trace.get( 1 ) - leftChild + * trace.get( 2 ) - rightChild + */ + private void treeWithRootSplit( List trace, GBPTree tree ) throws IOException + { + long count = 0; + do + { + try ( Writer writer = tree.writer() ) + { + writer.put( new MutableLong( count ), new MutableLong( count ) ); + count++; + } + trace.clear(); + try ( RawCursor,IOException> seek = tree.seek( new MutableLong( 0 ), new MutableLong( 0 ) ) ) + { + seek.next(); + } + } + while ( trace.size() <= 1 ); + + trace.clear(); + try ( RawCursor,IOException> seek = tree.seek( new MutableLong( 0 ), new MutableLong( MAX_VALUE ) ) ) + { + //noinspection StatementWithEmptyBody + while ( seek.next() ) + { + } + } + } + + private PageCache pageCacheWithTrace( List trace ) + { + // A page cache tracer that we can use to see when the tree has seen enough updates and to figure out on which page the child sits. + PageCursorTracer pageCursorTracer = new DefaultPageCursorTracer() + { + @Override + public PinEvent beginPin( boolean writeLock, long filePageId, PageSwapper swapper ) + { + trace.add( filePageId ); + return super.beginPin( writeLock, filePageId, swapper ); + } + }; + PageCursorTracerSupplier pageCursorTracerSupplier = () -> pageCursorTracer; + return createPageCache( DEFAULT_PAGE_SIZE, 
pageCursorTracerSupplier ); + } + private static class ControlledRecoveryCleanupWorkCollector extends RecoveryCleanupWorkCollector { Queue jobs = new LinkedList<>();