Skip to content

Commit

Permalink
Enable indepent subscope closing through io.Closer
Browse files Browse the repository at this point in the history
  • Loading branch information
mway committed Apr 19, 2021
1 parent ec291c9 commit aefc7b2
Show file tree
Hide file tree
Showing 3 changed files with 180 additions and 6 deletions.
43 changes: 40 additions & 3 deletions scope.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ type scope struct {
closed atomic.Bool
done chan struct{}
wg sync.WaitGroup
root bool
}

// ScopeOptions is a set of options to construct a scope.
Expand Down Expand Up @@ -162,6 +163,7 @@ func newRootScope(opts ScopeOptions, interval time.Duration) *scope {
sanitizer: sanitizer,
separator: sanitizer.Name(opts.Separator),
timers: make(map[string]*timer),
root: true,
}

// NB(r): Take a copy of the tags on creation
Expand Down Expand Up @@ -500,19 +502,54 @@ func (s *scope) Snapshot() Snapshot {
}

func (s *scope) Close() error {
// n.b. Once this flag is set, the next scope report will remove it from
// the registry and clear its metrics.
if !s.closed.CAS(false, true) {
return nil
}

close(s.done)
s.reportRegistry()

if closer, ok := s.baseReporter.(io.Closer); ok {
return closer.Close()
if s.root {
s.reportRegistry()
if closer, ok := s.baseReporter.(io.Closer); ok {
return closer.Close()
}
}

return nil
}

func (s *scope) clearMetrics() {
s.cm.Lock()
s.gm.Lock()
s.tm.Lock()
s.hm.Lock()
defer s.cm.Unlock()
defer s.gm.Unlock()
defer s.tm.Unlock()
defer s.hm.Unlock()

for k := range s.counters {
delete(s.counters, k)
}
s.countersSlice = nil

for k := range s.gauges {
delete(s.gauges, k)
}
s.gaugesSlice = nil

for k := range s.timers {
delete(s.timers, k)
}

for k := range s.histograms {
delete(s.histograms, k)
}
s.histogramsSlice = nil
}

// NB(prateek): We assume concatenation of sanitized inputs is
// sanitized. If that stops being true, then we need to sanitize the
// output of this function.
Expand Down
53 changes: 50 additions & 3 deletions scope_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,38 +20,55 @@

package tally

import "sync"
import (
"sync"
)

var scopeRegistryKey = keyForPrefixedStringMaps

type scopeRegistry struct {
mu sync.RWMutex
root *scope
subscopes map[string]*scope
}

func newScopeRegistry(root *scope) *scopeRegistry {
r := &scopeRegistry{
root: root,
subscopes: make(map[string]*scope),
}
r.subscopes[scopeRegistryKey(root.prefix, root.tags)] = root
return r
}

func (r *scopeRegistry) Report(reporter StatsReporter) {
defer r.purgeIfRootClosed()
r.mu.RLock()
defer r.mu.RUnlock()

for _, s := range r.subscopes {
for name, s := range r.subscopes {
s.report(reporter)

if s.closed.Load() {
r.removeWithRLock(name)
s.clearMetrics()
}
}
}

func (r *scopeRegistry) CachedReport() {
defer r.purgeIfRootClosed()

r.mu.RLock()
defer r.mu.RUnlock()

for _, s := range r.subscopes {
for name, s := range r.subscopes {
s.cachedReport()

if s.closed.Load() {
r.removeWithRLock(name)
s.clearMetrics()
}
}
}

Expand All @@ -65,6 +82,10 @@ func (r *scopeRegistry) ForEachScope(f func(*scope)) {
}

func (r *scopeRegistry) Subscope(parent *scope, prefix string, tags map[string]string) *scope {
if r.root.closed.Load() || parent.closed.Load() {
return NoopScope.(*scope)
}

key := scopeRegistryKey(prefix, parent.tags, tags)

r.mu.RLock()
Expand Down Expand Up @@ -103,6 +124,7 @@ func (r *scopeRegistry) Subscope(parent *scope, prefix string, tags map[string]s
histogramsSlice: make([]*histogram, 0, _defaultInitialSliceSize),
timers: make(map[string]*timer),
bucketCache: parent.bucketCache,
done: make(chan struct{}),
}
r.subscopes[key] = subscope
return subscope
Expand All @@ -112,3 +134,28 @@ func (r *scopeRegistry) lockedLookup(key string) (*scope, bool) {
ss, ok := r.subscopes[key]
return ss, ok
}

func (r *scopeRegistry) purgeIfRootClosed() {
if !r.root.closed.Load() {
return
}

r.mu.Lock()
defer r.mu.Unlock()

for k, s := range r.subscopes {
_ = s.Close()
s.clearMetrics()
delete(r.subscopes, k)
}
}

func (r *scopeRegistry) removeWithRLock(key string) {
// n.b. This function must lock the registry for writing and return it to an
// RLocked state prior to exiting. Defer order is important (LIFO).
r.mu.RUnlock()
defer r.mu.RLock()
r.mu.Lock()
defer r.mu.Unlock()
delete(r.subscopes, key)
}
90 changes: 90 additions & 0 deletions scope_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -819,6 +819,96 @@ func TestSubScope(t *testing.T) {
assert.Equal(t, tags, histograms["foo.mork.baz"].tags)
}

func TestSubScopeClose(t *testing.T) {
r := newTestStatsReporter()

rs, closer := NewRootScope(ScopeOptions{Prefix: "foo", Reporter: r}, 0)
// defer closer.Close()
_ = closer

var (
root = rs.(*scope)
s = root.SubScope("mork").(*scope)
rootCounter = root.Counter("foo")
subCounter = s.Counter("foo")
)

// Emit a metric from both scopes.
r.cg.Add(1)
rootCounter.Inc(1)
r.cg.Add(1)
subCounter.Inc(1)

// Verify that we got both metrics.
root.reportRegistry()
r.WaitAll()
counters := r.getCounters()
require.EqualValues(t, 1, counters["foo.foo"].val)
require.EqualValues(t, 1, counters["foo.mork.foo"].val)

// Close the subscope. We expect both metrics to still be reported, because
// we won't have reported the registry before we update the metrics.
require.NoError(t, s.Close())

// Create a subscope from the now-closed scope; it should nop.
ns := s.SubScope("foobar")
require.Equal(t, NoopScope, ns)

// Emit a metric from all scopes.
r.cg.Add(1)
rootCounter.Inc(2)
r.cg.Add(1)
subCounter.Inc(2)

// Verify that we still got both metrics.
root.reportLoopRun()
r.WaitAll()
counters = r.getCounters()
require.EqualValues(t, 2, counters["foo.foo"].val)
require.EqualValues(t, 2, counters["foo.mork.foo"].val)

// Emit a metric for both scopes. The root counter should succeed, and the
// subscope counter should not update what's in the reporter.
r.cg.Add(1)
rootCounter.Inc(3)
subCounter.Inc(3)
root.reportLoopRun()
r.WaitAll()
time.Sleep(time.Second) // since we can't wg.Add the non-reported counter

// We only expect the root scope counter; the subscope counter will be the
// value previously held by the reporter, because it has not been udpated.
counters = r.getCounters()
require.EqualValues(t, 3, counters["foo.foo"].val)
require.EqualValues(t, 2, counters["foo.mork.foo"].val)

// Ensure that we can double-close harmlessly.
require.NoError(t, s.Close())

// Create one more scope so that we can ensure it's defunct once the root is
// closed.
ns = root.SubScope("newscope")

// Close the root scope. We should not be able to emit any more metrics,
// because the root scope reports the registry prior to closing.
require.NoError(t, closer.Close())

ns.Counter("newcounter").Inc(1)
rootCounter.Inc(4)
root.registry.Report(r)
time.Sleep(time.Second) // since we can't wg.Add the non-reported counter

// We do not expect any updates.
counters = r.getCounters()
require.EqualValues(t, 3, counters["foo.foo"].val)
require.EqualValues(t, 2, counters["foo.mork.foo"].val)
_, found := counters["newscope.newcounter"]
require.False(t, found)

// Ensure that we can double-close harmlessly.
require.NoError(t, closer.Close())
}

func TestTaggedSubScope(t *testing.T) {
r := newTestStatsReporter()

Expand Down

0 comments on commit aefc7b2

Please sign in to comment.