Skip to content

Commit

Permalink
Merge fc5a2d0 into d32832d
Browse files Browse the repository at this point in the history
  • Loading branch information
timgit committed Jun 16, 2021
2 parents d32832d + fc5a2d0 commit d7208d4
Show file tree
Hide file tree
Showing 12 changed files with 114 additions and 128 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,11 @@
# Changes

## 6.1.0

- Existing schemas can now be used via the `schema` property in the constructor.
- Fixed expiration rejection in subscriptions when the pg driver wasn't returning an interval object.
- Removed setInterval() in `unsubscribe()` causing process to hang

## 6.0.1

- Typescript types updated for `stop()`. PR from @stnwk
Expand Down
2 changes: 1 addition & 1 deletion package.json
@@ -1,6 +1,6 @@
{
"name": "pg-boss",
"version": "6.0.1",
"version": "6.1.0",
"description": "Queueing jobs in Node.js using PostgreSQL like a boss",
"main": "./src/index.js",
"engines": {
Expand Down
56 changes: 26 additions & 30 deletions src/index.js
Expand Up @@ -131,49 +131,45 @@ class PgBoss extends EventEmitter {
await this.manager.stop()
await this.timekeeper.stop()

let polling = false

const shutdown = async () => {
try {
await this.boss.stop()
if (this.db.isOurs) {
await this.db.close()
}

if (this.db.isOurs) {
await this.db.close()
}
} catch (err) {
if (polling) {
this.emit(events.error, err)
} else {
throw err
}
} finally {
this.stopped = true
this.stoppingOn = null
this.stopped = true
this.stoppingOn = null

this.emit(events.stopped)
}
this.emit(events.stopped)
}

if (!graceful) {
return await shutdown()
await this.boss.stop()
await shutdown()
return
}

if (this.manager.getWipData().length === 0) {
return await shutdown()
}
setImmediate(async () => {
let closing = false

polling = true
try {
while (Date.now() - this.stoppingOn < timeout) {
if (this.manager.getWipData({ includeInternal: closing }).length === 0) {
if (closing) {
break
}

setImmediate(async () => {
while (Date.now() - this.stoppingOn < timeout) {
await delay(1000)
closing = true

await this.boss.stop()
}

if (this.manager.getWipData().length === 0) {
return await shutdown()
await delay(1000)
}
}

await shutdown()
await shutdown()
} catch (err) {
this.emit(events.error, err)
}
})
}
}
Expand Down
51 changes: 30 additions & 21 deletions src/manager.js
Expand Up @@ -111,7 +111,9 @@ class Manager extends EventEmitter {
}
}

getWipData () {
getWipData (options = {}) {
const { includeInternal = false } = options

const data = this.getWorkers()
.map(({
id,
Expand All @@ -138,7 +140,7 @@ class Manager extends EventEmitter {
lastError,
lastErrorOn
}))
.filter(i => i.count > 0 && !INTERNAL_QUEUES[i.name])
.filter(i => i.count > 0 && (!INTERNAL_QUEUES[i.name] || includeInternal))

return data
}
Expand All @@ -165,24 +167,32 @@ class Manager extends EventEmitter {
throw new Error('__test__throw_subscription')
}

const expirationRace = (promise, timeout) => Promise.race([
promise,
delay.reject(timeout, { value: new Error(`handler execution exceeded ${timeout}ms`) })
])
const resolveWithinSeconds = async (promise, seconds) => {
const timeout = Math.max(1, seconds) * 1000
const reject = delay.reject(timeout, { value: new Error(`handler execution exceeded ${timeout}ms`) })

const result = await Promise.race([promise, reject])

try {
reject.clear()
} catch {}

return result
}

this.emitWip(name)

let result

if (batchSize) {
const maxTimeout = jobs.reduce((acc, i) => Math.max(acc, plans.intervalToMs(i.expirein)), 0)
const maxExpiration = jobs.reduce((acc, i) => Math.max(acc, i.expirein), 0)

// Failing will fail all fetched jobs
result = await expirationRace(Promise.all([callback(jobs)]), maxTimeout)
result = await resolveWithinSeconds(Promise.all([callback(jobs)]), maxExpiration)
.catch(err => this.fail(jobs.map(job => job.id), err))
} else {
result = await pMap(jobs, job =>
expirationRace(callback(job), plans.intervalToMs(job.expirein))
resolveWithinSeconds(callback(job), job.expirein)
.then(result => this.complete(job.id, result))
.catch(err => this.fail(job.id, err))
, { concurrency: teamConcurrency }
Expand Down Expand Up @@ -228,13 +238,15 @@ class Manager extends EventEmitter {
worker.stop()
}

setInterval(() => {
if (workers.every(w => w.stopped)) {
for (const worker of workers) {
this.removeWorker(worker)
}
setImmediate(async () => {
while (!workers.every(w => w.stopped)) {
await delay(1000)
}

for (const worker of workers) {
this.removeWorker(worker)
}
}, 1000)
})
}

async offComplete (value) {
Expand Down Expand Up @@ -374,12 +386,13 @@ class Manager extends EventEmitter {

async fetch (name, batchSize, options = {}) {
const values = Attorney.checkFetchArgs(name, batchSize, options)

const result = await this.db.executeSql(
this.nextJobCommand(options.includeMetadata || false),
[values.name, batchSize || 1]
)

if (!result) {
if (!result || result.rows.length === 0) {
return null
}

Expand All @@ -394,11 +407,7 @@ class Manager extends EventEmitter {
return job
})

return jobs.length === 0
? null
: jobs.length === 1 && !batchSize
? jobs[0]
: jobs
return jobs.length === 1 && !batchSize ? jobs[0] : jobs
}

async fetchCompleted (name, batchSize, options = {}) {
Expand Down
22 changes: 1 addition & 21 deletions src/plans.js
Expand Up @@ -18,20 +18,6 @@ const MUTEX = 1337968055000
const MIGRATE_RACE_MESSAGE = 'division by zero'
const CREATE_RACE_MESSAGE = 'already exists'

const SECOND = 1000
const MINUTE = SECOND * 60
const HOUR = MINUTE * 60
const DAY = HOUR * 24

// source: pg.types -> postgres-interval
const INTERVAL_TO_MS_MAP = {
days: DAY,
hours: HOUR,
minutes: MINUTE,
seconds: SECOND,
milliseconds: 1
}

module.exports = {
create,
insertVersion,
Expand Down Expand Up @@ -61,7 +47,6 @@ module.exports = {
setCronTime,
locked,
assertMigration,
intervalToMs,
getArchivedJobById,
getJobById,
states: { ...states },
Expand All @@ -72,11 +57,6 @@ module.exports = {
DEFAULT_SCHEMA
}

function intervalToMs (interval) {
const ms = Object.keys(interval).reduce((total, key) => total + INTERVAL_TO_MS_MAP[key] * interval[key], 0)
return ms
}

function locked (query) {
if (Array.isArray(query)) {
query = query.join(';\n')
Expand Down Expand Up @@ -344,7 +324,7 @@ function fetchNextJob (schema) {
retryCount = CASE WHEN state = '${states.retry}' THEN retryCount + 1 ELSE retryCount END
FROM nextJob
WHERE j.id = nextJob.id
RETURNING ${includeMetadata ? 'j.*' : 'j.id, name, data, expireIn'}
RETURNING ${includeMetadata ? 'j.*' : 'j.id, name, data, EXTRACT(epoch FROM expireIn) as expireIn'}
`
}

Expand Down
47 changes: 26 additions & 21 deletions src/timekeeper.js
Expand Up @@ -39,34 +39,46 @@ class Timekeeper extends EventEmitter {
this.unschedule,
this.getSchedules
]

this.stopped = true
}

async start () {
if (this.config.archiveSeconds < 60) {
return
}

await this.cacheClockSkew()

if (this.config.archiveSeconds >= 60) {
await this.watch()
this.cronMonitorInterval = setInterval(async () => await this.monitorCron(), this.cronMonitorIntervalMs)
}
await this.manager.subscribe(queues.CRON, { newJobCheckIntervalSeconds: 4 }, (job) => this.onCron(job))
await this.manager.subscribe(queues.SEND_IT, { newJobCheckIntervalSeconds: 4, teamSize: 50, teamConcurrency: 5 }, (job) => this.onSendIt(job))

await this.cronMonitorAsync()

this.cronMonitorInterval = setInterval(async () => await this.monitorCron(), this.cronMonitorIntervalMs)
this.skewMonitorInterval = setInterval(async () => await this.cacheClockSkew(), this.skewMonitorIntervalMs)

this.stopped = false
}

async stop () {
if (!this.stopped) {
this.stopped = true
if (this.stopped) {
return
}

if (this.skewMonitorInterval) {
clearInterval(this.skewMonitorInterval)
this.skewMonitorInterval = null
}
this.stopped = true

if (this.cronMonitorInterval) {
clearInterval(this.cronMonitorInterval)
this.cronMonitorInterval = null
}
await this.manager.unsubscribe(queues.CRON)
await this.manager.unsubscribe(queues.SEND_IT)

if (this.skewMonitorInterval) {
clearInterval(this.skewMonitorInterval)
this.skewMonitorInterval = null
}

if (this.cronMonitorInterval) {
clearInterval(this.cronMonitorInterval)
this.cronMonitorInterval = null
}
}

Expand Down Expand Up @@ -96,13 +108,6 @@ class Timekeeper extends EventEmitter {
this.clockSkew = skew
}

async watch () {
await this.manager.subscribe(queues.CRON, { newJobCheckIntervalSeconds: 4 }, (job) => this.onCron(job))
await this.manager.subscribe(queues.SEND_IT, { newJobCheckIntervalSeconds: 4, teamSize: 50, teamConcurrency: 5 }, (job) => this.onSendIt(job))

await this.cronMonitorAsync()
}

async cronMonitorAsync () {
const opts = {
retryLimit: 2,
Expand Down
24 changes: 8 additions & 16 deletions test/delayTest.js
Expand Up @@ -5,29 +5,21 @@ const delay = require('delay')
describe('delayed jobs', function () {
it('should wait until after an int (in seconds)', async function () {
const boss = this.test.boss = await helper.start(this.test.bossConfig)
const queue = this.test.bossConfig.schema

const delaySeconds = 2
const queue = 'wait'
const startAfter = 2

const data = { message: 'hold your horses', submitted: Date.now() }
const options = { startAfter: delaySeconds }

await boss.publish(queue, data, options)
await boss.publish(queue, null, { startAfter })

return new Promise((resolve, reject) => {
boss.subscribe(queue, async job => {
const start = new Date(job.data.submitted)
const end = new Date()
const job = await boss.fetch(queue)

const elapsedSeconds = Math.floor((end - start) / 1000)
assert.strictEqual(job, null)

await job.done()
await delay(startAfter * 1000)

assert(delaySeconds >= elapsedSeconds)
const job2 = await boss.fetch(queue)

resolve()
})
})
assert(job2)
})

it('should wait until after a date time string', async function () {
Expand Down
4 changes: 2 additions & 2 deletions test/migrationTest.js
Expand Up @@ -136,7 +136,7 @@ describe('migration', function () {
} catch (error) {
assert(error.message.includes('wat'))
} finally {
boss1.stop()
await boss1.stop({ graceful: false })
}

const version1 = await contractor.version()
Expand All @@ -154,6 +154,6 @@ describe('migration', function () {

assert.strictEqual(version2, currentSchemaVersion)

await boss2.stop()
await boss2.stop({ graceful: false })
})
})

0 comments on commit d7208d4

Please sign in to comment.