From 2aa0166f331f0d21cfa7e42bb5cd40a2c2b253a3 Mon Sep 17 00:00:00 2001 From: Chris Vest Date: Wed, 17 Aug 2016 11:57:27 +0200 Subject: [PATCH] More aggressive multithreaded testing of WorkSync --- .../java/org/neo4j/concurrent/WorkSync.java | 14 ++- .../org/neo4j/concurrent/WorkSyncTest.java | 102 +++++++++++++++--- 2 files changed, 102 insertions(+), 14 deletions(-) diff --git a/community/primitive-collections/src/main/java/org/neo4j/concurrent/WorkSync.java b/community/primitive-collections/src/main/java/org/neo4j/concurrent/WorkSync.java index 6b3c5ccad8dd2..d74577ed035c8 100644 --- a/community/primitive-collections/src/main/java/org/neo4j/concurrent/WorkSync.java +++ b/community/primitive-collections/src/main/java/org/neo4j/concurrent/WorkSync.java @@ -83,7 +83,7 @@ public void apply( W work ) tryCount++; try { - if ( lock.tryLock( tryCount < 10? 0 : 10, TimeUnit.MILLISECONDS ) ) + if ( tryLock( tryCount, unit ) ) { try { @@ -91,7 +91,7 @@ public void apply( W work ) } finally { - lock.unlock(); + unlock(); } } } @@ -111,6 +111,16 @@ public void apply( W work ) } } + private boolean tryLock( int tryCount, WorkUnit unit ) throws InterruptedException + { + return lock.tryLock( tryCount < 10? 0 : 10, TimeUnit.MILLISECONDS ); + } + + private void unlock() + { + lock.unlock(); + } + private void doSynchronizedWork() { WorkUnit batch = reverse( stack.getAndSet( stackEnd ) ); diff --git a/community/primitive-collections/src/test/java/org/neo4j/concurrent/WorkSyncTest.java b/community/primitive-collections/src/test/java/org/neo4j/concurrent/WorkSyncTest.java index 4de6d465baa36..c4b264670c0bb 100644 --- a/community/primitive-collections/src/test/java/org/neo4j/concurrent/WorkSyncTest.java +++ b/community/primitive-collections/src/test/java/org/neo4j/concurrent/WorkSyncTest.java @@ -21,12 +21,17 @@ import org.junit.Test; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; @@ -76,8 +81,8 @@ private class Adder { public void add( int delta ) { - sum.getAndAdd( delta ); - count.getAndIncrement(); + sum.add( delta ); + count.increment(); } } @@ -85,7 +90,7 @@ private class RunnableWork implements Runnable { private final AddWork addWork; - public RunnableWork( AddWork addWork ) + RunnableWork( AddWork addWork ) { this.addWork = addWork; } @@ -97,8 +102,34 @@ public void run() } } - private AtomicInteger sum = new AtomicInteger(); - private AtomicInteger count = new AtomicInteger(); + private static class UnsynchronisedAdder + { + // The volatile modifier prevents hoisting and reordering optimisations that could *hide* races + private volatile long value; + + public void add( long delta ) + { + long v = value; + // Make sure other threads have a chance to run and race with our update + Thread.yield(); + // Allow an up to ~50 micro-second window for racing and losing updates + usleep( ThreadLocalRandom.current().nextInt( 50 ) ); + value = v + delta; + } + + public void increment() + { + add( 1 ); + } + + public long sum() + { + return value; + } + } + + private UnsynchronisedAdder sum = new UnsynchronisedAdder(); + private UnsynchronisedAdder count = new UnsynchronisedAdder(); private Adder adder = new Adder(); private WorkSync sync = new WorkSync<>( adder ); @@ -106,10 +137,10 @@ public void run() public void mustApplyWork() throws Exception { sync.apply( new AddWork( 10 ) ); - assertThat( sum.get(), is( 10 ) ); + assertThat( sum.sum(), is( 10L ) ); sync.apply( new AddWork( 20 ) ); - assertThat( sum.get(), is( 30 ) ); + assertThat( sum.sum(), is( 30L ) ); } @Test @@ -123,7 +154,7 @@ public void mustCombineWork() throws Exception executor.shutdown(); assertTrue( executor.awaitTermination( 2, TimeUnit.SECONDS ) ); - assertThat( count.get(), lessThan( sum.get() ) ); + assertThat( count.sum(), lessThan( sum.sum() ) ); } @Test @@ -133,7 +164,7 @@ public void mustApplyWorkEvenWhenInterrupted() throws Exception sync.apply( new AddWork( 10 ) ); - assertThat( sum.get(), is( 10 ) ); + assertThat( sum.sum(), is( 10L ) ); assertTrue( Thread.interrupted() ); } @@ -170,7 +201,54 @@ public void add( int delta ) broken.set( false ); sync.apply( new AddWork( 20 ) ); - assertThat( sum.get(), is( 20 ) ); - assertThat( count.get(), is( 1 ) ); + assertThat( sum.sum(), is( 20L ) ); + assertThat( count.sum(), is( 1L ) ); + } + + @Test + public void mustNotApplyWorkInParallelUnderStress() throws Exception + { + int workers = Runtime.getRuntime().availableProcessors() * 5; + int iterations = 1_000; + int incrementValue = 42; + CountDownLatch startLatch = new CountDownLatch( workers ); + CountDownLatch endLatch = new CountDownLatch( workers ); + ExecutorService executor = Executors.newFixedThreadPool( workers ); + AtomicBoolean start = new AtomicBoolean(); + Callable work = () -> + { + startLatch.countDown(); + boolean spin; + do + { + spin = !start.get(); + } + while ( spin ); + + for ( int i = 0; i < iterations; i++ ) + { + sync.apply( new AddWork( incrementValue ) ); + } + + endLatch.countDown(); + return null; + }; + + List> futureList = new ArrayList<>(); + for ( int i = 0; i < workers; i++ ) + { + futureList.add( executor.submit( work ) ); + } + startLatch.await(); + start.set( true ); + endLatch.await(); + + for ( Future future : futureList ) + { + future.get(); // check for any exceptions + } + + assertThat( count.sum(), lessThan( (long) (workers * iterations) ) ); + assertThat( sum.sum(), is( (long) (incrementValue * workers * iterations) ) ); } }