
Commit

test fixes and docs
timgit committed May 4, 2021
1 parent 64f60f9 commit 912a89d
Showing 9 changed files with 100 additions and 30 deletions.
34 changes: 17 additions & 17 deletions CHANGELOG.md
@@ -2,33 +2,33 @@

## 6.0.0 :tada:

- `stop({ graceful = true, timeout = 30000 })` will now attempt to gracefully stop any polling subscriptions (workers) by sending them a signal and then waiting for the workers to drain. The promise will still resolve once the signal is sent, so if you need to be notified when all work is completed, add a listener to the `stopped` event. Once this event is emitted, if pg-boss had created its own connection pool, it will be closed.
- Added a `wip` event for polling subscriptions that will emit as jobs are in-flight. If no work is being done, no events will be emitted. This will emit at most once every 2 seconds for monitoring purposes.
- Added the `output` jsonb column to storage tables to store result or error data along with the original job, which were previously only available via completion jobs. This has the added benefit of storing any errors or results from completion jobs themselves, which were previously discarded.
- `getJobById(id)` can now be used to fetch a job from either primary or archive storage by id. This may be helpful if you need to inspect `output` and have the job id.
- Added new option, `ignoreActive`, to `publishOnce()` to ignore active jobs.
- MAJOR: Added a new index to the primary job table to improve fetch time performance as the job table size increases. Depending on how many jobs you have in your job table, creating this index may delay `start()` bootstrapping promise resolution. If this is a concern, you can fetch the new schema version via `getMigrationPlans()` and create the indexes out of band. The migration includes `IF NOT EXISTS`, so creation is skipped if the index already exists.
- CHANGE: `stop()` has been enhanced with a **graceful stop** feature that will signal and monitor any polling subscriptions (workers using `subscribe()` or `onComplete()`) before closing the internal connection pool and stopping maintenance operations. The default options, `{ graceful = true, timeout = 30000 }`, will wait up to 30s before shutting down (see the sketch at the end of this section).
- NEW: Added a `stopped` event that will be emitted after `stop()` when all workers have completed active jobs, or when the timeout is met, whichever is sooner.
- NEW: Added a `wip` event that will emit as jobs are both fetched and completed. If no work is being done, no events will be emitted. This will emit at most once every 2 seconds for monitoring purposes.
- NEW: Added the `output` jsonb column to storage tables to store result or error data along with the original job, which were previously only available via completion jobs. This has the added benefit of storing any errors or results from completion jobs themselves, which were previously discarded.
- NEW: `getJobById(id)` can now be used to fetch a job from either primary or archive storage by id. This may be helpful if you need to inspect `output` and have the job id.
- NEW: Added a new function, `publishSingleton()`, similar to `publishOnce()`, but it throttles publishing to allow only 1 job in the queue at a time, so a new job can be queued even if 1 or more jobs are currently active.
- MAJOR: `onComplete` now defaults to `false`, which breaks backward compatibility for automatic creation of completion jobs. To restore the previous behavior of completion jobs being created by default, set `onComplete` to `true` in your constructor options.
- MAJOR: The default retention policy has been reduced from 30 to 14 days. This can still be customized as an option in the constructor.
- MAJOR: Node 10 is EOL. Node 12 is now the minimum supported version.
- MAJOR: Added a new index to the primary job table to improve fetch time performance as the job table size increases. Depending on how many jobs you have in your job table, creating this index may delay `start()` promise resolution. If this is a concern, you can fetch the new schema version via `getMigrationPlans()` and create the indexes out of band. The migration includes `IF NOT EXISTS`, so creation is skipped if the index already exists.

In the following example, once you have installed this package version, you can use the Node REPL to get the DDL for the new index from `getMigrationPlans()` (which uses the default schema `pgboss`; pass your schema name if customized). The index creation statements are printed within the standard migration transaction scope:

```shell
$ node
Welcome to Node.js v14.16.1.
Type ".help" for more information.
> console.log(require('./node_modules/pg-boss').getMigrationPlans())

BEGIN;
...
CREATE INDEX IF NOT EXISTS job_fetch ...
...
COMMIT;
```

- MAJOR: `onComplete` now defaults to `false`, which breaks backward compatibility for automatic creation of completion jobs. To restore the previous behavior of completion jobs being created by default, set `onComplete` to `true` in your constructor options.
- MAJOR: The default retention policy has been reduced from 30 to 14 days. This can still be customized as an option in the constructor.
- MAJOR: Node 10 is End-of-Life. Node 12 is now the minimum supported version.
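
As referenced above, here is a minimal sketch of how the graceful stop and the new `stopped` and `wip` events might be used together. The connection string, queue name, and the shape of the `wip` payload shown below are illustrative assumptions, not part of this commit:

```js
const PgBoss = require('pg-boss')

async function main () {
  const boss = new PgBoss('postgres://user:pass@localhost/db') // placeholder connection string

  await boss.start()

  await boss.subscribe('some-queue', async () => {
    // simulate work in flight while stop() drains workers
    await new Promise(resolve => setTimeout(resolve, 5000))
  })

  boss.on('wip', data => console.log('in-flight:', data)) // emitted at most once every 2 seconds while work is active
  boss.on('stopped', () => console.log('all workers drained')) // emitted once draining completes or the timeout is reached

  await boss.publish('some-queue')

  // resolves once the stop signal has been sent; listen for 'stopped' to know when draining has finished
  await boss.stop({ graceful: true, timeout: 30000 })
}

main().catch(console.error)
```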

## 5.2.3

- Dependency PR from dependabot
Binary file modified docs/images/job-states.png
9 changes: 8 additions & 1 deletion docs/usage.md
@@ -25,6 +25,7 @@
- [`publish(request)`](#publishrequest)
- [`publishAfter(name, data, options, seconds | ISO date string | Date)`](#publishaftername-data-options-seconds--iso-date-string--date)
- [`publishOnce(name, data, options, key)`](#publishoncename-data-options-key)
- [`publishSingleton(name, data, options)`](#publishsingletonname-data-options)
- [`publishThrottled(name, data, options, seconds [, key])`](#publishthrottledname-data-options-seconds--key)
- [`publishDebounced(name, data, options, seconds [, key])`](#publishdebouncedname-data-options-seconds--key)
- [`subscribe()`](#subscribe)
@@ -332,7 +333,13 @@ This is a convenience version of `publish()` with the `startAfter` option assigned.

### `publishOnce(name, data, options, key)`

Publish a job with a unique key to make sure it isn't processed more than once. Any other jobs published during this archive interval with the same queue name and key will be rejected.
Publish a job with a unique key so that only 1 job with that key can be in created, retry, or active state at a time.

This is a convenience version of `publish()` with the `singletonKey` option assigned.

### `publishSingleton(name, data, options)`

Publish a job, but only allow 1 job to be in created or retry state at a time.

This is a convenience version of `publish()` with the `singletonKey` option assigned.
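
For example, here is a brief sketch contrasting `publishOnce()` and `publishSingleton()`. The connection string, queue names, payloads, and keys are illustrative only:

```js
const PgBoss = require('pg-boss')

async function demo () {
  const boss = new PgBoss('postgres://user:pass@localhost/db') // placeholder connection string
  await boss.start()

  // publishOnce: the key dedupes jobs while one with the same key is in created, retry, or active state
  const first = await boss.publishOnce('welcome-email', { userId: 1 }, null, 'user-1')
  const second = await boss.publishOnce('welcome-email', { userId: 1 }, null, 'user-1')
  console.log(first, second) // second is null: a job with this key already exists

  // publishSingleton: at most 1 job queued at a time; once the queued job is fetched
  // and becomes active, another may be queued
  const third = await boss.publishSingleton('daily-report')
  const fourth = await boss.publishSingleton('daily-report')
  console.log(third, fourth) // fourth is null while third is still waiting in the queue

  await boss.stop()
}

demo().catch(console.error)
```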

2 changes: 1 addition & 1 deletion src/contractor.js
@@ -9,7 +9,7 @@ class Contractor {
return plans.create(schema, schemaVersion)
}

static migrationPlans (schema = DEFAULT_SCHEMA, version = schemaVersion) {
static migrationPlans (schema = DEFAULT_SCHEMA, version = schemaVersion - 1) {
return migrationStore.migrate(schema, version)
}

15 changes: 12 additions & 3 deletions src/manager.js
@@ -56,6 +56,7 @@ class Manager extends EventEmitter {
this.publishThrottled,
this.publishOnce,
this.publishAfter,
this.publishSingleton,
this.deleteQueue,
this.deleteAllQueues,
this.clearStorage,
@@ -222,9 +223,17 @@
async publishOnce (name, data, options, key) {
options = options || {}

const { ignoreActive = false } = options
options.singletonKey = key || name

options.singletonKey = ignoreActive ? SINGLETON_QUEUE_KEY : key || name
const result = Attorney.checkPublishArgs([name, data, options], this.config)

return this.createJob(result.name, result.data, result.options)
}

async publishSingleton (name, data, options) {
options = options || {}

options.singletonKey = SINGLETON_QUEUE_KEY

const result = Attorney.checkPublishArgs([name, data, options], this.config)

@@ -454,7 +463,7 @@
const result2 = await this.db.executeSql(fetchArchiveSql, [id])

if (result2 && result2.rows && result2.rows.length === 1) {
return result1.rows[0]
return result2.rows[0]
}

return null
4 changes: 4 additions & 0 deletions src/migrationStore.js
@@ -74,13 +74,17 @@ function getAll (schema, config) {
install: [
`CREATE INDEX IF NOT EXISTS job_fetch ON ${schema}.job (state, name text_pattern_ops, startAfter) WHERE state < 'active'`,
`CREATE UNIQUE INDEX IF NOT EXISTS job_singleton_queue ON ${schema}.job (name, singletonKey) WHERE state < 'active' AND singletonOn IS NULL AND singletonKey = '__pgboss__singleton_queue'`,
`DROP INDEX ${schema}.job_singletonKey`,
`CREATE UNIQUE INDEX job_singletonKey ON ${schema}.job (name, singletonKey) WHERE state < 'completed' AND singletonOn IS NULL AND NOT singletonKey = '__pgboss__singleton_queue'`,
`ALTER TABLE ${schema}.job ADD output jsonb`,
`ALTER TABLE ${schema}.archive ADD output jsonb`,
`ALTER TABLE ${schema}.job ALTER COLUMN on_complete SET DEFAULT false`
],
uninstall: [
`DROP INDEX ${schema}.job_fetch`,
`DROP INDEX ${schema}.job_singleton_queue`,
`DROP INDEX ${schema}.job_singletonKey`,
`CREATE UNIQUE INDEX job_singletonKey ON ${schema}.job (name, singletonKey) WHERE state < 'completed' AND singletonOn IS NULL`,
`ALTER TABLE ${schema}.job DROP COLUMN output`,
`ALTER TABLE ${schema}.archive DROP COLUMN output`,
`ALTER TABLE ${schema}.job ALTER COLUMN on_complete SET DEFAULT true`
16 changes: 8 additions & 8 deletions src/plans.js
@@ -229,7 +229,14 @@ function getQueueSize (schema, options = {}) {
function createIndexSingletonKey (schema) {
// anything with singletonKey means "only 1 job can be queued or active at a time"
return `
CREATE UNIQUE INDEX job_singletonKey ON ${schema}.job (name, singletonKey) WHERE state < '${states.completed}' AND singletonOn IS NULL
CREATE UNIQUE INDEX job_singletonKey ON ${schema}.job (name, singletonKey) WHERE state < '${states.completed}' AND singletonOn IS NULL AND NOT singletonKey = '${SINGLETON_QUEUE_KEY}'
`
}

function createIndexSingletonQueue (schema) {
// "singleton queue" means "only 1 job can be queued at a time"
return `
CREATE UNIQUE INDEX job_singleton_queue ON ${schema}.job (name, singletonKey) WHERE state < '${states.active}' AND singletonOn IS NULL AND singletonKey = '${SINGLETON_QUEUE_KEY}'
`
}

@@ -247,13 +254,6 @@ function createIndexSingletonKeyOn (schema) {
`
}

function createIndexSingletonQueue (schema) {
// "singleton queue" means "only 1 job can be queued at a time"
return `
CREATE UNIQUE INDEX job_singleton_queue ON ${schema}.job (name, singletonKey) WHERE state < '${states.active}' AND singletonOn IS NULL AND singletonKey = '${SINGLETON_QUEUE_KEY}'
`
}

function createIndexJobName (schema) {
return `
CREATE INDEX job_name ON ${schema}.job (name text_pattern_ops)
20 changes: 20 additions & 0 deletions test/archiveTest.js
@@ -29,6 +29,26 @@ describe('archive', function () {
assert.strictEqual(queue, archivedJob.name)
})

it('should retrieve an archived job via getJobById()', async function () {
const config = { ...this.test.bossConfig, ...defaults }
const boss = this.test.boss = await helper.start(config)
const queue = this.test.bossConfig.schema

const jobId = await boss.publish(queue)
const job = await boss.fetch(queue)

assert.strictEqual(job.id, jobId)

await boss.complete(jobId)

await delay(4000)

const archivedJob = await boss.getJobById(jobId)

assert.strictEqual(jobId, archivedJob.id)
assert.strictEqual(queue, archivedJob.name)
})

it('should archive a created job', async function () {
const config = { ...this.test.bossConfig, ...defaults }
const boss = this.test.boss = await helper.start(config)
30 changes: 30 additions & 0 deletions test/singletonTest.js
@@ -60,6 +60,14 @@ describe('singleton', function () {
const jobId2 = await boss.publishOnce(queue, null, null, key)

assert.strictEqual(jobId2, null)

const job = await boss.fetch(queue)

assert.strictEqual(job.id, jobId)

const jobId3 = await boss.publishOnce(queue, null, null, key)

assert.strictEqual(jobId3, null)
})

it('publishOnce() without a key should also work', async function () {
@@ -74,4 +82,26 @@

assert.strictEqual(jobId2, null)
})

it('publishSingleton() works', async function () {
const boss = this.test.boss = await helper.start(this.test.bossConfig)

const queue = this.test.bossConfig.schema

const jobId = await boss.publishSingleton(queue)

assert(jobId)

const jobId2 = await boss.publishSingleton(queue)

assert.strictEqual(jobId2, null)

const job = await boss.fetch(queue)

assert.strictEqual(job.id, jobId)

const jobId3 = await boss.publishSingleton(queue)

assert(jobId3)
})
})
