Skip to content

Commit

Permalink
expiration support via callback in subscribe
Browse files Browse the repository at this point in the history
  • Loading branch information
timgit committed Mar 25, 2017
1 parent 8337a16 commit 7a39073
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 18 deletions.
62 changes: 49 additions & 13 deletions src/manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class Manager extends EventEmitter {
this.completeJobCommand = plans.completeJob(config.schema);
this.cancelJobCommand = plans.cancelJob(config.schema);

this.workers = [];
this.subscriptions = {};
}

monitor(){
Expand All @@ -31,8 +31,19 @@ class Manager extends EventEmitter {
function expire() {
return self.db.executeSql(self.expireJobCommand)
.then(result => {
if (result.rowCount)
self.emit('expired', result.rowCount);
if (result.rows.length){
self.emit('expired', result.rows.length);

result.rows.forEach(job => {
let subscription = self.subscriptions[job.name];

if(!subscription || !subscription.onExpire)
return;

subscription.onExpire(job);
});
}

});
}

Expand All @@ -48,48 +59,68 @@ class Manager extends EventEmitter {
}

close() {
this.workers.forEach(worker => worker.stop());
this.workers.length = 0;
Object.keys(this.subscriptions)
.forEach(name => this.subscriptions[name].workers.forEach(worker => worker.stop())
);

this.subscriptions = {};

return Promise.resolve(true);
}

stop() {
this.close().then(() => {
return this.close().then(() => {
this.stopped = true;

if(this.expireTimer)
clearTimeout(this.expireTimer);
});
}

unsubscribe(name){
assert(name in this.subscriptions, 'subscription not found for job ' + name);

this.subscriptions[name].workers.forEach(worker => worker.stop());

delete this.subscriptions[name];
}

subscribe(name, ...args){

assert(!(name in this.subscriptions), 'this job has already been subscribed on this instance.')

let self = this;
let noop = function(){};

return getArgs(args)
.then(({options, callback}) => register(options, callback));
.then(({options, callback, onExpire}) => register(options, callback, onExpire));


function getArgs(args) {

let options, callback;
let options, callback, onExpire = noop;

try {
assert(name, 'boss requires all jobs to have a name');

if(args.length === 1){
callback = args[0];
options = {};
} else if (args.length === 2){
} else if (args.length > 1){
options = args[0] || {};
callback = args[1];

if(args[2])
onExpire = args[2];
}

assert(typeof callback == 'function', 'expected a callback function');
assert(typeof callback == 'function', 'expected callback to be a function');

if(options)
assert(typeof options == 'object', 'expected config to be an object');

assert(typeof onExpire == 'function', 'expected onExpire to be a function');

options = options || {};
options.teamSize = options.teamSize || 1;

Expand All @@ -102,10 +133,12 @@ class Manager extends EventEmitter {
return Promise.reject(e);
}

return Promise.resolve({options, callback});
return Promise.resolve({options, callback, onExpire});
}

function register(options, callback) {
function register(options, callback, onExpire) {

let subscription = self.subscriptions[name] = {workers:[]};

let onError = error => self.emit('error', error);

Expand Down Expand Up @@ -134,8 +167,11 @@ class Manager extends EventEmitter {
for(let w=0; w < options.teamSize; w++){
let worker = new Worker(workerConfig);
worker.start();
self.workers.push(worker);
subscription.workers.push(worker);
}

if(onExpire !== noop)
subscription.onExpire = onExpire;
}

}
Expand Down
14 changes: 9 additions & 5 deletions src/plans.js
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,15 @@ function fetchNextJob(schema) {

function expireJob(schema) {
return `
UPDATE ${schema}.job
SET state = CASE WHEN retryCount < retryLimit THEN 'retry'::${schema}.job_state ELSE 'expired'::${schema}.job_state END,
completedOn = CASE WHEN retryCount < retryLimit THEN NULL ELSE now() END
WHERE state = 'active'
AND (startedOn + expireIn) < now()
WITH expired AS (
UPDATE ${schema}.job
SET state = CASE WHEN retryCount < retryLimit THEN 'retry'::${schema}.job_state ELSE 'expired'::${schema}.job_state END,
completedOn = CASE WHEN retryCount < retryLimit THEN NULL ELSE now() END
WHERE state = 'active'
AND (startedOn + expireIn) < now()
RETURNING id, name, state
)
SELECT * FROM expired WHERE state = 'expired';
`;
}

Expand Down

0 comments on commit 7a39073

Please sign in to comment.