Skip to content

Commit

Permalink
stop arbiter if all meters are stopped
Browse files Browse the repository at this point in the history
  • Loading branch information
cenkalti committed Sep 10, 2019
1 parent cac0b30 commit 808e8fc
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 24 deletions.
57 changes: 41 additions & 16 deletions meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,7 @@ func NewMeter() Meter {
return NilMeter{}
}
m := newStandardMeter()
arbiter.Lock()
defer arbiter.Unlock()
arbiter.meters[m] = struct{}{}
if !arbiter.started {
arbiter.started = true
go arbiter.tick()
}
m.startArbiter()
return m
}

Expand Down Expand Up @@ -145,9 +139,7 @@ func newStandardMeter() *StandardMeter {
// Stop stops the meter, Mark() will be a no-op if you use it after being stopped.
func (m *StandardMeter) Stop() {
if atomic.CompareAndSwapUint32(&m.stopped, 0, 1) {
arbiter.Lock()
delete(arbiter.meters, m)
arbiter.Unlock()
m.stopArbiter()
}
}

Expand All @@ -156,7 +148,7 @@ func (m *StandardMeter) Count() int64 {
return atomic.LoadInt64(&m.snapshot.count)
}

// Mark records the occurance of n events.
// Mark records the occurrance of n events.
func (m *StandardMeter) Mark(n int64) {
if atomic.LoadUint32(&m.stopped) == 1 {
return
Expand Down Expand Up @@ -221,23 +213,54 @@ func (m *StandardMeter) tick() {
m.updateSnapshot()
}

func (m *StandardMeter) startArbiter() {
arbiter.Lock()
defer arbiter.Unlock()
arbiter.meters[m] = struct{}{}
if !arbiter.started {
arbiter.started = true
go arbiter.tick()
}
}

func (m *StandardMeter) stopArbiter() {
arbiter.Lock()
defer arbiter.Unlock()
delete(arbiter.meters, m)
if len(arbiter.meters) == 0 && arbiter.started {
arbiter.cancel <- struct{}{}
arbiter.started = false
}
}

// meterArbiter ticks meters every 5s from a single goroutine.
// meters are references in a set for future stopping.
type meterArbiter struct {
sync.RWMutex
started bool
meters map[*StandardMeter]struct{}
ticker *time.Ticker
started bool
meters map[*StandardMeter]struct{}
cancel chan struct{}
interval time.Duration
}

var arbiter = meterArbiter{ticker: time.NewTicker(5e9), meters: make(map[*StandardMeter]struct{})}
func newArbiter(d time.Duration) *meterArbiter {
return &meterArbiter{
meters: make(map[*StandardMeter]struct{}),
cancel: make(chan struct{}),
interval: d,
}
}

// Ticks meters on the scheduled interval
func (ma *meterArbiter) tick() {
ticker := time.NewTicker(ma.interval)
defer ticker.Stop()
for {
select {
case <-ma.ticker.C:
case <-ticker.C:
ma.tickMeters()
case <-ma.cancel:
return
}
}
}
Expand All @@ -249,3 +272,5 @@ func (ma *meterArbiter) tickMeters() {
meter.tick()
}
}

var arbiter = newArbiter(5 * time.Second)
10 changes: 2 additions & 8 deletions meter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,7 @@ func BenchmarkMeterParallel(b *testing.B) {
// exercise race detector
func TestMeterConcurrency(t *testing.T) {
rand.Seed(time.Now().Unix())
ma := meterArbiter{
ticker: time.NewTicker(time.Millisecond),
meters: make(map[*StandardMeter]struct{}),
}
ma := newArbiter(time.Millisecond)
m := newStandardMeter()
ma.meters[m] = struct{}{}
go ma.tick()
Expand Down Expand Up @@ -61,10 +58,7 @@ func TestGetOrRegisterMeter(t *testing.T) {
}

func TestMeterDecay(t *testing.T) {
ma := meterArbiter{
ticker: time.NewTicker(time.Millisecond),
meters: make(map[*StandardMeter]struct{}),
}
ma := newArbiter(time.Millisecond)
m := newStandardMeter()
ma.meters[m] = struct{}{}
go ma.tick()
Expand Down

0 comments on commit 808e8fc

Please sign in to comment.