diff --git a/community/import-tool/src/test/java/org/neo4j/tooling/EntityDataGenerator.java b/community/import-tool/src/test/java/org/neo4j/tooling/EntityDataGenerator.java index c5102cb9d7886..300c7a46da6df 100644 --- a/community/import-tool/src/test/java/org/neo4j/tooling/EntityDataGenerator.java +++ b/community/import-tool/src/test/java/org/neo4j/tooling/EntityDataGenerator.java @@ -87,22 +87,9 @@ public void close() processing.shutdown( false ); } - // We have to let our processing framework know about changes in processor count assigned to us @Override - public int numberOfProcessors() + public int processors( int delta ) { - return processing.numberOfProcessors(); - } - - @Override - public boolean incrementNumberOfProcessors() - { - return processing.incrementNumberOfProcessors(); - } - - @Override - public boolean decrementNumberOfProcessors() - { - return processing.decrementNumberOfProcessors(); + return processing.processors( delta ); } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/InputIterator.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/InputIterator.java index 79587051c931a..a3e924d603c1e 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/InputIterator.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/InputIterator.java @@ -105,21 +105,9 @@ public long position() } @Override - public int numberOfProcessors() + public int processors( int delta ) { - return actual.numberOfProcessors(); - } - - @Override - public boolean incrementNumberOfProcessors() - { - return actual.incrementNumberOfProcessors(); - } - - @Override - public boolean decrementNumberOfProcessors() - { - return actual.decrementNumberOfProcessors(); + return actual.processors( delta ); } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/Parallelizable.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/Parallelizable.java index 872bd1fdb41e2..1cdd003ad6c7b 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/Parallelizable.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/Parallelizable.java @@ -21,85 +21,27 @@ /** * Represents something that can be parallelizable, in this case that means the ability to dynamically change - * the number of processors executing that tasks ahead. + * the number of processors executing tasks. */ public interface Parallelizable { /** - * @return number of processors processing incoming tasks in parallel. - */ - default int numberOfProcessors() - { - return 1; - } - - /** - * Increments number of processors that processes tasks in parallel. + * Change number of processors assigned to this {@link Parallelizable}. Accepts a {@code delta}, + * which may specify positive or negative value, even zero. This instances may have internal constraints + * in the number of processors, min or max, which may be assigned and so potentially the change will + * only be partially accepted or not at all. This is why this call returns the total number of processors + * this instance now has accepted after any effect of this call. * - * @return {@code true} if one more processor was assigned, otherwise {@code false}. - */ - default boolean incrementNumberOfProcessors() - { - return false; - } - - /** - * Decrements number of processors that processes tasks in parallel. + * {@link Parallelizable} is used in many call stacks where call delegation is predominant and so + * reducing number of methods to delegate is favored. This is why this method looks and functions + * like this, it can cater for incrementing, decrementing and even getting number of processors. * - * @return {@code true} if one processor was unassigned, otherwise {@code false}. + * @param delta number of processors to add or remove, i.e. negative or positive value. A value of + * zero will result in merely the current number of assigned processors to be returned. + * @return the number of assigned processors as a result this call. */ - default boolean decrementNumberOfProcessors() + default int processors( int delta ) { - return false; - } - - /** - * Tries to set specified number of processors. If {@code processors} would be out of bounds - * for what this instance can assign then a value within bounds will be set instead. - * - * @param processors number of desired processors. - * @return number of actual processors after this call. - */ - default int setNumberOfProcessors( int processors ) - { - int current; - while ( (current = numberOfProcessors()) != processors ) - { - boolean success = current < processors ? incrementNumberOfProcessors() : - decrementNumberOfProcessors(); - if ( !success ) - { - break; - } - } - return current; - } - - class Delegate implements Parallelizable - { - protected final Parallelizable actual; - - public Delegate( Parallelizable actual ) - { - this.actual = actual; - } - - @Override - public int numberOfProcessors() - { - return actual.numberOfProcessors(); - } - - @Override - public boolean incrementNumberOfProcessors() - { - return actual.incrementNumberOfProcessors(); - } - - @Override - public boolean decrementNumberOfProcessors() - { - return actual.decrementNumberOfProcessors(); - } + return 1; } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/executor/DynamicTaskExecutor.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/executor/DynamicTaskExecutor.java index 2bbb38342c287..9c51240e38ce7 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/executor/DynamicTaskExecutor.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/executor/DynamicTaskExecutor.java @@ -27,7 +27,11 @@ import java.util.function.Supplier; import org.neo4j.function.Suppliers; + +import static java.lang.Integer.max; +import static java.lang.Integer.min; import static java.util.concurrent.TimeUnit.MILLISECONDS; + import static org.neo4j.helpers.Exceptions.launderedException; /** @@ -68,51 +72,45 @@ public DynamicTaskExecutor( int initialProcessorCount, int maxProcessorCount, in this.processorThreadNamePrefix = processorThreadNamePrefix; this.initialLocalState = initialLocalState; this.queue = new ArrayBlockingQueue<>( maxQueueSize ); - setNumberOfProcessors( initialProcessorCount ); - } - - @Override - public int numberOfProcessors() - { - return processors.length; + processors( initialProcessorCount ); } @Override - public synchronized boolean incrementNumberOfProcessors() + public int processors( int delta ) { - if ( shutDown ) - { - return false; - } - int currentNumber = numberOfProcessors(); - if ( currentNumber >= maxProcessorCount ) + if ( shutDown || delta == 0 ) { - return false; + return processors.length; } - Processor[] newProcessors = Arrays.copyOf( processors, currentNumber + 1 ); - newProcessors[currentNumber] = new Processor( processorThreadNamePrefix + "-" + currentNumber ); - this.processors = newProcessors; - return true; - } - - @Override - public synchronized boolean decrementNumberOfProcessors() - { - if ( shutDown ) + int requestedNumber = processors.length + delta; + if ( delta > 0 ) { - return false; + requestedNumber = min( requestedNumber, maxProcessorCount ); + if ( requestedNumber > processors.length ) + { + Processor[] newProcessors = Arrays.copyOf( processors, requestedNumber ); + for ( int i = processors.length; i < requestedNumber; i++ ) + { + newProcessors[i] = new Processor( processorThreadNamePrefix + "-" + i ); + } + this.processors = newProcessors; + } } - int currentNumber = numberOfProcessors(); - if ( currentNumber == 1 ) + else { - return false; + requestedNumber = max( 1, requestedNumber ); + if ( requestedNumber < processors.length ) + { + Processor[] newProcessors = Arrays.copyOf( processors, requestedNumber ); + for ( int i = newProcessors.length; i < processors.length; i++ ) + { + processors[i].shutDown = true; + } + this.processors = newProcessors; + } } - - Processor[] newProcessors = Arrays.copyOf( processors, currentNumber - 1 ); - processors[currentNumber-1].shutDown = true; - processors = newProcessors; - return true; + return processors.length; } @Override diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/InputEntityReader.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/InputEntityReader.java index a1cef13348feb..11a4e85765d04 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/InputEntityReader.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/InputEntityReader.java @@ -286,21 +286,9 @@ public void close() } @Override - public boolean incrementNumberOfProcessors() + public int processors( int delta ) { - return processing.incrementNumberOfProcessors(); - } - - @Override - public boolean decrementNumberOfProcessors() - { - return processing.decrementNumberOfProcessors(); - } - - @Override - public int numberOfProcessors() - { - return processing.numberOfProcessors(); + return processing.processors( delta ); } private class BatchProvidingIterator extends PrefetchingIterator diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/AbstractStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/AbstractStep.java index 273e563a107e1..0f69b26966eb2 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/AbstractStep.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/AbstractStep.java @@ -173,7 +173,7 @@ public StepStats stats() protected void collectStatsProviders( Collection into ) { into.add( new ProcessingStats( doneBatches.get()+queuedBatches.get(), doneBatches.get(), - totalProcessingTime.total(), totalProcessingTime.average() / numberOfProcessors(), + totalProcessingTime.total(), totalProcessingTime.average() / processors( 0 ), upstreamIdleTime.get(), downstreamIdleTime.get() ) ); into.addAll( additionalStatsProvider ); } @@ -240,6 +240,6 @@ protected void resetStats() public String toString() { return format( "%s[%s, processors:%d, batches:%d", getClass().getSimpleName(), - name, numberOfProcessors(), doneBatches.get() ); + name, processors( 0 ), doneBatches.get() ); } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/DynamicProcessorAssigner.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/DynamicProcessorAssigner.java index 41ece6802e947..b8452bcba3838 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/DynamicProcessorAssigner.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/DynamicProcessorAssigner.java @@ -101,7 +101,8 @@ private int assignProcessorsToPotentialBottleNeck( StageExecution execution, int int optimalProcessorIncrement = min( max( 1, (int) bottleNeck.other().floatValue() - 1 ), permits ); for ( int i = 0; i < optimalProcessorIncrement; i++ ) { - if ( bottleNeckStep.incrementNumberOfProcessors() ) + int before = bottleNeckStep.processors( 0 ); + if ( bottleNeckStep.processors( 1 ) > before ) { lastChangedProcessors.put( bottleNeckStep, doneBatches ); usedPermits++; @@ -115,7 +116,7 @@ private boolean removeProcessorFromPotentialIdleStep( StageExecution execution ) { for ( Pair,Float> fast : execution.stepsOrderedBy( Keys.avg_processing_time, true ) ) { - int numberOfProcessors = fast.first().numberOfProcessors(); + int numberOfProcessors = fast.first().processors( 0 ); if ( numberOfProcessors == 1 ) { continue; @@ -132,7 +133,8 @@ private boolean removeProcessorFromPotentialIdleStep( StageExecution execution ) long doneBatches = batches( fastestStep ); if ( batchesPassedSinceLastChange( fastestStep, doneBatches ) >= config.movingAverageSize() ) { - if ( fastestStep.decrementNumberOfProcessors() ) + int before = fastestStep.processors( 0 ); + if ( fastestStep.processors( -1 ) < before ) { lastChangedProcessors.put( fastestStep, doneBatches ); return true; @@ -169,7 +171,7 @@ private int countActiveProcessors( StageExecution[] executions ) // "steal" some of its processing power. long avg = avg( step ); float factor = (float)avg / (float)highestAverage; - processors += factor * step.numberOfProcessors(); + processors += factor * step.processors( 0 ); } } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/IteratorBatcherStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/IteratorBatcherStep.java index 5996f273d5edf..dc1e801c60995 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/IteratorBatcherStep.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/IteratorBatcherStep.java @@ -69,22 +69,9 @@ protected long position() return data.position(); } - // We have to let our processing framework know about changes in processor count assigned to us @Override - public int numberOfProcessors() + public int processors( int delta ) { - return data.numberOfProcessors(); - } - - @Override - public boolean incrementNumberOfProcessors() - { - return data.incrementNumberOfProcessors(); - } - - @Override - public boolean decrementNumberOfProcessors() - { - return data.decrementNumberOfProcessors(); + return data.processors( delta ); } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ProcessorStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ProcessorStep.java index b68a07508c328..3c627323a6e8c 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ProcessorStep.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ProcessorStep.java @@ -87,7 +87,7 @@ private int theoreticalMaxProcessors() public long receive( final long ticket, final T batch ) { // Don't go too far ahead - long idleTime = await( catchUp, executor.numberOfProcessors(), healthChecker, park ); + long idleTime = await( catchUp, executor.processors( 0 ), healthChecker, park ); incrementQueue(); executor.submit( sender -> { assertHealthy(); @@ -182,21 +182,9 @@ public void close() throws Exception } @Override - public int numberOfProcessors() + public int processors( int delta ) { - return executor.numberOfProcessors(); - } - - @Override - public boolean incrementNumberOfProcessors() - { - return executor.incrementNumberOfProcessors(); - } - - @Override - public boolean decrementNumberOfProcessors() - { - return executor.decrementNumberOfProcessors(); + return executor.processors( delta ); } @SuppressWarnings( "unchecked" ) diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/SpectrumExecutionMonitor.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/SpectrumExecutionMonitor.java index 063f3de7dfacc..d5e892d2d253a 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/SpectrumExecutionMonitor.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/SpectrumExecutionMonitor.java @@ -154,11 +154,11 @@ private void printSpectrum( StringBuilder builder, StageExecution execution, int boolean isBottleNeck = bottleNeck.first() == step; String name = (isBottleNeck ? "*" : "") + - stats.toString( DetailLevel.IMPORTANT ) + (step.numberOfProcessors() > 1 - ? "(" + step.numberOfProcessors() + ")" + stats.toString( DetailLevel.IMPORTANT ) + (step.processors( 0 ) > 1 + ? "(" + step.processors( 0 ) + ")" : ""); int charIndex = 0; // negative value "delays" the text, i.e. pushes it to the right - char backgroundChar = step.numberOfProcessors() > 1 ? '=' : '-'; + char backgroundChar = step.processors( 0 ) > 1 ? '=' : '-'; for ( int i = 0; i < stepWidth; i++, charIndex++ ) { char ch = backgroundChar; diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/TicketedProcessing.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/TicketedProcessing.java index 32c7401e13cdc..1e9610f4e6c32 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/TicketedProcessing.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/TicketedProcessing.java @@ -184,20 +184,8 @@ public TO next() } @Override - public int numberOfProcessors() + public int processors( int delta ) { - return executor.numberOfProcessors(); - } - - @Override - public boolean incrementNumberOfProcessors() - { - return executor.incrementNumberOfProcessors(); - } - - @Override - public boolean decrementNumberOfProcessors() - { - return executor.decrementNumberOfProcessors(); + return executor.processors( delta ); } } diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/CalculateDenseNodesStepTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/CalculateDenseNodesStepTest.java index aed0bff4a670f..21e908244bee3 100644 --- a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/CalculateDenseNodesStepTest.java +++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/CalculateDenseNodesStepTest.java @@ -51,7 +51,7 @@ public void panic( Throwable cause ) NodeRelationshipCache cache = new NodeRelationshipCache( NumberArrayFactory.HEAP, -1 ); Step step = new CalculateDenseNodesStep( control, config, cache ); step.start( 0 ); - maxOutNumberOfProcessors( step ); + step.processors( 100 ); // WHEN sending many batches, all which "happens" to have ids of the same radix, in fact // this test "happens" to send the same batch of ids over and over, which actually may happen in read life, @@ -89,12 +89,4 @@ private long[] batchOfIdsWithRadix( int radixOutOfTen ) } return ids; } - - private void maxOutNumberOfProcessors( Step step ) - { - for ( int i = 0; i < 100 && step.incrementNumberOfProcessors(); i++ ) - { - // Then increment number of processors - } - } } diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/executor/DynamicTaskExecutorTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/executor/DynamicTaskExecutorTest.java index a94792d73c8cb..5e234a2bfd98d 100644 --- a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/executor/DynamicTaskExecutorTest.java +++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/executor/DynamicTaskExecutorTest.java @@ -31,7 +31,6 @@ import org.neo4j.unsafe.impl.batchimport.executor.ParkStrategy.Park; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -85,7 +84,7 @@ public void shouldIncrementNumberOfProcessorsWhenRunning() throws Exception executor.submit( task1 ); task1.latch.awaitStart(); executor.submit( task2 ); - executor.setNumberOfProcessors( 2 ); + executor.processors( 1 ); // now at 2 while ( task2.executed == 0 ) { // With one additional worker, the second task can execute even if task one is still executing } @@ -118,7 +117,7 @@ public void shouldDecrementNumberOfProcessorsWhenRunning() throws Exception task2.latch.awaitStart(); executor.submit( task3 ); executor.submit( task4 ); - executor.setNumberOfProcessors( 1 ); + executor.processors( -1 ); // it started at 2 ^^^ task1.latch.finish(); task2.latch.finish(); task3.latch.awaitStart(); @@ -280,16 +279,20 @@ public Void doWork( Void state ) throws Exception public void shouldRespectMaxProcessors() throws Exception { // GIVEN - final TaskExecutor executor = new DynamicTaskExecutor<>( 1, 4, 10, PARK, + int maxProcessors = 4; + final TaskExecutor executor = new DynamicTaskExecutor<>( 1, maxProcessors, 10, PARK, getClass().getSimpleName() ); // WHEN/THEN - assertEquals( 1, executor.numberOfProcessors() ); - assertFalse( executor.decrementNumberOfProcessors() ); - assertTrue( executor.incrementNumberOfProcessors() ); - assertEquals( 2, executor.numberOfProcessors() ); - executor.setNumberOfProcessors( 10 ); - assertEquals( 4, executor.numberOfProcessors() ); + assertEquals( 1, executor.processors( 0 ) ); + assertEquals( 2, executor.processors( 1 ) ); + assertEquals( 4, executor.processors( 3 ) /*would have gone to 5 otherwise*/ ); + assertEquals( 4, executor.processors( 0 ) ); + assertEquals( 4, executor.processors( 1 ) ); + assertEquals( 3, executor.processors( -1 ) ); + assertEquals( 1, executor.processors( -2 ) ); + assertEquals( 1, executor.processors( -2 ) ); + assertEquals( 1, executor.processors( 0 ) ); executor.shutdown( SF_AWAIT_ALL_COMPLETED ); } diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/input/InputCacheTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/input/InputCacheTest.java index ba8c391da2a74..4c91f8325eaba 100644 --- a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/input/InputCacheTest.java +++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/input/InputCacheTest.java @@ -99,7 +99,7 @@ public void shouldCacheAndRetrieveNodes() throws Exception // WHEN/THEN try ( InputIterator reader = cache.nodes( MAIN, true ).iterator() ) { - reader.setNumberOfProcessors( 50 ); + reader.processors( 50 - reader.processors( 0 ) ); Iterator expected = nodes.iterator(); while ( expected.hasNext() ) { @@ -141,7 +141,7 @@ public void shouldCacheAndRetrieveRelationships() throws Exception // WHEN/THEN try ( InputIterator reader = cache.relationships( MAIN, true ).iterator() ) { - reader.setNumberOfProcessors( 50 ); + reader.processors( 50 - reader.processors( 0 ) ); Iterator expected = relationships.iterator(); while ( expected.hasNext() ) { @@ -186,7 +186,7 @@ StandardV3_0.RECORD_FORMATS, withMaxProcessors( 8 ) ) ) { try ( InputIterator reader = cache.relationships( MAIN, false ).iterator() ) { - reader.setNumberOfProcessors( i ); + reader.processors( i - reader.processors( 0 ) ); long time = currentTimeMillis(); count( reader ); time = currentTimeMillis() - time; diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/ControlledStep.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/ControlledStep.java index cbe867c1610ff..8928ade4781e1 100644 --- a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/ControlledStep.java +++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/ControlledStep.java @@ -33,6 +33,9 @@ import static org.junit.Assert.assertTrue; +import static java.lang.Integer.max; +import static java.lang.Integer.min; + /** * A bit like a mocked {@link Step}, but easier to work with. */ @@ -74,43 +77,29 @@ public ControlledStep( String name, int maxProcessors, int initialProcessorCount { this.maxProcessors = maxProcessors == 0 ? Integer.MAX_VALUE : maxProcessors; this.name = name; - setNumberOfProcessors( initialProcessorCount ); - } - - @Override - public int numberOfProcessors() - { - return numberOfProcessors; + processors( initialProcessorCount-1 ); } public ControlledStep setProcessors( int numberOfProcessors ) { assertTrue( numberOfProcessors <= maxProcessors ); this.numberOfProcessors = numberOfProcessors; - setNumberOfProcessors( numberOfProcessors ); + processors( numberOfProcessors-numberOfProcessors ); return this; } @Override - public synchronized boolean incrementNumberOfProcessors() + public int processors( int delta ) { - if ( numberOfProcessors >= maxProcessors ) + if ( delta > 0 ) { - return false; + numberOfProcessors = min( numberOfProcessors + delta, maxProcessors ); } - numberOfProcessors++; - return true; - } - - @Override - public synchronized boolean decrementNumberOfProcessors() - { - if ( numberOfProcessors == 1 ) + else if ( delta < 0 ) { - return false; + numberOfProcessors = max( 1, numberOfProcessors + delta ); } - numberOfProcessors--; - return true; + return numberOfProcessors; } @Override diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/DynamicProcessorAssignerTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/DynamicProcessorAssignerTest.java index 9614d5d0088a2..4fd09d0dc0736 100644 --- a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/DynamicProcessorAssignerTest.java +++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/DynamicProcessorAssignerTest.java @@ -54,8 +54,8 @@ public void shouldAssignAdditionalCPUToBottleNeckStep() throws Exception assigner.check( execution ); // THEN - assertEquals( 5, slowStep.numberOfProcessors() ); - assertEquals( 1, fastStep.numberOfProcessors() ); + assertEquals( 5, slowStep.processors( 0 ) ); + assertEquals( 1, fastStep.processors( 0 ) ); } @Test @@ -78,8 +78,7 @@ public void shouldRemoveCPUsFromWayTooFastStep() throws Exception assigner.check( execution ); // THEN one processor should be removed from the fast step - verify( fastStep, times( 0 ) ).incrementNumberOfProcessors(); - verify( fastStep, times( 1 ) ).decrementNumberOfProcessors(); + verify( fastStep, times( 1 ) ).processors( -1 ); } @Test @@ -100,8 +99,8 @@ public void shouldRemoveCPUsButNotSoThatTheFastStepBecomesBottleneck() throws Ex assigner.check( execution ); // THEN one processor should be removed from the fast step - verify( fastStep, times( 0 ) ).incrementNumberOfProcessors(); - verify( fastStep, times( 0 ) ).decrementNumberOfProcessors(); + verify( fastStep, times( 0 ) ).processors( 1 ); + verify( fastStep, times( 0 ) ).processors( -1 ); } @Test @@ -121,8 +120,8 @@ public void shouldHandleZeroAverage() throws Exception assigner.check( execution ); // THEN - assertEquals( 1, aStep.numberOfProcessors() ); - assertEquals( 1, anotherStep.numberOfProcessors() ); + assertEquals( 1, aStep.processors( 0 ) ); + assertEquals( 1, anotherStep.processors( 0 ) ); } @Test @@ -147,7 +146,7 @@ public void shouldRemoveCPUsFromTooFastStepEvenIfThereIsAWayFaster() throws Exce assigner.check( execution ); // THEN - verify( fast ).decrementNumberOfProcessors(); + verify( fast ).processors( -1 ); } private Configuration movingAverageConfig( final int movingAverage ) diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/ProcessorAssignmentStrategies.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/ProcessorAssignmentStrategies.java index 2c9657aad178d..0ee93db89b87d 100644 --- a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/ProcessorAssignmentStrategies.java +++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/ProcessorAssignmentStrategies.java @@ -60,7 +60,8 @@ private void saturate( final int availableProcessor, StageExecution[] executions { for ( Step step : execution.steps() ) { - if ( random.nextBoolean() && step.incrementNumberOfProcessors() && --processors == 0 ) + int before = step.processors( 0 ); + if ( random.nextBoolean() && step.processors( 1 ) > before && --processors == 0 ) { return; } @@ -105,7 +106,8 @@ private void saturate( StageExecution[] executions ) { for ( Step step : execution.steps() ) { - if ( random.nextBoolean() && step.incrementNumberOfProcessors() ) + int before = step.processors( 0 ); + if ( random.nextBoolean() && step.processors( -1 ) < before ) { processors--; if ( --maxThisCheck == 0 ) @@ -136,7 +138,7 @@ protected void registerProcessorCount( StageExecution[] executions ) processors.put( execution.getStageName(), byStage ); for ( Step step : execution.steps() ) { - byStage.put( step.name(), step.numberOfProcessors() ); + byStage.put( step.name(), step.processors( 0 ) ); } } } diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/ProcessorStepTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/ProcessorStepTest.java index 0e68378927e0f..ff1aaa1746b77 100644 --- a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/ProcessorStepTest.java +++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/ProcessorStepTest.java @@ -47,7 +47,7 @@ public void shouldUpholdProcessOrderingGuarantee() throws Exception StageControl control = mock( StageControl.class ); MyProcessorStep step = new MyProcessorStep( control, 0 ); step.start( ORDER_PROCESS ); - step.setNumberOfProcessors( 5 ); + step.processors( 4 ); // now at 5 // WHEN int batches = 10; @@ -75,7 +75,7 @@ public void shouldHaveTaskQueueSizeEqualToNumberOfProcessorsIfSpecificallySet() final int processors = 2; final ProcessorStep step = new BlockingProcessorStep( control, processors, latch ); step.start( ORDER_PROCESS ); - step.setNumberOfProcessors( 2 ); + step.processors( 1 ); // now at 2 // adding two should be fine for ( int i = 0; i < processors+1 /* +1 since we allow queueing one more*/; i++ ) { @@ -99,9 +99,9 @@ public void shouldHaveTaskQueueSizeEqualToCurrentNumberOfProcessorsIfNotSpecific final CountDownLatch latch = new CountDownLatch( 1 ); final ProcessorStep step = new BlockingProcessorStep( control, 0, latch ); step.start( ORDER_PROCESS ); - step.setNumberOfProcessors( 3 ); + step.processors( 2 ); // now at 3 // adding two should be fine - for ( int i = 0; i < step.numberOfProcessors()+1 /* +1 since we allow queueing one more*/; i++ ) + for ( int i = 0; i < step.processors( 0 )+1 /* +1 since we allow queueing one more*/; i++ ) { step.receive( i, null ); } @@ -112,7 +112,7 @@ public void shouldHaveTaskQueueSizeEqualToCurrentNumberOfProcessorsIfNotSpecific @Override public Void doWork( Void state ) throws Exception { - step.receive( step.numberOfProcessors(), null ); + step.receive( step.processors( 0 ), null ); return null; } } ); diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/StageTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/StageTest.java index 2226efc4d7fe4..ff3682727b91d 100644 --- a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/StageTest.java +++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/StageTest.java @@ -79,7 +79,7 @@ protected Object nextBatchOrNull( long ticket, int batchSize ) for ( Step step : execution.steps() ) { // we start off with two in each step - step.incrementNumberOfProcessors(); + step.processors( 1 ); } new ExecutionSupervisor( ExecutionMonitors.invisible() ).supervise( execution ); diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/TicketedProcessingTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/TicketedProcessingTest.java index 9aed766eb5c64..ab331e09732f9 100644 --- a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/TicketedProcessingTest.java +++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/TicketedProcessingTest.java @@ -57,7 +57,7 @@ public void shouldReturnTicketsInOrder() throws Exception int processorCount = Runtime.getRuntime().availableProcessors(); TicketedProcessing processing = new TicketedProcessing<>( "Doubler", processorCount, processor, () -> null ); - processing.setNumberOfProcessors( processorCount ); + processing.processors( processorCount - processing.processors( 0 ) ); // WHEN Future assertions = asserter.execute( new WorkerCommand()