Skip to content

Commit

Permalink
9
Browse files Browse the repository at this point in the history
  • Loading branch information
DanielSchaffer committed Mar 11, 2019
1 parent 8289681 commit 8bdc41f
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 20 deletions.
30 changes: 17 additions & 13 deletions packages/dandi-contrib/data-pg/src/pg-db-queryable.ts
Expand Up @@ -23,23 +23,12 @@ export class PgDbQueryableBase<TClient extends PgDbQueryableClient> implements D
) {}

public async query(cmd: string, ...args: any[]): Promise<any[]> {
let result: QueryResult
if (args) {
args.forEach((arg, index) => {
args[index] = this.formatArg(arg)
})
}
try {
result = await this.client.query(cmd, args)
} catch (err) {
throw new PgDbQueryError(err)
}
return result.rows
return this.queryInternal(cmd, args)
}

public async queryModel<T>(model: Constructor<T>, cmd: string, ...args: any[]): Promise<T[]> {
cmd = this.replaceSelectList(model, cmd)
const result = await this.query(cmd, ...args)
const result = await this.queryInternal(cmd, args)
if (!result || !result.length) {
return result
}
Expand Down Expand Up @@ -93,6 +82,21 @@ export class PgDbQueryableBase<TClient extends PgDbQueryableClient> implements D
return cmd.replace(/select\s+([\w\s,._]+)\s+from/i, `select\n${newSelect.join(',\n')}\nfrom`)
}

protected async queryInternal(cmd: string, args: any[]): Promise<any[]> {
let result: QueryResult
if (args) {
args.forEach((arg, index) => {
args[index] = this.formatArg(arg)
})
}
try {
result = await this.client.query(cmd, args)
} catch (err) {
throw new PgDbQueryError(err)
}
return result.rows
}

private formatArg(arg: any): any {
if (arg instanceof Uuid) {
return `{${arg}}`
Expand Down
Expand Up @@ -12,7 +12,7 @@ import { ModelBuilderFixture } from '@dandi/model-builder/testing'
import { expect } from 'chai'
import { stub } from 'sinon'

describe.only('PgDbTransactionClient', function() {
describe('PgDbTransactionClient', function() {

const harness = stubHarness(PgDbTransactionClient,
PgDbPoolClientFixture.factory,
Expand Down Expand Up @@ -93,6 +93,88 @@ describe.only('PgDbTransactionClient', function() {
expect(this.client.query.thirdCall.args).to.deep.equal(['SELECT more FROM stuff', []])

})

it('throws an error if called when the transaction cannot accept a query', async function() {

this.transactionClient.state = 'COMMITTING'

await expect(this.transactionClient.query('SELECT foo FROM bar'))
.to.be.rejectedWith(InvalidTransactionStateError)

})

})

describe('queryModel', function() {

class TestModel {}

beforeEach(function() {
stub(this.transactionClient, 'rollback')
})

it('begins the transaction if it has not already begun and sets the internal state to READY', async function() {

await this.transactionClient.queryModel(TestModel, 'SELECT foo FROM bar')

expect(this.transactionClient.state).to.equal('READY')
expect(this.client.query).to.have.been.calledTwice
expect(this.client.query.firstCall.args).to.deep.equal(['BEGIN', []])
expect(this.client.query.secondCall.args).to.deep.equal(['SELECT foo FROM bar', []])
})

it('does not send additional BEGIN queries if the transaction has already begun', async function() {
await this.transactionClient.queryModel(TestModel, 'INSERT INTO bar (foo) VALUES ($1)', 42)
await this.transactionClient.queryModel(TestModel, 'SELECT foo FROM bar')

expect(this.client.query).to.have.been.calledThrice
expect(this.client.query.firstCall.args).to.deep.equal(['BEGIN', []])
expect(this.client.query.secondCall.args).to.deep.equal(['INSERT INTO bar (foo) VALUES ($1)', [42]])
expect(this.client.query.thirdCall.args).to.deep.equal(['SELECT foo FROM bar', []])
})

it('rolls the transaction back if an exception is thrown and rethrows the error', async function() {
const catcher = stub()
this.client.query.onSecondCall().callsFake(() => {
throw new Error()
})
this.client.query.onThirdCall().returns({ rows: [] })

try {
await this.transactionClient.queryModel(TestModel, 'SELECT foo FROM bar')
} catch (err) {
catcher(err)
}
expect(catcher).to.have.been.calledOnce
expect(this.client.query).to.have.been.calledTwice
expect(this.client.query.firstCall.args).to.deep.equal(['BEGIN', []])
expect(this.client.query.secondCall.args).to.deep.equal(['SELECT foo FROM bar', []])
expect(this.transactionClient.rollback).to.have.been.calledOnce
})

it('waits for an existing state transition before continuing', async function() {

const firstQuery = this.transactionClient.queryModel(TestModel, 'SELECT foo FROM bar')
const secondQuery = this.transactionClient.queryModel(TestModel, 'SELECT more FROM stuff')

await secondQuery

expect(this.client.query).to.have.been.calledThrice
expect(this.client.query.firstCall.args).to.deep.equal(['BEGIN', []])
expect(this.client.query.secondCall.args).to.deep.equal(['SELECT foo FROM bar', []])
expect(this.client.query.thirdCall.args).to.deep.equal(['SELECT more FROM stuff', []])

})

it('throws an error if called when the transaction cannot accept a query', async function() {

this.transactionClient.state = 'COMMITTING'

await expect(this.transactionClient.queryModel(TestModel, 'SELECT foo FROM bar'))
.to.be.rejectedWith(InvalidTransactionStateError)

})

})

describe('commit', function() {
Expand Down Expand Up @@ -150,6 +232,21 @@ describe.only('PgDbTransactionClient', function() {
expect(receivedErr).to.be.instanceof(PgDbQueryError)
expect(receivedErr.innerError).to.equal(err)
})

it('rethrows the error if it cannot roll back', async function() {

this.transactionClient.state = 'READY'
const err = new Error('Your llama is lloose!')

this.client.query.callsFake(() => {
this.transactionClient.state = 'ROLLED_BACK'
return Promise.reject(err)
})

const commitErr = await expect(this.transactionClient.commit()).to.be.rejected
expect(commitErr.innerError).to.equal(err)

})
})

describe('rollback', function() {
Expand Down
Expand Up @@ -71,7 +71,7 @@ export class PgDbTransactionClient extends PgDbQueryableBase<PgDbPoolClient> imp
}

public async query(cmd: string, ...args: any[]): Promise<any[]> {
return this.mutex.runLocked(async (lock) => {
return await this.mutex.runLocked(async (lock) => {
await this.safeBeginTransaction()
try {
return await super.query(cmd, ...args)
Expand All @@ -85,7 +85,7 @@ export class PgDbTransactionClient extends PgDbQueryableBase<PgDbPoolClient> imp
}

public async queryModel<T>(model: Constructor<T>, cmd: string, ...args: any[]): Promise<T[]> {
return this.mutex.runLocked(async (lock) => {
return await this.mutex.runLocked(async (lock) => {
await this.safeBeginTransaction()
try {
return await super.queryModel<T>(model, cmd, ...args)
Expand Down Expand Up @@ -147,10 +147,6 @@ export class PgDbTransactionClient extends PgDbQueryableBase<PgDbPoolClient> imp
}

private validateTransactionAction(action: TransactionAction): void {
// if (this.state === TRANSITIONS[action]) {
// return
// }

if (!ALLOWED_ACTIONS[this.state].includes(action)) {
throw new InvalidTransactionStateError(`Cannot perform action ${action} while in transaction state ${this.state}`)
}
Expand Down

0 comments on commit 8bdc41f

Please sign in to comment.