From b5bfe6ebe1387c3ef9c6b1b83cbf03e4a03dca30 Mon Sep 17 00:00:00 2001 From: Mattias Persson Date: Tue, 11 Apr 2017 14:55:38 +0200 Subject: [PATCH] Importer can write batches of records in parallel This means that there may be multiple threads writing records into the stores as the final step in most stages. This doesn't mean that there will be multiple threads flushing pages to disk since flushing is separated from writing records. There have been some amount of changes to which steps does what and generally more work have been moved into steps that are naturally parallelizable already. For example property encoder step now also creates property records instead of the writer. For this purpose the IdRangeIterator moved from ha component and IdSequence got a new method nextIdBatch, which some implementations already had. --- .../impl/store/id/validation/IdValidator.java | 6 + .../transaction/state/PropertyCreator.java | 10 + .../neo4j/unsafe/impl/batchimport/Batch.java | 15 +- .../batchimport/EntityStoreUpdaterStep.java | 95 ++------- .../unsafe/impl/batchimport/HighestId.java | 49 +++++ .../unsafe/impl/batchimport/NodeStage.java | 2 +- .../impl/batchimport/PropertyEncoderStep.java | 186 +++++++++++++++--- .../impl/batchimport/RelationshipStage.java | 6 +- .../RelativeIdRecordAllocator.java | 5 + .../impl/batchimport/UpdateRecordsStep.java | 2 +- .../store/BatchingIdGeneratorFactory.java | 2 +- .../store/BatchingPropertyRecordAccess.java | 4 +- .../org/neo4j/test/rule/NeoStoresRule.java | 86 ++++++-- .../impl/batchimport/BatchCollector.java | 41 ++++ .../impl/batchimport/HighestIdTest.java | 93 +++++++++ .../batchimport/PropertyEncoderStepTest.java | 163 ++++++++------- 16 files changed, 560 insertions(+), 205 deletions(-) create mode 100644 community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/HighestId.java create mode 100644 community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/BatchCollector.java create mode 100644 community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/HighestIdTest.java diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/store/id/validation/IdValidator.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/store/id/validation/IdValidator.java index e54f4e37e51d9..3783319ff25d2 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/store/id/validation/IdValidator.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/store/id/validation/IdValidator.java @@ -88,4 +88,10 @@ public static void assertIdWithinCapacity( long id, long maxId ) throw new IdCapacityExceededException( id, maxId ); } } + + public static boolean hasReservedIdInRange( long startIdInclusive, long endIdExclusive ) + { + return startIdInclusive <= IdGeneratorImpl.INTEGER_MINUS_ONE && + endIdExclusive > IdGeneratorImpl.INTEGER_MINUS_ONE; + } } diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/state/PropertyCreator.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/state/PropertyCreator.java index 9ea061afa38e5..9f49af5a08bf7 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/state/PropertyCreator.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/state/PropertyCreator.java @@ -20,6 +20,7 @@ package org.neo4j.kernel.impl.transaction.state; import java.util.Iterator; +import java.util.function.Consumer; import org.neo4j.kernel.impl.store.DynamicRecordAllocator; import org.neo4j.kernel.impl.store.PropertyStore; @@ -196,6 +197,13 @@ public PropertyBlock encodeValue( PropertyBlock block, int propertyKey, Object v public long createPropertyChain( PrimitiveRecord owner, Iterator properties, RecordAccess propertyRecords ) + { + return createPropertyChain( owner, properties, propertyRecords, p -> {} ); + } + + public long createPropertyChain( PrimitiveRecord owner, Iterator properties, + RecordAccess propertyRecords, + Consumer createdPropertyRecords ) { if ( properties == null || !properties.hasNext() ) { @@ -203,6 +211,7 @@ public long createPropertyChain( PrimitiveRecord owner, Iterator } PropertyRecord currentRecord = propertyRecords.create( propertyRecordIdGenerator.nextId(), owner ) .forChangingData(); + createdPropertyRecords.accept( currentRecord ); currentRecord.setInUse( true ); currentRecord.setCreated(); PropertyRecord firstRecord = currentRecord; @@ -216,6 +225,7 @@ public long createPropertyChain( PrimitiveRecord owner, Iterator // Create new record long propertyId = propertyRecordIdGenerator.nextId(); currentRecord = propertyRecords.create( propertyId, owner ).forChangingData(); + createdPropertyRecords.accept( currentRecord ); currentRecord.setInUse( true ); currentRecord.setCreated(); // Set up links diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/Batch.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/Batch.java index 4b375b0848446..72d06134fb5e9 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/Batch.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/Batch.java @@ -20,7 +20,7 @@ package org.neo4j.unsafe.impl.batchimport; import org.neo4j.kernel.impl.store.record.PrimitiveRecord; -import org.neo4j.kernel.impl.store.record.PropertyBlock; +import org.neo4j.kernel.impl.store.record.PropertyRecord; import org.neo4j.unsafe.impl.batchimport.staging.Stage; import org.neo4j.unsafe.impl.batchimport.staging.Step; @@ -39,15 +39,12 @@ public class Batch public final INPUT[] input; public RECORD[] records; - public int[] propertyBlocksLengths; - // This is a special succer. All property blocks for ALL records in this batch sits in this - // single array. The number of property blocks for a given record sits in propertyBlocksLengths - // using the same index as the record. So it's a collective size suitable for complete looping - // over the batch. - public PropertyBlock[] propertyBlocks; - // Used by relationship staged to query idMapper and store ids here + + public PropertyRecord[][] propertyRecords; + public int numberOfProperties; + + // Used by relationship stages to query idMapper and store ids here public long[] ids; - public boolean parallelizableWithPrevious; public long firstRecordId; public long[][] labels; diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/EntityStoreUpdaterStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/EntityStoreUpdaterStep.java index 46cfe839ac65e..ecb9655a31587 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/EntityStoreUpdaterStep.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/EntityStoreUpdaterStep.java @@ -19,24 +19,16 @@ */ package org.neo4j.unsafe.impl.batchimport; -import java.util.Iterator; - -import org.neo4j.kernel.impl.store.AbstractDynamicStore; import org.neo4j.kernel.impl.store.CommonAbstractStore; import org.neo4j.kernel.impl.store.PropertyStore; -import org.neo4j.kernel.impl.store.PropertyType; import org.neo4j.kernel.impl.store.StoreHeader; -import org.neo4j.kernel.impl.store.record.DynamicRecord; import org.neo4j.kernel.impl.store.record.PrimitiveRecord; import org.neo4j.kernel.impl.store.record.PropertyBlock; import org.neo4j.kernel.impl.store.record.PropertyRecord; -import org.neo4j.kernel.impl.transaction.state.PropertyCreator; -import org.neo4j.kernel.impl.util.ReusableIteratorCostume; import org.neo4j.unsafe.impl.batchimport.input.InputEntity; import org.neo4j.unsafe.impl.batchimport.staging.BatchSender; import org.neo4j.unsafe.impl.batchimport.staging.ProcessorStep; import org.neo4j.unsafe.impl.batchimport.staging.StageControl; -import org.neo4j.unsafe.impl.batchimport.store.BatchingPropertyRecordAccess; import org.neo4j.unsafe.impl.batchimport.store.io.IoMonitor; import static java.lang.Math.max; @@ -62,24 +54,18 @@ public interface Monitor private final CommonAbstractStore entityStore; private final PropertyStore propertyStore; private final IoMonitor ioMonitor; - private final PropertyCreator propertyCreator; private final Monitor monitor; - private long highestId; - - // Reusable instances for less GC - private final BatchingPropertyRecordAccess propertyRecords = new BatchingPropertyRecordAccess(); - private final ReusableIteratorCostume blockIterator = new ReusableIteratorCostume<>(); + private final HighestId highestId = new HighestId(); EntityStoreUpdaterStep( StageControl control, Configuration config, CommonAbstractStore entityStore, PropertyStore propertyStore, IoMonitor ioMonitor, Monitor monitor ) { - super( control, "v", config, 1, ioMonitor ); + super( control, "v", config, 0, ioMonitor ); this.entityStore = entityStore; this.propertyStore = propertyStore; this.monitor = monitor; - this.propertyCreator = new PropertyCreator( propertyStore, null ); this.ioMonitor = ioMonitor; this.ioMonitor.reset(); } @@ -87,9 +73,6 @@ public interface Monitor @Override protected void process( Batch batch, BatchSender sender ) { - // Clear reused data structures - propertyRecords.close(); - // Write the entity records, and at the same time allocate property records for its property blocks. long highestId = 0; RECORD[] records = batch.records; @@ -98,31 +81,13 @@ protected void process( Batch batch, BatchSender sender ) return; } - int propertyBlockCursor = 0, skipped = 0; + int skipped = 0; for ( int i = 0; i < records.length; i++ ) { RECORD record = records[i]; - int propertyBlockCount = batch.propertyBlocksLengths[i]; if ( record != null ) { - INPUT input = batch.input[i]; - if ( input.hasFirstPropertyId() ) - { - record.setNextProp( input.firstPropertyId() ); - } - else - { - if ( propertyBlockCount > 0 ) - { - reassignDynamicRecordIds( propertyStore, batch.propertyBlocks, - propertyBlockCursor, propertyBlockCount ); - long firstProp = propertyCreator.createPropertyChain( record, - blockIterator.dressArray( batch.propertyBlocks, propertyBlockCursor, propertyBlockCount ), - propertyRecords ); - record.setNextProp( firstProp ); - } - } highestId = max( highestId, record.getId() ); entityStore.prepareForCommit( record ); entityStore.updateRecord( record ); @@ -132,55 +97,27 @@ protected void process( Batch batch, BatchSender sender ) // of number of bad relationships. Just don't import this relationship. skipped++; } - propertyBlockCursor += propertyBlockCount; } - this.highestId = highestId; - // Write all the created property records. - for ( PropertyRecord propertyRecord : propertyRecords.records() ) - { - propertyStore.updateRecord( propertyRecord ); - } + this.highestId.offer( highestId ); + writePropertyRecords( batch.propertyRecords, propertyStore ); monitor.entitiesWritten( records[0].getClass(), records.length - skipped ); - monitor.propertiesWritten( propertyBlockCursor ); + monitor.propertiesWritten( batch.numberOfProperties ); } - static void reassignDynamicRecordIds( PropertyStore propertyStore, PropertyBlock[] blocks, int offset, int length ) + static void writePropertyRecords( PropertyRecord[][] batch, PropertyStore propertyStore ) { - // OK, so here we have property blocks, potentially referring to DynamicRecords. The DynamicRecords - // have ids that we need to re-assign in here, because the ids are generated by multiple property encoders, - // and so we let each one of the encoders generate their own bogus ids and we re-assign those ids here, - // where we know we have a single thread doing this. - for ( int i = 0; i < length; i++ ) - { - PropertyBlock block = blocks[offset + i]; - PropertyType type = block.getType(); - switch ( type ) - { - case STRING: - reassignDynamicRecordIds( block, type, propertyStore.getStringStore() ); - break; - case ARRAY: - reassignDynamicRecordIds( block, type, propertyStore.getArrayStore() ); - break; - default: // No need to do anything be default, we only need to relink for dynamic records - } - } - } - - static void reassignDynamicRecordIds( PropertyBlock block, PropertyType type, AbstractDynamicStore store ) - { - Iterator dynamicRecords = block.getValueRecords().iterator(); - long newId = store.nextId(); - block.getValueBlocks()[0] = PropertyStore.singleBlockLongValue( block.getKeyIndexId(), type, newId ); - while ( dynamicRecords.hasNext() ) + // Write all the created property records. + for ( PropertyRecord[] propertyRecords : batch ) { - DynamicRecord dynamicRecord = dynamicRecords.next(); - dynamicRecord.setId( newId ); - if ( dynamicRecords.hasNext() ) + if ( propertyRecords != null ) { - dynamicRecord.setNextBlock( newId = store.nextId() ); + for ( PropertyRecord propertyRecord : propertyRecords ) + { + propertyStore.prepareForCommit( propertyRecord ); + propertyStore.updateRecord( propertyRecord ); + } } } } @@ -194,6 +131,6 @@ protected void done() // NodeStage completes before CalculateDenseNodesStage then we want to stop the time in the I/O monitor. ioMonitor.stop(); - entityStore.setHighestPossibleIdInUse( highestId ); + entityStore.setHighestPossibleIdInUse( highestId.get() ); } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/HighestId.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/HighestId.java new file mode 100644 index 0000000000000..aadac970f2ac7 --- /dev/null +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/HighestId.java @@ -0,0 +1,49 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.unsafe.impl.batchimport; + +import java.util.concurrent.atomic.AtomicLong; + +/** + * Tracks a highest id when there are potentially multiple concurrent threads calling {@link #offer(long)}. + */ +public class HighestId +{ + private final AtomicLong highestId = new AtomicLong(); + + public void offer( long candidate ) + { + long currentHighest; + do + { + currentHighest = highestId.get(); + if ( candidate < currentHighest ) + { + return; + } + } + while ( !highestId.compareAndSet( currentHighest, candidate ) ); + } + + public long get() + { + return highestId.get(); + } +} diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeStage.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeStage.java index e8d789c908f12..644468ea3afe1 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeStage.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/NodeStage.java @@ -80,9 +80,9 @@ public NodeStage( Configuration config, IoMonitor writeMonitor, nodeStore = neoStore.getNodeStore(); PropertyStore propertyStore = neoStore.getPropertyStore(); - add( new PropertyEncoderStep<>( control(), config, neoStore.getPropertyKeyRepository(), propertyStore ) ); add( new NodeEncoderStep( control(), config, idMapper, idGenerator, neoStore.getLabelRepository(), nodeStore, memoryUsage ) ); + add( new PropertyEncoderStep<>( control(), config, neoStore.getPropertyKeyRepository(), propertyStore ) ); add( new LabelScanStorePopulationStep( control(), config, labelScanStore ) ); add( new EntityStoreUpdaterStep<>( control(), config, nodeStore, propertyStore, writeMonitor, storeUpdateMonitor ) ); diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/PropertyEncoderStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/PropertyEncoderStep.java index 6b10fe7b9ea23..0590d8adf0243 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/PropertyEncoderStep.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/PropertyEncoderStep.java @@ -19,25 +19,37 @@ */ package org.neo4j.unsafe.impl.batchimport; +import java.util.Iterator; + import org.neo4j.kernel.impl.store.PropertyStore; +import org.neo4j.kernel.impl.store.PropertyType; +import org.neo4j.kernel.impl.store.id.IdRangeIterator; +import org.neo4j.kernel.impl.store.id.IdSequence; +import org.neo4j.kernel.impl.store.record.DynamicRecord; import org.neo4j.kernel.impl.store.record.PrimitiveRecord; import org.neo4j.kernel.impl.store.record.PropertyBlock; +import org.neo4j.kernel.impl.store.record.PropertyRecord; +import org.neo4j.kernel.impl.store.record.Record; import org.neo4j.kernel.impl.transaction.state.PropertyCreator; -import org.neo4j.kernel.impl.util.MovingAverage; +import org.neo4j.kernel.impl.util.ReusableIteratorCostume; +import org.neo4j.kernel.impl.util.collection.ArrayCollection; import org.neo4j.unsafe.impl.batchimport.input.InputEntity; import org.neo4j.unsafe.impl.batchimport.staging.BatchSender; import org.neo4j.unsafe.impl.batchimport.staging.ProcessorStep; import org.neo4j.unsafe.impl.batchimport.staging.Stage; import org.neo4j.unsafe.impl.batchimport.staging.StageControl; +import org.neo4j.unsafe.impl.batchimport.store.BatchingIdSequence; +import org.neo4j.unsafe.impl.batchimport.store.BatchingPropertyRecordAccess; import org.neo4j.unsafe.impl.batchimport.store.BatchingTokenRepository.BatchingPropertyKeyTokenRepository; -import static java.lang.Math.max; -import static java.util.Arrays.copyOf; +import static java.lang.Math.toIntExact; /** - * Encodes property data into {@link PropertyBlock property blocks}, attaching them to each + * Encodes property data into {@link PropertyRecord property records}, attaching them to each * {@link Batch}. This step is designed to handle multiple threads doing the property encoding, * since property encoding is potentially the most costly step in this {@link Stage}. + * The delivered {@link PropertyRecord property records} all have local ids and so reassignment of those + * ids will have to be done later. */ public class PropertyEncoderStep extends ProcessorStep> @@ -45,16 +57,16 @@ public class PropertyEncoderStep batch, BatchSender sender ) { RelativeIdRecordAllocator stringAllocator = new RelativeIdRecordAllocator( stringDataSize ); RelativeIdRecordAllocator arrayAllocator = new RelativeIdRecordAllocator( arrayDataSize ); - PropertyCreator propertyCreator = new PropertyCreator( stringAllocator, arrayAllocator, null, null ); - - int blockCountGuess = (int) averageBlocksPerBatch.average(); - PropertyBlock[] propertyBlocks = new PropertyBlock[blockCountGuess == 0 - ? batch.input.length - : blockCountGuess + batch.input.length / 20 /*some upper margin*/]; - int blockCursor = 0; - int[] lengths = new int[batch.input.length]; + IdSequence relativePropertyRecordIds = new BatchingIdSequence(); + PropertyCreator propertyCreator = new PropertyCreator( stringAllocator, arrayAllocator, + relativePropertyRecordIds, null ); + ArrayCollection propertyRecordCollection = new ArrayCollection<>( 4 ); + BatchingPropertyRecordAccess propertyRecords = new BatchingPropertyRecordAccess(); + ReusableIteratorCostume blockIterator = new ReusableIteratorCostume<>(); + batch.propertyRecords = new PropertyRecord[batch.input.length][]; + int totalNumberOfProperties = 0; + int totalNumberOfPropertyRecords = 0; for ( int i = 0; i < batch.input.length; i++ ) { - stringAllocator.initialize(); - arrayAllocator.initialize(); - INPUT input = batch.input[i]; if ( !input.hasFirstPropertyId() ) - { // Encode the properties and attach the blocks to the BatchEntity. + { // Encode the properties and attach the blocks to the Batch instance. // Dynamic records for each entity will start from 0, they will be reassigned later anyway int count = input.properties().length >> 1; - if ( blockCursor + count > propertyBlocks.length ) + if ( count > 0 ) { - propertyBlocks = copyOf( propertyBlocks, max( propertyBlocks.length << 1, blockCursor + count ) ); + PropertyBlock[] propertyBlocks = new PropertyBlock[count]; + propertyKeyHolder.propertyKeysAndValues( propertyBlocks, 0, input.properties(), propertyCreator ); + + // Create the property records with local ids, they will have to be reassigned to real ids later + propertyCreator.createPropertyChain( null, // owner assigned in a later step + blockIterator.dressArray( propertyBlocks, 0, count ), + propertyRecords, propertyRecordCollection::add ); + batch.propertyRecords[i] = propertyRecordCollection.toArray( + new PropertyRecord[propertyRecordCollection.size()] ); + totalNumberOfPropertyRecords += propertyRecordCollection.size(); + totalNumberOfProperties += count; + propertyRecordCollection.clear(); } - propertyKeyHolder.propertyKeysAndValues( propertyBlocks, blockCursor, - input.properties(), propertyCreator ); - lengths[i] = count; - blockCursor += count; } } - batch.propertyBlocks = propertyBlocks; - batch.propertyBlocksLengths = lengths; - averageBlocksPerBatch.add( blockCursor ); + + // Enter a synchronized block which assigns id ranges + IdRangeIterator propertyRecordsIdRange; + IdRangeIterator dynamicStringRecordsIdRange; + IdRangeIterator dynamicArrayRecordsIdRange; + synchronized ( propertyStore ) + { + propertyRecordsIdRange = idRange( totalNumberOfPropertyRecords, propertyStore ); + dynamicStringRecordsIdRange = idRange( toIntExact( stringAllocator.peek() ), + propertyStore.getStringStore() ); + dynamicArrayRecordsIdRange = idRange( toIntExact( arrayAllocator.peek() ), + propertyStore.getArrayStore() ); + } + + // Do reassignment of ids here + for ( int i = 0; i < batch.input.length; i++ ) + { + INPUT input = batch.input[i]; + RECORD record = batch.records[i]; + if ( record != null ) + { + reassignPropertyIds( input, record, batch.propertyRecords[i], + propertyRecordsIdRange, dynamicStringRecordsIdRange, dynamicArrayRecordsIdRange ); + } + } + + // Assigned so that next single-threaded step can assign id ranges quickly + batch.numberOfProperties = totalNumberOfProperties; sender.send( batch ); } + + private static IdRangeIterator idRange( int size, IdSequence idSource ) + { + return size > 0 ? new IdRangeIterator( idSource.nextIdBatch( size ) ) : IdRangeIterator.EMPTY_ID_RANGE_ITERATOR; + } + + private static void reassignPropertyIds( InputEntity input, PrimitiveRecord record, PropertyRecord[] propertyRecords, + IdRangeIterator propertyRecordsIdRange, + IdRangeIterator dynamicStringRecordsIdRange, + IdRangeIterator dynamicArrayRecordsIdRange ) + { + if ( input.hasFirstPropertyId() ) + { + record.setNextProp( input.firstPropertyId() ); + } + else + { + if ( propertyRecords != null ) + { + reassignDynamicRecordIds( dynamicStringRecordsIdRange, dynamicArrayRecordsIdRange, propertyRecords ); + long firstProp = reassignPropertyRecordIds( record, propertyRecordsIdRange, propertyRecords ); + record.setNextProp( firstProp ); + } + } + } + + private static long reassignPropertyRecordIds( PrimitiveRecord record, IdRangeIterator ids, + PropertyRecord[] propertyRecords ) + { + long newId = ids.next(); + long firstId = newId; + PropertyRecord prev = null; + for ( PropertyRecord propertyRecord : propertyRecords ) + { + record.setIdTo( propertyRecord ); + propertyRecord.setId( newId ); + if ( !Record.NO_NEXT_PROPERTY.is( propertyRecord.getNextProp() ) ) + { + propertyRecord.setNextProp( newId = ids.next() ); + } + if ( prev != null ) + { + propertyRecord.setPrevProp( prev.getId() ); + } + prev = propertyRecord; + } + return firstId; + } + + private static void reassignDynamicRecordIds( IdRangeIterator stringRecordsIds, IdRangeIterator arrayRecordsIds, + PropertyRecord[] propertyRecords ) + { + // OK, so here we have property blocks, potentially referring to DynamicRecords. The DynamicRecords + // have ids that we need to re-assign in here, because the ids are generated by multiple property encoders, + // and so we let each one of the encoders generate their own bogus ids and we re-assign those ids here, + // where we know we have a single thread doing this. + for ( PropertyRecord propertyRecord : propertyRecords ) + { + for ( PropertyBlock block : propertyRecord ) + { + PropertyType type = block.getType(); + switch ( type ) + { + case STRING: + reassignDynamicRecordIds( block, type, stringRecordsIds ); + break; + case ARRAY: + reassignDynamicRecordIds( block, type, arrayRecordsIds ); + break; + default: // No need to do anything be default, we only need to relink for dynamic records + } + } + } + } + + private static void reassignDynamicRecordIds( PropertyBlock block, PropertyType type, IdRangeIterator ids ) + { + Iterator dynamicRecords = block.getValueRecords().iterator(); + long newId = ids.next(); + block.getValueBlocks()[0] = PropertyStore.singleBlockLongValue( block.getKeyIndexId(), type, newId ); + while ( dynamicRecords.hasNext() ) + { + DynamicRecord dynamicRecord = dynamicRecords.next(); + dynamicRecord.setId( newId ); + if ( dynamicRecords.hasNext() ) + { + dynamicRecord.setNextBlock( newId = ids.next() ); + } + } + } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipStage.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipStage.java index db7cb78bb2b4b..ebcf707272e2e 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipStage.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelationshipStage.java @@ -89,10 +89,10 @@ public RelationshipStage( String topic, Configuration config, IoMonitor writeMon add( idAssigner = new AssignRelationshipIdBatchStep( control(), config, firstRelationshipId ) ); add( new RelationshipPreparationStep( control(), config, idMapper ) ); add( new RelationshipRecordPreparationStep( control(), config, neoStore.getRelationshipTypeRepository() ) ); - add( new PropertyEncoderStep<>( control(), config, neoStore.getPropertyKeyRepository(), propertyStore ) ); add( new RelationshipEncoderStep( control(), config, cache ) ); - add( new EntityStoreUpdaterStep<>( control(), config, relationshipStore, propertyStore, writeMonitor, - storeUpdateMonitor ) ); + add( new PropertyEncoderStep<>( control(), config, neoStore.getPropertyKeyRepository(), propertyStore ) ); + add( new EntityStoreUpdaterStep<>( control(), config, relationshipStore, propertyStore, + writeMonitor, storeUpdateMonitor ) ); } public long getNextRelationshipId() diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelativeIdRecordAllocator.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelativeIdRecordAllocator.java index 49f4d5c4e9a26..8f44de38c2ef2 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelativeIdRecordAllocator.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/RelativeIdRecordAllocator.java @@ -49,6 +49,11 @@ public int getRecordDataSize() return dataSize; } + public long peek() + { + return id; + } + @Override public DynamicRecord nextRecord() { diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/UpdateRecordsStep.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/UpdateRecordsStep.java index 40f0dd2ee5907..e646de4a6e9ef 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/UpdateRecordsStep.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/UpdateRecordsStep.java @@ -45,7 +45,7 @@ public class UpdateRecordsStep public UpdateRecordsStep( StageControl control, Configuration config, RecordStore store ) { - super( control, "v", config, 1 ); + super( control, "v", config, 0 ); this.store = store; this.recordSize = store.getRecordSize(); } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/store/BatchingIdGeneratorFactory.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/store/BatchingIdGeneratorFactory.java index 911f9a1921a1f..7ee74957da270 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/store/BatchingIdGeneratorFactory.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/store/BatchingIdGeneratorFactory.java @@ -96,7 +96,7 @@ public long nextId() @Override public IdRange nextIdBatch( int size ) { - throw new UnsupportedOperationException(); + return idSequence.nextIdBatch( size ); } @Override diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/store/BatchingPropertyRecordAccess.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/store/BatchingPropertyRecordAccess.java index 5259abc1b4e28..e75c4e44113b4 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/store/BatchingPropertyRecordAccess.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/store/BatchingPropertyRecordAccess.java @@ -30,6 +30,8 @@ public class BatchingPropertyRecordAccess extends BatchingRecordAccess testClass, StoreType... stores ) this.stores = stores; } + public Builder builder() + { + return new Builder(); + } + public NeoStores open( String... config ) throws IOException { Config configuration = Config.embeddedDefaults( stringMap( config ) ); @@ -72,17 +80,34 @@ public NeoStores open( RecordFormats format, String... config ) throws IOExcepti efs = new EphemeralFileSystemAbstraction(); Config conf = Config.embeddedDefaults( stringMap( config ) ); pageCache = getOrCreatePageCache( conf, efs ); - return open( efs, pageCache, format, config ); + return open( efs, pageCache, format, fs -> new DefaultIdGeneratorFactory( fs ), config ); } - public NeoStores open( FileSystemAbstraction fs, PageCache pageCache, RecordFormats format, String... config ) - throws IOException + public NeoStores open( FileSystemAbstraction fs, PageCache pageCache, RecordFormats format, + Function idGeneratorFactory, String... config ) + throws IOException { assert neoStores == null : "Already opened"; + if ( fs == null ) + { + fs = efs = new EphemeralFileSystemAbstraction(); + } TestDirectory testDirectory = TestDirectory.testDirectory( testClass, fs ); File storeDir = testDirectory.makeGraphDbDir(); + if ( config == null ) + { + config = new String[0]; + } Config configuration = Config.embeddedDefaults( stringMap( config ) ); - StoreFactory storeFactory = new StoreFactory( storeDir, configuration, new DefaultIdGeneratorFactory( fs ), + if ( pageCache == null ) + { + pageCache = this.pageCache = getOrCreatePageCache( configuration, fs ); + } + if ( format == null ) + { + format = RecordFormatSelector.defaultFormat(); + } + StoreFactory storeFactory = new StoreFactory( storeDir, configuration, idGeneratorFactory.apply( fs ), pageCache, fs, format, NullLogProvider.getInstance() ); return neoStores = stores.length == 0 ? storeFactory.openAllNeoStores( true ) @@ -92,14 +117,7 @@ public NeoStores open( FileSystemAbstraction fs, PageCache pageCache, RecordForm @Override protected void after( boolean successful ) throws Throwable { - if ( neoStores != null ) - { - neoStores.close(); - } - if ( pageCache != null ) - { - pageCache.close(); - } + IOUtils.closeAll( neoStores, pageCache ); if ( efs != null ) { efs.close(); @@ -113,4 +131,48 @@ private static PageCache getOrCreatePageCache( Config config, FileSystemAbstract PageCursorTracerSupplier.NULL, log ); return pageCacheFactory.getOrCreatePageCache(); } + + public class Builder + { + private FileSystemAbstraction fs; + private String[] config; + private RecordFormats format; + private PageCache pageCache; + private Function idGeneratorFactory; + + public Builder with( FileSystemAbstraction fs ) + { + this.fs = fs; + return this; + } + + public Builder with( String... config ) + { + this.config = config; + return this; + } + + public Builder with( RecordFormats format ) + { + this.format = format; + return this; + } + + public Builder with( PageCache pageCache ) + { + this.pageCache = pageCache; + return this; + } + + public Builder with( Function idGeneratorFactory ) + { + this.idGeneratorFactory = idGeneratorFactory; + return this; + } + + public NeoStores build() throws IOException + { + return open( fs, pageCache, format, idGeneratorFactory, config ); + } + } } diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/BatchCollector.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/BatchCollector.java new file mode 100644 index 0000000000000..2847ee3a5bcba --- /dev/null +++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/BatchCollector.java @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.unsafe.impl.batchimport; + +import java.util.ArrayList; +import java.util.List; + +import org.neo4j.unsafe.impl.batchimport.staging.BatchSender; + +public class BatchCollector implements BatchSender +{ + private final List batches = new ArrayList<>(); + + @Override + public synchronized void send( Object batch ) + { + batches.add( (T) batch ); + } + + public List getBatches() + { + return batches; + } +} diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/HighestIdTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/HighestIdTest.java new file mode 100644 index 0000000000000..52ecbe9916186 --- /dev/null +++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/HighestIdTest.java @@ -0,0 +1,93 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.neo4j.unsafe.impl.batchimport; + +import org.junit.Rule; +import org.junit.Test; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicLongArray; + +import org.neo4j.test.Race; +import org.neo4j.test.rule.RepeatRule; +import org.neo4j.test.rule.RepeatRule.Repeat; + +import static org.junit.Assert.assertEquals; + +import static java.lang.Math.max; + +public class HighestIdTest +{ + @Rule + public final RepeatRule repeater = new RepeatRule(); + + @Repeat( times = 100 ) + @Test + public void shouldKeepHighest() throws Throwable + { + // GIVEN + Race race = new Race(); + HighestId highestId = new HighestId(); + int threads = Runtime.getRuntime().availableProcessors(); + CountDownLatch latch = new CountDownLatch( threads ); + AtomicLongArray highestIds = new AtomicLongArray( threads ); + for ( int c = 0; c < threads; c++ ) + { + int cc = c; + race.addContestant( new Runnable() + { + boolean run = false; + ThreadLocalRandom random = ThreadLocalRandom.current(); + + @Override + public void run() + { + if ( run ) + { + return; + } + + long highest = 0; + for ( int i = 0; i < 10; i++ ) + { + long nextLong = random.nextLong( 100 ); + highestId.offer( nextLong ); + highest = max( highest, nextLong ); + } + highestIds.set( cc, highest ); + latch.countDown(); + run = true; + } + } ); + } + race.withEndCondition( () -> latch.getCount() == 0 ); + + // WHEN + race.go(); + + long highest = 0; + for ( int i = 0; i < threads; i++ ) + { + highest = max( highest, highestIds.get( i ) ); + } + assertEquals( highest, highestId.get() ); + } +} diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/PropertyEncoderStepTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/PropertyEncoderStepTest.java index bc91195113770..be9b1c70cc636 100644 --- a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/PropertyEncoderStepTest.java +++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/PropertyEncoderStepTest.java @@ -19,106 +19,127 @@ */ package org.neo4j.unsafe.impl.batchimport; -import org.junit.After; -import org.junit.Before; +import org.apache.commons.lang3.StringUtils; import org.junit.Rule; import org.junit.Test; -import java.io.File; -import java.io.IOException; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; -import org.neo4j.io.pagecache.PageCache; +import org.neo4j.collection.primitive.Primitive; +import org.neo4j.collection.primitive.PrimitiveLongSet; import org.neo4j.kernel.impl.store.NeoStores; -import org.neo4j.kernel.impl.store.StoreFactory; +import org.neo4j.kernel.impl.store.PropertyStore; +import org.neo4j.kernel.impl.store.StoreType; +import org.neo4j.kernel.impl.store.record.DynamicRecord; import org.neo4j.kernel.impl.store.record.NodeRecord; -import org.neo4j.logging.NullLogProvider; -import org.neo4j.test.rule.PageCacheRule; -import org.neo4j.test.rule.fs.EphemeralFileSystemRule; +import org.neo4j.kernel.impl.store.record.PropertyBlock; +import org.neo4j.kernel.impl.store.record.PropertyRecord; +import org.neo4j.test.Race; +import org.neo4j.test.rule.NeoStoresRule; import org.neo4j.unsafe.impl.batchimport.input.InputNode; import org.neo4j.unsafe.impl.batchimport.staging.StageControl; -import org.neo4j.unsafe.impl.batchimport.staging.Step; +import org.neo4j.unsafe.impl.batchimport.store.BatchingIdGeneratorFactory; import org.neo4j.unsafe.impl.batchimport.store.BatchingTokenRepository.BatchingPropertyKeyTokenRepository; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyLong; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.neo4j.unsafe.impl.batchimport.Configuration.DEFAULT; public class PropertyEncoderStepTest { - @Rule - public final EphemeralFileSystemRule fsRule = new EphemeralFileSystemRule(); - @Rule - public final PageCacheRule pageCacheRule = new PageCacheRule(); - private NeoStores neoStores; - private PageCache pageCache; + private static final String LONG_STRING = StringUtils.repeat( "12%$heya", 40 ); - @Before - public void setUpNeoStore() - { - File storeDir = new File( "dir" ); - pageCache = pageCacheRule.getPageCache( fsRule.get() ); - StoreFactory factory = new StoreFactory( storeDir, pageCache, fsRule.get(), NullLogProvider.getInstance() ); - neoStores = factory.openAllNeoStores( true ); - } - - @After - public void closeNeoStore() throws IOException - { - neoStores.close(); - pageCache.close(); - } + @Rule + public final NeoStoresRule neoStoresRule = new NeoStoresRule( getClass(), + StoreType.PROPERTY, StoreType.PROPERTY_KEY_TOKEN, StoreType.PROPERTY_KEY_TOKEN_NAME, + StoreType.PROPERTY_STRING, StoreType.PROPERTY_ARRAY ); - @SuppressWarnings( "unchecked" ) @Test - public void shouldGrowPropertyBlocksArrayProperly() throws Exception + public void shouldAssignCorrectIdsOnParallelExecution() throws Throwable { - // GIVEN StageControl control = mock( StageControl.class ); - BatchingPropertyKeyTokenRepository tokens = - new BatchingPropertyKeyTokenRepository( neoStores.getPropertyKeyTokenStore() ); - Step> step = - new PropertyEncoderStep<>( control, DEFAULT, tokens, neoStores.getPropertyStore() ); - @SuppressWarnings( "rawtypes" ) - Step downstream = mock( Step.class ); - step.setDownstream( downstream ); + int batchSize = 100; + Configuration config = new Configuration() + { + @Override + public int batchSize() + { + return batchSize; + } + }; + NeoStores stores = neoStoresRule.builder().with( fs -> new BatchingIdGeneratorFactory( fs ) ).build(); + BatchingPropertyKeyTokenRepository keyRepository = + new BatchingPropertyKeyTokenRepository( stores.getPropertyKeyTokenStore() ); + PropertyStore propertyStore = stores.getPropertyStore(); + PropertyEncoderStep encoder = + new PropertyEncoderStep<>( control, config, keyRepository, propertyStore ); + BatchCollector> sender = new BatchCollector<>(); // WHEN - step.start( 0 ); - step.receive( 0, smallbatch() ); - step.endOfUpstream(); - awaitCompleted( step, control ); + Race race = new Race(); + for ( int i = 0; i < Runtime.getRuntime().availableProcessors(); i++ ) + { + int id = i; + race.addContestant( () -> encoder.process( batch( id, batchSize ), sender ) ); + } + race.go(); - // THEN - verify( downstream ).receive( anyLong(), any() ); - verifyNoMoreInteractions( control ); - step.close(); + assertUniqueIds( sender.getBatches() ); } - private void awaitCompleted( Step step, StageControl control ) throws InterruptedException + private void assertUniqueIds( List> batches ) { - while ( !step.isCompleted() ) + PrimitiveLongSet ids = Primitive.longSet( 1_000 ); + PrimitiveLongSet stringIds = Primitive.longSet( 100 ); + PrimitiveLongSet arrayIds = Primitive.longSet( 100 ); + for ( Batch batch : batches ) { - Thread.sleep( 10 ); - verifyNoMoreInteractions( control ); + for ( PropertyRecord[] records : batch.propertyRecords ) + { + for ( PropertyRecord record : records ) + { + assertTrue( ids.add( record.getId() ) ); + for ( PropertyBlock block : record ) + { + for ( DynamicRecord dynamicRecord : block.getValueRecords() ) + { + switch ( dynamicRecord.getType() ) + { + case STRING: + assertTrue( stringIds.add( dynamicRecord.getId() ) ); + break; + case ARRAY: + assertTrue( arrayIds.add( dynamicRecord.getId() ) ); + break; + default: + fail( "Unexpected property type " + dynamicRecord.getType() ); + } + } + } + } + } } } - private Batch smallbatch() + protected Batch batch( int id, int batchSize ) { - return new Batch<>( new InputNode[] {new InputNode( "source", 1, 0, "1", new Object[] { - "key1", "value1", - "key2", "value2", - "key3", "value3", - "key4", "value4", - "key5", "value5" - }, null, new String[] { - "label1", - "label2", - "label3", - "label4" - }, null )} ); + InputNode[] input = new InputNode[batchSize]; + NodeRecord[] records = new NodeRecord[batchSize]; + ThreadLocalRandom random = ThreadLocalRandom.current(); + for ( int i = 0; i < batchSize; i++ ) + { + String value = id + "_" + i; + if ( random.nextFloat() < 0.01 ) + { + value += LONG_STRING; + } + input[i] = new InputNode( "source", 0, 0, null, + new Object[] {"key", value}, null, InputNode.NO_LABELS, null ); + records[i] = new NodeRecord( -1 ); + } + Batch batch = new Batch<>( input ); + batch.records = records; + return batch; } }