Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 33 additions & 2 deletions src/core/clients/RabbitMQClientAdapter.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import {Connection, Channel} from "rabbitmq-client";
import {RunMQException} from "@src/core/exceptions/RunMQException";
import {Exceptions} from "@src/core/exceptions/Exceptions";
import {AMQPChannel, AMQPClient, RunMQConnectionConfig} from "@src/types";
import {AMQPChannel, AMQPChannelLifecycleCallbacks, AMQPClient, RunMQConnectionConfig} from "@src/types";
import {RabbitMQClientChannel} from "@src/core/clients/RabbitMQClientChannel";
import {RunMQLogger} from "@src";
import {RunMQConsoleLogger} from "@src/core/logging/RunMQConsoleLogger";
Expand All @@ -15,6 +15,7 @@ export class RabbitMQClientAdapter implements AMQPClient {
private defaultChannel: AMQPChannel | undefined;
private isConnected: boolean = false;
private acquiredChannels: Channel[] = [];
private isShuttingDown: boolean = false;

constructor(private config: RunMQConnectionConfig, private logger: RunMQLogger = new RunMQConsoleLogger()) {
}
Expand All @@ -25,6 +26,8 @@ export class RabbitMQClientAdapter implements AMQPClient {
return this.connection;
}

this.isShuttingDown = false;

// Close any existing connection that might be in a bad state
if (this.connection) {
try {
Expand Down Expand Up @@ -88,9 +91,36 @@ export class RabbitMQClientAdapter implements AMQPClient {
}
}

public async getChannel(): Promise<AMQPChannel> {
public async getChannel(callbacks?: AMQPChannelLifecycleCallbacks): Promise<AMQPChannel> {
const connection = await this.connect();
const rawChannel = await connection.acquire();
const channelId = rawChannel.id;

rawChannel.on('error', (err) => {
this.logger.error('RabbitMQ channel error:', {channelId, error: err});
try {
callbacks?.onError?.(err);
} catch (cbErr) {
this.logger.error('RabbitMQ channel onError callback threw:', {channelId, error: cbErr});
}
});

rawChannel.on('close', () => {
this.logger.warn('RabbitMQ channel closed', {channelId});
const idx = this.acquiredChannels.indexOf(rawChannel);
if (idx >= 0) {
this.acquiredChannels.splice(idx, 1);
}
if (this.isShuttingDown) {
return;
}
try {
callbacks?.onClose?.();
} catch (cbErr) {
this.logger.error('RabbitMQ channel onClose callback threw:', {channelId, error: cbErr});
}
});

// Track the channel so we can close it on disconnect
this.acquiredChannels.push(rawChannel);
return new RabbitMQClientChannel(rawChannel);
Expand All @@ -108,6 +138,7 @@ export class RabbitMQClientAdapter implements AMQPClient {
const conn = this.connection;
const channels = this.acquiredChannels;

this.isShuttingDown = true;
this.connection = undefined;
this.defaultChannel = undefined;
this.isConnected = false;
Expand Down
30 changes: 26 additions & 4 deletions src/core/consumer/RunMQConsumerCreator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import {RunMQLogger} from "@src/core/logging/RunMQLogger";
import {DefaultDeserializer} from "@src/core/serializers/deserializer/DefaultDeserializer";
import {ConsumerCreatorUtils} from "@src/core/consumer/ConsumerCreatorUtils";
import {RunMQPublisherCreator} from "@src/core/publisher/RunMQPublisherCreator";
import {AMQPChannel, AMQPClient, RabbitMQManagementConfig} from "@src/types";
import {AMQPClient, RabbitMQManagementConfig} from "@src/types";
import {RunMQTTLPolicyManager} from "@src/core/management/Policies/RunMQTTLPolicyManager";
import {RunMQMetadataManager} from "@src/core/management/Policies/RunMQMetadataManager";
import {RunMQException} from "@src/core/exceptions/RunMQException";
Expand Down Expand Up @@ -53,7 +53,17 @@ export class RunMQConsumerCreator {
}

private async runProcessor<T>(consumerConfiguration: ConsumerConfiguration<T>): Promise<void> {
const consumerChannel = await this.getProcessorChannel();
const consumerChannel = await this.client.getChannel({
onClose: () => {
if (!this.client.isActive()) {
return;
}
this.logger.warn('Consumer channel closed; attempting to re-subscribe', {
processor: consumerConfiguration.processorConfig.name,
});
this.resubscribeProcessor(consumerConfiguration);
},
});
const DLQPublisher = new RunMQPublisherCreator(this.logger).createPublisher(Constants.DEAD_LETTER_ROUTER_EXCHANGE_NAME);

const prefetchCount = consumerConfiguration.processorConfig.prefetch ?? DEFAULTS.PREFETCH_COUNT;
Expand Down Expand Up @@ -163,7 +173,19 @@ export class RunMQConsumerCreator {
);
}

private async getProcessorChannel(): Promise<AMQPChannel> {
return await this.client.getChannel()
private resubscribeProcessor<T>(consumerConfiguration: ConsumerConfiguration<T>): void {
const delay = DEFAULTS.RECONNECT_DELAY;
setTimeout(() => {
if (!this.client.isActive()) {
return;
}
this.runProcessor(consumerConfiguration).catch((err) => {
this.logger.error('Failed to re-subscribe consumer; will retry', {
processor: consumerConfiguration.processorConfig.name,
error: err instanceof Error ? err.message : err,
});
this.resubscribeProcessor(consumerConfiguration);
});
}, delay);
}
}
7 changes: 6 additions & 1 deletion src/types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -179,10 +179,15 @@ export interface AMQPChannel {
close(): Promise<void>;
}

export interface AMQPChannelLifecycleCallbacks {
onError?: (err: any) => void;
onClose?: () => void;
}

export interface AMQPClient {
connect(): Promise<any>;

getChannel(): Promise<AMQPChannel>;
getChannel(callbacks?: AMQPChannelLifecycleCallbacks): Promise<AMQPChannel>;

getDefaultChannel(): Promise<AMQPChannel>;

Expand Down
109 changes: 109 additions & 0 deletions tests/e2e/RunMQ.resubscribe.e2e.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import {RunMQ} from '@src/core/RunMQ';
import {RabbitMQClientAdapter} from "@src/core/clients/RabbitMQClientAdapter";
import {Constants} from "@src/core/constants";
import {ChannelTestHelpers} from "@tests/helpers/ChannelTestHelpers";
import {RunMQUtils} from "@src/core/utils/RunMQUtils";
import {MockedRunMQLogger} from "@tests/mocks/MockedRunMQLogger";
import {RunMQConnectionConfigExample} from "@tests/Examples/RunMQConnectionConfigExample";
import {RunMQProcessorConfigurationExample} from "@tests/Examples/RunMQProcessorConfigurationExample";
import {RunMQMessageExample} from "@tests/Examples/RunMQMessageExample";
import {MessageTestUtils} from "@tests/helpers/MessageTestUtils";
import {RabbitMQManagementConfigExample} from "@tests/Examples/RabbitMQManagementConfigExample";

interface ManagementConsumer {
queue: { name: string };
channel_details: { connection_name: string };
}

function authHeader(): string {
const cfg = RabbitMQManagementConfigExample.valid();
return 'Basic ' + Buffer.from(`${cfg.username}:${cfg.password}`).toString('base64');
}

async function closeConnectionsForQueue(queueName: string, timeoutMs: number): Promise<number> {
const cfg = RabbitMQManagementConfigExample.valid();
const deadline = Date.now() + timeoutMs;
const closed = new Set<string>();

while (Date.now() < deadline) {
const res = await fetch(`${cfg.url}/api/consumers`, {
headers: {Authorization: authHeader()},
});
if (!res.ok) throw new Error(`list consumers failed: ${res.status}`);
const consumers = (await res.json()) as ManagementConsumer[];
const targets = consumers.filter((c) => c.queue?.name === queueName);

if (targets.length > 0) {
for (const c of targets) {
const connName = c.channel_details?.connection_name;
if (connName && !closed.has(connName)) {
const del = await fetch(
`${cfg.url}/api/connections/${encodeURIComponent(connName)}`,
{method: 'DELETE', headers: {Authorization: authHeader()}}
);
if (!del.ok && del.status !== 404) {
throw new Error(`close connection failed: ${del.status}`);
}
closed.add(connName);
}
}
return closed.size;
}
await new Promise((r) => setTimeout(r, 200));
}
return closed.size;
}

describe('RunMQ Consumer Channel Resubscription E2E', () => {
const validConfig = RunMQConnectionConfigExample.valid();
const testingConnection = new RabbitMQClientAdapter(validConfig);

afterAll(async () => {
await testingConnection.disconnect();
});

it('should re-subscribe a consumer after its channel is closed by the broker', async () => {
const configuration = RunMQProcessorConfigurationExample.simpleNoSchema('resubscribe_processor');
const topic = 'resubscribe.topic';

const channel = await testingConnection.getChannel();
await ChannelTestHelpers.deleteQueue(channel, configuration.name);

const runMQ = await RunMQ.start(validConfig, MockedRunMQLogger);
const received: any[] = [];
await runMQ.process<Record<string, any>>(topic, configuration, async (msg) => {
received.push(msg);
});

// Sanity: deliver one message before forcing a channel close.
channel.publish(
Constants.ROUTER_EXCHANGE_NAME,
topic,
MessageTestUtils.buffer(RunMQMessageExample.random())
);
await RunMQUtils.delay(500);
expect(received.length).toBe(1);

// Force-close only the connection(s) holding consumers for our queue.
// Scoping by queue avoids killing unrelated parallel-test connections.
const closed = await closeConnectionsForQueue(configuration.name, 5000);
expect(closed).toBeGreaterThan(0);

// Wait long enough for the rabbitmq-client to reconnect and for our
// resubscription to fire (RECONNECT_DELAY is 5s). Add headroom.
await RunMQUtils.delay(8000);

// Re-acquire a publishing channel; the previous one was closed too.
const republishChannel = await testingConnection.getChannel();
republishChannel.publish(
Constants.ROUTER_EXCHANGE_NAME,
topic,
MessageTestUtils.buffer(RunMQMessageExample.random())
);

await RunMQUtils.delay(2000);
expect(received.length).toBe(2);

await runMQ.disconnect();
}, 30000);
});
50 changes: 50 additions & 0 deletions tests/unit/core/consumer/RunMQConsumerCreator.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -264,5 +264,55 @@ describe('RunMQConsumerCreator Unit Tests', () => {
expect(mockedChannel.consume).toHaveBeenCalled();
});
});

describe('consumer channel close handling', () => {
beforeEach(() => {
jest.useFakeTimers();
});

afterEach(() => {
jest.useRealTimers();
});

it('should pass an onClose callback to getChannel for consumer channels', async () => {
await consumerCreator.createConsumer(testConsumerConfig);

expect(mockedClient.getChannel).toHaveBeenCalledWith(
expect.objectContaining({onClose: expect.any(Function)})
);
});

it('should re-subscribe a consumer when its channel closes while client is active', async () => {
mockedClient.isActive.mockReturnValue(true);

await consumerCreator.createConsumer(testConsumerConfig);

const initialCalls = mockedChannel.consume.mock.calls.length;
const lastCall = mockedClient.getChannel.mock.calls.at(-1) as [{ onClose?: () => void }];
const onClose = lastCall[0]?.onClose;
expect(onClose).toBeDefined();

onClose!();
await jest.advanceTimersByTimeAsync(DEFAULTS.RECONNECT_DELAY);

expect(mockedChannel.consume.mock.calls.length).toBeGreaterThan(initialCalls);
});

it('should not re-subscribe a consumer when client is no longer active', async () => {
mockedClient.isActive.mockReturnValue(true);

await consumerCreator.createConsumer(testConsumerConfig);

const callsBeforeClose = mockedChannel.consume.mock.calls.length;
const lastCall = mockedClient.getChannel.mock.calls.at(-1) as [{ onClose?: () => void }];
const onClose = lastCall[0]?.onClose;

mockedClient.isActive.mockReturnValue(false);
onClose!();
await jest.advanceTimersByTimeAsync(DEFAULTS.RECONNECT_DELAY);

expect(mockedChannel.consume.mock.calls.length).toBe(callsBeforeClose);
});
});
});
});
Loading