Skip to content

Commit

Permalink
Merge ff2517b into 1bfaa8d
Browse files Browse the repository at this point in the history
  • Loading branch information
ismael-iskauskas committed Jan 20, 2023
2 parents 1bfaa8d + ff2517b commit 2d3dd80
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 6 deletions.
12 changes: 12 additions & 0 deletions docs/readme.md
Expand Up @@ -13,6 +13,7 @@
- [`monitor-states`](#monitor-states)
- [`wip`](#wip)
- [`stopped`](#stopped)
- [`after-purge`](#after-purge)
- [Static functions](#static-functions)
- [`string getConstructionPlans(schema)`](#string-getconstructionplansschema)
- [`string getMigrationPlans(schema, version)`](#string-getmigrationplansschema-version)
Expand Down Expand Up @@ -242,6 +243,17 @@ Emitted at most once every 2 seconds when workers are active and jobs are enteri

Emitted after `stop()` once all workers have completed their work and maintenance has been shut down.

## `after-purge`

Emitted after `purge()` gets called from the maintenance job. Only emitted if any archived job was deleted
```js
const boss = new PgBoss(....)
boss.on('after-purge', (jobIds) => {
console.log(jobIds) // ['fc738fb0-1de5-4947-b138-40d6a790749e']
})

```

# Static functions

The following static functions are not required during normal operations, but are intended to assist in schema creation or migration if run-time privileges do not allow schema changes.
Expand Down
9 changes: 7 additions & 2 deletions src/boss.js
Expand Up @@ -11,7 +11,8 @@ const queues = {
const events = {
error: 'error',
monitorStates: 'monitor-states',
maintenance: 'maintenance'
maintenance: 'maintenance',
afterPurge: 'after-purge'
}

class Boss extends EventEmitter {
Expand Down Expand Up @@ -213,7 +214,11 @@ class Boss extends EventEmitter {
}

async purge () {
await this.executeSql(this.purgeCommand)
const purgedJobs = await this.executeSql(this.purgeCommand)
const deleteResult = purgedJobs.find(r => r.command === 'DELETE')
if (deleteResult && deleteResult.rows && deleteResult.rows.length > 0) {
this.emit(events.afterPurge, deleteResult.rows.map(r => r.id))
}
}

async setMaintenanceTime () {
Expand Down
4 changes: 2 additions & 2 deletions src/plans.js
Expand Up @@ -601,7 +601,7 @@ function insertJobs (schema) {
keepUntil,
on_complete
)
SELECT
SELECT
COALESCE(id, gen_random_uuid()) as id,
name,
data,
Expand Down Expand Up @@ -635,7 +635,7 @@ function insertJobs (schema) {
function purge (schema, interval) {
return `
DELETE FROM ${schema}.archive
WHERE archivedOn < (now() - interval '${interval}')
WHERE archivedOn < (now() - interval '${interval}') RETURNING id
`
}

Expand Down
25 changes: 23 additions & 2 deletions test/deleteTest.js
Expand Up @@ -4,7 +4,8 @@ const delay = require('delay')

describe('delete', async function () {
const defaults = {
deleteAfterSeconds: 1,
deleteAfterSeconds: 2,
archiveCompletedAfterSeconds: 1,
maintenanceIntervalSeconds: 1
}

Expand All @@ -17,13 +18,33 @@ describe('delete', async function () {
const job = await boss.fetch(jobName)

assert.strictEqual(jobId, job.id)

await boss.complete(jobId)
await delay(7000)

const archivedJob = await helper.getArchivedJobById(config.schema, jobId)

assert.strictEqual(archivedJob, null)
})

it('should delete an archived job and trigger after-purge', async function () {
const jobName = 'deleteMe'

const config = { ...this.test.bossConfig, ...defaults }
const boss = this.test.boss = await helper.start(config)
const jobId = await boss.send(jobName)
const job = await boss.fetch(jobName)
const purgePromise = new Promise(resolve => this.test.boss.once('after-purge', (jobIds) => {
return resolve(jobIds)
}))

assert.strictEqual(jobId, job.id)
await boss.complete(jobId)
await delay(7000)

const archivedJob = await helper.getArchivedJobById(config.schema, jobId)

assert.strictEqual(archivedJob, null)
const deletedJobIds = await purgePromise
assert.strictEqual(deletedJobIds.includes(jobId), true)
})
})
3 changes: 3 additions & 0 deletions types.d.ts
Expand Up @@ -283,6 +283,9 @@ declare class PgBoss extends EventEmitter {
on(event: "stopped", handler: () => void): this;
off(event: "stopped", handler: () => void): this;

on(event: "after-purge", handler: (jobIds: string[]) => void): this;
off(event: "after-purge", handler: (jobIds: string[]) => void): this;

start(): Promise<PgBoss>;
stop(options?: PgBoss.StopOptions): Promise<void>;

Expand Down

0 comments on commit 2d3dd80

Please sign in to comment.