Skip to content

Commit

Permalink
Fixes a race condition in ParallelInputEntityDeserializer
Browse files Browse the repository at this point in the history
where calling close() before everything was processed to completion should
(and now is) interpreted as panic and delegated to its executor as well.
Previously this would be delegated as a shutdown, which would wait for
everything to complete, whereas it probably wouldn't be.
  • Loading branch information
tinwelint committed Nov 7, 2016
1 parent 894abf1 commit d5fc190
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 5 deletions.
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
Expand Down Expand Up @@ -159,7 +160,12 @@ public Integer get( long timeout, TimeUnit unit ) throws InterruptedException, E

public static <T> Future<T> future( final Callable<T> task )
{
ExecutorService executor = Executors.newSingleThreadExecutor();
return future( task, Executors.defaultThreadFactory() );
}

public static <T> Future<T> future( final Callable<T> task, ThreadFactory factory )
{
ExecutorService executor = Executors.newSingleThreadExecutor( factory );
Future<T> future = executor.submit( task );
executor.shutdown();
return future;
Expand Down
Expand Up @@ -46,7 +46,6 @@
import org.neo4j.unsafe.impl.batchimport.staging.TicketedProcessing;

import static org.neo4j.csv.reader.Source.singleChunk;
import static org.neo4j.helpers.ArrayUtil.array;
import static org.neo4j.kernel.impl.util.Validators.emptyValidator;
import static org.neo4j.unsafe.impl.batchimport.input.InputEntityDecorators.noDecorator;

Expand Down Expand Up @@ -263,7 +262,14 @@ protected CharSeeker fetchNextOrNull()
@Override
public void close()
{
processing.shutdown();
if ( processingCompletion.isDone() )
{
processing.shutdown();
}
else
{
processing.panic( new IllegalStateException( "Processing not completed when closing, indicating panic" ) );
}
try
{
source.close();
Expand Down
Expand Up @@ -27,6 +27,7 @@
import java.util.function.LongPredicate;
import java.util.function.Supplier;

import org.neo4j.helpers.NamedThreadFactory;
import org.neo4j.unsafe.impl.batchimport.Parallelizable;
import org.neo4j.unsafe.impl.batchimport.executor.DynamicTaskExecutor;
import org.neo4j.unsafe.impl.batchimport.executor.ParkStrategy;
Expand Down Expand Up @@ -66,6 +67,7 @@ public class TicketedProcessing<FROM,STATE,TO> implements Parallelizable
{
private static final ParkStrategy park = new ParkStrategy.Park( 10, MILLISECONDS );

private final String name;
private final TaskExecutor<STATE> executor;
private final BiFunction<FROM,STATE,TO> processor;
private final ArrayBlockingQueue<TO> processed;
Expand Down Expand Up @@ -94,6 +96,7 @@ public boolean test( long ticket )
public TicketedProcessing( String name, int maxProcessors, BiFunction<FROM,STATE,TO> processor,
Supplier<STATE> threadLocalStateSupplier )
{
this.name = name;
this.processor = processor;
this.executor = new DynamicTaskExecutor<>( 1, maxProcessors, maxProcessors, park, name,
threadLocalStateSupplier );
Expand Down Expand Up @@ -186,7 +189,7 @@ public Future<Void> slurp( Iterator<FROM> input, boolean lastSubmissions )
executor.panic( e );
throw e;
}
} );
}, NamedThreadFactory.named( name + "-slurper" ) );
}

/**
Expand All @@ -200,6 +203,11 @@ public void shutdown()
executor.shutdown();
}

public void panic( Throwable panic )
{
executor.panic( panic );
}

/**
* @return next processed job (blocking call). If all submitted jobs have been processed and returned
* and {@link #endOfSubmissions()} have been called {@code null} will be returned.
Expand Down
Expand Up @@ -139,7 +139,7 @@ public int bufferSize()
// somewhere, anywhere in the importer. At that point there are still batches that have been
// processed and are there for the taking. One of the components in the hang scenario that we want
// to test comes from a processor in TicketedProcessing forever trying to offer its processed
// result to the result queue (where the loop didn't care if it had been forcefully shut down.
// result to the result queue, where the loop didn't care if it had been forcefully shut down.
// To get one of the processing threads into doing that we need to pull some of the already
// processed items so that it wants to go ahead and offer its result.
for ( int i = 0; i < 100 && deserializer.hasNext(); i++ )
Expand Down

0 comments on commit d5fc190

Please sign in to comment.