From 6f3f544841a1bd0fed66e4264a391da162af4298 Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Thu, 14 Sep 2023 10:30:24 +0100 Subject: [PATCH] Fix leaking timers in stream sources Signed-off-by: Neil Twigg --- server/stream.go | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/server/stream.go b/server/stream.go index 7290a8982b..d66f60620f 100644 --- a/server/stream.go +++ b/server/stream.go @@ -248,7 +248,8 @@ type stream struct { mirror *sourceInfo // Sources - sources map[string]*sourceInfo + sources map[string]*sourceInfo + sourceRetries map[string]*time.Timer // Indicates we have direct consumers. directs int @@ -2765,12 +2766,24 @@ func (mset *stream) scheduleSetSourceConsumerRetryAsap(si *sourceInfo, seq uint6 // Simply schedules setSourceConsumer at the given delay. // -// Does not require lock +// Lock held on entry func (mset *stream) scheduleSetSourceConsumerRetry(iname string, seq uint64, delay time.Duration, startTime time.Time) { - time.AfterFunc(delay, func() { + if mset.sourceRetries == nil { + mset.sourceRetries = map[string]*time.Timer{} + } + if t, ok := mset.sourceRetries[iname]; ok && !t.Stop() { + // It looks like the goroutine has started running but hasn't taken the + // stream lock yet (otherwise the map entry would be deleted). We had + // might as well let the running goroutine complete and schedule another + // timer only if it needs to. + return + } + mset.sourceRetries[iname] = time.AfterFunc(delay, func() { mset.mu.Lock() + defer mset.mu.Unlock() + + delete(mset.sourceRetries, iname) mset.setSourceConsumer(iname, seq, startTime) - mset.mu.Unlock() }) }