Skip to content
This repository has been archived by the owner on Oct 17, 2018. It is now read-only.

Commit

Permalink
Merge branch 'master' into r/drop-mapping-rule
Browse files Browse the repository at this point in the history
  • Loading branch information
Rob Skillington committed Jul 17, 2018
2 parents dc6333c + d92603d commit 00e8df3
Show file tree
Hide file tree
Showing 19 changed files with 512 additions and 158 deletions.
16 changes: 0 additions & 16 deletions aggregation/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,22 +272,6 @@ func NewTypesFromProto(input []aggregationpb.AggregationType) (Types, error) {
return res, nil
}

// UnmarshalYAML unmarshals aggregation types from a string.
// TODO(xichen): look into whether it's possible to unmarshal it as an array to be more consistent.
func (aggTypes *Types) UnmarshalYAML(unmarshal func(interface{}) error) error {
var str string
if err := unmarshal(&str); err != nil {
return err
}

parsed, err := ParseTypes(str)
if err != nil {
return err
}
*aggTypes = parsed
return nil
}

// Contains checks if the given type is contained in the aggregation types.
func (aggTypes Types) Contains(aggType Type) bool {
for _, at := range aggTypes {
Expand Down
21 changes: 15 additions & 6 deletions aggregation/type_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,11 @@ import (

func TestTypesConfiguration(t *testing.T) {
str := `
defaultGaugeAggregationTypes: Max
defaultTimerAggregationTypes: P50,P99,P9999
defaultGaugeAggregationTypes: [Max]
defaultTimerAggregationTypes:
- P50
- P99
- P9999
counterTransformFnType: empty
timerTransformFnType: suffix
gaugeTransformFnType: empty
Expand All @@ -56,8 +59,11 @@ gaugeTransformFnType: empty

func TestTypesConfigurationNoTransformFnType(t *testing.T) {
str := `
defaultGaugeAggregationTypes: Max
defaultTimerAggregationTypes: P50,P99,P9999
defaultGaugeAggregationTypes: [Max]
defaultTimerAggregationTypes:
- P50
- P99
- P9999
`

var cfg TypesConfiguration
Expand All @@ -75,8 +81,11 @@ defaultTimerAggregationTypes: P50,P99,P9999

func TestTypesConfigurationError(t *testing.T) {
str := `
defaultGaugeAggregationTypes: Max
defaultTimerAggregationTypes: P50,P99,P9999
defaultGaugeAggregationTypes: [Max]
defaultTimerAggregationTypes:
- P50
- P99
- P9999
timerTransformFnType: bla
`

Expand Down
98 changes: 94 additions & 4 deletions aggregation/type_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package aggregation

import (
"encoding/json"
"testing"

"github.com/m3db/m3x/pool"
Expand Down Expand Up @@ -79,26 +80,115 @@ func TestTypesIsDefault(t *testing.T) {
require.False(t, Types{Max}.IsDefault())
}

func TestTypesMarshalJSON(t *testing.T) {
inputs := []struct {
types Types
expected string
expectedErr bool
}{
{
types: Types{},
expected: `[]`,
},
{
types: Types{Min},
expected: `["Min"]`,
},
{
types: Types{Mean, Max, P99, P9999},
expected: `["Mean","Max","P99","P9999"]`,
},
{
types: Types{Type(1000)},
expectedErr: true,
},
}
for _, input := range inputs {
b, err := json.Marshal(input.types)
if input.expectedErr {
require.Error(t, err)
continue
}
require.NoError(t, err)
require.Equal(t, input.expected, string(b))
}
}

func TestTypesUnMarshalJSON(t *testing.T) {
inputs := []struct {
str string
expected Types
expectedErr bool
}{
{
str: `[]`,
expected: Types{},
},
{
str: `["Min"]`,
expected: Types{Min},
},
{
str: `["Mean","Max","P99","P9999"]`,
expected: Types{Mean, Max, P99, P9999},
},
{
str: `[P100]`,
expectedErr: true,
},
}
for _, input := range inputs {
var aggTypes Types
err := json.Unmarshal([]byte(input.str), &aggTypes)
if input.expectedErr {
require.Error(t, err)
continue
}
require.NoError(t, err)
require.Equal(t, input.expected, aggTypes)
}
}

func TestTypesMarshalJSONRoundTrip(t *testing.T) {
inputs := []Types{
{},
{Min},
{Mean, Max, P99, P9999},
}

for _, input := range inputs {
b, err := json.Marshal(input)
require.NoError(t, err)
var aggTypes Types
require.NoError(t, json.Unmarshal(b, &aggTypes))
require.Equal(t, input, aggTypes)
}
}

func TestTypesUnmarshalYAML(t *testing.T) {
inputs := []struct {
str string
expected Types
expectedErr bool
}{
{
str: "Min",
str: "",
expected: Types(nil),
},
{
str: "[Min]",
expected: Types{Min},
},
{
str: "Mean,Max,P99,P9999",
str: "[Mean,Max,P99,P9999]",
expected: Types{Mean, Max, P99, P9999},
},
{
str: "Min,Max,P99,P9999,P100",
str: "[Min,Max,P99,P9999,P100]",
expectedErr: true,
},
{
str: "Min,Max,P99,P9999,P100",
str: "[Min,Max,P99,P9999,P100]",
expectedErr: true,
},
{
Expand Down
27 changes: 10 additions & 17 deletions matcher/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (

"github.com/m3db/m3cluster/client"
"github.com/m3db/m3cluster/kv"
"github.com/m3db/m3metrics/aggregation"
"github.com/m3db/m3metrics/filters"
"github.com/m3db/m3metrics/matcher/cache"
"github.com/m3db/m3metrics/metric/id"
Expand All @@ -39,16 +38,15 @@ import (

// Configuration is config used to create a Matcher.
type Configuration struct {
InitWatchTimeout time.Duration `yaml:"initWatchTimeout"`
RulesKVConfig kv.OverrideConfiguration `yaml:"rulesKVConfig"`
NamespacesKey string `yaml:"namespacesKey" validate:"nonzero"`
RuleSetKeyFmt string `yaml:"ruleSetKeyFmt" validate:"nonzero"`
NamespaceTag string `yaml:"namespaceTag" validate:"nonzero"`
DefaultNamespace string `yaml:"defaultNamespace" validate:"nonzero"`
NameTagKey string `yaml:"nameTagKey" validate:"nonzero"`
MatchRangePast *time.Duration `yaml:"matchRangePast"`
SortedTagIteratorPool pool.ObjectPoolConfiguration `yaml:"sortedTagIteratorPool"`
AggregationTypes aggregation.TypesConfiguration `yaml:"aggregationTypes"`
InitWatchTimeout time.Duration `yaml:"initWatchTimeout"`
RulesKVConfig kv.OverrideConfiguration `yaml:"rulesKVConfig"`
NamespacesKey string `yaml:"namespacesKey" validate:"nonzero"`
RuleSetKeyFmt string `yaml:"ruleSetKeyFmt" validate:"nonzero"`
NamespaceTag string `yaml:"namespaceTag" validate:"nonzero"`
DefaultNamespace string `yaml:"defaultNamespace" validate:"nonzero"`
NameTagKey string `yaml:"nameTagKey" validate:"nonzero"`
MatchRangePast *time.Duration `yaml:"matchRangePast"`
SortedTagIteratorPool pool.ObjectPoolConfiguration `yaml:"sortedTagIteratorPool"`
}

// NewNamespaces creates a matcher.Namespaces.
Expand Down Expand Up @@ -120,15 +118,10 @@ func (cfg *Configuration) NewOptions(
return m3.IsRollupID(name, tags, sortedTagIteratorPool)
}

aggTypeOpts, err := cfg.AggregationTypes.NewOptions(instrumentOpts)
if err != nil {
return nil, err
}
ruleSetOpts := rules.NewOptions().
SetTagsFilterOptions(tagsFilterOptions).
SetNewRollupIDFn(m3.NewRollupID).
SetIsRollupIDFn(isRollupIDFn).
SetAggregationTypesOptions(aggTypeOpts)
SetIsRollupIDFn(isRollupIDFn)

// Configure ruleset key function.
ruleSetKeyFn := func(namespace []byte) string {
Expand Down
13 changes: 11 additions & 2 deletions matcher/namespaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,14 @@ type Namespaces interface {

// ReverseMatch reverse matches the matching policies for a given id in a given namespace
// between [fromNanos, toNanos), taking into account the metric type and aggregation type for the given id.
ReverseMatch(namespace, id []byte, fromNanos, toNanos int64, mt metric.Type, at aggregation.Type) rules.MatchResult
ReverseMatch(
namespace, id []byte,
fromNanos, toNanos int64,
mt metric.Type,
at aggregation.Type,
isMultiAggregationTypesAllowed bool,
aggTypesOpts aggregation.TypesOptions,
) rules.MatchResult

// Close closes the namespaces.
Close()
Expand Down Expand Up @@ -183,12 +190,14 @@ func (n *namespaces) ReverseMatch(
fromNanos, toNanos int64,
mt metric.Type,
at aggregation.Type,
isMultiAggregationTypesAllowed bool,
aggTypesOpts aggregation.TypesOptions,
) rules.MatchResult {
ruleSet, exists := n.ruleSet(namespace)
if !exists {
return rules.EmptyMatchResult
}
return ruleSet.ReverseMatch(id, fromNanos, toNanos, mt, at)
return ruleSet.ReverseMatch(id, fromNanos, toNanos, mt, at, isMultiAggregationTypesAllowed, aggTypesOpts)
}

func (n *namespaces) ruleSet(namespace []byte) (RuleSet, bool) {
Expand Down
4 changes: 3 additions & 1 deletion matcher/ruleset.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ func (r *ruleSet) ReverseMatch(
fromNanos, toNanos int64,
mt metric.Type,
at aggregation.Type,
isMultiAggregationTypesAllowed bool,
aggTypesOpts aggregation.TypesOptions,
) rules.MatchResult {
callStart := r.nowFn()
r.RLock()
Expand All @@ -175,7 +177,7 @@ func (r *ruleSet) ReverseMatch(
r.metrics.nilMatcher.Inc(1)
return rules.EmptyMatchResult
}
res := r.matcher.ReverseMatch(id, fromNanos, toNanos, mt, at)
res := r.matcher.ReverseMatch(id, fromNanos, toNanos, mt, at, isMultiAggregationTypesAllowed, aggTypesOpts)
r.RUnlock()
r.metrics.match.ReportSuccess(r.nowFn().Sub(callStart))
return res
Expand Down
48 changes: 29 additions & 19 deletions matcher/ruleset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,17 +87,21 @@ func TestRuleSetReverseMatchWithMatcher(t *testing.T) {
rs.matcher = mockMatcher

var (
now = rs.nowFn()
fromNanos = now.Add(-time.Second).UnixNano()
toNanos = now.Add(time.Second).UnixNano()
now = rs.nowFn()
fromNanos = now.Add(-time.Second).UnixNano()
toNanos = now.Add(time.Second).UnixNano()
isMultiAggregationTypesAllowed = true
aggTypesOpts = aggregation.NewTypesOptions()
)

require.Equal(t, mockMatcher.res, rs.ReverseMatch([]byte("foo"), fromNanos, toNanos, metric.CounterType, aggregation.Sum))
require.Equal(t, mockMatcher.res, rs.ReverseMatch([]byte("foo"), fromNanos, toNanos, metric.CounterType, aggregation.Sum, isMultiAggregationTypesAllowed, aggTypesOpts))
require.Equal(t, []byte("foo"), mockMatcher.id)
require.Equal(t, fromNanos, mockMatcher.fromNanos)
require.Equal(t, toNanos, mockMatcher.toNanos)
require.Equal(t, metric.CounterType, mockMatcher.metricType)
require.Equal(t, aggregation.Sum, mockMatcher.aggregationType)
require.Equal(t, isMultiAggregationTypesAllowed, mockMatcher.isMultiAggregationTypesAllowed)
require.Equal(t, aggTypesOpts, mockMatcher.aggTypesOpts)
}

func TestToRuleSetNilValue(t *testing.T) {
Expand Down Expand Up @@ -217,12 +221,14 @@ func TestRuleSetProcessSuccess(t *testing.T) {
}

type mockMatcher struct {
id []byte
fromNanos int64
toNanos int64
res rules.MatchResult
metricType metric.Type
aggregationType aggregation.Type
id []byte
fromNanos int64
toNanos int64
res rules.MatchResult
metricType metric.Type
aggregationType aggregation.Type
isMultiAggregationTypesAllowed bool
aggTypesOpts aggregation.TypesOptions
}

func (mm *mockMatcher) ForwardMatch(
Expand All @@ -240,12 +246,16 @@ func (mm *mockMatcher) ReverseMatch(
fromNanos, toNanos int64,
mt metric.Type,
at aggregation.Type,
isMultiAggregationTypesAllowed bool,
aggTypesOpts aggregation.TypesOptions,
) rules.MatchResult {
mm.id = id
mm.fromNanos = fromNanos
mm.toNanos = toNanos
mm.metricType = mt
mm.aggregationType = at
mm.isMultiAggregationTypesAllowed = isMultiAggregationTypesAllowed
mm.aggTypesOpts = aggTypesOpts
return mm.res
}

Expand All @@ -257,15 +267,15 @@ type mockRuleSet struct {
matcher *mockMatcher
}

func (r *mockRuleSet) Namespace() []byte { return []byte(r.namespace) }
func (r *mockRuleSet) Version() int { return r.version }
func (r *mockRuleSet) CutoverNanos() int64 { return r.cutoverNanos }
func (r *mockRuleSet) LastUpdatedAtNanos() int64 { return 0 }
func (r *mockRuleSet) CreatedAtNanos() int64 { return 0 }
func (r *mockRuleSet) Tombstoned() bool { return r.tombstoned }
func (r *mockRuleSet) Proto() (*rulepb.RuleSet, error) { return nil, nil }
func (r *mockRuleSet) ActiveSet(timeNanos int64) rules.Matcher { return r.matcher }
func (r *mockRuleSet) ToMutableRuleSet() rules.MutableRuleSet { return nil }
func (r *mockRuleSet) Namespace() []byte { return []byte(r.namespace) }
func (r *mockRuleSet) Version() int { return r.version }
func (r *mockRuleSet) CutoverNanos() int64 { return r.cutoverNanos }
func (r *mockRuleSet) LastUpdatedAtNanos() int64 { return 0 }
func (r *mockRuleSet) CreatedAtNanos() int64 { return 0 }
func (r *mockRuleSet) Tombstoned() bool { return r.tombstoned }
func (r *mockRuleSet) Proto() (*rulepb.RuleSet, error) { return nil, nil }
func (r *mockRuleSet) ActiveSet(timeNanos int64) rules.Matcher { return r.matcher }
func (r *mockRuleSet) ToMutableRuleSet() rules.MutableRuleSet { return nil }
func (r *mockRuleSet) MappingRules() (view.MappingRules, error) { return nil, nil }
func (r *mockRuleSet) RollupRules() (view.RollupRules, error) { return nil, nil }
func (r *mockRuleSet) Latest() (view.RuleSet, error) { return view.RuleSet{}, nil }
Expand Down
Loading

0 comments on commit 00e8df3

Please sign in to comment.