Skip to content

Commit

Permalink
Minor importer performance tweaks
Browse files Browse the repository at this point in the history
- NodeDegreeCountStage was flushing and ordering the batches, which was unnecessary.
  It was also using the wrong batch size, so readers did not get entire pages
  to themselves.
- Uses the same OS-dependent PARK time (Windows vs. Linux) everywhere in the step/execution code.
  • Loading branch information
tinwelint committed Oct 11, 2017
1 parent 9bf63fb commit 2040f61
Show file tree
Hide file tree
Showing 6 changed files with 10 additions and 17 deletions.
Expand Up @@ -36,7 +36,7 @@ public class NodeDegreeCountStage extends Stage
{
public NodeDegreeCountStage( Configuration config, RelationshipStore store, NodeRelationshipCache cache )
{
super( "Node Degrees", config, ORDER_SEND_DOWNSTREAM );
super( "Node Degrees", config );
add( new BatchFeedStep( control(), config, forwards( 0, store.getHighId(), config ), store.getRecordSize() ) );
add( new ReadRecordsStep<>( control(), config, false, store, null ) );
add( new CalculateDenseNodesStep( control(), config, cache ) );
Expand Down
Expand Up @@ -185,7 +185,7 @@ public void doImport( Input input ) throws IOException
}
// Import relationships (unlinked), properties
Configuration relationshipConfig =
configWithRecordsPerPageBasedBatchSize( config, neoStore.getNodeStore() );
configWithRecordsPerPageBasedBatchSize( config, neoStore.getRelationshipStore() );
RelationshipStage unlinkedRelationshipStage =
new RelationshipStage( relationshipConfig, writeMonitor, relationships, idMapper,
badCollector, inputCache, neoStore, storeUpdateMonitor );
Expand All @@ -201,9 +201,7 @@ public void doImport( Input input ) throws IOException
nodeRelationshipCache.setHighNodeId( neoStore.getNodeStore().getHighId() );
NodeDegreeCountStage nodeDegreeStage = new NodeDegreeCountStage( relationshipConfig,
neoStore.getRelationshipStore(), nodeRelationshipCache );
neoStore.startFlushingPageCache();
executeStage( nodeDegreeStage );
neoStore.stopFlushingPageCache();

linkData( nodeRelationshipCache, neoStore, unlinkedRelationshipStage.getDistribution(),
availableMemory );
Expand Down
Expand Up @@ -31,7 +31,9 @@

import static java.lang.Integer.max;
import static java.lang.Integer.min;
import static java.util.concurrent.TimeUnit.MICROSECONDS;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.commons.lang3.SystemUtils.IS_OS_WINDOWS;
import static org.neo4j.helpers.Exceptions.launderedException;

/**
Expand All @@ -40,8 +42,6 @@
*/
public class DynamicTaskExecutor<LOCAL> implements TaskExecutor<LOCAL>
{
public static final ParkStrategy DEFAULT_PARK_STRATEGY = new ParkStrategy.Park( 10, MILLISECONDS );

private final BlockingQueue<Task<LOCAL>> queue;
private final ParkStrategy parkStrategy;
private final String processorThreadNamePrefix;
Expand Down
Expand Up @@ -30,19 +30,24 @@
import org.neo4j.helpers.Exceptions;
import org.neo4j.kernel.impl.util.MovingAverage;
import org.neo4j.unsafe.impl.batchimport.Configuration;
import org.neo4j.unsafe.impl.batchimport.executor.ParkStrategy;
import org.neo4j.unsafe.impl.batchimport.stats.ProcessingStats;
import org.neo4j.unsafe.impl.batchimport.stats.StatsProvider;
import org.neo4j.unsafe.impl.batchimport.stats.StepStats;

import static java.lang.String.format;
import static java.lang.System.currentTimeMillis;
import static java.util.Arrays.asList;
import static java.util.concurrent.TimeUnit.MICROSECONDS;
import static org.apache.commons.lang3.SystemUtils.IS_OS_WINDOWS;

/**
* Basic implementation of a {@link Step}. Does the most plumbing job of building a step implementation.
*/
public abstract class AbstractStep<T> implements Step<T>
{
public static final ParkStrategy PARK = new ParkStrategy.Park( IS_OS_WINDOWS ? 10_000 : 500, MICROSECONDS );

private final StageControl control;
private volatile String name;
@SuppressWarnings( "rawtypes" )
Expand Down
Expand Up @@ -22,14 +22,11 @@
import java.util.concurrent.atomic.AtomicReference;

import org.neo4j.unsafe.impl.batchimport.Configuration;
import org.neo4j.unsafe.impl.batchimport.executor.ParkStrategy;
import org.neo4j.unsafe.impl.internal.dragons.UnsafeUtil;

import static java.lang.Integer.max;
import static java.lang.Integer.min;
import static java.lang.System.nanoTime;
import static java.util.concurrent.TimeUnit.MICROSECONDS;
import static org.apache.commons.lang3.SystemUtils.IS_OS_WINDOWS;
import static org.neo4j.unsafe.impl.internal.dragons.UnsafeUtil.getFieldOffset;

/**
Expand All @@ -48,7 +45,6 @@ public abstract class ForkedProcessorStep<T> extends AbstractStep<T>
protected static final int MAIN = 0;
private final long COMPLETED_PROCESSORS_OFFSET = getFieldOffset( Unit.class, "completedProcessors" );
private final long PROCESSING_TIME_OFFSET = getFieldOffset( Unit.class, "processingTime" );
private static final ParkStrategy PARK = new ParkStrategy.Park( IS_OS_WINDOWS ? 10_000 : 500, MICROSECONDS );

private final Object[] forkedProcessors;
private volatile int numberOfForkedProcessors;
Expand Down Expand Up @@ -189,10 +185,6 @@ void processorDone( long time )
UnsafeUtil.getAndAddLong( this, PROCESSING_TIME_OFFSET, time );
int prevCompletedProcessors = UnsafeUtil.getAndAddInt( this, COMPLETED_PROCESSORS_OFFSET, 1 );
assert prevCompletedProcessors < processors;
if ( prevCompletedProcessors == processors - 1 )
{
PARK.unpark( downstreamSender );
}
}
}

Expand Down
Expand Up @@ -30,7 +30,6 @@

import static java.lang.System.currentTimeMillis;
import static java.lang.System.nanoTime;
import static org.neo4j.unsafe.impl.batchimport.executor.DynamicTaskExecutor.DEFAULT_PARK_STRATEGY;

/**
* {@link Step} that uses {@link TaskExecutor} as a queue and execution mechanism.
Expand Down Expand Up @@ -65,8 +64,7 @@ protected ProcessorStep( StageControl control, String name, Configuration config
public void start( int orderingGuarantees )
{
super.start( orderingGuarantees );
this.executor = new DynamicTaskExecutor<>( 1, maxProcessors, config.maxNumberOfProcessors(),
DEFAULT_PARK_STRATEGY, name(), Sender::new );
this.executor = new DynamicTaskExecutor<>( 1, maxProcessors, config.maxNumberOfProcessors(), PARK, name(), Sender::new );
}

@Override
Expand Down

0 comments on commit 2040f61

Please sign in to comment.