Skip to content

Commit

Permalink
Merge pull request #51 from youzan/fix-raft-apply-index
Browse files Browse the repository at this point in the history
Fix raft apply index not continued
  • Loading branch information
absolute8511 committed Jan 3, 2020
2 parents 96c2775 + 9f658ae commit fcecca4
Show file tree
Hide file tree
Showing 9 changed files with 256 additions and 31 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ PREFIX=/usr/local
DESTDIR=
BINDIR=${PREFIX}/bin
PROJECT?=github.com/youzan/ZanRedisDB
VERBINARY?= 0.7.1
VERBINARY?= 0.7.2
COMMIT?=$(shell git rev-parse --short HEAD)
BUILD_TIME?=$(shell date '+%Y-%m-%d_%H:%M:%S-%Z')
GOFLAGS=-ldflags "-X ${PROJECT}/common.VerBinary=${VERBINARY} -X ${PROJECT}/common.Commit=${COMMIT} -X ${PROJECT}/common.BuildTime=${BUILD_TIME}"
Expand Down
3 changes: 0 additions & 3 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,6 @@ type customProposeData struct {

// a key-value node backed by raft
type KVNode struct {
reqProposeC *entryQueue
readyC chan struct{}
rn *raftNode
store *KVStore
Expand Down Expand Up @@ -234,7 +233,6 @@ func NewKVNode(kvopts *KVOptions, config *RaftConfig,
return nil, err
}
s := &KVNode{
reqProposeC: newEntryQueue(proposeQueueLen, 1),
readyC: make(chan struct{}, 1),
stopChan: stopChan,
stopDone: make(chan struct{}),
Expand Down Expand Up @@ -326,7 +324,6 @@ func (nd *KVNode) Stop() {
}
defer close(nd.stopDone)
close(nd.stopChan)
nd.reqProposeC.close()
nd.expireHandler.Stop()
nd.wg.Wait()
nd.rn.StopNode()
Expand Down
19 changes: 19 additions & 0 deletions node/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ type raftNode struct {
replayRunning int32
busySnapshot int32
loopServering int32
lastPublished uint64
}

// newRaftNode initiates a raft instance and returns a committed log entry
Expand Down Expand Up @@ -870,6 +871,9 @@ func (rc *raftNode) serveChannels() {
continue
}
rc.processReady(rd)
if rd.MoreCommittedEntries {
rc.node.NotifyEventCh()
}
}
}
}
Expand Down Expand Up @@ -925,6 +929,21 @@ func (rc *raftNode) processReady(rd raft.Ready) {
}
processedMsgs, hasRequestSnapMsg := rc.processMessages(rd.Messages)
if len(rd.CommittedEntries) > 0 || !raft.IsEmptySnap(rd.Snapshot) || hasRequestSnapMsg {
var newPublished uint64
if !raft.IsEmptySnap(rd.Snapshot) {
newPublished = rd.Snapshot.Metadata.Index
}
if len(rd.CommittedEntries) > 0 {
firsti := rd.CommittedEntries[0].Index
if rc.lastPublished != 0 && firsti > rc.lastPublished+1 {
e := fmt.Sprintf("%v first index of committed entry[%d] should <= last published[%d] + 1, snap: %v",
rc.Descrp(), firsti, rc.lastPublished, rd.Snapshot.Metadata.String())
rc.Errorf("%s", e)
rc.Errorf("raft node status: %v", rc.node.DebugString())
}
newPublished = rd.CommittedEntries[len(rd.CommittedEntries)-1].Index
}
rc.lastPublished = newPublished
rc.publishEntries(rd.CommittedEntries, rd.Snapshot, applySnapshotTransferResult, raftDone, applyWaitDone)
}
if !raft.IsEmptySnap(rd.Snapshot) {
Expand Down
1 change: 1 addition & 0 deletions node/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ func (n *nodeRecorder) ApplyConfChange(conf raftpb.ConfChange) *raftpb.ConfState
func (n *nodeRecorder) Stop() {
n.Record(testutil.Action{Name: "Stop"})
}
func (n *nodeRecorder) DebugString() string { return "" }

func (n *nodeRecorder) ReportUnreachable(id uint64, g raftpb.Group) {}

Expand Down
4 changes: 4 additions & 0 deletions raft/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,10 @@ func (l *raftLog) hasNextEnts() bool {
return l.committed+1 > off
}

func (l *raftLog) hasMoreNextEnts(appliedTo uint64) bool {
return l.committed > appliedTo
}

func (l *raftLog) snapshot() (pb.Snapshot, error) {
if l.unstable.snapshot != nil {
return *l.unstable.snapshot, nil
Expand Down
83 changes: 67 additions & 16 deletions raft/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package raft

import (
"errors"
"fmt"
"time"

pb "github.com/youzan/ZanRedisDB/raft/raftpb"
Expand Down Expand Up @@ -83,7 +84,8 @@ type Ready struct {
// store/state-machine. These have previously been committed to stable
// store.
CommittedEntries []pb.Entry

// Whether there are more committed entries ready to be applied.
MoreCommittedEntries bool
// Messages specifies outbound messages to be sent AFTER Entries are
// committed to stable storage.
// If it contains a MsgSnap message, the application MUST report back to raft
Expand Down Expand Up @@ -115,6 +117,19 @@ func (rd Ready) containsUpdates() bool {
len(rd.CommittedEntries) > 0 || len(rd.Messages) > 0 || len(rd.ReadStates) != 0
}

// appliedCursor extracts from the Ready the highest index the client has
// applied (once the Ready is confirmed via Advance). If no information is
// contained in the Ready, returns zero.
func (rd Ready) appliedCursor() uint64 {
if n := len(rd.CommittedEntries); n > 0 {
return rd.CommittedEntries[n-1].Index
}
if index := rd.Snapshot.Metadata.Index; index > 0 {
return index
}
return 0
}

type msgWithDrop struct {
m pb.Message
dropCB context.CancelFunc
Expand Down Expand Up @@ -185,6 +200,7 @@ type Node interface {
ReportSnapshot(id uint64, group pb.Group, status SnapshotStatus)
// Stop performs any necessary termination of the Node.
Stop()
DebugString() string
}

type Peer struct {
Expand Down Expand Up @@ -255,6 +271,8 @@ func StartNode(c *Config, peers []Peer, isLearner bool) Node {
n.logger = c.Logger
n.r = r
n.prevS = newPrevState(r)
off := max(r.raftLog.applied+1, r.raftLog.firstIndex())
n.lastSteppedIndex = off
n.NotifyEventCh()
return &n
}
Expand All @@ -270,25 +288,28 @@ func RestartNode(c *Config) Node {
n.logger = c.Logger
n.r = r
n.prevS = newPrevState(r)
off := max(r.raftLog.applied+1, r.raftLog.firstIndex())
n.lastSteppedIndex = off
n.NotifyEventCh()
return &n
}

// node is the canonical implementation of the Node interface
type node struct {
propQ *ProposalQueue
msgQ *MessageQueue
confc chan pb.ConfChange
confstatec chan pb.ConfState
tickc chan struct{}
done chan struct{}
stop chan struct{}
status chan chan Status
eventNotifyCh chan bool
r *raft
prevS *prevState
newReadyFunc func(*raft, *SoftState, pb.HardState, bool) Ready
needAdvance bool
propQ *ProposalQueue
msgQ *MessageQueue
confc chan pb.ConfChange
confstatec chan pb.ConfState
tickc chan struct{}
done chan struct{}
stop chan struct{}
status chan chan Status
eventNotifyCh chan bool
r *raft
prevS *prevState
newReadyFunc func(*raft, *SoftState, pb.HardState, bool) Ready
needAdvance bool
lastSteppedIndex uint64

logger Logger
}
Expand Down Expand Up @@ -361,11 +382,34 @@ func (n *node) StepNode(moreEntriesToApply bool, busySnap bool) (Ready, bool) {
rd := n.newReadyFunc(n.r, n.prevS.prevSoftSt, n.prevS.prevHardSt, moreEntriesToApply)
if rd.containsUpdates() {
n.needAdvance = true
var stepIndex uint64
if !IsEmptySnap(rd.Snapshot) {
stepIndex = rd.Snapshot.Metadata.Index
}
if len(rd.CommittedEntries) > 0 {
fi := rd.CommittedEntries[0].Index
if n.lastSteppedIndex != 0 && fi > n.lastSteppedIndex+1 {
e := fmt.Sprintf("raft.node: %x(%v) index not continued: %v, %v, %v, snap:%v, prev: %v, logs: %v ",
n.r.id, n.r.group, fi, n.lastSteppedIndex, stepIndex, rd.Snapshot.Metadata.String(), n.prevS,
n.r.raftLog.String())
n.logger.Error(e)
}
stepIndex = rd.CommittedEntries[len(rd.CommittedEntries)-1].Index
}
n.lastSteppedIndex = stepIndex
return rd, true
}
return Ready{}, false
}

func (n *node) DebugString() string {
ents := n.r.raftLog.allEntries()
e := fmt.Sprintf("raft.node: %x(%v) index not continued: %v, prev: %v, logs: %v, %v ",
n.r.id, n.r.group, n.lastSteppedIndex, n.prevS, len(ents),
n.r.raftLog.String())
return e
}

func (n *node) handleLeaderUpdate(r *raft) bool {
lead := n.prevS.prevLead
needHandleProposal := lead != None
Expand Down Expand Up @@ -448,8 +492,11 @@ func (n *node) Advance(rd Ready) {
n.r.msgs = nil
n.r.readStates = nil

if n.prevS.prevHardSt.Commit != 0 {
n.r.raftLog.appliedTo(n.prevS.prevHardSt.Commit)
appliedI := rd.appliedCursor()
if appliedI != 0 {
// since the committed entries may less than the hard commit index due to the
// limit for buffer len, we should not use the hard state commit index.
n.r.raftLog.appliedTo(appliedI)
}
if n.prevS.havePrevLastUnstablei {
n.r.raftLog.stableTo(n.prevS.prevLastUnstablei, n.prevS.prevLastUnstablet)
Expand Down Expand Up @@ -722,6 +769,10 @@ func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState, moreEntri
if moreEntriesToApply {
rd.CommittedEntries = r.raftLog.nextEnts()
}
if len(rd.CommittedEntries) > 0 {
lastIndex := rd.CommittedEntries[len(rd.CommittedEntries)-1].Index
rd.MoreCommittedEntries = r.raftLog.hasMoreNextEnts(lastIndex)
}
if softSt := r.softState(); !softSt.equal(prevSoftSt) {
rd.SoftState = softSt
}
Expand Down
79 changes: 79 additions & 0 deletions raft/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package raft

import (
"bytes"
"math"
"reflect"
"testing"
"time"
Expand All @@ -25,6 +26,14 @@ import (
"golang.org/x/net/context"
)

type ignoreSizeHintMemStorage struct {
*MemoryStorage
}

func (s *ignoreSizeHintMemStorage) Entries(lo, hi uint64, maxSize uint64) ([]raftpb.Entry, error) {
return s.MemoryStorage.Entries(lo, hi, math.MaxUint64)
}

// TestNodeStep ensures that node.Step sends msgProp to propc chan
// and other kinds of messages to recvc chan.
func TestNodeStep(t *testing.T) {
Expand Down Expand Up @@ -935,3 +944,73 @@ func TestNodeProposeAddLearnerNode(t *testing.T) {
close(stop)
<-done
}

// TestNodeCommitPaginationAfterRestart regression tests a scenario in which the
// Storage's Entries size limitation is slightly more permissive than Raft's
// internal one. The original bug was the following:
//
// - node learns that index 11 (or 100, doesn't matter) is committed
// - nextEnts returns index 1..10 in CommittedEntries due to size limiting. However,
// index 10 already exceeds maxBytes, due to a user-provided impl of Entries.
// - Commit index gets bumped to 10
// - the node persists the HardState, but crashes before applying the entries
// - upon restart, the storage returns the same entries, but `slice` takes a different code path
// (since it is now called with an upper bound of 10) and removes the last entry.
// - Raft emits a HardState with a regressing commit index.
//
// A simpler version of this test would have the storage return a lot less entries than dictated
// by maxSize (for example, exactly one entry) after the restart, resulting in a larger regression.
// This wouldn't need to exploit anything about Raft-internal code paths to fail.
func TestNodeCommitPaginationAfterRestart(t *testing.T) {
s := &ignoreSizeHintMemStorage{
MemoryStorage: NewRealMemoryStorage(),
}
persistedHardState := raftpb.HardState{
Term: 1,
Vote: 1,
Commit: 10,
}

s.hardState = persistedHardState
s.ents = make([]raftpb.Entry, 10)
var size uint64
for i := range s.ents {
ent := raftpb.Entry{
Term: 1,
Index: uint64(i + 1),
Type: raftpb.EntryNormal,
Data: []byte("a"),
}

s.ents[i] = ent
size += uint64(ent.Size())
}

cfg := newTestConfig(1, []uint64{1}, 10, 1, s)
// Set a MaxSizePerMsg that would suggest to Raft that the last committed entry should
// not be included in the initial rd.CommittedEntries. However, our storage will ignore
// this and *will* return it (which is how the Commit index ended up being 10 initially).
cfg.MaxSizePerMsg = size - uint64(s.ents[len(s.ents)-1].Size()) - 1

r := newRaft(cfg)
n := newNode()
defer s.Close()
n.r = r
n.prevS = newPrevState(r)
n.NotifyEventCh()
n.Campaign(context.TODO())

defer n.Stop()

rd, _ := n.StepNode(true, false)
if !IsEmptyHardState(rd.HardState) && rd.HardState.Commit < persistedHardState.Commit {
t.Errorf("HardState regressed: Commit %d -> %d\nCommitting:\n",
persistedHardState.Commit, rd.HardState.Commit,
)
}
}

// TestNodeCommitEntriesTooMuch check the commit index will be continued even
// if the apply commit channel is full
func TestNodeCommitEntriesTooMuch(t *testing.T) {
}
17 changes: 6 additions & 11 deletions raft/rawnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,12 @@ func (rn *RawNode) commitReady(rd Ready) {
if !IsEmptyHardState(rd.HardState) {
rn.prevHardSt = rd.HardState
}
if rn.prevHardSt.Commit != 0 {
// In most cases, prevHardSt and rd.HardState will be the same
// because when there are new entries to apply we just sent a
// HardState with an updated Commit value. However, on initial
// startup the two are different because we don't send a HardState
// until something changes, but we do send any un-applied but
// committed entries (and previously-committed entries may be
// incorporated into the snapshot, even if rd.CommittedEntries is
// empty). Therefore we mark all committed entries as applied
// whether they were included in rd.HardState or not.
rn.raft.raftLog.appliedTo(rn.prevHardSt.Commit)
// If entries were applied (or a snapshot), update our cursor for
// the next Ready. Note that if the current HardState contains a
// new Commit index, this does not mean that we're also applying
// all of the new entries due to commit pagination by size.
if index := rd.appliedCursor(); index > 0 {
rn.raft.raftLog.appliedTo(index)
}
if len(rd.Entries) > 0 {
e := rd.Entries[len(rd.Entries)-1]
Expand Down

0 comments on commit fcecca4

Please sign in to comment.