Skip to content

Commit

Permalink
Revert "Merge pull request #8318 from tinwelint/3.0-pbi-par-input-pan…
Browse files Browse the repository at this point in the history
…ic-hang-fix"

This reverts commit 857209a, reversing
changes made to f0de132.
  • Loading branch information
burqen committed Nov 9, 2016
1 parent 857209a commit ef3d8ae
Show file tree
Hide file tree
Showing 10 changed files with 70 additions and 255 deletions.
Expand Up @@ -84,7 +84,7 @@ public long position()
public void close() public void close()
{ {
super.close(); super.close();
processing.shutdown(); processing.shutdown( false );
} }


@Override @Override
Expand Down
Expand Up @@ -25,7 +25,6 @@
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.function.Supplier; import java.util.function.Supplier;
Expand Down Expand Up @@ -160,12 +159,7 @@ public Integer get( long timeout, TimeUnit unit ) throws InterruptedException, E


public static <T> Future<T> future( final Callable<T> task ) public static <T> Future<T> future( final Callable<T> task )
{ {
return future( task, Executors.defaultThreadFactory() ); ExecutorService executor = Executors.newSingleThreadExecutor();
}

public static <T> Future<T> future( final Callable<T> task, ThreadFactory factory )
{
ExecutorService executor = Executors.newSingleThreadExecutor( factory );
Future<T> future = executor.submit( task ); Future<T> future = executor.submit( task );
executor.shutdown(); executor.shutdown();
return future; return future;
Expand Down
Expand Up @@ -24,7 +24,6 @@
import java.util.Arrays; import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier; import java.util.function.Supplier;


import org.neo4j.function.Suppliers; import org.neo4j.function.Suppliers;
Expand All @@ -49,7 +48,8 @@ public class DynamicTaskExecutor<LOCAL> implements TaskExecutor<LOCAL>
@SuppressWarnings( "unchecked" ) @SuppressWarnings( "unchecked" )
private volatile Processor[] processors = (Processor[]) Array.newInstance( Processor.class, 0 ); private volatile Processor[] processors = (Processor[]) Array.newInstance( Processor.class, 0 );
private volatile boolean shutDown; private volatile boolean shutDown;
private final AtomicReference<Throwable> panic = new AtomicReference<>(); private volatile boolean abortQueued;
private volatile Throwable panic;
private final Supplier<LOCAL> initialLocalState; private final Supplier<LOCAL> initialLocalState;
private final int maxProcessorCount; private final int maxProcessorCount;


Expand Down Expand Up @@ -142,14 +142,16 @@ public void submit( Task<LOCAL> task )
@Override @Override
public void assertHealthy() public void assertHealthy()
{ {
Throwable panic = this.panic.get();
if ( panic != null )
{
throw new TaskExecutionPanicException( "Executor has been shut down in panic", panic );
}
if ( shutDown ) if ( shutDown )
{ {
throw new IllegalStateException( "Executor has been shut down" ); if ( panic != null )
{
throw new TaskExecutionPanicException( "Executor has been shut down in panic", panic );
}
if ( abortQueued )
{
throw new TaskExecutionPanicException( "Executor has been shut down, aborting queued" );
}
} }
} }


Expand All @@ -162,41 +164,24 @@ private void notifyProcessors()
} }


@Override @Override
public synchronized void shutdown() public synchronized void shutdown( int flags )
{ {
if ( shutDown ) if ( shutDown )
{ {
return; return;
} }


// Await all tasks to go into processing boolean awaitAllCompleted = (flags & TaskExecutor.SF_AWAIT_ALL_COMPLETED) != 0;
while ( !queue.isEmpty() && panic.get() == null /*all bets are off in the event of panic*/ ) while ( awaitAllCompleted && !queue.isEmpty() && panic == null /*all bets are off in the event of panic*/ )
{
parkAWhile();
}
// Await all processing tasks to be completed
for ( Processor processor : processors )
{
processor.processorShutDown = true;
}
while ( anyAlive() && panic.get() == null /*all bets are off in the event of panic*/ )
{ {
parkAWhile(); parkAWhile();
} }
this.shutDown = true; this.shutDown = true;
} this.abortQueued = (flags & TaskExecutor.SF_ABORT_QUEUED) != 0;

while ( awaitAllCompleted && anyAlive() && panic == null /*all bets are off in the event of panic*/ )
@Override
public void panic( Throwable panic )
{
if ( !this.panic.compareAndSet( null, panic ) )
{ {
// there was already a panic set, add this new one as suppressed parkAWhile();
this.panic.get().addSuppressed( panic );
} }
// else this was the first panic set

shutdown();
} }


@Override @Override
Expand Down Expand Up @@ -259,7 +244,8 @@ public void run()
} }
catch ( Throwable e ) catch ( Throwable e )
{ {
panic( e ); panic = e;
shutdown( TaskExecutor.SF_ABORT_QUEUED );
throw launderedException( e ); throw launderedException( e );
} }
} }
Expand Down
Expand Up @@ -32,6 +32,9 @@
*/ */
public interface TaskExecutor<LOCAL> extends Parallelizable public interface TaskExecutor<LOCAL> extends Parallelizable
{ {
int SF_AWAIT_ALL_COMPLETED = 0x1;
int SF_ABORT_QUEUED = 0x2;

/** /**
* Submits a task to be executed by one of the processors in this {@link TaskExecutor}. Tasks will be * Submits a task to be executed by one of the processors in this {@link TaskExecutor}. Tasks will be
* executed in the order of which they arrive. * executed in the order of which they arrive.
Expand All @@ -43,32 +46,23 @@ public interface TaskExecutor<LOCAL> extends Parallelizable
/** /**
* Shuts down this {@link TaskExecutor}, disallowing new tasks to be {@link #submit(Task) submitted}. * Shuts down this {@link TaskExecutor}, disallowing new tasks to be {@link #submit(Task) submitted}.
* *
* submitted tasks will be processed before returning from this method. * @param flags {@link #SF_AWAIT_ALL_COMPLETED} will wait for all queued or already executing tasks to be
*/ * executed and completed, before returning from this method. {@link #SF_ABORT_QUEUED} will have
void shutdown(); * submitted tasks which haven't started executing yet cancelled, never to be executed.

/**
* Puts this executor into panic mode. Call to {@link #shutdown()} has no effect after a panic.
* Call to {@link #assertHealthy()} will communicate the panic as well.
* This semantically includes something like a shutdown, but submitted tasks which haven't started
* being processed will be aborted and currently processing tasks will not be awaited for completion.
*
* @param panic cause of panic.
*/ */
void panic( Throwable panic ); void shutdown( int flags );


/** /**
* @return {@code true} if {@link #shutdown()} or {@link #panic(Throwable)} has been called, * @return {@code true} if {@link #shutdown(int)} has been called, otherwise {@code false}.
* otherwise {@code false}.
*/ */
boolean isShutdown(); boolean isShutdown();


/** /**
* Asserts that this {@link TaskExecutor} is healthy. Useful to call when deciding to wait on a condition * Asserts that this {@link TaskExecutor} is healthy. Useful to call when deciding to wait on a condition
* this executor is expected to fulfill. * this executor is expected to fulfill.
* *
* @throws RuntimeException of some sort if this executor is in a bad state, containing the original error, * @throws RuntimeException of some sort if this executor is in a bad stage, the original error that
* if any, that made this executor fail. * made this executor fail.
*/ */
void assertHealthy(); void assertHealthy();
} }
Expand Up @@ -24,7 +24,6 @@
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.concurrent.Future;
import java.util.function.Supplier; import java.util.function.Supplier;


import org.neo4j.csv.reader.BufferedCharSeeker; import org.neo4j.csv.reader.BufferedCharSeeker;
Expand Down Expand Up @@ -66,7 +65,6 @@ public class ParallelInputEntityDeserializer<ENTITY extends InputEntity> extends
private final TicketedProcessing<CharSeeker,Header,ENTITY[]> processing; private final TicketedProcessing<CharSeeker,Header,ENTITY[]> processing;
private final ContinuableArrayCursor<ENTITY> cursor; private final ContinuableArrayCursor<ENTITY> cursor;
private SourceTraceability last = SourceTraceability.EMPTY; private SourceTraceability last = SourceTraceability.EMPTY;
private final Future<Void> processingCompletion;


@SuppressWarnings( "unchecked" ) @SuppressWarnings( "unchecked" )
public ParallelInputEntityDeserializer( Data<ENTITY> data, Header.Factory headerFactory, Configuration config, public ParallelInputEntityDeserializer( Data<ENTITY> data, Header.Factory headerFactory, Configuration config,
Expand Down Expand Up @@ -121,7 +119,7 @@ public ParallelInputEntityDeserializer( Data<ENTITY> data, Header.Factory header
cursor = new ContinuableArrayCursor<>( batchSupplier ); cursor = new ContinuableArrayCursor<>( batchSupplier );


// Start an asynchronous slurp of the chunks fed directly into the processors // Start an asynchronous slurp of the chunks fed directly into the processors
processingCompletion = processing.slurp( seekers( firstSeeker, source, config ), true ); processing.slurp( seekers( firstSeeker, source, config ), true );
} }
catch ( IOException e ) catch ( IOException e )
{ {
Expand Down Expand Up @@ -262,14 +260,7 @@ protected CharSeeker fetchNextOrNull()
@Override @Override
public void close() public void close()
{ {
if ( processingCompletion.isDone() ) processing.shutdown( true );
{
processing.shutdown();
}
else
{
processing.panic( new IllegalStateException( "Processing not completed when closing, indicating panic" ) );
}
try try
{ {
source.close(); source.close();
Expand Down
Expand Up @@ -19,7 +19,6 @@
*/ */
package org.neo4j.unsafe.impl.batchimport.staging; package org.neo4j.unsafe.impl.batchimport.staging;


import java.nio.channels.ShutdownChannelGroupException;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongPredicate; import java.util.function.LongPredicate;


Expand All @@ -32,8 +31,9 @@
import static java.lang.System.nanoTime; import static java.lang.System.nanoTime;
import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MILLISECONDS;


import static org.neo4j.helpers.ArrayUtil.array;
import static org.neo4j.unsafe.impl.batchimport.executor.DynamicTaskExecutor.DEFAULT_PARK_STRATEGY; import static org.neo4j.unsafe.impl.batchimport.executor.DynamicTaskExecutor.DEFAULT_PARK_STRATEGY;
import static org.neo4j.unsafe.impl.batchimport.executor.TaskExecutor.SF_ABORT_QUEUED;
import static org.neo4j.unsafe.impl.batchimport.executor.TaskExecutor.SF_AWAIT_ALL_COMPLETED;
import static org.neo4j.unsafe.impl.batchimport.staging.Processing.await; import static org.neo4j.unsafe.impl.batchimport.staging.Processing.await;


/** /**
Expand Down Expand Up @@ -157,14 +157,7 @@ private void incrementQueue()
public void close() throws Exception public void close() throws Exception
{ {
super.close(); super.close();
if ( panic == null ) executor.shutdown( panic == null ? SF_AWAIT_ALL_COMPLETED : SF_ABORT_QUEUED );
{
executor.shutdown();
}
else
{
executor.panic( panic );
}
} }


@Override @Override
Expand Down
Expand Up @@ -27,11 +27,11 @@
import java.util.function.LongPredicate; import java.util.function.LongPredicate;
import java.util.function.Supplier; import java.util.function.Supplier;


import org.neo4j.helpers.NamedThreadFactory;
import org.neo4j.unsafe.impl.batchimport.Parallelizable; import org.neo4j.unsafe.impl.batchimport.Parallelizable;
import org.neo4j.unsafe.impl.batchimport.executor.DynamicTaskExecutor; import org.neo4j.unsafe.impl.batchimport.executor.DynamicTaskExecutor;
import org.neo4j.unsafe.impl.batchimport.executor.ParkStrategy; import org.neo4j.unsafe.impl.batchimport.executor.ParkStrategy;
import org.neo4j.unsafe.impl.batchimport.executor.TaskExecutor; import org.neo4j.unsafe.impl.batchimport.executor.TaskExecutor;

import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MILLISECONDS;


import static org.neo4j.helpers.FutureAdapter.future; import static org.neo4j.helpers.FutureAdapter.future;
Expand Down Expand Up @@ -67,7 +67,6 @@ public class TicketedProcessing<FROM,STATE,TO> implements Parallelizable
{ {
private static final ParkStrategy park = new ParkStrategy.Park( 10, MILLISECONDS ); private static final ParkStrategy park = new ParkStrategy.Park( 10, MILLISECONDS );


private final String name;
private final TaskExecutor<STATE> executor; private final TaskExecutor<STATE> executor;
private final BiFunction<FROM,STATE,TO> processor; private final BiFunction<FROM,STATE,TO> processor;
private final ArrayBlockingQueue<TO> processed; private final ArrayBlockingQueue<TO> processed;
Expand Down Expand Up @@ -96,7 +95,6 @@ public boolean test( long ticket )
public TicketedProcessing( String name, int maxProcessors, BiFunction<FROM,STATE,TO> processor, public TicketedProcessing( String name, int maxProcessors, BiFunction<FROM,STATE,TO> processor,
Supplier<STATE> threadLocalStateSupplier ) Supplier<STATE> threadLocalStateSupplier )
{ {
this.name = name;
this.processor = processor; this.processor = processor;
this.executor = new DynamicTaskExecutor<>( 1, maxProcessors, maxProcessors, park, name, this.executor = new DynamicTaskExecutor<>( 1, maxProcessors, maxProcessors, park, name,
threadLocalStateSupplier ); threadLocalStateSupplier );
Expand Down Expand Up @@ -130,87 +128,56 @@ public void submit( FROM job )
await( myTurnToAddToProcessedQueue, ticket, healthCheck, park ); await( myTurnToAddToProcessedQueue, ticket, healthCheck, park );


// OK now it's my turn to add this result to the result queue which user will pull from later on // OK now it's my turn to add this result to the result queue which user will pull from later on
boolean offered; while ( !processed.offer( result, 10, MILLISECONDS ) );
do
{
healthCheck.run();
offered = processed.offer( result, 10, MILLISECONDS );
}
while ( !offered );


// Signal that this ticket has been processed and added to the result queue // Signal that this ticket has been processed and added to the result queue
processedTicket.incrementAndGet(); processedTicket.incrementAndGet();
} ); } );
} }


/**
* Should be called after all calls to {@link #submit(Object)} and {@link #slurp(Iterator, boolean)}
* have been made. This is due to the async nature of this class where submitted jobs gets processed
* and put into result queue for pickup in {@link #next()}, which will not know when to return
* {@code null} (marking the end) otherwise. Then after all processed results have been retrieved
* a call to {@link #shutdown()} should be made.
*/
public void endOfSubmissions()
{
done = true;
}

/** /**
* Essentially starting a thread {@link #submit(Object) submitting} a stream of inputs which will * Essentially starting a thread {@link #submit(Object) submitting} a stream of inputs which will
* each be processed and asynchronically made available in order of processing ticket by later calling * each be processed and asynchronically made available in order of processing ticket by later calling
* {@link #next()}. * {@link #next()}.
* *
* Failure reading from {@code input} or otherwise submitting jobs will fail the processing after that point.
*
* @param input {@link Iterator} of input to process. * @param input {@link Iterator} of input to process.
* @param lastSubmissions will call {@link #endOfSubmissions()} after all jobs submitted if {@code true}. * @param shutdownAfterAllSubmitted will call {@link #shutdown(boolean)} after all jobs submitted if {@code true}.
* @return {@link Future} representing the work of submitting the inputs to be processed. When the future * @return {@link Future} representing the work of submitting the inputs to be processed. When the future
* is completed all items from the {@code input} {@link Iterator} have been submitted, but some items * is completed all items from the {@code input} {@link Iterator} have been submitted, but some items
* may still be queued and processed. * may still be queued and processed.
*/ */
public Future<Void> slurp( Iterator<FROM> input, boolean lastSubmissions ) public Future<Void> slurp( Iterator<FROM> input, boolean shutdownAfterAllSubmitted )
{ {
return future( () -> return future( () ->
{ {
try while ( input.hasNext() )
{ {
while ( input.hasNext() ) submit( input.next() );
{
submit( input.next() );
}
if ( lastSubmissions )
{
endOfSubmissions();
}
return null;
} }
catch ( Throwable e ) if ( shutdownAfterAllSubmitted )
{ {
executor.panic( e ); shutdown( true );
throw e;
} }
}, NamedThreadFactory.named( name + "-slurper" ) ); return null;
} );
} }


/** /**
* Shuts down processing. Normally this method gets called after an arbitrary amount of * Tells this processor that there will be no more submissions and so {@link #next()} will stop blocking,
* {@link #submit(Object)} / {@link #slurp(Iterator, boolean)} concluded with {@link #endOfSubmissions()} * waiting for new processed results.
* and after all processed results have been {@link #next() retrieved}. *
* Shutdown can be called before everything has been processed in which case it causes jobs to be aborted. * @param awaitAllProcessed if {@code true} will block until all submitted jobs have been processed,
* otherwise if {@code false} will return immediately, where processing will still commence and complete.
*/ */
public void shutdown() public void shutdown( boolean awaitAllProcessed )
{ {
executor.shutdown(); done = true;
} executor.shutdown( awaitAllProcessed ? TaskExecutor.SF_AWAIT_ALL_COMPLETED : 0 );

public void panic( Throwable panic )
{
executor.panic( panic );
} }


/** /**
* @return next processed job (blocking call). If all submitted jobs have been processed and returned * @return next processed job (blocking call), or {@code null} if all jobs have been processed
* and {@link #endOfSubmissions()} have been called {@code null} will be returned. * and {@link #shutdown(boolean)} has been called.
*/ */
public TO next() public TO next()
{ {
Expand Down

0 comments on commit ef3d8ae

Please sign in to comment.