Skip to content

Commit

Permalink
Fix ping loop getting stuck if the first ping doesn't respond
Browse files Browse the repository at this point in the history
  • Loading branch information
tulir committed May 21, 2024
1 parent 09ac469 commit 074e239
Showing 1 changed file with 43 additions and 10 deletions.
53 changes: 43 additions & 10 deletions libgm/longpoll.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,27 @@ type dittoPinger struct {
log *zerolog.Logger
}

func (dp *dittoPinger) OnRespond(pingID uint64, dur time.Duration) {
// resetter is a one-shot signal shared between pings: once Done has been
// called, C is closed after a fixed delay so that anything selecting on C
// can stop waiting.
type resetter struct {
	// C is closed (never sent on) a short while after the first Done call.
	C chan struct{}
	// d records whether Done has already been triggered, so C is closed at
	// most once.
	d atomic.Bool
}

// newResetter returns a resetter with an open, unbuffered signal channel.
func newResetter() *resetter {
	r := new(resetter)
	r.C = make(chan struct{})
	return r
}

// Done schedules C to be closed 5 seconds from now. Only the first call has
// any effect; later calls (from any goroutine) return immediately, which
// guards against closing the channel twice.
func (r *resetter) Done() {
	if !r.d.CompareAndSwap(false, true) {
		// Someone else already triggered the close.
		return
	}
	// The close is deferred rather than immediate; time.AfterFunc runs the
	// callback in its own goroutine once the delay has elapsed.
	time.AfterFunc(5*time.Second, func() {
		close(r.C)
	})
}

func (dp *dittoPinger) OnRespond(pingID uint64, dur time.Duration, reset *resetter) {
dp.pingHandlingLock.Lock()
defer dp.pingHandlingLock.Unlock()
logEvt := dp.log.Debug().Uint64("ping_id", pingID).Dur("duration", dur)
Expand All @@ -77,6 +97,7 @@ func (dp *dittoPinger) OnRespond(pingID uint64, dur time.Duration) {
dp.notRespondingSent = false
dp.pingFails = 0
dp.firstPingDone = true
reset.Done()
}

func (dp *dittoPinger) OnTimeout(pingID uint64, sendNotResponding bool) {
Expand All @@ -89,7 +110,7 @@ func (dp *dittoPinger) OnTimeout(pingID uint64, sendNotResponding bool) {
}
}

func (dp *dittoPinger) WaitForResponse(pingID uint64, start time.Time, timeout time.Duration, timeoutCount int, pingChan <-chan *IncomingRPCMessage) {
func (dp *dittoPinger) WaitForResponse(pingID uint64, start time.Time, timeout time.Duration, timeoutCount int, pingChan <-chan *IncomingRPCMessage, reset *resetter) {
var timerChan <-chan time.Time
var timer *time.Timer
if timeout > 0 {
Expand All @@ -98,7 +119,7 @@ func (dp *dittoPinger) WaitForResponse(pingID uint64, start time.Time, timeout t
}
select {
case <-pingChan:
dp.OnRespond(pingID, time.Since(start))
dp.OnRespond(pingID, time.Since(start), reset)
if timer != nil && !timer.Stop() {
<-timer.C
}
Expand All @@ -115,7 +136,7 @@ func (dp *dittoPinger) WaitForResponse(pingID uint64, start time.Time, timeout t
timeoutCount++
select {
case <-pingChan:
dp.OnRespond(pingID, time.Since(start))
dp.OnRespond(pingID, time.Since(start), reset)
return
case <-repingTickerChan:
if repingTickerTime < maxRepingTickerTime {
Expand All @@ -128,7 +149,7 @@ func (dp *dittoPinger) WaitForResponse(pingID uint64, start time.Time, timeout t
Uint64("ping_id", subPingID).
Str("next_reping", repingTickerTime.String()).
Msg("Sending new ping")
dp.Ping(subPingID, defaultPingTimeout, timeoutCount)
dp.Ping(subPingID, defaultPingTimeout, timeoutCount, reset)
case <-dp.client.pingShortCircuit:
dp.pingHandlingLock.Lock()
dp.log.Debug().Uint64("ping_id", pingID).
Expand All @@ -140,16 +161,28 @@ func (dp *dittoPinger) WaitForResponse(pingID uint64, start time.Time, timeout t
dp.pingHandlingLock.Unlock()
case <-dp.stop:
return
case <-reset.C:
dp.log.Debug().
Uint64("ping_id", pingID).
Msg("Another ping was successful, giving up on this one")
return
}
}
case <-reset.C:
dp.log.Debug().
Uint64("ping_id", pingID).
Msg("Another ping was successful, giving up on this one")
if timer != nil && !timer.Stop() {
<-timer.C
}
case <-dp.stop:
if timer != nil && !timer.Stop() {
<-timer.C
}
}
}

func (dp *dittoPinger) Ping(pingID uint64, timeout time.Duration, timeoutCount int) {
func (dp *dittoPinger) Ping(pingID uint64, timeout time.Duration, timeoutCount int, reset *resetter) {
dp.pingHandlingLock.Lock()
if time.Since(dp.lastPingTime) < minPingInterval {
dp.log.Debug().
Expand Down Expand Up @@ -177,9 +210,9 @@ func (dp *dittoPinger) Ping(pingID uint64, timeout time.Duration, timeoutCount i
}
dp.pingHandlingLock.Unlock()
if timeoutCount == 0 {
dp.WaitForResponse(pingID, now, timeout, timeoutCount, pingChan)
dp.WaitForResponse(pingID, now, timeout, timeoutCount, pingChan, reset)
} else {
go dp.WaitForResponse(pingID, now, timeout, timeoutCount, pingChan)
go dp.WaitForResponse(pingID, now, timeout, timeoutCount, pingChan, reset)
}
}

Expand All @@ -193,12 +226,12 @@ func (dp *dittoPinger) Loop() {
pingID := pingIDCounter.Add(1)
dp.log.Debug().Uint64("ping_id", pingID).Msg("Ditto ping wait short-circuited")
pingStart = time.Now()
dp.Ping(pingID, shortPingTimeout, 0)
dp.Ping(pingID, shortPingTimeout, 0, newResetter())
case <-dp.ping:
pingID := pingIDCounter.Add(1)
dp.log.Trace().Uint64("ping_id", pingID).Msg("Doing normal ditto ping")
pingStart = time.Now()
dp.Ping(pingID, defaultPingTimeout, 0)
dp.Ping(pingID, defaultPingTimeout, 0, newResetter())
case <-dp.stop:
return
}
Expand Down

0 comments on commit 074e239

Please sign in to comment.