From bce60489937cf0d27c3d11a4fc4715721dada0b5 Mon Sep 17 00:00:00 2001 From: Mattias Persson Date: Sun, 30 Oct 2016 22:27:35 +0100 Subject: [PATCH] Simplified TaskExecutor/TicketedProcessing shutdown by removing shutdown flags and having an explicit panic(Throwable) method besides shutdown(). TicktedProcessing separates shutdown(flags) into shutdown() and endOfSubmissions() instead of conflating the two and using the previously available flags to work around and distinguish the two cases. This should have no observable functional difference looking at the current usages of TaskExecutor/TicketedProcessing, only that the code related to shutdown should be easier to reason about. --- .../neo4j/tooling/EntityDataGenerator.java | 2 +- .../executor/DynamicTaskExecutor.java | 46 +++++++-------- .../batchimport/executor/TaskExecutor.java | 25 +++++---- .../csv/ParallelInputEntityDeserializer.java | 8 +-- .../batchimport/staging/ProcessorStep.java | 13 ++++- .../staging/TicketedProcessing.java | 56 +++++++++---------- .../executor/DynamicTaskExecutorTest.java | 19 +++---- .../ParallelInputEntityDeserializerTest.java | 3 +- .../staging/TicketedProcessingTest.java | 7 +-- 9 files changed, 90 insertions(+), 89 deletions(-) diff --git a/community/import-tool/src/test/java/org/neo4j/tooling/EntityDataGenerator.java b/community/import-tool/src/test/java/org/neo4j/tooling/EntityDataGenerator.java index da83720927eb6..da079055eda4e 100644 --- a/community/import-tool/src/test/java/org/neo4j/tooling/EntityDataGenerator.java +++ b/community/import-tool/src/test/java/org/neo4j/tooling/EntityDataGenerator.java @@ -84,7 +84,7 @@ public long position() public void close() { super.close(); - processing.shutdown( 0 ); + processing.shutdown(); } @Override diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/executor/DynamicTaskExecutor.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/executor/DynamicTaskExecutor.java index 0f503613e9604..0f7f6e8b03cc6 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/executor/DynamicTaskExecutor.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/executor/DynamicTaskExecutor.java @@ -48,7 +48,6 @@ public class DynamicTaskExecutor implements TaskExecutor @SuppressWarnings( "unchecked" ) private volatile Processor[] processors = (Processor[]) Array.newInstance( Processor.class, 0 ); private volatile boolean shutDown; - private volatile boolean abortQueued; private volatile Throwable panic; private final Supplier initialLocalState; private final int maxProcessorCount; @@ -142,16 +141,13 @@ public void submit( Task task ) @Override public void assertHealthy() { + if ( panic != null ) + { + throw new TaskExecutionPanicException( "Executor has been shut down in panic", panic ); + } if ( shutDown ) { - if ( panic != null ) - { - throw new TaskExecutionPanicException( "Executor has been shut down in panic", panic ); - } - if ( abortQueued ) - { - throw new TaskExecutionPanicException( "Executor has been shut down, aborting queued" ); - } + throw new IllegalStateException( "Executor has been shut down" ); } } @@ -164,30 +160,35 @@ private void notifyProcessors() } @Override - public synchronized void shutdown( int flags ) + public synchronized void shutdown() { if ( shutDown ) { - // We're already shut down, although take the ABORT_QUEUED flag seriously as to abort all - // processors still working on tasks. This looks like a panic. - if ( (flags & TaskExecutor.SF_ABORT_QUEUED) != 0 ) - { - this.abortQueued = true; - } return; } - boolean awaitAllCompleted = (flags & TaskExecutor.SF_AWAIT_ALL_COMPLETED) != 0; - while ( awaitAllCompleted && !queue.isEmpty() && panic == null /*all bets are off in the event of panic*/ ) + // Await all tasks to go into processing + while ( !queue.isEmpty() && panic == null /*all bets are off in the event of panic*/ ) { parkAWhile(); } - this.shutDown = true; - this.abortQueued = (flags & TaskExecutor.SF_ABORT_QUEUED) != 0; - while ( awaitAllCompleted && anyAlive() && panic == null /*all bets are off in the event of panic*/ ) + // Await all processing tasks to be completed + for ( Processor processor : processors ) + { + processor.processorShutDown = true; + } + while ( anyAlive() && panic == null /*all bets are off in the event of panic*/ ) { parkAWhile(); } + this.shutDown = true; + } + + @Override + public void panic( Throwable panic ) + { + this.panic = panic; + shutdown(); } @Override @@ -250,8 +251,7 @@ public void run() } catch ( Throwable e ) { - panic = e; - shutdown( TaskExecutor.SF_ABORT_QUEUED ); + panic( e ); throw launderedException( e ); } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/executor/TaskExecutor.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/executor/TaskExecutor.java index fcbacf79362b2..bd5cdd60156a2 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/executor/TaskExecutor.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/executor/TaskExecutor.java @@ -32,9 +32,6 @@ */ public interface TaskExecutor extends Parallelizable { - int SF_AWAIT_ALL_COMPLETED = 0x1; - int SF_ABORT_QUEUED = 0x2; - /** * Submits a task to be executed by one of the processors in this {@link TaskExecutor}. Tasks will be * executed in the order of which they arrive. @@ -46,14 +43,22 @@ public interface TaskExecutor extends Parallelizable /** * Shuts down this {@link TaskExecutor}, disallowing new tasks to be {@link #submit(Task) submitted}. * - * @param flags {@link #SF_AWAIT_ALL_COMPLETED} will wait for all queued or already executing tasks to be - * executed and completed, before returning from this method. {@link #SF_ABORT_QUEUED} will have - * submitted tasks which haven't started executing yet cancelled, never to be executed. + * submitted tasks will be processed before returning from this method. + */ + void shutdown(); + + /** + * Puts this executor into panic mode. Call to {@link #shutdown()} has no effect after a panic. + * Call to {@link #assertHealthy()} will communicate the panic as well. + * This semantically includes something like a shutdown, but submitted tasks which haven't started + * being processed will be aborted and currently processing tasks will not be awaited for completion. + * + * @param panic cause of panic. */ - void shutdown( int flags ); + void panic( Throwable panic ); /** - * @return {@code true} if {@link #shutdown(int)} has been called, otherwise {@code false}. + * @return {@code true} if {@link #shutdown()} has been called, otherwise {@code false}. */ boolean isShutdown(); @@ -61,8 +66,8 @@ public interface TaskExecutor extends Parallelizable * Asserts that this {@link TaskExecutor} is healthy. Useful to call when deciding to wait on a condition * this executor is expected to fulfill. * - * @throws RuntimeException of some sort if this executor is in a bad stage, the original error that - * made this executor fail. + * @throws RuntimeException of some sort if this executor is in a bad state, containing the original error, + * if any, that made this executor fail. */ void assertHealthy(); } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/ParallelInputEntityDeserializer.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/ParallelInputEntityDeserializer.java index cb531747641be..0523160bba170 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/ParallelInputEntityDeserializer.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/ParallelInputEntityDeserializer.java @@ -38,7 +38,6 @@ import org.neo4j.kernel.impl.util.collection.ContinuableArrayCursor; import org.neo4j.unsafe.impl.batchimport.InputIterator; import org.neo4j.unsafe.impl.batchimport.executor.TaskExecutionPanicException; -import org.neo4j.unsafe.impl.batchimport.executor.TaskExecutor; import org.neo4j.unsafe.impl.batchimport.input.InputEntity; import org.neo4j.unsafe.impl.batchimport.input.InputException; import org.neo4j.unsafe.impl.batchimport.input.InputNode; @@ -47,6 +46,7 @@ import org.neo4j.unsafe.impl.batchimport.staging.TicketedProcessing; import static org.neo4j.csv.reader.Source.singleChunk; +import static org.neo4j.helpers.ArrayUtil.array; import static org.neo4j.kernel.impl.util.Validators.emptyValidator; import static org.neo4j.unsafe.impl.batchimport.input.InputEntityDecorators.noDecorator; @@ -263,11 +263,7 @@ protected CharSeeker fetchNextOrNull() @Override public void close() { - // At this point normally the "slurp" above will have called shutdown and so calling close - // before that has completed signals some sort of panic. Encode this knowledge in flags passed - // to shutdown - int flags = processingCompletion.isDone() ? 0 : TaskExecutor.SF_ABORT_QUEUED; - processing.shutdown( flags ); + processing.shutdown(); try { source.close(); diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ProcessorStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ProcessorStep.java index ee67e16e6bfb5..358e5bc77bae3 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ProcessorStep.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ProcessorStep.java @@ -19,6 +19,7 @@ */ package org.neo4j.unsafe.impl.batchimport.staging; +import java.nio.channels.ShutdownChannelGroupException; import java.util.concurrent.atomic.AtomicLong; import java.util.function.LongPredicate; @@ -31,9 +32,8 @@ import static java.lang.System.nanoTime; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.neo4j.helpers.ArrayUtil.array; import static org.neo4j.unsafe.impl.batchimport.executor.DynamicTaskExecutor.DEFAULT_PARK_STRATEGY; -import static org.neo4j.unsafe.impl.batchimport.executor.TaskExecutor.SF_ABORT_QUEUED; -import static org.neo4j.unsafe.impl.batchimport.executor.TaskExecutor.SF_AWAIT_ALL_COMPLETED; import static org.neo4j.unsafe.impl.batchimport.staging.Processing.await; /** @@ -157,7 +157,14 @@ private void incrementQueue() public void close() throws Exception { super.close(); - executor.shutdown( panic == null ? SF_AWAIT_ALL_COMPLETED : SF_ABORT_QUEUED ); + if ( panic == null ) + { + executor.shutdown(); + } + else + { + executor.panic( panic ); + } } @Override diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/TicketedProcessing.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/TicketedProcessing.java index 16387d0fa2e97..fd649eeda181d 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/TicketedProcessing.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/TicketedProcessing.java @@ -30,9 +30,7 @@ import org.neo4j.unsafe.impl.batchimport.Parallelizable; import org.neo4j.unsafe.impl.batchimport.executor.DynamicTaskExecutor; import org.neo4j.unsafe.impl.batchimport.executor.ParkStrategy; -import org.neo4j.unsafe.impl.batchimport.executor.TaskExecutionPanicException; import org.neo4j.unsafe.impl.batchimport.executor.TaskExecutor; - import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.neo4j.helpers.FutureAdapter.future; @@ -92,7 +90,6 @@ public boolean test( long ticket ) }; private final Runnable healthCheck; private volatile boolean done; - private volatile Throwable slurpFailure; public TicketedProcessing( String name, int maxProcessors, BiFunction processor, Supplier threadLocalStateSupplier ) @@ -100,14 +97,7 @@ public TicketedProcessing( String name, int maxProcessors, BiFunction( 1, maxProcessors, maxProcessors, park, name, threadLocalStateSupplier ); - this.healthCheck = () -> - { - if ( slurpFailure != null ) - { - throw new TaskExecutionPanicException( "Failure adding jobs for processing", slurpFailure ); - } - executor.assertHealthy(); - }; + this.healthCheck = executor::assertHealthy; this.processed = new ArrayBlockingQueue<>( maxProcessors ); } @@ -150,18 +140,32 @@ public void submit( FROM job ) } ); } + /** + * Should be called after all calls to {@link #submit(Object)} and {@link #slurp(Iterator, boolean)} + * have been made. This is due to the async nature of this class where submitted jobs gets processed + * and put into result queue for pickup in {@link #next()}, which will not know when to return + * {@code null} (marking the end) otherwise. Then after all processed results have been retrieved + * a call to {@link #shutdown()} should be made. + */ + public void endOfSubmissions() + { + done = true; + } + /** * Essentially starting a thread {@link #submit(Object) submitting} a stream of inputs which will * each be processed and asynchronically made available in order of processing ticket by later calling * {@link #next()}. * + * Failure reading from {@code input} or otherwise submitting jobs will fail the processing after that point. + * * @param input {@link Iterator} of input to process. - * @param shutdownAfterAllSubmitted will call {@link #shutdown(int)} after all jobs submitted if {@code true}. + * @param lastSubmissions will call {@link #endOfSubmissions()} after all jobs submitted if {@code true}. * @return {@link Future} representing the work of submitting the inputs to be processed. When the future * is completed all items from the {@code input} {@link Iterator} have been submitted, but some items * may still be queued and processed. */ - public Future slurp( Iterator input, boolean shutdownAfterAllSubmitted ) + public Future slurp( Iterator input, boolean lastSubmissions ) { return future( () -> { @@ -171,38 +175,34 @@ public Future slurp( Iterator input, boolean shutdownAfterAllSubmitt { submit( input.next() ); } - if ( shutdownAfterAllSubmitted ) + if ( lastSubmissions ) { - shutdown( TaskExecutor.SF_AWAIT_ALL_COMPLETED ); + endOfSubmissions(); } return null; } catch ( Throwable e ) { - slurpFailure = e; - shutdown( TaskExecutor.SF_ABORT_QUEUED ); + executor.panic( e ); throw e; } } ); } /** - * Tells this processor that there will be no more submissions and so {@link #next()} will stop blocking, - * waiting for new processed results. - * - * @param flags see {@link TaskExecutor} + * Shuts down processing. Normally this method gets called after an arbitrary amount of + * {@link #submit(Object)} / {@link #slurp(Iterator, boolean)} concluded with {@link #endOfSubmissions()} + * and after all processed results have been {@link #next() retrieved}. + * Shutdown can be called before everything has been processed in which case it causes jobs to be aborted. */ - public void shutdown( int flags ) + public void shutdown() { - // This also marks the end of input - done = true; - - executor.shutdown( flags ); + executor.shutdown(); } /** - * @return next processed job (blocking call), or {@code null} if all jobs have been processed - * and {@link #shutdown(int)} has been called. + * @return next processed job (blocking call). If all submitted jobs have been processed and returned + * and {@link #endOfSubmissions()} have been called {@code null} will be returned. */ public TO next() { diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/executor/DynamicTaskExecutorTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/executor/DynamicTaskExecutorTest.java index b9bd26df00928..4d02df148864e 100644 --- a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/executor/DynamicTaskExecutorTest.java +++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/executor/DynamicTaskExecutorTest.java @@ -33,7 +33,6 @@ import org.neo4j.test.RepeatRule; import org.neo4j.test.RepeatRule.Repeat; import org.neo4j.unsafe.impl.batchimport.executor.ParkStrategy.Park; - import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -42,9 +41,6 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; -import static org.neo4j.unsafe.impl.batchimport.executor.TaskExecutor.SF_ABORT_QUEUED; -import static org.neo4j.unsafe.impl.batchimport.executor.TaskExecutor.SF_AWAIT_ALL_COMPLETED; - public class DynamicTaskExecutorTest { private static final Park PARK = new ParkStrategy.Park( 1, MILLISECONDS ); @@ -72,7 +68,7 @@ public void shouldExecuteTasksInParallel() throws Exception while ( task1.executed == 0 ) { // Busy loop } - executor.shutdown( SF_AWAIT_ALL_COMPLETED ); + executor.shutdown(); // THEN assertEquals( 1, task1.executed ); @@ -100,7 +96,7 @@ public void shouldIncrementNumberOfProcessorsWhenRunning() throws Exception while ( task1.executed == 0 ) { // Busy loop } - executor.shutdown( SF_AWAIT_ALL_COMPLETED ); + executor.shutdown(); // THEN assertEquals( 1, task1.executed ); @@ -132,7 +128,7 @@ public void shouldDecrementNumberOfProcessorsWhenRunning() throws Exception Thread.sleep( 200 ); // gosh, a Thread.sleep... assertEquals( 0, task4.executed ); task3.latch.finish(); - executor.shutdown( SF_AWAIT_ALL_COMPLETED ); + executor.shutdown(); // THEN assertEquals( 1, task1.executed ); @@ -154,7 +150,7 @@ public void shouldExecuteMultipleTasks() throws Exception { executor.submit( tasks[i] = new ExpensiveTask( 10 ) ); } - executor.shutdown( SF_AWAIT_ALL_COMPLETED ); + executor.shutdown(); // THEN for ( ExpensiveTask task : tasks ) @@ -208,7 +204,6 @@ public void shouldShutDownOnTaskFailureEvenIfOtherTasksArePending() throws Excep // THEN assertExceptionOnSubmit( executor, exception ); - executor.shutdown( SF_ABORT_QUEUED ); // call would block if the shutdown as part of failure doesn't complete properly } @Test @@ -264,7 +259,7 @@ public void shouldLetShutdownCompleteInEventOfPanic() throws Exception @Override public Void doWork( Void state ) throws Exception { - executor.shutdown( SF_AWAIT_ALL_COMPLETED ); + executor.shutdown(); return null; } } ); @@ -301,7 +296,7 @@ public void shouldRespectMaxProcessors() throws Exception assertEquals( 1, executor.processors( -2 ) ); assertEquals( 1, executor.processors( -2 ) ); assertEquals( 1, executor.processors( 0 ) ); - executor.shutdown( SF_AWAIT_ALL_COMPLETED ); + executor.shutdown(); } @Repeat( times = 100 ) @@ -311,7 +306,7 @@ public void shouldCopeWithConcurrentIncrementOfProcessorsAndShutdown() throws Th // GIVEN TaskExecutor executor = new DynamicTaskExecutor<>( 1, 2, 2, PARK, "test" ); Race race = new Race( true ); - race.addContestant( () -> executor.shutdown( SF_AWAIT_ALL_COMPLETED ) ); + race.addContestant( () -> executor.shutdown() ); race.addContestant( () -> executor.processors( 1 ) ); // WHEN diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/input/csv/ParallelInputEntityDeserializerTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/input/csv/ParallelInputEntityDeserializerTest.java index eb4bb27128fbb..18536f7f38176 100644 --- a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/input/csv/ParallelInputEntityDeserializerTest.java +++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/input/csv/ParallelInputEntityDeserializerTest.java @@ -29,7 +29,6 @@ import org.neo4j.csv.reader.CharReadable; import org.neo4j.kernel.impl.util.Validators; import org.neo4j.test.RandomRule; -import org.neo4j.unsafe.impl.batchimport.executor.TaskExecutionPanicException; import org.neo4j.unsafe.impl.batchimport.input.Collector; import org.neo4j.unsafe.impl.batchimport.input.Groups; import org.neo4j.unsafe.impl.batchimport.input.InputEntityDecorators; @@ -148,7 +147,7 @@ public int bufferSize() deserializer.next(); } } - catch ( TaskExecutionPanicException e ) + catch ( IllegalStateException e ) { // THEN it should be able to exit (this exception comes as a side effect) } diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/TicketedProcessingTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/TicketedProcessingTest.java index d5b93e2af602f..908044a843b36 100644 --- a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/TicketedProcessingTest.java +++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/TicketedProcessingTest.java @@ -32,8 +32,6 @@ import org.neo4j.unsafe.impl.batchimport.executor.ParkStrategy; import org.neo4j.unsafe.impl.batchimport.executor.TaskExecutionPanicException; import org.neo4j.test.OtherThreadRule; -import org.neo4j.unsafe.impl.batchimport.executor.TaskExecutor; - import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; @@ -90,7 +88,8 @@ public Void doWork( Void state ) throws Exception { processing.submit( i ); } - processing.shutdown( TaskExecutor.SF_AWAIT_ALL_COMPLETED ); + processing.endOfSubmissions(); + processing.shutdown(); // THEN assertions.get(); @@ -133,7 +132,7 @@ public Void doWork( Void state ) throws Exception assertEquals( 3, processing.next().intValue() ); // THEN - processing.shutdown( TaskExecutor.SF_AWAIT_ALL_COMPLETED ); + processing.shutdown(); } @Test( timeout = 10_000 )