Skip to content

Commit

Permalink
Faster forked processor step
Browse files Browse the repository at this point in the history
by doing way less unpark and instead implementing a design where each forked
processor can walk the queue of batches individually. In the previous design
the act of unparking all the forked processors became the dominating operation
when number of assigned processors went up. Now profiling doesn't even show it
and throughput is better.
  • Loading branch information
tinwelint committed May 16, 2017
1 parent dc4dcd4 commit 26a1dad
Show file tree
Hide file tree
Showing 7 changed files with 382 additions and 282 deletions.
Expand Up @@ -40,7 +40,7 @@ public class CalculateDenseNodesStep extends ForkedProcessorStep<Batch<InputRela
public CalculateDenseNodesStep( StageControl control, Configuration config, NodeRelationshipCache cache,
Collector badCollector )
{
super( control, "CALCULATE", config, 0 );
super( control, "CALCULATE", config );
this.cache = cache;
this.badCollector = badCollector;
}
Expand Down
Expand Up @@ -41,7 +41,7 @@ public class RelationshipEncoderStep extends ForkedProcessorStep<Batch<InputRela

public RelationshipEncoderStep( StageControl control, Configuration config, NodeRelationshipCache cache )
{
super( control, "RELATIONSHIP", config, 0 );
super( control, "RELATIONSHIP", config );
this.cache = cache;
}

Expand Down
Expand Up @@ -42,7 +42,7 @@ public abstract class RelationshipLinkStep extends ForkedProcessorStep<Relations
public RelationshipLinkStep( StageControl control, Configuration config,
NodeRelationshipCache cache, Predicate<RelationshipRecord> filter, int nodeTypes, boolean forwards )
{
super( control, "LINK", config, 0 );
super( control, "LINK", config );
this.cache = cache;
this.filter = filter;
this.nodeTypes = nodeTypes;
Expand Down
Expand Up @@ -74,12 +74,14 @@ public boolean test( long ticket )
protected long startTime, endTime;
private final List<StatsProvider> additionalStatsProvider;
protected final Runnable healthChecker = () -> assertHealthy();
protected final Configuration config;

public AbstractStep( StageControl control, String name, Configuration config,
StatsProvider... additionalStatsProvider )
{
this.control = control;
this.name = name;
this.config = config;
this.totalProcessingTime = new MovingAverage( config.movingAverageSize() );
this.additionalStatsProvider = asList( additionalStatsProvider );
}
Expand Down

0 comments on commit 26a1dad

Please sign in to comment.