Skip to content

Commit

Permalink
Update term if receive vote request from later term for all roles.
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewkerr9000 committed Feb 6, 2018
1 parent 3997e28 commit 8e12b78
Show file tree
Hide file tree
Showing 18 changed files with 948 additions and 531 deletions.
Expand Up @@ -141,9 +141,30 @@ public int hashCode()
} }
} }


interface AnyVote
{
interface Request
{
long term();

long lastLogTerm();

long lastLogIndex();

MemberId candidate();
}

interface Response
{
long term();

boolean voteGranted();
}
}

interface Vote interface Vote
{ {
class Request extends BaseRaftMessage class Request extends BaseRaftMessage implements AnyVote.Request
{ {
private long term; private long term;
private MemberId candidate; private MemberId candidate;
Expand All @@ -159,6 +180,7 @@ public Request( MemberId from, long term, MemberId candidate, long lastLogIndex,
this.lastLogTerm = lastLogTerm; this.lastLogTerm = lastLogTerm;
} }


@Override
public long term() public long term()
{ {
return term; return term;
Expand Down Expand Up @@ -205,23 +227,26 @@ public String toString()
from, term, candidate, lastLogIndex, lastLogTerm ); from, term, candidate, lastLogIndex, lastLogTerm );
} }


@Override
public long lastLogTerm() public long lastLogTerm()
{ {
return lastLogTerm; return lastLogTerm;
} }


@Override
public long lastLogIndex() public long lastLogIndex()
{ {
return lastLogIndex; return lastLogIndex;
} }


@Override
public MemberId candidate() public MemberId candidate()
{ {
return candidate; return candidate;
} }
} }


class Response extends BaseRaftMessage class Response extends BaseRaftMessage implements AnyVote.Response
{ {
private long term; private long term;
private boolean voteGranted; private boolean voteGranted;
Expand Down Expand Up @@ -271,11 +296,13 @@ public String toString()
return format( "Vote.Response from %s {term=%d, voteGranted=%s}", from, term, voteGranted ); return format( "Vote.Response from %s {term=%d, voteGranted=%s}", from, term, voteGranted );
} }


@Override
public long term() public long term()
{ {
return term; return term;
} }


@Override
public boolean voteGranted() public boolean voteGranted()
{ {
return voteGranted; return voteGranted;
Expand All @@ -285,7 +312,7 @@ public boolean voteGranted()


interface PreVote interface PreVote
{ {
class Request extends BaseRaftMessage class Request extends BaseRaftMessage implements AnyVote.Request
{ {
private long term; private long term;
private MemberId candidate; private MemberId candidate;
Expand All @@ -301,6 +328,7 @@ public Request( MemberId from, long term, MemberId candidate, long lastLogIndex,
this.lastLogTerm = lastLogTerm; this.lastLogTerm = lastLogTerm;
} }


@Override
public long term() public long term()
{ {
return term; return term;
Expand Down Expand Up @@ -347,23 +375,26 @@ public String toString()
from, term, candidate, lastLogIndex, lastLogTerm ); from, term, candidate, lastLogIndex, lastLogTerm );
} }


@Override
public long lastLogTerm() public long lastLogTerm()
{ {
return lastLogTerm; return lastLogTerm;
} }


@Override
public long lastLogIndex() public long lastLogIndex()
{ {
return lastLogIndex; return lastLogIndex;
} }


@Override
public MemberId candidate() public MemberId candidate()
{ {
return candidate; return candidate;
} }
} }


class Response extends BaseRaftMessage class Response extends BaseRaftMessage implements AnyVote.Response
{ {
private long term; private long term;
private boolean voteGranted; private boolean voteGranted;
Expand Down Expand Up @@ -413,11 +444,13 @@ public String toString()
return format( "PreVote.Response from %s {term=%d, voteGranted=%s}", from, term, voteGranted ); return format( "PreVote.Response from %s {term=%d, voteGranted=%s}", from, term, voteGranted );
} }


@Override
public long term() public long term()
{ {
return term; return term;
} }


@Override
public boolean voteGranted() public boolean voteGranted()
{ {
return voteGranted; return voteGranted;
Expand Down
Expand Up @@ -157,8 +157,19 @@ public Outcome handle( RaftMessages.Timeout.Election election ) throws IOExcepti
} }


@Override @Override
public Outcome handle( RaftMessages.PreVote.Request request ) throws IOException public Outcome handle( RaftMessages.PreVote.Request req ) throws IOException
{ {
if ( ctx.supportPreVoting() )
{
if ( req.term() > ctx.term() )
{
outcome.getVotesForMe().clear();
outcome.setNextRole( FOLLOWER );
log.info( "Moving to FOLLOWER state after receiving pre vote request from %s at term %d (I am at %d)",
req.from(), req.term(), ctx.term() );
}
Voting.declinePreVoteRequest( ctx, outcome, req );
}
return outcome; return outcome;
} }


Expand Down
Expand Up @@ -285,9 +285,7 @@ private static class PreVoteRequestDecliningHandler implements PreVoteRequestHan
@Override @Override
public Outcome handle( RaftMessages.PreVote.Request request, Outcome outcome, ReadableRaftState ctx, Log log ) throws IOException public Outcome handle( RaftMessages.PreVote.Request request, Outcome outcome, ReadableRaftState ctx, Log log ) throws IOException
{ {
// TODO should we look out for later terms? Voting.declinePreVoteRequest( ctx, outcome, request );
outcome.addOutgoingMessage(
new RaftMessages.Directed( request.from(), new RaftMessages.PreVote.Response( ctx.myself(), outcome.getTerm(), false ) ) );
return outcome; return outcome;
} }


Expand Down
Expand Up @@ -280,20 +280,17 @@ public Outcome handle( RaftMessages.Vote.Response response ) throws IOException
} }


@Override @Override
public Outcome handle( RaftMessages.PreVote.Request request ) throws IOException public Outcome handle( RaftMessages.PreVote.Request req ) throws IOException
{ {
if ( request.term() > ctx.term() ) if ( ctx.supportPreVoting() )
{ {
log.info( if ( req.term() > ctx.term() )
"Considering pre vote request at term %d (my term is " + "%d) from %s", {
request.term(), ctx.term(), request.from() ); stepDownToFollower( outcome );

log.info( "Moving to FOLLOWER state after receiving pre vote request from %s at term %d (I am at %d)",
Voting.handlePreVoteRequest( ctx, outcome, request, log ); req.from(), req.term(), ctx.term() );
} }
else Voting.declinePreVoteRequest( ctx, outcome, req );
{
outcome.addOutgoingMessage( new RaftMessages.Directed( request.from(),
new RaftMessages.PreVote.Response( ctx.myself(), ctx.term(), false ) ) );
} }
return outcome; return outcome;
} }
Expand Down
Expand Up @@ -20,12 +20,12 @@
package org.neo4j.causalclustering.core.consensus.roles; package org.neo4j.causalclustering.core.consensus.roles;


import java.io.IOException; import java.io.IOException;
import java.util.Optional;


import org.neo4j.causalclustering.core.consensus.RaftMessages; import org.neo4j.causalclustering.core.consensus.RaftMessages;
import org.neo4j.causalclustering.core.consensus.outcome.Outcome; import org.neo4j.causalclustering.core.consensus.outcome.Outcome;
import org.neo4j.causalclustering.core.consensus.state.ReadableRaftState; import org.neo4j.causalclustering.core.consensus.state.ReadableRaftState;
import org.neo4j.causalclustering.identity.MemberId; import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.function.ThrowingBooleanSupplier;
import org.neo4j.logging.Log; import org.neo4j.logging.Log;


public class Voting public class Voting
Expand All @@ -39,10 +39,8 @@ static void handleVoteRequest( ReadableRaftState state, Outcome outcome,
outcome.setVotedFor( null ); outcome.setVotedFor( null );
} }


boolean willVoteForCandidate = shouldVoteFor( voteRequest.candidate(), outcome.getTerm(), voteRequest.term(), boolean votedForAnother = outcome.getVotedFor() != null && !outcome.getVotedFor().equals( voteRequest.candidate() );
state.entryLog().readEntryTerm( state.entryLog().appendIndex() ), voteRequest.lastLogTerm(), boolean willVoteForCandidate = shouldVoteFor( state, outcome, voteRequest, votedForAnother, log );
state.entryLog().appendIndex(), voteRequest.lastLogIndex(),
Optional.ofNullable( outcome.getVotedFor() ), log );


if ( willVoteForCandidate ) if ( willVoteForCandidate )
{ {
Expand All @@ -57,27 +55,60 @@ static void handleVoteRequest( ReadableRaftState state, Outcome outcome,


static void handlePreVoteRequest( ReadableRaftState state, Outcome outcome, static void handlePreVoteRequest( ReadableRaftState state, Outcome outcome,
RaftMessages.PreVote.Request voteRequest, Log log ) throws IOException RaftMessages.PreVote.Request voteRequest, Log log ) throws IOException
{
ThrowingBooleanSupplier<IOException> willVoteForCandidate =
() -> shouldVoteFor( state, outcome, voteRequest, false, log );
respondToPreVoteRequest( state, outcome, voteRequest, willVoteForCandidate );
}

static void declinePreVoteRequest( ReadableRaftState state, Outcome outcome,
RaftMessages.PreVote.Request voteRequest ) throws IOException
{
respondToPreVoteRequest( state, outcome, voteRequest, () -> false );
}

private static void respondToPreVoteRequest( ReadableRaftState state, Outcome outcome,
RaftMessages.PreVote.Request voteRequest, ThrowingBooleanSupplier<IOException> willVoteFor ) throws IOException
{ {
if ( voteRequest.term() > state.term() ) if ( voteRequest.term() > state.term() )
{ {
outcome.setNextTerm( voteRequest.term() ); outcome.setNextTerm( voteRequest.term() );
} }


boolean willVoteForCandidate = shouldVoteFor( voteRequest.candidate(), outcome.getTerm(), voteRequest.term(),
state.entryLog().readEntryTerm( state.entryLog().appendIndex() ), voteRequest.lastLogTerm(),
state.entryLog().appendIndex(), voteRequest.lastLogIndex(),
Optional.empty(), log );

outcome.addOutgoingMessage( new RaftMessages.Directed( voteRequest.from(), new RaftMessages.PreVote.Response( outcome.addOutgoingMessage( new RaftMessages.Directed( voteRequest.from(), new RaftMessages.PreVote.Response(
state.myself(), outcome.getTerm(), state.myself(), outcome.getTerm(),
willVoteForCandidate ) ) ); willVoteFor.getAsBoolean() ) ) );
}

private static boolean shouldVoteFor( ReadableRaftState state, Outcome outcome, RaftMessages.AnyVote.Request voteRequest,
boolean committedToVotingForAnother, Log log )
throws IOException
{
long requestTerm = voteRequest.term();
MemberId candidate = voteRequest.candidate();
long requestLastLogTerm = voteRequest.lastLogTerm();
long requestLastLogIndex = voteRequest.lastLogIndex();
long contextTerm = outcome.getTerm();
long contextLastAppended = state.entryLog().appendIndex();
long contextLastLogTerm = state.entryLog().readEntryTerm( contextLastAppended );

return shouldVoteFor(
candidate,
contextTerm,
requestTerm,
contextLastLogTerm,
requestLastLogTerm,
contextLastAppended,
requestLastLogIndex,
committedToVotingForAnother,
log
);
} }


@SuppressWarnings( "OptionalUsedAsFieldOrParameterType" )
public static boolean shouldVoteFor( MemberId candidate, long contextTerm, long requestTerm, public static boolean shouldVoteFor( MemberId candidate, long contextTerm, long requestTerm,
long contextLastLogTerm, long requestLastLogTerm, long contextLastLogTerm, long requestLastLogTerm,
long contextLastAppended, long requestLastLogIndex, long contextLastAppended, long requestLastLogIndex,
Optional<MemberId> votedFor, Log log ) boolean committedToVotingForAnother, Log log )
{ {
if ( requestTerm < contextTerm ) if ( requestTerm < contextTerm )
{ {
Expand All @@ -92,17 +123,16 @@ public static boolean shouldVoteFor( MemberId candidate, long contextTerm, long
boolean requesterLogUpToDate = requestLogEndsAtHigherTerm || boolean requesterLogUpToDate = requestLogEndsAtHigherTerm ||
(logsEndAtSameTerm && requestLogAtLeastAsLongAsMyLog); (logsEndAtSameTerm && requestLogAtLeastAsLongAsMyLog);


boolean votedForOtherInSameTerm = requestTerm == contextTerm && boolean votedForOtherInSameTerm = requestTerm == contextTerm && committedToVotingForAnother;
votedFor.map( member -> !member.equals( candidate ) ).orElse( false );


boolean shouldVoteFor = requesterLogUpToDate && !votedForOtherInSameTerm; boolean shouldVoteFor = requesterLogUpToDate && !votedForOtherInSameTerm;


log.debug( "Should vote for raft candidate %s: " + log.debug( "Should vote for raft candidate %s: " +
"requester log up to date: %s (request last log term: %s, context last log term: %s, request last log index: %s, context last append: %s) " + "requester log up to date: %s (request last log term: %s, context last log term: %s, request last log index: %s, context last append: %s) " +
"voted for other in same term: %s (request term: %s, context term: %s, votedFor: %s)", "voted for other in same term: %s (request term: %s, context term: %s, voted for another: %s)",
shouldVoteFor, shouldVoteFor,
requesterLogUpToDate, requestLastLogTerm, contextLastLogTerm, requestLastLogIndex, contextLastAppended, requesterLogUpToDate, requestLastLogTerm, contextLastLogTerm, requestLastLogIndex, contextLastAppended,
votedForOtherInSameTerm, requestTerm, contextTerm, votedFor ); votedForOtherInSameTerm, requestTerm, contextTerm, committedToVotingForAnother );


return shouldVoteFor; return shouldVoteFor;
} }
Expand Down
Expand Up @@ -21,6 +21,8 @@


import org.neo4j.causalclustering.core.consensus.roles.AppendEntriesRequestBuilder; import org.neo4j.causalclustering.core.consensus.roles.AppendEntriesRequestBuilder;
import org.neo4j.causalclustering.core.consensus.roles.AppendEntriesResponseBuilder; import org.neo4j.causalclustering.core.consensus.roles.AppendEntriesResponseBuilder;
import org.neo4j.causalclustering.core.consensus.vote.PreVoteRequestBuilder;
import org.neo4j.causalclustering.core.consensus.vote.PreVoteResponseBuilder;
import org.neo4j.causalclustering.core.consensus.vote.VoteRequestBuilder; import org.neo4j.causalclustering.core.consensus.vote.VoteRequestBuilder;
import org.neo4j.causalclustering.core.consensus.vote.VoteResponseBuilder; import org.neo4j.causalclustering.core.consensus.vote.VoteResponseBuilder;


Expand All @@ -46,8 +48,18 @@ public static VoteRequestBuilder voteRequest()
return new VoteRequestBuilder(); return new VoteRequestBuilder();
} }


public static PreVoteRequestBuilder preVoteRequest()
{
return new PreVoteRequestBuilder();
}

public static VoteResponseBuilder voteResponse() public static VoteResponseBuilder voteResponse()
{ {
return new VoteResponseBuilder(); return new VoteResponseBuilder();
} }

public static PreVoteResponseBuilder preVoteResponse()
{
return new PreVoteResponseBuilder();
}
} }

0 comments on commit 8e12b78

Please sign in to comment.