From 4fa58f286c1abd8a65e173a76b58c43e03dc9781 Mon Sep 17 00:00:00 2001 From: Adam C Hamlin Date: Sun, 12 Mar 2023 15:21:02 -0400 Subject: [PATCH] add enforceSingletonQueueActiveLimit fetch option --- docs/readme.md | 15 +++++++++++++-- src/attorney.js | 2 ++ src/manager.js | 7 ++++--- src/plans.js | 21 +++++++++++++++++++-- test/fetchTest.js | 45 +++++++++++++++++++++++++++++++++++++++++++++ types.d.ts | 2 ++ 6 files changed, 85 insertions(+), 7 deletions(-) diff --git a/docs/readme.md b/docs/readme.md index 6a61d0b0..b2ce710a 100644 --- a/docs/readme.md +++ b/docs/readme.md @@ -559,7 +559,7 @@ Available in constructor as a default, or overridden in send. * **singletonKey** string - Only allows 1 job (within the same name) to be queued or active with the same singletonKey. + Allows a max of 1 job (with the same name and singletonKey) to be queued or active. ```js boss.send('my-job', {}, {singletonKey: '123'}) // resolves a jobId @@ -570,7 +570,9 @@ Available in constructor as a default, or overridden in send. * **useSingletonQueue** boolean - When used in conjunction with singletonKey, only allows 1 job (within the same name) to be queued with the same singletonKey. + When used in conjunction with singletonKey, allows a max of 1 job to be queued. + + >By default, there is no limit on the number of these jobs that may be active. However, this behavior may be modified by passing the [enforceSingletonQueueActiveLimit](#fetch) option. ```js boss.send('my-job', {}, {singletonKey: '123', useSingletonQueue: true}) // resolves a jobId @@ -744,6 +746,11 @@ Typically one would use `work()` for automated polling for new jobs based upon a | oncomplete | bool | | output | object | + * `enforceSingletonQueueActiveLimit`, bool + + If `true`, modifies the behavior of the `useSingletonQueue` flag to allow a max of 1 job to be queued plus a max of 1 job to be active. + >Note that use of this option can impact performance on instances with large numbers of jobs. + **Resolves** - `[job]`: array of job objects, `null` if none found @@ -819,6 +826,10 @@ The default concurrency for `work()` is 1 job every 2 seconds. Both the interval Same as in [`fetch()`](#fetch) +* **enforceSingletonQueueActiveLimit**, bool + + Same as in [`fetch()`](#fetch) + **Polling options** How often workers will poll the queue table for jobs. Available in the constructor as a default or per worker in `work()` and `onComplete()`. diff --git a/src/attorney.js b/src/attorney.js index 0ca79c02..c60ce266 100644 --- a/src/attorney.js +++ b/src/attorney.js @@ -129,6 +129,7 @@ function checkWorkArgs (name, args, defaults) { assert(!('teamSize' in options) || (Number.isInteger(options.teamSize) && options.teamSize >= 1), 'teamSize must be an integer > 0') assert(!('batchSize' in options) || (Number.isInteger(options.batchSize) && options.batchSize >= 1), 'batchSize must be an integer > 0') assert(!('includeMetadata' in options) || typeof options.includeMetadata === 'boolean', 'includeMetadata must be a boolean') + assert(!('enforceSingletonQueueActiveLimit' in options) || typeof options.enforceSingletonQueueActiveLimit === 'boolean', 'enforceSingletonQueueActiveLimit must be a boolean') return { options, callback } } @@ -140,6 +141,7 @@ function checkFetchArgs (name, batchSize, options) { assert(!batchSize || (Number.isInteger(batchSize) && batchSize >= 1), 'batchSize must be an integer > 0') assert(!('includeMetadata' in options) || typeof options.includeMetadata === 'boolean', 'includeMetadata must be a boolean') + assert(!('enforceSingletonQueueActiveLimit' in options) || typeof options.enforceSingletonQueueActiveLimit === 'boolean', 'enforceSingletonQueueActiveLimit must be a boolean') return { name } } diff --git a/src/manager.js b/src/manager.js index d351e486..a18d66b4 100644 --- a/src/manager.js +++ b/src/manager.js @@ -184,7 +184,8 @@ class Manager extends EventEmitter { teamSize = 1, teamConcurrency = 1, teamRefill: refill = false, - includeMetadata = false + includeMetadata = false, + enforceSingletonQueueActiveLimit = false } = options const id = uuid.v4() @@ -208,7 +209,7 @@ class Manager extends EventEmitter { createTeamRefillPromise() } - const fetch = () => this.fetch(name, batchSize || (teamSize - queueSize), { includeMetadata }) + const fetch = () => this.fetch(name, batchSize || (teamSize - queueSize), { includeMetadata, enforceSingletonQueueActiveLimit }) const onFetch = async (jobs) => { if (this.config.__test__throw_worker) { @@ -475,7 +476,7 @@ class Manager extends EventEmitter { const values = Attorney.checkFetchArgs(name, batchSize, options) const db = options.db || this.db const result = await db.executeSql( - this.nextJobCommand(options.includeMetadata || false), + this.nextJobCommand(options.includeMetadata || false, options.enforceSingletonQueueActiveLimit || false), [values.name, batchSize || 1] ) diff --git a/src/plans.js b/src/plans.js index bb7b2760..6cb3bac3 100644 --- a/src/plans.js +++ b/src/plans.js @@ -351,13 +351,30 @@ function insertVersion (schema, version) { } function fetchNextJob (schema) { - return (includeMetadata) => ` + return (includeMetadata, enforceSingletonQueueActiveLimit) => ` WITH nextJob as ( SELECT id - FROM ${schema}.job + FROM ${schema}.job j WHERE state < '${states.active}' AND name LIKE $1 AND startAfter < now() + ${enforceSingletonQueueActiveLimit + ? `AND ( + CASE + WHEN singletonKey IS NOT NULL + AND singletonKey LIKE '${SINGLETON_QUEUE_KEY_ESCAPED}%' + THEN NOT EXISTS ( + SELECT * + FROM ${schema}.job active_job + WHERE active_job.state = '${states.active}' + AND active_job.name = j.name + AND active_job.singletonKey = j.singletonKey + ) + ELSE + true + END + )` + : ''} ORDER BY priority desc, createdOn, id LIMIT $2 FOR UPDATE SKIP LOCKED diff --git a/test/fetchTest.js b/test/fetchTest.js index d2a9a858..092230c4 100644 --- a/test/fetchTest.js +++ b/test/fetchTest.js @@ -121,4 +121,49 @@ describe('fetch', function () { assert(job.startedon === undefined) assert.strictEqual(calledCounter, 2) }) + + describe('enforceSingletonQueueActiveLimit option', function () { + it('when enforceSingletonQueueActiveLimit=false, should fetch singleton queue job even if there is already an active one', async function () { + const boss = this.test.boss = await helper.start(this.test.bossConfig) + const queue = this.test.bossConfig.schema + const jobOptions = { singletonKey: 'singleton_queue_active_test', useSingletonQueue: true } + const sendArgs = [queue, {}, jobOptions] + const fetchArgs = [queue, undefined, { enforceSingletonQueueActiveLimit: false }] + + const publish1 = await boss.send(...sendArgs) + assert(publish1) + const fetch1 = await boss.fetch(...fetchArgs) + assert(fetch1) + + const publish2 = await boss.send(...sendArgs) + assert(publish2) + const fetch2 = await boss.fetch(...fetchArgs) + assert(fetch2) + }) + + it('when enforceSingletonQueueActiveLimit=true, should not fetch singleton queue job if there is already an active one', async function () { + const boss = this.test.boss = await helper.start(this.test.bossConfig) + const queue = this.test.bossConfig.schema + const jobOptions = { singletonKey: 'singleton_queue_active_test', useSingletonQueue: true } + const sendArgs = [queue, {}, jobOptions] + const fetchArgs = [queue, undefined, { enforceSingletonQueueActiveLimit: true }] + + const publish1 = await boss.send(...sendArgs) + assert(publish1) + const fetch1 = await boss.fetch(...fetchArgs) + assert(fetch1) + + const publish2 = await boss.send(...sendArgs) + assert(publish2) + // Job 1 still active, can't fetch job 2 + const fetch2 = await boss.fetch(...fetchArgs) + assert(fetch2 === null) + + await boss.complete(fetch1.id) + // Job 1 no longer active, should be able to fetch job 2 + const retryFetch2 = await boss.fetch(...fetchArgs) + assert(retryFetch2) + assert(retryFetch2.id === publish2) + }) + }) }) diff --git a/types.d.ts b/types.d.ts index 714d74e4..a57a2c71 100644 --- a/types.d.ts +++ b/types.d.ts @@ -113,12 +113,14 @@ declare namespace PgBoss { teamRefill?: boolean; batchSize?: number; includeMetadata?: boolean; + enforceSingletonQueueActiveLimit?: boolean; } type WorkOptions = JobFetchOptions & JobPollingOptions type FetchOptions = { includeMetadata?: boolean; + enforceSingletonQueueActiveLimit?: boolean; } & ConnectionOptions; interface WorkHandler {