Skip to content

Commit

Permalink
Use connection pause/resume to prevent kill race condition (#7)
Browse files Browse the repository at this point in the history
* Use connection pause/resume to prevent kill race condition

* Move connection resume before destroy raw connection
Conflicts:
	lib/dialects/mysql/index.js
  • Loading branch information
martinmacko47 committed Apr 28, 2021
1 parent b8dbd32 commit 8c89de4
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 0 deletions.
2 changes: 2 additions & 0 deletions lib/dialects/mysql/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ class Client_MySQL extends Client {
async cancelQuery(connectionToKill, queryToKill) {
if (this._cancelQueuedQuery(connectionToKill, queryToKill)) return;

connectionToKill.pause();
const conn = await this.acquireRawConnection();
try {
return await this._query(conn, {
Expand All @@ -180,6 +181,7 @@ class Client_MySQL extends Client {
options: {},
});
} finally {
connectionToKill.resume();
await this.destroyRawConnection(conn);
if (conn.__knex__disposed) {
this.logger.warn(`Connection Error: ${conn.__knex__disposed}`);
Expand Down
64 changes: 64 additions & 0 deletions test/integration/query/additional.js
Original file line number Diff line number Diff line change
Expand Up @@ -1111,6 +1111,70 @@ module.exports = function (knex) {
});
});

it('.timeout(ms, {cancel: true}) should not cancel subsequent queries in a transaction if timeouted query completes while being cancelled', async function () {
// Only mysql/postgres query cancelling supported for now
// Postgres automatically aborts the entire transaction if one query is canceled
if (!isMysql(knex)) {
return this.skip();
}

const getProcessesForDriver = {
pg: async () => {
const results = await knex.raw('SELECT * from pg_stat_activity');
return _.map(_.filter(results.rows, {state: 'active'}), 'query');
},
mysql: async () => {
const results = await knex.raw('SHOW PROCESSLIST');
return _.map(results[0], 'Info');
},
mysql2: async () => {
const results = await knex.raw('SHOW PROCESSLIST');
return _.map(results[0], 'Info');
},
};

const driverName = knex.client.driverName;
if (!Object.prototype.hasOwnProperty.call(getProcessesForDriver, driverName)) {
throw new Error('Missing test query for driverName: ' + driverName);
}

const getProcesses = getProcessesForDriver[driverName];

return knex.transaction(async (trx) => {
const queryA = trx.raw('SELECT SLEEP(1), "A"');
const queryB = trx.raw('SELECT SLEEP(1), "B"');

const promise = Promise.all([
queryA.timeout(500, { cancel: true }),
queryB,
])

await delay(100);
const processesBeforeTimeout = await getProcesses();
expect(processesBeforeTimeout).to.include(queryA.toString());
expect(processesBeforeTimeout).to.not.include(queryB.toString());

const origAcquireRawConnection = trx.client.acquireRawConnection.bind(trx.client);
trx.client.acquireRawConnection = async () => {
await delay(600);
return origAcquireRawConnection();
}

await expect(promise).to.eventually.be.rejected.and.to.deep.include({
timeout: 500,
name: 'KnexTimeoutError',
message: 'Defined query timeout of 500ms exceeded when running query.',
});

trx.client.acquireRawConnection = origAcquireRawConnection;

await delay(100);
const processesAfterTimeout = await getProcesses();
expect(processesAfterTimeout).to.not.include(queryA.toString());
expect(processesAfterTimeout).to.include(queryB.toString());
});
});

it('.timeout(ms, {cancel: true}) should cancel slow query even if connection pool is exhausted', async function () {
// Only mysql/postgres query cancelling supported for now
if (!isMysql(knex) && !isPostgreSQL(knex)) {
Expand Down

0 comments on commit 8c89de4

Please sign in to comment.