-
Notifications
You must be signed in to change notification settings - Fork 251
/
actors.go
299 lines (250 loc) · 8.59 KB
/
actors.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
package group
import (
"context"
"fmt"
"html/template"
"io"
"strings"
"sync"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
tcclient "github.com/taskcluster/taskcluster/v27/clients/client-go"
"github.com/taskcluster/taskcluster/v27/clients/client-go/tcqueue"
"github.com/taskcluster/taskcluster/v27/clients/client-shell/config"
)
var listFormat string
func init() {
cancelCmd := &cobra.Command{
Use: "cancel <taskGroupId>",
Short: "Cancel a whole group by taskGroupId.",
RunE: executeHelperE(runCancel),
}
cancelCmd.Flags().StringP("worker-type", "w", "", "Only cancel tasks with a certain worker type.")
cancelCmd.Flags().BoolP("force", "f", false, "Skip cancellation confirmation.")
Command.AddCommand(cancelCmd)
statusCmd := &cobra.Command{
Use: "status <taskGroupId>",
Short: "Show the status of a task group",
RunE: executeHelperE(runStatus),
}
Command.AddCommand(statusCmd)
listCmd := &cobra.Command{
Use: "list <taskGroupId>",
Short: "List task details: ID and label",
RunE: executeHelperE(runList),
}
listCmd.Flags().BoolP("all", "a", false, "Include all tasks (Overrides other options).")
listCmd.Flags().BoolP("running", "r", false, "Include running tasks.")
listCmd.Flags().BoolP("failed", "f", false, "Include failed tasks.")
listCmd.Flags().BoolP("exception", "e", false, "Include exception tasks.")
listCmd.Flags().BoolP("complete", "c", false, "Include complete tasks.")
listCmd.Flags().BoolP("unscheduled", "u", false, "Include unscheduled tasks.")
listCmd.Flags().BoolP("pending", "p", false, "Include pending tasks.")
listCmd.Flags().StringVar(&listFormat, "format-string", "{{ .Status.TaskID }} {{ .Task.Metadata.Name }} {{ .Status.State }}", "Go Template string for output")
Command.AddCommand(listCmd)
}
func makeQueue(credentials *tcclient.Credentials) *tcqueue.Queue {
return tcqueue.New(credentials, config.RootURL())
}
// runCancel cancels all tasks of a group.
//
// It first fetches the list of all tasks associated with the given group,
// then filters for only cancellable tasks (unscheduled, pending, running),
// and finally runs all cancellations concurrently, because they are
// independent of each other.
func runCancel(credentials *tcclient.Credentials, args []string, out io.Writer, flags *pflag.FlagSet) error {
q := makeQueue(credentials)
groupID := args[0]
// Because the list of tasks can be arbitrarily long, we have to loop until
// we are told not to.
tasks := make([]string, 0)
tasksNames := make([]string, 0)
cont := ""
for {
// get next TaskGroup for groupID
ts, err := q.ListTaskGroup(groupID, cont, "")
if err != nil {
return fmt.Errorf("could not fetch tasks for group %s: %v", groupID, err)
}
// set tasks that meet the criteria (see filterTask) to be deleted
for _, t := range ts.Tasks {
if filterTask(t.Status, flags) {
// add id to be deleted, and name for cancellation
tasks = append(tasks, t.Status.TaskID)
tasksNames = append(tasksNames, t.Task.Metadata.Name)
}
}
// break if there are no more tasks for that groupID
if cont = ts.ContinuationToken; cont == "" {
break
}
}
if len(tasks) == 0 {
fmt.Fprintln(out, "No suitable tasks found for cancellation.")
return nil
}
// ask for confirmation before cancellation
if force, _ := flags.GetBool("force"); !force && !confirmCancellation(tasks, tasksNames, out) {
fmt.Fprintln(out, "Cancellation of tasks aborted.")
return nil
}
// Here we use a waitgroup to ensure that we return once all the tasks have
// been completed.
wg := &sync.WaitGroup{}
// The context allows us to exit early if any of the cancellation fails.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// errChan allows the first request that panics to propagate the error message
// up to us.
errChan := make(chan error, 1)
for _, taskID := range tasks {
wg.Add(1)
go func(taskID string) {
// If we paniced on the way out, abort all the other tasks.
defer func() {
//recover from panic and abort
if err := recover(); err != nil {
if e, ok := err.(error); ok {
errChan <- e
} else {
errChan <- fmt.Errorf("%v", err)
}
cancel()
}
}()
fmt.Fprintf(out, "cancelling task %s\n", taskID)
c := make(chan error, 1)
go func() { _, err := q.CancelTask(taskID); c <- err }()
// we select the first that returns or closes:
// - ctx.Done() if we aborted;
// - c if we got a completed cancellation.
select {
case <-ctx.Done():
// nothing because we can't cancel the existing requests.
case err := <-c:
if err != nil {
panic(fmt.Errorf("could not cancel task %s: %v", taskID, err))
}
}
// if we exited normally, we indicate that we completed.
wg.Done()
}(taskID)
}
// change the semantics of waitgroup to close a channel instead of blocking
// the main thread.
regularExit := make(chan bool)
go func() { wg.Wait(); close(regularExit) }()
// We select the first that closes:
// - ctx.Done() if we aborted;
// - regularExit if all the goroutine exited manually.
select {
case <-ctx.Done():
return fmt.Errorf("could not cancel all tasks: %v", <-errChan)
case <-regularExit:
return nil
}
}
// filterTask takes a task and returns whether or not this task should be
// set for cancellation, based on the specified filters through flags
func filterTask(status tcqueue.TaskStatusStructure, flags *pflag.FlagSet) bool {
// first check - only delete tasks that are unscheduled, pending, running
if status.State != "unscheduled" && status.State != "pending" && status.State != "running" {
return false
}
// filter for worker type, if one specified
// if no worker type is specified, its value is "" so the condition is skipped
if workerType, _ := flags.GetString("worker-type"); workerType != "" {
if workerType != status.WorkerType {
return false
}
}
// ..other filters can be added here as necessary
return true
}
// confirmCancellation lists the tasks to be cancelled and prompts to confirm cancellation
func confirmCancellation(ids []string, names []string, out io.Writer) bool {
// list tasks
fmt.Fprintf(out, "The following %d tasks will be cancelled:\n", len(ids))
for n, id := range ids {
fmt.Fprintf(out, "\tTask %s: %s\n", id, names[n])
}
for {
fmt.Fprint(out, "Are you sure you want to cancel these tasks? [y/n] ")
var c string
fmt.Scanf("%s", &c)
if c == "y" || c == "Y" {
return true
} else if c == "n" || c == "N" {
return false
}
// otherwise reloop to ask again
}
}
// runStatus displays the status summary of tasks in a group.
//
// It first fetches the list of all tasks associated with the given group,
// then counts the unique states of the final run of each task
func runStatus(credentials *tcclient.Credentials, args []string, out io.Writer, flags *pflag.FlagSet) error {
q := makeQueue(credentials)
groupID := args[0]
counter := make(map[string]int)
cont := ""
for {
// get next TaskGroup for groupID
ts, err := q.ListTaskGroup(groupID, cont, "")
if err != nil {
return fmt.Errorf("could not fetch tasks for group %s: %v", groupID, err)
}
for _, t := range ts.Tasks {
counter[t.Status.State]++
}
// break if there are no more tasks for that groupID
if cont = ts.ContinuationToken; cont == "" {
break
}
}
for status, count := range counter {
fmt.Fprintf(out, "%s: %d\n", status, count)
}
return nil
}
// runList displays the a list of task IDs and labels that match the given statuses
//
// It first fetches the list of all tasks associated with the given group
func runList(credentials *tcclient.Credentials, args []string, out io.Writer, flags *pflag.FlagSet) error {
q := makeQueue(credentials)
groupID := args[0]
cont := ""
templ := template.Must(template.New("listFormat").Parse(strings.Join([]string{listFormat, "\n"}, "")))
for {
// get next TaskGroup for groupID
ts, err := q.ListTaskGroup(groupID, cont, "")
if err != nil {
return fmt.Errorf("could not fetch tasks for group %s: %v", groupID, err)
}
for _, t := range ts.Tasks {
if filterListTask(t.Status, flags) {
err := templ.Execute(out, t)
if err != nil {
return err
}
}
}
// break if there are no more tasks for that groupID
if cont = ts.ContinuationToken; cont == "" {
break
}
}
return nil
}
// filterListTask takes a task and returns whether or not this task should be
// included in the list requested by the user
func filterListTask(status tcqueue.TaskStatusStructure, flags *pflag.FlagSet) bool {
if include, err := flags.GetBool("all"); include && err == nil {
return true
}
if include, err := flags.GetBool(status.State); include && err == nil {
return true
}
return false
}