Skip to content

Commit

Permalink
Implement metrics reporting via opentelemetry (#1445)
Browse files Browse the repository at this point in the history
* Implement metrics reporting via opentelemetry.

* Add generalized Prometheus config
* Refactor metrics initialization code to utilize generalized Prometheus config
* Add OpenTelemetry Client, Scope and Reporter implementations
  • Loading branch information
Ardagan committed Apr 16, 2021
1 parent b4c4b84 commit b06a313
Show file tree
Hide file tree
Showing 21 changed files with 1,038 additions and 92 deletions.
2 changes: 1 addition & 1 deletion cmd/server/main.go
Expand Up @@ -159,7 +159,7 @@ func buildCLI() *cli.App {

err = s.Start()
if err != nil {
return cli.Exit(fmt.Sprintf("Unable to start server: %v.", err), 1)
return cli.Exit(fmt.Sprintf("Unable to start server. Error: %v", err), 1)
}
return cli.Exit("All services are stopped.", 0)
},
Expand Down
26 changes: 25 additions & 1 deletion common/metrics/common.go
Expand Up @@ -24,7 +24,14 @@

package metrics

import "time"
import (
"fmt"
"time"

"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/primitives"
)

const (
distributionToTimerRatio = int(time.Millisecond / time.Nanosecond)
Expand All @@ -48,3 +55,20 @@ func getMetricDefs(serviceIdx ServiceIdx) map[int]metricDefinition {

return defs
}

// GetMetricsServiceIdx returns service id corresponding to serviceName
func GetMetricsServiceIdx(serviceName string, logger log.Logger) ServiceIdx {
switch serviceName {
case primitives.FrontendService:
return Frontend
case primitives.HistoryService:
return History
case primitives.MatchingService:
return Matching
case primitives.WorkerService:
return Worker
default:
logger.Fatal("Unknown service name for metrics!", tag.Service(serviceName))
panic(fmt.Sprintf("Unknown service name for metrics: %s", serviceName))
}
}
203 changes: 181 additions & 22 deletions common/metrics/config.go
Expand Up @@ -25,6 +25,7 @@
package metrics

import (
"fmt"
"time"

"github.com/cactus/go-statsd-client/statsd"
Expand All @@ -47,12 +48,11 @@ type (
// Statsd is the configuration for statsd reporter
Statsd *StatsdConfig `yaml:"statsd"`
// Prometheus is the configuration for prometheus reporter
Prometheus *prometheus.Configuration `yaml:"prometheus"`
// Config for Opentelemetery Prometheus metrics
OTPrometheus *PrometheusConfig `yaml:"otprometheus"`
Prometheus *PrometheusConfig `yaml:"prometheus"`
// {optional} Config for Prometheus metrics reporter for SDK reported metrics.
PrometheusSDK *PrometheusConfig `yaml:"prometheusSDK"`
// Tags is the set of key-value pairs to be reported as part of every metric
Tags map[string]string `yaml:"tags"`

// Prefix sets the prefix to all outgoing metrics
Prefix string `yaml:"prefix"`
}
Expand All @@ -72,17 +72,64 @@ type (
FlushBytes int `yaml:"flushBytes"`
}

// PrometheusConfig is a new format for config for prometheus metrics.
PrometheusConfig struct {
// Metric framework: Tally/OpenTelemetry
Framework string `yaml:framework`
// Address for prometheus to serve metrics from.
ListenAddress string `yaml:"listenAddress"`
// DefaultHistogramBoundaries defines the default histogram bucket
// boundaries.
DefaultHistogramBoundaries []float64 `yaml:"defaultHistogramBoundaries"`
// HandlerPath if specified will be used instead of using the default
// HTTP handler path "/metrics".
HandlerPath string `yaml:"handlerPath"`

// Configs below are kept for backwards compatibility with previously exposed tally prometheus.Configuration.

// Deprecated. ListenNetwork if specified will be used instead of using tcp network.
// Supported networks: tcp, tcp4, tcp6 and unix.
ListenNetwork string `yaml:"listenNetwork"`

// Deprecated. TimerType is the default Prometheus type to use for Tally timers.
TimerType string `yaml:"timerType"`

// Deprecated. DefaultHistogramBuckets if specified will set the default histogram
// buckets to be used by the reporter.
DefaultHistogramBuckets []HistogramObjective `yaml:"defaultHistogramBuckets"`

// Deprecated. DefaultSummaryObjectives if specified will set the default summary
// objectives to be used by the reporter.
DefaultSummaryObjectives []SummaryObjective `yaml:"defaultSummaryObjectives"`

// Deprecated. OnError specifies what to do when an error either with listening
// on the specified listen address or registering a metric with the
// Prometheus. By default the registerer will panic.
OnError string `yaml:"onError"`
}
)

// Deprecated. HistogramObjective is a Prometheus histogram bucket.
// Added for backwards compatibility.
type HistogramObjective struct {
Upper float64 `yaml:"upper"`
}

// Deprecated. SummaryObjective is a Prometheus summary objective.
// Added for backwards compatibility.
type SummaryObjective struct {
Percentile float64 `yaml:"percentile"`
AllowedError float64 `yaml:"allowedError"`
}

const (
ms = float64(time.Millisecond) / float64(time.Second)

// Supported framework types
// FrameworkTally tally framework id
FrameworkTally = "tally"
// FrameworkOpentelemetry OpenTelemetry framework id
FrameworkOpentelemetry = "opentelemetry"
)

// tally sanitizer options that satisfy both Prometheus and M3 restrictions.
Expand Down Expand Up @@ -111,7 +158,9 @@ var (
ReplacementCharacter: tally.DefaultReplacementCharacter,
}

defaultHistogramBuckets = tally.ValueBuckets([]float64{
defaultQuantiles = []float64{50, 75, 90, 95, 99}

defaultHistogramBoundaries = []float64{
1 * ms,
2 * ms,
5 * ms,
Expand All @@ -131,9 +180,78 @@ var (
200000 * ms,
500000 * ms,
1000000 * ms,
})
}
)

// InitMetricReporters is a root function for initalizing metrics clients.
//
// Usage pattern
// serverReporter, sdkReporter, err := c.InitMetricReporters(logger, customReporter)
// metricsClient := serverReporter.newClient(logger, serviceIdx)
//
// customReporter Provide this argument if you want to report metrics to a custom metric platform, otherwise use nil.
//
// returns SeverReporter, SDKReporter, error
func (c *Config) InitMetricReporters(logger log.Logger, customReporter interface{}) (Reporter, Reporter, error) {

if c.Prometheus != nil && len(c.Prometheus.Framework) > 0 {
return c.initReportersFromPrometheusConfig(logger, customReporter)
}

var scope tally.Scope
if customReporter != nil {
if tallyCustomReporter, ok := customReporter.(tally.BaseStatsReporter); ok {
scope = c.NewCustomReporterScope(logger, tallyCustomReporter)
} else {
return nil, nil, fmt.Errorf(
"Specified customReporter is not of expected type tally.BaseStatsReporter "+
"as expected for metrics framework \"%s\".", FrameworkTally,
)
}
} else {
scope = c.NewScope(logger)
}
reporter := newTallyReporter(scope)
return reporter, reporter, nil
}

func (c *Config) initReportersFromPrometheusConfig(logger log.Logger, customReporter interface{}) (Reporter, Reporter, error) {
if customReporter != nil {
logger.Fatal("Metrics extension point is not implemented.")
}
serverReporter, err := c.initReporterFromPrometheusConfig(logger, c.Prometheus, customReporter)
if err != nil {
return nil, nil, err
}
sdkReporter := serverReporter
if c.PrometheusSDK != nil {
sdkReporter, err = c.initReporterFromPrometheusConfig(logger, c.PrometheusSDK, customReporter)
if err != nil {
return nil, nil, err
}
}
return serverReporter, sdkReporter, nil
}

func (c *Config) initReporterFromPrometheusConfig(logger log.Logger, config *PrometheusConfig, extensionPoint interface{}) (Reporter, error) {
switch config.Framework {
case FrameworkTally:
return c.newTallyReporterByPrometheusConfig(logger, config), nil
case FrameworkOpentelemetry:
return newOpentelemeteryReporter(logger, c.Tags, c.Prefix, config)
default:
err := fmt.Errorf("Unsupported framework type specified in config: %s", config.Framework)
logger.Error(err.Error())
return nil, err
}
}

func (c *Config) newTallyReporterByPrometheusConfig(logger log.Logger, config *PrometheusConfig) Reporter {
tallyConfig := c.convertPrometheusConfigToTally(config)
tallyScope := c.newPrometheusScope(logger, tallyConfig)
return newTallyReporter(tallyScope)
}

// NewScope builds a new tally scope for this metrics configuration
//
// If the underlying configuration is valid for multiple reporter types,
Expand All @@ -149,14 +267,45 @@ func (c *Config) NewScope(logger log.Logger) tally.Scope {
return c.newStatsdScope(logger)
}
if c.Prometheus != nil {
return c.newPrometheusScope(logger)
return c.newPrometheusScope(logger, c.convertPrometheusConfigToTally(c.Prometheus))
}
return tally.NoopScope
}

func (c *Config) buildHistogramBuckets(config *PrometheusConfig) []prometheus.HistogramObjective {
var result []prometheus.HistogramObjective
if len(config.DefaultHistogramBuckets) > 0 {
result = make([]prometheus.HistogramObjective, len(config.DefaultHistogramBuckets))
for i, item := range config.DefaultHistogramBuckets {
result[i].Upper = item.Upper
}
} else if len(config.DefaultHistogramBoundaries) > 0 {
result = histogramBoundariesToHistogramObjectives(config.DefaultHistogramBoundaries)
}
return result
}

func (c *Config) convertPrometheusConfigToTally(config *PrometheusConfig) *prometheus.Configuration {
defaultObjectives := make([]prometheus.SummaryObjective, len(config.DefaultSummaryObjectives))
for i, item := range config.DefaultSummaryObjectives {
defaultObjectives[i].AllowedError = item.AllowedError
defaultObjectives[i].Percentile = item.Percentile
}

return &prometheus.Configuration{
HandlerPath: config.HandlerPath,
ListenNetwork: config.ListenNetwork,
ListenAddress: config.ListenAddress,
TimerType: "histogram",
DefaultHistogramBuckets: c.buildHistogramBuckets(config),
DefaultSummaryObjectives: defaultObjectives,
OnError: config.OnError,
}
}

func (c *Config) NewCustomReporterScope(logger log.Logger, customReporter tally.BaseStatsReporter) tally.Scope {
options := tally.ScopeOptions{
DefaultBuckets: defaultHistogramBuckets,
DefaultBuckets: histogramBoundariesToValueBuckets(defaultHistogramBoundaries),
}
if c != nil {
options.Tags = c.Tags
Expand Down Expand Up @@ -187,7 +336,7 @@ func (c *Config) newM3Scope(logger log.Logger) tally.Scope {
Tags: c.Tags,
CachedReporter: reporter,
Prefix: c.Prefix,
DefaultBuckets: defaultHistogramBuckets,
DefaultBuckets: histogramBoundariesToValueBuckets(defaultHistogramBoundaries),
}
scope, _ := tally.NewRootScope(scopeOpts, time.Second)
return scope
Expand All @@ -211,26 +360,19 @@ func (c *Config) newStatsdScope(logger log.Logger) tally.Scope {
Tags: c.Tags,
Reporter: reporter,
Prefix: c.Prefix,
DefaultBuckets: defaultHistogramBuckets,
DefaultBuckets: histogramBoundariesToValueBuckets(defaultHistogramBoundaries),
}
scope, _ := tally.NewRootScope(scopeOpts, time.Second)
return scope
}

// newPrometheusScope returns a new prometheus scope with
// a default reporting interval of a second
func (c *Config) newPrometheusScope(logger log.Logger) tally.Scope {
if len(c.Prometheus.DefaultHistogramBuckets) == 0 {
for _, value := range defaultHistogramBuckets {
c.Prometheus.DefaultHistogramBuckets = append(
c.Prometheus.DefaultHistogramBuckets,
prometheus.HistogramObjective{
Upper: value,
},
)
}
func (c *Config) newPrometheusScope(logger log.Logger, config *prometheus.Configuration) tally.Scope {
if len(config.DefaultHistogramBuckets) == 0 {
config.DefaultHistogramBuckets = histogramBoundariesToHistogramObjectives(defaultHistogramBoundaries)
}
reporter, err := c.Prometheus.NewReporter(
reporter, err := config.NewReporter(
prometheus.ConfigurationOptions{
Registry: prom.NewRegistry(),
OnError: func(err error) {
Expand All @@ -247,8 +389,25 @@ func (c *Config) newPrometheusScope(logger log.Logger) tally.Scope {
Separator: prometheus.DefaultSeparator,
SanitizeOptions: &sanitizeOptions,
Prefix: c.Prefix,
DefaultBuckets: defaultHistogramBuckets,
DefaultBuckets: histogramBoundariesToValueBuckets(defaultHistogramBoundaries),
}
scope, _ := tally.NewRootScope(scopeOpts, time.Second)
return scope
}

func histogramBoundariesToHistogramObjectives(boundaries []float64) []prometheus.HistogramObjective {
var result []prometheus.HistogramObjective
for _, value := range boundaries {
result = append(
result,
prometheus.HistogramObjective{
Upper: value,
},
)
}
return result
}

func histogramBoundariesToValueBuckets(buckets []float64) tally.ValueBuckets {
return tally.ValueBuckets(buckets)
}
3 changes: 1 addition & 2 deletions common/metrics/config_test.go
Expand Up @@ -31,7 +31,6 @@ import (
"github.com/stretchr/testify/suite"
"github.com/uber-go/tally"
"github.com/uber-go/tally/m3"
"github.com/uber-go/tally/prometheus"

"go.temporal.io/server/common/log"
)
Expand Down Expand Up @@ -112,7 +111,7 @@ func (s *MetricsSuite) TestM3() {
}

func (s *MetricsSuite) TestPrometheus() {
prom := &prometheus.Configuration{
prom := &PrometheusConfig{
OnError: "panic",
TimerType: "histogram",
ListenAddress: "127.0.0.1:0",
Expand Down
9 changes: 8 additions & 1 deletion common/metrics/interfaces.go
Expand Up @@ -30,6 +30,7 @@ import (
"time"

"github.com/uber-go/tally"
"go.temporal.io/server/common/log"
)

type (
Expand Down Expand Up @@ -62,7 +63,7 @@ type (
Scope(scope int, tags ...Tag) Scope
}

// Scope is an interface for metrics
// Scope is an interface for metric.
Scope interface {
// IncCounter increments a counter metric
IncCounter(counter int)
Expand All @@ -82,6 +83,12 @@ type (
// information to metrics
Tagged(tags ...Tag) Scope
}

// Reporter is an interface for base constructor for metrics client.
Reporter interface {
NewClient(logger log.Logger, serviceIdx ServiceIdx) (Client, error)
Stop(logger log.Logger)
}
)

var sanitizer = tally.NewSanitizer(tally.SanitizeOptions{
Expand Down

0 comments on commit b06a313

Please sign in to comment.