From cde9c84d8d0d494dffdb5f718af1424708d7eeb1 Mon Sep 17 00:00:00 2001 From: Shreyas Srivatsan Date: Wed, 1 May 2019 14:57:43 -0700 Subject: [PATCH] add more test --- Gopkg.lock | 17 ++++- internal/headers.go | 9 +++ internal/internal_activity.go | 1 + internal/internal_event_handlers.go | 1 + internal/internal_workflow.go | 17 +++++ internal/internal_workflow_client_test.go | 30 ++++++++ internal/internal_workflow_testsuite.go | 15 +++- internal/internal_workflow_testsuite_test.go | 80 ++++++++++++++++++++ internal/workflow.go | 4 + internal/workflow_testsuite.go | 59 +++++++++++++++ 10 files changed, 231 insertions(+), 2 deletions(-) diff --git a/Gopkg.lock b/Gopkg.lock index 43ff1baa0..6d12b82c2 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -83,6 +83,14 @@ revision = "a97ce2ca70fa5a848076093f05e639a89ca34d06" version = "v1.0" +[[projects]] + digest = "1:1d7e1867c49a6dd9856598ef7c3123604ea3daabf5b83f303ff457bcbc410b1d" + name = "github.com/pkg/errors" + packages = ["."] + pruneopts = "" + revision = "ba968bfe8b2f7e042a574c888954fccecfa385b4" + version = "v0.8.1" + [[projects]] digest = "1:256484dbbcd271f9ecebc6795b2df8cad4c458dd0f5fd82a8c2fa0c29f233411" name = "github.com/pmezard/go-difflib" @@ -182,15 +190,21 @@ name = "github.com/uber/jaeger-client-go" packages = [ ".", + "config", "internal/baggage", + "internal/baggage/remote", "internal/spanlog", "internal/throttler", + "internal/throttler/remote", "log", + "rpcmetrics", "thrift", "thrift-gen/agent", + "thrift-gen/baggage", "thrift-gen/jaeger", "thrift-gen/sampling", "thrift-gen/zipkincore", + "transport", "utils", ] pruneopts = "" @@ -342,6 +356,7 @@ "github.com/facebookgo/clock", "github.com/golang/mock/gomock", "github.com/opentracing/opentracing-go", + "github.com/opentracing/opentracing-go/ext", "github.com/pborman/uuid", "github.com/robfig/cron", "github.com/sirupsen/logrus", @@ -350,7 +365,7 @@ "github.com/stretchr/testify/require", "github.com/stretchr/testify/suite", "github.com/uber-go/tally", - "github.com/uber/jaeger-client-go", + "github.com/uber/jaeger-client-go/config", "github.com/uber/tchannel-go/thrift", "go.uber.org/atomic", "go.uber.org/thriftrw/thriftreflect", diff --git a/internal/headers.go b/internal/headers.go index a5a356a61..b644d69be 100644 --- a/internal/headers.go +++ b/internal/headers.go @@ -59,6 +59,9 @@ type headerReader struct { } func (hr *headerReader) ForEachKey(handler func(string, []byte) error) error { + if hr.header == nil { + return nil + } for key, value := range hr.header.Fields { if err := handler(key, value); err != nil { return err @@ -77,10 +80,16 @@ type headerWriter struct { } func (hw *headerWriter) Set(key string, value []byte) { + if hw.header == nil { + return + } hw.header.Fields[key] = value } // NewHeaderWriter returns a header writer interface func NewHeaderWriter(header *shared.Header) HeaderWriter { + if header != nil && header.Fields == nil { + header.Fields = make(map[string][]byte) + } return &headerWriter{header} } diff --git a/internal/internal_activity.go b/internal/internal_activity.go index cde7b64d4..6acc3a7ad 100644 --- a/internal/internal_activity.go +++ b/internal/internal_activity.go @@ -75,6 +75,7 @@ type ( ActivityType ActivityType Input []byte DataConverter encoded.DataConverter + Header *shared.Header } executeLocalActivityParams struct { diff --git a/internal/internal_event_handlers.go b/internal/internal_event_handlers.go index fbccb482c..6ab280352 100644 --- a/internal/internal_event_handlers.go +++ b/internal/internal_event_handlers.go @@ -383,6 +383,7 @@ func (wc *workflowEnvironmentImpl) ExecuteActivity(parameters executeActivityPar scheduleTaskAttr.ScheduleToStartTimeoutSeconds = common.Int32Ptr(parameters.ScheduleToStartTimeoutSeconds) scheduleTaskAttr.HeartbeatTimeoutSeconds = common.Int32Ptr(parameters.HeartbeatTimeoutSeconds) scheduleTaskAttr.RetryPolicy = parameters.RetryPolicy + scheduleTaskAttr.Header = parameters.Header decision := wc.decisionsHelper.scheduleActivityTask(scheduleTaskAttr) decision.setData(&scheduledActivity{ diff --git a/internal/internal_workflow.go b/internal/internal_workflow.go index 109b7afac..e068d7255 100644 --- a/internal/internal_workflow.go +++ b/internal/internal_workflow.go @@ -37,6 +37,7 @@ import ( "github.com/uber-go/tally" "go.uber.org/atomic" "go.uber.org/cadence/.gen/go/shared" + s "go.uber.org/cadence/.gen/go/shared" "go.uber.org/cadence/encoded" "go.uber.org/cadence/internal/common" "go.uber.org/cadence/internal/common/metrics" @@ -1172,6 +1173,22 @@ func getDataConverterFromWorkflowContext(ctx Context) encoded.DataConverter { return options.dataConverter } +func getContextPropagatorsFromWorkflowContext(ctx Context) []ContextPropagator { + options := getWorkflowEnvOptions(ctx) + return options.contextPropagators +} + +func getHeadersFromContext(ctx Context) *shared.Header { + header := &s.Header{ + Fields: make(map[string][]byte), + } + contextPropagators := getContextPropagatorsFromWorkflowContext(ctx) + for _, ctxProp := range contextPropagators { + ctxProp.InjectFromWorkflow(ctx, NewHeaderWriter(header)) + } + return header +} + // getSignalChannel finds the associated channel for the signal. func (w *workflowOptions) getSignalChannel(ctx Context, signalName string) Channel { if ch, ok := w.signalChannels[signalName]; ok { diff --git a/internal/internal_workflow_client_test.go b/internal/internal_workflow_client_test.go index 65686df95..dfd3e9612 100644 --- a/internal/internal_workflow_client_test.go +++ b/internal/internal_workflow_client_test.go @@ -22,6 +22,7 @@ package internal import ( "context" "errors" + "fmt" "log" "os" "testing" @@ -748,6 +749,35 @@ func (s *workflowClientTestSuite) TestStartWorkflow() { s.Equal(createResponse.GetRunId(), resp.RunID) } +func (s *workflowClientTestSuite) TestStartWorkflow_WithContext() { + s.client = NewClient(s.service, domain, &ClientOptions{ContextPropagators: []ContextPropagator{&TestContextPropagator{keys: []string{testHeader}}}}) + client, ok := s.client.(*workflowClient) + s.True(ok) + options := StartWorkflowOptions{ + ID: workflowID, + TaskList: tasklist, + ExecutionStartToCloseTimeout: timeoutInSeconds, + DecisionTaskStartToCloseTimeout: timeoutInSeconds, + } + f1 := func(ctx Context, r []byte) error { + value := ctx.Value(contextKey(testHeader)) + if val, ok := value.([]byte); ok { + s.Equal("test-data", string(val)) + return nil + } + return fmt.Errorf("context did not propagate to workflow") + } + + createResponse := &shared.StartWorkflowExecutionResponse{ + RunId: common.StringPtr(runID), + } + s.service.EXPECT().StartWorkflowExecution(gomock.Any(), gomock.Any(), gomock.Any()).Return(createResponse, nil) + + resp, err := client.StartWorkflow(context.Background(), options, f1, []byte("test")) + s.Nil(err) + s.Equal(createResponse.GetRunId(), resp.RunID) +} + func (s *workflowClientTestSuite) TestStartWorkflow_WithDataConverter() { dc := newTestDataConverter() s.client = NewClient(s.service, domain, &ClientOptions{DataConverter: dc}) diff --git a/internal/internal_workflow_testsuite.go b/internal/internal_workflow_testsuite.go index a02f90c15..46b4c4652 100644 --- a/internal/internal_workflow_testsuite.go +++ b/internal/internal_workflow_testsuite.go @@ -52,6 +52,7 @@ const ( defaultTestWorkflowTypeName = "default-test-workflow-type-name" defaultTestDomainName = "default-test-domain-name" workflowTypeNotSpecified = "workflow-type-not-specified" + testHeader = "test-header" ) type ( @@ -283,6 +284,7 @@ func newTestWorkflowEnvironmentImpl(s *WorkflowTestSuite) *testWorkflowEnvironme if env.workerOptions.DataConverter == nil { env.workerOptions.DataConverter = getDefaultDataConverter() } + env.workerOptions.ContextPropagators = append(env.workerOptions.ContextPropagators, &TestContextPropagator{keys: []string{testHeader}}) return env } @@ -404,11 +406,16 @@ func (env *testWorkflowEnvironmentImpl) executeWorkflowInternal(delayStart time. panic(err) } env.workflowDef = workflowDefinition + header := &shared.Header{ + Fields: map[string][]byte{ + "test-header": []byte("test-data"), + }, + } // env.workflowDef.Execute() method will execute dispatcher. We want the dispatcher to only run in main loop. // In case of child workflow, this executeWorkflowInternal() is run in separate goroutinue, so use postCallback // to make sure workflowDef.Execute() is run in main loop. env.postCallback(func() { - env.workflowDef.Execute(env, &shared.Header{}, input) + env.workflowDef.Execute(env, header, input) // kick off first decision task to start the workflow if delayStart == 0 { env.startDecisionTask() @@ -463,6 +470,11 @@ func (env *testWorkflowEnvironmentImpl) executeActivity( }, ActivityType: *activityType, Input: input, + Header: &shared.Header{ + Fields: map[string][]byte{ + testHeader: []byte("test-data"), + }, + }, } task := newTestActivityTask( @@ -1475,6 +1487,7 @@ func newTestActivityTask(workflowID, runID, activityID, workflowTypeName, domain Name: common.StringPtr(workflowTypeName), }, WorkflowDomain: common.StringPtr(domainName), + Header: params.Header, } return task } diff --git a/internal/internal_workflow_testsuite_test.go b/internal/internal_workflow_testsuite_test.go index 8892e7660..e776d5079 100644 --- a/internal/internal_workflow_testsuite_test.go +++ b/internal/internal_workflow_testsuite_test.go @@ -56,8 +56,10 @@ func (s *WorkflowTestSuiteUnitTest) SetupSuite() { ScheduleToCloseTimeout: time.Second * 3, } RegisterWorkflowWithOptions(testWorkflowHello, RegisterWorkflowOptions{Name: "testWorkflowHello"}) + RegisterWorkflow(testWorkflowContext) RegisterWorkflow(testWorkflowHeartbeat) RegisterActivityWithOptions(testActivityHello, RegisterActivityOptions{Name: "testActivityHello"}) + RegisterActivity(testActivityContext) RegisterActivity(testActivityHeartbeat) } @@ -362,6 +364,28 @@ func (s *WorkflowTestSuiteUnitTest) Test_ActivityWithUserContext() { s.Equal(testValue, value) } +func (s *WorkflowTestSuiteUnitTest) Test_ActivityWithHeaderContext() { + workerOptions := WorkerOptions{} + + // inline activity using value passing through user context. + activityWithUserContext := func(ctx context.Context) (string, error) { + value := ctx.Value(contextKey(testHeader)) + if val, ok := value.([]byte); ok { + return string(val), nil + } + return "", errors.New("value not found from ctx") + } + RegisterActivity(activityWithUserContext) + + env := s.NewTestActivityEnvironment() + env.SetWorkerOptions(workerOptions) + blob, err := env.ExecuteActivity(activityWithUserContext) + s.NoError(err) + var value string + blob.Get(&value) + s.Equal("test-data", value) +} + func (s *WorkflowTestSuiteUnitTest) Test_CompleteActivity() { env := s.NewTestWorkflowEnvironment() var activityInfo ActivityInfo @@ -438,10 +462,26 @@ func testWorkflowHello(ctx Context) (string, error) { return result, nil } +func testWorkflowContext(ctx Context) (string, error) { + value := ctx.Value(contextKey(testHeader)) + if val, ok := value.([]byte); ok { + return string(val), nil + } + return "", fmt.Errorf("context did not propagate to workflow") +} + func testActivityHello(ctx context.Context, msg string) (string, error) { return "hello" + "_" + msg, nil } +func testActivityContext(ctx context.Context) (string, error) { + value := ctx.Value(contextKey(testHeader)) + if val, ok := value.([]byte); ok { + return string(val), nil + } + return "", fmt.Errorf("context did not propagate to workflow") +} + func testWorkflowHeartbeat(ctx Context, msg string, waitTime time.Duration) (string, error) { ao := ActivityOptions{ ScheduleToStartTimeout: time.Minute, @@ -1305,6 +1345,46 @@ func (s *WorkflowTestSuiteUnitTest) Test_WorkflowFriendlyName() { s.Equal("testWorkflowHello", called[1]) } +func (s *WorkflowTestSuiteUnitTest) Test_WorkflowHeaderContext() { + + workflowFn := func(ctx Context) error { + value := ctx.Value(contextKey(testHeader)) + if val, ok := value.([]byte); ok { + s.Equal("test-data", string(val)) + } else { + return fmt.Errorf("context did not propagate to workflow") + } + + cwo := ChildWorkflowOptions{ExecutionStartToCloseTimeout: time.Hour /* this is currently ignored by test suite */} + ctx = WithChildWorkflowOptions(ctx, cwo) + var result string + if err := ExecuteChildWorkflow(ctx, testWorkflowContext).Get(ctx, &result); err != nil { + return err + } + s.Equal("test-data", result) + + ao := ActivityOptions{ + ScheduleToStartTimeout: time.Minute, + StartToCloseTimeout: time.Minute, + HeartbeatTimeout: 20 * time.Second, + } + ctx = WithActivityOptions(ctx, ao) + if err := ExecuteActivity(ctx, testActivityContext).Get(ctx, &result); err != nil { + return err + } + s.Equal("test-data", result) + return nil + } + + RegisterWorkflow(workflowFn) + env := s.NewTestWorkflowEnvironment() + + env.ExecuteWorkflow(workflowFn) + + s.True(env.IsWorkflowCompleted()) + s.NoError(env.GetWorkflowError()) +} + func (s *WorkflowTestSuiteUnitTest) Test_ActivityFullyQualifiedName() { // TODO (madhu): Add this back once test workflow environment is able to handle panics gracefully // Right now, the panic happens in a different goroutine and there is no way to catch it diff --git a/internal/workflow.go b/internal/workflow.go index 796de8243..90eb69efd 100644 --- a/internal/workflow.go +++ b/internal/workflow.go @@ -355,11 +355,15 @@ func ExecuteActivity(ctx Context, activity interface{}, args ...interface{}) Fut return future } + // Retrieve headers from context to pass them on + header := getHeadersFromContext(ctx) + params := executeActivityParams{ activityOptions: *options, ActivityType: *activityType, Input: input, DataConverter: dataConverter, + Header: header, } ctxDone, cancellable := ctx.Done().(*channelImpl) diff --git a/internal/workflow_testsuite.go b/internal/workflow_testsuite.go index 3406880dc..89e10264b 100644 --- a/internal/workflow_testsuite.go +++ b/internal/workflow_testsuite.go @@ -68,6 +68,11 @@ type ( runFn func(args mock.Arguments) waitDuration func() time.Duration } + + // TestContextPropagator propagates test keys across a workflow + TestContextPropagator struct { + keys []string + } ) func newEncodedValues(values []byte, dc encoded.DataConverter) encoded.Values { @@ -109,6 +114,60 @@ func (b ErrorDetailsValues) HasValues() bool { return b != nil && len(b) != 0 } +// Inject injects values from context into headers for propagation +func (t *TestContextPropagator) Inject(ctx context.Context, writer HeaderWriter) error { + for _, key := range t.keys { + value, ok := ctx.Value(contextKey(key)).([]byte) + if !ok { + return fmt.Errorf("unable to extract key from context %v", key) + } + writer.Set(key, value) + } + return nil +} + +// InjectFromWorkflow injects values from context into headers for propagation +func (t *TestContextPropagator) InjectFromWorkflow(ctx Context, writer HeaderWriter) error { + for _, key := range t.keys { + value, ok := ctx.Value(contextKey(key)).([]byte) + if !ok { + return fmt.Errorf("unable to extract key from context %v", key) + } + writer.Set(key, value) + } + return nil +} + +// Extract extracts values from headers and puts them into context +func (t *TestContextPropagator) Extract(ctx context.Context, reader HeaderReader) (context.Context, error) { + if err := reader.ForEachKey(func(key string, value []byte) error { + for _, k := range t.keys { + if key == k { + ctx = context.WithValue(ctx, contextKey(key), value) + } + } + return nil + }); err != nil { + return nil, err + } + return ctx, nil +} + +// ExtractToWorkflow extracts values from headers and puts them into context +func (t *TestContextPropagator) ExtractToWorkflow(ctx Context, reader HeaderReader) (Context, error) { + if err := reader.ForEachKey(func(key string, value []byte) error { + for _, k := range t.keys { + if key == k { + ctx = WithValue(ctx, contextKey(key), value) + } + } + return nil + }); err != nil { + return nil, err + } + return ctx, nil +} + // NewTestWorkflowEnvironment creates a new instance of TestWorkflowEnvironment. Use the returned TestWorkflowEnvironment // to run your workflow in the test environment. func (s *WorkflowTestSuite) NewTestWorkflowEnvironment() *TestWorkflowEnvironment {