From 9dc6cacc0e2614c295bfa1e84a78b9cae4fc7b75 Mon Sep 17 00:00:00 2001 From: Max Sumrall Date: Fri, 15 Jul 2016 16:26:38 +0200 Subject: [PATCH] Extract ReplicationModule. Extract some session tracking functionality into a SessionTracker. --- .../org/neo4j/coreedge/ReplicationModule.java | 103 ++++++++++++++++++ .../org/neo4j/coreedge/SessionTracker.java | 84 ++++++++++++++ .../org/neo4j/coreedge/SnapFlushable.java | 35 ++++++ .../neo4j/coreedge/raft/state/CoreState.java | 29 ++--- .../raft/state/CoreStateMachines.java | 39 ++++--- .../core/EnterpriseCoreEditionModule.java | 85 +++++++-------- .../coreedge/raft/state/CoreStateTest.java | 7 +- 7 files changed, 294 insertions(+), 88 deletions(-) create mode 100644 enterprise/core-edge/src/main/java/org/neo4j/coreedge/ReplicationModule.java create mode 100644 enterprise/core-edge/src/main/java/org/neo4j/coreedge/SessionTracker.java create mode 100644 enterprise/core-edge/src/main/java/org/neo4j/coreedge/SnapFlushable.java diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/ReplicationModule.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/ReplicationModule.java new file mode 100644 index 0000000000000..9cc46ba92e35b --- /dev/null +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/ReplicationModule.java @@ -0,0 +1,103 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge; + +import java.io.File; +import java.io.IOException; +import java.util.UUID; +import java.util.function.Supplier; + +import org.neo4j.coreedge.raft.ConsensusModule; +import org.neo4j.coreedge.raft.RaftMessages; +import org.neo4j.coreedge.raft.net.Outbound; +import org.neo4j.coreedge.raft.replication.ProgressTrackerImpl; +import org.neo4j.coreedge.raft.replication.RaftReplicator; +import org.neo4j.coreedge.raft.replication.session.GlobalSession; +import org.neo4j.coreedge.raft.replication.session.GlobalSessionTrackerState; +import org.neo4j.coreedge.raft.replication.session.LocalSessionPool; +import org.neo4j.coreedge.raft.replication.tx.ExponentialBackoffStrategy; +import org.neo4j.coreedge.raft.state.DurableStateStorage; +import org.neo4j.coreedge.server.CoreEdgeClusterSettings; +import org.neo4j.coreedge.server.CoreMember; +import org.neo4j.io.fs.FileSystemAbstraction; +import org.neo4j.kernel.configuration.Config; +import org.neo4j.kernel.impl.factory.PlatformModule; +import org.neo4j.kernel.internal.DatabaseHealth; +import org.neo4j.kernel.lifecycle.LifeSupport; +import org.neo4j.logging.LogProvider; + +import static java.util.concurrent.TimeUnit.SECONDS; + +public class ReplicationModule +{ + private final RaftReplicator replicator; + private final ProgressTrackerImpl progressTracker; + private final SessionTracker sessionTracker; + + public ReplicationModule( CoreMember myself, PlatformModule platformModule, Config config, ConsensusModule consensusModule, + Outbound loggingOutbound, File clusterStateDirectory, + FileSystemAbstraction fileSystem, Supplier databaseHealthSupplier, LogProvider logProvider ) + { + LifeSupport life = platformModule.life; + + DurableStateStorage sessionTrackerStorage; + try + { + sessionTrackerStorage = life.add( + new DurableStateStorage<>( fileSystem, new File( clusterStateDirectory, "session-tracker-state" ), + "session-tracker", + new GlobalSessionTrackerState.Marshal( new CoreMember.CoreMemberMarshal() ), + config.get( CoreEdgeClusterSettings.global_session_tracker_state_size ), + databaseHealthSupplier, logProvider ) ); + + } + catch ( IOException e ) + { + throw new RuntimeException( e ); + } + + sessionTracker = new SessionTracker( sessionTrackerStorage ); + + GlobalSession myGlobalSession = new GlobalSession( UUID.randomUUID(), myself ); + LocalSessionPool sessionPool = new LocalSessionPool( myGlobalSession ); + progressTracker = new ProgressTrackerImpl( myGlobalSession ); + + replicator = new RaftReplicator( consensusModule.raftInstance(), myself, + loggingOutbound, + sessionPool, progressTracker, + new ExponentialBackoffStrategy( 10, SECONDS ) ); + + } + + public RaftReplicator getReplicator() + { + return replicator; + } + + public ProgressTrackerImpl getProgressTracker() + { + return progressTracker; + } + + public SessionTracker getSessionTracker() + { + return sessionTracker; + } +} diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/SessionTracker.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/SessionTracker.java new file mode 100644 index 0000000000000..5eb4d310a8f5c --- /dev/null +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/SessionTracker.java @@ -0,0 +1,84 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge; + +import java.io.IOException; + +import org.neo4j.coreedge.catchup.storecopy.core.CoreStateType; +import org.neo4j.coreedge.raft.replication.session.GlobalSession; +import org.neo4j.coreedge.raft.replication.session.GlobalSessionTrackerState; +import org.neo4j.coreedge.raft.replication.session.LocalOperationId; +import org.neo4j.coreedge.raft.state.CoreSnapshot; +import org.neo4j.coreedge.raft.state.StateStorage; + +public class SessionTracker implements SnapFlushable +{ + private final StateStorage sessionTrackerStorage; + private GlobalSessionTrackerState sessionState = new GlobalSessionTrackerState(); + + public SessionTracker( StateStorage sessionTrackerStorage ) + { + this.sessionTrackerStorage = sessionTrackerStorage; + } + + public void start() + { + sessionState = sessionTrackerStorage.getInitialState(); + } + + @Override + public long getLastAppliedIndex() + { + return sessionState.logIndex(); + } + + @Override + public void flush() throws IOException + { + sessionTrackerStorage.persistStoreData( sessionState ); + } + + @Override + public void addSnapshots( CoreSnapshot coreSnapshot ) + { + coreSnapshot.add( CoreStateType.SESSION_TRACKER, sessionState.newInstance() ); + } + + @Override + public void installSnapshots( CoreSnapshot coreSnapshot ) + { + sessionState = coreSnapshot.get( CoreStateType.SESSION_TRACKER ); + } + + public boolean validateOperation( GlobalSession globalSession, LocalOperationId localOperationId ) + { + return sessionState.validateOperation( globalSession, localOperationId ); + } + + public void update( GlobalSession globalSession, LocalOperationId localOperationId, long logIndex ) + { + sessionState.update( globalSession, localOperationId, logIndex ); + } + + public GlobalSessionTrackerState newInstance() + { + return sessionState.newInstance(); + } +} diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/SnapFlushable.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/SnapFlushable.java new file mode 100644 index 0000000000000..0432d40a1b5b0 --- /dev/null +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/SnapFlushable.java @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge; + +import java.io.IOException; + +import org.neo4j.coreedge.raft.state.CoreSnapshot; + +public interface SnapFlushable +{ + void flush() throws IOException; + + void addSnapshots( CoreSnapshot coreSnapshot ); + + long getLastAppliedIndex(); + + void installSnapshots( CoreSnapshot coreSnapshot ); +} diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/CoreState.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/CoreState.java index badc0bec7a8da..228322ea75c91 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/CoreState.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/CoreState.java @@ -24,8 +24,8 @@ import java.util.List; import java.util.function.Supplier; +import org.neo4j.coreedge.SessionTracker; import org.neo4j.coreedge.catchup.storecopy.StoreCopyFailedException; -import org.neo4j.coreedge.catchup.storecopy.core.CoreStateType; import org.neo4j.coreedge.discovery.CoreServerSelectionException; import org.neo4j.coreedge.raft.RaftStateMachine; import org.neo4j.coreedge.raft.log.RaftLog; @@ -35,7 +35,6 @@ import org.neo4j.coreedge.raft.log.segmented.InFlightMap; import org.neo4j.coreedge.raft.replication.DistributedOperation; import org.neo4j.coreedge.raft.replication.ProgressTracker; -import org.neo4j.coreedge.raft.replication.session.GlobalSessionTrackerState; import org.neo4j.coreedge.raft.replication.tx.CoreReplicatedContent; import org.neo4j.coreedge.server.CoreMember; import org.neo4j.coreedge.server.edge.CoreServerSelectionStrategy; @@ -55,7 +54,7 @@ public class CoreState extends LifecycleAdapter implements RaftStateMachine, Log private final StateStorage lastFlushedStorage; private final int flushEvery; private final ProgressTracker progressTracker; - private final StateStorage sessionStorage; + private final SessionTracker sessionTracker; private final Supplier dbHealth; private final InFlightMap inFlightMap; private final Log log; @@ -65,7 +64,6 @@ public class CoreState extends LifecycleAdapter implements RaftStateMachine, Log private final RaftLogCommitIndexMonitor commitIndexMonitor; private final OperationBatcher batcher; - private GlobalSessionTrackerState sessionState = new GlobalSessionTrackerState(); private CoreStateMachines coreStateMachines; private long lastApplied = NOTHING; @@ -80,7 +78,7 @@ public CoreState( LogProvider logProvider, ProgressTracker progressTracker, StateStorage lastFlushedStorage, - StateStorage sessionStorage, + SessionTracker sessionTracker, CoreServerSelectionStrategy someoneElse, CoreStateApplier applier, CoreStateDownloader downloader, @@ -92,7 +90,7 @@ public CoreState( this.lastFlushedStorage = lastFlushedStorage; this.flushEvery = flushEvery; this.progressTracker = progressTracker; - this.sessionStorage = sessionStorage; + this.sessionTracker = sessionTracker; this.someoneElse = someoneElse; this.applier = applier; this.downloader = downloader; @@ -212,11 +210,6 @@ public synchronized void notifyNeedFreshSnapshot() } } - /** - * Compacts the core state. - * - * @throws IOException - */ public void compact() throws IOException { raftLog.prune( lastFlushed ); @@ -246,7 +239,7 @@ private void handleOperations( long commandIndex, List ope { for ( DistributedOperation operation : operations ) { - if ( !sessionState.validateOperation( operation.globalSession(), operation.operationId() ) ) + if ( !sessionTracker.validateOperation( operation.globalSession(), operation.operationId() ) ) { commandIndex++; continue; @@ -256,7 +249,7 @@ private void handleOperations( long commandIndex, List ope command.dispatch( dispatcher, commandIndex, result -> progressTracker.trackResult( operation, result ) ); - sessionState.update( operation.globalSession(), operation.operationId(), commandIndex ); + sessionTracker.update( operation.globalSession(), operation.operationId(), commandIndex ); commandIndex++; } } @@ -273,7 +266,7 @@ private void maybeFlush() throws IOException private void flush() throws IOException { coreStateMachines.flush(); - sessionStorage.persistStoreData( sessionState ); + sessionTracker.flush(); lastFlushedStorage.persistStoreData( lastApplied ); lastFlushed = lastApplied; } @@ -283,12 +276,12 @@ public synchronized void start() throws IOException, InterruptedException { lastFlushed = lastApplied = lastFlushedStorage.getInitialState(); log.info( format( "Restoring last applied index to %d", lastApplied ) ); - sessionState = sessionStorage.getInitialState(); + sessionTracker.start(); /* Considering the order in which state is flushed, the state machines will * always be furthest ahead and indicate the furthest possible state to * which we must replay to reach a consistent state. */ - long lastPossiblyApplying = max( coreStateMachines.getLastAppliedIndex(), sessionState.logIndex() ); + long lastPossiblyApplying = max( coreStateMachines.getLastAppliedIndex(), sessionTracker.getLastAppliedIndex() ); if ( lastPossiblyApplying > lastApplied ) { @@ -314,7 +307,7 @@ public synchronized CoreSnapshot snapshot() throws IOException, InterruptedExcep CoreSnapshot coreSnapshot = new CoreSnapshot( prevIndex, prevTerm ); coreStateMachines.addSnapshots( coreSnapshot ); - coreSnapshot.add( CoreStateType.SESSION_TRACKER, sessionState.newInstance() ); + sessionTracker.addSnapshots( coreSnapshot ); return coreSnapshot; } @@ -337,7 +330,7 @@ synchronized void installSnapshot( CoreSnapshot coreSnapshot ) this.lastApplied = this.lastFlushed = snapshotPrevIndex; log.info( format( "Skipping lastApplied index forward to %d", snapshotPrevIndex ) ); - sessionState = coreSnapshot.get( CoreStateType.SESSION_TRACKER ); + sessionTracker.installSnapshots( coreSnapshot ); } @Override diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/CoreStateMachines.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/CoreStateMachines.java index 4b7c4174b731a..94fb0efc59be1 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/CoreStateMachines.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/CoreStateMachines.java @@ -22,9 +22,9 @@ import java.io.IOException; import java.util.function.Consumer; +import org.neo4j.coreedge.SnapFlushable; import org.neo4j.coreedge.catchup.storecopy.LocalDatabase; import org.neo4j.coreedge.catchup.storecopy.core.CoreStateType; -import org.neo4j.coreedge.raft.log.RaftLog; import org.neo4j.coreedge.raft.replication.id.ReplicatedIdAllocationRequest; import org.neo4j.coreedge.raft.replication.id.ReplicatedIdAllocationStateMachine; import org.neo4j.coreedge.raft.replication.token.ReplicatedTokenRequest; @@ -40,7 +40,7 @@ import static java.lang.Math.max; -public class CoreStateMachines +public class CoreStateMachines implements SnapFlushable { private final ReplicatedTransactionStateMachine replicatedTxStateMachine; @@ -84,6 +84,21 @@ CommandDispatcher commandDispatcher() return currentBatch; } + @Override + public long getLastAppliedIndex() + { + long lastAppliedTxIndex = replicatedTxStateMachine.lastAppliedIndex(); + assert lastAppliedTxIndex == labelTokenStateMachine.lastAppliedIndex(); + assert lastAppliedTxIndex == relationshipTypeTokenStateMachine.lastAppliedIndex(); + assert lastAppliedTxIndex == propertyKeyTokenStateMachine.lastAppliedIndex(); + + long lastAppliedLockTokenIndex = replicatedLockTokenStateMachine.lastAppliedIndex(); + long lastAppliedIdAllocationIndex = idAllocationStateMachine.lastAppliedIndex(); + + return max( max( lastAppliedLockTokenIndex, lastAppliedIdAllocationIndex ), lastAppliedTxIndex ); + } + + @Override public void flush() throws IOException { assert !runningBatch; @@ -98,7 +113,8 @@ public void flush() throws IOException idAllocationStateMachine.flush(); } - void addSnapshots( CoreSnapshot coreSnapshot ) + @Override + public void addSnapshots( CoreSnapshot coreSnapshot ) { assert !runningBatch; @@ -107,7 +123,8 @@ void addSnapshots( CoreSnapshot coreSnapshot ) // transactions and tokens live in the store } - void installSnapshots( CoreSnapshot coreSnapshot ) + @Override + public void installSnapshots( CoreSnapshot coreSnapshot ) { assert !runningBatch; @@ -169,7 +186,6 @@ public void dispatch( ReplicatedLockTokenRequest lockRequest, long commandIndex, replicatedTxStateMachine.ensuredApplied(); replicatedLockTokenStateMachine.applyCommand( lockRequest, commandIndex, callback ); } - @Override public void close() { @@ -177,17 +193,4 @@ public void close() replicatedTxStateMachine.ensuredApplied(); } } - - long getLastAppliedIndex() - { - long lastAppliedTxIndex = replicatedTxStateMachine.lastAppliedIndex(); - assert lastAppliedTxIndex == labelTokenStateMachine.lastAppliedIndex(); - assert lastAppliedTxIndex == relationshipTypeTokenStateMachine.lastAppliedIndex(); - assert lastAppliedTxIndex == propertyKeyTokenStateMachine.lastAppliedIndex(); - - long lastAppliedLockTokenIndex = replicatedLockTokenStateMachine.lastAppliedIndex(); - long lastAppliedIdAllocationIndex = idAllocationStateMachine.lastAppliedIndex(); - - return max( max( lastAppliedLockTokenIndex, lastAppliedIdAllocationIndex ), lastAppliedTxIndex ); - } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java index b52e841fef4b7..22b48bde1190f 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java @@ -28,6 +28,7 @@ import java.util.function.Supplier; import org.neo4j.coreedge.CoreStateMachinesModule; +import org.neo4j.coreedge.ReplicationModule; import org.neo4j.coreedge.catchup.CatchupServer; import org.neo4j.coreedge.catchup.CheckpointerSupplier; import org.neo4j.coreedge.catchup.DataSourceSupplier; @@ -58,12 +59,6 @@ import org.neo4j.coreedge.raft.net.Outbound; import org.neo4j.coreedge.raft.net.RaftChannelInitializer; import org.neo4j.coreedge.raft.net.RaftOutbound; -import org.neo4j.coreedge.raft.replication.ProgressTrackerImpl; -import org.neo4j.coreedge.raft.replication.RaftReplicator; -import org.neo4j.coreedge.raft.replication.session.GlobalSession; -import org.neo4j.coreedge.raft.replication.session.GlobalSessionTrackerState; -import org.neo4j.coreedge.raft.replication.session.LocalSessionPool; -import org.neo4j.coreedge.raft.replication.tx.ExponentialBackoffStrategy; import org.neo4j.coreedge.raft.roles.Role; import org.neo4j.coreedge.raft.state.CoreState; import org.neo4j.coreedge.raft.state.CoreStateApplier; @@ -119,8 +114,6 @@ import org.neo4j.udc.UsageData; import static java.time.Clock.systemUTC; -import static java.util.concurrent.TimeUnit.SECONDS; - import static org.neo4j.coreedge.server.core.RoleProcedure.CoreOrEdge.CORE; import static org.neo4j.kernel.impl.util.JobScheduler.SchedulingStrategy.NEW_THREAD; @@ -158,8 +151,7 @@ public void registerProcedures( Procedures procedures ) } } - EnterpriseCoreEditionModule( final PlatformModule platformModule, - DiscoveryServiceFactory discoveryServiceFactory ) + EnterpriseCoreEditionModule( final PlatformModule platformModule, DiscoveryServiceFactory discoveryServiceFactory ) { ioLimiter = new ConfigurableIOLimiter( platformModule.config ); @@ -177,7 +169,6 @@ public void registerProcedures( Procedures procedures ) CoreMember myself; StateStorage lastFlushedStorage; - StateStorage sessionTrackerStorage; try { @@ -196,13 +187,6 @@ public void registerProcedures( Procedures procedures ) new DurableStateStorage<>( fileSystem, new File( clusterStateDirectory, "last-flushed-state" ), "last-flushed", new LongIndexMarshal(), config.get( CoreEdgeClusterSettings.last_flushed_state_size ), databaseHealthSupplier, logProvider ) ); - - sessionTrackerStorage = life.add( new DurableStateStorage<>( fileSystem, - new File( clusterStateDirectory, "session-tracker-state" ), "session-tracker", - new GlobalSessionTrackerState.Marshal( new CoreMemberMarshal() ), - config.get( CoreEdgeClusterSettings.global_session_tracker_state_size ), databaseHealthSupplier, - logProvider ) ); - } catch ( IOException e ) { @@ -259,9 +243,6 @@ public void registerProcedures( Procedures procedures ) new StoreCopyClient( coreToCoreClient ), new TxPullClient( coreToCoreClient ), new TransactionLogCatchUpFactory() ); - GlobalSession myGlobalSession = new GlobalSession( UUID.randomUUID(), myself ); - LocalSessionPool sessionPool = new LocalSessionPool( myGlobalSession ); - ProgressTrackerImpl progressTracker = new ProgressTrackerImpl( myGlobalSession ); RaftOutbound raftOutbound = new RaftOutbound( discoveryService, senderService, localDatabase, logProvider, logThresholdMillis ); Outbound loggingOutbound = new LoggingOutbound<>( @@ -289,14 +270,13 @@ public void registerProcedures( Procedures procedures ) dependencies.satisfyDependency( consensusModule.raftInstance() ); - RaftReplicator replicator = - new RaftReplicator( consensusModule.raftInstance(), myself, - loggingOutbound, - sessionPool, progressTracker, - new ExponentialBackoffStrategy( 10, SECONDS ) ); + ReplicationModule replicationModule = new ReplicationModule( myself, platformModule, config, consensusModule, + loggingOutbound, clusterStateDirectory, + fileSystem, databaseHealthSupplier, logProvider ); coreStateMachinesModule = new CoreStateMachinesModule( myself, platformModule, clusterStateDirectory, - databaseHealthSupplier, config, replicator, consensusModule.raftInstance(), dependencies, localDatabase ); + databaseHealthSupplier, config, replicationModule.getReplicator(), consensusModule.raftInstance(), + dependencies, localDatabase ); this.idGeneratorFactory = coreStateMachinesModule.idGeneratorFactory; this.idTypeConfigurationProvider = coreStateMachinesModule.idTypeConfigurationProvider; @@ -306,11 +286,14 @@ public void registerProcedures( Procedures procedures ) this.lockManager = coreStateMachinesModule.lockManager; this.commitProcessFactory = coreStateMachinesModule.commitProcessFactory; - CoreState coreState = dependencies.satisfyDependency( new CoreState( coreStateMachinesModule.coreStateMachines, - consensusModule.raftLog(), config.get( CoreEdgeClusterSettings.state_machine_apply_max_batch_size ), - config.get( CoreEdgeClusterSettings.state_machine_flush_window_size ), - databaseHealthSupplier, logProvider, progressTracker, lastFlushedStorage, - sessionTrackerStorage, someoneElse, coreStateApplier, downloader, inFlightMap, platformModule.monitors ) ); + CoreState coreState = new CoreState( coreStateMachinesModule.coreStateMachines, consensusModule.raftLog(), + config.get( CoreEdgeClusterSettings.state_machine_apply_max_batch_size ), + config.get( CoreEdgeClusterSettings.state_machine_flush_window_size ), databaseHealthSupplier, + logProvider, replicationModule.getProgressTracker(), lastFlushedStorage, + replicationModule.getSessionTracker(), someoneElse, coreStateApplier, downloader, inFlightMap, + platformModule.monitors ); + + dependencies.satisfyDependency( coreState ); life.add( new PruningScheduler( coreState, platformModule.jobScheduler, config.get( CoreEdgeClusterSettings.raft_log_pruning_frequency ) ) ); @@ -330,22 +313,7 @@ public void registerProcedures( Procedures procedures ) dependencies.satisfyDependency( createKernelData( fileSystem, platformModule.pageCache, storeDir, config, graphDatabaseFacade, life ) ); - life.add( dependencies.satisfyDependency( createAuthManager( config, logging ) ) ); - - headerInformationFactory = createHeaderInformationFactory(); - - schemaWriteGuard = createSchemaWriteGuard(); - - transactionStartTimeout = config.get( GraphDatabaseSettings.transaction_start_timeout ); - - constraintSemantics = new EnterpriseConstraintSemantics(); - - coreAPIAvailabilityGuard = - new CoreAPIAvailabilityGuard( platformModule.availabilityGuard, transactionStartTimeout ); - - registerRecovery( platformModule.databaseInfo, life, dependencies ); - - publishEditionInfo( dependencies.resolveDependency( UsageData.class ), platformModule.databaseInfo, config ); + editionInvariants( platformModule, dependencies, config, logging, life ); this.lockManager = dependencies.satisfyDependency( lockManager ); @@ -364,6 +332,27 @@ public void registerProcedures( Procedures procedures ) dependencies.satisfyDependency( createSessionTracker() ); } + private void editionInvariants( PlatformModule platformModule, Dependencies dependencies, Config config, + LogService logging, LifeSupport life ) + { + life.add( dependencies.satisfyDependency( createAuthManager( config, logging ) ) ); + + headerInformationFactory = createHeaderInformationFactory(); + + schemaWriteGuard = createSchemaWriteGuard(); + + transactionStartTimeout = config.get( GraphDatabaseSettings.transaction_start_timeout ); + + constraintSemantics = new EnterpriseConstraintSemantics(); + + coreAPIAvailabilityGuard = + new CoreAPIAvailabilityGuard( platformModule.availabilityGuard, transactionStartTimeout ); + + registerRecovery( platformModule.databaseInfo, life, dependencies ); + + publishEditionInfo( dependencies.resolveDependency( UsageData.class ), platformModule.databaseInfo, config ); + } + public boolean isLeader() { return consensusModule.raftInstance().currentRole() == Role.LEADER; diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/CoreStateTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/CoreStateTest.java index 5496095219581..8b207aef6b822 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/CoreStateTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/CoreStateTest.java @@ -26,6 +26,7 @@ import java.util.UUID; import java.util.function.Consumer; +import org.neo4j.coreedge.SessionTracker; import org.neo4j.coreedge.raft.NewLeaderBarrier; import org.neo4j.coreedge.raft.log.InMemoryRaftLog; import org.neo4j.coreedge.raft.log.RaftLogEntry; @@ -67,8 +68,8 @@ public class CoreStateTest private final InMemoryRaftLog raftLog = spy( new InMemoryRaftLog() ); private final InMemoryStateStorage lastFlushedStorage = new InMemoryStateStorage<>( -1L ); - private final InMemoryStateStorage sessionStorage = - new InMemoryStateStorage<>( new GlobalSessionTrackerState() ); + private final SessionTracker sessionStorage = new SessionTracker( + new InMemoryStateStorage<>( new GlobalSessionTrackerState() ) ); private final DatabaseHealth dbHealth = new DatabaseHealth( mock( DatabasePanicEventGenerator.class ), NullLogProvider.getInstance().getLog( getClass() ) ); @@ -367,7 +368,6 @@ public void shouldFailWhenCacheAndLogMiss() throws Throwable public void shouldIncreaseLastAppliedForStateMachineCommands() throws Exception { // given - coreState.setStateMachine( coreStateMachines ); coreState.start(); // when @@ -385,7 +385,6 @@ public void shouldIncreaseLastAppliedForStateMachineCommands() throws Exception public void shouldIncreaseLastAppliedForOtherCommands() throws Exception { // given - coreState.setStateMachine( coreStateMachines ); coreState.start(); // when