From 88bf1cef92e8051b8cc79a968cfd8a3db2b2f066 Mon Sep 17 00:00:00 2001 From: Martin Furmanski Date: Thu, 26 Nov 2015 17:50:39 +0100 Subject: [PATCH] Refactor outcome to encapsulate boiler plate and clarify logic in roles. Still a lot of work left to make this sane. For example how follower state gets updates. This commit should have no effect on observed behaviour. --- .../org/neo4j/coreedge/raft/RaftInstance.java | 2 +- .../neo4j/coreedge/raft/outcome/Outcome.java | 142 +++++++++++++++--- .../neo4j/coreedge/raft/roles/Candidate.java | 62 +++----- .../neo4j/coreedge/raft/roles/Follower.java | 79 ++++------ .../org/neo4j/coreedge/raft/roles/Leader.java | 71 ++++----- .../neo4j/coreedge/raft/state/RaftState.java | 2 +- .../coreedge/raft/roles/CandidateTest.java | 2 +- .../coreedge/raft/roles/FollowerTest.java | 2 +- .../neo4j/coreedge/raft/roles/LeaderTest.java | 6 +- .../coreedge/raft/state/RaftStateTest.java | 5 +- .../state/explorer/ComparableRaftState.java | 2 +- 11 files changed, 209 insertions(+), 166 deletions(-) diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftInstance.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftInstance.java index 8fa07eb7727b..6c55436141b5 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftInstance.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftInstance.java @@ -216,7 +216,7 @@ protected void handleOutcome( Outcome outcome ) throws RaftStorageExcept if( outcome.leader != null && outcome.leader.equals( myself ) ) { - LeaderContext leaderContext = new LeaderContext( outcome.newTerm, outcome.leaderCommit ); + LeaderContext leaderContext = new LeaderContext( outcome.term, outcome.leaderCommit ); if ( oldLeader == null || !oldLeader.equals( myself ) ) { diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/outcome/Outcome.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/outcome/Outcome.java index 08ee04559c79..d4e60ea7b37a 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/outcome/Outcome.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/outcome/Outcome.java @@ -20,54 +20,136 @@ package org.neo4j.coreedge.raft.outcome; import java.io.Serializable; +import java.util.ArrayList; import java.util.Collection; +import java.util.HashSet; import java.util.Set; import org.neo4j.coreedge.raft.RaftMessages; import org.neo4j.coreedge.raft.roles.Role; import org.neo4j.coreedge.raft.state.FollowerStates; +import org.neo4j.coreedge.raft.state.ReadableRaftState; public class Outcome implements Serializable { - public final Role newRole; - public final long newTerm; - public final MEMBER leader; - public final long leaderCommit; - public final MEMBER votedFor; - public final Set votesForMe; - public final long lastLogIndexBeforeWeBecameLeader; - public final FollowerStates followerStates; - public final boolean renewElectionTimeout; - public final Iterable logCommands; - public final Iterable shipCommands; - public final Collection> outgoingMessages; - - public Outcome(Role newRole, long newTerm, MEMBER leader, long leaderCommit, MEMBER votedFor, - Set votesForMe, long lastLogIndexBeforeWeBecameLeader, - FollowerStates followerStates, boolean renewElectionTimeout, - Iterable logCommands, Collection> outgoingMessages, - Iterable shipCommands ) + /* Common */ + public Role newRole; + + public long term; + public MEMBER leader; + + public long leaderCommit; + + public ArrayList logCommands = new ArrayList<>(); + public ArrayList> outgoingMessages = new ArrayList<>(); + + /* Follower */ + public MEMBER votedFor; + public boolean renewElectionTimeout; + + /* Candidate */ + public HashSet votesForMe; + public long lastLogIndexBeforeWeBecameLeader; + + /* Leader */ + public FollowerStates followerStates; + public ArrayList shipCommands = new ArrayList<>(); + + public Outcome( Role currentRole, ReadableRaftState ctx ) + { + defaults( currentRole, ctx ); + } + + public Outcome( Role newRole, long term, MEMBER leader, long leaderCommit, MEMBER votedFor, + Set votesForMe, long lastLogIndexBeforeWeBecameLeader, + FollowerStates followerStates, boolean renewElectionTimeout, + Collection logCommands, Collection> outgoingMessages, + Collection shipCommands ) { this.newRole = newRole; - this.newTerm = newTerm; + this.term = term; this.leader = leader; this.leaderCommit = leaderCommit; this.votedFor = votedFor; - this.votesForMe = votesForMe; + this.votesForMe = new HashSet<>( votesForMe ); this.lastLogIndexBeforeWeBecameLeader = lastLogIndexBeforeWeBecameLeader; this.followerStates = followerStates; this.renewElectionTimeout = renewElectionTimeout; - this.logCommands = logCommands; - this.outgoingMessages = outgoingMessages; - this.shipCommands = shipCommands; + + this.logCommands.addAll( logCommands ); + this.outgoingMessages.addAll( outgoingMessages ); + this.shipCommands.addAll( shipCommands ); + } + + private void defaults( Role currentRole, ReadableRaftState ctx ) + { + newRole = currentRole; + + term = ctx.term(); + leader = ctx.leader(); + + leaderCommit = ctx.leaderCommit(); + + votedFor = ctx.votedFor(); + renewElectionTimeout = false; + + votesForMe = (currentRole == Role.CANDIDATE) ? new HashSet<>( ctx.votesForMe() ) : new HashSet<>(); + + lastLogIndexBeforeWeBecameLeader = (currentRole == Role.LEADER) ? ctx.lastLogIndexBeforeWeBecameLeader() : -1; + followerStates = (currentRole == Role.LEADER) ? ctx.followerStates() : new FollowerStates<>(); + } + + public void setNextRole( Role nextRole ) + { + this.newRole = nextRole; + } + + public void setNextTerm( long nextTerm ) + { + this.term = nextTerm; + } + + public void setLeader( MEMBER leader ) + { + this.leader = leader; + } + + public void setLeaderCommit( long leaderCommit ) + { + this.leaderCommit = leaderCommit; + } + + public void addLogCommand( LogCommand logCommand ) + { + this.logCommands.add( logCommand ); + } + + public void addOutgoingMessage( RaftMessages.Directed message ) + { + this.outgoingMessages.add( message ); + } + + public void setVotedFor( MEMBER votedFor ) + { + this.votedFor = votedFor; + } + + public void renewElectionTimeout() + { + this.renewElectionTimeout = true; + } + + public void addVoteForMe( MEMBER voteFrom ) + { + this.votesForMe.add( voteFrom ); } @Override public String toString() { return "Outcome{" + - "newRole=" + newRole + - ", newTerm=" + newTerm + + "nextRole=" + newRole + + ", newTerm=" + term + ", leader=" + leader + ", leaderCommit=" + leaderCommit + ", logCommands=" + logCommands + @@ -80,4 +162,14 @@ public String toString() ", outgoingMessages=" + outgoingMessages + '}'; } + + public void setLastLogIndexBeforeWeBecameLeader( long lastLogIndexBeforeWeBecameLeader ) + { + this.lastLogIndexBeforeWeBecameLeader = lastLogIndexBeforeWeBecameLeader; + } + + public void addShipCommand( ShipCommand shipCommand ) + { + shipCommands.add( shipCommand ); + } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Candidate.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Candidate.java index 829784272f79..9f5c63e7bdab 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Candidate.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Candidate.java @@ -19,18 +19,10 @@ */ package org.neo4j.coreedge.raft.roles; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashSet; -import java.util.Set; - -import org.neo4j.coreedge.raft.outcome.ShipCommand; import org.neo4j.coreedge.raft.RaftMessageHandler; import org.neo4j.coreedge.raft.RaftMessages; import org.neo4j.coreedge.raft.log.RaftStorageException; -import org.neo4j.coreedge.raft.outcome.LogCommand; import org.neo4j.coreedge.raft.outcome.Outcome; -import org.neo4j.coreedge.raft.state.FollowerStates; import org.neo4j.coreedge.raft.state.ReadableRaftState; import org.neo4j.logging.Log; @@ -45,15 +37,7 @@ public class Candidate implements RaftMessageHandler public Outcome handle( RaftMessages.Message message, ReadableRaftState ctx, Log log ) throws RaftStorageException { - Role nextRole = CANDIDATE; - MEMBER leader = ctx.leader(); - long leaderCommit = ctx.leaderCommit(); - Collection> outgoingMessages = new ArrayList<>(); - ArrayList logCommands = new ArrayList<>(); - ArrayList shipCommands = new ArrayList<>(); - Set updatedVotesForMe = new HashSet<>( ctx.votesForMe() ); - long newTerm = ctx.term(); - long lastLogIndexBeforeWeBecameLeader = -1; + Outcome outcome = new Outcome<>( CANDIDATE, ctx ); switch ( message.type() ) { @@ -66,8 +50,8 @@ public Outcome handle( RaftMessages.Message message, break; } - nextRole = FOLLOWER; - outgoingMessages.add( new RaftMessages.Directed<>( ctx.myself(), message ) ); + outcome.setNextRole( FOLLOWER ); + outcome.addOutgoingMessage( new RaftMessages.Directed<>( ctx.myself(), message ) ); break; } @@ -81,12 +65,12 @@ public Outcome handle( RaftMessages.Message message, new RaftMessages.AppendEntries.Response<>( ctx.myself(), ctx.term(), false, req .prevLogIndex() ); - outgoingMessages.add( new RaftMessages.Directed<>( req.from(), appendResponse ) ); + outcome.addOutgoingMessage( new RaftMessages.Directed<>( req.from(), appendResponse ) ); break; } - nextRole = FOLLOWER ; - outgoingMessages.add( new RaftMessages.Directed<>( ctx.myself(), req ) ); + outcome.setNextRole( FOLLOWER ); + outcome.addOutgoingMessage( new RaftMessages.Directed<>( ctx.myself(), req ) ); break; } @@ -96,8 +80,8 @@ public Outcome handle( RaftMessages.Message message, if ( res.term() > ctx.term() ) { - newTerm = res.term(); - nextRole = FOLLOWER; + outcome.setNextTerm( res.term() ); + outcome.setNextRole( FOLLOWER ); break; } else if ( res.term() < ctx.term() || !res.voteGranted() ) @@ -107,19 +91,19 @@ else if ( res.term() < ctx.term() || !res.voteGranted() ) if ( !res.from().equals( ctx.myself() ) ) { - updatedVotesForMe.add( res.from() ); + outcome.addVoteForMe( res.from() ); } - if ( isQuorum( ctx.votingMembers().size(), updatedVotesForMe.size() ) ) + if ( isQuorum( ctx.votingMembers().size(), outcome.votesForMe.size() ) ) { log.info( "In term %d %s ELECTED AS LEADER voted for by %s%n", - ctx.term(), ctx.myself(), updatedVotesForMe ); + ctx.term(), ctx.myself(), outcome.votesForMe ); - leader = ctx.myself(); - Leader.sendHeartbeats( ctx, outgoingMessages ); - lastLogIndexBeforeWeBecameLeader = ctx.entryLog().appendIndex(); + outcome.setLeader( ctx.myself() ); + Leader.sendHeartbeats( ctx, outcome ); - nextRole = LEADER; + outcome.setLastLogIndexBeforeWeBecameLeader( ctx.entryLog().appendIndex() ); + outcome.setNextRole( LEADER ); } break; } @@ -130,27 +114,25 @@ else if ( res.term() < ctx.term() || !res.voteGranted() ) if ( req.term() > ctx.term() ) { - newTerm = req.term(); - updatedVotesForMe.clear(); + outcome.setNextTerm( req.term() ); + outcome.votesForMe.clear(); - nextRole = FOLLOWER; - outgoingMessages.add( new RaftMessages.Directed<>( ctx.myself(), req ) ); + outcome.setNextRole( FOLLOWER ); + outcome.addOutgoingMessage( new RaftMessages.Directed<>( ctx.myself(), req ) ); break; } - outgoingMessages.add( new RaftMessages.Directed<>( req.from(), new RaftMessages.Vote.Response<>( ctx.myself(), newTerm, false ) ) ); + outcome.addOutgoingMessage( new RaftMessages.Directed<>( req.from(), new RaftMessages.Vote.Response<>( ctx.myself(), outcome.term, false ) ) ); break; } case ELECTION_TIMEOUT: { - nextRole = FOLLOWER; + outcome.setNextRole( FOLLOWER ); break; } } - return new Outcome<>( nextRole, newTerm, leader, leaderCommit, null, updatedVotesForMe, lastLogIndexBeforeWeBecameLeader, new FollowerStates<>(), false, logCommands, - // This effectively clears the local follower state - outgoingMessages, shipCommands ); + return outcome; } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Follower.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Follower.java index 8e486317760b..e89ce1245636 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Follower.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Follower.java @@ -19,13 +19,8 @@ */ package org.neo4j.coreedge.raft.roles; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; import java.util.Set; -import org.neo4j.coreedge.raft.outcome.ShipCommand; import org.neo4j.coreedge.raft.RaftMessageHandler; import org.neo4j.coreedge.raft.RaftMessages; import org.neo4j.coreedge.raft.RaftMessages.AppendEntries; @@ -34,10 +29,8 @@ import org.neo4j.coreedge.raft.log.RaftStorageException; import org.neo4j.coreedge.raft.outcome.BatchAppendLogEntries; import org.neo4j.coreedge.raft.outcome.CommitCommand; -import org.neo4j.coreedge.raft.outcome.LogCommand; import org.neo4j.coreedge.raft.outcome.Outcome; import org.neo4j.coreedge.raft.outcome.TruncateLogCommand; -import org.neo4j.coreedge.raft.state.FollowerStates; import org.neo4j.coreedge.raft.state.ReadableRaftState; import org.neo4j.logging.Log; @@ -63,13 +56,13 @@ private static boolean logHistoryMatches( ReadableRaftState ctx } private static boolean generateCommitIfNecessary( ReadableRaftState ctx, AppendEntries - .Request req, List commands ) + .Request req, Outcome outcome ) { long localCommit = ctx.entryLog().commitIndex(); long newCommitIndex = min( req.leaderCommit(), req.prevLogIndex() + req.entries().length ); if ( newCommitIndex > localCommit ) { - commands.add( new CommitCommand( newCommitIndex ) ); + outcome.addLogCommand( new CommitCommand( newCommitIndex ) ); return true; } return false; @@ -79,15 +72,7 @@ private static boolean generateCommitIfNecessary( ReadableRaftState Outcome handle( RaftMessages.Message message, ReadableRaftState ctx, Log log ) throws RaftStorageException { - Role nextRole = FOLLOWER; - MEMBER leader = ctx.leader(); - long leaderCommit = ctx.leaderCommit(); - Collection> outgoingMessages = new ArrayList<>(); - List logCommands = new ArrayList<>(); - ArrayList shipCommands = new ArrayList<>(); - MEMBER votedFor = ctx.votedFor(); - long newTerm = ctx.term(); - boolean renewElectionTimeout = false; + Outcome outcome = new Outcome<>( FOLLOWER, ctx ); switch ( message.type() ) { @@ -102,10 +87,10 @@ public Outcome handle( RaftMessages.Message message, Re if ( logHistoryMatches( ctx, req ) ) { - logCommands.add( new CommitCommand( req.commitIndex() ) ); + outcome.addLogCommand( new CommitCommand( req.commitIndex() ) ); } - renewElectionTimeout = true; + outcome.renewElectionTimeout(); break; } @@ -117,20 +102,21 @@ public Outcome handle( RaftMessages.Message message, Re { Response appendResponse = new Response<>( ctx.myself(), ctx.term(), false, req.prevLogIndex() ); - outgoingMessages.add( new RaftMessages.Directed<>( req.from(), appendResponse ) ); + + outcome.addOutgoingMessage( new RaftMessages.Directed<>( req.from(), appendResponse ) ); break; } - renewElectionTimeout = true; - newTerm = req.leaderTerm(); - leader = req.from(); - leaderCommit = req.leaderCommit(); + outcome.renewElectionTimeout(); + outcome.setNextTerm( req.leaderTerm() ); + outcome.setLeader( req.from() ); + outcome.setLeaderCommit( req.leaderCommit() ); if ( !logHistoryMatches( ctx, req ) ) { assert req.prevLogIndex() > -1 && req.prevLogTerm() > -1; - Response appendResponse = new Response<>( ctx.myself(), newTerm, false, -1 ); - outgoingMessages.add( new RaftMessages.Directed<>( req.from(), appendResponse ) ); + Response appendResponse = new Response<>( ctx.myself(), req.leaderTerm(), false, -1 ); + outcome.addOutgoingMessage( new RaftMessages.Directed<>( req.from(), appendResponse ) ); break; } @@ -149,22 +135,22 @@ public Outcome handle( RaftMessages.Message message, Re } else if ( logTerm != req.entries()[offset].term() ) { - logCommands.add( new TruncateLogCommand( baseIndex + offset ) ); + outcome.addLogCommand( new TruncateLogCommand( baseIndex + offset ) ); break; } } if( offset < req.entries().length ) { - logCommands.add( new BatchAppendLogEntries( baseIndex, offset, req.entries() ) ); + outcome.addLogCommand( new BatchAppendLogEntries( baseIndex, offset, req.entries() ) ); } - generateCommitIfNecessary( ctx, req, logCommands ); + generateCommitIfNecessary( ctx, req, outcome ); long endMatchIndex = req.prevLogIndex() + req.entries().length; // this is the index of the last incoming entry if ( endMatchIndex >= 0 ) { - Response appendResponse = new Response<>( ctx.myself(), newTerm, true, endMatchIndex ); - outgoingMessages.add( new RaftMessages.Directed<>( req.from(), appendResponse ) ); + Response appendResponse = new Response<>( ctx.myself(), req.leaderTerm(), true, endMatchIndex ); + outcome.addOutgoingMessage( new RaftMessages.Directed<>( req.from(), appendResponse ) ); } break; } @@ -175,23 +161,23 @@ else if ( logTerm != req.entries()[offset].term() ) if ( req.term() > ctx.term() ) { - newTerm = req.term(); - votedFor = null; + outcome.setNextTerm( req.term() ); + outcome.setVotedFor( null ); } - boolean willVoteForCandidate = shouldVoteFor( req.candidate(), req.term(), newTerm, ctx + boolean willVoteForCandidate = shouldVoteFor( req.candidate(), req.term(), outcome.term, ctx .entryLog().appendIndex(), req.lastLogIndex(), ctx.entryLog().readEntryTerm( ctx.entryLog().appendIndex() ), - req.lastLogTerm(), votedFor ); + req.lastLogTerm(), outcome.votedFor ); if ( willVoteForCandidate ) { - votedFor = req.from(); - renewElectionTimeout = true; + outcome.setVotedFor( req.from() ); + outcome.renewElectionTimeout(); } - outgoingMessages.add( new RaftMessages.Directed<>( req.from(), new RaftMessages.Vote.Response<>( - ctx.myself(), newTerm, + outcome.addOutgoingMessage( new RaftMessages.Directed<>( req.from(), new RaftMessages.Vote.Response<>( + ctx.myself(), outcome.term, willVoteForCandidate ) ) ); break; } @@ -204,27 +190,26 @@ else if ( logTerm != req.entries()[offset].term() ) break; } - newTerm++; + outcome.setNextTerm( ctx.term() + 1 ); RaftMessages.Vote.Request voteForMe = - new RaftMessages.Vote.Request<>( ctx.myself(), newTerm, ctx.myself(), ctx.entryLog() + new RaftMessages.Vote.Request<>( ctx.myself(), outcome.term, ctx.myself(), ctx.entryLog() .appendIndex(), ctx.entryLog().readEntryTerm( ctx.entryLog().appendIndex() ) ); for ( MEMBER member : currentMembers ) { if ( !member.equals( ctx.myself() ) ) { - outgoingMessages.add( new RaftMessages.Directed<>( member, voteForMe ) ); + outcome.addOutgoingMessage( new RaftMessages.Directed<>( member, voteForMe ) ); } } - votedFor = ctx.myself(); - nextRole = CANDIDATE; + outcome.setVotedFor( ctx.myself() ); + outcome.setNextRole( CANDIDATE ); break; } } - return new Outcome<>( nextRole, newTerm, leader, leaderCommit, votedFor, Collections.emptySet(), -1, - new FollowerStates<>(), renewElectionTimeout, logCommands, outgoingMessages, shipCommands ); + return outcome; } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Leader.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Leader.java index 3d91d192c01e..a3e4eac2f938 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Leader.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Leader.java @@ -19,11 +19,6 @@ */ package org.neo4j.coreedge.raft.roles; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; - -import org.neo4j.coreedge.raft.outcome.ShipCommand; import org.neo4j.coreedge.raft.Followers; import org.neo4j.coreedge.raft.RaftMessageHandler; import org.neo4j.coreedge.raft.RaftMessages; @@ -32,18 +27,17 @@ import org.neo4j.coreedge.raft.log.RaftStorageException; import org.neo4j.coreedge.raft.outcome.AppendLogEntry; import org.neo4j.coreedge.raft.outcome.CommitCommand; -import org.neo4j.coreedge.raft.outcome.LogCommand; import org.neo4j.coreedge.raft.outcome.Outcome; +import org.neo4j.coreedge.raft.outcome.ShipCommand; +import org.neo4j.coreedge.raft.replication.ReplicatedContent; import org.neo4j.coreedge.raft.state.FollowerState; import org.neo4j.coreedge.raft.state.FollowerStates; import org.neo4j.coreedge.raft.state.ReadableRaftState; -import org.neo4j.coreedge.raft.replication.ReplicatedContent; import org.neo4j.function.Predicate; import org.neo4j.helpers.collection.FilteringIterable; import org.neo4j.logging.Log; import static java.lang.Math.max; - import static org.neo4j.coreedge.raft.roles.Role.FOLLOWER; import static org.neo4j.coreedge.raft.roles.Role.LEADER; @@ -54,15 +48,14 @@ public static Iterable replicationTargets( final ReadableRaftSt return new FilteringIterable<>( ctx.replicationMembers(), (Predicate) member -> !member.equals( ctx.myself() ) ); } - static void sendHeartbeats( ReadableRaftState ctx, Collection> - outgoingMessages ) throws RaftStorageException + static void sendHeartbeats( ReadableRaftState ctx, Outcome outcome ) throws RaftStorageException { for ( MEMBER to : replicationTargets( ctx ) ) { long commitIndex = ctx.leaderCommit(); long commitIndexTerm = ctx.entryLog().readEntryTerm( commitIndex ); Heartbeat heartbeat = new Heartbeat<>( ctx.myself(), ctx.term(), commitIndex, commitIndexTerm ); - outgoingMessages.add( new RaftMessages.Directed<>( to, heartbeat ) ); + outcome.addOutgoingMessage( new RaftMessages.Directed<>( to, heartbeat ) ); } } @@ -70,13 +63,7 @@ static void sendHeartbeats( ReadableRaftState ctx, Collection Outcome handle( RaftMessages.Message message, ReadableRaftState ctx, Log log ) throws RaftStorageException { - Role nextRole = LEADER; - long leaderCommit = ctx.entryLog().commitIndex(); - Collection> outgoingMessages = new ArrayList<>(); - ArrayList logCommands = new ArrayList<>(); - ArrayList shipCommands = new ArrayList<>(); - long newTerm = ctx.term(); - FollowerStates updatedFollowerStates = ctx.followerStates(); + Outcome outcome = new Outcome<>( LEADER, ctx ); switch ( message.type() ) { @@ -89,14 +76,14 @@ public Outcome handle( RaftMessages.Message message, break; } - nextRole = FOLLOWER; - outgoingMessages.add( new RaftMessages.Directed<>( ctx.myself(), message ) ); + outcome.setNextRole( FOLLOWER ); + outcome.addOutgoingMessage( new RaftMessages.Directed<>( ctx.myself(), message ) ); break; } case HEARTBEAT_TIMEOUT: { - sendHeartbeats( ctx, outgoingMessages ); + sendHeartbeats( ctx, outcome ); break; } @@ -109,7 +96,7 @@ public Outcome handle( RaftMessages.Message message, RaftMessages.AppendEntries.Response appendResponse = new RaftMessages.AppendEntries.Response<>( ctx.myself(), ctx.term(), false, req.prevLogIndex() ); - outgoingMessages.add( new RaftMessages.Directed<>( req.from(), appendResponse ) ); + outcome.addOutgoingMessage( new RaftMessages.Directed<>( req.from(), appendResponse ) ); break; } else if ( req.leaderTerm() == ctx.term() ) @@ -119,8 +106,8 @@ else if ( req.leaderTerm() == ctx.term() ) else { // There is a new leader in a later term, we should revert to follower. (ยง5.1) - nextRole = FOLLOWER; - outgoingMessages.add( new RaftMessages.Directed<>( ctx.myself(), message ) ); + outcome.setNextRole( FOLLOWER ); + outcome.addOutgoingMessage( new RaftMessages.Directed<>( ctx.myself(), message ) ); break; } } @@ -136,9 +123,9 @@ else if ( req.leaderTerm() == ctx.term() ) } else if ( res.term() > ctx.term() ) { - newTerm = res.term(); - nextRole = FOLLOWER; - updatedFollowerStates = new FollowerStates<>(); + outcome.setNextTerm( res.term() ); + outcome.setNextRole( FOLLOWER ); + outcome.followerStates = new FollowerStates<>(); break; } @@ -150,10 +137,10 @@ else if ( res.term() > ctx.term() ) boolean followerProgressed = res.matchIndex() > follower.getMatchIndex(); - updatedFollowerStates = updatedFollowerStates.onSuccessResponse( res.from(), + outcome.followerStates = outcome.followerStates.onSuccessResponse( res.from(), max( res.matchIndex(), follower.getMatchIndex() ) ); - shipCommands.add( new ShipCommand.Match( res.matchIndex(), res.from() ) ); + outcome.addShipCommand( new ShipCommand.Match( res.matchIndex(), res.from() ) ); /* * Matches from older terms can in complicated leadership change / log truncation scenarios @@ -171,19 +158,19 @@ else if ( res.term() > ctx.term() ) { // TODO: Test that mismatch between voting and participating members affects commit outcome - long quorumAppendIndex = Followers.quorumAppendIndex( ctx.votingMembers(), updatedFollowerStates ); + long quorumAppendIndex = Followers.quorumAppendIndex( ctx.votingMembers(), outcome.followerStates ); if ( quorumAppendIndex > ctx.entryLog().commitIndex() ) { - leaderCommit = quorumAppendIndex; + outcome.setLeaderCommit( quorumAppendIndex ); - logCommands.add( new CommitCommand( quorumAppendIndex ) ); - shipCommands.add( new ShipCommand.CommitUpdate() ); + outcome.addLogCommand( new CommitCommand( quorumAppendIndex ) ); + outcome.addShipCommand( new ShipCommand.CommitUpdate() ); } } } else // Response indicated failure. Must go back a log entry and retry - this is where catchup happens { - shipCommands.add ( new ShipCommand.Mismatch( ctx.entryLog().appendIndex(), res.from() ) ); // TODO: Fix remote last appended parameter. + outcome.addShipCommand( new ShipCommand.Mismatch( ctx.entryLog().appendIndex(), res.from() ) ); // TODO: Fix remote last appended parameter. } break; @@ -195,14 +182,14 @@ else if ( res.term() > ctx.term() ) if ( req.term() > ctx.term() ) { - newTerm = req.term(); + outcome.setNextTerm( req.term() ); - nextRole = FOLLOWER; - outgoingMessages.add( new RaftMessages.Directed<>( ctx.myself(), req ) ); + outcome.setNextRole( FOLLOWER ); + outcome.addOutgoingMessage( new RaftMessages.Directed<>( ctx.myself(), req ) ); break; } - outgoingMessages.add( new RaftMessages.Directed<>( req.from(), new RaftMessages.Vote.Response<>( ctx.myself(), ctx.term(), false ) ) ); + outcome.addOutgoingMessage( new RaftMessages.Directed<>( req.from(), new RaftMessages.Vote.Response<>( ctx.myself(), ctx.term(), false ) ) ); break; } @@ -219,14 +206,12 @@ else if ( res.term() > ctx.term() ) RaftLogEntry newLogEntry = new RaftLogEntry( ctx.term(), content ); - shipCommands.add( new ShipCommand.NewEntry( prevLogIndex, prevLogTerm, newLogEntry ) ); - logCommands.add( new AppendLogEntry( prevLogIndex + 1, newLogEntry ) ); + outcome.addShipCommand( new ShipCommand.NewEntry( prevLogIndex, prevLogTerm, newLogEntry ) ); + outcome.addLogCommand( new AppendLogEntry( prevLogIndex + 1, newLogEntry ) ); break; } } - return new Outcome<>( nextRole, newTerm, ctx.leader(), leaderCommit, null, - Collections.emptySet(), ctx.lastLogIndexBeforeWeBecameLeader(), updatedFollowerStates, false, - logCommands, outgoingMessages, shipCommands ); + return outcome; } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/RaftState.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/RaftState.java index 690264d6c2ca..6a3285f40483 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/RaftState.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/state/RaftState.java @@ -120,7 +120,7 @@ public ReadableRaftLog entryLog() public void update( Outcome outcome ) throws RaftStorageException { - termStore.update( outcome.newTerm ); + termStore.update( outcome.term ); voteStore.update( outcome.votedFor ); leader = outcome.leader; leaderCommit = outcome.leaderCommit; diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/CandidateTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/CandidateTest.java index 9b73f624d156..1b5ba4db4c34 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/CandidateTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/CandidateTest.java @@ -79,7 +79,7 @@ public void candidateShouldUpdateTermToCurrentMessageAndBecomeFollower() throws // then assertEquals( FOLLOWER, outcome.newRole ); - assertEquals( HIGHEST_TERM, outcome.newTerm ); + assertEquals( HIGHEST_TERM, outcome.term ); } @Test diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/FollowerTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/FollowerTest.java index 6060efba9573..c5a2822a7f6a 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/FollowerTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/FollowerTest.java @@ -95,7 +95,7 @@ public void followerShouldUpdateTermToCurrentMessage() throws Exception .build(), state, log() ); // Then - assertEquals( HIGHEST_TERM, outcome.newTerm ); + assertEquals( HIGHEST_TERM, outcome.term ); } @Test diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/LeaderTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/LeaderTest.java index 86a2747bfa99..be727f6f6bd2 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/LeaderTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/LeaderTest.java @@ -101,7 +101,7 @@ public void leaderShouldUpdateTermToCurrentMessageAndBecomeFollower() throws Exc // then assertEquals( FOLLOWER, outcome.newRole ); - assertEquals( HIGHEST_TERM, outcome.newTerm ); + assertEquals( HIGHEST_TERM, outcome.term ); } @Test @@ -125,7 +125,7 @@ public void leaderShouldRejectVoteRequestWithNewerTermAndBecomeAFollower() throw assertEquals( message, messageFor( outcome, myself ) ); assertEquals( FOLLOWER, outcome.newRole ); assertEquals( 0, count( outcome.logCommands ) ); - assertEquals( state.term() + 1, outcome.newTerm ); + assertEquals( state.term() + 1, outcome.term ); } @Test @@ -389,7 +389,7 @@ public void leaderShouldRejectAppendEntriesResponseWithNewerTermAndBecomeAFollow assertEquals( 0, count( outcome.outgoingMessages ) ); assertEquals( FOLLOWER, outcome.newRole ); assertEquals( 0, count( outcome.logCommands ) ); - assertEquals( state.term() + 1, outcome.newTerm ); + assertEquals( state.term() + 1, outcome.term ); } // TODO: test that shows we don't commit for previous terms diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/RaftStateTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/RaftStateTest.java index 3db2101fdcbe..8dfabd8a4fda 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/RaftStateTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/RaftStateTest.java @@ -40,7 +40,6 @@ import static org.mockito.Mockito.mock; import static org.neo4j.coreedge.raft.roles.Role.CANDIDATE; -import static org.neo4j.helpers.collection.IteratorUtil.asIterable; public class RaftStateTest { @@ -72,9 +71,9 @@ private FollowerStates initialFollowerStates() return new FollowerStates<>( new FollowerStates<>(), new RaftTestMember( 1 ), new FollowerState() ); } - private Iterable emptyLogCommands() + private Collection emptyLogCommands() { - return asIterable( Collections.emptyIterator() ); + return Collections.emptyList(); } private class FakeMembership implements RaftMembership diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/explorer/ComparableRaftState.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/explorer/ComparableRaftState.java index abebd67fae1a..691b0f7417bf 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/explorer/ComparableRaftState.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/explorer/ComparableRaftState.java @@ -132,7 +132,7 @@ public ReadableRaftLog entryLog() public void update( Outcome outcome ) throws RaftStorageException { - term = outcome.newTerm; + term = outcome.term; votedFor = outcome.votedFor; leader = outcome.leader; votesForMe = outcome.votesForMe;