Skip to content

Commit

Permalink
Use metrics unit in user scope (#2759)
Browse files Browse the repository at this point in the history
* Use metrics unit in user scope
  • Loading branch information
yux0 committed Apr 22, 2022
1 parent 4b44fd2 commit 5ce1a63
Show file tree
Hide file tree
Showing 15 changed files with 181 additions and 155 deletions.
104 changes: 63 additions & 41 deletions common/metrics/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,37 +182,57 @@ var (

defaultQuantiles = []float64{50, 75, 90, 95, 99}

defaultHistogramBoundaries = []float64{
1 * ms,
2 * ms,
5 * ms,
10 * ms,
20 * ms,
50 * ms,
100 * ms,
200 * ms,
500 * ms,
1000 * ms,
2000 * ms,
5000 * ms,
10000 * ms,
20000 * ms,
50000 * ms,
100000 * ms,
200000 * ms,
500000 * ms,
1000000 * ms,
2000000 * ms,
5000000 * ms,
10000000 * ms,
20000000 * ms,
50000000 * ms,
100000000 * ms,
200000000 * ms,
500000000 * ms,
1000000000 * ms,
2000000000 * ms,
5000000000 * ms,
defaultPerUnitHistogramBoundaries = map[string][]float64{
Dimensionless: {
1,
2,
5,
10,
20,
50,
100,
200,
500,
1000,
},
Milliseconds: {
1 * ms,
2 * ms,
5 * ms,
10 * ms,
20 * ms,
50 * ms,
100 * ms,
200 * ms,
500 * ms,
1000 * ms,
2000 * ms,
5000 * ms,
10000 * ms,
20000 * ms,
50000 * ms,
100000 * ms,
200000 * ms,
500000 * ms,
1000000 * ms,
},
Bytes: {
1024,
2048,
4096,
8192,
16384,
32768,
65536,
131072,
262144,
524288,
1048576,
2097152,
4194304,
8388608,
16777216,
},
}
)

Expand Down Expand Up @@ -250,7 +270,7 @@ func InitReporterFromPrometheusConfig(logger log.Logger, config *PrometheusConfi
case FrameworkTally:
return NewTallyReporterFromPrometheusConfig(logger, config, clientConfig), nil
case FrameworkOpentelemetry:
return NewOpentelemeteryReporterFromPrometheusConfig(logger, config, clientConfig)
return NewOpenTelemetryReporterFromPrometheusConfig(logger, config, clientConfig)
default:
err := fmt.Errorf("unsupported framework type specified in config: %q", config.Framework)
logger.Error(err.Error())
Expand Down Expand Up @@ -285,6 +305,7 @@ func NewTallyReporterFromPrometheusConfig(
) Reporter {
tallyConfig := convertPrometheusConfigToTally(config)
tallyScope := newPrometheusScope(logger, tallyConfig, clientConfig)
setDefaultPerUnitHistogramBoundaries(clientConfig)
return NewTallyReporter(tallyScope, clientConfig)
}

Expand Down Expand Up @@ -323,6 +344,13 @@ func convertPrometheusConfigToTally(
}
}

func setDefaultPerUnitHistogramBoundaries(clientConfig *ClientConfig) {
if clientConfig.PerUnitHistogramBoundaries != nil && len(clientConfig.PerUnitHistogramBoundaries) > 0 {
return
}
clientConfig.PerUnitHistogramBoundaries = defaultPerUnitHistogramBoundaries
}

// newM3Scope returns a new m3 scope with
// a default reporting interval of a second
func newM3Scope(logger log.Logger, c *Config) tally.Scope {
Expand All @@ -334,7 +362,6 @@ func newM3Scope(logger log.Logger, c *Config) tally.Scope {
Tags: c.Tags,
CachedReporter: reporter,
Prefix: c.Prefix,
DefaultBuckets: histogramBoundariesToValueBuckets(defaultHistogramBoundaries),
}
scope, _ := tally.NewRootScope(scopeOpts, time.Second)
return scope
Expand All @@ -355,10 +382,9 @@ func newStatsdScope(logger log.Logger, c *Config) tally.Scope {
// Therefore, we implement Tally interface to have a statsd reporter that can support tagging
reporter := statsdreporter.NewReporter(statter, tallystatsdreporter.Options{})
scopeOpts := tally.ScopeOptions{
Tags: c.Tags,
Reporter: reporter,
Prefix: c.Prefix,
DefaultBuckets: histogramBoundariesToValueBuckets(defaultHistogramBoundaries),
Tags: c.Tags,
Reporter: reporter,
Prefix: c.Prefix,
}
scope, _ := tally.NewRootScope(scopeOpts, time.Second)
return scope
Expand All @@ -371,9 +397,6 @@ func newPrometheusScope(
config *prometheus.Configuration,
clientConfig *ClientConfig,
) tally.Scope {
if len(config.DefaultHistogramBuckets) == 0 {
config.DefaultHistogramBuckets = histogramBoundariesToHistogramObjectives(defaultHistogramBoundaries)
}
reporter, err := config.NewReporter(
prometheus.ConfigurationOptions{
Registry: prom.NewRegistry(),
Expand All @@ -391,7 +414,6 @@ func newPrometheusScope(
Separator: prometheus.DefaultSeparator,
SanitizeOptions: &sanitizeOptions,
Prefix: clientConfig.Prefix,
DefaultBuckets: histogramBoundariesToValueBuckets(defaultHistogramBoundaries),
}
scope, _ := tally.NewRootScope(scopeOpts, time.Second)
return scope
Expand Down
2 changes: 1 addition & 1 deletion common/metrics/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ type (
RecordTimer(timer string, d time.Duration)
// RecordDistribution records a distribution (wrapper on top of timer) for the given
// metric name
RecordDistribution(id string, d int)
RecordDistribution(id string, unit MetricUnit, d int)
// UpdateGauge reports Gauge type absolute value metric
UpdateGauge(gauge string, value float64)
// Tagged returns a new scope with added and/or overriden tags values that can be used
Expand Down
8 changes: 4 additions & 4 deletions common/metrics/interfaces_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion common/metrics/noop_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (n *noopMetricsUserScope) StartTimer(timer string) Stopwatch {

func (n *noopMetricsUserScope) RecordTimer(timer string, d time.Duration) {}

func (n *noopMetricsUserScope) RecordDistribution(id string, d int) {}
func (n *noopMetricsUserScope) RecordDistribution(id string, unit MetricUnit, d int) {}

func (n *noopMetricsUserScope) UpdateGauge(gauge string, value float64) {}

Expand Down
9 changes: 3 additions & 6 deletions common/metrics/opentelemetry_aggregator_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,25 +36,22 @@ import (
type (
// OtelAggregatorSelector handles utilizing correct histogram bucket list for distinct metric unit types.
OtelAggregatorSelector struct {
buckets map[MetricUnit][]histogram.Option
defaultBuckets []histogram.Option
buckets map[MetricUnit][]histogram.Option
}
)

var _ emetric.AggregatorSelector = &OtelAggregatorSelector{}

// Creates new instance of aggregator selector.
func NewOtelAggregatorSelector(
defaultBoundaries []float64,
perUnitBoundaries map[string][]float64,
) *OtelAggregatorSelector {
perUnitBuckets := make(map[MetricUnit][]histogram.Option, len(perUnitBoundaries))
for unit, buckets := range perUnitBoundaries {
perUnitBuckets[MetricUnit(unit)] = []histogram.Option{histogram.WithExplicitBoundaries(buckets)}
}
return &OtelAggregatorSelector{
defaultBuckets: []histogram.Option{histogram.WithExplicitBoundaries(defaultBoundaries)},
buckets: perUnitBuckets,
buckets: perUnitBuckets,
}
}

Expand All @@ -63,7 +60,7 @@ func (s OtelAggregatorSelector) AggregatorFor(descriptor *sdkapi.Descriptor, agg
case sdkapi.GaugeObserverInstrumentKind:
lastValueAggs(aggPtrs)
case sdkapi.HistogramInstrumentKind:
options := s.defaultBuckets
var options []histogram.Option
if opts, ok := s.buckets[MetricUnit(descriptor.Unit())]; ok {
options = opts
}
Expand Down
30 changes: 15 additions & 15 deletions common/metrics/opentelemetry_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ import (
"go.temporal.io/server/common/log"
)

// opentelemetryClient is used for reporting metrics by various Temporal services
// openTelemetryClient is used for reporting metrics by various Temporal services
type (
opentelemetryClient struct {
openTelemetryClient struct {
// parentReporter is the parent scope for the metrics
rootScope *opentelemetryScope
rootScope *openTelemetryScope
childScopes map[int]Scope
metricDefs map[int]metricDefinition
serviceIdx ServiceIdx
Expand All @@ -45,16 +45,16 @@ type (
}
)

// NewOpentelemeteryClientByReporter creates and returns a new instance of Client implementation
// NewOpenTelemetryClient creates and returns a new instance of Client implementation
// serviceIdx indicates the service type in (InputhostIndex, ... StorageIndex)
func NewOpentelemeteryClient(clientConfig *ClientConfig, serviceIdx ServiceIdx, reporter OpentelemetryReporter, logger log.Logger, gaugeCache OtelGaugeCache) (Client, error) {
func NewOpenTelemetryClient(clientConfig *ClientConfig, serviceIdx ServiceIdx, reporter OpenTelemetryReporter, logger log.Logger, gaugeCache OtelGaugeCache) (Client, error) {
tagsFilterConfig := NewTagFilteringScopeConfig(clientConfig.ExcludeTags)

scopeWrapper := func(impl internalScope) internalScope {
return NewTagFilteringScope(tagsFilterConfig, impl)
}

globalRootScope := newOpentelemetryScope(serviceIdx, reporter.GetMeter(), nil, clientConfig.Tags, getMetricDefs(serviceIdx), false, gaugeCache, false)
globalRootScope := newOpenTelemetryScope(serviceIdx, reporter.GetMeter(), nil, clientConfig.Tags, getMetricDefs(serviceIdx), false, gaugeCache, false)

serviceTypeTagValue, err := MetricsServiceIdxToServiceName(serviceIdx)
if err != nil {
Expand All @@ -68,7 +68,7 @@ func NewOpentelemeteryClient(clientConfig *ClientConfig, serviceIdx ServiceIdx,
rootTags[serviceName] = serviceTypeTagValue

totalScopes := len(ScopeDefs[Common]) + len(ScopeDefs[serviceIdx])
metricsClient := &opentelemetryClient{
metricsClient := &openTelemetryClient{
rootScope: globalRootScope,
childScopes: make(map[int]Scope, totalScopes),
metricDefs: getMetricDefs(serviceIdx),
Expand Down Expand Up @@ -101,46 +101,46 @@ func NewOpentelemeteryClient(clientConfig *ClientConfig, serviceIdx ServiceIdx,

// IncCounter increments one for a counter and emits
// to metrics backend
func (m *opentelemetryClient) IncCounter(scopeIdx int, counterIdx int) {
func (m *openTelemetryClient) IncCounter(scopeIdx int, counterIdx int) {
m.childScopes[scopeIdx].IncCounter(counterIdx)
}

// AddCounter adds delta to the counter and
// emits to the metrics backend
func (m *opentelemetryClient) AddCounter(scopeIdx int, counterIdx int, delta int64) {
func (m *openTelemetryClient) AddCounter(scopeIdx int, counterIdx int, delta int64) {
m.childScopes[scopeIdx].AddCounter(counterIdx, delta)
}

// StartTimer starts a timer for the given
// metric name
func (m *opentelemetryClient) StartTimer(scopeIdx int, timerIdx int) Stopwatch {
func (m *openTelemetryClient) StartTimer(scopeIdx int, timerIdx int) Stopwatch {
return m.childScopes[scopeIdx].StartTimer(timerIdx)
}

// RecordTimer records and emits a timer for the given metric name
func (m *opentelemetryClient) RecordTimer(scopeIdx int, timerIdx int, d time.Duration) {
func (m *openTelemetryClient) RecordTimer(scopeIdx int, timerIdx int, d time.Duration) {
m.childScopes[scopeIdx].RecordTimer(timerIdx, d)
}

// RecordDistribution records and emits a distribution (wrapper on top of timer) for the given
// metric name
func (m *opentelemetryClient) RecordDistribution(scopeIdx int, timerIdx int, d int) {
func (m *openTelemetryClient) RecordDistribution(scopeIdx int, timerIdx int, d int) {
m.childScopes[scopeIdx].RecordDistribution(timerIdx, d)
}

// UpdateGauge reports Gauge type metric
func (m *opentelemetryClient) UpdateGauge(scopeIdx int, gaugeIdx int, value float64) {
func (m *openTelemetryClient) UpdateGauge(scopeIdx int, gaugeIdx int, value float64) {
m.childScopes[scopeIdx].UpdateGauge(gaugeIdx, value)
}

// Scope returns a new internal metrics scope that can be used to add additional
// information to the metrics emitted
func (m *opentelemetryClient) Scope(scopeIdx int, tags ...Tag) Scope {
func (m *openTelemetryClient) Scope(scopeIdx int, tags ...Tag) Scope {
return m.childScopes[scopeIdx].Tagged(tags...)
}

// UserScope returns a new metrics scope that can be used to add additional
// information to the metrics emitted by user code.
func (m *opentelemetryClient) UserScope() UserScope {
func (m *openTelemetryClient) UserScope() UserScope {
return m.userScope
}

0 comments on commit 5ce1a63

Please sign in to comment.