diff --git a/packages/bus-core/src/service-bus/bus.ts b/packages/bus-core/src/service-bus/bus.ts index 6c0a7315..163172a0 100644 --- a/packages/bus-core/src/service-bus/bus.ts +++ b/packages/bus-core/src/service-bus/bus.ts @@ -1,4 +1,5 @@ import { Event, Command, MessageAttributes, Message } from '@node-ts/bus-messages' +import { Middleware } from '../util' import { TransportMessage } from '../transport' export enum BusState { @@ -75,4 +76,12 @@ export interface Bus { * @template TransportMessageType - The raw message type returned from the transport that will be passed to the hooks */ off (action: HookAction, callback: HookCallback): void + + /** + * Register optional middlewares that will run for each message that is polled from the transport + * Note these middlewares only run when polling successfully pulls a message off the Transports queue + * After all the user defined middlewares have registered. @see start and @see stop should add/remove a final bus middleware + * that ensures the message is correctly dispatched to the handlers and removed from the underlying transport + */ + messageReadMiddleware (useBeforeHandleNextMessageMiddleware: Middleware>): void } diff --git a/packages/bus-core/src/service-bus/service-bus.integration.ts b/packages/bus-core/src/service-bus/service-bus.integration.ts index bc46f077..a40b888e 100644 --- a/packages/bus-core/src/service-bus/service-bus.integration.ts +++ b/packages/bus-core/src/service-bus/service-bus.integration.ts @@ -5,13 +5,13 @@ import { BusState } from './bus' import { TestEvent } from '../test/test-event' import { TestEvent2 } from '../test/test-event-2' import { TestCommand } from '../test/test-command' -import { sleep } from '../util' +import { Middleware, sleep } from '../util' import { Container, inject } from 'inversify' import { TestContainer } from '../test/test-container' import { BUS_SYMBOLS } from '../bus-symbols' import { Logger } from '@node-ts/logger-core' -import { Mock, IMock, Times } from 'typemoq' -import { HandlesMessage } from '../handler' +import { Mock, IMock, Times, It } from 'typemoq' +import { HandlesMessage, MessageType } from '../handler' import { ApplicationBootstrap } from '../application-bootstrap' import { MessageAttributes } from '@node-ts/bus-messages' import { BusConfiguration } from './bus-configuration' @@ -55,6 +55,7 @@ describe('ServiceBus', () => { let queue: MemoryQueue let callback: IMock + let messageReadMiddleware: IMock>> beforeAll(async () => { container = new TestContainer().silenceLogs() @@ -68,8 +69,12 @@ describe('ServiceBus', () => { callback = Mock.ofType() container.bind(CALLBACK).toConstantValue(callback.object) - await bootstrapper.initialize(container) + sut = container.get(BUS_SYMBOLS.Bus) + messageReadMiddleware = Mock.ofType>>() + + sut.messageReadMiddleware(messageReadMiddleware.object) + await bootstrapper.initialize(container) }) afterAll(async () => { @@ -129,18 +134,34 @@ describe('ServiceBus', () => { }) describe('when a message is successfully handled from the queue', () => { - it('should delete the message from the queue', async () => { - callback.reset() - callback - .setup(c => c()) - .callback(() => undefined) + beforeAll(async () => { + messageReadMiddleware.reset() + + messageReadMiddleware + .setup(x => x(It.isAny(),It.isAny())) + .returns((_, next) => next()) .verifiable(Times.once()) - await sut.publish(event) - await sleep(10) - expect(queue.depth).toEqual(0) + callback.reset() + + await new Promise(async resolve => { + callback + .setup(c => c()) + .callback(resolve) + .verifiable(Times.once()) + await sut.publish(event) + }) + }) + + it('should call the message handler', () => { callback.verifyAll() }) + it('should delete the message from the queue', async () => { + expect(queue.depth).toEqual(0) + }) + it('should call the message middlewares', async () => { + messageReadMiddleware.verifyAll() + }) }) describe('and a handled message throw an Error', () => { diff --git a/packages/bus-core/src/service-bus/service-bus.ts b/packages/bus-core/src/service-bus/service-bus.ts index 6c2e18e8..80812b90 100644 --- a/packages/bus-core/src/service-bus/service-bus.ts +++ b/packages/bus-core/src/service-bus/service-bus.ts @@ -5,7 +5,7 @@ import { BUS_SYMBOLS, BUS_INTERNAL_SYMBOLS } from '../bus-symbols' import { Transport, TransportMessage } from '../transport' import { Event, Command, MessageAttributes, Message } from '@node-ts/bus-messages' import { Logger, LOGGER_SYMBOLS } from '@node-ts/logger-core' -import { sleep } from '../util' +import { Middleware, MiddlewareDispatcher, Next, sleep } from '../util' import { HandlerRegistry, HandlerRegistration } from '../handler' import * as serializeError from 'serialize-error' import { SessionScopeBinder } from '../bus-module' @@ -20,6 +20,7 @@ export class ServiceBus implements Bus { private internalState: BusState = BusState.Stopped private runningWorkerCount = 0 + private messageReadMiddlewares: MiddlewareDispatcher> constructor ( @inject(BUS_SYMBOLS.Transport) private readonly transport: Transport<{}>, @@ -29,7 +30,11 @@ export class ServiceBus implements Bus { @inject(BUS_INTERNAL_SYMBOLS.BusHooks) private readonly busHooks: BusHooks, @inject(BUS_SYMBOLS.BusConfiguration) private readonly busConfiguration: BusConfiguration, @optional() @inject(BUS_INTERNAL_SYMBOLS.RawMessage) private readonly rawMessage: TransportMessage + ) { + this.messageReadMiddlewares = new MiddlewareDispatcher>() + // Register our message handling middleware + this.messageReadMiddlewares.useFinal(this.handleNextMessagePolled) } async publish ( @@ -58,15 +63,22 @@ export class ServiceBus implements Bus { ): Promise { this.logger.debug('send', { command }) const transportOptions = this.prepareTransportOptions(messageOptions) - await Promise.all(this.busHooks.send.map(callback => callback(command, messageOptions))) return this.transport.send(command, transportOptions) } + messageReadMiddleware(messageReadMiddleware: Middleware>) { + if (this.internalState !== BusState.Stopped) { + throw new Error('ServiceBus must be stopped to add useBforeHandleNextMessageMiddlewares') + } + this.messageReadMiddlewares.use(messageReadMiddleware) + } + async start (): Promise { if (this.internalState !== BusState.Stopped) { throw new Error('ServiceBus must be stopped before it can be started') } + this.internalState = BusState.Starting this.logger.info('ServiceBus starting...', { concurrency: this.busConfiguration.concurrency }) new Array(this.busConfiguration.concurrency) @@ -83,6 +95,7 @@ export class ServiceBus implements Bus { await sleep(100) } + this.internalState = BusState.Stopped this.logger.info('ServiceBus stopped') } @@ -117,11 +130,8 @@ export class ServiceBus implements Bus { if (message) { this.logger.debug('Message read from transport', { message }) - try { - await this.dispatchMessageToHandlers(message) - this.logger.debug('Message dispatched to all handlers', { message }) - await this.transport.deleteMessage(message) + await this.messageReadMiddlewares.dispatch(message) } catch (error) { this.logger.warn( 'Message was unsuccessfully handled. Returning to queue', @@ -133,6 +143,7 @@ export class ServiceBus implements Bus { message.attributes, message ))) + // second middleware call (might not be required) await this.transport.returnMessage(message) return false } @@ -143,6 +154,17 @@ export class ServiceBus implements Bus { } return false } + /** + * The final middleware that runs, after all the useBeforeHandleNextMessage middlewares have completed + * It dispatches a message that has been polled from the queue + * and deletes the message from the transport + */ + private handleNextMessagePolled: Middleware> = async (message: TransportMessage, next: Next): Promise => { + await this.dispatchMessageToHandlers(message) + this.logger.debug('Message dispatched to all handlers', { message }) + await this.transport.deleteMessage(message) + return next() + } private async dispatchMessageToHandlers ( rawMessage: TransportMessage diff --git a/packages/bus-core/src/util/index.ts b/packages/bus-core/src/util/index.ts index b9c512cc..e52cc11c 100644 --- a/packages/bus-core/src/util/index.ts +++ b/packages/bus-core/src/util/index.ts @@ -1,3 +1,4 @@ export * from './sleep' export * from './class-constructor' export * from './assert-unreachable' +export * from './middleware' diff --git a/packages/bus-core/src/util/middleware.ts b/packages/bus-core/src/util/middleware.ts new file mode 100644 index 00000000..8cbfc215 --- /dev/null +++ b/packages/bus-core/src/util/middleware.ts @@ -0,0 +1,69 @@ +/** + * This middleware pattern has been grabbed from here: + * https://evertpot.com/generic-middleware/ + */ + +/** + * 'next' function, passed to a middleware + */ + export type Next = () => void | Promise; + +/** + * A middleware + */ +export type Middleware = + (context: T, next: Next) => Promise | void + +/** + * A middleware container and invoker + */ +export class MiddlewareDispatcher { + + middlewares: Middleware[]; + + finalMiddlewares: Middleware[]; + + constructor() { + this.middlewares = []; + this.finalMiddlewares = []; + } + + /** + * Add a middleware function. + */ + use(...middlewares: Middleware[]): void { + this.middlewares.push(...middlewares); + } + + /** + * Add 'final' middlewares that will be added to the end of the + * regular middlewares. This allows for finer control when exposing + * the @see use functionality to consumers but wanting to ensure that your + * final middleware is last to run + */ + useFinal(...middlewares: Middleware[]): void { + this.finalMiddlewares.push(...middlewares); + } + + /** + * Execute the chain of middlewares, in the order they were added on a + * given Context. + */ + dispatch(context: T): Promise { + return invokeMiddlewares(context, this.middlewares.concat(this.finalMiddlewares)) + } +} + + +async function invokeMiddlewares(context: T, middlewares: Middleware[]): Promise { + + if (!middlewares.length) { + return + } + + const middleware = middlewares[0] + + return middleware(context, async () => { + await invokeMiddlewares(context, middlewares.slice(1)) + }) +}