Skip to content

Commit

Permalink
Expose metrics reporters constructors to end-user (#2481)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ardagan committed Feb 15, 2022
1 parent 632c4f7 commit eed1fde
Show file tree
Hide file tree
Showing 17 changed files with 172 additions and 150 deletions.
127 changes: 72 additions & 55 deletions common/metrics/config.go
Expand Up @@ -51,8 +51,6 @@ type (
Statsd *StatsdConfig `yaml:"statsd"`
// Prometheus is the configuration for prometheus reporter
Prometheus *PrometheusConfig `yaml:"prometheus"`
// Deprecated {optional} Config for Prometheus metrics reporter for SDK reported metrics.
PrometheusSDK *PrometheusConfig `yaml:"prometheusSDK"`
}

ClientConfig struct {
Expand Down Expand Up @@ -207,96 +205,109 @@ var (
}
)

// InitMetricReporters is a root function for initalizing metrics clients.
// InitMetricsReporterInternal is a root function for initializing metrics clients used inside temporal server.
//
// Usage pattern
// serverReporter, sdkReporter, err := c.InitMetricReporters(logger, customReporter)
// Usage pattern;
// serverReporter, err := c.InitMetricsReporterInternal(logger, config, customReporter)
// metricsClient := serverReporter.newClient(logger, serviceIdx)
//
// customReporter Provide this argument if you want to report metrics to a custom metric platform, otherwise use nil.
//
// returns SeverReporter, SDKReporter, error
func (c *Config) InitMetricReporters(logger log.Logger, customReporter interface{}) (Reporter, Reporter, error) {
// init from config only
// returns SeverReporter, error
func InitMetricsReporterInternal(logger log.Logger, c *Config, customReporter interface{}) (Reporter, error) {
if customReporter == nil {
if c.Prometheus != nil && len(c.Prometheus.Framework) > 0 {
return c.initReportersFromPrometheusConfig(logger, customReporter)
}

scope := c.NewScope(logger)
reporter := newTallyReporter(scope, &c.ClientConfig)
return reporter, reporter, nil
reporter, err := InitMetricsReporter(logger, c)
return reporter, err
}

// handle custom reporter from ServerOptions
switch cReporter := customReporter.(type) {
case tally.BaseStatsReporter:
scope := c.NewCustomReporterScope(logger, cReporter)
reporter := newTallyReporter(scope, &c.ClientConfig)
return reporter, reporter, nil
scope := NewCustomReporterScope(logger, &c.ClientConfig, cReporter)
reporter := NewTallyReporter(scope, &c.ClientConfig)
return reporter, nil
case Reporter:
return cReporter, cReporter, nil
return cReporter, nil
default:
return nil, nil, fmt.Errorf(
return nil, fmt.Errorf(
"specified customReporter does not implement tally.BaseStatsReporter or metrics.Reporter")
}
}

func (c *Config) initReportersFromPrometheusConfig(logger log.Logger, customReporter interface{}) (Reporter, Reporter, error) {
serverReporter, err := c.initReporterFromPrometheusConfig(logger, c.Prometheus, &c.ClientConfig)
if err != nil {
return nil, nil, err
}
sdkReporter := serverReporter
if c.PrometheusSDK != nil {
sdkReporter, err = c.initReporterFromPrometheusConfig(logger, c.PrometheusSDK, &c.ClientConfig)
if err != nil {
return nil, nil, err
}
// InitMetricsReporter is a method that initializes reporter to be used inside server.
//
// Reporter is a base for reporting metrics and is used to initialize MetricsClient or UserScope.
// MetricsClient is utilized internally in server to report metrics.
// UserScope is utilized by user to report metrics.
//
// Recommended to use for current support for reporting metrics in extensions.
//
// reporter := InitMetricsReporter()
// extension := MyExtensions(reporter.UserScope())
// serverOptions.WithReporter(reporter)
func InitMetricsReporter(logger log.Logger, c *Config) (Reporter, error) {
if c.Prometheus != nil && len(c.Prometheus.Framework) > 0 {
return InitReporterFromPrometheusConfig(logger, c.Prometheus, &c.ClientConfig)
}
return serverReporter, sdkReporter, nil
return NewTallyReporterFromConfig(logger, c)
}

func (c *Config) initReporterFromPrometheusConfig(logger log.Logger, config *PrometheusConfig, clientConfig *ClientConfig) (Reporter, error) {
func NewTallyReporterFromConfig(logger log.Logger, c *Config) (*TallyReporter, error) {
scope := NewScope(logger, c)
reporter := NewTallyReporter(scope, &c.ClientConfig)
return reporter, nil
}

// InitReporterFromPrometheusConfig initializes reporter from PrometheusConfig
//
// This is a custom case of initializing temporal metrics reporter.
func InitReporterFromPrometheusConfig(logger log.Logger, config *PrometheusConfig, clientConfig *ClientConfig) (Reporter, error) {
// TODO: We should switch to this being the only metrics reporter constructor once we decide to deprecate tally and
// custom tally configs Config.Statsd and Config.M3.
switch config.Framework {
case FrameworkTally:
return c.newTallyReporterByPrometheusConfig(logger, config), nil
return NewTallyReporterFromPrometheusConfig(logger, config, clientConfig), nil
case FrameworkOpentelemetry:
return newOpentelemeteryReporter(logger, config, clientConfig)
return NewOpentelemeteryReporter(logger, config, clientConfig)
default:
err := fmt.Errorf("unsupported framework type specified in config: %q", config.Framework)
logger.Error(err.Error())
return nil, err
}
}

func (c *Config) newTallyReporterByPrometheusConfig(logger log.Logger, config *PrometheusConfig) Reporter {
tallyConfig := c.convertPrometheusConfigToTally(config)
tallyScope := c.newPrometheusScope(logger, tallyConfig)
return newTallyReporter(tallyScope, &c.ClientConfig)
}

// NewScope builds a new tally scope for this metrics configuration
//
// If the underlying configuration is valid for multiple reporter types,
// only one of them will be used for reporting.
//
// Current priority order is:
// m3 > statsd > prometheus
func (c *Config) NewScope(logger log.Logger) tally.Scope {
func NewScope(logger log.Logger, c *Config) tally.Scope {
if c.M3 != nil {
return c.newM3Scope(logger)
return newM3Scope(logger, c)
}
if c.Statsd != nil {
return c.newStatsdScope(logger)
return newStatsdScope(logger, c)
}
if c.Prometheus != nil {
return c.newPrometheusScope(logger, c.convertPrometheusConfigToTally(c.Prometheus))
return newPrometheusScope(logger, convertPrometheusConfigToTally(c.Prometheus), &c.ClientConfig)
}
return tally.NoopScope
}

func (c *Config) buildHistogramBuckets(config *PrometheusConfig) []prometheus.HistogramObjective {
func NewTallyReporterFromPrometheusConfig(
logger log.Logger,
config *PrometheusConfig,
clientConfig *ClientConfig,
) Reporter {
tallyConfig := convertPrometheusConfigToTally(config)
tallyScope := newPrometheusScope(logger, tallyConfig, clientConfig)
return NewTallyReporter(tallyScope, clientConfig)
}

func buildHistogramBuckets(
config *PrometheusConfig,
) []prometheus.HistogramObjective {
var result []prometheus.HistogramObjective
if len(config.DefaultHistogramBuckets) > 0 {
result = make([]prometheus.HistogramObjective, len(config.DefaultHistogramBuckets))
Expand All @@ -309,7 +320,9 @@ func (c *Config) buildHistogramBuckets(config *PrometheusConfig) []prometheus.Hi
return result
}

func (c *Config) convertPrometheusConfigToTally(config *PrometheusConfig) *prometheus.Configuration {
func convertPrometheusConfigToTally(
config *PrometheusConfig,
) *prometheus.Configuration {
defaultObjectives := make([]prometheus.SummaryObjective, len(config.DefaultSummaryObjectives))
for i, item := range config.DefaultSummaryObjectives {
defaultObjectives[i].AllowedError = item.AllowedError
Expand All @@ -321,13 +334,13 @@ func (c *Config) convertPrometheusConfigToTally(config *PrometheusConfig) *prome
ListenNetwork: config.ListenNetwork,
ListenAddress: config.ListenAddress,
TimerType: "histogram",
DefaultHistogramBuckets: c.buildHistogramBuckets(config),
DefaultHistogramBuckets: buildHistogramBuckets(config),
DefaultSummaryObjectives: defaultObjectives,
OnError: config.OnError,
}
}

func (c *Config) NewCustomReporterScope(logger log.Logger, customReporter tally.BaseStatsReporter) tally.Scope {
func NewCustomReporterScope(logger log.Logger, c *ClientConfig, customReporter tally.BaseStatsReporter) tally.Scope {
options := tally.ScopeOptions{
DefaultBuckets: histogramBoundariesToValueBuckets(defaultHistogramBoundaries),
}
Expand All @@ -351,7 +364,7 @@ func (c *Config) NewCustomReporterScope(logger log.Logger, customReporter tally.

// newM3Scope returns a new m3 scope with
// a default reporting interval of a second
func (c *Config) newM3Scope(logger log.Logger) tally.Scope {
func newM3Scope(logger log.Logger, c *Config) tally.Scope {
reporter, err := c.M3.NewReporter()
if err != nil {
logger.Fatal("error creating m3 reporter", tag.Error(err))
Expand All @@ -368,7 +381,7 @@ func (c *Config) newM3Scope(logger log.Logger) tally.Scope {

// newM3Scope returns a new statsd scope with
// a default reporting interval of a second
func (c *Config) newStatsdScope(logger log.Logger) tally.Scope {
func newStatsdScope(logger log.Logger, c *Config) tally.Scope {
config := c.Statsd
if len(config.HostPort) == 0 {
return tally.NoopScope
Expand All @@ -392,7 +405,11 @@ func (c *Config) newStatsdScope(logger log.Logger) tally.Scope {

// newPrometheusScope returns a new prometheus scope with
// a default reporting interval of a second
func (c *Config) newPrometheusScope(logger log.Logger, config *prometheus.Configuration) tally.Scope {
func newPrometheusScope(
logger log.Logger,
config *prometheus.Configuration,
clientConfig *ClientConfig,
) tally.Scope {
if len(config.DefaultHistogramBuckets) == 0 {
config.DefaultHistogramBuckets = histogramBoundariesToHistogramObjectives(defaultHistogramBoundaries)
}
Expand All @@ -408,11 +425,11 @@ func (c *Config) newPrometheusScope(logger log.Logger, config *prometheus.Config
logger.Fatal("error creating prometheus reporter", tag.Error(err))
}
scopeOpts := tally.ScopeOptions{
Tags: c.Tags,
Tags: clientConfig.Tags,
CachedReporter: reporter,
Separator: prometheus.DefaultSeparator,
SanitizeOptions: &sanitizeOptions,
Prefix: c.Prefix,
Prefix: clientConfig.Prefix,
DefaultBuckets: histogramBoundariesToValueBuckets(defaultHistogramBoundaries),
}
scope, _ := tally.NewRootScope(scopeOpts, time.Second)
Expand Down
17 changes: 8 additions & 9 deletions common/metrics/config_test.go
Expand Up @@ -97,7 +97,7 @@ func (s *MetricsSuite) TestStatsd() {

config := new(Config)
config.Statsd = statsd
scope := config.NewScope(log.NewNoopLogger())
scope := NewScope(log.NewNoopLogger(), config)
s.NotNil(scope)
}

Expand All @@ -109,7 +109,7 @@ func (s *MetricsSuite) TestM3() {
}
config := new(Config)
config.M3 = m3
scope := config.NewScope(log.NewNoopLogger())
scope := NewScope(log.NewNoopLogger(), config)
s.NotNil(scope)
}

Expand All @@ -121,33 +121,33 @@ func (s *MetricsSuite) TestPrometheus() {
}
config := new(Config)
config.Prometheus = prom
scope := config.NewScope(log.NewNoopLogger())
scope := NewScope(log.NewNoopLogger(), config)
s.NotNil(scope)
}

func (s *MetricsSuite) TestNoop() {
config := &Config{}
scope := config.NewScope(log.NewNoopLogger())
scope := NewScope(log.NewNoopLogger(), config)
s.Equal(tally.NoopScope, scope)
}

func (s *MetricsSuite) TestCustomReporter() {
config := &Config{}
scope := config.NewCustomReporterScope(log.NewNoopLogger(), tally.NullStatsReporter)
scope := NewCustomReporterScope(log.NewNoopLogger(), &config.ClientConfig, tally.NullStatsReporter)
s.NotNil(scope)
s.NotEqual(tally.NoopScope, scope)
}

func (s *MetricsSuite) TestCustomCachedReporter() {
config := &Config{}
scope := config.NewCustomReporterScope(log.NewNoopLogger(), CachedNullStatsReporter)
scope := NewCustomReporterScope(log.NewNoopLogger(), &config.ClientConfig, CachedNullStatsReporter)
s.NotNil(scope)
s.NotEqual(tally.NoopScope, scope)
}

func (s *MetricsSuite) TestUnsupportedReporter() {
config := &Config{}
scope := config.NewCustomReporterScope(log.NewNoopLogger(), UnsupportedNullStatsReporter)
scope := NewCustomReporterScope(log.NewNoopLogger(), &config.ClientConfig, UnsupportedNullStatsReporter)
s.Equal(tally.NoopScope, scope)
}

Expand All @@ -161,8 +161,7 @@ func (s *MetricsSuite) TestOTCustomReporter() {
config := &Config{}
config.Prometheus = prom
mockReporter := NewMockReporter(s.controller)
reporter, sdkReporter, err := config.InitMetricReporters(log.NewNoopLogger(), mockReporter)
reporter, err := InitMetricsReporterInternal(log.NewNoopLogger(), config, mockReporter)
s.Equal(mockReporter, reporter)
s.Equal(mockReporter, sdkReporter)
s.Nil(err)
}
1 change: 1 addition & 0 deletions common/metrics/interfaces.go
Expand Up @@ -116,6 +116,7 @@ type (
Reporter interface {
NewClient(logger log.Logger, serviceIdx ServiceIdx) (Client, error)
Stop(logger log.Logger)
UserScope() UserScope
}
)

Expand Down
14 changes: 14 additions & 0 deletions common/metrics/interfaces_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 3 additions & 5 deletions common/metrics/opentelemetry_client.go
Expand Up @@ -40,7 +40,7 @@ type (
serviceIdx ServiceIdx
scopeWrapper func(impl internalScope) internalScope
gaugeCache OtelGaugeCache
userScope *opentelemetryUserScope
userScope UserScope
}
)

Expand All @@ -53,9 +53,7 @@ func newOpentelemeteryClient(clientConfig *ClientConfig, serviceIdx ServiceIdx,
return NewTagFilteringScope(tagsFilterConfig, impl)
}

userScope := newOpentelemetryUserScope(reporter, clientConfig.Tags, gaugeCache)

globalRootScope := newOpentelemetryScope(serviceIdx, reporter, nil, clientConfig.Tags, getMetricDefs(serviceIdx), false, gaugeCache, false)
globalRootScope := newOpentelemetryScope(serviceIdx, reporter.GetMeterMust(), nil, clientConfig.Tags, getMetricDefs(serviceIdx), false, gaugeCache, false)

totalScopes := len(ScopeDefs[Common]) + len(ScopeDefs[serviceIdx])
metricsClient := &opentelemetryClient{
Expand All @@ -65,7 +63,7 @@ func newOpentelemeteryClient(clientConfig *ClientConfig, serviceIdx ServiceIdx,
serviceIdx: serviceIdx,
scopeWrapper: scopeWrapper,
gaugeCache: gaugeCache,
userScope: userScope,
userScope: reporter.UserScope(),
}

for idx, def := range ScopeDefs[Common] {
Expand Down

0 comments on commit eed1fde

Please sign in to comment.