Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature coalescing support #23

Merged
merged 9 commits into from
Apr 6, 2016
7 changes: 5 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ RUN apt-get update --fix-missing && \
lxc \
aufs-tools && \
apt-get clean && \
rm -rf /var/cache/apt/archives/* /var/lib/apt/lists/*
rm -rf /var/cache/apt/archives/* /var/lib/apt/lists/* && \
wget -O /usr/bin/dumb-init https://github.com/Yelp/dumb-init/releases/download/v1.0.0/dumb-init_1.0.0_amd64 && \
chmod +x /usr/bin/dumb-init

##SSH Folder for known_hosts
RUN mkdir -p /root/.ssh && chmod 500 /root/.ssh && chown -R root:root /root/.ssh
Expand Down Expand Up @@ -70,5 +72,6 @@ EXPOSE 8080

VOLUME /var/lib/docker

ENTRYPOINT ["/usr/sbin/supervisord-wrapper.sh"]
ENTRYPOINT ["/usr/bin/dumb-init", "/usr/sbin/supervisord-wrapper.sh"]

CMD [""]
1 change: 1 addition & 0 deletions config/config-docker.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
}
},
"concurrentJobs" : 2,
"colaescingPeriod": 30000,
"swf":{
"region": "us-west-1",
"defaultTasklist": "build-image-list"
Expand Down
1 change: 1 addition & 0 deletions config/config-test.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
}
},
"concurrentJobs" : 2,
"colaescingPeriod": 30000,
"swf":{
"region": "us-west-1",
"defaultTasklist": "build-image-list"
Expand Down
1 change: 1 addition & 0 deletions config/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
}
},
"concurrentJobs" : 2,
"colaescingPeriod": 30000,
"swf":{
"region": "us-west-1",
"defaultTasklist": "build-image-list"
Expand Down
2 changes: 1 addition & 1 deletion lib/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ module.exports = Object.freeze({
STATUS_PENDING: 'pending',
STATUS_RUNNING: 'running',
STATUS_SUCCESS: 'success',
STATUS_FAILURE: 'failure'
STATUS_FAILURE: 'failure',
},

SWF: {
Expand Down
76 changes: 72 additions & 4 deletions lib/factory.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ var winston = require('winston'),
JobRunner = require('./job'),
corsResponse = require('./restify/cors'),
usingSignedRequest = require('./restify/authorize-signature'),
constants = require('./constants'),
events = require('events'),
util = require('util'),
zSchema = require('z-schema'),
Expand Down Expand Up @@ -64,6 +65,7 @@ var DEFAULT_CONFIG = {
}
},
concurrentJobs: 1,
coalescingPeriod: 30000,
hook: {
secret: process.env.HOOK_SECRET || 'changeit',
postUrl: process.env.HOOK_POST_URL || ''
Expand Down Expand Up @@ -353,10 +355,11 @@ Factory.prototype.stop = function stop() {
*/
Factory.prototype.newJob = function newJob(context) {
var _this = this;
context.tag = context.tag || context.commit;
var job = _this.jobRunner.newJob(context);

// Store the Job in the datastore
_this.jobs.put(job.id, job);

// Add the Job to the job queue
_this.jobQueue.push(job, function(err) {
if(err) {
Expand Down Expand Up @@ -412,16 +415,81 @@ Factory.prototype.getImage = function getImage(job) {
*/
Factory.prototype.createJob = function createJob(req, res, context, next) {
var _this = this;
try {
// Create a new job
var job = _this.newJob(context);
context.tag = context.tag || context.commit;

function sendResponse(job) {
res.header('Content-Type', 'application/vnd.sh.melt.cdp.if.job.v1+json');
res.header('Link', '</_schema/job>; rel="describedBy"');
res.send(201, _this.toPublicJob(job));
}

try {
// Lump duplicate requests together to save resources
var originalJob = _this.coalesceRequests(res, context);
if(originalJob) {
winston.log('debug', 'Coalescing into ' + originalJob.id);
sendResponse(originalJob);
} else {
// Otherwise create a new job
var job = _this.newJob(context);
sendResponse(job);
}
return next();
} catch (e) {
winston.log('error', 'Error on POST /job due to: ' + e.message);
return next(new restify.InternalError(e.message));
}
};

/**
* Returns an array of job objects
* @returns [] of jobs
*/
Factory.prototype.getJobs = function getJobs() {
return _.values(this.jobs.get());
};

/**
* Checks for duplicate requests and coalesces them if found
* @param res Response object
* @param context Job context holding job parameters (with defaults applied)
* @returns The original job {} if duplicates found, false if not
*/
Factory.prototype.coalesceRequests = function coalesceRequests(res, context) {
var _this = this;

// Check for duplicate context in an array of jobs
function checkIfDuplicate(jobs) {
return _.find(jobs, {'context': context});
}

// Return an array of job that can be coalesced into
function coalescableJobs(jobs) {

// Filters out pending, running, or finished jobs within the coalescing period
function otherJobFilter(job, coalescingPeriod) {
var coalescingPeriod = _this.config.coalescingPeriod || 0;
var now = Date.now();
var diff = now - job.endTime.getTime();

return job.status === constants.EVENTS.STATUS_PENDING ||
job.status === constants.EVENTS.STATUS_RUNNING ||
diff <= coalescingPeriod;
}

var filteredJobs = jobs.filter(jobFilter);
return filteredJobs;
}

// An array of job objects
var jobs = this.getJobs();

// If duplicate job, attach responder to that job
var originalJob = checkIfDuplicate(coalescableJobs(jobs));
if(originalJob) {
return originalJob;
}

return false;
};

52 changes: 46 additions & 6 deletions test/integration/api.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,10 @@ var factoryConfig = {
push: {
cmd: 'true',
timeout: 10000
},
concurrentJobs: 1
}
}
},
concurrentJobs: 1,
coalescingPeriod: 5000
};

var MINIMUM_BUILD_REQUEST = {
Expand Down Expand Up @@ -149,6 +150,38 @@ describe('Image Factory - REST API', function () {
);
});

it('should create a new Job and coalesce a second Job when POST /job twice with identical payloads within the coalescing period', function (done) {

async.times(
2,
function (n, next) {
request.post(
BASE_URL + '/job',
{ json: MINIMUM_BUILD_REQUEST },
function (err, response, body) {
next(err);
}
);
},
function (err, results) {
request.get(
BASE_URL + '/job',
function (err, response, body) {
expect(err).to.not.exist;
expect(response.statusCode).to.equal(200);
expect(response.headers).to.include.keys('link');
expect(response.headers['link']).to.contain('</_schema/job-list>; rel="describedBy"');
expect(response.headers).to.include.keys('content-type');
expect(response.headers['content-type']).to.contain('application/vnd.sh.melt.cdp.if.job-list.v1+json');
var results = JSON.parse(body);
expect(results).to.have.length(1);
done();
}
);
}
);
});

it('should create a new Job that is pending in queue while a previous job is building when POST /job with one concurrent job', function (done) {

// Creates the first job
Expand Down Expand Up @@ -346,7 +379,6 @@ describe('Image Factory - REST API', function () {
}
});


});

describe('GET /job/{id}/log', function () {
Expand Down Expand Up @@ -388,17 +420,25 @@ describe('Image Factory - REST API', function () {
});

});

describe('GET /job', function () {

it('should return a list of known Jobs when requesting GET /job', function (done) {

async.times(
5,
function (n, next) {
// Need to differentiate the requests because of coalescing
var owner = "fake-owner-" + n.toString();
var repo = "fake-repo-" + n.toString();
request.post(
BASE_URL + '/job',
{ json: FULL_BUILD_REQUEST },
{
json: {
owner: owner,
repo: repo
}
},
function (err, response, body) {
next(err);
}
Expand Down