Skip to content

Commit

Permalink
Less eager decrementing of processors
Browse files Browse the repository at this point in the history
so that a step will not decrement number of processors until it becomes
the bottleneck itself, instead possible just decrement until only slightly
ahead of the next slowest. This fixes an unnecessary behaviour where a
decrement would be followed by an immediate increment sometimes and it
could keep on flipping like that.
  • Loading branch information
tinwelint committed Apr 9, 2015
1 parent 4cc7ad5 commit aecd5d7
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 52 deletions.
Expand Up @@ -118,8 +118,18 @@ private boolean removeProcessorFromPotentialIdleStep( StageExecution execution )
{
for ( Pair<Step<?>,Float> fast : execution.stepsOrderedBy( Keys.avg_processing_time, true ) )
{
float threshold = 1f - (1f/fast.first().numberOfProcessors());
if ( fast.other() < threshold )
int numberOfProcessors = fast.first().numberOfProcessors();
if ( numberOfProcessors == 1 )
{
continue;
}

// Translate the factor compared to the next (slower) step and see if this step would still
// be faster if we decremented the processor count, with a slight conservative margin as well
// (0.8 instead of 1.0 so that we don't decrement and immediately become the bottleneck ourselves).
float factorWithDecrementedProcessorCount =
fast.other().floatValue()*numberOfProcessors/(numberOfProcessors-1);
if ( factorWithDecrementedProcessorCount < 0.8f )
{
Step<?> fastestStep = fast.first();
long doneBatches = batches( fastestStep );
Expand Down
Expand Up @@ -38,43 +38,42 @@
*/
public class ControlledStep<T> implements Step<T>, StatsProvider
{
public static ControlledStep<?> stepWithAverageOf( long avg )
public static ControlledStep<?> stepWithAverageOf( String name, int maxProcessors, long avg )
{
ControlledStep<?> step = new ControlledStep<>( "test", true );
step.setStat( Keys.avg_processing_time, avg );
return step;
return stepWithStats( name, maxProcessors, Keys.avg_processing_time, avg );
}

public static ControlledStep<?> stepWithStats( Map<Key,Long> statistics )
public static ControlledStep<?> stepWithStats( String name, int maxProcessors,
Map<Key,Long> statistics )
{
ControlledStep<?> step = new ControlledStep<>( "test", true );
ControlledStep<?> step = new ControlledStep<>( name, maxProcessors );
for ( Map.Entry<Key,Long> statistic : statistics.entrySet() )
{
step.setStat( statistic.getKey(), statistic.getValue().longValue() );
}
return step;
}

public static ControlledStep<?> stepWithStats( Object... statisticsAltKeyAndValue )
public static ControlledStep<?> stepWithStats( String name, int maxProcessors, Object... statisticsAltKeyAndValue )
{
return stepWithStats( MapUtil.<Key,Long>genericMap( statisticsAltKeyAndValue ) );
return stepWithStats( name, maxProcessors, MapUtil.<Key,Long>genericMap( statisticsAltKeyAndValue ) );
}

private final String name;
private final Map<Key,ControlledStat> stats = new HashMap<>();
private final boolean allowMultipleProcessors;
private final int maxProcessors;
private volatile int numberOfProcessors = 1;

public ControlledStep( String name, boolean allowMultipleProcessors )
public ControlledStep( String name, int maxProcessors )
{
this( name, allowMultipleProcessors, 1 );
this( name, maxProcessors, 1 );
}

public ControlledStep( String name, boolean allowMultipleProcessors, int initialProcessorCount )
public ControlledStep( String name, int maxProcessors, int initialProcessorCount )
{
this.maxProcessors = maxProcessors == 0 ? Integer.MAX_VALUE : maxProcessors;
this.name = name;
this.allowMultipleProcessors = allowMultipleProcessors;
this.numberOfProcessors = initialProcessorCount;
setNumberOfProcessors( initialProcessorCount );
}

@Override
Expand All @@ -85,15 +84,15 @@ public int numberOfProcessors()

public ControlledStep<T> setNumberOfProcessors( int numberOfProcessors )
{
assertTrue( allowMultipleProcessors );
assertTrue( numberOfProcessors <= maxProcessors );
this.numberOfProcessors = numberOfProcessors;
return this;
}

@Override
public synchronized boolean incrementNumberOfProcessors()
{
if ( !allowMultipleProcessors )
if ( numberOfProcessors >= maxProcessors )
{
return false;
}
Expand Down Expand Up @@ -198,5 +197,17 @@ public long asLong()
{
return value;
}

@Override
public String toString()
{
return "" + value;
}
}

@Override
public String toString()
{
return getClass().getSimpleName() + "[" + name() + ", " + stats + "]";
}
}
Expand Up @@ -24,7 +24,6 @@
import java.util.Arrays;

import org.neo4j.unsafe.impl.batchimport.Configuration;
import org.neo4j.unsafe.impl.batchimport.stats.Keys;

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.spy;
Expand All @@ -44,13 +43,8 @@ public void shouldAssignAdditionalCPUToBottleNeckStep() throws Exception
Configuration config = movingAverageConfig( 10 );
DynamicProcessorAssigner assigner = new DynamicProcessorAssigner( config, 5 );

ControlledStep<?> slowStep = new ControlledStep<>( "slow", true );
slowStep.setStat( Keys.avg_processing_time, 10 );
slowStep.setStat( Keys.done_batches, 10 );

ControlledStep<?> fastStep = new ControlledStep<>( "fast", true );
fastStep.setStat( Keys.avg_processing_time, 2 );
fastStep.setStat( Keys.done_batches, 10 );
ControlledStep<?> slowStep = stepWithStats( "slow", 0, avg_processing_time, 10L, done_batches, 10L );
ControlledStep<?> fastStep = stepWithStats( "fast", 0, avg_processing_time, 2L, done_batches, 10L );

StageExecution[] execution = executionOf( config, slowStep, fastStep );
assigner.start( execution );
Expand All @@ -72,38 +66,52 @@ public void shouldRemoveCPUsFromWayTooFastStep() throws Exception
// and it rounds down. So there's room for assigning one more.
DynamicProcessorAssigner assigner = new DynamicProcessorAssigner( config, 3 );

ControlledStep<?> slowStep = spy( new ControlledStep<>( "slow", true ) );
slowStep.setStat( Keys.avg_processing_time, 10 );
slowStep.setStat( Keys.done_batches, 10 );

ControlledStep<?> fastStep = spy( new ControlledStep<>( "fast", true, 2 ) );
fastStep.setStat( Keys.avg_processing_time, 2 );
fastStep.setStat( Keys.done_batches, 10 );
ControlledStep<?> slowStep = spy( stepWithStats( "slow", 1, avg_processing_time, 10L, done_batches, 10L ) );
ControlledStep<?> fastStep = spy( stepWithStats( "fast", 0, avg_processing_time, 2L, done_batches, 10L )
.setNumberOfProcessors( 2 ) );

StageExecution[] execution = executionOf( config, slowStep, fastStep );
assigner.start( execution );

// WHEN first checking
// WHEN checking
assigner.check( execution );
// THEN one additional processor will be added to the slow step

// THEN one processor should be removed from the fast step
verify( fastStep, times( 0 ) ).incrementNumberOfProcessors();
verify( fastStep, times( 1 ) ).decrementNumberOfProcessors();
}

@Test
public void shouldRemoveCPUsButNotSoThatTheFastStepBecomesBottleneck() throws Exception
{
// GIVEN
Configuration config = movingAverageConfig( 10 );
DynamicProcessorAssigner assigner = new DynamicProcessorAssigner( config, 3 );

ControlledStep<?> slowStep = spy( stepWithStats( "slow", 1, avg_processing_time, 10L, done_batches, 10L ) );
ControlledStep<?> fastStep = spy( stepWithStats( "fast", 0, avg_processing_time, 7L, done_batches, 10L )
.setNumberOfProcessors( 3 ) );

StageExecution[] execution = executionOf( config, slowStep, fastStep );
assigner.start( execution );

// WHEN checking the first time
assigner.check( execution );

// THEN one processor should be removed from the fast step
verify( fastStep, times( 0 ) ).incrementNumberOfProcessors();
verify( fastStep, times( 0 ) ).decrementNumberOfProcessors();
}

@Test
public void shouldHandleZeroAverage() throws Exception
{
// GIVEN
Configuration config = movingAverageConfig( 10 );
DynamicProcessorAssigner assigner = new DynamicProcessorAssigner( config, 5 );

ControlledStep<?> aStep = new ControlledStep<>( "slow", true );
aStep.setStat( Keys.avg_processing_time, 0 );
aStep.setStat( Keys.done_batches, 0 );

ControlledStep<?> anotherStep = new ControlledStep<>( "fast", true );
anotherStep.setStat( Keys.avg_processing_time, 0 );
anotherStep.setStat( Keys.done_batches, 0 );
ControlledStep<?> aStep = stepWithStats( "slow", 0, avg_processing_time, 0L, done_batches, 0L );
ControlledStep<?> anotherStep = stepWithStats( "fast", 0, avg_processing_time, 0L, done_batches, 0L );

StageExecution[] execution = executionOf( config, aStep, anotherStep );
assigner.start( execution );
Expand All @@ -127,9 +135,10 @@ public void shouldRemoveCPUsFromTooFastStepEvenIfThereIsAWayFaster() throws Exce
// GIVEN
Configuration config = movingAverageConfig( 10 );
DynamicProcessorAssigner assigner = new DynamicProcessorAssigner( config, 5 );
Step<?> wayFastest = stepWithStats( avg_processing_time, 0L, done_batches, 20L );
Step<?> fast = spy( stepWithStats( avg_processing_time, 100L, done_batches, 20L ).setNumberOfProcessors( 3 ) );
Step<?> slow = stepWithStats( avg_processing_time, 200L, done_batches, 20L );
Step<?> wayFastest = stepWithStats( "wayFastest", 0, avg_processing_time, 0L, done_batches, 20L );
Step<?> fast = spy( stepWithStats( "fast", 0, avg_processing_time, 100L, done_batches, 20L )
.setNumberOfProcessors( 3 ) );
Step<?> slow = stepWithStats( "slow", 1, avg_processing_time, 220L, done_batches, 20L );
StageExecution[] execution = executionOf( config, slow, wayFastest, fast );
assigner.start( execution );

Expand Down
Expand Up @@ -41,9 +41,9 @@ public void shouldOrderStepsAscending() throws Exception
{
// GIVEN
Collection<Step<?>> steps = new ArrayList<>();
steps.add( stepWithAverageOf( 10 ) );
steps.add( stepWithAverageOf( 5 ) );
steps.add( stepWithAverageOf( 30 ) );
steps.add( stepWithAverageOf( "step1", 0, 10 ) );
steps.add( stepWithAverageOf( "step2", 0, 5 ) );
steps.add( stepWithAverageOf( "step3", 0, 30 ) );
StageExecution execution = new StageExecution( "Test", DEFAULT, steps, true );

// WHEN
Expand All @@ -64,10 +64,10 @@ public void shouldOrderStepsDescending() throws Exception
{
// GIVEN
Collection<Step<?>> steps = new ArrayList<>();
steps.add( stepWithAverageOf( 10 ) );
steps.add( stepWithAverageOf( 5 ) );
steps.add( stepWithAverageOf( 30 ) );
steps.add( stepWithAverageOf( 5 ) );
steps.add( stepWithAverageOf( "step1", 0, 10 ) );
steps.add( stepWithAverageOf( "step2", 0, 5 ) );
steps.add( stepWithAverageOf( "step3", 0, 30 ) );
steps.add( stepWithAverageOf( "step4", 0, 5 ) );
StageExecution execution = new StageExecution( "Test", DEFAULT, steps, true );

// WHEN
Expand Down

0 comments on commit aecd5d7

Please sign in to comment.