Skip to content
This repository has been archived by the owner on Oct 17, 2018. It is now read-only.

Commit

Permalink
Support aggregation type filtering in ReverseMatch
Browse files Browse the repository at this point in the history
  • Loading branch information
Chao Wang committed Oct 12, 2017
1 parent 887ca91 commit dabe346
Show file tree
Hide file tree
Showing 24 changed files with 855 additions and 271 deletions.
8 changes: 4 additions & 4 deletions matcher/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,16 @@ import "github.com/m3db/m3metrics/rules"

// Source is a datasource providing match results.
type Source interface {
// Match returns the match result for a given id within time range
// ForwardMatch returns the match result for a given id within time range
// [fromNanos, toNanos).
Match(id []byte, fromNanos, toNanos int64) rules.MatchResult
ForwardMatch(id []byte, fromNanos, toNanos int64) rules.MatchResult
}

// Cache caches the rule matching result associated with metrics.
type Cache interface {
// Match returns the rule matching result associated with a metric id
// ForwardMatch returns the rule matching result associated with a metric id
// between [fromNanos, toNanos).
Match(namespace, id []byte, fromNanos, toNanos int64) rules.MatchResult
ForwardMatch(namespace, id []byte, fromNanos, toNanos int64) rules.MatchResult

// Register sets the result source for a given namespace.
Register(namespace []byte, source Source)
Expand Down
4 changes: 2 additions & 2 deletions matcher/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func NewCache(opts Options) matcher.Cache {
return c
}

func (c *cache) Match(namespace, id []byte, fromNanos, toNanos int64) rules.MatchResult {
func (c *cache) ForwardMatch(namespace, id []byte, fromNanos, toNanos int64) rules.MatchResult {
c.RLock()
res, found := c.tryGetWithLock(namespace, id, fromNanos, toNanos, dontSetIfNotFound)
c.RUnlock()
Expand Down Expand Up @@ -262,7 +262,7 @@ func (c *cache) setWithLock(
if invalidate {
results = c.invalidateWithLock(nsHash, idHash, results)
}
res := results.source.Match(id, fromNanos, toNanos)
res := results.source.ForwardMatch(id, fromNanos, toNanos)
newElem := &element{nsHash: nsHash, idHash: idHash, result: res}
newElem.SetPromotionExpiry(c.newPromotionExpiry(c.nowFn()))
results.elems[idHash] = newElem
Expand Down
24 changes: 12 additions & 12 deletions matcher/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func TestCacheMatchNamespaceDoesNotExist(t *testing.T) {
opts := testCacheOptions()
c := NewCache(opts)

res := c.Match([]byte("nonexistentNs"), []byte("foo"), 0, 0)
res := c.ForwardMatch([]byte("nonexistentNs"), []byte("foo"), 0, 0)
require.Equal(t, testEmptyMatchResult, res)
}

Expand All @@ -63,7 +63,7 @@ func TestCacheMatchIDCachedValidNoPromotion(t *testing.T) {
populateCache(c, testValues, now.Add(time.Minute), source, populateBoth)

// Get the second id and assert we didn't perform a promotion.
res := c.Match(testValues[1].namespace, testValues[1].id, now.UnixNano(), now.UnixNano())
res := c.ForwardMatch(testValues[1].namespace, testValues[1].id, now.UnixNano(), now.UnixNano())
require.Equal(t, testValues[1].result, res)
validateCache(t, c, testValues)
}
Expand All @@ -78,7 +78,7 @@ func TestCacheMatchIDCachedValidWithPromotion(t *testing.T) {

// Move the time and assert we performed a promotion.
now = now.Add(time.Minute)
res := c.Match(testValues[1].namespace, testValues[1].id, now.UnixNano(), now.UnixNano())
res := c.ForwardMatch(testValues[1].namespace, testValues[1].id, now.UnixNano(), now.UnixNano())
require.Equal(t, testValues[1].result, res)
expected := []testValue{testValues[1], testValues[0]}
validateCache(t, c, expected)
Expand Down Expand Up @@ -109,7 +109,7 @@ func TestCacheMatchIDCachedInvalidSourceValidInvalidateAll(t *testing.T) {
source.setResult(id, result)

require.Equal(t, 2, len(c.namespaces[nsHash].elems))
res := c.Match(ns, id, now.UnixNano(), now.Add(time.Minute).UnixNano())
res := c.ForwardMatch(ns, id, now.UnixNano(), now.Add(time.Minute).UnixNano())
require.Equal(t, result, res)

// Wait for deletion to happen
Expand Down Expand Up @@ -155,7 +155,7 @@ func TestCacheMatchIDCachedInvalidSourceValidInvalidateAllNoEviction(t *testing.
source.setResult(id, result)

require.Equal(t, 2, len(c.namespaces[nsHash].elems))
res := c.Match(ns, id, now.UnixNano(), now.UnixNano())
res := c.ForwardMatch(ns, id, now.UnixNano(), now.UnixNano())
require.Equal(t, result, res)

// Wait for deletion to happen
Expand Down Expand Up @@ -200,7 +200,7 @@ func TestCacheMatchIDCachedInvalidSourceValidInvalidateOneNoEviction(t *testing.
source.setResult(id, result)

require.Equal(t, 2, len(c.namespaces[nsHash].elems))
res := c.Match(ns, id, now.UnixNano(), now.UnixNano())
res := c.ForwardMatch(ns, id, now.UnixNano(), now.UnixNano())
require.Equal(t, result, res)

// Wait for deletion to happen.
Expand Down Expand Up @@ -255,7 +255,7 @@ func TestCacheMatchIDCachedInvalidSourceValidWithEviction(t *testing.T) {
{namespace: []byte("ns2"), id: []byte("baz")},
{namespace: []byte("ns2"), id: []byte("cat")},
} {
res := c.Match(value.namespace, value.id, now.UnixNano(), now.UnixNano())
res := c.ForwardMatch(value.namespace, value.id, now.UnixNano(), now.UnixNano())
require.Equal(t, newResult, res)
}
conditionFn := func() bool {
Expand All @@ -275,7 +275,7 @@ func TestCacheMatchIDCachedInvalidSourceValidWithEviction(t *testing.T) {

// Retrieve one more id and assert we perform async eviction.
c.invalidationMode = InvalidateAll
res := c.Match([]byte("ns1"), []byte("lol"), now.UnixNano(), now.UnixNano())
res := c.ForwardMatch([]byte("ns1"), []byte("lol"), now.UnixNano(), now.UnixNano())
require.Equal(t, newResult, res)
require.NoError(t, testWaitUntilWithTimeout(conditionFn, testWaitTimeout))
expected = []testValue{
Expand All @@ -293,7 +293,7 @@ func TestCacheMatchIDNotCachedAndDoesNotExistInSource(t *testing.T) {
source := newMockSource()
populateCache(c, testValues, now.Add(time.Minute), source, populateBoth)

res := c.Match([]byte("nsfoo"), []byte("nonExistent"), now.UnixNano(), now.UnixNano())
res := c.ForwardMatch([]byte("nsfoo"), []byte("nonExistent"), now.UnixNano(), now.UnixNano())
require.Equal(t, testEmptyMatchResult, res)
}

Expand All @@ -313,7 +313,7 @@ func TestCacheMatchIDNotCachedSourceValidNoEviction(t *testing.T) {
result = testValues[1].result
)
require.Equal(t, 0, len(c.namespaces[nsHash].elems))
res := c.Match(ns, id, now.UnixNano(), now.UnixNano())
res := c.ForwardMatch(ns, id, now.UnixNano(), now.UnixNano())
require.Equal(t, result, res)

expected := []testValue{testValues[1]}
Expand Down Expand Up @@ -349,7 +349,7 @@ func TestCacheMatchParallel(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
res := c.Match(v.namespace, v.id, now.UnixNano(), now.UnixNano())
res := c.ForwardMatch(v.namespace, v.id, now.UnixNano(), now.UnixNano())
require.Equal(t, newResult, res)
}()
}
Expand Down Expand Up @@ -557,7 +557,7 @@ func (s *mockSource) IsValid(version int) bool {
return version >= currVersion
}

func (s *mockSource) Match(id []byte, fromNanos, toNanos int64) rules.MatchResult {
func (s *mockSource) ForwardMatch(id []byte, fromNanos, toNanos int64) rules.MatchResult {
s.Lock()
defer s.Unlock()
if res, exists := s.idMap[string(id)]; exists {
Expand Down
9 changes: 5 additions & 4 deletions matcher/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/m3db/m3metrics/filters"
"github.com/m3db/m3metrics/metric/id"
"github.com/m3db/m3metrics/metric/id/m3"
"github.com/m3db/m3metrics/policy"
"github.com/m3db/m3metrics/rules"
"github.com/m3db/m3x/clock"
"github.com/m3db/m3x/instrument"
Expand All @@ -46,7 +47,7 @@ type Configuration struct {
NameTagKey string `yaml:"nameTagKey" validate:"nonzero"`
MatchRangePast *time.Duration `yaml:"matchRangePast"`
SortedTagIteratorPool pool.ObjectPoolConfiguration `yaml:"sortedTagIteratorPool"`
MatchMode rules.MatchMode `yaml:"matchMode"`
Policy policy.Configuration `yaml:"policy"`
}

// NewNamespaces creates a matcher.Namespaces.
Expand Down Expand Up @@ -116,7 +117,8 @@ func (cfg *Configuration) NewOptions(
ruleSetOpts := rules.NewOptions().
SetTagsFilterOptions(tagsFilterOptions).
SetNewRollupIDFn(m3.NewRollupID).
SetIsRollupIDFn(isRollupIDFn)
SetIsRollupIDFn(isRollupIDFn).
SetPolicyOptions(cfg.Policy.NewOptions())

// Configure ruleset key function.
ruleSetKeyFn := func(namespace []byte) string {
Expand All @@ -131,8 +133,7 @@ func (cfg *Configuration) NewOptions(
SetNamespacesKey(cfg.NamespacesKey).
SetRuleSetKeyFn(ruleSetKeyFn).
SetNamespaceTag([]byte(cfg.NamespaceTag)).
SetDefaultNamespace([]byte(cfg.DefaultNamespace)).
SetMatchMode(cfg.MatchMode)
SetDefaultNamespace([]byte(cfg.DefaultNamespace))

if cfg.InitWatchTimeout != 0 {
opts = opts.SetInitWatchTimeout(cfg.InitWatchTimeout)
Expand Down
3 changes: 0 additions & 3 deletions matcher/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/m3db/m3cluster/client"
"github.com/m3db/m3cluster/kv"
"github.com/m3db/m3cluster/kv/mem"
"github.com/m3db/m3metrics/rules"
"github.com/m3db/m3x/clock"
"github.com/m3db/m3x/instrument"

Expand All @@ -46,7 +45,6 @@ func TestConfigurationNewNamespaces(t *testing.T) {
NamespaceTag: "NamespaceTag",
DefaultNamespace: "DefaultNamespace",
NameTagKey: "NameTagKey",
MatchMode: "forward",
}

ctrl := gomock.NewController(t)
Expand All @@ -63,5 +61,4 @@ func TestConfigurationNewNamespaces(t *testing.T) {
require.Equal(t, cfg.NamespacesKey, opts.NamespacesKey())
require.Equal(t, []byte(cfg.NamespaceTag), opts.NamespaceTag())
require.Equal(t, []byte(cfg.DefaultNamespace), opts.DefaultNamespace())
require.Equal(t, rules.ForwardMatch, opts.MatchMode())
}
8 changes: 4 additions & 4 deletions matcher/match.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ import (

// Matcher matches rules against metric IDs.
type Matcher interface {
// Match matches rules against metric ID for time range [fromNanos, toNanos)
// ForwardMatch matches rules against metric ID for time range [fromNanos, toNanos)
// and returns the match result.
Match(id id.ID, fromNanos, toNanos int64) rules.MatchResult
ForwardMatch(id id.ID, fromNanos, toNanos int64) rules.MatchResult

// Close closes the matcher.
Close() error
Expand Down Expand Up @@ -74,12 +74,12 @@ func NewMatcher(cache Cache, opts Options) (Matcher, error) {
}, nil
}

func (m *matcher) Match(id id.ID, fromNanos, toNanos int64) rules.MatchResult {
func (m *matcher) ForwardMatch(id id.ID, fromNanos, toNanos int64) rules.MatchResult {
ns, found := id.TagValue(m.namespaceTag)
if !found {
ns = m.defaultNamespace
}
return m.cache.Match(ns, id.Bytes(), fromNanos, toNanos)
return m.cache.ForwardMatch(ns, id.Bytes(), fromNanos, toNanos)
}

func (m *matcher) Close() error {
Expand Down
4 changes: 2 additions & 2 deletions matcher/match_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func TestMatcherMatchDoesNotExist(t *testing.T) {
}
now := time.Now()
matcher := testMatcher(t, newMemCache())
require.Equal(t, rules.EmptyMatchResult, matcher.Match(id, now.UnixNano(), now.UnixNano()))
require.Equal(t, rules.EmptyMatchResult, matcher.ForwardMatch(id, now.UnixNano(), now.UnixNano()))
}

func TestMatcherMatchExists(t *testing.T) {
Expand All @@ -91,7 +91,7 @@ func TestMatcherMatchExists(t *testing.T) {
matcher := testMatcher(t, cache)
c := cache.(*memCache)
c.namespaces[ns] = memRes
require.Equal(t, res, matcher.Match(id, now.UnixNano(), now.UnixNano()))
require.Equal(t, res, matcher.ForwardMatch(id, now.UnixNano(), now.UnixNano()))
}

func TestMatcherClose(t *testing.T) {
Expand Down
36 changes: 27 additions & 9 deletions matcher/namespaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
"github.com/m3db/m3cluster/kv"
"github.com/m3db/m3cluster/kv/util/runtime"
"github.com/m3db/m3metrics/generated/proto/schema"
"github.com/m3db/m3metrics/metric"
"github.com/m3db/m3metrics/policy"
"github.com/m3db/m3metrics/rules"
"github.com/m3db/m3x/clock"
xid "github.com/m3db/m3x/id"
Expand All @@ -50,9 +52,13 @@ type Namespaces interface {
// Version returns the current version for a give namespace.
Version(namespace []byte) int

// Match returns the matching policies for a given id in a given namespace
// ForwardMatch forward matches the matching policies for a given id in a given namespace
// between [fromNanos, toNanos).
Match(namespace, id []byte, fromNanos, toNanos int64) rules.MatchResult
ForwardMatch(namespace, id []byte, fromNanos, toNanos int64) rules.MatchResult

// ReverseMatch reverse matches the matching policies for a given id in a given namespace
// between [fromNanos, toNanos), with aware of the metric type and aggregation type for the given id.
ReverseMatch(namespace, id []byte, fromNanos, toNanos int64, mt metric.Type, at policy.AggregationType) rules.MatchResult

// Close closes the namespaces.
Close()
Expand Down Expand Up @@ -163,19 +169,31 @@ func (n *namespaces) Version(namespace []byte) int {
return ruleSet.Version()
}

func (n *namespaces) Match(namespace, id []byte, fromNanos, toNanos int64) rules.MatchResult {
var (
res = rules.EmptyMatchResult
nsHash = xid.HashFn(namespace)
)
func (n *namespaces) ForwardMatch(namespace, id []byte, fromNanos, toNanos int64) rules.MatchResult {
ruleSet, exists := n.ruleSet(namespace)
if !exists {
return rules.EmptyMatchResult
}
return ruleSet.ForwardMatch(id, fromNanos, toNanos)
}

func (n *namespaces) ReverseMatch(namespace, id []byte, fromNanos, toNanos int64, mt metric.Type, at policy.AggregationType) rules.MatchResult {
ruleSet, exists := n.ruleSet(namespace)
if !exists {
return rules.EmptyMatchResult
}
return ruleSet.ReverseMatch(id, fromNanos, toNanos, mt, at)
}

func (n *namespaces) ruleSet(namespace []byte) (RuleSet, bool) {
nsHash := xid.HashFn(namespace)
n.RLock()
ruleSet, exists := n.rules[nsHash]
n.RUnlock()
if !exists {
n.metrics.notExists.Inc(1)
return res
}
return ruleSet.Match(id, fromNanos, toNanos)
return ruleSet, exists
}

func (n *namespaces) Close() {
Expand Down
2 changes: 1 addition & 1 deletion matcher/namespaces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ func newMemCache() Cache {
return &memCache{namespaces: make(map[string]memResults)}
}

func (c *memCache) Match(namespace, id []byte, fromNanos, toNanos int64) rules.MatchResult {
func (c *memCache) ForwardMatch(namespace, id []byte, fromNanos, toNanos int64) rules.MatchResult {
c.RLock()
defer c.RUnlock()
if results, exists := c.namespaces[string(namespace)]; exists {
Expand Down
18 changes: 0 additions & 18 deletions matcher/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,6 @@ type OnRuleSetUpdatedFn func(namespace []byte, ruleSet RuleSet)

// Options provide a set of options for the matcher.
type Options interface {
// SetMatchMode sets the match mode.
SetMatchMode(value rules.MatchMode) Options

// MatchMode returns the match mode.
MatchMode() rules.MatchMode

// SetClockOptions sets the clock options.
SetClockOptions(value clock.Options) Options

Expand Down Expand Up @@ -144,7 +138,6 @@ type Options interface {
}

type options struct {
matchMode rules.MatchMode
clockOpts clock.Options
instrumentOpts instrument.Options
ruleSetOpts rules.Options
Expand All @@ -163,7 +156,6 @@ type options struct {
// NewOptions creates a new set of options.
func NewOptions() Options {
return &options{
matchMode: rules.ForwardMatch,
clockOpts: clock.NewOptions(),
instrumentOpts: instrument.NewOptions(),
ruleSetOpts: rules.NewOptions(),
Expand All @@ -177,16 +169,6 @@ func NewOptions() Options {
}
}

func (o *options) SetMatchMode(value rules.MatchMode) Options {
opts := *o
opts.matchMode = value
return &opts
}

func (o *options) MatchMode() rules.MatchMode {
return o.matchMode
}

func (o *options) SetClockOptions(value clock.Options) Options {
opts := *o
opts.clockOpts = value
Expand Down
Loading

0 comments on commit dabe346

Please sign in to comment.