Skip to content

Commit

Permalink
Reduces Parallelizable interface to one method
Browse files Browse the repository at this point in the history
because most of the time that interface is about delegation and delegating
4 methods in loads of places turns out to be really annoying.
The interface is reduced to one method `processers(delta)` which can
be used to do everything those 4 methods could previous AND delegation
becomes very simple.
  • Loading branch information
tinwelint committed Jun 22, 2016
1 parent ddb5d41 commit ab150a5
Show file tree
Hide file tree
Showing 20 changed files with 118 additions and 265 deletions.
Expand Up @@ -87,22 +87,9 @@ public void close()
processing.shutdown( false ); processing.shutdown( false );
} }


// We have to let our processing framework know about changes in processor count assigned to us
@Override @Override
public int numberOfProcessors() public int processors( int delta )
{ {
return processing.numberOfProcessors(); return processing.processors( delta );
}

@Override
public boolean incrementNumberOfProcessors()
{
return processing.incrementNumberOfProcessors();
}

@Override
public boolean decrementNumberOfProcessors()
{
return processing.decrementNumberOfProcessors();
} }
} }
Expand Up @@ -105,21 +105,9 @@ public long position()
} }


@Override @Override
public int numberOfProcessors() public int processors( int delta )
{ {
return actual.numberOfProcessors(); return actual.processors( delta );
}

@Override
public boolean incrementNumberOfProcessors()
{
return actual.incrementNumberOfProcessors();
}

@Override
public boolean decrementNumberOfProcessors()
{
return actual.decrementNumberOfProcessors();
} }
} }


Expand Down
Expand Up @@ -21,85 +21,27 @@


/** /**
* Represents something that can be parallelizable, in this case that means the ability to dynamically change * Represents something that can be parallelizable, in this case that means the ability to dynamically change
* the number of processors executing that tasks ahead. * the number of processors executing tasks.
*/ */
public interface Parallelizable public interface Parallelizable
{ {
/** /**
* @return number of processors processing incoming tasks in parallel. * Change number of processors assigned to this {@link Parallelizable}. Accepts a {@code delta},
*/ * which may specify positive or negative value, even zero. This instances may have internal constraints
default int numberOfProcessors() * in the number of processors, min or max, which may be assigned and so potentially the change will
{ * only be partially accepted or not at all. This is why this call returns the total number of processors
return 1; * this instance now has accepted after any effect of this call.
}

/**
* Increments number of processors that processes tasks in parallel.
* *
* @return {@code true} if one more processor was assigned, otherwise {@code false}. * {@link Parallelizable} is used in many call stacks where call delegation is predominant and so
*/ * reducing number of methods to delegate is favored. This is why this method looks and functions
default boolean incrementNumberOfProcessors() * like this, it can cater for incrementing, decrementing and even getting number of processors.
{
return false;
}

/**
* Decrements number of processors that processes tasks in parallel.
* *
* @return {@code true} if one processor was unassigned, otherwise {@code false}. * @param delta number of processors to add or remove, i.e. negative or positive value. A value of
* zero will result in merely the current number of assigned processors to be returned.
* @return the number of assigned processors as a result this call.
*/ */
default boolean decrementNumberOfProcessors() default int processors( int delta )
{ {
return false; return 1;
}

/**
* Tries to set specified number of processors. If {@code processors} would be out of bounds
* for what this instance can assign then a value within bounds will be set instead.
*
* @param processors number of desired processors.
* @return number of actual processors after this call.
*/
default int setNumberOfProcessors( int processors )
{
int current;
while ( (current = numberOfProcessors()) != processors )
{
boolean success = current < processors ? incrementNumberOfProcessors() :
decrementNumberOfProcessors();
if ( !success )
{
break;
}
}
return current;
}

class Delegate implements Parallelizable
{
protected final Parallelizable actual;

public Delegate( Parallelizable actual )
{
this.actual = actual;
}

@Override
public int numberOfProcessors()
{
return actual.numberOfProcessors();
}

@Override
public boolean incrementNumberOfProcessors()
{
return actual.incrementNumberOfProcessors();
}

@Override
public boolean decrementNumberOfProcessors()
{
return actual.decrementNumberOfProcessors();
}
} }
} }
Expand Up @@ -27,7 +27,11 @@
import java.util.function.Supplier; import java.util.function.Supplier;


import org.neo4j.function.Suppliers; import org.neo4j.function.Suppliers;

import static java.lang.Integer.max;
import static java.lang.Integer.min;
import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MILLISECONDS;

import static org.neo4j.helpers.Exceptions.launderedException; import static org.neo4j.helpers.Exceptions.launderedException;


/** /**
Expand Down Expand Up @@ -68,51 +72,45 @@ public DynamicTaskExecutor( int initialProcessorCount, int maxProcessorCount, in
this.processorThreadNamePrefix = processorThreadNamePrefix; this.processorThreadNamePrefix = processorThreadNamePrefix;
this.initialLocalState = initialLocalState; this.initialLocalState = initialLocalState;
this.queue = new ArrayBlockingQueue<>( maxQueueSize ); this.queue = new ArrayBlockingQueue<>( maxQueueSize );
setNumberOfProcessors( initialProcessorCount ); processors( initialProcessorCount );
}

@Override
public int numberOfProcessors()
{
return processors.length;
} }


@Override @Override
public synchronized boolean incrementNumberOfProcessors() public int processors( int delta )
{ {
if ( shutDown ) if ( shutDown || delta == 0 )
{
return false;
}
int currentNumber = numberOfProcessors();
if ( currentNumber >= maxProcessorCount )
{ {
return false; return processors.length;
} }


Processor[] newProcessors = Arrays.copyOf( processors, currentNumber + 1 ); int requestedNumber = processors.length + delta;
newProcessors[currentNumber] = new Processor( processorThreadNamePrefix + "-" + currentNumber ); if ( delta > 0 )
this.processors = newProcessors;
return true;
}

@Override
public synchronized boolean decrementNumberOfProcessors()
{
if ( shutDown )
{ {
return false; requestedNumber = min( requestedNumber, maxProcessorCount );
if ( requestedNumber > processors.length )
{
Processor[] newProcessors = Arrays.copyOf( processors, requestedNumber );
for ( int i = processors.length; i < requestedNumber; i++ )
{
newProcessors[i] = new Processor( processorThreadNamePrefix + "-" + i );
}
this.processors = newProcessors;
}
} }
int currentNumber = numberOfProcessors(); else
if ( currentNumber == 1 )
{ {
return false; requestedNumber = max( 1, requestedNumber );
if ( requestedNumber < processors.length )
{
Processor[] newProcessors = Arrays.copyOf( processors, requestedNumber );
for ( int i = newProcessors.length; i < processors.length; i++ )
{
processors[i].shutDown = true;
}
this.processors = newProcessors;
}
} }

return processors.length;
Processor[] newProcessors = Arrays.copyOf( processors, currentNumber - 1 );
processors[currentNumber-1].shutDown = true;
processors = newProcessors;
return true;
} }


@Override @Override
Expand Down
Expand Up @@ -286,21 +286,9 @@ public void close()
} }


@Override @Override
public boolean incrementNumberOfProcessors() public int processors( int delta )
{ {
return processing.incrementNumberOfProcessors(); return processing.processors( delta );
}

@Override
public boolean decrementNumberOfProcessors()
{
return processing.decrementNumberOfProcessors();
}

@Override
public int numberOfProcessors()
{
return processing.numberOfProcessors();
} }


private class BatchProvidingIterator extends PrefetchingIterator<byte[]> private class BatchProvidingIterator extends PrefetchingIterator<byte[]>
Expand Down
Expand Up @@ -173,7 +173,7 @@ public StepStats stats()
protected void collectStatsProviders( Collection<StatsProvider> into ) protected void collectStatsProviders( Collection<StatsProvider> into )
{ {
into.add( new ProcessingStats( doneBatches.get()+queuedBatches.get(), doneBatches.get(), into.add( new ProcessingStats( doneBatches.get()+queuedBatches.get(), doneBatches.get(),
totalProcessingTime.total(), totalProcessingTime.average() / numberOfProcessors(), totalProcessingTime.total(), totalProcessingTime.average() / processors( 0 ),
upstreamIdleTime.get(), downstreamIdleTime.get() ) ); upstreamIdleTime.get(), downstreamIdleTime.get() ) );
into.addAll( additionalStatsProvider ); into.addAll( additionalStatsProvider );
} }
Expand Down Expand Up @@ -240,6 +240,6 @@ protected void resetStats()
public String toString() public String toString()
{ {
return format( "%s[%s, processors:%d, batches:%d", getClass().getSimpleName(), return format( "%s[%s, processors:%d, batches:%d", getClass().getSimpleName(),
name, numberOfProcessors(), doneBatches.get() ); name, processors( 0 ), doneBatches.get() );
} }
} }
Expand Up @@ -101,7 +101,8 @@ private int assignProcessorsToPotentialBottleNeck( StageExecution execution, int
int optimalProcessorIncrement = min( max( 1, (int) bottleNeck.other().floatValue() - 1 ), permits ); int optimalProcessorIncrement = min( max( 1, (int) bottleNeck.other().floatValue() - 1 ), permits );
for ( int i = 0; i < optimalProcessorIncrement; i++ ) for ( int i = 0; i < optimalProcessorIncrement; i++ )
{ {
if ( bottleNeckStep.incrementNumberOfProcessors() ) int before = bottleNeckStep.processors( 0 );
if ( bottleNeckStep.processors( 1 ) > before )
{ {
lastChangedProcessors.put( bottleNeckStep, doneBatches ); lastChangedProcessors.put( bottleNeckStep, doneBatches );
usedPermits++; usedPermits++;
Expand All @@ -115,7 +116,7 @@ private boolean removeProcessorFromPotentialIdleStep( StageExecution execution )
{ {
for ( Pair<Step<?>,Float> fast : execution.stepsOrderedBy( Keys.avg_processing_time, true ) ) for ( Pair<Step<?>,Float> fast : execution.stepsOrderedBy( Keys.avg_processing_time, true ) )
{ {
int numberOfProcessors = fast.first().numberOfProcessors(); int numberOfProcessors = fast.first().processors( 0 );
if ( numberOfProcessors == 1 ) if ( numberOfProcessors == 1 )
{ {
continue; continue;
Expand All @@ -132,7 +133,8 @@ private boolean removeProcessorFromPotentialIdleStep( StageExecution execution )
long doneBatches = batches( fastestStep ); long doneBatches = batches( fastestStep );
if ( batchesPassedSinceLastChange( fastestStep, doneBatches ) >= config.movingAverageSize() ) if ( batchesPassedSinceLastChange( fastestStep, doneBatches ) >= config.movingAverageSize() )
{ {
if ( fastestStep.decrementNumberOfProcessors() ) int before = fastestStep.processors( 0 );
if ( fastestStep.processors( -1 ) < before )
{ {
lastChangedProcessors.put( fastestStep, doneBatches ); lastChangedProcessors.put( fastestStep, doneBatches );
return true; return true;
Expand Down Expand Up @@ -169,7 +171,7 @@ private int countActiveProcessors( StageExecution[] executions )
// "steal" some of its processing power. // "steal" some of its processing power.
long avg = avg( step ); long avg = avg( step );
float factor = (float)avg / (float)highestAverage; float factor = (float)avg / (float)highestAverage;
processors += factor * step.numberOfProcessors(); processors += factor * step.processors( 0 );
} }
} }
} }
Expand Down
Expand Up @@ -69,22 +69,9 @@ protected long position()
return data.position(); return data.position();
} }


// We have to let our processing framework know about changes in processor count assigned to us
@Override @Override
public int numberOfProcessors() public int processors( int delta )
{ {
return data.numberOfProcessors(); return data.processors( delta );
}

@Override
public boolean incrementNumberOfProcessors()
{
return data.incrementNumberOfProcessors();
}

@Override
public boolean decrementNumberOfProcessors()
{
return data.decrementNumberOfProcessors();
} }
} }
Expand Up @@ -87,7 +87,7 @@ private int theoreticalMaxProcessors()
public long receive( final long ticket, final T batch ) public long receive( final long ticket, final T batch )
{ {
// Don't go too far ahead // Don't go too far ahead
long idleTime = await( catchUp, executor.numberOfProcessors(), healthChecker, park ); long idleTime = await( catchUp, executor.processors( 0 ), healthChecker, park );
incrementQueue(); incrementQueue();
executor.submit( sender -> { executor.submit( sender -> {
assertHealthy(); assertHealthy();
Expand Down Expand Up @@ -182,21 +182,9 @@ public void close() throws Exception
} }


@Override @Override
public int numberOfProcessors() public int processors( int delta )
{ {
return executor.numberOfProcessors(); return executor.processors( delta );
}

@Override
public boolean incrementNumberOfProcessors()
{
return executor.incrementNumberOfProcessors();
}

@Override
public boolean decrementNumberOfProcessors()
{
return executor.decrementNumberOfProcessors();
} }


@SuppressWarnings( "unchecked" ) @SuppressWarnings( "unchecked" )
Expand Down
Expand Up @@ -154,11 +154,11 @@ private void printSpectrum( StringBuilder builder, StageExecution execution, int
boolean isBottleNeck = bottleNeck.first() == step; boolean isBottleNeck = bottleNeck.first() == step;
String name = String name =
(isBottleNeck ? "*" : "") + (isBottleNeck ? "*" : "") +
stats.toString( DetailLevel.IMPORTANT ) + (step.numberOfProcessors() > 1 stats.toString( DetailLevel.IMPORTANT ) + (step.processors( 0 ) > 1
? "(" + step.numberOfProcessors() + ")" ? "(" + step.processors( 0 ) + ")"
: ""); : "");
int charIndex = 0; // negative value "delays" the text, i.e. pushes it to the right int charIndex = 0; // negative value "delays" the text, i.e. pushes it to the right
char backgroundChar = step.numberOfProcessors() > 1 ? '=' : '-'; char backgroundChar = step.processors( 0 ) > 1 ? '=' : '-';
for ( int i = 0; i < stepWidth; i++, charIndex++ ) for ( int i = 0; i < stepWidth; i++, charIndex++ )
{ {
char ch = backgroundChar; char ch = backgroundChar;
Expand Down

0 comments on commit ab150a5

Please sign in to comment.