Skip to content

Commit

Permalink
test and docs
Browse files Browse the repository at this point in the history
  • Loading branch information
timgit committed May 4, 2021
1 parent 44bf6b7 commit 9c1e4ff
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 26 deletions.
5 changes: 3 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@
- NEW: Added the `output` jsonb column to storage tables to store result or error data along with the original job, which were previously only available via completion jobs. This has the added benefit of storing any errors or results from completion jobs themselves, which were previously discarded.
- NEW: `getJobById(id)` can now be used to fetch a job from either primary or archive storage by id. This may be helpful if needed to inspect `output` and you have the job id.
- NEW: Added new function, `publishSingleton()`, similar to publishOnce(), but throttles publish to only allow 1 job in the queue at a time, allowing a job to be queued even if 1 or more jobs are currently active.
- CHANGE: `subscribe()` now resolves with a unique worker id that will be visible in `wip` along with additional metadata about the subscription.
- CHANGE: `unsubscribe()` now accepts an object as an argument to allow removing a specific subscription by id. Use `{ worker: 'id' }`.
- CHANGE: `subscribe()` and `onComplete()` now resolve with a unique worker id that will be visible in `wip` along with additional metadata about the subscription.
- CHANGE: `subscribe()` and `onComplete()` now abort promise execution client-side based on the job's expiration.
- CHANGE: `unsubscribe()` and `offComplete()` now accept an object as an argument to allow removing a specific subscription by id.
- MAJOR: The `onComplete` publish option is now defaulted to `false`, which breaks backward compatability for automatic creation of completion jobs. To restore the previous behavior of completion jobs being created by default, you should set `onComplete` to `true` in your constructor options.
- MAJOR: The default retention policy has been reduced from 30 to 14 days. This can still be customized as an option in the constructor.
- MAJOR: Node 10 is EOL. Node 12 is now the minimum supported version.
Expand Down
19 changes: 13 additions & 6 deletions docs/usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@
- [`subscribe()`](#subscribe)
- [`subscribe(name [, options], handler)`](#subscribename--options-handler)
- [`onComplete(name [, options], handler)`](#oncompletename--options-handler)
- [`unsubscribe(name)`](#unsubscribename)
- [`offComplete(name)`](#offcompletename)
- [`unsubscribe(value)`](#unsubscribevalue)
- [`offComplete(value)`](#offcompletevalue)
- [`fetch()`](#fetch)
- [`fetch(name)`](#fetchname)
- [`fetch(name, batchSize, [, options])`](#fetchname-batchsize--options)
Expand Down Expand Up @@ -495,13 +495,20 @@ The following is an example data object from the job retrieved in the onComplete
}
```

## `unsubscribe(name)`
## `unsubscribe(value)`

Removes a subscription by name and stops polling.
Removes a subscription by name or id and stops polling.

### `offComplete(name)`
** Arguments **
- value: string or object

Same as `unsubscribe()`, but removes an `onComplete()` subscription.
If a string, removes all subscriptions found matching the name. If an object, only the subscription with a matching `id` will be removed.

### `offComplete(value)`

Similar to `unsubscribe()`, but removes an `onComplete()` subscription.

**

## `fetch()`

Expand Down
2 changes: 1 addition & 1 deletion src/db.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class Db extends EventEmitter {

async executeSql (text, values) {
if (this.opened) {
return this.pool.query(text, values)
return await this.pool.query(text, values)
}
}
}
Expand Down
36 changes: 20 additions & 16 deletions src/manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,12 @@ class Manager extends EventEmitter {

async subscribe (name, ...args) {
const { options, callback } = Attorney.checkSubscribeArgs(name, args, this.config)
return this.watch(name, options, callback)
return await this.watch(name, options, callback)
}

async onComplete (name, ...args) {
const { options, callback } = Attorney.checkSubscribeArgs(name, args, this.config)
return this.watch(COMPLETION_JOB_PREFIX + name, options, callback)
return await this.watch(COMPLETION_JOB_PREFIX + name, options, callback)
}

addWorker (worker) {
Expand Down Expand Up @@ -195,12 +195,12 @@ class Manager extends EventEmitter {
assert(value, 'Missing required argument')

const query = (typeof value === 'string')
? { type: 'name', value, filter: i => i.name === value }
: (typeof value === 'object' && value.worker)
? { type: 'worker', value: value.worker, filter: i => i.id === value.worker }
? { filter: i => i.name === value }
: (typeof value === 'object' && value.id)
? { filter: i => i.id === value.id }
: null

assert(query, 'Invalid argument. Expected string or object: { worker: id }')
assert(query, 'Invalid argument. Expected string or object: { id }')

const workers = this.getWorkers().filter(i => query.filter(i) && !i.stopping && !i.stopped)

Expand All @@ -221,13 +221,17 @@ class Manager extends EventEmitter {
}, 1000)
}

async offComplete (name) {
return this.unsubscribe(COMPLETION_JOB_PREFIX + name)
async offComplete (value) {
if (typeof value === 'string') {
value = COMPLETION_JOB_PREFIX + value
}

return await this.unsubscribe(value)
}

async publish (...args) {
const { name, data, options } = Attorney.checkPublishArgs(args, this.config)
return this.createJob(name, data, options)
return await this.createJob(name, data, options)
}

async publishOnce (name, data, options, key) {
Expand All @@ -237,7 +241,7 @@ class Manager extends EventEmitter {

const result = Attorney.checkPublishArgs([name, data, options], this.config)

return this.createJob(result.name, result.data, result.options)
return await this.createJob(result.name, result.data, result.options)
}

async publishSingleton (name, data, options) {
Expand All @@ -247,7 +251,7 @@ class Manager extends EventEmitter {

const result = Attorney.checkPublishArgs([name, data, options], this.config)

return this.createJob(result.name, result.data, result.options)
return await this.createJob(result.name, result.data, result.options)
}

async publishAfter (name, data, options, after) {
Expand All @@ -256,7 +260,7 @@ class Manager extends EventEmitter {

const result = Attorney.checkPublishArgs([name, data, options], this.config)

return this.createJob(result.name, result.data, result.options)
return await this.createJob(result.name, result.data, result.options)
}

async publishThrottled (name, data, options, seconds, key) {
Expand All @@ -267,7 +271,7 @@ class Manager extends EventEmitter {

const result = Attorney.checkPublishArgs([name, data, options], this.config)

return this.createJob(result.name, result.data, result.options)
return await this.createJob(result.name, result.data, result.options)
}

async publishDebounced (name, data, options, seconds, key) {
Expand All @@ -278,7 +282,7 @@ class Manager extends EventEmitter {

const result = Attorney.checkPublishArgs([name, data, options], this.config)

return this.createJob(result.name, result.data, result.options)
return await this.createJob(result.name, result.data, result.options)
}

async createJob (name, data, options, singletonOffset = 0) {
Expand Down Expand Up @@ -332,7 +336,7 @@ class Manager extends EventEmitter {

singletonOffset = singletonSeconds

return this.createJob(name, data, options, singletonOffset)
return await this.createJob(name, data, options, singletonOffset)
}

getDebounceStartAfter (singletonSeconds, clockOffset) {
Expand Down Expand Up @@ -382,7 +386,7 @@ class Manager extends EventEmitter {
}

async fetchCompleted (name, batchSize, options = {}) {
return this.fetch(COMPLETION_JOB_PREFIX + name, batchSize, options)
return await this.fetch(COMPLETION_JOB_PREFIX + name, batchSize, options)
}

mapCompletionIdArg (id, funcName) {
Expand Down
26 changes: 25 additions & 1 deletion test/completeTest.js
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ describe('complete', function () {

let receivedCount = 0

boss.onComplete(jobName, async job => {
boss.onComplete(jobName, { newJobCheckInterval: 500 }, async job => {
receivedCount++
await boss.offComplete(jobName)
})
Expand All @@ -154,6 +154,30 @@ describe('complete', function () {
assert.strictEqual(receivedCount, 1)
})

it('should unsubscribe an onComplete subscription by id', async function () {
const boss = this.test.boss = await helper.start({ ...this.test.bossConfig, onComplete: true })
const queue = this.test.bossConfig.schema

let receivedCount = 0

await boss.publish(queue)
const job1 = await boss.fetch(queue)
await boss.complete(job1.id)

await boss.publish(queue)
const job2 = await boss.fetch(queue)
await boss.complete(job2.id)

const id = await boss.onComplete(queue, { newJobCheckInterval: 500 }, async () => {
receivedCount++
await boss.offComplete({ id })
})

await delay(2000)

assert.strictEqual(receivedCount, 1)
})

it('should fetch a completed job', async function () {
const boss = this.test.boss = await helper.start({ ...this.test.bossConfig, onComplete: true })

Expand Down
19 changes: 19 additions & 0 deletions test/subscribeTest.js
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,25 @@ describe('subscribe', function () {
assert.strictEqual(receivedCount, 1)
})

it('should unsubscribe a subscription by id', async function () {
const boss = this.test.boss = await helper.start(this.test.bossConfig)
const queue = this.test.bossConfig.schema

let receivedCount = 0

await boss.publish(queue)
await boss.publish(queue)

const id = await boss.subscribe(queue, { newJobCheckInterval: 500 }, async () => {
receivedCount++
await boss.unsubscribe({ id })
})

await delay(2000)

assert.strictEqual(receivedCount, 1)
})

it('should handle a batch of jobs via teamSize', async function () {
const boss = this.test.boss = await helper.start(this.test.bossConfig)

Expand Down

0 comments on commit 9c1e4ff

Please sign in to comment.