diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/ConsensusListener.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/ConsensusListener.java
new file mode 100644
index 0000000000000..1c898c9a72af4
--- /dev/null
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/ConsensusListener.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright (c) 2002-2016 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Neo4j is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+package org.neo4j.coreedge.raft;
+
+/**
+ * A consensus listener is notified when a particular index in the consensus log
+ * is considered committed.
+ */
+public interface ConsensusListener
+{
+ /**
+ * Called when the highest committed index increases.
+ */
+ void notifyCommitted();
+}
diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftInstance.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftInstance.java
index bea3391c09c92..e8629bce6af40 100644
--- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftInstance.java
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftInstance.java
@@ -41,13 +41,10 @@
import org.neo4j.coreedge.raft.outcome.CommitCommand;
import org.neo4j.coreedge.raft.outcome.LogCommand;
import org.neo4j.coreedge.raft.outcome.Outcome;
-import org.neo4j.coreedge.raft.replication.ReplicatedContent;
import org.neo4j.coreedge.raft.replication.shipping.RaftLogShippingManager;
import org.neo4j.coreedge.raft.roles.Role;
-import org.neo4j.coreedge.raft.state.LastAppliedTrackingStateMachine;
import org.neo4j.coreedge.raft.state.RaftState;
import org.neo4j.coreedge.raft.state.ReadableRaftState;
-import org.neo4j.coreedge.raft.state.StateMachine;
import org.neo4j.coreedge.raft.state.StateStorage;
import org.neo4j.coreedge.raft.state.term.TermState;
import org.neo4j.coreedge.raft.state.vote.VoteState;
@@ -85,7 +82,6 @@
public class RaftInstance implements LeaderLocator, Inbound.MessageHandler, CoreMetaData
{
private final LeaderNotFoundMonitor leaderNotFoundMonitor;
- private int flushAfter;
public enum Timeouts implements RenewableTimeoutService.TimeoutName
{
@@ -101,7 +97,7 @@ public enum Timeouts implements RenewableTimeoutService.TimeoutName
private RenewableTimeoutService.RenewableTimeout electionTimer;
private RaftMembershipManager membershipManager;
- private final LastAppliedTrackingStateMachine stateMachine;
+ private final ConsensusListener consensusListener;
private final long electionTimeout;
private final long leaderWaitTimeout;
@@ -116,18 +112,18 @@ public enum Timeouts implements RenewableTimeoutService.TimeoutName
private RaftLogShippingManager logShipping;
public RaftInstance( MEMBER myself, StateStorage termStorage,
- StateStorage> voteStorage, RaftLog entryLog,
- LastAppliedTrackingStateMachine stateMachine, long electionTimeout, long heartbeatInterval,
- RenewableTimeoutService renewableTimeoutService,
- final Inbound inbound, final Outbound outbound, long leaderWaitTimeout,
- LogProvider logProvider, RaftMembershipManager membershipManager,
- RaftLogShippingManager logShipping,
- Supplier databaseHealthSupplier,
- Monitors monitors, int flushAfter )
+ StateStorage> voteStorage, RaftLog entryLog,
+ ConsensusListener consensusListener, long electionTimeout, long heartbeatInterval,
+ RenewableTimeoutService renewableTimeoutService,
+ final Inbound inbound, final Outbound outbound, long leaderWaitTimeout,
+ LogProvider logProvider, RaftMembershipManager membershipManager,
+ RaftLogShippingManager logShipping,
+ Supplier databaseHealthSupplier,
+ Monitors monitors )
{
this.myself = myself;
this.entryLog = entryLog;
- this.stateMachine = stateMachine;
+ this.consensusListener = consensusListener;
this.electionTimeout = electionTimeout;
this.heartbeatInterval = heartbeatInterval;
@@ -137,7 +133,6 @@ public RaftInstance( MEMBER myself, StateStorage termStorage,
this.outbound = outbound;
this.logShipping = logShipping;
this.databaseHealthSupplier = databaseHealthSupplier;
- this.flushAfter = flushAfter;
this.log = logProvider.getLog( getClass() );
this.membershipManager = membershipManager;
@@ -262,16 +257,8 @@ private void handleOutcome( Outcome outcome ) throws RaftStorageExceptio
raftState.update( outcome );
membershipManager.processLog( outcome.getLogCommands() );
+ consensusListener.notifyCommitted();
- for ( long index = stateMachine.lastApplied() + 1; index <= raftState.entryLog().commitIndex(); index++ )
- {
- ReplicatedContent content = raftState.entryLog().readEntryContent( index );
- stateMachine.applyCommand( content, index );
- if ( index % this.flushAfter == 0 )
- {
- stateMachine.flush();
- }
- }
volatileLeader.set( outcome.getLeader() );
}
@@ -420,4 +407,4 @@ public Set replicationMembers()
{
return membershipManager.replicationMembers();
}
-}
\ No newline at end of file
+}
diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/LastAppliedState.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/LastAppliedState.java
new file mode 100644
index 0000000000000..870548ae43c24
--- /dev/null
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/LastAppliedState.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright (c) 2002-2016 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Neo4j is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+package org.neo4j.coreedge.raft.state;
+
+import java.io.IOException;
+
+import org.neo4j.storageengine.api.ReadPastEndException;
+import org.neo4j.storageengine.api.ReadableChannel;
+import org.neo4j.storageengine.api.WritableChannel;
+
+public class LastAppliedState
+{
+ private final long lastApplied;
+
+ public LastAppliedState( long lastApplied )
+ {
+ this.lastApplied = lastApplied;
+ }
+
+ public long get()
+ {
+ return lastApplied;
+ }
+
+ public static class Marshal implements StateMarshal
+ {
+ @Override
+ public LastAppliedState startState()
+ {
+ return new LastAppliedState( -1 );
+ }
+
+ @Override
+ public long ordinal( LastAppliedState lastAppliedState )
+ {
+ return lastAppliedState.get();
+ }
+
+ @Override
+ public void marshal( LastAppliedState lastAppliedState, WritableChannel channel ) throws IOException
+ {
+ channel.putLong( lastAppliedState.get() );
+ }
+
+ @Override
+ public LastAppliedState unmarshal( ReadableChannel source ) throws IOException
+ {
+ try
+ {
+ return new LastAppliedState( source.getLong() );
+ }
+ catch( ReadPastEndException e )
+ {
+ return null;
+ }
+ }
+ }
+}
diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/StateMachineApplier.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/StateMachineApplier.java
new file mode 100644
index 0000000000000..69e3d7e363679
--- /dev/null
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/StateMachineApplier.java
@@ -0,0 +1,124 @@
+/*
+ * Copyright (c) 2002-2016 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Neo4j is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+package org.neo4j.coreedge.raft.state;
+
+import java.io.IOException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
+
+import org.neo4j.coreedge.raft.ConsensusListener;
+import org.neo4j.coreedge.raft.log.RaftLogEntry;
+import org.neo4j.coreedge.raft.log.RaftStorageException;
+import org.neo4j.coreedge.raft.log.ReadableRaftLog;
+import org.neo4j.kernel.internal.DatabaseHealth;
+import org.neo4j.kernel.lifecycle.LifecycleAdapter;
+import org.neo4j.logging.Log;
+import org.neo4j.logging.LogProvider;
+
+import static java.lang.System.currentTimeMillis;
+
+public class StateMachineApplier extends LifecycleAdapter implements ConsensusListener
+{
+ public static final long NOTHING_APPLIED = -1;
+
+ private final StateMachine stateMachine;
+ private final ReadableRaftLog raftLog;
+ private final StateStorage lastAppliedStorage;
+ private final int flushEvery;
+ private final Supplier dbHealth;
+ private final Log log;
+ private long lastApplied = NOTHING_APPLIED;
+
+ private Executor executor;
+
+ private long commitIndex = NOTHING_APPLIED;
+
+ public StateMachineApplier(
+ StateMachine stateMachine,
+ ReadableRaftLog raftLog,
+ StateStorage lastAppliedStorage,
+ Executor executor,
+ int flushEvery,
+ Supplier dbHealth,
+ LogProvider logProvider )
+ {
+ this.stateMachine = stateMachine;
+ this.raftLog = raftLog;
+ this.lastAppliedStorage = lastAppliedStorage;
+ this.flushEvery = flushEvery;
+ this.log = logProvider.getLog( getClass() );
+ this.dbHealth = dbHealth;
+ this.executor = executor;
+ }
+
+ @Override
+ public synchronized void notifyCommitted()
+ {
+ long commitIndex = raftLog.commitIndex();
+ if ( this.commitIndex != commitIndex )
+ {
+ this.commitIndex = commitIndex;
+ executor.execute( () -> {
+
+ try
+ {
+ applyUpTo( commitIndex );
+ }
+ catch ( Exception e )
+ {
+ log.error( "Failed to apply up to index " + commitIndex, e );
+ dbHealth.get().panic( e );
+ }
+ } );
+ }
+ }
+
+ private void applyUpTo( long commitIndex ) throws IOException, RaftStorageException
+ {
+
+ while ( lastApplied < commitIndex )
+ {
+ long indexToApply = lastApplied + 1;
+
+ RaftLogEntry logEntry = raftLog.readLogEntry( indexToApply );
+ stateMachine.applyCommand( logEntry.content(), indexToApply );
+
+ lastApplied = indexToApply;
+
+ if ( indexToApply % this.flushEvery == 0 )
+ {
+ stateMachine.flush();
+ lastAppliedStorage.persistStoreData( new LastAppliedState( lastApplied ) );
+ }
+ }
+ }
+
+ @Override
+ public synchronized void start() throws IOException, RaftStorageException
+ {
+ lastApplied = lastAppliedStorage.getInitialState().get();
+ log.info( "Replaying commands from index %d to index %d", lastApplied, raftLog.commitIndex() );
+
+ long start = currentTimeMillis();
+ applyUpTo( raftLog.commitIndex() );
+ log.info( "Replay done, took %d ms", currentTimeMillis() - start );
+ }
+}
diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/CoreEdgeClusterSettings.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/CoreEdgeClusterSettings.java
index ee4a85823d487..19aeb46c91d99 100644
--- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/CoreEdgeClusterSettings.java
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/CoreEdgeClusterSettings.java
@@ -142,12 +142,16 @@ public String toString()
setting( "core_edge.disable_middleware_logging", BOOLEAN, TRUE );
@Description("The maximum file size before the id allocation file is rotated (in unit of entries)")
- public static final Setting id_alloc_state_size = setting( "core_edge.id_alloc_state_size", INTEGER,
- "1000" );
+ public static final Setting last_applied_state_size =
+ setting( "core_edge.last_applied_state_size", INTEGER, "1000" );
+
+ @Description("The maximum file size before the id allocation file is rotated (in unit of entries)")
+ public static final Setting id_alloc_state_size =
+ setting( "core_edge.id_alloc_state_size", INTEGER, "1000" );
@Description("The maximum file size before the membership state file is rotated (in unit of entries)")
- public static final Setting raft_membership_state_size = setting( "core_edge.raft_membership_state_size",
- INTEGER, "1000" );
+ public static final Setting raft_membership_state_size =
+ setting( "core_edge.raft_membership_state_size", INTEGER, "1000" );
@Description("The maximum file size before the vote state file is rotated (in unit of entries)")
public static final Setting vote_state_size = setting( "core_edge.raft_vote_state_size", INTEGER, "1000" );
diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/CoreServerStartupProcess.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/CoreServerStartupProcess.java
index 6fd062f5a49cc..4946afae07e9c 100644
--- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/CoreServerStartupProcess.java
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/CoreServerStartupProcess.java
@@ -29,6 +29,7 @@
import org.neo4j.coreedge.raft.RaftServer;
import org.neo4j.coreedge.raft.membership.MembershipWaiter;
import org.neo4j.coreedge.raft.replication.id.ReplicatedIdGeneratorFactory;
+import org.neo4j.coreedge.raft.state.StateMachineApplier;
import org.neo4j.coreedge.server.CoreMember;
import org.neo4j.kernel.impl.transaction.state.DataSourceManager;
import org.neo4j.kernel.lifecycle.LifeSupport;
@@ -41,22 +42,25 @@
public class CoreServerStartupProcess
{
- public static LifeSupport createLifeSupport( DataSourceManager dataSourceManager,
- ReplicatedIdGeneratorFactory idGeneratorFactory,
- RaftInstance raft, RaftLogReplay raftLogReplay, RaftServer raftServer,
- CatchupServer catchupServer,
- DelayedRenewableTimeoutService raftTimeoutService,
- MembershipWaiter membershipWaiter,
- long joinCatchupTimeout,
- RecoverTransactionLogState recoverTransactionLogState,
- Lifecycle tokenLife )
+ public static LifeSupport createLifeSupport(
+ DataSourceManager dataSourceManager,
+ ReplicatedIdGeneratorFactory idGeneratorFactory,
+ RaftInstance raft,
+ StateMachineApplier recoverableStateMachine,
+ RaftServer raftServer,
+ CatchupServer catchupServer,
+ DelayedRenewableTimeoutService raftTimeoutService,
+ MembershipWaiter membershipWaiter,
+ long joinCatchupTimeout,
+ RecoverTransactionLogState recoverTransactionLogState,
+ Lifecycle tokenLife )
{
LifeSupport services = new LifeSupport();
services.add( dataSourceManager );
services.add( idGeneratorFactory );
services.add( recoverTransactionLogState );
services.add( tokenLife );
- services.add( raftLogReplay );
+ services.add( recoverableStateMachine );
services.add( raftServer );
services.add( catchupServer );
services.add( raftTimeoutService );
@@ -98,5 +102,4 @@ public void start() throws Throwable
}
}
}
-
}
diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java
index 1a04b92251247..b628349163433 100644
--- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/EnterpriseCoreEditionModule.java
@@ -24,9 +24,12 @@
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
+import org.neo4j.cluster.ExecutorLifecycleAdapter;
import org.neo4j.coreedge.catchup.CatchupServer;
import org.neo4j.coreedge.catchup.CheckpointerSupplier;
import org.neo4j.coreedge.catchup.DataSourceSupplier;
@@ -34,6 +37,7 @@
import org.neo4j.coreedge.discovery.CoreDiscoveryService;
import org.neo4j.coreedge.discovery.DiscoveryServiceFactory;
import org.neo4j.coreedge.discovery.RaftDiscoveryServiceConnector;
+import org.neo4j.coreedge.raft.ConsensusListener;
import org.neo4j.coreedge.raft.DelayedRenewableTimeoutService;
import org.neo4j.coreedge.raft.LeaderLocator;
import org.neo4j.coreedge.raft.RaftInstance;
@@ -71,7 +75,8 @@
import org.neo4j.coreedge.raft.replication.tx.ReplicatedTransactionStateMachine;
import org.neo4j.coreedge.raft.roles.Role;
import org.neo4j.coreedge.raft.state.DurableStateStorage;
-import org.neo4j.coreedge.raft.state.LastAppliedTrackingStateMachine;
+import org.neo4j.coreedge.raft.state.LastAppliedState;
+import org.neo4j.coreedge.raft.state.StateMachineApplier;
import org.neo4j.coreedge.raft.state.StateMachines;
import org.neo4j.coreedge.raft.state.StateStorage;
import org.neo4j.coreedge.raft.state.id_allocation.IdAllocationState;
@@ -87,9 +92,9 @@
import org.neo4j.coreedge.server.ExpiryScheduler;
import org.neo4j.coreedge.server.ListenSocketAddress;
import org.neo4j.coreedge.server.SenderService;
-import org.neo4j.coreedge.server.core.locks.ReplicatedLockTokenState;
import org.neo4j.coreedge.server.core.locks.LeaderOnlyLockManager;
import org.neo4j.coreedge.server.core.locks.LockTokenManager;
+import org.neo4j.coreedge.server.core.locks.ReplicatedLockTokenState;
import org.neo4j.coreedge.server.core.locks.ReplicatedLockTokenStateMachine;
import org.neo4j.coreedge.server.logging.BetterMessageLogger;
import org.neo4j.coreedge.server.logging.MessageLogger;
@@ -218,13 +223,28 @@ public EnterpriseCoreEditionModule( final PlatformModule platformModule,
MonitoredRaftLog monitoredRaftLog = new MonitoredRaftLog( underlyingLog , platformModule.monitors );
StateMachines stateMachines = new StateMachines();
- LastAppliedTrackingStateMachine lastAppliedStateMachine = new LastAppliedTrackingStateMachine( stateMachines );
-
- int flushAfter = config.get( CoreEdgeClusterSettings.state_machine_flush_window_size );
+ StateMachineApplier recoverableStateMachine;
+ try
+ {
+ DurableStateStorage lastAppliedStorage = life.add( new DurableStateStorage<>(
+ fileSystem, new File( clusterStateDirectory, "last-applied-state" ), "last-applied",
+ new LastAppliedState.Marshal(), config.get( CoreEdgeClusterSettings.last_applied_state_size ),
+ databaseHealthSupplier, logProvider ) );
+ ExecutorService applyExecutor = Executors.newSingleThreadExecutor();
+ life.add( new ExecutorServiceLifecycleAdapter( applyExecutor ) );
+ recoverableStateMachine = new StateMachineApplier(
+ stateMachines, monitoredRaftLog, lastAppliedStorage, applyExecutor,
+ config.get( CoreEdgeClusterSettings.state_machine_flush_window_size ),
+ databaseHealthSupplier, logProvider );
+ }
+ catch ( IOException e )
+ {
+ throw new RuntimeException( e );
+ }
raft = createRaft( life, loggingOutbound, discoveryService, config, messageLogger, monitoredRaftLog,
- lastAppliedStateMachine, fileSystem, clusterStateDirectory, myself, logProvider, raftServer,
- raftTimeoutService, databaseHealthSupplier, platformModule.monitors, flushAfter );
+ recoverableStateMachine, fileSystem, clusterStateDirectory, myself, logProvider, raftServer,
+ raftTimeoutService, databaseHealthSupplier, platformModule.monitors );
dependencies.satisfyDependency( raft );
@@ -362,7 +382,7 @@ fileSystem, new File( clusterStateDirectory, "id-allocation-state" ), "id-alloca
life.add( CoreServerStartupProcess.createLifeSupport(
platformModule.dataSourceManager, replicatedIdGeneratorFactory, raft,
- new RaftLogReplay( lastAppliedStateMachine, monitoredRaftLog, logProvider, flushAfter ), raftServer,
+ recoverableStateMachine, raftServer,
catchupServer, raftTimeoutService, membershipWaiter,
joinCatchupTimeout,
new RecoverTransactionLogState( dependencies, logProvider,
@@ -449,20 +469,20 @@ public static CommitProcessFactory createCommitProcessFactory(
}
private static RaftInstance createRaft( LifeSupport life,
- Outbound outbound,
- CoreDiscoveryService discoveryService,
- Config config,
- MessageLogger messageLogger,
- RaftLog raftLog,
- LastAppliedTrackingStateMachine stateMachines,
- FileSystemAbstraction fileSystem,
- File clusterStateDirectory,
- CoreMember myself,
- LogProvider logProvider,
- RaftServer raftServer,
- DelayedRenewableTimeoutService raftTimeoutService,
- Supplier databaseHealthSupplier,
- Monitors monitors, int flushAfter )
+ Outbound outbound,
+ CoreDiscoveryService discoveryService,
+ Config config,
+ MessageLogger messageLogger,
+ RaftLog raftLog,
+ ConsensusListener consensusListener,
+ FileSystemAbstraction fileSystem,
+ File clusterStateDirectory,
+ CoreMember myself,
+ LogProvider logProvider,
+ RaftServer raftServer,
+ DelayedRenewableTimeoutService raftTimeoutService,
+ Supplier databaseHealthSupplier,
+ Monitors monitors )
{
StateStorage termState;
try
@@ -532,10 +552,10 @@ fileSystem, new File( clusterStateDirectory, "term-state" ), "term-state",
config.get( CoreEdgeClusterSettings.log_shipping_max_lag ) );
RaftInstance raftInstance = new RaftInstance<>(
- myself, termState, voteState, raftLog, stateMachines, electionTimeout, heartbeatInterval,
+ myself, termState, voteState, raftLog, consensusListener, electionTimeout, heartbeatInterval,
raftTimeoutService, loggingRaftInbound,
new RaftOutbound( outbound ), leaderWaitTimeout, logProvider,
- raftMembershipManager, logShipping, databaseHealthSupplier, monitors, flushAfter );
+ raftMembershipManager, logShipping, databaseHealthSupplier, monitors );
life.add( new RaftDiscoveryServiceConnector( discoveryService, raftInstance ) );
@@ -566,8 +586,7 @@ private static PrintWriter raftMessagesLog( File storeDir )
protected SchemaWriteGuard createSchemaWriteGuard()
{
- return () -> {
- };
+ return () -> {};
}
protected KernelData createKernelData( FileSystemAbstraction fileSystem, PageCache pageCache, File storeDir,
diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/LastAppliedTrackingStateMachine.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/ExecutorServiceLifecycleAdapter.java
similarity index 50%
rename from enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/LastAppliedTrackingStateMachine.java
rename to enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/ExecutorServiceLifecycleAdapter.java
index 110a1633d23c6..4ee2d5485a1e1 100644
--- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/LastAppliedTrackingStateMachine.java
+++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/ExecutorServiceLifecycleAdapter.java
@@ -17,41 +17,24 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*/
-package org.neo4j.coreedge.raft.state;
+package org.neo4j.coreedge.server.core;
-import java.io.IOException;
+import java.util.concurrent.ExecutorService;
-import org.neo4j.coreedge.raft.replication.ReplicatedContent;
+import org.neo4j.kernel.lifecycle.LifecycleAdapter;
-import static java.lang.Math.max;
-
-public class LastAppliedTrackingStateMachine implements StateMachine
+public class ExecutorServiceLifecycleAdapter extends LifecycleAdapter
{
- public static final long NOTHING_APPLIED = -1;
-
- private final StateMachine stateMachine;
- private long lastApplied = NOTHING_APPLIED;
+ private final ExecutorService executorService;
- public LastAppliedTrackingStateMachine( StateMachine stateMachine )
+ public ExecutorServiceLifecycleAdapter( ExecutorService executorService )
{
- this.stateMachine = stateMachine;
+ this.executorService = executorService;
}
@Override
- public void applyCommand( ReplicatedContent content, long logIndex )
- {
- stateMachine.applyCommand( content, logIndex );
- lastApplied = max( logIndex, lastApplied );
- }
-
- @Override
- public void flush() throws IOException
- {
- stateMachine.flush();
- }
-
- public long lastApplied()
+ public void shutdown() throws Throwable
{
- return lastApplied;
+ executorService.shutdown();
}
}
diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/RaftLogReplay.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/RaftLogReplay.java
deleted file mode 100644
index a472c89173b3d..0000000000000
--- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/server/core/RaftLogReplay.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Copyright (c) 2002-2016 "Neo Technology,"
- * Network Engine for Objects in Lund AB [http://neotechnology.com]
- *
- * This file is part of Neo4j.
- *
- * Neo4j is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License as
- * published by the Free Software Foundation, either version 3 of the
- * License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see .
- */
-package org.neo4j.coreedge.server.core;
-
-import org.neo4j.coreedge.raft.log.RaftLog;
-import org.neo4j.coreedge.raft.replication.ReplicatedContent;
-import org.neo4j.coreedge.raft.state.StateMachine;
-import org.neo4j.kernel.lifecycle.LifecycleAdapter;
-import org.neo4j.logging.Log;
-import org.neo4j.logging.LogProvider;
-
-import static java.lang.Math.max;
-import static java.lang.System.currentTimeMillis;
-
-public class RaftLogReplay extends LifecycleAdapter
-{
- private final StateMachine stateMachine;
- private final RaftLog raftLog;
- private final int flushAfter;
- private final Log log;
-
- public RaftLogReplay( StateMachine stateMachine, RaftLog raftLog, LogProvider logProvider, int flushAfter )
- {
- this.stateMachine = stateMachine;
- this.raftLog = raftLog;
- this.flushAfter = flushAfter;
- this.log = logProvider.getLog( getClass() );
- }
-
- @Override
- public void start() throws Throwable
- {
- long start = currentTimeMillis();
- /*
- * Since all state machines and replicated content listeners persist their state, we can skip all entries that
- * have been committed successfully.
- * However, looking at how commit() does its thing, we probably face a race with a crash in between updating
- * the commit index and having notified all replicated content listeners. We should probably invert the order
- * there. For this reason, and having the (assumed/required) idempotent property of replicated content listeners,
- * we still replay the last committed entry because, since we do one entry at a time, that may be the only one
- * that has not been applied against all listeners.
- * This change is effectively equivalent to truncating/compacting the raft log.
- */
- long index = max( 0, raftLog.commitIndex() - 1 - flushAfter ); // new instances have a commit index of -1, which should be ignored
- log.info( "Starting replay at index %d", index );
- for(; index <= raftLog.commitIndex(); index++ )
- {
- ReplicatedContent content = raftLog.readEntryContent( index );
- stateMachine.applyCommand( content, index );
- log.info( "Index %d replayed as committed", index );
- }
-
- log.info( "Replay done, took %d ms", currentTimeMillis() - start );
- }
-}
diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/CoreServerStartupProcessTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/CoreServerStartupProcessTest.java
index 33a0662298fa6..4d0bb9fe9004a 100644
--- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/CoreServerStartupProcessTest.java
+++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/CoreServerStartupProcessTest.java
@@ -30,11 +30,10 @@
import org.neo4j.coreedge.raft.RaftServer;
import org.neo4j.coreedge.raft.membership.MembershipWaiter;
import org.neo4j.coreedge.raft.replication.id.ReplicatedIdGeneratorFactory;
+import org.neo4j.coreedge.raft.state.StateMachineApplier;
import org.neo4j.coreedge.server.CoreMember;
import org.neo4j.coreedge.server.core.CoreServerStartupProcess;
import org.neo4j.coreedge.raft.DelayedRenewableTimeoutService;
-import org.neo4j.coreedge.server.core.DeleteStoreOnStartUp;
-import org.neo4j.coreedge.server.core.RaftLogReplay;
import org.neo4j.coreedge.server.core.RecoverTransactionLogState;
import org.neo4j.helpers.collection.IteratorUtil;
import org.neo4j.kernel.impl.transaction.state.DataSourceManager;
@@ -58,21 +57,21 @@ public void raftTimeOutServiceTriggersMessagesSentToAnotherServer() throws Excep
DelayedRenewableTimeoutService raftTimeoutService = mock( DelayedRenewableTimeoutService.class );
MembershipWaiter membershipWaiter = mock( MembershipWaiter.class );
RaftInstance raftInstance = mock( RaftInstance.class );
- RaftLogReplay raftLogReplay = mock( RaftLogReplay.class );
+ StateMachineApplier recoverableStateMachine = mock( StateMachineApplier.class );
RecoverTransactionLogState recoverTransactionLogState = mock( RecoverTransactionLogState.class );
Lifecycle tokenLife = mock( Lifecycle.class );
LifeSupport lifeSupport = CoreServerStartupProcess.createLifeSupport( dataSourceManager,
- idGeneratorFactory, raftInstance, raftLogReplay, raftServer, catchupServer, raftTimeoutService,
+ idGeneratorFactory, raftInstance, recoverableStateMachine, raftServer, catchupServer, raftTimeoutService,
membershipWaiter, 0, recoverTransactionLogState, tokenLife );
assertThat( lifeSupport, startsComponent( raftTimeoutService ).after( raftServer )
.because( "server need to be ready to handle responses generated by timeout events" ) );
- assertThat( lifeSupport, startsComponent( raftTimeoutService ).after( raftLogReplay )
+ assertThat( lifeSupport, startsComponent( raftTimeoutService ).after( recoverableStateMachine )
.because( "elections which must request votes from the latest known voting members" ) );
- assertThat( lifeSupport, startsComponent( raftLogReplay ).after( dataSourceManager )
+ assertThat( lifeSupport, startsComponent( recoverableStateMachine ).after( dataSourceManager )
.because( "transactions are replayed from the RAFT log into the data source" ) );
assertThat( lifeSupport, startsComponent( idGeneratorFactory ).after( dataSourceManager )
diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/RaftInstanceBuilder.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/RaftInstanceBuilder.java
index 571b6bbba6163..51cbbcb540533 100644
--- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/RaftInstanceBuilder.java
+++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/RaftInstanceBuilder.java
@@ -29,11 +29,8 @@
import org.neo4j.coreedge.raft.net.Outbound;
import org.neo4j.coreedge.raft.replication.LeaderOnlyReplicator;
import org.neo4j.coreedge.raft.replication.shipping.RaftLogShippingManager;
-import org.neo4j.coreedge.raft.state.LastAppliedTrackingStateMachine;
-import org.neo4j.coreedge.raft.state.StateMachine;
-import org.neo4j.coreedge.raft.state.StateMachines;
import org.neo4j.coreedge.raft.state.StateStorage;
-import org.neo4j.coreedge.raft.state.StubStateStorage;
+import org.neo4j.coreedge.raft.state.InMemoryStateStorage;
import org.neo4j.coreedge.raft.state.membership.RaftMembershipState;
import org.neo4j.coreedge.raft.state.term.TermState;
import org.neo4j.coreedge.raft.state.vote.VoteState;
@@ -50,16 +47,14 @@ public class RaftInstanceBuilder
private int expectedClusterSize;
private RaftGroup.Builder memberSetBuilder;
- private StateStorage termState = new StubStateStorage<>( new TermState() );
- private StateStorage> voteState = new StubStateStorage<>( new VoteState<>() );
+ private StateStorage termState = new InMemoryStateStorage<>( new TermState() );
+ private StateStorage> voteState = new InMemoryStateStorage<>( new VoteState<>() );
private RaftLog raftLog = new InMemoryRaftLog();
private RenewableTimeoutService renewableTimeoutService = new DelayedRenewableTimeoutService( Clock.SYSTEM_CLOCK,
NullLogProvider.getInstance() );
- private Inbound inbound = handler -> {
- };
- private Outbound outbound = ( advertisedSocketAddress, messages ) -> {
- };
+ private Inbound inbound = handler -> {};
+ private Outbound outbound = ( advertisedSocketAddress, messages ) -> {};
private LogProvider logProvider = NullLogProvider.getInstance();
private Clock clock = Clock.SYSTEM_CLOCK;
@@ -73,10 +68,9 @@ public class RaftInstanceBuilder
private int maxAllowedShippingLag = 256;
private Supplier databaseHealthSupplier;
private StateStorage> raftMembership =
- new StubStateStorage<>( new RaftMembershipState<>() );
+ new InMemoryStateStorage<>( new RaftMembershipState<>() );
private Monitors monitors = new Monitors();
- private LastAppliedTrackingStateMachine stateMachine = new LastAppliedTrackingStateMachine( new StateMachines() );
- private int flushAfter = 1;
+ private ConsensusListener consensusListener = () -> {};
public RaftInstanceBuilder( MEMBER member, int expectedClusterSize, RaftGroup.Builder memberSetBuilder )
{
@@ -95,10 +89,9 @@ public RaftInstance build()
RaftLogShippingManager logShipping = new RaftLogShippingManager<>( outbound, logProvider, raftLog,
clock, member, membershipManager, retryTimeMillis, catchupBatchSize, maxAllowedShippingLag );
- return new RaftInstance<>( member, termState, voteState, raftLog, stateMachine, electionTimeout,
- heartbeatInterval,
- renewableTimeoutService, inbound, outbound, leaderWaitTimeout, logProvider, membershipManager,
- logShipping, databaseHealthSupplier, monitors, flushAfter );
+ return new RaftInstance<>( member, termState, voteState, raftLog, consensusListener, electionTimeout,
+ heartbeatInterval, renewableTimeoutService, inbound, outbound, leaderWaitTimeout, logProvider,
+ membershipManager, logShipping, databaseHealthSupplier, monitors );
}
public RaftInstanceBuilder leaderWaitTimeout( long leaderWaitTimeout )
@@ -143,9 +136,9 @@ public RaftInstanceBuilder raftLog( RaftLog raftLog )
return this;
}
- public RaftInstanceBuilder stateMachine( LastAppliedTrackingStateMachine stateMachine )
+ public RaftInstanceBuilder consensusListener( ConsensusListener consensusListener )
{
- this.stateMachine = stateMachine;
+ this.consensusListener = consensusListener;
return this;
}
@@ -160,10 +153,4 @@ public RaftInstanceBuilder monitors( Monitors monitors )
this.monitors = monitors;
return this;
}
-
- public RaftInstanceBuilder flushAfter( int flushAfter )
- {
- this.flushAfter = flushAfter;
- return this;
- }
}
diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/RaftInstanceLogTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/RaftInstanceLogTest.java
index b1a40867bf838..55b82ae52efa8 100644
--- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/RaftInstanceLogTest.java
+++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/log/RaftInstanceLogTest.java
@@ -25,29 +25,25 @@
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
+import org.neo4j.coreedge.raft.ConsensusListener;
import org.neo4j.coreedge.raft.RaftInstance;
import org.neo4j.coreedge.raft.RaftInstanceBuilder;
import org.neo4j.coreedge.raft.ReplicatedInteger;
-import org.neo4j.coreedge.raft.state.LastAppliedTrackingStateMachine;
-import org.neo4j.coreedge.raft.state.StateMachine;
+import org.neo4j.coreedge.raft.replication.ReplicatedContent;
import org.neo4j.coreedge.server.RaftTestMember;
import org.neo4j.coreedge.server.RaftTestMemberSetBuilder;
-import org.neo4j.coreedge.raft.replication.ReplicatedContent;
import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.verify;
-
-import static org.neo4j.coreedge.server.RaftTestMember.member;
import static org.neo4j.coreedge.raft.ReplicatedInteger.valueOf;
import static org.neo4j.coreedge.raft.TestMessageBuilders.appendEntriesRequest;
+import static org.neo4j.coreedge.server.RaftTestMember.member;
@RunWith(MockitoJUnitRunner.class)
public class RaftInstanceLogTest
{
@Mock
- StateMachine stateMachine;
+ ConsensusListener consensusListener;
private RaftTestMember myself = member( 0 );
private ReplicatedContent content = ReplicatedInteger.valueOf( 1 );
@@ -63,7 +59,7 @@ public void before() throws Exception
raft = new RaftInstanceBuilder<>( myself, 3, RaftTestMemberSetBuilder.INSTANCE )
.raftLog( testEntryLog )
- .stateMachine( new LastAppliedTrackingStateMachine( stateMachine ) )
+ .consensusListener( consensusListener )
.build();
}
@@ -354,6 +350,6 @@ public void shouldUpdateCommitIndexIfNecessary() throws Exception
// then
assertEquals( 2, testEntryLog.commitIndex() );
- verify( stateMachine ).applyCommand( eq( ReplicatedInteger.valueOf( 1 ) ), anyLong() );
+ verify( consensusListener ).notifyCommitted();
}
}
diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/membership/RaftMembershipManagerTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/membership/RaftMembershipManagerTest.java
index 41a8eccb2adf7..a00a8fca928c7 100644
--- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/membership/RaftMembershipManagerTest.java
+++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/membership/RaftMembershipManagerTest.java
@@ -31,7 +31,7 @@
import org.neo4j.coreedge.raft.outcome.LogCommand;
import org.neo4j.coreedge.raft.outcome.TruncateLogCommand;
import org.neo4j.coreedge.raft.state.StateStorage;
-import org.neo4j.coreedge.raft.state.StubStateStorage;
+import org.neo4j.coreedge.raft.state.InMemoryStateStorage;
import org.neo4j.coreedge.raft.state.membership.RaftMembershipState;
import org.neo4j.coreedge.server.RaftTestMember;
import org.neo4j.coreedge.server.RaftTestMemberSetBuilder;
@@ -61,7 +61,7 @@ public void membershipManagerShouldUseLatestAppendedMembershipSetEntries()
RaftMembershipManager membershipManager = new RaftMembershipManager<>(
null, RaftTestMemberSetBuilder.INSTANCE, log,
NullLogProvider.getInstance(), 3, 1000, new FakeClock(),
- 1000, new StubStateStorage<>( new RaftMembershipState<>() ) );
+ 1000, new InMemoryStateStorage<>( new RaftMembershipState<>() ) );
// when
membershipManager.processLog( asList(
@@ -84,7 +84,7 @@ public void membershipManagerShouldRevertToOldMembershipSetAfterTruncationCauses
RaftMembershipManager membershipManager = new RaftMembershipManager<>(
null,
RaftTestMemberSetBuilder.INSTANCE, log, NullLogProvider.getInstance(), 3, 1000, new FakeClock(),
- 1000, new StubStateStorage<>( new RaftMembershipState<>() ) );
+ 1000, new InMemoryStateStorage<>( new RaftMembershipState<>() ) );
// when
List logCommands = asList(
@@ -114,7 +114,7 @@ public void membershipManagerShouldRevertToEarlierAppendedMembershipSetAfterTrun
RaftMembershipManager membershipManager = new RaftMembershipManager<>(
null,
RaftTestMemberSetBuilder.INSTANCE, log, NullLogProvider.getInstance(), 3, 1000, new FakeClock(),
- 1000, new StubStateStorage<>( new RaftMembershipState<>() ) );
+ 1000, new InMemoryStateStorage<>( new RaftMembershipState<>() ) );
// when
List logCommands = asList(
diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/id/ReplicatedIdAllocationStateMachineTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/id/ReplicatedIdAllocationStateMachineTest.java
index 8647a2bff7315..4aecae5e03ec5 100644
--- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/id/ReplicatedIdAllocationStateMachineTest.java
+++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/id/ReplicatedIdAllocationStateMachineTest.java
@@ -22,7 +22,7 @@
import org.junit.Test;
import org.neo4j.coreedge.raft.state.StateStorage;
-import org.neo4j.coreedge.raft.state.StubStateStorage;
+import org.neo4j.coreedge.raft.state.InMemoryStateStorage;
import org.neo4j.coreedge.raft.state.id_allocation.IdAllocationState;
import org.neo4j.coreedge.server.AdvertisedSocketAddress;
import org.neo4j.coreedge.server.CoreMember;
@@ -49,7 +49,7 @@ public void shouldNotHaveAnyIdsInitially()
{
// given
ReplicatedIdAllocationStateMachine idAllocationStateMachine = new ReplicatedIdAllocationStateMachine( me,
- new StubStateStorage<>( new IdAllocationState() ), NullLogProvider.getInstance() );
+ new InMemoryStateStorage<>( new IdAllocationState() ), NullLogProvider.getInstance() );
// when
IdRange myHighestIdRange = idAllocationStateMachine.getHighestIdRange( me, someType );
@@ -65,7 +65,7 @@ public void shouldUpdateStateOnlyForTypeRequested()
{
// given
ReplicatedIdAllocationStateMachine idAllocationStateMachine = new ReplicatedIdAllocationStateMachine( me,
- new StubStateStorage<>( new IdAllocationState() ), NullLogProvider.getInstance() );
+ new InMemoryStateStorage<>( new IdAllocationState() ), NullLogProvider.getInstance() );
ReplicatedIdAllocationRequest idAllocationRequest = new ReplicatedIdAllocationRequest( me, someType, 0, 1024 );
// when
@@ -81,7 +81,7 @@ public void shouldUpdateHighestIdRangeForSelf()
{
// given
ReplicatedIdAllocationStateMachine idAllocationStateMachine = new ReplicatedIdAllocationStateMachine( me,
- new StubStateStorage<>( new IdAllocationState() ), NullLogProvider.getInstance() );
+ new InMemoryStateStorage<>( new IdAllocationState() ), NullLogProvider.getInstance() );
ReplicatedIdAllocationRequest idAllocationRequest = new ReplicatedIdAllocationRequest( me, someType, 0, 1024 );
// when
@@ -98,7 +98,7 @@ public void severalDistinctRequestsShouldIncrementallyUpdate()
{
// given
ReplicatedIdAllocationStateMachine idAllocationStateMachine = new ReplicatedIdAllocationStateMachine( me,
- new StubStateStorage<>( new IdAllocationState() ), NullLogProvider.getInstance() );
+ new InMemoryStateStorage<>( new IdAllocationState() ), NullLogProvider.getInstance() );
long index = 0;
// when
@@ -115,7 +115,7 @@ public void severalEqualRequestsShouldOnlyUpdateOnce()
{
// given
ReplicatedIdAllocationStateMachine idAllocationStateMachine = new ReplicatedIdAllocationStateMachine( me,
- new StubStateStorage<>( new IdAllocationState() ), NullLogProvider.getInstance() );
+ new InMemoryStateStorage<>( new IdAllocationState() ), NullLogProvider.getInstance() );
// when
idAllocationStateMachine.applyCommand( new ReplicatedIdAllocationRequest( me, someType, 0, 1024 ), 0 );
@@ -131,7 +131,7 @@ public void outOfOrderRequestShouldBeIgnored()
{
// given
ReplicatedIdAllocationStateMachine idAllocationStateMachine = new ReplicatedIdAllocationStateMachine( me,
- new StubStateStorage<>( new IdAllocationState() ), NullLogProvider.getInstance() );
+ new InMemoryStateStorage<>( new IdAllocationState() ), NullLogProvider.getInstance() );
// when
idAllocationStateMachine.applyCommand( new ReplicatedIdAllocationRequest( me, someType, 0, 1024 ), 0 );
@@ -147,7 +147,7 @@ public void requestLosingRaceShouldBeIgnored()
{
// given
ReplicatedIdAllocationStateMachine idAllocationStateMachine = new ReplicatedIdAllocationStateMachine( me,
- new StubStateStorage<>( new IdAllocationState() ), NullLogProvider.getInstance() );
+ new InMemoryStateStorage<>( new IdAllocationState() ), NullLogProvider.getInstance() );
// when
idAllocationStateMachine.applyCommand( new ReplicatedIdAllocationRequest( someoneElse, someType, 0, 1024 ), 0 );
@@ -167,14 +167,14 @@ public void shouldCorrectlyRestartWithPreviousState() throws Exception
IdAllocationState idAllocationState = new IdAllocationState();
ReplicatedIdAllocationStateMachine firstIdAllocationStateMachine =
- new ReplicatedIdAllocationStateMachine( me, new StubStateStorage<>( idAllocationState ), NullLogProvider.getInstance() );
+ new ReplicatedIdAllocationStateMachine( me, new InMemoryStateStorage<>( idAllocationState ), NullLogProvider.getInstance() );
firstIdAllocationStateMachine.applyCommand( new ReplicatedIdAllocationRequest( me, someType, 0, 1024 ), 0 );
firstIdAllocationStateMachine.applyCommand( new ReplicatedIdAllocationRequest( me, someType, 1024, 1024 ), 1 );
// when
ReplicatedIdAllocationStateMachine secondIdAllocationStateMachine =
- new ReplicatedIdAllocationStateMachine( me, new StubStateStorage<>( idAllocationState ), NullLogProvider.getInstance() );
+ new ReplicatedIdAllocationStateMachine( me, new InMemoryStateStorage<>( idAllocationState ), NullLogProvider.getInstance() );
// then
assertEquals( firstIdAllocationStateMachine.getHighestIdRange( me, someType ),
@@ -202,7 +202,7 @@ public void shouldIgnoreAlreadySeenIndex() throws Exception
// given
// a state machine
final long AN_INDEX = 24;
- StateStorage stateStorage = new StubStateStorage<>( new IdAllocationState() );
+ StateStorage stateStorage = new InMemoryStateStorage<>( new IdAllocationState() );
ReplicatedIdAllocationStateMachine stateMachine =
new ReplicatedIdAllocationStateMachine( me, stateStorage, NullLogProvider.getInstance() );
@@ -238,7 +238,7 @@ public void shouldContinueAcceptingRequestsAfterIgnoreAlreadySeenIndex() throws
// given
// a state machine
final long AN_INDEX = 24;
- StateStorage stateStorage = new StubStateStorage<>( new IdAllocationState() );
+ StateStorage stateStorage = new InMemoryStateStorage<>( new IdAllocationState() );
ReplicatedIdAllocationStateMachine stateMachine =
new ReplicatedIdAllocationStateMachine( me, stateStorage, NullLogProvider.getInstance() );
diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/id/ReplicatedIdRangeAcquirerTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/id/ReplicatedIdRangeAcquirerTest.java
index 0392a54d8929b..56d7cb3e593ef 100644
--- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/id/ReplicatedIdRangeAcquirerTest.java
+++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/id/ReplicatedIdRangeAcquirerTest.java
@@ -25,7 +25,7 @@
import org.junit.Test;
import org.neo4j.coreedge.raft.replication.DirectReplicator;
-import org.neo4j.coreedge.raft.state.StubStateStorage;
+import org.neo4j.coreedge.raft.state.InMemoryStateStorage;
import org.neo4j.coreedge.raft.state.id_allocation.IdAllocationState;
import org.neo4j.coreedge.server.AdvertisedSocketAddress;
import org.neo4j.coreedge.server.CoreMember;
@@ -95,7 +95,7 @@ private ReplicatedIdGenerator createForMemberWithInitialIdAndRangeLength( CoreMe
int idRangeLength )
{
ReplicatedIdAllocationStateMachine idAllocationStateMachine = new ReplicatedIdAllocationStateMachine( member,
- new StubStateStorage<>( new IdAllocationState() ), NullLogProvider.getInstance() );
+ new InMemoryStateStorage<>( new IdAllocationState() ), NullLogProvider.getInstance() );
stateMachines.add( idAllocationStateMachine );
ReplicatedIdRangeAcquirer acquirer = new ReplicatedIdRangeAcquirer( replicator,
diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/tx/CommitProcessStateMachineCollaborationTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/tx/CommitProcessStateMachineCollaborationTest.java
index 122f3f9e8b798..bfda9f7546bfb 100644
--- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/tx/CommitProcessStateMachineCollaborationTest.java
+++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/tx/CommitProcessStateMachineCollaborationTest.java
@@ -32,7 +32,7 @@
import org.neo4j.coreedge.raft.replication.session.LocalSessionPool;
import org.neo4j.coreedge.raft.state.StateMachine;
import org.neo4j.coreedge.raft.state.StateMachines;
-import org.neo4j.coreedge.raft.state.StubStateStorage;
+import org.neo4j.coreedge.raft.state.InMemoryStateStorage;
import org.neo4j.coreedge.server.AdvertisedSocketAddress;
import org.neo4j.coreedge.server.CoreMember;
import org.neo4j.coreedge.server.core.locks.LockTokenManager;
@@ -78,7 +78,7 @@ public void shouldAlwaysCompleteFutureEvenIfReplicationHappensAtUnfortunateMomen
LockTokenManager lockState = lockState( 0 );
final ReplicatedTransactionStateMachine stateMachine = new ReplicatedTransactionStateMachine<>(
localCommitProcess, sessionPool.getGlobalSession(), lockState, txFutures,
- new StubStateStorage<>( new GlobalSessionTrackerState<>() ), NullLogProvider.getInstance() );
+ new InMemoryStateStorage<>( new GlobalSessionTrackerState<>() ), NullLogProvider.getInstance() );
stateMachines.add( stateMachine );
ReplicatedTransactionCommitProcess commitProcess = new ReplicatedTransactionCommitProcess(
@@ -110,7 +110,7 @@ public void shouldFailTransactionIfLockSessionChanges() throws Exception
final ReplicatedTransactionStateMachine stateMachine = new ReplicatedTransactionStateMachine<>(
localCommitProcess, sessionPool.getGlobalSession(), lockState, txFutures,
- new StubStateStorage<>( new GlobalSessionTrackerState<>() ), NullLogProvider.getInstance() );
+ new InMemoryStateStorage<>( new GlobalSessionTrackerState<>() ), NullLogProvider.getInstance() );
stateMachines.add( stateMachine );
ReplicatedTransactionCommitProcess commitProcess = new ReplicatedTransactionCommitProcess(
diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/tx/ReplicatedTransactionStateMachinePersistenceTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/tx/ReplicatedTransactionStateMachinePersistenceTest.java
index 6a08d2342dbb6..dcf4bba4589cc 100644
--- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/tx/ReplicatedTransactionStateMachinePersistenceTest.java
+++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/tx/ReplicatedTransactionStateMachinePersistenceTest.java
@@ -40,7 +40,7 @@
import org.neo4j.coreedge.raft.replication.session.GlobalSession;
import org.neo4j.coreedge.raft.replication.session.GlobalSessionTrackerState;
import org.neo4j.coreedge.raft.replication.session.LocalOperationId;
-import org.neo4j.coreedge.raft.state.StubStateStorage;
+import org.neo4j.coreedge.raft.state.InMemoryStateStorage;
import org.neo4j.coreedge.server.RaftTestMember;
import org.neo4j.coreedge.server.core.locks.LockTokenManager;
import org.neo4j.graphdb.TransactionFailureException;
@@ -163,7 +163,7 @@ public ReplicatedTransactionStateMachine stateMachine( Transacti
new GlobalSession<>( UUID.randomUUID(), RaftTestMember.member( 1 ) ),
mock( LockTokenManager.class, RETURNS_MOCKS ),
new CommittingTransactionsRegistry(),
- new StubStateStorage<>( sessionTrackerState ), NullLogProvider.getInstance() );
+ new InMemoryStateStorage<>( sessionTrackerState ), NullLogProvider.getInstance() );
}
private ReplicatedTransaction replicatedTx() throws java.io.IOException
@@ -172,4 +172,4 @@ private ReplicatedTransaction replicatedTx() throws java.io.IOEx
return ReplicatedTransactionFactory.createImmutableReplicatedTransaction( tx, new GlobalSession<>( UUID
.randomUUID(), RaftTestMember.member( 2 ) ), new LocalOperationId( 1, 0 ) );
}
-}
\ No newline at end of file
+}
diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/tx/ReplicatedTransactionStateMachineTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/tx/ReplicatedTransactionStateMachineTest.java
index ab61f88de47c3..a64fc68b4aa54 100644
--- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/tx/ReplicatedTransactionStateMachineTest.java
+++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/replication/tx/ReplicatedTransactionStateMachineTest.java
@@ -27,7 +27,7 @@
import org.neo4j.coreedge.raft.replication.session.GlobalSession;
import org.neo4j.coreedge.raft.replication.session.GlobalSessionTrackerState;
import org.neo4j.coreedge.raft.replication.session.LocalOperationId;
-import org.neo4j.coreedge.raft.state.StubStateStorage;
+import org.neo4j.coreedge.raft.state.InMemoryStateStorage;
import org.neo4j.coreedge.server.AdvertisedSocketAddress;
import org.neo4j.coreedge.server.CoreMember;
import org.neo4j.coreedge.server.core.locks.LockTokenManager;
@@ -71,7 +71,7 @@ public void shouldCommitTransaction() throws Exception
final ReplicatedTransactionStateMachine listener = new ReplicatedTransactionStateMachine<>(
localCommitProcess, globalSession, lockState( lockSessionId ), new CommittingTransactionsRegistry(),
- new StubStateStorage<>( new GlobalSessionTrackerState<>() ), NullLogProvider.getInstance() );
+ new InMemoryStateStorage<>( new GlobalSessionTrackerState<>() ), NullLogProvider.getInstance() );
// when
listener.applyCommand( tx, 0 );
@@ -94,7 +94,7 @@ public void shouldOnlyCommitSameTransactionOnce() throws Exception
TransactionCommitProcess localCommitProcess = mock( TransactionCommitProcess.class );
ReplicatedTransactionStateMachine listener = new ReplicatedTransactionStateMachine<>(
localCommitProcess, globalSession, lockState( lockSessionId ), new CommittingTransactionsRegistry(),
- new StubStateStorage<>( new GlobalSessionTrackerState<>() ), NullLogProvider.getInstance() );
+ new InMemoryStateStorage<>( new GlobalSessionTrackerState<>() ), NullLogProvider.getInstance() );
// when
listener.applyCommand( tx, 0 );
@@ -121,7 +121,7 @@ public void shouldFailFutureForTransactionCommittedUnderWrongLockSession() throw
CommittingTransactions committingTransactions = new CommittingTransactionsRegistry();
final ReplicatedTransactionStateMachine listener = new ReplicatedTransactionStateMachine<>(
localCommitProcess, globalSession, lockState( currentLockSessionId ), committingTransactions,
- new StubStateStorage<>( new GlobalSessionTrackerState<>() ), NullLogProvider.getInstance() );
+ new InMemoryStateStorage<>( new GlobalSessionTrackerState<>() ), NullLogProvider.getInstance() );
CommittingTransaction future = committingTransactions.register( localOperationId );
@@ -156,7 +156,7 @@ public void shouldAcceptTransactionCommittedWithNoLockManager() throws Exception
CommittingTransactions committingTransactions = new CommittingTransactionsRegistry();
final ReplicatedTransactionStateMachine listener = new ReplicatedTransactionStateMachine<>(
localCommitProcess, globalSession, lockState( currentLockSessionId ), committingTransactions,
- new StubStateStorage<>( new GlobalSessionTrackerState<>() ), NullLogProvider.getInstance() );
+ new InMemoryStateStorage<>( new GlobalSessionTrackerState<>() ), NullLogProvider.getInstance() );
CommittingTransaction future = committingTransactions.register( localOperationId );
diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/StubStateStorage.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/InMemoryStateStorage.java
similarity index 86%
rename from enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/StubStateStorage.java
rename to enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/InMemoryStateStorage.java
index b9d790826d1d4..75e2e25f7f086 100644
--- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/StubStateStorage.java
+++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/InMemoryStateStorage.java
@@ -21,11 +21,11 @@
import java.io.IOException;
-public class StubStateStorage implements StateStorage
+public class InMemoryStateStorage implements StateStorage
{
- private final STATE state;
+ private STATE state;
- public StubStateStorage( STATE state )
+ public InMemoryStateStorage( STATE state )
{
this.state = state;
}
@@ -39,6 +39,6 @@ public STATE getInitialState()
@Override
public void persistStoreData( STATE state ) throws IOException
{
-
+ this.state = state;
}
-}
\ No newline at end of file
+}
diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/LastAppliedTrackingStateMachineTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/LastAppliedTrackingStateMachineTest.java
deleted file mode 100644
index 0295691d32fc9..0000000000000
--- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/LastAppliedTrackingStateMachineTest.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Copyright (c) 2002-2016 "Neo Technology,"
- * Network Engine for Objects in Lund AB [http://neotechnology.com]
- *
- * This file is part of Neo4j.
- *
- * Neo4j is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License as
- * published by the Free Software Foundation, either version 3 of the
- * License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see .
- */
-package org.neo4j.coreedge.raft.state;
-
-import org.junit.Test;
-
-import org.neo4j.coreedge.raft.ReplicatedInteger;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
-
-import static org.neo4j.coreedge.raft.state.LastAppliedTrackingStateMachine.NOTHING_APPLIED;
-
-public class LastAppliedTrackingStateMachineTest
-{
- @Test
- public void shouldInitiallyHaveNothingApplied() throws Exception
- {
- // given
- LastAppliedTrackingStateMachine lastAppliedTrackingStateMachine = new LastAppliedTrackingStateMachine( mock( StateMachines.class ) );
-
- // then
- assertEquals( NOTHING_APPLIED, lastAppliedTrackingStateMachine.lastApplied() );
- }
-
- @Test
- public void shouldKeepTrackOfTheLastAppliedIndex() throws Exception
- {
- // given
- LastAppliedTrackingStateMachine lastAppliedTrackingStateMachine = new LastAppliedTrackingStateMachine( mock( StateMachines.class ) );
-
- // when
- lastAppliedTrackingStateMachine.applyCommand( ReplicatedInteger.valueOf( 1 ), 1 );
-
- // then
- assertEquals( 1, lastAppliedTrackingStateMachine.lastApplied() );
- }
-
- @Test
- public void shouldKeepHighestIndexSeenSoFar() throws Exception
- {
- // given
- LastAppliedTrackingStateMachine lastAppliedTrackingStateMachine = new LastAppliedTrackingStateMachine( mock( StateMachines.class ) );
-
- // when
- lastAppliedTrackingStateMachine.applyCommand( ReplicatedInteger.valueOf( 1 ), 9 );
- lastAppliedTrackingStateMachine.applyCommand( ReplicatedInteger.valueOf( 1 ), 1 );
-
- // then
- assertEquals( 9, lastAppliedTrackingStateMachine.lastApplied() );
- }
-}
\ No newline at end of file
diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/RaftStateBuilder.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/RaftStateBuilder.java
index 368e28ac730fb..6eee0ac122af8 100644
--- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/RaftStateBuilder.java
+++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/RaftStateBuilder.java
@@ -114,8 +114,8 @@ public RaftStateBuilder lastLogIndexBeforeWeBecameLeader( long lastLogIndexBefor
public RaftState build() throws RaftStorageException
{
- StateStorage termStore = new StubStateStorage<>( new TermState() );
- StateStorage> voteStore = new StubStateStorage<>( new VoteState<>( ) );
+ StateStorage termStore = new InMemoryStateStorage<>( new TermState() );
+ StateStorage> voteStore = new InMemoryStateStorage<>( new VoteState<>( ) );
StubMembership membership = new StubMembership();
RaftState state = new RaftState<>( myself, termStore, membership, entryLog, voteStore );
diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/RaftStateTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/RaftStateTest.java
index ecd645486b33c..d3c189ad9d334 100644
--- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/RaftStateTest.java
+++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/RaftStateTest.java
@@ -51,9 +51,9 @@ public void shouldRemoveFollowerStateAfterBecomingLeader() throws Exception
{
// given
RaftState raftState = new RaftState<>( new RaftTestMember( 0 ),
- new StubStateStorage<>( new TermState() ),
+ new InMemoryStateStorage<>( new TermState() ),
new FakeMembership(), new InMemoryRaftLog(),
- new StubStateStorage<>( new VoteState<>( ) ) );
+ new InMemoryStateStorage<>( new VoteState<>( ) ) );
raftState.update( new Outcome<>( CANDIDATE, 1, null, -1, null, new HashSet<>(), -1, initialFollowerStates(), true, emptyLogCommands(),
emptyOutgoingMessages(), Collections.emptySet() ) );
diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/term/MonitoredTermStateStorageTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/term/MonitoredTermStateStorageTest.java
index 591aed015373a..1f34dd9ba4396 100644
--- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/term/MonitoredTermStateStorageTest.java
+++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/term/MonitoredTermStateStorageTest.java
@@ -22,7 +22,7 @@
import org.junit.Test;
import org.neo4j.coreedge.raft.log.monitoring.RaftTermMonitor;
-import org.neo4j.coreedge.raft.state.StubStateStorage;
+import org.neo4j.coreedge.raft.state.InMemoryStateStorage;
import org.neo4j.kernel.monitoring.Monitors;
import static org.junit.Assert.assertEquals;
@@ -38,7 +38,7 @@ public void shouldMonitorTerm() throws Exception
monitors.addMonitorListener( raftTermMonitor );
TermState state = new TermState();
MonitoredTermStateStorage monitoredTermStateStorage =
- new MonitoredTermStateStorage( new StubStateStorage<>( new TermState() ), monitors );
+ new MonitoredTermStateStorage( new InMemoryStateStorage<>( new TermState() ), monitors );
// when
state.update( 7 );
@@ -64,4 +64,4 @@ public void term( long term )
this.term = term;
}
}
-}
\ No newline at end of file
+}
diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/server/core/RaftLogReplayTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/server/core/RaftLogReplayTest.java
deleted file mode 100644
index 739f477770163..0000000000000
--- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/server/core/RaftLogReplayTest.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Copyright (c) 2002-2016 "Neo Technology,"
- * Network Engine for Objects in Lund AB [http://neotechnology.com]
- *
- * This file is part of Neo4j.
- *
- * Neo4j is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License as
- * published by the Free Software Foundation, either version 3 of the
- * License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see .
- */
-package org.neo4j.coreedge.server.core;
-
-import org.junit.Test;
-
-import org.neo4j.coreedge.raft.ReplicatedInteger;
-import org.neo4j.coreedge.raft.log.InMemoryRaftLog;
-import org.neo4j.coreedge.raft.log.RaftLogEntry;
-import org.neo4j.coreedge.raft.state.StateMachine;
-import org.neo4j.logging.NullLogProvider;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-
-public class RaftLogReplayTest
-{
- @Test
- public void shouldReplayLastCommittedEntry() throws Throwable
- {
- // given
- StateMachine stateMachine = mock( StateMachine.class );
- InMemoryRaftLog raftLog = new InMemoryRaftLog();
- raftLog.append( new RaftLogEntry( 0, ReplicatedInteger.valueOf( 0 ) ) );
- raftLog.append( new RaftLogEntry( 0, ReplicatedInteger.valueOf( 1 ) ) );
- raftLog.append( new RaftLogEntry( 0, ReplicatedInteger.valueOf( 2 ) ) );
- raftLog.commit( 2 );
- raftLog.append( new RaftLogEntry( 0, ReplicatedInteger.valueOf( 3 ) ) );
-
- RaftLogReplay replayer = new RaftLogReplay( stateMachine, raftLog, NullLogProvider.getInstance(), 1 );
-
- // when
- replayer.start();
-
- // then
- verify( stateMachine ).applyCommand( ReplicatedInteger.valueOf( 2 ), 2 );
- }
-}
\ No newline at end of file
diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/server/core/StateMachineApplierTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/server/core/StateMachineApplierTest.java
new file mode 100644
index 0000000000000..cdff53c7b3b75
--- /dev/null
+++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/server/core/StateMachineApplierTest.java
@@ -0,0 +1,245 @@
+/*
+ * Copyright (c) 2002-2016 "Neo Technology,"
+ * Network Engine for Objects in Lund AB [http://neotechnology.com]
+ *
+ * This file is part of Neo4j.
+ *
+ * Neo4j is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+package org.neo4j.coreedge.server.core;
+
+import java.io.IOException;
+import java.util.function.Supplier;
+
+import org.junit.Test;
+
+import org.neo4j.coreedge.raft.log.InMemoryRaftLog;
+import org.neo4j.coreedge.raft.log.RaftLogEntry;
+import org.neo4j.coreedge.raft.replication.ReplicatedContent;
+import org.neo4j.coreedge.raft.state.InMemoryStateStorage;
+import org.neo4j.coreedge.raft.state.LastAppliedState;
+import org.neo4j.coreedge.raft.state.StateMachine;
+import org.neo4j.coreedge.raft.state.StateMachineApplier;
+import org.neo4j.kernel.internal.DatabaseHealth;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import static org.neo4j.coreedge.raft.ReplicatedInteger.valueOf;
+import static org.neo4j.logging.NullLogProvider.getInstance;
+
+public class StateMachineApplierTest
+{
+ @Test
+ public void shouldApplyCommittedCommands() throws Exception
+ {
+ // given
+ InMemoryRaftLog raftLog = new InMemoryRaftLog();
+ StateMachine stateMachine = mock( StateMachine.class );
+ InMemoryStateStorage lastApplied = new InMemoryStateStorage<>( new LastAppliedState( -1 ) );
+ StateMachineApplier applier = new StateMachineApplier( stateMachine, raftLog, lastApplied,
+ Runnable::run, 10, health(), getInstance() );
+
+ raftLog.append( new RaftLogEntry( 0, valueOf( 0 ) ) );
+ raftLog.commit( 0 );
+ applier.start();
+
+ // when
+ applier.notifyCommitted();
+
+ // then
+ verify( stateMachine ).applyCommand( valueOf( 0 ), 0 );
+ }
+
+ @Test
+ public void shouldNotApplyAnythingIfNothingIsCommitted() throws Exception
+ {
+ // given
+ InMemoryRaftLog raftLog = new InMemoryRaftLog();
+ StateMachine stateMachine = mock( StateMachine.class );
+ InMemoryStateStorage lastApplied = new InMemoryStateStorage<>( new LastAppliedState( -1 ) );
+ StateMachineApplier applier = new StateMachineApplier( stateMachine, raftLog, lastApplied,
+ Runnable::run, 10, health(), getInstance() );
+
+ raftLog.append( new RaftLogEntry( 0, valueOf( 0 ) ) );
+ applier.start();
+
+ // when
+ applier.notifyCommitted();
+
+ // then
+ verify( stateMachine, times( 0 ) ).applyCommand( valueOf( 0 ), 0 );
+ }
+
+ @Test
+ public void startShouldApplyCommittedButNotYetAppliedCommands() throws Exception
+ {
+ // given
+ InMemoryRaftLog raftLog = new InMemoryRaftLog();
+ StateMachine stateMachine = mock( StateMachine.class );
+ InMemoryStateStorage lastApplied = new InMemoryStateStorage<>( new LastAppliedState( -1 ) );
+ StateMachineApplier applier = new StateMachineApplier( stateMachine, raftLog, lastApplied,
+ Runnable::run, 10, health(), getInstance() );
+
+ raftLog.append( new RaftLogEntry( 0, valueOf( 0 ) ) );
+ raftLog.append( new RaftLogEntry( 0, valueOf( 1 ) ) );
+ raftLog.append( new RaftLogEntry( 0, valueOf( 2 ) ) );
+ raftLog.append( new RaftLogEntry( 0, valueOf( 3 ) ) );
+ raftLog.commit( 3 );
+
+ lastApplied.persistStoreData( new LastAppliedState( 1 ) );
+
+ // when
+ applier.start();
+
+ // then
+ verify( stateMachine ).applyCommand( valueOf( 2 ), 2 );
+ verify( stateMachine ).applyCommand( valueOf( 3 ), 3 );
+ verifyNoMoreInteractions( stateMachine );
+ }
+
+ @Test
+ public void shouldPeriodicallyFlushStateMachines() throws Exception
+ {
+ // given
+ InMemoryRaftLog raftLog = new InMemoryRaftLog();
+ StateMachine stateMachine = mock( StateMachine.class );
+ InMemoryStateStorage lastApplied = new InMemoryStateStorage<>( new LastAppliedState( -1 ) );
+ StateMachineApplier applier = new StateMachineApplier( stateMachine, raftLog, lastApplied,
+ Runnable::run, 5, health(), getInstance() );
+
+ for ( int i = 0; i < 50; i++ )
+ {
+ raftLog.append( new RaftLogEntry( 0, valueOf( i ) ) );
+ }
+ raftLog.commit( 49 );
+
+ // when
+ applier.start();
+
+ // then
+ verify( stateMachine, times( 10 ) ).flush();
+ }
+
+ @Test
+ public void shouldPeriodicallyStoreLastAppliedState() throws Exception
+ {
+ // given
+ InMemoryRaftLog raftLog = new InMemoryRaftLog();
+ StateMachine stateMachine = mock( StateMachine.class );
+ InMemoryStateStorage lastApplied = new InMemoryStateStorage<>( new LastAppliedState( -1 ) );
+ StateMachineApplier applier = new StateMachineApplier( stateMachine, raftLog, lastApplied,
+ Runnable::run, 5, health(), getInstance() );
+
+ for ( int i = 0; i < 50; i++ )
+ {
+ raftLog.append( new RaftLogEntry( 0, valueOf( i ) ) );
+ }
+ raftLog.commit( 49 );
+
+ // when
+ applier.start();
+
+ // then
+ assertEquals( 45L, lastApplied.getInitialState().get() );
+ }
+
+ @Test
+ public void shouldPanicIfUnableToApply() throws Exception
+ {
+ // given
+ InMemoryRaftLog raftLog = new InMemoryRaftLog();
+
+ StateMachine stateMachine = new FailingStateMachine();
+
+ InMemoryStateStorage lastApplied = new InMemoryStateStorage<>( new LastAppliedState( -1 ) );
+
+ Supplier healthSupplier = health();
+ DatabaseHealth health = mock( DatabaseHealth.class );
+ when( healthSupplier.get() ).thenReturn( health );
+
+ StateMachineApplier applier = new StateMachineApplier( stateMachine, raftLog, lastApplied,
+ Runnable::run, 5, healthSupplier, getInstance() );
+
+
+ raftLog.append( new RaftLogEntry( 0, valueOf( 1 ) ) );
+ raftLog.commit( 0 );
+
+ // when
+ applier.notifyCommitted();
+
+ // then
+ verify( health ).panic( anyObject() );
+ }
+
+ @Test
+ public void shouldNotStartIfUnableToApplyOnStartUp() throws Exception
+ {
+ // given
+ InMemoryRaftLog raftLog = new InMemoryRaftLog();
+
+ StateMachine stateMachine = new FailingStateMachine();
+
+ InMemoryStateStorage lastApplied = new InMemoryStateStorage<>( new LastAppliedState( -1 ) );
+
+ Supplier healthSupplier = health();
+ DatabaseHealth health = mock( DatabaseHealth.class );
+ when( healthSupplier.get() ).thenReturn( health );
+
+ StateMachineApplier applier = new StateMachineApplier( stateMachine, raftLog, lastApplied,
+ Runnable::run, 5, healthSupplier, getInstance() );
+
+
+ raftLog.append( new RaftLogEntry( 0, valueOf( 1 ) ) );
+ raftLog.commit( 0 );
+
+ // when
+ try
+ {
+ applier.start();
+ fail( "Should have thrown IllegalStateException" );
+ }
+ catch ( IllegalStateException ignored )
+ {
+ // expected
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private Supplier health()
+ {
+ return mock( Supplier.class );
+ }
+
+ private static class FailingStateMachine implements StateMachine
+ {
+ @Override
+ public void applyCommand( ReplicatedContent content, long logIndex )
+ {
+ throw new IllegalStateException();
+ }
+
+ @Override
+ public void flush() throws IOException
+ {
+ // do nothing
+ }
+ }
+}
diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/server/core/locks/LeaderOnlyLockManagerTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/server/core/locks/LeaderOnlyLockManagerTest.java
index 899dc4ebde171..33d82ac069dd5 100644
--- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/server/core/locks/LeaderOnlyLockManagerTest.java
+++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/server/core/locks/LeaderOnlyLockManagerTest.java
@@ -23,7 +23,7 @@
import org.neo4j.coreedge.raft.LeaderLocator;
import org.neo4j.coreedge.raft.replication.DirectReplicator;
-import org.neo4j.coreedge.raft.state.StubStateStorage;
+import org.neo4j.coreedge.raft.state.InMemoryStateStorage;
import org.neo4j.coreedge.server.RaftTestMember;
import org.neo4j.coreedge.raft.state.StateMachines;
import org.neo4j.kernel.impl.locking.Locks;
@@ -49,7 +49,7 @@ public void shouldIssueLocksOnLeader() throws Exception
StateMachines stateMachines = new StateMachines();
ReplicatedLockTokenStateMachine replicatedLockStateMachine =
- new ReplicatedLockTokenStateMachine<>( new StubStateStorage( new ReplicatedLockTokenState<>() ) );
+ new ReplicatedLockTokenStateMachine<>( new InMemoryStateStorage( new ReplicatedLockTokenState<>() ) );
stateMachines.add( replicatedLockStateMachine );
DirectReplicator replicator = new DirectReplicator( stateMachines );
@@ -76,7 +76,7 @@ public void shouldNotIssueLocksOnNonLeader() throws Exception
StateMachines stateMachines = new StateMachines();
ReplicatedLockTokenStateMachine replicatedLockStateMachine =
- new ReplicatedLockTokenStateMachine<>( new StubStateStorage( new ReplicatedLockTokenState<>() ) );
+ new ReplicatedLockTokenStateMachine<>( new InMemoryStateStorage( new ReplicatedLockTokenState<>() ) );
stateMachines.add( replicatedLockStateMachine );
DirectReplicator replicator = new DirectReplicator( stateMachines );
diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/server/core/locks/ReplicatedLockTokenStateMachineTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/server/core/locks/ReplicatedLockTokenStateMachineTest.java
index 5d3d8bfbe4dc0..072dd18fc6f1b 100644
--- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/server/core/locks/ReplicatedLockTokenStateMachineTest.java
+++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/server/core/locks/ReplicatedLockTokenStateMachineTest.java
@@ -26,7 +26,7 @@
import org.neo4j.coreedge.raft.state.DurableStateStorage;
import org.neo4j.coreedge.raft.state.StateMarshal;
-import org.neo4j.coreedge.raft.state.StubStateStorage;
+import org.neo4j.coreedge.raft.state.InMemoryStateStorage;
import org.neo4j.coreedge.server.AdvertisedSocketAddress;
import org.neo4j.coreedge.server.CoreMember;
import org.neo4j.graphdb.mockfs.EphemeralFileSystemAbstraction;
@@ -45,7 +45,7 @@ public void shouldStartWithInvalidTokenId() throws Exception
{
// given
LockTokenManager stateMachine = new ReplicatedLockTokenStateMachine<>(
- new StubStateStorage<>( new ReplicatedLockTokenState<>() ) );
+ new InMemoryStateStorage<>( new ReplicatedLockTokenState<>() ) );
// when
int initialTokenId = stateMachine.currentToken().id();
@@ -59,7 +59,7 @@ public void shouldIssueNextLockTokenCandidateId() throws Exception
{
// given
ReplicatedLockTokenStateMachine