Skip to content

Commit

Permalink
Merge 4f94b92 into 9938b1a
Browse files Browse the repository at this point in the history
  • Loading branch information
adamhamlin committed Mar 5, 2023
2 parents 9938b1a + 4f94b92 commit cd5caed
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 3 deletions.
4 changes: 2 additions & 2 deletions docs/readme.md
Expand Up @@ -559,7 +559,7 @@ Available in constructor as a default, or overridden in send.

* **singletonKey** string

Only allows 1 job (within the same name) to be queued or active with the same singletonKey.
Allows a max of 1 job (with the same name and singletonKey) to be queued or active.

```js
boss.send('my-job', {}, {singletonKey: '123'}) // resolves a jobId
Expand All @@ -570,7 +570,7 @@ Available in constructor as a default, or overridden in send.

* **useSingletonQueue** boolean

When used in conjunction with singletonKey, only allows 1 job (within the same name) to be queued with the same singletonKey.
When used in conjunction with singletonKey, allows a max of 1 job to be queued and a max of 1 job to be active.

```js
boss.send('my-job', {}, {singletonKey: '123', useSingletonQueue: true}) // resolves a jobId
Expand Down
17 changes: 16 additions & 1 deletion src/plans.js
Expand Up @@ -354,10 +354,25 @@ function fetchNextJob (schema) {
return (includeMetadata) => `
WITH nextJob as (
SELECT id
FROM ${schema}.job
FROM ${schema}.job j
WHERE state < '${states.active}'
AND name LIKE $1
AND startAfter < now()
AND (
CASE
WHEN singletonKey IS NOT NULL
AND singletonKey LIKE '${SINGLETON_QUEUE_KEY_ESCAPED}%'
THEN NOT EXISTS (
SELECT *
FROM ${schema}.job active_job
WHERE active_job.state = '${states.active}'
AND active_job.name = j.name
AND active_job.singletonKey = j.singletonKey
)
ELSE
true
END
)
ORDER BY priority desc, createdOn, id
LIMIT $2
FOR UPDATE SKIP LOCKED
Expand Down
24 changes: 24 additions & 0 deletions test/fetchTest.js
Expand Up @@ -121,4 +121,28 @@ describe('fetch', function () {
assert(job.startedon === undefined)
assert.strictEqual(calledCounter, 2)
})

it('should not fetch singleton queue job if there is already an active one', async function () {
const boss = this.test.boss = await helper.start(this.test.bossConfig)
const queue = this.test.bossConfig.schema
const jobOptions = { singletonKey: 'singleton_queue_active_test', useSingletonQueue: true }
const sendArgs = [queue, {}, jobOptions]

const publish1 = await boss.send(...sendArgs)
assert(publish1)
const fetch1 = await boss.fetch(queue)
assert(fetch1)

const publish2 = await boss.send(...sendArgs)
assert(publish2)
// Job 1 still active, can't fetch job 2
const fetch2 = await boss.fetch(queue)
assert(fetch2 === null)

await boss.complete(fetch1.id)
// Job 1 no longer active, should be able to fetch
const retryFetch2 = await boss.fetch(queue)
assert(retryFetch2)
assert(retryFetch2.id === publish2)
})
})

0 comments on commit cd5caed

Please sign in to comment.