Skip to content

Commit

Permalink
Parallel reading of cached input data
Browse files Browse the repository at this point in the history
For this the format needed to change a bit so that entities are written
in batches with a small header saying how big a batch is, byte-wise as
well as entity count-wise. This favors a setup where there's a dedicated
reader and multiple processors of that data.
  • Loading branch information
tinwelint committed Jun 22, 2016
1 parent ae8a6f5 commit ddb5d41
Show file tree
Hide file tree
Showing 14 changed files with 394 additions and 90 deletions.
Expand Up @@ -27,6 +27,9 @@
import org.neo4j.io.fs.StoreChannel; import org.neo4j.io.fs.StoreChannel;
import org.neo4j.storageengine.api.ReadPastEndException; import org.neo4j.storageengine.api.ReadPastEndException;


/**
* Implementation of {@link ReadableClosablePositionAwareChannel} operating over a {@code byte[]} in memory.
*/
public class InMemoryClosableChannel implements ReadableClosablePositionAwareChannel, FlushablePositionAwareChannel public class InMemoryClosableChannel implements ReadableClosablePositionAwareChannel, FlushablePositionAwareChannel
{ {
private final byte[] bytes; private final byte[] bytes;
Expand All @@ -38,19 +41,22 @@ public InMemoryClosableChannel()
this( 1000 ); this( 1000 );
} }


public InMemoryClosableChannel( byte[] bytes ) public InMemoryClosableChannel( byte[] bytes, boolean append )
{ {
this.bytes = bytes; this.bytes = bytes;
this.asWriter = ByteBuffer.wrap( this.bytes ); this.asWriter = ByteBuffer.wrap( this.bytes );
this.asReader = ByteBuffer.wrap( this.bytes ); this.asReader = ByteBuffer.wrap( this.bytes );
if ( append )
{
this.asWriter.position( bytes.length );
}
} }


public InMemoryClosableChannel( int bufferSize ) public InMemoryClosableChannel( int bufferSize )
{ {
this( new byte[bufferSize] ); this( new byte[bufferSize], false );
} }



public void reset() public void reset()
{ {
asWriter.clear(); asWriter.clear();
Expand Down
Expand Up @@ -170,8 +170,29 @@ public void close() throws IOException
channel.close(); channel.close();
} }


/**
* @return the position of the channel, also taking into account buffer position.
* @throws IOException if underlying channel throws {@link IOException}.
*/
public long position() throws IOException public long position() throws IOException
{ {
return channel.position() + buffer.position(); return channel.position() + buffer.position();
} }

/**
* Sets position of this channel to the new {@code position}. This works only if the underlying channel
* supports positioning.
*
* @param position new position (byte offset) to set as new current position.
* @throws IOException if underlying channel throws {@link IOException}.
*/
public void position( long position ) throws IOException
{
// Currently we take the pessimistic approach of flushing (doesn't imply forcing) buffer to
// channel before moving to a new position. This works in all cases, but there could be
// made an optimization where we could see that we're moving within the current buffer range
// and if so skip flushing and simply move the cursor in the buffer.
prepareForFlush().flush();
channel.position( position );
}
} }
Expand Up @@ -22,8 +22,6 @@
import java.io.Flushable; import java.io.Flushable;
import java.io.IOException; import java.io.IOException;


import org.neo4j.io.fs.StoreChannel;

/** /**
* Decorator around a {@link LogVersionedStoreChannel} making it expose {@link FlushablePositionAwareChannel}. This * Decorator around a {@link LogVersionedStoreChannel} making it expose {@link FlushablePositionAwareChannel}. This
* implementation uses a {@link PhysicalFlushableChannel}, which provides buffering for write operations over the * implementation uses a {@link PhysicalFlushableChannel}, which provides buffering for write operations over the
Expand All @@ -32,7 +30,7 @@
public class PositionAwarePhysicalFlushableChannel implements FlushablePositionAwareChannel public class PositionAwarePhysicalFlushableChannel implements FlushablePositionAwareChannel
{ {
private LogVersionedStoreChannel logVersionedStoreChannel; private LogVersionedStoreChannel logVersionedStoreChannel;
private PhysicalFlushableChannel channel; private final PhysicalFlushableChannel channel;


public PositionAwarePhysicalFlushableChannel( LogVersionedStoreChannel logVersionedStoreChannel) { public PositionAwarePhysicalFlushableChannel( LogVersionedStoreChannel logVersionedStoreChannel) {
this.logVersionedStoreChannel = logVersionedStoreChannel; this.logVersionedStoreChannel = logVersionedStoreChannel;
Expand Down Expand Up @@ -111,4 +109,9 @@ void setChannel( LogVersionedStoreChannel channel )
this.logVersionedStoreChannel = channel; this.logVersionedStoreChannel = channel;
this.channel.setChannel( channel ); this.channel.setChannel( channel );
} }

public void setCurrentPosition( LogPosition position ) throws IOException, UnsupportedOperationException
{
channel.position( position.getByteOffset() );
}
} }
Expand Up @@ -132,7 +132,7 @@ public void doImport( Input input ) throws IOException
additionalInitialIds, dbConfig ); additionalInitialIds, dbConfig );
CountsAccessor.Updater countsUpdater = neoStore.getCountsStore().reset( CountsAccessor.Updater countsUpdater = neoStore.getCountsStore().reset(
neoStore.getLastCommittedTransactionId() ); neoStore.getLastCommittedTransactionId() );
InputCache inputCache = new InputCache( fileSystem, storeDir, recordFormats ) ) InputCache inputCache = new InputCache( fileSystem, storeDir, recordFormats, config ) )
{ {
Collector badCollector = input.badCollector(); Collector badCollector = input.badCollector();
// Some temporary caches and indexes in the import // Some temporary caches and indexes in the import
Expand Down
Expand Up @@ -30,6 +30,7 @@
import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.fs.StoreChannel; import org.neo4j.io.fs.StoreChannel;
import org.neo4j.kernel.impl.store.format.RecordFormats; import org.neo4j.kernel.impl.store.format.RecordFormats;
import org.neo4j.unsafe.impl.batchimport.Configuration;
import org.neo4j.unsafe.impl.batchimport.InputIterable; import org.neo4j.unsafe.impl.batchimport.InputIterable;
import org.neo4j.unsafe.impl.batchimport.InputIterator; import org.neo4j.unsafe.impl.batchimport.InputIterator;
import org.neo4j.unsafe.impl.batchimport.ParallelBatchImporter; import org.neo4j.unsafe.impl.batchimport.ParallelBatchImporter;
Expand Down Expand Up @@ -85,6 +86,9 @@
* ELSE IF {@link #NEW_TYPE} * ELSE IF {@link #NEW_TYPE}
* 4B token id * 4B token id
* </pre> * </pre>
*
* The format stores entities in batches, each batch having a small header containing number of bytes
* and number of entities.
*/ */
public class InputCache implements Closeable public class InputCache implements Closeable
{ {
Expand Down Expand Up @@ -113,37 +117,53 @@ public class InputCache implements Closeable
static final byte NEW_TYPE = 1; static final byte NEW_TYPE = 1;
static final byte END_OF_HEADER = -2; static final byte END_OF_HEADER = -2;
static final short END_OF_ENTITIES = -3; static final short END_OF_ENTITIES = -3;
static final int NO_ENTITIES = 0;
static final long END_OF_CACHE = 0L;


private final FileSystemAbstraction fs; private final FileSystemAbstraction fs;
private final File cacheDirectory; private final File cacheDirectory;
private RecordFormats recordFormats; private final RecordFormats recordFormats;
private final Configuration config;

private final int bufferSize; private final int bufferSize;
private final Set<String> subTypes = new HashSet<>(); private final Set<String> subTypes = new HashSet<>();
private final int batchSize;


public InputCache( FileSystemAbstraction fs, File cacheDirectory, RecordFormats recordFormats ) public InputCache( FileSystemAbstraction fs, File cacheDirectory, RecordFormats recordFormats,
Configuration config )
{ {
this( fs, cacheDirectory, recordFormats, (int) ByteUnit.kibiBytes( 512 ) ); this( fs, cacheDirectory, recordFormats, config, (int) ByteUnit.kibiBytes( 512 ), 10_000 );
} }


public InputCache( FileSystemAbstraction fs, File cacheDirectory, RecordFormats recordFormats, int bufferSize ) /**
* @param fs {@link FileSystemAbstraction} to use
* @param cacheDirectory directory for placing the cached files
* @param config import configuration
* @param bufferSize buffer size for writing/reading cache files
* @param batchSize number of entities in each batch
*/
public InputCache( FileSystemAbstraction fs, File cacheDirectory, RecordFormats recordFormats,
Configuration config, int bufferSize, int batchSize )
{ {
this.fs = fs; this.fs = fs;
this.cacheDirectory = cacheDirectory; this.cacheDirectory = cacheDirectory;
this.recordFormats = recordFormats; this.recordFormats = recordFormats;
this.config = config;
this.bufferSize = bufferSize; this.bufferSize = bufferSize;
this.batchSize = batchSize;
} }


public Receiver<InputNode[],IOException> cacheNodes( String subType ) throws IOException public Receiver<InputNode[],IOException> cacheNodes( String subType ) throws IOException
{ {
return new InputNodeCacher( channel( NODES, subType, "rw" ), channel( NODES_HEADER, subType, "rw" ), return new InputNodeCacher( channel( NODES, subType, "rw" ), channel( NODES_HEADER, subType, "rw" ),
recordFormats, bufferSize ); recordFormats, bufferSize, batchSize );
} }


public Receiver<InputRelationship[],IOException> cacheRelationships( String subType ) throws public Receiver<InputRelationship[],IOException> cacheRelationships( String subType ) throws
IOException IOException
{ {
return new InputRelationshipCacher( channel( RELATIONSHIPS, subType, "rw" ), return new InputRelationshipCacher( channel( RELATIONSHIPS, subType, "rw" ),
channel( RELATIONSHIPS_HEADER, subType, "rw" ), recordFormats, bufferSize ); channel( RELATIONSHIPS_HEADER, subType, "rw" ), recordFormats, bufferSize, batchSize );
} }


private StoreChannel channel( String type, String subType, String mode ) throws IOException private StoreChannel channel( String type, String subType, String mode ) throws IOException
Expand All @@ -165,7 +185,8 @@ public InputIterable<InputNode> nodes( String subType, boolean deleteAfterUse )
public InputIterator<InputNode> get() throws IOException public InputIterator<InputNode> get() throws IOException
{ {
return new InputNodeReader( channel( NODES, subType, "r" ), channel( NODES_HEADER, subType, "r" ), return new InputNodeReader( channel( NODES, subType, "r" ), channel( NODES_HEADER, subType, "r" ),
bufferSize, deleteAction( deleteAfterUse, NODES, NODES_HEADER, subType ) ); bufferSize, deleteAction( deleteAfterUse, NODES, NODES_HEADER, subType ),
config.maxNumberOfProcessors() );
} }
} ); } );
} }
Expand All @@ -179,7 +200,8 @@ public InputIterator<InputRelationship> get() throws IOException
{ {
return new InputRelationshipReader( channel( RELATIONSHIPS, subType, "r" ), return new InputRelationshipReader( channel( RELATIONSHIPS, subType, "r" ),
channel( RELATIONSHIPS_HEADER, subType, "r" ), bufferSize, channel( RELATIONSHIPS_HEADER, subType, "r" ), bufferSize,
deleteAction( deleteAfterUse, RELATIONSHIPS, RELATIONSHIPS_HEADER, subType ) ); deleteAction( deleteAfterUse, RELATIONSHIPS, RELATIONSHIPS_HEADER, subType ),
config.maxNumberOfProcessors() );
} }
} ); } );
} }
Expand Down
Expand Up @@ -20,13 +20,16 @@
package org.neo4j.unsafe.impl.batchimport.input; package org.neo4j.unsafe.impl.batchimport.input;


import java.io.IOException; import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;


import org.neo4j.io.ByteUnit; import org.neo4j.io.ByteUnit;
import org.neo4j.io.fs.StoreChannel; import org.neo4j.io.fs.StoreChannel;
import org.neo4j.kernel.impl.store.format.RecordFormats; import org.neo4j.kernel.impl.store.format.RecordFormats;
import org.neo4j.kernel.impl.transaction.log.FlushableChannel; import org.neo4j.kernel.impl.transaction.log.FlushableChannel;
import org.neo4j.kernel.impl.transaction.log.LogPosition;
import org.neo4j.kernel.impl.transaction.log.LogPositionMarker;
import org.neo4j.kernel.impl.transaction.log.PhysicalLogVersionedStoreChannel; import org.neo4j.kernel.impl.transaction.log.PhysicalLogVersionedStoreChannel;
import org.neo4j.kernel.impl.transaction.log.PositionAwarePhysicalFlushableChannel; import org.neo4j.kernel.impl.transaction.log.PositionAwarePhysicalFlushableChannel;


Expand All @@ -48,30 +51,35 @@
*/ */
abstract class InputEntityCacher<ENTITY extends InputEntity> implements Receiver<ENTITY[],IOException> abstract class InputEntityCacher<ENTITY extends InputEntity> implements Receiver<ENTITY[],IOException>
{ {
protected final FlushableChannel channel; protected final PositionAwarePhysicalFlushableChannel channel;
private final FlushableChannel header; private final FlushableChannel header;
private final StoreChannel storeChannel; private final StoreChannel storeChannel;
private final StoreChannel headerChannel; private final StoreChannel headerChannel;
private final int[] previousGroupIds; private final int[] previousGroupIds;


private final int[] nextKeyId = new int[HIGH_TOKEN_TYPE]; private final int[] nextKeyId = new int[HIGH_TOKEN_TYPE];
private final int[] maxKeyId = new int[HIGH_TOKEN_TYPE]; private final int[] maxKeyId = new int[HIGH_TOKEN_TYPE];

@SuppressWarnings( "unchecked" ) @SuppressWarnings( "unchecked" )
private final Map<String,Integer>[] tokens = new Map[HIGH_TOKEN_TYPE]; private final Map<String,Integer>[] tokens = new Map[HIGH_TOKEN_TYPE];


protected InputEntityCacher( StoreChannel channel, StoreChannel header, RecordFormats recordFormats, int bufferSize, private final LogPositionMarker positionMarker = new LogPositionMarker();
int groupSlots ) throws IOException private LogPosition currentBatchStartPosition;
private int entitiesWritten;
private final int batchSize;

protected InputEntityCacher( StoreChannel channel, StoreChannel header, RecordFormats recordFormats,
int bufferSize, int batchSize, int groupSlots )
throws IOException
{ {
this.storeChannel = channel; this.storeChannel = channel;
this.headerChannel = header; this.headerChannel = header;
this.batchSize = batchSize;
this.previousGroupIds = new int[groupSlots]; this.previousGroupIds = new int[groupSlots];


initMaxTokenKeyIds( recordFormats ); initMaxTokenKeyIds( recordFormats );
clearState();


for ( int i = 0; i < groupSlots; i++ )
{
previousGroupIds[i] = Group.GLOBAL.id();
}
// We don't really care about versions, it's just that apart from that the WritableLogChannel // We don't really care about versions, it's just that apart from that the WritableLogChannel
// does precisely what we want and there's certainly value in not duplicating that functionality. // does precisely what we want and there's certainly value in not duplicating that functionality.
this.channel = new PositionAwarePhysicalFlushableChannel( this.channel = new PositionAwarePhysicalFlushableChannel(
Expand All @@ -89,10 +97,56 @@ public void receive( ENTITY[] batch ) throws IOException
{ {
for ( ENTITY entity : batch ) for ( ENTITY entity : batch )
{ {
if ( entitiesWritten % batchSize == 0 )
{
newBatch();
}
entitiesWritten++;
writeEntity( entity ); writeEntity( entity );
} }
} }


// [ A ][ B ][.................................]
// |<-----A------------------------->| (B entities in total)
// |<------------------------------------------->|
private void newBatch() throws IOException
{
channel.getCurrentPosition( positionMarker );

// Set byte size in previous batch
if ( entitiesWritten > 0 )
{
// Remember the current position
// Go back to the start of this batch
channel.setCurrentPosition( currentBatchStartPosition );
// and set the size in that long field (not counting the size of the size field)
channel.putLong( positionMarker.getByteOffset() - currentBatchStartPosition.getByteOffset() - Long.BYTES );
// and number of entities written
channel.putInt( entitiesWritten );
// Now go back to where we were before updating this size field
channel.setCurrentPosition( positionMarker.newPosition() );
}

// Always add mark for the new batch here, this will simplify reader logic
startBatch();
}

private void startBatch() throws IOException
{
// Make room for size in new batch and number of entities
// Until this batch is finished, this mark the end of the cache.
clearState();
entitiesWritten = 0;
currentBatchStartPosition = positionMarker.newPosition();
channel.putLong( InputCache.END_OF_CACHE );
channel.putInt( InputCache.NO_ENTITIES );
}

protected void clearState()
{
Arrays.fill( previousGroupIds, Group.GLOBAL.id() );
}

protected void writeEntity( ENTITY entity ) throws IOException protected void writeEntity( ENTITY entity ) throws IOException
{ {
// properties // properties
Expand Down Expand Up @@ -160,7 +214,7 @@ protected void writeToken( byte type, Object key ) throws IOException
else if ( key instanceof Integer ) else if ( key instanceof Integer )
{ {
// Here we signal that we have a real token id, not to be confused by the local and contrived // Here we signal that we have a real token id, not to be confused by the local and contrived
// toiken ids we generate in here. Following this -1 is the real token id. // token ids we generate in here. Following this -1 is the real token id.
channel.putInt( (short) -1 ); channel.putInt( (short) -1 );
channel.putInt( (Integer) key ); channel.putInt( (Integer) key );
} }
Expand All @@ -173,6 +227,8 @@ else if ( key instanceof Integer )
@Override @Override
public void close() throws IOException public void close() throws IOException
{ {
newBatch();

header.put( END_OF_HEADER ); header.put( END_OF_HEADER );
// This is a special value denoting the end of the stream. This is done like this since // This is a special value denoting the end of the stream. This is done like this since
// properties are the first thing read for every entity. // properties are the first thing read for every entity.
Expand Down

0 comments on commit ddb5d41

Please sign in to comment.