diff --git a/packages/amqp/lib/AbstractAmqpConsumer.ts b/packages/amqp/lib/AbstractAmqpConsumer.ts index fec7a123..a27a4621 100644 --- a/packages/amqp/lib/AbstractAmqpConsumer.ts +++ b/packages/amqp/lib/AbstractAmqpConsumer.ts @@ -10,7 +10,6 @@ import type { TransactionObservabilityManager, } from '@message-queue-toolkit/core' import { - isRetryDateExceeded, isMessageError, parseMessage, HandlerContainer, @@ -170,27 +169,26 @@ export abstract class AbstractAmqpConsumer< .then((result) => { if (result.result === 'success') { this.channel.ack(message) - this.handleMessageProcessed(originalMessage, 'consumed') + this.handleMessageProcessed(parsedMessage, 'consumed') return } - // retryLater - const timestamp = this.tryToExtractTimestamp(originalMessage) ?? new Date() // requeue the message if maxRetryDuration is not exceeded, else ack it to avoid infinite loop - if (!isRetryDateExceeded(timestamp, this.maxRetryDuration)) { + if (this.shouldBeRetried(originalMessage, this.maxRetryDuration)) { + // TODO: Add retry delay + republish message updating internal properties this.channel.nack(message, false, true) - this.handleMessageProcessed(originalMessage, 'retryLater') + this.handleMessageProcessed(parsedMessage, 'retryLater') } else { // ToDo move message to DLQ once it is implemented this.channel.ack(message) - this.handleMessageProcessed(originalMessage, 'error') + this.handleMessageProcessed(parsedMessage, 'error') } }) .catch((err) => { // ToDo we need sanity check to stop trying at some point, perhaps some kind of Redis counter // If we fail due to unknown reason, let's retry this.channel.nack(message, false, true) - this.handleMessageProcessed(originalMessage, 'retryLater') + this.handleMessageProcessed(parsedMessage, 'retryLater') this.handleError(err) }) .finally(() => { diff --git a/packages/amqp/lib/AbstractAmqpPublisher.ts b/packages/amqp/lib/AbstractAmqpPublisher.ts index 60e85541..966f8225 100644 --- a/packages/amqp/lib/AbstractAmqpPublisher.ts +++ b/packages/amqp/lib/AbstractAmqpPublisher.ts @@ -85,22 +85,14 @@ export abstract class AbstractAmqpPublisher< return } - /** - * If the message doesn't have a timestamp field -> add it - * will be used on the consumer to prevent infinite retries on the same message - */ - if (!this.tryToExtractTimestamp(message)) { - // @ts-ignore - message[this.messageTimestampField] = new Date().toISOString() - this.logger.warn(`${this.messageTimestampField} not defined, adding it automatically`) - } - if (this.logMessages) { // @ts-ignore const resolvedLogMessage = this.resolveMessageLog(message, message[this.messageTypeField]) this.logMessage(resolvedLogMessage) } + message = this.updateInternalProperties(message) + try { this.publishInternal(objectToBuffer(message), options) } catch (err) { diff --git a/packages/amqp/package.json b/packages/amqp/package.json index 8a5baeed..d27ba2e3 100644 --- a/packages/amqp/package.json +++ b/packages/amqp/package.json @@ -1,6 +1,6 @@ { "name": "@message-queue-toolkit/amqp", - "version": "15.3.0", + "version": "15.4.0", "private": false, "license": "MIT", "description": "AMQP adapter for message-queue-toolkit", @@ -29,7 +29,7 @@ "zod": "^3.23.8" }, "peerDependencies": { - "@message-queue-toolkit/core": "^13.3.1", + "@message-queue-toolkit/core": "^13.4.0", "amqplib": "^0.10.3" }, "devDependencies": { diff --git a/packages/amqp/test/consumers/AmqpPermissionConsumer.spec.ts b/packages/amqp/test/consumers/AmqpPermissionConsumer.spec.ts index c4762bcd..7ba916bc 100644 --- a/packages/amqp/test/consumers/AmqpPermissionConsumer.spec.ts +++ b/packages/amqp/test/consumers/AmqpPermissionConsumer.spec.ts @@ -13,7 +13,10 @@ import type { Dependencies } from '../utils/testContext' import { registerDependencies, SINGLETON_CONFIG } from '../utils/testContext' import { AmqpPermissionConsumer } from './AmqpPermissionConsumer' -import type { PERMISSIONS_MESSAGE_TYPE } from './userConsumerSchemas' +import type { + PERMISSIONS_ADD_MESSAGE_TYPE, + PERMISSIONS_REMOVE_MESSAGE_TYPE, +} from './userConsumerSchemas' describe('AmqpPermissionConsumer', () => { describe('init', () => { @@ -90,15 +93,15 @@ describe('AmqpPermissionConsumer', () => { expect(logger.loggedMessages.length).toBe(5) expect(logger.loggedMessages).toEqual([ 'Propagating new connection across 0 receivers', - 'timestamp not defined, adding it automatically', { id: '1', messageType: 'add', - timestamp: expect.any(String), }, + 'timestamp not defined, adding it automatically', { id: '1', messageType: 'add', + timestamp: expect.any(String), }, { messageId: '1', @@ -407,11 +410,9 @@ describe('AmqpPermissionConsumer', () => { }) await consumer.start() - const message: PERMISSIONS_MESSAGE_TYPE = { + const message: PERMISSIONS_ADD_MESSAGE_TYPE = { id: '1', messageType: 'add', - userIds: [1], - permissions: ['100'], timestamp: new Date(new Date().getTime() - 2 * 1000).toISOString(), } publisher.publish(message) @@ -432,11 +433,9 @@ describe('AmqpPermissionConsumer', () => { }) await consumer.start() - const message: PERMISSIONS_MESSAGE_TYPE = { + const message: PERMISSIONS_REMOVE_MESSAGE_TYPE = { id: '1', messageType: 'remove', - userIds: [1], - permissions: ['100'], timestamp: new Date(new Date().getTime() - 2 * 1000).toISOString(), } publisher.publish(message) diff --git a/packages/amqp/test/consumers/userConsumerSchemas.ts b/packages/amqp/test/consumers/userConsumerSchemas.ts index 5dd72c2f..30bed14e 100644 --- a/packages/amqp/test/consumers/userConsumerSchemas.ts +++ b/packages/amqp/test/consumers/userConsumerSchemas.ts @@ -11,11 +11,13 @@ export const PERMISSIONS_MESSAGE_SCHEMA = z.object({ export const PERMISSIONS_ADD_MESSAGE_SCHEMA = z.object({ id: z.string(), messageType: z.literal('add'), + timestamp: z.string().or(z.date()).optional(), }) export const PERMISSIONS_REMOVE_MESSAGE_SCHEMA = z.object({ id: z.string(), messageType: z.literal('remove'), + timestamp: z.string().or(z.date()).optional(), }) export type PERMISSIONS_MESSAGE_TYPE = z.infer diff --git a/packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts b/packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts index bd01a042..3026d370 100644 --- a/packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts +++ b/packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts @@ -51,10 +51,9 @@ describe('PermissionPublisher', () => { return logger.loggedMessages.length === 2 }) - expect(logger.loggedMessages[2]).toEqual({ + expect(logger.loggedMessages[1]).toEqual({ id: '1', messageType: 'add', - timestamp: expect.any(String), }) }) }) @@ -245,6 +244,7 @@ describe('PermissionPublisher', () => { userIds: [1], permissions: ['100'], timestamp: message.timestamp.toISOString(), + _internalNumberOfRetries: 0, }, }) }) @@ -292,11 +292,12 @@ describe('PermissionPublisher', () => { userIds: [1], permissions: ['100'], timestamp: message.timestamp.toISOString(), + _internalNumberOfRetries: 0, }, }) }) - it('publishes a message auto-filling timestamp', async () => { + it('publishes a message auto-filling internal properties', async () => { await permissionConsumer.close() const message = { @@ -325,11 +326,13 @@ describe('PermissionPublisher', () => { parsedMessage: { id: '2', messageType: 'add', + timestamp: expect.any(String), }, originalMessage: { id: '2', messageType: 'add', timestamp: expect.any(String), + _internalNumberOfRetries: 0, }, }) }) diff --git a/packages/core/lib/queues/AbstractQueueService.ts b/packages/core/lib/queues/AbstractQueueService.ts index 5327281a..baea2535 100644 --- a/packages/core/lib/queues/AbstractQueueService.ts +++ b/packages/core/lib/queues/AbstractQueueService.ts @@ -7,6 +7,7 @@ import type { ZodSchema, ZodType } from 'zod' import type { MessageInvalidFormatError, MessageValidationError } from '../errors/Errors' import type { Logger, MessageProcessingResult } from '../types/MessageQueueTypes' import type { DeletionConfig, QueueDependencies, QueueOptions } from '../types/queueOptionsTypes' +import { isRetryDateExceeded } from '../utils/dateUtils' import { toDatePreprocessor } from '../utils/toDateProcessor' import type { @@ -42,11 +43,21 @@ export abstract class AbstractQueueService< ExecutionContext = undefined, PrehandlerOutput = undefined, > { + /** + * Used to keep track of the number of `retryLater` results received for a message to be able to + * calculate the delay for the next retry + */ + private readonly messageNumberOfRetriesField = '_internalNumberOfRetries' + /** + * Used to know when the message was sent initially so we can have a max retry date and avoid + * a infinite `retryLater` loop + */ + protected readonly messageTimestampField: string + protected readonly errorReporter: ErrorReporter public readonly logger: Logger protected readonly messageIdField: string protected readonly messageTypeField: string - protected readonly messageTimestampField: string protected readonly logMessages: boolean protected readonly creationConfig?: QueueConfiguration protected readonly locatorConfig?: QueueLocatorType @@ -198,7 +209,45 @@ export abstract class AbstractQueueService< return await barrier(message, executionContext, preHandlerOutput) } - protected tryToExtractTimestamp(message: MessagePayloadSchemas): Date | undefined { + shouldBeRetried(message: MessagePayloadSchemas, maxRetryDuration: number): boolean { + const timestamp = this.tryToExtractTimestamp(message) ?? new Date() + return !isRetryDateExceeded(timestamp, maxRetryDuration) + } + + protected getMessageRetryDelayInSeconds(message: MessagePayloadSchemas): number { + // if not defined, this is the first attempt + const retries = this.tryToExtractNumberOfRetries(message) ?? 0 + + // exponential backoff -> (2 ^ (attempts)) * delay + // delay = 1 second + return Math.pow(2, retries) + } + + protected updateInternalProperties(message: MessagePayloadSchemas): MessagePayloadSchemas { + const messageCopy = { ...message } // clone the message to avoid mutation + + /** + * If the message doesn't have a timestamp field -> add it + * will be used to prevent infinite retries on the same message + */ + if (!this.tryToExtractTimestamp(message)) { + // @ts-ignore + messageCopy[this.messageTimestampField] = new Date().toISOString() + this.logger.warn(`${this.messageTimestampField} not defined, adding it automatically`) + } + + /** + * add/increment the number of retries performed to exponential message delay + */ + const numberOfRetries = this.tryToExtractNumberOfRetries(message) + // @ts-ignore + messageCopy[this.messageNumberOfRetriesField] = + numberOfRetries !== undefined ? numberOfRetries + 1 : 0 + + return messageCopy + } + + private tryToExtractTimestamp(message: MessagePayloadSchemas): Date | undefined { // @ts-ignore if (this.messageTimestampField in message) { // @ts-ignore @@ -213,6 +262,18 @@ export abstract class AbstractQueueService< return undefined } + private tryToExtractNumberOfRetries(message: MessagePayloadSchemas): number | undefined { + if ( + this.messageNumberOfRetriesField in message && + typeof message[this.messageNumberOfRetriesField] === 'number' + ) { + // @ts-ignore + return message[this.messageNumberOfRetriesField] + } + + return undefined + } + protected abstract resolveNextFunction( preHandlers: Prehandler[], message: MessagePayloadSchemas, diff --git a/packages/core/package.json b/packages/core/package.json index d35b5803..0e46b6cf 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -1,6 +1,6 @@ { "name": "@message-queue-toolkit/core", - "version": "13.3.2", + "version": "13.4.0", "private": false, "license": "MIT", "description": "Useful utilities, interfaces and base classes for message queue handling. Supports AMQP and SQS with a common abstraction on top currently", diff --git a/packages/sns/lib/sns/AbstractSnsPublisher.ts b/packages/sns/lib/sns/AbstractSnsPublisher.ts index b0336c57..24086ef9 100644 --- a/packages/sns/lib/sns/AbstractSnsPublisher.ts +++ b/packages/sns/lib/sns/AbstractSnsPublisher.ts @@ -60,17 +60,7 @@ export abstract class AbstractSnsPublisher } try { - messageSchemaResult.result.parse(message) - - /** - * If the message doesn't have a timestamp field -> add it - * will be used on the consumer to prevent infinite retries on the same message - */ - if (!this.tryToExtractTimestamp(message)) { - // @ts-ignore - message[this.messageTimestampField] = new Date().toISOString() - this.logger.warn(`${this.messageTimestampField} not defined, adding it automatically`) - } + const parsedMessage = messageSchemaResult.result.parse(message) if (this.logMessages) { // @ts-ignore @@ -78,6 +68,8 @@ export abstract class AbstractSnsPublisher this.logMessage(resolvedLogMessage) } + message = this.updateInternalProperties(message) + const input = { Message: JSON.stringify(message), TopicArn: this.topicArn, @@ -85,7 +77,7 @@ export abstract class AbstractSnsPublisher } satisfies PublishCommandInput const command = new PublishCommand(input) await this.snsClient.send(command) - this.handleMessageProcessed(message, 'published') + this.handleMessageProcessed(parsedMessage, 'published') } catch (error) { const err = error as Error this.handleError(err) diff --git a/packages/sns/lib/utils/snsMessageDeserializer.spec.ts b/packages/sns/lib/utils/snsMessageDeserializer.spec.ts index 7e506be0..042bbcf3 100644 --- a/packages/sns/lib/utils/snsMessageDeserializer.spec.ts +++ b/packages/sns/lib/utils/snsMessageDeserializer.spec.ts @@ -13,7 +13,6 @@ describe('messageDeserializer', () => { const messagePayload = { id: '1', messageType: 'add', - userIds: [1], permissions: ['perm'], nonSchemaField: 'nonSchemaField', } @@ -47,7 +46,6 @@ describe('messageDeserializer', () => { parsedMessage: { id: '1', messageType: 'add', - userIds: [1], permissions: ['perm'], }, }) @@ -55,7 +53,7 @@ describe('messageDeserializer', () => { it('throws an error on invalid JSON', () => { const messagePayload: Partial = { - userIds: [1], + permissions: ['perm'], } const snsMessage: SNS_MESSAGE_BODY_TYPE = { @@ -89,7 +87,7 @@ describe('messageDeserializer', () => { it('throws an error on invalid SNS envelope', () => { const messagePayload: Partial = { - userIds: [1], + permissions: ['perm'], } const snsMessage: Partial = { diff --git a/packages/sns/package.json b/packages/sns/package.json index c0114544..36e48cd6 100644 --- a/packages/sns/package.json +++ b/packages/sns/package.json @@ -1,6 +1,6 @@ { "name": "@message-queue-toolkit/sns", - "version": "15.1.0", + "version": "15.2.0", "private": false, "license": "MIT", "description": "SNS adapter for message-queue-toolkit", @@ -32,8 +32,8 @@ "peerDependencies": { "@aws-sdk/client-sns": "^3.556.0", "@aws-sdk/client-sqs": "^3.556.0", - "@message-queue-toolkit/core": "^13.1.0", - "@message-queue-toolkit/sqs": "^15.0.0" + "@message-queue-toolkit/core": "^13.4.0", + "@message-queue-toolkit/sqs": "^15.1.0" }, "devDependencies": { "@aws-sdk/client-sns": "^3.569.0", diff --git a/packages/sns/test/consumers/SnsSqsPermissionConsumer.deadLetterQueue.spec.ts b/packages/sns/test/consumers/SnsSqsPermissionConsumer.deadLetterQueue.spec.ts index ddd61fae..4c0aeb70 100644 --- a/packages/sns/test/consumers/SnsSqsPermissionConsumer.deadLetterQueue.spec.ts +++ b/packages/sns/test/consumers/SnsSqsPermissionConsumer.deadLetterQueue.spec.ts @@ -17,7 +17,7 @@ import { registerDependencies } from '../utils/testContext' import type { Dependencies } from '../utils/testContext' import { SnsSqsPermissionConsumer } from './SnsSqsPermissionConsumer' -import type { PERMISSIONS_MESSAGE_TYPE } from './userConsumerSchemas' +import type { PERMISSIONS_REMOVE_MESSAGE_TYPE } from './userConsumerSchemas' // Note that dead letter queue are fully tested by sqs library - only including a few tests here to make sure the integration works describe('SnsSqsPermissionConsumer - dead letter queue', () => { @@ -156,21 +156,29 @@ describe('SnsSqsPermissionConsumer - dead letter queue', () => { }) dlqConsumer.start() - const message: PERMISSIONS_MESSAGE_TYPE = { + const message: PERMISSIONS_REMOVE_MESSAGE_TYPE = { id: '1', messageType: 'remove', - userIds: [1], - permissions: ['100'], timestamp: new Date(new Date().getTime() - 2 * 1000).toISOString(), } await publisher.publish(message) const spyResult = await consumer.handlerSpy.waitForMessageWithId('1', 'error') expect(spyResult.message).toEqual(message) - expect(counter).toBeGreaterThan(2) + // due to exponential backoff and timestamp, message is only retried once before being moved to DLQ + expect(counter).toBe(2) await waitAndRetry(async () => dlqMessage) - expect(JSON.parse(dlqMessage.Body)).toMatchObject({ id: '1', messageType: 'remove' }) + + const messageBody = JSON.parse(dlqMessage.Body) + expect(messageBody).toEqual({ + id: '1', + messageType: 'remove', + timestamp: message.timestamp, + _internalNumberOfRetries: expect.any(Number), + }) + // due to exponential backoff and timestamp, on second retry message is moved to DLQ so _internalNumberOfRetries is 1 + expect(messageBody._internalNumberOfRetries).toBe(1) dlqConsumer.stop() }) diff --git a/packages/sns/test/consumers/SnsSqsPermissionConsumer.spec.ts b/packages/sns/test/consumers/SnsSqsPermissionConsumer.spec.ts index 441c9d94..004880cf 100644 --- a/packages/sns/test/consumers/SnsSqsPermissionConsumer.spec.ts +++ b/packages/sns/test/consumers/SnsSqsPermissionConsumer.spec.ts @@ -517,4 +517,67 @@ describe('SnsSqsPermissionConsumer', () => { 10000, ) }) + + describe('exponential backoff retry', () => { + const topicName = 'myTestTopic' + const queueName = 'myTestQueue' + let diContainer: AwilixContainer + + beforeEach(async () => { + diContainer = await registerDependencies({ + permissionConsumer: asValue(() => undefined), + permissionPublisher: asValue(() => undefined), + }) + }) + + afterEach(async () => { + await diContainer.cradle.awilixManager.executeDispose() + await diContainer.dispose() + }) + + it('should use internal field and 1 base delay', async () => { + const consumer = new SnsSqsPermissionConsumer(diContainer.cradle, { + creationConfig: { + topic: { Name: topicName }, + queue: { QueueName: queueName }, + }, + removeHandlerOverride: () => { + return Promise.resolve({ error: 'retryLater' }) + }, + }) + await consumer.start() + + const publisher = new SnsPermissionPublisher(diContainer.cradle, { + locatorConfig: { topicArn: consumer.subscriptionProps.topicArn }, + }) + + const sqsSpy = vi.spyOn(diContainer.cradle.sqsClient, 'send') + await publisher.publish({ + id: '10', + messageType: 'remove', + }) + + await waitAndRetry( + () => { + const sqsSendMessageCommands = sqsSpy.mock.calls + .map((call) => call[0].input) + .filter((input) => 'MessageBody' in input) + + return sqsSendMessageCommands.length === 1 + }, + 5, + 100, + ) + + const sqsSendMessageCommands = sqsSpy.mock.calls + .map((call) => call[0].input) + .filter((input) => 'MessageBody' in input) + + expect(sqsSendMessageCommands).toHaveLength(1) + expect(sqsSendMessageCommands[0]).toMatchObject({ + MessageBody: expect.stringContaining('"_internalNumberOfRetries":1'), + DelaySeconds: 1, + }) + }) + }) }) diff --git a/packages/sns/test/consumers/userConsumerSchemas.ts b/packages/sns/test/consumers/userConsumerSchemas.ts index 974eab03..97ec04a8 100644 --- a/packages/sns/test/consumers/userConsumerSchemas.ts +++ b/packages/sns/test/consumers/userConsumerSchemas.ts @@ -3,7 +3,6 @@ import z from 'zod' export const PERMISSIONS_MESSAGE_SCHEMA = z.object({ id: z.string(), messageType: z.enum(['add', 'remove']), - userIds: z.array(z.number()).describe('User IDs'), permissions: z.array(z.string()).nonempty().describe('List of user permissions'), timestamp: z.string().optional(), }) @@ -12,12 +11,14 @@ export const PERMISSIONS_ADD_MESSAGE_SCHEMA = z.object({ id: z.string(), preHandlerIncrement: z.optional(z.number()), messageType: z.literal('add'), + timestamp: z.string().optional(), }) export const PERMISSIONS_REMOVE_MESSAGE_SCHEMA = z.object({ id: z.string(), preHandlerIncrement: z.optional(z.number()), messageType: z.literal('remove'), + timestamp: z.string().optional(), }) export type PERMISSIONS_MESSAGE_TYPE = z.infer diff --git a/packages/sns/test/publishers/SnsPermissionPublisher.spec.ts b/packages/sns/test/publishers/SnsPermissionPublisher.spec.ts index be8e7b89..4ffc6fe6 100644 --- a/packages/sns/test/publishers/SnsPermissionPublisher.spec.ts +++ b/packages/sns/test/publishers/SnsPermissionPublisher.spec.ts @@ -15,17 +15,12 @@ import type { PERMISSIONS_ADD_MESSAGE_TYPE, PERMISSIONS_MESSAGE_TYPE, } from '../consumers/userConsumerSchemas' -import { - PERMISSIONS_MESSAGE_SCHEMA, - PERMISSIONS_ADD_MESSAGE_SCHEMA, -} from '../consumers/userConsumerSchemas' +import { PERMISSIONS_ADD_MESSAGE_SCHEMA } from '../consumers/userConsumerSchemas' import { registerDependencies } from '../utils/testContext' import type { Dependencies } from '../utils/testContext' import { SnsPermissionPublisher } from './SnsPermissionPublisher' -const perms: [string, ...string[]] = ['perm1', 'perm2'] -const userIds = [100, 200, 300] const queueName = 'someQueue' describe('SnsPermissionPublisher', () => { @@ -131,11 +126,9 @@ describe('SnsPermissionPublisher', () => { const message = { id: '1', - userIds, messageType: 'add', - permissions: perms, timestamp: new Date().toISOString(), - } satisfies PERMISSIONS_MESSAGE_TYPE + } satisfies PERMISSIONS_ADD_MESSAGE_TYPE const { queueUrl } = await assertQueue(sqsClient, { QueueName: queueName, @@ -164,7 +157,7 @@ describe('SnsPermissionPublisher', () => { } const decodedMessage = deserializeSNSMessage( message as any, - PERMISSIONS_MESSAGE_SCHEMA, + PERMISSIONS_ADD_MESSAGE_SCHEMA, new FakeConsumerErrorResolver(), ) receivedMessage = decodedMessage.result! @@ -180,14 +173,14 @@ describe('SnsPermissionPublisher', () => { await waitAndRetry(() => !!receivedMessage) expect(receivedMessage).toEqual({ - originalMessage: message, + originalMessage: { ...message, _internalNumberOfRetries: 0, timestamp: expect.any(String) }, parsedMessage: message, }) consumer.stop() }) - it('publishes a message auto-filling timestamp', async () => { + it('publishes a message auto-filling internal properties', async () => { const { permissionPublisher } = diContainer.cradle const message = { @@ -238,10 +231,15 @@ describe('SnsPermissionPublisher', () => { await waitAndRetry(() => !!receivedMessage) expect(receivedMessage).toEqual({ - originalMessage: message, + originalMessage: { + ...message, + timestamp: expect.any(String), + _internalNumberOfRetries: 0, + }, parsedMessage: { id: '1', messageType: 'add', + timestamp: expect.any(String), }, }) @@ -258,7 +256,6 @@ describe('SnsPermissionPublisher', () => { const message = { id: '1', - userIds, messageType: 'add', permissions, } satisfies PERMISSIONS_MESSAGE_TYPE @@ -295,10 +292,8 @@ describe('SnsPermissionPublisher', () => { const message = { id: '1', - userIds, messageType: 'add', - permissions: perms, - } satisfies PERMISSIONS_MESSAGE_TYPE + } satisfies PERMISSIONS_ADD_MESSAGE_TYPE await newPublisher.publish(message) diff --git a/packages/sqs/lib/sqs/AbstractSqsConsumer.ts b/packages/sqs/lib/sqs/AbstractSqsConsumer.ts index 63cb8c12..838e4636 100644 --- a/packages/sqs/lib/sqs/AbstractSqsConsumer.ts +++ b/packages/sqs/lib/sqs/AbstractSqsConsumer.ts @@ -12,7 +12,6 @@ import type { ParseMessageResult, } from '@message-queue-toolkit/core' import { - isRetryDateExceeded, isMessageError, parseMessage, HandlerContainer, @@ -234,28 +233,24 @@ export abstract class AbstractSqsConsumer< } if (result.error === 'retryLater') { - const timestamp = this.tryToExtractTimestamp(originalMessage) ?? new Date() - // requeue the message if maxRetryDuration is not exceeded, else ack it to avoid infinite loop - if (!isRetryDateExceeded(timestamp, this.maxRetryDuration)) { + if (this.shouldBeRetried(originalMessage, this.maxRetryDuration)) { await this.sqsClient.send( new SendMessageCommand({ QueueUrl: this.queueUrl, - MessageBody: JSON.stringify({ - ...originalMessage, - [this.messageTimestampField]: timestamp.toISOString(), - }), + DelaySeconds: this.getMessageRetryDelayInSeconds(originalMessage), + MessageBody: JSON.stringify(this.updateInternalProperties(originalMessage)), }), ) - this.handleMessageProcessed(originalMessage, 'retryLater') + this.handleMessageProcessed(parsedMessage, 'retryLater') } else { await this.failProcessing(message) - this.handleMessageProcessed(originalMessage, 'error') + this.handleMessageProcessed(parsedMessage, 'error') } return message } - this.handleMessageProcessed(originalMessage, 'error') + this.handleMessageProcessed(parsedMessage, 'error') return Promise.reject(result.error) }, }) diff --git a/packages/sqs/lib/sqs/AbstractSqsPublisher.ts b/packages/sqs/lib/sqs/AbstractSqsPublisher.ts index 44db7800..ef3aa2da 100644 --- a/packages/sqs/lib/sqs/AbstractSqsPublisher.ts +++ b/packages/sqs/lib/sqs/AbstractSqsPublisher.ts @@ -59,17 +59,7 @@ export abstract class AbstractSqsPublisher } try { - messageSchemaResult.result.parse(message) - - /** - * If the message doesn't have a timestamp field -> add it - * will be used on the consumer to prevent infinite retries on the same message - */ - if (!this.tryToExtractTimestamp(message)) { - // @ts-ignore - message[this.messageTimestampField] = new Date().toISOString() - this.logger.warn(`${this.messageTimestampField} not defined, adding it automatically`) - } + const parsedMessage = messageSchemaResult.result.parse(message) if (this.logMessages) { // @ts-ignore @@ -77,6 +67,8 @@ export abstract class AbstractSqsPublisher this.logMessage(resolvedLogMessage) } + message = this.updateInternalProperties(message) + const input = { // SendMessageRequest QueueUrl: this.queueUrl, @@ -85,7 +77,7 @@ export abstract class AbstractSqsPublisher } satisfies SendMessageCommandInput const command = new SendMessageCommand(input) await this.sqsClient.send(command) - this.handleMessageProcessed(message, 'published') + this.handleMessageProcessed(parsedMessage, 'published') } catch (error) { const err = error as Error this.handleError(err) diff --git a/packages/sqs/package.json b/packages/sqs/package.json index d5d3b1f8..cca89a43 100644 --- a/packages/sqs/package.json +++ b/packages/sqs/package.json @@ -1,6 +1,6 @@ { "name": "@message-queue-toolkit/sqs", - "version": "15.0.0", + "version": "15.1.0", "private": false, "license": "MIT", "description": "SQS adapter for message-queue-toolkit", @@ -31,7 +31,7 @@ }, "peerDependencies": { "@aws-sdk/client-sqs": "^3.556.0", - "@message-queue-toolkit/core": "^13.0.0" + "@message-queue-toolkit/core": "^13.4.0" }, "devDependencies": { "@aws-sdk/client-sqs": "^3.569.0", diff --git a/packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts b/packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts index 74c19a41..7e67c3d0 100644 --- a/packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts +++ b/packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts @@ -1,3 +1,4 @@ +/* eslint-disable max-lines */ import { setTimeout } from 'node:timers/promises' import type { SendMessageCommandInput, SQSClient } from '@aws-sdk/client-sqs' @@ -192,18 +193,17 @@ describe('SqsPermissionConsumer', () => { await newConsumer.handlerSpy.waitForMessageWithId('1', 'consumed') expect(logger.loggedMessages.length).toBe(2) - expect(logger.loggedMessages).toMatchInlineSnapshot(` - [ - { - "id": "1", - "messageType": "add", - }, - { - "messageId": "1", - "processingResult": "consumed", - }, - ] - `) + expect(logger.loggedMessages).toMatchObject([ + { + id: '1', + messageType: 'add', + timestamp: expect.any(String), + }, + { + messageId: '1', + processingResult: 'consumed', + }, + ]) await newConsumer.close() }) }) @@ -559,4 +559,97 @@ describe('SqsPermissionConsumer', () => { 10000, ) }) + + describe('exponential backoff retry', () => { + const queueName = 'myTestQueue_exponentialBackoffRetry' + let diContainer: AwilixContainer + + beforeEach(async () => { + diContainer = await registerDependencies({ + permissionPublisher: asValue(() => undefined), + permissionConsumer: asValue(() => undefined), + }) + }) + + afterEach(async () => { + await diContainer.cradle.awilixManager.executeDispose() + await diContainer.dispose() + }) + + it('should use internal field and 1 base delay', async () => { + const consumer = new SqsPermissionConsumer(diContainer.cradle, { + creationConfig: { + queue: { QueueName: queueName }, + }, + removeHandlerOverride: () => { + return Promise.resolve({ error: 'retryLater' }) + }, + }) + await consumer.start() + + const publisher = new SqsPermissionPublisher(diContainer.cradle, { + locatorConfig: { queueUrl: consumer.queueProps.url }, + }) + await publisher.init() + + const sqsSpy = vi.spyOn(diContainer.cradle.sqsClient, 'send') + await publisher.publish({ + id: '10', + messageType: 'remove', + }) + await publisher.publish({ + id: '20', + messageType: 'remove', + _internalNumberOfRetries: 1, // Note that publish will add 1 to this value, but it's fine for this test + } as any) + await publisher.publish({ + id: '30', + messageType: 'remove', + _internalNumberOfRetries: 10, // Note that publish will add 1 to this value, but it's fine for this test + } as any) + + await waitAndRetry( + () => { + const sendMessageCommands = sqsSpy.mock.calls + .map((call) => call[0].input) + .filter((input) => 'MessageBody' in input) + + return sendMessageCommands.length === 6 + }, + 5, + 100, + ) + + const sendMessageCommands = sqsSpy.mock.calls + .map((call) => call[0].input) + .filter((input) => 'MessageBody' in input) + + expect(sendMessageCommands).toHaveLength(6) + expect(sendMessageCommands).toEqual( + expect.arrayContaining([ + expect.objectContaining({ + MessageBody: expect.stringContaining('"_internalNumberOfRetries":0'), + }), + expect.objectContaining({ + MessageBody: expect.stringContaining('"_internalNumberOfRetries":2'), + }), + expect.objectContaining({ + MessageBody: expect.stringContaining('"_internalNumberOfRetries":11'), + }), + expect.objectContaining({ + MessageBody: expect.stringContaining('"_internalNumberOfRetries":1'), + DelaySeconds: 1, + }), + expect.objectContaining({ + MessageBody: expect.stringContaining('"_internalNumberOfRetries":3'), + DelaySeconds: 4, + }), + expect.objectContaining({ + MessageBody: expect.stringContaining('"_internalNumberOfRetries":12'), + DelaySeconds: 2048, + }), + ]), + ) + }) + }) }) diff --git a/packages/sqs/test/consumers/SqsPermisssionConsumer.deadLetterQueue.spec.ts b/packages/sqs/test/consumers/SqsPermisssionConsumer.deadLetterQueue.spec.ts index dd4c3ece..44b27f83 100644 --- a/packages/sqs/test/consumers/SqsPermisssionConsumer.deadLetterQueue.spec.ts +++ b/packages/sqs/test/consumers/SqsPermisssionConsumer.deadLetterQueue.spec.ts @@ -13,7 +13,7 @@ import { registerDependencies } from '../utils/testContext' import { SqsPermissionConsumer } from './SqsPermissionConsumer' import type { - PERMISSIONS_MESSAGE_TYPE, + PERMISSIONS_ADD_MESSAGE_TYPE, PERMISSIONS_REMOVE_MESSAGE_TYPE, } from './userConsumerSchemas' @@ -246,25 +246,32 @@ describe('SqsPermissionConsumer - deadLetterQueue', () => { await waitAndRetry(async () => dlqMessage, 50, 20) expect(counter).toBe(2) - expect(JSON.parse(dlqMessage.Body)).toMatchObject({ id: '1', messageType: 'remove' }) + expect(JSON.parse(dlqMessage.Body)).toEqual({ + id: '1', + messageType: 'remove', + timestamp: expect.any(String), + _internalNumberOfRetries: 0, + }) }) - it('messages with retryLater should always be retried and not go to DLQ', async () => { + it('messages with retryLater should be retried with exponential delay and not go to DLQ', async () => { const sqsMessage: PERMISSIONS_REMOVE_MESSAGE_TYPE = { id: '1', messageType: 'remove' } let counter = 0 + const messageArrivalTime: number[] = [] consumer = new SqsPermissionConsumer(diContainer.cradle, { creationConfig: { queue: { QueueName: queueName } }, deadLetterQueue: { creationConfig: { queue: { QueueName: deadLetterQueueName } }, - redrivePolicy: { maxReceiveCount: 3 }, + redrivePolicy: { maxReceiveCount: 1 }, }, removeHandlerOverride: async (message) => { if (message.id !== sqsMessage.id) { throw new Error('not expected message') } counter++ - return counter < 10 ? { error: 'retryLater' } : { result: 'success' } + messageArrivalTime.push(new Date().getTime()) + return counter < 2 ? { error: 'retryLater' } : { result: 'success' } }, }) await consumer.start() @@ -274,6 +281,12 @@ describe('SqsPermissionConsumer - deadLetterQueue', () => { const handlerSpyResult = await consumer.handlerSpy.waitForMessageWithId('1', 'consumed') expect(handlerSpyResult.processingResult).toBe('consumed') expect(handlerSpyResult.message).toMatchObject({ id: '1', messageType: 'remove' }) + + expect(counter).toBe(2) + + // delay is 1s, but consumer can take the message + const secondsRetry = (messageArrivalTime[1] - messageArrivalTime[0]) / 1000 + expect(secondsRetry).toBeGreaterThan(1) }) it('messages with deserialization errors should go to DLQ', async () => { @@ -319,7 +332,7 @@ describe('SqsPermissionConsumer - deadLetterQueue', () => { creationConfig: { queue: { QueueName: deadLetterQueueName } }, redrivePolicy: { maxReceiveCount: 200 }, }, - maxRetryDuration: 3, + maxRetryDuration: 2, addPreHandlerBarrier: (_msg) => { counter++ return Promise.resolve({ isPassing: false }) @@ -337,21 +350,28 @@ describe('SqsPermissionConsumer - deadLetterQueue', () => { }) dlqConsumer.start() - const message: PERMISSIONS_MESSAGE_TYPE = { + const message: PERMISSIONS_ADD_MESSAGE_TYPE = { id: '1', messageType: 'add', - userIds: [1], - permissions: ['100'], - timestamp: new Date(new Date().getTime() - 2 * 1000).toISOString(), + timestamp: new Date(new Date().getTime() - 1000).toISOString(), } await permissionPublisher.publish(message) const spyResult = await consumer.handlerSpy.waitForMessageWithId('1', 'error') expect(spyResult.message).toEqual(message) - expect(counter).toBeGreaterThan(2) + // due to exponential backoff and timestamp, message is only retried once before being moved to DLQ + expect(counter).toBe(2) - await waitAndRetry(async () => dlqMessage) - expect(JSON.parse(dlqMessage.Body)).toMatchObject({ id: '1', messageType: 'add' }) + await waitAndRetry(() => dlqMessage) + const messageBody = JSON.parse(dlqMessage.Body) + expect(messageBody).toEqual({ + id: '1', + messageType: 'add', + timestamp: message.timestamp, + _internalNumberOfRetries: expect.any(Number), + }) + // due to exponential backoff and timestamp, on second retry message is moved to DLQ so _internalNumberOfRetries is 1 + expect(messageBody._internalNumberOfRetries).toBe(1) dlqConsumer.stop() }) @@ -364,7 +384,7 @@ describe('SqsPermissionConsumer - deadLetterQueue', () => { creationConfig: { queue: { QueueName: deadLetterQueueName } }, redrivePolicy: { maxReceiveCount: 200 }, }, - maxRetryDuration: 3, + maxRetryDuration: 2, removeHandlerOverride: async () => { counter++ return Promise.resolve({ error: 'retryLater' }) @@ -382,21 +402,28 @@ describe('SqsPermissionConsumer - deadLetterQueue', () => { }) dlqConsumer.start() - const message: PERMISSIONS_MESSAGE_TYPE = { - id: '1', + const message: PERMISSIONS_REMOVE_MESSAGE_TYPE = { + id: '2', messageType: 'remove', - userIds: [1], - permissions: ['100'], - timestamp: new Date(new Date().getTime() - 2 * 1000).toISOString(), + timestamp: new Date(new Date().getTime() - 1000).toISOString(), } await permissionPublisher.publish(message) - const spyResult = await consumer.handlerSpy.waitForMessageWithId('1', 'error') + const spyResult = await consumer.handlerSpy.waitForMessageWithId('2', 'error') expect(spyResult.message).toEqual(message) - expect(counter).toBeGreaterThan(2) + // due to exponential backoff and timestamp, message is only retried once before being moved to DLQ + expect(counter).toBe(2) - await waitAndRetry(async () => dlqMessage) - expect(JSON.parse(dlqMessage.Body)).toMatchObject({ id: '1', messageType: 'remove' }) + await waitAndRetry(() => dlqMessage) + const messageBody = JSON.parse(dlqMessage.Body) + expect(messageBody).toEqual({ + id: '2', + messageType: 'remove', + timestamp: message.timestamp, + _internalNumberOfRetries: expect.any(Number), + }) + // due to exponential backoff and timestamp, on second retry message is moved to DLQ so _internalNumberOfRetries is 1 + expect(messageBody._internalNumberOfRetries).toBe(1) dlqConsumer.stop() }) diff --git a/packages/sqs/test/consumers/userConsumerSchemas.ts b/packages/sqs/test/consumers/userConsumerSchemas.ts index d4784b74..18c4339a 100644 --- a/packages/sqs/test/consumers/userConsumerSchemas.ts +++ b/packages/sqs/test/consumers/userConsumerSchemas.ts @@ -11,11 +11,13 @@ export const PERMISSIONS_MESSAGE_SCHEMA = z.object({ export const PERMISSIONS_ADD_MESSAGE_SCHEMA = z.object({ id: z.string(), messageType: z.literal('add'), + timestamp: z.string().optional(), }) export const PERMISSIONS_REMOVE_MESSAGE_SCHEMA = z.object({ id: z.string(), messageType: z.literal('remove'), + timestamp: z.string().optional(), }) export type PERMISSIONS_MESSAGE_TYPE = z.infer diff --git a/packages/sqs/test/publishers/SqsPermissionPublisher.spec.ts b/packages/sqs/test/publishers/SqsPermissionPublisher.spec.ts index fd24a3bc..9baecd1b 100644 --- a/packages/sqs/test/publishers/SqsPermissionPublisher.spec.ts +++ b/packages/sqs/test/publishers/SqsPermissionPublisher.spec.ts @@ -1,12 +1,15 @@ import type { SQSClient } from '@aws-sdk/client-sqs' +import { waitAndRetry } from '@lokalise/node-core' import type { AwilixContainer } from 'awilix' +import { Consumer } from 'sqs-consumer' import { afterEach, beforeEach, describe, expect, it } from 'vitest' +import { FakeConsumerErrorResolver } from '../../lib/fakes/FakeConsumerErrorResolver' +import type { SQSMessage } from '../../lib/types/MessageTypes' +import { deserializeSQSMessage } from '../../lib/utils/sqsMessageDeserializer' import { assertQueue, deleteQueue, getQueueAttributes } from '../../lib/utils/sqsUtils' -import type { - PERMISSIONS_ADD_MESSAGE_TYPE, - PERMISSIONS_MESSAGE_TYPE, -} from '../consumers/userConsumerSchemas' +import type { PERMISSIONS_ADD_MESSAGE_TYPE } from '../consumers/userConsumerSchemas' +import { PERMISSIONS_ADD_MESSAGE_SCHEMA } from '../consumers/userConsumerSchemas' import { registerDependencies } from '../utils/testContext' import type { Dependencies } from '../utils/testContext' @@ -173,11 +176,9 @@ describe('SqsPermissionPublisher', () => { const message = { id: '1', - userIds: [100, 200, 300], messageType: 'add', - permissions: ['perm1', 'perm2'], timestamp: new Date().toISOString(), - } satisfies PERMISSIONS_MESSAGE_TYPE + } satisfies PERMISSIONS_ADD_MESSAGE_TYPE await permissionPublisher.publish(message) @@ -186,8 +187,35 @@ describe('SqsPermissionPublisher', () => { expect(spy.processingResult).toBe('published') }) - it('publish a message auto-filling timestamp', async () => { - const { permissionPublisher } = diContainer.cradle + it('publish a message auto-filling internal properties', async () => { + const QueueName = 'auto-filling_test_queue' + const { queueUrl } = await assertQueue(diContainer.cradle.sqsClient, { + QueueName, + }) + + const permissionPublisher = new SqsPermissionPublisher(diContainer.cradle, { + creationConfig: { + queue: { QueueName }, + }, + }) + + let receivedMessage: unknown + const consumer = Consumer.create({ + queueUrl: queueUrl, + handleMessage: async (message: SQSMessage) => { + if (message === null) { + return + } + const decodedMessage = deserializeSQSMessage( + message as any, + PERMISSIONS_ADD_MESSAGE_SCHEMA, + new FakeConsumerErrorResolver(), + ) + receivedMessage = decodedMessage.result! + }, + sqs: diContainer.cradle.sqsClient, + }) + consumer.start() const message = { id: '1', @@ -196,9 +224,23 @@ describe('SqsPermissionPublisher', () => { await permissionPublisher.publish(message) - const spy = await permissionPublisher.handlerSpy.waitForMessageWithId('1', 'published') - expect(spy.message).toEqual({ ...message, timestamp: expect.any(String) }) - expect(spy.processingResult).toBe('published') + await waitAndRetry(() => !!receivedMessage) + expect(receivedMessage).toEqual({ + originalMessage: { + id: '1', + messageType: 'add', + timestamp: expect.any(String), + _internalNumberOfRetries: 0, + }, + parsedMessage: { + id: '1', + messageType: 'add', + timestamp: expect.any(String), + }, + }) + + consumer.stop() + await permissionPublisher.close() }) it('publish message with lazy loading', async () => { @@ -206,10 +248,8 @@ describe('SqsPermissionPublisher', () => { const message = { id: '1', - userIds: [100, 200, 300], messageType: 'add', - permissions: ['perm1', 'perm2'], - } satisfies PERMISSIONS_MESSAGE_TYPE + } satisfies PERMISSIONS_ADD_MESSAGE_TYPE await newPublisher.publish(message) diff --git a/packages/sqs/test/publishers/SqsPermissionPublisher.ts b/packages/sqs/test/publishers/SqsPermissionPublisher.ts index c569173a..865ca90c 100644 --- a/packages/sqs/test/publishers/SqsPermissionPublisher.ts +++ b/packages/sqs/test/publishers/SqsPermissionPublisher.ts @@ -8,7 +8,6 @@ import type { } from '../../lib/sqs/AbstractSqsService' import type { PERMISSIONS_ADD_MESSAGE_TYPE, - PERMISSIONS_MESSAGE_TYPE, PERMISSIONS_REMOVE_MESSAGE_TYPE, } from '../consumers/userConsumerSchemas' import { @@ -16,10 +15,7 @@ import { PERMISSIONS_REMOVE_MESSAGE_SCHEMA, } from '../consumers/userConsumerSchemas' -type SupportedMessages = - | PERMISSIONS_ADD_MESSAGE_TYPE - | PERMISSIONS_REMOVE_MESSAGE_TYPE - | PERMISSIONS_MESSAGE_TYPE +type SupportedMessages = PERMISSIONS_ADD_MESSAGE_TYPE | PERMISSIONS_REMOVE_MESSAGE_TYPE export class SqsPermissionPublisher extends AbstractSqsPublisher { public static readonly QUEUE_NAME = 'user_permissions_multi'