Skip to content

Commit

Permalink
better handling of batching via subscribe() - restored support for te…
Browse files Browse the repository at this point in the history
…amSize option
  • Loading branch information
timgit committed May 15, 2017
1 parent 386b7d2 commit 21cd823
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 23 deletions.
33 changes: 14 additions & 19 deletions src/manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,7 @@ class Manager extends EventEmitter {
watch(name, options, callback){
assert(!(name in this.subscriptions), 'this job has already been subscribed on this instance.');

if('teamSize' in options)
console.warn('pg-boss subscribe(): teamSize option is now obsolete. It has been replaced by batchSize starting in version 2');

options.batchSize = options.batchSize || 1;
options.batchSize = options.batchSize || options.teamSize || 1;

if('newJobCheckInterval' in options || 'newJobCheckIntervalSeconds' in options)
options = Attorney.applyNewJobCheckInterval(options);
Expand All @@ -127,25 +124,23 @@ class Manager extends EventEmitter {
};

let respond = jobs => {
if(!jobs) return;
if (!jobs) return;

if(!Array.isArray(jobs))
if (!Array.isArray(jobs))
jobs = [jobs];

jobs.forEach(job => {
this.emit(events.job, job);
job.done = error => complete(error, job);
});

setImmediate(() => {
try {
if(jobs.length === 1)
callback(jobs[0], jobs[0].done);
else
callback(jobs);
} catch(error) {
jobs.forEach(job => this.emit(events.failed, {job, error}));
}
jobs.forEach(job => {
this.emit(events.job, job);
job.done = error => complete(error, job);

try {
callback(job, job.done);
}
catch (error) {
this.emit(events.failed, {job, error})
}
});
});

};
Expand Down
20 changes: 19 additions & 1 deletion test/fetchTest.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
const Promise = require('bluebird');
const assert = require('chai').assert;
const helper = require('./testHelper');

Expand All @@ -21,7 +22,7 @@ describe('fetch', function(){
boss.fetch().catch(() => finished());
});

it('should fetch a single job by name and manually complete', function(finished) {
it('should get a single job by name and manually complete', function(finished) {
let jobName = 'no-subscribe-required';

boss.publish(jobName)
Expand All @@ -36,6 +37,23 @@ describe('fetch', function(){
});
});

it('should get a batch of jobs as an array', function(finished){
const jobName = 'fetch-batch';
const batchSize = 4;

Promise.join(
boss.publish(jobName),
boss.publish(jobName),
boss.publish(jobName),
boss.publish(jobName)
)
.then(() => boss.fetch(jobName, batchSize))
.then(jobs => {
assert(jobs.length === batchSize);
finished();
});
});

});


Expand Down
13 changes: 10 additions & 3 deletions test/subscribeTest.js
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,21 @@ describe('subscribe', function(){

it('should handle a batch of jobs', function(finished){
const jobName = 'subscribe-batch';
const batchSize = 4;
let subscribeCount = 0;

Promise.join(
boss.publish(jobName),
boss.publish(jobName),
boss.publish(jobName),
boss.publish(jobName)
)
.then(() => boss.subscribe(jobName, {batchSize:2}, jobs => {
assert(jobs.length === 2);
finished();
.then(() => boss.subscribe(jobName, {batchSize}, job => {
subscribeCount++;

// idea here is that the test would time out if it had to wait for 4 intervals
if(subscribeCount === batchSize)
finished();
})
);
});
Expand Down

0 comments on commit 21cd823

Please sign in to comment.