Skip to content

Commit

Permalink
Renamed NodeRelationshipLink-->NodeRelationshipCache
Browse files Browse the repository at this point in the history
to have similar names as other e.g. NodeLabelsCache. Also removed
unnecessary NodeRelationshipLink interface as well.

(cherry picked from commit 829ee8a)
  • Loading branch information
tinwelint committed Apr 9, 2015
1 parent 1eb6131 commit 6763221
Show file tree
Hide file tree
Showing 13 changed files with 109 additions and 166 deletions.
Expand Up @@ -22,7 +22,7 @@
import java.util.Arrays;

import org.neo4j.kernel.impl.store.record.RelationshipRecord;
import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipLink;
import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache;
import org.neo4j.unsafe.impl.batchimport.input.Collector;
import org.neo4j.unsafe.impl.batchimport.input.InputRelationship;
import org.neo4j.unsafe.impl.batchimport.staging.BatchSender;
Expand All @@ -32,7 +32,7 @@
/**
* Makes some preparations to incoming batches so that {@link CalculateDenseNodesStep} can run parallel batches.
* Sends long[] batches downstream, where each batch is for node ids with a certain radix, such that
* the {@link NodeRelationshipLink} cache can be updated in parallel without synchronization.
* the {@link NodeRelationshipCache} cache can be updated in parallel without synchronization.
* Each id in the long[] has the 0x40000000_00000000 bit set if end node. End node on loop relationships
* are not counted.
*/
Expand Down
Expand Up @@ -21,7 +21,7 @@

import java.io.IOException;

import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipLink;
import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache;
import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdMapper;
import org.neo4j.unsafe.impl.batchimport.input.Collector;
import org.neo4j.unsafe.impl.batchimport.input.InputCache;
Expand All @@ -36,7 +36,7 @@
public class CalculateDenseNodesStage extends Stage
{
public CalculateDenseNodesStage( Configuration config, InputIterable<InputRelationship> relationships,
NodeRelationshipLink nodeRelationshipLink, IdMapper idMapper,
NodeRelationshipCache cache, IdMapper idMapper,
Collector<InputRelationship> badRelationshipsCollector,
InputCache inputCache ) throws IOException
{
Expand All @@ -49,6 +49,6 @@ public CalculateDenseNodesStage( Configuration config, InputIterable<InputRelati
}
add( new RelationshipPreparationStep( control(), config, idMapper ) );
add( new CalculateDenseNodePrepareStep( control(), config, badRelationshipsCollector ) );
add( new CalculateDenseNodesStep( control(), config, nodeRelationshipLink ) );
add( new CalculateDenseNodesStep( control(), config, cache ) );
}
}
Expand Up @@ -19,7 +19,7 @@
*/
package org.neo4j.unsafe.impl.batchimport;

import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipLink;
import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache;
import org.neo4j.unsafe.impl.batchimport.staging.BatchSender;
import org.neo4j.unsafe.impl.batchimport.staging.ProcessorStep;
import org.neo4j.unsafe.impl.batchimport.staging.StageControl;
Expand All @@ -29,13 +29,12 @@
*/
public class CalculateDenseNodesStep extends ProcessorStep<long[]>
{
private final NodeRelationshipLink nodeRelationshipLink;
private final NodeRelationshipCache cache;

public CalculateDenseNodesStep( StageControl control, Configuration config,
NodeRelationshipLink nodeRelationshipLink )
public CalculateDenseNodesStep( StageControl control, Configuration config, NodeRelationshipCache cache )
{
super( control, "CALCULATOR", config, true );
this.nodeRelationshipLink = nodeRelationshipLink;
this.cache = cache;
}

@Override
Expand All @@ -45,7 +44,7 @@ protected void process( long[] ids, BatchSender sender )
{
if ( id != -1 )
{
nodeRelationshipLink.incrementCount( id );
cache.incrementCount( id );
}
}
}
Expand Down
Expand Up @@ -22,40 +22,39 @@
import org.neo4j.kernel.impl.store.RelationshipGroupStore;
import org.neo4j.kernel.impl.store.record.NodeRecord;
import org.neo4j.kernel.impl.store.record.RelationshipGroupRecord;
import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipLink;
import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipLink.GroupVisitor;
import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache;
import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache.GroupVisitor;

/**
* Sets the {@link NodeRecord#setNextRel(long) relationship field} on all {@link NodeRecord nodes}.
* This is done after all relationships have been imported and the {@link NodeRelationshipLink node cache}
* This is done after all relationships have been imported and the {@link NodeRelationshipCache node cache}
* points to the first relationship for each node.
*
* This step also creates {@link RelationshipGroupRecord group records} for the dense nodes as it encounters
* dense nodes, where it gets all relationship group information from {@link NodeRelationshipLink}.
* dense nodes, where it gets all relationship group information from {@link NodeRelationshipCache}.
*/
public class NodeFirstRelationshipProcessor implements RecordProcessor<NodeRecord>, GroupVisitor
{
private final RelationshipGroupStore relGroupStore;
private final NodeRelationshipLink nodeRelationshipLink;
private final NodeRelationshipCache cache;

private long nextGroupId = -1;

public NodeFirstRelationshipProcessor( RelationshipGroupStore relGroupStore,
NodeRelationshipLink nodeRelationshipLink )
public NodeFirstRelationshipProcessor( RelationshipGroupStore relGroupStore, NodeRelationshipCache cache )
{
this.relGroupStore = relGroupStore;
this.nodeRelationshipLink = nodeRelationshipLink;
this.cache = cache;
}

@Override
public boolean process( NodeRecord node )
{
long nodeId = node.getId();
long firstRel = nodeRelationshipLink.getFirstRel( nodeId, this );
long firstRel = cache.getFirstRel( nodeId, this );
if ( firstRel != -1 )
{
node.setNextRel( firstRel );
if ( nodeRelationshipLink.isDense( nodeId ) )
if ( cache.isDense( nodeId ) )
{
node.setDense( true );
}
Expand Down
Expand Up @@ -22,7 +22,7 @@
import org.neo4j.kernel.impl.store.NodeStore;
import org.neo4j.kernel.impl.store.RelationshipGroupStore;
import org.neo4j.kernel.impl.store.record.NodeRecord;
import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipLink;
import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache;
import org.neo4j.unsafe.impl.batchimport.staging.Stage;

/**
Expand All @@ -31,7 +31,7 @@
public class NodeFirstRelationshipStage extends Stage
{
public NodeFirstRelationshipStage( Configuration config, NodeStore nodeStore,
RelationshipGroupStore relationshipGroupStore, NodeRelationshipLink cache )
RelationshipGroupStore relationshipGroupStore, NodeRelationshipCache cache )
{
super( "Node --> Relationship", config, false );
add( new ReadNodeRecordsStep( control(), config, nodeStore ) );
Expand Down
Expand Up @@ -34,8 +34,7 @@
import org.neo4j.kernel.logging.Logging;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.unsafe.impl.batchimport.cache.NodeLabelsCache;
import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipLink;
import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipLinkImpl;
import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache;
import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdGenerator;
import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdMapper;
import org.neo4j.unsafe.impl.batchimport.input.Collector;
Expand Down Expand Up @@ -121,7 +120,7 @@ public void doImport( Input input ) throws IOException

// Things that we need to close later. The reason they're not in the try-with-resource statement
// is that we need to close, and set to null, at specific points preferably. So use good ol' finally block.
NodeRelationshipLink nodeRelationshipLink = null;
NodeRelationshipCache nodeRelationshipCache = null;
NodeLabelsCache nodeLabelsCache = null;
long startTime = currentTimeMillis();
boolean hasBadRelationships = false;
Expand All @@ -139,8 +138,8 @@ public void doImport( Input input ) throws IOException
// Some temporary caches and indexes in the import
IdMapper idMapper = input.idMapper();
IdGenerator idGenerator = input.idGenerator();
nodeRelationshipLink = new NodeRelationshipLinkImpl( AUTO, config.denseNodeThreshold() );
StatsProvider memoryUsageStats = new MemoryUsageStatsProvider( nodeRelationshipLink, idMapper );
nodeRelationshipCache = new NodeRelationshipCache( AUTO, config.denseNodeThreshold() );
StatsProvider memoryUsageStats = new MemoryUsageStatsProvider( nodeRelationshipCache, idMapper );
InputIterable<InputNode> nodes = input.nodes();
InputIterable<InputRelationship> relationships = input.relationships();

Expand All @@ -150,7 +149,7 @@ public void doImport( Input input ) throws IOException

// Stage 2 -- calculate dense node threshold
CalculateDenseNodesStage calculateDenseNodesStage = new CalculateDenseNodesStage( config, relationships,
nodeRelationshipLink, idMapper, badRelationships, inputCache );
nodeRelationshipCache, idMapper, badRelationships, inputCache );

// Execute stages 1 and 2 in parallel or sequentially?
if ( idMapper.needsPreparation() )
Expand All @@ -170,25 +169,25 @@ public void doImport( Input input ) throws IOException
// Stage 3 -- relationships, properties
final RelationshipStage relationshipStage = new RelationshipStage( config, writeMonitor, writerFactory,
relationships.supportsMultiplePasses() ? relationships : inputCache.relationships(),
idMapper, neoStore, nodeRelationshipLink, input.specificRelationshipIds() );
idMapper, neoStore, nodeRelationshipCache, input.specificRelationshipIds() );
executeStages( relationshipStage );
nodeRelationshipLink.fixate();
nodeRelationshipCache.fixate();

// Prepare for updating
neoStore.flush();
writerFactory.awaitEverythingWritten();

// Stage 4 -- set node nextRel fields
executeStages( new NodeFirstRelationshipStage( config, neoStore.getNodeStore(),
neoStore.getRelationshipGroupStore(), nodeRelationshipLink ) );
neoStore.getRelationshipGroupStore(), nodeRelationshipCache ) );
// Stage 5 -- link relationship chains together
nodeRelationshipLink.clearRelationships();
nodeRelationshipCache.clearRelationships();
executeStages( new RelationshipLinkbackStage( config, neoStore.getRelationshipStore(),
nodeRelationshipLink ) );
nodeRelationshipCache ) );

// Release this potentially really big piece of cached data
nodeRelationshipLink.close();
nodeRelationshipLink = null;
nodeRelationshipCache.close();
nodeRelationshipCache = null;

// Stage 6 -- count nodes per label and labels per node
nodeLabelsCache = new NodeLabelsCache( AUTO, neoStore.getLabelRepository().getHighId() );
Expand Down Expand Up @@ -219,9 +218,9 @@ public void doImport( Input input ) throws IOException
finally
{
writerFactory.shutdown();
if ( nodeRelationshipLink != null )
if ( nodeRelationshipCache != null )
{
nodeRelationshipLink.close();
nodeRelationshipCache.close();
}
if ( nodeLabelsCache != null )
{
Expand Down
Expand Up @@ -22,7 +22,7 @@
import org.neo4j.kernel.impl.store.RelationshipStore;
import org.neo4j.kernel.impl.store.record.Record;
import org.neo4j.kernel.impl.store.record.RelationshipRecord;
import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipLink;
import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache;
import org.neo4j.unsafe.impl.batchimport.input.InputRelationship;
import org.neo4j.unsafe.impl.batchimport.staging.BatchSender;
import org.neo4j.unsafe.impl.batchimport.staging.ProcessorStep;
Expand All @@ -36,14 +36,14 @@
/**
* Creates batches of relationship records, with the "next" relationship
* pointers set to the next relationships (previously created) in their respective chains. The previous
* relationship ids are kept in {@link NodeRelationshipLink node cache}, which is a point of scalability issues,
* relationship ids are kept in {@link NodeRelationshipCache node cache}, which is a point of scalability issues,
* although mitigated using multi-pass techniques.
*/
public class RelationshipEncoderStep extends ProcessorStep<Batch<InputRelationship,RelationshipRecord>>
{
private final BatchingTokenRepository<?> relationshipTypeRepository;
private final RelationshipStore relationshipStore;
private final NodeRelationshipLink nodeRelationshipLink;
private final NodeRelationshipCache cache;

// There are two "modes" in generating relationship ids
// - ids are decided by InputRelationship#id() (f.ex. store migration, where ids should be kept intact).
Expand All @@ -57,13 +57,13 @@ public RelationshipEncoderStep( StageControl control,
Configuration config,
BatchingTokenRepository<?> relationshipTypeRepository,
RelationshipStore relationshipStore,
NodeRelationshipLink nodeRelationshipLink,
NodeRelationshipCache cache,
boolean specificIds )
{
super( control, "RELATIONSHIP", config, false );
this.relationshipTypeRepository = relationshipTypeRepository;
this.relationshipStore = relationshipStore;
this.nodeRelationshipLink = nodeRelationshipLink;
this.cache = cache;
this.specificIds = specificIds;
}

Expand Down Expand Up @@ -102,7 +102,7 @@ protected void process( Batch<InputRelationship,RelationshipRecord> batch, Batch

// Set first/second next rel
boolean loop = startNodeId == endNodeId;
long firstNextRel = nodeRelationshipLink.getAndPutRelationship(
long firstNextRel = cache.getAndPutRelationship(
startNodeId, typeId, loop ? BOTH : OUTGOING, relationshipId, true );
relationshipRecord.setFirstNextRel( firstNextRel );
if ( loop )
Expand All @@ -111,7 +111,7 @@ protected void process( Batch<InputRelationship,RelationshipRecord> batch, Batch
}
else
{
relationshipRecord.setSecondNextRel( nodeRelationshipLink.getAndPutRelationship(
relationshipRecord.setSecondNextRel( cache.getAndPutRelationship(
endNodeId, typeId, INCOMING, relationshipId, true ) );
}

Expand Down
Expand Up @@ -21,19 +21,19 @@

import org.neo4j.graphdb.Direction;
import org.neo4j.kernel.impl.store.record.RelationshipRecord;
import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipLink;
import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache;

/**
* Links the {@code previous} fields in {@link RelationshipRecord relationship records}. This is done after
* a forward pass where the {@code next} fields are linked.
*/
public class RelationshipLinkbackProcessor implements RecordProcessor<RelationshipRecord>
{
private final NodeRelationshipLink nodeRelationshipLink;
private final NodeRelationshipCache cache;

public RelationshipLinkbackProcessor( NodeRelationshipLink nodeRelationshipLink )
public RelationshipLinkbackProcessor( NodeRelationshipCache cache )
{
this.nodeRelationshipLink = nodeRelationshipLink;
this.cache = cache;
}

@Override
Expand All @@ -42,13 +42,13 @@ public boolean process( RelationshipRecord record )
boolean isLoop = record.getFirstNode() == record.getSecondNode();
if ( isLoop )
{
long prevRel = nodeRelationshipLink.getAndPutRelationship( record.getFirstNode(),
long prevRel = cache.getAndPutRelationship( record.getFirstNode(),
record.getType(), Direction.BOTH, record.getId(), false );
if ( prevRel == -1 )
{ // First one
record.setFirstInFirstChain( true );
record.setFirstInSecondChain( true );
prevRel = nodeRelationshipLink.getCount( record.getFirstNode(),
prevRel = cache.getCount( record.getFirstNode(),
record.getType(), Direction.BOTH );
}
record.setFirstPrevRel( prevRel );
Expand All @@ -57,23 +57,23 @@ public boolean process( RelationshipRecord record )
else
{
// Start node
long firstPrevRel = nodeRelationshipLink.getAndPutRelationship( record.getFirstNode(),
long firstPrevRel = cache.getAndPutRelationship( record.getFirstNode(),
record.getType(), Direction.OUTGOING, record.getId(), false );
if ( firstPrevRel == -1 )
{ // First one
record.setFirstInFirstChain( true );
firstPrevRel = nodeRelationshipLink.getCount( record.getFirstNode(),
firstPrevRel = cache.getCount( record.getFirstNode(),
record.getType(), Direction.OUTGOING );
}
record.setFirstPrevRel( firstPrevRel );

// End node
long secondPrevRel = nodeRelationshipLink.getAndPutRelationship( record.getSecondNode(),
long secondPrevRel = cache.getAndPutRelationship( record.getSecondNode(),
record.getType(), Direction.INCOMING, record.getId(), false );
if ( secondPrevRel == -1 )
{ // First one
record.setFirstInSecondChain( true );
secondPrevRel = nodeRelationshipLink.getCount( record.getSecondNode(),
secondPrevRel = cache.getCount( record.getSecondNode(),
record.getType(), Direction.INCOMING );
}
record.setSecondPrevRel( secondPrevRel );
Expand Down
Expand Up @@ -21,7 +21,7 @@

import org.neo4j.kernel.impl.store.RelationshipStore;
import org.neo4j.kernel.impl.store.record.RelationshipRecord;
import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipLink;
import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache;
import org.neo4j.unsafe.impl.batchimport.staging.Stage;

/**
Expand All @@ -30,7 +30,7 @@
*/
public class RelationshipLinkbackStage extends Stage
{
public RelationshipLinkbackStage( Configuration config, RelationshipStore store, NodeRelationshipLink cache )
public RelationshipLinkbackStage( Configuration config, RelationshipStore store, NodeRelationshipCache cache )
{
super( "Relationship --> Relationship", config, false );
add( new ReadRelationshipRecordsBackwardsStep( control(), config, store ) );
Expand Down

0 comments on commit 6763221

Please sign in to comment.