diff --git a/docs/readme.md b/docs/readme.md index b39617ea..74433e67 100644 --- a/docs/readme.md +++ b/docs/readme.md @@ -77,7 +77,8 @@ Architecturally, pg-boss is somewhat similar to queue products such as AWS SQS, All jobs start out in the `created` state and become `active` when picked up for work. If job processing completes successfully, jobs will go to `completed`. If a job fails, it will typcially enter the `failed` state. However, if a job has retry options configured, it will enter the `retry` state on failure instead and have a chance to re-enter `active` state. It's also possible for `active` jobs to become `expired`, which happens when job processing takes too long. Jobs can also enter `cancelled` state via [`cancel(id)`](#cancelid) or [`cancel([ids])`](#cancelids). -All jobs that are `completed`, `expired`, `cancelled` or `failed` become eligible for archiving (i.e. they will transition into the `archive` state) after the configured `archiveCompletedAfterSeconds` time. Once `archive`d, jobs will be automatically deleted by pg-boss after the configured deletion period. +All jobs that are `completed`, `expired` or `cancelled` become eligible for archiving (i.e. they will transition into the `archive` state) after the configured `archiveCompletedAfterSeconds` time. The `failed` jobs become eligible for archiving after the configured `archiveFailedAfterSeconds` time. +Once `archive`d, jobs will be automatically deleted by pg-boss after the configured deletion period. Here's a state diagram that shows the possible states and their transitions: @@ -343,6 +344,12 @@ Queue options contain the following constructor-only settings. Default: 12 hours +* **archiveFailedAfterSeconds** + + Specifies how long in seconds failed jobs get archived. Note: a warning will be emitted if set to lower than 60s and cron processing will be disabled. + + Default: 7 days + **Monitoring options** * **monitorStateIntervalSeconds** - int, default undefined diff --git a/src/attorney.js b/src/attorney.js index 6ad64dcb..21e32195 100644 --- a/src/attorney.js +++ b/src/attorney.js @@ -159,6 +159,7 @@ function getConfig (value) { applyDatabaseConfig(config) applyMaintenanceConfig(config) applyArchiveConfig(config) + applyArchiveFailedConfig(config) applyDeleteConfig(config) applyMonitoringConfig(config) applyUuidConfig(config) @@ -195,6 +196,20 @@ function applyArchiveConfig (config) { } } +function applyArchiveFailedConfig (config) { + const ARCHIVE_DEFAULT = 60 * 60 * 24 * 7 + + assert(!('archiveFailedAfterSeconds' in config) || config.archiveFailedAfterSeconds >= 1, + 'configuration assert: archiveFailedAfterSeconds must be at least every second and less than ') + + config.archiveFailedSeconds = config.archiveFailedAfterSeconds || ARCHIVE_DEFAULT + config.archiveFailedInterval = `${config.archiveFailedSeconds} seconds` + + if (config.archiveSeconds < 60) { + emitWarning(WARNINGS.CRON_DISABLED) + } +} + function applyCompletionConfig (config, defaults) { assert(!('onComplete' in config) || config.onComplete === true || config.onComplete === false, 'configuration assert: onComplete must be either true or false') diff --git a/src/boss.js b/src/boss.js index cb85eac0..899f323b 100644 --- a/src/boss.js +++ b/src/boss.js @@ -33,7 +33,7 @@ class Boss extends EventEmitter { this.events = events this.expireCommand = plans.locked(config.schema, plans.expire(config.schema)) - this.archiveCommand = plans.locked(config.schema, plans.archive(config.schema, config.archiveInterval)) + this.archiveCommand = plans.locked(config.schema, plans.archive(config.schema, config.archiveInterval, config.archiveFailedInterval)) this.purgeCommand = plans.locked(config.schema, plans.purge(config.schema, config.deleteAfter)) this.getMaintenanceTimeCommand = plans.getMaintenanceTime(config.schema) this.setMaintenanceTimeCommand = plans.setMaintenanceTime(config.schema) diff --git a/src/plans.js b/src/plans.js index 774c0b50..bb7b2760 100644 --- a/src/plans.js +++ b/src/plans.js @@ -601,7 +601,7 @@ function insertJobs (schema) { keepUntil, on_complete ) - SELECT + SELECT COALESCE(id, gen_random_uuid()) as id, name, data, @@ -639,12 +639,16 @@ function purge (schema, interval) { ` } -function archive (schema, interval) { +function archive (schema, completedInterval, failedInterval = completedInterval) { return ` WITH archived_rows AS ( DELETE FROM ${schema}.job - WHERE - completedOn < (now() - interval '${interval}') + WHERE ( + state <> '${states.failed}' AND completedOn < (now() - interval '${completedInterval}') + ) + OR ( + state = '${states.failed}' AND completedOn < (now() - interval '${failedInterval}') + ) OR ( state < '${states.active}' AND keepUntil < now() ) diff --git a/src/timekeeper.js b/src/timekeeper.js index b9503f3e..a4161646 100644 --- a/src/timekeeper.js +++ b/src/timekeeper.js @@ -45,7 +45,7 @@ class Timekeeper extends EventEmitter { async start () { // setting the archive config too low breaks the cron 60s debounce interval so don't even try - if (this.config.archiveSeconds < 60) { + if (this.config.archiveSeconds < 60 || this.config.archiveFailedSeconds < 60) { return } diff --git a/test/archiveTest.js b/test/archiveTest.js index a43bd411..96700ea9 100644 --- a/test/archiveTest.js +++ b/test/archiveTest.js @@ -1,10 +1,12 @@ const assert = require('assert') const helper = require('./testHelper') const delay = require('delay') +const { states } = require('../src/plans') describe('archive', function () { const defaults = { archiveCompletedAfterSeconds: 1, + archiveFailedAfterSeconds: 10, maintenanceIntervalSeconds: 1 } @@ -80,4 +82,40 @@ describe('archive', function () { assert.strictEqual(jobId, archivedJob.id) assert.strictEqual(queue, archivedJob.name) }) + + it('should not archive a failed job before the config setting', async function () { + const config = { ...this.test.bossConfig, ...defaults } + const boss = this.test.boss = await helper.start(config) + + const queue = 'archive-failed' + + const failPayload = { someReason: 'nuna' } + const jobId = await boss.send(queue, null, { retentionSeconds: 1 }) + + await boss.fail(jobId, failPayload) + await delay(7000) + + const archivedJob = await helper.getArchivedJobById(config.schema, jobId) + + assert.strictEqual(archivedJob, null) + }) + + it('should archive a failed job', async function () { + const config = { ...this.test.bossConfig, ...defaults, archiveFailedAfterSeconds: 1 } + const boss = this.test.boss = await helper.start(config) + + const queue = 'archive-failed' + + const failPayload = { someReason: 'nuna' } + const jobId = await boss.send(queue, null, { retentionSeconds: 1 }) + + await boss.fail(jobId, failPayload) + await delay(7000) + + const archivedJob = await helper.getArchivedJobById(config.schema, jobId) + + assert.strictEqual(jobId, archivedJob.id) + assert.strictEqual(queue, archivedJob.name) + assert.strictEqual(states.failed, archivedJob.state) + }) }) diff --git a/types.d.ts b/types.d.ts index 49546a93..714d74e4 100644 --- a/types.d.ts +++ b/types.d.ts @@ -44,6 +44,7 @@ declare namespace PgBoss { maintenanceIntervalMinutes?: number; archiveCompletedAfterSeconds?: number; + archiveFailedAfterSeconds?: number; } type ConstructorOptions =