Skip to content

Commit

Permalink
[Proposal] middleware support before handleNextMessage (#156)
Browse files Browse the repository at this point in the history
* notes

* WIP middlewares

* comments

* MR feedback

* remove comments

* changing the naming of the middlewares

* added tests to cover the new middleware

* removing my use of sleep

Co-authored-by: edward <edward@optizmo.com>
  • Loading branch information
mod35 and edward committed Nov 9, 2021
1 parent 894b96f commit cf057fa
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 18 deletions.
9 changes: 9 additions & 0 deletions packages/bus-core/src/service-bus/bus.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Event, Command, MessageAttributes, Message } from '@node-ts/bus-messages'
import { Middleware } from '../util'
import { TransportMessage } from '../transport'

export enum BusState {
Expand Down Expand Up @@ -75,4 +76,12 @@ export interface Bus {
* @template TransportMessageType - The raw message type returned from the transport that will be passed to the hooks
*/
off<TransportMessageType = unknown> (action: HookAction, callback: HookCallback<TransportMessageType>): void

/**
* Register optional middlewares that will run for each message that is polled from the transport
* Note these middlewares only run when polling successfully pulls a message off the Transports queue
* After all the user defined middlewares have registered. @see start and @see stop should add/remove a final bus middleware
* that ensures the message is correctly dispatched to the handlers and removed from the underlying transport
*/
messageReadMiddleware<TransportMessageType = unknown> (useBeforeHandleNextMessageMiddleware: Middleware<TransportMessage<TransportMessageType>>): void
}
45 changes: 33 additions & 12 deletions packages/bus-core/src/service-bus/service-bus.integration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ import { BusState } from './bus'
import { TestEvent } from '../test/test-event'
import { TestEvent2 } from '../test/test-event-2'
import { TestCommand } from '../test/test-command'
import { sleep } from '../util'
import { Middleware, sleep } from '../util'
import { Container, inject } from 'inversify'
import { TestContainer } from '../test/test-container'
import { BUS_SYMBOLS } from '../bus-symbols'
import { Logger } from '@node-ts/logger-core'
import { Mock, IMock, Times } from 'typemoq'
import { HandlesMessage } from '../handler'
import { Mock, IMock, Times, It } from 'typemoq'
import { HandlesMessage, MessageType } from '../handler'
import { ApplicationBootstrap } from '../application-bootstrap'
import { MessageAttributes } from '@node-ts/bus-messages'
import { BusConfiguration } from './bus-configuration'
Expand Down Expand Up @@ -55,6 +55,7 @@ describe('ServiceBus', () => {
let queue: MemoryQueue

let callback: IMock<Callback>
let messageReadMiddleware: IMock<Middleware<TransportMessage<MessageType>>>

beforeAll(async () => {
container = new TestContainer().silenceLogs()
Expand All @@ -68,8 +69,12 @@ describe('ServiceBus', () => {

callback = Mock.ofType<Callback>()
container.bind(CALLBACK).toConstantValue(callback.object)
await bootstrapper.initialize(container)

sut = container.get(BUS_SYMBOLS.Bus)
messageReadMiddleware = Mock.ofType<Middleware<TransportMessage<MessageType>>>()

sut.messageReadMiddleware<MessageType>(messageReadMiddleware.object)
await bootstrapper.initialize(container)
})

afterAll(async () => {
Expand Down Expand Up @@ -129,18 +134,34 @@ describe('ServiceBus', () => {
})

describe('when a message is successfully handled from the queue', () => {
it('should delete the message from the queue', async () => {
callback.reset()
callback
.setup(c => c())
.callback(() => undefined)
beforeAll(async () => {
messageReadMiddleware.reset()

messageReadMiddleware
.setup(x => x(It.isAny(),It.isAny()))
.returns((_, next) => next())
.verifiable(Times.once())
await sut.publish(event)
await sleep(10)

expect(queue.depth).toEqual(0)
callback.reset()

await new Promise<void>(async resolve => {
callback
.setup(c => c())
.callback(resolve)
.verifiable(Times.once())
await sut.publish(event)
})
})

it('should call the message handler', () => {
callback.verifyAll()
})
it('should delete the message from the queue', async () => {
expect(queue.depth).toEqual(0)
})
it('should call the message middlewares', async () => {
messageReadMiddleware.verifyAll()
})
})

describe('and a handled message throw an Error', () => {
Expand Down
34 changes: 28 additions & 6 deletions packages/bus-core/src/service-bus/service-bus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { BUS_SYMBOLS, BUS_INTERNAL_SYMBOLS } from '../bus-symbols'
import { Transport, TransportMessage } from '../transport'
import { Event, Command, MessageAttributes, Message } from '@node-ts/bus-messages'
import { Logger, LOGGER_SYMBOLS } from '@node-ts/logger-core'
import { sleep } from '../util'
import { Middleware, MiddlewareDispatcher, Next, sleep } from '../util'
import { HandlerRegistry, HandlerRegistration } from '../handler'
import * as serializeError from 'serialize-error'
import { SessionScopeBinder } from '../bus-module'
Expand All @@ -20,6 +20,7 @@ export class ServiceBus implements Bus {

private internalState: BusState = BusState.Stopped
private runningWorkerCount = 0
private messageReadMiddlewares: MiddlewareDispatcher<TransportMessage<MessageType>>

constructor (
@inject(BUS_SYMBOLS.Transport) private readonly transport: Transport<{}>,
Expand All @@ -29,7 +30,11 @@ export class ServiceBus implements Bus {
@inject(BUS_INTERNAL_SYMBOLS.BusHooks) private readonly busHooks: BusHooks,
@inject(BUS_SYMBOLS.BusConfiguration) private readonly busConfiguration: BusConfiguration,
@optional() @inject(BUS_INTERNAL_SYMBOLS.RawMessage) private readonly rawMessage: TransportMessage<unknown>

) {
this.messageReadMiddlewares = new MiddlewareDispatcher<TransportMessage<MessageType>>()
// Register our message handling middleware
this.messageReadMiddlewares.useFinal(this.handleNextMessagePolled)
}

async publish<TEvent extends Event> (
Expand Down Expand Up @@ -58,15 +63,22 @@ export class ServiceBus implements Bus {
): Promise<void> {
this.logger.debug('send', { command })
const transportOptions = this.prepareTransportOptions(messageOptions)

await Promise.all(this.busHooks.send.map(callback => callback(command, messageOptions)))
return this.transport.send(command, transportOptions)
}

messageReadMiddleware<MessageType>(messageReadMiddleware: Middleware<TransportMessage<MessageType>>) {
if (this.internalState !== BusState.Stopped) {
throw new Error('ServiceBus must be stopped to add useBforeHandleNextMessageMiddlewares')
}
this.messageReadMiddlewares.use(messageReadMiddleware)
}

async start (): Promise<void> {
if (this.internalState !== BusState.Stopped) {
throw new Error('ServiceBus must be stopped before it can be started')
}

this.internalState = BusState.Starting
this.logger.info('ServiceBus starting...', { concurrency: this.busConfiguration.concurrency })
new Array(this.busConfiguration.concurrency)
Expand All @@ -83,6 +95,7 @@ export class ServiceBus implements Bus {
await sleep(100)
}


this.internalState = BusState.Stopped
this.logger.info('ServiceBus stopped')
}
Expand Down Expand Up @@ -117,11 +130,8 @@ export class ServiceBus implements Bus {

if (message) {
this.logger.debug('Message read from transport', { message })

try {
await this.dispatchMessageToHandlers(message)
this.logger.debug('Message dispatched to all handlers', { message })
await this.transport.deleteMessage(message)
await this.messageReadMiddlewares.dispatch(message)
} catch (error) {
this.logger.warn(
'Message was unsuccessfully handled. Returning to queue',
Expand All @@ -133,6 +143,7 @@ export class ServiceBus implements Bus {
message.attributes,
message
)))
// second middleware call (might not be required)
await this.transport.returnMessage(message)
return false
}
Expand All @@ -143,6 +154,17 @@ export class ServiceBus implements Bus {
}
return false
}
/**
* The final middleware that runs, after all the useBeforeHandleNextMessage middlewares have completed
* It dispatches a message that has been polled from the queue
* and deletes the message from the transport
*/
private handleNextMessagePolled: Middleware<TransportMessage<MessageType>> = async (message: TransportMessage<MessageType>, next: Next): Promise<void> => {
await this.dispatchMessageToHandlers(message)
this.logger.debug('Message dispatched to all handlers', { message })
await this.transport.deleteMessage(message)
return next()
}

private async dispatchMessageToHandlers (
rawMessage: TransportMessage<MessageType>
Expand Down
1 change: 1 addition & 0 deletions packages/bus-core/src/util/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
export * from './sleep'
export * from './class-constructor'
export * from './assert-unreachable'
export * from './middleware'
69 changes: 69 additions & 0 deletions packages/bus-core/src/util/middleware.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/**
* This middleware pattern has been grabbed from here:
* https://evertpot.com/generic-middleware/
*/

/**
* 'next' function, passed to a middleware
*/
export type Next = () => void | Promise<void>;

/**
* A middleware
*/
export type Middleware<T> =
(context: T, next: Next) => Promise<void> | void

/**
* A middleware container and invoker
*/
export class MiddlewareDispatcher<T> {

middlewares: Middleware<T>[];

finalMiddlewares: Middleware<T>[];

constructor() {
this.middlewares = [];
this.finalMiddlewares = [];
}

/**
* Add a middleware function.
*/
use(...middlewares: Middleware<T>[]): void {
this.middlewares.push(...middlewares);
}

/**
* Add 'final' middlewares that will be added to the end of the
* regular middlewares. This allows for finer control when exposing
* the @see use functionality to consumers but wanting to ensure that your
* final middleware is last to run
*/
useFinal(...middlewares: Middleware<T>[]): void {
this.finalMiddlewares.push(...middlewares);
}

/**
* Execute the chain of middlewares, in the order they were added on a
* given Context.
*/
dispatch(context: T): Promise<void> {
return invokeMiddlewares(context, this.middlewares.concat(this.finalMiddlewares))
}
}


async function invokeMiddlewares<T>(context: T, middlewares: Middleware<T>[]): Promise<void> {

if (!middlewares.length) {
return
}

const middleware = middlewares[0]

return middleware(context, async () => {
await invokeMiddlewares(context, middlewares.slice(1))
})
}

0 comments on commit cf057fa

Please sign in to comment.