Skip to content

Commit

Permalink
Barrier Pattern (#31)
Browse files Browse the repository at this point in the history
* AbstractSqsConsumer consumer handleMessage callback minor refactor

* Adding barrier option to handlerContainer

* Applying barrier to sqs multi consumer

* Minor fix

* Minor change

* adding barrier support to amqp and sns multi consumers

* Todo added

* barrier pattern on sqs mono consumer

* Revert "barrier pattern on sqs mono consumer"

This reverts commit 584f7b1.

* Changing approach for sqs multi-consumer barrier

* Fixing tests

* Minor improvement

* Applying new approach to amqp

* Reverting unneeded change

* shouldProcessMessageLater replaced by preHandlerBarrier

* Adding doc

* Adding mono consumer preHandlerBarrier example

* Improving doc

* Minor improvements

* Adding tests covering error cases

* Amqp barrier test and reverting sns test

* Let's see if CI passes

* Minor tweaks

* Adjust creation order

* Let's try longer delay

* Adjusting amqp to sqs

---------

Co-authored-by: Igor Savin <iselwin@gmail.com>
  • Loading branch information
CarlosGamero and kibertoad committed Aug 22, 2023
1 parent 7aaca4b commit 1d68011
Show file tree
Hide file tree
Showing 19 changed files with 331 additions and 60 deletions.
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ It consists of the following submodules:
* `close()`, which needs to be invoked when stopping the application;
* `processMessage()`, which accepts as parameter a `message` following a `zod` schema and should be overridden with logic on what to do with the message;
* `start()`, which invokes `init()` and `processMessage()` and handles errors.
* `preHandlerBarrier`, which accepts as a parameter a `message` following a `zod` schema and can be overridden to enable the barrier pattern (see [Barrier pattern](#barrier-pattern))

> **_NOTE:_** See [SqsPermissionConsumerMonoSchema.ts](./packages/sqs/test/consumers/SqsPermissionConsumerMonoSchema.ts) for a practical example.
Expand All @@ -72,6 +73,16 @@ Then the message is automatically nacked without requeueing by the abstract cons

> **_NOTE:_** See [userConsumerSchemas.ts](./packages/sqs/test/consumers/userConsumerSchemas.ts) and [SqsPermissionsConsumerMonoSchema.spec.ts](./packages/sqs/test/consumers/SqsPermissionsConsumerMonoSchema.spec.ts) for a practical example.
### Barrier pattern
The barrier pattern facilitates the out-of-order message handling by retrying the message later if the system is not still in the good state to be able to process that message.

To enable this pattern you should implement `preHandlerBarrier` including your conditions to process the message so
if the method returns `true` the message will be processed right away but if it returns false it will be retried later

> **_NOTE:_** See [SqsPermissionConsumerMonoSchema.ts](./packages/sns/test/consumers/SnsSqsPermissionConsumerMonoSchema.ts) for a practical example on mono consumers.
> **_NOTE:_** See [SqsPermissionConsumerMultiSchema.ts](./packages/sns/test/consumers/SnsSqsPermissionConsumerMultiSchema.ts) for a practical example on multi consumers.

## Fan-out to Multiple Consumers

SQS queues are built in a way that every message is only consumed once, and then deleted. If you want to do fan-out to multiple consumers, you need SNS topic in the middle, which is then propagated to all the SQS queues that have subscribed.
Expand Down
21 changes: 19 additions & 2 deletions packages/amqp/lib/AbstractAmqpBaseConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,25 @@ export abstract class AbstractAmqpBaseConsumer<MessagePayloadType extends object
}
}

private async internalProcessMessage(
message: MessagePayloadType,
messageType: string,
): Promise<Either<'retryLater', 'success'>> {
const barrierPassed = await this.preHandlerBarrier(message, messageType)

if (barrierPassed) {
return this.processMessage(message, messageType)
}
return { error: 'retryLater' }
}

protected abstract preHandlerBarrier(
message: MessagePayloadType,
messageType: string,
): Promise<boolean>

abstract processMessage(
messagePayload: MessagePayloadType,
message: MessagePayloadType,
messageType: string,
): Promise<Either<'retryLater', 'success'>>

Expand Down Expand Up @@ -120,7 +137,7 @@ export abstract class AbstractAmqpBaseConsumer<MessagePayloadType extends object
const resolvedLogMessage = this.resolveMessageLog(deserializedMessage.result, messageType)
this.logMessage(resolvedLogMessage)
}
this.processMessage(deserializedMessage.result, messageType)
this.internalProcessMessage(deserializedMessage.result, messageType)
.then((result) => {
if (result.error === 'retryLater') {
this.channel.nack(message, false, true)
Expand Down
7 changes: 7 additions & 0 deletions packages/amqp/lib/AbstractAmqpConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,11 @@ export abstract class AbstractAmqpConsumer<MessagePayloadType extends object>
protected override resolveSchema(_message: MessagePayloadType) {
return this.schemaEither
}

/**
* Override to implement barrier pattern
*/
protected preHandlerBarrier(_message: MessagePayloadType): Promise<boolean> {
return Promise.resolve(true)
}
}
11 changes: 11 additions & 0 deletions packages/amqp/lib/AbstractAmqpConsumerMultiSchema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,18 @@ export abstract class AbstractAmqpConsumerMultiSchema<
messageType: string,
): Promise<Either<'retryLater', 'success'>> {
const handler = this.handlerContainer.resolveHandler(messageType)

// @ts-ignore
return handler.handler(message, this)
}

protected override resolveMessageLog(message: MessagePayloadType, messageType: string): unknown {
const handler = this.handlerContainer.resolveHandler(messageType)
return handler.messageLogFormatter(message)
}

override preHandlerBarrier(message: MessagePayloadType, messageType: string): Promise<boolean> {
const handler = this.handlerContainer.resolveHandler(messageType)
return handler.preHandlerBarrier ? handler.preHandlerBarrier(message) : Promise.resolve(true)
}
}
25 changes: 18 additions & 7 deletions packages/amqp/test/consumers/AmqpPermissionConsumerMultiSchema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,12 @@ export class AmqpPermissionConsumerMultiSchema extends AbstractAmqpConsumerMulti
public addCounter = 0
public removeCounter = 0

constructor(dependencies: AMQPConsumerDependencies, options?: Partial<NewAMQPConsumerOptions>) {
constructor(
dependencies: AMQPConsumerDependencies,
options?: Partial<NewAMQPConsumerOptions> & {
addPreHandlerBarrier?: (message: SupportedEvents) => Promise<boolean>
},
) {
super(dependencies, {
creationConfig: {
queueName: AmqpPermissionConsumerMultiSchema.QUEUE_NAME,
Expand All @@ -40,12 +45,18 @@ export class AmqpPermissionConsumerMultiSchema extends AbstractAmqpConsumerMulti
SupportedEvents,
AmqpPermissionConsumerMultiSchema
>()
.addConfig(PERMISSIONS_ADD_MESSAGE_SCHEMA, async (_message, _context) => {
this.addCounter++
return {
result: 'success',
}
})
.addConfig(
PERMISSIONS_ADD_MESSAGE_SCHEMA,
async (_message, _context) => {
this.addCounter++
return {
result: 'success',
}
},
{
preHandlerBarrier: options?.addPreHandlerBarrier,
},
)
.addConfig(PERMISSIONS_REMOVE_MESSAGE_SCHEMA, async (_message, _context) => {
this.removeCounter++
return {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { waitAndRetry } from '@message-queue-toolkit/core'
import type { AwilixContainer } from 'awilix'
import { asClass, asFunction } from 'awilix'
import { describe, beforeEach, afterEach, expect, it, beforeAll } from 'vitest'
import { describe, beforeEach, afterEach, expect, it } from 'vitest'

import { waitAndRetry } from '../../../core/lib/utils/waitUtils'
import { FakeConsumerErrorResolver } from '../fakes/FakeConsumerErrorResolver'
import { FakeLogger } from '../fakes/FakeLogger'
import type { AmqpPermissionPublisherMultiSchema } from '../publishers/AmqpPermissionPublisherMultiSchema'
Expand Down Expand Up @@ -44,6 +44,64 @@ describe('PermissionsConsumerMultiSchema', () => {
})
})

describe('preHandlerBarrier', () => {
let diContainer: AwilixContainer<Dependencies>
let publisher: AmqpPermissionPublisherMultiSchema

beforeAll(async () => {
diContainer = await registerDependencies(TEST_AMQP_CONFIG)
await diContainer.cradle.permissionConsumerMultiSchema.close()
publisher = diContainer.cradle.permissionPublisherMultiSchema
})

it('blocks first try', async () => {
let barrierCounter = 0
const newConsumer = new AmqpPermissionConsumerMultiSchema(diContainer.cradle, {
addPreHandlerBarrier: (_msg) => {
barrierCounter++
return Promise.resolve(barrierCounter > 1)
},
})
await newConsumer.start()

publisher.publish({
messageType: 'add',
})

await waitAndRetry(() => {
return newConsumer.addCounter === 1
})

expect(newConsumer.addCounter).toBe(1)
expect(barrierCounter).toBe(2)
})

it('throws an error on first try', async () => {
let barrierCounter = 0
const newConsumer = new AmqpPermissionConsumerMultiSchema(diContainer.cradle, {
addPreHandlerBarrier: (_msg) => {
barrierCounter++
if (barrierCounter === 1) {
throw new Error()
}
return Promise.resolve(true)
},
})
await newConsumer.start()

publisher.publish({
messageType: 'add',
})

await waitAndRetry(() => {
return newConsumer.addCounter === 1
})

expect(newConsumer.addCounter).toBe(1)
expect(barrierCounter).toBe(2)
})
})

describe('consume', () => {
let diContainer: AwilixContainer<Dependencies>
let publisher: AmqpPermissionPublisherMultiSchema
Expand Down Expand Up @@ -76,7 +134,7 @@ describe('PermissionsConsumerMultiSchema', () => {
})

await waitAndRetry(() => {
return consumer.addCounter > 0 && consumer.removeCounter == 2
return consumer.addCounter === 1 && consumer.removeCounter === 2
})

expect(consumer.addCounter).toBe(1)
Expand Down
1 change: 1 addition & 0 deletions packages/core/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ export {
HandlerContainer,
MessageHandlerConfig,
MessageHandlerConfigBuilder,
BarrierCallbackWithoutMessageType,
} from './lib/queues/HandlerContainer'
export type { HandlerContainerOptions, Handler } from './lib/queues/HandlerContainer'

Expand Down
1 change: 1 addition & 0 deletions packages/core/lib/queues/AbstractQueueService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ export abstract class AbstractQueueService<
protected abstract resolveSchema(
message: MessagePayloadSchemas,
): Either<Error, ZodSchema<MessagePayloadSchemas>>

protected abstract resolveMessage(
message: MessageEnvelopeType,
): Either<MessageInvalidFormatError | MessageValidationError, unknown>
Expand Down
18 changes: 14 additions & 4 deletions packages/core/lib/queues/HandlerContainer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,25 @@ import type { ZodSchema } from 'zod'

export type LogFormatter<MessagePayloadSchema> = (message: MessagePayloadSchema) => unknown

export type BarrierCallbackWithoutMessageType<MessagePayloadSchema extends object> = (
message: MessagePayloadSchema,
) => Promise<boolean>

export const defaultLogFormatter = <MessagePayloadSchema>(message: MessagePayloadSchema) => message

export type HandlerConfigOptions<MessagePayloadSchema> = {
export type HandlerConfigOptions<MessagePayloadSchema extends object> = {
messageLogFormatter?: LogFormatter<MessagePayloadSchema>
preHandlerBarrier?: BarrierCallbackWithoutMessageType<MessagePayloadSchema>
}

export class MessageHandlerConfig<const MessagePayloadSchema, const ExecutionContext> {
export class MessageHandlerConfig<
const MessagePayloadSchema extends object,
const ExecutionContext,
> {
public readonly schema: ZodSchema<MessagePayloadSchema>
public readonly messageLogFormatter: LogFormatter<MessagePayloadSchema>
public readonly handler: Handler<MessagePayloadSchema, ExecutionContext>
public readonly messageLogFormatter: LogFormatter<MessagePayloadSchema>
public readonly preHandlerBarrier?: BarrierCallbackWithoutMessageType<MessagePayloadSchema>

constructor(
schema: ZodSchema<MessagePayloadSchema>,
Expand All @@ -22,10 +31,11 @@ export class MessageHandlerConfig<const MessagePayloadSchema, const ExecutionCon
this.schema = schema
this.handler = handler
this.messageLogFormatter = options?.messageLogFormatter ?? defaultLogFormatter
this.preHandlerBarrier = options?.preHandlerBarrier
}
}

export class MessageHandlerConfigBuilder<MessagePayloadSchemas, ExecutionContext> {
export class MessageHandlerConfigBuilder<MessagePayloadSchemas extends object, ExecutionContext> {
private readonly configs: MessageHandlerConfig<MessagePayloadSchemas, ExecutionContext>[]

constructor() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ export class SnsSqsPermissionConsumerMonoSchema extends AbstractSnsSqsConsumerMo
public static CONSUMED_QUEUE_NAME = 'user_permissions'
public static SUBSCRIBED_TOPIC_NAME = 'user_permissions'

public preHandlerBarrierCounter: number = 0

constructor(
dependencies: SNSSQSConsumerDependencies,
options:
Expand Down Expand Up @@ -70,4 +72,9 @@ export class SnsSqsPermissionConsumerMonoSchema extends AbstractSnsSqsConsumerMo
result: 'success',
}
}

async preHandlerBarrier(_message: PERMISSIONS_MESSAGE_TYPE): Promise<boolean> {
this.preHandlerBarrierCounter++
return this.preHandlerBarrierCounter > 2
}
}
22 changes: 16 additions & 6 deletions packages/sns/test/consumers/SnsSqsPermissionConsumerMultiSchema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ export class SnsSqsPermissionConsumerMultiSchema extends AbstractSnsSqsConsumerM
public static SUBSCRIBED_TOPIC_NAME = 'user_permissions_multi'

public addCounter = 0
public addBarrierCounter = 0
public removeCounter = 0

constructor(
Expand All @@ -48,12 +49,21 @@ export class SnsSqsPermissionConsumerMultiSchema extends AbstractSnsSqsConsumerM
SupportedEvents,
SnsSqsPermissionConsumerMultiSchema
>()
.addConfig(PERMISSIONS_ADD_MESSAGE_SCHEMA, async (_message, _context) => {
this.addCounter++
return {
result: 'success',
}
})
.addConfig(
PERMISSIONS_ADD_MESSAGE_SCHEMA,
async (_message, _context) => {
this.addCounter++
return {
result: 'success',
}
},
{
preHandlerBarrier: async (_message) => {
this.addBarrierCounter++
return this.addBarrierCounter > 2
},
},
)
.addConfig(PERMISSIONS_REMOVE_MESSAGE_SCHEMA, async (_message, _context) => {
this.removeCounter++
return {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,13 @@ describe('SNS PermissionsConsumer', () => {
describe('consume', () => {
let diContainer: AwilixContainer<Dependencies>
let publisher: SnsPermissionPublisherMonoSchema
let consumer: SnsSqsPermissionConsumerMonoSchema
let fakeResolver: FakeConsumerErrorResolver

beforeEach(async () => {
diContainer = await registerDependencies()
publisher = diContainer.cradle.permissionPublisher
consumer = diContainer.cradle.permissionConsumer
fakeResolver = diContainer.cradle.consumerErrorResolver as FakeConsumerErrorResolver

delete userPermissionMap[100]
Expand Down Expand Up @@ -172,6 +175,7 @@ describe('SNS PermissionsConsumer', () => {
throw new Error('Users permissions unexpectedly null')
}

expect(consumer.preHandlerBarrierCounter).toBe(3)
expect(updatedUsersPermissions).toBeDefined()
expect(updatedUsersPermissions[0]).toHaveLength(2)
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,15 @@ describe('SNS PermissionsConsumerMultiSchema', () => {
messageType: 'remove',
})

await waitAndRetry(() => {
return consumer.addCounter > 0 && consumer.removeCounter == 2
})

await waitAndRetry(
() => {
return consumer.addCounter === 1 && consumer.removeCounter === 2
},
30,
20,
)

expect(consumer.addBarrierCounter).toBe(3)
expect(consumer.addCounter).toBe(1)
expect(consumer.removeCounter).toBe(2)
})
Expand Down
8 changes: 4 additions & 4 deletions packages/sns/test/utils/testContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,16 +85,16 @@ export async function registerDependencies(
lifetime: Lifetime.SINGLETON,
asyncInit: 'init',
asyncDispose: 'close',
asyncInitPriority: 20,
asyncDisposePriority: 20,
asyncInitPriority: 40,
asyncDisposePriority: 40,
enabled: queuesEnabled,
}),
permissionPublisherMultiSchema: asClass(SnsPermissionPublisherMultiSchema, {
lifetime: Lifetime.SINGLETON,
asyncInit: 'init',
asyncDispose: 'close',
asyncInitPriority: 20,
asyncDisposePriority: 20,
asyncInitPriority: 40,
asyncDisposePriority: 40,
enabled: queuesEnabled,
}),

Expand Down
Loading

0 comments on commit 1d68011

Please sign in to comment.