Reuse internal values array in StorePropertyCursor

Introduce a little helper class for representing a block of data.

This should reduce GC pressure.

davidegrohmann committed Aug 28, 2015
1 parent a8e818e commit ac9fdef
Showing 2 changed files with 357 additions and 240 deletions.
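For context, here is a minimal, self-contained sketch (not code from this commit, and not Neo4j's actual API) of the allocation-reuse pattern the change applies: keep one preallocated long[] per cursor, reset a write index in init(), and refill the same array for every property instead of allocating a fresh values array each time. The class name, array size and main() driver below are illustrative assumptions only.

// Illustrative sketch of per-cursor buffer reuse; names and sizes are hypothetical.
public class ReusableBlockExample
{
    // Hypothetical capacity; the real cursor sizes its array from the property store payload size.
    private static final int VALUES_SIZE = 4;

    private final long[] values = new long[VALUES_SIZE];
    private int writeIndex;

    // Reset state for the next property instead of allocating a new array.
    public void init()
    {
        writeIndex = 0;
        java.util.Arrays.fill( values, -1 );
    }

    // Append a value read from the underlying store; reuses the preallocated array.
    public void put( long value )
    {
        values[writeIndex++] = value;
    }

    // The first long of a property record acts as its header.
    public long header()
    {
        assert writeIndex > 0;
        return values[0];
    }

    public static void main( String[] args )
    {
        ReusableBlockExample block = new ReusableBlockExample();
        for ( int property = 0; property < 3; property++ )
        {
            block.init();            // reuse: no new long[] per property
            block.put( 42L + property );
            System.out.println( "header = " + block.header() );
        }
    }
}

The same idea shows up in the diff's Block helper below: values is allocated once per StorePropertyCursor, init() only resets state, and fetchLongs() refills the array under the page cursor's shouldRetry() loop, so reading many properties no longer churns short-lived long[] instances.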
@@ -44,6 +44,7 @@
import org.neo4j.kernel.impl.store.record.Record;
import org.neo4j.kernel.impl.util.Bits;

import static org.neo4j.kernel.impl.store.PropertyStore.DEFAULT_PAYLOAD_SIZE;
import static org.neo4j.kernel.impl.store.PropertyType.ARRAY;
import static org.neo4j.kernel.impl.store.PropertyType.BOOL;
import static org.neo4j.kernel.impl.store.PropertyType.BYTE;
@@ -67,6 +68,8 @@ public class StorePropertyCursor implements Cursor<PropertyItem>, PropertyItem
private static final int INTERNAL_BYTE_ARRAY_SIZE = 4096;

private final ByteBuffer cachedBuffer = ByteBuffer.allocate( INTERNAL_BYTE_ARRAY_SIZE );
private final Block block = new Block();

private final PropertyStore propertyStore;
private final Consumer<StorePropertyCursor> instanceCache;
private final DynamicStringStore stringStore;
@@ -76,16 +79,12 @@ public class StorePropertyCursor implements Cursor<PropertyItem>, PropertyItem
private PageCursor cursor;

private int offsetAtBeginning;
private int blocks;
private int remainingBlocksToRead;
private long header;
private PropertyType type;
private int keyId;
private ByteBuffer buffer;

private AbstractDynamicStore.DynamicRecordCursor recordCursor;
private long currentRecordId;
private long[] valuesAfterHeader;
private boolean seekingForFirstBlock;

public StorePropertyCursor( PropertyStore propertyStore, Consumer<StorePropertyCursor> instanceCache )
@@ -105,9 +104,8 @@ public StorePropertyCursor init( long firstPropertyId )

buffer = cachedBuffer;
nextPropertyRecordId = firstPropertyId;
blocks = 0;
remainingBlocksToRead = 0;
seekingForFirstBlock = false;
block.init();

return this;
}
@@ -151,44 +149,14 @@ private void readPropertyData() throws IOException
break;
case SHORT_STRING:
case SHORT_ARRAY:
ensureValuesLoaded();
block.ensureLoadedData( cursor );
break;

default:
throw new IllegalStateException();
}
}

private void ensureValuesLoaded()
{
if ( remainingBlocksToRead > 0 )
{
if ( valuesAfterHeader == null || valuesAfterHeader.length < remainingBlocksToRead )
{
valuesAfterHeader = new long[remainingBlocksToRead];
}

int offset = cursor.getOffset();
try
{
do
{
cursor.setOffset( offset );
for ( int i = 0; i < blocks; i++ )
{
valuesAfterHeader[i] = cursor.getLong();
}
}
while ( cursor.shouldRetry() );
remainingBlocksToRead -= blocks;
}
catch ( IOException e )
{
throw new UnderlyingStorageException( e );
}
}
}

private void readFromStore( AbstractDynamicStore store ) throws IOException
{
int storeOffset = cursor.getOffset();
@@ -201,7 +169,7 @@ private void readFromStore( AbstractDynamicStore store ) throws IOException
}

buffer.clear();
long startBlockId = PropertyBlock.fetchLong( header );
long startBlockId = PropertyBlock.fetchLong( block.header() );
try ( GenericCursor<DynamicRecord> records = store.getRecordsCursor( startBlockId, true, recordCursor ) )
{
while ( records.next() )
@@ -231,7 +199,7 @@ private int newCapacity( int required )
{
newCapacity = buffer.capacity() * 2;
}
        while ( newCapacity - buffer.limit() < required );
return newCapacity;
}

@@ -246,7 +214,6 @@ public void close()
}

type = null;
valuesAfterHeader = null;
recordCursor = null;
buffer = null;

@@ -287,35 +254,18 @@ private void nextRecord()
private boolean nextBlock()
{
// Skip remaining data from previous property (if it was not read)
if (remainingBlocksToRead > 0)
{
cursor.setOffset( cursor.getOffset() + remainingBlocksToRead * 8 );
remainingBlocksToRead = 0;
}
block.skipUnreadData( cursor );
block.init();

int offset = cursor.getOffset();
if ( offset - offsetAtBeginning < PropertyStore.RECORD_SIZE )
if ( cursor.getOffset() - offsetAtBeginning < PropertyStore.RECORD_SIZE )
{
try
{
do
{
cursor.setOffset( offset );
header = cursor.getLong();
}
while ( cursor.shouldRetry() );
}
catch ( IOException e )
{
throw new UnderlyingStorageException( e );
}

type = PropertyType.getPropertyType( header, true );
block.fetchHeader( cursor );
type = PropertyType.getPropertyType( block.header(), true );
if ( type != null )
{
seekingForFirstBlock = false;
keyId = (int) (header & KEY_BITMASK);
blocks = remainingBlocksToRead = type.calculateNumberOfBlocksUsed( header ) - 1;
keyId = (int) (block.header() & KEY_BITMASK);
block.remaining( type.calculateNumberOfBlocksUsed( block.header() ) - 1 );
return true;
}
}
@@ -389,6 +339,7 @@ private Object getRightArray()
}
}

@Override
public int propertyKeyId()
{
return keyId;
@@ -430,65 +381,65 @@ private boolean parseBooleanValue()
{
assertReadingStatus();
assertOfType( BOOL );
return PropertyBlock.fetchByte( header ) == 1;
return PropertyBlock.fetchByte( block.header() ) == 1;
}

private byte parseByteValue()
{
assertReadingStatus();
assertOfType( BYTE );
return PropertyBlock.fetchByte( header );
return PropertyBlock.fetchByte( block.header() );
}

private short parseShortValue()
{
assertReadingStatus();
assertOfType( SHORT );
return PropertyBlock.fetchShort( header );
return PropertyBlock.fetchShort( block.header() );
}

private int parseIntValue()
{
assertReadingStatus();
assertOfType( INT );
return PropertyBlock.fetchInt( header );
return PropertyBlock.fetchInt( block.header() );
}

private long parseLongValue()
{
assertReadingStatus();
assertOfType( LONG );
if ( PropertyBlock.valueIsInlined( header ) )
if ( PropertyBlock.valueIsInlined( block.header() ) )
{
return PropertyBlock.fetchLong( header ) >>> 1;
return PropertyBlock.fetchLong( block.header() ) >>> 1;
}
else
{
ensureValuesLoaded();
return valuesAfterHeader[0];
block.ensureLoadedData( cursor );
return block.peekSingleValue();
}
}

private float parseFloatValue()
{
assertReadingStatus();
assertOfType( FLOAT );
return Float.intBitsToFloat( PropertyBlock.fetchInt( header ) );
return Float.intBitsToFloat( PropertyBlock.fetchInt( block.header() ) );
}

private double parseDoubleValue()
{
assertReadingStatus();
assertOfType( DOUBLE );
ensureValuesLoaded();
return Double.longBitsToDouble( valuesAfterHeader[0] );
block.ensureLoadedData( cursor );
return Double.longBitsToDouble( block.peekSingleValue() );
}

private char parseCharValue()
{
assertReadingStatus();
assertOfType( CHAR );
return (char) PropertyBlock.fetchShort( header );
return (char) PropertyBlock.fetchShort( block.header() );
}

private String parseStringValue()
@@ -505,7 +456,7 @@ private String parseStringValue()
}
if ( type == SHORT_STRING )
{
return LongerShortString.decode( getBitsFromLongs() );
return LongerShortString.decode( block.toBits() );
}
else // STRING
{
@@ -528,28 +479,14 @@ private Object parseArrayValue()
}
if ( type == SHORT_ARRAY )
{
return ShortArray.decode( getBitsFromLongs() );
return ShortArray.decode( block.toBits() );
}
else
{
return getRightArray();
}
}

private Bits getBitsFromLongs()
{
Bits bits = Bits.bits( BITS_BYTE_SIZE );
bits.put( header );
if ( valuesAfterHeader != null )
{
for ( int i = 0; i < blocks; i++ )
{
bits.put( valuesAfterHeader[i] );
}
}
return bits;
}

private void assertReadingStatus()
{
if ( type == null )
@@ -573,4 +510,104 @@ private void assertOfOneOfTypes( PropertyType type1, PropertyType type2 )
throw new IllegalStateException( "Expected type " + type1 + " or " + type2 + " but was " + this.type );
}
}

private static class Block
{
public static final int VALUES_SIZE = DEFAULT_PAYLOAD_SIZE / 8;

private int writeIndex;
private long[] values = new long[VALUES_SIZE];
private int remaining;

public void init()
{
writeIndex = 0;
remaining = 0;
for ( int i = 0; i < VALUES_SIZE; i++ )
{
values[i] = -1;
}
}

public long header()
{
assert writeIndex > 0;
return values[0];
}

public long peekSingleValue()
{
assert writeIndex > 1;
return values[1];
}

public Bits toBits()
{
Bits bits = Bits.bits( BITS_BYTE_SIZE );
for ( int i = 0; i < writeIndex; i++ )
{
bits.put( values[i] );
}
return bits;
}

public void fetchHeader( PageCursor cursor )
{
try
{
fetchLongs( cursor, 1 );
}
catch ( IOException e )
{
throw new UnderlyingStorageException( e );
}
}

public void remaining( int remaining )
{
this.remaining = remaining;
}

public void ensureLoadedData( PageCursor cursor )
{
            if ( remaining <= 0 )
{
return;
}

try
{
fetchLongs( cursor, remaining );
remaining = 0;
}
catch ( IOException e )
{
throw new UnderlyingStorageException( e );
}
}

private void fetchLongs( PageCursor cursor, int num ) throws IOException
{
int offset = cursor.getOffset();
do
{
cursor.setOffset( offset );
for ( int i = 0; i < num; i++ )
{
                    values[writeIndex + i] = cursor.getLong();
}
}
while ( cursor.shouldRetry() );
writeIndex += num;
}

public void skipUnreadData( PageCursor cursor )
{
if ( remaining > 0 )
{
cursor.setOffset( cursor.getOffset() + remaining * 8 );
remaining = 0;
}
}
}
}
