Recycles and reuses batch objects for way less garbage
tinwelint committed Jan 5, 2018
1 parent b5919b5 commit cba5218
Showing 32 changed files with 267 additions and 78 deletions.
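
The theme running through every file below: instead of allocating a fresh record array for each batch that flows through an import stage, stages flagged with RECYCLE_BATCHES hand consumed arrays back to the stage control so producers can refill them, which is where the "way less garbage" comes from. The following is a minimal, self-contained sketch of that general pattern; BatchPool and its method names are illustrative stand-ins, not the actual StageControl API.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.function.Supplier;

/** Illustrative pool: hands out batch arrays and takes them back once a stage is done with them. */
final class BatchPool<T>
{
    private final ArrayBlockingQueue<T[]> free;
    private final Supplier<T[]> factory;

    BatchPool( int capacity, Supplier<T[]> factory )
    {
        this.free = new ArrayBlockingQueue<>( capacity );
        this.factory = factory;
    }

    /** Producer side: reuse a recycled array if one is available, otherwise allocate a new one. */
    T[] acquire()
    {
        T[] batch = free.poll();
        return batch != null ? batch : factory.get();
    }

    /** Final step in the stage: return the array instead of leaving it for the garbage collector. */
    void recycle( T[] batch )
    {
        free.offer( batch ); // if the pool is already full, this one is simply left to be collected
    }
}

A producer would call acquire() before reading records into an array, and the last step would call recycle() once the batch has been processed, mirroring the control.reuse( this ) and control.recycle( batch ) calls visible in the hunks below.
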
@@ -46,7 +46,11 @@ protected void process( RelationshipGroupRecord[] batch, BatchSender sender ) th
// since the records exists in the store in reverse order.
for ( int i = batch.length - 1; i >= 0; i-- )
{
cache.put( batch[i] );
RelationshipGroupRecord record = batch[i];
if ( record.inUse() )
{
cache.put( record );
}
}
}
}
@@ -43,16 +43,18 @@ public CalculateDenseNodesStep( StageControl control, Configuration config, Node
@Override
protected void forkedProcess( int id, int processors, RelationshipRecord[] batch )
{
for ( int i = 0; i < batch.length; i++ )
for ( RelationshipRecord record : batch )
{
RelationshipRecord relationship = batch[i];
long startNodeId = relationship.getFirstNode();
long endNodeId = relationship.getSecondNode();
processNodeId( id, processors, startNodeId );
if ( startNodeId != endNodeId ) // avoid counting loops twice
if ( record.inUse() )
{
// Loops only counts as one
processNodeId( id, processors, endNodeId );
long startNodeId = record.getFirstNode();
long endNodeId = record.getSecondNode();
processNodeId( id, processors, startNodeId );
if ( startNodeId != endNodeId ) // avoid counting loops twice
{
// Loops only counts as one
processNodeId( id, processors, endNodeId );
}
}
}
}
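
A side effect visible in this and several of the following hunks: with arrays being reused and the read step no longer pre-filtering records (its filter argument is gone from most call sites), a batch can carry records that are not in use, so each consuming step now guards with inUse() itself. A generic sketch of that consumer-side convention, with a stand-in Record interface rather than the real Neo4j record types:

final class ConsumerSketch
{
    interface Record
    {
        boolean inUse();
    }

    static void consume( Record[] batch )
    {
        for ( Record record : batch )
        {
            if ( !record.inUse() )
            {
                continue; // unused or stale slot in a reused array, nothing to do
            }
            // process the live record here
        }
    }
}
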
@@ -27,6 +27,7 @@
import org.neo4j.unsafe.impl.batchimport.stats.StatsProvider;

import static org.neo4j.unsafe.impl.batchimport.RecordIdIterator.allIn;
import static org.neo4j.unsafe.impl.batchimport.staging.Step.RECYCLE_BATCHES;

/**
* Stage for counting groups per node, populates {@link RelationshipGroupCache}. Steps:
@@ -45,9 +46,9 @@ public class CountGroupsStage extends Stage
public CountGroupsStage( Configuration config, RecordStore<RelationshipGroupRecord> store,
RelationshipGroupCache groupCache, StatsProvider... additionalStatsProviders )
{
super( NAME, null, config, 0 );
super( NAME, null, config, RECYCLE_BATCHES );
add( new BatchFeedStep( control(), config, allIn( store, config ), store.getRecordSize() ) );
add( new ReadRecordsStep<>( control(), config, false, store, null ) );
add( new ReadRecordsStep<>( control(), config, false, store ) );
add( new CountGroupsStep( control(), config, groupCache, additionalStatsProviders ) );
}
}
@@ -43,9 +43,12 @@ public CountGroupsStep( StageControl control, Configuration config, Relationship
@Override
protected void process( RelationshipGroupRecord[] batch, BatchSender sender ) throws Throwable
{
for ( RelationshipGroupRecord group : batch )
for ( RelationshipGroupRecord record : batch )
{
cache.incrementGroupCount( group.getOwningNode() );
if ( record.inUse() )
{
cache.incrementGroupCount( record.getOwningNode() );
}
}
}
}
@@ -27,10 +27,11 @@
import org.neo4j.unsafe.impl.batchimport.staging.BatchFeedStep;
import org.neo4j.unsafe.impl.batchimport.staging.ReadRecordsStep;
import org.neo4j.unsafe.impl.batchimport.staging.Stage;
import org.neo4j.unsafe.impl.batchimport.staging.Step;
import org.neo4j.unsafe.impl.batchimport.stats.StatsProvider;

import static org.neo4j.unsafe.impl.batchimport.RecordIdIterator.allIn;
import static org.neo4j.unsafe.impl.batchimport.staging.Step.ORDER_SEND_DOWNSTREAM;
import static org.neo4j.unsafe.impl.batchimport.staging.Step.RECYCLE_BATCHES;

/**
* Counts nodes and their labels and also builds {@link LabelScanStore label index} while doing so.
@@ -43,9 +44,9 @@ public NodeCountsAndLabelIndexBuildStage( Configuration config, NodeLabelsCache
int highLabelId, CountsAccessor.Updater countsUpdater, ProgressReporter progressReporter,
LabelScanStore labelIndex, StatsProvider... additionalStatsProviders )
{
super( NAME, null, config, Step.ORDER_SEND_DOWNSTREAM );
super( NAME, null, config, ORDER_SEND_DOWNSTREAM | RECYCLE_BATCHES );
add( new BatchFeedStep( control(), config, allIn( nodeStore, config ), nodeStore.getRecordSize() ) );
add( new ReadRecordsStep<>( control(), config, false, nodeStore, null ) );
add( new ReadRecordsStep<>( control(), config, false, nodeStore ) );
add( new LabelIndexWriterStep( control(), config, labelIndex, nodeStore ) );
add( new RecordProcessorStep<>( control(), "COUNT", config, new NodeCountsProcessor(
nodeStore, cache, highLabelId, countsUpdater, progressReporter ), true, additionalStatsProviders ) );
@@ -29,6 +29,7 @@
import org.neo4j.unsafe.impl.batchimport.stats.StatsProvider;

import static org.neo4j.unsafe.impl.batchimport.RecordIdIterator.allIn;
import static org.neo4j.unsafe.impl.batchimport.staging.Step.RECYCLE_BATCHES;

/**
* Reads all records from {@link NodeStore} and process the counts in them, populating {@link NodeLabelsCache}
@@ -42,9 +43,9 @@ public NodeCountsStage( Configuration config, NodeLabelsCache cache, NodeStore n
CountsAccessor.Updater countsUpdater, ProgressReporter progressReporter,
StatsProvider... additionalStatsProviders )
{
super( NAME, null, config, 0 );
super( NAME, null, config, RECYCLE_BATCHES );
add( new BatchFeedStep( control(), config, allIn( nodeStore, config ), nodeStore.getRecordSize() ) );
add( new ReadRecordsStep<>( control(), config, false, nodeStore, null ) );
add( new ReadRecordsStep<>( control(), config, false, nodeStore ) );
add( new RecordProcessorStep<>( control(), "COUNT", config,
new NodeCountsProcessor( nodeStore, cache, highLabelId, countsUpdater, progressReporter ), true,
additionalStatsProviders ) );
@@ -20,6 +20,8 @@
package org.neo4j.unsafe.impl.batchimport;

import static org.neo4j.unsafe.impl.batchimport.RecordIdIterator.forwards;
import static org.neo4j.unsafe.impl.batchimport.staging.Step.RECYCLE_BATCHES;

import org.neo4j.kernel.impl.store.RelationshipStore;
import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache;
import org.neo4j.unsafe.impl.batchimport.staging.BatchFeedStep;
@@ -38,9 +40,9 @@ public class NodeDegreeCountStage extends Stage
public NodeDegreeCountStage( Configuration config, RelationshipStore store, NodeRelationshipCache cache,
StatsProvider memoryUsageStatsProvider )
{
super( NAME, null, config, 0 );
super( NAME, null, config, RECYCLE_BATCHES );
add( new BatchFeedStep( control(), config, forwards( 0, store.getHighId(), config ), store.getRecordSize() ) );
add( new ReadRecordsStep<>( control(), config, false, store, null ) );
add( new ReadRecordsStep<>( control(), config, false, store ) );
add( new CalculateDenseNodesStep( control(), config, cache, memoryUsageStatsProvider ) );
}
}
@@ -43,7 +43,7 @@ public NodeFirstGroupStage( Configuration config, RecordStore<RelationshipGroupR
{
super( NAME, null, config, 0 );
add( new BatchFeedStep( control(), config, allIn( groupStore, config ), groupStore.getRecordSize() ) );
add( new ReadRecordsStep<>( control(), config, true, groupStore, null ) );
add( new ReadRecordsStep<>( control(), config, true, groupStore ) );
add( new NodeSetFirstGroupStep( control(), config, nodeStore, cache ) );
add( new UpdateRecordsStep<>( control(), config, nodeStore, new StorePrepareIdSequence() ) );
}
@@ -70,7 +70,11 @@ protected void process( RelationshipGroupRecord[] batch, BatchSender sender ) th
{
for ( RelationshipGroupRecord group : batch )
{
assert group.inUse();
if ( !group.inUse() )
{
continue;
}

long nodeId = group.getOwningNode();
if ( cache.getByte( nodeId, 0 ) == 0 )
{
@@ -88,6 +92,7 @@ protected void process( RelationshipGroupRecord[] batch, BatchSender sender ) th
}
}
}
control.recycle( batch );
}

@Override
@@ -63,9 +63,12 @@ public ProcessRelationshipCountsDataStep( StageControl control, NodeLabelsCache
protected void process( RelationshipRecord[] batch, BatchSender sender )
{
RelationshipCountsProcessor processor = processor();
for ( RelationshipRecord relationship : batch )
for ( RelationshipRecord record : batch )
{
processor.process( relationship.getFirstNode(), relationship.getType(), relationship.getSecondNode() );
if ( record.inUse() )
{
processor.process( record );
}
}
progressMonitor.progress( batch.length );
}
@@ -19,6 +19,8 @@
*/
package org.neo4j.unsafe.impl.batchimport;

import java.util.function.Supplier;

import org.neo4j.kernel.impl.store.RecordStore;
import org.neo4j.kernel.impl.store.record.NodeRecord;
import org.neo4j.kernel.impl.store.record.RelationshipGroupRecord;
Expand All @@ -28,6 +30,7 @@
import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache.NodeChangeVisitor;
import org.neo4j.unsafe.impl.batchimport.cache.NodeType;
import org.neo4j.unsafe.impl.batchimport.staging.ProducerStep;
import org.neo4j.unsafe.impl.batchimport.staging.RecordDataAssembler;
import org.neo4j.unsafe.impl.batchimport.staging.StageControl;

import static java.lang.System.nanoTime;
@@ -58,9 +61,10 @@ protected void process()
}
}

private class NodeVisitor implements NodeChangeVisitor, AutoCloseable, GroupVisitor
private class NodeVisitor implements NodeChangeVisitor, AutoCloseable, GroupVisitor, Supplier<RelationshipGroupRecord[]>
{
private RelationshipGroupRecord[] batch = new RelationshipGroupRecord[batchSize];
private final RecordDataAssembler<RelationshipGroupRecord> assembler = new RecordDataAssembler<>( store::newRecord, null );
private RelationshipGroupRecord[] batch = get();
private int cursor;
private long time = nanoTime();

@@ -74,13 +78,13 @@ public void change( long nodeId, ByteArray array )
public long visit( long nodeId, int typeId, long out, long in, long loop )
{
long id = store.nextId();
RelationshipGroupRecord record = store.newRecord();
RelationshipGroupRecord record = batch[cursor++];
record.setId( id );
batch[cursor++] = record.initialize( true, typeId, out, in, loop, nodeId, loop );
record.initialize( true, typeId, out, in, loop, nodeId, loop );
if ( cursor == batchSize )
{
send();
batch = new RelationshipGroupRecord[batchSize];
batch = control.reuse( this );
cursor = 0;
}
return id;
@@ -99,9 +103,16 @@ public void close()
{
if ( cursor > 0 )
{
batch = assembler.cutOffAt( batch, cursor );
send();
}
}

@Override
public RelationshipGroupRecord[] get()
{
return assembler.newBatchObject( batchSize );
}
}

@Override
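
This hunk shows the producer side of the recycling contract: the visitor now implements Supplier<RelationshipGroupRecord[]> so the stage control can either build a brand-new batch array (via get()) or hand back a recycled one (control.reuse( this )) whenever a full batch has been sent. Below is a compressed, generic sketch of that shape, using only JDK types rather than the real StageControl; the class and parameter names are illustrative only.

import java.util.function.Consumer;
import java.util.function.Supplier;

final class BatchingProducer<T>
{
    private final Supplier<T[]> newOrRecycledBatch; // e.g. the pool sketched near the top
    private final Consumer<T[]> downstream;
    private final int batchSize;
    private T[] batch;
    private int cursor;

    BatchingProducer( Supplier<T[]> newOrRecycledBatch, Consumer<T[]> downstream, int batchSize )
    {
        this.newOrRecycledBatch = newOrRecycledBatch;
        this.downstream = downstream;
        this.batchSize = batchSize;
        this.batch = newOrRecycledBatch.get();
    }

    void add( T record )
    {
        batch[cursor++] = record;
        if ( cursor == batchSize )
        {
            downstream.accept( batch );       // ship the full batch to the next step
            batch = newOrRecycledBatch.get(); // swap in a recycled (or freshly allocated) array
            cursor = 0;
        }
    }
}

The real change goes one step further and reuses the record objects inside the array as well, re-initialising them in place and trimming the final, partially filled batch with assembler.cutOffAt( batch, cursor ) on close.
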
@@ -53,7 +53,7 @@ protected void process( T[] batch, BatchSender sender )
if ( !processor.process( item ) )
{
// No change for this record
batch[i] = null;
batch[i].setInUse( false );
}
}
}
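
Nulling a slot is no longer an option once the arrays (and the record objects in them) are reused, so a step that decides a record needs no further handling now flips the record's in-use flag off instead; later steps then treat it like any other record that is not in use. Roughly, with stand-in types rather than Neo4j's:

final class SkipUnchangedSketch
{
    interface Record
    {
        boolean inUse();
        void setInUse( boolean inUse );
    }

    interface Store
    {
        void write( Record record );
    }

    /** A processing step marks records it did not change as not in use... */
    static void markUnchanged( Record record, boolean changed )
    {
        if ( !changed )
        {
            record.setInUse( false );
        }
    }

    /** ...and a downstream writing step would simply skip them. */
    static void writeBatch( Record[] batch, Store store )
    {
        for ( Record record : batch )
        {
            if ( record.inUse() )
            {
                store.write( record );
            }
        }
    }
}
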
@@ -66,12 +66,14 @@ public RelationshipCountsProcessor( NodeLabelsCache nodeLabelCache,
this.wildcardCounts = cacheFactory.newLongArray( anyRelationshipType + 1, 0 );
}

public void process( long startNode, int type, long endNode )
@Override
public boolean process( RelationshipRecord record )
{
// Below is logic duplication of CountsState#addRelationship
int type = record.getType();
increment( wildcardCounts, anyRelationshipType );
increment( wildcardCounts, type );
startScratch = nodeLabelCache.get( client, startNode, startScratch );
startScratch = nodeLabelCache.get( client, record.getFirstNode(), startScratch );
for ( int startNodeLabelId : startScratch )
{
if ( startNodeLabelId == -1 )
@@ -82,7 +84,7 @@ public void process( long startNode, int type, long endNode )
increment( labelsCounts, startNodeLabelId, anyRelationshipType, START );
increment( labelsCounts, startNodeLabelId, type, START );
}
endScratch = nodeLabelCache.get( client, endNode, endScratch );
endScratch = nodeLabelCache.get( client, record.getSecondNode(), endScratch );
for ( int endNodeLabelId : endScratch )
{
if ( endNodeLabelId == -1 )
@@ -93,13 +95,6 @@ public void process( long startNode, int type, long endNode )
increment( labelsCounts, endNodeLabelId, anyRelationshipType, END );
increment( labelsCounts, endNodeLabelId, type, END );
}
}

@Override
public boolean process( RelationshipRecord record )
{
process( record.getFirstNode(), record.getType(), record.getSecondNode() );
// No need to update the store, we're just reading things here
return false;
}

@@ -27,8 +27,8 @@
import org.neo4j.unsafe.impl.batchimport.staging.BatchFeedStep;
import org.neo4j.unsafe.impl.batchimport.staging.ReadRecordsStep;
import org.neo4j.unsafe.impl.batchimport.staging.Stage;

import static org.neo4j.unsafe.impl.batchimport.RecordIdIterator.allIn;
import static org.neo4j.unsafe.impl.batchimport.staging.Step.RECYCLE_BATCHES;

/**
* Reads all records from {@link RelationshipStore} and process the counts in them. Uses a {@link NodeLabelsCache}
Expand All @@ -42,10 +42,10 @@ public RelationshipCountsStage( Configuration config, NodeLabelsCache cache, Rel
int highLabelId, int highRelationshipTypeId, CountsAccessor.Updater countsUpdater,
NumberArrayFactory cacheFactory, ProgressReporter progressReporter )
{
super( NAME, null, config, 0 );
super( NAME, null, config, RECYCLE_BATCHES );
add( new BatchFeedStep( control(), config, allIn( relationshipStore, config ),
relationshipStore.getRecordSize() ) );
add( new ReadRecordsStep<>( control(), config, false, relationshipStore, null ) );
add( new ReadRecordsStep<>( control(), config, false, relationshipStore ) );
add( new ProcessRelationshipCountsDataStep( control(), cache, config,
highLabelId, highRelationshipTypeId, countsUpdater, cacheFactory, progressReporter ) );
}
@@ -165,7 +165,7 @@ public boolean put( RelationshipGroupRecord groupRecord )
long baseIndex = offsets.get( rebase( nodeId ) );
// grouCount is extra validation, really
int groupCount = groupCount( nodeId );
long index = scanForFreeFrom( baseIndex, groupCount, groupRecord.getType(), groupRecord.getOwningNode() );
long index = scanForFreeFrom( baseIndex, groupCount, groupRecord.getType(), nodeId );

// Put the group at this index
cache.setByte( index, 0, (byte) 1 );
@@ -26,6 +26,8 @@
import org.neo4j.unsafe.impl.batchimport.staging.Stage;
import org.neo4j.unsafe.impl.batchimport.store.StorePrepareIdSequence;

import static org.neo4j.unsafe.impl.batchimport.staging.Step.RECYCLE_BATCHES;

/**
* Takes information about relationship groups in the {@link NodeRelationshipCache}, which is produced
* as a side-effect of linking relationships together, and writes them out into {@link RelationshipGroupStore}.
Expand All @@ -37,7 +39,7 @@ public class RelationshipGroupStage extends Stage
public RelationshipGroupStage( String topic, Configuration config,
RecordStore<RelationshipGroupRecord> store, NodeRelationshipCache cache )
{
super( NAME, topic, config, 0 );
super( NAME, topic, config, RECYCLE_BATCHES );
add( new ReadGroupRecordsByCacheStep( control(), config, store, cache ) );
add( new UpdateRecordsStep<>( control(), config, store, new StorePrepareIdSequence() ) );
}
@@ -85,7 +85,7 @@ protected void forkedProcess( int id, int processors, RelationshipRecord[] batch
if ( changeCount == -1 )
{
// No change for this record, it's OK, all the processors will reach the same conclusion
batch[i] = null;
batch[i].setInUse( false );
}
else
{
@@ -26,13 +26,15 @@
import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache;
import org.neo4j.unsafe.impl.batchimport.staging.BatchFeedStep;
import org.neo4j.unsafe.impl.batchimport.staging.ReadRecordsStep;
import org.neo4j.unsafe.impl.batchimport.staging.RecordDataAssembler;
import org.neo4j.unsafe.impl.batchimport.staging.Stage;
import org.neo4j.unsafe.impl.batchimport.stats.StatsProvider;
import org.neo4j.unsafe.impl.batchimport.store.BatchingNeoStores;
import org.neo4j.unsafe.impl.batchimport.store.PrepareIdSequence;

import static org.neo4j.unsafe.impl.batchimport.RecordIdIterator.backwards;
import static org.neo4j.unsafe.impl.batchimport.staging.Step.ORDER_SEND_DOWNSTREAM;
import static org.neo4j.unsafe.impl.batchimport.staging.Step.RECYCLE_BATCHES;

/**
* Sets {@link RelationshipRecord#setFirstPrevRel(long)} and {@link RelationshipRecord#setSecondPrevRel(long)}
Expand All @@ -57,10 +59,10 @@ public RelationshipLinkbackStage( String topic, Configuration config, BatchingNe
NodeRelationshipCache cache, Predicate<RelationshipRecord> readFilter,
Predicate<RelationshipRecord> changeFilter, int nodeTypes, StatsProvider... additionalStatsProvider )
{
super( NAME, topic, config, ORDER_SEND_DOWNSTREAM );
super( NAME, topic, config, ORDER_SEND_DOWNSTREAM | RECYCLE_BATCHES );
RelationshipStore store = stores.getRelationshipStore();
add( new BatchFeedStep( control(), config, backwards( 0, store.getHighId(), config ), store.getRecordSize() ) );
add( new ReadRecordsStep<>( control(), config, true, store, readFilter ) );
add( new ReadRecordsStep<>( control(), config, true, store, new RecordDataAssembler<>( store::newRecord, readFilter ) ) );
add( new RelationshipLinkbackStep( control(), config, cache, changeFilter, nodeTypes, additionalStatsProvider ) );
add( new UpdateRecordsStep<>( control(), config, store, PrepareIdSequence.of( stores.usesDoubleRelationshipRecordUnits() ) ) );
}
