Skip to content

Commit

Permalink
added disconnect API
Browse files Browse the repository at this point in the history
  • Loading branch information
timgit committed Jun 7, 2016
1 parent d8fc02f commit af8cd86
Show file tree
Hide file tree
Showing 9 changed files with 52 additions and 4 deletions.
13 changes: 13 additions & 0 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,19 @@ class PgBoss extends EventEmitter {
});
}

disconnect() {
if(!this.isReady) return Promise.reject(`boss ain't ready. Use start() or connect() to get started.`);
return this.manager.close.apply(this.manager, arguments).then(() => this.isReady = false);
}

// stop() {
// return Promise.all([
// this.disconnect(),
// this.manager.stopSupervise(),
// this.boss.stopMonitor()
// ]);
// }

cancel(){
if(!this.isReady) return Promise.reject(`boss ain't ready. Use start() or connect() to get started.`);
return this.manager.cancel.apply(this.manager, arguments);
Expand Down
6 changes: 6 additions & 0 deletions src/manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ class Manager extends EventEmitter {
}
}

close() {
this.workers.forEach(worker => worker.stop());
this.workers.length = 0;
return Promise.resolve(true);
}

subscribe(name, ...args){

let self = this;
Expand Down
13 changes: 9 additions & 4 deletions src/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,17 @@ class Worker extends EventEmitter {
checkForWork();

function checkForWork(){
self.config.fetcher()
.then(job => { if(job) self.emit(self.config.name, job); })
.catch(error => self.emit('error', error))
.then(() => setTimeout(checkForWork, self.config.interval));
if(!self.stopped)
self.config.fetcher()
.then(job => { if(job) self.emit(self.config.name, job); })
.catch(error => self.emit('error', error))
.then(() => setTimeout(checkForWork, self.config.interval));
}
}

stop() {
this.stopped = true;
}
}

module.exports = Worker;
4 changes: 4 additions & 0 deletions test/connectTest.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ describe('connect', function() {
});
});

afterEach(function(finished){
boss.disconnect().then(finished);
});

it('should fail if connecting to an older schema version', function (finished) {
helper.getDb().executeSql(`UPDATE ${helper.config.schema}.version SET VERSION = '0.0.0'`)
.then(() => {
Expand Down
4 changes: 4 additions & 0 deletions test/delayTest.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ describe('delayed jobs', function(){
});
});

after(function(finished){
boss.disconnect().then(finished);
});

it('should wait before processing a delayed job submission', function(finished) {

var delaySeconds = 2;
Expand Down
4 changes: 4 additions & 0 deletions test/publishTest.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ describe('publish', function(){
});
});

after(function(finished){
boss.disconnect().then(finished);
});

it('should fail with no arguments', function(finished) {
boss.publish().catch(error => {
assert(true);
Expand Down
4 changes: 4 additions & 0 deletions test/retryTest.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ describe('retries', function() {
finished();
});
});

after(function(finished){
boss.disconnect().then(finished);
});

it('should retry a job that didn\'t complete', function (finished) {

Expand Down
4 changes: 4 additions & 0 deletions test/speedTest.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ describe('speed', function() {
});
});

after(function(finished){
boss.disconnect().then(finished);
});

var expectedSeconds = 4;
var jobCount = 1000;

Expand Down
4 changes: 4 additions & 0 deletions test/throttleTest.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ describe('throttle', function() {
});
});

after(function(finished){
boss.disconnect().then(finished);
});

it('should process at most 1 job per second', function (finished) {

var singletonSeconds = 1;
Expand Down

0 comments on commit af8cd86

Please sign in to comment.