Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/core/consumer/RunMQConsumerCreator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ export class RunMQConsumerCreator {
this.logger
),
consumerConfiguration.processorConfig,
DLQPublisher,
this.logger
)
)
Expand Down
44 changes: 16 additions & 28 deletions src/core/consumer/processors/RunMQRetriesCheckerProcessor.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
import {RunMQConsumer, RunMQProcessorConfiguration, RunMQPublisher} from "@src/types";
import {RunMQConsumer, RunMQProcessorConfiguration} from "@src/types";
import {RabbitMQMessage} from "@src/core/message/RabbitMQMessage";
import {RunMQMessage} from "@src/core/message/RunMQMessage";
import {RunMQLogger} from "@src/core/logging/RunMQLogger";
import {ConsumerCreatorUtils} from "@src/core/consumer/ConsumerCreatorUtils";
import {DEFAULTS} from "@src/core/constants";
import {Constants, DEFAULTS} from "@src/core/constants";

export class RunMQRetriesCheckerProcessor implements RunMQConsumer {
private readonly maxAttempts: number = this.config.attempts ?? DEFAULTS.PROCESSING_ATTEMPTS;

constructor(
private readonly consumer: RunMQConsumer,
private readonly config: RunMQProcessorConfiguration,
private readonly DLQPublisher: RunMQPublisher,
private readonly logger: RunMQLogger,
) {
}
Expand Down Expand Up @@ -45,31 +43,21 @@ export class RunMQRetriesCheckerProcessor implements RunMQConsumer {
);
}

// Republish the original AMQP body verbatim so the envelope (including
// publishedAt) is preserved end-to-end for audit/replay.
private moveToFinalDeadLetter(message: RabbitMQMessage) {
const originalPayload = this.extractOriginalPayload(message);
const dlqMessage = new RabbitMQMessage(
originalPayload,
message.id,
message.correlationId,
message.channel,
message.amqpMessage,
message.headers
);
this.DLQPublisher.publish(ConsumerCreatorUtils.getDLQTopicName(this.config.name), dlqMessage)
}

private extractOriginalPayload(message: RabbitMQMessage): any {
if (typeof message.message === 'string') {
try {
const parsed = JSON.parse(message.message);
if (RunMQMessage.isValid(parsed)) {
return parsed.message;
}
} catch {
// Not valid JSON, use as-is
if (!message.amqpMessage) return;
message.channel.publish(
Constants.DEAD_LETTER_ROUTER_EXCHANGE_NAME,
ConsumerCreatorUtils.getDLQTopicName(this.config.name),
message.amqpMessage.content,
{
correlationId: message.correlationId,
messageId: message.id,
headers: message.headers,
persistent: true,
}
}
return message.message;
);
}

private acknowledgeMessage(message: RabbitMQMessage) {
Expand All @@ -88,4 +76,4 @@ export class RunMQRetriesCheckerProcessor implements RunMQConsumer {
const deathRecord = xDeath.filter(entry => entry && entry.reason == 'rejected')[0];
return deathRecord ? deathRecord.count + 1 : 1;
}
}
}
Original file line number Diff line number Diff line change
@@ -1,51 +1,50 @@
import {RunMQRetriesCheckerProcessor} from "@src/core/consumer/processors/RunMQRetriesCheckerProcessor";
import {ConsumerCreatorUtils} from "@src/core/consumer/ConsumerCreatorUtils";
import {Constants} from "@src/core/constants";
import {MockedThrowableRabbitMQConsumer} from "@tests/mocks/MockedRunMQConsumer";
import {RunMQProcessorConfigurationExample} from "@tests/Examples/RunMQProcessorConfigurationExample";
import {MockedRunMQLogger} from "@tests/mocks/MockedRunMQLogger";
import {
mockedRabbitMQMessageWithChannelAndDeathCount,
mockedRabbitMQMessageWithDeathCount
} from "@tests/mocks/MockedRabbitMQMessage";
import {MockedRabbitMQPublisher} from "@tests/mocks/MockedRunMQPublisher";
import {MockedAMQPChannelWithAcknowledgeFailure, MockedAMQPChannel} from "@tests/mocks/MockedAMQPChannel";
import {RabbitMQMessage} from "@src/core/message/RabbitMQMessage";
import {RunMQMessage, RunMQMessageMeta} from "@src/core/message/RunMQMessage";
import {MockedAmqpMessage} from "@tests/mocks/MockedAmqpMessage";

describe('RunMQRetriesCheckerProcessor', () => {
const consumer = new MockedThrowableRabbitMQConsumer()
const processorConfig = RunMQProcessorConfigurationExample.withAttempts(3)

it("should throw error if message hasn't reached max attempts yet", async () => {
const message = mockedRabbitMQMessageWithDeathCount(1)
const runMQPublisher = new MockedRabbitMQPublisher()

const processor = new RunMQRetriesCheckerProcessor(consumer, processorConfig, runMQPublisher, MockedRunMQLogger)
const processor = new RunMQRetriesCheckerProcessor(consumer, processorConfig, MockedRunMQLogger)
await expect(processor.consume(message)).rejects.toThrow(Error);
})

it('should log and move to dead-letter queue when max attempts reached and acknowledge message', async () => {
const message = mockedRabbitMQMessageWithDeathCount(2)
const runMQPublisher = new MockedRabbitMQPublisher()

const processor = new RunMQRetriesCheckerProcessor(consumer, processorConfig, runMQPublisher, MockedRunMQLogger)
const processor = new RunMQRetriesCheckerProcessor(consumer, processorConfig, MockedRunMQLogger)
await processor.consume(message)

expect(MockedRunMQLogger.error).toHaveBeenCalledWith(`Message reached maximum attempts. Moving to dead-letter queue.`, {
message: message.message,
attempts: 3,
max: 3,
});
expect(runMQPublisher.publish).toHaveBeenCalledWith(
expect(message.channel.publish).toHaveBeenCalledWith(
Constants.DEAD_LETTER_ROUTER_EXCHANGE_NAME,
ConsumerCreatorUtils.getDLQTopicName(processorConfig.name),
expect.objectContaining({
message: message.message,
id: message.id,
message.amqpMessage!.content,
{
correlationId: message.correlationId,
messageId: message.id,
headers: message.headers,
})
)
persistent: true,
}
);
expect(message.channel.ack).toHaveBeenCalledWith(message.amqpMessage);
})
})
Expand All @@ -59,8 +58,7 @@ describe('RunMQRetriesCheckerProcessor - acknowledgeMessage', () => {
new MockedAMQPChannelWithAcknowledgeFailure(),
2
)
const runMQPublisher = new MockedRabbitMQPublisher()
const processor = new RunMQRetriesCheckerProcessor(consumer, processorConfig, runMQPublisher, MockedRunMQLogger)
const processor = new RunMQRetriesCheckerProcessor(consumer, processorConfig, MockedRunMQLogger)

await expect(processor.consume(message)).rejects.toMatchObject({
message: "A message acknowledge failed after publishing to final dead letter",
Expand All @@ -71,86 +69,96 @@ describe('RunMQRetriesCheckerProcessor - acknowledgeMessage', () => {
attempts: 3,
max: 3,
});
expect(runMQPublisher.publish).toHaveBeenCalledWith(
expect(message.channel.publish).toHaveBeenCalledWith(
Constants.DEAD_LETTER_ROUTER_EXCHANGE_NAME,
ConsumerCreatorUtils.getDLQTopicName(processorConfig.name),
message.amqpMessage!.content,
expect.objectContaining({
message: message.message,
id: message.id,
correlationId: message.correlationId,
messageId: message.id,
headers: message.headers,
persistent: true,
})
)
);
});
});

describe('RunMQRetriesCheckerProcessor - DLQ message double encoding', () => {
describe('RunMQRetriesCheckerProcessor - DLQ envelope preservation', () => {
const consumer = new MockedThrowableRabbitMQConsumer()
const processorConfig = RunMQProcessorConfigurationExample.withAttempts(3)

it('should extract the original payload when message content is a serialized RunMQMessage', async () => {
it('should publish the original buffer verbatim, preserving the envelope including publishedAt', async () => {
const originalPayload = {userId: "123", email: "user@example.com", name: "John Doe"};
const serializedContent = JSON.stringify(new RunMQMessage(
const originalPublishedAt = 1700000000000;
const originalEnvelope = new RunMQMessage(
originalPayload,
new RunMQMessageMeta("msg-id", Date.now(), "corr-id")
));
new RunMQMessageMeta("msg-id", originalPublishedAt, "corr-id")
);
const originalBuffer = Buffer.from(JSON.stringify(originalEnvelope));

const channel = new MockedAMQPChannel();
const amqpMessage = {
content: originalBuffer,
fields: {
consumerTag: 'test-consumer-tag',
deliveryTag: 1,
redelivered: false,
exchange: 'test-exchange',
routingKey: 'test-routing-key',
},
properties: {},
} as any;
const message = new RabbitMQMessage(
serializedContent,
originalBuffer.toString(),
"msg-id",
"corr-id",
channel,
MockedAmqpMessage,
amqpMessage,
{"x-death": [{count: 2, reason: "rejected"}]}
);

const runMQPublisher = new MockedRabbitMQPublisher();
const processor = new RunMQRetriesCheckerProcessor(consumer, processorConfig, runMQPublisher, MockedRunMQLogger);
const processor = new RunMQRetriesCheckerProcessor(consumer, processorConfig, MockedRunMQLogger);
await processor.consume(message);

const rabbitMQMessage = runMQPublisher.publish.mock.calls[0][1] as RabbitMQMessage;
expect(rabbitMQMessage.message).toEqual(originalPayload);
expect(rabbitMQMessage.id).toBe(message.id);
expect(rabbitMQMessage.correlationId).toBe(message.correlationId);
expect(channel.publish).toHaveBeenCalledTimes(1);
const publishedBuffer = channel.publish.mock.calls[0][2];
expect(publishedBuffer).toBe(originalBuffer);

const decoded = JSON.parse(publishedBuffer.toString());
expect(decoded.message).toEqual(originalPayload);
expect(decoded.meta.publishedAt).toBe(originalPublishedAt);
expect(decoded.meta.id).toBe("msg-id");
expect(decoded.meta.correlationId).toBe("corr-id");
});

it('should keep message as-is when content is not a serialized RunMQMessage', async () => {
it('should publish the original buffer even when content is not a valid envelope', async () => {
const plainContent = "plain text message";
const plainBuffer = Buffer.from(plainContent);
const channel = new MockedAMQPChannel();
const amqpMessage = {
content: plainBuffer,
fields: {
consumerTag: 'test-consumer-tag',
deliveryTag: 1,
redelivered: false,
exchange: 'test-exchange',
routingKey: 'test-routing-key',
},
properties: {},
} as any;
const message = new RabbitMQMessage(
plainContent,
"msg-id",
"corr-id",
channel,
MockedAmqpMessage,
amqpMessage,
{"x-death": [{count: 2, reason: "rejected"}]}
);

const runMQPublisher = new MockedRabbitMQPublisher();
const processor = new RunMQRetriesCheckerProcessor(consumer, processorConfig, runMQPublisher, MockedRunMQLogger);
const processor = new RunMQRetriesCheckerProcessor(consumer, processorConfig, MockedRunMQLogger);
await processor.consume(message);

const publishedMessage = runMQPublisher.publish.mock.calls[0][1] as RabbitMQMessage;
expect(publishedMessage.message).toBe(plainContent);
expect(channel.publish).toHaveBeenCalledTimes(1);
expect(channel.publish.mock.calls[0][2]).toBe(plainBuffer);
});

it('should keep message as-is when content is a non-RunMQMessage JSON string', async () => {
const jsonContent = JSON.stringify({foo: "bar"});
const channel = new MockedAMQPChannel();
const message = new RabbitMQMessage(
jsonContent,
"msg-id",
"corr-id",
channel,
MockedAmqpMessage,
{"x-death": [{count: 2, reason: "rejected"}]}
);

const runMQPublisher = new MockedRabbitMQPublisher();
const processor = new RunMQRetriesCheckerProcessor(consumer, processorConfig, runMQPublisher, MockedRunMQLogger);
await processor.consume(message);

const publishedMessage = runMQPublisher.publish.mock.calls[0][1] as RabbitMQMessage;
expect(publishedMessage.message).toBe(jsonContent);
});
});
});
Loading