From cba5218cd164f1c9818fbbcd930cc9ffb0ab8455 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mattias=20Finn=C3=A9?= Date: Mon, 27 Nov 2017 13:33:31 +0100 Subject: [PATCH] Recycles and reuses batch objects for way less garbage --- .../impl/batchimport/CacheGroupsStep.java | 6 +- .../batchimport/CalculateDenseNodesStep.java | 18 +++-- .../impl/batchimport/CountGroupsStage.java | 5 +- .../impl/batchimport/CountGroupsStep.java | 7 +- .../NodeCountsAndLabelIndexBuildStage.java | 7 +- .../impl/batchimport/NodeCountsStage.java | 5 +- .../batchimport/NodeDegreeCountStage.java | 6 +- .../impl/batchimport/NodeFirstGroupStage.java | 2 +- .../batchimport/NodeSetFirstGroupStep.java | 7 +- .../ProcessRelationshipCountsDataStep.java | 7 +- .../ReadGroupRecordsByCacheStep.java | 21 ++++-- .../impl/batchimport/RecordProcessorStep.java | 2 +- .../RelationshipCountsProcessor.java | 15 ++-- .../batchimport/RelationshipCountsStage.java | 6 +- .../batchimport/RelationshipGroupCache.java | 2 +- .../batchimport/RelationshipGroupStage.java | 4 +- .../batchimport/RelationshipLinkStep.java | 2 +- .../RelationshipLinkbackStage.java | 6 +- .../RelationshipLinkforwardStage.java | 6 +- .../batchimport/ScanAndCacheGroupsStage.java | 5 +- .../SparseNodeFirstRelationshipStage.java | 5 +- .../batchimport/staging/AbstractStep.java | 2 +- .../staging/ForkedProcessorStep.java | 5 +- .../batchimport/staging/ProcessorStep.java | 1 + .../batchimport/staging/ReadRecordsStep.java | 32 ++++---- .../staging/RecordDataAssembler.java | 73 +++++++++++++++++++ .../impl/batchimport/staging/Stage.java | 7 ++ .../batchimport/staging/StageControl.java | 6 ++ .../batchimport/staging/StageExecution.java | 42 ++++++++++- .../unsafe/impl/batchimport/staging/Step.java | 1 + .../RelationshipCountsProcessorTest.java | 19 ++++- .../staging/SimpleStageControl.java | 13 ++++ 32 files changed, 267 insertions(+), 78 deletions(-) create mode 100644 community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/RecordDataAssembler.java diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/CacheGroupsStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/CacheGroupsStep.java index b9f568f1e4f8a..f7ff368c73ed9 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/CacheGroupsStep.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/CacheGroupsStep.java @@ -46,7 +46,11 @@ protected void process( RelationshipGroupRecord[] batch, BatchSender sender ) th // since the records exists in the store in reverse order. for ( int i = batch.length - 1; i >= 0; i-- ) { - cache.put( batch[i] ); + RelationshipGroupRecord record = batch[i]; + if ( record.inUse() ) + { + cache.put( record ); + } } } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/CalculateDenseNodesStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/CalculateDenseNodesStep.java index 5c3a78a83bb04..5cb5c0c62060e 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/CalculateDenseNodesStep.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/CalculateDenseNodesStep.java @@ -43,16 +43,18 @@ public CalculateDenseNodesStep( StageControl control, Configuration config, Node @Override protected void forkedProcess( int id, int processors, RelationshipRecord[] batch ) { - for ( int i = 0; i < batch.length; i++ ) + for ( RelationshipRecord record : batch ) { - RelationshipRecord relationship = batch[i]; - long startNodeId = relationship.getFirstNode(); - long endNodeId = relationship.getSecondNode(); - processNodeId( id, processors, startNodeId ); - if ( startNodeId != endNodeId ) // avoid counting loops twice + if ( record.inUse() ) { - // Loops only counts as one - processNodeId( id, processors, endNodeId ); + long startNodeId = record.getFirstNode(); + long endNodeId = record.getSecondNode(); + processNodeId( id, processors, startNodeId ); + if ( startNodeId != endNodeId ) // avoid counting loops twice + { + // Loops only counts as one + processNodeId( id, processors, endNodeId ); + } } } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/CountGroupsStage.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/CountGroupsStage.java index 5aba0cec68265..bf0fd6494e719 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/CountGroupsStage.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/CountGroupsStage.java @@ -27,6 +27,7 @@ import org.neo4j.unsafe.impl.batchimport.stats.StatsProvider; import static org.neo4j.unsafe.impl.batchimport.RecordIdIterator.allIn; +import static org.neo4j.unsafe.impl.batchimport.staging.Step.RECYCLE_BATCHES; /** * Stage for counting groups per node, populates {@link RelationshipGroupCache}. Steps: @@ -45,9 +46,9 @@ public class CountGroupsStage extends Stage public CountGroupsStage( Configuration config, RecordStore store, RelationshipGroupCache groupCache, StatsProvider... additionalStatsProviders ) { - super( NAME, null, config, 0 ); + super( NAME, null, config, RECYCLE_BATCHES ); add( new BatchFeedStep( control(), config, allIn( store, config ), store.getRecordSize() ) ); - add( new ReadRecordsStep<>( control(), config, false, store, null ) ); + add( new ReadRecordsStep<>( control(), config, false, store ) ); add( new CountGroupsStep( control(), config, groupCache, additionalStatsProviders ) ); } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/CountGroupsStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/CountGroupsStep.java index 6f22304d2d3ec..677aae5e87812 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/CountGroupsStep.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/CountGroupsStep.java @@ -43,9 +43,12 @@ public CountGroupsStep( StageControl control, Configuration config, Relationship @Override protected void process( RelationshipGroupRecord[] batch, BatchSender sender ) throws Throwable { - for ( RelationshipGroupRecord group : batch ) + for ( RelationshipGroupRecord record : batch ) { - cache.incrementGroupCount( group.getOwningNode() ); + if ( record.inUse() ) + { + cache.incrementGroupCount( record.getOwningNode() ); + } } } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeCountsAndLabelIndexBuildStage.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeCountsAndLabelIndexBuildStage.java index 65f793ec0cdaf..46fd54ba563a1 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeCountsAndLabelIndexBuildStage.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeCountsAndLabelIndexBuildStage.java @@ -27,10 +27,11 @@ import org.neo4j.unsafe.impl.batchimport.staging.BatchFeedStep; import org.neo4j.unsafe.impl.batchimport.staging.ReadRecordsStep; import org.neo4j.unsafe.impl.batchimport.staging.Stage; -import org.neo4j.unsafe.impl.batchimport.staging.Step; import org.neo4j.unsafe.impl.batchimport.stats.StatsProvider; import static org.neo4j.unsafe.impl.batchimport.RecordIdIterator.allIn; +import static org.neo4j.unsafe.impl.batchimport.staging.Step.ORDER_SEND_DOWNSTREAM; +import static org.neo4j.unsafe.impl.batchimport.staging.Step.RECYCLE_BATCHES; /** * Counts nodes and their labels and also builds {@link LabelScanStore label index} while doing so. @@ -43,9 +44,9 @@ public NodeCountsAndLabelIndexBuildStage( Configuration config, NodeLabelsCache int highLabelId, CountsAccessor.Updater countsUpdater, ProgressReporter progressReporter, LabelScanStore labelIndex, StatsProvider... additionalStatsProviders ) { - super( NAME, null, config, Step.ORDER_SEND_DOWNSTREAM ); + super( NAME, null, config, ORDER_SEND_DOWNSTREAM | RECYCLE_BATCHES ); add( new BatchFeedStep( control(), config, allIn( nodeStore, config ), nodeStore.getRecordSize() ) ); - add( new ReadRecordsStep<>( control(), config, false, nodeStore, null ) ); + add( new ReadRecordsStep<>( control(), config, false, nodeStore ) ); add( new LabelIndexWriterStep( control(), config, labelIndex, nodeStore ) ); add( new RecordProcessorStep<>( control(), "COUNT", config, new NodeCountsProcessor( nodeStore, cache, highLabelId, countsUpdater, progressReporter ), true, additionalStatsProviders ) ); diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeCountsStage.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeCountsStage.java index e97826e92f85d..177f79b434647 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeCountsStage.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeCountsStage.java @@ -29,6 +29,7 @@ import org.neo4j.unsafe.impl.batchimport.stats.StatsProvider; import static org.neo4j.unsafe.impl.batchimport.RecordIdIterator.allIn; +import static org.neo4j.unsafe.impl.batchimport.staging.Step.RECYCLE_BATCHES; /** * Reads all records from {@link NodeStore} and process the counts in them, populating {@link NodeLabelsCache} @@ -42,9 +43,9 @@ public NodeCountsStage( Configuration config, NodeLabelsCache cache, NodeStore n CountsAccessor.Updater countsUpdater, ProgressReporter progressReporter, StatsProvider... additionalStatsProviders ) { - super( NAME, null, config, 0 ); + super( NAME, null, config, RECYCLE_BATCHES ); add( new BatchFeedStep( control(), config, allIn( nodeStore, config ), nodeStore.getRecordSize() ) ); - add( new ReadRecordsStep<>( control(), config, false, nodeStore, null ) ); + add( new ReadRecordsStep<>( control(), config, false, nodeStore ) ); add( new RecordProcessorStep<>( control(), "COUNT", config, new NodeCountsProcessor( nodeStore, cache, highLabelId, countsUpdater, progressReporter ), true, additionalStatsProviders ) ); diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeDegreeCountStage.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeDegreeCountStage.java index e7fac4814973a..ceb68e8f5d72f 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeDegreeCountStage.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeDegreeCountStage.java @@ -20,6 +20,8 @@ package org.neo4j.unsafe.impl.batchimport; import static org.neo4j.unsafe.impl.batchimport.RecordIdIterator.forwards; +import static org.neo4j.unsafe.impl.batchimport.staging.Step.RECYCLE_BATCHES; + import org.neo4j.kernel.impl.store.RelationshipStore; import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache; import org.neo4j.unsafe.impl.batchimport.staging.BatchFeedStep; @@ -38,9 +40,9 @@ public class NodeDegreeCountStage extends Stage public NodeDegreeCountStage( Configuration config, RelationshipStore store, NodeRelationshipCache cache, StatsProvider memoryUsageStatsProvider ) { - super( NAME, null, config, 0 ); + super( NAME, null, config, RECYCLE_BATCHES ); add( new BatchFeedStep( control(), config, forwards( 0, store.getHighId(), config ), store.getRecordSize() ) ); - add( new ReadRecordsStep<>( control(), config, false, store, null ) ); + add( new ReadRecordsStep<>( control(), config, false, store ) ); add( new CalculateDenseNodesStep( control(), config, cache, memoryUsageStatsProvider ) ); } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeFirstGroupStage.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeFirstGroupStage.java index 1a9707236f127..2b0087302d1a4 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeFirstGroupStage.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeFirstGroupStage.java @@ -43,7 +43,7 @@ public NodeFirstGroupStage( Configuration config, RecordStore( control(), config, true, groupStore, null ) ); + add( new ReadRecordsStep<>( control(), config, true, groupStore ) ); add( new NodeSetFirstGroupStep( control(), config, nodeStore, cache ) ); add( new UpdateRecordsStep<>( control(), config, nodeStore, new StorePrepareIdSequence() ) ); } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeSetFirstGroupStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeSetFirstGroupStep.java index 164fd9baba86f..2c24b8aa358e9 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeSetFirstGroupStep.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeSetFirstGroupStep.java @@ -70,7 +70,11 @@ protected void process( RelationshipGroupRecord[] batch, BatchSender sender ) th { for ( RelationshipGroupRecord group : batch ) { - assert group.inUse(); + if ( !group.inUse() ) + { + continue; + } + long nodeId = group.getOwningNode(); if ( cache.getByte( nodeId, 0 ) == 0 ) { @@ -88,6 +92,7 @@ protected void process( RelationshipGroupRecord[] batch, BatchSender sender ) th } } } + control.recycle( batch ); } @Override diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ProcessRelationshipCountsDataStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ProcessRelationshipCountsDataStep.java index 39af3b32c9060..80d0234f648e3 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ProcessRelationshipCountsDataStep.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ProcessRelationshipCountsDataStep.java @@ -63,9 +63,12 @@ public ProcessRelationshipCountsDataStep( StageControl control, NodeLabelsCache protected void process( RelationshipRecord[] batch, BatchSender sender ) { RelationshipCountsProcessor processor = processor(); - for ( RelationshipRecord relationship : batch ) + for ( RelationshipRecord record : batch ) { - processor.process( relationship.getFirstNode(), relationship.getType(), relationship.getSecondNode() ); + if ( record.inUse() ) + { + processor.process( record ); + } } progressMonitor.progress( batch.length ); } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ReadGroupRecordsByCacheStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ReadGroupRecordsByCacheStep.java index 5341c301317fb..425909f131d9a 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ReadGroupRecordsByCacheStep.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ReadGroupRecordsByCacheStep.java @@ -19,6 +19,8 @@ */ package org.neo4j.unsafe.impl.batchimport; +import java.util.function.Supplier; + import org.neo4j.kernel.impl.store.RecordStore; import org.neo4j.kernel.impl.store.record.NodeRecord; import org.neo4j.kernel.impl.store.record.RelationshipGroupRecord; @@ -28,6 +30,7 @@ import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache.NodeChangeVisitor; import org.neo4j.unsafe.impl.batchimport.cache.NodeType; import org.neo4j.unsafe.impl.batchimport.staging.ProducerStep; +import org.neo4j.unsafe.impl.batchimport.staging.RecordDataAssembler; import org.neo4j.unsafe.impl.batchimport.staging.StageControl; import static java.lang.System.nanoTime; @@ -58,9 +61,10 @@ protected void process() } } - private class NodeVisitor implements NodeChangeVisitor, AutoCloseable, GroupVisitor + private class NodeVisitor implements NodeChangeVisitor, AutoCloseable, GroupVisitor, Supplier { - private RelationshipGroupRecord[] batch = new RelationshipGroupRecord[batchSize]; + private final RecordDataAssembler assembler = new RecordDataAssembler<>( store::newRecord, null ); + private RelationshipGroupRecord[] batch = get(); private int cursor; private long time = nanoTime(); @@ -74,13 +78,13 @@ public void change( long nodeId, ByteArray array ) public long visit( long nodeId, int typeId, long out, long in, long loop ) { long id = store.nextId(); - RelationshipGroupRecord record = store.newRecord(); + RelationshipGroupRecord record = batch[cursor++]; record.setId( id ); - batch[cursor++] = record.initialize( true, typeId, out, in, loop, nodeId, loop ); + record.initialize( true, typeId, out, in, loop, nodeId, loop ); if ( cursor == batchSize ) { send(); - batch = new RelationshipGroupRecord[batchSize]; + batch = control.reuse( this ); cursor = 0; } return id; @@ -99,9 +103,16 @@ public void close() { if ( cursor > 0 ) { + batch = assembler.cutOffAt( batch, cursor ); send(); } } + + @Override + public RelationshipGroupRecord[] get() + { + return assembler.newBatchObject( batchSize ); + } } @Override diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RecordProcessorStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RecordProcessorStep.java index 70720f9b84a1f..7d3a99b963d38 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RecordProcessorStep.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RecordProcessorStep.java @@ -53,7 +53,7 @@ protected void process( T[] batch, BatchSender sender ) if ( !processor.process( item ) ) { // No change for this record - batch[i] = null; + batch[i].setInUse( false ); } } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipCountsProcessor.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipCountsProcessor.java index afd80c72abe0f..a7d0231ce3c0e 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipCountsProcessor.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipCountsProcessor.java @@ -66,12 +66,14 @@ public RelationshipCountsProcessor( NodeLabelsCache nodeLabelCache, this.wildcardCounts = cacheFactory.newLongArray( anyRelationshipType + 1, 0 ); } - public void process( long startNode, int type, long endNode ) + @Override + public boolean process( RelationshipRecord record ) { // Below is logic duplication of CountsState#addRelationship + int type = record.getType(); increment( wildcardCounts, anyRelationshipType ); increment( wildcardCounts, type ); - startScratch = nodeLabelCache.get( client, startNode, startScratch ); + startScratch = nodeLabelCache.get( client, record.getFirstNode(), startScratch ); for ( int startNodeLabelId : startScratch ) { if ( startNodeLabelId == -1 ) @@ -82,7 +84,7 @@ public void process( long startNode, int type, long endNode ) increment( labelsCounts, startNodeLabelId, anyRelationshipType, START ); increment( labelsCounts, startNodeLabelId, type, START ); } - endScratch = nodeLabelCache.get( client, endNode, endScratch ); + endScratch = nodeLabelCache.get( client, record.getSecondNode(), endScratch ); for ( int endNodeLabelId : endScratch ) { if ( endNodeLabelId == -1 ) @@ -93,13 +95,6 @@ public void process( long startNode, int type, long endNode ) increment( labelsCounts, endNodeLabelId, anyRelationshipType, END ); increment( labelsCounts, endNodeLabelId, type, END ); } - } - - @Override - public boolean process( RelationshipRecord record ) - { - process( record.getFirstNode(), record.getType(), record.getSecondNode() ); - // No need to update the store, we're just reading things here return false; } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipCountsStage.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipCountsStage.java index fa4eff3fd9ee8..dd0108d83b32e 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipCountsStage.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipCountsStage.java @@ -27,8 +27,8 @@ import org.neo4j.unsafe.impl.batchimport.staging.BatchFeedStep; import org.neo4j.unsafe.impl.batchimport.staging.ReadRecordsStep; import org.neo4j.unsafe.impl.batchimport.staging.Stage; - import static org.neo4j.unsafe.impl.batchimport.RecordIdIterator.allIn; +import static org.neo4j.unsafe.impl.batchimport.staging.Step.RECYCLE_BATCHES; /** * Reads all records from {@link RelationshipStore} and process the counts in them. Uses a {@link NodeLabelsCache} @@ -42,10 +42,10 @@ public RelationshipCountsStage( Configuration config, NodeLabelsCache cache, Rel int highLabelId, int highRelationshipTypeId, CountsAccessor.Updater countsUpdater, NumberArrayFactory cacheFactory, ProgressReporter progressReporter ) { - super( NAME, null, config, 0 ); + super( NAME, null, config, RECYCLE_BATCHES ); add( new BatchFeedStep( control(), config, allIn( relationshipStore, config ), relationshipStore.getRecordSize() ) ); - add( new ReadRecordsStep<>( control(), config, false, relationshipStore, null ) ); + add( new ReadRecordsStep<>( control(), config, false, relationshipStore ) ); add( new ProcessRelationshipCountsDataStep( control(), cache, config, highLabelId, highRelationshipTypeId, countsUpdater, cacheFactory, progressReporter ) ); } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipGroupCache.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipGroupCache.java index ba2af15b7b042..9f5d07d3e52d7 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipGroupCache.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipGroupCache.java @@ -165,7 +165,7 @@ public boolean put( RelationshipGroupRecord groupRecord ) long baseIndex = offsets.get( rebase( nodeId ) ); // grouCount is extra validation, really int groupCount = groupCount( nodeId ); - long index = scanForFreeFrom( baseIndex, groupCount, groupRecord.getType(), groupRecord.getOwningNode() ); + long index = scanForFreeFrom( baseIndex, groupCount, groupRecord.getType(), nodeId ); // Put the group at this index cache.setByte( index, 0, (byte) 1 ); diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipGroupStage.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipGroupStage.java index ac3a7019113a2..542c48e47eb67 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipGroupStage.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipGroupStage.java @@ -26,6 +26,8 @@ import org.neo4j.unsafe.impl.batchimport.staging.Stage; import org.neo4j.unsafe.impl.batchimport.store.StorePrepareIdSequence; +import static org.neo4j.unsafe.impl.batchimport.staging.Step.RECYCLE_BATCHES; + /** * Takes information about relationship groups in the {@link NodeRelationshipCache}, which is produced * as a side-effect of linking relationships together, and writes them out into {@link RelationshipGroupStore}. @@ -37,7 +39,7 @@ public class RelationshipGroupStage extends Stage public RelationshipGroupStage( String topic, Configuration config, RecordStore store, NodeRelationshipCache cache ) { - super( NAME, topic, config, 0 ); + super( NAME, topic, config, RECYCLE_BATCHES ); add( new ReadGroupRecordsByCacheStep( control(), config, store, cache ) ); add( new UpdateRecordsStep<>( control(), config, store, new StorePrepareIdSequence() ) ); } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkStep.java index 8ec4d1047d819..aa1f51baf1eed 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkStep.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkStep.java @@ -85,7 +85,7 @@ protected void forkedProcess( int id, int processors, RelationshipRecord[] batch if ( changeCount == -1 ) { // No change for this record, it's OK, all the processors will reach the same conclusion - batch[i] = null; + batch[i].setInUse( false ); } else { diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkbackStage.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkbackStage.java index 90d4916c4b420..c2ace46641acd 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkbackStage.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkbackStage.java @@ -26,6 +26,7 @@ import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache; import org.neo4j.unsafe.impl.batchimport.staging.BatchFeedStep; import org.neo4j.unsafe.impl.batchimport.staging.ReadRecordsStep; +import org.neo4j.unsafe.impl.batchimport.staging.RecordDataAssembler; import org.neo4j.unsafe.impl.batchimport.staging.Stage; import org.neo4j.unsafe.impl.batchimport.stats.StatsProvider; import org.neo4j.unsafe.impl.batchimport.store.BatchingNeoStores; @@ -33,6 +34,7 @@ import static org.neo4j.unsafe.impl.batchimport.RecordIdIterator.backwards; import static org.neo4j.unsafe.impl.batchimport.staging.Step.ORDER_SEND_DOWNSTREAM; +import static org.neo4j.unsafe.impl.batchimport.staging.Step.RECYCLE_BATCHES; /** * Sets {@link RelationshipRecord#setFirstPrevRel(long)} and {@link RelationshipRecord#setSecondPrevRel(long)} @@ -57,10 +59,10 @@ public RelationshipLinkbackStage( String topic, Configuration config, BatchingNe NodeRelationshipCache cache, Predicate readFilter, Predicate changeFilter, int nodeTypes, StatsProvider... additionalStatsProvider ) { - super( NAME, topic, config, ORDER_SEND_DOWNSTREAM ); + super( NAME, topic, config, ORDER_SEND_DOWNSTREAM | RECYCLE_BATCHES ); RelationshipStore store = stores.getRelationshipStore(); add( new BatchFeedStep( control(), config, backwards( 0, store.getHighId(), config ), store.getRecordSize() ) ); - add( new ReadRecordsStep<>( control(), config, true, store, readFilter ) ); + add( new ReadRecordsStep<>( control(), config, true, store, new RecordDataAssembler<>( store::newRecord, readFilter ) ) ); add( new RelationshipLinkbackStep( control(), config, cache, changeFilter, nodeTypes, additionalStatsProvider ) ); add( new UpdateRecordsStep<>( control(), config, store, PrepareIdSequence.of( stores.usesDoubleRelationshipRecordUnits() ) ) ); } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkforwardStage.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkforwardStage.java index 3f9d2f1acfae9..7bc6c256b1d08 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkforwardStage.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkforwardStage.java @@ -26,6 +26,7 @@ import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache; import org.neo4j.unsafe.impl.batchimport.staging.BatchFeedStep; import org.neo4j.unsafe.impl.batchimport.staging.ReadRecordsStep; +import org.neo4j.unsafe.impl.batchimport.staging.RecordDataAssembler; import org.neo4j.unsafe.impl.batchimport.staging.Stage; import org.neo4j.unsafe.impl.batchimport.stats.StatsProvider; import org.neo4j.unsafe.impl.batchimport.store.BatchingNeoStores; @@ -33,6 +34,7 @@ import static org.neo4j.unsafe.impl.batchimport.RecordIdIterator.forwards; import static org.neo4j.unsafe.impl.batchimport.staging.Step.ORDER_SEND_DOWNSTREAM; +import static org.neo4j.unsafe.impl.batchimport.staging.Step.RECYCLE_BATCHES; public class RelationshipLinkforwardStage extends Stage { @@ -42,10 +44,10 @@ public RelationshipLinkforwardStage( String topic, Configuration config, Batchin NodeRelationshipCache cache, Predicate readFilter, Predicate denseChangeFilter, int nodeTypes, StatsProvider... additionalStatsProvider ) { - super( NAME, topic, config, ORDER_SEND_DOWNSTREAM ); + super( NAME, topic, config, ORDER_SEND_DOWNSTREAM | RECYCLE_BATCHES ); RelationshipStore store = stores.getRelationshipStore(); add( new BatchFeedStep( control(), config, forwards( 0, store.getHighId(), config ), store.getRecordSize() ) ); - add( new ReadRecordsStep<>( control(), config, true, store, readFilter ) ); + add( new ReadRecordsStep<>( control(), config, true, store, new RecordDataAssembler<>( store::newRecord, readFilter ) ) ); add( new RelationshipLinkforwardStep( control(), config, cache, denseChangeFilter, nodeTypes, additionalStatsProvider ) ); add( new UpdateRecordsStep<>( control(), config, store, PrepareIdSequence.of( stores.usesDoubleRelationshipRecordUnits() ) ) ); } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ScanAndCacheGroupsStage.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ScanAndCacheGroupsStage.java index 66995f154a088..ee0128e5de2eb 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ScanAndCacheGroupsStage.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ScanAndCacheGroupsStage.java @@ -27,6 +27,7 @@ import org.neo4j.unsafe.impl.batchimport.stats.StatsProvider; import static org.neo4j.unsafe.impl.batchimport.RecordIdIterator.allInReversed; +import static org.neo4j.unsafe.impl.batchimport.staging.Step.RECYCLE_BATCHES; /** * Scans {@link RelationshipGroupRecord} from store in reverse, this because during import the relationships @@ -44,9 +45,9 @@ public class ScanAndCacheGroupsStage extends Stage public ScanAndCacheGroupsStage( Configuration config, RecordStore store, RelationshipGroupCache cache, StatsProvider... additionalStatsProviders ) { - super( NAME, null, config, 0 ); + super( NAME, null, config, RECYCLE_BATCHES ); add( new BatchFeedStep( control(), config, allInReversed( store, config ), store.getRecordSize() ) ); - add( new ReadRecordsStep<>( control(), config, false, store, null ) ); + add( new ReadRecordsStep<>( control(), config, false, store ) ); add( new CacheGroupsStep( control(), config, cache, additionalStatsProviders ) ); } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/SparseNodeFirstRelationshipStage.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/SparseNodeFirstRelationshipStage.java index 4d94bd7bb0edc..13a4d2b4dbd3b 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/SparseNodeFirstRelationshipStage.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/SparseNodeFirstRelationshipStage.java @@ -28,6 +28,7 @@ import org.neo4j.unsafe.impl.batchimport.store.StorePrepareIdSequence; import static org.neo4j.unsafe.impl.batchimport.staging.Step.ORDER_SEND_DOWNSTREAM; +import static org.neo4j.unsafe.impl.batchimport.staging.Step.RECYCLE_BATCHES; /** * Updates sparse {@link NodeRecord node records} with relationship heads after relationship linking. Steps: @@ -46,9 +47,9 @@ public class SparseNodeFirstRelationshipStage extends Stage public SparseNodeFirstRelationshipStage( Configuration config, NodeStore nodeStore, NodeRelationshipCache cache ) { - super( NAME, null, config, ORDER_SEND_DOWNSTREAM ); + super( NAME, null, config, ORDER_SEND_DOWNSTREAM | RECYCLE_BATCHES ); add( new ReadNodeIdsByCacheStep( control(), config, cache, NodeType.NODE_TYPE_SPARSE ) ); - add( new ReadRecordsStep<>( control(), config, true, nodeStore, null ) ); + add( new ReadRecordsStep<>( control(), config, true, nodeStore ) ); add( new RecordProcessorStep<>( control(), "LINK", config, new SparseNodeFirstRelationshipProcessor( cache ), false ) ); add( new UpdateRecordsStep<>( control(), config, nodeStore, new StorePrepareIdSequence() ) ); diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/AbstractStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/AbstractStep.java index ea634b6b38154..74f8053f70bb3 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/AbstractStep.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/AbstractStep.java @@ -48,7 +48,7 @@ public abstract class AbstractStep implements Step { public static final ParkStrategy PARK = new ParkStrategy.Park( IS_OS_WINDOWS ? 10_000 : 500, MICROSECONDS ); - private final StageControl control; + protected final StageControl control; private volatile String name; @SuppressWarnings( "rawtypes" ) protected volatile Step downstream; diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ForkedProcessorStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ForkedProcessorStep.java index ab8d0fefa6a21..809464449886a 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ForkedProcessorStep.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ForkedProcessorStep.java @@ -154,7 +154,6 @@ public long receive( long ticket, T batch ) protected abstract void forkedProcess( int id, int processors, T batch ) throws Throwable; - @SuppressWarnings( "unchecked" ) void sendDownstream( Unit unit ) { downstreamIdleTime.add( downstream.receive( unit.ticket, unit.batch ) ); @@ -234,6 +233,10 @@ public void run() { sendDownstream( candidate ); } + else + { + control.recycle( candidate.batch ); + } current = candidate; tail.set( current ); queuedBatches.decrementAndGet(); diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ProcessorStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ProcessorStep.java index 4e60c9d877fbb..d24d551777e3f 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ProcessorStep.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ProcessorStep.java @@ -86,6 +86,7 @@ public long receive( final long ticket, final T batch ) // No batches were emitted so we couldn't track done batches in that way. // We can see that we're the last step so increment here instead doneBatches.incrementAndGet(); + control.recycle( batch ); } totalProcessingTime.add( nanoTime() - startTime - sender.sendTime ); diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ReadRecordsStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ReadRecordsStep.java index 408ae4468447e..a38616687fdf1 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ReadRecordsStep.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ReadRecordsStep.java @@ -19,15 +19,10 @@ */ package org.neo4j.unsafe.impl.batchimport.staging; -import java.lang.reflect.Array; -import java.util.Arrays; -import java.util.function.Predicate; - import org.neo4j.collection.primitive.PrimitiveLongIterator; import org.neo4j.io.pagecache.PageCursor; import org.neo4j.kernel.impl.store.RecordCursor; import org.neo4j.kernel.impl.store.RecordStore; -import org.neo4j.kernel.impl.store.id.validation.IdValidator; import org.neo4j.kernel.impl.store.record.AbstractBaseRecord; import org.neo4j.kernel.impl.store.record.RecordLoad; import org.neo4j.unsafe.impl.batchimport.Configuration; @@ -42,18 +37,20 @@ public class ReadRecordsStep extends ProcessorStep { private final RecordStore store; - private final Class klass; - private final Predicate filter; private final int batchSize; + private final RecordDataAssembler assembler; + + public ReadRecordsStep( StageControl control, Configuration config, boolean inRecordWritingStage, RecordStore store ) + { + this( control, config, inRecordWritingStage, store, new RecordDataAssembler<>( store::newRecord, record -> true ) ); + } - @SuppressWarnings( "unchecked" ) public ReadRecordsStep( StageControl control, Configuration config, boolean inRecordWritingStage, - RecordStore store, Predicate filter ) + RecordStore store, RecordDataAssembler converter ) { super( control, ">", config, parallelReading( config, inRecordWritingStage ) ? 0 : 1 ); this.store = store; - this.filter = filter; - this.klass = (Class) store.newRecord().getClass(); + this.assembler = converter; this.batchSize = config.batchSize(); } @@ -78,17 +75,18 @@ protected void process( PrimitiveLongIterator idRange, BatchSender sender ) thro } long id = idRange.next(); - RECORD record = store.newRecord(); - RECORD[] batch = (RECORD[]) Array.newInstance( klass, batchSize ); + RECORD[] batch = control.reuse( () -> assembler.newBatchObject( batchSize ) ); int i = 0; - try ( RecordCursor cursor = store.newRecordCursor( record ).acquire( id, RecordLoad.CHECK ) ) + // Just use the first record in the batch here to satisfy the record cursor. + // The truth is that we'll be using the read method which accepts an external record anyway so it doesn't matter. + try ( RecordCursor cursor = store.newRecordCursor( batch[0] ).acquire( id, RecordLoad.CHECK ) ) { boolean hasNext = true; while ( hasNext ) { - if ( cursor.next( id ) && !IdValidator.isReservedId( record.getId() ) && (filter == null || filter.test( record )) ) + if ( assembler.append( cursor, batch, id, i ) ) { - batch[i++] = (RECORD) record.clone(); + i++; } if ( hasNext = idRange.hasNext() ) { @@ -97,6 +95,6 @@ protected void process( PrimitiveLongIterator idRange, BatchSender sender ) thro } } - sender.send( i == batchSize ? batch : Arrays.copyOf( batch, i ) ); + sender.send( assembler.cutOffAt( batch, i ) ); } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/RecordDataAssembler.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/RecordDataAssembler.java new file mode 100644 index 0000000000000..b41d5e9d45ee9 --- /dev/null +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/RecordDataAssembler.java @@ -0,0 +1,73 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.unsafe.impl.batchimport.staging; + +import java.lang.reflect.Array; +import java.util.function.Predicate; +import java.util.function.Supplier; + +import org.neo4j.kernel.impl.store.RecordCursor; +import org.neo4j.kernel.impl.store.record.AbstractBaseRecord; +import org.neo4j.kernel.impl.store.record.RecordLoad; + +public class RecordDataAssembler +{ + private final Supplier factory; + private final Class klass; + private final Predicate filter; + + @SuppressWarnings( "unchecked" ) + public RecordDataAssembler( Supplier factory, Predicate filter ) + { + this.factory = factory; + this.filter = filter; + this.klass = (Class) factory.get().getClass(); + } + + @SuppressWarnings( "unchecked" ) + public RECORD[] newBatchObject( int batchSize ) + { + Object array = Array.newInstance( klass, batchSize ); + for ( int i = 0; i < batchSize; i++ ) + { + Array.set( array, i, factory.get() ); + } + return (RECORD[]) array; + } + + public boolean append( RecordCursor cursor, RECORD[] array, long id, int index ) + { + RECORD record = array[index]; + if ( !cursor.next( id, record, RecordLoad.CHECK ) || (filter != null && !filter.test( record )) ) + { + return false; + } + return true; + } + + public RECORD[] cutOffAt( RECORD[] array, int length ) + { + for ( int i = length; i < array.length; i++ ) + { + array[i].clear(); + } + return array; + } +} diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/Stage.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/Stage.java index 38ad818e22e7a..1a690bd558bea 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/Stage.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/Stage.java @@ -92,9 +92,16 @@ public void close() } } } + execution.close(); if ( exception != null ) { throw launderedException( exception ); } } + + @Override + public String toString() + { + return execution.getStageName(); + } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/StageControl.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/StageControl.java index 9f66ff4f34991..7540e30c1bec3 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/StageControl.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/StageControl.java @@ -19,6 +19,8 @@ */ package org.neo4j.unsafe.impl.batchimport.staging; +import java.util.function.Supplier; + /** * Represents a means to control and coordinate lifecycle matters about a {@link Stage} and all its * {@link Step steps}. @@ -28,4 +30,8 @@ public interface StageControl void panic( Throwable cause ); void assertHealthy(); + + void recycle( Object batch ); + + T reuse( Supplier fallback ); } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/StageExecution.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/StageExecution.java index 0d1de77262bce..ec429ed2dd20d 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/StageExecution.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/StageExecution.java @@ -24,6 +24,8 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.function.Supplier; import org.neo4j.helpers.collection.Pair; import org.neo4j.helpers.collection.PrefetchingIterator; @@ -37,7 +39,7 @@ /** * Default implementation of {@link StageControl} */ -public class StageExecution implements StageControl +public class StageExecution implements StageControl, AutoCloseable { private final String stageName; private final String part; @@ -46,6 +48,8 @@ public class StageExecution implements StageControl private long startTime; private final int orderingGuarantees; private volatile Throwable panic; + private final boolean shouldRecycle; + private final ConcurrentLinkedQueue recycled; public StageExecution( String stageName, String part, Configuration config, Collection> pipeline, int orderingGuarantees ) @@ -55,6 +59,8 @@ public StageExecution( String stageName, String part, Configuration config, Coll this.config = config; this.pipeline = pipeline; this.orderingGuarantees = orderingGuarantees; + this.shouldRecycle = (orderingGuarantees & Step.RECYCLE_BATCHES) != 0; + this.recycled = shouldRecycle ? new ConcurrentLinkedQueue<>() : null; } public boolean stillExecuting() @@ -190,4 +196,38 @@ public String toString() { return getClass().getSimpleName() + "[" + name() + "]"; } + + @Override + public void recycle( Object batch ) + { + if ( shouldRecycle ) + { + recycled.offer( batch ); + } + } + + @Override + public T reuse( Supplier fallback ) + { + if ( shouldRecycle ) + { + @SuppressWarnings( "unchecked" ) + T result = (T) recycled.poll(); + if ( result != null ) + { + return result; + } + } + + return fallback.get(); + } + + @Override + public void close() + { + if ( shouldRecycle ) + { + recycled.clear(); + } + } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/Step.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/Step.java index d8352f113a958..fdfa191b66d15 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/Step.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/Step.java @@ -42,6 +42,7 @@ public interface Step extends Parallelizable, AutoCloseable, Panicable * Whether or not tickets arrive in {@link #receive(long, Object)} ordered by ticket number. */ int ORDER_SEND_DOWNSTREAM = 0x1; + int RECYCLE_BATCHES = 0x2; /** * Starts the processing in this step, such that calls to {@link #receive(long, Object)} can be accepted. diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/RelationshipCountsProcessorTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/RelationshipCountsProcessorTest.java index fd796be788795..9cda3c32f1340 100644 --- a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/RelationshipCountsProcessorTest.java +++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/RelationshipCountsProcessorTest.java @@ -23,6 +23,7 @@ import org.mockito.ArgumentMatcher; import org.neo4j.kernel.impl.api.CountsAccessor; +import org.neo4j.kernel.impl.store.record.RelationshipRecord; import org.neo4j.unsafe.impl.batchimport.cache.NodeLabelsCache; import org.neo4j.unsafe.impl.batchimport.cache.NumberArrayFactory; @@ -39,8 +40,8 @@ public class RelationshipCountsProcessorTest { private static final int ANY = -1; - private NodeLabelsCache nodeLabelCache = mock( NodeLabelsCache.class ); - private CountsAccessor.Updater countsUpdater = mock( CountsAccessor.Updater.class ); + private final NodeLabelsCache nodeLabelCache = mock( NodeLabelsCache.class ); + private final CountsAccessor.Updater countsUpdater = mock( CountsAccessor.Updater.class ); @Test public void shouldHandleBigNumberOfLabelsAndRelationshipTypes() throws Exception @@ -89,8 +90,8 @@ public void testRelationshipCountersUpdates() RelationshipCountsProcessor countsProcessor = new RelationshipCountsProcessor( nodeLabelCache, labels, relationTypes, countsUpdater, NumberArrayFactory.AUTO_WITHOUT_PAGECACHE ); - countsProcessor.process( 1, 0, 3 ); - countsProcessor.process( 2, 1, 4 ); + countsProcessor.process( record( 1, 0, 3 ) ); + countsProcessor.process( record( 2, 1, 4 ) ); countsProcessor.done(); @@ -116,4 +117,14 @@ public boolean matches( Long argument ) return argument != null && argument >= 0; } } + + private RelationshipRecord record( long startNode, int type, long endNode ) + { + RelationshipRecord record = new RelationshipRecord( 0 ); + record.setInUse( true ); + record.setFirstNode( startNode ); + record.setSecondNode( endNode ); + record.setType( type ); + return record; + } } diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/SimpleStageControl.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/SimpleStageControl.java index dcbe5253088b0..f086020e53d12 100644 --- a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/SimpleStageControl.java +++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/SimpleStageControl.java @@ -19,6 +19,8 @@ */ package org.neo4j.unsafe.impl.batchimport.staging; +import java.util.function.Supplier; + import org.neo4j.helpers.Exceptions; /** @@ -54,4 +56,15 @@ public void assertHealthy() throw Exceptions.launderedException( panic ); } } + + @Override + public void recycle( Object batch ) + { + } + + @Override + public T reuse( Supplier fallback ) + { + return fallback.get(); + } }