diff --git a/community/consistency-check/src/test/java/org/neo4j/unsafe/impl/batchimport/ParallelBatchImporterTest.java b/community/consistency-check/src/test/java/org/neo4j/unsafe/impl/batchimport/ParallelBatchImporterTest.java index b466b0688982e..9bc566bd3eff9 100644 --- a/community/consistency-check/src/test/java/org/neo4j/unsafe/impl/batchimport/ParallelBatchImporterTest.java +++ b/community/consistency-check/src/test/java/org/neo4j/unsafe/impl/batchimport/ParallelBatchImporterTest.java @@ -101,6 +101,7 @@ public class ParallelBatchImporterTest private static final int NODE_COUNT = 10_000; private static final int RELATIONSHIPS_PER_NODE = 5; private static final int RELATIONSHIP_COUNT = NODE_COUNT * RELATIONSHIPS_PER_NODE; + private static final int RELATIONSHIP_TYPES = 3; protected final Configuration config = new Configuration() { @Override @@ -124,6 +125,17 @@ public int maxNumberOfProcessors() int cores = Runtime.getRuntime().availableProcessors(); return random.intBetween( cores, cores + 100 ); } + + @Override + public long maxMemoryUsage() + { + // This calculation is just to try and hit some sort of memory limit so that relationship import + // is split up into multiple rounds. Also to see that relationship group defragmentation works + // well when doing multiple rounds. + double ratio = (NODE_COUNT / 1_000D ); + long mebi = 1024 * 1024; + return random.nextInt( (int) (ratio * mebi / 2), (int) (ratio * mebi) ); + } }; private final InputIdGenerator inputIdGenerator; private final IdMapper idMapper; @@ -253,7 +265,7 @@ public abstract static class InputIdGenerator String randomType( Random random ) { - return "TYPE" + random.nextInt( 3 ); + return "TYPE" + random.nextInt( RELATIONSHIP_TYPES ); } @Override diff --git a/community/import-tool/src/main/java/org/neo4j/tooling/ImportTool.java b/community/import-tool/src/main/java/org/neo4j/tooling/ImportTool.java index 4cbb152dcf648..150155d7afabe 100644 --- a/community/import-tool/src/main/java/org/neo4j/tooling/ImportTool.java +++ b/community/import-tool/src/main/java/org/neo4j/tooling/ImportTool.java @@ -44,6 +44,7 @@ import org.neo4j.io.fs.DefaultFileSystemAbstraction; import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.kernel.configuration.Config; +import org.neo4j.kernel.configuration.Settings; import org.neo4j.kernel.impl.logging.LogService; import org.neo4j.kernel.impl.logging.StoreLogService; import org.neo4j.kernel.impl.storemigration.ExistingTargetStrategy; @@ -82,6 +83,8 @@ import static org.neo4j.kernel.configuration.Settings.parseLongWithUnit; import static org.neo4j.kernel.impl.util.Converters.withDefault; import static org.neo4j.unsafe.impl.batchimport.Configuration.BAD_FILE_NAME; +import static org.neo4j.unsafe.impl.batchimport.Configuration.DEFAULT_MAX_MEMORY_PERCENT; +import static org.neo4j.unsafe.impl.batchimport.Configuration.calculateMaxMemoryFromPercent; import static org.neo4j.unsafe.impl.batchimport.input.Collectors.badCollector; import static org.neo4j.unsafe.impl.batchimport.input.Collectors.collect; import static org.neo4j.unsafe.impl.batchimport.input.Collectors.silentBadCollector; @@ -99,7 +102,6 @@ public class ImportTool { private static final String UNLIMITED = "true"; - private static final int UNSPECIFIED = -1; enum Options { @@ -232,7 +234,13 @@ enum Options READ_BUFFER_SIZE( "read-buffer-size", org.neo4j.csv.reader.Configuration.DEFAULT.bufferSize(), "", "Size of each buffer for reading input data. It has to at least be large enough to hold the " + - "biggest single value in the input data." ); + "biggest single value in the input data." ), + MAX_MEMORY( "max-memory", null, + "", + "(advanced) Maximum memory that importer can use for various data structures and caching " + + "to improve performance. By default set to " + DEFAULT_MAX_MEMORY_PERCENT + + "% of (free memory on machine - max JVM memory). " + + "Values can be plain numbers, like 10000000 or e.g. 20G for 20 gigabyte, or even e.g. 70%." ); private final String key; private final Object defaultValue; @@ -378,10 +386,10 @@ public static void main( String[] incomingArguments, boolean defaultSettingsSuit Config dbConfig; OutputStream badOutput = null; IdType idType = null; - int pageSize = UNSPECIFIED; org.neo4j.unsafe.impl.batchimport.Configuration configuration = null; File logsDir; File badFile = null; + Long maxMemory = null; boolean success = false; try @@ -402,6 +410,8 @@ public static void main( String[] incomingArguments, boolean defaultSettingsSuit } nodesFiles = extractInputFiles( args, Options.NODE_DATA.key(), err ); relationshipsFiles = extractInputFiles( args, Options.RELATIONSHIP_DATA.key(), err ); + String maxMemoryString = args.get( Options.MAX_MEMORY.key(), null ); + maxMemory = parseMaxMemory( maxMemoryString ); validateInputFiles( nodesFiles, relationshipsFiles ); enableStacktrace = args.getBoolean( Options.STACKTRACE.key(), Boolean.FALSE, Boolean.TRUE ); @@ -423,7 +433,7 @@ public static void main( String[] incomingArguments, boolean defaultSettingsSuit dbConfig = loadDbConfig( args.interpretOption( Options.DATABASE_CONFIG.key(), Converters.optional(), Converters.toFile(), Validators.REGEX_FILE_EXISTS ) ); - configuration = importConfiguration( processors, defaultSettingsSuitableForTests, dbConfig, pageSize ); + configuration = importConfiguration( processors, defaultSettingsSuitableForTests, dbConfig, maxMemory ); input = new CsvInput( nodeData( inputEncoding, nodesFiles ), defaultFormatNodeFileHeader(), relationshipData( inputEncoding, relationshipsFiles ), defaultFormatRelationshipFileHeader(), idType, csvConfiguration( args, defaultSettingsSuitableForTests ), badCollector, @@ -451,6 +461,20 @@ idType, csvConfiguration( args, defaultSettingsSuitableForTests ), badCollector, } } + private static Long parseMaxMemory( String maxMemoryString ) + { + if ( maxMemoryString != null ) + { + if ( maxMemoryString.endsWith( "%" ) ) + { + int percent = Integer.parseInt( maxMemoryString.substring( 0, maxMemoryString.length() - 1 ) ); + return calculateMaxMemoryFromPercent( percent ); + } + return Settings.parseLongWithUnit( maxMemoryString ); + } + return null; + } + public static void doImport( PrintStream out, PrintStream err, File storeDir, File logsDir, File badFile, FileSystemAbstraction fs, Collection> nodesFiles, Collection> relationshipsFiles, boolean enableStacktrace, Input input, @@ -570,9 +594,11 @@ private static void printOverview( File storeDir, Collection> nod printInputFiles( "Relationships", relationshipsFiles, out ); out.println(); out.println( "Available resources:" ); + printIndented( "Total machine memory: " + bytes( OsBeanUtil.getTotalPhysicalMemory() ), out ); printIndented( "Free machine memory: " + bytes( OsBeanUtil.getFreePhysicalMemory() ), out ); printIndented( "Max heap memory : " + bytes( Runtime.getRuntime().maxMemory() ), out ); printIndented( "Processors: " + configuration.maxNumberOfProcessors(), out ); + printIndented( "Configured max memory: " + bytes( configuration.maxMemoryUsage() ), out ); out.println(); } @@ -623,11 +649,11 @@ public static void validateInputFiles( Collection> nodesFiles, public static org.neo4j.unsafe.impl.batchimport.Configuration importConfiguration( final Number processors, final boolean defaultSettingsSuitableForTests, final Config dbConfig ) { - return importConfiguration( processors, defaultSettingsSuitableForTests, dbConfig, UNSPECIFIED ); + return importConfiguration( processors, defaultSettingsSuitableForTests, dbConfig, null ); } public static org.neo4j.unsafe.impl.batchimport.Configuration importConfiguration( final Number processors, - final boolean defaultSettingsSuitableForTests, final Config dbConfig, int pageSize ) + final boolean defaultSettingsSuitableForTests, final Config dbConfig, Long maxMemory ) { return new org.neo4j.unsafe.impl.batchimport.Configuration() { @@ -648,6 +674,12 @@ public int denseNodeThreshold() { return dbConfig.get( GraphDatabaseSettings.dense_node_threshold ); } + + @Override + public long maxMemoryUsage() + { + return maxMemory != null ? maxMemory.longValue() : DEFAULT.maxMemoryUsage(); + } }; } diff --git a/community/import-tool/src/test/java/org/neo4j/tooling/ImportToolTest.java b/community/import-tool/src/test/java/org/neo4j/tooling/ImportToolTest.java index da1c23e5a7487..387b4b7c32f85 100644 --- a/community/import-tool/src/test/java/org/neo4j/tooling/ImportToolTest.java +++ b/community/import-tool/src/test/java/org/neo4j/tooling/ImportToolTest.java @@ -1741,6 +1741,52 @@ public void shouldRespectBufferSizeSetting() throws Exception } } + @Test + public void shouldRespectMaxMemoryPercentageSetting() throws Exception + { + // GIVEN + List nodeIds = nodeIds( 10 ); + + // WHEN + importTool( + "--into", dbRule.getStoreDirAbsolutePath(), + "--nodes", nodeData( true, Configuration.COMMAS, nodeIds, TRUE ).getAbsolutePath(), + "--max-memory", "60%" ); + } + + @Test + public void shouldFailOnInvalidMaxMemoryPercentageSetting() throws Exception + { + // GIVEN + List nodeIds = nodeIds( 10 ); + + try + { + // WHEN + importTool( "--into", dbRule.getStoreDirAbsolutePath(), "--nodes", + nodeData( true, Configuration.COMMAS, nodeIds, TRUE ).getAbsolutePath(), "--max-memory", "110%" ); + fail( "Should have failed" ); + } + catch ( IllegalArgumentException e ) + { + // THEN good + assertThat( e.getMessage(), containsString( "percent" ) ); + } + } + + @Test + public void shouldRespectMaxMemorySuffixedSetting() throws Exception + { + // GIVEN + List nodeIds = nodeIds( 10 ); + + // WHEN + importTool( + "--into", dbRule.getStoreDirAbsolutePath(), + "--nodes", nodeData( true, Configuration.COMMAS, nodeIds, TRUE ).getAbsolutePath(), + "--max-memory", "100M" ); + } + private File writeArrayCsv( String[] headers, String[] values ) throws FileNotFoundException { File data = file( fileName( "whitespace.csv" ) ); diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/util/IoPrimitiveUtils.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/util/IoPrimitiveUtils.java index 05c117d9199de..b66c6f9a0dea8 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/util/IoPrimitiveUtils.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/util/IoPrimitiveUtils.java @@ -233,6 +233,16 @@ public static int safeCastLongToInt( long value ) return (int) value; } + public static short safeCastIntToUnsignedShort( int value ) + { + if ( (value & ~0xFFFF) != 0 ) + { + throw new IllegalArgumentException( + "Casting int value " + value + " to an unsigned short would wrap around" ); + } + return (short) value; + } + public static int shortToUnsignedInt( short value ) { return value & 0xFFFF; diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/BatchInsertRelationshipsStage.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/BatchInsertRelationshipsStage.java deleted file mode 100644 index d4a05329dc735..0000000000000 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/BatchInsertRelationshipsStage.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Copyright (c) 2002-2017 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ -package org.neo4j.unsafe.impl.batchimport; - -import org.neo4j.kernel.impl.store.record.PropertyBlock; -import org.neo4j.unsafe.batchinsert.BatchInserter; -import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdMapper; -import org.neo4j.unsafe.impl.batchimport.input.Input; -import org.neo4j.unsafe.impl.batchimport.input.InputRelationship; -import org.neo4j.unsafe.impl.batchimport.staging.Stage; -import org.neo4j.unsafe.impl.batchimport.store.BatchingNeoStores; - -import static org.neo4j.unsafe.impl.batchimport.staging.Step.ORDER_SEND_DOWNSTREAM; - -/** - * Inserts relationships one by one, {@link BatchInserter} style which may incur random I/O. This stage - * should only be used on relationship types which are very small (<100). Steps: - * - *
    - *
  1. {@link InputIteratorBatcherStep} reading from {@link InputIterator} produced from {@link Input#nodes()}.
  2. - *
  3. {@link RelationshipPreparationStep} looks up {@link InputRelationship#startNode() start node input id} / - * {@link InputRelationship#endNode() end node input id} from {@link IdMapper} and attaches to the batches going - * through because that lookup is costly and this step can be parallelized.
  4. - *
  5. {@link PropertyEncoderStep} encodes properties from {@link InputRelationship input relationships} into - * {@link PropertyBlock}, low level kernel encoded values.
  6. - *
  7. {@link BatchInsertRelationshipsStep} inserts relationships one by one by reading from and updating store - * as it sees required.
  8. - *
- */ -public class BatchInsertRelationshipsStage extends Stage -{ - public BatchInsertRelationshipsStage( Configuration config, IdMapper idMapper, - InputIterator relationships, BatchingNeoStores store, long nextRelationshipId ) - { - super( "Minority relationships", config, ORDER_SEND_DOWNSTREAM ); - add( new InputIteratorBatcherStep<>( control(), config, relationships, InputRelationship.class ) ); - add( new RelationshipPreparationStep( control(), config, idMapper ) ); - add( new PropertyEncoderStep<>( control(), config, store.getPropertyKeyRepository(), - store.getPropertyStore() ) ); - add( new BatchInsertRelationshipsStep( control(), config, store, - store.getRelationshipTypeRepository(), nextRelationshipId ) ); - } -} diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/BatchInsertRelationshipsStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/BatchInsertRelationshipsStep.java deleted file mode 100644 index 9778c88405780..0000000000000 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/BatchInsertRelationshipsStep.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Copyright (c) 2002-2017 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ -package org.neo4j.unsafe.impl.batchimport; - -import java.util.function.ToIntFunction; - -import org.neo4j.kernel.impl.locking.Locks; -import org.neo4j.kernel.impl.locking.NoOpClient; -import org.neo4j.kernel.impl.store.PropertyStore; -import org.neo4j.kernel.impl.store.RecordStore; -import org.neo4j.kernel.impl.store.id.IdSequence; -import org.neo4j.kernel.impl.store.record.PropertyBlock; -import org.neo4j.kernel.impl.store.record.RelationshipGroupRecord; -import org.neo4j.kernel.impl.store.record.RelationshipRecord; -import org.neo4j.kernel.impl.transaction.state.PropertyCreator; -import org.neo4j.kernel.impl.transaction.state.PropertyTraverser; -import org.neo4j.kernel.impl.transaction.state.RelationshipCreator; -import org.neo4j.kernel.impl.transaction.state.RelationshipGroupGetter; -import org.neo4j.kernel.impl.util.ReusableIteratorCostume; -import org.neo4j.unsafe.batchinsert.DirectRecordAccessSet; -import org.neo4j.unsafe.impl.batchimport.input.InputRelationship; -import org.neo4j.unsafe.impl.batchimport.staging.BatchSender; -import org.neo4j.unsafe.impl.batchimport.staging.ProcessorStep; -import org.neo4j.unsafe.impl.batchimport.staging.StageControl; -import org.neo4j.unsafe.impl.batchimport.store.BatchingIdSequence; -import org.neo4j.unsafe.impl.batchimport.store.BatchingNeoStores; - -import static org.neo4j.unsafe.impl.batchimport.EntityStoreUpdaterStep.reassignDynamicRecordIds; - -public class BatchInsertRelationshipsStep extends ProcessorStep> -{ - private final ToIntFunction typeToId; - private final RelationshipCreator relationshipCreator; - private final Locks.Client noopLockClient = new NoOpClient(); - private final PropertyCreator propertyCreator; - private final DirectRecordAccessSet recordAccess; - private final PropertyStore propertyStore; - private int pendingRelationshipChanges; - - // Reusable instances for less GC - private final ReusableIteratorCostume blockIterator = new ReusableIteratorCostume<>(); - private final IdSequence relationshipIdGenerator; - - public BatchInsertRelationshipsStep( StageControl control, Configuration config, BatchingNeoStores store, - ToIntFunction typeToId, long nextRelationshipId ) - { - super( control, "INSERT", config, 1 ); - this.typeToId = typeToId; - RecordStore relationshipGroupStore = store.getTemporaryRelationshipGroupStore(); - RelationshipGroupGetter groupGetter = new RelationshipGroupGetter( relationshipGroupStore ); - this.relationshipCreator = new RelationshipCreator( groupGetter, config.denseNodeThreshold() ); - PropertyTraverser propertyTraverser = new PropertyTraverser(); - this.propertyCreator = new PropertyCreator( store.getPropertyStore(), propertyTraverser ); - this.propertyStore = store.getPropertyStore(); - this.recordAccess = new DirectRecordAccessSet( store.getNodeStore(), propertyStore, - store.getRelationshipStore(), relationshipGroupStore, - store.getNeoStores().getPropertyKeyTokenStore(), - store.getNeoStores().getRelationshipTypeTokenStore(), - store.getNeoStores().getLabelTokenStore(), store.getNeoStores().getSchemaStore() ); - this.relationshipIdGenerator = new BatchingIdSequence( nextRelationshipId ); - } - - @Override - protected void process( Batch batch, BatchSender sender ) throws Throwable - { - for ( int i = 0, propertyBlockCursor = 0; i < batch.input.length; i++ ) - { - InputRelationship input = batch.input[i]; - int propertyBlockCount = batch.propertyBlocksLengths[i]; - - // Create relationship - long startNodeId = batch.ids[i*2]; - long endNodeId = batch.ids[i*2+1]; - if ( startNodeId != -1 && endNodeId != -1 ) - { - long id = relationshipIdGenerator.nextId(); - int typeId = typeToId.applyAsInt( input.typeAsObject() ); - relationshipCreator.relationshipCreate( id, typeId, startNodeId, endNodeId, recordAccess, noopLockClient ); - - // Set properties - RelationshipRecord record = recordAccess.getRelRecords().getOrLoad( id, null ).forChangingData(); - if ( input.hasFirstPropertyId() ) - { - record.setNextProp( input.firstPropertyId() ); - } - else - { - if ( propertyBlockCount > 0 ) - { - reassignDynamicRecordIds( propertyStore, batch.propertyBlocks, - propertyBlockCursor, propertyBlockCount ); - long firstProp = propertyCreator.createPropertyChain( record, - blockIterator.dressArray( batch.propertyBlocks, propertyBlockCursor, propertyBlockCount ), - recordAccess.getPropertyRecords() ); - record.setNextProp( firstProp ); - } - } - } - // else --> This is commonly known as input relationship referring to missing node IDs - propertyBlockCursor += propertyBlockCount; - } - - pendingRelationshipChanges += batch.input.length; - if ( pendingRelationshipChanges >= 50_000 ) - { - recordAccess.close(); // <-- happens to be called close even though this impl just flushes - pendingRelationshipChanges = 0; - } - } - - @Override - protected void done() - { - recordAccess.close(); - super.done(); - } -} diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/CalculateDenseNodesStage.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/CalculateDenseNodesStage.java index dc6d088b524a5..89011def17d97 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/CalculateDenseNodesStage.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/CalculateDenseNodesStage.java @@ -68,7 +68,7 @@ public CalculateDenseNodesStage( Configuration config, InputIterable( control(), config, - relationships.iterator(), InputRelationship.class ) ); + relationships.iterator(), InputRelationship.class, t -> true ) ); if ( !relationships.supportsMultiplePasses() ) { add( new InputEntityCacherStep<>( control(), config, inputCache.cacheRelationships( MAIN ) ) ); @@ -79,11 +79,8 @@ public CalculateDenseNodesStage( Configuration config, InputIterable 0, was " + percent ); + } + if ( percent > 100 ) + { + throw new IllegalArgumentException( "Expected percentage to be < 100, was " + percent ); + } + long freePhysicalMemory = OsBeanUtil.getFreePhysicalMemory(); + if ( freePhysicalMemory == OsBeanUtil.VALUE_UNAVAILABLE ) + { + throw new UnsupportedOperationException( + "Unable to detect amount of free memory, so max memory has to be explicitly set" ); + } + + double factor = percent / 100D; + return round( (freePhysicalMemory - Runtime.getRuntime().maxMemory()) * factor ); + } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/InputIteratorBatcherStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/InputIteratorBatcherStep.java index e564049a7069f..f4dfccbb535f7 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/InputIteratorBatcherStep.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/InputIteratorBatcherStep.java @@ -19,6 +19,8 @@ */ package org.neo4j.unsafe.impl.batchimport; +import java.util.function.Predicate; + import org.neo4j.unsafe.impl.batchimport.staging.IteratorBatcherStep; import org.neo4j.unsafe.impl.batchimport.staging.StageControl; @@ -30,10 +32,10 @@ public class InputIteratorBatcherStep extends IteratorBatcherStep { private final InputIterator data; - public InputIteratorBatcherStep( StageControl control, Configuration config, - InputIterator data, Class itemClass ) + InputIteratorBatcherStep( StageControl control, Configuration config, InputIterator data, Class itemClass, + Predicate filter ) { - super( control, config, data, itemClass ); + super( control, config, data, itemClass, filter ); this.data = data; } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeFirstRelationshipProcessor.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeFirstRelationshipProcessor.java index c0bbce1e74890..6fecbbf05ec98 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeFirstRelationshipProcessor.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeFirstRelationshipProcessor.java @@ -37,14 +37,12 @@ public class NodeFirstRelationshipProcessor implements RecordProcessor relGroupStore; private final NodeRelationshipCache cache; - private final int relationshipType; public NodeFirstRelationshipProcessor( RecordStore relGroupStore, - NodeRelationshipCache cache, int relationshipType ) + NodeRelationshipCache cache ) { this.relGroupStore = relGroupStore; this.cache = cache; - this.relationshipType = relationshipType; } @Override @@ -64,18 +62,17 @@ public boolean process( NodeRecord node ) } @Override - public long visit( long nodeId, long next, long out, long in, long loop ) + public long visit( long nodeId, int typeId, long out, long in, long loop ) { // Here we'll use the already generated id (below) from the previous visit, if that so happened long id = relGroupStore.nextId(); RelationshipGroupRecord groupRecord = new RelationshipGroupRecord( id ); - groupRecord.setType( relationshipType ); + groupRecord.setType( typeId ); groupRecord.setInUse( true ); groupRecord.setFirstOut( out ); groupRecord.setFirstIn( in ); groupRecord.setFirstLoop( loop ); groupRecord.setOwningNode( nodeId ); - groupRecord.setNext( next ); relGroupStore.prepareForCommit( groupRecord ); relGroupStore.updateRecord( groupRecord ); return id; diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeFirstRelationshipStage.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeFirstRelationshipStage.java index dadf7c93aa053..18fd82dddb5b5 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeFirstRelationshipStage.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeFirstRelationshipStage.java @@ -42,12 +42,12 @@ public class NodeFirstRelationshipStage extends Stage { public NodeFirstRelationshipStage( String topic, Configuration config, NodeStore nodeStore, RecordStore relationshipGroupStore, NodeRelationshipCache cache, - boolean denseNodes, int relationshipType ) + int nodeTypes ) { super( "Node --> Relationship" + topic, config ); - add( new ReadNodeRecordsByCacheStep( control(), config, nodeStore, cache, denseNodes ) ); + add( new ReadNodeRecordsByCacheStep( control(), config, nodeStore, cache, nodeTypes ) ); add( new RecordProcessorStep<>( control(), "LINK", config, - new NodeFirstRelationshipProcessor( relationshipGroupStore, cache, relationshipType ), false ) ); + new NodeFirstRelationshipProcessor( relationshipGroupStore, cache ), false ) ); add( new UpdateRecordsStep<>( control(), config, nodeStore ) ); } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeStage.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeStage.java index 3670661c3fb76..e8d789c908f12 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeStage.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeStage.java @@ -72,7 +72,7 @@ public NodeStage( Configuration config, IoMonitor writeMonitor, { super( "Nodes", config, ORDER_SEND_DOWNSTREAM ); this.cache = cache; - add( new InputIteratorBatcherStep<>( control(), config, nodes.iterator(), InputNode.class ) ); + add( new InputIteratorBatcherStep<>( control(), config, nodes.iterator(), InputNode.class, t -> true ) ); if ( !nodes.supportsMultiplePasses() ) { add( new InputEntityCacherStep<>( control(), config, inputCache.cacheNodes( MAIN ) ) ); diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ParallelBatchImporter.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ParallelBatchImporter.java index dcc0318983470..98260cbe720fb 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ParallelBatchImporter.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ParallelBatchImporter.java @@ -21,7 +21,9 @@ import java.io.File; import java.io.IOException; -import java.util.Set; +import java.util.Collection; +import java.util.Iterator; +import java.util.function.Predicate; import org.neo4j.collection.primitive.PrimitiveLongIterator; import org.neo4j.helpers.Exceptions; @@ -42,6 +44,7 @@ import org.neo4j.unsafe.impl.batchimport.cache.MemoryStatsVisitor; import org.neo4j.unsafe.impl.batchimport.cache.NodeLabelsCache; import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache; +import org.neo4j.unsafe.impl.batchimport.cache.NodeType; import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdGenerator; import org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdMapper; import org.neo4j.unsafe.impl.batchimport.input.Collector; @@ -49,7 +52,6 @@ import org.neo4j.unsafe.impl.batchimport.input.InputCache; import org.neo4j.unsafe.impl.batchimport.input.InputNode; import org.neo4j.unsafe.impl.batchimport.input.InputRelationship; -import org.neo4j.unsafe.impl.batchimport.input.PerTypeRelationshipSplitter; import org.neo4j.unsafe.impl.batchimport.staging.DynamicProcessorAssigner; import org.neo4j.unsafe.impl.batchimport.staging.ExecutionMonitor; import org.neo4j.unsafe.impl.batchimport.staging.Stage; @@ -57,12 +59,10 @@ import org.neo4j.unsafe.impl.batchimport.store.BatchingNeoStores; import org.neo4j.unsafe.impl.batchimport.store.io.IoMonitor; -import static java.lang.Math.max; +import static java.lang.Long.max; import static java.lang.String.format; import static java.lang.System.currentTimeMillis; import static org.neo4j.helpers.Format.bytes; -import static org.neo4j.helpers.collection.Iterators.asSet; -import static org.neo4j.io.ByteUnit.mebiBytes; import static org.neo4j.unsafe.impl.batchimport.AdditionalInitialIds.EMPTY; import static org.neo4j.unsafe.impl.batchimport.Configuration.withBatchSize; import static org.neo4j.unsafe.impl.batchimport.SourceOrCachedInputIterable.cachedForSure; @@ -155,6 +155,7 @@ public void doImport( Input input ) throws IOException // Things that we need to close later. The reason they're not in the try-with-resource statement // is that we need to close, and set to null, at specific points preferably. So use good ol' finally block. + long maxMemory = config.maxMemoryUsage(); NodeRelationshipCache nodeRelationshipCache = null; NodeLabelsCache nodeLabelsCache = null; long startTime = currentTimeMillis(); @@ -175,7 +176,7 @@ public void doImport( Input input ) throws IOException InputIterable relationships = input.relationships(); InputIterable cachedNodes = cachedForSure( nodes, inputCache.nodes( MAIN, true ) ); InputIterable cachedRelationships = - cachedForSure( relationships, inputCache.relationships( MAIN, true ) ); + cachedForSure( relationships, inputCache.relationships( MAIN, false ) ); RelationshipStore relationshipStore = neoStore.getRelationshipStore(); @@ -201,10 +202,10 @@ public void doImport( Input input ) throws IOException relationships, nodeRelationshipCache, idMapper, badCollector, inputCache, neoStore ); executeStage( calculateDenseNodesStage ); + long availableMemory = maxMemory - totalMemoryUsageOf( nodeRelationshipCache, idMapper ); importRelationships( nodeRelationshipCache, storeUpdateMonitor, neoStore, writeMonitor, - idMapper, cachedRelationships, inputCache, - calculateDenseNodesStage.getRelationshipTypes( Long.MAX_VALUE ), - calculateDenseNodesStage.getRelationshipTypes( 100 ) ); + idMapper, cachedRelationships, calculateDenseNodesStage.getDistribution(), + availableMemory ); // Release this potentially really big piece of cached data long peakMemoryUsage = totalMemoryUsageOf( idMapper, nodeRelationshipCache ); @@ -214,8 +215,8 @@ public void doImport( Input input ) throws IOException nodeRelationshipCache.close(); nodeRelationshipCache = null; - new RelationshipGroupDefragmenter( config, executionMonitor ).run( - max( max( peakMemoryUsage, highNodeId * 4 ), mebiBytes( 1 ) ), neoStore, highNodeId ); + new RelationshipGroupDefragmenter( config, executionMonitor ).run( max( maxMemory, peakMemoryUsage ), + neoStore, highNodeId ); // Stage 6 -- count nodes per label and labels per node nodeLabelsCache = new NodeLabelsCache( AUTO, neoStore.getLabelRepository().getHighId() ); @@ -281,7 +282,7 @@ private long totalMemoryUsageOf( MemoryStatsVisitor.Visitable... users ) private void importRelationships( NodeRelationshipCache nodeRelationshipCache, CountingStoreUpdateMonitor storeUpdateMonitor, BatchingNeoStores neoStore, IoMonitor writeMonitor, IdMapper idMapper, InputIterable relationships, - InputCache inputCache, Object[] allRelationshipTypes, Object[] minorityRelationshipTypes ) + RelationshipTypeDistribution typeDistribution, long freeMemoryForDenseNodeCache ) { // Imports the relationships from the Input. This isn't a straight forward as importing nodes, // since keeping track of and updating heads of relationship chains in scenarios where most nodes @@ -295,65 +296,72 @@ private void importRelationships( NodeRelationshipCache nodeRelationshipCache, // finally there will be one Node --> Relationship and Relationship --> Relationship stage linking // all sparse relationship chains together. - Set minorityRelationshipTypeSet = asSet( minorityRelationshipTypes ); - PerTypeRelationshipSplitter perTypeIterator = new PerTypeRelationshipSplitter( - relationships.iterator(), - allRelationshipTypes, - type -> minorityRelationshipTypeSet.contains( type ), - neoStore.getRelationshipTypeRepository(), - inputCache ); - long nextRelationshipId = 0; Configuration relationshipConfig = withBatchSize( config, neoStore.getRelationshipStore().getRecordsPerPage() ); Configuration nodeConfig = withBatchSize( config, neoStore.getNodeStore().getRecordsPerPage() ); - for ( int i = 0; perTypeIterator.hasNext(); i++ ) + Iterator> rounds = nodeRelationshipCache.splitRelationshipTypesIntoRounds( + typeDistribution.iterator(), freeMemoryForDenseNodeCache ); + + // Do multiple rounds of relationship importing. Each round fits as many relationship types + // as it can (comparing with worst-case memory usage and available memory). + int typesImported = 0; + int round = 0; + for ( round = 0; rounds.hasNext(); round++ ) { - // Stage 3a -- relationships, properties - nodeRelationshipCache.setForwardScan( true ); - Object currentType = perTypeIterator.currentType(); - int currentTypeId = neoStore.getRelationshipTypeRepository().getOrCreateId( currentType ); + // Figure out which types we can fit in node-->relationship cache memory. + // Types go from biggest to smallest group and so towards the end there will be + // smaller and more groups per round in this loop + Collection typesToImportThisRound = rounds.next(); + boolean thisIsTheOnlyRound = round == 0 && !rounds.hasNext(); - InputIterator perType = perTypeIterator.next(); - String topic = " [:" + currentType + "] (" + - (i + 1) + "/" + allRelationshipTypes.length + ")"; - final RelationshipStage relationshipStage = new RelationshipStage( topic, config, - writeMonitor, perType, idMapper, neoStore, nodeRelationshipCache, - storeUpdateMonitor, nextRelationshipId ); + // Import relationships and their properties + nodeRelationshipCache.setForwardScan( true, true/*dense*/ ); + String range = typesToImportThisRound.size() == 1 + ? String.valueOf( typesImported + 1 ) + : (typesImported + 1) + "-" + (typesImported + typesToImportThisRound.size()); + String topic = " " + range + "/" + typeDistribution.getNumberOfRelationshipTypes(); + Predicate typeFilter = thisIsTheOnlyRound + ? relationship -> true // optimization when all rels are imported in this round + : relationship -> typesToImportThisRound.contains( relationship.typeAsObject() ); + RelationshipStage relationshipStage = new RelationshipStage( topic, config, + writeMonitor, typeFilter, relationships.iterator(), idMapper, neoStore, + nodeRelationshipCache, storeUpdateMonitor, nextRelationshipId ); executeStage( relationshipStage ); - // Stage 4a -- set node nextRel fields for dense nodes + int nodeTypes = thisIsTheOnlyRound ? NodeType.NODE_TYPE_ALL : NodeType.NODE_TYPE_DENSE; + + // Set node nextRel fields for dense nodes executeStage( new NodeFirstRelationshipStage( topic, nodeConfig, neoStore.getNodeStore(), - neoStore.getTemporaryRelationshipGroupStore(), nodeRelationshipCache, true/*dense*/, - currentTypeId ) ); + neoStore.getTemporaryRelationshipGroupStore(), nodeRelationshipCache, nodeTypes ) ); - // Stage 5a -- link relationship chains together for dense nodes - nodeRelationshipCache.setForwardScan( false ); + // Link relationship chains together for dense nodes + nodeRelationshipCache.setForwardScan( false, true/*dense*/ ); executeStage( new RelationshipLinkbackStage( topic, relationshipConfig, neoStore.getRelationshipStore(), nodeRelationshipCache, nextRelationshipId, - relationshipStage.getNextRelationshipId(), true/*dense*/ ) ); + relationshipStage.getNextRelationshipId(), nodeTypes ) ); nextRelationshipId = relationshipStage.getNextRelationshipId(); - nodeRelationshipCache.clearChangedChunks( true/*dense*/ ); // cheap higher level clearing + typesImported += typesToImportThisRound.size(); } - String topic = " Sparse"; - nodeRelationshipCache.setForwardScan( true ); - // Stage 4b -- set node nextRel fields for sparse nodes - executeStage( new NodeFirstRelationshipStage( topic, nodeConfig, neoStore.getNodeStore(), - neoStore.getTemporaryRelationshipGroupStore(), nodeRelationshipCache, false/*sparse*/, -1 ) ); - - // Stage 5b -- link relationship chains together for sparse nodes - nodeRelationshipCache.setForwardScan( false ); - executeStage( new RelationshipLinkbackStage( topic, relationshipConfig, neoStore.getRelationshipStore(), - nodeRelationshipCache, 0, nextRelationshipId, false/*sparse*/ ) ); - - if ( minorityRelationshipTypes.length > 0 ) + // There's an optimization above which will piggy-back sparse linking on the dense linking + // if all relationships are imported in one round. The sparse linking below will be done if + // there were multiple passes of dense linking above. + if ( round > 1 ) { - // Do some batch insertion style random-access insertions for super small minority types - executeStage( new BatchInsertRelationshipsStage( config, idMapper, - perTypeIterator.getMinorityRelationships(), neoStore, nextRelationshipId ) ); + // Set node nextRel fields for sparse nodes + String topic = " Sparse"; + nodeRelationshipCache.setForwardScan( true, false/*sparse*/ ); + executeStage( new NodeFirstRelationshipStage( topic, nodeConfig, neoStore.getNodeStore(), + neoStore.getTemporaryRelationshipGroupStore(), nodeRelationshipCache, NodeType.NODE_TYPE_SPARSE ) ); + + // Link relationship chains together for sparse nodes + nodeRelationshipCache.setForwardScan( false, false/*sparse*/ ); + executeStage( new RelationshipLinkbackStage( topic, relationshipConfig, + neoStore.getRelationshipStore(), nodeRelationshipCache, 0, nextRelationshipId, + NodeType.NODE_TYPE_SPARSE ) ); } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ReadNodeRecordsByCacheStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ReadNodeRecordsByCacheStep.java index 5398bc7332dc9..06700e146c81a 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ReadNodeRecordsByCacheStep.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ReadNodeRecordsByCacheStep.java @@ -37,17 +37,17 @@ */ public class ReadNodeRecordsByCacheStep extends AbstractStep { - private final boolean denseNodes; + private final int nodeTypes; private final NodeRelationshipCache cache; private final int batchSize; private final RecordCursor recordCursor; public ReadNodeRecordsByCacheStep( StageControl control, Configuration config, - NodeStore nodeStore, NodeRelationshipCache cache, boolean denseNodes ) + NodeStore nodeStore, NodeRelationshipCache cache, int nodeTypes ) { super( control, ">", config ); this.cache = cache; - this.denseNodes = denseNodes; + this.nodeTypes = nodeTypes; this.batchSize = config.batchSize(); this.recordCursor = nodeStore.newRecordCursor( nodeStore.newRecord() ); } @@ -77,7 +77,7 @@ public void run() assertHealthy(); try ( NodeVisitor visitor = new NodeVisitor() ) { - cache.visitChangedNodes( visitor, denseNodes ); + cache.visitChangedNodes( visitor, nodeTypes ); } endOfUpstream(); } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipEncoderStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipEncoderStep.java index 5798fc87b6822..84505803214e4 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipEncoderStep.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipEncoderStep.java @@ -62,10 +62,11 @@ protected void forkedProcess( int id, int processors, Batch private long fromNodeId; private long toNodeId; private long highCacheId; + private final long maxCacheLength; public RelationshipGroupCache( NumberArrayFactory arrayFactory, long maxMemory, long highNodeId ) { @@ -71,7 +74,8 @@ public RelationshipGroupCache( NumberArrayFactory arrayFactory, long maxMemory, bytes( maxMemory ) + " where " + bytes( memoryDedicatedToCounting ) + " was dedicated to group counting" ); } - this.cache = arrayFactory.newByteArray( memoryLeftForGroupCache / GROUP_ENTRY_SIZE, new byte[GROUP_ENTRY_SIZE] ); + maxCacheLength = memoryLeftForGroupCache / GROUP_ENTRY_SIZE; + this.cache = arrayFactory.newDynamicByteArray( max( 1_000, maxCacheLength / 100 ), new byte[GROUP_ENTRY_SIZE] ); } /** @@ -125,7 +129,7 @@ public long prepare( long fromNodeId ) for ( long nodeId = fromNodeId; nodeId < highNodeId; nodeId++ ) { int count = groupCount( nodeId ); - if ( highCacheId + count > cache.length() ) + if ( highCacheId + count > maxCacheLength ) { // Cannot include this one, so up until the previous is good return this.toNodeId = nodeId; @@ -161,7 +165,7 @@ public boolean put( RelationshipGroupRecord groupRecord ) long baseIndex = offsets.get( rebase( nodeId ) ); // grouCount is extra validation, really int groupCount = groupCount( nodeId ); - long index = scanForFreeFrom( baseIndex, groupCount, groupRecord.getType() ); + long index = scanForFreeFrom( baseIndex, groupCount, groupRecord.getType(), groupRecord.getOwningNode() ); // Put the group at this index cache.setByte( index, 0, (byte) 1 ); @@ -172,7 +176,7 @@ public boolean put( RelationshipGroupRecord groupRecord ) return true; } - private long scanForFreeFrom( long startIndex, int groupCount, int type ) + private long scanForFreeFrom( long startIndex, int groupCount, int type, long owningNodeId ) { long desiredIndex = -1; long freeIndex = -1; @@ -191,7 +195,8 @@ private long scanForFreeFrom( long startIndex, int groupCount, int type ) int existingType = cache.get3ByteInt( candidateIndex, 1 ); if ( existingType == type ) { - throw new IllegalStateException( "Tried to put multiple groups with same type " + type ); + throw new IllegalStateException( + "Tried to put multiple groups with same type " + type + " for node " + owningNodeId ); } if ( type < existingType ) diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkbackProcessor.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkbackProcessor.java deleted file mode 100644 index 16f0e3f5621a2..0000000000000 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkbackProcessor.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Copyright (c) 2002-2017 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ -package org.neo4j.unsafe.impl.batchimport; - -import org.neo4j.graphdb.Direction; -import org.neo4j.kernel.impl.store.record.RelationshipRecord; -import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache; -import static org.neo4j.unsafe.impl.batchimport.cache.idmapping.IdMapper.ID_NOT_FOUND; - -/** - * Links the {@code previous} fields in {@link RelationshipRecord relationship records}. This is done after - * a forward pass where the {@code next} fields are linked. - */ -public class RelationshipLinkbackProcessor implements RecordProcessor -{ - private final NodeRelationshipCache cache; - private final boolean denseNodes; - - public RelationshipLinkbackProcessor( NodeRelationshipCache cache, boolean denseNodes ) - { - this.cache = cache; - this.denseNodes = denseNodes; - } - - @Override - public boolean process( RelationshipRecord record ) - { - boolean isLoop = record.getFirstNode() == record.getSecondNode(); - boolean firstIsDense = cache.isDense( record.getFirstNode() ); - boolean changed = false; - if ( isLoop ) - { - if ( firstIsDense == denseNodes ) - { - long prevRel = cache.getAndPutRelationship( record.getFirstNode(), - Direction.BOTH, record.getId(), false ); - if ( prevRel == ID_NOT_FOUND ) - { // First one - record.setFirstInFirstChain( true ); - record.setFirstInSecondChain( true ); - prevRel = cache.getCount( record.getFirstNode(), Direction.BOTH ); - } - record.setFirstPrevRel( prevRel ); - record.setSecondPrevRel( prevRel ); - changed = true; - } - } - else - { - // Start node - if ( firstIsDense == denseNodes ) - { - long firstPrevRel = cache.getAndPutRelationship( record.getFirstNode(), - Direction.OUTGOING, record.getId(), false ); - if ( firstPrevRel == ID_NOT_FOUND ) - { // First one - record.setFirstInFirstChain( true ); - firstPrevRel = cache.getCount( record.getFirstNode(), Direction.OUTGOING ); - } - record.setFirstPrevRel( firstPrevRel ); - changed = true; - } - - // End node - boolean secondIsDense = cache.isDense( record.getSecondNode() ); - if ( secondIsDense == denseNodes ) - { - long secondPrevRel = cache.getAndPutRelationship( record.getSecondNode(), - Direction.INCOMING, record.getId(), false ); - if ( secondPrevRel == ID_NOT_FOUND ) - { // First one - record.setFirstInSecondChain( true ); - secondPrevRel = cache.getCount( record.getSecondNode(), Direction.INCOMING ); - } - record.setSecondPrevRel( secondPrevRel ); - changed = true; - } - } - return changed; - } - - @Override - public void done() - { // Nothing to do here - } -} diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkbackStage.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkbackStage.java index 8e30815ade734..cfdc4a7b7ac78 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkbackStage.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkbackStage.java @@ -45,12 +45,12 @@ public class RelationshipLinkbackStage extends Stage { public RelationshipLinkbackStage( String topic, Configuration config, RelationshipStore store, - NodeRelationshipCache cache, long lowRelationshipId, long highRelationshipId, boolean denseNodes ) + NodeRelationshipCache cache, long lowRelationshipId, long highRelationshipId, int nodeTypes ) { super( "Relationship --> Relationship" + topic, config ); add( new ReadRecordsStep<>( control(), config, store, backwards( lowRelationshipId, highRelationshipId, config ) ) ); - add( new RelationshipLinkbackStep( control(), config, cache, denseNodes ) ); + add( new RelationshipLinkbackStep( control(), config, cache, nodeTypes ) ); add( new UpdateRecordsStep<>( control(), config, store ) ); } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkbackStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkbackStep.java index 5002b5b9d538d..7509aab4ade9f 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkbackStep.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkbackStep.java @@ -22,6 +22,7 @@ import org.neo4j.graphdb.Direction; import org.neo4j.kernel.impl.store.record.RelationshipRecord; import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache; +import org.neo4j.unsafe.impl.batchimport.cache.NodeType; import org.neo4j.unsafe.impl.batchimport.staging.ForkedProcessorStep; import org.neo4j.unsafe.impl.batchimport.staging.StageControl; @@ -35,14 +36,14 @@ public class RelationshipLinkbackStep extends ForkedProcessorStep { private final NodeRelationshipCache cache; - private final boolean denseNodes; + private final int nodeTypes; public RelationshipLinkbackStep( StageControl control, Configuration config, - NodeRelationshipCache cache, boolean denseNodes ) + NodeRelationshipCache cache, int nodeTypes ) { super( control, "LINK", config, 0 ); this.cache = cache; - this.denseNodes = denseNodes; + this.nodeTypes = nodeTypes; } @Override @@ -77,19 +78,20 @@ public boolean process( RelationshipRecord record, int id, int processors ) boolean firstIsDense = cache.isDense( record.getFirstNode() ); boolean changed = false; boolean isLoop = record.getFirstNode() == record.getSecondNode(); + int typeId = record.getType(); if ( isLoop ) { - if ( firstIsDense == denseNodes ) + if ( NodeType.matchesDense( nodeTypes, firstIsDense ) ) { if ( processFirst ) { long prevRel = cache.getAndPutRelationship( record.getFirstNode(), - Direction.BOTH, record.getId(), false ); + typeId, Direction.BOTH, record.getId(), false ); if ( prevRel == ID_NOT_FOUND ) { // First one record.setFirstInFirstChain( true ); record.setFirstInSecondChain( true ); - prevRel = cache.getCount( record.getFirstNode(), Direction.BOTH ); + prevRel = cache.getCount( record.getFirstNode(), typeId, Direction.BOTH ); } record.setFirstPrevRel( prevRel ); record.setSecondPrevRel( prevRel ); @@ -100,16 +102,16 @@ public boolean process( RelationshipRecord record, int id, int processors ) else { // Start node - if ( firstIsDense == denseNodes ) + if ( NodeType.matchesDense( nodeTypes, firstIsDense ) ) { if ( processFirst ) { long firstPrevRel = cache.getAndPutRelationship( record.getFirstNode(), - Direction.OUTGOING, record.getId(), false ); + typeId, Direction.OUTGOING, record.getId(), false ); if ( firstPrevRel == ID_NOT_FOUND ) { // First one record.setFirstInFirstChain( true ); - firstPrevRel = cache.getCount( record.getFirstNode(), Direction.OUTGOING ); + firstPrevRel = cache.getCount( record.getFirstNode(), typeId, Direction.OUTGOING ); } record.setFirstPrevRel( firstPrevRel ); } @@ -118,16 +120,16 @@ public boolean process( RelationshipRecord record, int id, int processors ) // End node boolean secondIsDense = cache.isDense( record.getSecondNode() ); - if ( secondIsDense == denseNodes ) + if ( NodeType.matchesDense( nodeTypes, secondIsDense ) ) { if ( processSecond ) { long secondPrevRel = cache.getAndPutRelationship( record.getSecondNode(), - Direction.INCOMING, record.getId(), false ); + typeId, Direction.INCOMING, record.getId(), false ); if ( secondPrevRel == ID_NOT_FOUND ) { // First one record.setFirstInSecondChain( true ); - secondPrevRel = cache.getCount( record.getSecondNode(), Direction.INCOMING ); + secondPrevRel = cache.getCount( record.getSecondNode(), typeId, Direction.INCOMING ); } record.setSecondPrevRel( secondPrevRel ); } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipStage.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipStage.java index ff50c3a69f7da..db7cb78bb2b4b 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipStage.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipStage.java @@ -19,6 +19,8 @@ */ package org.neo4j.unsafe.impl.batchimport; +import java.util.function.Predicate; + import org.neo4j.graphdb.Direction; import org.neo4j.kernel.impl.store.PropertyStore; import org.neo4j.kernel.impl.store.RelationshipStore; @@ -74,12 +76,13 @@ public class RelationshipStage extends Stage private AssignRelationshipIdBatchStep idAssigner; public RelationshipStage( String topic, Configuration config, IoMonitor writeMonitor, + Predicate typeFilter, InputIterator relationships, IdMapper idMapper, BatchingNeoStores neoStore, NodeRelationshipCache cache, EntityStoreUpdaterStep.Monitor storeUpdateMonitor, long firstRelationshipId ) { super( "Relationships" + topic, config, ORDER_SEND_DOWNSTREAM ); - add( new InputIteratorBatcherStep<>( control(), config, relationships, InputRelationship.class ) ); + add( new InputIteratorBatcherStep<>( control(), config, relationships, InputRelationship.class, typeFilter ) ); RelationshipStore relationshipStore = neoStore.getRelationshipStore(); PropertyStore propertyStore = neoStore.getPropertyStore(); diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipTypeCheckerStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipTypeCheckerStep.java index 1550e8fbb8928..4baed296eab5e 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipTypeCheckerStep.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipTypeCheckerStep.java @@ -21,11 +21,9 @@ import org.apache.commons.lang3.mutable.MutableLong; -import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; @@ -53,7 +51,7 @@ public class RelationshipTypeCheckerStep extends ProcessorStep Integer.compare( (Integer)e2.getKey(), (Integer)e1.getKey() ); private final Map> typeCheckers = new ConcurrentHashMap<>(); private final BatchingRelationshipTypeTokenRepository typeTokenRepository; - private Map.Entry[] sortedTypes; + private RelationshipTypeDistribution distribution; public RelationshipTypeCheckerStep( StageControl control, Configuration config, BatchingRelationshipTypeTokenRepository typeTokenRepository ) @@ -81,7 +79,7 @@ protected void done() localTypes.forEach( (type,localCount) -> mergedTypes.computeIfAbsent( type, t -> new MutableLong() ).add( localCount.longValue() ) ) ); - sortedTypes = mergedTypes.entrySet().toArray( new Map.Entry[mergedTypes.size()] ); + Map.Entry[] sortedTypes = mergedTypes.entrySet().toArray( new Map.Entry[mergedTypes.size()] ); if ( sortedTypes.length > 0 ) { Comparator> comparator = sortedTypes[0].getKey() instanceof Integer ? @@ -101,32 +99,12 @@ protected void done() { typeTokenRepository.getOrCreateId( sortedTypes[i].getKey() ); } + distribution = new RelationshipTypeDistribution( sortedTypes ); super.done(); } - /** - * Returns relationship types which have number of relationships equal to or lower than the given threshold. - * - * @param belowOrEqualToThreshold threshold where relationship types which have this amount of relationships - * or less will be returned. - * @return the order of which to order {@link InputRelationship} when importing relationships. - * The order in which these relationships are returned will be the reverse order of relationship type ids. - * There are two modes of relationship types here, one is user defined String where this step - * have full control of assigning ids to those and will do so based on size of types. The other mode - * is where types are given as ids straight away (as Integer) where the order is already set and so - * the types will not be sorted by size (which is simply an optimization anyway). - */ - public Object[] getRelationshipTypes( long belowOrEqualToThreshold ) + public RelationshipTypeDistribution getDistribution() { - List result = new ArrayList<>(); - for ( Map.Entry candidate : sortedTypes ) - { - if ( candidate.getValue().longValue() <= belowOrEqualToThreshold ) - { - result.add( candidate.getKey() ); - } - } - - return result.toArray(); + return distribution; } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipTypeDistribution.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipTypeDistribution.java new file mode 100644 index 0000000000000..0fcaad65ad9c2 --- /dev/null +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipTypeDistribution.java @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.unsafe.impl.batchimport; + +import org.apache.commons.lang3.mutable.MutableLong; + +import java.util.Iterator; +import java.util.Map; +import org.neo4j.helpers.collection.Iterators; + +/** + * Keeps data about how relationships are distributed between different types. + */ +public class RelationshipTypeDistribution implements Iterable> +{ + private final Map.Entry[] sortedTypes; + + public RelationshipTypeDistribution( Map.Entry[] sortedTypes ) + { + this.sortedTypes = sortedTypes; + } + + @Override + public Iterator> iterator() + { + return Iterators.iterator( sortedTypes ); + } + + public int getNumberOfRelationshipTypes() + { + return sortedTypes.length; + } +} diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/BaseNumberArray.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/BaseNumberArray.java index 2c515aacbc622..e57503f86db54 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/BaseNumberArray.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/BaseNumberArray.java @@ -25,7 +25,7 @@ abstract class BaseNumberArray> implements NumberArray { protected final int itemSize; - private final long base; + protected final long base; /** * @param itemSize byte size of each item in this array. diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/NodeRelationshipCache.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/NodeRelationshipCache.java index bbb455cb66f1a..510090e5edd60 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/NodeRelationshipCache.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/NodeRelationshipCache.java @@ -19,12 +19,23 @@ */ package org.neo4j.unsafe.impl.batchimport.cache; +import org.apache.commons.lang3.mutable.MutableLong; + import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.Map.Entry; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.neo4j.graphdb.Direction; +import org.neo4j.helpers.collection.PrefetchingIterator; +import org.neo4j.kernel.impl.util.IoPrimitiveUtils; +import static java.lang.Long.min; import static java.lang.Math.toIntExact; /** @@ -47,8 +58,8 @@ * so in order to make further changes to N, if another thread accesses N the semantics will no longer hold. * * Since multiple threads are making changes external memory synchronization is also required in between - * a phase of making changes using {@link #getAndPutRelationship(long, Direction, long, boolean)} and e.g - * {@link #visitChangedNodes(NodeChangeVisitor, boolean)}. + * a phase of making changes using {@link #getAndPutRelationship(long, int, Direction, long, boolean)} and e.g + * {@link #visitChangedNodes(NodeChangeVisitor, int)}. */ public class NodeRelationshipCache implements MemoryStatsVisitor.Visitable { @@ -75,6 +86,9 @@ public class NodeRelationshipCache implements MemoryStatsVisitor.Visitable private static final int BIG_COUNT_MASK = 0x20000000; private static final int COUNT_FLAGS_MASKS = DENSE_NODE_CHANGED_MASK | SPARSE_NODE_CHANGED_MASK | BIG_COUNT_MASK; private static final int COUNT_MASK = ~COUNT_FLAGS_MASKS; + private static final int TYPE_SIZE = 2; + public static final int GROUP_ENTRY_SIZE = TYPE_SIZE + ID_SIZE/*next*/ + + ID_AND_COUNT_SIZE * Direction.values().length; private ByteArray array; private byte[] chunkChangedArray; @@ -124,12 +138,12 @@ public long incrementCount( long nodeId ) /** * Should only be used by tests */ - void setCount( long nodeId, long count, Direction direction ) + void setCount( long nodeId, long count, int typeId, Direction direction ) { if ( isDense( nodeId ) ) { long relGroupId = all48Bits( array, nodeId, SPARSE_ID_OFFSET ); - relGroupCache.getAndSetCount( relGroupId, direction, count ); + relGroupCache.setCount( relGroupId, typeId, direction, count ); } else { @@ -166,23 +180,23 @@ void setCount( long nodeId, long count, Direction direction ) * so the bigCounts array is shared between all different types of counts, because big counts are so rare * * @param array {@link ByteArray} to set count in - * @param nodeId node id, i.e. array index + * @param index node id, i.e. array index * @param offset offset on that array index (a ByteArray feature) * @param count count to set at this position */ - private void setCount( ByteArray array, long nodeId, int offset, long count ) + private void setCount( ByteArray array, long index, int offset, long count ) { - assertValidCount( nodeId, count ); + assertValidCount( index, count ); if ( count > MAX_SMALL_COUNT ) { - int rawCount = array.getInt( nodeId, offset ); + int rawCount = array.getInt( index, offset ); int slot; if ( rawCount == -1 || !isBigCount( rawCount ) ) { // Allocate a slot in the bigCounts array slot = bigCountsCursor.getAndIncrement(); - array.setInt( nodeId, offset, BIG_COUNT_MASK | slot ); + array.setInt( index, offset, BIG_COUNT_MASK | slot ); } else { @@ -192,7 +206,7 @@ private void setCount( ByteArray array, long nodeId, int offset, long count ) } else { // We can simply set it - array.setInt( nodeId, offset, toIntExact( count ) ); + array.setInt( index, offset, toIntExact( count ) ); } } @@ -257,11 +271,11 @@ private static int countValue( int rawCount ) return rawCount & COUNT_MASK; } - private long incrementCount( ByteArray array, long nodeId, int offset ) + private long incrementCount( ByteArray array, long index, int offset ) { - array = array.at( nodeId ); - long count = getCount( array, nodeId, offset ) + 1; - setCount( array, nodeId, offset, count ); + array = array.at( index ); + long count = getCount( array, index, offset ) + 1; + setCount( array, index, offset, count ); return count; } @@ -291,12 +305,13 @@ private boolean isDense( ByteArray array, long nodeId ) * the {@code direction}. * * @param nodeId node to update relationship head for. + * @param typeId relationship type id. * @param direction {@link Direction} this node represents for this relationship. * @param firstRelId the relationship id which is now the head of this chain. * @param incrementCount as side-effect also increment count for this chain. * @return the previous head of the updated relationship chain. */ - public long getAndPutRelationship( long nodeId, Direction direction, long firstRelId, + public long getAndPutRelationship( long nodeId, int typeId, Direction direction, long firstRelId, boolean incrementCount ) { if ( firstRelId > MAX_RELATIONSHIP_ID ) @@ -320,11 +335,10 @@ public long getAndPutRelationship( long nodeId, Direction direction, long firstR { if ( existingId == EMPTY ) { - existingId = relGroupCache.allocate(); + existingId = relGroupCache.allocate( typeId ); setRelationshipId( array, nodeId, existingId ); - wasChanged = false; // no need to clear when we just allocated it } - return relGroupCache.putRelationship( existingId, direction, firstRelId, incrementCount, wasChanged ); + return relGroupCache.getAndPutRelationship( existingId, typeId, direction, firstRelId, incrementCount ); } // Don't increment count for sparse node since that has already been done in a previous pass @@ -350,6 +364,19 @@ private void markChunkAsChanged( long nodeId, boolean dense ) } } + long calculateNumberOfDenseNodes() + { + long count = 0; + for ( long i = 0; i < highNodeId; i++ ) + { + if ( isDense( i ) ) + { + count++; + } + } + return count; + } + private int chunkOf( long nodeId ) { return toIntExact( nodeId / chunkSize ); @@ -373,7 +400,7 @@ private boolean markAsChanged( ByteArray array, long nodeId, int mask ) return changeBitWasFlipped; } - private static boolean nodeIsChanged( ByteArray array, long nodeId, long mask ) + private boolean nodeIsChanged( ByteArray array, long nodeId, long mask ) { int bits = array.getInt( nodeId, SPARSE_COUNT_OFFSET ); @@ -387,10 +414,11 @@ private static boolean nodeIsChanged( ByteArray array, long nodeId, long mask ) { return false; } - return (bits & mask) != 0; + boolean changeBitIsSet = (bits & mask) != 0; + return changeBitIsSet == forward; } - private void setRelationshipId( ByteArray array, long nodeId, long firstRelId ) + private static void setRelationshipId( ByteArray array, long nodeId, long firstRelId ) { array.set6ByteLong( nodeId, SPARSE_ID_OFFSET, firstRelId ); } @@ -436,7 +464,7 @@ public long getFirstRel( long nodeId, GroupVisitor visitor ) /** * First a note about tracking which nodes have been updated with new relationships by calls to - * {@link #getAndPutRelationship(long, Direction, long, boolean)}: + * {@link #getAndPutRelationship(long, int, Direction, long, boolean)}: * * We use two high bits of the count field in the "main" array to mark whether or not a change * have been made to a node. One bit for a sparse node and one for a dense. Sparse and dense nodes @@ -451,9 +479,34 @@ public long getFirstRel( long nodeId, GroupVisitor visitor ) * * @param forward {@code true} if going forward and having change marked as a set bit, otherwise * change is marked with an unset bit. + * @param denseNodes whether or not this is about dense nodes. If so then some additional cache + * preparation work needs to be done. */ - public void setForwardScan( boolean forward ) + public void setForwardScan( boolean forward, boolean denseNodes ) { + if ( this.forward == forward ) + { + return; + } + + // There's some additional preparations to do for dense nodes between each pass, + // this is because that piece of memory is reused. + if ( denseNodes ) + { + if ( forward ) + { + // Clear relationship group cache and references to it + visitChangedNodes( ( nodeId, array ) -> setRelationshipId( array, nodeId, EMPTY ), + NodeType.NODE_TYPE_DENSE ); + clearChangedChunks( true ); + relGroupCache.clear(); + } + else + { + // Keep the relationship group cache entries, but clear all relationship chain heads + relGroupCache.clearRelationshipIds(); + } + } this.forward = forward; } @@ -466,16 +519,18 @@ public void setForwardScan( boolean forward ) * can be used for the next type import. * * @param nodeId node to get count for. + * @param typeId relationship type id to get count for. * @param direction {@link Direction} to get count for. * @return count (degree) of the requested relationship chain. */ - public long getCount( long nodeId, Direction direction ) + public long getCount( long nodeId, int typeId, Direction direction ) { ByteArray array = this.array.at( nodeId ); - if ( isDense( array, nodeId ) ) + boolean dense = isDense( array, nodeId ); + if ( dense ) { // Indirection into rel group cache long id = getRelationshipId( array, nodeId ); - return id == EMPTY ? 0 : relGroupCache.getAndSetCount( id, direction, 0 ); + return id == EMPTY ? 0 : relGroupCache.getAndResetCount( id, typeId, direction ); } return getCount( array, nodeId, SPARSE_COUNT_OFFSET ); @@ -488,62 +543,85 @@ public interface GroupVisitor * Type can be decided on the outside since there'll be only one type per node. * * @param nodeId node id. - * @param next next relationship group. + * @param typeId relationship type id. * @param out first outgoing relationship id. * @param in first incoming relationship id. * @param loop first loop relationship id. * @return the created relationship group id. */ - long visit( long nodeId, long next, long out, long in, long loop ); + long visit( long nodeId, int typeId, long out, long in, long loop ); } - public static final GroupVisitor NO_GROUP_VISITOR = (nodeId, next, out, in, loop) -> -1; + public static final GroupVisitor NO_GROUP_VISITOR = (nodeId, typeId, out, in, loop) -> -1; private class RelGroupCache implements AutoCloseable, MemoryStatsVisitor.Visitable { - private static final int NEXT_OFFSET = 0; - private static final int BASE_IDS_OFFSET = ID_SIZE; + private static final int TYPE_OFFSET = 0; + private static final int NEXT_OFFSET = TYPE_SIZE; + private static final int BASE_IDS_OFFSET = NEXT_OFFSET + ID_SIZE; // Used for testing high id values. Should always be zero in production + private final byte[] DEFAULT_VALUE = minusOneBytes( GROUP_ENTRY_SIZE ); + private final long chunkSize; private final long base; private final ByteArray array; private final AtomicLong nextFreeId; RelGroupCache( NumberArrayFactory arrayFactory, long chunkSize, long base ) { + this.chunkSize = chunkSize; this.base = base; assert chunkSize > 0; - this.array = arrayFactory.newDynamicByteArray( chunkSize, - minusOneBytes( ID_SIZE/*next*/ + (ID_AND_COUNT_SIZE) * Direction.values().length ) ); + this.array = arrayFactory.newDynamicByteArray( chunkSize, DEFAULT_VALUE ); this.nextFreeId = new AtomicLong( base ); } - private void clearRelationships( ByteArray array, long relGroupId ) + private void clearIndex( ByteArray array, long relGroupId ) { - array.set6ByteLong( relGroupId, directionOffset( Direction.OUTGOING ), EMPTY ); - array.set6ByteLong( relGroupId, directionOffset( Direction.INCOMING ), EMPTY ); - array.set6ByteLong( relGroupId, directionOffset( Direction.BOTH ), EMPTY ); + array.set( relGroupId, DEFAULT_VALUE ); } - /** - * For dense nodes we reset count after reading because we only ever need - * that value once and the piece of memory holding this value will be reused for another - * relationship chain for this node after this point in time, where the count should - * restart from 0. - */ - long getAndSetCount( long id, Direction direction, long newCount ) + long getAndResetCount( long relGroupIndex, int typeId, Direction direction ) + { + long index = rebase( relGroupIndex ); + while ( index != EMPTY ) + { + ByteArray array = this.array.at( index ); + if ( getTypeId( array, index ) == typeId ) + { + int offset = countOffset( direction ); + long count = NodeRelationshipCache.this.getCount( array, index, offset ); + NodeRelationshipCache.this.setCount( array, index, offset, 0 ); + return count; + } + index = getNext( array, index ); + } + return 0; + } + + void setCount( long relGroupIndex, int typeId, Direction direction, long count ) { - id = rebase( id ); - ByteArray array = this.array.at( id ); - if ( id == EMPTY ) + long index = rebase( relGroupIndex ); + while ( index != EMPTY ) { - return 0; + ByteArray array = this.array.at( index ); + if ( getTypeId( array, index ) == typeId ) + { + NodeRelationshipCache.this.setCount( array, index, countOffset( direction ), count ); + break; + } + index = getNext( array, index ); } + } - int offset = countOffset( direction ); - long count = getCount( array, id, offset ); - setCount( array, id, offset, newCount ); - return count; + long getNext( ByteArray array, long index ) + { + return all48Bits( array, index, NEXT_OFFSET ); + } + + int getTypeId( ByteArray array, long index ) + { + return IoPrimitiveUtils.shortToUnsignedInt( array.getShort( index, TYPE_OFFSET ) ); } /** @@ -561,60 +639,94 @@ private long nextFreeId() private long visitGroup( long nodeId, long relGroupIndex, GroupVisitor visitor ) { - long index = rebase( relGroupIndex ); - ByteArray array = this.array.at( index ); - long out = all48Bits( array, index, directionOffset( Direction.OUTGOING ) ); - long in = all48Bits( array, index, directionOffset( Direction.INCOMING ) ); - long loop = all48Bits( array, index, directionOffset( Direction.BOTH ) ); - long next = all48Bits( array, index, NEXT_OFFSET ); - long nextId = out == EMPTY && in == EMPTY && loop == EMPTY ? EMPTY : - visitor.visit( nodeId, next, out, in, loop ); - - // Save the returned next id for later, when the next group for this node is created - // then we know what to point this group's next to. - array.set6ByteLong( index, NEXT_OFFSET, nextId ); - return nextId; + long currentIndex = rebase( relGroupIndex ); + long first = EMPTY; + while ( currentIndex != EMPTY ) + { + ByteArray array = this.array.at( currentIndex ); + long out = all48Bits( array, currentIndex, idOffset( Direction.OUTGOING ) ); + int typeId = getTypeId( array, currentIndex ); + long in = all48Bits( array, currentIndex, idOffset( Direction.INCOMING ) ); + long loop = all48Bits( array, currentIndex, idOffset( Direction.BOTH ) ); + long next = getNext( array, currentIndex ); + long nextId = out == EMPTY && in == EMPTY && loop == EMPTY + ? EMPTY + : visitor.visit( nodeId, typeId, out, in, loop ); + if ( first == EMPTY ) + { // This is the one we return + first = nextId; + } + currentIndex = next; + } + return first; } - private int directionOffset( Direction direction ) + private int idOffset( Direction direction ) { return BASE_IDS_OFFSET + (direction.ordinal() * ID_AND_COUNT_SIZE); } private int countOffset( Direction direction ) { - return directionOffset( direction ) + ID_SIZE; + return idOffset( direction ) + ID_SIZE; } - long allocate() + long allocate( int typeId ) { - return nextFreeId(); + long index = nextFreeId(); + long rebasedIndex = rebase( index ); + ByteArray array = this.array.at( rebasedIndex ); + clearIndex( array, rebasedIndex ); + short shortTypeId = IoPrimitiveUtils.safeCastIntToUnsignedShort( typeId ); + array.setShort( rebasedIndex, TYPE_OFFSET, shortTypeId ); + return index; } - long putRelationship( long relGroupIndex, Direction direction, - long relId, boolean increment, boolean clear ) + private long getAndPutRelationship( long relGroupIndex, int typeId, Direction direction, + long relId, boolean incrementCount ) { long index = rebase( relGroupIndex ); + index = findOrAllocateIndex( index, typeId ); ByteArray array = this.array.at( index ); - int directionOffset = directionOffset( direction ); - long previousId; - if ( clear ) - { - clearRelationships( array, index ); - previousId = EMPTY; - } - else - { - previousId = all48Bits( array, index, directionOffset ); - } + int directionOffset = idOffset( direction ); + long previousId = all48Bits( array, index, directionOffset ); array.set6ByteLong( index, directionOffset, relId ); - if ( increment ) + if ( incrementCount ) { incrementCount( array, index, countOffset( direction ) ); } return previousId; } + private void clearRelationshipIds( ByteArray array, long index ) + { + array.set6ByteLong( index, idOffset( Direction.OUTGOING ), EMPTY ); + array.set6ByteLong( index, idOffset( Direction.INCOMING ), EMPTY ); + array.set6ByteLong( index, idOffset( Direction.BOTH ), EMPTY ); + } + + private long findOrAllocateIndex( long index, int typeId ) + { + long lastIndex = index; + ByteArray array = this.array.at( index ); + while ( index != EMPTY ) + { + lastIndex = index; + array = this.array.at( index ); + int candidateTypeId = getTypeId( array, index ); + if ( candidateTypeId == typeId ) + { + return index; + } + index = getNext( array, index ); + } + + // No such found, create at the end + long newIndex = allocate( typeId ); + array.set6ByteLong( lastIndex, NEXT_OFFSET, newIndex ); + return newIndex; + } + @Override public void close() { @@ -629,6 +741,24 @@ public void acceptMemoryStatsVisitor( MemoryStatsVisitor visitor ) { nullSafeMemoryStatsVisitor( array, visitor ); } + + public void clear() + { + nextFreeId.set( base ); + } + + public void clearRelationshipIds() + { + long highId = rebase( nextFreeId.get() ); + for ( long i = 0; i < highId; ) + { + ByteArray chunk = array.at( i ); + for ( int j = 0; j < chunkSize && i < highId; j++, i++ ) + { + clearRelationshipIds( chunk, i ); + } + } + } } @Override @@ -677,19 +807,23 @@ public interface NodeChangeVisitor /** * Efficiently visits changed nodes, e.g. nodes that have had any relationship chain updated by - * {@link #getAndPutRelationship(long, Direction, long, boolean)}. + * {@link #getAndPutRelationship(long, int, Direction, long, boolean)}. * * @param visitor {@link NodeChangeVisitor} which will be notified about all changes. - * @param denseNodes {@code true} for visiting changed dense nodes, {@code false} for visiting - * changed sparse nodes. + * @param nodeTypes which types to visit (dense/sparse). */ - public void visitChangedNodes( NodeChangeVisitor visitor, boolean denseNodes ) + public void visitChangedNodes( NodeChangeVisitor visitor, int nodeTypes ) { - long mask = changeMask( denseNodes ); - byte chunkMask = chunkChangeMask( denseNodes ); + long denseMask = changeMask( true ); + long sparseMask = changeMask( false ); + byte denseChunkMask = chunkChangeMask( true ); + byte sparseChunkMask = chunkChangeMask( false ); for ( long nodeId = 0; nodeId < highNodeId; ) { - if ( !chunkHasChange( nodeId, chunkMask ) ) + boolean chunkHasChanged = + (NodeType.isDense( nodeTypes ) && chunkHasChange( nodeId, denseChunkMask )) || + (NodeType.isSparse( nodeTypes ) && chunkHasChange( nodeId, sparseChunkMask )); + if ( !chunkHasChanged ) { nodeId += chunkSize; continue; @@ -698,7 +832,11 @@ public void visitChangedNodes( NodeChangeVisitor visitor, boolean denseNodes ) ByteArray chunk = array.at( nodeId ); for ( int i = 0; i < chunkSize && nodeId < highNodeId; i++, nodeId++ ) { - if ( isDense( chunk, nodeId ) == denseNodes && nodeIsChanged( chunk, nodeId, mask ) ) + boolean nodeHasChanged = + (NodeType.isDense( nodeTypes ) && nodeIsChanged( chunk, nodeId, denseMask )) || + (NodeType.isSparse( nodeTypes ) && nodeIsChanged( chunk, nodeId, sparseMask )); + + if ( nodeHasChanged && NodeType.matchesDense( nodeTypes, isDense( array, nodeId ) ) ) { visitor.change( nodeId, chunk ); } @@ -711,7 +849,7 @@ public void visitChangedNodes( NodeChangeVisitor visitor, boolean denseNodes ) * * @param denseNodes {@code true} for clearing marked dense nodes, {@code false} for clearing marked sparse nodes. */ - public void clearChangedChunks( boolean denseNodes ) + private void clearChangedChunks( boolean denseNodes ) { // Executed by a single thread, so no synchronized required byte chunkMask = chunkChangeMask( denseNodes ); @@ -726,4 +864,48 @@ private boolean chunkHasChange( long nodeId, byte chunkMask ) int chunkId = chunkOf( nodeId ); return (chunkChangedArray[chunkId] & chunkMask) != 0; } + + public Iterator> splitRelationshipTypesIntoRounds( + Iterator> allTypes, long freeMemoryForDenseNodeCache ) + { + PrefetchingIterator> peekableAllTypes = + new PrefetchingIterator>() + { + @Override + protected Entry fetchNextOrNull() + { + return allTypes.hasNext() ? allTypes.next() : null; + } + }; + + long numberOfDenseNodes = calculateNumberOfDenseNodes(); + return new PrefetchingIterator>() + { + @Override + protected Collection fetchNextOrNull() + { + long currentSetOfRelationshipsMemoryUsage = 0; + Set typesToImportThisRound = new HashSet<>(); + while ( peekableAllTypes.hasNext() ) + { + // Calculate worst-case scenario + Map.Entry type = peekableAllTypes.peek(); + long relationshipCountForThisType = type.getValue().longValue(); + long maxDenseNodesForThisType = min( numberOfDenseNodes, relationshipCountForThisType * 2/*nodes/rel*/ ); + long memoryUsageForThisType = maxDenseNodesForThisType * NodeRelationshipCache.GROUP_ENTRY_SIZE; + if ( currentSetOfRelationshipsMemoryUsage + memoryUsageForThisType > freeMemoryForDenseNodeCache && + currentSetOfRelationshipsMemoryUsage > 0 ) + { + // OK we've filled what we can into the cache, or worst-case at least + // it's now time to import these types + break; + } + peekableAllTypes.next(); + typesToImportThisRound.add( type.getKey() ); + currentSetOfRelationshipsMemoryUsage += memoryUsageForThisType; + } + return typesToImportThisRound.isEmpty() ? null : typesToImportThisRound; + } + }; + } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/NodeType.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/NodeType.java new file mode 100644 index 0000000000000..a80a18b7863c4 --- /dev/null +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/cache/NodeType.java @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.unsafe.impl.batchimport.cache; + +/** + * Convenient way of selecting nodes based on their dense status. + */ +public class NodeType +{ + public static final int NODE_TYPE_DENSE = 0x1; + public static final int NODE_TYPE_SPARSE = 0x2; + public static final int NODE_TYPE_ALL = NODE_TYPE_DENSE | NODE_TYPE_SPARSE; + + public static boolean isDense( int nodeTypes ) + { + return has( nodeTypes, NODE_TYPE_DENSE ); + } + + public static boolean isSparse( int nodeTypes ) + { + return has( nodeTypes, NODE_TYPE_SPARSE ); + } + + private static boolean has( int nodeTypes, int mask ) + { + return (nodeTypes & mask) != 0; + } + + public static boolean matchesDense( int nodeTypes, boolean isDense ) + { + int mask = isDense ? NODE_TYPE_DENSE : NODE_TYPE_SPARSE; + return has( nodeTypes, mask ); + } +} diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/PerTypeRelationshipSplitter.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/PerTypeRelationshipSplitter.java deleted file mode 100644 index 580ffe984f5f1..0000000000000 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/PerTypeRelationshipSplitter.java +++ /dev/null @@ -1,216 +0,0 @@ -/* - * Copyright (c) 2002-2017 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ -package org.neo4j.unsafe.impl.batchimport.input; - -import java.io.IOException; -import java.util.function.Predicate; -import java.util.function.ToIntFunction; - -import org.neo4j.helpers.collection.PrefetchingIterator; -import org.neo4j.unsafe.impl.batchimport.InputIterator; - -import static java.lang.Integer.max; - -/** - * Takes an {@link InputIterator} and splits up {@link InputRelationship relationships} by type. - * Uses {@link InputCache} to populate (all except the first type) on first pass, then reading from the - * cached relationships per type for all the other types. - */ -public class PerTypeRelationshipSplitter extends PrefetchingIterator> -{ - private static final String MINORITY_TYPE = "minority"; - private final Object[] allRelationshipTypes; - private final InputIterator actual; - private final Predicate minorityRelationshipTypes; - private final ToIntFunction typeToId; - private final InputCache inputCache; - - private int typeCursor; - private boolean allMinority; - - public PerTypeRelationshipSplitter( InputIterator actual, Object[] allRelationshipTypes, - Predicate minorityRelationshipTypes, ToIntFunction typeToId, InputCache inputCache ) - { - this.actual = actual; - this.allRelationshipTypes = allRelationshipTypes; - this.minorityRelationshipTypes = minorityRelationshipTypes; - this.typeToId = typeToId; - this.inputCache = inputCache; - } - - @Override - protected InputIterator fetchNextOrNull() - { - while ( typeCursor < allRelationshipTypes.length ) - { - Object type = allRelationshipTypes[typeCursor++]; - if ( typeCursor == 1 ) - { - if ( minorityRelationshipTypes.test( type ) ) - { - allMinority = true; - return null; - } - - // This is the first relationship type. If we're lucky and this is a new import - // then this type will also represent the type the most relationship are of. - // We'll basically return the actual iterator, but with a filter to only return - // this type. The other relationships will be cached by type. - return new FilteringAndPerTypeCachingInputIterator( actual, type ); - } - - // This isn't the first relationship type. The first pass cached relationships - // per type on disk into InputCache. Simply get the correct one and return. - if ( !minorityRelationshipTypes.test( type ) ) - { - return inputCache.relationships( cacheSubType( type ), true/*delete after use*/ ).iterator(); - } - } - return null; - } - - String cacheSubType( Object type ) - { - return String.valueOf( typeToId.applyAsInt( type ) ); - } - - /** - * @return the type currently being iterated over, e.g. the type that the {@link InputIterator} returned - * from the most recent call to iterates over. - */ - public Object currentType() - { - return allRelationshipTypes[typeCursor-1]; - } - - int highestTypeId() - { - int highest = 0; - for( Object type : allRelationshipTypes ) - { - highest = max( highest, typeToId.applyAsInt( type ) ); - } - return highest; - } - - public InputIterator getMinorityRelationships() - { - return allMinority ? actual : inputCache.relationships( MINORITY_TYPE, true ).iterator(); - } - - public class FilteringAndPerTypeCachingInputIterator extends InputIterator.Delegate - { - private final Object currentType; - // index into this array is actual typeId, which may be 0 - 2^16-1. - private final Receiver[] receivers; - private final Receiver minorityReceiver; - private final InputRelationship[] transport = new InputRelationship[1]; - - @SuppressWarnings( "unchecked" ) - public FilteringAndPerTypeCachingInputIterator( InputIterator actual, Object currentType ) - { - super( actual ); - this.currentType = currentType; - this.receivers = new Receiver[highestTypeId()+1]; - - try - { - for ( Object type : allRelationshipTypes ) - { - if ( type.equals( currentType ) ) - { - // We're iterating over this type, let's not cache it. Also accounted for in the - // receivers array above, which is 1 less than number of types in total. - continue; - } - - // Collect all types that are consider a minority into one set of relationships - if ( !minorityRelationshipTypes.test( type ) ) - { - int typeId = typeToId.applyAsInt( type ); - receivers[typeId] = inputCache.cacheRelationships( cacheSubType( type ) ); - } - } - minorityReceiver = inputCache.cacheRelationships( MINORITY_TYPE ); - } - catch ( IOException e ) - { - throw new InputException( "Error creating a cacher", e ); - } - } - - @Override - protected InputRelationship fetchNextOrNull() - { - while ( true ) - { - InputRelationship candidate = super.fetchNextOrNull(); - if ( candidate == null ) - { - // No more relationships - return null; - } - - Object type = candidate.typeAsObject(); - if ( type.equals( currentType ) ) - { - // This is a relationship of the requested type - return candidate; - } - - // This is a relationships of a different type, cache it - try - { - Receiver receiver = minorityRelationshipTypes.test( type ) ? - minorityReceiver : receivers[typeToId.applyAsInt( type )]; - transport[0] = candidate; - receiver.receive( transport ); - } - catch ( IOException e ) - { - throw new InputException( "Error caching relationship " + candidate, e ); - } - } - } - - @Override - public void close() - { - try - { - for ( Receiver receiver : receivers ) - { - if ( receiver != null ) - { - receiver.close(); - } - } - minorityReceiver.close(); - } - catch ( IOException e ) - { - throw new InputException( "Error closing cacher", e ); - } - - // This will delegate to the actual iterator and so close the external input iterator - super.close(); - } - } -} diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/IteratorBatcherStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/IteratorBatcherStep.java index d0aa8200e9574..5661c9d9134c5 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/IteratorBatcherStep.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/staging/IteratorBatcherStep.java @@ -22,6 +22,7 @@ import java.lang.reflect.Array; import java.util.Arrays; import java.util.Iterator; +import java.util.function.Predicate; import org.neo4j.unsafe.impl.batchimport.Configuration; @@ -33,29 +34,36 @@ public abstract class IteratorBatcherStep extends IoProducerStep private final Iterator data; private final Class itemClass; protected long cursor; + private final Predicate filter; - public IteratorBatcherStep( StageControl control, Configuration config, Iterator data, Class itemClass ) + public IteratorBatcherStep( StageControl control, Configuration config, Iterator data, Class itemClass, + Predicate filter ) { super( control, config ); this.data = data; this.itemClass = itemClass; + this.filter = filter; } @Override protected Object nextBatchOrNull( long ticket, int batchSize ) { - if ( !data.hasNext() ) - { - return null; - } - @SuppressWarnings( "unchecked" ) T[] batch = (T[]) Array.newInstance( itemClass, batchSize ); int i = 0; - for ( ; i < batchSize && data.hasNext(); i++ ) + for ( ; i < batchSize && data.hasNext(); ) + { + T candidate = data.next(); + if ( filter.test( candidate ) ) + { + batch[i++] = candidate; + cursor++; + } + } + + if ( i == 0 ) { - batch[i] = data.next(); - cursor++; + return null; // marks the end } return i == batchSize ? batch : Arrays.copyOf( batch, i ); } diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkbackStageTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkbackStageTest.java index 9ffaa03de8a98..08db644fccbad 100644 --- a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkbackStageTest.java +++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/RelationshipLinkbackStageTest.java @@ -26,6 +26,7 @@ import org.neo4j.kernel.impl.store.id.IdGeneratorImpl; import org.neo4j.kernel.impl.store.record.RelationshipRecord; import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache; +import org.neo4j.unsafe.impl.batchimport.cache.NodeType; import org.neo4j.unsafe.impl.batchimport.cache.NumberArrayFactory; import org.neo4j.unsafe.impl.batchimport.staging.ExecutionMonitors; import org.neo4j.unsafe.impl.batchimport.staging.ExecutionSupervisors; @@ -41,7 +42,7 @@ public void reservedIdIsSkipped() throws Exception long highId = 5; RelationshipStore store = StoreWithReservedId.newRelationshipStoreMock( highId ); RelationshipLinkbackStage stage = new RelationshipLinkbackStage( "Test", - Configuration.DEFAULT, store, newCache(), 0, highId, false ); + Configuration.DEFAULT, store, newCache(), 0, highId, NodeType.NODE_TYPE_SPARSE ); ExecutionSupervisors.superviseExecution( ExecutionMonitors.invisible(), Configuration.DEFAULT, stage ); diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/RelationshipTypeCheckerStepTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/RelationshipTypeCheckerStepTest.java index 48f0594b15a3e..7078fea7606fd 100644 --- a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/RelationshipTypeCheckerStepTest.java +++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/RelationshipTypeCheckerStepTest.java @@ -19,13 +19,16 @@ */ package org.neo4j.unsafe.impl.batchimport; +import org.apache.commons.lang3.mutable.MutableLong; import org.junit.Rule; import org.junit.Test; import org.mockito.InOrder; import java.util.ArrayList; import java.util.Collections; +import java.util.Iterator; import java.util.List; +import java.util.Map.Entry; import java.util.TreeSet; import org.neo4j.kernel.impl.store.record.RelationshipRecord; @@ -39,6 +42,8 @@ import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; + +import static org.neo4j.helpers.collection.Iterables.reverse; import static org.neo4j.helpers.collection.Iterators.loop; import static org.neo4j.unsafe.impl.batchimport.Configuration.DEFAULT; import static org.neo4j.unsafe.impl.batchimport.input.Group.GLOBAL; @@ -75,12 +80,10 @@ private void shouldReturnRelationshipTypeIdsInReverseOrderOfTokenCreation( boole step.done(); // THEN - Object[] processed = step.getRelationshipTypes( 100 ); - InOrder inOrder = inOrder( repository ); - for ( Object type : reversed( processed ) ) + for ( Entry type : reverse( step.getDistribution() ) ) { - inOrder.verify( repository ).getOrCreateId( type ); + inOrder.verify( repository ).getOrCreateId( type.getKey() ); } inOrder.verifyNoMoreInteractions(); } @@ -101,11 +104,11 @@ public void shouldReturnRelationshipTypesInDescendingOrder() throws Throwable // THEN TreeSet expected = idsOf( relationships ); - Object[] processed = step.getRelationshipTypes( 100 ); - int i = 0; + Iterator> processed = step.getDistribution().iterator(); for ( Object expectedType : loop( expected.descendingIterator() ) ) { - assertEquals( expectedType, processed[i++] ); + Entry entry = processed.next(); + assertEquals( expectedType, entry.getKey() ); } } diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/cache/NodeRelationshipCacheTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/cache/NodeRelationshipCacheTest.java index 2838dfbd7cc09..e7048e23782a0 100644 --- a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/cache/NodeRelationshipCacheTest.java +++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/cache/NodeRelationshipCacheTest.java @@ -19,6 +19,7 @@ */ package org.neo4j.unsafe.impl.batchimport.cache; +import org.apache.commons.lang3.mutable.MutableLong; import org.junit.After; import org.junit.Rule; import org.junit.Test; @@ -28,20 +29,28 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; import org.neo4j.collection.primitive.Primitive; import org.neo4j.collection.primitive.PrimitiveLongObjectMap; import org.neo4j.collection.primitive.PrimitiveLongSet; import org.neo4j.graphdb.Direction; +import org.neo4j.helpers.collection.Iterators; +import org.neo4j.helpers.collection.Pair; import org.neo4j.test.rule.RandomRule; import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache.GroupVisitor; import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache.NodeChangeVisitor; import static java.lang.Math.max; +import static java.lang.Math.toIntExact; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyLong; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -126,30 +135,31 @@ public void shouldObserveFirstRelationshipAsEmptyInEachDirection() throws Except // GIVEN cache = new NodeRelationshipCache( NumberArrayFactory.AUTO, 1, 100, base ); int nodes = 100; + int typeId = 5; Direction[] directions = Direction.values(); GroupVisitor groupVisitor = mock( GroupVisitor.class ); - cache.setForwardScan( true ); + cache.setForwardScan( true, true ); cache.setHighNodeId( nodes+1 ); for ( int i = 0; i < nodes; i++ ) { assertEquals( -1L, cache.getFirstRel( nodes, groupVisitor ) ); cache.incrementCount( i ); - long previous = cache.getAndPutRelationship( i, directions[i % directions.length], + long previous = cache.getAndPutRelationship( i, typeId, directions[i % directions.length], random.nextInt( 1_000_000 ), true ); assertEquals( -1L, previous ); } // WHEN - cache.setForwardScan( false ); + cache.setForwardScan( false, true ); for ( int i = 0; i < nodes; i++ ) { - long previous = cache.getAndPutRelationship( i, directions[i % directions.length], + long previous = cache.getAndPutRelationship( i, typeId, directions[i % directions.length], random.nextInt( 1_000_000 ), false ); assertEquals( -1L, previous ); } // THEN - cache.setForwardScan( true ); + cache.setForwardScan( true, true ); for ( int i = 0; i < nodes; i++ ) { assertEquals( -1L, cache.getFirstRel( nodes, groupVisitor ) ); @@ -162,19 +172,20 @@ public void shouldResetCountAfterGetOnDenseNodes() throws Exception // GIVEN cache = new NodeRelationshipCache( NumberArrayFactory.AUTO, 1, 100, base ); long nodeId = 0; + int typeId = 3; cache.setHighNodeId( 1 ); cache.incrementCount( nodeId ); cache.incrementCount( nodeId ); - cache.getAndPutRelationship( nodeId, OUTGOING, 10, true ); - cache.getAndPutRelationship( nodeId, OUTGOING, 12, true ); + cache.getAndPutRelationship( nodeId, typeId, OUTGOING, 10, true ); + cache.getAndPutRelationship( nodeId, typeId, OUTGOING, 12, true ); assertTrue( cache.isDense( nodeId ) ); // WHEN - long count = cache.getCount( nodeId, OUTGOING ); + long count = cache.getCount( nodeId, typeId, OUTGOING ); assertEquals( 2, count ); // THEN - assertEquals( 0, cache.getCount( nodeId, OUTGOING ) ); + assertEquals( 0, cache.getCount( nodeId, typeId, OUTGOING ) ); } @Test @@ -185,10 +196,11 @@ public void shouldGetAndPutRelationshipAroundChunkEdge() throws Exception // WHEN long nodeId = 1_000_000 - 1; - cache.setHighNodeId( nodeId+1 ); + int typeId = 10; + cache.setHighNodeId( nodeId + 1 ); Direction direction = Direction.OUTGOING; long relId = 10; - cache.getAndPutRelationship( nodeId, direction, relId, false ); + cache.getAndPutRelationship( nodeId, typeId, direction, relId, false ); // THEN assertEquals( relId, cache.getFirstRel( nodeId, mock( GroupVisitor.class ) ) ); @@ -198,6 +210,7 @@ public void shouldGetAndPutRelationshipAroundChunkEdge() throws Exception public void shouldPutRandomStuff() throws Exception { // GIVEN + int typeId = 10; int nodes = 10_000; PrimitiveLongObjectMap key = Primitive.longObjectMap( nodes ); cache = new NodeRelationshipCache( NumberArrayFactory.HEAP, 1, 1000, base ); @@ -219,7 +232,7 @@ public void shouldPutRandomStuff() throws Exception boolean dense = cache.isDense( nodeId ); Direction direction = random.among( Direction.values() ); long relationshipId = random.nextLong( 1_000_000 ); - long previousHead = cache.getAndPutRelationship( nodeId, direction, relationshipId, false ); + long previousHead = cache.getAndPutRelationship( nodeId, typeId, direction, relationshipId, false ); long[] keyIds = key.get( nodeId ); int keyIndex = dense ? direction.ordinal() : 0; if ( keyIds == null ) @@ -239,30 +252,32 @@ public void shouldPut6ByteRelationshipIds() throws Exception long sparseNode = 0; long denseNode = 1; long relationshipId = (1L << 48) - 2; + int typeId = 10; cache.setHighNodeId( 2 ); cache.incrementCount( denseNode ); // WHEN - assertEquals( -1L, cache.getAndPutRelationship( sparseNode, OUTGOING, relationshipId, false ) ); - assertEquals( -1L, cache.getAndPutRelationship( denseNode, OUTGOING, relationshipId, false ) ); + assertEquals( -1L, cache.getAndPutRelationship( sparseNode, typeId, OUTGOING, relationshipId, false ) ); + assertEquals( -1L, cache.getAndPutRelationship( denseNode, typeId, OUTGOING, relationshipId, false ) ); // THEN - assertEquals( relationshipId, cache.getAndPutRelationship( sparseNode, OUTGOING, 1, false ) ); - assertEquals( relationshipId, cache.getAndPutRelationship( denseNode, OUTGOING, 1, false ) ); + assertEquals( relationshipId, cache.getAndPutRelationship( sparseNode, typeId, OUTGOING, 1, false ) ); + assertEquals( relationshipId, cache.getAndPutRelationship( denseNode, typeId, OUTGOING, 1, false ) ); } @Test public void shouldFailFastIfTooBigRelationshipId() throws Exception { // GIVEN + int typeId = 10; cache = new NodeRelationshipCache( NumberArrayFactory.HEAP, 1, 100, base ); cache.setHighNodeId( 1 ); // WHEN - cache.getAndPutRelationship( 0, OUTGOING, (1L << 48) - 2, false ); + cache.getAndPutRelationship( 0, typeId, OUTGOING, (1L << 48) - 2, false ); try { - cache.getAndPutRelationship( 0, OUTGOING, (1L << 48) - 1, false ); + cache.getAndPutRelationship( 0, typeId, OUTGOING, (1L << 48) - 1, false ); fail( "Should fail" ); } catch ( IllegalArgumentException e ) @@ -277,6 +292,7 @@ public void shouldVisitChangedNodes() throws Exception { // GIVEN int nodes = 10; + int typeId = 10; cache = new NodeRelationshipCache( NumberArrayFactory.HEAP, 2, 100, base ); cache.setHighNodeId( nodes ); for ( long nodeId = 0; nodeId < nodes; nodeId++ ) @@ -292,7 +308,7 @@ public void shouldVisitChangedNodes() throws Exception for ( int i = 0; i < nodes / 2; i++ ) { long nodeId = random.nextLong( nodes ); - cache.getAndPutRelationship( nodeId, Direction.OUTGOING, random.nextLong( 1_000_000 ), false ); + cache.getAndPutRelationship( nodeId, typeId, Direction.OUTGOING, random.nextLong( 1_000_000 ), false ); boolean dense = cache.isDense( nodeId ); (dense ? keyDenseChanged : keySparseChanged).add( nodeId ); } @@ -304,7 +320,7 @@ public void shouldVisitChangedNodes() throws Exception // THEN (sparse) assertTrue( "Unexpected sparse change reported for " + nodeId, keySparseChanged.remove( nodeId ) ); }; - cache.visitChangedNodes( visitor, false/*sparse*/ ); + cache.visitChangedNodes( visitor, NodeType.NODE_TYPE_SPARSE ); assertTrue( "There was " + keySparseChanged.size() + " expected sparse changes that weren't reported", keySparseChanged.isEmpty() ); } @@ -316,7 +332,7 @@ public void shouldVisitChangedNodes() throws Exception // THEN (dense) assertTrue( "Unexpected dense change reported for " + nodeId, keyDenseChanged.remove( nodeId ) ); }; - cache.visitChangedNodes( visitor, true/*dense*/ ); + cache.visitChangedNodes( visitor, NodeType.NODE_TYPE_DENSE ); assertTrue( "There was " + keyDenseChanged.size() + " expected dense changes that weren reported", keyDenseChanged.isEmpty() ); } @@ -329,8 +345,9 @@ public void shouldFailFastOnTooHighCountOnNode() throws Exception cache = new NodeRelationshipCache( NumberArrayFactory.HEAP, 10, 100, base ); long nodeId = 5; long count = NodeRelationshipCache.MAX_COUNT - 1; + int typeId = 10; cache.setHighNodeId( 10 ); - cache.setCount( nodeId, count, OUTGOING ); + cache.setCount( nodeId, count, typeId, OUTGOING ); // WHEN cache.incrementCount( nodeId ); @@ -351,52 +368,53 @@ public void shouldKeepNextGroupIdForNextRound() throws Exception // GIVEN cache = new NodeRelationshipCache( NumberArrayFactory.HEAP, 1, 100, base ); long nodeId = 0; - cache.setHighNodeId( nodeId+1 ); + int typeId = 10; + cache.setHighNodeId( nodeId + 1 ); cache.incrementCount( nodeId ); GroupVisitor groupVisitor = mock( GroupVisitor.class ); - when( groupVisitor.visit( anyLong(), anyLong(), anyLong(), anyLong(), anyLong() ) ).thenReturn( 1L, 2L, 3L ); + when( groupVisitor.visit( anyLong(), anyInt(), anyLong(), anyLong(), anyLong() ) ).thenReturn( 1L, 2L, 3L ); long firstRelationshipGroupId; { // WHEN importing the first type long relationshipId = 10; - cache.getAndPutRelationship( nodeId, OUTGOING, relationshipId, true ); + cache.getAndPutRelationship( nodeId, typeId, OUTGOING, relationshipId, true ); firstRelationshipGroupId = cache.getFirstRel( nodeId, groupVisitor ); // THEN assertEquals( 1L, firstRelationshipGroupId ); - verify( groupVisitor ).visit( nodeId, -1L, relationshipId, -1L, -1L ); + verify( groupVisitor ).visit( nodeId, typeId, relationshipId, -1L, -1L ); // Also simulate going back again ("clearing" of the cache requires this) - cache.setForwardScan( false ); - cache.getAndPutRelationship( nodeId, OUTGOING, relationshipId, false ); - cache.setForwardScan( true ); + cache.setForwardScan( false, true ); + cache.getAndPutRelationship( nodeId, typeId, OUTGOING, relationshipId, false ); + cache.setForwardScan( true, true ); } long secondRelationshipGroupId; { // WHEN importing the second type long relationshipId = 11; - cache.getAndPutRelationship( nodeId, INCOMING, relationshipId, true ); + cache.getAndPutRelationship( nodeId, typeId, INCOMING, relationshipId, true ); secondRelationshipGroupId = cache.getFirstRel( nodeId, groupVisitor ); // THEN assertEquals( 2L, secondRelationshipGroupId ); - verify( groupVisitor ).visit( nodeId, firstRelationshipGroupId, -1, relationshipId, -1L ); + verify( groupVisitor ).visit( nodeId, typeId, -1, relationshipId, -1L ); // Also simulate going back again ("clearing" of the cache requires this) - cache.setForwardScan( false ); - cache.getAndPutRelationship( nodeId, OUTGOING, relationshipId, false ); - cache.setForwardScan( true ); + cache.setForwardScan( false, true ); + cache.getAndPutRelationship( nodeId, typeId, OUTGOING, relationshipId, false ); + cache.setForwardScan( true, true ); } { // WHEN importing the third type long relationshipId = 10; - cache.getAndPutRelationship( nodeId, BOTH, relationshipId, true ); + cache.getAndPutRelationship( nodeId, typeId, BOTH, relationshipId, true ); long thirdRelationshipGroupId = cache.getFirstRel( nodeId, groupVisitor ); assertEquals( 3L, thirdRelationshipGroupId ); - verify( groupVisitor ).visit( nodeId, secondRelationshipGroupId, -1L, -1L, relationshipId ); + verify( groupVisitor ).visit( nodeId, typeId, -1L, -1L, relationshipId ); } } @@ -406,11 +424,12 @@ public void shouldHaveSparseNodesWithBigCounts() throws Exception // GIVEN cache = new NodeRelationshipCache( NumberArrayFactory.HEAP, 1, 100, base ); long nodeId = 1; + int typeId = 10; cache.setHighNodeId( nodeId + 1 ); // WHEN long highCount = NodeRelationshipCache.MAX_COUNT - 100; - cache.setCount( nodeId, highCount, OUTGOING ); + cache.setCount( nodeId, highCount, typeId, OUTGOING ); long nextHighCount = cache.incrementCount( nodeId ); // THEN @@ -427,30 +446,118 @@ public void shouldHaveDenseNodesWithBigCounts() throws Exception // GIVEN cache = new NodeRelationshipCache( NumberArrayFactory.HEAP, 1, 100, base ); long nodeId = 1; + int typeId = 10; cache.setHighNodeId( nodeId + 1 ); - cache.setCount( nodeId, 2, OUTGOING ); // surely dense now - cache.getAndPutRelationship( nodeId, OUTGOING, 1, true ); - cache.getAndPutRelationship( nodeId, INCOMING, 2, true ); + cache.setCount( nodeId, 2, typeId, OUTGOING ); // surely dense now + cache.getAndPutRelationship( nodeId, typeId, OUTGOING, 1, true ); + cache.getAndPutRelationship( nodeId, typeId, INCOMING, 2, true ); // WHEN long highCountOut = NodeRelationshipCache.MAX_COUNT - 100; long highCountIn = NodeRelationshipCache.MAX_COUNT - 50; - cache.setCount( nodeId, highCountOut, OUTGOING ); - cache.setCount( nodeId, highCountIn, INCOMING ); - cache.getAndPutRelationship( nodeId, OUTGOING, 1, true /*increment count*/ ); - cache.getAndPutRelationship( nodeId, INCOMING, 2, true /*increment count*/ ); + cache.setCount( nodeId, highCountOut, typeId, OUTGOING ); + cache.setCount( nodeId, highCountIn, typeId, INCOMING ); + cache.getAndPutRelationship( nodeId, typeId, OUTGOING, 1, true /*increment count*/ ); + cache.getAndPutRelationship( nodeId, typeId, INCOMING, 2, true /*increment count*/ ); + + // THEN + assertEquals( highCountOut + 1, cache.getCount( nodeId, typeId, OUTGOING ) ); + assertEquals( highCountIn + 1, cache.getCount( nodeId, typeId, INCOMING ) ); + } + + @Test + public void shouldCacheMultipleDenseNodeRelationshipHeads() throws Exception + { + // GIVEN + cache = new NodeRelationshipCache( NumberArrayFactory.HEAP, 1 ); + cache.setHighNodeId( 10 ); + long nodeId = 3; + cache.setCount( nodeId, 10, /*these do not matter ==>*/ 0, OUTGOING ); + + // WHEN + Map,Long> firstRelationshipIds = new HashMap<>(); + int typeCount = 3; + for ( int typeId = 0, relationshipId = 0; typeId < typeCount; typeId++ ) + { + for ( Direction direction : Direction.values() ) + { + long firstRelationshipId = relationshipId++; + cache.getAndPutRelationship( nodeId, typeId, direction, firstRelationshipId, true ); + firstRelationshipIds.put( Pair.of( typeId, direction ), firstRelationshipId ); + } + } + AtomicInteger visitCount = new AtomicInteger(); + GroupVisitor visitor = new GroupVisitor() + { + @Override + public long visit( long nodeId, int typeId, long out, long in, long loop ) + { + visitCount.incrementAndGet(); + assertEquals( firstRelationshipIds.get( Pair.of( typeId, OUTGOING ) ).longValue(), out ); + assertEquals( firstRelationshipIds.get( Pair.of( typeId, INCOMING ) ).longValue(), in ); + assertEquals( firstRelationshipIds.get( Pair.of( typeId, BOTH ) ).longValue(), loop ); + return 0; + } + }; + cache.getFirstRel( nodeId, visitor ); // THEN - assertEquals( highCountOut + 1, cache.getCount( nodeId, OUTGOING ) ); - assertEquals( highCountIn + 1, cache.getCount( nodeId, INCOMING ) ); + assertEquals( typeCount, visitCount.get() ); + } + + @Test + public void shouldSplitUpRelationshipTypesInBatches() throws Exception + { + // GIVEN + int denseNodeThreshold = 5; + int numberOfNodes = 100; + int numberOfTypes = 10; + cache = new NodeRelationshipCache( NumberArrayFactory.HEAP, denseNodeThreshold ); + cache.setHighNodeId( numberOfNodes + 1 ); + Direction[] directions = Direction.values(); + for ( int i = 0; i < numberOfNodes; i++ ) + { + int count = random.nextInt( 1, denseNodeThreshold * 2 ); + cache.setCount( i, count, random.nextInt( numberOfTypes ), random.among( directions ) ); + } + int numberOfDenseNodes = toIntExact( cache.calculateNumberOfDenseNodes() ); + Map types = new HashMap<>(); + for ( int i = 0; i < numberOfTypes; i++ ) + { + types.put( "TYPE" + i, new MutableLong( random.nextInt( 1, 1_000 ) ) ); + } + + // WHEN enough memory for all types + { + long memory = numberOfDenseNodes * numberOfTypes * NodeRelationshipCache.GROUP_ENTRY_SIZE; + Collection fits = + Iterators.single( cache.splitRelationshipTypesIntoRounds( types.entrySet().iterator(), memory ) ); + // THEN + assertEquals( types.size(), fits.size() ); + } + + // and WHEN less than enough memory for all types + { + int memory = numberOfDenseNodes * numberOfTypes / 5 * NodeRelationshipCache.GROUP_ENTRY_SIZE; + int total = 0; + Iterator> rounds = + cache.splitRelationshipTypesIntoRounds( types.entrySet().iterator(), memory ); + while ( rounds.hasNext() ) + { + Collection round = rounds.next(); + total += round.size(); + } + assertEquals( types.size(), total ); + } } private void testNode( NodeRelationshipCache link, long node, Direction direction ) { - long count = link.getCount( node, direction ); - assertEquals( -1, link.getAndPutRelationship( node, direction, 5, false ) ); - assertEquals( 5, link.getAndPutRelationship( node, direction, 10, false ) ); - assertEquals( count, link.getCount( node, direction ) ); + int typeId = 0; // doesn't matter here because it's all sparse + long count = link.getCount( node, typeId, direction ); + assertEquals( -1, link.getAndPutRelationship( node, typeId, direction, 5, false ) ); + assertEquals( 5, link.getAndPutRelationship( node, typeId, direction, 10, false ) ); + assertEquals( count, link.getCount( node, typeId, direction ) ); } private long findNode( NodeRelationshipCache link, long nodeCount, boolean isDense ) diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/input/PerTypeRelationshipSplitterTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/input/PerTypeRelationshipSplitterTest.java deleted file mode 100644 index 9aea0d0a3dea0..0000000000000 --- a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/input/PerTypeRelationshipSplitterTest.java +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Copyright (c) 2002-2017 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ -package org.neo4j.unsafe.impl.batchimport.input; - -import org.junit.Rule; -import org.junit.Test; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Set; - -import org.neo4j.io.fs.DefaultFileSystemAbstraction; -import org.neo4j.kernel.impl.store.format.standard.StandardV3_0; -import org.neo4j.test.rule.RandomRule; -import org.neo4j.test.rule.TestDirectory; -import org.neo4j.unsafe.impl.batchimport.Configuration; -import org.neo4j.unsafe.impl.batchimport.InputIterable; -import org.neo4j.unsafe.impl.batchimport.InputIterator; - -import static org.junit.Assert.assertEquals; -import static org.neo4j.helpers.collection.Iterators.filter; -import static org.neo4j.unsafe.impl.batchimport.input.InputEntity.NO_PROPERTIES; -import static org.neo4j.unsafe.impl.batchimport.input.SimpleInputIteratorWrapper.wrap; - -public class PerTypeRelationshipSplitterTest -{ - @Rule - public final RandomRule random = new RandomRule().withSeed( 1460373085111L ); - @Rule - public final TestDirectory directory = TestDirectory.testDirectory(); - - @Test - public void shouldReturnTypesOneByOne() throws Exception - { - // GIVEN - Collection expected = randomRelationships(); - InputIterable relationships = wrap( "test", expected ); - InputCache inputCache = new InputCache( new DefaultFileSystemAbstraction(), directory.directory(), - StandardV3_0.RECORD_FORMATS, Configuration.DEFAULT ); - PerTypeRelationshipSplitter perType = new PerTypeRelationshipSplitter( relationships.iterator(), - types( expected ), type -> false, type -> Integer.parseInt( type.toString() ), inputCache ); - - // WHEN - Set all = new HashSet<>(); - while ( perType.hasNext() ) - { - try ( InputIterator relationshipsOfThisType = perType.next() ) - { - // THEN - Object type = perType.currentType(); - Collection expectedRelationshipsOfThisType = nodesOf( filter( - relationship -> relationship.typeAsObject().equals( type ), expected.iterator() ) ); - assertEquals( expectedRelationshipsOfThisType, nodesOf( relationshipsOfThisType ) ); - all.addAll( expectedRelationshipsOfThisType ); - } - } - - assertEquals( nodesOf( expected.iterator() ), all ); - inputCache.close(); - } - - /** - * Get the nodes of the relationships. We use those to identify relationships, since they have no ID - * and no equals method (which they don't really need). - * - * @param relationship {@link InputRelationship} to get node ids from. - * @return {@link Collection} of node ids from {@link InputRelationship} relationships. - */ - private Collection nodesOf( Iterator relationship ) - { - Collection nodes = new HashSet<>(); - while ( relationship.hasNext() ) - { - nodes.add( relationship.next().startNode() ); - } - return nodes; - } - - private Object[] types( Collection expected ) - { - Set types = new HashSet<>(); - for ( InputRelationship relationship : expected ) - { - types.add( relationship.typeAsObject() ); - } - return types.toArray(); - } - - private Collection randomRelationships() - { - Collection result = new ArrayList<>(); - int count = 100; - Group group = Group.GLOBAL; - boolean typeIds = random.nextBoolean(); - for ( int i = 0; i < count; i++ ) - { - int typeId = random.nextInt( 5 ); - Object node = (long)i; - InputRelationship relationship = new InputRelationship( "test", i, i, NO_PROPERTIES, null, - group, node, group, node, - typeIds ? null : String.valueOf( typeId ), - typeIds ? typeId : null ); - result.add( relationship ); - } - return result; - } -}