Fix timeout method #4324

Merged
merged 5 commits on Mar 21, 2021
10 changes: 6 additions & 4 deletions lib/dialects/mysql/index.js
@@ -162,16 +162,18 @@ class Client_MySQL extends Client {
   }
 
   async cancelQuery(connectionToKill) {
-    const conn = await this.acquireConnection();
+    const conn = await this.acquireRawConnection();
     try {
-      return await this.query(conn, {
-        method: 'raw',
+      return await this._query(conn, {
         sql: 'KILL QUERY ?',
Contributor Author
There is a bit of a semantic difference between MySQL and PostgreSQL:

  • `KILL QUERY ?` in MySQL stops just the current query but keeps the transaction active, so the user can send further queries on the same transaction and eventually commit it.
  • `SELECT pg_cancel_backend($1)` in PostgreSQL stops the current query and also automatically aborts the transaction, so any further queries sent on the same transaction will fail and the transaction will eventually be rolled back.

Should we keep this semantic difference, or should we rather manually roll back the transaction in MySQL as well? Whichever way we decide, it won't be a breaking change, as query cancellation didn't work in transactions at all so far.
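To make the difference concrete, here is a minimal runnable sketch (plain Node.js, not knex code; `FakeTransaction` and its methods are invented names for illustration) that models the observable behavior described above:

```javascript
// Illustrative model only: FakeTransaction is a made-up class, not knex code.
// MySQL's KILL QUERY stops the statement but leaves the transaction usable;
// PostgreSQL's pg_cancel_backend also aborts the transaction, so follow-up
// queries on it fail until it is rolled back.
class FakeTransaction {
  constructor(dialect) {
    this.dialect = dialect; // 'mysql' or 'pg'
    this.aborted = false;
  }

  cancelCurrentQuery() {
    if (this.dialect === 'pg') {
      this.aborted = true; // pg rejects the rest of the transaction
    }
  }

  query(sql) {
    if (this.aborted) {
      throw new Error(
        'current transaction is aborted, commands ignored until end of transaction block'
      );
    }
    return 'ok: ' + sql;
  }
}

const mysqlTrx = new FakeTransaction('mysql');
mysqlTrx.cancelCurrentQuery();
console.log(mysqlTrx.query('SELECT 1')); // → ok: SELECT 1 (transaction still usable)

const pgTrx = new FakeTransaction('pg');
pgTrx.cancelCurrentQuery();
try {
  pgTrx.query('SELECT 1');
} catch (err) {
  console.log('pg follow-up query failed: ' + err.message);
}
```

If we decided to roll back manually in MySQL as well, the `cancelCurrentQuery` branch in this sketch would simply set `aborted` for both dialects.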

         bindings: [connectionToKill.threadId],
         options: {},
       });
     } finally {
-      await this.releaseConnection(conn);
+      await this.destroyRawConnection(conn);
+      if (conn.__knex__disposed) {
+        this.logger.warn(`Connection Error: ${conn.__knex__disposed}`);
+      }
     }
   }
 }
}
17 changes: 6 additions & 11 deletions lib/dialects/postgres/index.js
@@ -237,24 +237,19 @@ class Client_PG extends Client {
   }
 
   async cancelQuery(connectionToKill) {
-    // Error out if we can't acquire connection in time.
-    // Purposely not putting timeout on `pg_cancel_backend` execution because erroring
-    // early there would release the `connectionToKill` back to the pool with
-    // a `KILL QUERY` command yet to finish.
-    const conn = await this.acquireConnection();
+    const conn = await this.acquireRawConnection();
 
     try {
       return await this._wrappedCancelQueryCall(conn, connectionToKill);
     } finally {
-      // NOT returning this promise because we want to release the connection
-      // in a non-blocking fashion
-      this.releaseConnection(conn);
+      await this.destroyRawConnection(conn).catch((err) => {
+        this.logger.warn(`Connection Error: ${err}`);
+      });
     }
   }
   _wrappedCancelQueryCall(conn, connectionToKill) {
-    return this.query(conn, {
-      method: 'raw',
-      sql: 'SELECT pg_cancel_backend(?);',
+    return this._query(conn, {
+      sql: 'SELECT pg_cancel_backend($1);',
       bindings: [connectionToKill.processID],
       options: {},
     });
155 changes: 148 additions & 7 deletions test/integration/query/additional.js
@@ -869,7 +869,118 @@ module.exports = function (knex) {
     });
   });
 
-  it('.timeout(ms, {cancel: true}) should throw error if cancellation cannot acquire connection', async function () {
+  it('.timeout(ms, {cancel: true}) should throw TimeoutError and cancel slow query in transaction', function () {
Contributor Author
This test is a copy of the ".timeout(ms, {cancel: true}) should throw TimeoutError and cancel slow query" test, with just the query wrapped in a transaction.

You can use this test to confirm that cancelling a timed-out query does not work inside a transaction on current master; the test will fail there. I've changed the test as little as possible, to make sure the transaction is the cause.

+    const driverName = knex.client.driverName;
+    if (driverName === 'sqlite3') {
+      return this.skip(); // TODO -- No built-in support for sleeps
+    }
+    if (/redshift/.test(driverName)) {
+      return this.skip();
+    }
+
+    // There's unexpected behavior caused by knex releasing a connection back
+    // to the pool because of a timeout when a long query is still running.
+    // A subsequent query will acquire the connection (still in-use) and hang
+    // until the first query finishes. Setting a sleep time longer than the
+    // mocha timeout exposes this behavior.
+    const testQueries = {
+      pg: function () {
+        return knex.raw('SELECT pg_sleep(10)');
+      },
+      mysql: function () {
+        return knex.raw('SELECT SLEEP(10)');
+      },
+      mysql2: function () {
+        return knex.raw('SELECT SLEEP(10)');
+      },
+      mssql: function () {
+        return knex.raw("WAITFOR DELAY '00:00:10'");
+      },
+      oracledb: function () {
+        return knex.raw('begin dbms_lock.sleep(10); end;');
+      },
+    };
+
+    if (!Object.prototype.hasOwnProperty.call(testQueries, driverName)) {
+      throw new Error('Missing test query for driverName: ' + driverName);
+    }
+
+    const query = testQueries[driverName]();
+
+    function addTimeout() {
+      return query.timeout(200, { cancel: true });
+    }
+
+    // Only mysql/postgres query cancelling supported for now
+    if (
+      !_.startsWith(driverName, 'mysql') &&
+      !_.startsWith(driverName, 'pg')
+    ) {
+      expect(addTimeout).to.throw(
+        'Query cancelling not supported for this dialect'
+      );
+      return; // TODO: Use `this.skip()` here?
+    }
+
+    const getProcessesQueries = {
+      pg: function () {
+        return knex.raw('SELECT * from pg_stat_activity');
+      },
+      mysql: function () {
+        return knex.raw('SHOW PROCESSLIST');
+      },
+      mysql2: function () {
+        return knex.raw('SHOW PROCESSLIST');
+      },
+    };
+
+    if (
+      !Object.prototype.hasOwnProperty.call(getProcessesQueries, driverName)
+    ) {
+      throw new Error('Missing test query for driverName: ' + driverName);
+    }
+
+    const getProcessesQuery = getProcessesQueries[driverName]();
+
+    return knex.transaction((trx) => addTimeout().transacting(trx))
+      .then(function () {
+        expect(true).to.equal(false);
+      })
+      .catch(function (error) {
+        expect(_.pick(error, 'timeout', 'name', 'message')).to.deep.equal({
+          timeout: 200,
+          name: 'KnexTimeoutError',
+          message:
+            'Defined query timeout of 200ms exceeded when running query.',
+        });
+
+        // Ensure sleep command is removed.
+        // This query will hang if a connection gets released back to the pool
+        // too early.
+        // 50ms delay since killing query doesn't seem to have immediate effect to the process listing
+        return delay(50)
+          .then(function () {
+            return getProcessesQuery;
+          })
+          .then(function (results) {
+            let processes;
+            let sleepProcess;
+
+            if (_.startsWith(driverName, 'pg')) {
+              processes = results.rows;
+              sleepProcess = _.find(processes, { query: query.toString() });
+            } else {
+              processes = results[0];
+              sleepProcess = _.find(processes, {
+                Info: 'SELECT SLEEP(10)',
+              });
+            }
+            expect(sleepProcess).to.equal(undefined);
+          });
+      });
+  });
 
   it('.timeout(ms, {cancel: true}) should cancel slow query even if connection pool is exhausted', async function () {
     // Only mysql/postgres query cancelling supported for now
     if (!isMysql(knex) && !isPostgreSQL(knex)) {
       return this.skip();
@@ -909,15 +1020,45 @@ module.exports = function (knex) {

       const query = testQueries[driverName]();
 
+      // We must use the original knex instance without the exhausted pool to list running queries
+      const getProcessesForDriver = {
+        pg: async () => {
+          const results = await knex.raw('SELECT * from pg_stat_activity');
+          return _.map(_.filter(results.rows, { state: 'active' }), 'query');
+        },
+        mysql: async () => {
+          const results = await knex.raw('SHOW PROCESSLIST');
+          return _.map(results[0], 'Info');
+        },
+        mysql2: async () => {
+          const results = await knex.raw('SHOW PROCESSLIST');
+          return _.map(results[0], 'Info');
+        },
+      };
+
+      if (
+        !Object.prototype.hasOwnProperty.call(getProcessesForDriver, driverName)
+      ) {
+        throw new Error('Missing test query for driverName: ' + driverName);
+      }
+
+      const getProcesses = getProcessesForDriver[driverName];
+
       try {
-        await expect(
-          query.timeout(1, { cancel: true })
-        ).to.eventually.be.rejected.and.to.deep.include({
-          timeout: 1,
+        const promise = query.timeout(50, { cancel: true }).then(_.identity);
+
+        await delay(10);
+        const processesBeforeTimeout = await getProcesses();
+        expect(processesBeforeTimeout).to.include(query.toString());
+
+        await expect(promise).to.eventually.be.rejected.and.to.deep.include({
+          timeout: 50,
           name: 'KnexTimeoutError',
-          message:
-            'After query timeout of 1ms exceeded, cancelling of query failed.',
+          message: 'Defined query timeout of 50ms exceeded when running query.',
         });
+
+        const processesAfterTimeout = await getProcesses();
+        expect(processesAfterTimeout).to.not.include(query.toString());
       } finally {
         await knexDb.destroy();
       }
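The tests above exercise knex's `.timeout(ms, { cancel: true })` against a real database. The underlying pattern (race the work against a timer, then fire a cancellation if the timer wins) can be sketched in plain JavaScript. This is an illustrative stand-in, not knex's implementation: `timeoutWithCancel` is an invented helper, and `KnexTimeoutError` here is a local class that only mirrors the `name`/`timeout`/`message` shape asserted in the tests.

```javascript
// Illustrative sketch, not knex internals: a generic "timeout with cancel".
// KnexTimeoutError is a local stand-in mirroring the shape the tests assert.
class KnexTimeoutError extends Error {
  constructor(message, timeout) {
    super(message);
    this.name = 'KnexTimeoutError';
    this.timeout = timeout;
  }
}

// Settle with `promise`, but if `ms` elapses first, invoke `cancel` and
// reject with a KnexTimeoutError (like `.timeout(ms, { cancel: true })`).
function timeoutWithCancel(promise, ms, cancel) {
  let timer;
  const timedOut = new Promise((resolve, reject) => {
    timer = setTimeout(() => {
      try {
        cancel(); // e.g. issue KILL QUERY / pg_cancel_backend on a fresh connection
      } catch (err) {
        // knex logs cancellation failures; this sketch just ignores them
      }
      reject(
        new KnexTimeoutError(
          `Defined query timeout of ${ms}ms exceeded when running query.`,
          ms
        )
      );
    }, ms);
  });
  return Promise.race([promise, timedOut]).finally(() => clearTimeout(timer));
}

// Example: a 200ms "query" raced against a 20ms timeout.
const slowQuery = new Promise((resolve) => setTimeout(() => resolve('rows'), 200));
timeoutWithCancel(slowQuery, 20, () => console.log('cancelling query'))
  .catch((err) => console.log(err.name + ': ' + err.message));
```

Note that, as in the PR, the cancellation runs on its own connection, which is why the real tests list server processes through a separate knex instance.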