Skip to content

Commit

Permalink
fixing promise handling with subscriptions
Browse files Browse the repository at this point in the history
  • Loading branch information
timgit committed Dec 20, 2019
1 parent 29315c9 commit e9bdeb2
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 20 deletions.
8 changes: 3 additions & 5 deletions src/boss.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,18 +62,16 @@ class Boss extends EventEmitter {
}

async countStates () {
const stateCountDefault = Object.assign({}, plans.states)
const stateCountDefault = { ...plans.states }

Object.keys(stateCountDefault)
.forEach(key => { stateCountDefault[key] = 0 })

const defaultAcc = Object.assign({}, stateCountDefault, { queues: {} })

const counts = await this.db.executeSql(this.countStatesCommand)

const states = counts.rows.reduce((acc, item) => {
if (item.name) {
acc.queues[item.name] = acc.queues[item.name] || Object.assign({}, stateCountDefault)
acc.queues[item.name] = acc.queues[item.name] || { ...stateCountDefault }
}

const queue = item.name ? acc.queues[item.name] : acc
Expand All @@ -83,7 +81,7 @@ class Boss extends EventEmitter {
queue[state] = parseFloat(item.size)

return acc
}, defaultAcc)
}, { ...stateCountDefault, queues: {} })

this.emit(events.monitorStates, states)

Expand Down
16 changes: 7 additions & 9 deletions src/manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -105,22 +105,20 @@ class Manager extends EventEmitter {
const concurrency = options.teamConcurrency || 1

// either no option was set, or teamSize was used
return Promise.map(jobs, async job => {
try {
const value = await callback(job)
await this.complete(job.id, value)
} catch (err) {
await this.fail(job.id, err)
}
}, { concurrency })
return Promise.map(jobs, job =>
callback(job)
.then(value => this.complete(job.id, value))
.catch(err => this.fail(job.id, err))
, { concurrency }
).catch(() => {}) // allow promises & non-promises to live together in harmony
}

const onError = error => this.emit(events.error, error)

const workerConfig = {
name,
fetch: () => this.fetch(name, options.batchSize || options.teamSize || 1),
onFetch: jobs => sendItBruh(jobs).catch(() => {}), // just send it, bruh
onFetch: jobs => sendItBruh(jobs),
onError,
interval: options.newJobCheckInterval
}
Expand Down
2 changes: 1 addition & 1 deletion src/plans.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ module.exports = {
countStates,
deleteQueue,
deleteAllQueues,
states: Object.assign({}, states),
states: { ...states },
completedJobPrefix
}

Expand Down
19 changes: 14 additions & 5 deletions test/failureTest.js
Original file line number Diff line number Diff line change
Expand Up @@ -80,20 +80,29 @@ describe('failure', function () {
it('subscribe failure via done() should pass error payload to failed job', function (finished) {
const queue = 'fetchFailureWithPayload'
const errorMessage = 'mah error'
const error = new Error(errorMessage)

boss.publish(queue)
.then(() => boss.subscribe(queue, job => {
handler()
test()

async function test () {
await boss.publish(queue)

boss.subscribe(queue, job => {
const error = new Error(errorMessage)

handler().catch(err => finished(err))

async function handler () {
await job.done(error)

const failedJob = await boss.fetchCompleted(queue)

assert.strictEqual(failedJob.data.state, 'failed')
assert.strictEqual(failedJob.data.response.message, errorMessage)

finished()
}
}))
})
}
})

it('subscribe failure via Promise reject() should pass string wrapped in value prop', async function () {
Expand Down

0 comments on commit e9bdeb2

Please sign in to comment.