From 2040f6144699cd13c44868028ff04f024a3f1a40 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mattias=20Finn=C3=A9?= Date: Tue, 10 Oct 2017 12:45:56 +0200 Subject: [PATCH] Minor importer performance tweaks - NodeDegreeCountStage was flushing and ordering the batches, which was unnecessary. It was also using the wrong batch size such that readers wouldn't have their own pages entirely. - Uses the same PARK time for Windows/Linux everywhere in the step/execution code. --- .../unsafe/impl/batchimport/NodeDegreeCountStage.java | 2 +- .../unsafe/impl/batchimport/ParallelBatchImporter.java | 4 +--- .../impl/batchimport/executor/DynamicTaskExecutor.java | 4 ++-- .../unsafe/impl/batchimport/staging/AbstractStep.java | 5 +++++ .../impl/batchimport/staging/ForkedProcessorStep.java | 8 -------- .../unsafe/impl/batchimport/staging/ProcessorStep.java | 4 +--- 6 files changed, 10 insertions(+), 17 deletions(-) diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeDegreeCountStage.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeDegreeCountStage.java index 33f5564c6156e..1e6a2ef12b121 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeDegreeCountStage.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeDegreeCountStage.java @@ -36,7 +36,7 @@ public class NodeDegreeCountStage extends Stage { public NodeDegreeCountStage( Configuration config, RelationshipStore store, NodeRelationshipCache cache ) { - super( "Node Degrees", config, ORDER_SEND_DOWNSTREAM ); + super( "Node Degrees", config ); add( new BatchFeedStep( control(), config, forwards( 0, store.getHighId(), config ), store.getRecordSize() ) ); add( new ReadRecordsStep<>( control(), config, false, store, null ) ); add( new CalculateDenseNodesStep( control(), config, cache ) ); diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ParallelBatchImporter.java 
b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ParallelBatchImporter.java index 5d25c91ece9c3..0dc19e39918e9 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ParallelBatchImporter.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ParallelBatchImporter.java @@ -185,7 +185,7 @@ public void doImport( Input input ) throws IOException } // Import relationships (unlinked), properties Configuration relationshipConfig = - configWithRecordsPerPageBasedBatchSize( config, neoStore.getNodeStore() ); + configWithRecordsPerPageBasedBatchSize( config, neoStore.getRelationshipStore() ); RelationshipStage unlinkedRelationshipStage = new RelationshipStage( relationshipConfig, writeMonitor, relationships, idMapper, badCollector, inputCache, neoStore, storeUpdateMonitor ); @@ -201,9 +201,7 @@ public void doImport( Input input ) throws IOException nodeRelationshipCache.setHighNodeId( neoStore.getNodeStore().getHighId() ); NodeDegreeCountStage nodeDegreeStage = new NodeDegreeCountStage( relationshipConfig, neoStore.getRelationshipStore(), nodeRelationshipCache ); - neoStore.startFlushingPageCache(); executeStage( nodeDegreeStage ); - neoStore.stopFlushingPageCache(); linkData( nodeRelationshipCache, neoStore, unlinkedRelationshipStage.getDistribution(), availableMemory ); diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/executor/DynamicTaskExecutor.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/executor/DynamicTaskExecutor.java index 8243c09dcd3e8..caa4ac4d73fb0 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/executor/DynamicTaskExecutor.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/executor/DynamicTaskExecutor.java @@ -31,7 +31,9 @@ import static java.lang.Integer.max; import static java.lang.Integer.min; +import static java.util.concurrent.TimeUnit.MICROSECONDS; import static 
java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.apache.commons.lang3.SystemUtils.IS_OS_WINDOWS; import static org.neo4j.helpers.Exceptions.launderedException; /** @@ -40,8 +42,6 @@ */ public class DynamicTaskExecutor implements TaskExecutor { - public static final ParkStrategy DEFAULT_PARK_STRATEGY = new ParkStrategy.Park( 10, MILLISECONDS ); - private final BlockingQueue> queue; private final ParkStrategy parkStrategy; private final String processorThreadNamePrefix; diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/AbstractStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/AbstractStep.java index e8b934bcd8743..795c1649e9fe7 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/AbstractStep.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/AbstractStep.java @@ -30,6 +30,7 @@ import org.neo4j.helpers.Exceptions; import org.neo4j.kernel.impl.util.MovingAverage; import org.neo4j.unsafe.impl.batchimport.Configuration; +import org.neo4j.unsafe.impl.batchimport.executor.ParkStrategy; import org.neo4j.unsafe.impl.batchimport.stats.ProcessingStats; import org.neo4j.unsafe.impl.batchimport.stats.StatsProvider; import org.neo4j.unsafe.impl.batchimport.stats.StepStats; @@ -37,12 +38,16 @@ import static java.lang.String.format; import static java.lang.System.currentTimeMillis; import static java.util.Arrays.asList; +import static java.util.concurrent.TimeUnit.MICROSECONDS; +import static org.apache.commons.lang3.SystemUtils.IS_OS_WINDOWS; /** * Basic implementation of a {@link Step}. Does the most plumbing job of building a step implementation. */ public abstract class AbstractStep implements Step { + public static final ParkStrategy PARK = new ParkStrategy.Park( IS_OS_WINDOWS ? 
10_000 : 500, MICROSECONDS ); + private final StageControl control; private volatile String name; @SuppressWarnings( "rawtypes" ) diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ForkedProcessorStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ForkedProcessorStep.java index b81f9ba4983f1..2b374f5a76c74 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ForkedProcessorStep.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ForkedProcessorStep.java @@ -22,14 +22,11 @@ import java.util.concurrent.atomic.AtomicReference; import org.neo4j.unsafe.impl.batchimport.Configuration; -import org.neo4j.unsafe.impl.batchimport.executor.ParkStrategy; import org.neo4j.unsafe.impl.internal.dragons.UnsafeUtil; import static java.lang.Integer.max; import static java.lang.Integer.min; import static java.lang.System.nanoTime; -import static java.util.concurrent.TimeUnit.MICROSECONDS; -import static org.apache.commons.lang3.SystemUtils.IS_OS_WINDOWS; import static org.neo4j.unsafe.impl.internal.dragons.UnsafeUtil.getFieldOffset; /** @@ -48,7 +45,6 @@ public abstract class ForkedProcessorStep extends AbstractStep protected static final int MAIN = 0; private final long COMPLETED_PROCESSORS_OFFSET = getFieldOffset( Unit.class, "completedProcessors" ); private final long PROCESSING_TIME_OFFSET = getFieldOffset( Unit.class, "processingTime" ); - private static final ParkStrategy PARK = new ParkStrategy.Park( IS_OS_WINDOWS ? 
10_000 : 500, MICROSECONDS ); private final Object[] forkedProcessors; private volatile int numberOfForkedProcessors; @@ -189,10 +185,6 @@ void processorDone( long time ) UnsafeUtil.getAndAddLong( this, PROCESSING_TIME_OFFSET, time ); int prevCompletedProcessors = UnsafeUtil.getAndAddInt( this, COMPLETED_PROCESSORS_OFFSET, 1 ); assert prevCompletedProcessors < processors; - if ( prevCompletedProcessors == processors - 1 ) - { - PARK.unpark( downstreamSender ); - } } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ProcessorStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ProcessorStep.java index 80355917683b7..5d73159916333 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ProcessorStep.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ProcessorStep.java @@ -30,7 +30,6 @@ import static java.lang.System.currentTimeMillis; import static java.lang.System.nanoTime; -import static org.neo4j.unsafe.impl.batchimport.executor.DynamicTaskExecutor.DEFAULT_PARK_STRATEGY; /** * {@link Step} that uses {@link TaskExecutor} as a queue and execution mechanism. @@ -65,8 +64,7 @@ protected ProcessorStep( StageControl control, String name, Configuration config public void start( int orderingGuarantees ) { super.start( orderingGuarantees ); - this.executor = new DynamicTaskExecutor<>( 1, maxProcessors, config.maxNumberOfProcessors(), - DEFAULT_PARK_STRATEGY, name(), Sender::new ); + this.executor = new DynamicTaskExecutor<>( 1, maxProcessors, config.maxNumberOfProcessors(), PARK, name(), Sender::new ); } @Override