Skip to content

Commit

Permalink
feat: add sequelize.withConnection, rename sequelize.set to `sequ…
Browse files Browse the repository at this point in the history
…elize.setSessionVariables` (#15851)
  • Loading branch information
ephys committed Mar 26, 2023
1 parent 64aa16e commit 336d712
Show file tree
Hide file tree
Showing 10 changed files with 227 additions and 76 deletions.
22 changes: 8 additions & 14 deletions packages/core/src/dialects/abstract/connection-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import cloneDeep from 'lodash/cloneDeep';
import semver from 'semver';
import { TimeoutError } from 'sequelize-pool';
import { ConnectionAcquireTimeoutError } from '../../errors';
import type { Dialect, Sequelize, ConnectionOptions, QueryRawOptions } from '../../sequelize.js';
import type { Dialect, Sequelize, ConnectionOptions } from '../../sequelize.js';
import { isNodeError } from '../../utils/check.js';
import * as deprecations from '../../utils/deprecations';
import { logger } from '../../utils/logger';
Expand Down Expand Up @@ -200,23 +200,17 @@ export class AbstractConnectionManager<TConnection extends Connection = Connecti
return;
}

// TODO: move to sequelize.queryRaw instead?
this.#versionPromise = (async () => {
try {
const connection = conn ?? await this._connect(this.config.replication.write || this.config);

// connection might have set databaseVersion value at initialization,
// avoiding a useless round trip
const options: QueryRawOptions = {
logging: () => {},
// TODO: add "connection" parameter to QueryRawOptions? Would require a way to reuse the same connection without it going back in the pool,
// something like sequelize.session(connection => {}).
// this would help for SET SESSION queries, like in https://github.com/sequelize/sequelize/discussions/15377
// @ts-expect-error -- HACK: Cheat .query to use our private connection
transaction: { connection },
};

const version = await this.sequelize.fetchDatabaseVersion(options);
const version = await this.sequelize.fetchDatabaseVersion({
logging: false,
// we must use the current connection for this, otherwise it will try to create a
// new connection, which will try to initialize the database version again, and loop forever
connection,
});

const parsedVersion = semver.coerce(version)?.version || version;
this.sequelize.options.databaseVersion = semver.valid(parsedVersion)
? parsedVersion
Expand Down
6 changes: 5 additions & 1 deletion packages/core/src/model-internals.ts
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,11 @@ Got ${NodeUtil.inspect(include)} instead`);
}

export function setTransactionFromCls(options: Transactionable, sequelize: Sequelize): void {
if (options.transaction === undefined) {
if (options.transaction && options.connection) {
throw new Error('You cannot use the "transaction" and "connection" options simultaneously. Please pass either one of them.');
}

if (options.transaction === undefined && options.connection === undefined) {
options.transaction = sequelize.getCurrentClsTransaction();
}
}
Expand Down
16 changes: 15 additions & 1 deletion packages/core/src/model.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import type {
HasOneOptions,
} from './associations/index';
import type { Deferrable } from './deferrable';
import type { Connection } from './dialects/abstract/connection-manager.js';
import type { DataType, NormalizedDataType } from './dialects/abstract/data-types.js';
import type {
IndexOptions,
Expand Down Expand Up @@ -65,11 +66,24 @@ export interface Poolable {
export interface Transactionable {
/**
* The transaction in which this query must be run.
* Mutually exclusive with {@link Transactionable.connection}.
*
* If {@link Options.disableClsTransactions} has not been set to true, and a transaction is running in the current AsyncLocalStorage context,
* that transaction will be used, unless null or a Transaction is manually specified here.
* that transaction will be used, unless null or another Transaction is manually specified here.
*/
transaction?: Transaction | null | undefined;

/**
* The connection on which this query must be run.
* Mutually exclusive with {@link Transactionable.transaction}.
*
* Can be used to ensure that a query is run on the same connection as a previous query, which is useful when
* configuring session options.
*
* Specifying this option takes precedence over CLS Transactions. If a transaction is running in the current
* AsyncLocalStorage context, it will be ignored in favor of the specified connection.
*/
connection?: Connection | null | undefined;
}

export interface SearchPathable {
Expand Down
27 changes: 25 additions & 2 deletions packages/core/src/model.js
Original file line number Diff line number Diff line change
Expand Up @@ -1789,6 +1789,10 @@ ${associationOwner._getAssociationDebugList()}`);
);
}

if (options.connection) {
throw new Error('findOrCreate does not support specifying which connection must be used, because findOrCreate must run in a transaction.');
}

options = { ...options };

const modelDefinition = this.modelDefinition;
Expand Down Expand Up @@ -2186,6 +2190,7 @@ ${associationOwner._getAssociationDebugList()}`);
const includeOptions = _(cloneDeep(include))
.omit(['association'])
.defaults({
connection: options.connection,
transaction: options.transaction,
logging: options.logging,
})
Expand Down Expand Up @@ -2328,6 +2333,7 @@ ${associationOwner._getAssociationDebugList()}`);
const includeOptions = _(cloneDeep(include))
.omit(['association'])
.defaults({
connection: options.connection,
transaction: options.transaction,
logging: options.logging,
})
Expand Down Expand Up @@ -2370,6 +2376,7 @@ ${associationOwner._getAssociationDebugList()}`);
const throughOptions = _(cloneDeep(include))
.omit(['association', 'attributes'])
.defaults({
connection: options.connection,
transaction: options.transaction,
logging: options.logging,
})
Expand Down Expand Up @@ -2475,7 +2482,13 @@ ${associationOwner._getAssociationDebugList()}`);
let instances;
// Get daos and run beforeDestroy hook on each record individually
if (options.individualHooks) {
instances = await this.findAll({ where: options.where, transaction: options.transaction, logging: options.logging, benchmark: options.benchmark });
instances = await this.findAll({
where: options.where,
connection: options.connection,
transaction: options.transaction,
logging: options.logging,
benchmark: options.benchmark,
});

await Promise.all(instances.map(instance => {
return this.hooks.runAsync('beforeDestroy', instance, options);
Expand Down Expand Up @@ -2553,7 +2566,14 @@ ${associationOwner._getAssociationDebugList()}`);
let instances;
// Get daos and run beforeRestore hook on each record individually
if (options.individualHooks) {
instances = await this.findAll({ where: options.where, transaction: options.transaction, logging: options.logging, benchmark: options.benchmark, paranoid: false });
instances = await this.findAll({
where: options.where,
connection: options.connection,
transaction: options.transaction,
logging: options.logging,
benchmark: options.benchmark,
paranoid: false,
});

await Promise.all(instances.map(instance => {
return this.hooks.runAsync('beforeRestore', instance, options);
Expand Down Expand Up @@ -2681,6 +2701,7 @@ ${associationOwner._getAssociationDebugList()}`);
if (options.individualHooks) {
instances = await this.findAll({
where: options.where,
connection: options.connection,
transaction: options.transaction,
logging: options.logging,
benchmark: options.benchmark,
Expand Down Expand Up @@ -3670,6 +3691,7 @@ Instead of specifying a Model, either:
const includeOptions = _(cloneDeep(include))
.omit(['association'])
.defaults({
connection: options.connection,
transaction: options.transaction,
logging: options.logging,
parentRecord: this,
Expand Down Expand Up @@ -3759,6 +3781,7 @@ Instead of specifying a Model, either:
const includeOptions = _(cloneDeep(include))
.omit(['association'])
.defaults({
connection: options.connection,
transaction: options.transaction,
logging: options.logging,
parentRecord: this,
Expand Down
44 changes: 43 additions & 1 deletion packages/core/src/sequelize-typescript.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { AsyncLocalStorage } from 'node:async_hooks';
import { initDecoratedAssociations } from './decorators/legacy/associations.js';
import { initDecoratedModel } from './decorators/shared/model.js';
import type { Connection } from './dialects/abstract/connection-manager.js';
import type { Connection, AbstractConnectionManager, GetConnectionOptions } from './dialects/abstract/connection-manager.js';
import type { AbstractQuery } from './dialects/abstract/query.js';
import {
legacyBuildHasHook,
Expand All @@ -17,6 +17,7 @@ import { validModelHooks } from './model-hooks.js';
import type { ConnectionOptions, Options, Sequelize } from './sequelize.js';
import type { TransactionOptions } from './transaction.js';
import { Transaction } from './transaction.js';
import type { PartialBy } from './utils/types.js';
import type { ModelAttributes, ModelOptions, ModelStatic, QueryOptions, SyncOptions } from '.';

export interface SequelizeHooks extends ModelHooks {
Expand Down Expand Up @@ -75,6 +76,15 @@ export interface StaticSequelizeHooks {
afterInit(sequelize: Sequelize): void;
}

export interface WithConnectionOptions extends PartialBy<GetConnectionOptions, 'type'> {
/**
* Close the connection when the callback finishes instead of returning it to the pool.
* This is useful if you want to ensure that the connection is not reused,
* for example if you ran queries that changed session options.
*/
destroyConnection?: boolean;
}

const staticSequelizeHooks = new HookHandlerBuilder<StaticSequelizeHooks>([
'beforeInit', 'afterInit',
]);
Expand All @@ -89,13 +99,16 @@ const instanceSequelizeHooks = new HookHandlerBuilder<SequelizeHooks>([
]);

type TransactionCallback<T> = (t: Transaction) => PromiseLike<T> | T;
type SessionCallback<T> = (connection: Connection) => PromiseLike<T> | T;

// DO NOT MAKE THIS CLASS PUBLIC!
/**
* This is a temporary class used to progressively migrate the Sequelize class to TypeScript by slowly moving its functions here.
* Always use {@link Sequelize} instead.
*/
export abstract class SequelizeTypeScript {
declare connectionManager: AbstractConnectionManager;

static get hooks(): HookHandler<StaticSequelizeHooks> {
return staticSequelizeHooks.getFor(this);
}
Expand Down Expand Up @@ -342,4 +355,33 @@ export abstract class SequelizeTypeScript {

return transaction;
}

async withConnection<T>(options: WithConnectionOptions, callback: SessionCallback<T>): Promise<T>;
async withConnection<T>(callback: SessionCallback<T>): Promise<T>;
async withConnection<T>(
optionsOrCallback: SessionCallback<T> | WithConnectionOptions,
maybeCallback?: SessionCallback<T>,
): Promise<T> {
let options: WithConnectionOptions;
let callback: SessionCallback<T>;
if (typeof optionsOrCallback === 'function') {
callback = optionsOrCallback;
options = { type: 'write' };
} else {
callback = maybeCallback!;
options = { type: 'write', ...optionsOrCallback };
}

const connection = await this.connectionManager.getConnection(options as GetConnectionOptions);

try {
return await callback(connection);
} finally {
if (options.destroyConnection) {
await this.connectionManager.destroyConnection(connection);
} else {
this.connectionManager.releaseConnection(connection);
}
}
}
}
4 changes: 2 additions & 2 deletions packages/core/src/sequelize.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ export interface DialectOptions {
options?: string | Record<string, unknown>;
}

export interface QueryOptionsTransactionRequired { }
export interface SetSessionVariablesOptions extends Omit<QueryOptions, 'raw' | 'plain' | 'type'> { }

export type BindOrReplacements = { [key: string]: unknown } | unknown[];
type FieldMap = { [key: string]: string };
Expand Down Expand Up @@ -976,7 +976,7 @@ export class Sequelize extends SequelizeTypeScript {
* @param variables object with multiple variables.
* @param options Query options.
*/
set(variables: object, options: QueryOptionsTransactionRequired): Promise<unknown>;
setSessionVariables(variables: object, options?: SetSessionVariablesOptions): Promise<unknown>;

/**
* Escape value.
Expand Down
38 changes: 18 additions & 20 deletions packages/core/src/sequelize.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import { Fn, fn } from './expression-builders/fn.js';
import { json } from './expression-builders/json.js';
import { Literal, literal } from './expression-builders/literal.js';
import { Where, where } from './expression-builders/where.js';
import { setTransactionFromCls } from './model-internals.js';
import { SequelizeTypeScript } from './sequelize-typescript';
import { withSqliteForeignKeysOff } from './dialects/sqlite/sqlite-utils';
import { isString } from './utils/check.js';
Expand Down Expand Up @@ -705,19 +706,18 @@ Use Sequelize#query if you wish to use replacements.`);
}
};

setTransactionFromCls(options, this);
const retryOptions = { ...this.options.retry, ...options.retry };

return await retry(async () => {
if (options.transaction === undefined) {
options.transaction = this.getCurrentClsTransaction();
}

checkTransaction();

const connection = await (options.transaction ? options.transaction.connection : this.connectionManager.getConnection({
useMaster: options.useMaster,
type: options.type === 'SELECT' ? 'read' : 'write',
}));
const connection = options.transaction ? options.transaction.connection
: options.connection ? options.connection
: await this.connectionManager.getConnection({
useMaster: options.useMaster,
type: options.type === 'SELECT' ? 'read' : 'write',
});

if (this.options.dialect === 'db2' && options.alter && options.alter.drop === false) {
connection.dropTable = false;
Expand All @@ -732,7 +732,7 @@ Use Sequelize#query if you wish to use replacements.`);
return await query.run(sql, bindParameters, { minifyAliases: options.minifyAliases });
} finally {
await this.hooks.runAsync('afterQuery', options, query);
if (!options.transaction) {
if (!options.transaction && !options.connection) {
this.connectionManager.releaseConnection(connection);
}
}
Expand All @@ -743,25 +743,23 @@ Use Sequelize#query if you wish to use replacements.`);
* Execute a query which would set an environment or user variable. The variables are set per connection, so this function needs a transaction.
* Only works for MySQL or MariaDB.
*
* @param {object} variables Object with multiple variables.
* @param {object} [options] query options.
* @param {Transaction} [options.transaction] The transaction that the query should be executed under
*
* @memberof Sequelize
* @param {object} variables Object with multiple variables.
* @param {object} [options] query options.
*
* @returns {Promise}
*/
async set(variables, options) {

async setSessionVariables(variables, options) {
// Prepare options
options = { ...this.options.set, ...typeof options === 'object' && options };
options = { ...this.options.setSessionVariables, ...options };

if (!['mysql', 'mariadb'].includes(this.options.dialect)) {
throw new Error('sequelize.set is only supported for mysql or mariadb');
throw new Error('sequelize.setSessionVariables is only supported for mysql or mariadb');
}

if (!options.transaction || !(options.transaction instanceof Transaction)) {
throw new TypeError('options.transaction is required');
setTransactionFromCls(options, this);

if ((!options.transaction || !(options.transaction instanceof Transaction)) && (!options.connection)) {
throw new Error('You must specify either options.transaction or options.connection, as sequelize.setSessionVariables is used to set the session options of a connection');
}

// Override some options, since this isn't a SELECT
Expand Down
35 changes: 0 additions & 35 deletions packages/core/test/integration/sequelize.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -222,41 +222,6 @@ describe(Support.getTestDialectTeaser('Sequelize'), () => {
});
});

if (['mysql', 'mariadb'].includes(dialect)) {
describe('set', () => {
it('should return an promised error if transaction isn\'t defined', async function () {
await expect(this.sequelize.set({ foo: 'bar' }))
.to.be.rejectedWith(TypeError, 'options.transaction is required');
});

it('one value', async function () {
const t = await this.sequelize.startUnmanagedTransaction();
this.t = t;
await this.sequelize.set({ foo: 'bar' }, { transaction: t });
const data = await this.sequelize.query('SELECT @foo as `foo`', { plain: true, transaction: this.t });
expect(data).to.be.ok;
expect(data.foo).to.be.equal('bar');
await this.t.commit();
});

it('multiple values', async function () {
const t = await this.sequelize.startUnmanagedTransaction();
this.t = t;

await this.sequelize.set({
foo: 'bar',
foos: 'bars',
}, { transaction: t });

const data = await this.sequelize.query('SELECT @foo as `foo`, @foos as `foos`', { plain: true, transaction: this.t });
expect(data).to.be.ok;
expect(data.foo).to.be.equal('bar');
expect(data.foos).to.be.equal('bars');
await this.t.commit();
});
});
}

describe('define', () => {
it('adds a new dao to the dao manager', function () {
const count = this.sequelize.modelManager.all.length;
Expand Down

0 comments on commit 336d712

Please sign in to comment.