diff --git a/raft/core/core_handle.go b/raft/core/core_handle.go index cba4178..4a5d5b4 100644 --- a/raft/core/core_handle.go +++ b/raft/core/core_handle.go @@ -97,6 +97,9 @@ func (c *core) handleReadIndexRequest(msg *raftpd.Message) { } func (c *core) handleReadIndexResponse(msg *raftpd.Message) { + log.Debugf("%d [Term: %d, commit: %d] receive read index response from: %d [idx: %d]", + c.id, c.term, msg.From, msg.Index) + // TODO: check readState := read.ReadState{ Index: msg.Index, @@ -115,7 +118,7 @@ func (c *core) handleAppendEntries(msg *raftpd.Message) { To: msg.From, } if c.log.CommitIndex() > msg.LogIndex { - log.Infof("%d [Term: %d, commit: %d] reject expired append Entries "+ + log.Debugf("%d [Term: %d, commit: %d] reject expired append Entries "+ "from %d [logterm: %d, idx: %d]", c.id, c.term, c.log.CommitIndex(), msg.From, msg.LogTerm, msg.LogIndex) @@ -125,7 +128,7 @@ func (c *core) handleAppendEntries(msg *raftpd.Message) { reply.RejectHint = c.log.CommitIndex() reply.Reject = false } else if idx, ok := c.log.TryAppend(msg.LogIndex, msg.LogTerm, msg.Entries); ok { - log.Infof("%d [Term: %d, commit: %d] accept append Entries "+ + log.Debugf("%d [Term: %d, commit: %d] accept append Entries "+ "from %d [logterm: %d, idx: %d], last: %d", c.id, c.term, c.log.CommitIndex(), msg.From, msg.LogTerm, msg.LogIndex, idx) @@ -136,7 +139,7 @@ func (c *core) handleAppendEntries(msg *raftpd.Message) { reply.RejectHint = idx /* idx is index of latest log entry */ reply.Reject = false } else { - log.Infof("%d [logterm: %d, commit: %d, last idx: %d] rejected msgApp "+ + log.Debugf("%d [logterm: %d, commit: %d, last idx: %d] rejected msgApp "+ "[logterm: %d, idx: %d] and hint %d from %d", c.id, c.log.Term(msg.LogIndex), c.log.CommitIndex(), c.log.LastIndex(), msg.LogTerm, msg.LogIndex, idx, msg.From) @@ -181,7 +184,7 @@ func (c *core) handleSnapshot(msg *raftpd.Message) { Reject: false, } if c.tryRestore(msg.Snapshot) { - log.Infof("%x [commit: %d] restore snapshot [index: %d, term: %d]", + log.Debugf("%x [commit: %d] restore snapshot [index: %d, term: %d]", c.id, c.log.CommitIndex(), msg.Snapshot.Metadata.Index, msg.Snapshot.Metadata.Term) @@ -192,7 +195,7 @@ func (c *core) handleSnapshot(msg *raftpd.Message) { reply.Index = msg.Snapshot.Metadata.Index reply.RejectHint = c.log.LastIndex() } else { - log.Infof("%x [commit: %d] ignored snapshot [index: %d, term: %d]", + log.Debugf("%x [commit: %d] ignored snapshot [index: %d, term: %d]", c.id, c.log.CommitIndex(), msg.Snapshot.Metadata.Index, msg.Snapshot.Metadata.Term) @@ -219,6 +222,8 @@ func (c *core) handleUnreachable(msg *raftpd.Message) { } func (c *core) handleHeartbeat(msg *raftpd.Message) { + log.Debugf("%d [term: %d] handle heartbeat request from %d", c.id, c.term, msg.From) + c.log.CommitTo(msg.Index) reply := raftpd.Message{ @@ -231,14 +236,19 @@ func (c *core) handleHeartbeat(msg *raftpd.Message) { } func (c *core) handleHeartbeatResponse(msg *raftpd.Message) { + log.Debugf("%d [term: %d] handle heartbeat response from %d", c.id, c.term, msg.From) ackCount := c.readOnly.ReceiveAck(msg.From, msg.Context) if ackCount < c.quorum() { return } + log.Debugf("%d [term: %d] handle heartbeat response from %d", c.id, c.term, msg.From) rss := c.readOnly.Advance(msg.Context) for _, rs := range rss { if rs.To == c.id { + log.Debugf("%d [term: %d] save read state: %d, %v", + c.id, c.term, rs.Index, rs.Context) + readState := read.ReadState{ Index: rs.Index, RequestCtx: rs.Context, @@ -246,6 +256,9 @@ func (c *core) handleHeartbeatResponse(msg *raftpd.Message) { c.callback.saveReadState(&readState) } else { + log.Debugf("%d [term: %d] redirect heartbeat response %d to %d %v", + c.id, c.term, rs.Index, rs.To, rs.Context) + redirect := raftpd.Message{ To: rs.To, MsgType: raftpd.MsgReadIndexResponse, @@ -359,6 +372,7 @@ func (c *core) handleVoteResponse(msg *raftpd.Message) { } func (c *core) broadcastHeartbeatWithCtx(context []byte) { + log.Debugf("%d [Term: %d] begin broadcast heartbeat with context: %v", c.id, c.term, context) for i := 0; i < len(c.nodes); i++ { c.sendHeartbeat(c.nodes[i], context) } @@ -445,7 +459,7 @@ func (c *core) sendAppend(node *peer.Node) { } } - log.Infof("%d [Term: %d] send append [idx: %d, Term: %d, len: %d] "+ + log.Debugf("%d [Term: %d] send append [idx: %d, Term: %d, len: %d] "+ "to node: %d [matched: %d next: %d]", c.id, c.term, msg.LogIndex, msg.LogTerm, len(msg.Entries), node.ID, node.Matched, node.NextIdx) @@ -462,18 +476,18 @@ func (c *core) sendSnapshot(node *peer.Node) { // if snapshot is building at now, it will return nil, // so just ignore it and send message to it on next tick. if snapshot == nil { - log.Infof("%x failed to send snapshot to %x because snapshot "+ + log.Debugf("%x failed to send snapshot to %x because snapshot "+ "is temporarily unavailable", c.id, node.ID) return } - log.Infof("%x [firstIdx: %d, commit: %d] send "+ + log.Debugf("%x [firstIdx: %d, commit: %d] send "+ "snapshot[index: %d, term: %d] to %x", c.id, c.log.FirstIndex(), c.log.CommitIndex(), snapshot.Metadata.Index, snapshot.Metadata.Term, node.ID) node.SendSnapshot(snapshot.Metadata.Index) - log.Infof("%x paused sending replication messages to %x", c.id, node.ID) + log.Debugf("%x paused sending replication messages to %x", c.id, node.ID) msg := raftpd.Message{ MsgType: raftpd.MsgSnapshotRequest, diff --git a/raft/core/core_internal.go b/raft/core/core_internal.go index 0e3e188..5d958af 100644 --- a/raft/core/core_internal.go +++ b/raft/core/core_internal.go @@ -61,9 +61,9 @@ func (c *core) becomeFollower(term, leaderID uint64) { c.vote = leaderID if leaderID != conf.InvalidID { - log.Infof("%v become %d's follower at %d", c.id, leaderID, c.term) + log.Debugf("%v become %d's follower at %d", c.id, leaderID, c.term) } else { - log.Infof("%v become follower at %d, without leader", c.id, c.term) + log.Debugf("%v become follower at %d, without leader", c.id, c.term) } } @@ -83,7 +83,7 @@ func (c *core) becomeLeader() { utils.Assert(c.vote == c.id, "leader will vote itself") - log.Infof("%v become leader at %d [firstIdx: %d, lastIdx: %d]", + log.Debugf("%v become leader at %d [firstIdx: %d, lastIdx: %d]", c.id, c.term, c.log.FirstIndex(), c.log.LastIndex()) } @@ -97,7 +97,7 @@ func (c *core) becomeCandidate() { c.resetNodesVoteState() - log.Infof("%v become candidate at %d", c.id, c.term) + log.Debugf("%v become candidate at %d", c.id, c.term) } func (c *core) becomePreCandidate() { @@ -112,7 +112,7 @@ func (c *core) becomePreCandidate() { // Becoming a pre-candidate changes our state, // but doesn't change anything else. In particular it does not increase // currentTerm or change votedFor. - log.Infof("%x became pre-candidate at term %d", c.id, c.term) + log.Debugf("%x became pre-candidate at term %d", c.id, c.term) } func (c *core) preCampaign() { diff --git a/raft/core/raw_node.go b/raft/core/raw_node.go index e524cfa..0b767d7 100644 --- a/raft/core/raw_node.go +++ b/raft/core/raw_node.go @@ -104,7 +104,6 @@ func (node *RawNode) Ready() Ready { // clear all node.commitEntries = make([]raftpd.Entry, 0) node.messages = make([]raftpd.Message, 0) - node.readStates = make([]read.ReadState, 0) return ready } @@ -138,18 +137,22 @@ func (node *RawNode) readSnapshot() *raftpd.Snapshot { func (node *RawNode) drainReadState() []read.ReadState { var readStates []read.ReadState + lastApplied := node.prevHS.Commit if len(node.commitEntries) > 0 { - lastApplied := node.commitEntries[len(node.commitEntries)-1].Index - for i := 0; i < len(node.readStates); i++ { - if node.readStates[i].Index <= lastApplied { - continue - } - //save and drain read states. - readStates = make([]read.ReadState, 0, i) - copy(readStates, node.readStates) - length := copy(node.readStates[:i], node.readStates[i:]) - node.readStates = node.readStates[:length] + waitCommitIdx := node.commitEntries[len(node.commitEntries)-1].Index + if lastApplied < waitCommitIdx { + lastApplied = waitCommitIdx } } + i := 0 + for ; i < len(node.readStates); i++ { + if node.readStates[i].Index > lastApplied { + break + } + } + //save and drain read states. + readStates = make([]read.ReadState, i) + copy(readStates, node.readStates) + node.readStates = node.readStates[i:] return readStates }