More monitor #98
More monitor #98
Changes from all commits
08491d4
f5fa8db
4f45ea9
39a3180
3b462c8
7c14f95
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
{ | ||
"extends": "eslint-config-taskcluster" | ||
} |
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,6 +15,7 @@ let Promise = require('promise'); | |
* secretAccessKey: // ... | ||
* }, | ||
* bucketCDN: // https://cdn-for-bucket.com | ||
* monitor: // base.monitor instance | ||
* } | ||
*/ | ||
var Bucket = function(options) { | ||
|
@@ -23,11 +24,14 @@ var Bucket = function(options) { | |
assert(options.credentials, "credentials must be specified"); | ||
assert(!options.bucketCDN || typeof(options.bucketCDN) === 'string', | ||
"Expected bucketCDN to be a hostname or empty string for none"); | ||
assert(options.monitor, "options.monitor is required"); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. nit: this error message is a little different from the others... perhaps just "monitor must be given"? I'm using the bucket and credentials assertions above as an example. |
||
if (options.bucketCDN) { | ||
assert(/^https?:\/\//.test(options.bucketCDN), "bucketCDN must be http(s)"); | ||
assert(/[^\/]$/.test(options.bucketCDN), | ||
"bucketCDN shouldn't end with slash"); | ||
} | ||
// Store the monitor | ||
this.monitor = options.monitor; | ||
// Ensure access to the bucket property | ||
this.bucket = options.bucket; | ||
// Create S3 client | ||
|
@@ -149,11 +153,10 @@ Bucket.prototype.setupCORS = async function() { | |
debug("CORS already set for bucket: %s", this.bucket); | ||
return; | ||
} | ||
} | ||
catch (err) { | ||
} catch (err) { | ||
// Failed to fetch CORS, ignoring issue for now | ||
debug("[alert-operator] Failed to fetch CORS, err: %s, JSON: %j", | ||
err, err, err.stack); | ||
err.note = 'Failed to fetch CORS in bucket.js'; | ||
this.monitor.reportError(err, 'warning'); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. need to wait for this to complete? |
||
} | ||
|
||
// Set CORS | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -28,7 +28,7 @@ const RESOLVED_STATES = [ | |
* ignored as it implies a failed request (which wasn't retried) or a run | ||
* that was reclaimed. | ||
*/ | ||
class ClaimResolver extends events.EventEmitter { | ||
class ClaimResolver { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if this no longer extends EventEmitter, do you need to import events anymore? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. good point |
||
/** | ||
* Create ClaimResolver instance. | ||
* | ||
|
@@ -41,11 +41,10 @@ class ClaimResolver extends events.EventEmitter { | |
* pollingDelay: // Number of ms to sleep between polling | ||
* parallelism: // Number of polling loops to run in parallel | ||
* // Each handles up to 32 messages in parallel | ||
* monitor: // base.monitor instance | ||
* } | ||
*/ | ||
constructor(options) { | ||
super(); | ||
|
||
assert(options, "options must be given"); | ||
assert(options.Task.prototype instanceof data.Task, | ||
"Expected data.Task instance"); | ||
|
@@ -57,12 +56,14 @@ class ClaimResolver extends events.EventEmitter { | |
"Expected pollingDelay to be a number"); | ||
assert(typeof(options.parallelism) === 'number', | ||
"Expected parallelism to be a number"); | ||
assert(options.monitor !== null, 'options.monitor required!'); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same comment I gave with the other assertion |
||
this.Task = options.Task; | ||
this.queueService = options.queueService; | ||
this.dependencyTracker = options.dependencyTracker; | ||
this.publisher = options.publisher; | ||
this.pollingDelay = options.pollingDelay; | ||
this.parallelism = options.parallelism; | ||
this.monitor = options.monitor; | ||
|
||
// Promise that polling is done | ||
this.done = null; | ||
|
@@ -83,8 +84,13 @@ class ClaimResolver extends events.EventEmitter { | |
loops.push(this.poll()); | ||
} | ||
// Create promise that we're done looping | ||
this.done = Promise.all(loops).catch((err) => { | ||
this.emit('error', err); // This should crash the process | ||
this.done = Promise.all(loops).catch(async (err) => { | ||
console.log("Crashing the process: %s, as json: %j", err, err); | ||
// TODO: use this.monitor.reportError(err); when PR lands: | ||
// https://github.com/taskcluster/taskcluster-lib-monitor/pull/27 | ||
await this.monitor.reportError(err, 'error', {}, true); | ||
// Crash the process | ||
process.exit(1); | ||
}).then(() => { | ||
this.done = null; | ||
}); | ||
|
@@ -102,13 +108,14 @@ class ClaimResolver extends events.EventEmitter { | |
var messages = await this.queueService.pollClaimQueue(); | ||
debug("Fetched %s messages", messages.length); | ||
|
||
await Promise.all(messages.map((message) => { | ||
await Promise.all(messages.map(async (message) => { | ||
// Don't let a single task error break the loop, it'll be retried later | ||
// as we don't remove message unless they are handled | ||
return this.handleMessage(message).catch((err) => { | ||
debug("[alert-operator] Failed to handle message: %j" + | ||
", with err: %s, as JSON: %j", message, err, err, err.stack); | ||
}); | ||
try { | ||
await this.handleMessage(message); | ||
} catch (err) { | ||
this.monitor.reportError(err, 'warning'); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Before you were including the message in the alert-operator statement, do you not want to include that in the error you're reporting? |
||
} | ||
})); | ||
|
||
if(messages.length === 0 && !this.stopping) { | ||
|
@@ -147,11 +154,15 @@ class ClaimResolver extends events.EventEmitter { | |
|
||
// Check if this is the takenUntil we're supposed to be resolving for, if | ||
// this check fails, then the conditional load must have failed so we should | ||
// alert operator! | ||
// report an error | ||
if (task.takenUntil.getTime() !== takenUntil.getTime()) { | ||
debug("[alert-operator] Task takenUntil doesn't match takenUntil from " + | ||
"message, taskId: %s, task.takenUntil: %s, message.takenUntil: %s ", | ||
taskId, task.takenUntil.toJSON(), takenUntil.toJSON()); | ||
let err = new Error('Task takenUntil does not match takenUntil from ' + | ||
'message, taskId: ' + taskId + ' this only happens ' + | ||
'if conditional load does not work'); | ||
err.taskId = taskId; | ||
err.taskTakenUntil = task.takenUntil.toJSON(); | ||
err.messageTakenUntil = takenUntil.toJSON(); | ||
await this.monitor.reportError(err); | ||
return remove(); | ||
} | ||
|
||
|
@@ -161,8 +172,6 @@ class ClaimResolver extends events.EventEmitter { | |
if (!run) { | ||
// The run might not have been created, if the claimTask operation | ||
// failed | ||
debug("[not-a-bug] runId: %s does exists on taskId: %s, but " + | ||
"deadline message has arrived", runId, taskId); | ||
return; | ||
} | ||
|
||
|
@@ -191,8 +200,11 @@ class ClaimResolver extends events.EventEmitter { | |
|
||
// If the run isn't the last run, then something is very wrong | ||
if (task.runs.length - 1 !== runId) { | ||
debug("[alert-operator] running runId: %s, resolved exception, " + | ||
"but it wasn't the last run! taskId: ", runId, taskId); | ||
let err = new Error('Running runId: ' + runId + ' resolved exception,' + | ||
'but it was not the last run! taskId: ' + taskId); | ||
err.taskId = taskId; | ||
err.runId = runId; | ||
this.monitor.reportError(err); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. await? |
||
return; | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -279,7 +279,8 @@ let Artifact = base.Entity.configure({ | |
context: [ | ||
'blobStore', // BlobStore instance wrapping Azure Blob Storage | ||
'privateBucket', // Private artifact bucket wrapping S3 | ||
'publicBucket' // Public artifact bucket wrapping S3 | ||
'publicBucket', // Public artifact bucket wrapping S3 | ||
'monitor', // base.monitor instance | ||
] | ||
}); | ||
|
||
|
@@ -316,9 +317,12 @@ Artifact.prototype.remove = function(ignoreError) { | |
} else if (this.details.bucket === this.privateBucket.bucket) { | ||
deleted = this.privateBucket.deleteObject(this.details.prefix); | ||
} else { | ||
debug("[alert-operator] Expiring artifact with bucket: %s, which isn't " + | ||
"configured for use. Please investigate taskId: %s, runId: %s", | ||
this.details.bucket, this.taskId, this.runId); | ||
let err = new Error("Expiring artifact with bucket which isn't " + | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: you intermingle single and double quotes when you deal with strings, I suppose the linter stuff will pick up on this if there is the right rule for it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. linter PR will fix this... |
||
"configured for use. Please investigate!"); | ||
err.bucket = this.details.bucket; | ||
err.taskId = this.taskId; | ||
err.runId = this.runId; | ||
this.monitor.reportError(err); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. await? |
||
return; | ||
} | ||
} | ||
|
@@ -329,9 +333,12 @@ Artifact.prototype.remove = function(ignoreError) { | |
this.details.container, this.details.path); | ||
// Validate that this is the configured container | ||
if (this.details.container !== this.blobStore.container) { | ||
debug("[alert-operator] Expiring artifact with container: %s, which " + | ||
"configured for use. Please investigate taskId: %s, runId: %s", | ||
this.details.container, this.taskId, this.runId); | ||
let err = new Error("Expiring artifact with container which isn't " + | ||
"configured for use. Please investigate!"); | ||
err.container = this.details.container; | ||
err.taskId = this.taskId; | ||
err.runId = this.runId; | ||
this.monitor.reportError(err); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ok, so I think that this should be used with "await" but I'll refrain from commenting on it anymore just to reduce the noise. |
||
return; | ||
} | ||
deleted = this.blobStore.deleteBlob(this.details.path, true); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should wait for this promise to resolve?