From a0ccdf1f5b0e9dadb1dfb42d7f7418f3c1951e6c Mon Sep 17 00:00:00 2001 From: Alistair Jones Date: Mon, 25 Jan 2016 17:04:47 +0000 Subject: [PATCH] Inline RAFT message processing. Process RAFT messages directly inside the role handlers, thereby removing the need to change role, and having to send the message back for later processing by that role. The tests have been restructured to reflect the requirement for some messages to be handled in mostly the same way, regardless of the current role. --- .../org/neo4j/coreedge/raft/RaftInstance.java | 2 +- .../org/neo4j/coreedge/raft/RaftMessages.java | 8 +- .../neo4j/coreedge/raft/roles/Appending.java | 113 +++++ .../neo4j/coreedge/raft/roles/Candidate.java | 10 +- .../neo4j/coreedge/raft/roles/Follower.java | 112 +--- .../org/neo4j/coreedge/raft/roles/Heart.java | 48 ++ .../org/neo4j/coreedge/raft/roles/Leader.java | 27 +- .../org/neo4j/coreedge/raft/roles/Role.java | 6 +- .../raft/{Ballot.java => roles/Voting.java} | 34 +- .../org/neo4j/coreedge/raft/MessageUtils.java | 16 +- .../raft/{BallotTest.java => VotingTest.java} | 26 +- .../raft/roles/AppendEntriesRequestTest.java | 312 ++++++++++++ .../coreedge/raft/roles/CandidateTest.java | 122 ++--- .../coreedge/raft/roles/FollowerTest.java | 477 +----------------- .../coreedge/raft/roles/HeartbeatTest.java | 147 ++++++ .../neo4j/coreedge/raft/roles/LeaderTest.java | 64 --- .../roles/NonFollowerVoteRequestTest.java | 103 ++++ .../coreedge/raft/roles/VoteRequestTest.java | 184 +++++++ .../explorer/ClusterSafetyViolations.java | 2 +- .../state/explorer/action/ProcessMessage.java | 2 +- 20 files changed, 1035 insertions(+), 780 deletions(-) create mode 100644 enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Appending.java create mode 100644 enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Heart.java rename enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/{Ballot.java => roles/Voting.java} (57%) rename enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/{BallotTest.java => VotingTest.java} (90%) create mode 100644 enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/AppendEntriesRequestTest.java create mode 100644 enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/HeartbeatTest.java create mode 100644 enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/NonFollowerVoteRequestTest.java create mode 100644 enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/VoteRequestTest.java diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftInstance.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftInstance.java index 8d9732c7bf07..781f2ae02cc7 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftInstance.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftInstance.java @@ -245,7 +245,7 @@ public synchronized void handle( Serializable incomingMessage ) try { handlingMessage = true; - Outcome outcome = currentRole.role.handle( (RaftMessages.Message) incomingMessage, state, log ); + Outcome outcome = currentRole.handler.handle( (RaftMessages.Message) incomingMessage, state, log ); handleOutcome( outcome ); currentRole = outcome.getNewRole(); diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftMessages.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftMessages.java index fb2a260205b9..7f4bccd25be1 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftMessages.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/RaftMessages.java @@ -78,6 +78,12 @@ public Message message() { return message; } + + @Override + public String toString() + { + return format( "Directed{to=%s, message=%s}", to, message ); + } } interface Vote @@ -355,7 +361,7 @@ public int hashCode() @Override public String toString() { - return String.format( "AppendEntries.Response from %s {term=%d, success=%s, matchIndex=%d, appendIndex=%d}", + return format( "AppendEntries.Response from %s {term=%d, success=%s, matchIndex=%d, appendIndex=%d}", from, term, success, matchIndex, appendIndex ); } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Appending.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Appending.java new file mode 100644 index 000000000000..ab7a741201cd --- /dev/null +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Appending.java @@ -0,0 +1,113 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge.raft.roles; + +import org.neo4j.coreedge.raft.RaftMessages; +import org.neo4j.coreedge.raft.log.RaftLogEntry; +import org.neo4j.coreedge.raft.log.RaftStorageException; +import org.neo4j.coreedge.raft.outcome.AppendLogEntry; +import org.neo4j.coreedge.raft.outcome.BatchAppendLogEntries; +import org.neo4j.coreedge.raft.outcome.Outcome; +import org.neo4j.coreedge.raft.outcome.ShipCommand; +import org.neo4j.coreedge.raft.outcome.TruncateLogCommand; +import org.neo4j.coreedge.raft.replication.ReplicatedContent; +import org.neo4j.coreedge.raft.state.ReadableRaftState; + +public class Appending +{ + public static void handleAppendEntriesRequest( + ReadableRaftState state, Outcome outcome, RaftMessages.AppendEntries.Request request ) + throws RaftStorageException + { + if ( request.leaderTerm() < state.term() ) + { + RaftMessages.AppendEntries.Response appendResponse = new RaftMessages.AppendEntries.Response<>( + state.myself(), state.term(), false, -1, state.entryLog().appendIndex() ); + + outcome.addOutgoingMessage( new RaftMessages.Directed<>( request.from(), appendResponse ) ); + return; + } + + outcome.renewElectionTimeout(); + outcome.setNextTerm( request.leaderTerm() ); + outcome.setLeader( request.from() ); + outcome.setLeaderCommit( request.leaderCommit() ); + + if ( !Follower.logHistoryMatches( state, request.prevLogIndex(), request.prevLogTerm() ) ) + { + assert request.prevLogIndex() > -1 && request.prevLogTerm() > -1; + RaftMessages.AppendEntries.Response appendResponse = new RaftMessages.AppendEntries.Response<>( + state.myself(), request.leaderTerm(), false, -1, state.entryLog().appendIndex() ); + + outcome.addOutgoingMessage( new RaftMessages.Directed<>( request.from(), appendResponse ) ); + return; + } + + long baseIndex = request.prevLogIndex() + 1; + int offset; + + /* Find possible truncation point. */ + for ( offset = 0; offset < request.entries().length; offset++ ) + { + long logTerm = state.entryLog().readEntryTerm( baseIndex + offset ); + + if( baseIndex + offset > state.entryLog().appendIndex() ) + { + /* entry doesn't exist */ + break; + } + else if ( logTerm != request.entries()[offset].term() ) + { + outcome.addLogCommand( new TruncateLogCommand( baseIndex + offset ) ); + break; + } + } + + if( offset < request.entries().length ) + { + outcome.addLogCommand( new BatchAppendLogEntries( baseIndex, offset, request.entries() ) ); + } + + Follower.commitToLogOnUpdate( state, request.prevLogIndex() + request.entries().length, request.leaderCommit + (), outcome ); + + long endMatchIndex = request.prevLogIndex() + request.entries().length; // this is the index of the last incoming entry + if ( endMatchIndex >= 0 ) + { + RaftMessages.AppendEntries.Response appendResponse = new RaftMessages.AppendEntries.Response<>( state.myself(), request.leaderTerm(), true, endMatchIndex, endMatchIndex ); + outcome.addOutgoingMessage( new RaftMessages.Directed<>( request.from(), appendResponse ) ); + } + } + + public static void appendNewEntry( ReadableRaftState ctx, Outcome outcome, ReplicatedContent + content ) throws RaftStorageException + { + long prevLogIndex = ctx.entryLog().appendIndex(); + long prevLogTerm = prevLogIndex == -1 ? -1 : + prevLogIndex > ctx.lastLogIndexBeforeWeBecameLeader() ? + ctx.term() : + ctx.entryLog().readLogEntry( prevLogIndex ).term(); + + RaftLogEntry newLogEntry = new RaftLogEntry( ctx.term(), content ); + + outcome.addShipCommand( new ShipCommand.NewEntry( prevLogIndex, prevLogTerm, newLogEntry ) ); + outcome.addLogCommand( new AppendLogEntry( prevLogIndex + 1, newLogEntry ) ); + } +} diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Candidate.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Candidate.java index d0d0751d4ce0..eb1aa909d1d6 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Candidate.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Candidate.java @@ -52,7 +52,7 @@ public Outcome handle( RaftMessages.Message message, } outcome.setNextRole( FOLLOWER ); - outcome.addOutgoingMessage( new RaftMessages.Directed<>( ctx.myself(), message ) ); + Heart.beat( ctx, outcome, (RaftMessages.Heartbeat) message ); break; } @@ -71,7 +71,7 @@ public Outcome handle( RaftMessages.Message message, } outcome.setNextRole( FOLLOWER ); - outcome.addOutgoingMessage( new RaftMessages.Directed<>( ctx.myself(), req ) ); + Appending.handleAppendEntriesRequest( ctx, outcome, req ); break; } @@ -101,7 +101,7 @@ else if ( res.term() < ctx.term() || !res.voteGranted() ) ctx.term(), ctx.myself(), outcome.getVotesForMe() ); outcome.setLeader( ctx.myself() ); - Leader.appendNewEntry( ctx, outcome, new NewLeaderBarrier() ); + Appending.appendNewEntry( ctx, outcome, new NewLeaderBarrier() ); outcome.setLastLogIndexBeforeWeBecameLeader( ctx.entryLog().appendIndex() ); outcome.setNextRole( LEADER ); @@ -115,11 +115,9 @@ else if ( res.term() < ctx.term() || !res.voteGranted() ) if ( req.term() > ctx.term() ) { - outcome.setNextTerm( req.term() ); outcome.getVotesForMe().clear(); - outcome.setNextRole( FOLLOWER ); - outcome.addOutgoingMessage( new RaftMessages.Directed<>( ctx.myself(), req ) ); + Voting.handleVoteRequest( ctx, outcome, req ); break; } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Follower.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Follower.java index e69976de6d81..ce6371669877 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Follower.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Follower.java @@ -24,24 +24,21 @@ import org.neo4j.coreedge.raft.RaftMessageHandler; import org.neo4j.coreedge.raft.RaftMessages; import org.neo4j.coreedge.raft.RaftMessages.AppendEntries; -import org.neo4j.coreedge.raft.RaftMessages.AppendEntries.Response; import org.neo4j.coreedge.raft.RaftMessages.Heartbeat; import org.neo4j.coreedge.raft.log.RaftStorageException; -import org.neo4j.coreedge.raft.outcome.BatchAppendLogEntries; import org.neo4j.coreedge.raft.outcome.CommitCommand; import org.neo4j.coreedge.raft.outcome.Outcome; -import org.neo4j.coreedge.raft.outcome.TruncateLogCommand; import org.neo4j.coreedge.raft.state.ReadableRaftState; import org.neo4j.logging.Log; import static java.lang.Long.min; -import static org.neo4j.coreedge.raft.Ballot.shouldVoteFor; + import static org.neo4j.coreedge.raft.roles.Role.CANDIDATE; import static org.neo4j.coreedge.raft.roles.Role.FOLLOWER; public class Follower implements RaftMessageHandler { - private static boolean logHistoryMatches( ReadableRaftState ctx, long prevLogIndex, long prevLogTerm ) + public static boolean logHistoryMatches( ReadableRaftState ctx, long prevLogIndex, long prevLogTerm ) throws RaftStorageException { // NOTE: A previous log index of -1 means no history, @@ -52,7 +49,7 @@ private static boolean logHistoryMatches( ReadableRaftState ctx return prevLogIndex == -1 || ctx.entryLog().readEntryTerm( prevLogIndex ) == prevLogTerm; } - private static boolean commitToLogOnUpdate( ReadableRaftState ctx, long indexOfLastNewEntry, + public static boolean commitToLogOnUpdate( ReadableRaftState ctx, long indexOfLastNewEntry, long leaderCommit, Outcome outcome ) { long newCommitIndex = min( leaderCommit, indexOfLastNewEntry ); @@ -75,115 +72,19 @@ public Outcome handle( RaftMessages.Message message, Re { case HEARTBEAT: { - Heartbeat req = (Heartbeat) message; - - if ( req.leaderTerm() < ctx.term() ) - { - break; - } - - outcome.renewElectionTimeout(); - outcome.setNextTerm( req.leaderTerm() ); - outcome.setLeader( req.from() ); - outcome.setLeaderCommit( req.commitIndex() ); - - if ( !logHistoryMatches( ctx, req.commitIndex(), req.commitIndexTerm() ) ) - { - break; - } - - commitToLogOnUpdate( ctx, req.commitIndex(), req.commitIndex(), outcome ); + Heart.beat( ctx, outcome, (Heartbeat) message ); break; } case APPEND_ENTRIES_REQUEST: { - AppendEntries.Request req = (AppendEntries.Request) message; - - if ( req.leaderTerm() < ctx.term() ) - { - Response appendResponse = new Response<>( - ctx.myself(), ctx.term(), false, -1, ctx.entryLog().appendIndex() ); - - outcome.addOutgoingMessage( new RaftMessages.Directed<>( req.from(), appendResponse ) ); - break; - } - - outcome.renewElectionTimeout(); - outcome.setNextTerm( req.leaderTerm() ); - outcome.setLeader( req.from() ); - outcome.setLeaderCommit( req.leaderCommit() ); - - if ( !logHistoryMatches( ctx, req.prevLogIndex(), req.prevLogTerm() ) ) - { - assert req.prevLogIndex() > -1 && req.prevLogTerm() > -1; - Response appendResponse = new Response<>( - ctx.myself(), req.leaderTerm(), false, -1, ctx.entryLog().appendIndex() ); - - outcome.addOutgoingMessage( new RaftMessages.Directed<>( req.from(), appendResponse ) ); - break; - } - - long baseIndex = req.prevLogIndex() + 1; - int offset; - - /* Find possible truncation point. */ - for ( offset = 0; offset < req.entries().length; offset++ ) - { - long logTerm = ctx.entryLog().readEntryTerm( baseIndex + offset ); - - if( baseIndex + offset > ctx.entryLog().appendIndex() ) - { - /* entry doesn't exist */ - break; - } - else if ( logTerm != req.entries()[offset].term() ) - { - outcome.addLogCommand( new TruncateLogCommand( baseIndex + offset ) ); - break; - } - } - - if( offset < req.entries().length ) - { - outcome.addLogCommand( new BatchAppendLogEntries( baseIndex, offset, req.entries() ) ); - } - - commitToLogOnUpdate( ctx, req.prevLogIndex() + req.entries().length, req.leaderCommit(), outcome ); - - long endMatchIndex = req.prevLogIndex() + req.entries().length; // this is the index of the last incoming entry - if ( endMatchIndex >= 0 ) - { - Response appendResponse = new Response<>( ctx.myself(), req.leaderTerm(), true, endMatchIndex, endMatchIndex ); - outcome.addOutgoingMessage( new RaftMessages.Directed<>( req.from(), appendResponse ) ); - } + Appending.handleAppendEntriesRequest( ctx, outcome, (AppendEntries.Request) message ); break; } case VOTE_REQUEST: { - RaftMessages.Vote.Request req = (RaftMessages.Vote.Request) message; - - if ( req.term() > ctx.term() ) - { - outcome.setNextTerm( req.term() ); - outcome.setVotedFor( null ); - } - - boolean willVoteForCandidate = shouldVoteFor( req.candidate(), outcome.getTerm(), req.term(), - ctx.entryLog().readEntryTerm( ctx.entryLog().appendIndex() ), req.lastLogTerm(), - ctx.entryLog().appendIndex(), req.lastLogIndex(), - outcome.getVotedFor() ); - - if ( willVoteForCandidate ) - { - outcome.setVotedFor( req.from() ); - outcome.renewElectionTimeout(); - } - - outcome.addOutgoingMessage( new RaftMessages.Directed<>( req.from(), new RaftMessages.Vote.Response<>( - ctx.myself(), outcome.getTerm(), - willVoteForCandidate ) ) ); + Voting.handleVoteRequest( ctx, outcome, (RaftMessages.Vote.Request) message ); break; } @@ -217,4 +118,5 @@ else if ( logTerm != req.entries()[offset].term() ) return outcome; } + } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Heart.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Heart.java new file mode 100644 index 000000000000..573d9d7fbb7b --- /dev/null +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Heart.java @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge.raft.roles; + +import org.neo4j.coreedge.raft.RaftMessages; +import org.neo4j.coreedge.raft.log.RaftStorageException; +import org.neo4j.coreedge.raft.outcome.Outcome; +import org.neo4j.coreedge.raft.state.ReadableRaftState; + +public class Heart +{ + public static void beat( ReadableRaftState state, Outcome outcome, RaftMessages.Heartbeat request ) throws RaftStorageException + { + if ( request.leaderTerm() < state.term() ) + { + return; + } + + outcome.renewElectionTimeout(); + outcome.setNextTerm( request.leaderTerm() ); + outcome.setLeader( request.from() ); + outcome.setLeaderCommit( request.commitIndex() ); + + if ( !Follower.logHistoryMatches( state, request.commitIndex(), request.commitIndexTerm() ) ) + { + return; + } + + Follower.commitToLogOnUpdate( state, request.commitIndex(), request.commitIndex(), outcome ); + } +} diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Leader.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Leader.java index 52fc8d3f3b9d..9e4e77ad57ab 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Leader.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Leader.java @@ -23,9 +23,7 @@ import org.neo4j.coreedge.raft.RaftMessageHandler; import org.neo4j.coreedge.raft.RaftMessages; import org.neo4j.coreedge.raft.RaftMessages.Heartbeat; -import org.neo4j.coreedge.raft.log.RaftLogEntry; import org.neo4j.coreedge.raft.log.RaftStorageException; -import org.neo4j.coreedge.raft.outcome.AppendLogEntry; import org.neo4j.coreedge.raft.outcome.CommitCommand; import org.neo4j.coreedge.raft.outcome.Outcome; import org.neo4j.coreedge.raft.outcome.ShipCommand; @@ -58,21 +56,6 @@ static void sendHeartbeats( ReadableRaftState ctx, Outcome void appendNewEntry( ReadableRaftState ctx, Outcome outcome, ReplicatedContent - content ) throws RaftStorageException - { - long prevLogIndex = ctx.entryLog().appendIndex(); - long prevLogTerm = prevLogIndex == -1 ? -1 : - prevLogIndex > ctx.lastLogIndexBeforeWeBecameLeader() ? - ctx.term() : - ctx.entryLog().readLogEntry( prevLogIndex ).term(); - - RaftLogEntry newLogEntry = new RaftLogEntry( ctx.term(), content ); - - outcome.addShipCommand( new ShipCommand.NewEntry( prevLogIndex, prevLogTerm, newLogEntry ) ); - outcome.addLogCommand( new AppendLogEntry( prevLogIndex + 1, newLogEntry ) ); - } - @Override public Outcome handle( RaftMessages.Message message, ReadableRaftState ctx, Log log ) throws RaftStorageException @@ -91,7 +74,7 @@ public Outcome handle( RaftMessages.Message message, } outcome.setNextRole( FOLLOWER ); - outcome.addOutgoingMessage( new RaftMessages.Directed<>( ctx.myself(), message ) ); + Heart.beat( ctx, outcome, (Heartbeat) message ); break; } @@ -122,7 +105,7 @@ else if ( req.leaderTerm() == ctx.term() ) { // There is a new leader in a later term, we should revert to follower. (ยง5.1) outcome.setNextRole( FOLLOWER ); - outcome.addOutgoingMessage( new RaftMessages.Directed<>( ctx.myself(), message ) ); + Appending.handleAppendEntriesRequest( ctx, outcome, req ); break; } } @@ -197,10 +180,8 @@ else if ( res.term() > ctx.term() ) if ( req.term() > ctx.term() ) { - outcome.setNextTerm( req.term() ); - outcome.setNextRole( FOLLOWER ); - outcome.addOutgoingMessage( new RaftMessages.Directed<>( ctx.myself(), req ) ); + Voting.handleVoteRequest( ctx, outcome, req ); break; } @@ -213,7 +194,7 @@ else if ( res.term() > ctx.term() ) RaftMessages.NewEntry.Request req = (RaftMessages.NewEntry.Request) message; ReplicatedContent content = req.content(); - appendNewEntry( ctx, outcome, content ); + Appending.appendNewEntry( ctx, outcome, content ); break; } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Role.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Role.java index f451f7b6ac14..2b231f8e2226 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Role.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Role.java @@ -27,10 +27,10 @@ public enum Role CANDIDATE( new Candidate() ), LEADER( new Leader() ); - public final RaftMessageHandler role; + public final RaftMessageHandler handler; - Role( RaftMessageHandler role ) + Role( RaftMessageHandler handler ) { - this.role = role; + this.handler = handler; } } diff --git a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/Ballot.java b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Voting.java similarity index 57% rename from enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/Ballot.java rename to enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Voting.java index 8516cb009cf7..c3e954ed721a 100644 --- a/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/Ballot.java +++ b/enterprise/core-edge/src/main/java/org/neo4j/coreedge/raft/roles/Voting.java @@ -17,10 +17,40 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -package org.neo4j.coreedge.raft; +package org.neo4j.coreedge.raft.roles; -public class Ballot +import org.neo4j.coreedge.raft.RaftMessages; +import org.neo4j.coreedge.raft.log.RaftStorageException; +import org.neo4j.coreedge.raft.outcome.Outcome; +import org.neo4j.coreedge.raft.state.ReadableRaftState; + +public class Voting { + public static void handleVoteRequest( ReadableRaftState state, Outcome outcome, + RaftMessages.Vote.Request voteRequest ) throws RaftStorageException + { + if ( voteRequest.term() > state.term() ) + { + outcome.setNextTerm( voteRequest.term() ); + outcome.setVotedFor( null ); + } + + boolean willVoteForCandidate = shouldVoteFor( voteRequest.candidate(), outcome.getTerm(), voteRequest.term(), + state.entryLog().readEntryTerm( state.entryLog().appendIndex() ), voteRequest.lastLogTerm(), + state.entryLog().appendIndex(), voteRequest.lastLogIndex(), + outcome.getVotedFor() ); + + if ( willVoteForCandidate ) + { + outcome.setVotedFor( voteRequest.from() ); + outcome.renewElectionTimeout(); + } + + outcome.addOutgoingMessage( new RaftMessages.Directed<>( voteRequest.from(), new RaftMessages.Vote.Response<>( + state.myself(), outcome.getTerm(), + willVoteForCandidate ) ) ); + } + public static boolean shouldVoteFor( MEMBER candidate, long contextTerm, long requestTerm, long contextLastLogTerm, long requestLastLogTerm, long contextLastAppended, long requestLastLogIndex, diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/MessageUtils.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/MessageUtils.java index a53c0c3b55eb..b9903e87faaa 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/MessageUtils.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/MessageUtils.java @@ -19,6 +19,7 @@ */ package org.neo4j.coreedge.raft; +import java.util.NoSuchElementException; import java.util.function.Predicate; import org.neo4j.coreedge.raft.outcome.Outcome; @@ -26,11 +27,24 @@ import org.neo4j.helpers.collection.FilteringIterable; import org.neo4j.helpers.collection.IteratorUtil; +import static java.lang.String.format; + +import static org.junit.Assert.fail; + public class MessageUtils { public static RaftMessages.Message messageFor( Outcome outcome, final RaftTestMember member ) { Predicate> selectMember = message -> message.to() == member; - return IteratorUtil.single( new FilteringIterable<>( outcome.getOutgoingMessages(), selectMember ) ).message(); + try + { + return IteratorUtil.single( new FilteringIterable<>( outcome.getOutgoingMessages(), selectMember ) ) + .message(); + } + catch ( NoSuchElementException e ) + { + throw new AssertionError( format( "Expected message for %s, but outcome only contains %s.", + member, outcome.getOutgoingMessages() ) ); + } } } diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/BallotTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/VotingTest.java similarity index 90% rename from enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/BallotTest.java rename to enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/VotingTest.java index f753e9c71ae9..36357f535bbf 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/BallotTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/VotingTest.java @@ -23,11 +23,13 @@ import org.junit.runner.RunWith; import org.mockito.runners.MockitoJUnitRunner; +import org.neo4j.coreedge.raft.roles.Voting; + import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @RunWith(MockitoJUnitRunner.class) -public class BallotTest +public class VotingTest { Object candidate = new Object(); Object otherMember = new Object(); @@ -39,7 +41,7 @@ public class BallotTest @Test public void shouldAcceptRequestWithIdenticalLog() { - assertTrue( Ballot.shouldVoteFor( + assertTrue( Voting.shouldVoteFor( candidate, currentTerm, currentTerm, @@ -54,7 +56,7 @@ public void shouldAcceptRequestWithIdenticalLog() @Test public void shouldRejectRequestFromOldTerm() { - assertFalse( Ballot.shouldVoteFor( + assertFalse( Voting.shouldVoteFor( candidate, currentTerm, currentTerm - 1, @@ -69,7 +71,7 @@ public void shouldRejectRequestFromOldTerm() @Test public void shouldRejectRequestIfCandidateLogEndsAtLowerTerm() { - assertFalse( Ballot.shouldVoteFor( + assertFalse( Voting.shouldVoteFor( candidate, currentTerm, currentTerm, @@ -84,7 +86,7 @@ public void shouldRejectRequestIfCandidateLogEndsAtLowerTerm() @Test public void shouldRejectRequestIfLogsEndInSameTermButCandidateLogIsShorter() { - assertFalse( Ballot.shouldVoteFor( + assertFalse( Voting.shouldVoteFor( candidate, currentTerm, currentTerm, @@ -99,7 +101,7 @@ public void shouldRejectRequestIfLogsEndInSameTermButCandidateLogIsShorter() @Test public void shouldAcceptRequestIfLogsEndInSameTermAndCandidateLogIsSameLength() { - assertTrue( Ballot.shouldVoteFor( + assertTrue( Voting.shouldVoteFor( candidate, currentTerm, currentTerm, @@ -114,7 +116,7 @@ public void shouldAcceptRequestIfLogsEndInSameTermAndCandidateLogIsSameLength() @Test public void shouldAcceptRequestIfLogsEndInSameTermAndCandidateLogIsLonger() { - assertTrue( Ballot.shouldVoteFor( + assertTrue( Voting.shouldVoteFor( candidate, currentTerm, currentTerm, @@ -129,7 +131,7 @@ public void shouldAcceptRequestIfLogsEndInSameTermAndCandidateLogIsLonger() @Test public void shouldAcceptRequestIfLogsEndInHigherTermAndCandidateLogIsShorter() { - assertTrue( Ballot.shouldVoteFor( + assertTrue( Voting.shouldVoteFor( candidate, currentTerm, currentTerm, @@ -144,7 +146,7 @@ public void shouldAcceptRequestIfLogsEndInHigherTermAndCandidateLogIsShorter() @Test public void shouldAcceptRequestIfLogEndsAtHigherTermAndCandidateLogIsSameLength() { - assertTrue( Ballot.shouldVoteFor( + assertTrue( Voting.shouldVoteFor( candidate, currentTerm, currentTerm, @@ -159,7 +161,7 @@ public void shouldAcceptRequestIfLogEndsAtHigherTermAndCandidateLogIsSameLength( @Test public void shouldAcceptRequestIfLogEndsAtHigherTermAndCandidateLogIsLonger() { - assertTrue( Ballot.shouldVoteFor( + assertTrue( Voting.shouldVoteFor( candidate, currentTerm, currentTerm, @@ -174,7 +176,7 @@ public void shouldAcceptRequestIfLogEndsAtHigherTermAndCandidateLogIsLonger() @Test public void shouldRejectRequestIfAlreadyVotedForOtherCandidate() { - assertFalse( Ballot.shouldVoteFor( + assertFalse( Voting.shouldVoteFor( candidate, currentTerm, currentTerm, @@ -189,7 +191,7 @@ public void shouldRejectRequestIfAlreadyVotedForOtherCandidate() @Test public void shouldAcceptRequestIfAlreadyVotedForCandidate() { - assertTrue( Ballot.shouldVoteFor( + assertTrue( Voting.shouldVoteFor( candidate, currentTerm, currentTerm, diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/AppendEntriesRequestTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/AppendEntriesRequestTest.java new file mode 100644 index 000000000000..abe4d4dbf848 --- /dev/null +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/AppendEntriesRequestTest.java @@ -0,0 +1,312 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge.raft.roles; + +import java.util.Arrays; +import java.util.Collection; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import org.neo4j.coreedge.raft.RaftMessages.AppendEntries.Response; +import org.neo4j.coreedge.raft.ReplicatedString; +import org.neo4j.coreedge.raft.log.InMemoryRaftLog; +import org.neo4j.coreedge.raft.log.RaftLogEntry; +import org.neo4j.coreedge.raft.log.RaftStorageException; +import org.neo4j.coreedge.raft.outcome.BatchAppendLogEntries; +import org.neo4j.coreedge.raft.outcome.CommitCommand; +import org.neo4j.coreedge.raft.outcome.Outcome; +import org.neo4j.coreedge.raft.outcome.TruncateLogCommand; +import org.neo4j.coreedge.raft.state.RaftState; +import org.neo4j.coreedge.server.RaftTestMember; +import org.neo4j.logging.Log; +import org.neo4j.logging.NullLogProvider; + +import static org.hamcrest.CoreMatchers.hasItem; +import static org.hamcrest.Matchers.empty; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +import static org.neo4j.coreedge.raft.MessageUtils.messageFor; +import static org.neo4j.coreedge.raft.TestMessageBuilders.appendEntriesRequest; +import static org.neo4j.coreedge.raft.roles.AppendEntriesRequestTest.ContentGenerator.content; +import static org.neo4j.coreedge.raft.state.RaftStateBuilder.raftState; +import static org.neo4j.coreedge.server.RaftTestMember.member; + +@RunWith(Parameterized.class) +public class AppendEntriesRequestTest +{ + @Parameterized.Parameters(name = "{0} with leader {1} terms ahead.") + public static Collection data() + { + return Arrays.asList( new Object[][]{ + {Role.FOLLOWER, 0}, {Role.FOLLOWER, 1}, {Role.LEADER, 1}, {Role.CANDIDATE, 1} + } ); + } + + @Parameterized.Parameter(value = 0) + public Role role; + + @Parameterized.Parameter(value = 1) + public int leaderTermDifference; + + private RaftTestMember myself = member( 0 ); + private RaftTestMember leader = member( 1 ); + + @Test + public void shouldAcceptInitialEntry() throws Exception + { + RaftState state = raftState() + .myself( myself ) + .build(); + + long leaderTerm = state.term() + leaderTermDifference; + RaftLogEntry logEntry = new RaftLogEntry( leaderTerm, content() ); + + // when + Outcome outcome = role.handler.handle( appendEntriesRequest() + .from( leader ) + .leaderTerm( leaderTerm ) + .prevLogIndex( -1 ) + .prevLogTerm( -1 ) + .logEntry( logEntry ) + .build(), state, log() ); + + // then + assertTrue( ((Response) messageFor( outcome, leader )).success() ); + assertThat( outcome.getLogCommands(), hasItem( new BatchAppendLogEntries( 0, 0, new RaftLogEntry[]{ logEntry } ) ) ); + } + + @Test + public void shouldAcceptInitialEntries() throws Exception + { + RaftState state = raftState() + .myself( myself ) + .build(); + + long leaderTerm = state.term() + leaderTermDifference; + RaftLogEntry logEntry1 = new RaftLogEntry( leaderTerm, content() ); + RaftLogEntry logEntry2 = new RaftLogEntry( leaderTerm, content() ); + + // when + Outcome outcome = role.handler.handle( appendEntriesRequest() + .from( leader ) + .leaderTerm( leaderTerm ) + .prevLogIndex( -1 ) + .prevLogTerm( -1 ) + .logEntry( logEntry1 ) + .logEntry( logEntry2 ) + .build(), state, log() ); + + // then + assertTrue( ((Response) messageFor( outcome, leader )).success() ); + assertThat( outcome.getLogCommands(), hasItem( new BatchAppendLogEntries( 0, 0, + new RaftLogEntry[]{logEntry1, logEntry2} ) ) ); + } + + @Test + public void shouldRejectDiscontinuousEntries() throws Exception + { + // given + RaftState state = raftState() + .myself( myself ) + .build(); + + long leaderTerm = state.term() + leaderTermDifference; + + // when + Outcome outcome = role.handler.handle( appendEntriesRequest() + .from( leader ) + .leaderTerm( leaderTerm ) + .prevLogIndex( state.entryLog().appendIndex() + 1 ) + .prevLogTerm( leaderTerm ) + .logEntry( new RaftLogEntry( leaderTerm, content() ) ) + .build(), state, log() ); + + // then + Response response = (Response) messageFor( outcome, leader ); + assertEquals( state.entryLog().appendIndex(), response.appendIndex() ); + assertFalse( response.success() ); + } + + @Test + public void shouldAcceptContinuousEntries() throws Exception + { + InMemoryRaftLog raftLog = new InMemoryRaftLog(); + RaftState state = raftState() + .myself( myself ) + .entryLog( raftLog ) + .build(); + + long leaderTerm = state.term() + leaderTermDifference; + raftLog.append( new RaftLogEntry( leaderTerm, content() ) ); + + // when + Outcome outcome = role.handler.handle( appendEntriesRequest() + .from( leader ) + .leaderTerm( leaderTerm ) + .prevLogIndex( raftLog.appendIndex() ) + .prevLogTerm( leaderTerm ) + .logEntry( new RaftLogEntry( leaderTerm, content() ) ) + .build(), state, log() ); + + // then + assertTrue( ((Response) messageFor( outcome, leader )).success() ); + } + + @Test + public void shouldTruncateOnReceiptOfConflictingEntry() throws Exception + { + // given + InMemoryRaftLog raftLog = new InMemoryRaftLog(); + RaftState state = raftState() + .myself( myself ) + .term( 5 ) + .entryLog( raftLog ) + .build(); + + long leaderTerm = state.term() + leaderTermDifference; + raftLog.append( new RaftLogEntry( state.term() - 1, content() ) ); + + // when + Outcome outcome = role.handler.handle( appendEntriesRequest() + .from( leader ) + .leaderTerm( leaderTerm ) + .prevLogIndex( raftLog.appendIndex() - 1 ) + .prevLogTerm( -1 ) + .logEntry( new RaftLogEntry( leaderTerm, content() ) ) + .build(), state, log() ); + + // then + assertTrue( ((Response) messageFor( outcome, leader )).success() ); + assertThat( outcome.getLogCommands(), hasItem( new TruncateLogCommand( 0 ) ) ); + } + + @Test + public void shouldCommitEntry() throws Exception + { + // given + InMemoryRaftLog raftLog = new InMemoryRaftLog(); + RaftState state = raftState() + .entryLog( raftLog ) + .myself( myself ) + .build(); + + long leaderTerm = state.term() + leaderTermDifference; + raftLog.append( new RaftLogEntry( leaderTerm, content() ) ); + + // when + Outcome outcome = role.handler.handle( appendEntriesRequest() + .from( leader ) + .leaderTerm( leaderTerm ) + .prevLogIndex( raftLog.appendIndex() ) + .prevLogTerm( leaderTerm ) + .leaderCommit( 0 ) + .build(), state, log() ); + + // then + assertTrue( ((Response) messageFor( outcome, leader )).success() ); + assertThat( outcome.getLogCommands(), hasItem( new CommitCommand( 0 ) ) ); + } + + @Test + public void shouldAppendNewEntryAndCommitPreviouslyAppendedEntry() throws Exception + { + // given + InMemoryRaftLog raftLog = new InMemoryRaftLog(); + RaftState state = raftState() + .entryLog( raftLog ) + .myself( myself ) + .build(); + + long leaderTerm = state.term() + leaderTermDifference; + RaftLogEntry previouslyAppendedEntry = new RaftLogEntry( leaderTerm, content() ); + raftLog.append( previouslyAppendedEntry ); + RaftLogEntry newLogEntry = new RaftLogEntry( leaderTerm, content() ); + + // when + Outcome outcome = role.handler.handle( appendEntriesRequest() + .from( leader ) + .leaderTerm( leaderTerm ) + .prevLogIndex( raftLog.appendIndex() ) + .prevLogTerm( leaderTerm ) + .logEntry( newLogEntry ) + .leaderCommit( 0 ) + .build(), state, log() ); + + // then + assertTrue( ((Response) messageFor( outcome, leader )).success() ); + assertThat( outcome.getLogCommands(), hasItem( new CommitCommand( 0 ) ) ); + assertThat( outcome.getLogCommands(), hasItem( new BatchAppendLogEntries( 1, 0, + new RaftLogEntry[]{ newLogEntry } ) ) ); + } + + @Test + public void shouldNotCommitAheadOfMatchingHistory() throws Exception + { + // given + InMemoryRaftLog raftLog = new InMemoryRaftLog(); + RaftState state = raftState() + .entryLog( raftLog ) + .myself( myself ) + .build(); + + long leaderTerm = state.term() + leaderTermDifference; + RaftLogEntry previouslyAppendedEntry = new RaftLogEntry( leaderTerm, content() ); + raftLog.append( previouslyAppendedEntry ); + + // when + Outcome outcome = role.handler.handle( appendEntriesRequest() + .from( leader ) + .leaderTerm( leaderTerm ) + .prevLogIndex( raftLog.appendIndex() + 1 ) + .prevLogTerm( leaderTerm ) + .leaderCommit( 0 ) + .build(), state, log() ); + + + // then + assertFalse( ((Response) messageFor( outcome, leader )).success() ); + assertThat( outcome.getLogCommands(), empty() ); + } + + public RaftState newState() throws RaftStorageException + { + return raftState().myself( myself ).build(); + } + + private Log log() + { + return NullLogProvider.getInstance().getLog( getClass() ); + } + + static class ContentGenerator + { + private static int count = 0; + + public static ReplicatedString content() + { + return new ReplicatedString( String.format( "content#%d", count++ ) ); + } + } +} diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/CandidateTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/CandidateTest.java index 768a242b8d6e..cbd42fa180ec 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/CandidateTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/CandidateTest.java @@ -19,8 +19,6 @@ */ package org.neo4j.coreedge.raft.roles; -import java.util.UUID; - import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; @@ -28,162 +26,116 @@ import org.neo4j.coreedge.raft.NewLeaderBarrier; import org.neo4j.coreedge.raft.log.RaftLogEntry; -import org.neo4j.coreedge.raft.outcome.AppendLogEntry; -import org.neo4j.coreedge.server.RaftTestMember; -import org.neo4j.coreedge.raft.RaftMessages; +import org.neo4j.coreedge.raft.log.RaftStorageException; import org.neo4j.coreedge.raft.net.Inbound; +import org.neo4j.coreedge.raft.outcome.AppendLogEntry; import org.neo4j.coreedge.raft.outcome.Outcome; import org.neo4j.coreedge.raft.state.RaftState; +import org.neo4j.coreedge.server.RaftTestMember; import org.neo4j.logging.Log; import org.neo4j.logging.LogProvider; import org.neo4j.logging.NullLogProvider; import static org.hamcrest.CoreMatchers.hasItem; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; -import static org.neo4j.coreedge.raft.MessageUtils.messageFor; import static org.neo4j.coreedge.raft.TestMessageBuilders.voteResponse; -import static org.neo4j.coreedge.raft.roles.Role.LEADER; -import static org.neo4j.coreedge.server.RaftTestMember.member; -import static org.neo4j.coreedge.raft.TestMessageBuilders.appendEntriesRequest; -import static org.neo4j.coreedge.raft.TestMessageBuilders.voteRequest; import static org.neo4j.coreedge.raft.roles.Role.CANDIDATE; import static org.neo4j.coreedge.raft.roles.Role.FOLLOWER; +import static org.neo4j.coreedge.raft.roles.Role.LEADER; import static org.neo4j.coreedge.raft.state.RaftStateBuilder.raftState; +import static org.neo4j.coreedge.server.RaftTestMember.member; @RunWith(MockitoJUnitRunner.class) public class CandidateTest { private RaftTestMember myself = member( 0 ); - - /* A few members that we use at will in tests. */ private RaftTestMember member1 = member( 1 ); @Mock private Inbound inbound; private LogProvider logProvider = NullLogProvider.getInstance(); - public static final int HIGHEST_TERM = 99; @Test - public void shouldBeElectedLeader() throws Exception + public void shouldBeElectedLeaderOnReceivingGrantedVoteResponseWithCurrentTerm() throws Exception { // given - long term = 0; - RaftState state = raftState() - .myself( myself ) - .term( term ) - .build(); - - Candidate candidate = new Candidate(); + RaftState state = newState(); // when - Outcome outcome = candidate.handle( voteResponse() - .term( term ) + Outcome outcome = CANDIDATE.handler.handle( voteResponse() + .term( state.term() ) .from( member1 ) .grant() .build(), state, log() ); // then assertEquals( LEADER, outcome.getNewRole() ); - assertThat( outcome.getLogCommands(), hasItem( new AppendLogEntry( 0, new RaftLogEntry( term, new NewLeaderBarrier() ) )) ); + assertThat( outcome.getLogCommands(), hasItem( new AppendLogEntry( 0, + new RaftLogEntry( state.term(), new NewLeaderBarrier() ) ) ) ); } @Test - public void candidateShouldUpdateTermToCurrentMessageAndBecomeFollower() throws Exception + public void shouldStayAsCandidateOnReceivingDeniedVoteResponseWithCurrentTerm() throws Exception { // given - RaftState state = raftState().build(); - - Candidate candidate = new Candidate(); + RaftState state = newState(); // when - Outcome outcome = candidate.handle( voteRequest().from( member1 ).term( HIGHEST_TERM ).lastLogIndex( 0 ) - .lastLogTerm( -1 ).build(), state, log() ); + Outcome outcome = CANDIDATE.handler.handle( voteResponse() + .term( state.term() ) + .from( member1 ) + .deny() + .build(), state, log() ); // then - assertEquals( FOLLOWER, outcome.getNewRole() ); - assertEquals( HIGHEST_TERM, outcome.getTerm() ); + assertEquals( CANDIDATE, outcome.getNewRole() ); } @Test - public void candidateShouldBecomeFollowerIfReceivesMessageWithNewerTerm() throws Exception + public void shouldUpdateTermOnReceivingVoteResponseWithLaterTerm() throws Exception { // given - RaftState state = raftState() - .myself( myself ) - .build(); + RaftState state = newState(); - Candidate candidate = new Candidate(); + final long voterTerm = state.term() + 1; // when - RaftMessages.Vote.Request message = voteRequest() + Outcome outcome = CANDIDATE.handler.handle( voteResponse() + .term( voterTerm ) .from( member1 ) - .term( state.term() + 1 ) - .lastLogIndex( 0 ) - .lastLogTerm( -1 ).build(); - Outcome outcome = candidate.handle( message, state, log() ); + .grant() + .build(), state, log() ); // then - assertEquals( message, messageFor( outcome, myself ) ); assertEquals( FOLLOWER, outcome.getNewRole() ); + assertEquals( voterTerm, outcome.getTerm() ); } @Test - public void candidateShouldRejectAnyMessageWithOldTerm() throws Exception + public void shouldRejectVoteResponseWithOldTerm() throws Exception { // given - RaftState state = raftState().build(); + RaftState state = newState(); - Candidate candidate = new Candidate(); + final long voterTerm = state.term() - 1; // when - Outcome outcome = candidate.handle( voteRequest().from( member1 ).term( state.term() - 1 ).lastLogIndex( 0 ) - .lastLogTerm( -1 ).build(), state, log() ); + Outcome outcome = CANDIDATE.handler.handle( voteResponse() + .term( voterTerm ) + .from( member1 ) + .grant() + .build(), state, log() ); // then - assertFalse( ((RaftMessages.Vote.Response) messageFor( outcome, member1 )).voteGranted() ); assertEquals( CANDIDATE, outcome.getNewRole() ); } - @Test - public void candidateShouldBecomeFollowerOnReceiptOfAppendEntriesFromLeaderWithHigherTerm() throws Exception - { - candidateShouldBecomeFollowerOnReceiptOfAppendEntriesFromLeaderWithTerm( 2 ); - } - - @Test - public void candidateShouldBecomeFollowerOnReceiptOfAppendEntriesFromLeaderWithSameTerm() throws Exception - { - candidateShouldBecomeFollowerOnReceiptOfAppendEntriesFromLeaderWithTerm( 1 ); - } - - private void candidateShouldBecomeFollowerOnReceiptOfAppendEntriesFromLeaderWithTerm( int term ) throws Exception + public RaftState newState() throws RaftStorageException { - // given - RaftState state = raftState() - .myself( myself ) - .term( 1 ) - .build(); - - Candidate candidate = new Candidate(); - - // when - RaftMessages.AppendEntries.Request message = appendEntriesRequest() - .from( member1 ) - .leader( member1 ) - .leaderTerm( term ) - .leaderCommit( 1 ) - .correlationId( UUID.randomUUID() ) - .prevLogIndex( 0 ) - .prevLogTerm( term - 1 ).build(); - Outcome outcome = candidate.handle( message, state, log() ); - - // then - assertEquals( message, messageFor( outcome, myself ) ); - assertEquals( FOLLOWER, outcome.getNewRole() ); + return raftState().myself( myself ).build(); } private Log log() diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/FollowerTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/FollowerTest.java index 3b90d57ef7a7..a1cae17d1e02 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/FollowerTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/FollowerTest.java @@ -19,8 +19,6 @@ */ package org.neo4j.coreedge.raft.roles; -import java.util.Iterator; - import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; @@ -29,43 +27,27 @@ import org.neo4j.coreedge.raft.RaftMessages; import org.neo4j.coreedge.raft.RaftMessages.Message; import org.neo4j.coreedge.raft.RaftMessages.Timeout.Election; -import org.neo4j.coreedge.raft.RaftMessages.Vote; import org.neo4j.coreedge.raft.ReplicatedString; -import org.neo4j.coreedge.raft.log.InMemoryRaftLog; -import org.neo4j.coreedge.raft.log.RaftLog; import org.neo4j.coreedge.raft.log.RaftLogEntry; import org.neo4j.coreedge.raft.log.RaftStorageException; import org.neo4j.coreedge.raft.net.Inbound; -import org.neo4j.coreedge.raft.outcome.CommitCommand; -import org.neo4j.coreedge.raft.outcome.LogCommand; import org.neo4j.coreedge.raft.outcome.Outcome; -import org.neo4j.coreedge.raft.replication.ReplicatedContent; import org.neo4j.coreedge.raft.state.RaftState; import org.neo4j.coreedge.server.RaftTestMember; import org.neo4j.logging.Log; -import org.neo4j.logging.LogProvider; import org.neo4j.logging.NullLogProvider; -import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.Matchers.hasItem; -import static org.hamcrest.Matchers.instanceOf; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.neo4j.coreedge.raft.MessageUtils.messageFor; import static org.neo4j.coreedge.raft.RaftMessages.AppendEntries; import static org.neo4j.coreedge.raft.TestMessageBuilders.appendEntriesRequest; -import static org.neo4j.coreedge.raft.TestMessageBuilders.appendEntriesResponse; -import static org.neo4j.coreedge.raft.TestMessageBuilders.heartbeat; -import static org.neo4j.coreedge.raft.TestMessageBuilders.voteRequest; import static org.neo4j.coreedge.raft.roles.Role.CANDIDATE; -import static org.neo4j.coreedge.raft.roles.Role.FOLLOWER; import static org.neo4j.coreedge.raft.state.RaftStateBuilder.raftState; import static org.neo4j.coreedge.server.RaftTestMember.member; -import static org.neo4j.helpers.collection.Iterables.single; import static org.neo4j.helpers.collection.IteratorUtil.asSet; @@ -77,78 +59,10 @@ public class FollowerTest /* A few members that we use at will in tests. */ private RaftTestMember member1 = member( 1 ); private RaftTestMember member2 = member( 2 ); - private RaftTestMember leader = member( 3 ); @Mock private Inbound inbound; - private LogProvider logProvider = NullLogProvider.getInstance(); - - private static final int HIGHEST_TERM = 99; - - @Test - public void followerShouldUpdateTermToCurrentMessage() throws Exception - { - // Given - RaftState state = raftState().build(); - - - Follower follower = new Follower(); - - // When - Outcome outcome = follower.handle( voteRequest().from( member1 ).term( HIGHEST_TERM ) - .lastLogIndex( 0 ).lastLogTerm( -1 ) - .build(), state, log() ); - - // Then - assertEquals( HIGHEST_TERM, outcome.getTerm() ); - } - - @Test - public void shouldVoteOnceOnlyPerTerm() throws Exception - { - // given - RaftState state = raftState() - .myself( myself ) - .votingMembers( asSet( myself, member1, member2 ) ) - .build(); - - Follower follower = new Follower(); - - // when - Outcome outcome1 = follower.handle( voteRequest().from( member1 ).term( 1 ).build(), state, - log() ); - state.update( outcome1 ); - Outcome outcome2 = follower.handle( voteRequest().from( member2 ).term( 1 ).build(), state, - log() ); - - // then - assertEquals( new Vote.Response<>( myself, 1, true ), messageFor( outcome1, member1 ) ); - assertEquals( new Vote.Response<>( myself, 1, false ), messageFor( outcome2, member2 ) ); - - } - - @Test - public void followersShouldRejectAnyMessageWithOldTermAndStayAFollower() throws Exception - { - // given - RaftState state = raftState() - .myself( myself ) - .votingMembers( asSet( myself, member1, member2 ) ) - - .build(); - - Follower follower = new Follower(); - - // when - Outcome outcome = follower.handle( voteRequest().from( member1 ).term( state.term() - 1 ) - .lastLogIndex( 0 ).lastLogTerm( -1 ).build(), state, log() ); - - // then - assertEquals( new Vote.Response<>( myself, state.term(), false ), messageFor( outcome, member1 ) ); - assertEquals( FOLLOWER, outcome.getNewRole() ); - } - @Test public void followerShouldTransitToCandidateAndInstigateAnElectionAfterTimeout() throws Exception { @@ -172,33 +86,6 @@ public void followerShouldTransitToCandidateAndInstigateAnElectionAfterTimeout() assertNotNull( messageFor( outcome, member2 ) ); } - @Test - public void followerShouldVoteForOnlyOneCandidatePerElection() throws Exception - { - // given - RaftState state = raftState().build(); - - Follower follower = new Follower(); - - // when - final long startingTerm = state.term() + 1; - - Outcome outcome1 = follower.handle( voteRequest().from( member1 ).term( startingTerm ) - .lastLogIndex( 0 ) - .lastLogTerm( -1 ).build(), state, log() ); - - // This updates state.term() as a side-effect - state.update( outcome1 ); - - Outcome outcome2 = follower.handle( voteRequest().from( member2 ).term( state.term() ) - .lastLogIndex( 0 ) - .lastLogTerm( -1 ).build(), state, log() ); - - // then - assertThat( messageFor( outcome1, member1 ), instanceOf( Vote.Response.class ) ); - assertFalse( ((Vote.Response) messageFor( outcome2, member2 )).voteGranted() ); - } - @Test public void shouldBecomeCandidateOnReceivingElectionTimeoutMessage() throws Exception { @@ -217,117 +104,6 @@ public void shouldBecomeCandidateOnReceivingElectionTimeoutMessage() throws Exce assertEquals( CANDIDATE, outcome.getNewRole() ); } - @Test - public void followerShouldRejectEntriesForWhichItDoesNotHavePrecedentInItsLog() throws Exception - { - // given - RaftState state = raftState() - .myself( myself ) - .votingMembers( asSet( myself, member1, member2 ) ) - .build(); - - Follower follower = new Follower(); - - // when - Outcome outcome = follower.handle( new RaftMessages.AppendEntries.Request<>( myself, 99, - 99, 0, new RaftLogEntry[] { new RaftLogEntry( 99, ContentGenerator.content() ) }, 99 ), state, log() ); - - // then - Message response = messageFor( outcome, myself ); - assertThat( response, instanceOf( AppendEntries.Response.class ) ); - assertFalse( ((AppendEntries.Response) response).success() ); - } - - @Test - public void followerShouldAcceptEntriesForWhichItHasPrecedentInItsLog() throws Exception - { - RaftState state = raftState() - .myself( myself ) - .build(); - - Follower follower = new Follower(); - - state.update( follower.handle( new RaftMessages.AppendEntries.Request<>( myself, 0, -1, -1, - new RaftLogEntry[] { new RaftLogEntry( 0, ContentGenerator.content() ) }, 0 ), state, log() ) ); - - // when - Outcome outcome = follower.handle( new RaftMessages.AppendEntries.Request<>( myself, 0, - 0, 0, new RaftLogEntry[] { new RaftLogEntry( 0, ContentGenerator.content() ) }, 1 ), state, log() ); - - state.update( outcome ); - - // then - Message response = messageFor( outcome, myself ); - assertThat( response, instanceOf( AppendEntries.Response.class ) ); - assertTrue( ((AppendEntries.Response) response).success() ); - } - - @Test - public void followerShouldOverwriteSomeAppendedEntriesOnReceiptOfConflictingCommittedEntries() throws Exception - { - // given - RaftState state = raftState() - .myself( myself ) - .build(); - - Follower follower = new Follower(); - - appendSomeEntriesToLog( state, follower, 9, 0 ); - - // when - Outcome outcome = follower.handle( new AppendEntries.Request<>( myself, 1, -1, -1, - new RaftLogEntry[] { new RaftLogEntry( 1, new ReplicatedString( "commit this!" ) ) }, 0 ), state, log() ); - state.update( outcome ); - - // then - assertEquals( 0, state.entryLog().commitIndex() ); - } - - @Test - public void followerWithEmptyLogShouldCommitEntriesWithHigherTerm() throws Exception - { - // given - RaftState state = raftState() - .myself( myself ) - .build(); - - Follower follower = new Follower(); - - // when - Outcome outcome = follower.handle( new AppendEntries.Request<>( myself, 1, -1, -1, - new RaftLogEntry[] { new RaftLogEntry( 1, new ReplicatedString( "commit this!" ) ) }, 0 ), state, log() ); - state.update( outcome ); - - // then - assertEquals( 0, state.entryLog().commitIndex() ); - assertEquals( 1, state.term() ); - } - - @Test - public void followerWithNonEmptyLogShouldOverwriteAppendedEntriesOnReceiptOfCommittedEntryWithHigherTerm() - throws Exception - { - // given - int term = 1; - RaftState state = raftState() - .myself( myself ) - .term( term ) - .build(); - - Follower follower = new Follower(); - - appendSomeEntriesToLog( state, follower, 9, term ); - - // when leader says it agrees with some of the existing log - Outcome outcome = follower.handle( new AppendEntries.Request<>( member1, 2, 3, 1, - new RaftLogEntry[] { new RaftLogEntry( 2, new ReplicatedString( "commit this!" ) ) }, 4 ), state, log() ); - state.update( outcome ); - - // then - assertEquals( 4, state.entryLog().commitIndex() ); - assertEquals( 2, state.term() ); - } - @Test public void followerReceivingHeartbeatIndicatingClusterIsAheadShouldElicitAppendResponse() throws Exception { @@ -358,118 +134,6 @@ public void followerReceivingHeartbeatIndicatingClusterIsAheadShouldElicitAppend assertFalse( response.success() ); } - @Test - public void heartbeatShouldNotResultInCommitIfReferringToFutureEntries() throws Exception - { - int term = 1; - int followerAppendIndex = 9; - - RaftState state = raftState() - .myself( myself ) - .term( term ) - .build(); - - - Follower follower = new Follower(); - appendSomeEntriesToLog( state, follower, followerAppendIndex, term ); - - RaftMessages.Heartbeat heartbeat = heartbeat().from( member1 ) - .commitIndex( followerAppendIndex + 2 ) // The leader is talking about committing stuff we don't know about - .commitIndexTerm( term ) // And is in the same term - .leaderTerm( term + 2 ) - .build(); - - Outcome outcome = follower.handle( heartbeat, state, log() ); - - // Then there should be no actions taken against the log - assertFalse( outcome.getLogCommands().iterator().hasNext() ); - } - - @Test - public void heartbeatShouldNotResultInCommitIfHistoryMismatches() throws Exception - { - int term = 1; - int followerAppendIndex = 9; - - RaftState state = raftState() - .myself( myself ) - .term( term ) - .build(); - - - Follower follower = new Follower(); - appendSomeEntriesToLog( state, follower, followerAppendIndex, term ); - - RaftMessages.Heartbeat heartbeat = heartbeat().from( member1 ) - .commitIndex( followerAppendIndex ) // The leader suggests that we commit stuff we have appended - .commitIndexTerm( term + 2 ) // but in a different term - .leaderTerm( term + 2 ) // And is in a term that is further in the future - .build(); - - Outcome outcome = follower.handle( heartbeat, state, log() ); - - assertFalse( outcome.getLogCommands().iterator().hasNext() ); - } - - @Test - public void historyShouldResultInCommitIfHistoryMatches() throws Exception - { - int term = 1; - int followerAppendIndex = 9; - - RaftState state = raftState() - .myself( myself ) - .term( term ) - .build(); - - Follower follower = new Follower(); - appendSomeEntriesToLog( state, follower, followerAppendIndex, term ); - - RaftMessages.Heartbeat heartbeat = heartbeat().from( member1 ) - .commitIndex( followerAppendIndex - 1) // The leader suggests that we commit stuff we have appended - .commitIndexTerm( term ) // in the same term - .leaderTerm( term + 2 ) // with the leader in the future - .build(); - - Outcome outcome = follower.handle( heartbeat, state, log() ); - - Iterator iterator = outcome.getLogCommands().iterator(); - assertTrue( iterator.hasNext() ); - LogCommand logCommand = iterator.next(); - assertFalse( iterator.hasNext() ); - assertThat( logCommand, instanceOf( CommitCommand.class ) ); - CommitCommand commit = (CommitCommand) logCommand; - CapturingRaftLog capture = new CapturingRaftLog(); - commit.applyTo( capture ); - assertEquals( followerAppendIndex - 1, capture.commitIndex() ); - } - - @Test - public void shouldAppendMultipleEntries() throws Exception - { - // given - int term = 1; - RaftState state = raftState() - .myself( myself ) - .term( term ) - .build(); - - Follower follower = new Follower(); - - RaftLogEntry[] entries = { - new RaftLogEntry( 1, new ReplicatedString( "commit this!" ) ), - new RaftLogEntry( 1, new ReplicatedString( "commit this as well!" ) ), - new RaftLogEntry( 1, new ReplicatedString( "commit this too!" ) ) - }; - - Outcome outcome = follower.handle( - new AppendEntries.Request<>( member1, 1, -1, -1, entries, -1 ), state, log() ); - state.update( outcome ); - - // then - assertEquals( 2, state.entryLog().appendIndex() ); - } - @Test public void shouldTruncateIfTermDoesNotMatch() throws Exception { @@ -483,7 +147,7 @@ public void shouldTruncateIfTermDoesNotMatch() throws Exception Follower follower = new Follower(); state.update( follower.handle( new AppendEntries.Request<>( member1, 1, -1, -1, - new RaftLogEntry[] { + new RaftLogEntry[]{ new RaftLogEntry( 2, ContentGenerator.content() ), }, -1 ), state, log() ) ); @@ -560,71 +224,6 @@ public void shouldNotRenewElectionTimeoutOnReceiptOfHeartbeatInLowerTerm() throw assertFalse( outcome.electionTimeoutRenewed() ); } - @Test - public void shouldNotCommitAheadOfMatchingHistory() throws Exception - { - // given - int LEADER_COMMIT = 10; - int LEADER1_TERM = 1; - int LEADER2_TERM = 2; - - InMemoryRaftLog raftLog = new InMemoryRaftLog(); - - raftLog.append( new RaftLogEntry( LEADER1_TERM, ReplicatedString.valueOf( "gryffindor" ) ) ); // (0) we committed this far already - raftLog.commit( 0 ); - raftLog.append( new RaftLogEntry( LEADER1_TERM, ReplicatedString.valueOf( "hufflepuff" ) ) ); // (1) we should only commit up to this, because this is how far we match - raftLog.append( new RaftLogEntry( LEADER1_TERM, ReplicatedString.valueOf( "ravenclaw" ) ) ); // (2) leader will have committed including this and more - - RaftState state = raftState() - .myself( myself ) - .entryLog( raftLog ) - .term( 1 ) - .build(); - - Follower follower = new Follower(); - Outcome outcome; - - // when - append request matching history and truncating - RaftLogEntry[] newEntries = { - new RaftLogEntry( LEADER2_TERM, new ReplicatedString( "slytherin" ) ), // (1 - overwrite and truncate) - }; - - outcome = follower.handle( new RaftMessages.AppendEntries.Request<>( myself, LEADER2_TERM, 0, LEADER1_TERM, newEntries, LEADER_COMMIT ), state, log() ); - - // then - assertThat( outcome.getLogCommands(), hasItem( new CommitCommand( 1 ) ) ); - } - - @Test - public void shouldIncludeLatestAppendedInResponse() throws Exception - { - // given: just a single appended entry at follower - RaftLogEntry entryA = new RaftLogEntry( 1, ReplicatedString.valueOf( "b" ) ); - - InMemoryRaftLog raftLog = new InMemoryRaftLog(); - raftLog.append( entryA ); - - RaftState state = raftState() - .myself( myself ) - .entryLog( raftLog ) - .term( 1 ) - .build(); - - Follower follower = new Follower(); - - RaftLogEntry entryB = new RaftLogEntry( 1, ReplicatedString.valueOf( "b" ) ); - - // when: append request for item way forward (index=10, term=2) - Outcome outcome = follower.handle( - new RaftMessages.AppendEntries.Request<>( leader, 2, 10, 2, - new RaftLogEntry[]{entryB}, 10 ), state, log() ); - - // then: respond with false and how far ahead we are - assertThat( single( outcome.getOutgoingMessages()).message(), equalTo( - appendEntriesResponse().from( myself ).term( 2 ).appendIndex( 0 ).matchIndex( -1 ).failure() - .build() ) ); - } - private void appendSomeEntriesToLog( RaftState raft, Follower follower, int numberOfEntriesToAppend, int term ) throws RaftStorageException { @@ -655,78 +254,6 @@ public static ReplicatedString content() private Log log() { - return logProvider.getLog( getClass() ); - } - - private static final class CapturingRaftLog implements RaftLog - { - - private long commitIndex; - - @Override - public void replay() throws Throwable - { - - } - - @Override - public void registerListener( Listener consumer ) - { - - } - - @Override - public long append( RaftLogEntry entry ) throws RaftStorageException - { - return 0; - } - - @Override - public void truncate( long fromIndex ) throws RaftStorageException - { - - } - - @Override - public void commit( long commitIndex ) throws RaftStorageException - { - this.commitIndex = commitIndex; - } - - @Override - public long appendIndex() - { - return 0; - } - - @Override - public long commitIndex() - { - return commitIndex; - } - - @Override - public RaftLogEntry readLogEntry( long logIndex ) throws RaftStorageException - { - return null; - } - - @Override - public ReplicatedContent readEntryContent( long logIndex ) throws RaftStorageException - { - return null; - } - - @Override - public long readEntryTerm( long logIndex ) throws RaftStorageException - { - return 0; - } - - @Override - public boolean entryExists( long logIndex ) - { - return false; - } + return NullLogProvider.getInstance().getLog( getClass() ); } } diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/HeartbeatTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/HeartbeatTest.java new file mode 100644 index 000000000000..c479fe96bbe3 --- /dev/null +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/HeartbeatTest.java @@ -0,0 +1,147 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge.raft.roles; + +import java.util.Arrays; +import java.util.Collection; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import org.neo4j.coreedge.raft.RaftMessages; +import org.neo4j.coreedge.raft.log.InMemoryRaftLog; +import org.neo4j.coreedge.raft.log.RaftLogEntry; +import org.neo4j.coreedge.raft.outcome.CommitCommand; +import org.neo4j.coreedge.raft.outcome.Outcome; +import org.neo4j.coreedge.raft.state.RaftState; +import org.neo4j.coreedge.server.RaftTestMember; +import org.neo4j.logging.Log; +import org.neo4j.logging.NullLogProvider; + +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.core.IsCollectionContaining.hasItem; +import static org.junit.Assert.assertThat; + +import static org.neo4j.coreedge.raft.TestMessageBuilders.heartbeat; +import static org.neo4j.coreedge.raft.roles.AppendEntriesRequestTest.ContentGenerator.content; +import static org.neo4j.coreedge.raft.state.RaftStateBuilder.raftState; +import static org.neo4j.coreedge.server.RaftTestMember.member; + +@RunWith(Parameterized.class) +public class HeartbeatTest +{ + @Parameterized.Parameters(name = "{0} with leader {1} terms ahead.") + public static Collection data() + { + return Arrays.asList( new Object[][]{ + {Role.FOLLOWER, 0}, {Role.FOLLOWER, 1}, {Role.LEADER, 1}, {Role.CANDIDATE, 1} + } ); + } + + @Parameterized.Parameter(value = 0) + public Role role; + + @Parameterized.Parameter(value = 1) + public int leaderTermDifference; + + private RaftTestMember myself = member( 0 ); + private RaftTestMember leader = member( 1 ); + + @Test + public void shouldNotResultInCommitIfReferringToFutureEntries() throws Exception + { + InMemoryRaftLog raftLog = new InMemoryRaftLog(); + RaftState state = raftState() + .myself( myself ) + .entryLog( raftLog ) + .build(); + + long leaderTerm = state.term() + leaderTermDifference; + raftLog.append( new RaftLogEntry( leaderTerm, content() ) ); + + RaftMessages.Heartbeat heartbeat = heartbeat() + .from( leader ) + .commitIndex( raftLog.appendIndex() + 1) // The leader is talking about committing stuff we don't know about + .commitIndexTerm( leaderTerm ) // And is in the same term + .leaderTerm( leaderTerm ) + .build(); + + Outcome outcome = role.handler.handle( heartbeat, state, log() ); + + assertThat( outcome.getLogCommands(), empty()); + } + + @Test + public void shouldNotResultInCommitIfHistoryMismatches() throws Exception + { + InMemoryRaftLog raftLog = new InMemoryRaftLog(); + RaftState state = raftState() + .myself( myself ) + .entryLog( raftLog ) + .build(); + + long leaderTerm = state.term() + leaderTermDifference; + raftLog.append( new RaftLogEntry( leaderTerm, content() ) ); + + RaftMessages.Heartbeat heartbeat = heartbeat() + .from( leader ) + .commitIndex( raftLog.appendIndex()) // The leader is talking about committing stuff we don't know about + .commitIndexTerm( leaderTerm ) // And is in the same term + .leaderTerm( leaderTerm ) + .build(); + + Outcome outcome = role.handler.handle( heartbeat, state, log() ); + + assertThat( outcome.getLogCommands(), hasItem(new CommitCommand( 0 )) ); + } + + @Test + public void shouldResultInCommitIfHistoryMatches() throws Exception + { + InMemoryRaftLog raftLog = new InMemoryRaftLog(); + RaftState state = raftState() + .myself( myself ) + .entryLog( raftLog ) + .build(); + + long leaderTerm = state.term() + leaderTermDifference; + raftLog.append( new RaftLogEntry( leaderTerm - 1, content() ) ); + + RaftMessages.Heartbeat heartbeat = heartbeat() + .from( leader ) + .commitIndex( raftLog.appendIndex()) // The leader is talking about committing stuff we don't know about + .commitIndexTerm( leaderTerm ) // And is in the same term + .leaderTerm( leaderTerm ) + .build(); + + Outcome outcome = role.handler.handle( heartbeat, state, log() ); + + assertThat( outcome.getLogCommands(), empty() ); + + } + + private Log log() + { + return NullLogProvider.getInstance().getLog( getClass() ); + } + + +} diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/LeaderTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/LeaderTest.java index 4bf9504968de..a1f78e065c48 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/LeaderTest.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/LeaderTest.java @@ -87,47 +87,6 @@ public class LeaderTest private static final ReplicatedString CONTENT = ReplicatedString.valueOf( "some-content-to-raft" ); - @Test - public void leaderShouldUpdateTermToCurrentMessageAndBecomeFollower() throws Exception - { - // given - RaftState state = raftState().build(); - - Leader leader = new Leader(); - - // when - Outcome outcome = leader.handle( voteRequest().from( member1 ).term( HIGHEST_TERM ).lastLogIndex( 0 ) - .lastLogTerm( -1 ).build(), state, log() ); - - // then - assertEquals( FOLLOWER, outcome.getNewRole() ); - assertEquals( HIGHEST_TERM, outcome.getTerm() ); - } - - @Test - public void leaderShouldRejectVoteRequestWithNewerTermAndBecomeAFollower() throws Exception - { - // given - RaftState state = raftState().myself( myself ).build(); - - Leader leader = new Leader(); - - // when - RaftMessages.Vote.Request message = voteRequest() - .from( member1 ) - .term( state.term() + 1 ) - .lastLogIndex( 0 ) - .lastLogTerm( -1 ) - .build(); - Outcome outcome = leader.handle( message, state, log() ); - - // then - assertEquals( message, messageFor( outcome, myself ) ); - assertEquals( FOLLOWER, outcome.getNewRole() ); - assertEquals( 0, count( outcome.getLogCommands() ) ); - assertEquals( state.term() + 1, outcome.getTerm() ); - } - @Test public void leaderShouldNotRespondToSuccessResponseFromFollowerThatWillSoonUpToDateViaInFlightMessages() throws Exception @@ -394,29 +353,6 @@ public void leaderShouldRejectAppendEntriesResponseWithNewerTermAndBecomeAFollow // TODO: test that shows we don't commit for previous terms - @Test - public void leaderShouldRejectAnyMessageWithOldTerm() throws Exception - { - // given - RaftState state = raftState().build(); - - Leader leader = new Leader(); - - // when - RaftMessages.Vote.Request message = voteRequest() - .from( member1 ) - .term( state.term() - 1 ) - .lastLogIndex( 0 ) - .lastLogTerm( -1 ) - .build(); - Outcome outcome = leader.handle( message, state, log() ); - - // then - assertFalse( ((RaftMessages.Vote.Response) messageFor( outcome, member1 )) - .voteGranted() ); - assertEquals( LEADER, outcome.getNewRole() ); - } - @Test public void leaderShouldSendHeartbeatsToAllClusterMembersOnReceiptOfHeartbeatTick() throws Exception { diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/NonFollowerVoteRequestTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/NonFollowerVoteRequestTest.java new file mode 100644 index 000000000000..2523c804e7b8 --- /dev/null +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/NonFollowerVoteRequestTest.java @@ -0,0 +1,103 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge.raft.roles; + +import java.util.Collection; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import org.neo4j.coreedge.raft.RaftMessages; +import org.neo4j.coreedge.raft.log.RaftStorageException; +import org.neo4j.coreedge.raft.outcome.Outcome; +import org.neo4j.coreedge.raft.state.RaftState; +import org.neo4j.coreedge.server.RaftTestMember; +import org.neo4j.logging.Log; +import org.neo4j.logging.NullLogProvider; + +import static java.util.Arrays.asList; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +import static org.neo4j.coreedge.raft.MessageUtils.messageFor; +import static org.neo4j.coreedge.raft.TestMessageBuilders.voteRequest; +import static org.neo4j.coreedge.raft.state.RaftStateBuilder.raftState; +import static org.neo4j.coreedge.server.RaftTestMember.member; + +@RunWith(Parameterized.class) +public class NonFollowerVoteRequestTest +{ + @Parameterized.Parameters(name = "{0}") + public static Collection data() { + return asList( Role.CANDIDATE, Role.LEADER ); + } + + @Parameterized.Parameter + public Role role; + + private RaftTestMember myself = member( 0 ); + private RaftTestMember member1 = member( 1 ); + + @Test + public void shouldRejectVoteRequestFromCurrentTerm() throws Exception + { + RaftState state = newState(); + + // when + final long candidateTerm = state.term(); + + Outcome outcome = role.handler.handle( voteRequest().from( member1 ).term( candidateTerm ) + .lastLogIndex( 0 ) + .lastLogTerm( -1 ).build(), state, log() ); + + // then + assertFalse( ((RaftMessages.Vote.Response) messageFor( outcome, member1 )).voteGranted() ); + assertEquals( role, outcome.getNewRole() ); + } + + @Test + public void shouldRejectVoteRequestFromPreviousTerm() throws Exception + { + RaftState state = newState(); + + // when + final long candidateTerm = state.term() - 1; + + Outcome outcome = role.handler.handle( voteRequest().from( member1 ).term( candidateTerm ) + .lastLogIndex( 0 ) + .lastLogTerm( -1 ).build(), state, log() ); + + // then + assertFalse( ((RaftMessages.Vote.Response) messageFor( outcome, member1 )).voteGranted() ); + assertEquals( role, outcome.getNewRole() ); + } + + public RaftState newState() throws RaftStorageException + { + return raftState().myself( myself ).build(); + } + + private Log log() + { + return NullLogProvider.getInstance().getLog( getClass() ); + } +} diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/VoteRequestTest.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/VoteRequestTest.java new file mode 100644 index 000000000000..19589acecbea --- /dev/null +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/roles/VoteRequestTest.java @@ -0,0 +1,184 @@ +/* + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.coreedge.raft.roles; + +import java.util.Collection; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import org.neo4j.coreedge.raft.RaftMessages; +import org.neo4j.coreedge.raft.log.RaftStorageException; +import org.neo4j.coreedge.raft.outcome.Outcome; +import org.neo4j.coreedge.raft.state.RaftState; +import org.neo4j.coreedge.server.RaftTestMember; +import org.neo4j.logging.Log; +import org.neo4j.logging.NullLogProvider; + +import static java.util.Arrays.asList; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import static org.neo4j.coreedge.raft.MessageUtils.messageFor; +import static org.neo4j.coreedge.raft.TestMessageBuilders.voteRequest; +import static org.neo4j.coreedge.raft.state.RaftStateBuilder.raftState; +import static org.neo4j.coreedge.server.RaftTestMember.member; + +/** + * Most behaviour for handling vote requests is identical for all roles. + */ +@RunWith(Parameterized.class) +public class VoteRequestTest +{ + @Parameterized.Parameters(name = "{0}") + public static Collection data() { + return asList( Role.values() ); + } + + @Parameterized.Parameter + public Role role; + + private RaftTestMember myself = member( 0 ); + private RaftTestMember member1 = member( 1 ); + private RaftTestMember member2 = member( 2 ); + + @Test + public void shouldVoteForCandidateInLaterTerm() throws Exception + { + // given + RaftState state = newState(); + + // when + final long candidateTerm = state.term() + 1; + + Outcome outcome = role.handler.handle( voteRequest().from( member1 ).term( candidateTerm ) + .lastLogIndex( 0 ) + .lastLogTerm( -1 ).build(), state, log() ); + + // then + assertTrue( ((RaftMessages.Vote.Response) messageFor( outcome, member1 )).voteGranted() ); + } + + @Test + public void shouldDenyForCandidateInPreviousTerm() throws Exception + { + // given + RaftState state = newState(); + + // when + final long candidateTerm = state.term() - 1; + + Outcome outcome = role.handler.handle( voteRequest().from( member1 ).term( candidateTerm ) + .lastLogIndex( 0 ) + .lastLogTerm( -1 ).build(), state, log() ); + + // then + assertFalse( ((RaftMessages.Vote.Response) messageFor( outcome, member1 )).voteGranted() ); + assertEquals( role, outcome.getNewRole() ); + } + + @Test + public void shouldVoteForOnlyOneCandidatePerTerm() throws Exception + { + // given + RaftState state = newState(); + + // when + final long candidateTerm = state.term() + 1; + + Outcome outcome1 = role.handler.handle( voteRequest().from( member1 ).term( candidateTerm ) + .lastLogIndex( 0 ) + .lastLogTerm( -1 ).build(), state, log() ); + + state.update( outcome1 ); + + Outcome outcome2 = role.handler.handle( voteRequest().from( member2 ).term( candidateTerm ) + .lastLogIndex( 0 ) + .lastLogTerm( -1 ).build(), state, log() ); + + // then + assertFalse( ((RaftMessages.Vote.Response) messageFor( outcome2, member2 )).voteGranted() ); + } + + @Test + public void shouldStayInCurrentRoleOnRequestFromCurrentTerm() throws Exception + { + // given + RaftState state = newState(); + + // when + final long candidateTerm = state.term(); + + Outcome outcome = role.handler.handle( voteRequest().from( member1 ).term( candidateTerm ) + .lastLogIndex( 0 ) + .lastLogTerm( -1 ).build(), state, log() ); + + // then + assertEquals( role, outcome.getNewRole() ); + } + + @Test + public void shouldMoveToFollowerIfRequestIsFromLaterTerm() throws Exception + { + // given + RaftState state = newState(); + + // when + final long candidateTerm = state.term() + 1; + + Outcome outcome = role.handler.handle( voteRequest().from( member1 ).term( candidateTerm ) + .lastLogIndex( 0 ) + .lastLogTerm( -1 ).build(), state, log() ); + + // then + assertEquals( Role.FOLLOWER, outcome.getNewRole() ); + } + + @Test + public void shouldUpdateTermIfRequestIsFromLaterTerm() throws Exception + { + // given + RaftState state = newState(); + + // when + final long candidateTerm = state.term() + 1; + + Outcome outcome = role.handler.handle( voteRequest().from( member1 ).term( candidateTerm ) + .lastLogIndex( 0 ) + .lastLogTerm( -1 ).build(), state, log() ); + + // then + assertEquals( candidateTerm, outcome.getTerm() ); + } + + public RaftState newState() throws RaftStorageException + { + return raftState().myself( myself ).build(); + } + + private Log log() + { + return NullLogProvider.getInstance().getLog( getClass() ); + } + +} diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/explorer/ClusterSafetyViolations.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/explorer/ClusterSafetyViolations.java index be80e4146bff..28da053760b9 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/explorer/ClusterSafetyViolations.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/explorer/ClusterSafetyViolations.java @@ -91,7 +91,7 @@ public static boolean multipleLeadersInSameTerm( ClusterState state ) Set termThatHaveALeader = new HashSet<>(); for ( Map.Entry entry : state.roles.entrySet() ) { - RaftMessageHandler role = entry.getValue().role; + RaftMessageHandler role = entry.getValue().handler; if ( role instanceof Leader ) { long term = state.states.get( entry.getKey() ).term(); diff --git a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/explorer/action/ProcessMessage.java b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/explorer/action/ProcessMessage.java index ebb97480c69e..361761f652ce 100644 --- a/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/explorer/action/ProcessMessage.java +++ b/enterprise/core-edge/src/test/java/org/neo4j/coreedge/raft/state/explorer/action/ProcessMessage.java @@ -53,7 +53,7 @@ public ClusterState advance( ClusterState previous ) throws RaftStorageException } ComparableRaftState memberState = previous.states.get( member ); ComparableRaftState newMemberState = new ComparableRaftState( memberState ); - Outcome outcome = previous.roles.get( member ).role.handle( message, memberState, log ); + Outcome outcome = previous.roles.get( member ).handler.handle( message, memberState, log ); newMemberState.update( outcome ); for ( RaftMessages.Directed outgoingMessage : outcome.getOutgoingMessages() )