Skip to content

Commit

Permalink
AP-3648 SQS exponential delay between retries (#150)
Browse files Browse the repository at this point in the history
* Adding required methods to auto-filling internal props

* Adjusting sqs to new methods

* Fixing mutation issue when adding new properties

* Mutation ssue fix + Sqs adjust

* Sns adjusted

* Amqp adjusted

* Adding common delay calculation method

* Spy is using parsed message instead of original one

* Implementing delay on SQS

* Lint fixes

* Fixing tests + adding TODO

* Error fix

* Test fixes

* And more test fixes

* Testing exponential delay

* One more test fix

* Test [will be reverted]

* Revert "Test [will be reverted]"

This reverts commit 3ba05da.

* Trying to fix tests on CI

* Better test for exp delay

* Fixing issue on test

* Test fix

* Improving tests

* Test fix

* SNS tests

* Lint fixes

* Release prepare - minor version

* Adding JSdoc to number of retries prop
  • Loading branch information
CarlosGamero committed May 27, 2024
1 parent 8130718 commit f2e9571
Show file tree
Hide file tree
Showing 23 changed files with 415 additions and 158 deletions.
14 changes: 6 additions & 8 deletions packages/amqp/lib/AbstractAmqpConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import type {
TransactionObservabilityManager,
} from '@message-queue-toolkit/core'
import {
isRetryDateExceeded,
isMessageError,
parseMessage,
HandlerContainer,
Expand Down Expand Up @@ -170,27 +169,26 @@ export abstract class AbstractAmqpConsumer<
.then((result) => {
if (result.result === 'success') {
this.channel.ack(message)
this.handleMessageProcessed(originalMessage, 'consumed')
this.handleMessageProcessed(parsedMessage, 'consumed')
return
}

// retryLater
const timestamp = this.tryToExtractTimestamp(originalMessage) ?? new Date()
// requeue the message if maxRetryDuration is not exceeded, else ack it to avoid infinite loop
if (!isRetryDateExceeded(timestamp, this.maxRetryDuration)) {
if (this.shouldBeRetried(originalMessage, this.maxRetryDuration)) {
// TODO: Add retry delay + republish message updating internal properties
this.channel.nack(message, false, true)
this.handleMessageProcessed(originalMessage, 'retryLater')
this.handleMessageProcessed(parsedMessage, 'retryLater')
} else {
// ToDo move message to DLQ once it is implemented
this.channel.ack(message)
this.handleMessageProcessed(originalMessage, 'error')
this.handleMessageProcessed(parsedMessage, 'error')
}
})
.catch((err) => {
// ToDo we need sanity check to stop trying at some point, perhaps some kind of Redis counter
// If we fail due to unknown reason, let's retry
this.channel.nack(message, false, true)
this.handleMessageProcessed(originalMessage, 'retryLater')
this.handleMessageProcessed(parsedMessage, 'retryLater')
this.handleError(err)
})
.finally(() => {
Expand Down
12 changes: 2 additions & 10 deletions packages/amqp/lib/AbstractAmqpPublisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,22 +85,14 @@ export abstract class AbstractAmqpPublisher<
return
}

/**
* If the message doesn't have a timestamp field -> add it
* will be used on the consumer to prevent infinite retries on the same message
*/
if (!this.tryToExtractTimestamp(message)) {
// @ts-ignore
message[this.messageTimestampField] = new Date().toISOString()
this.logger.warn(`${this.messageTimestampField} not defined, adding it automatically`)
}

if (this.logMessages) {
// @ts-ignore
const resolvedLogMessage = this.resolveMessageLog(message, message[this.messageTypeField])
this.logMessage(resolvedLogMessage)
}

message = this.updateInternalProperties(message)

try {
this.publishInternal(objectToBuffer(message), options)
} catch (err) {
Expand Down
4 changes: 2 additions & 2 deletions packages/amqp/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@message-queue-toolkit/amqp",
"version": "15.3.0",
"version": "15.4.0",
"private": false,
"license": "MIT",
"description": "AMQP adapter for message-queue-toolkit",
Expand Down Expand Up @@ -29,7 +29,7 @@
"zod": "^3.23.8"
},
"peerDependencies": {
"@message-queue-toolkit/core": "^13.3.1",
"@message-queue-toolkit/core": "^13.4.0",
"amqplib": "^0.10.3"
},
"devDependencies": {
Expand Down
17 changes: 8 additions & 9 deletions packages/amqp/test/consumers/AmqpPermissionConsumer.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ import type { Dependencies } from '../utils/testContext'
import { registerDependencies, SINGLETON_CONFIG } from '../utils/testContext'

import { AmqpPermissionConsumer } from './AmqpPermissionConsumer'
import type { PERMISSIONS_MESSAGE_TYPE } from './userConsumerSchemas'
import type {
PERMISSIONS_ADD_MESSAGE_TYPE,
PERMISSIONS_REMOVE_MESSAGE_TYPE,
} from './userConsumerSchemas'

describe('AmqpPermissionConsumer', () => {
describe('init', () => {
Expand Down Expand Up @@ -90,15 +93,15 @@ describe('AmqpPermissionConsumer', () => {
expect(logger.loggedMessages.length).toBe(5)
expect(logger.loggedMessages).toEqual([
'Propagating new connection across 0 receivers',
'timestamp not defined, adding it automatically',
{
id: '1',
messageType: 'add',
timestamp: expect.any(String),
},
'timestamp not defined, adding it automatically',
{
id: '1',
messageType: 'add',
timestamp: expect.any(String),
},
{
messageId: '1',
Expand Down Expand Up @@ -407,11 +410,9 @@ describe('AmqpPermissionConsumer', () => {
})
await consumer.start()

const message: PERMISSIONS_MESSAGE_TYPE = {
const message: PERMISSIONS_ADD_MESSAGE_TYPE = {
id: '1',
messageType: 'add',
userIds: [1],
permissions: ['100'],
timestamp: new Date(new Date().getTime() - 2 * 1000).toISOString(),
}
publisher.publish(message)
Expand All @@ -432,11 +433,9 @@ describe('AmqpPermissionConsumer', () => {
})
await consumer.start()

const message: PERMISSIONS_MESSAGE_TYPE = {
const message: PERMISSIONS_REMOVE_MESSAGE_TYPE = {
id: '1',
messageType: 'remove',
userIds: [1],
permissions: ['100'],
timestamp: new Date(new Date().getTime() - 2 * 1000).toISOString(),
}
publisher.publish(message)
Expand Down
2 changes: 2 additions & 0 deletions packages/amqp/test/consumers/userConsumerSchemas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@ export const PERMISSIONS_MESSAGE_SCHEMA = z.object({
export const PERMISSIONS_ADD_MESSAGE_SCHEMA = z.object({
id: z.string(),
messageType: z.literal('add'),
timestamp: z.string().or(z.date()).optional(),
})

export const PERMISSIONS_REMOVE_MESSAGE_SCHEMA = z.object({
id: z.string(),
messageType: z.literal('remove'),
timestamp: z.string().or(z.date()).optional(),
})

export type PERMISSIONS_MESSAGE_TYPE = z.infer<typeof PERMISSIONS_MESSAGE_SCHEMA>
Expand Down
9 changes: 6 additions & 3 deletions packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,9 @@ describe('PermissionPublisher', () => {
return logger.loggedMessages.length === 2
})

expect(logger.loggedMessages[2]).toEqual({
expect(logger.loggedMessages[1]).toEqual({
id: '1',
messageType: 'add',
timestamp: expect.any(String),
})
})
})
Expand Down Expand Up @@ -245,6 +244,7 @@ describe('PermissionPublisher', () => {
userIds: [1],
permissions: ['100'],
timestamp: message.timestamp.toISOString(),
_internalNumberOfRetries: 0,
},
})
})
Expand Down Expand Up @@ -292,11 +292,12 @@ describe('PermissionPublisher', () => {
userIds: [1],
permissions: ['100'],
timestamp: message.timestamp.toISOString(),
_internalNumberOfRetries: 0,
},
})
})

it('publishes a message auto-filling timestamp', async () => {
it('publishes a message auto-filling internal properties', async () => {
await permissionConsumer.close()

const message = {
Expand Down Expand Up @@ -325,11 +326,13 @@ describe('PermissionPublisher', () => {
parsedMessage: {
id: '2',
messageType: 'add',
timestamp: expect.any(String),
},
originalMessage: {
id: '2',
messageType: 'add',
timestamp: expect.any(String),
_internalNumberOfRetries: 0,
},
})
})
Expand Down
65 changes: 63 additions & 2 deletions packages/core/lib/queues/AbstractQueueService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import type { ZodSchema, ZodType } from 'zod'
import type { MessageInvalidFormatError, MessageValidationError } from '../errors/Errors'
import type { Logger, MessageProcessingResult } from '../types/MessageQueueTypes'
import type { DeletionConfig, QueueDependencies, QueueOptions } from '../types/queueOptionsTypes'
import { isRetryDateExceeded } from '../utils/dateUtils'
import { toDatePreprocessor } from '../utils/toDateProcessor'

import type {
Expand Down Expand Up @@ -42,11 +43,21 @@ export abstract class AbstractQueueService<
ExecutionContext = undefined,
PrehandlerOutput = undefined,
> {
/**
* Used to keep track of the number of `retryLater` results received for a message to be able to
* calculate the delay for the next retry
*/
private readonly messageNumberOfRetriesField = '_internalNumberOfRetries'
/**
* Used to know when the message was sent initially so we can have a max retry date and avoid
* a infinite `retryLater` loop
*/
protected readonly messageTimestampField: string

protected readonly errorReporter: ErrorReporter
public readonly logger: Logger
protected readonly messageIdField: string
protected readonly messageTypeField: string
protected readonly messageTimestampField: string
protected readonly logMessages: boolean
protected readonly creationConfig?: QueueConfiguration
protected readonly locatorConfig?: QueueLocatorType
Expand Down Expand Up @@ -198,7 +209,45 @@ export abstract class AbstractQueueService<
return await barrier(message, executionContext, preHandlerOutput)
}

protected tryToExtractTimestamp(message: MessagePayloadSchemas): Date | undefined {
shouldBeRetried(message: MessagePayloadSchemas, maxRetryDuration: number): boolean {
const timestamp = this.tryToExtractTimestamp(message) ?? new Date()
return !isRetryDateExceeded(timestamp, maxRetryDuration)
}

protected getMessageRetryDelayInSeconds(message: MessagePayloadSchemas): number {
// if not defined, this is the first attempt
const retries = this.tryToExtractNumberOfRetries(message) ?? 0

// exponential backoff -> (2 ^ (attempts)) * delay
// delay = 1 second
return Math.pow(2, retries)
}

protected updateInternalProperties(message: MessagePayloadSchemas): MessagePayloadSchemas {
const messageCopy = { ...message } // clone the message to avoid mutation

/**
* If the message doesn't have a timestamp field -> add it
* will be used to prevent infinite retries on the same message
*/
if (!this.tryToExtractTimestamp(message)) {
// @ts-ignore
messageCopy[this.messageTimestampField] = new Date().toISOString()
this.logger.warn(`${this.messageTimestampField} not defined, adding it automatically`)
}

/**
* add/increment the number of retries performed to exponential message delay
*/
const numberOfRetries = this.tryToExtractNumberOfRetries(message)
// @ts-ignore
messageCopy[this.messageNumberOfRetriesField] =
numberOfRetries !== undefined ? numberOfRetries + 1 : 0

return messageCopy
}

private tryToExtractTimestamp(message: MessagePayloadSchemas): Date | undefined {
// @ts-ignore
if (this.messageTimestampField in message) {
// @ts-ignore
Expand All @@ -213,6 +262,18 @@ export abstract class AbstractQueueService<
return undefined
}

private tryToExtractNumberOfRetries(message: MessagePayloadSchemas): number | undefined {
if (
this.messageNumberOfRetriesField in message &&
typeof message[this.messageNumberOfRetriesField] === 'number'
) {
// @ts-ignore
return message[this.messageNumberOfRetriesField]
}

return undefined
}

protected abstract resolveNextFunction(
preHandlers: Prehandler<MessagePayloadSchemas, ExecutionContext, PrehandlerOutput>[],
message: MessagePayloadSchemas,
Expand Down
2 changes: 1 addition & 1 deletion packages/core/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@message-queue-toolkit/core",
"version": "13.3.2",
"version": "13.4.0",
"private": false,
"license": "MIT",
"description": "Useful utilities, interfaces and base classes for message queue handling. Supports AMQP and SQS with a common abstraction on top currently",
Expand Down
16 changes: 4 additions & 12 deletions packages/sns/lib/sns/AbstractSnsPublisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,32 +60,24 @@ export abstract class AbstractSnsPublisher<MessagePayloadType extends object>
}

try {
messageSchemaResult.result.parse(message)

/**
* If the message doesn't have a timestamp field -> add it
* will be used on the consumer to prevent infinite retries on the same message
*/
if (!this.tryToExtractTimestamp(message)) {
// @ts-ignore
message[this.messageTimestampField] = new Date().toISOString()
this.logger.warn(`${this.messageTimestampField} not defined, adding it automatically`)
}
const parsedMessage = messageSchemaResult.result.parse(message)

if (this.logMessages) {
// @ts-ignore
const resolvedLogMessage = this.resolveMessageLog(message, message[this.messageTypeField])
this.logMessage(resolvedLogMessage)
}

message = this.updateInternalProperties(message)

const input = {
Message: JSON.stringify(message),
TopicArn: this.topicArn,
...options,
} satisfies PublishCommandInput
const command = new PublishCommand(input)
await this.snsClient.send(command)
this.handleMessageProcessed(message, 'published')
this.handleMessageProcessed(parsedMessage, 'published')
} catch (error) {
const err = error as Error
this.handleError(err)
Expand Down
6 changes: 2 additions & 4 deletions packages/sns/lib/utils/snsMessageDeserializer.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ describe('messageDeserializer', () => {
const messagePayload = {
id: '1',
messageType: 'add',
userIds: [1],
permissions: ['perm'],
nonSchemaField: 'nonSchemaField',
}
Expand Down Expand Up @@ -47,15 +46,14 @@ describe('messageDeserializer', () => {
parsedMessage: {
id: '1',
messageType: 'add',
userIds: [1],
permissions: ['perm'],
},
})
})

it('throws an error on invalid JSON', () => {
const messagePayload: Partial<PERMISSIONS_MESSAGE_TYPE> = {
userIds: [1],
permissions: ['perm'],
}

const snsMessage: SNS_MESSAGE_BODY_TYPE = {
Expand Down Expand Up @@ -89,7 +87,7 @@ describe('messageDeserializer', () => {

it('throws an error on invalid SNS envelope', () => {
const messagePayload: Partial<PERMISSIONS_MESSAGE_TYPE> = {
userIds: [1],
permissions: ['perm'],
}

const snsMessage: Partial<SNS_MESSAGE_BODY_TYPE> = {
Expand Down
6 changes: 3 additions & 3 deletions packages/sns/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@message-queue-toolkit/sns",
"version": "15.1.0",
"version": "15.2.0",
"private": false,
"license": "MIT",
"description": "SNS adapter for message-queue-toolkit",
Expand Down Expand Up @@ -32,8 +32,8 @@
"peerDependencies": {
"@aws-sdk/client-sns": "^3.556.0",
"@aws-sdk/client-sqs": "^3.556.0",
"@message-queue-toolkit/core": "^13.1.0",
"@message-queue-toolkit/sqs": "^15.0.0"
"@message-queue-toolkit/core": "^13.4.0",
"@message-queue-toolkit/sqs": "^15.1.0"
},
"devDependencies": {
"@aws-sdk/client-sns": "^3.569.0",
Expand Down
Loading

0 comments on commit f2e9571

Please sign in to comment.