Skip to content

Commit

Permalink
Merge pull request #3128 from neersighted/peer_watchdog_afterfunc
Browse files Browse the repository at this point in the history
man/state/raft/trans/peer: use AfterFunc for context watchdog
  • Loading branch information
thaJeztah committed May 31, 2023
2 parents 90ca234 + 6b42535 commit 01bb7a4
Showing 1 changed file with 7 additions and 23 deletions.
30 changes: 7 additions & 23 deletions manager/state/raft/transport/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,18 +229,10 @@ func (p *peer) sendProcessMessage(ctx context.Context, m raftpb.Message) error {
//
// By doing it as this watchdog-type structure, we can time out individual
// operations by canceling the context on our own terms.
bump := make(chan struct{})
go func() {
for {
select {
case <-bump:
case <-time.After(p.tr.config.SendTimeout):
cancel()
case <-ctx.Done():
return
}
}
}()
t := time.AfterFunc(p.tr.config.SendTimeout, cancel)
defer t.Stop()

bump := func() { t.Reset(p.tr.config.SendTimeout) }

var err error
var stream api.Raft_StreamRaftMessageClient
Expand All @@ -260,22 +252,14 @@ func (p *peer) sendProcessMessage(ctx context.Context, m raftpb.Message) error {
// Stream
for _, msg := range msgs {
err = stream.Send(&msg)

// If the send succeeds, bump the watchdog timer.
//
// We cannot just do a naked send to the bump channel. If we try to
// send, for example, and the timer has elapsed, then the context
// will have been canceled, the watchdog loop will have exited, and
// there would be no receiver. We'd block here forever.
select {
case bump <- struct{}{}:
case <-ctx.Done():
}
if err != nil {
log.G(ctx).WithError(err).Error("error streaming message to peer")
stream.CloseAndRecv()
break
}

// If the send succeeds, bump the watchdog timer.
bump()
}

// Finished sending all the messages.
Expand Down

0 comments on commit 01bb7a4

Please sign in to comment.