Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Loading…

Task reclaim fix #19

Merged
merged 3 commits into from

1 participant

@lightsofapollo

No description provided.

@lightsofapollo

Landing for my gaia work... mostly some small fixes and cleanup... right now we can claim tasks but they are being claimed way to fast (sending tons of running events)

@lightsofapollo lightsofapollo merged commit 109bf3b into taskcluster:master
@lightsofapollo lightsofapollo deleted the lightsofapollo:task-reclaim-fix branch
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
This page is out of date. Refresh to see the latest.
View
1  docker_worker/node_modules/superagent-promise/.npmignore
@@ -0,0 +1 @@
+node_modules/
View
84 docker_worker/node_modules/superagent-promise/index.js
@@ -1,29 +1,93 @@
/**
-Promise wrapper for superagent
-*/
+ * Promise wrapper for superagent
+ */
-var superagent = require('superagent');
-var Request = superagent.Request;
-var Promise = require('promise');
+var superagent = require('superagent');
+var Promise = require('promise');
+/**
+ * Request object similar to superagent.Request, but with end() returning
+ * a promise.
+ */
function PromiseRequest() {
- Request.apply(this, arguments);
+ superagent.Request.apply(this, arguments);
}
-PromiseRequest.prototype = Object.create(Request.prototype);
+// Inherit form superagent.Request
+PromiseRequest.prototype = Object.create(superagent.Request.prototype);
+/** Send request and get a promise that `end` was emitted */
PromiseRequest.prototype.end = function() {
- var _super = Request.prototype.end;
+ var _super = superagent.Request.prototype.end;
var context = this;
return new Promise(function(accept, reject) {
_super.call(context, function(err, value) {
- if (err) return reject(err);
+ if (err) {
+ return reject(err);
+ }
accept(value);
});
});
};
-module.exports = function(method, url) {
+/**
+ * Request builder with same interface as superagent.
+ * It is convenient to import this as `request` in place of superagent.
+ */
+var request = function(method, url) {
return new PromiseRequest(method, url);
};
+
+/** Helper for making a get request */
+request.get = function(url, data) {
+ var req = request('GET', url);
+ if (data) {
+ req.query(data);
+ }
+ return req;
+};
+
+/** Helper for making a head request */
+request.head = function(url, data) {
+ var req = request('HEAD', url);
+ if (data) {
+ req.send(data);
+ }
+ return req;
+};
+
+/** Helper for making a delete request */
+request.del = function(url) {
+ return request('DELETE', url);
+};
+
+/** Helper for making a patch request */
+request.patch = function(url, data) {
+ var req = request('PATCH', url);
+ if (data) {
+ req.send(data);
+ }
+ return req;
+};
+
+/** Helper for making a post request */
+request.post = function(url, data) {
+ var req = request('POST', url);
+ if (data) {
+ req.send(data);
+ }
+ return req;
+};
+
+/** Helper for making a put request */
+request.put = function(url, data) {
+ var req = request('PUT', url);
+ if (data) {
+ req.send(data);
+ }
+ return req;
+};
+
+// Export the request builder
+module.exports = request;
View
86 docker_worker/node_modules/superagent-promise/index_test.js
@@ -1,18 +1,37 @@
suite('superagent-promise', function() {
- var assert = require('assert');
- var agent = require('./');
- var http = require('http');
+ var assert = require('assert');
+ var request = require('./');
+ var http = require('http');
+ var debug = require('debug')('index_test');
// start the server
var server;
- var body = 'woot';
+ var successBody = 'woot';
+ var errorBody = 'Not Found';
setup(function(done) {
server = http.createServer(function(req, res) {
- res.writeHead(200, {
- 'Content-Length': body.length,
- 'Content-Type': 'text/plain'
- });
- res.end(body);
+ if (/success$/.test(req.url)) {
+ debug("Responding with 200");
+ res.writeHead(200, {
+ 'Content-Length': successBody.length,
+ 'Content-Type': 'text/plain'
+ });
+ res.end(successBody);
+ } else if(/NotFound$/.test(req.url)) {
+ debug("Responding with 404");
+ res.writeHead(404, {
+ 'Content-Length': errorBody.length,
+ 'Content-Type': 'text/plain'
+ });
+ res.end(errorBody);
+ } else if(/error$/.test(req.url)) {
+ debug("Responding with 200, but mismatching Content-Length");
+ res.writeHead(404, {
+ 'Content-Length': successBody.length - 2,
+ 'Content-Type': 'text/plain'
+ });
+ res.end(successBody);
+ }
});
server.listen(0, done);
@@ -25,11 +44,11 @@ suite('superagent-promise', function() {
test('issue request', function(done) {
var addr = server.address();
- var url = 'http://' + addr.address + ':' + addr.port;
+ var url = 'http://' + addr.address + ':' + addr.port + "/success";
- agent('GET', url).end().then(
+ request('GET', url).end().then(
function(res) {
- assert.equal(res.text, body);
+ assert.equal(res.text, successBody);
done();
},
@@ -38,4 +57,47 @@ suite('superagent-promise', function() {
}
);
});
+
+ test('issue request with .get', function(done) {
+ var addr = server.address();
+ var url = 'http://' + addr.address + ':' + addr.port + "/success";
+
+ request.get(url).end().then(
+ function(res) {
+ assert.equal(res.text, successBody);
+ done();
+ },
+
+ function(err) {
+ done(err);
+ }
+ );
+ });
+
+ test('issue 404 request', function(done) {
+ var addr = server.address();
+ var url = 'http://' + addr.address + ':' + addr.port + "/NotFound";
+
+ request('GET', url).end().then(function(res) {
+ assert.ok(!res.ok);
+ assert.equal(res.text, errorBody);
+ done();
+ }, function(err) {
+ console.log(err);
+ done(err);
+ });
+ });
+
+ test('test error', function(done) {
+ var addr = server.address();
+ var url = 'http://' + addr.address + ':' + addr.port + "/error";
+
+ request('GET', url).end().then(function(res) {
+ assert.ok(false);
+ done();
+ }, function(err) {
+ assert.ok(err);
+ done();
+ });
+ });
});
View
17 docker_worker/node_modules/superagent-promise/package.json
@@ -1,10 +1,10 @@
{
"name": "superagent-promise",
- "version": "0.0.0",
+ "version": "0.1.0",
"description": "superagent promise wrapper",
"main": "index.js",
"scripts": {
- "test": "./node_modules/.bin/mocha --ui tdd index_test.js"
+ "test": "mocha --ui tdd index_test.js"
},
"repository": {
"type": "git",
@@ -27,15 +27,18 @@
"promise": ">= 3.2.0"
},
"devDependencies": {
- "mocha": "~1.17.1"
+ "mocha": "~1.17.1",
+ "superagent": ">= 0.16.0",
+ "promise": ">= 3.2.0",
+ "debug": "0.7.4"
},
"readme": "superagent-promise\n==================\n\nSimple/dumb promise wrapper for superagent. Both `superagent` and\n`promise` are peerDependencies. The `.get`, `.del`, etc.. helper methods\nare not present here.\n\n\n## Usage\n\n```js\nvar agent = require('superagent-promise');\n\nagent('GET', 'http://google.com').end().then(\n function onResult() {\n \n }\n);\n```\n",
"readmeFilename": "README.md",
"homepage": "https://github.com/lightsofapollo/superagent-promise",
- "_id": "superagent-promise@0.0.0",
+ "_id": "superagent-promise@0.1.0",
"dist": {
- "shasum": "9ffb4d8495b980877e1a9a2598ba8a4485951907"
+ "shasum": "39bbfc7c2304dced1c078205ef90efbe66cadb5d"
},
- "_from": "superagent-promise@0.0.0",
- "_resolved": "https://registry.npmjs.org/superagent-promise/-/superagent-promise-0.0.0.tgz"
+ "_from": "superagent-promise@0.1.0",
+ "_resolved": "https://registry.npmjs.org/superagent-promise/-/superagent-promise-0.1.0.tgz"
}
View
2  docker_worker/package.json
@@ -23,7 +23,7 @@
"promise": "~4.0.0",
"dockerode-promise": "0.0.1",
"superagent": "~0.16.0",
- "superagent-promise": "0.0.0",
+ "superagent-promise": "0.1.0",
"commander": "~2.1.0",
"debug": "~0.7.4",
"dockerode-process": "0.3.1",
View
294 docker_worker/taskrun.js
@@ -1,16 +1,19 @@
var Promise = require('promise');
-var request = require('superagent');
+var request = require('superagent-promise');
var fs = require('fs');
var mime = require('mime');
var debug = require('debug')('taskrun');
var assert = require('assert');
var queue = require('./queue');
-/**
- * Minimum time remaining until `takenUntil` expires before reclaim is
- * initialized, if `keepTask()` is used.
- */
-var RECLAIM_TIME = 1000 * 60 * 3;
+
+// XXX: This code should live in the client not in the worker
+function handleRequestError(res) {
+ if (res.error) {
+ // XXX: we can (and actually do) better in the client...
+ throw res.error;
+ }
+}
/**
* Create a new TaskRun instance, this class help you keep a task run, upload
@@ -39,29 +42,28 @@ var TaskRun = function(owner, task, status, runId, logsPutUrl, resultPutUrl) {
* reimplementing the timing logic.
*/
TaskRun.prototype.reclaimTask = function() {
- var that = this;
- var taskId = that.status.taskId;
- return new Promise(function(accept, reject) {
- var url = queue.queueUrl('/task/' + taskId + '/claim');
- request
- .post(url)
- .send({
- workerGroup: that.owner.workerGroup,
- workerId: that.owner.workerId,
- runId: that._runId
- })
- .end(function(res) {
- if (!res.ok) {
- debug("Failed to reclaim task: %s", taskId);
- return reject();
- }
- debug("Successfully, reclaimed task: %s", taskId);
- that.status = res.body.status;
- that._logsPutUrl = res.body.logsPutUrl;
- that._resultPutUrl = res.body.resultPutUrl;
- accept();
- });
- });
+ var taskId = this.status.taskId;
+ var url = queue.queueUrl('/task/' + taskId + '/claim');
+
+ return request
+ .post(url)
+ .send({
+ workerGroup: this.owner.workerGroup,
+ workerId: this.owner.workerId,
+ runId: this._runId
+ })
+ .end()
+ .then(function(res) {
+ if (res.error) {
+ debug("Failed to reclaim task: %s", taskId, res.text);
+ throw res.error;
+ }
+
+ debug("Successfully, reclaimed task: %s", taskId);
+ this.status = res.body.status;
+ this._logsPutUrl = res.body.logsPutUrl;
+ this._resultPutUrl = res.body.resultPutUrl;
+ }.bind(this));
};
/**
@@ -71,27 +73,34 @@ TaskRun.prototype.reclaimTask = function() {
* The optional argument `abortCallback` will be called if a reclaim fails.
*/
TaskRun.prototype.keepTask = function(abortCallback) {
- var that = this;
- var reclaim = null;
- // Function to set reclaim timeout
- var setReclaimTimeout = function() {
- that._reclaimTimeoutHandle = setTimeout(reclaim,
- (new Date(that.status.takenUntil)).getTime() -
- (new Date()).getTime() - RECLAIM_TIME
- );
- };
// Function to reclaim and set reclaim timeout again
- reclaim = function() {
- that.reclaimTask().then(setReclaimTimeout, function() {
- // TODO: This is a little aggressive, we should allow it to fail a few
- // times before we abort... And we should check the error code, 404
- // Task not found, means task completed or canceled, in which case we
- // really should abort immediately
- if (abortCallback) {
- abortCallback();
- }
- });
- };
+ var reclaim = function() {
+ this.reclaimTask().
+ then(setReclaimTimeout).
+ catch(function(err) {
+ console.error('Error while attempting to issue claim');
+ console.error(err.stack);
+
+ // TODO: This is a little aggressive, we should allow it to fail a few
+ // times before we abort... And we should check the error code, 404
+ // Task not found, means task completed or canceled, in which case we
+ // really should abort immediately
+ abortCallback && abortCallback();
+ });
+ }.bind(this);
+
+ var setReclaimTimeout = function() {
+ // calculate when (in absolute time) when to issue the claim
+ var takenUntil = new Date(this.status.takenUntil);
+
+ // calculate the time in milliseconds from now. Default to now if its in the
+ // past.
+ var nextTick = Math.max((takenUntil.valueOf() - Date.now()) * 0.7, 0);
+ debug('rescheduling reclaim', { nextTick: nextTick });
+ this._reclaimTimeoutHandle = setTimeout(reclaim, nextTick);
+
+ }.bind(this);
+
// Set reclaim time out
setReclaimTimeout();
};
@@ -107,40 +116,22 @@ TaskRun.prototype.clearKeepTask = function() {
/** Put logs.json for current run, returns promise of success */
TaskRun.prototype.putLogs = function(json) {
- var that = this;
- return new Promise(function(accept, reject) {
- debug("Uploading logs.json to signed PUT URL");
- request
- .put(that._logsPutUrl)
- .send(json)
- .end(function(res) {
- if (!res.ok) {
- debug("Failed to upload logs.json, error: %s", res.text)
- return reject();
- }
- debug("Successfully, uploaded logs.json");
- accept();
- });
- });
+ debug('Uploading logs.json to signed PUT URL');
+ return request
+ .put(this._logsPutUrl)
+ .send(json)
+ .end()
+ .then(handleRequestError);
};
/** Put result.json for current run, returns promise of success */
TaskRun.prototype.putResult = function(json) {
- var that = this;
- return new Promise(function(accept, reject) {
- debug("Uploading result.json to PUT URL");
- request
- .put(that._resultPutUrl)
- .send(json)
- .end(function(res) {
- if (!res.ok) {
- debug("Failed to upload logs.json, error: %s", res.text)
- return reject();
- }
- debug("Successfully, uploaded result.json");
- accept();
- });
- });
+ debug("Uploading result.json to PUT URL");
+ return request
+ .put(this._resultPutUrl)
+ .send(json)
+ .end()
+ .then(handleRequestError);
};
/**
@@ -163,63 +154,22 @@ TaskRun.prototype.putArtifact = function(name, filename, contentType) {
accept(stat);
});
}).then(function(stat) {
- return new Promise(function(accept, reject) {
- // Lookup mimetype if not provided
- if (!contentType) {
- contentType = mime.lookup(filename);
- }
-
- // Create artifacts map to submit
- var artifacts = {};
- artifacts[name] = {
- contentType: contentType
- };
-
- // Construct request URL for fetching signed artifact PUT URLs
- var url = queue.queueUrl('/task/' + that.status.taskId + '/artifact-urls');
+ // Lookup mimetype if not provided
+ if (!contentType) {
+ contentType = mime.lookup(filename);
+ }
- // Request artifact put urls
- request
- .post(url)
- .send({
- workerGroup: that.owner.workerGroup,
- workerId: that.owner.workerId,
- runId: that._runId,
- artifacts: artifacts
- })
- .end(function(res) {
- if (!res.ok) {
- debug("Failed get a signed artifact URL, errors: %s", res.text);
- return reject(res.text);
- }
- debug("Got signed artifact PUT URL from queue");
- var artifactUrls = res.body.artifacts[name];
- var req = request
- .put(artifactUrls.artifactPutUrl)
- .set('Content-Type', contentType)
- .set('Content-Length', stat.size);
- fs.createReadStream(filename).pipe(req, {end: false});
- req.end(function(res) {
- if (!res.ok) {
- debug("Failed to upload to signed artifact PUT URL");
- return reject(res.text);
- }
- debug("Successfully uploaded artifact %s to PUT URL", name);
- accept(artifactUrls[name].artifactUrl);
- });
- });
- });
- });
-};
+ // Create artifacts map to submit
+ var artifacts = {};
+ artifacts[name] = {
+ contentType: contentType
+ };
-TaskRun.prototype.getArtifactPutUrls = function(artifacts) {
- var that = this;
- return new Promise(function(accept, reject) {
// Construct request URL for fetching signed artifact PUT URLs
var url = queue.queueUrl('/task/' + that.status.taskId + '/artifact-urls');
// Request artifact put urls
- request
+ return request
.post(url)
.send({
workerGroup: that.owner.workerGroup,
@@ -227,41 +177,77 @@ TaskRun.prototype.getArtifactPutUrls = function(artifacts) {
runId: that._runId,
artifacts: artifacts
})
- .end(function(res) {
- if (!res.ok) {
+ .end()
+ .then(function(res) {
+ if (res.error) {
debug("Failed get a signed artifact URL, errors: %s", res.text);
- return reject(res.text);
+ throw res.error;
}
debug("Got signed artifact PUT URL from queue");
- accept(res.body.artifacts);
+ var artifactUrls = res.body.artifacts[name];
+ var req = request
+ .put(artifactUrls.artifactPutUrl)
+ .set('Content-Type', contentType)
+ .set('Content-Length', stat.size);
+ fs.createReadStream(filename).pipe(req, {end: false});
+ return req.end().then(function(res) {
+ if (!res.ok) {
+ debug("Failed to upload to signed artifact PUT URL");
+ return reject(res.text);
+ }
+ debug("Successfully uploaded artifact %s to PUT URL", name);
+ accept(artifactUrls[name].artifactUrl);
+ });
});
});
};
+TaskRun.prototype.getArtifactPutUrls = function(artifacts) {
+ // Construct request URL for fetching signed artifact PUT URLs
+ var url = queue.queueUrl('/task/' + this.status.taskId + '/artifact-urls');
+
+ // Request artifact put urls
+ return request
+ .post(url)
+ .send({
+ workerGroup: this.owner.workerGroup,
+ workerId: this.owner.workerId,
+ runId: this._runId,
+ artifacts: artifacts
+ })
+ .end()
+ .then(function(res) {
+ if (res.error) {
+ debug("Failed get a signed artifact URL, errors: %s", res.text);
+ throw res.error;
+ }
+ debug("Got signed artifact PUT URL from queue");
+ return res.body.artifacts;
+ });
+};
+
/** Report task completed, returns promise of success */
TaskRun.prototype.taskCompleted = function() {
this.clearKeepTask();
- var that = this;
- return new Promise(function(accept, reject) {
- var url = queue.queueUrl('/task/' + that.status.taskId + '/completed');
- request
- .post(url)
- .send({
- workerGroup: that.owner.workerGroup,
- workerId: that.owner.workerId,
- runId: that._runId
- })
- .end(function(res) {
- if (!res.ok) {
- debug("Failed to report task as completed, error code: %s", res.status);
- return reject(res.text);
- }
- debug("Successfully reported task completed");
- accept();
- });
- });
+
+ var url = queue.queueUrl('/task/' + this.status.taskId + '/completed');
+ return request
+ .post(url)
+ .send({
+ workerGroup: this.owner.workerGroup,
+ workerId: this.owner.workerId,
+ runId: this._runId
+ })
+ .end()
+ .then(function(res) {
+ if (res.error) {
+ debug("Failed to report task as completed, error code: %s", res.status);
+ throw res.error;
+ }
+ debug("Successfully reported task completed");
+ });
};
// Export TaskRun
-module.exports = TaskRun;
+module.exports = TaskRun;
View
18 docker_worker/test/integration/reclaim_test.js
@@ -0,0 +1,18 @@
+suite('reclaim timeout', function() {
+ var testworker = require('../testworker');
+
+ test('extract artifacts', function() {
+ return testworker.submitTaskAndGetResults({
+ image: 'ubuntu',
+ command: ['/bin/bash', '-c', 'sleep 65'], // terrible but there is no other way to
+ features: {
+ bufferLog: true
+ },
+ }).then(function(data) {
+ var result = data.result;
+ // XXX: result clearly is not a great name for both levels here.
+ var output = result.result;
+ assert.equal(output.exitCode, 0);
+ });
+ });
+});
View
3  docker_worker/test/testworker.js
@@ -89,7 +89,8 @@ exports.submitTaskAndGetResults = function(payload) {
workerType: workerType,
owner: 'unknown@localhost.local',
name: 'Task from docker-worker test suite',
- deadline: 1
+ deadline: 1,
+ timeout: 30
});
});
Something went wrong with that request. Please try again.