diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/id/ReplicatedIdAllocationStateMachine.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/id/ReplicatedIdAllocationStateMachine.java index e4d2e975621df..cc26bc73bf20a 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/id/ReplicatedIdAllocationStateMachine.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/id/ReplicatedIdAllocationStateMachine.java @@ -76,6 +76,12 @@ public void flush() throws IOException storage.persistStoreData( state ); } + @Override + public long lastAppliedIndex() + { + return storage.getInitialState().logIndex(); + } + public synchronized IdAllocationState snapshot() { return state.newInstance(); diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/token/ReplicatedTokenStateMachine.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/token/ReplicatedTokenStateMachine.java index e44ee1f89a960..6f9c60be9175d 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/token/ReplicatedTokenStateMachine.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/token/ReplicatedTokenStateMachine.java @@ -55,7 +55,7 @@ public class ReplicatedTokenStateMachine implements StateMa private final TokenFactory tokenFactory; private final Log log; - private long lastCommittedIndex = Long.MAX_VALUE; + private long lastCommittedIndex = -1; public ReplicatedTokenStateMachine( TokenRegistry tokenRegistry, TokenFactory tokenFactory, LogProvider logProvider ) @@ -139,4 +139,10 @@ public synchronized void flush() throws IOException { // already implicitly flushed to the store } + + @Override + public long lastAppliedIndex() + { + return lastCommittedIndex; + } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/tx/ReplicatedTransactionStateMachine.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/tx/ReplicatedTransactionStateMachine.java index 8aeec88517567..66cdcc09760ae 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/tx/ReplicatedTransactionStateMachine.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/replication/tx/ReplicatedTransactionStateMachine.java @@ -66,12 +66,6 @@ public synchronized void installCommitProcess( TransactionCommitProcess commitPr commitProcess.commit( txs, CommitEvent.NULL, TransactionApplicationMode.EXTERNAL ) ); } - @Override - public void flush() throws IOException - { - // implicitly flushed - } - @Override public synchronized void applyCommand( ReplicatedTransaction replicatedTx, long commandIndex, Consumer callback ) { @@ -110,6 +104,18 @@ public synchronized void applyCommand( ReplicatedTransaction replicatedTx, long } } + @Override + public void flush() throws IOException + { + // implicitly flushed + } + + @Override + public long lastAppliedIndex() + { + return lastCommittedIndex; + } + public synchronized void ensuredApplied() { try diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/CoreState.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/CoreState.java index e11d6078f0bdc..506bf3eaf4a0e 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/CoreState.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/CoreState.java @@ -55,7 +55,6 @@ public class CoreState extends LifecycleAdapter implements RaftStateMachine lastFlushedStorage; private final int flushEvery; private final ProgressTracker progressTracker; - private final StateStorage lastApplyingStorage; private final StateStorage> sessionStorage; private final Supplier dbHealth; private final InFlightMap inFlightMap; @@ -80,7 +79,6 @@ public CoreState( LogProvider logProvider, ProgressTracker progressTracker, StateStorage lastFlushedStorage, - StateStorage lastApplyingStorage, StateStorage> sessionStorage, CoreStateApplier applier, CoreStateDownloader downloader, @@ -91,7 +89,6 @@ public CoreState( this.lastFlushedStorage = lastFlushedStorage; this.flushEvery = flushEvery; this.progressTracker = progressTracker; - this.lastApplyingStorage = lastApplyingStorage; this.sessionStorage = sessionStorage; this.applier = applier; this.downloader = downloader; @@ -130,7 +127,6 @@ private void submitApplyJob( long lastToApply ) applier.submit( ( status ) -> () -> { try ( InFlightLogEntrySupplier logEntrySupplier = new InFlightLogEntrySupplier( raftLog, inFlightMap ) ) { - lastApplyingStorage.persistStoreData( lastToApply ); for ( long logIndex = lastApplied + 1; !status.isCancelled() && logIndex <= lastToApply; logIndex++ ) { RaftLogEntry entry = logEntrySupplier.get( logIndex ); @@ -284,7 +280,7 @@ public synchronized void start() throws IOException, InterruptedException log.info( format( "Restoring last applied index to %d", lastApplied ) ); sessionState = sessionStorage.getInitialState(); - submitApplyJob( lastApplyingStorage.getInitialState() ); + submitApplyJob( coreStateMachines.getApplyingIndex() ); applier.sync( false ); } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/CoreStateMachines.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/CoreStateMachines.java index 218d29a93ca44..e75603481d704 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/CoreStateMachines.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/CoreStateMachines.java @@ -25,6 +25,7 @@ import org.neo4j.coreedge.catchup.storecopy.LocalDatabase; import org.neo4j.coreedge.catchup.storecopy.core.CoreStateType; import org.neo4j.coreedge.raft.log.MonitoredRaftLog; +import org.neo4j.coreedge.raft.outcome.ShipCommand; import org.neo4j.coreedge.raft.replication.id.ReplicatedIdAllocationRequest; import org.neo4j.coreedge.raft.replication.id.ReplicatedIdAllocationStateMachine; import org.neo4j.coreedge.raft.replication.token.ReplicatedTokenRequest; @@ -39,6 +40,8 @@ import org.neo4j.kernel.impl.core.RelationshipTypeToken; import org.neo4j.storageengine.api.Token; +import static java.lang.Math.max; + public class CoreStateMachines { private final ReplicatedTransactionStateMachine replicatedTxStateMachine; @@ -197,4 +200,17 @@ public void close() replicatedTxStateMachine.ensuredApplied(); } } + + public long getApplyingIndex() + { + long lastAppliedTxIndex = replicatedTxStateMachine.lastAppliedIndex(); + assert lastAppliedTxIndex == labelTokenStateMachine.lastAppliedIndex(); + assert lastAppliedTxIndex == relationshipTypeTokenStateMachine.lastAppliedIndex(); + assert lastAppliedTxIndex == propertyKeyTokenStateMachine.lastAppliedIndex(); + + long lastAppliedLockTokenIndex = replicatedLockTokenStateMachine.lastAppliedIndex(); + long lastAppliedIdAllocationIndex = idAllocationStateMachine.lastAppliedIndex(); + + return max( max( lastAppliedLockTokenIndex, lastAppliedIdAllocationIndex ), lastAppliedIdAllocationIndex ); + } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/StateMachine.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/StateMachine.java index 2e475a9a0ef59..5fc1ffcf2240b 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/StateMachine.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/StateMachine.java @@ -39,4 +39,10 @@ public interface StateMachine * @throws IOException */ void flush() throws IOException; + + /** + * Return the index of the last applied command by this state machine. + * @return the last applied index for this state machine + */ + long lastAppliedIndex(); } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java index e3c56a654f4b8..bc4d51ca8bf76 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java @@ -334,11 +334,6 @@ fileSystem, new File( clusterStateDirectory, "last-flushed-state" ), "last-flush new LongIndexMarshal(), config.get( CoreEdgeClusterSettings.last_flushed_state_size ), databaseHealthSupplier, logProvider ) ); - DurableStateStorage lastApplyingStorage = life.add( new DurableStateStorage<>( - fileSystem, new File( clusterStateDirectory, "last-applying-state" ), "last-applying", - new LongIndexMarshal(), config.get( CoreEdgeClusterSettings.last_flushed_state_size ), - databaseHealthSupplier, logProvider ) ); - StateStorage> sessionTrackerStorage; try { @@ -363,7 +358,7 @@ fileSystem, new File( clusterStateDirectory, "session-tracker-state" ), "session coreState = new CoreState( raftLog, config.get( CoreEdgeClusterSettings.state_machine_apply_max_batch_size ), config.get( CoreEdgeClusterSettings.state_machine_flush_window_size ), - databaseHealthSupplier, logProvider, progressTracker, lastFlushedStorage, lastApplyingStorage, + databaseHealthSupplier, logProvider, progressTracker, lastFlushedStorage, sessionTrackerStorage, applier, downloader, inFlightMap, platformModule.monitors ); raft = createRaft( life, loggingOutbound, discoveryService, config, messageLogger, raftLog, @@ -565,7 +560,6 @@ private RaftLog createRaftLog( case SEGMENTED: { long rotateAtSize = config.get( CoreEdgeClusterSettings.raft_log_rotation_size ); - int metaDataCacheSize = config.get( CoreEdgeClusterSettings.raft_log_meta_data_cache_size ); String pruningStrategyConfig = config.get( CoreEdgeClusterSettings.raft_log_pruning_strategy ); int entryCacheSize = config.get( CoreEdgeClusterSettings.raft_log_entry_cache_size ); diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/locks/ReplicatedLockTokenStateMachine.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/locks/ReplicatedLockTokenStateMachine.java index 2bb0347ef252a..05edd3edaaf4b 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/locks/ReplicatedLockTokenStateMachine.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/locks/ReplicatedLockTokenStateMachine.java @@ -66,6 +66,12 @@ public synchronized void flush() throws IOException storage.persistStoreData( state ); } + @Override + public long lastAppliedIndex() + { + return storage.getInitialState().ordinal(); + } + public synchronized ReplicatedLockTokenState snapshot() { return state.newInstance(); diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/CoreStateTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/CoreStateTest.java index 04efe86ca5607..735552f9ae164 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/CoreStateTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/CoreStateTest.java @@ -39,7 +39,6 @@ import org.neo4j.coreedge.raft.replication.tx.CoreReplicatedContent; import org.neo4j.coreedge.raft.replication.tx.ReplicatedTransaction; import org.neo4j.coreedge.server.CoreMember; -import org.neo4j.coreedge.server.edge.CoreServerSelectionStrategy; import org.neo4j.kernel.impl.core.DatabasePanicEventGenerator; import org.neo4j.kernel.internal.DatabaseHealth; import org.neo4j.kernel.monitoring.Monitors; @@ -66,7 +65,6 @@ public class CoreStateTest private final InMemoryRaftLog raftLog = spy( new InMemoryRaftLog() ); private final InMemoryStateStorage lastFlushedStorage = new InMemoryStateStorage<>( -1L ); - private final InMemoryStateStorage lastApplyingStorage = new InMemoryStateStorage<>( -1L ); private final InMemoryStateStorage> sessionStorage = new InMemoryStateStorage<>( new GlobalSessionTrackerState<>() ); @@ -82,15 +80,16 @@ public class CoreStateTest private final Monitors monitors = new Monitors(); private final CoreState coreState = new CoreState( raftLog, batchSize, flushEvery, () -> dbHealth, NullLogProvider.getInstance(), new ProgressTrackerImpl( globalSession ), lastFlushedStorage, - lastApplyingStorage, sessionStorage, applier, mock( CoreStateDownloader.class ), inFlightMap, monitors ); + sessionStorage, applier, mock( CoreStateDownloader.class ), inFlightMap, monitors ); private ReplicatedTransaction nullTx = new ReplicatedTransaction( null ); private final CommandDispatcher commandDispatcher = mock( CommandDispatcher.class ); - private final CoreStateMachines txStateMachine = mock( CoreStateMachines.class ); + private final CoreStateMachines coreStateMachines = mock( CoreStateMachines.class ); { - when( txStateMachine.commandDispatcher() ).thenReturn( commandDispatcher ); + when( coreStateMachines.commandDispatcher() ).thenReturn( commandDispatcher ); + when( coreStateMachines.getApplyingIndex() ).thenReturn( -1L ); } private int sequenceNumber = 0; @@ -105,10 +104,10 @@ public void shouldApplyCommittedCommand() throws Throwable // given RaftLogCommitIndexMonitor listener = mock( RaftLogCommitIndexMonitor.class ); monitors.addMonitorListener( listener ); - coreState.setStateMachine( txStateMachine ); + coreState.setStateMachine( coreStateMachines ); coreState.start(); - InOrder inOrder = inOrder( txStateMachine, commandDispatcher ); + InOrder inOrder = inOrder( coreStateMachines, commandDispatcher ); // when raftLog.append( new RaftLogEntry( 0, operation( nullTx ) ) ); @@ -118,7 +117,7 @@ public void shouldApplyCommittedCommand() throws Throwable applier.sync( false ); // then - inOrder.verify( txStateMachine ).commandDispatcher(); + inOrder.verify( coreStateMachines ).commandDispatcher(); inOrder.verify( commandDispatcher ).dispatch( eq( nullTx ), eq( 0L ), anyCallback() ); inOrder.verify( commandDispatcher ).dispatch( eq( nullTx ), eq( 1L ), anyCallback() ); inOrder.verify( commandDispatcher ).dispatch( eq( nullTx ), eq( 2L ), anyCallback() ); @@ -131,7 +130,7 @@ public void shouldApplyCommittedCommand() throws Throwable public void shouldNotApplyUncommittedCommands() throws Throwable { // given - coreState.setStateMachine( txStateMachine ); + coreState.setStateMachine( coreStateMachines ); coreState.start(); // when @@ -148,7 +147,7 @@ public void shouldNotApplyUncommittedCommands() throws Throwable public void entriesThatAreNotStateMachineCommandsShouldStillIncreaseCommandIndex() throws Throwable { // given - coreState.setStateMachine( txStateMachine ); + coreState.setStateMachine( coreStateMachines ); coreState.start(); // when @@ -157,10 +156,10 @@ public void entriesThatAreNotStateMachineCommandsShouldStillIncreaseCommandIndex coreState.notifyCommitted( 1 ); applier.sync( false ); - InOrder inOrder = inOrder( txStateMachine, commandDispatcher ); + InOrder inOrder = inOrder( coreStateMachines, commandDispatcher ); // then - inOrder.verify( txStateMachine ).commandDispatcher(); + inOrder.verify( coreStateMachines ).commandDispatcher(); inOrder.verify( commandDispatcher ).dispatch( eq( nullTx ), eq( 1L ), anyCallback() ); inOrder.verify( commandDispatcher ).close(); } @@ -171,7 +170,7 @@ public void entriesThatAreNotStateMachineCommandsShouldStillIncreaseCommandIndex public void shouldPeriodicallyFlushState() throws Throwable { // given - coreState.setStateMachine( txStateMachine ); + coreState.setStateMachine( coreStateMachines ); coreState.start(); int interactions = flushEvery * 5; @@ -185,7 +184,7 @@ public void shouldPeriodicallyFlushState() throws Throwable applier.sync( false ); // then - verify( txStateMachine, times( interactions / batchSize ) ).flush(); + verify( coreStateMachines, times( interactions / batchSize ) ).flush(); assertEquals( interactions - ( interactions % batchSize) - 1, (long) lastFlushedStorage.getInitialState() ); } @@ -195,6 +194,7 @@ public void shouldPanicIfUnableToApply() throws Throwable // given doThrow( IllegalStateException.class ).when( commandDispatcher ) .dispatch( any( ReplicatedTransaction.class ), anyLong(), anyCallback() ); + coreState.setStateMachine( coreStateMachines ); coreState.start(); raftLog.append( new RaftLogEntry( 0, operation( nullTx ) ) ); @@ -214,7 +214,7 @@ public void shouldApplyToLogFromCache() throws Throwable //given n things to apply in the cache, check that they are actually applied. // given - coreState.setStateMachine( txStateMachine ); + coreState.setStateMachine( coreStateMachines ); coreState.start(); inFlightMap.register( 0L, new RaftLogEntry( 1, operation( nullTx ) ) ); @@ -232,9 +232,7 @@ public void shouldApplyToLogFromCache() throws Throwable public void cacheEntryShouldBePurgedWhenApplied() throws Throwable { //given a cache in submitApplyJob, the contents of the cache should only contain unapplied "things" - - // given - coreState.setStateMachine( txStateMachine ); + coreState.setStateMachine( coreStateMachines ); coreState.start(); inFlightMap.register( 0L, new RaftLogEntry( 0, operation( nullTx ) ) ); @@ -256,7 +254,7 @@ public void shouldFallbackToLogCursorOnCacheMiss() throws Throwable { // if the cache does not contain all things to be applied, make sure we fall back to the log // should only happen in recovery, otherwise this is probably a bug. - coreState.setStateMachine( txStateMachine ); + coreState.setStateMachine( coreStateMachines ); coreState.start(); //given cache with missing entry @@ -292,7 +290,7 @@ public void shouldFailWhenCacheAndLogMiss() throws Throwable { //When an entry is not in the log, we must fail. - coreState.setStateMachine( txStateMachine ); + coreState.setStateMachine( coreStateMachines ); coreState.start(); inFlightMap.register( 0L, new RaftLogEntry( 0, operation( nullTx ) ) );