Skip to content

Commit

Permalink
feat(sql): allow setting transaction isolation level
Browse files Browse the repository at this point in the history
Closes #819

BREAKING CHANGE:
- `em.transactional()` signature has changed, the parameter is now options object
- `em.begin()` signature has changed, the parameter is now options object
  • Loading branch information
B4nan committed May 14, 2021
1 parent 91d6a7a commit 6ae5fbf
Show file tree
Hide file tree
Showing 11 changed files with 116 additions and 38 deletions.
18 changes: 18 additions & 0 deletions docs/docs/transactions.md
Original file line number Diff line number Diff line change
Expand Up @@ -317,5 +317,23 @@ const res = await em.find(User, { name: 'Jon' }, {
// for update of "e0" skip locked
```

### Isolation levels

We can set the transaction isolation levels:

```ts
await orm.em.transactional(async em => {
// ...
}, { isolationLevel: IsolationLevel.READ_UNCOMMITTED });
```

Available isolation levels:

- `IsolationLevel.READ_UNCOMMITTED`
- `IsolationLevel.READ_COMMITTED`
- `IsolationLevel.SNAPSHOT`
- `IsolationLevel.REPEATABLE_READ`
- `IsolationLevel.SERIALIZABLE`

> This part of documentation is highly inspired by [doctrine internals docs](https://www.doctrine-project.org/projects/doctrine-orm/en/latest/reference/transactions-and-concurrency.html)
> as the behaviour here is pretty much the same.
12 changes: 7 additions & 5 deletions packages/core/src/EntityManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { AssignOptions, EntityAssigner, EntityFactory, EntityLoader, EntityLoade
import { UnitOfWork } from './unit-of-work';
import { CountOptions, DeleteOptions, EntityManagerType, FindOneOptions, FindOneOrFailOptions, FindOptions, IDatabaseDriver, UpdateOptions } from './drivers';
import { AnyEntity, Dictionary, EntityData, EntityDictionary, EntityDTO, EntityMetadata, EntityName, FilterDef, FilterQuery, GetRepository, Loaded, New, Populate, PopulateMap, PopulateOptions, Primary } from './typings';
import { LoadStrategy, LockMode, QueryOrderMap, ReferenceType, SCALAR_TYPES } from './enums';
import { IsolationLevel, LoadStrategy, LockMode, QueryOrderMap, ReferenceType, SCALAR_TYPES } from './enums';
import { MetadataStorage } from './metadata';
import { Transaction } from './connections';
import { EventManager, TransactionEventBroadcaster } from './events';
Expand Down Expand Up @@ -389,8 +389,10 @@ export class EntityManager<D extends IDatabaseDriver = IDatabaseDriver> {
/**
* Runs your callback wrapped inside a database transaction.
*/
async transactional<T>(cb: (em: D[typeof EntityManagerType]) => Promise<T>, ctx = this.transactionContext): Promise<T> {
async transactional<T>(cb: (em: D[typeof EntityManagerType]) => Promise<T>, options: { ctx?: Transaction; isolationLevel?: IsolationLevel } = {}): Promise<T> {
const em = this.fork(false);
/* istanbul ignore next */
options.ctx = options.ctx ?? this.transactionContext;

return TransactionContext.createAsync(em, async () => {
return em.getConnection().transactional(async trx => {
Expand All @@ -399,15 +401,15 @@ export class EntityManager<D extends IDatabaseDriver = IDatabaseDriver> {
await em.flush();

return ret;
}, ctx, new TransactionEventBroadcaster(em));
}, { ...options, eventBroadcaster: new TransactionEventBroadcaster(em) });
});
}

/**
* Starts new transaction bound to this EntityManager. Use `ctx` parameter to provide the parent when nesting transactions.
*/
async begin(ctx?: Transaction): Promise<void> {
this.transactionContext = await this.getConnection('write').begin(ctx, new TransactionEventBroadcaster(this));
async begin(options: { ctx?: Transaction; isolationLevel?: IsolationLevel } = {}): Promise<void> {
this.transactionContext = await this.getConnection('write').begin({ ...options, eventBroadcaster: new TransactionEventBroadcaster(this) });
}

/**
Expand Down
5 changes: 3 additions & 2 deletions packages/core/src/connections/Connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { MetadataStorage } from '../metadata';
import { Dictionary } from '../typings';
import { Platform } from '../platforms/Platform';
import { TransactionEventBroadcaster } from '../events/TransactionEventBroadcaster';
import { IsolationLevel } from '../enums';

export abstract class Connection {

Expand Down Expand Up @@ -42,11 +43,11 @@ export abstract class Connection {
*/
abstract getDefaultClientUrl(): string;

async transactional<T>(cb: (trx: Transaction) => Promise<T>, ctx?: Transaction, eventBroadcaster?: TransactionEventBroadcaster): Promise<T> {
async transactional<T>(cb: (trx: Transaction) => Promise<T>, options?: { isolationLevel?: IsolationLevel; ctx?: Transaction; eventBroadcaster?: TransactionEventBroadcaster }): Promise<T> {
throw new Error(`Transactions are not supported by current driver`);
}

async begin(ctx?: Transaction, eventBroadcaster?: TransactionEventBroadcaster): Promise<unknown> {
async begin(options?: { isolationLevel?: IsolationLevel; ctx?: Transaction; eventBroadcaster?: TransactionEventBroadcaster }): Promise<Transaction> {
throw new Error(`Transactions are not supported by current driver`);
}

Expand Down
8 changes: 8 additions & 0 deletions packages/core/src/enums.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,14 @@ export enum LockMode {
PESSIMISTIC_READ_OR_FAIL = 7,
}

export enum IsolationLevel {
READ_UNCOMMITTED = 'read uncommitted',
READ_COMMITTED = 'read committed',
SNAPSHOT = 'snapshot',
REPEATABLE_READ = 'repeatable read',
SERIALIZABLE = 'serializable',
}

export enum EventType {
onInit = 'onInit',
beforeCreate = 'beforeCreate',
Expand Down
5 changes: 4 additions & 1 deletion packages/core/src/unit-of-work/UnitOfWork.ts
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,10 @@ export class UnitOfWork {
const runInTransaction = !this.em.isInTransaction() && platform.supportsTransactions() && this.em.config.get('implicitTransactions');

if (runInTransaction) {
await this.em.getConnection('write').transactional(trx => this.persistToDatabase(groups, trx), oldTx, new TransactionEventBroadcaster(this.em, this));
await this.em.getConnection('write').transactional(trx => this.persistToDatabase(groups, trx), {
ctx: oldTx,
eventBroadcaster: new TransactionEventBroadcaster(this.em, this),
});
} else {
await this.persistToDatabase(groups, this.em.getTransactionContext());
}
Expand Down
24 changes: 12 additions & 12 deletions packages/knex/src/AbstractSqlConnection.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { knex, Knex } from 'knex';
import { readFile } from 'fs-extra';
import {
AnyEntity, Configuration, Connection, ConnectionOptions, EntityData, EventType, QueryResult,
AnyEntity, Configuration, Connection, ConnectionOptions, EntityData, EventType, IsolationLevel, QueryResult,
Transaction, TransactionEventBroadcaster, Utils,
} from '@mikro-orm/core';
import { AbstractSqlPlatform } from './AbstractSqlPlatform';
Expand Down Expand Up @@ -40,31 +40,31 @@ export abstract class AbstractSqlConnection extends Connection {
}
}

async transactional<T>(cb: (trx: Transaction<Knex.Transaction>) => Promise<T>, ctx?: Transaction<Knex.Transaction>, eventBroadcaster?: TransactionEventBroadcaster): Promise<T> {
const trx = await this.begin(ctx, eventBroadcaster);
async transactional<T>(cb: (trx: Transaction<Knex.Transaction>) => Promise<T>, options: { isolationLevel?: IsolationLevel; ctx?: Knex.Transaction; eventBroadcaster?: TransactionEventBroadcaster } = {}): Promise<T> {
const trx = await this.begin(options);

try {
const ret = await cb(trx);
await this.commit(trx, eventBroadcaster);
await this.commit(trx, options.eventBroadcaster);

return ret;
} catch (error) {
await this.rollback(trx, eventBroadcaster);
await this.rollback(trx, options.eventBroadcaster);
throw error;
}
}

async begin(ctx?: Knex.Transaction, eventBroadcaster?: TransactionEventBroadcaster): Promise<Knex.Transaction> {
if (!ctx) {
await eventBroadcaster?.dispatchEvent(EventType.beforeTransactionStart);
async begin(options: { isolationLevel?: IsolationLevel; ctx?: Knex.Transaction; eventBroadcaster?: TransactionEventBroadcaster } = {}): Promise<Knex.Transaction> {
if (!options.ctx) {
await options.eventBroadcaster?.dispatchEvent(EventType.beforeTransactionStart);
}

const trx = await (ctx || this.client).transaction();
const trx = await (options.ctx || this.client).transaction(null, { isolationLevel: options.isolationLevel });

if (!ctx) {
await eventBroadcaster?.dispatchEvent(EventType.afterTransactionStart, trx);
if (!options.ctx) {
await options.eventBroadcaster?.dispatchEvent(EventType.afterTransactionStart, trx);
} else {
trx[parentTransactionSymbol] = ctx;
trx[parentTransactionSymbol] = options.ctx;
}

return trx;
Expand Down
2 changes: 1 addition & 1 deletion packages/migrations/src/MigrationRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ export class MigrationRunner {
migration.setTransactionContext(tx);
const queries = await this.getQueries(migration, method);
await Utils.runSerial(queries, sql => this.driver.execute(sql, undefined, 'all', tx));
}, this.masterTransaction);
}, { ctx: this.masterTransaction });
}
}

Expand Down
20 changes: 10 additions & 10 deletions packages/mongodb/src/MongoConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import {
import { inspect } from 'util';
import {
Connection, ConnectionConfig, QueryResult, Transaction, Utils, QueryOrder, QueryOrderMap,
FilterQuery, AnyEntity, EntityName, Dictionary, EntityData, TransactionEventBroadcaster, EventType,
FilterQuery, AnyEntity, EntityName, Dictionary, EntityData, TransactionEventBroadcaster, EventType, IsolationLevel,
} from '@mikro-orm/core';

export class MongoConnection extends Connection {
Expand Down Expand Up @@ -148,33 +148,33 @@ export class MongoConnection extends Connection {
return this.runQuery<T, number>('countDocuments', collection, undefined, where, ctx);
}

async transactional<T>(cb: (trx: Transaction<ClientSession>) => Promise<T>, ctx?: Transaction<ClientSession>, eventBroadcaster?: TransactionEventBroadcaster): Promise<T> {
const session = await this.begin(ctx, eventBroadcaster);
async transactional<T>(cb: (trx: Transaction<ClientSession>) => Promise<T>, options: { isolationLevel?: IsolationLevel; ctx?: Transaction<ClientSession>; eventBroadcaster?: TransactionEventBroadcaster } = {}): Promise<T> {
const session = await this.begin(options);

try {
const ret = await cb(session);
await this.commit(session, eventBroadcaster);
await this.commit(session, options.eventBroadcaster);

return ret;
} catch (error) {
await this.rollback(session, eventBroadcaster);
await this.rollback(session, options.eventBroadcaster);
throw error;
} finally {
session.endSession();
}
}

async begin(ctx?: ClientSession, eventBroadcaster?: TransactionEventBroadcaster): Promise<ClientSession> {
if (!ctx) {
async begin(options: { isolationLevel?: IsolationLevel; ctx?: ClientSession; eventBroadcaster?: TransactionEventBroadcaster } = {}): Promise<ClientSession> {
if (!options.ctx) {
/* istanbul ignore next */
await eventBroadcaster?.dispatchEvent(EventType.beforeTransactionStart);
await options.eventBroadcaster?.dispatchEvent(EventType.beforeTransactionStart);
}

const session = ctx || this.client.startSession();
const session = options.ctx || this.client.startSession();
session.startTransaction();
this.logQuery('db.begin();');
/* istanbul ignore next */
await eventBroadcaster?.dispatchEvent(EventType.afterTransactionStart, session);
await options.eventBroadcaster?.dispatchEvent(EventType.afterTransactionStart, session);

return session;
}
Expand Down
9 changes: 9 additions & 0 deletions tests/EntityManager.mongo.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,15 @@ describe('EntityManagerMongo', () => {
expect(driver.getConnection().getCollection(BookTag).collectionName).toBe('book-tag');
expect(orm.em.getCollection(BookTag).collectionName).toBe('book-tag');

const conn = driver.getConnection();
const tx = await conn.begin();
const first = await driver.nativeInsert(Publisher.name, { name: 'test 123', type: 'GLOBAL' }, tx);
await conn.commit(tx);

await conn.transactional(async tx => {
await driver.nativeDelete(Publisher.name, first.insertId, tx);
});

// multi inserts
const res = await driver.nativeInsertMany(Publisher.name, [
{ name: 'test 1', type: 'GLOBAL' },
Expand Down
31 changes: 25 additions & 6 deletions tests/EntityManager.mysql.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import chalk from 'chalk';
import {
Collection, Configuration, EntityManager, LockMode, MikroORM, QueryFlag, QueryOrder, Reference, Logger, ValidationError, wrap,
UniqueConstraintViolationException, TableNotFoundException, TableExistsException, SyntaxErrorException,
NonUniqueFieldNameException, InvalidFieldNameException, expr,
NonUniqueFieldNameException, InvalidFieldNameException, expr, IsolationLevel,
} from '@mikro-orm/core';
import { MySqlDriver, MySqlConnection } from '@mikro-orm/mysql';
import { Author2, Book2, BookTag2, FooBar2, FooBaz2, Publisher2, PublisherType, Test2 } from './entities-sql';
Expand Down Expand Up @@ -96,11 +96,11 @@ describe('EntityManagerMySql', () => {
await expect(driver.ensureIndexes()).rejects.toThrowError('MySqlDriver does not use ensureIndexes');

const conn = driver.getConnection();
await conn.transactional(async tx => {
await conn.execute('select 1', [], 'all', tx);
await conn.execute(orm.em.getKnex().raw('select 1'), [], 'all', tx);
await conn.execute(orm.em.getRepository(Author2).getKnex().raw('select 1'), [], 'all', tx);
});
const tx = await conn.begin();
await conn.execute('select 1', [], 'all', tx);
await conn.execute(orm.em.getKnex().raw('select 1'), [], 'all', tx);
await conn.execute(orm.em.getRepository(Author2).getKnex().raw('select 1'), [], 'all', tx);
await conn.commit(tx);

// multi inserts
const res = await driver.nativeInsertMany(Publisher2.name, [
Expand Down Expand Up @@ -334,6 +334,25 @@ describe('EntityManagerMySql', () => {
}
});

test('transactions with isolation levels', async () => {
const mock = jest.fn();
const logger = new Logger(mock, ['query']);
Object.assign(orm.config, { logger });

const god1 = new Author2('God1', 'hello@heaven1.god');
try {
await orm.em.transactional(async em => {
await em.persistAndFlush(god1);
throw new Error(); // rollback the transaction
}, { isolationLevel: IsolationLevel.READ_UNCOMMITTED });
} catch { }

expect(mock.mock.calls[0][0]).toMatch('set transaction isolation level read uncommitted');
expect(mock.mock.calls[1][0]).toMatch('begin');
expect(mock.mock.calls[2][0]).toMatch('insert into `author2` (`created_at`, `email`, `name`, `terms_accepted`, `updated_at`) values (?, ?, ?, ?, ?)');
expect(mock.mock.calls[3][0]).toMatch('rollback');
});

test('nested transactions with save-points', async () => {
await orm.em.transactional(async em => {
const god1 = new Author2('God1', 'hello1@heaven.god');
Expand Down
20 changes: 19 additions & 1 deletion tests/EntityManager.postgre.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { v4 } from 'uuid';
import {
Collection, Configuration, EntityManager, LockMode, MikroORM, QueryFlag, QueryOrder, Reference, Logger, ValidationError, ChangeSetType, wrap, expr,
UniqueConstraintViolationException, TableNotFoundException, NotNullConstraintViolationException, TableExistsException, SyntaxErrorException,
NonUniqueFieldNameException, InvalidFieldNameException, EventSubscriber, ChangeSet, AnyEntity, FlushEventArgs, LoadStrategy,
NonUniqueFieldNameException, InvalidFieldNameException, EventSubscriber, ChangeSet, AnyEntity, FlushEventArgs, LoadStrategy, IsolationLevel,
} from '@mikro-orm/core';
import { PostgreSqlDriver, PostgreSqlConnection } from '@mikro-orm/postgresql';
import { Address2, Author2, Book2, BookTag2, FooBar2, FooBaz2, Publisher2, PublisherType, PublisherType2, Test2, Label2 } from './entities-sql';
Expand Down Expand Up @@ -214,6 +214,24 @@ describe('EntityManagerPostgre', () => {
}
});

test('transactions with isolation levels', async () => {
const mock = jest.fn();
const logger = new Logger(mock, ['query']);
Object.assign(orm.config, { logger });

const god1 = new Author2('God1', 'hello@heaven1.god');
try {
await orm.em.transactional(async em => {
await em.persistAndFlush(god1);
throw new Error(); // rollback the transaction
}, { isolationLevel: IsolationLevel.READ_UNCOMMITTED });
} catch { }

expect(mock.mock.calls[0][0]).toMatch('begin isolation level read uncommitted');
expect(mock.mock.calls[1][0]).toMatch('insert into "author2" ("created_at", "email", "name", "terms_accepted", "updated_at") values ($1, $2, $3, $4, $5) returning "id", "created_at", "updated_at", "age", "terms_accepted"');
expect(mock.mock.calls[2][0]).toMatch('rollback');
});

test('nested transactions with save-points', async () => {
await orm.em.transactional(async em => {
const god1 = new Author2('God1', 'hello1@heaven.god');
Expand Down

0 comments on commit 6ae5fbf

Please sign in to comment.