diff --git a/packages/core/src/EntityManager.ts b/packages/core/src/EntityManager.ts index 53477d353a01..7175dd930589 100644 --- a/packages/core/src/EntityManager.ts +++ b/packages/core/src/EntityManager.ts @@ -8,7 +8,7 @@ import { AnyEntity, Dictionary, EntityData, EntityMetadata, EntityName, FilterDe import { LoadStrategy, LockMode, QueryOrderMap, ReferenceType, SCALAR_TYPES } from './enums'; import { MetadataStorage } from './metadata'; import { Transaction } from './connections'; -import { EventManager } from './events'; +import { EventManager, TransactionEventBroadcaster } from './events'; import { EntityComparator } from './utils/EntityComparator'; import { OptimisticLockError, ValidationError } from './errors'; @@ -335,7 +335,7 @@ export class EntityManager { await em.flush(); return ret; - }, ctx); + }, ctx, new TransactionEventBroadcaster(em)); }); } @@ -343,7 +343,7 @@ export class EntityManager { * Starts new transaction bound to this EntityManager. Use `ctx` parameter to provide the parent when nesting transactions. */ async begin(ctx?: Transaction): Promise { - this.transactionContext = await this.getConnection('write').begin(ctx); + this.transactionContext = await this.getConnection('write').begin(ctx, new TransactionEventBroadcaster(this)); } /** @@ -351,7 +351,7 @@ export class EntityManager { */ async commit(): Promise { await this.flush(); - await this.getConnection('write').commit(this.transactionContext); + await this.getConnection('write').commit(this.transactionContext, new TransactionEventBroadcaster(this)); delete this.transactionContext; } @@ -359,7 +359,7 @@ export class EntityManager { * Rollbacks the transaction bound to this EntityManager. */ async rollback(): Promise { - await this.getConnection('write').rollback(this.transactionContext); + await this.getConnection('write').rollback(this.transactionContext, new TransactionEventBroadcaster(this)); delete this.transactionContext; } diff --git a/packages/core/src/connections/Connection.ts b/packages/core/src/connections/Connection.ts index 576220c23929..601e52c5bc16 100644 --- a/packages/core/src/connections/Connection.ts +++ b/packages/core/src/connections/Connection.ts @@ -5,6 +5,7 @@ import { Configuration, ConnectionOptions, Utils } from '../utils'; import { MetadataStorage } from '../metadata'; import { Dictionary } from '../typings'; import { Platform } from '../platforms/Platform'; +import { TransactionEventBroadcaster } from '../events/TransactionEventBroadcaster'; export abstract class Connection { @@ -41,19 +42,19 @@ export abstract class Connection { */ abstract getDefaultClientUrl(): string; - async transactional(cb: (trx: Transaction) => Promise, ctx?: Transaction): Promise { + async transactional(cb: (trx: Transaction) => Promise, ctx?: Transaction, eventBroadcaster?: TransactionEventBroadcaster): Promise { throw new Error(`Transactions are not supported by current driver`); } - async begin(ctx?: Transaction): Promise { + async begin(ctx?: Transaction, eventBroadcaster?: TransactionEventBroadcaster): Promise { throw new Error(`Transactions are not supported by current driver`); } - async commit(ctx: Transaction): Promise { + async commit(ctx: Transaction, eventBroadcaster?: TransactionEventBroadcaster): Promise { throw new Error(`Transactions are not supported by current driver`); } - async rollback(ctx: Transaction): Promise { + async rollback(ctx: Transaction, eventBroadcaster?: TransactionEventBroadcaster): Promise { throw new Error(`Transactions are not supported by current driver`); } diff --git a/packages/core/src/enums.ts b/packages/core/src/enums.ts index adbb6f9dd808..f70b08065137 100644 --- a/packages/core/src/enums.ts +++ b/packages/core/src/enums.ts @@ -103,4 +103,12 @@ export enum EventType { beforeFlush = 'beforeFlush', onFlush = 'onFlush', afterFlush = 'afterFlush', + beforeTransactionStart = 'beforeTransactionStart', + afterTransactionStart = 'afterTransactionStart', + beforeTransactionCommit = 'beforeTransactionCommit', + afterTransactionCommit = 'afterTransactionCommit', + beforeTransactionRollback = 'beforeTransactionRollback', + afterTransactionRollback = 'afterTransactionRollback', } + +export type TransactionEventType = EventType.beforeTransactionStart | EventType.afterTransactionStart | EventType.beforeTransactionCommit | EventType.afterTransactionCommit | EventType.beforeTransactionRollback | EventType.afterTransactionRollback; diff --git a/packages/core/src/events/EventManager.ts b/packages/core/src/events/EventManager.ts index 1f6c732e46c6..1895f545777b 100644 --- a/packages/core/src/events/EventManager.ts +++ b/packages/core/src/events/EventManager.ts @@ -1,7 +1,7 @@ import { AnyEntity, EntityMetadata } from '../typings'; -import { EventArgs, EventSubscriber, FlushEventArgs } from './EventSubscriber'; +import { EventArgs, EventSubscriber, FlushEventArgs, TransactionEventArgs } from './EventSubscriber'; import { Utils } from '../utils'; -import { EventType } from '../enums'; +import { EventType, TransactionEventType } from '../enums'; export class EventManager { @@ -22,9 +22,10 @@ export class EventManager { }); } + dispatchEvent>(event: TransactionEventType, args: TransactionEventArgs): unknown; dispatchEvent>(event: EventType.onInit, args: Partial>): unknown; dispatchEvent>(event: EventType, args: Partial | FlushEventArgs>): Promise; - dispatchEvent>(event: EventType, args: Partial | FlushEventArgs>): Promise | unknown { + dispatchEvent>(event: EventType, args: Partial | FlushEventArgs | TransactionEventArgs>): Promise | unknown { const listeners: [EventType, EventSubscriber][] = []; const entity: T = (args as EventArgs).entity; @@ -41,10 +42,10 @@ export class EventManager { } if (event === EventType.onInit) { - return listeners.forEach(listener => listener[1][listener[0]]!(args as (EventArgs & FlushEventArgs))); + return listeners.forEach(listener => listener[1][listener[0]]!(args as (EventArgs & FlushEventArgs & TransactionEventArgs))); } - return Utils.runSerial(listeners, listener => listener[1][listener[0]]!(args as (EventArgs & FlushEventArgs)) as Promise); + return Utils.runSerial(listeners, listener => listener[1][listener[0]]!(args as (EventArgs & FlushEventArgs & TransactionEventArgs)) as Promise); } hasListeners>(event: EventType, meta: EntityMetadata): boolean { diff --git a/packages/core/src/events/EventSubscriber.ts b/packages/core/src/events/EventSubscriber.ts index 9c6a3db7af3f..c70524bb4a0e 100644 --- a/packages/core/src/events/EventSubscriber.ts +++ b/packages/core/src/events/EventSubscriber.ts @@ -1,6 +1,7 @@ import { EntityName } from '../typings'; import { EntityManager } from '../EntityManager'; import { ChangeSet, UnitOfWork } from '../unit-of-work'; +import { Transaction } from '../connections'; export interface EventArgs { entity: T; @@ -12,6 +13,11 @@ export interface FlushEventArgs extends Omit, 'entity'> { uow: UnitOfWork; } +export interface TransactionEventArgs extends Omit, 'entity' | 'changeSet'> { + transaction?: Transaction; + uow?: UnitOfWork; +} + export interface EventSubscriber { getSubscribedEntities?(): EntityName[]; onInit?(args: EventArgs): void; @@ -24,4 +30,11 @@ export interface EventSubscriber { beforeFlush?(args: FlushEventArgs): Promise; onFlush?(args: FlushEventArgs): Promise; afterFlush?(args: FlushEventArgs): Promise; + + beforeTransactionStart?(args: TransactionEventArgs): Promise; + afterTransactionStart?(args: TransactionEventArgs): Promise; + beforeTransactionCommit?(args: TransactionEventArgs): Promise; + afterTransactionCommit?(args: TransactionEventArgs): Promise; + beforeTransactionRollback?(args: TransactionEventArgs): Promise; + afterTransactionRollback?(args: TransactionEventArgs): Promise; } diff --git a/packages/core/src/events/TransactionEventBroadcaster.ts b/packages/core/src/events/TransactionEventBroadcaster.ts new file mode 100644 index 000000000000..13237f30b297 --- /dev/null +++ b/packages/core/src/events/TransactionEventBroadcaster.ts @@ -0,0 +1,17 @@ +import { Transaction } from '../connections'; +import { EntityManager } from '../EntityManager'; +import { TransactionEventType } from '../enums'; +import { UnitOfWork } from '../unit-of-work'; + +export class TransactionEventBroadcaster { + + constructor( + private entityManager: EntityManager, + private uow?: UnitOfWork + ) {} + + async dispatchEvent(event: TransactionEventType, transaction?: Transaction) { + await this.entityManager.getEventManager().dispatchEvent(event, { em: this.entityManager, transaction, uow: this.uow }); + } + +} diff --git a/packages/core/src/events/index.ts b/packages/core/src/events/index.ts index f629c0df9cdb..8e82421d63b2 100644 --- a/packages/core/src/events/index.ts +++ b/packages/core/src/events/index.ts @@ -1,2 +1,3 @@ export * from './EventSubscriber'; export * from './EventManager'; +export * from './TransactionEventBroadcaster'; diff --git a/packages/core/src/unit-of-work/UnitOfWork.ts b/packages/core/src/unit-of-work/UnitOfWork.ts index 1c3f3d80f0bf..b94f902774d1 100644 --- a/packages/core/src/unit-of-work/UnitOfWork.ts +++ b/packages/core/src/unit-of-work/UnitOfWork.ts @@ -9,6 +9,7 @@ import { EntityManager } from '../EntityManager'; import { Cascade, EventType, LockMode, ReferenceType } from '../enums'; import { OptimisticLockError, ValidationError } from '../errors'; import { Transaction } from '../connections'; +import { TransactionEventBroadcaster } from '../events'; import { IdentityMap } from './IdentityMap'; export class UnitOfWork { @@ -223,7 +224,7 @@ export class UnitOfWork { const runInTransaction = !this.em.isInTransaction() && platform.supportsTransactions() && this.em.config.get('implicitTransactions'); if (runInTransaction) { - await this.em.getConnection('write').transactional(trx => this.persistToDatabase(groups, trx)); + await this.em.getConnection('write').transactional(trx => this.persistToDatabase(groups, trx), undefined, new TransactionEventBroadcaster(this.em, this)); } else { await this.persistToDatabase(groups, this.em.getTransactionContext()); } diff --git a/packages/knex/src/AbstractSqlConnection.ts b/packages/knex/src/AbstractSqlConnection.ts index 04c8fc572f82..c2c188014146 100644 --- a/packages/knex/src/AbstractSqlConnection.ts +++ b/packages/knex/src/AbstractSqlConnection.ts @@ -1,8 +1,17 @@ import Knex, { Config, QueryBuilder, Raw, Client, Transaction as KnexTransaction } from 'knex'; import { readFile } from 'fs-extra'; -import { AnyEntity, Configuration, Connection, ConnectionOptions, EntityData, QueryResult, Transaction, Utils } from '@mikro-orm/core'; +import { + AnyEntity, Configuration, Connection, ConnectionOptions, EntityData, EventType, QueryResult, + Transaction, TransactionEventBroadcaster, Utils, +} from '@mikro-orm/core'; import { AbstractSqlPlatform } from './AbstractSqlPlatform'; +const parentTransactionSymbol = Symbol('parentTransaction'); + +function isRootTransaction(trx: Transaction) { + return !Object.getOwnPropertySymbols(trx).includes(parentTransactionSymbol); +} + export abstract class AbstractSqlConnection extends Connection { protected platform!: AbstractSqlPlatform; @@ -30,21 +39,52 @@ export abstract class AbstractSqlConnection extends Connection { } } - async transactional(cb: (trx: Transaction) => Promise, ctx?: Transaction): Promise { - return (ctx || this.client).transaction(cb); + async transactional(cb: (trx: Transaction) => Promise, ctx?: Transaction, eventBroadcaster?: TransactionEventBroadcaster): Promise { + const trx = await this.begin(ctx, eventBroadcaster); + try { + const ret = await cb(trx); + await this.commit(trx, eventBroadcaster); + return ret; + } catch (error) { + await this.rollback(trx, eventBroadcaster); + throw error; + } } - async begin(ctx?: KnexTransaction): Promise { - return (ctx || this.client).transaction(); + async begin(ctx?: KnexTransaction, eventBroadcaster?: TransactionEventBroadcaster): Promise { + if (!ctx) { + await eventBroadcaster?.dispatchEvent(EventType.beforeTransactionStart); + } + const trx = await (ctx || this.client).transaction(); + if (!ctx) { + await eventBroadcaster?.dispatchEvent(EventType.afterTransactionStart, trx); + } else { + trx[parentTransactionSymbol] = ctx; + } + return trx; } - async commit(ctx: KnexTransaction): Promise { + async commit(ctx: KnexTransaction, eventBroadcaster?: TransactionEventBroadcaster): Promise { + const runTrxHooks = isRootTransaction(ctx); + if (runTrxHooks) { + await eventBroadcaster?.dispatchEvent(EventType.beforeTransactionCommit, ctx); + } ctx.commit(); - return ctx.executionPromise; // https://github.com/knex/knex/issues/3847#issuecomment-626330453 + await ctx.executionPromise; // https://github.com/knex/knex/issues/3847#issuecomment-626330453 + if (runTrxHooks) { + await eventBroadcaster?.dispatchEvent(EventType.afterTransactionCommit, ctx); + } } - async rollback(ctx: KnexTransaction): Promise { - return ctx.rollback(); + async rollback(ctx: KnexTransaction, eventBroadcaster?: TransactionEventBroadcaster): Promise { + const runTrxHooks = isRootTransaction(ctx); + if (runTrxHooks) { + await eventBroadcaster?.dispatchEvent(EventType.beforeTransactionRollback, ctx); + } + await ctx.rollback(); + if (runTrxHooks) { + await eventBroadcaster?.dispatchEvent(EventType.afterTransactionRollback, ctx); + } } async execute | EntityData[] = EntityData[]>(queryOrKnex: string | QueryBuilder | Raw, params: any[] = [], method: 'all' | 'get' | 'run' = 'all', ctx?: Transaction): Promise { diff --git a/packages/mongodb/src/MongoConnection.ts b/packages/mongodb/src/MongoConnection.ts index 9cb531ee7e52..f696d44868a1 100644 --- a/packages/mongodb/src/MongoConnection.ts +++ b/packages/mongodb/src/MongoConnection.ts @@ -5,7 +5,7 @@ import { import { inspect } from 'util'; import { Connection, ConnectionConfig, QueryResult, Transaction, Utils, QueryOrder, QueryOrderMap, - FilterQuery, AnyEntity, EntityName, Dictionary, EntityData, + FilterQuery, AnyEntity, EntityName, Dictionary, EntityData, TransactionEventBroadcaster, EventType, } from '@mikro-orm/core'; export class MongoConnection extends Connection { @@ -148,39 +148,50 @@ export class MongoConnection extends Connection { return this.runQuery('countDocuments', collection, undefined, where, ctx); } - async transactional(cb: (trx: Transaction) => Promise, ctx?: Transaction): Promise { - const session = ctx || this.client.startSession(); - let ret: T = null as unknown as T; - + async transactional(cb: (trx: Transaction) => Promise, ctx?: Transaction, eventBroadcaster?: TransactionEventBroadcaster): Promise { + const session = await this.begin(ctx, eventBroadcaster); try { - this.logQuery('db.begin();'); - await session.withTransaction(async () => ret = await cb(session)); + const ret = await cb(session); + await this.commit(session, eventBroadcaster); + return ret; + } catch (error) { + await this.rollback(session, eventBroadcaster); + throw error; + } finally { session.endSession(); - this.logQuery('db.commit();'); - } catch (e) { - this.logQuery('db.rollback();'); - throw e; } - - return ret; } - async begin(ctx?: ClientSession): Promise { + async begin(ctx?: ClientSession, eventBroadcaster?: TransactionEventBroadcaster): Promise { + if (!ctx) { + /* istanbul ignore next */ + await eventBroadcaster?.dispatchEvent(EventType.beforeTransactionStart); + } const session = ctx || this.client.startSession(); session.startTransaction(); this.logQuery('db.begin();'); + /* istanbul ignore next */ + await eventBroadcaster?.dispatchEvent(EventType.afterTransactionStart, session); return session; } - async commit(ctx: ClientSession): Promise { + async commit(ctx: ClientSession, eventBroadcaster?: TransactionEventBroadcaster): Promise { + /* istanbul ignore next */ + await eventBroadcaster?.dispatchEvent(EventType.beforeTransactionCommit, ctx); await ctx.commitTransaction(); this.logQuery('db.commit();'); + /* istanbul ignore next */ + await eventBroadcaster?.dispatchEvent(EventType.afterTransactionCommit, ctx); } - async rollback(ctx: ClientSession): Promise { + async rollback(ctx: ClientSession, eventBroadcaster?: TransactionEventBroadcaster): Promise { + /* istanbul ignore next */ + await eventBroadcaster?.dispatchEvent(EventType.beforeTransactionRollback, ctx); await ctx.abortTransaction(); this.logQuery('db.rollback();'); + /* istanbul ignore next */ + await eventBroadcaster?.dispatchEvent(EventType.afterTransactionRollback, ctx); } protected logQuery(query: string, took?: number): void { diff --git a/tests/issues/GH1175.test.ts b/tests/issues/GH1175.test.ts new file mode 100644 index 000000000000..1d7a7cf720a0 --- /dev/null +++ b/tests/issues/GH1175.test.ts @@ -0,0 +1,733 @@ +import { + EntityManager, + Entity, + MikroORM, + PrimaryKey, + Property, + EventSubscriber, + EventArgs, + TransactionEventArgs, + Transaction, + UnitOfWork, + Unique, +} from '@mikro-orm/core'; +import { MongoDriver, ObjectId } from '@mikro-orm/mongodb'; +import { PostgreSqlDriver } from '@mikro-orm/postgresql'; +import { v4 as uuid } from 'uuid'; + +class UserSubscriber implements EventSubscriber { + + pendingActions = new Map void | Promise)[]>(); + + async beforeTransactionStart(args: TransactionEventArgs) { + // + } + + async afterTransactionStart(args: TransactionEventArgs) { + const { em } = args; + this.pendingActions.set(em, []); + } + + async afterTransactionCommit(args: TransactionEventArgs) { + const { em } = args; + const actions = this.pendingActions.get(em); + if (actions) { + for (const action of actions) { + await action(); + } + } + this.pendingActions.delete(em); + } + + async afterTransactionRollback(args: TransactionEventArgs) { + const { em } = args; + this.pendingActions.delete(em); + } + + async afterCreate(args: EventArgs) { + const { em } = args; + this.pendingActions.get(em)?.push(() => { + this.afterCommitCreate(args); + }); + } + + async afterCommitCreate(args: EventArgs) { + // + } + +} + +describe('GH issue 1175', () => { + let em: EntityManager; + const testSubscriber = new UserSubscriber(); + const afterCreate = jest.spyOn(testSubscriber, 'afterCreate'); + const beforeTransactionStart = jest.spyOn(testSubscriber, 'beforeTransactionStart'); + const afterTransactionStart = jest.spyOn(testSubscriber, 'afterTransactionStart'); + const afterTransactionCommit = jest.spyOn(testSubscriber, 'afterTransactionCommit'); + const afterTransactionRollback = jest.spyOn(testSubscriber, 'afterTransactionRollback'); + const afterCommitCreate = jest.spyOn(testSubscriber, 'afterCommitCreate'); + + afterEach(() => { + beforeTransactionStart.mockClear(); + afterTransactionStart.mockClear(); + afterCreate.mockClear(); + afterTransactionCommit.mockClear(); + afterCommitCreate.mockClear(); + afterTransactionRollback.mockClear(); + }); + + describe('sql', () => { + @Entity({ tableName: 'users' }) + class User { + + @PrimaryKey() + id!: number; + + @Property() + readonly username: string; + + constructor(username: string) { + this.username = username; + } + + } + + async function getOrmInstance(subscriber?: EventSubscriber): Promise> { + const orm = await MikroORM.init({ + entities: [User], + dbName: 'mikro_orm_test_gh_1175', + type: 'postgresql', + subscribers: subscriber ? [subscriber] : [], + }); + + return orm as MikroORM; + } + + let orm: MikroORM; + + beforeAll(async () => { + orm = await getOrmInstance(testSubscriber); + await orm.getSchemaGenerator().dropDatabase('mikro_orm_test_gh_1175'); + await orm.getSchemaGenerator().createDatabase('mikro_orm_test_gh_1175'); + }); + + afterAll(async () => { + await orm.close(); + }); + + beforeEach(() => { + em = orm.em.fork(); + }); + + describe('immediate constraints (failures on insert)', () => { + beforeAll(async () => { + const orm = await getOrmInstance(); + await orm.em.getConnection().execute( + ` + drop table if exists users; + create table users ( + id serial primary key, + username varchar(50) not null unique deferrable initially immediate + ); + ` + ); + await orm.close(); + }); + describe('implicit transactions', () => { + let username: string; + it('afterCommitCreate called when transaction succeeds', async () => { + username = uuid(); + const user = new User(username); + em.persist(user); + + await expect(em.flush()).resolves.toBeUndefined(); + expect(afterCreate).toHaveBeenCalledTimes(1); + expect(afterTransactionCommit).toHaveBeenCalledTimes(1); + expect(afterTransactionCommit).toHaveBeenCalledWith(expect.objectContaining({ em, uow: expect.any(UnitOfWork) })); + expect(afterCommitCreate).toHaveBeenCalledTimes(1); + expect(afterCommitCreate).toHaveBeenCalledWith(expect.objectContaining({ entity: expect.objectContaining({ username }) })); + expect(afterTransactionRollback).toHaveBeenCalledTimes(0); + }); + it('afterCommitCreate not called when transaction fails', async () => { + const user = new User(username); + em.persist(user); + + await expect(em.flush()).rejects.toThrowError( + /^insert.+duplicate key value/ + ); + expect(afterCreate).toHaveBeenCalledTimes(0); + expect(afterTransactionCommit).toHaveBeenCalledTimes(0); + expect(afterCommitCreate).toHaveBeenCalledTimes(0); + expect(afterTransactionRollback).toHaveBeenCalledTimes(1); + }); + }); + + describe('explicit transactions with "transactional()"', () => { + let username: string; + it('afterCommitCreate called when transaction succeeds', async () => { + const work = em.transactional(async em => { + username = uuid(); + const user = new User(username); + em.persist(user); + }); + + await expect(work).resolves.toBeUndefined(); + expect(afterCreate).toHaveBeenCalledTimes(1); + expect(afterTransactionCommit).toHaveBeenCalledTimes(1); + expect(afterCommitCreate).toHaveBeenCalledTimes(1); + expect(afterCommitCreate).toHaveBeenCalledWith(expect.objectContaining({ entity: expect.objectContaining({ username }) })); + expect(afterTransactionRollback).toHaveBeenCalledTimes(0); + }); + it('afterCommitCreate not called when transaction fails', async () => { + const work = em.transactional(async em => { + const user = new User(username); + em.persist(user); + }); + + await expect(work).rejects.toThrowError(/^insert.+duplicate key value/); + expect(afterCreate).toHaveBeenCalledTimes(0); + expect(afterTransactionCommit).toHaveBeenCalledTimes(0); + expect(afterCommitCreate).toHaveBeenCalledTimes(0); + expect(afterTransactionRollback).toHaveBeenCalledTimes(1); + }); + describe('nested transactions', () => { + it('inner and outer afterCommitCreate called', async () => { + let em1: EntityManager; + let trx1: Transaction; + const work = async () => { + await em.transactional(async em => { + em1 = em; + trx1 = em.getTransactionContext(); + username = 'nested1'; + const user = new User(username); + em.persist(user); + await em.transactional(async em => { + username = 'nested2'; + const user = new User(username); + em.persist(user); + }); + }); + }; + await expect(work()).resolves.toBeUndefined(); + expect(beforeTransactionStart).toHaveBeenCalledTimes(1); + expect(beforeTransactionStart).toHaveBeenCalledWith(expect.objectContaining({ em: em1!, transaction: undefined })); + expect(afterTransactionStart).toHaveBeenCalledTimes(1); + expect(afterTransactionStart).toHaveBeenCalledWith(expect.objectContaining({ em: em1!, transaction: trx1 })); + expect(afterCreate).toHaveBeenCalledTimes(2); + expect(afterCreate).toHaveBeenNthCalledWith(1, expect.objectContaining({ em: em1!, entity: expect.objectContaining({ username: 'nested1' }) })); + expect(afterCreate).toHaveBeenNthCalledWith(2, expect.objectContaining({ em: em1!, entity: expect.objectContaining({ username: 'nested2' }) })); + expect(afterTransactionCommit).toHaveBeenCalledTimes(1); + expect(afterTransactionCommit).toHaveBeenCalledWith(expect.objectContaining({ em: em1!, transaction: trx1 })); + expect(afterCommitCreate).toHaveBeenCalledTimes(2); + expect(afterCommitCreate).toHaveBeenNthCalledWith(1, expect.objectContaining({ em: em1!, entity: expect.objectContaining({ username: 'nested1' }) })); + expect(afterCommitCreate).toHaveBeenNthCalledWith(2, expect.objectContaining({ em: em1!, entity: expect.objectContaining({ username: 'nested2' }) })); + expect(afterTransactionRollback).toHaveBeenCalledTimes(0); + }); + it('no afterCommitCreate called if inner transaction fails', async () => { + const username = uuid(); + let em1: EntityManager; + let trx1: Transaction; + const work = async () => { + await em.transactional(async em => { + em1 = em; + trx1 = em.getTransactionContext(); + const user = new User(username); + em.persist(user); + await em.transactional(async em => { + const user = new User(username); + em.persist(user); + }); + }); + }; + await expect(work()).rejects.toThrowError(/^insert.+duplicate key value/); + expect(beforeTransactionStart).toHaveBeenCalledTimes(1); + expect(beforeTransactionStart).toHaveBeenCalledWith(expect.objectContaining({ em: em1!, transaction: undefined })); + expect(afterTransactionStart).toHaveBeenCalledTimes(1); + expect(afterTransactionStart).toHaveBeenCalledWith(expect.objectContaining({ em: em1!, transaction: trx1 })); + expect(afterCreate).toHaveBeenCalledTimes(0); + expect(afterTransactionCommit).toHaveBeenCalledTimes(0); + expect(afterCommitCreate).toHaveBeenCalledTimes(0); + expect(afterTransactionRollback).toHaveBeenCalledTimes(1); + expect(afterTransactionRollback).toHaveBeenCalledWith(expect.objectContaining({ em: em1!, transaction: trx1 })); + }); + it('no afterCommitCreate called if outer transaction fails before running inner transaction', async () => { + let em1: EntityManager; + let trx1: Transaction; + const work = async () => { + await em.transactional(async em => { + em1 = em; + trx1 = em.getTransactionContext(); + const user = new User(username); + em.persist(user); + await em.flush(); + await em.transactional(async em => { + const user = new User(username); + em.persist(user); + }); + }); + }; + await expect(work()).rejects.toThrowError(/^insert.+duplicate key value/); + expect(beforeTransactionStart).toHaveBeenCalledTimes(1); + expect(beforeTransactionStart).toHaveBeenCalledWith(expect.objectContaining({ em: em1!, transaction: undefined })); + expect(afterTransactionStart).toHaveBeenCalledTimes(1); + expect(afterTransactionStart).toHaveBeenCalledWith(expect.objectContaining({ em: em1!, transaction: trx1 })); + expect(afterCreate).toHaveBeenCalledTimes(0); + expect(afterTransactionCommit).toHaveBeenCalledTimes(0); + expect(afterCommitCreate).toHaveBeenCalledTimes(0); + expect(afterTransactionRollback).toHaveBeenCalledTimes(1); + expect(afterTransactionRollback).toHaveBeenCalledWith(expect.objectContaining({ em: em1!, transaction: trx1 })); + }); + it('no afterCommitCreate called if outer transaction fails after running inner transaction', async () => { + const username = uuid(); + let em1: EntityManager; + let trx1: Transaction; + const work = async () => { + await em.transactional(async em => { + em1 = em; + trx1 = em.getTransactionContext(); + await em.transactional(async em => { + const user = new User(username); + em.persist(user); + }); + const user = new User(username); + em.persist(user); + }); + }; + await expect(work()).rejects.toThrowError(/^insert.+duplicate key value/); + expect(beforeTransactionStart).toHaveBeenCalledTimes(1); + expect(beforeTransactionStart).toHaveBeenCalledWith(expect.objectContaining({ em: em1!, transaction: undefined })); + expect(afterTransactionStart).toHaveBeenCalledTimes(1); + expect(afterTransactionStart).toHaveBeenCalledWith(expect.objectContaining({ em: em1!, transaction: trx1 })); + expect(afterCreate).toHaveBeenCalledTimes(1); + expect(afterTransactionCommit).toHaveBeenCalledTimes(0); + expect(afterCommitCreate).toHaveBeenCalledTimes(0); + expect(afterTransactionRollback).toHaveBeenCalledTimes(1); + expect(afterTransactionRollback).toHaveBeenCalledWith(expect.objectContaining({ em: em1!, transaction: trx1 })); + }); + }); + }); + + describe('explicit transactions with explicit begin/commit method calls', () => { + let username: string; + it('creating a new user succeeds', async () => { + await em.begin(); + username = uuid(); + const user = new User(username); + em.persist(user); + + await expect(em.commit()).resolves.toBeUndefined(); + expect(afterCreate).toHaveBeenCalledTimes(1); + expect(afterTransactionCommit).toHaveBeenCalledTimes(1); + expect(afterCommitCreate).toHaveBeenCalledTimes(1); + expect(afterCommitCreate).toHaveBeenCalledWith(expect.objectContaining({ entity: expect.objectContaining({ username }) })); + expect(afterTransactionRollback).toHaveBeenCalledTimes(0); + }); + it('afterCreate hook is called when commit() fails', async () => { + await em.begin(); + const work = async () => { + try { + const user = new User(username); + em.persist(user); + await em.commit(); + } catch (error) { + await em.rollback(); + throw error; + } + }; + + await expect(work).rejects.toThrowError(/^insert.+duplicate key value/); + expect(afterCreate).toHaveBeenCalledTimes(0); + expect(afterTransactionCommit).toHaveBeenCalledTimes(0); + expect(afterCommitCreate).toHaveBeenCalledTimes(0); + expect(afterTransactionRollback).toHaveBeenCalledTimes(1); + }); + }); + }); + + describe('deferred constraints (failures on commit)', () => { + beforeAll(async () => { + const orm = await getOrmInstance(); + await orm.em.getConnection().execute( + ` + drop table if exists users; + create table users ( + id serial primary key, + username varchar(50) not null unique deferrable initially deferred + ); + ` + ); + await orm.close(); + }); + + describe('implicit transactions', () => { + let username: string; + it('creating a new user succeeds', async () => { + username = uuid(); + const user = new User(username); + em.persist(user); + + await expect(em.flush()).resolves.toBeUndefined(); + expect(afterCreate).toHaveBeenCalledTimes(1); + expect(afterTransactionCommit).toHaveBeenCalledTimes(1); + expect(afterTransactionCommit).toHaveBeenCalledWith(expect.objectContaining({ em, uow: expect.any(UnitOfWork) })); + expect(afterCommitCreate).toHaveBeenCalledTimes(1); + expect(afterCommitCreate).toHaveBeenCalledWith(expect.objectContaining({ entity: expect.objectContaining({ username }) })); + expect(afterTransactionRollback).toHaveBeenCalledTimes(0); + }); + it('afterCreate hook is called when flush fails', async () => { + const user = new User(username); + em.persist(user); + + await expect(em.flush()).rejects.toThrowError( + /^COMMIT.+duplicate key value/ + ); + expect(afterCreate).toHaveBeenCalledTimes(1); + expect(afterTransactionCommit).toHaveBeenCalledTimes(0); + expect(afterCommitCreate).toHaveBeenCalledTimes(0); + expect(afterTransactionRollback).toHaveBeenCalledTimes(1); + }); + }); + + describe('explicit transactions with "transactional()"', () => { + let username: string; + it('creating a new user succeeds', async () => { + const work = em.transactional(async em => { + username = uuid(); + const user = new User(username); + em.persist(user); + }); + + await expect(work).resolves.toBeUndefined(); + expect(afterCreate).toHaveBeenCalledTimes(1); + expect(afterTransactionCommit).toHaveBeenCalledTimes(1); + expect(afterCommitCreate).toHaveBeenCalledTimes(1); + expect(afterCommitCreate).toHaveBeenCalledWith(expect.objectContaining({ entity: expect.objectContaining({ username }) })); + expect(afterTransactionRollback).toHaveBeenCalledTimes(0); + }); + it('afterCreate hook is called when transactional() fails', async () => { + const work = async () => { + await em.transactional(async em => { + const user = new User(username); + em.persist(user); + }); + }; + + await expect(work).rejects.toThrowError(/^COMMIT.+duplicate key value/); + expect(afterCreate).toHaveBeenCalledTimes(1); + expect(afterTransactionCommit).toHaveBeenCalledTimes(0); + expect(afterCommitCreate).toHaveBeenCalledTimes(0); + expect(afterTransactionRollback).toHaveBeenCalledTimes(1); + }); + describe('nested transactions', () => { + it('inner and outer afterCommitCreate called', async () => { + let em1: EntityManager; + let trx1: Transaction; + const work = async () => { + await em.transactional(async em => { + em1 = em; + trx1 = em.getTransactionContext(); + username = 'dnested1'; + const user = new User(username); + em.persist(user); + await em.transactional(async em => { + username = 'dnested2'; + const user = new User(username); + em.persist(user); + }); + }); + }; + await expect(work()).resolves.toBeUndefined(); + expect(beforeTransactionStart).toHaveBeenCalledTimes(1); + expect(beforeTransactionStart).toHaveBeenCalledWith(expect.objectContaining({ em: em1!, transaction: undefined })); + expect(afterTransactionStart).toHaveBeenCalledTimes(1); + expect(afterTransactionStart).toHaveBeenCalledWith(expect.objectContaining({ em: em1!, transaction: trx1 })); + expect(afterCreate).toHaveBeenCalledTimes(2); + expect(afterCreate).toHaveBeenNthCalledWith(1, expect.objectContaining({ em: em1!, entity: expect.objectContaining({ username: 'dnested1' }) })); + expect(afterCreate).toHaveBeenNthCalledWith(2, expect.objectContaining({ em: em1!, entity: expect.objectContaining({ username: 'dnested2' }) })); + expect(afterTransactionCommit).toHaveBeenCalledTimes(1); + expect(afterTransactionCommit).toHaveBeenCalledWith(expect.objectContaining({ em: em1!, transaction: trx1 })); + expect(afterCommitCreate).toHaveBeenCalledTimes(2); + expect(afterCommitCreate).toHaveBeenNthCalledWith(1, expect.objectContaining({ em: em1!, entity: expect.objectContaining({ username: 'dnested1' }) })); + expect(afterCommitCreate).toHaveBeenNthCalledWith(2, expect.objectContaining({ em: em1!, entity: expect.objectContaining({ username: 'dnested2' }) })); + expect(afterTransactionRollback).toHaveBeenCalledTimes(0); + }); + it('no afterCommitCreate called if inner transaction fails', async () => { + const username = uuid(); + let em1: EntityManager; + let trx1: Transaction; + const work = async () => { + await em.transactional(async em => { + em1 = em; + trx1 = em.getTransactionContext(); + const user = new User(username); + em.persist(user); + await em.transactional(async em => { + const user = new User(username); + em.persist(user); + }); + }); + }; + await expect(work()).rejects.toThrowError(/^COMMIT.+duplicate key value/); + expect(beforeTransactionStart).toHaveBeenCalledTimes(1); + expect(beforeTransactionStart).toHaveBeenCalledWith(expect.objectContaining({ em: em1!, transaction: undefined })); + expect(afterTransactionStart).toHaveBeenCalledTimes(1); + expect(afterTransactionStart).toHaveBeenCalledWith(expect.objectContaining({ em: em1!, transaction: trx1 })); + expect(afterCreate).toHaveBeenCalledTimes(2); + expect(afterTransactionCommit).toHaveBeenCalledTimes(0); + expect(afterCommitCreate).toHaveBeenCalledTimes(0); + expect(afterTransactionRollback).toHaveBeenCalledTimes(1); + expect(afterTransactionRollback).toHaveBeenCalledWith(expect.objectContaining({ em: em1!, transaction: trx1 })); + }); + it('no afterCommitCreate called if outer transaction fails before running inner transaction', async () => { + let em1: EntityManager; + let trx1: Transaction; + const work = async () => { + await em.transactional(async em => { + em1 = em; + trx1 = em.getTransactionContext(); + const user = new User(username); + em.persist(user); + await em.flush(); + await em.transactional(async em => { + const user = new User(username); + em.persist(user); + }); + }); + }; + await expect(work()).rejects.toThrowError(/^COMMIT.+duplicate key value/); + expect(beforeTransactionStart).toHaveBeenCalledTimes(1); + expect(beforeTransactionStart).toHaveBeenCalledWith(expect.objectContaining({ em: em1!, transaction: undefined })); + expect(afterTransactionStart).toHaveBeenCalledTimes(1); + expect(afterTransactionStart).toHaveBeenCalledWith(expect.objectContaining({ em: em1!, transaction: trx1 })); + expect(afterCreate).toHaveBeenCalledTimes(2); + expect(afterTransactionCommit).toHaveBeenCalledTimes(0); + expect(afterCommitCreate).toHaveBeenCalledTimes(0); + expect(afterTransactionRollback).toHaveBeenCalledTimes(1); + expect(afterTransactionRollback).toHaveBeenCalledWith(expect.objectContaining({ em: em1!, transaction: trx1 })); + }); + it('no afterCommitCreate called if outer transaction fails after running inner transaction', async () => { + const username = uuid(); + let em1: EntityManager; + let trx1: Transaction; + const work = async () => { + await em.transactional(async em => { + em1 = em; + trx1 = em.getTransactionContext(); + await em.transactional(async em => { + const user = new User(username); + em.persist(user); + }); + const user = new User(username); + em.persist(user); + }); + }; + await expect(work()).rejects.toThrowError(/^COMMIT.+duplicate key value/); + expect(beforeTransactionStart).toHaveBeenCalledTimes(1); + expect(beforeTransactionStart).toHaveBeenCalledWith(expect.objectContaining({ em: em1!, transaction: undefined })); + expect(afterTransactionStart).toHaveBeenCalledTimes(1); + expect(afterTransactionStart).toHaveBeenCalledWith(expect.objectContaining({ em: em1!, transaction: trx1 })); + expect(afterCreate).toHaveBeenCalledTimes(2); + expect(afterTransactionCommit).toHaveBeenCalledTimes(0); + expect(afterCommitCreate).toHaveBeenCalledTimes(0); + expect(afterTransactionRollback).toHaveBeenCalledTimes(1); + expect(afterTransactionRollback).toHaveBeenCalledWith(expect.objectContaining({ em: em1!, transaction: trx1 })); + }); + }); + }); + + describe('explicit transactions with explicit begin/commit/rollback method calls', () => { + let username: string; + it('creating a new user succeeds', async () => { + await em.begin(); + username = uuid(); + const user = new User(username); + em.persist(user); + + await expect(em.commit()).resolves.toBeUndefined(); + expect(afterCreate).toHaveBeenCalledTimes(1); + expect(afterTransactionCommit).toHaveBeenCalledTimes(1); + expect(afterCommitCreate).toHaveBeenCalledTimes(1); + expect(afterCommitCreate).toHaveBeenCalledWith(expect.objectContaining({ entity: expect.objectContaining({ username }) })); + expect(afterTransactionRollback).toHaveBeenCalledTimes(0); + }); + it('afterCreate hook is called when commit() fails', async () => { + await em.begin(); + const work = async () => { + try { + const user = new User(username); + em.persist(user); + await em.commit(); + } catch (error) { + await em.rollback(); + throw error; + } + }; + + await expect(work).rejects.toThrowError(/^COMMIT.+duplicate key value/); + expect(afterCreate).toHaveBeenCalledTimes(1); + expect(afterTransactionCommit).toHaveBeenCalledTimes(0); + expect(afterCommitCreate).toHaveBeenCalledTimes(0); + expect(afterTransactionRollback).toHaveBeenCalledTimes(1); + }); + }); + }); + }); + + describe('mongo', () => { + @Entity() + class Entity1175 { + + @PrimaryKey() + _id!: ObjectId; + + @Unique() + @Property() + readonly username: string; + + constructor(username: string) { + this.username = username; + } + + } + + let orm: MikroORM; + + beforeAll(async () => { + orm = await MikroORM.init({ + entities: [Entity1175], + clientUrl: 'mongodb://localhost:27017,localhost:27018,localhost:27019/mikro-orm-test?replicaSet=rs0', + type: 'mongo', + implicitTransactions: true, + subscribers: [testSubscriber], + }); + await orm.em.nativeDelete(Entity1175, {}); + await orm.em.nativeInsert(Entity1175, { username: 'test1' }); + await orm.em.getDriver().ensureIndexes(); + }); + + afterAll(async () => { + await orm.close(); + }); + + beforeEach(() => { + em = orm.em.fork(); + }); + + describe('implicit transactions', () => { + let username: string; + it('afterCommitCreate called when transaction succeeds', async () => { + username = uuid(); + const user = new Entity1175(username); + em.persist(user); + + await expect(em.flush()).resolves.toBeUndefined(); + expect(afterCreate).toHaveBeenCalledTimes(1); + expect(afterTransactionCommit).toHaveBeenCalledTimes(1); + expect(afterTransactionCommit).toHaveBeenCalledWith(expect.objectContaining({ em, uow: expect.any(UnitOfWork) })); + expect(afterCommitCreate).toHaveBeenCalledTimes(1); + expect(afterCommitCreate).toHaveBeenCalledWith(expect.objectContaining({ entity: expect.objectContaining({ username }) })); + expect(afterTransactionRollback).toHaveBeenCalledTimes(0); + }); + it('afterCommitCreate not called when transaction fails', async () => { + const user = new Entity1175(username); + em.persist(user); + + await expect(em.flush()).rejects.toThrowError( + /^E11000 duplicate key error/ + ); + expect(afterCreate).toHaveBeenCalledTimes(0); + expect(afterTransactionCommit).toHaveBeenCalledTimes(0); + expect(afterCommitCreate).toHaveBeenCalledTimes(0); + expect(afterTransactionRollback).toHaveBeenCalledTimes(1); + }); + }); + + describe('explicit transactions with "transactional()"', () => { + let username: string; + it('afterCommitCreate called when transaction succeeds', async () => { + const work = em.transactional(async em => { + username = uuid(); + const user = new Entity1175(username); + em.persist(user); + }); + + await expect(work).resolves.toBeUndefined(); + expect(afterCreate).toHaveBeenCalledTimes(1); + expect(afterTransactionCommit).toHaveBeenCalledTimes(1); + expect(afterCommitCreate).toHaveBeenCalledTimes(1); + expect(afterCommitCreate).toHaveBeenCalledWith(expect.objectContaining({ entity: expect.objectContaining({ username }) })); + expect(afterTransactionRollback).toHaveBeenCalledTimes(0); + }); + it('afterCommitCreate not called when transaction fails', async () => { + const work = em.transactional(async em => { + const user = new Entity1175(username); + em.persist(user); + }); + + await expect(work).rejects.toThrowError(/^E11000 duplicate key error/); + expect(afterCreate).toHaveBeenCalledTimes(0); + expect(afterTransactionCommit).toHaveBeenCalledTimes(0); + expect(afterCommitCreate).toHaveBeenCalledTimes(0); + expect(afterTransactionRollback).toHaveBeenCalledTimes(1); + }); + describe('nested transactions', () => { + it('not allowed', async () => { + const work = async () => { + await em.transactional(async em => { + username = 'nested1'; + const user = new Entity1175(username); + em.persist(user); + await em.transactional(async em => { + username = 'nested2'; + const user = new Entity1175(username); + em.persist(user); + }); + }); + }; + await expect(work()).rejects.toThrowError(/Transaction already in progress/); + expect(beforeTransactionStart).toHaveBeenCalledTimes(1); + expect(afterCreate).toHaveBeenCalledTimes(0); + expect(afterTransactionCommit).toHaveBeenCalledTimes(0); + expect(afterCommitCreate).toHaveBeenCalledTimes(0); + expect(afterTransactionRollback).toHaveBeenCalledTimes(1); + }); + }); + }); + + describe('explicit transactions with explicit begin/commit method calls', () => { + let username: string; + it('creating a new user succeeds', async () => { + await em.begin(); + username = uuid(); + const user = new Entity1175(username); + em.persist(user); + + await expect(em.commit()).resolves.toBeUndefined(); + expect(afterCreate).toHaveBeenCalledTimes(1); + expect(afterTransactionCommit).toHaveBeenCalledTimes(1); + expect(afterCommitCreate).toHaveBeenCalledTimes(1); + expect(afterCommitCreate).toHaveBeenCalledWith(expect.objectContaining({ entity: expect.objectContaining({ username }) })); + expect(afterTransactionRollback).toHaveBeenCalledTimes(0); + }); + it('afterCreate hook is called when commit() fails', async () => { + await em.begin(); + const work = async () => { + try { + const user = new Entity1175(username); + em.persist(user); + await em.commit(); + } catch (error) { + await em.rollback(); + throw error; + } + }; + + await expect(work).rejects.toThrowError(/^E11000 duplicate key error/); + expect(afterCreate).toHaveBeenCalledTimes(0); + expect(afterTransactionCommit).toHaveBeenCalledTimes(0); + expect(afterCommitCreate).toHaveBeenCalledTimes(0); + expect(afterTransactionRollback).toHaveBeenCalledTimes(1); + }); + }); + }); +}); diff --git a/tests/issues/GH1176.test.ts b/tests/issues/GH1176.test.ts index b3924753d651..f1a6a9b097cc 100644 --- a/tests/issues/GH1176.test.ts +++ b/tests/issues/GH1176.test.ts @@ -51,7 +51,7 @@ describe('GH issue 1176', () => { em = orm.em.fork(); }); - describe('immediate constraint (failures on insert)', () => { + describe('immediate constraints (failures on insert)', () => { beforeAll(async () => { const orm = await getOrmInstance(); await orm.em.getConnection().execute( @@ -157,7 +157,7 @@ describe('GH issue 1176', () => { await expect(em.flush()).resolves.toBeUndefined(); }); - it('flush throws when a database constraint ffails', async () => { + it('flush throws when a database constraint fails', async () => { const user = new User(username); em.persist(user);