diff --git a/server/raft.go b/server/raft.go index b98db1976b..aa3151ee2d 100644 --- a/server/raft.go +++ b/server/raft.go @@ -1481,6 +1481,7 @@ func (n *raft) shutdown(shouldDelete bool) { n.Unlock() return } + close(n.quit) if c := n.c; c != nil { var subs []*subscription @@ -2346,7 +2347,7 @@ func (n *raft) catchupFollower(ar *appendEntryResponse) { n.Unlock() return } - n.debug("Snapshot sent, reset first entry to %d", lastIndex) + n.debug("Snapshot sent, reset first catchup entry to %d", lastIndex) } } @@ -2646,14 +2647,13 @@ func (n *raft) runAsCandidate() { // if we observe a bigger term, we should start over again or risk forming a quorum fully knowing // someone with a better term exists. This is even the right thing to do if won == true. n.Lock() + n.debug("Stepping down from candidate, detected higher term: %d vs %d", vresp.term, n.term) n.term = vresp.term n.vote = noVote n.writeTermVote() - n.debug("Stepping down from candidate, detected higher term: %d vs %d", vresp.term, n.term) n.stepdown.push(noLeader) n.lxfer = false n.Unlock() - return } case <-n.reqs.ch: // Because of drain() it is possible that we get nil from popOne(). @@ -2746,6 +2746,13 @@ func (n *raft) truncateWAL(term, index uint64) { os.Remove(n.snapfile) n.snapfile = _EMPTY_ } + // Make sure to reset commit and applied if above + if n.commit > n.pindex { + n.commit = n.pindex + } + if n.applied > n.commit { + n.applied = n.commit + } }() if err := n.wal.Truncate(index); err != nil { @@ -2763,10 +2770,10 @@ func (n *raft) truncateWAL(term, index uint64) { // Set after we know we have truncated properly. n.pterm, n.pindex = term, index - } // Reset our WAL. +// Lock should be held. func (n *raft) resetWAL() { n.truncateWAL(0, 0) } @@ -2782,7 +2789,6 @@ func (n *raft) updateLeader(newLeader string) { // processAppendEntry will process an appendEntry. func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { n.Lock() - // Don't reset here if we have been asked to assume leader position. if !n.lxfer { n.resetElectionTimeout() @@ -2912,22 +2918,17 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { var ar *appendEntryResponse var success bool - eae, err := n.loadEntry(ae.pindex) - // If terms mismatched, or we got an error loading, delete that entry and all others past it. - if eae != nil && ae.pterm > eae.pterm || err != nil { - // Truncate will reset our pterm and pindex. Only do so if we have an entry. - if eae != nil { - n.truncateWAL(ae.pterm, ae.pindex) - } - // Make sure to cancel any catchups in progress. - if catchingUp { - n.cancelCatchup() - } + if eae, _ := n.loadEntry(ae.pindex); eae == nil { + n.resetWAL() } else { - // Inherit regardless. - n.pterm = ae.pterm - success = true + // If terms mismatched, or we got an error loading, delete that entry and all others past it. + // Make sure to cancel any catchups in progress. + // Truncate will reset our pterm and pindex. Only do so if we have an entry. + n.truncateWAL(ae.pterm, ae.pindex) } + // Cancel regardless. + n.cancelCatchup() + // Create response. ar = &appendEntryResponse{ae.pterm, ae.pindex, n.id, success, _EMPTY_} n.Unlock() @@ -3560,10 +3561,15 @@ func (n *raft) processVoteRequest(vr *voteRequest) error { // Only way we get to yes is through here. voteOk := n.vote == noVote || n.vote == vr.candidate - if voteOk && vr.lastTerm >= n.pterm && vr.lastIndex >= n.pindex { + if voteOk && (vr.lastTerm > n.pterm || vr.lastTerm == n.pterm && vr.lastIndex >= n.pindex) { vresp.granted = true n.vote = vr.candidate n.writeTermVote() + } else { + if vr.term >= n.term && n.vote == noVote { + n.term = vr.term + n.resetElect(randCampaignTimeout()) + } } n.Unlock()