Skip to content

Commit

Permalink
Add dynamic config for per instance visibility API rate limit and bur…
Browse files Browse the repository at this point in the history
…st (#2585)
  • Loading branch information
meiliang86 committed Mar 8, 2022
1 parent 5fb4f1c commit 6fa5b4a
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 39 deletions.
6 changes: 6 additions & 0 deletions common/dynamicconfig/constants.go
Expand Up @@ -152,6 +152,12 @@ const (
FrontendMaxNamespaceBurstPerInstance = "frontend.namespaceBurst"
// FrontendMaxNamespaceCountPerInstance is workflow namespace count limit per second
FrontendMaxNamespaceCountPerInstance = "frontend.namespaceCount"
// FrontendMaxNamespaceVisibilityRPSPerInstance is namespace rate limit per second for visibility APIs.
// This config is EXPERIMENTAL and may be changed or removed in a later release.
FrontendMaxNamespaceVisibilityRPSPerInstance = "frontend.namespaceRPS.visibility"
// FrontendMaxNamespaceVisibilityBurstPerInstance is namespace burst limit for visibility APIs.
// This config is EXPERIMENTAL and may be changed or removed in a later release.
FrontendMaxNamespaceVisibilityBurstPerInstance = "frontend.namespaceBurst.visibility"
// FrontendGlobalNamespaceRPS is workflow namespace rate limit per second for the whole cluster
FrontendGlobalNamespaceRPS = "frontend.globalNamespacerps"
// FrontendThrottledLogRPS is the rate limit on number of log messages emitted per second for throttled logger
Expand Down
2 changes: 1 addition & 1 deletion host/archival_test.go
Expand Up @@ -51,7 +51,7 @@ import (

const (
retryLimit = 20
retryBackoffTime = 200 * time.Millisecond
retryBackoffTime = 500 * time.Millisecond
)

func (s *integrationSuite) TestArchival_TimerQueueProcessor() {
Expand Down
10 changes: 6 additions & 4 deletions service/frontend/configs/quotas.go
Expand Up @@ -141,13 +141,15 @@ func (c *NamespaceRateBurstImpl) Burst() int {
}

func NewRequestToRateLimiter(
rateBurstFn quotas.RateBurst,
executionRateBurstFn quotas.RateBurst,
visibilityRateBurstFn quotas.RateBurst,
otherRateBurstFn quotas.RateBurst,
) quotas.RequestRateLimiter {
mapping := make(map[string]quotas.RequestRateLimiter)

executionRateLimiter := NewExecutionPriorityRateLimiter(rateBurstFn)
visibilityRateLimiter := NewVisibilityPriorityRateLimiter(rateBurstFn)
otherRateLimiter := NewOtherAPIPriorityRateLimiter(rateBurstFn)
executionRateLimiter := NewExecutionPriorityRateLimiter(executionRateBurstFn)
visibilityRateLimiter := NewVisibilityPriorityRateLimiter(visibilityRateBurstFn)
otherRateLimiter := NewOtherAPIPriorityRateLimiter(otherRateBurstFn)

for api := range ExecutionAPIToPriority {
mapping[api] = executionRateLimiter
Expand Down
51 changes: 30 additions & 21 deletions service/frontend/fx.go
Expand Up @@ -234,11 +234,12 @@ func TelemetryInterceptorProvider(
func RateLimitInterceptorProvider(
serviceConfig *Config,
) *interceptor.RateLimitInterceptor {
rateFn := func() float64 { return float64(serviceConfig.RPS()) }
return interceptor.NewRateLimitInterceptor(
configs.NewRequestToRateLimiter(
quotas.NewDefaultIncomingRateLimiter(
func() float64 { return float64(serviceConfig.RPS()) },
),
quotas.NewDefaultIncomingRateLimiter(rateFn),
quotas.NewDefaultIncomingRateLimiter(rateFn),
quotas.NewDefaultIncomingRateLimiter(rateFn),
),
map[string]int{},
)
Expand All @@ -249,25 +250,33 @@ func NamespaceRateLimitInterceptorProvider(
namespaceRegistry namespace.Registry,
frontendServiceResolver membership.ServiceResolver,
) *interceptor.NamespaceRateLimitInterceptor {
return interceptor.NewNamespaceRateLimitInterceptor(
namespaceRegistry,
quotas.NewNamespaceRateLimiter(
func(req quotas.Request) quotas.RequestRateLimiter {
return configs.NewRequestToRateLimiter(configs.NewNamespaceRateBurst(
req.Caller,
func(namespace string) float64 {
return namespaceRPS(
serviceConfig,
frontendServiceResolver,
namespace,
)
},
serviceConfig.MaxNamespaceBurstPerInstance,
))
},
),
map[string]int{},
rateFn := func(namespace string) float64 {
return namespaceRPS(
serviceConfig.MaxNamespaceRPSPerInstance,
serviceConfig.GlobalNamespaceRPS,
frontendServiceResolver,
namespace,
)
}

visibilityRateFn := func(namespace string) float64 {
return namespaceRPS(
serviceConfig.MaxNamespaceVisibilityRPSPerInstance,
serviceConfig.GlobalNamespaceRPS,
frontendServiceResolver,
namespace,
)
}
namespaceRateLimiter := quotas.NewNamespaceRateLimiter(
func(req quotas.Request) quotas.RequestRateLimiter {
return configs.NewRequestToRateLimiter(
configs.NewNamespaceRateBurst(req.Caller, rateFn, serviceConfig.MaxNamespaceBurstPerInstance),
configs.NewNamespaceRateBurst(req.Caller, visibilityRateFn, serviceConfig.MaxNamespaceVisibilityBurstPerInstance),
configs.NewNamespaceRateBurst(req.Caller, rateFn, serviceConfig.MaxNamespaceBurstPerInstance),
)
},
)
return interceptor.NewNamespaceRateLimitInterceptor(namespaceRegistry, namespaceRateLimiter, map[string]int{})
}

func NamespaceCountLimitInterceptorProvider(
Expand Down
31 changes: 18 additions & 13 deletions service/frontend/service.go
Expand Up @@ -67,16 +67,18 @@ type Config struct {
EnableReadFromSecondaryAdvancedVisibility dynamicconfig.BoolPropertyFnWithNamespaceFilter
ESIndexMaxResultWindow dynamicconfig.IntPropertyFn

HistoryMaxPageSize dynamicconfig.IntPropertyFnWithNamespaceFilter
RPS dynamicconfig.IntPropertyFn
MaxNamespaceRPSPerInstance dynamicconfig.IntPropertyFnWithNamespaceFilter
MaxNamespaceBurstPerInstance dynamicconfig.IntPropertyFnWithNamespaceFilter
MaxNamespaceCountPerInstance dynamicconfig.IntPropertyFnWithNamespaceFilter
GlobalNamespaceRPS dynamicconfig.IntPropertyFnWithNamespaceFilter
MaxIDLengthLimit dynamicconfig.IntPropertyFn
EnableClientVersionCheck dynamicconfig.BoolPropertyFn
DisallowQuery dynamicconfig.BoolPropertyFnWithNamespaceFilter
ShutdownDrainDuration dynamicconfig.DurationPropertyFn
HistoryMaxPageSize dynamicconfig.IntPropertyFnWithNamespaceFilter
RPS dynamicconfig.IntPropertyFn
MaxNamespaceRPSPerInstance dynamicconfig.IntPropertyFnWithNamespaceFilter
MaxNamespaceBurstPerInstance dynamicconfig.IntPropertyFnWithNamespaceFilter
MaxNamespaceCountPerInstance dynamicconfig.IntPropertyFnWithNamespaceFilter
MaxNamespaceVisibilityRPSPerInstance dynamicconfig.IntPropertyFnWithNamespaceFilter
MaxNamespaceVisibilityBurstPerInstance dynamicconfig.IntPropertyFnWithNamespaceFilter
GlobalNamespaceRPS dynamicconfig.IntPropertyFnWithNamespaceFilter
MaxIDLengthLimit dynamicconfig.IntPropertyFn
EnableClientVersionCheck dynamicconfig.BoolPropertyFn
DisallowQuery dynamicconfig.BoolPropertyFnWithNamespaceFilter
ShutdownDrainDuration dynamicconfig.DurationPropertyFn

MaxBadBinaries dynamicconfig.IntPropertyFnWithNamespaceFilter

Expand Down Expand Up @@ -153,6 +155,8 @@ func NewConfig(dc *dynamicconfig.Collection, numHistoryShards int32, esIndexName
MaxNamespaceRPSPerInstance: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.FrontendMaxNamespaceRPSPerInstance, 2400),
MaxNamespaceBurstPerInstance: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.FrontendMaxNamespaceBurstPerInstance, 4800),
MaxNamespaceCountPerInstance: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.FrontendMaxNamespaceCountPerInstance, 1200),
MaxNamespaceVisibilityRPSPerInstance: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.FrontendMaxNamespaceVisibilityRPSPerInstance, 10),
MaxNamespaceVisibilityBurstPerInstance: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.FrontendMaxNamespaceVisibilityBurstPerInstance, 10),
GlobalNamespaceRPS: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.FrontendGlobalNamespaceRPS, 0),
MaxIDLengthLimit: dc.GetIntProperty(dynamicconfig.MaxIDLengthLimit, 1000),
MaxBadBinaries: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.FrontendMaxBadBinaries, namespace.MaxBadBinaries),
Expand Down Expand Up @@ -310,12 +314,13 @@ func (s *Service) Stop() {
}

func namespaceRPS(
config *Config,
perInstanceRPSFn dynamicconfig.IntPropertyFnWithNamespaceFilter,
globalRPSFn dynamicconfig.IntPropertyFnWithNamespaceFilter,
frontendResolver membership.ServiceResolver,
namespace string,
) float64 {
hostRPS := float64(config.MaxNamespaceRPSPerInstance(namespace))
globalRPS := float64(config.GlobalNamespaceRPS(namespace))
hostRPS := float64(perInstanceRPSFn(namespace))
globalRPS := float64(globalRPSFn(namespace))
hosts := float64(numFrontendHosts(frontendResolver))

rps := hostRPS + globalRPS*math.Exp((1.0-hosts)/8.0)
Expand Down

0 comments on commit 6fa5b4a

Please sign in to comment.