diff --git a/matcher/cache.go b/matcher/cache.go index 5bb58ee..2f4daef 100644 --- a/matcher/cache.go +++ b/matcher/cache.go @@ -24,7 +24,7 @@ import "github.com/m3db/m3metrics/rules" // Source is a datasource providing match results. type Source interface { - // Match returns the match result for an given id within time range + // Match returns the match result for a given id within time range // [fromNanos, toNanos). Match(id []byte, fromNanos, toNanos int64) rules.MatchResult } diff --git a/matcher/match.go b/matcher/match.go index 994a769..43cb2a5 100644 --- a/matcher/match.go +++ b/matcher/match.go @@ -42,7 +42,7 @@ type matcher struct { namespaceTag []byte defaultNamespace []byte - namespaces *namespaces + namespaces Namespaces cache Cache } @@ -50,17 +50,19 @@ type matcher struct { func NewMatcher(cache Cache, opts Options) (Matcher, error) { instrumentOpts := opts.InstrumentOptions() scope := instrumentOpts.MetricsScope() - namespacesScope := scope.SubScope("namespaces") - namespacesOpts := opts.SetInstrumentOptions(instrumentOpts.SetMetricsScope(namespacesScope)) + iOpts := instrumentOpts.SetMetricsScope(scope.SubScope("namespaces")) + namespacesOpts := opts.SetInstrumentOptions(iOpts). + SetOnNamespaceAddedFn(func(namespace []byte, ruleSet RuleSet) { + cache.Register(namespace, ruleSet) + }). + SetOnNamespaceRemovedFn(func(namespace []byte) { + cache.Unregister(namespace) + }). + SetOnRuleSetUpdatedFn(func(namespace []byte, ruleSet RuleSet) { + cache.Register(namespace, ruleSet) + }) key := opts.NamespacesKey() - namespaces := newNamespaces(key, cache, namespacesOpts) - // NB(xichen): if there is no cache provided, the newly created - // namespaces object is used as a zero-size forwarding cache that - // forwards all match requests to the backing rulesets. - if cache == nil { - cache = namespaces - namespaces.setCache(namespaces) - } + namespaces := NewNamespaces(key, namespacesOpts) if err := namespaces.Watch(); err != nil { errCreateWatch, ok := err.(runtime.CreateWatchError) if ok { diff --git a/matcher/match_test.go b/matcher/match_test.go index 7c823f7..cbedaca 100644 --- a/matcher/match_test.go +++ b/matcher/match_test.go @@ -72,7 +72,7 @@ func TestMatcherMatchDoesNotExist(t *testing.T) { tagValueFn: func(tagName []byte) ([]byte, bool) { return nil, false }, } now := time.Now() - matcher := testMatcher(t, nil) + matcher := testMatcher(t, newMemCache()) require.Equal(t, rules.EmptyMatchResult, matcher.Match(id, now.UnixNano(), now.UnixNano())) } diff --git a/matcher/namespaces.go b/matcher/namespaces.go index 63bc4a5..1304091 100644 --- a/matcher/namespaces.go +++ b/matcher/namespaces.go @@ -41,6 +41,22 @@ var ( errNilValue = errors.New("nil value received") ) +// Namespaces manages runtime updates to registered namespaces and provides +// API to match metic ids against rules in the corresponding namespaces. +type Namespaces interface { + runtime.Value + + // Version returns the current version for a give namespace. + Version(namespace []byte) int + + // Match returns the matching policies for a given id in a given namespace + // between [fromNanos, toNanos). + Match(namespace, id []byte, fromNanos, toNanos int64) rules.MatchResult + + // Close closes the namespaces. + Close() +} + type namespacesMetrics struct { notExists tally.Counter added tally.Counter @@ -62,41 +78,41 @@ func newNamespacesMetrics(scope tally.Scope) namespacesMetrics { } // namespaces contains the list of namespace users have defined rules for. -// NB(xichen): namespaces implements the Cache interface and can be used as -// a zero-size forwarding cache that forwards all match requests to the backing -// rulesets without caching any match results. type namespaces struct { sync.RWMutex runtime.Value - key string - store kv.Store - cache Cache - opts Options - log xlog.Logger - ruleSetKeyFn RuleSetKeyFn - nowFn clock.NowFn - matchRangePast time.Duration + key string + store kv.Store + opts Options + nowFn clock.NowFn + log xlog.Logger + ruleSetKeyFn RuleSetKeyFn + matchRangePast time.Duration + onNamespaceAddedFn OnNamespaceAddedFn + onNamespaceRemovedFn OnNamespaceRemovedFn proto *schema.Namespaces - rules map[xid.Hash]*ruleSet + rules map[xid.Hash]RuleSet metrics namespacesMetrics } -func newNamespaces(key string, cache Cache, opts Options) *namespaces { +// NewNamespaces creates a new namespaces object. +func NewNamespaces(key string, opts Options) Namespaces { instrumentOpts := opts.InstrumentOptions() n := &namespaces{ - key: key, - store: opts.KVStore(), - cache: cache, - opts: opts, - log: instrumentOpts.Logger(), - ruleSetKeyFn: opts.RuleSetKeyFn(), - nowFn: opts.ClockOptions().NowFn(), - matchRangePast: opts.MatchRangePast(), - proto: &schema.Namespaces{}, - rules: make(map[xid.Hash]*ruleSet), - metrics: newNamespacesMetrics(instrumentOpts.MetricsScope()), + key: key, + store: opts.KVStore(), + opts: opts, + nowFn: opts.ClockOptions().NowFn(), + log: instrumentOpts.Logger(), + ruleSetKeyFn: opts.RuleSetKeyFn(), + matchRangePast: opts.MatchRangePast(), + onNamespaceAddedFn: opts.OnNamespaceAddedFn(), + onNamespaceRemovedFn: opts.OnNamespaceRemovedFn(), + proto: &schema.Namespaces{}, + rules: make(map[xid.Hash]RuleSet), + metrics: newNamespacesMetrics(instrumentOpts.MetricsScope()), } valueOpts := runtime.NewOptions(). SetInstrumentOptions(instrumentOpts). @@ -108,6 +124,18 @@ func newNamespaces(key string, cache Cache, opts Options) *namespaces { return n } +func (n *namespaces) Version(namespace []byte) int { + nsHash := xid.HashFn(namespace) + n.RLock() + ruleSet, exists := n.rules[nsHash] + if !exists { + n.RUnlock() + return kv.UninitializedVersion + } + n.RUnlock() + return ruleSet.Version() +} + func (n *namespaces) Match(namespace, id []byte, fromNanos, toNanos int64) rules.MatchResult { var ( res = rules.EmptyMatchResult @@ -124,10 +152,7 @@ func (n *namespaces) Match(namespace, id []byte, fromNanos, toNanos int64) rules return ruleSet.Match(id, fromNanos, toNanos) } -func (n *namespaces) Register([]byte, Source) {} -func (n *namespaces) Unregister([]byte) {} - -func (n *namespaces) Close() error { +func (n *namespaces) Close() { // NB(xichen): we stop watching the value outside lock because otherwise we might // be holding the namespace lock while attempting to acquire the value lock, and // the updating goroutine might be holding the value lock and attempting to @@ -139,13 +164,6 @@ func (n *namespaces) Close() error { rs.Unwatch() } n.RUnlock() - return nil -} - -func (n *namespaces) setCache(cache Cache) { - n.Lock() - n.cache = cache - n.Unlock() } func (n *namespaces) toNamespaces(value kv.Value) (interface{}, error) { @@ -184,7 +202,7 @@ func (n *namespaces) process(value interface{}) error { ruleSetScope := instrumentOpts.MetricsScope().SubScope("ruleset") ruleSetOpts := n.opts.SetInstrumentOptions(instrumentOpts.SetMetricsScope(ruleSetScope)) ruleSetKey := n.ruleSetKeyFn(ns.Name()) - ruleSet = newRuleSet(nsName, ruleSetKey, n.cache, ruleSetOpts) + ruleSet = newRuleSet(nsName, ruleSetKey, ruleSetOpts) n.rules[nsHash] = ruleSet n.metrics.added.Inc(1) } @@ -218,8 +236,8 @@ func (n *namespaces) process(value interface{}) error { } } - if !exists { - n.cache.Register(nsName, ruleSet) + if !exists && n.onNamespaceAddedFn != nil { + n.onNamespaceAddedFn(nsName, ruleSet) } } @@ -231,7 +249,9 @@ func (n *namespaces) process(value interface{}) error { // Process the namespaces not in the incoming update. earliest := n.nowFn().Add(-n.matchRangePast) if ruleSet.Tombstoned() && ruleSet.CutoverNanos() <= earliest.UnixNano() { - n.cache.Unregister(ruleSet.Namespace()) + if n.onNamespaceRemovedFn != nil { + n.onNamespaceRemovedFn(ruleSet.Namespace()) + } delete(n.rules, nsHash) ruleSet.Unwatch() n.metrics.unwatched.Inc(1) diff --git a/matcher/namespaces_test.go b/matcher/namespaces_test.go index e7d7f5a..eb57220 100644 --- a/matcher/namespaces_test.go +++ b/matcher/namespaces_test.go @@ -117,7 +117,7 @@ func TestNamespacesProcess(t *testing.T) { {namespace: []byte("catNs"), id: "cat", version: 3, tombstoned: true}, {namespace: []byte("lolNs"), id: "lol", version: 3, tombstoned: true}, } { - rs := newRuleSet(input.namespace, input.id, cache, opts) + rs := newRuleSet(input.namespace, input.id, opts).(*ruleSet) rs.Value = &mockRuntimeValue{key: input.id} rs.version = input.version rs.tombstoned = input.tombstoned @@ -201,10 +201,11 @@ func TestNamespacesProcess(t *testing.T) { } for i, ns := range update.Namespaces { rs, exists := nss.rules[xid.HashFn([]byte(ns.Name))] + ruleSet := rs.(*ruleSet) require.True(t, exists) require.Equal(t, expected[i].key, rs.Key()) - require.Equal(t, rs, c.namespaces[string(ns.Name)].source.(*ruleSet)) - mv, ok := rs.Value.(*mockRuntimeValue) + require.Equal(t, ruleSet, c.namespaces[string(ns.Name)].source) + mv, ok := ruleSet.Value.(*mockRuntimeValue) if !ok { continue } @@ -219,8 +220,17 @@ func testNamespaces() (kv.Store, Cache, *namespaces, Options) { opts := NewOptions(). SetInitWatchTimeout(100 * time.Millisecond). SetKVStore(store). - SetNamespacesKey(testNamespacesKey) - return store, cache, newNamespaces(testNamespacesKey, cache, opts), opts + SetNamespacesKey(testNamespacesKey). + SetOnNamespaceAddedFn(func(namespace []byte, ruleSet RuleSet) { + cache.Register(namespace, ruleSet) + }). + SetOnNamespaceRemovedFn(func(namespace []byte) { + cache.Unregister(namespace) + }). + SetOnRuleSetUpdatedFn(func(namespace []byte, ruleSet RuleSet) { + cache.Register(namespace, ruleSet) + }) + return store, cache, NewNamespaces(testNamespacesKey, opts).(*namespaces), opts } type memResults struct { diff --git a/matcher/options.go b/matcher/options.go index bc4834b..9478499 100644 --- a/matcher/options.go +++ b/matcher/options.go @@ -45,13 +45,18 @@ var ( defaultDefaultNamespace = []byte("defaultNamespace") ) -func defaultRuleSetKeyFn(namespace []byte) string { - return fmt.Sprintf(defaultRuleSetKeyFormat, namespace) -} - // RuleSetKeyFn generates the ruleset key for a given namespace. type RuleSetKeyFn func(namespace []byte) string +// OnNamespaceAddedFn is called when a namespace is added. +type OnNamespaceAddedFn func(namespace []byte, ruleSet RuleSet) + +// OnNamespaceRemovedFn is called when a namespace is removed. +type OnNamespaceRemovedFn func(namespace []byte) + +// OnRuleSetUpdatedFn is called when a ruleset is updated. +type OnRuleSetUpdatedFn func(namespace []byte, ruleSet RuleSet) + // Options provide a set of options for the msgpack-based reporter. type Options interface { // SetClockOptions sets the clock options. @@ -113,19 +118,40 @@ type Options interface { // MatchRangePast returns the limit on the earliest time eligible for rule matching. MatchRangePast() time.Duration + + // SetOnNamespaceAddedFn sets the function to be called when a namespace is added. + SetOnNamespaceAddedFn(value OnNamespaceAddedFn) Options + + // OnNamespaceAddedFn returns the function to be called when a namespace is added. + OnNamespaceAddedFn() OnNamespaceAddedFn + + // SetOnNamespaceRemovedFn sets the function to be called when a namespace is removed. + SetOnNamespaceRemovedFn(value OnNamespaceRemovedFn) Options + + // OnNamespaceRemovedFn returns the function to be called when a namespace is removed. + OnNamespaceRemovedFn() OnNamespaceRemovedFn + + // SetOnRuleSetUpdatedFn sets the function to be called when a ruleset is updated. + SetOnRuleSetUpdatedFn(value OnRuleSetUpdatedFn) Options + + // OnRuleSetUpdatedFn returns the function to be called when a ruleset is updated. + OnRuleSetUpdatedFn() OnRuleSetUpdatedFn } type options struct { - clockOpts clock.Options - instrumentOpts instrument.Options - ruleSetOpts rules.Options - initWatchTimeout time.Duration - kvStore kv.Store - namespacesKey string - ruleSetKeyFn RuleSetKeyFn - namespaceTag []byte - defaultNamespace []byte - matchRangePast time.Duration + clockOpts clock.Options + instrumentOpts instrument.Options + ruleSetOpts rules.Options + initWatchTimeout time.Duration + kvStore kv.Store + namespacesKey string + ruleSetKeyFn RuleSetKeyFn + namespaceTag []byte + defaultNamespace []byte + matchRangePast time.Duration + onNamespaceAddedFn OnNamespaceAddedFn + onNamespaceRemovedFn OnNamespaceRemovedFn + onRuleSetUpdatedFn OnRuleSetUpdatedFn } // NewOptions creates a new set of options. @@ -243,3 +269,37 @@ func (o *options) SetMatchRangePast(value time.Duration) Options { func (o *options) MatchRangePast() time.Duration { return o.matchRangePast } + +func (o *options) SetOnNamespaceAddedFn(value OnNamespaceAddedFn) Options { + opts := *o + opts.onNamespaceAddedFn = value + return &opts +} + +func (o *options) OnNamespaceAddedFn() OnNamespaceAddedFn { + return o.onNamespaceAddedFn +} + +func (o *options) SetOnNamespaceRemovedFn(value OnNamespaceRemovedFn) Options { + opts := *o + opts.onNamespaceRemovedFn = value + return &opts +} + +func (o *options) OnNamespaceRemovedFn() OnNamespaceRemovedFn { + return o.onNamespaceRemovedFn +} + +func (o *options) SetOnRuleSetUpdatedFn(value OnRuleSetUpdatedFn) Options { + opts := *o + opts.onRuleSetUpdatedFn = value + return &opts +} + +func (o *options) OnRuleSetUpdatedFn() OnRuleSetUpdatedFn { + return o.onRuleSetUpdatedFn +} + +func defaultRuleSetKeyFn(namespace []byte) string { + return fmt.Sprintf(defaultRuleSetKeyFormat, namespace) +} diff --git a/matcher/ruleset.go b/matcher/ruleset.go index e14fe24..69c71b6 100644 --- a/matcher/ruleset.go +++ b/matcher/ruleset.go @@ -34,6 +34,25 @@ import ( "github.com/uber-go/tally" ) +// RuleSet manages runtime updates to registered rules and provides +// API to match metic ids against rules in the corresponding ruleset. +type RuleSet interface { + runtime.Value + Source + + // Namespace returns the namespace of the ruleset. + Namespace() []byte + + // Version returns the current version of the ruleset. + Version() int + + // CutoverNanos returns the cutover time of the ruleset. + CutoverNanos() int64 + + // Tombstoned returns whether the ruleset is tombstoned. + Tombstoned() bool +} + type ruleSetMetrics struct { match instrument.MethodMetrics nilMatcher tally.Counter @@ -53,14 +72,14 @@ type ruleSet struct { sync.RWMutex runtime.Value - namespace []byte - key string - cache Cache - store kv.Store - opts Options - nowFn clock.NowFn - matchRangePast time.Duration - ruleSetOpts rules.Options + namespace []byte + key string + store kv.Store + opts Options + nowFn clock.NowFn + matchRangePast time.Duration + ruleSetOpts rules.Options + onRuleSetUpdatedFn OnRuleSetUpdatedFn proto *schema.RuleSet version int @@ -73,22 +92,21 @@ type ruleSet struct { func newRuleSet( namespace []byte, key string, - cache Cache, opts Options, -) *ruleSet { +) RuleSet { instrumentOpts := opts.InstrumentOptions() r := &ruleSet{ - namespace: namespace, - key: key, - cache: cache, - store: opts.KVStore(), - opts: opts, - nowFn: opts.ClockOptions().NowFn(), - matchRangePast: opts.MatchRangePast(), - ruleSetOpts: opts.RuleSetOptions(), - proto: &schema.RuleSet{}, - version: kv.UninitializedVersion, - metrics: newRuleSetMetrics(instrumentOpts.MetricsScope(), instrumentOpts.MetricsSamplingRate()), + namespace: namespace, + key: key, + store: opts.KVStore(), + opts: opts, + nowFn: opts.ClockOptions().NowFn(), + matchRangePast: opts.MatchRangePast(), + ruleSetOpts: opts.RuleSetOptions(), + onRuleSetUpdatedFn: opts.OnRuleSetUpdatedFn(), + proto: &schema.RuleSet{}, + version: kv.UninitializedVersion, + metrics: newRuleSetMetrics(instrumentOpts.MetricsScope(), instrumentOpts.MetricsSamplingRate()), } valueOpts := runtime.NewOptions(). SetInstrumentOptions(opts.InstrumentOptions()). @@ -166,7 +184,9 @@ func (r *ruleSet) process(value interface{}) error { r.cutoverNanos = ruleSet.CutoverNanos() r.tombstoned = ruleSet.TombStoned() r.matcher = ruleSet.ActiveSet(r.nowFn().Add(-r.matchRangePast)) - r.cache.Register(r.namespace, r) + if r.onRuleSetUpdatedFn != nil { + r.onRuleSetUpdatedFn(r.namespace, r) + } r.metrics.updated.Inc(1) return nil } diff --git a/matcher/ruleset_test.go b/matcher/ruleset_test.go index 3802af7..c2328b3 100644 --- a/matcher/ruleset_test.go +++ b/matcher/ruleset_test.go @@ -173,6 +173,7 @@ func testRuleSet() (kv.Store, Cache, *ruleSet, Options) { opts := NewOptions(). SetInitWatchTimeout(100 * time.Millisecond). SetKVStore(store). - SetRuleSetKeyFn(func(ns []byte) string { return fmt.Sprintf("/rules/%s", ns) }) - return store, cache, newRuleSet(testNamespace, testNamespacesKey, cache, opts), opts + SetRuleSetKeyFn(func(ns []byte) string { return fmt.Sprintf("/rules/%s", ns) }). + SetOnRuleSetUpdatedFn(func(namespace []byte, ruleSet RuleSet) { cache.Register(namespace, ruleSet) }) + return store, cache, newRuleSet(testNamespace, testNamespacesKey, opts).(*ruleSet), opts }