Skip to content

Commit

Permalink
[WIP] Add optional tag slice to matching config
Browse files Browse the repository at this point in the history
  • Loading branch information
kentzeng12 committed Jun 6, 2024
1 parent 75a402a commit b52ec88
Show file tree
Hide file tree
Showing 5 changed files with 1,009 additions and 8 deletions.
5 changes: 4 additions & 1 deletion src/metrics/filters/tags_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,10 @@ type TagsFilterOptions struct {
// Name of the name tag.
NameTagKey []byte

// Function to extract name and tags from an id.
// Name of tags to include in rollup ID if seen in metric ID
IncludeTagKeys [][]byte

//Function to extract name and tags from an id.
NameAndTagsFn id.NameAndTagsFn

// Function to create a new sorted tag iterator from id tags.
Expand Down
6 changes: 6 additions & 0 deletions src/metrics/matcher/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type Configuration struct {
NamespaceTag string `yaml:"namespaceTag" validate:"nonzero"`
DefaultNamespace string `yaml:"defaultNamespace" validate:"nonzero"`
NameTagKey string `yaml:"nameTagKey" validate:"nonzero"`
IncludeTagKeys []string `yaml:"includeTagKeys"`
MatchRangePast *time.Duration `yaml:"matchRangePast"`
SortedTagIteratorPool pool.ObjectPoolConfiguration `yaml:"sortedTagIteratorPool"`
}
Expand Down Expand Up @@ -114,6 +115,11 @@ func (cfg *Configuration) NewOptions(
NameAndTagsFn: m3.NameAndTags,
SortedTagIteratorFn: sortedTagIteratorFn,
}
includeTagKeys := make([][]byte, len(cfg.IncludeTagKeys))
for i, includeTagKey := range cfg.IncludeTagKeys {
includeTagKeys[i] = []byte(includeTagKey)
}
tagsFilterOptions.IncludeTagKeys = includeTagKeys

isRollupIDFn := func(name []byte, tags []byte) bool {
return m3.IsRollupID(name, tags, sortedTagIteratorPool)
Expand Down
36 changes: 30 additions & 6 deletions src/metrics/rules/active_ruleset.go
Original file line number Diff line number Diff line change
Expand Up @@ -522,11 +522,12 @@ func (as *activeRuleSet) matchRollupTarget(
}

var (
rollupTags = rollupOp.Tags
sortedTagIter = matchOpts.SortedTagIteratorFn(sortedTagPairBytes)
matchTagIdx = 0
nameTagName = as.tagsFilterOpts.NameTagKey
nameTagValue []byte
rollupTags = rollupOp.Tags
sortedTagIter = matchOpts.SortedTagIteratorFn(sortedTagPairBytes)
matchTagIdx = 0
nameTagName = as.tagsFilterOpts.NameTagKey
nameTagValue []byte
includeTagNames = as.tagsFilterOpts.IncludeTagKeys
)

switch rollupOp.Type {
Expand All @@ -543,9 +544,15 @@ func (as *activeRuleSet) matchRollupTarget(
nameTagValue = tagVal
}

// If we've matched all tags, no need to process.
matchedIncludeTag := isIncludeTag(tagName, includeTagNames)

// If we've matched all tags, no need to process the rollup rules.
// We don't break out of the for loop, because we may still need to find the name tag.
// We also still need to add any remaining include tags.
if matchTagIdx >= len(rollupTags) {
if targetOpts.generateRollupID && matchedIncludeTag {
tagPairs = append(tagPairs, metricid.TagPair{Name: tagName, Value: tagVal})
}
continue
}

Expand All @@ -559,6 +566,12 @@ func (as *activeRuleSet) matchRollupTarget(
continue
}

// The current tag didn't match the rollup tag, but we still want to add the include tag to the rollup id.
if targetOpts.generateRollupID && matchedIncludeTag {
tagPairs = append(tagPairs, metricid.TagPair{Name: tagName, Value: tagVal})
continue
}

// If one of the target tags is not found in the ID, this is considered a non-match so return immediately.
if res > 0 {
return nil, false, nil
Expand Down Expand Up @@ -845,6 +858,17 @@ func (as *activeRuleSet) cutoverNanosAt(idx int) int64 {
return timeNanosMax
}

// isIncludeTag checks if tagName is in includeTagNames
// Include tags are tags that if present in the metric id, should be included in the rollup id.
func isIncludeTag(tagName []byte, includeTagNames [][]byte) bool {
for _, includeTagName := range includeTagNames {
if bytes.Compare(tagName, includeTagName) == 0 {
return true
}
}
return false
}

// filterByAggregationType takes a list of pipelines as input and returns those
// containing the given aggregation type.
func filteredPipelinesWithAggregationType(
Expand Down
Loading

0 comments on commit b52ec88

Please sign in to comment.