kvserver: send MsgAppResp back to sender
This addresses the following race:

- n1 runs a ConfChange that adds n2 as a learner.
- n1 sends MsgApp to the learner.
- n1 starts the INITIAL snapshot, say at index 100.
- n2 receives n1's MsgApp. Since it's an uninitialized
  Replica and its log is empty, it rejects this MsgApp.
- n2 receives and applies the INITIAL snapshot, which prompts it to send an
  affirmative MsgAppResp to n1.
- n1's RawNode now tracks n2 as StateProbe (via a call to ReportSnapshot(success)).
- n1 receives the MsgApp rejection; n2 regresses to StateSnapshot because the
  rejection comes with a RejectHint (suggested next index to try) of zero,
  which is not in n1's log. In particular, the resulting PendingSnapshot index
  will likely be higher than the index of the snapshot actually sent, say 101.
- n1 receives the affirmative MsgAppResp (for index 100). However, 100 < 101
  so this is ignored and the follower remains in StateSnapshot.

With this commit, the last two steps cannot happen: n2 transitions straight to
StateReplicate because we step a copy of the affirmative MsgAppResp into the
leader's RawNode. The later rejection is then dropped as stale: you can't hint
at index zero when a positive index has already been confirmed.
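To make the staleness argument concrete, here is a minimal, self-contained
sketch of the progress-tracking logic involved (assumed names, loosely modeled
on etcd-io/raft's `Progress.MaybeDecrTo`; not CockroachDB code):

```go
package main

import "fmt"

type StateType int

const (
	StateProbe StateType = iota
	StateReplicate
	StateSnapshot
)

// Progress is the leader's view of one follower (simplified).
type Progress struct {
	State           StateType
	Match           uint64 // highest index the follower has durably confirmed
	Next            uint64 // next log index to send to the follower
	PendingSnapshot uint64 // set while State == StateSnapshot
}

// maybeDecrTo mirrors the staleness check for MsgApp rejections: once the
// follower is tracked in StateReplicate, a rejection at or below Match is
// dropped, because the follower has already durably confirmed those entries.
func maybeDecrTo(pr *Progress, rejected, matchHint uint64) bool {
	if pr.State == StateReplicate {
		if rejected <= pr.Match {
			return false // stale rejection, ignored
		}
		pr.Next = pr.Match + 1
		return true
	}
	// StateProbe: back off to the follower's hint. Pre-fix, a RejectHint of
	// zero sent Next back to 1, an index no longer in n1's log, which forced
	// another snapshot.
	pr.Next = matchHint + 1
	return true
}

func main() {
	// Post-fix ordering: MsgAppResp(100) was stepped in synchronously, so the
	// leader already tracks n2 in StateReplicate with Match == 100 when the
	// old rejection (RejectHint == 0) finally arrives.
	pr := &Progress{State: StateReplicate, Match: 100, Next: 101}
	fmt.Println(maybeDecrTo(pr, 100, 0)) // false: the rejection is dropped
}
```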

I will add that there is no great test for the above race other than stressing
the test with additional logging, observing the symptoms, and verifying that
they disappear with this commit. Scripted testing of this code is within
reach[^1] but is outside the scope of this PR.

[^1]: cockroachdb#105177

There is an additional bit of brittleness that is silently suppressed by this
commit but deserves to be fixed independently, because the way the problem gets
avoided seems accidental and incomplete. When raft requests a snapshot, it
notes its current LastIndex and uses it as the PendingSnapshot for the
follower's Progress.
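Reusing the simplified `Progress` type from the sketch above, that bookkeeping
amounts to roughly the following (assumed name `becomeSnapshot`; a paraphrase
of the described behavior, not the actual raft code):

```go
// becomeSnapshot records the leader's current last index as PendingSnapshot,
// even though the snapshot eventually delivered may be older. In the example
// above, PendingSnapshot becomes 101 while the snapshot sent is at index 100.
func becomeSnapshot(pr *Progress, leaderLastIndex uint64) {
	pr.State = StateSnapshot
	pr.PendingSnapshot = leaderLastIndex
}
```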

At the time of writing, MsgAppResps that reconnect the follower to the log but
whose index is below PendingSnapshot are ignored. In effect, this means that
perfectly good snapshots are thrown away if they happen to be a little bit
stale. In the example above, the snapshot is stale: PendingSnapshot is 101, but
the snapshot is at index 100. How, then, does this commit (mostly) fix the
problem, i.e. why isn't the snapshot discarded? The key is that when we
synchronously step the MsgAppResp(100) into the leader's RawNode, the rejection
hasn't arrived yet, so the follower transitions into StateReplicate with a
Match of 100. This is enough for raft to recognize the rejected MsgApp as stale
(since accepting it would regress on durably stored entries).

However, there is an alternative ordering in which the rejection arrives
earlier: after the snapshot index has been picked, but before the follower has
transitioned into StateReplicate. For this to have a negative effect, an entry
has to be appended to the leader's log between generating the snapshot and
handling the rejection. Without the combination of delegated snapshots and
sustained write activity on the leader this window is small; that combination
is usually absent in tests, but it may well occur in "real" clusters. We track
addressing this in cockroachdb#106813.
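For reference, the ignore-below-PendingSnapshot gate discussed above looks
roughly like this (again reusing the simplified `Progress` type from the first
sketch; modeled on how etcd-io/raft handles an affirmative MsgAppResp for a
follower in StateSnapshot):

```go
// handleMsgAppResp sketches why a slightly stale snapshot is thrown away:
// while the follower is in StateSnapshot, an affirmative ack below
// PendingSnapshot (here, 100 < 101) is ignored entirely.
func handleMsgAppResp(pr *Progress, ackedIndex uint64) {
	if pr.State == StateSnapshot {
		if ackedIndex < pr.PendingSnapshot {
			return // perfectly good snapshot discarded; another will be needed
		}
		pr.State = StateReplicate // reconnected to the log
	}
	if ackedIndex > pr.Match {
		pr.Match, pr.Next = ackedIndex, ackedIndex+1
	}
}
```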

Closes cockroachdb#87554.
Closes cockroachdb#97971.
Closes cockroachdb#84242.

Epic: None
Release note (bug fix): removed a source of unnecessary Raft snapshots during
replica movement.
tbg committed Jul 14, 2023
1 parent 012c06f commit ec77ba0
Showing 5 changed files with 38 additions and 2 deletions.
13 changes: 12 additions & 1 deletion pkg/kv/kvserver/replica_command.go
@@ -2872,7 +2872,18 @@ func (r *Replica) sendSnapshotUsingDelegate(
 	retErr = timeutil.RunWithTimeout(
 		ctx, "send-snapshot", sendSnapshotTimeout, func(ctx context.Context) error {
 			// Sending snapshot
-			_, err := r.store.cfg.Transport.DelegateSnapshot(ctx, delegateRequest)
+			resp, err := r.store.cfg.Transport.DelegateSnapshot(ctx, delegateRequest)
+			if err == nil && resp.MsgAppResp != nil {
+				const mayCampaignOnWake = false
+				_ = r.withRaftGroup(mayCampaignOnWake, func(rn *raft.RawNode) (unquiesceAndWakeLeader bool, _ error) {
+					msg := *resp.MsgAppResp
+					// With a delegated snapshot, the recipient received the snapshot
+					// from another replica and will thus respond to it instead. But the
+					// message is valid for the actual originator of the send as well.
+					msg.To = rn.BasicStatus().ID
+					return false, rn.Step(msg)
+				})
+			}
 			return err
 		},
 	)
11 changes: 11 additions & 0 deletions pkg/kv/kvserver/replica_raft.go
@@ -951,6 +951,17 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
 		if err := r.applySnapshot(ctx, inSnap, snap, hs, subsumedRepls); err != nil {
 			return stats, errors.Wrap(err, "while applying snapshot")
 		}
+		for _, msg := range msgStorageAppend.Responses {
+			// TODO is this conditional enough?
+			if msg.To == uint64(inSnap.FromReplica.ReplicaID) &&
+				msg.Type == raftpb.MsgAppResp &&
+				!msg.Reject &&
+				msg.Index == snap.Metadata.Index {
+
+				inSnap.msgAppRespCh <- msg
+				break
+			}
+		}
 		stats.tSnapEnd = timeutil.Now()
 		stats.snap.applied = true
 
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/replica_raftstorage.go
@@ -313,7 +313,8 @@ type IncomingSnapshot struct {
 	DataSize         int64
 	snapType         kvserverpb.SnapshotRequest_Type
 	placeholder      *ReplicaPlaceholder
-	raftAppliedIndex kvpb.RaftIndex // logging only
+	raftAppliedIndex kvpb.RaftIndex      // logging only
+	msgAppRespCh     chan raftpb.Message // receives MsgAppResp if/when snap is applied
 }
 
 func (s IncomingSnapshot) String() string {
12 changes: 12 additions & 0 deletions pkg/kv/kvserver/store_raft.go
@@ -489,6 +489,18 @@ func (s *Store) processRaftSnapshotRequest(
 			log.Infof(ctx, "ignored stale snapshot at index %d", snapHeader.RaftMessageRequest.Message.Snapshot.Metadata.Index)
 			s.metrics.RangeSnapshotRecvUnusable.Inc(1)
 		}
+		// If the snapshot was applied and acked with an MsgAppResp, return that
+		// message up the stack. We're using msgAppRespCh as a shortcut to avoid
+		// plumbing return parameters through an additional few layers of raft
+		// handling.
+		//
+		// NB: in practice there's always an MsgAppResp here, but it is better not
+		// to rely on what is essentially discretionary raft behavior.
+		select {
+		case msg := <-inSnap.msgAppRespCh:
+			msgAppResp = &msg
+		default:
+		}
 		return nil
 	})
 	if pErr != nil {
1 change: 1 addition & 0 deletions pkg/kv/kvserver/store_snapshot.go
@@ -535,6 +535,7 @@ func (kvSS *kvBatchSnapshotStrategy) Receive(
 		DataSize:         dataSize,
 		snapType:         header.Type,
 		raftAppliedIndex: header.State.RaftAppliedIndex,
+		msgAppRespCh:     make(chan raftpb.Message, 1),
 	}
 
 	timingTag.stop("totalTime")
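A side note on the msgAppRespCh plumbing above: the channel is buffered with
capacity one so the raft ready loop's single send can never block, and the
consumer drains it with a non-blocking select because an MsgAppResp is
discretionary. A standalone sketch of that handoff pattern (illustrative only,
not CockroachDB code):

```go
package main

import "fmt"

func main() {
	// Capacity 1: the producer's single send can never block.
	ch := make(chan string, 1)

	// Producer (stand-in for the raft ready loop): sends at most one message.
	ch <- "MsgAppResp"

	// Consumer: non-blocking receive, because the message may be absent.
	select {
	case msg := <-ch:
		fmt.Println("got", msg)
	default:
		fmt.Println("no message, which is tolerated")
	}
}
```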
