From af8cd8606a7132b8eab981adaa5999cf7c77968e Mon Sep 17 00:00:00 2001 From: Tim Jones Date: Tue, 7 Jun 2016 00:22:59 -0500 Subject: [PATCH] added disconnect API --- src/index.js | 13 +++++++++++++ src/manager.js | 6 ++++++ src/worker.js | 13 +++++++++---- test/connectTest.js | 4 ++++ test/delayTest.js | 4 ++++ test/publishTest.js | 4 ++++ test/retryTest.js | 4 ++++ test/speedTest.js | 4 ++++ test/throttleTest.js | 4 ++++ 9 files changed, 52 insertions(+), 4 deletions(-) diff --git a/src/index.js b/src/index.js index 935f639e..40ef6239 100644 --- a/src/index.js +++ b/src/index.js @@ -64,6 +64,19 @@ class PgBoss extends EventEmitter { }); } + disconnect() { + if(!this.isReady) return Promise.reject(`boss ain't ready. Use start() or connect() to get started.`); + return this.manager.close.apply(this.manager, arguments).then(() => this.isReady = false); + } + + // stop() { + // return Promise.all([ + // this.disconnect(), + // this.manager.stopSupervise(), + // this.boss.stopMonitor() + // ]); + // } + cancel(){ if(!this.isReady) return Promise.reject(`boss ain't ready. Use start() or connect() to get started.`); return this.manager.cancel.apply(this.manager, arguments); diff --git a/src/manager.js b/src/manager.js index 2461ba42..30793f37 100644 --- a/src/manager.js +++ b/src/manager.js @@ -42,6 +42,12 @@ class Manager extends EventEmitter { } } + close() { + this.workers.forEach(worker => worker.stop()); + this.workers.length = 0; + return Promise.resolve(true); + } + subscribe(name, ...args){ let self = this; diff --git a/src/worker.js b/src/worker.js index 7f317b4f..0417262f 100644 --- a/src/worker.js +++ b/src/worker.js @@ -12,12 +12,17 @@ class Worker extends EventEmitter { checkForWork(); function checkForWork(){ - self.config.fetcher() - .then(job => { if(job) self.emit(self.config.name, job); }) - .catch(error => self.emit('error', error)) - .then(() => setTimeout(checkForWork, self.config.interval)); + if(!self.stopped) + self.config.fetcher() + .then(job => { if(job) self.emit(self.config.name, job); }) + .catch(error => self.emit('error', error)) + .then(() => setTimeout(checkForWork, self.config.interval)); } } + + stop() { + this.stopped = true; + } } module.exports = Worker; \ No newline at end of file diff --git a/test/connectTest.js b/test/connectTest.js index 6a447355..1c64550a 100644 --- a/test/connectTest.js +++ b/test/connectTest.js @@ -13,6 +13,10 @@ describe('connect', function() { }); }); + afterEach(function(finished){ + boss.disconnect().then(finished); + }); + it('should fail if connecting to an older schema version', function (finished) { helper.getDb().executeSql(`UPDATE ${helper.config.schema}.version SET VERSION = '0.0.0'`) .then(() => { diff --git a/test/delayTest.js b/test/delayTest.js index dee783be..015d2618 100644 --- a/test/delayTest.js +++ b/test/delayTest.js @@ -13,6 +13,10 @@ describe('delayed jobs', function(){ }); }); + after(function(finished){ + boss.disconnect().then(finished); + }); + it('should wait before processing a delayed job submission', function(finished) { var delaySeconds = 2; diff --git a/test/publishTest.js b/test/publishTest.js index 613bbf37..2229775a 100644 --- a/test/publishTest.js +++ b/test/publishTest.js @@ -13,6 +13,10 @@ describe('publish', function(){ }); }); + after(function(finished){ + boss.disconnect().then(finished); + }); + it('should fail with no arguments', function(finished) { boss.publish().catch(error => { assert(true); diff --git a/test/retryTest.js b/test/retryTest.js index fa9c4b24..362f0950 100644 --- a/test/retryTest.js +++ b/test/retryTest.js @@ -12,6 +12,10 @@ describe('retries', function() { finished(); }); }); + + after(function(finished){ + boss.disconnect().then(finished); + }); it('should retry a job that didn\'t complete', function (finished) { diff --git a/test/speedTest.js b/test/speedTest.js index 96219f5d..1e4cfc72 100644 --- a/test/speedTest.js +++ b/test/speedTest.js @@ -13,6 +13,10 @@ describe('speed', function() { }); }); + after(function(finished){ + boss.disconnect().then(finished); + }); + var expectedSeconds = 4; var jobCount = 1000; diff --git a/test/throttleTest.js b/test/throttleTest.js index 65d2f0a5..204061cc 100644 --- a/test/throttleTest.js +++ b/test/throttleTest.js @@ -13,6 +13,10 @@ describe('throttle', function() { }); }); + after(function(finished){ + boss.disconnect().then(finished); + }); + it('should process at most 1 job per second', function (finished) { var singletonSeconds = 1;