Skip to content

Commit bfaae1b

Browse files
committed
raft: enter ProgressStateReplicate immediately after snapshot
When a follower requires a snapshot and the snapshot is sent at the committed (and last) index in an otherwise idle Raft group, the follower would previously remain in ProgressStateProbe even though it had been caught up completely. In a busy Raft group this wouldn't be an issue since the next round of MsgApp would update the state, but in an idle group there's nothing that rectifies the status (since there's nothing to append or update). The reason this matters is that the state is exposed through `RaftStatus()`. Concretely, in CockroachDB, we use the Raft status to make sharding decisions (since it's dangerous to make rapid changes to a fragile Raft group), and had to work around this problem[1]. [1]: https://github.com/cockroachdb/cockroach/blob/91b11dae416f3d9b55fadd2a4b096e94d874176c/pkg/storage/split_delay_helper.go#L138-L141
1 parent 6c649de commit bfaae1b

File tree

2 files changed

+123
-2
lines changed

2 files changed

+123
-2
lines changed

raft/raft.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1078,7 +1078,13 @@ func stepLeader(r *raft, m pb.Message) error {
10781078
pr.becomeReplicate()
10791079
case pr.State == ProgressStateSnapshot && pr.needSnapshotAbort():
10801080
r.logger.Debugf("%x snapshot aborted, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
1081+
// Transition back to replicating state via probing state
1082+
// (which takes the snapshot into account). If we didn't
1083+
// move to replicating state, that would only happen with
1084+
// the next round of appends (but there may not be a next
1085+
// round for a while, exposing an inconsistent RaftStatus).
10811086
pr.becomeProbe()
1087+
pr.becomeReplicate()
10821088
case pr.State == ProgressStateReplicate:
10831089
pr.ins.freeTo(m.Index)
10841090
}

raft/raft_snap_test.go

Lines changed: 117 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,114 @@ func TestSnapshotSucceed(t *testing.T) {
111111
}
112112
}
113113

114+
// TestSnapshotSucceedViaAppResp regression tests the situation in which a snap-
115+
// shot is sent to a follower at the most recent index (i.e. the snapshot index
116+
// equals both the leader's last index and the committed index). In that situation, a bug
117+
// in the past left the follower in probing status until the next log entry was
118+
// committed.
119+
func TestSnapshotSucceedViaAppResp(t *testing.T) {
120+
snap := pb.Snapshot{
121+
Metadata: pb.SnapshotMetadata{
122+
Index: 11, // magic number
123+
Term: 11, // magic number
124+
ConfState: pb.ConfState{Nodes: []uint64{1, 2, 3}},
125+
},
126+
}
127+
128+
s1 := NewMemoryStorage()
129+
n1 := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, s1)
130+
131+
// Become follower because otherwise the way this test sets things up the
132+
// leadership term will be 1 (which is stale). We want it to match the snap-
133+
// shot term in this test.
134+
n1.becomeFollower(snap.Metadata.Term-1, 2)
135+
n1.becomeCandidate()
136+
n1.becomeLeader()
137+
138+
// Apply a snapshot on the leader. This is a workaround against the fact that
139+
// the leader will always append an empty entry, but that empty entry works
140+
// against what we're trying to assert in this test, namely that a snapshot
141+
// at the latest committed index must not leave the follower in probing state.
142+
// With the snapshot, the empty entry is fully committed.
143+
n1.restore(snap)
144+
145+
noMessage := pb.MessageType(-1)
146+
mustSend := func(from, to *raft, typ pb.MessageType) pb.Message {
147+
t.Helper()
148+
for i, msg := range from.msgs {
149+
if msg.From != from.id || msg.To != to.id || msg.Type != typ {
150+
continue
151+
}
152+
t.Log(DescribeMessage(msg, func([]byte) string { return "" }))
153+
if err := to.Step(msg); err != nil {
154+
t.Fatalf("%v: %s", msg, err)
155+
}
156+
from.msgs = append(from.msgs[:i], from.msgs[i+1:]...)
157+
return msg
158+
}
159+
if typ == noMessage {
160+
if len(from.msgs) == 0 {
161+
return pb.Message{}
162+
}
163+
t.Fatalf("expected no more messages, but got %d->%d %v", from.id, to.id, from.msgs)
164+
}
165+
t.Fatalf("message %d->%d %s not found in %v", from.id, to.id, typ, from.msgs)
166+
return pb.Message{} // unreachable
167+
}
168+
169+
// Create the follower that will receive the snapshot.
170+
s2 := NewMemoryStorage()
171+
n2 := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, s2)
172+
173+
// Let the leader probe the follower.
174+
if !n1.maybeSendAppend(2, true /* sendIfEmpty */) {
175+
t.Fatalf("expected message to be sent")
176+
}
177+
if msg := mustSend(n1, n2, pb.MsgApp); len(msg.Entries) > 0 {
178+
// For this test to work, the leader must not have anything to append
179+
// to the follower right now.
180+
t.Fatalf("unexpectedly appending entries %v", msg.Entries)
181+
}
182+
183+
// Follower rejects the append (because it doesn't have any log entries)
184+
if msg := mustSend(n2, n1, pb.MsgAppResp); !msg.Reject {
185+
t.Fatalf("expected a rejection with zero hint, got reject=%t hint=%d", msg.Reject, msg.RejectHint)
186+
}
187+
188+
expIdx := snap.Metadata.Index
189+
// Leader sends snapshot due to RejectHint of zero (the storage we use here
190+
// has index zero compacted).
191+
if msg := mustSend(n1, n2, pb.MsgSnap); msg.Snapshot.Metadata.Index != expIdx {
192+
t.Fatalf("expected snapshot at index %d, got %d", expIdx, msg.Snapshot.Metadata.Index)
193+
}
194+
195+
// n2 reacts to snapshot with MsgAppResp.
196+
if msg := mustSend(n2, n1, pb.MsgAppResp); msg.Index != expIdx {
197+
t.Fatalf("expected AppResp at index %d, got %d", expIdx, msg.Index)
198+
}
199+
200+
// Leader sends MsgApp to communicate commit index.
201+
if msg := mustSend(n1, n2, pb.MsgApp); msg.Commit != expIdx {
202+
t.Fatalf("expected commit index %d, got %d", expIdx, msg.Commit)
203+
}
204+
205+
// Follower responds.
206+
mustSend(n2, n1, pb.MsgAppResp)
207+
208+
// Leader has correct state for follower.
209+
pr := n1.prs[2]
210+
if pr.State != ProgressStateReplicate {
211+
t.Fatalf("unexpected state %v", pr)
212+
}
213+
if pr.Match != expIdx || pr.Next != expIdx+1 {
214+
t.Fatalf("expected match = %d, next = %d; got match = %d and next = %d", expIdx, expIdx+1, pr.Match, pr.Next)
215+
}
216+
217+
// Leader and follower are done.
218+
mustSend(n1, n2, noMessage)
219+
mustSend(n2, n1, noMessage)
220+
}
221+
114222
func TestSnapshotAbort(t *testing.T) {
115223
storage := NewMemoryStorage()
116224
sm := newTestRaft(1, []uint64{1, 2}, 10, 1, storage)
@@ -128,7 +236,14 @@ func TestSnapshotAbort(t *testing.T) {
128236
if sm.prs[2].PendingSnapshot != 0 {
129237
t.Fatalf("PendingSnapshot = %d, want 0", sm.prs[2].PendingSnapshot)
130238
}
131-
if sm.prs[2].Next != 12 {
132-
t.Fatalf("Next = %d, want 12", sm.prs[2].Next)
239+
// The follower entered ProgressStateReplicate and the leader sent an append
240+
// and optimistically updated the progress (so we see 13 instead of 12).
241+
// There is something to append because the leader appended an empty entry
242+
// to the log at index 12 when it assumed leadership.
243+
if sm.prs[2].Next != 13 {
244+
t.Fatalf("Next = %d, want 13", sm.prs[2].Next)
245+
}
246+
if n := sm.prs[2].ins.count; n != 1 {
247+
t.Fatalf("expected an inflight message, got %d", n)
133248
}
134249
}

0 commit comments

Comments
 (0)