From d5fc190b8b3c365b4762f1669ea0997a7fff7a23 Mon Sep 17 00:00:00 2001 From: Mattias Persson Date: Mon, 7 Nov 2016 10:38:18 +0100 Subject: [PATCH] Fixes a race condition in ParallelInputEntityDeserializer where calling close() before everything was processed to completion should (and now is) interpreted as panic and delegated to its executor as well. Previously this would be delegated as a shutdown, which would wait for everything to complete, whereas it probably wouldn't be. --- .../src/main/java/org/neo4j/helpers/FutureAdapter.java | 8 +++++++- .../input/csv/ParallelInputEntityDeserializer.java | 10 ++++++++-- .../impl/batchimport/staging/TicketedProcessing.java | 10 +++++++++- .../input/csv/ParallelInputEntityDeserializerTest.java | 2 +- 4 files changed, 25 insertions(+), 5 deletions(-) diff --git a/community/kernel/src/main/java/org/neo4j/helpers/FutureAdapter.java b/community/kernel/src/main/java/org/neo4j/helpers/FutureAdapter.java index cb0c8649aca51..c2445b47997e8 100644 --- a/community/kernel/src/main/java/org/neo4j/helpers/FutureAdapter.java +++ b/community/kernel/src/main/java/org/neo4j/helpers/FutureAdapter.java @@ -25,6 +25,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.Supplier; @@ -159,7 +160,12 @@ public Integer get( long timeout, TimeUnit unit ) throws InterruptedException, E public static Future future( final Callable task ) { - ExecutorService executor = Executors.newSingleThreadExecutor(); + return future( task, Executors.defaultThreadFactory() ); + } + + public static Future future( final Callable task, ThreadFactory factory ) + { + ExecutorService executor = Executors.newSingleThreadExecutor( factory ); Future future = executor.submit( task ); executor.shutdown(); return future; diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/ParallelInputEntityDeserializer.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/ParallelInputEntityDeserializer.java index 0523160bba170..3b9c1583a42b7 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/ParallelInputEntityDeserializer.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/ParallelInputEntityDeserializer.java @@ -46,7 +46,6 @@ import org.neo4j.unsafe.impl.batchimport.staging.TicketedProcessing; import static org.neo4j.csv.reader.Source.singleChunk; -import static org.neo4j.helpers.ArrayUtil.array; import static org.neo4j.kernel.impl.util.Validators.emptyValidator; import static org.neo4j.unsafe.impl.batchimport.input.InputEntityDecorators.noDecorator; @@ -263,7 +262,14 @@ protected CharSeeker fetchNextOrNull() @Override public void close() { - processing.shutdown(); + if ( processingCompletion.isDone() ) + { + processing.shutdown(); + } + else + { + processing.panic( new IllegalStateException( "Processing not completed when closing, indicating panic" ) ); + } try { source.close(); diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/TicketedProcessing.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/TicketedProcessing.java index fd649eeda181d..8c81a71445c1e 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/TicketedProcessing.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/TicketedProcessing.java @@ -27,6 +27,7 @@ import java.util.function.LongPredicate; import java.util.function.Supplier; +import org.neo4j.helpers.NamedThreadFactory; import org.neo4j.unsafe.impl.batchimport.Parallelizable; import org.neo4j.unsafe.impl.batchimport.executor.DynamicTaskExecutor; import org.neo4j.unsafe.impl.batchimport.executor.ParkStrategy; @@ -66,6 +67,7 @@ public class TicketedProcessing implements Parallelizable { private static final ParkStrategy park = new ParkStrategy.Park( 10, MILLISECONDS ); + private final String name; private final TaskExecutor executor; private final BiFunction processor; private final ArrayBlockingQueue processed; @@ -94,6 +96,7 @@ public boolean test( long ticket ) public TicketedProcessing( String name, int maxProcessors, BiFunction processor, Supplier threadLocalStateSupplier ) { + this.name = name; this.processor = processor; this.executor = new DynamicTaskExecutor<>( 1, maxProcessors, maxProcessors, park, name, threadLocalStateSupplier ); @@ -186,7 +189,7 @@ public Future slurp( Iterator input, boolean lastSubmissions ) executor.panic( e ); throw e; } - } ); + }, NamedThreadFactory.named( name + "-slurper" ) ); } /** @@ -200,6 +203,11 @@ public void shutdown() executor.shutdown(); } + public void panic( Throwable panic ) + { + executor.panic( panic ); + } + /** * @return next processed job (blocking call). If all submitted jobs have been processed and returned * and {@link #endOfSubmissions()} have been called {@code null} will be returned. diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/input/csv/ParallelInputEntityDeserializerTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/input/csv/ParallelInputEntityDeserializerTest.java index 18536f7f38176..2f20b744efa87 100644 --- a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/input/csv/ParallelInputEntityDeserializerTest.java +++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/input/csv/ParallelInputEntityDeserializerTest.java @@ -139,7 +139,7 @@ public int bufferSize() // somewhere, anywhere in the importer. At that point there are still batches that have been // processed and are there for the taking. One of the components in the hang scenario that we want // to test comes from a processor in TicketedProcessing forever trying to offer its processed - // result to the result queue (where the loop didn't care if it had been forcefully shut down. + // result to the result queue, where the loop didn't care if it had been forcefully shut down. // To get one of the processing threads into doing that we need to pull some of the already // processed items so that it wants to go ahead and offer its result. for ( int i = 0; i < 100 && deserializer.hasNext(); i++ )