Skip to content

Commit

Permalink
Rename task list to task queue (#480)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexshtin committed Jun 27, 2020
1 parent 7da2406 commit c16eef6
Show file tree
Hide file tree
Showing 217 changed files with 4,606 additions and 4,608 deletions.
4 changes: 2 additions & 2 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@
"args": [
"workflow",
"run",
"--tasklist",
"TestTaskList",
"--taskqueue",
"TestTaskQueue",
"--workflow_type",
"TestWorkflow_test",
"--execution_timeout",
Expand Down
8 changes: 4 additions & 4 deletions canary/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ func registerBatch(r registrar) {
const (
// TODO: to get rid of them:
// after batch job has an API, we should use the API: https://github.com/uber/cadence/issues/2225
sysBatchWFTypeName = "temporal-sys-batch-workflow"
systemBatcherTaskListName = "temporal-sys-batcher-tasklist"
sysBatchWFTypeName = "temporal-sys-batch-workflow"
systemBatcherTaskQueueName = "temporal-sys-batcher-taskqueue"

// there are two level, so totally 5*5 + 5 == 30 descendants
// default batch RPS is 50, so it will takes ~1 seconds to terminate all
Expand Down Expand Up @@ -104,7 +104,7 @@ func batchWorkflow(ctx workflow.Context, scheduledTimeNanos int64, namespace str
MaximumAttempts: 4,
}
ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
TaskList: taskListName,
TaskQueue: taskQueueName,
ScheduleToStartTimeout: scheduleToStartTimeout,
StartToCloseTimeout: activityTaskTimeout,
RetryPolicy: retryPolicy,
Expand Down Expand Up @@ -179,7 +179,7 @@ func startBatchWorkflow(ctx context.Context, namespace, startTime string) error
options := client.StartWorkflowOptions{
WorkflowExecutionTimeout: childWorkflowTimeout,
WorkflowTaskTimeout: decisionTaskTimeout,
TaskList: systemBatcherTaskListName,
TaskQueue: systemBatcherTaskQueueName,
SearchAttributes: map[string]interface{}{
"CustomNamespace": namespace,
"Operator": "admin",
Expand Down
4 changes: 2 additions & 2 deletions canary/canary.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,15 +124,15 @@ func (c *canaryImpl) startWorker() error {
MaxConcurrentActivityExecutionSize: activityWorkerMaxExecutors,
}

archivalWorker := worker.New(c.archivalClient.Client, archivalTaskListName, options)
archivalWorker := worker.New(c.archivalClient.Client, archivalTaskQueueName, options)
registerHistoryArchival(archivalWorker)

defer archivalWorker.Stop()
if err := archivalWorker.Start(); err != nil {
return err
}

canaryWorker := worker.New(c.canaryClient.Client, taskListName, options)
canaryWorker := worker.New(c.canaryClient.Client, taskQueueName, options)
registerBatch(canaryWorker)
registerCancellation(canaryWorker)
registerConcurrentExec(canaryWorker)
Expand Down
6 changes: 3 additions & 3 deletions canary/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func newCadenceClient(namespace string, runtime *RuntimeContext) (cadenceClient,
func newWorkflowOptions(id string, executionTimeout time.Duration) client.StartWorkflowOptions {
return client.StartWorkflowOptions{
ID: id,
TaskList: taskListName,
TaskQueue: taskQueueName,
WorkflowRunTimeout: executionTimeout,
WorkflowTaskTimeout: decisionTaskTimeout,
WorkflowIDReusePolicy: client.WorkflowIDReusePolicyAllowDuplicate,
Expand All @@ -127,7 +127,7 @@ func newWorkflowOptions(id string, executionTimeout time.Duration) client.StartW
// newActivityOptions builds and returns activityOptions with reasonable defaults
func newActivityOptions() workflow.ActivityOptions {
return workflow.ActivityOptions{
TaskList: taskListName,
TaskQueue: taskQueueName,
StartToCloseTimeout: activityTaskTimeout,
ScheduleToStartTimeout: scheduleToStartTimeout,
ScheduleToCloseTimeout: scheduleToStartTimeout + activityTaskTimeout,
Expand All @@ -139,7 +139,7 @@ func newChildWorkflowOptions(namespace string, wfID string) workflow.ChildWorkfl
return workflow.ChildWorkflowOptions{
Namespace: namespace,
WorkflowID: wfID,
TaskList: taskListName,
TaskQueue: taskQueueName,
WorkflowRunTimeout: childWorkflowTimeout,
WorkflowTaskTimeout: decisionTaskTimeout,
WorkflowIDReusePolicy: client.WorkflowIDReusePolicyAllowDuplicate,
Expand Down
4 changes: 2 additions & 2 deletions canary/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,13 @@ const (
decisionTaskTimeout = 10 * time.Second
activityTaskTimeout = 3 * time.Minute
childWorkflowTimeout = 6 * time.Minute
taskListName = "canary-task-queue"
taskQueueName = "canary-task-queue"
ctxKeyActivityRuntime = "runtime"
ctxKeyActivityArchivalRuntime = "runtime-archival"
ctxKeyActivitySystemClient = "system-client"
archivalNamespace = "canary-archival-namespace"
systemNamespace = "temporal-system"
archivalTaskListName = "canary-archival-task-queue"
archivalTaskQueueName = "canary-archival-task-queue"
)

// workflowVersion represents the current version of every single
Expand Down
2 changes: 1 addition & 1 deletion canary/historyArchival.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func executeArchivalExeternalWorkflow(
) (*commonpb.WorkflowExecution, error) {
workflowID := fmt.Sprintf("%v.%v", wfTypeArchivalExternal, uuid.New())
ops := newWorkflowOptions(workflowID, childWorkflowTimeout)
ops.TaskList = archivalTaskListName
ops.TaskQueue = archivalTaskQueueName
workflowRun, err := client.ExecuteWorkflow(ctx, ops, wfTypeArchivalExternal, scheduledTimeNanos)
if err != nil {
return nil, err
Expand Down
6 changes: 3 additions & 3 deletions canary/reset.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func resetWorkflow(ctx workflow.Context, scheduledTimeNanos int64, namespace str

expiration := time.Duration(info.WorkflowRunTimeoutSeconds) * time.Second
activityCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
TaskList: taskListName,
TaskQueue: taskQueueName,
ScheduleToStartTimeout: expiration,
StartToCloseTimeout: expiration,
})
Expand Down Expand Up @@ -121,14 +121,14 @@ func resetBaseWorkflow(ctx workflow.Context, scheduledTimeNanos int64, parentID,
}

activityCtxWithRetry := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
TaskList: taskListName,
TaskQueue: taskQueueName,
ScheduleToCloseTimeout: expiration,
ScheduleToStartTimeout: expiration,
StartToCloseTimeout: expiration,
RetryPolicy: retryPolicy,
})
activityCtxNoRetry := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
TaskList: taskListName,
TaskQueue: taskQueueName,
ScheduleToStartTimeout: expiration,
StartToCloseTimeout: expiration,
})
Expand Down
2 changes: 1 addition & 1 deletion canary/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func retryWorkflow(ctx workflow.Context, scheduledTimeNanos int64, namespace str
}

activityCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
TaskList: taskListName,
TaskQueue: taskQueueName,
ScheduleToCloseTimeout: expiration,
HeartbeatTimeout: 5 * time.Second,
RetryPolicy: retryPolicy,
Expand Down
24 changes: 12 additions & 12 deletions client/frontend/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,18 +91,18 @@ func (c *clientImpl) DescribeNamespace(
return client.DescribeNamespace(ctx, request, opts...)
}

func (c *clientImpl) DescribeTaskList(
func (c *clientImpl) DescribeTaskQueue(
ctx context.Context,
request *workflowservice.DescribeTaskListRequest,
request *workflowservice.DescribeTaskQueueRequest,
opts ...grpc.CallOption,
) (*workflowservice.DescribeTaskListResponse, error) {
) (*workflowservice.DescribeTaskQueueResponse, error) {
client, err := c.getRandomClient()
if err != nil {
return nil, err
}
ctx, cancel := c.createContext(ctx)
defer cancel()
return client.DescribeTaskList(ctx, request, opts...)
return client.DescribeTaskQueue(ctx, request, opts...)
}

func (c *clientImpl) DescribeWorkflowExecution(
Expand Down Expand Up @@ -344,18 +344,18 @@ func (c *clientImpl) RequestCancelWorkflowExecution(
return client.RequestCancelWorkflowExecution(ctx, request, opts...)
}

func (c *clientImpl) ResetStickyTaskList(
func (c *clientImpl) ResetStickyTaskQueue(
ctx context.Context,
request *workflowservice.ResetStickyTaskListRequest,
request *workflowservice.ResetStickyTaskQueueRequest,
opts ...grpc.CallOption,
) (*workflowservice.ResetStickyTaskListResponse, error) {
) (*workflowservice.ResetStickyTaskQueueResponse, error) {
client, err := c.getRandomClient()
if err != nil {
return nil, err
}
ctx, cancel := c.createContext(ctx)
defer cancel()
return client.ResetStickyTaskList(ctx, request, opts...)
return client.ResetStickyTaskQueue(ctx, request, opts...)
}

func (c *clientImpl) ResetWorkflowExecution(
Expand Down Expand Up @@ -582,19 +582,19 @@ func (c *clientImpl) GetClusterInfo(
return client.GetClusterInfo(ctx, request, opts...)
}

func (c *clientImpl) ListTaskListPartitions(
func (c *clientImpl) ListTaskQueuePartitions(
ctx context.Context,
request *workflowservice.ListTaskListPartitionsRequest,
request *workflowservice.ListTaskQueuePartitionsRequest,
opts ...grpc.CallOption,
) (*workflowservice.ListTaskListPartitionsResponse, error) {
) (*workflowservice.ListTaskQueuePartitionsResponse, error) {
client, err := c.getRandomClient()
if err != nil {
return nil, err
}
ctx, cancel := c.createContext(ctx)
defer cancel()

return client.ListTaskListPartitions(ctx, request, opts...)
return client.ListTaskQueuePartitions(ctx, request, opts...)
}

func (c *clientImpl) createContext(parent context.Context) (context.Context, context.CancelFunc) {
Expand Down
42 changes: 21 additions & 21 deletions client/frontend/metricClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,20 +84,20 @@ func (c *metricClient) DescribeNamespace(
return resp, err
}

func (c *metricClient) DescribeTaskList(
func (c *metricClient) DescribeTaskQueue(
ctx context.Context,
request *workflowservice.DescribeTaskListRequest,
request *workflowservice.DescribeTaskQueueRequest,
opts ...grpc.CallOption,
) (*workflowservice.DescribeTaskListResponse, error) {
) (*workflowservice.DescribeTaskQueueResponse, error) {

c.metricsClient.IncCounter(metrics.FrontendClientDescribeTaskListScope, metrics.ClientRequests)
c.metricsClient.IncCounter(metrics.FrontendClientDescribeTaskQueueScope, metrics.ClientRequests)

sw := c.metricsClient.StartTimer(metrics.FrontendClientDescribeTaskListScope, metrics.ClientLatency)
resp, err := c.client.DescribeTaskList(ctx, request, opts...)
sw := c.metricsClient.StartTimer(metrics.FrontendClientDescribeTaskQueueScope, metrics.ClientLatency)
resp, err := c.client.DescribeTaskQueue(ctx, request, opts...)
sw.Stop()

if err != nil {
c.metricsClient.IncCounter(metrics.FrontendClientDescribeTaskListScope, metrics.ClientFailures)
c.metricsClient.IncCounter(metrics.FrontendClientDescribeTaskQueueScope, metrics.ClientFailures)
}
return resp, err
}
Expand Down Expand Up @@ -408,20 +408,20 @@ func (c *metricClient) RequestCancelWorkflowExecution(
return resp, err
}

func (c *metricClient) ResetStickyTaskList(
func (c *metricClient) ResetStickyTaskQueue(
ctx context.Context,
request *workflowservice.ResetStickyTaskListRequest,
request *workflowservice.ResetStickyTaskQueueRequest,
opts ...grpc.CallOption,
) (*workflowservice.ResetStickyTaskListResponse, error) {
) (*workflowservice.ResetStickyTaskQueueResponse, error) {

c.metricsClient.IncCounter(metrics.FrontendClientResetStickyTaskListScope, metrics.ClientRequests)
c.metricsClient.IncCounter(metrics.FrontendClientResetStickyTaskQueueScope, metrics.ClientRequests)

sw := c.metricsClient.StartTimer(metrics.FrontendClientResetStickyTaskListScope, metrics.ClientLatency)
resp, err := c.client.ResetStickyTaskList(ctx, request, opts...)
sw := c.metricsClient.StartTimer(metrics.FrontendClientResetStickyTaskQueueScope, metrics.ClientLatency)
resp, err := c.client.ResetStickyTaskQueue(ctx, request, opts...)
sw.Stop()

if err != nil {
c.metricsClient.IncCounter(metrics.FrontendClientResetStickyTaskListScope, metrics.ClientFailures)
c.metricsClient.IncCounter(metrics.FrontendClientResetStickyTaskQueueScope, metrics.ClientFailures)
}
return resp, err
}
Expand Down Expand Up @@ -713,20 +713,20 @@ func (c *metricClient) GetClusterInfo(
return resp, err
}

func (c *metricClient) ListTaskListPartitions(
func (c *metricClient) ListTaskQueuePartitions(
ctx context.Context,
request *workflowservice.ListTaskListPartitionsRequest,
request *workflowservice.ListTaskQueuePartitionsRequest,
opts ...grpc.CallOption,
) (*workflowservice.ListTaskListPartitionsResponse, error) {
) (*workflowservice.ListTaskQueuePartitionsResponse, error) {

c.metricsClient.IncCounter(metrics.FrontendClientListTaskListPartitionsScope, metrics.ClientRequests)
c.metricsClient.IncCounter(metrics.FrontendClientListTaskQueuePartitionsScope, metrics.ClientRequests)

sw := c.metricsClient.StartTimer(metrics.FrontendClientListTaskListPartitionsScope, metrics.ClientLatency)
resp, err := c.client.ListTaskListPartitions(ctx, request, opts...)
sw := c.metricsClient.StartTimer(metrics.FrontendClientListTaskQueuePartitionsScope, metrics.ClientLatency)
resp, err := c.client.ListTaskQueuePartitions(ctx, request, opts...)
sw.Stop()

if err != nil {
c.metricsClient.IncCounter(metrics.FrontendClientListTaskListPartitionsScope, metrics.ClientFailures)
c.metricsClient.IncCounter(metrics.FrontendClientListTaskQueuePartitionsScope, metrics.ClientFailures)
}
return resp, err
}
30 changes: 15 additions & 15 deletions client/frontend/retryableClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,15 +80,15 @@ func (c *retryableClient) DescribeNamespace(
return resp, err
}

func (c *retryableClient) DescribeTaskList(
func (c *retryableClient) DescribeTaskQueue(
ctx context.Context,
request *workflowservice.DescribeTaskListRequest,
request *workflowservice.DescribeTaskQueueRequest,
opts ...grpc.CallOption,
) (*workflowservice.DescribeTaskListResponse, error) {
var resp *workflowservice.DescribeTaskListResponse
) (*workflowservice.DescribeTaskQueueResponse, error) {
var resp *workflowservice.DescribeTaskQueueResponse
op := func() error {
var err error
resp, err = c.client.DescribeTaskList(ctx, request, opts...)
resp, err = c.client.DescribeTaskQueue(ctx, request, opts...)
return err
}
err := backoff.Retry(op, c.policy, c.isRetryable)
Expand Down Expand Up @@ -350,15 +350,15 @@ func (c *retryableClient) RequestCancelWorkflowExecution(
return resp, backoff.Retry(op, c.policy, c.isRetryable)
}

func (c *retryableClient) ResetStickyTaskList(
func (c *retryableClient) ResetStickyTaskQueue(
ctx context.Context,
request *workflowservice.ResetStickyTaskListRequest,
request *workflowservice.ResetStickyTaskQueueRequest,
opts ...grpc.CallOption,
) (*workflowservice.ResetStickyTaskListResponse, error) {
var resp *workflowservice.ResetStickyTaskListResponse
) (*workflowservice.ResetStickyTaskQueueResponse, error) {
var resp *workflowservice.ResetStickyTaskQueueResponse
op := func() error {
var err error
resp, err = c.client.ResetStickyTaskList(ctx, request, opts...)
resp, err = c.client.ResetStickyTaskQueue(ctx, request, opts...)
return err
}
err := backoff.Retry(op, c.policy, c.isRetryable)
Expand Down Expand Up @@ -605,15 +605,15 @@ func (c *retryableClient) GetClusterInfo(
return resp, err
}

func (c *retryableClient) ListTaskListPartitions(
func (c *retryableClient) ListTaskQueuePartitions(
ctx context.Context,
request *workflowservice.ListTaskListPartitionsRequest,
request *workflowservice.ListTaskQueuePartitionsRequest,
opts ...grpc.CallOption,
) (*workflowservice.ListTaskListPartitionsResponse, error) {
var resp *workflowservice.ListTaskListPartitionsResponse
) (*workflowservice.ListTaskQueuePartitionsResponse, error) {
var resp *workflowservice.ListTaskQueuePartitionsResponse
op := func() error {
var err error
resp, err = c.client.ListTaskListPartitions(ctx, request, opts...)
resp, err = c.client.ListTaskQueuePartitions(ctx, request, opts...)
return err
}
err := backoff.Retry(op, c.policy, c.isRetryable)
Expand Down
10 changes: 5 additions & 5 deletions client/history/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,21 +261,21 @@ func (c *clientImpl) DescribeMutableState(
return response, nil
}

func (c *clientImpl) ResetStickyTaskList(
func (c *clientImpl) ResetStickyTaskQueue(
ctx context.Context,
request *historyservice.ResetStickyTaskListRequest,
opts ...grpc.CallOption) (*historyservice.ResetStickyTaskListResponse, error) {
request *historyservice.ResetStickyTaskQueueRequest,
opts ...grpc.CallOption) (*historyservice.ResetStickyTaskQueueResponse, error) {
client, err := c.getClientForWorkflowID(request.Execution.WorkflowId)
if err != nil {
return nil, err
}

var response *historyservice.ResetStickyTaskListResponse
var response *historyservice.ResetStickyTaskQueueResponse
op := func(ctx context.Context, client historyservice.HistoryServiceClient) error {
var err error
ctx, cancel := c.createContext(ctx)
defer cancel()
response, err = client.ResetStickyTaskList(ctx, request, opts...)
response, err = client.ResetStickyTaskQueue(ctx, request, opts...)
return err
}
err = c.executeWithRedirect(ctx, client, op)
Expand Down

0 comments on commit c16eef6

Please sign in to comment.