Skip to content

Commit

Permalink
added begin, commit to transactions, more tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jadbox committed Jan 13, 2016
1 parent 9a78225 commit 4f171f1
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 8 deletions.
24 changes: 19 additions & 5 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,20 @@ module.exports = {

function _transaction(_this, queryList) {
// Same as Client's version
queryList = [].concat(queryList);
return Rxo.fromArray(queryList).scan((acc, x) => {
queryList = ['BEGIN'].concat(queryList);
queryList.push('COMMIT');

//if(_this.opts.debug) console.log("starting transaction");

let lastResponse = null;
return Rxo.fromArray(queryList).reduce((acc, x, i) => {
/*if(_this.opts.debug)
console.log('transaction ' + i + ': ', x.toString());*/

return acc.concatMap( prev => {
if(i < queryList.length && prev) {
lastResponse = prev;
}
if(!x) return Rxo.empty();

if(typeof x === 'string') return _this._query(x);
Expand All @@ -34,9 +45,10 @@ function _transaction(_this, queryList) {
}, Rxo.just(null))
.merge(1)
.catch( x => {
console.log('Transaction error:', x);
lastResponse = null;
if(this.ops.debug) console.log('Transaction error:', x);
return _this._query('ROLLBACK').flatMap(_=>Rxo.throw(x));
}).last()
}).first().map(x => lastResponse); // use last response before commit
}

/**
Expand Down Expand Up @@ -94,8 +106,10 @@ Pool.prototype._query = function() {
pool.client.rxquery = pool.client.rxquery
|| Rxo.fromNodeCallback(pool.client.query, pool.client);

return Rxo.defer(x => pool.client.rxquery.apply(pool.client, args))
const ret = Rxo.defer(x => pool.client.rxquery.apply(pool.client, args))
.do( () => null, () => pool.done(), () => pool.done() )

return ret;
})
}

Expand Down
33 changes: 30 additions & 3 deletions test/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ const pg = require('../')

const config = 'postgres://hx:hx@localhost/hx'

describe('## pg-then', () => {
describe('## pg-rxjs', () => {
describe('# Pool', () => {

it('invalid db uri', done => {
Expand Down Expand Up @@ -37,7 +37,7 @@ describe('## pg-then', () => {
})

it('query transaction', () => {
const pool = pg.Pool(config);
const pool = pg.Pool(config, {debug: true});
const transaction = pool.transaction,
query = pool.query;
transaction([
Expand Down Expand Up @@ -90,7 +90,7 @@ describe('## pg-then', () => {
let client, query;

it('new client', (done) => {
client = pg.Client(config, {debug: true}); // desync'ed connection
client = pg.Client(config, {debug: false}); // desync'ed connection
query = client.query; // query method is bound to client
done();
})
Expand Down Expand Up @@ -128,6 +128,33 @@ describe('## pg-then', () => {
}, err => assert.fail('there should be no err', err))
})

it('query transaction invalid function return', (done) => {
client.transaction([
query('SELECT 2 as count'),
'SELECT 3 as count',
x => {
assert(x.rows[0].count === 3);
return null; // invalid return step
}
])
.subscribe(result => {
assert.fail('there should be no result', result)
}, err => done())
})

it('query transaction invalid query step', (done) => {
client.transaction([
query('SELECT 2 as count'),
null, // invalid step
x => {
return query('SELECT $1::int as count', [x.rows[0].count+1])
}
])
.subscribe(result => {
assert.fail('there should be no result', result)
}, err => done())
})

it('stream', done => {
let rows = 0
return client
Expand Down

0 comments on commit 4f171f1

Please sign in to comment.