Fix cancelling timeouted queued queries in transactions for MySQL #4416

Open: wants to merge 5 commits into base: master

16 changes: 14 additions & 2 deletions lib/dialects/mysql/index.js
@@ -2,6 +2,7 @@
// -------
const defer = require('lodash/defer');
const map = require('lodash/map');
+ const remove = require('lodash/remove');
const { promisify } = require('util');
const Client = require('../../client');

@@ -123,7 +124,7 @@ class Client_MySQL extends Client {
return;
}
const queryOptions = Object.assign({ sql: obj.sql }, obj.options);
- connection.query(
+ obj.__query = connection.query(
queryOptions,
obj.bindings,
function (err, rows, fields) {
@@ -161,7 +162,17 @@ class Client_MySQL extends Client {
}
}

- async cancelQuery(connectionToKill) {
+ _cancelQueuedQuery(connectionToKill, queryToKill) {
+ const removed = remove(connectionToKill._protocol._queue, (query, idx) => (
+ idx > 0 && query === queryToKill.__query
+ ));
+ return removed.length > 0;
+ }
+
+ async cancelQuery(connectionToKill, queryToKill) {
+ if (this._cancelQueuedQuery(connectionToKill, queryToKill)) return;
+
+ connectionToKill.pause();
const conn = await this.acquireRawConnection();
try {
return await this._query(conn, {
@@ -170,6 +181,7 @@ class Client_MySQL extends Client {
options: {},
});
} finally {
+ connectionToKill.resume();
await this.destroyRawConnection(conn);
if (conn.__knex__disposed) {
this.logger.warn(`Connection Error: ${conn.__knex__disposed}`);
13 changes: 13 additions & 0 deletions lib/dialects/mysql2/index.js
@@ -1,5 +1,6 @@
// MySQL2 Client
// -------
const findIndex = require('lodash/findIndex');
const Client_MySQL = require('../mysql');
const Transaction = require('./transaction');

@@ -20,6 +21,18 @@ class Client_MySQL2 extends Client_MySQL {
}
return true;
}

_cancelQueuedQuery(connectionToKill, queryToKill) {
const index = findIndex(
connectionToKill._commands.toArray(),
(cmd) => cmd === queryToKill.__query
);
if (index >= 0) {
connectionToKill._commands.removeOne(index);

@martinmacko47 (Contributor Author) commented on Apr 6, 2021:

This is quite an invasion into the driver internals. Perhaps we could eventually ask the driver upstream to implement canceling of queued queries. But for now, there is no other way to remove a query from the queue.

@maximelkin (Collaborator) commented on Apr 6, 2021:

> This is quite an invasion into the driver internals

I don't really like this idea. Can we implement it in the drivers first?

Collaborator commented:

Wait, implementing this is probably a bad idea. Can we try to make things the same for pg and mysql instead?
Also, there was a related thread here: #4324

@martinmacko47 (Contributor Author) commented on Apr 7, 2021:

@maximelkin

> Can we implement it in the drivers first?

I can definitely try to make a PR to the drivers. But first we should be sure this solution is acceptable for us.

> Wait, implementing this is probably a bad idea. Can we try to make things the same for pg and mysql instead?

Can you please explain what you mean by making things the same for pg and mysql? How would we make things the same for them?

In MySQL there are two commands:

  • KILL QUERY ? stops the current query, and keeps both the transaction and the connection active. The user may continue with and eventually commit the transaction.

  • KILL CONNECTION ? stops the current query, rolls back the transaction and closes the connection.

In Postgres there are also two commands:

  • pg_cancel_backend(?) stops the current query and marks the transaction aborted, but keeps the connection active. The user can roll back to a savepoint and continue with the transaction, or roll back the transaction completely.

  • pg_terminate_backend(?) stops the current query, rolls back the transaction and closes the connection.

We probably don't want to use either KILL CONNECTION ? or pg_terminate_backend(?), because that would be a breaking change for timeouts outside transactions. Timeouts outside transactions have worked for years already, so there will be a lot of code expecting that connections won't be closed after a query times out.

So we have to use KILL QUERY ? and pg_cancel_backend(?), which have different semantics. We can't make them do the same thing. We just need to use them somehow to make timeouts work reliably in transactions as well.

Also, IMHO it is good to keep the connection open, so the user may still handle the timeout and resurrect the transaction if one of their queries times out. In Postgres they may want to roll back to a savepoint they created before running the slow query; in MySQL they may want to just catch the timeout exception and continue with another query.
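
To make the recovery paths above concrete, here is a minimal sketch of how calling code might handle a timeout inside a transaction. The helper `queryWithFallback`, its `recoverSql` parameter, and the stub `makeFakeTrx` are hypothetical names introduced only for illustration; the only knex behaviour assumed is what the tests in this PR already rely on, namely `raw(sql).timeout(ms, { cancel: true })` rejecting with an error named `KnexTimeoutError`.

```javascript
// Hypothetical helper, not part of knex: run a query with a cancelling
// timeout and fall back to another query in the same transaction.
async function queryWithFallback(trx, slowSql, fallbackSql, ms, recoverSql) {
  try {
    return await trx.raw(slowSql).timeout(ms, { cancel: true });
  } catch (err) {
    // Only handle the timeout error; rethrow everything else.
    if (err.name !== 'KnexTimeoutError') throw err;
    // Postgres marks the transaction aborted, so the caller may pass e.g.
    // 'ROLLBACK TO SAVEPOINT sp' here. MySQL needs no recovery step.
    if (recoverSql) await trx.raw(recoverSql);
    return trx.raw(fallbackSql);
  }
}

// Stand-in for a knex transaction so the sketch runs without a database.
// A query "times out" only if .timeout() was called on it.
function makeFakeTrx(log) {
  return {
    raw(sql) {
      let willTimeOut = false;
      const query = {
        timeout() {
          willTimeOut = true;
          return query;
        },
        then(resolve, reject) {
          log.push(sql); // record the order in which queries are executed
          const err = Object.assign(new Error('timed out'), {
            name: 'KnexTimeoutError',
          });
          const settled = willTimeOut
            ? Promise.reject(err)
            : Promise.resolve(sql);
          return settled.then(resolve, reject);
        },
      };
      return query;
    },
  };
}
```

With a real transaction, the MySQL case would omit `recoverSql`, while the Postgres case would pass a statement that rolls back to a savepoint created before the slow query.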

@maximelkin (Collaborator) commented on Apr 8, 2021:

I suggest adding connection.removeQuery() to mysql/mysql2, which accepts a driver Query and responds with one of three states: not executed (and cancelled), currently executing (no action taken, we should cancel it), or not found (probably already executed or cancelled in some other way).

Currently executing: this case is dangerous, because initiating the cancel could take too much time, so the next query would be cancelled instead.

Maybe we need a lock mechanism to prevent sending any commands to MySQL before the query dies.

Also todo: check concurrent manual cancel + timeout cancel.

EDIT: proposed solution not final, see comments in drivers issues
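
The three states proposed above could be sketched roughly as follows. This is an illustration only: `removeQuery` does not exist in mysql or mysql2, the state names are made up, and the queue is modelled as a plain array whose first element is the running command (an assumption borrowed from the `idx > 0` check in the mysql variant of this PR).

```javascript
// Hypothetical result states for the proposed connection.removeQuery().
const RemoveResult = Object.freeze({
  CANCELLED: 'cancelled', // was still queued; removed before execution
  EXECUTING: 'executing', // currently running; caller must kill it on the server
  NOT_FOUND: 'not_found', // already finished or removed in some other way
});

// `queue` models a driver command queue where index 0 is the running command.
function removeQuery(queue, query) {
  const index = queue.indexOf(query);
  if (index === -1) return RemoveResult.NOT_FOUND;
  if (index === 0) return RemoveResult.EXECUTING; // leave it in place
  queue.splice(index, 1);
  return RemoveResult.CANCELLED;
}
```

Only the `EXECUTING` result would require the KILL QUERY round trip, which is where the race described above can occur.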

@martinmacko47 (Contributor Author) commented on Apr 14, 2021:

@maximelkin @kibertoad I filed the feature request to both drivers:

Please provide every possible endorsement for this feature request in the drivers on behalf of Knex.

Contributor Author commented:

@maximelkin @kibertoad I have created draft implementations of Query.dequeue() for both drivers mysql and mysql2:

I have also adapted this PR to use Query.dequeue() instead of driver internals. I created a separate PR for it, so we can have both versions around until Query.dequeue() gets merged into the drivers: #4444
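
One way to keep both versions around in a single code path would be feature detection, sketched below. This is not what either PR does. `cancelQueuedQuery` and the `fallback` callback are hypothetical, and the sketch assumes that the draft `Query.dequeue()` returns a truthy value when the query was removed from the queue, which is not confirmed anywhere in this thread.

```javascript
// Hypothetical sketch: prefer a driver-provided dequeue() when it exists,
// otherwise fall back to a caller-supplied removal strategy.
// Returns true when the query was removed before it started executing.
function cancelQueuedQuery(driverQuery, fallback) {
  if (driverQuery && typeof driverQuery.dequeue === 'function') {
    // Assumption: dequeue() reports success with a truthy return value.
    return Boolean(driverQuery.dequeue());
  }
  // No driver support: use the fallback (e.g. removal via driver internals).
  return Boolean(fallback());
}
```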

@martinmacko47 (Contributor Author) commented on Apr 22, 2021:

@maximelkin

> Currently executing: this case is dangerous, because initiating the cancel could take too much time, so the next query would be cancelled instead.
>
> Maybe we need a lock mechanism to prevent sending any commands to MySQL before the query dies.

I managed to simulate this race condition in a test, and it seems connectionToKill.pause()/resume() indeed fixes it. Even though it was supposedly meant for streams, it actually pauses processing of packets from the server for non-stream queries as well. As a consequence, after pause() the driver won't start any new queries on the connection until we resume() it. Therefore, even if the timed-out query happens to complete before we manage to kill it, no new query will start in the transaction until we resume() the connection.

See the test: martinmacko47#7

I have put it into a separate branch until we decide it is working, so we won't intermingle two problems here.

Edit: Btw, it seems to work for both drivers, mysql as well as mysql2, even though they have different implementations of pause()/resume().
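
The effect described in this comment can be illustrated with a toy model. It is a sketch only: `ToyConnection` and `simulateRace` are hypothetical, and neither mysql nor mysql2 implements pause()/resume() this way. The model just captures the observed behaviour that completion packets are held while paused, so the next queued query cannot start.

```javascript
// Toy model only: neither mysql nor mysql2 is implemented like this.
class ToyConnection {
  constructor() {
    this.queue = []; // queue[0] is the command currently executing
    this.started = []; // names of commands that have begun executing
    this.paused = false;
    this.bufferedDone = 0; // completion packets received while paused
  }

  enqueue(name) {
    this.queue.push(name);
    if (this.queue.length === 1) this.started.push(name);
  }

  // The server reports that the running command has finished.
  onServerDone() {
    if (this.paused) {
      this.bufferedDone += 1; // hold the packet; nothing new may start
      return;
    }
    this.finishCurrent();
  }

  finishCurrent() {
    this.queue.shift();
    if (this.queue.length > 0) this.started.push(this.queue[0]);
  }

  pause() {
    this.paused = true;
  }

  resume() {
    this.paused = false;
    while (this.bufferedDone > 0) {
      this.bufferedDone -= 1;
      this.finishCurrent();
    }
  }
}

// Scenario: A times out, then completes by itself before the KILL QUERY
// issued from the other connection takes effect.
function simulateRace() {
  const conn = new ToyConnection();
  conn.enqueue('A');
  conn.enqueue('B');
  conn.pause(); // cancelQuery() starts
  conn.onServerDone(); // A finishes on its own while we are paused
  const startedWhilePaused = [...conn.started]; // B has not started yet
  conn.resume(); // cancelQuery() finished; B may start now
  return { startedWhilePaused, startedAfterResume: conn.started };
}
```

In this model B never runs while the kill is in flight, so a late KILL QUERY has nothing to hit, which is the property the test in martinmacko47#7 checks against the real drivers.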

Contributor Author commented:

> I have put it into a separate branch until we decide it is working, so we won't intermingle two problems here.

I've merged martinmacko47#7 into both this PR and #4444. I've also backported all additional commits from #4444 except the commit using the dequeue methods from the drivers. So we have both versions ready in case it takes too long to get the dequeue methods upstream into the drivers.

return true;
}
return false;
}
}

Object.assign(Client_MySQL2.prototype, {
2 changes: 1 addition & 1 deletion lib/execution/runner.js
@@ -168,7 +168,7 @@ class Runner {

let cancelQuery;
if (obj.cancelOnTimeout) {
- cancelQuery = this.client.cancelQuery(this.connection);
+ cancelQuery = this.client.cancelQuery(this.connection, obj);
} else {
// If we don't cancel the query, we need to mark the connection as disposed so that
// it gets destroyed by the pool and is never used again. If we don't do this and
197 changes: 197 additions & 0 deletions test/integration/query/additional.js
@@ -978,6 +978,203 @@ module.exports = function (knex) {
});
});

it('.timeout(ms, {cancel: true}) should cancel all parallel slow queries run in a single transaction', async function () {
// Only mysql/postgres query cancelling supported for now
if (!isMysql(knex) && !isPostgreSQL(knex)) {
return this.skip();
}

const testQueries = {
[drivers.PostgreSQL]: function () {
return knex.raw('SELECT pg_sleep(10)');
},
[drivers.MySQL]: function () {
return knex.raw('SELECT SLEEP(10)');
},
[drivers.MySQL2]: function () {
return knex.raw('SELECT SLEEP(10)');
},
[drivers.MsSQL]: function () {
return knex.raw("WAITFOR DELAY '00:00:10'");
},
[drivers.Oracle]: function () {
return knex.raw('begin dbms_lock.sleep(10); end;');
},
};

const driverName = knex.client.driverName;
if (!Object.prototype.hasOwnProperty.call(testQueries, driverName)) {
throw new Error('Missing test query for dialect: ' + driverName);
}

const query = testQueries[driverName]();

const getProcessesForDriver = {
pg: async () => {
const results = await knex.raw('SELECT * from pg_stat_activity');
return _.map(_.filter(results.rows, {state: 'active'}), 'query');
},
mysql: async () => {
const results = await knex.raw('SHOW PROCESSLIST');
return _.map(results[0], 'Info');
},
mysql2: async () => {
const results = await knex.raw('SHOW PROCESSLIST');
return _.map(results[0], 'Info');
},
};

if (!Object.prototype.hasOwnProperty.call(getProcessesForDriver, driverName)) {
throw new Error('Missing test query for driverName: ' + driverName);
}

const getProcesses = getProcessesForDriver[driverName];

return knex.transaction(async (trx) => {
const promise = Promise.all(_.range(5).map(async () => {
await query.timeout(50, { cancel: true }).transacting(trx);
}));

await delay(10);
const processesBeforeTimeout = await getProcesses();
expect(processesBeforeTimeout).to.include(query.toString());

await expect(promise).to.eventually.be.rejected.and.to.deep.include({
timeout: 50,
name: 'KnexTimeoutError',
message: 'Defined query timeout of 50ms exceeded when running query.',
});

await delay(10);
const processesAfterTimeout = await getProcesses();
expect(processesAfterTimeout).to.not.include(query.toString());
});
});

it('.timeout(ms, {cancel: true}) should not cancel earlier queries in a transaction before the timeouted query', async function () {
// Only mysql/postgres query cancelling supported for now
// Postgres automatically aborts the entire transaction if one query is canceled
if (!isMysql(knex)) {
return this.skip();
}

const getProcessesForDriver = {
pg: async () => {
const results = await knex.raw('SELECT * from pg_stat_activity');
return _.map(_.filter(results.rows, {state: 'active'}), 'query');
},
mysql: async () => {
const results = await knex.raw('SHOW PROCESSLIST');
return _.map(results[0], 'Info');
},
mysql2: async () => {
const results = await knex.raw('SHOW PROCESSLIST');
return _.map(results[0], 'Info');
},
};

const driverName = knex.client.driverName;
if (!Object.prototype.hasOwnProperty.call(getProcessesForDriver, driverName)) {
throw new Error('Missing test query for driverName: ' + driverName);
}

const getProcesses = getProcessesForDriver[driverName];

return knex.transaction(async (trx) => {
const queryA = trx.raw('SELECT SLEEP(0.1)');
const queryB = trx.raw('SELECT SLEEP(0.2)');
const promiseA = Promise.all([queryA]);
const promiseB = Promise.all([queryB.timeout(50, {cancel: true})]);

await delay(10);
const processesBeforeTimeout = await getProcesses();
expect(processesBeforeTimeout).to.include(queryA.toString());
expect(processesBeforeTimeout).to.not.include(queryB.toString());

await expect(promiseB).to.eventually.be.rejected.and.to.deep.include({
timeout: 50,
name: 'KnexTimeoutError',
message: 'Defined query timeout of 50ms exceeded when running query.',
});

await delay(10);
const processesAfterTimeout = await getProcesses();
expect(processesAfterTimeout).to.include(queryA.toString());
expect(processesAfterTimeout).to.not.include(queryB.toString());

await expect(promiseA).to.eventually.be.fulfilled.and.to.be.an('array');

await delay(10);
const processesAfterCompletion = await getProcesses();
expect(processesAfterCompletion).to.not.include(queryA.toString());
expect(processesAfterCompletion).to.not.include(queryB.toString());
});
});

it('.timeout(ms, {cancel: true}) should not cancel subsequent queries in a transaction if timeouted query completes while being cancelled', async function () {
// Only mysql/postgres query cancelling supported for now
// Postgres automatically aborts the entire transaction if one query is canceled
if (!isMysql(knex)) {
return this.skip();
}

const getProcessesForDriver = {
pg: async () => {
const results = await knex.raw('SELECT * from pg_stat_activity');
return _.map(_.filter(results.rows, {state: 'active'}), 'query');
},
mysql: async () => {
const results = await knex.raw('SHOW PROCESSLIST');
return _.map(results[0], 'Info');
},
mysql2: async () => {
const results = await knex.raw('SHOW PROCESSLIST');
return _.map(results[0], 'Info');
},
};

const driverName = knex.client.driverName;
if (!Object.prototype.hasOwnProperty.call(getProcessesForDriver, driverName)) {
throw new Error('Missing test query for driverName: ' + driverName);
}

const getProcesses = getProcessesForDriver[driverName];

return knex.transaction(async (trx) => {
const queryA = trx.raw('SELECT SLEEP(1), "A"');
const queryB = trx.raw('SELECT SLEEP(1), "B"');

const promise = Promise.all([
queryA.timeout(500, { cancel: true }),
queryB,
]);

await delay(100);
const processesBeforeTimeout = await getProcesses();
expect(processesBeforeTimeout).to.include(queryA.toString());
expect(processesBeforeTimeout).to.not.include(queryB.toString());

const origAcquireRawConnection = trx.client.acquireRawConnection.bind(trx.client);
trx.client.acquireRawConnection = async () => {
await delay(600);
return origAcquireRawConnection();
};

await expect(promise).to.eventually.be.rejected.and.to.deep.include({
timeout: 500,
name: 'KnexTimeoutError',
message: 'Defined query timeout of 500ms exceeded when running query.',
});

trx.client.acquireRawConnection = origAcquireRawConnection;

await delay(100);
const processesAfterTimeout = await getProcesses();
expect(processesAfterTimeout).to.not.include(queryA.toString());
expect(processesAfterTimeout).to.include(queryB.toString());
});
});

it('.timeout(ms, {cancel: true}) should cancel slow query even if connection pool is exhausted', async function () {
// Only mysql/postgres query cancelling supported for now
if (!isMysql(knex) && !isPostgreSQL(knex)) {