Skip to content

Commit

Permalink
Tune host level task processing performance (#2955)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt committed Jun 6, 2022
1 parent e5b4e14 commit 96a0bcd
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 12 deletions.
18 changes: 9 additions & 9 deletions service/history/configs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,10 +316,10 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVis
TimerTaskHighPriorityRPS: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.TimerTaskHighPriorityRPS, 500),
TimerTaskBatchSize: dc.GetIntProperty(dynamicconfig.TimerTaskBatchSize, 100),
TimerTaskWorkerCount: dc.GetIntProperty(dynamicconfig.TimerTaskWorkerCount, 10),
TimerTaskMaxRetryCount: dc.GetIntProperty(dynamicconfig.TimerTaskMaxRetryCount, 100),
TimerTaskMaxRetryCount: dc.GetIntProperty(dynamicconfig.TimerTaskMaxRetryCount, 20),
TimerProcessorEnableSingleCursor: dc.GetBoolProperty(dynamicconfig.TimerProcessorEnableSingleCursor, false),
TimerProcessorEnablePriorityTaskScheduler: dc.GetBoolProperty(dynamicconfig.TimerProcessorEnablePriorityTaskScheduler, false),
TimerProcessorSchedulerWorkerCount: dc.GetIntProperty(dynamicconfig.TimerProcessorSchedulerWorkerCount, 100),
TimerProcessorSchedulerWorkerCount: dc.GetIntProperty(dynamicconfig.TimerProcessorSchedulerWorkerCount, 200),
TimerProcessorSchedulerQueueSize: dc.GetIntProperty(dynamicconfig.TimerProcessorSchedulerQueueSize, 10),
TimerProcessorSchedulerRoundRobinWeights: dc.GetMapProperty(dynamicconfig.TimerProcessorSchedulerRoundRobinWeights, ConvertWeightsToDynamicConfigValue(DefaultTaskPriorityWeight)),
TimerProcessorCompleteTimerFailureRetryCount: dc.GetIntProperty(dynamicconfig.TimerProcessorCompleteTimerFailureRetryCount, 10),
Expand All @@ -331,7 +331,7 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVis
TimerProcessorMaxPollHostRPS: dc.GetIntProperty(dynamicconfig.TimerProcessorMaxPollHostRPS, 0),
TimerProcessorMaxPollInterval: dc.GetDurationProperty(dynamicconfig.TimerProcessorMaxPollInterval, 5*time.Minute),
TimerProcessorMaxPollIntervalJitterCoefficient: dc.GetFloat64Property(dynamicconfig.TimerProcessorMaxPollIntervalJitterCoefficient, 0.15),
TimerProcessorRescheduleInterval: dc.GetDurationProperty(dynamicconfig.TimerProcessorRescheduleInterval, 5*time.Second),
TimerProcessorRescheduleInterval: dc.GetDurationProperty(dynamicconfig.TimerProcessorRescheduleInterval, 3*time.Second),
TimerProcessorRescheduleIntervalJitterCoefficient: dc.GetFloat64Property(dynamicconfig.TimerProcessorRescheduleIntervalJitterCoefficient, 0.15),
TimerProcessorMaxReschedulerSize: dc.GetIntProperty(dynamicconfig.TimerProcessorMaxReschedulerSize, 10000),
TimerProcessorPollBackoffInterval: dc.GetDurationProperty(dynamicconfig.TimerProcessorPollBackoffInterval, 5*time.Second),
Expand All @@ -342,10 +342,10 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVis
TransferTaskHighPriorityRPS: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.TransferTaskHighPriorityRPS, 500),
TransferTaskBatchSize: dc.GetIntProperty(dynamicconfig.TransferTaskBatchSize, 100),
TransferTaskWorkerCount: dc.GetIntProperty(dynamicconfig.TransferTaskWorkerCount, 10),
TransferTaskMaxRetryCount: dc.GetIntProperty(dynamicconfig.TransferTaskMaxRetryCount, 100),
TransferTaskMaxRetryCount: dc.GetIntProperty(dynamicconfig.TransferTaskMaxRetryCount, 20),
TransferProcessorEnableSingleCursor: dc.GetBoolProperty(dynamicconfig.TransferProcessorEnableSingleCursor, false),
TransferProcessorEnablePriorityTaskScheduler: dc.GetBoolProperty(dynamicconfig.TransferProcessorEnablePriorityTaskScheduler, false),
TransferProcessorSchedulerWorkerCount: dc.GetIntProperty(dynamicconfig.TransferProcessorSchedulerWorkerCount, 100),
TransferProcessorSchedulerWorkerCount: dc.GetIntProperty(dynamicconfig.TransferProcessorSchedulerWorkerCount, 200),
TransferProcessorSchedulerQueueSize: dc.GetIntProperty(dynamicconfig.TransferProcessorSchedulerQueueSize, 10),
TransferProcessorSchedulerRoundRobinWeights: dc.GetMapProperty(dynamicconfig.TransferProcessorSchedulerRoundRobinWeights, ConvertWeightsToDynamicConfigValue(DefaultTaskPriorityWeight)),
TransferProcessorCompleteTransferFailureRetryCount: dc.GetIntProperty(dynamicconfig.TransferProcessorCompleteTransferFailureRetryCount, 10),
Expand All @@ -357,7 +357,7 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVis
TransferProcessorUpdateAckInterval: dc.GetDurationProperty(dynamicconfig.TransferProcessorUpdateAckInterval, 30*time.Second),
TransferProcessorUpdateAckIntervalJitterCoefficient: dc.GetFloat64Property(dynamicconfig.TransferProcessorUpdateAckIntervalJitterCoefficient, 0.15),
TransferProcessorCompleteTransferInterval: dc.GetDurationProperty(dynamicconfig.TransferProcessorCompleteTransferInterval, 60*time.Second),
TransferProcessorRescheduleInterval: dc.GetDurationProperty(dynamicconfig.TransferProcessorRescheduleInterval, 5*time.Second),
TransferProcessorRescheduleInterval: dc.GetDurationProperty(dynamicconfig.TransferProcessorRescheduleInterval, 3*time.Second),
TransferProcessorRescheduleIntervalJitterCoefficient: dc.GetFloat64Property(dynamicconfig.TransferProcessorRescheduleIntervalJitterCoefficient, 0.15),
TransferProcessorMaxReschedulerSize: dc.GetIntProperty(dynamicconfig.TransferProcessorMaxReschedulerSize, 10000),
TransferProcessorPollBackoffInterval: dc.GetDurationProperty(dynamicconfig.TransferProcessorPollBackoffInterval, 5*time.Second),
Expand Down Expand Up @@ -445,9 +445,9 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVis
VisibilityProcessorMaxPollRPS: dc.GetIntProperty(dynamicconfig.VisibilityProcessorMaxPollRPS, 20),
VisibilityProcessorMaxPollHostRPS: dc.GetIntProperty(dynamicconfig.VisibilityProcessorMaxPollHostRPS, 0),
VisibilityTaskWorkerCount: dc.GetIntProperty(dynamicconfig.VisibilityTaskWorkerCount, 10),
VisibilityTaskMaxRetryCount: dc.GetIntProperty(dynamicconfig.VisibilityTaskMaxRetryCount, 100),
VisibilityTaskMaxRetryCount: dc.GetIntProperty(dynamicconfig.VisibilityTaskMaxRetryCount, 20),
VisibilityProcessorEnablePriorityTaskScheduler: dc.GetBoolProperty(dynamicconfig.VisibilityProcessorEnablePriorityTaskScheduler, false),
VisibilityProcessorSchedulerWorkerCount: dc.GetIntProperty(dynamicconfig.VisibilityProcessorSchedulerWorkerCount, 100),
VisibilityProcessorSchedulerWorkerCount: dc.GetIntProperty(dynamicconfig.VisibilityProcessorSchedulerWorkerCount, 200),
VisibilityProcessorSchedulerQueueSize: dc.GetIntProperty(dynamicconfig.VisibilityProcessorSchedulerQueueSize, 10),
VisibilityProcessorSchedulerRoundRobinWeights: dc.GetMapProperty(dynamicconfig.VisibilityProcessorSchedulerRoundRobinWeights, ConvertWeightsToDynamicConfigValue(DefaultTaskPriorityWeight)),
VisibilityProcessorCompleteTaskFailureRetryCount: dc.GetIntProperty(dynamicconfig.VisibilityProcessorCompleteTaskFailureRetryCount, 10),
Expand All @@ -456,7 +456,7 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVis
VisibilityProcessorUpdateAckInterval: dc.GetDurationProperty(dynamicconfig.VisibilityProcessorUpdateAckInterval, 30*time.Second),
VisibilityProcessorUpdateAckIntervalJitterCoefficient: dc.GetFloat64Property(dynamicconfig.VisibilityProcessorUpdateAckIntervalJitterCoefficient, 0.15),
VisibilityProcessorCompleteTaskInterval: dc.GetDurationProperty(dynamicconfig.VisibilityProcessorCompleteTaskInterval, 60*time.Second),
VisibilityProcessorRescheduleInterval: dc.GetDurationProperty(dynamicconfig.VisibilityProcessorRescheduleInterval, 5*time.Second),
VisibilityProcessorRescheduleInterval: dc.GetDurationProperty(dynamicconfig.VisibilityProcessorRescheduleInterval, 3*time.Second),
VisibilityProcessorRescheduleIntervalJitterCoefficient: dc.GetFloat64Property(dynamicconfig.VisibilityProcessorRescheduleIntervalJitterCoefficient, 0.15),
VisibilityProcessorMaxReschedulerSize: dc.GetIntProperty(dynamicconfig.VisibilityProcessorMaxReschedulerSize, 10000),
VisibilityProcessorPollBackoffInterval: dc.GetDurationProperty(dynamicconfig.VisibilityProcessorPollBackoffInterval, 5*time.Second),
Expand Down
13 changes: 11 additions & 2 deletions service/history/queues/executable.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"go.temporal.io/server/common/metrics"
ctasks "go.temporal.io/server/common/tasks"
"go.temporal.io/server/service/history/consts"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/tasks"
)

Expand Down Expand Up @@ -80,7 +81,7 @@ var (
const (
// resubmitMaxAttempts is the max number of attempts we may skip rescheduler when a task is Nacked.
// check the comment in shouldResubmitOnNack() for more details
resubmitMaxAttempts = 10
resubmitMaxAttempts = 20
)

type (
Expand Down Expand Up @@ -234,6 +235,10 @@ func (e *executableImpl) HandleErr(err error) (retErr error) {
func (e *executableImpl) IsRetryableError(err error) bool {
// this determines if the executable should be retried within one submission to scheduler

if shard.IsShardOwnershipLostError(err) {
return false
}

// don't retry immediately for resource exhausted which may incur more load
// context deadline exceeded may also suggest that downstream is overloaded, so don't retry immediately
if common.IsResourceExhausted(err) || common.IsContextDeadlineExceededErr(err) {
Expand Down Expand Up @@ -336,7 +341,11 @@ func (e *executableImpl) shouldResubmitOnNack(attempt int, err error) bool {
// this is an optimization for skipping the rescheduler and retrying the task sooner
// this can be useful for errors like unable to get workflow lock, which don't
// have to back off for a long time and wait for the periodic rescheduling.
return (err == consts.ErrWorkflowBusy || e.IsRetryableError(err)) && e.Attempt() < resubmitMaxAttempts
if e.Attempt() > resubmitMaxAttempts {
return false
}

return err == consts.ErrWorkflowBusy || common.IsContextDeadlineExceededErr(err) || e.IsRetryableError(err)
}

func (e *executableImpl) rescheduleBackoff(attempt int) time.Duration {
Expand Down
4 changes: 4 additions & 0 deletions service/history/shard/controller_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,10 @@ func (c *ControllerImpl) ShardIDs() []int32 {
}

func IsShardOwnershipLostError(err error) bool {
if err == ErrShardClosed {
return true
}

switch err.(type) {
case *persistence.ShardOwnershipLostError:
return true
Expand Down
2 changes: 1 addition & 1 deletion service/history/transferQueueTaskExecutorBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ import (

const (
taskTimeout = time.Second * 3
taskGetExecutionTimeout = 500 * time.Millisecond
taskGetExecutionTimeout = time.Second
taskHistoryOpTimeout = 20 * time.Second
)

Expand Down

0 comments on commit 96a0bcd

Please sign in to comment.