From 6ae5fbf70dd87fe2380b74d83bc8a04bb8f447fe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Martin=20Ad=C3=A1mek?= Date: Fri, 14 May 2021 16:16:40 +0200 Subject: [PATCH] feat(sql): allow setting transaction isolation level Closes #819 BREAKING CHANGE: - `em.transactional()` signature has changed, the parameter is now options object - `em.begin()` signature has changed, the parameter is now options object --- docs/docs/transactions.md | 18 ++++++++++++ packages/core/src/EntityManager.ts | 12 ++++---- packages/core/src/connections/Connection.ts | 5 ++-- packages/core/src/enums.ts | 8 +++++ packages/core/src/unit-of-work/UnitOfWork.ts | 5 +++- packages/knex/src/AbstractSqlConnection.ts | 24 +++++++-------- packages/migrations/src/MigrationRunner.ts | 2 +- packages/mongodb/src/MongoConnection.ts | 20 ++++++------- tests/EntityManager.mongo.test.ts | 9 ++++++ tests/EntityManager.mysql.test.ts | 31 ++++++++++++++++---- tests/EntityManager.postgre.test.ts | 20 ++++++++++++- 11 files changed, 116 insertions(+), 38 deletions(-) diff --git a/docs/docs/transactions.md b/docs/docs/transactions.md index 38f0ab13b9e0..f1b03b5fc9e1 100644 --- a/docs/docs/transactions.md +++ b/docs/docs/transactions.md @@ -317,5 +317,23 @@ const res = await em.find(User, { name: 'Jon' }, { // for update of "e0" skip locked ``` +### Isolation levels + +We can set the transaction isolation levels: + +```ts +await orm.em.transactional(async em => { + // ... +}, { isolationLevel: IsolationLevel.READ_UNCOMMITTED }); +``` + +Available isolation levels: + +- `IsolationLevel.READ_UNCOMMITTED` +- `IsolationLevel.READ_COMMITTED` +- `IsolationLevel.SNAPSHOT` +- `IsolationLevel.REPEATABLE_READ` +- `IsolationLevel.SERIALIZABLE` + > This part of documentation is highly inspired by [doctrine internals docs](https://www.doctrine-project.org/projects/doctrine-orm/en/latest/reference/transactions-and-concurrency.html) > as the behaviour here is pretty much the same. diff --git a/packages/core/src/EntityManager.ts b/packages/core/src/EntityManager.ts index 2ba4699e4cce..09ee3ec0da9d 100644 --- a/packages/core/src/EntityManager.ts +++ b/packages/core/src/EntityManager.ts @@ -5,7 +5,7 @@ import { AssignOptions, EntityAssigner, EntityFactory, EntityLoader, EntityLoade import { UnitOfWork } from './unit-of-work'; import { CountOptions, DeleteOptions, EntityManagerType, FindOneOptions, FindOneOrFailOptions, FindOptions, IDatabaseDriver, UpdateOptions } from './drivers'; import { AnyEntity, Dictionary, EntityData, EntityDictionary, EntityDTO, EntityMetadata, EntityName, FilterDef, FilterQuery, GetRepository, Loaded, New, Populate, PopulateMap, PopulateOptions, Primary } from './typings'; -import { LoadStrategy, LockMode, QueryOrderMap, ReferenceType, SCALAR_TYPES } from './enums'; +import { IsolationLevel, LoadStrategy, LockMode, QueryOrderMap, ReferenceType, SCALAR_TYPES } from './enums'; import { MetadataStorage } from './metadata'; import { Transaction } from './connections'; import { EventManager, TransactionEventBroadcaster } from './events'; @@ -389,8 +389,10 @@ export class EntityManager { /** * Runs your callback wrapped inside a database transaction. */ - async transactional(cb: (em: D[typeof EntityManagerType]) => Promise, ctx = this.transactionContext): Promise { + async transactional(cb: (em: D[typeof EntityManagerType]) => Promise, options: { ctx?: Transaction; isolationLevel?: IsolationLevel } = {}): Promise { const em = this.fork(false); + /* istanbul ignore next */ + options.ctx = options.ctx ?? this.transactionContext; return TransactionContext.createAsync(em, async () => { return em.getConnection().transactional(async trx => { @@ -399,15 +401,15 @@ export class EntityManager { await em.flush(); return ret; - }, ctx, new TransactionEventBroadcaster(em)); + }, { ...options, eventBroadcaster: new TransactionEventBroadcaster(em) }); }); } /** * Starts new transaction bound to this EntityManager. Use `ctx` parameter to provide the parent when nesting transactions. */ - async begin(ctx?: Transaction): Promise { - this.transactionContext = await this.getConnection('write').begin(ctx, new TransactionEventBroadcaster(this)); + async begin(options: { ctx?: Transaction; isolationLevel?: IsolationLevel } = {}): Promise { + this.transactionContext = await this.getConnection('write').begin({ ...options, eventBroadcaster: new TransactionEventBroadcaster(this) }); } /** diff --git a/packages/core/src/connections/Connection.ts b/packages/core/src/connections/Connection.ts index 14ebc27ae879..8d631769190d 100644 --- a/packages/core/src/connections/Connection.ts +++ b/packages/core/src/connections/Connection.ts @@ -6,6 +6,7 @@ import { MetadataStorage } from '../metadata'; import { Dictionary } from '../typings'; import { Platform } from '../platforms/Platform'; import { TransactionEventBroadcaster } from '../events/TransactionEventBroadcaster'; +import { IsolationLevel } from '../enums'; export abstract class Connection { @@ -42,11 +43,11 @@ export abstract class Connection { */ abstract getDefaultClientUrl(): string; - async transactional(cb: (trx: Transaction) => Promise, ctx?: Transaction, eventBroadcaster?: TransactionEventBroadcaster): Promise { + async transactional(cb: (trx: Transaction) => Promise, options?: { isolationLevel?: IsolationLevel; ctx?: Transaction; eventBroadcaster?: TransactionEventBroadcaster }): Promise { throw new Error(`Transactions are not supported by current driver`); } - async begin(ctx?: Transaction, eventBroadcaster?: TransactionEventBroadcaster): Promise { + async begin(options?: { isolationLevel?: IsolationLevel; ctx?: Transaction; eventBroadcaster?: TransactionEventBroadcaster }): Promise { throw new Error(`Transactions are not supported by current driver`); } diff --git a/packages/core/src/enums.ts b/packages/core/src/enums.ts index 36b6f5590da4..d3518d325410 100644 --- a/packages/core/src/enums.ts +++ b/packages/core/src/enums.ts @@ -98,6 +98,14 @@ export enum LockMode { PESSIMISTIC_READ_OR_FAIL = 7, } +export enum IsolationLevel { + READ_UNCOMMITTED = 'read uncommitted', + READ_COMMITTED = 'read committed', + SNAPSHOT = 'snapshot', + REPEATABLE_READ = 'repeatable read', + SERIALIZABLE = 'serializable', +} + export enum EventType { onInit = 'onInit', beforeCreate = 'beforeCreate', diff --git a/packages/core/src/unit-of-work/UnitOfWork.ts b/packages/core/src/unit-of-work/UnitOfWork.ts index 664a9b3db1ea..77f82497ff64 100644 --- a/packages/core/src/unit-of-work/UnitOfWork.ts +++ b/packages/core/src/unit-of-work/UnitOfWork.ts @@ -236,7 +236,10 @@ export class UnitOfWork { const runInTransaction = !this.em.isInTransaction() && platform.supportsTransactions() && this.em.config.get('implicitTransactions'); if (runInTransaction) { - await this.em.getConnection('write').transactional(trx => this.persistToDatabase(groups, trx), oldTx, new TransactionEventBroadcaster(this.em, this)); + await this.em.getConnection('write').transactional(trx => this.persistToDatabase(groups, trx), { + ctx: oldTx, + eventBroadcaster: new TransactionEventBroadcaster(this.em, this), + }); } else { await this.persistToDatabase(groups, this.em.getTransactionContext()); } diff --git a/packages/knex/src/AbstractSqlConnection.ts b/packages/knex/src/AbstractSqlConnection.ts index b3e13f8c0fdb..58875c8a0e7e 100644 --- a/packages/knex/src/AbstractSqlConnection.ts +++ b/packages/knex/src/AbstractSqlConnection.ts @@ -1,7 +1,7 @@ import { knex, Knex } from 'knex'; import { readFile } from 'fs-extra'; import { - AnyEntity, Configuration, Connection, ConnectionOptions, EntityData, EventType, QueryResult, + AnyEntity, Configuration, Connection, ConnectionOptions, EntityData, EventType, IsolationLevel, QueryResult, Transaction, TransactionEventBroadcaster, Utils, } from '@mikro-orm/core'; import { AbstractSqlPlatform } from './AbstractSqlPlatform'; @@ -40,31 +40,31 @@ export abstract class AbstractSqlConnection extends Connection { } } - async transactional(cb: (trx: Transaction) => Promise, ctx?: Transaction, eventBroadcaster?: TransactionEventBroadcaster): Promise { - const trx = await this.begin(ctx, eventBroadcaster); + async transactional(cb: (trx: Transaction) => Promise, options: { isolationLevel?: IsolationLevel; ctx?: Knex.Transaction; eventBroadcaster?: TransactionEventBroadcaster } = {}): Promise { + const trx = await this.begin(options); try { const ret = await cb(trx); - await this.commit(trx, eventBroadcaster); + await this.commit(trx, options.eventBroadcaster); return ret; } catch (error) { - await this.rollback(trx, eventBroadcaster); + await this.rollback(trx, options.eventBroadcaster); throw error; } } - async begin(ctx?: Knex.Transaction, eventBroadcaster?: TransactionEventBroadcaster): Promise { - if (!ctx) { - await eventBroadcaster?.dispatchEvent(EventType.beforeTransactionStart); + async begin(options: { isolationLevel?: IsolationLevel; ctx?: Knex.Transaction; eventBroadcaster?: TransactionEventBroadcaster } = {}): Promise { + if (!options.ctx) { + await options.eventBroadcaster?.dispatchEvent(EventType.beforeTransactionStart); } - const trx = await (ctx || this.client).transaction(); + const trx = await (options.ctx || this.client).transaction(null, { isolationLevel: options.isolationLevel }); - if (!ctx) { - await eventBroadcaster?.dispatchEvent(EventType.afterTransactionStart, trx); + if (!options.ctx) { + await options.eventBroadcaster?.dispatchEvent(EventType.afterTransactionStart, trx); } else { - trx[parentTransactionSymbol] = ctx; + trx[parentTransactionSymbol] = options.ctx; } return trx; diff --git a/packages/migrations/src/MigrationRunner.ts b/packages/migrations/src/MigrationRunner.ts index 53bd9eb06f9b..36705bb2f0e0 100644 --- a/packages/migrations/src/MigrationRunner.ts +++ b/packages/migrations/src/MigrationRunner.ts @@ -23,7 +23,7 @@ export class MigrationRunner { migration.setTransactionContext(tx); const queries = await this.getQueries(migration, method); await Utils.runSerial(queries, sql => this.driver.execute(sql, undefined, 'all', tx)); - }, this.masterTransaction); + }, { ctx: this.masterTransaction }); } } diff --git a/packages/mongodb/src/MongoConnection.ts b/packages/mongodb/src/MongoConnection.ts index 0fadff0ebcfc..cf7ca4c95aa5 100644 --- a/packages/mongodb/src/MongoConnection.ts +++ b/packages/mongodb/src/MongoConnection.ts @@ -5,7 +5,7 @@ import { import { inspect } from 'util'; import { Connection, ConnectionConfig, QueryResult, Transaction, Utils, QueryOrder, QueryOrderMap, - FilterQuery, AnyEntity, EntityName, Dictionary, EntityData, TransactionEventBroadcaster, EventType, + FilterQuery, AnyEntity, EntityName, Dictionary, EntityData, TransactionEventBroadcaster, EventType, IsolationLevel, } from '@mikro-orm/core'; export class MongoConnection extends Connection { @@ -148,33 +148,33 @@ export class MongoConnection extends Connection { return this.runQuery('countDocuments', collection, undefined, where, ctx); } - async transactional(cb: (trx: Transaction) => Promise, ctx?: Transaction, eventBroadcaster?: TransactionEventBroadcaster): Promise { - const session = await this.begin(ctx, eventBroadcaster); + async transactional(cb: (trx: Transaction) => Promise, options: { isolationLevel?: IsolationLevel; ctx?: Transaction; eventBroadcaster?: TransactionEventBroadcaster } = {}): Promise { + const session = await this.begin(options); try { const ret = await cb(session); - await this.commit(session, eventBroadcaster); + await this.commit(session, options.eventBroadcaster); return ret; } catch (error) { - await this.rollback(session, eventBroadcaster); + await this.rollback(session, options.eventBroadcaster); throw error; } finally { session.endSession(); } } - async begin(ctx?: ClientSession, eventBroadcaster?: TransactionEventBroadcaster): Promise { - if (!ctx) { + async begin(options: { isolationLevel?: IsolationLevel; ctx?: ClientSession; eventBroadcaster?: TransactionEventBroadcaster } = {}): Promise { + if (!options.ctx) { /* istanbul ignore next */ - await eventBroadcaster?.dispatchEvent(EventType.beforeTransactionStart); + await options.eventBroadcaster?.dispatchEvent(EventType.beforeTransactionStart); } - const session = ctx || this.client.startSession(); + const session = options.ctx || this.client.startSession(); session.startTransaction(); this.logQuery('db.begin();'); /* istanbul ignore next */ - await eventBroadcaster?.dispatchEvent(EventType.afterTransactionStart, session); + await options.eventBroadcaster?.dispatchEvent(EventType.afterTransactionStart, session); return session; } diff --git a/tests/EntityManager.mongo.test.ts b/tests/EntityManager.mongo.test.ts index 3f444718feee..1e12a5d4f391 100644 --- a/tests/EntityManager.mongo.test.ts +++ b/tests/EntityManager.mongo.test.ts @@ -517,6 +517,15 @@ describe('EntityManagerMongo', () => { expect(driver.getConnection().getCollection(BookTag).collectionName).toBe('book-tag'); expect(orm.em.getCollection(BookTag).collectionName).toBe('book-tag'); + const conn = driver.getConnection(); + const tx = await conn.begin(); + const first = await driver.nativeInsert(Publisher.name, { name: 'test 123', type: 'GLOBAL' }, tx); + await conn.commit(tx); + + await conn.transactional(async tx => { + await driver.nativeDelete(Publisher.name, first.insertId, tx); + }); + // multi inserts const res = await driver.nativeInsertMany(Publisher.name, [ { name: 'test 1', type: 'GLOBAL' }, diff --git a/tests/EntityManager.mysql.test.ts b/tests/EntityManager.mysql.test.ts index 7d6659591494..e8309f799081 100644 --- a/tests/EntityManager.mysql.test.ts +++ b/tests/EntityManager.mysql.test.ts @@ -7,7 +7,7 @@ import chalk from 'chalk'; import { Collection, Configuration, EntityManager, LockMode, MikroORM, QueryFlag, QueryOrder, Reference, Logger, ValidationError, wrap, UniqueConstraintViolationException, TableNotFoundException, TableExistsException, SyntaxErrorException, - NonUniqueFieldNameException, InvalidFieldNameException, expr, + NonUniqueFieldNameException, InvalidFieldNameException, expr, IsolationLevel, } from '@mikro-orm/core'; import { MySqlDriver, MySqlConnection } from '@mikro-orm/mysql'; import { Author2, Book2, BookTag2, FooBar2, FooBaz2, Publisher2, PublisherType, Test2 } from './entities-sql'; @@ -96,11 +96,11 @@ describe('EntityManagerMySql', () => { await expect(driver.ensureIndexes()).rejects.toThrowError('MySqlDriver does not use ensureIndexes'); const conn = driver.getConnection(); - await conn.transactional(async tx => { - await conn.execute('select 1', [], 'all', tx); - await conn.execute(orm.em.getKnex().raw('select 1'), [], 'all', tx); - await conn.execute(orm.em.getRepository(Author2).getKnex().raw('select 1'), [], 'all', tx); - }); + const tx = await conn.begin(); + await conn.execute('select 1', [], 'all', tx); + await conn.execute(orm.em.getKnex().raw('select 1'), [], 'all', tx); + await conn.execute(orm.em.getRepository(Author2).getKnex().raw('select 1'), [], 'all', tx); + await conn.commit(tx); // multi inserts const res = await driver.nativeInsertMany(Publisher2.name, [ @@ -334,6 +334,25 @@ describe('EntityManagerMySql', () => { } }); + test('transactions with isolation levels', async () => { + const mock = jest.fn(); + const logger = new Logger(mock, ['query']); + Object.assign(orm.config, { logger }); + + const god1 = new Author2('God1', 'hello@heaven1.god'); + try { + await orm.em.transactional(async em => { + await em.persistAndFlush(god1); + throw new Error(); // rollback the transaction + }, { isolationLevel: IsolationLevel.READ_UNCOMMITTED }); + } catch { } + + expect(mock.mock.calls[0][0]).toMatch('set transaction isolation level read uncommitted'); + expect(mock.mock.calls[1][0]).toMatch('begin'); + expect(mock.mock.calls[2][0]).toMatch('insert into `author2` (`created_at`, `email`, `name`, `terms_accepted`, `updated_at`) values (?, ?, ?, ?, ?)'); + expect(mock.mock.calls[3][0]).toMatch('rollback'); + }); + test('nested transactions with save-points', async () => { await orm.em.transactional(async em => { const god1 = new Author2('God1', 'hello1@heaven.god'); diff --git a/tests/EntityManager.postgre.test.ts b/tests/EntityManager.postgre.test.ts index d4e57abe90cb..a229f8befbf8 100644 --- a/tests/EntityManager.postgre.test.ts +++ b/tests/EntityManager.postgre.test.ts @@ -2,7 +2,7 @@ import { v4 } from 'uuid'; import { Collection, Configuration, EntityManager, LockMode, MikroORM, QueryFlag, QueryOrder, Reference, Logger, ValidationError, ChangeSetType, wrap, expr, UniqueConstraintViolationException, TableNotFoundException, NotNullConstraintViolationException, TableExistsException, SyntaxErrorException, - NonUniqueFieldNameException, InvalidFieldNameException, EventSubscriber, ChangeSet, AnyEntity, FlushEventArgs, LoadStrategy, + NonUniqueFieldNameException, InvalidFieldNameException, EventSubscriber, ChangeSet, AnyEntity, FlushEventArgs, LoadStrategy, IsolationLevel, } from '@mikro-orm/core'; import { PostgreSqlDriver, PostgreSqlConnection } from '@mikro-orm/postgresql'; import { Address2, Author2, Book2, BookTag2, FooBar2, FooBaz2, Publisher2, PublisherType, PublisherType2, Test2, Label2 } from './entities-sql'; @@ -214,6 +214,24 @@ describe('EntityManagerPostgre', () => { } }); + test('transactions with isolation levels', async () => { + const mock = jest.fn(); + const logger = new Logger(mock, ['query']); + Object.assign(orm.config, { logger }); + + const god1 = new Author2('God1', 'hello@heaven1.god'); + try { + await orm.em.transactional(async em => { + await em.persistAndFlush(god1); + throw new Error(); // rollback the transaction + }, { isolationLevel: IsolationLevel.READ_UNCOMMITTED }); + } catch { } + + expect(mock.mock.calls[0][0]).toMatch('begin isolation level read uncommitted'); + expect(mock.mock.calls[1][0]).toMatch('insert into "author2" ("created_at", "email", "name", "terms_accepted", "updated_at") values ($1, $2, $3, $4, $5) returning "id", "created_at", "updated_at", "age", "terms_accepted"'); + expect(mock.mock.calls[2][0]).toMatch('rollback'); + }); + test('nested transactions with save-points', async () => { await orm.em.transactional(async em => { const god1 = new Author2('God1', 'hello1@heaven.god');