Skip to content

Commit

Permalink
fun with travis and bluebird
Browse files Browse the repository at this point in the history
  • Loading branch information
timgit committed Feb 18, 2018
1 parent 942ac8c commit 127a0a7
Show file tree
Hide file tree
Showing 10 changed files with 5,205 additions and 20 deletions.
5,155 changes: 5,155 additions & 0 deletions package-lock.json

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@
"description": "Queueing jobs in Node.js using PostgreSQL like a boss",
"main": "./lib/index.js",
"engines": {
"node": ">=4.0.0"
"node": ">=4.0.0",
"npm": ">=5.0.0"
},
"dependencies": {
"bluebird": "^3.4.6",
"pg": "^7.1.2",
"uuid": "^3.0.0"
},
"devDependencies": {
"bluebird": "^3.4.6",
"babel-cli": "^6.24.1",
"babel-core": "^6.24.1",
"babel-plugin-istanbul": "^4.1.1",
Expand Down
17 changes: 16 additions & 1 deletion src/attorney.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
const assert = require('assert');
const Promise = require('bluebird');

module.exports = {
applyConfig,
Expand Down Expand Up @@ -97,6 +96,7 @@ function applyConfig(config) {
config = applyNewJobCheckInterval(config);
config = applyExpireConfig(config);
config = applyArchiveConfig(config);
config = applyDeleteConfig(config);
config = applyMonitoringConfig(config);
config = applyUuidConfig(config);

Expand Down Expand Up @@ -195,6 +195,21 @@ function applyArchiveConfig(config) {
return config;
}

function applyDeleteConfig(config) {

config.deleteCheckInterval = ('deleteCheckInterval' in config)
? config.deleteCheckInterval * 60 * 1000
: 60 * 60 * 1000; // default is 1 hour

// TODO: discontinue pg interval strings in favor of ms int for better validation (when interval is specified lower than check interval, for example)
assert(!('deleteArchivedJobsEvery' in config) || typeof config.deleteArchivedJobsEvery === 'string',
'configuration assert: deleteArchivedJobsEvery should be a readable PostgreSQL interval such as "7 days"');

config.deleteArchivedJobsEvery = config.deleteArchivedJobsEvery || '7 days';

return config;
}

function applyMonitoringConfig(config) {
assert(!('monitorStateIntervalSeconds' in config) || config.monitorStateIntervalSeconds >=1,
'configuration assert: monitorStateIntervalSeconds must be at least every second');
Expand Down
18 changes: 14 additions & 4 deletions src/boss.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
const EventEmitter = require('events');
const plans = require('./plans');
const Promise = require('bluebird');

const events = {
archived: 'archived',
deleted: 'deleted',
monitorStates: 'monitor-states',
expiredCount: 'expired-count',
expiredJob: 'expired-job',
Expand All @@ -22,6 +22,7 @@ class Boss extends EventEmitter{

this.expireCommand = plans.expire(config.schema);
this.archiveCommand = plans.archive(config.schema);
this.purgeCommand = plans.purge(config.schema);
this.countStatesCommand = plans.countStates(config.schema);
}

Expand All @@ -30,11 +31,12 @@ class Boss extends EventEmitter{

// todo: add query that calcs avg start time delta vs. creation time

return Promise.join(
return Promise.all([
monitor(this.archive, this.config.archiveCheckInterval),
monitor(this.purge, this.config.deleteCheckInterval),
monitor(this.expire, this.config.expireCheckInterval),
this.config.monitorStateInterval ? monitor(this.countStates, this.config.monitorStateInterval) : null
);
]);

function monitor(func, interval) {

Expand Down Expand Up @@ -73,7 +75,7 @@ class Boss extends EventEmitter{
.then(result => {
if (result.rows.length) {
this.emit(events.expiredCount, result.rows.length);
return Promise.map(result.rows, job => this.emit(events.expiredJob, job));
return Promise.all(result.rows.map(job => this.emit(events.expiredJob, job)));
}
});
}
Expand All @@ -86,6 +88,14 @@ class Boss extends EventEmitter{
});
}

purge(){
return this.db.executeSql(this.purgeCommand, [this.config.deleteArchivedJobsEvery])
.then(result => {
if (result.rowCount)
this.emit(events.deleted, result.rowCount);
});
}

}

module.exports = Boss;
15 changes: 11 additions & 4 deletions src/contractor.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ const assert = require('assert');
const plans = require('./plans');
const migrations = require('./migrations');
const schemaVersion = require('../version.json').schema;
const Promise = require('bluebird');

class Contractor {

Expand Down Expand Up @@ -47,12 +46,14 @@ class Contractor {
}

create(){
return Promise.each(plans.create(this.config.schema), command => this.db.executeSql(command))
let promises = plans.create(this.config.schema).map(command => () => this.db.executeSql(command));

return this.promiseEach(promises)
.then(() => this.db.executeSql(plans.insertVersion(this.config.schema), [schemaVersion]));
}

update(current) {
if(current == '0.0.2') current = '0.0.1';
if(current === '0.0.2') current = '0.0.1';

return this.migrate(current)
.then(version => {
Expand Down Expand Up @@ -89,9 +90,15 @@ class Contractor {
return Promise.reject(new Error(errorMessage));
}

return Promise.each(migration.commands, command => this.db.executeSql(command))
let promises = migration.commands.map(command => () => this.db.executeSql(command));

return this.promiseEach(promises)
.then(() => migration.version);
}

promiseEach(promises) {
return promises.reduce((promise, func) => promise.then(() => func().then()), Promise.resolve());
}
}

module.exports = Contractor;
1 change: 0 additions & 1 deletion src/db.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
const EventEmitter = require('events');
const pg = require('pg');
const Promise = require('bluebird');
const url = require('url');

class Db extends EventEmitter {
Expand Down
5 changes: 2 additions & 3 deletions src/index.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
const EventEmitter = require('events');
const Promise = require('bluebird');
const Attorney = require('./attorney');
const Contractor = require('./contractor');
const Manager = require('./manager');
Expand Down Expand Up @@ -101,10 +100,10 @@ class PgBoss extends EventEmitter {
stop() {
if(!this.isStarted) return Promise.reject(notStartedErrorMessage);

return Promise.join(
return Promise.all([
this.manager.stop(),
this.boss.stop()
)
])
.then(() => this.db.isOurs ? this.db.close() : null)
.then(() => {
this.isReady = false;
Expand Down
1 change: 0 additions & 1 deletion src/manager.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
const assert = require('assert');
const EventEmitter = require('events');
const Promise = require('bluebird');
const uuid = require('uuid');

const Worker = require('./worker');
Expand Down
4 changes: 2 additions & 2 deletions test/fetchTest.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,12 @@ describe('fetch', function(){
const jobName = 'fetch-batch';
const batchSize = 4;

Promise.join(
Promise.all([
boss.publish(jobName),
boss.publish(jobName),
boss.publish(jobName),
boss.publish(jobName)
)
])
.then(() => boss.fetch(jobName, batchSize))
.then(jobs => {
assert(jobs.length === batchSize);
Expand Down
4 changes: 2 additions & 2 deletions test/subscribeTest.js
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,12 @@ describe('subscribe', function(){
const batchSize = 4;
let subscribeCount = 0;

Promise.join(
Promise.all([
boss.publish(jobName),
boss.publish(jobName),
boss.publish(jobName),
boss.publish(jobName)
)
])
.then(() => boss.subscribe(jobName, {batchSize}, job => {
subscribeCount++;

Expand Down

0 comments on commit 127a0a7

Please sign in to comment.