Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Experiment/inject transaction connection #3708

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
5 changes: 5 additions & 0 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ const debugQuery = (sql, txId) => _debugQuery(sql.replace(/%/g, '%%'), txId);

const { POOL_CONFIG_OPTIONS } = require('./constants');

const ReuseConnection = (connection) =>
async function withConnection(next) {
return next(connection);
};

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops -- I thought I deleted this part already. I'll fix this in a little while. (I currently have the tests running on loop so that we can detect unhandled promise rejections)

// The base client provides the general structure
// for a dialect specific client object.
function Client(config = {}) {
Expand Down
56 changes: 16 additions & 40 deletions lib/dialects/mssql/transaction.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,48 +55,24 @@ module.exports = class Transaction_MSSQL extends Transaction {
// Acquire a connection and create a disposer - either using the one passed
// via config or getting one off the client. The disposer will be called once
// the original promise is marked completed.
acquireConnection(config, cb) {
const configConnection = config && config.connection;
return new Promise((resolve, reject) => {
try {
resolve(
(this.outerTx ? this.outerTx.conn : null) ||
configConnection ||
this.client.acquireConnection()
);
} catch (e) {
reject(e);
async acquireConnection(conn, cb) {
try {
if (!this.outerTx) {
this.conn = conn;
conn.tx_ = conn.transaction();
}
})
.then((conn) => {
conn.__knexTxId = this.txid;
if (!this.outerTx) {
this.conn = conn;
conn.tx_ = conn.transaction();
}
return conn;
})
.then(async (conn) => {
try {
return await cb(conn);
} finally {
if (!this.outerTx) {
if (conn.tx_) {
if (!this._completed) {
debug('%s: unreleased transaction', this.txid);
conn.tx_.rollback();
}
conn.tx_ = null;
}
this.conn = null;
if (!configConnection) {
debug('%s: releasing connection', this.txid);
this.client.releaseConnection(conn);
} else {
debug('%s: not releasing external connection', this.txid);
}

return await cb(conn);
} finally {
if (!this.outerTx) {
if (conn.tx_) {
if (!this._completed) {
debug('%s: unreleased transaction', this.txid);
conn.tx_.rollback();
}
conn.tx_ = null;
}
});
}
}
}
};
44 changes: 11 additions & 33 deletions lib/dialects/oracledb/transaction.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,40 +50,18 @@ module.exports = class Oracle_Transaction extends Transaction {
return this.query(conn, `SAVEPOINT ${this.txid}`);
}

acquireConnection(config, cb) {
const configConnection = config && config.connection;
const t = this;
return new Promise((resolve, reject) => {
try {
this.client
.acquireConnection()
.then((cnx) => {
cnx.__knexTxId = this.txid;
cnx.isTransaction = true;
resolve(cnx);
})
.catch(reject);
} catch (e) {
reject(e);
}
}).then(async (connection) => {
async acquireConnection(connection, cb) {
try {
connection.isTransaction = true;
return await cb(connection);
} finally {
debugTx('%s: releasing connection', this.txid);
connection.isTransaction = false;
try {
return await cb(connection);
} finally {
debugTx('%s: releasing connection', this.txid);
connection.isTransaction = false;
try {
await connection.commitAsync();
} catch (err) {
t._rejecter(err);
} finally {
if (!configConnection) {
await t.client.releaseConnection(connection);
} else {
debugTx('%s: not releasing external connection', t.txid);
}
}
await connection.commitAsync();
} catch (err) {
this._rejecter(err);
}
});
}
}
};
9 changes: 7 additions & 2 deletions lib/dialects/sqlite3/schema/ddl.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ const {
chunk,
} = require('lodash');

const ReuseConnection = (connection) =>
async function(next) {
return next(connection);
};

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI: This function is currently duplicated in a few places. We can consolidate it as part of the re-implementation.

// So altering the schema in SQLite3 is a major pain.
// We have our own object to deal with the renaming and altering the types
// for sqlite3 things.
Expand Down Expand Up @@ -282,7 +287,7 @@ assign(SQLite3_DDL.prototype, {
return omit(row, mappedFrom);
});
},
{ connection: this.connection }
{ withConnection: ReuseConnection(this.connection) }
);
},

Expand Down Expand Up @@ -312,7 +317,7 @@ assign(SQLite3_DDL.prototype, {
);
});
},
{ connection: this.connection }
{ withConnection: ReuseConnection(this.connection) }
);
},

Expand Down
143 changes: 74 additions & 69 deletions lib/transaction.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,23 @@ const makeKnex = require('./util/make-knex');
const { callbackify } = require('util');
const { timeout, KnexTimeoutError } = require('./util/timeout');
const finallyMixin = require('./util/finally-mixin');
const pipe = require('./util/pipe');

const debug = Debug('knex:tx');

const { uniqueId, isUndefined } = require('lodash');

const ReuseConnection = (connection) =>
async function(next) {
return next(connection);
};

const InjectTransactionID = (txid) => (next) =>
async function(connection) {
connection.__knexTxId = txid;
return next(connection);
};

// FYI: This is defined as a function instead of a constant so that
// each Transactor can have its own copy of the default config.
// This will minimize the impact of bugs that might be introduced
Expand Down Expand Up @@ -163,79 +175,69 @@ class Transaction extends EventEmitter {
// Wait for the earlier Transactions to complete before proceeding.
await this._previousSibling;

return this.acquireConnection(config, (connection) => {
const trxClient = (this.trxClient = makeTxClient(
this,
this.client,
connection
));
const init = this.client.transacting
? this.savepoint(connection)
: this.begin(connection);
const executionPromise = new Promise((resolver, rejecter) => {
this._resolver = resolver;
this._rejecter = rejecter;
});

init
.then(() => {
return makeTransactor(this, connection, trxClient);
})
.then((transactor) => {
transactor.executionPromise = executionPromise;

// If we've returned a "thenable" from the transaction container, assume
// the rollback and commit are chained to this object's success / failure.
// Directly thrown errors are treated as automatic rollbacks.
let result;
try {
result = container(transactor);
} catch (err) {
result = Promise.reject(err);
}
if (result && result.then && typeof result.then === 'function') {
result
.then((val) => {
return transactor.commit(val);
})
.catch((err) => {
return transactor.rollback(err);
});
}
return null;
})
.catch((e) => {
return this._rejecter(e);
const { withConnection } = config;

const withEverything = pipe([
withConnection,
InjectTransactionID(this.txid),
]);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if I'll keep the pipe([...]) function in the re-implementation. It's a functional programming concept that many developers might find mystifying.


return withEverything((conn) => {
return this.acquireConnection(conn, (connection) => {
const trxClient = (this.trxClient = makeTxClient(
this,
this.client,
connection
));
const init = this.client.transacting
? this.savepoint(connection)
: this.begin(connection);
const executionPromise = new Promise((resolver, rejecter) => {
this._resolver = resolver;
this._rejecter = rejecter;
});

return executionPromise;
init
.then(() => {
return makeTransactor(this, connection, trxClient);
})
.then((transactor) => {
transactor.executionPromise = executionPromise;

// If we've returned a "thenable" from the transaction container, assume
// the rollback and commit are chained to this object's success / failure.
// Directly thrown errors are treated as automatic rollbacks.
let result;
try {
result = container(transactor);
} catch (err) {
result = Promise.reject(err);
}
if (result && result.then && typeof result.then === 'function') {
result
.then((val) => {
return transactor.commit(val);
})
.catch((err) => {
return transactor.rollback(err);
});
}
return null;
})
.catch((e) => {
return this._rejecter(e);
});

return executionPromise;
});
});
}

// Acquire a connection and create a disposer - either using the one passed
// via config or getting one off the client. The disposer will be called once
// the original promise is marked completed.
acquireConnection(config, cb) {
const configConnection = config && config.connection;
return new Promise((resolve, reject) => {
try {
resolve(configConnection || this.client.acquireConnection());
} catch (e) {
reject(e);
}
}).then(async (connection) => {
try {
connection.__knexTxId = this.txid;
return await cb(connection);
} finally {
if (!configConnection) {
debug('%s: releasing connection', this.txid);
this.client.releaseConnection(connection);
} else {
debug('%s: not releasing external connection', this.txid);
}
}
});
async acquireConnection(connection, cb) {
return cb(connection);
}

then(onResolve, onReject) {
Expand Down Expand Up @@ -269,13 +271,16 @@ function makeTransactor(trx, connection, trxClient) {
transactor.isTransaction = true;
transactor.userParams = trx.userParams || {};

transactor.transaction = function(container, options) {
if (!options) {
options = { doNotRejectOnRollback: true };
} else if (isUndefined(options.doNotRejectOnRollback)) {
transactor.transaction = function(container, _config) {
const options = Object.assign({}, _config);
if (isUndefined(options.doNotRejectOnRollback)) {
options.doNotRejectOnRollback = true;
}

if (isUndefined(options.withConnection)) {
options.withConnection = ReuseConnection(connection);
}

if (container) {
return trxClient.transaction(container, options, trx);
} else {
Expand Down
19 changes: 16 additions & 3 deletions lib/util/make-knex.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,16 @@ const QueryInterface = require('../query/methods');
const { merge, isUndefined } = require('lodash');
const batchInsert = require('./batchInsert');

const LeaseConnectionFromClient = (client) =>
async function withConnection(next) {
const connection = await client.acquireConnection();
try {
return await next(connection);
} finally {
await client.releaseConnection(connection);
}
};

function makeKnex(client) {
// The object we're potentially using to kick off an initial chain.
function knex(tableName, options) {
Expand Down Expand Up @@ -38,15 +48,18 @@ function initContext(knexFn) {
// when transaction is ready to be used.
transaction(container, _config) {
const config = Object.assign({}, _config);
config.userParams = this.userParams || {}
if(isUndefined(config.doNotRejectOnRollback)) {
config.userParams = this.userParams || {};
if (isUndefined(config.doNotRejectOnRollback)) {
// Backwards-compatibility: default value changes depending upon
// whether or not a `container` was provided.
config.doNotRejectOnRollback = !container;
}

if (isUndefined(config.withConnection)) {
config.withConnection = LeaseConnectionFromClient(this.client);
}

if(container) {
if (container) {
const trx = this.client.transaction(container, config);
return trx;
} else {
Expand Down
6 changes: 6 additions & 0 deletions lib/util/pipe.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
const pipe = (decorators) => (next) =>
Array.from(decorators)
.reverse()
.reduce((_next, d) => d(_next), next);

module.exports = exports = pipe;