Skip to content

Commit

Permalink
Fixes importer issue which could hang shutdown during panic
Browse files Browse the repository at this point in the history
this would prevent the import tool from shutting down, but more importantly
it would prevent the error from being shown.

This fix treats a call to close() before all processing is done in
ParallelInputEntityDeserializer as a panic and delegates that flag to
its executor so that all processors can now abort their processing
accordingly.
  • Loading branch information
tinwelint committed Nov 7, 2016
1 parent 65f5bcb commit 475e50b
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 13 deletions.
Expand Up @@ -84,7 +84,7 @@ public long position()
public void close()
{
    super.close();
    // Orderly shutdown: 0 means no special flags, i.e. don't abort queued work.
    // (Previously this passed a boolean; the shutdown contract now takes int flags,
    // see TaskExecutor.SF_* constants.)
    processing.shutdown( 0 );
}

@Override
Expand Down
Expand Up @@ -168,6 +168,12 @@ public synchronized void shutdown( int flags )
{
if ( shutDown )
{
// We're already shut down, although take the ABORT_QUEUED flag seriously as to abort all
// processors still working on tasks. This looks like a panic.
if ( (flags & TaskExecutor.SF_ABORT_QUEUED) != 0 )
{
this.abortQueued = true;
}
return;
}

Expand Down
Expand Up @@ -24,6 +24,7 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Future;
import java.util.function.Supplier;

import org.neo4j.csv.reader.BufferedCharSeeker;
Expand All @@ -37,6 +38,7 @@
import org.neo4j.kernel.impl.util.collection.ContinuableArrayCursor;
import org.neo4j.unsafe.impl.batchimport.InputIterator;
import org.neo4j.unsafe.impl.batchimport.executor.TaskExecutionPanicException;
import org.neo4j.unsafe.impl.batchimport.executor.TaskExecutor;
import org.neo4j.unsafe.impl.batchimport.input.InputEntity;
import org.neo4j.unsafe.impl.batchimport.input.InputException;
import org.neo4j.unsafe.impl.batchimport.input.InputNode;
Expand Down Expand Up @@ -65,6 +67,7 @@ public class ParallelInputEntityDeserializer<ENTITY extends InputEntity> extends
private final TicketedProcessing<CharSeeker,Header,ENTITY[]> processing;
private final ContinuableArrayCursor<ENTITY> cursor;
private SourceTraceability last = SourceTraceability.EMPTY;
private final Future<Void> processingCompletion;

@SuppressWarnings( "unchecked" )
public ParallelInputEntityDeserializer( Data<ENTITY> data, Header.Factory headerFactory, Configuration config,
Expand Down Expand Up @@ -119,7 +122,7 @@ public ParallelInputEntityDeserializer( Data<ENTITY> data, Header.Factory header
cursor = new ContinuableArrayCursor<>( batchSupplier );

// Start an asynchronous slurp of the chunks fed directly into the processors
processing.slurp( seekers( firstSeeker, source, config ), true );
processingCompletion = processing.slurp( seekers( firstSeeker, source, config ), true );
}
catch ( IOException e )
{
Expand Down Expand Up @@ -260,7 +263,11 @@ protected CharSeeker fetchNextOrNull()
@Override
public void close()
{
processing.shutdown( true );
// At this point normally the "slurp" above will have called shutdown and so calling close
// before that has completed signals some sort of panic. Encode this knowledge in flags passed
// to shutdown
int flags = processingCompletion.isDone() ? 0 : TaskExecutor.SF_ABORT_QUEUED;
processing.shutdown( flags );
try
{
source.close();
Expand Down
Expand Up @@ -128,7 +128,13 @@ public void submit( FROM job )
await( myTurnToAddToProcessedQueue, ticket, healthCheck, park );

// OK now it's my turn to add this result to the result queue which user will pull from later on
while ( !processed.offer( result, 10, MILLISECONDS ) );
boolean offered;
do
{
healthCheck.run();
offered = processed.offer( result, 10, MILLISECONDS );
}
while ( !offered );

// Signal that this ticket has been processed and added to the result queue
processedTicket.incrementAndGet();
Expand All @@ -141,7 +147,7 @@ public void submit( FROM job )
* {@link #next()}.
*
* @param input {@link Iterator} of input to process.
* @param shutdownAfterAllSubmitted will call {@link #shutdown(boolean)} after all jobs submitted if {@code true}.
* @param shutdownAfterAllSubmitted will call {@link #shutdown(int)} after all jobs submitted if {@code true}.
* @return {@link Future} representing the work of submitting the inputs to be processed. When the future
* is completed all items from the {@code input} {@link Iterator} have been submitted, but some items
* may still be queued and processed.
Expand All @@ -156,7 +162,7 @@ public Future<Void> slurp( Iterator<FROM> input, boolean shutdownAfterAllSubmitt
}
if ( shutdownAfterAllSubmitted )
{
shutdown( true );
shutdown( TaskExecutor.SF_AWAIT_ALL_COMPLETED );
}
return null;
} );
Expand All @@ -166,18 +172,19 @@ public Future<Void> slurp( Iterator<FROM> input, boolean shutdownAfterAllSubmitt
* Tells this processor that there will be no more submissions and so {@link #next()} will stop blocking,
* waiting for new processed results.
*
* @param awaitAllProcessed if {@code true} will block until all submitted jobs have been processed,
* otherwise if {@code false} will return immediately, where processing will still commence and complete.
* @param flags see {@link TaskExecutor}
*/
public void shutdown( boolean awaitAllProcessed )
public void shutdown( int flags )
{
// This also marks the end of input
done = true;
executor.shutdown( awaitAllProcessed ? TaskExecutor.SF_AWAIT_ALL_COMPLETED : 0 );

executor.shutdown( flags );
}

/**
* @return next processed job (blocking call), or {@code null} if all jobs have been processed
* and {@link #shutdown(boolean)} has been called.
* and {@link #shutdown(int)} has been called.
*/
public TO next()
{
Expand Down
Expand Up @@ -25,9 +25,11 @@
import java.io.StringReader;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

import org.neo4j.csv.reader.CharReadable;
import org.neo4j.kernel.impl.util.Validators;
import org.neo4j.test.RandomRule;
import org.neo4j.unsafe.impl.batchimport.executor.TaskExecutionPanicException;
import org.neo4j.unsafe.impl.batchimport.input.Collector;
import org.neo4j.unsafe.impl.batchimport.input.Groups;
import org.neo4j.unsafe.impl.batchimport.input.InputEntityDecorators;
Expand All @@ -42,6 +44,7 @@
import static org.neo4j.csv.reader.Readables.wrap;
import static org.neo4j.unsafe.impl.batchimport.input.csv.Configuration.COMMAS;
import static org.neo4j.unsafe.impl.batchimport.input.csv.DataFactories.defaultFormatNodeFileHeader;
import static org.neo4j.unsafe.impl.batchimport.input.csv.DeserializerFactories.defaultNodeDeserializer;
import static org.neo4j.unsafe.impl.batchimport.input.csv.IdType.ACTUAL;

public class ParallelInputEntityDeserializerTest
Expand Down Expand Up @@ -103,6 +106,54 @@ public int bufferSize()
}
}

// The timeout ensures that, should this hang bug ever strike again, the test fails after a
// bounded time instead of blocking the build. Normally this test completes very quickly.
@Test( timeout = 10_000 )
public void shouldTreatExternalCloseAsPanic() throws Exception
{
// GIVEN enough data to fill up the internal queues (small buffer, many entities)
int entities = 500;
Data<InputNode> data = testData( entities );
Configuration config = new Configuration.Overridden( COMMAS )
{
@Override
public int bufferSize()
{
return 100;
}
};
IdType idType = ACTUAL;
Collector badCollector = mock( Collector.class );
Groups groups = new Groups();

// WHEN closing before having consumed all results
DeserializerFactory<InputNode> deserializerFactory =
defaultNodeDeserializer( groups, config, idType, badCollector );
try ( ParallelInputEntityDeserializer<InputNode> deserializer = new ParallelInputEntityDeserializer<>( data,
defaultFormatNodeFileHeader(), config, idType, 3, 3, deserializerFactory,
Validators.<InputNode>emptyValidator(), InputNode.class ) )
{
deserializer.hasNext();
deserializer.close();

// Why pull some items after it has been closed? The above close() symbolizes a panic from
// somewhere, anywhere in the importer. At that point there are still batches that have been
// processed and are there for the taking. One of the components in the hang scenario that we want
// to test comes from a processor in TicketedProcessing forever trying to offer its processed
// result to the result queue (where the loop didn't care if it had been forcefully shut down).
// To get one of the processing threads into doing that we need to pull some of the already
// processed items so that it wants to go ahead and offer its result.
for ( int i = 0; i < 100 && deserializer.hasNext(); i++ )
{
deserializer.next();
}
}
catch ( TaskExecutionPanicException e )
{
// THEN it should be able to exit (this exception comes as a side effect of the panic)
}
}

private Data<InputNode> testData( int entities )
{
StringBuilder string = new StringBuilder();
Expand Down
Expand Up @@ -30,6 +30,7 @@
import org.neo4j.test.OtherThreadExecutor.WorkerCommand;
import org.neo4j.unsafe.impl.batchimport.executor.ParkStrategy;
import org.neo4j.test.OtherThreadRule;
import org.neo4j.unsafe.impl.batchimport.executor.TaskExecutor;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
Expand Down Expand Up @@ -83,7 +84,7 @@ public Void doWork( Void state ) throws Exception
{
processing.submit( i );
}
processing.shutdown( true );
processing.shutdown( TaskExecutor.SF_AWAIT_ALL_COMPLETED );

// THEN
assertions.get();
Expand Down Expand Up @@ -126,7 +127,7 @@ public Void doWork( Void state ) throws Exception
assertEquals( 3, processing.next().intValue() );

// THEN
processing.shutdown( true );
processing.shutdown( TaskExecutor.SF_AWAIT_ALL_COMPLETED );
}

private static class StringJob
Expand Down

0 comments on commit 475e50b

Please sign in to comment.