diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/CausalClusteringSettings.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/CausalClusteringSettings.java index 7e7565ac964e7..d5ce1c4038a85 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/CausalClusteringSettings.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/CausalClusteringSettings.java @@ -73,6 +73,10 @@ public class CausalClusteringSettings implements LoadableConfig public static final Setting refuse_to_be_leader = setting( "causal_clustering.refuse_to_be_leader", BOOLEAN, FALSE ); + @Description( "Enable pre-voting extension to the Raft protocol (this is breaking and must match between the core cluster members)" ) + public static final Setting enable_pre_voting = + setting( "causal_clustering.enable_pre_voting", BOOLEAN, FALSE ); + @Description( "The maximum batch size when catching up (in unit of entries)" ) public static final Setting catchup_batch_size = setting( "causal_clustering.catchup_batch_size", INTEGER, "64" ); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/ConsensusModule.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/ConsensusModule.java index 648a674d3f704..116340b63b592 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/ConsensusModule.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/ConsensusModule.java @@ -136,9 +136,12 @@ expectedClusterSize, electionTimeout, systemClock(), config.get( join_catch_up_t raftTimeoutService = new DelayedRenewableTimeoutService( systemClock(), logProvider ); + boolean supportsPreVoting = config.get( CausalClusteringSettings.enable_pre_voting ); + raftMachine = new RaftMachine( myself, termState, voteState, raftLog, electionTimeout, heartbeatInterval, raftTimeoutService, outbound, logProvider, raftMembershipManager, logShipping, inFlightCache, - RefuseToBeLeaderStrategy.shouldRefuseToBeLeader( config, logProvider.getLog( getClass() ) ), platformModule.monitors, systemClock() ); + RefuseToBeLeaderStrategy.shouldRefuseToBeLeader( config, logProvider.getLog( getClass() ) ), + supportsPreVoting, platformModule.monitors, systemClock() ); life.add( new RaftCoreTopologyConnector( coreTopologyService, raftMachine ) ); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/MajorityIncludingSelfQuorum.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/MajorityIncludingSelfQuorum.java index 0234e2626b437..428163f957976 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/MajorityIncludingSelfQuorum.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/MajorityIncludingSelfQuorum.java @@ -19,10 +19,17 @@ */ package org.neo4j.causalclustering.core.consensus; +import java.util.Collection; + public class MajorityIncludingSelfQuorum { private static final int MIN_QUORUM = 2; + public static boolean isQuorum( Collection cluster, Collection countNotIncludingMyself ) + { + return isQuorum( cluster.size(), countNotIncludingMyself.size() ); + } + public static boolean isQuorum( int clusterSize, int countNotIncludingSelf ) { return isQuorum( MIN_QUORUM, clusterSize, countNotIncludingSelf ); diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/RaftMachine.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/RaftMachine.java index ba47678ccb1f0..4682e33450f60 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/RaftMachine.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/RaftMachine.java @@ -97,7 +97,8 @@ public RaftMachine( MemberId myself, StateStorage termStorage, StateS RaftLog entryLog, long electionTimeout, long heartbeatInterval, RenewableTimeoutService renewableTimeoutService, Outbound outbound, LogProvider logProvider, RaftMembershipManager membershipManager, RaftLogShippingManager logShipping, - InFlightCache inFlightCache, boolean refuseToBecomeLeader, Monitors monitors, Clock clock ) + InFlightCache inFlightCache, boolean refuseToBecomeLeader, boolean supportPreVoting, Monitors monitors, + Clock clock ) { this.myself = myself; this.electionTimeout = electionTimeout; @@ -115,7 +116,7 @@ public RaftMachine( MemberId myself, StateStorage termStorage, StateS this.inFlightCache = inFlightCache; this.state = new RaftState( myself, termStorage, membershipManager, entryLog, voteStorage, inFlightCache, - logProvider ); + logProvider, supportPreVoting ); leaderNotFoundMonitor = monitors.newMonitor( LeaderNotFoundMonitor.class ); } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/RaftMessages.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/RaftMessages.java index a2401e00bd612..d3c51cf4eead7 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/RaftMessages.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/RaftMessages.java @@ -37,6 +37,26 @@ public interface RaftMessages { + + interface Handler + { + T handle( Vote.Request request ) throws E; + T handle( Vote.Response response ) throws E; + T handle( PreVote.Request request ) throws E; + T handle( PreVote.Response response ) throws E; + T handle( AppendEntries.Request request ) throws E; + T handle( AppendEntries.Response response ) throws E; + T handle( Heartbeat heartbeat ) throws E; + T handle( LogCompactionInfo logCompactionInfo ) throws E; + T handle( HeartbeatResponse heartbeatResponse ) throws E; + T handle( Timeout.Election election ) throws E; + T handle( Timeout.Heartbeat heartbeat ) throws E; + T handle( NewEntry.Request request ) throws E; + T handle( NewEntry.BatchRequest batchRequest ) throws E; + T handle( PruneRequest pruneRequest ) throws E; + } + + // Position is used to identify messages. Changing order will break upgrade paths. enum Type { VOTE_REQUEST, @@ -59,12 +79,16 @@ enum Type NEW_BATCH_REQUEST, PRUNE_REQUEST, + + PRE_VOTE_REQUEST, + PRE_VOTE_RESPONSE, } interface RaftMessage extends Message { MemberId from(); Type type(); + T dispatch( Handler handler ) throws E; } class Directed @@ -139,6 +163,12 @@ public long term() return term; } + @Override + public T dispatch( Handler handler ) throws E + { + return handler.handle( this ); + } + @Override public boolean equals( Object o ) { @@ -202,6 +232,12 @@ public Response( MemberId from, long term, boolean voteGranted ) this.voteGranted = voteGranted; } + @Override + public T dispatch( Handler handler ) throws E + { + return handler.handle( this ); + } + @Override public boolean equals( Object o ) { @@ -246,6 +282,148 @@ public boolean voteGranted() } } + interface PreVote + { + class Request extends BaseRaftMessage + { + private long term; + private MemberId candidate; + private long lastLogIndex; + private long lastLogTerm; + + public Request( MemberId from, long term, MemberId candidate, long lastLogIndex, long lastLogTerm ) + { + super( from, Type.PRE_VOTE_REQUEST ); + this.term = term; + this.candidate = candidate; + this.lastLogIndex = lastLogIndex; + this.lastLogTerm = lastLogTerm; + } + + public long term() + { + return term; + } + + @Override + public T dispatch( Handler handler ) throws E + { + return handler.handle( this ); + } + + @Override + public boolean equals( Object o ) + { + if ( this == o ) + { + return true; + } + if ( o == null || getClass() != o.getClass() ) + { + return false; + } + Request request = (Request) o; + return lastLogIndex == request.lastLogIndex && + lastLogTerm == request.lastLogTerm && + term == request.term && + candidate.equals( request.candidate ); + } + + @Override + public int hashCode() + { + int result = (int) term; + result = 31 * result + candidate.hashCode(); + result = 31 * result + (int) (lastLogIndex ^ (lastLogIndex >>> 32)); + result = 31 * result + (int) (lastLogTerm ^ (lastLogTerm >>> 32)); + return result; + } + + @Override + public String toString() + { + return format( "PreVote.Request from %s {term=%d, candidate=%s, lastAppended=%d, lastLogTerm=%d}", + from, term, candidate, lastLogIndex, lastLogTerm ); + } + + public long lastLogTerm() + { + return lastLogTerm; + } + + public long lastLogIndex() + { + return lastLogIndex; + } + + public MemberId candidate() + { + return candidate; + } + } + + class Response extends BaseRaftMessage + { + private long term; + private boolean voteGranted; + + public Response( MemberId from, long term, boolean voteGranted ) + { + super( from, Type.PRE_VOTE_RESPONSE ); + this.term = term; + this.voteGranted = voteGranted; + } + + @Override + public T dispatch( Handler handler ) throws E + { + return handler.handle( this ); + } + + @Override + public boolean equals( Object o ) + { + if ( this == o ) + { + return true; + } + if ( o == null || getClass() != o.getClass() ) + { + return false; + } + + Response response = (Response) o; + + return term == response.term && voteGranted == response.voteGranted; + + } + + @Override + public int hashCode() + { + int result = (int) term; + result = 31 * result + (voteGranted ? 1 : 0); + return result; + } + + @Override + public String toString() + { + return format( "PreVote.Response from %s {term=%d, voteGranted=%s}", from, term, voteGranted ); + } + + public long term() + { + return term; + } + + public boolean voteGranted() + { + return voteGranted; + } + } + } + interface AppendEntries { class Request extends BaseRaftMessage @@ -295,6 +473,12 @@ public long leaderCommit() return leaderCommit; } + @Override + public T dispatch( Handler handler ) throws E + { + return handler.handle( this ); + } + @Override public boolean equals( Object o ) { @@ -365,6 +549,12 @@ public long appendIndex() return appendIndex; } + @Override + public T dispatch( Handler handler ) throws E + { + return handler.handle( this ); + } + @Override public boolean equals( Object o ) { @@ -431,6 +621,12 @@ public long commitIndexTerm() return commitIndexTerm; } + @Override + public T dispatch( Handler handler ) throws E + { + return handler.handle( this ); + } + @Override public boolean equals( Object o ) { @@ -480,6 +676,12 @@ public HeartbeatResponse( MemberId from ) super( from, HEARTBEAT_RESPONSE ); } + @Override + public T dispatch( Handler handler ) throws E + { + return handler.handle( this ); + } + @Override public String toString() { @@ -509,6 +711,12 @@ public long prevIndex() return prevIndex; } + @Override + public T dispatch( Handler handler ) throws E + { + return handler.handle( this ); + } + @Override public boolean equals( Object o ) { @@ -556,6 +764,12 @@ public Election( MemberId from ) super( from, Type.ELECTION_TIMEOUT ); } + @Override + public T dispatch( Handler handler ) throws E + { + return handler.handle( this ); + } + @Override public String toString() { @@ -570,6 +784,12 @@ public Heartbeat( MemberId from ) super( from, Type.HEARTBEAT_TIMEOUT ); } + @Override + public T dispatch( Handler handler ) throws E + { + return handler.handle( this ); + } + @Override public String toString() { @@ -590,6 +810,12 @@ public Request( MemberId from, ReplicatedContent content ) this.content = content; } + @Override + public T dispatch( Handler handler ) throws E + { + return handler.handle( this ); + } + @Override public String toString() { @@ -640,6 +866,12 @@ public void add( ReplicatedContent content ) list.add( content ); } + @Override + public T dispatch( Handler handler ) throws E + { + return handler.handle( this ); + } + @Override public boolean equals( Object o ) { @@ -734,6 +966,12 @@ public Type type() { return message.type(); } + + @Override + public T dispatch( Handler visitor ) throws E + { + return message.dispatch( visitor ); + } } class PruneRequest extends BaseRaftMessage @@ -751,6 +989,12 @@ public long pruneIndex() return pruneIndex; } + @Override + public T dispatch( Handler handler ) throws E + { + return handler.handle( this ); + } + @Override public boolean equals( Object o ) { diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/outcome/Outcome.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/outcome/Outcome.java index 69c5c4368f261..e0646761ed6fe 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/outcome/Outcome.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/outcome/Outcome.java @@ -60,6 +60,8 @@ public class Outcome implements Message, ConsensusOutcome private MemberId votedFor; private boolean renewElectionTimeout; private boolean needsFreshSnapshot; + private boolean isPreElection; + private Set preVotesForMe; /* Candidate */ private Set votesForMe; @@ -78,10 +80,10 @@ public Outcome( Role currentRole, ReadableRaftState ctx ) } public Outcome( Role nextRole, long term, MemberId leader, long leaderCommit, MemberId votedFor, - Set votesForMe, long lastLogIndexBeforeWeBecameLeader, + Set votesForMe, Set preVotesForMe, long lastLogIndexBeforeWeBecameLeader, FollowerStates followerStates, boolean renewElectionTimeout, Collection logCommands, Collection outgoingMessages, - Collection shipCommands, long commitIndex, Set heartbeatResponses ) + Collection shipCommands, long commitIndex, Set heartbeatResponses, boolean isPreElection ) { this.nextRole = nextRole; this.term = term; @@ -89,6 +91,7 @@ public Outcome( Role nextRole, long term, MemberId leader, long leaderCommit, Me this.leaderCommit = leaderCommit; this.votedFor = votedFor; this.votesForMe = new HashSet<>( votesForMe ); + this.preVotesForMe = new HashSet<>( preVotesForMe ); this.lastLogIndexBeforeWeBecameLeader = lastLogIndexBeforeWeBecameLeader; this.followerStates = followerStates; this.renewElectionTimeout = renewElectionTimeout; @@ -98,6 +101,7 @@ public Outcome( Role nextRole, long term, MemberId leader, long leaderCommit, Me this.outgoingMessages.addAll( outgoingMessages ); this.shipCommands.addAll( shipCommands ); this.commitIndex = commitIndex; + this.isPreElection = isPreElection; } private void defaults( Role currentRole, ReadableRaftState ctx ) @@ -113,6 +117,8 @@ private void defaults( Role currentRole, ReadableRaftState ctx ) renewElectionTimeout = false; needsFreshSnapshot = false; + isPreElection = (currentRole == Role.FOLLOWER) && ctx.isPreElection(); + preVotesForMe = isPreElection ? new HashSet<>( ctx.preVotesForMe() ) : emptySet(); votesForMe = (currentRole == Role.CANDIDATE) ? new HashSet<>( ctx.votesForMe() ) : emptySet(); heartbeatResponses = (currentRole == Role.LEADER) ? new HashSet<>( ctx.heartbeatResponses() ) : emptySet(); @@ -214,6 +220,7 @@ public String toString() ", renewElectionTimeout=" + renewElectionTimeout + ", needsFreshSnapshot=" + needsFreshSnapshot + ", votesForMe=" + votesForMe + + ", preVotesForMe=" + preVotesForMe + ", lastLogIndexBeforeWeBecameLeader=" + lastLogIndexBeforeWeBecameLeader + ", followerStates=" + followerStates + ", shipCommands=" + shipCommands + @@ -318,4 +325,24 @@ public Set getHeartbeatResponses() { return heartbeatResponses; } + + public void setPreElection( boolean isPreElection ) + { + this.isPreElection = isPreElection; + } + + public boolean isPreElection() + { + return isPreElection; + } + + public void addPreVoteForMe( MemberId from ) + { + this.preVotesForMe.add( from ); + } + + public Set getPreVotesForMe() + { + return preVotesForMe; + } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/roles/Appending.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/roles/Appending.java index 90b9519ddd59b..cb416d3d55492 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/roles/Appending.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/roles/Appending.java @@ -50,11 +50,12 @@ static void handleAppendEntriesRequest( ReadableRaftState state, Outcome outcome } outcome.renewElectionTimeout(); + outcome.setPreElection( false ); outcome.setNextTerm( request.leaderTerm() ); outcome.setLeader( request.from() ); outcome.setLeaderCommit( request.leaderCommit() ); - if ( !Follower.logHistoryMatches( state, request.prevLogIndex(), request.prevLogTerm(), log ) ) + if ( !Follower.logHistoryMatches( state, request.prevLogIndex(), request.prevLogTerm() ) ) { assert request.prevLogIndex() > -1 && request.prevLogTerm() > -1; RaftMessages.AppendEntries.Response appendResponse = new RaftMessages.AppendEntries.Response( diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/roles/Candidate.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/roles/Candidate.java index bd6b21fd8429b..0c760b26508a2 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/roles/Candidate.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/roles/Candidate.java @@ -38,124 +38,177 @@ class Candidate implements RaftMessageHandler @Override public Outcome handle( RaftMessages.RaftMessage message, ReadableRaftState ctx, Log log ) throws IOException { - Outcome outcome = new Outcome( CANDIDATE, ctx ); + return message.dispatch( new Handler( ctx, log ) ); + } - switch ( message.type() ) - { - case HEARTBEAT: - { - RaftMessages.Heartbeat req = (RaftMessages.Heartbeat) message; + private static class Handler implements RaftMessages.Handler + { + private final ReadableRaftState ctx; + private final Log log; + private final Outcome outcome; - if ( req.leaderTerm() < ctx.term() ) - { - break; - } + private Handler( ReadableRaftState ctx, Log log ) + { + this.ctx = ctx; + this.log = log; + this.outcome = new Outcome( CANDIDATE, ctx ); + } - outcome.setNextRole( FOLLOWER ); - log.info( "Moving to FOLLOWER state after receiving heartbeat from %s at term %d (I am at %d)", - req.from(), req.leaderTerm(), ctx.term() ); - Heart.beat( ctx, outcome, (RaftMessages.Heartbeat) message, log ); - break; + @Override + public Outcome handle( RaftMessages.Heartbeat req ) throws IOException + { + if ( req.leaderTerm() < ctx.term() ) + { + return outcome; } - case APPEND_ENTRIES_REQUEST: + outcome.setNextRole( FOLLOWER ); + log.info( "Moving to FOLLOWER state after receiving heartbeat from %s at term %d (I am at %d)", + req.from(), req.leaderTerm(), ctx.term() ); + Heart.beat( ctx, outcome, req, log ); + return outcome; + } + + @Override + public Outcome handle( RaftMessages.AppendEntries.Request req ) throws IOException + { + if ( req.leaderTerm() < ctx.term() ) { - RaftMessages.AppendEntries.Request req = (RaftMessages.AppendEntries.Request) message; + RaftMessages.AppendEntries.Response appendResponse = + new RaftMessages.AppendEntries.Response( ctx.myself(), ctx.term(), false, + req.prevLogIndex(), ctx.entryLog().appendIndex() ); - if ( req.leaderTerm() < ctx.term() ) - { - RaftMessages.AppendEntries.Response appendResponse = - new RaftMessages.AppendEntries.Response( ctx.myself(), ctx.term(), false, - req.prevLogIndex(), ctx.entryLog().appendIndex() ); + outcome.addOutgoingMessage( new RaftMessages.Directed( req.from(), appendResponse ) ); + return outcome; + } - outcome.addOutgoingMessage( new RaftMessages.Directed( req.from(), appendResponse ) ); - break; - } + outcome.setNextRole( FOLLOWER ); + log.info( "Moving to FOLLOWER state after receiving append entries request from %s at term %d (I am at %d)n", + req.from(), req.leaderTerm(), ctx.term() ); + Appending.handleAppendEntriesRequest( ctx, outcome, req, log ); + return outcome; + } + @Override + public Outcome handle( RaftMessages.Vote.Response res ) throws IOException + { + if ( res.term() > ctx.term() ) + { + outcome.setNextTerm( res.term() ); outcome.setNextRole( FOLLOWER ); - log.info( "Moving to FOLLOWER state after receiving append entries request from %s at term %d (I am at %d)n", - req.from(), req.leaderTerm(), ctx.term() ); - Appending.handleAppendEntriesRequest( ctx, outcome, req, log ); - break; + log.info( "Moving to FOLLOWER state after receiving vote response from %s at term %d (I am at %d)", + res.from(), res.term(), ctx.term() ); + return outcome; + } + else if ( res.term() < ctx.term() || !res.voteGranted() ) + { + return outcome; } - case VOTE_RESPONSE: + if ( !res.from().equals( ctx.myself() ) ) { - RaftMessages.Vote.Response res = (RaftMessages.Vote.Response) message; - - if ( res.term() > ctx.term() ) - { - outcome.setNextTerm( res.term() ); - outcome.setNextRole( FOLLOWER ); - log.info( "Moving to FOLLOWER state after receiving vote response from %s at term %d (I am at %d)", - res.from(), res.term(), ctx.term() ); - break; - } - else if ( res.term() < ctx.term() || !res.voteGranted() ) - { - break; - } - - if ( !res.from().equals( ctx.myself() ) ) - { - outcome.addVoteForMe( res.from() ); - } - - if ( isQuorum( ctx.votingMembers().size(), outcome.getVotesForMe().size() ) ) - { - outcome.setLeader( ctx.myself() ); - Appending.appendNewEntry( ctx, outcome, new NewLeaderBarrier() ); - Leader.sendHeartbeats( ctx, outcome ); - - outcome.setLastLogIndexBeforeWeBecameLeader( ctx.entryLog().appendIndex() ); - outcome.electedLeader(); - outcome.renewElectionTimeout(); - outcome.setNextRole( LEADER ); - log.info( "Moving to LEADER state at term %d (I am %s), voted for by %s", - ctx.term(), ctx.myself(), outcome.getVotesForMe() ); - } - break; + outcome.addVoteForMe( res.from() ); } - case VOTE_REQUEST: + if ( isQuorum( ctx.votingMembers(), outcome.getVotesForMe() ) ) { - RaftMessages.Vote.Request req = (RaftMessages.Vote.Request) message; - if ( req.term() > ctx.term() ) - { - outcome.getVotesForMe().clear(); - outcome.setNextRole( FOLLOWER ); - log.info( "Moving to FOLLOWER state after receiving vote request from %s at term %d (I am at %d)", - req.from(), req.term(), ctx.term() ); - Voting.handleVoteRequest( ctx, outcome, req, log ); - break; - } - - outcome.addOutgoingMessage( new RaftMessages.Directed( req.from(), - new RaftMessages.Vote.Response( ctx.myself(), outcome.getTerm(), false ) ) ); - break; + outcome.setLeader( ctx.myself() ); + Appending.appendNewEntry( ctx, outcome, new NewLeaderBarrier() ); + Leader.sendHeartbeats( ctx, outcome ); + + outcome.setLastLogIndexBeforeWeBecameLeader( ctx.entryLog().appendIndex() ); + outcome.electedLeader(); + outcome.renewElectionTimeout(); + outcome.setNextRole( LEADER ); + log.info( "Moving to LEADER state at term %d (I am %s), voted for by %s", + ctx.term(), ctx.myself(), outcome.getVotesForMe() ); } + return outcome; + } - case ELECTION_TIMEOUT: + @Override + public Outcome handle( RaftMessages.Vote.Request req ) throws IOException + { + if ( req.term() > ctx.term() ) { - log.info( "Failed to get elected. Got votes from: %s", ctx.votesForMe() ); - if ( !Election.start( ctx, outcome, log ) ) - { - log.info( "Moving to FOLLOWER state after failing to start election" ); - outcome.setNextRole( FOLLOWER ); - } - break; + outcome.getVotesForMe().clear(); + outcome.setNextRole( FOLLOWER ); + log.info( "Moving to FOLLOWER state after receiving vote request from %s at term %d (I am at %d)", + req.from(), req.term(), ctx.term() ); + Voting.handleVoteRequest( ctx, outcome, req, log ); + return outcome; } - case PRUNE_REQUEST: + outcome.addOutgoingMessage( new RaftMessages.Directed( req.from(), + new RaftMessages.Vote.Response( ctx.myself(), outcome.getTerm(), false ) ) ); + return outcome; + } + + @Override + public Outcome handle( RaftMessages.Timeout.Election election ) throws IOException + { + log.info( "Failed to get elected. Got votes from: %s", ctx.votesForMe() ); + if ( !Election.startRealElection( ctx, outcome, log ) ) { - Pruning.handlePruneRequest( outcome, (RaftMessages.PruneRequest) message ); - break; + log.info( "Moving to FOLLOWER state after failing to start election" ); + outcome.setNextRole( FOLLOWER ); } + return outcome; + } - default: - break; + @Override + public Outcome handle( RaftMessages.PreVote.Request request ) throws IOException + { + return outcome; } - return outcome; + @Override + public Outcome handle( RaftMessages.PreVote.Response response ) throws IOException + { + return outcome; + } + + @Override + public Outcome handle( RaftMessages.AppendEntries.Response response ) throws IOException + { + return outcome; + } + + @Override + public Outcome handle( RaftMessages.LogCompactionInfo logCompactionInfo ) throws IOException + { + return outcome; + } + + @Override + public Outcome handle( RaftMessages.HeartbeatResponse heartbeatResponse ) throws IOException + { + return outcome; + } + + @Override + public Outcome handle( RaftMessages.Timeout.Heartbeat heartbeat ) throws IOException + { + return outcome; + } + + @Override + public Outcome handle( RaftMessages.NewEntry.Request request ) throws IOException + { + return outcome; + } + + @Override + public Outcome handle( RaftMessages.NewEntry.BatchRequest batchRequest ) throws IOException + { + return outcome; + } + + @Override + public Outcome handle( RaftMessages.PruneRequest pruneRequest ) throws IOException + { + Pruning.handlePruneRequest( outcome, pruneRequest ); + return outcome; + } } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/roles/Election.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/roles/Election.java index 589185aad28a1..3257f6fd27fe3 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/roles/Election.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/roles/Election.java @@ -30,7 +30,7 @@ public class Election { - public static boolean start( ReadableRaftState ctx, Outcome outcome, Log log ) throws IOException + public static boolean startRealElection( ReadableRaftState ctx, Outcome outcome, Log log ) throws IOException { Set currentMembers = ctx.votingMembers(); if ( currentMembers == null || !currentMembers.contains( ctx.myself() ) ) @@ -54,4 +54,26 @@ public static boolean start( ReadableRaftState ctx, Outcome outcome, Log log ) t log.info( "Election started with vote request: %s and members: %s", voteForMe, currentMembers ); return true; } + + public static boolean startPreElection( ReadableRaftState ctx, Outcome outcome, Log log ) throws IOException + { + Set currentMembers = ctx.votingMembers(); + if ( currentMembers == null || !currentMembers.contains( ctx.myself() ) ) + { + log.info( "Pre-election attempted but not started, current members are %s, I am %s", + currentMembers, ctx.myself() ); + return false; + } + + RaftMessages.PreVote.Request preVoteForMe = + new RaftMessages.PreVote.Request( ctx.myself(), outcome.getTerm(), ctx.myself(), ctx.entryLog() + .appendIndex(), ctx.entryLog().readEntryTerm( ctx.entryLog().appendIndex() ) ); + + currentMembers.stream().filter( member -> !member.equals( ctx.myself() ) ).forEach( member -> + outcome.addOutgoingMessage( new RaftMessages.Directed( member, preVoteForMe ) ) + ); + + log.info( "Pre-election started with: %s and members: %s", preVoteForMe, currentMembers ); + return true; + } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/roles/Follower.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/roles/Follower.java index e077348eee4d5..fb696ce35ab4f 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/roles/Follower.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/roles/Follower.java @@ -30,12 +30,13 @@ import org.neo4j.logging.Log; import static java.lang.Long.min; +import static org.neo4j.causalclustering.core.consensus.MajorityIncludingSelfQuorum.isQuorum; import static org.neo4j.causalclustering.core.consensus.roles.Role.CANDIDATE; import static org.neo4j.causalclustering.core.consensus.roles.Role.FOLLOWER; class Follower implements RaftMessageHandler { - static boolean logHistoryMatches( ReadableRaftState ctx, long leaderSegmentPrevIndex, long leaderSegmentPrevTerm, Log log ) + static boolean logHistoryMatches( ReadableRaftState ctx, long leaderSegmentPrevIndex, long leaderSegmentPrevTerm ) throws IOException { // NOTE: A prevLogIndex before or at our log's prevIndex means that we @@ -47,11 +48,8 @@ static boolean logHistoryMatches( ReadableRaftState ctx, long leaderSegmentPrevI long localLogPrevIndex = ctx.entryLog().prevIndex(); long localSegmentPrevTerm = ctx.entryLog().readEntryTerm( leaderSegmentPrevIndex ); - boolean logHistoryMatches = - leaderSegmentPrevIndex > -1 && - (leaderSegmentPrevIndex <= localLogPrevIndex || localSegmentPrevTerm == leaderSegmentPrevTerm); - - return logHistoryMatches; + return leaderSegmentPrevIndex > -1 && + (leaderSegmentPrevIndex <= localLogPrevIndex || localSegmentPrevTerm == leaderSegmentPrevTerm); } static void commitToLogOnUpdate( ReadableRaftState ctx, long indexOfLastNewEntry, long leaderCommit, Outcome outcome ) @@ -80,62 +78,238 @@ private static void handleLeaderLogCompaction( ReadableRaftState ctx, Outcome ou @Override public Outcome handle( RaftMessages.RaftMessage message, ReadableRaftState ctx, Log log ) throws IOException { - Outcome outcome = new Outcome( FOLLOWER, ctx ); + return message.dispatch( visitor( ctx, log ) ); + } + + private abstract static class Handler implements RaftMessages.Handler + { + protected final ReadableRaftState ctx; + protected final Log log; + protected final Outcome outcome; + + Handler( ReadableRaftState ctx, Log log ) + { + this.ctx = ctx; + this.log = log; + this.outcome = new Outcome( FOLLOWER, ctx ); + } + + @Override + public Outcome handle( Heartbeat heartbeat ) throws IOException + { + Heart.beat( ctx, outcome, heartbeat, log ); + return outcome; + } + + @Override + public Outcome handle( AppendEntries.Request request ) throws IOException + { + Appending.handleAppendEntriesRequest( ctx, outcome, request, log ); + return outcome; + } + + @Override + public Outcome handle( RaftMessages.Vote.Request request ) throws IOException + { + Voting.handleVoteRequest( ctx, outcome, request, log ); + return outcome; + } + + @Override + public Outcome handle( RaftMessages.LogCompactionInfo logCompactionInfo ) throws IOException + { + handleLeaderLogCompaction( ctx, outcome, logCompactionInfo ); + return outcome; + } + + @Override + public Outcome handle( RaftMessages.Vote.Response response ) throws IOException + { + log.info( "Late vote response: %s", response ); + return outcome; + } - switch ( message.type() ) + @Override + public Outcome handle( RaftMessages.PruneRequest pruneRequest ) throws IOException { - case HEARTBEAT: + Pruning.handlePruneRequest( outcome, pruneRequest ); + return outcome; + } + + @Override + public Outcome handle( AppendEntries.Response response ) throws IOException + { + return outcome; + } + + @Override + public Outcome handle( RaftMessages.HeartbeatResponse heartbeatResponse ) throws IOException + { + return outcome; + } + + @Override + public Outcome handle( RaftMessages.Timeout.Heartbeat heartbeat ) throws IOException + { + return outcome; + } + + @Override + public Outcome handle( RaftMessages.NewEntry.Request request ) throws IOException + { + return outcome; + } + + @Override + public Outcome handle( RaftMessages.NewEntry.BatchRequest batchRequest ) throws IOException + { + return outcome; + } + } + + abstract class PreVoteSupportedHandler extends Handler + { + + PreVoteSupportedHandler( ReadableRaftState ctx, Log log ) + { + super( ctx, log ); + } + + @Override + public Outcome handle( RaftMessages.Timeout.Election election ) throws IOException + { + log.info( "Election timeout triggered" ); + if ( Election.startPreElection( ctx, outcome, log ) ) { - Heart.beat( ctx, outcome, (Heartbeat) message, log ); - break; + outcome.setPreElection( true ); } + return outcome; + } + } + + class PreVoteActiveHandler extends PreVoteSupportedHandler + { + + PreVoteActiveHandler( ReadableRaftState ctx, Log log ) + { + super( ctx, log ); + } - case APPEND_ENTRIES_REQUEST: + @Override + public Outcome handle( RaftMessages.PreVote.Request request ) throws IOException + { + Voting.handlePreVoteRequest( ctx, outcome, request, log ); + return outcome; + } + + @Override + public Outcome handle( RaftMessages.PreVote.Response res ) throws IOException + { + if ( res.term() > ctx.term() ) { - Appending.handleAppendEntriesRequest( ctx, outcome, (AppendEntries.Request) message, log ); - break; + outcome.setNextTerm( res.term() ); + log.info( "Aborting pre-election after receiving pre-vote response from %s at term %d (I am at %d)", + res.from(), res.term(), ctx.term() ); + return outcome; } - - case VOTE_REQUEST: + else if ( res.term() < ctx.term() || !res.voteGranted() ) { - Voting.handleVoteRequest( ctx, outcome, (RaftMessages.Vote.Request) message, log ); - break; + return outcome; } - case LOG_COMPACTION_INFO: + if ( !res.from().equals( ctx.myself() ) ) { - handleLeaderLogCompaction( ctx, outcome, (RaftMessages.LogCompactionInfo) message ); - break; + outcome.addPreVoteForMe( res.from() ); } - case ELECTION_TIMEOUT: + if ( isQuorum( ctx.votingMembers(), outcome.getPreVotesForMe() ) ) { - log.info( "Election timeout triggered" ); - if ( Election.start( ctx, outcome, log ) ) + outcome.renewElectionTimeout(); + outcome.setPreElection( false ); + if ( Election.startRealElection( ctx, outcome, log ) ) { outcome.setNextRole( CANDIDATE ); - log.info( "Moving to CANDIDATE state after successfully starting election" ); + log.info( "Moving to CANDIDATE state after successful pre-election stage" ); } - break; } + return outcome; + } + } - case VOTE_RESPONSE: - { - RaftMessages.Vote.Response voteResponse = (RaftMessages.Vote.Response) message; - log.info( "Late vote response: %s", voteResponse ); - break; - } + class PreVoteInactiveHandler extends PreVoteSupportedHandler + { + + PreVoteInactiveHandler( ReadableRaftState ctx, Log log ) + { + super( ctx, log ); + } + + @Override + public Outcome handle( RaftMessages.PreVote.Response response ) throws IOException + { + return outcome; + } - case PRUNE_REQUEST: + @Override + public Outcome handle( RaftMessages.PreVote.Request request ) throws IOException + { + outcome.addOutgoingMessage( new RaftMessages.Directed( + request.from(), + new RaftMessages.PreVote.Response( ctx.myself(), outcome.getTerm(), false ) + ) ); + return outcome; + } + } + + class PreVoteUnsupportedHandler extends Handler + { + + PreVoteUnsupportedHandler( ReadableRaftState ctx, Log log ) + { + super( ctx, log ); + } + + @Override + public Outcome handle( RaftMessages.Timeout.Election election ) throws IOException + { + log.info( "Election timeout triggered" ); + if ( Election.startRealElection( ctx, outcome, log ) ) { - Pruning.handlePruneRequest( outcome, (RaftMessages.PruneRequest) message ); - break; + outcome.setNextRole( CANDIDATE ); + log.info( "Moving to CANDIDATE state after successfully starting election" ); } + return outcome; + } - default: - break; + @Override + public Outcome handle( RaftMessages.PreVote.Response response ) throws IOException + { + return outcome; } - return outcome; + @Override + public Outcome handle( RaftMessages.PreVote.Request request ) throws IOException + { + return outcome; + } + } + + private Handler visitor( ReadableRaftState ctx, Log log ) + { + if ( ctx.supportPreVoting() ) + { + if ( ctx.isPreElection() ) + { + return new PreVoteActiveHandler( ctx, log ); + } + else + { + return new PreVoteInactiveHandler( ctx, log ); + } + } + else + { + return new PreVoteUnsupportedHandler( ctx, log ); + } } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/roles/Heart.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/roles/Heart.java index 34ee0435b0cde..b861273972ee9 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/roles/Heart.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/roles/Heart.java @@ -37,13 +37,14 @@ static void beat( ReadableRaftState state, Outcome outcome, RaftMessages.Heartbe } outcome.renewElectionTimeout(); + outcome.setPreElection( false ); outcome.setNextTerm( request.leaderTerm() ); outcome.setLeader( request.from() ); outcome.setLeaderCommit( request.commitIndex() ); outcome.addOutgoingMessage( new RaftMessages.Directed( request.from(), new RaftMessages.HeartbeatResponse( state.myself() ) ) ); - if ( !Follower.logHistoryMatches( state, request.commitIndex(), request.commitIndexTerm(), log ) ) + if ( !Follower.logHistoryMatches( state, request.commitIndex(), request.commitIndexTerm() ) ) { return; } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/roles/Leader.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/roles/Leader.java index f708ac37ff7b1..77c832405ab1c 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/roles/Leader.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/roles/Leader.java @@ -63,39 +63,53 @@ static void sendHeartbeats( ReadableRaftState ctx, Outcome outcome ) throws IOEx @Override public Outcome handle( RaftMessages.RaftMessage message, ReadableRaftState ctx, Log log ) throws IOException { - Outcome outcome = new Outcome( LEADER, ctx ); + return message.dispatch( new Handler( ctx, log ) ); + } - switch ( message.type() ) - { - case HEARTBEAT: + private static class Handler implements RaftMessages.Handler + { + private final ReadableRaftState ctx; + private final Log log; + private final Outcome outcome; + + Handler( ReadableRaftState ctx, Log log ) { - Heartbeat req = (Heartbeat) message; + this.ctx = ctx; + this.log = log; + this.outcome = new Outcome( LEADER, ctx ); + } - if ( req.leaderTerm() < ctx.term() ) + @Override + public Outcome handle( Heartbeat heartbeat ) throws IOException + { + if ( heartbeat.leaderTerm() < ctx.term() ) { - break; + return outcome; } stepDownToFollower( outcome ); log.info( "Moving to FOLLOWER state after receiving heartbeat at term %d (my term is " + "%d) from %s", - req.leaderTerm(), ctx.term(), req.from() ); - Heart.beat( ctx, outcome, (Heartbeat) message, log ); - break; + heartbeat.leaderTerm(), ctx.term(), heartbeat.from() ); + Heart.beat( ctx, outcome, heartbeat, log ); + return outcome; } - case HEARTBEAT_TIMEOUT: + @Override + public Outcome handle( RaftMessages.Timeout.Heartbeat heartbeat ) throws IOException { sendHeartbeats( ctx, outcome ); - break; + return outcome; } - case HEARTBEAT_RESPONSE: + @Override + public Outcome handle( RaftMessages.HeartbeatResponse heartbeatResponse ) throws IOException { - outcome.addHeartbeatResponse( message.from() ); - break; + outcome.addHeartbeatResponse( heartbeatResponse.from() ); + return outcome; } - case ELECTION_TIMEOUT: + @Override + public Outcome handle( RaftMessages.Timeout.Election election ) throws IOException { if ( !isQuorum( ctx.votingMembers().size(), ctx.heartbeatResponses().size() ) ) { @@ -105,13 +119,13 @@ public Outcome handle( RaftMessages.RaftMessage message, ReadableRaftState ctx, } outcome.getHeartbeatResponses().clear(); - break; + return outcome; + } - case APPEND_ENTRIES_REQUEST: + @Override + public Outcome handle( RaftMessages.AppendEntries.Request req ) throws IOException { - RaftMessages.AppendEntries.Request req = (RaftMessages.AppendEntries.Request) message; - if ( req.leaderTerm() < ctx.term() ) { RaftMessages.AppendEntries.Response appendResponse = @@ -119,7 +133,7 @@ public Outcome handle( RaftMessages.RaftMessage message, ReadableRaftState ctx, ctx.entryLog().appendIndex() ); outcome.addOutgoingMessage( new RaftMessages.Directed( req.from(), appendResponse ) ); - break; + return outcome; } else if ( req.leaderTerm() == ctx.term() ) { @@ -132,18 +146,18 @@ else if ( req.leaderTerm() == ctx.term() ) log.info( "Moving to FOLLOWER state after receiving append request at term %d (my term is " + "%d) from %s", req.leaderTerm(), ctx.term(), req.from() ); Appending.handleAppendEntriesRequest( ctx, outcome, req, log ); - break; + return outcome; } + } - case APPEND_ENTRIES_RESPONSE: + @Override + public Outcome handle( RaftMessages.AppendEntries.Response response ) throws IOException { - RaftMessages.AppendEntries.Response response = (RaftMessages.AppendEntries.Response) message; - if ( response.term() < ctx.term() ) { /* Ignore responses from old terms! */ - break; + return outcome; } else if ( response.term() > ctx.term() ) { @@ -152,7 +166,7 @@ else if ( response.term() > ctx.term() ) log.info( "Moving to FOLLOWER state after receiving append response at term %d (my term is " + "%d) from %s", response.term(), ctx.term(), response.from() ); outcome.replaceFollowerStates( new FollowerStates<>() ); - break; + return outcome; } FollowerState follower = ctx.followerStates().get( response.from() ); @@ -213,13 +227,12 @@ else if ( response.term() > ctx.term() ) outcome.addOutgoingMessage( directedCompactionInfo ); } } - break; + return outcome; } - case VOTE_REQUEST: + @Override + public Outcome handle( RaftMessages.Vote.Request req ) throws IOException { - RaftMessages.Vote.Request req = (RaftMessages.Vote.Request) message; - if ( req.term() > ctx.term() ) { stepDownToFollower( outcome ); @@ -228,47 +241,80 @@ else if ( response.term() > ctx.term() ) req.term(), ctx.term(), req.from() ); Voting.handleVoteRequest( ctx, outcome, req, log ); - break; + return outcome; } outcome.addOutgoingMessage( new RaftMessages.Directed( req.from(), new RaftMessages.Vote.Response( ctx.myself(), ctx.term(), false ) ) ); - break; + return outcome; } - case NEW_ENTRY_REQUEST: + @Override + public Outcome handle( RaftMessages.NewEntry.Request req ) throws IOException { - RaftMessages.NewEntry.Request req = (RaftMessages.NewEntry.Request) message; ReplicatedContent content = req.content(); Appending.appendNewEntry( ctx, outcome, content ); - break; + return outcome; + } - case NEW_BATCH_REQUEST: + @Override + public Outcome handle( RaftMessages.NewEntry.BatchRequest req ) throws IOException { - RaftMessages.NewEntry.BatchRequest req = (RaftMessages.NewEntry.BatchRequest) message; List contents = req.contents(); Appending.appendNewEntries( ctx, outcome, contents ); - break; + return outcome; } - case PRUNE_REQUEST: + @Override + public Outcome handle( RaftMessages.PruneRequest pruneRequest ) throws IOException { - Pruning.handlePruneRequest( outcome, (RaftMessages.PruneRequest) message ); - break; + Pruning.handlePruneRequest( outcome, pruneRequest ); + return outcome; } - default: - break; + @Override + public Outcome handle( RaftMessages.Vote.Response response ) throws IOException + { + return outcome; } - return outcome; - } + @Override + public Outcome handle( RaftMessages.PreVote.Request request ) throws IOException + { + if ( request.term() > ctx.term() ) + { + log.info( + "Considering pre vote request at term %d (my term is " + "%d) from %s", + request.term(), ctx.term(), request.from() ); - private void stepDownToFollower( Outcome outcome ) - { - outcome.steppingDown(); - outcome.setNextRole( FOLLOWER ); - outcome.setLeader( null ); + Voting.handlePreVoteRequest( ctx, outcome, request, log ); + } + else + { + outcome.addOutgoingMessage( new RaftMessages.Directed( request.from(), + new RaftMessages.PreVote.Response( ctx.myself(), ctx.term(), false ) ) ); + } + return outcome; + } + + @Override + public Outcome handle( RaftMessages.PreVote.Response response ) throws IOException + { + return outcome; + } + + @Override + public Outcome handle( LogCompactionInfo logCompactionInfo ) throws IOException + { + return outcome; + } + + private void stepDownToFollower( Outcome outcome ) + { + outcome.steppingDown(); + outcome.setNextRole( FOLLOWER ); + outcome.setLeader( null ); + } } } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/roles/Voting.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/roles/Voting.java index 7a9379b229b1c..ed0799f6d7bf8 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/roles/Voting.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/roles/Voting.java @@ -20,6 +20,7 @@ package org.neo4j.causalclustering.core.consensus.roles; import java.io.IOException; +import java.util.Optional; import org.neo4j.causalclustering.core.consensus.RaftMessages; import org.neo4j.causalclustering.core.consensus.outcome.Outcome; @@ -41,7 +42,7 @@ static void handleVoteRequest( ReadableRaftState state, Outcome outcome, boolean willVoteForCandidate = shouldVoteFor( voteRequest.candidate(), outcome.getTerm(), voteRequest.term(), state.entryLog().readEntryTerm( state.entryLog().appendIndex() ), voteRequest.lastLogTerm(), state.entryLog().appendIndex(), voteRequest.lastLogIndex(), - outcome.getVotedFor(), log ); + Optional.ofNullable( outcome.getVotedFor() ), log ); if ( willVoteForCandidate ) { @@ -54,10 +55,28 @@ static void handleVoteRequest( ReadableRaftState state, Outcome outcome, willVoteForCandidate ) ) ); } + static void handlePreVoteRequest( ReadableRaftState state, Outcome outcome, + RaftMessages.PreVote.Request voteRequest, Log log ) throws IOException + { + if ( voteRequest.term() > state.term() ) + { + outcome.setNextTerm( voteRequest.term() ); + } + + boolean willVoteForCandidate = shouldVoteFor( voteRequest.candidate(), outcome.getTerm(), voteRequest.term(), + state.entryLog().readEntryTerm( state.entryLog().appendIndex() ), voteRequest.lastLogTerm(), + state.entryLog().appendIndex(), voteRequest.lastLogIndex(), + Optional.empty(), log ); + + outcome.addOutgoingMessage( new RaftMessages.Directed( voteRequest.from(), new RaftMessages.PreVote.Response( + state.myself(), outcome.getTerm(), + willVoteForCandidate ) ) ); + } + public static boolean shouldVoteFor( MemberId candidate, long contextTerm, long requestTerm, long contextLastLogTerm, long requestLastLogTerm, long contextLastAppended, long requestLastLogIndex, - MemberId votedFor, Log log ) + Optional votedFor, Log log ) { if ( requestTerm < contextTerm ) { @@ -68,11 +87,12 @@ public static boolean shouldVoteFor( MemberId candidate, long contextTerm, long boolean requestLogEndsAtHigherTerm = requestLastLogTerm > contextLastLogTerm; boolean logsEndAtSameTerm = requestLastLogTerm == contextLastLogTerm; boolean requestLogAtLeastAsLongAsMyLog = requestLastLogIndex >= contextLastAppended; + boolean requesterLogUpToDate = requestLogEndsAtHigherTerm || (logsEndAtSameTerm && requestLogAtLeastAsLongAsMyLog); boolean votedForOtherInSameTerm = requestTerm == contextTerm && - votedFor != null && !votedFor.equals( candidate ); + votedFor.map( member -> !member.equals( candidate ) ).orElse( false ); boolean shouldVoteFor = requesterLogUpToDate && !votedForOtherInSameTerm; diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/state/RaftState.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/state/RaftState.java index 2475393b54459..c3806e8749211 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/state/RaftState.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/state/RaftState.java @@ -46,24 +46,27 @@ public class RaftState implements ReadableRaftState private final Log log; private final RaftLog entryLog; private final InFlightCache inFlightCache; + private final boolean supportPreVoting; private TermState termState; private VoteState voteState; private MemberId leader; private Set votesForMe = new HashSet<>(); + private Set preVotesForMe = new HashSet<>(); private Set heartbeatResponses = new HashSet<>(); private FollowerStates followerStates = new FollowerStates<>(); private long leaderCommit = -1; private long commitIndex = -1; private long lastLogIndexBeforeWeBecameLeader = -1; + private boolean isPreElection; public RaftState( MemberId myself, StateStorage termStorage, RaftMembership membership, RaftLog entryLog, StateStorage voteStorage, - InFlightCache inFlightCache, LogProvider logProvider ) + InFlightCache inFlightCache, LogProvider logProvider, boolean supportPreVoting ) { this.myself = myself; this.termStorage = termStorage; @@ -71,7 +74,11 @@ public RaftState( MemberId myself, this.membership = membership; this.entryLog = entryLog; this.inFlightCache = inFlightCache; - log = logProvider.getLog( getClass() ); + this.supportPreVoting = supportPreVoting; + this.log = logProvider.getLog( getClass() ); + + // Initial state + this.isPreElection = supportPreVoting; } @Override @@ -170,6 +177,24 @@ public long commitIndex() return commitIndex; } + @Override + public boolean supportPreVoting() + { + return supportPreVoting; + } + + @Override + public boolean isPreElection() + { + return isPreElection; + } + + @Override + public Set preVotesForMe() + { + return preVotesForMe; + } + public void update( Outcome outcome ) throws IOException { if ( termState().update( outcome.getTerm() ) ) @@ -186,9 +211,11 @@ public void update( Outcome outcome ) throws IOException leaderCommit = outcome.getLeaderCommit(); votesForMe = outcome.getVotesForMe(); + preVotesForMe = outcome.getPreVotesForMe(); heartbeatResponses = outcome.getHeartbeatResponses(); lastLogIndexBeforeWeBecameLeader = outcome.getLastLogIndexBeforeWeBecameLeader(); followerStates = outcome.getFollowerStates(); + isPreElection = outcome.isPreElection(); for ( RaftLogCommand logCommand : outcome.getLogCommands() ) { diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/state/ReadableRaftState.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/state/ReadableRaftState.java index dc57c3cce8e77..bc75d50b70feb 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/state/ReadableRaftState.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/core/consensus/state/ReadableRaftState.java @@ -52,4 +52,10 @@ public interface ReadableRaftState ReadableRaftLog entryLog(); long commitIndex(); + + boolean supportPreVoting(); + + boolean isPreElection(); + + Set preVotesForMe(); } diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/RaftMessageDecoder.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/RaftMessageDecoder.java index d6ad84d391da7..e5b7a3f3d7e7a 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/RaftMessageDecoder.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/RaftMessageDecoder.java @@ -41,6 +41,8 @@ import static org.neo4j.causalclustering.core.consensus.RaftMessages.Type.HEARTBEAT_RESPONSE; import static org.neo4j.causalclustering.core.consensus.RaftMessages.Type.LOG_COMPACTION_INFO; import static org.neo4j.causalclustering.core.consensus.RaftMessages.Type.NEW_ENTRY_REQUEST; +import static org.neo4j.causalclustering.core.consensus.RaftMessages.Type.PRE_VOTE_REQUEST; +import static org.neo4j.causalclustering.core.consensus.RaftMessages.Type.PRE_VOTE_RESPONSE; import static org.neo4j.causalclustering.core.consensus.RaftMessages.Type.VOTE_REQUEST; import static org.neo4j.causalclustering.core.consensus.RaftMessages.Type.VOTE_RESPONSE; @@ -83,6 +85,23 @@ else if ( messageType.equals( VOTE_RESPONSE ) ) result = new RaftMessages.Vote.Response( from, term, voteGranted ); } + else if ( messageType.equals( PRE_VOTE_REQUEST ) ) + { + MemberId candidate = retrieveMember( channel ); + + long term = channel.getLong(); + long lastLogIndex = channel.getLong(); + long lastLogTerm = channel.getLong(); + + result = new RaftMessages.PreVote.Request( from, term, candidate, lastLogIndex, lastLogTerm ); + } + else if ( messageType.equals( PRE_VOTE_RESPONSE ) ) + { + long term = channel.getLong(); + boolean voteGranted = channel.get() == 1; + + result = new RaftMessages.PreVote.Response( from, term, voteGranted ); + } else if ( messageType.equals( APPEND_ENTRIES_REQUEST ) ) { // how many diff --git a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/RaftMessageEncoder.java b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/RaftMessageEncoder.java index 68e59cc4a5744..803ec78ebc827 100644 --- a/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/RaftMessageEncoder.java +++ b/enterprise/causal-clustering/src/main/java/org/neo4j/causalclustering/messaging/marshalling/RaftMessageEncoder.java @@ -53,25 +53,65 @@ protected synchronized void encode( ChannelHandlerContext ctx, channel.putInt( message.type().ordinal() ); memberMarshal.marshal( message.from(), channel ); - if ( message instanceof RaftMessages.Vote.Request ) + message.dispatch( new Handler( marshal, memberMarshal, channel ) ); + } + + private static class Handler implements RaftMessages.Handler + { + private final ChannelMarshal marshal; + private final MemberId.Marshal memberMarshal; + private final NetworkFlushableByteBuf channel; + + Handler( ChannelMarshal marshal, MemberId.Marshal memberMarshal, NetworkFlushableByteBuf channel ) + { + this.marshal = marshal; + this.memberMarshal = memberMarshal; + this.channel = channel; + } + + @Override + public Void handle( RaftMessages.Vote.Request voteRequest ) throws Exception { - RaftMessages.Vote.Request voteRequest = (RaftMessages.Vote.Request) message; memberMarshal.marshal( voteRequest.candidate(), channel ); channel.putLong( voteRequest.term() ); channel.putLong( voteRequest.lastLogIndex() ); channel.putLong( voteRequest.lastLogTerm() ); + + return (Void)null; } - else if ( message instanceof RaftMessages.Vote.Response ) + + @Override + public Void handle( RaftMessages.Vote.Response voteResponse ) throws Exception { - RaftMessages.Vote.Response voteResponse = (RaftMessages.Vote.Response) message; channel.putLong( voteResponse.term() ); channel.put( (byte) (voteResponse.voteGranted() ? 1 : 0) ); + + return (Void)null; } - else if ( message instanceof RaftMessages.AppendEntries.Request ) + + @Override + public Void handle( RaftMessages.PreVote.Request preVoteRequest ) throws Exception + { + memberMarshal.marshal( preVoteRequest.candidate(), channel ); + channel.putLong( preVoteRequest.term() ); + channel.putLong( preVoteRequest.lastLogIndex() ); + channel.putLong( preVoteRequest.lastLogTerm() ); + + return (Void)null; + } + + @Override + public Void handle( RaftMessages.PreVote.Response preVoteResponse ) throws Exception { - RaftMessages.AppendEntries.Request appendRequest = (RaftMessages.AppendEntries - .Request) message; + channel.putLong( preVoteResponse.term() ); + channel.put( (byte) (preVoteResponse.voteGranted() ? 1 : 0) ); + return (Void)null; + } + + @Override + public Void handle( RaftMessages.AppendEntries.Request appendRequest ) throws Exception + { channel.putLong( appendRequest.leaderTerm() ); channel.putLong( appendRequest.prevLogIndex() ); channel.putLong( appendRequest.prevLogTerm() ); @@ -84,43 +124,80 @@ else if ( message instanceof RaftMessages.AppendEntries.Request ) channel.putLong( raftLogEntry.term() ); marshal.marshal( raftLogEntry.content(), channel ); } + + return (Void)null; } - else if ( message instanceof RaftMessages.AppendEntries.Response ) - { - RaftMessages.AppendEntries.Response appendResponse = - (RaftMessages.AppendEntries.Response) message; + @Override + public Void handle( RaftMessages.AppendEntries.Response appendResponse ) throws Exception + { channel.putLong( appendResponse.term() ); channel.put( (byte) (appendResponse.success() ? 1 : 0) ); channel.putLong( appendResponse.matchIndex() ); channel.putLong( appendResponse.appendIndex() ); + + return (Void)null; } - else if ( message instanceof RaftMessages.NewEntry.Request ) + + @Override + public Void handle( RaftMessages.NewEntry.Request newEntryRequest ) throws Exception { - RaftMessages.NewEntry.Request newEntryRequest = (RaftMessages.NewEntry - .Request) message; marshal.marshal( newEntryRequest.content(), channel ); + + return (Void)null; } - else if ( message instanceof RaftMessages.Heartbeat ) + + @Override + public Void handle( RaftMessages.Heartbeat heartbeat ) throws Exception { - RaftMessages.Heartbeat heartbeat = (RaftMessages.Heartbeat) message; channel.putLong( heartbeat.leaderTerm() ); channel.putLong( heartbeat.commitIndexTerm() ); channel.putLong( heartbeat.commitIndex() ); + + return (Void)null; } - else if ( message instanceof RaftMessages.HeartbeatResponse ) + + @Override + public Void handle( RaftMessages.HeartbeatResponse heartbeatResponse ) throws Exception { - //Heartbeat Response does not have any data attached to it. + // Heartbeat Response does not have any data attached to it. + return (Void)null; } - else if ( message instanceof RaftMessages.LogCompactionInfo ) + + @Override + public Void handle( RaftMessages.LogCompactionInfo logCompactionInfo ) throws Exception { - RaftMessages.LogCompactionInfo logCompactionInfo = (RaftMessages.LogCompactionInfo) message; channel.putLong( logCompactionInfo.leaderTerm() ); channel.putLong( logCompactionInfo.prevIndex() ); + return (Void)null; } - else + + @Override + public Void handle( RaftMessages.Timeout.Election election ) throws Exception + { + // Not network + return (Void)null; + } + + @Override + public Void handle( RaftMessages.Timeout.Heartbeat heartbeat ) throws Exception + { + // Not network + return (Void)null; + } + + @Override + public Void handle( RaftMessages.NewEntry.BatchRequest batchRequest ) throws Exception + { + // Not network + return (Void)null; + } + + @Override + public Void handle( RaftMessages.PruneRequest pruneRequest ) throws Exception { - throw new IllegalArgumentException( "Unknown message type: " + message ); + // Not network + return (Void)null; } } } diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/RaftMachineBuilder.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/RaftMachineBuilder.java index eab6cadf37a1a..603ea38ed7f9d 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/RaftMachineBuilder.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/RaftMachineBuilder.java @@ -99,7 +99,7 @@ public RaftMachine build() retryTimeMillis, catchupBatchSize, maxAllowedShippingLag, inFlightCache ); RaftMachine raft = new RaftMachine( member, termState, voteState, raftLog, electionTimeout, heartbeatInterval, renewableTimeoutService, outbound, logProvider, - membershipManager, logShipping, inFlightCache, false, monitors, clock ); + membershipManager, logShipping, inFlightCache, false, false, monitors, clock ); inbound.registerHandler( ( incomingMessage ) -> { try diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/explorer/ComparableRaftState.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/explorer/ComparableRaftState.java index 1843f25fe31f1..5c7bda4a3f794 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/explorer/ComparableRaftState.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/explorer/ComparableRaftState.java @@ -50,12 +50,14 @@ public class ComparableRaftState implements ReadableRaftState private long leaderCommit = -1; private MemberId votedFor = null; private Set votesForMe = new HashSet<>(); + private Set preVotesForMe = new HashSet<>(); private Set heartbeatResponses = new HashSet<>(); private long lastLogIndexBeforeWeBecameLeader = -1; private FollowerStates followerStates = new FollowerStates<>(); protected final RaftLog entryLog; private final InFlightCache inFlightCache; private long commitIndex = -1; + private boolean isPreElection = false; ComparableRaftState( MemberId myself, Set votingMembers, Set replicationMembers, RaftLog entryLog, InFlightCache inFlightCache, LogProvider logProvider ) @@ -152,6 +154,24 @@ public long commitIndex() return commitIndex; } + @Override + public boolean supportPreVoting() + { + return false; + } + + @Override + public boolean isPreElection() + { + return isPreElection; + } + + @Override + public Set preVotesForMe() + { + return preVotesForMe; + } + public void update( Outcome outcome ) throws IOException { term = outcome.getTerm(); @@ -160,6 +180,7 @@ public void update( Outcome outcome ) throws IOException votesForMe = outcome.getVotesForMe(); lastLogIndexBeforeWeBecameLeader = outcome.getLastLogIndexBeforeWeBecameLeader(); followerStates = outcome.getFollowerStates(); + isPreElection = outcome.isPreElection(); for ( RaftLogCommand logCommand : outcome.getLogCommands() ) { diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/roles/FollowerTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/roles/FollowerTest.java index 2ef13709bc537..568dae347f3b4 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/roles/FollowerTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/roles/FollowerTest.java @@ -20,9 +20,6 @@ package org.neo4j.causalclustering.core.consensus.roles; import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Mock; -import org.mockito.runners.MockitoJUnitRunner; import java.io.IOException; import java.util.Collection; @@ -38,7 +35,6 @@ import org.neo4j.causalclustering.core.consensus.outcome.Outcome; import org.neo4j.causalclustering.core.consensus.state.RaftState; import org.neo4j.causalclustering.identity.MemberId; -import org.neo4j.causalclustering.messaging.Inbound; import org.neo4j.logging.Log; import org.neo4j.logging.NullLogProvider; @@ -55,7 +51,6 @@ import static org.neo4j.causalclustering.identity.RaftTestMember.member; import static org.neo4j.helpers.collection.Iterators.asSet; -@RunWith(MockitoJUnitRunner.class) public class FollowerTest { private MemberId myself = member( 0 ); @@ -64,9 +59,6 @@ public class FollowerTest private MemberId member1 = member( 1 ); private MemberId member2 = member( 2 ); - @Mock - private Inbound inbound; - @Test public void followerShouldTransitToCandidateAndInstigateAnElectionAfterTimeout() throws Exception { diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/roles/PreVotingInitiatorTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/roles/PreVotingInitiatorTest.java new file mode 100644 index 0000000000000..92f65b4e1d03f --- /dev/null +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/roles/PreVotingInitiatorTest.java @@ -0,0 +1,355 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.causalclustering.core.consensus.roles; + +import org.junit.Test; + +import java.io.IOException; + +import org.neo4j.causalclustering.core.consensus.RaftMessages; +import org.neo4j.causalclustering.core.consensus.outcome.Outcome; +import org.neo4j.causalclustering.core.consensus.state.RaftState; +import org.neo4j.causalclustering.identity.MemberId; +import org.neo4j.logging.Log; +import org.neo4j.logging.NullLogProvider; + +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; +import static org.neo4j.causalclustering.core.consensus.MessageUtils.messageFor; +import static org.neo4j.causalclustering.core.consensus.roles.Role.CANDIDATE; +import static org.neo4j.causalclustering.core.consensus.roles.Role.FOLLOWER; +import static org.neo4j.causalclustering.core.consensus.state.RaftStateBuilder.raftState; +import static org.neo4j.causalclustering.identity.RaftTestMember.member; +import static org.neo4j.helpers.collection.Iterators.asSet; + +public class PreVotingInitiatorTest +{ + private MemberId myself = member( 0 ); + + /* A few members that we use at will in tests. */ + private MemberId member1 = member( 1 ); + private MemberId member2 = member( 2 ); + private MemberId member3 = member( 3 ); + private MemberId member4 = member( 4 ); + + @Test + public void shouldSetPreElectionOnElectionTimeout() throws Exception + { + // given + RaftState state = initialState(); + + // when + Outcome outcome = new Follower().handle( new RaftMessages.Timeout.Election( myself ), state, log() ); + state.update( outcome ); + + // then + assertThat( outcome.getRole(), equalTo( FOLLOWER ) ); + assertThat( outcome.isPreElection(), equalTo( true ) ); + } + + @Test + public void shouldSendPreVoteRequestsOnElectionTimeout() throws Exception + { + // given + RaftState state = initialState(); + + // when + Outcome outcome = new Follower().handle( new RaftMessages.Timeout.Election( myself ), state, log() ); + state.update( outcome ); + + // then + assertThat( messageFor( outcome, member1 ).type(), equalTo( RaftMessages.Type.PRE_VOTE_REQUEST ) ); + assertThat( messageFor( outcome, member2 ).type(), equalTo( RaftMessages.Type.PRE_VOTE_REQUEST ) ); + } + + @Test + public void shouldProceedToRealElectionIfReceiveQuorumOfPositiveResponses() throws Exception + { + // given + RaftState state = initialState(); + + Follower underTest = new Follower(); + + Outcome outcome1 = underTest.handle( new RaftMessages.Timeout.Election( myself ), state, log() ); + state.update( outcome1 ); + + // when + Outcome outcome2 = underTest.handle( new RaftMessages.PreVote.Response( member1, 0L, true ), state, log() ); + + // then + assertThat( outcome2.getRole(), equalTo( CANDIDATE ) ); + assertThat( outcome2.isPreElection(), equalTo( false ) ); + assertThat( outcome2.getPreVotesForMe(), contains( member1 ) ); + } + + @Test + public void shouldIgnorePositiveResponsesFromOlderTerm() throws Exception + { + // given + RaftState state = raftState() + .myself( myself ) + .term( 1 ) + .supportsPreVoting( true ) + .votingMembers( asSet( myself, member1, member2 ) ) + .build(); + + Follower underTest = new Follower(); + + Outcome outcome1 = underTest.handle( new RaftMessages.Timeout.Election( myself ), state, log() ); + state.update( outcome1 ); + + // when + Outcome outcome2 = underTest.handle( new RaftMessages.PreVote.Response( member1, 0L, true ), state, log() ); + + // then + assertThat( outcome2.getRole(), equalTo( FOLLOWER ) ); + assertThat( outcome2.isPreElection(), equalTo( true ) ); + assertThat( outcome2.getPreVotesForMe(), empty() ); + } + + @Test + public void shouldIgnorePositiveResponsesIfNotInPreVotingStage() throws Exception + { + // given + RaftState state = raftState() + .myself( myself ) + .supportsPreVoting( true ) + .votingMembers( asSet( myself, member1, member2 ) ) + .build(); + + Follower underTest = new Follower(); + + // when + Outcome outcome = underTest.handle( new RaftMessages.PreVote.Response( member1, 0L, true ), state, log() ); + + // then + assertThat( outcome.getRole(), equalTo( FOLLOWER ) ); + assertThat( outcome.isPreElection(), equalTo( false ) ); + assertThat( outcome.getPreVotesForMe(), empty() ); + } + + @Test + public void shouldNotMoveToRealElectionWithoutQuorum() throws Exception + { + // given + RaftState state = raftState() + .myself( myself ) + .supportsPreVoting( true ) + .votingMembers( asSet( myself, member1, member2, member3, member4 ) ) + .build(); + + Follower underTest = new Follower(); + Outcome outcome1 = underTest.handle( new RaftMessages.Timeout.Election( myself ), state, log() ); + state.update( outcome1 ); + + // when + Outcome outcome2 = underTest.handle( new RaftMessages.PreVote.Response( member1, 0L, true ), state, log() ); + + // then + assertThat( outcome2.getRole(), equalTo( FOLLOWER ) ); + assertThat( outcome2.isPreElection(), equalTo( true ) ); + assertThat( outcome2.getPreVotesForMe(), contains( member1 ) ); + } + + @Test + public void shouldMoveToRealElectionWithQuorumOf5() throws Exception + { + // given + RaftState state = raftState() + .myself( myself ) + .supportsPreVoting( true ) + .votingMembers( asSet( myself, member1, member2, member3, member4 ) ) + .build(); + + Follower underTest = new Follower(); + Outcome outcome1 = underTest.handle( new RaftMessages.Timeout.Election( myself ), state, log() ); + state.update( outcome1 ); + + // when + Outcome outcome2 = underTest.handle( new RaftMessages.PreVote.Response( member1, 0L, true ), state, log() ); + state.update( outcome2 ); + Outcome outcome3 = underTest.handle( new RaftMessages.PreVote.Response( member2, 0L, true ), state, log() ); + + // then + assertThat( outcome3.getRole(), equalTo( CANDIDATE ) ); + assertThat( outcome3.isPreElection(), equalTo( false ) ); + } + + @Test + public void shouldNotCountVotesFromSameMemberTwice() throws Exception + { + // given + RaftState state = raftState() + .myself( myself ) + .supportsPreVoting( true ) + .votingMembers( asSet( myself, member1, member2, member3, member4 ) ) + .build(); + + Follower underTest = new Follower(); + Outcome outcome1 = underTest.handle( new RaftMessages.Timeout.Election( myself ), state, log() ); + state.update( outcome1 ); + + // when + Outcome outcome2 = underTest.handle( new RaftMessages.PreVote.Response( member1, 0L, true ), state, log() ); + state.update( outcome2 ); + Outcome outcome3 = underTest.handle( new RaftMessages.PreVote.Response( member1, 0L, true ), state, log() ); + + // then + assertThat( outcome3.getRole(), equalTo( FOLLOWER ) ); + assertThat( outcome3.isPreElection(), equalTo( true ) ); + } + + @Test + public void shouldResetPreVotesWhenMovingBackToFollower() throws Exception + { + // given + RaftState state = initialState(); + + Outcome outcome1 = new Follower().handle( new RaftMessages.Timeout.Election( myself ), state, log() ); + state.update( outcome1 ); + Outcome outcome2 = new Follower().handle( new RaftMessages.PreVote.Response( member1, 0L, true ), state, log() ); + assertThat( CANDIDATE, equalTo( outcome2.getRole() ) ); + assertThat( outcome2.getPreVotesForMe(), contains( member1 ) ); + + // when + Outcome outcome3 = new Candidate().handle( new RaftMessages.Timeout.Election( myself ), state, log() ); + + // then + assertThat( outcome3.getPreVotesForMe(), empty() ); + } + + @Test + public void shouldSendRealVoteRequestsIfReceivePositiveResponses() throws Exception + { + // given + RaftState state = initialState(); + + Follower underTest = new Follower(); + + Outcome outcome1 = underTest.handle( new RaftMessages.Timeout.Election( myself ), state, log() ); + state.update( outcome1 ); + + // when + Outcome outcome2 = underTest.handle( new RaftMessages.PreVote.Response( member1, 0L, true ), state, log() ); + + // then + assertThat( messageFor( outcome2, member1 ).type(), equalTo( RaftMessages.Type.VOTE_REQUEST ) ); + assertThat( messageFor( outcome2, member2 ).type(), equalTo( RaftMessages.Type.VOTE_REQUEST ) ); + } + + @Test + public void shouldNotProceedToRealElectionIfReceiveNegativeResponses() throws Exception + { + // given + RaftState state = initialState(); + + Follower underTest = new Follower(); + + Outcome outcome1 = underTest.handle( new RaftMessages.Timeout.Election( myself ), state, log() ); + state.update( outcome1 ); + + // when + Outcome outcome2 = underTest.handle( new RaftMessages.PreVote.Response( member1, 0L, false ), state, log() ); + state.update( outcome2 ); + Outcome outcome3 = underTest.handle( new RaftMessages.PreVote.Response( member2, 0L, false ), state, log() ); + + // then + assertThat( outcome3.getRole(), equalTo( FOLLOWER ) ); + assertThat( outcome3.isPreElection(), equalTo( true ) ); + assertThat( outcome3.getPreVotesForMe(), empty() ); + } + + @Test + public void shouldNotSendRealVoteRequestsIfReceiveNegativeResponses() throws Exception + { + // given + RaftState state = initialState(); + + Follower underTest = new Follower(); + + Outcome outcome1 = underTest.handle( new RaftMessages.Timeout.Election( myself ), state, log() ); + state.update( outcome1 ); + + // when + Outcome outcome2 = underTest.handle( new RaftMessages.PreVote.Response( member1, 0L, false ), state, log() ); + state.update( outcome2 ); + Outcome outcome3 = underTest.handle( new RaftMessages.PreVote.Response( member2, 0L, false ), state, log() ); + + // then + assertThat( outcome2.getOutgoingMessages(), empty() ); + assertThat( outcome3.getOutgoingMessages(), empty() ); + } + + @Test + public void shouldResetPreVoteIfReceiveHeartbeatFromLeader() throws Exception + { + // given + RaftState state = initialState(); + + Follower underTest = new Follower(); + + Outcome outcome1 = underTest.handle( new RaftMessages.Timeout.Election( myself ), state, log() ); + state.update( outcome1 ); + + // when + Outcome outcome2 = underTest.handle( new RaftMessages.Heartbeat( member1, 0L, 0L, 0L ), state, log() ); + + // then + assertThat( outcome2.getRole(), equalTo( FOLLOWER ) ); + assertThat( outcome2.isPreElection(), equalTo( false ) ); + assertThat( outcome2.getPreVotesForMe(), empty() ); + } + + @Test + public void shouldNotSendPreVoteRequestsIfReceiveHeartbeatFromLeader() throws Exception + { + // given + RaftState state = initialState(); + + Follower underTest = new Follower(); + + Outcome outcome1 = underTest.handle( new RaftMessages.Timeout.Election( myself ), state, log() ); + state.update( outcome1 ); + + // when + Outcome outcome2 = underTest.handle( new RaftMessages.Heartbeat( member1, 0L, 0L, 0L ), state, log() ); + state.update( outcome2 ); + Outcome outcome3 = underTest.handle( new RaftMessages.PreVote.Response( member2, 0L, true ), state, log() ); + + // then + assertThat( outcome3.isPreElection(), equalTo( false ) ); + } + + private Log log() + { + return NullLogProvider.getInstance().getLog( getClass() ); + } + + private RaftState initialState() throws IOException + { + return raftState() + .myself( myself ) + .supportsPreVoting( true ) + .votingMembers( asSet( myself, member1, member2 ) ) + .build(); + } +} diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/roles/PreVotingVoterTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/roles/PreVotingVoterTest.java new file mode 100644 index 0000000000000..9370fe3a6b48d --- /dev/null +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/roles/PreVotingVoterTest.java @@ -0,0 +1,200 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.causalclustering.core.consensus.roles; + +import org.junit.Test; + +import java.io.IOException; + +import org.neo4j.causalclustering.core.consensus.RaftMessages; +import org.neo4j.causalclustering.core.consensus.outcome.Outcome; +import org.neo4j.causalclustering.core.consensus.state.RaftState; +import org.neo4j.causalclustering.identity.MemberId; +import org.neo4j.logging.Log; +import org.neo4j.logging.NullLogProvider; + +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; +import static org.neo4j.causalclustering.core.consensus.MessageUtils.messageFor; +import static org.neo4j.causalclustering.core.consensus.state.RaftStateBuilder.raftState; +import static org.neo4j.causalclustering.identity.RaftTestMember.member; +import static org.neo4j.helpers.collection.Iterators.asSet; + +public class PreVotingVoterTest +{ + private MemberId myself = member( 0 ); + + /* A few members that we use at will in tests. */ + private MemberId member1 = member( 1 ); + private MemberId member2 = member( 2 ); + + @Test + public void shouldRespondPositivelyIfWouldVoteForCandidate() throws Exception + { + // given + RaftState raftState = initialState(); + + // when + Outcome outcome = new Follower().handle( new RaftMessages.PreVote.Request( member1, 0, member1, 0, 0 ), raftState, log() ); + + // then + RaftMessages.RaftMessage raftMessage = messageFor( outcome, member1 ); + assertThat( raftMessage.type(), equalTo( RaftMessages.Type.PRE_VOTE_RESPONSE ) ); + assertThat( ( (RaftMessages.PreVote.Response)raftMessage ).voteGranted() , equalTo( true ) ); + } + + @Test + public void shouldRespondPositivelyEvenIfAlreadyVotedInRealElection() throws Exception + { + // given + RaftState raftState = initialState(); + raftState.update( new Follower().handle( new RaftMessages.Vote.Request( member1, 0, member1, 0, 0 ), raftState, log() ) ); + + // when + Outcome outcome = new Follower().handle( new RaftMessages.PreVote.Request( member2, 0, member2, 0, 0 ), raftState, log() ); + + // then + RaftMessages.RaftMessage raftMessage = messageFor( outcome, member2 ); + assertThat( raftMessage.type(), equalTo( RaftMessages.Type.PRE_VOTE_RESPONSE ) ); + assertThat( ( (RaftMessages.PreVote.Response)raftMessage ).voteGranted() , equalTo( true ) ); + } + + @Test + public void shouldRespondNegativelyIfLeaderAndRequestNotFromGreaterTerm() throws Exception + { + // given + RaftState raftState = initialState(); + + // when + Outcome outcome = new Leader().handle( new RaftMessages.PreVote.Request( member1, Long.MIN_VALUE, member1, 0, 0 ), raftState, log() ); + + // then + RaftMessages.RaftMessage raftMessage = messageFor( outcome, member1 ); + assertThat( raftMessage.type(), equalTo( RaftMessages.Type.PRE_VOTE_RESPONSE ) ); + assertThat( ( (RaftMessages.PreVote.Response)raftMessage ).voteGranted() , equalTo( false ) ); + } + + @Test + public void shouldRespondPositivelyIfLeaderAndRequestFromGreaterTerm() throws Exception + { + // given + RaftState raftState = initialState(); + + // when + Outcome outcome = new Leader().handle( new RaftMessages.PreVote.Request( member1, Long.MAX_VALUE, member1, 0, 0 ), raftState, log() ); + + // then + RaftMessages.RaftMessage raftMessage = messageFor( outcome, member1 ); + assertThat( raftMessage.type(), equalTo( RaftMessages.Type.PRE_VOTE_RESPONSE ) ); + assertThat( ( (RaftMessages.PreVote.Response)raftMessage ).voteGranted() , equalTo( true ) ); + } + + @Test + public void shouldRespondNegativelyIfNotInPreVoteMyself() throws Exception + { + // given + RaftState raftState = raftState() + .myself( myself ) + .supportsPreVoting( true ) + .votingMembers( asSet( myself, member1, member2 ) ) + .setPreElection( false ) + .build(); + + // when + Outcome outcome = new Follower().handle( new RaftMessages.PreVote.Request( member1, 0, member1, 0, 0 ), raftState, log() ); + + // then + RaftMessages.RaftMessage raftMessage = messageFor( outcome, member1 ); + assertThat( raftMessage.type(), equalTo( RaftMessages.Type.PRE_VOTE_RESPONSE ) ); + assertThat( ( (RaftMessages.PreVote.Response)raftMessage ).voteGranted() , equalTo( false ) ); + } + + @Test + public void shouldRespondNegativelyIfWouldNotVoteForCandidate() throws Exception + { + // given + RaftState raftState = raftState() + .myself( myself ) + .term( 1 ) + .setPreElection( true ) + .supportsPreVoting( true ) + .votingMembers( asSet( myself, member1, member2 ) ) + .build(); + + // when + Outcome outcome = new Follower().handle( new RaftMessages.PreVote.Request( member1, 0, member1, 0, 0 ), raftState, log() ); + + // then + RaftMessages.RaftMessage raftMessage = messageFor( outcome, member1 ); + assertThat( raftMessage.type(), equalTo( RaftMessages.Type.PRE_VOTE_RESPONSE ) ); + assertThat( ( (RaftMessages.PreVote.Response)raftMessage ).voteGranted() , equalTo( false ) ); + } + + @Test + public void shouldRespondPositivelyToMultipleMembersIfWouldVoteForAny() throws Exception + { + // given + RaftState raftState = initialState(); + + // when + Outcome outcome1 = new Follower().handle( new RaftMessages.PreVote.Request( member1, 0, member1, 0, 0 ), raftState, log() ); + raftState.update( outcome1 ); + Outcome outcome2 = new Follower().handle( new RaftMessages.PreVote.Request( member2, 0, member2, 0, 0 ), raftState, log() ); + raftState.update( outcome2 ); + + // then + RaftMessages.RaftMessage raftMessage = messageFor( outcome2, member2 ); + + assertThat( raftMessage.type(), equalTo( RaftMessages.Type.PRE_VOTE_RESPONSE ) ); + assertThat( ( (RaftMessages.PreVote.Response)raftMessage ).voteGranted() , equalTo( true ) ); + } + + @Test + public void shouldUseTermFromRequestIfHigherThanOwn() throws Exception + { + // given + RaftState raftState = initialState(); + long newTerm = 1; + + // when + Outcome outcome = new Follower().handle( new RaftMessages.PreVote.Request( member1, newTerm, member1, 0, 0 ), raftState, log() ); + + // then + RaftMessages.RaftMessage raftMessage = messageFor( outcome, member1 ); + + assertThat( raftMessage.type(), equalTo( RaftMessages.Type.PRE_VOTE_RESPONSE ) ); + assertThat( ( (RaftMessages.PreVote.Response)raftMessage ).term() , equalTo( newTerm ) ); + } + + private RaftState initialState() throws IOException + { + return raftState() + .myself( myself ) + .supportsPreVoting( true ) + .setPreElection( true ) + .votingMembers( asSet( myself, member1, member2 ) ) + .build(); + } + + private Log log() + { + return NullLogProvider.getInstance().getLog( getClass() ); + } +} diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/state/RaftStateBuilder.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/state/RaftStateBuilder.java index 260c38f1b47d2..1e8de68a53399 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/state/RaftStateBuilder.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/state/RaftStateBuilder.java @@ -58,10 +58,13 @@ public static RaftStateBuilder raftState() public long leaderCommit = -1; private MemberId votedFor; private RaftLog entryLog = new InMemoryRaftLog(); + private boolean supportPreVoting; private Set votesForMe = emptySet(); + private Set preVotesForMe = emptySet(); private long lastLogIndexBeforeWeBecameLeader = -1; public long commitIndex = -1; private FollowerStates followerStates = new FollowerStates<>(); + private boolean isPreElection = false; public RaftStateBuilder myself( MemberId myself ) { @@ -117,6 +120,12 @@ public RaftStateBuilder votesForMe( Set votesForMe ) return this; } + public RaftStateBuilder supportsPreVoting( boolean supportPreVoting ) + { + this.supportPreVoting = supportPreVoting; + return this; + } + public RaftStateBuilder lastLogIndexBeforeWeBecameLeader( long lastLogIndexBeforeWeBecameLeader ) { this.lastLogIndexBeforeWeBecameLeader = lastLogIndexBeforeWeBecameLeader; @@ -129,6 +138,12 @@ public RaftStateBuilder commitIndex( long commitIndex ) return this; } + public RaftStateBuilder setPreElection( boolean isPreElection ) + { + this.isPreElection = isPreElection; + return this; + } + public RaftState build() throws IOException { StateStorage termStore = new InMemoryStateStorage<>( new TermState() ); @@ -136,14 +151,14 @@ public RaftState build() throws IOException StubMembership membership = new StubMembership( votingMembers, replicationMembers ); RaftState state = new RaftState( myself, termStore, membership, entryLog, - voteStore, new ConsecutiveInFlightCache(), NullLogProvider.getInstance() ); + voteStore, new ConsecutiveInFlightCache(), NullLogProvider.getInstance(), supportPreVoting ); Collection noMessages = Collections.emptyList(); List noLogCommands = Collections.emptyList(); - state.update( new Outcome( null, term, leader, leaderCommit, votedFor, votesForMe, + state.update( new Outcome( null, term, leader, leaderCommit, votedFor, votesForMe, preVotesForMe, lastLogIndexBeforeWeBecameLeader, followerStates, false, noLogCommands, - noMessages, emptySet(), commitIndex, emptySet() ) ); + noMessages, emptySet(), commitIndex, emptySet(), isPreElection ) ); return state; } diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/state/RaftStateTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/state/RaftStateTest.java index e714c189d3b6b..a4f879fee1bb1 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/state/RaftStateTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/state/RaftStateTest.java @@ -66,7 +66,7 @@ public void shouldUpdateCacheState() throws Exception InFlightCache cache = new ConsecutiveInFlightCache(); RaftState raftState = new RaftState( member( 0 ), new InMemoryStateStorage<>( new TermState() ), new FakeMembership(), new InMemoryRaftLog(), - new InMemoryStateStorage<>( new VoteState() ), cache, NullLogProvider.getInstance() ); + new InMemoryStateStorage<>( new VoteState() ), cache, NullLogProvider.getInstance(), false ); List logCommands = new LinkedList() {{ @@ -79,8 +79,8 @@ public void shouldUpdateCacheState() throws Exception }}; Outcome raftTestMemberOutcome = - new Outcome( CANDIDATE, 0, null, -1, null, emptySet(), -1, initialFollowerStates(), true, - logCommands, emptyOutgoingMessages(), emptySet(), -1, emptySet() ); + new Outcome( CANDIDATE, 0, null, -1, null, emptySet(), emptySet(), -1, initialFollowerStates(), true, + logCommands, emptyOutgoingMessages(), emptySet(), -1, emptySet(), false ); //when raftState.update(raftTestMemberOutcome); @@ -101,14 +101,15 @@ public void shouldRemoveFollowerStateAfterBecomingLeader() throws Exception new InMemoryStateStorage<>( new TermState() ), new FakeMembership(), new InMemoryRaftLog(), new InMemoryStateStorage<>( new VoteState( ) ), - new ConsecutiveInFlightCache(), NullLogProvider.getInstance() ); + new ConsecutiveInFlightCache(), NullLogProvider.getInstance(), + false ); - raftState.update( new Outcome( CANDIDATE, 1, null, -1, null, emptySet(), -1, initialFollowerStates(), true, emptyLogCommands(), - emptyOutgoingMessages(), emptySet(), -1, emptySet() ) ); + raftState.update( new Outcome( CANDIDATE, 1, null, -1, null, emptySet(), emptySet(), -1, initialFollowerStates(), true, emptyLogCommands(), + emptyOutgoingMessages(), emptySet(), -1, emptySet(), false ) ); // when - raftState.update( new Outcome( CANDIDATE, 1, null, -1, null, emptySet(), -1, new FollowerStates<>(), true, emptyLogCommands(), - emptyOutgoingMessages(), emptySet(), -1, emptySet() ) ); + raftState.update( new Outcome( CANDIDATE, 1, null, -1, null, emptySet(), emptySet(), -1, new FollowerStates<>(), true, emptyLogCommands(), + emptyOutgoingMessages(), emptySet(), -1, emptySet(), false ) ); // then assertEquals( 0, raftState.followerStates().size() ); diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/vote/VotingTest.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/vote/VotingTest.java index e6471f6a801d5..a37b3480d9861 100644 --- a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/vote/VotingTest.java +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/core/consensus/vote/VotingTest.java @@ -20,9 +20,8 @@ package org.neo4j.causalclustering.core.consensus.vote; import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.runners.MockitoJUnitRunner; +import java.util.Optional; import java.util.UUID; import org.neo4j.causalclustering.core.consensus.roles.Voting; @@ -33,7 +32,6 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -@RunWith(MockitoJUnitRunner.class) public class VotingTest { MemberId candidate = new MemberId( UUID.randomUUID() ); @@ -56,7 +54,7 @@ public void shouldAcceptRequestWithIdenticalLog() logTerm, appendIndex, appendIndex, - null, + Optional.empty(), log ) ); } @@ -72,7 +70,7 @@ public void shouldRejectRequestFromOldTerm() logTerm, appendIndex, appendIndex, - null, + Optional.empty(), log ) ); } @@ -88,7 +86,7 @@ public void shouldRejectRequestIfCandidateLogEndsAtLowerTerm() logTerm - 1, appendIndex, appendIndex, - null, + Optional.empty(), log ) ); } @@ -104,7 +102,7 @@ public void shouldRejectRequestIfLogsEndInSameTermButCandidateLogIsShorter() logTerm, appendIndex, appendIndex - 1, - null, + Optional.empty(), log ) ); } @@ -120,7 +118,7 @@ public void shouldAcceptRequestIfLogsEndInSameTermAndCandidateLogIsSameLength() logTerm, appendIndex, appendIndex, - null, + Optional.empty(), log ) ); } @@ -136,7 +134,7 @@ public void shouldAcceptRequestIfLogsEndInSameTermAndCandidateLogIsLonger() logTerm, appendIndex, appendIndex + 1, - null, + Optional.empty(), log ) ); } @@ -152,7 +150,7 @@ public void shouldAcceptRequestIfLogsEndInHigherTermAndCandidateLogIsShorter() logTerm + 1, appendIndex, appendIndex - 1, - null, + Optional.empty(), log ) ); } @@ -168,7 +166,7 @@ public void shouldAcceptRequestIfLogEndsAtHigherTermAndCandidateLogIsSameLength( logTerm + 1, appendIndex, appendIndex, - null, + Optional.empty(), log ) ); } @@ -184,7 +182,7 @@ public void shouldAcceptRequestIfLogEndsAtHigherTermAndCandidateLogIsLonger() logTerm + 1, appendIndex, appendIndex + 1, - null, + Optional.empty(), log ) ); } @@ -200,7 +198,7 @@ public void shouldRejectRequestIfAlreadyVotedForOtherCandidate() logTerm, appendIndex, appendIndex, - otherMember, + Optional.of( otherMember ), log ) ); } @@ -216,7 +214,7 @@ public void shouldAcceptRequestIfAlreadyVotedForCandidate() logTerm, appendIndex, appendIndex, - candidate, + Optional.of( candidate ), log ) ); } diff --git a/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/PreElectionIT.java b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/PreElectionIT.java new file mode 100644 index 0000000000000..1919298338ce3 --- /dev/null +++ b/enterprise/causal-clustering/src/test/java/org/neo4j/causalclustering/scenarios/PreElectionIT.java @@ -0,0 +1,134 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Neo4j is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package org.neo4j.causalclustering.scenarios; + +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.neo4j.causalclustering.core.CausalClusteringSettings; +import org.neo4j.causalclustering.core.consensus.roles.Role; +import org.neo4j.causalclustering.discovery.Cluster; +import org.neo4j.causalclustering.discovery.CoreClusterMember; +import org.neo4j.test.Race; +import org.neo4j.test.assertion.Assert; +import org.neo4j.test.causalclustering.ClusterRule; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.not; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; + +public class PreElectionIT +{ + @Rule + public ClusterRule clusterRule = new ClusterRule( getClass() ) + .withNumberOfCoreMembers( 3 ) + .withNumberOfReadReplicas( 0 ) + .withSharedCoreParam( CausalClusteringSettings.leader_election_timeout, "10s" ) + .withSharedCoreParam( CausalClusteringSettings.enable_pre_voting, "true" ); + + private Cluster cluster; + + @Before + public void setUp() throws Exception + { + cluster = clusterRule.startCluster(); + } + + @Test + public void shouldActuallyStartAClusterWithPreVoting() throws Exception + { + // pass + } + + @Test + public void shouldStartAnElectionIfAllServersHaveTimedOutOnHeartbeats() throws Exception + { + Collection> futures = new ArrayList<>( cluster.coreMembers().size() ); + + // given + long initialTerm = cluster.awaitLeader().raft().term(); + + // when + for ( CoreClusterMember member : cluster.coreMembers() ) + { + if ( Role.FOLLOWER == member.raft().currentRole() ) + { + futures.add( CompletableFuture.runAsync( Race.throwing( () -> member.raft().triggerElection() ) ) ); + } + } + + // then + Assert.assertEventually( + "Should be on a new term following an election", + () -> cluster.awaitLeader().raft().term(), not( equalTo( initialTerm ) ), + 1, + TimeUnit.MINUTES ); + + // cleanup + for ( CompletableFuture future : futures ) + { + future.cancel( false ); + } + } + + @Test + public void shouldNotStartAnElectionIfAMinorityOfServersHaveTimedOutOnHeartbeats() throws Exception + { + // given + CoreClusterMember follower = cluster.awaitCoreMemberWithRole( Role.FOLLOWER, 1, TimeUnit.MINUTES ); + + // when + follower.raft().triggerElection(); + + // then + try + { + cluster.awaitCoreMemberWithRole( Role.CANDIDATE, 1, TimeUnit.MINUTES ); + fail( "Should not have started an election if less than a quorum have timed out" ); + } + catch ( TimeoutException e ) + { + // pass + } + } + + @Test + public void shouldStartElectionIfLeaderRemoved() throws Exception + { + // given + CoreClusterMember oldLeader = cluster.awaitLeader(); + + // when + cluster.removeCoreMember( oldLeader ); + + // then + CoreClusterMember newLeader = cluster.awaitLeader(); + + assertThat( newLeader.serverId(), not( equalTo( oldLeader.serverId() ) ) ); + } +}