From 894abf1a348142689bb8b418e4cf7ae88861c4e5 Mon Sep 17 00:00:00 2001 From: Mattias Persson Date: Tue, 1 Nov 2016 09:43:32 +0100 Subject: [PATCH] Can capture multiple panics in DynamicTaskExecutor by adding them as suppressed to the first one. --- .../executor/DynamicTaskExecutor.java | 16 ++++++++--- .../batchimport/executor/TaskExecutor.java | 3 ++- .../executor/DynamicTaskExecutorTest.java | 27 +++++++++++++++++++ .../staging/TicketedProcessingTest.java | 2 +- 4 files changed, 42 insertions(+), 6 deletions(-) diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/executor/DynamicTaskExecutor.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/executor/DynamicTaskExecutor.java index 0f7f6e8b03cc6..def7aa55d7f68 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/executor/DynamicTaskExecutor.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/executor/DynamicTaskExecutor.java @@ -24,6 +24,7 @@ import java.util.Arrays; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; import org.neo4j.function.Suppliers; @@ -48,7 +49,7 @@ public class DynamicTaskExecutor implements TaskExecutor @SuppressWarnings( "unchecked" ) private volatile Processor[] processors = (Processor[]) Array.newInstance( Processor.class, 0 ); private volatile boolean shutDown; - private volatile Throwable panic; + private final AtomicReference panic = new AtomicReference<>(); private final Supplier initialLocalState; private final int maxProcessorCount; @@ -141,6 +142,7 @@ public void submit( Task task ) @Override public void assertHealthy() { + Throwable panic = this.panic.get(); if ( panic != null ) { throw new TaskExecutionPanicException( "Executor has been shut down in panic", panic ); @@ -168,7 +170,7 @@ public synchronized void shutdown() } // Await all tasks to go into processing - while ( !queue.isEmpty() && panic == null /*all bets are off in the event of panic*/ ) + while ( !queue.isEmpty() && panic.get() == null /*all bets are off in the event of panic*/ ) { parkAWhile(); } @@ -177,7 +179,7 @@ public synchronized void shutdown() { processor.processorShutDown = true; } - while ( anyAlive() && panic == null /*all bets are off in the event of panic*/ ) + while ( anyAlive() && panic.get() == null /*all bets are off in the event of panic*/ ) { parkAWhile(); } @@ -187,7 +189,13 @@ public synchronized void shutdown() @Override public void panic( Throwable panic ) { - this.panic = panic; + if ( !this.panic.compareAndSet( null, panic ) ) + { + // there was already a panic set, add this new one as suppressed + this.panic.get().addSuppressed( panic ); + } + // else this was the first panic set + shutdown(); } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/executor/TaskExecutor.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/executor/TaskExecutor.java index bd5cdd60156a2..348b8a886894f 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/executor/TaskExecutor.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/executor/TaskExecutor.java @@ -58,7 +58,8 @@ public interface TaskExecutor extends Parallelizable void panic( Throwable panic ); /** - * @return {@code true} if {@link #shutdown()} has been called, otherwise {@code false}. + * @return {@code true} if {@link #shutdown()} or {@link #panic(Throwable)} has been called, + * otherwise {@code false}. */ boolean isShutdown(); diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/executor/DynamicTaskExecutorTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/executor/DynamicTaskExecutorTest.java index 4d02df148864e..d6dd9956fdf0b 100644 --- a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/executor/DynamicTaskExecutorTest.java +++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/executor/DynamicTaskExecutorTest.java @@ -316,6 +316,33 @@ public void shouldCopeWithConcurrentIncrementOfProcessorsAndShutdown() throws Th // shutdown() would hang, that's why we wait for 10 seconds here to cap it if there's an issue. } + @Test + public void shouldAddConsecutivePanicsAsSuppressed() throws Exception + { + // GIVEN + TaskExecutor executor = new DynamicTaskExecutor<>( 1, 2, 2, PARK, "test" ); + String firstMessage = "First"; + String secondMessage = "Second"; + + // WHEN + executor.panic( new RuntimeException( firstMessage ) ); + executor.panic( new RuntimeException( secondMessage ) ); + + // THEN + try + { + executor.submit( new EmptyTask() ); + fail( "Should fail" ); + } + catch ( TaskExecutionPanicException e ) + { + Throwable first = e.getCause(); + assertEquals( firstMessage, first.getMessage() ); + assertEquals( 1, first.getSuppressed().length ); + assertEquals( secondMessage, first.getSuppressed()[0].getMessage() ); + } + } + private void assertExceptionOnSubmit( TaskExecutor executor, IOException exception ) { Exception submitException = null; diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/TicketedProcessingTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/TicketedProcessingTest.java index 908044a843b36..f5e5f322fbaab 100644 --- a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/TicketedProcessingTest.java +++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/TicketedProcessingTest.java @@ -89,10 +89,10 @@ public Void doWork( Void state ) throws Exception processing.submit( i ); } processing.endOfSubmissions(); - processing.shutdown(); // THEN assertions.get(); + processing.shutdown(); } @Test