Skip to content

Commit

Permalink
Merge 56c4660 into 4974fd7
Browse files Browse the repository at this point in the history
  • Loading branch information
timgit authored Dec 20, 2019
2 parents 4974fd7 + 56c4660 commit 65a6e78
Show file tree
Hide file tree
Showing 52 changed files with 3,948 additions and 6,030 deletions.
24 changes: 0 additions & 24 deletions .editorconfig

This file was deleted.

2 changes: 0 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
node_modules
.idea
lib
coverage/
*.tgz
.nyc_output/
3 changes: 0 additions & 3 deletions .npmignore
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
.idea
src/
test/
coverage/
.travis.yml
.babelrc
*.tgz
.nyc_output/
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
# Changes

## 3.2.0

- Fixed rare deadlocks by stacking housekeeping operations one at a time during start().
- Added `archive()`, `purge()` and `expire()` to exports for manual housekeeping if desired along with connect(). Use this only if you need it for special cases, as it's not a good idea to run these in parallel (see deadlock comment above).
- Added index to archive table by date to improve housekeeping perf.
- Node 8 is now officially the minimum supported version. Not only have I stopped testing anything lower than 8 in Travis, but I finally migrated to async await in this round of changes.
- Typescript type defs.

## 3.1.7

- Typescript type defs for singletonNextSlot config updated via PR.
Expand Down
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ async function readme() {

const queue = 'some-queue';

let jobId = await boss.publish(queue, {param1: 'parameter1'})
let jobId = await boss.publish(queue, { param1: 'foo' })

console.log(`created job in queue ${queue}: ${jobId}`);

Expand All @@ -32,11 +32,11 @@ async function someAsyncJobHandler(job) {
}
```

pg-boss is a message queue (aka job queue, task queue) built in Node.js on top of PostgreSQL in order to provide guaranteed messaging and asynchronous execution to your Node apps.
pg-boss is a job queue built in Node.js on top of PostgreSQL in order to provide background job processing and reliable asynchronous execution to Node.js applications.

Why would you consider using this queue over others? pg-boss was created to leverage recent additions in PostgreSQL 9.5
Why would you consider using this queue over others? pg-boss is actually a light abstraction over features added in PostgreSQL 9.5
(specifically [SKIP LOCKED](http://blog.2ndquadrant.com/what-is-select-skip-locked-for-in-postgresql-9-5) and upserts)
which significantly enhance its ability to act as a reliable, distributed message queue. I wrote this to remove a dependency on Redis (via the kue package), consolidating systems I have to support in production as well as upgrading to guaranteed message processing (hint: [Redis persistence docs](https://redis.io/topics/persistence#ok-so-what-should-i-use)).
which significantly enhanced its ability to act as a reliable, distributed message queue. I wrote this to remove a dependency on Redis (via the kue package), consolidating systems I have to support in production as well as upgrading to guaranteed message processing (hint: [Redis persistence docs](https://redis.io/topics/persistence#ok-so-what-should-i-use)).

This will likely cater the most to teams already familiar with the simplicity of relational database semantics and operations (querying and backups, for example).

Expand Down
7 changes: 0 additions & 7 deletions babel.config.js

This file was deleted.

196 changes: 89 additions & 107 deletions docs/usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,44 +7,44 @@ Usage
- [Intro](#intro)
- [Database installation](#database-installation)
- [Functions](#functions)
- [`new(connectionString)`](#newconnectionstring)
- [`new(options)`](#newoptions)
- [`start()`](#start)
- [`stop()`](#stop)
- [`connect()`](#connect)
- [`disconnect()`](#disconnect)
- [`publish()`](#publish)
- [`publish(name, data, options)`](#publishname-data-options)
- [`publish(request)`](#publishrequest)
- [`publishAfter(name, data, options, seconds | ISO date string | Date)`](#publishaftername-data-options-seconds--iso-date-string--date)
- [`publishOnce(name, data, options, key)`](#publishoncename-data-options-key)
- [`publishThrottled(name, data, options, seconds [, key])`](#publishthrottledname-data-options-seconds--key)
- [`publishDebounced(name, data, options, seconds [, key])`](#publishdebouncedname-data-options-seconds--key)
- [`subscribe()`](#subscribe)
- [`subscribe(name [, options], handler)`](#subscribename--options-handler)
- [`onComplete(name [, options], handler)`](#oncompletename--options-handler)
- [`unsubscribe(name)`](#unsubscribename)
- [`offComplete(name)`](#offcompletename)
- [`fetch()`](#fetch)
- [`fetch(name)`](#fetchname)
- [`fetch(name, batchSize)`](#fetchname-batchsize)
- [`fetchCompleted(name [, batchSize])`](#fetchcompletedname--batchsize)
- [`cancel(id)`](#cancelid)
- [`cancel([ids])`](#cancelids)
- [`complete(id [, data])`](#completeid--data)
- [`complete([ids])`](#completeids)
- [`fail(id [, data])`](#failid--data)
- [`fail([ids])`](#failids)
- [`deleteQueue(name)`](#deletequeuename)
- [`deleteAllQueues()`](#deleteallqueues)
- [new(connectionString)](#newconnectionstring)
- [new(options)](#newoptions)
- [start()](#start)
- [stop()](#stop)
- [connect()](#connect)
- [disconnect()](#disconnect)
- [publish()](#publish)
- [publish(name, data, options)](#publishname-data-options)
- [publish(request)](#publishrequest)
- [publishAfter(name, data, options, seconds | ISO date string | Date)](#publishaftername-data-options-seconds--iso-date-string--date)
- [publishOnce(name, data, options, key)](#publishoncename-data-options-key)
- [publishThrottled(name, data, options, seconds [, key])](#publishthrottledname-data-options-seconds--key)
- [publishDebounced(name, data, options, seconds [, key])](#publishdebouncedname-data-options-seconds--key)
- [subscribe()](#subscribe)
- [subscribe(name [, options], handler)](#subscribename--options-handler)
- [onComplete(name [, options], handler)](#oncompletename--options-handler)
- [unsubscribe(name)](#unsubscribename)
- [offComplete(name)](#offcompletename)
- [fetch()](#fetch)
- [fetch(name)](#fetchname)
- [fetch(name, batchSize)](#fetchname-batchsize)
- [fetchCompleted(name [, batchSize])](#fetchcompletedname--batchsize)
- [cancel(id)](#cancelid)
- [cancel([ids])](#cancelids)
- [complete(id [, data])](#completeid--data)
- [complete([ids])](#completeids)
- [fail(id [, data])](#failid--data)
- [fail([ids])](#failids)
- [deleteQueue(name)](#deletequeuename)
- [deleteAllQueues()](#deleteallqueues)
- [Events](#events)
- [`error`](#error)
- [`archived`](#archived)
- [`expired`](#expired)
- [`monitor-states`](#monitor-states)
- [error](#error)
- [archived](#archived)
- [expired](#expired)
- [monitor-states](#monitor-states)
- [Static functions](#static-functions)
- [`string getConstructionPlans(schema)`](#string-getconstructionplansschema)
- [`string getMigrationPlans(schema, version, uninstall)`](#string-getmigrationplansschema-version-uninstall)
- [string getConstructionPlans(schema)](#string-getconstructionplansschema)
- [string getMigrationPlans(schema, version, uninstall)](#string-getmigrationplansschema-version-uninstall)

<!-- /TOC -->

Expand Down Expand Up @@ -92,14 +92,7 @@ const options = {
archiveCompletedJobsEvery: '2 days'
};

let boss;

try {
boss = new PgBoss(options);
}
catch(error) {
console.error(error);
}
const boss = new PgBoss(options);
```

## `start()`
Expand All @@ -109,10 +102,8 @@ catch(error) {
Prepares the target database and begins job monitoring.

```js
boss.start()
.then(boss => {
boss.publish('hey-there', {msg:'this came for you'});
});
await boss.start()
await boss.publish('hey-there', { msg:'this came for you' })
```

Since it is responsible for monitoring jobs for expiration and archiving, `start()` *should be called once and only once per backing database store.* Once this has been taken care of, if your use cases demand additional instances for job processing, you should use `connect()`.
Expand Down Expand Up @@ -170,18 +161,18 @@ Creates a new job and resolves the job's unique identifier (uuid).
- `options`: object ([publish options](configuration.md#publish-options))

```js
var payload = {
const payload = {
email: "billybob@veganplumbing.com",
name: "Billy Bob"
};

var options = {
const options = {
startAfter: 1,
retryLimit: 2
};

boss.publish('email-send-welcome', payload, options)
.then(jobId => console.log(`job ${jobId} submitted`));
const jobId = await boss.publish('email-send-welcome', payload, options)
console.log(`job ${jobId} submitted`)
```

### `publish(request)`
Expand All @@ -202,11 +193,12 @@ The request object has the following properties.
This overload is for conditionally including data or options based on keys in an object, such as the following.

```js
boss.publish({
name: 'database-backup',
options: { retryLimit: 1 }
const jobId = await boss.publish({
name: 'database-backup',
options: { retryLimit: 1 }
})
.then(id => console.log(`job ${id} submitted`));

console.log(`job ${jobId} submitted`)
```

### `publishAfter(name, data, options, seconds | ISO date string | Date)`
Expand Down Expand Up @@ -267,36 +259,30 @@ If you do not return a promise, `done()` should be used to mark the job as compl

> If you forget to use a promise or the callback function to mark the job as completed, it will expire after the configured expiration period. The default expiration can be found in the [configuration docs](configuration.md#job-expiration).
Following is an example of a subscription that returns a promise for completion with the teamSize option set for increased job concurrency between polling intervals.
Following is an example of a subscription that returns a promise (`sendWelcomeEmail()`) for completion with the teamSize option set for increased job concurrency between polling intervals.

```js
boss.subscribe('email-welcome', {teamSize: 5, teamConcurrency: 5},
job => myEmailService.sendWelcomeEmail(job.data))
.then(() => console.log('subscription created'))
.catch(error => console.error(error));
const options = { teamSize: 5, teamConcurrency: 5 }
await boss.subscribe('email-welcome', options, job => myEmailService.sendWelcomeEmail(job.data))
```

And the same example, but without returning a promise in the handler.

```js
boss.subscribe('email-welcome', {teamSize: 5, teamConcurrency: 5},
job => {
myEmailService.sendWelcomeEmail(job.data)
.then(() => job.done())
.catch(error => job.done(error));
})
.then(() => console.log('subscription created'))
.catch(error => console.error(error));
const options = { teamSize: 5, teamConcurrency: 5 }
await boss.subscribe('email-welcome', options, job => {
myEmailService.sendWelcomeEmail(job.data)
.then(() => job.done())
.catch(error => job.done(error))
})
```

Similar to the first example, but with a batch of jobs at once.

```js
boss.subscribe('email-welcome', {batchSize: 5},
jobs => myEmailService.sendWelcomeEmails(jobs.map(job => job.data))
)
.then(() => console.log('subscription created'))
.catch(error => console.error(error));
await boss.subscribe('email-welcome', { batchSize: 5 },
jobs => myEmailService.sendWelcomeEmails(jobs.map(job => job.data))
)
```

### `onComplete(name [, options], handler)`
Expand All @@ -310,34 +296,26 @@ The callback for `onComplete()` returns a job containing the original job and co
Here's an example from the test suite showing this in action.

```js
it('onComplete should have both request and response', function(finished){

const jobName = 'onCompleteFtw';
const requestPayload = {token:'trivial'};
const responsePayload = {message: 'so verbose', code: '1234'};

let jobId = null;
const jobName = 'onCompleteFtw'
const requestPayload = { token:'trivial' }
const responsePayload = { message: 'so verbose', code: '1234' }

boss.onComplete(jobName, job => {
assert.equal(jobId, job.data.request.id);
assert.equal(job.data.request.data.token, requestPayload.token);
assert.equal(job.data.response.message, responsePayload.message);
assert.equal(job.data.response.code, responsePayload.code);
boss.onComplete(jobName, job => {
assert.equal(jobId, job.data.request.id)
assert.equal(job.data.request.data.token, requestPayload.token)
assert.equal(job.data.response.message, responsePayload.message)
assert.equal(job.data.response.code, responsePayload.code)

finished();
});

boss.publish(jobName, requestPayload)
.then(id => jobId = id)
.then(() => boss.fetch(jobName))
.then(job => boss.complete(job.id, responsePayload));
finished()
})

});
const jobId = await boss.publish(jobName, requestPayload)
const job = await boss.fetch(jobName)
await boss.complete(job.id, responsePayload)
```

And here's an example job from the callback in this test.


```js
{
"request": {
Expand Down Expand Up @@ -396,21 +374,25 @@ Note: If you pass a batchSize, `fetch()` will always resolve an array response,
The following code shows how to utilize batching via `fetch()` to get and complete 20 jobs at once on-demand.

```js
const jobName = 'email-daily-digest';
const batchSize = 20;
const queue = 'email-daily-digest'
const batchSize = 20

boss.fetch(jobName, batchSize)
.then(jobs => {
if(!jobs) return;
const jobs = await boss.fetch(queue, batchSize)

console.log(`received ${jobs.length} ${jobName} jobs`);
if(!jobs) {
return
}

for (let i = 0; i < jobs.length; i++) {
const job = jobs[i]

// our magical emailer knows what to do with job.data
let promises = jobs.map(job => emailer.send(job.data).then(() => boss.complete(job.id)));

return Promise.all(promises);
})
.catch(error => console.log(error));
try {
await emailer.send(job.data)
await boss.complete(job.id)
} catch(err) {
await boss.fail(job.id, err)
}
}
```

### `fetchCompleted(name [, batchSize])`
Expand Down
Loading

0 comments on commit 65a6e78

Please sign in to comment.