From a284516dc1366f7e0c6a6aa544772eec1950630a Mon Sep 17 00:00:00 2001 From: MishaDemianenko Date: Wed, 20 Jun 2018 14:50:29 +0200 Subject: [PATCH] Use localDatabase as a source of database dependent components in CC. --- .../catchup/RegularCatchupServerHandler.java | 13 ++------ .../PrepareStoreCopyRequestHandler.java | 8 ++--- .../storecopy/StoreCopyRequestHandler.java | 13 ++++---- .../catchup/tx/TxPullRequestHandler.java | 10 +++--- .../core/server/CoreServerModule.java | 5 +-- .../machines/CoreStateMachinesModule.java | 12 ++++--- .../token/ReplicatedLabelTokenHolder.java | 8 +++-- .../ReplicatedPropertyKeyTokenHolder.java | 8 +++-- ...ReplicatedRelationshipTypeTokenHolder.java | 13 +++++--- .../machines/token/ReplicatedTokenHolder.java | 11 +++---- .../machines/tx/RecoverConsensusLogIndex.java | 10 +++--- .../EnterpriseReadReplicaEditionModule.java | 28 ++++++++-------- .../PrepareStoreCopyRequestHandlerTest.java | 33 ++++++++++--------- .../StoreCopyRequestHandlerTest.java | 17 ++++++---- .../catchup/storecopy/TestCatchupServer.java | 11 +------ .../catchup/tx/TxPullRequestHandlerTest.java | 27 ++++++++++----- .../token/ReplicatedTokenHolderTest.java | 12 +++---- 17 files changed, 123 insertions(+), 116 deletions(-) diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/RegularCatchupServerHandler.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/RegularCatchupServerHandler.java index ce78b77fd1b3..e9bf84491d6a 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/RegularCatchupServerHandler.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/RegularCatchupServerHandler.java @@ -39,8 +39,6 @@ import org.neo4j.causalclustering.identity.StoreId; import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.kernel.NeoStoreDataSource; -import org.neo4j.kernel.impl.transaction.log.LogicalTransactionStore; -import org.neo4j.kernel.impl.transaction.log.TransactionIdStore; import org.neo4j.kernel.impl.transaction.log.checkpoint.CheckPointer; import org.neo4j.kernel.monitoring.Monitors; import org.neo4j.logging.LogProvider; @@ -51,8 +49,6 @@ public class RegularCatchupServerHandler implements CatchupServerHandler private final Monitors monitors; private final LogProvider logProvider; private final Supplier storeIdSupplier; - private final Supplier transactionIdStoreSupplier; - private final Supplier logicalTransactionStoreSupplier; private final Supplier dataSourceSupplier; private final BooleanSupplier dataSourceAvailabilitySupplier; private final FileSystemAbstraction fs; @@ -60,15 +56,12 @@ public class RegularCatchupServerHandler implements CatchupServerHandler private final Supplier checkPointerSupplier; public RegularCatchupServerHandler( Monitors monitors, LogProvider logProvider, Supplier storeIdSupplier, - Supplier transactionIdStoreSupplier, Supplier logicalTransactionStoreSupplier, Supplier dataSourceSupplier, BooleanSupplier dataSourceAvailabilitySupplier, FileSystemAbstraction fs, CoreSnapshotService snapshotService, Supplier checkPointerSupplier ) { this.monitors = monitors; this.logProvider = logProvider; this.storeIdSupplier = storeIdSupplier; - this.transactionIdStoreSupplier = transactionIdStoreSupplier; - this.logicalTransactionStoreSupplier = logicalTransactionStoreSupplier; this.dataSourceSupplier = dataSourceSupplier; this.dataSourceAvailabilitySupplier = dataSourceAvailabilitySupplier; this.fs = fs; @@ -79,8 +72,8 @@ public RegularCatchupServerHandler( Monitors monitors, LogProvider logProvider, @Override public ChannelHandler txPullRequestHandler( CatchupServerProtocol catchupServerProtocol ) { - return new TxPullRequestHandler( catchupServerProtocol, storeIdSupplier, dataSourceAvailabilitySupplier, transactionIdStoreSupplier, - logicalTransactionStoreSupplier, monitors, logProvider ); + return new TxPullRequestHandler( catchupServerProtocol, storeIdSupplier, dataSourceAvailabilitySupplier, dataSourceSupplier, + monitors, logProvider ); } @Override @@ -92,7 +85,7 @@ public ChannelHandler getStoreIdRequestHandler( CatchupServerProtocol catchupSer @Override public ChannelHandler storeListingRequestHandler( CatchupServerProtocol catchupServerProtocol ) { - return new PrepareStoreCopyRequestHandler( catchupServerProtocol, checkPointerSupplier, dataSourceSupplier, new PrepareStoreCopyFilesProvider( fs ) ); + return new PrepareStoreCopyRequestHandler( catchupServerProtocol, dataSourceSupplier, new PrepareStoreCopyFilesProvider( fs ) ); } @Override diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/PrepareStoreCopyRequestHandler.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/PrepareStoreCopyRequestHandler.java index 28604b31139c..8001a75f5eec 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/PrepareStoreCopyRequestHandler.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/PrepareStoreCopyRequestHandler.java @@ -42,16 +42,14 @@ public class PrepareStoreCopyRequestHandler extends SimpleChannelInboundHandler { private final CatchupServerProtocol protocol; - private final Supplier checkPointerSupplier; private final PrepareStoreCopyFilesProvider prepareStoreCopyFilesProvider; private final Supplier dataSourceSupplier; private final StoreFileStreamingProtocol streamingProtocol = new StoreFileStreamingProtocol(); - public PrepareStoreCopyRequestHandler( CatchupServerProtocol catchupServerProtocol, Supplier checkPointerSupplier, - Supplier dataSourceSupplier, PrepareStoreCopyFilesProvider prepareStoreCopyFilesProvider ) + public PrepareStoreCopyRequestHandler( CatchupServerProtocol catchupServerProtocol, Supplier dataSourceSupplier, + PrepareStoreCopyFilesProvider prepareStoreCopyFilesProvider ) { this.protocol = catchupServerProtocol; - this.checkPointerSupplier = checkPointerSupplier; this.prepareStoreCopyFilesProvider = prepareStoreCopyFilesProvider; this.dataSourceSupplier = dataSourceSupplier; } @@ -71,7 +69,7 @@ protected void channelRead0( ChannelHandlerContext channelHandlerContext, Prepar } else { - CheckPointer checkPointer = checkPointerSupplier.get(); + CheckPointer checkPointer = neoStoreDataSource.getDependencyResolver().resolveDependency( CheckPointer.class ); closeablesListener.add( tryCheckpointAndAcquireMutex( checkPointer ) ); PrepareStoreCopyFiles prepareStoreCopyFiles = closeablesListener.add( prepareStoreCopyFilesProvider.prepareStoreCopyFiles( neoStoreDataSource ) ); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StoreCopyRequestHandler.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StoreCopyRequestHandler.java index 22378437cfda..a3cfc97d4197 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StoreCopyRequestHandler.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/storecopy/StoreCopyRequestHandler.java @@ -53,18 +53,16 @@ public abstract class StoreCopyRequestHandler extend { private final CatchupServerProtocol protocol; private final Supplier dataSource; - private final Supplier checkpointerSupplier; private final StoreFileStreamingProtocol storeFileStreamingProtocol; private final FileSystemAbstraction fs; private final Log log; - StoreCopyRequestHandler( CatchupServerProtocol protocol, Supplier dataSource, Supplier checkpointerSupplier, - StoreFileStreamingProtocol storeFileStreamingProtocol, FileSystemAbstraction fs, LogProvider logProvider ) + StoreCopyRequestHandler( CatchupServerProtocol protocol, Supplier dataSource, StoreFileStreamingProtocol storeFileStreamingProtocol, + FileSystemAbstraction fs, LogProvider logProvider ) { this.protocol = protocol; this.dataSource = dataSource; - this.checkpointerSupplier = checkpointerSupplier; this.storeFileStreamingProtocol = storeFileStreamingProtocol; this.fs = fs; this.log = logProvider.getLog( StoreCopyRequestHandler.class ); @@ -82,7 +80,8 @@ protected void channelRead0( ChannelHandlerContext ctx, T request ) throws Excep { responseStatus = StoreCopyFinishedResponse.Status.E_STORE_ID_MISMATCH; } - else if ( !isTransactionWithinReach( request.requiredTransactionId(), checkpointerSupplier.get() ) ) + else if ( !isTransactionWithinReach( request.requiredTransactionId(), + neoStoreDataSource.getDependencyResolver().resolveDependency( CheckPointer.class ) ) ) { responseStatus = StoreCopyFinishedResponse.Status.E_TOO_FAR_BEHIND; } @@ -130,7 +129,7 @@ public static class GetStoreFileRequestHandler extends StoreCopyRequestHandler dataSource, Supplier checkpointerSupplier, StoreFileStreamingProtocol storeFileStreamingProtocol, FileSystemAbstraction fs, LogProvider logProvider ) { - super( protocol, dataSource, checkpointerSupplier, storeFileStreamingProtocol, fs, logProvider ); + super( protocol, dataSource, storeFileStreamingProtocol, fs, logProvider ); } @Override @@ -151,7 +150,7 @@ public GetIndexSnapshotRequestHandler( CatchupServerProtocol protocol, Supplier< Supplier checkpointerSupplier, StoreFileStreamingProtocol storeFileStreamingProtocol, FileSystemAbstraction fs, LogProvider logProvider ) { - super( protocol, dataSource, checkpointerSupplier, storeFileStreamingProtocol, fs, logProvider ); + super( protocol, dataSource, storeFileStreamingProtocol, fs, logProvider ); } @Override diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TxPullRequestHandler.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TxPullRequestHandler.java index 33332186f576..62af7e47fac8 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TxPullRequestHandler.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/catchup/tx/TxPullRequestHandler.java @@ -35,6 +35,8 @@ import org.neo4j.causalclustering.catchup.ResponseMessageType; import org.neo4j.causalclustering.identity.StoreId; import org.neo4j.cursor.IOCursor; +import org.neo4j.graphdb.DependencyResolver; +import org.neo4j.kernel.NeoStoreDataSource; import org.neo4j.kernel.impl.transaction.CommittedTransactionRepresentation; import org.neo4j.kernel.impl.transaction.log.LogicalTransactionStore; import org.neo4j.kernel.impl.transaction.log.NoSuchTransactionException; @@ -61,14 +63,14 @@ public class TxPullRequestHandler extends SimpleChannelInboundHandler storeIdSupplier, - BooleanSupplier databaseAvailable, Supplier transactionIdStoreSupplier, - Supplier logicalTransactionStoreSupplier, Monitors monitors, LogProvider logProvider ) + BooleanSupplier databaseAvailable, Supplier dataSourceSupplier, Monitors monitors, LogProvider logProvider ) { this.protocol = protocol; this.storeIdSupplier = storeIdSupplier; this.databaseAvailable = databaseAvailable; - this.transactionIdStore = transactionIdStoreSupplier.get(); - this.logicalTransactionStore = logicalTransactionStoreSupplier.get(); + DependencyResolver dependencies = dataSourceSupplier.get().getDependencyResolver(); + this.transactionIdStore = dependencies.resolveDependency( TransactionIdStore.class ); + this.logicalTransactionStore = dependencies.resolveDependency( LogicalTransactionStore.class ); this.monitor = monitors.newMonitor( TxPullRequestsMonitor.class ); this.log = logProvider.getLog( getClass() ); } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/server/CoreServerModule.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/server/CoreServerModule.java index a0508429d68d..1ed55785baaa 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/server/CoreServerModule.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/server/CoreServerModule.java @@ -77,8 +77,6 @@ import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.kernel.configuration.Config; import org.neo4j.kernel.impl.logging.LogService; -import org.neo4j.kernel.impl.transaction.log.LogicalTransactionStore; -import org.neo4j.kernel.impl.transaction.log.TransactionIdStore; import org.neo4j.kernel.impl.transaction.log.checkpoint.CheckPointer; import org.neo4j.kernel.impl.util.Dependencies; import org.neo4j.kernel.internal.DatabaseHealth; @@ -182,8 +180,7 @@ commandApplicationProcess, logProvider, new ExponentialBackoffStrategy( 1, 30, S Collection supportedModifierProtocols = supportedProtocolCreator.createSupportedModifierProtocols(); CatchupServerHandler catchupServerHandler = new RegularCatchupServerHandler( platformModule.monitors, - logProvider, localDatabase::storeId, platformModule.dependencies.provideDependency( TransactionIdStore.class ), - platformModule.dependencies.provideDependency( LogicalTransactionStore.class ), localDatabase::dataSource, localDatabase::isAvailable, + logProvider, localDatabase::storeId, localDatabase::dataSource, localDatabase::isAvailable, fileSystem, snapshotService, platformModule.dependencies.provideDependency( CheckPointer.class ) ); catchupServer = new CatchupServerBuilder( catchupServerHandler ) diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/CoreStateMachinesModule.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/CoreStateMachinesModule.java index 06df57dfa834..1d6ff8fb869b 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/CoreStateMachinesModule.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/CoreStateMachinesModule.java @@ -27,6 +27,7 @@ import java.util.HashMap; import java.util.Map; import java.util.function.BooleanSupplier; +import java.util.function.Supplier; import org.neo4j.causalclustering.catchup.storecopy.LocalDatabase; import org.neo4j.causalclustering.core.consensus.LeaderLocator; @@ -72,6 +73,7 @@ import org.neo4j.kernel.impl.util.Dependencies; import org.neo4j.kernel.lifecycle.LifeSupport; import org.neo4j.logging.LogProvider; +import org.neo4j.storageengine.api.StorageEngine; import static org.neo4j.causalclustering.core.CausalClusteringSettings.array_block_id_allocation_size; import static org.neo4j.causalclustering.core.CausalClusteringSettings.id_alloc_state_size; @@ -145,18 +147,20 @@ public CoreStateMachinesModule( MemberId myself, PlatformModule platformModule, dependencies.satisfyDependency( new IdBasedStoreEntityCounters( this.idGeneratorFactory ) ); TokenRegistry relationshipTypeTokenRegistry = new TokenRegistry( "RelationshipType" ); + Supplier storageEngineSupplier = () -> localDatabase.dataSource().getDependencyResolver().resolveDependency( StorageEngine.class ); ReplicatedRelationshipTypeTokenHolder relationshipTypeTokenHolder = new ReplicatedRelationshipTypeTokenHolder( relationshipTypeTokenRegistry, replicator, - this.idGeneratorFactory, dependencies ); + this.idGeneratorFactory, storageEngineSupplier ); TokenRegistry propertyKeyTokenRegistry = new TokenRegistry( "PropertyKey" ); ReplicatedPropertyKeyTokenHolder propertyKeyTokenHolder = new ReplicatedPropertyKeyTokenHolder( propertyKeyTokenRegistry, replicator, this.idGeneratorFactory, - dependencies ); + storageEngineSupplier ); TokenRegistry labelTokenRegistry = new TokenRegistry( "Label" ); ReplicatedLabelTokenHolder labelTokenHolder = - new ReplicatedLabelTokenHolder( labelTokenRegistry, replicator, this.idGeneratorFactory, dependencies ); + new ReplicatedLabelTokenHolder( labelTokenRegistry, replicator, this.idGeneratorFactory, + storageEngineSupplier ); ReplicatedLockTokenStateMachine replicatedLockTokenStateMachine = new ReplicatedLockTokenStateMachine( lockTokenState ); @@ -182,7 +186,7 @@ public CoreStateMachinesModule( MemberId myself, PlatformModule platformModule, lockManager = createLockManager( config, platformModule.clock, logging, replicator, myself, raftMachine, replicatedLockTokenStateMachine ); - RecoverConsensusLogIndex consensusLogIndexRecovery = new RecoverConsensusLogIndex( dependencies, logProvider ); + RecoverConsensusLogIndex consensusLogIndexRecovery = new RecoverConsensusLogIndex( localDatabase, logProvider ); coreStateMachines = new CoreStateMachines( replicatedTxStateMachine, labelTokenStateMachine, relationshipTypeTokenStateMachine, propertyKeyTokenStateMachine, replicatedLockTokenStateMachine, diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/token/ReplicatedLabelTokenHolder.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/token/ReplicatedLabelTokenHolder.java index fd41d80b1587..48c6134e21d6 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/token/ReplicatedLabelTokenHolder.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/token/ReplicatedLabelTokenHolder.java @@ -22,19 +22,21 @@ */ package org.neo4j.causalclustering.core.state.machines.token; +import java.util.function.Supplier; + import org.neo4j.causalclustering.core.replication.Replicator; import org.neo4j.kernel.api.txstate.TransactionState; import org.neo4j.kernel.impl.core.TokenHolder; import org.neo4j.kernel.impl.store.id.IdGeneratorFactory; import org.neo4j.kernel.impl.store.id.IdType; -import org.neo4j.kernel.impl.util.Dependencies; +import org.neo4j.storageengine.api.StorageEngine; public class ReplicatedLabelTokenHolder extends ReplicatedTokenHolder implements TokenHolder { public ReplicatedLabelTokenHolder( TokenRegistry registry, Replicator replicator, - IdGeneratorFactory idGeneratorFactory, Dependencies dependencies ) + IdGeneratorFactory idGeneratorFactory, Supplier storageEngineSupplier ) { - super( registry, replicator, idGeneratorFactory, IdType.LABEL_TOKEN, dependencies, TokenType.LABEL ); + super( registry, replicator, idGeneratorFactory, IdType.LABEL_TOKEN, storageEngineSupplier, TokenType.LABEL ); } @Override diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/token/ReplicatedPropertyKeyTokenHolder.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/token/ReplicatedPropertyKeyTokenHolder.java index c620fd0e54b4..346d565763a3 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/token/ReplicatedPropertyKeyTokenHolder.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/token/ReplicatedPropertyKeyTokenHolder.java @@ -22,19 +22,21 @@ */ package org.neo4j.causalclustering.core.state.machines.token; +import java.util.function.Supplier; + import org.neo4j.causalclustering.core.replication.RaftReplicator; import org.neo4j.kernel.api.txstate.TransactionState; import org.neo4j.kernel.impl.core.TokenHolder; import org.neo4j.kernel.impl.store.id.IdGeneratorFactory; import org.neo4j.kernel.impl.store.id.IdType; -import org.neo4j.kernel.impl.util.Dependencies; +import org.neo4j.storageengine.api.StorageEngine; public class ReplicatedPropertyKeyTokenHolder extends ReplicatedTokenHolder implements TokenHolder { public ReplicatedPropertyKeyTokenHolder( TokenRegistry registry, RaftReplicator replicator, - IdGeneratorFactory idGeneratorFactory, Dependencies dependencies ) + IdGeneratorFactory idGeneratorFactory, Supplier storageEngineSupplier ) { - super( registry, replicator, idGeneratorFactory, IdType.PROPERTY_KEY_TOKEN, dependencies, TokenType.PROPERTY ); + super( registry, replicator, idGeneratorFactory, IdType.PROPERTY_KEY_TOKEN, storageEngineSupplier, TokenType.PROPERTY ); } @Override diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/token/ReplicatedRelationshipTypeTokenHolder.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/token/ReplicatedRelationshipTypeTokenHolder.java index b9bd10709e2e..ad18df5f7dfe 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/token/ReplicatedRelationshipTypeTokenHolder.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/token/ReplicatedRelationshipTypeTokenHolder.java @@ -22,19 +22,22 @@ */ package org.neo4j.causalclustering.core.state.machines.token; +import java.util.function.Supplier; + import org.neo4j.causalclustering.core.replication.RaftReplicator; import org.neo4j.kernel.api.txstate.TransactionState; import org.neo4j.kernel.impl.store.id.IdGeneratorFactory; -import org.neo4j.kernel.impl.store.id.IdType; -import org.neo4j.kernel.impl.util.Dependencies; +import org.neo4j.storageengine.api.StorageEngine; + +import static org.neo4j.causalclustering.core.state.machines.token.TokenType.RELATIONSHIP; +import static org.neo4j.kernel.impl.store.id.IdType.RELATIONSHIP_TYPE_TOKEN; public class ReplicatedRelationshipTypeTokenHolder extends ReplicatedTokenHolder { public ReplicatedRelationshipTypeTokenHolder( TokenRegistry registry, - RaftReplicator replicator, IdGeneratorFactory idGeneratorFactory, Dependencies dependencies ) + RaftReplicator replicator, IdGeneratorFactory idGeneratorFactory, Supplier storageEngineSupplier ) { - super( registry, replicator, idGeneratorFactory, IdType.RELATIONSHIP_TYPE_TOKEN, dependencies, - TokenType.RELATIONSHIP ); + super( registry, replicator, idGeneratorFactory, RELATIONSHIP_TYPE_TOKEN, storageEngineSupplier, RELATIONSHIP ); } @Override diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/token/ReplicatedTokenHolder.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/token/ReplicatedTokenHolder.java index 9a8baf573d9a..82605f1e43f7 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/token/ReplicatedTokenHolder.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/token/ReplicatedTokenHolder.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.function.Supplier; import org.neo4j.causalclustering.core.replication.ReplicationFailureException; import org.neo4j.causalclustering.core.replication.Replicator; @@ -41,7 +42,6 @@ import org.neo4j.kernel.impl.core.TokenNotFoundException; import org.neo4j.kernel.impl.store.id.IdGeneratorFactory; import org.neo4j.kernel.impl.store.id.IdType; -import org.neo4j.kernel.impl.util.Dependencies; import org.neo4j.storageengine.api.StorageCommand; import org.neo4j.storageengine.api.StorageEngine; import org.neo4j.storageengine.api.StorageReader; @@ -51,25 +51,24 @@ abstract class ReplicatedTokenHolder implements TokenHolder { - protected final Dependencies dependencies; - private final Replicator replicator; private final TokenRegistry tokenRegistry; private final IdGeneratorFactory idGeneratorFactory; private final IdType tokenIdType; private final TokenType type; + private final Supplier storageEngineSupplier; // TODO: Clean up all the resolving, which now happens every time with special selection strategies. ReplicatedTokenHolder( TokenRegistry tokenRegistry, Replicator replicator, IdGeneratorFactory idGeneratorFactory, IdType tokenIdType, - Dependencies dependencies, TokenType type ) + Supplier storageEngineSupplier, TokenType type ) { this.replicator = replicator; this.tokenRegistry = tokenRegistry; this.idGeneratorFactory = idGeneratorFactory; this.tokenIdType = tokenIdType; this.type = type; - this.dependencies = dependencies; + this.storageEngineSupplier = storageEngineSupplier; } @Override @@ -126,7 +125,7 @@ private int requestToken( String tokenName ) private byte[] createCommands( String tokenName ) { - StorageEngine storageEngine = dependencies.resolveDependency( StorageEngine.class ); + StorageEngine storageEngine = storageEngineSupplier.get(); Collection commands = new ArrayList<>(); TransactionState txState = new TxState(); int tokenId = Math.toIntExact( idGeneratorFactory.get( tokenIdType ).nextId() ); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/tx/RecoverConsensusLogIndex.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/tx/RecoverConsensusLogIndex.java index 8decc2be63fb..6d1fa6f26aa4 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/tx/RecoverConsensusLogIndex.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/state/machines/tx/RecoverConsensusLogIndex.java @@ -22,9 +22,10 @@ */ package org.neo4j.causalclustering.core.state.machines.tx; +import org.neo4j.causalclustering.catchup.storecopy.LocalDatabase; +import org.neo4j.graphdb.DependencyResolver; import org.neo4j.kernel.impl.transaction.log.LogicalTransactionStore; import org.neo4j.kernel.impl.transaction.log.TransactionIdStore; -import org.neo4j.kernel.impl.util.Dependencies; import org.neo4j.logging.LogProvider; import static org.neo4j.graphdb.DependencyResolver.SelectionStrategy.ONLY; @@ -35,17 +36,18 @@ */ public class RecoverConsensusLogIndex { - private final Dependencies dependencies; + private final LocalDatabase localDatabase; private final LogProvider logProvider; - public RecoverConsensusLogIndex( Dependencies dependencies, LogProvider logProvider ) + public RecoverConsensusLogIndex( LocalDatabase localDatabase, LogProvider logProvider ) { - this.dependencies = dependencies; + this.localDatabase = localDatabase; this.logProvider = logProvider; } public long findLastAppliedIndex() { + DependencyResolver dependencies = localDatabase.dataSource().getDependencyResolver(); TransactionIdStore transactionIdStore = dependencies.resolveDependency( TransactionIdStore.class, ONLY ); LogicalTransactionStore transactionStore = dependencies.resolveDependency( LogicalTransactionStore.class, ONLY ); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java index 88d0a22ee9f7..8d893a95b849 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/readreplica/EnterpriseReadReplicaEditionModule.java @@ -124,7 +124,6 @@ import org.neo4j.kernel.impl.store.id.IdReuseEligibility; import org.neo4j.kernel.impl.store.stats.IdBasedStoreEntityCounters; import org.neo4j.kernel.impl.transaction.TransactionHeaderInformationFactory; -import org.neo4j.kernel.impl.transaction.log.LogicalTransactionStore; import org.neo4j.kernel.impl.transaction.log.TransactionAppender; import org.neo4j.kernel.impl.transaction.log.TransactionIdStore; import org.neo4j.kernel.impl.transaction.log.checkpoint.CheckPointer; @@ -260,27 +259,27 @@ public EnterpriseReadReplicaEditionModule( final PlatformModule platformModule, final Supplier databaseHealthSupplier = dependencies.provideDependency( DatabaseHealth.class ); - Supplier writableCommitProcess = - () -> new TransactionRepresentationCommitProcess( dependencies.resolveDependency( TransactionAppender.class ), - dependencies.resolveDependency( StorageEngine.class ) ); + StoreFiles storeFiles = new StoreFiles( fileSystem, pageCache ); + LogFiles logFiles = buildLocalDatabaseLogFiles( platformModule, fileSystem, storeDir, config ); + + LocalDatabase localDatabase = + new LocalDatabase( platformModule.storeDir, storeFiles, logFiles, platformModule.dataSourceManager, + databaseHealthSupplier, + watcherService, platformModule.availabilityGuard, logProvider ); + + Supplier writableCommitProcess = () -> new TransactionRepresentationCommitProcess( + localDatabase.dataSource().getDependencyResolver().resolveDependency( TransactionAppender.class ), + localDatabase.dataSource().getDependencyResolver().resolveDependency( StorageEngine.class ) ); LifeSupport txPulling = new LifeSupport(); int maxBatchSize = config.get( CausalClusteringSettings.read_replica_transaction_applier_batch_size ); BatchingTxApplier batchingTxApplier = new BatchingTxApplier( - maxBatchSize, dependencies.provideDependency( TransactionIdStore.class ), writableCommitProcess, + maxBatchSize, () -> localDatabase.dataSource().getDependencyResolver().resolveDependency( TransactionIdStore.class ), writableCommitProcess, platformModule.monitors, platformModule.tracers.pageCursorTracerSupplier, platformModule.versionContextSupplier, logProvider ); TimerService timerService = new TimerService( platformModule.jobScheduler, logProvider ); - StoreFiles storeFiles = new StoreFiles( fileSystem, pageCache ); - LogFiles logFiles = buildLocalDatabaseLogFiles( platformModule, fileSystem, storeDir, config ); - - LocalDatabase localDatabase = - new LocalDatabase( platformModule.storeDir, storeFiles, logFiles, platformModule.dataSourceManager, - databaseHealthSupplier, - watcherService, platformModule.availabilityGuard, logProvider ); - ExponentialBackoffStrategy storeCopyBackoffStrategy = new ExponentialBackoffStrategy( 1, config.get( CausalClusteringSettings.store_copy_backoff_max_wait ).toMillis(), TimeUnit.MILLISECONDS ); @@ -319,8 +318,7 @@ public EnterpriseReadReplicaEditionModule( final PlatformModule platformModule, platformModule.logging.getUserLogProvider(), storeCopyProcess, topologyService ) ); RegularCatchupServerHandler catchupServerHandler = new RegularCatchupServerHandler( platformModule.monitors, - logProvider, localDatabase::storeId, platformModule.dependencies.provideDependency( TransactionIdStore.class ), - platformModule.dependencies.provideDependency( LogicalTransactionStore.class ), localDatabase::dataSource, localDatabase::isAvailable, + logProvider, localDatabase::storeId, localDatabase::dataSource, localDatabase::isAvailable, fileSystem, null, platformModule.dependencies.provideDependency( CheckPointer.class ) ); InstalledProtocolHandler installedProtocolHandler = new InstalledProtocolHandler(); // TODO: hook into a procedure diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/PrepareStoreCopyRequestHandlerTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/PrepareStoreCopyRequestHandlerTest.java index 479daac4cc35..c7bb76e419ab 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/PrepareStoreCopyRequestHandlerTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/PrepareStoreCopyRequestHandlerTest.java @@ -41,6 +41,7 @@ import org.neo4j.kernel.NeoStoreDataSource; import org.neo4j.kernel.impl.transaction.log.checkpoint.CheckPointer; import org.neo4j.kernel.impl.transaction.log.checkpoint.StoreCopyCheckPointMutex; +import org.neo4j.kernel.impl.util.Dependencies; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -63,27 +64,15 @@ public class PrepareStoreCopyRequestHandlerTest @Before public void setup() { + Dependencies dependencies = new Dependencies(); + dependencies.satisfyDependency( checkPointer ); StoreCopyCheckPointMutex storeCopyCheckPointMutex = new StoreCopyCheckPointMutex(); when( neoStoreDataSource.getStoreCopyCheckPointMutex() ).thenReturn( storeCopyCheckPointMutex ); + when( neoStoreDataSource.getDependencyResolver() ).thenReturn( dependencies ); PrepareStoreCopyRequestHandler subject = createHandler(); embeddedChannel = new EmbeddedChannel( subject ); } - private PrepareStoreCopyRequestHandler createHandler() - { - catchupServerProtocol = new CatchupServerProtocol(); - catchupServerProtocol.expect( CatchupServerProtocol.State.PREPARE_STORE_COPY ); - Supplier checkPointerSupplier = () -> checkPointer; - Supplier dataSourceSupplier = () -> neoStoreDataSource; - when( neoStoreDataSource.getStoreId() ).thenReturn( new org.neo4j.kernel.impl.store.StoreId( 1, 2, 5, 3, 4 ) ); - - PrepareStoreCopyFilesProvider prepareStoreCopyFilesProvider = mock( PrepareStoreCopyFilesProvider.class ); - when( prepareStoreCopyFilesProvider.prepareStoreCopyFiles( any() ) ).thenReturn( prepareStoreCopyFiles ); - - return new PrepareStoreCopyRequestHandler( catchupServerProtocol, checkPointerSupplier, dataSourceSupplier, - prepareStoreCopyFilesProvider ); - } - @Test public void shouldGiveErrorResponseIfStoreMismatch() { @@ -154,6 +143,20 @@ public void shouldRetainLockWhileStreaming() throws Exception assertEquals( 0, lock.getReadLockCount() ); } + private PrepareStoreCopyRequestHandler createHandler() + { + catchupServerProtocol = new CatchupServerProtocol(); + catchupServerProtocol.expect( CatchupServerProtocol.State.PREPARE_STORE_COPY ); + Supplier dataSourceSupplier = () -> neoStoreDataSource; + when( neoStoreDataSource.getStoreId() ).thenReturn( new org.neo4j.kernel.impl.store.StoreId( 1, 2, 5, 3, 4 ) ); + + PrepareStoreCopyFilesProvider prepareStoreCopyFilesProvider = mock( PrepareStoreCopyFilesProvider.class ); + when( prepareStoreCopyFilesProvider.prepareStoreCopyFiles( any() ) ).thenReturn( prepareStoreCopyFiles ); + + return new PrepareStoreCopyRequestHandler( catchupServerProtocol, dataSourceSupplier, + prepareStoreCopyFilesProvider ); + } + private void configureProvidedStoreCopyFiles( StoreResource[] atomicFiles, File[] files, LongSet indexIds, long lastCommitedTx ) throws IOException { diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/StoreCopyRequestHandlerTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/StoreCopyRequestHandlerTest.java index 7ea98684f029..e36e85a46072 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/StoreCopyRequestHandlerTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/StoreCopyRequestHandlerTest.java @@ -40,6 +40,7 @@ import org.neo4j.kernel.NeoStoreDataSource; import org.neo4j.kernel.impl.transaction.log.checkpoint.CheckPointer; import org.neo4j.kernel.impl.transaction.log.checkpoint.TriggerInfo; +import org.neo4j.kernel.impl.util.Dependencies; import org.neo4j.logging.LogProvider; import org.neo4j.logging.NullLogProvider; import org.neo4j.storageengine.api.StoreFileMetadata; @@ -67,9 +68,12 @@ public void setup() catchupServerProtocol = new CatchupServerProtocol(); catchupServerProtocol.expect( CatchupServerProtocol.State.GET_STORE_FILE ); StoreCopyRequestHandler storeCopyRequestHandler = - new NiceStoreCopyRequestHandler( catchupServerProtocol, () -> neoStoreDataSource, () -> checkPointer, new StoreFileStreamingProtocol(), + new NiceStoreCopyRequestHandler( catchupServerProtocol, () -> neoStoreDataSource, new StoreFileStreamingProtocol(), fileSystemAbstraction, NullLogProvider.getInstance() ); + Dependencies dependencies = new Dependencies(); + dependencies.satisfyDependency( checkPointer ); when( neoStoreDataSource.getStoreId() ).thenReturn( new org.neo4j.kernel.impl.store.StoreId( 1, 2, 5, 3, 4 ) ); + when( neoStoreDataSource.getDependencyResolver() ).thenReturn( dependencies ); embeddedChannel = new EmbeddedChannel( storeCopyRequestHandler ); } @@ -122,7 +126,7 @@ public void shouldResetProtocolAndGiveErrorOnUncheckedException() public void shoulResetProtocolAndGiveErrorIfFilesThrowException() { EmbeddedChannel alternativeChannel = new EmbeddedChannel( - new EvilStoreCopyRequestHandler( catchupServerProtocol, () -> neoStoreDataSource, () -> checkPointer, new StoreFileStreamingProtocol(), + new EvilStoreCopyRequestHandler( catchupServerProtocol, () -> neoStoreDataSource, new StoreFileStreamingProtocol(), fileSystemAbstraction, NullLogProvider.getInstance() ) ); try { @@ -143,10 +147,10 @@ public void shoulResetProtocolAndGiveErrorIfFilesThrowException() private class NiceStoreCopyRequestHandler extends StoreCopyRequestHandler { private NiceStoreCopyRequestHandler( CatchupServerProtocol protocol, Supplier dataSource, - Supplier checkpointerSupplier, StoreFileStreamingProtocol storeFileStreamingProtocol, + StoreFileStreamingProtocol storeFileStreamingProtocol, FileSystemAbstraction fs, LogProvider logProvider ) { - super( protocol, dataSource, checkpointerSupplier, storeFileStreamingProtocol, fs, logProvider ); + super( protocol, dataSource, storeFileStreamingProtocol, fs, logProvider ); } @Override @@ -159,10 +163,9 @@ ResourceIterator files( StoreCopyRequest request, NeoStoreDat private class EvilStoreCopyRequestHandler extends StoreCopyRequestHandler { private EvilStoreCopyRequestHandler( CatchupServerProtocol protocol, Supplier dataSource, - Supplier checkpointerSupplier, StoreFileStreamingProtocol storeFileStreamingProtocol, - FileSystemAbstraction fs, LogProvider logProvider ) + StoreFileStreamingProtocol storeFileStreamingProtocol, FileSystemAbstraction fs, LogProvider logProvider ) { - super( protocol, dataSource, checkpointerSupplier, storeFileStreamingProtocol, fs, logProvider ); + super( protocol, dataSource, storeFileStreamingProtocol, fs, logProvider ); } @Override diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/TestCatchupServer.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/TestCatchupServer.java index 0ad0737552a1..1adf5e0facde 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/TestCatchupServer.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/storecopy/TestCatchupServer.java @@ -42,16 +42,11 @@ import org.neo4j.causalclustering.protocol.handshake.HandshakeServerInitializer; import org.neo4j.causalclustering.protocol.handshake.ModifierProtocolRepository; import org.neo4j.causalclustering.protocol.handshake.ModifierSupportedProtocols; -import org.neo4j.graphdb.DependencyResolver; import org.neo4j.helpers.ListenSocketAddress; import org.neo4j.io.fs.FileSystemAbstraction; -import org.neo4j.io.pagecache.PageCache; import org.neo4j.kernel.AvailabilityGuard; import org.neo4j.kernel.NeoStoreDataSource; -import org.neo4j.kernel.impl.transaction.log.LogicalTransactionStore; -import org.neo4j.kernel.impl.transaction.log.TransactionIdStore; import org.neo4j.kernel.impl.transaction.log.checkpoint.CheckPointer; -import org.neo4j.kernel.impl.transaction.log.checkpoint.StoreCopyCheckPointMutex; import org.neo4j.kernel.internal.GraphDatabaseAPI; import org.neo4j.kernel.monitoring.Monitors; import org.neo4j.logging.LogProvider; @@ -81,12 +76,9 @@ private static ChildInitializer childInitializer( FileSystemAbstraction fileSyst ApplicationProtocolRepository catchupRepository = new ApplicationProtocolRepository( ApplicationProtocols.values(), catchupProtocols ); ModifierProtocolRepository modifierRepository = new ModifierProtocolRepository( ModifierProtocols.values(), singletonList( modifierProtocols ) ); - DependencyResolver dependencies = graphDb.getDependencyResolver(); - StoreCopyCheckPointMutex storeCopyCheckPointMutex = dependencies.resolveDependency( StoreCopyCheckPointMutex.class ); Supplier checkPointer = () -> graphDb.getDependencyResolver().resolveDependency( CheckPointer.class ); BooleanSupplier availability = () -> graphDb.getDependencyResolver().resolveDependency( AvailabilityGuard.class ).isAvailable(); Supplier dataSource = () -> graphDb.getDependencyResolver().resolveDependency( NeoStoreDataSource.class ); - PageCache pageCache = graphDb.getDependencyResolver().resolveDependency( PageCache.class ); LogProvider logProvider = NullLogProvider.getInstance(); org.neo4j.kernel.impl.store.StoreId kernelStoreId = dataSource.get().getStoreId(); @@ -94,8 +86,7 @@ private static ChildInitializer childInitializer( FileSystemAbstraction fileSyst kernelStoreId.getUpgradeId() ); RegularCatchupServerHandler catchupServerHandler = new RegularCatchupServerHandler( new Monitors(), logProvider, - () -> storeId, dependencies.provideDependency( TransactionIdStore.class ), dependencies.provideDependency( LogicalTransactionStore.class ), - dataSource, availability, fileSystem, null, checkPointer ); + () -> storeId, dataSource, availability, fileSystem, null, checkPointer ); NettyPipelineBuilderFactory pipelineBuilder = new NettyPipelineBuilderFactory( VoidPipelineWrapperFactory.VOID_WRAPPER ); CatchupProtocolServerInstaller.Factory catchupProtocolServerInstaller = new CatchupProtocolServerInstaller.Factory( pipelineBuilder, logProvider, diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/tx/TxPullRequestHandlerTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/tx/TxPullRequestHandlerTest.java index 7427ecc9a658..44483fae30ff 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/tx/TxPullRequestHandlerTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/catchup/tx/TxPullRequestHandlerTest.java @@ -24,12 +24,15 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; +import org.junit.Before; import org.junit.Test; import org.neo4j.causalclustering.catchup.CatchupServerProtocol; import org.neo4j.causalclustering.catchup.ResponseMessageType; import org.neo4j.causalclustering.identity.StoreId; import org.neo4j.cursor.Cursor; +import org.neo4j.graphdb.DependencyResolver; +import org.neo4j.kernel.NeoStoreDataSource; import org.neo4j.kernel.impl.transaction.CommittedTransactionRepresentation; import org.neo4j.kernel.impl.transaction.command.Commands; import org.neo4j.kernel.impl.transaction.log.LogPosition; @@ -62,11 +65,23 @@ public class TxPullRequestHandlerTest private final AssertableLogProvider logProvider = new AssertableLogProvider(); private StoreId storeId = new StoreId( 1, 2, 3, 4 ); + private NeoStoreDataSource datasource = mock( NeoStoreDataSource.class ); private LogicalTransactionStore logicalTransactionStore = mock( LogicalTransactionStore.class ); private TransactionIdStore transactionIdStore = mock( TransactionIdStore.class ); - private TxPullRequestHandler txPullRequestHandler = new TxPullRequestHandler( new CatchupServerProtocol(), () -> storeId, () -> true, - () -> transactionIdStore, () -> logicalTransactionStore, new Monitors(), logProvider ); + private TxPullRequestHandler txPullRequestHandler; + + @Before + public void setUp() + { + DependencyResolver dependencyResolver = mock( DependencyResolver.class ); + when( datasource.getDependencyResolver() ).thenReturn( dependencyResolver ); + when( dependencyResolver.resolveDependency( LogicalTransactionStore.class ) ).thenReturn( logicalTransactionStore ); + when( dependencyResolver.resolveDependency( TransactionIdStore.class ) ).thenReturn( transactionIdStore ); + when( transactionIdStore.getLastCommittedTransactionId() ).thenReturn( 15L ); + txPullRequestHandler = new TxPullRequestHandler( new CatchupServerProtocol(), () -> storeId, () -> true, + () -> datasource, new Monitors(), logProvider ); + } @Test public void shouldRespondWithCompleteStreamOfTransactions() throws Exception @@ -125,13 +140,9 @@ public void shouldNotStreamTxEntriesIfStoreIdMismatches() throws Exception StoreId serverStoreId = new StoreId( 1, 2, 3, 4 ); StoreId clientStoreId = new StoreId( 5, 6, 7, 8 ); - TransactionIdStore transactionIdStore = mock( TransactionIdStore.class ); - when( transactionIdStore.getLastCommittedTransactionId() ).thenReturn( 15L ); - LogicalTransactionStore logicalTransactionStore = mock( LogicalTransactionStore.class ); - TxPullRequestHandler txPullRequestHandler = new TxPullRequestHandler( new CatchupServerProtocol(), () -> serverStoreId, () -> true, - () -> transactionIdStore, () -> logicalTransactionStore, new Monitors(), logProvider ); + () -> datasource, new Monitors(), logProvider ); // when txPullRequestHandler.channelRead0( context, new TxPullRequest( 1, clientStoreId ) ); @@ -152,7 +163,7 @@ public void shouldNotStreamTxsAndReportErrorIfTheLocalDatabaseIsNotAvailable() t TxPullRequestHandler txPullRequestHandler = new TxPullRequestHandler( new CatchupServerProtocol(), () -> storeId, () -> false, - () -> transactionIdStore, () -> logicalTransactionStore, new Monitors(), logProvider ); + () -> datasource, new Monitors(), logProvider ); // when txPullRequestHandler.channelRead0( context, new TxPullRequest( 1, storeId ) ); diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/state/machines/token/ReplicatedTokenHolderTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/state/machines/token/ReplicatedTokenHolderTest.java index e192ce4dd142..1c1f96ab8dab 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/state/machines/token/ReplicatedTokenHolderTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/state/machines/token/ReplicatedTokenHolderTest.java @@ -26,6 +26,7 @@ import java.util.Collection; import java.util.concurrent.CompletableFuture; +import java.util.function.Supplier; import org.neo4j.internal.kernel.api.NamedToken; import org.neo4j.kernel.impl.store.id.IdGenerator; @@ -33,7 +34,6 @@ import org.neo4j.kernel.impl.store.id.IdType; import org.neo4j.kernel.impl.store.record.LabelTokenRecord; import org.neo4j.kernel.impl.transaction.command.Command; -import org.neo4j.kernel.impl.util.Dependencies; import org.neo4j.storageengine.api.StorageCommand; import org.neo4j.storageengine.api.StorageEngine; import org.neo4j.storageengine.api.StorageReader; @@ -54,7 +54,7 @@ public class ReplicatedTokenHolderTest { - private Dependencies dependencies = mock( Dependencies.class ); + private Supplier storageEngineSupplier = mock( Supplier.class ); @Test public void shouldStoreInitialTokens() @@ -62,7 +62,7 @@ public void shouldStoreInitialTokens() // given TokenRegistry registry = new TokenRegistry( "Label" ); ReplicatedTokenHolder tokenHolder = new ReplicatedLabelTokenHolder( registry, null, - null, dependencies ); + null, storageEngineSupplier ); // when tokenHolder.setInitialTokens( asList( new NamedToken( "name1", 1 ), new NamedToken( "name2", 2 ) ) ); @@ -77,7 +77,7 @@ public void shouldReturnExistingTokenId() // given TokenRegistry registry = new TokenRegistry( "Label" ); ReplicatedTokenHolder tokenHolder = new ReplicatedLabelTokenHolder( registry, null, - null, dependencies ); + null, storageEngineSupplier ); tokenHolder.setInitialTokens( asList( new NamedToken( "name1", 1 ), new NamedToken( "name2", 2 ) ) ); // when @@ -92,7 +92,7 @@ public void shouldReplicateTokenRequestForNewToken() throws Exception { // given StorageEngine storageEngine = mockedStorageEngine(); - when( dependencies.resolveDependency( StorageEngine.class ) ).thenReturn( storageEngine ); + when( storageEngineSupplier.get() ).thenReturn( storageEngine ); IdGeneratorFactory idGeneratorFactory = mock( IdGeneratorFactory.class ); IdGenerator idGenerator = mock( IdGenerator.class ); @@ -109,7 +109,7 @@ public void shouldReplicateTokenRequestForNewToken() throws Exception completeFuture.complete( generatedTokenId ); return completeFuture; }, - idGeneratorFactory, dependencies ); + idGeneratorFactory, storageEngineSupplier ); // when Integer tokenId = tokenHolder.getOrCreateId( "name1" );