Skip to content

Commit

Permalink
multi-master maintenance job publish fix
Browse files Browse the repository at this point in the history
  • Loading branch information
timgit committed Mar 29, 2020
1 parent efee899 commit 709bb0d
Show file tree
Hide file tree
Showing 9 changed files with 222 additions and 25 deletions.
7 changes: 6 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
# Changes

## 4.0.2
## 4.1.0

- Retention policies added for internal maintenance queues to reduce the number of records in the job table.
- Fixed issue in some multi-master use cases where too many maintenance jobs were being created.
- Changed `deleteQueue(name)` and `deleteAllQueues()` behavior to only impact pending queue items and not delete completed or active jobs.
- Added `getQueueSize(name` to retrieve the current size of a queue.
- Added `clearStorage()` as a utility function if and when needed to empty all job storage, archive included.
- Restored older schema migrations to allow upgrading directly to version 4 from version 1.1 and higher.

## 4.0.1

Expand Down
28 changes: 26 additions & 2 deletions docs/usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
- [`fail([ids])`](#failids)
- [`deleteQueue(name)`](#deletequeuename)
- [`deleteAllQueues()`](#deleteallqueues)
- [`getQueueSize(name [, options])`](#getqueuesizename--options)
- [`clearStorage()`](#clearstorage)
- [Events](#events)
- [`error`](#error)
- [`archived`](#archived)
Expand Down Expand Up @@ -444,11 +446,33 @@ The promise will resolve on a successful failure state assignment, or reject if
## `deleteQueue(name)`

Deletes all jobs in the specified queue from the active job table. All jobs in the archive table are retained.
Deletes all pending jobs in the specified queue from the active job table. All jobs in the archive table are retained.

## `deleteAllQueues()`

Deletes all jobs from all queues in the active job table. All jobs in the archive table are retained. I guess you know what you're getting into with a function named like this.
Deletes all pending jobs from all queues in the active job table. All jobs in the archive table are retained.

## `getQueueSize(name [, options])`

Returns the number of pending jobs in a queue by name.

`options`: Optional, object.

| Prop | Type | Description | Default |
| - | - | - | - |
|`before`| string | count jobs in states before this state | states.active |

As an example, the following options object include active jobs along with created and retry.

```js
{
before: states.completed
}
```

## `clearStorage()`

Utility function if and when needed to empty all job storage. Internally, this issues a `TRUNCATE` command against all jobs tables, archive included.

# Events

Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "pg-boss",
"version": "4.0.2",
"version": "4.1.0",
"description": "Queueing jobs in Node.js using PostgreSQL like a boss",
"main": "./src/index.js",
"engines": {
Expand Down
29 changes: 17 additions & 12 deletions src/boss.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ class Boss extends EventEmitter {

this.db = db
this.config = config
this.manager = config.manager

this.maintenanceIntervalSeconds = config.maintenanceIntervalSeconds

Expand All @@ -47,39 +48,43 @@ class Boss extends EventEmitter {
}

async supervise () {
await this.config.manager.deleteQueue(plans.completedJobPrefix + queues.MAINTENANCE)
await this.manager.deleteQueue(plans.completedJobPrefix + queues.MAINTENANCE)
await this.manager.deleteQueue(queues.MAINTENANCE)

await this.maintenanceAsync()

await this.config.manager.subscribe(queues.MAINTENANCE, { batchSize: 10 }, (jobs) => this.onMaintenance(jobs))
await this.manager.subscribe(queues.MAINTENANCE, { batchSize: 10 }, (jobs) => this.onMaintenance(jobs))

if (this.monitorStates) {
await this.config.manager.deleteQueue(plans.completedJobPrefix + queues.MONITOR_STATES)
await this.manager.deleteQueue(plans.completedJobPrefix + queues.MONITOR_STATES)
await this.manager.deleteQueue(queues.MONITOR_STATES)

await this.monitorStatesAsync()

await this.config.manager.subscribe(queues.MONITOR_STATES, { batchSize: 10 }, (jobs) => this.onMonitorStates(jobs))
await this.manager.subscribe(queues.MONITOR_STATES, { batchSize: 10 }, (jobs) => this.onMonitorStates(jobs))
}
}

async maintenanceAsync () {
const options = {
startAfter: this.maintenanceIntervalSeconds,
keepUntilSeconds: this.maintenanceIntervalSeconds * 2
keepUntilSeconds: this.maintenanceIntervalSeconds * 2,
singletonKey: queues.MAINTENANCE,
retryLimit: 5,
retryBackoff: true
}

await this.config.manager.deleteQueue(queues.MAINTENANCE)
await this.config.manager.publish(queues.MAINTENANCE, null, options)
await this.manager.publish(queues.MAINTENANCE, null, options)
}

async monitorStatesAsync () {
const options = {
startAfter: this.monitorIntervalSeconds,
keepUntilSeconds: this.monitorIntervalSeconds * 2
keepUntilSeconds: this.monitorIntervalSeconds * 2,
singletonKey: queues.MONITOR_STATES
}

await this.config.manager.deleteQueue(queues.MONITOR_STATES)
await this.config.manager.publish(queues.MONITOR_STATES, null, options)
await this.manager.publish(queues.MONITOR_STATES, null, options)
}

async onMaintenance (jobs) {
Expand All @@ -94,7 +99,7 @@ class Boss extends EventEmitter {
this.emitValue(events.archived, await this.archive())
this.emitValue(events.deleted, await this.purge())

await this.config.manager.complete(jobs.map(j => j.id))
await this.manager.complete(jobs.map(j => j.id))

const ended = Date.now()

Expand Down Expand Up @@ -124,7 +129,7 @@ class Boss extends EventEmitter {

this.emit(events.monitorStates, states)

await this.config.manager.complete(jobs.map(j => j.id))
await this.manager.complete(jobs.map(j => j.id))
} catch (err) {
this.emit(events.error, err)
}
Expand Down
19 changes: 18 additions & 1 deletion src/manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class Manager extends EventEmitter {
this.failJobsCommand = plans.failJobs(config.schema)
this.deleteQueueCommand = plans.deleteQueue(config.schema)
this.deleteAllQueuesCommand = plans.deleteAllQueues(config.schema)
this.clearStorageCommand = plans.clearStorage(config.schema)

// exported api to index
this.functions = [
Expand All @@ -48,7 +49,9 @@ class Manager extends EventEmitter {
this.publishOnce,
this.publishAfter,
this.deleteQueue,
this.deleteAllQueues
this.deleteAllQueues,
this.clearStorage,
this.getQueueSize
]
}

Expand Down Expand Up @@ -298,6 +301,20 @@ class Manager extends EventEmitter {
async deleteAllQueues () {
return this.db.executeSql(this.deleteAllQueuesCommand)
}

async clearStorage () {
return this.db.executeSql(this.clearStorageCommand)
}

async getQueueSize (queue, options) {
assert(queue, 'Missing queue name argument')

const sql = plans.getQueueSize(this.config.schema, options)

const { rows } = await this.db.executeSql(sql, [queue])

return parseFloat(rows[0].count)
}
}

module.exports = Manager
140 changes: 140 additions & 0 deletions src/migrationStore.js
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ function getAll (schema, config) {

return [
{
release: '4.0.0',
version: 12,
previous: 11,
install: [
Expand All @@ -95,6 +96,7 @@ function getAll (schema, config) {
]
},
{
release: '3.2.0',
version: 11,
previous: 10,
install: [
Expand All @@ -105,6 +107,7 @@ function getAll (schema, config) {
]
},
{
release: '3.1.3',
version: 10,
previous: 9,
install: [
Expand All @@ -115,6 +118,7 @@ function getAll (schema, config) {
]
},
{
release: '3.1.0',
version: 9,
previous: 8,
install: [
Expand All @@ -129,6 +133,142 @@ function getAll (schema, config) {
`DROP INDEX ${schema}.job_name`,
`CREATE INDEX job_name ON ${schema}.job (name) WHERE state < 'active'`
]
},
{
release: '3.0.0',
version: 8,
previous: 7,
install: [
'CREATE EXTENSION IF NOT EXISTS pgcrypto',
`ALTER TABLE ${schema}.job ALTER COLUMN id SET DEFAULT gen_random_uuid()`,
`ALTER TABLE ${schema}.job ADD retryDelay integer not null DEFAULT (0)`,
`ALTER TABLE ${schema}.job ADD retryBackoff boolean not null DEFAULT false`,
`ALTER TABLE ${schema}.job ADD startAfter timestamp with time zone not null default now()`,
`UPDATE ${schema}.job SET startAfter = createdOn + startIn`,
`ALTER TABLE ${schema}.job DROP COLUMN startIn`,
`UPDATE ${schema}.job SET expireIn = interval '15 minutes' WHERE expireIn IS NULL`,
`ALTER TABLE ${schema}.job ALTER COLUMN expireIn SET NOT NULL`,
`ALTER TABLE ${schema}.job ALTER COLUMN expireIn SET DEFAULT interval '15 minutes'`,
// archive table schema changes
`ALTER TABLE ${schema}.archive ADD retryDelay integer not null DEFAULT (0)`,
`ALTER TABLE ${schema}.archive ADD retryBackoff boolean not null DEFAULT false`,
`ALTER TABLE ${schema}.archive ADD startAfter timestamp with time zone`,
`UPDATE ${schema}.archive SET startAfter = createdOn + startIn`,
`ALTER TABLE ${schema}.archive DROP COLUMN startIn`,
// rename complete to completed for state enum - can't use ALTER TYPE :(
`DROP INDEX ${schema}.job_fetch`,
`DROP INDEX ${schema}.job_singletonOn`,
`DROP INDEX ${schema}.job_singletonKeyOn`,
`DROP INDEX ${schema}.job_singletonKey`,
`ALTER TABLE ${schema}.job ALTER COLUMN state DROP DEFAULT`,
`ALTER TABLE ${schema}.job ALTER COLUMN state SET DATA TYPE text USING state::text`,
`ALTER TABLE ${schema}.archive ALTER COLUMN state SET DATA TYPE text USING state::text`,
`DROP TYPE ${schema}.job_state`,
`CREATE TYPE ${schema}.job_state AS ENUM ('created', 'retry', 'active', 'completed', 'expired', 'cancelled', 'failed')`,
`UPDATE ${schema}.job SET state = 'completed' WHERE state = 'complete'`,
`UPDATE ${schema}.archive SET state = 'completed' WHERE state = 'complete'`,
`ALTER TABLE ${schema}.job ALTER COLUMN state SET DATA TYPE ${schema}.job_state USING state::${schema}.job_state`,
`ALTER TABLE ${schema}.job ALTER COLUMN state SET DEFAULT 'created'`,
`ALTER TABLE ${schema}.archive ALTER COLUMN state SET DATA TYPE ${schema}.job_state USING state::${schema}.job_state`,
`CREATE INDEX job_fetch ON ${schema}.job (name, priority desc, createdOn, id) WHERE state < 'active'`,
`CREATE UNIQUE INDEX job_singletonOn ON ${schema}.job (name, singletonOn) WHERE state < 'expired' AND singletonKey IS NULL`,
`CREATE UNIQUE INDEX job_singletonKeyOn ON ${schema}.job (name, singletonOn, singletonKey) WHERE state < 'expired'`,
`CREATE UNIQUE INDEX job_singletonKey ON ${schema}.job (name, singletonKey) WHERE state < 'completed' AND singletonOn IS NULL`,
// add new job name index
`CREATE INDEX job_name ON ${schema}.job (name) WHERE state < 'active'`
],
uninstall: [
`ALTER TABLE ${schema}.job ALTER COLUMN id DROP DEFAULT`,
// won't know if we should drop pgcrypto extension so it stays
`ALTER TABLE ${schema}.job DROP COLUMN retryDelay`,
`ALTER TABLE ${schema}.job DROP COLUMN retryBackoff`,
`ALTER TABLE ${schema}.job DROP COLUMN startAfter`,
`ALTER TABLE ${schema}.job ADD COLUMN startIn interval not null default(interval '0')`,
// leaving migrated default data for expireIn
`ALTER TABLE ${schema}.job ALTER COLUMN expireIn DROP NOT NULL`,
`ALTER TABLE ${schema}.job ALTER COLUMN expireIn DROP DEFAULT`,
// archive table restore
`ALTER TABLE ${schema}.archive DROP COLUMN retryDelay`,
`ALTER TABLE ${schema}.archive DROP COLUMN retryBackoff`,
`ALTER TABLE ${schema}.archive DROP COLUMN startAfter`,
`ALTER TABLE ${schema}.archive ADD COLUMN startIn interval`,
// drop new job name index
`DROP INDEX ${schema}.job_name`,
// roll back to old enum def
`DROP INDEX ${schema}.job_fetch`,
`DROP INDEX ${schema}.job_singletonOn`,
`DROP INDEX ${schema}.job_singletonKeyOn`,
`DROP INDEX ${schema}.job_singletonKey`,
`ALTER TABLE ${schema}.job ALTER COLUMN state DROP DEFAULT`,
`ALTER TABLE ${schema}.job ALTER COLUMN state SET DATA TYPE text USING state::text`,
`ALTER TABLE ${schema}.archive ALTER COLUMN state SET DATA TYPE text USING state::text`,
`DROP TYPE ${schema}.job_state`,
`CREATE TYPE ${schema}.job_state AS ENUM ('created', 'retry', 'active', 'complete', 'expired', 'cancelled', 'failed')`,
`UPDATE ${schema}.job SET state = 'completed' WHERE state = 'complete'`,
`UPDATE ${schema}.archive SET state = 'complete' WHERE state = 'completed'`,
`ALTER TABLE ${schema}.job ALTER COLUMN state SET DATA TYPE ${schema}.job_state USING state::${schema}.job_state`,
`ALTER TABLE ${schema}.job ALTER COLUMN state SET DEFAULT 'created'`,
`ALTER TABLE ${schema}.archive ALTER COLUMN state SET DATA TYPE ${schema}.job_state USING state::${schema}.job_state`,
`CREATE INDEX job_fetch ON ${schema}.job (name, priority desc, createdOn, id) WHERE state < 'active'`,
`CREATE UNIQUE INDEX job_singletonOn ON ${schema}.job (name, singletonOn) WHERE state < 'expired' AND singletonKey IS NULL`,
`CREATE UNIQUE INDEX job_singletonKeyOn ON ${schema}.job (name, singletonOn, singletonKey) WHERE state < 'expired'`,
`CREATE UNIQUE INDEX job_singletonKey ON ${schema}.job (name, singletonKey) WHERE state < 'complete' AND singletonOn IS NULL`
]
},
{
release: '2.5.0',
version: 7,
previous: 6,
install: [
`CREATE TABLE IF NOT EXISTS ${schema}.archive (LIKE ${schema}.job)`,
`ALTER TABLE ${schema}.archive ADD archivedOn timestamptz NOT NULL DEFAULT now()`
],
uninstall: [
`DROP TABLE ${schema}.archive`
]
},
{
release: '2.0.0',
version: 6,
previous: 5,
install: [
`CREATE INDEX job_fetch ON ${schema}.job (priority desc, createdOn, id) WHERE state < 'active'`
],
uninstall: [
`DROP INDEX ${schema}.job_fetch`
]
},
{
release: '2.0.0',
version: 5,
previous: 4,
install: [
`ALTER TABLE ${schema}.job ALTER COLUMN startIn SET DEFAULT (interval '0')`,
`ALTER TABLE ${schema}.job ALTER COLUMN state SET DEFAULT ('created')`,
`UPDATE ${schema}.job SET name = left(name, -9) || '__state__expired' WHERE name LIKE '%__expired'`
],
uninstall: [
`ALTER TABLE ${schema}.job ALTER COLUMN startIn DROP DEFAULT`,
`ALTER TABLE ${schema}.job ALTER COLUMN state DROP DEFAULT`,
`UPDATE ${schema}.job SET name = left(name, -16) || '__expired' WHERE name LIKE '%__state__expired'`
]
},
{
release: '1.1.0',
version: 4,
previous: 3,
install: [
`ALTER TABLE ${schema}.job ADD COLUMN priority integer not null default(0)`,
`ALTER TABLE ${schema}.job ALTER COLUMN createdOn SET DATA TYPE timestamptz`,
`ALTER TABLE ${schema}.job ALTER COLUMN startedOn SET DATA TYPE timestamptz`,
`ALTER TABLE ${schema}.job ALTER COLUMN completedOn SET DATA TYPE timestamptz`
],
uninstall: [
`ALTER TABLE ${schema}.job DROP COLUMN priority`,
`ALTER TABLE ${schema}.job ALTER COLUMN createdOn SET DATA TYPE timestamp`,
`ALTER TABLE ${schema}.job ALTER COLUMN startedOn SET DATA TYPE timestamp`,
`ALTER TABLE ${schema}.job ALTER COLUMN completedOn SET DATA TYPE timestamp`
]
}
]
}
15 changes: 13 additions & 2 deletions src/plans.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ module.exports = {
countStates,
deleteQueue,
deleteAllQueues,
clearStorage,
getQueueSize,
states: { ...states },
completedJobPrefix,
advisoryLock,
Expand Down Expand Up @@ -131,11 +133,20 @@ function addIdIndexToArchive (schema) {
}

function deleteQueue (schema) {
return `DELETE FROM ${schema}.job WHERE name = $1`
return `DELETE FROM ${schema}.job WHERE name = $1 and state < '${states.active}'`
}

function deleteAllQueues (schema) {
return `TRUNCATE ${schema}.job`
return `DELETE FROM ${schema}.job WHERE state < '${states.active}'`
}

function clearStorage (schema) {
return `TRUNCATE ${schema}.job, ${schema}.archive`
}

function getQueueSize (schema, options = {}) {
options.before = options.before || states.active
return `SELECT count(*) as count FROM ${schema}.job WHERE name = $1 AND state < '${options.before}'`
}

function createIndexSingletonKey (schema) {
Expand Down
Loading

0 comments on commit 709bb0d

Please sign in to comment.