
Commit

Merge 08963ca into c4bcaab
klesgidis committed Oct 17, 2022
2 parents c4bcaab + 08963ca commit bf7fb69
Showing 7 changed files with 175 additions and 47 deletions.
62 changes: 37 additions & 25 deletions docs/readme.md
@@ -99,7 +99,7 @@ GRANT CREATE ON DATABASE db1 TO leastprivuser;

If the CREATE privilege is not available or desired, you can use the included [static functions](#static-functions) to export the SQL commands to manually create or upgrade the required database schema. **This means you will also need to monitor future releases for schema changes** (the schema property in [version.json](../version.json)) so they can be applied manually.
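
As a sketch, assuming pg-boss's static `getConstructionPlans(schema)` helper (which returns the creation SQL as a string), the DDL can be exported for manual review:

```js
const PgBoss = require('pg-boss')

// Print the CREATE statements for the given schema so they can be
// reviewed and applied manually by a DBA.
console.log(PgBoss.getConstructionPlans('pgboss'))
```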

NOTE: Using an existing schema is supported for advanced use cases **but discouraged**, as this opens up the possibility that creation will fail on an object name collision, and it will add more steps to the uninstallation process.

# Database uninstall

@@ -109,7 +109,7 @@ If you need to uninstall pg-boss from a database, just run the following command
DROP SCHEMA $1 CASCADE
```

Where `$1` is the name of your schema if you've customized it. Otherwise, the default schema is `pgboss`.

NOTE: If an existing schema was used during installation, created objects will need to be removed manually using the following commands.

@@ -156,7 +156,7 @@ The following command is the definition of the primary job table. For manual job

# Events

Each instance of pg-boss is an EventEmitter. You can run multiple instances of pg-boss for a variety of use cases including distribution and load balancing. Each instance has the freedom to process whichever jobs you need. Because of this diversity, the job activity of one instance could be drastically different from another.

> For example, if you were to listen for `error` in instance A, it will not receive an `error` event from instance B.
@@ -309,11 +309,11 @@ The following options can be set as properties in an object for additional confi

* **db** - object

Passing an object named db allows you to "bring your own database connection". This option may be beneficial if you'd like to use an existing database service with its own connection pool. Setting this option will bypass the above configuration.

The expected interface is a function named `executeSql` that allows the following code to run without errors.


```js
const text = "select 1 as value1 from table1 where bar = $1"
const values = ['foo']
@@ -322,7 +322,7 @@ The following options can be set as properties in an object for additional confi

assert(rows[0].value1 === 1)
assert(rowCount === 1)
```
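
For example, a minimal sketch that satisfies this interface by wrapping a `pg` pool (the pool and connection string are illustrative):

```js
const PgBoss = require('pg-boss')
const { Pool } = require('pg')

const pool = new Pool({ connectionString: process.env.DATABASE_URL })

const boss = new PgBoss({
  db: {
    // pg's query() already resolves to an object with rows and rowCount
    async executeSql (text, values) {
      return pool.query(text, values)
    }
  }
})
```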

* **schema** - string, defaults to "pgboss"

@@ -524,6 +524,18 @@ Available in constructor as a default, or overridden in send.

> When a higher unit is specified, lower unit configuration settings are ignored.

**Connection options**

* **db** - object
A wrapper object containing an async method called `executeSql` that executes the given query against the database. Can be used to manage jobs inside a transaction. Example:

```js
const db = {
async executeSql (sql, values) {
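// trx is assumed to be an open transaction, e.g. a client checked out from a pg pool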
return trx.query(sql, values)
}
}
```
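
As a fuller sketch (assuming a `pg` connection pool named `pool`, a started `boss` instance, a previously fetched `jobId`, and a hypothetical `audit_log` table), a job can be completed in the same transaction as your own writes:

```js
const client = await pool.connect()

try {
  await client.query('BEGIN')

  const db = {
    async executeSql (sql, values) {
      return client.query(sql, values)
    }
  }

  // the completion and the application write commit or roll back together
  await boss.complete(jobId, null, { db })
  await client.query('INSERT INTO audit_log (job_id) VALUES ($1)', [jobId])

  await client.query('COMMIT')
} catch (err) {
  await client.query('ROLLBACK')
  throw err
} finally {
  client.release()
}
```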

**Deferred jobs**

@@ -652,7 +664,7 @@ This is a convenience version of `send()` with the `singletonSeconds`, `singleto

## `insert([jobs])`

Create multiple jobs in one request with an array of objects.

The contract and supported features are slightly different than `send()`, which is why this function is named independently. For example, debouncing is not supported.
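
For example, a sketch (queue name and payloads are illustrative):

```js
await boss.insert([
  { name: 'email-welcome', data: { to: 'a@example.com' } },
  { name: 'email-welcome', data: { to: 'b@example.com' }, retryLimit: 2 }
])
```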

@@ -667,7 +679,7 @@ interface JobInsert<T = object> {
retryLimit?: number;
retryDelay?: number;
retryBackoff?: boolean;
startAfter?: Date | string;
singletonKey?: string;
expireInSeconds?: number;
keepUntil?: Date | string;
@@ -761,7 +773,7 @@ for (let i = 0; i < jobs.length; i++) {
Same as `fetch()`, but retrieves any completed jobs. See [`onComplete()`](#oncompletename--options-handler) for more information.
## `work()`

Adds a new polling worker for a queue and executes the provided callback function when jobs are found. Multiple workers can be added if needed.

Workers can be stopped via `offWork()` all at once by queue name or individually by using the unique id resolved by `work()`. Workers may be monitored by listening to the `wip` event.

@@ -817,7 +829,7 @@ How often workers will poll the queue table for jobs. Available in the construct

**Handler function**

Typically `handler` will be an `async` function, since this automatically returns promises that can be awaited for backpressure support.

If handler returns a promise, the value resolved/returned will be stored in a completion job. Likewise, if an error occurs in the handler, it will be caught and useful error properties stored into a completion job in addition to marking the job as failed.
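
For example, a sketch of a worker (the queue name and `sendWelcomeEmail` helper are hypothetical):

```js
await boss.work('email-welcome', async job => {
  // a thrown error here marks the job as failed;
  // the returned object is stored as the completion job's output
  await sendWelcomeEmail(job.data)
  return { delivered: true }
})
```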

@@ -940,7 +952,7 @@ Remove the subscription of queue `name` to `event`.

## Scheduling

Jobs may be sent automatically based on a cron expression. As with other cron-based systems, at least one instance needs to be running for scheduling to work. In order to reduce the number of evaluations, schedules are checked every 30 seconds, which means the 6-placeholder format should be discouraged in favor of the minute-level precision 5-placeholder format.

For example, use this format, which implies "any second during 3:30 am every day"

@@ -954,7 +966,7 @@ but **not** this format which is parsed as "only run exactly at 3:30:30 am every
30 30 3 * * *
```

In order to mitigate clock skew and drift, every 10 minutes the clocks of each instance are compared to the database server's clock. The skew, if any, is stored and used as an offset during cron evaluation to ensure all instances are synchronized. Internally, job throttling options are then used to make sure only 1 job is sent even if multiple instances are running.

If needed, the default clock monitoring interval can be adjusted using `clockMonitorIntervalSeconds` or `clockMonitorIntervalMinutes`. Additionally, to disable scheduling on an instance completely, use the following in the constructor options.

@@ -968,7 +980,7 @@ For more cron documentation and examples see the docs for the [cron-parser packa

### `schedule(name, cron, data, options)`

Schedules a job to be sent to the specified queue based on a cron expression. If the schedule already exists, it's updated to the new cron expression.
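
For example, a sketch (the queue name, payload, and optional `tz` value are illustrative):

```js
// send a 'daily-report' job at 3:30 am in the given time zone
await boss.schedule('daily-report', '30 3 * * *', { type: 'summary' }, { tz: 'America/Los_Angeles' })
```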

**Arguments**

@@ -993,49 +1005,49 @@ Removes a schedule by queue name.

Retrieves an array of all scheduled jobs currently being monitored.

## `cancel(id)`
## `cancel(id, options)`

Cancels a pending or active job.

The promise will resolve on a successful cancel, or reject if the job could not be cancelled.

## `cancel([ids])`
## `cancel([ids], options)`

Cancels a set of pending or active jobs.

The promise will resolve on a successful cancel, or reject if not all of the requested jobs could be cancelled.

> Due to the nature of the use case of attempting a batch job cancellation, it is likely that some jobs were in flight or even completed during the cancellation request. Because of this, cancellation will cancel as many as possible and reject with a message showing the number of jobs that could not be cancelled because they were no longer active.
## `resume(id)`
## `resume(id, options)`

Resumes a cancelled job.

## `resume([ids])`
## `resume([ids], options)`

Resumes a set of cancelled jobs.

## `complete(id [, data])`
## `complete(id [, data, options])`

Completes an active job. This would likely only be used with `fetch()`. Accepts an optional `data` argument for usage with [`onComplete()`](#oncompletename--options-handler) state-based workers or `fetchCompleted()`.

The promise will resolve on a successful completion, or reject if the job could not be completed.

## `complete([ids])`
## `complete([ids], options)`

Completes a set of active jobs.

The promise will resolve on a successful completion, or reject if not all of the requested jobs could be marked as completed.

> See comments above on `cancel([ids])` regarding when the promise will resolve or reject because of a batch operation.
## `fail(id [, data])`
## `fail(id [, data, options])`

Marks an active job as failed. This would likely only be used with `fetch()`. Accepts an optional `data` argument for usage with [`onFail()`](#onfailname--options-handler) state-based workers or `fetchFailed()`.

The promise will resolve on a successful assignment of failure, or reject if the job could not be marked as failed.

## `fail([ids])`
## `fail([ids], options)`

Fails a set of active jobs.

@@ -1045,7 +1057,7 @@ The promise will resolve on a successful failure state assignment, or reject if
## `notifyWorker(id)`

Notifies a worker by id to bypass the job polling interval (see `newJobCheckInterval`) for this iteration in the loop.

## `getQueueSize(name [, options])`

@@ -1065,7 +1077,7 @@ As an example, the following options object include active jobs along with creat
}
```

## `getJobById(id)`
## `getJobById(id, options)`

Retrieves a job with all metadata by id in either the primary or archive storage.

27 changes: 16 additions & 11 deletions src/manager.js
@@ -535,27 +535,31 @@ class Manager extends EventEmitter {
}
}

async complete (id, data) {
async complete (id, data, options = {}) {
const db = options.db || this.db
const ids = this.mapCompletionIdArg(id, 'complete')
const result = await this.db.executeSql(this.completeJobsCommand, [ids, this.mapCompletionDataArg(data)])
const result = await db.executeSql(this.completeJobsCommand, [ids, this.mapCompletionDataArg(data)])
return this.mapCompletionResponse(ids, result)
}

async fail (id, data) {
async fail (id, data, options = {}) {
const db = options.db || this.db
const ids = this.mapCompletionIdArg(id, 'fail')
const result = await this.db.executeSql(this.failJobsCommand, [ids, this.mapCompletionDataArg(data)])
const result = await db.executeSql(this.failJobsCommand, [ids, this.mapCompletionDataArg(data)])
return this.mapCompletionResponse(ids, result)
}

async cancel (id) {
async cancel (id, options = {}) {
const db = options.db || this.db
const ids = this.mapCompletionIdArg(id, 'cancel')
const result = await this.db.executeSql(this.cancelJobsCommand, [ids])
const result = await db.executeSql(this.cancelJobsCommand, [ids])
return this.mapCompletionResponse(ids, result)
}

async resume (id) {
async resume (id, options = {}) {
const db = options.db || this.db
const ids = this.mapCompletionIdArg(id, 'resume')
const result = await this.db.executeSql(this.resumeJobsCommand, [ids])
const result = await db.executeSql(this.resumeJobsCommand, [ids])
return this.mapCompletionResponse(ids, result)
}

@@ -587,14 +591,15 @@ class Manager extends EventEmitter {
return result ? parseFloat(result.rows[0].count) : null
}

async getJobById (id) {
const result1 = await this.db.executeSql(this.getJobByIdCommand, [id])
async getJobById (id, options = {}) {
const db = options.db || this.db
const result1 = await db.executeSql(this.getJobByIdCommand, [id])

if (result1 && result1.rows && result1.rows.length === 1) {
return result1.rows[0]
}

const result2 = await this.db.executeSql(this.getArchivedJobByIdCommand, [id])
const result2 = await db.executeSql(this.getArchivedJobByIdCommand, [id])

if (result2 && result2.rows && result2.rows.length === 1) {
return result2.rows[0]
23 changes: 23 additions & 0 deletions test/cancelTest.js
@@ -56,4 +56,27 @@ describe('cancel', function () {

await boss.cancel(jobs)
})

it('should cancel a pending job with custom connection', async function () {
const config = this.test.bossConfig
const boss = this.test.boss = await helper.start(config)

let called = false
const _db = await helper.getDb()
const db = {
async executeSql (sql, values) {
called = true
return _db.pool.query(sql, values)
}
}

const jobId = await boss.send('will_cancel', null, { startAfter: 1 })

await boss.cancel(jobId, { db })

const job = await boss.getJobById(jobId)

assert(job && job.state === 'cancelled')
assert.strictEqual(called, true)
})
})
37 changes: 37 additions & 0 deletions test/completeTest.js
@@ -298,4 +298,41 @@ describe('complete', function () {

assert.strictEqual(job.output.message, completionError.message)
})

it('should complete a batch of jobs with custom connection', async function () {
const boss = this.test.boss = await helper.start({ ...this.test.bossConfig, onComplete: true })

const queue = 'complete-batch'
const batchSize = 3

await Promise.all([
boss.send(queue),
boss.send(queue),
boss.send(queue)
])

const countJobs = (state) => helper.countJobs(this.test.bossConfig.schema, 'name = $1 AND state = $2', [queue, state])

const jobs = await boss.fetch(queue, batchSize)

const activeCount = await countJobs(PgBoss.states.active)

assert.strictEqual(activeCount, batchSize)

let called = false
const _db = await helper.getDb()
const db = {
async executeSql (sql, values) {
called = true
return _db.pool.query(sql, values)
}
}

await boss.complete(jobs.map(job => job.id), null, { db })

const completed = await boss.fetchCompleted(queue, batchSize)

assert.strictEqual(batchSize, completed.length)
assert.strictEqual(called, true)
})
})
22 changes: 22 additions & 0 deletions test/failureTest.js
@@ -187,4 +187,26 @@ describe('failure', function () {
assert.strictEqual(job.data.state, 'failed')
assert(job.data.response.message.includes(message))
})

it('should fail a job with custom connection', async function () {
const boss = this.test.boss = await helper.start(this.test.bossConfig)
const queue = this.test.bossConfig.schema

await boss.send(queue)

const job = await boss.fetch(queue)

let called = false
const _db = await helper.getDb()
const db = {
async executeSql (sql, values) {
called = true
return _db.pool.query(sql, values)
}
}

await boss.fail(job.id, null, { db })

assert.strictEqual(called, true)
})
})
29 changes: 29 additions & 0 deletions test/resumeTest.js
@@ -31,4 +31,33 @@ describe('cancel', function () {

assert(job2 && job2.state === 'created')
})

it('should cancel and resume a pending job with custom connection', async function () {
const config = this.test.bossConfig
const boss = this.test.boss = await helper.start(config)

const jobId = await boss.send('will_cancel', null, { startAfter: 1 })

let callCount = 0
const _db = await helper.getDb()
const db = {
async executeSql (sql, values) {
callCount++
return _db.pool.query(sql, values)
}
}

await boss.cancel(jobId, { db })

const job = await boss.getJobById(jobId, { db })

assert(job && job.state === 'cancelled')

await boss.resume(jobId, { db })

const job2 = await boss.getJobById(jobId, { db })

assert(job2 && job2.state === 'created')
assert.strictEqual(callCount, 4)
})
})
