Skip to content

Commit

Permalink
added fetch api for one off job requests
Browse files Browse the repository at this point in the history
  • Loading branch information
timgit committed Jan 31, 2017
1 parent 4dd8e8d commit 2cefca2
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 4 deletions.
10 changes: 10 additions & 0 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,16 @@ class PgBoss extends EventEmitter {
if(!this.isReady) return Promise.reject(notReadyErrorMessage);
return this.manager.publish.apply(this.manager, arguments);
}

fetch(){
if(!this.isReady) return Promise.reject(notReadyErrorMessage);
return this.manager.fetch.apply(this.manager, arguments);
}

complete(){
if(!this.isReady) return Promise.reject(notReadyErrorMessage);
return this.manager.complete.apply(this.manager, arguments);
}
}

module.exports = PgBoss;
19 changes: 15 additions & 4 deletions src/manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,7 @@ class Manager extends EventEmitter {
}

function register() {
let jobFetcher = () =>
self.db.executePreparedSql('nextJob', self.nextJobCommand, name)
.then(result => result.rows.length ? result.rows[0] : null);
let jobFetcher = () => self.fetch(name);

let workerConfig = {name: name, fetcher: jobFetcher, interval: self.config.newJobCheckInterval};

Expand All @@ -120,7 +118,6 @@ class Manager extends EventEmitter {
worker.on('error', error => self.emit('error', error));

worker.on(name, job => {
job.name = name;
self.emit('job', job);
setImmediate(() => callback(job, () => self.complete(job.id)));
});
Expand Down Expand Up @@ -209,6 +206,20 @@ class Manager extends EventEmitter {

}

fetch(name) {
return this.db.executePreparedSql('nextJob', this.nextJobCommand, name)
.then(result => {
if(result.rows.length === 0)
return null;

let job = result.rows[0];

job.name = name;

return job;
});
}

complete(id){
return this.db.executeSql(this.completeJobCommand, [id])
.then(result => {
Expand Down
38 changes: 38 additions & 0 deletions test/fetchTest.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
var assert = require('chai').assert;
var helper = require('./testHelper');

describe('fetch', function(){

var boss;

before(function(finished){
helper.start()
.then(dabauce => {
boss = dabauce;
finished();
});
});

after(function(finished){
boss.stop().then(() => finished());
});

it('should fetch a single job by name and manually complete', function(finished) {
var jobName = 'no-subscribe-required';

boss.publish(jobName)
.then(() => boss.fetch(jobName))
.then(job => {
assert(jobName === job.name);
return boss.complete(job.id);
})
.then(() => {
assert(true);
finished();
});
});

});



0 comments on commit 2cefca2

Please sign in to comment.