Skip to content

Commit

Permalink
More aggressive multithreaded testing of WorkSync
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisvest committed Sep 22, 2016
1 parent 3a2c377 commit 2aa0166
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 14 deletions.
Expand Up @@ -83,15 +83,15 @@ public void apply( W work )
tryCount++;
try
{
if ( lock.tryLock( tryCount < 10? 0 : 10, TimeUnit.MILLISECONDS ) )
if ( tryLock( tryCount, unit ) )
{
try
{
doSynchronizedWork();
}
finally
{
lock.unlock();
unlock();
}
}
}
Expand All @@ -111,6 +111,16 @@ public void apply( W work )
}
}

private boolean tryLock( int tryCount, WorkUnit<Material,W> unit ) throws InterruptedException
{
return lock.tryLock( tryCount < 10? 0 : 10, TimeUnit.MILLISECONDS );
}

private void unlock()
{
lock.unlock();
}

private void doSynchronizedWork()
{
WorkUnit<Material,W> batch = reverse( stack.getAndSet( stackEnd ) );
Expand Down
Expand Up @@ -21,12 +21,17 @@

import org.junit.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
Expand Down Expand Up @@ -76,16 +81,16 @@ private class Adder
{
public void add( int delta )
{
sum.getAndAdd( delta );
count.getAndIncrement();
sum.add( delta );
count.increment();
}
}

private class RunnableWork implements Runnable
{
private final AddWork addWork;

public RunnableWork( AddWork addWork )
RunnableWork( AddWork addWork )
{
this.addWork = addWork;
}
Expand All @@ -97,19 +102,45 @@ public void run()
}
}

private AtomicInteger sum = new AtomicInteger();
private AtomicInteger count = new AtomicInteger();
private static class UnsynchronisedAdder
{
// The volatile modifier prevents hoisting and reordering optimisations that could *hide* races
private volatile long value;

public void add( long delta )
{
long v = value;
// Make sure other threads have a chance to run and race with our update
Thread.yield();
// Allow an up to ~50 micro-second window for racing and losing updates
usleep( ThreadLocalRandom.current().nextInt( 50 ) );
value = v + delta;
}

public void increment()
{
add( 1 );
}

public long sum()
{
return value;
}
}

private UnsynchronisedAdder sum = new UnsynchronisedAdder();
private UnsynchronisedAdder count = new UnsynchronisedAdder();
private Adder adder = new Adder();
private WorkSync<Adder,AddWork> sync = new WorkSync<>( adder );

@Test
public void mustApplyWork() throws Exception
{
sync.apply( new AddWork( 10 ) );
assertThat( sum.get(), is( 10 ) );
assertThat( sum.sum(), is( 10L ) );

sync.apply( new AddWork( 20 ) );
assertThat( sum.get(), is( 30 ) );
assertThat( sum.sum(), is( 30L ) );
}

@Test
Expand All @@ -123,7 +154,7 @@ public void mustCombineWork() throws Exception
executor.shutdown();
assertTrue( executor.awaitTermination( 2, TimeUnit.SECONDS ) );

assertThat( count.get(), lessThan( sum.get() ) );
assertThat( count.sum(), lessThan( sum.sum() ) );
}

@Test
Expand All @@ -133,7 +164,7 @@ public void mustApplyWorkEvenWhenInterrupted() throws Exception

sync.apply( new AddWork( 10 ) );

assertThat( sum.get(), is( 10 ) );
assertThat( sum.sum(), is( 10L ) );
assertTrue( Thread.interrupted() );
}

Expand Down Expand Up @@ -170,7 +201,54 @@ public void add( int delta )
broken.set( false );
sync.apply( new AddWork( 20 ) );

assertThat( sum.get(), is( 20 ) );
assertThat( count.get(), is( 1 ) );
assertThat( sum.sum(), is( 20L ) );
assertThat( count.sum(), is( 1L ) );
}

@Test
public void mustNotApplyWorkInParallelUnderStress() throws Exception
{
int workers = Runtime.getRuntime().availableProcessors() * 5;
int iterations = 1_000;
int incrementValue = 42;
CountDownLatch startLatch = new CountDownLatch( workers );
CountDownLatch endLatch = new CountDownLatch( workers );
ExecutorService executor = Executors.newFixedThreadPool( workers );
AtomicBoolean start = new AtomicBoolean();
Callable<Void> work = () ->
{
startLatch.countDown();
boolean spin;
do
{
spin = !start.get();
}
while ( spin );

for ( int i = 0; i < iterations; i++ )
{
sync.apply( new AddWork( incrementValue ) );
}

endLatch.countDown();
return null;
};

List<Future<Void>> futureList = new ArrayList<>();
for ( int i = 0; i < workers; i++ )
{
futureList.add( executor.submit( work ) );
}
startLatch.await();
start.set( true );
endLatch.await();

for ( Future<Void> future : futureList )
{
future.get(); // check for any exceptions
}

assertThat( count.sum(), lessThan( (long) (workers * iterations) ) );
assertThat( sum.sum(), is( (long) (incrementValue * workers * iterations) ) );
}
}

0 comments on commit 2aa0166

Please sign in to comment.