Skip to content

Commit

Permalink
InputIterator implements Parallelizable
Browse files Browse the repository at this point in the history
as to be able to listen to requests from the batch importer
about the need for more threads in input steps.
  • Loading branch information
tinwelint committed Jun 14, 2016
1 parent f9ff135 commit e9267be
Show file tree
Hide file tree
Showing 8 changed files with 130 additions and 71 deletions.
Expand Up @@ -28,7 +28,7 @@
/** /**
* A {@link ResourceIterator} with added methods suitable for {@link Input} into a {@link BatchImporter}. * A {@link ResourceIterator} with added methods suitable for {@link Input} into a {@link BatchImporter}.
*/ */
public interface InputIterator<T> extends ResourceIterator<T>, SourceTraceability public interface InputIterator<T> extends ResourceIterator<T>, SourceTraceability, Parallelizable
{ {
public static abstract class Adapter<T> extends PrefetchingIterator<T> implements InputIterator<T> public static abstract class Adapter<T> extends PrefetchingIterator<T> implements InputIterator<T>
{ {
Expand Down Expand Up @@ -103,6 +103,24 @@ public long position()
{ {
return actual.position(); return actual.position();
} }

@Override
public int numberOfProcessors()
{
return actual.numberOfProcessors();
}

@Override
public boolean incrementNumberOfProcessors()
{
return actual.incrementNumberOfProcessors();
}

@Override
public boolean decrementNumberOfProcessors()
{
return actual.decrementNumberOfProcessors();
}
} }


public static class Empty<T> extends Adapter<T> public static class Empty<T> extends Adapter<T>
Expand Down
Expand Up @@ -26,21 +26,80 @@
public interface Parallelizable public interface Parallelizable
{ {
/** /**
* The number of processors processing incoming tasks in parallel. * @return number of processors processing incoming tasks in parallel.
*/ */
int numberOfProcessors(); default int numberOfProcessors()
{
return 1;
}


/** /**
* Increments number of processors that processes tasks in parallel. * Increments number of processors that processes tasks in parallel.
* *
* @return {@code true} if one more processor was assigned, otherwise {@code false}. * @return {@code true} if one more processor was assigned, otherwise {@code false}.
*/ */
boolean incrementNumberOfProcessors(); default boolean incrementNumberOfProcessors()
{
return false;
}


/** /**
* Decrements number of processors that processes tasks in parallel. * Decrements number of processors that processes tasks in parallel.
* *
* @return {@code true} if one processor was unassigned, otherwise {@code false}. * @return {@code true} if one processor was unassigned, otherwise {@code false}.
*/ */
boolean decrementNumberOfProcessors(); default boolean decrementNumberOfProcessors()
{
return false;
}

/**
* Tries to set specified number of processors. If {@code processors} would be out of bounds
* for what this instance can assign then a value within bounds will be set instead.
*
* @param processors number of desired processors.
* @return number of actual processors after this call.
*/
default int setNumberOfProcessors( int processors )
{
int current;
while ( (current = numberOfProcessors()) != processors )
{
boolean success = current < processors ? incrementNumberOfProcessors() :
decrementNumberOfProcessors();
if ( !success )
{
break;
}
}
return current;
}

class Delegate implements Parallelizable
{
protected final Parallelizable actual;

public Delegate( Parallelizable actual )
{
this.actual = actual;
}

@Override
public int numberOfProcessors()
{
return actual.numberOfProcessors();
}

@Override
public boolean incrementNumberOfProcessors()
{
return actual.incrementNumberOfProcessors();
}

@Override
public boolean decrementNumberOfProcessors()
{
return actual.decrementNumberOfProcessors();
}
}
} }
Expand Up @@ -71,37 +71,6 @@ public DynamicTaskExecutor( int initialProcessorCount, int maxProcessorCount, in
setNumberOfProcessors( initialProcessorCount ); setNumberOfProcessors( initialProcessorCount );
} }


@Override
public synchronized void setNumberOfProcessors( int count )
{
assertHealthy();
assert count > 0;
if ( count == processors.length )
{
return;
}

count = min( count, maxProcessorCount );
Processor[] newProcessors;
if ( count > processors.length )
{ // Add one or more
newProcessors = Arrays.copyOf( processors, count );
for ( int i = processors.length; i < newProcessors.length; i++ )
{
newProcessors[i] = new Processor( processorThreadNamePrefix + "-" + i );
}
}
else
{ // Remove one or more
newProcessors = Arrays.copyOf( processors, count );
for ( int i = newProcessors.length; i < processors.length; i++ )
{
processors[i].shutDown = true;
}
}
this.processors = newProcessors;
}

@Override @Override
public int numberOfProcessors() public int numberOfProcessors()
{ {
Expand All @@ -111,22 +80,38 @@ public int numberOfProcessors()
@Override @Override
public synchronized boolean incrementNumberOfProcessors() public synchronized boolean incrementNumberOfProcessors()
{ {
if ( numberOfProcessors() >= maxProcessorCount ) if ( shutDown )
{ {
return false; return false;
} }
setNumberOfProcessors( numberOfProcessors() + 1 ); int currentNumber = numberOfProcessors();
if ( currentNumber >= maxProcessorCount )
{
return false;
}

Processor[] newProcessors = Arrays.copyOf( processors, currentNumber + 1 );
newProcessors[currentNumber] = new Processor( processorThreadNamePrefix + "-" + currentNumber );
this.processors = newProcessors;
return true; return true;
} }


@Override @Override
public synchronized boolean decrementNumberOfProcessors() public synchronized boolean decrementNumberOfProcessors()
{ {
if ( numberOfProcessors() == 1 ) if ( shutDown )
{
return false;
}
int currentNumber = numberOfProcessors();
if ( currentNumber == 1 )
{ {
return false; return false;
} }
setNumberOfProcessors( numberOfProcessors() - 1 );
Processor[] newProcessors = Arrays.copyOf( processors, currentNumber - 1 );
processors[currentNumber-1].shutDown = true;
processors = newProcessors;
return true; return true;
} }


Expand Down
Expand Up @@ -26,7 +26,6 @@
import org.neo4j.csv.reader.Extractors; import org.neo4j.csv.reader.Extractors;
import org.neo4j.csv.reader.Mark; import org.neo4j.csv.reader.Mark;
import org.neo4j.helpers.Exceptions; import org.neo4j.helpers.Exceptions;
import org.neo4j.helpers.collection.PrefetchingIterator;
import org.neo4j.kernel.impl.util.Validator; import org.neo4j.kernel.impl.util.Validator;
import org.neo4j.unsafe.impl.batchimport.InputIterator; import org.neo4j.unsafe.impl.batchimport.InputIterator;
import org.neo4j.unsafe.impl.batchimport.input.Collector; import org.neo4j.unsafe.impl.batchimport.input.Collector;
Expand All @@ -40,8 +39,7 @@
* Converts a line of csv data into an {@link InputEntity} (either a node or relationship). * Converts a line of csv data into an {@link InputEntity} (either a node or relationship).
* Does so by seeking values, using {@link CharSeeker}, interpreting the values using a {@link Header}. * Does so by seeking values, using {@link CharSeeker}, interpreting the values using a {@link Header}.
*/ */
public class InputEntityDeserializer<ENTITY extends InputEntity> public class InputEntityDeserializer<ENTITY extends InputEntity> extends InputIterator.Adapter<ENTITY>
extends PrefetchingIterator<ENTITY> implements InputIterator<ENTITY>
{ {
private final Header header; private final Header header;
private final CharSeeker data; private final CharSeeker data;
Expand Down
Expand Up @@ -95,28 +95,6 @@ protected boolean guarantees( int orderingGuaranteeFlag )
return (orderingGuarantees & orderingGuaranteeFlag) != 0; return (orderingGuarantees & orderingGuaranteeFlag) != 0;
} }


/**
* The number of processors processing incoming batches in parallel for this step. Exposed as a method
* since this number can change over time depending on the load.
*/
@Override
public int numberOfProcessors()
{
return 1;
}

@Override
public boolean incrementNumberOfProcessors()
{
return false;
}

@Override
public boolean decrementNumberOfProcessors()
{
return false;
}

@Override @Override
public String name() public String name()
{ {
Expand Down Expand Up @@ -290,6 +268,7 @@ protected void resetStats()
@Override @Override
public String toString() public String toString()
{ {
return format( "Step[%s, processors:%d, batches:%d", name, numberOfProcessors(), doneBatches.get() ); return format( "%s[%s, processors:%d, batches:%d", getClass().getSimpleName(),
name, numberOfProcessors(), doneBatches.get() );
} }
} }
Expand Up @@ -58,7 +58,7 @@ protected Object nextBatchOrNull( long ticket, int batchSize )
} }


@Override @Override
public void close() public void close() throws Exception
{ {
data.close(); data.close();
} }
Expand All @@ -68,4 +68,23 @@ protected long position()
{ {
return data.position(); return data.position();
} }

// We have to let our processing framework know about changes in processor count assigned to us
@Override
public int numberOfProcessors()
{
return data.numberOfProcessors();
}

@Override
public boolean incrementNumberOfProcessors()
{
return data.incrementNumberOfProcessors();
}

@Override
public boolean decrementNumberOfProcessors()
{
return data.decrementNumberOfProcessors();
}
} }
Expand Up @@ -83,10 +83,11 @@ public int numberOfProcessors()
return numberOfProcessors; return numberOfProcessors;
} }


public ControlledStep<T> setNumberOfProcessors( int numberOfProcessors ) public ControlledStep<T> setProcessors( int numberOfProcessors )
{ {
assertTrue( numberOfProcessors <= maxProcessors ); assertTrue( numberOfProcessors <= maxProcessors );
this.numberOfProcessors = numberOfProcessors; this.numberOfProcessors = numberOfProcessors;
setNumberOfProcessors( numberOfProcessors );
return this; return this;
} }


Expand Down
Expand Up @@ -69,7 +69,7 @@ public void shouldRemoveCPUsFromWayTooFastStep() throws Exception


ControlledStep<?> slowStep = spy( stepWithStats( "slow", 1, avg_processing_time, 10L, done_batches, 10L ) ); ControlledStep<?> slowStep = spy( stepWithStats( "slow", 1, avg_processing_time, 10L, done_batches, 10L ) );
ControlledStep<?> fastStep = spy( stepWithStats( "fast", 0, avg_processing_time, 2L, done_batches, 10L ) ControlledStep<?> fastStep = spy( stepWithStats( "fast", 0, avg_processing_time, 2L, done_batches, 10L )
.setNumberOfProcessors( 2 ) ); .setProcessors( 2 ) );


StageExecution[] execution = executionOf( config, slowStep, fastStep ); StageExecution[] execution = executionOf( config, slowStep, fastStep );
assigner.start( execution ); assigner.start( execution );
Expand All @@ -91,7 +91,7 @@ public void shouldRemoveCPUsButNotSoThatTheFastStepBecomesBottleneck() throws Ex


ControlledStep<?> slowStep = spy( stepWithStats( "slow", 1, avg_processing_time, 10L, done_batches, 10L ) ); ControlledStep<?> slowStep = spy( stepWithStats( "slow", 1, avg_processing_time, 10L, done_batches, 10L ) );
ControlledStep<?> fastStep = spy( stepWithStats( "fast", 0, avg_processing_time, 7L, done_batches, 10L ) ControlledStep<?> fastStep = spy( stepWithStats( "fast", 0, avg_processing_time, 7L, done_batches, 10L )
.setNumberOfProcessors( 3 ) ); .setProcessors( 3 ) );


StageExecution[] execution = executionOf( config, slowStep, fastStep ); StageExecution[] execution = executionOf( config, slowStep, fastStep );
assigner.start( execution ); assigner.start( execution );
Expand Down Expand Up @@ -138,7 +138,7 @@ public void shouldRemoveCPUsFromTooFastStepEvenIfThereIsAWayFaster() throws Exce
DynamicProcessorAssigner assigner = new DynamicProcessorAssigner( config, 5 ); DynamicProcessorAssigner assigner = new DynamicProcessorAssigner( config, 5 );
Step<?> wayFastest = stepWithStats( "wayFastest", 0, avg_processing_time, 0L, done_batches, 20L ); Step<?> wayFastest = stepWithStats( "wayFastest", 0, avg_processing_time, 0L, done_batches, 20L );
Step<?> fast = spy( stepWithStats( "fast", 0, avg_processing_time, 100L, done_batches, 20L ) Step<?> fast = spy( stepWithStats( "fast", 0, avg_processing_time, 100L, done_batches, 20L )
.setNumberOfProcessors( 3 ) ); .setProcessors( 3 ) );
Step<?> slow = stepWithStats( "slow", 1, avg_processing_time, 220L, done_batches, 20L ); Step<?> slow = stepWithStats( "slow", 1, avg_processing_time, 220L, done_batches, 20L );
StageExecution[] execution = executionOf( config, slow, wayFastest, fast ); StageExecution[] execution = executionOf( config, slow, wayFastest, fast );
assigner.start( execution ); assigner.start( execution );
Expand Down

0 comments on commit e9267be

Please sign in to comment.