Skip to content

Commit

Permalink
feat: gRPC error handling (#744)
Browse files Browse the repository at this point in the history
Signed-off-by: jyu6 <juanlu_yu@intuit.com>
  • Loading branch information
jy4096 committed Jun 1, 2023
1 parent d6c3bda commit 466e380
Show file tree
Hide file tree
Showing 6 changed files with 527 additions and 27 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ require (
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe
github.com/nats-io/nats-server/v2 v2.7.5-0.20220415000625-a6b62f61a703
github.com/nats-io/nats.go v1.24.0
github.com/numaproj/numaflow-go v0.4.6-0.20230509160243-b9e82285daf3
github.com/numaproj/numaflow-go v0.4.6-0.20230525172000-ac3f0e9da8ec
github.com/prometheus/client_golang v1.12.1
github.com/prometheus/common v0.32.1
github.com/redis/go-redis/v9 v9.0.3
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -707,8 +707,8 @@ github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/numaproj/numaflow-go v0.4.6-0.20230509160243-b9e82285daf3 h1:U1irDJfatdjhOoMEZ0mskVB3bOq11Eej7T0BuRtkQ8c=
github.com/numaproj/numaflow-go v0.4.6-0.20230509160243-b9e82285daf3/go.mod h1:6hl5mLd3BFDNJ4gJhrft9vuHHPaog68iog2KkibHnbo=
github.com/numaproj/numaflow-go v0.4.6-0.20230525172000-ac3f0e9da8ec h1:wZz6QAnXAhqgiXNQ6KdK3srxJlTcLGu5v1rgqzQBDjk=
github.com/numaproj/numaflow-go v0.4.6-0.20230525172000-ac3f0e9da8ec/go.mod h1:6hl5mLd3BFDNJ4gJhrft9vuHHPaog68iog2KkibHnbo=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4=
Expand Down
63 changes: 56 additions & 7 deletions pkg/sources/transformer/grpc_transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
functionpb "github.com/numaproj/numaflow-go/pkg/apis/proto/function/v1"
functionsdk "github.com/numaproj/numaflow-go/pkg/function"
"github.com/numaproj/numaflow-go/pkg/function/client"
"github.com/numaproj/numaflow-go/pkg/function/udferr"
"k8s.io/apimachinery/pkg/util/wait"

"github.com/numaproj/numaflow/pkg/forward/applier"
"github.com/numaproj/numaflow/pkg/isb"
Expand Down Expand Up @@ -93,13 +95,60 @@ func (u *gRPCBasedTransformer) ApplyMap(ctx context.Context, readMessage *isb.Re

datumList, err := u.client.MapTFn(ctx, d)
if err != nil {
return nil, function.ApplyUDFErr{
UserUDFErr: false,
Message: fmt.Sprintf("gRPC client.MapTFn failed, %s", err),
InternalErr: function.InternalErr{
Flag: true,
MainCarDown: false,
},
udfErr, _ := udferr.FromError(err)
switch udfErr.ErrorKind() {
case udferr.Retryable:
var success bool
_ = wait.ExponentialBackoffWithContext(ctx, wait.Backoff{
// retry every "duration * factor + [0, jitter]" interval for 5 times
Duration: 1 * time.Second,
Factor: 1,
Jitter: 0.1,
Steps: 5,
}, func() (done bool, err error) {
datumList, err = u.client.MapTFn(ctx, d)
if err != nil {
udfErr, _ = udferr.FromError(err)
switch udfErr.ErrorKind() {
case udferr.Retryable:
return false, nil
case udferr.NonRetryable:
return true, nil
default:
return true, nil
}
}
success = true
return true, nil
})
if !success {
return nil, function.ApplyUDFErr{
UserUDFErr: false,
Message: fmt.Sprintf("gRPC client.MapFn failed, %s", err),
InternalErr: function.InternalErr{
Flag: true,
MainCarDown: false,
},
}
}
case udferr.NonRetryable:
return nil, function.ApplyUDFErr{
UserUDFErr: false,
Message: fmt.Sprintf("gRPC client.MapFn failed, %s", err),
InternalErr: function.InternalErr{
Flag: true,
MainCarDown: false,
},
}
default:
return nil, function.ApplyUDFErr{
UserUDFErr: false,
Message: fmt.Sprintf("gRPC client.MapFn failed, %s", err),
InternalErr: function.InternalErr{
Flag: true,
MainCarDown: false,
},
}
}
}

Expand Down
206 changes: 206 additions & 0 deletions pkg/sources/transformer/grpc_transformer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
"github.com/numaproj/numaflow-go/pkg/apis/proto/function/v1/funcmock"
"github.com/numaproj/numaflow-go/pkg/function/clienttest"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/numaproj/numaflow/pkg/isb"
"github.com/numaproj/numaflow/pkg/isb/testutils"
Expand Down Expand Up @@ -181,6 +183,210 @@ func TestGRPCBasedTransformer_BasicApplyWithMockClient(t *testing.T) {
},
})
})

t.Run("test error retryable: failed after 5 retries", func(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

mockClient := funcmock.NewMockUserDefinedFunctionClient(ctrl)
req := &functionpb.DatumRequest{
Keys: []string{"test_error_key"},
Value: []byte(`forward_message`),
EventTime: &functionpb.EventTime{EventTime: timestamppb.New(time.Unix(1661169660, 0))},
Watermark: &functionpb.Watermark{Watermark: timestamppb.New(time.Time{})},
}
mockClient.EXPECT().MapTFn(gomock.Any(), &rpcMsg{msg: req}).Return(nil, status.New(codes.DeadlineExceeded, "mock test err").Err())
mockClient.EXPECT().MapTFn(gomock.Any(), &rpcMsg{msg: req}).Return(nil, status.New(codes.DeadlineExceeded, "mock test err").Err())
mockClient.EXPECT().MapTFn(gomock.Any(), &rpcMsg{msg: req}).Return(nil, status.New(codes.DeadlineExceeded, "mock test err").Err())
mockClient.EXPECT().MapTFn(gomock.Any(), &rpcMsg{msg: req}).Return(nil, status.New(codes.DeadlineExceeded, "mock test err").Err())
mockClient.EXPECT().MapTFn(gomock.Any(), &rpcMsg{msg: req}).Return(nil, status.New(codes.DeadlineExceeded, "mock test err").Err())
mockClient.EXPECT().MapTFn(gomock.Any(), &rpcMsg{msg: req}).Return(nil, status.New(codes.DeadlineExceeded, "mock test err").Err())

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
go func() {
<-ctx.Done()
if ctx.Err() == context.DeadlineExceeded {
t.Log(t.Name(), "test timeout")
}
}()

u := NewMockGRPCBasedTransformer(mockClient)
_, err := u.ApplyMap(ctx, &isb.ReadMessage{
Message: isb.Message{
Header: isb.Header{
MessageInfo: isb.MessageInfo{
EventTime: time.Unix(1661169660, 0),
},
ID: "test_id",
Keys: []string{"test_error_key"},
},
Body: isb.Body{
Payload: []byte(`forward_message`),
},
},
ReadOffset: isb.SimpleStringOffset(func() string { return "0" }),
},
)
assert.ErrorIs(t, err, function.ApplyUDFErr{
UserUDFErr: false,
Message: fmt.Sprintf("%s", err),
InternalErr: function.InternalErr{
Flag: true,
MainCarDown: false,
},
})
})

t.Run("test error retryable: failed after 1 retry", func(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

mockClient := funcmock.NewMockUserDefinedFunctionClient(ctrl)
req := &functionpb.DatumRequest{
Keys: []string{"test_error_key"},
Value: []byte(`forward_message`),
EventTime: &functionpb.EventTime{EventTime: timestamppb.New(time.Unix(1661169660, 0))},
Watermark: &functionpb.Watermark{Watermark: timestamppb.New(time.Time{})},
}
mockClient.EXPECT().MapTFn(gomock.Any(), &rpcMsg{msg: req}).Return(nil, status.New(codes.DeadlineExceeded, "mock test err").Err())
mockClient.EXPECT().MapTFn(gomock.Any(), &rpcMsg{msg: req}).Return(nil, status.New(codes.DeadlineExceeded, "mock test err").Err())
mockClient.EXPECT().MapTFn(gomock.Any(), &rpcMsg{msg: req}).Return(nil, status.New(codes.InvalidArgument, "mock test err: non retryable").Err())
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
go func() {
<-ctx.Done()
if ctx.Err() == context.DeadlineExceeded {
t.Log(t.Name(), "test timeout")
}
}()

u := NewMockGRPCBasedTransformer(mockClient)
_, err := u.ApplyMap(ctx, &isb.ReadMessage{
Message: isb.Message{
Header: isb.Header{
MessageInfo: isb.MessageInfo{
EventTime: time.Unix(1661169660, 0),
},
ID: "test_id",
Keys: []string{"test_error_key"},
},
Body: isb.Body{
Payload: []byte(`forward_message`),
},
},
ReadOffset: isb.SimpleStringOffset(func() string { return "0" }),
},
)
assert.ErrorIs(t, err, function.ApplyUDFErr{
UserUDFErr: false,
Message: fmt.Sprintf("%s", err),
InternalErr: function.InternalErr{
Flag: true,
MainCarDown: false,
},
})
})

t.Run("test error retryable: success after 1 retry", func(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

mockClient := funcmock.NewMockUserDefinedFunctionClient(ctrl)
req := &functionpb.DatumRequest{
Keys: []string{"test_success_key"},
Value: []byte(`forward_message`),
EventTime: &functionpb.EventTime{EventTime: timestamppb.New(time.Unix(1661169720, 0))},
Watermark: &functionpb.Watermark{Watermark: timestamppb.New(time.Time{})},
}
mockClient.EXPECT().MapTFn(gomock.Any(), &rpcMsg{msg: req}).Return(nil, status.New(codes.DeadlineExceeded, "mock test err").Err())
mockClient.EXPECT().MapTFn(gomock.Any(), &rpcMsg{msg: req}).Return(&functionpb.DatumResponseList{
Elements: []*functionpb.DatumResponse{
{
Keys: []string{"test_success_key"},
Value: []byte(`forward_message`),
},
},
}, nil)

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
go func() {
<-ctx.Done()
if ctx.Err() == context.DeadlineExceeded {
t.Log(t.Name(), "test timeout")
}
}()

u := NewMockGRPCBasedTransformer(mockClient)
got, err := u.ApplyMap(ctx, &isb.ReadMessage{
Message: isb.Message{
Header: isb.Header{
MessageInfo: isb.MessageInfo{
EventTime: time.Unix(1661169720, 0),
},
ID: "test_id",
Keys: []string{"test_success_key"},
},
Body: isb.Body{
Payload: []byte(`forward_message`),
},
},
ReadOffset: isb.SimpleStringOffset(func() string { return "0" }),
},
)
assert.NoError(t, err)
assert.Equal(t, req.Keys, got[0].Keys)
assert.Equal(t, req.Value, got[0].Payload)
})

t.Run("test error non retryable", func(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

mockClient := funcmock.NewMockUserDefinedFunctionClient(ctrl)
req := &functionpb.DatumRequest{
Keys: []string{"test_error_key"},
Value: []byte(`forward_message`),
EventTime: &functionpb.EventTime{EventTime: timestamppb.New(time.Unix(1661169660, 0))},
Watermark: &functionpb.Watermark{Watermark: timestamppb.New(time.Time{})},
}
mockClient.EXPECT().MapTFn(gomock.Any(), &rpcMsg{msg: req}).Return(nil, status.New(codes.InvalidArgument, "mock test err: non retryable").Err())
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
go func() {
<-ctx.Done()
if ctx.Err() == context.DeadlineExceeded {
t.Log(t.Name(), "test timeout")
}
}()

u := NewMockGRPCBasedTransformer(mockClient)
_, err := u.ApplyMap(ctx, &isb.ReadMessage{
Message: isb.Message{
Header: isb.Header{
MessageInfo: isb.MessageInfo{
EventTime: time.Unix(1661169660, 0),
},
ID: "test_id",
Keys: []string{"test_error_key"},
},
Body: isb.Body{
Payload: []byte(`forward_message`),
},
},
ReadOffset: isb.SimpleStringOffset(func() string { return "0" }),
},
)
assert.ErrorIs(t, err, function.ApplyUDFErr{
UserUDFErr: false,
Message: fmt.Sprintf("%s", err),
InternalErr: function.InternalErr{
Flag: true,
MainCarDown: false,
},
})
})
}

func TestGRPCBasedTransformer_ApplyWithMockClient_ChangePayload(t *testing.T) {
Expand Down
Loading

0 comments on commit 466e380

Please sign in to comment.