Skip to content

Commit

Permalink
Merge branch 'master' into logfix
Browse files Browse the repository at this point in the history
  • Loading branch information
vancexu committed Mar 11, 2020
2 parents e6ac6b3 + 2055637 commit 0c34abf
Show file tree
Hide file tree
Showing 12 changed files with 255 additions and 37 deletions.
7 changes: 7 additions & 0 deletions error.go
Expand Up @@ -21,6 +21,7 @@
package cadence

import (
"go.uber.org/cadence/.gen/go/shared"
"go.uber.org/cadence/internal"
"go.uber.org/cadence/workflow"
)
Expand Down Expand Up @@ -54,6 +55,12 @@ func IsCustomError(err error) bool {
return ok
}

// IsWorkflowExecutionAlreadyStartedError return if the err is a WorkflowExecutionAlreadyStartedError
func IsWorkflowExecutionAlreadyStartedError(err error) bool {
_, ok := err.(*shared.WorkflowExecutionAlreadyStartedError)
return ok
}

// IsCanceledError return if the err is a CanceledError
func IsCanceledError(err error) bool {
_, ok := err.(*CanceledError)
Expand Down
6 changes: 6 additions & 0 deletions internal/internal_task_pollers.go
Expand Up @@ -464,7 +464,12 @@ func (lath *localActivityTaskHandler) executeLocalActivityTask(task *localActivi
rootCtx = context.Background()
}

workflowTypeLocal := task.params.WorkflowInfo.WorkflowType

ctx := context.WithValue(rootCtx, activityEnvContextKey, &activityEnvironment{
workflowType: &workflowTypeLocal,
workflowDomain: task.params.WorkflowInfo.Domain,
taskList: task.params.WorkflowInfo.TaskListName,
activityType: ActivityType{Name: activityType},
activityID: fmt.Sprintf("%v", task.activityID),
workflowExecution: task.params.WorkflowInfo.WorkflowExecution,
Expand Down Expand Up @@ -505,6 +510,7 @@ func (lath *localActivityTaskHandler) executeLocalActivityTask(task *localActivi
// this is attempt and expire time is before SCHEDULE_TO_CLOSE timeout
deadline = task.expireTime
}

ctx, cancel := context.WithDeadline(ctx, deadline)
task.Lock()
if task.canceled {
Expand Down
31 changes: 20 additions & 11 deletions internal/internal_worker.go
Expand Up @@ -124,21 +124,24 @@ type (
// Task list name to poll.
TaskList string

// Defines how many concurrent poll requests for the task list by this worker.
ConcurrentPollRoutineSize int

// Defines how many concurrent activity executions by this worker.
ConcurrentActivityExecutionSize int

// Defines rate limiting on number of activity tasks that can be executed per second per worker.
WorkerActivitiesPerSecond float64

// MaxConcurrentActivityPollers is the max number of pollers for activity task list
MaxConcurrentActivityPollers int

// Defines how many concurrent decision task executions by this worker.
ConcurrentDecisionTaskExecutionSize int

// Defines rate limiting on number of decision tasks that can be executed per second per worker.
WorkerDecisionTasksPerSecond float64

// MaxConcurrentDecisionPollers is the max number of pollers for decision task list
MaxConcurrentDecisionPollers int

// Defines how many concurrent local activity executions by this worker.
ConcurrentLocalActivityExecutionSize int

Expand Down Expand Up @@ -294,7 +297,7 @@ func newWorkflowTaskWorkerInternal(
params,
)
worker := newBaseWorker(baseWorkerOptions{
pollerCount: params.ConcurrentPollRoutineSize,
pollerCount: params.MaxConcurrentDecisionPollers,
pollerRate: defaultPollerRate,
maxConcurrentTask: params.ConcurrentDecisionTaskExecutionSize,
maxTaskPerSecond: params.WorkerDecisionTasksPerSecond,
Expand Down Expand Up @@ -395,7 +398,7 @@ func newSessionWorker(service workflowserviceclient.Interface,
params.TaskList = sessionEnvironment.GetResourceSpecificTasklist()
activityWorker := newActivityWorker(service, domain, params, overrides, env, nil)

params.ConcurrentPollRoutineSize = 1
params.MaxConcurrentActivityPollers = 1
params.TaskList = creationTasklist
creationWorker := newActivityWorker(service, domain, params, overrides, env, sessionEnvironment.GetTokenBucket())

Expand Down Expand Up @@ -473,7 +476,7 @@ func newActivityTaskWorker(

base := newBaseWorker(
baseWorkerOptions{
pollerCount: workerParams.ConcurrentPollRoutineSize,
pollerCount: workerParams.MaxConcurrentActivityPollers,
pollerRate: defaultPollerRate,
maxConcurrentTask: workerParams.ConcurrentActivityExecutionSize,
maxTaskPerSecond: workerParams.WorkerActivitiesPerSecond,
Expand Down Expand Up @@ -1117,9 +1120,7 @@ func (aw *aggregatedWorker) Stop() {
aw.logger.Info("Stopped Worker")
}

// aggregatedWorker returns an instance to manage the workers. Use defaultConcurrentPollRoutineSize (which is 2) as
// poller size. The typical RTT (round-trip time) is below 1ms within data center. And the poll API latency is about 5ms.
// With 2 poller, we could achieve around 300~400 RPS.
// aggregatedWorker returns an instance to manage both activity and decision workers
func newAggregatedWorker(
service workflowserviceclient.Interface,
domain string,
Expand All @@ -1135,13 +1136,14 @@ func newAggregatedWorker(

workerParams := workerExecutionParameters{
TaskList: taskList,
ConcurrentPollRoutineSize: defaultConcurrentPollRoutineSize,
ConcurrentActivityExecutionSize: wOptions.MaxConcurrentActivityExecutionSize,
WorkerActivitiesPerSecond: wOptions.WorkerActivitiesPerSecond,
MaxConcurrentActivityPollers: wOptions.MaxConcurrentActivityTaskPollers,
ConcurrentLocalActivityExecutionSize: wOptions.MaxConcurrentLocalActivityExecutionSize,
WorkerLocalActivitiesPerSecond: wOptions.WorkerLocalActivitiesPerSecond,
ConcurrentDecisionTaskExecutionSize: wOptions.MaxConcurrentDecisionTaskExecutionSize,
WorkerDecisionTasksPerSecond: wOptions.WorkerDecisionTasksPerSecond,
MaxConcurrentDecisionPollers: wOptions.MaxConcurrentDecisionTaskPollers,
Identity: wOptions.Identity,
MetricsScope: wOptions.MetricsScope,
Logger: wOptions.Logger,
Expand Down Expand Up @@ -1253,7 +1255,8 @@ func processTestTags(wOptions *WorkerOptions, ep *workerExecutionParameters) {
switch key {
case workerOptionsConfigConcurrentPollRoutineSize:
if size, err := strconv.Atoi(val); err == nil {
ep.ConcurrentPollRoutineSize = size
ep.MaxConcurrentActivityPollers = size
ep.MaxConcurrentDecisionPollers = size
}
}
}
Expand Down Expand Up @@ -1388,12 +1391,18 @@ func augmentWorkerOptions(options WorkerOptions) WorkerOptions {
if options.WorkerActivitiesPerSecond == 0 {
options.WorkerActivitiesPerSecond = defaultWorkerActivitiesPerSecond
}
if options.MaxConcurrentActivityTaskPollers <= 0 {
options.MaxConcurrentActivityTaskPollers = defaultConcurrentPollRoutineSize
}
if options.MaxConcurrentDecisionTaskExecutionSize == 0 {
options.MaxConcurrentDecisionTaskExecutionSize = defaultMaxConcurrentTaskExecutionSize
}
if options.WorkerDecisionTasksPerSecond == 0 {
options.WorkerDecisionTasksPerSecond = defaultWorkerTaskExecutionRate
}
if options.MaxConcurrentDecisionTaskPollers <= 0 {
options.MaxConcurrentDecisionTaskPollers = defaultConcurrentPollRoutineSize
}
if options.MaxConcurrentLocalActivityExecutionSize == 0 {
options.MaxConcurrentLocalActivityExecutionSize = defaultMaxConcurrentLocalActivityExecutionSize
}
Expand Down
18 changes: 10 additions & 8 deletions internal/internal_worker_interfaces_test.go
Expand Up @@ -174,10 +174,11 @@ func (s *InterfacesTestSuite) TestInterface() {
domain := "testDomain"
// Workflow execution parameters.
workflowExecutionParameters := workerExecutionParameters{
TaskList: "testTaskList",
ConcurrentPollRoutineSize: 4,
Logger: logger,
Tracer: opentracing.NoopTracer{},
TaskList: "testTaskList",
MaxConcurrentActivityPollers: 4,
MaxConcurrentDecisionPollers: 4,
Logger: logger,
Tracer: opentracing.NoopTracer{},
}

domainStatus := m.DomainStatusRegistered
Expand All @@ -204,10 +205,11 @@ func (s *InterfacesTestSuite) TestInterface() {

// Create activity execution parameters.
activityExecutionParameters := workerExecutionParameters{
TaskList: "testTaskList",
ConcurrentPollRoutineSize: 10,
Logger: logger,
Tracer: opentracing.NoopTracer{},
TaskList: "testTaskList",
MaxConcurrentActivityPollers: 10,
MaxConcurrentDecisionPollers: 10,
Logger: logger,
Tracer: opentracing.NoopTracer{},
}

// Register activity instances and launch the worker.
Expand Down
125 changes: 125 additions & 0 deletions internal/internal_worker_test.go
Expand Up @@ -31,9 +31,11 @@ import (
"time"

"github.com/golang/mock/gomock"
"github.com/opentracing/opentracing-go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/uber-go/tally"
"go.uber.org/cadence/.gen/go/cadence/workflowservicetest"
"go.uber.org/cadence/.gen/go/shared"
"go.uber.org/cadence/internal/common"
Expand Down Expand Up @@ -1133,6 +1135,129 @@ func TestActivityNilArgs_WithDataConverter(t *testing.T) {
require.Error(t, err) // testDataConverter cannot encode nil value
}

func TestWorkerOptionDefaults(t *testing.T) {
domain := "worker-options-test"
taskList := "worker-options-tl"
worker := newAggregatedWorker(nil, domain, taskList, WorkerOptions{})
aggWorker, ok := worker.(*aggregatedWorker)
require.True(t, ok)

decisionWorker, ok := aggWorker.workflowWorker.(*workflowWorker)
require.True(t, ok)
require.True(t, decisionWorker.executionParameters.Identity != "")
require.NotNil(t, decisionWorker.executionParameters.Logger)
require.NotNil(t, decisionWorker.executionParameters.MetricsScope)
require.Nil(t, decisionWorker.executionParameters.ContextPropagators)

expected := workerExecutionParameters{
TaskList: taskList,
MaxConcurrentActivityPollers: defaultConcurrentPollRoutineSize,
MaxConcurrentDecisionPollers: defaultConcurrentPollRoutineSize,
ConcurrentLocalActivityExecutionSize: defaultMaxConcurrentLocalActivityExecutionSize,
ConcurrentActivityExecutionSize: defaultMaxConcurrentActivityExecutionSize,
ConcurrentDecisionTaskExecutionSize: defaultMaxConcurrentTaskExecutionSize,
WorkerActivitiesPerSecond: defaultTaskListActivitiesPerSecond,
WorkerDecisionTasksPerSecond: defaultWorkerTaskExecutionRate,
TaskListActivitiesPerSecond: defaultTaskListActivitiesPerSecond,
WorkerLocalActivitiesPerSecond: defaultWorkerLocalActivitiesPerSecond,
StickyScheduleToStartTimeout: stickyDecisionScheduleToStartTimeoutSeconds * time.Second,
DataConverter: getDefaultDataConverter(),
Tracer: opentracing.NoopTracer{},
Logger: decisionWorker.executionParameters.Logger,
MetricsScope: decisionWorker.executionParameters.MetricsScope,
Identity: decisionWorker.executionParameters.Identity,
UserContext: decisionWorker.executionParameters.UserContext,
}

assertWorkerExecutionParamsEqual(t, expected, decisionWorker.executionParameters)

activityWorker, ok := aggWorker.activityWorker.(*activityWorker)
require.True(t, ok)
require.True(t, activityWorker.executionParameters.Identity != "")
require.NotNil(t, activityWorker.executionParameters.Logger)
require.NotNil(t, activityWorker.executionParameters.MetricsScope)
require.Nil(t, activityWorker.executionParameters.ContextPropagators)
assertWorkerExecutionParamsEqual(t, expected, activityWorker.executionParameters)
}

func TestWorkerOptionNonDefaults(t *testing.T) {
domain := "worker-options-test"
taskList := "worker-options-tl"

options := WorkerOptions{
Identity: "143@worker-options-test-1",
TaskListActivitiesPerSecond: 8888,
MaxConcurrentSessionExecutionSize: 3333,
MaxConcurrentDecisionTaskExecutionSize: 2222,
MaxConcurrentActivityExecutionSize: 1111,
MaxConcurrentLocalActivityExecutionSize: 101,
MaxConcurrentDecisionTaskPollers: 11,
MaxConcurrentActivityTaskPollers: 12,
WorkerLocalActivitiesPerSecond: 222,
WorkerDecisionTasksPerSecond: 111,
WorkerActivitiesPerSecond: 99,
StickyScheduleToStartTimeout: 555 * time.Minute,
DataConverter: &defaultDataConverter{},
BackgroundActivityContext: context.Background(),
Logger: zap.NewNop(),
MetricsScope: tally.NoopScope,
Tracer: opentracing.NoopTracer{},
}

worker := newAggregatedWorker(nil, domain, taskList, options)
aggWorker, ok := worker.(*aggregatedWorker)
require.True(t, ok)

decisionWorker, ok := aggWorker.workflowWorker.(*workflowWorker)
require.True(t, len(decisionWorker.executionParameters.ContextPropagators) > 0)
require.True(t, ok)

expected := workerExecutionParameters{
TaskList: taskList,
MaxConcurrentActivityPollers: options.MaxConcurrentActivityTaskPollers,
MaxConcurrentDecisionPollers: options.MaxConcurrentDecisionTaskPollers,
ConcurrentLocalActivityExecutionSize: options.MaxConcurrentLocalActivityExecutionSize,
ConcurrentActivityExecutionSize: options.MaxConcurrentActivityExecutionSize,
ConcurrentDecisionTaskExecutionSize: options.MaxConcurrentDecisionTaskExecutionSize,
WorkerActivitiesPerSecond: options.WorkerActivitiesPerSecond,
WorkerDecisionTasksPerSecond: options.WorkerDecisionTasksPerSecond,
TaskListActivitiesPerSecond: options.TaskListActivitiesPerSecond,
WorkerLocalActivitiesPerSecond: options.WorkerLocalActivitiesPerSecond,
StickyScheduleToStartTimeout: options.StickyScheduleToStartTimeout,
DataConverter: options.DataConverter,
Tracer: options.Tracer,
Logger: options.Logger,
MetricsScope: options.MetricsScope,
Identity: options.Identity,
}

assertWorkerExecutionParamsEqual(t, expected, decisionWorker.executionParameters)

activityWorker, ok := aggWorker.activityWorker.(*activityWorker)
require.True(t, ok)
require.True(t, len(activityWorker.executionParameters.ContextPropagators) > 0)
assertWorkerExecutionParamsEqual(t, expected, activityWorker.executionParameters)
}

func assertWorkerExecutionParamsEqual(t *testing.T, paramsA workerExecutionParameters, paramsB workerExecutionParameters) {
require.Equal(t, paramsA.TaskList, paramsA.TaskList)
require.Equal(t, paramsA.Identity, paramsB.Identity)
require.Equal(t, paramsA.DataConverter, paramsB.DataConverter)
require.Equal(t, paramsA.Tracer, paramsB.Tracer)
require.Equal(t, paramsA.ConcurrentLocalActivityExecutionSize, paramsB.ConcurrentLocalActivityExecutionSize)
require.Equal(t, paramsA.ConcurrentActivityExecutionSize, paramsB.ConcurrentActivityExecutionSize)
require.Equal(t, paramsA.ConcurrentDecisionTaskExecutionSize, paramsB.ConcurrentDecisionTaskExecutionSize)
require.Equal(t, paramsA.WorkerActivitiesPerSecond, paramsB.WorkerActivitiesPerSecond)
require.Equal(t, paramsA.WorkerDecisionTasksPerSecond, paramsB.WorkerDecisionTasksPerSecond)
require.Equal(t, paramsA.TaskListActivitiesPerSecond, paramsB.TaskListActivitiesPerSecond)
require.Equal(t, paramsA.StickyScheduleToStartTimeout, paramsB.StickyScheduleToStartTimeout)
require.Equal(t, paramsA.MaxConcurrentDecisionPollers, paramsB.MaxConcurrentDecisionPollers)
require.Equal(t, paramsA.MaxConcurrentActivityPollers, paramsB.MaxConcurrentActivityPollers)
require.Equal(t, paramsA.NonDeterministicWorkflowPolicy, paramsB.NonDeterministicWorkflowPolicy)
require.Equal(t, paramsA.EnableLoggingInReplay, paramsB.EnableLoggingInReplay)
require.Equal(t, paramsA.DisableStickyExecution, paramsB.DisableStickyExecution)
}

/*
var testWorkflowID1 = s.WorkflowExecution{WorkflowId: common.StringPtr("testWID"), RunId: common.StringPtr("runID")}
var testWorkflowID2 = s.WorkflowExecution{WorkflowId: common.StringPtr("testWID2"), RunId: common.StringPtr("runID2")}
Expand Down
24 changes: 12 additions & 12 deletions internal/internal_workers_test.go
Expand Up @@ -94,11 +94,11 @@ func (s *WorkersTestSuite) TestWorkflowWorker() {

ctx, cancel := context.WithCancel(context.Background())
executionParameters := workerExecutionParameters{
TaskList: "testTaskList",
ConcurrentPollRoutineSize: 5,
Logger: logger,
UserContext: ctx,
UserContextCancel: cancel,
TaskList: "testTaskList",
MaxConcurrentDecisionPollers: 5,
Logger: logger,
UserContext: ctx,
UserContextCancel: cancel,
}
overrides := &workerOverrides{workflowTaskHandler: newSampleWorkflowTaskHandler()}
workflowWorker := newWorkflowWorkerInternal(
Expand All @@ -119,9 +119,9 @@ func (s *WorkersTestSuite) TestActivityWorker() {
s.service.EXPECT().RespondActivityTaskCompleted(gomock.Any(), gomock.Any(), callOptions...).Return(nil).AnyTimes()

executionParameters := workerExecutionParameters{
TaskList: "testTaskList",
ConcurrentPollRoutineSize: 5,
Logger: logger,
TaskList: "testTaskList",
MaxConcurrentActivityPollers: 5,
Logger: logger,
}
overrides := &workerOverrides{activityTaskHandler: newSampleActivityTaskHandler()}
a := &greeterActivity{}
Expand Down Expand Up @@ -163,7 +163,7 @@ func (s *WorkersTestSuite) TestActivityWorkerStop() {
ctx, cancel := context.WithCancel(context.Background())
executionParameters := workerExecutionParameters{
TaskList: "testTaskList",
ConcurrentPollRoutineSize: 5,
MaxConcurrentActivityPollers: 5,
ConcurrentActivityExecutionSize: 2,
Logger: logger,
UserContext: ctx,
Expand Down Expand Up @@ -202,9 +202,9 @@ func (s *WorkersTestSuite) TestPollForDecisionTask_InternalServiceError() {
s.service.EXPECT().PollForDecisionTask(gomock.Any(), gomock.Any(), callOptions...).Return(&m.PollForDecisionTaskResponse{}, &m.InternalServiceError{}).AnyTimes()

executionParameters := workerExecutionParameters{
TaskList: "testDecisionTaskList",
ConcurrentPollRoutineSize: 5,
Logger: zap.NewNop(),
TaskList: "testDecisionTaskList",
MaxConcurrentDecisionPollers: 5,
Logger: zap.NewNop(),
}
overrides := &workerOverrides{workflowTaskHandler: newSampleWorkflowTaskHandler()}
workflowWorker := newWorkflowWorkerInternal(
Expand Down
2 changes: 1 addition & 1 deletion internal/internal_workflow_testsuite.go
Expand Up @@ -537,7 +537,7 @@ func (env *testWorkflowEnvironmentImpl) executeActivity(
func (env *testWorkflowEnvironmentImpl) executeLocalActivity(
activityFn interface{},
args ...interface{},
) (Value, error) {
) (val Value, err error) {
params := executeLocalActivityParams{
localActivityOptions: localActivityOptions{
ScheduleToCloseTimeoutSeconds: common.Int32Ceil(env.testTimeout.Seconds()),
Expand Down

0 comments on commit 0c34abf

Please sign in to comment.