-
Notifications
You must be signed in to change notification settings - Fork 32
/
commands.taskqueue.go
142 lines (126 loc) · 4.13 KB
/
commands.taskqueue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package temporalcli
import (
"fmt"
"time"
"github.com/fatih/color"
"github.com/temporalio/cli/temporalcli/internal/printer"
commonpb "go.temporal.io/api/common/v1"
"go.temporal.io/api/enums/v1"
"go.temporal.io/api/taskqueue/v1"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/server/common/tqname"
)
func (c *TemporalTaskQueueDescribeCommand) run(cctx *CommandContext, args []string) error {
// Call describe
cl, err := c.Parent.ClientOptions.dialClient(cctx)
if err != nil {
return err
}
defer cl.Close()
var taskQueueType enums.TaskQueueType
switch c.TaskQueueType.Value {
case "workflow":
taskQueueType = enums.TASK_QUEUE_TYPE_WORKFLOW
case "activity":
taskQueueType = enums.TASK_QUEUE_TYPE_ACTIVITY
default:
return fmt.Errorf("unrecognized task queue type: %q", c.TaskQueueType.Value)
}
taskQueueName, err := tqname.FromBaseName(c.TaskQueue)
if err != nil {
return fmt.Errorf("failed to parse task queue name: %w", err)
}
partitions := c.Partitions
type statusWithPartition struct {
Partition int `json:"partition"`
taskqueue.TaskQueueStatus
}
type pollerWithPartition struct {
Partition int `json:"partition"`
taskqueue.PollerInfo
// copy this out to display nicer in table or card, but not json
Versioning *commonpb.WorkerVersionCapabilities `json:"-"`
}
var statuses []*statusWithPartition
var pollers []*pollerWithPartition
// TODO: remove this when the server does partition fan-out
for p := 0; p < partitions; p++ {
resp, err := cl.WorkflowService().DescribeTaskQueue(cctx, &workflowservice.DescribeTaskQueueRequest{
Namespace: c.Parent.Namespace,
TaskQueue: &taskqueue.TaskQueue{
Name: taskQueueName.WithPartition(p).FullName(),
Kind: enums.TASK_QUEUE_KIND_NORMAL,
},
TaskQueueType: taskQueueType,
IncludeTaskQueueStatus: true,
})
if err != nil {
return fmt.Errorf("unable to describe task queue: %w", err)
}
statuses = append(statuses, &statusWithPartition{
Partition: p,
TaskQueueStatus: *resp.TaskQueueStatus,
})
for _, pi := range resp.Pollers {
pollers = append(pollers, &pollerWithPartition{
Partition: p,
PollerInfo: *pi,
Versioning: pi.WorkerVersionCapabilities,
})
}
}
// For JSON, we'll just dump the proto
if cctx.JSONOutput {
return cctx.Printer.PrintStructured(map[string]any{
"taskQueues": statuses,
"pollers": pollers,
}, printer.StructuredOptions{})
}
// For text, we will use a table for pollers
cctx.Printer.Println(color.MagentaString("Pollers:"))
items := make([]struct {
Identity string
LastAccessTime time.Time
RatePerSecond float64
}, len(pollers))
for i, poller := range pollers {
items[i].Identity = poller.Identity
items[i].LastAccessTime = poller.LastAccessTime.AsTime()
items[i].RatePerSecond = poller.RatePerSecond
}
return cctx.Printer.PrintStructured(items, printer.StructuredOptions{Table: &printer.TableOptions{}})
}
func (c *TemporalTaskQueueListPartitionCommand) run(cctx *CommandContext, args []string) error {
cl, err := c.Parent.ClientOptions.dialClient(cctx)
if err != nil {
return err
}
defer cl.Close()
request := &workflowservice.ListTaskQueuePartitionsRequest{
Namespace: c.Parent.Namespace,
TaskQueue: &taskqueue.TaskQueue{
Name: c.TaskQueue,
Kind: enums.TASK_QUEUE_KIND_NORMAL,
},
}
resp, err := cl.WorkflowService().ListTaskQueuePartitions(cctx, request)
if err != nil {
return fmt.Errorf("unable to list task queues: %w", err)
}
if cctx.JSONOutput {
return cctx.Printer.PrintStructured(resp, printer.StructuredOptions{})
}
var items []*taskqueue.TaskQueuePartitionMetadata
cctx.Printer.Println(color.MagentaString("Workflow Task Queue Partitions\n"))
for _, e := range resp.WorkflowTaskQueuePartitions {
items = append(items, e)
}
_ = cctx.Printer.PrintStructured(items, printer.StructuredOptions{Table: &printer.TableOptions{}})
items = items[:0]
cctx.Printer.Println(color.MagentaString("\nActivity Task Queue Partitions\n"))
for _, e := range resp.ActivityTaskQueuePartitions {
items = append(items, e)
}
_ = cctx.Printer.PrintStructured(items, printer.StructuredOptions{Table: &printer.TableOptions{}})
return nil
}