Skip to content

Commit

Permalink
Sets default WorkflowExecutionTimeout and WorkflowRunTimeout on the r…
Browse files Browse the repository at this point in the history
…equest in FrontEnd. (#763)

When starting a Workflow, we auto fill-in "defaults" for Workflow Execution and Run Timeouts in the history service. Unfortunately, some aspects of the code (such as our retry logic) depend on the default value being filled in even when that auto fill-in logic has not yet been executed. This PR ensures that defaults are filled in at Workflow Start Request creation time to avoid this.

As an example, this was causing retries to not be executed with MaxAttempts = 0 if the user did not explicitly set the WorkflowExecutionTimeout.

Unit / Integration / Manual testing

Minimal risk
  • Loading branch information
mastermanu committed Sep 25, 2020
1 parent 666dadb commit 8fd6fd3
Show file tree
Hide file tree
Showing 6 changed files with 198 additions and 42 deletions.
16 changes: 16 additions & 0 deletions common/constants.go
Expand Up @@ -107,6 +107,22 @@ const (
MaxWorkflowRetentionPeriod = 30 * time.Hour * 24
)

// Fallback workflow timeouts, used as the built-in values when nothing is
// set explicitly.
const (
	// DefaultWorkflowExecutionTimeout is the Workflow Execution timeout applied
	// when a Start Workflow request does not set one explicitly.
	// The value is ten 365-day years, expressed in hours.
	DefaultWorkflowExecutionTimeout = 10 * 365 * 24 * time.Hour

	// DefaultWorkflowRunTimeout is the Workflow Run timeout applied when a
	// Start Workflow request does not set one explicitly.
	// The value is ten 365-day years, expressed in hours.
	DefaultWorkflowRunTimeout = 10 * 365 * 24 * time.Hour

	// DefaultWorkflowTaskTimeout is the Workflow Task timeout applied when the
	// user does not set one explicitly. The value is 10 seconds.
	DefaultWorkflowTaskTimeout = time.Second * 10
)

const (
// DefaultTransactionSizeLimit is the largest allowed transaction size to persistence
DefaultTransactionSizeLimit = 14 * 1024 * 1024
Expand Down
52 changes: 52 additions & 0 deletions common/util.go
Expand Up @@ -50,6 +50,7 @@ import (
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/payload"
"go.temporal.io/server/common/primitives/timestamp"
"go.temporal.io/server/common/service/dynamicconfig"
serviceerrors "go.temporal.io/server/common/serviceerror"
)

Expand Down Expand Up @@ -668,3 +669,54 @@ func GetPayloadsMapSize(data map[string]*commonpb.Payloads) int {

return size
}

// GetWorkflowExecutionTimeout resolves the execution timeout to use for a workflow.
//
// A requestedTimeout of zero is replaced by getDefaultTimeoutFunc(namespace).
// The resulting value is then limited to getMaxAllowedTimeoutFunc(namespace):
// the smaller of the two is returned.
func GetWorkflowExecutionTimeout(
	namespace string,
	requestedTimeout time.Duration,
	getDefaultTimeoutFunc dynamicconfig.DurationPropertyFnWithNamespaceFilter,
	getMaxAllowedTimeoutFunc dynamicconfig.DurationPropertyFnWithNamespaceFilter,
) time.Duration {
	effective := requestedTimeout
	if effective == 0 {
		// Nothing was requested; fall back to the per-namespace default.
		effective = getDefaultTimeoutFunc(namespace)
	}

	ceiling := getMaxAllowedTimeoutFunc(namespace)
	return timestamp.MinDuration(effective, ceiling)
}

// GetWorkflowRunTimeout resolves the run timeout to use for a workflow run.
//
// A requestedTimeout of zero is replaced by getDefaultTimeoutFunc(namespace).
// The result is the smallest of that value, executionTimeout, and
// getMaxAllowedTimeoutFunc(namespace).
func GetWorkflowRunTimeout(
	namespace string,
	requestedTimeout time.Duration,
	executionTimeout time.Duration,
	getDefaultTimeoutFunc dynamicconfig.DurationPropertyFnWithNamespaceFilter,
	getMaxAllowedTimeoutFunc dynamicconfig.DurationPropertyFnWithNamespaceFilter,
) time.Duration {
	effective := requestedTimeout
	if effective == 0 {
		// Nothing was requested; fall back to the per-namespace default.
		effective = getDefaultTimeoutFunc(namespace)
	}

	// First bound by the execution timeout, then by the configured maximum.
	boundedByExecution := timestamp.MinDuration(effective, executionTimeout)
	return timestamp.MinDuration(boundedByExecution, getMaxAllowedTimeoutFunc(namespace))
}

// GetWorkflowTaskTimeout resolves the workflow task timeout to use.
//
// A non-zero requestedTimeout is returned unchanged; no maximum is applied
// here. A requestedTimeout of zero yields getDefaultTimeoutFunc(namespace).
func GetWorkflowTaskTimeout(
	namespace string,
	requestedTimeout time.Duration,
	getDefaultTimeoutFunc dynamicconfig.DurationPropertyFnWithNamespaceFilter,
) time.Duration {
	if requestedTimeout != 0 {
		return requestedTimeout
	}

	return getDefaultTimeoutFunc(namespace)
}
18 changes: 18 additions & 0 deletions service/frontend/service.go
Expand Up @@ -108,6 +108,19 @@ type Config struct {

EnableRPCReplication dynamicconfig.BoolPropertyFn
EnableCleanupReplicationTask dynamicconfig.BoolPropertyFn

// The execution timeout a workflow execution defaults to if not specified
DefaultWorkflowExecutionTimeout dynamicconfig.DurationPropertyFnWithNamespaceFilter
// The run timeout a workflow run defaults to if not specified
DefaultWorkflowRunTimeout dynamicconfig.DurationPropertyFnWithNamespaceFilter

// The maximum execution timeout allowed for a workflow execution; larger requested values are truncated to it
MaxWorkflowExecutionTimeout dynamicconfig.DurationPropertyFnWithNamespaceFilter
// The maximum run timeout allowed for a workflow run; larger requested values are truncated to it
MaxWorkflowRunTimeout dynamicconfig.DurationPropertyFnWithNamespaceFilter

// DefaultWorkflowTaskTimeout the default workflow task timeout
DefaultWorkflowTaskTimeout dynamicconfig.DurationPropertyFnWithNamespaceFilter
}

// NewConfig returns new service config with default values
Expand Down Expand Up @@ -149,6 +162,11 @@ func NewConfig(dc *dynamicconfig.Collection, numHistoryShards int, enableReadFro
EnableRPCReplication: dc.GetBoolProperty(dynamicconfig.FrontendEnableRPCReplication, false),
EnableCleanupReplicationTask: dc.GetBoolProperty(dynamicconfig.FrontendEnableCleanupReplicationTask, true),
DefaultWorkflowRetryPolicy: dc.GetMapPropertyFnWithNamespaceFilter(dynamicconfig.DefaultWorkflowRetryPolicy, common.GetDefaultRetryPolicyConfigOptions()),
DefaultWorkflowExecutionTimeout: dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.DefaultWorkflowExecutionTimeout, common.DefaultWorkflowExecutionTimeout),
DefaultWorkflowRunTimeout: dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.DefaultWorkflowRunTimeout, common.DefaultWorkflowRunTimeout),
MaxWorkflowExecutionTimeout: dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.MaxWorkflowExecutionTimeout, common.DefaultWorkflowExecutionTimeout),
MaxWorkflowRunTimeout: dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.MaxWorkflowRunTimeout, common.DefaultWorkflowRunTimeout),
DefaultWorkflowTaskTimeout: dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.DefaultWorkflowTaskTimeout, common.DefaultWorkflowTaskTimeout),
}
}

Expand Down
114 changes: 94 additions & 20 deletions service/frontend/workflowHandler.go
Expand Up @@ -452,16 +452,8 @@ func (wh *WorkflowHandler) StartWorkflowExecution(ctx context.Context, request *
return nil, err
}

if timestamp.DurationValue(request.GetWorkflowExecutionTimeout()) < 0 {
return nil, wh.error(errInvalidWorkflowExecutionTimeoutSeconds, scope)
}

if timestamp.DurationValue(request.GetWorkflowRunTimeout()) < 0 {
return nil, wh.error(errInvalidWorkflowRunTimeoutSeconds, scope)
}

if timestamp.DurationValue(request.GetWorkflowTaskTimeout()) < 0 {
return nil, wh.error(errInvalidWorkflowTaskTimeoutSeconds, scope)
if err := wh.validateStartWorkflowTimeouts(scope, request); err != nil {
return nil, err
}

if request.GetRequestId() == "" {
Expand Down Expand Up @@ -2144,16 +2136,8 @@ func (wh *WorkflowHandler) SignalWithStartWorkflowExecution(ctx context.Context,
return nil, wh.error(errRequestIDTooLong, scope)
}

if timestamp.DurationValue(request.GetWorkflowExecutionTimeout()) < 0 {
return nil, wh.error(errInvalidWorkflowExecutionTimeoutSeconds, scope)
}

if timestamp.DurationValue(request.GetWorkflowRunTimeout()) < 0 {
return nil, wh.error(errInvalidWorkflowRunTimeoutSeconds, scope)
}

if timestamp.DurationValue(request.GetWorkflowTaskTimeout()) < 0 {
return nil, wh.error(errInvalidWorkflowTaskTimeoutSeconds, scope)
if err := wh.validateSignalWithStartWorkflowTimeouts(scope, request); err != nil {
return nil, err
}

if err := wh.validateRetryPolicy(request.GetNamespace(), request.RetryPolicy); err != nil {
Expand Down Expand Up @@ -3755,3 +3739,93 @@ func (wh *WorkflowHandler) validateRetryPolicy(namespace string, retryPolicy *co
common.EnsureRetryPolicyDefaults(retryPolicy, defaultWorkflowRetrySettings)
return common.ValidateRetryPolicy(retryPolicy)
}

// validateStartWorkflowTimeouts checks the three timeouts on a StartWorkflowExecution
// request and then overwrites them on the request with their resolved values.
//
// A negative execution, run, or task timeout is rejected with the matching
// errInvalid...TimeoutSeconds error, passed through wh.error together with scope.
// Otherwise the request is mutated in place: each timeout field is replaced by the
// value computed by the corresponding common.GetWorkflow*Timeout helper using the
// frontend config, and nil is returned.
func (wh *WorkflowHandler) validateStartWorkflowTimeouts(
	scope metrics.Scope,
	request *workflowservice.StartWorkflowExecutionRequest,
) error {
	// Checks run in order: execution, run, task. The first negative value wins.
	switch {
	case timestamp.DurationValue(request.GetWorkflowExecutionTimeout()) < 0:
		return wh.error(errInvalidWorkflowExecutionTimeoutSeconds, scope)
	case timestamp.DurationValue(request.GetWorkflowRunTimeout()) < 0:
		return wh.error(errInvalidWorkflowRunTimeoutSeconds, scope)
	case timestamp.DurationValue(request.GetWorkflowTaskTimeout()) < 0:
		return wh.error(errInvalidWorkflowTaskTimeoutSeconds, scope)
	}

	namespace := request.GetNamespace()

	executionTimeout := common.GetWorkflowExecutionTimeout(
		namespace,
		timestamp.DurationValue(request.GetWorkflowExecutionTimeout()),
		wh.config.DefaultWorkflowExecutionTimeout,
		wh.config.MaxWorkflowExecutionTimeout,
	)
	request.WorkflowExecutionTimeout = timestamp.DurationPtr(executionTimeout)

	// The execution timeout is read back from the request, so the value stored
	// just above is what gets passed as the run timeout's executionTimeout argument.
	runTimeout := common.GetWorkflowRunTimeout(
		namespace,
		timestamp.DurationValue(request.GetWorkflowRunTimeout()),
		timestamp.DurationValue(request.GetWorkflowExecutionTimeout()),
		wh.config.DefaultWorkflowRunTimeout,
		wh.config.MaxWorkflowRunTimeout,
	)
	request.WorkflowRunTimeout = timestamp.DurationPtr(runTimeout)

	taskTimeout := common.GetWorkflowTaskTimeout(
		namespace,
		timestamp.DurationValue(request.GetWorkflowTaskTimeout()),
		wh.config.DefaultWorkflowTaskTimeout,
	)
	request.WorkflowTaskTimeout = timestamp.DurationPtr(taskTimeout)

	return nil
}

// validateSignalWithStartWorkflowTimeouts checks the three timeouts on a
// SignalWithStartWorkflowExecution request and then overwrites them on the request
// with their resolved values.
//
// A negative execution, run, or task timeout is rejected with the matching
// errInvalid...TimeoutSeconds error, passed through wh.error together with scope.
// Otherwise the request is mutated in place: each timeout field is replaced by the
// value computed by the corresponding common.GetWorkflow*Timeout helper using the
// frontend config, and nil is returned.
func (wh *WorkflowHandler) validateSignalWithStartWorkflowTimeouts(
	scope metrics.Scope,
	request *workflowservice.SignalWithStartWorkflowExecutionRequest,
) error {
	// Checks run in order: execution, run, task. The first negative value wins.
	switch {
	case timestamp.DurationValue(request.GetWorkflowExecutionTimeout()) < 0:
		return wh.error(errInvalidWorkflowExecutionTimeoutSeconds, scope)
	case timestamp.DurationValue(request.GetWorkflowRunTimeout()) < 0:
		return wh.error(errInvalidWorkflowRunTimeoutSeconds, scope)
	case timestamp.DurationValue(request.GetWorkflowTaskTimeout()) < 0:
		return wh.error(errInvalidWorkflowTaskTimeoutSeconds, scope)
	}

	namespace := request.GetNamespace()

	executionTimeout := common.GetWorkflowExecutionTimeout(
		namespace,
		timestamp.DurationValue(request.GetWorkflowExecutionTimeout()),
		wh.config.DefaultWorkflowExecutionTimeout,
		wh.config.MaxWorkflowExecutionTimeout,
	)
	request.WorkflowExecutionTimeout = timestamp.DurationPtr(executionTimeout)

	// The execution timeout is read back from the request, so the value stored
	// just above is what gets passed as the run timeout's executionTimeout argument.
	runTimeout := common.GetWorkflowRunTimeout(
		namespace,
		timestamp.DurationValue(request.GetWorkflowRunTimeout()),
		timestamp.DurationValue(request.GetWorkflowExecutionTimeout()),
		wh.config.DefaultWorkflowRunTimeout,
		wh.config.MaxWorkflowRunTimeout,
	)
	request.WorkflowRunTimeout = timestamp.DurationPtr(runTimeout)

	taskTimeout := common.GetWorkflowTaskTimeout(
		namespace,
		timestamp.DurationValue(request.GetWorkflowTaskTimeout()),
		wh.config.DefaultWorkflowTaskTimeout,
	)
	request.WorkflowTaskTimeout = timestamp.DurationPtr(taskTimeout)

	return nil
}
10 changes: 5 additions & 5 deletions service/history/service.go
Expand Up @@ -273,7 +273,7 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int, storeType strin
VisibilityOpenMaxQPS: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.HistoryVisibilityOpenMaxQPS, 300),
VisibilityClosedMaxQPS: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.HistoryVisibilityClosedMaxQPS, 300),
MaxAutoResetPoints: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.HistoryMaxAutoResetPoints, defaultHistoryMaxAutoResetPoints),
DefaultWorkflowTaskTimeout: dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.DefaultWorkflowTaskTimeout, time.Second*10),
DefaultWorkflowTaskTimeout: dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.DefaultWorkflowTaskTimeout, common.DefaultWorkflowTaskTimeout),
MaxWorkflowTaskTimeout: dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.MaxWorkflowTaskTimeout, time.Second*60),
AdvancedVisibilityWritingMode: dc.GetStringProperty(dynamicconfig.AdvancedVisibilityWritingMode, common.GetDefaultAdvancedVisibilityWritingMode(isAdvancedVisConfigExist)),
EmitShardDiffLog: dc.GetBoolProperty(dynamicconfig.EmitShardDiffLog, false),
Expand Down Expand Up @@ -387,10 +387,10 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int, storeType strin
SearchAttributesTotalSizeLimit: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.SearchAttributesTotalSizeLimit, 40*1024),
StickyTTL: dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.StickyTTL, time.Hour*24*365),
WorkflowTaskHeartbeatTimeout: dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.WorkflowTaskHeartbeatTimeout, time.Minute*30),
DefaultWorkflowExecutionTimeout: dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.DefaultWorkflowExecutionTimeout, time.Hour*24*365*10),
DefaultWorkflowRunTimeout: dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.DefaultWorkflowRunTimeout, time.Hour*24*365*10),
MaxWorkflowExecutionTimeout: dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.MaxWorkflowExecutionTimeout, time.Hour*24*365*10),
MaxWorkflowRunTimeout: dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.MaxWorkflowRunTimeout, time.Hour*24*365*10),
DefaultWorkflowExecutionTimeout: dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.DefaultWorkflowExecutionTimeout, common.DefaultWorkflowExecutionTimeout),
DefaultWorkflowRunTimeout: dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.DefaultWorkflowRunTimeout, common.DefaultWorkflowRunTimeout),
MaxWorkflowExecutionTimeout: dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.MaxWorkflowExecutionTimeout, common.DefaultWorkflowExecutionTimeout),
MaxWorkflowRunTimeout: dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.MaxWorkflowRunTimeout, common.DefaultWorkflowRunTimeout),
ReplicationTaskFetcherParallelism: dc.GetIntProperty(dynamicconfig.ReplicationTaskFetcherParallelism, 1),
ReplicationTaskFetcherAggregationInterval: dc.GetDurationProperty(dynamicconfig.ReplicationTaskFetcherAggregationInterval, 2*time.Second),
ReplicationTaskFetcherTimerJitterCoefficient: dc.GetFloat64Property(dynamicconfig.ReplicationTaskFetcherTimerJitterCoefficient, 0.15),
Expand Down
30 changes: 13 additions & 17 deletions service/history/workflowExecutionUtil.go
Expand Up @@ -230,26 +230,22 @@ func terminateWorkflow(
}

func getWorkflowExecutionTimeout(namespace string, requestedTimeout time.Duration, serviceConfig *Config) time.Duration {
executionTimeoutSeconds := requestedTimeout
if executionTimeoutSeconds == 0 {
executionTimeoutSeconds = timestamp.RoundUp(serviceConfig.DefaultWorkflowExecutionTimeout(namespace))
}
maxWorkflowExecutionTimeout := timestamp.RoundUp(serviceConfig.MaxWorkflowExecutionTimeout(namespace))
executionTimeoutSeconds = timestamp.MinDuration(executionTimeoutSeconds, maxWorkflowExecutionTimeout)

return executionTimeoutSeconds
return common.GetWorkflowExecutionTimeout(
namespace,
requestedTimeout,
serviceConfig.DefaultWorkflowExecutionTimeout,
serviceConfig.MaxWorkflowExecutionTimeout,
)
}

func getWorkflowRunTimeout(namespace string, requestedTimeout, executionTimeout time.Duration, serviceConfig *Config) time.Duration {
runTimeoutSeconds := requestedTimeout
if runTimeoutSeconds == 0 {
runTimeoutSeconds = timestamp.RoundUp(serviceConfig.DefaultWorkflowRunTimeout(namespace))
}
maxWorkflowRunTimeout := timestamp.RoundUp(serviceConfig.MaxWorkflowRunTimeout(namespace))
runTimeoutSeconds = timestamp.MinDuration(runTimeoutSeconds, maxWorkflowRunTimeout)
runTimeoutSeconds = timestamp.MinDuration(runTimeoutSeconds, executionTimeout)

return runTimeoutSeconds
return common.GetWorkflowRunTimeout(
namespace,
requestedTimeout,
executionTimeout,
serviceConfig.DefaultWorkflowRunTimeout,
serviceConfig.MaxWorkflowRunTimeout,
)
}

// FindAutoResetPoint returns the auto reset point
Expand Down

0 comments on commit 8fd6fd3

Please sign in to comment.