Skip to content

Commit

Permalink
Ability to read nodes in node-->rel stage in parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
tinwelint committed Apr 27, 2017
1 parent 3b04970 commit e92661e
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 30 deletions.
Expand Up @@ -24,13 +24,16 @@
import org.neo4j.kernel.impl.store.record.RelationshipGroupRecord;
import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache;
import org.neo4j.unsafe.impl.batchimport.cache.NodeType;
import org.neo4j.unsafe.impl.batchimport.staging.ReadRecordsStep;
import org.neo4j.unsafe.impl.batchimport.staging.Stage;

import static org.neo4j.unsafe.impl.batchimport.staging.Step.ORDER_SEND_DOWNSTREAM;

/**
* Updates {@link NodeRecord node records} with relationship/group chain heads after relationship import. Steps:
*
* <ol>
* <li>{@link ReadNodeRecordsByCacheStep} looks at {@link NodeRelationshipCache} for which nodes have had
* <li>{@link ReadNodeIdsByCacheStep} looks at {@link NodeRelationshipCache} for which nodes have had
* relationships imported and sends their ids downstream, where {@link ReadRecordsStep} loads those
* {@link NodeRecord records} from store.</li>
* <li>{@link RecordProcessorStep} / {@link NodeFirstRelationshipProcessor} uses {@link NodeRelationshipCache}
* to update each {@link NodeRecord#setNextRel(long)}. For dense nodes {@link RelationshipGroupRecord group records}
Expand All @@ -43,8 +46,9 @@ public class NodeFirstRelationshipStage extends Stage
public NodeFirstRelationshipStage( String topic, Configuration config, NodeStore nodeStore,
NodeRelationshipCache cache )
{
super( "Node --> Relationship" + topic, config );
add( new ReadNodeRecordsByCacheStep( control(), config, nodeStore, cache, NodeType.NODE_TYPE_SPARSE ) );
super( "Node --> Relationship" + topic, config, ORDER_SEND_DOWNSTREAM );
add( new ReadNodeIdsByCacheStep( control(), config, cache, NodeType.NODE_TYPE_SPARSE ) );
add( new ReadRecordsStep<>( control(), config, true, nodeStore ) );
add( new RecordProcessorStep<>( control(), "LINK", config,
new NodeFirstRelationshipProcessor( cache ), false ) );
add( new UpdateRecordsStep<>( control(), config, nodeStore ) );
Expand Down
Expand Up @@ -19,10 +19,9 @@
*/
package org.neo4j.unsafe.impl.batchimport;

import org.neo4j.kernel.impl.store.NodeStore;
import org.neo4j.kernel.impl.store.RecordCursor;
import java.util.Arrays;

import org.neo4j.kernel.impl.store.record.NodeRecord;
import org.neo4j.kernel.impl.store.record.RecordLoad;
import org.neo4j.unsafe.impl.batchimport.cache.ByteArray;
import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache;
import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache.NodeChangeVisitor;
Expand All @@ -31,39 +30,25 @@

import static java.lang.System.nanoTime;

import static org.neo4j.collection.primitive.PrimitiveLongCollections.iterator;

/**
* Uses the {@link NodeRelationshipCache} to efficiently find changed nodes and sends their ids downstream
* in batches, so that a later step can read the corresponding {@link NodeRecord node records}.
*/
public class ReadNodeRecordsByCacheStep extends AbstractStep<Void>
public class ReadNodeIdsByCacheStep extends AbstractStep<Void>
{
private final int nodeTypes;
private final NodeRelationshipCache cache;
private final int batchSize;
private final RecordCursor<NodeRecord> recordCursor;

public ReadNodeRecordsByCacheStep( StageControl control, Configuration config,
NodeStore nodeStore, NodeRelationshipCache cache, int nodeTypes )
public ReadNodeIdsByCacheStep( StageControl control, Configuration config,
NodeRelationshipCache cache, int nodeTypes )
{
super( control, ">", config );
this.cache = cache;
this.nodeTypes = nodeTypes;
this.batchSize = config.batchSize();
this.recordCursor = nodeStore.newRecordCursor( nodeStore.newRecord() );
}

@Override
public void start( int orderingGuarantees )
{
super.start( orderingGuarantees );
recordCursor.acquire( 0, RecordLoad.NORMAL );
}

@Override
public void close() throws Exception
{
recordCursor.close();
super.close();
}

@Override
Expand All @@ -87,19 +72,18 @@ public void run()

private class NodeVisitor implements NodeChangeVisitor, AutoCloseable
{
private NodeRecord[] batch = new NodeRecord[batchSize];
private long[] batch = new long[batchSize];
private int cursor;
private long time = nanoTime();

@Override
public void change( long nodeId, ByteArray array )
{
recordCursor.next( nodeId );
batch[cursor++] = recordCursor.get().clone();
batch[cursor++] = nodeId;
if ( cursor == batchSize )
{
send();
batch = new NodeRecord[batchSize];
batch = new long[batchSize];
cursor = 0;
}
}
Expand All @@ -108,7 +92,7 @@ public void change( long nodeId, ByteArray array )
private void send()
{
totalProcessingTime.add( nanoTime() - time );
downstream.receive( doneBatches.getAndIncrement(), batch );
downstream.receive( doneBatches.getAndIncrement(), iterator( batch ) );
time = nanoTime();
assertHealthy();
}
Expand All @@ -118,6 +102,7 @@ public void close()
{
if ( cursor > 0 )
{
batch = Arrays.copyOf( batch, cursor );
send();
}
}
Expand Down

0 comments on commit e92661e

Please sign in to comment.