Skip to content

Commit

Permalink
fix: kill connection on commit/rollback error (v7) (#14574)
Browse files Browse the repository at this point in the history
  • Loading branch information
ephys committed Jun 20, 2022
1 parent 4470fcb commit 0610358
Show file tree
Hide file tree
Showing 7 changed files with 235 additions and 184 deletions.
12 changes: 11 additions & 1 deletion src/dialects/abstract/connection-manager.ts
Expand Up @@ -255,11 +255,21 @@ export class AbstractConnectionManager<TConnection extends Connection = Connecti
*
* @param connection
*/
async releaseConnection(connection: TConnection) {
releaseConnection(connection: TConnection) {
this.pool.release(connection);
debug('connection released');
}

/**
* Destroys a pooled connection and removes it from the pool.
*
* @param connection
*/
async destroyConnection(connection: TConnection) {
await this.pool.destroy(connection);
debug(`connection ${connection.uuid} destroyed`);
}

/**
* Call dialect library to get connection
*
Expand Down
7 changes: 6 additions & 1 deletion src/sequelize.d.ts
Expand Up @@ -1282,10 +1282,15 @@ export class Sequelize extends Hooks {
getDatabaseName(): string;

/**
* Returns an instance of QueryInterface.
* Returns the dialect-dependant QueryInterface instance.
*/
getQueryInterface(): QueryInterface;

/**
* The QueryInterface instance, dialect dependant.
*/
queryInterface: QueryInterface;

/**
* Define a new model, representing a table in the DB.
*
Expand Down
27 changes: 13 additions & 14 deletions src/sequelize.js
Expand Up @@ -2,6 +2,7 @@

import isPlainObject from 'lodash/isPlainObject';
import { withSqliteForeignKeysOff } from './dialects/sqlite/sqlite-utils';
import { AggregateError } from './errors';
import { isString } from './utils';
import { noSequelizeDataType } from './utils/deprecations';
import { isSameInitialModel, isModelStatic } from './utils/model-utils';
Expand Down Expand Up @@ -687,7 +688,7 @@ Use Sequelize#query if you wish to use replacements.`);
} finally {
await this.runHooks('afterQuery', options, query);
if (!options.transaction) {
await this.connectionManager.releaseConnection(connection);
this.connectionManager.releaseConnection(connection);
}
}
}, retryOptions);
Expand Down Expand Up @@ -1102,33 +1103,31 @@ Use Sequelize#query if you wish to use replacements.`);
const transaction = new Transaction(this, options);

if (!autoCallback) {
await transaction.prepareEnvironment(false);
await transaction.prepareEnvironment(/* cls */ false);

return transaction;
}

// autoCallback provided
return Sequelize._clsRun(async () => {
try {
await transaction.prepareEnvironment();
const result = await autoCallback(transaction);
await transaction.commit();
await transaction.prepareEnvironment(/* cls */ true);

return await result;
let result;
try {
result = await autoCallback(transaction);
} catch (error) {
try {
if (!transaction.finished) {
await transaction.rollback();
} else {
// release the connection, even if we don't need to rollback
await transaction.cleanup();
}
await transaction.rollback();
} catch {
// ignore
// ignore, because 'rollback' will already print the error before killing the connection
}

throw error;
}

await transaction.commit();

return result;
});
}

Expand Down
58 changes: 36 additions & 22 deletions src/transaction.ts
@@ -1,6 +1,6 @@
import assert from 'assert';
import type { Class } from 'type-fest';
import type { Logging, Sequelize, Deferrable, PartlyRequired, Connection } from './index.js';
import type { Logging, Deferrable, PartlyRequired, Connection, Sequelize } from './index.js';

type AfterTransactionCommitCallback = (transaction: Transaction) => void | Promise<void>;

Expand Down Expand Up @@ -73,10 +73,15 @@ export class Transaction {
}

try {
return await this.sequelize.getQueryInterface().commitTransaction(this, this.options);
await this.sequelize.getQueryInterface().commitTransaction(this, this.options);
this.cleanup();
} catch (error) {
console.warn(`Committing transaction ${this.id} failed with error ${error instanceof Error ? JSON.stringify(error.message) : String(error)}. We are killing its connection as it is now in an undetermined state.`);
await this.forceCleanup();

throw error;
} finally {
this.finished = 'commit';
await this.cleanup();
for (const hook of this._afterCommitHooks) {
// eslint-disable-next-line no-await-in-loop -- sequentially call hooks
await Reflect.apply(hook, this, [this]);
Expand All @@ -97,12 +102,17 @@ export class Transaction {
}

try {
return await this
await this
.sequelize
.getQueryInterface()
.rollbackTransaction(this, this.options);
} finally {
await this.cleanup();

this.cleanup();
} catch (error) {
console.warn(`Rolling back transaction ${this.id} failed with error ${error instanceof Error ? JSON.stringify(error.message) : String(error)}. We are killing its connection as it is now in an undetermined state.`);
await this.forceCleanup();

throw error;
}
}

Expand All @@ -112,11 +122,7 @@ export class Transaction {
*
* @param useCLS Defaults to true: Use CLS (Continuation Local Storage) with Sequelize. With CLS, all queries within the transaction callback will automatically receive the transaction object.
*/
async prepareEnvironment(useCLS?: boolean) {
if (useCLS === undefined) {
useCLS = true;
}

async prepareEnvironment(useCLS = true) {
let connection;
if (this.parent) {
connection = this.parent.connection;
Expand All @@ -136,15 +142,17 @@ export class Transaction {
let result;
try {
await this.begin();

result = await this.setDeferrable();
} catch (error) {
try {
result = await this.rollback();
await this.rollback();
} finally {
throw error; // eslint-disable-line no-unsafe-finally -- while this will mask the error thrown by `rollback`, the previous error is more important.
}
}

// TODO (@ephys) [>=7.0.0]: move this inside of sequelize.transaction, remove parameter -- during the move to built-in AsyncLocalStorage
if (useCLS && this.sequelize.Sequelize._cls) {
this.sequelize.Sequelize._cls.set('transaction', this);
}
Expand Down Expand Up @@ -175,26 +183,32 @@ export class Transaction {
return queryInterface.startTransaction(this, this.options);
}

async cleanup(): Promise<void> {
cleanup(): void {
// Don't release the connection if there's a parent transaction or
// if we've already cleaned up
if (this.parent || this.connection?.uuid === undefined) {
return;
}

this._clearCls();
const res = this.sequelize.connectionManager.releaseConnection(this.connection);
this.sequelize.connectionManager.releaseConnection(this.connection);
this.connection.uuid = undefined;

await res;
}

_clearCls() {
const cls = this.sequelize.Sequelize._cls;

if (cls && cls.get('transaction') === this) {
cls.set('transaction', null);
/**
* Kills the connection this transaction uses.
* Used as a last resort, for instance because COMMIT or ROLLBACK resulted in an error
* and the transaction is left in a broken state,
* and releasing the connection to the pool would be dangerous.
*/
async forceCleanup() {
// Don't release the connection if there's a parent transaction or
// if we've already cleaned up
if (this.parent || this.connection?.uuid === undefined) {
return;
}

await this.sequelize.connectionManager.destroyConnection(this.connection);
this.connection.uuid = undefined;
}

/**
Expand Down
144 changes: 0 additions & 144 deletions test/integration/sequelize.transaction.test.js

This file was deleted.

0 comments on commit 0610358

Please sign in to comment.