diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/CalculateDenseNodesStage.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/CalculateDenseNodesStage.java deleted file mode 100644 index 89011def17d97..0000000000000 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/CalculateDenseNodesStage.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Copyright (c) 2002-2017 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ -package org.neo4j.unsafe.impl.batchimport; - -import java.io.IOException; - -import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache; -import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdMapper; -import org.neo4j.unsafe.impl.batchimport.input.Collector; -import org.neo4j.unsafe.impl.batchimport.input.Input; -import org.neo4j.unsafe.impl.batchimport.input.InputCache; -import org.neo4j.unsafe.impl.batchimport.input.InputRelationship; -import org.neo4j.unsafe.impl.batchimport.staging.Stage; -import org.neo4j.unsafe.impl.batchimport.store.BatchingNeoStores; - -import static org.neo4j.unsafe.impl.batchimport.input.InputCache.MAIN; - -/** - * Counts number of relationships per node that is going to be imported by {@link RelationshipStage} later. - * Dense node threshold is calculated based on these counts, so that correct relationship representation can be written - * per node. Steps: - * - *
    - *
  1. {@link InputIteratorBatcherStep} reading from {@link InputIterator} produced from {@link Input#relationships()}. - *
  2. - *
  3. {@link InputEntityCacherStep} alternatively {@link InputCache caches} this input data - * (all the {@link InputRelationship input relationships}) if the iterator doesn't support - * {@link InputIterable#supportsMultiplePasses() multiple passes}.
  4. - *
  5. {@link RelationshipTypeCheckerStep} keeps track of all different types of all - * {@link InputRelationship input relationships} so that the upcoming relationship import knows which - * types to import, i.e. how to split the import.
  6. - *
  7. {@link RelationshipPreparationStep} looks up {@link InputRelationship#startNode() start node input id} / - * {@link InputRelationship#endNode() end node input id} from {@link IdMapper} and attaches to the batches going - * through because that lookup is costly and this step can be parallelized.
  8. - *
  9. {@link CalculateRelationshipsStep} simply counts the input relationships going through and in the - * end sets that count as high id in relationship store, this to more predictably create secondary record units - * for those records that require it.
  10. - *
  11. For each node id {@link NodeRelationshipCache#incrementCount(long) updates the node->relationship cache} - * so that in the end we will know how many relationships each node in the import will have and hence also - * which nodes will have a dense representation in the store.
  12. - *
- */ -public class CalculateDenseNodesStage extends Stage -{ - private RelationshipTypeCheckerStep typer; - - public CalculateDenseNodesStage( Configuration config, InputIterable relationships, - NodeRelationshipCache cache, IdMapper idMapper, - Collector badCollector, InputCache inputCache, - BatchingNeoStores neoStores ) throws IOException - { - super( "Calculate dense nodes", config ); - add( new InputIteratorBatcherStep<>( control(), config, - relationships.iterator(), InputRelationship.class, t -> true ) ); - if ( !relationships.supportsMultiplePasses() ) - { - add( new InputEntityCacherStep<>( control(), config, inputCache.cacheRelationships( MAIN ) ) ); - } - add( typer = new RelationshipTypeCheckerStep( control(), config, neoStores.getRelationshipTypeRepository() ) ); - add( new RelationshipPreparationStep( control(), config, idMapper ) ); - add( new CalculateRelationshipsStep( control(), config, neoStores.getRelationshipStore() ) ); - add( new CalculateDenseNodesStep( control(), config, cache, badCollector ) ); - } - - public RelationshipTypeDistribution getDistribution() - { - return typer.getDistribution(); - } -} diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/CalculateRelationshipsStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/CalculateRelationshipsStep.java deleted file mode 100644 index fcddb5f46da9c..0000000000000 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/CalculateRelationshipsStep.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Copyright (c) 2002-2017 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ -package org.neo4j.unsafe.impl.batchimport; - -import org.neo4j.kernel.impl.store.RelationshipStore; -import org.neo4j.kernel.impl.store.record.RelationshipRecord; -import org.neo4j.unsafe.impl.batchimport.input.InputRelationship; -import org.neo4j.unsafe.impl.batchimport.staging.BatchSender; -import org.neo4j.unsafe.impl.batchimport.staging.ProcessorStep; -import org.neo4j.unsafe.impl.batchimport.staging.StageControl; - -/** - * Keeps track of number of relationships to import, this to set highId in relationship store before import. - * This is because of the way double-unit records works, so the secondary units will end up beyond this limit. - */ -public class CalculateRelationshipsStep extends ProcessorStep> -{ - private final RelationshipStore relationshipStore; - private long numberOfRelationships; - - public CalculateRelationshipsStep( StageControl control, Configuration config, RelationshipStore relationshipStore ) - { - super( control, "RelationshipCalculator", config, 1 ); - this.relationshipStore = relationshipStore; - } - - @Override - protected void process( Batch batch, BatchSender sender ) throws Throwable - { - numberOfRelationships += batch.input.length; - sender.send( batch ); - } - - @Override - protected void done() - { - long highestId = relationshipStore.getHighId() + numberOfRelationships; - relationshipStore.setHighestPossibleIdInUse( highestId ); - super.done(); - } -} diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/CountGroupsStage.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/CountGroupsStage.java index 3bff6fea33678..6a814ca2a6014 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/CountGroupsStage.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/CountGroupsStage.java @@ -44,7 +44,7 @@ public CountGroupsStage( Configuration config, RecordStore( control(), config, false, store ) ); + add( new ReadRecordsStep<>( control(), config, false, store, null ) ); add( new CountGroupsStep( control(), config, groupCache ) ); } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeCountsStage.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeCountsStage.java index 8e448dcbc6aad..088958e21be73 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeCountsStage.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeCountsStage.java @@ -40,7 +40,7 @@ public NodeCountsStage( Configuration config, NodeLabelsCache cache, NodeStore n { super( "Node counts", config ); add( new BatchFeedStep( control(), config, allIn( nodeStore, config ), nodeStore.getRecordSize() ) ); - add( new ReadRecordsStep<>( control(), config, false, nodeStore ) ); + add( new ReadRecordsStep<>( control(), config, false, nodeStore, null ) ); add( new RecordProcessorStep<>( control(), "COUNT", config, new NodeCountsProcessor( nodeStore, cache, highLabelId, countsUpdater ), true, additionalStatsProviders ) ); } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeFirstGroupStage.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeFirstGroupStage.java index 7b6f32b39e63a..2a986c811db65 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeFirstGroupStage.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeFirstGroupStage.java @@ -40,7 +40,7 @@ public NodeFirstGroupStage( Configuration config, RecordStore Group", config ); add( new BatchFeedStep( control(), config, allIn( groupStore, config ), groupStore.getRecordSize() ) ); - add( new ReadRecordsStep<>( control(), config, true, groupStore ) ); + add( new ReadRecordsStep<>( control(), config, true, groupStore, null ) ); add( new NodeSetFirstGroupStep( control(), config, nodeStore, cache ) ); add( new UpdateRecordsStep<>( control(), config, nodeStore ) ); } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ParallelBatchImporter.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ParallelBatchImporter.java index 2326dd9412fc3..025876b92df1e 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ParallelBatchImporter.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ParallelBatchImporter.java @@ -25,6 +25,8 @@ import java.util.Iterator; import java.util.function.Predicate; +import org.neo4j.collection.primitive.Primitive; +import org.neo4j.collection.primitive.PrimitiveIntSet; import org.neo4j.collection.primitive.PrimitiveLongIterator; import org.neo4j.helpers.Exceptions; import org.neo4j.helpers.Format; @@ -38,6 +40,7 @@ import org.neo4j.kernel.impl.store.RelationshipStore; import org.neo4j.kernel.impl.store.format.RecordFormatSelector; import org.neo4j.kernel.impl.store.format.RecordFormats; +import org.neo4j.kernel.impl.store.record.RelationshipRecord; import org.neo4j.logging.Log; import org.neo4j.logging.NullLogProvider; import org.neo4j.unsafe.impl.batchimport.cache.GatheringMemoryStatsVisitor; @@ -57,6 +60,7 @@ import org.neo4j.unsafe.impl.batchimport.staging.Stage; import org.neo4j.unsafe.impl.batchimport.stats.StatsProvider; import org.neo4j.unsafe.impl.batchimport.store.BatchingNeoStores; +import org.neo4j.unsafe.impl.batchimport.store.BatchingTokenRepository.BatchingRelationshipTypeTokenRepository; import org.neo4j.unsafe.impl.batchimport.store.io.IoMonitor; import static java.lang.Long.max; @@ -174,8 +178,6 @@ public void doImport( Input input ) throws IOException InputIterable nodes = input.nodes(); InputIterable relationships = input.relationships(); InputIterable cachedNodes = cachedForSure( nodes, inputCache.nodes( MAIN, true ) ); - InputIterable cachedRelationships = - cachedForSure( relationships, inputCache.relationships( MAIN, false ) ); RelationshipStore relationshipStore = neoStore.getRelationshipStore(); @@ -198,17 +200,17 @@ public void doImport( Input input ) throws IOException } } - // Stage 2 -- calculate dense node threshold Configuration relationshipConfig = configWithRecordsPerPageBasedBatchSize( config, neoStore.getNodeStore() ); - CalculateDenseNodesStage calculateDenseNodesStage = new CalculateDenseNodesStage( - relationshipConfig, - relationships, nodeRelationshipCache, idMapper, badCollector, inputCache, neoStore ); - executeStage( calculateDenseNodesStage ); + RelationshipStage unlinkedRelationshipStage = + new RelationshipStage( relationshipConfig, writeMonitor, relationships, idMapper, + badCollector, inputCache, nodeRelationshipCache, neoStore, storeUpdateMonitor ); + neoStore.startFlushingPageCache(); + executeStage( unlinkedRelationshipStage ); + neoStore.stopFlushingPageCache(); long availableMemory = maxMemory - totalMemoryUsageOf( nodeRelationshipCache, idMapper ); - importRelationships( nodeRelationshipCache, storeUpdateMonitor, neoStore, writeMonitor, - idMapper, cachedRelationships, calculateDenseNodesStage.getDistribution(), + linkRelationships( nodeRelationshipCache, neoStore, unlinkedRelationshipStage.getDistribution(), availableMemory ); // Release this potentially really big piece of cached data @@ -283,10 +285,9 @@ private long totalMemoryUsageOf( MemoryStatsVisitor.Visitable... users ) return total.getHeapUsage() + total.getOffHeapUsage(); } - private void importRelationships( NodeRelationshipCache nodeRelationshipCache, - CountingStoreUpdateMonitor storeUpdateMonitor, BatchingNeoStores neoStore, - IoMonitor writeMonitor, IdMapper idMapper, InputIterable relationships, - RelationshipTypeDistribution typeDistribution, long freeMemoryForDenseNodeCache ) + private void linkRelationships( NodeRelationshipCache nodeRelationshipCache, + BatchingNeoStores neoStore, RelationshipTypeDistribution typeDistribution, + long freeMemoryForDenseNodeCache ) { // Imports the relationships from the Input. This isn't a straight forward as importing nodes, // since keeping track of and updating heads of relationship chains in scenarios where most nodes @@ -300,7 +301,6 @@ private void importRelationships( NodeRelationshipCache nodeRelationshipCache, // finally there will be one Node --> Relationship and Relationship --> Relationship stage linking // all sparse relationship chains together. - long nextRelationshipId = 0; Configuration relationshipConfig = configWithRecordsPerPageBasedBatchSize( config, neoStore.getRelationshipStore() ); Configuration nodeConfig = configWithRecordsPerPageBasedBatchSize( config, neoStore.getNodeStore() ); @@ -309,7 +309,7 @@ private void importRelationships( NodeRelationshipCache nodeRelationshipCache, Configuration groupConfig = configWithRecordsPerPageBasedBatchSize( config, neoStore.getRelationshipGroupStore() ); - // Do multiple rounds of relationship importing. Each round fits as many relationship types + // Do multiple rounds of relationship linking. Each round fits as many relationship types // as it can (comparing with worst-case memory usage and available memory). int typesImported = 0; int round = 0; @@ -318,57 +318,64 @@ private void importRelationships( NodeRelationshipCache nodeRelationshipCache, // Figure out which types we can fit in node-->relationship cache memory. // Types go from biggest to smallest group and so towards the end there will be // smaller and more groups per round in this loop - Collection typesToImportThisRound = rounds.next(); - boolean thisIsTheOnlyRound = round == 0 && !rounds.hasNext(); + Collection typesToLinkThisRound = rounds.next(); + boolean thisIsTheFirstRound = round == 0; + boolean thisIsTheOnlyRound = thisIsTheFirstRound && !rounds.hasNext(); - // Import relationships and their properties nodeRelationshipCache.setForwardScan( true, true/*dense*/ ); - String range = typesToImportThisRound.size() == 1 + String range = typesToLinkThisRound.size() == 1 ? String.valueOf( typesImported + 1 ) - : (typesImported + 1) + "-" + (typesImported + typesToImportThisRound.size()); + : (typesImported + 1) + "-" + (typesImported + typesToLinkThisRound.size()); String topic = " " + range + "/" + typeDistribution.getNumberOfRelationshipTypes(); - Predicate typeFilter = thisIsTheOnlyRound - ? relationship -> true // optimization when all rels are imported in this round - : relationship -> typesToImportThisRound.contains( relationship.typeAsObject() ); - RelationshipStage relationshipStage = new RelationshipStage( topic, config, - writeMonitor, typeFilter, relationships.iterator(), idMapper, neoStore, - nodeRelationshipCache, storeUpdateMonitor, nextRelationshipId ); - neoStore.startFlushingPageCache(); - executeStage( relationshipStage ); - neoStore.stopFlushingPageCache(); + int nodeTypes = thisIsTheFirstRound ? NodeType.NODE_TYPE_ALL : NodeType.NODE_TYPE_DENSE; + Predicate readFilter = thisIsTheFirstRound + ? null // optimization when all rels are imported in this round + : typeIdFilter( typesToLinkThisRound, neoStore.getRelationshipTypeRepository() ); + Predicate denseChangeFilter = thisIsTheOnlyRound + ? null // optimization when all rels are imported in this round + : typeIdFilter( typesToLinkThisRound, neoStore.getRelationshipTypeRepository() ); - int nodeTypes = thisIsTheOnlyRound ? NodeType.NODE_TYPE_ALL : NodeType.NODE_TYPE_DENSE; + // LINK Forward + RelationshipLinkforwardStage linkForwardStage = new RelationshipLinkforwardStage( topic, relationshipConfig, + neoStore.getRelationshipStore(), nodeRelationshipCache, readFilter, denseChangeFilter, nodeTypes ); + executeStage( linkForwardStage ); // Write relationship groups cached from the relationship import above executeStage( new RelationshipGroupStage( topic, groupConfig, neoStore.getTemporaryRelationshipGroupStore(), nodeRelationshipCache ) ); - // Set node nextRel fields - executeStage( new SparseNodeFirstRelationshipStage( topic, nodeConfig, neoStore.getNodeStore(), - nodeRelationshipCache ) ); + if ( thisIsTheFirstRound ) + { + // Set node nextRel fields for sparse nodes + executeStage( new SparseNodeFirstRelationshipStage( nodeConfig, neoStore.getNodeStore(), + nodeRelationshipCache ) ); + } - // Link relationship chains together for dense nodes + // LINK backward nodeRelationshipCache.setForwardScan( false, true/*dense*/ ); - executeStage( new RelationshipLinkbackStage( topic, - relationshipConfig, - neoStore.getRelationshipStore(), - nodeRelationshipCache, nextRelationshipId, - relationshipStage.getNextRelationshipId(), nodeTypes ) ); - nextRelationshipId = relationshipStage.getNextRelationshipId(); - typesImported += typesToImportThisRound.size(); + executeStage( new RelationshipLinkbackStage( topic, relationshipConfig, neoStore.getRelationshipStore(), + nodeRelationshipCache, readFilter, denseChangeFilter, nodeTypes ) ); + typesImported += typesToLinkThisRound.size(); } + } - // There's an optimization above which will piggy-back sparse linking on the dense linking - // if all relationships are imported in one round. The sparse linking below will be done if - // there were multiple passes of dense linking above. - if ( round > 1 ) + private static Predicate typeIdFilter( Collection typesToLinkThisRound, + BatchingRelationshipTypeTokenRepository relationshipTypeRepository ) + { + PrimitiveIntSet set = Primitive.intSet( typesToLinkThisRound.size() ); + for ( Object type : typesToLinkThisRound ) { - // Link relationship chains together for sparse nodes - nodeRelationshipCache.setForwardScan( false, false/*sparse*/ ); - executeStage( new RelationshipLinkbackStage( " Sparse", relationshipConfig, - neoStore.getRelationshipStore(), nodeRelationshipCache, 0, nextRelationshipId, - NodeType.NODE_TYPE_SPARSE ) ); + int id; + if ( type instanceof Number ) + { + id = ((Number) type).intValue(); + } + else + { + id = relationshipTypeRepository.applyAsInt( type ); + } + set.add( id ); } - // else we did in the single round above to avoid doing another pass + return relationship -> set.contains( relationship.getType() ); } private static Configuration configWithRecordsPerPageBasedBatchSize( Configuration source, RecordStore store ) diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipCountsStage.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipCountsStage.java index 4371430ccf00d..fdee421831d34 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipCountsStage.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipCountsStage.java @@ -43,7 +43,7 @@ public RelationshipCountsStage( Configuration config, NodeLabelsCache cache, super( "Relationship counts", config ); add( new BatchFeedStep( control(), config, allIn( relationshipStore, config ), relationshipStore.getRecordSize() ) ); - add( new ReadRecordsStep<>( control(), config, false, relationshipStore ) ); + add( new ReadRecordsStep<>( control(), config, false, relationshipStore, null ) ); add( new ProcessRelationshipCountsDataStep( control(), cache, config, highLabelId, highRelationshipTypeId, countsUpdater, cacheFactory ) ); } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkStep.java new file mode 100644 index 0000000000000..b6a9be20f1a6b --- /dev/null +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkStep.java @@ -0,0 +1,148 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.unsafe.impl.batchimport; + +import java.util.function.Predicate; + +import org.neo4j.kernel.impl.store.record.RelationshipRecord; +import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache; +import org.neo4j.unsafe.impl.batchimport.cache.NodeType; +import org.neo4j.unsafe.impl.batchimport.staging.ForkedProcessorStep; +import org.neo4j.unsafe.impl.batchimport.staging.StageControl; + +/** + * Links relationship chains together, the "prev" pointers of them. "next" pointers are set when + * initially creating the relationship records. Setting prev pointers at that time would incur + * random access and so that is done here separately with help from {@link NodeRelationshipCache}. + */ +public abstract class RelationshipLinkStep extends ForkedProcessorStep +{ + protected final NodeRelationshipCache cache; + private final int nodeTypes; + private final Predicate filter; + private final boolean forwards; + + public RelationshipLinkStep( StageControl control, Configuration config, + NodeRelationshipCache cache, Predicate filter, int nodeTypes, boolean forwards ) + { + super( control, "LINK", config, 0 ); + this.cache = cache; + this.filter = filter; + this.nodeTypes = nodeTypes; + this.forwards = forwards; + } + + @Override + protected void forkedProcess( int id, int processors, RelationshipRecord[] batch ) + { + int stride = forwards ? 1 : -1; + int start = forwards ? 0 : batch.length - 1; + int end = forwards ? batch.length : -1; + + for ( int i = start; i != end; i += stride ) + { + RelationshipRecord item = batch[i]; + if ( item != null && item.inUse() ) + { + if ( !process( item, id, processors ) ) + { + // No change for this record, it's OK, all the processors will reach the same conclusion + batch[i] = null; + } + } + } + } + + public boolean process( RelationshipRecord record, int id, int processors ) + { + long startNode = record.getFirstNode(); + long endNode = record.getSecondNode(); + boolean processFirst = startNode % processors == id; + boolean processSecond = endNode % processors == id; + if ( !processFirst && !processSecond ) + { + // We won't process this relationship, but we cannot return false because that means + // that it won't even be updated. Arriving here merely means that this thread won't process + // this record at all and so we won't even have to ask cache about dense or not (which is costly) + return true; + } + + boolean firstIsDense = cache.isDense( startNode ); + boolean changed = false; + boolean isLoop = startNode == endNode; + if ( isLoop ) + { + // Both start/end node + if ( shouldChange( firstIsDense, record ) ) + { + if ( processFirst ) + { + linkLoop( record ); + } + changed = true; + } + } + else + { + // Start node + if ( shouldChange( firstIsDense, record ) ) + { + if ( processFirst ) + { + linkStart( record ); + } + changed = true; + } + + // End node + boolean secondIsDense = cache.isDense( endNode ); + if ( shouldChange( secondIsDense, record ) ) + { + if ( processSecond ) + { + linkEnd( record ); + } + changed = true; + } + } + + return changed; + } + + protected abstract void linkStart( RelationshipRecord record ); + + protected abstract void linkEnd( RelationshipRecord record ); + + protected abstract void linkLoop( RelationshipRecord record ); + + private boolean shouldChange( boolean isDense, RelationshipRecord record ) + { + if ( !NodeType.matchesDense( nodeTypes, isDense ) ) + { + return false; + } + // Here we have a special case where we want to filter on type, but only for dense nodes + if ( isDense && filter != null && !filter.test( record ) ) + { + return false; + } + return true; + } +} diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkbackStage.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkbackStage.java index a6d374d43d3f3..4dec2b353d73e 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkbackStage.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkbackStage.java @@ -19,6 +19,8 @@ */ package org.neo4j.unsafe.impl.batchimport; +import java.util.function.Predicate; + import org.neo4j.kernel.impl.store.RelationshipStore; import org.neo4j.kernel.impl.store.record.RelationshipRecord; import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache; @@ -47,13 +49,13 @@ public class RelationshipLinkbackStage extends Stage { public RelationshipLinkbackStage( String topic, Configuration config, RelationshipStore store, - NodeRelationshipCache cache, long lowRelationshipId, long highRelationshipId, int nodeTypes ) + NodeRelationshipCache cache, Predicate readFilter, + Predicate changeFilter, int nodeTypes ) { - super( "Relationship --> Relationship" + topic, config ); - add( new BatchFeedStep( control(), config, backwards( lowRelationshipId, highRelationshipId, config ), - store.getRecordSize()) ); - add( new ReadRecordsStep<>( control(), config, true, store ) ); - add( new RelationshipLinkbackStep( control(), config, cache, nodeTypes ) ); + super( "Relationship <-- Relationship" + topic, config ); + add( new BatchFeedStep( control(), config, backwards( 0, store.getHighId(), config ), store.getRecordSize()) ); + add( new ReadRecordsStep<>( control(), config, true, store, readFilter ) ); + add( new RelationshipLinkbackStep( control(), config, cache, changeFilter, nodeTypes ) ); add( new UpdateRecordsStep<>( control(), config, store ) ); } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkbackStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkbackStep.java index a15d113787313..3bf9dc7d00f08 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkbackStep.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkbackStep.java @@ -19,11 +19,11 @@ */ package org.neo4j.unsafe.impl.batchimport; +import java.util.function.Predicate; + import org.neo4j.graphdb.Direction; import org.neo4j.kernel.impl.store.record.RelationshipRecord; import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache; -import org.neo4j.unsafe.impl.batchimport.cache.NodeType; -import org.neo4j.unsafe.impl.batchimport.staging.ForkedProcessorStep; import org.neo4j.unsafe.impl.batchimport.staging.StageControl; import static org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdMapper.ID_NOT_FOUND; @@ -33,110 +33,55 @@ * initially creating the relationship records. Setting prev pointers at that time would incur * random access and so that is done here separately with help from {@link NodeRelationshipCache}. */ -public class RelationshipLinkbackStep extends ForkedProcessorStep +public class RelationshipLinkbackStep extends RelationshipLinkStep { - private final NodeRelationshipCache cache; - private final int nodeTypes; - public RelationshipLinkbackStep( StageControl control, Configuration config, - NodeRelationshipCache cache, int nodeTypes ) + NodeRelationshipCache cache, Predicate filter, int nodeTypes ) { - super( control, "LINK", config, 0 ); - this.cache = cache; - this.nodeTypes = nodeTypes; + super( control, config, cache, filter, nodeTypes, false ); } @Override - protected void forkedProcess( int id, int processors, RelationshipRecord[] batch ) + protected void linkStart( RelationshipRecord record ) { - for ( int i = batch.length - 1; i >= 0; i-- ) - { - RelationshipRecord item = batch[i]; - if ( item != null && item.inUse() ) - { - if ( !process( item, id, processors ) ) - { - // No change for this record, it's OK, all the processors will reach the same conclusion - batch[i] = null; - } - } + int typeId = record.getType(); + long firstPrevRel = cache.getAndPutRelationship( record.getFirstNode(), + typeId, Direction.OUTGOING, record.getId(), false ); + if ( firstPrevRel == ID_NOT_FOUND ) + { // First one + record.setFirstInFirstChain( true ); + firstPrevRel = cache.getCount( record.getFirstNode(), typeId, Direction.OUTGOING ); } + record.setFirstPrevRel( firstPrevRel ); } - public boolean process( RelationshipRecord record, int id, int processors ) + @Override + protected void linkEnd( RelationshipRecord record ) { - boolean processFirst = record.getFirstNode() % processors == id; - boolean processSecond = record.getSecondNode() % processors == id; - if ( !processFirst && !processSecond ) - { - // We won't process this relationship, but we cannot return false because that means - // that it won't even be updated. Arriving here merely means that this thread won't process - // this record at all and so we won't even have to ask cache about dense or not (which is costly) - return true; - } - - boolean firstIsDense = cache.isDense( record.getFirstNode() ); - boolean changed = false; - boolean isLoop = record.getFirstNode() == record.getSecondNode(); int typeId = record.getType(); - if ( isLoop ) - { - if ( NodeType.matchesDense( nodeTypes, firstIsDense ) ) - { - if ( processFirst ) - { - long prevRel = cache.getAndPutRelationship( record.getFirstNode(), - typeId, Direction.BOTH, record.getId(), false ); - if ( prevRel == ID_NOT_FOUND ) - { // First one - record.setFirstInFirstChain( true ); - record.setFirstInSecondChain( true ); - prevRel = cache.getCount( record.getFirstNode(), typeId, Direction.BOTH ); - } - record.setFirstPrevRel( prevRel ); - record.setSecondPrevRel( prevRel ); - } - changed = true; - } + long secondPrevRel = cache.getAndPutRelationship( record.getSecondNode(), + typeId, Direction.INCOMING, record.getId(), false ); + if ( secondPrevRel == ID_NOT_FOUND ) + { // First one + record.setFirstInSecondChain( true ); + secondPrevRel = cache.getCount( record.getSecondNode(), typeId, Direction.INCOMING ); } - else - { - // Start node - if ( NodeType.matchesDense( nodeTypes, firstIsDense ) ) - { - if ( processFirst ) - { - long firstPrevRel = cache.getAndPutRelationship( record.getFirstNode(), - typeId, Direction.OUTGOING, record.getId(), false ); - if ( firstPrevRel == ID_NOT_FOUND ) - { // First one - record.setFirstInFirstChain( true ); - firstPrevRel = cache.getCount( record.getFirstNode(), typeId, Direction.OUTGOING ); - } - record.setFirstPrevRel( firstPrevRel ); - } - changed = true; - } + record.setSecondPrevRel( secondPrevRel ); + } - // End node - boolean secondIsDense = cache.isDense( record.getSecondNode() ); - if ( NodeType.matchesDense( nodeTypes, secondIsDense ) ) - { - if ( processSecond ) - { - long secondPrevRel = cache.getAndPutRelationship( record.getSecondNode(), - typeId, Direction.INCOMING, record.getId(), false ); - if ( secondPrevRel == ID_NOT_FOUND ) - { // First one - record.setFirstInSecondChain( true ); - secondPrevRel = cache.getCount( record.getSecondNode(), typeId, Direction.INCOMING ); - } - record.setSecondPrevRel( secondPrevRel ); - } - changed = true; - } + @Override + protected void linkLoop( RelationshipRecord record ) + { + int typeId = record.getType(); + long prevRel = cache.getAndPutRelationship( record.getFirstNode(), + typeId, Direction.BOTH, record.getId(), false ); + if ( prevRel == ID_NOT_FOUND ) + { // First one + record.setFirstInFirstChain( true ); + record.setFirstInSecondChain( true ); + prevRel = cache.getCount( record.getFirstNode(), typeId, Direction.BOTH ); } - - return changed; + record.setFirstPrevRel( prevRel ); + record.setSecondPrevRel( prevRel ); } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkforwardStage.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkforwardStage.java new file mode 100644 index 0000000000000..d147caf7fbb81 --- /dev/null +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkforwardStage.java @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.unsafe.impl.batchimport; + +import java.util.function.Predicate; + +import org.neo4j.kernel.impl.store.RelationshipStore; +import org.neo4j.kernel.impl.store.record.RelationshipRecord; +import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache; +import org.neo4j.unsafe.impl.batchimport.staging.BatchFeedStep; +import org.neo4j.unsafe.impl.batchimport.staging.ReadRecordsStep; +import org.neo4j.unsafe.impl.batchimport.staging.Stage; + +import static org.neo4j.unsafe.impl.batchimport.RecordIdIterator.forwards; +import static org.neo4j.unsafe.impl.batchimport.staging.Step.ORDER_SEND_DOWNSTREAM; + +public class RelationshipLinkforwardStage extends Stage +{ + public RelationshipLinkforwardStage( String topic, Configuration config, RelationshipStore store, + NodeRelationshipCache cache, Predicate readFilter, + Predicate denseChangeFilter, int nodeTypes ) + { + super( "Relationship --> Relationship " + topic, config, ORDER_SEND_DOWNSTREAM ); + add( new BatchFeedStep( control(), config, forwards( 0, store.getHighId(), config ), store.getRecordSize()) ); + add( new ReadRecordsStep<>( control(), config, true, store, readFilter ) ); + add( new RelationshipLinkforwardStep( control(), config, cache, denseChangeFilter, nodeTypes ) ); + add( new UpdateRecordsStep<>( control(), config, store ) ); + } +} diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkforwardStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkforwardStep.java new file mode 100644 index 0000000000000..61caaccb4ed7c --- /dev/null +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkforwardStep.java @@ -0,0 +1,63 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.unsafe.impl.batchimport; + +import java.util.function.Predicate; + +import org.neo4j.graphdb.Direction; +import org.neo4j.kernel.impl.store.record.RelationshipRecord; +import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache; +import org.neo4j.unsafe.impl.batchimport.staging.StageControl; + +import static org.neo4j.graphdb.Direction.BOTH; + +public class RelationshipLinkforwardStep extends RelationshipLinkStep +{ + public RelationshipLinkforwardStep( StageControl control, Configuration config, NodeRelationshipCache cache, + Predicate filter, int nodeTypes ) + { + super( control, config, cache, filter, nodeTypes, true ); + } + + @Override + protected void linkStart( RelationshipRecord record ) + { + long firstNextRel = cache.getAndPutRelationship( record.getFirstNode(), + record.getType(), Direction.OUTGOING, record.getId(), true ); + record.setFirstNextRel( firstNextRel ); + } + + @Override + protected void linkEnd( RelationshipRecord record ) + { + long secondNextRel = cache.getAndPutRelationship( record.getSecondNode(), + record.getType(), Direction.INCOMING, record.getId(), true ); + record.setSecondNextRel( secondNextRel ); + } + + @Override + protected void linkLoop( RelationshipRecord record ) + { + long firstNextRel = cache.getAndPutRelationship( + record.getFirstNode(), record.getType(), BOTH, record.getId(), true ); + record.setFirstNextRel( firstNextRel ); + record.setSecondNextRel( firstNextRel ); + } +} diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipStage.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipStage.java index ebcf707272e2e..c0c681072dc88 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipStage.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipStage.java @@ -19,84 +19,53 @@ */ package org.neo4j.unsafe.impl.batchimport; -import java.util.function.Predicate; +import java.io.IOException; -import org.neo4j.graphdb.Direction; import org.neo4j.kernel.impl.store.PropertyStore; import org.neo4j.kernel.impl.store.RelationshipStore; -import org.neo4j.kernel.impl.store.record.PropertyBlock; -import org.neo4j.kernel.impl.store.record.PropertyRecord; -import org.neo4j.kernel.impl.store.record.RelationshipGroupRecord; -import org.neo4j.kernel.impl.store.record.RelationshipRecord; import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache; import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdMapper; -import org.neo4j.unsafe.impl.batchimport.input.Input; -import org.neo4j.unsafe.impl.batchimport.input.InputNode; +import org.neo4j.unsafe.impl.batchimport.input.Collector; +import org.neo4j.unsafe.impl.batchimport.input.InputCache; import org.neo4j.unsafe.impl.batchimport.input.InputRelationship; import org.neo4j.unsafe.impl.batchimport.staging.Stage; import org.neo4j.unsafe.impl.batchimport.store.BatchingNeoStores; import org.neo4j.unsafe.impl.batchimport.store.io.IoMonitor; +import static org.neo4j.unsafe.impl.batchimport.input.InputCache.MAIN; import static org.neo4j.unsafe.impl.batchimport.staging.Step.ORDER_SEND_DOWNSTREAM; -/** - * Imports the initial part of relationships, namely the relationship record itself and its properties. - * Only the "next" pointers are set at this stage. The "prev" pointers are set in a - * {@link RelationshipLinkbackStage later stage} to avoid random store access. Steps: - * - *
    - *
  1. {@link InputIteratorBatcherStep} reading from {@link InputIterator} produced from {@link Input#relationships()}. - *
  2. - *
  3. {@link AssignRelationshipIdBatchStep} assigns record ids to batches. This to have one source of id allocation - * since there are later steps which can be multi-threaded.
  4. - *
  5. {@link RelationshipPreparationStep} looks up {@link InputRelationship#startNode() start node input id} / - * {@link InputRelationship#endNode() end node input id} from {@link IdMapper} and attaches to the batches going - * through because that lookup is costly and this step can be parallelized.
  6. - *
  7. {@link RelationshipRecordPreparationStep} creates {@link RelationshipRecord relationship record instances} - * and initializes them with default values suitable for import.
  8. - *
  9. {@link PropertyEncoderStep} encodes properties from {@link InputNode input nodes} into {@link PropertyBlock}, - * low level kernel encoded values.
  10. - *
  11. {@link RelationshipEncoderStep} sets "next" pointers in {@link RelationshipRecord} by getting id - * from {@link NodeRelationshipCache} based on node id and {@link Direction} and at the same time updating - * that cache entry to the id of the relationship. This forms the relationship chain linked lists.
  12. - *
  13. {@link EntityStoreUpdaterStep} forms {@link PropertyRecord property records} out of previously encoded - * {@link PropertyBlock} and writes those as well as the {@link RelationshipRecord} to store.
  14. - *
- * - * This stage can be run multiple times, once per relationship type and new relationships are being appended - * to the end of the store, that's why this stage accepts a relationship id to start at (firstRelationshipId). - * - * It is also to be said that the relationship type ids are imported descending, i.e. w/ the highest type id first - * down to the lowest last. This simply because all records (even {@link RelationshipGroupRecord relationship groups}) - * are appended to the end, only "next" pointers can be provided in new records in the face of the sequential-only - * I/O restriction and {@link RelationshipGroupRecord} chains must be in order of ascending type id. - */ public class RelationshipStage extends Stage { - private AssignRelationshipIdBatchStep idAssigner; + private RelationshipTypeCheckerStep typer; - public RelationshipStage( String topic, Configuration config, IoMonitor writeMonitor, - Predicate typeFilter, - InputIterator relationships, IdMapper idMapper, BatchingNeoStores neoStore, - NodeRelationshipCache cache, EntityStoreUpdaterStep.Monitor storeUpdateMonitor, - long firstRelationshipId ) + public RelationshipStage( Configuration config, IoMonitor writeMonitor, + InputIterable relationships, IdMapper idMapper, + Collector badCollector, InputCache inputCache, NodeRelationshipCache cache, + BatchingNeoStores neoStore, EntityStoreUpdaterStep.Monitor storeUpdateMonitor ) throws IOException { - super( "Relationships" + topic, config, ORDER_SEND_DOWNSTREAM ); - add( new InputIteratorBatcherStep<>( control(), config, relationships, InputRelationship.class, typeFilter ) ); + super( "Relationships", config, ORDER_SEND_DOWNSTREAM ); + add( new InputIteratorBatcherStep<>( control(), config, relationships.iterator(), + InputRelationship.class, r -> true ) ); + if ( !relationships.supportsMultiplePasses() ) + { + add( new InputEntityCacherStep<>( control(), config, inputCache.cacheRelationships( MAIN ) ) ); + } RelationshipStore relationshipStore = neoStore.getRelationshipStore(); PropertyStore propertyStore = neoStore.getPropertyStore(); - add( idAssigner = new AssignRelationshipIdBatchStep( control(), config, firstRelationshipId ) ); + add( typer = new RelationshipTypeCheckerStep( control(), config, neoStore.getRelationshipTypeRepository() ) ); + add( new AssignRelationshipIdBatchStep( control(), config, 0 ) ); add( new RelationshipPreparationStep( control(), config, idMapper ) ); add( new RelationshipRecordPreparationStep( control(), config, neoStore.getRelationshipTypeRepository() ) ); - add( new RelationshipEncoderStep( control(), config, cache ) ); + add( new CalculateDenseNodesStep( control(), config, cache, badCollector ) ); add( new PropertyEncoderStep<>( control(), config, neoStore.getPropertyKeyRepository(), propertyStore ) ); add( new EntityStoreUpdaterStep<>( control(), config, relationshipStore, propertyStore, writeMonitor, storeUpdateMonitor ) ); } - public long getNextRelationshipId() + public RelationshipTypeDistribution getDistribution() { - return idAssigner.getNextRelationshipId(); + return typer.getDistribution(); } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ScanAndCacheGroupsStage.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ScanAndCacheGroupsStage.java index 1e4f627f5e40b..19e62046931a6 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ScanAndCacheGroupsStage.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ScanAndCacheGroupsStage.java @@ -43,7 +43,7 @@ public ScanAndCacheGroupsStage( Configuration config, RecordStore( control(), config, false, store ) ); + add( new ReadRecordsStep<>( control(), config, false, store, null ) ); add( new CacheGroupsStep( control(), config, cache ) ); } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/SparseNodeFirstRelationshipStage.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/SparseNodeFirstRelationshipStage.java index 9b78e9c2eb2b0..a3c21ab109bd2 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/SparseNodeFirstRelationshipStage.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/SparseNodeFirstRelationshipStage.java @@ -41,12 +41,11 @@ */ public class SparseNodeFirstRelationshipStage extends Stage { - public SparseNodeFirstRelationshipStage( String topic, Configuration config, NodeStore nodeStore, - NodeRelationshipCache cache ) + public SparseNodeFirstRelationshipStage( Configuration config, NodeStore nodeStore, NodeRelationshipCache cache ) { - super( "Node --> Relationship" + topic, config, ORDER_SEND_DOWNSTREAM ); + super( "Node --> Relationship", config, ORDER_SEND_DOWNSTREAM ); add( new ReadNodeIdsByCacheStep( control(), config, cache, NodeType.NODE_TYPE_SPARSE ) ); - add( new ReadRecordsStep<>( control(), config, true, nodeStore ) ); + add( new ReadRecordsStep<>( control(), config, true, nodeStore, null ) ); add( new RecordProcessorStep<>( control(), "LINK", config, new SparseNodeFirstRelationshipProcessor( cache ), false ) ); add( new UpdateRecordsStep<>( control(), config, nodeStore ) ); diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ReadRecordsStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ReadRecordsStep.java index b91f03ae6763f..c1960b44b83e0 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ReadRecordsStep.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ReadRecordsStep.java @@ -21,6 +21,7 @@ import java.lang.reflect.Array; import java.util.Arrays; +import java.util.function.Predicate; import org.neo4j.collection.primitive.PrimitiveLongIterator; import org.neo4j.kernel.impl.store.RecordCursor; @@ -39,16 +40,18 @@ */ public class ReadRecordsStep extends ProcessorStep { - protected final RecordStore store; + private final RecordStore store; private final Class klass; - protected final int batchSize; + private final Predicate filter; + private final int batchSize; @SuppressWarnings( "unchecked" ) public ReadRecordsStep( StageControl control, Configuration config, boolean inRecordWritingStage, - RecordStore store ) + RecordStore store, Predicate filter ) { super( control, ">", config, parallelReading( config, inRecordWritingStage ) ? 0 : 1 ); this.store = store; + this.filter = filter; this.klass = (Class) store.newRecord().getClass(); this.batchSize = config.batchSize(); } @@ -82,7 +85,7 @@ protected void process( PrimitiveLongIterator idRange, BatchSender sender ) thro boolean hasNext = true; while ( hasNext ) { - if ( cursor.next( id ) && !IdValidator.isReservedId( id ) ) + if ( cursor.next( id ) && !IdValidator.isReservedId( id ) && (filter == null || filter.test( record )) ) { batch[i++] = (RECORD) record.clone(); } diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkbackStageTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkbackStageTest.java index 08db644fccbad..287b0a9089349 100644 --- a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkbackStageTest.java +++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkbackStageTest.java @@ -42,7 +42,7 @@ public void reservedIdIsSkipped() throws Exception long highId = 5; RelationshipStore store = StoreWithReservedId.newRelationshipStoreMock( highId ); RelationshipLinkbackStage stage = new RelationshipLinkbackStage( "Test", - Configuration.DEFAULT, store, newCache(), 0, highId, NodeType.NODE_TYPE_SPARSE ); + Configuration.DEFAULT, store, newCache(), null, null, NodeType.NODE_TYPE_SPARSE ); ExecutionSupervisors.superviseExecution( ExecutionMonitors.invisible(), Configuration.DEFAULT, stage );