Skip to content

Commit

Permalink
Fix metrics namespace tagging issue for matching & history service (#…
Browse files Browse the repository at this point in the history
…1414)

* Fix namespace name metrics extraction issue for matching & history service

Previously, namespace attribute extraction only worked for the frontend service, whose requests carry the namespace name.
Internal requests contain only the namespace ID, which was not processed correctly.
  • Loading branch information
wxing1292 committed Mar 29, 2021
1 parent 7c1c89b commit 3d10123
Show file tree
Hide file tree
Showing 8 changed files with 217 additions and 19 deletions.
35 changes: 29 additions & 6 deletions common/rpc/interceptor/namespace.go
Expand Up @@ -24,16 +24,39 @@

package interceptor

import (
"go.temporal.io/server/common/cache"
)

// gRPC method request must implement either NamespaceNameGetter or NamespaceIDGetter
// for namespace specific metrics to be reported properly
type (
NamespaceGetter interface {
NamespaceNameGetter interface {
GetNamespace() string
}
)

func GetNamespace(req interface{}) string {
if namespaceGetter, ok := req.(NamespaceGetter); ok {
return namespaceGetter.GetNamespace()
NamespaceIDGetter interface {
GetNamespaceId() string
}
)

return ""
// GetNamespace extracts the namespace name from a gRPC request.
//
// A request implementing NamespaceNameGetter supplies the name directly and
// takes precedence. Otherwise, a request implementing NamespaceIDGetter has
// its namespace ID resolved to a name through namespaceCache. An empty string
// is returned when the request implements neither interface or when the cache
// lookup returns an error.
func GetNamespace(
	namespaceCache cache.NamespaceCache,
	req interface{},
) string {
	if byName, ok := req.(NamespaceNameGetter); ok {
		return byName.GetNamespace()
	}

	byID, ok := req.(NamespaceIDGetter)
	if !ok {
		return ""
	}

	entry, err := namespaceCache.GetNamespaceByID(byID.GetNamespaceId())
	if err != nil {
		// The lookup error is not propagated; callers only see an empty name.
		return ""
	}
	return entry.GetInfo().Name
}
12 changes: 9 additions & 3 deletions common/rpc/interceptor/namespace_count_limit.go
Expand Up @@ -31,6 +31,8 @@ import (

"go.temporal.io/api/serviceerror"
"google.golang.org/grpc"

"go.temporal.io/server/common/cache"
)

var (
Expand All @@ -39,6 +41,8 @@ var (

type (
NamespaceCountLimitInterceptor struct {
namespaceCache cache.NamespaceCache

countFn func(namespace string) int
tokens map[string]int

Expand All @@ -50,12 +54,14 @@ type (
var _ grpc.UnaryServerInterceptor = (*NamespaceCountLimitInterceptor)(nil).Intercept

func NewNamespaceCountLimitInterceptor(
namespaceCache cache.NamespaceCache,
countFn func(namespace string) int,
tokens map[string]int,
) *NamespaceCountLimitInterceptor {
return &NamespaceCountLimitInterceptor{
countFn: countFn,
tokens: tokens,
namespaceCache: namespaceCache,
countFn: countFn,
tokens: tokens,

namespaceToCount: make(map[string]*int32),
}
Expand All @@ -71,7 +77,7 @@ func (ni *NamespaceCountLimitInterceptor) Intercept(
// token will default to 0
token, _ := ni.tokens[methodName]
if token != 0 {
namespace := GetNamespace(req)
namespace := GetNamespace(ni.namespaceCache, req)
counter := ni.counter(namespace)
count := atomic.AddInt32(counter, int32(token))
defer atomic.AddInt32(counter, -int32(token))
Expand Down
10 changes: 7 additions & 3 deletions common/rpc/interceptor/namespace_rate_limit.go
Expand Up @@ -31,6 +31,7 @@ import (
"go.temporal.io/api/serviceerror"
"google.golang.org/grpc"

"go.temporal.io/server/common/cache"
"go.temporal.io/server/common/quotas"
)

Expand All @@ -44,18 +45,21 @@ var (

type (
NamespaceRateLimitInterceptor struct {
rateLimiter quotas.NamespaceRateLimiter
tokens map[string]int
namespaceCache cache.NamespaceCache
rateLimiter quotas.NamespaceRateLimiter
tokens map[string]int
}
)

var _ grpc.UnaryServerInterceptor = (*NamespaceRateLimitInterceptor)(nil).Intercept

func NewNamespaceRateLimitInterceptor(
namespaceCache cache.NamespaceCache,
rateFn func(namespace string) float64,
tokens map[string]int,
) *NamespaceRateLimitInterceptor {
return &NamespaceRateLimitInterceptor{
namespaceCache: namespaceCache,
rateLimiter: quotas.NewNamespaceRateLimiter(
func(namespace string) quotas.RateLimiter {
return quotas.NewDefaultIncomingDynamicRateLimiter(
Expand All @@ -78,7 +82,7 @@ func (ni *NamespaceRateLimitInterceptor) Intercept(
token = NamespaceRateLimitDefaultToken
}

namespace := GetNamespace(req)
namespace := GetNamespace(ni.namespaceCache, req)
if !ni.rateLimiter.AllowN(namespace, time.Now().UTC(), token) {
return nil, ErrNamespaceRateLimitServerBusy
}
Expand Down
156 changes: 156 additions & 0 deletions common/rpc/interceptor/namespace_test.go
@@ -0,0 +1,156 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package interceptor

import (
"fmt"
"reflect"
"testing"

"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"go.temporal.io/api/workflowservice/v1"

"go.temporal.io/server/api/historyservice/v1"
"go.temporal.io/server/api/matchingservice/v1"
)

type (
// nameepaceSuite is the testify suite holding the tests that check whether
// the request types of each service API implement a namespace getter.
// NOTE(review): "nameepace" looks like a typo for "namespace"; a rename has to
// change this type, TestNameepaceSuite and every method receiver together.
nameepaceSuite struct {
suite.Suite
// Assertions is assigned in SetupTest.
*require.Assertions
}
)

var (
// frontendAPIExcluded names the WorkflowService methods that
// TestFrontendAPIMetrics skips when checking for NamespaceNameGetter.
// NOTE(review): presumably these requests are not scoped to a single
// namespace; confirm before adding or removing entries.
frontendAPIExcluded = map[string]struct{}{
"GetClusterInfo": {},
"GetSearchAttributes": {},
"ListNamespaces": {},
}

// matchingAPIExcluded names the MatchingService methods that
// TestMatchingAPIMetrics skips when checking for NamespaceIDGetter.
matchingAPIExcluded = map[string]struct{}{
"ListTaskQueuePartitions": {},
}

// historyAPIExcluded names the HistoryService methods that
// TestHistoryAPIMetrics skips when checking for NamespaceIDGetter.
historyAPIExcluded = map[string]struct{}{
"CloseShard": {},
"GetDLQMessages": {},
"GetDLQReplicationMessages": {},
"GetReplicationMessages": {},
"MergeDLQMessages": {},
"PurgeDLQMessages": {},
"RemoveTask": {},
"SyncShardStatus": {},
}
)

func TestNameepaceSuite(t *testing.T) {
s := new(nameepaceSuite)
suite.Run(t, s)
}

// SetupTest binds a new require.Assertions to the suite's current *testing.T.
func (s *nameepaceSuite) SetupTest() {
	t := s.T()
	s.Assertions = require.New(t)
}

// TearDownTest is currently a no-op.
func (s *nameepaceSuite) TearDownTest() {

}

// TestFrontendAPIMetrics verifies that the request type of every
// WorkflowServiceServer method, except those in frontendAPIExcluded,
// implements NamespaceNameGetter.
func (s *nameepaceSuite) TestFrontendAPIMetrics() {
	s.assertRequestsImplement(
		reflect.TypeOf((*workflowservice.WorkflowServiceServer)(nil)).Elem(),
		reflect.TypeOf((*NamespaceNameGetter)(nil)).Elem(),
		frontendAPIExcluded,
	)
}

// TestMatchingAPIMetrics verifies that the request type of every
// MatchingServiceServer method, except those in matchingAPIExcluded,
// implements NamespaceIDGetter.
func (s *nameepaceSuite) TestMatchingAPIMetrics() {
	s.assertRequestsImplement(
		reflect.TypeOf((*matchingservice.MatchingServiceServer)(nil)).Elem(),
		reflect.TypeOf((*NamespaceIDGetter)(nil)).Elem(),
		matchingAPIExcluded,
	)
}

// TestHistoryAPIMetrics verifies that the request type of every
// HistoryServiceServer method, except those in historyAPIExcluded,
// implements NamespaceIDGetter.
func (s *nameepaceSuite) TestHistoryAPIMetrics() {
	s.assertRequestsImplement(
		reflect.TypeOf((*historyservice.HistoryServiceServer)(nil)).Elem(),
		reflect.TypeOf((*NamespaceIDGetter)(nil)).Elem(),
		historyAPIExcluded,
	)
}

// assertRequestsImplement walks every method of the service interface type and
// fails the test when a method's request parameter does not implement getter.
//
// service is the reflect.Type of a gRPC server interface, getter is the
// reflect.Type of the interface the requests must satisfy, and excluded holds
// the method names that are skipped. The failure message names the offending
// method and the getter interface.
func (s *nameepaceSuite) assertRequestsImplement(
	service reflect.Type,
	getter reflect.Type,
	excluded map[string]struct{},
) {
	for i := 0; i < service.NumMethod(); i++ {
		method := service.Method(i)
		if _, skip := excluded[method.Name]; skip {
			continue
		}

		// Parameter 0 is the context.Context and parameter 1 is the request;
		// methods with fewer than two parameters have no request to check.
		if method.Type.NumIn() < 2 {
			continue
		}

		if !method.Type.In(1).Implements(getter) {
			s.Fail(fmt.Sprintf("API: %v not implementing %v", method.Name, getter.Name()))
		}
	}
}
18 changes: 11 additions & 7 deletions common/rpc/interceptor/telemetry.go
Expand Up @@ -32,6 +32,7 @@ import (
"google.golang.org/grpc"

"go.temporal.io/server/common"
"go.temporal.io/server/common/cache"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
Expand All @@ -48,23 +49,26 @@ var (

type (
TelemetryInterceptor struct {
metricsClient metrics.Client
scopes map[string]int
logger log.Logger
namespaceCache cache.NamespaceCache
metricsClient metrics.Client
scopes map[string]int
logger log.Logger
}
)

var _ grpc.UnaryServerInterceptor = (*TelemetryInterceptor)(nil).Intercept

// NewTelemetryInterceptor returns a TelemetryInterceptor populated with the
// given namespace cache, metrics client, scope map and logger.
func NewTelemetryInterceptor(
namespaceCache cache.NamespaceCache,
metricsClient metrics.Client,
scopes map[string]int,
logger log.Logger,
) *TelemetryInterceptor {
return &TelemetryInterceptor{
metricsClient: metricsClient,
scopes: scopes,
logger: logger,
namespaceCache: namespaceCache,
metricsClient: metricsClient,
scopes: scopes,
logger: logger,
}
}

Expand Down Expand Up @@ -95,7 +99,7 @@ func (ti *TelemetryInterceptor) Intercept(
scope, _ := ti.scopes[methodName]
scope = ti.overrideScope(scope, methodName, req)
var metricsScope metrics.Scope
if namespace := GetNamespace(req); namespace != "" {
if namespace := GetNamespace(ti.namespaceCache, req); namespace != "" {
metricsScope = ti.metricsClient.Scope(scope).Tagged(metrics.NamespaceTag(namespace))
} else {
metricsScope = ti.metricsClient.Scope(scope).Tagged(metrics.NamespaceUnknownTag())
Expand Down
3 changes: 3 additions & 0 deletions service/frontend/service.go
Expand Up @@ -245,6 +245,7 @@ func (s *Service) Start() {
}

metricsInterceptor := interceptor.NewTelemetryInterceptor(
s.Resource.GetNamespaceCache(),
s.Resource.GetMetricsClient(),
metrics.FrontendAPIMetricsScopes(),
s.Resource.GetLogger(),
Expand All @@ -254,12 +255,14 @@ func (s *Service) Start() {
APIRateLimitOverride,
)
namespaceRateLimiterInterceptor := interceptor.NewNamespaceRateLimitInterceptor(
s.Resource.GetNamespaceCache(),
func(namespace string) float64 {
return float64(s.config.MaxNamespaceRPSPerInstance(namespace))
},
APIRateLimitOverride,
)
namespaceCountLimiterInterceptor := interceptor.NewNamespaceCountLimitInterceptor(
s.Resource.GetNamespaceCache(),
s.config.MaxNamespaceCountPerInstance,
APICountLimitOverride,
)
Expand Down
1 change: 1 addition & 0 deletions service/history/service.go
Expand Up @@ -150,6 +150,7 @@ func (s *Service) Start() {
s.handler.Start()

metricsInterceptor := interceptor.NewTelemetryInterceptor(
s.Resource.GetNamespaceCache(),
s.Resource.GetMetricsClient(),
metrics.HistoryAPIMetricsScopes(),
s.Resource.GetLogger(),
Expand Down
1 change: 1 addition & 0 deletions service/matching/service.go
Expand Up @@ -103,6 +103,7 @@ func (s *Service) Start() {
s.handler.Start()

metricsInterceptor := interceptor.NewTelemetryInterceptor(
s.Resource.GetNamespaceCache(),
s.Resource.GetMetricsClient(),
metrics.MatchingAPIMetricsScopes(),
s.Resource.GetLogger(),
Expand Down

0 comments on commit 3d10123

Please sign in to comment.