Description
Stats.Close() in sink/stats.go stops two of its three dmetrics.AvgRate* rate objects but omits the third:
func (s *Stats) Close() {
	s.dataMsgRate.SyncNow()
	s.undoMsgRate.SyncNow()
	s.LogNow()
	s.Shutdown(nil)
	s.dataMsgRate.Stop() // ✓
	s.undoMsgRate.Stop() // ✓
	// s.progressBlockRate.Stop() ← missing
}
Each dmetrics.AvgRate* object spawns an internal janitor goroutine on construction. Without Stop(), that goroutine leaks for the process lifetime.
Impact
In code paths that repeatedly construct and tear down a Sinker (e.g., a continuous-mode loop that builds a fresh Sinker each collection interval), this produces one leaked goroutine per iteration. With an hourly interval that is 24 leaked goroutines per day, so the count reaches the hundreds within one to two weeks of uptime and is clearly visible as a linear slope in go_goroutines metrics.
Reproduction
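import (
	"runtime"
	"testing"
	"time"

	"go.uber.org/zap"
)

func TestLeak(t *testing.T) {
	before := runtime.NumGoroutine()
	for range 50 { // range-over-int requires Go 1.22+
		s := newStats(zap.NewNop())
		s.Close()
	}
	runtime.GC()
	time.Sleep(100 * time.Millisecond) // give stopped goroutines time to exit
	after := runtime.NumGoroutine()
	// without fix: after - before == 50
	// with fix:    after - before ≈ 0
	if leaked := after - before; leaked > 5 { // 5 is an arbitrary tolerance
		t.Fatalf("leaked %d goroutines", leaked)
	}
}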
Fix
Add the missing line to Stats.Close():
s.progressBlockRate.Stop()
The same bug existed in the now-deprecated streamingfast/substreams-sink package and was carried over when the sink package was ported into this repo.