Skip to content

Commit

Permalink
Updated javadocs and comments in and around ParallelBatchImporter
Browse files Browse the repository at this point in the history
  • Loading branch information
tinwelint committed May 17, 2017
1 parent c73c9b2 commit cd7d77f
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 101 deletions.
Expand Up @@ -66,6 +66,7 @@
import static java.lang.Long.max;
import static java.lang.String.format;
import static java.lang.System.currentTimeMillis;

import static org.neo4j.helpers.Format.bytes;
import static org.neo4j.unsafe.impl.batchimport.AdditionalInitialIds.EMPTY;
import static org.neo4j.unsafe.impl.batchimport.SourceOrCachedInputIterable.cachedForSure;
Expand Down Expand Up @@ -181,7 +182,7 @@ public void doImport( Input input ) throws IOException

RelationshipStore relationshipStore = neoStore.getRelationshipStore();

// Stage 1 -- nodes, properties, labels
// Import nodes, properties, labels
Configuration nodeConfig = configWithRecordsPerPageBasedBatchSize( config, neoStore.getNodeStore() );
NodeStage nodeStage = new NodeStage( nodeConfig, writeMonitor,
nodes, idMapper, idGenerator, neoStore, inputCache, neoStore.getLabelScanStore(),
Expand All @@ -200,6 +201,7 @@ public void doImport( Input input ) throws IOException
}
}

// Import relationships (unlinked), properties
Configuration relationshipConfig =
configWithRecordsPerPageBasedBatchSize( config, neoStore.getNodeStore() );
RelationshipStage unlinkedRelationshipStage =
Expand All @@ -209,8 +211,9 @@ public void doImport( Input input ) throws IOException
executeStage( unlinkedRelationshipStage );
neoStore.stopFlushingPageCache();

// Link relationships together with each other, their nodes and their relationship groups
long availableMemory = maxMemory - totalMemoryUsageOf( nodeRelationshipCache, idMapper, neoStore );
linkRelationships( nodeRelationshipCache, neoStore, unlinkedRelationshipStage.getDistribution(),
linkData( nodeRelationshipCache, neoStore, unlinkedRelationshipStage.getDistribution(),
availableMemory );

// Release this potentially really big piece of cached data
Expand All @@ -221,15 +224,16 @@ public void doImport( Input input ) throws IOException
nodeRelationshipCache.close();
nodeRelationshipCache = null;

// Defragment relationships groups for better performance
new RelationshipGroupDefragmenter( config, executionMonitor ).run( max( maxMemory, peakMemoryUsage ),
neoStore, highNodeId );

// Stage 6 -- count nodes per label and labels per node
// Count nodes per label and labels per node
nodeLabelsCache = new NodeLabelsCache( AUTO, neoStore.getLabelRepository().getHighId() );
memoryUsageStats = new MemoryUsageStatsProvider( nodeLabelsCache );
executeStage( new NodeCountsStage( config, nodeLabelsCache, neoStore.getNodeStore(),
neoStore.getLabelRepository().getHighId(), countsUpdater, memoryUsageStats ) );
// Stage 7 -- count label-[type]->label
// Count label-[type]->label
executeStage( new RelationshipCountsStage( config, nodeLabelsCache, relationshipStore,
neoStore.getLabelRepository().getHighId(),
neoStore.getRelationshipTypeRepository().getHighId(), countsUpdater, AUTO ) );
Expand Down Expand Up @@ -285,22 +289,34 @@ private long totalMemoryUsageOf( MemoryStatsVisitor.Visitable... users )
return total.getHeapUsage() + total.getOffHeapUsage();
}

private void linkRelationships( NodeRelationshipCache nodeRelationshipCache,
/**
* Performs one or more rounds linking together relationships with each other. Number of rounds required
* is dictated by available memory. The more dense nodes and relationship types, the more memory required.
* Every round all relationships of one or more types are linked.
*
* Links together:
* <ul>
* <li>
* Relationship <--> Relationship. Two sequential passes are made over the relationship store.
* The forward pass links next pointers, each next pointer pointing "backwards" to lower id.
* The backward pass links prev pointers, each prev pointer pointing "forwards" to higher id.
* </li>
* <li>
* Sparse Node --> Relationship. Sparse nodes are updated with relationship heads of completed chains.
* This is done in the first round only, if there are multiple rounds.
* </li>
* </ul>
*
* Other linking happens after this method.
*
* @param nodeRelationshipCache cache to use for linking.
* @param neoStore the stores.
* @param typeDistribution distribution of imported relationship types.
* @param freeMemoryForDenseNodeCache max available memory to use for caching.
*/
private void linkData( NodeRelationshipCache nodeRelationshipCache,
BatchingNeoStores neoStore, RelationshipTypeDistribution typeDistribution,
long freeMemoryForDenseNodeCache )
{
// Imports the relationships from the Input. This isn't as straightforward as importing nodes,
// since keeping track of and updating heads of relationship chains in scenarios where most nodes
// are dense and there are many relationship types scales poorly with regards to cache memory usage,
// and, as a side-effect, the time required to update this cache.
//
// The approach is instead to do multiple iterations where each iteration imports relationships
// of a single type. For each iteration Node --> Relationship and Relationship --> Relationship
// stages _for dense nodes only_ are run so that the cache can be reused to hold relationship chain heads
// of the next type in the next iteration. All relationships will be imported this way and then
// finally there will be one Node --> Relationship and Relationship --> Relationship stage linking
// all sparse relationship chains together.

Configuration relationshipConfig =
configWithRecordsPerPageBasedBatchSize( config, neoStore.getRelationshipStore() );
Configuration nodeConfig = configWithRecordsPerPageBasedBatchSize( config, neoStore.getNodeStore() );
Expand Down

This file was deleted.

Expand Up @@ -23,9 +23,13 @@

import org.neo4j.kernel.impl.store.PropertyStore;
import org.neo4j.kernel.impl.store.RelationshipStore;
import org.neo4j.kernel.impl.store.record.PropertyBlock;
import org.neo4j.kernel.impl.store.record.PropertyRecord;
import org.neo4j.kernel.impl.store.record.RelationshipRecord;
import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache;
import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdMapper;
import org.neo4j.unsafe.impl.batchimport.input.Collector;
import org.neo4j.unsafe.impl.batchimport.input.Input;
import org.neo4j.unsafe.impl.batchimport.input.InputCache;
import org.neo4j.unsafe.impl.batchimport.input.InputRelationship;
import org.neo4j.unsafe.impl.batchimport.staging.Stage;
Expand All @@ -35,6 +39,23 @@
import static org.neo4j.unsafe.impl.batchimport.input.InputCache.MAIN;
import static org.neo4j.unsafe.impl.batchimport.staging.Step.ORDER_SEND_DOWNSTREAM;

/**
* Imports relationships and their properties w/o linking them together. Steps:
* <ol>
* <li>{@link InputIteratorBatcherStep} reading from {@link InputIterator} produced from
* {@link Input#relationships()}.</li>
* <li>{@link InputEntityCacherStep} alternatively {@link InputCache caches} this input data
* (all the {@link InputRelationship input relationships}) if the iterator doesn't support
* {@link InputIterable#supportsMultiplePasses() multiple passes}.</li>
* <li>{@link RelationshipPreparationStep} uses {@link IdMapper} to look up input id --> node id</li>
* <li>{@link RelationshipRecordPreparationStep} creates {@link RelationshipRecord} and fills them with
* data known at this point, which is start/end node ids and type</li>
* <li>{@link PropertyEncoderStep} encodes properties from {@link InputRelationship input relationships}
* into {@link PropertyBlock}, low level kernel encoded values.</li>
* <li>{@link EntityStoreUpdaterStep} forms {@link PropertyRecord property records} out of previously encoded
* {@link PropertyBlock} and writes those as well as the {@link RelationshipRecord} to store.</li>
* </ol>
*/
public class RelationshipStage extends Stage
{
private RelationshipTypeCheckerStep typer;
Expand Down

0 comments on commit cd7d77f

Please sign in to comment.