Skip to content

Commit

Permalink
Merge pull request #185 from timgit/cron-monitor
Browse files Browse the repository at this point in the history
Cron monitoring
  • Loading branch information
timgit committed Aug 14, 2020
2 parents 44ef18b + 0f53be8 commit 52a3d9d
Show file tree
Hide file tree
Showing 7 changed files with 162 additions and 26 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,9 @@
# Changes

## 5.0.6

- Added a cron monitor to verify the health of the cron queue.

## 5.0.5

- Removed latency offset calculation during clock skew detection. This was causing cron processing to be paused whenever a significant wait time was required to acquire a connection from the pool.
Expand Down
113 changes: 95 additions & 18 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions package.json
@@ -1,6 +1,6 @@
{
"name": "pg-boss",
"version": "5.0.5",
"version": "5.0.6",
"description": "Queueing jobs in Node.js using PostgreSQL like a boss",
"main": "./src/index.js",
"engines": {
Expand All @@ -14,7 +14,7 @@
},
"devDependencies": {
"coveralls": "^3.1.0",
"mocha": "^8.0.1",
"mocha": "^8.1.1",
"moment-timezone": "^0.5.31",
"nyc": "^15.1.0",
"standard": "^14.3.4"
Expand Down
11 changes: 11 additions & 0 deletions src/migrationStore.js
Expand Up @@ -77,6 +77,17 @@ function getAll (schema, config) {
const keepUntil = config ? config.keepUntil : DEFAULT_RETENTION

return [
{
release: '5.0.6',
version: 15,
previous: 14,
install: [
`ALTER TABLE ${schema}.version ADD cron_on timestamp with time zone`
],
uninstall: [
`ALTER TABLE ${schema}.version DROP COLUMN cron_on`
]
},
{
release: '5.0.0',
version: 14,
Expand Down
13 changes: 12 additions & 1 deletion src/plans.js
Expand Up @@ -39,6 +39,8 @@ module.exports = {
getQueueSize,
getMaintenanceTime,
setMaintenanceTime,
getCronTime,
setCronTime,
states: { ...states },
completedJobPrefix,
advisoryLock,
Expand Down Expand Up @@ -79,7 +81,8 @@ function createVersionTable (schema) {
return `
CREATE TABLE ${schema}.version (
version int primary key,
maintained_on timestamp with time zone
maintained_on timestamp with time zone,
cron_on timestamp with time zone
)
`
}
Expand Down Expand Up @@ -148,6 +151,14 @@ function getMaintenanceTime (schema) {
return `SELECT maintained_on, EXTRACT( EPOCH FROM (now() - maintained_on) ) seconds_ago FROM ${schema}.version`
}

function setCronTime (schema) {
return `UPDATE ${schema}.version SET cron_on = now()`
}

function getCronTime (schema) {
return `SELECT cron_on, EXTRACT( EPOCH FROM (now() - cron_on) ) seconds_ago FROM ${schema}.version`
}

function deleteQueue (schema, options = {}) {
options.before = options.before || states.active
assert(options.before in states, `${options.before} is not a valid state`)
Expand Down
41 changes: 37 additions & 4 deletions src/timekeeper.js
Expand Up @@ -30,6 +30,8 @@ class Timekeeper extends EventEmitter {
this.getSchedulesCommand = plans.getSchedules(config.schema)
this.scheduleCommand = plans.schedule(config.schema)
this.unscheduleCommand = plans.unschedule(config.schema)
this.getCronTimeCommand = plans.getCronTime(config.schema)
this.setCronTimeCommand = plans.setCronTime(config.schema)

this.functions = [
this.schedule,
Expand All @@ -45,21 +47,36 @@ class Timekeeper extends EventEmitter {
await this.watch()
}

this.monitorInterval = setInterval(() => this.cacheClockSkew(), this.monitorIntervalMs)
this.skewMonitorInterval = setInterval(() => this.cacheClockSkew(), this.monitorIntervalMs)
this.cronMonitorInterval = setInterval(() => this.monitorCron(), 60)

this.stopped = false
}

async stop () {
if (!this.stopped) {
this.stopped = true

if (this.monitorInterval) {
clearInterval(this.monitorInterval)
this.monitorInterval = null
if (this.skewMonitorInterval) {
clearInterval(this.skewMonitorInterval)
this.skewMonitorInterval = null
}

if (this.cronMonitorInterval) {
clearInterval(this.cronMonitorInterval)
this.cronMonitorInterval = null
}
}
}

async monitorCron () {
const { secondsAgo } = await this.getCronTime()

if (secondsAgo > 60) {
await this.cronMonitorAsync()
}
}

async cacheClockSkew () {
const { rows } = await this.db.executeSql(this.getTimeCommand)

Expand Down Expand Up @@ -111,6 +128,8 @@ class Timekeeper extends EventEmitter {
if (sending.length) {
await Promise.map(sending, it => this.send(it), { concurrency: 5 })
}

await this.setCronTime()
} catch (err) {
this.emit(this.events.error, err)
}
Expand Down Expand Up @@ -168,6 +187,20 @@ class Timekeeper extends EventEmitter {
const { rowCount } = await this.db.executeSql(this.unscheduleCommand, [name])
return rowCount
}

async setCronTime () {
await this.db.executeSql(this.setCronTimeCommand)
}

async getCronTime () {
const { rows } = await this.db.executeSql(this.getCronTimeCommand)

let { cron_on: cronOn, seconds_ago: secondsAgo } = rows[0]

secondsAgo = secondsAgo !== null ? parseFloat(secondsAgo) : 61

return { cronOn, secondsAgo }
}
}

module.exports = Timekeeper
2 changes: 1 addition & 1 deletion version.json
@@ -1,3 +1,3 @@
{
"schema": 14
"schema": 15
}

0 comments on commit 52a3d9d

Please sign in to comment.