Skip to content

Commit

Permalink
Merge 4fa58f2 into 9938b1a
Browse files Browse the repository at this point in the history
  • Loading branch information
adamhamlin committed Mar 12, 2023
2 parents 9938b1a + 4fa58f2 commit 5b0195a
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 7 deletions.
15 changes: 13 additions & 2 deletions docs/readme.md
Expand Up @@ -559,7 +559,7 @@ Available in constructor as a default, or overridden in send.

* **singletonKey** string

Only allows 1 job (within the same name) to be queued or active with the same singletonKey.
Allows a max of 1 job (with the same name and singletonKey) to be queued or active.

```js
boss.send('my-job', {}, {singletonKey: '123'}) // resolves a jobId
Expand All @@ -570,7 +570,9 @@ Available in constructor as a default, or overridden in send.

* **useSingletonQueue** boolean

When used in conjunction with singletonKey, only allows 1 job (within the same name) to be queued with the same singletonKey.
When used in conjunction with singletonKey, allows a max of 1 job to be queued.

>By default, there is no limit on the number of these jobs that may be active. However, this behavior may be modified by passing the [enforceSingletonQueueActiveLimit](#fetch) option.
```js
boss.send('my-job', {}, {singletonKey: '123', useSingletonQueue: true}) // resolves a jobId
Expand Down Expand Up @@ -744,6 +746,11 @@ Typically one would use `work()` for automated polling for new jobs based upon a
| oncomplete | bool |
| output | object |

* `enforceSingletonQueueActiveLimit`, bool

If `true`, modifies the behavior of the `useSingletonQueue` flag to allow a max of 1 job to be queued plus a max of 1 job to be active.
>Note that use of this option can impact performance on instances with large numbers of jobs.

**Resolves**
- `[job]`: array of job objects, `null` if none found
Expand Down Expand Up @@ -819,6 +826,10 @@ The default concurrency for `work()` is 1 job every 2 seconds. Both the interval

Same as in [`fetch()`](#fetch)

* **enforceSingletonQueueActiveLimit**, bool

Same as in [`fetch()`](#fetch)

**Polling options**

How often workers will poll the queue table for jobs. Available in the constructor as a default or per worker in `work()` and `onComplete()`.
Expand Down
2 changes: 2 additions & 0 deletions src/attorney.js
Expand Up @@ -129,6 +129,7 @@ function checkWorkArgs (name, args, defaults) {
assert(!('teamSize' in options) || (Number.isInteger(options.teamSize) && options.teamSize >= 1), 'teamSize must be an integer > 0')
assert(!('batchSize' in options) || (Number.isInteger(options.batchSize) && options.batchSize >= 1), 'batchSize must be an integer > 0')
assert(!('includeMetadata' in options) || typeof options.includeMetadata === 'boolean', 'includeMetadata must be a boolean')
assert(!('enforceSingletonQueueActiveLimit' in options) || typeof options.enforceSingletonQueueActiveLimit === 'boolean', 'enforceSingletonQueueActiveLimit must be a boolean')

return { options, callback }
}
Expand All @@ -140,6 +141,7 @@ function checkFetchArgs (name, batchSize, options) {

assert(!batchSize || (Number.isInteger(batchSize) && batchSize >= 1), 'batchSize must be an integer > 0')
assert(!('includeMetadata' in options) || typeof options.includeMetadata === 'boolean', 'includeMetadata must be a boolean')
assert(!('enforceSingletonQueueActiveLimit' in options) || typeof options.enforceSingletonQueueActiveLimit === 'boolean', 'enforceSingletonQueueActiveLimit must be a boolean')

return { name }
}
Expand Down
7 changes: 4 additions & 3 deletions src/manager.js
Expand Up @@ -184,7 +184,8 @@ class Manager extends EventEmitter {
teamSize = 1,
teamConcurrency = 1,
teamRefill: refill = false,
includeMetadata = false
includeMetadata = false,
enforceSingletonQueueActiveLimit = false
} = options

const id = uuid.v4()
Expand All @@ -208,7 +209,7 @@ class Manager extends EventEmitter {
createTeamRefillPromise()
}

const fetch = () => this.fetch(name, batchSize || (teamSize - queueSize), { includeMetadata })
const fetch = () => this.fetch(name, batchSize || (teamSize - queueSize), { includeMetadata, enforceSingletonQueueActiveLimit })

const onFetch = async (jobs) => {
if (this.config.__test__throw_worker) {
Expand Down Expand Up @@ -475,7 +476,7 @@ class Manager extends EventEmitter {
const values = Attorney.checkFetchArgs(name, batchSize, options)
const db = options.db || this.db
const result = await db.executeSql(
this.nextJobCommand(options.includeMetadata || false),
this.nextJobCommand(options.includeMetadata || false, options.enforceSingletonQueueActiveLimit || false),
[values.name, batchSize || 1]
)

Expand Down
21 changes: 19 additions & 2 deletions src/plans.js
Expand Up @@ -351,13 +351,30 @@ function insertVersion (schema, version) {
}

function fetchNextJob (schema) {
return (includeMetadata) => `
return (includeMetadata, enforceSingletonQueueActiveLimit) => `
WITH nextJob as (
SELECT id
FROM ${schema}.job
FROM ${schema}.job j
WHERE state < '${states.active}'
AND name LIKE $1
AND startAfter < now()
${enforceSingletonQueueActiveLimit
? `AND (
CASE
WHEN singletonKey IS NOT NULL
AND singletonKey LIKE '${SINGLETON_QUEUE_KEY_ESCAPED}%'
THEN NOT EXISTS (
SELECT *
FROM ${schema}.job active_job
WHERE active_job.state = '${states.active}'
AND active_job.name = j.name
AND active_job.singletonKey = j.singletonKey
)
ELSE
true
END
)`
: ''}
ORDER BY priority desc, createdOn, id
LIMIT $2
FOR UPDATE SKIP LOCKED
Expand Down
45 changes: 45 additions & 0 deletions test/fetchTest.js
Expand Up @@ -121,4 +121,49 @@ describe('fetch', function () {
assert(job.startedon === undefined)
assert.strictEqual(calledCounter, 2)
})

describe('enforceSingletonQueueActiveLimit option', function () {
it('when enforceSingletonQueueActiveLimit=false, should fetch singleton queue job even if there is already an active one', async function () {
const boss = this.test.boss = await helper.start(this.test.bossConfig)
const queue = this.test.bossConfig.schema
const jobOptions = { singletonKey: 'singleton_queue_active_test', useSingletonQueue: true }
const sendArgs = [queue, {}, jobOptions]
const fetchArgs = [queue, undefined, { enforceSingletonQueueActiveLimit: false }]

const publish1 = await boss.send(...sendArgs)
assert(publish1)
const fetch1 = await boss.fetch(...fetchArgs)
assert(fetch1)

const publish2 = await boss.send(...sendArgs)
assert(publish2)
const fetch2 = await boss.fetch(...fetchArgs)
assert(fetch2)
})

it('when enforceSingletonQueueActiveLimit=true, should not fetch singleton queue job if there is already an active one', async function () {
const boss = this.test.boss = await helper.start(this.test.bossConfig)
const queue = this.test.bossConfig.schema
const jobOptions = { singletonKey: 'singleton_queue_active_test', useSingletonQueue: true }
const sendArgs = [queue, {}, jobOptions]
const fetchArgs = [queue, undefined, { enforceSingletonQueueActiveLimit: true }]

const publish1 = await boss.send(...sendArgs)
assert(publish1)
const fetch1 = await boss.fetch(...fetchArgs)
assert(fetch1)

const publish2 = await boss.send(...sendArgs)
assert(publish2)
// Job 1 still active, can't fetch job 2
const fetch2 = await boss.fetch(...fetchArgs)
assert(fetch2 === null)

await boss.complete(fetch1.id)
// Job 1 no longer active, should be able to fetch job 2
const retryFetch2 = await boss.fetch(...fetchArgs)
assert(retryFetch2)
assert(retryFetch2.id === publish2)
})
})
})
2 changes: 2 additions & 0 deletions types.d.ts
Expand Up @@ -113,12 +113,14 @@ declare namespace PgBoss {
teamRefill?: boolean;
batchSize?: number;
includeMetadata?: boolean;
enforceSingletonQueueActiveLimit?: boolean;
}

type WorkOptions = JobFetchOptions & JobPollingOptions

type FetchOptions = {
includeMetadata?: boolean;
enforceSingletonQueueActiveLimit?: boolean;
} & ConnectionOptions;

interface WorkHandler<ReqData, ResData> {
Expand Down

0 comments on commit 5b0195a

Please sign in to comment.