diff --git a/executor/resources/sampler.go b/executor/resources/sampler.go index b2645cae0e25..38e94812da76 100644 --- a/executor/resources/sampler.go +++ b/executor/resources/sampler.go @@ -10,7 +10,7 @@ type WithTimestamp interface { } type Sampler[T WithTimestamp] struct { - mu sync.RWMutex + mu sync.Mutex minInterval time.Duration maxSamples int callback func(ts time.Time) (T, error) @@ -32,9 +32,9 @@ type Sub[T WithTimestamp] struct { func (s *Sub[T]) Close(captureLast bool) ([]T, error) { s.sampler.mu.Lock() delete(s.sampler.subs, s) - s.sampler.mu.Unlock() if s.err != nil { + s.sampler.mu.Unlock() return nil, s.err } current := s.first @@ -46,6 +46,7 @@ func (s *Sub[T]) Close(captureLast bool) ([]T, error) { current = ts } } + s.sampler.mu.Unlock() if captureLast { v, err := s.sampler.callback(time.Now()) @@ -94,8 +95,8 @@ func (s *Sampler[T]) run() { return case <-ticker.C: tm := time.Now() + s.mu.Lock() active := make([]*Sub[T], 0, len(s.subs)) - s.mu.RLock() for ss := range s.subs { if tm.Sub(ss.last) < ss.interval { continue @@ -103,13 +104,17 @@ func (s *Sampler[T]) run() { ss.last = tm active = append(active, ss) } - s.mu.RUnlock() + s.mu.Unlock() ticker = time.NewTimer(s.minInterval) if len(active) == 0 { continue } value, err := s.callback(tm) + s.mu.Lock() for _, ss := range active { + if _, found := s.subs[ss]; !found { + continue // skip if Close() was called while the lock was released + } if err != nil { ss.err = err } else { @@ -121,6 +126,7 @@ func (s *Sampler[T]) run() { ss.interval *= 2 } } + s.mu.Unlock() } } }