Skip to content
This repository has been archived by the owner on Oct 17, 2018. It is now read-only.

Commit

Permalink
Cleaner APIs
Browse files Browse the repository at this point in the history
  • Loading branch information
xichen2020 committed Jun 19, 2017
1 parent 88a4604 commit 13a7bda
Show file tree
Hide file tree
Showing 8 changed files with 208 additions and 95 deletions.
2 changes: 1 addition & 1 deletion matcher/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import "github.com/m3db/m3metrics/rules"

// Source is a datasource providing match results.
type Source interface {
// Match returns the match result for an given id within time range
// Match returns the match result for a given id within time range
// [fromNanos, toNanos).
Match(id []byte, fromNanos, toNanos int64) rules.MatchResult
}
Expand Down
24 changes: 13 additions & 11 deletions matcher/match.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,25 +42,27 @@ type matcher struct {
namespaceTag []byte
defaultNamespace []byte

namespaces *namespaces
namespaces Namespaces
cache Cache
}

// NewMatcher creates a new rule matcher.
func NewMatcher(cache Cache, opts Options) (Matcher, error) {
instrumentOpts := opts.InstrumentOptions()
scope := instrumentOpts.MetricsScope()
namespacesScope := scope.SubScope("namespaces")
namespacesOpts := opts.SetInstrumentOptions(instrumentOpts.SetMetricsScope(namespacesScope))
iOpts := instrumentOpts.SetMetricsScope(scope.SubScope("namespaces"))
namespacesOpts := opts.SetInstrumentOptions(iOpts).
SetOnNamespaceAddedFn(func(namespace []byte, ruleSet RuleSet) {
cache.Register(namespace, ruleSet)
}).
SetOnNamespaceRemovedFn(func(namespace []byte) {
cache.Unregister(namespace)
}).
SetOnRuleSetUpdatedFn(func(namespace []byte, ruleSet RuleSet) {
cache.Register(namespace, ruleSet)
})
key := opts.NamespacesKey()
namespaces := newNamespaces(key, cache, namespacesOpts)
// NB(xichen): if there is no cache provided, the newly created
// namespaces object is used as a zero-size forwarding cache that
// forwards all match requests to the backing rulesets.
if cache == nil {
cache = namespaces
namespaces.setCache(namespaces)
}
namespaces := NewNamespaces(key, namespacesOpts)
if err := namespaces.Watch(); err != nil {
errCreateWatch, ok := err.(runtime.CreateWatchError)
if ok {
Expand Down
2 changes: 1 addition & 1 deletion matcher/match_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func TestMatcherMatchDoesNotExist(t *testing.T) {
tagValueFn: func(tagName []byte) ([]byte, bool) { return nil, false },
}
now := time.Now()
matcher := testMatcher(t, nil)
matcher := testMatcher(t, newMemCache())
require.Equal(t, rules.EmptyMatchResult, matcher.Match(id, now.UnixNano(), now.UnixNano()))
}

Expand Down
98 changes: 59 additions & 39 deletions matcher/namespaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,22 @@ var (
errNilValue = errors.New("nil value received")
)

// Namespaces manages runtime updates to registered namespaces and provides
// API to match metic ids against rules in the corresponding namespaces.
type Namespaces interface {
runtime.Value

// Version returns the current version for a give namespace.
Version(namespace []byte) int

// Match returns the matching policies for a given id in a given namespace
// between [fromNanos, toNanos).
Match(namespace, id []byte, fromNanos, toNanos int64) rules.MatchResult

// Close closes the namespaces.
Close()
}

type namespacesMetrics struct {
notExists tally.Counter
added tally.Counter
Expand All @@ -62,41 +78,41 @@ func newNamespacesMetrics(scope tally.Scope) namespacesMetrics {
}

// namespaces contains the list of namespace users have defined rules for.
// NB(xichen): namespaces implements the Cache interface and can be used as
// a zero-size forwarding cache that forwards all match requests to the backing
// rulesets without caching any match results.
type namespaces struct {
sync.RWMutex
runtime.Value

key string
store kv.Store
cache Cache
opts Options
log xlog.Logger
ruleSetKeyFn RuleSetKeyFn
nowFn clock.NowFn
matchRangePast time.Duration
key string
store kv.Store
opts Options
nowFn clock.NowFn
log xlog.Logger
ruleSetKeyFn RuleSetKeyFn
matchRangePast time.Duration
onNamespaceAddedFn OnNamespaceAddedFn
onNamespaceRemovedFn OnNamespaceRemovedFn

proto *schema.Namespaces
rules map[xid.Hash]*ruleSet
rules map[xid.Hash]RuleSet
metrics namespacesMetrics
}

func newNamespaces(key string, cache Cache, opts Options) *namespaces {
// NewNamespaces creates a new namespaces object.
func NewNamespaces(key string, opts Options) Namespaces {
instrumentOpts := opts.InstrumentOptions()
n := &namespaces{
key: key,
store: opts.KVStore(),
cache: cache,
opts: opts,
log: instrumentOpts.Logger(),
ruleSetKeyFn: opts.RuleSetKeyFn(),
nowFn: opts.ClockOptions().NowFn(),
matchRangePast: opts.MatchRangePast(),
proto: &schema.Namespaces{},
rules: make(map[xid.Hash]*ruleSet),
metrics: newNamespacesMetrics(instrumentOpts.MetricsScope()),
key: key,
store: opts.KVStore(),
opts: opts,
nowFn: opts.ClockOptions().NowFn(),
log: instrumentOpts.Logger(),
ruleSetKeyFn: opts.RuleSetKeyFn(),
matchRangePast: opts.MatchRangePast(),
onNamespaceAddedFn: opts.OnNamespaceAddedFn(),
onNamespaceRemovedFn: opts.OnNamespaceRemovedFn(),
proto: &schema.Namespaces{},
rules: make(map[xid.Hash]RuleSet),
metrics: newNamespacesMetrics(instrumentOpts.MetricsScope()),
}
valueOpts := runtime.NewOptions().
SetInstrumentOptions(instrumentOpts).
Expand All @@ -108,6 +124,18 @@ func newNamespaces(key string, cache Cache, opts Options) *namespaces {
return n
}

func (n *namespaces) Version(namespace []byte) int {
nsHash := xid.HashFn(namespace)
n.RLock()
ruleSet, exists := n.rules[nsHash]
if !exists {
n.RUnlock()
return kv.UninitializedVersion
}
n.RUnlock()
return ruleSet.Version()
}

func (n *namespaces) Match(namespace, id []byte, fromNanos, toNanos int64) rules.MatchResult {
var (
res = rules.EmptyMatchResult
Expand All @@ -124,10 +152,7 @@ func (n *namespaces) Match(namespace, id []byte, fromNanos, toNanos int64) rules
return ruleSet.Match(id, fromNanos, toNanos)
}

func (n *namespaces) Register([]byte, Source) {}
func (n *namespaces) Unregister([]byte) {}

func (n *namespaces) Close() error {
func (n *namespaces) Close() {
// NB(xichen): we stop watching the value outside lock because otherwise we might
// be holding the namespace lock while attempting to acquire the value lock, and
// the updating goroutine might be holding the value lock and attempting to
Expand All @@ -139,13 +164,6 @@ func (n *namespaces) Close() error {
rs.Unwatch()
}
n.RUnlock()
return nil
}

func (n *namespaces) setCache(cache Cache) {
n.Lock()
n.cache = cache
n.Unlock()
}

func (n *namespaces) toNamespaces(value kv.Value) (interface{}, error) {
Expand Down Expand Up @@ -184,7 +202,7 @@ func (n *namespaces) process(value interface{}) error {
ruleSetScope := instrumentOpts.MetricsScope().SubScope("ruleset")
ruleSetOpts := n.opts.SetInstrumentOptions(instrumentOpts.SetMetricsScope(ruleSetScope))
ruleSetKey := n.ruleSetKeyFn(ns.Name())
ruleSet = newRuleSet(nsName, ruleSetKey, n.cache, ruleSetOpts)
ruleSet = newRuleSet(nsName, ruleSetKey, ruleSetOpts)
n.rules[nsHash] = ruleSet
n.metrics.added.Inc(1)
}
Expand Down Expand Up @@ -218,8 +236,8 @@ func (n *namespaces) process(value interface{}) error {
}
}

if !exists {
n.cache.Register(nsName, ruleSet)
if !exists && n.onNamespaceAddedFn != nil {
n.onNamespaceAddedFn(nsName, ruleSet)
}
}

Expand All @@ -231,7 +249,9 @@ func (n *namespaces) process(value interface{}) error {
// Process the namespaces not in the incoming update.
earliest := n.nowFn().Add(-n.matchRangePast)
if ruleSet.Tombstoned() && ruleSet.CutoverNanos() <= earliest.UnixNano() {
n.cache.Unregister(ruleSet.Namespace())
if n.onNamespaceRemovedFn != nil {
n.onNamespaceRemovedFn(ruleSet.Namespace())
}
delete(n.rules, nsHash)
ruleSet.Unwatch()
n.metrics.unwatched.Inc(1)
Expand Down
20 changes: 15 additions & 5 deletions matcher/namespaces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func TestNamespacesProcess(t *testing.T) {
{namespace: []byte("catNs"), id: "cat", version: 3, tombstoned: true},
{namespace: []byte("lolNs"), id: "lol", version: 3, tombstoned: true},
} {
rs := newRuleSet(input.namespace, input.id, cache, opts)
rs := newRuleSet(input.namespace, input.id, opts).(*ruleSet)
rs.Value = &mockRuntimeValue{key: input.id}
rs.version = input.version
rs.tombstoned = input.tombstoned
Expand Down Expand Up @@ -201,10 +201,11 @@ func TestNamespacesProcess(t *testing.T) {
}
for i, ns := range update.Namespaces {
rs, exists := nss.rules[xid.HashFn([]byte(ns.Name))]
ruleSet := rs.(*ruleSet)
require.True(t, exists)
require.Equal(t, expected[i].key, rs.Key())
require.Equal(t, rs, c.namespaces[string(ns.Name)].source.(*ruleSet))
mv, ok := rs.Value.(*mockRuntimeValue)
require.Equal(t, ruleSet, c.namespaces[string(ns.Name)].source)
mv, ok := ruleSet.Value.(*mockRuntimeValue)
if !ok {
continue
}
Expand All @@ -219,8 +220,17 @@ func testNamespaces() (kv.Store, Cache, *namespaces, Options) {
opts := NewOptions().
SetInitWatchTimeout(100 * time.Millisecond).
SetKVStore(store).
SetNamespacesKey(testNamespacesKey)
return store, cache, newNamespaces(testNamespacesKey, cache, opts), opts
SetNamespacesKey(testNamespacesKey).
SetOnNamespaceAddedFn(func(namespace []byte, ruleSet RuleSet) {
cache.Register(namespace, ruleSet)
}).
SetOnNamespaceRemovedFn(func(namespace []byte) {
cache.Unregister(namespace)
}).
SetOnRuleSetUpdatedFn(func(namespace []byte, ruleSet RuleSet) {
cache.Register(namespace, ruleSet)
})
return store, cache, NewNamespaces(testNamespacesKey, opts).(*namespaces), opts
}

type memResults struct {
Expand Down
88 changes: 74 additions & 14 deletions matcher/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,18 @@ var (
defaultDefaultNamespace = []byte("defaultNamespace")
)

func defaultRuleSetKeyFn(namespace []byte) string {
return fmt.Sprintf(defaultRuleSetKeyFormat, namespace)
}

// RuleSetKeyFn generates the ruleset key for a given namespace.
type RuleSetKeyFn func(namespace []byte) string

// OnNamespaceAddedFn is called when a namespace is added.
type OnNamespaceAddedFn func(namespace []byte, ruleSet RuleSet)

// OnNamespaceRemovedFn is called when a namespace is removed.
type OnNamespaceRemovedFn func(namespace []byte)

// OnRuleSetUpdatedFn is called when a ruleset is updated.
type OnRuleSetUpdatedFn func(namespace []byte, ruleSet RuleSet)

// Options provide a set of options for the msgpack-based reporter.
type Options interface {
// SetClockOptions sets the clock options.
Expand Down Expand Up @@ -113,19 +118,40 @@ type Options interface {

// MatchRangePast returns the limit on the earliest time eligible for rule matching.
MatchRangePast() time.Duration

// SetOnNamespaceAddedFn sets the function to be called when a namespace is added.
SetOnNamespaceAddedFn(value OnNamespaceAddedFn) Options

// OnNamespaceAddedFn returns the function to be called when a namespace is added.
OnNamespaceAddedFn() OnNamespaceAddedFn

// SetOnNamespaceRemovedFn sets the function to be called when a namespace is removed.
SetOnNamespaceRemovedFn(value OnNamespaceRemovedFn) Options

// OnNamespaceRemovedFn returns the function to be called when a namespace is removed.
OnNamespaceRemovedFn() OnNamespaceRemovedFn

// SetOnRuleSetUpdatedFn sets the function to be called when a ruleset is updated.
SetOnRuleSetUpdatedFn(value OnRuleSetUpdatedFn) Options

// OnRuleSetUpdatedFn returns the function to be called when a ruleset is updated.
OnRuleSetUpdatedFn() OnRuleSetUpdatedFn
}

type options struct {
clockOpts clock.Options
instrumentOpts instrument.Options
ruleSetOpts rules.Options
initWatchTimeout time.Duration
kvStore kv.Store
namespacesKey string
ruleSetKeyFn RuleSetKeyFn
namespaceTag []byte
defaultNamespace []byte
matchRangePast time.Duration
clockOpts clock.Options
instrumentOpts instrument.Options
ruleSetOpts rules.Options
initWatchTimeout time.Duration
kvStore kv.Store
namespacesKey string
ruleSetKeyFn RuleSetKeyFn
namespaceTag []byte
defaultNamespace []byte
matchRangePast time.Duration
onNamespaceAddedFn OnNamespaceAddedFn
onNamespaceRemovedFn OnNamespaceRemovedFn
onRuleSetUpdatedFn OnRuleSetUpdatedFn
}

// NewOptions creates a new set of options.
Expand Down Expand Up @@ -243,3 +269,37 @@ func (o *options) SetMatchRangePast(value time.Duration) Options {
func (o *options) MatchRangePast() time.Duration {
return o.matchRangePast
}

func (o *options) SetOnNamespaceAddedFn(value OnNamespaceAddedFn) Options {
opts := *o
opts.onNamespaceAddedFn = value
return &opts
}

func (o *options) OnNamespaceAddedFn() OnNamespaceAddedFn {
return o.onNamespaceAddedFn
}

func (o *options) SetOnNamespaceRemovedFn(value OnNamespaceRemovedFn) Options {
opts := *o
opts.onNamespaceRemovedFn = value
return &opts
}

func (o *options) OnNamespaceRemovedFn() OnNamespaceRemovedFn {
return o.onNamespaceRemovedFn
}

func (o *options) SetOnRuleSetUpdatedFn(value OnRuleSetUpdatedFn) Options {
opts := *o
opts.onRuleSetUpdatedFn = value
return &opts
}

func (o *options) OnRuleSetUpdatedFn() OnRuleSetUpdatedFn {
return o.onRuleSetUpdatedFn
}

func defaultRuleSetKeyFn(namespace []byte) string {
return fmt.Sprintf(defaultRuleSetKeyFormat, namespace)
}
Loading

0 comments on commit 13a7bda

Please sign in to comment.