diff --git a/common/constants.go b/common/constants.go index 66ebf4c6252..e5061b48063 100644 --- a/common/constants.go +++ b/common/constants.go @@ -107,6 +107,22 @@ const ( MaxWorkflowRetentionPeriod = 30 * time.Hour * 24 ) +const ( + // DefaultWorkflowExecutionTimeout is the Default Workflow Execution timeout applied to a Workflow when + // this value is not explicitly set by the user on a Start Workflow request + // Intention is 10 years + DefaultWorkflowExecutionTimeout = 24 * 365 * 10 * time.Hour + + // DefaultWorkflowRunTimeout is the Default Workflow Run timeout applied to a Workflow when + // this value is not explicitly set by the user on a Start Workflow request + // Intention is 10 years + DefaultWorkflowRunTimeout = 24 * 365 * 10 * time.Hour + + // DefaultWorkflowTaskTimeout sets the Default Workflow Task timeout for a Workflow + // when the value is not explicitly set by the user. Intention is 10 seconds. + DefaultWorkflowTaskTimeout = 10 * time.Second +) + const ( // DefaultTransactionSizeLimit is the largest allowed transaction size to persistence DefaultTransactionSizeLimit = 14 * 1024 * 1024 diff --git a/common/util.go b/common/util.go index 2ab8279994b..d389e7d01d5 100644 --- a/common/util.go +++ b/common/util.go @@ -50,6 +50,7 @@ import ( "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/payload" "go.temporal.io/server/common/primitives/timestamp" + "go.temporal.io/server/common/service/dynamicconfig" serviceerrors "go.temporal.io/server/common/serviceerror" ) @@ -668,3 +669,54 @@ func GetPayloadsMapSize(data map[string]*commonpb.Payloads) int { return size } + +// GetWorkflowExecutionTimeout gets the default allowed execution timeout or truncates the requested value to the maximum allowed timeout +func GetWorkflowExecutionTimeout( + namespace string, + requestedTimeout time.Duration, + getDefaultTimeoutFunc dynamicconfig.DurationPropertyFnWithNamespaceFilter, + getMaxAllowedTimeoutFunc dynamicconfig.DurationPropertyFnWithNamespaceFilter) time.Duration { + + if requestedTimeout == 0 { + requestedTimeout = getDefaultTimeoutFunc(namespace) + } + + return timestamp.MinDuration( + requestedTimeout, + getMaxAllowedTimeoutFunc(namespace), + ) +} + +// GetWorkflowRunTimeout gets the default allowed run timeout or truncates the requested value to the maximum allowed timeout +func GetWorkflowRunTimeout( + namespace string, + requestedTimeout time.Duration, + executionTimeout time.Duration, + getDefaultTimeoutFunc dynamicconfig.DurationPropertyFnWithNamespaceFilter, + getMaxAllowedTimeoutFunc dynamicconfig.DurationPropertyFnWithNamespaceFilter) time.Duration { + + if requestedTimeout == 0 { + requestedTimeout = getDefaultTimeoutFunc(namespace) + } + + return timestamp.MinDuration( + timestamp.MinDuration( + requestedTimeout, + executionTimeout, + ), + getMaxAllowedTimeoutFunc(namespace), + ) +} + +// GetWorkflowTaskTimeout gets the default allowed execution timeout or truncates the requested value to the maximum allowed timeout +func GetWorkflowTaskTimeout( + namespace string, + requestedTimeout time.Duration, + getDefaultTimeoutFunc dynamicconfig.DurationPropertyFnWithNamespaceFilter) time.Duration { + + if requestedTimeout == 0 { + return getDefaultTimeoutFunc(namespace) + } + + return requestedTimeout +} diff --git a/service/frontend/service.go b/service/frontend/service.go index cfb5130c91a..73d6f8cd6df 100644 --- a/service/frontend/service.go +++ b/service/frontend/service.go @@ -108,6 +108,19 @@ type Config struct { EnableRPCReplication dynamicconfig.BoolPropertyFn EnableCleanupReplicationTask dynamicconfig.BoolPropertyFn + + // The execution timeout a workflow execution defaults to if not specified + DefaultWorkflowExecutionTimeout dynamicconfig.DurationPropertyFnWithNamespaceFilter + // The run timeout a workflow run defaults to if not specified + DefaultWorkflowRunTimeout dynamicconfig.DurationPropertyFnWithNamespaceFilter + + // The execution timeout a workflow execution defaults to if not specified + MaxWorkflowExecutionTimeout dynamicconfig.DurationPropertyFnWithNamespaceFilter + // The run timeout a workflow run defaults to if not specified + MaxWorkflowRunTimeout dynamicconfig.DurationPropertyFnWithNamespaceFilter + + // DefaultWorkflowTaskTimeout the default workflow task timeout + DefaultWorkflowTaskTimeout dynamicconfig.DurationPropertyFnWithNamespaceFilter } // NewConfig returns new service config with default values @@ -149,6 +162,11 @@ func NewConfig(dc *dynamicconfig.Collection, numHistoryShards int, enableReadFro EnableRPCReplication: dc.GetBoolProperty(dynamicconfig.FrontendEnableRPCReplication, false), EnableCleanupReplicationTask: dc.GetBoolProperty(dynamicconfig.FrontendEnableCleanupReplicationTask, true), DefaultWorkflowRetryPolicy: dc.GetMapPropertyFnWithNamespaceFilter(dynamicconfig.DefaultWorkflowRetryPolicy, common.GetDefaultRetryPolicyConfigOptions()), + DefaultWorkflowExecutionTimeout: dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.DefaultWorkflowExecutionTimeout, common.DefaultWorkflowExecutionTimeout), + DefaultWorkflowRunTimeout: dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.DefaultWorkflowRunTimeout, common.DefaultWorkflowRunTimeout), + MaxWorkflowExecutionTimeout: dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.MaxWorkflowExecutionTimeout, common.DefaultWorkflowExecutionTimeout), + MaxWorkflowRunTimeout: dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.MaxWorkflowRunTimeout, common.DefaultWorkflowRunTimeout), + DefaultWorkflowTaskTimeout: dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.DefaultWorkflowTaskTimeout, common.DefaultWorkflowTaskTimeout), } } diff --git a/service/frontend/workflowHandler.go b/service/frontend/workflowHandler.go index 74417e73f0b..d61da886135 100644 --- a/service/frontend/workflowHandler.go +++ b/service/frontend/workflowHandler.go @@ -452,16 +452,8 @@ func (wh *WorkflowHandler) StartWorkflowExecution(ctx context.Context, request * return nil, err } - if timestamp.DurationValue(request.GetWorkflowExecutionTimeout()) < 0 { - return nil, wh.error(errInvalidWorkflowExecutionTimeoutSeconds, scope) - } - - if timestamp.DurationValue(request.GetWorkflowRunTimeout()) < 0 { - return nil, wh.error(errInvalidWorkflowRunTimeoutSeconds, scope) - } - - if timestamp.DurationValue(request.GetWorkflowTaskTimeout()) < 0 { - return nil, wh.error(errInvalidWorkflowTaskTimeoutSeconds, scope) + if err := wh.validateStartWorkflowTimeouts(scope, request); err != nil { + return nil, err } if request.GetRequestId() == "" { @@ -2144,16 +2136,8 @@ func (wh *WorkflowHandler) SignalWithStartWorkflowExecution(ctx context.Context, return nil, wh.error(errRequestIDTooLong, scope) } - if timestamp.DurationValue(request.GetWorkflowExecutionTimeout()) < 0 { - return nil, wh.error(errInvalidWorkflowExecutionTimeoutSeconds, scope) - } - - if timestamp.DurationValue(request.GetWorkflowRunTimeout()) < 0 { - return nil, wh.error(errInvalidWorkflowRunTimeoutSeconds, scope) - } - - if timestamp.DurationValue(request.GetWorkflowTaskTimeout()) < 0 { - return nil, wh.error(errInvalidWorkflowTaskTimeoutSeconds, scope) + if err := wh.validateSignalWithStartWorkflowTimeouts(scope, request); err != nil { + return nil, err } if err := wh.validateRetryPolicy(request.GetNamespace(), request.RetryPolicy); err != nil { @@ -3755,3 +3739,93 @@ func (wh *WorkflowHandler) validateRetryPolicy(namespace string, retryPolicy *co common.EnsureRetryPolicyDefaults(retryPolicy, defaultWorkflowRetrySettings) return common.ValidateRetryPolicy(retryPolicy) } + +func (wh *WorkflowHandler) validateStartWorkflowTimeouts( + scope metrics.Scope, + request *workflowservice.StartWorkflowExecutionRequest) error { + if timestamp.DurationValue(request.GetWorkflowExecutionTimeout()) < 0 { + return wh.error(errInvalidWorkflowExecutionTimeoutSeconds, scope) + } + + if timestamp.DurationValue(request.GetWorkflowRunTimeout()) < 0 { + return wh.error(errInvalidWorkflowRunTimeoutSeconds, scope) + } + + if timestamp.DurationValue(request.GetWorkflowTaskTimeout()) < 0 { + return wh.error(errInvalidWorkflowTaskTimeoutSeconds, scope) + } + + request.WorkflowExecutionTimeout = timestamp.DurationPtr( + common.GetWorkflowExecutionTimeout( + request.GetNamespace(), + timestamp.DurationValue(request.GetWorkflowExecutionTimeout()), + wh.config.DefaultWorkflowExecutionTimeout, + wh.config.MaxWorkflowExecutionTimeout, + ), + ) + + request.WorkflowRunTimeout = timestamp.DurationPtr( + common.GetWorkflowRunTimeout( + request.GetNamespace(), + timestamp.DurationValue(request.GetWorkflowRunTimeout()), + timestamp.DurationValue(request.GetWorkflowExecutionTimeout()), + wh.config.DefaultWorkflowRunTimeout, + wh.config.MaxWorkflowRunTimeout, + ), + ) + + request.WorkflowTaskTimeout = timestamp.DurationPtr( + common.GetWorkflowTaskTimeout( + request.GetNamespace(), + timestamp.DurationValue(request.GetWorkflowTaskTimeout()), + wh.config.DefaultWorkflowTaskTimeout, + ), + ) + + return nil +} + +func (wh *WorkflowHandler) validateSignalWithStartWorkflowTimeouts( + scope metrics.Scope, + request *workflowservice.SignalWithStartWorkflowExecutionRequest) error { + if timestamp.DurationValue(request.GetWorkflowExecutionTimeout()) < 0 { + return wh.error(errInvalidWorkflowExecutionTimeoutSeconds, scope) + } + + if timestamp.DurationValue(request.GetWorkflowRunTimeout()) < 0 { + return wh.error(errInvalidWorkflowRunTimeoutSeconds, scope) + } + + if timestamp.DurationValue(request.GetWorkflowTaskTimeout()) < 0 { + return wh.error(errInvalidWorkflowTaskTimeoutSeconds, scope) + } + + request.WorkflowExecutionTimeout = timestamp.DurationPtr( + common.GetWorkflowExecutionTimeout( + request.GetNamespace(), + timestamp.DurationValue(request.GetWorkflowExecutionTimeout()), + wh.config.DefaultWorkflowExecutionTimeout, + wh.config.MaxWorkflowExecutionTimeout, + ), + ) + + request.WorkflowRunTimeout = timestamp.DurationPtr( + common.GetWorkflowRunTimeout( + request.GetNamespace(), + timestamp.DurationValue(request.GetWorkflowRunTimeout()), + timestamp.DurationValue(request.GetWorkflowExecutionTimeout()), + wh.config.DefaultWorkflowRunTimeout, + wh.config.MaxWorkflowRunTimeout, + ), + ) + + request.WorkflowTaskTimeout = timestamp.DurationPtr( + common.GetWorkflowTaskTimeout( + request.GetNamespace(), + timestamp.DurationValue(request.GetWorkflowTaskTimeout()), + wh.config.DefaultWorkflowTaskTimeout, + ), + ) + + return nil +} diff --git a/service/history/service.go b/service/history/service.go index 3df1c9e326e..d7c009f749e 100644 --- a/service/history/service.go +++ b/service/history/service.go @@ -273,7 +273,7 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int, storeType strin VisibilityOpenMaxQPS: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.HistoryVisibilityOpenMaxQPS, 300), VisibilityClosedMaxQPS: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.HistoryVisibilityClosedMaxQPS, 300), MaxAutoResetPoints: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.HistoryMaxAutoResetPoints, defaultHistoryMaxAutoResetPoints), - DefaultWorkflowTaskTimeout: dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.DefaultWorkflowTaskTimeout, time.Second*10), + DefaultWorkflowTaskTimeout: dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.DefaultWorkflowTaskTimeout, common.DefaultWorkflowTaskTimeout), MaxWorkflowTaskTimeout: dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.MaxWorkflowTaskTimeout, time.Second*60), AdvancedVisibilityWritingMode: dc.GetStringProperty(dynamicconfig.AdvancedVisibilityWritingMode, common.GetDefaultAdvancedVisibilityWritingMode(isAdvancedVisConfigExist)), EmitShardDiffLog: dc.GetBoolProperty(dynamicconfig.EmitShardDiffLog, false), @@ -387,10 +387,10 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int, storeType strin SearchAttributesTotalSizeLimit: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.SearchAttributesTotalSizeLimit, 40*1024), StickyTTL: dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.StickyTTL, time.Hour*24*365), WorkflowTaskHeartbeatTimeout: dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.WorkflowTaskHeartbeatTimeout, time.Minute*30), - DefaultWorkflowExecutionTimeout: dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.DefaultWorkflowExecutionTimeout, time.Hour*24*365*10), - DefaultWorkflowRunTimeout: dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.DefaultWorkflowRunTimeout, time.Hour*24*365*10), - MaxWorkflowExecutionTimeout: dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.MaxWorkflowExecutionTimeout, time.Hour*24*365*10), - MaxWorkflowRunTimeout: dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.MaxWorkflowRunTimeout, time.Hour*24*365*10), + DefaultWorkflowExecutionTimeout: dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.DefaultWorkflowExecutionTimeout, common.DefaultWorkflowExecutionTimeout), + DefaultWorkflowRunTimeout: dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.DefaultWorkflowRunTimeout, common.DefaultWorkflowRunTimeout), + MaxWorkflowExecutionTimeout: dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.MaxWorkflowExecutionTimeout, common.DefaultWorkflowExecutionTimeout), + MaxWorkflowRunTimeout: dc.GetDurationPropertyFilteredByNamespace(dynamicconfig.MaxWorkflowRunTimeout, common.DefaultWorkflowRunTimeout), ReplicationTaskFetcherParallelism: dc.GetIntProperty(dynamicconfig.ReplicationTaskFetcherParallelism, 1), ReplicationTaskFetcherAggregationInterval: dc.GetDurationProperty(dynamicconfig.ReplicationTaskFetcherAggregationInterval, 2*time.Second), ReplicationTaskFetcherTimerJitterCoefficient: dc.GetFloat64Property(dynamicconfig.ReplicationTaskFetcherTimerJitterCoefficient, 0.15), diff --git a/service/history/workflowExecutionUtil.go b/service/history/workflowExecutionUtil.go index fc005911b3f..4c2cf9a20ea 100644 --- a/service/history/workflowExecutionUtil.go +++ b/service/history/workflowExecutionUtil.go @@ -230,26 +230,22 @@ func terminateWorkflow( } func getWorkflowExecutionTimeout(namespace string, requestedTimeout time.Duration, serviceConfig *Config) time.Duration { - executionTimeoutSeconds := requestedTimeout - if executionTimeoutSeconds == 0 { - executionTimeoutSeconds = timestamp.RoundUp(serviceConfig.DefaultWorkflowExecutionTimeout(namespace)) - } - maxWorkflowExecutionTimeout := timestamp.RoundUp(serviceConfig.MaxWorkflowExecutionTimeout(namespace)) - executionTimeoutSeconds = timestamp.MinDuration(executionTimeoutSeconds, maxWorkflowExecutionTimeout) - - return executionTimeoutSeconds + return common.GetWorkflowExecutionTimeout( + namespace, + requestedTimeout, + serviceConfig.DefaultWorkflowExecutionTimeout, + serviceConfig.MaxWorkflowExecutionTimeout, + ) } func getWorkflowRunTimeout(namespace string, requestedTimeout, executionTimeout time.Duration, serviceConfig *Config) time.Duration { - runTimeoutSeconds := requestedTimeout - if runTimeoutSeconds == 0 { - runTimeoutSeconds = timestamp.RoundUp(serviceConfig.DefaultWorkflowRunTimeout(namespace)) - } - maxWorkflowRunTimeout := timestamp.RoundUp(serviceConfig.MaxWorkflowRunTimeout(namespace)) - runTimeoutSeconds = timestamp.MinDuration(runTimeoutSeconds, maxWorkflowRunTimeout) - runTimeoutSeconds = timestamp.MinDuration(runTimeoutSeconds, executionTimeout) - - return runTimeoutSeconds + return common.GetWorkflowRunTimeout( + namespace, + requestedTimeout, + executionTimeout, + serviceConfig.DefaultWorkflowRunTimeout, + serviceConfig.MaxWorkflowRunTimeout, + ) } // FindAutoResetPoint returns the auto reset point