Skip to content

Commit

Permalink
Limit amount of tokens created by batch import in accordance with sto…
Browse files Browse the repository at this point in the history
…re format capabilities.

Batch import currently allow create only 2^16 different tokens of each type during import.
This commit introduce a check for max allowable number of tokens of particular type in accordance with current store format.
In case if number of tokens over exceed maximum allowable number exception will be thrown and import will be interrupted.
  • Loading branch information
MishaDemianenko committed Jun 7, 2016
1 parent b077c72 commit 71ee2d3
Show file tree
Hide file tree
Showing 13 changed files with 401 additions and 68 deletions.
Expand Up @@ -403,8 +403,7 @@ private NeoStores instantiateLegacyStore( RecordFormats format, File storeDir )
}

private void prepareBatchImportMigration( File storeDir, File migrationDir, RecordFormats oldFormat,
RecordFormats newFormat )
throws IOException
RecordFormats newFormat ) throws IOException
{
BatchingNeoStores.createStore( fileSystem, migrationDir.getPath(), config, newFormat );

Expand Down
Expand Up @@ -31,7 +31,10 @@
import org.neo4j.kernel.configuration.Config;
import org.neo4j.kernel.impl.api.CountsAccessor;
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.kernel.impl.logging.NullLogService;
import org.neo4j.kernel.impl.store.RelationshipStore;
import org.neo4j.kernel.impl.store.format.RecordFormatSelector;
import org.neo4j.kernel.impl.store.format.RecordFormats;
import org.neo4j.logging.Log;
import org.neo4j.unsafe.impl.batchimport.cache.NodeLabelsCache;
import org.neo4j.unsafe.impl.batchimport.cache.NodeRelationshipCache;
Expand All @@ -51,7 +54,6 @@
import org.neo4j.unsafe.impl.batchimport.store.io.IoMonitor;

import static java.lang.System.currentTimeMillis;

import static org.neo4j.helpers.collection.Iterators.asSet;
import static org.neo4j.unsafe.impl.batchimport.AdditionalInitialIds.EMPTY;
import static org.neo4j.unsafe.impl.batchimport.SourceOrCachedInputIterable.cachedForSure;
Expand Down Expand Up @@ -125,11 +127,12 @@ public void doImport( Input input ) throws IOException
boolean hasBadEntries = false;
File badFile = new File( storeDir, Configuration.BAD_FILE_NAME );
CountingStoreUpdateMonitor storeUpdateMonitor = new CountingStoreUpdateMonitor();
try ( BatchingNeoStores neoStore = new BatchingNeoStores( fileSystem, storeDir, config, logService,
RecordFormats recordFormats = RecordFormatSelector.autoSelectFormat( dbConfig, NullLogService.getInstance() );
try ( BatchingNeoStores neoStore = new BatchingNeoStores( fileSystem, storeDir, recordFormats, config, logService,
additionalInitialIds, dbConfig );
CountsAccessor.Updater countsUpdater = neoStore.getCountsStore().reset(
neoStore.getLastCommittedTransactionId() );
InputCache inputCache = new InputCache( fileSystem, storeDir ) )
InputCache inputCache = new InputCache( fileSystem, storeDir, recordFormats ) )
{
Collector badCollector = input.badCollector();
// Some temporary caches and indexes in the import
Expand Down
Expand Up @@ -24,10 +24,12 @@
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

import org.neo4j.function.ThrowingSupplier;
import org.neo4j.io.ByteUnit;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.fs.StoreChannel;
import org.neo4j.kernel.impl.store.format.RecordFormats;
import org.neo4j.unsafe.impl.batchimport.InputIterable;
import org.neo4j.unsafe.impl.batchimport.InputIterator;
import org.neo4j.unsafe.impl.batchimport.ParallelBatchImporter;
Expand Down Expand Up @@ -70,10 +72,6 @@
* <pre>
* Relationship format:
* - properties (see "Properties format")
* - specific id:
* - 1B specific id boolean, {@link #SPECIFIC_ID} or {@link #UNSPECIFIED_ID}
* IF {@link #SPECIFIC_ID}
* - 8B specific relationship id
* - start node group (see "Group format")
* - end node group (see "Group format")
* - start node id
Expand Down Expand Up @@ -118,31 +116,34 @@ public class InputCache implements Closeable

private final FileSystemAbstraction fs;
private final File cacheDirectory;
private RecordFormats recordFormats;
private final int bufferSize;
private final Set<String> subTypes = new HashSet<>();

public InputCache( FileSystemAbstraction fs, File cacheDirectory )
public InputCache( FileSystemAbstraction fs, File cacheDirectory, RecordFormats recordFormats )
{
this( fs, cacheDirectory, (int) ByteUnit.kibiBytes( 512 ) );
this( fs, cacheDirectory, recordFormats, (int) ByteUnit.kibiBytes( 512 ) );
}

public InputCache( FileSystemAbstraction fs, File cacheDirectory, int bufferSize )
public InputCache( FileSystemAbstraction fs, File cacheDirectory, RecordFormats recordFormats, int bufferSize )
{
this.fs = fs;
this.cacheDirectory = cacheDirectory;
this.recordFormats = recordFormats;
this.bufferSize = bufferSize;
}

public Receiver<InputNode[],IOException> cacheNodes( String subType ) throws IOException
{
return new InputNodeCacher( channel( NODES, subType, "rw" ), channel( NODES_HEADER, subType, "rw" ),
bufferSize );
recordFormats, bufferSize );
}

public Receiver<InputRelationship[],IOException> cacheRelationships( String subType ) throws IOException
public Receiver<InputRelationship[],IOException> cacheRelationships( String subType ) throws
IOException
{
return new InputRelationshipCacher( channel( RELATIONSHIPS, subType, "rw" ),
channel( RELATIONSHIPS_HEADER, subType, "rw" ), bufferSize );
channel( RELATIONSHIPS_HEADER, subType, "rw" ), recordFormats, bufferSize );
}

private StoreChannel channel( String type, String subType, String mode ) throws IOException
Expand Down
Expand Up @@ -22,8 +22,10 @@
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.neo4j.io.ByteUnit;
import org.neo4j.io.fs.StoreChannel;
import org.neo4j.kernel.impl.store.format.RecordFormats;
import org.neo4j.kernel.impl.transaction.log.FlushableChannel;
import org.neo4j.kernel.impl.transaction.log.PhysicalLogVersionedStoreChannel;
import org.neo4j.kernel.impl.transaction.log.PositionAwarePhysicalFlushableChannel;
Expand All @@ -34,9 +36,11 @@
import static org.neo4j.unsafe.impl.batchimport.input.InputCache.GROUP_TOKEN;
import static org.neo4j.unsafe.impl.batchimport.input.InputCache.HAS_FIRST_PROPERTY_ID;
import static org.neo4j.unsafe.impl.batchimport.input.InputCache.HIGH_TOKEN_TYPE;
import static org.neo4j.unsafe.impl.batchimport.input.InputCache.LABEL_TOKEN;
import static org.neo4j.unsafe.impl.batchimport.input.InputCache.NEW_GROUP;
import static org.neo4j.unsafe.impl.batchimport.input.InputCache.SAME_GROUP;
import static org.neo4j.unsafe.impl.batchimport.input.InputCache.PROPERTY_KEY_TOKEN;
import static org.neo4j.unsafe.impl.batchimport.input.InputCache.RELATIONSHIP_TYPE_TOKEN;
import static org.neo4j.unsafe.impl.batchimport.input.InputCache.SAME_GROUP;

/**
* Abstract class for caching {@link InputEntity} or derivative to disk using a binary format.
Expand All @@ -49,16 +53,20 @@ abstract class InputEntityCacher<ENTITY extends InputEntity> implements Receiver
private final StoreChannel headerChannel;
private final int[] previousGroupIds;

private final short[] nextKeyId = new short[HIGH_TOKEN_TYPE];
private final int[] nextKeyId = new int[HIGH_TOKEN_TYPE];
private final long[] maxKeyId = new long[HIGH_TOKEN_TYPE];
@SuppressWarnings( "unchecked" )
private final Map<String,Short>[] tokens = new Map[HIGH_TOKEN_TYPE];
private final Map<String,Integer>[] tokens = new Map[HIGH_TOKEN_TYPE];

protected InputEntityCacher( StoreChannel channel, StoreChannel header, int bufferSize, int groupSlots )
throws IOException
protected InputEntityCacher( StoreChannel channel, StoreChannel header, RecordFormats recordFormats, int bufferSize,
int groupSlots ) throws IOException
{
this.storeChannel = channel;
this.headerChannel = header;
this.previousGroupIds = new int[groupSlots];

initMaxTokenKeyIds( recordFormats );

for ( int i = 0; i < groupSlots; i++ )
{
previousGroupIds[i] = Group.GLOBAL.id();
Expand Down Expand Up @@ -134,25 +142,26 @@ protected void writeToken( byte type, Object key ) throws IOException
{
if ( key instanceof String )
{
Short id = tokens[type].get( key );
Integer id = tokens[type].get( key );
if ( id == null )
{
if ( nextKeyId[type] == -1 )
if ( nextKeyId[type] == maxKeyId[type] )
{
throw new IllegalArgumentException( "Too many tokens" );
throw new UnsupportedOperationException( "Too many tokens. Creation of more then " +
maxKeyId[type] + " tokens is not supported." );
}
tokens[type].put( (String) key, id = nextKeyId[type]++ );
header.put( type );
ValueType.stringType().write( key, header );
}
channel.putShort( id );
channel.putInt( id );
}
else if ( key instanceof Integer )
{
// Here we signal that we have a real token id, not to be confused by the local and contrived
// toiken ids we generate in here. Following this -1 is the real token id.
channel.putShort( (short) -1 );
channel.putShort( safeCastLongToShort( (Integer) key ) );
channel.putInt( (short) -1 );
channel.putInt( (Integer) key );
}
else
{
Expand All @@ -173,4 +182,12 @@ public void close() throws IOException
storeChannel.close();
headerChannel.close();
}

private void initMaxTokenKeyIds( RecordFormats recordFormats )
{
maxKeyId[PROPERTY_KEY_TOKEN] = recordFormats.propertyKeyToken().getMaxId();
maxKeyId[LABEL_TOKEN] = recordFormats.labelToken().getMaxId();
maxKeyId[RELATIONSHIP_TYPE_TOKEN] = recordFormats.relationshipTypeToken().getMaxId();
maxKeyId[GROUP_TOKEN] = recordFormats.relationshipGroup().getMaxId();
}
}
Expand Up @@ -41,9 +41,9 @@
import static org.neo4j.unsafe.impl.batchimport.input.InputCache.HIGH_TOKEN_TYPE;
import static org.neo4j.unsafe.impl.batchimport.input.InputCache.LABEL_TOKEN;
import static org.neo4j.unsafe.impl.batchimport.input.InputCache.NEW_GROUP;
import static org.neo4j.unsafe.impl.batchimport.input.InputCache.SAME_GROUP;
import static org.neo4j.unsafe.impl.batchimport.input.InputCache.PROPERTY_KEY_TOKEN;
import static org.neo4j.unsafe.impl.batchimport.input.InputCache.RELATIONSHIP_TYPE_TOKEN;
import static org.neo4j.unsafe.impl.batchimport.input.InputCache.SAME_GROUP;

/**
* Abstract class for reading cached entities previously stored using {@link InputEntityCacher} or derivative.
Expand Down Expand Up @@ -87,11 +87,11 @@ private void readHeader( StoreChannel header ) throws IOException
{
try ( ReadableClosableChannel reader = reader( header, (int) ByteUnit.kibiBytes( 8 ) ) )
{
short[] tokenIds = new short[HIGH_TOKEN_TYPE];
int[] tokenIds = new int[HIGH_TOKEN_TYPE];
byte type;
while ( (type = reader.get()) != END_OF_HEADER )
{
short tokenId = tokenIds[type]++;
int tokenId = tokenIds[type]++;
String name = (String) ValueType.stringType().read( reader );
tokens[type].put( tokenId, name );
}
Expand Down Expand Up @@ -143,12 +143,11 @@ private Object readProperties() throws IOException

protected Object readToken( byte type ) throws IOException
{
short id = channel.getShort();
int id = channel.getInt();
if ( id == -1 )
{
// This is a real token id
int tokenId = channel.getShort() & 0xFFFF;
return tokenId; // as Integer
return channel.getInt();
}

String name = tokens[type].get( id );
Expand Down
Expand Up @@ -22,6 +22,7 @@
import java.io.IOException;

import org.neo4j.io.fs.StoreChannel;
import org.neo4j.kernel.impl.store.format.RecordFormats;

import static org.neo4j.helpers.ArrayUtil.contains;
import static org.neo4j.unsafe.impl.batchimport.input.InputCache.END_OF_LABEL_CHANGES;
Expand All @@ -37,9 +38,10 @@ public class InputNodeCacher extends InputEntityCacher<InputNode>
{
private String[] previousLabels = InputEntity.NO_LABELS;

public InputNodeCacher( StoreChannel channel, StoreChannel header, int bufferSize ) throws IOException
public InputNodeCacher( StoreChannel channel, StoreChannel header, RecordFormats recordFormats, int bufferSize )
throws IOException
{
super( channel, header, bufferSize, 1 );
super( channel, header, recordFormats, bufferSize, 1 );
}

@Override
Expand Down
Expand Up @@ -20,12 +20,14 @@
package org.neo4j.unsafe.impl.batchimport.input;

import java.io.IOException;

import org.neo4j.io.fs.StoreChannel;
import org.neo4j.kernel.impl.store.format.RecordFormats;

import static org.neo4j.unsafe.impl.batchimport.input.InputCache.HAS_TYPE_ID;
import static org.neo4j.unsafe.impl.batchimport.input.InputCache.NEW_TYPE;
import static org.neo4j.unsafe.impl.batchimport.input.InputCache.RELATIONSHIP_TYPE_TOKEN;
import static org.neo4j.unsafe.impl.batchimport.input.InputCache.SAME_TYPE;
import static org.neo4j.unsafe.impl.batchimport.input.InputCache.HAS_TYPE_ID;

/**
* Caches {@link InputRelationship} to disk using a binary format.
Expand All @@ -34,9 +36,10 @@ public class InputRelationshipCacher extends InputEntityCacher<InputRelationship
{
private String previousType;

public InputRelationshipCacher( StoreChannel channel, StoreChannel header, int bufferSize ) throws IOException
public InputRelationshipCacher( StoreChannel channel, StoreChannel header, RecordFormats recordFormats,
int bufferSize ) throws IOException
{
super( channel, header, bufferSize, 2 );
super( channel, header, recordFormats, bufferSize, 2 );
}

@Override
Expand Down
Expand Up @@ -20,7 +20,7 @@
package org.neo4j.unsafe.impl.batchimport.input;

/**
* A {@link Listener} which is designed to receive one or more items, to then finally be closed
* Listener which is designed to receive one or more items, to then finally be closed
* when all items have been received.
*/
public interface Receiver<T,EXCEPTION extends Exception> extends AutoCloseable
Expand Down
Expand Up @@ -90,8 +90,8 @@ public class BatchingNeoStores implements AutoCloseable
private final LabelScanStore labelScanStore;
private final IoTracer ioTracer;

public BatchingNeoStores( FileSystemAbstraction fileSystem, File storeDir, Configuration config,
LogService logService, AdditionalInitialIds initialIds, Config dbConfig )
public BatchingNeoStores( FileSystemAbstraction fileSystem, File storeDir, RecordFormats recordFormats,
Configuration config, LogService logService, AdditionalInitialIds initialIds, Config dbConfig )
{
this.fileSystem = fileSystem;
this.logProvider = logService.getInternalLogProvider();
Expand All @@ -109,7 +109,7 @@ public BatchingNeoStores( FileSystemAbstraction fileSystem, File storeDir, Confi
final PageCacheTracer tracer = new DefaultPageCacheTracer();
this.pageCache = createPageCache( fileSystem, neo4jConfig, logProvider, tracer );
this.ioTracer = tracer::bytesWritten;
this.neoStores = newNeoStores( pageCache );
this.neoStores = newNeoStores( pageCache, recordFormats );
if ( alreadyContainsData( neoStores ) )
{
neoStores.close();
Expand Down Expand Up @@ -206,11 +206,11 @@ public static void createStore( FileSystemAbstraction fileSystem, String storeDi
}
}

private NeoStores newNeoStores( PageCache pageCache )
private NeoStores newNeoStores( PageCache pageCache, RecordFormats recordFormats )
{
BatchingIdGeneratorFactory idGeneratorFactory = new BatchingIdGeneratorFactory( fileSystem );
StoreFactory storeFactory = new StoreFactory( storeDir, neo4jConfig, idGeneratorFactory, pageCache, fileSystem,
logProvider );
recordFormats, logProvider );
return storeFactory.openAllNeoStores( true );
}

Expand Down

0 comments on commit 71ee2d3

Please sign in to comment.