
Fix timeouts from very large raft messages #3122

Merged (1 commit, Apr 10, 2023)
56 changes: 55 additions & 1 deletion manager/state/raft/transport/peer.go
@@ -196,9 +196,52 @@ func needsSplitting(m *raftpb.Message) bool {
}

func (p *peer) sendProcessMessage(ctx context.Context, m raftpb.Message) error {
-    ctx, cancel := context.WithTimeout(ctx, p.tr.config.SendTimeout)
    // These lines used to be in the code, but they've been removed. I'm
    // leaving them here in a comment to show why they were removed, just in
    // case removing them causes some unforeseen breakage later:
    //
    //     ctx, cancel := context.WithTimeout(ctx, p.tr.config.SendTimeout)
    //     defer cancel()
    //
    // Basically, these lines created a timeout that applied not to each chunk
    // of a streaming message, but to the whole streaming process. With a
    // sufficiently large raft log, some connections do not physically have
    // enough bandwidth to transmit it within the default 2-second timeout.
    // Further, it seems that because of some gRPC magic, the timeout was
    // getting propagated to the stream *server*, meaning it wasn't even the
    // sender timing out; it was the receiver.
    //
    // It should be fine to remove this timeout. The whole purpose of this
    // method is to send very large raft messages that could take several
    // seconds to send.

    ctx, cancel := context.WithCancel(ctx)
    defer cancel()

    // This is a bootleg watchdog timer. If the timer elapses without
    // something being written to the bump channel, it will cancel the
    // context.
    //
    // We use this because the operations on this stream *must* either time
    // out or succeed for raft to function correctly. We can't just time out
    // the whole operation, for the reasons stated above. But we also only set
    // the context once, when we create the stream, and so can't set an
    // individual timeout for each stream operation.
    //
    // By structuring it as a watchdog, we can time out individual operations
    // by canceling the context on our own terms.
    bump := make(chan struct{})
    go func() {
        for {
            select {
            case <-bump:
            case <-time.After(p.tr.config.SendTimeout):

Contributor:

A new timer will be created on each turn of the loop, which won't be cleaned up until after the timer fires. Memory consumption would increase proportionally to the number of messages the snapshot data is split into, and won't be fully garbage-collectable until a nontrivial amount of time after the snapshot is fully transmitted. The memory usage could potentially become significant when there's a thundering herd of a hundred nodes joining a cluster.

Active timers created with time.AfterFunc are allowed to be reset, and context.CancelFunc closures are idempotent, so a resettable watchdog timer which cancels a context on expiry and consumes O(1) memory can be implemented quite simply:

    t := time.AfterFunc(p.tr.config.SendTimeout, cancel)
    defer t.Stop()

    bump := func() { t.Reset(p.tr.config.SendTimeout) }
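
For illustration, a minimal self-contained sketch of that pattern, with hypothetical stand-ins: sendTimeout stands in for p.tr.config.SendTimeout, and sendChunk stands in for stream.Send on one chunk; neither is code from this PR.

    package main

    import (
        "context"
        "errors"
        "fmt"
        "time"
    )

    // sendChunk is a hypothetical stand-in for sending one message chunk.
    func sendChunk(ctx context.Context, i int) error {
        select {
        case <-time.After(50 * time.Millisecond): // simulated transmission time
            return nil
        case <-ctx.Done():
            return ctx.Err()
        }
    }

    func main() {
        const sendTimeout = 2 * time.Second // stand-in for the real SendTimeout

        ctx, cancel := context.WithCancel(context.Background())
        defer cancel()

        // The watchdog: cancel fires if sendTimeout elapses without a Reset.
        // cancel is idempotent, so a late fire after we return is harmless.
        t := time.AfterFunc(sendTimeout, cancel)
        defer t.Stop()

        bump := func() { t.Reset(sendTimeout) }

        for i := 0; i < 5; i++ {
            if err := sendChunk(ctx, i); err != nil {
                if errors.Is(err, context.Canceled) {
                    fmt.Println("watchdog canceled the send:", err)
                }
                return
            }
            bump() // each successful chunk restarts the timeout window
        }
        fmt.Println("all chunks sent")
    }

A single timer is reused across all chunks, so memory stays constant no matter how many messages the snapshot is split into.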


Member:

🙈 good catch (I always forget the caveats with these); @dperny can have a look?

                cancel()
            case <-ctx.Done():
                return
            }
        }
    }()

    var err error
    var stream api.Raft_StreamRaftMessageClient
    stream, err = api.NewRaftClient(p.conn()).StreamRaftMessage(ctx)
@@ -217,6 +260,17 @@ func (p *peer) sendProcessMessage(ctx context.Context, m raftpb.Message) error {
    // Stream
    for _, msg := range msgs {
        err = stream.Send(&msg)

        // If the send succeeds, bump the watchdog timer.
        //
        // We cannot just do a naked send to the bump channel. If, for
        // example, the timer has already elapsed when we try to send, then
        // the context will have been canceled, the watchdog loop will have
        // exited, and there would be no receiver; we'd block here forever.
        select {
        case bump <- struct{}{}:
        case <-ctx.Done():
        }
        if err != nil {
            log.G(ctx).WithError(err).Error("error streaming message to peer")
            stream.CloseAndRecv()