Skip to content

Commit

Permalink
Dynamic config for the outbound queue host scheduler task rps (#5866)
Browse files Browse the repository at this point in the history
## What changed?
<!-- Describe what has changed in this PR -->
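Adds a dynamic config setting, `history.outboundQueue.hostScheduler.maxTaskRPS`, for the outbound queue host scheduler's task RPS, replacing the previously hard-coded value of 100.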

## Why?
<!-- Tell your future self why have you made these changes -->
To make the outbound queue host scheduler's task RPS limit configurable per destination.

## How did you test it?
<!-- How have you verified this change? Tested locally? Added a unit
test? Checked in staging env? -->

## Potential risks
<!-- Assuming the worst case, what can be broken when deploying this
change to production? -->

## Documentation
<!-- Have you made sure this change doesn't falsify anything currently
stated in `docs/`? If significant
new behavior is added, have you described that in `docs/`? -->

## Is hotfix candidate?
<!-- Is this PR a hotfix candidate or does it require a notification to
be sent to the broader community? (Yes/No) -->
  • Loading branch information
rodrigozhou authored May 13, 2024
1 parent 69607fd commit ee3e492
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 7 deletions.
5 changes: 5 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -1654,6 +1654,11 @@ If value less or equal to 0, will fall back to HistoryPersistenceNamespaceMaxQPS
100,
`OutboundQueueGroupLimiterConcurrency is the concurrency of the group limiter`,
)
// OutboundQueueHostSchedulerMaxTaskRPS is registered under the dynamic config
// key "history.outboundQueue.hostScheduler.maxTaskRPS" with the value 100.0.
// NOTE(review): 100.0 is presumably the default, and the "Destination"
// constructor presumably allows per-destination overrides — confirm against
// NewDestinationFloatSetting.
OutboundQueueHostSchedulerMaxTaskRPS = NewDestinationFloatSetting(
"history.outboundQueue.hostScheduler.maxTaskRPS",
100.0,
`OutboundQueueHostSchedulerMaxTaskRPS is the host scheduler max task RPS`,
)

VisibilityTaskBatchSize = NewGlobalIntSetting(
"history.visibilityTaskBatchSize",
Expand Down
2 changes: 2 additions & 0 deletions service/history/configs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ type Config struct {
OutboundQueueMaxReaderCount dynamicconfig.IntPropertyFn
OutboundQueueGroupLimiterBufferSize dynamicconfig.IntPropertyFnWithDestinationFilter
OutboundQueueGroupLimiterConcurrency dynamicconfig.IntPropertyFnWithDestinationFilter
OutboundQueueHostSchedulerMaxTaskRPS dynamicconfig.FloatPropertyFnWithDestinationFilter

// ReplicatorQueueProcessor settings
ReplicatorProcessorMaxPollInterval dynamicconfig.DurationPropertyFn
Expand Down Expand Up @@ -466,6 +467,7 @@ func NewConfig(
OutboundQueueMaxReaderCount: dynamicconfig.OutboundQueueMaxReaderCount.Get(dc),
OutboundQueueGroupLimiterBufferSize: dynamicconfig.OutboundQueueGroupLimiterBufferSize.Get(dc),
OutboundQueueGroupLimiterConcurrency: dynamicconfig.OutboundQueueGroupLimiterConcurrency.Get(dc),
OutboundQueueHostSchedulerMaxTaskRPS: dynamicconfig.OutboundQueueHostSchedulerMaxTaskRPS.Get(dc),

ReplicatorProcessorMaxPollInterval: dynamicconfig.ReplicatorProcessorMaxPollInterval.Get(dc),
ReplicatorProcessorMaxPollIntervalJitterCoefficient: dynamicconfig.ReplicatorProcessorMaxPollIntervalJitterCoefficient.Get(dc),
Expand Down
27 changes: 20 additions & 7 deletions service/history/outbound_queue_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ import (
const outboundQueuePersistenceMaxRPSRatio = 0.3

var (
groupLimiterErrors = metrics.NewCounterDef("group_limiter_errors")
readNamespaceErrors = metrics.NewCounterDef("read_namespace_errors")
)

type outboundQueueFactoryParams struct {
Expand Down Expand Up @@ -74,7 +74,7 @@ func (l groupLimiter) BufferSize() int {
// would make it unnecessarily complex. Also, in this case, if the namespace
// registry fails to get the name, then the task itself will fail when it is
// processed and tries to get the namespace name.
groupLimiterErrors.With(l.metricsHandler).
readNamespaceErrors.With(l.metricsHandler).
Record(1, metrics.ReasonTag(metrics.ReasonString(err.Error())))
}
return l.bufferSize(nsName.String(), l.key.Destination)
Expand All @@ -84,7 +84,7 @@ func (l groupLimiter) Concurrency() int {
nsName, err := l.namespaceRegistry.GetNamespaceName(namespace.ID(l.key.NamespaceID))
if err != nil {
// Ditto comment above.
groupLimiterErrors.With(l.metricsHandler).
readNamespaceErrors.With(l.metricsHandler).
Record(1, metrics.ReasonTag(metrics.ReasonString(err.Error())))
}
return l.concurrency(nsName.String(), l.key.Destination)
Expand All @@ -100,10 +100,23 @@ type outboundQueueFactory struct {
}

func NewOutboundQueueFactory(params outboundQueueFactoryParams) QueueFactory {
metricsHandler := getOutbountQueueProcessorMetricsHandler(params.MetricsHandler)

rateLimiterPool := collection.NewOnceMap(
func(queues.StateMachineTaskTypeNamespaceIDAndDestination) quotas.RateLimiter {
// TODO: get this value from dynamic config.
return quotas.NewDefaultOutgoingRateLimiter(func() float64 { return 100.0 })
func(key queues.StateMachineTaskTypeNamespaceIDAndDestination) quotas.RateLimiter {
return quotas.NewDefaultOutgoingRateLimiter(func() float64 {
nsName, err := params.NamespaceRegistry.GetNamespaceName(namespace.ID(key.NamespaceID))
if err != nil {
// This is intentionally not failing the function in case of error. The task
// scheduler doesn't expect errors to happen, and modifying to handle errors
// would make it unnecessarily complex. Also, in this case, if the namespace
// registry fails to get the name, then the task itself will fail when it is
// processed and tries to get the namespace name.
readNamespaceErrors.With(metricsHandler).
Record(1, metrics.ReasonTag(metrics.ReasonString(err.Error())))
}
return params.Config.OutboundQueueHostSchedulerMaxTaskRPS(nsName.String(), key.Destination)
})
},
)
circuitBreakerPool := collection.NewOnceMap(
Expand Down Expand Up @@ -157,7 +170,7 @@ func NewOutboundQueueFactory(params outboundQueueFactoryParams) QueueFactory {
return ctasks.NewDynamicWorkerPoolScheduler(groupLimiter{
key: key,
namespaceRegistry: params.NamespaceRegistry,
metricsHandler: getOutbountQueueProcessorMetricsHandler(params.MetricsHandler),
metricsHandler: metricsHandler,
bufferSize: params.Config.OutboundQueueGroupLimiterBufferSize,
concurrency: params.Config.OutboundQueueGroupLimiterConcurrency,
})
Expand Down

0 comments on commit ee3e492

Please sign in to comment.