Skip to content

Commit

Permalink
More consistently removes processors from slow steps in importer
Browse files Browse the repository at this point in the history
previously processors could be stuck on some steps just because not
all processors (including stolen from less active steps) scewing the
distribution indefinitely.
  • Loading branch information
tinwelint committed Apr 27, 2017
1 parent fbef3f5 commit 655a84b
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 27 deletions.
Expand Up @@ -49,11 +49,11 @@ public class DynamicProcessorAssigner extends ExecutionMonitor.Adapter
private final Map<Step<?>,Long/*done batches*/> lastChangedProcessors = new HashMap<>();
private final int availableProcessors;

public DynamicProcessorAssigner( Configuration config, int availableProcessors )
public DynamicProcessorAssigner( Configuration config )
{
super( 500, MILLISECONDS );
this.config = config;
this.availableProcessors = availableProcessors;
this.availableProcessors = config.maxNumberOfProcessors();
}

@Override
Expand All @@ -65,23 +65,20 @@ public void start( StageExecution execution )
@Override
public void check( StageExecution execution )
{
int permits = availableProcessors - countActiveProcessors( execution );
if ( execution.stillExecuting() )
{
int permits = availableProcessors - countActiveProcessors( execution );
if ( permits > 0 )
{
// Be swift at assigning processors to slow steps, i.e. potentially multiple per round
permits -= assignProcessorsToPotentialBottleNeck( execution, permits );
assignProcessorsToPotentialBottleNeck( execution, permits );
}
// Be a little more conservative removing processors from too fast steps
if ( permits == 0 && removeProcessorFromPotentialIdleStep( execution ) )
{
permits++;
}
removeProcessorFromPotentialIdleStep( execution );
}
}

private int assignProcessorsToPotentialBottleNeck( StageExecution execution, int permits )
private void assignProcessorsToPotentialBottleNeck( StageExecution execution, int permits )
{
Pair<Step<?>,Float> bottleNeck = execution.stepsOrderedBy( Keys.avg_processing_time, false ).iterator().next();
Step<?> bottleNeckStep = bottleNeck.first();
Expand All @@ -98,13 +95,11 @@ private int assignProcessorsToPotentialBottleNeck( StageExecution execution, int
if ( after > before )
{
lastChangedProcessors.put( bottleNeckStep, doneBatches );
usedPermits -= after - before;
}
}
return usedPermits;
}

private boolean removeProcessorFromPotentialIdleStep( StageExecution execution )
private void removeProcessorFromPotentialIdleStep( StageExecution execution )
{
for ( Pair<Step<?>,Float> fast : execution.stepsOrderedBy( Keys.avg_processing_time, true ) )
{
Expand All @@ -129,12 +124,11 @@ private boolean removeProcessorFromPotentialIdleStep( StageExecution execution )
if ( fastestStep.processors( -1 ) < before )
{
lastChangedProcessors.put( fastestStep, doneBatches );
return true;
return;
}
}
}
}
return false;
}

private long avg( Step<?> step )
Expand Down
Expand Up @@ -98,8 +98,7 @@ public static void superviseExecution( ExecutionMonitor monitor, Configuration c
*/
public static ExecutionMonitor withDynamicProcessorAssignment( ExecutionMonitor monitor, Configuration config )
{
DynamicProcessorAssigner dynamicProcessorAssigner = new DynamicProcessorAssigner( config,
min( config.maxNumberOfProcessors(), getRuntime().availableProcessors() ) );
DynamicProcessorAssigner dynamicProcessorAssigner = new DynamicProcessorAssigner( config );
return new MultiExecutionMonitor( monitor, dynamicProcessorAssigner );
}
}
Expand Up @@ -40,8 +40,8 @@ public class DynamicProcessorAssignerTest
public void shouldAssignAdditionalCPUToBottleNeckStep() throws Exception
{
// GIVEN
Configuration config = movingAverageConfig( 10 );
DynamicProcessorAssigner assigner = new DynamicProcessorAssigner( config, 5 );
Configuration config = config( 10, 5 );
DynamicProcessorAssigner assigner = new DynamicProcessorAssigner( config );

ControlledStep<?> slowStep = stepWithStats( "slow", 0, avg_processing_time, 10L, done_batches, 10L );
ControlledStep<?> fastStep = stepWithStats( "fast", 0, avg_processing_time, 2L, done_batches, 10L );
Expand All @@ -61,10 +61,10 @@ public void shouldAssignAdditionalCPUToBottleNeckStep() throws Exception
public void shouldRemoveCPUsFromWayTooFastStep() throws Exception
{
// GIVEN
Configuration config = movingAverageConfig( 10 );
Configuration config = config( 10, 3 );
// available processors = 2 is enough because it will see the fast step as only using 20% of a processor
// and it rounds down. So there's room for assigning one more.
DynamicProcessorAssigner assigner = new DynamicProcessorAssigner( config, 3 );
DynamicProcessorAssigner assigner = new DynamicProcessorAssigner( config );

ControlledStep<?> slowStep = spy( stepWithStats( "slow", 1, avg_processing_time, 6L, done_batches, 10L )
.setProcessors( 2 ) );
Expand All @@ -85,8 +85,8 @@ public void shouldRemoveCPUsFromWayTooFastStep() throws Exception
public void shouldRemoveCPUsButNotSoThatTheFastStepBecomesBottleneck() throws Exception
{
// GIVEN
Configuration config = movingAverageConfig( 10 );
DynamicProcessorAssigner assigner = new DynamicProcessorAssigner( config, 3 );
Configuration config = config( 10, 3 );
DynamicProcessorAssigner assigner = new DynamicProcessorAssigner( config );

ControlledStep<?> slowStep = spy( stepWithStats( "slow", 1, avg_processing_time, 10L, done_batches, 10L ) );
ControlledStep<?> fastStep = spy( stepWithStats( "fast", 0, avg_processing_time, 7L, done_batches, 10L )
Expand All @@ -107,8 +107,8 @@ public void shouldRemoveCPUsButNotSoThatTheFastStepBecomesBottleneck() throws Ex
public void shouldHandleZeroAverage() throws Exception
{
// GIVEN
Configuration config = movingAverageConfig( 10 );
DynamicProcessorAssigner assigner = new DynamicProcessorAssigner( config, 5 );
Configuration config = config( 10, 5 );
DynamicProcessorAssigner assigner = new DynamicProcessorAssigner( config );

ControlledStep<?> aStep = stepWithStats( "slow", 0, avg_processing_time, 0L, done_batches, 0L );
ControlledStep<?> anotherStep = stepWithStats( "fast", 0, avg_processing_time, 0L, done_batches, 0L );
Expand All @@ -133,8 +133,8 @@ public void shouldRemoveCPUsFromTooFastStepEvenIfThereIsAWayFaster() throws Exce
// have those be assigned to a more appropriate step instead, where it will benefit the Stage more.

// GIVEN
Configuration config = movingAverageConfig( 10 );
DynamicProcessorAssigner assigner = new DynamicProcessorAssigner( config, 3 );
Configuration config = config( 10, 3 );
DynamicProcessorAssigner assigner = new DynamicProcessorAssigner( config );
Step<?> wayFastest = stepWithStats( "wayFastest", 0, avg_processing_time, 50L, done_batches, 20L );
Step<?> fast = spy( stepWithStats( "fast", 0, avg_processing_time, 100L, done_batches, 20L )
.setProcessors( 3 ) );
Expand All @@ -149,7 +149,34 @@ public void shouldRemoveCPUsFromTooFastStepEvenIfThereIsAWayFaster() throws Exce
verify( fast ).processors( -1 );
}

private Configuration movingAverageConfig( final int movingAverage )
@Test
public void shouldRemoveCPUsFromTooFastStepEvenIfNotAllPermitsAreUsed() throws Exception
{
// GIVEN
Configuration config = config( 10, 20 );
DynamicProcessorAssigner assigner = new DynamicProcessorAssigner( config );
Step<?> wayFastest = spy( stepWithStats( "wayFastest", 0,
avg_processing_time, 50L,
done_batches, 20L )
.setProcessors( 5 ) );
Step<?> fast = spy( stepWithStats( "fast", 0,
avg_processing_time, 100L,
done_batches, 20L )
.setProcessors( 3 ) );
Step<?> slow = stepWithStats( "slow", 1,
avg_processing_time, 220L,
done_batches, 20L );
StageExecution execution = executionOf( config, slow, wayFastest, fast );
assigner.start( execution );

// WHEN
assigner.check( execution );

// THEN
verify( wayFastest ).processors( -1 );
}

private Configuration config( final int movingAverage, int processors )
{
return new Configuration()
{
Expand All @@ -158,6 +185,12 @@ public int movingAverageSize()
{
return movingAverage;
}

@Override
public int maxNumberOfProcessors()
{
return processors;
}
};
}

Expand Down

0 comments on commit 655a84b

Please sign in to comment.