Skip to content

Commit

Permalink
add more test
Browse files Browse the repository at this point in the history
  • Loading branch information
shreyassrivatsan committed May 3, 2019
1 parent 476ac18 commit cde9c84
Show file tree
Hide file tree
Showing 10 changed files with 231 additions and 2 deletions.
17 changes: 16 additions & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions internal/headers.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ type headerReader struct {
}

func (hr *headerReader) ForEachKey(handler func(string, []byte) error) error {
if hr.header == nil {
return nil
}
for key, value := range hr.header.Fields {
if err := handler(key, value); err != nil {
return err
Expand All @@ -77,10 +80,16 @@ type headerWriter struct {
}

func (hw *headerWriter) Set(key string, value []byte) {
if hw.header == nil {
return
}
hw.header.Fields[key] = value
}

// NewHeaderWriter returns a header writer interface
func NewHeaderWriter(header *shared.Header) HeaderWriter {
if header != nil && header.Fields == nil {
header.Fields = make(map[string][]byte)
}
return &headerWriter{header}
}
1 change: 1 addition & 0 deletions internal/internal_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ type (
ActivityType ActivityType
Input []byte
DataConverter encoded.DataConverter
Header *shared.Header
}

executeLocalActivityParams struct {
Expand Down
1 change: 1 addition & 0 deletions internal/internal_event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,7 @@ func (wc *workflowEnvironmentImpl) ExecuteActivity(parameters executeActivityPar
scheduleTaskAttr.ScheduleToStartTimeoutSeconds = common.Int32Ptr(parameters.ScheduleToStartTimeoutSeconds)
scheduleTaskAttr.HeartbeatTimeoutSeconds = common.Int32Ptr(parameters.HeartbeatTimeoutSeconds)
scheduleTaskAttr.RetryPolicy = parameters.RetryPolicy
scheduleTaskAttr.Header = parameters.Header

decision := wc.decisionsHelper.scheduleActivityTask(scheduleTaskAttr)
decision.setData(&scheduledActivity{
Expand Down
17 changes: 17 additions & 0 deletions internal/internal_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/uber-go/tally"
"go.uber.org/atomic"
"go.uber.org/cadence/.gen/go/shared"
s "go.uber.org/cadence/.gen/go/shared"
"go.uber.org/cadence/encoded"
"go.uber.org/cadence/internal/common"
"go.uber.org/cadence/internal/common/metrics"
Expand Down Expand Up @@ -1172,6 +1173,22 @@ func getDataConverterFromWorkflowContext(ctx Context) encoded.DataConverter {
return options.dataConverter
}

func getContextPropagatorsFromWorkflowContext(ctx Context) []ContextPropagator {
options := getWorkflowEnvOptions(ctx)
return options.contextPropagators
}

func getHeadersFromContext(ctx Context) *shared.Header {
header := &s.Header{
Fields: make(map[string][]byte),
}
contextPropagators := getContextPropagatorsFromWorkflowContext(ctx)
for _, ctxProp := range contextPropagators {
ctxProp.InjectFromWorkflow(ctx, NewHeaderWriter(header))
}
return header
}

// getSignalChannel finds the associated channel for the signal.
func (w *workflowOptions) getSignalChannel(ctx Context, signalName string) Channel {
if ch, ok := w.signalChannels[signalName]; ok {
Expand Down
30 changes: 30 additions & 0 deletions internal/internal_workflow_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package internal
import (
"context"
"errors"
"fmt"
"log"
"os"
"testing"
Expand Down Expand Up @@ -748,6 +749,35 @@ func (s *workflowClientTestSuite) TestStartWorkflow() {
s.Equal(createResponse.GetRunId(), resp.RunID)
}

func (s *workflowClientTestSuite) TestStartWorkflow_WithContext() {
s.client = NewClient(s.service, domain, &ClientOptions{ContextPropagators: []ContextPropagator{&TestContextPropagator{keys: []string{testHeader}}}})
client, ok := s.client.(*workflowClient)
s.True(ok)
options := StartWorkflowOptions{
ID: workflowID,
TaskList: tasklist,
ExecutionStartToCloseTimeout: timeoutInSeconds,
DecisionTaskStartToCloseTimeout: timeoutInSeconds,
}
f1 := func(ctx Context, r []byte) error {
value := ctx.Value(contextKey(testHeader))
if val, ok := value.([]byte); ok {
s.Equal("test-data", string(val))
return nil
}
return fmt.Errorf("context did not propagate to workflow")
}

createResponse := &shared.StartWorkflowExecutionResponse{
RunId: common.StringPtr(runID),
}
s.service.EXPECT().StartWorkflowExecution(gomock.Any(), gomock.Any(), gomock.Any()).Return(createResponse, nil)

resp, err := client.StartWorkflow(context.Background(), options, f1, []byte("test"))
s.Nil(err)
s.Equal(createResponse.GetRunId(), resp.RunID)
}

func (s *workflowClientTestSuite) TestStartWorkflow_WithDataConverter() {
dc := newTestDataConverter()
s.client = NewClient(s.service, domain, &ClientOptions{DataConverter: dc})
Expand Down
15 changes: 14 additions & 1 deletion internal/internal_workflow_testsuite.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ const (
defaultTestWorkflowTypeName = "default-test-workflow-type-name"
defaultTestDomainName = "default-test-domain-name"
workflowTypeNotSpecified = "workflow-type-not-specified"
testHeader = "test-header"
)

type (
Expand Down Expand Up @@ -283,6 +284,7 @@ func newTestWorkflowEnvironmentImpl(s *WorkflowTestSuite) *testWorkflowEnvironme
if env.workerOptions.DataConverter == nil {
env.workerOptions.DataConverter = getDefaultDataConverter()
}
env.workerOptions.ContextPropagators = append(env.workerOptions.ContextPropagators, &TestContextPropagator{keys: []string{testHeader}})

return env
}
Expand Down Expand Up @@ -404,11 +406,16 @@ func (env *testWorkflowEnvironmentImpl) executeWorkflowInternal(delayStart time.
panic(err)
}
env.workflowDef = workflowDefinition
header := &shared.Header{
Fields: map[string][]byte{
"test-header": []byte("test-data"),
},
}
// env.workflowDef.Execute() method will execute dispatcher. We want the dispatcher to only run in main loop.
// In case of child workflow, this executeWorkflowInternal() is run in separate goroutinue, so use postCallback
// to make sure workflowDef.Execute() is run in main loop.
env.postCallback(func() {
env.workflowDef.Execute(env, &shared.Header{}, input)
env.workflowDef.Execute(env, header, input)
// kick off first decision task to start the workflow
if delayStart == 0 {
env.startDecisionTask()
Expand Down Expand Up @@ -463,6 +470,11 @@ func (env *testWorkflowEnvironmentImpl) executeActivity(
},
ActivityType: *activityType,
Input: input,
Header: &shared.Header{
Fields: map[string][]byte{
testHeader: []byte("test-data"),
},
},
}

task := newTestActivityTask(
Expand Down Expand Up @@ -1475,6 +1487,7 @@ func newTestActivityTask(workflowID, runID, activityID, workflowTypeName, domain
Name: common.StringPtr(workflowTypeName),
},
WorkflowDomain: common.StringPtr(domainName),
Header: params.Header,
}
return task
}
Expand Down
80 changes: 80 additions & 0 deletions internal/internal_workflow_testsuite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,10 @@ func (s *WorkflowTestSuiteUnitTest) SetupSuite() {
ScheduleToCloseTimeout: time.Second * 3,
}
RegisterWorkflowWithOptions(testWorkflowHello, RegisterWorkflowOptions{Name: "testWorkflowHello"})
RegisterWorkflow(testWorkflowContext)
RegisterWorkflow(testWorkflowHeartbeat)
RegisterActivityWithOptions(testActivityHello, RegisterActivityOptions{Name: "testActivityHello"})
RegisterActivity(testActivityContext)
RegisterActivity(testActivityHeartbeat)
}

Expand Down Expand Up @@ -362,6 +364,28 @@ func (s *WorkflowTestSuiteUnitTest) Test_ActivityWithUserContext() {
s.Equal(testValue, value)
}

func (s *WorkflowTestSuiteUnitTest) Test_ActivityWithHeaderContext() {
workerOptions := WorkerOptions{}

// inline activity using value passing through user context.
activityWithUserContext := func(ctx context.Context) (string, error) {
value := ctx.Value(contextKey(testHeader))
if val, ok := value.([]byte); ok {
return string(val), nil
}
return "", errors.New("value not found from ctx")
}
RegisterActivity(activityWithUserContext)

env := s.NewTestActivityEnvironment()
env.SetWorkerOptions(workerOptions)
blob, err := env.ExecuteActivity(activityWithUserContext)
s.NoError(err)
var value string
blob.Get(&value)
s.Equal("test-data", value)
}

func (s *WorkflowTestSuiteUnitTest) Test_CompleteActivity() {
env := s.NewTestWorkflowEnvironment()
var activityInfo ActivityInfo
Expand Down Expand Up @@ -438,10 +462,26 @@ func testWorkflowHello(ctx Context) (string, error) {
return result, nil
}

func testWorkflowContext(ctx Context) (string, error) {
value := ctx.Value(contextKey(testHeader))
if val, ok := value.([]byte); ok {
return string(val), nil
}
return "", fmt.Errorf("context did not propagate to workflow")
}

func testActivityHello(ctx context.Context, msg string) (string, error) {
return "hello" + "_" + msg, nil
}

func testActivityContext(ctx context.Context) (string, error) {
value := ctx.Value(contextKey(testHeader))
if val, ok := value.([]byte); ok {
return string(val), nil
}
return "", fmt.Errorf("context did not propagate to workflow")
}

func testWorkflowHeartbeat(ctx Context, msg string, waitTime time.Duration) (string, error) {
ao := ActivityOptions{
ScheduleToStartTimeout: time.Minute,
Expand Down Expand Up @@ -1305,6 +1345,46 @@ func (s *WorkflowTestSuiteUnitTest) Test_WorkflowFriendlyName() {
s.Equal("testWorkflowHello", called[1])
}

func (s *WorkflowTestSuiteUnitTest) Test_WorkflowHeaderContext() {

workflowFn := func(ctx Context) error {
value := ctx.Value(contextKey(testHeader))
if val, ok := value.([]byte); ok {
s.Equal("test-data", string(val))
} else {
return fmt.Errorf("context did not propagate to workflow")
}

cwo := ChildWorkflowOptions{ExecutionStartToCloseTimeout: time.Hour /* this is currently ignored by test suite */}
ctx = WithChildWorkflowOptions(ctx, cwo)
var result string
if err := ExecuteChildWorkflow(ctx, testWorkflowContext).Get(ctx, &result); err != nil {
return err
}
s.Equal("test-data", result)

ao := ActivityOptions{
ScheduleToStartTimeout: time.Minute,
StartToCloseTimeout: time.Minute,
HeartbeatTimeout: 20 * time.Second,
}
ctx = WithActivityOptions(ctx, ao)
if err := ExecuteActivity(ctx, testActivityContext).Get(ctx, &result); err != nil {
return err
}
s.Equal("test-data", result)
return nil
}

RegisterWorkflow(workflowFn)
env := s.NewTestWorkflowEnvironment()

env.ExecuteWorkflow(workflowFn)

s.True(env.IsWorkflowCompleted())
s.NoError(env.GetWorkflowError())
}

func (s *WorkflowTestSuiteUnitTest) Test_ActivityFullyQualifiedName() {
// TODO (madhu): Add this back once test workflow environment is able to handle panics gracefully
// Right now, the panic happens in a different goroutine and there is no way to catch it
Expand Down
4 changes: 4 additions & 0 deletions internal/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,11 +355,15 @@ func ExecuteActivity(ctx Context, activity interface{}, args ...interface{}) Fut
return future
}

// Retrieve headers from context to pass them on
header := getHeadersFromContext(ctx)

params := executeActivityParams{
activityOptions: *options,
ActivityType: *activityType,
Input: input,
DataConverter: dataConverter,
Header: header,
}

ctxDone, cancellable := ctx.Done().(*channelImpl)
Expand Down
Loading

0 comments on commit cde9c84

Please sign in to comment.