Skip to content

Commit

Permalink
Fixes an issue in ForkedProcessorStep
Browse files Browse the repository at this point in the history
with spurious wake-ups potentially hanging the processing; also adds tests for it.
Cleans up volatile variables and clarifies where the correct memory barriers are.
  • Loading branch information
tinwelint committed Aug 31, 2016
1 parent a26b055 commit 7252edb
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 14 deletions.
Expand Up @@ -40,6 +40,11 @@
*/
public abstract class ForkedProcessorStep<T> extends ProcessorStep<T>
{
// ID 0 is the id of a processor which is always present, no matter how many or few processors
// are assigned to process a batch. Therefore some tasks can be put on this processor, tasks
// which may affect the batches as a whole.
protected static final int MAIN = 0;

// used by forked processors to count down when they're done, so that main processing thread
// knows when they're all done
private final AtomicInteger doneSignal = new AtomicInteger();
Expand All @@ -48,15 +53,19 @@ public abstract class ForkedProcessorStep<T> extends ProcessorStep<T>
// main processing thread communicates batch to process using this variable
// it's not volatile, but piggy-backs on globalTicket for that
private T currentBatch;
// this ticket helps coordinating with the forked processors
private long globalTicket;
// this ticket helps coordinating with the forked processors. It's checked by the forked processors
// and so acts as a useful memory barrier for other variables
private volatile long globalTicket;
// processorCount can be changed asynchronously by calls to processors(int), although its
// changes will only be applied between processing batches so as not to interfere
private volatile int processorCount = 1;
// forked processors can communicate errors via this variable
// forked processors can communicate errors via this variable.
// Doesn't need to be volatile - piggy-backs off of doneSignal access between submitter thread
// and the failing processor thread
private Throwable error;
// forked processors can ping main process thread via this variable
private Thread ticketThread;
// represents the submitter thread which called process() method. Forked processor threads can
// ping/unpark submitter thread via this variable. Piggy-backs off of globalTicket/doneSignal.
private Thread submitterThread;

protected ForkedProcessorStep( StageControl control, String name, Configuration config, int maxProcessors )
{
Expand All @@ -79,15 +88,17 @@ protected void process( T batch, BatchSender sender ) throws Throwable
{
// Multiple processors, hand over the state to the processors and let them loose
currentBatch = batch;
ticketThread = Thread.currentThread(); // so that forked processors can unpark
globalTicket++;
// ^^^ --- everything above this line will piggy-back on the volatility from the line below
submitterThread = Thread.currentThread(); // so that forked processors can unpark
// ^^^ --- everything above this line will piggy-back on the volatility from globalTicket
doneSignal.set( processorCount );
globalTicket++;
notifyProcessors();
while ( doneSignal.get() > 0 )
{
LockSupport.park();
}
// any write to "error" is now visible to us because of our check (and forked processor's write)
// to doneSignal
if ( error != null )
{
throw error;
Expand Down Expand Up @@ -170,6 +181,7 @@ class ForkedProcessor extends Thread

ForkedProcessor( int id )
{
super( name() + "-" + id );
this.id = id;
this.localTicket = globalTicket;
start();
Expand All @@ -180,15 +192,16 @@ public void run()
{
while ( !halted )
{
boolean processed = false;
try
{
park();
if ( !halted && localTicket + 1 == globalTicket )
{
// ^^^ we just accessed volatile variable 'halted' and so the rest of the non-volatile
// variables will not be up to date for us
// ^^^ we just accessed volatile variable 'globalTicket' and so currentBatch and
// forkedProcessors will now be up to date for us
processed = true;
forkedProcess( id, forkedProcessors.size(), currentBatch );
localTicket++;
}
}
catch ( Throwable t )
Expand All @@ -200,8 +213,12 @@ public void run()
// ^^^ finish off with counting down doneSignal which serves two purposes:
// - notifying the main submitter thread that we're done
// - going through a volatile memory access to let our changes propagate
doneSignal.decrementAndGet();
LockSupport.unpark( ticketThread );
if ( processed )
{
localTicket++;
doneSignal.decrementAndGet();
LockSupport.unpark( submitterThread );
}
}
}
}
Expand Down
Expand Up @@ -25,6 +25,8 @@
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

import org.neo4j.test.OtherThreadRule;
import org.neo4j.test.RandomRule;
import static org.junit.Assert.assertEquals;
Expand All @@ -41,6 +43,8 @@ public class ForkedProcessorStepTest
@Rule
public final OtherThreadRule<Void> t2 = new OtherThreadRule<>();
@Rule
public final OtherThreadRule<Void> t3 = new OtherThreadRule<>();
@Rule
public final RandomRule random = new RandomRule();

@Test
Expand Down Expand Up @@ -129,7 +133,7 @@ public void shouldNotMissABeatUnderStress() throws Exception
// GIVEN
SimpleStageControl control = new SimpleStageControl();
int maxProcessorCount = 10;
try ( Step<Object> step = new ForkedProcessorStep<Object>( control, "Test", DEFAULT, maxProcessorCount )
try ( Step<Object> step = new ForkedProcessorStep<Object>( control, "Stress", DEFAULT, maxProcessorCount )
{
private boolean[] seen = new boolean[maxProcessorCount];

Expand Down Expand Up @@ -166,6 +170,21 @@ protected void process( Object batch, BatchSender sender ) throws Throwable
}
return null;
} );
// Thread doing unpark on all the processor threads, just to verify that it can handle spurious wakeups
t3.execute( ignore ->
{
while ( !step.isCompleted() )
{
for ( Thread thread : Thread.getAllStackTraces().keySet() )
{
if ( thread.getName().contains( "Stress-" ) )
{
LockSupport.unpark( thread );
}
}
}
return null;
} );

// WHEN
long endTime = currentTimeMillis() + SECONDS.toMillis( 1 );
Expand Down

0 comments on commit 7252edb

Please sign in to comment.