From ebb9270fb4bb228a3330dd96639e2ab09bbb3374 Mon Sep 17 00:00:00 2001 From: Michael Snowden Date: Thu, 29 Jun 2023 17:06:10 -0700 Subject: [PATCH] Add frontend global RPS limit --- common/dynamicconfig/constants.go | 4 +- service/frontend/fx.go | 5 +- service/frontend/fx_test.go | 280 ++++++++++++++++++++++++++++++ service/frontend/service.go | 15 +- 4 files changed, 298 insertions(+), 6 deletions(-) create mode 100644 service/frontend/fx_test.go diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index 312d80eed88..843a1e1e12b 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -237,8 +237,10 @@ const ( FrontendVisibilityMaxPageSize = "frontend.visibilityMaxPageSize" // FrontendHistoryMaxPageSize is default max size for GetWorkflowExecutionHistory in one page FrontendHistoryMaxPageSize = "frontend.historyMaxPageSize" - // FrontendRPS is workflow rate limit per second + // FrontendRPS is workflow rate limit per second per-instance FrontendRPS = "frontend.rps" + // FrontendGlobalRPS is workflow rate limit per second for the whole cluster + FrontendGlobalRPS = "frontend.globalRPS" // FrontendNamespaceReplicationInducingAPIsRPS limits the per second request rate for namespace replication inducing // APIs (e.g. RegisterNamespace, UpdateNamespace, UpdateWorkerBuildIdCompatibility). // This config is EXPERIMENTAL and may be changed or removed in a later release. diff --git a/service/frontend/fx.go b/service/frontend/fx.go index aef981131c5..39596a1294c 100644 --- a/service/frontend/fx.go +++ b/service/frontend/fx.go @@ -286,8 +286,11 @@ func TelemetryInterceptorProvider( func RateLimitInterceptorProvider( serviceConfig *Config, + frontendServiceResolver membership.ServiceResolver, ) *interceptor.RateLimitInterceptor { - rateFn := func() float64 { return float64(serviceConfig.RPS()) } + rateFn := func() float64 { + return effectiveRPS(frontendServiceResolver, serviceConfig.RPS(), serviceConfig.GlobalRPS()) + } namespaceReplicationInducingRateFn := func() float64 { return float64(serviceConfig.NamespaceReplicationInducingAPIsRPS()) } return interceptor.NewRateLimitInterceptor( diff --git a/service/frontend/fx_test.go b/service/frontend/fx_test.go new file mode 100644 index 00000000000..25dd32c1ce7 --- /dev/null +++ b/service/frontend/fx_test.go @@ -0,0 +1,280 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package frontend + +import ( + "context" + "net" + "sync" + "testing" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.temporal.io/api/workflowservice/v1" + "go.temporal.io/server/common/membership" + "go.temporal.io/server/internal/nettest" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +type testCase struct { + // name of the test case + name string + // t is the test object + t *testing.T + // globalRPSLimit is the global RPS limit for all frontend hosts + globalRPSLimit int + // perInstanceRPSLimit is the RPS limit for each frontend host + perInstanceRPSLimit int + // expectRateLimit is true if the interceptor should return a rate limit error + expectRateLimit bool + // numRequests is the number of requests to send to the interceptor + numRequests int + // serviceResolver is used to determine the number of frontend hosts for the global rate limiter + serviceResolver membership.ServiceResolver + // configure is a function that can be used to override the default test case values + configure func(tc *testCase) +} + +func TestRateLimitInterceptorProvider(t *testing.T) { + t.Parallel() + + // The burst limit is 2 * the rps limit, so this is 8, which is < the number of requests. The interceptor should + // rate limit one of the last two requests because we exceeded the burst limit, and there is no delay between the + // last two requests. + lowPerInstanceRPSLimit := 4 + // The burst limit is 2 * the rps limit, so this is 10, which is >= the number of requests. The interceptor should + // not rate limit any of the requests because we never exceed the burst limit. + highPerInstanceRPSLimit := 5 + // The number of hosts is 10, so if 4 is too low of an RPS limit per-instance, then 40 is too low of a global RPS + // limit. + numHosts := 10 + lowGlobalRPSLimit := lowPerInstanceRPSLimit * numHosts + highGlobalRPSLimit := highPerInstanceRPSLimit * numHosts + + testCases := []testCase{ + { + name: "both rate limits hit", + configure: func(tc *testCase) { + tc.globalRPSLimit = lowGlobalRPSLimit + tc.perInstanceRPSLimit = lowPerInstanceRPSLimit + tc.expectRateLimit = true + }, + }, + { + name: "global rate limit hit", + configure: func(tc *testCase) { + tc.globalRPSLimit = lowGlobalRPSLimit + tc.perInstanceRPSLimit = highPerInstanceRPSLimit + tc.expectRateLimit = true + }, + }, + { + name: "per instance rate limit hit but ignored because global rate limit is not hit", + configure: func(tc *testCase) { + tc.globalRPSLimit = highGlobalRPSLimit + tc.perInstanceRPSLimit = lowPerInstanceRPSLimit + tc.expectRateLimit = false + }, + }, + { + name: "neither rate limit hit", + configure: func(tc *testCase) { + tc.globalRPSLimit = highGlobalRPSLimit + tc.perInstanceRPSLimit = highPerInstanceRPSLimit + tc.expectRateLimit = false + }, + }, + { + name: "global rate limit not configured and per instance rate limit not hit", + configure: func(tc *testCase) { + tc.globalRPSLimit = 0 + tc.perInstanceRPSLimit = highPerInstanceRPSLimit + tc.expectRateLimit = false + }, + }, + { + name: "global rate limit not configured and per instance rate limit is hit", + configure: func(tc *testCase) { + tc.globalRPSLimit = 0 + tc.perInstanceRPSLimit = lowPerInstanceRPSLimit + tc.expectRateLimit = true + }, + }, + { + name: "global rate limit not configured and zero per-instance rate limit", + configure: func(tc *testCase) { + tc.globalRPSLimit = 0 + tc.perInstanceRPSLimit = 0 + tc.expectRateLimit = true + }, + }, + { + name: "nil service resolver causes global RPS limit to be ignored", + configure: func(tc *testCase) { + tc.globalRPSLimit = lowPerInstanceRPSLimit + tc.perInstanceRPSLimit = highPerInstanceRPSLimit + tc.expectRateLimit = false + tc.serviceResolver = nil + }, + }, + { + name: "no hosts returned by service resolver acts as if there was one host", + configure: func(tc *testCase) { + tc.globalRPSLimit = lowPerInstanceRPSLimit + tc.perInstanceRPSLimit = highPerInstanceRPSLimit + tc.expectRateLimit = true + serviceResolver := membership.NewMockServiceResolver(gomock.NewController(tc.t)) + serviceResolver.EXPECT().MemberCount().Return(0).AnyTimes() + tc.serviceResolver = serviceResolver + }, + }, + } + + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + tc.numRequests = 10 + tc.t = t + { + // Create a mock service resolver which returns the number of frontend hosts. + // This may be overridden by the test case. + ctrl := gomock.NewController(t) + serviceResolver := membership.NewMockServiceResolver(ctrl) + serviceResolver.EXPECT().MemberCount().Return(numHosts).AnyTimes() + tc.serviceResolver = serviceResolver + } + tc.configure(&tc) + + // Create a rate limit interceptor which uses the fake clock, and the per instance and global RPS limits + // from the test case. + rateLimitInterceptor := RateLimitInterceptorProvider(&Config{ + RPS: func() int { + return tc.perInstanceRPSLimit + }, + GlobalRPS: func() int { + return tc.globalRPSLimit + }, + NamespaceReplicationInducingAPIsRPS: func() int { + // this is not used in this test + return 0 + }, + }, tc.serviceResolver) + + // Create a gRPC server for the fake workflow service. + svc := &testSvc{} + server := grpc.NewServer(grpc.UnaryInterceptor(rateLimitInterceptor.Intercept)) + workflowservice.RegisterWorkflowServiceServer(server, svc) + + pipe := nettest.NewPipe() + + var wg sync.WaitGroup + defer wg.Wait() + wg.Add(1) + + listenerDone := make(chan struct{}) + + go func() { + defer wg.Done() + + // This should return an error because Accept returns an error, but we don't assert anything because we + // don't actually care whether this returns an error or not. + _ = server.Serve(&testListener{ + Pipe: pipe, + done: listenerDone, + }) + }() + + // Create a gRPC client to the fake workflow service. + dialer := grpc.WithContextDialer(func(ctx context.Context, _ string) (net.Conn, error) { + return pipe.Connect(ctx.Done()) + }) + transportCredentials := grpc.WithTransportCredentials(insecure.NewCredentials()) + conn, err := grpc.DialContext(context.Background(), "fake", dialer, transportCredentials) + require.NoError(t, err) + + defer func() { + assert.NoError(t, conn.Close()) + // This causes the server to get an error from its call to Accept and stop itself. + close(listenerDone) + }() + + client := workflowservice.NewWorkflowServiceClient(conn) + + // Generate load by sending a number of requests to the server. + for i := 0; i < tc.numRequests; i++ { + _, err = client.StartWorkflowExecution( + context.Background(), + &workflowservice.StartWorkflowExecutionRequest{}, + ) + if err != nil { + break + } + } + + // Check if the rate limit is hit. + if tc.expectRateLimit { + assert.ErrorContains(t, err, "rate limit exceeded") + } else { + assert.NoError(t, err) + } + }) + } +} + +// testSvc is a fake workflow service. +type testSvc struct { + workflowservice.UnimplementedWorkflowServiceServer +} + +// StartWorkflowExecution is a fake implementation of the StartWorkflowExecution gRPC method which does nothing. +func (t *testSvc) StartWorkflowExecution( + context.Context, + *workflowservice.StartWorkflowExecutionRequest, +) (*workflowservice.StartWorkflowExecutionResponse, error) { + return &workflowservice.StartWorkflowExecutionResponse{}, nil +} + +// testListener is a fake listener which uses a nettest.Pipe to simulate a network connection. +type testListener struct { + *nettest.Pipe + // We cancel calls to Accept using the done channel so that tests don't hang if they're broken. + done <-chan struct{} +} + +func (t testListener) Accept() (net.Conn, error) { + return t.Pipe.Accept(t.done) +} + +func (t testListener) Close() error { + return nil +} + +func (t testListener) Addr() net.Addr { + return &net.TCPAddr{} +} diff --git a/service/frontend/service.go b/service/frontend/service.go index aed7568a3b2..ecdbb4727af 100644 --- a/service/frontend/service.go +++ b/service/frontend/service.go @@ -71,6 +71,7 @@ type Config struct { HistoryMaxPageSize dynamicconfig.IntPropertyFnWithNamespaceFilter RPS dynamicconfig.IntPropertyFn + GlobalRPS dynamicconfig.IntPropertyFn NamespaceReplicationInducingAPIsRPS dynamicconfig.IntPropertyFn MaxNamespaceRPSPerInstance dynamicconfig.IntPropertyFnWithNamespaceFilter MaxNamespaceBurstPerInstance dynamicconfig.IntPropertyFnWithNamespaceFilter @@ -205,6 +206,7 @@ func NewConfig( HistoryMaxPageSize: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.FrontendHistoryMaxPageSize, common.GetHistoryMaxPageSize), RPS: dc.GetIntProperty(dynamicconfig.FrontendRPS, 2400), + GlobalRPS: dc.GetIntProperty(dynamicconfig.FrontendGlobalRPS, 0), NamespaceReplicationInducingAPIsRPS: dc.GetIntProperty(dynamicconfig.FrontendNamespaceReplicationInducingAPIsRPS, 20), MaxNamespaceRPSPerInstance: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.FrontendMaxNamespaceRPSPerInstance, 2400), @@ -405,14 +407,19 @@ func namespaceRPS( frontendResolver membership.ServiceResolver, namespace string, ) float64 { - globalRPS := float64(globalRPSFn(namespace)) + globalRPS := globalRPSFn(namespace) + hostRPS := perInstanceRPSFn(namespace) + + return effectiveRPS(frontendResolver, hostRPS, globalRPS) +} + +func effectiveRPS(frontendResolver membership.ServiceResolver, hostRPS int, globalRPS int) float64 { if globalRPS > 0 && frontendResolver != nil { hosts := float64(numFrontendHosts(frontendResolver)) - return globalRPS / hosts + return float64(globalRPS) / hosts } - hostRPS := float64(perInstanceRPSFn(namespace)) - return hostRPS + return float64(hostRPS) } func numFrontendHosts(