From 507f311c19019d83b38c6a3eaf1a8bb81f7c7bb5 Mon Sep 17 00:00:00 2001 From: Davide Grohmann Date: Mon, 3 Apr 2017 14:49:16 +0200 Subject: [PATCH] Add methods in ReadOperations to create parallel scans This is the first steps to expose parallel scans up the stack and eventually into the Cypher runtime. --- .../org/neo4j/kernel/api/ReadOperations.java | 6 +- .../TransactionCountingStateVisitor.java | 6 +- .../ConstraintEnforcingEntityOperations.java | 13 + .../impl/api/GuardingStatementOperations.java | 15 + .../kernel/impl/api/OperationsFacade.java | 15 + .../api/StateHandlingStatementOperations.java | 15 +- .../api/operations/EntityReadOperations.java | 6 +- .../kernel/impl/api/store/StorageLayer.java | 19 +- .../kernel/impl/api/store/StoreStatement.java | 12 +- .../TxStateTransactionDataSnapshot.java | 6 +- .../neo4j/kernel/impl/util/InstanceCache.java | 5 + .../storageengine/api/StorageStatement.java | 3 + .../storageengine/api/StoreReadLayer.java | 8 +- .../state/IndexQueryTransactionStateTest.java | 20 +- .../api/state/LabelTransactionStateTest.java | 10 +- .../StateHandlingStatementOperationsTest.java | 14 +- .../StateOperationsAutoIndexingTest.java | 8 +- .../TxStateTransactionDataViewTest.java | 10 +- ...NodeScanReadOperationsIntegrationTest.java | 258 ++++++++++++++++++ 19 files changed, 403 insertions(+), 46 deletions(-) create mode 100644 enterprise/kernel/src/test/java/org/neo4j/kernel/impl/api/integrationtest/ParallelNodeScanReadOperationsIntegrationTest.java diff --git a/community/kernel/src/main/java/org/neo4j/kernel/api/ReadOperations.java b/community/kernel/src/main/java/org/neo4j/kernel/api/ReadOperations.java index 9f1e0970e7412..d86608e12f8b2 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/api/ReadOperations.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/api/ReadOperations.java @@ -48,9 +48,9 @@ import org.neo4j.kernel.api.schema.SchemaDescriptor; import org.neo4j.kernel.api.schema.constaints.ConstraintDescriptor; import org.neo4j.kernel.api.schema.index.IndexDescriptor; -import org.neo4j.kernel.impl.api.RelationshipVisitor; import org.neo4j.kernel.impl.api.store.RelationshipIterator; import org.neo4j.register.Register.DoubleLongRegister; +import org.neo4j.storageengine.api.BatchingLongProgression; import org.neo4j.storageengine.api.NodeItem; import org.neo4j.storageengine.api.PropertyItem; import org.neo4j.storageengine.api.RelationshipItem; @@ -212,6 +212,10 @@ long nodesCountIndexed( IndexDescriptor index, long nodeId, Object value ) Cursor nodeCursorById( long nodeId ) throws EntityNotFoundException; + Cursor nodeGeCursor( BatchingLongProgression progression ); + + BatchingLongProgression parallelNodeScan(); + Cursor relationshipCursorById( long relId ) throws EntityNotFoundException; Cursor nodeGetProperties( NodeItem node ); diff --git a/community/kernel/src/main/java/org/neo4j/kernel/api/txstate/TransactionCountingStateVisitor.java b/community/kernel/src/main/java/org/neo4j/kernel/api/txstate/TransactionCountingStateVisitor.java index f2c8be8245f18..92d926ad53e07 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/api/txstate/TransactionCountingStateVisitor.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/api/txstate/TransactionCountingStateVisitor.java @@ -65,7 +65,7 @@ public void visitCreatedNode( long id ) public void visitDeletedNode( long id ) { counts.incrementNodeCount( ANY_LABEL, -1 ); - storeLayer.nodeCursor( statement, id, ReadableTransactionState.EMPTY ) + storeLayer.nodeGetSingleCursor( statement, id, ReadableTransactionState.EMPTY ) .forAll( this::decrementCountForLabelsAndRelationships ); super.visitDeletedNode( id ); } @@ -125,7 +125,7 @@ public void visitNodeLabelChanges( long id, final Set added, final Set< } // get the relationship counts from *before* this transaction, // the relationship changes will compensate for what happens during the transaction - storeLayer.nodeCursor( statement, id, ReadableTransactionState.EMPTY ) + storeLayer.nodeGetSingleCursor( statement, id, ReadableTransactionState.EMPTY ) .forAll( node -> storeLayer.degrees( statement, node, ( type, out, in ) -> { added.forEach( label -> updateRelationshipsCountsFromDegrees( type, label, out, in ) ); @@ -161,6 +161,6 @@ private void updateRelationshipCount( long startNode, int type, long endNode, in private void visitLabels( long nodeId, PrimitiveIntVisitor visitor ) { - storeLayer.nodeCursor( statement, nodeId, txState ).forAll( node -> node.labels().visitKeys( visitor ) ); + storeLayer.nodeGetSingleCursor( statement, nodeId, txState ).forAll( node -> node.labels().visitKeys( visitor ) ); } } diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/ConstraintEnforcingEntityOperations.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/ConstraintEnforcingEntityOperations.java index ef2a5d536147c..a38df056c7490 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/ConstraintEnforcingEntityOperations.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/ConstraintEnforcingEntityOperations.java @@ -72,6 +72,7 @@ import org.neo4j.kernel.impl.constraints.ConstraintSemantics; import org.neo4j.kernel.impl.locking.LockTracer; import org.neo4j.kernel.impl.locking.Locks; +import org.neo4j.storageengine.api.BatchingLongProgression; import org.neo4j.storageengine.api.Direction; import org.neo4j.storageengine.api.NodeItem; import org.neo4j.storageengine.api.PropertyItem; @@ -465,6 +466,18 @@ public PrimitiveLongIterator relationshipsGetAll( KernelStatement state ) return entityReadOperations.relationshipsGetAll( state ); } + @Override + public BatchingLongProgression parallelNodeScanProgression( KernelStatement statement ) + { + return entityReadOperations.parallelNodeScanProgression( statement ); + } + + @Override + public Cursor nodeGetCursor( KernelStatement statement, BatchingLongProgression progression ) + { + return entityReadOperations.nodeGetCursor( statement, progression ); + } + @Override public Cursor nodeGetAllCursor( KernelStatement statement ) { diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/GuardingStatementOperations.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/GuardingStatementOperations.java index 530d670c96937..1d339a7d40986 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/GuardingStatementOperations.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/GuardingStatementOperations.java @@ -39,6 +39,7 @@ import org.neo4j.kernel.guard.Guard; import org.neo4j.kernel.impl.api.operations.EntityReadOperations; import org.neo4j.kernel.impl.api.operations.EntityWriteOperations; +import org.neo4j.storageengine.api.BatchingLongProgression; import org.neo4j.storageengine.api.Direction; import org.neo4j.storageengine.api.NodeItem; import org.neo4j.storageengine.api.PropertyItem; @@ -234,6 +235,20 @@ public PrimitiveLongIterator relationshipsGetAll( KernelStatement statement ) return entityReadDelegate.relationshipsGetAll( statement ); } + @Override + public BatchingLongProgression parallelNodeScanProgression( KernelStatement statement ) + { + guard.check( statement ); + return entityReadDelegate.parallelNodeScanProgression( statement ); + } + + @Override + public Cursor nodeGetCursor( KernelStatement statement, BatchingLongProgression progression ) + { + guard.check( statement ); + return entityReadDelegate.nodeGetCursor( statement, progression ); + } + @Override public Cursor nodeGetAllCursor( KernelStatement statement ) { diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/OperationsFacade.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/OperationsFacade.java index 3f3c7963b143c..71a708e5199e2 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/OperationsFacade.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/OperationsFacade.java @@ -105,6 +105,7 @@ import org.neo4j.kernel.impl.proc.Procedures; import org.neo4j.kernel.impl.query.clientconnection.ClientConnectionInfo; import org.neo4j.register.Register.DoubleLongRegister; +import org.neo4j.storageengine.api.BatchingLongProgression; import org.neo4j.storageengine.api.NodeItem; import org.neo4j.storageengine.api.PropertyItem; import org.neo4j.storageengine.api.RelationshipItem; @@ -554,6 +555,20 @@ public Cursor nodeCursorById( long nodeId ) throws EntityNotFoundExcep return dataRead().nodeCursorById( statement, nodeId ); } + @Override + public Cursor nodeGeCursor( BatchingLongProgression progression ) + { + statement.assertOpen(); + return dataRead().nodeGetCursor( statement, progression ); + } + + @Override + public BatchingLongProgression parallelNodeScan() + { + statement.assertOpen(); + return dataRead().parallelNodeScanProgression( statement ); + } + @Override public Cursor relationshipCursorById( long relId ) throws EntityNotFoundException { diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/StateHandlingStatementOperations.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/StateHandlingStatementOperations.java index 7d522d8861b51..9d1fc52920169 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/StateHandlingStatementOperations.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/StateHandlingStatementOperations.java @@ -89,6 +89,7 @@ import org.neo4j.kernel.impl.index.IndexEntityType; import org.neo4j.kernel.impl.index.LegacyIndexStore; import org.neo4j.register.Register.DoubleLongRegister; +import org.neo4j.storageengine.api.BatchingLongProgression; import org.neo4j.storageengine.api.Direction; import org.neo4j.storageengine.api.EntityType; import org.neo4j.storageengine.api.NodeItem; @@ -140,6 +141,18 @@ public StateHandlingStatementOperations( StoreReadLayer storeLayer, AutoIndexing // + @Override + public BatchingLongProgression parallelNodeScanProgression( KernelStatement statement ) + { + return storeLayer.parallelNodeScanProgression( statement.storageStatement() ); + } + + @Override + public Cursor nodeGetCursor( KernelStatement statement, BatchingLongProgression progression ) + { + return storeLayer.nodeGetCursor( statement.storageStatement(), progression, statement.readableTxState() ); + } + @Override public Cursor nodeGetAllCursor( KernelStatement statement ) { @@ -160,7 +173,7 @@ public Cursor nodeCursorById( KernelStatement statement, long nodeId ) private Cursor nodeCursor( KernelStatement statement, long nodeId ) { - return storeLayer.nodeCursor( statement.storageStatement(), nodeId, statement.readableTxState() ); + return storeLayer.nodeGetSingleCursor( statement.storageStatement(), nodeId, statement.readableTxState() ); } @Override diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/operations/EntityReadOperations.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/operations/EntityReadOperations.java index 87cc27cb4b27f..ba20ad34669fd 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/operations/EntityReadOperations.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/operations/EntityReadOperations.java @@ -32,7 +32,7 @@ import org.neo4j.kernel.api.schema.IndexQuery; import org.neo4j.kernel.api.schema.index.IndexDescriptor; import org.neo4j.kernel.impl.api.KernelStatement; -import org.neo4j.kernel.impl.api.RelationshipVisitor; +import org.neo4j.storageengine.api.BatchingLongProgression; import org.neo4j.storageengine.api.Direction; import org.neo4j.storageengine.api.NodeItem; import org.neo4j.storageengine.api.PropertyItem; @@ -83,6 +83,10 @@ long nodesCountIndexed( KernelStatement statement, IndexDescriptor index, long n PrimitiveLongIterator relationshipsGetAll( KernelStatement state ); + BatchingLongProgression parallelNodeScanProgression( KernelStatement statement ); + + Cursor nodeGetCursor( KernelStatement statement, BatchingLongProgression progression ); + Cursor nodeGetAllCursor( KernelStatement statement ); Cursor nodeCursorById( KernelStatement statement, long nodeId ) throws EntityNotFoundException; diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/store/StorageLayer.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/store/StorageLayer.java index 8a01846c85986..101acb42af074 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/store/StorageLayer.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/store/StorageLayer.java @@ -61,9 +61,10 @@ import org.neo4j.kernel.impl.transaction.state.PropertyLoader; import org.neo4j.register.Register; import org.neo4j.register.Register.DoubleLongRegister; +import org.neo4j.storageengine.api.BatchingLongProgression; +import org.neo4j.storageengine.api.BatchingProgressionFactory; import org.neo4j.storageengine.api.Direction; import org.neo4j.storageengine.api.NodeItem; -import org.neo4j.storageengine.api.BatchingProgressionFactory; import org.neo4j.storageengine.api.PropertyItem; import org.neo4j.storageengine.api.RelationshipItem; import org.neo4j.storageengine.api.StorageProperty; @@ -398,6 +399,19 @@ public RelationshipIterator relationshipsGetAll() return new AllRelationshipIterator( relationshipStore ); } + @Override + public BatchingLongProgression parallelNodeScanProgression( StorageStatement statement ) + { + return progressionFactory.parallelAllNodeScan( nodeStore ); + } + + @Override + public Cursor nodeGetCursor( StorageStatement statement, BatchingLongProgression progression, + NodeTransactionStateView stateView ) + { + return statement.acquireNewNodeCursor( progression, stateView ); + } + @Override public Cursor nodeGetAllCursor( StorageStatement statement, NodeTransactionStateView stateView ) { @@ -405,7 +419,8 @@ public Cursor nodeGetAllCursor( StorageStatement statement, NodeTransa } @Override - public Cursor nodeCursor( StorageStatement statement, long nodeId, NodeTransactionStateView stateView ) + public Cursor nodeGetSingleCursor( StorageStatement statement, long nodeId, + NodeTransactionStateView stateView ) { return statement.acquireNodeCursor( progressionFactory.singleNodeFetch( nodeId ), stateView ); } diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/store/StoreStatement.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/store/StoreStatement.java index 781eb8b4e7562..e64bcd4ff573e 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/store/StoreStatement.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/store/StoreStatement.java @@ -51,7 +51,7 @@ */ public class StoreStatement implements StorageStatement { - protected final InstanceCache nodeCursor; + private final InstanceCache nodeCursor; private final InstanceCache singleRelationshipCursor; private final InstanceCache iteratorRelationshipCursor; private final InstanceCache nodeRelationshipsCursor; @@ -62,6 +62,7 @@ public class StoreStatement implements StorageStatement private final NeoStores neoStores; private final Supplier indexReaderFactorySupplier; private final Supplier labelScanStore; + private final LockService lockService; private IndexReaderFactory indexReaderFactory; private LabelScanReader labelScanReader; @@ -75,6 +76,7 @@ public StoreStatement( NeoStores neoStores, Supplier indexRe this.neoStores = neoStores; this.indexReaderFactorySupplier = indexReaderFactory; this.labelScanStore = labelScanReaderSupplier; + this.lockService = lockService; nodeCursor = new InstanceCache() { @@ -152,6 +154,14 @@ public void acquire() this.acquired = true; } + @Override + public Cursor acquireNewNodeCursor( BatchingLongProgression progression, + NodeTransactionStateView stateView ) + { + return new NodeCursor( neoStores.getNodeStore(), InstanceCache::noCache, lockService ) + .init( progression, stateView ); + } + @Override public Cursor acquireNodeCursor( BatchingLongProgression progression, NodeTransactionStateView stateView ) { diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/coreapi/TxStateTransactionDataSnapshot.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/coreapi/TxStateTransactionDataSnapshot.java index df52a59efd464..6c99340d7485c 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/coreapi/TxStateTransactionDataSnapshot.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/coreapi/TxStateTransactionDataSnapshot.java @@ -21,7 +21,6 @@ import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.Iterator; import java.util.Map; @@ -39,7 +38,6 @@ import org.neo4j.kernel.api.exceptions.LabelNotFoundKernelException; import org.neo4j.kernel.api.exceptions.PropertyKeyIdNotFoundKernelException; import org.neo4j.kernel.api.properties.DefinedProperty; -import org.neo4j.kernel.impl.api.KernelTransactionImplementation; import org.neo4j.kernel.impl.core.NodeProxy; import org.neo4j.kernel.impl.core.RelationshipProxy; import org.neo4j.kernel.impl.core.RelationshipProxy.RelationshipActions; @@ -196,7 +194,7 @@ private void takeSnapshot() { for ( long nodeId : state.addedAndRemovedNodes().getRemoved() ) { - try ( Cursor node = store.nodeCursor( storeStatement, nodeId, EMPTY ) ) + try ( Cursor node = store.nodeGetSingleCursor( storeStatement, nodeId, EMPTY ) ) { if ( node.next() ) { @@ -357,7 +355,7 @@ private Object committedValue( NodeState nodeState, int property ) return null; } - try ( Cursor node = store.nodeCursor( storeStatement, nodeState.getId(), EMPTY ) ) + try ( Cursor node = store.nodeGetSingleCursor( storeStatement, nodeState.getId(), EMPTY ) ) { if ( !node.next() ) { diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/util/InstanceCache.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/util/InstanceCache.java index 7981b0bc197aa..9c9cb2655ee82 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/util/InstanceCache.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/util/InstanceCache.java @@ -76,4 +76,9 @@ public void close() instance.dispose(); } } + + public static void noCache( T instance ) + { + + } } diff --git a/community/kernel/src/main/java/org/neo4j/storageengine/api/StorageStatement.java b/community/kernel/src/main/java/org/neo4j/storageengine/api/StorageStatement.java index 1e6ed60ed70f0..e0dad1c95bf54 100644 --- a/community/kernel/src/main/java/org/neo4j/storageengine/api/StorageStatement.java +++ b/community/kernel/src/main/java/org/neo4j/storageengine/api/StorageStatement.java @@ -66,6 +66,9 @@ public interface StorageStatement extends AutoCloseable @Override void close(); + // FIXME: this is a temporary workaround until we have a way to cache cursors thread safely in the transaction + Cursor acquireNewNodeCursor( BatchingLongProgression progression, NodeTransactionStateView stateView ); + /** * Acquires {@link Cursor} capable of {@link Cursor#get() serving} {@link NodeItem} for selected nodes. * No node is selected when this method returns, a call to {@link Cursor#next()} will have to be made diff --git a/community/kernel/src/main/java/org/neo4j/storageengine/api/StoreReadLayer.java b/community/kernel/src/main/java/org/neo4j/storageengine/api/StoreReadLayer.java index 0624eb889c1f1..29a0313dbf528 100644 --- a/community/kernel/src/main/java/org/neo4j/storageengine/api/StoreReadLayer.java +++ b/community/kernel/src/main/java/org/neo4j/storageengine/api/StoreReadLayer.java @@ -266,9 +266,14 @@ IndexReader indexGetFreshReader( StorageStatement storeStatement, IndexDescripto */ RelationshipIterator relationshipsGetAll(); + BatchingLongProgression parallelNodeScanProgression( StorageStatement statement ); + + Cursor nodeGetCursor( StorageStatement statement, BatchingLongProgression progression, + NodeTransactionStateView stateView ); + Cursor nodeGetAllCursor( StorageStatement storeStatement, NodeTransactionStateView stateView ); - Cursor nodeCursor( StorageStatement storeStatement, long nodeId, NodeTransactionStateView stateView ); + Cursor nodeGetSingleCursor( StorageStatement storeStatement, long nodeId, NodeTransactionStateView stateView ); Cursor relationshipCursor( StorageStatement storeStatement, long relationshipId, ReadableTransactionState state ); @@ -393,5 +398,4 @@ int countDegrees( StorageStatement statement, NodeItem node, Direction direction ReadableTransactionState state ); T getOrCreateSchemaDependantState( Class type, Function factory ); - } diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/state/IndexQueryTransactionStateTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/state/IndexQueryTransactionStateTest.java index ee573f6d6b0c4..2340110d40fd2 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/state/IndexQueryTransactionStateTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/state/IndexQueryTransactionStateTest.java @@ -116,7 +116,7 @@ public void shouldExcludeRemovedNodesFromIndexQuery() throws Exception long nodeId = 2L; when( indexReader.query( withValue ) ).then( answerAsPrimitiveLongIteratorFrom( asList( 1L, nodeId, 3L ) ) ); - when( store.nodeCursor( any( StorageStatement.class ), eq( nodeId ), any( ReadableTransactionState.class ) ) ) + when( store.nodeGetSingleCursor( any( StorageStatement.class ), eq( nodeId ), any( ReadableTransactionState.class ) ) ) .thenReturn( asNodeCursor( nodeId ) ); txContext.nodeDelete( state, nodeId ); @@ -135,7 +135,7 @@ public void shouldExcludeRemovedNodeFromUniqueIndexQuery() throws Exception long nodeId = 1L; when( indexReader.query( withValue ) ).thenReturn( asPrimitiveResourceIterator( nodeId ) ); - when( store.nodeCursor( any( StorageStatement.class ), eq( nodeId ), any( ReadableTransactionState.class ) ) ) + when( store.nodeGetSingleCursor( any( StorageStatement.class ), eq( nodeId ), any( ReadableTransactionState.class ) ) ) .thenReturn( asNodeCursor( nodeId ) ); txContext.nodeDelete( state, nodeId ); @@ -185,7 +185,7 @@ public void shouldIncludeCreatedNodesWithCorrectLabelAndProperty() throws Except long nodeId = 1L; state.writableTxState().nodeDoAddProperty( nodeId, stringProperty( propertyKeyId, value ) ); - when( store.nodeCursor( any( StorageStatement.class ), eq( nodeId ), any( ReadableTransactionState.class ) ) ) + when( store.nodeGetSingleCursor( any( StorageStatement.class ), eq( nodeId ), any( ReadableTransactionState.class ) ) ) .thenReturn( asNodeCursor( nodeId, 40L ) ); mockStoreProperty(); @@ -208,7 +208,7 @@ public void shouldIncludeUniqueCreatedNodeWithCorrectLabelAndProperty() throws E long nodeId = 1L; state.writableTxState().nodeDoAddProperty( nodeId, stringProperty( propertyKeyId, value ) ); - when( store.nodeCursor( any( StorageStatement.class ), eq( nodeId ), any( ReadableTransactionState.class ) ) ) + when( store.nodeGetSingleCursor( any( StorageStatement.class ), eq( nodeId ), any( ReadableTransactionState.class ) ) ) .thenReturn( asNodeCursor( nodeId, 40L ) ); mockStoreProperty(); @@ -230,7 +230,7 @@ public void shouldIncludeExistingNodesWithCorrectPropertyAfterAddingLabel() thro long nodeId = 1L; - when( store.nodeCursor( any( StorageStatement.class ), eq( nodeId ), any( ReadableTransactionState.class ) ) ) + when( store.nodeGetSingleCursor( any( StorageStatement.class ), eq( nodeId ), any( ReadableTransactionState.class ) ) ) .thenReturn( asNodeCursor( nodeId, 40L ) ); mockStoreProperty(); @@ -252,7 +252,7 @@ public void shouldIncludeExistingUniqueNodeWithCorrectPropertyAfterAddingLabel() long nodeId = 2L; - when( store.nodeCursor( any( StorageStatement.class ), eq( nodeId ), any( ReadableTransactionState.class ) ) ) + when( store.nodeGetSingleCursor( any( StorageStatement.class ), eq( nodeId ), any( ReadableTransactionState.class ) ) ) .thenReturn( asNodeCursor( nodeId, 40L ) ); mockStoreProperty(); @@ -273,7 +273,7 @@ public void shouldExcludeExistingNodesWithCorrectPropertyAfterRemovingLabel() th long nodeId = 1L; when( indexReader.query( withValue ) ).then( answerAsPrimitiveLongIteratorFrom( asList( nodeId, 2L, 3L ) ) ); - when( store.nodeCursor( any( StorageStatement.class ), eq( nodeId ), any( ReadableTransactionState.class ) ) ) + when( store.nodeGetSingleCursor( any( StorageStatement.class ), eq( nodeId ), any( ReadableTransactionState.class ) ) ) .thenReturn( asNodeCursor( nodeId, 40L, labels( labelId ) ) ); mockStoreProperty(); @@ -293,7 +293,7 @@ public void shouldExcludeExistingUniqueNodeWithCorrectPropertyAfterRemovingLabel long nodeId = 1L; when( indexReader.query( withValue ) ).thenReturn( asPrimitiveResourceIterator( nodeId ) ); - when( store.nodeCursor( any( StorageStatement.class ), eq( nodeId ), any( ReadableTransactionState.class ) ) ) + when( store.nodeGetSingleCursor( any( StorageStatement.class ), eq( nodeId ), any( ReadableTransactionState.class ) ) ) .thenReturn( asNodeCursor( nodeId, 40L, labels( labelId ) ) ); mockStoreProperty(); @@ -315,7 +315,7 @@ public void shouldExcludeNodesWithRemovedProperty() throws Exception long nodeId = 1L; state.writableTxState().nodeDoAddProperty( nodeId, intProperty( propertyKeyId, 10 ) ); - when( store.nodeCursor( any( StorageStatement.class ), eq( nodeId ), any( ReadableTransactionState.class ) ) ) + when( store.nodeGetSingleCursor( any( StorageStatement.class ), eq( nodeId ), any( ReadableTransactionState.class ) ) ) .thenReturn( asNodeCursor( nodeId, labels( labelId ) ) ); txContext.nodeAddLabel( state, nodeId, labelId ); @@ -334,7 +334,7 @@ public void shouldExcludeUniqueNodeWithRemovedProperty() throws Exception long nodeId = 1L; when( indexReader.query( withValue ) ).thenReturn( asPrimitiveResourceIterator( nodeId ) ); - when( store.nodeCursor( any( StorageStatement.class ), eq( nodeId ), any( ReadableTransactionState.class ) ) ) + when( store.nodeGetSingleCursor( any( StorageStatement.class ), eq( nodeId ), any( ReadableTransactionState.class ) ) ) .thenReturn( asNodeCursor( nodeId, 40, labels( labelId ) ) ); mockStoreProperty(); diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/state/LabelTransactionStateTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/state/LabelTransactionStateTest.java index d8a6d9fe72f64..df0a8e512e01f 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/state/LabelTransactionStateTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/state/LabelTransactionStateTest.java @@ -243,7 +243,7 @@ public void removingNonExistentLabelFromNodeShouldRespondFalse() throws Exceptio public void should_return_true_when_adding_new_label() throws Exception { // GIVEN - when( store.nodeCursor( any( StorageStatement.class ), eq( 1337L ), any( ReadableTransactionState.class ) ) ) + when( store.nodeGetSingleCursor( any( StorageStatement.class ), eq( 1337L ), any( ReadableTransactionState.class ) ) ) .thenReturn( asNodeCursor( 1337L ) ); when( store.nodeGetProperties( any( StorageStatement.class ), any( NodeItem.class ), any( PropertyContainerState.class ) ) ).thenReturn( asPropertyCursor() ); @@ -259,7 +259,7 @@ public void should_return_true_when_adding_new_label() throws Exception public void should_return_false_when_adding_existing_label() throws Exception { // GIVEN - when( store.nodeCursor( any( StorageStatement.class ), eq( 1337L ), any( ReadableTransactionState.class ) ) ) + when( store.nodeGetSingleCursor( any( StorageStatement.class ), eq( 1337L ), any( ReadableTransactionState.class ) ) ) .thenReturn( asNodeCursor( 1337L, StubCursors.labels( 12 ) ) ); when( store.nodeGetProperties( any( StorageStatement.class ), any( NodeItem.class ), any( PropertyContainerState.class ) ) ).thenReturn( asPropertyCursor() ); @@ -275,7 +275,7 @@ public void should_return_false_when_adding_existing_label() throws Exception public void should_return_true_when_removing_existing_label() throws Exception { // GIVEN - when( store.nodeCursor( any( StorageStatement.class ), eq( 1337L ), any( ReadableTransactionState.class ) ) ) + when( store.nodeGetSingleCursor( any( StorageStatement.class ), eq( 1337L ), any( ReadableTransactionState.class ) ) ) .thenReturn( asNodeCursor( 1337L, StubCursors.labels( 12 ) ) ); when( store.nodeGetProperties( any( StorageStatement.class ), any( NodeItem.class ), any( PropertyContainerState.class ) ) ).thenReturn( asPropertyCursor() ); @@ -291,7 +291,7 @@ public void should_return_true_when_removing_existing_label() throws Exception public void should_return_true_when_removing_non_existant_label() throws Exception { // GIVEN - when( store.nodeCursor( any( StorageStatement.class ), eq( 1337L ), any( ReadableTransactionState.class ) ) ) + when( store.nodeGetSingleCursor( any( StorageStatement.class ), eq( 1337L ), any( ReadableTransactionState.class ) ) ) .thenReturn( asNodeCursor( 1337L ) ); // WHEN @@ -334,7 +334,7 @@ private void commitLabels( Labels... labels ) throws Exception Map> allLabels = new HashMap<>(); for ( Labels nodeLabels : labels ) { - when( store.nodeCursor( any( StorageStatement.class ), eq( nodeLabels.nodeId ), + when( store.nodeGetSingleCursor( any( StorageStatement.class ), eq( nodeLabels.nodeId ), any( ReadableTransactionState.class ) ) ) .thenReturn( asNodeCursor( nodeLabels.nodeId, StubCursors.labels( nodeLabels.labelIds ) ) ); when( store.nodeGetProperties( any( StorageStatement.class ), any( NodeItem.class ), diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/state/StateHandlingStatementOperationsTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/state/StateHandlingStatementOperationsTest.java index e2a5701a92663..44827304ff038 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/state/StateHandlingStatementOperationsTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/state/StateHandlingStatementOperationsTest.java @@ -95,8 +95,9 @@ public class StateHandlingStatementOperationsTest public void shouldNeverDelegateWrites() throws Exception { KernelStatement state = mockedState( new TxState() ); + when( inner.indexesGetForLabel( 0 ) ).thenReturn( iterator( IndexDescriptorFactory.forLabel( 0, 0 ) ) ); - when( inner.nodeCursor( any( StorageStatement.class ), anyLong(), any( ReadableTransactionState.class ) ) ) + when( inner.nodeGetSingleCursor( any( StorageStatement.class ), anyLong(), any( ReadableTransactionState.class ) ) ) .thenReturn( asNodeCursor( 0 ) ); when( inner.nodeGetProperties( any( StorageStatement.class ), any( NodeItem.class ), any( PropertyContainerState.class ) ) ).thenReturn( asPropertyCursor() ); @@ -112,7 +113,7 @@ public void shouldNeverDelegateWrites() throws Exception // one for add and one for remove verify( inner, times( 2 ) ) - .nodeCursor( any( StorageStatement.class ), eq( 0L ), any( ReadableTransactionState.class ) ); + .nodeGetSingleCursor( any( StorageStatement.class ), eq( 0L ), any( ReadableTransactionState.class ) ); } @Test @@ -415,10 +416,9 @@ public void shouldConsiderTransactionStateDuringIndexBetweenRangeSeekByNumberWit IndexQuery.NumberRangePredicate indexQuery = IndexQuery.range( index.schema().getPropertyId(), lower, true, upper, false ); when( indexReader.query( indexQuery ) ).thenReturn( - PrimitiveLongCollections.resourceIterator( PrimitiveLongCollections.iterator( 43L, 44L, 46L ), null ) - ); - when( storeReadLayer.nodeCursor( any( StorageStatement.class ), anyLong(), any( ReadableTransactionState.class ) ) ) - .thenAnswer( invocationOnMock -> + PrimitiveLongCollections.resourceIterator( PrimitiveLongCollections.iterator( 43L, 44L, 46L ), null ) ); + when( storeReadLayer.nodeGetSingleCursor( any( StorageStatement.class ), anyLong(), + any( ReadableTransactionState.class ) ) ).thenAnswer( invocationOnMock -> { long nodeId = (long) invocationOnMock.getArguments()[1]; when( storeReadLayer @@ -501,7 +501,7 @@ public void shouldNotRecordNodeSetPropertyOnSameValue() throws Exception when( kernelStatement.readableTxState() ).thenReturn( ReadableTransactionState.EMPTY ); Cursor ourNode = nodeCursorWithProperty( propertyKeyId ); when( storeReadLayer - .nodeCursor( any( StorageStatement.class ), eq( nodeId ), any( ReadableTransactionState.class ) ) ) + .nodeGetSingleCursor( any( StorageStatement.class ), eq( nodeId ), any( ReadableTransactionState.class ) ) ) .thenReturn( ourNode ); InternalAutoIndexing autoIndexing = mock( InternalAutoIndexing.class ); AutoIndexOperations autoIndexOps = mock( AutoIndexOperations.class ); diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/state/StateOperationsAutoIndexingTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/state/StateOperationsAutoIndexingTest.java index 0e1ad33b53e44..5fd2f4bc31950 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/state/StateOperationsAutoIndexingTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/state/StateOperationsAutoIndexingTest.java @@ -76,7 +76,7 @@ public void shouldSignalNodeRemovedToAutoIndex() throws Exception { // Given when( storeLayer - .nodeCursor( any( StorageStatement.class ), eq( 1337L ), any( ReadableTransactionState.class ) ) ) + .nodeGetSingleCursor( any( StorageStatement.class ), eq( 1337L ), any( ReadableTransactionState.class ) ) ) .thenReturn( cursor( mock( NodeItem.class ) ) ); // When @@ -109,7 +109,7 @@ public void shouldSignalNodePropertyAddedToAutoIndex() throws Exception NodeItem node = mock( NodeItem.class ); when( node.labels() ).thenReturn( PrimitiveIntCollections.emptySet() ); when( storeLayer - .nodeCursor( any( StorageStatement.class ), eq( 1337L ), any( ReadableTransactionState.class ) ) ) + .nodeGetSingleCursor( any( StorageStatement.class ), eq( 1337L ), any( ReadableTransactionState.class ) ) ) .thenReturn( cursor( node ) ); when( storeLayer .nodeGetProperty( any( StorageStatement.class ), any( NodeItem.class ), eq( property.propertyKeyId() ), @@ -156,7 +156,7 @@ public void shouldSignalNodePropertyChangedToAutoIndex() throws Exception NodeItem node = mock( NodeItem.class ); when( node.labels() ).thenReturn( PrimitiveIntCollections.emptySet() ); when( storeLayer - .nodeCursor( any( StorageStatement.class ), eq( 1337L ), any( ReadableTransactionState.class ) ) ) + .nodeGetSingleCursor( any( StorageStatement.class ), eq( 1337L ), any( ReadableTransactionState.class ) ) ) .thenReturn( cursor( node ) ); when( storeLayer .nodeGetProperty( any( StorageStatement.class ), any( NodeItem.class ), eq( property.propertyKeyId() ), @@ -209,7 +209,7 @@ public void shouldSignalNodePropertyRemovedToAutoIndex() throws Exception .thenReturn( cursor( existingProperty ) ); when( node.labels() ).thenReturn( PrimitiveIntCollections.emptySet() ); when( storeLayer - .nodeCursor( any( StorageStatement.class ), eq( 1337L ), any( ReadableTransactionState.class ) ) ) + .nodeGetSingleCursor( any( StorageStatement.class ), eq( 1337L ), any( ReadableTransactionState.class ) ) ) .thenReturn( cursor( node ) ); // When diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/coreapi/TxStateTransactionDataViewTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/coreapi/TxStateTransactionDataViewTest.java index 32e990c50c2ec..11de15779adc6 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/coreapi/TxStateTransactionDataViewTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/coreapi/TxStateTransactionDataViewTest.java @@ -106,13 +106,13 @@ public void showsDeletedNodes() throws Exception state.nodeDoDelete( 2L ); NodeItem node1 = asNode( 2L, 20L, labels( 15 ) ); - when( ops.nodeCursor( storeStatement, 2L, ReadableTransactionState.EMPTY ) ).thenReturn( cursor( node1 ) ); + when( ops.nodeGetSingleCursor( storeStatement, 2L, ReadableTransactionState.EMPTY ) ).thenReturn( cursor( node1 ) ); when( ops.nodeGetProperties( storeStatement, node1, NodeState.EMPTY ) ) .thenReturn( asPropertyCursor( stringProperty( 1, "p" ) ) ); NodeItem node2 = asNode( 1L, 21L, labels() ); - when( ops.nodeCursor( storeStatement, 1L, ReadableTransactionState.EMPTY ) ).thenReturn( cursor( node2 ) ); + when( ops.nodeGetSingleCursor( storeStatement, 1L, ReadableTransactionState.EMPTY ) ).thenReturn( cursor( node2 ) ); when( ops.nodeGetProperties( storeStatement, node2, NodeState.EMPTY ) ).thenReturn( asPropertyCursor() ); when( ops.propertyKeyGetName( 1 ) ).thenReturn( "key" ); @@ -171,7 +171,7 @@ public void correctlySaysNodeIsDeleted() throws Exception Node node = mock( Node.class ); when( node.getId() ).thenReturn( 1L ); NodeItem nodeItem = asNode( 1 ); - when( ops.nodeCursor( storeStatement, 1, ReadableTransactionState.EMPTY ) ).thenReturn( cursor( nodeItem ) ); + when( ops.nodeGetSingleCursor( storeStatement, 1, ReadableTransactionState.EMPTY ) ).thenReturn( cursor( nodeItem ) ); when( ops.nodeGetProperties( storeStatement, nodeItem, NodeState.EMPTY ) ).thenReturn( asPropertyCursor() ); // When & Then @@ -206,7 +206,7 @@ public void shouldListAddedNodePropertiesProperties() throws Exception when( ops.propertyKeyGetName( propertyKeyId ) ).thenReturn( "theKey" ); long propertyId = 20L; NodeItem node = asNode( 1L, propertyId, labels() ); - when( ops.nodeCursor( storeStatement, 1L, ReadableTransactionState.EMPTY ) ).thenReturn( cursor( node ) ); + when( ops.nodeGetSingleCursor( storeStatement, 1L, ReadableTransactionState.EMPTY ) ).thenReturn( cursor( node ) ); when( ops.nodeGetProperty( storeStatement, node, propertyKeyId, NodeState.EMPTY ) ) .thenReturn( asPropertyCursor( prevProp ) ); @@ -231,7 +231,7 @@ public void shouldListRemovedNodeProperties() throws Exception when( ops.propertyKeyGetName( propertyKeyId ) ).thenReturn( "theKey" ); long propertyId = 20L; NodeItem node = asNode( 1L, propertyId, labels() ); - when( ops.nodeCursor( storeStatement, 1L, ReadableTransactionState.EMPTY ) ).thenReturn( cursor( node ) ); + when( ops.nodeGetSingleCursor( storeStatement, 1L, ReadableTransactionState.EMPTY ) ).thenReturn( cursor( node ) ); when( ops.nodeGetProperty( storeStatement, node, propertyKeyId, NodeState.EMPTY ) ) .thenReturn( asPropertyCursor( prevProp ) ); diff --git a/enterprise/kernel/src/test/java/org/neo4j/kernel/impl/api/integrationtest/ParallelNodeScanReadOperationsIntegrationTest.java b/enterprise/kernel/src/test/java/org/neo4j/kernel/impl/api/integrationtest/ParallelNodeScanReadOperationsIntegrationTest.java new file mode 100644 index 0000000000000..1539313090409 --- /dev/null +++ b/enterprise/kernel/src/test/java/org/neo4j/kernel/impl/api/integrationtest/ParallelNodeScanReadOperationsIntegrationTest.java @@ -0,0 +1,258 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.kernel.impl.api.integrationtest; + +import org.junit.Rule; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.function.Predicate; + +import org.neo4j.cursor.Cursor; +import org.neo4j.graphdb.GraphDatabaseService; +import org.neo4j.graphdb.NotFoundException; +import org.neo4j.graphdb.Transaction; +import org.neo4j.helpers.Exceptions; +import org.neo4j.kernel.api.KernelTransaction; +import org.neo4j.kernel.api.ReadOperations; +import org.neo4j.kernel.api.Statement; +import org.neo4j.kernel.impl.core.ThreadToStatementContextBridge; +import org.neo4j.kernel.impl.storageengine.impl.recordstorage.RecordStorageEngine; +import org.neo4j.kernel.impl.store.NodeStore; +import org.neo4j.kernel.internal.GraphDatabaseAPI; +import org.neo4j.storageengine.api.BatchingLongProgression; +import org.neo4j.storageengine.api.NodeItem; +import org.neo4j.test.rule.DatabaseRule; +import org.neo4j.test.rule.EnterpriseDatabaseRule; +import org.neo4j.test.rule.RandomRule; + +import static java.util.Collections.disjoint; +import static java.util.stream.Collectors.toList; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class ParallelNodeScanReadOperationsIntegrationTest +{ + @Rule + public DatabaseRule databaseRule = new EnterpriseDatabaseRule(); + + @Rule + public final RandomRule random = new RandomRule(); + + @Test + public void shouldRunParallelNodeScanThroughTheReadOperationsUsingTransactions() throws Throwable + { + GraphDatabaseAPI db = databaseRule.getGraphDatabaseAPI(); + createNodes( db, randomNodes( nodeStore( db ) ) ); + + int threads = random.nextInt( 2, 6 ); + ExecutorService executorService = Executors.newFixedThreadPool( threads ); + + Set singleThreadExecutionResult = new HashSet<>(); + Set parallelExecutionResult = new HashSet<>(); + ThreadToStatementContextBridge bridge = + db.getDependencyResolver().resolveDependency( ThreadToStatementContextBridge.class ); + + try ( Transaction tx = db.beginTx() ) + { + KernelTransaction ktx = bridge.getKernelTransactionBoundToThisThread( true ); + + try ( Statement statement = ktx.acquireStatement() ) + { + statement.readOperations().nodeGetAllCursor().forAll( n -> singleThreadExecutionResult.add( n.id() ) ); + } + + try ( Statement statement = ktx.acquireStatement() ) + { + parallelExecution( statement.readOperations(), executorService, threads, parallelExecutionResult ); + } + + tx.success(); + } + + assertEquals( singleThreadExecutionResult, parallelExecutionResult ); + } + + @Test + public void shouldRunParallelNodeScanWithTxStateChangesThroughTheReadOperationsUsingTransactions() throws Throwable + { + GraphDatabaseAPI db = databaseRule.getGraphDatabaseAPI(); + int nodes = randomNodes( nodeStore( db ) ); + createNodes( db, nodes ); + int threads = random.nextInt( 2, 6 ); + ExecutorService executorService = Executors.newFixedThreadPool( threads ); + + Set singleThreadExecutionResult = new HashSet<>(); + Set parallelExecutionResult = new HashSet<>(); + ThreadToStatementContextBridge bridge = + db.getDependencyResolver().resolveDependency( ThreadToStatementContextBridge.class ); + + try ( Transaction tx = db.beginTx() ) + { + createTemporaryNodesAndDeleteSomeOfTheExistingNodes( db, nodes ); + KernelTransaction ktx = bridge.getKernelTransactionBoundToThisThread( true ); + + try ( Statement statement = ktx.acquireStatement() ) + { + statement.readOperations().nodeGetAllCursor().forAll( n -> singleThreadExecutionResult.add( n.id() ) ); + } + + try ( Statement statement = ktx.acquireStatement() ) + { + parallelExecution( statement.readOperations(), executorService, threads, parallelExecutionResult ); + } + + tx.success(); + } + + assertEquals( message( singleThreadExecutionResult, parallelExecutionResult ), singleThreadExecutionResult, + parallelExecutionResult ); + } + + private void parallelExecution( ReadOperations readOperations, ExecutorService executorService, int threads, + Set parallelExecutionResult ) throws Throwable + { + try + { + BatchingLongProgression progression = readOperations.parallelNodeScan(); + + @SuppressWarnings( "unchecked" ) + Future>[] futures = new Future[threads]; + for ( int i = 0; i < threads; i++ ) + { + futures[i] = executorService.submit( () -> + { + HashSet ids = new HashSet<>(); + try ( Cursor cursor = readOperations.nodeGeCursor( progression ) ) + { + while ( cursor.next() ) + { + long nodeId = cursor.get().id(); + assertTrue( ids.add( nodeId ) ); + } + } + return ids; + } ); + } + + Throwable t = null; + for ( int i = 0; i < threads; i++ ) + { + try + { + Set set = futures[i].get(); + assertTrue( i + "" + message( parallelExecutionResult, set ), + disjoint( parallelExecutionResult, set ) ); + parallelExecutionResult.addAll( set ); + } + catch ( Throwable current ) + { + current.printStackTrace(); + t = Exceptions.chain( t, current ); + } + } + + if ( t != null ) + { + throw t; + } + } + finally + { + executorService.shutdown(); + } + } + + private String message( Set expected, Set actual ) + { + List sortedExpected = expected.stream().sorted().collect( toList() ); + List sortedActual = actual.stream().sorted().collect( toList() ); + + List missingInActual = + expected.stream().filter( ((Predicate) actual::contains).negate() ).sorted().collect( toList() ); + List extraInActual = + actual.stream().filter( ((Predicate) expected::contains).negate() ).sorted().collect( toList() ); + + List intersect = expected.stream().filter( actual::contains ).sorted().collect( toList() ); + + String expectedResult = "Expected: " + sortedExpected; + String actualResult = "Actual: " + sortedActual; + String diff = "Missing: " + missingInActual + "\n" + "Extra: " + extraInActual; + String overlap = "Overlap: " + intersect; + return expectedResult + "\n" + actualResult + "\n" + diff + "\n" + overlap; + } + + private NodeStore nodeStore( GraphDatabaseAPI db ) + { + return db.getDependencyResolver().resolveDependency( RecordStorageEngine.class ) + .testAccessNeoStores().getNodeStore(); + } + + private int randomNodes( NodeStore nodeStore ) + { + int recordsPerPage = nodeStore.getRecordsPerPage(); + int pages = random.nextInt( 40, 1000 ); + int nonAlignedRecords = random.nextInt( 0, recordsPerPage - 1 ); + return pages * recordsPerPage + nonAlignedRecords; + } + + private void createTemporaryNodesAndDeleteSomeOfTheExistingNodes( GraphDatabaseService db, int nodes ) + { + for ( long i = 0; i < 100; i++ ) + { + db.createNode(); + } + + List deleted = new ArrayList<>( 100 ); + for ( int i = 0; i < 100; i++ ) + { + try + { + long id = random.nextLong( 0, nodes ); + db.getNodeById( id ).delete(); + deleted.add( id ); + } + catch ( NotFoundException ex ) + { + // already deleted, it doesn't matter let's proceed + } + } + Collections.sort( deleted ); + } + + private void createNodes( GraphDatabaseAPI db, int nodes ) + { + try ( Transaction tx = db.beginTx() ) + { + for ( int j = 0; j < nodes; j++ ) + { + db.createNode().getId(); + } + tx.success(); + } + } +}