Skip to content
This repository has been archived by the owner on Oct 17, 2018. It is now read-only.

Commit

Permalink
Add functions to find mapping policies for rollup metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
xichen2020 committed Jun 16, 2017
1 parent 4503730 commit b934e54
Show file tree
Hide file tree
Showing 4 changed files with 421 additions and 61 deletions.
17 changes: 17 additions & 0 deletions metric/id/m3/id.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,23 @@ func NewRollupID(name []byte, tagPairs []id.TagPair) []byte {
return buf.Bytes()
}

// IsRollupID determines whether an id is a rollup id.
func IsRollupID(id []byte) bool {
_, tags, err := NameAndTags(id)
if err != nil {
return false
}
iter := NewSortedTagIterator(tags)
defer iter.Close()
for iter.Next() {
name, val := iter.Current()
if bytes.Equal(name, rollupTagPair.Name) && bytes.Equal(val, rollupTagPair.Value) {
return true
}
}
return false
}

// TODO(xichen): pool the mids.
type metricID struct {
id []byte
Expand Down
14 changes: 14 additions & 0 deletions metric/id/m3/id_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,20 @@ func TestNewRollupID(t *testing.T) {
require.Equal(t, expected, NewRollupID(name, tagPairs))
}

func TestIsRollupID(t *testing.T) {
inputs := []struct {
id []byte
expected bool
}{
{id: []byte("m3+foo+a1=b1,m3_rollup=true,a2=b2"), expected: true},
{id: []byte("foo.bar.baz"), expected: false},
{id: []byte("m3+foo+a1=b1,a2=b2"), expected: false},
}
for _, input := range inputs {
require.Equal(t, input.expected, IsRollupID(input.id))
}
}

func TestMetricIDTagValue(t *testing.T) {
iterPool := id.NewSortedTagIteratorPool(nil)
iterPool.Init(func() id.SortedTagIterator {
Expand Down
179 changes: 161 additions & 18 deletions rules/ruleset.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,30 @@ var (
errNilRuleSetSchema = errors.New("nil rule set schema")
)

// MatchMode determines how match is performed.
type MatchMode int

// List of supported match modes.
const (
// When performing matches in MappingPoliciesOnly mode, the matcher only finds the applicable
// mapping policies for the given id, without attempting to determine the rollup policies.
MappingPoliciesOnly MatchMode = iota

// When performing matches in Full mode, the matcher find the applicable mapping
// policies as well as the rollup policies for the given id.
Full
)

// MatchOptions provide a set of options for id matching.
type MatchOptions struct {
Mode MatchMode
UseDefaultIfNoMatch bool
}

// Matcher matches metrics against rules to determine applicable policies.
type Matcher interface {
// MatchAll returns all applicable policies given a metric id between [from, to).
MatchAll(id []byte, from time.Time, to time.Time) MatchResult
// MatchAll returns the applicable policies for a metric id between [from, to).
MatchAll(id []byte, isRollupID bool, from, to time.Time, opts MatchOptions) MatchResult
}

type activeRuleSet struct {
Expand Down Expand Up @@ -90,32 +110,149 @@ func newActiveRuleSet(

// NB(xichen): could make this more efficient by keeping track of matched rules
// at previous iteration and incrementally update match results.
func (as *activeRuleSet) MatchAll(id []byte, from time.Time, to time.Time) MatchResult {
func (as *activeRuleSet) MatchAll(
id []byte,
isRollupID bool,
from, to time.Time,
opts MatchOptions,
) MatchResult {
var (
fromNanos = from.UnixNano()
toNanos = to.UnixNano()
currMappingResults = policy.PoliciesList{as.matchMappings(id, fromNanos)}
currRollupResults = as.matchRollups(id, fromNanos)
nextIdx = as.nextCutoverIdx(fromNanos)
nextCutoverNanos = as.cutoverNanosAt(nextIdx)
currMappingResults policy.PoliciesList
currRollupResults []RollupResult
)

if currMappingPolicies, found := as.mappingPoliciesFor(id, isRollupID, fromNanos, opts); found {
currMappingResults = append(currMappingResults, currMappingPolicies)
}
if opts.Mode != MappingPoliciesOnly {
currRollupResults = as.rollupResultsFor(id, fromNanos)
}
for nextIdx < len(as.cutoverTimesAsc) && nextCutoverNanos < toNanos {
nextMappingPolicies := as.matchMappings(id, nextCutoverNanos)
nextRollupResults := as.matchRollups(id, nextCutoverNanos)
currMappingResults = mergeMappingResults(currMappingResults, nextMappingPolicies)
currRollupResults = mergeRollupResults(currRollupResults, nextRollupResults, nextCutoverNanos)
if nextMappingPolicies, found := as.mappingPoliciesFor(id, isRollupID, nextCutoverNanos, opts); found {
currMappingResults = mergeMappingResults(currMappingResults, nextMappingPolicies)
}
if opts.Mode != MappingPoliciesOnly {
nextRollupResults := as.rollupResultsFor(id, nextCutoverNanos)
currRollupResults = mergeRollupResults(currRollupResults, nextRollupResults, nextCutoverNanos)
}
nextIdx++
nextCutoverNanos = as.cutoverNanosAt(nextIdx)
}

// The result expires when it reaches the first cutover time after t among all
// The result expires when it reaches the first cutover time after to among all
// active rules because the metric may then be matched against a different set of rules.
return NewMatchResult(nextCutoverNanos, currMappingResults, currRollupResults)
}

func (as *activeRuleSet) matchMappings(id []byte, timeNanos int64) policy.StagedPolicies {
// TODO(xichen): pool the policies.
// mappingPoliciesFor returns the mapping policies applicable to a given id at a given
// time. It returns true if one or more matching policies are found, and false otherwise.
func (as *activeRuleSet) mappingPoliciesFor(
id []byte,
isRollupID bool,
timeNanos int64,
opts MatchOptions,
) (policy.StagedPolicies, bool) {
var (
sp policy.StagedPolicies
matched bool
)
if !isRollupID {
sp, matched = as.mappingsForNonRollupID(id, timeNanos)
} else {
sp, matched = as.mappingsForRollupID(id, timeNanos)
}
if matched {
return sp, true
}
if opts.UseDefaultIfNoMatch {
return policy.DefaultStagedPolicies, true
}
return policy.DefaultStagedPolicies, false
}

// NB(xichen): in order to determine the applicable policies for a rollup metric, we need to
// match the id against rollup rules to determine which rollup rules are applicable, under the
// assumption that no two rollup targets in the same namespace may have the same rollup metric
// name and the list of rollup tags. Otherwise, a rollup metric could potentially match more
// than one rollup rule with different policies even though only one of the matched rules was
// used to produce the given rollup metric id due to its tag filters, thereby causing the wrong
// staged policies to be returned. This also implies at any given time, at most one rollup target
// may match the given rollup id.
func (as *activeRuleSet) mappingsForRollupID(id []byte, timeNanos int64) (policy.StagedPolicies, bool) {
if len(as.rollupRules) == 0 {
return policy.DefaultStagedPolicies, false
}

// If we cannot extract tags from the id, this is likely an invalid
// metric and we bail early.
name, tags, err := as.tagFilterOpts.NameAndTagsFn(id)
if err != nil {
return policy.DefaultStagedPolicies, false
}

var (
cutoverNanos int64
policies []policy.Policy
matched bool
)
for _, rollupRule := range as.rollupRules {
snapshot := rollupRule.ActiveSnapshot(timeNanos)
if snapshot == nil {
continue
}
for _, target := range snapshot.targets {
if !bytes.Equal(target.Name, name) {
continue
}
var (
tagIter = as.tagFilterOpts.SortedTagIteratorFn(tags)
hasMoreTags = tagIter.Next()
targetTagIdx = 0
)
for hasMoreTags && targetTagIdx < len(target.Tags) {
tagName, _ := tagIter.Current()
res := bytes.Compare(tagName, target.Tags[targetTagIdx])
if res == 0 {
targetTagIdx++
hasMoreTags = tagIter.Next()
continue
}
// If one of the target tags is not found in the id, this is considered
// a non-match so bail immediately.
if res > 0 {
break
}
hasMoreTags = tagIter.Next()
}
tagIter.Close()

// If all of the target tags are matched, this is considered as a match.
if targetTagIdx == len(target.Tags) {
matched = true
policies = append(policies, target.Policies...)
if cutoverNanos < snapshot.cutoverNanos {
cutoverNanos = snapshot.cutoverNanos
}
break
}
}
if matched {
break
}
}
if !matched {
return policy.DefaultStagedPolicies, false
}
resolved := resolvePolicies(policies)
return policy.NewStagedPolicies(cutoverNanos, false, resolved), true
}

// NB(xichen): if the given id is not a rollup id, we need to match it against the mapping
// rules to determine the corresponding mapping policies.
func (as *activeRuleSet) mappingsForNonRollupID(id []byte, timeNanos int64) (policy.StagedPolicies, bool) {
var (
cutoverNanos int64
policies []policy.Policy
Expand All @@ -134,13 +271,13 @@ func (as *activeRuleSet) matchMappings(id []byte, timeNanos int64) policy.Staged
policies = append(policies, snapshot.policies...)
}
if cutoverNanos == 0 && len(policies) == 0 {
return policy.DefaultStagedPolicies
return policy.DefaultStagedPolicies, false
}
resolved := resolvePolicies(policies)
return policy.NewStagedPolicies(cutoverNanos, false, resolved)
return policy.NewStagedPolicies(cutoverNanos, false, resolved), true
}

func (as *activeRuleSet) matchRollups(id []byte, timeNanos int64) []RollupResult {
func (as *activeRuleSet) rollupResultsFor(id []byte, timeNanos int64) []RollupResult {
// TODO(xichen): pool the rollup targets.
var (
cutoverNanos int64
Expand Down Expand Up @@ -229,12 +366,18 @@ func (as *activeRuleSet) toRollupResults(id []byte, cutoverNanos int64, targets
continue
}
if res > 0 {
targetTagIdx++
continue
break
}
hasMoreTags = tagIter.Next()
}
tagIter.Close()
// If not all the target tags are found in the id, this is considered
// an ineligible rollup target. In practice, this should never happen
// because the UI requires the list of rollup tags should be a subset
// of the tags in the metric selection filter.
if targetTagIdx < len(target.Tags) {
continue
}

result := RollupResult{
ID: as.newRollupIDFn(target.Name, tagPairs),
Expand Down
Loading

0 comments on commit b934e54

Please sign in to comment.