Skip to content

Commit

Permalink
Add methods in ReadOperations to create parallel scans
Browse files Browse the repository at this point in the history
This is the first steps to expose parallel scans up the stack and
eventually into the Cypher runtime.
  • Loading branch information
davidegrohmann committed May 8, 2017
1 parent dc7454c commit 507f311
Show file tree
Hide file tree
Showing 19 changed files with 403 additions and 46 deletions.
Expand Up @@ -48,9 +48,9 @@
import org.neo4j.kernel.api.schema.SchemaDescriptor; import org.neo4j.kernel.api.schema.SchemaDescriptor;
import org.neo4j.kernel.api.schema.constaints.ConstraintDescriptor; import org.neo4j.kernel.api.schema.constaints.ConstraintDescriptor;
import org.neo4j.kernel.api.schema.index.IndexDescriptor; import org.neo4j.kernel.api.schema.index.IndexDescriptor;
import org.neo4j.kernel.impl.api.RelationshipVisitor;
import org.neo4j.kernel.impl.api.store.RelationshipIterator; import org.neo4j.kernel.impl.api.store.RelationshipIterator;
import org.neo4j.register.Register.DoubleLongRegister; import org.neo4j.register.Register.DoubleLongRegister;
import org.neo4j.storageengine.api.BatchingLongProgression;
import org.neo4j.storageengine.api.NodeItem; import org.neo4j.storageengine.api.NodeItem;
import org.neo4j.storageengine.api.PropertyItem; import org.neo4j.storageengine.api.PropertyItem;
import org.neo4j.storageengine.api.RelationshipItem; import org.neo4j.storageengine.api.RelationshipItem;
Expand Down Expand Up @@ -212,6 +212,10 @@ long nodesCountIndexed( IndexDescriptor index, long nodeId, Object value )


Cursor<NodeItem> nodeCursorById( long nodeId ) throws EntityNotFoundException; Cursor<NodeItem> nodeCursorById( long nodeId ) throws EntityNotFoundException;


Cursor<NodeItem> nodeGeCursor( BatchingLongProgression progression );

BatchingLongProgression parallelNodeScan();

Cursor<RelationshipItem> relationshipCursorById( long relId ) throws EntityNotFoundException; Cursor<RelationshipItem> relationshipCursorById( long relId ) throws EntityNotFoundException;


Cursor<PropertyItem> nodeGetProperties( NodeItem node ); Cursor<PropertyItem> nodeGetProperties( NodeItem node );
Expand Down
Expand Up @@ -65,7 +65,7 @@ public void visitCreatedNode( long id )
public void visitDeletedNode( long id ) public void visitDeletedNode( long id )
{ {
counts.incrementNodeCount( ANY_LABEL, -1 ); counts.incrementNodeCount( ANY_LABEL, -1 );
storeLayer.nodeCursor( statement, id, ReadableTransactionState.EMPTY ) storeLayer.nodeGetSingleCursor( statement, id, ReadableTransactionState.EMPTY )
.forAll( this::decrementCountForLabelsAndRelationships ); .forAll( this::decrementCountForLabelsAndRelationships );
super.visitDeletedNode( id ); super.visitDeletedNode( id );
} }
Expand Down Expand Up @@ -125,7 +125,7 @@ public void visitNodeLabelChanges( long id, final Set<Integer> added, final Set<
} }
// get the relationship counts from *before* this transaction, // get the relationship counts from *before* this transaction,
// the relationship changes will compensate for what happens during the transaction // the relationship changes will compensate for what happens during the transaction
storeLayer.nodeCursor( statement, id, ReadableTransactionState.EMPTY ) storeLayer.nodeGetSingleCursor( statement, id, ReadableTransactionState.EMPTY )
.forAll( node -> storeLayer.degrees( statement, node, ( type, out, in ) -> .forAll( node -> storeLayer.degrees( statement, node, ( type, out, in ) ->
{ {
added.forEach( label -> updateRelationshipsCountsFromDegrees( type, label, out, in ) ); added.forEach( label -> updateRelationshipsCountsFromDegrees( type, label, out, in ) );
Expand Down Expand Up @@ -161,6 +161,6 @@ private void updateRelationshipCount( long startNode, int type, long endNode, in


private void visitLabels( long nodeId, PrimitiveIntVisitor<RuntimeException> visitor ) private void visitLabels( long nodeId, PrimitiveIntVisitor<RuntimeException> visitor )
{ {
storeLayer.nodeCursor( statement, nodeId, txState ).forAll( node -> node.labels().visitKeys( visitor ) ); storeLayer.nodeGetSingleCursor( statement, nodeId, txState ).forAll( node -> node.labels().visitKeys( visitor ) );
} }
} }
Expand Up @@ -72,6 +72,7 @@
import org.neo4j.kernel.impl.constraints.ConstraintSemantics; import org.neo4j.kernel.impl.constraints.ConstraintSemantics;
import org.neo4j.kernel.impl.locking.LockTracer; import org.neo4j.kernel.impl.locking.LockTracer;
import org.neo4j.kernel.impl.locking.Locks; import org.neo4j.kernel.impl.locking.Locks;
import org.neo4j.storageengine.api.BatchingLongProgression;
import org.neo4j.storageengine.api.Direction; import org.neo4j.storageengine.api.Direction;
import org.neo4j.storageengine.api.NodeItem; import org.neo4j.storageengine.api.NodeItem;
import org.neo4j.storageengine.api.PropertyItem; import org.neo4j.storageengine.api.PropertyItem;
Expand Down Expand Up @@ -465,6 +466,18 @@ public PrimitiveLongIterator relationshipsGetAll( KernelStatement state )
return entityReadOperations.relationshipsGetAll( state ); return entityReadOperations.relationshipsGetAll( state );
} }


@Override
public BatchingLongProgression parallelNodeScanProgression( KernelStatement statement )
{
return entityReadOperations.parallelNodeScanProgression( statement );
}

@Override
public Cursor<NodeItem> nodeGetCursor( KernelStatement statement, BatchingLongProgression progression )
{
return entityReadOperations.nodeGetCursor( statement, progression );
}

@Override @Override
public Cursor<NodeItem> nodeGetAllCursor( KernelStatement statement ) public Cursor<NodeItem> nodeGetAllCursor( KernelStatement statement )
{ {
Expand Down
Expand Up @@ -39,6 +39,7 @@
import org.neo4j.kernel.guard.Guard; import org.neo4j.kernel.guard.Guard;
import org.neo4j.kernel.impl.api.operations.EntityReadOperations; import org.neo4j.kernel.impl.api.operations.EntityReadOperations;
import org.neo4j.kernel.impl.api.operations.EntityWriteOperations; import org.neo4j.kernel.impl.api.operations.EntityWriteOperations;
import org.neo4j.storageengine.api.BatchingLongProgression;
import org.neo4j.storageengine.api.Direction; import org.neo4j.storageengine.api.Direction;
import org.neo4j.storageengine.api.NodeItem; import org.neo4j.storageengine.api.NodeItem;
import org.neo4j.storageengine.api.PropertyItem; import org.neo4j.storageengine.api.PropertyItem;
Expand Down Expand Up @@ -234,6 +235,20 @@ public PrimitiveLongIterator relationshipsGetAll( KernelStatement statement )
return entityReadDelegate.relationshipsGetAll( statement ); return entityReadDelegate.relationshipsGetAll( statement );
} }


@Override
public BatchingLongProgression parallelNodeScanProgression( KernelStatement statement )
{
guard.check( statement );
return entityReadDelegate.parallelNodeScanProgression( statement );
}

@Override
public Cursor<NodeItem> nodeGetCursor( KernelStatement statement, BatchingLongProgression progression )
{
guard.check( statement );
return entityReadDelegate.nodeGetCursor( statement, progression );
}

@Override @Override
public Cursor<NodeItem> nodeGetAllCursor( KernelStatement statement ) public Cursor<NodeItem> nodeGetAllCursor( KernelStatement statement )
{ {
Expand Down
Expand Up @@ -105,6 +105,7 @@
import org.neo4j.kernel.impl.proc.Procedures; import org.neo4j.kernel.impl.proc.Procedures;
import org.neo4j.kernel.impl.query.clientconnection.ClientConnectionInfo; import org.neo4j.kernel.impl.query.clientconnection.ClientConnectionInfo;
import org.neo4j.register.Register.DoubleLongRegister; import org.neo4j.register.Register.DoubleLongRegister;
import org.neo4j.storageengine.api.BatchingLongProgression;
import org.neo4j.storageengine.api.NodeItem; import org.neo4j.storageengine.api.NodeItem;
import org.neo4j.storageengine.api.PropertyItem; import org.neo4j.storageengine.api.PropertyItem;
import org.neo4j.storageengine.api.RelationshipItem; import org.neo4j.storageengine.api.RelationshipItem;
Expand Down Expand Up @@ -554,6 +555,20 @@ public Cursor<NodeItem> nodeCursorById( long nodeId ) throws EntityNotFoundExcep
return dataRead().nodeCursorById( statement, nodeId ); return dataRead().nodeCursorById( statement, nodeId );
} }


@Override
public Cursor<NodeItem> nodeGeCursor( BatchingLongProgression progression )
{
statement.assertOpen();
return dataRead().nodeGetCursor( statement, progression );
}

@Override
public BatchingLongProgression parallelNodeScan()
{
statement.assertOpen();
return dataRead().parallelNodeScanProgression( statement );
}

@Override @Override
public Cursor<RelationshipItem> relationshipCursorById( long relId ) throws EntityNotFoundException public Cursor<RelationshipItem> relationshipCursorById( long relId ) throws EntityNotFoundException
{ {
Expand Down
Expand Up @@ -89,6 +89,7 @@
import org.neo4j.kernel.impl.index.IndexEntityType; import org.neo4j.kernel.impl.index.IndexEntityType;
import org.neo4j.kernel.impl.index.LegacyIndexStore; import org.neo4j.kernel.impl.index.LegacyIndexStore;
import org.neo4j.register.Register.DoubleLongRegister; import org.neo4j.register.Register.DoubleLongRegister;
import org.neo4j.storageengine.api.BatchingLongProgression;
import org.neo4j.storageengine.api.Direction; import org.neo4j.storageengine.api.Direction;
import org.neo4j.storageengine.api.EntityType; import org.neo4j.storageengine.api.EntityType;
import org.neo4j.storageengine.api.NodeItem; import org.neo4j.storageengine.api.NodeItem;
Expand Down Expand Up @@ -140,6 +141,18 @@ public StateHandlingStatementOperations( StoreReadLayer storeLayer, AutoIndexing


// <Cursors> // <Cursors>


@Override
public BatchingLongProgression parallelNodeScanProgression( KernelStatement statement )
{
return storeLayer.parallelNodeScanProgression( statement.storageStatement() );
}

@Override
public Cursor<NodeItem> nodeGetCursor( KernelStatement statement, BatchingLongProgression progression )
{
return storeLayer.nodeGetCursor( statement.storageStatement(), progression, statement.readableTxState() );
}

@Override @Override
public Cursor<NodeItem> nodeGetAllCursor( KernelStatement statement ) public Cursor<NodeItem> nodeGetAllCursor( KernelStatement statement )
{ {
Expand All @@ -160,7 +173,7 @@ public Cursor<NodeItem> nodeCursorById( KernelStatement statement, long nodeId )


private Cursor<NodeItem> nodeCursor( KernelStatement statement, long nodeId ) private Cursor<NodeItem> nodeCursor( KernelStatement statement, long nodeId )
{ {
return storeLayer.nodeCursor( statement.storageStatement(), nodeId, statement.readableTxState() ); return storeLayer.nodeGetSingleCursor( statement.storageStatement(), nodeId, statement.readableTxState() );
} }


@Override @Override
Expand Down
Expand Up @@ -32,7 +32,7 @@
import org.neo4j.kernel.api.schema.IndexQuery; import org.neo4j.kernel.api.schema.IndexQuery;
import org.neo4j.kernel.api.schema.index.IndexDescriptor; import org.neo4j.kernel.api.schema.index.IndexDescriptor;
import org.neo4j.kernel.impl.api.KernelStatement; import org.neo4j.kernel.impl.api.KernelStatement;
import org.neo4j.kernel.impl.api.RelationshipVisitor; import org.neo4j.storageengine.api.BatchingLongProgression;
import org.neo4j.storageengine.api.Direction; import org.neo4j.storageengine.api.Direction;
import org.neo4j.storageengine.api.NodeItem; import org.neo4j.storageengine.api.NodeItem;
import org.neo4j.storageengine.api.PropertyItem; import org.neo4j.storageengine.api.PropertyItem;
Expand Down Expand Up @@ -83,6 +83,10 @@ long nodesCountIndexed( KernelStatement statement, IndexDescriptor index, long n


PrimitiveLongIterator relationshipsGetAll( KernelStatement state ); PrimitiveLongIterator relationshipsGetAll( KernelStatement state );


BatchingLongProgression parallelNodeScanProgression( KernelStatement statement );

Cursor<NodeItem> nodeGetCursor( KernelStatement statement, BatchingLongProgression progression );

Cursor<NodeItem> nodeGetAllCursor( KernelStatement statement ); Cursor<NodeItem> nodeGetAllCursor( KernelStatement statement );


Cursor<NodeItem> nodeCursorById( KernelStatement statement, long nodeId ) throws EntityNotFoundException; Cursor<NodeItem> nodeCursorById( KernelStatement statement, long nodeId ) throws EntityNotFoundException;
Expand Down
Expand Up @@ -61,9 +61,10 @@
import org.neo4j.kernel.impl.transaction.state.PropertyLoader; import org.neo4j.kernel.impl.transaction.state.PropertyLoader;
import org.neo4j.register.Register; import org.neo4j.register.Register;
import org.neo4j.register.Register.DoubleLongRegister; import org.neo4j.register.Register.DoubleLongRegister;
import org.neo4j.storageengine.api.BatchingLongProgression;
import org.neo4j.storageengine.api.BatchingProgressionFactory;
import org.neo4j.storageengine.api.Direction; import org.neo4j.storageengine.api.Direction;
import org.neo4j.storageengine.api.NodeItem; import org.neo4j.storageengine.api.NodeItem;
import org.neo4j.storageengine.api.BatchingProgressionFactory;
import org.neo4j.storageengine.api.PropertyItem; import org.neo4j.storageengine.api.PropertyItem;
import org.neo4j.storageengine.api.RelationshipItem; import org.neo4j.storageengine.api.RelationshipItem;
import org.neo4j.storageengine.api.StorageProperty; import org.neo4j.storageengine.api.StorageProperty;
Expand Down Expand Up @@ -398,14 +399,28 @@ public RelationshipIterator relationshipsGetAll()
return new AllRelationshipIterator( relationshipStore ); return new AllRelationshipIterator( relationshipStore );
} }


@Override
public BatchingLongProgression parallelNodeScanProgression( StorageStatement statement )
{
return progressionFactory.parallelAllNodeScan( nodeStore );
}

@Override
public Cursor<NodeItem> nodeGetCursor( StorageStatement statement, BatchingLongProgression progression,
NodeTransactionStateView stateView )
{
return statement.acquireNewNodeCursor( progression, stateView );
}

@Override @Override
public Cursor<NodeItem> nodeGetAllCursor( StorageStatement statement, NodeTransactionStateView stateView ) public Cursor<NodeItem> nodeGetAllCursor( StorageStatement statement, NodeTransactionStateView stateView )
{ {
return statement.acquireNodeCursor( progressionFactory.allNodeScan( nodeStore ), stateView ); return statement.acquireNodeCursor( progressionFactory.allNodeScan( nodeStore ), stateView );
} }


@Override @Override
public Cursor<NodeItem> nodeCursor( StorageStatement statement, long nodeId, NodeTransactionStateView stateView ) public Cursor<NodeItem> nodeGetSingleCursor( StorageStatement statement, long nodeId,
NodeTransactionStateView stateView )
{ {
return statement.acquireNodeCursor( progressionFactory.singleNodeFetch( nodeId ), stateView ); return statement.acquireNodeCursor( progressionFactory.singleNodeFetch( nodeId ), stateView );
} }
Expand Down
Expand Up @@ -51,7 +51,7 @@
*/ */
public class StoreStatement implements StorageStatement public class StoreStatement implements StorageStatement
{ {
protected final InstanceCache<NodeCursor> nodeCursor; private final InstanceCache<NodeCursor> nodeCursor;
private final InstanceCache<StoreSingleRelationshipCursor> singleRelationshipCursor; private final InstanceCache<StoreSingleRelationshipCursor> singleRelationshipCursor;
private final InstanceCache<StoreIteratorRelationshipCursor> iteratorRelationshipCursor; private final InstanceCache<StoreIteratorRelationshipCursor> iteratorRelationshipCursor;
private final InstanceCache<StoreNodeRelationshipCursor> nodeRelationshipsCursor; private final InstanceCache<StoreNodeRelationshipCursor> nodeRelationshipsCursor;
Expand All @@ -62,6 +62,7 @@ public class StoreStatement implements StorageStatement
private final NeoStores neoStores; private final NeoStores neoStores;
private final Supplier<IndexReaderFactory> indexReaderFactorySupplier; private final Supplier<IndexReaderFactory> indexReaderFactorySupplier;
private final Supplier<LabelScanReader> labelScanStore; private final Supplier<LabelScanReader> labelScanStore;
private final LockService lockService;


private IndexReaderFactory indexReaderFactory; private IndexReaderFactory indexReaderFactory;
private LabelScanReader labelScanReader; private LabelScanReader labelScanReader;
Expand All @@ -75,6 +76,7 @@ public StoreStatement( NeoStores neoStores, Supplier<IndexReaderFactory> indexRe
this.neoStores = neoStores; this.neoStores = neoStores;
this.indexReaderFactorySupplier = indexReaderFactory; this.indexReaderFactorySupplier = indexReaderFactory;
this.labelScanStore = labelScanReaderSupplier; this.labelScanStore = labelScanReaderSupplier;
this.lockService = lockService;


nodeCursor = new InstanceCache<NodeCursor>() nodeCursor = new InstanceCache<NodeCursor>()
{ {
Expand Down Expand Up @@ -152,6 +154,14 @@ public void acquire()
this.acquired = true; this.acquired = true;
} }


@Override
public Cursor<NodeItem> acquireNewNodeCursor( BatchingLongProgression progression,
NodeTransactionStateView stateView )
{
return new NodeCursor( neoStores.getNodeStore(), InstanceCache::noCache, lockService )
.init( progression, stateView );
}

@Override @Override
public Cursor<NodeItem> acquireNodeCursor( BatchingLongProgression progression, NodeTransactionStateView stateView ) public Cursor<NodeItem> acquireNodeCursor( BatchingLongProgression progression, NodeTransactionStateView stateView )
{ {
Expand Down
Expand Up @@ -21,7 +21,6 @@


import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map; import java.util.Map;


Expand All @@ -39,7 +38,6 @@
import org.neo4j.kernel.api.exceptions.LabelNotFoundKernelException; import org.neo4j.kernel.api.exceptions.LabelNotFoundKernelException;
import org.neo4j.kernel.api.exceptions.PropertyKeyIdNotFoundKernelException; import org.neo4j.kernel.api.exceptions.PropertyKeyIdNotFoundKernelException;
import org.neo4j.kernel.api.properties.DefinedProperty; import org.neo4j.kernel.api.properties.DefinedProperty;
import org.neo4j.kernel.impl.api.KernelTransactionImplementation;
import org.neo4j.kernel.impl.core.NodeProxy; import org.neo4j.kernel.impl.core.NodeProxy;
import org.neo4j.kernel.impl.core.RelationshipProxy; import org.neo4j.kernel.impl.core.RelationshipProxy;
import org.neo4j.kernel.impl.core.RelationshipProxy.RelationshipActions; import org.neo4j.kernel.impl.core.RelationshipProxy.RelationshipActions;
Expand Down Expand Up @@ -196,7 +194,7 @@ private void takeSnapshot()
{ {
for ( long nodeId : state.addedAndRemovedNodes().getRemoved() ) for ( long nodeId : state.addedAndRemovedNodes().getRemoved() )
{ {
try ( Cursor<NodeItem> node = store.nodeCursor( storeStatement, nodeId, EMPTY ) ) try ( Cursor<NodeItem> node = store.nodeGetSingleCursor( storeStatement, nodeId, EMPTY ) )
{ {
if ( node.next() ) if ( node.next() )
{ {
Expand Down Expand Up @@ -357,7 +355,7 @@ private Object committedValue( NodeState nodeState, int property )
return null; return null;
} }


try ( Cursor<NodeItem> node = store.nodeCursor( storeStatement, nodeState.getId(), EMPTY ) ) try ( Cursor<NodeItem> node = store.nodeGetSingleCursor( storeStatement, nodeState.getId(), EMPTY ) )
{ {
if ( !node.next() ) if ( !node.next() )
{ {
Expand Down
Expand Up @@ -76,4 +76,9 @@ public void close()
instance.dispose(); instance.dispose();
} }
} }

public static <T extends Disposable> void noCache( T instance )
{

}
} }
Expand Up @@ -66,6 +66,9 @@ public interface StorageStatement extends AutoCloseable
@Override @Override
void close(); void close();


// FIXME: this is a temporary workaround until we have a way to cache cursors thread safely in the transaction
Cursor<NodeItem> acquireNewNodeCursor( BatchingLongProgression progression, NodeTransactionStateView stateView );

/** /**
* Acquires {@link Cursor} capable of {@link Cursor#get() serving} {@link NodeItem} for selected nodes. * Acquires {@link Cursor} capable of {@link Cursor#get() serving} {@link NodeItem} for selected nodes.
* No node is selected when this method returns, a call to {@link Cursor#next()} will have to be made * No node is selected when this method returns, a call to {@link Cursor#next()} will have to be made
Expand Down
Expand Up @@ -266,9 +266,14 @@ IndexReader indexGetFreshReader( StorageStatement storeStatement, IndexDescripto
*/ */
RelationshipIterator relationshipsGetAll(); RelationshipIterator relationshipsGetAll();


BatchingLongProgression parallelNodeScanProgression( StorageStatement statement );

Cursor<NodeItem> nodeGetCursor( StorageStatement statement, BatchingLongProgression progression,
NodeTransactionStateView stateView );

Cursor<NodeItem> nodeGetAllCursor( StorageStatement storeStatement, NodeTransactionStateView stateView ); Cursor<NodeItem> nodeGetAllCursor( StorageStatement storeStatement, NodeTransactionStateView stateView );


Cursor<NodeItem> nodeCursor( StorageStatement storeStatement, long nodeId, NodeTransactionStateView stateView ); Cursor<NodeItem> nodeGetSingleCursor( StorageStatement storeStatement, long nodeId, NodeTransactionStateView stateView );


Cursor<RelationshipItem> relationshipCursor( StorageStatement storeStatement, long relationshipId, Cursor<RelationshipItem> relationshipCursor( StorageStatement storeStatement, long relationshipId,
ReadableTransactionState state ); ReadableTransactionState state );
Expand Down Expand Up @@ -393,5 +398,4 @@ int countDegrees( StorageStatement statement, NodeItem node, Direction direction
ReadableTransactionState state ); ReadableTransactionState state );


<T> T getOrCreateSchemaDependantState( Class<T> type, Function<StoreReadLayer, T> factory ); <T> T getOrCreateSchemaDependantState( Class<T> type, Function<StoreReadLayer, T> factory );

} }

0 comments on commit 507f311

Please sign in to comment.