diff --git a/manager/state/raft/transport/peer.go b/manager/state/raft/transport/peer.go index 237b871619..23eb6748b6 100644 --- a/manager/state/raft/transport/peer.go +++ b/manager/state/raft/transport/peer.go @@ -196,9 +196,52 @@ func needsSplitting(m *raftpb.Message) bool { } func (p *peer) sendProcessMessage(ctx context.Context, m raftpb.Message) error { - ctx, cancel := context.WithTimeout(ctx, p.tr.config.SendTimeout) + // These lines used to be in the code, but they've been removed. I'm + // leaving them in in a comment just in case they cause some unforeseen + // breakage later, to show why they were removed. + // + // ctx, cancel := context.WithTimeout(ctx, p.tr.config.SendTimeout) + // defer cancel() + // + // Basically, these lines created a timeout that applied not to each chunk + // of a streaming message, but to the whole streaming process. With a + // sufficiently large raft log, the bandwidth on some connections can not + // physically be enough to fit within the default 2 second timeout. + // Further, it seems that because of some gRPC magic, the timeout was + // getting propagated to the stream *server*, meaning it wasn't even the + // sender timing out, it was the receiver. + // + // It should be fine to remove this timeout. The whole purpose of this + // method is to send very large raft messages that could take several + // seconds to send. + + ctx, cancel := context.WithCancel(ctx) defer cancel() + // This is a bootleg watchdog timer. If the timer elapses without something + // being written to the bump channel, it will cancel the context. + // + // We use this because the operations on this stream *must* either time out + // or succeed for raft to function correctly. We can't just time out the + // whole operation, because of the reasons stated above. But we also only + // set the context once, when we create the stream, and so can't set an + // individual timeout for each stream operation. + // + // By doing it as this watchdog-type structure, we can time out individual + // operations by canceling the context on our own terms. + bump := make(chan struct{}) + go func() { + for { + select { + case <-bump: + case <-time.After(p.tr.config.SendTimeout): + cancel() + case <-ctx.Done(): + return + } + } + }() + var err error var stream api.Raft_StreamRaftMessageClient stream, err = api.NewRaftClient(p.conn()).StreamRaftMessage(ctx) @@ -217,6 +260,17 @@ func (p *peer) sendProcessMessage(ctx context.Context, m raftpb.Message) error { // Stream for _, msg := range msgs { err = stream.Send(&msg) + + // If the send succeeds, bump the watchdog timer. + // + // We cannot just do a naked send to the bump channel. If we try to + // send, for example, and the timer has elapsed, then the context + // will have been canceled, the watchdog loop will have exited, and + // there would be no receiver. We'd block here forever. + select { + case bump <- struct{}{}: + case <-ctx.Done(): + } if err != nil { log.G(ctx).WithError(err).Error("error streaming message to peer") stream.CloseAndRecv()