Skip to content

Commit

Permalink
Fixes a race between internal panic and shutdown in DynamicTaskExecutor
Browse files Browse the repository at this point in the history
Race could be observed as a call to DynamicTaskExecutor#shutdown(true)
hanging forever occasionally if there was a concurrent, already submitted
task, failing where the failing task would also like to shut down. This
would be a problem if there was only one processor currently, or rather
if all current processors failed at the same time. This would prevent
the original exception from propagating out to user.

Added a check in shutdown for a panic, which now can only be set by a
processor observing a failing task. The shutdown call would then exit
prematurely. For what TaskExecutor is used for, in import tool, this
behaviour is fine because the original exception will be thrown anyway by
means of the StageControl "panic propagation". To further alleviate this
problem ExecutorServiceStep was changed to pass in false (i.e. don't wait
for tasks to complete) in the event of panic.
  • Loading branch information
tinwelint committed Mar 17, 2015
1 parent 6071538 commit 11363b0
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 19 deletions.
Expand Up @@ -42,7 +42,7 @@ public class DynamicTaskExecutor implements TaskExecutor
private final String processorThreadNamePrefix; private final String processorThreadNamePrefix;
private volatile Processor[] processors = new Processor[0]; private volatile Processor[] processors = new Processor[0];
private volatile boolean shutDown; private volatile boolean shutDown;
private volatile Throwable shutDownCause; private volatile Throwable panic;


public DynamicTaskExecutor( int initialProcessorCount, int maxQueueSize, ParkStrategy parkStrategy, public DynamicTaskExecutor( int initialProcessorCount, int maxQueueSize, ParkStrategy parkStrategy,
String processorThreadNamePrefix ) String processorThreadNamePrefix )
Expand Down Expand Up @@ -125,8 +125,8 @@ public void assertHealthy()
if ( shutDown ) if ( shutDown )
{ {
String message = "Executor has been shut down"; String message = "Executor has been shut down";
throw shutDownCause != null throw panic != null
? new IllegalStateException( message, shutDownCause ) ? new IllegalStateException( message, panic )
: new IllegalStateException( message ); : new IllegalStateException( message );
} }
} }
Expand All @@ -140,29 +140,23 @@ private void notifyProcessors()
} }


@Override @Override
public void shutdown( boolean awaitAllCompleted ) public synchronized void shutdown( boolean awaitAllCompleted )
{
shutdown0( awaitAllCompleted, null );
}

private synchronized void shutdown0( boolean awaitAllCompleted, Throwable cause )
{ {
if ( shutDown ) if ( shutDown )
{ {
return; return;
} }


this.shutDownCause = cause;
this.shutDown = true; this.shutDown = true;
while ( awaitAllCompleted && !queue.isEmpty() ) while ( awaitAllCompleted && !queue.isEmpty() && panic == null /*all bets are off in the event of panic*/ )
{ {
parkAWhile(); parkAWhile();
} }
for ( Processor processor : processors ) for ( Processor processor : processors )
{ {
processor.shutDown = true; processor.shutDown = true;
} }
while ( awaitAllCompleted && anyAlive() ) while ( awaitAllCompleted && anyAlive() && panic == null /*all bets are off in the event of panic*/ )
{ {
parkAWhile(); parkAWhile();
} }
Expand Down Expand Up @@ -218,8 +212,8 @@ public void run()
} }
catch ( Throwable e ) catch ( Throwable e )
{ {
// TODO too defensive to shut down? panic = e;
shutdown0( false, e ); shutdown( false );
throw launderedException( e ); throw launderedException( e );
} }
} }
Expand Down
Expand Up @@ -46,7 +46,7 @@ public abstract class AbstractStep<T> implements Step<T>
@SuppressWarnings( "rawtypes" ) @SuppressWarnings( "rawtypes" )
private volatile Step downstream; private volatile Step downstream;
private volatile boolean endOfUpstream; private volatile boolean endOfUpstream;
private volatile Throwable panic; protected volatile Throwable panic;
private volatile boolean completed; private volatile boolean completed;
protected boolean orderedTickets; protected boolean orderedTickets;
protected final PrimitiveLongPredicate rightTicket = new PrimitiveLongPredicate() protected final PrimitiveLongPredicate rightTicket = new PrimitiveLongPredicate()
Expand Down
Expand Up @@ -148,7 +148,7 @@ private void receivedBatch()
public void close() public void close()
{ {
super.close(); super.close();
executor.shutdown( true ); executor.shutdown( panic == null );
} }


@Override @Override
Expand Down
Expand Up @@ -287,6 +287,18 @@ public String toString()
} }
return builder.toString(); return builder.toString();
} }

public boolean isAt( Class<?> clz, String method )
{
for ( StackTraceElement element : stackTrace )
{
if ( element.getClassName().equals( clz.getName() ) && element.getMethodName().equals( method ) )
{
return true;
}
}
return false;
}
} }


public Thread.State state() public Thread.State state()
Expand Down
Expand Up @@ -23,11 +23,14 @@


import java.io.IOException; import java.io.IOException;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future;


import org.neo4j.helpers.Exceptions; import org.neo4j.helpers.Exceptions;
import org.neo4j.kernel.impl.transaction.log.ParkStrategy; import org.neo4j.kernel.impl.transaction.log.ParkStrategy;
import org.neo4j.test.Barrier;
import org.neo4j.test.DoubleLatch; import org.neo4j.test.DoubleLatch;
import org.neo4j.test.OtherThreadExecutor;
import org.neo4j.test.OtherThreadExecutor.WorkerCommand;


import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
Expand Down Expand Up @@ -154,6 +157,7 @@ public void shouldShutDownOnTaskFailure() throws Exception
FailingTask task = new FailingTask( exception ); FailingTask task = new FailingTask( exception );
executor.submit( task ); executor.submit( task );
task.latch.await(); task.latch.await();
task.latch.release();


// THEN // THEN
assertExceptionOnSubmit( executor, exception ); assertExceptionOnSubmit( executor, exception );
Expand Down Expand Up @@ -181,6 +185,7 @@ public void shouldShutDownOnTaskFailureEvenIfOtherTasksArePending() throws Excep
// WHEN // WHEN
firstBlockingTask.latch.finish(); firstBlockingTask.latch.finish();
failingTask.latch.await(); failingTask.latch.await();
failingTask.latch.release();


// THEN // THEN
assertExceptionOnSubmit( executor, exception ); assertExceptionOnSubmit( executor, exception );
Expand All @@ -198,6 +203,7 @@ public void shouldSurfaceTaskErrorInAssertHealthy() throws Exception
FailingTask failingTask = new FailingTask( exception ); FailingTask failingTask = new FailingTask( exception );
executor.submit( failingTask ); executor.submit( failingTask );
failingTask.latch.await(); failingTask.latch.await();
failingTask.latch.release();


// WHEN // WHEN
for ( int i = 0; i < 5; i++ ) for ( int i = 0; i < 5; i++ )
Expand All @@ -217,6 +223,45 @@ public void shouldSurfaceTaskErrorInAssertHealthy() throws Exception
fail( "Should not be considered healthy after failing task" ); fail( "Should not be considered healthy after failing task" );
} }


@Test
public void shouldLetShutdownCompleteInEventOfPanic() throws Exception
{
// GIVEN
final TaskExecutor executor = new DynamicTaskExecutor( 2, 10, new ParkStrategy.Park( 1 ), getClass().getSimpleName() );
IOException exception = new IOException( "Failure" );

// WHEN
FailingTask failingTask = new FailingTask( exception );
executor.submit( failingTask );
failingTask.latch.await();

// WHEN
try ( OtherThreadExecutor<Void> closer = new OtherThreadExecutor<>( "closer", null ) )
{
Future<Void> shutdown = closer.executeDontWait( new WorkerCommand<Void,Void>()
{
@Override
public Void doWork( Void state ) throws Exception
{
executor.shutdown( true );
return null;
}
} );
while ( !closer.waitUntilWaiting().isAt( DynamicTaskExecutor.class, "shutdown" ) )
{
Thread.sleep( 10 );
}

// Here we've got a shutdown call stuck awaiting queue to be empty (since true was passed in)
// at the same time we've got a FailingTask ready to throw its exception and another task
// sitting in the queue after it. Now make the task throw that exception.
failingTask.latch.release();

// Some time after throwing this, the shutdown request should have been completed.
shutdown.get();
}
}

private void assertExceptionOnSubmit( TaskExecutor executor, IOException exception ) private void assertExceptionOnSubmit( TaskExecutor executor, IOException exception )
{ {
Exception submitException = null; Exception submitException = null;
Expand Down Expand Up @@ -260,7 +305,7 @@ public Void call() throws Exception
private static class FailingTask implements Callable<Void> private static class FailingTask implements Callable<Void>
{ {
private final Exception exception; private final Exception exception;
private final CountDownLatch latch = new CountDownLatch( 1 ); private final Barrier.Control latch = new Barrier.Control();


public FailingTask( Exception exception ) public FailingTask( Exception exception )
{ {
Expand All @@ -276,7 +321,7 @@ public Void call() throws Exception
} }
finally finally
{ {
latch.countDown(); latch.reached();
} }
} }
} }
Expand Down

0 comments on commit 11363b0

Please sign in to comment.