Skip to content

Commit

Permalink
Merge pull request #8198 from tinwelint/3.0-pbi-par-input-panic-hang-fix
Browse files Browse the repository at this point in the history
Fixes importer issue which could hang shutdown during panic
  • Loading branch information
burqen committed Nov 1, 2016
2 parents 844933c + 6aa4216 commit cbe280b
Show file tree
Hide file tree
Showing 9 changed files with 233 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public long position()
public void close()
{
super.close();
processing.shutdown( false );
processing.shutdown();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

import org.neo4j.function.Suppliers;
Expand All @@ -48,8 +49,7 @@ public class DynamicTaskExecutor<LOCAL> implements TaskExecutor<LOCAL>
@SuppressWarnings( "unchecked" )
private volatile Processor[] processors = (Processor[]) Array.newInstance( Processor.class, 0 );
private volatile boolean shutDown;
private volatile boolean abortQueued;
private volatile Throwable panic;
private final AtomicReference<Throwable> panic = new AtomicReference<>();
private final Supplier<LOCAL> initialLocalState;
private final int maxProcessorCount;

Expand Down Expand Up @@ -142,16 +142,14 @@ public void submit( Task<LOCAL> task )
@Override
public void assertHealthy()
{
Throwable panic = this.panic.get();
if ( panic != null )
{
throw new TaskExecutionPanicException( "Executor has been shut down in panic", panic );
}
if ( shutDown )
{
if ( panic != null )
{
throw new TaskExecutionPanicException( "Executor has been shut down in panic", panic );
}
if ( abortQueued )
{
throw new TaskExecutionPanicException( "Executor has been shut down, aborting queued" );
}
throw new IllegalStateException( "Executor has been shut down" );
}
}

Expand All @@ -164,24 +162,41 @@ private void notifyProcessors()
}

@Override
public synchronized void shutdown( int flags )
public synchronized void shutdown()
{
if ( shutDown )
{
return;
}

boolean awaitAllCompleted = (flags & TaskExecutor.SF_AWAIT_ALL_COMPLETED) != 0;
while ( awaitAllCompleted && !queue.isEmpty() && panic == null /*all bets are off in the event of panic*/ )
// Await all tasks to go into processing
while ( !queue.isEmpty() && panic.get() == null /*all bets are off in the event of panic*/ )
{
parkAWhile();
}
this.shutDown = true;
this.abortQueued = (flags & TaskExecutor.SF_ABORT_QUEUED) != 0;
while ( awaitAllCompleted && anyAlive() && panic == null /*all bets are off in the event of panic*/ )
// Await all processing tasks to be completed
for ( Processor processor : processors )
{
processor.processorShutDown = true;
}
while ( anyAlive() && panic.get() == null /*all bets are off in the event of panic*/ )
{
parkAWhile();
}
this.shutDown = true;
}

@Override
public void panic( Throwable panic )
{
if ( !this.panic.compareAndSet( null, panic ) )
{
// there was already a panic set, add this new one as suppressed
this.panic.get().addSuppressed( panic );
}
// else this was the first panic set

shutdown();
}

@Override
Expand Down Expand Up @@ -244,8 +259,7 @@ public void run()
}
catch ( Throwable e )
{
panic = e;
shutdown( TaskExecutor.SF_ABORT_QUEUED );
panic( e );
throw launderedException( e );
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,6 @@
*/
public interface TaskExecutor<LOCAL> extends Parallelizable
{
int SF_AWAIT_ALL_COMPLETED = 0x1;
int SF_ABORT_QUEUED = 0x2;

/**
* Submits a task to be executed by one of the processors in this {@link TaskExecutor}. Tasks will be
* executed in the order of which they arrive.
Expand All @@ -46,23 +43,32 @@ public interface TaskExecutor<LOCAL> extends Parallelizable
/**
* Shuts down this {@link TaskExecutor}, disallowing new tasks to be {@link #submit(Task) submitted}.
*
* @param flags {@link #SF_AWAIT_ALL_COMPLETED} will wait for all queued or already executing tasks to be
* executed and completed, before returning from this method. {@link #SF_ABORT_QUEUED} will have
* submitted tasks which haven't started executing yet cancelled, never to be executed.
* submitted tasks will be processed before returning from this method.
*/
void shutdown();

/**
* Puts this executor into panic mode. Call to {@link #shutdown()} has no effect after a panic.
* Call to {@link #assertHealthy()} will communicate the panic as well.
* This semantically includes something like a shutdown, but submitted tasks which haven't started
* being processed will be aborted and currently processing tasks will not be awaited for completion.
*
* @param panic cause of panic.
*/
void shutdown( int flags );
void panic( Throwable panic );

/**
* @return {@code true} if {@link #shutdown(int)} has been called, otherwise {@code false}.
* @return {@code true} if {@link #shutdown()} or {@link #panic(Throwable)} has been called,
* otherwise {@code false}.
*/
boolean isShutdown();

/**
* Asserts that this {@link TaskExecutor} is healthy. Useful to call when deciding to wait on a condition
* this executor is expected to fulfill.
*
* @throws RuntimeException of some sort if this executor is in a bad stage, the original error that
* made this executor fail.
* @throws RuntimeException of some sort if this executor is in a bad state, containing the original error,
* if any, that made this executor fail.
*/
void assertHealthy();
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Future;
import java.util.function.Supplier;

import org.neo4j.csv.reader.BufferedCharSeeker;
Expand All @@ -45,6 +46,7 @@
import org.neo4j.unsafe.impl.batchimport.staging.TicketedProcessing;

import static org.neo4j.csv.reader.Source.singleChunk;
import static org.neo4j.helpers.ArrayUtil.array;
import static org.neo4j.kernel.impl.util.Validators.emptyValidator;
import static org.neo4j.unsafe.impl.batchimport.input.InputEntityDecorators.noDecorator;

Expand All @@ -65,6 +67,7 @@ public class ParallelInputEntityDeserializer<ENTITY extends InputEntity> extends
private final TicketedProcessing<CharSeeker,Header,ENTITY[]> processing;
private final ContinuableArrayCursor<ENTITY> cursor;
private SourceTraceability last = SourceTraceability.EMPTY;
private final Future<Void> processingCompletion;

@SuppressWarnings( "unchecked" )
public ParallelInputEntityDeserializer( Data<ENTITY> data, Header.Factory headerFactory, Configuration config,
Expand Down Expand Up @@ -119,7 +122,7 @@ public ParallelInputEntityDeserializer( Data<ENTITY> data, Header.Factory header
cursor = new ContinuableArrayCursor<>( batchSupplier );

// Start an asynchronous slurp of the chunks fed directly into the processors
processing.slurp( seekers( firstSeeker, source, config ), true );
processingCompletion = processing.slurp( seekers( firstSeeker, source, config ), true );
}
catch ( IOException e )
{
Expand Down Expand Up @@ -260,7 +263,7 @@ protected CharSeeker fetchNextOrNull()
@Override
public void close()
{
processing.shutdown( true );
processing.shutdown();
try
{
source.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
*/
package org.neo4j.unsafe.impl.batchimport.staging;

import java.nio.channels.ShutdownChannelGroupException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongPredicate;

Expand All @@ -31,9 +32,8 @@
import static java.lang.System.nanoTime;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

import static org.neo4j.helpers.ArrayUtil.array;
import static org.neo4j.unsafe.impl.batchimport.executor.DynamicTaskExecutor.DEFAULT_PARK_STRATEGY;
import static org.neo4j.unsafe.impl.batchimport.executor.TaskExecutor.SF_ABORT_QUEUED;
import static org.neo4j.unsafe.impl.batchimport.executor.TaskExecutor.SF_AWAIT_ALL_COMPLETED;
import static org.neo4j.unsafe.impl.batchimport.staging.Processing.await;

/**
Expand Down Expand Up @@ -157,7 +157,14 @@ private void incrementQueue()
public void close() throws Exception
{
super.close();
executor.shutdown( panic == null ? SF_AWAIT_ALL_COMPLETED : SF_ABORT_QUEUED );
if ( panic == null )
{
executor.shutdown();
}
else
{
executor.panic( panic );
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.neo4j.unsafe.impl.batchimport.executor.DynamicTaskExecutor;
import org.neo4j.unsafe.impl.batchimport.executor.ParkStrategy;
import org.neo4j.unsafe.impl.batchimport.executor.TaskExecutor;

import static java.util.concurrent.TimeUnit.MILLISECONDS;

import static org.neo4j.helpers.FutureAdapter.future;
Expand Down Expand Up @@ -128,56 +127,82 @@ public void submit( FROM job )
await( myTurnToAddToProcessedQueue, ticket, healthCheck, park );

// OK now it's my turn to add this result to the result queue which user will pull from later on
while ( !processed.offer( result, 10, MILLISECONDS ) );
boolean offered;
do
{
healthCheck.run();
offered = processed.offer( result, 10, MILLISECONDS );
}
while ( !offered );

// Signal that this ticket has been processed and added to the result queue
processedTicket.incrementAndGet();
} );
}

/**
* Should be called after all calls to {@link #submit(Object)} and {@link #slurp(Iterator, boolean)}
* have been made. This is due to the async nature of this class where submitted jobs gets processed
* and put into result queue for pickup in {@link #next()}, which will not know when to return
* {@code null} (marking the end) otherwise. Then after all processed results have been retrieved
* a call to {@link #shutdown()} should be made.
*/
public void endOfSubmissions()
{
done = true;
}

/**
* Essentially starting a thread {@link #submit(Object) submitting} a stream of inputs which will
* each be processed and asynchronically made available in order of processing ticket by later calling
* {@link #next()}.
*
* Failure reading from {@code input} or otherwise submitting jobs will fail the processing after that point.
*
* @param input {@link Iterator} of input to process.
* @param shutdownAfterAllSubmitted will call {@link #shutdown(boolean)} after all jobs submitted if {@code true}.
* @param lastSubmissions will call {@link #endOfSubmissions()} after all jobs submitted if {@code true}.
* @return {@link Future} representing the work of submitting the inputs to be processed. When the future
* is completed all items from the {@code input} {@link Iterator} have been submitted, but some items
* may still be queued and processed.
*/
public Future<Void> slurp( Iterator<FROM> input, boolean shutdownAfterAllSubmitted )
public Future<Void> slurp( Iterator<FROM> input, boolean lastSubmissions )
{
return future( () ->
{
while ( input.hasNext() )
try
{
submit( input.next() );
while ( input.hasNext() )
{
submit( input.next() );
}
if ( lastSubmissions )
{
endOfSubmissions();
}
return null;
}
if ( shutdownAfterAllSubmitted )
catch ( Throwable e )
{
shutdown( true );
executor.panic( e );
throw e;
}
return null;
} );
}

/**
* Tells this processor that there will be no more submissions and so {@link #next()} will stop blocking,
* waiting for new processed results.
*
* @param awaitAllProcessed if {@code true} will block until all submitted jobs have been processed,
* otherwise if {@code false} will return immediately, where processing will still commence and complete.
* Shuts down processing. Normally this method gets called after an arbitrary amount of
* {@link #submit(Object)} / {@link #slurp(Iterator, boolean)} concluded with {@link #endOfSubmissions()}
* and after all processed results have been {@link #next() retrieved}.
* Shutdown can be called before everything has been processed in which case it causes jobs to be aborted.
*/
public void shutdown( boolean awaitAllProcessed )
public void shutdown()
{
done = true;
executor.shutdown( awaitAllProcessed ? TaskExecutor.SF_AWAIT_ALL_COMPLETED : 0 );
executor.shutdown();
}

/**
* @return next processed job (blocking call), or {@code null} if all jobs have been processed
* and {@link #shutdown(boolean)} has been called.
* @return next processed job (blocking call). If all submitted jobs have been processed and returned
* and {@link #endOfSubmissions()} have been called {@code null} will be returned.
*/
public TO next()
{
Expand Down

0 comments on commit cbe280b

Please sign in to comment.