Skip to content

Commit

Permalink
let the worker process handle the expensive bulk upsert call
Browse files Browse the repository at this point in the history
  • Loading branch information
shriramshankar committed Nov 10, 2016
1 parent 1239a21 commit 372f5e4
Show file tree
Hide file tree
Showing 13 changed files with 356 additions and 4 deletions.
1 change: 1 addition & 0 deletions Procfile
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
web: npm start
clock: npm run start-clock
worker: npm run start-worker
18 changes: 15 additions & 3 deletions api/v1/controllers/samples.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
*/
'use strict';

const featureToggles = require('feature-toggles');
const helper = require('../helpers/nouns/samples');
const doDelete = require('../helpers/verbs/doDelete');
const doFind = require('../helpers/verbs/doFind');
Expand All @@ -21,7 +22,11 @@ const doPut = require('../helpers/verbs/doPut');
const u = require('../helpers/verbs/utils');
const httpStatus = require('../constants').httpStatus;
const logAPI = require('../../../utils/loggingUtil').logAPI;

const jobType = featureToggles.isFeatureEnabled('useWorkerProcess') === true ?
require('../../../jobQueue/setup').jobType : null;
const jobWrapper =
featureToggles.isFeatureEnabled('useWorkerProcess') === true ?
require('../../../jobQueue/jobWrapper'): null;
module.exports = {

/**
Expand Down Expand Up @@ -133,15 +138,22 @@ module.exports = {
* POST /samples/upsert/bulk
*
* Upserts multiple samples. Returns "OK" without waiting for the upserts to
* happen.
* happen. The bulk upsert is sent to the queue to be later processed by
* the workers, if they are enabled.
*
* @param {IncomingMessage} req - The request object
* @param {ServerResponse} res - The response object
* @returns {ServerResponse} - The response object indicating merely that the
* bulk upsert request has been received.
*/
bulkUpsertSample(req, res /* , next */) {
helper.model.bulkUpsertByName(req.swagger.params.queryBody.value);
if (jobType && jobWrapper) {
jobWrapper.createJob(jobType.BULKUPSERTSAMPLES,
req.swagger.params.queryBody.value);
} else {
helper.model.bulkUpsertByName(req.swagger.params.queryBody.value);
}

if (helper.loggingEnabled) {
logAPI(req, helper.modelName);
}
Expand Down
3 changes: 3 additions & 0 deletions clock/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ const conf = require('../config');
const env = conf.environment[conf.nodeEnv];
const dbSample = require('../db/index').Sample;

const clockStarted = 'Clock process started';
console.log(clockStarted); // eslint-disable-line no-console

if (featureToggles.isFeatureEnabled('enableClockDyno')) {
setInterval(() => dbSample.doTimeout(), env.checkTimeoutIntervalMillis);
}
2 changes: 2 additions & 0 deletions config/migrationConfig.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
* /config/migrationConfig
* Migration config parameters
*/
'use strict'; // eslint-disable-line strict

const conf = require('../config');

module.exports = {
Expand Down
1 change: 1 addition & 0 deletions config/passportconfig.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
* ./passport.js
* Passport strategies
*/
'use strict'; // eslint-disable-line strict

const LocalStrategy = require('passport-local').Strategy;
const User = require('../db/index').User;
Expand Down
3 changes: 3 additions & 0 deletions config/toggles.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ const longTermToggles = {
// Enable heroku clock dyno
enableClockDyno: environmentVariableTrue(pe, 'HEROKU_CLOCK_DYNO'),

// Use worker process
useWorkerProcess: environmentVariableTrue(pe, 'USE_WORKER_PROCESS'),

}; // longTermToggles

/*
Expand Down
64 changes: 64 additions & 0 deletions jobQueue/jobWrapper.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/**
* Copyright (c) 2016, salesforce.com, inc.
* All rights reserved.
* Licensed under the BSD 3-Clause license.
* For full license text, see LICENSE.txt file in the repo root or
* https://opensource.org/licenses/BSD-3-Clause
*/

/**
* /jobQueue/jobWrapper.js
*
* Expsoses a wrapper to create jobs and save them to the redis queue for
* it to be processed by the workers later. The "kue" library defined in the
* setup is used for this purpose.
*/

'use strict'; // eslint-disable-line strict

const jobQueue = require('./setup').jobQueue;

const delayToRemoveJobs = 3000;

/**
* Listen for a job completion event and clean up (remove) the job. The delay
* is introduced to avoid the job.id leakage.
* TODO: This needs to be moved to the clock process, once we start exposing
* APIs to monitor the jobs.
* @param {Object} job - Job object to be cleaned up from the queue
*
*/
function cleanUpJobOnComplete(job) {
if (job) {
job.on('complete', () => {
setTimeout(() => {
job.remove();
}, delayToRemoveJobs);
});
}
}

/**
* Creates a job to be prossed using the KUE api, when given the jobName and
* data to be processed by the job
* @param {String} jobName - Name of the job (A worker process should be
* listning to this jobName for it to be processed)
* @param {Json} data - Data for the job to work with.
* @returns {Object} - A job object
*/
function createJob(jobName, data) {
// jobQueue.create will return undefined when it is created in the test mode
const job = jobQueue.create(jobName, data)
.save((err) => {
if (err) {
throw new Error('There was a problem in adding the job: ' + jobName +
', with id:' + job.id + 'to the worker queue');
}
});
cleanUpJobOnComplete(job);
return job;
}
module.exports = {
jobQueue,
createJob
};
42 changes: 42 additions & 0 deletions jobQueue/setup.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/**
* Copyright (c) 2016, salesforce.com, inc.
* All rights reserved.
* Licensed under the BSD 3-Clause license.
* For full license text, see LICENSE.txt file in the repo root or
* https://opensource.org/licenses/BSD-3-Clause
*/

/**
* /jobQueue/setup.js
*
* Setup the "Kue" library to process background jobs
*/

'use strict'; // eslint-disable-line strict

const conf = require('../config');
const env = conf.environment[conf.nodeEnv];
const urlParser = require('url');
const kue = require('kue');

let redisUrl = env.redisUrl;
const redisOptions = {};
if (redisUrl) {
const redisInfo = urlParser.parse(redisUrl, true);

if (redisInfo.protocol !== 'redis:') {
redisUrl = 'redis:' + redisUrl;
}

redisOptions.redis = redisUrl;
}

// create a job queue using the redis options specified
const jobQueue = kue.createQueue(redisOptions);

module.exports = {
jobQueue,
jobType: {
BULKUPSERTSAMPLES: 'bulkUpsertSamples',
}
};
6 changes: 5 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
"view": "NODE_ENV=production gulp browserifyViews && npm start",
"start": "node --max_old_space_size=1500 --nouse-idle-notification index.js",
"start-clock": "node --max_old_space_size=1500 --nouse-idle-notification clock/index.js",
"start-worker": "node --max_old_space_size=1500 --nouse-idle-notification worker/jobProcessor.js",
"dropdb": "node db/createOrDropDb.js -d",
"initdb": "node db/createOrDropDb.js -i",
"resetdb": "node db/reset.js",
Expand All @@ -29,12 +30,13 @@
"undo-migratedb": "node db/migrateUndo.js",
"test-api": "mocha -R dot --recursive tests/api",
"test-disablehttp": "DISABLE_HTTP=true mocha -R dot --recursive tests/disableHttp",
"test-jobQueue": "USE_WORKER_PROCESS=true mocha -R dot --recursive tests/jobQueue",
"test-token-req": "NODE_ENV=testTokenReq mocha -R dot --recursive tests/tokenReq",
"test-token-notreq": "NODE_ENV=testTokenNotReq mocha -R dot --recursive tests/tokenNotReq",
"test-db": "npm run resetdb && mocha -R dot --recursive tests/db",
"test-realtime": "mocha -R dot --recursive tests/realtime",
"test-view": "NODE_ENV=test mocha -R dot --recursive --compilers js:babel-core/register --require ./tests/view/setup.js tests/view",
"test": "istanbul cover ./node_modules/mocha/bin/_mocha --report lcovonly -- -R dot --recursive tests/api tests/db tests/config && cat ./coverage/lcov.info | ./node_modules/coveralls/bin/coveralls.js && rm -rf ./coverage && npm run test-view && npm run test-realtime && npm run test-token-req && npm run test-token-notreq && npm run test-disablehttp",
"test": "istanbul cover ./node_modules/mocha/bin/_mocha --report lcovonly -- -R dot --recursive tests/api tests/db tests/config && cat ./coverage/lcov.info | ./node_modules/coveralls/bin/coveralls.js && rm -rf ./coverage && npm run test-view && npm run test-realtime && npm run test-token-req && npm run test-token-notreq && npm run test-disablehttp && npm run test-jobQueue",
"postinstall": "NODE_ENV=production gulp browserifyViews && gulp movecss && gulp movesocket && gulp movelensutil",
"prestart": "npm run checkdb"
},
Expand Down Expand Up @@ -80,6 +82,7 @@
"jscs": "^3.0.7",
"jsdom": "^9.8.3",
"jsonwebtoken": "^5.7.0",
"kue": "^0.11.5",
"lodash": "^4.0.0",
"mocha": "^2.3.4",
"multer": "^1.0.3",
Expand Down Expand Up @@ -122,6 +125,7 @@
"swagger-tools": "^0.10.1",
"throng": "^1.0.1",
"underscore": "^1.8.3",
"url": "^0.11.0",
"validator": "^4.0.2",
"vinyl-source-stream": "^1.1.0",
"webpack": "^1.12.10",
Expand Down
129 changes: 129 additions & 0 deletions tests/jobQueue/v1/bulkUpsert.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/**
* Copyright (c) 2016, salesforce.com, inc.
* All rights reserved.
* Licensed under the BSD 3-Clause license.
* For full license text, see LICENSE.txt file in the repo root or
* https://opensource.org/licenses/BSD-3-Clause
*/

/**
* tests/jobQueue/v1/bulkUpsert.js
*/
'use strict';

const jobQueue = require('../../../jobQueue/setup').jobQueue;
const jobType = require('../../../jobQueue/setup').jobType;
const expect = require('chai').expect;
const supertest = require('supertest');
const api = supertest(require('../../../index').app);
const tu = require('../../testUtils');
const u = require('./utils');
const constants = require('../../../api/v1/constants');
const Aspect = tu.db.Aspect;
const Subject = tu.db.Subject;
const path = '/v1/samples/upsert/bulk';
describe('api: POST ' + path, () => {
let token;

before((done) => {
tu.createToken()
.then((returnedToken) => {
token = returnedToken;
done();
})
.catch((err) => done(err));
});
// force the job queue to enter the test mode.
beforeEach((done) => {
jobQueue.testMode.enter();
done();
});

before((done) => {
Aspect.create({
isPublished: true,
name: `${tu.namePrefix}Aspect1`,
timeout: '30s',
valueType: 'NUMERIC',
criticalRange: [0, 1],
})
.then(() => Aspect.create({
isPublished: true,
name: `${tu.namePrefix}Aspect2`,
timeout: '10m',
valueType: 'BOOLEAN',
okRange: [10, 100],
}))
.then(() => Subject.create({
isPublished: true,
name: `${tu.namePrefix}Subject`,
}))
.then(() => done())
.catch((err) => done(err));
});


afterEach(() => {
jobQueue.testMode.clear();
});

after(u.forceDelete);
after(tu.forceDeleteUser);

it('sample bulkUpsert should be sent to the queue', (done) => {
api.post(path)
.set('Authorization', token)
.send([
{
name: `${tu.namePrefix}Subject|${tu.namePrefix}Aspect1`,
value: '2',
}, {
name: `${tu.namePrefix}Subject|${tu.namePrefix}Aspect2`,
value: '4',
},
])
.expect(constants.httpStatus.OK)
.end((err) => {
if (err) {
return done(err);
}
// make sure only 1 job is created for each bulk upsert call
expect(jobQueue.testMode.jobs.length).to.equal(1);
// make sure the job type is correct
expect(jobQueue.testMode.jobs[0].type)
.to.equal(jobType.BULKUPSERTSAMPLES);
// make sure the queue has the right data inside it
expect(jobQueue.testMode.jobs[0].data).to.have.length(2);
expect(jobQueue.testMode.jobs[0].data[0])
.to.have.all.keys('name', 'value');

return done();
});
});

it('should still return ok for good or bad samples', (done) => {
api.post(path)
.set('Authorization', token)
.send([
{
name: `${tu.namePrefix}NOT_EXIST|${tu.namePrefix}Aspect1`,
value: '2',
}, {
name: `${tu.namePrefix}Subject|${tu.namePrefix}Aspect2`,
value: '4',
},
])
.expect(constants.httpStatus.OK)
.expect((res) => {
expect(res.body.status).to.contain('OK');
})
.end((err, res) => {
if (err) {
return done(err);
}

return done();
});
});
});

34 changes: 34 additions & 0 deletions tests/jobQueue/v1/jobWrapper.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/**
* Copyright (c) 2016, salesforce.com, inc.
* All rights reserved.
* Licensed under the BSD 3-Clause license.
* For full license text, see LICENSE.txt file in the repo root or
* https://opensource.org/licenses/BSD-3-Clause
*/

/**
* tests/jobQueue/v1/jobWrapper.js
*/
'use strict';

const jobQueue = require('../../../jobQueue/jobWrapper').jobQueue;
const expect = require('chai').expect;
const tu = require('../../testUtils');
const u = require('./utils');
const path = '/v1/samples/upsert/bulk';

describe('api: POST ' + path, () => {
after(u.forceDelete);
after(tu.forceDeleteUser);

it('jobWrapper should let you create any job type of job', (done) => {
const jobType = 'myTestJob';
const testData = { foo: 'bar' };
const job = jobQueue.createJob(jobType, testData);
expect(job).to.not.equal(null);
expect(job.type).to.equal(jobType);
expect(job.data).to.equal(testData);
done();
});
});

0 comments on commit 372f5e4

Please sign in to comment.