From ddb5d414ac35f2ac9c768e95543611258a454dcb Mon Sep 17 00:00:00 2001 From: Mattias Persson Date: Mon, 13 Jun 2016 12:55:20 +0200 Subject: [PATCH] Parallel reading of cached input data For this the format needed to change a bit so that entities are written in batches with a small header saying how big a batch is, byte-wise as well as entity count-wise. This favors a setup where there's a dedicated reader and multiple processors of that data. --- .../log/InMemoryClosableChannel.java | 12 +- .../log/PhysicalFlushableChannel.java | 21 +++ ...PositionAwarePhysicalFlushableChannel.java | 9 +- .../batchimport/ParallelBatchImporter.java | 2 +- .../impl/batchimport/input/InputCache.java | 38 +++- .../batchimport/input/InputEntityCacher.java | 72 ++++++- .../batchimport/input/InputEntityReader.java | 175 +++++++++++++++--- .../batchimport/input/InputNodeCacher.java | 12 +- .../batchimport/input/InputNodeReader.java | 27 +-- .../input/InputRelationshipCacher.java | 12 +- .../input/InputRelationshipReader.java | 25 +-- .../batchimport/input/InputCacheTest.java | 70 ++++++- .../InputEntityCacherTokenCreationTest.java | 6 +- .../PerTypeRelationshipSplitterTest.java | 3 +- 14 files changed, 394 insertions(+), 90 deletions(-) rename community/kernel/src/{test => main}/java/org/neo4j/kernel/impl/transaction/log/InMemoryClosableChannel.java (94%) diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/transaction/log/InMemoryClosableChannel.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/log/InMemoryClosableChannel.java similarity index 94% rename from community/kernel/src/test/java/org/neo4j/kernel/impl/transaction/log/InMemoryClosableChannel.java rename to community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/log/InMemoryClosableChannel.java index 35c49f246b21..beadba6659d8 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/transaction/log/InMemoryClosableChannel.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/log/InMemoryClosableChannel.java @@ -27,6 +27,9 @@ import org.neo4j.io.fs.StoreChannel; import org.neo4j.storageengine.api.ReadPastEndException; +/** + * Implementation of {@link ReadableClosablePositionAwareChannel} operating over a {@code byte[]} in memory. + */ public class InMemoryClosableChannel implements ReadableClosablePositionAwareChannel, FlushablePositionAwareChannel { private final byte[] bytes; @@ -38,19 +41,22 @@ public InMemoryClosableChannel() this( 1000 ); } - public InMemoryClosableChannel( byte[] bytes ) + public InMemoryClosableChannel( byte[] bytes, boolean append ) { this.bytes = bytes; this.asWriter = ByteBuffer.wrap( this.bytes ); this.asReader = ByteBuffer.wrap( this.bytes ); + if ( append ) + { + this.asWriter.position( bytes.length ); + } } public InMemoryClosableChannel( int bufferSize ) { - this( new byte[bufferSize] ); + this( new byte[bufferSize], false ); } - public void reset() { asWriter.clear(); diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/log/PhysicalFlushableChannel.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/log/PhysicalFlushableChannel.java index ff1c41036bcf..779118e6f0bd 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/log/PhysicalFlushableChannel.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/log/PhysicalFlushableChannel.java @@ -170,8 +170,29 @@ public void close() throws IOException channel.close(); } + /** + * @return the position of the channel, also taking into account buffer position. + * @throws IOException if underlying channel throws {@link IOException}. + */ public long position() throws IOException { return channel.position() + buffer.position(); } + + /** + * Sets position of this channel to the new {@code position}. This works only if the underlying channel + * supports positioning. + * + * @param position new position (byte offset) to set as new current position. + * @throws IOException if underlying channel throws {@link IOException}. + */ + public void position( long position ) throws IOException + { + // Currently we take the pessimistic approach of flushing (doesn't imply forcing) buffer to + // channel before moving to a new position. This works in all cases, but there could be + // made an optimization where we could see that we're moving within the current buffer range + // and if so skip flushing and simply move the cursor in the buffer. + prepareForFlush().flush(); + channel.position( position ); + } } diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/log/PositionAwarePhysicalFlushableChannel.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/log/PositionAwarePhysicalFlushableChannel.java index 254fadf97665..edbd6984adf8 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/log/PositionAwarePhysicalFlushableChannel.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/transaction/log/PositionAwarePhysicalFlushableChannel.java @@ -22,8 +22,6 @@ import java.io.Flushable; import java.io.IOException; -import org.neo4j.io.fs.StoreChannel; - /** * Decorator around a {@link LogVersionedStoreChannel} making it expose {@link FlushablePositionAwareChannel}. This * implementation uses a {@link PhysicalFlushableChannel}, which provides buffering for write operations over the @@ -32,7 +30,7 @@ public class PositionAwarePhysicalFlushableChannel implements FlushablePositionAwareChannel { private LogVersionedStoreChannel logVersionedStoreChannel; - private PhysicalFlushableChannel channel; + private final PhysicalFlushableChannel channel; public PositionAwarePhysicalFlushableChannel( LogVersionedStoreChannel logVersionedStoreChannel) { this.logVersionedStoreChannel = logVersionedStoreChannel; @@ -111,4 +109,9 @@ void setChannel( LogVersionedStoreChannel channel ) this.logVersionedStoreChannel = channel; this.channel.setChannel( channel ); } + + public void setCurrentPosition( LogPosition position ) throws IOException, UnsupportedOperationException + { + channel.position( position.getByteOffset() ); + } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ParallelBatchImporter.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ParallelBatchImporter.java index 16cd332edf7c..abc8d03d0b11 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ParallelBatchImporter.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/ParallelBatchImporter.java @@ -132,7 +132,7 @@ public void doImport( Input input ) throws IOException additionalInitialIds, dbConfig ); CountsAccessor.Updater countsUpdater = neoStore.getCountsStore().reset( neoStore.getLastCommittedTransactionId() ); - InputCache inputCache = new InputCache( fileSystem, storeDir, recordFormats ) ) + InputCache inputCache = new InputCache( fileSystem, storeDir, recordFormats, config ) ) { Collector badCollector = input.badCollector(); // Some temporary caches and indexes in the import diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/InputCache.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/InputCache.java index 8e042e1b0280..33d3aa5307dd 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/InputCache.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/InputCache.java @@ -30,6 +30,7 @@ import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.io.fs.StoreChannel; import org.neo4j.kernel.impl.store.format.RecordFormats; +import org.neo4j.unsafe.impl.batchimport.Configuration; import org.neo4j.unsafe.impl.batchimport.InputIterable; import org.neo4j.unsafe.impl.batchimport.InputIterator; import org.neo4j.unsafe.impl.batchimport.ParallelBatchImporter; @@ -85,6 +86,9 @@ * ELSE IF {@link #NEW_TYPE} * 4B token id * + * + * The format stores entities in batches, each batch having a small header containing number of bytes + * and number of entities. */ public class InputCache implements Closeable { @@ -113,37 +117,53 @@ public class InputCache implements Closeable static final byte NEW_TYPE = 1; static final byte END_OF_HEADER = -2; static final short END_OF_ENTITIES = -3; + static final int NO_ENTITIES = 0; + static final long END_OF_CACHE = 0L; private final FileSystemAbstraction fs; private final File cacheDirectory; - private RecordFormats recordFormats; + private final RecordFormats recordFormats; + private final Configuration config; + private final int bufferSize; private final Set subTypes = new HashSet<>(); + private final int batchSize; - public InputCache( FileSystemAbstraction fs, File cacheDirectory, RecordFormats recordFormats ) + public InputCache( FileSystemAbstraction fs, File cacheDirectory, RecordFormats recordFormats, + Configuration config ) { - this( fs, cacheDirectory, recordFormats, (int) ByteUnit.kibiBytes( 512 ) ); + this( fs, cacheDirectory, recordFormats, config, (int) ByteUnit.kibiBytes( 512 ), 10_000 ); } - public InputCache( FileSystemAbstraction fs, File cacheDirectory, RecordFormats recordFormats, int bufferSize ) + /** + * @param fs {@link FileSystemAbstraction} to use + * @param cacheDirectory directory for placing the cached files + * @param config import configuration + * @param bufferSize buffer size for writing/reading cache files + * @param batchSize number of entities in each batch + */ + public InputCache( FileSystemAbstraction fs, File cacheDirectory, RecordFormats recordFormats, + Configuration config, int bufferSize, int batchSize ) { this.fs = fs; this.cacheDirectory = cacheDirectory; this.recordFormats = recordFormats; + this.config = config; this.bufferSize = bufferSize; + this.batchSize = batchSize; } public Receiver cacheNodes( String subType ) throws IOException { return new InputNodeCacher( channel( NODES, subType, "rw" ), channel( NODES_HEADER, subType, "rw" ), - recordFormats, bufferSize ); + recordFormats, bufferSize, batchSize ); } public Receiver cacheRelationships( String subType ) throws IOException { return new InputRelationshipCacher( channel( RELATIONSHIPS, subType, "rw" ), - channel( RELATIONSHIPS_HEADER, subType, "rw" ), recordFormats, bufferSize ); + channel( RELATIONSHIPS_HEADER, subType, "rw" ), recordFormats, bufferSize, batchSize ); } private StoreChannel channel( String type, String subType, String mode ) throws IOException @@ -165,7 +185,8 @@ public InputIterable nodes( String subType, boolean deleteAfterUse ) public InputIterator get() throws IOException { return new InputNodeReader( channel( NODES, subType, "r" ), channel( NODES_HEADER, subType, "r" ), - bufferSize, deleteAction( deleteAfterUse, NODES, NODES_HEADER, subType ) ); + bufferSize, deleteAction( deleteAfterUse, NODES, NODES_HEADER, subType ), + config.maxNumberOfProcessors() ); } } ); } @@ -179,7 +200,8 @@ public InputIterator get() throws IOException { return new InputRelationshipReader( channel( RELATIONSHIPS, subType, "r" ), channel( RELATIONSHIPS_HEADER, subType, "r" ), bufferSize, - deleteAction( deleteAfterUse, RELATIONSHIPS, RELATIONSHIPS_HEADER, subType ) ); + deleteAction( deleteAfterUse, RELATIONSHIPS, RELATIONSHIPS_HEADER, subType ), + config.maxNumberOfProcessors() ); } } ); } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/InputEntityCacher.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/InputEntityCacher.java index ae3478aa27da..72f22a139a2e 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/InputEntityCacher.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/InputEntityCacher.java @@ -20,6 +20,7 @@ package org.neo4j.unsafe.impl.batchimport.input; import java.io.IOException; +import java.util.Arrays; import java.util.HashMap; import java.util.Map; @@ -27,6 +28,8 @@ import org.neo4j.io.fs.StoreChannel; import org.neo4j.kernel.impl.store.format.RecordFormats; import org.neo4j.kernel.impl.transaction.log.FlushableChannel; +import org.neo4j.kernel.impl.transaction.log.LogPosition; +import org.neo4j.kernel.impl.transaction.log.LogPositionMarker; import org.neo4j.kernel.impl.transaction.log.PhysicalLogVersionedStoreChannel; import org.neo4j.kernel.impl.transaction.log.PositionAwarePhysicalFlushableChannel; @@ -48,7 +51,7 @@ */ abstract class InputEntityCacher implements Receiver { - protected final FlushableChannel channel; + protected final PositionAwarePhysicalFlushableChannel channel; private final FlushableChannel header; private final StoreChannel storeChannel; private final StoreChannel headerChannel; @@ -56,22 +59,27 @@ abstract class InputEntityCacher implements Receiver private final int[] nextKeyId = new int[HIGH_TOKEN_TYPE]; private final int[] maxKeyId = new int[HIGH_TOKEN_TYPE]; + @SuppressWarnings( "unchecked" ) private final Map[] tokens = new Map[HIGH_TOKEN_TYPE]; - protected InputEntityCacher( StoreChannel channel, StoreChannel header, RecordFormats recordFormats, int bufferSize, - int groupSlots ) throws IOException + private final LogPositionMarker positionMarker = new LogPositionMarker(); + private LogPosition currentBatchStartPosition; + private int entitiesWritten; + private final int batchSize; + + protected InputEntityCacher( StoreChannel channel, StoreChannel header, RecordFormats recordFormats, + int bufferSize, int batchSize, int groupSlots ) + throws IOException { this.storeChannel = channel; this.headerChannel = header; + this.batchSize = batchSize; this.previousGroupIds = new int[groupSlots]; initMaxTokenKeyIds( recordFormats ); + clearState(); - for ( int i = 0; i < groupSlots; i++ ) - { - previousGroupIds[i] = Group.GLOBAL.id(); - } // We don't really care about versions, it's just that apart from that the WritableLogChannel // does precisely what we want and there's certainly value in not duplicating that functionality. this.channel = new PositionAwarePhysicalFlushableChannel( @@ -89,10 +97,56 @@ public void receive( ENTITY[] batch ) throws IOException { for ( ENTITY entity : batch ) { + if ( entitiesWritten % batchSize == 0 ) + { + newBatch(); + } + entitiesWritten++; writeEntity( entity ); } } + // [ A ][ B ][.................................] + // |<-----A------------------------->| (B entities in total) + // |<------------------------------------------->| + private void newBatch() throws IOException + { + channel.getCurrentPosition( positionMarker ); + + // Set byte size in previous batch + if ( entitiesWritten > 0 ) + { + // Remember the current position + // Go back to the start of this batch + channel.setCurrentPosition( currentBatchStartPosition ); + // and set the size in that long field (not counting the size of the size field) + channel.putLong( positionMarker.getByteOffset() - currentBatchStartPosition.getByteOffset() - Long.BYTES ); + // and number of entities written + channel.putInt( entitiesWritten ); + // Now go back to where we were before updating this size field + channel.setCurrentPosition( positionMarker.newPosition() ); + } + + // Always add mark for the new batch here, this will simplify reader logic + startBatch(); + } + + private void startBatch() throws IOException + { + // Make room for size in new batch and number of entities + // Until this batch is finished, this mark the end of the cache. + clearState(); + entitiesWritten = 0; + currentBatchStartPosition = positionMarker.newPosition(); + channel.putLong( InputCache.END_OF_CACHE ); + channel.putInt( InputCache.NO_ENTITIES ); + } + + protected void clearState() + { + Arrays.fill( previousGroupIds, Group.GLOBAL.id() ); + } + protected void writeEntity( ENTITY entity ) throws IOException { // properties @@ -160,7 +214,7 @@ protected void writeToken( byte type, Object key ) throws IOException else if ( key instanceof Integer ) { // Here we signal that we have a real token id, not to be confused by the local and contrived - // toiken ids we generate in here. Following this -1 is the real token id. + // token ids we generate in here. Following this -1 is the real token id. channel.putInt( (short) -1 ); channel.putInt( (Integer) key ); } @@ -173,6 +227,8 @@ else if ( key instanceof Integer ) @Override public void close() throws IOException { + newBatch(); + header.put( END_OF_HEADER ); // This is a special value denoting the end of the stream. This is done like this since // properties are the first thing read for every entity. diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/InputEntityReader.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/InputEntityReader.java index 8cffbd6e4282..a1cef13348fe 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/InputEntityReader.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/InputEntityReader.java @@ -20,20 +20,27 @@ package org.neo4j.unsafe.impl.batchimport.input; import java.io.IOException; +import java.util.Iterator; +import java.util.function.BiFunction; +import java.util.function.Supplier; import org.neo4j.collection.primitive.Primitive; import org.neo4j.collection.primitive.PrimitiveIntObjectMap; import org.neo4j.helpers.collection.PrefetchingIterator; import org.neo4j.io.ByteUnit; import org.neo4j.io.fs.StoreChannel; +import org.neo4j.kernel.impl.transaction.log.InMemoryClosableChannel; import org.neo4j.kernel.impl.transaction.log.LogPositionMarker; import org.neo4j.kernel.impl.transaction.log.PhysicalLogVersionedStoreChannel; import org.neo4j.kernel.impl.transaction.log.ReadAheadLogChannel; import org.neo4j.kernel.impl.transaction.log.ReadableClosableChannel; -import org.neo4j.kernel.impl.transaction.log.ReadableLogChannel; +import org.neo4j.kernel.impl.transaction.log.ReadableClosablePositionAwareChannel; +import org.neo4j.kernel.impl.util.collection.ContinuableArrayCursor; import org.neo4j.unsafe.impl.batchimport.InputIterator; +import org.neo4j.unsafe.impl.batchimport.staging.TicketedProcessing; import static org.neo4j.kernel.impl.transaction.log.LogVersionBridge.NO_MORE_CHANNELS; +import static org.neo4j.unsafe.impl.batchimport.Utils.safeCastLongToInt; import static org.neo4j.unsafe.impl.batchimport.input.InputCache.END_OF_ENTITIES; import static org.neo4j.unsafe.impl.batchimport.input.InputCache.END_OF_HEADER; import static org.neo4j.unsafe.impl.batchimport.input.InputCache.GROUP_TOKEN; @@ -47,34 +54,90 @@ /** * Abstract class for reading cached entities previously stored using {@link InputEntityCacher} or derivative. + * Entity data is read in batches, each handed off to one ore more processors which interprets the bytes + * into {@link InputEntity} instances. From the outside this is simply an {@link InputIterator}, + * the parallelization happens inside. */ -abstract class InputEntityReader extends PrefetchingIterator - implements InputIterator +abstract class InputEntityReader extends InputIterator.Adapter { - protected final ReadableLogChannel channel; + // Used by BatchProvidingIterator. To feed jobs into TicketedProcessing private final LogPositionMarker positionMarker = new LogPositionMarker(); private int lineNumber; - private final Group[] previousGroups; + private TicketedProcessing processing; + + // Used by workers, immutable private final PrimitiveIntObjectMap[] tokens; + + // Not used by workers private final Runnable closeAction; + private final ReadAheadLogChannel cacheChannel; + private final ContinuableArrayCursor processedEntities; + + protected static class ProcessorState + { + // Used by workers, mutable + protected final Group[] previousGroups; + protected String previousType; + protected String[] previousLabels = InputEntity.NO_LABELS; + protected ReadableClosablePositionAwareChannel batchChannel; + + public ProcessorState( byte[] batchData ) + { + this.batchChannel = new InMemoryClosableChannel( batchData, true/*append*/ ); + this.previousGroups = new Group[2]; + for ( int i = 0; i < previousGroups.length; i++ ) + { + previousGroups[i] = Group.GLOBAL; + } + } + } @SuppressWarnings( "unchecked" ) - InputEntityReader( StoreChannel channel, StoreChannel header, int bufferSize, int groupSlots, - Runnable closeAction ) throws IOException + InputEntityReader( StoreChannel channel, StoreChannel header, int bufferSize, Runnable closeAction, + int maxNbrOfProcessors ) + throws IOException { tokens = new PrimitiveIntObjectMap[HIGH_TOKEN_TYPE]; tokens[PROPERTY_KEY_TOKEN] = Primitive.intObjectMap(); tokens[LABEL_TOKEN] = Primitive.intObjectMap(); tokens[RELATIONSHIP_TYPE_TOKEN] = Primitive.intObjectMap(); tokens[GROUP_TOKEN] = Primitive.intObjectMap(); - this.previousGroups = new Group[groupSlots]; - for ( int i = 0; i < groupSlots; i++ ) - { - previousGroups[i] = Group.GLOBAL; - } - this.channel = reader( channel, bufferSize ); + cacheChannel = reader( channel, bufferSize ); this.closeAction = closeAction; readHeader( header ); + + /** The processor is the guy converting the byte[] to ENTITY[] + * we will have a lot of those guys + */ + BiFunction processor = (batchData,ignore) -> + { + ProcessorState state = new ProcessorState( batchData ); + try + { + int nbrOfEntries = state.batchChannel.getInt(); + + // Read all Entities and put in ENTITY[] to return. + Object[] result = new Object[nbrOfEntries]; + for ( int i = 0; i < nbrOfEntries; i++ ) + { + result[i] = readOneEntity( state ); + } + + return result; + } + catch ( IOException e ) + { + throw new IllegalStateException( e ); + } + }; + Supplier noState = () -> null; + processing = new TicketedProcessing<>( getClass().getName(), maxNbrOfProcessors, processor, noState ); + + // This iterator is only called from TicketedProcessing.slurp that submit jobs to new threads. + Iterator iterator = new BatchProvidingIterator(); + processing.slurp( iterator, true ); + + processedEntities = new ContinuableArrayCursor<>( () -> processing.next() ); } private ReadAheadLogChannel reader( StoreChannel channel, int bufferSize ) throws IOException @@ -98,19 +161,19 @@ private void readHeader( StoreChannel header ) throws IOException } } - @Override - protected final ENTITY fetchNextOrNull() + protected final ENTITY readOneEntity( ProcessorState state ) { + ReadableClosablePositionAwareChannel channel = state.batchChannel; try { - lineNumber++; - Object properties = readProperties(); + // Read next entity + Object properties = readProperties( channel ); if ( properties == null ) { return null; } - return readNextOrNull( properties ); + return readNextOrNull( properties, state ); } catch ( IOException e ) { @@ -118,9 +181,16 @@ protected final ENTITY fetchNextOrNull() } } - protected abstract ENTITY readNextOrNull( Object properties ) throws IOException; + @Override + @SuppressWarnings( "unchecked" ) + protected ENTITY fetchNextOrNull() + { + return processedEntities.next() ? (ENTITY) processedEntities.get() : null; + } + + protected abstract ENTITY readNextOrNull( Object properties, ProcessorState state ) throws IOException; - private Object readProperties() throws IOException + private Object readProperties( ReadableClosablePositionAwareChannel channel ) throws IOException { short count = channel.getShort(); switch ( count ) @@ -134,14 +204,14 @@ private Object readProperties() throws IOException Object[] properties = new Object[count*2]; for ( int i = 0; i < properties.length; i++ ) { - properties[i++] = readToken( PROPERTY_KEY_TOKEN ); - properties[i] = readValue(); + properties[i++] = readToken( PROPERTY_KEY_TOKEN, channel ); + properties[i] = readValue( channel ); } return properties; } } - protected Object readToken( byte type ) throws IOException + protected Object readToken( byte type, ReadableClosablePositionAwareChannel channel ) throws IOException { int id = channel.getInt(); if ( id == -1 ) @@ -158,19 +228,20 @@ protected Object readToken( byte type ) throws IOException return name; } - protected Object readValue() throws IOException + protected Object readValue( ReadableClosablePositionAwareChannel channel ) throws IOException { return ValueType.typeOf( channel.get() ).read( channel ); } - protected Group readGroup( int slot ) throws IOException + protected Group readGroup( int slot, ProcessorState state ) throws IOException { + ReadableClosablePositionAwareChannel channel = state.batchChannel; byte groupMode = channel.get(); switch ( groupMode ) { - case SAME_GROUP: return previousGroups[slot]; - case NEW_GROUP: return previousGroups[slot] = new Group.Adapter( channel.getInt(), - (String) readToken( GROUP_TOKEN ) ); + case SAME_GROUP: return state.previousGroups[slot]; + case NEW_GROUP: return state.previousGroups[slot] = new Group.Adapter( channel.getInt(), + (String) readToken( GROUP_TOKEN, channel ) ); default: throw new IllegalArgumentException( "Unknown group mode " + groupMode ); } } @@ -192,7 +263,7 @@ public long position() { try { - return channel.getCurrentPosition( positionMarker ).getByteOffset(); + return cacheChannel.getCurrentPosition( positionMarker ).getByteOffset(); } catch ( IOException e ) { @@ -205,7 +276,7 @@ public void close() { try { - channel.close(); + cacheChannel.close(); closeAction.run(); } catch ( IOException e ) @@ -213,4 +284,48 @@ public void close() throw new InputException( "Couldn't close channel for cached input data", e ); } } + + @Override + public boolean incrementNumberOfProcessors() + { + return processing.incrementNumberOfProcessors(); + } + + @Override + public boolean decrementNumberOfProcessors() + { + return processing.decrementNumberOfProcessors(); + } + + @Override + public int numberOfProcessors() + { + return processing.numberOfProcessors(); + } + + private class BatchProvidingIterator extends PrefetchingIterator + { + @Override + protected byte[] fetchNextOrNull() + { + try + { + int batchSize = safeCastLongToInt( cacheChannel.getLong() ); + if ( batchSize == InputCache.END_OF_CACHE ) + { + // We have reached end of cache + return null; + } + byte[] bytes = new byte[batchSize]; + cacheChannel.get( bytes, batchSize ); + + return bytes; + } + catch ( IOException e ) + { + // Batch size was probably wrong if we ended up here. + throw new RuntimeException( e ); + } + } + } } diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/InputNodeCacher.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/InputNodeCacher.java index 0c4ae92eb295..78a6a8eb2779 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/InputNodeCacher.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/InputNodeCacher.java @@ -38,10 +38,11 @@ public class InputNodeCacher extends InputEntityCacher { private String[] previousLabels = InputEntity.NO_LABELS; - public InputNodeCacher( StoreChannel channel, StoreChannel header, RecordFormats recordFormats, int bufferSize ) + public InputNodeCacher( StoreChannel channel, StoreChannel header, RecordFormats recordFormats, + int bufferSize, int batchSize ) throws IOException { - super( channel, header, recordFormats, bufferSize, 1 ); + super( channel, header, recordFormats, bufferSize, batchSize, 1 ); } @Override @@ -72,6 +73,13 @@ protected void writeEntity( InputNode node ) throws IOException } } + @Override + protected void clearState() + { + previousLabels = InputEntity.NO_LABELS; + super.clearState(); + } + protected void writeLabelDiff( byte mode, String[] compare, String[] with ) throws IOException { for ( String value : compare ) diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/InputNodeReader.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/InputNodeReader.java index cb5902359017..98dd198567f4 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/InputNodeReader.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/InputNodeReader.java @@ -23,6 +23,7 @@ import java.util.Arrays; import org.neo4j.io.fs.StoreChannel; +import org.neo4j.kernel.impl.transaction.log.ReadableClosablePositionAwareChannel; import static org.neo4j.unsafe.impl.batchimport.input.InputCache.END_OF_LABEL_CHANGES; import static org.neo4j.unsafe.impl.batchimport.input.InputCache.HAS_LABEL_FIELD; @@ -37,22 +38,22 @@ */ public class InputNodeReader extends InputEntityReader { - private String[] previousLabels = InputEntity.NO_LABELS; - - public InputNodeReader( StoreChannel channel, StoreChannel header, int bufferSize, Runnable closeAction ) - throws IOException + public InputNodeReader( StoreChannel channel, StoreChannel header, int bufferSize, Runnable closeAction, + int maxNbrOfProcessors ) throws IOException { - super( channel, header, bufferSize, 1, closeAction ); + super( channel, header, bufferSize, closeAction, maxNbrOfProcessors ); } @Override - protected InputNode readNextOrNull( Object properties ) throws IOException + protected InputNode readNextOrNull( Object properties, ProcessorState state ) throws IOException { + ReadableClosablePositionAwareChannel channel = state.batchChannel; + // group - Group group = readGroup( 0 ); + Group group = readGroup( 0, state ); // id - Object id = readValue(); + Object id = readValue( channel ); // labels (diff from previous node) byte labelsMode = channel.get(); @@ -63,25 +64,25 @@ protected InputNode readNextOrNull( Object properties ) throws IOException } else if ( labelsMode == END_OF_LABEL_CHANGES ) { // Same as for previous node - labels = previousLabels; + labels = state.previousLabels; } else { - String[] newLabels = previousLabels.clone(); + String[] newLabels = state.previousLabels.clone(); int cursor = newLabels.length; while ( labelsMode != END_OF_LABEL_CHANGES ) { switch ( labelsMode ) { - case LABEL_REMOVAL: remove( (String) readToken( LABEL_TOKEN ), newLabels, cursor-- ); break; + case LABEL_REMOVAL: remove( (String) readToken( LABEL_TOKEN, channel ), newLabels, cursor-- ); break; case LABEL_ADDITION: (newLabels = ensureRoomForOneMore( newLabels, cursor ))[cursor++] = - (String) readToken( LABEL_TOKEN ); break; + (String) readToken( LABEL_TOKEN, channel ); break; default: throw new IllegalArgumentException( "Unrecognized label mode " + labelsMode ); } labelsMode = channel.get(); } - labels = previousLabels = cursor == newLabels.length ? newLabels : Arrays.copyOf( newLabels, cursor ); + labels = state.previousLabels = cursor == newLabels.length ? newLabels : Arrays.copyOf( newLabels, cursor ); } return new InputNode( sourceDescription(), lineNumber(), position(), diff --git a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/InputRelationshipCacher.java b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/InputRelationshipCacher.java index 3b3cff991bd8..f590bb2d3c30 100644 --- a/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/InputRelationshipCacher.java +++ b/community/kernel/src/main/java/org/neo4j/unsafe/impl/batchimport/input/InputRelationshipCacher.java @@ -37,9 +37,10 @@ public class InputRelationshipCacher extends InputEntityCacher { - private String previousType; - - public InputRelationshipReader( StoreChannel channel, StoreChannel header, int bufferSize, Runnable closeAction ) - throws IOException + public InputRelationshipReader( StoreChannel channel, StoreChannel header, int bufferSize, Runnable closeAction, + int maxNbrOfProcessors ) throws IOException { - super( channel, header, bufferSize, 2, closeAction ); + super( channel, header, bufferSize, closeAction, maxNbrOfProcessors ); } @Override - protected InputRelationship readNextOrNull( Object properties ) throws IOException + protected InputRelationship readNextOrNull( Object properties, ProcessorState state ) throws IOException { + ReadableClosablePositionAwareChannel channel = state.batchChannel; + // groups - Group startNodeGroup = readGroup( 0 ); - Group endNodeGroup = readGroup( 1 ); + Group startNodeGroup = readGroup( 0, state ); + Group endNodeGroup = readGroup( 1, state ); // ids - Object startNodeId = readValue(); - Object endNodeId = readValue(); + Object startNodeId = readValue( channel ); + Object endNodeId = readValue( channel ); // type byte typeMode = channel.get(); Object type; switch ( typeMode ) { - case SAME_TYPE: type = previousType; break; - case NEW_TYPE: type = previousType = (String) readToken( RELATIONSHIP_TYPE_TOKEN ); break; + case SAME_TYPE: type = state.previousType; break; + case NEW_TYPE: type = state.previousType = (String) readToken( RELATIONSHIP_TYPE_TOKEN, channel ); break; case HAS_TYPE_ID: type = channel.getInt(); break; default: throw new IllegalArgumentException( "Unrecognized type mode " + typeMode ); } diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/input/InputCacheTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/input/InputCacheTest.java index 2b16db0bceee..ba8c391da2a7 100644 --- a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/input/InputCacheTest.java +++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/input/InputCacheTest.java @@ -19,6 +19,7 @@ */ package org.neo4j.unsafe.impl.batchimport.input; +import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.RuleChain; @@ -35,14 +36,21 @@ import org.neo4j.test.Randoms; import org.neo4j.test.TargetDirectory; import org.neo4j.test.TargetDirectory.TestDirectory; +import org.neo4j.unsafe.impl.batchimport.Configuration; +import org.neo4j.unsafe.impl.batchimport.Configuration.Default; import org.neo4j.unsafe.impl.batchimport.InputIterator; import static java.lang.Math.abs; +import static java.lang.System.currentTimeMillis; + import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; + +import static org.neo4j.helpers.Format.duration; import static org.neo4j.helpers.collection.Iterators.asSet; +import static org.neo4j.helpers.collection.Iterators.count; import static org.neo4j.unsafe.impl.batchimport.input.InputCache.MAIN; import static org.neo4j.unsafe.impl.batchimport.input.InputEntity.NO_LABELS; import static org.neo4j.unsafe.impl.batchimport.input.InputEntity.NO_PROPERTIES; @@ -69,7 +77,7 @@ public void shouldCacheAndRetrieveNodes() throws Exception { // GIVEN try ( InputCache cache = new InputCache( fileSystemRule.get(), dir.directory(), StandardV3_0.RECORD_FORMATS, - (int) ByteUnit.kibiBytes( 8 ) ) ) + withMaxProcessors( 50 ), (int) ByteUnit.kibiBytes( 8 ), BATCH_SIZE ) ) { List nodes = new ArrayList<>(); Randoms random = getRandoms(); @@ -91,6 +99,7 @@ public void shouldCacheAndRetrieveNodes() throws Exception // WHEN/THEN try ( InputIterator reader = cache.nodes( MAIN, true ).iterator() ) { + reader.setNumberOfProcessors( 50 ); Iterator expected = nodes.iterator(); while ( expected.hasNext() ) { @@ -110,7 +119,7 @@ public void shouldCacheAndRetrieveRelationships() throws Exception { // GIVEN try ( InputCache cache = new InputCache( fileSystemRule.get(), dir.directory(), StandardV3_0.RECORD_FORMATS, - (int) ByteUnit.kibiBytes( 8 ) ) ) + withMaxProcessors( 50 ), (int) ByteUnit.kibiBytes( 8 ), BATCH_SIZE ) ) { List relationships = new ArrayList<>(); Randoms random = getRandoms(); @@ -132,6 +141,7 @@ public void shouldCacheAndRetrieveRelationships() throws Exception // WHEN/THEN try ( InputIterator reader = cache.relationships( MAIN, true ).iterator() ) { + reader.setNumberOfProcessors( 50 ); Iterator expected = relationships.iterator(); while ( expected.hasNext() ) { @@ -146,6 +156,58 @@ public void shouldCacheAndRetrieveRelationships() throws Exception assertNoFilesLeftBehind(); } + @Ignore( "Shows performance improvement of adding more threads" ) + @Test + public void shouldReadQuickly() throws Exception + { + try ( InputCache cache = new InputCache( fileSystemRule.get(), dir.directory(), + StandardV3_0.RECORD_FORMATS, withMaxProcessors( 8 ) ) ) + { + Randoms random = new Randoms( randomRule.random(), Randoms.DEFAULT ); + try ( Receiver cacher = cache.cacheRelationships( MAIN ) ) + { + InputRelationship[] batch = new InputRelationship[1_000]; + for ( int i = 0; i < batch.length; i++ ) + { + batch[i] = randomRelationship( random ); + } + + for ( int b = 0; b < 100_000; b++ ) + { + cacher.receive( batch ); + if ( b % 10_000 == 0 ) + { + System.out.println( b ); + } + } + } + + for ( int i = 1; i <= 8; i++ ) + { + try ( InputIterator reader = cache.relationships( MAIN, false ).iterator() ) + { + reader.setNumberOfProcessors( i ); + long time = currentTimeMillis(); + count( reader ); + time = currentTimeMillis() - time; + System.out.println( i + ":" + duration( time ) ); + } + } + } + } + + private Default withMaxProcessors( int maxProcessors ) + { + return new Configuration.Default() + { + @Override + public int maxNumberOfProcessors() + { + return maxProcessors; + } + }; + } + private void assertNoFilesLeftBehind() { assertEquals( 0, fileSystemRule.get().listFiles( dir.directory() ).length ); @@ -193,7 +255,7 @@ private InputRelationship randomRelationship( Randoms random ) NO_PROPERTIES, abs( random.random().nextLong() ), randomGroup( random, 0 ), randomId( random ), randomGroup( random, 1 ), randomId( random ), - null, abs( random.random().nextInt( Short.MAX_VALUE ) ) ); + null, abs( random.random().nextInt( 20_000 ) ) ); } return new InputRelationship( null, 0, 0, @@ -254,7 +316,7 @@ private Group randomGroup( Randoms random, int slot ) { if ( random.random().nextFloat() < 0.01f ) { // Next group - return previousGroups[slot] = new Group.Adapter( previousGroups[slot].id()+1, random.string() ); + return previousGroups[slot] = new Group.Adapter( random.nextInt( 20_000 ), random.string() ); } // Keep same as previous return previousGroups[slot]; diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/input/InputEntityCacherTokenCreationTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/input/InputEntityCacherTokenCreationTest.java index 4fd4df57a620..831f9f42c608 100644 --- a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/input/InputEntityCacherTokenCreationTest.java +++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/input/InputEntityCacherTokenCreationTest.java @@ -248,13 +248,13 @@ private TestInputEntityCacher getEntityCacher( RecordFormats recordFormats ) thr private InputNodeCacher getNodeCacher( RecordFormats recordFormats ) throws IOException { return new InputNodeCacher( mock( StoreChannel.class ), - mock( StoreChannel.class ), recordFormats, 100 ); + mock( StoreChannel.class ), recordFormats, 100, 100 ); } private InputRelationshipCacher getRelationshipCacher( RecordFormats recordFormats ) throws IOException { return new InputRelationshipCacher( mock( StoreChannel.class ), - mock( StoreChannel.class ), recordFormats, 100 ); + mock( StoreChannel.class ), recordFormats, 100, 100 ); } private class TestInputEntityCacher extends InputEntityCacher @@ -262,7 +262,7 @@ private class TestInputEntityCacher extends InputEntityCacher TestInputEntityCacher( StoreChannel channel, StoreChannel header, RecordFormats recordFormats, int bufferSize, int groupSlots ) throws IOException { - super( channel, header, recordFormats, bufferSize, groupSlots ); + super( channel, header, recordFormats, bufferSize, 100, groupSlots ); } } } diff --git a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/input/PerTypeRelationshipSplitterTest.java b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/input/PerTypeRelationshipSplitterTest.java index 5a58ba83d8d3..949db0cc5fbe 100644 --- a/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/input/PerTypeRelationshipSplitterTest.java +++ b/community/kernel/src/test/java/org/neo4j/unsafe/impl/batchimport/input/PerTypeRelationshipSplitterTest.java @@ -32,6 +32,7 @@ import org.neo4j.kernel.impl.store.format.standard.StandardV3_0; import org.neo4j.test.RandomRule; import org.neo4j.test.TargetDirectory; +import org.neo4j.unsafe.impl.batchimport.Configuration; import org.neo4j.unsafe.impl.batchimport.InputIterable; import org.neo4j.unsafe.impl.batchimport.InputIterator; @@ -54,7 +55,7 @@ public void shouldReturnTypesOneByOne() throws Exception Collection expected = randomRelationships(); InputIterable relationships = wrap( "test", expected ); InputCache inputCache = new InputCache( new DefaultFileSystemAbstraction(), directory.directory(), - StandardV3_0.RECORD_FORMATS ); + StandardV3_0.RECORD_FORMATS, Configuration.DEFAULT ); PerTypeRelationshipSplitter perType = new PerTypeRelationshipSplitter( relationships.iterator(), types( expected ), type -> false, type -> Integer.parseInt( type.toString() ), inputCache );