From d3097917134727b3f39160d77bdddf7c5c25c032 Mon Sep 17 00:00:00 2001 From: Chris Vest Date: Thu, 14 Jan 2016 12:01:34 +0100 Subject: [PATCH] Add a randomised test that opens multiple page cursors --- .../neo4j/io/pagecache/PageCacheSlowTest.java | 224 +++++++++++++----- 1 file changed, 169 insertions(+), 55 deletions(-) diff --git a/community/io/src/test/java/org/neo4j/io/pagecache/PageCacheSlowTest.java b/community/io/src/test/java/org/neo4j/io/pagecache/PageCacheSlowTest.java index e04afe7b7b529..c764a50767c1f 100644 --- a/community/io/src/test/java/org/neo4j/io/pagecache/PageCacheSlowTest.java +++ b/community/io/src/test/java/org/neo4j/io/pagecache/PageCacheSlowTest.java @@ -55,7 +55,57 @@ public abstract class PageCacheSlowTest extends PageCacheTestSupport { - @RepeatRule.Repeat( times = 1000 ) + private static class UpdateResult + { + final int threadId; + final int[] pageCounts; + + UpdateResult( int threadId, int[] pageCounts ) + { + this.threadId = threadId; + this.pageCounts = pageCounts; + } + } + + private static abstract class UpdateWorker implements Callable + { + final int threadId; + final int filePages; + final AtomicBoolean shouldStop; + final PagedFile pagedFile; + final int[] pageCounts; + final int offset; + + UpdateWorker( int threadId, int filePages, AtomicBoolean shouldStop, PagedFile pagedFile ) + { + this.threadId = threadId; + this.filePages = filePages; + this.shouldStop = shouldStop; + this.pagedFile = pagedFile; + pageCounts = new int[filePages]; + offset = threadId * 4; + } + + @Override + public UpdateResult call() throws Exception + { + ThreadLocalRandom rng = ThreadLocalRandom.current(); + + while ( !shouldStop.get() ) + { + boolean updateCounter = rng.nextBoolean(); + int pf_flags = updateCounter ? PF_SHARED_WRITE_LOCK : PF_SHARED_READ_LOCK; + performReadOrUpdate( rng, updateCounter, pf_flags ); + } + + return new UpdateResult( threadId, pageCounts ); + } + + protected abstract void performReadOrUpdate( ThreadLocalRandom rng, boolean updateCounter, + int pf_flags ) throws IOException; + } + + @RepeatRule.Repeat( times = 250 ) @Test( timeout = SEMI_LONG_TIMEOUT_MILLIS ) public void mustNotLoseUpdates() throws Exception { @@ -89,53 +139,17 @@ public void mustNotLoseUpdates() throws Exception getPageCache( fs, cachePages, pageSize, PageCacheTracer.NULL ); final PagedFile pagedFile = pageCache.map( file( "a" ), pageSize ); - // Ensure all the pages exist - try ( PageCursor cursor = pagedFile.io( 0, PF_SHARED_WRITE_LOCK ) ) - { - for ( int i = 0; i < filePages; i++ ) - { - assertTrue( "failed to initialise file page " + i, cursor.next() ); - for ( int j = 0; j < pageSize; j++ ) - { - cursor.putByte( (byte) 0 ); - } - } - } - pageCache.flushAndForce(); - - class Result - { - final int threadId; - final int[] pageCounts; - - Result( int threadId, int[] pageCounts ) - { - this.threadId = threadId; - this.pageCounts = pageCounts; - } - } + ensureAllPagesExists( filePages, pagedFile ); - class Worker implements Callable + List> futures = new ArrayList<>(); + for ( int i = 0; i < threadCount; i++ ) { - final int threadId; - - Worker( int threadId ) + UpdateWorker worker = new UpdateWorker( i, filePages, shouldStop, pagedFile ) { - this.threadId = threadId; - } - - @Override - public Result call() throws Exception - { - int[] pageCounts = new int[filePages]; - ThreadLocalRandom rng = ThreadLocalRandom.current(); - - while ( !shouldStop.get() ) + protected void performReadOrUpdate( + ThreadLocalRandom rng, boolean updateCounter, int pf_flags ) throws IOException { int pageId = rng.nextInt( 0, filePages ); - int offset = threadId * 4; - boolean updateCounter = rng.nextBoolean(); - int pf_flags = updateCounter ? PF_SHARED_WRITE_LOCK : PF_SHARED_READ_LOCK; try ( PageCursor cursor = pagedFile.io( pageId, pf_flags ) ) { int counter; @@ -150,7 +164,9 @@ public Result call() throws Exception while ( cursor.shouldRetry() ); String lockName = updateCounter ? "PF_SHARED_WRITE_LOCK" : "PF_SHARED_READ_LOCK"; assertThat( "inconsistent page read from filePageId = " + pageId + ", with " + lockName + - ", workerId = " + threadId + " [t:" + Thread.currentThread().getId() + "]", + ", workerId = " + + threadId + " [t:" + Thread + .currentThread().getId() + "]", counter, is( pageCounts[pageId] ) ); } catch ( Throwable throwable ) @@ -167,23 +183,36 @@ public Result call() throws Exception } } } - - return new Result( threadId, pageCounts ); - } + }; + futures.add( executor.submit( worker ) ); } - List> futures = new ArrayList<>(); - for ( int i = 0; i < threadCount; i++ ) + Thread.sleep( 40 ); + shouldStop.set( true ); + + verifyUpdateResults( filePages, pagedFile, futures ); + pagedFile.close(); + } + + private void ensureAllPagesExists( int filePages, PagedFile pagedFile ) throws IOException + { + try ( PageCursor cursor = pagedFile.io( 0, PF_SHARED_WRITE_LOCK ) ) { - futures.add( executor.submit( new Worker( i ) ) ); + for ( int i = 0; i < filePages; i++ ) + { + assertTrue( "failed to initialise file page " + i, cursor.next() ); + } } + pageCache.flushAndForce(); + } - Thread.sleep( 10 ); - shouldStop.set( true ); - - for ( Future future : futures ) + private void verifyUpdateResults( int filePages, PagedFile pagedFile, + List> futures ) + throws InterruptedException, ExecutionException, IOException + { + for ( Future future : futures ) { - Result result = future.get(); + UpdateResult result = future.get(); try ( PageCursor cursor = pagedFile.io( 0, PF_SHARED_READ_LOCK ) ) { for ( int i = 0; i < filePages; i++ ) @@ -205,6 +234,91 @@ public Result call() throws Exception } } } + } + + @RepeatRule.Repeat( times = 250 ) + @Test( timeout = SEMI_LONG_TIMEOUT_MILLIS ) + public void mustNotLoseUpdatesWhenOpeningMultiplePageCursorsPerThread() throws Exception + { + // Similar to the test above, except the threads will have multiple page cursors opened at a time. + + final AtomicBoolean shouldStop = new AtomicBoolean(); + final int cachePages = 20; + final int filePages = cachePages * 2; + final int threadCount = 8; + final int pageSize = threadCount * 4; + + getPageCache( fs, cachePages, pageSize, PageCacheTracer.NULL ); + final PagedFile pagedFile = pageCache.map( file( "a" ), pageSize ); + + ensureAllPagesExists( filePages, pagedFile ); + + List> futures = new ArrayList<>(); + for ( int i = 0; i < threadCount; i++ ) + { + UpdateWorker worker = new UpdateWorker( i, filePages, shouldStop, pagedFile ) + { + protected void performReadOrUpdate( + ThreadLocalRandom rng, boolean updateCounter, int pf_flags ) throws IOException + { + try + { + int pageCount = rng.nextInt( 1, filePages / 10 ); + int[] pageIds = new int[pageCount]; + for ( int j = 0; j < pageCount; j++ ) + { + pageIds[j] = rng.nextInt( 0, filePages ); + } + PageCursor[] cursors = new PageCursor[pageCount]; + for ( int j = 0; j < pageCount; j++ ) + { + cursors[j] = pagedFile.io( pageIds[j], pf_flags ); + assertTrue( cursors[j].next() ); + } + for ( int j = 0; j < pageCount; j++ ) + { + int pageId = pageIds[j]; + PageCursor cursor = cursors[j]; + int counter; + do + { + cursor.setOffset( offset ); + counter = cursor.getInt(); + } + while ( cursor.shouldRetry() ); + String lockName = updateCounter ? "PF_SHARED_WRITE_LOCK" : "PF_SHARED_READ_LOCK"; + assertThat( "inconsistent page read from filePageId = " + pageId + ", with " + lockName + + ", workerId = " + + threadId + " [t:" + Thread + .currentThread().getId() + "]", + counter, is( pageCounts[pageId] ) ); + if ( updateCounter ) + { + counter++; + pageCounts[pageId]++; + cursor.setOffset( offset ); + cursor.putInt( counter ); + } + } + for ( PageCursor cursor : cursors ) + { + cursor.close(); + } + } + catch ( Throwable throwable ) + { + shouldStop.set( true ); + throw throwable; + } + } + }; + futures.add( executor.submit( worker ) ); + } + + Thread.sleep( 40 ); + shouldStop.set( true ); + + verifyUpdateResults( filePages, pagedFile, futures ); pagedFile.close(); }