diff --git a/.travis.yml b/.travis.yml index 5a4f489b..7aec7b65 100644 --- a/.travis.yml +++ b/.travis.yml @@ -4,7 +4,6 @@ language: node_js node_js: - '18' - '16' - - '14' addons: postgresql: '13' apt: diff --git a/README.md b/README.md index 195975a3..420915d6 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ Queueing jobs in Node.js using PostgreSQL like a boss. -[![PostgreSql Version](https://img.shields.io/badge/PostgreSQL-9.5+-blue.svg?maxAge=2592000)](http://www.postgresql.org) +[![PostgreSql Version](https://img.shields.io/badge/PostgreSQL-11+-blue.svg?maxAge=2592000)](http://www.postgresql.org) [![npm version](https://badge.fury.io/js/pg-boss.svg)](https://badge.fury.io/js/pg-boss) [![Build Status](https://app.travis-ci.com/timgit/pg-boss.svg?branch=master)](https://app.travis-ci.com/github/timgit/pg-boss) [![Coverage Status](https://coveralls.io/repos/github/timgit/pg-boss/badge.svg?branch=master)](https://coveralls.io/github/timgit/pg-boss?branch=master) @@ -50,8 +50,8 @@ This will likely cater the most to teams already familiar with the simplicity of * Automatic maintenance operations to manage table growth ## Requirements -* Node 14 or higher -* PostgreSQL 9.5 or higher +* Node 16 or higher +* PostgreSQL 11 or higher ## Installation diff --git a/docs/readme.md b/docs/readme.md index 45b24e7d..df80a18c 100644 --- a/docs/readme.md +++ b/docs/readme.md @@ -47,17 +47,17 @@ - [`schedule(name, cron, data, options)`](#schedulename-cron-data-options) - [`unschedule(name)`](#unschedulename) - [`getSchedules()`](#getschedules) - - [`cancel(id)`](#cancelid) - - [`cancel([ids])`](#cancelids) - - [`resume(id)`](#resumeid) - - [`resume([ids])`](#resumeids) - - [`complete(id [, data])`](#completeid--data) - - [`complete([ids])`](#completeids) - - [`fail(id [, data])`](#failid--data) - - [`fail([ids])`](#failids) + - [`cancel(id, options)`](#cancelid-options) + - [`cancel([ids], options)`](#cancelids-options) + - [`resume(id, options)`](#resumeid-options) + - [`resume([ids], options)`](#resumeids-options) + - [`complete(id [, data, options])`](#completeid--data-options) + - [`complete([ids], options)`](#completeids-options) + - [`fail(id [, data, options])`](#failid--data-options) + - [`fail([ids], options)`](#failids-options) - [`notifyWorker(id)`](#notifyworkerid) - [`getQueueSize(name [, options])`](#getqueuesizename--options) - - [`getJobById(id)`](#getjobbyidid) + - [`getJobById(id, options)`](#getjobbyidid-options) - [`deleteQueue(name)`](#deletequeuename) - [`deleteAllQueues()`](#deleteallqueues) - [`clearStorage()`](#clearstorage) @@ -566,7 +566,7 @@ Available in constructor as a default, or overridden in send. * **singletonKey** string - Only allows 1 job (within the same name) to be queued or active with the same singletonKey. + Allows a max of 1 job (with the same name and singletonKey) to be queued or active. ```js boss.send('my-job', {}, {singletonKey: '123'}) // resolves a jobId @@ -577,7 +577,9 @@ Available in constructor as a default, or overridden in send. * **useSingletonQueue** boolean - When used in conjunction with singletonKey, only allows 1 job (within the same name) to be queued with the same singletonKey. + When used in conjunction with singletonKey, allows a max of 1 job to be queued. + + >By default, there is no limit on the number of these jobs that may be active. However, this behavior may be modified by passing the [enforceSingletonQueueActiveLimit](#fetch) option. ```js boss.send('my-job', {}, {singletonKey: '123', useSingletonQueue: true}) // resolves a jobId @@ -751,6 +753,11 @@ Typically one would use `work()` for automated polling for new jobs based upon a | oncomplete | bool | | output | object | + * `enforceSingletonQueueActiveLimit`, bool + + If `true`, modifies the behavior of the `useSingletonQueue` flag to allow a max of 1 job to be queued plus a max of 1 job to be active. + >Note that use of this option can impact performance on instances with large numbers of jobs. + **Resolves** - `[job]`: array of job objects, `null` if none found @@ -826,6 +833,10 @@ The default concurrency for `work()` is 1 job every 2 seconds. Both the interval Same as in [`fetch()`](#fetch) +* **enforceSingletonQueueActiveLimit**, bool + + Same as in [`fetch()`](#fetch) + **Polling options** How often workers will poll the queue table for jobs. Available in the constructor as a default or per worker in `work()` and `onComplete()`. @@ -845,11 +856,9 @@ How often workers will poll the queue table for jobs. Available in the construct **Handler function** -Typically `handler` will be an `async` function, since this automatically returns promises that can be awaited for backpressure support. - -If handler returns a promise, the value resolved/returned will be stored in a completion job. Likewise, if an error occurs in the handler, it will be caught and useful error properties stored into a completion job in addition to marking the job as failed. +`handler` should either be an `async` function or return a promise. If an error occurs in the handler, it will be caught and stored into an output storage column in addition to marking the job as failed. -Finally, and importantly, promise-returning handlers will be awaited before polling for new jobs which provides **automatic backpressure**. +Enforcing promise-returning handlers that are awaited in the workers defers polling for new jobs until the existing jobs are completed, providing backpressure. The job object has the following properties. @@ -858,11 +867,8 @@ The job object has the following properties. |`id`| string, uuid | |`name`| string | |`data`| object | -|`done(err, data)` | function | callback function used to mark the job as completed or failed. Returns a promise. -If `handler` does not return a promise, `done()` should be used to mark the job as completed or failed. `done()` accepts optional arguments, `err` and `data`, for usage with [`onComplete()`](#oncompletename--options-handler) state-based workers. If `err` is truthy, it will mark the job as failed. - -> If the job is not completed, either by returning a promise from `handler` or manually via `job.done()`, it will expire after the configured expiration period. +> If the job is not completed, it will expire after the configured expiration period. Following is an example of a worker that returns a promise (`sendWelcomeEmail()`) for completion with the teamSize option set for increased job concurrency between polling intervals. @@ -871,17 +877,6 @@ const options = { teamSize: 5, teamConcurrency: 5 } await boss.work('email-welcome', options, job => myEmailService.sendWelcomeEmail(job.data)) ``` -And the same example, but without returning a promise in the handler. - -```js -const options = { teamSize: 5, teamConcurrency: 5 } -await boss.work('email-welcome', options, job => { - myEmailService.sendWelcomeEmail(job.data) - .then(() => job.done()) - .catch(error => job.done(error)) - }) -``` - Similar to the first example, but with a batch of jobs at once. ```js diff --git a/package-lock.json b/package-lock.json index f4f77617..50cc8966 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "pg-boss", - "version": "8.4.2", + "version": "9.0.0", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "pg-boss", - "version": "8.4.2", + "version": "9.0.0", "license": "MIT", "dependencies": { "cron-parser": "^4.0.0", @@ -27,7 +27,7 @@ "standard": "^17.0.0" }, "engines": { - "node": ">=14" + "node": ">=16" } }, "node_modules/@ampproject/remapping": { diff --git a/package.json b/package.json index 4751504a..bbc55be2 100644 --- a/package.json +++ b/package.json @@ -1,10 +1,10 @@ { "name": "pg-boss", - "version": "8.4.2", + "version": "9.0.0", "description": "Queueing jobs in Node.js using PostgreSQL like a boss", "main": "./src/index.js", "engines": { - "node": ">=14" + "node": ">=16" }, "dependencies": { "cron-parser": "^4.0.0", diff --git a/src/attorney.js b/src/attorney.js index 907c4724..2a6fae4d 100644 --- a/src/attorney.js +++ b/src/attorney.js @@ -129,6 +129,7 @@ function checkWorkArgs (name, args, defaults) { assert(!('teamSize' in options) || (Number.isInteger(options.teamSize) && options.teamSize >= 1), 'teamSize must be an integer > 0') assert(!('batchSize' in options) || (Number.isInteger(options.batchSize) && options.batchSize >= 1), 'batchSize must be an integer > 0') assert(!('includeMetadata' in options) || typeof options.includeMetadata === 'boolean', 'includeMetadata must be a boolean') + assert(!('enforceSingletonQueueActiveLimit' in options) || typeof options.enforceSingletonQueueActiveLimit === 'boolean', 'enforceSingletonQueueActiveLimit must be a boolean') return { options, callback } } @@ -140,6 +141,7 @@ function checkFetchArgs (name, batchSize, options) { assert(!batchSize || (Number.isInteger(batchSize) && batchSize >= 1), 'batchSize must be an integer > 0') assert(!('includeMetadata' in options) || typeof options.includeMetadata === 'boolean', 'includeMetadata must be a boolean') + assert(!('enforceSingletonQueueActiveLimit' in options) || typeof options.enforceSingletonQueueActiveLimit === 'boolean', 'enforceSingletonQueueActiveLimit must be a boolean') return { name } } diff --git a/src/boss.js b/src/boss.js index d097130c..01a47c91 100644 --- a/src/boss.js +++ b/src/boss.js @@ -140,7 +140,7 @@ class Boss extends EventEmitter { this.emit('maintenance', { ms: ended - started }) if (!this.stopped) { - await job.done() // pre-complete to bypass throttling + await this.manager.complete(job.id) // pre-complete to bypass throttling await this.maintenanceAsync({ startAfter: this.maintenanceIntervalSeconds }) } } catch (err) { @@ -159,7 +159,7 @@ class Boss extends EventEmitter { this.emit(events.monitorStates, states) if (!this.stopped && this.monitorStates) { - await job.done() // pre-complete to bypass throttling + await this.manager.complete(job.id) // pre-complete to bypass throttling await this.monitorStatesAsync({ startAfter: this.monitorIntervalSeconds }) } } catch (err) { diff --git a/src/db.js b/src/db.js index f03dde84..4826137e 100644 --- a/src/db.js +++ b/src/db.js @@ -36,6 +36,14 @@ class Db extends EventEmitter { return await this.pool.query(text, values) } } + + static quotePostgresStr (str) { + const delimeter = '$sanitize$' + if (str.includes(delimeter)) { + throw new Error(`Attempted to quote string that contains reserved Postgres delimeter: ${str}`) + } + return `${delimeter}${str}${delimeter}` + } } module.exports = Db diff --git a/src/manager.js b/src/manager.js index 302d1a7d..c6d7576a 100644 --- a/src/manager.js +++ b/src/manager.js @@ -6,6 +6,7 @@ const debounce = require('lodash.debounce') const { serializeError: stringify } = require('serialize-error') const Attorney = require('./attorney') const Worker = require('./worker') +const Db = require('./db') const pMap = require('p-map') const { QUEUES: BOSS_QUEUES } = require('./boss') @@ -184,7 +185,8 @@ class Manager extends EventEmitter { teamSize = 1, teamConcurrency = 1, teamRefill: refill = false, - includeMetadata = false + includeMetadata = false, + enforceSingletonQueueActiveLimit = false } = options const id = uuid.v4() @@ -208,7 +210,7 @@ class Manager extends EventEmitter { createTeamRefillPromise() } - const fetch = () => this.fetch(name, batchSize || (teamSize - queueSize), { includeMetadata }) + const fetch = () => this.fetch(name, batchSize || (teamSize - queueSize), { includeMetadata, enforceSingletonQueueActiveLimit }) const onFetch = async (jobs) => { if (this.config.__test__throw_worker) { @@ -220,8 +222,8 @@ class Manager extends EventEmitter { if (batchSize) { const maxExpiration = jobs.reduce((acc, i) => Math.max(acc, i.expire_in_seconds), 0) - // Failing will fail all fetched jobs await resolveWithinSeconds(Promise.all([callback(jobs)]), maxExpiration) + .then(() => this.complete(jobs.map(job => job.id))) .catch(err => this.fail(jobs.map(job => job.id), err)) } else { if (refill) { @@ -487,27 +489,32 @@ class Manager extends EventEmitter { async fetch (name, batchSize, options = {}) { const values = Attorney.checkFetchArgs(name, batchSize, options) const db = options.db || this.db - const result = await db.executeSql( - this.nextJobCommand(options.includeMetadata || false), - [values.name, batchSize || 1] - ) + const preparedStatement = this.nextJobCommand(options.includeMetadata || false, options.enforceSingletonQueueActiveLimit || false) + const statementValues = [values.name, batchSize || 1] + + let result + if (options.enforceSingletonQueueActiveLimit && !options.db) { + // Prepare/format now and send multi-statement transaction + const fetchQuery = preparedStatement + .replace('$1', Db.quotePostgresStr(statementValues[0])) + .replace('$2', statementValues[1].toString()) + // eslint-disable-next-line no-unused-vars + const [_begin, _setLocal, fetchResult, _commit] = await db.executeSql([ + 'BEGIN', + 'SET LOCAL jit = OFF', // JIT can slow things down significantly + fetchQuery, + 'COMMIT' + ].join(';\n')) + result = fetchResult + } else { + result = await db.executeSql(preparedStatement, statementValues) + } if (!result || result.rows.length === 0) { return null } - const jobs = result.rows.map(job => { - job.done = async (error, response) => { - if (error) { - await this.fail(job.id, error) - } else { - await this.complete(job.id, response) - } - } - return job - }) - - return jobs.length === 1 && !batchSize ? jobs[0] : jobs + return result.rows.length === 1 && !batchSize ? result.rows[0] : result.rows } async fetchCompleted (name, batchSize, options = {}) { diff --git a/src/plans.js b/src/plans.js index bb7b2760..73dc0014 100644 --- a/src/plans.js +++ b/src/plans.js @@ -351,13 +351,31 @@ function insertVersion (schema, version) { } function fetchNextJob (schema) { - return (includeMetadata) => ` + return (includeMetadata, enforceSingletonQueueActiveLimit) => ` WITH nextJob as ( SELECT id - FROM ${schema}.job + FROM ${schema}.job j WHERE state < '${states.active}' AND name LIKE $1 AND startAfter < now() + ${enforceSingletonQueueActiveLimit + ? `AND ( + CASE + WHEN singletonKey IS NOT NULL + AND singletonKey LIKE '${SINGLETON_QUEUE_KEY_ESCAPED}%' + THEN NOT EXISTS ( + SELECT 1 + FROM ${schema}.job active_job + WHERE active_job.state = '${states.active}' + AND active_job.name = j.name + AND active_job.singletonKey = j.singletonKey + LIMIT 1 + ) + ELSE + true + END + )` + : ''} ORDER BY priority desc, createdOn, id LIMIT $2 FOR UPDATE SKIP LOCKED diff --git a/test/completeTest.js b/test/completeTest.js index 8be87524..1a4d953b 100644 --- a/test/completeTest.js +++ b/test/completeTest.js @@ -109,24 +109,6 @@ describe('complete', function () { }) }) - it('work()\'s job.done() should allow sending completion payload', async function () { - const boss = this.test.boss = await helper.start({ ...this.test.bossConfig, onComplete: true }) - - const queue = this.test.bossConfig.schema - const responsePayload = { arg1: '123' } - - await boss.send(queue) - - boss.work(queue, job => job.done(null, responsePayload)) - - return new Promise((resolve) => { - boss.onComplete(queue, async job => { - assert.strictEqual(job.data.response.arg1, responsePayload.arg1) - resolve() - }) - }) - }) - it('should remove an onComplete worker', async function () { const boss = this.test.boss = await helper.start({ ...this.test.bossConfig, onComplete: true }) diff --git a/test/databaseTest.js b/test/databaseTest.js index 3f4115ac..5c8bbcf4 100644 --- a/test/databaseTest.js +++ b/test/databaseTest.js @@ -1,5 +1,6 @@ const assert = require('assert') const PgBoss = require('../') +const Db = require('../src/db') describe('database', function () { it('should fail on invalid database host', async function () { @@ -25,4 +26,21 @@ describe('database', function () { assert(response.text === query) }) + + describe('Db.quotePostgresStr', function () { + it('should dollar-sign quote specified input', async function () { + const str = Db.quotePostgresStr('Here\'s my input') + assert(str === '$sanitize$Here\'s my input$sanitize$') + }) + + it('should error if input contains reserved quote delimiter', async function () { + const badInput = '$sanitize$; DROP TABLE job --' + try { + Db.quotePostgresStr(badInput) + assert(false, 'Error was expected but did not occur') + } catch (err) { + assert(err.message === `Attempted to quote string that contains reserved Postgres delimeter: ${badInput}`) + } + }) + }) }) diff --git a/test/errorTest.js b/test/errorTest.js index f8eeb58c..58564841 100644 --- a/test/errorTest.js +++ b/test/errorTest.js @@ -10,14 +10,13 @@ describe('error', function () { await boss.send(queue) await boss.send(queue) - return new Promise((resolve) => { + await new Promise((resolve) => { boss.work(queue, async job => { processCount++ if (processCount === 1) { throw new Error('test - nothing to see here') } else { - job.done() resolve() } }) diff --git a/test/failureTest.js b/test/failureTest.js index b170a84e..6b66dd4f 100644 --- a/test/failureTest.js +++ b/test/failureTest.js @@ -113,29 +113,6 @@ describe('failure', function () { assert.strictEqual(job.data.response.some.deeply.nested.reason, failPayload.some.deeply.nested.reason) }) - it('failure via done() should pass error payload to failed job', async function () { - const boss = this.test.boss = await helper.start(this.test.bossConfig) - const queue = this.test.bossConfig.schema - const errorMessage = 'mah error' - - await boss.send(queue, null, { onComplete: true }) - - return new Promise((resolve) => { - boss.work(queue, async job => { - const error = new Error(errorMessage) - - await job.done(error) - - const failedJob = await boss.fetchCompleted(queue) - - assert.strictEqual(failedJob.data.state, 'failed') - assert.strictEqual(failedJob.data.response.message, errorMessage) - - resolve() - }) - }) - }) - it('failure via Promise reject() should pass string wrapped in value prop', async function () { const boss = this.test.boss = await helper.start(this.test.bossConfig) const queue = this.test.bossConfig.schema @@ -208,4 +185,27 @@ describe('failure', function () { assert.strictEqual(called, true) }) + + it('failure with circular payload should be safely serialized', async function () { + const boss = this.test.boss = await helper.start(this.test.bossConfig) + const queue = this.test.bossConfig.schema + + await boss.send(queue, null, { onComplete: true }) + + await boss.work(queue, async job => { + const err = { + message: 'something' + } + + err.myself = err + + throw err + }) + + await delay(2000) + + const job = await boss.fetchCompleted(queue) + + assert(job) + }) }) diff --git a/test/fetchTest.js b/test/fetchTest.js index d2a9a858..092230c4 100644 --- a/test/fetchTest.js +++ b/test/fetchTest.js @@ -121,4 +121,49 @@ describe('fetch', function () { assert(job.startedon === undefined) assert.strictEqual(calledCounter, 2) }) + + describe('enforceSingletonQueueActiveLimit option', function () { + it('when enforceSingletonQueueActiveLimit=false, should fetch singleton queue job even if there is already an active one', async function () { + const boss = this.test.boss = await helper.start(this.test.bossConfig) + const queue = this.test.bossConfig.schema + const jobOptions = { singletonKey: 'singleton_queue_active_test', useSingletonQueue: true } + const sendArgs = [queue, {}, jobOptions] + const fetchArgs = [queue, undefined, { enforceSingletonQueueActiveLimit: false }] + + const publish1 = await boss.send(...sendArgs) + assert(publish1) + const fetch1 = await boss.fetch(...fetchArgs) + assert(fetch1) + + const publish2 = await boss.send(...sendArgs) + assert(publish2) + const fetch2 = await boss.fetch(...fetchArgs) + assert(fetch2) + }) + + it('when enforceSingletonQueueActiveLimit=true, should not fetch singleton queue job if there is already an active one', async function () { + const boss = this.test.boss = await helper.start(this.test.bossConfig) + const queue = this.test.bossConfig.schema + const jobOptions = { singletonKey: 'singleton_queue_active_test', useSingletonQueue: true } + const sendArgs = [queue, {}, jobOptions] + const fetchArgs = [queue, undefined, { enforceSingletonQueueActiveLimit: true }] + + const publish1 = await boss.send(...sendArgs) + assert(publish1) + const fetch1 = await boss.fetch(...fetchArgs) + assert(fetch1) + + const publish2 = await boss.send(...sendArgs) + assert(publish2) + // Job 1 still active, can't fetch job 2 + const fetch2 = await boss.fetch(...fetchArgs) + assert(fetch2 === null) + + await boss.complete(fetch1.id) + // Job 1 no longer active, should be able to fetch job 2 + const retryFetch2 = await boss.fetch(...fetchArgs) + assert(retryFetch2) + assert(retryFetch2.id === publish2) + }) + }) }) diff --git a/test/retryTest.js b/test/retryTest.js index d08f4fad..13f5dbb4 100644 --- a/test/retryTest.js +++ b/test/retryTest.js @@ -83,7 +83,11 @@ describe('retries', function () { let processCount = 0 const retryLimit = 4 - await boss.work(queue, { newJobCheckInterval: 500 }, job => job.done(++processCount)) + await boss.work(queue, { newJobCheckInterval: 500 }, async () => { + ++processCount + throw new Error('retry') + }) + await boss.send(queue, null, { retryLimit, retryBackoff: true }) await delay(9000) diff --git a/test/workTest.js b/test/workTest.js index 1cc3fa1d..75afc133 100644 --- a/test/workTest.js +++ b/test/workTest.js @@ -213,14 +213,34 @@ describe('work', function () { }) }) + it('batchSize should auto-complete the jobs', async function () { + const boss = this.test.boss = await helper.start(this.test.bossConfig) + const queue = this.test.bossConfig.schema + + await boss.send(queue, null, { onComplete: true }) + + await new Promise((resolve) => { + boss.work(queue, { batchSize: 1 }, async jobs => { + assert.strictEqual(jobs.length, 1) + resolve() + }) + }) + + await delay(2000) + + const result = await boss.fetchCompleted(queue) + + assert(result) + }) + it('returning promise applies backpressure', async function () { const boss = this.test.boss = await helper.start(this.test.bossConfig) const queue = 'backpressure' - const batchSize = 4 + const jobCount = 4 let processCount = 0 - for (let i = 0; i < batchSize; i++) { + for (let i = 0; i < jobCount; i++) { await boss.send(queue) } @@ -232,7 +252,7 @@ describe('work', function () { await delay(7000) - assert(processCount < batchSize) + assert(processCount < jobCount) }) it('top up jobs when at least one job in team is still running', async function () { @@ -300,20 +320,6 @@ describe('work', function () { assert(remainCount === 2) }) - it('should have a done callback for single job', async function () { - const boss = this.test.boss = await helper.start(this.test.bossConfig) - const queue = 'process-single' - - await boss.send(queue) - - return new Promise((resolve) => { - boss.work(queue, async job => { - job.done() - resolve() - }) - }) - }) - it('completion should pass string wrapped in value prop', async function () { const boss = this.test.boss = await helper.start({ ...this.test.bossConfig, onComplete: true }) diff --git a/types.d.ts b/types.d.ts index 57f83a7e..183f8661 100644 --- a/types.d.ts +++ b/types.d.ts @@ -114,20 +114,22 @@ declare namespace PgBoss { teamRefill?: boolean; batchSize?: number; includeMetadata?: boolean; + enforceSingletonQueueActiveLimit?: boolean; } type WorkOptions = JobFetchOptions & JobPollingOptions type FetchOptions = { includeMetadata?: boolean; + enforceSingletonQueueActiveLimit?: boolean; } & ConnectionOptions; - interface WorkHandler { - (job: PgBoss.JobWithDoneCallback): Promise | void; + interface WorkHandler { + (job: PgBoss.Job): void; } - interface WorkWithMetadataHandler { - (job: PgBoss.JobWithMetadataDoneCallback): Promise | void; + interface WorkWithMetadataHandler { + (job: PgBoss.JobWithMetadata): void; } interface Request { @@ -143,10 +145,6 @@ declare namespace PgBoss { options?: ScheduleOptions; } - interface JobDoneCallback { - (err?: Error | null, data?: T): void; - } - // source (for now): https://github.com/bendrucker/postgres-interval/blob/master/index.d.ts interface PostgresInterval { years?: number; @@ -205,14 +203,6 @@ declare namespace PgBoss { onComplete?: boolean } - interface JobWithDoneCallback extends Job { - done: JobDoneCallback; - } - - interface JobWithMetadataDoneCallback extends JobWithMetadata { - done: JobDoneCallback; - } - interface MonitorState { all: number; created: number; @@ -309,9 +299,9 @@ declare class PgBoss extends EventEmitter { insert(jobs: PgBoss.JobInsert[]): Promise; insert(jobs: PgBoss.JobInsert[], options: PgBoss.InsertOptions): Promise; - work(name: string, handler: PgBoss.WorkHandler): Promise; - work(name: string, options: PgBoss.WorkOptions & { includeMetadata: true }, handler: PgBoss.WorkWithMetadataHandler): Promise; - work(name: string, options: PgBoss.WorkOptions, handler: PgBoss.WorkHandler): Promise; + work(name: string, handler: PgBoss.WorkHandler): Promise; + work(name: string, options: PgBoss.WorkOptions & { includeMetadata: true }, handler: PgBoss.WorkWithMetadataHandler): Promise; + work(name: string, options: PgBoss.WorkOptions, handler: PgBoss.WorkHandler): Promise; onComplete(name: string, handler: Function): Promise; onComplete(name: string, options: PgBoss.WorkOptions, handler: Function): Promise;