Skip to content

Commit

Permalink
Fix timeout method (#4324)
Browse files Browse the repository at this point in the history
  • Loading branch information
martinmacko47 committed Mar 21, 2021
1 parent 910c009 commit 1744c8c
Show file tree
Hide file tree
Showing 3 changed files with 160 additions and 22 deletions.
10 changes: 6 additions & 4 deletions lib/dialects/mysql/index.js
Expand Up @@ -162,16 +162,18 @@ class Client_MySQL extends Client {
}

async cancelQuery(connectionToKill) {
const conn = await this.acquireConnection();
const conn = await this.acquireRawConnection();
try {
return await this.query(conn, {
method: 'raw',
return await this._query(conn, {
sql: 'KILL QUERY ?',
bindings: [connectionToKill.threadId],
options: {},
});
} finally {
await this.releaseConnection(conn);
await this.destroyRawConnection(conn);
if (conn.__knex__disposed) {
this.logger.warn(`Connection Error: ${conn.__knex__disposed}`);
}
}
}
}
Expand Down
17 changes: 6 additions & 11 deletions lib/dialects/postgres/index.js
Expand Up @@ -237,24 +237,19 @@ class Client_PG extends Client {
}

async cancelQuery(connectionToKill) {
// Error out if we can't acquire connection in time.
// Purposely not putting timeout on `pg_cancel_backend` execution because erroring
// early there would release the `connectionToKill` back to the pool with
// a `KILL QUERY` command yet to finish.
const conn = await this.acquireConnection();
const conn = await this.acquireRawConnection();

try {
return await this._wrappedCancelQueryCall(conn, connectionToKill);
} finally {
// NOT returning this promise because we want to release the connection
// in a non-blocking fashion
this.releaseConnection(conn);
await this.destroyRawConnection(conn).catch((err) => {
this.logger.warn(`Connection Error: ${err}`);
});
}
}
_wrappedCancelQueryCall(conn, connectionToKill) {
return this.query(conn, {
method: 'raw',
sql: 'SELECT pg_cancel_backend(?);',
return this._query(conn, {
sql: 'SELECT pg_cancel_backend($1);',
bindings: [connectionToKill.processID],
options: {},
});
Expand Down
155 changes: 148 additions & 7 deletions test/integration/query/additional.js
Expand Up @@ -867,7 +867,118 @@ module.exports = function (knex) {
});
});

it('.timeout(ms, {cancel: true}) should throw error if cancellation cannot acquire connection', async function () {
it('.timeout(ms, {cancel: true}) should throw TimeoutError and cancel slow query in transaction', function () {
const driverName = knex.client.driverName;
if (driverName === 'sqlite3') {
return this.skip();
} //TODO -- No built-in support for sleeps
if (/redshift/.test(driverName)) {
return this.skip();
}

// There's unexpected behavior caused by knex releasing a connection back
// to the pool because of a timeout when a long query is still running.
// A subsequent query will acquire the connection (still in-use) and hang
// until the first query finishes. Setting a sleep time longer than the
// mocha timeout exposes this behavior.
const testQueries = {
pg: function () {
return knex.raw('SELECT pg_sleep(10)');
},
mysql: function () {
return knex.raw('SELECT SLEEP(10)');
},
mysql2: function () {
return knex.raw('SELECT SLEEP(10)');
},
mssql: function () {
return knex.raw("WAITFOR DELAY '00:00:10'");
},
oracledb: function () {
return knex.raw('begin dbms_lock.sleep(10); end;');
},
};

if (!Object.prototype.hasOwnProperty.call(testQueries, driverName)) {
throw new Error('Missing test query for driverName: ' + driverName);
}

const query = testQueries[driverName]();

function addTimeout() {
return query.timeout(200, { cancel: true });
}

// Only mysql/postgres query cancelling supported for now
if (
!_.startsWith(driverName, 'mysql') &&
!_.startsWith(driverName, 'pg')
) {
expect(addTimeout).to.throw(
'Query cancelling not supported for this dialect'
);
return; // TODO: Use `this.skip()` here?
}

const getProcessesQueries = {
pg: function () {
return knex.raw('SELECT * from pg_stat_activity');
},
mysql: function () {
return knex.raw('SHOW PROCESSLIST');
},
mysql2: function () {
return knex.raw('SHOW PROCESSLIST');
},
};

if (
!Object.prototype.hasOwnProperty.call(getProcessesQueries, driverName)
) {
throw new Error('Missing test query for driverName: ' + driverName);
}

const getProcessesQuery = getProcessesQueries[driverName]();

return knex.transaction((trx) => addTimeout().transacting(trx))
.then(function () {
expect(true).to.equal(false);
})
.catch(function (error) {
expect(_.pick(error, 'timeout', 'name', 'message')).to.deep.equal({
timeout: 200,
name: 'KnexTimeoutError',
message:
'Defined query timeout of 200ms exceeded when running query.',
});

// Ensure sleep command is removed.
// This query will hang if a connection gets released back to the pool
// too early.
// 50ms delay since killing query doesn't seem to have immediate effect to the process listing
return delay(50)
.then(function () {
return getProcessesQuery;
})
.then(function (results) {
let processes;
let sleepProcess;

if (_.startsWith(driverName, 'pg')) {
processes = results.rows;
sleepProcess = _.find(processes, { query: query.toString() });
} else {
processes = results[0];
sleepProcess = _.find(processes, {
Info: 'SELECT SLEEP(10)',
});
}
expect(sleepProcess).to.equal(undefined);
});
});
});

it('.timeout(ms, {cancel: true}) should cancel slow query even if connection pool is exhausted', async function () {
// Only mysql/postgres query cancelling supported for now
if (!isMysql(knex) && !isPostgreSQL(knex)) {
return this.skip();
Expand Down Expand Up @@ -907,15 +1018,45 @@ module.exports = function (knex) {

const query = testQueries[driverName]();

// We must use the original knex instance without the exhausted pool to list running queries
const getProcessesForDriver = {
pg: async () => {
const results = await knex.raw('SELECT * from pg_stat_activity');
return _.map(_.filter(results.rows, {state: 'active'}), 'query');
},
mysql: async () => {
const results = await knex.raw('SHOW PROCESSLIST');
return _.map(results[0], 'Info');
},
mysql2: async () => {
const results = await knex.raw('SHOW PROCESSLIST');
return _.map(results[0], 'Info');
},
};

if (
!Object.prototype.hasOwnProperty.call(getProcessesForDriver, driverName)
) {
throw new Error('Missing test query for driverName: ' + driverName);
}

const getProcesses = getProcessesForDriver[driverName];

try {
await expect(
query.timeout(1, { cancel: true })
).to.eventually.be.rejected.and.to.deep.include({
timeout: 1,
const promise = query.timeout(50, { cancel: true }).then(_.identity)

await delay(10)
const processesBeforeTimeout = await getProcesses();
expect(processesBeforeTimeout).to.include(query.toString())

await expect(promise).to.eventually.be.rejected.and.to.deep.include({
timeout: 50,
name: 'KnexTimeoutError',
message:
'After query timeout of 1ms exceeded, cancelling of query failed.',
message: 'Defined query timeout of 50ms exceeded when running query.',
});

const processesAfterTimeout = await getProcesses();
expect(processesAfterTimeout).to.not.include(query.toString())
} finally {
await knexDb.destroy();
}
Expand Down

0 comments on commit 1744c8c

Please sign in to comment.