Skip to content

Commit

Permalink
Fix decode issue for local activities returnin nil result (#943) (#1011)
Browse files Browse the repository at this point in the history
When local activity returns nil result, it is encoded into nil byte
slice ([]byte). Later this is converted into string field ResultJSON
within localActivityMarkerData structure. Such casting results in
empty string, as this is its zero value in golang.

Reverse conversion is not simetrical. Casting empty string back to
byte slice results in empty byte slice. This later causes EOF error
when trying to decode it back to the original result.

The fix is to check whether it is non-empty string and only then
assign the result back. This will be simetrical and will not cause
decoding error later.

This change also adds additional replay tests, that covers this
scenario among many other local activity related cases.

Co-authored-by: Bowei Xu <boweixu@uber.com>
  • Loading branch information
vytautas-karpavicius and vancexu committed Jul 22, 2020
1 parent ba95013 commit 73287b6
Show file tree
Hide file tree
Showing 3 changed files with 323 additions and 1 deletion.
2 changes: 1 addition & 1 deletion internal/internal_event_handlers.go
Expand Up @@ -1104,7 +1104,7 @@ func (weh *workflowExecutionEventHandlerImpl) handleLocalActivityMarker(markerDa
lar.attempt = lamd.Attempt
lar.backoff = lamd.Backoff
lar.err = constructError(lamd.ErrReason, []byte(lamd.ErrJSON), weh.GetDataConverter())
} else {
} else if len(lamd.ResultJSON) > 0 {
lar.result = []byte(lamd.ResultJSON)
}
la.callback(lar)
Expand Down
88 changes: 88 additions & 0 deletions internal/internal_worker_test.go
Expand Up @@ -351,6 +351,15 @@ func (s *internalWorkerTestSuite) TestReplayWorkflowHistory_LocalActivity_Activi
require.Error(s.T(), err)
}

func (s *internalWorkerTestSuite) TestReplayWorkflowHistoryFromFileLocalActivities() {
logger := getLogger()
replayer := NewWorkflowReplayer()
wf := localActivitiesCallingOptionsWorkflow{s.T()}
replayer.RegisterWorkflow(wf.Execute)
err := replayer.ReplayWorkflowHistoryFromJSONFile(logger, "testdata/localActivities.json")
require.NoError(s.T(), err)
}

func (s *internalWorkerTestSuite) TestReplayWorkflowHistoryFromFileParent() {
logger := getLogger()
replayer := NewWorkflowReplayer()
Expand Down Expand Up @@ -727,6 +736,85 @@ func (s *internalWorkerTestSuite) TestRecordActivityHeartbeatByID() {
require.NotNil(s.T(), heartbeatRequest)
}

type localActivitiesCallingOptionsWorkflow struct {
t *testing.T
}

func (w localActivitiesCallingOptionsWorkflow) Execute(ctx Context, input []byte) (result []byte, err error) {
ao := LocalActivityOptions{
ScheduleToCloseTimeout: time.Second,
}
ctx = WithLocalActivityOptions(ctx, ao)

// By functions.
err = ExecuteLocalActivity(ctx, testActivityByteArgs, input).Get(ctx, nil)
require.NoError(w.t, err, err)

err = ExecuteLocalActivity(ctx, testActivityMultipleArgs, 2, []string{"test"}, true).Get(ctx, nil)
require.NoError(w.t, err, err)

err = ExecuteLocalActivity(ctx, testActivityNoResult, 2, "test").Get(ctx, nil)
require.NoError(w.t, err, err)

err = ExecuteLocalActivity(ctx, testActivityNoContextArg, 2, "test").Get(ctx, nil)
require.NoError(w.t, err, err)

f := ExecuteLocalActivity(ctx, testActivityReturnByteArray)
var r []byte
err = f.Get(ctx, &r)
require.NoError(w.t, err, err)
require.Equal(w.t, []byte("testActivity"), r)

f = ExecuteLocalActivity(ctx, testActivityReturnInt)
var rInt int
err = f.Get(ctx, &rInt)
require.NoError(w.t, err, err)
require.Equal(w.t, 5, rInt)

f = ExecuteLocalActivity(ctx, testActivityReturnString)
var rString string
err = f.Get(ctx, &rString)

require.NoError(w.t, err, err)
require.Equal(w.t, "testActivity", rString)

f = ExecuteLocalActivity(ctx, testActivityReturnEmptyString)
var r2String string
err = f.Get(ctx, &r2String)
require.NoError(w.t, err, err)
require.Equal(w.t, "", r2String)

f = ExecuteLocalActivity(ctx, testActivityReturnEmptyStruct)
var r2Struct testActivityResult
err = f.Get(ctx, &r2Struct)
require.NoError(w.t, err, err)
require.Equal(w.t, testActivityResult{}, r2Struct)

f = ExecuteLocalActivity(ctx, testActivityReturnNilStructPtr)
var rStructPtr *testActivityResult
err = f.Get(ctx, &rStructPtr)
require.NoError(w.t, err, err)
require.True(w.t, rStructPtr == nil)

f = ExecuteLocalActivity(ctx, testActivityReturnStructPtr)
err = f.Get(ctx, &rStructPtr)
require.NoError(w.t, err, err)
require.Equal(w.t, *rStructPtr, testActivityResult{Index: 10})

f = ExecuteLocalActivity(ctx, testActivityReturnNilStructPtrPtr)
var rStruct2Ptr **testActivityResult
err = f.Get(ctx, &rStruct2Ptr)
require.NoError(w.t, err, err)
require.True(w.t, rStruct2Ptr == nil)

f = ExecuteLocalActivity(ctx, testActivityReturnStructPtrPtr)
err = f.Get(ctx, &rStruct2Ptr)
require.NoError(w.t, err, err)
require.True(w.t, **rStruct2Ptr == testActivityResult{Index: 10})

return []byte("Done"), nil
}

type activitiesCallingOptionsWorkflow struct {
t *testing.T
}
Expand Down
234 changes: 234 additions & 0 deletions internal/testdata/localActivities.json
@@ -0,0 +1,234 @@
[
{
"eventId": 1,
"timestamp": 1595342431739033300,
"eventType": "WorkflowExecutionStarted",
"version": -24,
"taskId": 1049501,
"workflowExecutionStartedEventAttributes": {
"workflowType": {
"name": "go.uber.org/cadence/internal.localActivitiesCallingOptionsWorkflow.Execute"
},
"taskList": {
"name": "tl-1"
},
"executionStartToCloseTimeoutSeconds": 15,
"taskStartToCloseTimeoutSeconds": 1,
"originalExecutionRunId": "dd67af3b-85d0-4217-83e5-a2fc66172afc",
"identity": "57330@vytautas-C02X55E1JGH6@",
"firstExecutionRunId": "dd67af3b-85d0-4217-83e5-a2fc66172afc",
"attempt": 0,
"cronSchedule": "",
"firstDecisionTaskBackoffSeconds": 0,
"header": {
"fields": {}
}
}
},
{
"eventId": 2,
"timestamp": 1595342431739132600,
"eventType": "DecisionTaskScheduled",
"version": -24,
"taskId": 1049502,
"decisionTaskScheduledEventAttributes": {
"taskList": {
"name": "tl-1"
},
"startToCloseTimeoutSeconds": 1,
"attempt": 0
}
},
{
"eventId": 3,
"timestamp": 1595342431763649100,
"eventType": "DecisionTaskStarted",
"version": -24,
"taskId": 1049510,
"decisionTaskStartedEventAttributes": {
"scheduledEventId": 2,
"identity": "57330@vytautas-C02X55E1JGH6@tl-1",
"requestId": "bf3ec0d2-f940-460b-82e2-7da51fc73c79"
}
},
{
"eventId": 4,
"timestamp": 1595342431780575200,
"eventType": "DecisionTaskCompleted",
"version": -24,
"taskId": 1049513,
"decisionTaskCompletedEventAttributes": {
"scheduledEventId": 2,
"startedEventId": 3,
"identity": "57330@vytautas-C02X55E1JGH6@tl-1",
"binaryChecksum": "dc561051f5fc11e373587ce8e2a8dd93"
}
},
{
"eventId": 5,
"timestamp": 1595342431780709600,
"eventType": "MarkerRecorded",
"version": -24,
"taskId": 1049514,
"markerRecordedEventAttributes": {
"markerName": "LocalActivity",
"details": "eyJhY3Rpdml0eUlkIjoiMCIsImFjdGl2aXR5VHlwZSI6ImdvLnViZXIub3JnL2NhZGVuY2UvdGVzdC50ZXN0QWN0aXZpdHlCeXRlQXJncyIsInJlcGxheVRpbWUiOiIyMDIwLTA3LTIxVDE3OjQwOjMxLjc2Mzg1MDM2NSswMzowMCJ9Cg==",
"decisionTaskCompletedEventId": 4
}
},
{
"eventId": 6,
"timestamp": 1595342431780729200,
"eventType": "MarkerRecorded",
"version": -24,
"taskId": 1049515,
"markerRecordedEventAttributes": {
"markerName": "LocalActivity",
"details": "eyJhY3Rpdml0eUlkIjoiMSIsImFjdGl2aXR5VHlwZSI6ImdvLnViZXIub3JnL2NhZGVuY2UvdGVzdC50ZXN0QWN0aXZpdHlNdWx0aXBsZUFyZ3MiLCJyZXBsYXlUaW1lIjoiMjAyMC0wNy0yMVQxNzo0MDozMS43NjM5Mjc4OTErMDM6MDAifQo=",
"decisionTaskCompletedEventId": 4
}
},
{
"eventId": 7,
"timestamp": 1595342431780748400,
"eventType": "MarkerRecorded",
"version": -24,
"taskId": 1049516,
"markerRecordedEventAttributes": {
"markerName": "LocalActivity",
"details": "eyJhY3Rpdml0eUlkIjoiMiIsImFjdGl2aXR5VHlwZSI6ImdvLnViZXIub3JnL2NhZGVuY2UvdGVzdC50ZXN0QWN0aXZpdHlOb1Jlc3VsdCIsInJlcGxheVRpbWUiOiIyMDIwLTA3LTIxVDE3OjQwOjMxLjc2Mzk5NjQ2MyswMzowMCJ9Cg==",
"decisionTaskCompletedEventId": 4
}
},
{
"eventId": 8,
"timestamp": 1595342431780770400,
"eventType": "MarkerRecorded",
"version": -24,
"taskId": 1049517,
"markerRecordedEventAttributes": {
"markerName": "LocalActivity",
"details": "eyJhY3Rpdml0eUlkIjoiMyIsImFjdGl2aXR5VHlwZSI6ImdvLnViZXIub3JnL2NhZGVuY2UvdGVzdC50ZXN0QWN0aXZpdHlOb0NvbnRleHRBcmciLCJyZXBsYXlUaW1lIjoiMjAyMC0wNy0yMVQxNzo0MDozMS43NjQwNzk4NDYrMDM6MDAifQo=",
"decisionTaskCompletedEventId": 4
}
},
{
"eventId": 9,
"timestamp": 1595342431780787500,
"eventType": "MarkerRecorded",
"version": -24,
"taskId": 1049518,
"markerRecordedEventAttributes": {
"markerName": "LocalActivity",
"details": "eyJhY3Rpdml0eUlkIjoiNCIsImFjdGl2aXR5VHlwZSI6ImdvLnViZXIub3JnL2NhZGVuY2UvdGVzdC50ZXN0QWN0aXZpdHlSZXR1cm5CeXRlQXJyYXkiLCJyZXN1bHRKc29uIjoidGVzdEFjdGl2aXR5IiwicmVwbGF5VGltZSI6IjIwMjAtMDctMjFUMTc6NDA6MzEuNzY0MTM0NzEyKzAzOjAwIn0K",
"decisionTaskCompletedEventId": 4
}
},
{
"eventId": 10,
"timestamp": 1595342431780807500,
"eventType": "MarkerRecorded",
"version": -24,
"taskId": 1049519,
"markerRecordedEventAttributes": {
"markerName": "LocalActivity",
"details": "eyJhY3Rpdml0eUlkIjoiNSIsImFjdGl2aXR5VHlwZSI6ImdvLnViZXIub3JnL2NhZGVuY2UvdGVzdC50ZXN0QWN0aXZpdHlSZXR1cm5JbnQiLCJyZXN1bHRKc29uIjoiNVxuIiwicmVwbGF5VGltZSI6IjIwMjAtMDctMjFUMTc6NDA6MzEuNzY0MjAyODU3KzAzOjAwIn0K",
"decisionTaskCompletedEventId": 4
}
},
{
"eventId": 11,
"timestamp": 1595342431780827000,
"eventType": "MarkerRecorded",
"version": -24,
"taskId": 1049520,
"markerRecordedEventAttributes": {
"markerName": "LocalActivity",
"details": "eyJhY3Rpdml0eUlkIjoiNiIsImFjdGl2aXR5VHlwZSI6ImdvLnViZXIub3JnL2NhZGVuY2UvdGVzdC50ZXN0QWN0aXZpdHlSZXR1cm5TdHJpbmciLCJyZXN1bHRKc29uIjoiXCJ0ZXN0QWN0aXZpdHlcIlxuIiwicmVwbGF5VGltZSI6IjIwMjAtMDctMjFUMTc6NDA6MzEuNzY0MjUxMTI2KzAzOjAwIn0K",
"decisionTaskCompletedEventId": 4
}
},
{
"eventId": 12,
"timestamp": 1595342431780845100,
"eventType": "MarkerRecorded",
"version": -24,
"taskId": 1049521,
"markerRecordedEventAttributes": {
"markerName": "LocalActivity",
"details": "eyJhY3Rpdml0eUlkIjoiNyIsImFjdGl2aXR5VHlwZSI6ImdvLnViZXIub3JnL2NhZGVuY2UvdGVzdC50ZXN0QWN0aXZpdHlSZXR1cm5FbXB0eVN0cmluZyIsInJlc3VsdEpzb24iOiJcIlwiXG4iLCJyZXBsYXlUaW1lIjoiMjAyMC0wNy0yMVQxNzo0MDozMS43NjQyOTc4ODcrMDM6MDAifQo=",
"decisionTaskCompletedEventId": 4
}
},
{
"eventId": 13,
"timestamp": 1595342431780861100,
"eventType": "MarkerRecorded",
"version": -24,
"taskId": 1049522,
"markerRecordedEventAttributes": {
"markerName": "LocalActivity",
"details": "eyJhY3Rpdml0eUlkIjoiOCIsImFjdGl2aXR5VHlwZSI6ImdvLnViZXIub3JnL2NhZGVuY2UvdGVzdC50ZXN0QWN0aXZpdHlSZXR1cm5FbXB0eVN0cnVjdCIsInJlc3VsdEpzb24iOiJ7XCJJbmRleFwiOjB9XG4iLCJyZXBsYXlUaW1lIjoiMjAyMC0wNy0yMVQxNzo0MDozMS43NjQzNzE5MzIrMDM6MDAifQo=",
"decisionTaskCompletedEventId": 4
}
},
{
"eventId": 14,
"timestamp": 1595342431780881000,
"eventType": "MarkerRecorded",
"version": -24,
"taskId": 1049523,
"markerRecordedEventAttributes": {
"markerName": "LocalActivity",
"details": "eyJhY3Rpdml0eUlkIjoiOSIsImFjdGl2aXR5VHlwZSI6ImdvLnViZXIub3JnL2NhZGVuY2UvdGVzdC50ZXN0QWN0aXZpdHlSZXR1cm5OaWxTdHJ1Y3RQdHIiLCJyZXBsYXlUaW1lIjoiMjAyMC0wNy0yMVQxNzo0MDozMS43NjQ0NDIzNDMrMDM6MDAifQo=",
"decisionTaskCompletedEventId": 4
}
},
{
"eventId": 15,
"timestamp": 1595342431780901700,
"eventType": "MarkerRecorded",
"version": -24,
"taskId": 1049524,
"markerRecordedEventAttributes": {
"markerName": "LocalActivity",
"details": "eyJhY3Rpdml0eUlkIjoiMTAiLCJhY3Rpdml0eVR5cGUiOiJnby51YmVyLm9yZy9jYWRlbmNlL3Rlc3QudGVzdEFjdGl2aXR5UmV0dXJuU3RydWN0UHRyIiwicmVzdWx0SnNvbiI6IntcIkluZGV4XCI6MTB9XG4iLCJyZXBsYXlUaW1lIjoiMjAyMC0wNy0yMVQxNzo0MDozMS43NjQ0ODYzOSswMzowMCJ9Cg==",
"decisionTaskCompletedEventId": 4
}
},
{
"eventId": 16,
"timestamp": 1595342431780918200,
"eventType": "MarkerRecorded",
"version": -24,
"taskId": 1049525,
"markerRecordedEventAttributes": {
"markerName": "LocalActivity",
"details": "eyJhY3Rpdml0eUlkIjoiMTEiLCJhY3Rpdml0eVR5cGUiOiJnby51YmVyLm9yZy9jYWRlbmNlL3Rlc3QudGVzdEFjdGl2aXR5UmV0dXJuTmlsU3RydWN0UHRyUHRyIiwicmVwbGF5VGltZSI6IjIwMjAtMDctMjFUMTc6NDA6MzEuNzY0NTM2MTIzKzAzOjAwIn0K",
"decisionTaskCompletedEventId": 4
}
},
{
"eventId": 17,
"timestamp": 1595342431780930300,
"eventType": "MarkerRecorded",
"version": -24,
"taskId": 1049526,
"markerRecordedEventAttributes": {
"markerName": "LocalActivity",
"details": "eyJhY3Rpdml0eUlkIjoiMTIiLCJhY3Rpdml0eVR5cGUiOiJnby51YmVyLm9yZy9jYWRlbmNlL3Rlc3QudGVzdEFjdGl2aXR5UmV0dXJuU3RydWN0UHRyUHRyIiwicmVzdWx0SnNvbiI6IntcIkluZGV4XCI6MTB9XG4iLCJyZXBsYXlUaW1lIjoiMjAyMC0wNy0yMVQxNzo0MDozMS43NjQ2MzY3NjgrMDM6MDAifQo=",
"decisionTaskCompletedEventId": 4
}
},
{
"eventId": 18,
"timestamp": 1595342431780949100,
"eventType": "WorkflowExecutionCompleted",
"version": -24,
"taskId": 1049527,
"workflowExecutionCompletedEventAttributes": {
"result": "RG9uZQ==",
"decisionTaskCompletedEventId": 4
}
}
]

0 comments on commit 73287b6

Please sign in to comment.