From a894f822c2bc9e4970e7c6e3f0b56c4bdf5168a7 Mon Sep 17 00:00:00 2001 From: Adam C Hamlin Date: Tue, 21 Feb 2023 21:54:59 -0500 Subject: [PATCH] fix: limit 1 active singleton queue job --- docs/readme.md | 4 ++-- src/plans.js | 11 +++++++++++ test/fetchTest.js | 24 ++++++++++++++++++++++++ 3 files changed, 37 insertions(+), 2 deletions(-) diff --git a/docs/readme.md b/docs/readme.md index 6a61d0b0..1e9eb3e2 100644 --- a/docs/readme.md +++ b/docs/readme.md @@ -559,7 +559,7 @@ Available in constructor as a default, or overridden in send. * **singletonKey** string - Only allows 1 job (within the same name) to be queued or active with the same singletonKey. + Allows a max of 1 job (with the same name and singletonKey) to be queued or active. ```js boss.send('my-job', {}, {singletonKey: '123'}) // resolves a jobId @@ -570,7 +570,7 @@ Available in constructor as a default, or overridden in send. * **useSingletonQueue** boolean - When used in conjunction with singletonKey, only allows 1 job (within the same name) to be queued with the same singletonKey. + When used in conjunction with singletonKey, allows a max of 1 job to be queued and a max of 1 job to be active. ```js boss.send('my-job', {}, {singletonKey: '123', useSingletonQueue: true}) // resolves a jobId diff --git a/src/plans.js b/src/plans.js index bb7b2760..a8b0d031 100644 --- a/src/plans.js +++ b/src/plans.js @@ -358,6 +358,17 @@ function fetchNextJob (schema) { WHERE state < '${states.active}' AND name LIKE $1 AND startAfter < now() + AND NOT ( + singletonKey IS NOT NULL + AND singletonKey LIKE '${SINGLETON_QUEUE_KEY_ESCAPED}%' + AND EXISTS ( + SELECT * + FROM ${schema}.job active_job + WHERE active_job.state = '${states.active}' + AND active_job.name = name + AND active_job.singletonKey = singletonKey + ) + ) ORDER BY priority desc, createdOn, id LIMIT $2 FOR UPDATE SKIP LOCKED diff --git a/test/fetchTest.js b/test/fetchTest.js index d2a9a858..3a25d482 100644 --- a/test/fetchTest.js +++ b/test/fetchTest.js @@ -121,4 +121,28 @@ describe('fetch', function () { assert(job.startedon === undefined) assert.strictEqual(calledCounter, 2) }) + + it('should not fetch singleton queue job if there is already an active one', async function () { + const boss = this.test.boss = await helper.start(this.test.bossConfig) + const queue = this.test.bossConfig.schema + const jobOptions = { singletonKey: 'singleton_queue_active_test', useSingletonQueue: true } + const sendArgs = [queue, {}, jobOptions] + + const publish1 = await boss.send(...sendArgs) + const fetch1 = await boss.fetch(queue) + assert(publish1) + assert(fetch1) + + const publish2 = await boss.send(...sendArgs) + assert(publish2) + // Job 1 still active, can't fetch job 2 + const fetch2 = await boss.fetch(queue) + assert(fetch2 === null) + + await boss.complete(fetch1.id) + // Job 1 no longer active, should be able to fetch + const retryFetch2 = await boss.fetch(queue) + assert(retryFetch2) + assert(retryFetch2.id === publish2) + }) })