Skip to content

Commit

Permalink
Keep track of last applied command inside the state machine.
Browse files Browse the repository at this point in the history
This ensures that RaftInstance doesn't try to apply commands that were
already applied by RaftLogReplay.
  • Loading branch information
apcj committed Feb 18, 2016
1 parent 3b29456 commit 4c18d71
Show file tree
Hide file tree
Showing 6 changed files with 141 additions and 14 deletions.
Expand Up @@ -44,6 +44,7 @@
import org.neo4j.coreedge.raft.replication.ReplicatedContent;
import org.neo4j.coreedge.raft.replication.shipping.RaftLogShippingManager;
import org.neo4j.coreedge.raft.roles.Role;
import org.neo4j.coreedge.raft.state.LastAppliedTrackingStateMachine;
import org.neo4j.coreedge.raft.state.RaftState;
import org.neo4j.coreedge.raft.state.ReadableRaftState;
import org.neo4j.coreedge.raft.state.StateMachine;
Expand Down Expand Up @@ -100,7 +101,7 @@ public enum Timeouts implements RenewableTimeoutService.TimeoutName
private RenewableTimeoutService.RenewableTimeout electionTimer;
private RaftMembershipManager<MEMBER> membershipManager;

private final StateMachine stateMachine;
private final LastAppliedTrackingStateMachine stateMachine;
private final long electionTimeout;
private final long leaderWaitTimeout;

Expand All @@ -116,7 +117,7 @@ public enum Timeouts implements RenewableTimeoutService.TimeoutName

public RaftInstance( MEMBER myself, StateStorage<TermState> termStorage,
StateStorage<VoteState<MEMBER>> voteStorage, RaftLog entryLog,
StateMachine stateMachine, long electionTimeout, long heartbeatInterval,
LastAppliedTrackingStateMachine stateMachine, long electionTimeout, long heartbeatInterval,
RenewableTimeoutService renewableTimeoutService,
final Inbound inbound, final Outbound<MEMBER> outbound, long leaderWaitTimeout,
LogProvider logProvider, RaftMembershipManager<MEMBER> membershipManager,
Expand Down Expand Up @@ -190,7 +191,6 @@ public synchronized void bootstrapWithInitialMembers( RaftGroup<MEMBER> memberSe
logCommand.applyTo( entryLog );
}
membershipManager.processLog( logCommands );
lastApplied = 0;
}
catch ( RaftStorageException e )
{
Expand Down Expand Up @@ -255,8 +255,6 @@ public ReadableRaftState<MEMBER> state()
return raftState;
}

private long lastApplied = -1;

private void handleOutcome( Outcome<MEMBER> outcome ) throws RaftStorageException, IOException
{
adjustLogShipping( outcome );
Expand All @@ -265,7 +263,7 @@ private void handleOutcome( Outcome<MEMBER> outcome ) throws RaftStorageExceptio
raftState.update( outcome );
membershipManager.processLog( outcome.getLogCommands() );

for ( long index = lastApplied + 1; index <= raftState.entryLog().commitIndex(); index++ )
for ( long index = stateMachine.lastApplied() + 1; index <= raftState.entryLog().commitIndex(); index++ )
{
ReplicatedContent content = raftState.entryLog().readEntryContent( index );
stateMachine.applyCommand( content, index );
Expand All @@ -274,7 +272,6 @@ private void handleOutcome( Outcome<MEMBER> outcome ) throws RaftStorageExceptio
stateMachine.flush();
}
}
lastApplied = raftState.entryLog().commitIndex();
volatileLeader.set( outcome.getLeader() );
}

Expand Down
@@ -0,0 +1,57 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.raft.state;

import java.io.IOException;

import org.neo4j.coreedge.raft.replication.ReplicatedContent;

import static java.lang.Math.max;

public class LastAppliedTrackingStateMachine implements StateMachine
{
public static final long NOTHING_APPLIED = -1;

private final StateMachine stateMachine;
private long lastApplied = NOTHING_APPLIED;

public LastAppliedTrackingStateMachine( StateMachine stateMachine )
{
this.stateMachine = stateMachine;
}

@Override
public void applyCommand( ReplicatedContent content, long logIndex )
{
stateMachine.applyCommand( content, logIndex );
lastApplied = max( logIndex, lastApplied );
}

@Override
public void flush() throws IOException
{
stateMachine.flush();
}

public long lastApplied()
{
return lastApplied;
}
}
Expand Up @@ -69,6 +69,7 @@
import org.neo4j.coreedge.raft.replication.tx.ReplicatedTransactionStateMachine;
import org.neo4j.coreedge.raft.roles.Role;
import org.neo4j.coreedge.raft.state.DurableStateStorage;
import org.neo4j.coreedge.raft.state.LastAppliedTrackingStateMachine;
import org.neo4j.coreedge.raft.state.StateMachines;
import org.neo4j.coreedge.raft.state.StateStorage;
import org.neo4j.coreedge.raft.state.id_allocation.IdAllocationState;
Expand Down Expand Up @@ -192,12 +193,13 @@ public EnterpriseCoreEditionModule( final PlatformModule platformModule,
marshal, logProvider ) ), platformModule.monitors );

StateMachines stateMachines = new StateMachines();
LastAppliedTrackingStateMachine lastAppliedStateMachine = new LastAppliedTrackingStateMachine( stateMachines );

int flushAfter = config.get( CoreEdgeClusterSettings.state_machine_flush_window_size );

raft = createRaft( life, loggingOutbound, discoveryService, config, messageLogger, monitoredRaftLog,
stateMachines, fileSystem, clusterStateDirectory, myself, logProvider, raftServer, raftTimeoutService,
databaseHealthSupplier, platformModule.monitors, flushAfter );
lastAppliedStateMachine, fileSystem, clusterStateDirectory, myself, logProvider, raftServer,
raftTimeoutService, databaseHealthSupplier, platformModule.monitors, flushAfter );

dependencies.satisfyDependency( raft );

Expand Down Expand Up @@ -333,7 +335,7 @@ fileSystem, new File( clusterStateDirectory, "id-allocation-state" ), "id-alloca

life.add( CoreServerStartupProcess.createLifeSupport(
platformModule.dataSourceManager, replicatedIdGeneratorFactory, raft,
new RaftLogReplay( stateMachines, monitoredRaftLog, logProvider, flushAfter ), raftServer,
new RaftLogReplay( lastAppliedStateMachine, monitoredRaftLog, logProvider, flushAfter ), raftServer,
catchupServer, raftTimeoutService, membershipWaiter,
joinCatchupTimeout,
new RecoverTransactionLogState( dependencies, logProvider,
Expand Down Expand Up @@ -397,7 +399,7 @@ private static RaftInstance<CoreMember> createRaft( LifeSupport life,
Config config,
MessageLogger<AdvertisedSocketAddress> messageLogger,
RaftLog raftLog,
StateMachines stateMachines,
LastAppliedTrackingStateMachine stateMachines,
FileSystemAbstraction fileSystem,
File clusterStateDirectory,
CoreMember myself,
Expand Down
Expand Up @@ -29,6 +29,7 @@
import org.neo4j.coreedge.raft.net.Outbound;
import org.neo4j.coreedge.raft.replication.LeaderOnlyReplicator;
import org.neo4j.coreedge.raft.replication.shipping.RaftLogShippingManager;
import org.neo4j.coreedge.raft.state.LastAppliedTrackingStateMachine;
import org.neo4j.coreedge.raft.state.StateMachine;
import org.neo4j.coreedge.raft.state.StateMachines;
import org.neo4j.coreedge.raft.state.StateStorage;
Expand Down Expand Up @@ -74,7 +75,7 @@ public class RaftInstanceBuilder<MEMBER>
private StateStorage<RaftMembershipState<MEMBER>> raftMembership =
new StubStateStorage<>( new RaftMembershipState<>() );
private Monitors monitors = new Monitors();
private StateMachine stateMachine = new StateMachines();
private LastAppliedTrackingStateMachine stateMachine = new LastAppliedTrackingStateMachine( new StateMachines() );
private int flushAfter = 1;

public RaftInstanceBuilder( MEMBER member, int expectedClusterSize, RaftGroup.Builder<MEMBER> memberSetBuilder )
Expand Down Expand Up @@ -142,7 +143,7 @@ public RaftInstanceBuilder<MEMBER> raftLog( RaftLog raftLog )
return this;
}

public RaftInstanceBuilder<MEMBER> stateMachine( StateMachine stateMachine )
public RaftInstanceBuilder<MEMBER> stateMachine( LastAppliedTrackingStateMachine stateMachine )
{
this.stateMachine = stateMachine;
return this;
Expand Down
Expand Up @@ -28,6 +28,7 @@
import org.neo4j.coreedge.raft.RaftInstance;
import org.neo4j.coreedge.raft.RaftInstanceBuilder;
import org.neo4j.coreedge.raft.ReplicatedInteger;
import org.neo4j.coreedge.raft.state.LastAppliedTrackingStateMachine;
import org.neo4j.coreedge.raft.state.StateMachine;
import org.neo4j.coreedge.server.RaftTestMember;
import org.neo4j.coreedge.server.RaftTestMemberSetBuilder;
Expand Down Expand Up @@ -62,7 +63,7 @@ public void before() throws Exception

raft = new RaftInstanceBuilder<>( myself, 3, RaftTestMemberSetBuilder.INSTANCE )
.raftLog( testEntryLog )
.stateMachine( stateMachine )
.stateMachine( new LastAppliedTrackingStateMachine( stateMachine ) )
.build();
}

Expand Down
@@ -0,0 +1,69 @@
/*
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.coreedge.raft.state;

import org.junit.Test;

import org.neo4j.coreedge.raft.ReplicatedInteger;

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;

import static org.neo4j.coreedge.raft.state.LastAppliedTrackingStateMachine.NOTHING_APPLIED;

public class LastAppliedTrackingStateMachineTest
{
@Test
public void shouldInitiallyHaveNothingApplied() throws Exception
{
// given
LastAppliedTrackingStateMachine lastAppliedTrackingStateMachine = new LastAppliedTrackingStateMachine( mock( StateMachines.class ) );

// then
assertEquals( NOTHING_APPLIED, lastAppliedTrackingStateMachine.lastApplied() );
}

@Test
public void shouldKeepTrackOfTheLastAppliedIndex() throws Exception
{
// given
LastAppliedTrackingStateMachine lastAppliedTrackingStateMachine = new LastAppliedTrackingStateMachine( mock( StateMachines.class ) );

// when
lastAppliedTrackingStateMachine.applyCommand( ReplicatedInteger.valueOf( 1 ), 1 );

// then
assertEquals( 1, lastAppliedTrackingStateMachine.lastApplied() );
}

@Test
public void shouldKeepHighestIndexSeenSoFar() throws Exception
{
// given
LastAppliedTrackingStateMachine lastAppliedTrackingStateMachine = new LastAppliedTrackingStateMachine( mock( StateMachines.class ) );

// when
lastAppliedTrackingStateMachine.applyCommand( ReplicatedInteger.valueOf( 1 ), 9 );
lastAppliedTrackingStateMachine.applyCommand( ReplicatedInteger.valueOf( 1 ), 1 );

// then
assertEquals( 9, lastAppliedTrackingStateMachine.lastApplied() );
}
}

0 comments on commit 4c18d71

Please sign in to comment.