Skip to content

Commit

Permalink
added onComplete API
Browse files Browse the repository at this point in the history
  • Loading branch information
timgit committed Apr 15, 2017
1 parent 7e0959a commit d19e28e
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 12 deletions.
2 changes: 1 addition & 1 deletion package.json
@@ -1,6 +1,6 @@
{
"name": "pg-boss",
"version": "2.0.0",
"version": "2.0.0-beta2",
"description": "Queueing jobs in Node.js using PostgreSQL like a boss",
"main": "./lib/index.js",
"engines": { "node" : ">=4.0.0" },
Expand Down
2 changes: 1 addition & 1 deletion src/index.js
Expand Up @@ -40,7 +40,7 @@ class PgBoss extends EventEmitter {
manager.promotedEvents.forEach(event => promoteEvent.call(this, manager, event));
this.manager = manager;

['fetch','complete','cancel','fail','publish','subscribe','unsubscribe','onExpire']
['fetch','complete','cancel','fail','publish','subscribe','unsubscribe','onComplete','onExpire']
.forEach(func => promoteApi.call(this, manager, func));

function promoteApi(obj, func){
Expand Down
27 changes: 21 additions & 6 deletions src/manager.js
Expand Up @@ -7,7 +7,8 @@ const Worker = require('./worker');
const plans = require('./plans');
const Attorney = require('./attorney');

const expireJobSuffix = plans.expireJobSuffix;
const expiredJobSuffix = plans.expiredJobSuffix;
const completedJobSuffix = plans.completedJobSuffix;

const events = {
job: 'job',
Expand Down Expand Up @@ -49,7 +50,7 @@ class Manager extends EventEmitter {

return Promise.map(result.rows, job => {
self.emit(events.expiredJob, job);
return self.publish(job.name + expireJobSuffix, job);
return self.publish(job.name + expiredJobSuffix, job);
});
}
});
Expand Down Expand Up @@ -88,7 +89,8 @@ class Manager extends EventEmitter {
assert(name in this.subscriptions, 'subscription not found for job ' + name);

removeSubscription.call(this, name);
removeSubscription.call(this, name + expireJobSuffix);
removeSubscription.call(this, name + expiredJobSuffix);
removeSubscription.call(this, name + completedJobSuffix);

function removeSubscription(name){
if(!this.subscriptions[name]) return;
Expand Down Expand Up @@ -195,7 +197,11 @@ class Manager extends EventEmitter {

onExpire(name, callback) {
// unwrapping job in callback because we love our customers
return this.subscribe(name + expireJobSuffix, (job, done) => callback(job.data, done));
return this.subscribe(name + expiredJobSuffix, (job, done) => callback(job.data, done));
}

onComplete(name, callback) {
return this.subscribe(name + completedJobSuffix, callback);
}

publish(...args){
Expand Down Expand Up @@ -302,11 +308,20 @@ class Manager extends EventEmitter {
});
}

complete(id){
complete(id, data){
return this.db.executeSql(this.completeJobCommand, [id])
.then(result => {
assert(result.rowCount === 1, `Job ${id} could not be completed.`);
return result.rows[0];
let job = result.rows[0];

if(data){
this.publish(job.name + completedJobSuffix, {
request: job,
response: data
});
}

return job;
});
}

Expand Down
14 changes: 10 additions & 4 deletions src/plans.js
@@ -1,5 +1,3 @@
const expireJobSuffix = '__expired';

const states = {
created: 'created',
retry: 'retry',
Expand All @@ -10,6 +8,9 @@ const states = {
failed: 'failed'
};

const expiredJobSuffix = `__${states.expired}`;
const completedJobSuffix = `__${states.complete}`;

module.exports = {
create,
insertVersion,
Expand All @@ -24,7 +25,8 @@ module.exports = {
archive,
countStates,
states,
expireJobSuffix
expiredJobSuffix,
completedJobSuffix
};

function create(schema) {
Expand Down Expand Up @@ -213,7 +215,11 @@ function archive(schema) {
return `
DELETE FROM ${schema}.job
WHERE (completedOn + CAST($1 as interval) < now())
OR (state = '${states.created}' and name like '%${expireJobSuffix}' and createdOn + CAST($1 as interval) < now())
OR (
state = '${states.created}'
AND (name LIKE '%${expiredJobSuffix}' OR name LIKE '%${completedJobSuffix}')
AND createdOn + CAST($1 as interval) < now()
)
`;
}

Expand Down
42 changes: 42 additions & 0 deletions test/completeTest.js
@@ -0,0 +1,42 @@
const assert = require('chai').assert;
const helper = require('./testHelper');

describe('complete', function() {

let boss;

before(function(finished){
helper.start()
.then(dabauce => {
boss = dabauce;
finished();
});
});

after(function(finished) {
boss.stop().then(() => finished());
});

it('should subscribe to the response on a complete call', function(finished){

const jobName = 'part-of-something-important';
const responsePayload = {message: 'super-important-payload', arg2: '123'};

let jobId = null;

boss.onComplete(jobName, job => {
assert.equal(jobId, job.data.request.id);
assert.equal(job.data.response.message, responsePayload.message);
assert.equal(job.data.response.arg2, responsePayload.arg2);

finished();
});

boss.publish(jobName)
.then(id => jobId = id)
.then(() => boss.fetch(jobName))
.then(job => boss.complete(job.id, responsePayload));

});

});

0 comments on commit d19e28e

Please sign in to comment.