diff --git a/community/kernel/src/test/java/org/neo4j/kernel/api/index/ArrayEncoderTest.java b/community/kernel/src/test/java/org/neo4j/kernel/api/index/ArrayEncoderTest.java index c5c9ead51c8c0..7364a5eec19c3 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/api/index/ArrayEncoderTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/api/index/ArrayEncoderTest.java @@ -104,7 +104,7 @@ public void shouldEncodeProperlyWithMultipleThreadsRacing() throws Throwable "Since my imagination for coming up with test data is usually poor, I figured I'd do something useful.", "Hopefully this isn't just nonsensical drivel, and maybe, just maybe someone might actually read it."}; - Race race = new Race( false ); + Race race = new Race(); for ( String input : INPUT ) { final String[] inputArray = new String[] {input}; diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelTransactionTerminationTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelTransactionTerminationTest.java index 64c2147ed674f..2861f0882554c 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelTransactionTerminationTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/KernelTransactionTerminationTest.java @@ -26,7 +26,6 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.BooleanSupplier; import java.util.function.Consumer; import org.neo4j.collection.pool.Pool; @@ -59,7 +58,6 @@ import static org.mockito.Mockito.mock; import static java.lang.System.currentTimeMillis; -import static org.neo4j.test.Race.until; public class KernelTransactionTerminationTest { @@ -132,15 +130,16 @@ private void runTwoThreads( Consumer thread1Action, int limit = 20_000; Race race = new Race(); - BooleanSupplier end = () -> ((t1Count.get() >= limit && t2Count.get() >= limit) || currentTimeMillis() >= endTime); - race.addContestant( until( end, () -> { + race.withEndCondition( + () -> ((t1Count.get() >= limit && t2Count.get() >= limit) || currentTimeMillis() >= endTime) ); + race.addContestant( () -> { thread1Action.accept( tx ); t1Count.incrementAndGet(); - } ) ); - race.addContestant( until( end, () -> { + } ); + race.addContestant( () -> { thread2Action.accept( tx ); t2Count.incrementAndGet(); - } ) ); + } ); race.go(); } diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/core/TestConcurrentRelationshipChainLoadingIssue.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/core/TestConcurrentRelationshipChainLoadingIssue.java index d208f2baa60f1..855c2d3044d25 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/core/TestConcurrentRelationshipChainLoadingIssue.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/core/TestConcurrentRelationshipChainLoadingIssue.java @@ -92,7 +92,7 @@ private void loadNode( GraphDatabaseAPI db, Node node ) private void tryOnce( final GraphDatabaseAPI db, final Node node ) throws Throwable { - Race race = new Race( true ); + Race race = new Race().withRandomStartDelays(); race.addContestants( Runtime.getRuntime().availableProcessors(), () -> { try ( Transaction ignored = db.beginTx() ) { diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/store/MetaDataStoreTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/store/MetaDataStoreTest.java index d75738219cb32..53d1413b516d3 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/store/MetaDataStoreTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/store/MetaDataStoreTest.java @@ -30,7 +30,6 @@ import java.util.Arrays; import java.util.List; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.BooleanSupplier; import java.util.stream.Collectors; import org.neo4j.graphdb.mockfs.EphemeralFileSystemAbstraction; @@ -65,7 +64,6 @@ import static org.neo4j.kernel.impl.store.MetaDataStore.versionStringToLong; import static org.neo4j.kernel.impl.transaction.log.TransactionIdStore.BASE_TX_COMMIT_TIMESTAMP; import static org.neo4j.test.Race.throwing; -import static org.neo4j.test.Race.until; public class MetaDataStoreTest { @@ -405,22 +403,19 @@ public void setUpgradeTransactionMustBeAtomic() throws Throwable long endTime = currentTimeMillis() + SECONDS.toMillis( 10 ); Race race = new Race(); - BooleanSupplier end = () -> - { - boolean upperBoundReached = writeCount.get() >= upperLimit && - fileReadCount.get() >= upperLimit && apiReadCount.get() >= upperLimit; - boolean lowerBoundReached = writeCount.get() >= lowerLimit && - fileReadCount.get() >= lowerLimit && apiReadCount.get() >= lowerLimit; - return !upperBoundReached || (currentTimeMillis() >= endTime && lowerBoundReached); - }; + race.withEndCondition( () -> writeCount.get() >= upperLimit && + fileReadCount.get() >= upperLimit && apiReadCount.get() >= upperLimit ); + race.withEndCondition( () -> writeCount.get() >= lowerLimit && + fileReadCount.get() >= lowerLimit && apiReadCount.get() >= lowerLimit && + currentTimeMillis() >= endTime ); // writers - race.addContestants( 3, until( end, () -> { + race.addContestants( 3, () -> { long count = writeCount.incrementAndGet(); store.setUpgradeTransaction( count, count, count ); - } ) ); + } ); // file readers - race.addContestants( 3, until( end, throwing( () -> { + race.addContestants( 3, throwing( () -> { try ( PageCursor cursor = pf.io( 0, PagedFile.PF_SHARED_READ_LOCK ) ) { assertTrue( cursor.next() ); @@ -434,13 +429,13 @@ public void setUpgradeTransactionMustBeAtomic() throws Throwable assertIdEqualsChecksum( id, checksum, "file" ); fileReadCount.incrementAndGet(); } - } ) ) ); + } ) ); - race.addContestants( 3, until( end, () -> { + race.addContestants( 3, () -> { TransactionId transaction = store.getUpgradeTransaction(); assertIdEqualsChecksum( transaction.transactionId(), transaction.checksum(), "API" ); apiReadCount.incrementAndGet(); - } ) ); + } ); race.go(); } } @@ -488,22 +483,19 @@ public void transactionCommittedMustBeAtomic() throws Throwable int lowerLimit = 100; long endTime = currentTimeMillis() + SECONDS.toMillis( 10 ); - BooleanSupplier end = () -> - { - boolean upperBoundReached = writeCount.get() >= upperLimit && - fileReadCount.get() >= upperLimit && apiReadCount.get() >= upperLimit; - boolean lowerBoundReached = writeCount.get() >= lowerLimit && - fileReadCount.get() >= lowerLimit && apiReadCount.get() >= lowerLimit; - return !upperBoundReached || (currentTimeMillis() >= endTime && lowerBoundReached); - }; Race race = new Race(); - race.addContestants( 3, until( end, () -> + race.withEndCondition( () -> writeCount.get() >= upperLimit && + fileReadCount.get() >= upperLimit && apiReadCount.get() >= upperLimit ); + race.withEndCondition( () -> writeCount.get() >= lowerLimit && + fileReadCount.get() >= lowerLimit && apiReadCount.get() >= lowerLimit && + currentTimeMillis() >= endTime ); + race.addContestants( 3, () -> { long count = writeCount.incrementAndGet(); store.transactionCommitted( count, count, count ); - } ) ); + } ); - race.addContestants( 3, until( end, throwing( () -> { + race.addContestants( 3, throwing( () -> { try ( PageCursor cursor = pf.io( 0, PagedFile.PF_SHARED_READ_LOCK ) ) { assertTrue( cursor.next() ); @@ -517,14 +509,14 @@ public void transactionCommittedMustBeAtomic() throws Throwable assertIdEqualsChecksum( id, checksum, "file" ); fileReadCount.incrementAndGet(); } - } ) ) ); + } ) ); - race.addContestants( 3, until( end, () -> + race.addContestants( 3, () -> { TransactionId transaction = store.getLastCommittedTransaction(); assertIdEqualsChecksum( transaction.transactionId(), transaction.checksum(), "API" ); apiReadCount.incrementAndGet(); - } ) ); + } ); race.go(); } @@ -545,21 +537,18 @@ public void transactionClosedMustBeAtomic() throws Throwable int lowerLimit = 100; long endTime = currentTimeMillis() + SECONDS.toMillis( 10 ); - BooleanSupplier end = () -> - { - boolean upperBoundReached = writeCount.get() >= upperLimit && - fileReadCount.get() >= upperLimit && apiReadCount.get() >= upperLimit; - boolean lowerBoundReached = writeCount.get() >= lowerLimit && - fileReadCount.get() >= lowerLimit && apiReadCount.get() >= lowerLimit; - return !upperBoundReached || (currentTimeMillis() >= endTime && lowerBoundReached); - }; Race race = new Race(); - race.addContestants( 3, until( end, () -> { + race.withEndCondition( () -> writeCount.get() >= upperLimit && + fileReadCount.get() >= upperLimit && apiReadCount.get() >= upperLimit ); + race.withEndCondition( () -> writeCount.get() >= lowerLimit && + fileReadCount.get() >= lowerLimit && apiReadCount.get() >= lowerLimit && + currentTimeMillis() >= endTime ); + race.addContestants( 3, () -> { long count = writeCount.incrementAndGet(); store.transactionCommitted( count, count, count ); - } ) ); + } ); - race.addContestants( 3, until( end, throwing( () -> { + race.addContestants( 3, throwing( () -> { try ( PageCursor cursor = pf.io( 0, PagedFile.PF_SHARED_READ_LOCK ) ) { assertTrue( cursor.next() ); @@ -575,13 +564,13 @@ public void transactionClosedMustBeAtomic() throws Throwable assertLogVersionEqualsByteOffset( logVersion, byteOffset, "file" ); fileReadCount.incrementAndGet(); } - } ) ) ); + } ) ); - race.addContestants( 3, until( end, () -> { + race.addContestants( 3, () -> { long[] transaction = store.getLastClosedTransaction(); assertLogVersionEqualsByteOffset( transaction[0], transaction[1], "API" ); apiReadCount.incrementAndGet(); - } ) ); + } ); race.go(); } } diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/store/NeoStoresIT.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/store/NeoStoresIT.java index 33d80e8aca17c..5a2b5c8ef1115 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/store/NeoStoresIT.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/store/NeoStoresIT.java @@ -23,8 +23,6 @@ import org.junit.Test; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.BooleanSupplier; - import org.neo4j.graphdb.Node; import org.neo4j.graphdb.NotFoundException; import org.neo4j.graphdb.Relationship; @@ -40,8 +38,6 @@ import static java.lang.System.currentTimeMillis; import static java.util.concurrent.TimeUnit.SECONDS; -import static org.neo4j.test.Race.until; - public class NeoStoresIT { @ClassRule @@ -97,8 +93,8 @@ public void shouldWriteOutTheDynamicChainBeforeUpdatingThePropertyRecord() AtomicLong writes = new AtomicLong(); AtomicLong reads = new AtomicLong(); long endTime = currentTimeMillis() + SECONDS.toMillis( 2 ); - BooleanSupplier end = () -> (writes.get() > 100 && reads.get() > 10_000) || currentTimeMillis() > endTime; - race.addContestant( until( end, () -> + race.withEndCondition( () -> (writes.get() > 100 && reads.get() > 10_000) || currentTimeMillis() > endTime ); + race.addContestant( () -> { try ( Transaction tx = db.beginTx() ) { @@ -108,8 +104,8 @@ public void shouldWriteOutTheDynamicChainBeforeUpdatingThePropertyRecord() tx.success(); } writes.incrementAndGet(); - } ) ); - race.addContestant( until( end, () -> + } ); + race.addContestant( () -> { try ( Transaction tx = db.getGraphDatabaseAPI().beginTx() ) { @@ -126,7 +122,7 @@ public void shouldWriteOutTheDynamicChainBeforeUpdatingThePropertyRecord() // but handled in shouldWriteOutThePropertyRecordBeforeReferencingItFromANodeRecord) } reads.incrementAndGet(); - } ) ); + } ); race.go(); } @@ -139,8 +135,8 @@ public void shouldWriteOutThePropertyRecordBeforeReferencingItFromANodeRecord() AtomicLong writes = new AtomicLong(); AtomicLong reads = new AtomicLong(); long endTime = currentTimeMillis() + SECONDS.toMillis( 2 ); - BooleanSupplier end = () -> (writes.get() > 100 && reads.get() > 10_000) || currentTimeMillis() > endTime; - race.addContestant( until( end, () -> + race.withEndCondition( () -> (writes.get() > 100 && reads.get() > 10_000) || currentTimeMillis() > endTime ); + race.addContestant( () -> { try ( Transaction tx = db.beginTx() ) { @@ -150,8 +146,8 @@ public void shouldWriteOutThePropertyRecordBeforeReferencingItFromANodeRecord() tx.success(); } writes.incrementAndGet(); - } ) ); - race.addContestant( until( end, () -> + } ); + race.addContestant( () -> { try ( Transaction tx = db.getGraphDatabaseAPI().beginTx() ) { @@ -171,7 +167,7 @@ public void shouldWriteOutThePropertyRecordBeforeReferencingItFromANodeRecord() } } reads.incrementAndGet(); - } ) ); + } ); race.go(); } @@ -197,8 +193,8 @@ public void shouldWriteOutThePropertyRecordBeforeReferencingItFromARelationshipR AtomicLong writes = new AtomicLong(); AtomicLong reads = new AtomicLong(); long endTime = currentTimeMillis() + SECONDS.toMillis( 2 ); - BooleanSupplier end = () -> (writes.get() > 100 && reads.get() > 10_000) || currentTimeMillis() > endTime; - race.addContestant( until( end, () -> + race.withEndCondition( () -> (writes.get() > 100 && reads.get() > 10_000) || currentTimeMillis() > endTime ); + race.addContestant( () -> { try ( Transaction tx = db.beginTx() ) { @@ -212,8 +208,8 @@ public void shouldWriteOutThePropertyRecordBeforeReferencingItFromARelationshipR tx.success(); } writes.incrementAndGet(); - } ) ); - race.addContestant( until( end, () -> + } ); + race.addContestant( () -> { try ( Transaction tx = db.getGraphDatabaseAPI().beginTx() ) { @@ -233,7 +229,7 @@ public void shouldWriteOutThePropertyRecordBeforeReferencingItFromARelationshipR } } reads.incrementAndGet(); - } ) ); + } ); race.go(); } } diff --git a/community/kernel/src/test/java/org/neo4j/test/Race.java b/community/kernel/src/test/java/org/neo4j/test/Race.java index e1dfac0227b33..88928bb555e9a 100644 --- a/community/kernel/src/test/java/org/neo4j/test/Race.java +++ b/community/kernel/src/test/java/org/neo4j/test/Race.java @@ -48,41 +48,62 @@ public interface ThrowingRunnable private final List contestants = new ArrayList<>(); private volatile CountDownLatch readySet; private final CountDownLatch go = new CountDownLatch( 1 ); - private final boolean addSomeMinorRandomStartDelays; + private volatile boolean addSomeMinorRandomStartDelays; + private volatile BooleanSupplier endCondition; + private volatile boolean failure; - public Race() + public Race withRandomStartDelays() { - this( false ); + this.addSomeMinorRandomStartDelays = true; + return this; } - public Race( boolean addSomeMinorRandomStartDelays ) + /** + * Adds an end condition to this race. The race will end whenever an end condition is met + * or when there's one contestant failing (throwing any sort of exception). + * + * @param endConditions one or more end conditions, such that when returning {@code true} + * signals that the race should end. + * @return this {@link Race} instance. + */ + public Race withEndCondition( BooleanSupplier... endConditions ) { - this.addSomeMinorRandomStartDelays = addSomeMinorRandomStartDelays; + for ( BooleanSupplier endCondition : endConditions ) + { + this.endCondition = mergeEndCondition( endCondition ); + } + return this; } - public static Runnable timed( long time, TimeUnit unit, Runnable singleOperation ) + /** + * Convenience for adding an end condition which is based on time. This will have contestants + * end after the given duration (time + unit). + * + * @param time time value. + * @param unit unit of time in {@link TimeUnit}. + * @return this {@link Race} instance. + */ + public Race withMaxDuration( long time, TimeUnit unit ) { - return () -> - { - long endTime = currentTimeMillis() + unit.toMillis( time ); - while ( currentTimeMillis() < endTime ) - { - singleOperation.run(); - } - }; + long endTime = currentTimeMillis() + unit.toMillis( time ); + this.endCondition = mergeEndCondition( () -> currentTimeMillis() >= endTime ); + return this; } - public static Runnable until( BooleanSupplier end, Runnable singleOperation ) + private BooleanSupplier mergeEndCondition( BooleanSupplier additionalEndCondition ) { - return () -> - { - while ( !end.getAsBoolean() ) - { - singleOperation.run(); - } - }; + BooleanSupplier existingEndCondition = endCondition; + return existingEndCondition == null ? additionalEndCondition : + () -> existingEndCondition.getAsBoolean() || additionalEndCondition.getAsBoolean(); } + /** + * Convenience for wrapping contestants, especially for lambdas, which throws any sort of + * checked exception. + * + * @param runnable actual contestant. + * @return contestant wrapped in a try-catch (and re-throw as unchecked exception). + */ public static Runnable throwing( ThrowingRunnable runnable ) { return () -> @@ -131,6 +152,11 @@ public void go() throws Throwable */ public void go( long maxWaitTime, TimeUnit unit ) throws Throwable { + if ( endCondition == null ) + { + endCondition = () -> true; + } + readySet = new CountDownLatch( contestants.size() ); for ( Contestant contestant : contestants ) { @@ -195,6 +221,7 @@ private class Contestant extends Thread Contestant( Runnable code, int nr ) { super( code, "Contestant#" + nr ); + this.setUncaughtExceptionHandler( (thread,error) -> {} ); } @Override @@ -219,11 +246,19 @@ public void run() try { - super.run(); + while ( !failure ) + { + super.run(); + if ( endCondition.getAsBoolean() ) + { + break; + } + } } catch ( Throwable e ) { error = e; + failure = true; // <-- global flag throw e; } } diff --git a/community/kernel/src/test/java/org/neo4j/test/RaceTest.java b/community/kernel/src/test/java/org/neo4j/test/RaceTest.java new file mode 100644 index 0000000000000..470f5d2cbf0e5 --- /dev/null +++ b/community/kernel/src/test/java/org/neo4j/test/RaceTest.java @@ -0,0 +1,158 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.test; + +import org.junit.Test; + +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BooleanSupplier; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import static java.lang.Thread.sleep; +import static java.util.concurrent.ThreadLocalRandom.current; + +import static org.neo4j.test.Race.throwing; + +/** + * Test of a test utility {@link Race}. + */ +public class RaceTest +{ + @Test + public void shouldWaitForAllContestantsToComplete() throws Throwable + { + // GIVEN + Race race = new Race(); + final AtomicInteger completed = new AtomicInteger(); + int count = 5; + race.addContestants( count, throwing( () -> + { + sleep( current().nextInt( 100 ) ); + completed.incrementAndGet(); + } ) ); + + // WHEN + race.go(); + + // THEN + assertEquals( count, completed.get() ); + } + + @Test + public void shouldConsultEndCondition() throws Throwable + { + // GIVEN + CallCountBooleanSupplier endCondition = new CallCountBooleanSupplier( 100 ); + Race race = new Race().withEndCondition( endCondition ); + race.addContestants( 20, throwing( () -> sleep( 10 ) ) ); + + // WHEN + race.go(); + + // THEN + assertTrue( endCondition.callCount.get() >= 100 ); + } + + @Test + public void shouldHaveMultipleEndConditions() throws Throwable + { + // GIVEN + ControlledBooleanSupplier endCondition1 = spy( new ControlledBooleanSupplier( false ) ); + ControlledBooleanSupplier endCondition2 = spy( new ControlledBooleanSupplier( false ) ); + ControlledBooleanSupplier endCondition3 = spy( new ControlledBooleanSupplier( false ) ); + Race race = new Race().withEndCondition( endCondition1, endCondition2, endCondition3 ); + race.addContestant( () -> endCondition2.set( true ) ); + race.addContestants( 3, () -> {} ); + + // WHEN + race.go(); + + // THEN + verify( endCondition1, times( 4 ) ).getAsBoolean(); + verify( endCondition2, times( 4 ) ).getAsBoolean(); + } + + @Test + public void shouldBreakOnError() throws Throwable + { + // GIVEN + String error = "Noooo"; + Race race = new Race(); + race.withEndCondition( () -> false ); // <-- never end + race.addContestant( () -> {throw new RuntimeException( error );} ); + race.addContestants( 3, () -> {} ); + + // WHEN + try + { + race.go(); + fail( "Should've failed "); + } + catch ( Exception e ) + { + // THEN + assertEquals( error, e.getMessage() ); + } + } + + public static class ControlledBooleanSupplier implements BooleanSupplier + { + private volatile boolean value; + + public ControlledBooleanSupplier( boolean initialValue ) + { + this.value = initialValue; + } + + public void set( boolean value ) + { + this.value = value; + } + + @Override + public boolean getAsBoolean() + { + return value; + } + } + + public static class CallCountBooleanSupplier implements BooleanSupplier + { + private final int callCountTriggeringTrueEndCondition; + private final AtomicInteger callCount = new AtomicInteger(); + + public CallCountBooleanSupplier( int callCountTriggeringTrueEndCondition ) + { + this.callCountTriggeringTrueEndCondition = callCountTriggeringTrueEndCondition; + } + + @Override + public boolean getAsBoolean() + { + return callCount.incrementAndGet() >= callCountTriggeringTrueEndCondition; + } + } +} diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/executor/DynamicTaskExecutorTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/executor/DynamicTaskExecutorTest.java index 375fe7f22dcdd..92d9ae2b1f781 100644 --- a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/executor/DynamicTaskExecutorTest.java +++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/executor/DynamicTaskExecutorTest.java @@ -313,7 +313,7 @@ public void shouldCopeWithConcurrentIncrementOfProcessorsAndShutdown() throws Th { // GIVEN TaskExecutor executor = new DynamicTaskExecutor<>( 1, 2, 2, PARK, "test" ); - Race race = new Race( true ); + Race race = new Race().withRandomStartDelays(); race.addContestant( () -> executor.shutdown( SF_AWAIT_ALL_COMPLETED ) ); race.addContestant( () -> executor.processors( 1 ) );