From 7252edb8c4eb6f2053582b709d613c8e24cebd91 Mon Sep 17 00:00:00 2001 From: Mattias Persson Date: Wed, 31 Aug 2016 09:17:44 +0200 Subject: [PATCH] Fixes an issue in ForkedProcessorStep with spurious wake-ups potentially hanging the processing, also tests for it. Cleaned up volatile variables and clarifies where correct memory barriers are. --- .../staging/ForkedProcessorStep.java | 43 +++++++++++++------ .../staging/ForkedProcessorStepTest.java | 21 ++++++++- 2 files changed, 50 insertions(+), 14 deletions(-) diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ForkedProcessorStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ForkedProcessorStep.java index 09a1a79b6c1a3..4312df0286bca 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ForkedProcessorStep.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ForkedProcessorStep.java @@ -40,6 +40,11 @@ */ public abstract class ForkedProcessorStep extends ProcessorStep { + // ID 0 is the id of a processor which is always present, no matter how many or few processors + // are assigned to process a batch. Therefore some tasks can be put on this processor, tasks + // which may affect the batches as a whole. + protected static final int MAIN = 0; + // used by forked processors to count down when they're done, so that main processing thread // knows when they're all done private final AtomicInteger doneSignal = new AtomicInteger(); @@ -48,15 +53,19 @@ public abstract class ForkedProcessorStep extends ProcessorStep // main processing thread communicates batch to process using this variable // it's not volatile, but piggy-backs on globalTicket for that private T currentBatch; - // this ticket helps coordinating with the forked processors - private long globalTicket; + // this ticket helps coordinating with the forked processors. It's checked by the forked processors + // and so acts as a useful memory barrier for other variables + private volatile long globalTicket; // processorCount can be changed asynchronically by calls to processors(int), although its // changes will only be applied between processing batches as to not interfere private volatile int processorCount = 1; - // forked processors can communicate errors via this variable + // forked processors can communicate errors via this variable. + // Doesn't need to be volatile - piggy-backs off of doneSignal access between submitter thread + // and the failing processor thread private Throwable error; - // forked processors can ping main process thread via this variable - private Thread ticketThread; + // represents the submitter thread which called process() method. Forked processor threads can + // ping/unpark submitter thread via this variable. Piggy-backs off of globalTicket/doneSignal. + private Thread submitterThread; protected ForkedProcessorStep( StageControl control, String name, Configuration config, int maxProcessors ) { @@ -79,15 +88,17 @@ protected void process( T batch, BatchSender sender ) throws Throwable { // Multiple processors, hand over the state to the processors and let them loose currentBatch = batch; - ticketThread = Thread.currentThread(); // so that forked processors can unpark - globalTicket++; - // ^^^ --- everything above this line will piggy-back on the volatility from the line below + submitterThread = Thread.currentThread(); // so that forked processors can unpark + // ^^^ --- everything above this line will piggy-back on the volatility from globalTicket doneSignal.set( processorCount ); + globalTicket++; notifyProcessors(); while ( doneSignal.get() > 0 ) { LockSupport.park(); } + // any write to "error" is now visible to us because of our check (and forked processor's write) + // to doneSignal if ( error != null ) { throw error; @@ -170,6 +181,7 @@ class ForkedProcessor extends Thread ForkedProcessor( int id ) { + super( name() + "-" + id ); this.id = id; this.localTicket = globalTicket; start(); @@ -180,15 +192,16 @@ public void run() { while ( !halted ) { + boolean processed = false; try { park(); if ( !halted && localTicket + 1 == globalTicket ) { - // ^^^ we just accessed volatile variable 'halted' and so the rest of the non-volatile - // variables will not be up to date for us + // ^^^ we just accessed volatile variable 'globalTicket' and so currentBatch and + // forkedProcessors will now be up to date for us + processed = true; forkedProcess( id, forkedProcessors.size(), currentBatch ); - localTicket++; } } catch ( Throwable t ) @@ -200,8 +213,12 @@ public void run() // ^^^ finish off with counting down doneSignal which serves two purposes: // - notifying the main submitter thread that we're done // - going through a volatile memory access to let our changes propagate - doneSignal.decrementAndGet(); - LockSupport.unpark( ticketThread ); + if ( processed ) + { + localTicket++; + doneSignal.decrementAndGet(); + LockSupport.unpark( submitterThread ); + } } } } diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/ForkedProcessorStepTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/ForkedProcessorStepTest.java index 10a30eb0c89d0..3b2b22b361748 100644 --- a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/ForkedProcessorStepTest.java +++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/ForkedProcessorStepTest.java @@ -25,6 +25,8 @@ import java.util.Arrays; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.LockSupport; + import org.neo4j.test.OtherThreadRule; import org.neo4j.test.RandomRule; import static org.junit.Assert.assertEquals; @@ -41,6 +43,8 @@ public class ForkedProcessorStepTest @Rule public final OtherThreadRule t2 = new OtherThreadRule<>(); @Rule + public final OtherThreadRule t3 = new OtherThreadRule<>(); + @Rule public final RandomRule random = new RandomRule(); @Test @@ -129,7 +133,7 @@ public void shouldNotMissABeatUnderStress() throws Exception // GIVEN SimpleStageControl control = new SimpleStageControl(); int maxProcessorCount = 10; - try ( Step step = new ForkedProcessorStep( control, "Test", DEFAULT, maxProcessorCount ) + try ( Step step = new ForkedProcessorStep( control, "Stress", DEFAULT, maxProcessorCount ) { private boolean[] seen = new boolean[maxProcessorCount]; @@ -166,6 +170,21 @@ protected void process( Object batch, BatchSender sender ) throws Throwable } return null; } ); + // Thread doing unpark on all the processor threads, just to verify that it can handle sprious wakeups + t3.execute( ignore -> + { + while ( !step.isCompleted() ) + { + for ( Thread thread : Thread.getAllStackTraces().keySet() ) + { + if ( thread.getName().contains( "Stress-" ) ) + { + LockSupport.unpark( thread ); + } + } + } + return null; + } ); // WHEN long endTime = currentTimeMillis() + SECONDS.toMillis( 1 );