Skip to content

Commit

Permalink
Rename of Raft.send to Raft.sendInNextReady.
Browse files Browse the repository at this point in the history
The change makes it explicit that sending messages does not happen
immediately and is subject to the proper persist-and-then-send protocol
on the application side. See:

etcd-io#12589 (comment)

for more context.
  • Loading branch information
ptabor committed Jan 12, 2021
1 parent 295ac2d commit 1f6ba62
Show file tree
Hide file tree
Showing 8 changed files with 56 additions and 55 deletions.
12 changes: 6 additions & 6 deletions raft/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,19 +210,19 @@ stale log entries:
passes 'MsgHup' to its Step method and becomes (or remains) a candidate to
start a new election.
'MsgBeat' is an internal type that signals the leader to send a heartbeat of
'MsgBeat' is an internal type that signals the leader to sendInNextReady a heartbeat of
the 'MsgHeartbeat' type. If a node is a leader, the 'tick' function in
the 'raft' struct is set as 'tickHeartbeat', and triggers the leader to
send periodic 'MsgHeartbeat' messages to its followers.
sendInNextReady periodic 'MsgHeartbeat' messages to its followers.
'MsgProp' proposes to append data to its log entries. This is a special
type to redirect proposals to leader. Therefore, send method overwrites
type to redirect proposals to leader. Therefore, sendInNextReady method overwrites
raftpb.Message's term with its HardState's term to avoid attaching its
local term to 'MsgProp'. When 'MsgProp' is passed to the leader's 'Step'
method, the leader first calls the 'appendEntry' method to append entries
to its log, and then calls 'bcastAppend' method to send those entries to
to its log, and then calls 'bcastAppend' method to sendInNextReady those entries to
its peers. When passed to candidate, 'MsgProp' is dropped. When passed to
follower, 'MsgProp' is stored in follower's mailbox(msgs) by the send
follower, 'MsgProp' is stored in follower's mailbox(msgs) by the sendInNextReady
method. It is stored with sender's ID and later forwarded to leader by
rafthttp package.
Expand Down Expand Up @@ -272,7 +272,7 @@ stale log entries:
'MsgSnapStatus' tells the result of snapshot install message. When a
follower rejected 'MsgSnap', it indicates the snapshot request with
'MsgSnap' had failed from network issues which causes the network layer
to fail to send out snapshots to its followers. Then leader considers
to fail to sendInNextReady out snapshots to its followers. Then leader considers
follower's progress as probe. When 'MsgSnap' were not rejected, it
indicates that the snapshot succeeded and the leader sets follower's
progress to probe and resumes its log replication.
Expand Down
4 changes: 2 additions & 2 deletions raft/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ type Node interface {

// Status returns the current status of the raft state machine.
Status() Status
// ReportUnreachable reports the given node is not reachable for the last send.
// ReportUnreachable reports the given node is not reachable for the last sendInNextReady.
ReportUnreachable(id uint64)
// ReportSnapshot reports the status of the sent snapshot. The id is the raft ID of the follower
// who is meant to receive the snapshot, and the status is SnapshotFinish or SnapshotFailure.
Expand Down Expand Up @@ -311,7 +311,7 @@ func (n *node) run() {
} else if n.rn.HasReady() {
// Populate a Ready. Note that this Ready is not guaranteed to
// actually be handled. We will arm readyc, but there's no guarantee
// that we will actually send on it. It's possible that we will
// that we will actually sendInNextReady on it. It's possible that we will
// service another channel instead, loop around, and then populate
// the Ready again. We could instead force the previous Ready to be
// handled first, but it's generally good to emit larger Readys plus
Expand Down
12 changes: 6 additions & 6 deletions raft/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,15 +227,15 @@ func TestDisableProposalForwarding(t *testing.T) {

var testEntries = []raftpb.Entry{{Data: []byte("testdata")}}

// send proposal to r2(follower) where DisableProposalForwarding is false
// sendInNextReady proposal to r2(follower) where DisableProposalForwarding is false
r2.Step(raftpb.Message{From: 2, To: 2, Type: raftpb.MsgProp, Entries: testEntries})

// verify r2(follower) does forward the proposal when DisableProposalForwarding is false
if len(r2.msgs) != 1 {
t.Fatalf("len(r2.msgs) expected 1, got %d", len(r2.msgs))
}

// send proposal to r3(follower) where DisableProposalForwarding is true
// sendInNextReady proposal to r3(follower) where DisableProposalForwarding is true
r3.Step(raftpb.Message{From: 3, To: 3, Type: raftpb.MsgProp, Entries: testEntries})

// verify r3(follower) does not forward the proposal when DisableProposalForwarding is true
Expand All @@ -245,7 +245,7 @@ func TestDisableProposalForwarding(t *testing.T) {
}

// TestNodeReadIndexToOldLeader ensures that raftpb.MsgReadIndex to old leader
// gets forwarded to the new leader and 'send' method does not attach its term.
// gets forwarded to the new leader and 'sendInNextReady' method does not attach its term.
func TestNodeReadIndexToOldLeader(t *testing.T) {
r1 := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
r2 := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
Expand All @@ -258,7 +258,7 @@ func TestNodeReadIndexToOldLeader(t *testing.T) {

var testEntries = []raftpb.Entry{{Data: []byte("testdata")}}

// send readindex request to r2(follower)
// sendInNextReady readindex request to r2(follower)
r2.Step(raftpb.Message{From: 2, To: 2, Type: raftpb.MsgReadIndex, Entries: testEntries})

// verify r2(follower) forwards this message to r1(leader) with term not set
Expand All @@ -270,7 +270,7 @@ func TestNodeReadIndexToOldLeader(t *testing.T) {
t.Fatalf("r2.msgs[0] expected %+v, got %+v", readIndxMsg1, r2.msgs[0])
}

// send readindex request to r3(follower)
// sendInNextReady readindex request to r3(follower)
r3.Step(raftpb.Message{From: 3, To: 3, Type: raftpb.MsgReadIndex, Entries: testEntries})

// verify r3(follower) forwards this message to r1(leader) with term not set as well.
Expand Down Expand Up @@ -899,7 +899,7 @@ func TestAppendPagination(t *testing.T) {
n.recover()

// After the partition recovers, tick the clock to wake everything
// back up and send the messages.
// back up and sendInNextReady the messages.
n.send(raftpb.Message{From: 1, To: 1, Type: raftpb.MsgBeat})
if !seenFullMessage {
t.Error("didn't see any messages more than half the max size; something is wrong with this test")
Expand Down
45 changes: 23 additions & 22 deletions raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,8 +397,9 @@ func (r *raft) hardState() pb.HardState {
}
}

// send persists state to stable storage and then sends to its mailbox.
func (r *raft) send(m pb.Message) {
// sendInNextReady schedules persisting state to a stable storage and then
// sending the message (as part of next Ready message processing).
func (r *raft) sendInNextReady(m pb.Message) {
if m.From == None {
m.From = r.id
}
Expand Down Expand Up @@ -502,7 +503,7 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
}
}
}
r.send(m)
r.sendInNextReady(m)
return true
}

Expand All @@ -522,7 +523,7 @@ func (r *raft) sendHeartbeat(to uint64, ctx []byte) {
Context: ctx,
}

r.send(m)
r.sendInNextReady(m)
}

// bcastAppend sends RPC, with entries to all peers that are not up-to-date
Expand Down Expand Up @@ -839,7 +840,7 @@ func (r *raft) campaign(t CampaignType) {
if t == campaignTransfer {
ctx = []byte(t)
}
r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})
r.sendInNextReady(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})
}
}

Expand Down Expand Up @@ -912,14 +913,14 @@ func (r *raft) Step(m pb.Message) error {
// with "pb.MsgAppResp" of higher term would force leader to step down.
// However, this disruption is inevitable to free this stuck node with
// fresh election. This can be prevented with Pre-Vote phase.
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp})
r.sendInNextReady(pb.Message{To: m.From, Type: pb.MsgAppResp})
} else if m.Type == pb.MsgPreVote {
// Before Pre-Vote enable, there may have candidate with higher term,
// but less log. After update to Pre-Vote, the cluster may deadlock if
// we drop messages with a lower term.
r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected %s from %x [logterm: %d, index: %d] at term %d",
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
r.send(pb.Message{To: m.From, Term: r.Term, Type: pb.MsgPreVoteResp, Reject: true})
r.sendInNextReady(pb.Message{To: m.From, Term: r.Term, Type: pb.MsgPreVoteResp, Reject: true})
} else {
// ignore other cases
r.logger.Infof("%x [term: %d] ignored a %s message with lower term from %x [term: %d]",
Expand Down Expand Up @@ -974,7 +975,7 @@ func (r *raft) Step(m pb.Message) error {
// the message (it ignores all out of date messages).
// The term in the original message and current local term are the
// same in the case of regular votes, but different for pre-votes.
r.send(pb.Message{To: m.From, Term: m.Term, Type: voteRespMsgType(m.Type)})
r.sendInNextReady(pb.Message{To: m.From, Term: m.Term, Type: voteRespMsgType(m.Type)})
if m.Type == pb.MsgVote {
// Only record real votes.
r.electionElapsed = 0
Expand All @@ -983,7 +984,7 @@ func (r *raft) Step(m pb.Message) error {
} else {
r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected %s from %x [logterm: %d, index: %d] at term %d",
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
r.send(pb.Message{To: m.From, Term: r.Term, Type: voteRespMsgType(m.Type), Reject: true})
r.sendInNextReady(pb.Message{To: m.From, Term: r.Term, Type: voteRespMsgType(m.Type), Reject: true})
}

default:
Expand Down Expand Up @@ -1088,7 +1089,7 @@ func stepLeader(r *raft, m pb.Message) error {
// only one voting member (the leader) in the cluster
if r.prs.IsSingleton() {
if resp := r.responseToReadIndexReq(m, r.raftLog.committed); resp.To != None {
r.send(resp)
r.sendInNextReady(resp)
}
return nil
}
Expand All @@ -1110,7 +1111,7 @@ func stepLeader(r *raft, m pb.Message) error {
r.bcastHeartbeatWithCtx(m.Entries[0].Data)
case ReadOnlyLeaseBased:
if resp := r.responseToReadIndexReq(m, r.raftLog.committed); resp.To != None {
r.send(resp)
r.sendInNextReady(resp)
}
}
return nil
Expand Down Expand Up @@ -1203,7 +1204,7 @@ func stepLeader(r *raft, m pb.Message) error {
rss := r.readOnly.advance(m)
for _, rs := range rss {
if resp := r.responseToReadIndexReq(rs.req, rs.index); resp.To != None {
r.send(resp)
r.sendInNextReady(resp)
}
}
case pb.MsgSnapStatus:
Expand Down Expand Up @@ -1328,7 +1329,7 @@ func stepFollower(r *raft, m pb.Message) error {
return ErrProposalDropped
}
m.To = r.lead
r.send(m)
r.sendInNextReady(m)
case pb.MsgApp:
r.electionElapsed = 0
r.lead = m.From
Expand All @@ -1347,7 +1348,7 @@ func stepFollower(r *raft, m pb.Message) error {
return nil
}
m.To = r.lead
r.send(m)
r.sendInNextReady(m)
case pb.MsgTimeoutNow:
r.logger.Infof("%x [term %d] received MsgTimeoutNow from %x and starts an election to get leadership.", r.id, r.Term, m.From)
// Leadership transfers never use pre-vote even if r.preVote is true; we
Expand All @@ -1360,7 +1361,7 @@ func stepFollower(r *raft, m pb.Message) error {
return nil
}
m.To = r.lead
r.send(m)
r.sendInNextReady(m)
case pb.MsgReadIndexResp:
if len(m.Entries) != 1 {
r.logger.Errorf("%x invalid format of MsgReadIndexResp from %x, entries count: %d", r.id, m.From, len(m.Entries))
Expand All @@ -1373,34 +1374,34 @@ func stepFollower(r *raft, m pb.Message) error {

func (r *raft) handleAppendEntries(m pb.Message) {
if m.Index < r.raftLog.committed {
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
r.sendInNextReady(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
return
}

if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
r.sendInNextReady(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
} else {
r.logger.Debugf("%x [logterm: %d, index: %d] rejected MsgApp [logterm: %d, index: %d] from %x",
r.id, r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From)
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true, RejectHint: r.raftLog.lastIndex()})
r.sendInNextReady(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true, RejectHint: r.raftLog.lastIndex()})
}
}

func (r *raft) handleHeartbeat(m pb.Message) {
r.raftLog.commitTo(m.Commit)
r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context})
r.sendInNextReady(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context})
}

func (r *raft) handleSnapshot(m pb.Message) {
sindex, sterm := m.Snapshot.Metadata.Index, m.Snapshot.Metadata.Term
if r.restore(m.Snapshot) {
r.logger.Infof("%x [commit: %d] restored snapshot [index: %d, term: %d]",
r.id, r.raftLog.committed, sindex, sterm)
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()})
r.sendInNextReady(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()})
} else {
r.logger.Infof("%x [commit: %d] ignored snapshot [index: %d, term: %d]",
r.id, r.raftLog.committed, sindex, sterm)
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
r.sendInNextReady(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
}
}

Expand Down Expand Up @@ -1593,7 +1594,7 @@ func (r *raft) resetRandomizedElectionTimeout() {
}

func (r *raft) sendTimeoutNow(to uint64) {
r.send(pb.Message{To: to, Type: pb.MsgTimeoutNow})
r.sendInNextReady(pb.Message{To: to, Type: pb.MsgTimeoutNow})
}

func (r *raft) abortLeaderTransfer() {
Expand Down
2 changes: 1 addition & 1 deletion raft/raft_paper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func TestStartAsFollower(t *testing.T) {
}

// TestLeaderBcastBeat tests that if the leader receives a heartbeat tick,
// it will send a MsgHeartbeat with m.Index = 0, m.LogTerm=0 and empty entries
// it will sendInNextReady a MsgHeartbeat with m.Index = 0, m.LogTerm=0 and empty entries
// as heartbeat to all followers.
// Reference: section 5.2
func TestLeaderBcastBeat(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion raft/raft_snap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func TestSnapshotAbort(t *testing.T) {
if sm.prs.Progress[2].PendingSnapshot != 0 {
t.Fatalf("PendingSnapshot = %d, want 0", sm.prs.Progress[2].PendingSnapshot)
}
// The follower entered StateReplicate and the leader send an append
// The follower entered StateReplicate and the leader sendInNextReady an append
// and optimistically updated the progress (so we see 13 instead of 12).
// There is something to append because the leader appended an empty entry
// to the log at index 12 when it assumed leadership.
Expand Down

0 comments on commit 1f6ba62

Please sign in to comment.