From c90d669e228ad263a4464b3971af4043a9b4519f Mon Sep 17 00:00:00 2001 From: MishaDemianenko Date: Thu, 25 May 2017 16:45:55 +0200 Subject: [PATCH] Revert "Merge pull request #14 from davidegrohmann/3.3-parallel-scan-read-operations" This reverts commit 530cd9dd6f2874f7121a6905d6d7ad1a6b42f376, reversing changes made to 3f4bbe01a83134644e58ccb7793c647a9c78ac88. --- .../org/neo4j/kernel/NeoStoreDataSource.java | 33 +-- .../org/neo4j/kernel/api/ReadOperations.java | 6 +- .../TransactionCountingStateVisitor.java | 6 +- .../ConstraintEnforcingEntityOperations.java | 13 - .../impl/api/GuardingStatementOperations.java | 15 - .../kernel/impl/api/KernelTransactions.java | 35 ++- .../kernel/impl/api/OperationsFacade.java | 15 - .../api/StateHandlingStatementOperations.java | 15 +- .../api/operations/EntityReadOperations.java | 6 +- ...lNodeScan.java => AllNodeProgression.java} | 5 +- .../api/store}/BatchingLongProgression.java | 4 +- .../CommunityBatchingProgressionFactory.java | 45 --- .../kernel/impl/api/store/NodeCursor.java | 1 - ...eFetch.java => SingleNodeProgression.java} | 6 +- .../kernel/impl/api/store/StorageLayer.java | 27 +- .../kernel/impl/api/store/StoreStatement.java | 17 +- .../TxStateTransactionDataSnapshot.java | 6 +- .../impl/factory/CommunityEditionModule.java | 17 +- .../kernel/impl/factory/DataSourceModule.java | 1 - .../kernel/impl/factory/EditionModule.java | 26 +- .../recordstorage/RecordStorageEngine.java | 6 +- .../neo4j/kernel/impl/util/InstanceCache.java | 5 - .../api/BatchingProgressionFactory.java | 31 --- .../storageengine/api/StorageStatement.java | 4 +- .../storageengine/api/StoreReadLayer.java | 8 +- .../state/IndexQueryTransactionStateTest.java | 20 +- .../api/state/LabelTransactionStateTest.java | 10 +- .../api/state/SchemaTransactionStateTest.java | 4 +- .../StateHandlingStatementOperationsTest.java | 14 +- .../StateOperationsAutoIndexingTest.java | 8 +- ...nTest.java => AllNodeProgressionTest.java} | 10 +- .../kernel/impl/api/store/NodeCursorTest.java | 1 - ...st.java => SingleNodeProgressionTest.java} | 4 +- .../impl/api/store/StorageLayerLabelTest.java | 2 +- .../api/store/StorageLayerNodeAndRelTest.java | 3 +- .../api/store/StorageLayerPropertyTest.java | 2 +- .../StorageLayerRelTypesAndDegreeTest.java | 2 +- .../TxStateTransactionDataViewTest.java | 13 +- .../kernel/impl/store/NeoStoresTest.java | 14 +- .../test/rule/NeoStoreDataSourceRule.java | 3 +- .../test/rule/RecordStorageEngineRule.java | 3 +- .../collection/pool/LinkedQueuePool.java | 27 +- .../neo4j/collection/pool/MarshlandPool.java | 17 +- .../collection/pool/LinkedQueuePoolTest.java | 11 +- .../collection/pool/MarshlandPoolTest.java | 4 +- .../core/EnterpriseCoreEditionModule.java | 7 +- .../EnterpriseReadReplicaEditionModule.java | 7 +- .../factory/HighlyAvailableEditionModule.java | 7 +- ...ory.java => EnterpriseStoreStatement.java} | 22 +- ...n.java => ParallelAllNodeProgression.java} | 5 +- .../enterprise/EnterpriseEditionModule.java | 8 +- .../enterprise/PropertyExistenceEnforcer.java | 4 +- .../lock/forseti/ForsetiLockManager.java | 4 +- ...NodeScanReadOperationsIntegrationTest.java | 258 ------------------ ...java => EnterpriseStoreStatementTest.java} | 39 ++- ...va => ParallelAllNodeProgressionTest.java} | 10 +- 56 files changed, 241 insertions(+), 655 deletions(-) rename community/kernel/src/main/java/org/neo4j/kernel/impl/api/store/{AllNodeScan.java => AllNodeProgression.java} (91%) rename community/kernel/src/main/java/org/neo4j/{storageengine/api => kernel/impl/api/store}/BatchingLongProgression.java (91%) delete mode 100644 community/kernel/src/main/java/org/neo4j/kernel/impl/api/store/CommunityBatchingProgressionFactory.java rename community/kernel/src/main/java/org/neo4j/kernel/impl/api/store/{SingleNodeFetch.java => SingleNodeProgression.java} (89%) delete mode 100644 community/kernel/src/main/java/org/neo4j/storageengine/api/BatchingProgressionFactory.java rename community/kernel/src/test/java/org/neo4j/kernel/impl/api/store/{AllNodeScanTest.java => AllNodeProgressionTest.java} (91%) rename community/kernel/src/test/java/org/neo4j/kernel/impl/api/store/{SingleNodeFetchTest.java => SingleNodeProgressionTest.java} (92%) rename enterprise/kernel/src/main/java/org/neo4j/kernel/impl/api/store/{EnterpriseBatchingProgressionFactory.java => EnterpriseStoreStatement.java} (53%) rename enterprise/kernel/src/main/java/org/neo4j/kernel/impl/api/store/{ParallelAllNodeScan.java => ParallelAllNodeProgression.java} (94%) delete mode 100644 enterprise/kernel/src/test/java/org/neo4j/kernel/impl/api/integrationtest/ParallelNodeScanReadOperationsIntegrationTest.java rename enterprise/kernel/src/test/java/org/neo4j/kernel/impl/api/store/{StoreParallelNodeScanIntegrationTest.java => EnterpriseStoreStatementTest.java} (79%) rename enterprise/kernel/src/test/java/org/neo4j/kernel/impl/api/store/{ParallelAllNodeScanTest.java => ParallelAllNodeProgressionTest.java} (95%) diff --git a/community/kernel/src/main/java/org/neo4j/kernel/NeoStoreDataSource.java b/community/kernel/src/main/java/org/neo4j/kernel/NeoStoreDataSource.java index c025cc6ff285a..c2b314af14cd3 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/NeoStoreDataSource.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/NeoStoreDataSource.java @@ -159,7 +159,6 @@ import org.neo4j.logging.Log; import org.neo4j.logging.LogProvider; import org.neo4j.logging.Logger; -import org.neo4j.storageengine.api.BatchingProgressionFactory; import org.neo4j.storageengine.api.StorageEngine; import org.neo4j.storageengine.api.StoreFileMetadata; import org.neo4j.storageengine.api.StoreReadLayer; @@ -271,7 +270,6 @@ boolean applicable( DiagnosticsPhase phase ) private final AvailabilityGuard availabilityGuard; private final SystemNanoClock clock; private final StoreCopyCheckPointMutex storeCopyCheckPointMutex; - private final BatchingProgressionFactory progressionFactory; private Dependencies dependencies; private LifeSupport life; @@ -329,7 +327,6 @@ public NeoStoreDataSource( SystemNanoClock clock, AccessCapability accessCapability, StoreCopyCheckPointMutex storeCopyCheckPointMutex, - BatchingProgressionFactory progressionFactory, RecoveryCleanupWorkCollector recoveryCleanupWorkCollector ) { this.storeDir = storeDir; @@ -367,7 +364,6 @@ public NeoStoreDataSource( this.availabilityGuard = availabilityGuard; this.clock = clock; this.accessCapability = accessCapability; - this.progressionFactory = progressionFactory; this.recoveryCleanupWorkCollector = recoveryCleanupWorkCollector; readOnly = config.get( Configuration.read_only ); @@ -445,9 +441,9 @@ public void start() throws IOException SynchronizedArrayIdOrderingQueue legacyIndexTransactionOrdering = new SynchronizedArrayIdOrderingQueue( 20 ); - storageEngine = buildStorageEngine( propertyKeyTokenHolder, labelTokens, relationshipTypeTokens, - legacyIndexProviderLookup, indexConfigStore, updateableSchemaState::clear, - legacyIndexTransactionOrdering, progressionFactory ); + storageEngine = buildStorageEngine( + propertyKeyTokenHolder, labelTokens, relationshipTypeTokens, legacyIndexProviderLookup, + indexConfigStore, updateableSchemaState::clear, legacyIndexTransactionOrdering ); LogEntryReader logEntryReader = new VersionAwareLogEntryReader<>( storageEngine.commandReaderFactory(), STRICT ); @@ -458,7 +454,7 @@ public void start() throws IOException buildTransactionLogs( storeDir, config, logProvider, scheduler, fs, storageEngine, logEntryReader, legacyIndexTransactionOrdering, transactionIdStore, logVersionRepository ); - transactionLogModule.satisfyDependencies( dependencies ); + transactionLogModule.satisfyDependencies(dependencies); buildRecovery( fs, transactionIdStore, @@ -576,24 +572,23 @@ private void upgradeStore( RecordFormats format ) format ).migrate( storeDir ); } - private StorageEngine buildStorageEngine( PropertyKeyTokenHolder propertyKeyTokenHolder, - LabelTokenHolder labelTokens, RelationshipTypeTokenHolder relationshipTypeTokens, + private StorageEngine buildStorageEngine( + PropertyKeyTokenHolder propertyKeyTokenHolder, LabelTokenHolder labelTokens, + RelationshipTypeTokenHolder relationshipTypeTokens, LegacyIndexProviderLookup legacyIndexProviderLookup, IndexConfigStore indexConfigStore, - Runnable schemaStateChangeCallback, SynchronizedArrayIdOrderingQueue legacyIndexTransactionOrdering, - BatchingProgressionFactory progressionFactory ) + Runnable schemaStateChangeCallback, SynchronizedArrayIdOrderingQueue legacyIndexTransactionOrdering ) { // TODO we should break this dependency on the kernelModule (which has not yet been created at this point in // TODO the code) and instead let information about generations of transactions flow through the StorageEngine // TODO API Supplier transactionSnapshotSupplier = () -> kernelModule.kernelTransactions().get(); - RecordStorageEngine storageEngine = - new RecordStorageEngine( storeDir, config, idGeneratorFactory, eligibleForReuse, - idTypeConfigurationProvider, pageCache, fs, logProvider, propertyKeyTokenHolder, labelTokens, - relationshipTypeTokens, schemaStateChangeCallback, constraintSemantics, storageStatementFactory, - scheduler, tokenNameLookup, lockService, schemaIndexProvider, indexingServiceMonitor, - databaseHealth, labelScanStoreProvider, legacyIndexProviderLookup, indexConfigStore, - legacyIndexTransactionOrdering, transactionSnapshotSupplier, progressionFactory ); + RecordStorageEngine storageEngine = new RecordStorageEngine( storeDir, config, idGeneratorFactory, + eligibleForReuse, idTypeConfigurationProvider, pageCache, fs, logProvider, propertyKeyTokenHolder, + labelTokens, relationshipTypeTokens, schemaStateChangeCallback, constraintSemantics, + storageStatementFactory, scheduler, tokenNameLookup, lockService, schemaIndexProvider, + indexingServiceMonitor, databaseHealth, labelScanStoreProvider, legacyIndexProviderLookup, + indexConfigStore, legacyIndexTransactionOrdering, transactionSnapshotSupplier ); // We pretend that the storage engine abstract hides all details within it. Whereas that's mostly // true it's not entirely true for the time being. As long as we need this call below, which diff --git a/community/kernel/src/main/java/org/neo4j/kernel/api/ReadOperations.java b/community/kernel/src/main/java/org/neo4j/kernel/api/ReadOperations.java index d86608e12f8b2..9f1e0970e7412 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/api/ReadOperations.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/api/ReadOperations.java @@ -48,9 +48,9 @@ import org.neo4j.kernel.api.schema.SchemaDescriptor; import org.neo4j.kernel.api.schema.constaints.ConstraintDescriptor; import org.neo4j.kernel.api.schema.index.IndexDescriptor; +import org.neo4j.kernel.impl.api.RelationshipVisitor; import org.neo4j.kernel.impl.api.store.RelationshipIterator; import org.neo4j.register.Register.DoubleLongRegister; -import org.neo4j.storageengine.api.BatchingLongProgression; import org.neo4j.storageengine.api.NodeItem; import org.neo4j.storageengine.api.PropertyItem; import org.neo4j.storageengine.api.RelationshipItem; @@ -212,10 +212,6 @@ long nodesCountIndexed( IndexDescriptor index, long nodeId, Object value ) Cursor nodeCursorById( long nodeId ) throws EntityNotFoundException; - Cursor nodeGeCursor( BatchingLongProgression progression ); - - BatchingLongProgression parallelNodeScan(); - Cursor relationshipCursorById( long relId ) throws EntityNotFoundException; Cursor nodeGetProperties( NodeItem node ); diff --git a/community/kernel/src/main/java/org/neo4j/kernel/api/txstate/TransactionCountingStateVisitor.java b/community/kernel/src/main/java/org/neo4j/kernel/api/txstate/TransactionCountingStateVisitor.java index 92d926ad53e07..f2c8be8245f18 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/api/txstate/TransactionCountingStateVisitor.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/api/txstate/TransactionCountingStateVisitor.java @@ -65,7 +65,7 @@ public void visitCreatedNode( long id ) public void visitDeletedNode( long id ) { counts.incrementNodeCount( ANY_LABEL, -1 ); - storeLayer.nodeGetSingleCursor( statement, id, ReadableTransactionState.EMPTY ) + storeLayer.nodeCursor( statement, id, ReadableTransactionState.EMPTY ) .forAll( this::decrementCountForLabelsAndRelationships ); super.visitDeletedNode( id ); } @@ -125,7 +125,7 @@ public void visitNodeLabelChanges( long id, final Set added, final Set< } // get the relationship counts from *before* this transaction, // the relationship changes will compensate for what happens during the transaction - storeLayer.nodeGetSingleCursor( statement, id, ReadableTransactionState.EMPTY ) + storeLayer.nodeCursor( statement, id, ReadableTransactionState.EMPTY ) .forAll( node -> storeLayer.degrees( statement, node, ( type, out, in ) -> { added.forEach( label -> updateRelationshipsCountsFromDegrees( type, label, out, in ) ); @@ -161,6 +161,6 @@ private void updateRelationshipCount( long startNode, int type, long endNode, in private void visitLabels( long nodeId, PrimitiveIntVisitor visitor ) { - storeLayer.nodeGetSingleCursor( statement, nodeId, txState ).forAll( node -> node.labels().visitKeys( visitor ) ); + storeLayer.nodeCursor( statement, nodeId, txState ).forAll( node -> node.labels().visitKeys( visitor ) ); } } diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/ConstraintEnforcingEntityOperations.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/ConstraintEnforcingEntityOperations.java index a38df056c7490..ef2a5d536147c 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/ConstraintEnforcingEntityOperations.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/ConstraintEnforcingEntityOperations.java @@ -72,7 +72,6 @@ import org.neo4j.kernel.impl.constraints.ConstraintSemantics; import org.neo4j.kernel.impl.locking.LockTracer; import org.neo4j.kernel.impl.locking.Locks; -import org.neo4j.storageengine.api.BatchingLongProgression; import org.neo4j.storageengine.api.Direction; import org.neo4j.storageengine.api.NodeItem; import org.neo4j.storageengine.api.PropertyItem; @@ -466,18 +465,6 @@ public PrimitiveLongIterator relationshipsGetAll( KernelStatement state ) return entityReadOperations.relationshipsGetAll( state ); } - @Override - public BatchingLongProgression parallelNodeScanProgression( KernelStatement statement ) - { - return entityReadOperations.parallelNodeScanProgression( statement ); - } - - @Override - public Cursor nodeGetCursor( KernelStatement statement, BatchingLongProgression progression ) - { - return entityReadOperations.nodeGetCursor( statement, progression ); - } - @Override public Cursor nodeGetAllCursor( KernelStatement statement ) { diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/GuardingStatementOperations.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/GuardingStatementOperations.java index 1d339a7d40986..530d670c96937 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/GuardingStatementOperations.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/GuardingStatementOperations.java @@ -39,7 +39,6 @@ import org.neo4j.kernel.guard.Guard; import org.neo4j.kernel.impl.api.operations.EntityReadOperations; import org.neo4j.kernel.impl.api.operations.EntityWriteOperations; -import org.neo4j.storageengine.api.BatchingLongProgression; import org.neo4j.storageengine.api.Direction; import org.neo4j.storageengine.api.NodeItem; import org.neo4j.storageengine.api.PropertyItem; @@ -235,20 +234,6 @@ public PrimitiveLongIterator relationshipsGetAll( KernelStatement statement ) return entityReadDelegate.relationshipsGetAll( statement ); } - @Override - public BatchingLongProgression parallelNodeScanProgression( KernelStatement statement ) - { - guard.check( statement ); - return entityReadDelegate.parallelNodeScanProgression( statement ); - } - - @Override - public Cursor nodeGetCursor( KernelStatement statement, BatchingLongProgression progression ) - { - guard.check( statement ); - return entityReadDelegate.nodeGetCursor( statement, progression ); - } - @Override public Cursor nodeGetAllCursor( KernelStatement statement ) { diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactions.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactions.java index 5849527d92922..8da15226542e8 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactions.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/KernelTransactions.java @@ -96,9 +96,10 @@ public class KernelTransactions extends LifecycleAdapter implements Supplier allTransactions = newSetFromMap( new ConcurrentHashMap<>() ); + // This is the factory that actually builds brand-new instances. + private final Factory factory = new KernelTransactionImplementationFactory( allTransactions ); // Global pool of transactions, wrapped by the thread-local marshland pool and so is not used directly. - private final LinkedQueuePool globalTxPool = - new GlobalKernelTransactionPool( allTransactions ); + private final LinkedQueuePool globalTxPool = new GlobalKernelTransactionPool( allTransactions, factory ); // Pool of unused transactions. private final MarshlandPool localTxPool = new MarshlandPool<>( globalTxPool ); @@ -287,6 +288,17 @@ KernelTransactionHandle createHandle( KernelTransactionImplementation tx ) return new KernelTransactionImplementationHandle( tx ); } + /** + * Get all transactions + * *

+ * Note: this method is package-private for testing only. + * @return set of all kernel transaction + */ + Set getAllTransactions() + { + return allTransactions; + } + private void assertRunning() { if ( availabilityGuard.isShutdown() ) @@ -308,18 +320,17 @@ private void assertCurrentThreadIsNotBlockingNewTransactions() } } - private class GlobalKernelTransactionPool extends LinkedQueuePool + private class KernelTransactionImplementationFactory implements Factory { private Set transactions; - GlobalKernelTransactionPool( Set transactions ) + KernelTransactionImplementationFactory( Set transactions ) { - super( 8 ); this.transactions = transactions; } @Override - protected KernelTransactionImplementation create() + public KernelTransactionImplementation newInstance() { KernelTransactionImplementation tx = new KernelTransactionImplementation( statementOperations, schemaWriteGuard, hooks, @@ -330,6 +341,18 @@ protected KernelTransactionImplementation create() this.transactions.add( tx ); return tx; } + } + + private class GlobalKernelTransactionPool extends LinkedQueuePool + { + private Set transactions; + + GlobalKernelTransactionPool( Set transactions, + Factory factory ) + { + super( 8, factory ); + this.transactions = transactions; + } @Override protected void dispose( KernelTransactionImplementation tx ) diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/OperationsFacade.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/OperationsFacade.java index fb1a0b343b4fa..8a808f898daf5 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/OperationsFacade.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/OperationsFacade.java @@ -105,7 +105,6 @@ import org.neo4j.kernel.impl.proc.Procedures; import org.neo4j.kernel.impl.query.clientconnection.ClientConnectionInfo; import org.neo4j.register.Register.DoubleLongRegister; -import org.neo4j.storageengine.api.BatchingLongProgression; import org.neo4j.storageengine.api.NodeItem; import org.neo4j.storageengine.api.PropertyItem; import org.neo4j.storageengine.api.RelationshipItem; @@ -555,20 +554,6 @@ public Cursor nodeCursorById( long nodeId ) throws EntityNotFoundExcep return dataRead().nodeCursorById( statement, nodeId ); } - @Override - public Cursor nodeGeCursor( BatchingLongProgression progression ) - { - statement.assertOpen(); - return dataRead().nodeGetCursor( statement, progression ); - } - - @Override - public BatchingLongProgression parallelNodeScan() - { - statement.assertOpen(); - return dataRead().parallelNodeScanProgression( statement ); - } - @Override public Cursor relationshipCursorById( long relId ) throws EntityNotFoundException { diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/StateHandlingStatementOperations.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/StateHandlingStatementOperations.java index 00abbae445d5c..a66ab82b2b8c3 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/StateHandlingStatementOperations.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/StateHandlingStatementOperations.java @@ -89,7 +89,6 @@ import org.neo4j.kernel.impl.index.IndexEntityType; import org.neo4j.kernel.impl.index.LegacyIndexStore; import org.neo4j.register.Register.DoubleLongRegister; -import org.neo4j.storageengine.api.BatchingLongProgression; import org.neo4j.storageengine.api.Direction; import org.neo4j.storageengine.api.EntityType; import org.neo4j.storageengine.api.NodeItem; @@ -141,18 +140,6 @@ public StateHandlingStatementOperations( StoreReadLayer storeLayer, AutoIndexing // - @Override - public BatchingLongProgression parallelNodeScanProgression( KernelStatement statement ) - { - return storeLayer.parallelNodeScanProgression( statement.storageStatement() ); - } - - @Override - public Cursor nodeGetCursor( KernelStatement statement, BatchingLongProgression progression ) - { - return storeLayer.nodeGetCursor( statement.storageStatement(), progression, statement.readableTxState() ); - } - @Override public Cursor nodeGetAllCursor( KernelStatement statement ) { @@ -173,7 +160,7 @@ public Cursor nodeCursorById( KernelStatement statement, long nodeId ) private Cursor nodeCursor( KernelStatement statement, long nodeId ) { - return storeLayer.nodeGetSingleCursor( statement.storageStatement(), nodeId, statement.readableTxState() ); + return storeLayer.nodeCursor( statement.storageStatement(), nodeId, statement.readableTxState() ); } @Override diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/operations/EntityReadOperations.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/operations/EntityReadOperations.java index ba20ad34669fd..87cc27cb4b27f 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/operations/EntityReadOperations.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/operations/EntityReadOperations.java @@ -32,7 +32,7 @@ import org.neo4j.kernel.api.schema.IndexQuery; import org.neo4j.kernel.api.schema.index.IndexDescriptor; import org.neo4j.kernel.impl.api.KernelStatement; -import org.neo4j.storageengine.api.BatchingLongProgression; +import org.neo4j.kernel.impl.api.RelationshipVisitor; import org.neo4j.storageengine.api.Direction; import org.neo4j.storageengine.api.NodeItem; import org.neo4j.storageengine.api.PropertyItem; @@ -83,10 +83,6 @@ long nodesCountIndexed( KernelStatement statement, IndexDescriptor index, long n PrimitiveLongIterator relationshipsGetAll( KernelStatement state ); - BatchingLongProgression parallelNodeScanProgression( KernelStatement statement ); - - Cursor nodeGetCursor( KernelStatement statement, BatchingLongProgression progression ); - Cursor nodeGetAllCursor( KernelStatement statement ); Cursor nodeCursorById( KernelStatement statement, long nodeId ) throws EntityNotFoundException; diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/store/AllNodeScan.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/store/AllNodeProgression.java similarity index 91% rename from community/kernel/src/main/java/org/neo4j/kernel/impl/api/store/AllNodeScan.java rename to community/kernel/src/main/java/org/neo4j/kernel/impl/api/store/AllNodeProgression.java index d143cb0d9e619..3b023e67830ec 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/store/AllNodeScan.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/store/AllNodeProgression.java @@ -20,16 +20,15 @@ package org.neo4j.kernel.impl.api.store; import org.neo4j.kernel.impl.store.NodeStore; -import org.neo4j.storageengine.api.BatchingLongProgression; -public class AllNodeScan implements BatchingLongProgression +public class AllNodeProgression implements BatchingLongProgression { private final NodeStore nodeStore; private long start; private boolean done; - AllNodeScan( NodeStore nodeStore ) + AllNodeProgression( NodeStore nodeStore ) { this.nodeStore = nodeStore; this.start = nodeStore.getNumberOfReservedLowIds(); diff --git a/community/kernel/src/main/java/org/neo4j/storageengine/api/BatchingLongProgression.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/store/BatchingLongProgression.java similarity index 91% rename from community/kernel/src/main/java/org/neo4j/storageengine/api/BatchingLongProgression.java rename to community/kernel/src/main/java/org/neo4j/kernel/impl/api/store/BatchingLongProgression.java index f14593fb77b19..cfe014f7caa27 100644 --- a/community/kernel/src/main/java/org/neo4j/storageengine/api/BatchingLongProgression.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/store/BatchingLongProgression.java @@ -17,9 +17,7 @@ * You should have received a copy of the GNU General Public License * along with this program. If not, see . */ -package org.neo4j.storageengine.api; - -import org.neo4j.kernel.impl.api.store.Batch; +package org.neo4j.kernel.impl.api.store; public interface BatchingLongProgression { diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/store/CommunityBatchingProgressionFactory.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/store/CommunityBatchingProgressionFactory.java deleted file mode 100644 index 755b92ab99595..0000000000000 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/store/CommunityBatchingProgressionFactory.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright (c) 2002-2017 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ -package org.neo4j.kernel.impl.api.store; - -import org.neo4j.kernel.impl.store.NodeStore; -import org.neo4j.storageengine.api.BatchingLongProgression; -import org.neo4j.storageengine.api.BatchingProgressionFactory; - -public class CommunityBatchingProgressionFactory implements BatchingProgressionFactory -{ - @Override - public BatchingLongProgression singleNodeFetch( long nodeId ) - { - return new SingleNodeFetch( nodeId ); - } - - @Override - public BatchingLongProgression allNodeScan( NodeStore nodeStore ) - { - return new AllNodeScan( nodeStore ); - } - - @Override - public BatchingLongProgression parallelAllNodeScan( NodeStore nodeStore ) - { - throw new UnsupportedOperationException( "Parallel All Node Scan is not available in Neo4j Community Edition" ); - } -} diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/store/NodeCursor.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/store/NodeCursor.java index f2a41e04077cf..3122b8cca9ce0 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/store/NodeCursor.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/store/NodeCursor.java @@ -33,7 +33,6 @@ import org.neo4j.kernel.impl.store.record.NodeRecord; import org.neo4j.kernel.impl.store.record.Record; import org.neo4j.kernel.impl.util.IoPrimitiveUtils; -import org.neo4j.storageengine.api.BatchingLongProgression; import org.neo4j.storageengine.api.NodeItem; import org.neo4j.storageengine.api.txstate.NodeState; import org.neo4j.storageengine.api.txstate.NodeTransactionStateView; diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/store/SingleNodeFetch.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/store/SingleNodeProgression.java similarity index 89% rename from community/kernel/src/main/java/org/neo4j/kernel/impl/api/store/SingleNodeFetch.java rename to community/kernel/src/main/java/org/neo4j/kernel/impl/api/store/SingleNodeProgression.java index 092ad8d320de2..f32b94aa4746f 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/store/SingleNodeFetch.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/store/SingleNodeProgression.java @@ -19,15 +19,13 @@ */ package org.neo4j.kernel.impl.api.store; -import org.neo4j.storageengine.api.BatchingLongProgression; - import static org.neo4j.kernel.api.StatementConstants.NO_SUCH_NODE; -public class SingleNodeFetch implements BatchingLongProgression +public class SingleNodeProgression implements BatchingLongProgression { private long nodeId; - public SingleNodeFetch( long nodeId ) + public SingleNodeProgression( long nodeId ) { this.nodeId = nodeId; } diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/store/StorageLayer.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/store/StorageLayer.java index 101acb42af074..a663b1fcf4fa7 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/store/StorageLayer.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/store/StorageLayer.java @@ -61,8 +61,6 @@ import org.neo4j.kernel.impl.transaction.state.PropertyLoader; import org.neo4j.register.Register; import org.neo4j.register.Register.DoubleLongRegister; -import org.neo4j.storageengine.api.BatchingLongProgression; -import org.neo4j.storageengine.api.BatchingProgressionFactory; import org.neo4j.storageengine.api.Direction; import org.neo4j.storageengine.api.NodeItem; import org.neo4j.storageengine.api.PropertyItem; @@ -103,12 +101,10 @@ public class StorageLayer implements StoreReadLayer private final PropertyLoader propertyLoader; private final Supplier statementProvider; private final SchemaCache schemaCache; - private final BatchingProgressionFactory progressionFactory; public StorageLayer( PropertyKeyTokenHolder propertyKeyTokenHolder, LabelTokenHolder labelTokenHolder, RelationshipTypeTokenHolder relationshipTokenHolder, SchemaStorage schemaStorage, NeoStores neoStores, - IndexingService indexService, Supplier storeStatementSupplier, SchemaCache schemaCache, - BatchingProgressionFactory progressionFactory ) + IndexingService indexService, Supplier storeStatementSupplier, SchemaCache schemaCache ) { this.relationshipTokenHolder = relationshipTokenHolder; this.schemaStorage = schemaStorage; @@ -121,7 +117,6 @@ public StorageLayer( PropertyKeyTokenHolder propertyKeyTokenHolder, LabelTokenHo this.counts = neoStores.getCounts(); this.propertyLoader = new PropertyLoader( neoStores ); this.schemaCache = schemaCache; - this.progressionFactory = progressionFactory; } @Override @@ -399,30 +394,16 @@ public RelationshipIterator relationshipsGetAll() return new AllRelationshipIterator( relationshipStore ); } - @Override - public BatchingLongProgression parallelNodeScanProgression( StorageStatement statement ) - { - return progressionFactory.parallelAllNodeScan( nodeStore ); - } - - @Override - public Cursor nodeGetCursor( StorageStatement statement, BatchingLongProgression progression, - NodeTransactionStateView stateView ) - { - return statement.acquireNewNodeCursor( progression, stateView ); - } - @Override public Cursor nodeGetAllCursor( StorageStatement statement, NodeTransactionStateView stateView ) { - return statement.acquireNodeCursor( progressionFactory.allNodeScan( nodeStore ), stateView ); + return statement.acquireNodeCursor( new AllNodeProgression( nodeStore ), stateView ); } @Override - public Cursor nodeGetSingleCursor( StorageStatement statement, long nodeId, - NodeTransactionStateView stateView ) + public Cursor nodeCursor( StorageStatement statement, long nodeId, NodeTransactionStateView stateView ) { - return statement.acquireNodeCursor( progressionFactory.singleNodeFetch( nodeId ), stateView ); + return statement.acquireNodeCursor( new SingleNodeProgression( nodeId ), stateView ); } @Override diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/store/StoreStatement.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/store/StoreStatement.java index e64bcd4ff573e..1f69c762b47f8 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/api/store/StoreStatement.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/api/store/StoreStatement.java @@ -29,7 +29,6 @@ import org.neo4j.kernel.impl.locking.LockService; import org.neo4j.kernel.impl.store.NeoStores; import org.neo4j.kernel.impl.util.InstanceCache; -import org.neo4j.storageengine.api.BatchingLongProgression; import org.neo4j.storageengine.api.Direction; import org.neo4j.storageengine.api.NodeItem; import org.neo4j.storageengine.api.PropertyItem; @@ -51,7 +50,7 @@ */ public class StoreStatement implements StorageStatement { - private final InstanceCache nodeCursor; + protected final InstanceCache nodeCursor; private final InstanceCache singleRelationshipCursor; private final InstanceCache iteratorRelationshipCursor; private final InstanceCache nodeRelationshipsCursor; @@ -62,7 +61,6 @@ public class StoreStatement implements StorageStatement private final NeoStores neoStores; private final Supplier indexReaderFactorySupplier; private final Supplier labelScanStore; - private final LockService lockService; private IndexReaderFactory indexReaderFactory; private LabelScanReader labelScanReader; @@ -76,7 +74,6 @@ public StoreStatement( NeoStores neoStores, Supplier indexRe this.neoStores = neoStores; this.indexReaderFactorySupplier = indexReaderFactory; this.labelScanStore = labelScanReaderSupplier; - this.lockService = lockService; nodeCursor = new InstanceCache() { @@ -155,11 +152,15 @@ public void acquire() } @Override - public Cursor acquireNewNodeCursor( BatchingLongProgression progression, - NodeTransactionStateView stateView ) + public BatchingLongProgression parallelNodeScanProgression() { - return new NodeCursor( neoStores.getNodeStore(), InstanceCache::noCache, lockService ) - .init( progression, stateView ); + throw unsupportedOperation(); + } + + private UnsupportedOperationException unsupportedOperation() + { + return new UnsupportedOperationException( "This operation is not supported in community edition but only in " + + "enterprise edition" ); } @Override diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/coreapi/TxStateTransactionDataSnapshot.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/coreapi/TxStateTransactionDataSnapshot.java index 6c99340d7485c..df52a59efd464 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/coreapi/TxStateTransactionDataSnapshot.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/coreapi/TxStateTransactionDataSnapshot.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.Iterator; import java.util.Map; @@ -38,6 +39,7 @@ import org.neo4j.kernel.api.exceptions.LabelNotFoundKernelException; import org.neo4j.kernel.api.exceptions.PropertyKeyIdNotFoundKernelException; import org.neo4j.kernel.api.properties.DefinedProperty; +import org.neo4j.kernel.impl.api.KernelTransactionImplementation; import org.neo4j.kernel.impl.core.NodeProxy; import org.neo4j.kernel.impl.core.RelationshipProxy; import org.neo4j.kernel.impl.core.RelationshipProxy.RelationshipActions; @@ -194,7 +196,7 @@ private void takeSnapshot() { for ( long nodeId : state.addedAndRemovedNodes().getRemoved() ) { - try ( Cursor node = store.nodeGetSingleCursor( storeStatement, nodeId, EMPTY ) ) + try ( Cursor node = store.nodeCursor( storeStatement, nodeId, EMPTY ) ) { if ( node.next() ) { @@ -355,7 +357,7 @@ private Object committedValue( NodeState nodeState, int property ) return null; } - try ( Cursor node = store.nodeGetSingleCursor( storeStatement, nodeState.getId(), EMPTY ) ) + try ( Cursor node = store.nodeCursor( storeStatement, nodeState.getId(), EMPTY ) ) { if ( !node.next() ) { diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/factory/CommunityEditionModule.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/factory/CommunityEditionModule.java index fced947d22a4e..d30c3b8658472 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/factory/CommunityEditionModule.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/factory/CommunityEditionModule.java @@ -36,7 +36,6 @@ import org.neo4j.kernel.api.security.UserManagerSupplier; import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.impl.api.SchemaWriteGuard; -import org.neo4j.kernel.impl.api.store.CommunityBatchingProgressionFactory; import org.neo4j.kernel.impl.api.store.StoreStatement; import org.neo4j.kernel.impl.constraints.ConstraintSemantics; import org.neo4j.kernel.impl.constraints.StandardConstraintSemantics; @@ -57,6 +56,7 @@ import org.neo4j.kernel.impl.locking.community.CommunityLockManger; import org.neo4j.kernel.impl.logging.LogService; import org.neo4j.kernel.impl.proc.Procedures; +import org.neo4j.kernel.impl.storageengine.impl.recordstorage.StorageStatementFactory; import org.neo4j.kernel.impl.store.id.DefaultIdGeneratorFactory; import org.neo4j.kernel.impl.store.id.IdGeneratorFactory; import org.neo4j.kernel.impl.store.id.IdReuseEligibility; @@ -70,7 +70,6 @@ import org.neo4j.kernel.internal.KernelData; import org.neo4j.kernel.lifecycle.LifeSupport; import org.neo4j.kernel.lifecycle.LifecycleStatus; -import org.neo4j.storageengine.api.BatchingProgressionFactory; import org.neo4j.udc.UsageData; /** @@ -128,9 +127,7 @@ public CommunityEditionModule( PlatformModule platformModule ) constraintSemantics = createSchemaRuleVerifier(); - storageStatementFactory = StoreStatement::new; - - progressionFactory = platformModule.dependencies.satisfyDependency( createProgressionFactory() ); + storageStatementFactory = createStorageStatementFactory(); coreAPIAvailabilityGuard = new CoreAPIAvailabilityGuard( platformModule.availabilityGuard, transactionStartTimeout ); @@ -145,11 +142,6 @@ public CommunityEditionModule( PlatformModule platformModule ) dependencies.satisfyDependency( createSessionTracker() ); } - protected BatchingProgressionFactory createProgressionFactory() - { - return new CommunityBatchingProgressionFactory(); - } - static Predicate fileWatcherFileNameFilter() { return Predicates.any( @@ -168,6 +160,11 @@ protected ConstraintSemantics createSchemaRuleVerifier() return new StandardConstraintSemantics(); } + protected StorageStatementFactory createStorageStatementFactory() + { + return StoreStatement::new; + } + protected StatementLocksFactory createStatementLocksFactory( Locks locks, Config config, LogService logService ) { return new SimpleStatementLocksFactory( locks ); diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/factory/DataSourceModule.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/factory/DataSourceModule.java index 0dceaaac7bb05..9d21d0d62bbd3 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/factory/DataSourceModule.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/factory/DataSourceModule.java @@ -221,7 +221,6 @@ public DataSourceModule( final PlatformModule platformModule, EditionModule edit platformModule.availabilityGuard, platformModule.clock, editionModule.accessCapability, platformModule.storeCopyCheckPointMutex, - editionModule.progressionFactory, platformModule.recoveryCleanupWorkCollector ) ); dataSourceManager.register( neoStoreDataSource ); diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/factory/EditionModule.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/factory/EditionModule.java index 52211ed560927..18617005816cb 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/factory/EditionModule.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/factory/EditionModule.java @@ -58,7 +58,6 @@ import org.neo4j.kernel.internal.KernelDiagnostics; import org.neo4j.kernel.lifecycle.LifeSupport; import org.neo4j.logging.Log; -import org.neo4j.storageengine.api.BatchingProgressionFactory; import org.neo4j.udc.UsageData; import org.neo4j.udc.UsageDataKeys; @@ -70,8 +69,18 @@ */ public abstract class EditionModule { - public IdGeneratorFactory idGeneratorFactory; + void registerProcedures( Procedures procedures ) throws KernelException + { + procedures.registerProcedure( org.neo4j.kernel.builtinprocs.BuiltInProcedures.class ); + procedures.registerProcedure( org.neo4j.kernel.builtinprocs.TokenProcedures.class ); + procedures.registerProcedure( org.neo4j.kernel.builtinprocs.BuiltInDbmsProcedures.class ); + registerEditionSpecificProcedures( procedures ); + } + + protected abstract void registerEditionSpecificProcedures( Procedures procedures ) throws KernelException; + + public IdGeneratorFactory idGeneratorFactory; public IdTypeConfigurationProvider idTypeConfigurationProvider; public LabelTokenHolder labelTokenHolder; @@ -106,8 +115,6 @@ public abstract class EditionModule public FileSystemWatcherService watcherService; - public BatchingProgressionFactory progressionFactory; - protected FileSystemWatcherService createFileSystemWatcherService( FileSystemAbstraction fileSystem, File storeDir, LogService logging, JobScheduler jobScheduler, Predicate fileNameFilter ) { @@ -221,15 +228,4 @@ protected BoltConnectionTracker createSessionTracker() { return BoltConnectionTracker.NOOP; } - - void registerProcedures( Procedures procedures ) throws KernelException - { - procedures.registerProcedure( org.neo4j.kernel.builtinprocs.BuiltInProcedures.class ); - procedures.registerProcedure( org.neo4j.kernel.builtinprocs.TokenProcedures.class ); - procedures.registerProcedure( org.neo4j.kernel.builtinprocs.BuiltInDbmsProcedures.class ); - - registerEditionSpecificProcedures( procedures ); - } - - protected abstract void registerEditionSpecificProcedures( Procedures procedures ) throws KernelException; } diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/storageengine/impl/recordstorage/RecordStorageEngine.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/storageengine/impl/recordstorage/RecordStorageEngine.java index c4bb7bd8066a9..28d96670916d0 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/storageengine/impl/recordstorage/RecordStorageEngine.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/storageengine/impl/recordstorage/RecordStorageEngine.java @@ -111,7 +111,6 @@ import org.neo4j.logging.LogProvider; import org.neo4j.storageengine.api.CommandReaderFactory; import org.neo4j.storageengine.api.CommandsToApply; -import org.neo4j.storageengine.api.BatchingProgressionFactory; import org.neo4j.storageengine.api.StorageCommand; import org.neo4j.storageengine.api.StorageEngine; import org.neo4j.storageengine.api.StorageStatement; @@ -195,8 +194,7 @@ public RecordStorageEngine( LegacyIndexProviderLookup legacyIndexProviderLookup, IndexConfigStore indexConfigStore, IdOrderingQueue legacyIndexTransactionOrdering, - Supplier transactionsSnapshotSupplier, - BatchingProgressionFactory progressionFactory ) + Supplier transactionsSnapshotSupplier ) { this.propertyKeyTokenHolder = propertyKeyTokenHolder; this.relationshipTypeTokenHolder = relationshipTypeTokens; @@ -239,7 +237,7 @@ public RecordStorageEngine( storeLayer = new StorageLayer( propertyKeyTokenHolder, labelTokens, relationshipTypeTokens, schemaStorage, neoStores, indexingService, - storeStatementSupplier, schemaCache, progressionFactory ); + storeStatementSupplier, schemaCache ); legacyIndexApplierLookup = new LegacyIndexApplierLookup.Direct( legacyIndexProviderLookup ); diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/util/InstanceCache.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/util/InstanceCache.java index 9c9cb2655ee82..7981b0bc197aa 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/util/InstanceCache.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/util/InstanceCache.java @@ -76,9 +76,4 @@ public void close() instance.dispose(); } } - - public static void noCache( T instance ) - { - - } } diff --git a/community/kernel/src/main/java/org/neo4j/storageengine/api/BatchingProgressionFactory.java b/community/kernel/src/main/java/org/neo4j/storageengine/api/BatchingProgressionFactory.java deleted file mode 100644 index d30019c098825..0000000000000 --- a/community/kernel/src/main/java/org/neo4j/storageengine/api/BatchingProgressionFactory.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Copyright (c) 2002-2017 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ -package org.neo4j.storageengine.api; - -import org.neo4j.kernel.impl.store.NodeStore; - -public interface BatchingProgressionFactory -{ - BatchingLongProgression singleNodeFetch( long nodeId ); - - BatchingLongProgression allNodeScan( NodeStore nodeStore ); - - BatchingLongProgression parallelAllNodeScan( NodeStore nodeStore ); -} diff --git a/community/kernel/src/main/java/org/neo4j/storageengine/api/StorageStatement.java b/community/kernel/src/main/java/org/neo4j/storageengine/api/StorageStatement.java index 24c3d639ab7c0..4159cc07feed0 100644 --- a/community/kernel/src/main/java/org/neo4j/storageengine/api/StorageStatement.java +++ b/community/kernel/src/main/java/org/neo4j/storageengine/api/StorageStatement.java @@ -23,6 +23,7 @@ import org.neo4j.kernel.api.exceptions.index.IndexNotFoundKernelException; import org.neo4j.kernel.api.schema.index.IndexDescriptor; import org.neo4j.kernel.impl.api.store.NodeDegreeCounter; +import org.neo4j.kernel.impl.api.store.BatchingLongProgression; import org.neo4j.kernel.impl.locking.Lock; import org.neo4j.storageengine.api.schema.IndexReader; import org.neo4j.storageengine.api.schema.LabelScanReader; @@ -66,8 +67,7 @@ public interface StorageStatement extends AutoCloseable @Override void close(); - // FIXME: this is a temporary workaround until we have a way to cache cursors thread safely in the transaction - Cursor acquireNewNodeCursor( BatchingLongProgression progression, NodeTransactionStateView stateView ); + BatchingLongProgression parallelNodeScanProgression(); /** * Acquires {@link Cursor} capable of {@link Cursor#get() serving} {@link NodeItem} for selected nodes. diff --git a/community/kernel/src/main/java/org/neo4j/storageengine/api/StoreReadLayer.java b/community/kernel/src/main/java/org/neo4j/storageengine/api/StoreReadLayer.java index 29a0313dbf528..0624eb889c1f1 100644 --- a/community/kernel/src/main/java/org/neo4j/storageengine/api/StoreReadLayer.java +++ b/community/kernel/src/main/java/org/neo4j/storageengine/api/StoreReadLayer.java @@ -266,14 +266,9 @@ IndexReader indexGetFreshReader( StorageStatement storeStatement, IndexDescripto */ RelationshipIterator relationshipsGetAll(); - BatchingLongProgression parallelNodeScanProgression( StorageStatement statement ); - - Cursor nodeGetCursor( StorageStatement statement, BatchingLongProgression progression, - NodeTransactionStateView stateView ); - Cursor nodeGetAllCursor( StorageStatement storeStatement, NodeTransactionStateView stateView ); - Cursor nodeGetSingleCursor( StorageStatement storeStatement, long nodeId, NodeTransactionStateView stateView ); + Cursor nodeCursor( StorageStatement storeStatement, long nodeId, NodeTransactionStateView stateView ); Cursor relationshipCursor( StorageStatement storeStatement, long relationshipId, ReadableTransactionState state ); @@ -398,4 +393,5 @@ int countDegrees( StorageStatement statement, NodeItem node, Direction direction ReadableTransactionState state ); T getOrCreateSchemaDependantState( Class type, Function factory ); + } diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/state/IndexQueryTransactionStateTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/state/IndexQueryTransactionStateTest.java index 0d6f319b8e41f..9f9b5c330c0a7 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/state/IndexQueryTransactionStateTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/state/IndexQueryTransactionStateTest.java @@ -116,7 +116,7 @@ public void shouldExcludeRemovedNodesFromIndexQuery() throws Exception long nodeId = 2L; when( indexReader.query( withValue ) ).then( answerAsPrimitiveLongIteratorFrom( asList( 1L, nodeId, 3L ) ) ); - when( store.nodeGetSingleCursor( any( StorageStatement.class ), eq( nodeId ), any( ReadableTransactionState.class ) ) ) + when( store.nodeCursor( any( StorageStatement.class ), eq( nodeId ), any( ReadableTransactionState.class ) ) ) .thenReturn( asNodeCursor( nodeId ) ); txContext.nodeDelete( state, nodeId ); @@ -135,7 +135,7 @@ public void shouldExcludeRemovedNodeFromUniqueIndexQuery() throws Exception long nodeId = 1L; when( indexReader.query( withValue ) ).thenReturn( asPrimitiveResourceIterator( nodeId ) ); - when( store.nodeGetSingleCursor( any( StorageStatement.class ), eq( nodeId ), any( ReadableTransactionState.class ) ) ) + when( store.nodeCursor( any( StorageStatement.class ), eq( nodeId ), any( ReadableTransactionState.class ) ) ) .thenReturn( asNodeCursor( nodeId ) ); txContext.nodeDelete( state, nodeId ); @@ -185,7 +185,7 @@ public void shouldIncludeCreatedNodesWithCorrectLabelAndProperty() throws Except long nodeId = 1L; state.writableTxState().nodeDoAddProperty( nodeId, stringProperty( propertyKeyId, value ) ); - when( store.nodeGetSingleCursor( any( StorageStatement.class ), eq( nodeId ), any( ReadableTransactionState.class ) ) ) + when( store.nodeCursor( any( StorageStatement.class ), eq( nodeId ), any( ReadableTransactionState.class ) ) ) .thenReturn( asNodeCursor( nodeId, 40L ) ); mockStoreProperty(); @@ -208,7 +208,7 @@ public void shouldIncludeUniqueCreatedNodeWithCorrectLabelAndProperty() throws E long nodeId = 1L; state.writableTxState().nodeDoAddProperty( nodeId, stringProperty( propertyKeyId, value ) ); - when( store.nodeGetSingleCursor( any( StorageStatement.class ), eq( nodeId ), any( ReadableTransactionState.class ) ) ) + when( store.nodeCursor( any( StorageStatement.class ), eq( nodeId ), any( ReadableTransactionState.class ) ) ) .thenReturn( asNodeCursor( nodeId, 40L ) ); mockStoreProperty(); @@ -230,7 +230,7 @@ public void shouldIncludeExistingNodesWithCorrectPropertyAfterAddingLabel() thro long nodeId = 1L; - when( store.nodeGetSingleCursor( any( StorageStatement.class ), eq( nodeId ), any( ReadableTransactionState.class ) ) ) + when( store.nodeCursor( any( StorageStatement.class ), eq( nodeId ), any( ReadableTransactionState.class ) ) ) .thenReturn( asNodeCursor( nodeId, 40L ) ); mockStoreProperty(); @@ -252,7 +252,7 @@ public void shouldIncludeExistingUniqueNodeWithCorrectPropertyAfterAddingLabel() long nodeId = 2L; - when( store.nodeGetSingleCursor( any( StorageStatement.class ), eq( nodeId ), any( ReadableTransactionState.class ) ) ) + when( store.nodeCursor( any( StorageStatement.class ), eq( nodeId ), any( ReadableTransactionState.class ) ) ) .thenReturn( asNodeCursor( nodeId, 40L ) ); mockStoreProperty(); @@ -273,7 +273,7 @@ public void shouldExcludeExistingNodesWithCorrectPropertyAfterRemovingLabel() th long nodeId = 1L; when( indexReader.query( withValue ) ).then( answerAsPrimitiveLongIteratorFrom( asList( nodeId, 2L, 3L ) ) ); - when( store.nodeGetSingleCursor( any( StorageStatement.class ), eq( nodeId ), any( ReadableTransactionState.class ) ) ) + when( store.nodeCursor( any( StorageStatement.class ), eq( nodeId ), any( ReadableTransactionState.class ) ) ) .thenReturn( asNodeCursor( nodeId, 40L, labels( labelId ) ) ); mockStoreProperty(); @@ -293,7 +293,7 @@ public void shouldExcludeExistingUniqueNodeWithCorrectPropertyAfterRemovingLabel long nodeId = 1L; when( indexReader.query( withValue ) ).thenReturn( asPrimitiveResourceIterator( nodeId ) ); - when( store.nodeGetSingleCursor( any( StorageStatement.class ), eq( nodeId ), any( ReadableTransactionState.class ) ) ) + when( store.nodeCursor( any( StorageStatement.class ), eq( nodeId ), any( ReadableTransactionState.class ) ) ) .thenReturn( asNodeCursor( nodeId, 40L, labels( labelId ) ) ); mockStoreProperty(); @@ -315,7 +315,7 @@ public void shouldExcludeNodesWithRemovedProperty() throws Exception long nodeId = 1L; state.writableTxState().nodeDoAddProperty( nodeId, intProperty( propertyKeyId, 10 ) ); - when( store.nodeGetSingleCursor( any( StorageStatement.class ), eq( nodeId ), any( ReadableTransactionState.class ) ) ) + when( store.nodeCursor( any( StorageStatement.class ), eq( nodeId ), any( ReadableTransactionState.class ) ) ) .thenReturn( asNodeCursor( nodeId, labels( labelId ) ) ); txContext.nodeAddLabel( state, nodeId, labelId ); @@ -334,7 +334,7 @@ public void shouldExcludeUniqueNodeWithRemovedProperty() throws Exception long nodeId = 1L; when( indexReader.query( withValue ) ).thenReturn( asPrimitiveResourceIterator( nodeId ) ); - when( store.nodeGetSingleCursor( any( StorageStatement.class ), eq( nodeId ), any( ReadableTransactionState.class ) ) ) + when( store.nodeCursor( any( StorageStatement.class ), eq( nodeId ), any( ReadableTransactionState.class ) ) ) .thenReturn( asNodeCursor( nodeId, 40, labels( labelId ) ) ); mockStoreProperty(); diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/state/LabelTransactionStateTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/state/LabelTransactionStateTest.java index df0a8e512e01f..d8a6d9fe72f64 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/state/LabelTransactionStateTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/state/LabelTransactionStateTest.java @@ -243,7 +243,7 @@ public void removingNonExistentLabelFromNodeShouldRespondFalse() throws Exceptio public void should_return_true_when_adding_new_label() throws Exception { // GIVEN - when( store.nodeGetSingleCursor( any( StorageStatement.class ), eq( 1337L ), any( ReadableTransactionState.class ) ) ) + when( store.nodeCursor( any( StorageStatement.class ), eq( 1337L ), any( ReadableTransactionState.class ) ) ) .thenReturn( asNodeCursor( 1337L ) ); when( store.nodeGetProperties( any( StorageStatement.class ), any( NodeItem.class ), any( PropertyContainerState.class ) ) ).thenReturn( asPropertyCursor() ); @@ -259,7 +259,7 @@ public void should_return_true_when_adding_new_label() throws Exception public void should_return_false_when_adding_existing_label() throws Exception { // GIVEN - when( store.nodeGetSingleCursor( any( StorageStatement.class ), eq( 1337L ), any( ReadableTransactionState.class ) ) ) + when( store.nodeCursor( any( StorageStatement.class ), eq( 1337L ), any( ReadableTransactionState.class ) ) ) .thenReturn( asNodeCursor( 1337L, StubCursors.labels( 12 ) ) ); when( store.nodeGetProperties( any( StorageStatement.class ), any( NodeItem.class ), any( PropertyContainerState.class ) ) ).thenReturn( asPropertyCursor() ); @@ -275,7 +275,7 @@ public void should_return_false_when_adding_existing_label() throws Exception public void should_return_true_when_removing_existing_label() throws Exception { // GIVEN - when( store.nodeGetSingleCursor( any( StorageStatement.class ), eq( 1337L ), any( ReadableTransactionState.class ) ) ) + when( store.nodeCursor( any( StorageStatement.class ), eq( 1337L ), any( ReadableTransactionState.class ) ) ) .thenReturn( asNodeCursor( 1337L, StubCursors.labels( 12 ) ) ); when( store.nodeGetProperties( any( StorageStatement.class ), any( NodeItem.class ), any( PropertyContainerState.class ) ) ).thenReturn( asPropertyCursor() ); @@ -291,7 +291,7 @@ public void should_return_true_when_removing_existing_label() throws Exception public void should_return_true_when_removing_non_existant_label() throws Exception { // GIVEN - when( store.nodeGetSingleCursor( any( StorageStatement.class ), eq( 1337L ), any( ReadableTransactionState.class ) ) ) + when( store.nodeCursor( any( StorageStatement.class ), eq( 1337L ), any( ReadableTransactionState.class ) ) ) .thenReturn( asNodeCursor( 1337L ) ); // WHEN @@ -334,7 +334,7 @@ private void commitLabels( Labels... labels ) throws Exception Map> allLabels = new HashMap<>(); for ( Labels nodeLabels : labels ) { - when( store.nodeGetSingleCursor( any( StorageStatement.class ), eq( nodeLabels.nodeId ), + when( store.nodeCursor( any( StorageStatement.class ), eq( nodeLabels.nodeId ), any( ReadableTransactionState.class ) ) ) .thenReturn( asNodeCursor( nodeLabels.nodeId, StubCursors.labels( nodeLabels.labelIds ) ) ); when( store.nodeGetProperties( any( StorageStatement.class ), any( NodeItem.class ), diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/state/SchemaTransactionStateTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/state/SchemaTransactionStateTest.java index e845c25c04a72..e3e37e2f5d6ea 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/state/SchemaTransactionStateTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/state/SchemaTransactionStateTest.java @@ -39,7 +39,7 @@ import org.neo4j.kernel.impl.api.StateHandlingStatementOperations; import org.neo4j.kernel.impl.api.StatementOperationsTestHelper; import org.neo4j.kernel.impl.api.legacyindex.InternalAutoIndexing; -import org.neo4j.kernel.impl.api.store.SingleNodeFetch; +import org.neo4j.kernel.impl.api.store.SingleNodeProgression; import org.neo4j.kernel.impl.api.store.StoreStatement; import org.neo4j.kernel.impl.index.LegacyIndexStore; import org.neo4j.storageengine.api.StoreReadLayer; @@ -276,7 +276,7 @@ private void commitLabels( Labels... labels ) throws Exception Map> allLabels = new HashMap<>(); for ( Labels nodeLabels : labels ) { - when( storeStatement.acquireNodeCursor( new SingleNodeFetch( nodeLabels.nodeId ), EMPTY ) ) + when( storeStatement.acquireNodeCursor( new SingleNodeProgression( nodeLabels.nodeId ), EMPTY ) ) .thenReturn( asNodeCursor( nodeLabels.nodeId, StubCursors.labels( nodeLabels.labelIds ) ) ); for ( int label : nodeLabels.labelIds ) diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/state/StateHandlingStatementOperationsTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/state/StateHandlingStatementOperationsTest.java index 8c707d9569b81..e9a7b0f50d3f9 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/state/StateHandlingStatementOperationsTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/state/StateHandlingStatementOperationsTest.java @@ -95,9 +95,8 @@ public class StateHandlingStatementOperationsTest public void shouldNeverDelegateWrites() throws Exception { KernelStatement state = mockedState( new TxState() ); - when( inner.indexesGetForLabel( 0 ) ).thenReturn( iterator( IndexDescriptorFactory.forLabel( 0, 0 ) ) ); - when( inner.nodeGetSingleCursor( any( StorageStatement.class ), anyLong(), any( ReadableTransactionState.class ) ) ) + when( inner.nodeCursor( any( StorageStatement.class ), anyLong(), any( ReadableTransactionState.class ) ) ) .thenReturn( asNodeCursor( 0 ) ); when( inner.nodeGetProperties( any( StorageStatement.class ), any( NodeItem.class ), any( PropertyContainerState.class ) ) ).thenReturn( asPropertyCursor() ); @@ -113,7 +112,7 @@ public void shouldNeverDelegateWrites() throws Exception // one for add and one for remove verify( inner, times( 2 ) ) - .nodeGetSingleCursor( any( StorageStatement.class ), eq( 0L ), any( ReadableTransactionState.class ) ); + .nodeCursor( any( StorageStatement.class ), eq( 0L ), any( ReadableTransactionState.class ) ); } @Test @@ -416,9 +415,10 @@ public void shouldConsiderTransactionStateDuringIndexBetweenRangeSeekByNumberWit IndexQuery.NumberRangePredicate indexQuery = IndexQuery.range( index.schema().getPropertyId(), lower, true, upper, false ); when( indexReader.query( indexQuery ) ).thenReturn( - PrimitiveLongCollections.resourceIterator( PrimitiveLongCollections.iterator( 43L, 44L, 46L ), null ) ); - when( storeReadLayer.nodeGetSingleCursor( any( StorageStatement.class ), anyLong(), - any( ReadableTransactionState.class ) ) ).thenAnswer( invocationOnMock -> + PrimitiveLongCollections.resourceIterator( PrimitiveLongCollections.iterator( 43L, 44L, 46L ), null ) + ); + when( storeReadLayer.nodeCursor( any( StorageStatement.class ), anyLong(), any( ReadableTransactionState.class ) ) ) + .thenAnswer( invocationOnMock -> { long nodeId = (long) invocationOnMock.getArguments()[1]; when( storeReadLayer @@ -501,7 +501,7 @@ public void shouldNotRecordNodeSetPropertyOnSameValue() throws Exception when( kernelStatement.readableTxState() ).thenReturn( ReadableTransactionState.EMPTY ); Cursor ourNode = nodeCursorWithProperty( propertyKeyId ); when( storeReadLayer - .nodeGetSingleCursor( any( StorageStatement.class ), eq( nodeId ), any( ReadableTransactionState.class ) ) ) + .nodeCursor( any( StorageStatement.class ), eq( nodeId ), any( ReadableTransactionState.class ) ) ) .thenReturn( ourNode ); InternalAutoIndexing autoIndexing = mock( InternalAutoIndexing.class ); AutoIndexOperations autoIndexOps = mock( AutoIndexOperations.class ); diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/state/StateOperationsAutoIndexingTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/state/StateOperationsAutoIndexingTest.java index 5fd2f4bc31950..0e1ad33b53e44 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/state/StateOperationsAutoIndexingTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/state/StateOperationsAutoIndexingTest.java @@ -76,7 +76,7 @@ public void shouldSignalNodeRemovedToAutoIndex() throws Exception { // Given when( storeLayer - .nodeGetSingleCursor( any( StorageStatement.class ), eq( 1337L ), any( ReadableTransactionState.class ) ) ) + .nodeCursor( any( StorageStatement.class ), eq( 1337L ), any( ReadableTransactionState.class ) ) ) .thenReturn( cursor( mock( NodeItem.class ) ) ); // When @@ -109,7 +109,7 @@ public void shouldSignalNodePropertyAddedToAutoIndex() throws Exception NodeItem node = mock( NodeItem.class ); when( node.labels() ).thenReturn( PrimitiveIntCollections.emptySet() ); when( storeLayer - .nodeGetSingleCursor( any( StorageStatement.class ), eq( 1337L ), any( ReadableTransactionState.class ) ) ) + .nodeCursor( any( StorageStatement.class ), eq( 1337L ), any( ReadableTransactionState.class ) ) ) .thenReturn( cursor( node ) ); when( storeLayer .nodeGetProperty( any( StorageStatement.class ), any( NodeItem.class ), eq( property.propertyKeyId() ), @@ -156,7 +156,7 @@ public void shouldSignalNodePropertyChangedToAutoIndex() throws Exception NodeItem node = mock( NodeItem.class ); when( node.labels() ).thenReturn( PrimitiveIntCollections.emptySet() ); when( storeLayer - .nodeGetSingleCursor( any( StorageStatement.class ), eq( 1337L ), any( ReadableTransactionState.class ) ) ) + .nodeCursor( any( StorageStatement.class ), eq( 1337L ), any( ReadableTransactionState.class ) ) ) .thenReturn( cursor( node ) ); when( storeLayer .nodeGetProperty( any( StorageStatement.class ), any( NodeItem.class ), eq( property.propertyKeyId() ), @@ -209,7 +209,7 @@ public void shouldSignalNodePropertyRemovedToAutoIndex() throws Exception .thenReturn( cursor( existingProperty ) ); when( node.labels() ).thenReturn( PrimitiveIntCollections.emptySet() ); when( storeLayer - .nodeGetSingleCursor( any( StorageStatement.class ), eq( 1337L ), any( ReadableTransactionState.class ) ) ) + .nodeCursor( any( StorageStatement.class ), eq( 1337L ), any( ReadableTransactionState.class ) ) ) .thenReturn( cursor( node ) ); // When diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/store/AllNodeScanTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/store/AllNodeProgressionTest.java similarity index 91% rename from community/kernel/src/test/java/org/neo4j/kernel/impl/api/store/AllNodeScanTest.java rename to community/kernel/src/test/java/org/neo4j/kernel/impl/api/store/AllNodeProgressionTest.java index 77f095ae32d2f..c9adb466a0c38 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/store/AllNodeScanTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/store/AllNodeProgressionTest.java @@ -29,7 +29,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -public class AllNodeScanTest +public class AllNodeProgressionTest { private final int start = 1; private final long end = 42L; @@ -46,7 +46,7 @@ public class AllNodeScanTest public void shouldReturnABatchFromLowReservedIdsToHighIdPossibleInUse() throws Throwable { // given - AllNodeScan progression = new AllNodeScan( nodeStore ); + AllNodeProgression progression = new AllNodeProgression( nodeStore ); // when boolean hasNext = progression.nextBatch( batch ); @@ -62,7 +62,7 @@ public void shouldReturnABatchFromLowReservedIdsToHighIdPossibleInUse() throws T public void shouldCheckIfTheHighIdHasChangedAndIssueAnExtraBatchWithTheRemainingElements() throws Throwable { // given - AllNodeScan progression = new AllNodeScan( nodeStore ); + AllNodeProgression progression = new AllNodeProgression( nodeStore ); assertTrue( progression.nextBatch( batch ) ); checkBatch( start, end ); @@ -79,7 +79,7 @@ public void shouldCheckIfTheHighIdHasChangedAndIssueAnExtraBatchWithTheRemaining public void shouldNeverReturnNewBatchesIfTheProgressionHasReturnFalseToSignalTermination() throws Throwable { // given - AllNodeScan progression = new AllNodeScan( nodeStore ); + AllNodeProgression progression = new AllNodeProgression( nodeStore ); assertTrue( progression.nextBatch( batch ) ); checkBatch( start, end ); @@ -100,7 +100,7 @@ private void checkBatch( long start, long end ) } } - private void assertNoMoreValueInBatchAndProgression( AllNodeScan progression ) + private void assertNoMoreValueInBatchAndProgression( AllNodeProgression progression ) { assertFalse( batch.hasNext() ); assertFalse( progression.nextBatch( batch ) ); diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/store/NodeCursorTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/store/NodeCursorTest.java index 7d85eb2ec77d8..91ce584006a90 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/store/NodeCursorTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/store/NodeCursorTest.java @@ -41,7 +41,6 @@ import org.neo4j.kernel.impl.store.NodeStore; import org.neo4j.kernel.impl.store.record.NodeRecord; import org.neo4j.kernel.impl.util.IoPrimitiveUtils; -import org.neo4j.storageengine.api.BatchingLongProgression; import org.neo4j.storageengine.api.NodeItem; import org.neo4j.storageengine.api.txstate.NodeTransactionStateView; import org.neo4j.storageengine.api.txstate.ReadableTransactionState; diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/store/SingleNodeFetchTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/store/SingleNodeProgressionTest.java similarity index 92% rename from community/kernel/src/test/java/org/neo4j/kernel/impl/api/store/SingleNodeFetchTest.java rename to community/kernel/src/test/java/org/neo4j/kernel/impl/api/store/SingleNodeProgressionTest.java index 0a962857ac5f4..4a7a847719b57 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/store/SingleNodeFetchTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/store/SingleNodeProgressionTest.java @@ -25,14 +25,14 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -public class SingleNodeFetchTest +public class SingleNodeProgressionTest { @Test public void shouldReturnOnlyTheGivenNodeId() throws Throwable { // given long nodeId = 42L; - SingleNodeFetch progression = new SingleNodeFetch( nodeId ); + SingleNodeProgression progression = new SingleNodeProgression( nodeId ); Batch batch = new Batch(); // when / then diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/store/StorageLayerLabelTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/store/StorageLayerLabelTest.java index 3317fa1ef3a01..42f2b2c75a526 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/store/StorageLayerLabelTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/store/StorageLayerLabelTest.java @@ -58,7 +58,7 @@ public void should_be_able_to_list_labels_for_node() throws Exception } // THEN - disk.newStatement().acquireNodeCursor( new SingleNodeFetch( nodeId ), EMPTY ).forAll( + disk.newStatement().acquireNodeCursor( new SingleNodeProgression( nodeId ), EMPTY ).forAll( node -> assertEquals( PrimitiveIntCollections.asSet( new int[]{labelId1, labelId2} ), node.labels() ) ); } diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/store/StorageLayerNodeAndRelTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/store/StorageLayerNodeAndRelTest.java index 509ea756fce22..8150b585080ea 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/store/StorageLayerNodeAndRelTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/store/StorageLayerNodeAndRelTest.java @@ -26,6 +26,7 @@ import org.neo4j.storageengine.api.NodeItem; import org.neo4j.storageengine.api.RelationshipItem; import org.neo4j.storageengine.api.StorageStatement; +import org.neo4j.storageengine.api.txstate.ReadableTransactionState; import static junit.framework.Assert.assertFalse; import static junit.framework.TestCase.assertTrue; @@ -92,7 +93,7 @@ private boolean nodeExists( long id ) { try ( StorageStatement statement = disk.newStatement() ) { - try ( Cursor node = statement.acquireNodeCursor( new SingleNodeFetch( id ), EMPTY ) ) + try ( Cursor node = statement.acquireNodeCursor( new SingleNodeProgression( id ), EMPTY ) ) { return node.next(); } diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/store/StorageLayerPropertyTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/store/StorageLayerPropertyTest.java index 5ea222797729c..e4814afbc0d80 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/store/StorageLayerPropertyTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/store/StorageLayerPropertyTest.java @@ -98,7 +98,7 @@ public void should_get_all_node_properties() throws Exception long nodeId = createLabeledNode( db, singletonMap( "prop", value ), label1 ).getId(); // when - try ( Cursor node = statement.acquireNodeCursor( new SingleNodeFetch( nodeId ), EMPTY ) ) + try ( Cursor node = statement.acquireNodeCursor( new SingleNodeProgression( nodeId ), EMPTY ) ) { node.next(); diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/store/StorageLayerRelTypesAndDegreeTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/store/StorageLayerRelTypesAndDegreeTest.java index 05b63c7f3efb3..2a352a330963f 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/api/store/StorageLayerRelTypesAndDegreeTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/api/store/StorageLayerRelTypesAndDegreeTest.java @@ -472,7 +472,7 @@ private NodeCursor newCursor( long nodeId ) { NodeCursor cursor = new NodeCursor( resolveNeoStores().getNodeStore(), mock( Consumer.class ), NO_LOCK_SERVICE ); - cursor.init( new SingleNodeFetch( nodeId ), EMPTY ); + cursor.init( new SingleNodeProgression( nodeId ), EMPTY ); assertTrue( cursor.next() ); return cursor; } diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/coreapi/TxStateTransactionDataViewTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/coreapi/TxStateTransactionDataViewTest.java index 11de15779adc6..122ee51781905 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/coreapi/TxStateTransactionDataViewTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/coreapi/TxStateTransactionDataViewTest.java @@ -40,11 +40,11 @@ import org.neo4j.kernel.api.txstate.TransactionState; import org.neo4j.kernel.impl.api.KernelTransactionImplementation; import org.neo4j.kernel.impl.api.state.TxState; +import org.neo4j.kernel.impl.api.store.BatchingLongProgression; import org.neo4j.kernel.impl.api.store.StoreStatement; import org.neo4j.kernel.impl.core.NodeProxy; import org.neo4j.kernel.impl.core.RelationshipProxy; import org.neo4j.kernel.impl.core.ThreadToStatementContextBridge; -import org.neo4j.storageengine.api.BatchingLongProgression; import org.neo4j.storageengine.api.NodeItem; import org.neo4j.storageengine.api.RelationshipItem; import org.neo4j.storageengine.api.StoreReadLayer; @@ -58,6 +58,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.neo4j.helpers.collection.Iterables.single; @@ -106,13 +107,13 @@ public void showsDeletedNodes() throws Exception state.nodeDoDelete( 2L ); NodeItem node1 = asNode( 2L, 20L, labels( 15 ) ); - when( ops.nodeGetSingleCursor( storeStatement, 2L, ReadableTransactionState.EMPTY ) ).thenReturn( cursor( node1 ) ); + when( ops.nodeCursor( storeStatement, 2L, ReadableTransactionState.EMPTY ) ).thenReturn( cursor( node1 ) ); when( ops.nodeGetProperties( storeStatement, node1, NodeState.EMPTY ) ) .thenReturn( asPropertyCursor( stringProperty( 1, "p" ) ) ); NodeItem node2 = asNode( 1L, 21L, labels() ); - when( ops.nodeGetSingleCursor( storeStatement, 1L, ReadableTransactionState.EMPTY ) ).thenReturn( cursor( node2 ) ); + when( ops.nodeCursor( storeStatement, 1L, ReadableTransactionState.EMPTY ) ).thenReturn( cursor( node2 ) ); when( ops.nodeGetProperties( storeStatement, node2, NodeState.EMPTY ) ).thenReturn( asPropertyCursor() ); when( ops.propertyKeyGetName( 1 ) ).thenReturn( "key" ); @@ -171,7 +172,7 @@ public void correctlySaysNodeIsDeleted() throws Exception Node node = mock( Node.class ); when( node.getId() ).thenReturn( 1L ); NodeItem nodeItem = asNode( 1 ); - when( ops.nodeGetSingleCursor( storeStatement, 1, ReadableTransactionState.EMPTY ) ).thenReturn( cursor( nodeItem ) ); + when( ops.nodeCursor( storeStatement, 1, ReadableTransactionState.EMPTY ) ).thenReturn( cursor( nodeItem ) ); when( ops.nodeGetProperties( storeStatement, nodeItem, NodeState.EMPTY ) ).thenReturn( asPropertyCursor() ); // When & Then @@ -206,7 +207,7 @@ public void shouldListAddedNodePropertiesProperties() throws Exception when( ops.propertyKeyGetName( propertyKeyId ) ).thenReturn( "theKey" ); long propertyId = 20L; NodeItem node = asNode( 1L, propertyId, labels() ); - when( ops.nodeGetSingleCursor( storeStatement, 1L, ReadableTransactionState.EMPTY ) ).thenReturn( cursor( node ) ); + when( ops.nodeCursor( storeStatement, 1L, ReadableTransactionState.EMPTY ) ).thenReturn( cursor( node ) ); when( ops.nodeGetProperty( storeStatement, node, propertyKeyId, NodeState.EMPTY ) ) .thenReturn( asPropertyCursor( prevProp ) ); @@ -231,7 +232,7 @@ public void shouldListRemovedNodeProperties() throws Exception when( ops.propertyKeyGetName( propertyKeyId ) ).thenReturn( "theKey" ); long propertyId = 20L; NodeItem node = asNode( 1L, propertyId, labels() ); - when( ops.nodeGetSingleCursor( storeStatement, 1L, ReadableTransactionState.EMPTY ) ).thenReturn( cursor( node ) ); + when( ops.nodeCursor( storeStatement, 1L, ReadableTransactionState.EMPTY ) ).thenReturn( cursor( node ) ); when( ops.nodeGetProperty( storeStatement, node, propertyKeyId, NodeState.EMPTY ) ) .thenReturn( asPropertyCursor( prevProp ) ); diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/store/NeoStoresTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/store/NeoStoresTest.java index 481eb5be6a43f..16ddff86775ad 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/store/NeoStoresTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/store/NeoStoresTest.java @@ -57,7 +57,7 @@ import org.neo4j.kernel.impl.api.KernelStatement; import org.neo4j.kernel.impl.api.KernelTransactionImplementation; import org.neo4j.kernel.impl.api.RelationshipVisitor; -import org.neo4j.kernel.impl.api.store.SingleNodeFetch; +import org.neo4j.kernel.impl.api.store.SingleNodeProgression; import org.neo4j.kernel.impl.core.RelationshipTypeToken; import org.neo4j.kernel.impl.storageengine.impl.recordstorage.RecordStorageEngine; import org.neo4j.kernel.impl.store.MetaDataStore.Position; @@ -302,7 +302,7 @@ private DefinedProperty nodeAddProperty( long nodeId, int key, Object value ) DefinedProperty property = Property.property( key, value ); DefinedProperty oldProperty = NO_SUCH_PROPERTY; try ( StorageStatement statement = storeLayer.newStatement(); - Cursor cursor = statement.acquireNodeCursor( new SingleNodeFetch( nodeId ), EMPTY ) ) + Cursor cursor = statement.acquireNodeCursor( new SingleNodeProgression( nodeId ), EMPTY ) ) { if ( cursor.next() ) { @@ -1032,7 +1032,7 @@ private int validateAndCountRelationships( long node, long rel1, long rel2, int int count = 0; try ( KernelStatement statement = (KernelStatement) tx.acquireStatement(); Cursor nodeCursor = statement.storageStatement() - .acquireNodeCursor( new SingleNodeFetch( node ), EMPTY ) ) + .acquireNodeCursor( new SingleNodeProgression( node ), EMPTY ) ) { nodeCursor.next(); @@ -1117,7 +1117,7 @@ else if ( data.propertyKeyId() == prop3.propertyKeyId() ) try ( KernelStatement statement = (KernelStatement) tx.acquireStatement(); Cursor nodeCursor = statement.storageStatement() - .acquireNodeCursor( new SingleNodeFetch( node ), EMPTY ) ) + .acquireNodeCursor( new SingleNodeProgression( node ), EMPTY ) ) { nodeCursor.next(); NodeItem nodeItem = nodeCursor.get(); @@ -1153,7 +1153,7 @@ private boolean nodeExists( long nodeId ) { try ( StorageStatement statement = storeLayer.newStatement() ) { - try ( Cursor node = statement.acquireNodeCursor( new SingleNodeFetch( nodeId ), EMPTY ) ) + try ( Cursor node = statement.acquireNodeCursor( new SingleNodeProgression( nodeId ), EMPTY ) ) { return node.next(); } @@ -1411,7 +1411,7 @@ private void assertHasRelationships( long node ) { try ( KernelStatement statement = (KernelStatement) tx.acquireStatement(); Cursor nodeCursor = statement.storageStatement() - .acquireNodeCursor( new SingleNodeFetch( node ), EMPTY ) ) + .acquireNodeCursor( new SingleNodeProgression( node ), EMPTY ) ) { nodeCursor.next(); NodeItem nodeItem = nodeCursor.get(); @@ -1535,7 +1535,7 @@ private void deleteRelationships( long nodeId ) throws Exception { try ( KernelStatement statement = (KernelStatement) tx.acquireStatement(); Cursor nodeCursor = statement.storageStatement() - .acquireNodeCursor( new SingleNodeFetch( nodeId ), EMPTY ) ) + .acquireNodeCursor( new SingleNodeProgression( nodeId ), EMPTY ) ) { assertTrue( nodeCursor.next() ); NodeItem nodeItem = nodeCursor.get(); diff --git a/community/kernel/src/test/java/org/neo4j/test/rule/NeoStoreDataSourceRule.java b/community/kernel/src/test/java/org/neo4j/test/rule/NeoStoreDataSourceRule.java index fdf55191ea19d..980afe55f88ea 100644 --- a/community/kernel/src/test/java/org/neo4j/test/rule/NeoStoreDataSourceRule.java +++ b/community/kernel/src/test/java/org/neo4j/test/rule/NeoStoreDataSourceRule.java @@ -39,7 +39,6 @@ import org.neo4j.kernel.impl.api.legacyindex.InternalAutoIndexing; import org.neo4j.kernel.impl.api.scan.LabelScanStoreProvider; import org.neo4j.kernel.impl.api.scan.NativeLabelScanStoreExtension; -import org.neo4j.kernel.impl.api.store.CommunityBatchingProgressionFactory; import org.neo4j.kernel.impl.api.store.StoreStatement; import org.neo4j.kernel.impl.constraints.StandardConstraintSemantics; import org.neo4j.kernel.impl.core.DatabasePanicEventGenerator; @@ -143,7 +142,7 @@ TransactionHeaderInformationFactory.DEFAULT, new StartupStatisticsProvider(), nu new StandardConstraintSemantics(), StoreStatement::new, monitors, new Tracers( "null", NullLog.getInstance(), monitors, jobScheduler ), mock( Procedures.class ), IOLimiter.unlimited(), new AvailabilityGuard( clock, NullLog.getInstance() ), clock, new CanWrite(), - new StoreCopyCheckPointMutex(), new CommunityBatchingProgressionFactory(), + new StoreCopyCheckPointMutex(), GroupingRecoveryCleanupWorkCollector.IMMEDIATE ); return dataSource; diff --git a/community/kernel/src/test/java/org/neo4j/test/rule/RecordStorageEngineRule.java b/community/kernel/src/test/java/org/neo4j/test/rule/RecordStorageEngineRule.java index acb6bdb8f1b1d..d161630fedfe9 100644 --- a/community/kernel/src/test/java/org/neo4j/test/rule/RecordStorageEngineRule.java +++ b/community/kernel/src/test/java/org/neo4j/test/rule/RecordStorageEngineRule.java @@ -35,7 +35,6 @@ import org.neo4j.kernel.impl.api.LegacyIndexProviderLookup; import org.neo4j.kernel.impl.api.index.IndexingService; import org.neo4j.kernel.impl.api.scan.LabelScanStoreProvider; -import org.neo4j.kernel.impl.api.store.CommunityBatchingProgressionFactory; import org.neo4j.kernel.impl.api.store.StoreStatement; import org.neo4j.kernel.impl.constraints.ConstraintSemantics; import org.neo4j.kernel.impl.constraints.StandardConstraintSemantics; @@ -216,7 +215,7 @@ private class ExtendedRecordStorageEngine extends RecordStorageEngine constraintSemantics, storageStatementFactory, scheduler, tokenNameLookup, lockService, indexProvider, indexingServiceMonitor, databaseHealth, labelScanStoreProvider, legacyIndexProviderLookup, indexConfigStore, legacyIndexTransactionOrdering, - transactionsSnapshotSupplier, new CommunityBatchingProgressionFactory() ); + transactionsSnapshotSupplier ); this.transactionApplierTransformer = transactionApplierTransformer; } diff --git a/community/primitive-collections/src/main/java/org/neo4j/collection/pool/LinkedQueuePool.java b/community/primitive-collections/src/main/java/org/neo4j/collection/pool/LinkedQueuePool.java index 50250e8af5211..6fa902cdcce93 100644 --- a/community/primitive-collections/src/main/java/org/neo4j/collection/pool/LinkedQueuePool.java +++ b/community/primitive-collections/src/main/java/org/neo4j/collection/pool/LinkedQueuePool.java @@ -24,7 +24,9 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.LongSupplier; -public abstract class LinkedQueuePool implements Pool +import org.neo4j.function.Factory; + +public class LinkedQueuePool implements Pool { public interface Monitor { @@ -103,34 +105,39 @@ public boolean shouldCheck() } } - private static final int DEFAULT_CHECK_INTERVAL = 60 * 1000; + public static final int DEFAULT_CHECK_INTERVAL = 60 * 1000; private final Queue unused = new ConcurrentLinkedQueue<>(); - private final Monitor monitor; + private final Monitor monitor; private final int minSize; + private final Factory factory; private final CheckStrategy checkStrategy; // Guarded by nothing. Those are estimates, losing some values doesn't matter much - private final AtomicInteger allocated = new AtomicInteger(); - private final AtomicInteger queueSize = new AtomicInteger(); + private final AtomicInteger allocated = new AtomicInteger( 0 ); + private final AtomicInteger queueSize = new AtomicInteger( 0 ); private int currentPeakSize; private int targetSize; - public LinkedQueuePool( int minSize ) + public LinkedQueuePool( int minSize, Factory factory ) { - this( minSize, new CheckStrategy.TimeoutCheckStrategy( DEFAULT_CHECK_INTERVAL ), - new Monitor.Adapter<>() ); + this( minSize, factory, new CheckStrategy.TimeoutCheckStrategy( DEFAULT_CHECK_INTERVAL ), + new Monitor.Adapter() ); } - LinkedQueuePool( int minSize, CheckStrategy strategy, Monitor monitor ) + public LinkedQueuePool( int minSize, Factory factory, CheckStrategy strategy, Monitor monitor ) { this.minSize = minSize; + this.factory = factory; this.currentPeakSize = 0; this.targetSize = minSize; this.checkStrategy = strategy; this.monitor = monitor; } - protected abstract R create(); + protected R create() + { + return factory.newInstance(); + } protected void dispose( R resource ) { diff --git a/community/primitive-collections/src/main/java/org/neo4j/collection/pool/MarshlandPool.java b/community/primitive-collections/src/main/java/org/neo4j/collection/pool/MarshlandPool.java index 8775b8b80fa23..9ba8c8c331cc9 100644 --- a/community/primitive-collections/src/main/java/org/neo4j/collection/pool/MarshlandPool.java +++ b/community/primitive-collections/src/main/java/org/neo4j/collection/pool/MarshlandPool.java @@ -24,6 +24,8 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import org.neo4j.function.Factory; + import static java.util.Collections.newSetFromMap; /** @@ -65,6 +67,11 @@ protected LocalSlot initialValue() newSetFromMap( new ConcurrentHashMap() ); private final ReferenceQueue> objectsFromDeadThreads = new ReferenceQueue<>(); + public MarshlandPool( Factory objectFactory ) + { + this( new LinkedQueuePool<>( 4, objectFactory ) ); + } + public MarshlandPool( Pool delegatePool ) { this.pool = delegatePool; @@ -84,7 +91,6 @@ public T acquire() } // Try the reference queue, containing objects from dead threads - @SuppressWarnings( "unchecked" ) LocalSlotReference slotReference = (LocalSlotReference) objectsFromDeadThreads.poll(); if ( slotReference != null && slotReference.object != null ) { @@ -106,9 +112,10 @@ public void release( T obj ) { localSlot.set( obj ); } + + // Fall back to the delegate pool else { - // Fall back to the delegate pool pool.release( obj ); } } @@ -116,7 +123,6 @@ public void release( T obj ) /** * Dispose of all objects in this pool, releasing them back to the delegate pool */ - @SuppressWarnings( "unchecked" ) public void disposeAll() { for ( LocalSlotReference slotReference : slotReferences ) @@ -145,6 +151,11 @@ public void disposeAll() } } + public void close() + { + disposeAll(); + } + /** * This is used to trigger the GC to notify us whenever the thread local has been garbage collected. */ diff --git a/community/primitive-collections/src/test/java/org/neo4j/collection/pool/LinkedQueuePoolTest.java b/community/primitive-collections/src/test/java/org/neo4j/collection/pool/LinkedQueuePoolTest.java index aef835003b841..caf6e65b6f9ca 100644 --- a/community/primitive-collections/src/test/java/org/neo4j/collection/pool/LinkedQueuePoolTest.java +++ b/community/primitive-collections/src/test/java/org/neo4j/collection/pool/LinkedQueuePoolTest.java @@ -300,15 +300,8 @@ private void buildAPeakOfAcquiredFlyweightsAndTriggerAlarmWithSideEffects( int M private LinkedQueuePool getLinkedQueuePool( StatefulMonitor stateMonitor, FakeClock clock, int minSize ) { - return new LinkedQueuePool( minSize, - new LinkedQueuePool.CheckStrategy.TimeoutCheckStrategy( 100, clock ), stateMonitor ) - { - @Override - protected Object create() - { - return new Object(); - } - }; + return new LinkedQueuePool<>( minSize, Object::new, + new LinkedQueuePool.CheckStrategy.TimeoutCheckStrategy( 100, clock ), stateMonitor ); } private List> acquireFromPool( final LinkedQueuePool pool, int times ) diff --git a/community/primitive-collections/src/test/java/org/neo4j/collection/pool/MarshlandPoolTest.java b/community/primitive-collections/src/test/java/org/neo4j/collection/pool/MarshlandPoolTest.java index a614512fdad73..ca3aedc52d46d 100644 --- a/community/primitive-collections/src/test/java/org/neo4j/collection/pool/MarshlandPoolTest.java +++ b/community/primitive-collections/src/test/java/org/neo4j/collection/pool/MarshlandPoolTest.java @@ -76,7 +76,7 @@ public void shouldReturnToDelegatePoolIfLocalPoolIsFull() throws Exception } @Test - public void shouldReleaseAllSlotsOnDisposeAll() throws Exception + public void shouldReleaseAllSlotsOnClose() throws Exception { // Given Pool delegatePool = mock( Pool.class ); @@ -88,7 +88,7 @@ public void shouldReleaseAllSlotsOnDisposeAll() throws Exception pool.release( first ); // When - pool.disposeAll(); + pool.close(); // Then verify( delegatePool, times( 1 ) ).acquire(); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/EnterpriseCoreEditionModule.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/EnterpriseCoreEditionModule.java index 0220d3efd0407..afb97c4dc99f5 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/EnterpriseCoreEditionModule.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/EnterpriseCoreEditionModule.java @@ -70,8 +70,7 @@ import org.neo4j.kernel.enterprise.builtinprocs.EnterpriseBuiltInDbmsProcedures; import org.neo4j.kernel.impl.api.SchemaWriteGuard; import org.neo4j.kernel.impl.api.TransactionHeaderInformation; -import org.neo4j.kernel.impl.api.store.EnterpriseBatchingProgressionFactory; -import org.neo4j.kernel.impl.api.store.StoreStatement; +import org.neo4j.kernel.impl.api.store.EnterpriseStoreStatement; import org.neo4j.kernel.impl.coreapi.CoreAPIAvailabilityGuard; import org.neo4j.kernel.impl.enterprise.EnterpriseConstraintSemantics; import org.neo4j.kernel.impl.enterprise.EnterpriseEditionModule; @@ -283,9 +282,7 @@ private void editionInvariants( PlatformModule platformModule, Dependencies depe constraintSemantics = new EnterpriseConstraintSemantics(); - storageStatementFactory = StoreStatement::new; - - progressionFactory = dependencies.satisfyDependency( new EnterpriseBatchingProgressionFactory() ); + storageStatementFactory = EnterpriseStoreStatement::new; coreAPIAvailabilityGuard = new CoreAPIAvailabilityGuard( platformModule.availabilityGuard, transactionStartTimeout ); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java index 68fc51e2c8988..c83d933954fb0 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java @@ -63,8 +63,7 @@ import org.neo4j.kernel.impl.api.ReadOnlyTransactionCommitProcess; import org.neo4j.kernel.impl.api.TransactionCommitProcess; import org.neo4j.kernel.impl.api.TransactionRepresentationCommitProcess; -import org.neo4j.kernel.impl.api.store.EnterpriseBatchingProgressionFactory; -import org.neo4j.kernel.impl.api.store.StoreStatement; +import org.neo4j.kernel.impl.api.store.EnterpriseStoreStatement; import org.neo4j.kernel.impl.core.DelegatingLabelTokenHolder; import org.neo4j.kernel.impl.core.DelegatingPropertyKeyTokenHolder; import org.neo4j.kernel.impl.core.DelegatingRelationshipTypeTokenHolder; @@ -166,9 +165,7 @@ public class EnterpriseReadReplicaEditionModule extends EditionModule constraintSemantics = new EnterpriseConstraintSemantics(); - storageStatementFactory = StoreStatement::new; - - progressionFactory = dependencies.satisfyDependency( new EnterpriseBatchingProgressionFactory() ); + storageStatementFactory = EnterpriseStoreStatement::new; coreAPIAvailabilityGuard = new CoreAPIAvailabilityGuard( platformModule.availabilityGuard, transactionStartTimeout ); diff --git a/enterprise/ha/src/main/java/org/neo4j/kernel/ha/factory/HighlyAvailableEditionModule.java b/enterprise/ha/src/main/java/org/neo4j/kernel/ha/factory/HighlyAvailableEditionModule.java index bbc6df8d92497..3e8d17ac65546 100644 --- a/enterprise/ha/src/main/java/org/neo4j/kernel/ha/factory/HighlyAvailableEditionModule.java +++ b/enterprise/ha/src/main/java/org/neo4j/kernel/ha/factory/HighlyAvailableEditionModule.java @@ -128,8 +128,7 @@ import org.neo4j.kernel.impl.api.CommitProcessFactory; import org.neo4j.kernel.impl.api.TransactionCommitProcess; import org.neo4j.kernel.impl.api.TransactionHeaderInformation; -import org.neo4j.kernel.impl.api.store.EnterpriseBatchingProgressionFactory; -import org.neo4j.kernel.impl.api.store.StoreStatement; +import org.neo4j.kernel.impl.api.store.EnterpriseStoreStatement; import org.neo4j.kernel.impl.core.DelegatingLabelTokenHolder; import org.neo4j.kernel.impl.core.DelegatingPropertyKeyTokenHolder; import org.neo4j.kernel.impl.core.DelegatingRelationshipTypeTokenHolder; @@ -525,9 +524,7 @@ public void elected( String role, InstanceId instanceId, URI electedMember ) constraintSemantics = new EnterpriseConstraintSemantics(); - storageStatementFactory = StoreStatement::new; - - progressionFactory = dependencies.satisfyDependency( new EnterpriseBatchingProgressionFactory() ); + storageStatementFactory = EnterpriseStoreStatement::new; coreAPIAvailabilityGuard = new CoreAPIAvailabilityGuard( platformModule.availabilityGuard, transactionStartTimeout ); diff --git a/enterprise/kernel/src/main/java/org/neo4j/kernel/impl/api/store/EnterpriseBatchingProgressionFactory.java b/enterprise/kernel/src/main/java/org/neo4j/kernel/impl/api/store/EnterpriseStoreStatement.java similarity index 53% rename from enterprise/kernel/src/main/java/org/neo4j/kernel/impl/api/store/EnterpriseBatchingProgressionFactory.java rename to enterprise/kernel/src/main/java/org/neo4j/kernel/impl/api/store/EnterpriseStoreStatement.java index 0098eec02e0e8..15bcf6581e748 100644 --- a/enterprise/kernel/src/main/java/org/neo4j/kernel/impl/api/store/EnterpriseBatchingProgressionFactory.java +++ b/enterprise/kernel/src/main/java/org/neo4j/kernel/impl/api/store/EnterpriseStoreStatement.java @@ -19,14 +19,28 @@ */ package org.neo4j.kernel.impl.api.store; +import java.util.function.Supplier; + +import org.neo4j.kernel.impl.api.IndexReaderFactory; +import org.neo4j.kernel.impl.locking.LockService; +import org.neo4j.kernel.impl.store.NeoStores; import org.neo4j.kernel.impl.store.NodeStore; -import org.neo4j.storageengine.api.BatchingLongProgression; +import org.neo4j.storageengine.api.schema.LabelScanReader; -public class EnterpriseBatchingProgressionFactory extends CommunityBatchingProgressionFactory +public class EnterpriseStoreStatement extends StoreStatement { + private final NodeStore nodeStore; + + public EnterpriseStoreStatement( NeoStores neoStores, Supplier indexReaderFactory, + Supplier labelScanReaderSupplier, LockService lockService ) + { + super( neoStores, indexReaderFactory, labelScanReaderSupplier, lockService ); + this.nodeStore = neoStores.getNodeStore(); + } + @Override - public BatchingLongProgression parallelAllNodeScan( NodeStore nodeStore ) + public BatchingLongProgression parallelNodeScanProgression() { - return new ParallelAllNodeScan( nodeStore ); + return new ParallelAllNodeProgression( nodeStore ); } } diff --git a/enterprise/kernel/src/main/java/org/neo4j/kernel/impl/api/store/ParallelAllNodeScan.java b/enterprise/kernel/src/main/java/org/neo4j/kernel/impl/api/store/ParallelAllNodeProgression.java similarity index 94% rename from enterprise/kernel/src/main/java/org/neo4j/kernel/impl/api/store/ParallelAllNodeScan.java rename to enterprise/kernel/src/main/java/org/neo4j/kernel/impl/api/store/ParallelAllNodeProgression.java index 991c998b903de..e33ea44562e8f 100644 --- a/enterprise/kernel/src/main/java/org/neo4j/kernel/impl/api/store/ParallelAllNodeScan.java +++ b/enterprise/kernel/src/main/java/org/neo4j/kernel/impl/api/store/ParallelAllNodeProgression.java @@ -23,9 +23,8 @@ import java.util.concurrent.atomic.AtomicLong; import org.neo4j.kernel.impl.store.NodeStore; -import org.neo4j.storageengine.api.BatchingLongProgression; -class ParallelAllNodeScan implements BatchingLongProgression +class ParallelAllNodeProgression implements BatchingLongProgression { private final NodeStore nodeStore; private final AtomicLong nextPageId; @@ -34,7 +33,7 @@ class ParallelAllNodeScan implements BatchingLongProgression private final AtomicBoolean done = new AtomicBoolean(); private final AtomicBoolean append = new AtomicBoolean( true ); - ParallelAllNodeScan( NodeStore nodeStore ) + ParallelAllNodeProgression( NodeStore nodeStore ) { this.nodeStore = nodeStore; // start from the page containing the first non reserved id diff --git a/enterprise/kernel/src/main/java/org/neo4j/kernel/impl/enterprise/EnterpriseEditionModule.java b/enterprise/kernel/src/main/java/org/neo4j/kernel/impl/enterprise/EnterpriseEditionModule.java index 634975a9022b6..20f92dce5dd0b 100644 --- a/enterprise/kernel/src/main/java/org/neo4j/kernel/impl/enterprise/EnterpriseEditionModule.java +++ b/enterprise/kernel/src/main/java/org/neo4j/kernel/impl/enterprise/EnterpriseEditionModule.java @@ -26,7 +26,7 @@ import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.enterprise.api.security.EnterpriseAuthManager; import org.neo4j.kernel.enterprise.builtinprocs.EnterpriseBuiltInDbmsProcedures; -import org.neo4j.kernel.impl.api.store.EnterpriseBatchingProgressionFactory; +import org.neo4j.kernel.impl.api.store.EnterpriseStoreStatement; import org.neo4j.kernel.impl.constraints.ConstraintSemantics; import org.neo4j.kernel.impl.enterprise.configuration.EnterpriseEditionSettings; import org.neo4j.kernel.impl.enterprise.id.EnterpriseIdTypeConfigurationProvider; @@ -39,9 +39,9 @@ import org.neo4j.kernel.impl.locking.StatementLocksFactory; import org.neo4j.kernel.impl.logging.LogService; import org.neo4j.kernel.impl.proc.Procedures; +import org.neo4j.kernel.impl.storageengine.impl.recordstorage.StorageStatementFactory; import org.neo4j.kernel.impl.store.id.configuration.IdTypeConfigurationProvider; import org.neo4j.kernel.impl.store.stats.IdBasedStoreEntityCounters; -import org.neo4j.storageengine.api.BatchingProgressionFactory; /** * This implementation of {@link EditionModule} creates the implementations of services @@ -76,9 +76,9 @@ protected ConstraintSemantics createSchemaRuleVerifier() } @Override - protected BatchingProgressionFactory createProgressionFactory() + protected StorageStatementFactory createStorageStatementFactory() { - return new EnterpriseBatchingProgressionFactory(); + return EnterpriseStoreStatement::new; } @Override diff --git a/enterprise/kernel/src/main/java/org/neo4j/kernel/impl/enterprise/PropertyExistenceEnforcer.java b/enterprise/kernel/src/main/java/org/neo4j/kernel/impl/enterprise/PropertyExistenceEnforcer.java index e5bb2821e3f3e..ccf3a42f8e73e 100644 --- a/enterprise/kernel/src/main/java/org/neo4j/kernel/impl/enterprise/PropertyExistenceEnforcer.java +++ b/enterprise/kernel/src/main/java/org/neo4j/kernel/impl/enterprise/PropertyExistenceEnforcer.java @@ -38,7 +38,7 @@ import org.neo4j.kernel.api.schema.RelationTypeSchemaDescriptor; import org.neo4j.kernel.api.schema.SchemaProcessor; import org.neo4j.kernel.api.schema.constaints.ConstraintDescriptor; -import org.neo4j.kernel.impl.api.store.SingleNodeFetch; +import org.neo4j.kernel.impl.api.store.SingleNodeProgression; import org.neo4j.kernel.impl.locking.Lock; import org.neo4j.storageengine.api.NodeItem; import org.neo4j.storageengine.api.PropertyItem; @@ -213,7 +213,7 @@ private void validateNode( long nodeId ) throws NodePropertyExistenceException PrimitiveIntSet labelIds; try ( Cursor node = storeStatement() - .acquireNodeCursor( new SingleNodeFetch( nodeId ), txState ) ) + .acquireNodeCursor( new SingleNodeProgression( nodeId ), txState ) ) { if ( node.next() ) { diff --git a/enterprise/kernel/src/main/java/org/neo4j/kernel/impl/enterprise/lock/forseti/ForsetiLockManager.java b/enterprise/kernel/src/main/java/org/neo4j/kernel/impl/enterprise/lock/forseti/ForsetiLockManager.java index cb15fea56b5ae..5728d2c41e775 100644 --- a/enterprise/kernel/src/main/java/org/neo4j/kernel/impl/enterprise/lock/forseti/ForsetiLockManager.java +++ b/enterprise/kernel/src/main/java/org/neo4j/kernel/impl/enterprise/lock/forseti/ForsetiLockManager.java @@ -254,7 +254,7 @@ public void close() private static class ForsetiClientFlyweightPool extends LinkedQueuePool { /** Client id counter **/ - private final AtomicInteger clientIds = new AtomicInteger(); + private final AtomicInteger clientIds = new AtomicInteger( 0 ); /** Re-use ids, forseti uses these in arrays, so we want to keep them low and not loose them. */ // TODO we could use a synchronised SimpleBitSet instead, since we know that we only care about reusing a @@ -270,7 +270,7 @@ private static class ForsetiClientFlyweightPool extends LinkedQueuePool[] lockMaps, WaitStrategy[] waitStrategies ) { - super( 128 ); + super( 128, null ); this.config = config; this.clock = clock; this.lockMaps = lockMaps; diff --git a/enterprise/kernel/src/test/java/org/neo4j/kernel/impl/api/integrationtest/ParallelNodeScanReadOperationsIntegrationTest.java b/enterprise/kernel/src/test/java/org/neo4j/kernel/impl/api/integrationtest/ParallelNodeScanReadOperationsIntegrationTest.java deleted file mode 100644 index 1539313090409..0000000000000 --- a/enterprise/kernel/src/test/java/org/neo4j/kernel/impl/api/integrationtest/ParallelNodeScanReadOperationsIntegrationTest.java +++ /dev/null @@ -1,258 +0,0 @@ -/* - * Copyright (c) 2002-2017 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Neo4j is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -package org.neo4j.kernel.impl.api.integrationtest; - -import org.junit.Rule; -import org.junit.Test; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.function.Predicate; - -import org.neo4j.cursor.Cursor; -import org.neo4j.graphdb.GraphDatabaseService; -import org.neo4j.graphdb.NotFoundException; -import org.neo4j.graphdb.Transaction; -import org.neo4j.helpers.Exceptions; -import org.neo4j.kernel.api.KernelTransaction; -import org.neo4j.kernel.api.ReadOperations; -import org.neo4j.kernel.api.Statement; -import org.neo4j.kernel.impl.core.ThreadToStatementContextBridge; -import org.neo4j.kernel.impl.storageengine.impl.recordstorage.RecordStorageEngine; -import org.neo4j.kernel.impl.store.NodeStore; -import org.neo4j.kernel.internal.GraphDatabaseAPI; -import org.neo4j.storageengine.api.BatchingLongProgression; -import org.neo4j.storageengine.api.NodeItem; -import org.neo4j.test.rule.DatabaseRule; -import org.neo4j.test.rule.EnterpriseDatabaseRule; -import org.neo4j.test.rule.RandomRule; - -import static java.util.Collections.disjoint; -import static java.util.stream.Collectors.toList; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -public class ParallelNodeScanReadOperationsIntegrationTest -{ - @Rule - public DatabaseRule databaseRule = new EnterpriseDatabaseRule(); - - @Rule - public final RandomRule random = new RandomRule(); - - @Test - public void shouldRunParallelNodeScanThroughTheReadOperationsUsingTransactions() throws Throwable - { - GraphDatabaseAPI db = databaseRule.getGraphDatabaseAPI(); - createNodes( db, randomNodes( nodeStore( db ) ) ); - - int threads = random.nextInt( 2, 6 ); - ExecutorService executorService = Executors.newFixedThreadPool( threads ); - - Set singleThreadExecutionResult = new HashSet<>(); - Set parallelExecutionResult = new HashSet<>(); - ThreadToStatementContextBridge bridge = - db.getDependencyResolver().resolveDependency( ThreadToStatementContextBridge.class ); - - try ( Transaction tx = db.beginTx() ) - { - KernelTransaction ktx = bridge.getKernelTransactionBoundToThisThread( true ); - - try ( Statement statement = ktx.acquireStatement() ) - { - statement.readOperations().nodeGetAllCursor().forAll( n -> singleThreadExecutionResult.add( n.id() ) ); - } - - try ( Statement statement = ktx.acquireStatement() ) - { - parallelExecution( statement.readOperations(), executorService, threads, parallelExecutionResult ); - } - - tx.success(); - } - - assertEquals( singleThreadExecutionResult, parallelExecutionResult ); - } - - @Test - public void shouldRunParallelNodeScanWithTxStateChangesThroughTheReadOperationsUsingTransactions() throws Throwable - { - GraphDatabaseAPI db = databaseRule.getGraphDatabaseAPI(); - int nodes = randomNodes( nodeStore( db ) ); - createNodes( db, nodes ); - int threads = random.nextInt( 2, 6 ); - ExecutorService executorService = Executors.newFixedThreadPool( threads ); - - Set singleThreadExecutionResult = new HashSet<>(); - Set parallelExecutionResult = new HashSet<>(); - ThreadToStatementContextBridge bridge = - db.getDependencyResolver().resolveDependency( ThreadToStatementContextBridge.class ); - - try ( Transaction tx = db.beginTx() ) - { - createTemporaryNodesAndDeleteSomeOfTheExistingNodes( db, nodes ); - KernelTransaction ktx = bridge.getKernelTransactionBoundToThisThread( true ); - - try ( Statement statement = ktx.acquireStatement() ) - { - statement.readOperations().nodeGetAllCursor().forAll( n -> singleThreadExecutionResult.add( n.id() ) ); - } - - try ( Statement statement = ktx.acquireStatement() ) - { - parallelExecution( statement.readOperations(), executorService, threads, parallelExecutionResult ); - } - - tx.success(); - } - - assertEquals( message( singleThreadExecutionResult, parallelExecutionResult ), singleThreadExecutionResult, - parallelExecutionResult ); - } - - private void parallelExecution( ReadOperations readOperations, ExecutorService executorService, int threads, - Set parallelExecutionResult ) throws Throwable - { - try - { - BatchingLongProgression progression = readOperations.parallelNodeScan(); - - @SuppressWarnings( "unchecked" ) - Future>[] futures = new Future[threads]; - for ( int i = 0; i < threads; i++ ) - { - futures[i] = executorService.submit( () -> - { - HashSet ids = new HashSet<>(); - try ( Cursor cursor = readOperations.nodeGeCursor( progression ) ) - { - while ( cursor.next() ) - { - long nodeId = cursor.get().id(); - assertTrue( ids.add( nodeId ) ); - } - } - return ids; - } ); - } - - Throwable t = null; - for ( int i = 0; i < threads; i++ ) - { - try - { - Set set = futures[i].get(); - assertTrue( i + "" + message( parallelExecutionResult, set ), - disjoint( parallelExecutionResult, set ) ); - parallelExecutionResult.addAll( set ); - } - catch ( Throwable current ) - { - current.printStackTrace(); - t = Exceptions.chain( t, current ); - } - } - - if ( t != null ) - { - throw t; - } - } - finally - { - executorService.shutdown(); - } - } - - private String message( Set expected, Set actual ) - { - List sortedExpected = expected.stream().sorted().collect( toList() ); - List sortedActual = actual.stream().sorted().collect( toList() ); - - List missingInActual = - expected.stream().filter( ((Predicate) actual::contains).negate() ).sorted().collect( toList() ); - List extraInActual = - actual.stream().filter( ((Predicate) expected::contains).negate() ).sorted().collect( toList() ); - - List intersect = expected.stream().filter( actual::contains ).sorted().collect( toList() ); - - String expectedResult = "Expected: " + sortedExpected; - String actualResult = "Actual: " + sortedActual; - String diff = "Missing: " + missingInActual + "\n" + "Extra: " + extraInActual; - String overlap = "Overlap: " + intersect; - return expectedResult + "\n" + actualResult + "\n" + diff + "\n" + overlap; - } - - private NodeStore nodeStore( GraphDatabaseAPI db ) - { - return db.getDependencyResolver().resolveDependency( RecordStorageEngine.class ) - .testAccessNeoStores().getNodeStore(); - } - - private int randomNodes( NodeStore nodeStore ) - { - int recordsPerPage = nodeStore.getRecordsPerPage(); - int pages = random.nextInt( 40, 1000 ); - int nonAlignedRecords = random.nextInt( 0, recordsPerPage - 1 ); - return pages * recordsPerPage + nonAlignedRecords; - } - - private void createTemporaryNodesAndDeleteSomeOfTheExistingNodes( GraphDatabaseService db, int nodes ) - { - for ( long i = 0; i < 100; i++ ) - { - db.createNode(); - } - - List deleted = new ArrayList<>( 100 ); - for ( int i = 0; i < 100; i++ ) - { - try - { - long id = random.nextLong( 0, nodes ); - db.getNodeById( id ).delete(); - deleted.add( id ); - } - catch ( NotFoundException ex ) - { - // already deleted, it doesn't matter let's proceed - } - } - Collections.sort( deleted ); - } - - private void createNodes( GraphDatabaseAPI db, int nodes ) - { - try ( Transaction tx = db.beginTx() ) - { - for ( int j = 0; j < nodes; j++ ) - { - db.createNode().getId(); - } - tx.success(); - } - } -} diff --git a/enterprise/kernel/src/test/java/org/neo4j/kernel/impl/api/store/StoreParallelNodeScanIntegrationTest.java b/enterprise/kernel/src/test/java/org/neo4j/kernel/impl/api/store/EnterpriseStoreStatementTest.java similarity index 79% rename from enterprise/kernel/src/test/java/org/neo4j/kernel/impl/api/store/StoreParallelNodeScanIntegrationTest.java rename to enterprise/kernel/src/test/java/org/neo4j/kernel/impl/api/store/EnterpriseStoreStatementTest.java index 888bc245d393a..1a52194054fc9 100644 --- a/enterprise/kernel/src/test/java/org/neo4j/kernel/impl/api/store/StoreParallelNodeScanIntegrationTest.java +++ b/enterprise/kernel/src/test/java/org/neo4j/kernel/impl/api/store/EnterpriseStoreStatementTest.java @@ -36,8 +36,6 @@ import org.neo4j.kernel.impl.store.NeoStores; import org.neo4j.kernel.impl.store.NodeStore; import org.neo4j.kernel.internal.GraphDatabaseAPI; -import org.neo4j.storageengine.api.BatchingLongProgression; -import org.neo4j.storageengine.api.BatchingProgressionFactory; import org.neo4j.storageengine.api.NodeItem; import org.neo4j.storageengine.api.txstate.NodeTransactionStateView; import org.neo4j.test.rule.EnterpriseDatabaseRule; @@ -49,7 +47,7 @@ import static org.neo4j.kernel.impl.locking.LockService.NO_LOCK_SERVICE; import static org.neo4j.storageengine.api.txstate.ReadableTransactionState.EMPTY; -public class StoreParallelNodeScanIntegrationTest +public class EnterpriseStoreStatementTest { @Rule public final EnterpriseDatabaseRule databaseRule = new EnterpriseDatabaseRule(); @@ -60,25 +58,23 @@ public class StoreParallelNodeScanIntegrationTest public void parallelScanShouldProvideTheSameResultAsANormalScan() throws Throwable { GraphDatabaseAPI db = databaseRule.getGraphDatabaseAPI(); - BatchingProgressionFactory progressionFactory = - db.getDependencyResolver().resolveDependency( BatchingProgressionFactory.class ); NeoStores neoStores = db.getDependencyResolver().resolveDependency( RecordStorageEngine.class ).testAccessNeoStores(); int nodes = randomNodes( neoStores.getNodeStore() ); createNodes( db, nodes ); - Set expected = singleThreadExecution( neoStores, progressionFactory, EMPTY ); + Set expected = singleThreadExecution( neoStores, EMPTY ); int threads = random.nextInt( 2, 6 ); - ExecutorService executor = Executors.newCachedThreadPool(); + ExecutorService executorService = Executors.newCachedThreadPool(); try { - Set parallelResult = parallelExecution( neoStores, executor, threads, progressionFactory, EMPTY ); + Set parallelResult = parallelExecution( neoStores, executorService, threads, EMPTY ); assertEquals( expected, parallelResult ); } finally { - executor.shutdown(); + executorService.shutdown(); } } @@ -87,8 +83,6 @@ public void parallelScanWithTxStateChangesShouldProvideTheSameResultAsANormalSca throws Throwable { GraphDatabaseAPI db = databaseRule.getGraphDatabaseAPI(); - BatchingProgressionFactory progressionFactory = - db.getDependencyResolver().resolveDependency( BatchingProgressionFactory.class ); NeoStores neoStores = db.getDependencyResolver().resolveDependency( RecordStorageEngine.class ).testAccessNeoStores(); int nodes = randomNodes( neoStores.getNodeStore() ); @@ -96,31 +90,31 @@ public void parallelScanWithTxStateChangesShouldProvideTheSameResultAsANormalSca TxState txState = crateTxStateWithRandomAddedAndDeletedNodes( nodes, lastNodeId ); - Set expected = singleThreadExecution( neoStores, progressionFactory, txState ); + Set expected = singleThreadExecution( neoStores, txState ); - ExecutorService executor = Executors.newCachedThreadPool(); + ExecutorService executorService = Executors.newCachedThreadPool(); try { int threads = random.nextInt( 2, 6 ); - Set parallelResult = parallelExecution( neoStores, executor, threads, progressionFactory, txState ); + Set parallelResult = parallelExecution( neoStores, executorService, threads, txState ); assertEquals( expected, parallelResult ); } finally { - executor.shutdown(); + executorService.shutdown(); } } private Set parallelExecution( NeoStores neoStores, ExecutorService executorService, int threads, - BatchingProgressionFactory progressionFactory, NodeTransactionStateView stateView ) throws Throwable + NodeTransactionStateView stateView ) throws Throwable { - StoreStatement[] localStatements = new StoreStatement[threads]; + EnterpriseStoreStatement[] localStatements = new EnterpriseStoreStatement[threads]; for ( int i = 0; i < threads; i++ ) { - localStatements[i] = new StoreStatement( neoStores, null, null, NO_LOCK_SERVICE ); + localStatements[i] = new EnterpriseStoreStatement( neoStores, null, null, NO_LOCK_SERVICE ); } // use any of the local statements to build the shared progression - BatchingLongProgression progression = progressionFactory.parallelAllNodeScan( neoStores.getNodeStore() ); + BatchingLongProgression progression = localStatements[0].parallelNodeScanProgression(); @SuppressWarnings( "unchecked" ) Future>[] futures = new Future[threads]; @@ -167,13 +161,12 @@ private Set parallelExecution( NeoStores neoStores, ExecutorService execut return parallelResult; } - private Set singleThreadExecution( NeoStores neoStores, BatchingProgressionFactory progressionFactory, - NodeTransactionStateView stateView ) + private Set singleThreadExecution( NeoStores neoStores, NodeTransactionStateView stateView ) { Set expected = new HashSet<>(); - StoreStatement statement = new StoreStatement( neoStores, null, null, NO_LOCK_SERVICE ); + EnterpriseStoreStatement statement = new EnterpriseStoreStatement( neoStores, null, null, NO_LOCK_SERVICE ); try ( Cursor cursor = statement - .acquireNodeCursor( progressionFactory.allNodeScan( neoStores.getNodeStore() ), stateView ) ) + .acquireNodeCursor( new AllNodeProgression( neoStores.getNodeStore() ), stateView ) ) { while ( cursor.next() ) { diff --git a/enterprise/kernel/src/test/java/org/neo4j/kernel/impl/api/store/ParallelAllNodeScanTest.java b/enterprise/kernel/src/test/java/org/neo4j/kernel/impl/api/store/ParallelAllNodeProgressionTest.java similarity index 95% rename from enterprise/kernel/src/test/java/org/neo4j/kernel/impl/api/store/ParallelAllNodeScanTest.java rename to enterprise/kernel/src/test/java/org/neo4j/kernel/impl/api/store/ParallelAllNodeProgressionTest.java index d06919d1c3316..f295a7af0adf7 100644 --- a/enterprise/kernel/src/test/java/org/neo4j/kernel/impl/api/store/ParallelAllNodeScanTest.java +++ b/enterprise/kernel/src/test/java/org/neo4j/kernel/impl/api/store/ParallelAllNodeProgressionTest.java @@ -44,7 +44,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -public class ParallelAllNodeScanTest +public class ParallelAllNodeProgressionTest { @Rule public RandomRule random = new RandomRule(); @@ -81,7 +81,7 @@ public void shouldServeDisjointBatchesToDifferentThreads() throws Throwable { // given when( nodeStore.getHighestPossibleIdInUse() ).thenReturn( (long) end ); - ParallelAllNodeScan progression = new ParallelAllNodeScan( nodeStore ); + ParallelAllNodeProgression progression = new ParallelAllNodeProgression( nodeStore ); ExecutorService service = Executors.newFixedThreadPool( threads ); try { @@ -101,7 +101,7 @@ public void shouldConsiderHighIdChanges() throws Throwable { // given when( nodeStore.getHighestPossibleIdInUse() ).thenReturn( (long) end, end + 1L, end + 2L ); - ParallelAllNodeScan progression = new ParallelAllNodeScan( nodeStore ); + ParallelAllNodeProgression progression = new ParallelAllNodeProgression( nodeStore ); ExecutorService service = Executors.newFixedThreadPool( threads ); try { @@ -119,7 +119,7 @@ public void shouldConsiderHighIdChanges() throws Throwable @Test public void onlyOneShouldRetrieveTheAddedNodes() throws Throwable { - ParallelAllNodeScan progression = new ParallelAllNodeScan( nodeStore ); + ParallelAllNodeProgression progression = new ParallelAllNodeProgression( null ); ExecutorService service = Executors.newFixedThreadPool( threads ); try { @@ -157,7 +157,7 @@ private Set mergeResultsAndAssertDisjoint( Future>[] futures ) return mergedResults; } - private Future>[] runInParallel( int threads, ParallelAllNodeScan progression, + private Future>[] runInParallel( int threads, ParallelAllNodeProgression progression, ExecutorService service ) { @SuppressWarnings( "unchecked" )