From 73287b6ec40c52adbbd4065f67e541724685ad99 Mon Sep 17 00:00:00 2001 From: Vytautas Date: Wed, 22 Jul 2020 20:00:59 +0300 Subject: [PATCH] Fix decode issue for local activities returnin nil result (#943) (#1011) When local activity returns nil result, it is encoded into nil byte slice ([]byte). Later this is converted into string field ResultJSON within localActivityMarkerData structure. Such casting results in empty string, as this is its zero value in golang. Reverse conversion is not simetrical. Casting empty string back to byte slice results in empty byte slice. This later causes EOF error when trying to decode it back to the original result. The fix is to check whether it is non-empty string and only then assign the result back. This will be simetrical and will not cause decoding error later. This change also adds additional replay tests, that covers this scenario among many other local activity related cases. Co-authored-by: Bowei Xu --- internal/internal_event_handlers.go | 2 +- internal/internal_worker_test.go | 88 ++++++++++ internal/testdata/localActivities.json | 234 +++++++++++++++++++++++++ 3 files changed, 323 insertions(+), 1 deletion(-) create mode 100644 internal/testdata/localActivities.json diff --git a/internal/internal_event_handlers.go b/internal/internal_event_handlers.go index dc36b4720..ad73d4945 100644 --- a/internal/internal_event_handlers.go +++ b/internal/internal_event_handlers.go @@ -1104,7 +1104,7 @@ func (weh *workflowExecutionEventHandlerImpl) handleLocalActivityMarker(markerDa lar.attempt = lamd.Attempt lar.backoff = lamd.Backoff lar.err = constructError(lamd.ErrReason, []byte(lamd.ErrJSON), weh.GetDataConverter()) - } else { + } else if len(lamd.ResultJSON) > 0 { lar.result = []byte(lamd.ResultJSON) } la.callback(lar) diff --git a/internal/internal_worker_test.go b/internal/internal_worker_test.go index 468e545b6..d2e75451a 100644 --- a/internal/internal_worker_test.go +++ b/internal/internal_worker_test.go @@ -351,6 +351,15 @@ func (s *internalWorkerTestSuite) TestReplayWorkflowHistory_LocalActivity_Activi require.Error(s.T(), err) } +func (s *internalWorkerTestSuite) TestReplayWorkflowHistoryFromFileLocalActivities() { + logger := getLogger() + replayer := NewWorkflowReplayer() + wf := localActivitiesCallingOptionsWorkflow{s.T()} + replayer.RegisterWorkflow(wf.Execute) + err := replayer.ReplayWorkflowHistoryFromJSONFile(logger, "testdata/localActivities.json") + require.NoError(s.T(), err) +} + func (s *internalWorkerTestSuite) TestReplayWorkflowHistoryFromFileParent() { logger := getLogger() replayer := NewWorkflowReplayer() @@ -727,6 +736,85 @@ func (s *internalWorkerTestSuite) TestRecordActivityHeartbeatByID() { require.NotNil(s.T(), heartbeatRequest) } +type localActivitiesCallingOptionsWorkflow struct { + t *testing.T +} + +func (w localActivitiesCallingOptionsWorkflow) Execute(ctx Context, input []byte) (result []byte, err error) { + ao := LocalActivityOptions{ + ScheduleToCloseTimeout: time.Second, + } + ctx = WithLocalActivityOptions(ctx, ao) + + // By functions. + err = ExecuteLocalActivity(ctx, testActivityByteArgs, input).Get(ctx, nil) + require.NoError(w.t, err, err) + + err = ExecuteLocalActivity(ctx, testActivityMultipleArgs, 2, []string{"test"}, true).Get(ctx, nil) + require.NoError(w.t, err, err) + + err = ExecuteLocalActivity(ctx, testActivityNoResult, 2, "test").Get(ctx, nil) + require.NoError(w.t, err, err) + + err = ExecuteLocalActivity(ctx, testActivityNoContextArg, 2, "test").Get(ctx, nil) + require.NoError(w.t, err, err) + + f := ExecuteLocalActivity(ctx, testActivityReturnByteArray) + var r []byte + err = f.Get(ctx, &r) + require.NoError(w.t, err, err) + require.Equal(w.t, []byte("testActivity"), r) + + f = ExecuteLocalActivity(ctx, testActivityReturnInt) + var rInt int + err = f.Get(ctx, &rInt) + require.NoError(w.t, err, err) + require.Equal(w.t, 5, rInt) + + f = ExecuteLocalActivity(ctx, testActivityReturnString) + var rString string + err = f.Get(ctx, &rString) + + require.NoError(w.t, err, err) + require.Equal(w.t, "testActivity", rString) + + f = ExecuteLocalActivity(ctx, testActivityReturnEmptyString) + var r2String string + err = f.Get(ctx, &r2String) + require.NoError(w.t, err, err) + require.Equal(w.t, "", r2String) + + f = ExecuteLocalActivity(ctx, testActivityReturnEmptyStruct) + var r2Struct testActivityResult + err = f.Get(ctx, &r2Struct) + require.NoError(w.t, err, err) + require.Equal(w.t, testActivityResult{}, r2Struct) + + f = ExecuteLocalActivity(ctx, testActivityReturnNilStructPtr) + var rStructPtr *testActivityResult + err = f.Get(ctx, &rStructPtr) + require.NoError(w.t, err, err) + require.True(w.t, rStructPtr == nil) + + f = ExecuteLocalActivity(ctx, testActivityReturnStructPtr) + err = f.Get(ctx, &rStructPtr) + require.NoError(w.t, err, err) + require.Equal(w.t, *rStructPtr, testActivityResult{Index: 10}) + + f = ExecuteLocalActivity(ctx, testActivityReturnNilStructPtrPtr) + var rStruct2Ptr **testActivityResult + err = f.Get(ctx, &rStruct2Ptr) + require.NoError(w.t, err, err) + require.True(w.t, rStruct2Ptr == nil) + + f = ExecuteLocalActivity(ctx, testActivityReturnStructPtrPtr) + err = f.Get(ctx, &rStruct2Ptr) + require.NoError(w.t, err, err) + require.True(w.t, **rStruct2Ptr == testActivityResult{Index: 10}) + + return []byte("Done"), nil +} + type activitiesCallingOptionsWorkflow struct { t *testing.T } diff --git a/internal/testdata/localActivities.json b/internal/testdata/localActivities.json new file mode 100644 index 000000000..f835cefd7 --- /dev/null +++ b/internal/testdata/localActivities.json @@ -0,0 +1,234 @@ +[ + { + "eventId": 1, + "timestamp": 1595342431739033300, + "eventType": "WorkflowExecutionStarted", + "version": -24, + "taskId": 1049501, + "workflowExecutionStartedEventAttributes": { + "workflowType": { + "name": "go.uber.org/cadence/internal.localActivitiesCallingOptionsWorkflow.Execute" + }, + "taskList": { + "name": "tl-1" + }, + "executionStartToCloseTimeoutSeconds": 15, + "taskStartToCloseTimeoutSeconds": 1, + "originalExecutionRunId": "dd67af3b-85d0-4217-83e5-a2fc66172afc", + "identity": "57330@vytautas-C02X55E1JGH6@", + "firstExecutionRunId": "dd67af3b-85d0-4217-83e5-a2fc66172afc", + "attempt": 0, + "cronSchedule": "", + "firstDecisionTaskBackoffSeconds": 0, + "header": { + "fields": {} + } + } + }, + { + "eventId": 2, + "timestamp": 1595342431739132600, + "eventType": "DecisionTaskScheduled", + "version": -24, + "taskId": 1049502, + "decisionTaskScheduledEventAttributes": { + "taskList": { + "name": "tl-1" + }, + "startToCloseTimeoutSeconds": 1, + "attempt": 0 + } + }, + { + "eventId": 3, + "timestamp": 1595342431763649100, + "eventType": "DecisionTaskStarted", + "version": -24, + "taskId": 1049510, + "decisionTaskStartedEventAttributes": { + "scheduledEventId": 2, + "identity": "57330@vytautas-C02X55E1JGH6@tl-1", + "requestId": "bf3ec0d2-f940-460b-82e2-7da51fc73c79" + } + }, + { + "eventId": 4, + "timestamp": 1595342431780575200, + "eventType": "DecisionTaskCompleted", + "version": -24, + "taskId": 1049513, + "decisionTaskCompletedEventAttributes": { + "scheduledEventId": 2, + "startedEventId": 3, + "identity": "57330@vytautas-C02X55E1JGH6@tl-1", + "binaryChecksum": "dc561051f5fc11e373587ce8e2a8dd93" + } + }, + { + "eventId": 5, + "timestamp": 1595342431780709600, + "eventType": "MarkerRecorded", + "version": -24, + "taskId": 1049514, + "markerRecordedEventAttributes": { + "markerName": "LocalActivity", + "details": "eyJhY3Rpdml0eUlkIjoiMCIsImFjdGl2aXR5VHlwZSI6ImdvLnViZXIub3JnL2NhZGVuY2UvdGVzdC50ZXN0QWN0aXZpdHlCeXRlQXJncyIsInJlcGxheVRpbWUiOiIyMDIwLTA3LTIxVDE3OjQwOjMxLjc2Mzg1MDM2NSswMzowMCJ9Cg==", + "decisionTaskCompletedEventId": 4 + } + }, + { + "eventId": 6, + "timestamp": 1595342431780729200, + "eventType": "MarkerRecorded", + "version": -24, + "taskId": 1049515, + "markerRecordedEventAttributes": { + "markerName": "LocalActivity", + "details": "eyJhY3Rpdml0eUlkIjoiMSIsImFjdGl2aXR5VHlwZSI6ImdvLnViZXIub3JnL2NhZGVuY2UvdGVzdC50ZXN0QWN0aXZpdHlNdWx0aXBsZUFyZ3MiLCJyZXBsYXlUaW1lIjoiMjAyMC0wNy0yMVQxNzo0MDozMS43NjM5Mjc4OTErMDM6MDAifQo=", + "decisionTaskCompletedEventId": 4 + } + }, + { + "eventId": 7, + "timestamp": 1595342431780748400, + "eventType": "MarkerRecorded", + "version": -24, + "taskId": 1049516, + "markerRecordedEventAttributes": { + "markerName": "LocalActivity", + "details": "eyJhY3Rpdml0eUlkIjoiMiIsImFjdGl2aXR5VHlwZSI6ImdvLnViZXIub3JnL2NhZGVuY2UvdGVzdC50ZXN0QWN0aXZpdHlOb1Jlc3VsdCIsInJlcGxheVRpbWUiOiIyMDIwLTA3LTIxVDE3OjQwOjMxLjc2Mzk5NjQ2MyswMzowMCJ9Cg==", + "decisionTaskCompletedEventId": 4 + } + }, + { + "eventId": 8, + "timestamp": 1595342431780770400, + "eventType": "MarkerRecorded", + "version": -24, + "taskId": 1049517, + "markerRecordedEventAttributes": { + "markerName": "LocalActivity", + "details": "eyJhY3Rpdml0eUlkIjoiMyIsImFjdGl2aXR5VHlwZSI6ImdvLnViZXIub3JnL2NhZGVuY2UvdGVzdC50ZXN0QWN0aXZpdHlOb0NvbnRleHRBcmciLCJyZXBsYXlUaW1lIjoiMjAyMC0wNy0yMVQxNzo0MDozMS43NjQwNzk4NDYrMDM6MDAifQo=", + "decisionTaskCompletedEventId": 4 + } + }, + { + "eventId": 9, + "timestamp": 1595342431780787500, + "eventType": "MarkerRecorded", + "version": -24, + "taskId": 1049518, + "markerRecordedEventAttributes": { + "markerName": "LocalActivity", + "details": "eyJhY3Rpdml0eUlkIjoiNCIsImFjdGl2aXR5VHlwZSI6ImdvLnViZXIub3JnL2NhZGVuY2UvdGVzdC50ZXN0QWN0aXZpdHlSZXR1cm5CeXRlQXJyYXkiLCJyZXN1bHRKc29uIjoidGVzdEFjdGl2aXR5IiwicmVwbGF5VGltZSI6IjIwMjAtMDctMjFUMTc6NDA6MzEuNzY0MTM0NzEyKzAzOjAwIn0K", + "decisionTaskCompletedEventId": 4 + } + }, + { + "eventId": 10, + "timestamp": 1595342431780807500, + "eventType": "MarkerRecorded", + "version": -24, + "taskId": 1049519, + "markerRecordedEventAttributes": { + "markerName": "LocalActivity", + "details": "eyJhY3Rpdml0eUlkIjoiNSIsImFjdGl2aXR5VHlwZSI6ImdvLnViZXIub3JnL2NhZGVuY2UvdGVzdC50ZXN0QWN0aXZpdHlSZXR1cm5JbnQiLCJyZXN1bHRKc29uIjoiNVxuIiwicmVwbGF5VGltZSI6IjIwMjAtMDctMjFUMTc6NDA6MzEuNzY0MjAyODU3KzAzOjAwIn0K", + "decisionTaskCompletedEventId": 4 + } + }, + { + "eventId": 11, + "timestamp": 1595342431780827000, + "eventType": "MarkerRecorded", + "version": -24, + "taskId": 1049520, + "markerRecordedEventAttributes": { + "markerName": "LocalActivity", + "details": "eyJhY3Rpdml0eUlkIjoiNiIsImFjdGl2aXR5VHlwZSI6ImdvLnViZXIub3JnL2NhZGVuY2UvdGVzdC50ZXN0QWN0aXZpdHlSZXR1cm5TdHJpbmciLCJyZXN1bHRKc29uIjoiXCJ0ZXN0QWN0aXZpdHlcIlxuIiwicmVwbGF5VGltZSI6IjIwMjAtMDctMjFUMTc6NDA6MzEuNzY0MjUxMTI2KzAzOjAwIn0K", + "decisionTaskCompletedEventId": 4 + } + }, + { + "eventId": 12, + "timestamp": 1595342431780845100, + "eventType": "MarkerRecorded", + "version": -24, + "taskId": 1049521, + "markerRecordedEventAttributes": { + "markerName": "LocalActivity", + "details": "eyJhY3Rpdml0eUlkIjoiNyIsImFjdGl2aXR5VHlwZSI6ImdvLnViZXIub3JnL2NhZGVuY2UvdGVzdC50ZXN0QWN0aXZpdHlSZXR1cm5FbXB0eVN0cmluZyIsInJlc3VsdEpzb24iOiJcIlwiXG4iLCJyZXBsYXlUaW1lIjoiMjAyMC0wNy0yMVQxNzo0MDozMS43NjQyOTc4ODcrMDM6MDAifQo=", + "decisionTaskCompletedEventId": 4 + } + }, + { + "eventId": 13, + "timestamp": 1595342431780861100, + "eventType": "MarkerRecorded", + "version": -24, + "taskId": 1049522, + "markerRecordedEventAttributes": { + "markerName": "LocalActivity", + "details": "eyJhY3Rpdml0eUlkIjoiOCIsImFjdGl2aXR5VHlwZSI6ImdvLnViZXIub3JnL2NhZGVuY2UvdGVzdC50ZXN0QWN0aXZpdHlSZXR1cm5FbXB0eVN0cnVjdCIsInJlc3VsdEpzb24iOiJ7XCJJbmRleFwiOjB9XG4iLCJyZXBsYXlUaW1lIjoiMjAyMC0wNy0yMVQxNzo0MDozMS43NjQzNzE5MzIrMDM6MDAifQo=", + "decisionTaskCompletedEventId": 4 + } + }, + { + "eventId": 14, + "timestamp": 1595342431780881000, + "eventType": "MarkerRecorded", + "version": -24, + "taskId": 1049523, + "markerRecordedEventAttributes": { + "markerName": "LocalActivity", + "details": "eyJhY3Rpdml0eUlkIjoiOSIsImFjdGl2aXR5VHlwZSI6ImdvLnViZXIub3JnL2NhZGVuY2UvdGVzdC50ZXN0QWN0aXZpdHlSZXR1cm5OaWxTdHJ1Y3RQdHIiLCJyZXBsYXlUaW1lIjoiMjAyMC0wNy0yMVQxNzo0MDozMS43NjQ0NDIzNDMrMDM6MDAifQo=", + "decisionTaskCompletedEventId": 4 + } + }, + { + "eventId": 15, + "timestamp": 1595342431780901700, + "eventType": "MarkerRecorded", + "version": -24, + "taskId": 1049524, + "markerRecordedEventAttributes": { + "markerName": "LocalActivity", + "details": "eyJhY3Rpdml0eUlkIjoiMTAiLCJhY3Rpdml0eVR5cGUiOiJnby51YmVyLm9yZy9jYWRlbmNlL3Rlc3QudGVzdEFjdGl2aXR5UmV0dXJuU3RydWN0UHRyIiwicmVzdWx0SnNvbiI6IntcIkluZGV4XCI6MTB9XG4iLCJyZXBsYXlUaW1lIjoiMjAyMC0wNy0yMVQxNzo0MDozMS43NjQ0ODYzOSswMzowMCJ9Cg==", + "decisionTaskCompletedEventId": 4 + } + }, + { + "eventId": 16, + "timestamp": 1595342431780918200, + "eventType": "MarkerRecorded", + "version": -24, + "taskId": 1049525, + "markerRecordedEventAttributes": { + "markerName": "LocalActivity", + "details": "eyJhY3Rpdml0eUlkIjoiMTEiLCJhY3Rpdml0eVR5cGUiOiJnby51YmVyLm9yZy9jYWRlbmNlL3Rlc3QudGVzdEFjdGl2aXR5UmV0dXJuTmlsU3RydWN0UHRyUHRyIiwicmVwbGF5VGltZSI6IjIwMjAtMDctMjFUMTc6NDA6MzEuNzY0NTM2MTIzKzAzOjAwIn0K", + "decisionTaskCompletedEventId": 4 + } + }, + { + "eventId": 17, + "timestamp": 1595342431780930300, + "eventType": "MarkerRecorded", + "version": -24, + "taskId": 1049526, + "markerRecordedEventAttributes": { + "markerName": "LocalActivity", + "details": "eyJhY3Rpdml0eUlkIjoiMTIiLCJhY3Rpdml0eVR5cGUiOiJnby51YmVyLm9yZy9jYWRlbmNlL3Rlc3QudGVzdEFjdGl2aXR5UmV0dXJuU3RydWN0UHRyUHRyIiwicmVzdWx0SnNvbiI6IntcIkluZGV4XCI6MTB9XG4iLCJyZXBsYXlUaW1lIjoiMjAyMC0wNy0yMVQxNzo0MDozMS43NjQ2MzY3NjgrMDM6MDAifQo=", + "decisionTaskCompletedEventId": 4 + } + }, + { + "eventId": 18, + "timestamp": 1595342431780949100, + "eventType": "WorkflowExecutionCompleted", + "version": -24, + "taskId": 1049527, + "workflowExecutionCompletedEventAttributes": { + "result": "RG9uZQ==", + "decisionTaskCompletedEventId": 4 + } + } +] \ No newline at end of file