Skip to content

Commit

Permalink
Switch to admin API in tctl list shard tasks by category (#2217)
Browse files Browse the repository at this point in the history
* Switch to admin API in tctl list shard tasks by category
  • Loading branch information
feedmeapples committed Nov 23, 2021
1 parent 90e89dd commit 769b69f
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 23 deletions.
9 changes: 4 additions & 5 deletions tools/cli/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,9 +173,8 @@ func newAdminShardManagementCommands() []cli.Command {
{
Name: "list_tasks",
Usage: "List tasks for given shard Id and task type",
Flags: append(append(
getDBFlags(),
flagsForPagination...),
Flags: append(
flagsForPagination,
cli.StringFlag{
Name: FlagTargetCluster,
Value: "active",
Expand All @@ -188,7 +187,7 @@ func newAdminShardManagementCommands() []cli.Command {
cli.StringFlag{
Name: FlagTaskType,
Value: "transfer",
Usage: "Task type: transfer (default), timer, replication",
Usage: "Task type: transfer (default), timer, replication, visibility",
},
cli.StringFlag{
Name: FlagMinVisibilityTimestamp,
Expand All @@ -206,7 +205,7 @@ func newAdminShardManagementCommands() []cli.Command {
},
),
Action: func(c *cli.Context) {
AdminListTasks(c)
AdminListShardTasks(c)
},
},
{
Expand Down
34 changes: 16 additions & 18 deletions tools/cli/adminCommands.go
Original file line number Diff line number Diff line change
Expand Up @@ -598,8 +598,8 @@ func AdminDescribeTask(c *cli.Context) {
}
}

// AdminListTasks outputs a list of a tasks for given Shard and Task Type
func AdminListTasks(c *cli.Context) {
// AdminListShardTasks outputs a list of a tasks for given Shard and Task Category
func AdminListShardTasks(c *cli.Context) {
sid := int32(getRequiredIntOption(c, FlagShardID))
categoryInt, err := stringToEnum(c.String(FlagTaskType), enumsspb.TaskCategory_value)
if err != nil {
Expand All @@ -610,18 +610,16 @@ func AdminListTasks(c *cli.Context) {
ErrorAndExit(fmt.Sprintf("Task type %s is currently not supported", category), nil)
}

pFactory := CreatePersistenceFactory(c)
executionManager, err := pFactory.NewExecutionManager()
if err != nil {
ErrorAndExit("Failed to initialize execution manager", err)
}
client := cFactory.AdminClient(c)

ctx, cancel := newContext(c)
defer cancel()
if category == enumsspb.TASK_CATEGORY_TRANSFER {
req := &persistence.GetTransferTasksRequest{ShardID: sid}
req := &adminservice.ListTransferTasksRequest{ShardId: sid}

paginationFunc := func(paginationToken []byte) ([]interface{}, []byte, error) {
req.NextPageToken = paginationToken
response, err := executionManager.GetTransferTasks(req)
response, err := client.ListTransferTasks(ctx, req)
if err != nil {
return nil, nil, err
}
Expand All @@ -635,11 +633,11 @@ func AdminListTasks(c *cli.Context) {
}
paginate(c, paginationFunc)
} else if category == enumsspb.TASK_CATEGORY_VISIBILITY {
req := &persistence.GetVisibilityTasksRequest{ShardID: sid}
req := &adminservice.ListVisibilityTasksRequest{ShardId: sid}

paginationFunc := func(paginationToken []byte) ([]interface{}, []byte, error) {
req.NextPageToken = paginationToken
response, err := executionManager.GetVisibilityTasks(req)
response, err := client.ListVisibilityTasks(ctx, req)
if err != nil {
return nil, nil, err
}
Expand All @@ -656,14 +654,14 @@ func AdminListTasks(c *cli.Context) {
minVis := parseTime(c.String(FlagMinVisibilityTimestamp), time.Time{}, time.Now().UTC())
maxVis := parseTime(c.String(FlagMaxVisibilityTimestamp), time.Time{}, time.Now().UTC())

req := &persistence.GetTimerTasksRequest{
ShardID: sid,
MinTimestamp: minVis,
MaxTimestamp: maxVis,
req := &adminservice.ListTimerTasksRequest{
ShardId: sid,
MinTime: &minVis,
MaxTime: &maxVis,
}
paginationFunc := func(paginationToken []byte) ([]interface{}, []byte, error) {
req.NextPageToken = paginationToken
response, err := executionManager.GetTimerTasks(req)
response, err := client.ListTimerTasks(ctx, req)
if err != nil {
return nil, nil, err
}
Expand All @@ -677,10 +675,10 @@ func AdminListTasks(c *cli.Context) {
}
paginate(c, paginationFunc)
} else if category == enumsspb.TASK_CATEGORY_REPLICATION {
req := &persistence.GetReplicationTasksRequest{}
req := &adminservice.ListReplicationTasksRequest{ShardId: sid}
paginationFunc := func(paginationToken []byte) ([]interface{}, []byte, error) {
req.NextPageToken = paginationToken
response, err := executionManager.GetReplicationTasks(req)
response, err := client.ListReplicationTasks(ctx, req)
if err != nil {
return nil, nil, err
}
Expand Down

0 comments on commit 769b69f

Please sign in to comment.