Skip to content

Commit

Permalink
Task priority assigner implementation (#2740)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt committed Apr 20, 2022
1 parent ddf8abe commit dd4a427
Show file tree
Hide file tree
Showing 8 changed files with 336 additions and 9 deletions.
6 changes: 6 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,8 @@ const (
// StandbyTaskMissingEventsDiscardDelay is the amount of time standby cluster's will wait (if events are missing)
// before discarding the task
StandbyTaskMissingEventsDiscardDelay = "history.standbyTaskMissingEventsDiscardDelay"
// TimerTaskHighPriorityRPS is the per namespace rps limit for processing timer tasks as high priority
TimerTaskHighPriorityRPS = "history.timerTaskHighPriorityRPS"
// TimerTaskBatchSize is batch size for timer processor to process tasks
TimerTaskBatchSize = "history.timerTaskBatchSize"
// TimerTaskWorkerCount is number of task workers for timer processor
Expand Down Expand Up @@ -341,6 +343,8 @@ const (
TimerProcessorHistoryArchivalSizeLimit = "history.timerProcessorHistoryArchivalSizeLimit"
// TimerProcessorArchivalTimeLimit is the upper time limit for inline history archival
TimerProcessorArchivalTimeLimit = "history.timerProcessorArchivalTimeLimit"
// TransferTaskHighPriorityRPS is the per namespace rps limit for processing timer tasks as high priority
TransferTaskHighPriorityRPS = "history.transferTaskHighPriorityRPS"
// TransferTaskBatchSize is batch size for transferQueueProcessor
TransferTaskBatchSize = "history.transferTaskBatchSize"
// TransferProcessorFailoverMaxPollRPS is max poll rate per second for transferQueueProcessor
Expand Down Expand Up @@ -376,6 +380,8 @@ const (
// TransferProcessorVisibilityArchivalTimeLimit is the upper time limit for archiving visibility records
TransferProcessorVisibilityArchivalTimeLimit = "history.transferProcessorVisibilityArchivalTimeLimit"

// VisibilityTaskHighPriorityRPS is the per namespace rps limit for processing timer tasks as high priority
VisibilityTaskHighPriorityRPS = "history.visibilityTaskHighPriorityRPS"
// VisibilityTaskBatchSize is batch size for visibilityQueueProcessor
VisibilityTaskBatchSize = "history.visibilityTaskBatchSize"
// VisibilityProcessorFailoverMaxPollRPS is max poll rate per second for visibilityQueueProcessor
Expand Down
9 changes: 3 additions & 6 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ const (
StatsTypeTagName = "stats_type"
CacheTypeTagName = "cache_type"
FailureTagName = "failure"
TaskCategoryTagName = "task_category"
TaskTypeTagName = "task_type"
QueueTypeTagName = "queue_type"
visibilityTypeTagName = "visibility_type"
Expand Down Expand Up @@ -1934,12 +1935,9 @@ const (
TaskNoUserQueueLatency
TaskRedispatchQueuePendingTasksTimer
TaskScheduleToStartLatency

TaskThrottledCounter
TransferTaskMissingEventCounter

TransferTaskThrottledCounter
TimerTaskThrottledCounter

ActivityE2ELatency
AckLevelUpdateCounter
AckLevelUpdateFailedCounter
Expand Down Expand Up @@ -2421,8 +2419,7 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
TransferTaskMissingEventCounter: NewCounterDef("transfer_task_missing_event_counter"),
TaskBatchCompleteCounter: NewCounterDef("task_batch_complete_counter"),
TaskRedispatchQueuePendingTasksTimer: NewTimerDef("task_redispatch_queue_pending_tasks"),
TransferTaskThrottledCounter: NewCounterDef("transfer_task_throttled_counter"),
TimerTaskThrottledCounter: NewCounterDef("timer_task_throttled_counter"),
TaskThrottledCounter: NewCounterDef("task_throttled_counter"),
ActivityE2ELatency: NewTimerDef("activity_end_to_end_latency"),
AckLevelUpdateCounter: NewCounterDef("ack_level_update"),
AckLevelUpdateFailedCounter: NewCounterDef("ack_level_update_failed"),
Expand Down
7 changes: 7 additions & 0 deletions common/metrics/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,13 @@ func FailureTag(value string) Tag {
return &tagImpl{key: FailureTagName, value: value}
}

func TaskCategoryTag(value string) Tag {
if len(value) == 0 {
value = unknownValue
}
return &tagImpl{key: TaskCategoryTagName, value: value}
}

func TaskTypeTag(value string) Tag {
if len(value) == 0 {
value = unknownValue
Expand Down
6 changes: 6 additions & 0 deletions service/history/configs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ type Config struct {
StandbyTaskMissingEventsDiscardDelay dynamicconfig.DurationPropertyFn

// TimerQueueProcessor settings
TimerTaskHighPriorityRPS dynamicconfig.IntPropertyFnWithNamespaceFilter
TimerTaskBatchSize dynamicconfig.IntPropertyFn
TimerTaskWorkerCount dynamicconfig.IntPropertyFn
TimerTaskMaxRetryCount dynamicconfig.IntPropertyFn
Expand All @@ -101,6 +102,7 @@ type Config struct {
TimerProcessorArchivalTimeLimit dynamicconfig.DurationPropertyFn

// TransferQueueProcessor settings
TransferTaskHighPriorityRPS dynamicconfig.IntPropertyFnWithNamespaceFilter
TransferTaskBatchSize dynamicconfig.IntPropertyFn
TransferTaskWorkerCount dynamicconfig.IntPropertyFn
TransferTaskMaxRetryCount dynamicconfig.IntPropertyFn
Expand Down Expand Up @@ -223,6 +225,7 @@ type Config struct {

// ===== Visibility related =====
// VisibilityQueueProcessor settings
VisibilityTaskHighPriorityRPS dynamicconfig.IntPropertyFnWithNamespaceFilter
VisibilityTaskBatchSize dynamicconfig.IntPropertyFn
VisibilityTaskWorkerCount dynamicconfig.IntPropertyFn
VisibilityTaskMaxRetryCount dynamicconfig.IntPropertyFn
Expand Down Expand Up @@ -293,6 +296,7 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVis
StandbyTaskMissingEventsResendDelay: dc.GetDurationProperty(dynamicconfig.StandbyTaskMissingEventsResendDelay, 10*time.Minute),
StandbyTaskMissingEventsDiscardDelay: dc.GetDurationProperty(dynamicconfig.StandbyTaskMissingEventsDiscardDelay, 15*time.Minute),

TimerTaskHighPriorityRPS: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.TimerTaskHighPriorityRPS, 500),
TimerTaskBatchSize: dc.GetIntProperty(dynamicconfig.TimerTaskBatchSize, 100),
TimerTaskWorkerCount: dc.GetIntProperty(dynamicconfig.TimerTaskWorkerCount, 10),
TimerTaskMaxRetryCount: dc.GetIntProperty(dynamicconfig.TimerTaskMaxRetryCount, 100),
Expand All @@ -312,6 +316,7 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVis
TimerProcessorHistoryArchivalSizeLimit: dc.GetIntProperty(dynamicconfig.TimerProcessorHistoryArchivalSizeLimit, 500*1024),
TimerProcessorArchivalTimeLimit: dc.GetDurationProperty(dynamicconfig.TimerProcessorArchivalTimeLimit, 1*time.Second),

TransferTaskHighPriorityRPS: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.TransferTaskHighPriorityRPS, 500),
TransferTaskBatchSize: dc.GetIntProperty(dynamicconfig.TransferTaskBatchSize, 100),
TransferProcessorFailoverMaxPollRPS: dc.GetIntProperty(dynamicconfig.TransferProcessorFailoverMaxPollRPS, 1),
TransferProcessorMaxPollRPS: dc.GetIntProperty(dynamicconfig.TransferProcessorMaxPollRPS, 20),
Expand Down Expand Up @@ -405,6 +410,7 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVis
SkipReapplicationByNamespaceID: dc.GetBoolPropertyFnWithNamespaceIDFilter(dynamicconfig.SkipReapplicationByNamespaceID, false),

// ===== Visibility related =====
VisibilityTaskHighPriorityRPS: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.VisibilityTaskHighPriorityRPS, 500),
VisibilityTaskBatchSize: dc.GetIntProperty(dynamicconfig.VisibilityTaskBatchSize, 100),
VisibilityProcessorFailoverMaxPollRPS: dc.GetIntProperty(dynamicconfig.VisibilityProcessorFailoverMaxPollRPS, 1),
VisibilityProcessorMaxPollRPS: dc.GetIntProperty(dynamicconfig.VisibilityProcessorMaxPollRPS, 20),
Expand Down
12 changes: 9 additions & 3 deletions service/history/configs/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,16 @@ const (
TaskLowPrioritySubclass
)

var (
TaskPriorityHigh = GetTaskPriority(TaskHighPriorityClass, TaskDefaultPrioritySubclass)
TaskPriorityDefault = GetTaskPriority(TaskDefaultPriorityClass, TaskDefaultPrioritySubclass)
TaskPriorityLow = GetTaskPriority(TaskLowPriorityClass, TaskDefaultPrioritySubclass)
)

var DefaultTaskPriorityWeight = map[int]int{
GetTaskPriority(TaskHighPriorityClass, TaskDefaultPrioritySubclass): 200,
GetTaskPriority(TaskDefaultPriorityClass, TaskDefaultPrioritySubclass): 100,
GetTaskPriority(TaskLowPriorityClass, TaskDefaultPrioritySubclass): 50,
TaskPriorityHigh: 200,
TaskPriorityDefault: 100,
TaskPriorityLow: 50,
}

func ConvertWeightsToDynamicConfigValue(
Expand Down
137 changes: 137 additions & 0 deletions service/history/queues/priority_assigner.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,146 @@

package queues

import (
"sync"

enumsspb "go.temporal.io/server/api/enums/v1"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/quotas"
"go.temporal.io/server/service/history/configs"
)

type (
// PriorityAssigner assigns priority to task executables
PriorityAssigner interface {
Assign(Executable) error
}

PriorityAssignerOptions struct {
HighPriorityRPS dynamicconfig.IntPropertyFnWithNamespaceFilter
CriticalRetryAttempts dynamicconfig.IntPropertyFn
}

priorityAssignerImpl struct {
currentClusterName string
namespaceRegistry namespace.Registry
scope metrics.Scope
options PriorityAssignerOptions

sync.RWMutex
rateLimiters map[string]quotas.RateLimiter
}
)

func NewPriorityAssigner(
currentClusterName string,
namespaceRegistry namespace.Registry,
options PriorityAssignerOptions,
metricsClient metrics.Client,
) PriorityAssigner {
return &priorityAssignerImpl{
currentClusterName: currentClusterName,
namespaceRegistry: namespaceRegistry,
scope: metricsClient.Scope(metrics.TaskPriorityAssignerScope),
options: options,
rateLimiters: make(map[string]quotas.RateLimiter),
}
}

func (a *priorityAssignerImpl) Assign(executable Executable) error {

/*
Summary:
- High priority: active tasks from active queue processor and no-op tasks (currently ignoring overrides)
- Default priority: throttled tasks and selected task types (e.g. delete history events)
- Low priority: standby tasks and tasks keep retrying
Only candidates for high priority will consume the token in the rate limiter for high priority tasks and
potentially be throttled.
*/

if executable.Attempt() > a.options.CriticalRetryAttempts() {
executable.SetPriority(configs.TaskPriorityLow)
return nil
}

namespaceEntry, err := a.namespaceRegistry.GetNamespaceByID(namespace.ID(executable.GetNamespaceID()))
if err != nil {
return err
}

namespaceName := namespaceEntry.Name().String()
namespaceActive := namespaceEntry.ActiveInCluster(a.currentClusterName)
// TODO: remove QueueType() and the special logic for assgining high priority to no-op tasks
// after merging active/standby queue processor or performing task filtering before submitting
// tasks to worker pool
taskActive := executable.QueueType() != QueueTypeStandbyTransfer &&
executable.QueueType() != QueueTypeStandbyTimer

if !taskActive && !namespaceActive {
// standby tasks
executable.SetPriority(configs.TaskPriorityLow)
return nil
}

if (taskActive && !namespaceActive) || (!taskActive && namespaceActive) {
// no-op tasks, set to high priority to ack them as soon as possible
// don't consume rps limit
// ignoring overrides for some no-op standby tasks for now
executable.SetPriority(configs.TaskPriorityHigh)
return nil
}

// active tasks for active namespaces
switch executable.GetType() {
case enumsspb.TASK_TYPE_DELETE_HISTORY_EVENT,
enumsspb.TASK_TYPE_TRANSFER_DELETE_EXECUTION,
enumsspb.TASK_TYPE_VISIBILITY_DELETE_EXECUTION:
// add more task types here if we believe it's ok to delay those tasks
// and assign them the same priority as throttled tasks
executable.SetPriority(configs.TaskPriorityDefault)
return nil
}

ratelimiter := a.getOrCreateRateLimiter(executable.GetNamespaceID())
if !ratelimiter.Allow() {
executable.SetPriority(configs.TaskPriorityDefault)

category := executable.GetCategory()
a.scope.Tagged(
metrics.NamespaceTag(namespaceName),
metrics.TaskCategoryTag(category.Name()),
).IncCounter(metrics.TaskThrottledCounter)
return nil
}

executable.SetPriority(configs.TaskPriorityHigh)
return nil
}

func (a *priorityAssignerImpl) getOrCreateRateLimiter(
namespaceName string,
) quotas.RateLimiter {
a.RLock()
rateLimiter, ok := a.rateLimiters[namespaceName]
a.RUnlock()
if ok {
return rateLimiter
}

newRateLimiter := quotas.NewDefaultIncomingRateLimiter(
func() float64 { return float64(a.options.HighPriorityRPS(namespaceName)) },
)

a.Lock()
defer a.Unlock()

rateLimiter, ok = a.rateLimiters[namespaceName]
if ok {
return rateLimiter
}
a.rateLimiters[namespaceName] = newRateLimiter
return newRateLimiter
}

0 comments on commit dd4a427

Please sign in to comment.