Skip to content

Commit

Permalink
Imports relationships per type
Browse files Browse the repository at this point in the history
for better scalability on data sets where most nodes are dense and
contain lots of relationship types.

Problem:
Keeping track of and updating the heads of relationship chains in the node -> relationship cache
scales poorly in scenarios where most nodes are dense and there are many relationship types,
both with regards to cache memory usage and, as a side-effect, the time required to update this cache.
Previously all relationship groups would need to be present in the cache, and so adding more
relationships of different types to dense nodes would create more and more of these groups
in the cache, each linked to the next using a pointer into the simple array backing the cache.
Memory usage would steadily increase as more types were added, and finding the group
for a specific type would become more and more expensive due to the linear and random-access
pointer chasing, resulting in poor performance or the inability to import at all.

Solution:
The approach is instead to do multiple iterations where each iteration imports relationships
of a single type. For each iteration Node --> Relationship and Relationship --> Relationship
stages _for dense nodes only_ are run so that the cache can be reused to hold relationship chain
heads of the next type in the next iteration. All relationships will be imported this way
and then finally there will be one Node --> Relationship and Relationship --> Relationship stage
linking all sparse relationship chains together.
  • Loading branch information
tinwelint committed Apr 22, 2016
1 parent bd5ddba commit 73a8d48
Show file tree
Hide file tree
Showing 39 changed files with 1,697 additions and 488 deletions.
Expand Up @@ -70,8 +70,8 @@
import org.neo4j.unsafe.impl.batchimport.input.Inputs;
import org.neo4j.unsafe.impl.batchimport.input.SimpleInputIterator;
import org.neo4j.unsafe.impl.batchimport.staging.ExecutionMonitor;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.neo4j.helpers.collection.Iterators.asSet;
Expand All @@ -95,7 +95,8 @@ public class ParallelBatchImporterTest
public final RandomRule random = new RandomRule();

private static final int NODE_COUNT = 10_000;
private static final int RELATIONSHIP_COUNT = NODE_COUNT * 5;
private static final int RELATIONSHIPS_PER_NODE = 5;
private static final int RELATIONSHIP_COUNT = NODE_COUNT * RELATIONSHIPS_PER_NODE;
protected final Configuration config = new Configuration.Default()
{
@Override
Expand All @@ -108,7 +109,8 @@ public int batchSize()
@Override
public int denseNodeThreshold()
{
return 30;
// This will have statistically half the nodes be considered dense
return RELATIONSHIPS_PER_NODE * 2;
}

@Override
Expand Down Expand Up @@ -373,10 +375,11 @@ protected void verifyData( int nodeCount, int relationshipCount, GraphDatabaseSe
if ( !inputIdGenerator.isMiss( input.startNode() ) &&
!inputIdGenerator.isMiss( input.endNode() ) )
{
// A relationship refering to missing nodes. The InputIdGenerator is expected to generate
// A relationship referring to missing nodes. The InputIdGenerator is expected to generate
// some (very few) of those. Skip it.
String name = (String) propertyOf( input, "id" );
Relationship relationship = relationshipByName.get( name );
assertNotNull( "Expected there to be a relationship with name '" + name + "'", relationship );
assertEquals( nodeByInputId.get( uniqueId( input.startNodeGroup(), input.startNode() ) ),
relationship.getStartNode() );
assertEquals( nodeByInputId.get( uniqueId( input.endNodeGroup(), input.endNode() ) ),
Expand Down Expand Up @@ -504,11 +507,18 @@ protected InputRelationship fetchNextOrNull()
startNode = idGenerator.miss( random, startNode, 0.001f );
endNode = idGenerator.miss( random, endNode, 0.001f );

String type = idGenerator.randomType( random );
if ( random.nextFloat() < 0.00005 )
{
// Let there be a small chance of introducing a one-off relationship
// with a type that no, or at least very few, other relationships have.
type += "_odd";
}
return new InputRelationship(
sourceDescription, itemNumber, itemNumber,
properties, null,
startNodeGroup, startNode, endNodeGroup, endNode,
idGenerator.randomType( random ), null );
type, null );
}
finally
{
Expand Down
Expand Up @@ -21,13 +21,16 @@

import java.io.IOException;

import org.neo4j.kernel.impl.store.RelationshipStore;
import org.neo4j.kernel.impl.store.NodeStore;
import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache;
import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdMapper;
import org.neo4j.unsafe.impl.batchimport.input.Collector;
import org.neo4j.unsafe.impl.batchimport.input.InputCache;
import org.neo4j.unsafe.impl.batchimport.input.InputRelationship;
import org.neo4j.unsafe.impl.batchimport.staging.Stage;
import org.neo4j.unsafe.impl.batchimport.store.BatchingNeoStores;

import static org.neo4j.unsafe.impl.batchimport.input.InputCache.MAIN;

/**
* Counts number of relationships per node that is going to be imported by {@link RelationshipStage} later.
Expand All @@ -36,20 +39,45 @@
*/
public class CalculateDenseNodesStage extends Stage
{
private RelationshipTypeCheckerStep typer;
private final NodeStore nodeStore;
private final NodeRelationshipCache cache;

public CalculateDenseNodesStage( Configuration config, InputIterable<InputRelationship> relationships,
RelationshipStore relationshipStore, NodeRelationshipCache cache, IdMapper idMapper,
Collector badCollector, InputCache inputCache ) throws IOException
NodeRelationshipCache cache, IdMapper idMapper,
Collector badCollector, InputCache inputCache,
BatchingNeoStores neoStores ) throws IOException
{
super( "Calculate dense nodes", config );
this.cache = cache;
add( new InputIteratorBatcherStep<>( control(), config,
relationships.iterator(), InputRelationship.class ) );
if ( !relationships.supportsMultiplePasses() )
{
add( new InputEntityCacherStep<>( control(), config, inputCache.cacheRelationships() ) );
add( new InputEntityCacherStep<>( control(), config, inputCache.cacheRelationships( MAIN ) ) );
}
add( typer = new RelationshipTypeCheckerStep( control(), config, neoStores.getRelationshipTypeRepository() ) );
add( new RelationshipPreparationStep( control(), config, idMapper ) );
add( new CalculateRelationshipsStep( control(), config, relationshipStore ) );
add( new CalculateRelationshipsStep( control(), config, neoStores.getRelationshipStore() ) );
add( new CalculateDenseNodePrepareStep( control(), config, badCollector ) );
add( new CalculateDenseNodesStep( control(), config, cache ) );
nodeStore = neoStores.getNodeStore();
}

/*
* @see RelationshipTypeCheckerStep#getRelationshipTypes(int)
*/
public Object[] getRelationshipTypes( int belowOrEqualToThreshold )
{
return typer.getRelationshipTypes( belowOrEqualToThreshold );
}

@Override
public void close()
{
// At this point we know how many nodes we have, so we tell the cache that instead of having the
// cache keeping track of that in a the face of concurrent updates.
cache.setHighNodeId( nodeStore.getHighId() );
super.close();
}
}
Expand Up @@ -22,7 +22,6 @@
import org.neo4j.helpers.progress.ProgressListener;
import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdMapper;
import org.neo4j.unsafe.impl.batchimport.input.Collector;
import org.neo4j.unsafe.impl.batchimport.input.InputCache;
import org.neo4j.unsafe.impl.batchimport.input.InputNode;
import org.neo4j.unsafe.impl.batchimport.staging.Stage;
import org.neo4j.unsafe.impl.batchimport.stats.StatsProvider;
Expand All @@ -36,11 +35,10 @@
public class IdMapperPreparationStage extends Stage
{
public IdMapperPreparationStage( Configuration config, IdMapper idMapper, InputIterable<InputNode> nodes,
InputCache inputCache, Collector collector, StatsProvider memoryUsageStats )
Collector collector, StatsProvider memoryUsageStats )
{
super( "Prepare node index", config );
add( new IdMapperPreparationStep( control(), config,
idMapper, idsOf( nodes.supportsMultiplePasses() ? nodes : inputCache.nodes() ),
collector, memoryUsageStats ) );
idMapper, idsOf( nodes ), collector, memoryUsageStats ) );
}
}
Expand Up @@ -65,6 +65,46 @@ public void close()
}
}

public static class Delegate<T> extends PrefetchingIterator<T> implements InputIterator<T>
{
protected final InputIterator<T> actual;

public Delegate( InputIterator<T> actual )
{
this.actual = actual;
}

@Override
public void close()
{
actual.close();
}

@Override
protected T fetchNextOrNull()
{
return actual.hasNext() ? actual.next() : null;
}

@Override
public String sourceDescription()
{
return actual.sourceDescription();
}

@Override
public long lineNumber()
{
return actual.lineNumber();
}

@Override
public long position()
{
return actual.position();
}
}

public static class Empty<T> extends Adapter<T>
{
@Override
Expand Down
Expand Up @@ -21,14 +21,13 @@

import org.neo4j.kernel.impl.api.CountsAccessor;
import org.neo4j.kernel.impl.store.NodeStore;
import org.neo4j.kernel.impl.store.RelationshipStore;
import org.neo4j.unsafe.impl.batchimport.cache.NodeLabelsCache;
import org.neo4j.unsafe.impl.batchimport.staging.Stage;
import org.neo4j.unsafe.impl.batchimport.stats.StatsProvider;

/**
* Reads all records from {@link RelationshipStore} and process the counts in them. Uses a {@link NodeLabelsCache}
* previously populated by f.ex {@link ProcessNodeCountsDataStep}.
* Reads all records from {@link NodeStore} and process the counts in them, populating {@link NodeLabelsCache}
* for later use of {@link RelationshipCountsStage}.
*/
public class NodeCountsStage extends Stage
{
Expand Down
Expand Up @@ -37,14 +37,14 @@ public class NodeFirstRelationshipProcessor implements RecordProcessor<NodeRecor
{
private final RecordStore<RelationshipGroupRecord> relGroupStore;
private final NodeRelationshipCache cache;

private long nextGroupId = -1;
private final int relationshipType;

public NodeFirstRelationshipProcessor( RecordStore<RelationshipGroupRecord> relGroupStore,
NodeRelationshipCache cache )
NodeRelationshipCache cache, int relationshipType )
{
this.relGroupStore = relGroupStore;
this.cache = cache;
this.relationshipType = relationshipType;
}

@Override
Expand All @@ -64,22 +64,18 @@ public boolean process( NodeRecord node )
}

@Override
public long visit( long nodeId, int type, long next, long out, long in, long loop )
public long visit( long nodeId, long next, long out, long in, long loop )
{
long id = nextGroupId != -1 ? nextGroupId : relGroupStore.nextId();
nextGroupId = -1;

// Here we'll use the already generated id (below) from the previous visit, if that so happened
long id = relGroupStore.nextId();
RelationshipGroupRecord groupRecord = new RelationshipGroupRecord( id );
groupRecord.setType( type );
groupRecord.setType( relationshipType );
groupRecord.setInUse( true );
groupRecord.setFirstOut( out );
groupRecord.setFirstIn( in );
groupRecord.setFirstLoop( loop );
groupRecord.setOwningNode( nodeId );
if ( next != -1 )
{
groupRecord.setNext( nextGroupId = relGroupStore.nextId() );
}
groupRecord.setNext( next );
relGroupStore.prepareForCommit( groupRecord );
relGroupStore.updateRecord( groupRecord );
return id;
Expand Down
Expand Up @@ -33,14 +33,16 @@
*/
public class NodeFirstRelationshipStage extends Stage
{
public NodeFirstRelationshipStage( Configuration config, NodeStore nodeStore,
RecordStore<RelationshipGroupRecord> relationshipGroupStore, NodeRelationshipCache cache, final Collector collector,
LabelScanStore labelScanStore )
public NodeFirstRelationshipStage( String topic, Configuration config, NodeStore nodeStore,
RecordStore<RelationshipGroupRecord> relationshipGroupStore, NodeRelationshipCache cache,
final Collector collector, LabelScanStore labelScanStore, boolean denseNodes, int relationshipType )
{
super( "Node --> Relationship", config );
add( new ReadNodeRecordsStep( control(), config, nodeStore ) );
super( "Node --> Relationship" + topic, config );
add( new ReadNodeRecordsByCacheStep( control(), config, nodeStore, cache, denseNodes ) );
add( new RecordProcessorStep<>( control(), "LINK", config,
new NodeFirstRelationshipProcessor( relationshipGroupStore, cache ), false ) );
add( new UpdateNodeRecordsStep( control(), config, nodeStore, collector, labelScanStore ) );
new NodeFirstRelationshipProcessor( relationshipGroupStore, cache, relationshipType ), false ) );
boolean shouldAlsoPruneBadNodes = !denseNodes;
add( new UpdateNodeRecordsStep( control(), config, nodeStore, collector, labelScanStore,
shouldAlsoPruneBadNodes ) );
}
}
Expand Up @@ -34,6 +34,7 @@
import org.neo4j.unsafe.impl.batchimport.store.BatchingNeoStores;
import org.neo4j.unsafe.impl.batchimport.store.io.IoMonitor;

import static org.neo4j.unsafe.impl.batchimport.input.InputCache.MAIN;
import static org.neo4j.unsafe.impl.batchimport.staging.Step.ORDER_SEND_DOWNSTREAM;

/**
Expand All @@ -52,7 +53,7 @@ public NodeStage( Configuration config, IoMonitor writeMonitor,
add( new InputIteratorBatcherStep<>( control(), config, nodes.iterator(), InputNode.class ) );
if ( !nodes.supportsMultiplePasses() )
{
add( new InputEntityCacherStep<>( control(), config, inputCache.cacheNodes() ) );
add( new InputEntityCacherStep<>( control(), config, inputCache.cacheNodes( MAIN ) ) );
}

NodeStore nodeStore = neoStore.getNodeStore();
Expand Down

0 comments on commit 73a8d48

Please sign in to comment.