From bf0c0f810b3f561dbf997df80e43ec23139db352 Mon Sep 17 00:00:00 2001 From: Mattias Persson Date: Tue, 23 Aug 2016 20:18:20 +0200 Subject: [PATCH] Revamped parallelism of parts of batch importer The main story here is composed of two things: - a new ForkedProcessorStep which does parallelization inside each batch, executed one by one. This to avoid difficulties parallelizing some steps which has a costly section which isn't parallelizable. With this new step items in a batch can be striped such that each forked processor knows which parts to process. - better mechanical sympathy where most stages are optimized to work with batch sizes matching pages in the page cache of the store they (mainly) work with. The forked processor simplifies a couple of stages, there are now no artificial additional steps for splitting or otherwise modify batches to be better parallelizable. Also the whole stage scales better with added processors because the old way of parallelizing those stages often involved a step which was single-threaded and acted as a divider-of-work. Such a step would often become the bottleneck in the end anyway. About mechanical sympathy the main problem previously was that reader and writer of stages which read from and wrote to the same store actually contended on each other. Given the smaller batch size, there were multiple batches of read records for any given page. Later in the stage where store was updated would often update the same page and so the reader (still reading that page) would need to do mych more retry- reading and so slow the whole stage down. Now with the aligned batch sizes the reader doesn't contend with the writers in the page cache. Additionally the main store updating step have been split into steps updating entities and properties separately, this to have the entity updating able to go even faster. The net result of this change as a whole should be that more often the disk is the only main bottleneck. On test machines and development laptops a 2x-3x performance improvement of the importer have been observed. --- .../ParallelBatchImporterTest.java | 4 +- .../AssignRelationshipIdBatchStep.java | 66 ++++++ .../neo4j/unsafe/impl/batchimport/Batch.java | 2 - .../impl/batchimport/CacheGroupsStep.java | 6 +- .../CalculateDenseNodePrepareStep.java | 116 ---------- .../batchimport/CalculateDenseNodesStage.java | 3 +- .../batchimport/CalculateDenseNodesStep.java | 60 +++-- .../CalculateRelationshipsStep.java | 4 + .../impl/batchimport/Configuration.java | 17 ++ .../impl/batchimport/CountGroupsStage.java | 4 +- .../batchimport/EntityStoreUpdaterStep.java | 5 +- .../impl/batchimport/NodeCountsStage.java | 4 +- .../impl/batchimport/NodeFirstGroupStage.java | 4 +- .../unsafe/impl/batchimport/NodeStage.java | 4 +- .../batchimport/ParallelBatchImporter.java | 27 ++- .../ParallelizationCoordinator.java | 66 ------ .../batchimport/ParallelizeByNodeIdStep.java | 132 ----------- .../ReadRelationshipCountsDataStep.java | 4 +- .../impl/batchimport/RecordIdIteration.java | 66 ------ .../impl/batchimport/RecordIdIterator.java | 182 +++++++++++++++ .../batchimport/RelationshipEncoderStep.java | 79 ++----- .../RelationshipGroupDefragmenter.java | 12 +- .../RelationshipLinkbackStage.java | 9 +- .../batchimport/RelationshipLinkbackStep.java | 139 +++++++++++ .../RelationshipPreparationStep.java | 3 - .../RelationshipRecordPreparationStep.java | 80 +++++++ .../impl/batchimport/RelationshipStage.java | 17 +- .../batchimport/ScanAndCacheGroupsStage.java | 4 +- .../impl/batchimport/UpdateRecordsStep.java | 4 +- .../executor/DynamicTaskExecutor.java | 16 +- .../batchimport/staging/AbstractStep.java | 1 + .../staging/DynamicProcessorAssigner.java | 2 +- .../staging/ForkedProcessorStep.java | 215 ++++++++++++++++++ .../batchimport/staging/ProcessorStep.java | 37 +-- .../batchimport/staging/ReadRecordsStep.java | 36 +-- .../unsafe/impl/batchimport/staging/Step.java | 5 - .../AssignRelationshipIdBatchStepTest.java | 99 ++++++++ .../CalculateDenseNodesStepTest.java | 134 +++++++---- .../impl/batchimport/CapturingStep.java | 51 +++++ .../ParallelizationCoordinatorTest.java | 125 ---------- .../batchimport/RecordIdIteratorTest.java | 129 +++++++++++ .../staging/ForkedProcessorStepTest.java | 187 +++++++++++++++ .../staging/ParallelizeByNodeIdStepTest.java | 115 ---------- .../staging/ProcessorStepTest.java | 20 +- .../staging/ReadRecordsStepTest.java | 106 ++++++++- .../staging/SimpleStageControl.java | 56 +++++ 46 files changed, 1592 insertions(+), 865 deletions(-) create mode 100644 community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/AssignRelationshipIdBatchStep.java delete mode 100644 community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/CalculateDenseNodePrepareStep.java delete mode 100644 community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ParallelizationCoordinator.java delete mode 100644 community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ParallelizeByNodeIdStep.java delete mode 100644 community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RecordIdIteration.java create mode 100644 community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RecordIdIterator.java create mode 100644 community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkbackStep.java create mode 100644 community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipRecordPreparationStep.java create mode 100644 community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ForkedProcessorStep.java create mode 100644 community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/AssignRelationshipIdBatchStepTest.java create mode 100644 community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/CapturingStep.java delete mode 100644 community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/ParallelizationCoordinatorTest.java create mode 100644 community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/RecordIdIteratorTest.java create mode 100644 community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/ForkedProcessorStepTest.java delete mode 100644 community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/ParallelizeByNodeIdStepTest.java create mode 100644 community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/SimpleStageControl.java diff --git a/community/consistency-check/src/test/java/org/neo4j/unsafe/impl/batchimport/ParallelBatchImporterTest.java b/community/consistency-check/src/test/java/org/neo4j/unsafe/impl/batchimport/ParallelBatchImporterTest.java index 33e6568b2115c..96577128c64ca 100644 --- a/community/consistency-check/src/test/java/org/neo4j/unsafe/impl/batchimport/ParallelBatchImporterTest.java +++ b/community/consistency-check/src/test/java/org/neo4j/unsafe/impl/batchimport/ParallelBatchImporterTest.java @@ -107,7 +107,7 @@ public class ParallelBatchImporterTest @Override public int batchSize() { - // Set to extra low to exercise the internals and IoQueue a bit more. + // Set to extra low to exercise the internals a bit more. return 100; } @@ -175,7 +175,7 @@ public void shouldImportCsvData() throws Exception nodes( nodeRandomSeed, NODE_COUNT, inputIdGenerator, groups ), relationships( relationshipRandomSeed, RELATIONSHIP_COUNT, inputIdGenerator, groups ), idMapper, idGenerator, - /*insanely high bad tolerance, but it will actually never be that many*/ + /*insanely high bad tolerance, but it will actually never be that many*/ silentBadCollector( RELATIONSHIP_COUNT ) ) ); // THEN diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/AssignRelationshipIdBatchStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/AssignRelationshipIdBatchStep.java new file mode 100644 index 0000000000000..73e0bd2f708fc --- /dev/null +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/AssignRelationshipIdBatchStep.java @@ -0,0 +1,66 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.unsafe.impl.batchimport; + +import org.neo4j.kernel.impl.store.id.IdGeneratorImpl; +import org.neo4j.kernel.impl.store.record.RelationshipRecord; +import org.neo4j.unsafe.impl.batchimport.input.InputRelationship; +import org.neo4j.unsafe.impl.batchimport.staging.BatchSender; +import org.neo4j.unsafe.impl.batchimport.staging.Configuration; +import org.neo4j.unsafe.impl.batchimport.staging.ProcessorStep; +import org.neo4j.unsafe.impl.batchimport.staging.StageControl; + +/** + * Assigns record ids to {@link Batch} for later record allocation. Since this step is single-threaded + * we can safely assign these ids here. + */ +public class AssignRelationshipIdBatchStep extends ProcessorStep> +{ + private long nextId; + + public AssignRelationshipIdBatchStep( StageControl control, Configuration config, long firstRelationshipId ) + { + super( control, "ASSIGN", config, 1 ); + this.nextId = firstRelationshipId; + } + + @Override + protected void process( Batch batch, BatchSender sender ) throws Throwable + { + // Assign first record id and send + batch.firstRecordId = nextId; + sender.send( batch ); + + // Set state for the next batch + nextId += batch.input.length; + if ( nextId <= IdGeneratorImpl.INTEGER_MINUS_ONE && + nextId + batch.input.length >= IdGeneratorImpl.INTEGER_MINUS_ONE ) + { + // There's this pesky INTEGER_MINUS_ONE ID again. Easiest is to simply skip this batch of ids + // or at least the part up to that id and just continue after it. + nextId = IdGeneratorImpl.INTEGER_MINUS_ONE + 1; + } + } + + public long getNextRelationshipId() + { + return nextId; + } +} diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/Batch.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/Batch.java index 9a205be73982a..9eaf76b799c8f 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/Batch.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/Batch.java @@ -45,8 +45,6 @@ public class Batch // using the same index as the record. So it's a collective size suitable for complete looping // over the batch. public PropertyBlock[] propertyBlocks; - // Used by ParallelizeByNodeIdStep to help determine any two batches have any id in common - public long[] sortedIds; // Used by relationship staged to query idMapper and store ids here public long[] ids; public boolean parallelizableWithPrevious; diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/CacheGroupsStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/CacheGroupsStep.java index 958a6bf553c10..2c21cb6e915db 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/CacheGroupsStep.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/CacheGroupsStep.java @@ -41,9 +41,11 @@ public CacheGroupsStep( StageControl control, Configuration config, Relationship @Override protected void process( RelationshipGroupRecord[] batch, BatchSender sender ) throws Throwable { - for ( RelationshipGroupRecord groupRecord : batch ) + // These records are read page-wise forwards, but should be cached in reverse + // since the records exists in the store in reverse order. + for ( int i = batch.length-1; i >= 0; i-- ) { - cache.put( groupRecord ); + cache.put( batch[i] ); } } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/CalculateDenseNodePrepareStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/CalculateDenseNodePrepareStep.java deleted file mode 100644 index 0bd24f181a331..0000000000000 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/CalculateDenseNodePrepareStep.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Copyright (c) 2002-2016 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ -package org.neo4j.unsafe.impl.batchimport; - -import java.util.Arrays; - -import org.neo4j.kernel.impl.store.record.RelationshipRecord; -import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache; -import org.neo4j.unsafe.impl.batchimport.input.Collector; -import org.neo4j.unsafe.impl.batchimport.input.InputRelationship; -import org.neo4j.unsafe.impl.batchimport.staging.BatchSender; -import org.neo4j.unsafe.impl.batchimport.staging.ProcessorStep; -import org.neo4j.unsafe.impl.batchimport.staging.StageControl; - -/** - * Makes some preparations to incoming batches so that {@link CalculateDenseNodesStep} can run parallel batches. - * Sends long[] batches downstream, where each batch is for node ids with a certain radix, such that - * the {@link NodeRelationshipCache} cache can be updated in parallel without synchronization. - * Each id in the long[] has the 0x40000000_00000000 bit set if end node. End node on loop relationships - * are not counted. - */ -public class CalculateDenseNodePrepareStep extends ProcessorStep> -{ - public static final int RADIXES = 10; - - private final int batchSize; - private final long[][] inProgressBatches; - private final int[] cursors; - private final Collector badCollector; - - public CalculateDenseNodePrepareStep( StageControl control, Configuration config, Collector badCollector ) - { - super( control, "DIVIDE", config, 1 ); - this.badCollector = badCollector; - this.batchSize = config.batchSize() * 2; // x2 since we receive (and send) 2 ids per relationship - this.inProgressBatches = new long[RADIXES][batchSize]; - this.cursors = new int[inProgressBatches.length]; - } - - @Override - protected void process( Batch batch, BatchSender sender ) - { - long[] batchIds = batch.ids; - InputRelationship[] input = batch.input; - for ( int i = 0, r = 0; i < batchIds.length; i++, r++ ) - { - // start node - long startNodeId = batchIds[i++]; - InputRelationship relationship = input[r]; - processNodeId( startNodeId, sender, relationship, relationship.startNode() ); - - // end node - long endNodeId = batchIds[i]; - boolean loop = startNodeId == endNodeId; - if ( !loop ) - { - processNodeId( endNodeId, sender, relationship, relationship.endNode() ); - } - } - } - - private void processNodeId( long nodeId, BatchSender sender, InputRelationship relationship, Object inputId ) - { - if ( nodeId != -1 ) - { - int startNodeRadix = radixOf( nodeId ); - inProgressBatches[startNodeRadix][cursors[startNodeRadix]++] = nodeId; - if ( cursors[startNodeRadix] == batchSize ) - { - sender.send( inProgressBatches[startNodeRadix] ); - inProgressBatches[startNodeRadix] = new long[batchSize]; - cursors[startNodeRadix] = 0; - } - } - else - { - badCollector.collectBadRelationship( relationship, inputId ); - } - } - - @Override - protected void lastCallForEmittingOutstandingBatches( BatchSender sender ) - { - for ( int i = 0; i < cursors.length; i++ ) - { - if ( cursors[i] > 0 ) - { - sender.send( cursors[i] == batchSize - ? inProgressBatches[i] - : Arrays.copyOf( inProgressBatches[i], cursors[i] ) ); - } - } - } - - public static int radixOf( long nodeId ) - { - return (int) (nodeId%RADIXES); - } -} diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/CalculateDenseNodesStage.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/CalculateDenseNodesStage.java index 304fd265963da..1d5bbafb021ed 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/CalculateDenseNodesStage.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/CalculateDenseNodesStage.java @@ -55,8 +55,7 @@ public CalculateDenseNodesStage( Configuration config, InputIterable +public class CalculateDenseNodesStep extends ForkedProcessorStep> { private final NodeRelationshipCache cache; - private final StripedLock lock = new StripedLock( RADIXES ); + private final Collector badCollector; - public CalculateDenseNodesStep( StageControl control, Configuration config, NodeRelationshipCache cache ) + public CalculateDenseNodesStep( StageControl control, Configuration config, NodeRelationshipCache cache, + Collector badCollector ) { - // Max 10 processors since we receive batches split by radix %10 so it doesn't make sense to have more - super( control, "CALCULATOR", config, RADIXES ); + super( control, "CALCULATE", config, 0 ); this.cache = cache; + this.badCollector = badCollector; } @Override - protected void process( long[] ids, BatchSender sender ) + protected void forkedProcess( int id, int processors, Batch batch ) + { + for ( int i = 0, idIndex = 0; i < batch.input.length; i++ ) + { + InputRelationship relationship = batch.input[i]; + long startNodeId = batch.ids[idIndex++]; + long endNodeId = batch.ids[idIndex++]; + processNodeId( id, processors, startNodeId, relationship, relationship.startNode() ); + if ( startNodeId != endNodeId || // avoid counting loops twice + startNodeId == -1 || endNodeId == -1 ) // although always collect bad relationships + { + // Loops only counts as one + processNodeId( id, processors, endNodeId, relationship, relationship.endNode() ); + } + } + } + + private void processNodeId( int id, int processors, long nodeId, + InputRelationship relationship, Object inputId ) { - // We lock because we only want at most one processor processing ids of a certain radix. - try ( Resource automaticallyUnlocked = lock.lock( radixOf( ids[0] ) ) ) + if ( nodeId == -1 ) { - for ( long id : ids ) + if ( id == 0 ) { - if ( id != -1 ) - { - cache.incrementCount( id ); - } + // Only let the processor with id=0 (which always exists) report the bad relationships + badCollector.collectBadRelationship( relationship, inputId ); } } + else if ( nodeId % processors == id ) + { + cache.incrementCount( nodeId ); + } } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/CalculateRelationshipsStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/CalculateRelationshipsStep.java index 79f487ad388b1..d9ac78d4f97a2 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/CalculateRelationshipsStep.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/CalculateRelationshipsStep.java @@ -26,6 +26,10 @@ import org.neo4j.unsafe.impl.batchimport.staging.ProcessorStep; import org.neo4j.unsafe.impl.batchimport.staging.StageControl; +/** + * Keeps track of number of relationships to import, this to set highId in relationship store before import. + * This is because of the way double-unit records works, so the secondary units will end up beyond this limit. + */ public class CalculateRelationshipsStep extends ProcessorStep> { private final RelationshipStore relationshipStore; diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/Configuration.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/Configuration.java index 2d1d2b497dbe8..1f1b581d7f8ca 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/Configuration.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/Configuration.java @@ -80,6 +80,11 @@ class Overridden private final Configuration defaults; private final Config config; + public Overridden( Configuration defaults ) + { + this( defaults, Config.empty() ); + } + public Overridden( Configuration defaults, Config config ) { super( defaults ); @@ -110,4 +115,16 @@ public int movingAverageSize() return defaults.movingAverageSize(); } } + + public static Configuration withBatchSize( Configuration config, int batchSize ) + { + return new Overridden( config ) + { + @Override + public int batchSize() + { + return batchSize; + } + }; + } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/CountGroupsStage.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/CountGroupsStage.java index 5e300a10b2aa2..6fbb8add7f47f 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/CountGroupsStage.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/CountGroupsStage.java @@ -25,6 +25,8 @@ import org.neo4j.unsafe.impl.batchimport.staging.ReadRecordsStep; import org.neo4j.unsafe.impl.batchimport.staging.Stage; +import static org.neo4j.unsafe.impl.batchimport.RecordIdIterator.allIn; + /** * Stage for counting groups per node, populates {@link RelationshipGroupCache}. */ @@ -35,7 +37,7 @@ public CountGroupsStage( Configuration config, RecordStore( control(), config, store, RecordIdIteration.allIn( store ) ) ); + add( new ReadRecordsStep<>( control(), config, store, allIn( store, config ) ) ); add( new CountGroupsStep( control(), config, groupCache ) ); } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/EntityStoreUpdaterStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/EntityStoreUpdaterStep.java index 5d461bd4528e2..2e54595f2a4fa 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/EntityStoreUpdaterStep.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/EntityStoreUpdaterStep.java @@ -64,6 +64,7 @@ public interface Monitor private final IoMonitor ioMonitor; private final PropertyCreator propertyCreator; private final Monitor monitor; + private long highestId; // Reusable instances for less GC private final BatchingPropertyRecordAccess propertyRecords = new BatchingPropertyRecordAccess(); @@ -133,7 +134,7 @@ protected void process( Batch batch, BatchSender sender ) } propertyBlockCursor += propertyBlockCount; } - entityStore.setHighestPossibleIdInUse( highestId ); + this.highestId = highestId; // Write all the created property records. for ( PropertyRecord propertyRecord : propertyRecords.records() ) @@ -192,5 +193,7 @@ protected void done() // and bytes written. NodeStage and CalculateDenseNodesStage can be run in parallel so if // NodeStage completes before CalculateDenseNodesStage then we want to stop the time in the I/O monitor. ioMonitor.stop(); + + entityStore.setHighestPossibleIdInUse( highestId ); } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeCountsStage.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeCountsStage.java index 8ae58fe87b795..a732b0db87065 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeCountsStage.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeCountsStage.java @@ -26,7 +26,7 @@ import org.neo4j.unsafe.impl.batchimport.staging.Stage; import org.neo4j.unsafe.impl.batchimport.stats.StatsProvider; -import static org.neo4j.unsafe.impl.batchimport.RecordIdIteration.allIn; +import static org.neo4j.unsafe.impl.batchimport.RecordIdIterator.allIn; /** * Reads all records from {@link NodeStore} and process the counts in them, populating {@link NodeLabelsCache} @@ -38,7 +38,7 @@ public NodeCountsStage( Configuration config, NodeLabelsCache cache, NodeStore n int highLabelId, CountsAccessor.Updater countsUpdater, StatsProvider... additionalStatsProviders ) { super( "Node counts", config ); - add( new ReadRecordsStep<>( control(), config, nodeStore, allIn( nodeStore ) ) ); + add( new ReadRecordsStep<>( control(), config, nodeStore, allIn( nodeStore, config ) ) ); add( new RecordProcessorStep<>( control(), "COUNT", config, new NodeCountsProcessor( nodeStore, cache, highLabelId, countsUpdater ), true, additionalStatsProviders ) ); } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeFirstGroupStage.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeFirstGroupStage.java index 3e4f231786127..d2518f8fa1dbe 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeFirstGroupStage.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeFirstGroupStage.java @@ -26,7 +26,7 @@ import org.neo4j.unsafe.impl.batchimport.staging.ReadRecordsStep; import org.neo4j.unsafe.impl.batchimport.staging.Stage; -import static org.neo4j.unsafe.impl.batchimport.RecordIdIteration.allIn; +import static org.neo4j.unsafe.impl.batchimport.RecordIdIterator.allIn; /** * Updates dense nodes with which will be the {@link NodeRecord#setNextRel(long) first group} to point to, @@ -38,7 +38,7 @@ public NodeFirstGroupStage( Configuration config, RecordStore nodeStore, ByteArray cache ) { super( "Node --> Group", config ); - add( new ReadRecordsStep<>( control(), config, groupStore, allIn( groupStore ) ) ); + add( new ReadRecordsStep<>( control(), config, groupStore, allIn( groupStore, config ) ) ); add( new NodeSetFirstGroupStep( control(), config, nodeStore, cache ) ); add( new UpdateRecordsStep<>( control(), config, nodeStore ) ); } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeStage.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeStage.java index 7091c54d2ac6d..5cfdf1d691047 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeStage.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeStage.java @@ -68,8 +68,8 @@ public NodeStage( Configuration config, IoMonitor writeMonitor, add( new NodeEncoderStep( control(), config, idMapper, idGenerator, neoStore.getLabelRepository(), nodeStore, memoryUsage ) ); add( new LabelScanStorePopulationStep( control(), config, labelScanStore ) ); - add( new EntityStoreUpdaterStep<>( control(), config, nodeStore, propertyStore, - writeMonitor, storeUpdateMonitor ) ); + add( new EntityStoreUpdaterStep<>( control(), config, nodeStore, propertyStore, writeMonitor, + storeUpdateMonitor ) ); } @Override diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ParallelBatchImporter.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ParallelBatchImporter.java index d4d067e72d30d..ed57abed03979 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ParallelBatchImporter.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ParallelBatchImporter.java @@ -60,6 +60,7 @@ import static org.neo4j.helpers.collection.Iterators.asSet; import static org.neo4j.io.ByteUnit.mebiBytes; import static org.neo4j.unsafe.impl.batchimport.AdditionalInitialIds.EMPTY; +import static org.neo4j.unsafe.impl.batchimport.Configuration.withBatchSize; import static org.neo4j.unsafe.impl.batchimport.SourceOrCachedInputIterable.cachedForSure; import static org.neo4j.unsafe.impl.batchimport.cache.NumberArrayFactory.AUTO; import static org.neo4j.unsafe.impl.batchimport.input.InputCache.MAIN; @@ -170,15 +171,15 @@ public void doImport( Input input ) throws IOException } // Stage 2 -- calculate dense node threshold - CalculateDenseNodesStage calculateDenseNodesStage = new CalculateDenseNodesStage( config, + CalculateDenseNodesStage calculateDenseNodesStage = new CalculateDenseNodesStage( + withBatchSize( config, config.batchSize()*10 ), relationships, nodeRelationshipCache, idMapper, badCollector, inputCache, neoStore ); executeStages( calculateDenseNodesStage ); importRelationships( nodeRelationshipCache, storeUpdateMonitor, neoStore, writeMonitor, idMapper, cachedRelationships, inputCache, calculateDenseNodesStage.getRelationshipTypes( Long.MAX_VALUE ), - // Is batch size a good measure for considering a group of relationships a minority? - calculateDenseNodesStage.getRelationshipTypes( config.batchSize() ) ); + calculateDenseNodesStage.getRelationshipTypes( 100 ) ); // Release this potentially really big piece of cached data long memoryWeCanHoldForCertain = totalMemoryUsageOf( idMapper, nodeRelationshipCache ); @@ -189,7 +190,7 @@ public void doImport( Input input ) throws IOException nodeRelationshipCache = null; new RelationshipGroupDefragmenter( config, executionMonitor ).run( - max( max( memoryWeCanHoldForCertain, highNodeId * 4), mebiBytes( 1 ) ), neoStore, highNodeId ); + max( max( memoryWeCanHoldForCertain, highNodeId * 4 ), mebiBytes( 1 ) ), neoStore, highNodeId ); // Stage 6 -- count nodes per label and labels per node nodeLabelsCache = new NodeLabelsCache( AUTO, neoStore.getLabelRepository().getHighId() ); @@ -270,6 +271,9 @@ private void importRelationships( NodeRelationshipCache nodeRelationshipCache, inputCache ); long nextRelationshipId = 0; + Configuration relationshipConfig = withBatchSize( config, + neoStore.getRelationshipStore().getRecordsPerPage() ); + Configuration nodeConfig = withBatchSize( config, neoStore.getNodeStore().getRecordsPerPage() ); for ( int i = 0; perTypeIterator.hasNext(); i++ ) { // Stage 3a -- relationships, properties @@ -280,18 +284,21 @@ private void importRelationships( NodeRelationshipCache nodeRelationshipCache, InputIterator perType = perTypeIterator.next(); String topic = " [:" + currentType + "] (" + (i+1) + "/" + allRelationshipTypes.length + ")"; - final RelationshipStage relationshipStage = new RelationshipStage( topic, config, writeMonitor, - perType, idMapper, neoStore, nodeRelationshipCache, storeUpdateMonitor, nextRelationshipId ); + final RelationshipStage relationshipStage = new RelationshipStage( topic, config, + writeMonitor, perType, idMapper, neoStore, nodeRelationshipCache, + storeUpdateMonitor, nextRelationshipId ); executeStages( relationshipStage ); // Stage 4a -- set node nextRel fields for dense nodes - executeStages( new NodeFirstRelationshipStage( topic, config, neoStore.getNodeStore(), + executeStages( new NodeFirstRelationshipStage( topic, nodeConfig, neoStore.getNodeStore(), neoStore.getTemporaryRelationshipGroupStore(), nodeRelationshipCache, true/*dense*/, currentTypeId ) ); // Stage 5a -- link relationship chains together for dense nodes nodeRelationshipCache.setForwardScan( false ); - executeStages( new RelationshipLinkbackStage( topic, config, neoStore.getRelationshipStore(), + executeStages( new RelationshipLinkbackStage( topic, + relationshipConfig, + neoStore.getRelationshipStore(), nodeRelationshipCache, nextRelationshipId, relationshipStage.getNextRelationshipId(), true/*dense*/ ) ); nextRelationshipId = relationshipStage.getNextRelationshipId(); @@ -301,12 +308,12 @@ private void importRelationships( NodeRelationshipCache nodeRelationshipCache, String topic = " Sparse"; nodeRelationshipCache.setForwardScan( true ); // Stage 4b -- set node nextRel fields for sparse nodes - executeStages( new NodeFirstRelationshipStage( topic, config, neoStore.getNodeStore(), + executeStages( new NodeFirstRelationshipStage( topic, nodeConfig, neoStore.getNodeStore(), neoStore.getTemporaryRelationshipGroupStore(), nodeRelationshipCache, false/*sparse*/, -1 ) ); // Stage 5b -- link relationship chains together for sparse nodes nodeRelationshipCache.setForwardScan( false ); - executeStages( new RelationshipLinkbackStage( topic, config, neoStore.getRelationshipStore(), + executeStages( new RelationshipLinkbackStage( topic, relationshipConfig, neoStore.getRelationshipStore(), nodeRelationshipCache, 0, nextRelationshipId, false/*sparse*/ ) ); if ( minorityRelationshipTypes.length > 0 ) diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ParallelizationCoordinator.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ParallelizationCoordinator.java deleted file mode 100644 index cf25125f9fd75..0000000000000 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ParallelizationCoordinator.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Copyright (c) 2002-2016 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ -package org.neo4j.unsafe.impl.batchimport; - -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import org.neo4j.graphdb.Resource; - -/** - * Lock for coordinating concurrency of execution between stretches of parallelizable batches - * versus non-parallelizable batches. Usage is to - * {@link #coordinate(boolean) lock based on whether or not being parallelizable with the previous batch}, - * put that {@link Resource} in a try-with-resource block, process and then exit that try-block. - */ -public class ParallelizationCoordinator -{ - private final ReadWriteLock lock = new ReentrantReadWriteLock( true ); - - public Resource coordinate( boolean parallelizableWithPrevious ) - { - if ( !parallelizableWithPrevious ) - { - // If this batch isn't parallelizable with previous batch then we need to wait for all previous - // batches (potentially many concurrent) to complete before this batch can run. - // Here that translates into acquiring the write lock. - lock.writeLock().lock(); - } - - // Now acquire the read lock, even if we have the write lock. Why? read right below. - final Lock readLock = lock.readLock(); - readLock.lock(); - - if ( !parallelizableWithPrevious ) - { - // Alright, we've made our point above when we acquired the write lock, i.e. awaited previous - // batches to complete and blocking new batches from starting. We can now - // (after having acquired the read lock) release the write lock since: - // - if the next batch isn't parallelizable with this batch it will await this batch to complete - // as a side effect of acquiring the write lock. - // - if the next batch is parallelizable with this batch it can go right ahead and process - // since it'll only need to acquire the read lock. - lock.writeLock().unlock(); - } - - return readLock::unlock; - } -} diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ParallelizeByNodeIdStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ParallelizeByNodeIdStep.java deleted file mode 100644 index 5cfb181116177..0000000000000 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ParallelizeByNodeIdStep.java +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Copyright (c) 2002-2016 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ -package org.neo4j.unsafe.impl.batchimport; - -import org.neo4j.kernel.impl.store.id.IdGeneratorImpl; -import org.neo4j.kernel.impl.store.record.RelationshipRecord; -import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache; -import org.neo4j.unsafe.impl.batchimport.input.InputRelationship; -import org.neo4j.unsafe.impl.batchimport.staging.BatchSender; -import org.neo4j.unsafe.impl.batchimport.staging.Configuration; -import org.neo4j.unsafe.impl.batchimport.staging.ProcessorStep; -import org.neo4j.unsafe.impl.batchimport.staging.StageControl; - -import static org.neo4j.unsafe.impl.batchimport.Utils.anyIdCollides; -import static org.neo4j.unsafe.impl.batchimport.Utils.mergeSortedInto; - -/** - * Enables {@link RelationshipEncoderStep} to safely process batches in parallel. - * - * Using actual node ids from {@link Batch#ids}, this step compares ids from consecutive batches and - * detects whether or not there are any overlap of ids between the batches. Batches that have no id overlap - * can be run in parallel. Batches that do have id overlap will not be run in parallel since that would - * introduce a chance for multiple threads updating the same cache entries of the {@link NodeRelationshipCache} - * concurrently. - * - * "Why not solve this with CAS in the {@link NodeRelationshipCache}, you say"... sure, that - * would solve the issue of updating the same cache entries concurrently, but may break a contract of - * the batch importer that relationship ids in any chain are ordered by id. Id ordering inside chains are - * fundamental for implementing the {@link RelationshipLinkbackStage "prev pointer" linking} in the - * efficient way it's done currently, upholding sequential I/O access properties correctly. - * - * This step will have an array, {@link #concurrentNodeIds}, containing all node ids that are potentially - * processed (by the downstream step) at any given point in time. In order to allow one more batch to be - * executed in parallel with any other concurrently executing batches no node id in the tentative batch - * can already exist in {@link #concurrentNodeIds}. This makes this array grow with consecutive parallelizable - * batches until a non-parallelizable batch is encountered, at which point the array can be reset and - * the comparison (and potential growth of it) can begin again. Implementation of this is garbage-free, - * but fact still remains that multiple consecutive parallelizable batches will keep increasing the - * time it takes to detect this property. Therefore there's an upper limit of 10 for the number of - * consecutive parallelizable batches, such that even if there would theoretically be 11 consecutive - * parallelizable batches, the 11:th will not be marked as parallelizable with the other previous 10. - */ -public class ParallelizeByNodeIdStep extends ProcessorStep> -{ - private static final int MAX_PARALLELIZABLE_BATCHES = 10; - - private final int batchSize; - private final int idBatchSize; - - // Since this step is single-threaded and currently RelationshipEncoderStep isn't then this is a perfect - // place for assigning actual relationship ids to batches, set in each Batch and used in RelationshipEncoderStep. - private long firstRecordId; - - // Collection of all ids from batches (theoretically) concurrently processing. Ids in here are sorted. - private final long[] concurrentNodeIds; - private int concurrentBatches; - - public ParallelizeByNodeIdStep( StageControl control, Configuration config ) - { - this( control, config, 0 ); - } - - public ParallelizeByNodeIdStep( StageControl control, Configuration config, long firstRecordId ) - { - super( control, "PARALLELIZE", config, 1 ); - // x2 since ids array cover both start and end nodes - this.batchSize = config.batchSize(); - this.idBatchSize = batchSize*2; - this.concurrentNodeIds = new long[idBatchSize * MAX_PARALLELIZABLE_BATCHES]; - this.firstRecordId = firstRecordId; - } - - @Override - protected void process( Batch batch, BatchSender sender ) throws Throwable - { - // Compare ids with concurrent ids - int concurrentNodeIdsRange = concurrentBatches*idBatchSize; - batch.parallelizableWithPrevious = concurrentBatches < MAX_PARALLELIZABLE_BATCHES && !anyIdCollides( - concurrentNodeIds, concurrentNodeIdsRange, batch.sortedIds, batch.ids.length ); - - // Assign first record id and send - batch.firstRecordId = firstRecordId; - sender.send( batch ); - - // Set state for the next batch - firstRecordId += batch.input.length; - if ( firstRecordId <= IdGeneratorImpl.INTEGER_MINUS_ONE && - firstRecordId + batchSize >= IdGeneratorImpl.INTEGER_MINUS_ONE ) - { - // There's this pesky INTEGER_MINUS_ONE ID again. Easiest is to simply skip this batch of ids - // or at least the part up to that id and just continue after it. - firstRecordId = IdGeneratorImpl.INTEGER_MINUS_ONE + 1; - } - - if ( batch.parallelizableWithPrevious ) - { - mergeSortedInto( batch.sortedIds, concurrentNodeIds, concurrentNodeIdsRange ); - concurrentBatches++; - } - else - { - System.arraycopy( batch.sortedIds, 0, concurrentNodeIds, 0, batch.sortedIds.length ); - concurrentBatches = 1; - } - } - - /** - * @return the next relationship id that this step would return if it were to import more relationships. - * This value can be used to feed into importing the next type, e.g. the constructor. - */ - public long getNextRelationshipId() - { - return firstRecordId; - } -} diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ReadRelationshipCountsDataStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ReadRelationshipCountsDataStep.java index f8de40284f99d..e3cd667d939c9 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ReadRelationshipCountsDataStep.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ReadRelationshipCountsDataStep.java @@ -26,7 +26,7 @@ import org.neo4j.unsafe.impl.batchimport.staging.ReadRecordsStep; import org.neo4j.unsafe.impl.batchimport.staging.StageControl; -import static org.neo4j.unsafe.impl.batchimport.RecordIdIteration.allIn; +import static org.neo4j.unsafe.impl.batchimport.RecordIdIterator.allIn; /** * Reads from {@link RelationshipStore} and produces batches of startNode,type,endNode values for @@ -39,7 +39,7 @@ public class ReadRelationshipCountsDataStep extends ReadRecordsStep. - */ -package org.neo4j.unsafe.impl.batchimport; - -import org.neo4j.collection.primitive.PrimitiveLongCollections; -import org.neo4j.collection.primitive.PrimitiveLongIterator; -import org.neo4j.kernel.impl.store.RecordStore; -import org.neo4j.kernel.impl.store.record.AbstractBaseRecord; - -public class RecordIdIteration -{ - public static final PrimitiveLongIterator backwards( long lowIncluded, long highExcluded ) - { - return new PrimitiveLongCollections.PrimitiveLongBaseIterator() - { - private long next = highExcluded - 1; - - @Override - protected boolean fetchNext() - { - return next >= lowIncluded ? next( next-- ) : false; - } - }; - } - - public static final PrimitiveLongIterator forwards( long lowIncluded, long highExcluded ) - { - return new PrimitiveLongCollections.PrimitiveLongBaseIterator() - { - private long nextId = lowIncluded; - - @Override - protected boolean fetchNext() - { - return nextId < highExcluded ? next( nextId++ ) : false; - } - }; - } - - public static PrimitiveLongIterator allIn( RecordStore store ) - { - return forwards( store.getNumberOfReservedLowIds(), store.getHighId() ); - } - - public static PrimitiveLongIterator allInReversed( RecordStore store ) - { - return backwards( store.getNumberOfReservedLowIds(), store.getHighId() ); - } -} diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RecordIdIterator.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RecordIdIterator.java new file mode 100644 index 0000000000000..642cc14fcceb3 --- /dev/null +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RecordIdIterator.java @@ -0,0 +1,182 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.unsafe.impl.batchimport; + +import org.neo4j.collection.primitive.PrimitiveLongCollections.PrimitiveLongBaseIterator; +import org.neo4j.collection.primitive.PrimitiveLongIterator; +import org.neo4j.kernel.impl.store.RecordStore; +import org.neo4j.kernel.impl.store.record.AbstractBaseRecord; +import org.neo4j.unsafe.impl.batchimport.staging.Configuration; + +import static java.lang.Long.max; +import static java.lang.Long.min; + +/** + * Returns ids either backwards or forwards. In both directions ids are returned batch-wise, sequentially forwards + * in each batch. This means for example that in a range of ]100-0] (i.e. from 100 (exclusive) to 0 (inclusive) + * going backwards with a batch size of 40 then ids are returned like this: 80-99, 40-79, 0-39. + * This to get higher mechanical sympathy. + */ +public interface RecordIdIterator +{ + /** + * @return next batch of ids as {@link PrimitiveLongIterator}, or {@code null} if there are no more ids to return. + */ + PrimitiveLongIterator nextBatch(); + + static RecordIdIterator backwards( long lowIncluded, long highExcluded, Configuration config ) + { + return new Backwards( lowIncluded, highExcluded, config ); + } + + static RecordIdIterator forwards( long lowIncluded, long highExcluded, Configuration config ) + { + return new Forwards( lowIncluded, highExcluded, config ); + } + + static RecordIdIterator allIn( RecordStore store, Configuration config ) + { + return forwards( store.getNumberOfReservedLowIds(), store.getHighId(), config ); + } + + static RecordIdIterator allInReversed( RecordStore store, + Configuration config ) + { + return backwards( store.getNumberOfReservedLowIds(), store.getHighId(), config ); + } + + static class Forwards extends PrimitiveLongBaseIterator implements RecordIdIterator + { + private final long lowIncluded; + private final long highExcluded; + private final int batchSize; + private long highBatchId; + private long nextId; + private boolean initialized; + + public Forwards( long lowIncluded, long highExcluded, Configuration config ) + { + this.lowIncluded = this.nextId = lowIncluded; + this.highExcluded = highExcluded; + this.batchSize = config.batchSize(); + } + + @Override + public PrimitiveLongIterator nextBatch() + { + assert !initialized || nextId == highBatchId; + + long size = min( batchSize, highExcluded - nextId ); + if ( size > 0 ) + { + if ( !initialized ) + { + highBatchId = findRoofId( lowIncluded ); + initialized = true; + } + else + { + highBatchId = min( nextId + size, highBatchId + size ); + } + return this; + } + return null; + } + + private long findRoofId( long floorId ) + { + int rest = (int) (floorId % batchSize); + return max( rest == 0 ? floorId + batchSize : floorId + batchSize - rest, lowIncluded ); + } + + @Override + protected boolean fetchNext() + { + return nextId < highBatchId ? next( nextId++ ) : false; + } + + @Override + public String toString() + { + return "[" + lowIncluded + "-" + highExcluded + "["; + } + } + + static class Backwards extends PrimitiveLongBaseIterator implements RecordIdIterator + { + private final int batchSize; + private final long lowIncluded; + private final long highExcluded; + + private long nextRoofId; + private long floorId; + private long nextId; + private boolean initialized; + + public Backwards( long lowIncluded, long highExcluded, Configuration config ) + { + this.lowIncluded = lowIncluded; + this.batchSize = config.batchSize(); + this.highExcluded = this.nextId = this.nextRoofId = this.floorId = highExcluded; + } + + @Override + public PrimitiveLongIterator nextBatch() + { + assert !initialized || nextId == nextRoofId; + + long size = floorId - lowIncluded; + if ( size > 0 ) + { + if ( !initialized ) + { + // First time + nextId = floorId = findFloorId( nextRoofId ); + initialized = true; + } + else + { + nextRoofId = floorId; + nextId = floorId = floorId - min( size, batchSize ); + } + return this; + } + return null; + } + + private long findFloorId( long roofId ) + { + int rest = (int) (roofId % batchSize); + return max( rest == 0 ? roofId - batchSize : roofId - rest, lowIncluded ); + } + + @Override + protected boolean fetchNext() + { + return nextId < nextRoofId ? next( nextId++ ) : false; + } + + @Override + public String toString() + { + return "]" + highExcluded + "-" + lowIncluded + "]"; + } + } +} diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipEncoderStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipEncoderStep.java index 16a80777aeb35..0b2dd54f7f972 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipEncoderStep.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipEncoderStep.java @@ -19,16 +19,12 @@ */ package org.neo4j.unsafe.impl.batchimport; -import org.neo4j.graphdb.Resource; -import org.neo4j.kernel.impl.store.record.Record; import org.neo4j.kernel.impl.store.record.RelationshipRecord; import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache; import org.neo4j.unsafe.impl.batchimport.input.InputRelationship; -import org.neo4j.unsafe.impl.batchimport.staging.BatchSender; -import org.neo4j.unsafe.impl.batchimport.staging.ProcessorStep; +import org.neo4j.unsafe.impl.batchimport.staging.Configuration; +import org.neo4j.unsafe.impl.batchimport.staging.ForkedProcessorStep; import org.neo4j.unsafe.impl.batchimport.staging.StageControl; -import org.neo4j.unsafe.impl.batchimport.store.BatchingTokenRepository; - import static org.neo4j.graphdb.Direction.BOTH; import static org.neo4j.graphdb.Direction.INCOMING; import static org.neo4j.graphdb.Direction.OUTGOING; @@ -39,78 +35,49 @@ * relationship ids are kept in {@link NodeRelationshipCache node cache}, which is a point of scalability issues, * although mitigated using multi-pass techniques. */ -public class RelationshipEncoderStep extends ProcessorStep> +public class RelationshipEncoderStep extends ForkedProcessorStep> { - private final BatchingTokenRepository relationshipTypeRepository; private final NodeRelationshipCache cache; - private final ParallelizationCoordinator parallelization = new ParallelizationCoordinator(); - public RelationshipEncoderStep( StageControl control, - Configuration config, - BatchingTokenRepository relationshipTypeRepository, - NodeRelationshipCache cache ) + public RelationshipEncoderStep( StageControl control, Configuration config, NodeRelationshipCache cache ) { super( control, "RELATIONSHIP", config, 0 ); - this.relationshipTypeRepository = relationshipTypeRepository; this.cache = cache; } @Override - protected Resource permit( Batch batch ) - { - return parallelization.coordinate( batch.parallelizableWithPrevious ); - } - - @Override - protected void process( Batch batch, BatchSender sender ) throws Throwable + protected void forkedProcess( int id, int processors, Batch batch ) { - InputRelationship[] input = batch.input; - batch.records = new RelationshipRecord[input.length]; - long[] ids = batch.ids; - long nextRelationshipId = batch.firstRecordId; - for ( int i = 0; i < input.length; i++ ) + for ( int i = 0; i < batch.records.length; i++ ) { - InputRelationship batchRelationship = input[i]; - long relationshipId = nextRelationshipId++; - // Ids have been verified to exist in CalculateDenseNodeStep - long startNodeId = ids[i*2]; - long endNodeId = ids[i*2+1]; - if ( startNodeId == -1 || endNodeId == -1 ) + RelationshipRecord relationship = batch.records[i]; + long startNode = relationship.getFirstNode(); + long endNode = relationship.getSecondNode(); + if ( startNode == -1 || endNode == -1 ) { // This means that we here have a relationship that refers to missing nodes. // It also means that we tolerate some amount of bad relationships and CalculateDenseNodesStep // already have reported this to the bad collector. - batch.records[i] = new RelationshipRecord( relationshipId ); - batch.records[i].setInUse( false ); continue; } - int typeId = batchRelationship.hasTypeId() ? batchRelationship.typeId() : - relationshipTypeRepository.getOrCreateId( batchRelationship.type() ); - RelationshipRecord relationshipRecord = batch.records[i] = new RelationshipRecord( relationshipId, - startNodeId, endNodeId, typeId ); - relationshipRecord.setInUse( true ); - // Set first/second next rel - boolean loop = startNodeId == endNodeId; - long firstNextRel = cache.getAndPutRelationship( - startNodeId, loop ? BOTH : OUTGOING, relationshipId, true ); - relationshipRecord.setFirstNextRel( firstNextRel ); - if ( loop ) + boolean loop = startNode == endNode; + if ( startNode % processors == id ) { - relationshipRecord.setSecondNextRel( firstNextRel ); + long firstNextRel = cache.getAndPutRelationship( + startNode, loop ? BOTH : OUTGOING, relationship.getId(), true ); + relationship.setFirstNextRel( firstNextRel ); + if ( loop ) + { + relationship.setSecondNextRel( firstNextRel ); + } } - else + + if ( !loop && endNode % processors == id ) { - relationshipRecord.setSecondNextRel( cache.getAndPutRelationship( - endNodeId, INCOMING, relationshipId, true ) ); + relationship.setSecondNextRel( cache.getAndPutRelationship( + endNode, INCOMING, relationship.getId(), true ) ); } - - // Most rels will not be first in chain - relationshipRecord.setFirstInFirstChain( false ); - relationshipRecord.setFirstInSecondChain( false ); - relationshipRecord.setFirstPrevRel( Record.NO_NEXT_RELATIONSHIP.intValue() ); - relationshipRecord.setSecondPrevRel( Record.NO_NEXT_RELATIONSHIP.intValue() ); } - sender.send( batch ); } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipGroupDefragmenter.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipGroupDefragmenter.java index 5c048d8f5dad8..f55c85799acc8 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipGroupDefragmenter.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipGroupDefragmenter.java @@ -27,6 +27,7 @@ import org.neo4j.unsafe.impl.batchimport.staging.Stage; import org.neo4j.unsafe.impl.batchimport.store.BatchingNeoStores; +import static org.neo4j.unsafe.impl.batchimport.Configuration.withBatchSize; import static org.neo4j.unsafe.impl.batchimport.cache.NumberArrayFactory.AUTO; import static org.neo4j.unsafe.impl.batchimport.staging.ExecutionSupervisors.superviseExecution; @@ -64,7 +65,9 @@ public void run( long memoryWeCanHoldForCertain, BatchingNeoStores neoStore, lon RecordStore toStore = neoStore.getRelationshipGroupStore(); // Count all nodes, how many groups each node has each - executeStage( new CountGroupsStage( config, fromStore, groupCache ) ); + Configuration groupConfig = + withBatchSize( config, neoStore.getRelationshipGroupStore().getRecordsPerPage() ); + executeStage( new CountGroupsStage( groupConfig, fromStore, groupCache ) ); long fromNodeId = 0; long toNodeId = 0; while ( fromNodeId < highNodeId ) @@ -73,9 +76,9 @@ public void run( long memoryWeCanHoldForCertain, BatchingNeoStores neoStore, lon // Groups that doesn't fit in this round will be included in consecutive rounds. toNodeId = groupCache.prepare( fromNodeId ); // Cache those groups - executeStage( new ScanAndCacheGroupsStage( config, fromStore, groupCache ) ); + executeStage( new ScanAndCacheGroupsStage( groupConfig, fromStore, groupCache ) ); // And write them in sequential order in the store - executeStage( new WriteGroupsStage( config, groupCache, toStore ) ); + executeStage( new WriteGroupsStage( groupConfig, groupCache, toStore ) ); // Make adjustments for the next iteration fromNodeId = toNodeId; @@ -84,7 +87,8 @@ public void run( long memoryWeCanHoldForCertain, BatchingNeoStores neoStore, lon // Now update nodes to point to the new groups ByteArray groupCountCache = groupCache.getGroupCountCache(); groupCountCache.clear(); - executeStage( new NodeFirstGroupStage( config, toStore, neoStore.getNodeStore(), groupCountCache ) ); + Configuration nodeConfig = withBatchSize( config, neoStore.getNodeStore().getRecordsPerPage() ); + executeStage( new NodeFirstGroupStage( nodeConfig, toStore, neoStore.getNodeStore(), groupCountCache ) ); } catch ( Throwable t ) { diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkbackStage.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkbackStage.java index c1f97384746a3..f6b3cb3286c12 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkbackStage.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkbackStage.java @@ -24,8 +24,7 @@ import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache; import org.neo4j.unsafe.impl.batchimport.staging.ReadRecordsStep; import org.neo4j.unsafe.impl.batchimport.staging.Stage; - -import static org.neo4j.unsafe.impl.batchimport.RecordIdIteration.backwards; +import static org.neo4j.unsafe.impl.batchimport.RecordIdIterator.backwards; /** * Sets {@link RelationshipRecord#setFirstPrevRel(long)} and {@link RelationshipRecord#setSecondPrevRel(long)} @@ -37,9 +36,9 @@ public RelationshipLinkbackStage( String topic, Configuration config, Relationsh NodeRelationshipCache cache, long lowRelationshipId, long highRelationshipId, boolean denseNodes ) { super( "Relationship --> Relationship" + topic, config ); - add( new ReadRecordsStep<>( control(), config, store, backwards( lowRelationshipId, highRelationshipId ) ) ); - add( new RecordProcessorStep<>( control(), "LINK", config, - new RelationshipLinkbackProcessor( cache, denseNodes ), false ) ); + add( new ReadRecordsStep<>( control(), config, store, + backwards( lowRelationshipId, highRelationshipId, config ) ) ); + add( new RelationshipLinkbackStep( control(), config, cache, denseNodes ) ); add( new UpdateRecordsStep<>( control(), config, store ) ); } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkbackStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkbackStep.java new file mode 100644 index 0000000000000..66202de1128e3 --- /dev/null +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkbackStep.java @@ -0,0 +1,139 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.unsafe.impl.batchimport; + +import org.neo4j.graphdb.Direction; +import org.neo4j.kernel.impl.store.record.RelationshipRecord; +import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache; +import org.neo4j.unsafe.impl.batchimport.staging.Configuration; +import org.neo4j.unsafe.impl.batchimport.staging.ForkedProcessorStep; +import org.neo4j.unsafe.impl.batchimport.staging.StageControl; + +/** + * Links relationship chains together, the "prev" pointers of them. "next" pointers are set when + * initially creating the relationship records. Setting prev pointers at that time would incur + * random access and so that is done here separately with help from {@link NodeRelationshipCache}. + */ +public class RelationshipLinkbackStep extends ForkedProcessorStep +{ + private final NodeRelationshipCache cache; + private final boolean denseNodes; + + public RelationshipLinkbackStep( StageControl control, Configuration config, + NodeRelationshipCache cache, boolean denseNodes ) + { + super( control, "LINK", config, 0 ); + this.cache = cache; + this.denseNodes = denseNodes; + } + + @Override + protected void forkedProcess( int id, int processors, RelationshipRecord[] batch ) + { + for ( int i = batch.length-1; i >= 0; i-- ) + { + RelationshipRecord item = batch[i]; + if ( item != null && item.inUse() ) + { + if ( !process( item, id, processors ) ) + { + // No change for this record, it's OK, all the processors will reach the same conclusion + batch[i] = null; + } + } + } + } + + public boolean process( RelationshipRecord record, int id, int processors ) + { + boolean processFirst = record.getFirstNode() % processors == id; + boolean processSecond = record.getSecondNode() % processors == id; + if ( !processFirst && !processSecond ) + { + // We won't process this relationship, but we cannot return false because that means + // that it won't even be updated. Arriving here merely means that this thread won't process + // this record at all and so we won't even have to ask cache about dense or not (which is costly) + return true; + } + + boolean firstIsDense = cache.isDense( record.getFirstNode() ); + boolean changed = false; + boolean isLoop = record.getFirstNode() == record.getSecondNode(); + if ( isLoop ) + { + if ( firstIsDense == denseNodes ) + { + if ( processFirst ) + { + long prevRel = cache.getAndPutRelationship( record.getFirstNode(), + Direction.BOTH, record.getId(), false ); + if ( prevRel == -1 ) + { // First one + record.setFirstInFirstChain( true ); + record.setFirstInSecondChain( true ); + prevRel = cache.getCount( record.getFirstNode(), Direction.BOTH ); + } + record.setFirstPrevRel( prevRel ); + record.setSecondPrevRel( prevRel ); + } + changed = true; + } + } + else + { + // Start node + if ( firstIsDense == denseNodes ) + { + if ( processFirst ) + { + long firstPrevRel = cache.getAndPutRelationship( record.getFirstNode(), + Direction.OUTGOING, record.getId(), false ); + if ( firstPrevRel == -1 ) + { // First one + record.setFirstInFirstChain( true ); + firstPrevRel = cache.getCount( record.getFirstNode(), Direction.OUTGOING ); + } + record.setFirstPrevRel( firstPrevRel ); + } + changed = true; + } + + // End node + boolean secondIsDense = cache.isDense( record.getSecondNode() ); + if ( secondIsDense == denseNodes ) + { + if ( processSecond ) + { + long secondPrevRel = cache.getAndPutRelationship( record.getSecondNode(), + Direction.INCOMING, record.getId(), false ); + if ( secondPrevRel == -1 ) + { // First one + record.setFirstInSecondChain( true ); + secondPrevRel = cache.getCount( record.getSecondNode(), Direction.INCOMING ); + } + record.setSecondPrevRel( secondPrevRel ); + } + changed = true; + } + } + + return changed; + } +} diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipPreparationStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipPreparationStep.java index ced74f74a0ee2..ad127049712f2 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipPreparationStep.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipPreparationStep.java @@ -19,7 +19,6 @@ */ package org.neo4j.unsafe.impl.batchimport; -import java.util.Arrays; import org.neo4j.kernel.impl.store.record.RelationshipRecord; import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdMapper; import org.neo4j.unsafe.impl.batchimport.input.InputRelationship; @@ -53,8 +52,6 @@ protected void process( Batch batch, Batch ids[i*2] = idMapper.get( batchRelationship.startNode(), batchRelationship.startNodeGroup() ); ids[i*2+1] = idMapper.get( batchRelationship.endNode(), batchRelationship.endNodeGroup() ); } - batch.sortedIds = ids.clone(); - Arrays.sort( batch.sortedIds ); sender.send( batch ); } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipRecordPreparationStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipRecordPreparationStep.java new file mode 100644 index 0000000000000..a0b2081a67dcf --- /dev/null +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipRecordPreparationStep.java @@ -0,0 +1,80 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.unsafe.impl.batchimport; + +import org.neo4j.kernel.impl.store.record.Record; +import org.neo4j.kernel.impl.store.record.RelationshipRecord; +import org.neo4j.unsafe.impl.batchimport.input.InputRelationship; +import org.neo4j.unsafe.impl.batchimport.staging.BatchSender; +import org.neo4j.unsafe.impl.batchimport.staging.Configuration; +import org.neo4j.unsafe.impl.batchimport.staging.ProcessorStep; +import org.neo4j.unsafe.impl.batchimport.staging.StageControl; +import org.neo4j.unsafe.impl.batchimport.store.BatchingTokenRepository.BatchingRelationshipTypeTokenRepository; + +/** + * Creates and initializes {@link RelationshipRecord} batches to later be filled with actual data + * and pointers. This is a separate step to remove work from main step. + */ +public class RelationshipRecordPreparationStep extends ProcessorStep> +{ + private final BatchingRelationshipTypeTokenRepository relationshipTypeRepository; + + public RelationshipRecordPreparationStep( StageControl control, Configuration config, + BatchingRelationshipTypeTokenRepository relationshipTypeRepository ) + { + super( control, "RECORDS", config, 0 ); + this.relationshipTypeRepository = relationshipTypeRepository; + } + + @Override + protected void process( Batch batch, BatchSender sender ) throws Throwable + { + batch.records = new RelationshipRecord[batch.input.length]; + long id = batch.firstRecordId; + for ( int i = 0, idIndex = 0; i < batch.records.length; i++, id++ ) + { + RelationshipRecord relationship = batch.records[i] = new RelationshipRecord( id ); + InputRelationship batchRelationship = batch.input[i]; + long startNodeId = batch.ids[idIndex++]; + long endNodeId = batch.ids[idIndex++]; + if ( startNodeId == -1 || endNodeId == -1 ) + { + relationship.setInUse( false ); + } + else + { + relationship.setInUse( true ); + + // Most rels will not be first in chain + relationship.setFirstInFirstChain( false ); + relationship.setFirstInSecondChain( false ); + relationship.setFirstPrevRel( Record.NO_NEXT_RELATIONSHIP.intValue() ); + relationship.setSecondPrevRel( Record.NO_NEXT_RELATIONSHIP.intValue() ); + relationship.setFirstNode( startNodeId ); + relationship.setSecondNode( endNodeId ); + + int typeId = batchRelationship.hasTypeId() ? batchRelationship.typeId() : + relationshipTypeRepository.getOrCreateId( batchRelationship.type() ); + relationship.setType( typeId ); + } + } + sender.send( batch ); + } +} diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipStage.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipStage.java index e7ec028d9954b..94ca03a07860a 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipStage.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipStage.java @@ -28,7 +28,6 @@ import org.neo4j.unsafe.impl.batchimport.store.BatchingNeoStores; import org.neo4j.unsafe.impl.batchimport.store.io.IoMonitor; -import static org.neo4j.unsafe.impl.batchimport.staging.Step.ORDER_PROCESS; import static org.neo4j.unsafe.impl.batchimport.staging.Step.ORDER_SEND_DOWNSTREAM; /** @@ -37,29 +36,29 @@ */ public class RelationshipStage extends Stage { - private ParallelizeByNodeIdStep parallelizer; + private AssignRelationshipIdBatchStep idAssigner; public RelationshipStage( String topic, Configuration config, IoMonitor writeMonitor, InputIterator relationships, IdMapper idMapper, BatchingNeoStores neoStore, NodeRelationshipCache cache, EntityStoreUpdaterStep.Monitor storeUpdateMonitor, long firstRelationshipId ) { - super( "Relationships" + topic, config, ORDER_SEND_DOWNSTREAM | ORDER_PROCESS ); + super( "Relationships" + topic, config, ORDER_SEND_DOWNSTREAM ); add( new InputIteratorBatcherStep<>( control(), config, relationships, InputRelationship.class ) ); RelationshipStore relationshipStore = neoStore.getRelationshipStore(); PropertyStore propertyStore = neoStore.getPropertyStore(); + add( idAssigner = new AssignRelationshipIdBatchStep( control(), config, firstRelationshipId ) ); add( new RelationshipPreparationStep( control(), config, idMapper ) ); + add( new RelationshipRecordPreparationStep( control(), config, neoStore.getRelationshipTypeRepository() ) ); add( new PropertyEncoderStep<>( control(), config, neoStore.getPropertyKeyRepository(), propertyStore ) ); - add( parallelizer = new ParallelizeByNodeIdStep( control(), config, firstRelationshipId ) ); - add( new RelationshipEncoderStep( control(), config, - neoStore.getRelationshipTypeRepository(), cache ) ); - add( new EntityStoreUpdaterStep<>( control(), config, - relationshipStore, propertyStore, writeMonitor, storeUpdateMonitor ) ); + add( new RelationshipEncoderStep( control(), config, cache ) ); + add( new EntityStoreUpdaterStep<>( control(), config, relationshipStore, propertyStore, writeMonitor, + storeUpdateMonitor ) ); } public long getNextRelationshipId() { - return parallelizer.getNextRelationshipId(); + return idAssigner.getNextRelationshipId(); } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ScanAndCacheGroupsStage.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ScanAndCacheGroupsStage.java index 782c9f790029b..c11216a0865b4 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ScanAndCacheGroupsStage.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ScanAndCacheGroupsStage.java @@ -25,7 +25,7 @@ import org.neo4j.unsafe.impl.batchimport.staging.ReadRecordsStep; import org.neo4j.unsafe.impl.batchimport.staging.Stage; -import static org.neo4j.unsafe.impl.batchimport.RecordIdIteration.allInReversed; +import static org.neo4j.unsafe.impl.batchimport.RecordIdIterator.allInReversed; public class ScanAndCacheGroupsStage extends Stage { @@ -33,7 +33,7 @@ public ScanAndCacheGroupsStage( Configuration config, RecordStore( control(), config, store, allInReversed( store ) ) ); + add( new ReadRecordsStep<>( control(), config, store, allInReversed( store, config ) ) ); add( new CacheGroupsStep( control(), config, cache ) ); } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/UpdateRecordsStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/UpdateRecordsStep.java index 9a16a7e8c4f03..80ce30a7b5ec3 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/UpdateRecordsStep.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/UpdateRecordsStep.java @@ -53,14 +53,16 @@ public UpdateRecordsStep( StageControl control, Configuration config, RecordStor @Override protected void process( RECORD[] batch, BatchSender sender ) throws Throwable { + int recordsUpdatedInThisBatch = 0; for ( RECORD record : batch ) { if ( record != null && record.inUse() && !IdValidator.isReservedId( record.getId() ) ) { update( record ); + recordsUpdatedInThisBatch++; } } - recordsUpdated += batch.length; + recordsUpdated += recordsUpdatedInThisBatch; } protected void update( RECORD record ) throws Throwable diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/executor/DynamicTaskExecutor.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/executor/DynamicTaskExecutor.java index 51611968452ae..8f8ecb7f760da 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/executor/DynamicTaskExecutor.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/executor/DynamicTaskExecutor.java @@ -125,12 +125,18 @@ public int processors( int delta ) public void submit( Task task ) { assertHealthy(); - while ( !queue.offer( task ) ) - { // Then just stay here and try - parkAWhile(); - assertHealthy(); + try + { + while ( !queue.offer( task, 10, MILLISECONDS ) ) + { // Then just stay here and try + assertHealthy(); + } + notifyProcessors(); + } + catch ( InterruptedException e ) + { + Thread.currentThread().interrupt(); } - notifyProcessors(); } @Override diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/AbstractStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/AbstractStep.java index 0f69b26966eb2..bf3956a90fc89 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/AbstractStep.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/AbstractStep.java @@ -159,6 +159,7 @@ protected void assertHealthy() @Override public void setDownstream( Step downstream ) { + assert downstream != this; this.downstream = downstream; } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/DynamicProcessorAssigner.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/DynamicProcessorAssigner.java index e928d2e62196b..d6c27e9ab2f75 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/DynamicProcessorAssigner.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/DynamicProcessorAssigner.java @@ -34,7 +34,7 @@ * Monitors {@link StageExecution executions} and makes changes as the execution goes: *
    *
  • Figures out roughly how many CPUs (henceforth called processors) are busy processing batches. - * The most busy step will have its {@link Step#numberOfProcessors() processors} counted as 1 processor each, all other + * The most busy step will have its {@link Step#processors(int) processors} counted as 1 processor each, all other * will take into consideration how idle the CPUs executing each step is, counted as less than one.
  • *
  • Constantly figures out bottleneck steps and assigns more processors those.
  • *
  • Constantly figures out if there are steps that are way faster than the second fastest step and diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ForkedProcessorStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ForkedProcessorStep.java new file mode 100644 index 0000000000000..09a1a79b6c1a3 --- /dev/null +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ForkedProcessorStep.java @@ -0,0 +1,215 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.unsafe.impl.batchimport.staging; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.LockSupport; + +import static java.util.concurrent.locks.LockSupport.park; + +/** + * Executes batches sequentially, one at a time, although running multiple processing threads on each batch. + * This is almost the opposite of {@link ProcessorStep} which has ability of running multiple batches in parallel, + * each batch processed by one thread. + * + * The purpose of this type of step is to much better be able to parallelize steps that are seen to be + * bottlenecks, but are generally very hard to figure out how to parallelize. + * + * Extending {@link ProcessorStep} and providing max processors 1, i.e. always single-threaded, i.e. only + * max one batch being processed at any given point in time. This thread instead starts its own army of + * internal "forked" processors which will sit and wait for notifications to start processing the next batch. + */ +public abstract class ForkedProcessorStep extends ProcessorStep +{ + // used by forked processors to count down when they're done, so that main processing thread + // knows when they're all done + private final AtomicInteger doneSignal = new AtomicInteger(); + private final int maxForkedProcessors; + protected final List forkedProcessors = new ArrayList<>(); + // main processing thread communicates batch to process using this variable + // it's not volatile, but piggy-backs on globalTicket for that + private T currentBatch; + // this ticket helps coordinating with the forked processors + private long globalTicket; + // processorCount can be changed asynchronically by calls to processors(int), although its + // changes will only be applied between processing batches as to not interfere + private volatile int processorCount = 1; + // forked processors can communicate errors via this variable + private Throwable error; + // forked processors can ping main process thread via this variable + private Thread ticketThread; + + protected ForkedProcessorStep( StageControl control, String name, Configuration config, int maxProcessors ) + { + super( control, name, config, 1 ); + this.maxForkedProcessors = maxProcessors == 0 ? config.maxNumberOfProcessors() : maxProcessors; + applyProcessorCount(); + } + + @Override + protected void process( T batch, BatchSender sender ) throws Throwable + { + applyProcessorCount(); + int processorCount = forkedProcessors.size(); + if ( processorCount == 1 ) + { + // No need to complicate things, just do the "forked" processing right here + forkedProcess( 0, 1, batch ); + } + else + { + // Multiple processors, hand over the state to the processors and let them loose + currentBatch = batch; + ticketThread = Thread.currentThread(); // so that forked processors can unpark + globalTicket++; + // ^^^ --- everything above this line will piggy-back on the volatility from the line below + doneSignal.set( processorCount ); + notifyProcessors(); + while ( doneSignal.get() > 0 ) + { + LockSupport.park(); + } + if ( error != null ) + { + throw error; + } + } + + if ( downstream != null ) + { + sender.send( batch ); + } + } + + private void notifyProcessors() + { + for ( int i = 0; i < forkedProcessors.size(); i++ ) + { + LockSupport.unpark( forkedProcessors.get( i ) ); + } + } + + @Override + public void close() throws Exception + { + super.close(); + for ( ForkedProcessor forkedProcessor : forkedProcessors ) + { + forkedProcessor.halt(); + } + } + + /** + * This method is called by one of the threads processing this batch, there are multiple threads processing + * this batch in parallel, each with its own {@code id}. + * + * @param id zero-based id of this thread + * @param processors number of processors concurrently processing this batch + * @param batch batch to process + */ + protected abstract void forkedProcess( int id, int processors, T batch ); + + private void applyProcessorCount() + { + int processorCount = this.processorCount; + while ( processorCount != forkedProcessors.size() ) + { + if ( forkedProcessors.size() < processorCount ) + { + forkedProcessors.add( new ForkedProcessor( forkedProcessors.size() ) ); + } + else + { + forkedProcessors.remove( forkedProcessors.size() - 1 ).halt(); + } + } + } + + @Override + public int processors( int delta ) + { + // Don't delegate to ProcessorStep, because we're not parallelizing on batches, we're parallelizing + // inside each batch and batches must be processed in order + int processors = this.processorCount; + processors += delta; + if ( processors < 1 ) + { + processors = 1; + } + if ( processors > maxForkedProcessors ) + { + processors = maxForkedProcessors; + } + return this.processorCount = processors; + } + + class ForkedProcessor extends Thread + { + private final int id; + private volatile boolean halted; + private long localTicket; + + ForkedProcessor( int id ) + { + this.id = id; + this.localTicket = globalTicket; + start(); + } + + @Override + public void run() + { + while ( !halted ) + { + try + { + park(); + if ( !halted && localTicket + 1 == globalTicket ) + { + // ^^^ we just accessed volatile variable 'halted' and so the rest of the non-volatile + // variables will not be up to date for us + forkedProcess( id, forkedProcessors.size(), currentBatch ); + localTicket++; + } + } + catch ( Throwable t ) + { + error = t; + } + finally + { + // ^^^ finish off with counting down doneSignal which serves two purposes: + // - notifying the main submitter thread that we're done + // - going through a volatile memory access to let our changes propagate + doneSignal.decrementAndGet(); + LockSupport.unpark( ticketThread ); + } + } + } + + void halt() + { + halted = true; + LockSupport.unpark( this ); + } + } +} diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ProcessorStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ProcessorStep.java index 3c627323a6e8c..ee67e16e6bfb5 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ProcessorStep.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ProcessorStep.java @@ -22,7 +22,6 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.function.LongPredicate; -import org.neo4j.graphdb.Resource; import org.neo4j.unsafe.impl.batchimport.executor.DynamicTaskExecutor; import org.neo4j.unsafe.impl.batchimport.executor.ParkStrategy; import org.neo4j.unsafe.impl.batchimport.executor.TaskExecutor; @@ -55,7 +54,6 @@ public abstract class ProcessorStep extends AbstractStep private final Configuration config; private final LongPredicate catchUp = queueSizeThreshold -> queuedBatches.get() <= queueSizeThreshold; protected final AtomicLong begunBatches = new AtomicLong(); - private final LongPredicate rightBeginTicket = ticket -> begunBatches.get() == ticket; // Time stamp for when we processed the last queued batch received from upstream. // Useful for tracking how much time we spend waiting for batches from upstream. @@ -94,25 +92,16 @@ public long receive( final long ticket, final T batch ) sender.initialize( ticket ); try { - // If we're ordering tickets we will force calls to #permit to be ordered by ticket - // since grabbing a permit may include locking. - if ( guarantees( ORDER_PROCESS ) ) + begunBatches.incrementAndGet(); + long startTime1 = nanoTime(); + process( batch, sender ); + if ( downstream == null ) { - await( rightBeginTicket, ticket, healthChecker, park ); - } - try ( Resource precondition = permit( batch ) ) - { - begunBatches.incrementAndGet(); - long startTime1 = nanoTime(); - process( batch, sender ); - if ( downstream == null ) - { - // No batches were emmitted so we couldn't track done batches in that way. - // We can see that we're the last step so increment here instead - doneBatches.incrementAndGet(); - } - totalProcessingTime.add( nanoTime() - startTime1 - sender.sendTime ); + // No batches were emmitted so we couldn't track done batches in that way. + // We can see that we're the last step so increment here instead + doneBatches.incrementAndGet(); } + totalProcessingTime.add( nanoTime() - startTime1 - sender.sendTime ); decrementQueue(); checkNotifyEndDownstream(); @@ -125,16 +114,6 @@ public long receive( final long ticket, final T batch ) return idleTime; } - /** - * Called before {@link #process(Object, BatchSender) processing} and time measurement starts. - * Coordination with other processors should happen in here. - * If total ordering is enabled then calls will arrive in order of ticket. - */ - protected Resource permit( T batch ) throws Throwable - { - return Resource.EMPTY; - } - private void decrementQueue() { // Even though queuedBatches is built into AbstractStep, in that number of received batches diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ReadRecordsStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ReadRecordsStep.java index 7d817f849ded8..22832fc44af93 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ReadRecordsStep.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/ReadRecordsStep.java @@ -26,6 +26,8 @@ import org.neo4j.kernel.impl.store.RecordStore; import org.neo4j.kernel.impl.store.id.validation.IdValidator; import org.neo4j.kernel.impl.store.record.AbstractBaseRecord; +import org.neo4j.unsafe.impl.batchimport.RecordIdIterator; + import static org.neo4j.kernel.impl.store.record.RecordLoad.CHECK; /** @@ -41,7 +43,7 @@ public class ReadRecordsStep extends IoProduc protected final RECORD record; protected final RecordCursor cursor; protected final long highId; - private final PrimitiveLongIterator ids; + private final RecordIdIterator ids; private final Class klass; private final int recordSize; // volatile since written by processing threads and read by execution monitor @@ -49,7 +51,7 @@ public class ReadRecordsStep extends IoProduc @SuppressWarnings( "unchecked" ) public ReadRecordsStep( StageControl control, Configuration config, RecordStore store, - PrimitiveLongIterator ids ) + RecordIdIterator ids ) { super( control, config ); this.store = store; @@ -71,26 +73,26 @@ public void start( int orderingGuarantees ) @Override protected Object nextBatchOrNull( long ticket, int batchSize ) { - if ( !ids.hasNext() ) + PrimitiveLongIterator ids; + while ( (ids = this.ids.nextBatch()) != null ) { - return null; - } + RECORD[] batch = (RECORD[]) Array.newInstance( klass, batchSize ); + int i = 0; + while ( ids.hasNext() ) + { + if ( cursor.next( ids.next() ) && !IdValidator.isReservedId( record.getId() ) ) + { + batch[i++] = (RECORD) record.clone(); + } + } - RECORD[] batch = (RECORD[]) Array.newInstance( klass, batchSize ); - int i = 0; - while ( i < batchSize && ids.hasNext() ) - { - if ( cursor.next( ids.next() ) && !IdValidator.isReservedId( record.getId() ) ) + if ( i > 0 ) { - RECORD newRecord = (RECORD) record.clone(); - batch[i] = newRecord; - i++; + count += i; + return i == batchSize ? batch : Arrays.copyOf( batch, i ); } } - - count += i; - batch = i == batchSize ? batch : Arrays.copyOf( batch, i ); - return batch.length > 0 ? batch : null; + return null; } @Override diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/Step.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/Step.java index c85d5a44fdfa2..4199f03ce7704 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/Step.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/Step.java @@ -43,11 +43,6 @@ public interface Step extends Parallelizable, AutoCloseable */ int ORDER_SEND_DOWNSTREAM = 0x1; - /** - * Whether or not actual processing of batches are ordered by ticket number. - */ - int ORDER_PROCESS = 0x2; - /** * Starts the processing in this step, such that calls to {@link #receive(long, Object)} can be accepted. * diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/AssignRelationshipIdBatchStepTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/AssignRelationshipIdBatchStepTest.java new file mode 100644 index 0000000000000..312baf3749779 --- /dev/null +++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/AssignRelationshipIdBatchStepTest.java @@ -0,0 +1,99 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.unsafe.impl.batchimport; + +import org.junit.Test; + +import org.neo4j.kernel.impl.store.record.RelationshipRecord; +import org.neo4j.unsafe.impl.batchimport.input.InputRelationship; +import org.neo4j.unsafe.impl.batchimport.staging.StageControl; +import org.neo4j.unsafe.impl.batchimport.staging.Step; +import org.neo4j.unsafe.impl.batchimport.stats.Keys; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; + +import static org.neo4j.kernel.impl.store.id.IdGeneratorImpl.INTEGER_MINUS_ONE; +import static org.neo4j.unsafe.impl.batchimport.Configuration.DEFAULT; +import static org.neo4j.unsafe.impl.batchimport.Configuration.withBatchSize; +import static org.neo4j.unsafe.impl.batchimport.staging.Step.ORDER_SEND_DOWNSTREAM; + +public class AssignRelationshipIdBatchStepTest +{ + private final StageControl control = mock( StageControl.class ); + private final Configuration config = withBatchSize( DEFAULT, 10 ); + + @Test + public void shouldAssignNewIdsToBatches() throws Exception + { + // GIVEN + try ( + Step> step = + new AssignRelationshipIdBatchStep( control, config, 100 ); + CapturingStep> results = + new CapturingStep<>( control, "end", config ) ) + { + step.setDownstream( results ); + step.start( ORDER_SEND_DOWNSTREAM ); + results.start( ORDER_SEND_DOWNSTREAM ); + + // WHEN + Batch first = new Batch<>( new InputRelationship[config.batchSize()] ); + step.receive( 0, first ); + Batch second = new Batch<>( new InputRelationship[config.batchSize()] ); + step.receive( 1, second ); + while ( results.stats().stat( Keys.done_batches ).asLong() < 2 ); + + // THEN + assertEquals( 100, first.firstRecordId ); + assertEquals( 100 + config.batchSize(), second.firstRecordId ); + } + } + + @Test + public void shouldAvoidReservedId() throws Exception + { + // GIVEN + try ( + Step> step = + new AssignRelationshipIdBatchStep( control, config, INTEGER_MINUS_ONE - 15 ); + CapturingStep> results = + new CapturingStep<>( control, "end", config ) ) + { + step.setDownstream( results ); + step.start( ORDER_SEND_DOWNSTREAM ); + results.start( ORDER_SEND_DOWNSTREAM ); + + // WHEN + Batch first = new Batch<>( new InputRelationship[config.batchSize()] ); + step.receive( 0, first ); + Batch second = new Batch<>( new InputRelationship[config.batchSize()] ); + step.receive( 1, second ); + Batch third = new Batch<>( new InputRelationship[config.batchSize()] ); + step.receive( 2, third ); + while ( results.stats().stat( Keys.done_batches ).asLong() < 3 ); + + // THEN + assertEquals( INTEGER_MINUS_ONE - 15, first.firstRecordId ); + assertEquals( INTEGER_MINUS_ONE + 1, second.firstRecordId ); + assertEquals( second.firstRecordId + config.batchSize(), third.firstRecordId ); + } + } +} diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/CalculateDenseNodesStepTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/CalculateDenseNodesStepTest.java index 21e908244bee3..b0a019a8177a7 100644 --- a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/CalculateDenseNodesStepTest.java +++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/CalculateDenseNodesStepTest.java @@ -21,72 +21,122 @@ import org.junit.Test; +import org.neo4j.kernel.impl.store.record.RelationshipRecord; import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache; -import org.neo4j.unsafe.impl.batchimport.cache.NumberArrayFactory; +import org.neo4j.unsafe.impl.batchimport.input.Collector; +import org.neo4j.unsafe.impl.batchimport.input.InputRelationship; import org.neo4j.unsafe.impl.batchimport.staging.StageControl; -import org.neo4j.unsafe.impl.batchimport.staging.Step; -import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import static org.neo4j.unsafe.impl.batchimport.Configuration.DEFAULT; +import static org.neo4j.unsafe.impl.batchimport.input.InputEntity.NO_PROPERTIES; public class CalculateDenseNodesStepTest { - /** - * Batches are provided to {@link CalculateDenseNodesStep} in batches where each id all is of the same radix. - * This test asserts that, regardless of how many processors are assigned to the step there cannot be - * two processors processing multiple batches with ids of the same radix concurrently. - */ @Test - public void shouldPreventMultipleConcurrentProcessorsForAnyGivenRadixBatchSparse() throws Exception + public void shouldNotProcessLoopsTwice() throws Exception { // GIVEN - StageControl control = new StageControl() + NodeRelationshipCache cache = mock( NodeRelationshipCache.class ); + try ( CalculateDenseNodesStep step = new CalculateDenseNodesStep( mock( StageControl.class ), + DEFAULT, cache, mock( Collector.class ) ) ) { - @Override - public void panic( Throwable cause ) - { - cause.printStackTrace(); - } - }; - Configuration config = Configuration.DEFAULT; - NodeRelationshipCache cache = new NodeRelationshipCache( NumberArrayFactory.HEAP, -1 ); - Step step = new CalculateDenseNodesStep( control, config, cache ); - step.start( 0 ); - step.processors( 100 ); + step.processors( 4 ); + step.start( 0 ); - // WHEN sending many batches, all which "happens" to have ids of the same radix, in fact - // this test "happens" to send the same batch of ids over and over, which actually may happen in read life, - // although it's an extreme case. - long[] ids = batchOfIdsWithRadix( 3 ); - int numberOfBatches = 100; - for ( int i = 0; i < numberOfBatches; i++ ) - { - step.receive( i, ids ); + // WHEN + Batch batch = batch( + relationship( 1, 5 ), + relationship( 3, 10 ), + relationship( 2, 2 ), // <-- the loop + relationship( 4, 1 ) ); + step.receive( 0, batch ); + step.endOfUpstream(); + while ( !step.isCompleted() ); + + // THEN + verify( cache, times( 2 ) ).incrementCount( eq( 1L ) ); + verify( cache, times( 1 ) ).incrementCount( eq( 2L ) ); + verify( cache, times( 1 ) ).incrementCount( eq( 3L ) ); + verify( cache, times( 1 ) ).incrementCount( eq( 4L ) ); + verify( cache, times( 1 ) ).incrementCount( eq( 5L ) ); + verify( cache, times( 1 ) ).incrementCount( eq( 10L ) ); } - step.endOfUpstream(); - waitUntilCompleted( step ); + } - // THEN - for ( long id : ids ) + @Test + public void shouldCollectBadRelationships() throws Exception + { + // GIVEN + NodeRelationshipCache cache = mock( NodeRelationshipCache.class ); + Collector collector = mock( Collector.class ); + try ( CalculateDenseNodesStep step = new CalculateDenseNodesStep( mock( StageControl.class ), + DEFAULT, cache, collector ) ) { - assertEquals( numberOfBatches, cache.getCount( id, null /*shouldn't be used here anyway*/ ) ); + step.processors( 4 ); + step.start( 0 ); + + // WHEN + Batch batch = batch( + relationship( 1, 5 ), + relationship( 3, 10 ), + relationship( "a", 2, -1, 2 ), // <-- bad relationship with missing start node + relationship( 2, "b", 2, -1 ), // <-- bad relationship with missing end node + relationship( "c", "d", -1, -1 ) );// <-- bad relationship with missing start and end node + step.receive( 0, batch ); + step.endOfUpstream(); + while ( !step.isCompleted() ); + + // THEN + verify( collector, times( 1 ) ).collectBadRelationship( any( InputRelationship.class ), eq( "a" ) ); + verify( collector, times( 1 ) ).collectBadRelationship( any( InputRelationship.class ), eq( "b" ) ); + verify( collector, times( 1 ) ).collectBadRelationship( any( InputRelationship.class ), eq( "c" ) ); + verify( collector, times( 1 ) ).collectBadRelationship( any( InputRelationship.class ), eq( "d" ) ); } } - private void waitUntilCompleted( Step step ) throws InterruptedException + private Batch batch( Data... relationships ) { - while ( !step.isCompleted() ) + Batch batch = new Batch<>( new InputRelationship[relationships.length] ); + batch.ids = new long[relationships.length * 2]; + for ( int i = 0; i < relationships.length; i++ ) { - Thread.sleep( 1 ); + batch.input[i] = new InputRelationship( "test", i, i, NO_PROPERTIES, null, relationships[i].startNode, + relationships[i].endNode, "TYPE", null ); + batch.ids[i*2] = relationships[i].startNodeId; + batch.ids[i*2 + 1] = relationships[i].endNodeId; } + return batch; + } + + private static Data relationship( Object startNode, Object endNode, long startNodeId, long endNodeId ) + { + return new Data( startNode, endNode, startNodeId, endNodeId ); } - private long[] batchOfIdsWithRadix( int radixOutOfTen ) + private static Data relationship( long startNodeId, long endNodeId ) { - long[] ids = new long[1_000]; - for ( int i = 0; i < ids.length; i++ ) + return new Data( startNodeId, endNodeId, startNodeId, endNodeId ); + } + + private static class Data + { + private final long startNodeId; + private final long endNodeId; + private final Object startNode; + private final Object endNode; + + Data( Object startNode, Object endNode, long startNodeId, long endNodeId ) { - ids[i] = i*10 + radixOutOfTen; + this.startNode = startNode; + this.endNode = endNode; + this.startNodeId = startNodeId; + this.endNodeId = endNodeId; } - return ids; } } diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/CapturingStep.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/CapturingStep.java new file mode 100644 index 0000000000000..6543dcd7f4a84 --- /dev/null +++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/CapturingStep.java @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.unsafe.impl.batchimport; + +import java.util.ArrayList; +import java.util.List; + +import org.neo4j.unsafe.impl.batchimport.staging.BatchSender; +import org.neo4j.unsafe.impl.batchimport.staging.Configuration; +import org.neo4j.unsafe.impl.batchimport.staging.ProcessorStep; +import org.neo4j.unsafe.impl.batchimport.staging.StageControl; +import org.neo4j.unsafe.impl.batchimport.stats.StatsProvider; + +public class CapturingStep extends ProcessorStep +{ + private final List receivedBatches = new ArrayList<>(); + + public CapturingStep( StageControl control, String name, Configuration config, + StatsProvider... additionalStatsProvider ) + { + super( control, name, config, 1, additionalStatsProvider ); + } + + public Iterable receivedBatches() + { + return receivedBatches; + } + + @Override + protected void process( T batch, BatchSender sender ) throws Throwable + { + receivedBatches.add( batch ); + } +} diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/ParallelizationCoordinatorTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/ParallelizationCoordinatorTest.java deleted file mode 100644 index e8705f0ae7c3f..0000000000000 --- a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/ParallelizationCoordinatorTest.java +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Copyright (c) 2002-2016 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ -package org.neo4j.unsafe.impl.batchimport; - -import org.junit.Rule; -import org.junit.Test; - -import java.util.concurrent.Future; -import java.util.concurrent.TimeoutException; - -import org.neo4j.graphdb.Resource; -import org.neo4j.test.OtherThreadExecutor.WaitDetails; -import org.neo4j.test.OtherThreadExecutor.WorkerCommand; -import org.neo4j.test.OtherThreadRule; - -public class ParallelizationCoordinatorTest -{ - @Test - public void shouldSerializeNonParallelizableBatches() throws Exception - { - // GIVEN - ParallelizationCoordinator coordinator = new ParallelizationCoordinator(); - - // WHEN - Future r2Future; - try ( Resource r1 = coordinator.coordinate( false ) ) - { - r2Future = t2.execute( coordinate( coordinator, false ) ); - waitUntilAwaitingLock( t2 ); - } - - // THEN - t2.execute( close( r2Future.get() ) ).get(); - } - - @Test - public void shouldParallelizeBatches() throws Exception - { - // GIVEN - ParallelizationCoordinator coordinator = new ParallelizationCoordinator(); - - // WHEN - Resource r1 = coordinator.coordinate( true ); - Resource r2 = coordinator.coordinate( true ); - - // THEN - r1.close(); - r2.close(); - } - - @Test - public void shouldHaveNonParallelizableBatchAwaitPreviousParallelizable() throws Exception - { - // GIVEN - ParallelizationCoordinator coordinator = new ParallelizationCoordinator(); - Resource r1 = coordinator.coordinate( true ); - Resource r2 = coordinator.coordinate( true ); - Future r3Future = t2.execute( coordinate( coordinator, false ) ); - waitUntilAwaitingLock( t2 ); - - // WHEN the previous parallelizable batches are done - r2.close(); - r1.close(); - - // THEN we should be able to continue - t2.execute( close( r3Future.get() ) ).get(); - } - - private void waitUntilAwaitingLock( OtherThreadRule thread ) throws TimeoutException - { - while ( true ) - { - WaitDetails details = thread.get().waitUntilWaiting(); - if ( details.isAt( ParallelizationCoordinator.class, "coordinate" ) ) - { - break; - } - } - } - - private WorkerCommand coordinate( final ParallelizationCoordinator coordinator, - final boolean parallelizable ) - { - return new WorkerCommand() - { - @Override - public Resource doWork( Void state ) throws Exception - { - return coordinator.coordinate( parallelizable ); - } - }; - } - - private WorkerCommand close( final Resource resource ) - { - return new WorkerCommand() - { - @Override - public Void doWork( Void state ) throws Exception - { - resource.close(); - return null; - } - }; - } - - public final @Rule OtherThreadRule t2 = new OtherThreadRule<>(); -} diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/RecordIdIteratorTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/RecordIdIteratorTest.java new file mode 100644 index 0000000000000..341b506d33f9f --- /dev/null +++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/RecordIdIteratorTest.java @@ -0,0 +1,129 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.unsafe.impl.batchimport; + +import org.junit.Test; + +import org.neo4j.collection.primitive.PrimitiveLongIterator; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.neo4j.unsafe.impl.batchimport.Configuration.DEFAULT; +import static org.neo4j.unsafe.impl.batchimport.Configuration.withBatchSize; + +public class RecordIdIteratorTest +{ + @Test + public void shouldGoPageWiseBackwards() throws Exception + { + // GIVEN + RecordIdIterator ids = RecordIdIterator.backwards( 0, 33, withBatchSize( DEFAULT, 10 ) ); + + // THEN + assertIds( ids, + array( 30, 31, 32 ), + array( 20, 21, 22, 23, 24, 25, 26, 27, 28, 29 ), + array( 10, 11, 12, 13, 14, 15, 16, 17, 18, 19 ), + array( 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 ) ); + } + + @Test + public void shouldGoPageWiseBackwardsOnCleanBreak() throws Exception + { + // GIVEN + RecordIdIterator ids = RecordIdIterator.backwards( 0, 20, withBatchSize( DEFAULT, 10 ) ); + + // THEN + assertIds( ids, + array( 10, 11, 12, 13, 14, 15, 16, 17, 18, 19 ), + array( 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 ) ); + } + + @Test + public void shouldGoPageWiseBackwardsOnSingleBatch() throws Exception + { + // GIVEN + RecordIdIterator ids = RecordIdIterator.backwards( 0, 8, withBatchSize( DEFAULT, 10 ) ); + + // THEN + assertIds( ids, array( 0, 1, 2, 3, 4, 5, 6, 7 ) ); + } + + @Test + public void shouldGoBackwardsToNonZero() throws Exception + { + // GIVEN + RecordIdIterator ids = RecordIdIterator.backwards( 12, 34, withBatchSize( DEFAULT, 10 ) ); + + // THEN + assertIds( ids, + array( 30, 31, 32, 33 ), + array( 20, 21, 22, 23, 24, 25, 26, 27, 28, 29 ), + array( 12, 13, 14, 15, 16, 17, 18, 19 ) ); + } + + @Test + public void shouldGoForwardsWhenStartingFromNonZero() throws Exception + { + // GIVEN + RecordIdIterator ids = RecordIdIterator.forwards( 1, 12, withBatchSize( DEFAULT, 10 ) ); + + // THEN + assertIds( ids, + array( 1, 2, 3, 4, 5, 6, 7, 8, 9 ), + array( 10, 11 ) ); + } + + @Test + public void shouldGoForwardsWhenStartingFromNonZero2() throws Exception + { + // GIVEN + RecordIdIterator ids = RecordIdIterator.forwards( 34, 66, withBatchSize( DEFAULT, 10 ) ); + + // THEN + assertIds( ids, + array( 34, 35, 36, 37, 38, 39 ), + array( 40, 41, 42, 43, 44, 45, 46, 47, 48, 49 ), + array( 50, 51, 52, 53, 54, 55, 56, 57, 58, 59 ), + array( 60, 61, 62, 63, 64, 65 ) ); + } + + private void assertIds( RecordIdIterator ids, long[]... expectedIds ) + { + for ( long[] expectedArray : expectedIds ) + { + PrimitiveLongIterator iterator = ids.nextBatch(); + assertNotNull( iterator ); + for ( long expectedId : expectedArray ) + { + assertEquals( expectedId, iterator.next() ); + } + assertFalse( iterator.hasNext() ); + } + assertNull( ids.nextBatch() ); + } + + private static long[] array( long... ids ) + { + return ids; + } +} diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/ForkedProcessorStepTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/ForkedProcessorStepTest.java new file mode 100644 index 0000000000000..10a30eb0c89d0 --- /dev/null +++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/ForkedProcessorStepTest.java @@ -0,0 +1,187 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.unsafe.impl.batchimport.staging; + +import org.junit.Rule; +import org.junit.Test; + +import java.util.Arrays; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; +import org.neo4j.test.OtherThreadRule; +import org.neo4j.test.RandomRule; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static java.lang.System.currentTimeMillis; +import static java.util.concurrent.TimeUnit.SECONDS; + +import static org.neo4j.unsafe.impl.batchimport.staging.Configuration.DEFAULT; + +public class ForkedProcessorStepTest +{ + @Rule + public final OtherThreadRule t2 = new OtherThreadRule<>(); + @Rule + public final RandomRule random = new RandomRule(); + + @Test + public void shouldProcessBatchBySingleThread() throws Exception + { + // GIVEN + SimpleStageControl control = new SimpleStageControl(); + AtomicReference processed = new AtomicReference<>(); + CountDownLatch latch = new CountDownLatch( 1 ); + try ( Step step = new ForkedProcessorStep( control, "Test", DEFAULT, 1 ) + { + @Override + protected void forkedProcess( int id, int processors, Object batch ) + { + try + { + assertEquals( 0, id ); + assertEquals( 1, processors ); + processed.set( batch ); + } + finally + { + latch.countDown(); + } + } + } ) + { + control.steps( step ); + step.start( Step.ORDER_SEND_DOWNSTREAM ); + + // WHEN + Object expectedBatch = new Object(); + step.receive( 0, expectedBatch ); + + // THEN + latch.await(); + assertSame( expectedBatch, processed.get() ); + control.assertHealthy(); + } + } + + @Test + public void shouldProcessBatchByMultipleThreads() throws Exception + { + // GIVEN + SimpleStageControl control = new SimpleStageControl(); + int threadCount = 10; + CountDownLatch latch = new CountDownLatch( threadCount ); + Object expectedBatch = new Object(); + try ( Step step = new ForkedProcessorStep( control, "Test", DEFAULT, threadCount ) + { + @Override + protected void forkedProcess( int id, int processors, Object batch ) + { + try + { + assertSame( expectedBatch, batch ); + } + finally + { + latch.countDown(); + } + } + } ) + { + control.steps( step ); + step.processors( threadCount ); + step.start( Step.ORDER_SEND_DOWNSTREAM ); + + // WHEN + step.receive( 0, expectedBatch ); + + // THEN + latch.await(); + control.assertHealthy(); + } + } + + @Test + public void shouldNotMissABeatUnderStress() throws Exception + { + // Idea is to have a constant load and then change number of processors randomly + // there should be observed processing with various number of threads and they should + // all fire as expected. + + // GIVEN + SimpleStageControl control = new SimpleStageControl(); + int maxProcessorCount = 10; + try ( Step step = new ForkedProcessorStep( control, "Test", DEFAULT, maxProcessorCount ) + { + private boolean[] seen = new boolean[maxProcessorCount]; + + @Override + protected void forkedProcess( int id, int processors, Object batch ) + { + if ( seen[id] ) + { + fail( Arrays.toString( seen ) + " id:" + id + " processors:" + processors ); + } + seen[id] = true; + } + + @Override + protected void process( Object batch, BatchSender sender ) throws Throwable + { + super.process( batch, sender ); + for ( int i = 0; i < forkedProcessors.size(); i++ ) + { + assertTrue( seen[i] ); + } + Arrays.fill( seen, false ); + } + } ) + { + step.start( Step.ORDER_SEND_DOWNSTREAM ); + control.steps( step ); + t2.execute( ignore -> + { + while ( !step.isCompleted() ) + { + Thread.sleep( 10 ); + step.processors( random.nextInt( maxProcessorCount ) + 1 ); + } + return null; + } ); + + // WHEN + long endTime = currentTimeMillis() + SECONDS.toMillis( 1 ); + long count = 0; + for ( ; currentTimeMillis() < endTime; count++ ) + { + step.receive( count, new Object() ); + } + step.endOfUpstream(); + while ( !step.isCompleted() ) + { + Thread.sleep( 10 ); + } + + // THEN the proof is in the pudding, our forked processor has assertions of its own + control.assertHealthy(); + } + } +} diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/ParallelizeByNodeIdStepTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/ParallelizeByNodeIdStepTest.java deleted file mode 100644 index f6538d67c73d5..0000000000000 --- a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/ParallelizeByNodeIdStepTest.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Copyright (c) 2002-2016 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ -package org.neo4j.unsafe.impl.batchimport.staging; - -import org.junit.Test; - -import java.util.Arrays; - -import org.neo4j.kernel.impl.store.id.IdGeneratorImpl; -import org.neo4j.kernel.impl.store.record.RelationshipRecord; -import org.neo4j.unsafe.impl.batchimport.Batch; -import org.neo4j.unsafe.impl.batchimport.ParallelizeByNodeIdStep; -import org.neo4j.unsafe.impl.batchimport.input.InputRelationship; -import org.neo4j.unsafe.impl.batchimport.staging.BatchSender; -import org.neo4j.unsafe.impl.batchimport.staging.Configuration; -import org.neo4j.unsafe.impl.batchimport.staging.ProcessorStep; -import org.neo4j.unsafe.impl.batchimport.staging.StageControl; - -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; - -public class ParallelizeByNodeIdStepTest -{ - private final StageControl control = mock( StageControl.class ); - private final BatchSender sender = mock( BatchSender.class ); - - @Test - public void shouldDetectABA() throws Throwable - { - // GIVEN - ProcessorStep> step = new ParallelizeByNodeIdStep( - control, Configuration.DEFAULT ); - int batchSize = Configuration.DEFAULT.batchSize(); - Batch a = new Batch<>( new InputRelationship[batchSize] ); - setIds( a, 0, 2, batchSize*2 ); - Batch b = new Batch<>( new InputRelationship[batchSize] ); - setIds( b, 1, 2, batchSize*2 ); - Batch aa = new Batch<>( new InputRelationship[batchSize] ); - setIds( aa, 0, 2, batchSize*2 ); - Batch bb = new Batch<>( new InputRelationship[batchSize] ); - setIds( bb, 1, 2, batchSize*2 ); - - // WHEN - step.process( a, sender ); - step.process( b, sender ); - step.process( aa, sender ); - step.process( bb, sender ); - - // THEN - assertTrue( a.parallelizableWithPrevious ); // because it's the first batch - assertTrue( b.parallelizableWithPrevious ); // because no id here collides with the previous batch - assertFalse( aa.parallelizableWithPrevious ); // because there's one or more ids in this batch that collides - // with the first batch - assertTrue( bb.parallelizableWithPrevious ); // because no id here collides with aa - } - - @Test - public void shouldSkipReservervedId() throws Throwable - { - // GIVEN - ProcessorStep> step = new ParallelizeByNodeIdStep( control, - Configuration.DEFAULT, IdGeneratorImpl.INTEGER_MINUS_ONE - 123_456 ); - int batchSize = Configuration.DEFAULT.batchSize(); - - // WHEN - Batch batch = new Batch<>( new InputRelationship[batchSize] ); - batch.ids = new long[] {1L}; - batch.sortedIds = batch.ids.clone(); - while ( batch.firstRecordId < IdGeneratorImpl.INTEGER_MINUS_ONE ) - { - step.process( batch, sender ); - assertFalse( "Batch got first id " + batch.firstRecordId + " which contains the reserved id", - idWithin( IdGeneratorImpl.INTEGER_MINUS_ONE, - batch.firstRecordId, batch.firstRecordId + batchSize ) ); - } - - assertTrue( batch.firstRecordId > IdGeneratorImpl.INTEGER_MINUS_ONE ); - } - - private boolean idWithin( long id, long low, long high ) - { - return id >= low && id <= high; - } - - private void setIds( Batch batch, long first, long stride, int count ) - { - batch.ids = new long[count]; - long value = first; - for ( int i = 0; i < count; i++ ) - { - batch.ids[i] = value; - value += stride; - } - batch.sortedIds = batch.ids.clone(); - Arrays.sort( batch.sortedIds ); - } -} diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/ProcessorStepTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/ProcessorStepTest.java index ff1aaa1746b77..1c1f1d4c65beb 100644 --- a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/ProcessorStepTest.java +++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/ProcessorStepTest.java @@ -26,14 +26,13 @@ import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; -import org.neo4j.graphdb.Resource; import org.neo4j.test.OtherThreadExecutor.WorkerCommand; import org.neo4j.test.OtherThreadRule; import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.neo4j.unsafe.impl.batchimport.staging.Step.ORDER_PROCESS; +import static org.neo4j.unsafe.impl.batchimport.staging.Step.ORDER_SEND_DOWNSTREAM; public class ProcessorStepTest { @@ -46,7 +45,7 @@ public void shouldUpholdProcessOrderingGuarantee() throws Exception // GIVEN StageControl control = mock( StageControl.class ); MyProcessorStep step = new MyProcessorStep( control, 0 ); - step.start( ORDER_PROCESS ); + step.start( ORDER_SEND_DOWNSTREAM ); step.processors( 4 ); // now at 5 // WHEN @@ -74,7 +73,7 @@ public void shouldHaveTaskQueueSizeEqualToNumberOfProcessorsIfSpecificallySet() final CountDownLatch latch = new CountDownLatch( 1 ); final int processors = 2; final ProcessorStep step = new BlockingProcessorStep( control, processors, latch ); - step.start( ORDER_PROCESS ); + step.start( ORDER_SEND_DOWNSTREAM ); step.processors( 1 ); // now at 2 // adding two should be fine for ( int i = 0; i < processors+1 /* +1 since we allow queueing one more*/; i++ ) @@ -98,7 +97,7 @@ public void shouldHaveTaskQueueSizeEqualToCurrentNumberOfProcessorsIfNotSpecific StageControl control = mock( StageControl.class ); final CountDownLatch latch = new CountDownLatch( 1 ); final ProcessorStep step = new BlockingProcessorStep( control, 0, latch ); - step.start( ORDER_PROCESS ); + step.start( ORDER_SEND_DOWNSTREAM ); step.processors( 2 ); // now at 3 // adding two should be fine for ( int i = 0; i < step.processors( 0 )+1 /* +1 since we allow queueing one more*/; i++ ) @@ -149,19 +148,10 @@ private MyProcessorStep( StageControl control, int maxProcessors ) super( control, "test", Configuration.DEFAULT, maxProcessors ); } - @Override - protected Resource permit( Integer batch ) throws Throwable - { - // Sleep a little to allow other processors much more easily to catch up and have - // a chance to race, if permit ordering guarantee isn't upheld, that is. - Thread.sleep( 10 ); - assertEquals( nextExpected.getAndIncrement(), batch.intValue() ); - return super.permit( batch ); - } - @Override protected void process( Integer batch, BatchSender sender ) throws Throwable { // No processing in this test + nextExpected.incrementAndGet(); } } diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/ReadRecordsStepTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/ReadRecordsStepTest.java index 6d9557037bb3f..96b582cba61b3 100644 --- a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/ReadRecordsStepTest.java +++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/ReadRecordsStepTest.java @@ -26,19 +26,26 @@ import java.util.stream.Stream; import org.neo4j.kernel.impl.store.NodeStore; +import org.neo4j.kernel.impl.store.RecordCursor; import org.neo4j.kernel.impl.store.id.IdGeneratorImpl; +import org.neo4j.kernel.impl.store.record.AbstractBaseRecord; import org.neo4j.kernel.impl.store.record.NodeRecord; +import org.neo4j.kernel.impl.store.record.RecordLoad; import org.neo4j.unsafe.impl.batchimport.StoreWithReservedId; import org.neo4j.unsafe.impl.batchimport.staging.ReadRecordsStep; import org.neo4j.unsafe.impl.batchimport.staging.StageControl; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.neo4j.unsafe.impl.batchimport.Configuration.DEFAULT; -import static org.neo4j.unsafe.impl.batchimport.RecordIdIteration.allIn; +import static org.neo4j.unsafe.impl.batchimport.Configuration.withBatchSize; +import static org.neo4j.unsafe.impl.batchimport.RecordIdIterator.allIn; public class ReadRecordsStepTest { @@ -47,11 +54,14 @@ public void reservedIdIsSkipped() { long highId = 5; int batchSize = (int) highId; + org.neo4j.unsafe.impl.batchimport.Configuration config = withBatchSize( DEFAULT, batchSize ); NodeStore store = StoreWithReservedId.newNodeStoreMock( highId ); when( store.getHighId() ).thenReturn( highId ); + when( store.getRecordsPerPage() ).thenReturn( 10 ); - ReadRecordsStep step = new ReadRecordsStep<>( mock( StageControl.class ), DEFAULT, - store, allIn( store ) ); + ReadRecordsStep step = new ReadRecordsStep<>( mock( StageControl.class ), config, + store, allIn( store, config ) ); + step.start( 0 ); Object batch = step.nextBatchOrNull( 0, batchSize ); @@ -62,8 +72,98 @@ public void reservedIdIsSkipped() assertFalse( "Batch contains record with reserved id " + Arrays.toString( records ), hasRecordWithReservedId ); } + @Test + public void shouldContinueThroughBigIdHoles() throws Exception + { + // GIVEN + NodeStore store = mock( NodeStore.class ); + long highId = 100L; + when( store.getHighId() ).thenReturn( highId ); + when( store.newRecord() ).thenReturn( new NodeRecord( -1 ) ); + org.neo4j.unsafe.impl.batchimport.Configuration config = withBatchSize( DEFAULT, 10 ); + when( store.newRecordCursor( any( NodeRecord.class ) ) ).thenAnswer( invocation -> + { + return new ControlledRecordCursor<>( (NodeRecord) invocation.getArguments()[0], record -> + { + record.setInUse( record.getId() < config.batchSize() || record.getId() >= highId - config.batchSize()/2 ); + return record.inUse() && record.getId() < highId; + } ); + } ); + ReadRecordsStep step = new ReadRecordsStep<>( mock( StageControl.class ), + config, store, allIn( store, config ) ); + step.start( 0 ); + + // WHEN + NodeRecord[] first = (NodeRecord[]) step.nextBatchOrNull( 0, config.batchSize() ); + NodeRecord[] second = (NodeRecord[]) step.nextBatchOrNull( 1, config.batchSize() ); + NodeRecord[] third = (NodeRecord[]) step.nextBatchOrNull( 2, config.batchSize() ); + + // THEN + assertEquals( config.batchSize(), first.length ); + assertEquals( 0L, first[0].getId() ); + assertEquals( first[0].getId() + config.batchSize() - 1, first[first.length-1].getId() ); + + assertEquals( config.batchSize()/2, second.length ); + assertEquals( highId - 1, second[second.length-1].getId() ); + + assertNull( third ); + } + private static Predicate recordWithReservedId() { return record -> record.getId() == IdGeneratorImpl.INTEGER_MINUS_ONE; } + + private static class ControlledRecordCursor implements RecordCursor + { + private final RECORD record; + private final Predicate populator; + + public ControlledRecordCursor( RECORD record, Predicate populator ) + { + this.record = record; + this.populator = populator; + } + + @Override + public void close() + { // Nothing to close + } + + @Override + public RECORD get() + { + return record; + } + + @Override + public RecordCursor acquire( long id, RecordLoad mode ) + { + return this; + } + + @Override + public void placeAt( long id, RecordLoad mode ) + { // Don't care about this, we only care about next(id) in this class + } + + @Override + public boolean next() + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean next( long id ) + { + record.setId( id ); + return populator.test( record ); + } + + @Override + public boolean next( long id, RECORD record, RecordLoad mode ) + { + throw new UnsupportedOperationException(); + } + } } diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/SimpleStageControl.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/SimpleStageControl.java new file mode 100644 index 0000000000000..1c2d9ff1e1dbf --- /dev/null +++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/staging/SimpleStageControl.java @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.unsafe.impl.batchimport.staging; + +import org.neo4j.helpers.Exceptions; + +/** + * A simple {@link StageControl} for tests with multiple steps and where an error or assertion failure + * propagates to other steps. Create the {@link SimpleStageControl}, pass it into the {@link Step steps} + * and then when all steps are created, call {@link #steps(Step...)} to let the control know about them. + */ +public class SimpleStageControl implements StageControl +{ + private volatile Throwable panic; + private volatile Step[] steps; + + public void steps( Step... steps ) + { + this.steps = steps; + } + + @Override + public void panic( Throwable cause ) + { + this.panic = cause; + for ( Step step : steps ) + { + step.receivePanic( cause ); + } + } + + public void assertHealthy() + { + if ( panic != null ) + { + throw Exceptions.launderedException( panic ); + } + } +}