Skip to content

Commit

Permalink
Adding context on multi consumer prehandler barrier (#34)
Browse files Browse the repository at this point in the history
* Adding context to barrier callback for multi consumers

* Adjusting multiconsumers

* Fixing error on AbstractSnsSqsConsumerMonoSchema
  • Loading branch information
CarlosGamero committed Aug 28, 2023
1 parent 4ad3ea2 commit 452e8f8
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 10 deletions.
8 changes: 6 additions & 2 deletions packages/amqp/lib/AbstractAmqpConsumerMultiSchema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,12 @@ export abstract class AbstractAmqpConsumerMultiSchema<
return handler.messageLogFormatter(message)
}

override preHandlerBarrier(message: MessagePayloadType, messageType: string): Promise<boolean> {
protected override async preHandlerBarrier(
message: MessagePayloadType,
messageType: string,
): Promise<boolean> {
const handler = this.handlerContainer.resolveHandler(messageType)
return handler.preHandlerBarrier ? handler.preHandlerBarrier(message) : Promise.resolve(true)
// @ts-ignore
return handler.preHandlerBarrier ? await handler.preHandlerBarrier(message, this) : true
}
}
2 changes: 1 addition & 1 deletion packages/core/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ export {
HandlerContainer,
MessageHandlerConfig,
MessageHandlerConfigBuilder,
BarrierCallbackWithoutMessageType,
BarrierCallbackMultiConsumers,
} from './lib/queues/HandlerContainer'
export type { HandlerContainerOptions, Handler } from './lib/queues/HandlerContainer'

Expand Down
16 changes: 10 additions & 6 deletions packages/core/lib/queues/HandlerContainer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@ import type { ZodSchema } from 'zod'

export type LogFormatter<MessagePayloadSchema> = (message: MessagePayloadSchema) => unknown

export type BarrierCallbackWithoutMessageType<MessagePayloadSchema extends object> = (
export type BarrierCallbackMultiConsumers<MessagePayloadSchema extends object, ExecutionContext> = (
message: MessagePayloadSchema,
context: ExecutionContext,
) => Promise<boolean>

export const defaultLogFormatter = <MessagePayloadSchema>(message: MessagePayloadSchema) => message

export type HandlerConfigOptions<MessagePayloadSchema extends object> = {
export type HandlerConfigOptions<MessagePayloadSchema extends object, ExecutionContext> = {
messageLogFormatter?: LogFormatter<MessagePayloadSchema>
preHandlerBarrier?: BarrierCallbackWithoutMessageType<MessagePayloadSchema>
preHandlerBarrier?: BarrierCallbackMultiConsumers<MessagePayloadSchema, ExecutionContext>
}

export class MessageHandlerConfig<
Expand All @@ -21,12 +22,15 @@ export class MessageHandlerConfig<
public readonly schema: ZodSchema<MessagePayloadSchema>
public readonly handler: Handler<MessagePayloadSchema, ExecutionContext>
public readonly messageLogFormatter: LogFormatter<MessagePayloadSchema>
public readonly preHandlerBarrier?: BarrierCallbackWithoutMessageType<MessagePayloadSchema>
public readonly preHandlerBarrier?: BarrierCallbackMultiConsumers<
MessagePayloadSchema,
ExecutionContext
>

constructor(
schema: ZodSchema<MessagePayloadSchema>,
handler: Handler<MessagePayloadSchema, ExecutionContext>,
options?: HandlerConfigOptions<MessagePayloadSchema>,
options?: HandlerConfigOptions<MessagePayloadSchema, ExecutionContext>,
) {
this.schema = schema
this.handler = handler
Expand All @@ -45,7 +49,7 @@ export class MessageHandlerConfigBuilder<MessagePayloadSchemas extends object, E
addConfig<MessagePayloadSchema extends MessagePayloadSchemas>(
schema: ZodSchema<MessagePayloadSchema>,
handler: Handler<MessagePayloadSchema, ExecutionContext>,
options?: HandlerConfigOptions<MessagePayloadSchema>,
options?: HandlerConfigOptions<MessagePayloadSchema, ExecutionContext>,
) {
// @ts-ignore
this.configs.push(new MessageHandlerConfig(schema, handler, options))
Expand Down
10 changes: 10 additions & 0 deletions packages/sns/lib/sns/AbstractSnsSqsConsumerMonoSchema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,16 @@ export abstract class AbstractSnsSqsConsumerMonoSchema<
return readSnsMessage(message, this.errorResolver)
}

/**
* Override to implement barrier pattern
*/
protected preHandlerBarrier(
_message: MessagePayloadType,
_messageType: string,
): Promise<boolean> {
return Promise.resolve(true)
}

override async init(): Promise<void> {
if (this.deletionConfig && this.creationConfig && this.subscriptionConfig) {
await deleteSnsSqs(
Expand Down
3 changes: 2 additions & 1 deletion packages/sqs/lib/sqs/AbstractSqsConsumerMultiSchema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ export abstract class AbstractSqsConsumerMultiSchema<
messageType: string,
): Promise<boolean> {
const handler = this.handlerContainer.resolveHandler(messageType)
return handler.preHandlerBarrier ? await handler.preHandlerBarrier(message) : true
// @ts-ignore
return handler.preHandlerBarrier ? await handler.preHandlerBarrier(message, this) : true
}
}

0 comments on commit 452e8f8

Please sign in to comment.