diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ParallelBatchImporter.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ParallelBatchImporter.java index 514dffa3028b5..4e36be3464269 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ParallelBatchImporter.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ParallelBatchImporter.java @@ -66,6 +66,7 @@ import static java.lang.Long.max; import static java.lang.String.format; import static java.lang.System.currentTimeMillis; + import static org.neo4j.helpers.Format.bytes; import static org.neo4j.unsafe.impl.batchimport.AdditionalInitialIds.EMPTY; import static org.neo4j.unsafe.impl.batchimport.SourceOrCachedInputIterable.cachedForSure; @@ -181,7 +182,7 @@ public void doImport( Input input ) throws IOException RelationshipStore relationshipStore = neoStore.getRelationshipStore(); - // Stage 1 -- nodes, properties, labels + // Import nodes, properties, labels Configuration nodeConfig = configWithRecordsPerPageBasedBatchSize( config, neoStore.getNodeStore() ); NodeStage nodeStage = new NodeStage( nodeConfig, writeMonitor, nodes, idMapper, idGenerator, neoStore, inputCache, neoStore.getLabelScanStore(), @@ -200,6 +201,7 @@ public void doImport( Input input ) throws IOException } } + // Import relationships (unlinked), properties Configuration relationshipConfig = configWithRecordsPerPageBasedBatchSize( config, neoStore.getNodeStore() ); RelationshipStage unlinkedRelationshipStage = @@ -209,8 +211,9 @@ public void doImport( Input input ) throws IOException executeStage( unlinkedRelationshipStage ); neoStore.stopFlushingPageCache(); + // Link relationships together with each other, their nodes and their relationship groups long availableMemory = maxMemory - totalMemoryUsageOf( nodeRelationshipCache, idMapper, neoStore ); - linkRelationships( nodeRelationshipCache, neoStore, unlinkedRelationshipStage.getDistribution(), + 
linkData( nodeRelationshipCache, neoStore, unlinkedRelationshipStage.getDistribution(), availableMemory ); // Release this potentially really big piece of cached data @@ -221,15 +224,16 @@ public void doImport( Input input ) throws IOException nodeRelationshipCache.close(); nodeRelationshipCache = null; + // Defragment relationship groups for better performance new RelationshipGroupDefragmenter( config, executionMonitor ).run( max( maxMemory, peakMemoryUsage ), neoStore, highNodeId ); - // Stage 6 -- count nodes per label and labels per node + // Count nodes per label and labels per node nodeLabelsCache = new NodeLabelsCache( AUTO, neoStore.getLabelRepository().getHighId() ); memoryUsageStats = new MemoryUsageStatsProvider( nodeLabelsCache ); executeStage( new NodeCountsStage( config, nodeLabelsCache, neoStore.getNodeStore(), neoStore.getLabelRepository().getHighId(), countsUpdater, memoryUsageStats ) ); - // Stage 7 -- count label-[type]->label + // Count label-[type]->label executeStage( new RelationshipCountsStage( config, nodeLabelsCache, relationshipStore, neoStore.getLabelRepository().getHighId(), neoStore.getRelationshipTypeRepository().getHighId(), countsUpdater, AUTO ) ); @@ -285,22 +289,34 @@ private long totalMemoryUsageOf( MemoryStatsVisitor.Visitable... users ) return total.getHeapUsage() + total.getOffHeapUsage(); } - private void linkRelationships( NodeRelationshipCache nodeRelationshipCache, + /** + * Performs one or more rounds linking together relationships with each other. Number of rounds required + * is dictated by available memory. The more dense nodes and relationship types, the more memory required. + * Every round all relationships of one or more types are linked. + * + * Links together: + * + * + * Other linking happens after this method. + * + * @param nodeRelationshipCache cache to use for linking. + * @param neoStore the stores. + * @param typeDistribution distribution of imported relationship types. 
+ * @param freeMemoryForDenseNodeCache max available memory to use for caching. + */ + private void linkData( NodeRelationshipCache nodeRelationshipCache, BatchingNeoStores neoStore, RelationshipTypeDistribution typeDistribution, long freeMemoryForDenseNodeCache ) { - // Imports the relationships from the Input. This isn't a straight forward as importing nodes, - // since keeping track of and updating heads of relationship chains in scenarios where most nodes - // are dense and there are many relationship types scales poorly w/ regards to cache memory usage - // also as a side-effect time required to update this cache. - // - // The approach is instead to do multiple iterations where each iteration imports relationships - // of a single type. For each iteration Node --> Relationship and Relationship --> Relationship - // stages _for dense nodes only_ are run so that the cache can be reused to hold relationship chain heads - // of the next type in the next iteration. All relationships will be imported this way and then - // finally there will be one Node --> Relationship and Relationship --> Relationship stage linking - // all sparse relationship chains together. - Configuration relationshipConfig = configWithRecordsPerPageBasedBatchSize( config, neoStore.getRelationshipStore() ); Configuration nodeConfig = configWithRecordsPerPageBasedBatchSize( config, neoStore.getNodeStore() ); diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipEncoderStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipEncoderStep.java deleted file mode 100644 index 50465c215014d..0000000000000 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipEncoderStep.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Copyright (c) 2002-2017 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. 
- * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - */ -package org.neo4j.unsafe.impl.batchimport; - -import org.neo4j.kernel.impl.store.record.RelationshipRecord; -import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache; -import org.neo4j.unsafe.impl.batchimport.input.InputRelationship; -import org.neo4j.unsafe.impl.batchimport.staging.ForkedProcessorStep; -import org.neo4j.unsafe.impl.batchimport.staging.StageControl; - -import static org.neo4j.graphdb.Direction.BOTH; -import static org.neo4j.graphdb.Direction.INCOMING; -import static org.neo4j.graphdb.Direction.OUTGOING; - -/** - * Creates batches of relationship records, with the "next" relationship - * pointers set to the next relationships (previously created) in their respective chains. The previous - * relationship ids are kept in {@link NodeRelationshipCache node cache}, which is a point of scalability issues, - * although mitigated using multi-pass techniques. 
- */ -public class RelationshipEncoderStep extends ForkedProcessorStep> -{ - private final NodeRelationshipCache cache; - - public RelationshipEncoderStep( StageControl control, Configuration config, NodeRelationshipCache cache ) - { - super( control, "RELATIONSHIP", config ); - this.cache = cache; - } - - @Override - protected void forkedProcess( int id, int processors, Batch batch ) - { - for ( int i = 0; i < batch.records.length; i++ ) - { - RelationshipRecord relationship = batch.records[i]; - long startNode = relationship.getFirstNode(); - long endNode = relationship.getSecondNode(); - if ( !relationship.inUse() ) - { // This means that we here have a relationship that refers to missing nodes. - // It also means that we tolerate some amount of bad relationships and CalculateDenseNodesStep - // already have reported this to the bad collector. - continue; - } - - // Set first/second next rel - boolean loop = startNode == endNode; - int typeId = relationship.getType(); - if ( startNode % processors == id ) - { - long firstNextRel = cache.getAndPutRelationship( - startNode, typeId, loop ? 
BOTH : OUTGOING, relationship.getId(), true ); - relationship.setFirstNextRel( firstNextRel ); - if ( loop ) - { - relationship.setSecondNextRel( firstNextRel ); - } - } - - if ( !loop && endNode % processors == id ) - { - relationship.setSecondNextRel( cache.getAndPutRelationship( - endNode, typeId, INCOMING, relationship.getId(), true ) ); - } - } - } -} diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipStage.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipStage.java index c0c681072dc88..1adcfcd7dcc62 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipStage.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipStage.java @@ -23,9 +23,13 @@ import org.neo4j.kernel.impl.store.PropertyStore; import org.neo4j.kernel.impl.store.RelationshipStore; +import org.neo4j.kernel.impl.store.record.PropertyBlock; +import org.neo4j.kernel.impl.store.record.PropertyRecord; +import org.neo4j.kernel.impl.store.record.RelationshipRecord; import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache; import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdMapper; import org.neo4j.unsafe.impl.batchimport.input.Collector; +import org.neo4j.unsafe.impl.batchimport.input.Input; import org.neo4j.unsafe.impl.batchimport.input.InputCache; import org.neo4j.unsafe.impl.batchimport.input.InputRelationship; import org.neo4j.unsafe.impl.batchimport.staging.Stage; @@ -35,6 +39,23 @@ import static org.neo4j.unsafe.impl.batchimport.input.InputCache.MAIN; import static org.neo4j.unsafe.impl.batchimport.staging.Step.ORDER_SEND_DOWNSTREAM; +/** + * Imports relationships and their properties w/o linking them together. Steps: + *
<ol>
+ * <li>{@link InputIteratorBatcherStep} reading from {@link InputIterator} produced from
+ * {@link Input#relationships()}.</li>
+ * <li>{@link InputEntityCacherStep} alternatively {@link InputCache caches} this input data
+ * (all the {@link InputRelationship input relationships}) if the iterator doesn't support
+ * {@link InputIterable#supportsMultiplePasses() multiple passes}.</li>
+ * <li>{@link RelationshipPreparationStep} uses {@link IdMapper} to look up input id --> node id</li>
+ * <li>{@link RelationshipRecordPreparationStep} creates {@link RelationshipRecord} and fills them with
+ * data known at this point, which is start/end node ids and type</li>
+ * <li>{@link PropertyEncoderStep} encodes properties from {@link InputRelationship input relationships}
+ * into {@link PropertyBlock}, low level kernel encoded values.</li>
+ * <li>{@link EntityStoreUpdaterStep} forms {@link PropertyRecord property records} out of previously encoded
+ * {@link PropertyBlock} and writes those as well as the {@link RelationshipRecord} to store.</li>
+ * </ol>
+ */ public class RelationshipStage extends Stage { private RelationshipTypeCheckerStep typer;