Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Raft core fixes
* Pre-vote task should use the correct term
* Schedule append request reset on any append request or install snapshot RPC
  • Loading branch information
metanet committed Feb 12, 2019
1 parent e7a869b commit 92b126f
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 12 deletions.
Expand Up @@ -203,7 +203,7 @@ public void run() {
logger.fine("Starting raft node: " + localMember + " for " + groupId
+ " with " + state.memberCount() + " members: " + state.members());
}
raftIntegration.execute(new PreVoteTask(this));
raftIntegration.execute(new PreVoteTask(this, 0));

scheduleLeaderFailureDetection();
}
Expand Down Expand Up @@ -411,11 +411,7 @@ public void send(AppendFailureResponse response, Endpoint target) {
*/
public void broadcastAppendRequest() {
for (Endpoint follower : state.remoteMembers()) {
try {
sendAppendRequest(follower);
} catch (Throwable e) {
logger.severe(e);
}
sendAppendRequest(follower);
}
updateLastAppendEntriesTimestamp();
}
Expand Down Expand Up @@ -462,6 +458,7 @@ public void sendAppendRequest(Endpoint follower) {
+ " <= snapshot index: " + raftLog.snapshotIndex());
}
followerState.setMaxAppendRequestBackoff();
scheduleAppendAckResetTask();
raftIntegration.send(installSnapshot, follower);
return;
}
Expand Down Expand Up @@ -514,6 +511,7 @@ public void sendAppendRequest(Endpoint follower) {

if (setAppendRequestBackoff) {
followerState.setAppendRequestBackoff();
scheduleAppendAckResetTask();
}

send(request, follower);
Expand Down Expand Up @@ -922,7 +920,7 @@ final void resetLeaderAndStartElection() {

private void runPreVoteTask() {
if (state.preCandidateState() == null) {
new PreVoteTask(RaftNodeImpl.this).run();
new PreVoteTask(RaftNodeImpl.this, state.term()).run();
}
}
}
Expand Down
Expand Up @@ -35,8 +35,11 @@
*/
public class PreVoteTask extends RaftNodeStatusAwareTask implements Runnable {

public PreVoteTask(RaftNodeImpl raftNode) {
private int term;

public PreVoteTask(RaftNodeImpl raftNode, int term) {
super(raftNode);
this.term = term;
}

@Override
Expand All @@ -46,6 +49,9 @@ protected void innerRun() {
if (state.leader() != null) {
logger.fine("No new pre-vote phase, we already have a LEADER: " + state.leader());
return;
} else if (state.term() != term) {
logger.fine("No new pre-vote phase for term= " + term + " because of new term: " + state.term());
return;
}

Collection<Endpoint> remoteMembers = state.remoteMembers();
Expand Down Expand Up @@ -73,6 +79,6 @@ protected void innerRun() {
}

private void schedulePreVoteTimeout() {
raftNode.schedule(new PreVoteTimeoutTask(raftNode), raftNode.getLeaderElectionTimeoutInMillis());
raftNode.schedule(new PreVoteTimeoutTask(raftNode, term), raftNode.getLeaderElectionTimeoutInMillis());
}
}
Expand Up @@ -27,8 +27,11 @@
*/
public class PreVoteTimeoutTask extends RaftNodeStatusAwareTask implements Runnable {

PreVoteTimeoutTask(RaftNodeImpl raftNode) {
private int term;

PreVoteTimeoutTask(RaftNodeImpl raftNode, int term) {
super(raftNode);
this.term = term;
}

@Override
Expand All @@ -37,6 +40,6 @@ protected void innerRun() {
return;
}
logger.fine("Pre-vote for term: " + raftNode.state().term() + " has timed out!");
new PreVoteTask(raftNode).run();
new PreVoteTask(raftNode, term).run();
}
}
Expand Up @@ -85,7 +85,6 @@ public void run() {
handleRaftGroupCmd(newEntryLogIndex, operation);

raftNode.broadcastAppendRequest();
raftNode.scheduleAppendAckResetTask();
}

private boolean verifyRaftNodeStatus() {
Expand Down

0 comments on commit 92b126f

Please sign in to comment.