From 9ebe8ea71b635d88a9d3298f3dd6e1cb2627a811 Mon Sep 17 00:00:00 2001 From: Liang Mei Date: Wed, 4 Mar 2020 10:22:48 -0800 Subject: [PATCH 1/5] worker_options: expose poller count as a user option (#940) --- internal/internal_worker.go | 31 +++-- internal/internal_worker_interfaces_test.go | 18 +-- internal/internal_worker_test.go | 125 ++++++++++++++++++++ internal/internal_workers_test.go | 24 ++-- internal/worker.go | 12 ++ 5 files changed, 179 insertions(+), 31 deletions(-) diff --git a/internal/internal_worker.go b/internal/internal_worker.go index 2302b7a62..8b251bfe3 100644 --- a/internal/internal_worker.go +++ b/internal/internal_worker.go @@ -124,21 +124,24 @@ type ( // Task list name to poll. TaskList string - // Defines how many concurrent poll requests for the task list by this worker. - ConcurrentPollRoutineSize int - // Defines how many concurrent activity executions by this worker. ConcurrentActivityExecutionSize int // Defines rate limiting on number of activity tasks that can be executed per second per worker. WorkerActivitiesPerSecond float64 + // MaxConcurrentActivityPollers is the max number of pollers for activity task list + MaxConcurrentActivityPollers int + // Defines how many concurrent decision task executions by this worker. ConcurrentDecisionTaskExecutionSize int // Defines rate limiting on number of decision tasks that can be executed per second per worker. WorkerDecisionTasksPerSecond float64 + // MaxConcurrentDecisionPollers is the max number of pollers for decision task list + MaxConcurrentDecisionPollers int + // Defines how many concurrent local activity executions by this worker. ConcurrentLocalActivityExecutionSize int @@ -294,7 +297,7 @@ func newWorkflowTaskWorkerInternal( params, ) worker := newBaseWorker(baseWorkerOptions{ - pollerCount: params.ConcurrentPollRoutineSize, + pollerCount: params.MaxConcurrentDecisionPollers, pollerRate: defaultPollerRate, maxConcurrentTask: params.ConcurrentDecisionTaskExecutionSize, maxTaskPerSecond: params.WorkerDecisionTasksPerSecond, @@ -395,7 +398,7 @@ func newSessionWorker(service workflowserviceclient.Interface, params.TaskList = sessionEnvironment.GetResourceSpecificTasklist() activityWorker := newActivityWorker(service, domain, params, overrides, env, nil) - params.ConcurrentPollRoutineSize = 1 + params.MaxConcurrentActivityPollers = 1 params.TaskList = creationTasklist creationWorker := newActivityWorker(service, domain, params, overrides, env, sessionEnvironment.GetTokenBucket()) @@ -473,7 +476,7 @@ func newActivityTaskWorker( base := newBaseWorker( baseWorkerOptions{ - pollerCount: workerParams.ConcurrentPollRoutineSize, + pollerCount: workerParams.MaxConcurrentActivityPollers, pollerRate: defaultPollerRate, maxConcurrentTask: workerParams.ConcurrentActivityExecutionSize, maxTaskPerSecond: workerParams.WorkerActivitiesPerSecond, @@ -1117,9 +1120,7 @@ func (aw *aggregatedWorker) Stop() { aw.logger.Info("Stopped Worker") } -// aggregatedWorker returns an instance to manage the workers. Use defaultConcurrentPollRoutineSize (which is 2) as -// poller size. The typical RTT (round-trip time) is below 1ms within data center. And the poll API latency is about 5ms. -// With 2 poller, we could achieve around 300~400 RPS. +// aggregatedWorker returns an instance to manage both activity and decision workers func newAggregatedWorker( service workflowserviceclient.Interface, domain string, @@ -1135,13 +1136,14 @@ func newAggregatedWorker( workerParams := workerExecutionParameters{ TaskList: taskList, - ConcurrentPollRoutineSize: defaultConcurrentPollRoutineSize, ConcurrentActivityExecutionSize: wOptions.MaxConcurrentActivityExecutionSize, WorkerActivitiesPerSecond: wOptions.WorkerActivitiesPerSecond, + MaxConcurrentActivityPollers: wOptions.MaxConcurrentActivityTaskPollers, ConcurrentLocalActivityExecutionSize: wOptions.MaxConcurrentLocalActivityExecutionSize, WorkerLocalActivitiesPerSecond: wOptions.WorkerLocalActivitiesPerSecond, ConcurrentDecisionTaskExecutionSize: wOptions.MaxConcurrentDecisionTaskExecutionSize, WorkerDecisionTasksPerSecond: wOptions.WorkerDecisionTasksPerSecond, + MaxConcurrentDecisionPollers: wOptions.MaxConcurrentDecisionTaskPollers, Identity: wOptions.Identity, MetricsScope: wOptions.MetricsScope, Logger: wOptions.Logger, @@ -1253,7 +1255,8 @@ func processTestTags(wOptions *WorkerOptions, ep *workerExecutionParameters) { switch key { case workerOptionsConfigConcurrentPollRoutineSize: if size, err := strconv.Atoi(val); err == nil { - ep.ConcurrentPollRoutineSize = size + ep.MaxConcurrentActivityPollers = size + ep.MaxConcurrentDecisionPollers = size } } } @@ -1388,12 +1391,18 @@ func augmentWorkerOptions(options WorkerOptions) WorkerOptions { if options.WorkerActivitiesPerSecond == 0 { options.WorkerActivitiesPerSecond = defaultWorkerActivitiesPerSecond } + if options.MaxConcurrentActivityTaskPollers <= 0 { + options.MaxConcurrentActivityTaskPollers = defaultConcurrentPollRoutineSize + } if options.MaxConcurrentDecisionTaskExecutionSize == 0 { options.MaxConcurrentDecisionTaskExecutionSize = defaultMaxConcurrentTaskExecutionSize } if options.WorkerDecisionTasksPerSecond == 0 { options.WorkerDecisionTasksPerSecond = defaultWorkerTaskExecutionRate } + if options.MaxConcurrentDecisionTaskPollers <= 0 { + options.MaxConcurrentDecisionTaskPollers = defaultConcurrentPollRoutineSize + } if options.MaxConcurrentLocalActivityExecutionSize == 0 { options.MaxConcurrentLocalActivityExecutionSize = defaultMaxConcurrentLocalActivityExecutionSize } diff --git a/internal/internal_worker_interfaces_test.go b/internal/internal_worker_interfaces_test.go index 0d37e6941..88c86ca9e 100644 --- a/internal/internal_worker_interfaces_test.go +++ b/internal/internal_worker_interfaces_test.go @@ -174,10 +174,11 @@ func (s *InterfacesTestSuite) TestInterface() { domain := "testDomain" // Workflow execution parameters. workflowExecutionParameters := workerExecutionParameters{ - TaskList: "testTaskList", - ConcurrentPollRoutineSize: 4, - Logger: logger, - Tracer: opentracing.NoopTracer{}, + TaskList: "testTaskList", + MaxConcurrentActivityPollers: 4, + MaxConcurrentDecisionPollers: 4, + Logger: logger, + Tracer: opentracing.NoopTracer{}, } domainStatus := m.DomainStatusRegistered @@ -204,10 +205,11 @@ func (s *InterfacesTestSuite) TestInterface() { // Create activity execution parameters. activityExecutionParameters := workerExecutionParameters{ - TaskList: "testTaskList", - ConcurrentPollRoutineSize: 10, - Logger: logger, - Tracer: opentracing.NoopTracer{}, + TaskList: "testTaskList", + MaxConcurrentActivityPollers: 10, + MaxConcurrentDecisionPollers: 10, + Logger: logger, + Tracer: opentracing.NoopTracer{}, } // Register activity instances and launch the worker. diff --git a/internal/internal_worker_test.go b/internal/internal_worker_test.go index 75ab39955..8d0482dec 100644 --- a/internal/internal_worker_test.go +++ b/internal/internal_worker_test.go @@ -31,9 +31,11 @@ import ( "time" "github.com/golang/mock/gomock" + "github.com/opentracing/opentracing-go" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "github.com/uber-go/tally" "go.uber.org/cadence/.gen/go/cadence/workflowservicetest" "go.uber.org/cadence/.gen/go/shared" "go.uber.org/cadence/internal/common" @@ -1133,6 +1135,129 @@ func TestActivityNilArgs_WithDataConverter(t *testing.T) { require.Error(t, err) // testDataConverter cannot encode nil value } +func TestWorkerOptionDefaults(t *testing.T) { + domain := "worker-options-test" + taskList := "worker-options-tl" + worker := newAggregatedWorker(nil, domain, taskList, WorkerOptions{}) + aggWorker, ok := worker.(*aggregatedWorker) + require.True(t, ok) + + decisionWorker, ok := aggWorker.workflowWorker.(*workflowWorker) + require.True(t, ok) + require.True(t, decisionWorker.executionParameters.Identity != "") + require.NotNil(t, decisionWorker.executionParameters.Logger) + require.NotNil(t, decisionWorker.executionParameters.MetricsScope) + require.Nil(t, decisionWorker.executionParameters.ContextPropagators) + + expected := workerExecutionParameters{ + TaskList: taskList, + MaxConcurrentActivityPollers: defaultConcurrentPollRoutineSize, + MaxConcurrentDecisionPollers: defaultConcurrentPollRoutineSize, + ConcurrentLocalActivityExecutionSize: defaultMaxConcurrentLocalActivityExecutionSize, + ConcurrentActivityExecutionSize: defaultMaxConcurrentActivityExecutionSize, + ConcurrentDecisionTaskExecutionSize: defaultMaxConcurrentTaskExecutionSize, + WorkerActivitiesPerSecond: defaultTaskListActivitiesPerSecond, + WorkerDecisionTasksPerSecond: defaultWorkerTaskExecutionRate, + TaskListActivitiesPerSecond: defaultTaskListActivitiesPerSecond, + WorkerLocalActivitiesPerSecond: defaultWorkerLocalActivitiesPerSecond, + StickyScheduleToStartTimeout: stickyDecisionScheduleToStartTimeoutSeconds * time.Second, + DataConverter: getDefaultDataConverter(), + Tracer: opentracing.NoopTracer{}, + Logger: decisionWorker.executionParameters.Logger, + MetricsScope: decisionWorker.executionParameters.MetricsScope, + Identity: decisionWorker.executionParameters.Identity, + UserContext: decisionWorker.executionParameters.UserContext, + } + + assertWorkerExecutionParamsEqual(t, expected, decisionWorker.executionParameters) + + activityWorker, ok := aggWorker.activityWorker.(*activityWorker) + require.True(t, ok) + require.True(t, activityWorker.executionParameters.Identity != "") + require.NotNil(t, activityWorker.executionParameters.Logger) + require.NotNil(t, activityWorker.executionParameters.MetricsScope) + require.Nil(t, activityWorker.executionParameters.ContextPropagators) + assertWorkerExecutionParamsEqual(t, expected, activityWorker.executionParameters) +} + +func TestWorkerOptionNonDefaults(t *testing.T) { + domain := "worker-options-test" + taskList := "worker-options-tl" + + options := WorkerOptions{ + Identity: "143@worker-options-test-1", + TaskListActivitiesPerSecond: 8888, + MaxConcurrentSessionExecutionSize: 3333, + MaxConcurrentDecisionTaskExecutionSize: 2222, + MaxConcurrentActivityExecutionSize: 1111, + MaxConcurrentLocalActivityExecutionSize: 101, + MaxConcurrentDecisionTaskPollers: 11, + MaxConcurrentActivityTaskPollers: 12, + WorkerLocalActivitiesPerSecond: 222, + WorkerDecisionTasksPerSecond: 111, + WorkerActivitiesPerSecond: 99, + StickyScheduleToStartTimeout: 555 * time.Minute, + DataConverter: &defaultDataConverter{}, + BackgroundActivityContext: context.Background(), + Logger: zap.NewNop(), + MetricsScope: tally.NoopScope, + Tracer: opentracing.NoopTracer{}, + } + + worker := newAggregatedWorker(nil, domain, taskList, options) + aggWorker, ok := worker.(*aggregatedWorker) + require.True(t, ok) + + decisionWorker, ok := aggWorker.workflowWorker.(*workflowWorker) + require.True(t, len(decisionWorker.executionParameters.ContextPropagators) > 0) + require.True(t, ok) + + expected := workerExecutionParameters{ + TaskList: taskList, + MaxConcurrentActivityPollers: options.MaxConcurrentActivityTaskPollers, + MaxConcurrentDecisionPollers: options.MaxConcurrentDecisionTaskPollers, + ConcurrentLocalActivityExecutionSize: options.MaxConcurrentLocalActivityExecutionSize, + ConcurrentActivityExecutionSize: options.MaxConcurrentActivityExecutionSize, + ConcurrentDecisionTaskExecutionSize: options.MaxConcurrentDecisionTaskExecutionSize, + WorkerActivitiesPerSecond: options.WorkerActivitiesPerSecond, + WorkerDecisionTasksPerSecond: options.WorkerDecisionTasksPerSecond, + TaskListActivitiesPerSecond: options.TaskListActivitiesPerSecond, + WorkerLocalActivitiesPerSecond: options.WorkerLocalActivitiesPerSecond, + StickyScheduleToStartTimeout: options.StickyScheduleToStartTimeout, + DataConverter: options.DataConverter, + Tracer: options.Tracer, + Logger: options.Logger, + MetricsScope: options.MetricsScope, + Identity: options.Identity, + } + + assertWorkerExecutionParamsEqual(t, expected, decisionWorker.executionParameters) + + activityWorker, ok := aggWorker.activityWorker.(*activityWorker) + require.True(t, ok) + require.True(t, len(activityWorker.executionParameters.ContextPropagators) > 0) + assertWorkerExecutionParamsEqual(t, expected, activityWorker.executionParameters) +} + +func assertWorkerExecutionParamsEqual(t *testing.T, paramsA workerExecutionParameters, paramsB workerExecutionParameters) { + require.Equal(t, paramsA.TaskList, paramsA.TaskList) + require.Equal(t, paramsA.Identity, paramsB.Identity) + require.Equal(t, paramsA.DataConverter, paramsB.DataConverter) + require.Equal(t, paramsA.Tracer, paramsB.Tracer) + require.Equal(t, paramsA.ConcurrentLocalActivityExecutionSize, paramsB.ConcurrentLocalActivityExecutionSize) + require.Equal(t, paramsA.ConcurrentActivityExecutionSize, paramsB.ConcurrentActivityExecutionSize) + require.Equal(t, paramsA.ConcurrentDecisionTaskExecutionSize, paramsB.ConcurrentDecisionTaskExecutionSize) + require.Equal(t, paramsA.WorkerActivitiesPerSecond, paramsB.WorkerActivitiesPerSecond) + require.Equal(t, paramsA.WorkerDecisionTasksPerSecond, paramsB.WorkerDecisionTasksPerSecond) + require.Equal(t, paramsA.TaskListActivitiesPerSecond, paramsB.TaskListActivitiesPerSecond) + require.Equal(t, paramsA.StickyScheduleToStartTimeout, paramsB.StickyScheduleToStartTimeout) + require.Equal(t, paramsA.MaxConcurrentDecisionPollers, paramsB.MaxConcurrentDecisionPollers) + require.Equal(t, paramsA.MaxConcurrentActivityPollers, paramsB.MaxConcurrentActivityPollers) + require.Equal(t, paramsA.NonDeterministicWorkflowPolicy, paramsB.NonDeterministicWorkflowPolicy) + require.Equal(t, paramsA.EnableLoggingInReplay, paramsB.EnableLoggingInReplay) + require.Equal(t, paramsA.DisableStickyExecution, paramsB.DisableStickyExecution) +} + /* var testWorkflowID1 = s.WorkflowExecution{WorkflowId: common.StringPtr("testWID"), RunId: common.StringPtr("runID")} var testWorkflowID2 = s.WorkflowExecution{WorkflowId: common.StringPtr("testWID2"), RunId: common.StringPtr("runID2")} diff --git a/internal/internal_workers_test.go b/internal/internal_workers_test.go index db7104685..ac0676dee 100644 --- a/internal/internal_workers_test.go +++ b/internal/internal_workers_test.go @@ -94,11 +94,11 @@ func (s *WorkersTestSuite) TestWorkflowWorker() { ctx, cancel := context.WithCancel(context.Background()) executionParameters := workerExecutionParameters{ - TaskList: "testTaskList", - ConcurrentPollRoutineSize: 5, - Logger: logger, - UserContext: ctx, - UserContextCancel: cancel, + TaskList: "testTaskList", + MaxConcurrentDecisionPollers: 5, + Logger: logger, + UserContext: ctx, + UserContextCancel: cancel, } overrides := &workerOverrides{workflowTaskHandler: newSampleWorkflowTaskHandler()} workflowWorker := newWorkflowWorkerInternal( @@ -119,9 +119,9 @@ func (s *WorkersTestSuite) TestActivityWorker() { s.service.EXPECT().RespondActivityTaskCompleted(gomock.Any(), gomock.Any(), callOptions...).Return(nil).AnyTimes() executionParameters := workerExecutionParameters{ - TaskList: "testTaskList", - ConcurrentPollRoutineSize: 5, - Logger: logger, + TaskList: "testTaskList", + MaxConcurrentActivityPollers: 5, + Logger: logger, } overrides := &workerOverrides{activityTaskHandler: newSampleActivityTaskHandler()} a := &greeterActivity{} @@ -163,7 +163,7 @@ func (s *WorkersTestSuite) TestActivityWorkerStop() { ctx, cancel := context.WithCancel(context.Background()) executionParameters := workerExecutionParameters{ TaskList: "testTaskList", - ConcurrentPollRoutineSize: 5, + MaxConcurrentActivityPollers: 5, ConcurrentActivityExecutionSize: 2, Logger: logger, UserContext: ctx, @@ -202,9 +202,9 @@ func (s *WorkersTestSuite) TestPollForDecisionTask_InternalServiceError() { s.service.EXPECT().PollForDecisionTask(gomock.Any(), gomock.Any(), callOptions...).Return(&m.PollForDecisionTaskResponse{}, &m.InternalServiceError{}).AnyTimes() executionParameters := workerExecutionParameters{ - TaskList: "testDecisionTaskList", - ConcurrentPollRoutineSize: 5, - Logger: zap.NewNop(), + TaskList: "testDecisionTaskList", + MaxConcurrentDecisionPollers: 5, + Logger: zap.NewNop(), } overrides := &workerOverrides{workflowTaskHandler: newSampleWorkflowTaskHandler()} workflowWorker := newWorkflowWorkerInternal( diff --git a/internal/worker.go b/internal/worker.go index 9415c6f55..6483afb4d 100644 --- a/internal/worker.go +++ b/internal/worker.go @@ -92,6 +92,12 @@ type ( // The zero value of this uses the default value. Default: 100k TaskListActivitiesPerSecond float64 + // optional: Sets the maximum number of goroutines that will concurrently poll the + // cadence-server to retrieve activity tasks. Changing this value will affect the + // rate at which the worker is able to consume tasks from a task list. + // Default value is 2 + MaxConcurrentActivityTaskPollers int + // Optional: To set the maximum concurrent decision task executions this worker can have. // The zero value of this uses the default value. // default: defaultMaxConcurrentTaskExecutionSize(1k) @@ -102,6 +108,12 @@ type ( // The zero value of this uses the default value. Default: 100k WorkerDecisionTasksPerSecond float64 + // optional: Sets the maximum number of goroutines that will concurrently poll the + // cadence-server to retrieve decision tasks. Changing this value will affect the + // rate at which the worker is able to consume tasks from a task list. + // Default value is 2 + MaxConcurrentDecisionTaskPollers int + // Optional: if the activities need auto heart beating for those activities // by the framework // default: false not to heartbeat. From 781a033008d54261a6f7c3b89df7e9e65b159d57 Mon Sep 17 00:00:00 2001 From: Venkat Date: Thu, 5 Mar 2020 07:31:02 -0800 Subject: [PATCH 2/5] Add missing info population in the activity info. (#936) (#937) --- internal/internal_task_pollers.go | 6 ++++++ internal/internal_workflow_testsuite.go | 8 ++++---- internal/internal_workflow_testsuite_test.go | 5 ++++- internal/workflow_testsuite.go | 2 +- test/workflow_test.go | 6 ++++++ 5 files changed, 21 insertions(+), 6 deletions(-) diff --git a/internal/internal_task_pollers.go b/internal/internal_task_pollers.go index 17e83a417..123840ba9 100644 --- a/internal/internal_task_pollers.go +++ b/internal/internal_task_pollers.go @@ -464,7 +464,12 @@ func (lath *localActivityTaskHandler) executeLocalActivityTask(task *localActivi rootCtx = context.Background() } + workflowTypeLocal := task.params.WorkflowInfo.WorkflowType + ctx := context.WithValue(rootCtx, activityEnvContextKey, &activityEnvironment{ + workflowType: &workflowTypeLocal, + workflowDomain: task.params.WorkflowInfo.Domain, + taskList: task.params.WorkflowInfo.TaskListName, activityType: ActivityType{Name: activityType}, activityID: fmt.Sprintf("%v", task.activityID), workflowExecution: task.params.WorkflowInfo.WorkflowExecution, @@ -505,6 +510,7 @@ func (lath *localActivityTaskHandler) executeLocalActivityTask(task *localActivi // this is attempt and expire time is before SCHEDULE_TO_CLOSE timeout deadline = task.expireTime } + ctx, cancel := context.WithDeadline(ctx, deadline) task.Lock() if task.canceled { diff --git a/internal/internal_workflow_testsuite.go b/internal/internal_workflow_testsuite.go index b35f14073..b88587951 100644 --- a/internal/internal_workflow_testsuite.go +++ b/internal/internal_workflow_testsuite.go @@ -537,7 +537,7 @@ func (env *testWorkflowEnvironmentImpl) executeActivity( func (env *testWorkflowEnvironmentImpl) executeLocalActivity( activityFn interface{}, args ...interface{}, -) (Value, error) { +) (val Value, result *localActivityResult, err error) { params := executeLocalActivityParams{ localActivityOptions: localActivityOptions{ ScheduleToCloseTimeoutSeconds: common.Int32Ceil(env.testTimeout.Seconds()), @@ -559,11 +559,11 @@ func (env *testWorkflowEnvironmentImpl) executeLocalActivity( tracer: opentracing.NoopTracer{}, } - result := taskHandler.executeLocalActivityTask(task) + result = taskHandler.executeLocalActivityTask(task) if result.err != nil { - return nil, result.err + return nil, nil, result.err } - return newEncodedValue(result.result, env.GetDataConverter()), nil + return newEncodedValue(result.result, env.GetDataConverter()), result, nil } func (env *testWorkflowEnvironmentImpl) startDecisionTask() { diff --git a/internal/internal_workflow_testsuite_test.go b/internal/internal_workflow_testsuite_test.go index f51f07279..04bf27861 100644 --- a/internal/internal_workflow_testsuite_test.go +++ b/internal/internal_workflow_testsuite_test.go @@ -1616,8 +1616,11 @@ func (s *WorkflowTestSuiteUnitTest) Test_LocalActivity() { } env := s.NewTestActivityEnvironment() - result, err := env.ExecuteLocalActivity(localActivityFn, "local_activity") + result, localActivity, err := env.ExecuteLocalActivity(localActivityFn, "local_activity") s.NoError(err) + s.Equal(WorkflowType{Name: workflowTypeNotSpecified}, localActivity.task.params.WorkflowInfo.WorkflowType) + s.Equal(defaultTestDomain, localActivity.task.params.WorkflowInfo.Domain) + s.Equal(defaultTestTaskList, localActivity.task.params.WorkflowInfo.TaskListName) var laResult string err = result.Get(&laResult) s.NoError(err) diff --git a/internal/workflow_testsuite.go b/internal/workflow_testsuite.go index 82bad96cd..ca4fe1535 100644 --- a/internal/workflow_testsuite.go +++ b/internal/workflow_testsuite.go @@ -159,7 +159,7 @@ func (t *TestActivityEnvironment) ExecuteActivity(activityFn interface{}, args . // ExecuteLocalActivity executes a local activity. The tested activity will be executed synchronously in the calling goroutinue. // Caller should use Value.Get() to extract strong typed result value. -func (t *TestActivityEnvironment) ExecuteLocalActivity(activityFn interface{}, args ...interface{}) (Value, error) { +func (t *TestActivityEnvironment) ExecuteLocalActivity(activityFn interface{}, args ...interface{}) (val Value, result *localActivityResult, err error) { return t.impl.executeLocalActivity(activityFn, args...) } diff --git a/test/workflow_test.go b/test/workflow_test.go index 26d8977e3..2e5a263d7 100644 --- a/test/workflow_test.go +++ b/test/workflow_test.go @@ -427,6 +427,12 @@ func (w *Workflows) ConsistentQueryWorkflow(ctx workflow.Context, delay time.Dur laCtx := workflow.WithLocalActivityOptions(ctx, workflow.LocalActivityOptions{ ScheduleToCloseTimeout: 5 * time.Second, }) + + workflowInfo := internal.GetWorkflowInfo(laCtx) + if &workflowInfo.WorkflowType == nil { + return errors.New("failed to get work flow type") + } + workflow.ExecuteLocalActivity(laCtx, LocalSleep, delay).Get(laCtx, nil) queryResult = signalData return nil From f1b2fc7fcd05da9f613d71b511121975b6efedab Mon Sep 17 00:00:00 2001 From: Venkat Date: Fri, 6 Mar 2020 16:10:49 -0800 Subject: [PATCH 3/5] #936 followup: remove breaking test framework change, add integration test (#948) --- internal/internal_workflow_testsuite.go | 8 +++--- internal/internal_workflow_testsuite_test.go | 5 +--- internal/workflow_testsuite.go | 2 +- test/activity_test.go | 17 ++++++++++++ test/integration_test.go | 10 +++++++ test/workflow_test.go | 29 ++++++++++++++++++++ 6 files changed, 62 insertions(+), 9 deletions(-) diff --git a/internal/internal_workflow_testsuite.go b/internal/internal_workflow_testsuite.go index b88587951..ff01826c3 100644 --- a/internal/internal_workflow_testsuite.go +++ b/internal/internal_workflow_testsuite.go @@ -537,7 +537,7 @@ func (env *testWorkflowEnvironmentImpl) executeActivity( func (env *testWorkflowEnvironmentImpl) executeLocalActivity( activityFn interface{}, args ...interface{}, -) (val Value, result *localActivityResult, err error) { +) (val Value, err error) { params := executeLocalActivityParams{ localActivityOptions: localActivityOptions{ ScheduleToCloseTimeoutSeconds: common.Int32Ceil(env.testTimeout.Seconds()), @@ -559,11 +559,11 @@ func (env *testWorkflowEnvironmentImpl) executeLocalActivity( tracer: opentracing.NoopTracer{}, } - result = taskHandler.executeLocalActivityTask(task) + result := taskHandler.executeLocalActivityTask(task) if result.err != nil { - return nil, nil, result.err + return nil, result.err } - return newEncodedValue(result.result, env.GetDataConverter()), result, nil + return newEncodedValue(result.result, env.GetDataConverter()), nil } func (env *testWorkflowEnvironmentImpl) startDecisionTask() { diff --git a/internal/internal_workflow_testsuite_test.go b/internal/internal_workflow_testsuite_test.go index 04bf27861..f51f07279 100644 --- a/internal/internal_workflow_testsuite_test.go +++ b/internal/internal_workflow_testsuite_test.go @@ -1616,11 +1616,8 @@ func (s *WorkflowTestSuiteUnitTest) Test_LocalActivity() { } env := s.NewTestActivityEnvironment() - result, localActivity, err := env.ExecuteLocalActivity(localActivityFn, "local_activity") + result, err := env.ExecuteLocalActivity(localActivityFn, "local_activity") s.NoError(err) - s.Equal(WorkflowType{Name: workflowTypeNotSpecified}, localActivity.task.params.WorkflowInfo.WorkflowType) - s.Equal(defaultTestDomain, localActivity.task.params.WorkflowInfo.Domain) - s.Equal(defaultTestTaskList, localActivity.task.params.WorkflowInfo.TaskListName) var laResult string err = result.Get(&laResult) s.NoError(err) diff --git a/internal/workflow_testsuite.go b/internal/workflow_testsuite.go index ca4fe1535..d6e6e8159 100644 --- a/internal/workflow_testsuite.go +++ b/internal/workflow_testsuite.go @@ -159,7 +159,7 @@ func (t *TestActivityEnvironment) ExecuteActivity(activityFn interface{}, args . // ExecuteLocalActivity executes a local activity. The tested activity will be executed synchronously in the calling goroutinue. // Caller should use Value.Get() to extract strong typed result value. -func (t *TestActivityEnvironment) ExecuteLocalActivity(activityFn interface{}, args ...interface{}) (val Value, result *localActivityResult, err error) { +func (t *TestActivityEnvironment) ExecuteLocalActivity(activityFn interface{}, args ...interface{}) (val Value, err error) { return t.impl.executeLocalActivity(activityFn, args...) } diff --git a/test/activity_test.go b/test/activity_test.go index 3eb1cf988..61da8f0ab 100644 --- a/test/activity_test.go +++ b/test/activity_test.go @@ -22,6 +22,7 @@ package test import ( "context" + "fmt" "strings" "sync" "time" @@ -88,6 +89,21 @@ func (a *Activities) Fail(ctx context.Context) error { return errFailOnPurpose } +func (a *Activities) InspectActivityInfo(ctx context.Context, domain, taskList, wfType string) error { + a.append("inspectActivityInfo") + info := activity.GetInfo(ctx) + if info.WorkflowDomain != domain { + return fmt.Errorf("expected domainName %v but got %v", domain, info.WorkflowDomain) + } + if info.WorkflowType == nil || info.WorkflowType.Name != wfType { + return fmt.Errorf("expected workflowType %v but got %v", wfType, info.WorkflowType) + } + if info.TaskList != taskList { + return fmt.Errorf("expected taskList %v but got %v", taskList, info.TaskList) + } + return nil +} + func (a *Activities) append(name string) { a.Lock() defer a.Unlock() @@ -118,4 +134,5 @@ func (a *Activities) register() { activity.RegisterWithOptions(a.HeartbeatAndSleep, activity.RegisterOptions{Name: "heartbeatAndSleep"}) activity.RegisterWithOptions(a.GetMemoAndSearchAttr, activity.RegisterOptions{Name: "getMemoAndSearchAttr"}) activity.RegisterWithOptions(a.RetryTimeoutStableErrorActivity, activity.RegisterOptions{Name: "retryTimeoutStableErrorActivity"}) + activity.RegisterWithOptions(a.InspectActivityInfo, activity.RegisterOptions{Name: "inspectActivityInfo"}) } diff --git a/test/integration_test.go b/test/integration_test.go index b0a8301d8..ce1f99c6a 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -401,6 +401,16 @@ func (ts *IntegrationTestSuite) TestLargeQueryResultError() { ts.Nil(value) } +func (ts *IntegrationTestSuite) TestInspectActivityInfo() { + err := ts.executeWorkflow("test-activity-info", ts.workflows.InspectActivityInfo, nil) + ts.Nil(err) +} + +func (ts *IntegrationTestSuite) TestInspectLocalActivityInfo() { + err := ts.executeWorkflow("test-local-activity-info", ts.workflows.InspectLocalActivityInfo, nil) + ts.Nil(err) +} + func (ts *IntegrationTestSuite) registerDomain() { client := client.NewDomainClient(ts.rpcClient.Interface, &client.Options{}) ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout) diff --git a/test/workflow_test.go b/test/workflow_test.go index 2e5a263d7..10aad0d24 100644 --- a/test/workflow_test.go +++ b/test/workflow_test.go @@ -494,6 +494,26 @@ func (w *Workflows) sleep(ctx workflow.Context, d time.Duration) error { return workflow.ExecuteActivity(ctx, "sleep", d).Get(ctx, nil) } +func (w *Workflows) InspectActivityInfo(ctx workflow.Context) error { + info := workflow.GetInfo(ctx) + domain := info.Domain + wfType := info.WorkflowType.Name + taskList := info.TaskListName + ctx = workflow.WithActivityOptions(ctx, w.defaultActivityOptions()) + return workflow.ExecuteActivity(ctx, "inspectActivityInfo", domain, taskList, wfType).Get(ctx, nil) +} + +func (w *Workflows) InspectLocalActivityInfo(ctx workflow.Context) error { + info := workflow.GetInfo(ctx) + domain := info.Domain + wfType := info.WorkflowType.Name + taskList := info.TaskListName + ctx = workflow.WithLocalActivityOptions(ctx, w.defaultLocalActivityOptions()) + activites := Activities{} + return workflow.ExecuteLocalActivity( + ctx, activites.InspectActivityInfo, domain, taskList, wfType).Get(ctx, nil) +} + func (w *Workflows) register() { workflow.Register(w.Basic) workflow.Register(w.ActivityRetryOnError) @@ -508,6 +528,8 @@ func (w *Workflows) register() { workflow.Register(w.ChildWorkflowSuccess) workflow.Register(w.ChildWorkflowSuccessWithParentClosePolicyTerminate) workflow.Register(w.ChildWorkflowSuccessWithParentClosePolicyAbandon) + workflow.Register(w.InspectActivityInfo) + workflow.Register(w.InspectLocalActivityInfo) workflow.Register(w.sleep) workflow.Register(w.child) workflow.Register(w.childForMemoAndSearchAttr) @@ -525,6 +547,13 @@ func (w *Workflows) defaultActivityOptions() workflow.ActivityOptions { StartToCloseTimeout: 9 * time.Second, } } + +func (w *Workflows) defaultLocalActivityOptions() workflow.LocalActivityOptions { + return workflow.LocalActivityOptions{ + ScheduleToCloseTimeout: 5 * time.Second, + } +} + func (w *Workflows) defaultActivityOptionsWithRetry() workflow.ActivityOptions { return workflow.ActivityOptions{ ScheduleToStartTimeout: 5 * time.Second, From 5d824c9068fd27a1fa106d0a4cef89a165281fc2 Mon Sep 17 00:00:00 2001 From: Benjamin Boudreau Date: Mon, 9 Mar 2020 19:43:13 -0400 Subject: [PATCH 4/5] Adding IsWorkflowExecutionAlreadyStartedError helper (#946) Co-authored-by: Liang Mei --- error.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/error.go b/error.go index ee73fb4f8..cd67e9488 100644 --- a/error.go +++ b/error.go @@ -21,6 +21,7 @@ package cadence import ( + "go.uber.org/cadence/.gen/go/shared" "go.uber.org/cadence/internal" "go.uber.org/cadence/workflow" ) @@ -54,6 +55,12 @@ func IsCustomError(err error) bool { return ok } +// IsWorkflowExecutionAlreadyStartedError return if the err is a WorkflowExecutionAlreadyStartedError +func IsWorkflowExecutionAlreadyStartedError(err error) bool { + _, ok := err.(*shared.WorkflowExecutionAlreadyStartedError) + return ok +} + // IsCanceledError return if the err is a CanceledError func IsCanceledError(err error) bool { _, ok := err.(*CanceledError) From 2055637e4926d768a89d0d2973a2dccebb7d1e19 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jes=C3=BAs=20Garc=C3=ADa=20Crespo?= Date: Wed, 11 Mar 2020 10:57:45 -0700 Subject: [PATCH 5/5] Delete unused worker option AutoHeartBeat (#941) Co-authored-by: Liang Mei --- internal/worker.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/internal/worker.go b/internal/worker.go index 6483afb4d..d15a8e775 100644 --- a/internal/worker.go +++ b/internal/worker.go @@ -114,11 +114,6 @@ type ( // Default value is 2 MaxConcurrentDecisionTaskPollers int - // Optional: if the activities need auto heart beating for those activities - // by the framework - // default: false not to heartbeat. - AutoHeartBeat bool - // Optional: Sets an identify that can be used to track this host for debugging. // default: default identity that include hostname, groupName and process ID. Identity string