From a894f822c2bc9e4970e7c6e3f0b56c4bdf5168a7 Mon Sep 17 00:00:00 2001 From: Adam C Hamlin Date: Tue, 21 Feb 2023 21:54:59 -0500 Subject: [PATCH 1/6] fix: limit 1 active singleton queue job --- docs/readme.md | 4 ++-- src/plans.js | 11 +++++++++++ test/fetchTest.js | 24 ++++++++++++++++++++++++ 3 files changed, 37 insertions(+), 2 deletions(-) diff --git a/docs/readme.md b/docs/readme.md index 6a61d0b0..1e9eb3e2 100644 --- a/docs/readme.md +++ b/docs/readme.md @@ -559,7 +559,7 @@ Available in constructor as a default, or overridden in send. * **singletonKey** string - Only allows 1 job (within the same name) to be queued or active with the same singletonKey. + Allows a max of 1 job (with the same name and singletonKey) to be queued or active. ```js boss.send('my-job', {}, {singletonKey: '123'}) // resolves a jobId @@ -570,7 +570,7 @@ Available in constructor as a default, or overridden in send. * **useSingletonQueue** boolean - When used in conjunction with singletonKey, only allows 1 job (within the same name) to be queued with the same singletonKey. + When used in conjunction with singletonKey, allows a max of 1 job to be queued and a max of 1 job to be active. ```js boss.send('my-job', {}, {singletonKey: '123', useSingletonQueue: true}) // resolves a jobId diff --git a/src/plans.js b/src/plans.js index bb7b2760..a8b0d031 100644 --- a/src/plans.js +++ b/src/plans.js @@ -358,6 +358,17 @@ function fetchNextJob (schema) { WHERE state < '${states.active}' AND name LIKE $1 AND startAfter < now() + AND NOT ( + singletonKey IS NOT NULL + AND singletonKey LIKE '${SINGLETON_QUEUE_KEY_ESCAPED}%' + AND EXISTS ( + SELECT * + FROM ${schema}.job active_job + WHERE active_job.state = '${states.active}' + AND active_job.name = name + AND active_job.singletonKey = singletonKey + ) + ) ORDER BY priority desc, createdOn, id LIMIT $2 FOR UPDATE SKIP LOCKED diff --git a/test/fetchTest.js b/test/fetchTest.js index d2a9a858..3a25d482 100644 --- a/test/fetchTest.js +++ b/test/fetchTest.js @@ -121,4 +121,28 @@ describe('fetch', function () { assert(job.startedon === undefined) assert.strictEqual(calledCounter, 2) }) + + it('should not fetch singleton queue job if there is already an active one', async function () { + const boss = this.test.boss = await helper.start(this.test.bossConfig) + const queue = this.test.bossConfig.schema + const jobOptions = { singletonKey: 'singleton_queue_active_test', useSingletonQueue: true } + const sendArgs = [queue, {}, jobOptions] + + const publish1 = await boss.send(...sendArgs) + const fetch1 = await boss.fetch(queue) + assert(publish1) + assert(fetch1) + + const publish2 = await boss.send(...sendArgs) + assert(publish2) + // Job 1 still active, can't fetch job 2 + const fetch2 = await boss.fetch(queue) + assert(fetch2 === null) + + await boss.complete(fetch1.id) + // Job 1 no longer active, should be able to fetch + const retryFetch2 = await boss.fetch(queue) + assert(retryFetch2) + assert(retryFetch2.id === publish2) + }) }) From c3824ab83b5f13f075c10260ccabef0278a4e58e Mon Sep 17 00:00:00 2001 From: Adam C Hamlin Date: Sun, 5 Mar 2023 12:55:54 -0500 Subject: [PATCH 2/6] Use case stmt to only conditionally run subquery --- src/plans.js | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/src/plans.js b/src/plans.js index a8b0d031..be74e888 100644 --- a/src/plans.js +++ b/src/plans.js @@ -358,16 +358,20 @@ function fetchNextJob (schema) { WHERE state < '${states.active}' AND name LIKE $1 AND startAfter < now() - AND NOT ( - singletonKey IS NOT NULL - AND singletonKey LIKE '${SINGLETON_QUEUE_KEY_ESCAPED}%' - AND EXISTS ( - SELECT * - FROM ${schema}.job active_job - WHERE active_job.state = '${states.active}' - AND active_job.name = name - AND active_job.singletonKey = singletonKey - ) + AND ( + CASE + WHEN singletonKey IS NOT NULL + AND singletonKey LIKE '${SINGLETON_QUEUE_KEY_ESCAPED}%' + THEN NOT EXISTS ( + SELECT * + FROM ${schema}.job active_job + WHERE active_job.state = '${states.active}' + AND active_job.name = name + AND active_job.singletonKey = singletonKey + ) + ELSE + true + END ) ORDER BY priority desc, createdOn, id LIMIT $2 From c1b229aa3e3fb5cb88079bc8d136d79a2dfcb73f Mon Sep 17 00:00:00 2001 From: Adam C Hamlin Date: Sun, 5 Mar 2023 17:09:18 -0500 Subject: [PATCH 3/6] Fix flaky test? --- test/fetchTest.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/fetchTest.js b/test/fetchTest.js index 3a25d482..1b2bda9f 100644 --- a/test/fetchTest.js +++ b/test/fetchTest.js @@ -129,8 +129,8 @@ describe('fetch', function () { const sendArgs = [queue, {}, jobOptions] const publish1 = await boss.send(...sendArgs) - const fetch1 = await boss.fetch(queue) assert(publish1) + const fetch1 = await boss.fetch(queue) assert(fetch1) const publish2 = await boss.send(...sendArgs) From 414866e8d4fe37c057952cb37277d515ca6e844d Mon Sep 17 00:00:00 2001 From: Adam C Hamlin Date: Sun, 5 Mar 2023 17:31:12 -0500 Subject: [PATCH 4/6] Allow startAfter === now() when fetching --- src/plans.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/plans.js b/src/plans.js index be74e888..762f0404 100644 --- a/src/plans.js +++ b/src/plans.js @@ -357,7 +357,7 @@ function fetchNextJob (schema) { FROM ${schema}.job WHERE state < '${states.active}' AND name LIKE $1 - AND startAfter < now() + AND startAfter <= now() AND ( CASE WHEN singletonKey IS NOT NULL From 6d31e357492f090d09dde9d71f223c9462b9ffe5 Mon Sep 17 00:00:00 2001 From: Adam C Hamlin Date: Sun, 5 Mar 2023 17:51:45 -0500 Subject: [PATCH 5/6] Revert 'Allow startAfter === now() when fetching' --- src/plans.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/plans.js b/src/plans.js index 762f0404..be74e888 100644 --- a/src/plans.js +++ b/src/plans.js @@ -357,7 +357,7 @@ function fetchNextJob (schema) { FROM ${schema}.job WHERE state < '${states.active}' AND name LIKE $1 - AND startAfter <= now() + AND startAfter < now() AND ( CASE WHEN singletonKey IS NOT NULL From 4f94b9217b75c463e50b11915c9606ea06ea47df Mon Sep 17 00:00:00 2001 From: Adam C Hamlin Date: Sun, 5 Mar 2023 18:27:23 -0500 Subject: [PATCH 6/6] Fix correlated subquery qualifiers --- src/plans.js | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/plans.js b/src/plans.js index be74e888..4ef324f3 100644 --- a/src/plans.js +++ b/src/plans.js @@ -354,7 +354,7 @@ function fetchNextJob (schema) { return (includeMetadata) => ` WITH nextJob as ( SELECT id - FROM ${schema}.job + FROM ${schema}.job j WHERE state < '${states.active}' AND name LIKE $1 AND startAfter < now() @@ -366,8 +366,8 @@ function fetchNextJob (schema) { SELECT * FROM ${schema}.job active_job WHERE active_job.state = '${states.active}' - AND active_job.name = name - AND active_job.singletonKey = singletonKey + AND active_job.name = j.name + AND active_job.singletonKey = j.singletonKey ) ELSE true