New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Data Converter to support custom serialization/deserialization #463
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You won't be able to specify different data converter for activity calls. Could you follow up with Max about this? I think that is something he wanted.
@@ -139,7 +140,7 @@ func RecordActivityHeartbeat(ctx context.Context, details ...interface{}) { | |||
var err error | |||
// We would like to be a able to pass in "nil" as part of details(that is no progress to report to) | |||
if len(details) != 1 || details[0] != nil { | |||
data, err = getHostEnvironment().encodeArgs(details) | |||
data, err = encodeArgs(getDataConverterFromActivityCtx(ctx), details) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we do getDataConverterFromActivityCtx(ctx).ToData(details...)? Make sure that getDataConverterFromActivityCtx() does not return nil. You can set the default data converter to the context if there is no customized one.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could do that, but to follow the existing pattern, we only use encodeArgs
encodeArg
decodeArgs
decodeArg
with data converter passed in as first argument, to avoid directly use dc.ToData. This is safer because 1: you don't need to be careful about the array...
2: you won't panic if careless had dc been nil. So prefer to not change this
internal/client.go
Outdated
@@ -335,11 +336,16 @@ func NewClient(service workflowserviceclient.Interface, domain string, options * | |||
metricScope = options.MetricsScope | |||
} | |||
metricScope = tagScope(metricScope, tagDomain, domain) | |||
dataConverter := newDefaultDataConverter() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
try to avoid this allocation if a customized one is supplied.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
changed
TimeoutType: &timeoutType, | ||
}) | ||
weh := &workflowExecutionEventHandlerImpl{context, nil} | ||
weh.handleActivityTaskTimedOut(event) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you use the test framework to cover this heartbeat case?
internal/internal_activity.go
Outdated
@@ -68,6 +69,7 @@ type ( | |||
InputArgs []interface{} | |||
ScheduleToCloseTimeoutSeconds int32 | |||
WorkflowInfo *WorkflowInfo | |||
DataConverter encoded.DataConverter |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you will need a rebase here, the executeLocalActivityParams has changed to contain localActivityOptions and other fields.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, thanks for remind here
fType := reflect.TypeOf(f) | ||
if dataConverter == nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we make sure this dataConverter never be nil so we don't have to check this nil everywhere?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is actually on purpose, for test code that use this, I want people just pass nil instead of figuration out how to deal with dc. So prefer not change
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mean our framework code to set a default value when user don't supply a custom one, that way, we guarantee the data converter won't be nil anywhere.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Understand, that's what I already did. But to be in defensive way, I feel it is still better to have this check, especially in this function.
func (th *hostEnvImpl) decodeArg(data []byte, to interface{}) error { | ||
return th.decode(data, []interface{}{to}) | ||
func decodeArg(dc encoded.DataConverter, data []byte, to interface{}) error { | ||
if dc == nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could you have unit test to cover input with multiple parameters case
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added TestDecodeArg in internal_worker_test
internal/workflow_testsuite.go
Outdated
if !b.HasValues() { | ||
return ErrNoData | ||
} | ||
for i, item := range b { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe do a sanity check to make sure len(valuePtr) <= len(b) and then loop through valuePtr instead of b.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
changed
encoded/encoded.go
Outdated
@@ -38,4 +38,22 @@ type ( | |||
// Get extract the encoded values into strong typed value pointers. | |||
Get(valuePtr ...interface{}) error | |||
} | |||
|
|||
// DataConverter is used by the framework to serialize/deserialize method parameters that need to be sent over the wire. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
to serialize/deserialize input and output of activity/workflow that ...
encoded/encoded.go
Outdated
// To encode/decode workflow arguments, one should set DataConverter in two places: | ||
// 1. Workflow worker, through worker.Options | ||
// 2. Client, through client.Options | ||
// To encode/decode Activity/ChildWorkflow arguments, one should set DataConverter two 2 places: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in two places
encoded/encoded.go
Outdated
// 2. Client, through client.Options | ||
// To encode/decode Activity/ChildWorkflow arguments, one should set DataConverter two 2 places: | ||
// 1. Inside workflow code, use workflow.WithDataConverter to create new Context, | ||
// and pass that context ExecuteActivity/ExecuteChildWorkflow calls. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pass that context to ...
internal/internal_activity.go
Outdated
@@ -59,10 +60,12 @@ type ( | |||
HeartbeatTimeoutSeconds int32 | |||
WaitForCancellation bool | |||
OriginalTaskListName string | |||
DataConverter encoded.DataConverter // used in testsuite |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
move this to executeActivityParams
internal/internal_activity.go
Outdated
} | ||
|
||
localActivityOptions struct { | ||
ScheduleToCloseTimeoutSeconds int32 | ||
DataConverter encoded.DataConverter // used in testsuite |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove this one as it is already in executeLocalActivityParams
internal/internal_activity.go
Outdated
@@ -186,6 +191,9 @@ func getValidatedLocalActivityOptions(ctx Context) (*localActivityOptions, error | |||
if p.ScheduleToCloseTimeoutSeconds <= 0 { | |||
return nil, errors.New("missing or negative ScheduleToCloseTimeoutSeconds") | |||
} | |||
if p.DataConverter == nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove
@@ -504,7 +511,7 @@ func (wc *workflowEnvironmentImpl) SideEffect(f func() ([]byte, error), callback | |||
callback(result, err) | |||
return | |||
} | |||
details, err = wc.hostEnv.encodeArgs([]interface{}{sideEffectID, result}) | |||
details, err = encodeArgs(wc.GetDataConverter(), []interface{}{sideEffectID, result}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you are using wc.dataConverter and wc.GetDataConverter(). Better make them consistent.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, changed all to use GetDataConverter()
} | ||
|
||
var encoder encoding | ||
if isUseThriftEncoding(r) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think think this Thrift encoding is working. Could you file a task to make sure this thrift encoding works.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
created #479
@@ -282,6 +286,7 @@ func (env *testWorkflowEnvironmentImpl) newTestWorkflowEnvironmentForChild(param | |||
childEnv.parentEnv = env | |||
childEnv.startedHandler = startedHandler | |||
childEnv.testWorkflowEnvironmentShared = env.testWorkflowEnvironmentShared | |||
childEnv.workerOptions = WorkerOptions{DataConverter: params.dataConverter} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you need to copy workerOptions from parent to child, no just for DataConverter.
@@ -392,6 +413,30 @@ func testWorkflowHello(ctx Context) (string, error) { | |||
return result, nil | |||
} | |||
|
|||
func testWorkflowHelloWithDataConverter(ctx Context) (string, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you move this function as workflowFn := func() ... into the Test_ActivityMockFunction_WithDataConverter
…ber-go#463) * Add data converter to support custom encode/decode * Add comments on DataConverter interface * Address comments * Address comments
Users now can implement encoded.DataConverter and let Cadence use it to serialize/deserialize parameters that need to be sent over the wire.
To encode/decode workflow arguments, one should set DataConverter in 2 places:
To encode/decode Activity/ChildWorkflow arguments, one should set DataConverter in 2 places: