Skip to content

Commit

Permalink
Introduce a new constructor to override retry policy in ContinueAsNew…
Browse files Browse the repository at this point in the history
…Error (#1383)
  • Loading branch information
jiezhang committed Feb 14, 2024
1 parent d39fa9d commit 30da688
Show file tree
Hide file tree
Showing 6 changed files with 418 additions and 4 deletions.
33 changes: 33 additions & 0 deletions internal/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,24 @@ type (
// VersioningIntent specifies whether the continued workflow should run on a worker with a
// compatible build ID or not. See VersioningIntent.
VersioningIntent VersioningIntent

// This is by default nil but may be overridden using NewContinueAsNewErrorWithOptions.
// It specifies the retry policy which gets carried over to the next run.
// If not set, the current workflow's retry policy will be carried over automatically.
//
// NOTES:
// 1. This is always nil when returned from a client as a workflow response.
// 2. Unlike other options that can be overridden using WithWorkflowTaskQueue, WithWorkflowRunTimeout, etc.
// we can't introduce an option, say WithWorkflowRetryPolicy, for backward compatibility.
// See #676 or IntegrationTestSuite::TestContinueAsNewWithWithChildWF for more details.
RetryPolicy *RetryPolicy
}

// ContinueAsNewErrorOptions specifies optional attributes to be carried over to the next run.
ContinueAsNewErrorOptions struct {
// RetryPolicy specifies the retry policy to be used for the next run.
// If nil, the current workflow's retry policy will be used.
RetryPolicy *RetryPolicy
}

// UnknownExternalWorkflowExecutionError can be returned when external workflow doesn't exist
Expand Down Expand Up @@ -459,6 +477,20 @@ func NewContinueAsNewError(ctx Context, wfn interface{}, args ...interface{}) er
return i.NewContinueAsNewError(ctx, wfn, args...)
}

// NewContinueAsNewErrorWithOptions creates ContinueAsNewError instance with additional options.
func NewContinueAsNewErrorWithOptions(ctx Context, options ContinueAsNewErrorOptions, wfn interface{}, args ...interface{}) error {
err := NewContinueAsNewError(ctx, wfn, args...)

var continueAsNewErr *ContinueAsNewError
if errors.As(err, &continueAsNewErr) {
if options.RetryPolicy != nil {
continueAsNewErr.RetryPolicy = options.RetryPolicy
}
}

return err
}

func (wc *workflowEnvironmentInterceptor) NewContinueAsNewError(
ctx Context,
wfn interface{},
Expand Down Expand Up @@ -489,6 +521,7 @@ func (wc *workflowEnvironmentInterceptor) NewContinueAsNewError(
WorkflowRunTimeout: options.WorkflowRunTimeout,
WorkflowTaskTimeout: options.WorkflowTaskTimeout,
VersioningIntent: options.VersioningIntent,
RetryPolicy: nil, // The retry policy can't be propagated like other options due to #676.
}
}

Expand Down
77 changes: 75 additions & 2 deletions internal/error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,15 @@ import (
"errors"
"fmt"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

commandpb "go.temporal.io/api/command/v1"
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
failurepb "go.temporal.io/api/failure/v1"
historypb "go.temporal.io/api/history/v1"

"go.temporal.io/sdk/converter"
ilog "go.temporal.io/sdk/internal/log"
)
Expand Down Expand Up @@ -565,6 +564,80 @@ func Test_ContinueAsNewError(t *testing.T) {
require.Equal(t, a1, intArg)
require.Equal(t, a2, stringArg)
require.Equal(t, header, continueAsNewErr.Header)

require.Nil(t, continueAsNewErr.RetryPolicy)
}

func Test_ContinueAsNewErrorWithOptions(t *testing.T) {
const (
a1 = 1234
a2 = "some random input"
continueAsNewWfName = "continueAsNewWorkflowFn"
initialInterval = 2 * time.Second
backoffCoefficient = 1.1
maximumAttempts int32 = 23
maximumInterval = time.Minute
)

require := require.New(t)
continueAsNewWorkflowFn := func(ctx Context, testInt int, testString string) error {
err := NewContinueAsNewErrorWithOptions(
ctx,
ContinueAsNewErrorOptions{RetryPolicy: &RetryPolicy{
BackoffCoefficient: backoffCoefficient,
InitialInterval: initialInterval,
MaximumAttempts: maximumAttempts,
MaximumInterval: maximumInterval,
}},
continueAsNewWfName,
a1,
a2,
)

continueAsNewErr := err.(*ContinueAsNewError)
if continueAsNewErr.RetryPolicy == nil {
return errors.New("retry policy is nil")
}

if continueAsNewErr.RetryPolicy.MaximumAttempts != maximumAttempts {
return errors.New("retry policy maximum attempts is not set")
}

return err
}

s := &WorkflowTestSuite{}
wfEnv := s.NewTestWorkflowEnvironment()
wfEnv.RegisterWorkflowWithOptions(continueAsNewWorkflowFn, RegisterWorkflowOptions{
Name: continueAsNewWfName,
})
wfEnv.ExecuteWorkflow(continueAsNewWorkflowFn, 101, "another random string")

err := wfEnv.GetWorkflowError()

require.Error(err)
var workflowErr *WorkflowExecutionError
require.True(errors.As(err, &workflowErr))

err = errors.Unwrap(workflowErr)
var continueAsNewErr *ContinueAsNewError
require.True(errors.As(err, &continueAsNewErr))
require.Equal(continueAsNewWfName, continueAsNewErr.WorkflowType.Name)

input := continueAsNewErr.Input
var intArg int
var stringArg string
dataConverter := converter.GetDefaultDataConverter()
err = dataConverter.FromPayloads(input, &intArg, &stringArg)
require.NoError(err)
require.Equal(a1, intArg)
require.Equal(a2, stringArg)

require.NotNil(continueAsNewErr.RetryPolicy)
require.Equal(backoffCoefficient, continueAsNewErr.RetryPolicy.BackoffCoefficient)
require.Equal(initialInterval, continueAsNewErr.RetryPolicy.InitialInterval)
require.Equal(maximumAttempts, continueAsNewErr.RetryPolicy.MaximumAttempts)
require.Equal(maximumInterval, continueAsNewErr.RetryPolicy.MaximumInterval)
}

type coolError struct{}
Expand Down
9 changes: 8 additions & 1 deletion internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -1715,6 +1715,13 @@ func (wth *workflowTaskHandlerImpl) completeWorkflow(
metricsHandler.Counter(metrics.WorkflowContinueAsNewCounter).Inc(1)
closeCommand = createNewCommand(enumspb.COMMAND_TYPE_CONTINUE_AS_NEW_WORKFLOW_EXECUTION)

// ContinueAsNewError.RetryPolicy is optional.
// If not set, use the retry policy from the workflow context.
retryPolicy := contErr.RetryPolicy
if retryPolicy == nil {
retryPolicy = workflowContext.workflowInfo.RetryPolicy
}

useCompat := determineUseCompatibleFlagForCommand(
contErr.VersioningIntent, workflowContext.workflowInfo.TaskQueueName, contErr.TaskQueueName)
closeCommand.Attributes = &commandpb.Command_ContinueAsNewWorkflowExecutionCommandAttributes{ContinueAsNewWorkflowExecutionCommandAttributes: &commandpb.ContinueAsNewWorkflowExecutionCommandAttributes{
Expand All @@ -1726,7 +1733,7 @@ func (wth *workflowTaskHandlerImpl) completeWorkflow(
Header: contErr.Header,
Memo: workflowContext.workflowInfo.Memo,
SearchAttributes: workflowContext.workflowInfo.SearchAttributes,
RetryPolicy: convertToPBRetryPolicy(workflowContext.workflowInfo.RetryPolicy),
RetryPolicy: convertToPBRetryPolicy(retryPolicy),
UseCompatibleVersion: useCompat,
}}
} else if workflowContext.err != nil {
Expand Down
91 changes: 91 additions & 0 deletions test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,57 @@ func (ts *IntegrationTestSuite) TestContinueAsNewCarryOver() {
ts.Equal("memoVal,searchAttr,123", result)
}

func (ts *IntegrationTestSuite) TestContinueAsNewWithRetryPolicy() {
const (
initialMaximumAttempts = 3
newMaximumAttempts = 100
iterations = 4
)

var result string
startOptions := ts.startWorkflowOptions("test-continueasnew-with-retry-policy")
startOptions.RetryPolicy = &temporal.RetryPolicy{
MaximumAttempts: initialMaximumAttempts,
}
err := ts.executeWorkflowWithOption(
startOptions,
ts.workflows.ContinueAsNewWithRetryPolicy,
&result,
initialMaximumAttempts,
newMaximumAttempts,
initialMaximumAttempts,
iterations,
)
ts.NoError(err)
ts.Equal(fmt.Sprintf("End of workflow: %v", newMaximumAttempts), result)

expectedActivities := make([]string, iterations+1)
for i := 0; i <= iterations; i++ {
expectedActivities[i] = "toUpper"
}
ts.EqualValues(expectedActivities, ts.activities.invoked())
}

func (ts *IntegrationTestSuite) TestContinueAsNewWithWithChildWF() {
const (
iterations = 6
)

err := ts.executeWorkflow(
"test-continueasnew-with-child-wf",
ts.workflows.ContinueAsNewWithChildWF,
nil,
iterations,
)
ts.NoError(err)

expectedActivities := make([]string, iterations+1)
for i := 0; i <= iterations; i++ {
expectedActivities[i] = "toUpper"
}
ts.EqualValues(expectedActivities, ts.activities.invoked())
}

func (ts *IntegrationTestSuite) TestCancellation() {
ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout)
defer cancel()
Expand Down Expand Up @@ -873,6 +924,46 @@ func (ts *IntegrationTestSuite) TestWorkflowIDReuseIgnoreDuplicateWhileRunning()
ts.NotEqual(run1.GetRunID(), run3.GetRunID())
}

func (ts *IntegrationTestSuite) TestChildWFWithRetryPolicy_ShortLived() {
ts.testChildWFWithRetryPolicy(ts.workflows.ChildWorkflowWithRetryPolicy, 0)
}

func (ts *IntegrationTestSuite) TestChildWFWithRetryPolicy_LongRunning() {
ts.testChildWFWithRetryPolicy(ts.workflows.ChildWorkflowWithRetryPolicy, 4)
}

func (ts *IntegrationTestSuite) TestChildWFWithRetryPolicy_LongRunningWithCustomRetry() {
ts.testChildWFWithRetryPolicy(ts.workflows.ChildWorkflowWithCustomRetryPolicy, 6)
}

func (ts *IntegrationTestSuite) testChildWFWithRetryPolicy(wfFunc interface{}, iterations int) {
const (
parentWorkflowMaximumAttempts = 3
)

startOptions := ts.startWorkflowOptions("test-childwf-with-retry-policy")
startOptions.RetryPolicy = &temporal.RetryPolicy{
InitialInterval: time.Second,
BackoffCoefficient: 2.0,
MaximumInterval: time.Second,
MaximumAttempts: parentWorkflowMaximumAttempts,
}
err := ts.executeWorkflowWithOption(
startOptions,
wfFunc,
nil,
parentWorkflowMaximumAttempts,
iterations,
)
ts.NoError(err)

expectedActivities := make([]string, iterations+1)
for i := 0; i <= iterations; i++ {
expectedActivities[i] = "toUpper"
}
ts.EqualValues(expectedActivities, ts.activities.invoked())
}

func (ts *IntegrationTestSuite) TestChildWFRetryOnError() {
err := ts.executeWorkflow("test-childwf-retry-on-error", ts.workflows.ChildWorkflowRetryOnError, nil)
ts.Error(err)
Expand Down
Loading

0 comments on commit 30da688

Please sign in to comment.