-
Notifications
You must be signed in to change notification settings - Fork 2.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Standalone transaction client #2717
Comments
Here's a messy sample implementation of this, written while playing around:
'use strict';
import Promise from 'bluebird';
import { EventEmitter } from 'events';
import Debug from 'debug';
import makeKnex from './util/make-knex';
const debug = Debug('knex:tx');
import { assign, pick, uniqueId, isUndefined } from 'lodash';
/**
 * Callback-free ("standalone") transaction client.
 *
 * Builds a transaction-scoped copy of `client` whose queries all run on one
 * connection. The connection is acquired lazily, and the opening statement
 * (`BEGIN;`, or `SAVEPOINT <txid>;` when `client` is itself transacting) is
 * issued exactly once, before the first query.
 *
 * Intended to be invoked with `new`; because a knex instance is returned
 * explicitly, callers receive that instance rather than `this`.
 *
 * @param {object} client - knex client (or a transacting client when nesting).
 * @param {object} [config] - optional settings; `config.connection`, when
 *   given, is used instead of acquiring one and is never released here.
 * @param {object} [outerTx] - enclosing transaction; its `isCompleted()` is
 *   consulted when deciding whether this transaction is finished.
 * @returns {Function} knex instance bound to the transaction, extended with
 *   `commit`, `rollback`, `transaction` and `savepoint`.
 */
export default function StandaloneTransaction(client, config, outerTx) {
  const txid = uniqueId('s-trx');
  this.txid = txid;
  this.client = client;
  this.logger = client.logger;
  this.outerTx = outerTx;
  this._debug = client.config && client.config.debug;

  debug(
    '%s: Starting %s s-transaction',
    this.txid,
    outerTx ? 'nested' : 'top level'
  );

  // Clone the client without running its constructor, then mark it as
  // transacting so nested transactions issue savepoints instead of BEGIN.
  const trxClient = Object.create(client.constructor.prototype);
  assign(
    trxClient,
    pick(
      client,
      'version',
      'config',
      'driver',
      'connectionSettings',
      'valueForUndefined',
      'logger'
    )
  );
  trxClient.transacting = true;

  // Forward query lifecycle events to the originating client.
  trxClient.on('query', (arg) => {
    client.emit('query', arg);
  });
  trxClient.on('query-error', (err, obj) => {
    client.emit('query-error', err, obj);
  });
  trxClient.on('query-response', (response, obj, builder) => {
    client.emit('query-response', response, obj, builder);
  });

  const isCompleted = () =>
    this._completed || (this.outerTx && this.outerTx.isCompleted()) || false;
  // Exposed so this object can be handed on as the `outerTx` of a nested
  // transaction, which calls `outerTx.isCompleted()`.
  this.isCompleted = isCompleted;

  // Un-overridden implementations inherited from the client prototype.
  const _query = trxClient.query;
  const _stream = trxClient.stream;

  const configConnection = config && config.connection;
  let connection = null;
  let connectionPromise = null;
  let connectionReleased = false;

  // Hands the connection back once the transaction is finished. Externally
  // supplied connections are left alone; repeated calls are no-ops.
  const releaseOwnedConnection = () => {
    if (!connection || connectionReleased) {
      return undefined;
    }
    connectionReleased = true;
    if (configConnection) {
      debug('%s: not releasing external connection', txid);
      return undefined;
    }
    debug('%s: releasing connection', txid);
    return client.releaseConnection(connection);
  };

  // Lazily acquires the connection and opens the transaction. The promise is
  // memoized so concurrent first queries share one connection and a single
  // BEGIN/SAVEPOINT. The opening statement goes through the raw `_query`
  // (not `trxClient.query`) so it does not wait on this very promise.
  const ensureConnection = () => {
    if (connectionPromise === null) {
      connectionPromise = Promise.try(
        () => configConnection || client.acquireConnection()
      ).then((con) => {
        con.__knexTxId = txid;
        connection = con;
        const openSql = client.transacting ? `SAVEPOINT ${txid};` : 'BEGIN;';
        return Promise.resolve(_query.call(trxClient, con, openSql)).then(
          () => con,
          (err) => {
            // The transaction never started: mark it finished and give the
            // connection back before surfacing the original error.
            this._completed = true;
            debug('%s error running transaction query', this.txid);
            return Promise.resolve(releaseOwnedConnection()).then(() => {
              throw err;
            });
          }
        );
      });
    }
    return connectionPromise;
  };

  // Runs a transaction-control statement. `status` 1 means commit/release,
  // 2 means rollback; either marks the transaction completed. A failing
  // statement is treated as a rollback carrying the failure as its value.
  // NOTE(review): as in the original, status 2 always rejects with `value`,
  // so `rollback()` without an argument rejects with
  // "Transaction rejected with non-error: undefined" — confirm that this is
  // the desired contract for a callback-free API.
  const query = (sql, status, value) => {
    const alreadyCompleted = isCompleted();
    const q = ensureConnection()
      .then((conn) => {
        if (alreadyCompleted) completedError({ txid }, sql);
        return _query.call(trxClient, conn, sql);
      })
      .catch((err) => {
        status = 2;
        value = err;
        this._completed = true;
        debug('%s error running transaction query', this.txid);
      })
      .tap(() => {
        // Once the transaction is over, the connection is no longer needed.
        if (status === 1 || status === 2) {
          return releaseOwnedConnection();
        }
        return undefined;
      })
      .tap(() => {
        if (status === 2) {
          if (isUndefined(value)) {
            throw new Error(`Transaction rejected with non-error: ${value}`);
          }
          throw value;
        }
      });
    if (status === 1 || status === 2) {
      this._completed = true;
    }
    return q;
  };

  // Every query/stream must use the transaction's own connection and is
  // refused once the transaction (or an enclosing one) has completed.
  trxClient.query = function(conn, obj) {
    const completed = isCompleted();
    return ensureConnection().then((activeConnection) => {
      if (conn !== activeConnection) {
        throw new Error('Invalid connection for transaction query.');
      }
      if (completed) completedError({ txid }, obj);
      return _query.call(trxClient, conn, obj);
    });
  };

  trxClient.stream = function(conn, obj, stream, options) {
    const completed = isCompleted();
    return ensureConnection().then((activeConnection) => {
      if (conn !== activeConnection) {
        throw new Error('Invalid connection for transaction query.');
      }
      if (completed) completedError({ txid }, obj);
      return _stream.call(trxClient, conn, obj, stream, options);
    });
  };

  // Builders acquire "their" connection through the client; always hand out
  // the transaction's connection and ignore per-query releases.
  trxClient.acquireConnection = function() {
    return ensureConnection();
  };
  trxClient.releaseConnection = function() {
    return Promise.resolve();
  };

  const commit = (value) => query('COMMIT;', 1, value);
  const release = (value) => query(`RELEASE SAVEPOINT ${txid};`, 1, value);
  const rollback = (error) =>
    query('ROLLBACK', 2, error)
      .timeout(5000)
      .catch(Promise.TimeoutError, () => {
        throw error;
      });
  const rollbackTo = (error) =>
    query(`ROLLBACK TO SAVEPOINT ${txid}`, 2, error)
      .timeout(5000)
      .catch(Promise.TimeoutError, () => {
        throw error;
      });

  const transactor = makeKnex(trxClient);

  // Callback-style nested transaction; this transaction is passed as the
  // outer one (the original referenced an undefined `trx` here).
  transactor.transaction = (container, options) =>
    trxClient.transaction(container, options, this);
  transactor.savepoint = (container, options) =>
    transactor.transaction(container, options);

  if (client.transacting) {
    // Nested: operate on this transaction's savepoint only.
    transactor.commit = (value) => release(value);
    transactor.rollback = (error) => rollbackTo(error);
  } else {
    transactor.commit = (value) => commit(value);
    transactor.rollback = (error) => rollback(error);
  }

  return transactor;
}
/**
 * Logs the offending statement and throws, because a query was attempted on
 * a transaction that has already completed.
 *
 * @param {string|{txid: string}} trx - the transaction id, or an object
 *   carrying it as `txid` (callers in this file pass the id string).
 * @param {string|{sql: string}} obj - raw SQL, or a query object with `sql`.
 * @throws {Error} always.
 */
function completedError(trx, obj) {
  const sql = typeof obj === 'string' ? obj : obj && obj.sql;
  // Accept a bare id as well as a transaction-like object; reading `.txid`
  // off a string would log `undefined`.
  const id = typeof trx === 'string' ? trx : trx && trx.txid;
  debug('%s: Transaction completed: %s', id, sql);
  throw new Error(
    'Transaction query already complete, run with DEBUG=knex:tx for more info'
  );
}

// Usage
const transaction = knex.createTransaction();
const accounts = await transaction('accounts').select();
const nestedTransaction = transaction.createTransaction();
const deletedAccounts = await nestedTransaction('accounts').del();
await nestedTransaction.rollback();
await transaction('accounts').update({balance: 50});
await transaction.commit();
@tgriesser Are there any plans currently to implement something like this through the 1.0 refactor branch? |
Also been discussed here in the past. |
@wubzz yeah this is definitely needed, and we should work on getting this in there. The issue I've run into is that the way transactions (and a lot of the connection internals) are implemented is super ad-hoc/hacked together throughout all of the various adapters and I'm wary of continuing to throw stuff in there without standardizing at least the basics. That's mostly what's driving #2698 - I think that having types will pay off big medium to long term and help us simplify what is handled where. I would also like to get rid of needing to use I'm going to be on vacation for the next 2-3 weeks but as soon as I'm back there will be more progress on that and we can continue to look at how we can get this implemented. If the API seems good maybe we can implement as a hacky solution at first and then refactor later. |
@tgriesser This is something I really think would make knex shine even more. Personally I can work on this on freetime and at work, but what's stopping me is not knowing what's going on next in the repo. Doing something rather big like this seems like a bad idea if the entire codebase is going to be rewritten (be it with TypeScript or not). I agree the codebase suffers a lot. In many areas it's like a bunch of puzzle pieces glued together, creating a somewhat sideways puzzle. But we've also dropped older versions of node. We shouldn't even need webpack at this point, with async/await and all. Rewriting into ES6 classes with async/await and better general structure shouldn't take too long either, assuming we're not changing existing features at the same time. In any case I would like to see this improvement go through somehow. |
@wubzz We still support Node 6 until April, so we can't drop Babel yet if we need async/await, as it was first introduced in Node 8. |
Alright, so maybe not drop babel quite just yet, but the point remains the same. |
@wubzz I think it is a great idea. It is what I need to finish the project that I'm working on for the company I work for. |
You can implement similar helpers directly in your project; there is no need to wait for this to get your project finished. |
can i use his |
Depends on what are you trying to do exactly. Anyways that is offtopic from this thread maybe people in stackoverflow can help with solution of your exact problem. |
Feature discussion / request
Explain what is your use case
Using Knex with transactions can be very tedious due to its nature of relying on a callback scope, especially so when used within an application dependent on HTTP libs such as express, restify, etc.
Explain what kind of feature would support this
Exposing a transaction client without relying on a callback (read 'container') should solve this. In essence a transaction client must run all its queries on the same connection. Naturally it will still need to issue initial
BEGIN
/SAVEPOINT
queries prior to running the first query.
Give some API proposal, how the feature should work
In my case I've been using knex like the Before example below for about 3-4 years and it's been bugging me since day one. Maybe there already are better solutions, I'm not sure.
The 1.0 refactoring idea of using contexts would be optimal solution, but I think this is on ice right now?
Without the refactoring branch, I would suggest a compact solution such as this:
Before
After
Other example
This would return a slightly altered version of makeTxClient, and must support a "hook" on the first query to be run. This hook has three jobs:
BEGIN
(Using nested transactions (savepoints) on a transaction that has not yet run a single query would also issue the "hook" prior to creating the nested transaction, ensuring the same connection is used)
Benefits of this approach:
Looking at existing transaction structure I believe this is possible to build by breaking it up into smaller pieces.
Obviously completely removing the current implementation of
.transaction
would break every knex-transaction-dependent application out there, so exposing this as a separate function would be better.
The text was updated successfully, but these errors were encountered: