From 3b04970b51d12dd1d9847ad31768674fbfe3d9a2 Mon Sep 17 00:00:00 2001 From: Mattias Persson Date: Thu, 6 Apr 2017 11:57:27 +0200 Subject: [PATCH] Parallelised relationship group record writing --- .../NodeFirstRelationshipProcessor.java | 32 +---- .../NodeFirstRelationshipStage.java | 9 +- .../batchimport/NodeSetFirstGroupStep.java | 1 + .../batchimport/ParallelBatchImporter.java | 18 +-- .../ReadGroupRecordsByCacheStep.java | 119 ++++++++++++++++++ .../ReadNodeRecordsByCacheStep.java | 4 +- .../batchimport/RelationshipGroupStage.java | 36 ++++++ 7 files changed, 174 insertions(+), 45 deletions(-) create mode 100644 community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ReadGroupRecordsByCacheStep.java create mode 100644 community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipGroupStage.java diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeFirstRelationshipProcessor.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeFirstRelationshipProcessor.java index 6fecbbf05ec98..12e23326e4649 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeFirstRelationshipProcessor.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeFirstRelationshipProcessor.java @@ -19,11 +19,9 @@ */ package org.neo4j.unsafe.impl.batchimport; -import org.neo4j.kernel.impl.store.RecordStore; import org.neo4j.kernel.impl.store.record.NodeRecord; import org.neo4j.kernel.impl.store.record.RelationshipGroupRecord; import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache; -import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache.GroupVisitor; /** * Sets the {@link NodeRecord#setNextRel(long) relationship field} on all {@link NodeRecord nodes}. @@ -33,15 +31,12 @@ * This step also creates {@link RelationshipGroupRecord group records} for the dense nodes as it encounters * dense nodes, where it gets all relationship group information from {@link NodeRelationshipCache}. */ -public class NodeFirstRelationshipProcessor implements RecordProcessor, GroupVisitor +public class NodeFirstRelationshipProcessor implements RecordProcessor { - private final RecordStore relGroupStore; private final NodeRelationshipCache cache; - public NodeFirstRelationshipProcessor( RecordStore relGroupStore, - NodeRelationshipCache cache ) + public NodeFirstRelationshipProcessor( NodeRelationshipCache cache ) { - this.relGroupStore = relGroupStore; this.cache = cache; } @@ -49,35 +44,14 @@ public NodeFirstRelationshipProcessor( RecordStore relG public boolean process( NodeRecord node ) { long nodeId = node.getId(); - long firstRel = cache.getFirstRel( nodeId, this ); + long firstRel = cache.getFirstRel( nodeId, NodeRelationshipCache.NO_GROUP_VISITOR ); if ( firstRel != -1 ) { node.setNextRel( firstRel ); - if ( cache.isDense( nodeId ) ) - { - node.setDense( true ); - } } return true; } - @Override - public long visit( long nodeId, int typeId, long out, long in, long loop ) - { - // Here we'll use the already generated id (below) from the previous visit, if that so happened - long id = relGroupStore.nextId(); - RelationshipGroupRecord groupRecord = new RelationshipGroupRecord( id ); - groupRecord.setType( typeId ); - groupRecord.setInUse( true ); - groupRecord.setFirstOut( out ); - groupRecord.setFirstIn( in ); - groupRecord.setFirstLoop( loop ); - groupRecord.setOwningNode( nodeId ); - relGroupStore.prepareForCommit( groupRecord ); - relGroupStore.updateRecord( groupRecord ); - return id; - } - @Override public void done() { // Nothing to do here diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeFirstRelationshipStage.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeFirstRelationshipStage.java index 18fd82dddb5b5..598e870fab923 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeFirstRelationshipStage.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeFirstRelationshipStage.java @@ -20,10 +20,10 @@ package org.neo4j.unsafe.impl.batchimport; import org.neo4j.kernel.impl.store.NodeStore; -import org.neo4j.kernel.impl.store.RecordStore; import org.neo4j.kernel.impl.store.record.NodeRecord; import org.neo4j.kernel.impl.store.record.RelationshipGroupRecord; import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache; +import org.neo4j.unsafe.impl.batchimport.cache.NodeType; import org.neo4j.unsafe.impl.batchimport.staging.Stage; /** @@ -41,13 +41,12 @@ public class NodeFirstRelationshipStage extends Stage { public NodeFirstRelationshipStage( String topic, Configuration config, NodeStore nodeStore, - RecordStore relationshipGroupStore, NodeRelationshipCache cache, - int nodeTypes ) + NodeRelationshipCache cache ) { super( "Node --> Relationship" + topic, config ); - add( new ReadNodeRecordsByCacheStep( control(), config, nodeStore, cache, nodeTypes ) ); + add( new ReadNodeRecordsByCacheStep( control(), config, nodeStore, cache, NodeType.NODE_TYPE_SPARSE ) ); add( new RecordProcessorStep<>( control(), "LINK", config, - new NodeFirstRelationshipProcessor( relationshipGroupStore, cache ), false ) ); + new NodeFirstRelationshipProcessor( cache ), false ) ); add( new UpdateRecordsStep<>( control(), config, nodeStore ) ); } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeSetFirstGroupStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeSetFirstGroupStep.java index c0512187b9830..663b595166931 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeSetFirstGroupStep.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeSetFirstGroupStep.java @@ -78,6 +78,7 @@ protected void process( RelationshipGroupRecord[] batch, BatchSender sender ) th nodeRecordCursor.next( nodeId ); NodeRecord node = nodeRecordCursor.get().clone(); node.setNextRel( group.getId() ); + node.setDense( true ); current[cursor++] = node; if ( cursor == batchSize ) diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ParallelBatchImporter.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ParallelBatchImporter.java index efff3ea1c7d30..2c278da9c123e 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ParallelBatchImporter.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ParallelBatchImporter.java @@ -306,6 +306,8 @@ private void importRelationships( NodeRelationshipCache nodeRelationshipCache, Configuration nodeConfig = configWithRecordsPerPageBasedBatchSize( config, neoStore.getNodeStore() ); Iterator> rounds = nodeRelationshipCache.splitRelationshipTypesIntoRounds( typeDistribution.iterator(), freeMemoryForDenseNodeCache ); + Configuration groupConfig = + configWithRecordsPerPageBasedBatchSize( config, neoStore.getRelationshipGroupStore() ); // Do multiple rounds of relationship importing. Each round fits as many relationship types // as it can (comparing with worst-case memory usage and available memory). @@ -337,9 +339,12 @@ private void importRelationships( NodeRelationshipCache nodeRelationshipCache, int nodeTypes = thisIsTheOnlyRound ? NodeType.NODE_TYPE_ALL : NodeType.NODE_TYPE_DENSE; - // Set node nextRel fields for dense nodes + // Write relationship groups cached from the relationship import above + executeStage( new RelationshipGroupStage( topic, groupConfig, + neoStore.getTemporaryRelationshipGroupStore(), nodeRelationshipCache ) ); + // Set node nextRel fields executeStage( new NodeFirstRelationshipStage( topic, nodeConfig, neoStore.getNodeStore(), - neoStore.getTemporaryRelationshipGroupStore(), nodeRelationshipCache, nodeTypes ) ); + nodeRelationshipCache ) ); // Link relationship chains together for dense nodes nodeRelationshipCache.setForwardScan( false, true/*dense*/ ); @@ -357,18 +362,13 @@ private void importRelationships( NodeRelationshipCache nodeRelationshipCache, // there were multiple passes of dense linking above. if ( round > 1 ) { - // Set node nextRel fields for sparse nodes - String topic = " Sparse"; - nodeRelationshipCache.setForwardScan( true, false/*sparse*/ ); - executeStage( new NodeFirstRelationshipStage( topic, nodeConfig, neoStore.getNodeStore(), - neoStore.getTemporaryRelationshipGroupStore(), nodeRelationshipCache, NodeType.NODE_TYPE_SPARSE ) ); - // Link relationship chains together for sparse nodes nodeRelationshipCache.setForwardScan( false, false/*sparse*/ ); - executeStage( new RelationshipLinkbackStage( topic, relationshipConfig, + executeStage( new RelationshipLinkbackStage( " Sparse", relationshipConfig, neoStore.getRelationshipStore(), nodeRelationshipCache, 0, nextRelationshipId, NodeType.NODE_TYPE_SPARSE ) ); } + // else we did in the single round above to avoid doing another pass } private static Configuration configWithRecordsPerPageBasedBatchSize( Configuration source, RecordStore store ) diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ReadGroupRecordsByCacheStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ReadGroupRecordsByCacheStep.java new file mode 100644 index 0000000000000..a0459a28b55ec --- /dev/null +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ReadGroupRecordsByCacheStep.java @@ -0,0 +1,119 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.unsafe.impl.batchimport; + +import org.neo4j.kernel.impl.store.RecordStore; +import org.neo4j.kernel.impl.store.record.NodeRecord; +import org.neo4j.kernel.impl.store.record.RelationshipGroupRecord; +import org.neo4j.unsafe.impl.batchimport.cache.ByteArray; +import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache; +import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache.GroupVisitor; +import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache.NodeChangeVisitor; +import org.neo4j.unsafe.impl.batchimport.cache.NodeType; +import org.neo4j.unsafe.impl.batchimport.staging.AbstractStep; +import org.neo4j.unsafe.impl.batchimport.staging.StageControl; + +import static java.lang.System.nanoTime; + +/** + * Using the {@link NodeRelationshipCache} efficiently looks for changed nodes and reads those + * {@link NodeRecord} and sends downwards. + */ +public class ReadGroupRecordsByCacheStep extends AbstractStep +{ + private final RecordStore store; + private final NodeRelationshipCache cache; + private final int batchSize; + + public ReadGroupRecordsByCacheStep( StageControl control, Configuration config, + RecordStore store, NodeRelationshipCache cache ) + { + super( control, ">", config ); + this.store = store; + this.cache = cache; + this.batchSize = config.batchSize(); + } + + @Override + public long receive( long ticket, Void ignored ) + { + new Thread() + { + @Override + public void run() + { + assertHealthy(); + try ( NodeVisitor visitor = new NodeVisitor() ) + { + cache.visitChangedNodes( visitor, NodeType.NODE_TYPE_DENSE ); + } + endOfUpstream(); + } + }.start(); + return 0; + } + + private class NodeVisitor implements NodeChangeVisitor, AutoCloseable, GroupVisitor + { + private RelationshipGroupRecord[] batch = new RelationshipGroupRecord[batchSize]; + private int cursor; + private long time = nanoTime(); + + @Override + public void change( long nodeId, ByteArray array ) + { + cache.getFirstRel( nodeId, this ); + } + + @Override + public long visit( long nodeId, int typeId, long out, long in, long loop ) + { + long id = store.nextId(); + RelationshipGroupRecord record = store.newRecord(); + record.setId( id ); + batch[cursor++] = record.initialize( true, typeId, out, in, loop, nodeId, loop ); + if ( cursor == batchSize ) + { + send(); + batch = new RelationshipGroupRecord[batchSize]; + cursor = 0; + } + return id; + } + + @SuppressWarnings( "unchecked" ) + private void send() + { + totalProcessingTime.add( nanoTime() - time ); + downstream.receive( doneBatches.getAndIncrement(), batch ); + time = nanoTime(); + assertHealthy(); + } + + @Override + public void close() + { + if ( cursor > 0 ) + { + send(); + } + } + } +} diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ReadNodeRecordsByCacheStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ReadNodeRecordsByCacheStep.java index 06700e146c81a..795c030f0488f 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ReadNodeRecordsByCacheStep.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ReadNodeRecordsByCacheStep.java @@ -35,7 +35,7 @@ * Using the {@link NodeRelationshipCache} efficiently looks for changed nodes and reads those * {@link NodeRecord} and sends downwards. */ -public class ReadNodeRecordsByCacheStep extends AbstractStep +public class ReadNodeRecordsByCacheStep extends AbstractStep { private final int nodeTypes; private final NodeRelationshipCache cache; @@ -67,7 +67,7 @@ public void close() throws Exception } @Override - public long receive( long ticket, NodeRecord[] batch ) + public long receive( long ticket, Void ignored ) { new Thread() { diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipGroupStage.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipGroupStage.java new file mode 100644 index 0000000000000..bce7ae38a7d4c --- /dev/null +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipGroupStage.java @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.unsafe.impl.batchimport; + +import org.neo4j.kernel.impl.store.RecordStore; +import org.neo4j.kernel.impl.store.record.RelationshipGroupRecord; +import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache; +import org.neo4j.unsafe.impl.batchimport.staging.Stage; + +public class RelationshipGroupStage extends Stage +{ + public RelationshipGroupStage( String topic, Configuration config, + RecordStore store, NodeRelationshipCache cache ) + { + super( "RelationshipGroup" + topic, config ); + add( new ReadGroupRecordsByCacheStep( control(), config, store, cache ) ); + add( new UpdateRecordsStep<>( control(), config, store ) ); + } +}