Skip to content

Commit

Permalink
Add archiveFailedAfterSeconds option
Browse files Browse the repository at this point in the history
  • Loading branch information
klesgidis committed Jan 28, 2023
1 parent 4b3b46f commit 035a66a
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 7 deletions.
9 changes: 8 additions & 1 deletion docs/readme.md
Expand Up @@ -77,7 +77,8 @@ Architecturally, pg-boss is somewhat similar to queue products such as AWS SQS,

All jobs start out in the `created` state and become `active` when picked up for work. If job processing completes successfully, jobs will go to `completed`. If a job fails, it will typcially enter the `failed` state. However, if a job has retry options configured, it will enter the `retry` state on failure instead and have a chance to re-enter `active` state. It's also possible for `active` jobs to become `expired`, which happens when job processing takes too long. Jobs can also enter `cancelled` state via [`cancel(id)`](#cancelid) or [`cancel([ids])`](#cancelids).

All jobs that are `completed`, `expired`, `cancelled` or `failed` become eligible for archiving (i.e. they will transition into the `archive` state) after the configured `archiveCompletedAfterSeconds` time. Once `archive`d, jobs will be automatically deleted by pg-boss after the configured deletion period.
All jobs that are `completed`, `expired` or `cancelled` become eligible for archiving (i.e. they will transition into the `archive` state) after the configured `archiveCompletedAfterSeconds` time. The `failed` jobs become eligible for archiving after the configured `archiveFailedAfterSeconds` time.
Once `archive`d, jobs will be automatically deleted by pg-boss after the configured deletion period.

Here's a state diagram that shows the possible states and their transitions:

Expand Down Expand Up @@ -343,6 +344,12 @@ Queue options contain the following constructor-only settings.

Default: 12 hours

* **archiveFailedAfterSeconds**

Specifies how long in seconds failed jobs get archived. Note: a warning will be emitted if set to lower than 60s and cron processing will be disabled.

Default: 7 days

**Monitoring options**

* **monitorStateIntervalSeconds** - int, default undefined
Expand Down
15 changes: 15 additions & 0 deletions src/attorney.js
Expand Up @@ -159,6 +159,7 @@ function getConfig (value) {
applyDatabaseConfig(config)
applyMaintenanceConfig(config)
applyArchiveConfig(config)
applyArchiveFailedConfig(config)
applyDeleteConfig(config)
applyMonitoringConfig(config)
applyUuidConfig(config)
Expand Down Expand Up @@ -195,6 +196,20 @@ function applyArchiveConfig (config) {
}
}

function applyArchiveFailedConfig (config) {
const ARCHIVE_DEFAULT = 60 * 60 * 24 * 7

assert(!('archiveFailedAfterSeconds' in config) || config.archiveFailedAfterSeconds >= 1,
'configuration assert: archiveFailedAfterSeconds must be at least every second and less than ')

config.archiveFailedSeconds = config.archiveFailedAfterSeconds || ARCHIVE_DEFAULT
config.archiveFailedInterval = `${config.archiveFailedSeconds} seconds`

if (config.archiveSeconds < 60) {
emitWarning(WARNINGS.CRON_DISABLED)
}
}

function applyCompletionConfig (config, defaults) {
assert(!('onComplete' in config) || config.onComplete === true || config.onComplete === false,
'configuration assert: onComplete must be either true or false')
Expand Down
2 changes: 1 addition & 1 deletion src/boss.js
Expand Up @@ -33,7 +33,7 @@ class Boss extends EventEmitter {
this.events = events

this.expireCommand = plans.locked(config.schema, plans.expire(config.schema))
this.archiveCommand = plans.locked(config.schema, plans.archive(config.schema, config.archiveInterval))
this.archiveCommand = plans.locked(config.schema, plans.archive(config.schema, config.archiveInterval, config.archiveFailedInterval))
this.purgeCommand = plans.locked(config.schema, plans.purge(config.schema, config.deleteAfter))
this.getMaintenanceTimeCommand = plans.getMaintenanceTime(config.schema)
this.setMaintenanceTimeCommand = plans.setMaintenanceTime(config.schema)
Expand Down
12 changes: 8 additions & 4 deletions src/plans.js
Expand Up @@ -601,7 +601,7 @@ function insertJobs (schema) {
keepUntil,
on_complete
)
SELECT
SELECT
COALESCE(id, gen_random_uuid()) as id,
name,
data,
Expand Down Expand Up @@ -639,12 +639,16 @@ function purge (schema, interval) {
`
}

function archive (schema, interval) {
function archive (schema, completedInterval, failedInterval = completedInterval) {
return `
WITH archived_rows AS (
DELETE FROM ${schema}.job
WHERE
completedOn < (now() - interval '${interval}')
WHERE (
state <> '${states.failed}' AND completedOn < (now() - interval '${completedInterval}')
)
OR (
state = '${states.failed}' AND completedOn < (now() - interval '${failedInterval}')
)
OR (
state < '${states.active}' AND keepUntil < now()
)
Expand Down
2 changes: 1 addition & 1 deletion src/timekeeper.js
Expand Up @@ -45,7 +45,7 @@ class Timekeeper extends EventEmitter {

async start () {
// setting the archive config too low breaks the cron 60s debounce interval so don't even try
if (this.config.archiveSeconds < 60) {
if (this.config.archiveSeconds < 60 || this.config.archiveFailedSeconds < 60) {
return
}

Expand Down
38 changes: 38 additions & 0 deletions test/archiveTest.js
@@ -1,10 +1,12 @@
const assert = require('assert')
const helper = require('./testHelper')
const delay = require('delay')
const { states } = require('../src/plans')

describe('archive', function () {
const defaults = {
archiveCompletedAfterSeconds: 1,
archiveFailedAfterSeconds: 10,
maintenanceIntervalSeconds: 1
}

Expand Down Expand Up @@ -80,4 +82,40 @@ describe('archive', function () {
assert.strictEqual(jobId, archivedJob.id)
assert.strictEqual(queue, archivedJob.name)
})

it('should not archive a failed job before the config setting', async function () {
const config = { ...this.test.bossConfig, ...defaults }
const boss = this.test.boss = await helper.start(config)

const queue = 'archive-failed'

const failPayload = { someReason: 'nuna' }
const jobId = await boss.send(queue, null, { retentionSeconds: 1 })

await boss.fail(jobId, failPayload)
await delay(7000)

const archivedJob = await helper.getArchivedJobById(config.schema, jobId)

assert.strictEqual(archivedJob, null)
})

it('should archive a failed job', async function () {
const config = { ...this.test.bossConfig, ...defaults, archiveFailedAfterSeconds: 1 }
const boss = this.test.boss = await helper.start(config)

const queue = 'archive-failed'

const failPayload = { someReason: 'nuna' }
const jobId = await boss.send(queue, null, { retentionSeconds: 1 })

await boss.fail(jobId, failPayload)
await delay(7000)

const archivedJob = await helper.getArchivedJobById(config.schema, jobId)

assert.strictEqual(jobId, archivedJob.id)
assert.strictEqual(queue, archivedJob.name)
assert.strictEqual(states.failed, archivedJob.state)
})
})
1 change: 1 addition & 0 deletions types.d.ts
Expand Up @@ -44,6 +44,7 @@ declare namespace PgBoss {
maintenanceIntervalMinutes?: number;

archiveCompletedAfterSeconds?: number;
archiveFailedAfterSeconds?: number;
}

type ConstructorOptions =
Expand Down

0 comments on commit 035a66a

Please sign in to comment.