
Commit

Merge remote-tracking branch 'upstream/master' into pg-notify
# Conflicts:
#	package-lock.json
darky committed Apr 16, 2023
2 parents 89860a4 + 1f54126 commit 8049e7b
Showing 18 changed files with 215 additions and 142 deletions.
1 change: 0 additions & 1 deletion .travis.yml
@@ -4,7 +4,6 @@ language: node_js
node_js:
- '18'
- '16'
- '14'
addons:
postgresql: '13'
apt:
6 changes: 3 additions & 3 deletions README.md
@@ -1,6 +1,6 @@
Queueing jobs in Node.js using PostgreSQL like a boss.

[![PostgreSql Version](https://img.shields.io/badge/PostgreSQL-9.5+-blue.svg?maxAge=2592000)](http://www.postgresql.org)
[![PostgreSql Version](https://img.shields.io/badge/PostgreSQL-11+-blue.svg?maxAge=2592000)](http://www.postgresql.org)
[![npm version](https://badge.fury.io/js/pg-boss.svg)](https://badge.fury.io/js/pg-boss)
[![Build Status](https://app.travis-ci.com/timgit/pg-boss.svg?branch=master)](https://app.travis-ci.com/github/timgit/pg-boss)
[![Coverage Status](https://coveralls.io/repos/github/timgit/pg-boss/badge.svg?branch=master)](https://coveralls.io/github/timgit/pg-boss?branch=master)
@@ -50,8 +50,8 @@ This will likely cater the most to teams already familiar with the simplicity of
* Automatic maintenance operations to manage table growth

## Requirements
* Node 14 or higher
* PostgreSQL 9.5 or higher
* Node 16 or higher
* PostgreSQL 11 or higher

## Installation

55 changes: 25 additions & 30 deletions docs/readme.md
@@ -47,17 +47,17 @@
- [`schedule(name, cron, data, options)`](#schedulename-cron-data-options)
- [`unschedule(name)`](#unschedulename)
- [`getSchedules()`](#getschedules)
- [`cancel(id)`](#cancelid)
- [`cancel([ids])`](#cancelids)
- [`resume(id)`](#resumeid)
- [`resume([ids])`](#resumeids)
- [`complete(id [, data])`](#completeid--data)
- [`complete([ids])`](#completeids)
- [`fail(id [, data])`](#failid--data)
- [`fail([ids])`](#failids)
- [`cancel(id, options)`](#cancelid-options)
- [`cancel([ids], options)`](#cancelids-options)
- [`resume(id, options)`](#resumeid-options)
- [`resume([ids], options)`](#resumeids-options)
- [`complete(id [, data, options])`](#completeid--data-options)
- [`complete([ids], options)`](#completeids-options)
- [`fail(id [, data, options])`](#failid--data-options)
- [`fail([ids], options)`](#failids-options)
- [`notifyWorker(id)`](#notifyworkerid)
- [`getQueueSize(name [, options])`](#getqueuesizename--options)
- [`getJobById(id)`](#getjobbyidid)
- [`getJobById(id, options)`](#getjobbyidid-options)
- [`deleteQueue(name)`](#deletequeuename)
- [`deleteAllQueues()`](#deleteallqueues)
- [`clearStorage()`](#clearstorage)
@@ -566,7 +566,7 @@ Available in constructor as a default, or overridden in send.

* **singletonKey** string

Only allows 1 job (within the same name) to be queued or active with the same singletonKey.
Allows a max of 1 job (with the same name and singletonKey) to be queued or active.

```js
boss.send('my-job', {}, {singletonKey: '123'}) // resolves a jobId
@@ -577,7 +577,9 @@

* **useSingletonQueue** boolean

When used in conjunction with singletonKey, only allows 1 job (within the same name) to be queued with the same singletonKey.
When used in conjunction with singletonKey, allows a max of 1 job to be queued.

>By default, there is no limit on the number of these jobs that may be active. However, this behavior may be modified by passing the [enforceSingletonQueueActiveLimit](#fetch) option.
```js
boss.send('my-job', {}, {singletonKey: '123', useSingletonQueue: true}) // resolves a jobId
@@ -751,6 +753,11 @@ Typically one would use `work()` for automated polling for new jobs based upon a
| oncomplete | bool |
| output | object |

* `enforceSingletonQueueActiveLimit`, bool

If `true`, modifies the behavior of the `useSingletonQueue` flag to allow a max of 1 job to be queued plus a max of 1 job to be active.
>Note that use of this option can impact performance on instances with large numbers of jobs.
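The queued-versus-active limits described by `useSingletonQueue` and `enforceSingletonQueueActiveLimit` can be illustrated with a toy in-memory model. This sketch is illustrative only — all names are hypothetical and it models none of pg-boss's persistence or locking:

```js
// Toy model of singleton-queue semantics: at most 1 job queued, and, when
// enforceActiveLimit is set, at most 1 job active as well.
class ToySingletonQueue {
  constructor ({ enforceActiveLimit = false } = {}) {
    this.enforceActiveLimit = enforceActiveLimit
    this.queued = 0
    this.active = 0
  }

  // returns true if the job was accepted into the queue
  send () {
    if (this.queued >= 1) return false // singleton queue: max 1 queued
    this.queued += 1
    return true
  }

  // returns true if a queued job was promoted to active
  fetch () {
    if (this.queued === 0) return false
    if (this.enforceActiveLimit && this.active >= 1) return false // max 1 active
    this.queued -= 1
    this.active += 1
    return true
  }

  complete () {
    if (this.active > 0) this.active -= 1
  }
}

const q = new ToySingletonQueue({ enforceActiveLimit: true })
q.send()  // accepted: queue slot was free
q.fetch() // promoted to active
q.send()  // accepted: queue slot is free again
q.fetch() // rejected: a job with this key is already active
```

Without `enforceActiveLimit`, the second `fetch()` above would succeed, matching the default of no limit on active jobs.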

**Resolves**
- `[job]`: array of job objects, `null` if none found
@@ -826,6 +833,10 @@ The default concurrency for `work()` is 1 job every 2 seconds. Both the interval

Same as in [`fetch()`](#fetch)

* **enforceSingletonQueueActiveLimit**, bool

Same as in [`fetch()`](#fetch)

**Polling options**

How often workers will poll the queue table for jobs. Available in the constructor as a default or per worker in `work()` and `onComplete()`.
@@ -845,11 +856,9 @@

**Handler function**

Typically `handler` will be an `async` function, since this automatically returns promises that can be awaited for backpressure support.

If handler returns a promise, the value resolved/returned will be stored in a completion job. Likewise, if an error occurs in the handler, it will be caught and useful error properties stored into a completion job in addition to marking the job as failed.
`handler` should either be an `async` function or return a promise. If an error occurs in the handler, it will be caught and stored into an output storage column in addition to marking the job as failed.

Finally, and importantly, promise-returning handlers will be awaited before polling for new jobs which provides **automatic backpressure**.
Enforcing promise-returning handlers that are awaited in the workers defers polling for new jobs until the existing jobs are completed, providing backpressure.

The job object has the following properties.

@@ -858,11 +867,8 @@
|`id`| string, uuid |
|`name`| string |
|`data`| object |
|`done(err, data)` | function | callback function used to mark the job as completed or failed. Returns a promise.

If `handler` does not return a promise, `done()` should be used to mark the job as completed or failed. `done()` accepts optional arguments, `err` and `data`, for usage with [`onComplete()`](#oncompletename--options-handler) state-based workers. If `err` is truthy, it will mark the job as failed.

> If the job is not completed, either by returning a promise from `handler` or manually via `job.done()`, it will expire after the configured expiration period.
> If the job is not completed, it will expire after the configured expiration period.
Following is an example of a worker that returns a promise (`sendWelcomeEmail()`) for completion with the teamSize option set for increased job concurrency between polling intervals.

@@ -871,17 +877,6 @@
const options = { teamSize: 5, teamConcurrency: 5 }
await boss.work('email-welcome', options, job => myEmailService.sendWelcomeEmail(job.data))
```

And the same example, but without returning a promise in the handler.

```js
const options = { teamSize: 5, teamConcurrency: 5 }
await boss.work('email-welcome', options, job => {
myEmailService.sendWelcomeEmail(job.data)
.then(() => job.done())
.catch(error => job.done(error))
})
```

Similar to the first example, but with a batch of jobs at once.

```js
6 changes: 3 additions & 3 deletions package-lock.json

Some generated files are not rendered by default.

4 changes: 2 additions & 2 deletions package.json
@@ -1,10 +1,10 @@
{
"name": "pg-boss",
"version": "8.4.2",
"version": "9.0.0",
"description": "Queueing jobs in Node.js using PostgreSQL like a boss",
"main": "./src/index.js",
"engines": {
"node": ">=14"
"node": ">=16"
},
"dependencies": {
"cron-parser": "^4.0.0",
2 changes: 2 additions & 0 deletions src/attorney.js
@@ -129,6 +129,7 @@ function checkWorkArgs (name, args, defaults) {
assert(!('teamSize' in options) || (Number.isInteger(options.teamSize) && options.teamSize >= 1), 'teamSize must be an integer > 0')
assert(!('batchSize' in options) || (Number.isInteger(options.batchSize) && options.batchSize >= 1), 'batchSize must be an integer > 0')
assert(!('includeMetadata' in options) || typeof options.includeMetadata === 'boolean', 'includeMetadata must be a boolean')
assert(!('enforceSingletonQueueActiveLimit' in options) || typeof options.enforceSingletonQueueActiveLimit === 'boolean', 'enforceSingletonQueueActiveLimit must be a boolean')

return { options, callback }
}
@@ -140,6 +141,7 @@ function checkFetchArgs (name, batchSize, options) {

assert(!batchSize || (Number.isInteger(batchSize) && batchSize >= 1), 'batchSize must be an integer > 0')
assert(!('includeMetadata' in options) || typeof options.includeMetadata === 'boolean', 'includeMetadata must be a boolean')
assert(!('enforceSingletonQueueActiveLimit' in options) || typeof options.enforceSingletonQueueActiveLimit === 'boolean', 'enforceSingletonQueueActiveLimit must be a boolean')

return { name }
}
4 changes: 2 additions & 2 deletions src/boss.js
@@ -140,7 +140,7 @@ class Boss extends EventEmitter {
this.emit('maintenance', { ms: ended - started })

if (!this.stopped) {
await job.done() // pre-complete to bypass throttling
await this.manager.complete(job.id) // pre-complete to bypass throttling
await this.maintenanceAsync({ startAfter: this.maintenanceIntervalSeconds })
}
} catch (err) {
@@ -159,7 +159,7 @@
this.emit(events.monitorStates, states)

if (!this.stopped && this.monitorStates) {
await job.done() // pre-complete to bypass throttling
await this.manager.complete(job.id) // pre-complete to bypass throttling
await this.monitorStatesAsync({ startAfter: this.monitorIntervalSeconds })
}
} catch (err) {
8 changes: 8 additions & 0 deletions src/db.js
@@ -36,6 +36,14 @@ class Db extends EventEmitter {
return await this.pool.query(text, values)
}
}

static quotePostgresStr (str) {
const delimeter = '$sanitize$'
if (str.includes(delimeter)) {
throw new Error(`Attempted to quote string that contains reserved Postgres delimeter: ${str}`)
}
return `${delimeter}${str}${delimeter}`
}
}

module.exports = Db
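The `quotePostgresStr` helper added above relies on PostgreSQL dollar-quoting: a `$tag$...$tag$` literal needs no escaping as long as the body never contains the tag itself. A standalone copy of the helper for illustration:

```js
// Standalone copy of the committed helper, for illustration.
// (The `delimeter` spelling matches the committed source.)
function quotePostgresStr (str) {
  const delimeter = '$sanitize$'
  if (str.includes(delimeter)) {
    throw new Error(`Attempted to quote string that contains reserved Postgres delimeter: ${str}`)
  }
  return `${delimeter}${str}${delimeter}`
}

quotePostgresStr('my-queue') // → '$sanitize$my-queue$sanitize$'
quotePostgresStr("it's; --") // quotes and comment markers need no escaping
try {
  quotePostgresStr('x $sanitize$ y') // throws: the tag itself cannot appear
} catch (err) {}
```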
45 changes: 26 additions & 19 deletions src/manager.js
@@ -6,6 +6,7 @@ const debounce = require('lodash.debounce')
const { serializeError: stringify } = require('serialize-error')
const Attorney = require('./attorney')
const Worker = require('./worker')
const Db = require('./db')
const pMap = require('p-map')

const { QUEUES: BOSS_QUEUES } = require('./boss')
@@ -184,7 +185,8 @@
teamSize = 1,
teamConcurrency = 1,
teamRefill: refill = false,
includeMetadata = false
includeMetadata = false,
enforceSingletonQueueActiveLimit = false
} = options

const id = uuid.v4()
@@ -208,7 +210,7 @@
createTeamRefillPromise()
}

const fetch = () => this.fetch(name, batchSize || (teamSize - queueSize), { includeMetadata })
const fetch = () => this.fetch(name, batchSize || (teamSize - queueSize), { includeMetadata, enforceSingletonQueueActiveLimit })

const onFetch = async (jobs) => {
if (this.config.__test__throw_worker) {
@@ -220,8 +222,8 @@
if (batchSize) {
const maxExpiration = jobs.reduce((acc, i) => Math.max(acc, i.expire_in_seconds), 0)

// Failing will fail all fetched jobs
await resolveWithinSeconds(Promise.all([callback(jobs)]), maxExpiration)
.then(() => this.complete(jobs.map(job => job.id)))
.catch(err => this.fail(jobs.map(job => job.id), err))
} else {
if (refill) {
@@ -487,27 +489,32 @@
async fetch (name, batchSize, options = {}) {
const values = Attorney.checkFetchArgs(name, batchSize, options)
const db = options.db || this.db
const result = await db.executeSql(
this.nextJobCommand(options.includeMetadata || false),
[values.name, batchSize || 1]
)
const preparedStatement = this.nextJobCommand(options.includeMetadata || false, options.enforceSingletonQueueActiveLimit || false)
const statementValues = [values.name, batchSize || 1]

let result
if (options.enforceSingletonQueueActiveLimit && !options.db) {
// Prepare/format now and send multi-statement transaction
const fetchQuery = preparedStatement
.replace('$1', Db.quotePostgresStr(statementValues[0]))
.replace('$2', statementValues[1].toString())
// eslint-disable-next-line no-unused-vars
const [_begin, _setLocal, fetchResult, _commit] = await db.executeSql([
'BEGIN',
'SET LOCAL jit = OFF', // JIT can slow things down significantly
fetchQuery,
'COMMIT'
].join(';\n'))
result = fetchResult
} else {
result = await db.executeSql(preparedStatement, statementValues)
}

if (!result || result.rows.length === 0) {
return null
}

const jobs = result.rows.map(job => {
job.done = async (error, response) => {
if (error) {
await this.fail(job.id, error)
} else {
await this.complete(job.id, response)
}
}
return job
})

return jobs.length === 1 && !batchSize ? jobs[0] : jobs
return result.rows.length === 1 && !batchSize ? result.rows[0] : result.rows
}

async fetchCompleted (name, batchSize, options = {}) {
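The transaction branch in `fetch()` above cannot use `$1`/`$2` bind parameters because it sends several statements in one round trip, so the values are inlined into the SQL text first, with the queue name dollar-quoted to keep the substitution safe. A simplified sketch of that substitution (illustrative only; `buildFetchTransaction` is a hypothetical name, not committed code):

```js
// Same shape as the helper added in src/db.js
function quotePostgresStr (str) {
  const delimeter = '$sanitize$'
  if (str.includes(delimeter)) throw new Error(`reserved delimeter in: ${str}`)
  return `${delimeter}${str}${delimeter}`
}

// Inline the two parameters, then wrap the fetch in a transaction.
// SET LOCAL only lasts until COMMIT, so JIT is disabled just for this fetch.
function buildFetchTransaction (preparedStatement, name, batchSize) {
  const fetchQuery = preparedStatement
    .replace('$1', quotePostgresStr(name))
    .replace('$2', batchSize.toString())
  return ['BEGIN', 'SET LOCAL jit = OFF', fetchQuery, 'COMMIT'].join(';\n')
}

const sql = buildFetchTransaction('SELECT id FROM job WHERE name LIKE $1 LIMIT $2', 'email', 5)
// → 'BEGIN;\nSET LOCAL jit = OFF;\nSELECT id FROM job WHERE name LIKE $sanitize$email$sanitize$ LIMIT 5;\nCOMMIT'
```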
22 changes: 20 additions & 2 deletions src/plans.js
@@ -351,13 +351,31 @@ function insertVersion (schema, version) {
}

function fetchNextJob (schema) {
return (includeMetadata) => `
return (includeMetadata, enforceSingletonQueueActiveLimit) => `
WITH nextJob as (
SELECT id
FROM ${schema}.job
FROM ${schema}.job j
WHERE state < '${states.active}'
AND name LIKE $1
AND startAfter < now()
${enforceSingletonQueueActiveLimit
? `AND (
CASE
WHEN singletonKey IS NOT NULL
AND singletonKey LIKE '${SINGLETON_QUEUE_KEY_ESCAPED}%'
THEN NOT EXISTS (
SELECT 1
FROM ${schema}.job active_job
WHERE active_job.state = '${states.active}'
AND active_job.name = j.name
AND active_job.singletonKey = j.singletonKey
LIMIT 1
)
ELSE
true
END
)`
: ''}
ORDER BY priority desc, createdOn, id
LIMIT $2
FOR UPDATE SKIP LOCKED
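The conditional SQL added to `fetchNextJob` above can be reduced to a miniature template for illustration. This sketch hardcodes the schema and state names and drops the `CASE`/singleton-key-prefix handling of the real query, keeping only the `NOT EXISTS` guard that skips jobs whose name and singleton key already have an active job:

```js
// Simplified sketch of the fetchNextJob template toggle (not the committed code)
function fetchNextJobSketch (enforceSingletonQueueActiveLimit) {
  return `
    SELECT id
    FROM job j
    WHERE state < 'active'
      AND name LIKE $1
      ${enforceSingletonQueueActiveLimit
        ? `AND NOT EXISTS (
            SELECT 1
            FROM job active_job
            WHERE active_job.state = 'active'
              AND active_job.name = j.name
              AND active_job.singletonKey = j.singletonKey
          )`
        : ''}
    ORDER BY priority DESC, createdOn, id
    LIMIT $2
    FOR UPDATE SKIP LOCKED`
}
```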
18 changes: 0 additions & 18 deletions test/completeTest.js
@@ -109,24 +109,6 @@ describe('complete', function () {
})
})

it('work()\'s job.done() should allow sending completion payload', async function () {
const boss = this.test.boss = await helper.start({ ...this.test.bossConfig, onComplete: true })

const queue = this.test.bossConfig.schema
const responsePayload = { arg1: '123' }

await boss.send(queue)

boss.work(queue, job => job.done(null, responsePayload))

return new Promise((resolve) => {
boss.onComplete(queue, async job => {
assert.strictEqual(job.data.response.arg1, responsePayload.arg1)
resolve()
})
})
})

it('should remove an onComplete worker', async function () {
const boss = this.test.boss = await helper.start({ ...this.test.bossConfig, onComplete: true })

