Skip to content

Commit

Permalink
Fix rollup metrics aggregation (#2298)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ardagan committed Dec 13, 2021
1 parent 29e8157 commit 074d8eb
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 11 deletions.
8 changes: 4 additions & 4 deletions common/metrics/opentelemetry_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,11 @@ func newOpentelemeteryClient(clientConfig *ClientConfig, serviceIdx ServiceIdx,
return NewTagFilteringScope(tagsFilterConfig, impl)
}

rootScope := newOpentelemetryScope(serviceIdx, reporter, nil, clientConfig.Tags, getMetricDefs(serviceIdx), false, gaugeCache)
globalRootScope := newOpentelemetryScope(serviceIdx, reporter, nil, clientConfig.Tags, getMetricDefs(serviceIdx), false, gaugeCache, false)

totalScopes := len(ScopeDefs[Common]) + len(ScopeDefs[serviceIdx])
metricsClient := &opentelemetryClient{
rootScope: rootScope,
rootScope: globalRootScope,
childScopes: make(map[int]Scope, totalScopes),
metricDefs: getMetricDefs(serviceIdx),
serviceIdx: serviceIdx,
Expand All @@ -70,7 +70,7 @@ func newOpentelemeteryClient(clientConfig *ClientConfig, serviceIdx ServiceIdx,
namespace: namespaceAllValue,
}
mergeMapToRight(def.tags, scopeTags)
metricsClient.childScopes[idx] = scopeWrapper(rootScope.taggedString(scopeTags))
metricsClient.childScopes[idx] = scopeWrapper(globalRootScope.taggedString(scopeTags, true))
}

for idx, def := range ScopeDefs[serviceIdx] {
Expand All @@ -79,7 +79,7 @@ func newOpentelemeteryClient(clientConfig *ClientConfig, serviceIdx ServiceIdx,
namespace: namespaceAllValue,
}
mergeMapToRight(def.tags, scopeTags)
metricsClient.childScopes[idx] = scopeWrapper(rootScope.taggedString(scopeTags))
metricsClient.childScopes[idx] = scopeWrapper(globalRootScope.taggedString(scopeTags, true))
}

return metricsClient, nil
Expand Down
16 changes: 10 additions & 6 deletions common/metrics/opentelemetry_scope.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func newOpentelemetryScope(
defs map[int]metricDefinition,
isNamespace bool,
gaugeCache OtelGaugeCache,
selfAsRoot bool,
) *opentelemetryScope {
result := &opentelemetryScope{
serviceIdx: serviceIdx,
Expand All @@ -65,6 +66,9 @@ func newOpentelemetryScope(
isNamespaceTagged: isNamespace,
gaugeCache: gaugeCache,
}
if selfAsRoot {
result.rootScope = result
}
result.labels = tagMapToLabelArray(tags)
return result
}
Expand Down Expand Up @@ -110,7 +114,7 @@ func (m *opentelemetryScope) StartTimer(id int) Stopwatch {
m.rootScope.labels)
return newOpenTelemetryStopwatch([]openTelemetryStopwatchMetric{timer, timerRollup})
case m.isNamespaceTagged:
allScope := m.taggedString(map[string]string{namespace: namespaceAllValue})
allScope := m.taggedString(map[string]string{namespace: namespaceAllValue}, false)
timerAll := newOpenTelemetryStopwatchMetric(
allScope.reporter.GetMeterMust().NewInt64Histogram(def.metricName.String(), opt...),
allScope.labels)
Expand Down Expand Up @@ -146,7 +150,7 @@ func (m *opentelemetryScope) RecordTimer(id int, d time.Duration) {
m.reporter.GetMeterMust().NewInt64Histogram(def.metricName.String(), opt...).Record(
ctx,
d.Nanoseconds(),
m.taggedString(map[string]string{namespace: namespaceAllValue}).labels...,
m.taggedString(map[string]string{namespace: namespaceAllValue}, false).labels...,
)
}
}
Expand Down Expand Up @@ -177,12 +181,12 @@ func (m *opentelemetryScope) RecordDistribution(id int, d int) {
m.reporter.GetMeterMust().NewInt64Histogram(def.metricName.String(), opt...).Record(
ctx,
value,
m.taggedString(map[string]string{namespace: namespaceAllValue}).labels...,
m.taggedString(map[string]string{namespace: namespaceAllValue}, false).labels...,
)
}
}

func (m *opentelemetryScope) taggedString(tags map[string]string) *opentelemetryScope {
func (m *opentelemetryScope) taggedString(tags map[string]string, selfAsRoot bool) *opentelemetryScope {
namespaceTagged := m.isNamespaceTagged
tagMap := make(map[string]string, len(tags)+len(m.labels))
for k, v := range m.tags {
Expand All @@ -195,7 +199,7 @@ func (m *opentelemetryScope) taggedString(tags map[string]string) *opentelemetry
}
tagMap[k] = v
}
return newOpentelemetryScope(m.serviceIdx, m.reporter, m.rootScope, tagMap, m.defs, namespaceTagged, m.gaugeCache)
return newOpentelemetryScope(m.serviceIdx, m.reporter, m.rootScope, tagMap, m.defs, namespaceTagged, m.gaugeCache, selfAsRoot)
}

func (m *opentelemetryScope) Tagged(tags ...Tag) Scope {
Expand Down Expand Up @@ -232,7 +236,7 @@ func (m *opentelemetryScope) TaggedInternal(tags ...Tag) internalScope {
tagMap[tag.Key()] = tag.Value()
}

return m.taggedString(tagMap)
return m.taggedString(tagMap, false)
}

func unitToOptions(unit MetricUnit) metric.InstrumentOption {
Expand Down
8 changes: 7 additions & 1 deletion common/metrics/tally_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,13 @@ func (m *TallyClient) Scope(scopeIdx int, tags ...Tag) Scope {
scope := m.childScopes[scopeIdx]
return m.scopeWrapper(
newTallyScopeInternal(
NoopScope(0),
newTallyScopeInternal(
NoopScope(0),
scope,
m.metricDefs,
false,
m.perUnitBuckets,
),
scope,
m.metricDefs,
false,
Expand Down

0 comments on commit 074d8eb

Please sign in to comment.