From 257cca2d97ad44cc0ab25cfe1dd416ec6b867bf2 Mon Sep 17 00:00:00 2001 From: Chris Vest Date: Fri, 20 Nov 2015 23:04:15 +0100 Subject: [PATCH] Speed up running the tests of the IO module from Maven The `mvn test` command, when run in the IO module, now takes 15 seconds instead of 5 minutes. This has been achieved by moving many of the tests to the verification phase instead of the test phase, by running them with the failsafe plugin instead of surefire. Also, the tests are now executed in parallel. --- community/io/pom.xml | 7 + .../io/fs/DefaultFileSystemAbstraction.java | 6 +- ...FileSystemAbstractionInterruptionTest.java | 8 +- .../neo4j/io/pagecache/PageCacheSlowTest.java | 449 ++++++++++++ .../org/neo4j/io/pagecache/PageCacheTest.java | 683 +----------------- .../io/pagecache/PageCacheTestSupport.java | 336 +++++++++ ...apperWithAdversarialFileDispatcherIT.java} | 2 +- ...eFilePageSwapperWithRealFileSystemIT.java} | 2 +- .../impl/muninn/MuninnPageCacheFixture.java | 50 ++ .../impl/muninn/MuninnPageCacheSlowIT.java | 31 + ...owTestWithAdversarialFileDispatcherIT.java | 45 ++ ...ageCacheSlowTestWithRealFileSystemIT.java} | 17 +- .../impl/muninn/MuninnPageCacheTest.java | 26 +- ...CacheWithAdversarialFileDispatcherIT.java} | 2 +- .../MuninnPageCacheWithRealFileSystemIT.java | 39 + 15 files changed, 982 insertions(+), 721 deletions(-) create mode 100644 community/io/src/test/java/org/neo4j/io/pagecache/PageCacheSlowTest.java create mode 100644 community/io/src/test/java/org/neo4j/io/pagecache/PageCacheTestSupport.java rename community/io/src/test/java/org/neo4j/io/pagecache/impl/{SingleFilePageSwapperWithAdversarialFileDispatcherTest.java => SingleFilePageSwapperWithAdversarialFileDispatcherIT.java} (92%) rename community/io/src/test/java/org/neo4j/io/pagecache/impl/{SingleFilePageSwapperWithRealFileSystemTest.java => SingleFilePageSwapperWithRealFileSystemIT.java} (93%) create mode 100644 community/io/src/test/java/org/neo4j/io/pagecache/impl/muninn/MuninnPageCacheFixture.java create mode 100644 community/io/src/test/java/org/neo4j/io/pagecache/impl/muninn/MuninnPageCacheSlowIT.java create mode 100644 community/io/src/test/java/org/neo4j/io/pagecache/impl/muninn/MuninnPageCacheSlowTestWithAdversarialFileDispatcherIT.java rename community/io/src/test/java/org/neo4j/io/pagecache/impl/muninn/{MuninnPageCacheWithRealFileSystemTest.java => MuninnPageCacheSlowTestWithRealFileSystemIT.java} (74%) rename community/io/src/test/java/org/neo4j/io/pagecache/impl/muninn/{MuninnPageCacheWithAdversarialFileDispatcherTest.java => MuninnPageCacheWithAdversarialFileDispatcherIT.java} (95%) create mode 100644 community/io/src/test/java/org/neo4j/io/pagecache/impl/muninn/MuninnPageCacheWithRealFileSystemIT.java diff --git a/community/io/pom.xml b/community/io/pom.xml index 853e01c83878..f8e010d23678 100644 --- a/community/io/pom.xml +++ b/community/io/pom.xml @@ -120,6 +120,13 @@ the relevant Commercial Agreement. 
+ <plugin> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <parallel>methods</parallel> + <threadCount>2</threadCount> + </configuration> + </plugin> diff --git a/community/io/src/main/java/org/neo4j/io/fs/DefaultFileSystemAbstraction.java b/community/io/src/main/java/org/neo4j/io/fs/DefaultFileSystemAbstraction.java index a9698b206bbe..a1928a5a9733 100644 --- a/community/io/src/main/java/org/neo4j/io/fs/DefaultFileSystemAbstraction.java +++ b/community/io/src/main/java/org/neo4j/io/fs/DefaultFileSystemAbstraction.java @@ -94,14 +94,14 @@ public boolean mkdir( File fileName ) @Override public void mkdirs( File path ) throws IOException { - if (path.exists()) + if ( path.exists() ) { return; } - boolean directoriesWereCreated = path.mkdirs(); + path.mkdirs(); - if (directoriesWereCreated) + if ( path.exists() ) { return; } diff --git a/community/io/src/test/java/org/neo4j/io/fs/FileSystemAbstractionInterruptionTest.java b/community/io/src/test/java/org/neo4j/io/fs/FileSystemAbstractionInterruptionTest.java index df14cd103724..d12a4d2a4858 100644 --- a/community/io/src/test/java/org/neo4j/io/fs/FileSystemAbstractionInterruptionTest.java +++ b/community/io/src/test/java/org/neo4j/io/fs/FileSystemAbstractionInterruptionTest.java @@ -84,20 +84,16 @@ public FileSystemAbstractionInterruptionTest( @SuppressWarnings( "UnusedParamete fs = factory.newInstance(); } - @Before - public void interruptPriorToCall() - { - Thread.currentThread().interrupt(); - } - @Before public void createWorkingDirectoryAndTestFile() throws IOException { + Thread.interrupted(); fs.mkdirs( testdir.directory() ); file = testdir.file( "a" ); fs.create( file ).close(); channel = null; channelShouldBeClosed = false; + Thread.currentThread().interrupt(); } private StoreChannel channel; diff --git a/community/io/src/test/java/org/neo4j/io/pagecache/PageCacheSlowTest.java b/community/io/src/test/java/org/neo4j/io/pagecache/PageCacheSlowTest.java new file mode 100644 index 000000000000..cefcfa262b82 --- /dev/null +++ b/community/io/src/test/java/org/neo4j/io/pagecache/PageCacheSlowTest.java @@ -0,0 +1,449 @@ +/* + * Copyright (c) 2002-2015 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */ +package org.neo4j.io.pagecache; + +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.neo4j.adversaries.RandomAdversary; +import org.neo4j.adversaries.fs.AdversarialFileSystemAbstraction; +import org.neo4j.io.fs.FileSystemAbstraction; +import org.neo4j.io.pagecache.tracing.PageCacheTracer; +import org.neo4j.test.LinearHistoryPageCacheTracer; +import org.neo4j.test.RepeatRule; + +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.startsWith; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.neo4j.io.pagecache.PagedFile.PF_EXCLUSIVE_LOCK; +import static org.neo4j.io.pagecache.PagedFile.PF_SHARED_LOCK; +import static org.neo4j.test.ByteArrayMatcher.byteArray; + +public abstract class PageCacheSlowTest<T extends PageCache> extends PageCacheTestSupport<T> +{ + @RepeatRule.Repeat( times = 1000 ) + @Test( timeout = SEMI_LONG_TIMEOUT_MILLIS ) + public void mustNotLoseUpdates() throws Exception + { + // Another test that tries to squeeze out data race bugs. The idea is + // the following: + // We have a number of threads that are going to perform one of two + // operations on randomly chosen pages. The first operation is this: + // They are going to pin a random page, and then scan through it to + // find a record that is their own. A record has a thread-id and a + // counter, both 32-bit integers. If the record is not found, it will + // be added after all the other existing records on that page, if any. + // The last 32-bit word on a page is a sum of all the counters, and it + // will be updated. Then it will verify that the sum matches the + // counters. + // The second operation is read-only, where only the verification is + // performed. + // The kicker is this: the threads will also keep track of which of + // their counters on what pages are at what value, by maintaining + // mirror counters in memory. The threads will continuously check if + // these stay in sync with the data on the page cache. If they go out + // of sync, then we have a data race bug where we either pin the wrong + // pages or somehow lose updates to the pages. + // This is somewhat similar to what the PageCacheStressTest does.
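The @RepeatRule.Repeat annotations used throughout this patch re-run a test body many times to flush out rare thread interleavings. For readers unfamiliar with the pattern, here is a minimal sketch of such a JUnit 4 rule; it is modeled only on the usage visible in this patch (a @Rule field plus a @Repeat( times = ... ) annotation) and is an illustration, not Neo4j's actual RepeatRule implementation:

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import org.junit.rules.TestRule;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;

public class RepeatRule implements TestRule
{
    @Retention( RetentionPolicy.RUNTIME )
    @Target( ElementType.METHOD )
    public @interface Repeat
    {
        int times();
    }

    @Override
    public Statement apply( final Statement base, Description description )
    {
        Repeat repeat = description.getAnnotation( Repeat.class );
        // Tests without the annotation run once, as usual.
        final int times = repeat == null ? 1 : repeat.times();
        return new Statement()
        {
            @Override
            public void evaluate() throws Throwable
            {
                for ( int i = 0; i < times; i++ )
                {
                    base.evaluate(); // re-run the whole test body
                }
            }
        };
    }
}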
+ + final AtomicBoolean shouldStop = new AtomicBoolean(); + final int cachePages = 20; + final int filePages = cachePages * 2; + final int threadCount = 8; + final int pageSize = threadCount * 4; + + getPageCache( fs, cachePages, pageSize, PageCacheTracer.NULL ); + final PagedFile pagedFile = pageCache.map( file( "a" ), pageSize ); + + // Ensure all the pages exist + try ( PageCursor cursor = pagedFile.io( 0, PF_EXCLUSIVE_LOCK ) ) + { + for ( int i = 0; i < filePages; i++ ) + { + assertTrue( "failed to initialise file page " + i, cursor.next() ); + for ( int j = 0; j < pageSize; j++ ) + { + cursor.putByte( (byte) 0 ); + } + } + } + pageCache.flushAndForce(); + + class Result + { + final int threadId; + final int[] pageCounts; + + Result( int threadId, int[] pageCounts ) + { + this.threadId = threadId; + this.pageCounts = pageCounts; + } + } + + class Worker implements Callable<Result> + { + final int threadId; + + Worker( int threadId ) + { + this.threadId = threadId; + } + + @Override + public Result call() throws Exception + { + int[] pageCounts = new int[filePages]; + ThreadLocalRandom rng = ThreadLocalRandom.current(); + + while ( !shouldStop.get() ) + { + int pageId = rng.nextInt( 0, filePages ); + int offset = threadId * 4; + boolean updateCounter = rng.nextBoolean(); + int pf_flags = updateCounter? PF_EXCLUSIVE_LOCK : PF_SHARED_LOCK; + try ( PageCursor cursor = pagedFile.io( pageId, pf_flags ) ) + { + int counter; + try + { + assertTrue( cursor.next() ); + do + { + cursor.setOffset( offset ); + counter = cursor.getInt(); + } + while ( cursor.shouldRetry() ); + String lockName = updateCounter ? "PF_EXCLUSIVE_LOCK" : "PF_SHARED_LOCK"; + assertThat( "inconsistent page read from filePageId = " + pageId + ", with " + lockName + + ", workerId = " + threadId + " [t:" + Thread.currentThread().getId() + "]", + counter, is( pageCounts[pageId] ) ); + } + catch ( Throwable throwable ) + { + shouldStop.set( true ); + throw throwable; + } + if ( updateCounter ) + { + counter++; + pageCounts[pageId]++; + cursor.setOffset( offset ); + cursor.putInt( counter ); + } + } + } + + return new Result( threadId, pageCounts ); + } + } + + List<Future<Result>> futures = new ArrayList<>(); + for ( int i = 0; i < threadCount; i++ ) + { + futures.add( executor.submit( new Worker( i ) ) ); + } + + Thread.sleep( 10 ); + shouldStop.set( true ); + + for ( Future<Result> future : futures ) + { + Result result = future.get(); + try ( PageCursor cursor = pagedFile.io( 0, PF_SHARED_LOCK ) ) + { + for ( int i = 0; i < filePages; i++ ) + { + assertTrue( cursor.next() ); + + int threadId = result.threadId; + int expectedCount = result.pageCounts[i]; + int actualCount; + do + { + cursor.setOffset( threadId * 4 ); + actualCount = cursor.getInt(); + } + while ( cursor.shouldRetry() ); + + assertThat( "wrong count for threadId = " + threadId + ", pageId = " + i, + actualCount, is( expectedCount ) ); + } + } + } + pagedFile.close(); + } + + @RepeatRule.Repeat( times = 100 ) + @Test( timeout = SEMI_LONG_TIMEOUT_MILLIS ) + public void writeLockingCursorMustThrowWhenLockingPageRacesWithUnmapping() throws Exception + { + // Even if we block in pin, waiting to grab a lock on a page that is + // already locked, and the PagedFile is concurrently closed, then we + // want to have an exception thrown, such that we don't race and get a + // page that is writable after the PagedFile has been closed. + // This is important because closing a PagedFile implies flushing, thus + // ensuring that all changes make it to storage.
+ // Conversely, we don't have to go to the same lengths for read locked + // pages, because those are never changed. Not by us, anyway. + + File file = file( "a" ); + generateFileWithRecords( file, recordsPerFilePage * 2, recordSize ); + + getPageCache( fs, maxPages, pageCachePageSize, PageCacheTracer.NULL ); + + final PagedFile pf = pageCache.map( file, filePageSize ); + final CountDownLatch hasLockLatch = new CountDownLatch( 1 ); + final CountDownLatch unlockLatch = new CountDownLatch( 1 ); + final CountDownLatch secondThreadGotLockLatch = new CountDownLatch( 1 ); + + executor.submit( () -> { + try ( PageCursor cursor = pf.io( 0, PF_EXCLUSIVE_LOCK ) ) + { + cursor.next(); + hasLockLatch.countDown(); + unlockLatch.await(); + } + return null; + } ); + + hasLockLatch.await(); // An exclusive lock is now held on page 0. + + Future<?> takeLockFuture = executor.submit( () -> { + try ( PageCursor cursor = pf.io( 0, PF_EXCLUSIVE_LOCK ) ) + { + cursor.next(); + secondThreadGotLockLatch.await(); + } + return null; + } ); + + Future<?> closeFuture = executor.submit( () -> { + pf.close(); + return null; + } ); + + try + { + closeFuture.get( 100, TimeUnit.MILLISECONDS ); + fail( "Expected a TimeoutException here" ); + } + catch ( TimeoutException e ) + { + // As expected, the close cannot complete while an exclusive + // lock is held + } + + // Now, both the close action and a grab for an exclusive page lock are + // waiting for our first thread. + // When we release that lock, we should see that either close completes + // and our second thread, the one blocked on the write lock, gets an + // exception, or we should see that the second thread gets the lock, + // and then close has to wait for that thread as well. + + unlockLatch.countDown(); // The race is on. + + try + { + closeFuture.get( 1000, TimeUnit.MILLISECONDS ); + // The closeFuture got it first, so the takeLockFuture should throw. + try + { + secondThreadGotLockLatch.countDown(); // only to prevent incorrect programs from deadlocking + takeLockFuture.get(); + fail( "Expected takeLockFuture.get() to throw an ExecutionException" ); + } + catch ( ExecutionException e ) + { + Throwable cause = e.getCause(); + assertThat( cause, instanceOf( IllegalStateException.class ) ); + assertThat( cause.getMessage(), startsWith( "File has been unmapped" ) ); + } + } + catch ( TimeoutException e ) + { + // The takeLockFuture got it first, so the closeFuture should + // complete when we release the latch. + secondThreadGotLockLatch.countDown(); + closeFuture.get( 2000, TimeUnit.MILLISECONDS ); + } + } + + @RepeatRule.Repeat( times = 3000 ) + @Test( timeout = LONG_TIMEOUT_MILLIS ) + public void pageCacheMustRemainInternallyConsistentWhenGettingRandomFailures() throws Exception + { + // NOTE: This test is inherently non-deterministic. This means that every failure must be + // thoroughly investigated, since they have a good chance of being a real issue. + // This is effectively a targeted robustness test. + + RandomAdversary adversary = new RandomAdversary( 0.5, 0.2, 0.2 ); + adversary.setProbabilityFactor( 0.0 ); + FileSystemAbstraction fs = new AdversarialFileSystemAbstraction( adversary, this.fs ); + ThreadLocalRandom rng = ThreadLocalRandom.current(); + + // Because our test failures are non-deterministic, we use this tracer to capture a full history of the + // events leading up to any given failure.
+ LinearHistoryPageCacheTracer tracer = new LinearHistoryPageCacheTracer(); + getPageCache( fs, maxPages, pageCachePageSize, tracer ); + + PagedFile pfA = pageCache.map( existingFile( "a" ), filePageSize ); + PagedFile pfB = pageCache.map( existingFile( "b" ), filePageSize / 2 + 1 ); + adversary.setProbabilityFactor( 1.0 ); + + for ( int i = 0; i < 1000; i++ ) + { + PagedFile pagedFile = rng.nextBoolean()? pfA : pfB; + long maxPageId = pagedFile.getLastPageId(); + boolean performingRead = rng.nextBoolean() && maxPageId != -1; + long startingPage = maxPageId < 0? 0 : rng.nextLong( maxPageId + 1 ); + int pf_flags = performingRead ? PF_SHARED_LOCK : PF_EXCLUSIVE_LOCK; + int pageSize = pagedFile.pageSize(); + + try ( PageCursor cursor = pagedFile.io( startingPage, pf_flags ) ) + { + if ( performingRead ) + { + performConsistentAdversarialRead( cursor, maxPageId, startingPage, pageSize ); + } + else + { + performConsistentAdversarialWrite( cursor, rng, pageSize ); + } + } + catch ( AssertionError error ) + { + // Capture any exception that might have hit the eviction thread. + adversary.setProbabilityFactor( 0.0 ); + try ( PageCursor cursor = pagedFile.io( 0, PF_EXCLUSIVE_LOCK ) ) + { + for ( int j = 0; j < 100; j++ ) + { + cursor.next( rng.nextLong( maxPageId + 1 ) ); + } + } + catch ( Throwable throwable ) + { + error.addSuppressed( throwable ); + } + + throw error; + } + catch ( Throwable throwable ) + { + // Don't worry about it... it's fine! +// throwable.printStackTrace(); // only enable this when debugging test failures. + } + } + + // Unmapping will cause pages to be flushed. + // We don't want that to fail, since it will upset the test tear-down. + adversary.setProbabilityFactor( 0.0 ); + try + { + // Flushing all pages, if successful, should clear any internal + // exception. + pageCache.flushAndForce(); + + // Do some post-chaos verification of what has been written. 
+ verifyAdversarialPagedContent( pfA ); + verifyAdversarialPagedContent( pfB ); + + pfA.close(); + pfB.close(); + } + catch ( Throwable e ) + { + tracer.printHistory( System.err ); + throw e; + } + } + + private void performConsistentAdversarialRead( PageCursor cursor, long maxPageId, long startingPage, + int pageSize ) throws IOException + { + long pagesToLookAt = Math.min( maxPageId, startingPage + 3 ) - startingPage + 1; + for ( int j = 0; j < pagesToLookAt; j++ ) + { + assertTrue( cursor.next() ); + readAndVerifyAdversarialPage( cursor, pageSize ); + } + } + + private void readAndVerifyAdversarialPage( PageCursor cursor, int pageSize ) throws IOException + { + byte[] actualPage = new byte[pageSize]; + byte[] expectedPage = new byte[pageSize]; + do + { + cursor.getBytes( actualPage ); + } + while ( cursor.shouldRetry() ); + Arrays.fill( expectedPage, actualPage[0] ); + String msg = String.format( + "filePageId = %s, pageSize = %s", + cursor.getCurrentPageId(), pageSize ); + assertThat( msg, actualPage, byteArray( expectedPage ) ); + } + + private void performConsistentAdversarialWrite( PageCursor cursor, ThreadLocalRandom rng, int pageSize ) throws IOException + { + for ( int j = 0; j < 3; j++ ) + { + assertTrue( cursor.next() ); + // Avoid generating zeros, so we can tell them apart from the + // absence of a write: + byte b = (byte) rng.nextInt( 1, 127 ); + for ( int k = 0; k < pageSize; k++ ) + { + cursor.putByte( b ); + } + assertFalse( cursor.shouldRetry() ); + } + } + + private void verifyAdversarialPagedContent( PagedFile pagedFile ) throws IOException + { + try ( PageCursor cursor = pagedFile.io( 0, PF_SHARED_LOCK ) ) + { + while ( cursor.next() ) + { + readAndVerifyAdversarialPage( cursor, pagedFile.pageSize() ); + } + } + } +} diff --git a/community/io/src/test/java/org/neo4j/io/pagecache/PageCacheTest.java b/community/io/src/test/java/org/neo4j/io/pagecache/PageCacheTest.java index 1d70bb3099fb..c38873479a8c 100644 --- a/community/io/src/test/java/org/neo4j/io/pagecache/PageCacheTest.java +++ b/community/io/src/test/java/org/neo4j/io/pagecache/PageCacheTest.java @@ -20,13 +20,8 @@ package org.neo4j.io.pagecache; import org.apache.commons.lang3.SystemUtils; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; import org.junit.BeforeClass; -import org.junit.Rule; import org.junit.Test; -import org.junit.internal.AssumptionViolatedException; import java.io.ByteArrayOutputStream; import java.io.DataInputStream; @@ -41,26 +36,17 @@ import java.nio.file.OpenOption; import java.nio.file.StandardOpenOption; import java.util.ArrayList; -import java.util.Arrays; import java.util.LinkedList; import java.util.List; import java.util.Queue; -import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import org.neo4j.adversaries.RandomAdversary; -import org.neo4j.adversaries.fs.AdversarialFileSystemAbstraction; import org.neo4j.function.ThrowingAction; import org.neo4j.function.ThrowingConsumer; import 
org.neo4j.graphdb.mockfs.DelegatingFileSystemAbstraction; @@ -79,7 +65,6 @@ import org.neo4j.io.pagecache.tracing.DefaultPageCacheTracer; import org.neo4j.io.pagecache.tracing.PageCacheTracer; import org.neo4j.io.pagecache.tracing.PinEvent; -import org.neo4j.test.LinearHistoryPageCacheTracer; import org.neo4j.test.RepeatRule; import static java.lang.Long.toHexString; @@ -92,7 +77,6 @@ import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.nullValue; -import static org.hamcrest.Matchers.startsWith; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; @@ -105,257 +89,14 @@ import static org.neo4j.test.ThreadTestUtils.awaitThreadState; import static org.neo4j.test.ThreadTestUtils.fork; -public abstract class PageCacheTest +public abstract class PageCacheTest extends PageCacheTestSupport { - protected static final long SHORT_TIMEOUT_MILLIS = 10_000; - protected static final long SEMI_LONG_TIMEOUT_MILLIS = 60_000; - protected static final long LONG_TIMEOUT_MILLIS = 360_000; - - protected static ExecutorService executor; - @BeforeClass public static void enablePinUnpinMonitoring() { DefaultPageCacheTracer.enablePinUnpinTracing(); } - @BeforeClass - public static void startExecutor() - { - executor = Executors.newCachedThreadPool(); - } - - @AfterClass - public static void stopExecutor() - { - executor.shutdown(); - } - - @Rule - public RepeatRule repeatRule = new RepeatRule(); - - protected int recordSize = 9; - protected int maxPages = 20; - protected int pageCachePageSize = 32; - protected int recordsPerFilePage = pageCachePageSize / recordSize; - protected int recordCount = 25 * maxPages * recordsPerFilePage; - protected int filePageSize = recordsPerFilePage * recordSize; - protected ByteBuffer bufA = ByteBuffer.allocate( recordSize ); - - protected FileSystemAbstraction fs; - - private T pageCache; - - protected File file( String pathname ) - { - return new File( pathname ); - } - - protected abstract T createPageCache( - PageSwapperFactory swapperFactory, - int maxPages, - int pageSize, - PageCacheTracer tracer ); - - protected T createPageCache( - FileSystemAbstraction fs, - int maxPages, - int pageSize, - PageCacheTracer tracer ) - { - PageSwapperFactory swapperFactory = new SingleFilePageSwapperFactory(); - swapperFactory.setFileSystemAbstraction( fs ); - return createPageCache( swapperFactory, maxPages, pageSize, tracer ); - } - - protected FileSystemAbstraction createFileSystemAbstraction() - { - return new EphemeralFileSystemAbstraction(); - } - - protected abstract void tearDownPageCache( T pageCache ) throws IOException; - - protected final T getPageCache( - FileSystemAbstraction fs, - int maxPages, - int pageSize, - PageCacheTracer tracer ) throws IOException - { - if ( pageCache != null ) - { - tearDownPageCache( pageCache ); - } - pageCache = createPageCache( fs, maxPages, pageSize, tracer ); - return pageCache; - } - - @Before - public void setUp() throws IOException - { - Thread.interrupted(); // Clear stray interrupts - fs = createFileSystemAbstraction(); - ensureExists( file( "a" ) ); - } - - protected void ensureExists( File file ) throws IOException - { - fs.create( file ).close(); - } - - protected File existingFile( String name ) throws IOException - { - File file = file( name ); - ensureExists( file ); - return file; - } - - @After - public void tearDown() throws IOException - { - Thread.interrupted(); // Clear 
stray interrupts - - if ( pageCache != null ) - { - tearDownPageCache( pageCache ); - } - - if ( fs instanceof EphemeralFileSystemAbstraction ) - { - ((EphemeralFileSystemAbstraction) fs).shutdown(); - } - } - - /** - * Verifies the records on the current page of the cursor. - *
<p>
- * This does the do-while-retry loop internally. - */ - protected void verifyRecordsMatchExpected( PageCursor cursor ) throws IOException - { - ByteBuffer expectedPageContents = ByteBuffer.allocate( filePageSize ); - ByteBuffer actualPageContents = ByteBuffer.allocate( filePageSize ); - byte[] record = new byte[recordSize]; - long pageId = cursor.getCurrentPageId(); - for ( int i = 0; i < recordsPerFilePage; i++ ) - { - long recordId = (pageId * recordsPerFilePage) + i; - expectedPageContents.position( recordSize * i ); - generateRecordForId( recordId, expectedPageContents.slice() ); - do - { - cursor.setOffset( recordSize * i ); - cursor.getBytes( record ); - } while ( cursor.shouldRetry() ); - actualPageContents.position( recordSize * i ); - actualPageContents.put( record ); - } - assertRecord( pageId, actualPageContents, expectedPageContents ); - } - - protected void assertRecord( long pageId, ByteBuffer actualPageContents, ByteBuffer expectedPageContents ) - { - byte[] actualBytes = actualPageContents.array(); - byte[] expectedBytes = expectedPageContents.array(); - int estimatedPageId = estimateId( actualBytes ); - assertThat( - "Page id: " + pageId + " " + - "(based on record data, it should have been " + - estimatedPageId + ", a difference of " + - Math.abs( pageId - estimatedPageId ) + ")", - actualBytes, - byteArray( expectedBytes ) ); - } - - protected int estimateId( byte[] record ) - { - return ByteBuffer.wrap( record ).getInt() - 1; - } - - /** - * Fill the page bound by the cursor with records that can be verified with - * {@link #verifyRecordsMatchExpected(PageCursor)} or {@link #verifyRecordsInFile(java.io.File, int)}. - */ - protected void writeRecords( PageCursor cursor ) - { - cursor.setOffset( 0 ); - for ( int i = 0; i < recordsPerFilePage; i++ ) - { - long recordId = (cursor.getCurrentPageId() * recordsPerFilePage) + i; - generateRecordForId( recordId, bufA ); - cursor.putBytes( bufA.array() ); - } - } - - protected void generateFileWithRecords( - File file, - int recordCount, - int recordSize ) throws IOException - { - StoreChannel channel = fs.open( file, "rw" ); - ByteBuffer buf = ByteBuffer.allocate( recordSize ); - for ( int i = 0; i < recordCount; i++ ) - { - generateRecordForId( i, buf ); - channel.writeAll( buf ); - } - channel.close(); - } - - protected static void generateRecordForId( long id, ByteBuffer buf ) - { - buf.position( 0 ); - int x = (int) (id + 1); - buf.putInt( x ); - while ( buf.position() < buf.limit() ) - { - x++; - buf.put( (byte) (x & 0xFF) ); - } - buf.position( 0 ); - } - - protected void verifyRecordsInFile( File file, int recordCount ) throws IOException - { - StoreChannel channel = fs.open( file, "r" ); - ByteBuffer buf = ByteBuffer.allocate( recordSize ); - ByteBuffer observation = ByteBuffer.allocate( recordSize ); - for ( int i = 0; i < recordCount; i++ ) - { - generateRecordForId( i, buf ); - observation.position( 0 ); - channel.read( observation ); - assertRecord( i, observation, buf ); - } - channel.close(); - } - - protected Runnable $close( final PagedFile file ) - { - return () -> { - try - { - file.close(); - } - catch ( IOException e ) - { - throw new AssertionError( e ); - } - }; - } - - /** - * We implement 'assumeTrue' ourselves because JUnit insist on adding hamcrest matchers to the - * AssumptionViolatedException instances it throws. This is a problem because those matchers are not serializable, - * so they cannot be used together with the BootClassPathRunner, because it uses RMI under the hood. 
- */ - protected void assumeTrue( String description, boolean test ) - { - if ( !test ) - { - throw new AssumptionViolatedException( description ); - } - } - @Test public void mustReportConfiguredMaxPages() throws IOException { @@ -528,12 +269,12 @@ public void writesFlushedFromPageFileMustBeObservableEvenWhenRacingWithEviction( @Test( timeout = SHORT_TIMEOUT_MILLIS ) public void writesFlushedFromPageCacheMustBeExternallyObservable() throws IOException { - ByteBuffer buf = ByteBuffer.allocate( recordSize ); PageCache cache = getPageCache( fs, maxPages, pageCachePageSize, PageCacheTracer.NULL ); long startPageId = 0; long endPageId = recordCount / recordsPerFilePage; - try ( PagedFile pagedFile = cache.map( file( "a" ), filePageSize ); + File file = file( "a" ); + try ( PagedFile pagedFile = cache.map( file, filePageSize ); PageCursor cursor = pagedFile.io( startPageId, PF_EXCLUSIVE_LOCK ) ) { while ( cursor.getCurrentPageId() < endPageId && cursor.next() ) @@ -545,16 +286,7 @@ public void writesFlushedFromPageCacheMustBeExternallyObservable() throws IOExce } } // closing the PagedFile implies flushing because it was the last reference - StoreChannel channel = fs.open( file( "a" ), "r" ); - ByteBuffer observation = ByteBuffer.allocate( recordSize ); - for ( int i = 0; i < recordCount; i++ ) - { - generateRecordForId( i, buf ); - observation.position( 0 ); - channel.read( observation ); - assertRecord( i, observation, buf ); - } - channel.close(); + verifyRecordsInFile( file, recordCount ); } @Test( timeout = SHORT_TIMEOUT_MILLIS ) @@ -775,12 +507,12 @@ public void mustCloseFileChannelWhenTheLastHandleIsUnmapped() throws Exception @Test( timeout = SHORT_TIMEOUT_MILLIS ) public void dirtyPagesMustBeFlushedWhenTheCacheIsClosed() throws IOException { - ByteBuffer buf = ByteBuffer.allocate( recordSize ); getPageCache( fs, maxPages, pageCachePageSize, PageCacheTracer.NULL ); long startPageId = 0; long endPageId = recordCount / recordsPerFilePage; - try ( PagedFile pagedFile = pageCache.map( file( "a" ), filePageSize ); + File file = file( "a" ); + try ( PagedFile pagedFile = pageCache.map( file, filePageSize ); PageCursor cursor = pagedFile.io( startPageId, PF_EXCLUSIVE_LOCK ) ) { while ( cursor.getCurrentPageId() < endPageId && cursor.next() ) @@ -796,16 +528,7 @@ public void dirtyPagesMustBeFlushedWhenTheCacheIsClosed() throws IOException pageCache.close(); } - StoreChannel channel = fs.open( file( "a" ), "r" ); - ByteBuffer observation = ByteBuffer.allocate( recordSize ); - for ( int i = 0; i < recordCount; i++ ) - { - generateRecordForId( i, buf ); - observation.position( 0 ); - channel.read( observation ); - assertRecord( i, observation, buf ); - } - channel.close(); + verifyRecordsInFile( file, recordCount ); } @Test( timeout = SHORT_TIMEOUT_MILLIS ) @@ -1861,159 +1584,6 @@ public void readsAndWritesMustBeMutuallyConsistent() throws Exception } } - @RepeatRule.Repeat( times = 1000 ) - @Test( timeout = SEMI_LONG_TIMEOUT_MILLIS ) - public void mustNotLoseUpdates() throws Exception - { - // Another test that tries to squeeze out data race bugs. The idea is - // the following: - // We have a number of threads that are going to perform one of two - // operations on randomly chosen pages. The first operation is this: - // They are going to pin a random page, and then scan through it to - // find a record that is their own. A record has a thread-id and a - // counter, both 32-bit integers. If the record is not found, it will - // be added after all the other existing records on that page, if any. 
- // The last 32-bit word on a page is a sum of all the counters, and it - // will be updated. Then it will verify that the sum matches the - // counters. - // The second operation is read-only, where only the verification is - // performed. - // The kicker is this: the threads will also keep track of which of - // their counters on what pages are at what value, by maintaining - // mirror counters in memory. The threads will continuously check if - // these stay in sync with the data on the page cache. If they go out - // of sync, then we have a data race bug where we either pin the wrong - // pages or somehow lose updates to the pages. - // This is somewhat similar to what the PageCacheStressTest does. - - final AtomicBoolean shouldStop = new AtomicBoolean(); - final int cachePages = 20; - final int filePages = cachePages * 2; - final int threadCount = 8; - final int pageSize = threadCount * 4; - - getPageCache( fs, cachePages, pageSize, PageCacheTracer.NULL ); - final PagedFile pagedFile = pageCache.map( file( "a" ), pageSize ); - - // Ensure all the pages exist - try ( PageCursor cursor = pagedFile.io( 0, PF_EXCLUSIVE_LOCK ) ) - { - for ( int i = 0; i < filePages; i++ ) - { - assertTrue( "failed to initialise file page " + i, cursor.next() ); - for ( int j = 0; j < pageSize; j++ ) - { - cursor.putByte( (byte) 0 ); - } - } - } - pageCache.flushAndForce(); - - class Result - { - final int threadId; - final int[] pageCounts; - - Result( int threadId, int[] pageCounts ) - { - this.threadId = threadId; - this.pageCounts = pageCounts; - } - } - - class Worker implements Callable - { - final int threadId; - - Worker( int threadId ) - { - this.threadId = threadId; - } - - @Override - public Result call() throws Exception - { - int[] pageCounts = new int[filePages]; - ThreadLocalRandom rng = ThreadLocalRandom.current(); - - while ( !shouldStop.get() ) - { - int pageId = rng.nextInt( 0, filePages ); - int offset = threadId * 4; - boolean updateCounter = rng.nextBoolean(); - int pf_flags = updateCounter? PF_EXCLUSIVE_LOCK : PF_SHARED_LOCK; - try ( PageCursor cursor = pagedFile.io( pageId, pf_flags ) ) - { - int counter; - try - { - assertTrue( cursor.next() ); - do - { - cursor.setOffset( offset ); - counter = cursor.getInt(); - } - while ( cursor.shouldRetry() ); - String lockName = updateCounter ? 
"PF_EXCLUSIVE_LOCK" : "PF_SHARED_LOCK"; - assertThat( "inconsistent page read from filePageId = " + pageId + ", with " + lockName + - ", workerId = " + threadId + " [t:" + Thread.currentThread().getId() + "]", - counter, is( pageCounts[pageId] ) ); - } - catch ( Throwable throwable ) - { - shouldStop.set( true ); - throw throwable; - } - if ( updateCounter ) - { - counter++; - pageCounts[pageId]++; - cursor.setOffset( offset ); - cursor.putInt( counter ); - } - } - } - - return new Result( threadId, pageCounts ); - } - } - - List> futures = new ArrayList<>(); - for ( int i = 0; i < threadCount; i++ ) - { - futures.add( executor.submit( new Worker( i ) ) ); - } - - Thread.sleep( 10 ); - shouldStop.set( true ); - - for ( Future future : futures ) - { - Result result = future.get(); - try ( PageCursor cursor = pagedFile.io( 0, PF_SHARED_LOCK ) ) - { - for ( int i = 0; i < filePages; i++ ) - { - assertTrue( cursor.next() ); - - int threadId = result.threadId; - int expectedCount = result.pageCounts[i]; - int actualCount; - do - { - cursor.setOffset( threadId * 4 ); - actualCount = cursor.getInt(); - } - while ( cursor.shouldRetry() ); - - assertThat( "wrong count for threadId = " + threadId + ", pageId = " + i, - actualCount, is( expectedCount ) ); - } - } - } - pagedFile.close(); - } - @Test( timeout = SHORT_TIMEOUT_MILLIS ) public void writesOfDifferentUnitsMustHaveCorrectEndianess() throws Exception { @@ -2922,101 +2492,6 @@ public void readingAndRetryingOnPageWithOptimisticReadLockingAfterUnmappingMustN } } - @RepeatRule.Repeat( times = 100 ) - @Test( timeout = SEMI_LONG_TIMEOUT_MILLIS ) - public void writeLockingCursorMustThrowWhenLockingPageRacesWithUnmapping() throws Exception - { - // Even if we block in pin, waiting to grab a lock on a page that is - // already locked, and the PagedFile is concurrently closed, then we - // want to have an exception thrown, such that we race and get a - // page that is writable after the PagedFile has been closed. - // This is important because closing a PagedFile implies flushing, thus - // ensuring that all changes make it to storage. - // Conversely, we don't have to go to the same lengths for read locked - // pages, because those are never changed. Not by us, anyway. - - File file = file( "a" ); - generateFileWithRecords( file, recordsPerFilePage * 2, recordSize ); - - getPageCache( fs, maxPages, pageCachePageSize, PageCacheTracer.NULL ); - - final PagedFile pf = pageCache.map( file, filePageSize ); - final CountDownLatch hasLockLatch = new CountDownLatch( 1 ); - final CountDownLatch unlockLatch = new CountDownLatch( 1 ); - final CountDownLatch secondThreadGotLockLatch = new CountDownLatch( 1 ); - - executor.submit( () -> { - try ( PageCursor cursor = pf.io( 0, PF_EXCLUSIVE_LOCK ) ) - { - cursor.next(); - hasLockLatch.countDown(); - unlockLatch.await(); - } - return null; - } ); - - hasLockLatch.await(); // An exclusive lock is now held on page 0. 
- - Future takeLockFuture = executor.submit( () -> { - try ( PageCursor cursor = pf.io( 0, PF_EXCLUSIVE_LOCK ) ) - { - cursor.next(); - secondThreadGotLockLatch.await(); - } - return null; - } ); - - Future closeFuture = executor.submit( () -> { - pf.close(); - return null; - } ); - - try - { - closeFuture.get( 100, TimeUnit.MILLISECONDS ); - fail( "Expected a TimeoutException here" ); - } - catch ( TimeoutException e ) - { - // As expected, the close cannot not complete while an exclusive - // lock is held - } - - // Now, both the close action and a grab for an exclusive page lock is - // waiting for our first thread. - // When we release that lock, we should see that either close completes - // and our second thread, the one blocked on the write lock, gets an - // exception, or we should see that the second thread gets the lock, - // and then close has to wait for that thread as well. - - unlockLatch.countDown(); // The race is on. - - try - { - closeFuture.get( 1000, TimeUnit.MILLISECONDS ); - // The closeFuture got it first, so the takeLockFuture should throw. - try - { - secondThreadGotLockLatch.countDown(); // only to prevent incorrect programs from deadlocking - takeLockFuture.get(); - fail( "Expected takeLockFuture.get() to throw an ExecutionException" ); - } - catch ( ExecutionException e ) - { - Throwable cause = e.getCause(); - assertThat( cause, instanceOf( IllegalStateException.class ) ); - assertThat( cause.getMessage(), startsWith( "File has been unmapped" ) ); - } - } - catch ( TimeoutException e ) - { - // The takeLockFuture got it first, so the closeFuture should - // complete when we release the latch. - secondThreadGotLockLatch.countDown(); - closeFuture.get( 2000, TimeUnit.MILLISECONDS ); - } - } - private interface PageCursorAction { void apply( PageCursor cursor ); @@ -3746,150 +3221,6 @@ private void accessPagesWhileInterrupted( } } - @RepeatRule.Repeat( times = 3000 ) - @Test( timeout = LONG_TIMEOUT_MILLIS ) - public void pageCacheMustRemainInternallyConsistentWhenGettingRandomFailures() throws Exception - { - // NOTE: This test is inherently non-deterministic. This means that every failure must be - // thoroughly investigated, since they have a good chance of being a real issue. - // This is effectively a targeted robustness test. - - RandomAdversary adversary = new RandomAdversary( 0.5, 0.2, 0.2 ); - adversary.setProbabilityFactor( 0.0 ); - FileSystemAbstraction fs = new AdversarialFileSystemAbstraction( adversary, this.fs ); - ThreadLocalRandom rng = ThreadLocalRandom.current(); - - // Because our test failures are non-deterministic, we use this tracer to capture a full history of the - // events leading up to any given failure. - LinearHistoryPageCacheTracer tracer = new LinearHistoryPageCacheTracer(); - getPageCache( fs, maxPages, pageCachePageSize, tracer ); - - PagedFile pfA = pageCache.map( existingFile( "a" ), filePageSize ); - PagedFile pfB = pageCache.map( existingFile( "b" ), filePageSize / 2 + 1 ); - adversary.setProbabilityFactor( 1.0 ); - - for ( int i = 0; i < 1000; i++ ) - { - PagedFile pagedFile = rng.nextBoolean()? pfA : pfB; - long maxPageId = pagedFile.getLastPageId(); - boolean performingRead = rng.nextBoolean() && maxPageId != -1; - long startingPage = maxPageId < 0? 0 : rng.nextLong( maxPageId + 1 ); - int pf_flags = performingRead ? 
PF_SHARED_LOCK : PF_EXCLUSIVE_LOCK; - int pageSize = pagedFile.pageSize(); - - try ( PageCursor cursor = pagedFile.io( startingPage, pf_flags ) ) - { - if ( performingRead ) - { - performConsistentAdversarialRead( cursor, maxPageId, startingPage, pageSize ); - } - else - { - performConsistentAdversarialWrite( cursor, rng, pageSize ); - } - } - catch ( AssertionError error ) - { - // Capture any exception that might have hit the eviction thread. - adversary.setProbabilityFactor( 0.0 ); - try ( PageCursor cursor = pagedFile.io( 0, PF_EXCLUSIVE_LOCK ) ) - { - for ( int j = 0; j < 100; j++ ) - { - cursor.next( rng.nextLong( maxPageId + 1 ) ); - } - } - catch ( Throwable throwable ) - { - error.addSuppressed( throwable ); - } - - throw error; - } - catch ( Throwable throwable ) - { - // Don't worry about it... it's fine! -// throwable.printStackTrace(); // only enable this when debugging test failures. - } - } - - // Unmapping will cause pages to be flushed. - // We don't want that to fail, since it will upset the test tear-down. - adversary.setProbabilityFactor( 0.0 ); - try - { - // Flushing all pages, if successful, should clear any internal - // exception. - pageCache.flushAndForce(); - - // Do some post-chaos verification of what has been written. - verifyAdversarialPagedContent( pfA ); - verifyAdversarialPagedContent( pfB ); - - pfA.close(); - pfB.close(); - } - catch ( Throwable e ) - { - tracer.printHistory( System.err ); - throw e; - } - } - - private void performConsistentAdversarialRead( PageCursor cursor, long maxPageId, long startingPage, - int pageSize ) throws IOException - { - long pagesToLookAt = Math.min( maxPageId, startingPage + 3 ) - startingPage + 1; - for ( int j = 0; j < pagesToLookAt; j++ ) - { - assertTrue( cursor.next() ); - readAndVerifyAdversarialPage( cursor, pageSize ); - } - } - - private void readAndVerifyAdversarialPage( PageCursor cursor, int pageSize ) throws IOException - { - byte[] actualPage = new byte[pageSize]; - byte[] expectedPage = new byte[pageSize]; - do - { - cursor.getBytes( actualPage ); - } - while ( cursor.shouldRetry() ); - Arrays.fill( expectedPage, actualPage[0] ); - String msg = String.format( - "filePageId = %s, pageSize = %s", - cursor.getCurrentPageId(), pageSize ); - assertThat( msg, actualPage, byteArray( expectedPage ) ); - } - - private void performConsistentAdversarialWrite( PageCursor cursor, ThreadLocalRandom rng, int pageSize ) throws IOException - { - for ( int j = 0; j < 3; j++ ) - { - assertTrue( cursor.next() ); - // Avoid generating zeros, so we can tell them apart from the - // absence of a write: - byte b = (byte) rng.nextInt( 1, 127 ); - for ( int k = 0; k < pageSize; k++ ) - { - cursor.putByte( b ); - } - assertFalse( cursor.shouldRetry() ); - } - } - - private void verifyAdversarialPagedContent( PagedFile pagedFile ) throws IOException - { - try ( PageCursor cursor = pagedFile.io( 0, PF_SHARED_LOCK ) ) - { - while ( cursor.next() ) - { - readAndVerifyAdversarialPage( cursor, pagedFile.pageSize() ); - } - } - } - // NOTE: This test is CPU architecture dependent, but it should fail on no // architecture that we support. 
// This test has no timeout because one may want to run it on a CPU diff --git a/community/io/src/test/java/org/neo4j/io/pagecache/PageCacheTestSupport.java b/community/io/src/test/java/org/neo4j/io/pagecache/PageCacheTestSupport.java new file mode 100644 index 000000000000..d3144937d7c5 --- /dev/null +++ b/community/io/src/test/java/org/neo4j/io/pagecache/PageCacheTestSupport.java @@ -0,0 +1,336 @@ +/* + * Copyright (c) 2002-2015 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ +package org.neo4j.io.pagecache; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.internal.AssumptionViolatedException; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.function.Function; +import java.util.function.Supplier; + +import org.neo4j.graphdb.mockfs.EphemeralFileSystemAbstraction; +import org.neo4j.io.fs.FileSystemAbstraction; +import org.neo4j.io.fs.StoreChannel; +import org.neo4j.io.pagecache.impl.SingleFilePageSwapperFactory; +import org.neo4j.io.pagecache.tracing.PageCacheTracer; +import org.neo4j.test.RepeatRule; + +import static org.junit.Assert.assertThat; +import static org.neo4j.test.ByteArrayMatcher.byteArray; + +public abstract class PageCacheTestSupport<T extends PageCache> +{ + protected static final long SHORT_TIMEOUT_MILLIS = 10_000; + protected static final long SEMI_LONG_TIMEOUT_MILLIS = 60_000; + protected static final long LONG_TIMEOUT_MILLIS = 360_000; + protected static ExecutorService executor; + + @BeforeClass + public static void startExecutor() + { + executor = Executors.newCachedThreadPool(); + } + + @AfterClass + public static void stopExecutor() + { + executor.shutdown(); + } + + @Rule + public RepeatRule repeatRule = new RepeatRule(); + + protected int recordSize = 9; + protected int maxPages = 20; + protected int pageCachePageSize = 32; + protected int recordsPerFilePage = pageCachePageSize / recordSize; + protected int recordCount = 25 * maxPages * recordsPerFilePage; + protected int filePageSize = recordsPerFilePage * recordSize; + protected ByteBuffer bufA = ByteBuffer.allocate( recordSize ); + protected FileSystemAbstraction fs; + protected T pageCache; + + private Fixture<T> fixture; + + protected abstract Fixture<T> createFixture(); + + @Before + public void setUp() throws IOException + { + fixture = createFixture(); + Thread.interrupted(); // Clear stray interrupts + fs = createFileSystemAbstraction(); + ensureExists( file( "a" ) ); + } + + @After + public void tearDown() throws IOException + { + Thread.interrupted(); // Clear stray interrupts + + if ( pageCache != null ) + { + tearDownPageCache( pageCache ); + } + + if ( fs instanceof EphemeralFileSystemAbstraction )
+ { + ((EphemeralFileSystemAbstraction) fs).shutdown(); + } + } + + protected final T createPageCache( + PageSwapperFactory swapperFactory, + int maxPages, + int pageSize, + PageCacheTracer tracer ) + { + return fixture.createPageCache( swapperFactory, maxPages, pageSize, tracer ); + } + + protected T createPageCache( + FileSystemAbstraction fs, + int maxPages, + int pageSize, + PageCacheTracer tracer ) + { + PageSwapperFactory swapperFactory = new SingleFilePageSwapperFactory(); + swapperFactory.setFileSystemAbstraction( fs ); + return createPageCache( swapperFactory, maxPages, pageSize, tracer ); + } + + protected final T getPageCache( + FileSystemAbstraction fs, + int maxPages, + int pageSize, + PageCacheTracer tracer ) throws IOException + { + if ( pageCache != null ) + { + tearDownPageCache( pageCache ); + } + pageCache = createPageCache( fs, maxPages, pageSize, tracer ); + return pageCache; + } + + protected final void tearDownPageCache( T pageCache ) throws IOException + { + fixture.tearDownPageCache( pageCache ); + } + + protected final FileSystemAbstraction createFileSystemAbstraction() + { + return fixture.getFileSystemAbstraction(); + } + + protected final File file( String pathname ) + { + return fixture.file( pathname ); + } + + protected void ensureExists( File file ) throws IOException + { + fs.create( file ).close(); + } + + protected File existingFile( String name ) throws IOException + { + File file = file( name ); + ensureExists( file ); + return file; + } + + /** + * Verifies the records on the current page of the cursor. + *
<p>
+ * This does the do-while-retry loop internally. + */ + protected void verifyRecordsMatchExpected( PageCursor cursor ) throws IOException + { + ByteBuffer expectedPageContents = ByteBuffer.allocate( filePageSize ); + ByteBuffer actualPageContents = ByteBuffer.allocate( filePageSize ); + byte[] record = new byte[recordSize]; + long pageId = cursor.getCurrentPageId(); + for ( int i = 0; i < recordsPerFilePage; i++ ) + { + long recordId = (pageId * recordsPerFilePage) + i; + expectedPageContents.position( recordSize * i ); + generateRecordForId( recordId, expectedPageContents.slice() ); + do + { + cursor.setOffset( recordSize * i ); + cursor.getBytes( record ); + } while ( cursor.shouldRetry() ); + actualPageContents.position( recordSize * i ); + actualPageContents.put( record ); + } + assertRecord( pageId, actualPageContents, expectedPageContents ); + } + + protected void assertRecord( long pageId, ByteBuffer actualPageContents, ByteBuffer expectedPageContents ) + { + byte[] actualBytes = actualPageContents.array(); + byte[] expectedBytes = expectedPageContents.array(); + int estimatedPageId = estimateId( actualBytes ); + assertThat( + "Page id: " + pageId + " " + + "(based on record data, it should have been " + + estimatedPageId + ", a difference of " + + Math.abs( pageId - estimatedPageId ) + ")", + actualBytes, + byteArray( expectedBytes ) ); + } + + protected int estimateId( byte[] record ) + { + return ByteBuffer.wrap( record ).getInt() - 1; + } + + /** + * Fill the page bound by the cursor with records that can be verified with + * {@link #verifyRecordsMatchExpected(PageCursor)} or {@link #verifyRecordsInFile(java.io.File, int)}. + */ + protected void writeRecords( PageCursor cursor ) + { + cursor.setOffset( 0 ); + for ( int i = 0; i < recordsPerFilePage; i++ ) + { + long recordId = (cursor.getCurrentPageId() * recordsPerFilePage) + i; + generateRecordForId( recordId, bufA ); + cursor.putBytes( bufA.array() ); + } + } + + protected void generateFileWithRecords( + File file, + int recordCount, + int recordSize ) throws IOException + { + StoreChannel channel = fs.open( file, "rw" ); + ByteBuffer buf = ByteBuffer.allocate( recordSize ); + for ( int i = 0; i < recordCount; i++ ) + { + generateRecordForId( i, buf ); + channel.writeAll( buf ); + } + channel.close(); + } + + protected static void generateRecordForId( long id, ByteBuffer buf ) + { + buf.position( 0 ); + int x = (int) (id + 1); + buf.putInt( x ); + while ( buf.position() < buf.limit() ) + { + x++; + buf.put( (byte) (x & 0xFF) ); + } + buf.position( 0 ); + } + + protected void verifyRecordsInFile( File file, int recordCount ) throws IOException + { + StoreChannel channel = fs.open( file, "r" ); + ByteBuffer buf = ByteBuffer.allocate( recordSize ); + ByteBuffer observation = ByteBuffer.allocate( recordSize ); + for ( int i = 0; i < recordCount; i++ ) + { + generateRecordForId( i, buf ); + observation.position( 0 ); + channel.read( observation ); + assertRecord( i, observation, buf ); + } + channel.close(); + } + + protected Runnable $close( final PagedFile file ) + { + return () -> { + try + { + file.close(); + } + catch ( IOException e ) + { + throw new AssertionError( e ); + } + }; + } + + /** + * We implement 'assumeTrue' ourselves because JUnit insist on adding hamcrest matchers to the + * AssumptionViolatedException instances it throws. This is a problem because those matchers are not serializable, + * so they cannot be used together with the BootClassPathRunner, because it uses RMI under the hood. 
+ */ + protected void assumeTrue( String description, boolean test ) + { + if ( !test ) + { + throw new AssumptionViolatedException( description ); + } + } + + public static abstract class Fixture<T extends PageCache> + { + public abstract T createPageCache( + PageSwapperFactory swapperFactory, + int maxPages, + int pageSize, + PageCacheTracer tracer ); + + public abstract void tearDownPageCache( T pageCache ) throws IOException; + + private Supplier<FileSystemAbstraction> fileSystemAbstractionSupplier = EphemeralFileSystemAbstraction::new; + private Function<String,File> fileConstructor = File::new; + + public final FileSystemAbstraction getFileSystemAbstraction() + { + return fileSystemAbstractionSupplier.get(); + } + + public final Fixture<T> withFileSystemAbstraction( + Supplier<FileSystemAbstraction> fileSystemAbstractionSupplier ) + { + this.fileSystemAbstractionSupplier = fileSystemAbstractionSupplier; + return this; + } + + public final File file( String pathname ) + { + return fileConstructor.apply( pathname ); + } + + public final Fixture<T> withFileConstructor( Function<String,File> fileConstructor ) + { + this.fileConstructor = fileConstructor; + return this; + } + } +} diff --git a/community/io/src/test/java/org/neo4j/io/pagecache/impl/SingleFilePageSwapperWithAdversarialFileDispatcherTest.java b/community/io/src/test/java/org/neo4j/io/pagecache/impl/SingleFilePageSwapperWithAdversarialFileDispatcherIT.java similarity index 92% rename from community/io/src/test/java/org/neo4j/io/pagecache/impl/SingleFilePageSwapperWithAdversarialFileDispatcherTest.java rename to community/io/src/test/java/org/neo4j/io/pagecache/impl/SingleFilePageSwapperWithAdversarialFileDispatcherIT.java index 4a74de209712..2066e3c6b8e5 100644 --- a/community/io/src/test/java/org/neo4j/io/pagecache/impl/SingleFilePageSwapperWithAdversarialFileDispatcherTest.java +++ b/community/io/src/test/java/org/neo4j/io/pagecache/impl/SingleFilePageSwapperWithAdversarialFileDispatcherIT.java @@ -29,7 +29,7 @@ @BootClassPathRunner.BootEntryOf( UnsafeUtilTest.class ) @RunWith( BootClassPathRunner.class ) -public class SingleFilePageSwapperWithAdversarialFileDispatcherTest extends SingleFilePageSwapperWithRealFileSystemTest +public class SingleFilePageSwapperWithAdversarialFileDispatcherIT extends SingleFilePageSwapperWithRealFileSystemIT { @BeforeClass public static void enableAdversarialFileDispatcher() diff --git a/community/io/src/test/java/org/neo4j/io/pagecache/impl/SingleFilePageSwapperWithRealFileSystemTest.java b/community/io/src/test/java/org/neo4j/io/pagecache/impl/SingleFilePageSwapperWithRealFileSystemIT.java similarity index 93% rename from community/io/src/test/java/org/neo4j/io/pagecache/impl/SingleFilePageSwapperWithRealFileSystemTest.java rename to community/io/src/test/java/org/neo4j/io/pagecache/impl/SingleFilePageSwapperWithRealFileSystemIT.java index 86d043431ec2..fda61cfb66c8 100644 --- a/community/io/src/test/java/org/neo4j/io/pagecache/impl/SingleFilePageSwapperWithRealFileSystemTest.java +++ b/community/io/src/test/java/org/neo4j/io/pagecache/impl/SingleFilePageSwapperWithRealFileSystemIT.java @@ -24,7 +24,7 @@ import org.neo4j.io.fs.DefaultFileSystemAbstraction; import org.neo4j.io.fs.FileSystemAbstraction; -public class SingleFilePageSwapperWithRealFileSystemTest extends SingleFilePageSwapperTest +public class SingleFilePageSwapperWithRealFileSystemIT extends SingleFilePageSwapperTest { private final DefaultFileSystemAbstraction fs = new DefaultFileSystemAbstraction(); diff --git a/community/io/src/test/java/org/neo4j/io/pagecache/impl/muninn/MuninnPageCacheFixture.java
b/community/io/src/test/java/org/neo4j/io/pagecache/impl/muninn/MuninnPageCacheFixture.java new file mode 100644 index 000000000000..e58c18d249ae --- /dev/null +++ b/community/io/src/test/java/org/neo4j/io/pagecache/impl/muninn/MuninnPageCacheFixture.java @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2002-2015 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ +package org.neo4j.io.pagecache.impl.muninn; + +import java.io.IOException; +import java.util.concurrent.CountDownLatch; + +import org.neo4j.io.pagecache.PageCacheTestSupport; +import org.neo4j.io.pagecache.PageSwapperFactory; +import org.neo4j.io.pagecache.tracing.PageCacheTracer; + +class MuninnPageCacheFixture extends PageCacheTestSupport.Fixture<MuninnPageCache> +{ + CountDownLatch backgroundFlushLatch; + + @Override + public MuninnPageCache createPageCache( PageSwapperFactory swapperFactory, int maxPages, int pageSize, + PageCacheTracer tracer ) + { + return new MuninnPageCache( swapperFactory, maxPages, pageSize, tracer ); + } + + @Override + public void tearDownPageCache( MuninnPageCache pageCache ) throws IOException + { + if ( backgroundFlushLatch != null ) + { + backgroundFlushLatch.countDown(); + backgroundFlushLatch = null; + } + pageCache.close(); + } +} diff --git a/community/io/src/test/java/org/neo4j/io/pagecache/impl/muninn/MuninnPageCacheSlowIT.java b/community/io/src/test/java/org/neo4j/io/pagecache/impl/muninn/MuninnPageCacheSlowIT.java new file mode 100644 index 000000000000..dfece208fb48 --- /dev/null +++ b/community/io/src/test/java/org/neo4j/io/pagecache/impl/muninn/MuninnPageCacheSlowIT.java @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2002-2015 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>.
diff --git a/community/io/src/test/java/org/neo4j/io/pagecache/impl/muninn/MuninnPageCacheSlowIT.java b/community/io/src/test/java/org/neo4j/io/pagecache/impl/muninn/MuninnPageCacheSlowIT.java
new file mode 100644
index 000000000000..dfece208fb48
--- /dev/null
+++ b/community/io/src/test/java/org/neo4j/io/pagecache/impl/muninn/MuninnPageCacheSlowIT.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright (c) 2002-2015 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Neo4j is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+package org.neo4j.io.pagecache.impl.muninn;
+
+import org.neo4j.io.pagecache.PageCacheSlowTest;
+
+public class MuninnPageCacheSlowIT extends PageCacheSlowTest<MuninnPageCache>
+{
+    @Override
+    protected Fixture<MuninnPageCache> createFixture()
+    {
+        return new MuninnPageCacheFixture();
+    }
+}
diff --git a/community/io/src/test/java/org/neo4j/io/pagecache/impl/muninn/MuninnPageCacheSlowTestWithAdversarialFileDispatcherIT.java b/community/io/src/test/java/org/neo4j/io/pagecache/impl/muninn/MuninnPageCacheSlowTestWithAdversarialFileDispatcherIT.java
new file mode 100644
index 000000000000..20c312c87bec
--- /dev/null
+++ b/community/io/src/test/java/org/neo4j/io/pagecache/impl/muninn/MuninnPageCacheSlowTestWithAdversarialFileDispatcherIT.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright (c) 2002-2015 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Neo4j is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+package org.neo4j.io.pagecache.impl.muninn;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.runner.RunWith;
+
+import org.neo4j.adversaries.fs.AdversarialFileChannel;
+import org.neo4j.test.bootclasspathrunner.BootClassPathRunner;
+import org.neo4j.unsafe.impl.internal.dragons.UnsafeUtilTest;
+
+@BootClassPathRunner.BootEntryOf( UnsafeUtilTest.class )
+@RunWith( BootClassPathRunner.class )
+public class MuninnPageCacheSlowTestWithAdversarialFileDispatcherIT extends MuninnPageCacheSlowTestWithRealFileSystemIT
+{
+    @BeforeClass
+    public static void enableAdversarialFileDispatcher()
+    {
+        AdversarialFileChannel.useAdversarialFileDispatcherHack = true;
+    }
+
+    @AfterClass
+    public static void disableAdversarialFileDispatcher()
+    {
+        AdversarialFileChannel.useAdversarialFileDispatcherHack = false;
+    }
+}
diff --git a/community/io/src/test/java/org/neo4j/io/pagecache/impl/muninn/MuninnPageCacheWithRealFileSystemTest.java b/community/io/src/test/java/org/neo4j/io/pagecache/impl/muninn/MuninnPageCacheSlowTestWithRealFileSystemIT.java
similarity index 74%
rename from community/io/src/test/java/org/neo4j/io/pagecache/impl/muninn/MuninnPageCacheWithRealFileSystemTest.java
rename to community/io/src/test/java/org/neo4j/io/pagecache/impl/muninn/MuninnPageCacheSlowTestWithRealFileSystemIT.java
index 76a40dac3382..9d3cbcccc7dc 100644
--- a/community/io/src/test/java/org/neo4j/io/pagecache/impl/muninn/MuninnPageCacheWithRealFileSystemTest.java
+++ b/community/io/src/test/java/org/neo4j/io/pagecache/impl/muninn/MuninnPageCacheSlowTestWithRealFileSystemIT.java
@@ -21,26 +21,19 @@
 import org.junit.Rule;
 
-import java.io.File;
-
 import org.neo4j.io.fs.DefaultFileSystemAbstraction;
-import org.neo4j.io.fs.FileSystemAbstraction;
 import org.neo4j.test.TargetDirectory;
 
-public class MuninnPageCacheWithRealFileSystemTest extends MuninnPageCacheTest
+public class MuninnPageCacheSlowTestWithRealFileSystemIT extends MuninnPageCacheSlowIT
 {
     @Rule
     public TargetDirectory.TestDirectory directory
             = TargetDirectory.testDirForTest( getClass() );
 
     @Override
-    protected File file( String pathname )
-    {
-        return directory.file( pathname );
-    }
-
-    @Override
-    protected FileSystemAbstraction createFileSystemAbstraction()
+    protected Fixture<MuninnPageCache> createFixture()
     {
-        return new DefaultFileSystemAbstraction();
+        return super.createFixture()
+                .withFileSystemAbstraction( DefaultFileSystemAbstraction::new )
+                .withFileConstructor( pathname -> directory.file( pathname ) );
     }
 }
diff --git a/community/io/src/test/java/org/neo4j/io/pagecache/impl/muninn/MuninnPageCacheTest.java b/community/io/src/test/java/org/neo4j/io/pagecache/impl/muninn/MuninnPageCacheTest.java
index fb289121d59a..7088ba46cf4d 100644
--- a/community/io/src/test/java/org/neo4j/io/pagecache/impl/muninn/MuninnPageCacheTest.java
+++ b/community/io/src/test/java/org/neo4j/io/pagecache/impl/muninn/MuninnPageCacheTest.java
@@ -33,7 +33,6 @@
 import org.neo4j.io.fs.StoreChannel;
 import org.neo4j.io.pagecache.PageCacheTest;
 import org.neo4j.io.pagecache.PageCursor;
-import org.neo4j.io.pagecache.PageSwapperFactory;
 import org.neo4j.io.pagecache.PagedFile;
 import org.neo4j.io.pagecache.RecordingPageCacheTracer;
 import org.neo4j.io.pagecache.tracing.DelegatingPageCacheTracer;
@@ -57,39 +56,24 @@ public class MuninnPageCacheTest extends PageCacheTest<MuninnPageCache>
 {
     private final long x = 0xCAFEBABEDEADBEEFL;
     private final long y = 0xDECAFC0FFEEDECAFL;
-    private CountDownLatch backgroundFlushLatch;
+    private MuninnPageCacheFixture fixture;
 
     @Override
-    protected MuninnPageCache createPageCache(
-            PageSwapperFactory swapperFactory,
-            int maxPages,
-            int pageSize,
-            PageCacheTracer tracer )
+    protected Fixture<MuninnPageCache> createFixture()
     {
-        return new MuninnPageCache( swapperFactory, maxPages, pageSize, tracer );
-    }
-
-    @Override
-    protected void tearDownPageCache( MuninnPageCache pageCache ) throws IOException
-    {
-        if ( backgroundFlushLatch != null )
-        {
-            backgroundFlushLatch.countDown();
-            backgroundFlushLatch = null;
-        }
-        pageCache.close();
+        return fixture = new MuninnPageCacheFixture();
     }
 
     private PageCacheTracer blockCacheFlush( PageCacheTracer delegate )
     {
-        backgroundFlushLatch = new CountDownLatch( 1 );
+        fixture.backgroundFlushLatch = new CountDownLatch( 1 );
         return new DelegatingPageCacheTracer( delegate )
        {
             public MajorFlushEvent beginCacheFlush()
             {
                 try
                 {
-                    backgroundFlushLatch.await();
+                    fixture.backgroundFlushLatch.await();
                 }
                 catch ( InterruptedException e )
                 {
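The hunk above cuts off in the middle of blockCacheFlush's catch clause. For reference, the shape of the whole method after this change is sketched below; the catch body and the trailing return fall outside the hunk and are assumptions, not taken from the patch:

    // Sketch: a tracer that parks any background cache flush on the fixture's
    // latch until tearDownPageCache counts it down. Only the lines shown in
    // the hunk above are certain.
    private PageCacheTracer blockCacheFlush( PageCacheTracer delegate )
    {
        fixture.backgroundFlushLatch = new CountDownLatch( 1 );
        return new DelegatingPageCacheTracer( delegate )
        {
            @Override
            public MajorFlushEvent beginCacheFlush()
            {
                try
                {
                    fixture.backgroundFlushLatch.await();
                }
                catch ( InterruptedException e )
                {
                    Thread.currentThread().interrupt(); // assumed handling
                }
                return super.beginCacheFlush(); // assumed delegation
            }
        };
    }

Keeping the latch on the fixture, instead of on the test class, is what lets the shared tearDownPageCache release it even though the tracer is armed from within an individual test.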
diff --git a/community/io/src/test/java/org/neo4j/io/pagecache/impl/muninn/MuninnPageCacheWithAdversarialFileDispatcherTest.java b/community/io/src/test/java/org/neo4j/io/pagecache/impl/muninn/MuninnPageCacheWithAdversarialFileDispatcherIT.java
similarity index 95%
rename from community/io/src/test/java/org/neo4j/io/pagecache/impl/muninn/MuninnPageCacheWithAdversarialFileDispatcherTest.java
rename to community/io/src/test/java/org/neo4j/io/pagecache/impl/muninn/MuninnPageCacheWithAdversarialFileDispatcherIT.java
index c1a5deae786c..e4764b6ad4b9 100644
--- a/community/io/src/test/java/org/neo4j/io/pagecache/impl/muninn/MuninnPageCacheWithAdversarialFileDispatcherTest.java
+++ b/community/io/src/test/java/org/neo4j/io/pagecache/impl/muninn/MuninnPageCacheWithAdversarialFileDispatcherIT.java
@@ -41,7 +41,7 @@
  */
 @BootClassPathRunner.BootEntryOf( UnsafeUtilTest.class )
 @RunWith( BootClassPathRunner.class )
-public class MuninnPageCacheWithAdversarialFileDispatcherTest extends MuninnPageCacheWithRealFileSystemTest
+public class MuninnPageCacheWithAdversarialFileDispatcherIT extends MuninnPageCacheWithRealFileSystemIT
 {
     @BeforeClass
     public static void enableAdversarialFileDispatcher()
diff --git a/community/io/src/test/java/org/neo4j/io/pagecache/impl/muninn/MuninnPageCacheWithRealFileSystemIT.java b/community/io/src/test/java/org/neo4j/io/pagecache/impl/muninn/MuninnPageCacheWithRealFileSystemIT.java
new file mode 100644
index 000000000000..7cc64531515f
--- /dev/null
+++ b/community/io/src/test/java/org/neo4j/io/pagecache/impl/muninn/MuninnPageCacheWithRealFileSystemIT.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright (c) 2002-2015 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Neo4j is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+package org.neo4j.io.pagecache.impl.muninn;
+
+import org.junit.Rule;
+
+import org.neo4j.io.fs.DefaultFileSystemAbstraction;
+import org.neo4j.test.TargetDirectory;
+
+public class MuninnPageCacheWithRealFileSystemIT extends MuninnPageCacheTest
+{
+    @Rule
+    public TargetDirectory.TestDirectory directory = TargetDirectory.testDirForTest( getClass() );
+
+    @Override
+    protected Fixture<MuninnPageCache> createFixture()
+    {
+        return super.createFixture()
+                .withFileSystemAbstraction( DefaultFileSystemAbstraction::new )
+                .withFileConstructor( pathname -> directory.file( pathname ) );
+    }
+}