diff --git a/community/consistency-check/src/test/java/org/neo4j/unsafe/impl/batchimport/ParallelBatchImporterTest.java b/community/consistency-check/src/test/java/org/neo4j/unsafe/impl/batchimport/ParallelBatchImporterTest.java index 7de28826d7a06..8e33466c97f94 100644 --- a/community/consistency-check/src/test/java/org/neo4j/unsafe/impl/batchimport/ParallelBatchImporterTest.java +++ b/community/consistency-check/src/test/java/org/neo4j/unsafe/impl/batchimport/ParallelBatchImporterTest.java @@ -70,8 +70,8 @@ import org.neo4j.unsafe.impl.batchimport.input.Inputs; import org.neo4j.unsafe.impl.batchimport.input.SimpleInputIterator; import org.neo4j.unsafe.impl.batchimport.staging.ExecutionMonitor; - import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.neo4j.helpers.collection.Iterators.asSet; @@ -95,7 +95,8 @@ public class ParallelBatchImporterTest public final RandomRule random = new RandomRule(); private static final int NODE_COUNT = 10_000; - private static final int RELATIONSHIP_COUNT = NODE_COUNT * 5; + private static final int RELATIONSHIPS_PER_NODE = 5; + private static final int RELATIONSHIP_COUNT = NODE_COUNT * RELATIONSHIPS_PER_NODE; protected final Configuration config = new Configuration.Default() { @Override @@ -108,7 +109,8 @@ public int batchSize() @Override public int denseNodeThreshold() { - return 30; + // This will have statistically half the nodes be considered dense + return RELATIONSHIPS_PER_NODE * 2; } @Override @@ -373,10 +375,11 @@ protected void verifyData( int nodeCount, int relationshipCount, GraphDatabaseSe if ( !inputIdGenerator.isMiss( input.startNode() ) && !inputIdGenerator.isMiss( input.endNode() ) ) { - // A relationship refering to missing nodes. The InputIdGenerator is expected to generate + // A relationship referring to missing nodes. The InputIdGenerator is expected to generate // some (very few) of those. Skip it. String name = (String) propertyOf( input, "id" ); Relationship relationship = relationshipByName.get( name ); + assertNotNull( "Expected there to be a relationship with name '" + name + "'", relationship ); assertEquals( nodeByInputId.get( uniqueId( input.startNodeGroup(), input.startNode() ) ), relationship.getStartNode() ); assertEquals( nodeByInputId.get( uniqueId( input.endNodeGroup(), input.endNode() ) ), @@ -504,11 +507,18 @@ protected InputRelationship fetchNextOrNull() startNode = idGenerator.miss( random, startNode, 0.001f ); endNode = idGenerator.miss( random, endNode, 0.001f ); + String type = idGenerator.randomType( random ); + if ( random.nextFloat() < 0.00005 ) + { + // Let there be a small chance of introducing a one-off relationship + // with a type that no, or at least very few, other relationships have. 
+ type += "_odd"; + } return new InputRelationship( sourceDescription, itemNumber, itemNumber, properties, null, startNodeGroup, startNode, endNodeGroup, endNode, - idGenerator.randomType( random ), null ); + type, null ); } finally { diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/CalculateDenseNodesStage.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/CalculateDenseNodesStage.java index e5f32322164e1..02815f65171cd 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/CalculateDenseNodesStage.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/CalculateDenseNodesStage.java @@ -21,13 +21,16 @@ import java.io.IOException; -import org.neo4j.kernel.impl.store.RelationshipStore; +import org.neo4j.kernel.impl.store.NodeStore; import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache; import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdMapper; import org.neo4j.unsafe.impl.batchimport.input.Collector; import org.neo4j.unsafe.impl.batchimport.input.InputCache; import org.neo4j.unsafe.impl.batchimport.input.InputRelationship; import org.neo4j.unsafe.impl.batchimport.staging.Stage; +import org.neo4j.unsafe.impl.batchimport.store.BatchingNeoStores; + +import static org.neo4j.unsafe.impl.batchimport.input.InputCache.MAIN; /** * Counts number of relationships per node that is going to be imported by {@link RelationshipStage} later. @@ -36,20 +39,45 @@ */ public class CalculateDenseNodesStage extends Stage { + private RelationshipTypeCheckerStep typer; + private final NodeStore nodeStore; + private final NodeRelationshipCache cache; + public CalculateDenseNodesStage( Configuration config, InputIterable relationships, - RelationshipStore relationshipStore, NodeRelationshipCache cache, IdMapper idMapper, - Collector badCollector, InputCache inputCache ) throws IOException + NodeRelationshipCache cache, IdMapper idMapper, + Collector badCollector, InputCache inputCache, + BatchingNeoStores neoStores ) throws IOException { super( "Calculate dense nodes", config ); + this.cache = cache; add( new InputIteratorBatcherStep<>( control(), config, relationships.iterator(), InputRelationship.class ) ); if ( !relationships.supportsMultiplePasses() ) { - add( new InputEntityCacherStep<>( control(), config, inputCache.cacheRelationships() ) ); + add( new InputEntityCacherStep<>( control(), config, inputCache.cacheRelationships( MAIN ) ) ); } + add( typer = new RelationshipTypeCheckerStep( control(), config, neoStores.getRelationshipTypeRepository() ) ); add( new RelationshipPreparationStep( control(), config, idMapper ) ); - add( new CalculateRelationshipsStep( control(), config, relationshipStore ) ); + add( new CalculateRelationshipsStep( control(), config, neoStores.getRelationshipStore() ) ); add( new CalculateDenseNodePrepareStep( control(), config, badCollector ) ); add( new CalculateDenseNodesStep( control(), config, cache ) ); + nodeStore = neoStores.getNodeStore(); + } + + /* + * @see RelationshipTypeCheckerStep#getRelationshipTypes(int) + */ + public Object[] getRelationshipTypes( int belowOrEqualToThreshold ) + { + return typer.getRelationshipTypes( belowOrEqualToThreshold ); + } + + @Override + public void close() + { + // At this point we know how many nodes we have, so we tell the cache that instead of having the + // cache keeping track of that in a the face of concurrent updates. 
+ cache.setHighNodeId( nodeStore.getHighId() ); + super.close(); } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/IdMapperPreparationStage.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/IdMapperPreparationStage.java index 58c79ca2cc079..94af0bbeda13b 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/IdMapperPreparationStage.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/IdMapperPreparationStage.java @@ -22,7 +22,6 @@ import org.neo4j.helpers.progress.ProgressListener; import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdMapper; import org.neo4j.unsafe.impl.batchimport.input.Collector; -import org.neo4j.unsafe.impl.batchimport.input.InputCache; import org.neo4j.unsafe.impl.batchimport.input.InputNode; import org.neo4j.unsafe.impl.batchimport.staging.Stage; import org.neo4j.unsafe.impl.batchimport.stats.StatsProvider; @@ -36,11 +35,10 @@ public class IdMapperPreparationStage extends Stage { public IdMapperPreparationStage( Configuration config, IdMapper idMapper, InputIterable nodes, - InputCache inputCache, Collector collector, StatsProvider memoryUsageStats ) + Collector collector, StatsProvider memoryUsageStats ) { super( "Prepare node index", config ); add( new IdMapperPreparationStep( control(), config, - idMapper, idsOf( nodes.supportsMultiplePasses() ? nodes : inputCache.nodes() ), - collector, memoryUsageStats ) ); + idMapper, idsOf( nodes ), collector, memoryUsageStats ) ); } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/InputIterator.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/InputIterator.java index e1575910287d2..2c31743e624a4 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/InputIterator.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/InputIterator.java @@ -65,6 +65,46 @@ public void close() } } + public static class Delegate extends PrefetchingIterator implements InputIterator + { + protected final InputIterator actual; + + public Delegate( InputIterator actual ) + { + this.actual = actual; + } + + @Override + public void close() + { + actual.close(); + } + + @Override + protected T fetchNextOrNull() + { + return actual.hasNext() ? actual.next() : null; + } + + @Override + public String sourceDescription() + { + return actual.sourceDescription(); + } + + @Override + public long lineNumber() + { + return actual.lineNumber(); + } + + @Override + public long position() + { + return actual.position(); + } + } + public static class Empty extends Adapter { @Override diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeCountsStage.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeCountsStage.java index 2c05d28fabe56..3e34c94f9e321 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeCountsStage.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeCountsStage.java @@ -21,14 +21,13 @@ import org.neo4j.kernel.impl.api.CountsAccessor; import org.neo4j.kernel.impl.store.NodeStore; -import org.neo4j.kernel.impl.store.RelationshipStore; import org.neo4j.unsafe.impl.batchimport.cache.NodeLabelsCache; import org.neo4j.unsafe.impl.batchimport.staging.Stage; import org.neo4j.unsafe.impl.batchimport.stats.StatsProvider; /** - * Reads all records from {@link RelationshipStore} and process the counts in them. 
Uses a {@link NodeLabelsCache} - * previously populated by f.ex {@link ProcessNodeCountsDataStep}. + * Reads all records from {@link NodeStore} and process the counts in them, populating {@link NodeLabelsCache} + * for later use of {@link RelationshipCountsStage}. */ public class NodeCountsStage extends Stage { diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeFirstRelationshipProcessor.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeFirstRelationshipProcessor.java index 0eebf9ef52b6f..9d668233cfc0a 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeFirstRelationshipProcessor.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeFirstRelationshipProcessor.java @@ -37,14 +37,14 @@ public class NodeFirstRelationshipProcessor implements RecordProcessor relGroupStore; private final NodeRelationshipCache cache; - - private long nextGroupId = -1; + private final int relationshipType; public NodeFirstRelationshipProcessor( RecordStore relGroupStore, - NodeRelationshipCache cache ) + NodeRelationshipCache cache, int relationshipType ) { this.relGroupStore = relGroupStore; this.cache = cache; + this.relationshipType = relationshipType; } @Override @@ -64,22 +64,18 @@ public boolean process( NodeRecord node ) } @Override - public long visit( long nodeId, int type, long next, long out, long in, long loop ) + public long visit( long nodeId, long next, long out, long in, long loop ) { - long id = nextGroupId != -1 ? nextGroupId : relGroupStore.nextId(); - nextGroupId = -1; - + // Here we'll use the already generated id (below) from the previous visit, if that so happened + long id = relGroupStore.nextId(); RelationshipGroupRecord groupRecord = new RelationshipGroupRecord( id ); - groupRecord.setType( type ); + groupRecord.setType( relationshipType ); groupRecord.setInUse( true ); groupRecord.setFirstOut( out ); groupRecord.setFirstIn( in ); groupRecord.setFirstLoop( loop ); groupRecord.setOwningNode( nodeId ); - if ( next != -1 ) - { - groupRecord.setNext( nextGroupId = relGroupStore.nextId() ); - } + groupRecord.setNext( next ); relGroupStore.prepareForCommit( groupRecord ); relGroupStore.updateRecord( groupRecord ); return id; diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeFirstRelationshipStage.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeFirstRelationshipStage.java index b23e336e26334..01c7440b87420 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeFirstRelationshipStage.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeFirstRelationshipStage.java @@ -33,14 +33,16 @@ */ public class NodeFirstRelationshipStage extends Stage { - public NodeFirstRelationshipStage( Configuration config, NodeStore nodeStore, - RecordStore relationshipGroupStore, NodeRelationshipCache cache, final Collector collector, - LabelScanStore labelScanStore ) + public NodeFirstRelationshipStage( String topic, Configuration config, NodeStore nodeStore, + RecordStore relationshipGroupStore, NodeRelationshipCache cache, + final Collector collector, LabelScanStore labelScanStore, boolean denseNodes, int relationshipType ) { - super( "Node --> Relationship", config ); - add( new ReadNodeRecordsStep( control(), config, nodeStore ) ); + super( "Node --> Relationship" + topic, config ); + add( new ReadNodeRecordsByCacheStep( control(), config, nodeStore, cache, denseNodes ) ); add( new 
RecordProcessorStep<>( control(), "LINK", config, - new NodeFirstRelationshipProcessor( relationshipGroupStore, cache ), false ) ); - add( new UpdateNodeRecordsStep( control(), config, nodeStore, collector, labelScanStore ) ); + new NodeFirstRelationshipProcessor( relationshipGroupStore, cache, relationshipType ), false ) ); + boolean shouldAlsoPruneBadNodes = !denseNodes; + add( new UpdateNodeRecordsStep( control(), config, nodeStore, collector, labelScanStore, + shouldAlsoPruneBadNodes ) ); } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeStage.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeStage.java index da27eb440e562..18f583a0ac151 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeStage.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeStage.java @@ -34,6 +34,7 @@ import org.neo4j.unsafe.impl.batchimport.store.BatchingNeoStores; import org.neo4j.unsafe.impl.batchimport.store.io.IoMonitor; +import static org.neo4j.unsafe.impl.batchimport.input.InputCache.MAIN; import static org.neo4j.unsafe.impl.batchimport.staging.Step.ORDER_SEND_DOWNSTREAM; /** @@ -52,7 +53,7 @@ public NodeStage( Configuration config, IoMonitor writeMonitor, add( new InputIteratorBatcherStep<>( control(), config, nodes.iterator(), InputNode.class ) ); if ( !nodes.supportsMultiplePasses() ) { - add( new InputEntityCacherStep<>( control(), config, inputCache.cacheNodes() ) ); + add( new InputEntityCacherStep<>( control(), config, inputCache.cacheNodes( MAIN ) ) ); } NodeStore nodeStore = neoStore.getNodeStore(); diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ParallelBatchImporter.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ParallelBatchImporter.java index 572bccf198fdd..cf8d97ea4fce0 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ParallelBatchImporter.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ParallelBatchImporter.java @@ -21,7 +21,6 @@ import java.io.File; import java.io.IOException; - import org.neo4j.helpers.Exceptions; import org.neo4j.helpers.Format; import org.neo4j.io.fs.DefaultFileSystemAbstraction; @@ -40,6 +39,7 @@ import org.neo4j.unsafe.impl.batchimport.input.InputCache; import org.neo4j.unsafe.impl.batchimport.input.InputNode; import org.neo4j.unsafe.impl.batchimport.input.InputRelationship; +import org.neo4j.unsafe.impl.batchimport.input.PerTypeRelationshipSplitter; import org.neo4j.unsafe.impl.batchimport.staging.DynamicProcessorAssigner; import org.neo4j.unsafe.impl.batchimport.staging.ExecutionMonitor; import org.neo4j.unsafe.impl.batchimport.staging.Stage; @@ -48,8 +48,11 @@ import org.neo4j.unsafe.impl.batchimport.store.io.IoMonitor; import static java.lang.System.currentTimeMillis; + import static org.neo4j.unsafe.impl.batchimport.AdditionalInitialIds.EMPTY; +import static org.neo4j.unsafe.impl.batchimport.SourceOrCachedInputIterable.cachedForSure; import static org.neo4j.unsafe.impl.batchimport.cache.NumberArrayFactory.AUTO; +import static org.neo4j.unsafe.impl.batchimport.input.InputCache.MAIN; import static org.neo4j.unsafe.impl.batchimport.staging.ExecutionSupervisors.superviseExecution; import static org.neo4j.unsafe.impl.batchimport.staging.ExecutionSupervisors.withDynamicProcessorAssignment; @@ -133,6 +136,9 @@ public void doImport( Input input ) throws IOException StatsProvider memoryUsageStats = new MemoryUsageStatsProvider( 
nodeRelationshipCache, idMapper ); InputIterable nodes = input.nodes(); InputIterable relationships = input.relationships(); + InputIterable cachedNodes = cachedForSure( nodes, inputCache.nodes( MAIN, true ) ); + InputIterable cachedRelationships = + cachedForSure( relationships, inputCache.relationships( MAIN, true ) ); RelationshipStore relationshipStore = neoStore.getRelationshipStore(); @@ -142,8 +148,8 @@ public void doImport( Input input ) throws IOException storeUpdateMonitor, memoryUsageStats ); // Stage 2 -- calculate dense node threshold - CalculateDenseNodesStage calculateDenseNodesStage = new CalculateDenseNodesStage( config, relationships, - relationshipStore, nodeRelationshipCache, idMapper, badCollector, inputCache ); + CalculateDenseNodesStage calculateDenseNodesStage = new CalculateDenseNodesStage( config, + relationships, nodeRelationshipCache, idMapper, badCollector, inputCache, neoStore ); // Execute stages 1 and 2 in parallel or sequentially? if ( idMapper.needsPreparation() ) @@ -151,7 +157,7 @@ public void doImport( Input input ) throws IOException // So we need to execute the node stage first as it fills the id mapper and prepares it in the end, // before executing any stage that needs ids from the id mapper, for example calc dense node stage. executeStages( nodeStage ); - executeStages( new IdMapperPreparationStage( config, idMapper, nodes, inputCache, + executeStages( new IdMapperPreparationStage( config, idMapper, cachedNodes, badCollector, memoryUsageStats ) ); executeStages( calculateDenseNodesStage ); } @@ -161,20 +167,9 @@ public void doImport( Input input ) throws IOException executeStages( nodeStage, calculateDenseNodesStage ); } - // Stage 3 -- relationships, properties - final RelationshipStage relationshipStage = new RelationshipStage( config, writeMonitor, - relationships.supportsMultiplePasses() ? relationships : inputCache.relationships(), - idMapper, neoStore, nodeRelationshipCache, input.specificRelationshipIds(), storeUpdateMonitor ); - executeStages( relationshipStage ); - - // Stage 4 -- set node nextRel fields - executeStages( new NodeFirstRelationshipStage( config, neoStore.getNodeStore(), - neoStore.getRelationshipGroupStore(), nodeRelationshipCache, badCollector, - neoStore.getLabelScanStore() ) ); - // Stage 5 -- link relationship chains together - nodeRelationshipCache.clearRelationships(); - executeStages( new RelationshipLinkbackStage( config, relationshipStore, - nodeRelationshipCache ) ); + importRelationships( input, nodeRelationshipCache, storeUpdateMonitor, neoStore, badCollector, writeMonitor, + idMapper, cachedRelationships, + inputCache, calculateDenseNodesStage.getRelationshipTypes( 100 ) ); // Release this potentially really big piece of cached data nodeRelationshipCache.close(); @@ -223,6 +218,69 @@ public void doImport( Input input ) throws IOException } } + private void importRelationships( Input input, NodeRelationshipCache nodeRelationshipCache, + CountingStoreUpdateMonitor storeUpdateMonitor, BatchingNeoStores neoStore, Collector badCollector, + IoMonitor writeMonitor, IdMapper idMapper, InputIterable relationships, + InputCache inputCache, Object[] allRelationshipTypes ) + { + // Imports the relationships from the Input. 
This isn't a straight forward as importing nodes, + // since keeping track of and updating heads of relationship chains in scenarios where most nodes + // are dense and there are many relationship types scales poorly w/ regards to cache memory usage + // also as a side-effect time required to update this cache. + // + // The approach is instead to do multiple iterations where each iteration imports relationships + // of a single type. For each iteration Node --> Relationship and Relationship --> Relationship + // stages _for dense nodes only_ are run so that the cache can be reused to hold relationship chain heads + // of the next type in the next iteration. All relationships will be imported this way and then + // finally there will be one Node --> Relationship and Relationship --> Relationship stage linking + // all sparse relationship chains together. + + PerTypeRelationshipSplitter perTypeIterator = + new PerTypeRelationshipSplitter( relationships.iterator(), allRelationshipTypes, + type -> neoStore.getRelationshipTypeRepository().getOrCreateId( type ), inputCache ); + + long nextRelationshipId = 0; + for ( int i = 0; perTypeIterator.hasNext(); i++ ) + { + // Stage 3a -- relationships, properties + nodeRelationshipCache.setForwardScan( true ); + Object currentType = perTypeIterator.currentType(); + int currentTypeId = neoStore.getRelationshipTypeRepository().getOrCreateId( currentType ); + InputIterator perType = perTypeIterator.next(); + String topic = " [:" + currentType + "] (" + + (i+1) + "/" + allRelationshipTypes.length + ")"; + final RelationshipStage relationshipStage = new RelationshipStage( topic, config, writeMonitor, + perType, idMapper, + neoStore, nodeRelationshipCache, input.specificRelationshipIds(), storeUpdateMonitor, + nextRelationshipId ); + executeStages( relationshipStage ); + + // Stage 4a -- set node nextRel fields for dense nodes + executeStages( new NodeFirstRelationshipStage( topic, config, neoStore.getNodeStore(), + neoStore.getRelationshipGroupStore(), nodeRelationshipCache, badCollector, + neoStore.getLabelScanStore(), true/*dense*/, currentTypeId ) ); + + // Stage 5a -- link relationship chains together for dense nodes + nodeRelationshipCache.setForwardScan( false ); + executeStages( new RelationshipLinkbackStage( topic, config, neoStore.getRelationshipStore(), + nodeRelationshipCache, nextRelationshipId, true/*dense*/ ) ); + nextRelationshipId = relationshipStage.getNextRelationshipId(); + nodeRelationshipCache.clearChangedChunks( true/*dense*/ ); // cheap higher level clearing + } + + String topic = " Sparse/final"; + nodeRelationshipCache.setForwardScan( true ); + // Stage 4b -- set node nextRel fields for sparse nodes + executeStages( new NodeFirstRelationshipStage( topic, config, neoStore.getNodeStore(), + neoStore.getRelationshipGroupStore(), nodeRelationshipCache, badCollector, + neoStore.getLabelScanStore(), false/*sparse*/, -1 ) ); + + // Stage 5b -- link relationship chains together for sparse nodes + nodeRelationshipCache.setForwardScan( false ); + executeStages( new RelationshipLinkbackStage( topic, config, neoStore.getRelationshipStore(), + nodeRelationshipCache, 0, false/*sparse*/ ) ); + } + private void executeStages( Stage... 
stages ) { superviseExecution( executionMonitor, config, stages ); diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ParallelizeByNodeIdStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ParallelizeByNodeIdStep.java index 7919d28ed6a6a..5cfb181116177 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ParallelizeByNodeIdStep.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ParallelizeByNodeIdStep.java @@ -120,4 +120,13 @@ protected void process( Batch batch, Batch concurrentBatches = 1; } } + + /** + * @return the next relationship id that this step would return if it were to import more relationships. + * This value can be used to feed into importing the next type, e.g. the constructor. + */ + public long getNextRelationshipId() + { + return firstRecordId; + } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ReadNodeRecordsByCacheStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ReadNodeRecordsByCacheStep.java new file mode 100644 index 0000000000000..b24c03b8b7c63 --- /dev/null +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ReadNodeRecordsByCacheStep.java @@ -0,0 +1,110 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.unsafe.impl.batchimport; + +import org.neo4j.kernel.impl.store.NodeStore; +import org.neo4j.kernel.impl.store.record.NodeRecord; +import org.neo4j.kernel.impl.store.record.RecordLoad; +import org.neo4j.unsafe.impl.batchimport.cache.ByteArray; +import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache; +import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache.NodeChangeVisitor; +import org.neo4j.unsafe.impl.batchimport.staging.AbstractStep; +import org.neo4j.unsafe.impl.batchimport.staging.Configuration; +import org.neo4j.unsafe.impl.batchimport.staging.StageControl; + +import static java.lang.System.nanoTime; + +/** + * Using the {@link NodeRelationshipCache} efficiently looks for changed nodes and reads those + * {@link NodeRecord} and sends downwards. 
+ */ +public class ReadNodeRecordsByCacheStep extends AbstractStep +{ + private final boolean denseNodes; + private final NodeRelationshipCache cache; + private final int batchSize; + private final NodeStore nodeStore; + + public ReadNodeRecordsByCacheStep( StageControl control, Configuration config, + NodeStore nodeStore, NodeRelationshipCache cache, boolean denseNodes ) + { + super( control, ">", config ); + this.nodeStore = nodeStore; + this.cache = cache; + this.denseNodes = denseNodes; + this.batchSize = config.batchSize(); + } + + @Override + public long receive( long ticket, NodeRecord[] batch ) + { + new Thread() + { + @Override + public void run() + { + assertHealthy(); + try ( NodeVisitor visitor = new NodeVisitor() ) + { + cache.visitChangedNodes( visitor, denseNodes ); + } + endOfUpstream(); + } + }.start(); + return 0; + } + + private class NodeVisitor implements NodeChangeVisitor, AutoCloseable + { + private NodeRecord[] batch = new NodeRecord[batchSize]; + private int cursor; + private long time = nanoTime(); + + @Override + public void change( long nodeId, ByteArray array ) + { + batch[cursor++] = nodeStore.getRecord( nodeId, nodeStore.newRecord(), RecordLoad.CHECK ); + if ( cursor == batchSize ) + { + send(); + batch = new NodeRecord[batchSize]; + cursor = 0; + } + } + + @SuppressWarnings( "unchecked" ) + private void send() + { + totalProcessingTime.add( nanoTime() - time ); + downstream.receive( doneBatches.getAndIncrement(), batch ); + time = nanoTime(); + assertHealthy(); + } + + @Override + public void close() + { + if ( cursor > 0 ) + { + send(); + } + } + } +} diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ReadRelationshipRecordsBackwardsStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ReadRelationshipRecordsBackwardsStep.java index 6e1793d076ac0..3dc11c45f3323 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ReadRelationshipRecordsBackwardsStep.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ReadRelationshipRecordsBackwardsStep.java @@ -33,19 +33,21 @@ */ public class ReadRelationshipRecordsBackwardsStep extends ReadRecordsStep { + private final long firstRelationshipId; private long id; public ReadRelationshipRecordsBackwardsStep( StageControl control, Configuration config, - RelationshipStore store ) + RelationshipStore store, long firstRelationshipId ) { super( control, config, store ); - id = highId; + this.firstRelationshipId = firstRelationshipId; + this.id = highId; } @Override protected Object nextBatchOrNull( long ticket, int batchSize ) { - int size = (int) min( batchSize, id ); + int size = (int) min( batchSize, id-firstRelationshipId ); RelationshipRecord[] batch = new RelationshipRecord[size]; boolean seenReservedId = false; diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RecordProcessorStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RecordProcessorStep.java index 7124053e1adcc..0d0676de05557 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RecordProcessorStep.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RecordProcessorStep.java @@ -45,11 +45,16 @@ public RecordProcessorStep( StageControl control, String name, Configuration con @Override protected void process( T[] batch, BatchSender sender ) { - for ( T item : batch ) + for ( int i = 0; i < batch.length; i++ ) { - if ( item.inUse() ) + T item = batch[i]; + if ( item != 
null && item.inUse() ) { - processor.process( item ); + if ( !processor.process( item ) ) + { + // No change for this record + batch[i] = null; + } } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipEncoderStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipEncoderStep.java index 8aaddd4259a80..65633e8bdd307 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipEncoderStep.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipEncoderStep.java @@ -107,7 +107,7 @@ protected void process( Batch batch, Batch // Set first/second next rel boolean loop = startNodeId == endNodeId; long firstNextRel = cache.getAndPutRelationship( - startNodeId, typeId, loop ? BOTH : OUTGOING, relationshipId, true ); + startNodeId, loop ? BOTH : OUTGOING, relationshipId, true ); relationshipRecord.setFirstNextRel( firstNextRel ); if ( loop ) { @@ -116,7 +116,7 @@ protected void process( Batch batch, Batch else { relationshipRecord.setSecondNextRel( cache.getAndPutRelationship( - endNodeId, typeId, INCOMING, relationshipId, true ) ); + endNodeId, INCOMING, relationshipId, true ) ); } // Most rels will not be first in chain diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkbackProcessor.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkbackProcessor.java index b2741ac604146..85e1b41362d3d 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkbackProcessor.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkbackProcessor.java @@ -30,55 +30,69 @@ public class RelationshipLinkbackProcessor implements RecordProcessor { private final NodeRelationshipCache cache; + private final boolean denseNodes; - public RelationshipLinkbackProcessor( NodeRelationshipCache cache ) + public RelationshipLinkbackProcessor( NodeRelationshipCache cache, boolean denseNodes ) { this.cache = cache; + this.denseNodes = denseNodes; } @Override public boolean process( RelationshipRecord record ) { boolean isLoop = record.getFirstNode() == record.getSecondNode(); + boolean firstIsDense = cache.isDense( record.getFirstNode() ); + boolean changed = false; if ( isLoop ) { - long prevRel = cache.getAndPutRelationship( record.getFirstNode(), - record.getType(), Direction.BOTH, record.getId(), false ); - if ( prevRel == -1 ) - { // First one - record.setFirstInFirstChain( true ); - record.setFirstInSecondChain( true ); - prevRel = cache.getCount( record.getFirstNode(), - record.getType(), Direction.BOTH ); + if ( firstIsDense == denseNodes ) + { + long prevRel = cache.getAndPutRelationship( record.getFirstNode(), + Direction.BOTH, record.getId(), false ); + if ( prevRel == -1 ) + { // First one + record.setFirstInFirstChain( true ); + record.setFirstInSecondChain( true ); + prevRel = cache.getCount( record.getFirstNode(), Direction.BOTH ); + } + record.setFirstPrevRel( prevRel ); + record.setSecondPrevRel( prevRel ); + changed = true; } - record.setFirstPrevRel( prevRel ); - record.setSecondPrevRel( prevRel ); } else { // Start node - long firstPrevRel = cache.getAndPutRelationship( record.getFirstNode(), - record.getType(), Direction.OUTGOING, record.getId(), false ); - if ( firstPrevRel == -1 ) - { // First one - record.setFirstInFirstChain( true ); - firstPrevRel = cache.getCount( record.getFirstNode(), - record.getType(), Direction.OUTGOING ); + 
if ( firstIsDense == denseNodes ) + { + long firstPrevRel = cache.getAndPutRelationship( record.getFirstNode(), + Direction.OUTGOING, record.getId(), false ); + if ( firstPrevRel == -1 ) + { // First one + record.setFirstInFirstChain( true ); + firstPrevRel = cache.getCount( record.getFirstNode(), Direction.OUTGOING ); + } + record.setFirstPrevRel( firstPrevRel ); + changed = true; } - record.setFirstPrevRel( firstPrevRel ); // End node - long secondPrevRel = cache.getAndPutRelationship( record.getSecondNode(), - record.getType(), Direction.INCOMING, record.getId(), false ); - if ( secondPrevRel == -1 ) - { // First one - record.setFirstInSecondChain( true ); - secondPrevRel = cache.getCount( record.getSecondNode(), - record.getType(), Direction.INCOMING ); + boolean secondIsDense = cache.isDense( record.getSecondNode() ); + if ( secondIsDense == denseNodes ) + { + long secondPrevRel = cache.getAndPutRelationship( record.getSecondNode(), + Direction.INCOMING, record.getId(), false ); + if ( secondPrevRel == -1 ) + { // First one + record.setFirstInSecondChain( true ); + secondPrevRel = cache.getCount( record.getSecondNode(), Direction.INCOMING ); + } + record.setSecondPrevRel( secondPrevRel ); + changed = true; } - record.setSecondPrevRel( secondPrevRel ); } - return true; + return changed; } @Override diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkbackStage.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkbackStage.java index 379fc9db0a16b..069df021e15f1 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkbackStage.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkbackStage.java @@ -30,12 +30,13 @@ */ public class RelationshipLinkbackStage extends Stage { - public RelationshipLinkbackStage( Configuration config, RelationshipStore store, NodeRelationshipCache cache ) + public RelationshipLinkbackStage( String topic, Configuration config, RelationshipStore store, NodeRelationshipCache cache, + long firstRelationshipId, boolean denseNodes ) { - super( "Relationship --> Relationship", config ); - add( new ReadRelationshipRecordsBackwardsStep( control(), config, store ) ); + super( "Relationship --> Relationship" + topic, config ); + add( new ReadRelationshipRecordsBackwardsStep( control(), config, store, firstRelationshipId ) ); add( new RecordProcessorStep<>( control(), "LINK", config, - new RelationshipLinkbackProcessor( cache ), false ) ); + new RelationshipLinkbackProcessor( cache, denseNodes ), false ) ); add( new UpdateRecordsStep<>( control(), config, store ) ); } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipPreparationStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipPreparationStep.java index 500a3bebc8d86..ced74f74a0ee2 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipPreparationStep.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipPreparationStep.java @@ -20,7 +20,6 @@ package org.neo4j.unsafe.impl.batchimport; import java.util.Arrays; - import org.neo4j.kernel.impl.store.record.RelationshipRecord; import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdMapper; import org.neo4j.unsafe.impl.batchimport.input.InputRelationship; diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipStage.java 
b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipStage.java index bf9015258163f..ba9337811744d 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipStage.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipStage.java @@ -37,22 +37,30 @@ */ public class RelationshipStage extends Stage { - public RelationshipStage( Configuration config, IoMonitor writeMonitor, - InputIterable relationships, IdMapper idMapper, - BatchingNeoStores neoStore, NodeRelationshipCache cache, boolean specificIds, - EntityStoreUpdaterStep.Monitor storeUpdateMonitor ) + private ParallelizeByNodeIdStep parallelizer; + private RelationshipEncoderStep encoder; + + public RelationshipStage( String topic, Configuration config, IoMonitor writeMonitor, + InputIterator relationships, IdMapper idMapper, BatchingNeoStores neoStore, + NodeRelationshipCache cache, boolean specificIds, EntityStoreUpdaterStep.Monitor storeUpdateMonitor, + long firstRelationshipId ) { - super( "Relationships", config, ORDER_SEND_DOWNSTREAM | ORDER_PROCESS ); - add( new InputIteratorBatcherStep<>( control(), config, relationships.iterator(), InputRelationship.class ) ); + super( "Relationships" + topic, config, ORDER_SEND_DOWNSTREAM | ORDER_PROCESS ); + add( new InputIteratorBatcherStep<>( control(), config, relationships, InputRelationship.class ) ); RelationshipStore relationshipStore = neoStore.getRelationshipStore(); PropertyStore propertyStore = neoStore.getPropertyStore(); add( new RelationshipPreparationStep( control(), config, idMapper ) ); add( new PropertyEncoderStep<>( control(), config, neoStore.getPropertyKeyRepository(), propertyStore ) ); - add( new ParallelizeByNodeIdStep( control(), config ) ); - add( new RelationshipEncoderStep( control(), config, + add( parallelizer = new ParallelizeByNodeIdStep( control(), config, firstRelationshipId ) ); + add( encoder = new RelationshipEncoderStep( control(), config, neoStore.getRelationshipTypeRepository(), cache, specificIds ) ); add( new EntityStoreUpdaterStep<>( control(), config, relationshipStore, propertyStore, writeMonitor, storeUpdateMonitor ) ); } + + public long getNextRelationshipId() + { + return parallelizer.getNextRelationshipId(); + } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipTypeCheckerStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipTypeCheckerStep.java new file mode 100644 index 0000000000000..770ebdb32b6c3 --- /dev/null +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipTypeCheckerStep.java @@ -0,0 +1,142 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . 
+ */ +package org.neo4j.unsafe.impl.batchimport; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +import org.neo4j.kernel.impl.store.record.RelationshipRecord; +import org.neo4j.unsafe.impl.batchimport.input.InputRelationship; +import org.neo4j.unsafe.impl.batchimport.staging.BatchSender; +import org.neo4j.unsafe.impl.batchimport.staging.Configuration; +import org.neo4j.unsafe.impl.batchimport.staging.ProcessorStep; +import org.neo4j.unsafe.impl.batchimport.staging.StageControl; +import org.neo4j.unsafe.impl.batchimport.store.BatchingTokenRepository.BatchingRelationshipTypeTokenRepository; + +/** + * Counts relationships per type to later be able to provide all types, even sorted in descending order + * of number of relationships per type. + */ +public class RelationshipTypeCheckerStep extends ProcessorStep> +{ + private static final Comparator> SORT_BY_COUNT_DESC = + (e1,e2) -> Long.compare( e2.getValue().get(), e1.getValue().get() ); + private static final Comparator> SORT_BY_ID_DESC = + (e1,e2) -> Integer.compare( (Integer)e2.getKey(), (Integer)e1.getKey() ); + private final Map allTypes = new HashMap<>(); + private final BatchingRelationshipTypeTokenRepository typeTokenRepository; + private Map.Entry[] sortedTypes; + private long totalCount; + + public RelationshipTypeCheckerStep( StageControl control, Configuration config, + BatchingRelationshipTypeTokenRepository typeTokenRepository ) + { + super( control, "TYPE", config, 0 ); + this.typeTokenRepository = typeTokenRepository; + } + + @Override + protected void process( Batch batch, BatchSender sender ) throws Throwable + { + for ( InputRelationship relationship : batch.input ) + { + Object type = relationship.typeAsObject(); + AtomicLong count = allTypes.get( type ); + // Check w/o synchronized, it's fine + if ( count == null ) + { + synchronized ( allTypes ) + { + if ( (count = allTypes.get( type )) == null ) + { + allTypes.put( type, count = new AtomicLong() ); + } + } + } + count.incrementAndGet(); + } + totalCount += batch.input.length; + sender.send( batch ); + } + + @SuppressWarnings( "unchecked" ) + @Override + protected void done() + { + sortedTypes = allTypes.entrySet().toArray( new Map.Entry[allTypes.size()] ); + if ( sortedTypes.length > 0 ) + { + Comparator> comparator = sortedTypes[0].getKey() instanceof Integer ? + SORT_BY_ID_DESC : SORT_BY_COUNT_DESC; + Arrays.sort( sortedTypes, comparator ); + } + + // Create the types in the reverse order of which is returned in getAllTypes() + // Why do we do that? Well, it's so that the relationship groups can be created iteratively + // and still keeping order of (ascending) type in its chains. Relationship groups have next pointers + // and creating these groups while still adhering to principal of sequential I/O doesn't allow us + // to go back and update a previous group to point to a next relationship group. This is why we + // create the groups in ascending id order whereas next pointers will always point backwards to + // lower ids (and therefore relationship type ids). This fulfills the constraint of having + // relationship group record chains be in order of ascending relationship type. 
+ for ( int i = sortedTypes.length - 1; i >= 0; i-- ) + { + typeTokenRepository.getOrCreateId( sortedTypes[i].getKey() ); + } + super.done(); + } + + /** + * Returns relationship types which have percentage of relationships out of the total or less than that. + * E.g. a value of {@code 20} would return types which have got 20% of relationships of less. + * + * @param belowOrEqualToPercentage threshold where relationship types which have this percentage + * out of all relationships or less will be returned. + * @return the order of which to order {@link InputRelationship} when importing relationships. + * The order in which these relationships are returned will be the reverse order of relationship type ids. + * There are two modes of relationship types here, one is user defined String where this step + * have full control of assigning ids to those and will do so based on size of types. The other mode + * is where types are given as ids straight away (as Integer) where the order is already set and so + * the types will not be sorted by size (which is simply an optimization anyway). + */ + public Object[] getRelationshipTypes( int belowOrEqualToPercentage ) + { + List result = new ArrayList<>(); + for ( Map.Entry candidate : sortedTypes ) + { + if ( percentage( totalCount, candidate.getValue().get() ) <= belowOrEqualToPercentage ) + { + result.add( candidate.getKey() ); + } + } + + return result.toArray(); + } + + private int percentage( long total, long count ) + { + return (int) Math.round( (count*100D)/total ); + } +} diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/SourceOrCachedInputIterable.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/SourceOrCachedInputIterable.java new file mode 100644 index 0000000000000..6745610016c6f --- /dev/null +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/SourceOrCachedInputIterable.java @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.unsafe.impl.batchimport; + +import org.neo4j.unsafe.impl.batchimport.input.InputEntity; + +/** + * Convenience for where there's an {@link InputIterable} which doesn't + * {@link InputIterable#supportsMultiplePasses() passes multiple support}, in which case a cached + * {@link InputIterator} will be returned instead. + * + * @param type of {@link InputEntity} of this iterator. + */ +public class SourceOrCachedInputIterable implements InputIterable +{ + private final InputIterable source; + private final InputIterable cached; + + public SourceOrCachedInputIterable( InputIterable source, InputIterable cached ) + { + this.source = source; + this.cached = cached; + } + + @Override + public InputIterator iterator() + { + return source.supportsMultiplePasses() ? 
source.iterator() : cached.iterator(); + } + + @Override + public boolean supportsMultiplePasses() + { + return true; + } + + public static InputIterable cachedForSure( + InputIterable source, InputIterable cached ) + { + return new SourceOrCachedInputIterable<>( source, cached ); + } +} diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/UpdateNodeRecordsStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/UpdateNodeRecordsStep.java index 95cae7647f06c..7d19ce64f6f69 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/UpdateNodeRecordsStep.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/UpdateNodeRecordsStep.java @@ -19,6 +19,7 @@ */ package org.neo4j.unsafe.impl.batchimport; +import org.neo4j.collection.primitive.PrimitiveLongCollections; import org.neo4j.collection.primitive.PrimitiveLongIterator; import org.neo4j.kernel.api.labelscan.LabelScanStore; import org.neo4j.kernel.api.labelscan.LabelScanWriter; @@ -38,15 +39,19 @@ public class UpdateNodeRecordsStep extends UpdateRecordsStep { private final PrimitiveLongIterator ids; + private final boolean pruneDeletedNodesFromLabelScanStore; + private long current; private boolean end; private final LabelScanWriter labelScanWriter; public UpdateNodeRecordsStep( StageControl control, Configuration config, RecordStore store, - Collector collector, LabelScanStore labelScanStore ) + Collector collector, LabelScanStore labelScanStore, boolean pruneDeletedNodesFromLabelScanStore ) { super( control, config, store ); - this.ids = collector.leftOverDuplicateNodesIds(); + this.pruneDeletedNodesFromLabelScanStore = pruneDeletedNodesFromLabelScanStore; + this.ids = pruneDeletedNodesFromLabelScanStore ? + collector.leftOverDuplicateNodesIds() : PrimitiveLongCollections.emptyIterator(); goToNextId(); this.labelScanWriter = end ? 
LabelScanWriter.EMPTY : labelScanStore.newWriter(); } @@ -75,7 +80,7 @@ protected boolean accept( NodeRecord node ) protected void update( NodeRecord node ) throws Throwable { super.update( node ); - if ( !node.inUse() ) + if ( !node.inUse() && pruneDeletedNodesFromLabelScanStore ) { // Only the "labelsAfter" is considered labelScanWriter.write( NodeLabelUpdate.labelChanges( current, EMPTY_LONG_ARRAY, EMPTY_LONG_ARRAY ) ); diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/UpdateRecordsStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/UpdateRecordsStep.java index 9741feb6ae555..c8faab8d86ff5 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/UpdateRecordsStep.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/UpdateRecordsStep.java @@ -57,7 +57,7 @@ protected void process( RECORD[] batch, BatchSender sender ) throws Throwable { for ( RECORD record : batch ) { - if ( !IdValidator.isReservedId( record.getId() ) ) + if ( record != null && !IdValidator.isReservedId( record.getId() ) ) { if ( record.inUse() && !accept( record ) ) { diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/NodeRelationshipCache.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/NodeRelationshipCache.java index e3f74f452c251..be001710528e5 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/NodeRelationshipCache.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/NodeRelationshipCache.java @@ -19,49 +19,74 @@ */ package org.neo4j.unsafe.impl.batchimport.cache; +import java.util.Arrays; import java.util.concurrent.atomic.AtomicLong; - import org.neo4j.graphdb.Direction; +import static java.lang.Math.toIntExact; + /** * Caches of parts of node store and relationship group store. A crucial part of batch import where * any random access must be covered by this cache. All I/O, both read and write must be sequential. + * + *
+ * Main array (index into array is nodeId):
+ * [ID,DEGREE]
+ *
+ * ID means:
+ * - DEGREE >= THRESHOLD: pointer into RelationshipGroupCache array
+ *   RelationshipGroupCache array:
+ *   [NEXT,OUT_ID,OUT_DEGREE,IN_ID,IN_DEGREE,LOOP_ID,LOOP_DEGREE]
+ * - DEGREE < THRESHOLD: last seen relationship id for this node
  */
 public class NodeRelationshipCache implements MemoryStatsVisitor.Visitable
 {
+    private static final int CHUNK_SIZE = 1_000_000;
     private static final long EMPTY = -1;
-    private static final byte[] DEFAULT_VALUE = new byte[10];
     private static final long MAX_RELATIONSHIP_ID = (1L << 48/*6B*/) - 2/*reserving -1 as legal default value*/;
+    static final int MAX_COUNT = (1 << 30/*2 change bits*/) - 2/*reserving -1 as legal default value*/;
+
+    // Sizes and offsets of values in each sparse node ByteArray item
     private static final int ID_SIZE = 6;
     private static final int COUNT_SIZE = 4;
     private static final int ID_AND_COUNT_SIZE = ID_SIZE + COUNT_SIZE;
     private static final int SPARSE_ID_OFFSET = 0;
     private static final int SPARSE_COUNT_OFFSET = ID_SIZE;
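+    // Each per-node entry in the main array is ID_AND_COUNT_SIZE (10) bytes: a 6-byte relationship id
+    // (the head of the node's relationship chain, or a pointer into the RelationshipGroupCache for dense
+    // nodes) at SPARSE_ID_OFFSET, followed by a 4-byte count field at SPARSE_COUNT_OFFSET.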
-    static
-    {
-        // This looks odd, but we're using the array itself to create a default byte[] for another
-        ByteArray array = NumberArrayFactory.HEAP.newByteArray( 1, DEFAULT_VALUE.clone() );
-        array.set6ByteLong( 0, SPARSE_ID_OFFSET, EMPTY );
-        array.setInt( 0, SPARSE_COUNT_OFFSET, 0 );
-        array.get( 0, DEFAULT_VALUE );
-    }
+
+    // Masks for tracking changes per node
+    private static final long DENSE_NODE_CHANGED_MASK = 0x80000000;
+    private static final long SPARSE_NODE_CHANGED_MASK = 0x40000000;
+    private static final long NODE_CHANGED_MASKS = DENSE_NODE_CHANGED_MASK | SPARSE_NODE_CHANGED_MASK;
+    private static final long COUNT_MASK = ~NODE_CHANGED_MASKS;
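+    // Illustrative sketch (an assumption drawn from the constants above and getCount(...) below,
+    // not code added by this patch): the 4-byte count field shares its bits between the actual
+    // relationship count (low 30 bits) and the two per-node change flags, roughly:
+    //
+    //   int raw = array.getInt( nodeId, SPARSE_COUNT_OFFSET );
+    //   long count = raw & COUNT_MASK;                                // plain count, change bits dropped
+    //   boolean denseChanged = (raw & DENSE_NODE_CHANGED_MASK) != 0;
+    //   boolean sparseChanged = (raw & SPARSE_NODE_CHANGED_MASK) != 0;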
 
     private final ByteArray array;
+    private byte[] chunkChangedArray;
     private final int denseNodeThreshold;
     private final RelGroupCache relGroupCache;
+    private long highId;
+    private volatile boolean oneMeansChanged = true;
+    private final int chunkSize;
 
     public NodeRelationshipCache( NumberArrayFactory arrayFactory, int denseNodeThreshold )
     {
-        this( arrayFactory, denseNodeThreshold, 1_000_000, 0 );
+        this( arrayFactory, denseNodeThreshold, CHUNK_SIZE, 0 );
     }
 
     NodeRelationshipCache( NumberArrayFactory arrayFactory, int denseNodeThreshold, int chunkSize, long base )
     {
-        this.array = arrayFactory.newDynamicByteArray( chunkSize, DEFAULT_VALUE );
+        this.chunkSize = chunkSize;
+        this.array = arrayFactory.newDynamicByteArray( chunkSize, minusOneBytes( ID_AND_COUNT_SIZE ) );
         this.denseNodeThreshold = denseNodeThreshold;
         this.relGroupCache = new RelGroupCache( arrayFactory, chunkSize, base );
     }
 
+    private static byte[] minusOneBytes( int length )
+    {
+        byte[] bytes = new byte[length];
+        Arrays.fill( bytes, (byte) -1 );
+        return bytes;
+    }
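+    // Illustrative note (an assumption, not spelled out in the patch): with the default entry filled with
+    // -1 bytes, an untouched node reads back as relationship id == EMPTY and a raw count of all ones,
+    // which getCount(...) below maps to 0 -- replacing the DEFAULT_VALUE previously built in the old
+    // static initializer by explicitly storing EMPTY and 0.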
+
     /**
      * Increment relationship count for {@code nodeId}.
      * @param nodeId node to increment relationship count for.
@@ -69,22 +94,62 @@ public NodeRelationshipCache( NumberArrayFactory arrayFactory, int denseNodeThre
      */
     public int incrementCount( long nodeId )
     {
-        ByteArray array = this.array.at( nodeId );
-        int count = getCount( array, nodeId ) + 1;
-        setCount( array, nodeId, count );
-        return count;
+        return incrementCount( array, nodeId, SPARSE_COUNT_OFFSET );
     }
 
-    private void setCount( ByteArray array, long nodeId, int count )
+    void setCount( long nodeId, int count )
     {
+        assertValidCount( nodeId, count );
         array.setInt( nodeId, SPARSE_COUNT_OFFSET, count );
     }
 
-    private static int getCount( ByteArray array, long nodeId )
+    private static void assertValidCount( long nodeId, int count )
     {
-        return array.getInt( nodeId, SPARSE_COUNT_OFFSET );
+        if ( count > MAX_COUNT )
+        {
+            // The count has bits outside of the count mask, meaning this value is too big
+            throw new IllegalStateException( "Tried to increment count of " + nodeId + " to " + count +
+                    ", which is too big in one single import" );
+        }
     }
 
+    /**
+     * Called by the code that calls {@link #incrementCount(long)}, after all nodes have been added.
+     * Done like this since currently it's just overhead trying to maintain a high id in the face
+     * of concurrent updates, whereas it's much simpler to do this from the code incrementing the counts.
+     *
+     * @param nodeId high node id in the store, i.e. the highest node id + 1
+     */
+    public void setHighNodeId( long nodeId )
+    {
+        this.highId = nodeId;
+        this.chunkChangedArray = new byte[chunkOf( nodeId ) + 1];
+    }
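+    // A sketch of the intended calling pattern, mirroring CalculateDenseNodesStage elsewhere in this patch:
+    //
+    //   // while streaming relationships, roughly for each endpoint node id:
+    //   cache.incrementCount( nodeId );
+    //   // ...and once counting is done (e.g. in the stage's close()):
+    //   cache.setHighNodeId( nodeStore.getHighId() );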
+
+    private static int getCount( ByteArray array, long index, int offset )
+    {
+        long rawCount = array.getInt( index, offset ) & COUNT_MASK;
+        if ( rawCount == COUNT_MASK )
+        {
+            return 0;
+        }
+        return (int) rawCount;
+    }
+
+    private static int incrementCount( ByteArray array, long index, int offset )
+    {
+        array = array.at( index );
+        int count = getCount( array, index, offset ) + 1;
+        assertValidCount( index, count );
+        array.setInt( index, offset, count );
+        return count;
+    }
+
+    /**
+     * @param nodeId node to check whether dense or not.
+     * @return whether or not the given {@code nodeId} is dense. A node is sparse if it has fewer relationships,
+     * i.e. has had fewer calls to {@link #incrementCount(long)}, than the given dense node threshold.
+     */
     public boolean isDense( long nodeId )
     {
         return isDense( array, nodeId );
@@ -97,10 +162,21 @@ private boolean isDense( ByteArray array, long nodeId )
             return false;
         }
 
-        return getCount( array, nodeId ) >= denseNodeThreshold;
+        return getCount( array, nodeId, SPARSE_COUNT_OFFSET ) >= denseNodeThreshold;
     }
 
-    public long getAndPutRelationship( long nodeId, int type, Direction direction, long firstRelId,
+    /**
+     * Puts a relationship id as the head of a relationship chain. If the node is sparse then
+     * the head is set directly in the cache; if the node is dense then which head to update
+     * depends on the {@code direction}.
+     *
+     * @param nodeId node to update relationship head for.
+     * @param direction {@link Direction} this node represents for this relationship.
+     * @param firstRelId the relationship id which is now the head of this chain.
+     * @param incrementCount whether to also increment the count for this chain as a side effect.
+     * @return the previous head of the updated relationship chain.
+     */
+    public long getAndPutRelationship( long nodeId, Direction direction, long firstRelId,
             boolean incrementCount )
     {
         if ( firstRelId > MAX_RELATIONSHIP_ID )
@@ -117,20 +193,81 @@ public long getAndPutRelationship( long nodeId, int type, Direction direction, l
 
         ByteArray array = this.array.at( nodeId );
         long existingId = all48Bits( array, nodeId, SPARSE_ID_OFFSET );
-        if ( isDense( array, nodeId ) )
+        boolean dense = isDense( array, nodeId );
+        boolean wasChanged = markAsChanged( array, nodeId, changeMask( dense ) );
+        markChunkAsChanged( nodeId, dense );
+        if ( dense )
         {
             if ( existingId == EMPTY )
             {
-                existingId = relGroupCache.allocate( type, direction, firstRelId, incrementCount );
+                existingId = relGroupCache.allocate();
                 setRelationshipId( array, nodeId, existingId );
-                return EMPTY;
+                wasChanged = false; // no need to clear when we just allocated it
             }
-            return relGroupCache.putRelationship( existingId, type, direction, firstRelId, incrementCount );
+            return relGroupCache.putRelationship( existingId, direction, firstRelId, incrementCount, wasChanged );
         }
 
         // Don't increment count for sparse node since that has already been done in a previous pass
         setRelationshipId( array, nodeId, firstRelId );
-        return existingId;
+        return wasChanged ? EMPTY : existingId;
+    }
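
As a usage note for the head-swap contract above, here is a fragment showing the forward pass over a sparse node. It assumes a cache where node 5 was counted as sparse during the counting pass and setHighNodeId has already been called, with Direction coming from org.neo4j.graphdb; node id 5 and relationship ids 10 and 11 are invented.

    cache.setForwardScan( true );

    // First relationship for node 5 in this pass: the change bit gets flipped and EMPTY
    // is returned, meaning there is no previous head to link to yet.
    long previousHead = cache.getAndPutRelationship( 5, Direction.OUTGOING, 10, false );

    // Second relationship: the former head (10) comes back, so the caller can point
    // relationship 11 at it when writing the chain.
    previousHead = cache.getAndPutRelationship( 5, Direction.OUTGOING, 11, false );
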
+
+    private void markChunkAsChanged( long nodeId, boolean dense )
+    {
+        byte mask = chunkChangeMask( dense );
+        if ( !chunkHasChange( nodeId, mask ) )
+        {
+            int chunk = chunkOf( nodeId );
+            if ( (chunkChangedArray[chunk] & mask) == 0 )
+            {
+                // Multiple threads may update this chunk array. Synchronizing on change is fine
+                // performance-wise since it'll only happen at most a couple of times for each chunk (1M).
+                synchronized ( chunkChangedArray )
+                {
+                    chunkChangedArray[chunk] |= mask;
+                }
+            }
+        }
+    }
+
+    private int chunkOf( long nodeId )
+    {
+        return toIntExact( nodeId / chunkSize );
+    }
+
+    private static byte chunkChangeMask( boolean dense )
+    {
+        return (byte) (1 << (dense ? 1 : 0));
+    }
+
+    private boolean markAsChanged( ByteArray array, long nodeId, long mask )
+    {
+        long bits = array.getInt( nodeId, SPARSE_COUNT_OFFSET ) & 0xFFFFFFFF;
+        boolean changeBitIsSet = (bits & mask) != 0;
+        boolean changeBitWasFlipped = changeBitIsSet != oneMeansChanged;
+        if ( changeBitWasFlipped )
+        {
+            bits ^= mask; // flip the mask bit
+            array.setInt( nodeId, SPARSE_COUNT_OFFSET, (int) bits );
+        }
+        return changeBitWasFlipped;
+    }
+
+    private static boolean nodeIsChanged( ByteArray array, long nodeId, long mask )
+    {
+        long bits = array.getInt( nodeId, SPARSE_COUNT_OFFSET ) & 0xFFFFFFFF;
+
+        // The values in the cache are initialized with -1, i.e. all bits set, which also means the
+        // change bits are set. For nodes that get at least one call to incrementCount these bits will be
+        // set properly to reflect the count, e.g. 1, 2, 3, and so on. Nodes that never get any call
+        // to incrementCount will not see any changes to them either, so here we check whether
+        // the count field is -1 as a whole; if so we know we've run into such a node
+        // and we can safely say it hasn't been changed.
+        if ( bits == 0xFFFFFFFF )
+        {
+            return false;
+        }
+        return (bits & mask) != 0;
     }
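
To make the initialization trick above concrete, a small standalone illustration follows. The mask values are assumptions picked only to show the packing idea; the real COUNT_MASK and *_CHANGED_MASK constants are defined earlier in this class and may differ.

    public class ChangeBitSketch
    {
        // Assumed values, for illustration only
        private static final long DENSE_NODE_CHANGED_MASK = 0x80000000L;
        private static final long SPARSE_NODE_CHANGED_MASK = 0x40000000L;
        private static final long COUNT_MASK = 0x3FFFFFFFL;

        public static void main( String[] args )
        {
            long field = 0xFFFFFFFFL; // initial -1 bytes: never counted, treated as unchanged
            System.out.println( field == 0xFFFFFFFFL );                      // true

            field = 3;                // a real count was written, change bits are now clear
            System.out.println( ( field & SPARSE_NODE_CHANGED_MASK ) != 0 ); // false

            field ^= SPARSE_NODE_CHANGED_MASK; // markAsChanged flips the sparse change bit
            System.out.println( ( field & SPARSE_NODE_CHANGED_MASK ) != 0 ); // true
            System.out.println( field & COUNT_MASK );                        // 3, count unaffected
        }
    }
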
 
     private void setRelationshipId( ByteArray array, long nodeId, long firstRelId )
@@ -138,7 +275,7 @@ private void setRelationshipId( ByteArray array, long nodeId, long firstRelId )
         array.set6ByteLong( nodeId, SPARSE_ID_OFFSET, firstRelId );
     }
 
-    private long getRelationshipId( ByteArray array, long nodeId )
+    private static long getRelationshipId( ByteArray array, long nodeId )
     {
         return array.get6ByteLong( nodeId, SPARSE_ID_OFFSET );
     }
@@ -155,65 +292,92 @@ private static long all48Bits( long raw )
 
     /**
      * Used when setting node nextRel fields. Gets the first relationship for this node,
-     * or the first relationship group id (where it it first visits all the groups before returning the first one).
+     * or the relationship group id. As a side effect this method also triggers creation of a relationship group
+     * (via the visitor) if this node is dense, and returns that relationship group record id.
+     *
+     * @param nodeId id to get the first relationship for.
+     * @param visitor {@link GroupVisitor} which will be notified with data about the group to be created.
+     * This visitor is expected to create the group.
+     * @return the first relationship id if the node is sparse, or the result of the {@link GroupVisitor} if dense.
      */
     public long getFirstRel( long nodeId, GroupVisitor visitor )
     {
+        assert oneMeansChanged : "This should only be done at forward scan";
+
         ByteArray array = this.array.at( nodeId );
         long id = getRelationshipId( array, nodeId );
-        if ( isDense( array, nodeId ) )
+        if ( id != EMPTY && isDense( array, nodeId ) )
         {   // Indirection into rel group cache
-            return relGroupCache.visitGroups( nodeId, id, visitor );
+            return relGroupCache.visitGroup( nodeId, id, visitor );
         }
 
         return id;
     }
 
-    public void clearRelationships()
+    /**
+     * First a note about tracking which nodes have been updated with new relationships by calls to
+     * {@link #getAndPutRelationship(long, Direction, long, boolean)}:
+     *
+     * We use two high bits of the count field in the "main" array to mark whether or not a change
+     * has been made to a node: one bit for sparse nodes and one for dense. Sparse and dense nodes
+     * now have different import cycles. When importing the relationships, all relationships are imported,
+     * one type at a time, but only dense nodes and relationship chains for dense nodes are updated
+     * for every type. After all types have been imported the sparse chains and nodes are updated in one pass.
+     *
+     * This method tells the cache which direction it's about to observe changes for. If {@code true} then a change
+     * is marked by setting the change-bit, and an unset change-bit means the next change is the first one for that node.
+     * {@code false} is the opposite. This is so that there's no need to clear the cache
+     * in between forward and backward linking, since the cache can be rather large.
+     *
+     * @param forward {@code true} if going forward, marking a change with a set bit, otherwise
+     * a change is marked with an unset bit.
+     */
+    public void setForwardScan( boolean forward )
     {
-        long length = array.length();
-        for ( long nodeId = 0; nodeId < length; nodeId++ )
-        {
-            if ( !isDense( nodeId ) )
-            {
-                setRelationshipId( array, nodeId, -1 );
-            }
-        }
-        relGroupCache.clearRelationships();
+        oneMeansChanged = forward;
     }
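
Continuing the forward-pass fragment shown earlier, this is how flipping the scan direction avoids clearing the cache between the two linking passes (again just a sketch with invented ids):

    cache.setForwardScan( false );

    // Backward pass over the same node: the change bit is still in its "forward" state, which
    // now reads as "not yet changed in this pass", so it is flipped back and EMPTY is returned
    // instead of the stale head left over from the forward pass.
    long previous = cache.getAndPutRelationship( 5, Direction.OUTGOING, 11, false );
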
 
-    public int getCount( long nodeId, int type, Direction direction )
+    /**
+     * Returns the count (degree) of the requested relationship chain. If the node is sparse then the single count
+     * for the node is returned; if the node is dense the count for the chain in the specified
+     * direction is returned.
+     *
+     * @param nodeId node to get count for.
+     * @param direction {@link Direction} to get count for.
+     * @return count (degree) of the requested relationship chain.
+     */
+    public int getCount( long nodeId, Direction direction )
     {
         ByteArray array = this.array.at( nodeId );
         if ( isDense( array, nodeId ) )
         {   // Indirection into rel group cache
             long id = getRelationshipId( array, nodeId );
-            return id == EMPTY ? 0 : relGroupCache.getCount( id, type, direction );
+            return id == EMPTY ? 0 : relGroupCache.getCount( id, direction );
         }
 
-        return getCount( array, nodeId );
+        return getCount( array, nodeId, SPARSE_COUNT_OFFSET );
     }
 
     public interface GroupVisitor
     {
         /**
          * Visits with data required to create a relationship group.
+         * Type can be decided on the outside since there'll be only one type per node.
          *
          * @param nodeId node id.
-         * @param type relationship type.
          * @param next next relationship group.
          * @param out first outgoing relationship id.
          * @param in first incoming relationship id.
          * @param loop first loop relationship id.
          * @return the created relationship group id.
          */
-        long visit( long nodeId, int type, long next, long out, long in, long loop );
+        long visit( long nodeId, long next, long out, long in, long loop );
     }
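
A toy implementation of the visitor contract above: it only prints what it is handed and returns a made-up sequential id, whereas a real visitor would create a relationship group record in the store. The class name and the printing are mine.

    import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache.GroupVisitor;

    public class PrintingGroupVisitor implements GroupVisitor
    {
        private long nextGroupId;

        @Override
        public long visit( long nodeId, long next, long out, long in, long loop )
        {
            // The returned value is what getFirstRel( nodeId, visitor ) hands back for a dense node
            System.out.println( "node " + nodeId + " -> group " + nextGroupId
                    + " (next=" + next + ", out=" + out + ", in=" + in + ", loop=" + loop + ")" );
            return nextGroupId++;
        }
    }
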
 
     public static final GroupVisitor NO_GROUP_VISITOR = new GroupVisitor()
     {
         @Override
-        public long visit( long nodeId, int type, long next, long out, long in, long loop )
+        public long visit( long nodeId, long next, long out, long in, long loop )
         {
             return -1;
         }
@@ -221,25 +385,8 @@ public long visit( long nodeId, int type, long next, long out, long in, long loo
 
     private static class RelGroupCache implements AutoCloseable, MemoryStatsVisitor.Visitable
     {
-        private static final int TYPE_SIZE = 2;
         private static final int NEXT_OFFSET = 0;
-        private static final int TYPE_OFFSET = 6;
-        private static final int BASE_IDS_OFFSET = ID_SIZE + TYPE_SIZE;
-        private static final byte[] DEFAULT_VALUE =
-                new byte[ID_SIZE/*next*/ + TYPE_SIZE + (ID_SIZE + COUNT_SIZE) * Direction.values().length];
-        static
-        {
-            ByteArray defaultArray = NumberArrayFactory.HEAP.newByteArray( 1, DEFAULT_VALUE.clone() );
-            defaultArray.set6ByteLong( 0, NEXT_OFFSET, EMPTY );
-            defaultArray.setShort( 0, TYPE_OFFSET, (short) EMPTY );
-            for ( int i = 0, offsetBase = BASE_IDS_OFFSET; i < Direction.values().length;
-                    i++, offsetBase += ID_AND_COUNT_SIZE )
-            {
-                defaultArray.set6ByteLong( 0, offsetBase, EMPTY );
-                defaultArray.setInt( 0, offsetBase + ID_SIZE, 0 );
-            }
-            defaultArray.get( 0, DEFAULT_VALUE );
-        }
+        private static final int BASE_IDS_OFFSET = ID_SIZE;
 
         // Used for testing high id values. Should always be zero in production
         private final long base;
@@ -250,35 +397,21 @@ private static class RelGroupCache implements AutoCloseable, MemoryStatsVisitor.
         {
             this.base = base;
             assert chunkSize > 0;
-            // We can use this array to have "entries" accommodating one entire group, e.g:
-            // - next
-            // - type
-            // - out
-            // - out degree
-            // - in
-            // - in degree
-            // - loop
-            // - loop degree
-            this.array = arrayFactory.newDynamicByteArray( chunkSize, DEFAULT_VALUE );
+            this.array = arrayFactory.newDynamicByteArray( chunkSize,
+                    minusOneBytes( ID_SIZE/*next*/ + (ID_SIZE + COUNT_SIZE) * Direction.values().length ) );
             this.nextFreeId = new AtomicLong( base );
         }
 
-        public int getCount( long id, int type, Direction direction )
+        private void clearRelationships( ByteArray array, long relGroupId )
         {
-            id = findGroupIndexForType( id, type );
-            return id == EMPTY ? 0 : array.getInt( rebase( id ), countOffset( direction ) );
+            array.set6ByteLong( relGroupId, directionOffset( Direction.OUTGOING ), EMPTY );
+            array.set6ByteLong( relGroupId, directionOffset( Direction.INCOMING ), EMPTY );
+            array.set6ByteLong( relGroupId, directionOffset( Direction.BOTH ), EMPTY );
         }
 
-        private void clearRelationships()
+        int getCount( long id, Direction direction )
         {
-            long length = array.length();
-            for ( long i = 0; i < length; i++ )
-            {
-                ByteArray array = this.array.at( i );
-                array.set6ByteLong( i, directionOffset( Direction.OUTGOING ), EMPTY );
-                array.set6ByteLong( i, directionOffset( Direction.INCOMING ), EMPTY );
-                array.set6ByteLong( i, directionOffset( Direction.BOTH ), EMPTY );
-            }
+            return id == EMPTY ? 0 : NodeRelationshipCache.getCount( array, rebase( id ), countOffset( direction ) );
         }
 
         /**
@@ -294,139 +427,60 @@ private long nextFreeId()
             return nextFreeId.getAndIncrement();
         }
 
-        private void initializeGroup( ByteArray array, long relGroupIndex, int type )
-        {
-            array.setShort( rebase( relGroupIndex ), TYPE_OFFSET, (short) type );
-            // All other values are set to defaults automatically
-        }
-
-        private long visitGroups( long nodeId, long relGroupIndex, GroupVisitor visitor )
+        private long visitGroup( long nodeId, long relGroupIndex, GroupVisitor visitor )
         {
-            long currentIndex = relGroupIndex;
-            long first = -1;
-            while ( currentIndex != EMPTY )
-            {
-                long index = rebase( currentIndex );
-                ByteArray array = this.array.at( index );
-                int type = array.getShort( index, TYPE_OFFSET );
-                long out = all48Bits( array, index, directionOffset( Direction.OUTGOING ) );
-                long in = all48Bits( array, index, directionOffset( Direction.INCOMING ) );
-                long loop = all48Bits( array, index, directionOffset( Direction.BOTH ) );
-                long next = all48Bits( array, index, NEXT_OFFSET );
-                long id = visitor.visit( nodeId, type, next, out, in, loop );
-                if ( first == -1 )
-                {   // This is the one we return
-                    first = id;
-                }
-
-                currentIndex = next;
-            }
-            return first;
+            long index = rebase( relGroupIndex );
+            ByteArray array = this.array.at( index );
+            long out = all48Bits( array, index, directionOffset( Direction.OUTGOING ) );
+            long in = all48Bits( array, index, directionOffset( Direction.INCOMING ) );
+            long loop = all48Bits( array, index, directionOffset( Direction.BOTH ) );
+            long next = all48Bits( array, index, NEXT_OFFSET );
+            long nextId = out == EMPTY && in == EMPTY && loop == EMPTY ? EMPTY :
+                visitor.visit( nodeId, next, out, in, loop );
+
+            // Save the returned next id for later; when the next group for this node is created
+            // we then know what to point this group's next to.
+            array.set6ByteLong( index, NEXT_OFFSET, nextId );
+            return nextId;
         }
 
-        private int directionOffset( Direction direction )
+        private static int directionOffset( Direction direction )
         {
             return BASE_IDS_OFFSET + (direction.ordinal() * ID_AND_COUNT_SIZE);
         }
 
-        private int countOffset( Direction direction )
+        private static int countOffset( Direction direction )
         {
             return directionOffset( direction ) + ID_SIZE;
         }
 
-        public long allocate( int type, Direction direction, long relId, boolean incrementCount )
+        long allocate()
         {
-            long index = nextFreeId();
-            ByteArray array = this.array.at( rebase( index ) );
-            initializeGroup( array, index, type );
-            putRelField( array, index, direction, relId, incrementCount );
-            return index;
+            return nextFreeId();
         }
 
-        private long putRelField( ByteArray array, long relGroupIndex, Direction direction,
-                long relId, boolean increment )
+        long putRelationship( long relGroupIndex, Direction direction,
+                long relId, boolean increment, boolean clear )
         {
             long index = rebase( relGroupIndex );
+            ByteArray array = this.array.at( index );
             int directionOffset = directionOffset( direction );
-            long previousId = all48Bits( array, index, directionOffset );
-            array.set6ByteLong( index, directionOffset, relId );
-            if ( increment )
+            long previousId;
+            if ( clear )
             {
-                int countOffset = countOffset( direction );
-                array.setInt( index, countOffset, array.getInt( index, countOffset ) + 1 );
+                clearRelationships( array, index );
+                previousId = EMPTY;
             }
-            return previousId;
-        }
-
-        public long putRelationship( long relGroupIndex, int type, Direction direction, long relId,
-                boolean trueForIncrement )
-        {
-            long currentIndex = relGroupIndex;
-            long previousIndex = EMPTY;
-            while ( currentIndex != EMPTY )
+            else
             {
-                long currentIndexRebased = rebase( currentIndex );
-                ByteArray array = this.array.at( currentIndexRebased );
-                long foundType = array.getShort( currentIndexRebased, TYPE_OFFSET );
-                if ( foundType == type )
-                {   // Found it
-                    return putRelField( array, currentIndex, direction, relId, trueForIncrement );
-                }
-                else if ( foundType > type )
-                {   // We came too far, create room for it
-                    break;
-                }
-                previousIndex = currentIndex;
-                currentIndex = all48Bits( array, currentIndexRebased, NEXT_OFFSET );
+                previousId = all48Bits( array, index, directionOffset );
             }
-
-            long newIndex = nextFreeId();
-            if ( previousIndex == EMPTY )
-            {   // We are at the start
-                array.swap( rebase( currentIndex ), rebase( newIndex ), 1 );
-                long swap = newIndex;
-                newIndex = currentIndex;
-                currentIndex = swap;
-            }
-
-            ByteArray array = this.array.at( rebase( newIndex ) );
-            initializeGroup( array, newIndex, type );
-            if ( currentIndex != EMPTY )
-            {   // We are NOT at the end
-                setNextField( array, newIndex, currentIndex );
-            }
-
-            if ( previousIndex != EMPTY )
-            {   // We are NOT at the start
-                setNextField( this.array, previousIndex, newIndex );
-            }
-
-            return putRelField( array, newIndex, direction, relId, trueForIncrement );
-        }
-
-        private void setNextField( ByteArray array, long relGroupIndex, long next )
-        {
-            array.set6ByteLong( rebase( relGroupIndex ), NEXT_OFFSET, next );
-        }
-
-        private long findGroupIndexForType( long relGroupIndex, int type )
-        {
-            long currentIndex = relGroupIndex;
-            while ( currentIndex != EMPTY )
+            array.set6ByteLong( index, directionOffset, relId );
+            if ( increment )
             {
-                long index = rebase( currentIndex );
-                int foundType = array.getShort( index, TYPE_OFFSET );
-                if ( foundType == type )
-                {   // Found it
-                    return currentIndex;
-                }
-                else if ( foundType > type )
-                {   // We came too far, create room for it
-                    break;
-                }
-                currentIndex = all48Bits( array, index, NEXT_OFFSET );
+                incrementCount( array, index, countOffset( direction ) );
             }
-            return EMPTY;
+            return previousId;
         }
 
         @Override
@@ -460,4 +514,67 @@ public void acceptMemoryStatsVisitor( MemoryStatsVisitor visitor )
         array.acceptMemoryStatsVisitor( visitor );
         relGroupCache.acceptMemoryStatsVisitor( visitor );
     }
+
+    private static long changeMask( boolean dense )
+    {
+        return dense ? DENSE_NODE_CHANGED_MASK : SPARSE_NODE_CHANGED_MASK;
+    }
+
+    @FunctionalInterface
+    public interface NodeChangeVisitor
+    {
+        void change( long nodeId, ByteArray array );
+    }
+
+    /**
+     * Efficiently visits changed nodes, i.e. nodes that have had any relationship chain updated by
+     * {@link #getAndPutRelationship(long, Direction, long, boolean)}.
+     *
+     * @param visitor {@link NodeChangeVisitor} which will be notified about all changes.
+     * @param denseNodes {@code true} for visiting changed dense nodes, {@code false} for visiting
+     * changed sparse nodes.
+     */
+    public void visitChangedNodes( NodeChangeVisitor visitor, boolean denseNodes )
+    {
+        long mask = changeMask( denseNodes );
+        byte chunkMask = chunkChangeMask( denseNodes );
+        for ( long nodeId = 0; nodeId < highId; )
+        {
+            if ( !chunkHasChange( nodeId, chunkMask ) )
+            {
+                nodeId += chunkSize;
+                continue;
+            }
+
+            ByteArray chunk = array.at( nodeId );
+            for ( int i = 0; i < chunkSize && nodeId < highId; i++, nodeId++ )
+            {
+                if ( isDense( chunk, nodeId ) == denseNodes && nodeIsChanged( chunk, nodeId, mask ) )
+                {
+                    visitor.change( nodeId, chunk );
+                }
+            }
+        }
+    }
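
Since NodeChangeVisitor is a functional interface, a pass over the changed sparse nodes can be written as a lambda. A fragment, assuming a cache populated as in the earlier sketches:

    // Visit only the sparse nodes whose relationship heads were touched in the last pass
    cache.visitChangedNodes( ( nodeId, array ) ->
            System.out.println( "sparse node " + nodeId + " was updated" ), false );

    // Afterwards drop the coarse per-chunk marks for sparse nodes
    cache.clearChangedChunks( false );
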
+
+    /**
+     * Clears the high-level change marks.
+     *
+     * @param denseNodes {@code true} for clearing marked dense nodes, {@code false} for clearing marked sparse nodes.
+     */
+    public void clearChangedChunks( boolean denseNodes )
+    {
+        // Executed by a single thread, so no synchronized required
+        byte chunkMask = chunkChangeMask( denseNodes );
+        for ( int i = 0; i < chunkChangedArray.length; i++ )
+        {
+            chunkChangedArray[i] &= ~chunkMask;
+        }
+    }
+
+    private boolean chunkHasChange( long nodeId, byte chunkMask )
+    {
+        int chunkId = chunkOf( nodeId );
+        return (chunkChangedArray[chunkId] & chunkMask) != 0;
+    }
 }
diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/InputCache.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/InputCache.java
index 85415578accc8..97c14bb52a235 100644
--- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/InputCache.java
+++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/InputCache.java
@@ -22,7 +22,8 @@
 import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
-
+import java.util.HashSet;
+import java.util.Set;
 import org.neo4j.function.ThrowingSupplier;
 import org.neo4j.io.ByteUnit;
 import org.neo4j.io.fs.FileSystemAbstraction;
@@ -89,6 +90,8 @@
  */
 public class InputCache implements Closeable
 {
+    public static final String MAIN = "main";
+
     private static final String HEADER = "-header";
     private static final String NODES = "nodes";
     private static final String RELATIONSHIPS = "relationships";
@@ -114,6 +117,7 @@ public class InputCache implements Closeable
     private final FileSystemAbstraction fs;
     private final File cacheDirectory;
     private final int bufferSize;
+    private final Set<String> subTypes = new HashSet<>();
 
     public InputCache( FileSystemAbstraction fs, File cacheDirectory )
     {
@@ -127,52 +131,71 @@ public InputCache( FileSystemAbstraction fs, File cacheDirectory, int bufferSize
         this.bufferSize = bufferSize;
     }
 
-    public Receiver cacheNodes() throws IOException
+    public Receiver<InputNode[],IOException> cacheNodes( String subType ) throws IOException
     {
-        return new InputNodeCacher( channel( NODES, "rw" ), channel( NODES_HEADER, "rw" ), bufferSize );
+        return new InputNodeCacher( channel( NODES, subType, "rw" ), channel( NODES_HEADER, subType, "rw" ),
+                bufferSize );
     }
 
-    public Receiver cacheRelationships() throws IOException
+    public Receiver<InputRelationship[],IOException> cacheRelationships( String subType ) throws IOException
     {
-        return new InputRelationshipCacher( channel( RELATIONSHIPS, "rw" ),
-                channel( RELATIONSHIPS_HEADER, "rw" ), bufferSize );
+        return new InputRelationshipCacher( channel( RELATIONSHIPS, subType, "rw" ),
+                channel( RELATIONSHIPS_HEADER, subType, "rw" ), bufferSize );
     }
 
-    private StoreChannel channel( String type, String mode ) throws IOException
+    private StoreChannel channel( String type, String subType, String mode ) throws IOException
     {
-        return fs.open( file( type ), mode );
+        return fs.open( file( type, subType ), mode );
     }
 
-    private File file( String type )
+    private File file( String type, String subType )
     {
-        return new File( cacheDirectory, "input-" + type );
+        subTypes.add( subType );
+        return new File( cacheDirectory, "input-" + type + "-" + subType );
     }
 
-    public InputIterable nodes()
+    public InputIterable<InputNode> nodes( String subType, boolean deleteAfterUse )
     {
         return entities( new ThrowingSupplier, IOException>()
         {
             @Override
             public InputIterator get() throws IOException
             {
-                return new InputNodeReader( channel( NODES, "r" ), channel( NODES_HEADER, "r" ), bufferSize );
+                return new InputNodeReader( channel( NODES, subType, "r" ), channel( NODES_HEADER, subType, "r" ),
+                        bufferSize, deleteAction( deleteAfterUse, NODES, NODES_HEADER, subType ) );
             }
         } );
     }
 
-    public InputIterable relationships()
+    public InputIterable<InputRelationship> relationships( String subType, boolean deleteAfterUse )
     {
         return entities( new ThrowingSupplier, IOException>()
         {
             @Override
             public InputIterator get() throws IOException
             {
-                return new InputRelationshipReader( channel( RELATIONSHIPS, "r" ),
-                        channel( RELATIONSHIPS_HEADER, "r" ), bufferSize );
+                return new InputRelationshipReader( channel( RELATIONSHIPS, subType, "r" ),
+                        channel( RELATIONSHIPS_HEADER, subType, "r" ), bufferSize,
+                        deleteAction( deleteAfterUse, RELATIONSHIPS, RELATIONSHIPS_HEADER, subType ) );
             }
         } );
     }
 
+    protected Runnable deleteAction( boolean deleteAfterUse, String type, String header, String subType )
+    {
+        if ( !deleteAfterUse )
+        {
+            return () -> {};
+        }
+
+        return () ->
+        {
+            fs.deleteFile( file( type, subType ) );
+            fs.deleteFile( file( header, subType ) );
+            subTypes.remove( subType );
+        };
+    }
+
     private  InputIterable entities(
             final ThrowingSupplier, IOException> factory )
     {
@@ -202,9 +225,12 @@ public boolean supportsMultiplePasses()
     @Override
     public void close() throws IOException
     {
-        fs.deleteFile( file( NODES ) );
-        fs.deleteFile( file( RELATIONSHIPS ) );
-        fs.deleteFile( file( NODES_HEADER ) );
-        fs.deleteFile( file( RELATIONSHIPS_HEADER ) );
+        for ( String subType : subTypes )
+        {
+            fs.deleteFile( file( NODES, subType ) );
+            fs.deleteFile( file( RELATIONSHIPS, subType ) );
+            fs.deleteFile( file( NODES_HEADER, subType ) );
+            fs.deleteFile( file( RELATIONSHIPS_HEADER, subType ) );
+        }
     }
 }
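
A rough sketch of how the new subType parameter keys the cache files; the directory name is invented, imports are omitted, and the generic types on Receiver and InputIterable are assumptions based on how they are used elsewhere in this patch:

    FileSystemAbstraction fs = new DefaultFileSystemAbstraction();
    try ( InputCache inputCache = new InputCache( fs, new File( "import-cache" ) ) )
    {
        // Write relationships under the MAIN sub type; real code would push batches
        // through receiver.receive( ... ) before closing it.
        Receiver<InputRelationship[],IOException> receiver = inputCache.cacheRelationships( InputCache.MAIN );
        receiver.close();

        // Read them back; deleteAfterUse == true removes the "input-relationships-main"
        // files once the returned iterator is closed.
        InputIterable<InputRelationship> cached = inputCache.relationships( InputCache.MAIN, true );
        cached.iterator().close();
    }
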
diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/InputEntityCacher.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/InputEntityCacher.java
index 50d8f923c98c7..acef976c4935c 100644
--- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/InputEntityCacher.java
+++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/InputEntityCacher.java
@@ -22,7 +22,6 @@
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
-
 import org.neo4j.io.ByteUnit;
 import org.neo4j.io.fs.StoreChannel;
 import org.neo4j.kernel.impl.transaction.log.FlushableChannel;
diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/InputEntityReader.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/InputEntityReader.java
index d2228bc4e401f..956350235f2dc 100644
--- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/InputEntityReader.java
+++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/InputEntityReader.java
@@ -51,8 +51,10 @@ abstract class InputEntityReader extends Prefetching
     private int lineNumber;
     private final Group[] previousGroups;
     private final PrimitiveIntObjectMap tokens = Primitive.intObjectMap();
+    private final Runnable closeAction;
 
-    InputEntityReader( StoreChannel channel, StoreChannel header, int bufferSize, int groupSlots ) throws IOException
+    InputEntityReader( StoreChannel channel, StoreChannel header, int bufferSize, int groupSlots,
+            Runnable closeAction ) throws IOException
     {
         this.previousGroups = new Group[groupSlots];
         for ( int i = 0; i < groupSlots; i++ )
@@ -60,6 +62,7 @@ abstract class InputEntityReader extends Prefetching
             previousGroups[i] = Group.GLOBAL;
         }
         this.channel = reader( channel, bufferSize );
+        this.closeAction = closeAction;
         readHeader( header );
     }
 
@@ -181,6 +184,7 @@ public void close()
         try
         {
             channel.close();
+            closeAction.run();
         }
         catch ( IOException e )
         {
diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/InputNodeReader.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/InputNodeReader.java
index 4ab35fc9afff3..2c5748518b688 100644
--- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/InputNodeReader.java
+++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/InputNodeReader.java
@@ -36,11 +36,12 @@
  */
 public class InputNodeReader extends InputEntityReader
 {
-    private String[] previousLabels = InputNode.NO_LABELS;
+    private String[] previousLabels = InputEntity.NO_LABELS;
 
-    public InputNodeReader( StoreChannel channel, StoreChannel header, int bufferSize ) throws IOException
+    public InputNodeReader( StoreChannel channel, StoreChannel header, int bufferSize, Runnable closeAction )
+            throws IOException
     {
-        super( channel, header, bufferSize, 1 );
+        super( channel, header, bufferSize, 1, closeAction );
     }
 
     @Override
diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/InputRelationship.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/InputRelationship.java
index 93fa416adb3de..f1cf295b5313a 100644
--- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/InputRelationship.java
+++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/InputRelationship.java
@@ -137,4 +137,9 @@ protected void toStringFields( Collection> fields )
             fields.add( Pair.of( "type", type ) );
         }
     }
+
+    public Object typeAsObject()
+    {
+        return hasTypeId() ? typeId() : type();
+    }
 }
diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/InputRelationshipCacher.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/InputRelationshipCacher.java
index 8c0668ad2f70d..1ba68878885e3 100644
--- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/InputRelationshipCacher.java
+++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/InputRelationshipCacher.java
@@ -20,7 +20,6 @@
 package org.neo4j.unsafe.impl.batchimport.input;
 
 import java.io.IOException;
-
 import org.neo4j.io.fs.StoreChannel;
 
 import static org.neo4j.unsafe.impl.batchimport.input.InputCache.NEW_TYPE;
diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/InputRelationshipReader.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/InputRelationshipReader.java
index d1c4f40526022..5ff55a21249f6 100644
--- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/InputRelationshipReader.java
+++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/InputRelationshipReader.java
@@ -36,9 +36,10 @@ public class InputRelationshipReader extends InputEntityReader.
+ */
+package org.neo4j.unsafe.impl.batchimport.input;
+
+import java.io.IOException;
+import java.util.function.ToIntFunction;
+
+import org.neo4j.helpers.collection.PrefetchingIterator;
+import org.neo4j.unsafe.impl.batchimport.InputIterator;
+
+import static java.lang.Integer.max;
+
+/**
+ * Takes an {@link InputIterator} and splits up {@link InputRelationship relationships} by type.
+ * Uses {@link InputCache} to cache all relationships except those of the first type during the first pass,
+ * then reads from the cached relationships, one type at a time, for all the other types.
+ */
+public class PerTypeRelationshipSplitter extends PrefetchingIterator<InputIterator<InputRelationship>>
+{
+    private final Object[] allRelationshipTypes;
+    private final InputIterator<InputRelationship> actual;
+    private final ToIntFunction<Object> typeToId;
+    private final InputCache inputCache;
+
+    private int typeCursor;
+
+    public PerTypeRelationshipSplitter( InputIterator<InputRelationship> actual, Object[] allRelationshipTypes,
+            ToIntFunction<Object> typeToId, InputCache inputCache )
+    {
+        this.actual = actual;
+        this.allRelationshipTypes = allRelationshipTypes;
+        this.typeToId = typeToId;
+        this.inputCache = inputCache;
+    }
+
+    @Override
+    protected InputIterator<InputRelationship> fetchNextOrNull()
+    {
+        if ( typeCursor == allRelationshipTypes.length )
+        {
+            return null;
+        }
+
+        Object type = allRelationshipTypes[typeCursor++];
+        if ( typeCursor == 1 )
+        {
+            // This is the first relationship type. If we're lucky and this is a new import
+            // then this type will also represent the type that most relationships are of.
+            // We'll basically return the actual iterator, but with a filter to only return
+            // this type. The other relationships will be cached by type.
+            return new FilteringAndPerTypeCachingInputIterator( actual, type );
+        }
+
+        // This isn't the first relationship type. The first pass cached relationships
+        // per type on disk into InputCache. Simply get the correct one and return.
+        return inputCache.relationships( cacheSubType( type ), true/*delete after use*/ ).iterator();
+    }
+
+    String cacheSubType( Object type )
+    {
+        return String.valueOf( typeToId.applyAsInt( type ) );
+    }
+
+    /**
+     * @return the type currently being iterated over, i.e. the type that the {@link InputIterator} returned
+     * from the most recent call to {@link #next()} iterates over.
+     */
+    public Object currentType()
+    {
+        return allRelationshipTypes[typeCursor-1];
+    }
+
+    int highestTypeId()
+    {
+        int highest = 0;
+        for ( Object type : allRelationshipTypes )
+        {
+            highest = max( highest, typeToId.applyAsInt( type ) );
+        }
+        return highest;
+    }
+
+    public class FilteringAndPerTypeCachingInputIterator extends InputIterator.Delegate<InputRelationship>
+    {
+        private final Object currentType;
+        // index into this array is actual typeId, which may be 0 - 2^16-1
+        private final Receiver<InputRelationship[],IOException>[] receivers;
+        private final InputRelationship[] transport = new InputRelationship[1];
+
+        @SuppressWarnings( "unchecked" )
+        public FilteringAndPerTypeCachingInputIterator( InputIterator<InputRelationship> actual, Object currentType )
+        {
+            super( actual );
+            this.currentType = currentType;
+            this.receivers = new Receiver[highestTypeId()+1];
+            for ( Object type : allRelationshipTypes )
+            {
+                if ( type.equals( currentType ) )
+                {
+                    // We're iterating over this type, so don't cache it. This is also why its slot
+                    // in the receivers array above is simply left null.
+                    continue;
+                }
+
+                try
+                {
+                    int typeId = typeToId.applyAsInt( type );
+                    receivers[typeId] = inputCache.cacheRelationships( cacheSubType( type ) );
+                }
+                catch ( IOException e )
+                {
+                    throw new InputException( "Error creating a cacher", e );
+                }
+            }
+        }
+
+        @Override
+        protected InputRelationship fetchNextOrNull()
+        {
+            while ( true )
+            {
+                InputRelationship candidate = super.fetchNextOrNull();
+                if ( candidate == null )
+                {
+                    // No more relationships
+                    return null;
+                }
+
+                if ( candidate.typeAsObject().equals( currentType ) )
+                {
+                    // This is a relationship of the requested type
+                    return candidate;
+                }
+
+                // This is a relationship of a different type, cache it
+                transport[0] = candidate;
+                try
+                {
+                    int typeId = typeToId.applyAsInt( candidate.typeAsObject() );
+                    receivers[typeId].receive( transport );
+                }
+                catch ( IOException e )
+                {
+                    throw new InputException( "Error caching relationship " + candidate, e );
+                }
+            }
+        }
+
+        @Override
+        public void close()
+        {
+            for ( Receiver<InputRelationship[],IOException> receiver : receivers )
+            {
+                if ( receiver != null )
+                {
+                    try
+                    {
+                        receiver.close();
+                    }
+                    catch ( IOException e )
+                    {
+                        throw new InputException( "Error closing cacher", e );
+                    }
+                }
+            }
+
+            // This will delegate to the actual iterator and so close the external input iterator
+            super.close();
+        }
+    }
+}
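
A sketch of how this splitter is meant to be driven: one relationship pass per type, with the first pass reading the real input while caching everything else by type. The variables input, relationshipTypes, typeToId and inputCache are assumed to exist in the calling code; typeToId could for instance delegate to BatchingRelationshipTypeTokenRepository.getOrCreateId.

    PerTypeRelationshipSplitter splitter = new PerTypeRelationshipSplitter(
            input.relationships().iterator(), relationshipTypes, typeToId, inputCache );
    while ( splitter.hasNext() )
    {
        InputIterator<InputRelationship> relationshipsOfOneType = splitter.next();
        Object type = splitter.currentType();
        // run the relationship import stage for this type over relationshipsOfOneType
        relationshipsOfOneType.close();
    }
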
diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/store/BatchingTokenRepository.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/store/BatchingTokenRepository.java
index 0706723e66030..929702174a7ba 100644
--- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/store/BatchingTokenRepository.java
+++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/store/BatchingTokenRepository.java
@@ -39,11 +39,11 @@
 
 /**
  * Batching version of a {@link TokenStore} where tokens can be created and retrieved, but only persisted
- * to storage as part of {@link #close() closing}.
+ * to storage as part of {@link #close() closing}. It is safe for multiple threads to call
+ * the {@link #getOrCreateId(String)} methods on instances of this class.
  */
 public abstract class BatchingTokenRepository
 {
-    // TODO more efficient data structure
     private final Map tokens = new HashMap<>();
     private final TokenStore store;
     private int highId;
@@ -51,10 +51,16 @@ public abstract class BatchingTokenRepository store )
     {
         this.store = store;
-        // TODO read the store into the repository, i.e. into existing?
         this.highId = (int)store.getHighId();
     }
 
+    /**
+     * Returns the id for token with the specified {@code name}, potentially creating that token and
+     * assigning a new id as part of this call.
+     *
+     * @param name token name.
+     * @return the id (created or existing) for the token by this name.
+     */
     public int getOrCreateId( String name )
     {
         assert name != null;
@@ -75,15 +81,42 @@ public int getOrCreateId( String name )
     }
 
     /**
-     * Converts label names into label ids. Also sorts and deduplicates.
+     * Returns the id for the token with the specified {@code key}, which can be a {@link String} if it represents
+     * a user-defined name, or an {@link Integer} if it represents an existing type from an external source
+     * which wants to preserve its name-to-id token mappings. Also see {@link #getOrCreateId(String)} for more details.
+     *
+     * @param key name or id of this token.
+     * @return the id (created or existing) for the token key.
+     */
+    public int getOrCreateId( Object key )
+    {
+        if ( key instanceof String )
+        {
+            // A name was supplied, get or create a token id for it
+            return getOrCreateId( (String) key );
+        }
+        else if ( key instanceof Integer )
+        {
+            // A raw token id was supplied, just use it
+            return (Integer) key;
+        }
+        throw new IllegalArgumentException( "Expected either a String or Integer for property key, but was '" +
+                key + "'" + ", " + key.getClass() );
+    }
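
A quick usage sketch of the two accepted key kinds; typeRepository stands for any concrete repository instance, e.g. the relationship type one:

    int byName = typeRepository.getOrCreateId( "KNOWS" );                     // looked up or created
    int passedThrough = typeRepository.getOrCreateId( Integer.valueOf( 3 ) ); // raw id used as-is

This is what lets a caller feed either a type name or an already-known type id, such as the value of InputRelationship.typeAsObject(), straight into the repository.
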
+
+    /**
+     * Returns or creates ids for multiple tokens given their names, deduplicating the result.
+     *
+     * @param names token names to look up or create token ids for.
+     * @return {@code long[]} containing the token ids.
      */
-    public long[] getOrCreateIds( String[] labels )
+    public long[] getOrCreateIds( String[] names )
     {
-        long[] result = new long[labels.length];
+        long[] result = new long[names.length];
         int from, to;
-        for ( from = 0, to = 0; from < labels.length; from++ )
+        for ( from = 0, to = 0; from < names.length; from++ )
         {
-            int id = getOrCreateId( labels[from] );
+            int id = getOrCreateId( names[from] );
             if ( !contains( result, id, to ) )
             {
                 result[to++] = id;
@@ -116,6 +149,9 @@ public int getHighId()
 
     protected abstract RECORD createRecord( int key );
 
+    /**
+     * Closes this repository and writes all created tokens to the underlying store.
+     */
     public void close()
     {
         // Batch-friendly record access
@@ -181,22 +217,6 @@ public void propertyKeysAndValues( PropertyBlock[] target, int offset, Object[]
                 target[offset+i] = creator.encodeValue( new PropertyBlock(), key, value );
             }
         }
-
-        private int getOrCreateId( Object key )
-        {
-            if ( key instanceof String )
-            {
-                // A name was supplied, get or create a token id for it
-                return getOrCreateId( (String) key );
-            }
-            else if ( key instanceof Integer )
-            {
-                // A raw token id was supplied, just use it
-                return (Integer) key;
-            }
-            throw new IllegalArgumentException( "Expected either a String or Integer for property key, but was '" +
-                    key + "'" + ", " + key.getClass() );
-        }
     }
 
     public static class BatchingLabelTokenRepository extends BatchingTokenRepository
diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/CalculateDenseNodesStepTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/CalculateDenseNodesStepTest.java
index f79a5ca926e58..aed0bff4a670f 100644
--- a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/CalculateDenseNodesStepTest.java
+++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/CalculateDenseNodesStepTest.java
@@ -68,7 +68,7 @@ public void panic( Throwable cause )
         // THEN
         for ( long id : ids )
         {
-            assertEquals( numberOfBatches, cache.getCount( id, 0, null /*shouldn't be used here anyway*/ ) );
+            assertEquals( numberOfBatches, cache.getCount( id, null /*shouldn't be used here anyway*/ ) );
         }
     }
 
diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/ReadRelationshipRecordsBackwardsStepTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/ReadRelationshipRecordsBackwardsStepTest.java
index e03c3b03ea4e7..29124b26f957b 100644
--- a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/ReadRelationshipRecordsBackwardsStepTest.java
+++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/ReadRelationshipRecordsBackwardsStepTest.java
@@ -46,7 +46,7 @@ public void reservedIdIsSkipped()
         when( store.getHighId() ).thenReturn( highId );
 
         ReadRelationshipRecordsBackwardsStep step = new ReadRelationshipRecordsBackwardsStep(
-                mock( StageControl.class ), Configuration.DEFAULT, store );
+                mock( StageControl.class ), Configuration.DEFAULT, store, 0 );
 
         Object batch = step.nextBatchOrNull( 0, batchSize );
 
diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkbackStageTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkbackStageTest.java
index ce6d45ad3d6de..a867686a45278 100644
--- a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkbackStageTest.java
+++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkbackStageTest.java
@@ -40,7 +40,8 @@ public void reservedIdIsSkipped() throws Exception
     {
         long highId = 5;
         RelationshipStore store = StoreWithReservedId.newRelationshipStoreMock( highId );
-        RelationshipLinkbackStage stage = new RelationshipLinkbackStage( Configuration.DEFAULT, store, newCache() );
+        RelationshipLinkbackStage stage = new RelationshipLinkbackStage( "Test",
+                Configuration.DEFAULT, store, newCache(), 0, false );
 
         ExecutionSupervisors.superviseExecution( ExecutionMonitors.invisible(), Configuration.DEFAULT, stage );
 
diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/RelationshipTypeCheckerStepTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/RelationshipTypeCheckerStepTest.java
new file mode 100644
index 0000000000000..f8a5276150232
--- /dev/null
+++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/RelationshipTypeCheckerStepTest.java
@@ -0,0 +1,145 @@
+/*
+ * Copyright (c) 2002-2016 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Neo4j is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+package org.neo4j.unsafe.impl.batchimport;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.mockito.InOrder;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.TreeSet;
+
+import org.neo4j.kernel.impl.store.record.RelationshipRecord;
+import org.neo4j.test.RandomRule;
+import org.neo4j.unsafe.impl.batchimport.input.InputRelationship;
+import org.neo4j.unsafe.impl.batchimport.staging.BatchSender;
+import org.neo4j.unsafe.impl.batchimport.staging.StageControl;
+import org.neo4j.unsafe.impl.batchimport.store.BatchingTokenRepository.BatchingRelationshipTypeTokenRepository;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+
+import static java.util.Arrays.asList;
+
+import static org.neo4j.helpers.collection.Iterators.loop;
+import static org.neo4j.unsafe.impl.batchimport.Configuration.DEFAULT;
+import static org.neo4j.unsafe.impl.batchimport.input.Group.GLOBAL;
+import static org.neo4j.unsafe.impl.batchimport.input.InputEntity.NO_PROPERTIES;
+
+public class RelationshipTypeCheckerStepTest
+{
+    @Rule
+    public final RandomRule random = new RandomRule();
+
+    @Test
+    public void shouldReturnRelationshipTypeIdsInReverseOrderOfTokenCreation() throws Throwable
+    {
+        shouldReturnRelationshipTypeIdsInReverseOrderOfTokenCreation( true );
+    }
+
+    @Test
+    public void shouldReturnRelationshipTypeNamesInReverseOrderOfTokenCreation() throws Throwable
+    {
+        shouldReturnRelationshipTypeIdsInReverseOrderOfTokenCreation( false );
+    }
+
+    private void shouldReturnRelationshipTypeIdsInReverseOrderOfTokenCreation( boolean typeIds ) throws Throwable
+    {
+        // GIVEN
+        BatchingRelationshipTypeTokenRepository repository = mock( BatchingRelationshipTypeTokenRepository.class );
+        RelationshipTypeCheckerStep step =
+                new RelationshipTypeCheckerStep( mock( StageControl.class ), DEFAULT, repository );
+
+        // WHEN
+        Batch<InputRelationship,RelationshipRecord> relationships =
+                batchOfRelationshipsWithRandomTypes( 10, typeIds );
+        step.process( relationships, mock( BatchSender.class ) );
+        step.done();
+
+        // THEN
+        Object[] processed = step.getRelationshipTypes( 100 );
+
+        InOrder inOrder = inOrder( repository );
+        for ( Object type : reversed( processed ) )
+        {
+            inOrder.verify( repository ).getOrCreateId( type );
+        }
+        inOrder.verifyNoMoreInteractions();
+    }
+
+    @Test
+    public void shouldReturnRelationshipTypesInDescendingOrder() throws Throwable
+    {
+        // GIVEN
+        BatchingRelationshipTypeTokenRepository repository = mock( BatchingRelationshipTypeTokenRepository.class );
+        RelationshipTypeCheckerStep step = new RelationshipTypeCheckerStep( mock( StageControl.class ), DEFAULT,
+                repository );
+        Batch<InputRelationship,RelationshipRecord> relationships =
+                batchOfRelationshipsWithRandomTypes( 10, true/*use the raw ids*/ );
+        step.process( relationships, mock( BatchSender.class ) );
+
+        // WHEN
+        step.done();
+
+        // THEN
+        TreeSet<Integer> expected = idsOf( relationships );
+        Object[] processed = step.getRelationshipTypes( 100 );
+        int i = 0;
+        for ( Object expectedType : loop( expected.descendingIterator() ) )
+        {
+            assertEquals( expectedType, processed[i++] );
+        }
+    }
+
+    private TreeSet<Integer> idsOf( Batch<InputRelationship,RelationshipRecord> relationships )
+    {
+        TreeSet<Integer> types = new TreeSet<>();
+        for ( InputRelationship relationship : relationships.input )
+        {
+            types.add( relationship.typeId() );
+        }
+        return types;
+    }
+
+    private Batch<InputRelationship,RelationshipRecord> batchOfRelationshipsWithRandomTypes(
+            int maxTypes, boolean typeIds )
+    {
+        InputRelationship[] relationships = new InputRelationship[100];
+        for ( int i = 0; i < relationships.length; i++ )
+        {
+            int typeId = random.nextInt( maxTypes );
+            relationships[i] = new InputRelationship( "test", i, i, NO_PROPERTIES, null, GLOBAL,
+                    0L, GLOBAL, 0L,
+                    typeIds ? null : "TYPE_" + String.valueOf( typeId ),
+                    typeIds ? typeId : null );
+        }
+        return new Batch<>( relationships );
+    }
+
+    private Object[] reversed( Object[] objects )
+    {
+        List<Object> list = new ArrayList<>( asList( objects ) );
+        Collections.reverse( list );
+        return list.toArray();
+    }
+}
diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/cache/NodeRelationshipCacheTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/cache/NodeRelationshipCacheTest.java
index 4e678fbdfa2a9..fbd5296d10505 100644
--- a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/cache/NodeRelationshipCacheTest.java
+++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/cache/NodeRelationshipCacheTest.java
@@ -20,40 +20,51 @@
 package org.neo4j.unsafe.impl.batchimport.cache;
 
 import org.junit.After;
-import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-import org.mockito.InOrder;
-
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
-import java.util.Random;
-
+import org.neo4j.collection.primitive.Primitive;
+import org.neo4j.collection.primitive.PrimitiveLongObjectMap;
+import org.neo4j.collection.primitive.PrimitiveLongSet;
 import org.neo4j.graphdb.Direction;
+import org.neo4j.test.RandomRule;
 import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache.GroupVisitor;
+import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache.NodeChangeVisitor;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Mockito.inOrder;
+import static org.mockito.Matchers.anyLong;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
 
 import static java.lang.Math.max;
-import static java.lang.System.currentTimeMillis;
 
+import static org.neo4j.graphdb.Direction.BOTH;
+import static org.neo4j.graphdb.Direction.INCOMING;
 import static org.neo4j.graphdb.Direction.OUTGOING;
 
 @RunWith( Parameterized.class )
 public class NodeRelationshipCacheTest
 {
+    @Rule
+    public final RandomRule random = new RandomRule();
     @Parameterized.Parameter( 0 )
     public long base;
     private NodeRelationshipCache cache;
 
+    @After
+    public void after()
+    {
+        cache.close();
+    }
+
     @Parameterized.Parameters
     public static Collection data()
     {
@@ -74,6 +85,7 @@ public void shouldReportCorrectNumberOfDenseNodes() throws Exception
         increment( cache, 23, 4 );
         increment( cache, 24, 5 );
         increment( cache, 25, 6 );
+        cache.setHighNodeId( 25 );
 
         // THEN
         assertFalse( cache.isDense( 0 ) );
@@ -91,123 +103,59 @@ public void shouldGoThroughThePhases() throws Exception
         // GIVEN
         int nodeCount = 10;
         cache = new NodeRelationshipCache( NumberArrayFactory.OFF_HEAP, 20, 100, base );
+        cache.setHighNodeId( nodeCount );
         incrementRandomCounts( cache, nodeCount, nodeCount*20 );
 
         // Test sparse node semantics
         {
             long node = findNode( cache, nodeCount, false );
-            testNode( cache, node, -1, null );
+            testNode( cache, node, null );
         }
 
         // Test dense node semantics
         {
             long node = findNode( cache, nodeCount, true );
-            testNode( cache, node, 4, Direction.OUTGOING );
-            testNode( cache, node, 4, Direction.INCOMING );
-            testNode( cache, node, 2, Direction.OUTGOING );
+            testNode( cache, node, Direction.OUTGOING );
+            testNode( cache, node, Direction.INCOMING );
         }
     }
 
     @Test
-    public void shouldAddGroupAfterTheFirst() throws Exception
-    {
-        // GIVEN a dense node
-        long denseNode = 0;
-        cache = new NodeRelationshipCache( NumberArrayFactory.AUTO, 1, 100, base );
-        cache.incrementCount( denseNode );
-        cache.getAndPutRelationship( denseNode, 0, Direction.OUTGOING, 0, true );
-
-        // WHEN
-        cache.getAndPutRelationship( denseNode, 1, Direction.INCOMING, 1, true );
-        // just fill more data into the groups
-        cache.getAndPutRelationship( denseNode, 0, Direction.INCOMING, 2, true );
-        cache.getAndPutRelationship( denseNode, 1, Direction.OUTGOING, 3, true );
-
-        // THEN
-        GroupVisitor visitor = mock( GroupVisitor.class );
-        assertEquals( 0L, cache.getFirstRel( denseNode, visitor ) );
-        InOrder order = inOrder( visitor );
-        order.verify( visitor ).visit( denseNode, 0, base + 1L, 0L, 2L, -1L );
-        order.verify( visitor ).visit( denseNode, 1, -1L, 3L, 1L, -1L );
-        order.verifyNoMoreInteractions();
-    }
-
-    @Test
-    public void shouldAddGroupBeforeTheFirst() throws Exception
-    {
-        // GIVEN a dense node
-        long denseNode = 0;
-        cache = new NodeRelationshipCache( NumberArrayFactory.AUTO, 1, 100, base );
-        cache.incrementCount( denseNode );
-        cache.getAndPutRelationship( denseNode, 1, Direction.INCOMING, 1, true );
-
-        // WHEN
-        cache.getAndPutRelationship( denseNode, 0, Direction.OUTGOING, 0, true );
-        // just fill more data into the groups
-        cache.getAndPutRelationship( denseNode, 0, Direction.INCOMING, 2, true );
-        cache.getAndPutRelationship( denseNode, 1, Direction.OUTGOING, 3, true );
-
-        // THEN
-        GroupVisitor visitor = mock( GroupVisitor.class );
-        assertEquals( 0L, cache.getFirstRel( denseNode, visitor ) );
-        InOrder order = inOrder( visitor );
-        order.verify( visitor ).visit( denseNode, 0, base + 1L, 0L, 2L, -1L );
-        order.verify( visitor ).visit( denseNode, 1, -1L, 3L, 1L, -1L );
-        order.verifyNoMoreInteractions();
-    }
-
-    @Test
-    public void shouldAddGroupInTheMiddleIfTwo() throws Exception
-    {
-        // GIVEN a dense node
-        long denseNode = 0;
-        cache = new NodeRelationshipCache( NumberArrayFactory.AUTO, 1, 100, base );
-        cache.incrementCount( denseNode );
-        cache.getAndPutRelationship( denseNode, 0, Direction.OUTGOING, 0, true );
-        cache.getAndPutRelationship( denseNode, 2, Direction.OUTGOING, 1, true );
-
-        // WHEN
-        cache.getAndPutRelationship( denseNode, 1, Direction.INCOMING, 2, true );
-        // just fill more data into the groups
-        cache.getAndPutRelationship( denseNode, 0, Direction.INCOMING, 3, true );
-        cache.getAndPutRelationship( denseNode, 1, Direction.OUTGOING, 4, true );
-        cache.getAndPutRelationship( denseNode, 2, Direction.INCOMING, 5, true );
-        cache.getAndPutRelationship( denseNode, 1, Direction.BOTH, 6, true );
-
-        // THEN
-        GroupVisitor visitor = mock( GroupVisitor.class );
-        assertEquals( 0L, cache.getFirstRel( denseNode, visitor ) );
-        verify( visitor ).visit( denseNode, 0, base + 2L, 0L, 3L, -1L );
-        verify( visitor ).visit( denseNode, 1, base + 1L, 4L, 2L, 6L );
-        verify( visitor ).visit( denseNode, 2, -1L, 1L, 5L, -1L );
-        verifyNoMoreInteractions( visitor );
-    }
-
-    @Test
-    public void shouldClearRelationships() throws Exception
+    public void shouldObserveFirstRelationshipAsEmptyInEachDirection() throws Exception
     {
         // GIVEN
         cache = new NodeRelationshipCache( NumberArrayFactory.AUTO, 1, 100, base );
         int nodes = 100;
         Direction[] directions = Direction.values();
         GroupVisitor groupVisitor = mock( GroupVisitor.class );
+        cache.setForwardScan( true );
+        cache.setHighNodeId( nodes );
         for ( int i = 0; i < nodes; i++ )
         {
             assertEquals( -1L, cache.getFirstRel( nodes, groupVisitor ) );
             cache.incrementCount( i );
-            cache.getAndPutRelationship( i, i % 5, directions[i % directions.length],
+            long previous = cache.getAndPutRelationship( i, directions[i % directions.length],
                     random.nextInt( 1_000_000 ), true );
-            assertEquals( 1, cache.getCount( i, i % 5, directions[i % directions.length] ) );
+            assertEquals( -1L, previous );
+            assertEquals( 1, cache.getCount( i, directions[i % directions.length] ) );
         }
 
         // WHEN
-        cache.clearRelationships();
+        cache.setForwardScan( false );
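+        // Scanning backwards takes over the role of the removed clearRelationships():
+        // heads written during the forward scan should read as empty (-1) again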
+        for ( int i = 0; i < nodes; i++ )
+        {
+            long previous = cache.getAndPutRelationship( i, directions[i % directions.length],
+                    random.nextInt( 1_000_000 ), false );
+            assertEquals( -1L, previous );
+            assertEquals( 1, cache.getCount( i, directions[i % directions.length] ) );
+        }
 
         // THEN
+        cache.setForwardScan( true );
         for ( int i = 0; i < nodes; i++ )
         {
             assertEquals( -1L, cache.getFirstRel( nodes, groupVisitor ) );
-            assertEquals( 1, cache.getCount( i, i % 5, directions[i % directions.length] ) );
+            assertEquals( 1, cache.getCount( i, directions[i % directions.length] ) );
         }
     }
 
@@ -219,10 +167,10 @@ public void shouldGetAndPutRelationshipAroundChunkEdge() throws Exception
 
         // WHEN
         long nodeId = 1_000_000 - 1;
-        int type = 0;
+        cache.setHighNodeId( nodeId );
         Direction direction = Direction.OUTGOING;
         long relId = 10;
-        cache.getAndPutRelationship( nodeId, type, direction, relId, false );
+        cache.getAndPutRelationship( nodeId, direction, relId, false );
 
         // THEN
         assertEquals( relId, cache.getFirstRel( nodeId, mock( GroupVisitor.class ) ) );
@@ -232,13 +180,36 @@ public void shouldGetAndPutRelationshipAroundChunkEdge() throws Exception
     public void shouldPutRandomStuff() throws Exception
     {
         // GIVEN
-        cache = new NodeRelationshipCache( NumberArrayFactory.HEAP, 10, 1000, base );
+        int nodes = 10_000;
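+        // Shadow model of the expected previous head relationship for each node, verified on every put below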
+        PrimitiveLongObjectMap<long[]> key = Primitive.longObjectMap( nodes );
+        cache = new NodeRelationshipCache( NumberArrayFactory.HEAP, 1, 1000, base );
+
+        // Mark random nodes as dense (the dense node threshold is 1, so a single increment is enough)
+        for ( long nodeId = 0; nodeId < nodes; nodeId++ )
+        {
+            if ( random.nextBoolean() )
+            {
+                cache.incrementCount( nodeId );
+            }
+        }
+        cache.setHighNodeId( nodes );
 
         // WHEN
-        for ( int i = 0; i < 10_000; i++ )
+        for ( int i = 0; i < 100_000; i++ )
         {
-            cache.getAndPutRelationship( random.nextInt( 100_000 ), random.nextInt( 5 ),
-                    Direction.OUTGOING, random.nextInt( 1_000_000 ), true );
+            long nodeId = random.nextLong( nodes );
+            boolean dense = cache.isDense( nodeId );
+            Direction direction = random.among( Direction.values() );
+            long relationshipId = random.nextLong( 1_000_000 );
+            long previousHead = cache.getAndPutRelationship( nodeId, direction, relationshipId, false );
+            long[] keyIds = key.get( nodeId );
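+            // Sparse nodes keep a single head slot; dense nodes keep one slot per direction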
+            int keyIndex = dense ? direction.ordinal() : 0;
+            if ( keyIds == null )
+            {
+                key.put( nodeId, keyIds = minusOneLongs( Direction.values().length ) );
+            }
+            assertEquals( keyIds[keyIndex], previousHead );
+            keyIds[keyIndex] = relationshipId;
         }
     }
 
@@ -251,15 +222,15 @@ public void shouldPut6ByteRelationshipIds() throws Exception
         long denseNode = 1;
         long relationshipId = (1L << 48) - 2;
         cache.incrementCount( denseNode );
+        cache.setHighNodeId( 1 );
 
         // WHEN
-        assertEquals( -1L, cache.getAndPutRelationship( sparseNode, 0, OUTGOING, relationshipId, false ) );
-        assertEquals( -1L, cache.getAndPutRelationship( denseNode, 0, OUTGOING, relationshipId, false ) );
+        assertEquals( -1L, cache.getAndPutRelationship( sparseNode, OUTGOING, relationshipId, false ) );
+        assertEquals( -1L, cache.getAndPutRelationship( denseNode, OUTGOING, relationshipId, false ) );
 
         // THEN
-        GroupVisitor groupVisitor = mock( GroupVisitor.class );
-        assertEquals( relationshipId, cache.getAndPutRelationship( sparseNode, 0, OUTGOING, 1, false ) );
-        assertEquals( relationshipId, cache.getAndPutRelationship( denseNode, 0, OUTGOING, 1, false ) );
+        assertEquals( relationshipId, cache.getAndPutRelationship( sparseNode, OUTGOING, 1, false ) );
+        assertEquals( relationshipId, cache.getAndPutRelationship( denseNode, OUTGOING, 1, false ) );
     }
 
     @Test
@@ -267,12 +238,13 @@ public void shouldFailFastIfTooBigRelationshipId() throws Exception
     {
         // GIVEN
         cache = new NodeRelationshipCache( NumberArrayFactory.HEAP, 1, 100, base );
+        cache.setHighNodeId( 1 );
 
         // WHEN
-        cache.getAndPutRelationship( 0, 0, OUTGOING, (1L << 48) - 2, false );
+        cache.getAndPutRelationship( 0, OUTGOING, (1L << 48) - 2, false );
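+        // (1L << 48) - 2 is the largest accepted id; the all-ones value below is expected to be rejected,
+        // presumably because it doubles as the internal 'no relationship' (-1) marker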
         try
         {
-            cache.getAndPutRelationship( 0, 0, OUTGOING, (1L << 48) - 1, false );
+            cache.getAndPutRelationship( 0, OUTGOING, (1L << 48) - 1, false );
             fail( "Should fail" );
         }
         catch ( IllegalArgumentException e )
@@ -282,12 +254,143 @@ public void shouldFailFastIfTooBigRelationshipId() throws Exception
         }
     }
 
-    private void testNode( NodeRelationshipCache link, long node, int type, Direction direction )
+    @Test
+    public void shouldVisitChangedNodes() throws Exception
+    {
+        // GIVEN
+        int nodes = 10;
+        cache = new NodeRelationshipCache( NumberArrayFactory.HEAP, 2, 100, base );
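+        // Dense node threshold is 2, so nodes that happen to get a second increment below become dense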
+        for ( long nodeId = 0; nodeId < nodes; nodeId++ )
+        {
+            cache.incrementCount( nodeId );
+            if ( random.nextBoolean() )
+            {
+                cache.incrementCount( nodeId );
+            }
+        }
+        cache.setHighNodeId( nodes );
+        PrimitiveLongSet keySparseChanged = Primitive.longSet( nodes );
+        PrimitiveLongSet keyDenseChanged = Primitive.longSet( nodes );
+        for ( int i = 0; i < nodes / 2; i++ )
+        {
+            long nodeId = random.nextLong( nodes );
+            cache.getAndPutRelationship( nodeId, Direction.OUTGOING, random.nextLong( 1_000_000 ), false );
+            boolean dense = cache.isDense( nodeId );
+            (dense ? keyDenseChanged : keySparseChanged).add( nodeId );
+        }
+
+        {
+            // WHEN (sparse)
+            NodeChangeVisitor visitor = (nodeId, array) ->
+            {
+                // THEN (sparse)
+                assertTrue( "Unexpected sparse change reported for " + nodeId, keySparseChanged.remove( nodeId ) );
+            };
+            cache.visitChangedNodes( visitor, false/*sparse*/ );
+            assertTrue( "There was " + keySparseChanged.size() + " expected sparse changes that weren't reported",
+                    keySparseChanged.isEmpty() );
+        }
+
+        {
+            // WHEN (dense)
+            NodeChangeVisitor visitor = (nodeId, array) ->
+            {
+                // THEN (dense)
+                assertTrue( "Unexpected dense change reported for " + nodeId, keyDenseChanged.remove( nodeId ) );
+            };
+            cache.visitChangedNodes( visitor, true/*dense*/ );
+            assertTrue( "There was " + keyDenseChanged.size() + " expected dense changes that weren reported",
+                    keyDenseChanged.isEmpty() );
+        }
+    }
+
+    @Test
+    public void shouldFailFastOnTooHighCountOnNode() throws Exception
     {
-        int count = link.getCount( node, type, direction );
-        assertEquals( -1, link.getAndPutRelationship( node, type, direction, 5, false ) );
-        assertEquals( 5, link.getAndPutRelationship( node, type, direction, 10, false ) );
-        assertEquals( count, link.getCount( node, type, direction ) );
+        // GIVEN
+        cache = new NodeRelationshipCache( NumberArrayFactory.HEAP, 10, 100, base );
+        long nodeId = 5;
+        int count = NodeRelationshipCache.MAX_COUNT - 5;
+        cache.setCount( nodeId, count );
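+        // The count is seeded just below MAX_COUNT so that a handful of increments pushes it over the limit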
+
+        // WHEN
+        boolean failed = false;
+        for ( int i = 0; i < 10 && !failed; i++ )
+        {
+            try
+            {
+                cache.incrementCount( nodeId );
+            }
+            catch ( IllegalStateException e )
+            {
+                // THEN incrementing past MAX_COUNT fails fast
+                failed = true;
+            }
+        }
+        assertTrue( "Expected an increment past MAX_COUNT to fail", failed );
+    }
+
+    @Test
+    public void shouldKeepNextGroupIdForNextRound() throws Exception
+    {
+        // GIVEN
+        cache = new NodeRelationshipCache( NumberArrayFactory.HEAP, 1, 100, base );
+        long nodeId = 0;
+        cache.incrementCount( nodeId );
+        cache.setHighNodeId( nodeId+1 );
+        GroupVisitor groupVisitor = mock( GroupVisitor.class );
+        when( groupVisitor.visit( anyLong(), anyLong(), anyLong(), anyLong(), anyLong() ) ).thenReturn( 1L, 2L, 3L );
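+        // The mocked visitor hands out group ids 1, 2 and 3; each round below should chain onto the group from the previous round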
+
+        long firstRelationshipGroupId;
+        {
+            // WHEN importing the first type
+            long relationshipId = 10;
+            cache.getAndPutRelationship( nodeId, OUTGOING, relationshipId, true );
+            firstRelationshipGroupId = cache.getFirstRel( nodeId, groupVisitor );
+
+            // THEN
+            assertEquals( 1L, firstRelationshipGroupId );
+            verify( groupVisitor ).visit( nodeId, -1L, relationshipId, -1L, -1L );
+
+            // Also simulate going back again ("clearing" of the cache requires this)
+            cache.setForwardScan( false );
+            cache.getAndPutRelationship( nodeId, OUTGOING, relationshipId, false );
+            cache.setForwardScan( true );
+        }
+
+        long secondRelationshipGroupId;
+        {
+            // WHEN importing the second type
+            long relationshipId = 11;
+            cache.getAndPutRelationship( nodeId, INCOMING, relationshipId, true );
+            secondRelationshipGroupId = cache.getFirstRel( nodeId, groupVisitor );
+
+            // THEN
+            assertEquals( 2L, secondRelationshipGroupId );
+            verify( groupVisitor ).visit( nodeId, firstRelationshipGroupId, -1L, relationshipId, -1L );
+
+            // Also simulate going back again ("clearing" of the cache requires this)
+            cache.setForwardScan( false );
+            cache.getAndPutRelationship( nodeId, OUTGOING, relationshipId, false );
+            cache.setForwardScan( true );
+        }
+
+        {
+            // WHEN importing the third type
+            long relationshipId = 10;
+            cache.getAndPutRelationship( nodeId, BOTH, relationshipId, true );
+            long thirdRelationshipGroupId = cache.getFirstRel( nodeId, groupVisitor );
+            assertEquals( 3L, thirdRelationshipGroupId );
+            verify( groupVisitor ).visit( nodeId, secondRelationshipGroupId, -1L, -1L, relationshipId );
+        }
+    }
+
+    private void testNode( NodeRelationshipCache link, long node, Direction direction )
+    {
+        int count = link.getCount( node, direction );
+        assertEquals( -1, link.getAndPutRelationship( node, direction, 5, false ) );
+        assertEquals( 5, link.getAndPutRelationship( node, direction, 10, false ) );
+        assertEquals( count, link.getCount( node, direction ) );
     }
 
     private long findNode( NodeRelationshipCache link, long nodeCount, boolean isDense )
@@ -313,21 +416,6 @@ private int incrementRandomCounts( NodeRelationshipCache link, int nodeCount, in
         return highestSeenCount;
     }
 
-    private Random random;
-
-    @Before
-    public void before()
-    {
-        long seed = currentTimeMillis();
-        random = new Random( seed );
-    }
-
-    @After
-    public void after()
-    {
-        cache.close();
-    }
-
     private void increment( NodeRelationshipCache cache, long node, int count )
     {
         for ( int i = 0; i < count; i++ )
@@ -335,4 +423,11 @@ private void increment( NodeRelationshipCache cache, long node, int count )
             cache.incrementCount( node );
         }
     }
+
+    private long[] minusOneLongs( int length )
+    {
+        long[] array = new long[length];
+        Arrays.fill( array, -1 );
+        return array;
+    }
 }
diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/input/InputCacheTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/input/InputCacheTest.java
index c96ae64238f14..7ce42f3360096 100644
--- a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/input/InputCacheTest.java
+++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/input/InputCacheTest.java
@@ -35,13 +35,13 @@
 import org.neo4j.test.TargetDirectory;
 import org.neo4j.test.TargetDirectory.TestDirectory;
 import org.neo4j.unsafe.impl.batchimport.InputIterator;
-
 import static java.lang.Math.abs;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.neo4j.helpers.collection.Iterators.asSet;
+import static org.neo4j.unsafe.impl.batchimport.input.InputCache.MAIN;
 import static org.neo4j.unsafe.impl.batchimport.input.InputEntity.NO_LABELS;
 import static org.neo4j.unsafe.impl.batchimport.input.InputEntity.NO_PROPERTIES;
 
@@ -57,7 +57,7 @@ public void shouldCacheAndRetrieveNodes() throws Exception
         {
             List<InputNode> nodes = new ArrayList<>();
             Randoms random = new Randoms( randomRule.random(), Randoms.DEFAULT );
-            try ( Receiver cacher = cache.cacheNodes() )
+            try ( Receiver cacher = cache.cacheNodes( MAIN ) )
             {
                 InputNode[] batch = new InputNode[BATCH_SIZE];
                 for ( int b = 0; b < BATCHES; b++ )
@@ -73,7 +73,7 @@ public void shouldCacheAndRetrieveNodes() throws Exception
             }
 
             // WHEN/THEN
-            try ( InputIterator<InputNode> reader = cache.nodes().iterator() )
+            try ( InputIterator<InputNode> reader = cache.nodes( MAIN, true ).iterator() )
             {
                 Iterator<InputNode> expected = nodes.iterator();
                 while ( expected.hasNext() )
@@ -97,7 +97,7 @@ public void shouldCacheAndRetrieveRelationships() throws Exception
         {
             List<InputRelationship> relationships = new ArrayList<>();
             Randoms random = new Randoms( randomRule.random(), Randoms.DEFAULT );
-            try ( Receiver cacher = cache.cacheRelationships() )
+            try ( Receiver cacher = cache.cacheRelationships( MAIN ) )
             {
                 InputRelationship[] batch = new InputRelationship[BATCH_SIZE];
                 for ( int b = 0; b < BATCHES; b++ )
@@ -113,7 +113,7 @@ public void shouldCacheAndRetrieveRelationships() throws Exception
             }
 
             // WHEN/THEN
-            try ( InputIterator<InputRelationship> reader = cache.relationships().iterator() )
+            try ( InputIterator<InputRelationship> reader = cache.relationships( MAIN, true ).iterator() )
             {
                 Iterator<InputRelationship> expected = relationships.iterator();
                 while ( expected.hasNext() )
diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/input/PerTypeRelationshipSplitterTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/input/PerTypeRelationshipSplitterTest.java
new file mode 100644
index 0000000000000..9fe3f1e579631
--- /dev/null
+++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/input/PerTypeRelationshipSplitterTest.java
@@ -0,0 +1,122 @@
+/*
+ * Copyright (c) 2002-2016 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Neo4j is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+package org.neo4j.unsafe.impl.batchimport.input;
+
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
+import org.neo4j.io.fs.DefaultFileSystemAbstraction;
+import org.neo4j.test.RandomRule;
+import org.neo4j.test.TargetDirectory;
+import org.neo4j.unsafe.impl.batchimport.InputIterable;
+import org.neo4j.unsafe.impl.batchimport.InputIterator;
+
+import static org.junit.Assert.assertEquals;
+import static org.neo4j.helpers.collection.Iterators.filter;
+import static org.neo4j.unsafe.impl.batchimport.input.InputEntity.NO_PROPERTIES;
+import static org.neo4j.unsafe.impl.batchimport.input.SimpleInputIteratorWrapper.wrap;
+
+public class PerTypeRelationshipSplitterTest
+{
+    @Rule
+    public final RandomRule random = new RandomRule().withSeed( 1460373085111L );
+    @Rule
+    public final TargetDirectory.TestDirectory directory = TargetDirectory.testDirForTest( getClass() );
+
+    @Test
+    public void shouldReturnTypesOneByOne() throws Exception
+    {
+        // GIVEN
+        Collection<InputRelationship> expected = randomRelationships();
+        InputIterable<InputRelationship> relationships = wrap( "test", expected );
+        InputCache inputCache = new InputCache( new DefaultFileSystemAbstraction(), directory.directory() );
+        PerTypeRelationshipSplitter perType = new PerTypeRelationshipSplitter( relationships.iterator(),
+                types( expected ), type -> Integer.parseInt( type.toString() ), inputCache );
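+        // The splitter is expected to hand back one iterator per relationship type, each containing only that type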
+
+        // WHEN
+        Set<Object> all = new HashSet<>();
+        while ( perType.hasNext() )
+        {
+            try ( InputIterator<InputRelationship> relationshipsOfThisType = perType.next() )
+            {
+                // THEN
+                Object type = perType.currentType();
+                Collection<Object> expectedRelationshipsOfThisType = nodesOf( filter(
+                        relationship -> relationship.typeAsObject().equals( type ), expected.iterator() ) );
+                assertEquals( expectedRelationshipsOfThisType, nodesOf( relationshipsOfThisType ) );
+                all.addAll( expectedRelationshipsOfThisType );
+            }
+        }
+
+        assertEquals( nodesOf( expected.iterator() ), all );
+    }
+
+    /**
+     * Get the start nodes of the relationships. We use those to identify relationships, since they have
+     * no ID and no equals method (which they don't really need).
+     *
+     * @param relationship {@link Iterator} of {@link InputRelationship} to get start node ids from.
+     * @return {@link Collection} of start node ids of the given relationships.
+     */
+    private Collection<Object> nodesOf( Iterator<InputRelationship> relationship )
+    {
+        Collection<Object> nodes = new HashSet<>();
+        while ( relationship.hasNext() )
+        {
+            nodes.add( relationship.next().startNode() );
+        }
+        return nodes;
+    }
+
+    private Object[] types( Collection<InputRelationship> expected )
+    {
+        Set<Object> types = new HashSet<>();
+        for ( InputRelationship relationship : expected )
+        {
+            types.add( relationship.typeAsObject() );
+        }
+        return types.toArray();
+    }
+
+    private Collection<InputRelationship> randomRelationships()
+    {
+        Collection<InputRelationship> result = new ArrayList<>();
+        int count = 100;
+        Group group = Group.GLOBAL;
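+        // Randomly decide whether the relationships specify their type by name or by numeric type id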
+        boolean typeIds = random.nextBoolean();
+        for ( int i = 0; i < count; i++ )
+        {
+            int typeId = random.nextInt( 5 );
+            Object node = (long)i;
+            InputRelationship relationship = new InputRelationship( "test", i, i, NO_PROPERTIES, null,
+                    group, node, group, node,
+                    typeIds ? null : String.valueOf( typeId ),
+                    typeIds ? typeId : null );
+            result.add( relationship );
+        }
+        return result;
+    }
+}