Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add deleteQueue option #437

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions docs/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
- [`notifyWorker(id)`](#notifyworkerid)
- [`getQueueSize(name [, options])`](#getqueuesizename--options)
- [`getJobById(id, options)`](#getjobbyidid-options)
- [`deleteQueue(name)`](#deletequeuename)
- [`deleteQueue(name [, options])`](#deletequeuename)
- [`deleteAllQueues()`](#deleteallqueues)
- [`clearStorage()`](#clearstorage)

Expand Down Expand Up @@ -1085,10 +1085,17 @@ As an example, the following options object include active jobs along with creat

Retrieves a job with all metadata by id in either the primary or archive storage.

## `deleteQueue(name)`
## `deleteQueue(name [, options])`

Deletes all pending jobs in the specified queue from the active job table. All jobs in the archive table are retained.

`options`: Optional, object.

| Prop | Type | Description | Default |
| - | - | - | - |
|`before`| string | delete jobs in states before this state | states.active |


## `deleteAllQueues()`

Deletes all pending jobs from all queues in the active job table. All jobs in the archive table are retained.
Expand Down
40 changes: 40 additions & 0 deletions test/deleteQueueTest.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,46 @@ describe('deleteQueue', function () {
assert.strictEqual(0, q2Count3)
})

it('should clear a specific queue and state', async function () {
const defaults = {
archiveCompletedAfterSeconds: 1,
maintenanceIntervalSeconds: 1
}
const boss = this.test.boss = await helper.start({ ...this.test.bossConfig, ...defaults })

const queue = 'delete-queue-by-state-works'

const jobId = await boss.send(queue)
const job = await boss.fetch(queue)

assert.strictEqual(job.id, jobId)

await boss.complete(jobId)

await delay(3000)

const db = await helper.getDb()

const getJobCount = async table => {
const jobCountResult = await db.executeSql(`SELECT count(*)::int as job_count FROM ${this.test.bossConfig.schema}.${table}`)
return jobCountResult.rows[0].job_count
}

const preJobCount = await getJobCount('job')
const preArchiveCount = await getJobCount('archive')

assert(preJobCount === 1)
assert(preArchiveCount === 1)

await boss.deleteQueue(queue, { before: 'completed' })

const postJobCount = await getJobCount('job')
const postArchiveCount = await getJobCount('archive')

assert(postJobCount === 0)
assert(postArchiveCount > 0) // archive should still have records
})

it('should clear all queues', async function () {
const boss = this.test.boss = await helper.start(this.test.bossConfig)

Expand Down
4 changes: 3 additions & 1 deletion types.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ declare namespace PgBoss {

type ScheduleOptions = SendOptions & { tz?: string }

type DeleteQueueOptions = { before?: 'created' | 'retry' | 'active' | 'completed' | 'expired' | 'cancelled' | 'failed' };

interface JobPollingOptions {
newJobCheckInterval?: number;
newJobCheckIntervalSeconds?: number;
Expand Down Expand Up @@ -368,7 +370,7 @@ declare class PgBoss extends EventEmitter {
getQueueSize(name: string, options?: object): Promise<number>;
getJobById(id: string, options?: PgBoss.ConnectionOptions): Promise<PgBoss.JobWithMetadata | null>;

deleteQueue(name: string): Promise<void>;
deleteQueue(name: string, options?: PgBoss.DeleteQueueOptions): Promise<void>;
deleteAllQueues(): Promise<void>;
clearStorage(): Promise<void>;

Expand Down