From 10c731c15bc76e44fc36706e33f0e667ecb5a30e Mon Sep 17 00:00:00 2001
From: rodrigozhou
Date: Thu, 9 May 2024 15:53:15 -0500
Subject: [PATCH] Dynamic config for the outbound queue group limiter settings

---
 common/dynamicconfig/constants.go         | 10 +++++
 service/history/configs/config.go         |  4 ++
 service/history/outbound_queue_factory.go | 51 +++++++++++++++++++----
 3 files changed, 58 insertions(+), 7 deletions(-)

diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go
index 4f6bdc9b53e..6e767a72c81 100644
--- a/common/dynamicconfig/constants.go
+++ b/common/dynamicconfig/constants.go
@@ -1644,6 +1644,16 @@ If value less or equal to 0, will fall back to HistoryPersistenceNamespaceMaxQPS
 		4,
 		`OutboundQueueMaxReaderCount is the max number of readers in one multi-cursor outbound queue`,
 	)
+	OutboundQueueGroupLimiterBufferSize = NewDestinationIntSetting(
+		"history.outboundQueue.groupLimiter.bufferSize",
+		100,
+		`OutboundQueueGroupLimiterBufferSize is the max buffer size of the group limiter`,
+	)
+	OutboundQueueGroupLimiterConcurrency = NewDestinationIntSetting(
+		"history.outboundQueue.groupLimiter.concurrency",
+		100,
+		`OutboundQueueGroupLimiterConcurrency is the concurrency of the group limiter`,
+	)
 
 	VisibilityTaskBatchSize = NewGlobalIntSetting(
 		"history.visibilityTaskBatchSize",
diff --git a/service/history/configs/config.go b/service/history/configs/config.go
index c81ddb952cb..c2a7bd65453 100644
--- a/service/history/configs/config.go
+++ b/service/history/configs/config.go
@@ -164,6 +164,8 @@ type Config struct {
 	OutboundProcessorUpdateAckIntervalJitterCoefficient dynamicconfig.FloatPropertyFn
 	OutboundProcessorPollBackoffInterval                dynamicconfig.DurationPropertyFn
 	OutboundQueueMaxReaderCount                         dynamicconfig.IntPropertyFn
+	OutboundQueueGroupLimiterBufferSize                 dynamicconfig.IntPropertyFnWithDestinationFilter
+	OutboundQueueGroupLimiterConcurrency                dynamicconfig.IntPropertyFnWithDestinationFilter
 
 	// ReplicatorQueueProcessor settings
 	ReplicatorProcessorMaxPollInterval dynamicconfig.DurationPropertyFn
@@ -462,6 +464,8 @@ func NewConfig(
 		OutboundProcessorUpdateAckIntervalJitterCoefficient: dynamicconfig.OutboundProcessorUpdateAckIntervalJitterCoefficient.Get(dc),
 		OutboundProcessorPollBackoffInterval:                dynamicconfig.OutboundProcessorPollBackoffInterval.Get(dc),
 		OutboundQueueMaxReaderCount:                         dynamicconfig.OutboundQueueMaxReaderCount.Get(dc),
+		OutboundQueueGroupLimiterBufferSize:                 dynamicconfig.OutboundQueueGroupLimiterBufferSize.Get(dc),
+		OutboundQueueGroupLimiterConcurrency:                dynamicconfig.OutboundQueueGroupLimiterConcurrency.Get(dc),
 
 		ReplicatorProcessorMaxPollInterval:                  dynamicconfig.ReplicatorProcessorMaxPollInterval.Get(dc),
 		ReplicatorProcessorMaxPollIntervalJitterCoefficient: dynamicconfig.ReplicatorProcessorMaxPollIntervalJitterCoefficient.Get(dc),
diff --git a/service/history/outbound_queue_factory.go b/service/history/outbound_queue_factory.go
index 4495ce64efd..3f40831ea0a 100644
--- a/service/history/outbound_queue_factory.go
+++ b/service/history/outbound_queue_factory.go
@@ -29,9 +29,11 @@ import (
 	"go.uber.org/fx"
 
 	"go.temporal.io/server/common/collection"
+	"go.temporal.io/server/common/dynamicconfig"
 	"go.temporal.io/server/common/log"
 	"go.temporal.io/server/common/log/tag"
 	"go.temporal.io/server/common/metrics"
+	"go.temporal.io/server/common/namespace"
 	"go.temporal.io/server/common/quotas"
 	ctasks "go.temporal.io/server/common/tasks"
 	"go.temporal.io/server/service/history/queues"
@@ -42,25 +44,50 @@
 const outboundQueuePersistenceMaxRPSRatio = 0.3
 
+var (
+	groupLimiterErrors = metrics.NewCounterDef("group_limiter_errors")
+)
+
 type outboundQueueFactoryParams struct {
 	fx.In
 
 	QueueFactoryBaseParams
 }
 
-// TODO: get actual limits from dynamic config.
 type groupLimiter struct {
 	key queues.StateMachineTaskTypeNamespaceIDAndDestination
+
+	namespaceRegistry namespace.Registry
+	metricsHandler    metrics.Handler
+
+	bufferSize  dynamicconfig.IntPropertyFnWithDestinationFilter
+	concurrency dynamicconfig.IntPropertyFnWithDestinationFilter
 }
 
 var _ ctasks.DynamicWorkerPoolLimiter = (*groupLimiter)(nil)
 
-func (groupLimiter) BufferSize() int {
-	return 100
+func (l groupLimiter) BufferSize() int {
+	nsName, err := l.namespaceRegistry.GetNamespaceName(namespace.ID(l.key.NamespaceID))
+	if err != nil {
+		// This is intentionally not failing the function in case of error. The task
+		// scheduler doesn't expect errors to happen, and modifying it to handle errors
+		// would make it unnecessarily complex. Also, in this case, if the namespace
+		// registry fails to get the name, then the task itself will fail when it is
+		// processed and tries to get the namespace name.
+		groupLimiterErrors.With(l.metricsHandler).
+			Record(1, metrics.ReasonTag(metrics.ReasonString(err.Error())))
+	}
+	return l.bufferSize(nsName.String(), l.key.Destination)
 }
 
-func (groupLimiter) Concurrency() int {
-	return 100
+func (l groupLimiter) Concurrency() int {
+	nsName, err := l.namespaceRegistry.GetNamespaceName(namespace.ID(l.key.NamespaceID))
+	if err != nil {
+		// Same reasoning as the comment in BufferSize above.
+		groupLimiterErrors.With(l.metricsHandler).
+			Record(1, metrics.ReasonTag(metrics.ReasonString(err.Error())))
+	}
+	return l.concurrency(nsName.String(), l.key.Destination)
 }
 
 type outboundQueueFactory struct {
@@ -127,7 +154,13 @@ func NewOutboundQueueFactory(params outboundQueueFactoryParams) QueueFactory {
 				SchedulerFactory: func(
 					key queues.StateMachineTaskTypeNamespaceIDAndDestination,
 				) ctasks.RunnableScheduler {
-					return ctasks.NewDynamicWorkerPoolScheduler(groupLimiter{key})
+					return ctasks.NewDynamicWorkerPoolScheduler(groupLimiter{
+						key:               key,
+						namespaceRegistry: params.NamespaceRegistry,
+						metricsHandler:    getOutboundQueueProcessorMetricsHandler(params.MetricsHandler),
+						bufferSize:        params.Config.OutboundQueueGroupLimiterBufferSize,
+						concurrency:       params.Config.OutboundQueueGroupLimiterConcurrency,
+					})
 				},
 			),
@@ -156,7 +189,7 @@ func (f *outboundQueueFactory) CreateQueue(
 	workflowCache wcache.Cache,
 ) queues.Queue {
 	logger := log.With(shardContext.GetLogger(), tag.ComponentOutboundQueue)
-	metricsHandler := f.MetricsHandler.WithTags(metrics.OperationTag(metrics.OperationOutboundQueueProcessorScope))
+	metricsHandler := getOutboundQueueProcessorMetricsHandler(f.MetricsHandler)
 
 	currentClusterName := f.ClusterMetadata.GetCurrentClusterName()
@@ -235,3 +268,7 @@ func (f *outboundQueueFactory) CreateQueue(
 		factory,
 	)
 }
+
+func getOutboundQueueProcessorMetricsHandler(handler metrics.Handler) metrics.Handler {
+	return handler.WithTags(metrics.OperationTag(metrics.OperationOutboundQueueProcessorScope))
+}
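
Editor's note (not part of the patch): the two new settings are destination-scoped dynamic config values, so "history.outboundQueue.groupLimiter.bufferSize" and "history.outboundQueue.groupLimiter.concurrency" can be overridden per namespace and destination at runtime. The group limiter re-reads them on every BufferSize() / Concurrency() call instead of capturing a fixed value when the scheduler is built. The standalone Go sketch below illustrates that lazy-lookup pattern; all names in it (limitFn, groupKey, limiter, the "payments-service" destination) are simplified, hypothetical stand-ins and not Temporal's real APIs.

package main

import "fmt"

// limitFn mirrors the shape of an int setting filtered by namespace and
// destination: the limit is looked up on every call, so a dynamic config
// change takes effect without recreating the scheduler or the limiter.
type limitFn func(namespaceName, destination string) int

// groupKey is a simplified stand-in for the per-group key
// (task type + namespace + destination) used by the outbound queue.
type groupKey struct {
	NamespaceName string
	Destination   string
}

// limiter resolves its buffer size and concurrency lazily, per call,
// in the same spirit as the patched groupLimiter.
type limiter struct {
	key         groupKey
	bufferSize  limitFn
	concurrency limitFn
}

func (l limiter) BufferSize() int  { return l.bufferSize(l.key.NamespaceName, l.key.Destination) }
func (l limiter) Concurrency() int { return l.concurrency(l.key.NamespaceName, l.key.Destination) }

func main() {
	// Pretend dynamic config: one destination gets a higher concurrency,
	// everything else falls back to the default of 100.
	concurrency := func(ns, dest string) int {
		if dest == "payments-service" {
			return 200
		}
		return 100
	}
	bufferSize := func(ns, dest string) int { return 100 }

	l := limiter{
		key:         groupKey{NamespaceName: "default", Destination: "payments-service"},
		bufferSize:  bufferSize,
		concurrency: concurrency,
	}
	fmt.Println(l.BufferSize(), l.Concurrency()) // prints: 100 200
}

Resolving the limits per call is also why the limiter tolerates a failed namespace-name lookup: it records a metric and proceeds with an empty name, and the task itself will surface the error later when it is processed.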