Skip to content

Commit

Permalink
Individal id spaces for different token types in InputCache
Browse files Browse the repository at this point in the history
as well as support for Integer as token key. Previously all token types
shared a single id space (2^16 bits), which would theoretically run out in
some scenarios. This is now more correct and will handle all types of
input.
  • Loading branch information
tinwelint committed Apr 25, 2016
1 parent bf7948b commit b22d0c5
Show file tree
Hide file tree
Showing 8 changed files with 88 additions and 31 deletions.
Expand Up @@ -100,7 +100,11 @@ public class InputCache implements Closeable


static final byte SAME_GROUP = 0; static final byte SAME_GROUP = 0;
static final byte NEW_GROUP = 1; static final byte NEW_GROUP = 1;
static final byte TOKEN = 1; static final byte PROPERTY_KEY_TOKEN = 1;
static final byte LABEL_TOKEN = 2;
static final byte RELATIONSHIP_TYPE_TOKEN = 3;
static final byte GROUP_TOKEN = 4;
static final byte HIGH_TOKEN_TYPE = 5;
static final short HAS_FIRST_PROPERTY_ID = -1; static final short HAS_FIRST_PROPERTY_ID = -1;
static final byte HAS_LABEL_FIELD = 3; static final byte HAS_LABEL_FIELD = 3;
static final byte LABEL_REMOVAL = 1; static final byte LABEL_REMOVAL = 1;
Expand Down
Expand Up @@ -31,10 +31,12 @@
import static org.neo4j.unsafe.impl.batchimport.Utils.safeCastLongToShort; import static org.neo4j.unsafe.impl.batchimport.Utils.safeCastLongToShort;
import static org.neo4j.unsafe.impl.batchimport.input.InputCache.END_OF_ENTITIES; import static org.neo4j.unsafe.impl.batchimport.input.InputCache.END_OF_ENTITIES;
import static org.neo4j.unsafe.impl.batchimport.input.InputCache.END_OF_HEADER; import static org.neo4j.unsafe.impl.batchimport.input.InputCache.END_OF_HEADER;
import static org.neo4j.unsafe.impl.batchimport.input.InputCache.GROUP_TOKEN;
import static org.neo4j.unsafe.impl.batchimport.input.InputCache.HAS_FIRST_PROPERTY_ID; import static org.neo4j.unsafe.impl.batchimport.input.InputCache.HAS_FIRST_PROPERTY_ID;
import static org.neo4j.unsafe.impl.batchimport.input.InputCache.HIGH_TOKEN_TYPE;
import static org.neo4j.unsafe.impl.batchimport.input.InputCache.NEW_GROUP; import static org.neo4j.unsafe.impl.batchimport.input.InputCache.NEW_GROUP;
import static org.neo4j.unsafe.impl.batchimport.input.InputCache.SAME_GROUP; import static org.neo4j.unsafe.impl.batchimport.input.InputCache.SAME_GROUP;
import static org.neo4j.unsafe.impl.batchimport.input.InputCache.TOKEN; import static org.neo4j.unsafe.impl.batchimport.input.InputCache.PROPERTY_KEY_TOKEN;


/** /**
* Abstract class for caching {@link InputEntity} or derivative to disk using a binary format. * Abstract class for caching {@link InputEntity} or derivative to disk using a binary format.
Expand All @@ -47,8 +49,9 @@ abstract class InputEntityCacher<ENTITY extends InputEntity> implements Receiver
private final StoreChannel headerChannel; private final StoreChannel headerChannel;
private final int[] previousGroupIds; private final int[] previousGroupIds;


private short nextKeyId; private final short[] nextKeyId = new short[HIGH_TOKEN_TYPE];
private final Map<String,Short> tokens = new HashMap<>(); @SuppressWarnings( "unchecked" )
private final Map<String,Short>[] tokens = new Map[HIGH_TOKEN_TYPE];


protected InputEntityCacher( StoreChannel channel, StoreChannel header, int bufferSize, int groupSlots ) protected InputEntityCacher( StoreChannel channel, StoreChannel header, int bufferSize, int groupSlots )
throws IOException throws IOException
Expand All @@ -66,6 +69,10 @@ protected InputEntityCacher( StoreChannel channel, StoreChannel header, int buff
new PhysicalLogVersionedStoreChannel( channel, 0, (byte)0 ), bufferSize ); new PhysicalLogVersionedStoreChannel( channel, 0, (byte)0 ), bufferSize );
this.header = new PositionAwarePhysicalFlushableChannel( this.header = new PositionAwarePhysicalFlushableChannel(
new PhysicalLogVersionedStoreChannel( header, 0, (byte)0 ), (int) ByteUnit.kibiBytes( 8 ) ); new PhysicalLogVersionedStoreChannel( header, 0, (byte)0 ), (int) ByteUnit.kibiBytes( 8 ) );
for ( int i = 0; i < tokens.length; i++ )
{
tokens[i] = new HashMap<>();
}
} }


@Override @Override
Expand All @@ -90,13 +97,13 @@ protected void writeEntity( ENTITY entity ) throws IOException
channel.putShort( safeCastLongToShort( properties.length/2 ) ); channel.putShort( safeCastLongToShort( properties.length/2 ) );
for ( int i = 0; i < properties.length; i++ ) for ( int i = 0; i < properties.length; i++ )
{ {
String key = (String) properties[i++]; Object key = properties[i++];
Object value = properties[i]; Object value = properties[i];
if ( value == null ) if ( value == null )
{ {
continue; continue;
} }
writeToken( key ); writeToken( PROPERTY_KEY_TOKEN, key );
writeValue( value ); writeValue( value );
} }
} }
Expand All @@ -112,7 +119,7 @@ protected void writeGroup( Group group, int slot ) throws IOException
{ {
channel.put( NEW_GROUP ); channel.put( NEW_GROUP );
channel.putInt( previousGroupIds[slot] = group.id() ); channel.putInt( previousGroupIds[slot] = group.id() );
writeToken( group.name() ); writeToken( GROUP_TOKEN, group.name() );
} }
} }


Expand All @@ -123,16 +130,34 @@ protected void writeValue( Object value ) throws IOException
type.write( value, channel ); type.write( value, channel );
} }


protected void writeToken( String key ) throws IOException protected void writeToken( byte type, Object key ) throws IOException
{ {
Short id = tokens.get( key ); if ( key instanceof String )
if ( id == null ) {
Short id = tokens[type].get( key );
if ( id == null )
{
if ( nextKeyId[type] == -1 )
{
throw new IllegalArgumentException( "Too many tokens" );
}
tokens[type].put( (String) key, id = nextKeyId[type]++ );
header.put( type );
ValueType.stringType().write( key, header );
}
channel.putShort( id );
}
else if ( key instanceof Integer )
{
// Here we signal that we have a real token id, not to be confused by the local and contrived
// toiken ids we generate in here. Following this -1 is the real token id.
channel.putShort( (short) -1 );
channel.putShort( safeCastLongToShort( (Integer) key ) );
}
else
{ {
tokens.put( key, id = nextKeyId++ ); throw new IllegalArgumentException( "Invalid key " + key + ", " + key.getClass() );
header.put( TOKEN );
ValueType.stringType().write( key, header );
} }
channel.putShort( id );
} }


@Override @Override
Expand Down
Expand Up @@ -35,10 +35,15 @@


import static org.neo4j.kernel.impl.transaction.log.LogVersionBridge.NO_MORE_CHANNELS; import static org.neo4j.kernel.impl.transaction.log.LogVersionBridge.NO_MORE_CHANNELS;
import static org.neo4j.unsafe.impl.batchimport.input.InputCache.END_OF_ENTITIES; import static org.neo4j.unsafe.impl.batchimport.input.InputCache.END_OF_ENTITIES;
import static org.neo4j.unsafe.impl.batchimport.input.InputCache.END_OF_HEADER;
import static org.neo4j.unsafe.impl.batchimport.input.InputCache.GROUP_TOKEN;
import static org.neo4j.unsafe.impl.batchimport.input.InputCache.HAS_FIRST_PROPERTY_ID; import static org.neo4j.unsafe.impl.batchimport.input.InputCache.HAS_FIRST_PROPERTY_ID;
import static org.neo4j.unsafe.impl.batchimport.input.InputCache.HIGH_TOKEN_TYPE;
import static org.neo4j.unsafe.impl.batchimport.input.InputCache.LABEL_TOKEN;
import static org.neo4j.unsafe.impl.batchimport.input.InputCache.NEW_GROUP; import static org.neo4j.unsafe.impl.batchimport.input.InputCache.NEW_GROUP;
import static org.neo4j.unsafe.impl.batchimport.input.InputCache.SAME_GROUP; import static org.neo4j.unsafe.impl.batchimport.input.InputCache.SAME_GROUP;
import static org.neo4j.unsafe.impl.batchimport.input.InputCache.TOKEN; import static org.neo4j.unsafe.impl.batchimport.input.InputCache.PROPERTY_KEY_TOKEN;
import static org.neo4j.unsafe.impl.batchimport.input.InputCache.RELATIONSHIP_TYPE_TOKEN;


/** /**
* Abstract class for reading cached entities previously stored using {@link InputEntityCacher} or derivative. * Abstract class for reading cached entities previously stored using {@link InputEntityCacher} or derivative.
Expand All @@ -50,12 +55,18 @@ abstract class InputEntityReader<ENTITY extends InputEntity> extends Prefetching
private final LogPositionMarker positionMarker = new LogPositionMarker(); private final LogPositionMarker positionMarker = new LogPositionMarker();
private int lineNumber; private int lineNumber;
private final Group[] previousGroups; private final Group[] previousGroups;
private final PrimitiveIntObjectMap<String> tokens = Primitive.intObjectMap(); private final PrimitiveIntObjectMap<String>[] tokens;
private final Runnable closeAction; private final Runnable closeAction;


@SuppressWarnings( "unchecked" )
InputEntityReader( StoreChannel channel, StoreChannel header, int bufferSize, int groupSlots, InputEntityReader( StoreChannel channel, StoreChannel header, int bufferSize, int groupSlots,
Runnable closeAction ) throws IOException Runnable closeAction ) throws IOException
{ {
tokens = new PrimitiveIntObjectMap[HIGH_TOKEN_TYPE];
tokens[PROPERTY_KEY_TOKEN] = Primitive.intObjectMap();
tokens[LABEL_TOKEN] = Primitive.intObjectMap();
tokens[RELATIONSHIP_TYPE_TOKEN] = Primitive.intObjectMap();
tokens[GROUP_TOKEN] = Primitive.intObjectMap();
this.previousGroups = new Group[groupSlots]; this.previousGroups = new Group[groupSlots];
for ( int i = 0; i < groupSlots; i++ ) for ( int i = 0; i < groupSlots; i++ )
{ {
Expand All @@ -76,9 +87,13 @@ private void readHeader( StoreChannel header ) throws IOException
{ {
try ( ReadableClosableChannel reader = reader( header, (int) ByteUnit.kibiBytes( 8 ) ) ) try ( ReadableClosableChannel reader = reader( header, (int) ByteUnit.kibiBytes( 8 ) ) )
{ {
for ( short id = 0; reader.get() == TOKEN; id++ ) short[] tokenIds = new short[HIGH_TOKEN_TYPE];
byte type;
while ( (type = reader.get()) != END_OF_HEADER )
{ {
tokens.put( id, (String) ValueType.stringType().read( reader ) ); short tokenId = tokenIds[type]++;
String name = (String) ValueType.stringType().read( reader );
tokens[type].put( tokenId, name );
} }
} }
} }
Expand Down Expand Up @@ -119,17 +134,24 @@ private Object readProperties() throws IOException
Object[] properties = new Object[count*2]; Object[] properties = new Object[count*2];
for ( int i = 0; i < properties.length; i++ ) for ( int i = 0; i < properties.length; i++ )
{ {
properties[i++] = readToken(); properties[i++] = readToken( PROPERTY_KEY_TOKEN );
properties[i] = readValue(); properties[i] = readValue();
} }
return properties; return properties;
} }
} }


protected String readToken() throws IOException protected Object readToken( byte type ) throws IOException
{ {
short id = channel.getShort(); short id = channel.getShort();
String name = tokens.get( id ); if ( id == -1 )
{
// This is a real token id
int tokenId = channel.getShort() & 0xFFFF;
return tokenId; // as Integer
}

String name = tokens[type].get( id );
if ( name == null ) if ( name == null )
{ {
throw new IllegalArgumentException( "Unknown token " + id ); throw new IllegalArgumentException( "Unknown token " + id );
Expand All @@ -148,7 +170,8 @@ protected Group readGroup( int slot ) throws IOException
switch ( groupMode ) switch ( groupMode )
{ {
case SAME_GROUP: return previousGroups[slot]; case SAME_GROUP: return previousGroups[slot];
case NEW_GROUP: return previousGroups[slot] = new Group.Adapter( channel.getInt(), readToken() ); case NEW_GROUP: return previousGroups[slot] = new Group.Adapter( channel.getInt(),
(String) readToken( GROUP_TOKEN ) );
default: throw new IllegalArgumentException( "Unknown group mode " + groupMode ); default: throw new IllegalArgumentException( "Unknown group mode " + groupMode );
} }
} }
Expand Down
Expand Up @@ -28,6 +28,7 @@
import static org.neo4j.unsafe.impl.batchimport.input.InputCache.HAS_LABEL_FIELD; import static org.neo4j.unsafe.impl.batchimport.input.InputCache.HAS_LABEL_FIELD;
import static org.neo4j.unsafe.impl.batchimport.input.InputCache.LABEL_ADDITION; import static org.neo4j.unsafe.impl.batchimport.input.InputCache.LABEL_ADDITION;
import static org.neo4j.unsafe.impl.batchimport.input.InputCache.LABEL_REMOVAL; import static org.neo4j.unsafe.impl.batchimport.input.InputCache.LABEL_REMOVAL;
import static org.neo4j.unsafe.impl.batchimport.input.InputCache.LABEL_TOKEN;


/** /**
* Caches {@link InputNode} to disk using a binary format. * Caches {@link InputNode} to disk using a binary format.
Expand Down Expand Up @@ -62,21 +63,21 @@ protected void writeEntity( InputNode node ) throws IOException
else else
{ // diff from previous node { // diff from previous node
String[] labels = node.labels(); String[] labels = node.labels();
writeDiff( LABEL_REMOVAL, previousLabels, labels ); writeLabelDiff( LABEL_REMOVAL, previousLabels, labels );
writeDiff( LABEL_ADDITION, labels, previousLabels ); writeLabelDiff( LABEL_ADDITION, labels, previousLabels );
channel.put( END_OF_LABEL_CHANGES ); channel.put( END_OF_LABEL_CHANGES );
previousLabels = labels; previousLabels = labels;
} }
} }


protected void writeDiff( byte mode, String[] compare, String[] with ) throws IOException protected void writeLabelDiff( byte mode, String[] compare, String[] with ) throws IOException
{ {
for ( String value : compare ) for ( String value : compare )
{ {
if ( !contains( with, value ) ) if ( !contains( with, value ) )
{ {
channel.put( mode ); channel.put( mode );
writeToken( value ); writeToken( LABEL_TOKEN, value );
} }
} }
} }
Expand Down
Expand Up @@ -28,6 +28,7 @@
import static org.neo4j.unsafe.impl.batchimport.input.InputCache.HAS_LABEL_FIELD; import static org.neo4j.unsafe.impl.batchimport.input.InputCache.HAS_LABEL_FIELD;
import static org.neo4j.unsafe.impl.batchimport.input.InputCache.LABEL_ADDITION; import static org.neo4j.unsafe.impl.batchimport.input.InputCache.LABEL_ADDITION;
import static org.neo4j.unsafe.impl.batchimport.input.InputCache.LABEL_REMOVAL; import static org.neo4j.unsafe.impl.batchimport.input.InputCache.LABEL_REMOVAL;
import static org.neo4j.unsafe.impl.batchimport.input.InputCache.LABEL_TOKEN;
import static org.neo4j.unsafe.impl.batchimport.input.InputEntity.NO_LABELS; import static org.neo4j.unsafe.impl.batchimport.input.InputEntity.NO_LABELS;
import static org.neo4j.unsafe.impl.batchimport.input.InputEntity.NO_PROPERTIES; import static org.neo4j.unsafe.impl.batchimport.input.InputEntity.NO_PROPERTIES;


Expand Down Expand Up @@ -72,9 +73,10 @@ else if ( labelsMode == END_OF_LABEL_CHANGES )
{ {
switch ( labelsMode ) switch ( labelsMode )
{ {
case LABEL_REMOVAL: remove( readToken(), newLabels, cursor-- ); break; case LABEL_REMOVAL: remove( (String) readToken( LABEL_TOKEN ), newLabels, cursor-- ); break;
case LABEL_ADDITION: case LABEL_ADDITION:
(newLabels = ensureRoomForOneMore( newLabels, cursor ))[cursor++] = readToken(); break; (newLabels = ensureRoomForOneMore( newLabels, cursor ))[cursor++] =
(String) readToken( LABEL_TOKEN ); break;
default: throw new IllegalArgumentException( "Unrecognized label mode " + labelsMode ); default: throw new IllegalArgumentException( "Unrecognized label mode " + labelsMode );
} }
labelsMode = channel.get(); labelsMode = channel.get();
Expand Down
Expand Up @@ -23,6 +23,7 @@
import org.neo4j.io.fs.StoreChannel; import org.neo4j.io.fs.StoreChannel;


import static org.neo4j.unsafe.impl.batchimport.input.InputCache.NEW_TYPE; import static org.neo4j.unsafe.impl.batchimport.input.InputCache.NEW_TYPE;
import static org.neo4j.unsafe.impl.batchimport.input.InputCache.RELATIONSHIP_TYPE_TOKEN;
import static org.neo4j.unsafe.impl.batchimport.input.InputCache.SAME_TYPE; import static org.neo4j.unsafe.impl.batchimport.input.InputCache.SAME_TYPE;
import static org.neo4j.unsafe.impl.batchimport.input.InputCache.SPECIFIC_ID; import static org.neo4j.unsafe.impl.batchimport.input.InputCache.SPECIFIC_ID;
import static org.neo4j.unsafe.impl.batchimport.input.InputCache.HAS_TYPE_ID; import static org.neo4j.unsafe.impl.batchimport.input.InputCache.HAS_TYPE_ID;
Expand Down Expand Up @@ -80,7 +81,7 @@ protected void writeEntity( InputRelationship relationship ) throws IOException
else else
{ {
channel.put( NEW_TYPE ); channel.put( NEW_TYPE );
writeToken( previousType = relationship.type() ); writeToken( RELATIONSHIP_TYPE_TOKEN, previousType = relationship.type() );
} }
} }
} }
Expand Down
Expand Up @@ -24,6 +24,7 @@
import org.neo4j.io.fs.StoreChannel; import org.neo4j.io.fs.StoreChannel;


import static org.neo4j.unsafe.impl.batchimport.input.InputCache.NEW_TYPE; import static org.neo4j.unsafe.impl.batchimport.input.InputCache.NEW_TYPE;
import static org.neo4j.unsafe.impl.batchimport.input.InputCache.RELATIONSHIP_TYPE_TOKEN;
import static org.neo4j.unsafe.impl.batchimport.input.InputCache.SAME_TYPE; import static org.neo4j.unsafe.impl.batchimport.input.InputCache.SAME_TYPE;
import static org.neo4j.unsafe.impl.batchimport.input.InputCache.SPECIFIC_ID; import static org.neo4j.unsafe.impl.batchimport.input.InputCache.SPECIFIC_ID;
import static org.neo4j.unsafe.impl.batchimport.input.InputCache.HAS_TYPE_ID; import static org.neo4j.unsafe.impl.batchimport.input.InputCache.HAS_TYPE_ID;
Expand Down Expand Up @@ -62,7 +63,7 @@ protected InputRelationship readNextOrNull( Object properties ) throws IOExcepti
switch ( typeMode ) switch ( typeMode )
{ {
case SAME_TYPE: type = previousType; break; case SAME_TYPE: type = previousType; break;
case NEW_TYPE: type = previousType = readToken(); break; case NEW_TYPE: type = previousType = (String) readToken( RELATIONSHIP_TYPE_TOKEN ); break;
case HAS_TYPE_ID: type = channel.getInt(); break; case HAS_TYPE_ID: type = channel.getInt(); break;
default: throw new IllegalArgumentException( "Unrecognized type mode " + typeMode ); default: throw new IllegalArgumentException( "Unrecognized type mode " + typeMode );
} }
Expand Down
Expand Up @@ -259,7 +259,7 @@ private Object[] randomProperties( Randoms random )
Object[] properties = new Object[length*2]; Object[] properties = new Object[length*2];
for ( int i = 0; i < properties.length; i++ ) for ( int i = 0; i < properties.length; i++ )
{ {
properties[i++] = random.among( TOKENS ); properties[i++] = random.random().nextFloat() < 0.2f ? random.intBetween( 0, 10 ) : random.among( TOKENS );
properties[i] = random.propertyValue(); properties[i] = random.propertyValue();
} }
return properties; return properties;
Expand Down

0 comments on commit b22d0c5

Please sign in to comment.