From d54f4d8c1547222c4cae48fdf59c67b5776c3c2e Mon Sep 17 00:00:00 2001 From: Yichao Yang Date: Fri, 6 Oct 2023 11:03:01 -0700 Subject: [PATCH] Shard count based rate limiting (#4923) --- service/history/fx.go | 26 +++- service/history/queue_factory_base.go | 45 +++--- service/history/queue_factory_base_test.go | 6 + service/history/shard/fx.go | 69 ++++++++-- .../shard/ownership_based_quota_calculator.go | 101 ++++++++++++++ .../shard/ownership_based_quota_scaler.go | 128 ++++++------------ .../ownership_based_quota_scaler_test.go | 39 ++---- 7 files changed, 261 insertions(+), 153 deletions(-) create mode 100644 service/history/shard/ownership_based_quota_calculator.go diff --git a/service/history/fx.go b/service/history/fx.go index 3e3567baab8..6e6c794fc9d 100644 --- a/service/history/fx.go +++ b/service/history/fx.go @@ -220,18 +220,32 @@ func ESProcessorConfigProvider( func PersistenceRateLimitingParamsProvider( serviceConfig *configs.Config, persistenceLazyLoadedServiceResolver service.PersistenceLazyLoadedServiceResolver, + ownershipBasedQuotaScaler shard.LazyLoadedOwnershipBasedQuotaScaler, ) service.PersistenceRateLimitingParams { - return service.NewPersistenceRateLimitingParams( + calculator := shard.NewOwnershipAwareQuotaCalculator( + ownershipBasedQuotaScaler, + persistenceLazyLoadedServiceResolver, serviceConfig.PersistenceMaxQPS, serviceConfig.PersistenceGlobalMaxQPS, + ) + namespaceCalculator := shard.NewOwnershipAwareNamespaceQuotaCalculator( + ownershipBasedQuotaScaler, + persistenceLazyLoadedServiceResolver, serviceConfig.PersistenceNamespaceMaxQPS, serviceConfig.PersistenceGlobalNamespaceMaxQPS, - serviceConfig.PersistencePerShardNamespaceMaxQPS, - serviceConfig.EnablePersistencePriorityRateLimiting, - serviceConfig.OperatorRPSRatio, - serviceConfig.PersistenceDynamicRateLimitingParams, - persistenceLazyLoadedServiceResolver, ) + return service.PersistenceRateLimitingParams{ + PersistenceMaxQps: func() int { + return int(calculator.GetQuota()) + }, + PersistenceNamespaceMaxQps: func(namespace string) int { + return int(namespaceCalculator.GetQuota(namespace)) + }, + PersistencePerShardNamespaceMaxQPS: persistenceClient.PersistencePerShardNamespaceMaxQPS(serviceConfig.PersistencePerShardNamespaceMaxQPS), + EnablePriorityRateLimiting: persistenceClient.EnablePriorityRateLimiting(serviceConfig.EnablePersistencePriorityRateLimiting), + OperatorRPSRatio: persistenceClient.OperatorRPSRatio(serviceConfig.OperatorRPSRatio), + DynamicRateLimitingParams: persistenceClient.DynamicRateLimitingParams(serviceConfig.PersistenceDynamicRateLimitingParams), + } } func VisibilityManagerProvider( diff --git a/service/history/queue_factory_base.go b/service/history/queue_factory_base.go index 71d1a9fb802..72eceef8b26 100644 --- a/service/history/queue_factory_base.go +++ b/service/history/queue_factory_base.go @@ -158,31 +158,36 @@ func getOptionalQueueFactories( } func QueueSchedulerRateLimiterProvider( + ownershipBasedQuotaScaler shard.LazyLoadedOwnershipBasedQuotaScaler, serviceResolver membership.ServiceResolver, config *configs.Config, timeSource clock.TimeSource, ) (queues.SchedulerRateLimiter, error) { return queues.NewPrioritySchedulerRateLimiter( - quotas.ClusterAwareNamespaceSpecificQuotaCalculator{ - MemberCounter: serviceResolver, - PerInstanceQuota: config.TaskSchedulerNamespaceMaxQPS, - GlobalQuota: config.TaskSchedulerGlobalNamespaceMaxQPS, - }.GetQuota, - quotas.ClusterAwareQuotaCalculator{ - MemberCounter: serviceResolver, - PerInstanceQuota: config.TaskSchedulerMaxQPS, - 
GlobalQuota: config.TaskSchedulerGlobalMaxQPS, - }.GetQuota, - quotas.ClusterAwareNamespaceSpecificQuotaCalculator{ - MemberCounter: serviceResolver, - PerInstanceQuota: config.PersistenceNamespaceMaxQPS, - GlobalQuota: config.PersistenceGlobalNamespaceMaxQPS, - }.GetQuota, - quotas.ClusterAwareQuotaCalculator{ - MemberCounter: serviceResolver, - PerInstanceQuota: config.PersistenceMaxQPS, - GlobalQuota: config.PersistenceGlobalMaxQPS, - }.GetQuota, + shard.NewOwnershipAwareNamespaceQuotaCalculator( + ownershipBasedQuotaScaler, + serviceResolver, + config.TaskSchedulerNamespaceMaxQPS, + config.TaskSchedulerGlobalNamespaceMaxQPS, + ).GetQuota, + shard.NewOwnershipAwareQuotaCalculator( + ownershipBasedQuotaScaler, + serviceResolver, + config.TaskSchedulerMaxQPS, + config.TaskSchedulerGlobalMaxQPS, + ).GetQuota, + shard.NewOwnershipAwareNamespaceQuotaCalculator( + ownershipBasedQuotaScaler, + serviceResolver, + config.PersistenceNamespaceMaxQPS, + config.PersistenceGlobalNamespaceMaxQPS, + ).GetQuota, + shard.NewOwnershipAwareQuotaCalculator( + ownershipBasedQuotaScaler, + serviceResolver, + config.PersistenceMaxQPS, + config.PersistenceGlobalMaxQPS, + ).GetQuota, ) } diff --git a/service/history/queue_factory_base_test.go b/service/history/queue_factory_base_test.go index 79ecbe9aaf8..91c488c5b8d 100644 --- a/service/history/queue_factory_base_test.go +++ b/service/history/queue_factory_base_test.go @@ -25,6 +25,7 @@ package history import ( + "sync/atomic" "testing" "github.com/golang/mock/gomock" @@ -46,6 +47,7 @@ import ( "go.temporal.io/server/common/sdk" "go.temporal.io/server/service/history/archival" "go.temporal.io/server/service/history/configs" + "go.temporal.io/server/service/history/shard" "go.temporal.io/server/service/history/tasks" "go.temporal.io/server/service/history/workflow" ) @@ -156,6 +158,9 @@ func getModuleDependencies(controller *gomock.Controller, c *moduleTestCase) fx. clusterMetadata.EXPECT().GetCurrentClusterName().Return("module-test-cluster-name").AnyTimes() serviceResolver := membership.NewMockServiceResolver(controller) serviceResolver.EXPECT().MemberCount().Return(1).AnyTimes() + lazyLoadedOwnershipBasedQuotaScaler := shard.LazyLoadedOwnershipBasedQuotaScaler{ + Value: &atomic.Value{}, + } return fx.Supply( compileTimeDependencies{}, cfg, @@ -164,6 +169,7 @@ func getModuleDependencies(controller *gomock.Controller, c *moduleTestCase) fx. 
fx.Annotate(clusterMetadata, fx.As(new(cluster.Metadata))), fx.Annotate(serviceResolver, fx.As(new(membership.ServiceResolver))), fx.Annotate(clock.NewEventTimeSource(), fx.As(new(clock.TimeSource))), + lazyLoadedOwnershipBasedQuotaScaler, ) } diff --git a/service/history/shard/fx.go b/service/history/shard/fx.go index d426c8ff7b2..d997cb7d73b 100644 --- a/service/history/shard/fx.go +++ b/service/history/shard/fx.go @@ -25,25 +25,72 @@ package shard import ( - "go.temporal.io/server/service/history/configs" + "context" + "sync/atomic" + "go.uber.org/fx" "go.temporal.io/server/common" + "go.temporal.io/server/common/log" + "go.temporal.io/server/common/log/tag" + "go.temporal.io/server/common/primitives" + "go.temporal.io/server/service/history/configs" +) + +var Module = fx.Options( + fx.Provide( + ControllerProvider, + func(impl *ControllerImpl) Controller { return impl }, + ContextFactoryProvider, + fx.Annotate( + func(p Controller) common.Pingable { return p }, + fx.ResultTags(`group:"deadlockDetectorRoots"`), + ), + ), + ownershipBasedQuotaScalerModule, ) -var Module = fx.Provide( - ControllerProvider, - func(impl *ControllerImpl) Controller { return impl }, - func(impl *ControllerImpl, cfg *configs.Config) (*OwnershipBasedQuotaScaler, error) { +var ownershipBasedQuotaScalerModule = fx.Options( + fx.Provide(func( + impl *ControllerImpl, + cfg *configs.Config, + ) (*OwnershipBasedQuotaScalerImpl, error) { return NewOwnershipBasedQuotaScaler( impl, int(cfg.NumberOfShards), nil, ) - }, - ContextFactoryProvider, - fx.Annotate( - func(p Controller) common.Pingable { return p }, - fx.ResultTags(`group:"deadlockDetectorRoots"`), - ), + }), + fx.Provide(func( + impl *OwnershipBasedQuotaScalerImpl, + ) OwnershipBasedQuotaScaler { + return impl + }), + fx.Provide(func() LazyLoadedOwnershipBasedQuotaScaler { + return LazyLoadedOwnershipBasedQuotaScaler{ + Value: &atomic.Value{}, + } + }), + fx.Invoke(initLazyLoadedOwnershipBasedQuotaScaler), + fx.Invoke(func( + lc fx.Lifecycle, + impl *OwnershipBasedQuotaScalerImpl, + ) { + lc.Append(fx.Hook{ + OnStop: func(_ context.Context) error { + impl.Close() + return nil + }, + }) + }), ) + +func initLazyLoadedOwnershipBasedQuotaScaler( + serviceName primitives.ServiceName, + logger log.SnTaggedLogger, + ownershipBasedQuotaScaler OwnershipBasedQuotaScaler, + lazyLoadedOwnershipBasedQuotaScaler LazyLoadedOwnershipBasedQuotaScaler, +) { + lazyLoadedOwnershipBasedQuotaScaler.Store(ownershipBasedQuotaScaler) + logger.Info("Initialized lazy loaded OwnershipBasedQuotaScaler", tag.Service(serviceName)) +} diff --git a/service/history/shard/ownership_based_quota_calculator.go b/service/history/shard/ownership_based_quota_calculator.go new file mode 100644 index 00000000000..70f96b26b9e --- /dev/null +++ b/service/history/shard/ownership_based_quota_calculator.go @@ -0,0 +1,101 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. 
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+package shard
+
+import (
+	"go.temporal.io/server/common/quotas"
+)
+
+type (
+	// OwnershipAwareQuotaCalculator computes a rate-limiting quota from this host's
+	// share of shard ownership when a scale factor is available, and falls back to
+	// the embedded cluster-aware calculation otherwise.
+	OwnershipAwareQuotaCalculator struct {
+		quotas.ClusterAwareQuotaCalculator
+
+		scaler OwnershipBasedQuotaScaler
+	}
+
+	// OwnershipAwareNamespaceQuotaCalculator is the namespace-specific analog of
+	// OwnershipAwareQuotaCalculator.
+	OwnershipAwareNamespaceQuotaCalculator struct {
+		quotas.ClusterAwareNamespaceSpecificQuotaCalculator
+
+		scaler OwnershipBasedQuotaScaler
+	}
+)
+
+func NewOwnershipAwareQuotaCalculator(
+	scaler OwnershipBasedQuotaScaler,
+	memberCounter quotas.MemberCounter,
+	perInstanceQuota func() int,
+	globalQuota func() int,
+) *OwnershipAwareQuotaCalculator {
+	return &OwnershipAwareQuotaCalculator{
+		ClusterAwareQuotaCalculator: quotas.ClusterAwareQuotaCalculator{
+			MemberCounter:    memberCounter,
+			PerInstanceQuota: perInstanceQuota,
+			GlobalQuota:      globalQuota,
+		},
+		scaler: scaler,
+	}
+}
+
+// GetQuota returns the ownership-scaled global quota if one is available, and the
+// cluster-aware quota otherwise.
+func (c *OwnershipAwareQuotaCalculator) GetQuota() float64 {
+	if quota, ok := getOwnershipScaledQuota(c.scaler, c.GlobalQuota()); ok {
+		return quota
+	}
+	return c.ClusterAwareQuotaCalculator.GetQuota()
+}
+
+func NewOwnershipAwareNamespaceQuotaCalculator(
+	scaler OwnershipBasedQuotaScaler,
+	memberCounter quotas.MemberCounter,
+	perInstanceQuota func(namespace string) int,
+	globalQuota func(namespace string) int,
+) *OwnershipAwareNamespaceQuotaCalculator {
+	return &OwnershipAwareNamespaceQuotaCalculator{
+		ClusterAwareNamespaceSpecificQuotaCalculator: quotas.ClusterAwareNamespaceSpecificQuotaCalculator{
+			MemberCounter:    memberCounter,
+			PerInstanceQuota: perInstanceQuota,
+			GlobalQuota:      globalQuota,
+		},
+		scaler: scaler,
+	}
+}
+
+// GetQuota returns the ownership-scaled global quota for the namespace if one is
+// available, and the cluster-aware namespace quota otherwise.
+func (c *OwnershipAwareNamespaceQuotaCalculator) GetQuota(namespace string) float64 {
+	if quota, ok := getOwnershipScaledQuota(c.scaler, c.GlobalQuota(namespace)); ok {
+		return quota
+	}
+	return c.ClusterAwareNamespaceSpecificQuotaCalculator.GetQuota(namespace)
+}
+
+// getOwnershipScaledQuota multiplies the global limit by the host's shard-ownership
+// scale factor. The bool result is false when there is no positive global limit, no
+// scaler, or the scaler has not yet observed a shard count.
+func getOwnershipScaledQuota(
+	scaler OwnershipBasedQuotaScaler,
+	globalLimit int,
+) (float64, bool) {
+	if globalLimit > 0 && scaler != nil {
+		if scaleFactor, ok := scaler.ScaleFactor(); ok {
+			return scaleFactor * float64(globalLimit), true
+		}
+	}
+	return 0, false
+}
diff --git a/service/history/shard/ownership_based_quota_scaler.go b/service/history/shard/ownership_based_quota_scaler.go
index 557087cab6e..75cfecf551f 100644
--- a/service/history/shard/ownership_based_quota_scaler.go
+++ b/service/history/shard/ownership_based_quota_scaler.go
@@ -27,41 +27,32 @@ package shard
 import (
 	"errors"
 	"fmt"
-	"math"
 	"sync"
"sync/atomic" - - "go.temporal.io/server/common/quotas" ) type ( - // OwnershipBasedQuotaScaler scales rate-limiting quotas linearly with the fraction of the total shards in the + OwnershipBasedQuotaScaler interface { + ScaleFactor() (float64, bool) + } + + // OwnershipBasedQuotaScalerImpl scales rate-limiting quotas linearly with the fraction of the total shards in the // cluster owned by this host. The purpose is to allocate more quota to hosts with a higher workload. This object // can be obtained from the fx Module within this package. - OwnershipBasedQuotaScaler struct { + OwnershipBasedQuotaScalerImpl struct { shardCounter ShardCounter totalNumShards int updateAppliedCallback chan struct{} - } - // OwnershipScaledRateBurst is a quotas.RateBurst implementation that scales the RPS and burst quotas linearly with - // the fraction of the total shards in the cluster owned by this host. The effective Rate and Burst are both - // multiplied by (shardCount / totalShards). Note that there is no scaling until the first shard count update is - // received. - OwnershipScaledRateBurst struct { - // rb is the base rate burst that we will scale. - rb quotas.RateBurst - // shardCount is the number of shards owned by this host. - shardCount atomic.Int64 - // totalShards is the total number of shards in the cluster. - totalShards int - // subscription is the subscription to the shard counter. + + shardCount atomic.Int64 subscription ShardCountSubscription - // updateAppliedCallback is a callback channel that is sent to when the shard count updates are applied. This is - // useful for testing. In production, it should be nil. - updateAppliedCallback chan struct{} - // wg is a wait group that is used to wait for the shard count subscription goroutine to exit. - wg sync.WaitGroup + shutdownWG sync.WaitGroup + } + + LazyLoadedOwnershipBasedQuotaScaler struct { + *atomic.Value // value type is OwnershipBasedQuotaScaler } + // ShardCountSubscription is a subscription to a ShardCounter. It provides a channel that receives the // shard count updates and an Unsubscribe method that unsubscribes from the counter. ShardCountSubscription interface { @@ -70,6 +61,7 @@ type ( // Unsubscribe unsubscribes from the shard counter. This closes the ShardCount channel. Unsubscribe() } + // ShardCounter is an observable object that emits the current shard count. ShardCounter interface { // SubscribeShardCount returns a ShardCountSubscription for receiving shard count updates. @@ -93,87 +85,51 @@ func NewOwnershipBasedQuotaScaler( shardCounter ShardCounter, totalNumShards int, updateAppliedCallback chan struct{}, -) (*OwnershipBasedQuotaScaler, error) { +) (*OwnershipBasedQuotaScalerImpl, error) { if totalNumShards <= 0 { return nil, fmt.Errorf("%w: %d", ErrNonPositiveTotalNumShards, totalNumShards) } - return &OwnershipBasedQuotaScaler{ + scaler := &OwnershipBasedQuotaScalerImpl{ shardCounter: shardCounter, totalNumShards: totalNumShards, updateAppliedCallback: updateAppliedCallback, - }, nil -} - -// ScaleRateBurst returns a new OwnershipScaledRateBurst instance which scales the rate/burst quotas of the base -// RateBurst by the fraction of the total shards in the cluster owned by this host. You should call -// OwnershipScaledRateBurst.StopScaling on the returned instance when you are done with it to avoid leaking resources. 
-func (s *OwnershipBasedQuotaScaler) ScaleRateBurst(rb quotas.RateBurst) *OwnershipScaledRateBurst {
-	return newOwnershipScaledRateBurst(rb, s.shardCounter, s.totalNumShards, s.updateAppliedCallback)
-}
-
-func newOwnershipScaledRateBurst(
-	rb quotas.RateBurst,
-	shardCounter ShardCounter,
-	totalNumShards int,
-	updateAppliedCallback chan struct{},
-) *OwnershipScaledRateBurst {
-	subscription := shardCounter.SubscribeShardCount()
-	srb := &OwnershipScaledRateBurst{
-		rb:                    rb,
-		totalShards:           totalNumShards,
-		subscription:          subscription,
-		updateAppliedCallback: updateAppliedCallback,
+		subscription:          shardCounter.SubscribeShardCount(),
 	}
-	// Initialize the shard count to the shardCountNotSet sentinel value so that we don't try to apply the scale factor
-	// until we receive the first shard count.
-	srb.shardCount.Store(shardCountNotSet)
-	srb.wg.Add(1)
-	go srb.startScaling()
+	// Initialize the shard count to the shardCountNotSet sentinel value so that ScaleFactor reports "not available"
+	// until we receive the first shard count.
+	scaler.shardCount.Store(shardCountNotSet)
+	scaler.shutdownWG.Add(1)
+	go func() {
+		defer scaler.shutdownWG.Done()

-	return srb
-}
-
-// Rate returns the rate of the base rate limiter multiplied by the shard ownership share.
-func (rb *OwnershipScaledRateBurst) Rate() float64 {
-	return rb.rb.Rate() * rb.scaleFactor()
-}
+		for count := range scaler.subscription.ShardCount() {
+			scaler.shardCount.Store(int64(count))
+			if scaler.updateAppliedCallback != nil {
+				scaler.updateAppliedCallback <- struct{}{}
+			}
+		}
+	}()

-// Burst returns the burst quota of the base rate limiter multiplied by the shard ownership share, rounded up to the
-// nearest integer. We round up because we don't want to let this drop to zero unless the base burst is zero.
-func (rb *OwnershipScaledRateBurst) Burst() int {
-	return int(math.Ceil(float64(rb.rb.Burst()) * rb.scaleFactor()))
+	return scaler, nil
 }

-// scaleFactor returns the fraction of the total shards in the cluster owned by this host. It returns 1.0 if there
-// haven't been any shard count updates yet.
-func (rb *OwnershipScaledRateBurst) scaleFactor() float64 {
-	shardCount := rb.shardCount.Load()
+// ScaleFactor returns this host's fraction of the total shards in the cluster. The
+// bool result is false until the first shard count update has been received.
+func (s *OwnershipBasedQuotaScalerImpl) ScaleFactor() (float64, bool) {
+	shardCount := s.shardCount.Load()
 	if shardCount == shardCountNotSet {
-		// If the shard count is not set, then we haven't received the first shard count update yet. In this case, we
-		// return 1.0 so that the base rate/burst quotas are not scaled.
-		return 1.0
+		return 0, false
 	}

-	return float64(shardCount) / float64(rb.totalShards)
+	return float64(shardCount) / float64(s.totalNumShards), true
 }

-func (rb *OwnershipScaledRateBurst) startScaling() {
-	defer rb.wg.Done()
-
-	for shardCount := range rb.subscription.ShardCount() {
-		rb.shardCount.Store(int64(shardCount))
-
-		if rb.updateAppliedCallback != nil {
-			rb.updateAppliedCallback <- struct{}{}
-		}
-	}
+// Close unsubscribes from the shard counter and blocks until the shard count
+// subscription goroutine exits (which should be almost immediately).
+func (s *OwnershipBasedQuotaScalerImpl) Close() {
+	s.subscription.Unsubscribe()
+	s.shutdownWG.Wait()
 }

-// StopScaling unsubscribes from the shard counter and stops scaling the rate and burst quotas. This method blocks until
-// the shard count subscription goroutine exits (which should be almost immediately).
-func (rb *OwnershipScaledRateBurst) StopScaling() {
-	rb.subscription.Unsubscribe()
-	rb.wg.Wait()
+// ScaleFactor delegates to the stored OwnershipBasedQuotaScaler, or reports that no
+// scale factor is available if none has been stored yet.
+func (s LazyLoadedOwnershipBasedQuotaScaler) ScaleFactor() (float64, bool) {
+	if value := s.Load(); value != nil {
+		return value.(OwnershipBasedQuotaScaler).ScaleFactor()
+	}
+	return 0, false
 }
diff --git a/service/history/shard/ownership_based_quota_scaler_test.go b/service/history/shard/ownership_based_quota_scaler_test.go
index e5efd3fe2d2..6f400e29899 100644
--- a/service/history/shard/ownership_based_quota_scaler_test.go
+++ b/service/history/shard/ownership_based_quota_scaler_test.go
@@ -51,24 +51,6 @@ func (s *shardCounter) Unsubscribe() {
 	s.closed = true
 }

-// constantRateBurst is a quotas.RateBurst implementation that returns the same rate and burst every time.
-type constantRateBurst struct {
-	rate  float64
-	burst int
-}
-
-func (rb constantRateBurst) Rate() float64 {
-	return rb.rate
-}
-
-func (rb constantRateBurst) Burst() int {
-	return rb.burst
-}
-
-func newRateBurst(rate float64, burst int) constantRateBurst {
-	return constantRateBurst{rate: rate, burst: burst}
-}
-
 func TestOwnershipBasedQuotaScaler_NonPositiveTotalNumShards(t *testing.T) {
 	t.Parallel()

@@ -84,7 +66,6 @@ func TestOwnershipBasedQuotaScaler_NonPositiveTotalNumShards(t *testing.T) {
 func TestOwnershipBasedQuotaScaler(t *testing.T) {
 	t.Parallel()

-	rb := newRateBurst(2, 4)
 	sc := &shardCounter{
 		ch:     make(chan int),
 		closed: false,
@@ -93,22 +74,20 @@ func TestOwnershipBasedQuotaScaler(t *testing.T) {
 	updateAppliedCallback := make(chan struct{})
 	scaler, err := shard.NewOwnershipBasedQuotaScaler(sc, totalNumShards, updateAppliedCallback)
 	require.NoError(t, err)
-	srb := scaler.ScaleRateBurst(rb)
-	assert.Equal(t, 2.0, srb.Rate(), "Rate should be equal to the base rate before any shard count updates")
-	assert.Equal(t, 4, srb.Burst(), "Burst should be equal to the base burst before any shard count updates")
-	sc.ch <- 3
+	_, ok := scaler.ScaleFactor()
+	assert.False(t, ok, "ScaleFactor should return false before any shard count updates")
+	sc.ch <- 3
 	// Wait for the update to be applied. Even though the send above is blocking, we still need to wait for the
 	// scaler's goroutine to use it to adjust the scale factor.
 	<-updateAppliedCallback
-	// After the update is applied, the scale factor is calculated as 3/10 = 0.3, so the rate and burst should be
-	// multiplied by 0.3. Since the initial rate and burst are 2 and 4, respectively, the final rate and burst should be
-	// 0.6 and 1.2, respectively. However, since the burst is rounded up to the nearest integer, the final burst should
-	// be 2.
-	assert.Equal(t, 0.6, srb.Rate())
-	assert.Equal(t, 2, srb.Burst())
+	// After the update is applied, the scale factor is calculated as 3/10 = 0.3.
+	factor, ok := scaler.ScaleFactor()
+	assert.True(t, ok)
+	assert.Equal(t, 0.3, factor)
+
 	assert.False(t, sc.closed, "The shard counter should not be closed until the scaler is closed")
-	srb.StopScaling()
+	scaler.Close()
 	assert.True(t, sc.closed, "The shard counter should be closed after the scaler is closed")
 }
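
The quota precedence the new calculators implement is: use the ownership-scaled global limit when both a positive global limit and a shard-ownership scale factor are available; otherwise fall back to the cluster-aware calculation, which splits the global limit evenly across cluster members and resorts to the static per-instance limit last. A minimal, self-contained Go sketch of that arithmetic follows; the types and the fallback division are illustrative stand-ins for this note, not the actual quotas package API.

package main

import "fmt"

// scaleFactorFn stands in for OwnershipBasedQuotaScaler.ScaleFactor: it returns the
// fraction of shards owned by this host and whether an update has been received yet.
type scaleFactorFn func() (float64, bool)

// getQuota mirrors the precedence in getOwnershipScaledQuota, with a hypothetical
// stand-in for the cluster-aware fallback.
func getQuota(scale scaleFactorFn, globalLimit, perInstanceLimit, memberCount int) float64 {
	if globalLimit > 0 && scale != nil {
		if factor, ok := scale(); ok {
			// Ownership-based path: weight the global limit by this host's shard share.
			return factor * float64(globalLimit)
		}
	}
	if globalLimit > 0 && memberCount > 0 {
		// Cluster-aware fallback: split the global limit evenly across hosts.
		return float64(globalLimit) / float64(memberCount)
	}
	// Last resort: the static per-instance limit.
	return float64(perInstanceLimit)
}

func main() {
	noUpdateYet := func() (float64, bool) { return 0, false }
	owns3of10 := func() (float64, bool) { return 0.3, true }

	fmt.Println(getQuota(owns3of10, 1000, 200, 4))   // 300: 0.3 * 1000
	fmt.Println(getQuota(noUpdateYet, 1000, 200, 4)) // 250: 1000 / 4
	fmt.Println(getQuota(noUpdateYet, 0, 200, 4))    // 200: per-instance fallback
}

The float64 result matches the calculators above, which leave the conversion to int to their callers, as in PersistenceRateLimitingParamsProvider.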
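
LazyLoadedOwnershipBasedQuotaScaler exists to break a construction-order cycle: the persistence and scheduler rate limiters are built before the shard controller, so fx first provides an empty atomic.Value-backed wrapper and only later, in fx.Invoke, stores the real scaler into it; until then ScaleFactor reports no factor and consumers fall back to their cluster-aware quotas. A standalone sketch of the same pattern, with hypothetical stand-in names:

package main

import (
	"fmt"
	"sync/atomic"
)

// Scaler matches the shape of OwnershipBasedQuotaScaler.
type Scaler interface {
	ScaleFactor() (float64, bool)
}

// lazyScaler wraps an atomic.Value whose payload, once stored, is a Scaler.
// Until Store is called, ScaleFactor reports "no factor available".
type lazyScaler struct {
	value *atomic.Value
}

func (l lazyScaler) ScaleFactor() (float64, bool) {
	if v := l.value.Load(); v != nil {
		return v.(Scaler).ScaleFactor()
	}
	return 0, false
}

// fixedScaler is a hypothetical stand-in for the real shard-count-backed scaler.
type fixedScaler struct{ factor float64 }

func (f fixedScaler) ScaleFactor() (float64, bool) { return f.factor, true }

func main() {
	lazy := lazyScaler{value: &atomic.Value{}}

	// Early in startup: consumers hold `lazy` but the real scaler doesn't exist yet.
	if _, ok := lazy.ScaleFactor(); !ok {
		fmt.Println("not initialized: callers fall back to cluster-aware quotas")
	}

	// Later (in the real code, inside an fx.Invoke hook): the real scaler is stored.
	lazy.value.Store(Scaler(fixedScaler{factor: 0.3}))

	if factor, ok := lazy.ScaleFactor(); ok {
		fmt.Println("scale factor:", factor) // 0.3
	}
}

Storing through an *atomic.Value keeps the late initialization race-free without locks; note that atomic.Value requires every Store to use the same concrete type, which holds here because only the real scaler is ever stored.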