Skip to content

Commit

Permalink
[raw node]: fix read index clear bug
Browse files Browse the repository at this point in the history
  • Loading branch information
w41ter committed Apr 17, 2018
1 parent d19d3e6 commit 438c277
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 25 deletions.
32 changes: 23 additions & 9 deletions raft/core/core_handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ func (c *core) handleReadIndexRequest(msg *raftpd.Message) {
}

func (c *core) handleReadIndexResponse(msg *raftpd.Message) {
log.Debugf("%d [Term: %d, commit: %d] receive read index response from: %d [idx: %d]",
c.id, c.term, msg.From, msg.Index)

// TODO: check
readState := read.ReadState{
Index: msg.Index,
Expand All @@ -115,7 +118,7 @@ func (c *core) handleAppendEntries(msg *raftpd.Message) {
To: msg.From,
}
if c.log.CommitIndex() > msg.LogIndex {
log.Infof("%d [Term: %d, commit: %d] reject expired append Entries "+
log.Debugf("%d [Term: %d, commit: %d] reject expired append Entries "+
"from %d [logterm: %d, idx: %d]", c.id, c.term, c.log.CommitIndex(),
msg.From, msg.LogTerm, msg.LogIndex)

Expand All @@ -125,7 +128,7 @@ func (c *core) handleAppendEntries(msg *raftpd.Message) {
reply.RejectHint = c.log.CommitIndex()
reply.Reject = false
} else if idx, ok := c.log.TryAppend(msg.LogIndex, msg.LogTerm, msg.Entries); ok {
log.Infof("%d [Term: %d, commit: %d] accept append Entries "+
log.Debugf("%d [Term: %d, commit: %d] accept append Entries "+
"from %d [logterm: %d, idx: %d], last: %d", c.id, c.term, c.log.CommitIndex(),
msg.From, msg.LogTerm, msg.LogIndex, idx)

Expand All @@ -136,7 +139,7 @@ func (c *core) handleAppendEntries(msg *raftpd.Message) {
reply.RejectHint = idx /* idx is index of latest log entry */
reply.Reject = false
} else {
log.Infof("%d [logterm: %d, commit: %d, last idx: %d] rejected msgApp "+
log.Debugf("%d [logterm: %d, commit: %d, last idx: %d] rejected msgApp "+
"[logterm: %d, idx: %d] and hint %d from %d", c.id, c.log.Term(msg.LogIndex),
c.log.CommitIndex(), c.log.LastIndex(), msg.LogTerm, msg.LogIndex, idx, msg.From)

Expand Down Expand Up @@ -181,7 +184,7 @@ func (c *core) handleSnapshot(msg *raftpd.Message) {
Reject: false,
}
if c.tryRestore(msg.Snapshot) {
log.Infof("%x [commit: %d] restore snapshot [index: %d, term: %d]",
log.Debugf("%x [commit: %d] restore snapshot [index: %d, term: %d]",
c.id, c.log.CommitIndex(),
msg.Snapshot.Metadata.Index, msg.Snapshot.Metadata.Term)

Expand All @@ -192,7 +195,7 @@ func (c *core) handleSnapshot(msg *raftpd.Message) {
reply.Index = msg.Snapshot.Metadata.Index
reply.RejectHint = c.log.LastIndex()
} else {
log.Infof("%x [commit: %d] ignored snapshot [index: %d, term: %d]",
log.Debugf("%x [commit: %d] ignored snapshot [index: %d, term: %d]",
c.id, c.log.CommitIndex(),
msg.Snapshot.Metadata.Index, msg.Snapshot.Metadata.Term)

Expand All @@ -219,6 +222,8 @@ func (c *core) handleUnreachable(msg *raftpd.Message) {
}

func (c *core) handleHeartbeat(msg *raftpd.Message) {
log.Debugf("%d [term: %d] handle heartbeat request from %d", c.id, c.term, msg.From)

c.log.CommitTo(msg.Index)

reply := raftpd.Message{
Expand All @@ -231,21 +236,29 @@ func (c *core) handleHeartbeat(msg *raftpd.Message) {
}

func (c *core) handleHeartbeatResponse(msg *raftpd.Message) {
log.Debugf("%d [term: %d] handle heartbeat response from %d", c.id, c.term, msg.From)
ackCount := c.readOnly.ReceiveAck(msg.From, msg.Context)
if ackCount < c.quorum() {
return
}
log.Debugf("%d [term: %d] handle heartbeat response from %d", c.id, c.term, msg.From)

rss := c.readOnly.Advance(msg.Context)
for _, rs := range rss {
if rs.To == c.id {
log.Debugf("%d [term: %d] save read state: %d, %v",
c.id, c.term, rs.Index, rs.Context)

readState := read.ReadState{
Index: rs.Index,
RequestCtx: rs.Context,
}

c.callback.saveReadState(&readState)
} else {
log.Debugf("%d [term: %d] redirect heartbeat response %d to %d %v",
c.id, c.term, rs.Index, rs.To, rs.Context)

redirect := raftpd.Message{
To: rs.To,
MsgType: raftpd.MsgReadIndexResponse,
Expand Down Expand Up @@ -359,6 +372,7 @@ func (c *core) handleVoteResponse(msg *raftpd.Message) {
}

func (c *core) broadcastHeartbeatWithCtx(context []byte) {
log.Debugf("%d [Term: %d] begin broadcast heartbeat with context: %v", c.id, c.term, context)
for i := 0; i < len(c.nodes); i++ {
c.sendHeartbeat(c.nodes[i], context)
}
Expand Down Expand Up @@ -445,7 +459,7 @@ func (c *core) sendAppend(node *peer.Node) {
}
}

log.Infof("%d [Term: %d] send append [idx: %d, Term: %d, len: %d] "+
log.Debugf("%d [Term: %d] send append [idx: %d, Term: %d, len: %d] "+
"to node: %d [matched: %d next: %d]",
c.id, c.term, msg.LogIndex, msg.LogTerm, len(msg.Entries),
node.ID, node.Matched, node.NextIdx)
Expand All @@ -462,18 +476,18 @@ func (c *core) sendSnapshot(node *peer.Node) {
// if snapshot is building at now, it will return nil,
// so just ignore it and send message to it on next tick.
if snapshot == nil {
log.Infof("%x failed to send snapshot to %x because snapshot "+
log.Debugf("%x failed to send snapshot to %x because snapshot "+
"is temporarily unavailable", c.id, node.ID)
return
}

log.Infof("%x [firstIdx: %d, commit: %d] send "+
log.Debugf("%x [firstIdx: %d, commit: %d] send "+
"snapshot[index: %d, term: %d] to %x", c.id, c.log.FirstIndex(), c.log.CommitIndex(),
snapshot.Metadata.Index, snapshot.Metadata.Term, node.ID)

node.SendSnapshot(snapshot.Metadata.Index)

log.Infof("%x paused sending replication messages to %x", c.id, node.ID)
log.Debugf("%x paused sending replication messages to %x", c.id, node.ID)

msg := raftpd.Message{
MsgType: raftpd.MsgSnapshotRequest,
Expand Down
10 changes: 5 additions & 5 deletions raft/core/core_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ func (c *core) becomeFollower(term, leaderID uint64) {
c.vote = leaderID

if leaderID != conf.InvalidID {
log.Infof("%v become %d's follower at %d", c.id, leaderID, c.term)
log.Debugf("%v become %d's follower at %d", c.id, leaderID, c.term)
} else {
log.Infof("%v become follower at %d, without leader", c.id, c.term)
log.Debugf("%v become follower at %d, without leader", c.id, c.term)
}
}

Expand All @@ -83,7 +83,7 @@ func (c *core) becomeLeader() {

utils.Assert(c.vote == c.id, "leader will vote itself")

log.Infof("%v become leader at %d [firstIdx: %d, lastIdx: %d]",
log.Debugf("%v become leader at %d [firstIdx: %d, lastIdx: %d]",
c.id, c.term, c.log.FirstIndex(), c.log.LastIndex())
}

Expand All @@ -97,7 +97,7 @@ func (c *core) becomeCandidate() {

c.resetNodesVoteState()

log.Infof("%v become candidate at %d", c.id, c.term)
log.Debugf("%v become candidate at %d", c.id, c.term)
}

func (c *core) becomePreCandidate() {
Expand All @@ -112,7 +112,7 @@ func (c *core) becomePreCandidate() {
// Becoming a pre-candidate changes our state,
// but doesn't change anything else. In particular it does not increase
// currentTerm or change votedFor.
log.Infof("%x became pre-candidate at term %d", c.id, c.term)
log.Debugf("%x became pre-candidate at term %d", c.id, c.term)
}

func (c *core) preCampaign() {
Expand Down
25 changes: 14 additions & 11 deletions raft/core/raw_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ func (node *RawNode) Ready() Ready {
// clear all
node.commitEntries = make([]raftpd.Entry, 0)
node.messages = make([]raftpd.Message, 0)
node.readStates = make([]read.ReadState, 0)

return ready
}
Expand Down Expand Up @@ -138,18 +137,22 @@ func (node *RawNode) readSnapshot() *raftpd.Snapshot {

func (node *RawNode) drainReadState() []read.ReadState {
var readStates []read.ReadState
lastApplied := node.prevHS.Commit
if len(node.commitEntries) > 0 {
lastApplied := node.commitEntries[len(node.commitEntries)-1].Index
for i := 0; i < len(node.readStates); i++ {
if node.readStates[i].Index <= lastApplied {
continue
}
//save and drain read states.
readStates = make([]read.ReadState, 0, i)
copy(readStates, node.readStates)
length := copy(node.readStates[:i], node.readStates[i:])
node.readStates = node.readStates[:length]
waitCommitIdx := node.commitEntries[len(node.commitEntries)-1].Index
if lastApplied < waitCommitIdx {
lastApplied = waitCommitIdx
}
}
i := 0
for ; i < len(node.readStates); i++ {
if node.readStates[i].Index > lastApplied {
break
}
}
//save and drain read states.
readStates = make([]read.ReadState, i)
copy(readStates, node.readStates)
node.readStates = node.readStates[i:]
return readStates
}

0 comments on commit 438c277

Please sign in to comment.