Skip to content

Commit

Permalink
Allow customizing retry behavior for timeout failure (#2524)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt committed Feb 22, 2022
1 parent 0a76b2a commit f6acc48
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 9 deletions.
8 changes: 8 additions & 0 deletions common/constants.go
Expand Up @@ -111,3 +111,11 @@ const (
// DefaultTransactionSizeLimit is the largest allowed transaction size to persistence
DefaultTransactionSizeLimit = 4 * 1024 * 1024
)

// TimeoutFailureTypePrefix is prepended to a timeout type name to build the
// failure type string that a retry policy uses to refer to timeout failures.
// The complete failure type is this prefix followed by
// enums.TimeoutType.String(), for example "TemporalTimeout:StartToClose" or
// "TemporalTimeout:Heartbeat".
const TimeoutFailureTypePrefix = "TemporalTimeout:"
12 changes: 12 additions & 0 deletions common/util.go
Expand Up @@ -30,6 +30,7 @@ import (
"fmt"
"math/rand"
"sort"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -471,6 +472,17 @@ func ValidateRetryPolicy(policy *commonpb.RetryPolicy) error {
if policy.GetMaximumAttempts() < 0 {
return serviceerror.NewInvalidArgument("MaximumAttempts cannot be negative on retry policy.")
}

for _, nrt := range policy.NonRetryableErrorTypes {
if strings.HasPrefix(nrt, TimeoutFailureTypePrefix) {
timeoutTypeValue := nrt[len(TimeoutFailureTypePrefix):]
timeoutType, ok := enumspb.TimeoutType_value[timeoutTypeValue]
if !ok || enumspb.TimeoutType(timeoutType) == enumspb.TIMEOUT_TYPE_UNSPECIFIED {
return serviceerror.NewInvalidArgument(fmt.Sprintf("Invalid timeout type value: %v.", timeoutTypeValue))
}
}
}

return nil
}

Expand Down
37 changes: 37 additions & 0 deletions common/util_test.go
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"

"go.temporal.io/server/common/dynamicconfig"
Expand Down Expand Up @@ -105,6 +106,42 @@ func TestValidateRetryPolicy(t *testing.T) {
wantErr: true,
wantErrString: "MaximumAttempts cannot be negative on retry policy.",
},
{
name: "timeout nonretryable error - valid type",
input: &commonpb.RetryPolicy{
BackoffCoefficient: 1,
NonRetryableErrorTypes: []string{
TimeoutFailureTypePrefix + enumspb.TIMEOUT_TYPE_START_TO_CLOSE.String(),
TimeoutFailureTypePrefix + enumspb.TIMEOUT_TYPE_SCHEDULE_TO_START.String(),
TimeoutFailureTypePrefix + enumspb.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE.String(),
TimeoutFailureTypePrefix + enumspb.TIMEOUT_TYPE_HEARTBEAT.String(),
},
},
wantErr: false,
wantErrString: "",
},
{
name: "timeout nonretryable error - unspecified type",
input: &commonpb.RetryPolicy{
BackoffCoefficient: 1,
NonRetryableErrorTypes: []string{
TimeoutFailureTypePrefix + enumspb.TIMEOUT_TYPE_UNSPECIFIED.String(),
},
},
wantErr: true,
wantErrString: "Invalid timeout type value: Unspecified.",
},
{
name: "timeout nonretryable error - unknown type",
input: &commonpb.RetryPolicy{
BackoffCoefficient: 1,
NonRetryableErrorTypes: []string{
TimeoutFailureTypePrefix + "unknown",
},
},
wantErr: true,
wantErrString: "Invalid timeout type value: unknown.",
},
}

for _, tt := range testCases {
Expand Down
3 changes: 2 additions & 1 deletion service/history/timerQueueActiveTaskExecutor.go
Expand Up @@ -241,7 +241,8 @@ Loop:
break Loop
}

timeoutFailure := failure.NewTimeoutFailure("activity timeout", timerSequenceID.TimerType)
failureMsg := fmt.Sprintf("activity %v timeout", timerSequenceID.TimerType.String())
timeoutFailure := failure.NewTimeoutFailure(failureMsg, timerSequenceID.TimerType)
var retryState enumspb.RetryState
if retryState, err = mutableState.RetryActivity(
activityInfo,
Expand Down
35 changes: 27 additions & 8 deletions service/history/workflow/retry.go
Expand Up @@ -39,6 +39,7 @@ import (

"go.temporal.io/server/api/historyservice/v1"
workflowspb "go.temporal.io/server/api/workflow/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/backoff"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/primitives/timestamp"
Expand Down Expand Up @@ -118,8 +119,16 @@ func isRetryable(failure *failurepb.Failure, nonRetryableTypes []string) bool {
}

if failure.GetTimeoutFailureInfo() != nil {
return failure.GetTimeoutFailureInfo().GetTimeoutType() == enumspb.TIMEOUT_TYPE_START_TO_CLOSE ||
failure.GetTimeoutFailureInfo().GetTimeoutType() == enumspb.TIMEOUT_TYPE_HEARTBEAT
timeoutType := failure.GetTimeoutFailureInfo().GetTimeoutType()
if timeoutType == enumspb.TIMEOUT_TYPE_START_TO_CLOSE ||
timeoutType == enumspb.TIMEOUT_TYPE_HEARTBEAT {
return !matchNonRetryableTypes(
common.TimeoutFailureTypePrefix+timeoutType.String(),
nonRetryableTypes,
)
}

return false
}

if failure.GetServerFailureInfo() != nil {
Expand All @@ -131,16 +140,26 @@ func isRetryable(failure *failurepb.Failure, nonRetryableTypes []string) bool {
return false
}

failureType := failure.GetApplicationFailureInfo().GetType()
for _, nrt := range nonRetryableTypes {
if nrt == failureType {
return false
}
}
return !matchNonRetryableTypes(
failure.GetApplicationFailureInfo().GetType(),
nonRetryableTypes,
)
}
return true
}

// matchNonRetryableTypes reports whether failureType is listed in
// nonRetryableTypes. Entries are compared with exact, case-sensitive string
// equality; a nil or empty list never matches.
func matchNonRetryableTypes(
	failureType string,
	nonRetryableTypes []string,
) bool {
	for i := 0; i < len(nonRetryableTypes); i++ {
		if nonRetryableTypes[i] != failureType {
			continue
		}
		// First exact hit is enough; no need to scan the rest.
		return true
	}
	return false
}

// Helpers for creating new retry/cron workflows:

func SetupNewWorkflowForRetryOrCron(
Expand Down
7 changes: 7 additions & 0 deletions service/history/workflow/retry_test.go
Expand Up @@ -33,6 +33,7 @@ import (
failurepb "go.temporal.io/api/failure/v1"

persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/backoff"
"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/failure"
Expand Down Expand Up @@ -65,27 +66,33 @@ func Test_IsRetryable(t *testing.T) {
}},
}
a.True(isRetryable(f, nil))
a.False(isRetryable(f, []string{common.TimeoutFailureTypePrefix + enumspb.TIMEOUT_TYPE_START_TO_CLOSE.String()}))

f = &failurepb.Failure{
FailureInfo: &failurepb.Failure_TimeoutFailureInfo{TimeoutFailureInfo: &failurepb.TimeoutFailureInfo{
TimeoutType: enumspb.TIMEOUT_TYPE_SCHEDULE_TO_START,
}},
}
a.False(isRetryable(f, nil))
a.False(isRetryable(f, []string{common.TimeoutFailureTypePrefix + enumspb.TIMEOUT_TYPE_SCHEDULE_TO_START.String()}))

f = &failurepb.Failure{
FailureInfo: &failurepb.Failure_TimeoutFailureInfo{TimeoutFailureInfo: &failurepb.TimeoutFailureInfo{
TimeoutType: enumspb.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE,
}},
}
a.False(isRetryable(f, nil))
a.False(isRetryable(f, []string{common.TimeoutFailureTypePrefix + enumspb.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE.String()}))

f = &failurepb.Failure{
FailureInfo: &failurepb.Failure_TimeoutFailureInfo{TimeoutFailureInfo: &failurepb.TimeoutFailureInfo{
TimeoutType: enumspb.TIMEOUT_TYPE_HEARTBEAT,
}},
}
a.True(isRetryable(f, nil))
a.False(isRetryable(f, []string{common.TimeoutFailureTypePrefix + enumspb.TIMEOUT_TYPE_HEARTBEAT.String()}))
a.True(isRetryable(f, []string{common.TimeoutFailureTypePrefix + enumspb.TIMEOUT_TYPE_START_TO_CLOSE.String()}))
a.True(isRetryable(f, []string{common.TimeoutFailureTypePrefix + "unknown timeout type string"}))

f = &failurepb.Failure{
FailureInfo: &failurepb.Failure_ServerFailureInfo{ServerFailureInfo: &failurepb.ServerFailureInfo{
Expand Down

0 comments on commit f6acc48

Please sign in to comment.