Commit a2ac97f

Changes to gRPC message too large error handling (#2042)
* Changed gRPC message too large error handling to wrap the error in a new type instead of using a non-standard cause enum
* Improved TestGrpcMessageTooLarge to not rely on timeouts
1 parent b2d1477 commit a2ac97f

File tree

5 files changed: +57, -68 lines

internal/common/retry/interceptor.go

Lines changed: 35 additions & 35 deletions

@@ -9,8 +9,6 @@ import (

     grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/retry"
     "github.com/grpc-ecosystem/go-grpc-middleware/v2/util/backoffutils"
-    enumspb "go.temporal.io/api/enums/v1"
-    "go.temporal.io/api/errordetails/v1"
     "google.golang.org/grpc"
     "google.golang.org/grpc/codes"
     "google.golang.org/grpc/status"

@@ -45,6 +43,11 @@ type (
         maximumAttempts int
     }

+    GrpcMessageTooLargeError struct {
+        err    error
+        status *status.Status
+    }
+
     contextKey struct{}
 )

@@ -94,15 +97,15 @@ func NewGrpcRetryConfig(initialInterval time.Duration) *GrpcRetryConfig {
 var (
     // ConfigKey context key for GrpcRetryConfig
     ConfigKey = contextKey{}
-    // gRPC response codes that represent unconditionally retryable errors.
+    // gRPC response codes that represent retryable errors.
     // The following status codes are never retried by the library:
     // INVALID_ARGUMENT, NOT_FOUND, ALREADY_EXISTS, FAILED_PRECONDITION, ABORTED, OUT_OF_RANGE, DATA_LOSS
     // codes.DeadlineExceeded and codes.Canceled are not here (and shouldn't be here!)
     // because they are coming from go context and "context errors are not retriable based on user settings"
     // by gRPC library.
-    // codes.Internal and codes.ResourceExhausted have special logic for whether they are retryable or not,
-    // and so they're not included in this list.
-    alwaysRetryableCodes = []codes.Code{codes.Aborted, codes.Unavailable, codes.Unknown}
+    // codes.ResourceExhausted is non-retryable if it comes from GrpcMessageTooLargeError, but otherwise is retryable.
+    // codes.Internal is not included because it's retryable or non-retryable depending on server capabilities.
+    retryableCodesWithoutInternal = []codes.Code{codes.Aborted, codes.ResourceExhausted, codes.Unavailable, codes.Unknown}
 )

 // NewRetryOptionsInterceptor creates a new gRPC interceptor that populates retry options for each call based on values

@@ -132,7 +135,7 @@ func NewRetryOptionsInterceptor(excludeInternal *atomic.Bool) grpc.UnaryClientIn
             opts = append(opts, grpc_retry.WithMax(math.MaxUint32))
         }
         opts = append(opts, grpc_retry.WithRetriable(func(err error) bool {
-            return IsRetryable(status.Convert(err), excludeInternal)
+            return IsRetryable(err, excludeInternal)
         }))
     } else {
         // Do not retry if retry config is not set.

@@ -142,51 +145,48 @@
     }
 }

-func IsRetryable(status *status.Status, excludeInternalFromRetry *atomic.Bool) bool {
-    if status == nil {
+func IsRetryable(err error, excludeInternalFromRetry *atomic.Bool) bool {
+    if _, ok := err.(*GrpcMessageTooLargeError); ok {
+        return false
+    }
+    grpcStatus := status.Convert(err)
+    if grpcStatus == nil {
         return false
     }
-    errCode := status.Code()
-    for _, retryable := range alwaysRetryableCodes {
+    errCode := grpcStatus.Code()
+    for _, retryable := range retryableCodesWithoutInternal {
         if errCode == retryable {
             return true
         }
     }
     if errCode == codes.Internal {
         return !excludeInternalFromRetry.Load()
     }
-    if errCode == codes.ResourceExhausted {
-        if details := status.Details(); len(details) > 0 {
-            if failure, ok := details[0].(*errordetails.ResourceExhaustedFailure); ok {
-                return failure.Cause != RESOURCE_EXHAUSTED_CAUSE_EXT_GRPC_MESSAGE_TOO_LARGE
-            }
-        }
-    }
     return false
 }

-// RESOURCE_EXHAUSTED_CAUSE_EXT_GRPC_MESSAGE_TOO_LARGE is an extension to the ResourceExhaustedCause enum to mark gRPC message too large errors.
-const RESOURCE_EXHAUSTED_CAUSE_EXT_GRPC_MESSAGE_TOO_LARGE enumspb.ResourceExhaustedCause = 101 // TODO: add the cause to the upstream API repo and remove this (see https://github.com/temporalio/sdk-go/issues/2030)
-
-// SetGrpcMessageTooLargeErrorCauseInterceptor adds appropriate error details if the error cause is gRPC message being too large.
-func SetGrpcMessageTooLargeErrorCauseInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
+// GrpcMessageTooLargeErrorInterceptor checks if the error is caused by gRPC message being too large and converts it into GrpcMessageTooLargeError.
+func GrpcMessageTooLargeErrorInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
     err := invoker(ctx, method, req, reply, cc, opts...)
-    if grpcStatus := status.Convert(err); isGrpcMessageTooLargeError(grpcStatus) {
-        // Copying code and message but ignoring original details
-        newStatus := status.New(grpcStatus.Code(), grpcStatus.Message())
-        newStatus, detailsErr := newStatus.WithDetails(&errordetails.ResourceExhaustedFailure{
-            Cause: RESOURCE_EXHAUSTED_CAUSE_EXT_GRPC_MESSAGE_TOO_LARGE,
-        })
-        if detailsErr == nil {
-            if newErr := newStatus.Err(); newErr != nil {
-                err = newErr
-            }
-        }
+    if grpcStatus := status.Convert(err); isGrpcMessageTooLargeStatus(grpcStatus) {
+        err = &GrpcMessageTooLargeError{err: err, status: grpcStatus}
     }
     return err
 }

-func isGrpcMessageTooLargeError(status *status.Status) bool {
+func (e *GrpcMessageTooLargeError) Error() string {
+    return e.err.Error()
+}
+
+func (e *GrpcMessageTooLargeError) Unwrap() error {
+    return e.err
+}
+
+func (e *GrpcMessageTooLargeError) GRPCStatus() *status.Status {
+    return e.status
+}
+
+func isGrpcMessageTooLargeStatus(status *status.Status) bool {
     if status == nil {
         return false
     }

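The key property of the new wrapper is that it still looks like an ordinary gRPC status error to status.Convert and status.FromError (via its GRPCStatus() method) while being detectable by type. Below is a minimal, self-contained sketch of that pattern, not the SDK code itself: it uses a local stand-in type because the real GrpcMessageTooLargeError lives in the SDK's internal retry package and keeps its fields unexported.

    package main

    import (
        "errors"
        "fmt"

        "google.golang.org/grpc/codes"
        "google.golang.org/grpc/status"
    )

    // messageTooLargeError is a hypothetical stand-in that mirrors the shape of the
    // SDK's GrpcMessageTooLargeError: it keeps the original error and its gRPC status.
    type messageTooLargeError struct {
        err    error
        status *status.Status
    }

    func (e *messageTooLargeError) Error() string              { return e.err.Error() }
    func (e *messageTooLargeError) Unwrap() error              { return e.err }
    func (e *messageTooLargeError) GRPCStatus() *status.Status { return e.status }

    func main() {
        // Simulate the transport rejecting an oversized message.
        orig := status.Error(codes.ResourceExhausted, "grpc: received message larger than max")
        var wrapped error = &messageTooLargeError{err: orig, status: status.Convert(orig)}

        // status.Convert still reports ResourceExhausted because the wrapper
        // exposes GRPCStatus(), so existing status-based handling keeps working.
        fmt.Println(status.Convert(wrapped).Code())

        // Retry logic can single out the "message too large" case with a plain type
        // check instead of comparing against a non-standard ResourceExhaustedCause value.
        var tooLarge *messageTooLargeError
        fmt.Println(errors.As(wrapped, &tooLarge))
    }

Unwrap() additionally lets errors.As see through further wrapping, which the dialer's errorInterceptor and the task poller below depend on.
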
internal/grpc_dialer.go

Lines changed: 6 additions & 2 deletions

@@ -2,6 +2,7 @@ package internal

 import (
     "context"
+    "errors"
     "sync/atomic"
     "time"


@@ -139,7 +140,7 @@ func requiredInterceptors(
         // Performs retries *IF* retry options are set for the call.
         grpc_retry.UnaryClientInterceptor(),
         // Prevents retrying grpc message too large errors, while allowing retries of other resource exhausted errors.
-        retry.SetGrpcMessageTooLargeErrorCauseInterceptor,
+        retry.GrpcMessageTooLargeErrorInterceptor,
         // Report metrics for every call made to the server.
         metrics.NewGRPCInterceptor(clientOptions.MetricsHandler, attemptSuffix, clientOptions.DisableErrorCodeMetricTags),
     }

@@ -199,6 +200,9 @@ func headersProviderInterceptor(headersProvider HeadersProvider) grpc.UnaryClien

 func errorInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
     err := invoker(ctx, method, req, reply, cc, opts...)
-    err = serviceerror.FromStatus(status.Convert(err))
+    var grpcMessageTooLargeErr *retry.GrpcMessageTooLargeError
+    if !errors.As(err, &grpcMessageTooLargeErr) {
+        err = serviceerror.FromStatus(status.Convert(err))
+    }
     return err
 }

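Interceptor ordering matters here: the retry interceptor is listed before GrpcMessageTooLargeErrorInterceptor, and with the usual chained-interceptor semantics (the first interceptor listed to grpc.WithChainUnaryInterceptor is the outermost) the retry layer's predicate sees the already-converted error on each attempt. The following is a rough, stand-alone sketch of that wiring under those assumptions, using hypothetical stand-in interceptors and a fake invoker instead of a real connection.

    package main

    import (
        "context"
        "errors"
        "fmt"

        "google.golang.org/grpc"
        "google.golang.org/grpc/codes"
        "google.golang.org/grpc/status"
    )

    // tooLargeError is a hypothetical stand-in for retry.GrpcMessageTooLargeError.
    type tooLargeError struct{ err error }

    func (e *tooLargeError) Error() string { return e.err.Error() }
    func (e *tooLargeError) Unwrap() error { return e.err }

    // convert plays the role of GrpcMessageTooLargeErrorInterceptor: it wraps the
    // raw transport error in the dedicated type.
    func convert(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
        err := invoker(ctx, method, req, reply, cc, opts...)
        if status.Code(err) == codes.ResourceExhausted {
            err = &tooLargeError{err: err}
        }
        return err
    }

    // retryLayer stands in for the retry interceptor: listed earlier (outer) in the
    // chain, it only sees the error returned by the interceptors below it, so its
    // retry predicate can refuse to retry the converted error.
    func retryLayer(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
        err := invoker(ctx, method, req, reply, cc, opts...)
        var tooLarge *tooLargeError
        fmt.Println("retryable:", !errors.As(err, &tooLarge))
        return err
    }

    func main() {
        // Fake terminal invoker: the transport rejects an oversized message.
        send := func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, opts ...grpc.CallOption) error {
            return status.Error(codes.ResourceExhausted, "grpc: trying to send message larger than max")
        }
        // Hand-rolled chain equivalent to listing retryLayer before convert
        // (first listed = outermost).
        inner := func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, opts ...grpc.CallOption) error {
            return convert(ctx, method, req, reply, cc, send, opts...)
        }
        _ = retryLayer(context.Background(), "/svc/Method", nil, nil, nil, inner)
    }
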
internal/internal_task_handlers.go

Lines changed: 1 addition & 3 deletions

@@ -24,7 +24,6 @@ import (
     "go.temporal.io/api/serviceerror"
     taskqueuepb "go.temporal.io/api/taskqueue/v1"
     "go.temporal.io/api/workflowservice/v1"
-    "google.golang.org/grpc/status"
     "google.golang.org/protobuf/types/known/durationpb"

     "go.temporal.io/sdk/internal/common/retry"

@@ -2164,8 +2163,7 @@ func (i *temporalInvoker) internalHeartBeat(ctx context.Context, details *common
     // Transient errors are getting retried for the duration of the heartbeat timeout.
     // The fact that error has been returned means that activity should now be timed out, hence we should
     // propagate cancellation to the handler.
-    statusErr, _ := status.FromError(err)
-    if retry.IsRetryable(statusErr, i.excludeInternalFromRetry) {
+    if retry.IsRetryable(err, i.excludeInternalFromRetry) {
         i.cancelHandler(err)
         isActivityCanceled = true
     }

internal/internal_task_pollers.go

Lines changed: 2 additions & 6 deletions

@@ -544,7 +544,8 @@ func (wtp *workflowTaskProcessor) RespondTaskCompletedWithMetrics(

     response, err = wtp.sendTaskCompletedRequest(completedRequest, task)

-    if isGrpcMessageTooLargeError(err) {
+    var grpcMessageTooLargeErr *retry.GrpcMessageTooLargeError
+    if errors.As(err, &grpcMessageTooLargeErr) {
         secondEmitFailMetric, secondErr := wtp.reportGrpcMessageTooLarge(completedRequest, task, err)
         if secondEmitFailMetric {
             emitFailMetric = true

@@ -1537,8 +1538,3 @@ func (nt *nexusTask) scaleDecision() (pollerScaleDecision, bool) {
         pollRequestDeltaSuggestion: int(nt.task.PollerScalingDecision.PollRequestDeltaSuggestion),
     }, true
 }
-
-func isGrpcMessageTooLargeError(err error) bool {
-    serviceErr, ok := err.(*serviceerror.ResourceExhausted)
-    return ok && serviceErr.Cause == retry.RESOURCE_EXHAUSTED_CAUSE_EXT_GRPC_MESSAGE_TOO_LARGE
-}

test/integration_test.go

Lines changed: 13 additions & 22 deletions

@@ -7751,23 +7751,17 @@ func (ts *IntegrationTestSuite) TestLocalActivitySummary() {
 }

 func (ts *IntegrationTestSuite) TestGrpcMessageTooLarge() {
-    ts.T().Skip("issue-2033: Test is flaky")
-    assertGrpcErrorInHistoryOnce := func(ctx context.Context, run client.WorkflowRun) {
-        found := false
+    assertGrpcErrorInHistory := func(ctx context.Context, run client.WorkflowRun) {
         iter := ts.client.GetWorkflowHistory(ctx, run.GetID(), run.GetRunID(), true, enumspb.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT)
         for iter.HasNext() {
             event, err := iter.Next()
             ts.NoError(err)
             if event.EventType == enumspb.EVENT_TYPE_WORKFLOW_TASK_FAILED {
-                if found {
-                    ts.Fail("Found more than 1 workflow task failed event in history")
-                } else {
-                    found = true
-                }
                 ts.Equal(enumspb.WORKFLOW_TASK_FAILED_CAUSE_GRPC_MESSAGE_TOO_LARGE, event.GetWorkflowTaskFailedEventAttributes().Cause)
+                return
             }
         }
-        ts.True(found, "Workflow task failed event not found in history")
+        ts.Fail("Workflow task failed event not found in history")
     }

     veryLargeData := strings.Repeat("Very Large Data ", 500_000) // circa 8MB, double the default 4MB limit

@@ -7801,25 +7795,22 @@
     ts.worker.RegisterWorkflow(failureInQueryTaskWorkflowFn)
     ts.worker.RegisterActivity(activityFn)
     startOptions := client.StartWorkflowOptions{
-        TaskQueue:           ts.taskQueueName,
-        WorkflowTaskTimeout: 300 * time.Millisecond,
-        WorkflowRunTimeout:  1 * time.Second,
+        TaskQueue: ts.taskQueueName,
     }

-    ts.Run("Activity start too large", func() {
-        run, err := ts.client.ExecuteWorkflow(ctx, startOptions, failureInWorkflowTaskWorkflowFn, true)
+    testFailureInWorkflowTask := func(success bool) {
+        run, err := ts.client.ExecuteWorkflow(ctx, startOptions, failureInWorkflowTaskWorkflowFn, success)
         ts.NoError(err)
-        err = run.Get(ctx, nil)
-        ts.Error(err)
-        assertGrpcErrorInHistoryOnce(ctx, run)
+        assertGrpcErrorInHistory(ctx, run)
+        ts.NoError(ts.client.TerminateWorkflow(ctx, run.GetID(), run.GetRunID(), "Test completed"))
+    }
+
+    ts.Run("Activity start too large", func() {
+        testFailureInWorkflowTask(true)
     })

     ts.Run("Workflow failure too large", func() {
-        run, err := ts.client.ExecuteWorkflow(ctx, startOptions, failureInWorkflowTaskWorkflowFn, true)
-        ts.NoError(err)
-        err = run.Get(ctx, nil)
-        ts.Error(err)
-        assertGrpcErrorInHistoryOnce(ctx, run)
+        testFailureInWorkflowTask(false)
     })

     // successful query case is tested by TestLargeQueryResultError

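The history check the test performs can also be written against the public client API outside the test suite. This is a sketch under assumed placeholder inputs (workflow ID, an empty run ID to mean the latest run, and a locally reachable Temporal server); the event type and failure cause constants are the same ones the test above uses.

    package main

    import (
        "context"
        "fmt"
        "log"

        enumspb "go.temporal.io/api/enums/v1"
        "go.temporal.io/sdk/client"
    )

    // hasMessageTooLargeFailure walks a workflow's history and reports whether any
    // workflow task failed with the GRPC_MESSAGE_TOO_LARGE cause, mirroring the
    // integration test's assertion.
    func hasMessageTooLargeFailure(ctx context.Context, c client.Client, workflowID, runID string) (bool, error) {
        iter := c.GetWorkflowHistory(ctx, workflowID, runID, false, enumspb.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT)
        for iter.HasNext() {
            event, err := iter.Next()
            if err != nil {
                return false, err
            }
            if event.EventType == enumspb.EVENT_TYPE_WORKFLOW_TASK_FAILED &&
                event.GetWorkflowTaskFailedEventAttributes().Cause == enumspb.WORKFLOW_TASK_FAILED_CAUSE_GRPC_MESSAGE_TOO_LARGE {
                return true, nil
            }
        }
        return false, nil
    }

    func main() {
        c, err := client.Dial(client.Options{}) // assumes a Temporal server on the default local address
        if err != nil {
            log.Fatal(err)
        }
        defer c.Close()

        // "my-workflow-id" is a placeholder; an empty run ID selects the latest run.
        found, err := hasMessageTooLargeFailure(context.Background(), c, "my-workflow-id", "")
        if err != nil {
            log.Fatal(err)
        }
        fmt.Println("gRPC message too large failure found:", found)
    }
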