Skip to content

Commit

Permalink
Removes unused ability to execute multiple concurrent stages in import
Browse files Browse the repository at this point in the history
  • Loading branch information
tinwelint committed Mar 7, 2017
1 parent 9ebfebe commit 37c4053
Show file tree
Hide file tree
Showing 16 changed files with 150 additions and 403 deletions.
Expand Up @@ -159,23 +159,23 @@ public void doImport( Input input ) throws IOException
NodeStage nodeStage = new NodeStage( config, writeMonitor, NodeStage nodeStage = new NodeStage( config, writeMonitor,
nodes, idMapper, idGenerator, neoStore, inputCache, neoStore.getLabelScanStore(), nodes, idMapper, idGenerator, neoStore, inputCache, neoStore.getLabelScanStore(),
storeUpdateMonitor, nodeRelationshipCache, memoryUsageStats ); storeUpdateMonitor, nodeRelationshipCache, memoryUsageStats );
executeStages( nodeStage ); executeStage( nodeStage );
if ( idMapper.needsPreparation() ) if ( idMapper.needsPreparation() )
{ {
executeStages( new IdMapperPreparationStage( config, idMapper, cachedNodes, executeStage( new IdMapperPreparationStage( config, idMapper, cachedNodes,
badCollector, memoryUsageStats ) ); badCollector, memoryUsageStats ) );
PrimitiveLongIterator duplicateNodeIds = badCollector.leftOverDuplicateNodesIds(); PrimitiveLongIterator duplicateNodeIds = badCollector.leftOverDuplicateNodesIds();
if ( duplicateNodeIds.hasNext() ) if ( duplicateNodeIds.hasNext() )
{ {
executeStages( new DeleteDuplicateNodesStage( config, duplicateNodeIds, neoStore ) ); executeStage( new DeleteDuplicateNodesStage( config, duplicateNodeIds, neoStore ) );
} }
} }


// Stage 2 -- calculate dense node threshold // Stage 2 -- calculate dense node threshold
CalculateDenseNodesStage calculateDenseNodesStage = new CalculateDenseNodesStage( CalculateDenseNodesStage calculateDenseNodesStage = new CalculateDenseNodesStage(
withBatchSize( config, config.batchSize()*10 ), withBatchSize( config, config.batchSize()*10 ),
relationships, nodeRelationshipCache, idMapper, badCollector, inputCache, neoStore ); relationships, nodeRelationshipCache, idMapper, badCollector, inputCache, neoStore );
executeStages( calculateDenseNodesStage ); executeStage( calculateDenseNodesStage );


importRelationships( nodeRelationshipCache, storeUpdateMonitor, neoStore, writeMonitor, importRelationships( nodeRelationshipCache, storeUpdateMonitor, neoStore, writeMonitor,
idMapper, cachedRelationships, inputCache, idMapper, cachedRelationships, inputCache,
Expand All @@ -196,10 +196,10 @@ public void doImport( Input input ) throws IOException
// Stage 6 -- count nodes per label and labels per node // Stage 6 -- count nodes per label and labels per node
nodeLabelsCache = new NodeLabelsCache( AUTO, neoStore.getLabelRepository().getHighId() ); nodeLabelsCache = new NodeLabelsCache( AUTO, neoStore.getLabelRepository().getHighId() );
memoryUsageStats = new MemoryUsageStatsProvider( nodeLabelsCache ); memoryUsageStats = new MemoryUsageStatsProvider( nodeLabelsCache );
executeStages( new NodeCountsStage( config, nodeLabelsCache, neoStore.getNodeStore(), executeStage( new NodeCountsStage( config, nodeLabelsCache, neoStore.getNodeStore(),
neoStore.getLabelRepository().getHighId(), countsUpdater, memoryUsageStats ) ); neoStore.getLabelRepository().getHighId(), countsUpdater, memoryUsageStats ) );
// Stage 7 -- count label-[type]->label // Stage 7 -- count label-[type]->label
executeStages( new RelationshipCountsStage( config, nodeLabelsCache, relationshipStore, executeStage( new RelationshipCountsStage( config, nodeLabelsCache, relationshipStore,
neoStore.getLabelRepository().getHighId(), neoStore.getLabelRepository().getHighId(),
neoStore.getRelationshipTypeRepository().getHighId(), countsUpdater, AUTO ) ); neoStore.getRelationshipTypeRepository().getHighId(), countsUpdater, AUTO ) );


Expand Down Expand Up @@ -282,16 +282,16 @@ private void importRelationships( NodeRelationshipCache nodeRelationshipCache,
final RelationshipStage relationshipStage = new RelationshipStage( topic, config, final RelationshipStage relationshipStage = new RelationshipStage( topic, config,
writeMonitor, perType, idMapper, neoStore, nodeRelationshipCache, writeMonitor, perType, idMapper, neoStore, nodeRelationshipCache,
storeUpdateMonitor, nextRelationshipId ); storeUpdateMonitor, nextRelationshipId );
executeStages( relationshipStage ); executeStage( relationshipStage );


// Stage 4a -- set node nextRel fields for dense nodes // Stage 4a -- set node nextRel fields for dense nodes
executeStages( new NodeFirstRelationshipStage( topic, nodeConfig, neoStore.getNodeStore(), executeStage( new NodeFirstRelationshipStage( topic, nodeConfig, neoStore.getNodeStore(),
neoStore.getTemporaryRelationshipGroupStore(), nodeRelationshipCache, true/*dense*/, neoStore.getTemporaryRelationshipGroupStore(), nodeRelationshipCache, true/*dense*/,
currentTypeId ) ); currentTypeId ) );


// Stage 5a -- link relationship chains together for dense nodes // Stage 5a -- link relationship chains together for dense nodes
nodeRelationshipCache.setForwardScan( false ); nodeRelationshipCache.setForwardScan( false );
executeStages( new RelationshipLinkbackStage( topic, executeStage( new RelationshipLinkbackStage( topic,
relationshipConfig, relationshipConfig,
neoStore.getRelationshipStore(), neoStore.getRelationshipStore(),
nodeRelationshipCache, nextRelationshipId, nodeRelationshipCache, nextRelationshipId,
Expand All @@ -303,24 +303,24 @@ private void importRelationships( NodeRelationshipCache nodeRelationshipCache,
String topic = " Sparse"; String topic = " Sparse";
nodeRelationshipCache.setForwardScan( true ); nodeRelationshipCache.setForwardScan( true );
// Stage 4b -- set node nextRel fields for sparse nodes // Stage 4b -- set node nextRel fields for sparse nodes
executeStages( new NodeFirstRelationshipStage( topic, nodeConfig, neoStore.getNodeStore(), executeStage( new NodeFirstRelationshipStage( topic, nodeConfig, neoStore.getNodeStore(),
neoStore.getTemporaryRelationshipGroupStore(), nodeRelationshipCache, false/*sparse*/, -1 ) ); neoStore.getTemporaryRelationshipGroupStore(), nodeRelationshipCache, false/*sparse*/, -1 ) );


// Stage 5b -- link relationship chains together for sparse nodes // Stage 5b -- link relationship chains together for sparse nodes
nodeRelationshipCache.setForwardScan( false ); nodeRelationshipCache.setForwardScan( false );
executeStages( new RelationshipLinkbackStage( topic, relationshipConfig, neoStore.getRelationshipStore(), executeStage( new RelationshipLinkbackStage( topic, relationshipConfig, neoStore.getRelationshipStore(),
nodeRelationshipCache, 0, nextRelationshipId, false/*sparse*/ ) ); nodeRelationshipCache, 0, nextRelationshipId, false/*sparse*/ ) );


if ( minorityRelationshipTypes.length > 0 ) if ( minorityRelationshipTypes.length > 0 )
{ {
// Do some batch insertion style random-access insertions for super small minority types // Do some batch insertion style random-access insertions for super small minority types
executeStages( new BatchInsertRelationshipsStage( config, idMapper, executeStage( new BatchInsertRelationshipsStage( config, idMapper,
perTypeIterator.getMinorityRelationships(), neoStore, nextRelationshipId ) ); perTypeIterator.getMinorityRelationships(), neoStore, nextRelationshipId ) );
} }
} }


private void executeStages( Stage... stages ) private void executeStage( Stage stage )
{ {
superviseExecution( executionMonitor, config, stages ); superviseExecution( executionMonitor, config, stage );
} }
} }
Expand Up @@ -31,7 +31,7 @@
public abstract class CoarseBoundedProgressExecutionMonitor extends ExecutionMonitor.Adapter public abstract class CoarseBoundedProgressExecutionMonitor extends ExecutionMonitor.Adapter
{ {
private final long totalNumberOfBatches; private final long totalNumberOfBatches;
private long[] prevDoneBatches; private long prevDoneBatches;
private long totalReportedBatches = 0; private long totalReportedBatches = 0;


public CoarseBoundedProgressExecutionMonitor( long highNodeId, long highRelationshipId, public CoarseBoundedProgressExecutionMonitor( long highNodeId, long highRelationshipId,
Expand All @@ -51,26 +51,24 @@ protected long total()
} }


@Override @Override
public void check( StageExecution[] executions ) public void check( StageExecution execution )
{ {
update( executions ); update( execution );
} }


@Override @Override
public void start( StageExecution[] executions ) public void start( StageExecution execution )
{ {
prevDoneBatches = new long[executions.length]; prevDoneBatches = 0;
} }


private void update( StageExecution[] executions ) private void update( StageExecution execution )
{ {
long diff = 0; long diff = 0;
for ( int i = 0; i < executions.length; i++ ) long doneBatches = doneBatches( execution );
{ diff += doneBatches - prevDoneBatches;
long doneBatches = doneBatches( executions[i] ); prevDoneBatches = doneBatches;
diff += doneBatches - prevDoneBatches[i];
prevDoneBatches[i] = doneBatches;
}
if ( diff > 0 ) if ( diff > 0 )
{ {
totalReportedBatches += diff; totalReportedBatches += diff;
Expand Down
Expand Up @@ -44,19 +44,15 @@ public CoarseUnboundedProgressExecutionMonitor( int dotEveryN, PrintStream out )
} }


@Override @Override
public void start( StageExecution[] executions ) public void start( StageExecution execution )
{ {
prevN = 0; prevN = 0;
} }


@Override @Override
public void check( StageExecution[] executions ) public void check( StageExecution execution )
{ {
int n = prevN; int n = max( prevN, n( execution ) );
for ( StageExecution execution : executions )
{
n = max( n, n( execution ) );
}


while ( prevN < n ) while ( prevN < n )
{ {
Expand Down
Expand Up @@ -50,34 +50,21 @@ public DetailedExecutionMonitor( PrintStream out, long intervalSeconds )
} }


@Override @Override
public void start( StageExecution[] executions ) public void start( StageExecution execution )
{ {
StringBuilder names = new StringBuilder(); StringBuilder names = new StringBuilder();
for ( StageExecution execution : executions ) names.append( names.length() > 0 ? ", " : "" ).append( execution.getStageName() );
{
names.append( names.length() > 0 ? ", " : "" ).append( execution.getStageName() );
}
out.println( format( "%n>>>>> EXECUTING STAGE(s) %s <<<<<%n", names ) ); out.println( format( "%n>>>>> EXECUTING STAGE(s) %s <<<<<%n", names ) );
} }


@Override @Override
public void end( StageExecution[] executions, long totalTimeMillis ) public void end( StageExecution execution, long totalTimeMillis )
{ {
out.println( "Stage total time " + duration( totalTimeMillis ) ); out.println( "Stage total time " + duration( totalTimeMillis ) );
} }


@Override @Override
public void check( StageExecution[] executions ) public void check( StageExecution execution )
{
boolean first = true;
for ( StageExecution execution : executions )
{
printStats( execution, first );
first = false;
}
}

private void printStats( StageExecution execution, boolean first )
{ {
Step<?> bottleNeck = execution.stepsOrderedBy( Keys.avg_processing_time, false ).iterator().next().first(); Step<?> bottleNeck = execution.stepsOrderedBy( Keys.avg_processing_time, false ).iterator().next().first();


Expand All @@ -86,7 +73,7 @@ private void printStats( StageExecution execution, boolean first )
for ( Step<?> step : execution.steps() ) for ( Step<?> step : execution.steps() )
{ {
StepStats stats = step.stats(); StepStats stats = step.stats();
builder.append( i > 0 ? format( "%n " ) : (first ? "--" : " -") ) builder.append( i > 0 ? format( "%n " ) : "--" )
.append( stats.toString( DetailLevel.BASIC ) ) .append( stats.toString( DetailLevel.BASIC ) )
.append( step == bottleNeck ? " <== BOTTLE NECK" : "" ); .append( step == bottleNeck ? " <== BOTTLE NECK" : "" );
i++; i++;
Expand Down
Expand Up @@ -57,29 +57,26 @@ public DynamicProcessorAssigner( Configuration config, int availableProcessors )
} }


@Override @Override
public void start( StageExecution[] executions ) public void start( StageExecution execution )
{ // A new stage begins, any data that we had is irrelevant { // A new stage begins, any data that we had is irrelevant
lastChangedProcessors.clear(); lastChangedProcessors.clear();
} }


@Override @Override
public void check( StageExecution[] executions ) public void check( StageExecution execution )
{ {
int permits = availableProcessors - countActiveProcessors( executions ); int permits = availableProcessors - countActiveProcessors( execution );
for ( StageExecution execution : executions ) if ( execution.stillExecuting() )
{ {
if ( execution.stillExecuting() ) if ( permits > 0 )
{ {
if ( permits > 0 ) // Be swift at assigning processors to slow steps, i.e. potentially multiple per round
{ permits -= assignProcessorsToPotentialBottleNeck( execution, permits );
// Be swift at assigning processors to slow steps, i.e. potentially multiple per round }
permits -= assignProcessorsToPotentialBottleNeck( execution, permits ); // Be a little more conservative removing processors from too fast steps
} if ( permits == 0 && removeProcessorFromPotentialIdleStep( execution ) )
// Be a little more conservative removing processors from too fast steps {
if ( permits == 0 && removeProcessorFromPotentialIdleStep( execution ) ) permits++;
{
permits++;
}
} }
} }
} }
Expand Down Expand Up @@ -150,24 +147,21 @@ private long batches( Step<?> step )
return step.stats().stat( Keys.done_batches ).asLong(); return step.stats().stat( Keys.done_batches ).asLong();
} }


private int countActiveProcessors( StageExecution[] executions ) private int countActiveProcessors( StageExecution execution )
{ {
float processors = 0; float processors = 0;
for ( StageExecution execution : executions ) if ( execution.stillExecuting() )
{ {
if ( execution.stillExecuting() ) long highestAverage = avg( execution.stepsOrderedBy(
Keys.avg_processing_time, false ).iterator().next().first() );
for ( Step<?> step : execution.steps() )
{ {
long highestAverage = avg( execution.stepsOrderedBy( // Calculate how active each step is so that a step that is very cheap
Keys.avg_processing_time, false ).iterator().next().first() ); // and idles a lot counts for less than 1 processor, so that bottlenecks can
for ( Step<?> step : execution.steps() ) // "steal" some of its processing power.
{ long avg = avg( step );
// Calculate how active each step is so that a step that is very cheap float factor = (float)avg / (float)highestAverage;
// and idles a lot counts for less than 1 processor, so that bottlenecks can processors += factor * step.processors( 0 );
// "steal" some of its processing power.
long avg = avg( step );
float factor = (float)avg / (float)highestAverage;
processors += factor * step.processors( 0 );
}
} }
} }
return Math.round( processors ); return Math.round( processors );
Expand Down
Expand Up @@ -30,17 +30,17 @@
public interface ExecutionMonitor public interface ExecutionMonitor
{ {
/** /**
* Signals the start of one or more stages, * Signals the start of a {@link StageExecution}.
*/ */
void start( StageExecution[] executions ); void start( StageExecution execution );


/** /**
* Signals the end of the executions previously {@link #start(StageExecution[]) stated} * Signals the end of the execution previously {@link #start(StageExecution) started}.
*/ */
void end( StageExecution[] executions, long totalTimeMillis ); void end( StageExecution execution, long totalTimeMillis );


/** /**
* Signals the end of the import as a whole * Called after all {@link StageExecution stage executions} have run.
*/ */
void done( long totalTimeMillis, String additionalInformation ); void done( long totalTimeMillis, String additionalInformation );


Expand All @@ -50,9 +50,9 @@ public interface ExecutionMonitor
long nextCheckTime(); long nextCheckTime();


/** /**
* Called with currently executing {@link StageExecution} instances so that data from them can be gathered. * Called periodically while executing a {@link StageExecution}.
*/ */
void check( StageExecution[] executions ); void check( StageExecution execution );


/** /**
* Base implementation with most methods defaulting to not doing anything. * Base implementation with most methods defaulting to not doing anything.
Expand Down Expand Up @@ -80,12 +80,12 @@ public long nextCheckTime()
} }


@Override @Override
public void start( StageExecution[] executions ) public void start( StageExecution execution )
{ // Do nothing by default { // Do nothing by default
} }


@Override @Override
public void end( StageExecution[] executions, long totalTimeMillis ) public void end( StageExecution execution, long totalTimeMillis )
{ // Do nothing by default { // Do nothing by default
} }


Expand Down
Expand Up @@ -41,12 +41,12 @@ public static ExecutionMonitor defaultVisible()
private static final ExecutionMonitor INVISIBLE = new ExecutionMonitor() private static final ExecutionMonitor INVISIBLE = new ExecutionMonitor()
{ {
@Override @Override
public void start( StageExecution[] executions ) public void start( StageExecution execution )
{ // Do nothing { // Do nothing
} }


@Override @Override
public void end( StageExecution[] executions, long totalTimeMillis ) public void end( StageExecution execution, long totalTimeMillis )
{ // Do nothing { // Do nothing
} }


Expand All @@ -57,7 +57,7 @@ public long nextCheckTime()
} }


@Override @Override
public void check( StageExecution[] executions ) public void check( StageExecution execution )
{ // Do nothing { // Do nothing
} }


Expand Down

0 comments on commit 37c4053

Please sign in to comment.