Skip to content

Commit

Permalink
Slighty revamped Race API and tests for it
Browse files Browse the repository at this point in the history
  • Loading branch information
tinwelint committed Sep 20, 2016
1 parent 7a4fc7b commit 321846e
Show file tree
Hide file tree
Showing 8 changed files with 273 additions and 96 deletions.
Expand Up @@ -104,7 +104,7 @@ public void shouldEncodeProperlyWithMultipleThreadsRacing() throws Throwable
"Since my imagination for coming up with test data is usually poor, I figured I'd do something useful.",
"Hopefully this isn't just nonsensical drivel, and maybe, just maybe someone might actually read it."};

Race race = new Race( false );
Race race = new Race();
for ( String input : INPUT )
{
final String[] inputArray = new String[] {input};
Expand Down
Expand Up @@ -26,7 +26,6 @@
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;

import org.neo4j.collection.pool.Pool;
Expand Down Expand Up @@ -59,7 +58,6 @@
import static org.mockito.Mockito.mock;

import static java.lang.System.currentTimeMillis;
import static org.neo4j.test.Race.until;

public class KernelTransactionTerminationTest
{
Expand Down Expand Up @@ -132,15 +130,16 @@ private void runTwoThreads( Consumer<TestKernelTransaction> thread1Action,
int limit = 20_000;

Race race = new Race();
BooleanSupplier end = () -> ((t1Count.get() >= limit && t2Count.get() >= limit) || currentTimeMillis() >= endTime);
race.addContestant( until( end, () -> {
race.withEndCondition(
() -> ((t1Count.get() >= limit && t2Count.get() >= limit) || currentTimeMillis() >= endTime) );
race.addContestant( () -> {
thread1Action.accept( tx );
t1Count.incrementAndGet();
} ) );
race.addContestant( until( end, () -> {
} );
race.addContestant( () -> {
thread2Action.accept( tx );
t2Count.incrementAndGet();
} ) );
} );
race.go();
}

Expand Down
Expand Up @@ -92,7 +92,7 @@ private void loadNode( GraphDatabaseAPI db, Node node )

private void tryOnce( final GraphDatabaseAPI db, final Node node ) throws Throwable
{
Race race = new Race( true );
Race race = new Race().withRandomStartDelays();
race.addContestants( Runtime.getRuntime().availableProcessors(), () -> {
try ( Transaction ignored = db.beginTx() )
{
Expand Down
Expand Up @@ -30,7 +30,6 @@
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BooleanSupplier;
import java.util.stream.Collectors;

import org.neo4j.graphdb.mockfs.EphemeralFileSystemAbstraction;
Expand Down Expand Up @@ -65,7 +64,6 @@
import static org.neo4j.kernel.impl.store.MetaDataStore.versionStringToLong;
import static org.neo4j.kernel.impl.transaction.log.TransactionIdStore.BASE_TX_COMMIT_TIMESTAMP;
import static org.neo4j.test.Race.throwing;
import static org.neo4j.test.Race.until;

public class MetaDataStoreTest
{
Expand Down Expand Up @@ -405,22 +403,19 @@ public void setUpgradeTransactionMustBeAtomic() throws Throwable
long endTime = currentTimeMillis() + SECONDS.toMillis( 10 );

Race race = new Race();
BooleanSupplier end = () ->
{
boolean upperBoundReached = writeCount.get() >= upperLimit &&
fileReadCount.get() >= upperLimit && apiReadCount.get() >= upperLimit;
boolean lowerBoundReached = writeCount.get() >= lowerLimit &&
fileReadCount.get() >= lowerLimit && apiReadCount.get() >= lowerLimit;
return !upperBoundReached || (currentTimeMillis() >= endTime && lowerBoundReached);
};
race.withEndCondition( () -> writeCount.get() >= upperLimit &&
fileReadCount.get() >= upperLimit && apiReadCount.get() >= upperLimit );
race.withEndCondition( () -> writeCount.get() >= lowerLimit &&
fileReadCount.get() >= lowerLimit && apiReadCount.get() >= lowerLimit &&
currentTimeMillis() >= endTime );
// writers
race.addContestants( 3, until( end, () -> {
race.addContestants( 3, () -> {
long count = writeCount.incrementAndGet();
store.setUpgradeTransaction( count, count, count );
} ) );
} );

// file readers
race.addContestants( 3, until( end, throwing( () -> {
race.addContestants( 3, throwing( () -> {
try ( PageCursor cursor = pf.io( 0, PagedFile.PF_SHARED_READ_LOCK ) )
{
assertTrue( cursor.next() );
Expand All @@ -434,13 +429,13 @@ public void setUpgradeTransactionMustBeAtomic() throws Throwable
assertIdEqualsChecksum( id, checksum, "file" );
fileReadCount.incrementAndGet();
}
} ) ) );
} ) );

race.addContestants( 3, until( end, () -> {
race.addContestants( 3, () -> {
TransactionId transaction = store.getUpgradeTransaction();
assertIdEqualsChecksum( transaction.transactionId(), transaction.checksum(), "API" );
apiReadCount.incrementAndGet();
} ) );
} );
race.go();
}
}
Expand Down Expand Up @@ -488,22 +483,19 @@ public void transactionCommittedMustBeAtomic() throws Throwable
int lowerLimit = 100;
long endTime = currentTimeMillis() + SECONDS.toMillis( 10 );

BooleanSupplier end = () ->
{
boolean upperBoundReached = writeCount.get() >= upperLimit &&
fileReadCount.get() >= upperLimit && apiReadCount.get() >= upperLimit;
boolean lowerBoundReached = writeCount.get() >= lowerLimit &&
fileReadCount.get() >= lowerLimit && apiReadCount.get() >= lowerLimit;
return !upperBoundReached || (currentTimeMillis() >= endTime && lowerBoundReached);
};
Race race = new Race();
race.addContestants( 3, until( end, () ->
race.withEndCondition( () -> writeCount.get() >= upperLimit &&
fileReadCount.get() >= upperLimit && apiReadCount.get() >= upperLimit );
race.withEndCondition( () -> writeCount.get() >= lowerLimit &&
fileReadCount.get() >= lowerLimit && apiReadCount.get() >= lowerLimit &&
currentTimeMillis() >= endTime );
race.addContestants( 3, () ->
{
long count = writeCount.incrementAndGet();
store.transactionCommitted( count, count, count );
} ) );
} );

race.addContestants( 3, until( end, throwing( () -> {
race.addContestants( 3, throwing( () -> {
try ( PageCursor cursor = pf.io( 0, PagedFile.PF_SHARED_READ_LOCK ) )
{
assertTrue( cursor.next() );
Expand All @@ -517,14 +509,14 @@ public void transactionCommittedMustBeAtomic() throws Throwable
assertIdEqualsChecksum( id, checksum, "file" );
fileReadCount.incrementAndGet();
}
} ) ) );
} ) );

race.addContestants( 3, until( end, () ->
race.addContestants( 3, () ->
{
TransactionId transaction = store.getLastCommittedTransaction();
assertIdEqualsChecksum( transaction.transactionId(), transaction.checksum(), "API" );
apiReadCount.incrementAndGet();
} ) );
} );

race.go();
}
Expand All @@ -545,21 +537,18 @@ public void transactionClosedMustBeAtomic() throws Throwable
int lowerLimit = 100;
long endTime = currentTimeMillis() + SECONDS.toMillis( 10 );

BooleanSupplier end = () ->
{
boolean upperBoundReached = writeCount.get() >= upperLimit &&
fileReadCount.get() >= upperLimit && apiReadCount.get() >= upperLimit;
boolean lowerBoundReached = writeCount.get() >= lowerLimit &&
fileReadCount.get() >= lowerLimit && apiReadCount.get() >= lowerLimit;
return !upperBoundReached || (currentTimeMillis() >= endTime && lowerBoundReached);
};
Race race = new Race();
race.addContestants( 3, until( end, () -> {
race.withEndCondition( () -> writeCount.get() >= upperLimit &&
fileReadCount.get() >= upperLimit && apiReadCount.get() >= upperLimit );
race.withEndCondition( () -> writeCount.get() >= lowerLimit &&
fileReadCount.get() >= lowerLimit && apiReadCount.get() >= lowerLimit &&
currentTimeMillis() >= endTime );
race.addContestants( 3, () -> {
long count = writeCount.incrementAndGet();
store.transactionCommitted( count, count, count );
} ) );
} );

race.addContestants( 3, until( end, throwing( () -> {
race.addContestants( 3, throwing( () -> {
try ( PageCursor cursor = pf.io( 0, PagedFile.PF_SHARED_READ_LOCK ) )
{
assertTrue( cursor.next() );
Expand All @@ -575,13 +564,13 @@ public void transactionClosedMustBeAtomic() throws Throwable
assertLogVersionEqualsByteOffset( logVersion, byteOffset, "file" );
fileReadCount.incrementAndGet();
}
} ) ) );
} ) );

race.addContestants( 3, until( end, () -> {
race.addContestants( 3, () -> {
long[] transaction = store.getLastClosedTransaction();
assertLogVersionEqualsByteOffset( transaction[0], transaction[1], "API" );
apiReadCount.incrementAndGet();
} ) );
} );
race.go();
}
}
Expand Down
Expand Up @@ -23,8 +23,6 @@
import org.junit.Test;

import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BooleanSupplier;

import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.NotFoundException;
import org.neo4j.graphdb.Relationship;
Expand All @@ -40,8 +38,6 @@
import static java.lang.System.currentTimeMillis;
import static java.util.concurrent.TimeUnit.SECONDS;

import static org.neo4j.test.Race.until;

public class NeoStoresIT
{
@ClassRule
Expand Down Expand Up @@ -97,8 +93,8 @@ public void shouldWriteOutTheDynamicChainBeforeUpdatingThePropertyRecord()
AtomicLong writes = new AtomicLong();
AtomicLong reads = new AtomicLong();
long endTime = currentTimeMillis() + SECONDS.toMillis( 2 );
BooleanSupplier end = () -> (writes.get() > 100 && reads.get() > 10_000) || currentTimeMillis() > endTime;
race.addContestant( until( end, () ->
race.withEndCondition( () -> (writes.get() > 100 && reads.get() > 10_000) || currentTimeMillis() > endTime );
race.addContestant( () ->
{
try ( Transaction tx = db.beginTx() )
{
Expand All @@ -108,8 +104,8 @@ public void shouldWriteOutTheDynamicChainBeforeUpdatingThePropertyRecord()
tx.success();
}
writes.incrementAndGet();
} ) );
race.addContestant( until( end, () ->
} );
race.addContestant( () ->
{
try ( Transaction tx = db.getGraphDatabaseAPI().beginTx() )
{
Expand All @@ -126,7 +122,7 @@ public void shouldWriteOutTheDynamicChainBeforeUpdatingThePropertyRecord()
// but handled in shouldWriteOutThePropertyRecordBeforeReferencingItFromANodeRecord)
}
reads.incrementAndGet();
} ) );
} );
race.go();
}

Expand All @@ -139,8 +135,8 @@ public void shouldWriteOutThePropertyRecordBeforeReferencingItFromANodeRecord()
AtomicLong writes = new AtomicLong();
AtomicLong reads = new AtomicLong();
long endTime = currentTimeMillis() + SECONDS.toMillis( 2 );
BooleanSupplier end = () -> (writes.get() > 100 && reads.get() > 10_000) || currentTimeMillis() > endTime;
race.addContestant( until( end, () ->
race.withEndCondition( () -> (writes.get() > 100 && reads.get() > 10_000) || currentTimeMillis() > endTime );
race.addContestant( () ->
{
try ( Transaction tx = db.beginTx() )
{
Expand All @@ -150,8 +146,8 @@ public void shouldWriteOutThePropertyRecordBeforeReferencingItFromANodeRecord()
tx.success();
}
writes.incrementAndGet();
} ) );
race.addContestant( until( end, () ->
} );
race.addContestant( () ->
{
try ( Transaction tx = db.getGraphDatabaseAPI().beginTx() )
{
Expand All @@ -171,7 +167,7 @@ public void shouldWriteOutThePropertyRecordBeforeReferencingItFromANodeRecord()
}
}
reads.incrementAndGet();
} ) );
} );
race.go();
}

Expand All @@ -197,8 +193,8 @@ public void shouldWriteOutThePropertyRecordBeforeReferencingItFromARelationshipR
AtomicLong writes = new AtomicLong();
AtomicLong reads = new AtomicLong();
long endTime = currentTimeMillis() + SECONDS.toMillis( 2 );
BooleanSupplier end = () -> (writes.get() > 100 && reads.get() > 10_000) || currentTimeMillis() > endTime;
race.addContestant( until( end, () ->
race.withEndCondition( () -> (writes.get() > 100 && reads.get() > 10_000) || currentTimeMillis() > endTime );
race.addContestant( () ->
{
try ( Transaction tx = db.beginTx() )
{
Expand All @@ -212,8 +208,8 @@ public void shouldWriteOutThePropertyRecordBeforeReferencingItFromARelationshipR
tx.success();
}
writes.incrementAndGet();
} ) );
race.addContestant( until( end, () ->
} );
race.addContestant( () ->
{
try ( Transaction tx = db.getGraphDatabaseAPI().beginTx() )
{
Expand All @@ -233,7 +229,7 @@ public void shouldWriteOutThePropertyRecordBeforeReferencingItFromARelationshipR
}
}
reads.incrementAndGet();
} ) );
} );
race.go();
}
}

0 comments on commit 321846e

Please sign in to comment.