Skip to content

Commit

Permalink
metric interface refactor, consumer and producer decoupled (#2883)
Browse files Browse the repository at this point in the history
* metric interface refactor, consumer and producer decoupled

* moving event handler implementations in temporal repo.

* handle error and test update

* rename newclient newtallyclient

* comments and constructor

* clean and removal of code not needed.
  • Loading branch information
jbreiding committed May 27, 2022
1 parent df4d519 commit a7e3520
Show file tree
Hide file tree
Showing 31 changed files with 1,400 additions and 173 deletions.
10 changes: 6 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
install: update-tools bins

# Rebuild binaries (used by Dockerfile).
bins: clean-bins temporal-server temporal-cassandra-tool temporal-sql-tool
bins: temporal-server temporal-cassandra-tool temporal-sql-tool

# Install all tools, recompile proto files, run all possible checks and tests (long but comprehensive).
all: update-tools clean proto bins check test
Expand Down Expand Up @@ -95,6 +95,8 @@ SUMMARY_COVER_PROFILE := $(COVER_ROOT)/summary.out
# Packages are specified as import paths.
INTEG_TEST_COVERPKG := -coverpkg="$(MODULE_ROOT)/client/...,$(MODULE_ROOT)/common/...,$(MODULE_ROOT)/service/..."

ALL_SRC := $(shell find . -name "*.go")

##### Tools #####
update-checkers:
@printf $(COLOR) "Install/update check tools..."
Expand Down Expand Up @@ -184,15 +186,15 @@ clean-bins:
@rm -f temporal-cassandra-tool
@rm -f temporal-sql-tool

temporal-server:
temporal-server: $(ALL_SRC)
@printf $(COLOR) "Build temporal-server with CGO_ENABLED=$(CGO_ENABLED) for $(GOOS)/$(GOARCH)..."
CGO_ENABLED=$(CGO_ENABLED) go build -o temporal-server ./cmd/server

temporal-cassandra-tool:
temporal-cassandra-tool: $(ALL_SRC)
@printf $(COLOR) "Build temporal-cassandra-tool with CGO_ENABLED=$(CGO_ENABLED) for $(GOOS)/$(GOARCH)..."
CGO_ENABLED=$(CGO_ENABLED) go build -o temporal-cassandra-tool ./cmd/tools/cassandra

temporal-sql-tool:
temporal-sql-tool: $(ALL_SRC)
@printf $(COLOR) "Build temporal-sql-tool with CGO_ENABLED=$(CGO_ENABLED) for $(GOOS)/$(GOARCH)..."
CGO_ENABLED=$(CGO_ENABLED) go build -o temporal-sql-tool ./cmd/tools/sql

Expand Down
10 changes: 2 additions & 8 deletions common/archiver/s3store/historyArchiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ import (
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/uber-go/tally/v4"
enumspb "go.temporal.io/api/enums/v1"
historypb "go.temporal.io/api/history/v1"
"go.temporal.io/api/serviceerror"
Expand Down Expand Up @@ -71,9 +70,7 @@ const (
testBucketURI = "s3://test-bucket"
)

var (
testBranchToken = []byte{1, 2, 3}
)
var testBranchToken = []byte{1, 2, 3}

type historyArchiverSuite struct {
*require.Assertions
Expand Down Expand Up @@ -101,13 +98,10 @@ func (s *historyArchiverSuite) TearDownSuite() {
}

func (s *historyArchiverSuite) SetupTest() {
scope := tally.NewTestScope("test", nil)
s.Assertions = require.New(s.T())
metricsClient, err := metrics.NewClient(&metrics.ClientConfig{}, scope, metrics.History)
s.Require().NoError(err, "metrics.NewClient")
s.container = &archiver.HistoryBootstrapContainer{
Logger: log.NewNoopLogger(),
MetricsClient: metricsClient,
MetricsClient: metrics.NoopClient,
}

s.controller = gomock.NewController(s.T())
Expand Down
12 changes: 5 additions & 7 deletions common/archiver/s3store/visibilityArchiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ import (
"go.temporal.io/server/common/payload"
"go.temporal.io/server/common/primitives/timestamp"

"github.com/uber-go/tally/v4"
commonpb "go.temporal.io/api/common/v1"
workflowpb "go.temporal.io/api/workflow/v1"
)
Expand Down Expand Up @@ -133,20 +132,16 @@ const (

func (s *visibilityArchiverSuite) SetupSuite() {
var err error
scope := tally.NewTestScope("test", nil)

s.testArchivalURI, err = archiver.NewURI(testBucketURI)
s.Require().NoError(err)
metricsClient, err := metrics.NewClient(&metrics.ClientConfig{}, scope, metrics.History)
s.Require().NoError(err, "metrics.NewClient")
s.container = &archiver.VisibilityBootstrapContainer{
Logger: log.NewNoopLogger(),
MetricsClient: metricsClient,
MetricsClient: metrics.NoopClient,
}
}

func (s *visibilityArchiverSuite) TearDownSuite() {

}

func (s *visibilityArchiverSuite) SetupTest() {
Expand All @@ -161,6 +156,7 @@ func (s *visibilityArchiverSuite) SetupTest() {
func (s *visibilityArchiverSuite) TearDownTest() {
s.controller.Finish()
}

func (s *visibilityArchiverSuite) TestArchive_Fail_InvalidURI() {
visibilityArchiver := s.newTestVisibilityArchiver()
URI, err := archiver.NewURI("wrongscheme://")
Expand Down Expand Up @@ -273,6 +269,7 @@ func (s *visibilityArchiverSuite) TestQuery_Fail_InvalidQuery() {
s.Error(err)
s.Nil(response)
}

func (s *visibilityArchiverSuite) TestQuery_Success_DirectoryNotExist() {
visibilityArchiver := s.newTestVisibilityArchiver()
mockParser := NewMockQueryParser(s.controller)
Expand Down Expand Up @@ -520,6 +517,7 @@ func (s *visibilityArchiverSuite) TestArchiveAndQueryPrecisions() {
s.Len(response.Executions, 2, "Iteration ", i)
}
}

func (s *visibilityArchiverSuite) TestArchiveAndQuery() {
visibilityArchiver := s.newTestVisibilityArchiver()
URI, err := archiver.NewURI(testBucketURI + "/archive-and-query")
Expand All @@ -540,7 +538,7 @@ func (s *visibilityArchiverSuite) TestArchiveAndQuery() {
Query: "parsed by mockParser",
}
executions := []*workflowpb.WorkflowExecutionInfo{}
var first = true
first := true
for first || request.NextPageToken != nil {
response, err := visibilityArchiver.Query(context.Background(), URI, request, searchattribute.TestNameTypeMap)
s.NoError(err)
Expand Down
25 changes: 3 additions & 22 deletions common/metrics/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import (
type (
// Config contains the config items for metrics subsystem
Config struct {
ClientConfig `yaml:"clientConfig,inline""`
ClientConfig `yaml:"clientConfig,inline"`

// M3 is the configuration for m3 metrics reporter
M3 *m3.Configuration `yaml:"m3"`
Expand Down Expand Up @@ -95,7 +95,7 @@ type (
// PrometheusConfig is a new format for config for prometheus metrics.
PrometheusConfig struct {
// Metric framework: Tally/OpenTelemetry
Framework string `yaml:framework`
Framework string `yaml:"framework"`
// Address for prometheus to serve metrics from.
ListenAddress string `yaml:"listenAddress"`
// DefaultHistogramBoundaries defines the default histogram bucket
Expand Down Expand Up @@ -293,9 +293,6 @@ func InitReporterFromPrometheusConfig(logger log.Logger, config *PrometheusConfi
// Current priority order is:
// m3 > statsd > prometheus
func NewScope(logger log.Logger, c *Config) tally.Scope {
if c.M3 != nil {
return newM3Scope(logger, c)
}
if c.Statsd != nil {
return newStatsdScope(logger, c)
}
Expand Down Expand Up @@ -361,22 +358,6 @@ func setDefaultPerUnitHistogramBoundaries(clientConfig *ClientConfig) {
}
}

// newM3Scope returns a new m3 scope with
// a default reporting interval of a second
func newM3Scope(logger log.Logger, c *Config) tally.Scope {
reporter, err := c.M3.NewReporter()
if err != nil {
logger.Fatal("error creating m3 reporter", tag.Error(err))
}
scopeOpts := tally.ScopeOptions{
Tags: c.Tags,
CachedReporter: reporter,
Prefix: c.Prefix,
}
scope, _ := tally.NewRootScope(scopeOpts, time.Second)
return scope
}

// newM3Scope returns a new statsd scope with
// a default reporting interval of a second
func newStatsdScope(logger log.Logger, c *Config) tally.Scope {
Expand All @@ -388,7 +369,7 @@ func newStatsdScope(logger log.Logger, c *Config) tally.Scope {
if err != nil {
logger.Fatal("error creating statsd client", tag.Error(err))
}
//NOTE: according to ( https://github.com/uber-go/tally )Tally's statsd implementation doesn't support tagging.
// NOTE: according to ( https://github.com/uber-go/tally )Tally's statsd implementation doesn't support tagging.
// Therefore, we implement Tally interface to have a statsd reporter that can support tagging
reporter := statsdreporter.NewReporter(statter, tallystatsdreporter.Options{})
scopeOpts := tally.ScopeOptions{
Expand Down
7 changes: 4 additions & 3 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,11 @@ type (
)

// MetricUnit supported values
// Values are pulled from https://pkg.go.dev/golang.org/x/exp/event#Unit
const (
Dimensionless = "dimensionless"
Milliseconds = "milliseconds"
Bytes = "bytes"
Dimensionless = "1"
Milliseconds = "ms"
Bytes = "By"
)

// MetricTypes which are supported
Expand Down
145 changes: 145 additions & 0 deletions common/metrics/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package metrics

import (
"context"
"time"

"golang.org/x/exp/event"

"go.temporal.io/server/common/log"
)

type (
eventMetricProvider struct {
context context.Context
tags []Tag
}

// MetricHandler represents the extension point for collection instruments
// typedef of event.Handler from https://pkg.go.dev/golang.org/x/exp/event#Handler
MetricHandler = event.Handler
)

var _ MetricProvider = (*eventMetricProvider)(nil)

// MetricHandlerFromConfig is used at startup to construct
func MetricHandlerFromConfig(logger log.Logger, c *Config) MetricHandler {
setDefaultPerUnitHistogramBoundaries(&c.ClientConfig)
if c.Prometheus != nil && len(c.Prometheus.Framework) > 0 {
switch c.Prometheus.Framework {
case FrameworkTally:
return NewTallyMetricHandler(
logger,
newPrometheusScope(
logger,
convertPrometheusConfigToTally(c.Prometheus),
&c.ClientConfig,
),
c.ClientConfig.PerUnitHistogramBoundaries,
)
case FrameworkOpentelemetry:
otelProvider, err := NewOpenTelemetryProvider(logger, c.Prometheus, &c.ClientConfig)
if err != nil {
logger.Fatal(err.Error())
}

return NewOtelMetricHandler(logger, otelProvider.GetMeter())
}
}

return NewTallyMetricHandler(
logger,
NewScope(logger, c),
c.ClientConfig.PerUnitHistogramBoundaries,
)
}

// NewEventMetricProvider provides an eventMetricProvider given event.Exporter struct
func NewEventMetricProvider(h MetricHandler) *eventMetricProvider {
eo := &event.ExporterOptions{
DisableLogging: true,
DisableTracing: true,
}

return &eventMetricProvider{
context: event.WithExporter(context.Background(), event.NewExporter(h, eo)),
tags: []Tag{},
}
}

// WithTags creates a new MetricProvder with provided []Tag
// Tags are merged with registered Tags from the source MetricProvider
func (emp *eventMetricProvider) WithTags(tags ...Tag) MetricProvider {
return &eventMetricProvider{
context: emp.context,
tags: append(emp.tags, tags...),
}
}

// Counter obtains a counter for the given name.
func (emp *eventMetricProvider) Counter(n string, m *MetricOptions) CounterMetric {
return CounterMetricFunc(func(i int64, t ...Tag) {
e := event.NewCounter(n, m)
e.Record(emp.context, i, emp.tagsToLabels(t)...)
})
}

// Gauge obtains a gauge for the given name.
func (emp *eventMetricProvider) Gauge(n string, m *MetricOptions) GaugeMetric {
return GaugeMetricFunc(func(f float64, t ...Tag) {
e := event.NewFloatGauge(n, m)
e.Record(emp.context, f, emp.tagsToLabels(t)...)
})
}

// Timer obtains a timer for the given name.
func (emp *eventMetricProvider) Timer(n string, m *MetricOptions) TimerMetric {
return TimerMetricFunc(func(d time.Duration, t ...Tag) {
e := event.NewDuration(n, m)
e.Record(emp.context, d, emp.tagsToLabels(t)...)
})
}

// Histogram obtains a histogram for the given name.
func (emp *eventMetricProvider) Histogram(n string, m *MetricOptions) HistogramMetric {
return HistogramMetricFunc(func(i int64, t ...Tag) {
e := event.NewIntDistribution(n, m)
e.Record(emp.context, i, emp.tagsToLabels(t)...)
})
}

// tagsToLabels helper to merge registred tags and additional tags converting to event.Label struct
func (emp *eventMetricProvider) tagsToLabels(tags []Tag) []event.Label {
l := make([]event.Label, len(emp.tags)+len(tags))
t := append(emp.tags, tags...)

for i := range t {
l[i] = event.String(t[i].Key(), t[i].Value())
}

return l
}

0 comments on commit a7e3520

Please sign in to comment.