diff --git a/common/rpc/interceptor/namespace.go b/common/rpc/interceptor/namespace.go index d67ef81f56e..997b8ced76f 100644 --- a/common/rpc/interceptor/namespace.go +++ b/common/rpc/interceptor/namespace.go @@ -24,16 +24,39 @@ package interceptor +import ( + "go.temporal.io/server/common/cache" +) + +// gRPC method request must implement either NamespaceNameGetter or NamespaceIDGetter +// for namespace specific metrics to be reported properly type ( - NamespaceGetter interface { + NamespaceNameGetter interface { GetNamespace() string } -) -func GetNamespace(req interface{}) string { - if namespaceGetter, ok := req.(NamespaceGetter); ok { - return namespaceGetter.GetNamespace() + NamespaceIDGetter interface { + GetNamespaceId() string } +) - return "" +func GetNamespace( + namespaceCache cache.NamespaceCache, + req interface{}, +) string { + switch request := req.(type) { + case NamespaceNameGetter: + return request.GetNamespace() + + case NamespaceIDGetter: + namespaceID := request.GetNamespaceId() + namespaceEntry, err := namespaceCache.GetNamespaceByID(namespaceID) + if err != nil { + return "" + } + return namespaceEntry.GetInfo().Name + + default: + return "" + } } diff --git a/common/rpc/interceptor/namespace_count_limit.go b/common/rpc/interceptor/namespace_count_limit.go index 943c33e08ee..6beba43c679 100644 --- a/common/rpc/interceptor/namespace_count_limit.go +++ b/common/rpc/interceptor/namespace_count_limit.go @@ -31,6 +31,8 @@ import ( "go.temporal.io/api/serviceerror" "google.golang.org/grpc" + + "go.temporal.io/server/common/cache" ) var ( @@ -39,6 +41,8 @@ var ( type ( NamespaceCountLimitInterceptor struct { + namespaceCache cache.NamespaceCache + countFn func(namespace string) int tokens map[string]int @@ -50,12 +54,14 @@ type ( var _ grpc.UnaryServerInterceptor = (*NamespaceCountLimitInterceptor)(nil).Intercept func NewNamespaceCountLimitInterceptor( + namespaceCache cache.NamespaceCache, countFn func(namespace string) int, tokens map[string]int, ) *NamespaceCountLimitInterceptor { return &NamespaceCountLimitInterceptor{ - countFn: countFn, - tokens: tokens, + namespaceCache: namespaceCache, + countFn: countFn, + tokens: tokens, namespaceToCount: make(map[string]*int32), } @@ -71,7 +77,7 @@ func (ni *NamespaceCountLimitInterceptor) Intercept( // token will default to 0 token, _ := ni.tokens[methodName] if token != 0 { - namespace := GetNamespace(req) + namespace := GetNamespace(ni.namespaceCache, req) counter := ni.counter(namespace) count := atomic.AddInt32(counter, int32(token)) defer atomic.AddInt32(counter, -int32(token)) diff --git a/common/rpc/interceptor/namespace_rate_limit.go b/common/rpc/interceptor/namespace_rate_limit.go index d81e31b4ced..650027e17ea 100644 --- a/common/rpc/interceptor/namespace_rate_limit.go +++ b/common/rpc/interceptor/namespace_rate_limit.go @@ -31,6 +31,7 @@ import ( "go.temporal.io/api/serviceerror" "google.golang.org/grpc" + "go.temporal.io/server/common/cache" "go.temporal.io/server/common/quotas" ) @@ -44,18 +45,21 @@ var ( type ( NamespaceRateLimitInterceptor struct { - rateLimiter quotas.NamespaceRateLimiter - tokens map[string]int + namespaceCache cache.NamespaceCache + rateLimiter quotas.NamespaceRateLimiter + tokens map[string]int } ) var _ grpc.UnaryServerInterceptor = (*NamespaceRateLimitInterceptor)(nil).Intercept func NewNamespaceRateLimitInterceptor( + namespaceCache cache.NamespaceCache, rateFn func(namespace string) float64, tokens map[string]int, ) *NamespaceRateLimitInterceptor { return &NamespaceRateLimitInterceptor{ + namespaceCache: namespaceCache, rateLimiter: quotas.NewNamespaceRateLimiter( func(namespace string) quotas.RateLimiter { return quotas.NewDefaultIncomingDynamicRateLimiter( @@ -78,7 +82,7 @@ func (ni *NamespaceRateLimitInterceptor) Intercept( token = NamespaceRateLimitDefaultToken } - namespace := GetNamespace(req) + namespace := GetNamespace(ni.namespaceCache, req) if !ni.rateLimiter.AllowN(namespace, time.Now().UTC(), token) { return nil, ErrNamespaceRateLimitServerBusy } diff --git a/common/rpc/interceptor/namespace_test.go b/common/rpc/interceptor/namespace_test.go new file mode 100644 index 00000000000..eccdc1b27a1 --- /dev/null +++ b/common/rpc/interceptor/namespace_test.go @@ -0,0 +1,156 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package interceptor + +import ( + "fmt" + "reflect" + "testing" + + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + "go.temporal.io/api/workflowservice/v1" + + "go.temporal.io/server/api/historyservice/v1" + "go.temporal.io/server/api/matchingservice/v1" +) + +type ( + nameepaceSuite struct { + suite.Suite + *require.Assertions + } +) + +var ( + frontendAPIExcluded = map[string]struct{}{ + "GetClusterInfo": {}, + "GetSearchAttributes": {}, + "ListNamespaces": {}, + } + + matchingAPIExcluded = map[string]struct{}{ + "ListTaskQueuePartitions": {}, + } + + historyAPIExcluded = map[string]struct{}{ + "CloseShard": {}, + "GetDLQMessages": {}, + "GetDLQReplicationMessages": {}, + "GetReplicationMessages": {}, + "MergeDLQMessages": {}, + "PurgeDLQMessages": {}, + "RemoveTask": {}, + "SyncShardStatus": {}, + } +) + +func TestNameepaceSuite(t *testing.T) { + s := new(nameepaceSuite) + suite.Run(t, s) +} + +func (s *nameepaceSuite) SetupTest() { + s.Assertions = require.New(s.T()) +} + +func (s *nameepaceSuite) TearDownTest() { + +} + +func (s *nameepaceSuite) TestFrontendAPIMetrics() { + namespaceNameGetter := reflect.TypeOf((*NamespaceNameGetter)(nil)).Elem() + + var service workflowservice.WorkflowServiceServer + t := reflect.TypeOf(&service).Elem() + for i := 0; i < t.NumMethod(); i++ { + method := t.Method(i) + methodName := method.Name + methodType := method.Type + + // 0th parameter is context.Context + // 1th parameter is the request + if _, ok := frontendAPIExcluded[methodName]; ok { + continue + } + if methodType.NumIn() < 2 { + continue + } + request := methodType.In(1) + if !request.Implements(namespaceNameGetter) { + s.Fail(fmt.Sprintf("API: %v not implementing NamespaceNameGetter", methodName)) + } + } +} + +func (s *nameepaceSuite) TestMatchingAPIMetrics() { + namespaceIDGetter := reflect.TypeOf((*NamespaceIDGetter)(nil)).Elem() + + var service matchingservice.MatchingServiceServer + t := reflect.TypeOf(&service).Elem() + for i := 0; i < t.NumMethod(); i++ { + method := t.Method(i) + methodName := method.Name + methodType := method.Type + + // 0th parameter is context.Context + // 1th parameter is the request + if _, ok := matchingAPIExcluded[methodName]; ok { + continue + } + if methodType.NumIn() < 2 { + continue + } + request := methodType.In(1) + if !request.Implements(namespaceIDGetter) { + s.Fail(fmt.Sprintf("API: %v not implementing NamespaceIDGetter", methodName)) + } + } +} + +func (s *nameepaceSuite) TestHistoryAPIMetrics() { + namespaceIDGetter := reflect.TypeOf((*NamespaceIDGetter)(nil)).Elem() + + var service historyservice.HistoryServiceServer + t := reflect.TypeOf(&service).Elem() + for i := 0; i < t.NumMethod(); i++ { + method := t.Method(i) + methodName := method.Name + methodType := method.Type + + // 0th parameter is context.Context + // 1th parameter is the request + if _, ok := historyAPIExcluded[methodName]; ok { + continue + } + if methodType.NumIn() < 2 { + continue + } + request := methodType.In(1) + if !request.Implements(namespaceIDGetter) { + s.Fail(fmt.Sprintf("API: %v not implementing NamespaceIDGetter", methodName)) + } + } +} diff --git a/common/rpc/interceptor/telemetry.go b/common/rpc/interceptor/telemetry.go index 71060266abd..5ad2afdf2fa 100644 --- a/common/rpc/interceptor/telemetry.go +++ b/common/rpc/interceptor/telemetry.go @@ -32,6 +32,7 @@ import ( "google.golang.org/grpc" "go.temporal.io/server/common" + "go.temporal.io/server/common/cache" "go.temporal.io/server/common/log" "go.temporal.io/server/common/log/tag" "go.temporal.io/server/common/metrics" @@ -48,23 +49,26 @@ var ( type ( TelemetryInterceptor struct { - metricsClient metrics.Client - scopes map[string]int - logger log.Logger + namespaceCache cache.NamespaceCache + metricsClient metrics.Client + scopes map[string]int + logger log.Logger } ) var _ grpc.UnaryServerInterceptor = (*TelemetryInterceptor)(nil).Intercept func NewTelemetryInterceptor( + namespaceCache cache.NamespaceCache, metricsClient metrics.Client, scopes map[string]int, logger log.Logger, ) *TelemetryInterceptor { return &TelemetryInterceptor{ - metricsClient: metricsClient, - scopes: scopes, - logger: logger, + namespaceCache: namespaceCache, + metricsClient: metricsClient, + scopes: scopes, + logger: logger, } } @@ -95,7 +99,7 @@ func (ti *TelemetryInterceptor) Intercept( scope, _ := ti.scopes[methodName] scope = ti.overrideScope(scope, methodName, req) var metricsScope metrics.Scope - if namespace := GetNamespace(req); namespace != "" { + if namespace := GetNamespace(ti.namespaceCache, req); namespace != "" { metricsScope = ti.metricsClient.Scope(scope).Tagged(metrics.NamespaceTag(namespace)) } else { metricsScope = ti.metricsClient.Scope(scope).Tagged(metrics.NamespaceUnknownTag()) diff --git a/service/frontend/service.go b/service/frontend/service.go index 465811056b8..eb44a7bf90c 100644 --- a/service/frontend/service.go +++ b/service/frontend/service.go @@ -245,6 +245,7 @@ func (s *Service) Start() { } metricsInterceptor := interceptor.NewTelemetryInterceptor( + s.Resource.GetNamespaceCache(), s.Resource.GetMetricsClient(), metrics.FrontendAPIMetricsScopes(), s.Resource.GetLogger(), @@ -254,12 +255,14 @@ func (s *Service) Start() { APIRateLimitOverride, ) namespaceRateLimiterInterceptor := interceptor.NewNamespaceRateLimitInterceptor( + s.Resource.GetNamespaceCache(), func(namespace string) float64 { return float64(s.config.MaxNamespaceRPSPerInstance(namespace)) }, APIRateLimitOverride, ) namespaceCountLimiterInterceptor := interceptor.NewNamespaceCountLimitInterceptor( + s.Resource.GetNamespaceCache(), s.config.MaxNamespaceCountPerInstance, APICountLimitOverride, ) diff --git a/service/history/service.go b/service/history/service.go index 79fc43191ac..0e09746edbf 100644 --- a/service/history/service.go +++ b/service/history/service.go @@ -150,6 +150,7 @@ func (s *Service) Start() { s.handler.Start() metricsInterceptor := interceptor.NewTelemetryInterceptor( + s.Resource.GetNamespaceCache(), s.Resource.GetMetricsClient(), metrics.HistoryAPIMetricsScopes(), s.Resource.GetLogger(), diff --git a/service/matching/service.go b/service/matching/service.go index 69432a9da85..15f6088f5fa 100644 --- a/service/matching/service.go +++ b/service/matching/service.go @@ -103,6 +103,7 @@ func (s *Service) Start() { s.handler.Start() metricsInterceptor := interceptor.NewTelemetryInterceptor( + s.Resource.GetNamespaceCache(), s.Resource.GetMetricsClient(), metrics.MatchingAPIMetricsScopes(), s.Resource.GetLogger(),