Skip to content

Commit

Permalink
Dynamic rate limiter (#4390)
Browse files Browse the repository at this point in the history
* moving window average

* remove channel avg impl

* add signal aggregator

* adjust record fn

* add health signal clients

* inject signal aggregator

* fix tests

* add metric emission

* add health request rate limiter

* WIP

* race condition

* WIP

* add health request rate limiter

* bench test rate limiter

* WIP

* Revert "race condition"

This reverts commit e283bfb.

* Revert "add metric emission"

This reverts commit 859950e.

* emit per shard RPS metric

* cleanup

* merge metric and signal clients

* cleanup

* cleanup

* linting

* global dynamic rate limiter

* cleanup

* remove generics

* WIP

* cleanup

* fix deferred metric fn

* fix defer metric fn

* fix clients

* types

* simple dynamic rate limiter test

* linting

* tests

* tests

* acquire lock once

* locks

* array moving average

* Revert "array moving average"

This reverts commit 98f2b66.

* cleanup

* emit per shard RPS

* array average

* feedback

* cleanup

* handle nil health signals

* tests

* uncomment signal collection

* refactor dynamic config properties

* cleanup

* add aggregation feature flag

* fix test

* cleanup

* avoid potential race condition

* feedback

* fix

* remove test
  • Loading branch information
pdoerner committed Jun 2, 2023
1 parent 3bee7f7 commit de7f679
Show file tree
Hide file tree
Showing 22 changed files with 395 additions and 47 deletions.
File renamed without changes.
37 changes: 37 additions & 0 deletions common/aggregate/noop_moving_window_average.go
@@ -0,0 +1,37 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package aggregate

var NoopMovingWindowAverage MovingWindowAverage = newNoopMovingWindowAverage()

type (
noopMovingWindowAverage struct{}
)

func newNoopMovingWindowAverage() *noopMovingWindowAverage { return &noopMovingWindowAverage{} }

func (a *noopMovingWindowAverage) Record(_ int64) {}

func (a *noopMovingWindowAverage) Average() float64 { return 0 }
18 changes: 16 additions & 2 deletions common/dynamicconfig/constants.go
Expand Up @@ -106,8 +106,10 @@ const (
EnableEagerWorkflowStart = "system.enableEagerWorkflowStart"
// NamespaceCacheRefreshInterval is the key for namespace cache refresh interval dynamic config
NamespaceCacheRefreshInterval = "system.namespaceCacheRefreshInterval"
// PersistenceHealthSignalCollectionEnabled determines whether persistence health signal collection/aggregation is enabled
PersistenceHealthSignalCollectionEnabled = "system.persistenceHealthSignalCollectionEnabled"
// PersistenceHealthSignalMetricsEnabled determines whether persistence shard RPS metrics are emitted
PersistenceHealthSignalMetricsEnabled = "system.persistenceHealthSignalMetricsEnabled"
// PersistenceHealthSignalAggregationEnabled determines whether persistence latency and error averages are tracked
PersistenceHealthSignalAggregationEnabled = "system.persistenceHealthSignalAggregationEnabled"
// PersistenceHealthSignalWindowSize is the time window size in seconds for aggregating persistence signals
PersistenceHealthSignalWindowSize = "system.persistenceHealthSignalWindowSize"
// PersistenceHealthSignalBufferSize is the maximum number of persistence signals to buffer in memory per signal key
Expand Down Expand Up @@ -206,6 +208,9 @@ const (
FrontendPersistenceNamespaceMaxQPS = "frontend.persistenceNamespaceMaxQPS"
// FrontendEnablePersistencePriorityRateLimiting indicates if priority rate limiting is enabled in frontend persistence client
FrontendEnablePersistencePriorityRateLimiting = "frontend.enablePersistencePriorityRateLimiting"
// FrontendPersistenceDynamicRateLimitingParams is a map that contains all adjustable dynamic rate limiting params
// see DefaultDynamicRateLimitingParams for available options and defaults
FrontendPersistenceDynamicRateLimitingParams = "frontend.persistenceDynamicRateLimitingParams"
// FrontendVisibilityMaxPageSize is default max size for ListWorkflowExecutions in one page
FrontendVisibilityMaxPageSize = "frontend.visibilityMaxPageSize"
// FrontendHistoryMaxPageSize is default max size for GetWorkflowExecutionHistory in one page
Expand Down Expand Up @@ -349,6 +354,9 @@ const (
MatchingPersistenceNamespaceMaxQPS = "matching.persistenceNamespaceMaxQPS"
// MatchingEnablePersistencePriorityRateLimiting indicates if priority rate limiting is enabled in matching persistence client
MatchingEnablePersistencePriorityRateLimiting = "matching.enablePersistencePriorityRateLimiting"
// MatchingPersistenceDynamicRateLimitingParams is a map that contains all adjustable dynamic rate limiting params
// see DefaultDynamicRateLimitingParams for available options and defaults
MatchingPersistenceDynamicRateLimitingParams = "matching.persistenceDynamicRateLimitingParams"
// MatchingMinTaskThrottlingBurstSize is the minimum burst size for task queue throttling
MatchingMinTaskThrottlingBurstSize = "matching.minTaskThrottlingBurstSize"
// MatchingGetTasksBatchSize is the maximum batch size to fetch from the task buffer
Expand Down Expand Up @@ -413,6 +421,9 @@ const (
HistoryPersistencePerShardNamespaceMaxQPS = "history.persistencePerShardNamespaceMaxQPS"
// HistoryEnablePersistencePriorityRateLimiting indicates if priority rate limiting is enabled in history persistence client
HistoryEnablePersistencePriorityRateLimiting = "history.enablePersistencePriorityRateLimiting"
// HistoryPersistenceDynamicRateLimitingParams is a map that contains all adjustable dynamic rate limiting params
// see DefaultDynamicRateLimitingParams for available options and defaults
HistoryPersistenceDynamicRateLimitingParams = "history.persistenceDynamicRateLimitingParams"
// HistoryLongPollExpirationInterval is the long poll expiration interval in the history service
HistoryLongPollExpirationInterval = "history.longPollExpirationInterval"
// HistoryCacheInitialSize is initial size of history cache
Expand Down Expand Up @@ -748,6 +759,9 @@ const (
WorkerPersistenceNamespaceMaxQPS = "worker.persistenceNamespaceMaxQPS"
// WorkerEnablePersistencePriorityRateLimiting indicates if priority rate limiting is enabled in worker persistence client
WorkerEnablePersistencePriorityRateLimiting = "worker.enablePersistencePriorityRateLimiting"
// WorkerPersistenceDynamicRateLimitingParams is a map that contains all adjustable dynamic rate limiting params
// see DefaultDynamicRateLimitingParams for available options and defaults
WorkerPersistenceDynamicRateLimitingParams = "worker.persistenceDynamicRateLimitingParams"
// WorkerIndexerConcurrency is the max concurrent messages to be processed at any given time
WorkerIndexerConcurrency = "worker.indexerConcurrency"
// WorkerESProcessorNumOfWorkers is num of workers for esProcessor
Expand Down
34 changes: 33 additions & 1 deletion common/dynamicconfig/shared_constants.go
Expand Up @@ -45,4 +45,36 @@ var defaultNumTaskQueuePartitions = []ConstrainedValue{
},
}

var DefaultPerShardNamespaceRPSMax = func(namespace string) int { return 0 }
var DefaultPerShardNamespaceRPSMax = GetIntPropertyFilteredByNamespace(0)

const (
// dynamic config map keys and defaults for client.DynamicRateLimitingParams for controlling dynamic rate limiting options
// dynamicRateLimitEnabledKey toggles whether dynamic rate limiting is enabled
dynamicRateLimitEnabledKey = "enabled"
dynamicRateLimitEnabledDefault = false
// dynamicRateLimitRefreshIntervalKey is how often the rate limit and dynamic properties are refreshed. should be a string timestamp e.g. 10s
// even if the rate limiter is disabled, this property will still determine how often the dynamic config is reevaluated
dynamicRateLimitRefreshIntervalKey = "refreshInterval"
dynamicRateLimitRefreshIntervalDefault = "10s"
// dynamicRateLimitLatencyThresholdKey is the maximum average latency in ms before the rate limiter should backoff
dynamicRateLimitLatencyThresholdKey = "latencyThreshold"
dynamicRateLimitLatencyThresholdDefault = 0.0 // will not do backoff based on latency
// dynamicRateLimitErrorThresholdKey is the maximum ratio of errors:total_requests before the rate limiter should backoff. should be between 0 and 1
dynamicRateLimitErrorThresholdKey = "errorThreshold"
dynamicRateLimitErrorThresholdDefault = 0.0 // will not do backoff based on errors
// dynamicRateLimitBackoffStepSizeKey is the amount the rate limit multiplier is reduced when backing off. should be between 0 and 1
dynamicRateLimitBackoffStepSizeKey = "rateBackoffStepSize"
dynamicRateLimitBackoffStepSizeDefault = 0.3
// dynamicRateLimitIncreaseStepSizeKey the amount the rate limit multiplier is increased when the system is healthy. should be between 0 and 1
dynamicRateLimitIncreaseStepSizeKey = "rateIncreaseStepSize"
dynamicRateLimitIncreaseStepSizeDefault = 0.1
)

var DefaultDynamicRateLimitingParams = map[string]interface{}{
dynamicRateLimitEnabledKey: dynamicRateLimitEnabledDefault,
dynamicRateLimitRefreshIntervalKey: dynamicRateLimitRefreshIntervalDefault,
dynamicRateLimitLatencyThresholdKey: dynamicRateLimitLatencyThresholdDefault,
dynamicRateLimitErrorThresholdKey: dynamicRateLimitErrorThresholdDefault,
dynamicRateLimitBackoffStepSizeKey: dynamicRateLimitBackoffStepSizeDefault,
dynamicRateLimitIncreaseStepSizeKey: dynamicRateLimitIncreaseStepSizeDefault,
}
16 changes: 12 additions & 4 deletions common/persistence/client/fx.go
Expand Up @@ -45,7 +45,10 @@ type (
PersistenceNamespaceMaxQps dynamicconfig.IntPropertyFnWithNamespaceFilter
PersistencePerShardNamespaceMaxQPS dynamicconfig.IntPropertyFnWithNamespaceFilter
EnablePriorityRateLimiting dynamicconfig.BoolPropertyFn
ClusterName string

DynamicRateLimitingParams dynamicconfig.MapPropertyFn

ClusterName string

NewFactoryParams struct {
fx.In
Expand All @@ -61,6 +64,7 @@ type (
MetricsHandler metrics.Handler
Logger log.Logger
HealthSignals persistence.HealthSignalAggregator
DynamicRateLimitingParams DynamicRateLimitingParams
}

FactoryProviderFn func(NewFactoryParams) Factory
Expand Down Expand Up @@ -88,6 +92,9 @@ func FactoryProvider(
params.PersistenceMaxQPS,
params.PersistencePerShardNamespaceMaxQPS,
RequestPriorityFn,
params.HealthSignals,
params.DynamicRateLimitingParams,
params.Logger,
)
} else {
requestRatelimiter = NewNoopPriorityRateLimiter(params.PersistenceMaxQPS)
Expand All @@ -111,10 +118,11 @@ func HealthSignalAggregatorProvider(
metricsHandler metrics.Handler,
logger log.Logger,
) persistence.HealthSignalAggregator {
if dynamicCollection.GetBoolProperty(dynamicconfig.PersistenceHealthSignalCollectionEnabled, true)() {
if dynamicCollection.GetBoolProperty(dynamicconfig.PersistenceHealthSignalMetricsEnabled, true)() {
return persistence.NewHealthSignalAggregatorImpl(
dynamicCollection.GetDurationProperty(dynamicconfig.PersistenceHealthSignalWindowSize, 3*time.Second)(),
dynamicCollection.GetIntProperty(dynamicconfig.PersistenceHealthSignalBufferSize, 500)(),
dynamicCollection.GetBoolProperty(dynamicconfig.PersistenceHealthSignalAggregationEnabled, true)(),
dynamicCollection.GetDurationProperty(dynamicconfig.PersistenceHealthSignalWindowSize, 10*time.Second)(),
dynamicCollection.GetIntProperty(dynamicconfig.PersistenceHealthSignalBufferSize, 5000)(),
metricsHandler,
dynamicCollection.GetIntProperty(dynamicconfig.ShardRPSWarnLimit, 50),
logger,
Expand Down
199 changes: 199 additions & 0 deletions common/persistence/client/health_request_rate_limiter.go
@@ -0,0 +1,199 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package client

import (
"context"
"encoding/json"
"math"
"sync/atomic"
"time"

"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/primitives/timestamp"
"go.temporal.io/server/common/quotas"
)

const (
DefaultRefreshInterval = 10 * time.Second
DefaultRateBurstRatio = 1.0
DefaultMinRateMultiplier = 0.1
DefaultMaxRateMultiplier = 1.0
)

type (
HealthRequestRateLimiterImpl struct {
enabled *atomic.Bool
params DynamicRateLimitingParams // dynamic config map
curOptions dynamicRateLimitingOptions // current dynamic config values (updated on refresh)

rateLimiter *quotas.RateLimiterImpl
healthSignals persistence.HealthSignalAggregator

refreshTimer *time.Ticker

rateFn quotas.RateFn
rateToBurstRatio float64

minRateMultiplier float64
maxRateMultiplier float64
curRateMultiplier float64

logger log.Logger
}

dynamicRateLimitingOptions struct {
Enabled bool

RefreshInterval string // string returned by json.Unmarshal will be parsed into a duration

// thresholds which should trigger backoff if exceeded
LatencyThreshold float64
ErrorThreshold float64

// if either threshold is exceeded, the current rate multiplier will be reduced by this amount
RateBackoffStepSize float64
// when the system is healthy and current rate < max rate, the current rate multiplier will be
// increased by this amount
RateIncreaseStepSize float64
}
)

var _ quotas.RequestRateLimiter = (*HealthRequestRateLimiterImpl)(nil)

func NewHealthRequestRateLimiterImpl(
healthSignals persistence.HealthSignalAggregator,
rateFn quotas.RateFn,
params DynamicRateLimitingParams,
logger log.Logger,
) *HealthRequestRateLimiterImpl {
limiter := &HealthRequestRateLimiterImpl{
enabled: &atomic.Bool{},
rateLimiter: quotas.NewRateLimiter(rateFn(), int(DefaultRateBurstRatio*rateFn())),
healthSignals: healthSignals,
rateFn: rateFn,
params: params,
refreshTimer: time.NewTicker(DefaultRefreshInterval),
rateToBurstRatio: DefaultRateBurstRatio,
minRateMultiplier: DefaultMinRateMultiplier,
maxRateMultiplier: DefaultMaxRateMultiplier,
curRateMultiplier: DefaultMaxRateMultiplier,
logger: logger,
}
limiter.refreshDynamicParams()
return limiter
}

func (rl *HealthRequestRateLimiterImpl) Allow(now time.Time, request quotas.Request) bool {
rl.maybeRefresh()
if !rl.enabled.Load() {
return true
}
return rl.rateLimiter.AllowN(now, request.Token)
}

func (rl *HealthRequestRateLimiterImpl) Reserve(now time.Time, request quotas.Request) quotas.Reservation {
rl.maybeRefresh()
if !rl.enabled.Load() {
return quotas.NoopReservation
}
return rl.rateLimiter.ReserveN(now, request.Token)
}

func (rl *HealthRequestRateLimiterImpl) Wait(ctx context.Context, request quotas.Request) error {
rl.maybeRefresh()
if !rl.enabled.Load() {
return nil
}
return rl.rateLimiter.WaitN(ctx, request.Token)
}

func (rl *HealthRequestRateLimiterImpl) maybeRefresh() {
select {
case <-rl.refreshTimer.C:
rl.refreshDynamicParams()
if rl.enabled.Load() {
rl.refreshRate()
}
rl.updateRefreshTimer()

default:
// no-op
}
}

func (rl *HealthRequestRateLimiterImpl) refreshRate() {
if rl.latencyThresholdExceeded() || rl.errorThresholdExceeded() {
// limit exceeded, do backoff
rl.curRateMultiplier = math.Max(rl.minRateMultiplier, rl.curRateMultiplier-rl.curOptions.RateBackoffStepSize)
rl.rateLimiter.SetRate(rl.curRateMultiplier * rl.rateFn())
rl.rateLimiter.SetBurst(int(rl.rateToBurstRatio * rl.rateFn()))
} else if rl.curRateMultiplier < rl.maxRateMultiplier {
// already doing backoff and under thresholds, increase limit
rl.curRateMultiplier = math.Min(rl.maxRateMultiplier, rl.curRateMultiplier+rl.curOptions.RateIncreaseStepSize)
rl.rateLimiter.SetRate(rl.curRateMultiplier * rl.rateFn())
rl.rateLimiter.SetBurst(int(rl.rateToBurstRatio * rl.rateFn()))
}
}

func (rl *HealthRequestRateLimiterImpl) refreshDynamicParams() {
var options dynamicRateLimitingOptions
b, err := json.Marshal(rl.params())
if err != nil {
rl.logger.Warn("Error marshalling dynamic rate limiting params. Dynamic rate limiting is disabled.", tag.Error(err))
rl.enabled.Store(false)
return
}

err = json.Unmarshal(b, &options)
if err != nil {
rl.logger.Warn("Error unmarshalling dynamic rate limiting params. Dynamic rate limiting is disabled.", tag.Error(err))
rl.enabled.Store(false)
return
}

rl.enabled.Store(options.Enabled)
rl.curOptions = options
}

func (rl *HealthRequestRateLimiterImpl) updateRefreshTimer() {
if len(rl.curOptions.RefreshInterval) > 0 {
if refreshDuration, err := timestamp.ParseDurationDefaultSeconds(rl.curOptions.RefreshInterval); err != nil {
rl.logger.Warn("Error parsing dynamic rate limit refreshInterval timestamp. Using previous value.", tag.Error(err))
} else {
rl.refreshTimer.Reset(refreshDuration)
}
}
}

func (rl *HealthRequestRateLimiterImpl) latencyThresholdExceeded() bool {
return rl.curOptions.LatencyThreshold > 0 && rl.healthSignals.AverageLatency() > rl.curOptions.LatencyThreshold
}

func (rl *HealthRequestRateLimiterImpl) errorThresholdExceeded() bool {
return rl.curOptions.ErrorThreshold > 0 && rl.healthSignals.ErrorRatio() > rl.curOptions.ErrorThreshold
}

0 comments on commit de7f679

Please sign in to comment.