Skip to content

Commit

Permalink
Slow down workflow task retry (#2765)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt committed Apr 26, 2022
1 parent cd88c61 commit ca586dd
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 5 deletions.
2 changes: 2 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,8 @@ const (
WorkflowTaskHeartbeatTimeout = "history.workflowTaskHeartbeatTimeout"
// WorkflowTaskCriticalAttempts is the number of attempts for a workflow task that's regarded as critical
WorkflowTaskCriticalAttempts = "history.workflowTaskCriticalAttempt"
// WorkflowTaskRetryMaxInterval is the maximum interval added to a workflow task's startToClose timeout for slowing down retry
WorkflowTaskRetryMaxInterval = "history.workflowTaskRetryMaxInterval"
// DefaultWorkflowTaskTimeout for a workflow task
DefaultWorkflowTaskTimeout = "history.defaultWorkflowTaskTimeout"
// SkipReapplicationByNamespaceID is whether skipping a event re-application for a namespace
Expand Down
2 changes: 2 additions & 0 deletions service/history/configs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ type Config struct {
// So that workflow task will be scheduled to another worker(by clear stickyness)
WorkflowTaskHeartbeatTimeout dynamicconfig.DurationPropertyFnWithNamespaceFilter
WorkflowTaskCriticalAttempts dynamicconfig.IntPropertyFn
WorkflowTaskRetryMaxInterval dynamicconfig.DurationPropertyFn

// The following is used by the new RPC replication stack
ReplicationTaskFetcherParallelism dynamicconfig.IntPropertyFn
Expand Down Expand Up @@ -385,6 +386,7 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVis
StickyTTL: dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.StickyTTL, time.Hour*24*365),
WorkflowTaskHeartbeatTimeout: dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.WorkflowTaskHeartbeatTimeout, time.Minute*30),
WorkflowTaskCriticalAttempts: dc.GetIntProperty(dynamicconfig.WorkflowTaskCriticalAttempts, 10),
WorkflowTaskRetryMaxInterval: dc.GetDurationProperty(dynamicconfig.WorkflowTaskRetryMaxInterval, time.Minute*10),

ReplicationTaskFetcherParallelism: dc.GetIntProperty(dynamicconfig.ReplicationTaskFetcherParallelism, 4),
ReplicationTaskFetcherAggregationInterval: dc.GetDurationProperty(dynamicconfig.ReplicationTaskFetcherAggregationInterval, 2*time.Second),
Expand Down
43 changes: 38 additions & 5 deletions service/history/workflow/workflow_task_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (

enumsspb "go.temporal.io/server/api/enums/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/backoff"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/primitives/timestamp"
Expand All @@ -51,6 +52,11 @@ type (
}
)

const (
workflowTaskRetryBackoffMinAttempts = 3
workflowTaskRetryInitialInterval = 5 * time.Second
)

func newWorkflowTaskStateMachine(
ms *MutableStateImpl,
) *workflowTaskStateMachine {
Expand Down Expand Up @@ -113,6 +119,10 @@ func (m *workflowTaskStateMachine) ReplicateTransientWorkflowTaskScheduled() (*W
// 2. if no failover happen during the life time of this transient workflow task
// then ReplicateWorkflowTaskScheduledEvent will overwrite everything
// including the workflow task schedule ID
//
// regarding workflow task timeout calculation:
// 1. the attempt will be set to 1, so we still use default worflow task timeout
// 2. ReplicateWorkflowTaskScheduledEvent will overwrite everything including workflowTaskTimeout
workflowTask := &WorkflowTaskInfo{
Version: m.ms.GetCurrentVersion(),
ScheduleID: m.ms.GetNextEventID(),
Expand Down Expand Up @@ -274,11 +284,13 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskScheduledEventAsHeartbeat(
scheduleID := m.ms.GetNextEventID() // we will generate the schedule event later for repeatedly failing workflow tasks
// Avoid creating new history events when workflow tasks are continuously failing
scheduleTime := m.ms.timeSource.Now().UTC()
if m.ms.executionInfo.WorkflowTaskAttempt == 1 {
attempt := m.ms.executionInfo.WorkflowTaskAttempt
startToCloseTimeoutSeconds := int32(m.getStartToCloseTimeout(taskTimeout, attempt).Seconds())
if attempt == 1 {
newWorkflowTaskEvent = m.ms.hBuilder.AddWorkflowTaskScheduledEvent(
taskQueue,
int32(taskTimeout.Seconds()),
m.ms.executionInfo.WorkflowTaskAttempt,
startToCloseTimeoutSeconds,
attempt,
m.ms.timeSource.Now(),
)
scheduleID = newWorkflowTaskEvent.GetEventId()
Expand All @@ -289,8 +301,8 @@ func (m *workflowTaskStateMachine) AddWorkflowTaskScheduledEventAsHeartbeat(
m.ms.GetCurrentVersion(),
scheduleID,
taskQueue,
int32(taskTimeout.Seconds()),
m.ms.executionInfo.WorkflowTaskAttempt,
startToCloseTimeoutSeconds,
attempt,
&scheduleTime,
originalScheduledTimestamp,
)
Expand Down Expand Up @@ -769,3 +781,24 @@ func (m *workflowTaskStateMachine) emitWorkflowTaskAttemptStats(
)
}
}

func (m *workflowTaskStateMachine) getStartToCloseTimeout(
defaultTimeout time.Duration,
attempt int32,
) time.Duration {
// This util function is only for calculating active workflow task timeout.
// Transient workflow task in passive cluster won't call this function and
// always use default timeout as it will either be completely overwritten by
// a replicated workflow schedule event from active cluster, or if used, it's
// attempt will be reset to 1.
// Check ReplicateTransientWorkflowTaskScheduled for details.

if attempt <= workflowTaskRetryBackoffMinAttempts {
return defaultTimeout
}

policy := backoff.NewExponentialRetryPolicy(workflowTaskRetryInitialInterval)
policy.SetMaximumInterval(m.ms.shard.GetConfig().WorkflowTaskRetryMaxInterval())
policy.SetExpirationInterval(backoff.NoInterval)
return defaultTimeout + policy.ComputeNextDelay(0, int(attempt)-workflowTaskRetryBackoffMinAttempts)
}

0 comments on commit ca586dd

Please sign in to comment.