Skip to content

Commit

Permalink
General raft improvements under heavy corruption.
Browse files Browse the repository at this point in the history
Do not exit candidate state in place when stepping down, as that would cause double vote requests.
When truncating our WAL make sure to adjust commit and applied as needed.
On a miss where the index is less than ours, if we cannot find the entry, reset our state.
For a vote, if the last processed term is higher than ours, always agree if no vote has been cast.
If terms are equal make sure the requestor's index is at least as high as ours.
If we decide not to vote for someone, and we have not voted and we are a better fit, move forward with a campaign.

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Mar 2, 2023
1 parent 4e3b983 commit c586014
Showing 1 changed file with 26 additions and 20 deletions.
46 changes: 26 additions & 20 deletions server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1481,6 +1481,7 @@ func (n *raft) shutdown(shouldDelete bool) {
n.Unlock()
return
}

close(n.quit)
if c := n.c; c != nil {
var subs []*subscription
Expand Down Expand Up @@ -2346,7 +2347,7 @@ func (n *raft) catchupFollower(ar *appendEntryResponse) {
n.Unlock()
return
}
n.debug("Snapshot sent, reset first entry to %d", lastIndex)
n.debug("Snapshot sent, reset first catchup entry to %d", lastIndex)
}
}

Expand Down Expand Up @@ -2646,14 +2647,13 @@ func (n *raft) runAsCandidate() {
// if we observe a bigger term, we should start over again or risk forming a quorum fully knowing
// someone with a better term exists. This is even the right thing to do if won == true.
n.Lock()
n.debug("Stepping down from candidate, detected higher term: %d vs %d", vresp.term, n.term)
n.term = vresp.term
n.vote = noVote
n.writeTermVote()
n.debug("Stepping down from candidate, detected higher term: %d vs %d", vresp.term, n.term)
n.stepdown.push(noLeader)
n.lxfer = false
n.Unlock()
return
}
case <-n.reqs.ch:
// Because of drain() it is possible that we get nil from popOne().
Expand Down Expand Up @@ -2746,6 +2746,13 @@ func (n *raft) truncateWAL(term, index uint64) {
os.Remove(n.snapfile)
n.snapfile = _EMPTY_
}
// Make sure to reset commit and applied if above
if n.commit > n.pindex {
n.commit = n.pindex
}
if n.applied > n.commit {
n.applied = n.commit
}
}()

if err := n.wal.Truncate(index); err != nil {
Expand All @@ -2763,10 +2770,10 @@ func (n *raft) truncateWAL(term, index uint64) {

// Set after we know we have truncated properly.
n.pterm, n.pindex = term, index

}

// resetWAL resets our WAL by truncating it back to term 0 and index 0
// through truncateWAL.
// Lock should be held.
func (n *raft) resetWAL() {
n.truncateWAL(0, 0)
}
Expand All @@ -2782,7 +2789,6 @@ func (n *raft) updateLeader(newLeader string) {
// processAppendEntry will process an appendEntry.
func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
n.Lock()

// Don't reset here if we have been asked to assume leader position.
if !n.lxfer {
n.resetElectionTimeout()
Expand Down Expand Up @@ -2912,22 +2918,17 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
var ar *appendEntryResponse

var success bool
eae, err := n.loadEntry(ae.pindex)
// If terms mismatched, or we got an error loading, delete that entry and all others past it.
if eae != nil && ae.pterm > eae.pterm || err != nil {
// Truncate will reset our pterm and pindex. Only do so if we have an entry.
if eae != nil {
n.truncateWAL(ae.pterm, ae.pindex)
}
// Make sure to cancel any catchups in progress.
if catchingUp {
n.cancelCatchup()
}
if eae, _ := n.loadEntry(ae.pindex); eae == nil {
n.resetWAL()
} else {
// Inherit regardless.
n.pterm = ae.pterm
success = true
// If terms mismatched, or we got an error loading, delete that entry and all others past it.
// Make sure to cancel any catchups in progress.
// Truncate will reset our pterm and pindex. Only do so if we have an entry.
n.truncateWAL(ae.pterm, ae.pindex)
}
// Cancel regardless.
n.cancelCatchup()

// Create response.
ar = &appendEntryResponse{ae.pterm, ae.pindex, n.id, success, _EMPTY_}
n.Unlock()
Expand Down Expand Up @@ -3560,10 +3561,15 @@ func (n *raft) processVoteRequest(vr *voteRequest) error {

// Only way we get to yes is through here.
voteOk := n.vote == noVote || n.vote == vr.candidate
if voteOk && vr.lastTerm >= n.pterm && vr.lastIndex >= n.pindex {
if voteOk && (vr.lastTerm > n.pterm || vr.lastTerm == n.pterm && vr.lastIndex >= n.pindex) {
vresp.granted = true
n.vote = vr.candidate
n.writeTermVote()
} else {
if vr.term >= n.term && n.vote == noVote {
n.term = vr.term
n.resetElect(randCampaignTimeout())
}
}
n.Unlock()

Expand Down

0 comments on commit c586014

Please sign in to comment.