Skip to content

Commit

Permalink
Merge pull request #6 from davidegrohmann/3.3-page-cursors-in-propert…
Browse files Browse the repository at this point in the history
…y-cursors

Use PageCursors directly in PropertyCursors
  • Loading branch information
ragadeeshu authored and davidegrohmann committed May 8, 2017
2 parents 48c374c + 32d5a00 commit 4b462d0
Show file tree
Hide file tree
Showing 13 changed files with 189 additions and 149 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,10 @@ public class NodeCursor implements NodeItem, Cursor<NodeItem>, Disposable
private Iterator<Long> added;
private PageCursor pageCursor;

NodeCursor( NodeRecord nodeRecord, Consumer<NodeCursor> instanceCache, NodeStore nodeStore,
LockService lockService )
NodeCursor( NodeStore nodeStore, Consumer<NodeCursor> instanceCache, LockService lockService )
{
this.pageCursor = nodeStore.newPageCursor();
this.nodeRecord = nodeRecord;
this.nodeRecord = nodeStore.newRecord();
this.instanceCache = instanceCache;
this.nodeStore = nodeStore;
this.lockService = lockService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,46 +19,54 @@
*/
package org.neo4j.kernel.impl.api.store;

import java.io.IOException;
import java.util.function.IntPredicate;

import org.neo4j.cursor.Cursor;
import org.neo4j.function.Disposable;
import org.neo4j.io.pagecache.PageCursor;
import org.neo4j.kernel.api.properties.DefinedProperty;
import org.neo4j.kernel.impl.locking.Lock;
import org.neo4j.kernel.impl.store.RecordCursor;
import org.neo4j.kernel.impl.store.RecordCursors;
import org.neo4j.kernel.impl.store.PropertyStore;
import org.neo4j.kernel.impl.store.UnderlyingStorageException;
import org.neo4j.kernel.impl.store.record.PropertyRecord;
import org.neo4j.kernel.impl.store.record.Record;
import org.neo4j.storageengine.api.PropertyItem;
import org.neo4j.storageengine.api.txstate.PropertyContainerState;

import static org.neo4j.kernel.api.StatementConstants.NO_SUCH_PROPERTY;
import static org.neo4j.kernel.impl.store.record.RecordLoad.FORCE;

public abstract class StoreAbstractPropertyCursor implements PropertyItem, Cursor<PropertyItem>, Disposable
{
protected final StorePropertyPayloadCursor payload;
private final RecordCursor<PropertyRecord> recordCursor;
private final PageCursor cursor;
private final PropertyRecord record;
private final PropertyStore propertyStore;

protected boolean fetched;
private boolean doneTraversingTheChain;
private DefinedProperty property;
private IntPredicate propertyKeyIds;
private long nextPropertyId;
private Lock lock;
protected PropertyContainerState state;

StoreAbstractPropertyCursor( RecordCursors cursors )
StoreAbstractPropertyCursor( PropertyStore propertyStore )
{
this.payload = new StorePropertyPayloadCursor( cursors.propertyString(), cursors.propertyArray() );
this.recordCursor = cursors.property();
this.cursor = propertyStore.newPageCursor();
this.propertyStore = propertyStore;
this.record = propertyStore.newRecord();
this.payload = new StorePropertyPayloadCursor( propertyStore.getStringStore(), propertyStore.getArrayStore() );
}

protected final void initialize( IntPredicate propertyKeyIds, long firstPropertyId, Lock lock,
PropertyContainerState state )
{
this.propertyKeyIds = propertyKeyIds;
this.nextPropertyId = firstPropertyId;
this.lock = lock;
this.state = state;
recordCursor.placeAt( firstPropertyId, FORCE );
}

@Override
Expand All @@ -79,16 +87,17 @@ private boolean fetchNext()
}

// No, OK continue down the chain and hunt for more...
if ( recordCursor.next() )
PropertyRecord propertyRecord = readRecord();
nextPropertyId = propertyRecord.getNextProp();
if ( propertyRecord.inUse() )
{
PropertyRecord propertyRecord = recordCursor.get();
payload.init( propertyKeyIds, propertyRecord.getBlocks(), propertyRecord.getNumberOfBlocks() );
if ( payloadHasNext() )
{
return true;
}
}
else if ( Record.NO_NEXT_PROPERTY.is( recordCursor.get().getNextProp() ) )
else if ( Record.NO_NEXT_PROPERTY.is( nextPropertyId ) )
{
// No more records in this chain, i.e. no more properties.
doneTraversingTheChain = true;
Expand All @@ -102,6 +111,23 @@ else if ( Record.NO_NEXT_PROPERTY.is( recordCursor.get().getNextProp() ) )
return (property = nextAdded()) != null;
}

private PropertyRecord readRecord()
{
try
{
record.clear();
if ( !Record.NO_NEXT_PROPERTY.is( nextPropertyId ) )
{
propertyStore.readIntoRecord( nextPropertyId, record, FORCE, cursor );
}
return record;
}
catch ( IOException e )
{
throw new UnderlyingStorageException( e );
}
}

private boolean payloadHasNext()
{
boolean next = payload.next();
Expand Down Expand Up @@ -156,6 +182,7 @@ public final void close()
payload.close();
propertyKeyIds = null;
property = null;
nextPropertyId = NO_SUCH_PROPERTY;
doClose();
}
finally
Expand All @@ -170,5 +197,7 @@ public final void close()
@Override
public void dispose()
{
cursor.close();
payload.dispose();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import org.neo4j.kernel.api.properties.DefinedProperty;
import org.neo4j.kernel.impl.locking.Lock;
import org.neo4j.kernel.impl.store.PropertyStore;
import org.neo4j.kernel.impl.store.RecordCursors;
import org.neo4j.storageengine.api.StorageProperty;
import org.neo4j.storageengine.api.txstate.PropertyContainerState;
Expand All @@ -39,9 +40,9 @@ public class StorePropertyCursor extends StoreAbstractPropertyCursor

private Iterator<StorageProperty> storagePropertyIterator;

public StorePropertyCursor( RecordCursors cursors, Consumer<StorePropertyCursor> instanceCache )
public StorePropertyCursor( PropertyStore propertyStore, Consumer<StorePropertyCursor> instanceCache )
{
super( cursors );
super( propertyStore );
this.instanceCache = instanceCache;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,20 @@
*/
package org.neo4j.kernel.impl.api.store;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.function.IntPredicate;

import org.neo4j.function.Disposable;
import org.neo4j.io.pagecache.PageCursor;
import org.neo4j.kernel.impl.store.CommonAbstractStore;
import org.neo4j.kernel.impl.store.DynamicArrayStore;
import org.neo4j.kernel.impl.store.DynamicStringStore;
import org.neo4j.kernel.impl.store.LongerShortString;
import org.neo4j.kernel.impl.store.PropertyType;
import org.neo4j.kernel.impl.store.RecordCursor;
import org.neo4j.kernel.impl.store.ShortArray;
import org.neo4j.kernel.impl.store.UnderlyingStorageException;
import org.neo4j.kernel.impl.store.record.DynamicRecord;
import org.neo4j.kernel.impl.store.record.PropertyBlock;
import org.neo4j.kernel.impl.store.record.Record;
Expand All @@ -44,7 +50,7 @@
* During initialization the raw property block {@code long}s are read from
* the given property record.
*/
class StorePropertyPayloadCursor
class StorePropertyPayloadCursor implements Disposable
{
private static final int MAX_BYTES_IN_SHORT_STRING_OR_SHORT_ARRAY = 32;
private static final int INTERNAL_BYTE_ARRAY_SIZE = 4096;
Expand All @@ -55,8 +61,11 @@ class StorePropertyPayloadCursor
*/
private final ByteBuffer cachedBuffer = ByteBuffer.allocate( INTERNAL_BYTE_ARRAY_SIZE );

private final RecordCursor<DynamicRecord> stringRecordCursor;
private final RecordCursor<DynamicRecord> arrayRecordCursor;
private final DynamicRecord record;
private final PageCursor stringCursor;
private final DynamicStringStore stringStore;
private final PageCursor arrayCursor;
private final DynamicArrayStore arrayStore;
private ByteBuffer buffer = cachedBuffer;

private long[] blocks;
Expand All @@ -65,11 +74,13 @@ class StorePropertyPayloadCursor
private IntPredicate propertyKeyIds;
private boolean exhausted;

StorePropertyPayloadCursor( RecordCursor<DynamicRecord> stringRecordCursor,
RecordCursor<DynamicRecord> arrayRecordCursor )
StorePropertyPayloadCursor( DynamicStringStore stringStore, DynamicArrayStore arrayStore )
{
this.stringRecordCursor = stringRecordCursor;
this.arrayRecordCursor = arrayRecordCursor;
this.record = stringStore.newRecord();
this.stringStore = stringStore;
this.stringCursor = stringStore.newPageCursor();
this.arrayStore = arrayStore;
this.arrayCursor = arrayStore.newPageCursor();
}

void init( IntPredicate propertyKeyIds, long[] blocks, int numberOfBlocks )
Expand Down Expand Up @@ -163,15 +174,15 @@ Object value()
return LongerShortString.decode( blocks, position, currentBlocksUsed() );
case STRING:
{
readFromStore( stringRecordCursor );
readFromStore( stringStore, stringCursor );
buffer.flip();
return UTF8.decode( buffer.array(), 0, buffer.limit() );
}
case SHORT_ARRAY:
return ShortArray.decode( valueAsBits() );
case ARRAY:
{
readFromStore( arrayRecordCursor );
readFromStore( arrayStore, arrayCursor );
buffer.flip();
return readArrayFromBuffer( buffer );
}
Expand Down Expand Up @@ -201,16 +212,14 @@ private Bits valueAsBits()
return bits;
}

private void readFromStore( RecordCursor<DynamicRecord> cursor )
private void readFromStore( CommonAbstractStore<DynamicRecord,?> store, PageCursor cursor )
{
buffer.clear();
long startBlockId = PropertyBlock.fetchLong( currentHeader() );
cursor.placeAt( startBlockId, FORCE );
while ( true )
long blockId = PropertyBlock.fetchLong( currentHeader() );
do
{
cursor.next();
DynamicRecord dynamicRecord = cursor.get();
byte[] data = dynamicRecord.getData();
readRecord( store, cursor, blockId );
byte[] data = record.getData();
if ( buffer.remaining() < data.length )
{
buffer.flip();
Expand All @@ -219,10 +228,21 @@ private void readFromStore( RecordCursor<DynamicRecord> cursor )
buffer = newBuffer;
}
buffer.put( data, 0, data.length );
if ( Record.NULL_REFERENCE.is( dynamicRecord.getNextBlock() ) )
{
break;
}
blockId = record.getNextBlock();
}
while ( !Record.NULL_REFERENCE.is( blockId ) );
}

private void readRecord( CommonAbstractStore<DynamicRecord,?> store, PageCursor cursor, long blockId )
{
try
{
record.clear();
store.readIntoRecord( blockId, record, FORCE, cursor );
}
catch ( IOException e )
{
throw new UnderlyingStorageException( e );
}
}

Expand Down Expand Up @@ -292,4 +312,11 @@ private static Object readArrayFromBuffer( ByteBuffer buffer )
buffer.order( ByteOrder.LITTLE_ENDIAN );
}
}

@Override
public void dispose()
{
stringCursor.close();
arrayCursor.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import org.neo4j.kernel.api.properties.DefinedProperty;
import org.neo4j.kernel.impl.locking.Lock;
import org.neo4j.kernel.impl.store.PropertyStore;
import org.neo4j.kernel.impl.store.RecordCursors;
import org.neo4j.storageengine.api.txstate.PropertyContainerState;

Expand All @@ -31,9 +32,9 @@ public class StoreSinglePropertyCursor extends StoreAbstractPropertyCursor
private final Consumer<StoreSinglePropertyCursor> instanceCache;
private int propertyKeyId;

StoreSinglePropertyCursor( RecordCursors cursors, Consumer<StoreSinglePropertyCursor> instanceCache )
StoreSinglePropertyCursor( PropertyStore propertyStore, Consumer<StoreSinglePropertyCursor> instanceCache )
{
super( cursors );
super( propertyStore );
this.instanceCache = instanceCache;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public StoreStatement( NeoStores neoStores, Supplier<IndexReaderFactory> indexRe
@Override
protected NodeCursor create()
{
return new NodeCursor( nodeStore.newRecord(), this, nodeStore, lockService );
return new NodeCursor( nodeStore, this, lockService );
}
};
singleRelationshipCursor = new InstanceCache<StoreSingleRelationshipCursor>()
Expand Down Expand Up @@ -124,15 +124,15 @@ protected StoreNodeRelationshipCursor create()
@Override
protected StorePropertyCursor create()
{
return new StorePropertyCursor( recordCursors, this );
return new StorePropertyCursor( neoStores.getPropertyStore(), this );
}
};
singlePropertyCursorCache = new InstanceCache<StoreSinglePropertyCursor>()
{
@Override
protected StoreSinglePropertyCursor create()
{
return new StoreSinglePropertyCursor( recordCursors, this );
return new StoreSinglePropertyCursor( neoStores.getPropertyStore(), this );
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import org.neo4j.io.IOUtils;
import org.neo4j.kernel.impl.store.record.AbstractBaseRecord;
import org.neo4j.kernel.impl.store.record.DynamicRecord;
import org.neo4j.kernel.impl.store.record.PropertyRecord;
import org.neo4j.kernel.impl.store.record.RelationshipGroupRecord;
import org.neo4j.kernel.impl.store.record.RelationshipRecord;
Expand All @@ -36,16 +35,12 @@ public class RecordCursors implements AutoCloseable
private final RecordCursor<RelationshipRecord> relationship;
private final RecordCursor<RelationshipGroupRecord> relationshipGroup;
private final RecordCursor<PropertyRecord> property;
private final RecordCursor<DynamicRecord> propertyString;
private final RecordCursor<DynamicRecord> propertyArray;

public RecordCursors( NeoStores neoStores )
{
relationship = newCursor( neoStores.getRelationshipStore() );
relationshipGroup = newCursor( neoStores.getRelationshipGroupStore() );
property = newCursor( neoStores.getPropertyStore() );
propertyString = newCursor( neoStores.getPropertyStore().getStringStore() );
propertyArray = newCursor( neoStores.getPropertyStore().getArrayStore() );
}

private static <R extends AbstractBaseRecord> RecordCursor<R> newCursor( RecordStore<R> store )
Expand All @@ -56,8 +51,7 @@ private static <R extends AbstractBaseRecord> RecordCursor<R> newCursor( RecordS
@Override
public void close()
{
IOUtils.closeAll( RuntimeException.class, relationship, relationshipGroup, property, propertyArray,
propertyString );
IOUtils.closeAll( RuntimeException.class, relationship, relationshipGroup, property );
}

public RecordCursor<RelationshipRecord> relationship()
Expand All @@ -74,14 +68,4 @@ public RecordCursor<PropertyRecord> property()
{
return property;
}

public RecordCursor<DynamicRecord> propertyArray()
{
return propertyArray;
}

public RecordCursor<DynamicRecord> propertyString()
{
return propertyString;
}
}

0 comments on commit 4b462d0

Please sign in to comment.