Skip to content

Commit

Permalink
Merge pull request #378 from timgit/singletonqueue-active-limit
Browse files Browse the repository at this point in the history
Singleton queue and work completion using batches
  • Loading branch information
timgit committed Apr 16, 2023
2 parents cf8bc92 + ce9920e commit 1f54126
Show file tree
Hide file tree
Showing 18 changed files with 356 additions and 4,438 deletions.
1 change: 0 additions & 1 deletion .travis.yml
Expand Up @@ -4,7 +4,6 @@ language: node_js
node_js:
- '18'
- '16'
- '14'
addons:
postgresql: '13'
apt:
Expand Down
6 changes: 3 additions & 3 deletions README.md
@@ -1,6 +1,6 @@
Queueing jobs in Node.js using PostgreSQL like a boss.

[![PostgreSql Version](https://img.shields.io/badge/PostgreSQL-9.5+-blue.svg?maxAge=2592000)](http://www.postgresql.org)
[![PostgreSql Version](https://img.shields.io/badge/PostgreSQL-11+-blue.svg?maxAge=2592000)](http://www.postgresql.org)
[![npm version](https://badge.fury.io/js/pg-boss.svg)](https://badge.fury.io/js/pg-boss)
[![Build Status](https://app.travis-ci.com/timgit/pg-boss.svg?branch=master)](https://app.travis-ci.com/github/timgit/pg-boss)
[![Coverage Status](https://coveralls.io/repos/github/timgit/pg-boss/badge.svg?branch=master)](https://coveralls.io/github/timgit/pg-boss?branch=master)
Expand Down Expand Up @@ -50,8 +50,8 @@ This will likely cater the most to teams already familiar with the simplicity of
* Automatic maintenance operations to manage table growth

## Requirements
* Node 14 or higher
* PostgreSQL 9.5 or higher
* Node 16 or higher
* PostgreSQL 11 or higher

## Installation

Expand Down
55 changes: 25 additions & 30 deletions docs/readme.md
Expand Up @@ -47,17 +47,17 @@
- [`schedule(name, cron, data, options)`](#schedulename-cron-data-options)
- [`unschedule(name)`](#unschedulename)
- [`getSchedules()`](#getschedules)
- [`cancel(id)`](#cancelid)
- [`cancel([ids])`](#cancelids)
- [`resume(id)`](#resumeid)
- [`resume([ids])`](#resumeids)
- [`complete(id [, data])`](#completeid--data)
- [`complete([ids])`](#completeids)
- [`fail(id [, data])`](#failid--data)
- [`fail([ids])`](#failids)
- [`cancel(id, options)`](#cancelid-options)
- [`cancel([ids], options)`](#cancelids-options)
- [`resume(id, options)`](#resumeid-options)
- [`resume([ids], options)`](#resumeids-options)
- [`complete(id [, data, options])`](#completeid--data-options)
- [`complete([ids], options)`](#completeids-options)
- [`fail(id [, data, options])`](#failid--data-options)
- [`fail([ids], options)`](#failids-options)
- [`notifyWorker(id)`](#notifyworkerid)
- [`getQueueSize(name [, options])`](#getqueuesizename--options)
- [`getJobById(id)`](#getjobbyidid)
- [`getJobById(id, options)`](#getjobbyidid-options)
- [`deleteQueue(name)`](#deletequeuename)
- [`deleteAllQueues()`](#deleteallqueues)
- [`clearStorage()`](#clearstorage)
Expand Down Expand Up @@ -559,7 +559,7 @@ Available in constructor as a default, or overridden in send.

* **singletonKey** string

Only allows 1 job (within the same name) to be queued or active with the same singletonKey.
Allows a max of 1 job (with the same name and singletonKey) to be queued or active.

```js
boss.send('my-job', {}, {singletonKey: '123'}) // resolves a jobId
Expand All @@ -570,7 +570,9 @@ Available in constructor as a default, or overridden in send.

* **useSingletonQueue** boolean

When used in conjunction with singletonKey, only allows 1 job (within the same name) to be queued with the same singletonKey.
When used in conjunction with singletonKey, allows a max of 1 job to be queued.

>By default, there is no limit on the number of these jobs that may be active. However, this behavior may be modified by passing the [enforceSingletonQueueActiveLimit](#fetch) option.
```js
boss.send('my-job', {}, {singletonKey: '123', useSingletonQueue: true}) // resolves a jobId
Expand Down Expand Up @@ -744,6 +746,11 @@ Typically one would use `work()` for automated polling for new jobs based upon a
| oncomplete | bool |
| output | object |

* `enforceSingletonQueueActiveLimit`, bool

If `true`, modifies the behavior of the `useSingletonQueue` flag to allow a max of 1 job to be queued plus a max of 1 job to be active.
>Note that use of this option can impact performance on instances with large numbers of jobs.

**Resolves**
- `[job]`: array of job objects, `null` if none found
Expand Down Expand Up @@ -819,6 +826,10 @@ The default concurrency for `work()` is 1 job every 2 seconds. Both the interval

Same as in [`fetch()`](#fetch)

* **enforceSingletonQueueActiveLimit**, bool

Same as in [`fetch()`](#fetch)

**Polling options**

How often workers will poll the queue table for jobs. Available in the constructor as a default or per worker in `work()` and `onComplete()`.
Expand All @@ -838,11 +849,9 @@ How often workers will poll the queue table for jobs. Available in the construct

**Handler function**

Typically `handler` will be an `async` function, since this automatically returns promises that can be awaited for backpressure support.

If handler returns a promise, the value resolved/returned will be stored in a completion job. Likewise, if an error occurs in the handler, it will be caught and useful error properties stored into a completion job in addition to marking the job as failed.
`handler` should either be an `async` function or return a promise. If an error occurs in the handler, it will be caught and stored into an output storage column in addition to marking the job as failed.

Finally, and importantly, promise-returning handlers will be awaited before polling for new jobs which provides **automatic backpressure**.
Enforcing promise-returning handlers that are awaited in the workers defers polling for new jobs until the existing jobs are completed, providing backpressure.

The job object has the following properties.

Expand All @@ -851,11 +860,8 @@ The job object has the following properties.
|`id`| string, uuid |
|`name`| string |
|`data`| object |
|`done(err, data)` | function | callback function used to mark the job as completed or failed. Returns a promise.

If `handler` does not return a promise, `done()` should be used to mark the job as completed or failed. `done()` accepts optional arguments, `err` and `data`, for usage with [`onComplete()`](#oncompletename--options-handler) state-based workers. If `err` is truthy, it will mark the job as failed.

> If the job is not completed, either by returning a promise from `handler` or manually via `job.done()`, it will expire after the configured expiration period.
> If the job is not completed, it will expire after the configured expiration period.
Following is an example of a worker that returns a promise (`sendWelcomeEmail()`) for completion with the teamSize option set for increased job concurrency between polling intervals.

Expand All @@ -864,17 +870,6 @@ const options = { teamSize: 5, teamConcurrency: 5 }
await boss.work('email-welcome', options, job => myEmailService.sendWelcomeEmail(job.data))
```

And the same example, but without returning a promise in the handler.

```js
const options = { teamSize: 5, teamConcurrency: 5 }
await boss.work('email-welcome', options, job => {
myEmailService.sendWelcomeEmail(job.data)
.then(() => job.done())
.catch(error => job.done(error))
})
```

Similar to the first example, but with a batch of jobs at once.

```js
Expand Down

0 comments on commit 1f54126

Please sign in to comment.