Skip to content

Commit

Permalink
Simplified TaskExecutor/TicketedProcessing shutdown
Browse files Browse the repository at this point in the history
by removing shutdown flags and having an explicit panic(Throwable)
method besides shutdown(). TicketedProcessing separates shutdown(flags)
into shutdown() and endOfSubmissions() instead of conflating the two
and using the previously available flags to work around and distinguish
the two cases.

This should have no observable functional difference looking at the
current usages of TaskExecutor/TicketedProcessing, only that the code
related to shutdown should be easier to reason about.
  • Loading branch information
tinwelint committed Oct 31, 2016
1 parent 96f3644 commit bce6048
Show file tree
Hide file tree
Showing 9 changed files with 90 additions and 89 deletions.
Expand Up @@ -84,7 +84,7 @@ public long position()
public void close()
{
super.close();
processing.shutdown( 0 );
processing.shutdown();
}

@Override
Expand Down
Expand Up @@ -48,7 +48,6 @@ public class DynamicTaskExecutor<LOCAL> implements TaskExecutor<LOCAL>
@SuppressWarnings( "unchecked" )
private volatile Processor[] processors = (Processor[]) Array.newInstance( Processor.class, 0 );
private volatile boolean shutDown;
private volatile boolean abortQueued;
private volatile Throwable panic;
private final Supplier<LOCAL> initialLocalState;
private final int maxProcessorCount;
Expand Down Expand Up @@ -142,16 +141,13 @@ public void submit( Task<LOCAL> task )
@Override
public void assertHealthy()
{
if ( panic != null )
{
throw new TaskExecutionPanicException( "Executor has been shut down in panic", panic );
}
if ( shutDown )
{
if ( panic != null )
{
throw new TaskExecutionPanicException( "Executor has been shut down in panic", panic );
}
if ( abortQueued )
{
throw new TaskExecutionPanicException( "Executor has been shut down, aborting queued" );
}
throw new IllegalStateException( "Executor has been shut down" );
}
}

Expand All @@ -164,30 +160,35 @@ private void notifyProcessors()
}

@Override
public synchronized void shutdown( int flags )
public synchronized void shutdown()
{
if ( shutDown )
{
// We're already shut down, although take the ABORT_QUEUED flag seriously as to abort all
// processors still working on tasks. This looks like a panic.
if ( (flags & TaskExecutor.SF_ABORT_QUEUED) != 0 )
{
this.abortQueued = true;
}
return;
}

boolean awaitAllCompleted = (flags & TaskExecutor.SF_AWAIT_ALL_COMPLETED) != 0;
while ( awaitAllCompleted && !queue.isEmpty() && panic == null /*all bets are off in the event of panic*/ )
// Await all tasks to go into processing
while ( !queue.isEmpty() && panic == null /*all bets are off in the event of panic*/ )
{
parkAWhile();
}
this.shutDown = true;
this.abortQueued = (flags & TaskExecutor.SF_ABORT_QUEUED) != 0;
while ( awaitAllCompleted && anyAlive() && panic == null /*all bets are off in the event of panic*/ )
// Await all processing tasks to be completed
for ( Processor processor : processors )
{
processor.processorShutDown = true;
}
while ( anyAlive() && panic == null /*all bets are off in the event of panic*/ )
{
parkAWhile();
}
this.shutDown = true;
}

@Override
public void panic( Throwable panic )
{
this.panic = panic;
shutdown();
}

@Override
Expand Down Expand Up @@ -250,8 +251,7 @@ public void run()
}
catch ( Throwable e )
{
panic = e;
shutdown( TaskExecutor.SF_ABORT_QUEUED );
panic( e );
throw launderedException( e );
}
}
Expand Down
Expand Up @@ -32,9 +32,6 @@
*/
public interface TaskExecutor<LOCAL> extends Parallelizable
{
int SF_AWAIT_ALL_COMPLETED = 0x1;
int SF_ABORT_QUEUED = 0x2;

/**
* Submits a task to be executed by one of the processors in this {@link TaskExecutor}. Tasks will be
* executed in the order of which they arrive.
Expand All @@ -46,23 +43,31 @@ public interface TaskExecutor<LOCAL> extends Parallelizable
/**
* Shuts down this {@link TaskExecutor}, disallowing new tasks to be {@link #submit(Task) submitted}.
*
* @param flags {@link #SF_AWAIT_ALL_COMPLETED} will wait for all queued or already executing tasks to be
* executed and completed, before returning from this method. {@link #SF_ABORT_QUEUED} will have
* submitted tasks which haven't started executing yet cancelled, never to be executed.
* Submitted tasks will be processed before returning from this method.
*/
void shutdown();

/**
* Puts this executor into panic mode. A call to {@link #shutdown()} has no effect after a panic.
* A call to {@link #assertHealthy()} will communicate the panic as well.
* This semantically includes something like a shutdown, but submitted tasks which haven't started
* being processed will be aborted and currently processing tasks will not be awaited for completion.
*
* @param panic cause of panic.
*/
void shutdown( int flags );
void panic( Throwable panic );

/**
* @return {@code true} if {@link #shutdown(int)} has been called, otherwise {@code false}.
* @return {@code true} if {@link #shutdown()} has been called, otherwise {@code false}.
*/
boolean isShutdown();

/**
* Asserts that this {@link TaskExecutor} is healthy. Useful to call when deciding to wait on a condition
* this executor is expected to fulfill.
*
* @throws RuntimeException of some sort if this executor is in a bad stage, the original error that
* made this executor fail.
* @throws RuntimeException of some sort if this executor is in a bad state, containing the original error,
* if any, that made this executor fail.
*/
void assertHealthy();
}
Expand Up @@ -38,7 +38,6 @@
import org.neo4j.kernel.impl.util.collection.ContinuableArrayCursor;
import org.neo4j.unsafe.impl.batchimport.InputIterator;
import org.neo4j.unsafe.impl.batchimport.executor.TaskExecutionPanicException;
import org.neo4j.unsafe.impl.batchimport.executor.TaskExecutor;
import org.neo4j.unsafe.impl.batchimport.input.InputEntity;
import org.neo4j.unsafe.impl.batchimport.input.InputException;
import org.neo4j.unsafe.impl.batchimport.input.InputNode;
Expand All @@ -47,6 +46,7 @@
import org.neo4j.unsafe.impl.batchimport.staging.TicketedProcessing;

import static org.neo4j.csv.reader.Source.singleChunk;
import static org.neo4j.helpers.ArrayUtil.array;
import static org.neo4j.kernel.impl.util.Validators.emptyValidator;
import static org.neo4j.unsafe.impl.batchimport.input.InputEntityDecorators.noDecorator;

Expand Down Expand Up @@ -263,11 +263,7 @@ protected CharSeeker fetchNextOrNull()
@Override
public void close()
{
// At this point normally the "slurp" above will have called shutdown and so calling close
// before that has completed signals some sort of panic. Encode this knowledge in flags passed
// to shutdown
int flags = processingCompletion.isDone() ? 0 : TaskExecutor.SF_ABORT_QUEUED;
processing.shutdown( flags );
processing.shutdown();
try
{
source.close();
Expand Down
Expand Up @@ -19,6 +19,7 @@
*/
package org.neo4j.unsafe.impl.batchimport.staging;

import java.nio.channels.ShutdownChannelGroupException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongPredicate;

Expand All @@ -31,9 +32,8 @@
import static java.lang.System.nanoTime;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

import static org.neo4j.helpers.ArrayUtil.array;
import static org.neo4j.unsafe.impl.batchimport.executor.DynamicTaskExecutor.DEFAULT_PARK_STRATEGY;
import static org.neo4j.unsafe.impl.batchimport.executor.TaskExecutor.SF_ABORT_QUEUED;
import static org.neo4j.unsafe.impl.batchimport.executor.TaskExecutor.SF_AWAIT_ALL_COMPLETED;
import static org.neo4j.unsafe.impl.batchimport.staging.Processing.await;

/**
Expand Down Expand Up @@ -157,7 +157,14 @@ private void incrementQueue()
public void close() throws Exception
{
super.close();
executor.shutdown( panic == null ? SF_AWAIT_ALL_COMPLETED : SF_ABORT_QUEUED );
if ( panic == null )
{
executor.shutdown();
}
else
{
executor.panic( panic );
}
}

@Override
Expand Down
Expand Up @@ -30,9 +30,7 @@
import org.neo4j.unsafe.impl.batchimport.Parallelizable;
import org.neo4j.unsafe.impl.batchimport.executor.DynamicTaskExecutor;
import org.neo4j.unsafe.impl.batchimport.executor.ParkStrategy;
import org.neo4j.unsafe.impl.batchimport.executor.TaskExecutionPanicException;
import org.neo4j.unsafe.impl.batchimport.executor.TaskExecutor;

import static java.util.concurrent.TimeUnit.MILLISECONDS;

import static org.neo4j.helpers.FutureAdapter.future;
Expand Down Expand Up @@ -92,22 +90,14 @@ public boolean test( long ticket )
};
private final Runnable healthCheck;
private volatile boolean done;
private volatile Throwable slurpFailure;

public TicketedProcessing( String name, int maxProcessors, BiFunction<FROM,STATE,TO> processor,
Supplier<STATE> threadLocalStateSupplier )
{
this.processor = processor;
this.executor = new DynamicTaskExecutor<>( 1, maxProcessors, maxProcessors, park, name,
threadLocalStateSupplier );
this.healthCheck = () ->
{
if ( slurpFailure != null )
{
throw new TaskExecutionPanicException( "Failure adding jobs for processing", slurpFailure );
}
executor.assertHealthy();
};
this.healthCheck = executor::assertHealthy;
this.processed = new ArrayBlockingQueue<>( maxProcessors );
}

Expand Down Expand Up @@ -150,18 +140,32 @@ public void submit( FROM job )
} );
}

/**
* Should be called after all calls to {@link #submit(Object)} and {@link #slurp(Iterator, boolean)}
* have been made. This is due to the async nature of this class, where submitted jobs get processed
* and put into the result queue for pickup in {@link #next()}, which otherwise will not know when to
* return {@code null} (marking the end). Then, after all processed results have been retrieved,
* a call to {@link #shutdown()} should be made.
*/
public void endOfSubmissions()
{
done = true;
}

/**
* Essentially starts a thread {@link #submit(Object) submitting} a stream of inputs which will
* each be processed and asynchronously made available in order of processing ticket by later calling
* {@link #next()}.
*
* Failure reading from {@code input} or otherwise submitting jobs will fail the processing after that point.
*
* @param input {@link Iterator} of input to process.
* @param shutdownAfterAllSubmitted will call {@link #shutdown(int)} after all jobs submitted if {@code true}.
* @param lastSubmissions will call {@link #endOfSubmissions()} after all jobs submitted if {@code true}.
* @return {@link Future} representing the work of submitting the inputs to be processed. When the future
* is completed all items from the {@code input} {@link Iterator} have been submitted, but some items
* may still be queued and processed.
*/
public Future<Void> slurp( Iterator<FROM> input, boolean shutdownAfterAllSubmitted )
public Future<Void> slurp( Iterator<FROM> input, boolean lastSubmissions )
{
return future( () ->
{
Expand All @@ -171,38 +175,34 @@ public Future<Void> slurp( Iterator<FROM> input, boolean shutdownAfterAllSubmitt
{
submit( input.next() );
}
if ( shutdownAfterAllSubmitted )
if ( lastSubmissions )
{
shutdown( TaskExecutor.SF_AWAIT_ALL_COMPLETED );
endOfSubmissions();
}
return null;
}
catch ( Throwable e )
{
slurpFailure = e;
shutdown( TaskExecutor.SF_ABORT_QUEUED );
executor.panic( e );
throw e;
}
} );
}

/**
* Tells this processor that there will be no more submissions and so {@link #next()} will stop blocking,
* waiting for new processed results.
*
* @param flags see {@link TaskExecutor}
* Shuts down processing. Normally this method gets called after an arbitrary number of
* {@link #submit(Object)} / {@link #slurp(Iterator, boolean)} calls, concluded with {@link #endOfSubmissions()},
* and after all processed results have been {@link #next() retrieved}.
* Shutdown can be called before everything has been processed, in which case it causes jobs to be aborted.
*/
public void shutdown( int flags )
public void shutdown()
{
// This also marks the end of input
done = true;

executor.shutdown( flags );
executor.shutdown();
}

/**
* @return next processed job (blocking call), or {@code null} if all jobs have been processed
* and {@link #shutdown(int)} has been called.
* @return next processed job (blocking call). If all submitted jobs have been processed and returned
* and {@link #endOfSubmissions()} has been called, {@code null} will be returned.
*/
public TO next()
{
Expand Down

0 comments on commit bce6048

Please sign in to comment.