From e92661edf7295d73623d2c3858432b1be2bc6a00 Mon Sep 17 00:00:00 2001 From: Mattias Persson Date: Thu, 6 Apr 2017 14:16:35 +0200 Subject: [PATCH] Ability to read nodes in node-->rel stage in parallel --- .../NodeFirstRelationshipStage.java | 10 +++-- ...eStep.java => ReadNodeIdsByCacheStep.java} | 39 ++++++------------- 2 files changed, 19 insertions(+), 30 deletions(-) rename community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/{ReadNodeRecordsByCacheStep.java => ReadNodeIdsByCacheStep.java} (72%) diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeFirstRelationshipStage.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeFirstRelationshipStage.java index 598e870fab923..380e5b535f0e2 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeFirstRelationshipStage.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeFirstRelationshipStage.java @@ -24,13 +24,16 @@ import org.neo4j.kernel.impl.store.record.RelationshipGroupRecord; import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache; import org.neo4j.unsafe.impl.batchimport.cache.NodeType; +import org.neo4j.unsafe.impl.batchimport.staging.ReadRecordsStep; import org.neo4j.unsafe.impl.batchimport.staging.Stage; +import static org.neo4j.unsafe.impl.batchimport.staging.Step.ORDER_SEND_DOWNSTREAM; + /** * Updates {@link NodeRecord node records} with relationship/group chain heads after relationship import. Steps: * *
    - *
  1. {@link ReadNodeRecordsByCacheStep} looks at {@link NodeRelationshipCache} for which nodes have had + *
  2. {@link ReadNodeIdsByCacheStep} looks at {@link NodeRelationshipCache} for which nodes have had * relationships imported and loads those {@link NodeRecord records} from store.
  3. *
  4. {@link RecordProcessorStep} / {@link NodeFirstRelationshipProcessor} uses {@link NodeRelationshipCache} * to update each {@link NodeRecord#setNextRel(long)}. For dense nodes {@link RelationshipGroupRecord group records} @@ -43,8 +46,9 @@ public class NodeFirstRelationshipStage extends Stage public NodeFirstRelationshipStage( String topic, Configuration config, NodeStore nodeStore, NodeRelationshipCache cache ) { - super( "Node --> Relationship" + topic, config ); - add( new ReadNodeRecordsByCacheStep( control(), config, nodeStore, cache, NodeType.NODE_TYPE_SPARSE ) ); + super( "Node --> Relationship" + topic, config, ORDER_SEND_DOWNSTREAM ); + add( new ReadNodeIdsByCacheStep( control(), config, cache, NodeType.NODE_TYPE_SPARSE ) ); + add( new ReadRecordsStep<>( control(), config, true, nodeStore ) ); add( new RecordProcessorStep<>( control(), "LINK", config, new NodeFirstRelationshipProcessor( cache ), false ) ); add( new UpdateRecordsStep<>( control(), config, nodeStore ) ); diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ReadNodeRecordsByCacheStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ReadNodeIdsByCacheStep.java similarity index 72% rename from community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ReadNodeRecordsByCacheStep.java rename to community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ReadNodeIdsByCacheStep.java index 795c030f0488f..f408c8a905c49 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ReadNodeRecordsByCacheStep.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ReadNodeIdsByCacheStep.java @@ -19,10 +19,9 @@ */ package org.neo4j.unsafe.impl.batchimport; -import org.neo4j.kernel.impl.store.NodeStore; -import org.neo4j.kernel.impl.store.RecordCursor; +import java.util.Arrays; + import org.neo4j.kernel.impl.store.record.NodeRecord; -import org.neo4j.kernel.impl.store.record.RecordLoad; import org.neo4j.unsafe.impl.batchimport.cache.ByteArray; import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache; import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache.NodeChangeVisitor; @@ -31,39 +30,25 @@ import static java.lang.System.nanoTime; +import static org.neo4j.collection.primitive.PrimitiveLongCollections.iterator; + /** * Using the {@link NodeRelationshipCache} efficiently looks for changed nodes and reads those * {@link NodeRecord} and sends downwards. */ -public class ReadNodeRecordsByCacheStep extends AbstractStep +public class ReadNodeIdsByCacheStep extends AbstractStep { private final int nodeTypes; private final NodeRelationshipCache cache; private final int batchSize; - private final RecordCursor recordCursor; - public ReadNodeRecordsByCacheStep( StageControl control, Configuration config, - NodeStore nodeStore, NodeRelationshipCache cache, int nodeTypes ) + public ReadNodeIdsByCacheStep( StageControl control, Configuration config, + NodeRelationshipCache cache, int nodeTypes ) { super( control, ">", config ); this.cache = cache; this.nodeTypes = nodeTypes; this.batchSize = config.batchSize(); - this.recordCursor = nodeStore.newRecordCursor( nodeStore.newRecord() ); - } - - @Override - public void start( int orderingGuarantees ) - { - super.start( orderingGuarantees ); - recordCursor.acquire( 0, RecordLoad.NORMAL ); - } - - @Override - public void close() throws Exception - { - recordCursor.close(); - super.close(); } @Override @@ -87,19 +72,18 @@ public void run() private class NodeVisitor implements NodeChangeVisitor, AutoCloseable { - private NodeRecord[] batch = new NodeRecord[batchSize]; + private long[] batch = new long[batchSize]; private int cursor; private long time = nanoTime(); @Override public void change( long nodeId, ByteArray array ) { - recordCursor.next( nodeId ); - batch[cursor++] = recordCursor.get().clone(); + batch[cursor++] = nodeId; if ( cursor == batchSize ) { send(); - batch = new NodeRecord[batchSize]; + batch = new long[batchSize]; cursor = 0; } } @@ -108,7 +92,7 @@ public void change( long nodeId, ByteArray array ) private void send() { totalProcessingTime.add( nanoTime() - time ); - downstream.receive( doneBatches.getAndIncrement(), batch ); + downstream.receive( doneBatches.getAndIncrement(), iterator( batch ) ); time = nanoTime(); assertHealthy(); } @@ -118,6 +102,7 @@ public void close() { if ( cursor > 0 ) { + batch = Arrays.copyOf( batch, cursor ); send(); } }