Skip to content

Commit

Permalink
Shard count based rate limiting (#4923)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt committed Oct 6, 2023
1 parent 2f62c28 commit d54f4d8
Show file tree
Hide file tree
Showing 7 changed files with 261 additions and 153 deletions.
26 changes: 20 additions & 6 deletions service/history/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,18 +220,32 @@ func ESProcessorConfigProvider(
func PersistenceRateLimitingParamsProvider(
serviceConfig *configs.Config,
persistenceLazyLoadedServiceResolver service.PersistenceLazyLoadedServiceResolver,
ownershipBasedQuotaScaler shard.LazyLoadedOwnershipBasedQuotaScaler,
) service.PersistenceRateLimitingParams {
return service.NewPersistenceRateLimitingParams(
calculator := shard.NewOwnershipAwareQuotaCalculator(
ownershipBasedQuotaScaler,
persistenceLazyLoadedServiceResolver,
serviceConfig.PersistenceMaxQPS,
serviceConfig.PersistenceGlobalMaxQPS,
)
namespaceCalculator := shard.NewOwnershipAwareNamespaceQuotaCalculator(
ownershipBasedQuotaScaler,
persistenceLazyLoadedServiceResolver,
serviceConfig.PersistenceNamespaceMaxQPS,
serviceConfig.PersistenceGlobalNamespaceMaxQPS,
serviceConfig.PersistencePerShardNamespaceMaxQPS,
serviceConfig.EnablePersistencePriorityRateLimiting,
serviceConfig.OperatorRPSRatio,
serviceConfig.PersistenceDynamicRateLimitingParams,
persistenceLazyLoadedServiceResolver,
)
return service.PersistenceRateLimitingParams{
PersistenceMaxQps: func() int {
return int(calculator.GetQuota())
},
PersistenceNamespaceMaxQps: func(namespace string) int {
return int(namespaceCalculator.GetQuota(namespace))
},
PersistencePerShardNamespaceMaxQPS: persistenceClient.PersistencePerShardNamespaceMaxQPS(serviceConfig.PersistencePerShardNamespaceMaxQPS),
EnablePriorityRateLimiting: persistenceClient.EnablePriorityRateLimiting(serviceConfig.EnablePersistencePriorityRateLimiting),
OperatorRPSRatio: persistenceClient.OperatorRPSRatio(serviceConfig.OperatorRPSRatio),
DynamicRateLimitingParams: persistenceClient.DynamicRateLimitingParams(serviceConfig.PersistenceDynamicRateLimitingParams),
}
}

func VisibilityManagerProvider(
Expand Down
45 changes: 25 additions & 20 deletions service/history/queue_factory_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,31 +158,36 @@ func getOptionalQueueFactories(
}

func QueueSchedulerRateLimiterProvider(
ownershipBasedQuotaScaler shard.LazyLoadedOwnershipBasedQuotaScaler,
serviceResolver membership.ServiceResolver,
config *configs.Config,
timeSource clock.TimeSource,
) (queues.SchedulerRateLimiter, error) {
return queues.NewPrioritySchedulerRateLimiter(
quotas.ClusterAwareNamespaceSpecificQuotaCalculator{
MemberCounter: serviceResolver,
PerInstanceQuota: config.TaskSchedulerNamespaceMaxQPS,
GlobalQuota: config.TaskSchedulerGlobalNamespaceMaxQPS,
}.GetQuota,
quotas.ClusterAwareQuotaCalculator{
MemberCounter: serviceResolver,
PerInstanceQuota: config.TaskSchedulerMaxQPS,
GlobalQuota: config.TaskSchedulerGlobalMaxQPS,
}.GetQuota,
quotas.ClusterAwareNamespaceSpecificQuotaCalculator{
MemberCounter: serviceResolver,
PerInstanceQuota: config.PersistenceNamespaceMaxQPS,
GlobalQuota: config.PersistenceGlobalNamespaceMaxQPS,
}.GetQuota,
quotas.ClusterAwareQuotaCalculator{
MemberCounter: serviceResolver,
PerInstanceQuota: config.PersistenceMaxQPS,
GlobalQuota: config.PersistenceGlobalMaxQPS,
}.GetQuota,
shard.NewOwnershipAwareNamespaceQuotaCalculator(
ownershipBasedQuotaScaler,
serviceResolver,
config.TaskSchedulerNamespaceMaxQPS,
config.TaskSchedulerGlobalNamespaceMaxQPS,
).GetQuota,
shard.NewOwnershipAwareQuotaCalculator(
ownershipBasedQuotaScaler,
serviceResolver,
config.TaskSchedulerMaxQPS,
config.TaskSchedulerGlobalMaxQPS,
).GetQuota,
shard.NewOwnershipAwareNamespaceQuotaCalculator(
ownershipBasedQuotaScaler,
serviceResolver,
config.PersistenceNamespaceMaxQPS,
config.PersistenceGlobalNamespaceMaxQPS,
).GetQuota,
shard.NewOwnershipAwareQuotaCalculator(
ownershipBasedQuotaScaler,
serviceResolver,
config.PersistenceMaxQPS,
config.PersistenceGlobalMaxQPS,
).GetQuota,
)
}

Expand Down
6 changes: 6 additions & 0 deletions service/history/queue_factory_base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
package history

import (
"sync/atomic"
"testing"

"github.com/golang/mock/gomock"
Expand All @@ -46,6 +47,7 @@ import (
"go.temporal.io/server/common/sdk"
"go.temporal.io/server/service/history/archival"
"go.temporal.io/server/service/history/configs"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/tasks"
"go.temporal.io/server/service/history/workflow"
)
Expand Down Expand Up @@ -156,6 +158,9 @@ func getModuleDependencies(controller *gomock.Controller, c *moduleTestCase) fx.
clusterMetadata.EXPECT().GetCurrentClusterName().Return("module-test-cluster-name").AnyTimes()
serviceResolver := membership.NewMockServiceResolver(controller)
serviceResolver.EXPECT().MemberCount().Return(1).AnyTimes()
lazyLoadedOwnershipBasedQuotaScaler := shard.LazyLoadedOwnershipBasedQuotaScaler{
Value: &atomic.Value{},
}
return fx.Supply(
compileTimeDependencies{},
cfg,
Expand All @@ -164,6 +169,7 @@ func getModuleDependencies(controller *gomock.Controller, c *moduleTestCase) fx.
fx.Annotate(clusterMetadata, fx.As(new(cluster.Metadata))),
fx.Annotate(serviceResolver, fx.As(new(membership.ServiceResolver))),
fx.Annotate(clock.NewEventTimeSource(), fx.As(new(clock.TimeSource))),
lazyLoadedOwnershipBasedQuotaScaler,
)
}

Expand Down
69 changes: 58 additions & 11 deletions service/history/shard/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,25 +25,72 @@
package shard

import (
"go.temporal.io/server/service/history/configs"
"context"
"sync/atomic"

"go.uber.org/fx"

"go.temporal.io/server/common"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/primitives"
"go.temporal.io/server/service/history/configs"
)

var Module = fx.Options(
fx.Provide(
ControllerProvider,
func(impl *ControllerImpl) Controller { return impl },
ContextFactoryProvider,
fx.Annotate(
func(p Controller) common.Pingable { return p },
fx.ResultTags(`group:"deadlockDetectorRoots"`),
),
),
ownershipBasedQuotaScalerModule,
)

var Module = fx.Provide(
ControllerProvider,
func(impl *ControllerImpl) Controller { return impl },
func(impl *ControllerImpl, cfg *configs.Config) (*OwnershipBasedQuotaScaler, error) {
var ownershipBasedQuotaScalerModule = fx.Options(
fx.Provide(func(
impl *ControllerImpl,
cfg *configs.Config,
) (*OwnershipBasedQuotaScalerImpl, error) {
return NewOwnershipBasedQuotaScaler(
impl,
int(cfg.NumberOfShards),
nil,
)
},
ContextFactoryProvider,
fx.Annotate(
func(p Controller) common.Pingable { return p },
fx.ResultTags(`group:"deadlockDetectorRoots"`),
),
}),
fx.Provide(func(
impl *OwnershipBasedQuotaScalerImpl,
) OwnershipBasedQuotaScaler {
return impl
}),
fx.Provide(func() LazyLoadedOwnershipBasedQuotaScaler {
return LazyLoadedOwnershipBasedQuotaScaler{
Value: &atomic.Value{},
}
}),
fx.Invoke(initLazyLoadedOwnershipBasedQuotaScaler),
fx.Invoke(func(
lc fx.Lifecycle,
impl *OwnershipBasedQuotaScalerImpl,
) {
lc.Append(fx.Hook{
OnStop: func(_ context.Context) error {
impl.Close()
return nil
},
})
}),
)

func initLazyLoadedOwnershipBasedQuotaScaler(
serviceName primitives.ServiceName,
logger log.SnTaggedLogger,
ownershipBasedQuotaScaler OwnershipBasedQuotaScaler,
lazyLoadedOwnershipBasedQuotaScaler LazyLoadedOwnershipBasedQuotaScaler,
) {
lazyLoadedOwnershipBasedQuotaScaler.Store(ownershipBasedQuotaScaler)
logger.Info("Initialized lazy loaded OwnershipBasedQuotaScaler", tag.Service(serviceName))
}
101 changes: 101 additions & 0 deletions service/history/shard/ownership_based_quota_calculator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package shard

import (
"go.temporal.io/server/common/quotas"
)

type (
OwnershipAwareQuotaCalculator struct {
quotas.ClusterAwareQuotaCalculator

scaler OwnershipBasedQuotaScaler
}

OwnershipAwareNamespaceQuotaCalculator struct {
quotas.ClusterAwareNamespaceSpecificQuotaCalculator

scaler OwnershipBasedQuotaScaler
}
)

func NewOwnershipAwareQuotaCalculator(
scaler OwnershipBasedQuotaScaler,
memberCounter quotas.MemberCounter,
perInstanceQuota func() int,
globalQuota func() int,
) *OwnershipAwareQuotaCalculator {
return &OwnershipAwareQuotaCalculator{
ClusterAwareQuotaCalculator: quotas.ClusterAwareQuotaCalculator{
MemberCounter: memberCounter,
PerInstanceQuota: perInstanceQuota,
GlobalQuota: globalQuota,
},
scaler: scaler,
}
}

func (c *OwnershipAwareQuotaCalculator) GetQuota() float64 {
if quota, ok := getOwnershipScaledQuota(c.scaler, c.GlobalQuota()); ok {
return quota
}
return c.ClusterAwareQuotaCalculator.GetQuota()
}

func NewOwnershipAwareNamespaceQuotaCalculator(
scaler OwnershipBasedQuotaScaler,
memberCounter quotas.MemberCounter,
perInstanceQuota func(namespace string) int,
globalQuota func(namespace string) int,
) *OwnershipAwareNamespaceQuotaCalculator {
return &OwnershipAwareNamespaceQuotaCalculator{
ClusterAwareNamespaceSpecificQuotaCalculator: quotas.ClusterAwareNamespaceSpecificQuotaCalculator{
MemberCounter: memberCounter,
PerInstanceQuota: perInstanceQuota,
GlobalQuota: globalQuota,
},
scaler: scaler,
}
}

func (c *OwnershipAwareNamespaceQuotaCalculator) GetQuota(namespace string) float64 {
if quota, ok := getOwnershipScaledQuota(c.scaler, c.GlobalQuota(namespace)); ok {
return quota
}
return c.ClusterAwareNamespaceSpecificQuotaCalculator.GetQuota(namespace)
}

func getOwnershipScaledQuota(
scaler OwnershipBasedQuotaScaler,
globalLimit int,
) (float64, bool) {
if globalLimit > 0 && scaler != nil {
if scaleFactor, ok := scaler.ScaleFactor(); ok {
return scaleFactor * float64(globalLimit), true
}
}
return 0, false
}

0 comments on commit d54f4d8

Please sign in to comment.