diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/tx/ReplicatedTransactionStateMachine.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/tx/ReplicatedTransactionStateMachine.java index da20d38a92f88..7dbe0cf50c112 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/tx/ReplicatedTransactionStateMachine.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/tx/ReplicatedTransactionStateMachine.java @@ -45,12 +45,12 @@ public class ReplicatedTransactionStateMachine implements StateMachine { - private final GlobalSessionTrackerState sessionTracker; + private final GlobalSessionTrackerState sessionTrackerState; private final GlobalSession myGlobalSession; private final LockTokenManager lockTokenManager; private final TransactionCommitProcess commitProcess; private final CommittingTransactions transactionFutures; - private final StateStorage> storage; + private final StateStorage> sessionTrackerStorage; private final Log log; private long lastCommittedIndex = -1; @@ -59,15 +59,15 @@ public ReplicatedTransactionStateMachine( TransactionCommitProcess commitProcess GlobalSession myGlobalSession, LockTokenManager lockTokenManager, CommittingTransactions transactionFutures, - StateStorage> storage, + StateStorage> sessionTrackerStorage, LogProvider logProvider ) { this.commitProcess = commitProcess; this.myGlobalSession = myGlobalSession; this.lockTokenManager = lockTokenManager; this.transactionFutures = transactionFutures; - this.storage = storage; - this.sessionTracker = storage.getInitialState(); + this.sessionTrackerStorage = sessionTrackerStorage; + this.sessionTrackerState = sessionTrackerStorage.getInitialState(); this.log = logProvider.getLog( getClass() ); } @@ -83,7 +83,7 @@ public synchronized void applyCommand( ReplicatedContent content, long logIndex @Override public void flush() throws IOException { - storage.persistStoreData( sessionTracker ); + sessionTrackerStorage.persistStoreData( sessionTrackerState ); } private void handleTransaction( ReplicatedTransaction replicatedTx, long logIndex ) @@ -107,11 +107,11 @@ private void handleTransaction( ReplicatedTransaction replicatedTx, long try { - /* - * At this point, we need to check if the tx exists in the log. If it does, it is ok to skip it. However, we - * may still need to persist the session state (as we may crashed in between), which happens outside this - * if check. - */ + /* + * At this point, we need to check if the tx exists in the log. If it does, it is ok to skip it. However, we + * may still need to persist the session state (as we may crashed in between), which happens outside this + * if check. + */ if ( logIndex <= lastCommittedIndex ) { log.info( "Ignoring transaction at log index %d since already committed up to %d", logIndex, @@ -173,18 +173,19 @@ private void handleTransaction( ReplicatedTransaction replicatedTx, long " restarted once the underlying cause has been addressed.", e ); } } - /* - * Finally, we need to check, in an idempotent fashion, if the session state needs to be persisted. - */ - if ( sessionTracker.logIndex() < logIndex ) + + /* + * Finally, we need to check, in an idempotent fashion, if the session state needs to be persisted. + */ + if ( sessionTrackerState.logIndex() < logIndex ) { - sessionTracker.update( replicatedTx.globalSession(), replicatedTx.localOperationId(), logIndex ); + sessionTrackerState.update( replicatedTx.globalSession(), replicatedTx.localOperationId(), logIndex ); sessionUpdated = true; } else { log.info( format( "Rejecting log index %d since the session tracker is already at log index %d", - logIndex, sessionTracker.logIndex() ) ); + logIndex, sessionTrackerState.logIndex() ) ); } } finally @@ -199,7 +200,7 @@ private void handleTransaction( ReplicatedTransaction replicatedTx, long private boolean operationValid( ReplicatedTransaction replicatedTx ) { - return sessionTracker.validateOperation( replicatedTx.globalSession(), replicatedTx.localOperationId() ); + return sessionTrackerState.validateOperation( replicatedTx.globalSession(), replicatedTx.localOperationId() ); } public void setLastCommittedIndex( long lastCommittedIndex ) diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/StateMachineApplier.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/StateMachineApplier.java index 601918a609515..a66c6a5224ed8 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/StateMachineApplier.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/StateMachineApplier.java @@ -76,7 +76,6 @@ public synchronized void notifyCommitted() { this.commitIndex = commitIndex; executor.execute( () -> { - try { applyUpTo( commitIndex ); diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java index 20bcb44b35cb9..9fe4a59f12530 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java @@ -278,10 +278,10 @@ fileSystem, new File( clusterStateDirectory, "lock-token-state" ), "lock-token", new ReplicatedLockTokenStateMachine<>( lockTokenState ); stateMachines.add( replicatedLockTokenStateMachine ); - StateStorage> onDiskGlobalSessionTrackerState; + StateStorage> sessionTrackerStorage; try { - onDiskGlobalSessionTrackerState = life.add( new DurableStateStorage<>( + sessionTrackerStorage = life.add( new DurableStateStorage<>( fileSystem, new File( clusterStateDirectory, "session-tracker-state" ), "session-tracker", new GlobalSessionTrackerState.Marshal<>( new CoreMemberMarshal() ), config.get( CoreEdgeClusterSettings.global_session_tracker_state_size ), @@ -295,7 +295,7 @@ fileSystem, new File( clusterStateDirectory, "session-tracker-state" ), "session commitProcessFactory = createCommitProcessFactory( replicator, localSessionPool, replicatedLockTokenStateMachine, - dependencies, logging, platformModule.monitors, onDiskGlobalSessionTrackerState, stateMachines ); + dependencies, logging, platformModule.monitors, sessionTrackerStorage, stateMachines ); final StateStorage idAllocationState; try @@ -452,7 +452,7 @@ public static CommitProcessFactory createCommitProcessFactory( final Replicator replicator, final LocalSessionPool localSessionPool, final LockTokenManager currentReplicatedLockState, final Dependencies dependencies, final LogService logging, Monitors monitors, - StateStorage> globalSessionTrackerState, + StateStorage> sessionTrackerStorage, StateMachines stateMachines ) { return ( appender, applier, config ) -> { @@ -464,7 +464,7 @@ public static CommitProcessFactory createCommitProcessFactory( ReplicatedTransactionStateMachine replicatedTxStateMachine = new ReplicatedTransactionStateMachine<>( localCommit, localSessionPool.getGlobalSession(), currentReplicatedLockState, - committingTransactions, globalSessionTrackerState, logging.getInternalLogProvider() ); + committingTransactions, sessionTrackerStorage, logging.getInternalLogProvider() ); dependencies.satisfyDependencies( replicatedTxStateMachine );