Skip to content

Commit

Permalink
use sampler lock instead
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Couture-Beil <alex@earthly.dev>
  • Loading branch information
alexcb committed Jul 10, 2023
1 parent 4f0a1f8 commit 54a203f
Showing 1 changed file with 10 additions and 11 deletions.
21 changes: 10 additions & 11 deletions executor/resources/sampler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ type WithTimestamp interface {
}

type Sampler[T WithTimestamp] struct {
mu sync.RWMutex
mu sync.Mutex
minInterval time.Duration
maxSamples int
callback func(ts time.Time) (T, error)
Expand All @@ -26,7 +26,6 @@ type Sub[T WithTimestamp] struct {
first time.Time
last time.Time
samples []T
mu sync.RWMutex
err error
}

Expand All @@ -35,8 +34,8 @@ func (s *Sub[T]) Close(captureLast bool) ([]T, error) {
delete(s.sampler.subs, s)
s.sampler.mu.Unlock()

s.mu.Lock()
defer s.mu.Unlock()
s.sampler.mu.Lock()
defer s.sampler.mu.Unlock()

if s.err != nil {
return nil, s.err
Expand Down Expand Up @@ -98,26 +97,26 @@ func (s *Sampler[T]) run() {
return
case <-ticker.C:
tm := time.Now()
s.mu.RLock()
s.mu.Lock()
active := make([]*Sub[T], 0, len(s.subs))
for ss := range s.subs {
ss.mu.Lock()
if tm.Sub(ss.last) < ss.interval {
ss.mu.Unlock()
continue
}
ss.last = tm
ss.mu.Unlock()
active = append(active, ss)
}
s.mu.RUnlock()
s.mu.Unlock()
ticker = time.NewTimer(s.minInterval)
if len(active) == 0 {
continue
}
value, err := s.callback(tm)
s.mu.Lock()
for _, ss := range active {
ss.mu.Lock()
if _, found := s.subs[ss]; !found {
continue // skip if Close() was called while the lock was released
}
if err != nil {
ss.err = err
} else {
Expand All @@ -128,8 +127,8 @@ func (s *Sampler[T]) run() {
if time.Duration(ss.interval)*time.Duration(s.maxSamples) <= dur {
ss.interval *= 2
}
ss.mu.Unlock()
}
s.mu.Unlock()
}
}
}
Expand Down

0 comments on commit 54a203f

Please sign in to comment.