Skip to content

Commit

Permalink
Merge branch 'master' into includeCreatedBy
Browse files Browse the repository at this point in the history
  • Loading branch information
annyhe committed May 17, 2017
2 parents 24e15ed + 6666c2c commit fae614e
Show file tree
Hide file tree
Showing 27 changed files with 2,729 additions and 202 deletions.
34 changes: 21 additions & 13 deletions api/v1/controllers/samples.js
Original file line number Diff line number Diff line change
Expand Up @@ -191,18 +191,20 @@ module.exports = {
* POST /samples/upsert/bulk
*
* Upserts multiple samples. Returns "OK" without waiting for the upserts to
* happen. When "enableWorkerProcess" is enabled, the bulk upsert is enqueued
* to be processed by a separate worker process.
* happen. When "enableWorkerProcess" is set to true, the bulk upsert is
* enqueued to be processed by a separate worker process and the response
* is returned with a job id.
*
* @param {IncomingMessage} req - The request object
* @param {ServerResponse} res - The response object
* @returns {ServerResponse} - The response object indicating merely that the
* bulk upsert request has been received.
*/
bulkUpsertSample(req, res/* , next */) {
bulkUpsertSample(req, res, next) {
const resultObj = { reqStartTime: new Date() };
const reqStartTime = Date.now();
const value = req.swagger.params.queryBody.value;
const body = { status: 'OK' };
const readOnlyFields = helper.readOnlyFields.filter((field) =>
field !== 'name');
u.getUserNameFromToken(req,
Expand All @@ -211,22 +213,30 @@ module.exports = {
if (featureToggles.isFeatureEnabled('enableWorkerProcess')) {
const jobType = require('../../../jobQueue/setup').jobType;
const jobWrapper = require('../../../jobQueue/jobWrapper');

const wrappedBulkUpsertData = {};
wrappedBulkUpsertData.upsertData = value;
wrappedBulkUpsertData.userName = userName;
wrappedBulkUpsertData.reqStartTime = reqStartTime;
wrappedBulkUpsertData.readOnlyFields = helper.readOnlyFields;
const j = jobWrapper.createJob(jobType.BULKUPSERTSAMPLES,
wrappedBulkUpsertData, req);
wrappedBulkUpsertData.readOnlyFields = readOnlyFields;

const jobPromise = jobWrapper
.createPromisifiedJob(jobType.BULKUPSERTSAMPLES,
wrappedBulkUpsertData, req);
jobPromise.then((job) => {
// set the job id in the response object before it is returned
body.jobId = job.id;
u.logAPI(req, resultObj, body, value.length);
return res.status(httpStatus.OK).json(body);
})
.catch((err) => u.handleError(next, err, helper.modelName));
} else {
const sampleModel =
featureToggles.isFeatureEnabled(sampleStoreConstants.featureName) ?
redisModelSample : helper.model;

/*
*send the upserted sample to the client by publishing it to the redis
*channel
* Send the upserted sample to the client by publishing it to the redis
* channel
*/
sampleModel.bulkUpsertByName(value, userName, readOnlyFields)
.then((samples) => {
Expand All @@ -236,12 +246,10 @@ module.exports = {
}
});
});
u.logAPI(req, resultObj, body, value.length);
return res.status(httpStatus.OK).json(body);
}
});

const body = { status: 'OK' };
u.logAPI(req, resultObj, body, value.length);
return res.status(httpStatus.OK).json(body);
},

/**
Expand Down
28 changes: 15 additions & 13 deletions api/v1/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3145,14 +3145,12 @@ paths:
items:
type: string
enum:
- id
- messageBody
- messageCode
- status
- value
- createdAt
- updatedAt
- -id
- -messageBody
- -messageCode
- -status
Expand Down Expand Up @@ -3303,7 +3301,7 @@ paths:
name: key
in: path
description: >-
The id or name of the sample to delete.
The name of the sample to delete.
required: true
type: string
responses:
Expand All @@ -3324,15 +3322,15 @@ paths:
summary: Retrieve the specified sample
tags: [ samples ]
description: >-
Retrieve the specified sample by its id or name. You may also optionally
Retrieve the specified sample by its name. You may also optionally
specify a list of fields to include in the response. If the Refocus configuration parameter `useAccessToken` is set to `true`, you must include an `Authorization` request header with your [JSON Web Token](https://tools.ietf.org/html/rfc7519) (JWT) as the value. You can get a token using `POST /v1/register` or `POST /v1/tokens`.
operationId: getSample
parameters:
-
name: key
in: path
description: >-
The id or name of the sample to retrieve
The name of the sample to retrieve
required: true
type: string
-
Expand Down Expand Up @@ -3363,7 +3361,7 @@ paths:
name: key
in: path
description: >-
The id or name of the sample to update.
The name of the sample to update.
required: true
type: string
-
Expand Down Expand Up @@ -3432,7 +3430,7 @@ paths:
name: key
in: path
description: >-
The id or name of the sample to update.
The name of the sample to update.
required: true
type: string
-
Expand Down Expand Up @@ -3644,7 +3642,7 @@ paths:
name: key
in: path
description: >-
The id or name of the sample for which related links needs to be
The name of the sample for which related links needs to be
deleted.
required: true
type: string
Expand Down Expand Up @@ -3679,7 +3677,7 @@ paths:
name: key
in: path
description: >-
The id or name of the sample for which related link needs to be
The name of the sample for which related link needs to be
deleted.
required: true
type: string
Expand Down Expand Up @@ -6755,9 +6753,6 @@ definitions:
An observation of a particular aspect for a particular subject at a
particular point in time.
properties:
id:
type: string
readOnly: true
messageBody:
type: string
readOnly: true
Expand Down Expand Up @@ -6859,7 +6854,14 @@ definitions:
readOnly: true
description: >
Status.
jobId:
type: integer
readOnly: true
description: >
Optionally a jobId is returned with the response when the bulk upsert
request is enqueued as a job for the workers to process it
asynchronously. The returned jobId can be used to check the status
of the bulk upsert request.
SubjectsResponse:
type: object
description: >
Expand Down
7 changes: 7 additions & 0 deletions config.js
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@ const readReplicas = configUtil.getReadReplicas(pe, replicaConfigLabel);

const DEFAULT_JOB_QUEUE_TTL_SECONDS = 3600;

// default set to 30 minutes
const DEFAULT_JOB_REMOVAL_DELAY_SECONDS = 1800;

const JOB_REMOVAL_DELAY_SECONDS = pe.KUE_JOBS_REMOVAL_DELAY ||
DEFAULT_JOB_REMOVAL_DELAY_SECONDS;

/*
* If you're using worker dynos, you can set env vars PRIORITIZE_JOBS_FROM
* and/or DEPRIORITIZE_JOBS_FROM to comma-separated lists of ip addresses if
Expand Down Expand Up @@ -218,6 +224,7 @@ module.exports = {
DEFAULT_CHECK_TIMEOUT_INTERVAL_MILLIS,
CACHE_EXPIRY_IN_SECS,
JOB_QUEUE_TTL_SECONDS,
JOB_REMOVAL_DELAY_SECONDS,
deprioritizeJobsFrom,
endpointToLimit,
httpMethodToLimit,
Expand Down
56 changes: 47 additions & 9 deletions jobQueue/jobWrapper.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,13 @@ const TIME_TO_LIVE =

/*
* The delay is introduced to avoid the job.id leakage. It can be any
* arbitary number large enough that it does not cause the leakage.
* arbitary number large enough that it does not cause the leakage. The delay
* is converted to milliseconds by multiplying it by 1000.
*
* TODO: Clean this up once we move job removal listener to the clock process.
*/
const delayToRemoveJobs = 3000;
const delayToRemoveJobs =
1000 * jobSetup.delayToRemoveJobs; // eslint-disable-line no-magic-numbers

/**
* Set log object params from job results.
Expand Down Expand Up @@ -75,6 +77,8 @@ function mapJobResultsToLogObject(jobResultObj, logObject) {
* APIs to monitor the jobs.
*
* @param {Object} job - Job object to be cleaned up from the queue
* @param {Object} logObject - Object containing the information that needs to
* be logged.
*/
function processJobOnComplete(job, logObject) {
if (job) {
Expand All @@ -98,7 +102,7 @@ function processJobOnComplete(job, logObject) {
}
}, delayToRemoveJobs);

// if enableWorkerActivityLogs is enabled, update logObject
// when enableWorkerActivityLogs are enabled, update the logObject
if (featureToggles.isFeatureEnabled('enableWorkerActivityLogs') &&
jobResultObj && logObject) {
mapJobResultsToLogObject(jobResultObj, logObject);
Expand All @@ -110,8 +114,10 @@ function processJobOnComplete(job, logObject) {
.update(jobResultObj.recordCount, jobResultObj.queueTime);
}

/* The second argument should match the activity type in
/config/activityLog.js */
/*
* The second argument should match the activity type in
* /config/activityLog.js
*/
activityLogUtil.printActivityLogString(logObject, 'worker');
}
});
Expand All @@ -137,11 +143,12 @@ function logAndRemoveJobOnComplete(req, job) {
logObject.jobId = job.id;
}

logObject.ipAddress = activityLogUtil.getIPAddrFromReq(req);

/* if req object, then extract user, token and ipaddress and update log
object */
/*
* If req object is defined; extract the user name, token and ipaddress and
* update the log object
*/
if (req) {
logObject.ipAddress = activityLogUtil.getIPAddrFromReq(req);
jwtUtil.getTokenDetailsFromRequest(req)
.then((resObj) => {
logObject.user = resObj.username;
Expand Down Expand Up @@ -179,6 +186,36 @@ function calculateJobPriority(jobName, data, req) {
return 'normal';
} // calculateJobPriority

/**
* This is a promisified version of the createJob function which resolves to
* the job created and saved by the Kue api.
*
* @param {String} jobName - The job name. A worker process will be
* listening for this jobName to process the jobs.
* @param {Object} data - Data for the job to work with.
* @param {Object} req - Request object.
* @returns {Object} - A job object. The job object will be null when the
* jobQueue is created in the test mode.
*/
function createPromisifiedJob(jobName, data, req) {
const jobPriority = calculateJobPriority(jobName, data, req);
return new Promise((resolve, reject) => {
const job = jobQueue.create(jobName, data)
.ttl(TIME_TO_LIVE)
.priority(jobPriority)
.save((err) => {
if (err) {
const msg =
`Error adding ${jobName} job (id ${job.id}) to the worker queue`;
return reject(msg);
}

logAndRemoveJobOnComplete(req, job);
return resolve(job);
});
});
} // createPromisifiedJob

/**
* Creates a job to be processed using the KUE api, given the jobName and
* data to be processed by the job.
Expand Down Expand Up @@ -211,6 +248,7 @@ function createJob(jobName, data, req) {
module.exports = {
jobQueue,
createJob,
createPromisifiedJob,
mapJobResultsToLogObject,
logAndRemoveJobOnComplete,
}; // exports
1 change: 1 addition & 0 deletions jobQueue/setup.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,5 @@ module.exports = {
SAMPLE_TIMEOUT: 'SAMPLE_TIMEOUT',
},
ttlForJobs: conf.JOB_QUEUE_TTL_SECONDS,
delayToRemoveJobs: conf.JOB_REMOVAL_DELAY_SECONDS,
}; // exports

0 comments on commit fae614e

Please sign in to comment.