diff --git a/community/import-tool/src/test/java/org/neo4j/tooling/EntityDataGenerator.java b/community/import-tool/src/test/java/org/neo4j/tooling/EntityDataGenerator.java index da83720927eb6..da079055eda4e 100644 --- a/community/import-tool/src/test/java/org/neo4j/tooling/EntityDataGenerator.java +++ b/community/import-tool/src/test/java/org/neo4j/tooling/EntityDataGenerator.java @@ -84,7 +84,7 @@ public long position() public void close() { super.close(); - processing.shutdown( 0 ); + processing.shutdown(); } @Override diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/executor/DynamicTaskExecutor.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/executor/DynamicTaskExecutor.java index 0f503613e9604..0f7f6e8b03cc6 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/executor/DynamicTaskExecutor.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/executor/DynamicTaskExecutor.java @@ -48,7 +48,6 @@ public class DynamicTaskExecutor implements TaskExecutor @SuppressWarnings( "unchecked" ) private volatile Processor[] processors = (Processor[]) Array.newInstance( Processor.class, 0 ); private volatile boolean shutDown; - private volatile boolean abortQueued; private volatile Throwable panic; private final Supplier initialLocalState; private final int maxProcessorCount; @@ -142,16 +141,13 @@ public void submit( Task task ) @Override public void assertHealthy() { + if ( panic != null ) + { + throw new TaskExecutionPanicException( "Executor has been shut down in panic", panic ); + } if ( shutDown ) { - if ( panic != null ) - { - throw new TaskExecutionPanicException( "Executor has been shut down in panic", panic ); - } - if ( abortQueued ) - { - throw new TaskExecutionPanicException( "Executor has been shut down, aborting queued" ); - } + throw new IllegalStateException( "Executor has been shut down" ); } } @@ -164,30 +160,35 @@ private void notifyProcessors() } @Override - public synchronized void shutdown( int flags ) + public synchronized void shutdown() { if ( shutDown ) { - // We're already shut down, although take the ABORT_QUEUED flag seriously as to abort all - // processors still working on tasks. This looks like a panic. - if ( (flags & TaskExecutor.SF_ABORT_QUEUED) != 0 ) - { - this.abortQueued = true; - } return; } - boolean awaitAllCompleted = (flags & TaskExecutor.SF_AWAIT_ALL_COMPLETED) != 0; - while ( awaitAllCompleted && !queue.isEmpty() && panic == null /*all bets are off in the event of panic*/ ) + // Await all tasks to go into processing + while ( !queue.isEmpty() && panic == null /*all bets are off in the event of panic*/ ) { parkAWhile(); } - this.shutDown = true; - this.abortQueued = (flags & TaskExecutor.SF_ABORT_QUEUED) != 0; - while ( awaitAllCompleted && anyAlive() && panic == null /*all bets are off in the event of panic*/ ) + // Await all processing tasks to be completed + for ( Processor processor : processors ) + { + processor.processorShutDown = true; + } + while ( anyAlive() && panic == null /*all bets are off in the event of panic*/ ) { parkAWhile(); } + this.shutDown = true; + } + + @Override + public void panic( Throwable panic ) + { + this.panic = panic; + shutdown(); } @Override @@ -250,8 +251,7 @@ public void run() } catch ( Throwable e ) { - panic = e; - shutdown( TaskExecutor.SF_ABORT_QUEUED ); + panic( e ); throw launderedException( e ); } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/executor/TaskExecutor.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/executor/TaskExecutor.java index fcbacf79362b2..bd5cdd60156a2 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/executor/TaskExecutor.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/executor/TaskExecutor.java @@ -32,9 +32,6 @@ */ public interface TaskExecutor extends Parallelizable { - int SF_AWAIT_ALL_COMPLETED = 0x1; - int SF_ABORT_QUEUED = 0x2; - /** * Submits a task to be executed by one of the processors in this {@link TaskExecutor}. Tasks will be * executed in the order of which they arrive. @@ -46,14 +43,22 @@ public interface TaskExecutor extends Parallelizable /** * Shuts down this {@link TaskExecutor}, disallowing new tasks to be {@link #submit(Task) submitted}. * - * @param flags {@link #SF_AWAIT_ALL_COMPLETED} will wait for all queued or already executing tasks to be - * executed and completed, before returning from this method. {@link #SF_ABORT_QUEUED} will have - * submitted tasks which haven't started executing yet cancelled, never to be executed. + * submitted tasks will be processed before returning from this method. + */ + void shutdown(); + + /** + * Puts this executor into panic mode. Call to {@link #shutdown()} has no effect after a panic. + * Call to {@link #assertHealthy()} will communicate the panic as well. + * This semantically includes something like a shutdown, but submitted tasks which haven't started + * being processed will be aborted and currently processing tasks will not be awaited for completion. + * + * @param panic cause of panic. */ - void shutdown( int flags ); + void panic( Throwable panic ); /** - * @return {@code true} if {@link #shutdown(int)} has been called, otherwise {@code false}. + * @return {@code true} if {@link #shutdown()} has been called, otherwise {@code false}. */ boolean isShutdown(); @@ -61,8 +66,8 @@ public interface TaskExecutor extends Parallelizable * Asserts that this {@link TaskExecutor} is healthy. Useful to call when deciding to wait on a condition * this executor is expected to fulfill. * - * @throws RuntimeException of some sort if this executor is in a bad stage, the original error that - * made this executor fail. + * @throws RuntimeException of some sort if this executor is in a bad state, containing the original error, + * if any, that made this executor fail. */ void assertHealthy(); } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/ParallelInputEntityDeserializer.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/ParallelInputEntityDeserializer.java index cb531747641be..0523160bba170 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/ParallelInputEntityDeserializer.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/ParallelInputEntityDeserializer.java @@ -38,7 +38,6 @@ import org.neo4j.kernel.impl.util.collection.ContinuableArrayCursor; import org.neo4j.unsafe.impl.batchimport.InputIterator; import org.neo4j.unsafe.impl.batchimport.executor.TaskExecutionPanicException; -import org.neo4j.unsafe.impl.batchimport.executor.TaskExecutor; import org.neo4j.unsafe.impl.batchimport.input.InputEntity; import org.neo4j.unsafe.impl.batchimport.input.InputException; import org.neo4j.unsafe.impl.batchimport.input.InputNode; @@ -47,6 +46,7 @@ import org.neo4j.unsafe.impl.batchimport.staging.TicketedProcessing; import static org.neo4j.csv.reader.Source.singleChunk; +import static org.neo4j.helpers.ArrayUtil.array; import static org.neo4j.kernel.impl.util.Validators.emptyValidator; import static org.neo4j.unsafe.impl.batchimport.input.InputEntityDecorators.noDecorator; @@ -263,11 +263,7 @@ protected CharSeeker fetchNextOrNull() @Override public void close() { - // At this point normally the "slurp" above will have called shutdown and so calling close - // before that has completed signals some sort of panic. Encode this knowledge in flags passed - // to shutdown - int flags = processingCompletion.isDone() ? 0 : TaskExecutor.SF_ABORT_QUEUED; - processing.shutdown( flags ); + processing.shutdown(); try { source.close(); diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ProcessorStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ProcessorStep.java index ee67e16e6bfb5..358e5bc77bae3 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ProcessorStep.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ProcessorStep.java @@ -19,6 +19,7 @@ */ package org.neo4j.unsafe.impl.batchimport.staging; +import java.nio.channels.ShutdownChannelGroupException; import java.util.concurrent.atomic.AtomicLong; import java.util.function.LongPredicate; @@ -31,9 +32,8 @@ import static java.lang.System.nanoTime; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.neo4j.helpers.ArrayUtil.array; import static org.neo4j.unsafe.impl.batchimport.executor.DynamicTaskExecutor.DEFAULT_PARK_STRATEGY; -import static org.neo4j.unsafe.impl.batchimport.executor.TaskExecutor.SF_ABORT_QUEUED; -import static org.neo4j.unsafe.impl.batchimport.executor.TaskExecutor.SF_AWAIT_ALL_COMPLETED; import static org.neo4j.unsafe.impl.batchimport.staging.Processing.await; /** @@ -157,7 +157,14 @@ private void incrementQueue() public void close() throws Exception { super.close(); - executor.shutdown( panic == null ? SF_AWAIT_ALL_COMPLETED : SF_ABORT_QUEUED ); + if ( panic == null ) + { + executor.shutdown(); + } + else + { + executor.panic( panic ); + } } @Override diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/TicketedProcessing.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/TicketedProcessing.java index 16387d0fa2e97..fd649eeda181d 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/TicketedProcessing.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/TicketedProcessing.java @@ -30,9 +30,7 @@ import org.neo4j.unsafe.impl.batchimport.Parallelizable; import org.neo4j.unsafe.impl.batchimport.executor.DynamicTaskExecutor; import org.neo4j.unsafe.impl.batchimport.executor.ParkStrategy; -import org.neo4j.unsafe.impl.batchimport.executor.TaskExecutionPanicException; import org.neo4j.unsafe.impl.batchimport.executor.TaskExecutor; - import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.neo4j.helpers.FutureAdapter.future; @@ -92,7 +90,6 @@ public boolean test( long ticket ) }; private final Runnable healthCheck; private volatile boolean done; - private volatile Throwable slurpFailure; public TicketedProcessing( String name, int maxProcessors, BiFunction processor, Supplier threadLocalStateSupplier ) @@ -100,14 +97,7 @@ public TicketedProcessing( String name, int maxProcessors, BiFunction( 1, maxProcessors, maxProcessors, park, name, threadLocalStateSupplier ); - this.healthCheck = () -> - { - if ( slurpFailure != null ) - { - throw new TaskExecutionPanicException( "Failure adding jobs for processing", slurpFailure ); - } - executor.assertHealthy(); - }; + this.healthCheck = executor::assertHealthy; this.processed = new ArrayBlockingQueue<>( maxProcessors ); } @@ -150,18 +140,32 @@ public void submit( FROM job ) } ); } + /** + * Should be called after all calls to {@link #submit(Object)} and {@link #slurp(Iterator, boolean)} + * have been made. This is due to the async nature of this class where submitted jobs gets processed + * and put into result queue for pickup in {@link #next()}, which will not know when to return + * {@code null} (marking the end) otherwise. Then after all processed results have been retrieved + * a call to {@link #shutdown()} should be made. + */ + public void endOfSubmissions() + { + done = true; + } + /** * Essentially starting a thread {@link #submit(Object) submitting} a stream of inputs which will * each be processed and asynchronically made available in order of processing ticket by later calling * {@link #next()}. * + * Failure reading from {@code input} or otherwise submitting jobs will fail the processing after that point. + * * @param input {@link Iterator} of input to process. - * @param shutdownAfterAllSubmitted will call {@link #shutdown(int)} after all jobs submitted if {@code true}. + * @param lastSubmissions will call {@link #endOfSubmissions()} after all jobs submitted if {@code true}. * @return {@link Future} representing the work of submitting the inputs to be processed. When the future * is completed all items from the {@code input} {@link Iterator} have been submitted, but some items * may still be queued and processed. */ - public Future slurp( Iterator input, boolean shutdownAfterAllSubmitted ) + public Future slurp( Iterator input, boolean lastSubmissions ) { return future( () -> { @@ -171,38 +175,34 @@ public Future slurp( Iterator input, boolean shutdownAfterAllSubmitt { submit( input.next() ); } - if ( shutdownAfterAllSubmitted ) + if ( lastSubmissions ) { - shutdown( TaskExecutor.SF_AWAIT_ALL_COMPLETED ); + endOfSubmissions(); } return null; } catch ( Throwable e ) { - slurpFailure = e; - shutdown( TaskExecutor.SF_ABORT_QUEUED ); + executor.panic( e ); throw e; } } ); } /** - * Tells this processor that there will be no more submissions and so {@link #next()} will stop blocking, - * waiting for new processed results. - * - * @param flags see {@link TaskExecutor} + * Shuts down processing. Normally this method gets called after an arbitrary amount of + * {@link #submit(Object)} / {@link #slurp(Iterator, boolean)} concluded with {@link #endOfSubmissions()} + * and after all processed results have been {@link #next() retrieved}. + * Shutdown can be called before everything has been processed in which case it causes jobs to be aborted. */ - public void shutdown( int flags ) + public void shutdown() { - // This also marks the end of input - done = true; - - executor.shutdown( flags ); + executor.shutdown(); } /** - * @return next processed job (blocking call), or {@code null} if all jobs have been processed - * and {@link #shutdown(int)} has been called. + * @return next processed job (blocking call). If all submitted jobs have been processed and returned + * and {@link #endOfSubmissions()} have been called {@code null} will be returned. */ public TO next() { diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/executor/DynamicTaskExecutorTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/executor/DynamicTaskExecutorTest.java index b9bd26df00928..4d02df148864e 100644 --- a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/executor/DynamicTaskExecutorTest.java +++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/executor/DynamicTaskExecutorTest.java @@ -33,7 +33,6 @@ import org.neo4j.test.RepeatRule; import org.neo4j.test.RepeatRule.Repeat; import org.neo4j.unsafe.impl.batchimport.executor.ParkStrategy.Park; - import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -42,9 +41,6 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; -import static org.neo4j.unsafe.impl.batchimport.executor.TaskExecutor.SF_ABORT_QUEUED; -import static org.neo4j.unsafe.impl.batchimport.executor.TaskExecutor.SF_AWAIT_ALL_COMPLETED; - public class DynamicTaskExecutorTest { private static final Park PARK = new ParkStrategy.Park( 1, MILLISECONDS ); @@ -72,7 +68,7 @@ public void shouldExecuteTasksInParallel() throws Exception while ( task1.executed == 0 ) { // Busy loop } - executor.shutdown( SF_AWAIT_ALL_COMPLETED ); + executor.shutdown(); // THEN assertEquals( 1, task1.executed ); @@ -100,7 +96,7 @@ public void shouldIncrementNumberOfProcessorsWhenRunning() throws Exception while ( task1.executed == 0 ) { // Busy loop } - executor.shutdown( SF_AWAIT_ALL_COMPLETED ); + executor.shutdown(); // THEN assertEquals( 1, task1.executed ); @@ -132,7 +128,7 @@ public void shouldDecrementNumberOfProcessorsWhenRunning() throws Exception Thread.sleep( 200 ); // gosh, a Thread.sleep... assertEquals( 0, task4.executed ); task3.latch.finish(); - executor.shutdown( SF_AWAIT_ALL_COMPLETED ); + executor.shutdown(); // THEN assertEquals( 1, task1.executed ); @@ -154,7 +150,7 @@ public void shouldExecuteMultipleTasks() throws Exception { executor.submit( tasks[i] = new ExpensiveTask( 10 ) ); } - executor.shutdown( SF_AWAIT_ALL_COMPLETED ); + executor.shutdown(); // THEN for ( ExpensiveTask task : tasks ) @@ -208,7 +204,6 @@ public void shouldShutDownOnTaskFailureEvenIfOtherTasksArePending() throws Excep // THEN assertExceptionOnSubmit( executor, exception ); - executor.shutdown( SF_ABORT_QUEUED ); // call would block if the shutdown as part of failure doesn't complete properly } @Test @@ -264,7 +259,7 @@ public void shouldLetShutdownCompleteInEventOfPanic() throws Exception @Override public Void doWork( Void state ) throws Exception { - executor.shutdown( SF_AWAIT_ALL_COMPLETED ); + executor.shutdown(); return null; } } ); @@ -301,7 +296,7 @@ public void shouldRespectMaxProcessors() throws Exception assertEquals( 1, executor.processors( -2 ) ); assertEquals( 1, executor.processors( -2 ) ); assertEquals( 1, executor.processors( 0 ) ); - executor.shutdown( SF_AWAIT_ALL_COMPLETED ); + executor.shutdown(); } @Repeat( times = 100 ) @@ -311,7 +306,7 @@ public void shouldCopeWithConcurrentIncrementOfProcessorsAndShutdown() throws Th // GIVEN TaskExecutor executor = new DynamicTaskExecutor<>( 1, 2, 2, PARK, "test" ); Race race = new Race( true ); - race.addContestant( () -> executor.shutdown( SF_AWAIT_ALL_COMPLETED ) ); + race.addContestant( () -> executor.shutdown() ); race.addContestant( () -> executor.processors( 1 ) ); // WHEN diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/input/csv/ParallelInputEntityDeserializerTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/input/csv/ParallelInputEntityDeserializerTest.java index eb4bb27128fbb..18536f7f38176 100644 --- a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/input/csv/ParallelInputEntityDeserializerTest.java +++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/input/csv/ParallelInputEntityDeserializerTest.java @@ -29,7 +29,6 @@ import org.neo4j.csv.reader.CharReadable; import org.neo4j.kernel.impl.util.Validators; import org.neo4j.test.RandomRule; -import org.neo4j.unsafe.impl.batchimport.executor.TaskExecutionPanicException; import org.neo4j.unsafe.impl.batchimport.input.Collector; import org.neo4j.unsafe.impl.batchimport.input.Groups; import org.neo4j.unsafe.impl.batchimport.input.InputEntityDecorators; @@ -148,7 +147,7 @@ public int bufferSize() deserializer.next(); } } - catch ( TaskExecutionPanicException e ) + catch ( IllegalStateException e ) { // THEN it should be able to exit (this exception comes as a side effect) } diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/TicketedProcessingTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/TicketedProcessingTest.java index d5b93e2af602f..908044a843b36 100644 --- a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/TicketedProcessingTest.java +++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/TicketedProcessingTest.java @@ -32,8 +32,6 @@ import org.neo4j.unsafe.impl.batchimport.executor.ParkStrategy; import org.neo4j.unsafe.impl.batchimport.executor.TaskExecutionPanicException; import org.neo4j.test.OtherThreadRule; -import org.neo4j.unsafe.impl.batchimport.executor.TaskExecutor; - import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; @@ -90,7 +88,8 @@ public Void doWork( Void state ) throws Exception { processing.submit( i ); } - processing.shutdown( TaskExecutor.SF_AWAIT_ALL_COMPLETED ); + processing.endOfSubmissions(); + processing.shutdown(); // THEN assertions.get(); @@ -133,7 +132,7 @@ public Void doWork( Void state ) throws Exception assertEquals( 3, processing.next().intValue() ); // THEN - processing.shutdown( TaskExecutor.SF_AWAIT_ALL_COMPLETED ); + processing.shutdown(); } @Test( timeout = 10_000 )