From 8b11efd7046f52b4dc7f47b1a95cdefad723dc96 Mon Sep 17 00:00:00 2001 From: respinha Date: Fri, 16 Mar 2018 13:50:55 +0100 Subject: [PATCH] Added saveUniqueJob to Queue#schedule waterfall functions --- index.js | 81 ++++++++++++++++++++++++++++++++------------------------ 1 file changed, 46 insertions(+), 35 deletions(-) diff --git a/index.js b/index.js index f3d1779..e52dc37 100644 --- a/index.js +++ b/index.js @@ -60,12 +60,12 @@ function ensureUniqueJob(job, done) { //assuming updated_at is in the past or now // updated_at is a built-in from kue. var timeSinceLastUpdate = now.getTime() - job.updated_at; // jshint ignore:line - var arbitraryThreshold = job.data.ttl + (job.data.ttl/2); + var arbitraryThreshold = job.data.ttl + (job.data.ttl / 2); var isStaleJob = - (job.state() === 'active' && + (job.state() === 'active' && timeSinceLastUpdate > arbitraryThreshold ); - if (isCompletedOrFailedJob|| isStaleJob) { + if (isCompletedOrFailedJob || isStaleJob) { //resave job for next run // //NOTE!: We inactivate job to allow kue to queue the same job for next run. @@ -286,8 +286,8 @@ Queue.prototype._getJobUUID = function (key) { var splits = key.split(':'); - splits = _.filter(splits, function(o) { return o !== ''; }); - if(splits.length > 0){ + splits = _.filter(splits, function (o) { return o !== ''; }); + if (splits.length > 0) { uuid = splits[splits.length - 1]; } @@ -338,7 +338,7 @@ Queue.prototype._saveJobData = function (jobDataKey, jobData, done) { //TODO make use of redis hash i.e redis.hmset(, ); this ._scheduler - .set(jobDataKey, JSON.stringify(jobData), function (error /*, response*/ ) { + .set(jobDataKey, JSON.stringify(jobData), function (error /*, response*/) { done(error, jobData); }); }; @@ -407,33 +407,33 @@ Queue.prototype._buildJob = function (jobDefinition, done) { //this refer to kue Queue instance context async.parallel({ - isDefined: function (next) { - //is job definition provided - var isObject = _.isPlainObject(jobDefinition); - if (!isObject) { - next(new Error('Invalid job definition')); - } else { - next(null, true); - } - }, - isValid: function (next) { - //check job for required attributes - // - //a valid job must have a type and - //associated data - var isValidJob = _.has(jobDefinition, 'type') && - ( - _.has(jobDefinition, 'data') && - _.isPlainObject(jobDefinition.data) - ); - - if (!isValidJob) { - next(new Error('Missing job type or data')); - } else { - next(null, true); - } + isDefined: function (next) { + //is job definition provided + var isObject = _.isPlainObject(jobDefinition); + if (!isObject) { + next(new Error('Invalid job definition')); + } else { + next(null, true); } }, + isValid: function (next) { + //check job for required attributes + // + //a valid job must have a type and + //associated data + var isValidJob = _.has(jobDefinition, 'type') && + ( + _.has(jobDefinition, 'data') && + _.isPlainObject(jobDefinition.data) + ); + + if (!isValidJob) { + next(new Error('Missing job type or data')); + } else { + next(null, true); + } + } + }, function finish(error, validations) { //is not well formatted job //back-off @@ -954,7 +954,18 @@ Queue.prototype.schedule = function (when, job, done) { function ensureSingleUniqueJob(job, next) { ensureUniqueJob(job, next); - } + }, + function saveUniqueJob(job, next) { + // if a unique name is specified, save it with the job details + if (job.data && job.data.unique) { + const jobDataKey = this._getJobDataKey(job.data.unique); + this._saveJobData(jobDataKey, job, function (error) { + next(error, job); + }); + } else { + next(null, job); + } + }.bind(this) ], function (error, job) { //fire schedule error event @@ -1086,7 +1097,7 @@ var shutdown = Queue.prototype.shutdown; * @return {Queue} for chaining * @api public */ -Queue.prototype.shutdown = function ( /*fn, timeout, type*/ ) { +Queue.prototype.shutdown = function ( /*fn, timeout, type*/) { //this refer to kue Queue instance context //TODO ensure all client shutdown with waiting delay @@ -1204,7 +1215,7 @@ kue.createQueue = function (options) { */ Queue.prototype.remove = Queue.prototype.removeJob = function (criteria, done) { //normalize callback - done = done || function noop() {}; + done = done || function noop() { }; //compute criteria and job instance async.parallel({ @@ -1449,7 +1460,7 @@ Queue.prototype._getAllJobData = function (done) { */ Queue.prototype.restore = function (done) { //ensure callback - done = _.isFunction(done) ? done : function () {}; + done = _.isFunction(done) ? done : function () { }; //fetch all job data this._getAllJobData(function (error, data) {