Skip to content

Commit

Permalink
Merge 8de485e into d32832d
Browse files Browse the repository at this point in the history
  • Loading branch information
timgit committed Jun 18, 2021
2 parents d32832d + 8de485e commit 41c8d37
Show file tree
Hide file tree
Showing 14 changed files with 129 additions and 131 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,11 @@
# Changes

## 6.1.0

- Existing schemas can now be used via the `schema` property in the constructor.
- Fixed expiration rejection in subscriptions when the pg driver wasn't returning an interval object.
- Removed setInterval() in `unsubscribe()` causing process to hang

## 6.0.1

- Typescript types updated for `stop()`. PR from @stnwk
Expand Down
14 changes: 13 additions & 1 deletion docs/usage.md
Expand Up @@ -90,6 +90,8 @@ GRANT CREATE ON DATABASE db1 TO leastprivuser;

If the CREATE privilege is not available or desired, you can use the included [static functions](#static-functions) to export the SQL commands to manually create or upgrade the required database schema. **This means you will also need to monitor future releases for schema changes** (the schema property in [version.json](../version.json)) so they can be applied manually.

NOTE: Using an existing schema is supported for advanced use cases **but discouraged**, as this opens up the possibility that creation will fail on an object name collision, and it will add more steps to the uninstallation process.

# Database uninstall

If you need to uninstall pg-boss from a database, just run the following command.
Expand All @@ -98,7 +100,17 @@ If you need to uninstall pg-boss from a database, just run the following command
DROP SCHEMA $1 CASCADE
```

Where `$1` is the name of your schema if you've customized it. Otherwise, the default schema is `pgboss`.
Where `$1` is the name of your schema if you've customized it. Otherwise, the default schema is `pgboss`.

NOTE: If an existing schema was used during installation, created objects will need to be removed manually using the following commands.

```sql
DROP TABLE ${schema}.archive;
DROP TABLE ${schema}.job;
DROP TABLE ${schema}.schedule;
DROP TABLE ${schema}.version;
DROP TYPE ${schema}.job_state;
```

# Direct database interactions

Expand Down
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
@@ -1,6 +1,6 @@
{
"name": "pg-boss",
"version": "6.0.1",
"version": "6.1.0",
"description": "Queueing jobs in Node.js using PostgreSQL like a boss",
"main": "./src/index.js",
"engines": {
Expand Down
56 changes: 26 additions & 30 deletions src/index.js
Expand Up @@ -131,49 +131,45 @@ class PgBoss extends EventEmitter {
await this.manager.stop()
await this.timekeeper.stop()

let polling = false

const shutdown = async () => {
try {
await this.boss.stop()
if (this.db.isOurs) {
await this.db.close()
}

if (this.db.isOurs) {
await this.db.close()
}
} catch (err) {
if (polling) {
this.emit(events.error, err)
} else {
throw err
}
} finally {
this.stopped = true
this.stoppingOn = null
this.stopped = true
this.stoppingOn = null

this.emit(events.stopped)
}
this.emit(events.stopped)
}

if (!graceful) {
return await shutdown()
await this.boss.stop()
await shutdown()
return
}

if (this.manager.getWipData().length === 0) {
return await shutdown()
}
setImmediate(async () => {
let closing = false

polling = true
try {
while (Date.now() - this.stoppingOn < timeout) {
if (this.manager.getWipData({ includeInternal: closing }).length === 0) {
if (closing) {
break
}

setImmediate(async () => {
while (Date.now() - this.stoppingOn < timeout) {
await delay(1000)
closing = true

await this.boss.stop()
}

if (this.manager.getWipData().length === 0) {
return await shutdown()
await delay(1000)
}
}

await shutdown()
await shutdown()
} catch (err) {
this.emit(events.error, err)
}
})
}
}
Expand Down
51 changes: 30 additions & 21 deletions src/manager.js
Expand Up @@ -111,7 +111,9 @@ class Manager extends EventEmitter {
}
}

getWipData () {
getWipData (options = {}) {
const { includeInternal = false } = options

const data = this.getWorkers()
.map(({
id,
Expand All @@ -138,7 +140,7 @@ class Manager extends EventEmitter {
lastError,
lastErrorOn
}))
.filter(i => i.count > 0 && !INTERNAL_QUEUES[i.name])
.filter(i => i.count > 0 && (!INTERNAL_QUEUES[i.name] || includeInternal))

return data
}
Expand All @@ -165,24 +167,32 @@ class Manager extends EventEmitter {
throw new Error('__test__throw_subscription')
}

const expirationRace = (promise, timeout) => Promise.race([
promise,
delay.reject(timeout, { value: new Error(`handler execution exceeded ${timeout}ms`) })
])
const resolveWithinSeconds = async (promise, seconds) => {
const timeout = Math.max(1, seconds) * 1000
const reject = delay.reject(timeout, { value: new Error(`handler execution exceeded ${timeout}ms`) })

const result = await Promise.race([promise, reject])

try {
reject.clear()
} catch {}

return result
}

this.emitWip(name)

let result

if (batchSize) {
const maxTimeout = jobs.reduce((acc, i) => Math.max(acc, plans.intervalToMs(i.expirein)), 0)
const maxExpiration = jobs.reduce((acc, i) => Math.max(acc, i.expirein), 0)

// Failing will fail all fetched jobs
result = await expirationRace(Promise.all([callback(jobs)]), maxTimeout)
result = await resolveWithinSeconds(Promise.all([callback(jobs)]), maxExpiration)
.catch(err => this.fail(jobs.map(job => job.id), err))
} else {
result = await pMap(jobs, job =>
expirationRace(callback(job), plans.intervalToMs(job.expirein))
resolveWithinSeconds(callback(job), job.expirein)
.then(result => this.complete(job.id, result))
.catch(err => this.fail(job.id, err))
, { concurrency: teamConcurrency }
Expand Down Expand Up @@ -228,13 +238,15 @@ class Manager extends EventEmitter {
worker.stop()
}

setInterval(() => {
if (workers.every(w => w.stopped)) {
for (const worker of workers) {
this.removeWorker(worker)
}
setImmediate(async () => {
while (!workers.every(w => w.stopped)) {
await delay(1000)
}

for (const worker of workers) {
this.removeWorker(worker)
}
}, 1000)
})
}

async offComplete (value) {
Expand Down Expand Up @@ -374,12 +386,13 @@ class Manager extends EventEmitter {

async fetch (name, batchSize, options = {}) {
const values = Attorney.checkFetchArgs(name, batchSize, options)

const result = await this.db.executeSql(
this.nextJobCommand(options.includeMetadata || false),
[values.name, batchSize || 1]
)

if (!result) {
if (!result || result.rows.length === 0) {
return null
}

Expand All @@ -394,11 +407,7 @@ class Manager extends EventEmitter {
return job
})

return jobs.length === 0
? null
: jobs.length === 1 && !batchSize
? jobs[0]
: jobs
return jobs.length === 1 && !batchSize ? jobs[0] : jobs
}

async fetchCompleted (name, batchSize, options = {}) {
Expand Down
22 changes: 1 addition & 21 deletions src/plans.js
Expand Up @@ -18,20 +18,6 @@ const MUTEX = 1337968055000
const MIGRATE_RACE_MESSAGE = 'division by zero'
const CREATE_RACE_MESSAGE = 'already exists'

const SECOND = 1000
const MINUTE = SECOND * 60
const HOUR = MINUTE * 60
const DAY = HOUR * 24

// source: pg.types -> postgres-interval
const INTERVAL_TO_MS_MAP = {
days: DAY,
hours: HOUR,
minutes: MINUTE,
seconds: SECOND,
milliseconds: 1
}

module.exports = {
create,
insertVersion,
Expand Down Expand Up @@ -61,7 +47,6 @@ module.exports = {
setCronTime,
locked,
assertMigration,
intervalToMs,
getArchivedJobById,
getJobById,
states: { ...states },
Expand All @@ -72,11 +57,6 @@ module.exports = {
DEFAULT_SCHEMA
}

function intervalToMs (interval) {
const ms = Object.keys(interval).reduce((total, key) => total + INTERVAL_TO_MS_MAP[key] * interval[key], 0)
return ms
}

function locked (query) {
if (Array.isArray(query)) {
query = query.join(';\n')
Expand Down Expand Up @@ -344,7 +324,7 @@ function fetchNextJob (schema) {
retryCount = CASE WHEN state = '${states.retry}' THEN retryCount + 1 ELSE retryCount END
FROM nextJob
WHERE j.id = nextJob.id
RETURNING ${includeMetadata ? 'j.*' : 'j.id, name, data, expireIn'}
RETURNING ${includeMetadata ? 'j.*' : 'j.id, name, data, EXTRACT(epoch FROM expireIn) as expireIn'}
`
}

Expand Down
47 changes: 26 additions & 21 deletions src/timekeeper.js
Expand Up @@ -39,34 +39,46 @@ class Timekeeper extends EventEmitter {
this.unschedule,
this.getSchedules
]

this.stopped = true
}

async start () {
if (this.config.archiveSeconds < 60) {
return
}

await this.cacheClockSkew()

if (this.config.archiveSeconds >= 60) {
await this.watch()
this.cronMonitorInterval = setInterval(async () => await this.monitorCron(), this.cronMonitorIntervalMs)
}
await this.manager.subscribe(queues.CRON, { newJobCheckIntervalSeconds: 4 }, (job) => this.onCron(job))
await this.manager.subscribe(queues.SEND_IT, { newJobCheckIntervalSeconds: 4, teamSize: 50, teamConcurrency: 5 }, (job) => this.onSendIt(job))

await this.cronMonitorAsync()

this.cronMonitorInterval = setInterval(async () => await this.monitorCron(), this.cronMonitorIntervalMs)
this.skewMonitorInterval = setInterval(async () => await this.cacheClockSkew(), this.skewMonitorIntervalMs)

this.stopped = false
}

async stop () {
if (!this.stopped) {
this.stopped = true
if (this.stopped) {
return
}

if (this.skewMonitorInterval) {
clearInterval(this.skewMonitorInterval)
this.skewMonitorInterval = null
}
this.stopped = true

if (this.cronMonitorInterval) {
clearInterval(this.cronMonitorInterval)
this.cronMonitorInterval = null
}
await this.manager.unsubscribe(queues.CRON)
await this.manager.unsubscribe(queues.SEND_IT)

if (this.skewMonitorInterval) {
clearInterval(this.skewMonitorInterval)
this.skewMonitorInterval = null
}

if (this.cronMonitorInterval) {
clearInterval(this.cronMonitorInterval)
this.cronMonitorInterval = null
}
}

Expand Down Expand Up @@ -96,13 +108,6 @@ class Timekeeper extends EventEmitter {
this.clockSkew = skew
}

async watch () {
await this.manager.subscribe(queues.CRON, { newJobCheckIntervalSeconds: 4 }, (job) => this.onCron(job))
await this.manager.subscribe(queues.SEND_IT, { newJobCheckIntervalSeconds: 4, teamSize: 50, teamConcurrency: 5 }, (job) => this.onSendIt(job))

await this.cronMonitorAsync()
}

async cronMonitorAsync () {
const opts = {
retryLimit: 2,
Expand Down

0 comments on commit 41c8d37

Please sign in to comment.