-
Notifications
You must be signed in to change notification settings - Fork 251
/
tcqueueevents.go
394 lines (355 loc) · 13.6 KB
/
tcqueueevents.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
// The following code is AUTO-GENERATED. Please DO NOT edit.
// To update this generated code, run the following command:
// in the /codegenerator/model subdirectory of this project,
// making sure that `${GOPATH}/bin` is in your `PATH`:
//
// go install && go generate
// This package was generated from the schema defined at
// /references/queue/v1/exchanges.json
// The queue service is responsible for accepting tasks and track their state
// as they are executed by workers. In order ensure they are eventually
// resolved.
//
// This document describes AMQP exchanges offered by the queue, which allows
// third-party listeners to monitor tasks as they progress to resolution.
// These exchanges targets the following audience:
// - Schedulers, who takes action after tasks are completed,
// - Workers, who wants to listen for new or canceled tasks (optional),
// - Tools, that wants to update their view as task progress.
//
// You'll notice that all the exchanges in the document shares the same
// routing key pattern. This makes it very easy to bind to all messages
// about a certain kind tasks.
//
// **Task specific routes**, a task can define a task specific route using
// the `task.routes` property. See task creation documentation for details
// on permissions required to provide task specific routes. If a task has
// the entry `'notify.by-email'` in as task specific route defined in
// `task.routes` all messages about this task will be CC'ed with the
// routing-key `'route.notify.by-email'`.
//
// These routes will always be prefixed `route.`, so that cannot interfere
// with the _primary_ routing key as documented here. Notice that the
// _primary_ routing key is always prefixed `primary.`. This is ensured
// in the routing key reference, so API clients will do this automatically.
//
// Please, note that the way RabbitMQ works, the message will only arrive
// in your queue once, even though you may have bound to the exchange with
// multiple routing key patterns that matches more of the CC'ed routing
// routing keys.
//
// **Delivery guarantees**, most operations on the queue are idempotent,
// which means that if repeated with the same arguments then the requests
// will ensure completion of the operation and return the same response.
// This is useful if the server crashes or the TCP connection breaks, but
// when re-executing an idempotent operation, the queue will also resend
// any related AMQP messages. Hence, messages may be repeated.
//
// This shouldn't be much of a problem, as the best you can achieve using
// confirm messages with AMQP is at-least-once delivery semantics. Hence,
// this only prevents you from obtaining at-most-once delivery semantics.
//
// **Remark**, some message generated by timeouts maybe dropped if the
// server crashes at wrong time. Ideally, we'll address this in the
// future. For now we suggest you ignore this corner case, and notify us
// if this corner case is of concern to you.
//
// See:
//
// # How to use this package
//
// This package is designed to sit on top of https://pkg.go.dev/github.com/taskcluster/pulse-go/pulse. Please read
// the pulse package overview to get an understanding of how the pulse client is implemented in go.
//
// This package provides two things in addition to the basic pulse package: structured types for unmarshaling
// pulse message bodies into, and custom Binding interfaces, for defining the fixed strings for task cluster
// exchange names, and routing keys as structured types.
//
// For example, when specifying a binding, rather than using:
//
// pulse.Bind(
// "*.*.*.*.*.*.gaia.#",
// "exchange/taskcluster-queue/v1/task-defined",
// )
//
// You can rather use:
//
// queueevents.TaskDefined{WorkerType: "gaia"}
//
// In addition, this means that you will also get objects in your callback method like *queueevents.TaskDefinedMessage
// rather than just interface{}.
package tcqueueevents
import (
"reflect"
"strings"
)
// When a task is created or just defined a message is posted to this
// exchange.
//
// This message exchange is mainly useful when tasks are created with dependencies
// on incomplete tasks, as this does not make the task
// `pending`. Thus, no `taskPending` message is published.
//
// See #taskDefined
type TaskDefined struct {
RoutingKeyKind string `mwords:"*"`
TaskID string `mwords:"*"`
RunID string `mwords:"*"`
WorkerGroup string `mwords:"*"`
WorkerID string `mwords:"*"`
ProvisionerID string `mwords:"*"`
WorkerType string `mwords:"*"`
SchedulerID string `mwords:"*"`
TaskGroupID string `mwords:"*"`
Reserved string `mwords:"#"`
}
func (binding TaskDefined) RoutingKey() string {
return generateRoutingKey(&binding)
}
func (binding TaskDefined) ExchangeName() string {
return "exchange/taskcluster-queue/v1/task-defined"
}
func (binding TaskDefined) NewPayloadObject() interface{} {
return new(TaskDefinedMessage)
}
// When a task becomes `pending` a message is posted to this exchange.
//
// This is useful for workers who doesn't want to constantly poll the queue
// for new tasks. The queue will also be authority for task states and
// claims. But using this exchange workers should be able to distribute work
// efficiently and they would be able to reduce their polling interval
// significantly without affecting general responsiveness.
//
// See #taskPending
type TaskPending struct {
RoutingKeyKind string `mwords:"*"`
TaskID string `mwords:"*"`
RunID string `mwords:"*"`
WorkerGroup string `mwords:"*"`
WorkerID string `mwords:"*"`
ProvisionerID string `mwords:"*"`
WorkerType string `mwords:"*"`
SchedulerID string `mwords:"*"`
TaskGroupID string `mwords:"*"`
Reserved string `mwords:"#"`
}
func (binding TaskPending) RoutingKey() string {
return generateRoutingKey(&binding)
}
func (binding TaskPending) ExchangeName() string {
return "exchange/taskcluster-queue/v1/task-pending"
}
func (binding TaskPending) NewPayloadObject() interface{} {
return new(TaskPendingMessage)
}
// Whenever a task is claimed by a worker, a run is started on the worker,
// and a message is posted on this exchange.
//
// See #taskRunning
type TaskRunning struct {
RoutingKeyKind string `mwords:"*"`
TaskID string `mwords:"*"`
RunID string `mwords:"*"`
WorkerGroup string `mwords:"*"`
WorkerID string `mwords:"*"`
ProvisionerID string `mwords:"*"`
WorkerType string `mwords:"*"`
SchedulerID string `mwords:"*"`
TaskGroupID string `mwords:"*"`
Reserved string `mwords:"#"`
}
func (binding TaskRunning) RoutingKey() string {
return generateRoutingKey(&binding)
}
func (binding TaskRunning) ExchangeName() string {
return "exchange/taskcluster-queue/v1/task-running"
}
func (binding TaskRunning) NewPayloadObject() interface{} {
return new(TaskRunningMessage)
}
// Whenever the `createArtifact` end-point is called, the queue will create
// a record of the artifact and post a message on this exchange. All of this
// happens before the queue returns a signed URL for the caller to upload
// the actual artifact with (pending on `storageType`).
//
// This means that the actual artifact is rarely available when this message
// is posted. But it is not unreasonable to assume that the artifact will
// will become available at some point later. Most signatures will expire in
// 30 minutes or so, forcing the uploader to call `createArtifact` with
// the same payload again in-order to continue uploading the artifact.
//
// However, in most cases (especially for small artifacts) it's very
// reasonable assume the artifact will be available within a few minutes.
// This property means that this exchange is mostly useful for tools
// monitoring task evaluation. One could also use it count number of
// artifacts per task, or _index_ artifacts though in most cases it'll be
// smarter to index artifacts after the task in question have completed
// successfully.
//
// *NOTE*: this message is currently only sent for reference and error
// artifacts. This will be remedied in a future version of Taskcluster.
//
// See #artifactCreated
type ArtifactCreated struct {
RoutingKeyKind string `mwords:"*"`
TaskID string `mwords:"*"`
RunID string `mwords:"*"`
WorkerGroup string `mwords:"*"`
WorkerID string `mwords:"*"`
ProvisionerID string `mwords:"*"`
WorkerType string `mwords:"*"`
SchedulerID string `mwords:"*"`
TaskGroupID string `mwords:"*"`
Reserved string `mwords:"#"`
}
func (binding ArtifactCreated) RoutingKey() string {
return generateRoutingKey(&binding)
}
func (binding ArtifactCreated) ExchangeName() string {
return "exchange/taskcluster-queue/v1/artifact-created"
}
func (binding ArtifactCreated) NewPayloadObject() interface{} {
return new(ArtifactCreatedMessage)
}
// When a task is successfully completed by a worker a message is posted
// this exchange.
// This message is routed using the `runId`, `workerGroup` and `workerId`
// that completed the task. But information about additional runs is also
// available from the task status structure.
//
// See #taskCompleted
type TaskCompleted struct {
RoutingKeyKind string `mwords:"*"`
TaskID string `mwords:"*"`
RunID string `mwords:"*"`
WorkerGroup string `mwords:"*"`
WorkerID string `mwords:"*"`
ProvisionerID string `mwords:"*"`
WorkerType string `mwords:"*"`
SchedulerID string `mwords:"*"`
TaskGroupID string `mwords:"*"`
Reserved string `mwords:"#"`
}
func (binding TaskCompleted) RoutingKey() string {
return generateRoutingKey(&binding)
}
func (binding TaskCompleted) ExchangeName() string {
return "exchange/taskcluster-queue/v1/task-completed"
}
func (binding TaskCompleted) NewPayloadObject() interface{} {
return new(TaskCompletedMessage)
}
// When a task ran, but failed to complete successfully a message is posted
// to this exchange. This is same as worker ran task-specific code, but the
// task specific code exited non-zero.
//
// See #taskFailed
type TaskFailed struct {
RoutingKeyKind string `mwords:"*"`
TaskID string `mwords:"*"`
RunID string `mwords:"*"`
WorkerGroup string `mwords:"*"`
WorkerID string `mwords:"*"`
ProvisionerID string `mwords:"*"`
WorkerType string `mwords:"*"`
SchedulerID string `mwords:"*"`
TaskGroupID string `mwords:"*"`
Reserved string `mwords:"#"`
}
func (binding TaskFailed) RoutingKey() string {
return generateRoutingKey(&binding)
}
func (binding TaskFailed) ExchangeName() string {
return "exchange/taskcluster-queue/v1/task-failed"
}
func (binding TaskFailed) NewPayloadObject() interface{} {
return new(TaskFailedMessage)
}
// Whenever Taskcluster fails to run a message is posted to this exchange.
// This happens if the task isn't completed before its `deadlìne`,
// all retries failed (i.e. workers stopped responding), the task was
// canceled by another entity, or the task carried a malformed payload.
//
// The specific _reason_ is evident from that task status structure, refer
// to the `reasonResolved` property for the last run.
//
// See #taskException
type TaskException struct {
RoutingKeyKind string `mwords:"*"`
TaskID string `mwords:"*"`
RunID string `mwords:"*"`
WorkerGroup string `mwords:"*"`
WorkerID string `mwords:"*"`
ProvisionerID string `mwords:"*"`
WorkerType string `mwords:"*"`
SchedulerID string `mwords:"*"`
TaskGroupID string `mwords:"*"`
Reserved string `mwords:"#"`
}
func (binding TaskException) RoutingKey() string {
return generateRoutingKey(&binding)
}
func (binding TaskException) ExchangeName() string {
return "exchange/taskcluster-queue/v1/task-exception"
}
func (binding TaskException) NewPayloadObject() interface{} {
return new(TaskExceptionMessage)
}
// A message is published on task-group-resolved whenever all submitted
// tasks (whether scheduled or unscheduled) for a given task group have
// been resolved, regardless of whether they resolved as successful or
// not. A task group may be resolved multiple times, since new tasks may
// be submitted against an already resolved task group.
//
// See #taskGroupResolved
type TaskGroupResolved struct {
RoutingKeyKind string `mwords:"*"`
TaskGroupID string `mwords:"*"`
SchedulerID string `mwords:"*"`
Reserved string `mwords:"#"`
}
func (binding TaskGroupResolved) RoutingKey() string {
return generateRoutingKey(&binding)
}
func (binding TaskGroupResolved) ExchangeName() string {
return "exchange/taskcluster-queue/v1/task-group-resolved"
}
func (binding TaskGroupResolved) NewPayloadObject() interface{} {
return new(TaskGroupChangedMessage)
}
// A message is published on task-group-sealed whenever task group is sealed.
// This task group will no longer allow creation of new tasks.
//
// See #taskGroupSealed
type TaskGroupSealed struct {
RoutingKeyKind string `mwords:"*"`
TaskGroupID string `mwords:"*"`
SchedulerID string `mwords:"*"`
Reserved string `mwords:"#"`
}
func (binding TaskGroupSealed) RoutingKey() string {
return generateRoutingKey(&binding)
}
func (binding TaskGroupSealed) ExchangeName() string {
return "exchange/taskcluster-queue/v1/task-group-sealed"
}
func (binding TaskGroupSealed) NewPayloadObject() interface{} {
return new(TaskGroupChangedMessage)
}
func generateRoutingKey(x interface{}) string {
val := reflect.ValueOf(x).Elem()
p := make([]string, 0, val.NumField())
for i := 0; i < val.NumField(); i++ {
valueField := val.Field(i)
typeField := val.Type().Field(i)
tag := typeField.Tag
if t := tag.Get("mwords"); t != "" {
if v := valueField.Interface(); v == "" {
p = append(p, t)
} else {
p = append(p, v.(string))
}
}
}
return strings.Join(p, ".")
}