Skip to content

Commit

Permalink
Can capture multiple panics in DynamicTaskExecutor
Browse files Browse the repository at this point in the history
by adding them as suppressed to the first one.
  • Loading branch information
tinwelint committed Nov 7, 2016
1 parent a0f7f47 commit 894abf1
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 6 deletions.
Expand Up @@ -24,6 +24,7 @@
import java.util.Arrays; import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier; import java.util.function.Supplier;


import org.neo4j.function.Suppliers; import org.neo4j.function.Suppliers;
Expand All @@ -48,7 +49,7 @@ public class DynamicTaskExecutor<LOCAL> implements TaskExecutor<LOCAL>
@SuppressWarnings( "unchecked" ) @SuppressWarnings( "unchecked" )
private volatile Processor[] processors = (Processor[]) Array.newInstance( Processor.class, 0 ); private volatile Processor[] processors = (Processor[]) Array.newInstance( Processor.class, 0 );
private volatile boolean shutDown; private volatile boolean shutDown;
private volatile Throwable panic; private final AtomicReference<Throwable> panic = new AtomicReference<>();
private final Supplier<LOCAL> initialLocalState; private final Supplier<LOCAL> initialLocalState;
private final int maxProcessorCount; private final int maxProcessorCount;


Expand Down Expand Up @@ -141,6 +142,7 @@ public void submit( Task<LOCAL> task )
@Override @Override
public void assertHealthy() public void assertHealthy()
{ {
Throwable panic = this.panic.get();
if ( panic != null ) if ( panic != null )
{ {
throw new TaskExecutionPanicException( "Executor has been shut down in panic", panic ); throw new TaskExecutionPanicException( "Executor has been shut down in panic", panic );
Expand Down Expand Up @@ -168,7 +170,7 @@ public synchronized void shutdown()
} }


// Await all tasks to go into processing // Await all tasks to go into processing
while ( !queue.isEmpty() && panic == null /*all bets are off in the event of panic*/ ) while ( !queue.isEmpty() && panic.get() == null /*all bets are off in the event of panic*/ )
{ {
parkAWhile(); parkAWhile();
} }
Expand All @@ -177,7 +179,7 @@ public synchronized void shutdown()
{ {
processor.processorShutDown = true; processor.processorShutDown = true;
} }
while ( anyAlive() && panic == null /*all bets are off in the event of panic*/ ) while ( anyAlive() && panic.get() == null /*all bets are off in the event of panic*/ )
{ {
parkAWhile(); parkAWhile();
} }
Expand All @@ -187,7 +189,13 @@ public synchronized void shutdown()
@Override @Override
public void panic( Throwable panic ) public void panic( Throwable panic )
{ {
this.panic = panic; if ( !this.panic.compareAndSet( null, panic ) )
{
// there was already a panic set, add this new one as suppressed
this.panic.get().addSuppressed( panic );
}
// else this was the first panic set

shutdown(); shutdown();
} }


Expand Down
Expand Up @@ -58,7 +58,8 @@ public interface TaskExecutor<LOCAL> extends Parallelizable
void panic( Throwable panic ); void panic( Throwable panic );


/** /**
* @return {@code true} if {@link #shutdown()} has been called, otherwise {@code false}. * @return {@code true} if {@link #shutdown()} or {@link #panic(Throwable)} has been called,
* otherwise {@code false}.
*/ */
boolean isShutdown(); boolean isShutdown();


Expand Down
Expand Up @@ -316,6 +316,33 @@ public void shouldCopeWithConcurrentIncrementOfProcessorsAndShutdown() throws Th
// shutdown() would hang, that's why we wait for 10 seconds here to cap it if there's an issue. // shutdown() would hang, that's why we wait for 10 seconds here to cap it if there's an issue.
} }


@Test
public void shouldAddConsecutivePanicsAsSuppressed() throws Exception
{
// GIVEN
TaskExecutor<Void> executor = new DynamicTaskExecutor<>( 1, 2, 2, PARK, "test" );
String firstMessage = "First";
String secondMessage = "Second";

// WHEN
executor.panic( new RuntimeException( firstMessage ) );
executor.panic( new RuntimeException( secondMessage ) );

// THEN
try
{
executor.submit( new EmptyTask() );
fail( "Should fail" );
}
catch ( TaskExecutionPanicException e )
{
Throwable first = e.getCause();
assertEquals( firstMessage, first.getMessage() );
assertEquals( 1, first.getSuppressed().length );
assertEquals( secondMessage, first.getSuppressed()[0].getMessage() );
}
}

private void assertExceptionOnSubmit( TaskExecutor<Void> executor, IOException exception ) private void assertExceptionOnSubmit( TaskExecutor<Void> executor, IOException exception )
{ {
Exception submitException = null; Exception submitException = null;
Expand Down
Expand Up @@ -89,10 +89,10 @@ public Void doWork( Void state ) throws Exception
processing.submit( i ); processing.submit( i );
} }
processing.endOfSubmissions(); processing.endOfSubmissions();
processing.shutdown();


// THEN // THEN
assertions.get(); assertions.get();
processing.shutdown();
} }


@Test @Test
Expand Down

0 comments on commit 894abf1

Please sign in to comment.