diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/InputIterator.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/InputIterator.java index 2c31743e624a4..79587051c931a 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/InputIterator.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/InputIterator.java @@ -28,7 +28,7 @@ /** * A {@link ResourceIterator} with added methods suitable for {@link Input} into a {@link BatchImporter}. */ -public interface InputIterator extends ResourceIterator, SourceTraceability +public interface InputIterator extends ResourceIterator, SourceTraceability, Parallelizable { public static abstract class Adapter extends PrefetchingIterator implements InputIterator { @@ -103,6 +103,24 @@ public long position() { return actual.position(); } + + @Override + public int numberOfProcessors() + { + return actual.numberOfProcessors(); + } + + @Override + public boolean incrementNumberOfProcessors() + { + return actual.incrementNumberOfProcessors(); + } + + @Override + public boolean decrementNumberOfProcessors() + { + return actual.decrementNumberOfProcessors(); + } } public static class Empty extends Adapter diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/Parallelizable.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/Parallelizable.java index 34948cc07bca4..872bd1fdb41e2 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/Parallelizable.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/Parallelizable.java @@ -26,21 +26,80 @@ public interface Parallelizable { /** - * The number of processors processing incoming tasks in parallel. + * @return number of processors processing incoming tasks in parallel. */ - int numberOfProcessors(); + default int numberOfProcessors() + { + return 1; + } /** * Increments number of processors that processes tasks in parallel. * * @return {@code true} if one more processor was assigned, otherwise {@code false}. */ - boolean incrementNumberOfProcessors(); + default boolean incrementNumberOfProcessors() + { + return false; + } /** * Decrements number of processors that processes tasks in parallel. * * @return {@code true} if one processor was unassigned, otherwise {@code false}. */ - boolean decrementNumberOfProcessors(); + default boolean decrementNumberOfProcessors() + { + return false; + } + + /** + * Tries to set specified number of processors. If {@code processors} would be out of bounds + * for what this instance can assign then a value within bounds will be set instead. + * + * @param processors number of desired processors. + * @return number of actual processors after this call. + */ + default int setNumberOfProcessors( int processors ) + { + int current; + while ( (current = numberOfProcessors()) != processors ) + { + boolean success = current < processors ? incrementNumberOfProcessors() : + decrementNumberOfProcessors(); + if ( !success ) + { + break; + } + } + return current; + } + + class Delegate implements Parallelizable + { + protected final Parallelizable actual; + + public Delegate( Parallelizable actual ) + { + this.actual = actual; + } + + @Override + public int numberOfProcessors() + { + return actual.numberOfProcessors(); + } + + @Override + public boolean incrementNumberOfProcessors() + { + return actual.incrementNumberOfProcessors(); + } + + @Override + public boolean decrementNumberOfProcessors() + { + return actual.decrementNumberOfProcessors(); + } + } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/executor/DynamicTaskExecutor.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/executor/DynamicTaskExecutor.java index a429bf42e4578..4189c19705b77 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/executor/DynamicTaskExecutor.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/executor/DynamicTaskExecutor.java @@ -71,37 +71,6 @@ public DynamicTaskExecutor( int initialProcessorCount, int maxProcessorCount, in setNumberOfProcessors( initialProcessorCount ); } - @Override - public synchronized void setNumberOfProcessors( int count ) - { - assertHealthy(); - assert count > 0; - if ( count == processors.length ) - { - return; - } - - count = min( count, maxProcessorCount ); - Processor[] newProcessors; - if ( count > processors.length ) - { // Add one or more - newProcessors = Arrays.copyOf( processors, count ); - for ( int i = processors.length; i < newProcessors.length; i++ ) - { - newProcessors[i] = new Processor( processorThreadNamePrefix + "-" + i ); - } - } - else - { // Remove one or more - newProcessors = Arrays.copyOf( processors, count ); - for ( int i = newProcessors.length; i < processors.length; i++ ) - { - processors[i].shutDown = true; - } - } - this.processors = newProcessors; - } - @Override public int numberOfProcessors() { @@ -111,22 +80,38 @@ public int numberOfProcessors() @Override public synchronized boolean incrementNumberOfProcessors() { - if ( numberOfProcessors() >= maxProcessorCount ) + if ( shutDown ) { return false; } - setNumberOfProcessors( numberOfProcessors() + 1 ); + int currentNumber = numberOfProcessors(); + if ( currentNumber >= maxProcessorCount ) + { + return false; + } + + Processor[] newProcessors = Arrays.copyOf( processors, currentNumber + 1 ); + newProcessors[currentNumber] = new Processor( processorThreadNamePrefix + "-" + currentNumber ); + this.processors = newProcessors; return true; } @Override public synchronized boolean decrementNumberOfProcessors() { - if ( numberOfProcessors() == 1 ) + if ( shutDown ) + { + return false; + } + int currentNumber = numberOfProcessors(); + if ( currentNumber == 1 ) { return false; } - setNumberOfProcessors( numberOfProcessors() - 1 ); + + Processor[] newProcessors = Arrays.copyOf( processors, currentNumber - 1 ); + processors[currentNumber-1].shutDown = true; + processors = newProcessors; return true; } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/InputEntityDeserializer.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/InputEntityDeserializer.java index 11533520f948e..4bf57ae164842 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/InputEntityDeserializer.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/csv/InputEntityDeserializer.java @@ -26,7 +26,6 @@ import org.neo4j.csv.reader.Extractors; import org.neo4j.csv.reader.Mark; import org.neo4j.helpers.Exceptions; -import org.neo4j.helpers.collection.PrefetchingIterator; import org.neo4j.kernel.impl.util.Validator; import org.neo4j.unsafe.impl.batchimport.InputIterator; import org.neo4j.unsafe.impl.batchimport.input.Collector; @@ -40,8 +39,7 @@ * Converts a line of csv data into an {@link InputEntity} (either a node or relationship). * Does so by seeking values, using {@link CharSeeker}, interpreting the values using a {@link Header}. */ -public class InputEntityDeserializer - extends PrefetchingIterator implements InputIterator +public class InputEntityDeserializer extends InputIterator.Adapter { private final Header header; private final CharSeeker data; diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/AbstractStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/AbstractStep.java index f12933f3cf5bf..9d1bdbde65008 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/AbstractStep.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/AbstractStep.java @@ -95,28 +95,6 @@ protected boolean guarantees( int orderingGuaranteeFlag ) return (orderingGuarantees & orderingGuaranteeFlag) != 0; } - /** - * The number of processors processing incoming batches in parallel for this step. Exposed as a method - * since this number can change over time depending on the load. - */ - @Override - public int numberOfProcessors() - { - return 1; - } - - @Override - public boolean incrementNumberOfProcessors() - { - return false; - } - - @Override - public boolean decrementNumberOfProcessors() - { - return false; - } - @Override public String name() { @@ -290,6 +268,7 @@ protected void resetStats() @Override public String toString() { - return format( "Step[%s, processors:%d, batches:%d", name, numberOfProcessors(), doneBatches.get() ); + return format( "%s[%s, processors:%d, batches:%d", getClass().getSimpleName(), + name, numberOfProcessors(), doneBatches.get() ); } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/IteratorBatcherStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/IteratorBatcherStep.java index a7696bfecf437..5996f273d5edf 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/IteratorBatcherStep.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/IteratorBatcherStep.java @@ -58,7 +58,7 @@ protected Object nextBatchOrNull( long ticket, int batchSize ) } @Override - public void close() + public void close() throws Exception { data.close(); } @@ -68,4 +68,23 @@ protected long position() { return data.position(); } + + // We have to let our processing framework know about changes in processor count assigned to us + @Override + public int numberOfProcessors() + { + return data.numberOfProcessors(); + } + + @Override + public boolean incrementNumberOfProcessors() + { + return data.incrementNumberOfProcessors(); + } + + @Override + public boolean decrementNumberOfProcessors() + { + return data.decrementNumberOfProcessors(); + } } diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/ControlledStep.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/ControlledStep.java index 1a50f31f71653..cbe867c1610ff 100644 --- a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/ControlledStep.java +++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/ControlledStep.java @@ -83,10 +83,11 @@ public int numberOfProcessors() return numberOfProcessors; } - public ControlledStep setNumberOfProcessors( int numberOfProcessors ) + public ControlledStep setProcessors( int numberOfProcessors ) { assertTrue( numberOfProcessors <= maxProcessors ); this.numberOfProcessors = numberOfProcessors; + setNumberOfProcessors( numberOfProcessors ); return this; } diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/DynamicProcessorAssignerTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/DynamicProcessorAssignerTest.java index 99693ee6f39b3..9614d5d0088a2 100644 --- a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/DynamicProcessorAssignerTest.java +++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/DynamicProcessorAssignerTest.java @@ -69,7 +69,7 @@ public void shouldRemoveCPUsFromWayTooFastStep() throws Exception ControlledStep slowStep = spy( stepWithStats( "slow", 1, avg_processing_time, 10L, done_batches, 10L ) ); ControlledStep fastStep = spy( stepWithStats( "fast", 0, avg_processing_time, 2L, done_batches, 10L ) - .setNumberOfProcessors( 2 ) ); + .setProcessors( 2 ) ); StageExecution[] execution = executionOf( config, slowStep, fastStep ); assigner.start( execution ); @@ -91,7 +91,7 @@ public void shouldRemoveCPUsButNotSoThatTheFastStepBecomesBottleneck() throws Ex ControlledStep slowStep = spy( stepWithStats( "slow", 1, avg_processing_time, 10L, done_batches, 10L ) ); ControlledStep fastStep = spy( stepWithStats( "fast", 0, avg_processing_time, 7L, done_batches, 10L ) - .setNumberOfProcessors( 3 ) ); + .setProcessors( 3 ) ); StageExecution[] execution = executionOf( config, slowStep, fastStep ); assigner.start( execution ); @@ -138,7 +138,7 @@ public void shouldRemoveCPUsFromTooFastStepEvenIfThereIsAWayFaster() throws Exce DynamicProcessorAssigner assigner = new DynamicProcessorAssigner( config, 5 ); Step wayFastest = stepWithStats( "wayFastest", 0, avg_processing_time, 0L, done_batches, 20L ); Step fast = spy( stepWithStats( "fast", 0, avg_processing_time, 100L, done_batches, 20L ) - .setNumberOfProcessors( 3 ) ); + .setProcessors( 3 ) ); Step slow = stepWithStats( "slow", 1, avg_processing_time, 220L, done_batches, 20L ); StageExecution[] execution = executionOf( config, slow, wayFastest, fast ); assigner.start( execution );