Skip to content

Commit

Permalink
Refactor outcome to encapsulate boiler plate and clarify logic in roles.
Browse files Browse the repository at this point in the history
Still a lot of work left to make this sane. For example how follower state
gets updates. This commit should have no effect on observed behaviour.
  • Loading branch information
martinfurmanski committed Nov 27, 2015
1 parent d53b742 commit 88bf1ce
Show file tree
Hide file tree
Showing 11 changed files with 209 additions and 166 deletions.
Expand Up @@ -216,7 +216,7 @@ protected void handleOutcome( Outcome<MEMBER> outcome ) throws RaftStorageExcept

if( outcome.leader != null && outcome.leader.equals( myself ) )
{
LeaderContext leaderContext = new LeaderContext( outcome.newTerm, outcome.leaderCommit );
LeaderContext leaderContext = new LeaderContext( outcome.term, outcome.leaderCommit );

if ( oldLeader == null || !oldLeader.equals( myself ) )
{
Expand Down
Expand Up @@ -20,54 +20,136 @@
package org.neo4j.coreedge.raft.outcome;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

import org.neo4j.coreedge.raft.RaftMessages;
import org.neo4j.coreedge.raft.roles.Role;
import org.neo4j.coreedge.raft.state.FollowerStates;
import org.neo4j.coreedge.raft.state.ReadableRaftState;

public class Outcome<MEMBER> implements Serializable
{
public final Role newRole;
public final long newTerm;
public final MEMBER leader;
public final long leaderCommit;
public final MEMBER votedFor;
public final Set<MEMBER> votesForMe;
public final long lastLogIndexBeforeWeBecameLeader;
public final FollowerStates<MEMBER> followerStates;
public final boolean renewElectionTimeout;
public final Iterable<LogCommand> logCommands;
public final Iterable<ShipCommand> shipCommands;
public final Collection<RaftMessages.Directed<MEMBER>> outgoingMessages;

public Outcome(Role newRole, long newTerm, MEMBER leader, long leaderCommit, MEMBER votedFor,
Set<MEMBER> votesForMe, long lastLogIndexBeforeWeBecameLeader,
FollowerStates<MEMBER> followerStates, boolean renewElectionTimeout,
Iterable<LogCommand> logCommands, Collection<RaftMessages.Directed<MEMBER>> outgoingMessages,
Iterable<ShipCommand> shipCommands )
/* Common */
public Role newRole;

public long term;
public MEMBER leader;

public long leaderCommit;

public ArrayList<LogCommand> logCommands = new ArrayList<>();
public ArrayList<RaftMessages.Directed<MEMBER>> outgoingMessages = new ArrayList<>();

/* Follower */
public MEMBER votedFor;
public boolean renewElectionTimeout;

/* Candidate */
public HashSet<MEMBER> votesForMe;
public long lastLogIndexBeforeWeBecameLeader;

/* Leader */
public FollowerStates<MEMBER> followerStates;
public ArrayList<ShipCommand> shipCommands = new ArrayList<>();

public Outcome( Role currentRole, ReadableRaftState<MEMBER> ctx )
{
defaults( currentRole, ctx );
}

public Outcome( Role newRole, long term, MEMBER leader, long leaderCommit, MEMBER votedFor,
Set<MEMBER> votesForMe, long lastLogIndexBeforeWeBecameLeader,
FollowerStates<MEMBER> followerStates, boolean renewElectionTimeout,
Collection<LogCommand> logCommands, Collection<RaftMessages.Directed<MEMBER>> outgoingMessages,
Collection<ShipCommand> shipCommands )
{
this.newRole = newRole;
this.newTerm = newTerm;
this.term = term;
this.leader = leader;
this.leaderCommit = leaderCommit;
this.votedFor = votedFor;
this.votesForMe = votesForMe;
this.votesForMe = new HashSet<>( votesForMe );
this.lastLogIndexBeforeWeBecameLeader = lastLogIndexBeforeWeBecameLeader;
this.followerStates = followerStates;
this.renewElectionTimeout = renewElectionTimeout;
this.logCommands = logCommands;
this.outgoingMessages = outgoingMessages;
this.shipCommands = shipCommands;

this.logCommands.addAll( logCommands );
this.outgoingMessages.addAll( outgoingMessages );
this.shipCommands.addAll( shipCommands );
}

private void defaults( Role currentRole, ReadableRaftState<MEMBER> ctx )
{
newRole = currentRole;

term = ctx.term();
leader = ctx.leader();

leaderCommit = ctx.leaderCommit();

votedFor = ctx.votedFor();
renewElectionTimeout = false;

votesForMe = (currentRole == Role.CANDIDATE) ? new HashSet<>( ctx.votesForMe() ) : new HashSet<>();

lastLogIndexBeforeWeBecameLeader = (currentRole == Role.LEADER) ? ctx.lastLogIndexBeforeWeBecameLeader() : -1;
followerStates = (currentRole == Role.LEADER) ? ctx.followerStates() : new FollowerStates<>();
}

public void setNextRole( Role nextRole )
{
this.newRole = nextRole;
}

public void setNextTerm( long nextTerm )
{
this.term = nextTerm;
}

public void setLeader( MEMBER leader )
{
this.leader = leader;
}

public void setLeaderCommit( long leaderCommit )
{
this.leaderCommit = leaderCommit;
}

public void addLogCommand( LogCommand logCommand )
{
this.logCommands.add( logCommand );
}

public void addOutgoingMessage( RaftMessages.Directed<MEMBER> message )
{
this.outgoingMessages.add( message );
}

public void setVotedFor( MEMBER votedFor )
{
this.votedFor = votedFor;
}

public void renewElectionTimeout()
{
this.renewElectionTimeout = true;
}

public void addVoteForMe( MEMBER voteFrom )
{
this.votesForMe.add( voteFrom );
}

@Override
public String toString()
{
return "Outcome{" +
"newRole=" + newRole +
", newTerm=" + newTerm +
"nextRole=" + newRole +
", newTerm=" + term +
", leader=" + leader +
", leaderCommit=" + leaderCommit +
", logCommands=" + logCommands +
Expand All @@ -80,4 +162,14 @@ public String toString()
", outgoingMessages=" + outgoingMessages +
'}';
}

public void setLastLogIndexBeforeWeBecameLeader( long lastLogIndexBeforeWeBecameLeader )
{
this.lastLogIndexBeforeWeBecameLeader = lastLogIndexBeforeWeBecameLeader;
}

public void addShipCommand( ShipCommand shipCommand )
{
shipCommands.add( shipCommand );
}
}
Expand Up @@ -19,18 +19,10 @@
*/
package org.neo4j.coreedge.raft.roles;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

import org.neo4j.coreedge.raft.outcome.ShipCommand;
import org.neo4j.coreedge.raft.RaftMessageHandler;
import org.neo4j.coreedge.raft.RaftMessages;
import org.neo4j.coreedge.raft.log.RaftStorageException;
import org.neo4j.coreedge.raft.outcome.LogCommand;
import org.neo4j.coreedge.raft.outcome.Outcome;
import org.neo4j.coreedge.raft.state.FollowerStates;
import org.neo4j.coreedge.raft.state.ReadableRaftState;
import org.neo4j.logging.Log;

Expand All @@ -45,15 +37,7 @@ public class Candidate implements RaftMessageHandler
public <MEMBER> Outcome<MEMBER> handle( RaftMessages.Message<MEMBER> message,
ReadableRaftState<MEMBER> ctx, Log log ) throws RaftStorageException
{
Role nextRole = CANDIDATE;
MEMBER leader = ctx.leader();
long leaderCommit = ctx.leaderCommit();
Collection<RaftMessages.Directed<MEMBER>> outgoingMessages = new ArrayList<>();
ArrayList<LogCommand> logCommands = new ArrayList<>();
ArrayList<ShipCommand> shipCommands = new ArrayList<>();
Set<MEMBER> updatedVotesForMe = new HashSet<>( ctx.votesForMe() );
long newTerm = ctx.term();
long lastLogIndexBeforeWeBecameLeader = -1;
Outcome<MEMBER> outcome = new Outcome<>( CANDIDATE, ctx );

switch ( message.type() )
{
Expand All @@ -66,8 +50,8 @@ public <MEMBER> Outcome<MEMBER> handle( RaftMessages.Message<MEMBER> message,
break;
}

nextRole = FOLLOWER;
outgoingMessages.add( new RaftMessages.Directed<>( ctx.myself(), message ) );
outcome.setNextRole( FOLLOWER );
outcome.addOutgoingMessage( new RaftMessages.Directed<>( ctx.myself(), message ) );
break;
}

Expand All @@ -81,12 +65,12 @@ public <MEMBER> Outcome<MEMBER> handle( RaftMessages.Message<MEMBER> message,
new RaftMessages.AppendEntries.Response<>( ctx.myself(), ctx.term(), false, req
.prevLogIndex() );

outgoingMessages.add( new RaftMessages.Directed<>( req.from(), appendResponse ) );
outcome.addOutgoingMessage( new RaftMessages.Directed<>( req.from(), appendResponse ) );
break;
}

nextRole = FOLLOWER ;
outgoingMessages.add( new RaftMessages.Directed<>( ctx.myself(), req ) );
outcome.setNextRole( FOLLOWER );
outcome.addOutgoingMessage( new RaftMessages.Directed<>( ctx.myself(), req ) );
break;
}

Expand All @@ -96,8 +80,8 @@ public <MEMBER> Outcome<MEMBER> handle( RaftMessages.Message<MEMBER> message,

if ( res.term() > ctx.term() )
{
newTerm = res.term();
nextRole = FOLLOWER;
outcome.setNextTerm( res.term() );
outcome.setNextRole( FOLLOWER );
break;
}
else if ( res.term() < ctx.term() || !res.voteGranted() )
Expand All @@ -107,19 +91,19 @@ else if ( res.term() < ctx.term() || !res.voteGranted() )

if ( !res.from().equals( ctx.myself() ) )
{
updatedVotesForMe.add( res.from() );
outcome.addVoteForMe( res.from() );
}

if ( isQuorum( ctx.votingMembers().size(), updatedVotesForMe.size() ) )
if ( isQuorum( ctx.votingMembers().size(), outcome.votesForMe.size() ) )
{
log.info( "In term %d %s ELECTED AS LEADER voted for by %s%n",
ctx.term(), ctx.myself(), updatedVotesForMe );
ctx.term(), ctx.myself(), outcome.votesForMe );

leader = ctx.myself();
Leader.sendHeartbeats( ctx, outgoingMessages );
lastLogIndexBeforeWeBecameLeader = ctx.entryLog().appendIndex();
outcome.setLeader( ctx.myself() );
Leader.sendHeartbeats( ctx, outcome );

nextRole = LEADER;
outcome.setLastLogIndexBeforeWeBecameLeader( ctx.entryLog().appendIndex() );
outcome.setNextRole( LEADER );
}
break;
}
Expand All @@ -130,27 +114,25 @@ else if ( res.term() < ctx.term() || !res.voteGranted() )

if ( req.term() > ctx.term() )
{
newTerm = req.term();
updatedVotesForMe.clear();
outcome.setNextTerm( req.term() );
outcome.votesForMe.clear();

nextRole = FOLLOWER;
outgoingMessages.add( new RaftMessages.Directed<>( ctx.myself(), req ) );
outcome.setNextRole( FOLLOWER );
outcome.addOutgoingMessage( new RaftMessages.Directed<>( ctx.myself(), req ) );
break;
}

outgoingMessages.add( new RaftMessages.Directed<>( req.from(), new RaftMessages.Vote.Response<>( ctx.myself(), newTerm, false ) ) );
outcome.addOutgoingMessage( new RaftMessages.Directed<>( req.from(), new RaftMessages.Vote.Response<>( ctx.myself(), outcome.term, false ) ) );
break;
}

case ELECTION_TIMEOUT:
{
nextRole = FOLLOWER;
outcome.setNextRole( FOLLOWER );
break;
}
}

return new Outcome<>( nextRole, newTerm, leader, leaderCommit, null, updatedVotesForMe, lastLogIndexBeforeWeBecameLeader, new FollowerStates<>(), false, logCommands,
// This effectively clears the local follower state
outgoingMessages, shipCommands );
return outcome;
}
}

0 comments on commit 88bf1ce

Please sign in to comment.