diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ImportLogic.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ImportLogic.java index 545b54ac37d20..da92131b0ccb7 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ImportLogic.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ImportLogic.java @@ -190,6 +190,7 @@ public T getState( Class type ) public void putState( T state ) { accessibleState.put( state.getClass(), state ); + dependencies.satisfyDependency( state ); } /** @@ -262,7 +263,7 @@ public void calculateNodeDegrees() { Configuration relationshipConfig = configWithRecordsPerPageBasedBatchSize( config, neoStore.getRelationshipStore() ); - nodeRelationshipCache.setHighNodeId( neoStore.getNodeStore().getHighId() ); + nodeRelationshipCache.setNodeCount( getState( RelationshipTypeDistribution.class ).getNodeCount() ); NodeDegreeCountStage nodeDegreeStage = new NodeDegreeCountStage( relationshipConfig, neoStore.getRelationshipStore(), nodeRelationshipCache, memoryUsageStats ); executeStage( nodeDegreeStage ); diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeEncoderStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeEncoderStep.java index 9e828dc1263b6..a2e60cea2779c 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeEncoderStep.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeEncoderStep.java @@ -23,7 +23,6 @@ import org.neo4j.kernel.impl.store.InlineNodeLabels; import org.neo4j.kernel.impl.store.NodeStore; -import org.neo4j.kernel.impl.store.record.DynamicRecord; import org.neo4j.kernel.impl.store.record.NodeRecord; import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdGenerator; import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdMapper; diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipStage.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipStage.java index 141f828369450..459a6242e0710 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipStage.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipStage.java @@ -64,7 +64,7 @@ public class RelationshipStage extends Stage public RelationshipStage( Configuration config, IoMonitor writeMonitor, InputIterable relationships, IdMapper idMapper, Collector badCollector, InputCache inputCache, - BatchingNeoStores neoStore, EntityStoreUpdaterStep.Monitor storeUpdateMonitor ) throws IOException + BatchingNeoStores neoStore, CountingStoreUpdateMonitor storeUpdateMonitor ) throws IOException { super( NAME, null, config, ORDER_SEND_DOWNSTREAM ); add( new InputIteratorBatcherStep<>( control(), config, relationships.iterator(), @@ -76,7 +76,8 @@ public RelationshipStage( Configuration config, IoMonitor writeMonitor, RelationshipStore relationshipStore = neoStore.getRelationshipStore(); PropertyStore propertyStore = neoStore.getPropertyStore(); - add( typer = new RelationshipTypeCheckerStep( control(), config, neoStore.getRelationshipTypeRepository() ) ); + add( typer = new RelationshipTypeCheckerStep( control(), config, neoStore.getRelationshipTypeRepository(), + storeUpdateMonitor.nodesWritten() ) ); add( new AssignRelationshipIdBatchStep( control(), config, 0 ) ); add( new RelationshipPreparationStep( control(), config, idMapper ) ); add( new RelationshipRecordPreparationStep( control(), config, diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipTypeCheckerStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipTypeCheckerStep.java index dc272dd80f5cd..008f625365aa8 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipTypeCheckerStep.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipTypeCheckerStep.java @@ -53,13 +53,15 @@ public class RelationshipTypeCheckerStep extends ProcessorStep Integer.compare( (Integer) e2.getKey(), (Integer) e1.getKey() ); private final Map> typeCheckers = new ConcurrentHashMap<>(); private final BatchingRelationshipTypeTokenRepository typeTokenRepository; + private final long nodeCount; private RelationshipTypeDistribution distribution; public RelationshipTypeCheckerStep( StageControl control, Configuration config, - BatchingRelationshipTypeTokenRepository typeTokenRepository ) + BatchingRelationshipTypeTokenRepository typeTokenRepository, long nodeCount ) { super( control, "TYPE", config, 0 ); this.typeTokenRepository = typeTokenRepository; + this.nodeCount = nodeCount; } @Override @@ -102,7 +104,7 @@ protected void done() { typeTokenRepository.getOrCreateId( sortedTypes[i].getKey() ); } - distribution = new RelationshipTypeDistribution( convert( sortedTypes ) ); + distribution = new RelationshipTypeDistribution( nodeCount, convert( sortedTypes ) ); super.done(); } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipTypeDistribution.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipTypeDistribution.java index 529cabf4f88d2..44d09088526f1 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipTypeDistribution.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipTypeDistribution.java @@ -33,9 +33,11 @@ public class RelationshipTypeDistribution implements Iterable> { // keys can be either String or Integer private final Pair[] sortedTypes; + private final long nodeCount; - public RelationshipTypeDistribution( Pair[] sortedTypes ) + public RelationshipTypeDistribution( long nodeCount, Pair[] sortedTypes ) { + this.nodeCount = nodeCount; this.sortedTypes = sortedTypes; } @@ -64,4 +66,19 @@ public Set types( int startingFromType, int upToType ) } return types; } + + public long getNodeCount() + { + return nodeCount; + } + + public long getRelationshipCount() + { + long sum = 0; + for ( Pair type : sortedTypes ) + { + sum += type.other(); + } + return sum; + } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/NodeRelationshipCache.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/NodeRelationshipCache.java index 257335f6ae6fa..9a829df388d63 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/NodeRelationshipCache.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/NodeRelationshipCache.java @@ -225,7 +225,7 @@ private static boolean isBigCount( int storedCount ) * * @param nodeId high node id in the store, e.g. the highest node id + 1 */ - public void setHighNodeId( long nodeId ) + public void setNodeCount( long nodeId ) { this.highNodeId = nodeId; this.array = arrayFactory.newByteArray( highNodeId, minusOneBytes( ID_AND_COUNT_SIZE ) ); diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/HumanUnderstandableExecutionMonitor.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/HumanUnderstandableExecutionMonitor.java index bc9e096434887..cb437c911cdc9 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/HumanUnderstandableExecutionMonitor.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/HumanUnderstandableExecutionMonitor.java @@ -29,6 +29,7 @@ import org.neo4j.unsafe.impl.batchimport.NodeStage; import org.neo4j.unsafe.impl.batchimport.RelationshipGroupStage; import org.neo4j.unsafe.impl.batchimport.RelationshipStage; +import org.neo4j.unsafe.impl.batchimport.RelationshipTypeDistribution; import org.neo4j.unsafe.impl.batchimport.ScanAndCacheGroupsStage; import org.neo4j.unsafe.impl.batchimport.SparseNodeFirstRelationshipStage; import org.neo4j.unsafe.impl.batchimport.cache.GatheringMemoryStatsVisitor; @@ -70,8 +71,6 @@ public class HumanUnderstandableExecutionMonitor implements ExecutionMonitor // assigned later on private final PrintStream out; private DependencyResolver dependencyResolver; - private long actualNodeCount; - private long actualRelationshipCount; // progress of current stage private long goal; @@ -164,7 +163,7 @@ else if ( execution.getStageName().equals( NodeDegreeCountStage.NAME ) ) // - backward linking // - node relationship linking // - forward linking - initializeLinking(); + initializeLinking( dependencyResolver.resolveDependency( RelationshipTypeDistribution.class ) ); } else if ( execution.getStageName().equals( CountGroupsStage.NAME ) ) { @@ -173,7 +172,9 @@ else if ( execution.getStageName().equals( CountGroupsStage.NAME ) ) // Misc: // - relationship group defragmentation // - counts store - initializeMisc( dependencyResolver.resolveDependency( BatchingNeoStores.class ) ); + initializeMisc( + dependencyResolver.resolveDependency( BatchingNeoStores.class ), + dependencyResolver.resolveDependency( RelationshipTypeDistribution.class ) ); } else if ( includeStage( execution ) ) { @@ -227,9 +228,10 @@ ESTIMATED_REQUIRED_MEMORY_USAGE, bytes( initializeProgress( numberOfRelationships ); } - private void initializeLinking() + private void initializeLinking( RelationshipTypeDistribution distribution ) { printStageHeader( "(3/4) Relationship linking" ); + long actualRelationshipCount = distribution.getRelationshipCount(); initializeProgress( actualRelationshipCount + // node degrees actualRelationshipCount * 2 + // start/end forwards, see RelationshipLinkingProgress @@ -237,10 +239,12 @@ private void initializeLinking() ); } - private void initializeMisc( BatchingNeoStores stores ) + private void initializeMisc( BatchingNeoStores stores, RelationshipTypeDistribution distribution ) { printStageHeader( "(4/4) Post processing" ); // written groups + node counts + relationship counts + long actualNodeCount = distribution.getNodeCount(); + long actualRelationshipCount = distribution.getRelationshipCount(); long groupCount = stores.getTemporaryRelationshipGroupStore().getHighId(); initializeProgress( groupCount + // Count groups @@ -352,14 +356,6 @@ private void printStageHeader( String name, Object... data ) @Override public void end( StageExecution execution, long totalTimeMillis ) { - if ( execution.getStageName().equals( NodeStage.NAME ) ) - { - actualNodeCount = progressOf( execution ); - } - else if ( execution.getStageName().equals( RelationshipStage.NAME ) ) - { - actualRelationshipCount = progressOf( execution ); - } } @Override diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/ImportLogicTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/ImportLogicTest.java index c3db584a8f79e..9bce8cef7ce6a 100644 --- a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/ImportLogicTest.java +++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/ImportLogicTest.java @@ -48,7 +48,7 @@ public void shouldSplitUpRelationshipTypesInBatches() throws Exception int numberOfNodes = 100; int numberOfTypes = 10; NodeRelationshipCache cache = new NodeRelationshipCache( NumberArrayFactory.HEAP, denseNodeThreshold ); - cache.setHighNodeId( numberOfNodes + 1 ); + cache.setNodeCount( numberOfNodes + 1 ); Direction[] directions = Direction.values(); for ( int i = 0; i < numberOfNodes; i++ ) { @@ -65,7 +65,7 @@ public void shouldSplitUpRelationshipTypesInBatches() throws Exception numberOfRelationships += count; } types.sort( ( t1, t2 ) -> Long.compare( t2.other(), t1.other() ) ); - RelationshipTypeDistribution typeDistribution = new RelationshipTypeDistribution( types.stream().toArray( Pair[]::new ) ); + RelationshipTypeDistribution typeDistribution = new RelationshipTypeDistribution( 0, types.stream().toArray( Pair[]::new ) ); // WHEN enough memory for all types { diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/RelationshipTypeCheckerStepTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/RelationshipTypeCheckerStepTest.java index 6fffbae90ce3c..6c9570651742d 100644 --- a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/RelationshipTypeCheckerStepTest.java +++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/RelationshipTypeCheckerStepTest.java @@ -66,7 +66,7 @@ private void shouldReturnRelationshipTypeIdsInReverseOrderOfTokenCreation( boole // GIVEN BatchingRelationshipTypeTokenRepository repository = mock( BatchingRelationshipTypeTokenRepository.class ); RelationshipTypeCheckerStep step = - new RelationshipTypeCheckerStep( mock( StageControl.class ), DEFAULT, repository ); + new RelationshipTypeCheckerStep( mock( StageControl.class ), DEFAULT, repository, 0 ); // WHEN Batch relationships = @@ -89,7 +89,7 @@ public void shouldReturnRelationshipTypesInDescendingOrder() throws Throwable // GIVEN BatchingRelationshipTypeTokenRepository repository = mock( BatchingRelationshipTypeTokenRepository.class ); RelationshipTypeCheckerStep step = new RelationshipTypeCheckerStep( mock( StageControl.class ), DEFAULT, - repository ); + repository, 0 ); Batch relationships = batchOfRelationshipsWithRandomTypes( 10, true/*use the raw ids*/ ); step.process( relationships, mock( BatchSender.class ) ); diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/cache/NodeRelationshipCacheTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/cache/NodeRelationshipCacheTest.java index bd46f0f827664..141c24fcae4b2 100644 --- a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/cache/NodeRelationshipCacheTest.java +++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/cache/NodeRelationshipCacheTest.java @@ -84,7 +84,7 @@ public void shouldReportCorrectNumberOfDenseNodes() throws Exception { // GIVEN cache = new NodeRelationshipCache( NumberArrayFactory.AUTO_WITHOUT_PAGECACHE, 5, 100, base ); - cache.setHighNodeId( 26 ); + cache.setNodeCount( 26 ); increment( cache, 2, 10 ); increment( cache, 5, 2 ); increment( cache, 7, 12 ); @@ -108,7 +108,7 @@ public void shouldGoThroughThePhases() throws Exception // GIVEN int nodeCount = 10; cache = new NodeRelationshipCache( NumberArrayFactory.OFF_HEAP, 20, 100, base ); - cache.setHighNodeId( nodeCount ); + cache.setNodeCount( nodeCount ); incrementRandomCounts( cache, nodeCount, nodeCount * 20 ); // Test sparse node semantics @@ -135,7 +135,7 @@ public void shouldObserveFirstRelationshipAsEmptyInEachDirection() throws Except Direction[] directions = Direction.values(); GroupVisitor groupVisitor = mock( GroupVisitor.class ); cache.setForwardScan( true, true ); - cache.setHighNodeId( nodes + 1 ); + cache.setNodeCount( nodes + 1 ); for ( int i = 0; i < nodes; i++ ) { assertEquals( -1L, cache.getFirstRel( nodes, groupVisitor ) ); @@ -169,7 +169,7 @@ public void shouldResetCountAfterGetOnDenseNodes() throws Exception cache = new NodeRelationshipCache( NumberArrayFactory.AUTO_WITHOUT_PAGECACHE, 1, 100, base ); long nodeId = 0; int typeId = 3; - cache.setHighNodeId( 1 ); + cache.setNodeCount( 1 ); cache.incrementCount( nodeId ); cache.incrementCount( nodeId ); cache.getAndPutRelationship( nodeId, typeId, OUTGOING, 10, true ); @@ -193,7 +193,7 @@ public void shouldGetAndPutRelationshipAroundChunkEdge() throws Exception // WHEN long nodeId = 1_000_000 - 1; int typeId = 10; - cache.setHighNodeId( nodeId + 1 ); + cache.setNodeCount( nodeId + 1 ); Direction direction = Direction.OUTGOING; long relId = 10; cache.getAndPutRelationship( nodeId, typeId, direction, relId, false ); @@ -212,7 +212,7 @@ public void shouldPutRandomStuff() throws Exception cache = new NodeRelationshipCache( NumberArrayFactory.HEAP, 1, 1000, base ); // mark random nodes as dense (dense node threshold is 1 so enough with one increment - cache.setHighNodeId( nodes ); + cache.setNodeCount( nodes ); for ( long nodeId = 0; nodeId < nodes; nodeId++ ) { if ( random.nextBoolean() ) @@ -249,7 +249,7 @@ public void shouldPut6ByteRelationshipIds() throws Exception long denseNode = 1; long relationshipId = (1L << 48) - 2; int typeId = 10; - cache.setHighNodeId( 2 ); + cache.setNodeCount( 2 ); cache.incrementCount( denseNode ); // WHEN @@ -267,7 +267,7 @@ public void shouldFailFastIfTooBigRelationshipId() throws Exception // GIVEN int typeId = 10; cache = new NodeRelationshipCache( NumberArrayFactory.HEAP, 1, 100, base ); - cache.setHighNodeId( 1 ); + cache.setNodeCount( 1 ); // WHEN cache.getAndPutRelationship( 0, typeId, OUTGOING, (1L << 48) - 2, false ); @@ -290,7 +290,7 @@ public void shouldVisitChangedNodes() throws Exception int nodes = 10; int typeId = 10; cache = new NodeRelationshipCache( NumberArrayFactory.HEAP, 2, 100, base ); - cache.setHighNodeId( nodes ); + cache.setNodeCount( nodes ); for ( long nodeId = 0; nodeId < nodes; nodeId++ ) { cache.incrementCount( nodeId ); @@ -342,7 +342,7 @@ public void shouldFailFastOnTooHighCountOnNode() throws Exception long nodeId = 5; long count = NodeRelationshipCache.MAX_COUNT - 1; int typeId = 10; - cache.setHighNodeId( 10 ); + cache.setNodeCount( 10 ); cache.setCount( nodeId, count, typeId, OUTGOING ); // WHEN @@ -365,7 +365,7 @@ public void shouldKeepNextGroupIdForNextRound() throws Exception cache = new NodeRelationshipCache( NumberArrayFactory.HEAP, 1, 100, base ); long nodeId = 0; int typeId = 10; - cache.setHighNodeId( nodeId + 1 ); + cache.setNodeCount( nodeId + 1 ); cache.incrementCount( nodeId ); GroupVisitor groupVisitor = mock( GroupVisitor.class ); when( groupVisitor.visit( anyLong(), anyInt(), anyLong(), anyLong(), anyLong() ) ).thenReturn( 1L, 2L, 3L ); @@ -425,7 +425,7 @@ public void shouldHaveDenseNodesWithBigCounts() throws Exception cache = new NodeRelationshipCache( NumberArrayFactory.HEAP, 1, 100, base ); long nodeId = 1; int typeId = 10; - cache.setHighNodeId( nodeId + 1 ); + cache.setNodeCount( nodeId + 1 ); cache.setCount( nodeId, 2, typeId, OUTGOING ); // surely dense now cache.getAndPutRelationship( nodeId, typeId, OUTGOING, 1, true ); cache.getAndPutRelationship( nodeId, typeId, INCOMING, 2, true ); @@ -448,7 +448,7 @@ public void shouldCacheMultipleDenseNodeRelationshipHeads() throws Exception { // GIVEN cache = new NodeRelationshipCache( NumberArrayFactory.HEAP, 1 ); - cache.setHighNodeId( 10 ); + cache.setNodeCount( 10 ); long nodeId = 3; cache.setCount( nodeId, 10, /*these do not matter ==>*/ 0, OUTGOING ); @@ -486,7 +486,7 @@ public void shouldHaveSparseNodesWithBigCounts() throws Exception cache = new NodeRelationshipCache( NumberArrayFactory.HEAP, 1, 100, base ); long nodeId = 1; int typeId = 10; - cache.setHighNodeId( nodeId + 1 ); + cache.setNodeCount( nodeId + 1 ); // WHEN long highCount = NodeRelationshipCache.MAX_COUNT - 100;