Skip to content

Commit

Permalink
Merge pull request #341 from klesgidis/transactions
Browse files Browse the repository at this point in the history
Add transaction support on createJob, insert & fetch
  • Loading branch information
timgit committed Aug 3, 2022
2 parents dbd8f98 + fb985ee commit b852e18
Show file tree
Hide file tree
Showing 5 changed files with 149 additions and 10 deletions.
15 changes: 9 additions & 6 deletions src/manager.js
Expand Up @@ -393,6 +393,7 @@ class Manager extends EventEmitter {

async createJob (name, data, options, singletonOffset = 0) {
const {
db: wrapper,
expireIn,
priority,
startAfter,
Expand Down Expand Up @@ -423,8 +424,8 @@ class Manager extends EventEmitter {
keepUntil, // 13
onComplete // 14
]

const result = await this.db.executeSql(this.insertJobCommand, values)
const db = wrapper || this.db
const result = await db.executeSql(this.insertJobCommand, values)

if (result && result.rowCount === 1) {
return result.rows[0].id
Expand All @@ -445,10 +446,12 @@ class Manager extends EventEmitter {
return await this.createJob(name, data, options, singletonOffset)
}

async insert (jobs) {
async insert (jobs, options = {}) {
const { db: wrapper } = options
const db = wrapper || this.db
const checkedJobs = Attorney.checkInsertArgs(jobs)
const data = JSON.stringify(checkedJobs)
return await this.db.executeSql(this.insertJobsCommand, [data])
return await db.executeSql(this.insertJobsCommand, [data])
}

getDebounceStartAfter (singletonSeconds, clockOffset) {
Expand All @@ -470,8 +473,8 @@ class Manager extends EventEmitter {

async fetch (name, batchSize, options = {}) {
const values = Attorney.checkFetchArgs(name, batchSize, options)

const result = await this.db.executeSql(
const db = options.db || this.db
const result = await db.executeSql(
this.nextJobCommand(options.includeMetadata || false),
[values.name, batchSize || 1]
)
Expand Down
22 changes: 22 additions & 0 deletions test/fetchTest.js
Expand Up @@ -99,4 +99,26 @@ describe('fetch', function () {
assert(job.keepuntil !== undefined)
})
})

it('should fetch a job with custom connection', async function () {
const boss = this.test.boss = await helper.start(this.test.bossConfig)
const queue = this.test.bossConfig.schema

let calledCounter = 0
const db = await helper.getDb()
const options = {
db: {
async executeSql (sql, values) {
calledCounter++
return db.pool.query(sql, values)
}
}
}

await boss.send(queue, {}, options)
const [job] = await boss.fetch(queue, 10, options)
assert(queue === job.name)
assert(job.startedon === undefined)
assert.strictEqual(calledCounter, 2)
})
})
48 changes: 48 additions & 0 deletions test/insertTest.js
Expand Up @@ -52,4 +52,52 @@ describe('insert', function () {
assert.strictEqual(new Date(job.keepuntil).toISOString(), input.keepUntil, `keepUntil input ${input.keepUntil} didn't match job ${job.keepuntil}`)
assert.strictEqual(job.on_complete, input.onComplete, `onComplete input ${input.onComplete} didn't match job ${job.on_complete}`)
})

it('should create jobs from an array with all properties and custom connection', async function () {
const boss = this.test.boss = await helper.start(this.test.bossConfig)
const queue = this.test.bossConfig.schema

const input = {
id: uuid(),
name: queue,
priority: 1,
data: { some: 'data' },
retryLimit: 1,
retryDelay: 2,
retryBackoff: true,
startAfter: new Date().toISOString(),
expireInSeconds: 5,
singletonKey: '123',
keepUntil: new Date().toISOString(),
onComplete: true
}
let called = false
const db = await helper.getDb()
const options = {
db: {
async executeSql (sql, values) {
called = true
return db.pool.query(sql, values)
}
}
}

await boss.insert([input], options)

const job = await boss.getJobById(input.id)

assert.strictEqual(job.id, input.id, `id input ${input.id} didn't match job ${job.id}`)
assert.strictEqual(job.name, input.name, `name input ${input.name} didn't match job ${job.name}`)
assert.strictEqual(job.priority, input.priority, `priority input ${input.priority} didn't match job ${job.priority}`)
assert.strictEqual(JSON.stringify(job.data), JSON.stringify(input.data), `data input ${input.data} didn't match job ${job.data}`)
assert.strictEqual(job.retrylimit, input.retryLimit, `retryLimit input ${input.retryLimit} didn't match job ${job.retrylimit}`)
assert.strictEqual(job.retrydelay, input.retryDelay, `retryDelay input ${input.retryDelay} didn't match job ${job.retrydelay}`)
assert.strictEqual(job.retrybackoff, input.retryBackoff, `retryBackoff input ${input.retryBackoff} didn't match job ${job.retrybackoff}`)
assert.strictEqual(new Date(job.startafter).toISOString(), input.startAfter, `startAfter input ${input.startAfter} didn't match job ${job.startafter}`)
assert.strictEqual(job.expirein.seconds, input.expireInSeconds, `expireInSeconds input ${input.expireInSeconds} didn't match job ${job.expirein}`)
assert.strictEqual(job.singletonkey, input.singletonKey, `name input ${input.singletonKey} didn't match job ${job.singletonkey}`)
assert.strictEqual(new Date(job.keepuntil).toISOString(), input.keepUntil, `keepUntil input ${input.keepUntil} didn't match job ${job.keepuntil}`)
assert.strictEqual(job.on_complete, input.onComplete, `onComplete input ${input.onComplete} didn't match job ${job.on_complete}`)
assert.strictEqual(called, true)
})
})
59 changes: 59 additions & 0 deletions test/sendTest.js
Expand Up @@ -70,4 +70,63 @@ describe('send', function () {

assert.strictEqual(job.data, null)
})

it('should accept job object with name and custom connection', async function () {
const boss = this.test.boss = await helper.start(this.test.bossConfig)
const queue = 'sendqueueAndOptions'
let called = false
const db = await helper.getDb()
const options = {
db: {
async executeSql (sql, values) {
called = true
return db.pool.query(sql, values)
}
},
someCrazyOption: 'whatever'
}

await boss.send({ name: queue, options })

const job = await boss.fetch(queue)

assert.notEqual(job, null)
assert.strictEqual(job.data, null)
assert.strictEqual(called, true)
})

it('should not create job if transaction fails', async function () {
const boss = this.test.boss = await helper.start(this.test.bossConfig)
const queue = 'sendqueueAndOptions'
const db = await helper.getDb()
const client = db.pool
await client.query('CREATE TABLE IF NOT EXISTS test (label VARCHAR(50))')

const throwError = () => { throw new Error('Error!!') }

try {
await client.query('BEGIN')
const options = {
db: {
async executeSql (sql, values) {
return client.query(sql, values)
}
},
someCrazyOption: 'whatever'
}
const queryText = 'INSERT INTO test(label) VALUES(\'Test\')'
await client.query(queryText)

await boss.send({ name: queue, options })

throwError()
await client.query('COMMIT')
} catch (e) {
await client.query('ROLLBACK')
}

const job = await boss.fetch(queue)

assert.strictEqual(job, null)
})
})
15 changes: 11 additions & 4 deletions types.d.ts
Expand Up @@ -91,7 +91,13 @@ declare namespace PgBoss {
singletonNextSlot?: boolean;
}

type SendOptions = JobOptions & ExpirationOptions & RetentionOptions & RetryOptions & CompletionOptions
interface ConnectionOptions {
db?: Db;
}

type InsertOptions = ConnectionOptions;

type SendOptions = JobOptions & ExpirationOptions & RetentionOptions & RetryOptions & CompletionOptions & ConnectionOptions;

type ScheduleOptions = SendOptions & { tz?: string }

Expand All @@ -112,7 +118,7 @@ declare namespace PgBoss {

type FetchOptions = {
includeMetadata?: boolean;
}
} & ConnectionOptions;

interface WorkHandler<ReqData, ResData> {
(job: PgBoss.JobWithDoneCallback<ReqData, ResData>): Promise<ResData> | void;
Expand Down Expand Up @@ -190,7 +196,7 @@ declare namespace PgBoss {
retryLimit?: number;
retryDelay?: number;
retryBackoff?: boolean;
startAfter?: Date | string;
startAfter?: Date | string;
singletonKey?: string;
expireInSeconds?: number;
keepUntil?: Date | string;
Expand Down Expand Up @@ -298,6 +304,7 @@ declare class PgBoss extends EventEmitter {
sendDebounced(name: string, data: object, options: PgBoss.SendOptions, seconds: number, key: string): Promise<string | null>;

insert(jobs: PgBoss.JobInsert[]): Promise<void>;
insert(jobs: PgBoss.JobInsert[], options: PgBoss.InsertOptions): Promise<void>;

work<ReqData, ResData>(name: string, handler: PgBoss.WorkHandler<ReqData, ResData>): Promise<string>;
work<ReqData, ResData>(name: string, options: PgBoss.WorkOptions & { includeMetadata: true }, handler: PgBoss.WorkWithMetadataHandler<ReqData, ResData>): Promise<string>;
Expand All @@ -311,7 +318,7 @@ declare class PgBoss extends EventEmitter {

/**
* Notify worker that something has changed
* @param workerId
* @param workerId
*/
notifyWorker(workerId: string): void;

Expand Down

0 comments on commit b852e18

Please sign in to comment.