Skip to content

Commit

Permalink
Removes explicit CalculateDenseNodeStage from importer
Browse files Browse the repository at this point in the history
by instead importing relationships when going through the relationship input
without any linking. Then have one pass linking all relationships forwards before
linking them backwards. In most cases this should be faster than the previous
approach.
  • Loading branch information
tinwelint committed May 16, 2017
1 parent a4accd3 commit 9f07dd5
Show file tree
Hide file tree
Showing 17 changed files with 397 additions and 359 deletions.

This file was deleted.

This file was deleted.

Expand Up @@ -44,7 +44,7 @@ public CountGroupsStage( Configuration config, RecordStore<RelationshipGroupReco
{
super( "Count groups", config );
add( new BatchFeedStep( control(), config, allIn( store, config ), store.getRecordSize() ) );
add( new ReadRecordsStep<>( control(), config, false, store ) );
add( new ReadRecordsStep<>( control(), config, false, store, null ) );
add( new CountGroupsStep( control(), config, groupCache ) );
}
}
Expand Up @@ -40,7 +40,7 @@ public NodeCountsStage( Configuration config, NodeLabelsCache cache, NodeStore n
{
super( "Node counts", config );
add( new BatchFeedStep( control(), config, allIn( nodeStore, config ), nodeStore.getRecordSize() ) );
add( new ReadRecordsStep<>( control(), config, false, nodeStore ) );
add( new ReadRecordsStep<>( control(), config, false, nodeStore, null ) );
add( new RecordProcessorStep<>( control(), "COUNT", config, new NodeCountsProcessor(
nodeStore, cache, highLabelId, countsUpdater ), true, additionalStatsProviders ) );
}
Expand Down
Expand Up @@ -40,7 +40,7 @@ public NodeFirstGroupStage( Configuration config, RecordStore<RelationshipGroupR
{
super( "Node --> Group", config );
add( new BatchFeedStep( control(), config, allIn( groupStore, config ), groupStore.getRecordSize() ) );
add( new ReadRecordsStep<>( control(), config, true, groupStore ) );
add( new ReadRecordsStep<>( control(), config, true, groupStore, null ) );
add( new NodeSetFirstGroupStep( control(), config, nodeStore, cache ) );
add( new UpdateRecordsStep<>( control(), config, nodeStore ) );
}
Expand Down
Expand Up @@ -25,6 +25,8 @@
import java.util.Iterator;
import java.util.function.Predicate;

import org.neo4j.collection.primitive.Primitive;
import org.neo4j.collection.primitive.PrimitiveIntSet;
import org.neo4j.collection.primitive.PrimitiveLongIterator;
import org.neo4j.helpers.Exceptions;
import org.neo4j.helpers.Format;
Expand All @@ -38,6 +40,7 @@
import org.neo4j.kernel.impl.store.RelationshipStore;
import org.neo4j.kernel.impl.store.format.RecordFormatSelector;
import org.neo4j.kernel.impl.store.format.RecordFormats;
import org.neo4j.kernel.impl.store.record.RelationshipRecord;
import org.neo4j.logging.Log;
import org.neo4j.logging.NullLogProvider;
import org.neo4j.unsafe.impl.batchimport.cache.GatheringMemoryStatsVisitor;
Expand All @@ -57,6 +60,7 @@
import org.neo4j.unsafe.impl.batchimport.staging.Stage;
import org.neo4j.unsafe.impl.batchimport.stats.StatsProvider;
import org.neo4j.unsafe.impl.batchimport.store.BatchingNeoStores;
import org.neo4j.unsafe.impl.batchimport.store.BatchingTokenRepository.BatchingRelationshipTypeTokenRepository;
import org.neo4j.unsafe.impl.batchimport.store.io.IoMonitor;

import static java.lang.Long.max;
Expand Down Expand Up @@ -174,8 +178,6 @@ public void doImport( Input input ) throws IOException
InputIterable<InputNode> nodes = input.nodes();
InputIterable<InputRelationship> relationships = input.relationships();
InputIterable<InputNode> cachedNodes = cachedForSure( nodes, inputCache.nodes( MAIN, true ) );
InputIterable<InputRelationship> cachedRelationships =
cachedForSure( relationships, inputCache.relationships( MAIN, false ) );

RelationshipStore relationshipStore = neoStore.getRelationshipStore();

Expand All @@ -198,17 +200,17 @@ public void doImport( Input input ) throws IOException
}
}

// Stage 2 -- calculate dense node threshold
Configuration relationshipConfig =
configWithRecordsPerPageBasedBatchSize( config, neoStore.getNodeStore() );
CalculateDenseNodesStage calculateDenseNodesStage = new CalculateDenseNodesStage(
relationshipConfig,
relationships, nodeRelationshipCache, idMapper, badCollector, inputCache, neoStore );
executeStage( calculateDenseNodesStage );
RelationshipStage unlinkedRelationshipStage =
new RelationshipStage( relationshipConfig, writeMonitor, relationships, idMapper,
badCollector, inputCache, nodeRelationshipCache, neoStore, storeUpdateMonitor );
neoStore.startFlushingPageCache();
executeStage( unlinkedRelationshipStage );
neoStore.stopFlushingPageCache();

long availableMemory = maxMemory - totalMemoryUsageOf( nodeRelationshipCache, idMapper );
importRelationships( nodeRelationshipCache, storeUpdateMonitor, neoStore, writeMonitor,
idMapper, cachedRelationships, calculateDenseNodesStage.getDistribution(),
linkRelationships( nodeRelationshipCache, neoStore, unlinkedRelationshipStage.getDistribution(),
availableMemory );

// Release this potentially really big piece of cached data
Expand Down Expand Up @@ -283,10 +285,9 @@ private long totalMemoryUsageOf( MemoryStatsVisitor.Visitable... users )
return total.getHeapUsage() + total.getOffHeapUsage();
}

private void importRelationships( NodeRelationshipCache nodeRelationshipCache,
CountingStoreUpdateMonitor storeUpdateMonitor, BatchingNeoStores neoStore,
IoMonitor writeMonitor, IdMapper idMapper, InputIterable<InputRelationship> relationships,
RelationshipTypeDistribution typeDistribution, long freeMemoryForDenseNodeCache )
private void linkRelationships( NodeRelationshipCache nodeRelationshipCache,
BatchingNeoStores neoStore, RelationshipTypeDistribution typeDistribution,
long freeMemoryForDenseNodeCache )
{
// Imports the relationships from the Input. This isn't as straightforward as importing nodes,
// since keeping track of and updating heads of relationship chains in scenarios where most nodes
Expand All @@ -300,7 +301,6 @@ private void importRelationships( NodeRelationshipCache nodeRelationshipCache,
// finally there will be one Node --> Relationship and Relationship --> Relationship stage linking
// all sparse relationship chains together.

long nextRelationshipId = 0;
Configuration relationshipConfig =
configWithRecordsPerPageBasedBatchSize( config, neoStore.getRelationshipStore() );
Configuration nodeConfig = configWithRecordsPerPageBasedBatchSize( config, neoStore.getNodeStore() );
Expand All @@ -309,7 +309,7 @@ private void importRelationships( NodeRelationshipCache nodeRelationshipCache,
Configuration groupConfig =
configWithRecordsPerPageBasedBatchSize( config, neoStore.getRelationshipGroupStore() );

// Do multiple rounds of relationship importing. Each round fits as many relationship types
// Do multiple rounds of relationship linking. Each round fits as many relationship types
// as it can (comparing with worst-case memory usage and available memory).
int typesImported = 0;
int round = 0;
Expand All @@ -318,57 +318,64 @@ private void importRelationships( NodeRelationshipCache nodeRelationshipCache,
// Figure out which types we can fit in node-->relationship cache memory.
// Types go from biggest to smallest group and so towards the end there will be
// smaller and more groups per round in this loop
Collection<Object> typesToImportThisRound = rounds.next();
boolean thisIsTheOnlyRound = round == 0 && !rounds.hasNext();
Collection<Object> typesToLinkThisRound = rounds.next();
boolean thisIsTheFirstRound = round == 0;
boolean thisIsTheOnlyRound = thisIsTheFirstRound && !rounds.hasNext();

// Import relationships and their properties
nodeRelationshipCache.setForwardScan( true, true/*dense*/ );
String range = typesToImportThisRound.size() == 1
String range = typesToLinkThisRound.size() == 1
? String.valueOf( typesImported + 1 )
: (typesImported + 1) + "-" + (typesImported + typesToImportThisRound.size());
: (typesImported + 1) + "-" + (typesImported + typesToLinkThisRound.size());
String topic = " " + range + "/" + typeDistribution.getNumberOfRelationshipTypes();
Predicate<InputRelationship> typeFilter = thisIsTheOnlyRound
? relationship -> true // optimization when all rels are imported in this round
: relationship -> typesToImportThisRound.contains( relationship.typeAsObject() );
RelationshipStage relationshipStage = new RelationshipStage( topic, config,
writeMonitor, typeFilter, relationships.iterator(), idMapper, neoStore,
nodeRelationshipCache, storeUpdateMonitor, nextRelationshipId );
neoStore.startFlushingPageCache();
executeStage( relationshipStage );
neoStore.stopFlushingPageCache();
int nodeTypes = thisIsTheFirstRound ? NodeType.NODE_TYPE_ALL : NodeType.NODE_TYPE_DENSE;
Predicate<RelationshipRecord> readFilter = thisIsTheFirstRound
? null // optimization when all rels are imported in this round
: typeIdFilter( typesToLinkThisRound, neoStore.getRelationshipTypeRepository() );
Predicate<RelationshipRecord> denseChangeFilter = thisIsTheOnlyRound
? null // optimization when all rels are imported in this round
: typeIdFilter( typesToLinkThisRound, neoStore.getRelationshipTypeRepository() );

int nodeTypes = thisIsTheOnlyRound ? NodeType.NODE_TYPE_ALL : NodeType.NODE_TYPE_DENSE;
// LINK Forward
RelationshipLinkforwardStage linkForwardStage = new RelationshipLinkforwardStage( topic, relationshipConfig,
neoStore.getRelationshipStore(), nodeRelationshipCache, readFilter, denseChangeFilter, nodeTypes );
executeStage( linkForwardStage );

// Write relationship groups cached from the relationship import above
executeStage( new RelationshipGroupStage( topic, groupConfig,
neoStore.getTemporaryRelationshipGroupStore(), nodeRelationshipCache ) );
// Set node nextRel fields
executeStage( new SparseNodeFirstRelationshipStage( topic, nodeConfig, neoStore.getNodeStore(),
nodeRelationshipCache ) );
if ( thisIsTheFirstRound )
{
// Set node nextRel fields for sparse nodes
executeStage( new SparseNodeFirstRelationshipStage( nodeConfig, neoStore.getNodeStore(),
nodeRelationshipCache ) );
}

// Link relationship chains together for dense nodes
// LINK backward
nodeRelationshipCache.setForwardScan( false, true/*dense*/ );
executeStage( new RelationshipLinkbackStage( topic,
relationshipConfig,
neoStore.getRelationshipStore(),
nodeRelationshipCache, nextRelationshipId,
relationshipStage.getNextRelationshipId(), nodeTypes ) );
nextRelationshipId = relationshipStage.getNextRelationshipId();
typesImported += typesToImportThisRound.size();
executeStage( new RelationshipLinkbackStage( topic, relationshipConfig, neoStore.getRelationshipStore(),
nodeRelationshipCache, readFilter, denseChangeFilter, nodeTypes ) );
typesImported += typesToLinkThisRound.size();
}
}

// There's an optimization above which will piggy-back sparse linking on the dense linking
// if all relationships are imported in one round. The sparse linking below will be done if
// there were multiple passes of dense linking above.
if ( round > 1 )
private static Predicate<RelationshipRecord> typeIdFilter( Collection<Object> typesToLinkThisRound,
BatchingRelationshipTypeTokenRepository relationshipTypeRepository )
{
PrimitiveIntSet set = Primitive.intSet( typesToLinkThisRound.size() );
for ( Object type : typesToLinkThisRound )
{
// Link relationship chains together for sparse nodes
nodeRelationshipCache.setForwardScan( false, false/*sparse*/ );
executeStage( new RelationshipLinkbackStage( " Sparse", relationshipConfig,
neoStore.getRelationshipStore(), nodeRelationshipCache, 0, nextRelationshipId,
NodeType.NODE_TYPE_SPARSE ) );
int id;
if ( type instanceof Number )
{
id = ((Number) type).intValue();
}
else
{
id = relationshipTypeRepository.applyAsInt( type );
}
set.add( id );
}
// else we did in the single round above to avoid doing another pass
return relationship -> set.contains( relationship.getType() );
}

private static Configuration configWithRecordsPerPageBasedBatchSize( Configuration source, RecordStore<?> store )
Expand Down

0 comments on commit 9f07dd5

Please sign in to comment.