diff --git a/docs/tutorials/mikroorm.md b/docs/tutorials/mikroorm.md index 4536b18d2fb..aaa853b77ea 100644 --- a/docs/tutorials/mikroorm.md +++ b/docs/tutorials/mikroorm.md @@ -296,16 +296,14 @@ The `@Transactional()` decorator allows you to enable a retry policy for the par ```typescript import {OptimisticLockError} from "@mikro-orm/core"; import {RetryStrategy} from "@tsed/mikro-orm"; +import {OverrideProvider} from "@tsed/di"; +import {setTimeout} from "timers/promises"; -export interface ExponentialBackoffOptions { - maxDepth: number; -} - +@OverrideProvider(RetryStrategy) export class ExponentialBackoff implements RetryStrategy { + private readonly maxDepth = 3; private depth = 0; - constructor(private readonly options: ExponentialBackoffOptions) {} - public async acquire unknown>(task: T): Promise> { try { return (await task()) as ReturnType; @@ -323,24 +321,13 @@ export class ExponentialBackoff implements RetryStrategy { } private async retry unknown>(task: T): Promise> { - await this.sleep(2 ** this.depth * 50); + await setTimeout(2 ** this.depth * 50); this.depth += 1; return this.acquire(task); } - - private sleep(milliseconds: number): Promise { - return new Promise((resolve) => setTimeout(resolve, milliseconds)); - } } - -registerProvider({ - provide: RetryStrategy, - useFactory(): ExponentialBackoff { - return new ExponentialBackoff({maxDepth: 3}); - } -}); ``` `ExponentialBackoff` invokes passed function recursively is contained in a try/catch block. The method returns control to the interceptor if the call to the `task` function succeeds without throwing an exception. If the `task` method fails, the catch block examines the reason for the failure. If it's optimistic locking the code waits for a short delay before retrying the operation. @@ -397,6 +384,52 @@ export class SomeSubscriber implements EventSubscriber { } ``` +## Transaction Hooks + +The transaction hooks allow you to customize the default transaction behavior. These hooks enable you to execute custom code before and after committing data to the database. These transaction hooks provide a flexible way to extend the default transaction behavior and implement advanced patterns such as the Inbox pattern or domain event dispatching. + +### BeforeTransactionCommit Hook + +The `BeforeTransactionCommit` interface allows you to define hooks that are executed right before committing data to the database. This hook provides a way to modify data within the same transaction context and perform additional operations before the transaction is committed. + +To use the `BeforeTransactionCommit` hook, first, you have to implement the `BeforeTransactionCommit` interface: + +```typescript +import {BeforeTransactionCommit} from "@tsed/mikro-orm"; +import {EntityManager} from "@mikro-orm/core"; +import {Injectable} from "@tsed/di"; + +@Injectable() +export class Hooks implements BeforeTransactionCommit { + $beforeTransactionCommit(em: EntityManager): Promise | unknown { + // Custom code executed before committing data + } +} +``` + +Then just write your code inside the `$beforeTransactionCommit` method. This code will be executed before the transaction is committed. + +### AfterTransactionCommit Hook + +The `AfterTransactionCommit` interface allows you to define hooks that are executed right after committing data to the database. This hook enables you to execute code after the data is committed, making multiple transactions. + +To use the `AfterTransactionCommit` hook, you have to implement the `AfterTransactionCommit` interface: + +```typescript +import {AfterTransactionCommit} from "@tsed/mikro-orm"; +import {EntityManager} from "@mikro-orm/core"; +import {Injectable} from "@tsed/di"; + +@Injectable() +export class Hooks implements AfterTransactionCommit { + $afterTransactionCommit(em: EntityManager): Promise | unknown { + // Custom code executed after committing data + } +} +``` + +It's important to note that when using the `AfterTransactionCommit` hook, you need to handle eventual consistency and compensatory actions in case of failures on your own. + ## Author diff --git a/packages/orm/mikro-orm/readme.md b/packages/orm/mikro-orm/readme.md index 62437eff310..7dd54d9c9ea 100644 --- a/packages/orm/mikro-orm/readme.md +++ b/packages/orm/mikro-orm/readme.md @@ -316,16 +316,14 @@ The `@Transactional()` decorator allows you to enable a retry policy for the par ```typescript import {OptimisticLockError} from "@mikro-orm/core"; import {RetryStrategy} from "@tsed/mikro-orm"; +import {OverrideProvider} from "@tsed/di"; +import {setTimeout} from "timers/promises"; -export interface ExponentialBackoffOptions { - maxDepth: number; -} - +@OverrideProvider(RetryStrategy) export class ExponentialBackoff implements RetryStrategy { + private readonly maxDepth = 3; private depth = 0; - constructor(private readonly options: ExponentialBackoffOptions) {} - public async acquire unknown>(task: T): Promise> { try { return (await task()) as ReturnType; @@ -343,24 +341,13 @@ export class ExponentialBackoff implements RetryStrategy { } private async retry unknown>(task: T): Promise> { - await this.sleep(2 ** this.depth * 50); + await setTimeout(2 ** this.depth * 50); this.depth += 1; return this.acquire(task); } - - private sleep(milliseconds: number): Promise { - return new Promise((resolve) => setTimeout(resolve, milliseconds)); - } } - -registerProvider({ - provide: RetryStrategy, - useFactory(): ExponentialBackoff { - return new ExponentialBackoff({maxDepth: 3}); - } -}); ``` `ExponentialBackoff` invokes passed function recursively is contained in a try/catch block. The method returns control to the interceptor if the call to the `task` function succeeds without throwing an exception. If the `task` method fails, the catch block examines the reason for the failure. If it's optimistic locking the code waits for a short delay before retrying the operation. @@ -417,6 +404,54 @@ export class SomeSubscriber implements EventSubscriber { } ``` +## Transaction Hooks + +The transaction hooks allow you to customize the default transaction behavior. These hooks enable you to execute custom code before and after committing data to the database. These transaction hooks provide a flexible way to extend the default transaction behavior and implement advanced patterns such as the Inbox pattern or domain event dispatching. + +### BeforeTransactionCommit Hook + +The `BeforeTransactionCommit` interface allows you to define hooks that are executed right before committing data to the database. This hook provides a way to modify data within the same transaction context and perform additional operations before the transaction is committed. + +To use the `BeforeTransactionCommit` hook, first, you have to implement the `BeforeTransactionCommit` interface: + +```typescript +import {BeforeTransactionCommit} from "@tsed/mikro-orm"; +import {EntityManager} from "@mikro-orm/core"; +import {Injectable} from "@tsed/di"; + +@Injectable() +export class Hooks implements BeforeTransactionCommit { + $beforeTransactionCommit(em: EntityManager): Promise | unknown { + // Custom code executed before committing data + } +} +``` + +Then just write your code inside the `$beforeTransactionCommit` method. This code will be executed before the transaction is committed. + +### AfterTransactionCommit Hook + +The `AfterTransactionCommit` interface allows you to define hooks that are executed right after committing data to the database. This hook enables you to execute code after the data is committed, making multiple transactions. + +To use the `AfterTransactionCommit` hook, you have to implement the `AfterTransactionCommit` interface: + +```typescript +import {AfterTransactionCommit} from "@tsed/mikro-orm"; +import {EntityManager} from "@mikro-orm/core"; +import {Injectable} from "@tsed/di"; + +@Injectable() +export class Hooks implements AfterTransactionCommit { + $afterTransactionCommit(em: EntityManager): Promise | unknown { + // Custom code executed after committing data + } +} +``` + +::: tip Note +When using the `AfterTransactionCommit` hook, you need to handle eventual consistency and compensatory actions in case of failures on your own. +::: + ## Contributors Please read [contributing guidelines here](https://tsed.io/contributing.html) diff --git a/packages/orm/mikro-orm/src/MikroOrmModule.ts b/packages/orm/mikro-orm/src/MikroOrmModule.ts index 277bb223c8e..934749d8078 100644 --- a/packages/orm/mikro-orm/src/MikroOrmModule.ts +++ b/packages/orm/mikro-orm/src/MikroOrmModule.ts @@ -13,7 +13,7 @@ import { } from "@tsed/di"; import {EventSubscriber, Options} from "@mikro-orm/core"; import {MikroOrmRegistry} from "./services/MikroOrmRegistry"; -import {RetryStrategy} from "./services/RetryStrategy"; +import {RetryStrategy} from "./interfaces/RetryStrategy"; import {OptimisticLockErrorFilter} from "./filters/OptimisticLockErrorFilter"; import {MikroOrmContext} from "./services/MikroOrmContext"; import {classOf, isFunction, Store} from "@tsed/core"; diff --git a/packages/orm/mikro-orm/src/index.ts b/packages/orm/mikro-orm/src/index.ts index 401a9ec9d82..1e10cbcac17 100644 --- a/packages/orm/mikro-orm/src/index.ts +++ b/packages/orm/mikro-orm/src/index.ts @@ -10,7 +10,9 @@ export * from "./decorators/subscriber"; export * from "./decorators/transactional"; export * from "./filters/OptimisticLockErrorFilter"; export * from "./interceptors/TransactionalInterceptor"; +export * from "./interfaces/AfterTransactionCommit"; +export * from "./interfaces/BeforeTransactionCommit"; +export * from "./interfaces/RetryStrategy"; export * from "./services/MikroOrmContext"; export * from "./services/MikroOrmFactory"; export * from "./services/MikroOrmRegistry"; -export * from "./services/RetryStrategy"; diff --git a/packages/orm/mikro-orm/src/interceptors/TransactionalInterceptor.spec.ts b/packages/orm/mikro-orm/src/interceptors/TransactionalInterceptor.spec.ts index d19b0f97d87..5c6957c1503 100644 --- a/packages/orm/mikro-orm/src/interceptors/TransactionalInterceptor.spec.ts +++ b/packages/orm/mikro-orm/src/interceptors/TransactionalInterceptor.spec.ts @@ -1,13 +1,27 @@ import {TransactionalInterceptor} from "./TransactionalInterceptor"; import {anyFunction, anything, deepEqual, instance, mock, objectContaining, reset, verify, when} from "ts-mockito"; -import {InterceptorContext} from "@tsed/di"; +import {InjectorService, InterceptorContext} from "@tsed/di"; import {Logger} from "@tsed/logger"; import {EntityManager, IsolationLevel, MikroORM, OptimisticLockError} from "@mikro-orm/core"; import {MikroOrmRegistry} from "../services/MikroOrmRegistry"; -import {RetryStrategy} from "../services/RetryStrategy"; +import {RetryStrategy} from "../interfaces/RetryStrategy"; import {MikroOrmContext} from "../services/MikroOrmContext"; +// AHDOC: https://github.com/NagRock/ts-mockito/issues/191 +// TODO: avoid using ts-mockito +const resolvableInstance = (m: T): T => + new Proxy(instance(m), { + get(target, prop, receiver) { + if (["Symbol(Symbol.toPrimitive)", "then", "catch"].includes(prop.toString())) { + return undefined; + } + + return Reflect.get(target, prop, receiver); + } + }); + describe("TransactionalInterceptor", () => { + const mockedInjectorService = mock(); const mockedMikroOrmRegistry = mock(); const mockedMikroOrm = mock(); const mockedEntityManager = mock(); @@ -16,11 +30,14 @@ describe("TransactionalInterceptor", () => { const mockedMikroOrmContext = mock(); const next = jest.fn(); + let entityManger!: EntityManager; + let mikroOrm!: MikroORM; let transactionalInterceptor!: TransactionalInterceptor; afterEach(() => { next.mockReset(); - reset( + reset( + mockedInjectorService, mockedMikroOrmRegistry, mockedMikroOrm, mockedEntityManager, @@ -30,9 +47,12 @@ describe("TransactionalInterceptor", () => { ); }); beforeEach(() => { - when(mockedMikroOrm.em).thenReturn(instance(mockedEntityManager)); + entityManger = resolvableInstance(mockedEntityManager); + mikroOrm = resolvableInstance(mockedMikroOrm); + when(mockedMikroOrm.em).thenReturn(entityManger); transactionalInterceptor = new TransactionalInterceptor( + instance(mockedInjectorService), instance(mockedMikroOrmRegistry), instance(mockedMikroOrmContext), instance(mockedLogger) @@ -43,9 +63,8 @@ describe("TransactionalInterceptor", () => { it("should run within a existing context", async () => { // arrange const context = {} as InterceptorContext; - const entityManger = instance(mockedEntityManager); - when(mockedMikroOrmRegistry.get(anything())).thenReturn(instance(mockedMikroOrm)); + when(mockedMikroOrmRegistry.get(anything())).thenReturn(mikroOrm); when(mockedMikroOrmContext.has(anything())).thenReturn(true); when(mockedMikroOrmContext.get(anything())).thenReturn(entityManger); @@ -59,9 +78,8 @@ describe("TransactionalInterceptor", () => { it("should set an isolation level", async () => { // arrange const context = {options: {isolationLevel: IsolationLevel.SERIALIZABLE}} as InterceptorContext; - const entityManger = instance(mockedEntityManager); - when(mockedMikroOrmRegistry.get(anything())).thenReturn(instance(mockedMikroOrm)); + when(mockedMikroOrmRegistry.get(anything())).thenReturn(mikroOrm); when(mockedMikroOrmContext.has(anything())).thenReturn(true); when(mockedMikroOrmContext.get(anything())).thenReturn(entityManger); @@ -72,65 +90,67 @@ describe("TransactionalInterceptor", () => { verify(mockedEntityManager.transactional(anyFunction(), objectContaining({isolationLevel: IsolationLevel.SERIALIZABLE}))).once(); }); - it("should set an isolation level by default", async () => { + it("should run within a new context", async () => { // arrange - const context = {options: {}} as InterceptorContext; - const entityManger = instance(mockedEntityManager); + const context = {} as InterceptorContext; - when(mockedMikroOrmRegistry.get(anything())).thenReturn(instance(mockedMikroOrm)); - when(mockedMikroOrmContext.has(anything())).thenReturn(true); + when(mockedMikroOrmRegistry.get(anything())).thenReturn(mikroOrm); + when(mockedMikroOrmContext.has(anything())).thenReturn(false); when(mockedMikroOrmContext.get(anything())).thenReturn(entityManger); + when(mockedMikroOrmContext.run(anything(), anything())).thenCall((_: EntityManager[], func: (...args: unknown[]) => unknown) => + func() + ); // act await transactionalInterceptor.intercept(context, next); // assert - verify(mockedEntityManager.transactional(anyFunction(), objectContaining({isolationLevel: IsolationLevel.READ_COMMITTED}))).once(); + verify(mockedMikroOrmContext.run(deepEqual([entityManger]), anyFunction())).twice(); + verify(mockedEntityManager.transactional(anyFunction(), objectContaining({}))).once(); }); - it("should run within a new context", async () => { + it("should perform a task within a transaction", async () => { // arrange const context = {} as InterceptorContext; - const entityManger = instance(mockedEntityManager); - when(mockedMikroOrmRegistry.get(anything())).thenReturn(instance(mockedMikroOrm)); - when(mockedMikroOrmContext.has(anything())).thenReturn(false); + when(mockedMikroOrmRegistry.get(anything())).thenReturn(mikroOrm); + when(mockedMikroOrmContext.has(anything())).thenReturn(true); when(mockedMikroOrmContext.get(anything())).thenReturn(entityManger); - when(mockedMikroOrmContext.run(anything(), anything())).thenCall((_: EntityManager[], func: (...args: unknown[]) => unknown) => - func() - ); + when(mockedEntityManager.transactional(anything(), anything())).thenCall((func: (...args: unknown[]) => unknown) => func()); // act await transactionalInterceptor.intercept(context, next); // assert - verify(mockedMikroOrmContext.run(deepEqual([entityManger]), anyFunction())).once(); - verify(mockedEntityManager.transactional(anyFunction(), objectContaining({}))).once(); + expect(next).toHaveBeenCalled(); }); - it("should perform a task within a transaction", async () => { + it("should emit events in the right order", async () => { // arrange const context = {} as InterceptorContext; - const entityManger = instance(mockedEntityManager); - when(mockedMikroOrmRegistry.get(anything())).thenReturn(instance(mockedMikroOrm)); + when(mockedMikroOrmRegistry.get(anything())).thenReturn(mikroOrm); when(mockedMikroOrmContext.has(anything())).thenReturn(true); when(mockedMikroOrmContext.get(anything())).thenReturn(entityManger); when(mockedEntityManager.transactional(anything(), anything())).thenCall((func: (...args: unknown[]) => unknown) => func()); + when(mockedMikroOrmContext.run(anything(), anything())).thenCall((_: EntityManager[], func: (...args: unknown[]) => unknown) => + func() + ); // act await transactionalInterceptor.intercept(context, next); // assert - expect(next).toHaveBeenCalled(); + verify(mockedInjectorService.alterAsync("$beforeTransactionCommit", entityManger)).calledBefore( + mockedInjectorService.alterAsync("$afterTransactionCommit", entityManger) + ); }); it("should disable an explicit transaction", async () => { // arrange const context = {options: {disabled: true}} as InterceptorContext; - const entityManger = instance(mockedEntityManager); - when(mockedMikroOrmRegistry.get(anything())).thenReturn(instance(mockedMikroOrm)); + when(mockedMikroOrmRegistry.get(anything())).thenReturn(mikroOrm); when(mockedMikroOrmContext.has(anything())).thenReturn(true); when(mockedMikroOrmContext.get(anything())).thenReturn(entityManger); @@ -146,7 +166,7 @@ describe("TransactionalInterceptor", () => { // arrange const context = {} as InterceptorContext; - when(mockedMikroOrmRegistry.get(anything())).thenReturn(instance(mockedMikroOrm)); + when(mockedMikroOrmRegistry.get(anything())).thenReturn(mikroOrm); when(mockedMikroOrmContext.has(anything())).thenReturn(true); when(mockedEntityManager.transactional(anything(), anything())).thenCall((func: (...args: unknown[]) => unknown) => func()); @@ -175,9 +195,9 @@ describe("TransactionalInterceptor", () => { const context = {} as InterceptorContext; when(mockedMikroOrmContext.has(anything())).thenReturn(true); - when(mockedMikroOrmRegistry.get(anything())).thenReturn(instance(mockedMikroOrm)); + when(mockedMikroOrmRegistry.get(anything())).thenReturn(mikroOrm); when(mockedEntityManager.transactional(anything(), anything())).thenReject(OptimisticLockError.lockFailed("Lock")); - when(mockedMikroOrmContext.get(anything())).thenReturn(instance(mockedEntityManager)); + when(mockedMikroOrmContext.get(anything())).thenReturn(entityManger); // act const result = transactionalInterceptor.intercept(context, next); @@ -190,6 +210,7 @@ describe("TransactionalInterceptor", () => { it("should apply a retry mechanism if retry is enabled", async () => { // arrange transactionalInterceptor = new TransactionalInterceptor( + instance(mockedInjectorService), instance(mockedMikroOrmRegistry), instance(mockedMikroOrmContext), instance(mockedLogger), @@ -199,9 +220,9 @@ describe("TransactionalInterceptor", () => { when(mockedMikroOrmContext.has(anything())).thenReturn(true); when(mockedRetryStrategy.acquire(anything())).thenResolve(); - when(mockedMikroOrmRegistry.get(anything())).thenReturn(instance(mockedMikroOrm)); + when(mockedMikroOrmRegistry.get(anything())).thenReturn(mikroOrm); when(mockedEntityManager.transactional(anything(), anything())).thenReject(OptimisticLockError.lockFailed("Lock")); - when(mockedMikroOrmContext.get(anything())).thenReturn(instance(mockedEntityManager)); + when(mockedMikroOrmContext.get(anything())).thenReturn(entityManger); // act await transactionalInterceptor.intercept(context, next); diff --git a/packages/orm/mikro-orm/src/interceptors/TransactionalInterceptor.ts b/packages/orm/mikro-orm/src/interceptors/TransactionalInterceptor.ts index 74d0f49057f..76c0f74eb5c 100644 --- a/packages/orm/mikro-orm/src/interceptors/TransactionalInterceptor.ts +++ b/packages/orm/mikro-orm/src/interceptors/TransactionalInterceptor.ts @@ -1,9 +1,10 @@ -import {Inject, Interceptor, InterceptorContext, InterceptorMethods, InterceptorNext} from "@tsed/di"; +import {Inject, InjectorService, Interceptor, InterceptorContext, InterceptorMethods, InterceptorNext} from "@tsed/di"; import {Logger} from "@tsed/logger"; -import {RetryStrategy} from "../services/RetryStrategy"; +import {RetryStrategy} from "../interfaces/RetryStrategy"; import {MikroOrmContext} from "../services/MikroOrmContext"; import {MikroOrmRegistry} from "../services/MikroOrmRegistry"; -import {FlushMode, IsolationLevel} from "@mikro-orm/core"; +import {EntityManager, FlushMode, IsolationLevel} from "@mikro-orm/core"; +import {DEFAULT_CONTEXT_NAME} from "../constants"; export interface TransactionOptions { retry?: boolean; @@ -13,11 +14,12 @@ export interface TransactionOptions { contextName?: string; } -type TransactionSettings = Required> & {flushMode?: FlushMode}; +type TransactionSettings = Omit & Required>; @Interceptor() export class TransactionalInterceptor implements InterceptorMethods { constructor( + @Inject() private readonly injector: InjectorService, @Inject() private readonly registry: MikroOrmRegistry, @Inject() private readonly context: MikroOrmContext, @Inject() private readonly logger: Logger, @@ -26,85 +28,102 @@ export class TransactionalInterceptor implements InterceptorMethods { ) {} public intercept(context: InterceptorContext, next: InterceptorNext) { - const options = this.extractContextName(context); + const options = this.extractContextName(context.options); if (options.retry && !this.retryStrategy) { this.logger.warn(`To retry a transaction you have to implement a "${RetryStrategy.description}" interface`); } - if (this.context.has(options.contextName)) { - return this.runWithinCtx(next, options); - } - - return this.runWithinNewCtx(next, options); + return this.context.has(options.contextName) ? this.runWithinCtx(next, options) : this.runWithinNewCtx(next, options); } - private runWithinCtx(next: InterceptorNext, options: TransactionSettings): Promise | unknown { + private runWithinCtx(next: InterceptorNext, options: TransactionSettings): Promise { const callback = () => this.executeInTransaction(next, options); return options.retry && this.retryStrategy ? this.retryStrategy.acquire(callback) : callback(); } - private runWithinNewCtx(next: InterceptorNext, options: TransactionSettings): Promise | unknown { - const orm = this.registry.get(options.contextName); + private async runWithinNewCtx(next: InterceptorNext, options: TransactionSettings): Promise { + const em = await this.getGlobalEntityManager(options.contextName); + + return this.context.run([em], () => this.runWithinCtx(next, options)); + } + + private getGlobalEntityManager(contextName: string = DEFAULT_CONTEXT_NAME): EntityManager { + const orm = this.registry.get(contextName); if (!orm) { - return Promise.reject( - new Error( - `No such context: ${options.contextName}. Please register a corresponding MikroOrm instance using '@Configuration()' decorator.` - ) + throw new Error( + `No such context: ${contextName}. Please register a corresponding MikroOrm instance using '@Configuration()' decorator.` ); } - return this.context.run([orm.em], () => this.runWithinCtx(next, options)); + return orm.em; } - private extractContextName(context: InterceptorContext): TransactionSettings { - const options = (context.options || {}) as TransactionOptions | string; - - let isolationLevel: IsolationLevel | undefined; - let disabled: boolean | undefined; - let contextName: string | undefined; - let flushMode: FlushMode | undefined; - let retry: boolean | undefined; - + private extractContextName(options: TransactionOptions | string | undefined): TransactionSettings { if (typeof options === "string") { - contextName = options; - } else if (options) { - contextName = options.contextName; - isolationLevel = options.isolationLevel; - retry = options.retry; - disabled = options.disabled; - flushMode = options.flushMode; + return { + contextName: options + }; } + const {flushMode, isolationLevel, contextName = DEFAULT_CONTEXT_NAME, retry = false, disabled = false} = options ?? {}; + return { + contextName, + retry, + disabled, flushMode, - retry: retry ?? false, - disabled: disabled ?? false, - contextName: contextName ?? "default", - isolationLevel: isolationLevel ?? IsolationLevel.READ_COMMITTED + isolationLevel }; } - private executeInTransaction(next: InterceptorNext, options: TransactionSettings): Promise | unknown { - const manager = this.context.get(options.contextName); - - if (!manager) { - return Promise.reject( - new Error( - `No such context: ${options.contextName}. Please check if the async context is lost in one of the asynchronous operations.` - ) - ); - } + private async executeInTransaction(next: InterceptorNext, options: TransactionSettings): Promise { + const em = this.getEntityManager(options.contextName); if (options.disabled) { return next(); } - return manager.transactional(() => Promise.resolve(next()), { + const result = await em.transactional(() => this.executeHandlerDelegate(next, options), { flushMode: options.flushMode, isolationLevel: options.isolationLevel }); + + await this.afterTransactionCommit(options); + + return result; + } + + private async executeHandlerDelegate(next: InterceptorNext, options: TransactionSettings): Promise { + const result = await next(); + await this.beforeTransactionCommit(options); + return result; + } + + private beforeTransactionCommit(options: TransactionSettings): Promise { + return this.injector.alterAsync("$beforeTransactionCommit", this.getEntityManager(options.contextName)); + } + + private afterTransactionCommit(options: TransactionSettings): Promise { + const em = this.getGlobalEntityManager(options.contextName); + + return this.context.run([em], () => { + const forkedEm = this.getEntityManager(options.contextName); + forkedEm.resetTransactionContext(); + + return this.injector.alterAsync("$afterTransactionCommit", forkedEm); + }); + } + + private getEntityManager(contextName: string = DEFAULT_CONTEXT_NAME): EntityManager { + const em = this.context.get(contextName); + + if (!em) { + throw new Error(`No such context: ${contextName}. Please check if the async context is lost in one of the asynchronous operations.`); + } + + return em; } } diff --git a/packages/orm/mikro-orm/src/interfaces/AfterTransactionCommit.ts b/packages/orm/mikro-orm/src/interfaces/AfterTransactionCommit.ts new file mode 100644 index 00000000000..3ce30042505 --- /dev/null +++ b/packages/orm/mikro-orm/src/interfaces/AfterTransactionCommit.ts @@ -0,0 +1,5 @@ +import {EntityManager} from "@mikro-orm/core"; + +export interface AfterTransactionCommit { + $afterTransactionCommit(em: EntityManager): Promise | unknown; +} diff --git a/packages/orm/mikro-orm/src/interfaces/BeforeTransactionCommit.ts b/packages/orm/mikro-orm/src/interfaces/BeforeTransactionCommit.ts new file mode 100644 index 00000000000..69d7b5aaad6 --- /dev/null +++ b/packages/orm/mikro-orm/src/interfaces/BeforeTransactionCommit.ts @@ -0,0 +1,5 @@ +import {EntityManager} from "@mikro-orm/core"; + +export interface BeforeTransactionCommit { + $beforeTransactionCommit(em: EntityManager): Promise | unknown; +} diff --git a/packages/orm/mikro-orm/src/services/RetryStrategy.ts b/packages/orm/mikro-orm/src/interfaces/RetryStrategy.ts similarity index 71% rename from packages/orm/mikro-orm/src/services/RetryStrategy.ts rename to packages/orm/mikro-orm/src/interfaces/RetryStrategy.ts index 88625db3593..c1ef1d7574b 100644 --- a/packages/orm/mikro-orm/src/services/RetryStrategy.ts +++ b/packages/orm/mikro-orm/src/interfaces/RetryStrategy.ts @@ -1,5 +1,5 @@ export interface RetryStrategy { - acquire unknown>(task: T): Promise>; + acquire unknown>(task: T): Promise extends Promise ? V : ReturnType>; } export const RetryStrategy: unique symbol = Symbol("RetryStrategy"); diff --git a/packages/orm/mikro-orm/test/helpers/services/Hooks.ts b/packages/orm/mikro-orm/test/helpers/services/Hooks.ts new file mode 100644 index 00000000000..22196108dd1 --- /dev/null +++ b/packages/orm/mikro-orm/test/helpers/services/Hooks.ts @@ -0,0 +1,14 @@ +import {AfterTransactionCommit, BeforeTransactionCommit} from "../../../src"; +import {EntityManager} from "@mikro-orm/core"; +import {Injectable} from "@tsed/di"; + +@Injectable() +export class Hooks implements AfterTransactionCommit, BeforeTransactionCommit { + $afterTransactionCommit(em: EntityManager): Promise | unknown { + return Promise.resolve(); + } + + $beforeTransactionCommit(em: EntityManager): Promise | unknown { + return Promise.resolve(); + } +} diff --git a/packages/orm/mikro-orm/test/helpers/services/UserService.ts b/packages/orm/mikro-orm/test/helpers/services/UserService.ts index c70881268cd..66f2dc98fa3 100644 --- a/packages/orm/mikro-orm/test/helpers/services/UserService.ts +++ b/packages/orm/mikro-orm/test/helpers/services/UserService.ts @@ -28,6 +28,6 @@ export class UserService { @Transactional() create(data: {email: string}): Promise { - return this.orm.em.create(User, data); + return Promise.resolve(this.orm.em.create(User, data)); } } diff --git a/packages/orm/mikro-orm/test/integration.spec.ts b/packages/orm/mikro-orm/test/integration.spec.ts index 4c7bb7acacf..b6a0529cf2a 100644 --- a/packages/orm/mikro-orm/test/integration.spec.ts +++ b/packages/orm/mikro-orm/test/integration.spec.ts @@ -4,13 +4,18 @@ import {TestMongooseContext} from "@tsed/testing-mongoose"; import {User} from "./helpers/entity/User"; import {Server} from "./helpers/Server"; import {UserService} from "./helpers/services/UserService"; -import {MikroORM} from "@mikro-orm/core"; -import {anything, spy, verify} from "ts-mockito"; +import {EntityManager, MikroORM} from "@mikro-orm/core"; +import {anyOfClass, anything, reset, spy, verify} from "ts-mockito"; import {UnmanagedEventSubscriber1} from "./helpers/services/UnmanagedEventSubscriber1"; import {UnmanagedEventSubscriber2} from "./helpers/services/UnmanagedEventSubscriber2"; import {MikroOrmModule, TransactionalInterceptor} from "../src"; +import {Hooks} from "./helpers/services/Hooks"; describe("MikroOrm integration", () => { + let spiedLogger!: Logger; + let spiedTransactionalInterceptor!: TransactionalInterceptor; + let spiedHooks!: Hooks; + beforeEach(async () => { await TestMongooseContext.install(); const {url: clientUrl} = await TestMongooseContext.getMongooseOptions(); @@ -22,6 +27,7 @@ describe("MikroOrm integration", () => { clientUrl, type: "mongo", entities: [User], + // @ts-expect-error mikro-orm supports the class reference starting from v6 subscribers: [UnmanagedEventSubscriber1, new UnmanagedEventSubscriber2()] }, { @@ -40,11 +46,20 @@ describe("MikroOrm integration", () => { }); await bstrp(); + + spiedLogger = spy(PlatformTest.get(Logger)); + spiedTransactionalInterceptor = spy(PlatformTest.get(TransactionalInterceptor)); + spiedHooks = spy(PlatformTest.get(Hooks)); + }); + + afterEach(async () => { + reset(spiedLogger, spiedTransactionalInterceptor, spiedHooks); + + await TestMongooseContext.reset(); }); - afterEach(TestMongooseContext.reset); it("should return repository", () => { - const service = PlatformTest.injector.get(UserService)!; + const service = PlatformTest.get(UserService)!; expect(service).toBeInstanceOf(UserService); expect(service.orm).toBeInstanceOf(MikroORM); @@ -63,9 +78,7 @@ describe("MikroOrm integration", () => { }); it("should create a request context", async () => { - const service = PlatformTest.injector.get(UserService)!; - const transactionalInterceptor = PlatformTest.injector.get(TransactionalInterceptor)!; - const spiedTransactionalInterceptor = spy(transactionalInterceptor); + const service = PlatformTest.get(UserService)!; await service.create({email: "test@example.com"}); @@ -73,12 +86,20 @@ describe("MikroOrm integration", () => { }); it("should resolve the configured subscribers", async () => { - const service = PlatformTest.injector.get(UserService)!; - const logger = PlatformTest.injector.get(Logger)!; - const spiedLogger = spy(logger); + const service = PlatformTest.get(UserService)!; await service.create({email: "test@example.com"}); verify(spiedLogger.info("Changes has been flushed.")).thrice(); }); + + it("should emit the $afterTransactionCommit event", async () => { + const service = PlatformTest.get(UserService)!; + + await service.create({email: "test@example.com"}); + + verify(spiedHooks.$afterTransactionCommit(anyOfClass(EntityManager))).calledAfter( + spiedHooks.$beforeTransactionCommit(anyOfClass(EntityManager)) + ); + }); });