Merged pull request: Add support for sub-orchestration retry policies ([#84](https://github.com/microsoft/durabletask-go/pull/84)), showing changes from all commits.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0)
- Add API to set custom status ([#81](https://github.com/microsoft/durabletask-go/pull/81)) - by [@famarting](https://github.com/famarting)
- Add missing purge orchestration options ([#82](https://github.com/microsoft/durabletask-go/pull/82)) - by [@famarting](https://github.com/famarting)
- Add support for activity retry policies ([#83](https://github.com/microsoft/durabletask-go/pull/83)) - by [@famarting](https://github.com/famarting)
- Add support for sub-orchestration retry policies ([#84](https://github.com/microsoft/durabletask-go/pull/84)) - by [@famarting](https://github.com/famarting)

### Changed

5 changes: 4 additions & 1 deletion backend/sqlite/sqlite.go
@@ -336,7 +336,10 @@ func (be *sqliteBackend) CompleteOrchestrationWorkItem(ctx context.Context, wi *
for _, msg := range wi.State.PendingMessages() {
if es := msg.HistoryEvent.GetExecutionStarted(); es != nil {
// Need to insert a new row into the DB
if _, err := be.createOrchestrationInstanceInternal(ctx, msg.HistoryEvent, tx); err != nil {
if _, err := be.createOrchestrationInstanceInternal(ctx, msg.HistoryEvent, tx, backend.WithOrchestrationIdReusePolicy(&protos.OrchestrationIdReusePolicy{
OperationStatus: []protos.OrchestrationStatus{protos.OrchestrationStatus_ORCHESTRATION_STATUS_FAILED},
Action: api.REUSE_ID_ACTION_TERMINATE,
})); err != nil {
if err == backend.ErrDuplicateEvent {
be.logger.Warnf(
"%v: dropping sub-orchestration creation event because an instance with the target ID (%v) already exists.",
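A note on why this backend change accompanies the retry feature (our reading of the hunk, not text from the PR): when a sub-orchestration retry fires, the parent re-sends an ExecutionStarted message under the same child instance ID, and the leftover failed instance would otherwise trip the ErrDuplicateEvent path above. The sketch below restates the reuse policy with its semantics spelled out; `backend`, `protos`, and `api` are packages this file already references.

```go
// Sketch of the ID-reuse policy passed above: when an instance with the
// target ID already exists AND is in the FAILED state, terminate it and
// create the retry attempt in its place. An existing instance in any other
// state still falls through to the ErrDuplicateEvent warning.
reuseFailedID := backend.WithOrchestrationIdReusePolicy(&protos.OrchestrationIdReusePolicy{
	OperationStatus: []protos.OrchestrationStatus{
		protos.OrchestrationStatus_ORCHESTRATION_STATUS_FAILED,
	},
	Action: api.REUSE_ID_ACTION_TERMINATE,
})
```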
2 changes: 1 addition & 1 deletion samples/retries/retries.go
@@ -74,7 +74,7 @@ func Init(ctx context.Context, r *task.TaskRegistry) (backend.TaskHubClient, bac
}

func RetryActivityOrchestrator(ctx *task.OrchestrationContext) (any, error) {
if err := ctx.CallActivity(RandomFailActivity, task.WithRetryPolicy(&task.ActivityRetryPolicy{
if err := ctx.CallActivity(RandomFailActivity, task.WithActivityRetryPolicy(&task.RetryPolicy{
MaxAttempts: 10,
InitialRetryInterval: 100 * time.Millisecond,
BackoffCoefficient: 2,
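The sample change above doubles as the migration pattern for existing callers: the option constructor gains an activity-specific name while the policy type loses one, since the same `RetryPolicy` now also drives sub-orchestrations. A before/after fragment:

```go
// Before this PR (old names):
//   ctx.CallActivity(RandomFailActivity, task.WithRetryPolicy(&task.ActivityRetryPolicy{...}))
// After this PR (shared policy type, scoped option name):
err := ctx.CallActivity(RandomFailActivity, task.WithActivityRetryPolicy(&task.RetryPolicy{
	MaxAttempts:          10,
	InitialRetryInterval: 100 * time.Millisecond,
	BackoffCoefficient:   2,
})).Await(nil)
```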
54 changes: 31 additions & 23 deletions task/activity.go
@@ -14,10 +14,10 @@ type callActivityOption func(*callActivityOptions) error

type callActivityOptions struct {
rawInput *wrapperspb.StringValue
retryPolicy *ActivityRetryPolicy
retryPolicy *RetryPolicy
}

type ActivityRetryPolicy struct {
type RetryPolicy struct {
// Max number of attempts to try the activity call, first execution inclusive
MaxAttempts int
// Timespan to wait for the first retry
@@ -32,6 +32,31 @@ type ActivityRetryPolicy struct {
Handle func(error) bool
}

func (policy *RetryPolicy) Validate() error {
if policy.InitialRetryInterval <= 0 {
return fmt.Errorf("InitialRetryInterval must be greater than 0")
}
if policy.MaxAttempts <= 0 {
// setting 1 max attempt is equivalent to not retrying
policy.MaxAttempts = 1
}
if policy.BackoffCoefficient <= 0 {
policy.BackoffCoefficient = 1
}
if policy.MaxRetryInterval <= 0 {
policy.MaxRetryInterval = math.MaxInt64
}
if policy.RetryTimeout <= 0 {
policy.RetryTimeout = math.MaxInt64
}
if policy.Handle == nil {
policy.Handle = func(err error) bool {
return true
}
}
return nil
}

// WithActivityInput configures an input for an activity invocation.
// The specified input must be JSON serializable.
func WithActivityInput(input any) callActivityOption {
@@ -53,31 +78,14 @@ func WithRawActivityInput(input string) callActivityOption {
}
}

func WithRetryPolicy(policy *ActivityRetryPolicy) callActivityOption {
func WithActivityRetryPolicy(policy *RetryPolicy) callActivityOption {
return func(opt *callActivityOptions) error {
if policy == nil {
return nil
}
if policy.InitialRetryInterval <= 0 {
return fmt.Errorf("InitialRetryInterval must be greater than 0")
}
if policy.MaxAttempts <= 0 {
// setting 1 max attempt is equivalent to not retrying
policy.MaxAttempts = 1
}
if policy.BackoffCoefficient <= 0 {
policy.BackoffCoefficient = 1
}
if policy.MaxRetryInterval <= 0 {
policy.MaxRetryInterval = math.MaxInt64
}
if policy.RetryTimeout <= 0 {
policy.RetryTimeout = math.MaxInt64
}
if policy.Handle == nil {
policy.Handle = func(err error) bool {
return true
}
err := policy.Validate()
if err != nil {
return err
}
opt.retryPolicy = policy
return nil
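A quick sketch of what the extracted `Validate` method does to a minimally specified policy. Per the hunk above, the only hard requirement is a positive `InitialRetryInterval`; every other zero value is defaulted in place:

```go
package main

import (
	"fmt"
	"time"

	"github.com/microsoft/durabletask-go/task"
)

func main() {
	// Only the required interval and the attempt budget are set.
	p := &task.RetryPolicy{
		MaxAttempts:          3,
		InitialRetryInterval: 100 * time.Millisecond,
	}
	if err := p.Validate(); err != nil {
		panic(err) // only reachable when InitialRetryInterval <= 0
	}
	// Per the Validate body above, the zero values are now defaulted:
	//   BackoffCoefficient -> 1 (constant delay between attempts)
	//   MaxRetryInterval   -> math.MaxInt64 (effectively no cap)
	//   RetryTimeout       -> math.MaxInt64 (retry window never expires)
	//   Handle             -> retry on every error
	fmt.Println(p.BackoffCoefficient, p.MaxRetryInterval, p.RetryTimeout)
}
```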
80 changes: 53 additions & 27 deletions task/orchestrator.go
@@ -49,6 +49,8 @@ type OrchestrationContext struct {
type callSubOrchestratorOptions struct {
instanceID string
rawInput *wrapperspb.StringValue

retryPolicy *RetryPolicy
}

// subOrchestratorOption is a functional option type for the CallSubOrchestrator orchestrator method.
@@ -96,6 +98,20 @@ func WithSubOrchestrationInstanceID(instanceID string) subOrchestratorOption {
}
}

func WithSubOrchestrationRetryPolicy(policy *RetryPolicy) subOrchestratorOption {
return func(opt *callSubOrchestratorOptions) error {
if policy == nil {
return nil
}
err := policy.Validate()
if err != nil {
return err
}
opt.retryPolicy = policy
return nil
}
}

// NewOrchestrationContext returns a new [OrchestrationContext] struct with the specified parameters.
func NewOrchestrationContext(registry *TaskRegistry, id api.InstanceID, oldEvents []*protos.HistoryEvent, newEvents []*protos.HistoryEvent) *OrchestrationContext {
return &OrchestrationContext{
@@ -238,7 +254,7 @@ func (ctx *OrchestrationContext) CallActivity(activity interface{}, opts ...call
}

if options.retryPolicy != nil {
return ctx.internalCallActivityWithRetries(ctx.CurrentTimeUtc, func() Task {
return ctx.internalScheduleTaskWithRetries(ctx.CurrentTimeUtc, func() Task {
return ctx.internalScheduleActivity(activity, options)
}, *options.retryPolicy, 0)
}
@@ -259,7 +275,40 @@ func (ctx *OrchestrationContext) internalScheduleActivity(activity interface{},
return task
}

func (ctx *OrchestrationContext) internalCallActivityWithRetries(initialAttempt time.Time, schedule func() Task, policy ActivityRetryPolicy, retryCount int) Task {
func (ctx *OrchestrationContext) CallSubOrchestrator(orchestrator interface{}, opts ...subOrchestratorOption) Task {
options := new(callSubOrchestratorOptions)
for _, configure := range opts {
if err := configure(options); err != nil {
failedTask := newTask(ctx)
failedTask.fail(helpers.NewTaskFailureDetails(err))
return failedTask
}
}

if options.retryPolicy != nil {
return ctx.internalScheduleTaskWithRetries(ctx.CurrentTimeUtc, func() Task {
return ctx.internalCallSubOrchestrator(orchestrator, options)
}, *options.retryPolicy, 0)
}

return ctx.internalCallSubOrchestrator(orchestrator, options)
}

func (ctx *OrchestrationContext) internalCallSubOrchestrator(orchestrator interface{}, options *callSubOrchestratorOptions) Task {
createSubOrchestrationAction := helpers.NewCreateSubOrchestrationAction(
ctx.getNextSequenceNumber(),
helpers.GetTaskFunctionName(orchestrator),
options.instanceID,
options.rawInput,
)
ctx.pendingActions[createSubOrchestrationAction.Id] = createSubOrchestrationAction

task := newTask(ctx)
ctx.pendingTasks[createSubOrchestrationAction.Id] = task
return task
}

func (ctx *OrchestrationContext) internalScheduleTaskWithRetries(initialAttempt time.Time, schedule func() Task, policy RetryPolicy, retryCount int) Task {
return &taskWrapper{
delegate: schedule(),
onAwaitResult: func(v any, err error) error {
@@ -283,7 +332,7 @@ func (ctx *OrchestrationContext) internalCallActivityWithRetries(initialAttempt
return fmt.Errorf("%v %w", timerErr, err)
}

err = ctx.internalCallActivityWithRetries(initialAttempt, schedule, policy, retryCount+1).Await(v)
err = ctx.internalScheduleTaskWithRetries(initialAttempt, schedule, policy, retryCount+1).Await(v)
if err == nil {
return nil
}
@@ -292,7 +341,7 @@ func (ctx *OrchestrationContext) internalCallActivityWithRetries(initialAttempt
}
}

func computeNextDelay(currentTimeUtc time.Time, policy ActivityRetryPolicy, attempt int, firstAttempt time.Time, err error) time.Duration {
func computeNextDelay(currentTimeUtc time.Time, policy RetryPolicy, attempt int, firstAttempt time.Time, err error) time.Duration {
if policy.Handle(err) {
isExpired := false
if policy.RetryTimeout != math.MaxInt64 {
@@ -309,29 +358,6 @@ func computeNextDelay(currentTimeUtc time.Time, policy ActivityRetryPolicy, atte
return 0
}

func (ctx *OrchestrationContext) CallSubOrchestrator(orchestrator interface{}, opts ...subOrchestratorOption) Task {
options := new(callSubOrchestratorOptions)
for _, configure := range opts {
if err := configure(options); err != nil {
failedTask := newTask(ctx)
failedTask.fail(helpers.NewTaskFailureDetails(err))
return failedTask
}
}

createSubOrchestrationAction := helpers.NewCreateSubOrchestrationAction(
ctx.getNextSequenceNumber(),
helpers.GetTaskFunctionName(orchestrator),
options.instanceID,
options.rawInput,
)
ctx.pendingActions[createSubOrchestrationAction.Id] = createSubOrchestrationAction

task := newTask(ctx)
ctx.pendingTasks[createSubOrchestrationAction.Id] = task
return task
}

// CreateTimer schedules a durable timer that expires after the specified delay.
func (ctx *OrchestrationContext) CreateTimer(delay time.Duration) Task {
return ctx.createTimerInternal(delay)
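The middle of `computeNextDelay` is collapsed in this view. Based on the visible lines and the renamed signature, it is a capped exponential backoff gated by the `Handle` predicate and `RetryTimeout`. The sketch below is a hedged reconstruction consistent with the test table that follows, not the PR's verbatim body; it assumes `BackoffCoefficient` is a `float64` and the intervals are `time.Duration` values, as in the samples:

```go
package sketch

import (
	"math"
	"time"

	"github.com/microsoft/durabletask-go/task"
)

// computeNextDelaySketch mirrors the collapsed body above: no retry when the
// Handle predicate rejects the error or the retry window has expired;
// otherwise the delay grows as initial * coefficient^attempt, capped at
// MaxRetryInterval. MaxAttempts is enforced by the caller, not here.
func computeNextDelaySketch(currentTimeUtc time.Time, policy task.RetryPolicy, attempt int, firstAttempt time.Time, err error) time.Duration {
	if !policy.Handle(err) {
		return 0 // non-retryable error: the caller fails the task immediately
	}
	// RetryTimeout defaults to math.MaxInt64 in Validate, meaning "never expires".
	if policy.RetryTimeout != math.MaxInt64 && currentTimeUtc.After(firstAttempt.Add(policy.RetryTimeout)) {
		return 0 // retry window exhausted
	}
	delay := float64(policy.InitialRetryInterval) * math.Pow(policy.BackoffCoefficient, float64(attempt))
	if capped := float64(policy.MaxRetryInterval); delay > capped {
		delay = capped
	}
	return time.Duration(delay)
}
```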
14 changes: 7 additions & 7 deletions task/orchestrator_test.go
@@ -10,7 +10,7 @@ func Test_computeNextDelay(t *testing.T) {
time2 := time.Now().Add(1 * time.Minute)
type args struct {
currentTimeUtc time.Time
policy ActivityRetryPolicy
policy RetryPolicy
attempt int
firstAttempt time.Time
err error
@@ -24,7 +24,7 @@
name: "first attempt",
args: args{
currentTimeUtc: time2,
policy: ActivityRetryPolicy{
policy: RetryPolicy{
MaxAttempts: 3,
InitialRetryInterval: 2 * time.Second,
BackoffCoefficient: 2,
@@ -41,7 +41,7 @@
name: "second attempt",
args: args{
currentTimeUtc: time2,
policy: ActivityRetryPolicy{
policy: RetryPolicy{
MaxAttempts: 3,
InitialRetryInterval: 2 * time.Second,
BackoffCoefficient: 2,
@@ -58,7 +58,7 @@
name: "third attempt",
args: args{
currentTimeUtc: time2,
policy: ActivityRetryPolicy{
policy: RetryPolicy{
MaxAttempts: 3,
InitialRetryInterval: 2 * time.Second,
BackoffCoefficient: 2,
@@ -75,7 +75,7 @@
name: "fourth attempt",
args: args{
currentTimeUtc: time2,
policy: ActivityRetryPolicy{
policy: RetryPolicy{
MaxAttempts: 3,
InitialRetryInterval: 2 * time.Second,
BackoffCoefficient: 2,
@@ -92,7 +92,7 @@
name: "expired",
args: args{
currentTimeUtc: time2,
policy: ActivityRetryPolicy{
policy: RetryPolicy{
MaxAttempts: 3,
InitialRetryInterval: 2 * time.Second,
BackoffCoefficient: 2,
@@ -109,7 +109,7 @@
name: "fourth attempt backoff 1",
args: args{
currentTimeUtc: time2,
policy: ActivityRetryPolicy{
policy: RetryPolicy{
MaxAttempts: 3,
InitialRetryInterval: 2 * time.Second,
BackoffCoefficient: 1,
35 changes: 34 additions & 1 deletion tests/grpc/grpc_test.go
@@ -426,7 +426,7 @@ func Test_Grpc_ReuseInstanceIDError(t *testing.T) {
func Test_Grpc_ActivityRetries(t *testing.T) {
r := task.NewTaskRegistry()
r.AddOrchestratorN("ActivityRetries", func(ctx *task.OrchestrationContext) (any, error) {
if err := ctx.CallActivity("FailActivity", task.WithRetryPolicy(&task.ActivityRetryPolicy{
if err := ctx.CallActivity("FailActivity", task.WithActivityRetryPolicy(&task.RetryPolicy{
MaxAttempts: 3,
InitialRetryInterval: 10 * time.Millisecond,
})).Await(nil); err != nil {
@@ -453,3 +453,36 @@
// With 3 max attempts there will be two retries with 10 millis delay before each
require.GreaterOrEqual(t, metadata.LastUpdatedAt, metadata.CreatedAt.Add(2*10*time.Millisecond))
}

func Test_Grpc_SubOrchestratorRetries(t *testing.T) {
r := task.NewTaskRegistry()
r.AddOrchestratorN("Parent", func(ctx *task.OrchestrationContext) (any, error) {
err := ctx.CallSubOrchestrator(
"Child",
task.WithSubOrchestrationInstanceID(string(ctx.ID)+"_child"),
task.WithSubOrchestrationRetryPolicy(&task.RetryPolicy{
MaxAttempts: 3,
InitialRetryInterval: 10 * time.Millisecond,
BackoffCoefficient: 2,
})).Await(nil)
return nil, err
})
r.AddOrchestratorN("Child", func(ctx *task.OrchestrationContext) (any, error) {
return nil, errors.New("Child failed")
})

cancelListener := startGrpcListener(t, r)
defer cancelListener()
instanceID := api.InstanceID("orchestrator_retries")

id, err := grpcClient.ScheduleNewOrchestration(ctx, "Parent", api.WithInstanceID(instanceID))
require.NoError(t, err)
timeoutCtx, cancelTimeout := context.WithTimeout(ctx, 30*time.Second)
defer cancelTimeout()
metadata, err := grpcClient.WaitForOrchestrationCompletion(timeoutCtx, id, api.WithFetchPayloads(true))
require.NoError(t, err)
assert.Equal(t, true, metadata.IsComplete())
assert.Equal(t, protos.OrchestrationStatus_ORCHESTRATION_STATUS_FAILED, metadata.RuntimeStatus)
// With 3 max attempts there will be two retries with 10 millis delay before each
require.GreaterOrEqual(t, metadata.LastUpdatedAt, metadata.CreatedAt.Add(2*10*time.Millisecond))
}
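Finally, a minimal application-side sketch of the new option outside a test harness. Function and instance names here are illustrative, not from the repository:

```go
package sample

import (
	"time"

	"github.com/microsoft/durabletask-go/task"
)

// ProcessOrder is a hypothetical parent that retries a flaky child
// orchestration with exponential backoff before surfacing the failure.
func ProcessOrder(ctx *task.OrchestrationContext) (any, error) {
	var receipt string
	err := ctx.CallSubOrchestrator(
		ChargePayment, // a registered name string works here too, as in the test above
		task.WithSubOrchestrationInstanceID(string(ctx.ID)+"_charge"),
		task.WithSubOrchestrationRetryPolicy(&task.RetryPolicy{
			MaxAttempts:          5,
			InitialRetryInterval: 500 * time.Millisecond,
			BackoffCoefficient:   2,
			MaxRetryInterval:     10 * time.Second,
		}),
	).Await(&receipt)
	return receipt, err
}

// ChargePayment is the hypothetical child orchestration.
func ChargePayment(ctx *task.OrchestrationContext) (any, error) {
	// Transient failures returned here are retried under the parent's policy.
	return "charged", nil
}
```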