Skip to content

Commit

Permalink
Faster MetaDataStoreTest
Browse files Browse the repository at this point in the history
by using Race instead of over-engineered ad-hoc atomic testing. With the new code
the races can be reproduced with roughly 98% reliability,
yet the tests run in about 1/30th of the original time.
  • Loading branch information
tinwelint committed Sep 19, 2016
1 parent 7155dd8 commit 2dae86d
Show file tree
Hide file tree
Showing 2 changed files with 141 additions and 129 deletions.
Expand Up @@ -19,7 +19,6 @@
*/
package org.neo4j.kernel.impl.store;

import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
Expand All @@ -30,17 +29,10 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BooleanSupplier;
import java.util.stream.Collectors;

import org.neo4j.function.ThrowingAction;
import org.neo4j.graphdb.mockfs.EphemeralFileSystemAbstraction;
import org.neo4j.io.pagecache.DelegatingPageCache;
import org.neo4j.io.pagecache.DelegatingPagedFile;
Expand All @@ -55,6 +47,7 @@
import org.neo4j.logging.LogProvider;
import org.neo4j.logging.NullLogProvider;
import org.neo4j.logging.NullLogger;
import org.neo4j.test.Race;
import org.neo4j.test.rule.PageCacheRule;
import org.neo4j.test.rule.fs.EphemeralFileSystemRule;

Expand All @@ -65,13 +58,18 @@
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import static java.lang.System.currentTimeMillis;
import static java.util.concurrent.TimeUnit.SECONDS;

import static org.neo4j.kernel.impl.store.MetaDataStore.versionStringToLong;
import static org.neo4j.kernel.impl.transaction.log.TransactionIdStore.BASE_TX_COMMIT_TIMESTAMP;
import static org.neo4j.test.Race.throwing;
import static org.neo4j.test.Race.until;

public class MetaDataStoreTest
{
private static final File STORE_DIR = new File( "store" );
private static final ExecutorService executor = Executors.newCachedThreadPool();

@Rule
public final EphemeralFileSystemRule fsRule = new EphemeralFileSystemRule();
Expand All @@ -84,12 +82,6 @@ public class MetaDataStoreTest
private boolean fakePageCursorOverflow;
private PageCache pageCacheWithFakeOverflow;

@AfterClass
public static void shutDownExecutor()
{
executor.shutdown();
}

@Before
public void setUp()
{
Expand Down Expand Up @@ -399,22 +391,33 @@ public void testRecordTransactionClosed() throws Exception
}

// Verifies that MetaDataStore.setUpgradeTransaction writes the upgrade
// transaction id and checksum atomically: concurrent readers — whether reading
// the raw page file or going through the store API — must never observe
// id != checksum (see assertIdEqualsChecksum).
// NOTE(review): this span is a unified-diff view. Removed (pre-commit,
// executor/untilStopped-based) and added (post-commit, Race-based) lines are
// interleaved, and the collapsed hunk marker below ("Expand All @@ -426,50 …")
// hides part of the file-reader read loop, so the block is not compilable as shown.
@Test
public void setUpgradeTransactionMustBeAtomic() throws Exception
public void setUpgradeTransactionMustBeAtomic() throws Throwable
{
try ( MetaDataStore store = newMetaDataStore() )
{
PagedFile pf = store.storeFile;
// Seed the record so readers have a consistent initial value to observe.
store.setUpgradeTransaction( 0, 0, 0 );
// Pre-commit scaffolding (removed by this commit).
CountDownLatch runLatch = new CountDownLatch( 1 );
AtomicBoolean stopped = new AtomicBoolean();
AtomicLong counter = new AtomicLong();

Runnable writer = untilStopped( stopped, runLatch, () -> {
long count = counter.incrementAndGet();
// Post-commit: per-role progress counters and a bounded run.
AtomicLong writeCount = new AtomicLong();
AtomicLong fileReadCount = new AtomicLong();
AtomicLong apiReadCount = new AtomicLong();
int limit = 10_000;
long endTime = currentTimeMillis() + SECONDS.toMillis( 1 );

Race race = new Race();
// End condition: stop once every role has done `limit` operations, or when
// the 1-second deadline passes — whichever comes first.
BooleanSupplier end = () ->
{
boolean doContinue = writeCount.get() < limit ||
fileReadCount.get() < limit || apiReadCount.get() < limit;
return !doContinue || currentTimeMillis() >= endTime;
};
// writers
race.addContestants( 3, until( end, () -> {
long count = writeCount.incrementAndGet();
// Always writes id == checksum == count, so a torn write is detectable
// as id != checksum on the reader side.
store.setUpgradeTransaction( count, count, count );
} );
} ) );

Runnable fileReader = untilStopped( stopped, runLatch, () -> {
// file readers
race.addContestants( 3, until( end, throwing( () -> {
try ( PageCursor cursor = pf.io( 0, PagedFile.PF_SHARED_READ_LOCK ) )
{
assertTrue( cursor.next() );
Expand All @@ -426,50 +429,19 @@ public void setUpgradeTransactionMustBeAtomic() throws Exception
}
while ( cursor.shouldRetry() );
assertIdEqualsChecksum( id, checksum, "file" );
fileReadCount.incrementAndGet();
}
} );
} ) ) );

Runnable apiReader = untilStopped( stopped, runLatch, () -> {
// API readers
race.addContestants( 3, until( end, () -> {
TransactionId transaction = store.getUpgradeTransaction();
assertIdEqualsChecksum( transaction.transactionId(), transaction.checksum(), "API" );
} );

// Pre-commit (removed): executor-based forking plus latch wait and manual joins.
forkMultiple( 10, writer );
List<Future<?>> readerFutures = forkMultiple( 5, fileReader );
readerFutures.addAll( forkMultiple( 5, apiReader ) );

runLatch.await( 1, TimeUnit.SECONDS );
stopped.set( true );

for ( Future<?> future : readerFutures )
{
future.get(); // We assert that this does not throw
}
apiReadCount.incrementAndGet();
} ) );
// Runs all contestants to completion; the `throws Throwable` signature above
// presumably lets any contestant failure propagate out of go() — confirm in Race.
race.go();
}
}

// Removed by this commit (superseded by Race/until): wraps a throwing action
// in a Runnable that repeats it until `stopped` is set. Checked exceptions are
// rethrown wrapped in RuntimeException, and the latch is counted down on exit
// (normal or exceptional) so the test's await(...) can return early when a
// worker dies.
private static Runnable untilStopped(
AtomicBoolean stopped, CountDownLatch runLatch, ThrowingAction<? extends Exception> runnable )
{
return () -> {
try
{
while ( !stopped.get() )
{
runnable.apply();
}
}
catch ( Exception e )
{
// Preserve the cause; the test observes it through Future.get().
throw new RuntimeException( e );
}
finally
{
// Signal completion (or failure) to the awaiting test thread.
runLatch.countDown();
}
};
}

private static void assertIdEqualsChecksum( long id, long checksum, String source )
{
if ( id != checksum )
Expand All @@ -479,58 +451,52 @@ private static void assertIdEqualsChecksum( long id, long checksum, String sourc
}
}

// Removed by this commit (superseded by Race.addContestants): submits `forks`
// copies of `runnable` to the shared class-level executor and returns the
// Futures so the caller can join them and surface worker exceptions.
private static List<Future<?>> forkMultiple( int forks, Runnable runnable )
{
List<Future<?>> futures = new ArrayList<>();
for ( int i = 0; i < forks; i++ )
{
futures.add( executor.submit( runnable ) );
}
return futures;
}

@Test
public void incrementAndGetVersionMustBeAtomic() throws Exception
public void incrementAndGetVersionMustBeAtomic() throws Throwable
{
try ( MetaDataStore store = newMetaDataStore() )
{
long initialVersion = store.incrementAndGetVersion();
int threads = 10, iterations = 2_000;
Semaphore startLatch = new Semaphore( 0 );
Runnable incrementer = () -> {
startLatch.acquireUninterruptibly();
int threads = Runtime.getRuntime().availableProcessors(), iterations = 500;
Race race = new Race();
race.addContestants( threads, () ->
{
for ( int i = 0; i < iterations; i++ )
{
store.incrementAndGetVersion();
}
};
List<Future<?>> futures = forkMultiple( threads, incrementer );
startLatch.release( threads );
for ( Future<?> future : futures )
{
future.get();
}
} );
race.go();
assertThat( store.incrementAndGetVersion(), is( initialVersion + (threads * iterations) + 1 ) );
}
}

// Verifies that MetaDataStore.transactionCommitted writes the last-committed
// transaction id and checksum atomically: readers via the page file and via
// getLastCommittedTransaction must never see id != checksum.
// NOTE(review): unified-diff view — pre-commit (untilStopped/forkMultiple) and
// post-commit (Race) lines are interleaved, and the collapsed hunk marker
// ("Expand All @@ -542,46 …") hides part of the file-reader read loop.
@Test
public void transactionCommittedMustBeAtomic() throws Exception
public void transactionCommittedMustBeAtomic() throws Throwable
{
try ( MetaDataStore store = newMetaDataStore() )
{
PagedFile pf = store.storeFile;
// Seed with a known consistent value (2, 2, 2).
store.transactionCommitted( 2, 2, 2 );
CountDownLatch runLatch = new CountDownLatch( 1 );
AtomicBoolean stopped = new AtomicBoolean();
AtomicLong counter = new AtomicLong( 2 );
AtomicLong writeCount = new AtomicLong();
AtomicLong fileReadCount = new AtomicLong();
AtomicLong apiReadCount = new AtomicLong();
long endTime = currentTimeMillis() + SECONDS.toMillis( 1 );

Runnable writer = untilStopped( stopped, runLatch, () -> {
long count = counter.incrementAndGet();
// End condition: every role reaches 10_000 operations, or 1-second deadline.
BooleanSupplier end = () ->
{
boolean doContinue = writeCount.get() < 10_000 ||
fileReadCount.get() < 10_000 || apiReadCount.get() < 10_000;
return !doContinue || currentTimeMillis() >= endTime;
};
Race race = new Race();
// writers: id == checksum == count, so torn writes are detectable.
race.addContestants( 3, until( end, () ->
{
long count = writeCount.incrementAndGet();
store.transactionCommitted( count, count, count );
} ) );

Runnable fileReader = untilStopped( stopped, runLatch, () -> {
// file readers
race.addContestants( 3, until( end, throwing( () -> {
try ( PageCursor cursor = pf.io( 0, PagedFile.PF_SHARED_READ_LOCK ) )
{
assertTrue( cursor.next() );
Expand All @@ -542,46 +508,47 @@ public void transactionCommittedMustBeAtomic() throws Exception
}
while ( cursor.shouldRetry() );
assertIdEqualsChecksum( id, checksum, "file" );
fileReadCount.incrementAndGet();
}
} );
} ) ) );

Runnable apiReader = untilStopped( stopped, runLatch, () -> {
// API readers
race.addContestants( 3, until( end, () ->
{
TransactionId transaction = store.getLastCommittedTransaction();
assertIdEqualsChecksum( transaction.transactionId(), transaction.checksum(), "API" );
} );
apiReadCount.incrementAndGet();
} ) );

// Pre-commit (removed): executor forking, latch wait, manual joins.
forkMultiple( 10, writer );
List<Future<?>> readerFutures = forkMultiple( 5, fileReader );
readerFutures.addAll( forkMultiple( 5, apiReader ) );

runLatch.await( 1, TimeUnit.SECONDS );
stopped.set( true );

for ( Future<?> future : readerFutures )
{
future.get(); // We assert that this does not throw
}
race.go();
}
}

// Verifies that MetaDataStore.transactionClosed writes the last-closed
// transaction's log version and byte offset atomically: readers via the page
// file and via getLastClosedTransaction must never see a torn pair
// (see assertLogVersionEqualsByteOffset).
// NOTE(review): unified-diff view — pre- and post-commit lines are interleaved
// and the collapsed hunk ("Expand All @@ -595,25 …") hides part of the
// file-reader read loop.
@Test
public void transactionClosedMustBeAtomic() throws Exception
public void transactionClosedMustBeAtomic() throws Throwable
{
try ( MetaDataStore store = newMetaDataStore() )
{
PagedFile pf = store.storeFile;
int initialValue = 2;
store.transactionClosed( initialValue, initialValue, initialValue );
CountDownLatch runLatch = new CountDownLatch( 1 );
AtomicBoolean stopped = new AtomicBoolean();
AtomicLong counter = new AtomicLong( initialValue );
AtomicLong writeCount = new AtomicLong();
AtomicLong fileReadCount = new AtomicLong();
AtomicLong apiReadCount = new AtomicLong();
long endTime = currentTimeMillis() + SECONDS.toMillis( 1 );

Runnable writer = untilStopped( stopped, runLatch, () -> {
long count = counter.incrementAndGet();
BooleanSupplier end = () ->
{
boolean doContinue = writeCount.get() < 10_000 ||
fileReadCount.get() < 10_000 || apiReadCount.get() < 10_000;
return !doContinue || currentTimeMillis() >= endTime;
};
Race race = new Race();
race.addContestants( 3, until( end, () -> {
long count = writeCount.incrementAndGet();
// NOTE(review): this is the transactionClosed test, yet the writer calls
// store.transactionCommitted — looks copy-pasted from
// transactionCommittedMustBeAtomic; presumably should be
// store.transactionClosed( count, count, count ). TODO confirm upstream.
store.transactionCommitted( count, count, count );
} );
} ) );

Runnable fileReader = untilStopped( stopped, runLatch, () -> {
// file readers
race.addContestants( 3, until( end, throwing( () -> {
try ( PageCursor cursor = pf.io( 0, PagedFile.PF_SHARED_READ_LOCK ) )
{
assertTrue( cursor.next() );
Expand All @@ -595,25 +562,16 @@ public void transactionClosedMustBeAtomic() throws Exception
}
while ( cursor.shouldRetry() );
assertLogVersionEqualsByteOffset( logVersion, byteOffset, "file" );
fileReadCount.incrementAndGet();
}
} );
} ) ) );

Runnable apiReader = untilStopped( stopped, runLatch, () -> {
// API readers
race.addContestants( 3, until( end, () -> {
long[] transaction = store.getLastClosedTransaction();
assertLogVersionEqualsByteOffset( transaction[0], transaction[1], "API" );
} );

// NOTE(review): the removed pre-commit code forked 0 writers here, i.e. the
// old test never actually raced writers against its readers.
forkMultiple( 0, writer );
List<Future<?>> readerFutures = forkMultiple( 5, fileReader );
readerFutures.addAll( forkMultiple( 5, apiReader ) );

runLatch.await( 1, TimeUnit.SECONDS );
stopped.set( true );

for ( Future<?> future : readerFutures )
{
future.get(); // We assert that this does not throw
}
apiReadCount.incrementAndGet();
} ) );
race.go();
}
}

Expand Down

0 comments on commit 2dae86d

Please sign in to comment.