Skip to content

Commit 1900a8e

Browse files
authored
Merge pull request etcd-io#10308 from tbg/fix/progress-after-snap
raft: enter ProgressStateReplica immediately after snapshot
2 parents 8a9a2a1 + bfaae1b commit 1900a8e

File tree

2 files changed

+123
-2
lines changed

2 files changed

+123
-2
lines changed

raft/raft.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1078,7 +1078,13 @@ func stepLeader(r *raft, m pb.Message) error {
10781078
pr.becomeReplicate()
10791079
case pr.State == ProgressStateSnapshot && pr.needSnapshotAbort():
10801080
r.logger.Debugf("%x snapshot aborted, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
1081+
// Transition back to replicating state via probing state
1082+
// (which takes the snapshot into account). If we didn't
1083+
// move to replicating state, that would only happen with
1084+
// the next round of appends (but there may not be a next
1085+
// round for a while, exposing an inconsistent RaftStatus).
10811086
pr.becomeProbe()
1087+
pr.becomeReplicate()
10821088
case pr.State == ProgressStateReplicate:
10831089
pr.ins.freeTo(m.Index)
10841090
}

raft/raft_snap_test.go

Lines changed: 117 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,114 @@ func TestSnapshotSucceed(t *testing.T) {
111111
}
112112
}
113113

114+
// TestSnapshotSucceedViaAppResp regression tests the situation in which a snap-
115+
// shot is sent to a follower at the most recent index (i.e. the snapshot index
116+
// is the leader's last index is the committed index). In that situation, a bug
117+
// in the past left the follower in probing status until the next log entry was
118+
// committed.
119+
func TestSnapshotSucceedViaAppResp(t *testing.T) {
120+
snap := pb.Snapshot{
121+
Metadata: pb.SnapshotMetadata{
122+
Index: 11, // magic number
123+
Term: 11, // magic number
124+
ConfState: pb.ConfState{Nodes: []uint64{1, 2, 3}},
125+
},
126+
}
127+
128+
s1 := NewMemoryStorage()
129+
n1 := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, s1)
130+
131+
// Become follower because otherwise the way this test sets things up the
132+
// leadership term will be 1 (which is stale). We want it to match the snap-
133+
// shot term in this test.
134+
n1.becomeFollower(snap.Metadata.Term-1, 2)
135+
n1.becomeCandidate()
136+
n1.becomeLeader()
137+
138+
// Apply a snapshot on the leader. This is a workaround against the fact that
139+
// the leader will always append an empty entry, but that empty entry works
140+
// against what we're trying to assert in this test, namely that a snapshot
141+
// at the latest committed index leaves the follower in probing state.
142+
// With the snapshot, the empty entry is fully committed.
143+
n1.restore(snap)
144+
145+
noMessage := pb.MessageType(-1)
146+
mustSend := func(from, to *raft, typ pb.MessageType) pb.Message {
147+
t.Helper()
148+
for i, msg := range from.msgs {
149+
if msg.From != from.id || msg.To != to.id || msg.Type != typ {
150+
continue
151+
}
152+
t.Log(DescribeMessage(msg, func([]byte) string { return "" }))
153+
if err := to.Step(msg); err != nil {
154+
t.Fatalf("%v: %s", msg, err)
155+
}
156+
from.msgs = append(from.msgs[:i], from.msgs[i+1:]...)
157+
return msg
158+
}
159+
if typ == noMessage {
160+
if len(from.msgs) == 0 {
161+
return pb.Message{}
162+
}
163+
t.Fatalf("expected no more messages, but got %d->%d %v", from.id, to.id, from.msgs)
164+
}
165+
t.Fatalf("message %d->%d %s not found in %v", from.id, to.id, typ, from.msgs)
166+
return pb.Message{} // unreachable
167+
}
168+
169+
// Create the follower that will receive the snapshot.
170+
s2 := NewMemoryStorage()
171+
n2 := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, s2)
172+
173+
// Let the leader probe the follower.
174+
if !n1.maybeSendAppend(2, true /* sendIfEmpty */) {
175+
t.Fatalf("expected message to be sent")
176+
}
177+
if msg := mustSend(n1, n2, pb.MsgApp); len(msg.Entries) > 0 {
178+
// For this test to work, the leader must not have anything to append
179+
// to the follower right now.
180+
t.Fatalf("unexpectedly appending entries %v", msg.Entries)
181+
}
182+
183+
// Follower rejects the append (because it doesn't have any log entries)
184+
if msg := mustSend(n2, n1, pb.MsgAppResp); !msg.Reject {
185+
t.Fatalf("expected a rejection with zero hint, got reject=%t hint=%d", msg.Reject, msg.RejectHint)
186+
}
187+
188+
expIdx := snap.Metadata.Index
189+
// Leader sends snapshot due to RejectHint of zero (the storage we use here
190+
// has index zero compacted).
191+
if msg := mustSend(n1, n2, pb.MsgSnap); msg.Snapshot.Metadata.Index != expIdx {
192+
t.Fatalf("expected snapshot at index %d, got %d", expIdx, msg.Snapshot.Metadata.Index)
193+
}
194+
195+
// n2 reacts to snapshot with MsgAppResp.
196+
if msg := mustSend(n2, n1, pb.MsgAppResp); msg.Index != expIdx {
197+
t.Fatalf("expected AppResp at index %d, got %d", expIdx, msg.Index)
198+
}
199+
200+
// Leader sends MsgApp to communicate commit index.
201+
if msg := mustSend(n1, n2, pb.MsgApp); msg.Commit != expIdx {
202+
t.Fatalf("expected commit index %d, got %d", expIdx, msg.Commit)
203+
}
204+
205+
// Follower responds.
206+
mustSend(n2, n1, pb.MsgAppResp)
207+
208+
// Leader has correct state for follower.
209+
pr := n1.prs[2]
210+
if pr.State != ProgressStateReplicate {
211+
t.Fatalf("unexpected state %v", pr)
212+
}
213+
if pr.Match != expIdx || pr.Next != expIdx+1 {
214+
t.Fatalf("expected match = %d, next = %d; got match = %d and next = %d", expIdx, expIdx+1, pr.Match, pr.Next)
215+
}
216+
217+
// Leader and follower are done.
218+
mustSend(n1, n2, noMessage)
219+
mustSend(n2, n1, noMessage)
220+
}
221+
114222
func TestSnapshotAbort(t *testing.T) {
115223
storage := NewMemoryStorage()
116224
sm := newTestRaft(1, []uint64{1, 2}, 10, 1, storage)
@@ -128,7 +236,14 @@ func TestSnapshotAbort(t *testing.T) {
128236
if sm.prs[2].PendingSnapshot != 0 {
129237
t.Fatalf("PendingSnapshot = %d, want 0", sm.prs[2].PendingSnapshot)
130238
}
131-
if sm.prs[2].Next != 12 {
132-
t.Fatalf("Next = %d, want 12", sm.prs[2].Next)
239+
// The follower entered ProgressStateReplicate and the leader send an append
240+
// and optimistically updated the progress (so we see 13 instead of 12).
241+
// There is something to append because the leader appended an empty entry
242+
// to the log at index 12 when it assumed leadership.
243+
if sm.prs[2].Next != 13 {
244+
t.Fatalf("Next = %d, want 13", sm.prs[2].Next)
245+
}
246+
if n := sm.prs[2].ins.count; n != 1 {
247+
t.Fatalf("expected an inflight message, got %d", n)
133248
}
134249
}

0 commit comments

Comments
 (0)