Skip to content

Commit

Permalink
Merge 7f30d18 into 9a9c1a4
Browse files Browse the repository at this point in the history
  • Loading branch information
timgit committed May 8, 2021
2 parents 9a9c1a4 + 7f30d18 commit d72c35b
Show file tree
Hide file tree
Showing 50 changed files with 7,300 additions and 1,698 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ services:
- postgresql
language: node_js
node_js:
- "16"
- "14"
- "12"
- "10"
before_script:
- psql -c 'create database pgboss' -U postgres
- psql -c 'create extension pgcrypto' -d pgboss -U postgres
Expand Down
32 changes: 32 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,37 @@
# Changes

## 6.0.0 :tada:

- CHANGE: `stop()` has been enhanced with a graceful stop feature that will signal and monitor any polling subscriptions (workers using `subscribe()` or `onComlete()`) before closing the internal connection pool and stopping maintenance operations. The defalt options, `{ graceful = true, timeout = 30000 }`, will wait for 30s before shutting down.
- NEW: Added a `stopped` event that will be emitted after `stop()` when all workers have completed active jobs, or when the timeout is met, whichever is sooner.
- NEW: Added a `wip` event that will emit as jobs are both fetched and completed per instance. If no work is being done, no events will be emitted. This will emit at most once every 2 seconds for monitoring purposes.
- NEW: Added the `output` jsonb column to storage tables to store result or error data along with the original job, which were previously only available via completion jobs. This has the added benefit of storing any errors or results from completion jobs themselves, which were previously discarded.
- NEW: `getJobById(id)` can now be used to fetch a job from either primary or archive storage by id. This may be helpful if needed to inspect `output` and you have the job id.
- NEW: Added new function, `publishSingleton()`, similar to publishOnce(), but throttles publish to only allow 1 job in the queue at a time, allowing a job to be queued even if 1 or more jobs are currently active.
- CHANGE: `subscribe()` and `onComplete()` now resolve with a unique id that will be visible in `wip` along with additional metadata about the subscription.
- CHANGE: `subscribe()` and `onComplete()` now abort promise execution client-side based on the job's expiration.
- CHANGE: `unsubscribe()` and `offComplete()` now accept an object as an argument to allow removing a specific subscription by id.
- MAJOR: The `onComplete` publish option is now defaulted to `false`, which breaks backward compatability for automatic creation of completion jobs. To restore the previous behavior of completion jobs being created by default, you should set `onComplete` to `true` in your constructor options.
- MAJOR: The default retention policy has been reduced from 30 to 14 days. This can still be customized as an option in the constructor.
- MAJOR: Node 10 is EOL. Node 12 is now the minimum supported version.
- MAJOR: Added a new index to the primary job table, `job_fetch`, to improve fetch time performace as the job table size increases. Depending on how many jobs you have in your job table, creating this index may delay `start()` promise resolution. If this is a concern, you can get the DDL via `getMigrationPlans()` and create the index out of band. The DLL includes an `IF NOT EXISTS` which will skip this step in the migration if already created.

In the following example, once you have installed this package version, using the node repl, you can get the DDL for the index from `getMigrationPlans()`.

```shell

$ node
Welcome to Node.js v14.16.1.
Type ".help" for more information.
> console.log(require('./node_modules/pg-boss').getMigrationPlans())

BEGIN;
...
CREATE INDEX IF NOT EXISTS job_fetch ...
...
COMMIT;
```

## 5.2.3

- Dependency PR from dependabot
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ This will likely cater the most to teams already familiar with the simplicity of
* Automatic maintenance operations to manage table growth

## Requirements
* Node 10 or higher
* Node 12 or higher
* PostgreSQL 9.5 or higher

## Installation
Expand Down
7 changes: 3 additions & 4 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,6 @@ Alternatively, the following options can be set as properties in an object.
This option may be beneficial if you'd like to use an existing database service
with its own connection pool.

For example, you may be relying on the cluster module on
a web server, and you'd like to limit the growth of total connections as much as possible.

* **schema** - string, defaults to "pgboss"

Database schema that contains all required storage objects. Only alphanumeric and underscore allowed, length: <= 50 characters
Expand Down Expand Up @@ -259,7 +256,7 @@ For example, if you set the `singletonMinutes` to 1, then submit 2 jobs within a
Setting `singletonNextSlot` to true will cause the job to be scheduled to run after the current time slot if and when a job is throttled. This option is set to true, for example, when calling the convenience function `publishDebounced()`.

### Completion jobs
* **onComplete**, bool (Default: true)
* **onComplete**, bool (Default: false)

When a job completes, a completion job will be created in the queue, copying the same retention policy as the job, for the purpose of `onComplete()` or `fetchCompleted()`. If completion jobs are not used, they will be archived according to the retention policy. If the queue in question has a very high volume, this can be set to `false` to bypass creating the completion job. This can also be set in the constructor as a default for all calls to `publish()`.

Expand Down Expand Up @@ -288,6 +285,8 @@ When a job completes, a completion job will be created in the queue, copying the
| createdon | string, timestamp |
| completedon | string, timestamp |
| keepuntil | string, timestamp |
| oncomplete | bool |
| output | object |

## Subscribe options

Expand Down
Binary file modified docs/images/job-states.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
63 changes: 55 additions & 8 deletions docs/usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
- [Events](#events)
- [`error`](#error)
- [`monitor-states`](#monitor-states)
- [`wip`](#wip)
- [`stopped`](#stopped)
- [Static functions](#static-functions)
- [`string getConstructionPlans(schema)`](#string-getconstructionplansschema)
- [`string getMigrationPlans(schema, version)`](#string-getmigrationplansschema-version)
Expand All @@ -25,13 +27,14 @@
- [`publish(request)`](#publishrequest)
- [`publishAfter(name, data, options, seconds | ISO date string | Date)`](#publishaftername-data-options-seconds--iso-date-string--date)
- [`publishOnce(name, data, options, key)`](#publishoncename-data-options-key)
- [`publishSingleton(name, data, options)`](#publishsingletonname-data-options)
- [`publishThrottled(name, data, options, seconds [, key])`](#publishthrottledname-data-options-seconds--key)
- [`publishDebounced(name, data, options, seconds [, key])`](#publishdebouncedname-data-options-seconds--key)
- [`subscribe()`](#subscribe)
- [`subscribe(name [, options], handler)`](#subscribename--options-handler)
- [`onComplete(name [, options], handler)`](#oncompletename--options-handler)
- [`unsubscribe(name)`](#unsubscribename)
- [`offComplete(name)`](#offcompletename)
- [`unsubscribe(value)`](#unsubscribevalue)
- [`offComplete(value)`](#offcompletevalue)
- [`fetch()`](#fetch)
- [`fetch(name)`](#fetchname)
- [`fetch(name, batchSize, [, options])`](#fetchname-batchsize--options)
Expand All @@ -47,6 +50,7 @@
- [`fail(id [, data])`](#failid--data)
- [`fail([ids])`](#failids)
- [`getQueueSize(name [, options])`](#getqueuesizename--options)
- [`getJobById(id)`](#getjobbyidid)
- [`deleteQueue(name)`](#deletequeuename)
- [`deleteAllQueues()`](#deleteallqueues)
- [`clearStorage()`](#clearstorage)
Expand Down Expand Up @@ -187,6 +191,32 @@ The payload of the event is an object with a key per queue and state, such as th
"all": 4694
}
```
## `wip`

Emitted at most once every 2 seconds when polling subscriptions are active and jobs are entering or leaving active state. The payload is an array that represents each worker in this instance of pg-boss. If you want to monitor queue activity across all instances, use `monitor-states`.

```js
[
{
id: 'fc738fb0-1de5-4947-b138-40d6a790749e',
name: 'my-queue',
options: { newJobCheckInterval: 2000 },
state: 'active',
count: 1,
createdOn: 1620149137015,
lastFetchedOn: 1620149137015,
lastJobStartedOn: 1620149137015,
lastJobEndedOn: null,
lastJobDuration: 343
lastError: null,
lastErrorOn: null
}
]
```

## `stopped`

Emitted after `stop()` once all subscription workers have completed their work and maintenance has been shut down.

# Static functions

Expand Down Expand Up @@ -332,7 +362,13 @@ This is a convenience version of `publish()` with the `startAfter` option assign

### `publishOnce(name, data, options, key)`

Publish a job with a unique key to make sure it isn't processed more than once. Any other jobs published during this archive interval with the same queue name and key will be rejected.
Publish a job with a unique key to only allow 1 job to be in created, retry, or active state at a time.

This is a convenience version of `publish()` with the `singletonKey` option assigned.

### `publishSingleton(name, data, options)`

Publish a job but only allow 1 job to be in created or retry state at at time.

This is a convenience version of `publish()` with the `singletonKey` option assigned.

Expand All @@ -352,7 +388,7 @@ This is a convenience version of `publish()` with the `singletonSeconds`, `singl

**returns: Promise**

Polls the database by a queue name or a pattern and executes the provided callback function when jobs are found. The promise resolves once a subscription has been created.
Polls the database by a queue name or a pattern and executes the provided callback function when jobs are found. The promise resolves once a subscription has been created with a unique id of the subscription. You can monitor the state of subscriptions using the `wip` event.

Queue patterns use the `*` character to match 0 or more characters. For example, a job from queue `status-report-12345` would be fetched with pattern `status-report-*` or even `stat*5`.

Expand Down Expand Up @@ -461,13 +497,20 @@ The following is an example data object from the job retrieved in the onComplete
}
```

## `unsubscribe(name)`
## `unsubscribe(value)`

Removes a subscription by name or id and stops polling.

** Arguments **
- value: string or object

Removes a subscription by name and stops polling.
If a string, removes all subscriptions found matching the name. If an object, only the subscription with a matching `id` will be removed.

### `offComplete(name)`
### `offComplete(value)`

Same as `unsubscribe()`, but removes an `onComplete()` subscription.
Similar to `unsubscribe()`, but removes an `onComplete()` subscription.

**

## `fetch()`

Expand Down Expand Up @@ -638,6 +681,10 @@ As an example, the following options object include active jobs along with creat
}
```

## `getJobById(id)`

Retrieves a job with all metadata by id in either the primary or archive storage.

## `deleteQueue(name)`

Deletes all pending jobs in the specified queue from the active job table. All jobs in the archive table are retained.
Expand Down
Loading

0 comments on commit d72c35b

Please sign in to comment.