Merge pull request #88 from timgit/wildcard-subs

3.1

timgit committed Sep 15, 2018
2 parents 33d9383 + 3345338 commit 45258e9
Showing 16 changed files with 238 additions and 214 deletions.
27 changes: 27 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,32 @@
# Changes

## 3.1.0

### Features
- Added wildcard pattern matching for subscriptions. This allows you to have one subscription over many queues. For example, the following subscription uses the `*` placeholder to fetch completed jobs from all queues whose names start with `sensor-report-`.

```js
boss.onComplete('sensor-report-*', processSensorReport);
```
Wildcards may be placed anywhere in the queue name. The motivation for this feature is to let an orchestration use a single subscription to listen to potentially thousands of job processors, each doing one thing in its own isolated queue, as in the sketch below.
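
For instance, a hedged sketch of that orchestration idea (the handler and queue names are hypothetical): each processor works its own isolated queue, while one wildcard completion subscription observes them all.

```js
await boss.onComplete('sensor-report-*', job => {
  // job.data.request is the original job; job.data.response is what complete() returned
  orchestrate(job.data.request.name, job.data.response); // orchestrate() is hypothetical
});
```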

### Changes
- Multiple subscriptions to the same queue are now allowed on the same instance.

Previously an error was thrown when attempting to subscribe to the same queue more than once on the same instance. This was merely an internal concern with worker tracking. Since `teamConcurrency` was introduced in 3.0, polling is blocked until the last job in a batch completes, which can slow down queue operations if one job takes a long time to finish. Being able to have multiple subscriptions isn't necessarily something I'd advertise as a feature, but it's something easy I can offer until implementing a more elaborate producer-consumer queue pattern that monitors its promises; a rough sketch of the idea follows.
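
A hedged sketch of why this can help (the queue name, handler, and option values are hypothetical): a second subscription on the same queue keeps jobs flowing while a large `teamConcurrency` batch is still finishing.

```js
// First subscription: big batches; polling pauses until the whole team finishes.
await boss.subscribe('image-resize', {teamSize: 10, teamConcurrency: 10}, resizeImage);

// Second subscription on the same queue: before 3.1 this threw an error on the
// same instance; now it simply adds another worker for the queue.
await boss.subscribe('image-resize', {teamSize: 1}, resizeImage);
```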

Keep in mind that `subscribe()` is intended as a convenient abstraction over `fetch()` and `complete()`, which are always available if and when you have a use case that `subscribe()` cannot cover. A sketch of that lower-level loop follows.
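
A minimal sketch of that manual loop, assuming a started instance and a hypothetical queue and handler: fetch a job, do the work, then complete or fail it yourself.

```js
async function pollOnce(boss) {
  const job = await boss.fetch('sensor-report-ingest'); // hypothetical queue name

  if (!job) return; // nothing waiting right now

  try {
    const result = await processSensorReport(job.data); // your own handler
    await boss.complete(job.id, result);
  } catch (err) {
    await boss.fail(job.id, { message: err.message });
  }
}
```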

- Internal state job suffixes are now prefixes. The following shows a comparison of completed state jobs for the queue `some-job`.

- 3.0: `some-job__state__completed`
- 3.1: `__state__completed__some-job`

This is an internal implementation detail, included here in case you have custom queries written against the job tables. The migration will handle this for the job table (the archive will remain as-is).

### Fixes
- Removed connection string parsing and validation. The pg module bundles [pg-connection-string](https://github.com/iceddev/pg-connection-string), which supports everything I was previously trying to do with connection strings. This resolves some existing issues related to conditional connection arguments and automatically picks up any future enhancements these libraries provide; a brief illustration follows.
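
The example below is a hedged illustration, not from the docs; the host, database, and credentials are made up. It shows that constructor arguments are now handed to the pg driver as-is, whether a connection string or an options object.

```js
const PgBoss = require('pg-boss');

// Connection string: parsed by pg / pg-connection-string, not by pg-boss itself.
const boss = new PgBoss('postgres://user:secret@db.example.com:5432/jobs?ssl=true');

// Options object: forwarded to pg.Pool (poolSize is mapped to pg's max setting).
const boss2 = new PgBoss({host: 'db.example.com', database: 'jobs', user: 'user', poolSize: 10});
```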

## :tada: 3.0.0 :tada:

### Additions and Enhancements
58 changes: 23 additions & 35 deletions docs/examples.md
@@ -1,51 +1,39 @@
<!-- TOC -->

- [Wildcard completion subscription](#wildcard-completion-subscription)
- [Batch job fetch](#batch-job-fetch)

<!-- /TOC -->

## Wildcard completion subscription

Subscribe based on a matched pattern when jobs are completed to handle common completion logic.

```js
async function wildcardCompletion() {

  await boss.onComplete(`worker-register-*`, commonRegistrationCompletion);

  async function commonRegistrationCompletion(job) {
    if (job.data.failed)
      return console.log(`job ${job.data.request.id} failed`);

    await logRegistration(job.data.response);
  }

}
```

## Batch job fetch

Fetch an array of jobs in a subscription. Returning a promise ensures no more jobs are fetched until it resolves.

```js
async function highVolumeSubscription() {

  await boss.subscribe('send-text-message', {batchSize: 1000}, handleSend);

  async function handleSend(jobs) {
    await Promise.all(jobs.map(job => textMessageService.send(job.data)));
  }

}
```

Removed from this file: the previous "Async Readme" example (same as the readme, but with async await).

```js
const PgBoss = require('pg-boss');

try {
  await readme();
} catch (err) {
  console.error(err);
}

async function readme() {

  const boss = new PgBoss('postgres://user:pass@host/database');
  boss.on('error', error => console.error(error));

  await boss.start();

  const queue = 'some-queue';

  let jobId = await boss.publish(queue, {param1: 'parameter1'});

  console.log(`created job in queue ${queue}: ${jobId}`);

  await boss.subscribe(queue, job => onJob(job));

  await boss.onComplete(queue, job => {
    console.log(`job ${job.data.request.id} completed`);
    console.log(` - in state ${job.data.state}`);
    console.log(` - responded with '${job.data.response.value}'`);
  });

  async function onJob(job) {
    console.log(`job ${job.id} received`);
    console.log(` - with data: ${JSON.stringify(job.data)}`);

    return 'got it';
  }

}
```
75 changes: 39 additions & 36 deletions docs/usage.md
@@ -3,43 +3,44 @@ Usage

<!-- TOC -->

- [Usage](#usage)
- [Instance functions](#instance-functions)
- [`new(connectionString)`](#newconnectionstring)
- [`new(options)`](#newoptions)
- [`start()`](#start)
- [`stop()`](#stop)
- [`connect()`](#connect)
- [`disconnect()`](#disconnect)
- [`publish()`](#publish)
- [`publish(name, data, options)`](#publishname-data-options)
- [`publish(request)`](#publishrequest)
- [`publishAfter(name, data, options, seconds | ISO date string | Date)`](#publishaftername-data-options-seconds--iso-date-string--date)
- [`publishOnce(name, data, options, key)`](#publishoncename-data-options-key)
- [`publishThrottled(name, data, options, seconds [, key])`](#publishthrottledname-data-options-seconds--key)
- [`publishDebounced(name, data, options, seconds [, key])`](#publishdebouncedname-data-options-seconds--key)
- [`subscribe()`](#subscribe)
- [`subscribe(name [, options], handler)`](#subscribename--options-handler)
- [`onComplete(name [, options], handler)`](#oncompletename--options-handler)
- [`unsubscribe(name)`](#unsubscribename)
- [`offComplete(name)`](#offcompletename)
- [`fetch()`](#fetch)
- [`fetch(name)`](#fetchname)
- [`fetch(name, batchSize)`](#fetchname-batchsize)
- [`fetchCompleted(name [, batchSize])`](#fetchcompletedname--batchsize)
- [`cancel(id)`](#cancelid)
- [`cancel([ids])`](#cancelids)
- [`complete(id [, data])`](#completeid--data)
- [`complete([ids])`](#completeids)
- [`fail(id [, data])`](#failid--data)
- [`fail([ids])`](#failids)
- [Events](#events)
- [`error`](#error)
- [`archived`](#archived)
- [`expired`](#expired)
- [`monitor-states`](#monitor-states)
- [Static functions](#static-functions)
- [`string getConstructionPlans(schema)`](#string-getconstructionplansschema)
- [`string getMigrationPlans(schema, version, uninstall)`](#string-getmigrationplansschema-version-uninstall)

<!-- /TOC -->

@@ -222,7 +223,9 @@ This is a convenience version of `publish()` with the `singletonSeconds`, `single

**returns: Promise**

Polls the database for a job by name and executes the provided callback function when found. The promise resolves once a subscription has been created.
Polls the database by a queue name or a pattern and executes the provided callback function when jobs are found. The promise resolves once a subscription has been created.

Queue patterns use the `*` character to match 0 or more characters. For example, a job from queue `status-report-12345` would be fetched with pattern `status-report-*` or even `stat*5`.
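
For example (a hedged sketch; the queue names are hypothetical), a single pattern subscription receives jobs published to any matching queue:

```js
// One subscription covers status-report-12345, status-report-67890, and so on.
await boss.subscribe('status-report-*', job => {
  console.log(`report from queue ${job.name}:`, job.data);
});
```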

The default concurrency for `subscribe()` is 1 job per second. Both the interval and the number of jobs per interval can be customized by passing an optional [configuration option](configuration.md#subscribe-options) argument.
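
For example, a hedged sketch of tuning those knobs (the queue name and handler are made up; the options are the subscribe options documented in configuration.md):

```js
// Poll every 2 seconds and fetch up to 5 jobs per interval for this handler.
await boss.subscribe('send-text-message',
  {newJobCheckIntervalSeconds: 2, teamSize: 5},
  job => sendTextMessage(job.data));
```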

@@ -286,7 +289,7 @@ boss.subscribe('email-welcome', {batchSize: 5},

Sometimes when a job completes, expires or fails, it's important enough to trigger other things that should react to it. `onComplete` works identically to `subscribe()` and was created to facilitate the creation of orchestrations or sagas between jobs that may or may not know about each other. This common messaging pattern allows you to keep multi-job flow logic out of the individual job handlers so you can manage things in a more centralized fashion while not losing your mind. As you most likely already know, asynchronous jobs are complicated enough already.

> Internally, these jobs have a special suffix of `__state__completed`. Since pg-boss creates these and it's possible that no subscriptions will ever be created for retrieving them, they are considered "second class" and will be archived even if they remain in 'created' state. Keep this in mind if you customize your archive interval.
> Internally, these jobs have a special prefix of `__state__completed__`. Since pg-boss creates these and it's possible that no subscriptions will ever be created for retrieving them, they are considered "second class" and will be archived even if they remain in 'created' state. Keep this in mind if you customize your archive interval.
The callback for `onComplete()` returns a job containing the original job and completion details. `request` will be the original job as submitted with `id`, `name` and `data`. `response` may or may not have a value based on arguments in [complete()](#completeid--data) or [fail()](#failid--data).
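
A hedged sketch of reading that shape in a completion handler, reusing the `email-welcome` queue from the example above:

```js
await boss.onComplete('email-welcome', job => {
  if (job.data.failed)
    return console.error(`job ${job.data.request.id} failed`, job.data.response);

  console.log(`job ${job.data.request.id} finished in state ${job.data.state}`);
});
```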

4 changes: 2 additions & 2 deletions package.json
@@ -1,14 +1,14 @@
{
"name": "pg-boss",
"version": "3.0.0",
"version": "3.1.0",
"description": "Queueing jobs in Node.js using PostgreSQL like a boss",
"main": "./lib/index.js",
"engines": {
"node": ">=6.0.0",
"npm": ">=5.0.0"
},
"dependencies": {
"bluebird": "^3.4.6",
"bluebird": "^3.5.2",
"pg": "^7.4.1",
"uuid": "^3.2.1"
},
21 changes: 17 additions & 4 deletions src/attorney.js
@@ -37,7 +37,7 @@ function checkPublishArgs(args) {

options = options || {};

assert(name, 'boss requires all jobs to have a name');
assert(name, 'boss requires all jobs to have a queue name');
assert(typeof options === 'object', 'options should be an object');

} catch (error){
@@ -66,16 +66,29 @@ function checkSubscribeArgs(name, args){
if(options)
assert(typeof options === 'object', 'expected config to be an object');

name = sanitizeQueueNameForFetch(name);

} catch(e) {
return Promise.reject(e);
}

return Promise.resolve({options, callback});
}

function checkFetchArgs(names, batchSize){
return assertAsync(names.every(n => n), 'missing job name')
.then(() => assert(!batchSize || batchSize >=1, 'fetch() assert: optional batchSize arg must be at least 1'));
function checkFetchArgs(name, batchSize){
return assertAsync(name, 'missing queue name')
.then(() => {
name = sanitizeQueueNameForFetch(name);
assert(!batchSize || batchSize >=1, 'fetch() assert: optional batchSize arg must be at least 1');
})
.then(() => ({name, batchSize}));
}

function sanitizeQueueNameForFetch(name) {
return name.replace(
/[%_\*]/g,
match => match === '*' ? '%' : '\\' + match)
;
}

function assertAsync(arg, errorMessage){
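
For reference, a hedged sketch of what the new `sanitizeQueueNameForFetch()` helper above does: it turns a subscription pattern into a SQL `LIKE` pattern by mapping `*` to `%` and escaping any literal `%` or `_` in the queue name (the sample names below are made up).

```js
const sanitize = name => name.replace(/[%_*]/g, m => m === '*' ? '%' : '\\' + m);

console.log(sanitize('sensor-report-*')); // 'sensor-report-%'  -> LIKE matches any suffix
console.log(sanitize('team_42-jobs'));    // 'team\_42-jobs'    -> '_' no longer acts as a LIKE wildcard
```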
39 changes: 5 additions & 34 deletions src/db.js
@@ -1,49 +1,20 @@
Removed: the `url`-based connection string parsing and the hand-assembled `pg.Pool` options.

const url = require('url');

let poolConfig = (config.connectionString)
  ? parseConnectionString(config.connectionString)
  : config;

this.pool = new pg.Pool({
  user: poolConfig.user,
  password: poolConfig.password,
  host: poolConfig.host,
  port: poolConfig.port,
  database: poolConfig.database,
  application_name: poolConfig.application_name || 'pgboss',
  max: poolConfig.poolSize || poolConfig.max,
  ssl: !!poolConfig.ssl
});

function parseConnectionString(connectionString){
  const parseQuerystring = true;
  const params = url.parse(connectionString, parseQuerystring);
  const auth = params.auth.split(':');

  let parsed = {
    user: auth[0],
    host: params.hostname,
    port: params.port,
    database: params.pathname.split('/')[1],
    ssl: !!params.query.ssl
  };

  if(auth.length === 2)
    parsed.password = auth[1];

  return parsed;
}

Resulting constructor: the config object is passed straight to `pg.Pool`.

const EventEmitter = require('events');
const pg = require('pg');

class Db extends EventEmitter {
  constructor(config){
    super();

    this.config = config;

    if(config.poolSize)
      config.max = config.poolSize;

    config.application_name = config.application_name || 'pgboss'

    this.pool = new pg.Pool(config);

    this.pool.on('error', error => this.emit('error', error));
  }

  close(){
24 changes: 11 additions & 13 deletions src/manager.js
@@ -6,7 +6,7 @@ const Worker = require('./worker');
const plans = require('./plans');
const Attorney = require('./attorney');

const completedJobSuffix = plans.completedJobSuffix;
const completedJobPrefix = plans.completedJobPrefix;

const events = {
error: 'error'
@@ -64,14 +64,12 @@ class Manager extends EventEmitter {

onComplete(name, ...args) {
return Attorney.checkSubscribeArgs(name, args)
.then(({options, callback}) => this.watch(name + completedJobSuffix, options, callback));
.then(({options, callback}) => this.watch(completedJobPrefix + name, options, callback));
}

watch(name, options, callback){
// watch() is always nested in a promise, so assert()s are welcome

assert(!(name in this.subscriptions), 'this job has already been subscribed on this instance.');

if('newJobCheckInterval' in options || 'newJobCheckIntervalSeconds' in options)
options = Attorney.applyNewJobCheckInterval(options);
else
@@ -121,23 +119,25 @@ class Manager extends EventEmitter {
let worker = new Worker(workerConfig);
worker.start();

let subscription = this.subscriptions[name] = {worker:null};
subscription.worker = worker;
if(!this.subscriptions[name])
this.subscriptions[name] = { workers: [] };

this.subscriptions[name].workers.push(worker);

return Promise.resolve(true);
}

unsubscribe(name){
if(!this.subscriptions[name]) return Promise.reject(`No subscriptions for ${name} were found.`);

this.subscriptions[name].worker.stop();
this.subscriptions[name].workers.forEach(worker => worker.stop());
delete this.subscriptions[name];

return Promise.resolve(true);
}

offComplete(name){
return this.unsubscribe(name + completedJobSuffix);
return this.unsubscribe(completedJobPrefix + name);
}

publish(...args){
@@ -255,10 +255,8 @@ class Manager extends EventEmitter {
}

fetch(name, batchSize) {
const names = Array.isArray(name) ? name : [name];

return Attorney.checkFetchArgs(names, batchSize)
.then(() => this.db.executeSql(this.nextJobCommand, [names, batchSize || 1]))
return Attorney.checkFetchArgs(name, batchSize)
.then(values => this.db.executeSql(this.nextJobCommand, [values.name, values.batchSize || 1]))
.then(result => {

const jobs = result.rows.map(job => {
@@ -273,7 +271,7 @@
}

fetchCompleted(name, batchSize){
return this.fetch(name + completedJobSuffix, batchSize);
return this.fetch(completedJobPrefix + name, batchSize);
}

mapCompletionIdArg(id, funcName) {
