This repository has been archived by the owner on Jan 5, 2019. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 9
/
handler.js
370 lines (329 loc) · 12.6 KB
/
handler.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
const assert = require('assert');
const Debug = require('debug');
const _ = require('lodash');
const slugid = require('slugid');
const taskcluster = require('taskcluster-client');
const libUrls = require('taskcluster-lib-urls');
const parseRoute = require('./util/route_parser');
const addArtifactUploadedLinks = require('./transform/artifact_links');
const {consume} = require('taskcluster-lib-pulse');
let debug = Debug('taskcluster-treeherder:handler');
function stateFromRun(run) {
switch (run.state) {
case 'exception':
case 'failed':
return 'completed';
default:
return run.state;
}
}
function resultFromRun(run) {
switch (run.state) {
case 'completed':
return 'success';
case 'failed':
return 'fail';
case 'exception':
if (run.reasonResolved === 'canceled') {
return 'canceled';
}
if (run.reasonResolved === 'superseded') {
return 'superseded';
}
return 'exception';
default:
return 'unknown';
}
}
// Creates a log entry for Treeherder to retrieve and parse. This log is
// displayed on the Treeherder Log Viewer once parsed.
function createLogReference(queue, taskId, run) {
let logUrl = `https://queue.taskcluster.net/v1/task/${taskId}` +
`/runs/${run.runId}/artifacts/public/logs/live_backing.log`;
return {
// XXX: This is a magical name see 1147958 which enables the log viewer.
name: 'builds-4h',
url: logUrl,
};
}
// Filters the task routes for the treeherder specific route. Once found,
// the route is parsed into distinct parts used for constructing the
// Treeherder job message.
function parseRouteInfo(prefix, taskId, routes, task) {
let matchingRoutes = routes.filter((r) => {
return r.split('.')[0] === prefix;
});
if (matchingRoutes.length != 1) {
throw new Error(
'Could not determine treeherder route. Either there is no route, ' +
`or more than one matching route exists. Task ID: ${taskId} Routes: ${routes}`
);
}
let parsedRoute = parseRoute(matchingRoutes[0]);
// During a transition period, some tasks might contain a revision within
// the task definition that should override the revision in the routing key.
let revision = _.get(task, 'extra.treeherder.revision');
if (revision) {
parsedRoute.revision = revision;
}
return parsedRoute;
}
function validateTask(monitor, validate, taskId, task, schema) {
if (!task.extra || !task.extra.treeherder) {
monitor.count('validateTask.no-config');
debug(`Message is missing Treeherder job configuration. Task ID: ${taskId}`);
return false;
}
let validationErrors = validate(task.extra.treeherder, schema);
if (validationErrors) {
monitor.count('validateTask.invalid-config');
debug(`Message contains an invalid Treeherder job configuration. Task ID: ${taskId} ${validationErrors}`);
return false;
}
monitor.count('validateTask.good-config');
return true;
}
module.exports = class Handler {
constructor(options) {
this.cfg = options.cfg;
this.queue = options.queue;
this.queueEvents = options.queueEvents;
this.pulseClient = options.pulseClient;
this.prefix = options.prefix;
this.publisher = options.publisher;
this.validator = options.validator;
this.monitor = options.monitor;
// build a mapping from exchange name to status
this.eventMap = {
[this.queueEvents.taskPending().exchange]: 'pending',
[this.queueEvents.taskRunning().exchange]: 'running',
[this.queueEvents.taskCompleted().exchange]: 'completed',
[this.queueEvents.taskFailed().exchange]: 'failed',
[this.queueEvents.taskException().exchange]: 'exception',
};
}
// Starts up the message handler and listens for messages
async start() {
debug('Starting handler');
const routingPattern = `route.${this.prefix}.#`;
this.pq = await consume({
client: this.pulseClient,
bindings: [
this.queueEvents.taskPending(routingPattern),
this.queueEvents.taskRunning(routingPattern),
this.queueEvents.taskCompleted(routingPattern),
this.queueEvents.taskFailed(routingPattern),
this.queueEvents.taskException(routingPattern),
],
queueName: this.cfg.pulse.queueName,
prefetch: this.cfg.pulse.prefetch,
}, async message => {
try {
await this.monitor.timer('handle-message.time', this.handleMessage(message));
this.monitor.count('handle-message.success');
} catch (err) {
this.monitor.count('handle-message.failure');
this.monitor.reportError(err);
};
});
}
// Listens for Task event messages and invokes the appropriate handler
// for the type of message received.
//
// Only messages that contain the properly formatted routing key and contains
// treeherder job information in task.extra.treeherder are accepted
async handleMessage(message) {
this.monitor.count('handle-message');
let taskId = message.payload.status.taskId;
let task = await this.queue.task(taskId);
let parsedRoute = parseRouteInfo(this.prefix, taskId, message.routes, task);
debug(`message received for task ${taskId} with route ${message.routes}`);
this.monitor.count(`${parsedRoute.project}.handle-message`);
// validation failures are common and logged, so do nothing more.
const schema = libUrls.schema(this.cfg.taskcluster.rootUrl,
'treeherder', 'v1/task-treeherder-config.json#');
if (!validateTask(this.monitor, this.validator, taskId, task, schema)) {
return;
}
switch (this.eventMap[message.exchange]) {
case 'pending':
let runId = message.payload.runId;
let run = message.payload.status.runs[message.payload.runId];
// If the task run was created for an infrastructure rerun, then resolve
// the previous run as retried.
if (runId > 0) {
await this.handleTaskRerun(parsedRoute, task, message.payload);
}
return await this.handleTaskPending(parsedRoute, task, message.payload);
case 'running':
return await this.handleTaskRunning(parsedRoute, task, message.payload);
case 'completed':
return await this.handleTaskCompleted(parsedRoute, task, message.payload);
case 'failed':
return await this.handleTaskFailed(parsedRoute, task, message.payload);
case 'exception':
return await this.handleTaskException(parsedRoute, task, message.payload);
default:
throw new Error(`Unknown exchange: ${message.exchange}`);
}
}
// Publishes the Treeherder job message to pulse.
async publishJobMessage(pushInfo, job, taskId) {
try {
debug(`Publishing message for ${pushInfo.project} with task ID ${taskId}`);
await this.publisher.jobs(job, {project:pushInfo.project, destination: pushInfo.destination});
debug(`Published message for ${pushInfo.project} with task ID ${taskId}`);
this.monitor.count(`${pushInfo.project}.publish-message.success`);
} catch (err) {
this.monitor.count(`${pushInfo.project}.publish-message.failure`);
this.monitor.reportError(err, {project: pushInfo.project});
let e = new Error(
`Could not publish job message for ${pushInfo.project} with task ID ${taskId}. ${err.message}. \n` +
`Job: ${JSON.stringify(job, null, 4)}`);
throw e;
}
}
// Builds the basic Treeherder job message that's universal for all
// messsage types.
//
// Specific handlers for each message type will add/remove information necessary
// for the type of task event..
buildMessage(pushInfo, task, runId, message) {
let taskId = message.status.taskId;
let run = message.status.runs[runId];
let treeherderConfig = task.extra.treeherder;
let job = {
buildSystem: 'taskcluster',
owner: task.metadata.owner,
taskId: `${slugid.decode(taskId)}/${runId}`,
retryId: runId,
isRetried: false,
display: {
// jobSymbols could be an integer (i.e. Chunk ID) but need to be strings
// for treeherder
jobSymbol: String(treeherderConfig.symbol),
groupSymbol: treeherderConfig.groupSymbol || '?',
// Maximum job name length is 100 chars...
jobName: task.metadata.name.slice(0, 99),
},
state: stateFromRun(run),
result: resultFromRun(run),
tier: treeherderConfig.tier || 1,
timeScheduled: task.created,
jobKind: treeherderConfig.jobKind ? treeherderConfig.jobKind : 'other',
reason: treeherderConfig.reason || 'scheduled',
jobInfo: {
links: [],
summary: task.metadata.description,
},
};
job.origin = {
kind: pushInfo.origin,
project: pushInfo.project,
revision: pushInfo.revision,
};
if (pushInfo.origin === 'hg.mozilla.org') {
job.origin.pushLogID = pushInfo.pushId;
} else {
job.origin.pullRequestID = pushInfo.pushId;
job.origin.owner = pushInfo.owner;
}
// Transform "collection" into an array of labels if task doesn't
// define "labels".
let labels = treeherderConfig.labels ? treeherderConfig.labels : [];
if (!labels.length) {
if (!treeherderConfig.collection) {
labels = ['opt'];
} else {
labels = Object.keys(treeherderConfig.collection);
}
}
job.labels = labels;
let machine = treeherderConfig.machine || {};
job.buildMachine = {
name: run.workerId || 'unknown',
platform: machine.platform || task.workerType,
os: machine.os || '-',
architecture: machine.architecture || '-',
};
if (treeherderConfig.productName) {
job.productName = treeherderConfig.productName;
}
if (treeherderConfig.groupName) {
job.display.groupName = treeherderConfig.groupName;
}
return job;
}
async handleTaskPending(pushInfo, task, message) {
let job = this.buildMessage(pushInfo, task, message.runId, message);
await this.publishJobMessage(pushInfo, job, message.status.taskId);
}
async handleTaskRerun(pushInfo, task, message) {
let run = message.status.runs[message.runId-1];
let job = this.buildMessage(pushInfo, task, message.runId-1, message);
job.state = 'completed';
job.result = 'fail';
job.isRetried = true;
// reruns often have no logs, so in the interest of not linking to a 404'ing artifact,
// don't include a link
job.logs = [];
job = await addArtifactUploadedLinks(this.queue,
this.monitor,
message.status.taskId,
message.runId-1,
job);
await this.publishJobMessage(pushInfo, job, message.status.taskId);
}
async handleTaskRunning(pushInfo, task, message) {
let run = message.status.runs[message.runId];
let job = this.buildMessage(pushInfo, task, message.runId, message);
job.timeStarted = message.status.runs[message.runId].started;
await this.publishJobMessage(pushInfo, job, message.status.taskId);
}
async handleTaskCompleted(pushInfo, task, message) {
let run = message.status.runs[message.runId];
let job = this.buildMessage(pushInfo, task, message.runId, message);
job.timeStarted = run.started;
job.timeCompleted = run.resolved;
job.logs = [createLogReference(this.queue, message.status.taskId, run)];
job = await addArtifactUploadedLinks(this.queue,
this.monitor,
message.status.taskId,
message.runId,
job);
await this.publishJobMessage(pushInfo, job, message.status.taskId);
}
async handleTaskFailed(pushInfo, task, message) {
let run = message.status.runs[message.runId];
let job = this.buildMessage(pushInfo, task, message.runId, message);
job.timeStarted = run.started;
job.timeCompleted = run.resolved;
job.logs = [createLogReference(this.queue, message.status.taskId, run)];
job = await addArtifactUploadedLinks(this.queue,
this.monitor,
message.status.taskId,
message.runId,
job);
await this.publishJobMessage(pushInfo, job, message.status.taskId);
}
async handleTaskException(pushInfo, task, message) {
let run = message.status.runs[message.runId];
// Do not report runs that were created as an exception. Such cases
// are deadline-exceeded
if (run.reasonCreated === 'exception') {
return;
}
let job = this.buildMessage(pushInfo, task, message.runId, message);
job.timeStarted = run.started;
job.timeCompleted = run.resolved;
// exceptions generally have no logs, so in the interest of not linking to a 404'ing artifact,
// don't include a link
job.logs = [];
job = await addArtifactUploadedLinks(this.queue,
this.monitor,
message.status.taskId,
message.runId,
job);
await this.publishJobMessage(pushInfo, job, message.status.taskId);
}
};