Skip to content

Commit

Permalink
Merge 89860a4 into cf8bc92
Browse files Browse the repository at this point in the history
  • Loading branch information
darky committed Apr 9, 2023
2 parents cf8bc92 + 89860a4 commit 0a61ac5
Show file tree
Hide file tree
Showing 9 changed files with 135 additions and 16 deletions.
7 changes: 7 additions & 0 deletions docs/readme.md
Expand Up @@ -414,6 +414,13 @@ How often maintenance operations are run against the job and archive tables.

> When a higher unit is is specified, lower unit configuration settings are ignored.
**Polling options**

How often workers will poll the queue table for jobs

* **useNotify**, boolean

Default: false. Use built-in Postgres LISTEN/NOTIFY for reducing polling latency

## `start()`

Expand Down
55 changes: 49 additions & 6 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Expand Up @@ -12,6 +12,7 @@
"lodash.debounce": "^4.0.8",
"p-map": "^4.0.0",
"pg": "^8.5.1",
"pg-listen": "^1.7.0",
"serialize-error": "^8.1.0",
"uuid": "^9.0.0"
},
Expand Down
8 changes: 8 additions & 0 deletions src/db.js
@@ -1,5 +1,6 @@
const EventEmitter = require('events')
const pg = require('pg')
const createSubscriber = require('pg-listen')

class Db extends EventEmitter {
constructor (config) {
Expand All @@ -8,18 +9,25 @@ class Db extends EventEmitter {
config.application_name = config.application_name || 'pgboss'

this.config = config
this.notifier = null
}

async open () {
this.pool = new pg.Pool(this.config)
this.pool.on('error', error => this.emit('error', error))
if (this.config.useNotify) {
this.notifier = createSubscriber(this.config)
this.notifier.events.on('error', error => this.emit('error', error))
await this.notifier.connect()
}
this.opened = true
}

async close () {
if (!this.pool.ending) {
this.opened = false
await this.pool.end()
this.notifier?.close()
}
}

Expand Down
29 changes: 21 additions & 8 deletions src/manager.js
Expand Up @@ -233,6 +233,7 @@ class Manager extends EventEmitter {
.then(result => this.complete(job.id, result))
.catch(err => this.fail(job.id, err))
.then(() => refill ? onRefill() : null)
.then(() => !name.startsWith(COMPLETION_JOB_PREFIX) && this.db.notifier?.notify(`pgboss-${COMPLETION_JOB_PREFIX}${name}`, {}))
, { concurrency: teamConcurrency }
).catch(() => {}) // allow promises & non-promises to live together in harmony

Expand All @@ -254,7 +255,7 @@ class Manager extends EventEmitter {
this.emit(events.error, { ...error, message: error.message, stack: error.stack, queue: name, worker: id })
}

const worker = new Worker({ id, name, options, interval, fetch, onFetch, onError })
const worker = new Worker({ id, name, options, interval, fetch, onFetch, onError, notifier: this.db.notifier })

this.addWorker(worker)

Expand All @@ -281,7 +282,7 @@ class Manager extends EventEmitter {
}

for (const worker of workers) {
worker.stop()
await worker.stop()
}

setImmediate(async () => {
Expand Down Expand Up @@ -337,7 +338,9 @@ class Manager extends EventEmitter {

async send (...args) {
const { name, data, options } = Attorney.checkSendArgs(args, this.config)
return await this.createJob(name, data, options)
const resp = await this.createJob(name, data, options)
await this.db.notifier?.notify(`pgboss-${name}`, {})
return resp
}

async sendOnce (name, data, options, key) {
Expand All @@ -347,7 +350,9 @@ class Manager extends EventEmitter {

const result = Attorney.checkSendArgs([name, data, options], this.config)

return await this.createJob(result.name, result.data, result.options)
const resp = await this.createJob(result.name, result.data, result.options)
await this.db.notifier?.notify(`pgboss-${name}`, {})
return resp
}

async sendSingleton (name, data, options) {
Expand All @@ -357,7 +362,9 @@ class Manager extends EventEmitter {

const result = Attorney.checkSendArgs([name, data, options], this.config)

return await this.createJob(result.name, result.data, result.options)
const resp = await this.createJob(result.name, result.data, result.options)
await this.db.notifier?.notify(`pgboss-${name}`, {})
return resp
}

async sendAfter (name, data, options, after) {
Expand All @@ -366,7 +373,9 @@ class Manager extends EventEmitter {

const result = Attorney.checkSendArgs([name, data, options], this.config)

return await this.createJob(result.name, result.data, result.options)
const resp = await this.createJob(result.name, result.data, result.options)
await this.db.notifier?.notify(`pgboss-${name}`, {})
return resp
}

async sendThrottled (name, data, options, seconds, key) {
Expand All @@ -377,7 +386,9 @@ class Manager extends EventEmitter {

const result = Attorney.checkSendArgs([name, data, options], this.config)

return await this.createJob(result.name, result.data, result.options)
const resp = await this.createJob(result.name, result.data, result.options)
await this.db.notifier?.notify(`pgboss-${name}`, {})
return resp
}

async sendDebounced (name, data, options, seconds, key) {
Expand All @@ -388,7 +399,9 @@ class Manager extends EventEmitter {

const result = Attorney.checkSendArgs([name, data, options], this.config)

return await this.createJob(result.name, result.data, result.options)
const resp = await this.createJob(result.name, result.data, result.options)
await this.db.notifier?.notify(`pgboss-${name}`, {})
return resp
}

async createJob (name, data, options, singletonOffset = 0) {
Expand Down
16 changes: 14 additions & 2 deletions src/worker.js
Expand Up @@ -8,14 +8,16 @@ const WORKER_STATES = {
}

class Worker {
constructor ({ id, name, options, interval, fetch, onFetch, onError }) {
constructor ({ id, name, options, interval, fetch, onFetch, onError, notifier }) {
this.id = id
this.name = name
this.options = options
this.fetch = fetch
this.onFetch = onFetch
this.onError = onError
this.interval = interval
this.notifier = notifier
this.notifierListener = () => this.notify()
this.jobs = []
this.createdOn = Date.now()
this.lastFetchedOn = null
Expand All @@ -41,6 +43,11 @@ class Worker {
async start () {
this.state = WORKER_STATES.active

if (this.notifier) {
await this.notifier.listenTo(`pgboss-${this.name}`)
this.notifier.notifications.on(`pgboss-${this.name}`, this.notifierListener)
}

while (!this.stopping) {
const started = Date.now()

Expand Down Expand Up @@ -86,10 +93,15 @@ class Worker {
this.state = WORKER_STATES.stopped
}

stop () {
async stop () {
this.stopping = true
this.state = WORKER_STATES.stopping

if (this.notifier) {
await this.notifier.unlisten(`pgboss-${this.name}`)
this.notifier.notifications.off(`pgboss-${this.name}`, this.notifierListener)
}

if (this.loopDelayPromise) {
this.loopDelayPromise.clear()
}
Expand Down
16 changes: 16 additions & 0 deletions test/completeTest.js
Expand Up @@ -335,4 +335,20 @@ describe('complete', function () {
assert.strictEqual(batchSize, completed.length)
assert.strictEqual(called, true)
})

it('should honor onComplete when useNotify in config', async function () {
const boss = this.test.boss = await helper.start({ ...this.test.bossConfig, onComplete: true, useNotify: true })
const queue = this.test.bossConfig.schema

let completeCount = 0
const newJobCheckIntervalSeconds = 3

await boss.work(queue, { newJobCheckIntervalSeconds }, () => {})
await boss.onComplete(queue, { newJobCheckIntervalSeconds }, () => completeCount++)

await boss.send(queue)
await delay(100)

assert.strictEqual(completeCount, 1)
})
})
18 changes: 18 additions & 0 deletions test/workTest.js
Expand Up @@ -88,6 +88,24 @@ describe('work', function () {
assert.strictEqual(processCount, 2)
})

it('should honor when useNotify in config', async function () {
const boss = this.test.boss = await helper.start({ ...this.test.bossConfig, useNotify: true })
const queue = this.test.bossConfig.schema

let processCount = 0
const newJobCheckIntervalSeconds = 5

await boss.send(queue)

await boss.work(queue, { newJobCheckIntervalSeconds }, () => processCount++)
await delay(100)
assert.strictEqual(processCount, 1)
await boss.send(queue)

await delay(100)
assert.strictEqual(processCount, 2)
})

it('should remove a worker', async function () {
const boss = this.test.boss = await helper.start(this.test.bossConfig)
const queue = this.test.bossConfig.schema
Expand Down
1 change: 1 addition & 0 deletions types.d.ts
Expand Up @@ -105,6 +105,7 @@ declare namespace PgBoss {
interface JobPollingOptions {
newJobCheckInterval?: number;
newJobCheckIntervalSeconds?: number;
useNotify?: boolean;
}

interface JobFetchOptions {
Expand Down

0 comments on commit 0a61ac5

Please sign in to comment.