From 0fa339fdb06866eb0d6bf6495e5adb48ffdf48ac Mon Sep 17 00:00:00 2001 From: Mattias Persson Date: Tue, 11 Apr 2017 21:07:39 +0200 Subject: [PATCH] Utilizes available memory and imports multiple types at a time Instead of only a single type at a time. This re-introduces caching for keeping relationship heads for multiple types per dense node in NodeRelationshipCache. This will get the best of both the previous strategy of always importing all relationships in one round AND the current strategy of importing one type per round (to reduce memory consumption). Now the amount of available memory will decide how many rounds of relationship imports are required. This also removes the per-type-splitting on-disk caching of the input data which was done when importing relationships of the first (and biggest) relationship type, something which in most cases will improve performance of the import in general and avoid sections of relationship import where seemingly there was no progress, due to only caching relationships. There's now an additional setting for how much memory the importer can use as a whole, which is by default based on the amount of free physical memory on the machine. The defragmentation of relationship groups after relationship import also makes use of available memory and can reduce the number of rounds needed. 
--- .../ParallelBatchImporterTest.java | 14 +- .../java/org/neo4j/tooling/ImportTool.java | 44 ++- .../org/neo4j/tooling/ImportToolTest.java | 46 +++ .../kernel/impl/util/IoPrimitiveUtils.java | 10 + .../BatchInsertRelationshipsStage.java | 60 --- .../BatchInsertRelationshipsStep.java | 134 ------- .../batchimport/CalculateDenseNodesStage.java | 9 +- .../impl/batchimport/Configuration.java | 37 ++ .../batchimport/InputIteratorBatcherStep.java | 8 +- .../NodeFirstRelationshipProcessor.java | 9 +- .../NodeFirstRelationshipStage.java | 6 +- .../unsafe/impl/batchimport/NodeStage.java | 2 +- .../batchimport/ParallelBatchImporter.java | 114 +++--- .../ReadNodeRecordsByCacheStep.java | 8 +- .../batchimport/RelationshipEncoderStep.java | 5 +- .../batchimport/RelationshipGroupCache.java | 17 +- .../RelationshipLinkbackProcessor.java | 103 ----- .../RelationshipLinkbackStage.java | 4 +- .../batchimport/RelationshipLinkbackStep.java | 26 +- .../impl/batchimport/RelationshipStage.java | 5 +- .../RelationshipTypeCheckerStep.java | 32 +- .../RelationshipTypeDistribution.java | 50 +++ .../batchimport/cache/BaseNumberArray.java | 2 +- .../cache/NodeRelationshipCache.java | 366 +++++++++++++----- .../impl/batchimport/cache/NodeType.java | 51 +++ .../input/PerTypeRelationshipSplitter.java | 216 ----------- .../staging/IteratorBatcherStep.java | 26 +- .../RelationshipLinkbackStageTest.java | 3 +- .../RelationshipTypeCheckerStepTest.java | 17 +- .../cache/NodeRelationshipCacheTest.java | 207 +++++++--- .../PerTypeRelationshipSplitterTest.java | 126 ------ 31 files changed, 825 insertions(+), 932 deletions(-) delete mode 100644 community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/BatchInsertRelationshipsStage.java delete mode 100644 community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/BatchInsertRelationshipsStep.java delete mode 100644 community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkbackProcessor.java create mode 100644 
community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipTypeDistribution.java create mode 100644 community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/NodeType.java delete mode 100644 community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/PerTypeRelationshipSplitter.java delete mode 100644 community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/input/PerTypeRelationshipSplitterTest.java diff --git a/community/consistency-check/src/test/java/org/neo4j/unsafe/impl/batchimport/ParallelBatchImporterTest.java b/community/consistency-check/src/test/java/org/neo4j/unsafe/impl/batchimport/ParallelBatchImporterTest.java index b466b0688982e..9bc566bd3eff9 100644 --- a/community/consistency-check/src/test/java/org/neo4j/unsafe/impl/batchimport/ParallelBatchImporterTest.java +++ b/community/consistency-check/src/test/java/org/neo4j/unsafe/impl/batchimport/ParallelBatchImporterTest.java @@ -101,6 +101,7 @@ public class ParallelBatchImporterTest private static final int NODE_COUNT = 10_000; private static final int RELATIONSHIPS_PER_NODE = 5; private static final int RELATIONSHIP_COUNT = NODE_COUNT * RELATIONSHIPS_PER_NODE; + private static final int RELATIONSHIP_TYPES = 3; protected final Configuration config = new Configuration() { @Override @@ -124,6 +125,17 @@ public int maxNumberOfProcessors() int cores = Runtime.getRuntime().availableProcessors(); return random.intBetween( cores, cores + 100 ); } + + @Override + public long maxMemoryUsage() + { + // This calculation is just to try and hit some sort of memory limit so that relationship import + // is split up into multiple rounds. Also to see that relationship group defragmentation works + // well when doing multiple rounds. 
+ double ratio = (NODE_COUNT / 1_000D ); + long mebi = 1024 * 1024; + return random.nextInt( (int) (ratio * mebi / 2), (int) (ratio * mebi) ); + } }; private final InputIdGenerator inputIdGenerator; private final IdMapper idMapper; @@ -253,7 +265,7 @@ public abstract static class InputIdGenerator String randomType( Random random ) { - return "TYPE" + random.nextInt( 3 ); + return "TYPE" + random.nextInt( RELATIONSHIP_TYPES ); } @Override diff --git a/community/import-tool/src/main/java/org/neo4j/tooling/ImportTool.java b/community/import-tool/src/main/java/org/neo4j/tooling/ImportTool.java index 4cbb152dcf648..150155d7afabe 100644 --- a/community/import-tool/src/main/java/org/neo4j/tooling/ImportTool.java +++ b/community/import-tool/src/main/java/org/neo4j/tooling/ImportTool.java @@ -44,6 +44,7 @@ import org.neo4j.io.fs.DefaultFileSystemAbstraction; import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.kernel.configuration.Config; +import org.neo4j.kernel.configuration.Settings; import org.neo4j.kernel.impl.logging.LogService; import org.neo4j.kernel.impl.logging.StoreLogService; import org.neo4j.kernel.impl.storemigration.ExistingTargetStrategy; @@ -82,6 +83,8 @@ import static org.neo4j.kernel.configuration.Settings.parseLongWithUnit; import static org.neo4j.kernel.impl.util.Converters.withDefault; import static org.neo4j.unsafe.impl.batchimport.Configuration.BAD_FILE_NAME; +import static org.neo4j.unsafe.impl.batchimport.Configuration.DEFAULT_MAX_MEMORY_PERCENT; +import static org.neo4j.unsafe.impl.batchimport.Configuration.calculateMaxMemoryFromPercent; import static org.neo4j.unsafe.impl.batchimport.input.Collectors.badCollector; import static org.neo4j.unsafe.impl.batchimport.input.Collectors.collect; import static org.neo4j.unsafe.impl.batchimport.input.Collectors.silentBadCollector; @@ -99,7 +102,6 @@ public class ImportTool { private static final String UNLIMITED = "true"; - private static final int UNSPECIFIED = -1; enum Options { @@ -232,7 
+234,13 @@ enum Options READ_BUFFER_SIZE( "read-buffer-size", org.neo4j.csv.reader.Configuration.DEFAULT.bufferSize(), "", "Size of each buffer for reading input data. It has to at least be large enough to hold the " + - "biggest single value in the input data." ); + "biggest single value in the input data." ), + MAX_MEMORY( "max-memory", null, + "", + "(advanced) Maximum memory that importer can use for various data structures and caching " + + "to improve performance. By default set to " + DEFAULT_MAX_MEMORY_PERCENT + + "% of (free memory on machine - max JVM memory). " + + "Values can be plain numbers, like 10000000 or e.g. 20G for 20 gigabyte, or even e.g. 70%." ); private final String key; private final Object defaultValue; @@ -378,10 +386,10 @@ public static void main( String[] incomingArguments, boolean defaultSettingsSuit Config dbConfig; OutputStream badOutput = null; IdType idType = null; - int pageSize = UNSPECIFIED; org.neo4j.unsafe.impl.batchimport.Configuration configuration = null; File logsDir; File badFile = null; + Long maxMemory = null; boolean success = false; try @@ -402,6 +410,8 @@ public static void main( String[] incomingArguments, boolean defaultSettingsSuit } nodesFiles = extractInputFiles( args, Options.NODE_DATA.key(), err ); relationshipsFiles = extractInputFiles( args, Options.RELATIONSHIP_DATA.key(), err ); + String maxMemoryString = args.get( Options.MAX_MEMORY.key(), null ); + maxMemory = parseMaxMemory( maxMemoryString ); validateInputFiles( nodesFiles, relationshipsFiles ); enableStacktrace = args.getBoolean( Options.STACKTRACE.key(), Boolean.FALSE, Boolean.TRUE ); @@ -423,7 +433,7 @@ public static void main( String[] incomingArguments, boolean defaultSettingsSuit dbConfig = loadDbConfig( args.interpretOption( Options.DATABASE_CONFIG.key(), Converters.optional(), Converters.toFile(), Validators.REGEX_FILE_EXISTS ) ); - configuration = importConfiguration( processors, defaultSettingsSuitableForTests, dbConfig, pageSize ); + 
configuration = importConfiguration( processors, defaultSettingsSuitableForTests, dbConfig, maxMemory ); input = new CsvInput( nodeData( inputEncoding, nodesFiles ), defaultFormatNodeFileHeader(), relationshipData( inputEncoding, relationshipsFiles ), defaultFormatRelationshipFileHeader(), idType, csvConfiguration( args, defaultSettingsSuitableForTests ), badCollector, @@ -451,6 +461,20 @@ idType, csvConfiguration( args, defaultSettingsSuitableForTests ), badCollector, } } + private static Long parseMaxMemory( String maxMemoryString ) + { + if ( maxMemoryString != null ) + { + if ( maxMemoryString.endsWith( "%" ) ) + { + int percent = Integer.parseInt( maxMemoryString.substring( 0, maxMemoryString.length() - 1 ) ); + return calculateMaxMemoryFromPercent( percent ); + } + return Settings.parseLongWithUnit( maxMemoryString ); + } + return null; + } + public static void doImport( PrintStream out, PrintStream err, File storeDir, File logsDir, File badFile, FileSystemAbstraction fs, Collection> nodesFiles, Collection> relationshipsFiles, boolean enableStacktrace, Input input, @@ -570,9 +594,11 @@ private static void printOverview( File storeDir, Collection> nod printInputFiles( "Relationships", relationshipsFiles, out ); out.println(); out.println( "Available resources:" ); + printIndented( "Total machine memory: " + bytes( OsBeanUtil.getTotalPhysicalMemory() ), out ); printIndented( "Free machine memory: " + bytes( OsBeanUtil.getFreePhysicalMemory() ), out ); printIndented( "Max heap memory : " + bytes( Runtime.getRuntime().maxMemory() ), out ); printIndented( "Processors: " + configuration.maxNumberOfProcessors(), out ); + printIndented( "Configured max memory: " + bytes( configuration.maxMemoryUsage() ), out ); out.println(); } @@ -623,11 +649,11 @@ public static void validateInputFiles( Collection> nodesFiles, public static org.neo4j.unsafe.impl.batchimport.Configuration importConfiguration( final Number processors, final boolean defaultSettingsSuitableForTests, 
final Config dbConfig ) { - return importConfiguration( processors, defaultSettingsSuitableForTests, dbConfig, UNSPECIFIED ); + return importConfiguration( processors, defaultSettingsSuitableForTests, dbConfig, null ); } public static org.neo4j.unsafe.impl.batchimport.Configuration importConfiguration( final Number processors, - final boolean defaultSettingsSuitableForTests, final Config dbConfig, int pageSize ) + final boolean defaultSettingsSuitableForTests, final Config dbConfig, Long maxMemory ) { return new org.neo4j.unsafe.impl.batchimport.Configuration() { @@ -648,6 +674,12 @@ public int denseNodeThreshold() { return dbConfig.get( GraphDatabaseSettings.dense_node_threshold ); } + + @Override + public long maxMemoryUsage() + { + return maxMemory != null ? maxMemory.longValue() : DEFAULT.maxMemoryUsage(); + } }; } diff --git a/community/import-tool/src/test/java/org/neo4j/tooling/ImportToolTest.java b/community/import-tool/src/test/java/org/neo4j/tooling/ImportToolTest.java index da1c23e5a7487..387b4b7c32f85 100644 --- a/community/import-tool/src/test/java/org/neo4j/tooling/ImportToolTest.java +++ b/community/import-tool/src/test/java/org/neo4j/tooling/ImportToolTest.java @@ -1741,6 +1741,52 @@ public void shouldRespectBufferSizeSetting() throws Exception } } + @Test + public void shouldRespectMaxMemoryPercentageSetting() throws Exception + { + // GIVEN + List nodeIds = nodeIds( 10 ); + + // WHEN + importTool( + "--into", dbRule.getStoreDirAbsolutePath(), + "--nodes", nodeData( true, Configuration.COMMAS, nodeIds, TRUE ).getAbsolutePath(), + "--max-memory", "60%" ); + } + + @Test + public void shouldFailOnInvalidMaxMemoryPercentageSetting() throws Exception + { + // GIVEN + List nodeIds = nodeIds( 10 ); + + try + { + // WHEN + importTool( "--into", dbRule.getStoreDirAbsolutePath(), "--nodes", + nodeData( true, Configuration.COMMAS, nodeIds, TRUE ).getAbsolutePath(), "--max-memory", "110%" ); + fail( "Should have failed" ); + } + catch ( 
IllegalArgumentException e ) + { + // THEN good + assertThat( e.getMessage(), containsString( "percent" ) ); + } + } + + @Test + public void shouldRespectMaxMemorySuffixedSetting() throws Exception + { + // GIVEN + List nodeIds = nodeIds( 10 ); + + // WHEN + importTool( + "--into", dbRule.getStoreDirAbsolutePath(), + "--nodes", nodeData( true, Configuration.COMMAS, nodeIds, TRUE ).getAbsolutePath(), + "--max-memory", "100M" ); + } + private File writeArrayCsv( String[] headers, String[] values ) throws FileNotFoundException { File data = file( fileName( "whitespace.csv" ) ); diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/util/IoPrimitiveUtils.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/util/IoPrimitiveUtils.java index 05c117d9199de..b66c6f9a0dea8 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/util/IoPrimitiveUtils.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/util/IoPrimitiveUtils.java @@ -233,6 +233,16 @@ public static int safeCastLongToInt( long value ) return (int) value; } + public static short safeCastIntToUnsignedShort( int value ) + { + if ( (value & ~0xFFFF) != 0 ) + { + throw new IllegalArgumentException( + "Casting int value " + value + " to an unsigned short would wrap around" ); + } + return (short) value; + } + public static int shortToUnsignedInt( short value ) { return value & 0xFFFF; diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/BatchInsertRelationshipsStage.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/BatchInsertRelationshipsStage.java deleted file mode 100644 index d4a05329dc735..0000000000000 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/BatchInsertRelationshipsStage.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Copyright (c) 2002-2017 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. 
- * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ -package org.neo4j.unsafe.impl.batchimport; - -import org.neo4j.kernel.impl.store.record.PropertyBlock; -import org.neo4j.unsafe.batchinsert.BatchInserter; -import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdMapper; -import org.neo4j.unsafe.impl.batchimport.input.Input; -import org.neo4j.unsafe.impl.batchimport.input.InputRelationship; -import org.neo4j.unsafe.impl.batchimport.staging.Stage; -import org.neo4j.unsafe.impl.batchimport.store.BatchingNeoStores; - -import static org.neo4j.unsafe.impl.batchimport.staging.Step.ORDER_SEND_DOWNSTREAM; - -/** - * Inserts relationships one by one, {@link BatchInserter} style which may incur random I/O. This stage - * should only be used on relationship types which are very small (<100). Steps: - * - *
    - *
  1. {@link InputIteratorBatcherStep} reading from {@link InputIterator} produced from {@link Input#nodes()}.
  2. - *
  3. {@link RelationshipPreparationStep} looks up {@link InputRelationship#startNode() start node input id} / - * {@link InputRelationship#endNode() end node input id} from {@link IdMapper} and attaches to the batches going - * through because that lookup is costly and this step can be parallelized.
  4. - *
  5. {@link PropertyEncoderStep} encodes properties from {@link InputRelationship input relationships} into - * {@link PropertyBlock}, low level kernel encoded values.
  6. - *
  7. {@link BatchInsertRelationshipsStep} inserts relationships one by one by reading from and updating store - * as it sees required.
  8. - *
- */ -public class BatchInsertRelationshipsStage extends Stage -{ - public BatchInsertRelationshipsStage( Configuration config, IdMapper idMapper, - InputIterator relationships, BatchingNeoStores store, long nextRelationshipId ) - { - super( "Minority relationships", config, ORDER_SEND_DOWNSTREAM ); - add( new InputIteratorBatcherStep<>( control(), config, relationships, InputRelationship.class ) ); - add( new RelationshipPreparationStep( control(), config, idMapper ) ); - add( new PropertyEncoderStep<>( control(), config, store.getPropertyKeyRepository(), - store.getPropertyStore() ) ); - add( new BatchInsertRelationshipsStep( control(), config, store, - store.getRelationshipTypeRepository(), nextRelationshipId ) ); - } -} diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/BatchInsertRelationshipsStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/BatchInsertRelationshipsStep.java deleted file mode 100644 index 9778c88405780..0000000000000 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/BatchInsertRelationshipsStep.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Copyright (c) 2002-2017 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . 
- */ -package org.neo4j.unsafe.impl.batchimport; - -import java.util.function.ToIntFunction; - -import org.neo4j.kernel.impl.locking.Locks; -import org.neo4j.kernel.impl.locking.NoOpClient; -import org.neo4j.kernel.impl.store.PropertyStore; -import org.neo4j.kernel.impl.store.RecordStore; -import org.neo4j.kernel.impl.store.id.IdSequence; -import org.neo4j.kernel.impl.store.record.PropertyBlock; -import org.neo4j.kernel.impl.store.record.RelationshipGroupRecord; -import org.neo4j.kernel.impl.store.record.RelationshipRecord; -import org.neo4j.kernel.impl.transaction.state.PropertyCreator; -import org.neo4j.kernel.impl.transaction.state.PropertyTraverser; -import org.neo4j.kernel.impl.transaction.state.RelationshipCreator; -import org.neo4j.kernel.impl.transaction.state.RelationshipGroupGetter; -import org.neo4j.kernel.impl.util.ReusableIteratorCostume; -import org.neo4j.unsafe.batchinsert.DirectRecordAccessSet; -import org.neo4j.unsafe.impl.batchimport.input.InputRelationship; -import org.neo4j.unsafe.impl.batchimport.staging.BatchSender; -import org.neo4j.unsafe.impl.batchimport.staging.ProcessorStep; -import org.neo4j.unsafe.impl.batchimport.staging.StageControl; -import org.neo4j.unsafe.impl.batchimport.store.BatchingIdSequence; -import org.neo4j.unsafe.impl.batchimport.store.BatchingNeoStores; - -import static org.neo4j.unsafe.impl.batchimport.EntityStoreUpdaterStep.reassignDynamicRecordIds; - -public class BatchInsertRelationshipsStep extends ProcessorStep> -{ - private final ToIntFunction typeToId; - private final RelationshipCreator relationshipCreator; - private final Locks.Client noopLockClient = new NoOpClient(); - private final PropertyCreator propertyCreator; - private final DirectRecordAccessSet recordAccess; - private final PropertyStore propertyStore; - private int pendingRelationshipChanges; - - // Reusable instances for less GC - private final ReusableIteratorCostume blockIterator = new ReusableIteratorCostume<>(); - private final IdSequence 
relationshipIdGenerator; - - public BatchInsertRelationshipsStep( StageControl control, Configuration config, BatchingNeoStores store, - ToIntFunction typeToId, long nextRelationshipId ) - { - super( control, "INSERT", config, 1 ); - this.typeToId = typeToId; - RecordStore relationshipGroupStore = store.getTemporaryRelationshipGroupStore(); - RelationshipGroupGetter groupGetter = new RelationshipGroupGetter( relationshipGroupStore ); - this.relationshipCreator = new RelationshipCreator( groupGetter, config.denseNodeThreshold() ); - PropertyTraverser propertyTraverser = new PropertyTraverser(); - this.propertyCreator = new PropertyCreator( store.getPropertyStore(), propertyTraverser ); - this.propertyStore = store.getPropertyStore(); - this.recordAccess = new DirectRecordAccessSet( store.getNodeStore(), propertyStore, - store.getRelationshipStore(), relationshipGroupStore, - store.getNeoStores().getPropertyKeyTokenStore(), - store.getNeoStores().getRelationshipTypeTokenStore(), - store.getNeoStores().getLabelTokenStore(), store.getNeoStores().getSchemaStore() ); - this.relationshipIdGenerator = new BatchingIdSequence( nextRelationshipId ); - } - - @Override - protected void process( Batch batch, BatchSender sender ) throws Throwable - { - for ( int i = 0, propertyBlockCursor = 0; i < batch.input.length; i++ ) - { - InputRelationship input = batch.input[i]; - int propertyBlockCount = batch.propertyBlocksLengths[i]; - - // Create relationship - long startNodeId = batch.ids[i*2]; - long endNodeId = batch.ids[i*2+1]; - if ( startNodeId != -1 && endNodeId != -1 ) - { - long id = relationshipIdGenerator.nextId(); - int typeId = typeToId.applyAsInt( input.typeAsObject() ); - relationshipCreator.relationshipCreate( id, typeId, startNodeId, endNodeId, recordAccess, noopLockClient ); - - // Set properties - RelationshipRecord record = recordAccess.getRelRecords().getOrLoad( id, null ).forChangingData(); - if ( input.hasFirstPropertyId() ) - { - record.setNextProp( 
input.firstPropertyId() ); - } - else - { - if ( propertyBlockCount > 0 ) - { - reassignDynamicRecordIds( propertyStore, batch.propertyBlocks, - propertyBlockCursor, propertyBlockCount ); - long firstProp = propertyCreator.createPropertyChain( record, - blockIterator.dressArray( batch.propertyBlocks, propertyBlockCursor, propertyBlockCount ), - recordAccess.getPropertyRecords() ); - record.setNextProp( firstProp ); - } - } - } - // else --> This is commonly known as input relationship referring to missing node IDs - propertyBlockCursor += propertyBlockCount; - } - - pendingRelationshipChanges += batch.input.length; - if ( pendingRelationshipChanges >= 50_000 ) - { - recordAccess.close(); // <-- happens to be called close even though this impl just flushes - pendingRelationshipChanges = 0; - } - } - - @Override - protected void done() - { - recordAccess.close(); - super.done(); - } -} diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/CalculateDenseNodesStage.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/CalculateDenseNodesStage.java index dc6d088b524a5..89011def17d97 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/CalculateDenseNodesStage.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/CalculateDenseNodesStage.java @@ -68,7 +68,7 @@ public CalculateDenseNodesStage( Configuration config, InputIterable( control(), config, - relationships.iterator(), InputRelationship.class ) ); + relationships.iterator(), InputRelationship.class, t -> true ) ); if ( !relationships.supportsMultiplePasses() ) { add( new InputEntityCacherStep<>( control(), config, inputCache.cacheRelationships( MAIN ) ) ); @@ -79,11 +79,8 @@ public CalculateDenseNodesStage( Configuration config, InputIterable 0, was " + percent ); + } + if ( percent > 100 ) + { + throw new IllegalArgumentException( "Expected percentage to be < 100, was " + percent ); + } + long freePhysicalMemory = 
OsBeanUtil.getFreePhysicalMemory(); + if ( freePhysicalMemory == OsBeanUtil.VALUE_UNAVAILABLE ) + { + throw new UnsupportedOperationException( + "Unable to detect amount of free memory, so max memory has to be explicitly set" ); + } + + double factor = percent / 100D; + return round( (freePhysicalMemory - Runtime.getRuntime().maxMemory()) * factor ); + } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/InputIteratorBatcherStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/InputIteratorBatcherStep.java index e564049a7069f..f4dfccbb535f7 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/InputIteratorBatcherStep.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/InputIteratorBatcherStep.java @@ -19,6 +19,8 @@ */ package org.neo4j.unsafe.impl.batchimport; +import java.util.function.Predicate; + import org.neo4j.unsafe.impl.batchimport.staging.IteratorBatcherStep; import org.neo4j.unsafe.impl.batchimport.staging.StageControl; @@ -30,10 +32,10 @@ public class InputIteratorBatcherStep extends IteratorBatcherStep { private final InputIterator data; - public InputIteratorBatcherStep( StageControl control, Configuration config, - InputIterator data, Class itemClass ) + InputIteratorBatcherStep( StageControl control, Configuration config, InputIterator data, Class itemClass, + Predicate filter ) { - super( control, config, data, itemClass ); + super( control, config, data, itemClass, filter ); this.data = data; } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeFirstRelationshipProcessor.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeFirstRelationshipProcessor.java index c0bbce1e74890..6fecbbf05ec98 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeFirstRelationshipProcessor.java +++ 
b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeFirstRelationshipProcessor.java @@ -37,14 +37,12 @@ public class NodeFirstRelationshipProcessor implements RecordProcessor relGroupStore; private final NodeRelationshipCache cache; - private final int relationshipType; public NodeFirstRelationshipProcessor( RecordStore relGroupStore, - NodeRelationshipCache cache, int relationshipType ) + NodeRelationshipCache cache ) { this.relGroupStore = relGroupStore; this.cache = cache; - this.relationshipType = relationshipType; } @Override @@ -64,18 +62,17 @@ public boolean process( NodeRecord node ) } @Override - public long visit( long nodeId, long next, long out, long in, long loop ) + public long visit( long nodeId, int typeId, long out, long in, long loop ) { // Here we'll use the already generated id (below) from the previous visit, if that so happened long id = relGroupStore.nextId(); RelationshipGroupRecord groupRecord = new RelationshipGroupRecord( id ); - groupRecord.setType( relationshipType ); + groupRecord.setType( typeId ); groupRecord.setInUse( true ); groupRecord.setFirstOut( out ); groupRecord.setFirstIn( in ); groupRecord.setFirstLoop( loop ); groupRecord.setOwningNode( nodeId ); - groupRecord.setNext( next ); relGroupStore.prepareForCommit( groupRecord ); relGroupStore.updateRecord( groupRecord ); return id; diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeFirstRelationshipStage.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeFirstRelationshipStage.java index dadf7c93aa053..18fd82dddb5b5 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeFirstRelationshipStage.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeFirstRelationshipStage.java @@ -42,12 +42,12 @@ public class NodeFirstRelationshipStage extends Stage { public NodeFirstRelationshipStage( String topic, Configuration config, NodeStore nodeStore, RecordStore 
relationshipGroupStore, NodeRelationshipCache cache, - boolean denseNodes, int relationshipType ) + int nodeTypes ) { super( "Node --> Relationship" + topic, config ); - add( new ReadNodeRecordsByCacheStep( control(), config, nodeStore, cache, denseNodes ) ); + add( new ReadNodeRecordsByCacheStep( control(), config, nodeStore, cache, nodeTypes ) ); add( new RecordProcessorStep<>( control(), "LINK", config, - new NodeFirstRelationshipProcessor( relationshipGroupStore, cache, relationshipType ), false ) ); + new NodeFirstRelationshipProcessor( relationshipGroupStore, cache ), false ) ); add( new UpdateRecordsStep<>( control(), config, nodeStore ) ); } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeStage.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeStage.java index 3670661c3fb76..e8d789c908f12 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeStage.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeStage.java @@ -72,7 +72,7 @@ public NodeStage( Configuration config, IoMonitor writeMonitor, { super( "Nodes", config, ORDER_SEND_DOWNSTREAM ); this.cache = cache; - add( new InputIteratorBatcherStep<>( control(), config, nodes.iterator(), InputNode.class ) ); + add( new InputIteratorBatcherStep<>( control(), config, nodes.iterator(), InputNode.class, t -> true ) ); if ( !nodes.supportsMultiplePasses() ) { add( new InputEntityCacherStep<>( control(), config, inputCache.cacheNodes( MAIN ) ) ); diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ParallelBatchImporter.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ParallelBatchImporter.java index dcc0318983470..98260cbe720fb 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ParallelBatchImporter.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ParallelBatchImporter.java @@ -21,7 +21,9 @@ import 
java.io.File; import java.io.IOException; -import java.util.Set; +import java.util.Collection; +import java.util.Iterator; +import java.util.function.Predicate; import org.neo4j.collection.primitive.PrimitiveLongIterator; import org.neo4j.helpers.Exceptions; @@ -42,6 +44,7 @@ import org.neo4j.unsafe.impl.batchimport.cache.MemoryStatsVisitor; import org.neo4j.unsafe.impl.batchimport.cache.NodeLabelsCache; import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache; +import org.neo4j.unsafe.impl.batchimport.cache.NodeType; import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdGenerator; import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdMapper; import org.neo4j.unsafe.impl.batchimport.input.Collector; @@ -49,7 +52,6 @@ import org.neo4j.unsafe.impl.batchimport.input.InputCache; import org.neo4j.unsafe.impl.batchimport.input.InputNode; import org.neo4j.unsafe.impl.batchimport.input.InputRelationship; -import org.neo4j.unsafe.impl.batchimport.input.PerTypeRelationshipSplitter; import org.neo4j.unsafe.impl.batchimport.staging.DynamicProcessorAssigner; import org.neo4j.unsafe.impl.batchimport.staging.ExecutionMonitor; import org.neo4j.unsafe.impl.batchimport.staging.Stage; @@ -57,12 +59,10 @@ import org.neo4j.unsafe.impl.batchimport.store.BatchingNeoStores; import org.neo4j.unsafe.impl.batchimport.store.io.IoMonitor; -import static java.lang.Math.max; +import static java.lang.Long.max; import static java.lang.String.format; import static java.lang.System.currentTimeMillis; import static org.neo4j.helpers.Format.bytes; -import static org.neo4j.helpers.collection.Iterators.asSet; -import static org.neo4j.io.ByteUnit.mebiBytes; import static org.neo4j.unsafe.impl.batchimport.AdditionalInitialIds.EMPTY; import static org.neo4j.unsafe.impl.batchimport.Configuration.withBatchSize; import static org.neo4j.unsafe.impl.batchimport.SourceOrCachedInputIterable.cachedForSure; @@ -155,6 +155,7 @@ public void doImport( Input input ) throws IOException // Things 
that we need to close later. The reason they're not in the try-with-resource statement // is that we need to close, and set to null, at specific points preferably. So use good ol' finally block. + long maxMemory = config.maxMemoryUsage(); NodeRelationshipCache nodeRelationshipCache = null; NodeLabelsCache nodeLabelsCache = null; long startTime = currentTimeMillis(); @@ -175,7 +176,7 @@ public void doImport( Input input ) throws IOException InputIterable relationships = input.relationships(); InputIterable cachedNodes = cachedForSure( nodes, inputCache.nodes( MAIN, true ) ); InputIterable cachedRelationships = - cachedForSure( relationships, inputCache.relationships( MAIN, true ) ); + cachedForSure( relationships, inputCache.relationships( MAIN, false ) ); RelationshipStore relationshipStore = neoStore.getRelationshipStore(); @@ -201,10 +202,10 @@ public void doImport( Input input ) throws IOException relationships, nodeRelationshipCache, idMapper, badCollector, inputCache, neoStore ); executeStage( calculateDenseNodesStage ); + long availableMemory = maxMemory - totalMemoryUsageOf( nodeRelationshipCache, idMapper ); importRelationships( nodeRelationshipCache, storeUpdateMonitor, neoStore, writeMonitor, - idMapper, cachedRelationships, inputCache, - calculateDenseNodesStage.getRelationshipTypes( Long.MAX_VALUE ), - calculateDenseNodesStage.getRelationshipTypes( 100 ) ); + idMapper, cachedRelationships, calculateDenseNodesStage.getDistribution(), + availableMemory ); // Release this potentially really big piece of cached data long peakMemoryUsage = totalMemoryUsageOf( idMapper, nodeRelationshipCache ); @@ -214,8 +215,8 @@ public void doImport( Input input ) throws IOException nodeRelationshipCache.close(); nodeRelationshipCache = null; - new RelationshipGroupDefragmenter( config, executionMonitor ).run( - max( max( peakMemoryUsage, highNodeId * 4 ), mebiBytes( 1 ) ), neoStore, highNodeId ); + new RelationshipGroupDefragmenter( config, executionMonitor ).run( max( 
maxMemory, peakMemoryUsage ), + neoStore, highNodeId ); // Stage 6 -- count nodes per label and labels per node nodeLabelsCache = new NodeLabelsCache( AUTO, neoStore.getLabelRepository().getHighId() ); @@ -281,7 +282,7 @@ private long totalMemoryUsageOf( MemoryStatsVisitor.Visitable... users ) private void importRelationships( NodeRelationshipCache nodeRelationshipCache, CountingStoreUpdateMonitor storeUpdateMonitor, BatchingNeoStores neoStore, IoMonitor writeMonitor, IdMapper idMapper, InputIterable relationships, - InputCache inputCache, Object[] allRelationshipTypes, Object[] minorityRelationshipTypes ) + RelationshipTypeDistribution typeDistribution, long freeMemoryForDenseNodeCache ) { // Imports the relationships from the Input. This isn't a straight forward as importing nodes, // since keeping track of and updating heads of relationship chains in scenarios where most nodes @@ -295,65 +296,72 @@ private void importRelationships( NodeRelationshipCache nodeRelationshipCache, // finally there will be one Node --> Relationship and Relationship --> Relationship stage linking // all sparse relationship chains together. - Set minorityRelationshipTypeSet = asSet( minorityRelationshipTypes ); - PerTypeRelationshipSplitter perTypeIterator = new PerTypeRelationshipSplitter( - relationships.iterator(), - allRelationshipTypes, - type -> minorityRelationshipTypeSet.contains( type ), - neoStore.getRelationshipTypeRepository(), - inputCache ); - long nextRelationshipId = 0; Configuration relationshipConfig = withBatchSize( config, neoStore.getRelationshipStore().getRecordsPerPage() ); Configuration nodeConfig = withBatchSize( config, neoStore.getNodeStore().getRecordsPerPage() ); - for ( int i = 0; perTypeIterator.hasNext(); i++ ) + Iterator> rounds = nodeRelationshipCache.splitRelationshipTypesIntoRounds( + typeDistribution.iterator(), freeMemoryForDenseNodeCache ); + + // Do multiple rounds of relationship importing. 
Each round fits as many relationship types + // as it can (comparing with worst-case memory usage and available memory). + int typesImported = 0; + int round = 0; + for ( round = 0; rounds.hasNext(); round++ ) { - // Stage 3a -- relationships, properties - nodeRelationshipCache.setForwardScan( true ); - Object currentType = perTypeIterator.currentType(); - int currentTypeId = neoStore.getRelationshipTypeRepository().getOrCreateId( currentType ); + // Figure out which types we can fit in node-->relationship cache memory. + // Types go from biggest to smallest group and so towards the end there will be + // smaller and more groups per round in this loop + Collection typesToImportThisRound = rounds.next(); + boolean thisIsTheOnlyRound = round == 0 && !rounds.hasNext(); - InputIterator perType = perTypeIterator.next(); - String topic = " [:" + currentType + "] (" + - (i + 1) + "/" + allRelationshipTypes.length + ")"; - final RelationshipStage relationshipStage = new RelationshipStage( topic, config, - writeMonitor, perType, idMapper, neoStore, nodeRelationshipCache, - storeUpdateMonitor, nextRelationshipId ); + // Import relationships and their properties + nodeRelationshipCache.setForwardScan( true, true/*dense*/ ); + String range = typesToImportThisRound.size() == 1 + ? String.valueOf( typesImported + 1 ) + : (typesImported + 1) + "-" + (typesImported + typesToImportThisRound.size()); + String topic = " " + range + "/" + typeDistribution.getNumberOfRelationshipTypes(); + Predicate typeFilter = thisIsTheOnlyRound + ? 
relationship -> true // optimization when all rels are imported in this round + : relationship -> typesToImportThisRound.contains( relationship.typeAsObject() ); + RelationshipStage relationshipStage = new RelationshipStage( topic, config, + writeMonitor, typeFilter, relationships.iterator(), idMapper, neoStore, + nodeRelationshipCache, storeUpdateMonitor, nextRelationshipId ); executeStage( relationshipStage ); - // Stage 4a -- set node nextRel fields for dense nodes + int nodeTypes = thisIsTheOnlyRound ? NodeType.NODE_TYPE_ALL : NodeType.NODE_TYPE_DENSE; + + // Set node nextRel fields for dense nodes executeStage( new NodeFirstRelationshipStage( topic, nodeConfig, neoStore.getNodeStore(), - neoStore.getTemporaryRelationshipGroupStore(), nodeRelationshipCache, true/*dense*/, - currentTypeId ) ); + neoStore.getTemporaryRelationshipGroupStore(), nodeRelationshipCache, nodeTypes ) ); - // Stage 5a -- link relationship chains together for dense nodes - nodeRelationshipCache.setForwardScan( false ); + // Link relationship chains together for dense nodes + nodeRelationshipCache.setForwardScan( false, true/*dense*/ ); executeStage( new RelationshipLinkbackStage( topic, relationshipConfig, neoStore.getRelationshipStore(), nodeRelationshipCache, nextRelationshipId, - relationshipStage.getNextRelationshipId(), true/*dense*/ ) ); + relationshipStage.getNextRelationshipId(), nodeTypes ) ); nextRelationshipId = relationshipStage.getNextRelationshipId(); - nodeRelationshipCache.clearChangedChunks( true/*dense*/ ); // cheap higher level clearing + typesImported += typesToImportThisRound.size(); } - String topic = " Sparse"; - nodeRelationshipCache.setForwardScan( true ); - // Stage 4b -- set node nextRel fields for sparse nodes - executeStage( new NodeFirstRelationshipStage( topic, nodeConfig, neoStore.getNodeStore(), - neoStore.getTemporaryRelationshipGroupStore(), nodeRelationshipCache, false/*sparse*/, -1 ) ); - - // Stage 5b -- link relationship chains together for sparse 
nodes - nodeRelationshipCache.setForwardScan( false ); - executeStage( new RelationshipLinkbackStage( topic, relationshipConfig, neoStore.getRelationshipStore(), - nodeRelationshipCache, 0, nextRelationshipId, false/*sparse*/ ) ); - - if ( minorityRelationshipTypes.length > 0 ) + // There's an optimization above which will piggy-back sparse linking on the dense linking + // if all relationships are imported in one round. The sparse linking below will be done if + // there were multiple passes of dense linking above. + if ( round > 1 ) { - // Do some batch insertion style random-access insertions for super small minority types - executeStage( new BatchInsertRelationshipsStage( config, idMapper, - perTypeIterator.getMinorityRelationships(), neoStore, nextRelationshipId ) ); + // Set node nextRel fields for sparse nodes + String topic = " Sparse"; + nodeRelationshipCache.setForwardScan( true, false/*sparse*/ ); + executeStage( new NodeFirstRelationshipStage( topic, nodeConfig, neoStore.getNodeStore(), + neoStore.getTemporaryRelationshipGroupStore(), nodeRelationshipCache, NodeType.NODE_TYPE_SPARSE ) ); + + // Link relationship chains together for sparse nodes + nodeRelationshipCache.setForwardScan( false, false/*sparse*/ ); + executeStage( new RelationshipLinkbackStage( topic, relationshipConfig, + neoStore.getRelationshipStore(), nodeRelationshipCache, 0, nextRelationshipId, + NodeType.NODE_TYPE_SPARSE ) ); } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ReadNodeRecordsByCacheStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ReadNodeRecordsByCacheStep.java index 5398bc7332dc9..06700e146c81a 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ReadNodeRecordsByCacheStep.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ReadNodeRecordsByCacheStep.java @@ -37,17 +37,17 @@ */ public class ReadNodeRecordsByCacheStep extends AbstractStep { - private final boolean 
denseNodes; + private final int nodeTypes; private final NodeRelationshipCache cache; private final int batchSize; private final RecordCursor recordCursor; public ReadNodeRecordsByCacheStep( StageControl control, Configuration config, - NodeStore nodeStore, NodeRelationshipCache cache, boolean denseNodes ) + NodeStore nodeStore, NodeRelationshipCache cache, int nodeTypes ) { super( control, ">", config ); this.cache = cache; - this.denseNodes = denseNodes; + this.nodeTypes = nodeTypes; this.batchSize = config.batchSize(); this.recordCursor = nodeStore.newRecordCursor( nodeStore.newRecord() ); } @@ -77,7 +77,7 @@ public void run() assertHealthy(); try ( NodeVisitor visitor = new NodeVisitor() ) { - cache.visitChangedNodes( visitor, denseNodes ); + cache.visitChangedNodes( visitor, nodeTypes ); } endOfUpstream(); } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipEncoderStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipEncoderStep.java index 5798fc87b6822..84505803214e4 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipEncoderStep.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipEncoderStep.java @@ -62,10 +62,11 @@ protected void forkedProcess( int id, int processors, Batch private long fromNodeId; private long toNodeId; private long highCacheId; + private final long maxCacheLength; public RelationshipGroupCache( NumberArrayFactory arrayFactory, long maxMemory, long highNodeId ) { @@ -71,7 +74,8 @@ public RelationshipGroupCache( NumberArrayFactory arrayFactory, long maxMemory, bytes( maxMemory ) + " where " + bytes( memoryDedicatedToCounting ) + " was dedicated to group counting" ); } - this.cache = arrayFactory.newByteArray( memoryLeftForGroupCache / GROUP_ENTRY_SIZE, new byte[GROUP_ENTRY_SIZE] ); + maxCacheLength = memoryLeftForGroupCache / GROUP_ENTRY_SIZE; + this.cache = arrayFactory.newDynamicByteArray( 
max( 1_000, maxCacheLength / 100 ), new byte[GROUP_ENTRY_SIZE] ); } /** @@ -125,7 +129,7 @@ public long prepare( long fromNodeId ) for ( long nodeId = fromNodeId; nodeId < highNodeId; nodeId++ ) { int count = groupCount( nodeId ); - if ( highCacheId + count > cache.length() ) + if ( highCacheId + count > maxCacheLength ) { // Cannot include this one, so up until the previous is good return this.toNodeId = nodeId; @@ -161,7 +165,7 @@ public boolean put( RelationshipGroupRecord groupRecord ) long baseIndex = offsets.get( rebase( nodeId ) ); // grouCount is extra validation, really int groupCount = groupCount( nodeId ); - long index = scanForFreeFrom( baseIndex, groupCount, groupRecord.getType() ); + long index = scanForFreeFrom( baseIndex, groupCount, groupRecord.getType(), groupRecord.getOwningNode() ); // Put the group at this index cache.setByte( index, 0, (byte) 1 ); @@ -172,7 +176,7 @@ public boolean put( RelationshipGroupRecord groupRecord ) return true; } - private long scanForFreeFrom( long startIndex, int groupCount, int type ) + private long scanForFreeFrom( long startIndex, int groupCount, int type, long owningNodeId ) { long desiredIndex = -1; long freeIndex = -1; @@ -191,7 +195,8 @@ private long scanForFreeFrom( long startIndex, int groupCount, int type ) int existingType = cache.get3ByteInt( candidateIndex, 1 ); if ( existingType == type ) { - throw new IllegalStateException( "Tried to put multiple groups with same type " + type ); + throw new IllegalStateException( + "Tried to put multiple groups with same type " + type + " for node " + owningNodeId ); } if ( type < existingType ) diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkbackProcessor.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkbackProcessor.java deleted file mode 100644 index 16f0e3f5621a2..0000000000000 --- 
a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkbackProcessor.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Copyright (c) 2002-2017 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ -package org.neo4j.unsafe.impl.batchimport; - -import org.neo4j.graphdb.Direction; -import org.neo4j.kernel.impl.store.record.RelationshipRecord; -import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache; -import static org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdMapper.ID_NOT_FOUND; - -/** - * Links the {@code previous} fields in {@link RelationshipRecord relationship records}. This is done after - * a forward pass where the {@code next} fields are linked. 
- */ -public class RelationshipLinkbackProcessor implements RecordProcessor -{ - private final NodeRelationshipCache cache; - private final boolean denseNodes; - - public RelationshipLinkbackProcessor( NodeRelationshipCache cache, boolean denseNodes ) - { - this.cache = cache; - this.denseNodes = denseNodes; - } - - @Override - public boolean process( RelationshipRecord record ) - { - boolean isLoop = record.getFirstNode() == record.getSecondNode(); - boolean firstIsDense = cache.isDense( record.getFirstNode() ); - boolean changed = false; - if ( isLoop ) - { - if ( firstIsDense == denseNodes ) - { - long prevRel = cache.getAndPutRelationship( record.getFirstNode(), - Direction.BOTH, record.getId(), false ); - if ( prevRel == ID_NOT_FOUND ) - { // First one - record.setFirstInFirstChain( true ); - record.setFirstInSecondChain( true ); - prevRel = cache.getCount( record.getFirstNode(), Direction.BOTH ); - } - record.setFirstPrevRel( prevRel ); - record.setSecondPrevRel( prevRel ); - changed = true; - } - } - else - { - // Start node - if ( firstIsDense == denseNodes ) - { - long firstPrevRel = cache.getAndPutRelationship( record.getFirstNode(), - Direction.OUTGOING, record.getId(), false ); - if ( firstPrevRel == ID_NOT_FOUND ) - { // First one - record.setFirstInFirstChain( true ); - firstPrevRel = cache.getCount( record.getFirstNode(), Direction.OUTGOING ); - } - record.setFirstPrevRel( firstPrevRel ); - changed = true; - } - - // End node - boolean secondIsDense = cache.isDense( record.getSecondNode() ); - if ( secondIsDense == denseNodes ) - { - long secondPrevRel = cache.getAndPutRelationship( record.getSecondNode(), - Direction.INCOMING, record.getId(), false ); - if ( secondPrevRel == ID_NOT_FOUND ) - { // First one - record.setFirstInSecondChain( true ); - secondPrevRel = cache.getCount( record.getSecondNode(), Direction.INCOMING ); - } - record.setSecondPrevRel( secondPrevRel ); - changed = true; - } - } - return changed; - } - - @Override - public void 
done() - { // Nothing to do here - } -} diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkbackStage.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkbackStage.java index 8e30815ade734..cfdc4a7b7ac78 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkbackStage.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkbackStage.java @@ -45,12 +45,12 @@ public class RelationshipLinkbackStage extends Stage { public RelationshipLinkbackStage( String topic, Configuration config, RelationshipStore store, - NodeRelationshipCache cache, long lowRelationshipId, long highRelationshipId, boolean denseNodes ) + NodeRelationshipCache cache, long lowRelationshipId, long highRelationshipId, int nodeTypes ) { super( "Relationship --> Relationship" + topic, config ); add( new ReadRecordsStep<>( control(), config, store, backwards( lowRelationshipId, highRelationshipId, config ) ) ); - add( new RelationshipLinkbackStep( control(), config, cache, denseNodes ) ); + add( new RelationshipLinkbackStep( control(), config, cache, nodeTypes ) ); add( new UpdateRecordsStep<>( control(), config, store ) ); } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkbackStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkbackStep.java index 5002b5b9d538d..7509aab4ade9f 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkbackStep.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkbackStep.java @@ -22,6 +22,7 @@ import org.neo4j.graphdb.Direction; import org.neo4j.kernel.impl.store.record.RelationshipRecord; import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache; +import org.neo4j.unsafe.impl.batchimport.cache.NodeType; import 
org.neo4j.unsafe.impl.batchimport.staging.ForkedProcessorStep; import org.neo4j.unsafe.impl.batchimport.staging.StageControl; @@ -35,14 +36,14 @@ public class RelationshipLinkbackStep extends ForkedProcessorStep { private final NodeRelationshipCache cache; - private final boolean denseNodes; + private final int nodeTypes; public RelationshipLinkbackStep( StageControl control, Configuration config, - NodeRelationshipCache cache, boolean denseNodes ) + NodeRelationshipCache cache, int nodeTypes ) { super( control, "LINK", config, 0 ); this.cache = cache; - this.denseNodes = denseNodes; + this.nodeTypes = nodeTypes; } @Override @@ -77,19 +78,20 @@ public boolean process( RelationshipRecord record, int id, int processors ) boolean firstIsDense = cache.isDense( record.getFirstNode() ); boolean changed = false; boolean isLoop = record.getFirstNode() == record.getSecondNode(); + int typeId = record.getType(); if ( isLoop ) { - if ( firstIsDense == denseNodes ) + if ( NodeType.matchesDense( nodeTypes, firstIsDense ) ) { if ( processFirst ) { long prevRel = cache.getAndPutRelationship( record.getFirstNode(), - Direction.BOTH, record.getId(), false ); + typeId, Direction.BOTH, record.getId(), false ); if ( prevRel == ID_NOT_FOUND ) { // First one record.setFirstInFirstChain( true ); record.setFirstInSecondChain( true ); - prevRel = cache.getCount( record.getFirstNode(), Direction.BOTH ); + prevRel = cache.getCount( record.getFirstNode(), typeId, Direction.BOTH ); } record.setFirstPrevRel( prevRel ); record.setSecondPrevRel( prevRel ); @@ -100,16 +102,16 @@ public boolean process( RelationshipRecord record, int id, int processors ) else { // Start node - if ( firstIsDense == denseNodes ) + if ( NodeType.matchesDense( nodeTypes, firstIsDense ) ) { if ( processFirst ) { long firstPrevRel = cache.getAndPutRelationship( record.getFirstNode(), - Direction.OUTGOING, record.getId(), false ); + typeId, Direction.OUTGOING, record.getId(), false ); if ( firstPrevRel == ID_NOT_FOUND ) { 
// First one record.setFirstInFirstChain( true ); - firstPrevRel = cache.getCount( record.getFirstNode(), Direction.OUTGOING ); + firstPrevRel = cache.getCount( record.getFirstNode(), typeId, Direction.OUTGOING ); } record.setFirstPrevRel( firstPrevRel ); } @@ -118,16 +120,16 @@ public boolean process( RelationshipRecord record, int id, int processors ) // End node boolean secondIsDense = cache.isDense( record.getSecondNode() ); - if ( secondIsDense == denseNodes ) + if ( NodeType.matchesDense( nodeTypes, secondIsDense ) ) { if ( processSecond ) { long secondPrevRel = cache.getAndPutRelationship( record.getSecondNode(), - Direction.INCOMING, record.getId(), false ); + typeId, Direction.INCOMING, record.getId(), false ); if ( secondPrevRel == ID_NOT_FOUND ) { // First one record.setFirstInSecondChain( true ); - secondPrevRel = cache.getCount( record.getSecondNode(), Direction.INCOMING ); + secondPrevRel = cache.getCount( record.getSecondNode(), typeId, Direction.INCOMING ); } record.setSecondPrevRel( secondPrevRel ); } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipStage.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipStage.java index ff50c3a69f7da..db7cb78bb2b4b 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipStage.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipStage.java @@ -19,6 +19,8 @@ */ package org.neo4j.unsafe.impl.batchimport; +import java.util.function.Predicate; + import org.neo4j.graphdb.Direction; import org.neo4j.kernel.impl.store.PropertyStore; import org.neo4j.kernel.impl.store.RelationshipStore; @@ -74,12 +76,13 @@ public class RelationshipStage extends Stage private AssignRelationshipIdBatchStep idAssigner; public RelationshipStage( String topic, Configuration config, IoMonitor writeMonitor, + Predicate typeFilter, InputIterator relationships, IdMapper idMapper, BatchingNeoStores neoStore, 
NodeRelationshipCache cache, EntityStoreUpdaterStep.Monitor storeUpdateMonitor, long firstRelationshipId ) { super( "Relationships" + topic, config, ORDER_SEND_DOWNSTREAM ); - add( new InputIteratorBatcherStep<>( control(), config, relationships, InputRelationship.class ) ); + add( new InputIteratorBatcherStep<>( control(), config, relationships, InputRelationship.class, typeFilter ) ); RelationshipStore relationshipStore = neoStore.getRelationshipStore(); PropertyStore propertyStore = neoStore.getPropertyStore(); diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipTypeCheckerStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipTypeCheckerStep.java index 1550e8fbb8928..4baed296eab5e 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipTypeCheckerStep.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipTypeCheckerStep.java @@ -21,11 +21,9 @@ import org.apache.commons.lang3.mutable.MutableLong; -import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; @@ -53,7 +51,7 @@ public class RelationshipTypeCheckerStep extends ProcessorStep Integer.compare( (Integer)e2.getKey(), (Integer)e1.getKey() ); private final Map> typeCheckers = new ConcurrentHashMap<>(); private final BatchingRelationshipTypeTokenRepository typeTokenRepository; - private Map.Entry[] sortedTypes; + private RelationshipTypeDistribution distribution; public RelationshipTypeCheckerStep( StageControl control, Configuration config, BatchingRelationshipTypeTokenRepository typeTokenRepository ) @@ -81,7 +79,7 @@ protected void done() localTypes.forEach( (type,localCount) -> mergedTypes.computeIfAbsent( type, t -> new MutableLong() ).add( localCount.longValue() ) ) ); - sortedTypes = 
mergedTypes.entrySet().toArray( new Map.Entry[mergedTypes.size()] ); + Map.Entry[] sortedTypes = mergedTypes.entrySet().toArray( new Map.Entry[mergedTypes.size()] ); if ( sortedTypes.length > 0 ) { Comparator> comparator = sortedTypes[0].getKey() instanceof Integer ? @@ -101,32 +99,12 @@ protected void done() { typeTokenRepository.getOrCreateId( sortedTypes[i].getKey() ); } + distribution = new RelationshipTypeDistribution( sortedTypes ); super.done(); } - /** - * Returns relationship types which have number of relationships equal to or lower than the given threshold. - * - * @param belowOrEqualToThreshold threshold where relationship types which have this amount of relationships - * or less will be returned. - * @return the order of which to order {@link InputRelationship} when importing relationships. - * The order in which these relationships are returned will be the reverse order of relationship type ids. - * There are two modes of relationship types here, one is user defined String where this step - * have full control of assigning ids to those and will do so based on size of types. The other mode - * is where types are given as ids straight away (as Integer) where the order is already set and so - * the types will not be sorted by size (which is simply an optimization anyway). 
- */ - public Object[] getRelationshipTypes( long belowOrEqualToThreshold ) + public RelationshipTypeDistribution getDistribution() { - List result = new ArrayList<>(); - for ( Map.Entry candidate : sortedTypes ) - { - if ( candidate.getValue().longValue() <= belowOrEqualToThreshold ) - { - result.add( candidate.getKey() ); - } - } - - return result.toArray(); + return distribution; } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipTypeDistribution.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipTypeDistribution.java new file mode 100644 index 0000000000000..0fcaad65ad9c2 --- /dev/null +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipTypeDistribution.java @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.unsafe.impl.batchimport; + +import org.apache.commons.lang3.mutable.MutableLong; + +import java.util.Iterator; +import java.util.Map; +import org.neo4j.helpers.collection.Iterators; + +/** + * Keeps data about how relationships are distributed between different types. 
+ */ +public class RelationshipTypeDistribution implements Iterable> +{ + private final Map.Entry[] sortedTypes; + + public RelationshipTypeDistribution( Map.Entry[] sortedTypes ) + { + this.sortedTypes = sortedTypes; + } + + @Override + public Iterator> iterator() + { + return Iterators.iterator( sortedTypes ); + } + + public int getNumberOfRelationshipTypes() + { + return sortedTypes.length; + } +} diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/BaseNumberArray.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/BaseNumberArray.java index 2c515aacbc622..e57503f86db54 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/BaseNumberArray.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/BaseNumberArray.java @@ -25,7 +25,7 @@ abstract class BaseNumberArray> implements NumberArray { protected final int itemSize; - private final long base; + protected final long base; /** * @param itemSize byte size of each item in this array. 
diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/NodeRelationshipCache.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/NodeRelationshipCache.java index bbb455cb66f1a..510090e5edd60 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/NodeRelationshipCache.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/NodeRelationshipCache.java @@ -19,12 +19,23 @@ */ package org.neo4j.unsafe.impl.batchimport.cache; +import org.apache.commons.lang3.mutable.MutableLong; + import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.Map.Entry; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.neo4j.graphdb.Direction; +import org.neo4j.helpers.collection.PrefetchingIterator; +import org.neo4j.kernel.impl.util.IoPrimitiveUtils; +import static java.lang.Long.min; import static java.lang.Math.toIntExact; /** @@ -47,8 +58,8 @@ * so in order to make further changes to N, if another thread accesses N the semantics will no longer hold. * * Since multiple threads are making changes external memory synchronization is also required in between - * a phase of making changes using {@link #getAndPutRelationship(long, Direction, long, boolean)} and e.g - * {@link #visitChangedNodes(NodeChangeVisitor, boolean)}. + * a phase of making changes using {@link #getAndPutRelationship(long, int, Direction, long, boolean)} and e.g + * {@link #visitChangedNodes(NodeChangeVisitor, int)}. 
*/ public class NodeRelationshipCache implements MemoryStatsVisitor.Visitable { @@ -75,6 +86,9 @@ public class NodeRelationshipCache implements MemoryStatsVisitor.Visitable private static final int BIG_COUNT_MASK = 0x20000000; private static final int COUNT_FLAGS_MASKS = DENSE_NODE_CHANGED_MASK | SPARSE_NODE_CHANGED_MASK | BIG_COUNT_MASK; private static final int COUNT_MASK = ~COUNT_FLAGS_MASKS; + private static final int TYPE_SIZE = 2; + public static final int GROUP_ENTRY_SIZE = TYPE_SIZE + ID_SIZE/*next*/ + + ID_AND_COUNT_SIZE * Direction.values().length; private ByteArray array; private byte[] chunkChangedArray; @@ -124,12 +138,12 @@ public long incrementCount( long nodeId ) /** * Should only be used by tests */ - void setCount( long nodeId, long count, Direction direction ) + void setCount( long nodeId, long count, int typeId, Direction direction ) { if ( isDense( nodeId ) ) { long relGroupId = all48Bits( array, nodeId, SPARSE_ID_OFFSET ); - relGroupCache.getAndSetCount( relGroupId, direction, count ); + relGroupCache.setCount( relGroupId, typeId, direction, count ); } else { @@ -166,23 +180,23 @@ void setCount( long nodeId, long count, Direction direction ) * so the bigCounts array is shared between all different types of counts, because big counts are so rare * * @param array {@link ByteArray} to set count in - * @param nodeId node id, i.e. array index + * @param index node id, i.e. 
array index * @param offset offset on that array index (a ByteArray feature) * @param count count to set at this position */ - private void setCount( ByteArray array, long nodeId, int offset, long count ) + private void setCount( ByteArray array, long index, int offset, long count ) { - assertValidCount( nodeId, count ); + assertValidCount( index, count ); if ( count > MAX_SMALL_COUNT ) { - int rawCount = array.getInt( nodeId, offset ); + int rawCount = array.getInt( index, offset ); int slot; if ( rawCount == -1 || !isBigCount( rawCount ) ) { // Allocate a slot in the bigCounts array slot = bigCountsCursor.getAndIncrement(); - array.setInt( nodeId, offset, BIG_COUNT_MASK | slot ); + array.setInt( index, offset, BIG_COUNT_MASK | slot ); } else { @@ -192,7 +206,7 @@ private void setCount( ByteArray array, long nodeId, int offset, long count ) } else { // We can simply set it - array.setInt( nodeId, offset, toIntExact( count ) ); + array.setInt( index, offset, toIntExact( count ) ); } } @@ -257,11 +271,11 @@ private static int countValue( int rawCount ) return rawCount & COUNT_MASK; } - private long incrementCount( ByteArray array, long nodeId, int offset ) + private long incrementCount( ByteArray array, long index, int offset ) { - array = array.at( nodeId ); - long count = getCount( array, nodeId, offset ) + 1; - setCount( array, nodeId, offset, count ); + array = array.at( index ); + long count = getCount( array, index, offset ) + 1; + setCount( array, index, offset, count ); return count; } @@ -291,12 +305,13 @@ private boolean isDense( ByteArray array, long nodeId ) * the {@code direction}. * * @param nodeId node to update relationship head for. + * @param typeId relationship type id. * @param direction {@link Direction} this node represents for this relationship. * @param firstRelId the relationship id which is now the head of this chain. * @param incrementCount as side-effect also increment count for this chain. 
* @return the previous head of the updated relationship chain. */ - public long getAndPutRelationship( long nodeId, Direction direction, long firstRelId, + public long getAndPutRelationship( long nodeId, int typeId, Direction direction, long firstRelId, boolean incrementCount ) { if ( firstRelId > MAX_RELATIONSHIP_ID ) @@ -320,11 +335,10 @@ public long getAndPutRelationship( long nodeId, Direction direction, long firstR { if ( existingId == EMPTY ) { - existingId = relGroupCache.allocate(); + existingId = relGroupCache.allocate( typeId ); setRelationshipId( array, nodeId, existingId ); - wasChanged = false; // no need to clear when we just allocated it } - return relGroupCache.putRelationship( existingId, direction, firstRelId, incrementCount, wasChanged ); + return relGroupCache.getAndPutRelationship( existingId, typeId, direction, firstRelId, incrementCount ); } // Don't increment count for sparse node since that has already been done in a previous pass @@ -350,6 +364,19 @@ private void markChunkAsChanged( long nodeId, boolean dense ) } } + long calculateNumberOfDenseNodes() + { + long count = 0; + for ( long i = 0; i < highNodeId; i++ ) + { + if ( isDense( i ) ) + { + count++; + } + } + return count; + } + private int chunkOf( long nodeId ) { return toIntExact( nodeId / chunkSize ); @@ -373,7 +400,7 @@ private boolean markAsChanged( ByteArray array, long nodeId, int mask ) return changeBitWasFlipped; } - private static boolean nodeIsChanged( ByteArray array, long nodeId, long mask ) + private boolean nodeIsChanged( ByteArray array, long nodeId, long mask ) { int bits = array.getInt( nodeId, SPARSE_COUNT_OFFSET ); @@ -387,10 +414,11 @@ private static boolean nodeIsChanged( ByteArray array, long nodeId, long mask ) { return false; } - return (bits & mask) != 0; + boolean changeBitIsSet = (bits & mask) != 0; + return changeBitIsSet == forward; } - private void setRelationshipId( ByteArray array, long nodeId, long firstRelId ) + private static void 
setRelationshipId( ByteArray array, long nodeId, long firstRelId ) { array.set6ByteLong( nodeId, SPARSE_ID_OFFSET, firstRelId ); } @@ -436,7 +464,7 @@ public long getFirstRel( long nodeId, GroupVisitor visitor ) /** * First a note about tracking which nodes have been updated with new relationships by calls to - * {@link #getAndPutRelationship(long, Direction, long, boolean)}: + * {@link #getAndPutRelationship(long, int, Direction, long, boolean)}: * * We use two high bits of the count field in the "main" array to mark whether or not a change * have been made to a node. One bit for a sparse node and one for a dense. Sparse and dense nodes @@ -451,9 +479,34 @@ public long getFirstRel( long nodeId, GroupVisitor visitor ) * * @param forward {@code true} if going forward and having change marked as a set bit, otherwise * change is marked with an unset bit. + * @param denseNodes whether or not this is about dense nodes. If so then some additional cache + * preparation work needs to be done. */ - public void setForwardScan( boolean forward ) + public void setForwardScan( boolean forward, boolean denseNodes ) { + if ( this.forward == forward ) + { + return; + } + + // There's some additional preparations to do for dense nodes between each pass, + // this is because that piece of memory is reused. + if ( denseNodes ) + { + if ( forward ) + { + // Clear relationship group cache and references to it + visitChangedNodes( ( nodeId, array ) -> setRelationshipId( array, nodeId, EMPTY ), + NodeType.NODE_TYPE_DENSE ); + clearChangedChunks( true ); + relGroupCache.clear(); + } + else + { + // Keep the relationship group cache entries, but clear all relationship chain heads + relGroupCache.clearRelationshipIds(); + } + } this.forward = forward; } @@ -466,16 +519,18 @@ public void setForwardScan( boolean forward ) * can be used for the next type import. * * @param nodeId node to get count for. + * @param typeId relationship type id to get count for. 
* @param direction {@link Direction} to get count for. * @return count (degree) of the requested relationship chain. */ - public long getCount( long nodeId, Direction direction ) + public long getCount( long nodeId, int typeId, Direction direction ) { ByteArray array = this.array.at( nodeId ); - if ( isDense( array, nodeId ) ) + boolean dense = isDense( array, nodeId ); + if ( dense ) { // Indirection into rel group cache long id = getRelationshipId( array, nodeId ); - return id == EMPTY ? 0 : relGroupCache.getAndSetCount( id, direction, 0 ); + return id == EMPTY ? 0 : relGroupCache.getAndResetCount( id, typeId, direction ); } return getCount( array, nodeId, SPARSE_COUNT_OFFSET ); @@ -488,62 +543,85 @@ public interface GroupVisitor * Type can be decided on the outside since there'll be only one type per node. * * @param nodeId node id. - * @param next next relationship group. + * @param typeId relationship type id. * @param out first outgoing relationship id. * @param in first incoming relationship id. * @param loop first loop relationship id. * @return the created relationship group id. */ - long visit( long nodeId, long next, long out, long in, long loop ); + long visit( long nodeId, int typeId, long out, long in, long loop ); } - public static final GroupVisitor NO_GROUP_VISITOR = (nodeId, next, out, in, loop) -> -1; + public static final GroupVisitor NO_GROUP_VISITOR = (nodeId, typeId, out, in, loop) -> -1; private class RelGroupCache implements AutoCloseable, MemoryStatsVisitor.Visitable { - private static final int NEXT_OFFSET = 0; - private static final int BASE_IDS_OFFSET = ID_SIZE; + private static final int TYPE_OFFSET = 0; + private static final int NEXT_OFFSET = TYPE_SIZE; + private static final int BASE_IDS_OFFSET = NEXT_OFFSET + ID_SIZE; // Used for testing high id values. 
Should always be zero in production + private final byte[] DEFAULT_VALUE = minusOneBytes( GROUP_ENTRY_SIZE ); + private final long chunkSize; private final long base; private final ByteArray array; private final AtomicLong nextFreeId; RelGroupCache( NumberArrayFactory arrayFactory, long chunkSize, long base ) { + this.chunkSize = chunkSize; this.base = base; assert chunkSize > 0; - this.array = arrayFactory.newDynamicByteArray( chunkSize, - minusOneBytes( ID_SIZE/*next*/ + (ID_AND_COUNT_SIZE) * Direction.values().length ) ); + this.array = arrayFactory.newDynamicByteArray( chunkSize, DEFAULT_VALUE ); this.nextFreeId = new AtomicLong( base ); } - private void clearRelationships( ByteArray array, long relGroupId ) + private void clearIndex( ByteArray array, long relGroupId ) { - array.set6ByteLong( relGroupId, directionOffset( Direction.OUTGOING ), EMPTY ); - array.set6ByteLong( relGroupId, directionOffset( Direction.INCOMING ), EMPTY ); - array.set6ByteLong( relGroupId, directionOffset( Direction.BOTH ), EMPTY ); + array.set( relGroupId, DEFAULT_VALUE ); } - /** - * For dense nodes we reset count after reading because we only ever need - * that value once and the piece of memory holding this value will be reused for another - * relationship chain for this node after this point in time, where the count should - * restart from 0. 
- */ - long getAndSetCount( long id, Direction direction, long newCount ) + long getAndResetCount( long relGroupIndex, int typeId, Direction direction ) + { + long index = rebase( relGroupIndex ); + while ( index != EMPTY ) + { + ByteArray array = this.array.at( index ); + if ( getTypeId( array, index ) == typeId ) + { + int offset = countOffset( direction ); + long count = NodeRelationshipCache.this.getCount( array, index, offset ); + NodeRelationshipCache.this.setCount( array, index, offset, 0 ); + return count; + } + index = getNext( array, index ); + } + return 0; + } + + void setCount( long relGroupIndex, int typeId, Direction direction, long count ) { - id = rebase( id ); - ByteArray array = this.array.at( id ); - if ( id == EMPTY ) + long index = rebase( relGroupIndex ); + while ( index != EMPTY ) { - return 0; + ByteArray array = this.array.at( index ); + if ( getTypeId( array, index ) == typeId ) + { + NodeRelationshipCache.this.setCount( array, index, countOffset( direction ), count ); + break; + } + index = getNext( array, index ); } + } - int offset = countOffset( direction ); - long count = getCount( array, id, offset ); - setCount( array, id, offset, newCount ); - return count; + long getNext( ByteArray array, long index ) + { + return all48Bits( array, index, NEXT_OFFSET ); + } + + int getTypeId( ByteArray array, long index ) + { + return IoPrimitiveUtils.shortToUnsignedInt( array.getShort( index, TYPE_OFFSET ) ); } /** @@ -561,60 +639,94 @@ private long nextFreeId() private long visitGroup( long nodeId, long relGroupIndex, GroupVisitor visitor ) { - long index = rebase( relGroupIndex ); - ByteArray array = this.array.at( index ); - long out = all48Bits( array, index, directionOffset( Direction.OUTGOING ) ); - long in = all48Bits( array, index, directionOffset( Direction.INCOMING ) ); - long loop = all48Bits( array, index, directionOffset( Direction.BOTH ) ); - long next = all48Bits( array, index, NEXT_OFFSET ); - long nextId = out == EMPTY && in == 
EMPTY && loop == EMPTY ? EMPTY : - visitor.visit( nodeId, next, out, in, loop ); - - // Save the returned next id for later, when the next group for this node is created - // then we know what to point this group's next to. - array.set6ByteLong( index, NEXT_OFFSET, nextId ); - return nextId; + long currentIndex = rebase( relGroupIndex ); + long first = EMPTY; + while ( currentIndex != EMPTY ) + { + ByteArray array = this.array.at( currentIndex ); + long out = all48Bits( array, currentIndex, idOffset( Direction.OUTGOING ) ); + int typeId = getTypeId( array, currentIndex ); + long in = all48Bits( array, currentIndex, idOffset( Direction.INCOMING ) ); + long loop = all48Bits( array, currentIndex, idOffset( Direction.BOTH ) ); + long next = getNext( array, currentIndex ); + long nextId = out == EMPTY && in == EMPTY && loop == EMPTY + ? EMPTY + : visitor.visit( nodeId, typeId, out, in, loop ); + if ( first == EMPTY ) + { // This is the one we return + first = nextId; + } + currentIndex = next; + } + return first; } - private int directionOffset( Direction direction ) + private int idOffset( Direction direction ) { return BASE_IDS_OFFSET + (direction.ordinal() * ID_AND_COUNT_SIZE); } private int countOffset( Direction direction ) { - return directionOffset( direction ) + ID_SIZE; + return idOffset( direction ) + ID_SIZE; } - long allocate() + long allocate( int typeId ) { - return nextFreeId(); + long index = nextFreeId(); + long rebasedIndex = rebase( index ); + ByteArray array = this.array.at( rebasedIndex ); + clearIndex( array, rebasedIndex ); + short shortTypeId = IoPrimitiveUtils.safeCastIntToUnsignedShort( typeId ); + array.setShort( rebasedIndex, TYPE_OFFSET, shortTypeId ); + return index; } - long putRelationship( long relGroupIndex, Direction direction, - long relId, boolean increment, boolean clear ) + private long getAndPutRelationship( long relGroupIndex, int typeId, Direction direction, + long relId, boolean incrementCount ) { long index = rebase( 
relGroupIndex ); + index = findOrAllocateIndex( index, typeId ); ByteArray array = this.array.at( index ); - int directionOffset = directionOffset( direction ); - long previousId; - if ( clear ) - { - clearRelationships( array, index ); - previousId = EMPTY; - } - else - { - previousId = all48Bits( array, index, directionOffset ); - } + int directionOffset = idOffset( direction ); + long previousId = all48Bits( array, index, directionOffset ); array.set6ByteLong( index, directionOffset, relId ); - if ( increment ) + if ( incrementCount ) { incrementCount( array, index, countOffset( direction ) ); } return previousId; } + private void clearRelationshipIds( ByteArray array, long index ) + { + array.set6ByteLong( index, idOffset( Direction.OUTGOING ), EMPTY ); + array.set6ByteLong( index, idOffset( Direction.INCOMING ), EMPTY ); + array.set6ByteLong( index, idOffset( Direction.BOTH ), EMPTY ); + } + + private long findOrAllocateIndex( long index, int typeId ) + { + long lastIndex = index; + ByteArray array = this.array.at( index ); + while ( index != EMPTY ) + { + lastIndex = index; + array = this.array.at( index ); + int candidateTypeId = getTypeId( array, index ); + if ( candidateTypeId == typeId ) + { + return index; + } + index = getNext( array, index ); + } + + // No such found, create at the end + long newIndex = allocate( typeId ); + array.set6ByteLong( lastIndex, NEXT_OFFSET, newIndex ); + return newIndex; + } + @Override public void close() { @@ -629,6 +741,24 @@ public void acceptMemoryStatsVisitor( MemoryStatsVisitor visitor ) { nullSafeMemoryStatsVisitor( array, visitor ); } + + public void clear() + { + nextFreeId.set( base ); + } + + public void clearRelationshipIds() + { + long highId = rebase( nextFreeId.get() ); + for ( long i = 0; i < highId; ) + { + ByteArray chunk = array.at( i ); + for ( int j = 0; j < chunkSize && i < highId; j++, i++ ) + { + clearRelationshipIds( chunk, i ); + } + } + } } @Override @@ -677,19 +807,23 @@ public interface 
NodeChangeVisitor /** * Efficiently visits changed nodes, e.g. nodes that have had any relationship chain updated by - * {@link #getAndPutRelationship(long, Direction, long, boolean)}. + * {@link #getAndPutRelationship(long, int, Direction, long, boolean)}. * * @param visitor {@link NodeChangeVisitor} which will be notified about all changes. - * @param denseNodes {@code true} for visiting changed dense nodes, {@code false} for visiting - * changed sparse nodes. + * @param nodeTypes which types to visit (dense/sparse). */ - public void visitChangedNodes( NodeChangeVisitor visitor, boolean denseNodes ) + public void visitChangedNodes( NodeChangeVisitor visitor, int nodeTypes ) { - long mask = changeMask( denseNodes ); - byte chunkMask = chunkChangeMask( denseNodes ); + long denseMask = changeMask( true ); + long sparseMask = changeMask( false ); + byte denseChunkMask = chunkChangeMask( true ); + byte sparseChunkMask = chunkChangeMask( false ); for ( long nodeId = 0; nodeId < highNodeId; ) { - if ( !chunkHasChange( nodeId, chunkMask ) ) + boolean chunkHasChanged = + (NodeType.isDense( nodeTypes ) && chunkHasChange( nodeId, denseChunkMask )) || + (NodeType.isSparse( nodeTypes ) && chunkHasChange( nodeId, sparseChunkMask )); + if ( !chunkHasChanged ) { nodeId += chunkSize; continue; @@ -698,7 +832,11 @@ public void visitChangedNodes( NodeChangeVisitor visitor, boolean denseNodes ) ByteArray chunk = array.at( nodeId ); for ( int i = 0; i < chunkSize && nodeId < highNodeId; i++, nodeId++ ) { - if ( isDense( chunk, nodeId ) == denseNodes && nodeIsChanged( chunk, nodeId, mask ) ) + boolean nodeHasChanged = + (NodeType.isDense( nodeTypes ) && nodeIsChanged( chunk, nodeId, denseMask )) || + (NodeType.isSparse( nodeTypes ) && nodeIsChanged( chunk, nodeId, sparseMask )); + + if ( nodeHasChanged && NodeType.matchesDense( nodeTypes, isDense( array, nodeId ) ) ) { visitor.change( nodeId, chunk ); } @@ -711,7 +849,7 @@ public void visitChangedNodes( NodeChangeVisitor visitor, 
boolean denseNodes ) * * @param denseNodes {@code true} for clearing marked dense nodes, {@code false} for clearing marked sparse nodes. */ - public void clearChangedChunks( boolean denseNodes ) + private void clearChangedChunks( boolean denseNodes ) { // Executed by a single thread, so no synchronized required byte chunkMask = chunkChangeMask( denseNodes ); @@ -726,4 +864,48 @@ private boolean chunkHasChange( long nodeId, byte chunkMask ) int chunkId = chunkOf( nodeId ); return (chunkChangedArray[chunkId] & chunkMask) != 0; } + + public Iterator> splitRelationshipTypesIntoRounds( + Iterator> allTypes, long freeMemoryForDenseNodeCache ) + { + PrefetchingIterator> peekableAllTypes = + new PrefetchingIterator>() + { + @Override + protected Entry fetchNextOrNull() + { + return allTypes.hasNext() ? allTypes.next() : null; + } + }; + + long numberOfDenseNodes = calculateNumberOfDenseNodes(); + return new PrefetchingIterator>() + { + @Override + protected Collection fetchNextOrNull() + { + long currentSetOfRelationshipsMemoryUsage = 0; + Set typesToImportThisRound = new HashSet<>(); + while ( peekableAllTypes.hasNext() ) + { + // Calculate worst-case scenario + Map.Entry type = peekableAllTypes.peek(); + long relationshipCountForThisType = type.getValue().longValue(); + long maxDenseNodesForThisType = min( numberOfDenseNodes, relationshipCountForThisType * 2/*nodes/rel*/ ); + long memoryUsageForThisType = maxDenseNodesForThisType * NodeRelationshipCache.GROUP_ENTRY_SIZE; + if ( currentSetOfRelationshipsMemoryUsage + memoryUsageForThisType > freeMemoryForDenseNodeCache && + currentSetOfRelationshipsMemoryUsage > 0 ) + { + // OK we've filled what we can into the cache, or worst-case at least + // it's now time to import these types + break; + } + peekableAllTypes.next(); + typesToImportThisRound.add( type.getKey() ); + currentSetOfRelationshipsMemoryUsage += memoryUsageForThisType; + } + return typesToImportThisRound.isEmpty() ? 
null : typesToImportThisRound; + } + }; + } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/NodeType.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/NodeType.java new file mode 100644 index 0000000000000..a80a18b7863c4 --- /dev/null +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/NodeType.java @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.unsafe.impl.batchimport.cache; + +/** + * Convenient way of selecting nodes based on their dense status. + */ +public class NodeType +{ + public static final int NODE_TYPE_DENSE = 0x1; + public static final int NODE_TYPE_SPARSE = 0x2; + public static final int NODE_TYPE_ALL = NODE_TYPE_DENSE | NODE_TYPE_SPARSE; + + public static boolean isDense( int nodeTypes ) + { + return has( nodeTypes, NODE_TYPE_DENSE ); + } + + public static boolean isSparse( int nodeTypes ) + { + return has( nodeTypes, NODE_TYPE_SPARSE ); + } + + private static boolean has( int nodeTypes, int mask ) + { + return (nodeTypes & mask) != 0; + } + + public static boolean matchesDense( int nodeTypes, boolean isDense ) + { + int mask = isDense ? 
NODE_TYPE_DENSE : NODE_TYPE_SPARSE; + return has( nodeTypes, mask ); + } +} diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/PerTypeRelationshipSplitter.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/PerTypeRelationshipSplitter.java deleted file mode 100644 index 580ffe984f5f1..0000000000000 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/PerTypeRelationshipSplitter.java +++ /dev/null @@ -1,216 +0,0 @@ -/* - * Copyright (c) 2002-2017 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ -package org.neo4j.unsafe.impl.batchimport.input; - -import java.io.IOException; -import java.util.function.Predicate; -import java.util.function.ToIntFunction; - -import org.neo4j.helpers.collection.PrefetchingIterator; -import org.neo4j.unsafe.impl.batchimport.InputIterator; - -import static java.lang.Integer.max; - -/** - * Takes an {@link InputIterator} and splits up {@link InputRelationship relationships} by type. - * Uses {@link InputCache} to populate (all except the first type) on first pass, then reading from the - * cached relationships per type for all the other types. 
- */ -public class PerTypeRelationshipSplitter extends PrefetchingIterator> -{ - private static final String MINORITY_TYPE = "minority"; - private final Object[] allRelationshipTypes; - private final InputIterator actual; - private final Predicate minorityRelationshipTypes; - private final ToIntFunction typeToId; - private final InputCache inputCache; - - private int typeCursor; - private boolean allMinority; - - public PerTypeRelationshipSplitter( InputIterator actual, Object[] allRelationshipTypes, - Predicate minorityRelationshipTypes, ToIntFunction typeToId, InputCache inputCache ) - { - this.actual = actual; - this.allRelationshipTypes = allRelationshipTypes; - this.minorityRelationshipTypes = minorityRelationshipTypes; - this.typeToId = typeToId; - this.inputCache = inputCache; - } - - @Override - protected InputIterator fetchNextOrNull() - { - while ( typeCursor < allRelationshipTypes.length ) - { - Object type = allRelationshipTypes[typeCursor++]; - if ( typeCursor == 1 ) - { - if ( minorityRelationshipTypes.test( type ) ) - { - allMinority = true; - return null; - } - - // This is the first relationship type. If we're lucky and this is a new import - // then this type will also represent the type the most relationship are of. - // We'll basically return the actual iterator, but with a filter to only return - // this type. The other relationships will be cached by type. - return new FilteringAndPerTypeCachingInputIterator( actual, type ); - } - - // This isn't the first relationship type. The first pass cached relationships - // per type on disk into InputCache. Simply get the correct one and return. - if ( !minorityRelationshipTypes.test( type ) ) - { - return inputCache.relationships( cacheSubType( type ), true/*delete after use*/ ).iterator(); - } - } - return null; - } - - String cacheSubType( Object type ) - { - return String.valueOf( typeToId.applyAsInt( type ) ); - } - - /** - * @return the type currently being iterated over, e.g. 
the type that the {@link InputIterator} returned - * from the most recent call to iterates over. - */ - public Object currentType() - { - return allRelationshipTypes[typeCursor-1]; - } - - int highestTypeId() - { - int highest = 0; - for( Object type : allRelationshipTypes ) - { - highest = max( highest, typeToId.applyAsInt( type ) ); - } - return highest; - } - - public InputIterator getMinorityRelationships() - { - return allMinority ? actual : inputCache.relationships( MINORITY_TYPE, true ).iterator(); - } - - public class FilteringAndPerTypeCachingInputIterator extends InputIterator.Delegate - { - private final Object currentType; - // index into this array is actual typeId, which may be 0 - 2^16-1. - private final Receiver[] receivers; - private final Receiver minorityReceiver; - private final InputRelationship[] transport = new InputRelationship[1]; - - @SuppressWarnings( "unchecked" ) - public FilteringAndPerTypeCachingInputIterator( InputIterator actual, Object currentType ) - { - super( actual ); - this.currentType = currentType; - this.receivers = new Receiver[highestTypeId()+1]; - - try - { - for ( Object type : allRelationshipTypes ) - { - if ( type.equals( currentType ) ) - { - // We're iterating over this type, let's not cache it. Also accounted for in the - // receivers array above, which is 1 less than number of types in total. 
- continue; - } - - // Collect all types that are consider a minority into one set of relationships - if ( !minorityRelationshipTypes.test( type ) ) - { - int typeId = typeToId.applyAsInt( type ); - receivers[typeId] = inputCache.cacheRelationships( cacheSubType( type ) ); - } - } - minorityReceiver = inputCache.cacheRelationships( MINORITY_TYPE ); - } - catch ( IOException e ) - { - throw new InputException( "Error creating a cacher", e ); - } - } - - @Override - protected InputRelationship fetchNextOrNull() - { - while ( true ) - { - InputRelationship candidate = super.fetchNextOrNull(); - if ( candidate == null ) - { - // No more relationships - return null; - } - - Object type = candidate.typeAsObject(); - if ( type.equals( currentType ) ) - { - // This is a relationship of the requested type - return candidate; - } - - // This is a relationships of a different type, cache it - try - { - Receiver receiver = minorityRelationshipTypes.test( type ) ? - minorityReceiver : receivers[typeToId.applyAsInt( type )]; - transport[0] = candidate; - receiver.receive( transport ); - } - catch ( IOException e ) - { - throw new InputException( "Error caching relationship " + candidate, e ); - } - } - } - - @Override - public void close() - { - try - { - for ( Receiver receiver : receivers ) - { - if ( receiver != null ) - { - receiver.close(); - } - } - minorityReceiver.close(); - } - catch ( IOException e ) - { - throw new InputException( "Error closing cacher", e ); - } - - // This will delegate to the actual iterator and so close the external input iterator - super.close(); - } - } -} diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/IteratorBatcherStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/IteratorBatcherStep.java index d0aa8200e9574..5661c9d9134c5 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/IteratorBatcherStep.java +++ 
b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/IteratorBatcherStep.java @@ -22,6 +22,7 @@ import java.lang.reflect.Array; import java.util.Arrays; import java.util.Iterator; +import java.util.function.Predicate; import org.neo4j.unsafe.impl.batchimport.Configuration; @@ -33,29 +34,36 @@ public abstract class IteratorBatcherStep extends IoProducerStep private final Iterator data; private final Class itemClass; protected long cursor; + private final Predicate filter; - public IteratorBatcherStep( StageControl control, Configuration config, Iterator data, Class itemClass ) + public IteratorBatcherStep( StageControl control, Configuration config, Iterator data, Class itemClass, + Predicate filter ) { super( control, config ); this.data = data; this.itemClass = itemClass; + this.filter = filter; } @Override protected Object nextBatchOrNull( long ticket, int batchSize ) { - if ( !data.hasNext() ) - { - return null; - } - @SuppressWarnings( "unchecked" ) T[] batch = (T[]) Array.newInstance( itemClass, batchSize ); int i = 0; - for ( ; i < batchSize && data.hasNext(); i++ ) + for ( ; i < batchSize && data.hasNext(); ) + { + T candidate = data.next(); + if ( filter.test( candidate ) ) + { + batch[i++] = candidate; + cursor++; + } + } + + if ( i == 0 ) { - batch[i] = data.next(); - cursor++; + return null; // marks the end } return i == batchSize ? 
batch : Arrays.copyOf( batch, i ); } diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkbackStageTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkbackStageTest.java index 9ffaa03de8a98..08db644fccbad 100644 --- a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkbackStageTest.java +++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkbackStageTest.java @@ -26,6 +26,7 @@ import org.neo4j.kernel.impl.store.id.IdGeneratorImpl; import org.neo4j.kernel.impl.store.record.RelationshipRecord; import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache; +import org.neo4j.unsafe.impl.batchimport.cache.NodeType; import org.neo4j.unsafe.impl.batchimport.cache.NumberArrayFactory; import org.neo4j.unsafe.impl.batchimport.staging.ExecutionMonitors; import org.neo4j.unsafe.impl.batchimport.staging.ExecutionSupervisors; @@ -41,7 +42,7 @@ public void reservedIdIsSkipped() throws Exception long highId = 5; RelationshipStore store = StoreWithReservedId.newRelationshipStoreMock( highId ); RelationshipLinkbackStage stage = new RelationshipLinkbackStage( "Test", - Configuration.DEFAULT, store, newCache(), 0, highId, false ); + Configuration.DEFAULT, store, newCache(), 0, highId, NodeType.NODE_TYPE_SPARSE ); ExecutionSupervisors.superviseExecution( ExecutionMonitors.invisible(), Configuration.DEFAULT, stage ); diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/RelationshipTypeCheckerStepTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/RelationshipTypeCheckerStepTest.java index 48f0594b15a3e..7078fea7606fd 100644 --- a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/RelationshipTypeCheckerStepTest.java +++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/RelationshipTypeCheckerStepTest.java @@ -19,13 +19,16 @@ */ package 
org.neo4j.unsafe.impl.batchimport; +import org.apache.commons.lang3.mutable.MutableLong; import org.junit.Rule; import org.junit.Test; import org.mockito.InOrder; import java.util.ArrayList; import java.util.Collections; +import java.util.Iterator; import java.util.List; +import java.util.Map.Entry; import java.util.TreeSet; import org.neo4j.kernel.impl.store.record.RelationshipRecord; @@ -39,6 +42,8 @@ import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; + +import static org.neo4j.helpers.collection.Iterables.reverse; import static org.neo4j.helpers.collection.Iterators.loop; import static org.neo4j.unsafe.impl.batchimport.Configuration.DEFAULT; import static org.neo4j.unsafe.impl.batchimport.input.Group.GLOBAL; @@ -75,12 +80,10 @@ private void shouldReturnRelationshipTypeIdsInReverseOrderOfTokenCreation( boole step.done(); // THEN - Object[] processed = step.getRelationshipTypes( 100 ); - InOrder inOrder = inOrder( repository ); - for ( Object type : reversed( processed ) ) + for ( Entry type : reverse( step.getDistribution() ) ) { - inOrder.verify( repository ).getOrCreateId( type ); + inOrder.verify( repository ).getOrCreateId( type.getKey() ); } inOrder.verifyNoMoreInteractions(); } @@ -101,11 +104,11 @@ public void shouldReturnRelationshipTypesInDescendingOrder() throws Throwable // THEN TreeSet expected = idsOf( relationships ); - Object[] processed = step.getRelationshipTypes( 100 ); - int i = 0; + Iterator> processed = step.getDistribution().iterator(); for ( Object expectedType : loop( expected.descendingIterator() ) ) { - assertEquals( expectedType, processed[i++] ); + Entry entry = processed.next(); + assertEquals( expectedType, entry.getKey() ); } } diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/cache/NodeRelationshipCacheTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/cache/NodeRelationshipCacheTest.java index 
2838dfbd7cc09..e7048e23782a0 100644 --- a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/cache/NodeRelationshipCacheTest.java +++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/cache/NodeRelationshipCacheTest.java @@ -19,6 +19,7 @@ */ package org.neo4j.unsafe.impl.batchimport.cache; +import org.apache.commons.lang3.mutable.MutableLong; import org.junit.After; import org.junit.Rule; import org.junit.Test; @@ -28,20 +29,28 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; import org.neo4j.collection.primitive.Primitive; import org.neo4j.collection.primitive.PrimitiveLongObjectMap; import org.neo4j.collection.primitive.PrimitiveLongSet; import org.neo4j.graphdb.Direction; +import org.neo4j.helpers.collection.Iterators; +import org.neo4j.helpers.collection.Pair; import org.neo4j.test.rule.RandomRule; import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache.GroupVisitor; import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache.NodeChangeVisitor; import static java.lang.Math.max; +import static java.lang.Math.toIntExact; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyLong; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -126,30 +135,31 @@ public void shouldObserveFirstRelationshipAsEmptyInEachDirection() throws Except // GIVEN cache = new NodeRelationshipCache( NumberArrayFactory.AUTO, 1, 100, base ); int nodes = 100; + int typeId = 5; Direction[] directions = Direction.values(); GroupVisitor groupVisitor = mock( GroupVisitor.class ); - cache.setForwardScan( true ); + cache.setForwardScan( true, true ); 
cache.setHighNodeId( nodes+1 ); for ( int i = 0; i < nodes; i++ ) { assertEquals( -1L, cache.getFirstRel( nodes, groupVisitor ) ); cache.incrementCount( i ); - long previous = cache.getAndPutRelationship( i, directions[i % directions.length], + long previous = cache.getAndPutRelationship( i, typeId, directions[i % directions.length], random.nextInt( 1_000_000 ), true ); assertEquals( -1L, previous ); } // WHEN - cache.setForwardScan( false ); + cache.setForwardScan( false, true ); for ( int i = 0; i < nodes; i++ ) { - long previous = cache.getAndPutRelationship( i, directions[i % directions.length], + long previous = cache.getAndPutRelationship( i, typeId, directions[i % directions.length], random.nextInt( 1_000_000 ), false ); assertEquals( -1L, previous ); } // THEN - cache.setForwardScan( true ); + cache.setForwardScan( true, true ); for ( int i = 0; i < nodes; i++ ) { assertEquals( -1L, cache.getFirstRel( nodes, groupVisitor ) ); @@ -162,19 +172,20 @@ public void shouldResetCountAfterGetOnDenseNodes() throws Exception // GIVEN cache = new NodeRelationshipCache( NumberArrayFactory.AUTO, 1, 100, base ); long nodeId = 0; + int typeId = 3; cache.setHighNodeId( 1 ); cache.incrementCount( nodeId ); cache.incrementCount( nodeId ); - cache.getAndPutRelationship( nodeId, OUTGOING, 10, true ); - cache.getAndPutRelationship( nodeId, OUTGOING, 12, true ); + cache.getAndPutRelationship( nodeId, typeId, OUTGOING, 10, true ); + cache.getAndPutRelationship( nodeId, typeId, OUTGOING, 12, true ); assertTrue( cache.isDense( nodeId ) ); // WHEN - long count = cache.getCount( nodeId, OUTGOING ); + long count = cache.getCount( nodeId, typeId, OUTGOING ); assertEquals( 2, count ); // THEN - assertEquals( 0, cache.getCount( nodeId, OUTGOING ) ); + assertEquals( 0, cache.getCount( nodeId, typeId, OUTGOING ) ); } @Test @@ -185,10 +196,11 @@ public void shouldGetAndPutRelationshipAroundChunkEdge() throws Exception // WHEN long nodeId = 1_000_000 - 1; - cache.setHighNodeId( nodeId+1 ); + 
int typeId = 10; + cache.setHighNodeId( nodeId + 1 ); Direction direction = Direction.OUTGOING; long relId = 10; - cache.getAndPutRelationship( nodeId, direction, relId, false ); + cache.getAndPutRelationship( nodeId, typeId, direction, relId, false ); // THEN assertEquals( relId, cache.getFirstRel( nodeId, mock( GroupVisitor.class ) ) ); @@ -198,6 +210,7 @@ public void shouldGetAndPutRelationshipAroundChunkEdge() throws Exception public void shouldPutRandomStuff() throws Exception { // GIVEN + int typeId = 10; int nodes = 10_000; PrimitiveLongObjectMap key = Primitive.longObjectMap( nodes ); cache = new NodeRelationshipCache( NumberArrayFactory.HEAP, 1, 1000, base ); @@ -219,7 +232,7 @@ public void shouldPutRandomStuff() throws Exception boolean dense = cache.isDense( nodeId ); Direction direction = random.among( Direction.values() ); long relationshipId = random.nextLong( 1_000_000 ); - long previousHead = cache.getAndPutRelationship( nodeId, direction, relationshipId, false ); + long previousHead = cache.getAndPutRelationship( nodeId, typeId, direction, relationshipId, false ); long[] keyIds = key.get( nodeId ); int keyIndex = dense ? 
direction.ordinal() : 0; if ( keyIds == null ) @@ -239,30 +252,32 @@ public void shouldPut6ByteRelationshipIds() throws Exception long sparseNode = 0; long denseNode = 1; long relationshipId = (1L << 48) - 2; + int typeId = 10; cache.setHighNodeId( 2 ); cache.incrementCount( denseNode ); // WHEN - assertEquals( -1L, cache.getAndPutRelationship( sparseNode, OUTGOING, relationshipId, false ) ); - assertEquals( -1L, cache.getAndPutRelationship( denseNode, OUTGOING, relationshipId, false ) ); + assertEquals( -1L, cache.getAndPutRelationship( sparseNode, typeId, OUTGOING, relationshipId, false ) ); + assertEquals( -1L, cache.getAndPutRelationship( denseNode, typeId, OUTGOING, relationshipId, false ) ); // THEN - assertEquals( relationshipId, cache.getAndPutRelationship( sparseNode, OUTGOING, 1, false ) ); - assertEquals( relationshipId, cache.getAndPutRelationship( denseNode, OUTGOING, 1, false ) ); + assertEquals( relationshipId, cache.getAndPutRelationship( sparseNode, typeId, OUTGOING, 1, false ) ); + assertEquals( relationshipId, cache.getAndPutRelationship( denseNode, typeId, OUTGOING, 1, false ) ); } @Test public void shouldFailFastIfTooBigRelationshipId() throws Exception { // GIVEN + int typeId = 10; cache = new NodeRelationshipCache( NumberArrayFactory.HEAP, 1, 100, base ); cache.setHighNodeId( 1 ); // WHEN - cache.getAndPutRelationship( 0, OUTGOING, (1L << 48) - 2, false ); + cache.getAndPutRelationship( 0, typeId, OUTGOING, (1L << 48) - 2, false ); try { - cache.getAndPutRelationship( 0, OUTGOING, (1L << 48) - 1, false ); + cache.getAndPutRelationship( 0, typeId, OUTGOING, (1L << 48) - 1, false ); fail( "Should fail" ); } catch ( IllegalArgumentException e ) @@ -277,6 +292,7 @@ public void shouldVisitChangedNodes() throws Exception { // GIVEN int nodes = 10; + int typeId = 10; cache = new NodeRelationshipCache( NumberArrayFactory.HEAP, 2, 100, base ); cache.setHighNodeId( nodes ); for ( long nodeId = 0; nodeId < nodes; nodeId++ ) @@ -292,7 +308,7 @@ public 
void shouldVisitChangedNodes() throws Exception for ( int i = 0; i < nodes / 2; i++ ) { long nodeId = random.nextLong( nodes ); - cache.getAndPutRelationship( nodeId, Direction.OUTGOING, random.nextLong( 1_000_000 ), false ); + cache.getAndPutRelationship( nodeId, typeId, Direction.OUTGOING, random.nextLong( 1_000_000 ), false ); boolean dense = cache.isDense( nodeId ); (dense ? keyDenseChanged : keySparseChanged).add( nodeId ); } @@ -304,7 +320,7 @@ public void shouldVisitChangedNodes() throws Exception // THEN (sparse) assertTrue( "Unexpected sparse change reported for " + nodeId, keySparseChanged.remove( nodeId ) ); }; - cache.visitChangedNodes( visitor, false/*sparse*/ ); + cache.visitChangedNodes( visitor, NodeType.NODE_TYPE_SPARSE ); assertTrue( "There was " + keySparseChanged.size() + " expected sparse changes that weren't reported", keySparseChanged.isEmpty() ); } @@ -316,7 +332,7 @@ public void shouldVisitChangedNodes() throws Exception // THEN (dense) assertTrue( "Unexpected dense change reported for " + nodeId, keyDenseChanged.remove( nodeId ) ); }; - cache.visitChangedNodes( visitor, true/*dense*/ ); + cache.visitChangedNodes( visitor, NodeType.NODE_TYPE_DENSE ); assertTrue( "There was " + keyDenseChanged.size() + " expected dense changes that weren reported", keyDenseChanged.isEmpty() ); } @@ -329,8 +345,9 @@ public void shouldFailFastOnTooHighCountOnNode() throws Exception cache = new NodeRelationshipCache( NumberArrayFactory.HEAP, 10, 100, base ); long nodeId = 5; long count = NodeRelationshipCache.MAX_COUNT - 1; + int typeId = 10; cache.setHighNodeId( 10 ); - cache.setCount( nodeId, count, OUTGOING ); + cache.setCount( nodeId, count, typeId, OUTGOING ); // WHEN cache.incrementCount( nodeId ); @@ -351,52 +368,53 @@ public void shouldKeepNextGroupIdForNextRound() throws Exception // GIVEN cache = new NodeRelationshipCache( NumberArrayFactory.HEAP, 1, 100, base ); long nodeId = 0; - cache.setHighNodeId( nodeId+1 ); + int typeId = 10; + 
cache.setHighNodeId( nodeId + 1 ); cache.incrementCount( nodeId ); GroupVisitor groupVisitor = mock( GroupVisitor.class ); - when( groupVisitor.visit( anyLong(), anyLong(), anyLong(), anyLong(), anyLong() ) ).thenReturn( 1L, 2L, 3L ); + when( groupVisitor.visit( anyLong(), anyInt(), anyLong(), anyLong(), anyLong() ) ).thenReturn( 1L, 2L, 3L ); long firstRelationshipGroupId; { // WHEN importing the first type long relationshipId = 10; - cache.getAndPutRelationship( nodeId, OUTGOING, relationshipId, true ); + cache.getAndPutRelationship( nodeId, typeId, OUTGOING, relationshipId, true ); firstRelationshipGroupId = cache.getFirstRel( nodeId, groupVisitor ); // THEN assertEquals( 1L, firstRelationshipGroupId ); - verify( groupVisitor ).visit( nodeId, -1L, relationshipId, -1L, -1L ); + verify( groupVisitor ).visit( nodeId, typeId, relationshipId, -1L, -1L ); // Also simulate going back again ("clearing" of the cache requires this) - cache.setForwardScan( false ); - cache.getAndPutRelationship( nodeId, OUTGOING, relationshipId, false ); - cache.setForwardScan( true ); + cache.setForwardScan( false, true ); + cache.getAndPutRelationship( nodeId, typeId, OUTGOING, relationshipId, false ); + cache.setForwardScan( true, true ); } long secondRelationshipGroupId; { // WHEN importing the second type long relationshipId = 11; - cache.getAndPutRelationship( nodeId, INCOMING, relationshipId, true ); + cache.getAndPutRelationship( nodeId, typeId, INCOMING, relationshipId, true ); secondRelationshipGroupId = cache.getFirstRel( nodeId, groupVisitor ); // THEN assertEquals( 2L, secondRelationshipGroupId ); - verify( groupVisitor ).visit( nodeId, firstRelationshipGroupId, -1, relationshipId, -1L ); + verify( groupVisitor ).visit( nodeId, typeId, -1, relationshipId, -1L ); // Also simulate going back again ("clearing" of the cache requires this) - cache.setForwardScan( false ); - cache.getAndPutRelationship( nodeId, OUTGOING, relationshipId, false ); - cache.setForwardScan( true ); + 
cache.setForwardScan( false, true ); + cache.getAndPutRelationship( nodeId, typeId, OUTGOING, relationshipId, false ); + cache.setForwardScan( true, true ); } { // WHEN importing the third type long relationshipId = 10; - cache.getAndPutRelationship( nodeId, BOTH, relationshipId, true ); + cache.getAndPutRelationship( nodeId, typeId, BOTH, relationshipId, true ); long thirdRelationshipGroupId = cache.getFirstRel( nodeId, groupVisitor ); assertEquals( 3L, thirdRelationshipGroupId ); - verify( groupVisitor ).visit( nodeId, secondRelationshipGroupId, -1L, -1L, relationshipId ); + verify( groupVisitor ).visit( nodeId, typeId, -1L, -1L, relationshipId ); } } @@ -406,11 +424,12 @@ public void shouldHaveSparseNodesWithBigCounts() throws Exception // GIVEN cache = new NodeRelationshipCache( NumberArrayFactory.HEAP, 1, 100, base ); long nodeId = 1; + int typeId = 10; cache.setHighNodeId( nodeId + 1 ); // WHEN long highCount = NodeRelationshipCache.MAX_COUNT - 100; - cache.setCount( nodeId, highCount, OUTGOING ); + cache.setCount( nodeId, highCount, typeId, OUTGOING ); long nextHighCount = cache.incrementCount( nodeId ); // THEN @@ -427,30 +446,118 @@ public void shouldHaveDenseNodesWithBigCounts() throws Exception // GIVEN cache = new NodeRelationshipCache( NumberArrayFactory.HEAP, 1, 100, base ); long nodeId = 1; + int typeId = 10; cache.setHighNodeId( nodeId + 1 ); - cache.setCount( nodeId, 2, OUTGOING ); // surely dense now - cache.getAndPutRelationship( nodeId, OUTGOING, 1, true ); - cache.getAndPutRelationship( nodeId, INCOMING, 2, true ); + cache.setCount( nodeId, 2, typeId, OUTGOING ); // surely dense now + cache.getAndPutRelationship( nodeId, typeId, OUTGOING, 1, true ); + cache.getAndPutRelationship( nodeId, typeId, INCOMING, 2, true ); // WHEN long highCountOut = NodeRelationshipCache.MAX_COUNT - 100; long highCountIn = NodeRelationshipCache.MAX_COUNT - 50; - cache.setCount( nodeId, highCountOut, OUTGOING ); - cache.setCount( nodeId, highCountIn, INCOMING ); - 
cache.getAndPutRelationship( nodeId, OUTGOING, 1, true /*increment count*/ ); - cache.getAndPutRelationship( nodeId, INCOMING, 2, true /*increment count*/ ); + cache.setCount( nodeId, highCountOut, typeId, OUTGOING ); + cache.setCount( nodeId, highCountIn, typeId, INCOMING ); + cache.getAndPutRelationship( nodeId, typeId, OUTGOING, 1, true /*increment count*/ ); + cache.getAndPutRelationship( nodeId, typeId, INCOMING, 2, true /*increment count*/ ); + + // THEN + assertEquals( highCountOut + 1, cache.getCount( nodeId, typeId, OUTGOING ) ); + assertEquals( highCountIn + 1, cache.getCount( nodeId, typeId, INCOMING ) ); + } + + @Test + public void shouldCacheMultipleDenseNodeRelationshipHeads() throws Exception + { + // GIVEN + cache = new NodeRelationshipCache( NumberArrayFactory.HEAP, 1 ); + cache.setHighNodeId( 10 ); + long nodeId = 3; + cache.setCount( nodeId, 10, /*these do not matter ==>*/ 0, OUTGOING ); + + // WHEN + Map,Long> firstRelationshipIds = new HashMap<>(); + int typeCount = 3; + for ( int typeId = 0, relationshipId = 0; typeId < typeCount; typeId++ ) + { + for ( Direction direction : Direction.values() ) + { + long firstRelationshipId = relationshipId++; + cache.getAndPutRelationship( nodeId, typeId, direction, firstRelationshipId, true ); + firstRelationshipIds.put( Pair.of( typeId, direction ), firstRelationshipId ); + } + } + AtomicInteger visitCount = new AtomicInteger(); + GroupVisitor visitor = new GroupVisitor() + { + @Override + public long visit( long nodeId, int typeId, long out, long in, long loop ) + { + visitCount.incrementAndGet(); + assertEquals( firstRelationshipIds.get( Pair.of( typeId, OUTGOING ) ).longValue(), out ); + assertEquals( firstRelationshipIds.get( Pair.of( typeId, INCOMING ) ).longValue(), in ); + assertEquals( firstRelationshipIds.get( Pair.of( typeId, BOTH ) ).longValue(), loop ); + return 0; + } + }; + cache.getFirstRel( nodeId, visitor ); // THEN - assertEquals( highCountOut + 1, cache.getCount( nodeId, OUTGOING ) ); - 
assertEquals( highCountIn + 1, cache.getCount( nodeId, INCOMING ) ); + assertEquals( typeCount, visitCount.get() ); + } + + @Test + public void shouldSplitUpRelationshipTypesInBatches() throws Exception + { + // GIVEN + int denseNodeThreshold = 5; + int numberOfNodes = 100; + int numberOfTypes = 10; + cache = new NodeRelationshipCache( NumberArrayFactory.HEAP, denseNodeThreshold ); + cache.setHighNodeId( numberOfNodes + 1 ); + Direction[] directions = Direction.values(); + for ( int i = 0; i < numberOfNodes; i++ ) + { + int count = random.nextInt( 1, denseNodeThreshold * 2 ); + cache.setCount( i, count, random.nextInt( numberOfTypes ), random.among( directions ) ); + } + int numberOfDenseNodes = toIntExact( cache.calculateNumberOfDenseNodes() ); + Map types = new HashMap<>(); + for ( int i = 0; i < numberOfTypes; i++ ) + { + types.put( "TYPE" + i, new MutableLong( random.nextInt( 1, 1_000 ) ) ); + } + + // WHEN enough memory for all types + { + long memory = numberOfDenseNodes * numberOfTypes * NodeRelationshipCache.GROUP_ENTRY_SIZE; + Collection fits = + Iterators.single( cache.splitRelationshipTypesIntoRounds( types.entrySet().iterator(), memory ) ); + // THEN + assertEquals( types.size(), fits.size() ); + } + + // and WHEN less than enough memory for all types + { + int memory = numberOfDenseNodes * numberOfTypes / 5 * NodeRelationshipCache.GROUP_ENTRY_SIZE; + int total = 0; + Iterator> rounds = + cache.splitRelationshipTypesIntoRounds( types.entrySet().iterator(), memory ); + while ( rounds.hasNext() ) + { + Collection round = rounds.next(); + total += round.size(); + } + assertEquals( types.size(), total ); + } } private void testNode( NodeRelationshipCache link, long node, Direction direction ) { - long count = link.getCount( node, direction ); - assertEquals( -1, link.getAndPutRelationship( node, direction, 5, false ) ); - assertEquals( 5, link.getAndPutRelationship( node, direction, 10, false ) ); - assertEquals( count, link.getCount( node, direction ) ); + 
int typeId = 0; // doesn't matter here because it's all sparse + long count = link.getCount( node, typeId, direction ); + assertEquals( -1, link.getAndPutRelationship( node, typeId, direction, 5, false ) ); + assertEquals( 5, link.getAndPutRelationship( node, typeId, direction, 10, false ) ); + assertEquals( count, link.getCount( node, typeId, direction ) ); } private long findNode( NodeRelationshipCache link, long nodeCount, boolean isDense ) diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/input/PerTypeRelationshipSplitterTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/input/PerTypeRelationshipSplitterTest.java deleted file mode 100644 index 9aea0d0a3dea0..0000000000000 --- a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/input/PerTypeRelationshipSplitterTest.java +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Copyright (c) 2002-2017 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . 
- */ -package org.neo4j.unsafe.impl.batchimport.input; - -import org.junit.Rule; -import org.junit.Test; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Set; - -import org.neo4j.io.fs.DefaultFileSystemAbstraction; -import org.neo4j.kernel.impl.store.format.standard.StandardV3_0; -import org.neo4j.test.rule.RandomRule; -import org.neo4j.test.rule.TestDirectory; -import org.neo4j.unsafe.impl.batchimport.Configuration; -import org.neo4j.unsafe.impl.batchimport.InputIterable; -import org.neo4j.unsafe.impl.batchimport.InputIterator; - -import static org.junit.Assert.assertEquals; -import static org.neo4j.helpers.collection.Iterators.filter; -import static org.neo4j.unsafe.impl.batchimport.input.InputEntity.NO_PROPERTIES; -import static org.neo4j.unsafe.impl.batchimport.input.SimpleInputIteratorWrapper.wrap; - -public class PerTypeRelationshipSplitterTest -{ - @Rule - public final RandomRule random = new RandomRule().withSeed( 1460373085111L ); - @Rule - public final TestDirectory directory = TestDirectory.testDirectory(); - - @Test - public void shouldReturnTypesOneByOne() throws Exception - { - // GIVEN - Collection expected = randomRelationships(); - InputIterable relationships = wrap( "test", expected ); - InputCache inputCache = new InputCache( new DefaultFileSystemAbstraction(), directory.directory(), - StandardV3_0.RECORD_FORMATS, Configuration.DEFAULT ); - PerTypeRelationshipSplitter perType = new PerTypeRelationshipSplitter( relationships.iterator(), - types( expected ), type -> false, type -> Integer.parseInt( type.toString() ), inputCache ); - - // WHEN - Set all = new HashSet<>(); - while ( perType.hasNext() ) - { - try ( InputIterator relationshipsOfThisType = perType.next() ) - { - // THEN - Object type = perType.currentType(); - Collection expectedRelationshipsOfThisType = nodesOf( filter( - relationship -> relationship.typeAsObject().equals( type ), expected.iterator() ) 
); - assertEquals( expectedRelationshipsOfThisType, nodesOf( relationshipsOfThisType ) ); - all.addAll( expectedRelationshipsOfThisType ); - } - } - - assertEquals( nodesOf( expected.iterator() ), all ); - inputCache.close(); - } - - /** - * Get the nodes of the relationships. We use those to identify relationships, since they have no ID - * and no equals method (which they don't really need). - * - * @param relationship {@link InputRelationship} to get node ids from. - * @return {@link Collection} of node ids from {@link InputRelationship} relationships. - */ - private Collection nodesOf( Iterator relationship ) - { - Collection nodes = new HashSet<>(); - while ( relationship.hasNext() ) - { - nodes.add( relationship.next().startNode() ); - } - return nodes; - } - - private Object[] types( Collection expected ) - { - Set types = new HashSet<>(); - for ( InputRelationship relationship : expected ) - { - types.add( relationship.typeAsObject() ); - } - return types.toArray(); - } - - private Collection randomRelationships() - { - Collection result = new ArrayList<>(); - int count = 100; - Group group = Group.GLOBAL; - boolean typeIds = random.nextBoolean(); - for ( int i = 0; i < count; i++ ) - { - int typeId = random.nextInt( 5 ); - Object node = (long)i; - InputRelationship relationship = new InputRelationship( "test", i, i, NO_PROPERTIES, null, - group, node, group, node, - typeIds ? null : String.valueOf( typeId ), - typeIds ? typeId : null ); - result.add( relationship ); - } - return result; - } -}