Summary
The AMQP consume callback in RunMQConsumerCreator.runProcessor is async, but rabbitmq-client never awaits the Promise it returns. Any error that escapes the processor chain (notably from RunMQExceptionLoggerProcessor, which rethrows after logging) becomes an unhandled rejection.
Where
src/core/consumer/RunMQConsumerCreator.ts:60-89
src/core/consumer/processors/RunMQExceptionLoggerProcessor.ts:8-21 (rethrows)
src/core/consumer/processors/RunMQFailedMessageRejecterProcessor.ts:8-15 (only catches errors from inner consumer, not from its own nack call)
Failure mode
- Channel closes mid-shutdown → nack() throws → escapes RunMQFailedMessageRejecterProcessor (its catch only protects the inner consume call) → propagates up → unhandled rejection.
- On Node 15+ with default settings, this terminates the worker process.
- Even before termination, the affected messages stay in the unacked state until the channel times out, with no recovery path.
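The first bullet can be reproduced in isolation. The sketch below is a minimal illustration, not the actual RunMQ code: `Message`, `Processor`, `RejecterSketch`, `failingInner`, `closedChannelMessage` and `escapedError` are hypothetical names, and the rejecter's shape is assumed from the description above (a catch that guards only the inner consume call).

```typescript
// Hypothetical minimal types; the real RunMQ interfaces may differ.
interface Message {
  nack(): void;
}
interface Processor {
  consume(msg: Message): Promise<void>;
}

// Assumed shape of the rejecter: the try block guards only the inner consume call.
class RejecterSketch implements Processor {
  private readonly inner: Processor;

  constructor(inner: Processor) {
    this.inner = inner;
  }

  async consume(msg: Message): Promise<void> {
    try {
      await this.inner.consume(msg);
    } catch {
      // This call runs inside the catch block, so if it throws,
      // nothing in this method catches the new error.
      msg.nack();
    }
  }
}

// Inner processor that always fails, standing in for a failing handler.
const failingInner: Processor = {
  consume: async () => {
    throw new Error('handler failed');
  },
};

// Message whose nack() throws, standing in for a channel closed mid-shutdown.
const closedChannelMessage: Message = {
  nack: () => {
    throw new Error('channel closed');
  },
};

// Resolves with the error that escapes the rejecter, or null if none does.
async function escapedError(): Promise<unknown> {
  try {
    await new RejecterSketch(failingInner).consume(closedChannelMessage);
    return null;
  } catch (e) {
    return e;
  }
}
```

In this sketch the error thrown by nack() replaces the original handler error and rejects the Promise returned by consume. If the caller drops that Promise instead of awaiting it, the rejection has no handler.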
Proposed fix
Wrap the entire inner work of the consume callback in a try/catch that logs and never rethrows:
    await consumerChannel.consume(name, async (msg) => {
        if (!msg) return;
        try {
            const rabbitmqMessage = new RabbitMQMessage(...);
            await new RunMQExceptionLoggerProcessor(...).consume(rabbitmqMessage);
        } catch (e) {
            this.logger.error('Unhandled error in consumer chain', { cause: e });
            // do NOT rethrow; broker will redeliver on channel close
        }
    });
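If the same guard is needed in more than one consume callback, it could be factored into a small wrapper. This is only a sketch of that idea under assumptions: `guardHandler`, `AsyncHandler` and `ErrorSink` are hypothetical names, not part of RunMQ or rabbitmq-client, and the error sink stands in for whatever logger the consumer already has.

```typescript
// Hypothetical helper types; not part of RunMQ or rabbitmq-client.
type AsyncHandler<T> = (msg: T) => Promise<void>;
type ErrorSink = (message: string, error: unknown) => void;

// Wraps an async handler so the Promise it returns never rejects.
// Errors are passed to onError; a failure inside onError is swallowed too,
// because the caller is assumed not to await or catch the result.
function guardHandler<T>(handler: AsyncHandler<T>, onError: ErrorSink): AsyncHandler<T> {
  return async (msg: T): Promise<void> => {
    try {
      await handler(msg);
    } catch (e) {
      try {
        onError('Unhandled error in consumer chain', e);
      } catch {
        // The logger itself failed; rethrowing would reintroduce
        // the unhandled rejection this wrapper exists to prevent.
      }
    }
  };
}
```

A caller would pass the wrapped function as the consume callback, for example `guardHandler(async (msg) => { /* processor chain */ }, (m, e) => logger.error(m, { cause: e }))`, where `logger` is whatever logger instance is in scope. The trade-off is the same as in the inline version: the error is logged but not propagated, so redelivery relies on the broker.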
Acceptance criteria