Skip to content

Commit

Permalink
Add service type tag to metrics (#2545)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ardagan committed Feb 25, 2022
1 parent f92cc93 commit 9964786
Show file tree
Hide file tree
Showing 23 changed files with 155 additions and 43 deletions.
4 changes: 3 additions & 1 deletion common/archiver/s3store/historyArchiver_test.go
Expand Up @@ -103,9 +103,11 @@ func (s *historyArchiverSuite) TearDownSuite() {
func (s *historyArchiverSuite) SetupTest() {
scope := tally.NewTestScope("test", nil)
s.Assertions = require.New(s.T())
metricsClient, err := metrics.NewClient(&metrics.ClientConfig{}, scope, metrics.History)
s.Require().NoError(err, "metrics.NewClient")
s.container = &archiver.HistoryBootstrapContainer{
Logger: log.NewNoopLogger(),
MetricsClient: metrics.NewClient(&metrics.ClientConfig{}, scope, metrics.HistoryArchiverScope),
MetricsClient: metricsClient,
}

s.controller = gomock.NewController(s.T())
Expand Down
5 changes: 3 additions & 2 deletions common/archiver/s3store/visibilityArchiver_test.go
Expand Up @@ -137,10 +137,11 @@ func (s *visibilityArchiverSuite) SetupSuite() {

s.testArchivalURI, err = archiver.NewURI(testBucketURI)
s.Require().NoError(err)

metricsClient, err := metrics.NewClient(&metrics.ClientConfig{}, scope, metrics.History)
s.Require().NoError(err, "metrics.NewClient")
s.container = &archiver.VisibilityBootstrapContainer{
Logger: log.NewNoopLogger(),
MetricsClient: metrics.NewClient(&metrics.ClientConfig{}, scope, metrics.VisibilityArchiverScope),
MetricsClient: metricsClient,
}
}

Expand Down
24 changes: 24 additions & 0 deletions common/metrics/common.go
Expand Up @@ -67,8 +67,32 @@ func GetMetricsServiceIdx(serviceName string, logger log.Logger) ServiceIdx {
return Matching
case primitives.WorkerService:
return Worker
case primitives.ServerService:
return Server
case primitives.UnitTestService:
return UnitTestService
default:
logger.Fatal("Unknown service name for metrics!", tag.Service(serviceName))
panic(fmt.Sprintf("Unknown service name for metrics: %s", serviceName))
}
}

// GetMetricsServiceIdx returns service id corresponding to serviceName
func MetricsServiceIdxToServiceName(serviceIdx ServiceIdx) (string, error) {
switch serviceIdx {
case Server:
return primitives.ServerService, nil
case Frontend:
return primitives.FrontendService, nil
case History:
return primitives.HistoryService, nil
case Matching:
return primitives.MatchingService, nil
case Worker:
return primitives.WorkerService, nil
case UnitTestService:
return primitives.UnitTestService, nil
default:
return "", fmt.Errorf(fmt.Sprintf("Unknown service idx for metrics: %d", serviceIdx))
}
}
16 changes: 9 additions & 7 deletions common/metrics/metric_client_tests.go
Expand Up @@ -29,6 +29,8 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"

"go.temporal.io/server/common/primitives"
)

type (
Expand Down Expand Up @@ -73,7 +75,7 @@ func (s *MetricTestSuiteBase) TestClientReportCounter() {
TestScope1,
TestCounterMetric1, 66)
testDef := MetricDefs[UnitTestService][TestCounterMetric1]
assert.NoError(s.T(), s.metricTestUtility.ContainsCounter(testDef.metricName, map[string]string{namespace: namespaceAllValue, OperationTagName: ScopeDefs[UnitTestService][TestScope1].operation}, 66))
assert.NoError(s.T(), s.metricTestUtility.ContainsCounter(testDef.metricName, map[string]string{namespace: namespaceAllValue, OperationTagName: ScopeDefs[UnitTestService][TestScope1].operation, serviceName: primitives.UnitTestService}, 66))
assert.Equal(s.T(), 1, s.metricTestUtility.CollectionSize())
}

Expand All @@ -82,7 +84,7 @@ func (s *MetricTestSuiteBase) TestClientReportGauge() {
TestScope1,
TestGaugeMetric1, 66)
testDef := MetricDefs[UnitTestService][TestGaugeMetric1]
assert.NoError(s.T(), s.metricTestUtility.ContainsGauge(testDef.metricName, map[string]string{namespace: namespaceAllValue, OperationTagName: ScopeDefs[UnitTestService][TestScope1].operation}, 66))
assert.NoError(s.T(), s.metricTestUtility.ContainsGauge(testDef.metricName, map[string]string{namespace: namespaceAllValue, OperationTagName: ScopeDefs[UnitTestService][TestScope1].operation, serviceName: primitives.UnitTestService}, 66))
assert.Equal(s.T(), 1, s.metricTestUtility.CollectionSize())
}

Expand All @@ -92,7 +94,7 @@ func (s *MetricTestSuiteBase) TestClientReportTimer() {
TestScope1,
TestTimerMetric1, targetDuration)
testDef := MetricDefs[UnitTestService][TestTimerMetric1]
assert.NoError(s.T(), s.metricTestUtility.ContainsTimer(testDef.metricName, map[string]string{namespace: namespaceAllValue, OperationTagName: ScopeDefs[UnitTestService][TestScope1].operation}, targetDuration))
assert.NoError(s.T(), s.metricTestUtility.ContainsTimer(testDef.metricName, map[string]string{namespace: namespaceAllValue, OperationTagName: ScopeDefs[UnitTestService][TestScope1].operation, serviceName: primitives.UnitTestService}, targetDuration))
assert.Equal(s.T(), 1, s.metricTestUtility.CollectionSize())
}

Expand All @@ -101,28 +103,28 @@ func (s *MetricTestSuiteBase) TestClientReportHistogram() {
TestScope1,
TestDimensionlessHistogramMetric1, 66)
testDef := MetricDefs[UnitTestService][TestDimensionlessHistogramMetric1]
assert.NoError(s.T(), s.metricTestUtility.ContainsHistogram(testDef.metricName, map[string]string{namespace: namespaceAllValue, OperationTagName: ScopeDefs[UnitTestService][TestScope1].operation}, 66))
assert.NoError(s.T(), s.metricTestUtility.ContainsHistogram(testDef.metricName, map[string]string{namespace: namespaceAllValue, OperationTagName: ScopeDefs[UnitTestService][TestScope1].operation, serviceName: primitives.UnitTestService}, 66))
assert.Equal(s.T(), 1, s.metricTestUtility.CollectionSize())
}

func (s *MetricTestSuiteBase) TestScopeReportCounter() {
s.testClient.Scope(TestScope1).AddCounter(TestCounterMetric1, 66)
testDef := MetricDefs[UnitTestService][TestCounterMetric1]
assert.NoError(s.T(), s.metricTestUtility.ContainsCounter(testDef.metricName, map[string]string{namespace: namespaceAllValue, OperationTagName: ScopeDefs[UnitTestService][TestScope1].operation}, 66))
assert.NoError(s.T(), s.metricTestUtility.ContainsCounter(testDef.metricName, map[string]string{namespace: namespaceAllValue, OperationTagName: ScopeDefs[UnitTestService][TestScope1].operation, serviceName: primitives.UnitTestService}, 66))
assert.Equal(s.T(), 1, s.metricTestUtility.CollectionSize())
}

func (s *MetricTestSuiteBase) TestScopeReportGauge() {
s.testClient.Scope(TestScope1).UpdateGauge(TestGaugeMetric1, 66)
testDef := MetricDefs[UnitTestService][TestGaugeMetric1]
assert.NoError(s.T(), s.metricTestUtility.ContainsGauge(testDef.metricName, map[string]string{namespace: namespaceAllValue, OperationTagName: ScopeDefs[UnitTestService][TestScope1].operation}, 66))
assert.NoError(s.T(), s.metricTestUtility.ContainsGauge(testDef.metricName, map[string]string{namespace: namespaceAllValue, OperationTagName: ScopeDefs[UnitTestService][TestScope1].operation, serviceName: primitives.UnitTestService}, 66))
assert.Equal(s.T(), 1, s.metricTestUtility.CollectionSize())
}

func (s *MetricTestSuiteBase) TestScopeReportTimer() {
targetDuration := time.Second * 100
s.testClient.Scope(TestScope1).RecordTimer(TestTimerMetric1, targetDuration)
testDef := MetricDefs[UnitTestService][TestTimerMetric1]
assert.NoError(s.T(), s.metricTestUtility.ContainsTimer(testDef.metricName, map[string]string{namespace: namespaceAllValue, OperationTagName: ScopeDefs[UnitTestService][TestScope1].operation}, targetDuration))
assert.NoError(s.T(), s.metricTestUtility.ContainsTimer(testDef.metricName, map[string]string{namespace: namespaceAllValue, OperationTagName: ScopeDefs[UnitTestService][TestScope1].operation, serviceName: primitives.UnitTestService}, targetDuration))
assert.Equal(s.T(), 1, s.metricTestUtility.CollectionSize())
}
12 changes: 12 additions & 0 deletions common/metrics/opentelemetry_client.go
Expand Up @@ -25,6 +25,7 @@
package metrics

import (
"fmt"
"time"

"go.temporal.io/server/common/log"
Expand Down Expand Up @@ -55,6 +56,17 @@ func NewOpentelemeteryClient(clientConfig *ClientConfig, serviceIdx ServiceIdx,

globalRootScope := newOpentelemetryScope(serviceIdx, reporter.GetMeterMust(), nil, clientConfig.Tags, getMetricDefs(serviceIdx), false, gaugeCache, false)

serviceTypeTagValue, err := MetricsServiceIdxToServiceName(serviceIdx)
if err != nil {
return nil, fmt.Errorf("failed to initialize metrics client: %w", err)
}

rootTags := make(map[string]string, len(clientConfig.Tags)+1)
for k, v := range clientConfig.Tags {
rootTags[k] = v
}
rootTags[serviceName] = serviceTypeTagValue

totalScopes := len(ScopeDefs[Common]) + len(ScopeDefs[serviceIdx])
metricsClient := &opentelemetryClient{
rootScope: globalRootScope,
Expand Down
1 change: 0 additions & 1 deletion common/metrics/opentelemetry_reporter.go
Expand Up @@ -93,7 +93,6 @@ func (r *opentelemetryReporterImpl) GetMeterMust() metric.MeterMust {
}

func (r *opentelemetryReporterImpl) NewClient(logger log.Logger, serviceIdx ServiceIdx) (Client, error) {

return NewOpentelemeteryClient(r.clientConfig, serviceIdx, r, logger, r.gaugeCache)
}

Expand Down
5 changes: 5 additions & 0 deletions common/metrics/tags.go
Expand Up @@ -46,6 +46,7 @@ const (
workflowType = "workflowType"
activityType = "activityType"
commandType = "commandType"
serviceName = "service_name"

namespaceAllValue = "all"
unknownValue = "_unknown_"
Expand Down Expand Up @@ -227,3 +228,7 @@ func HttpStatusTag(value int) Tag {
func ResourceExhaustedCauseTag(cause enumspb.ResourceExhaustedCause) Tag {
return &tagImpl{key: resourceExhaustedTag, value: cause.String()}
}

func ServiceTypeTag(value string) Tag {
return &tagImpl{key: serviceName, value: value}
}
18 changes: 14 additions & 4 deletions common/metrics/tally_client.go
Expand Up @@ -25,11 +25,14 @@
package metrics

import (
"fmt"
"time"

"github.com/uber-go/tally/v4"
)

var _ Client = (*TallyClient)(nil)

// TallyClient is used for reporting metrics by various Temporal services
type TallyClient struct {
// This is the scope provided by user to the client. It contains no client-specific tags.
Expand All @@ -46,7 +49,7 @@ type TallyClient struct {
// Client implementation
// reporter holds the common tags for the service
// serviceIdx indicates the service type in (InputhostIndex, ... StorageIndex)
func NewClient(clientConfig *ClientConfig, scope tally.Scope, serviceIdx ServiceIdx) Client {
func NewClient(clientConfig *ClientConfig, scope tally.Scope, serviceIdx ServiceIdx) (Client, error) {
tagsFilterConfig := NewTagFilteringScopeConfig(clientConfig.ExcludeTags)

perUnitBuckets := make(map[MetricUnit]tally.Buckets)
Expand All @@ -58,6 +61,13 @@ func NewClient(clientConfig *ClientConfig, scope tally.Scope, serviceIdx Service
return NewTagFilteringScope(tagsFilterConfig, impl)
}

serviceTypeTagValue, err := MetricsServiceIdxToServiceName(serviceIdx)
if err != nil {
return nil, fmt.Errorf("failed to initialize metrics client: %w", err)
}

rootScope := scope.Tagged(map[string]string{serviceName: serviceTypeTagValue})

totalScopes := len(ScopeDefs[Common]) + len(ScopeDefs[serviceIdx])
metricsClient := &TallyClient{
globalRootScope: scope,
Expand All @@ -75,7 +85,7 @@ func NewClient(clientConfig *ClientConfig, scope tally.Scope, serviceIdx Service
namespace: namespaceAllValue,
}
mergeMapToRight(def.tags, scopeTags)
metricsClient.childScopes[idx] = scope.Tagged(scopeTags)
metricsClient.childScopes[idx] = rootScope.Tagged(scopeTags)
}

for idx, def := range ScopeDefs[serviceIdx] {
Expand All @@ -84,10 +94,10 @@ func NewClient(clientConfig *ClientConfig, scope tally.Scope, serviceIdx Service
namespace: namespaceAllValue,
}
mergeMapToRight(def.tags, scopeTags)
metricsClient.childScopes[idx] = scope.Tagged(scopeTags)
metricsClient.childScopes[idx] = rootScope.Tagged(scopeTags)
}

return metricsClient
return metricsClient, nil
}

// IncCounter increments one for a counter and emits
Expand Down
6 changes: 5 additions & 1 deletion common/metrics/tally_metric_test_utility.go
Expand Up @@ -46,7 +46,11 @@ func NewTallyMetricTestUtility() *TallyMetricTestUtility {
}

func (t *TallyMetricTestUtility) GetClient(config *ClientConfig, idx ServiceIdx) Client {
return NewClient(config, t.scope, idx)
result, err := NewClient(config, t.scope, idx)
if err != nil {
panic(err)
}
return result
}

func (t *TallyMetricTestUtility) ContainsCounter(name MetricName, labels map[string]string, value int64) error {
Expand Down
2 changes: 1 addition & 1 deletion common/metrics/tally_reporter.go
Expand Up @@ -51,7 +51,7 @@ func NewTallyReporter(
}

func (tr *TallyReporter) NewClient(logger log.Logger, serviceIdx ServiceIdx) (Client, error) {
return NewClient(tr.clientConfig, tr.scope, serviceIdx), nil
return NewClient(tr.clientConfig, tr.scope, serviceIdx)
}

func (tr *TallyReporter) GetScope() tally.Scope {
Expand Down
4 changes: 3 additions & 1 deletion common/persistence/persistence-tests/persistenceTestBase.go
Expand Up @@ -211,7 +211,9 @@ func (s *TestBase) Setup(clusterMetadataConfig *cluster.Config) {

cfg := s.DefaultTestCluster.Config()
scope := tally.NewTestScope(common.HistoryServiceName, make(map[string]string))
metricsClient := metrics.NewClient(&metrics.ClientConfig{}, scope, metrics.GetMetricsServiceIdx(common.HistoryServiceName, s.Logger))
var metricsClient metrics.Client
metricsClient, err = metrics.NewClient(&metrics.ClientConfig{}, scope, metrics.GetMetricsServiceIdx(common.HistoryServiceName, s.Logger))
s.fatalOnError("metrics.NewClient", err)
factory := client.NewFactoryImpl(&cfg, resolver.NewNoopResolver(), nil, serialization.NewSerializer(), s.AbstractDataStoreFactory, clusterName, metricsClient, s.Logger)

s.TaskMgr, err = factory.NewTaskManager()
Expand Down
2 changes: 2 additions & 0 deletions common/primitives/role.go
Expand Up @@ -31,4 +31,6 @@ const (
HistoryService = "history"
MatchingService = "matching"
WorkerService = "worker"
ServerService = "server"
UnitTestService = "unittest"
)
7 changes: 6 additions & 1 deletion common/resource/resourceTest.go
Expand Up @@ -165,6 +165,11 @@ func NewTest(

scope := tally.NewTestScope("test", nil)

metricClient, err := metrics.NewClient(&metrics.ClientConfig{}, scope, serviceMetricsIndex)
if err != nil {
panic(err)
}

return &Test{
MetricsScope: scope,
ClusterMetadata: cluster.NewMockMetadata(controller),
Expand All @@ -177,7 +182,7 @@ func NewTest(
NamespaceCache: namespace.NewMockRegistry(controller),
TimeSource: clock.NewRealTimeSource(),
PayloadSerializer: serialization.NewSerializer(),
MetricsClient: metrics.NewClient(&metrics.ClientConfig{}, scope, serviceMetricsIndex),
MetricsClient: metricClient,
ArchivalMetadata: archiver.NewMockArchivalMetadata(controller),
ArchiverProvider: provider.NewMockArchiverProvider(controller),

Expand Down
5 changes: 4 additions & 1 deletion common/tasks/benchmark_test.go
Expand Up @@ -53,11 +53,14 @@ func BenchmarkInterleavedWeightedRoundRobinScheduler(b *testing.B) {
3: 1,
}
logger := log.NewTestLogger()
metricsClient := metrics.NewClient(
metricsClient, err := metrics.NewClient(
&metrics.ClientConfig{},
tally.NewTestScope("test", nil),
metrics.Common,
)
if err != nil {
panic(err)
}

scheduler := NewInterleavedWeightedRoundRobinScheduler(
InterleavedWeightedRoundRobinSchedulerOptions{
Expand Down
7 changes: 5 additions & 2 deletions common/tasks/interleaved_weighted_round_robin_test.go
Expand Up @@ -76,11 +76,14 @@ func (s *interleavedWeightedRoundRobinSchedulerSuite) SetupTest() {
3: 1,
}
logger := log.NewTestLogger()
metricsClient := metrics.NewClient(
metricsClient, err := metrics.NewClient(
&metrics.ClientConfig{},
tally.NewTestScope("test", nil),
metrics.Common,
metrics.History,
)
if err != nil {
s.NoError(err, "metrics.NewClient")
}

s.scheduler = NewInterleavedWeightedRoundRobinScheduler(
InterleavedWeightedRoundRobinSchedulerOptions{
Expand Down
7 changes: 6 additions & 1 deletion common/tasks/parallel_processor_test.go
Expand Up @@ -62,12 +62,17 @@ func (s *parallelProcessorSuite) SetupTest() {

s.controller = gomock.NewController(s.T())

metricsClient, err := metrics.NewClient(&metrics.ClientConfig{}, tally.NoopScope, metrics.History)
if err != nil {
s.NoError(err, "metrics.NewClient")
}

s.processor = NewParallelProcessor(
&ParallelProcessorOptions{
QueueSize: 1,
WorkerCount: 1,
},
metrics.NewClient(&metrics.ClientConfig{}, tally.NoopScope, metrics.Common),
metricsClient,
log.NewNoopLogger(),
)
s.retryPolicy = backoff.NewExponentialRetryPolicy(time.Millisecond)
Expand Down

0 comments on commit 9964786

Please sign in to comment.