diff --git a/integration/microservices/e2e/sum-kafka.spec.ts b/integration/microservices/e2e/sum-kafka.spec.ts index 0ab86ec7351..0be9ad309ff 100644 --- a/integration/microservices/e2e/sum-kafka.spec.ts +++ b/integration/microservices/e2e/sum-kafka.spec.ts @@ -96,7 +96,21 @@ describe.skip('Kafka transport', function () { .send() .end(() => { setTimeout(() => { - expect(KafkaController.IS_NOTIFIED).to.be.true; + expect(KafkaMessagesController.IS_NOTIFIED).to.be.true; + done(); + }, 1000); + }); + }); + + it(`/POST (async event notification)`, done => { + request(server) + .post('/notifyRegex') + .send() + .end(() => { + setTimeout(() => { + expect(KafkaMessagesController.IS_REGEX_NOTIFIED).to.be.eq( + 'regex.notify.test-0', + ); done(); }, 1000); }); diff --git a/integration/microservices/src/kafka/kafka.controller.ts b/integration/microservices/src/kafka/kafka.controller.ts index 0db49ebcd67..f80b2a112bc 100644 --- a/integration/microservices/src/kafka/kafka.controller.ts +++ b/integration/microservices/src/kafka/kafka.controller.ts @@ -15,7 +15,6 @@ import { UserDto } from './dtos/user.dto'; @Controller() export class KafkaController implements OnModuleInit, OnModuleDestroy { protected readonly logger = new Logger(KafkaController.name); - static IS_NOTIFIED = false; static MATH_SUM = 0; @Client({ @@ -133,6 +132,11 @@ export class KafkaController implements OnModuleInit, OnModuleDestroy { return this.client.emit('notify', { notify: true }); } + @Post('notifyRegex') + async sendRegexNotification(): Promise { + return this.client.emit('regex.notify.test-0', { notify: true }); + } + // Complex data to send. @Post('/user') @HttpCode(200) diff --git a/integration/microservices/src/kafka/kafka.messages.controller.ts b/integration/microservices/src/kafka/kafka.messages.controller.ts index 29acae07f77..187954e270c 100644 --- a/integration/microservices/src/kafka/kafka.messages.controller.ts +++ b/integration/microservices/src/kafka/kafka.messages.controller.ts @@ -1,5 +1,10 @@ import { Controller, Logger } from '@nestjs/common'; -import { EventPattern, MessagePattern } from '@nestjs/microservices'; +import { + Ctx, + EventPattern, + KafkaContext, + MessagePattern, +} from '@nestjs/microservices'; import { BusinessDto } from './dtos/business.dto'; import { UserDto } from './dtos/user.dto'; import { BusinessEntity } from './entities/business.entity'; @@ -10,6 +15,7 @@ import { KafkaController } from './kafka.controller'; export class KafkaMessagesController { protected readonly logger = new Logger(KafkaMessagesController.name); static IS_NOTIFIED = false; + static IS_REGEX_NOTIFIED: any = false; @MessagePattern('math.sum.sync.kafka.message') mathSumSyncKafkaMessage(data: any) { @@ -53,7 +59,12 @@ export class KafkaMessagesController { @EventPattern('notify') eventHandler(data: any) { - KafkaController.IS_NOTIFIED = data.value.notify; + KafkaMessagesController.IS_NOTIFIED = data.value.notify; + } + + @EventPattern(/regex\.notify\.test-[0-9]*/) + regexHandler(data: any, @Ctx() context: KafkaContext) { + KafkaMessagesController.IS_REGEX_NOTIFIED = context.getTopic(); } // Complex data to send. diff --git a/packages/microservices/server/server-kafka.ts b/packages/microservices/server/server-kafka.ts index 397bb049643..1196d108e19 100644 --- a/packages/microservices/server/server-kafka.ts +++ b/packages/microservices/server/server-kafka.ts @@ -28,6 +28,7 @@ import { KafkaLogger, KafkaParser } from '../helpers'; import { CustomTransportStrategy, KafkaOptions, + MessageHandler, OutgoingResponse, ReadPacket, } from '../interfaces'; @@ -49,6 +50,8 @@ export class ServerKafka extends Server implements CustomTransportStrategy { protected clientId: string; protected groupId: string; + protected registeredPatterns: any[] = []; + constructor(protected readonly options: KafkaOptions['options']) { super(); @@ -120,13 +123,12 @@ export class ServerKafka extends Server implements CustomTransportStrategy { } public async bindEvents(consumer: Consumer) { - const registeredPatterns = [...this.messageHandlers.keys()]; const consumerSubscribeOptions = this.options.subscribe || {}; - if (registeredPatterns.length > 0) { + if (this.registeredPatterns.length > 0) { await this.consumer.subscribe({ ...consumerSubscribeOptions, - topics: registeredPatterns, + topics: this.registeredPatterns, }); } @@ -317,4 +319,14 @@ export class ServerKafka extends Server implements CustomTransportStrategy { protected initializeDeserializer(options: KafkaOptions['options']) { this.deserializer = options?.deserializer ?? new KafkaRequestDeserializer(); } + + public addHandler( + pattern: any, + callback: MessageHandler, + isEventHandler: boolean = false, + extras: Record = {}, + ) { + this.registeredPatterns.push(pattern); + super.addHandler(pattern, callback, isEventHandler, extras); + } } diff --git a/packages/microservices/test/server/server-kafka.spec.ts b/packages/microservices/test/server/server-kafka.spec.ts index 5255b43107c..c3b72e171f1 100644 --- a/packages/microservices/test/server/server-kafka.spec.ts +++ b/packages/microservices/test/server/server-kafka.spec.ts @@ -180,10 +180,26 @@ describe('ServerKafka', () => { await server.listen(callback); const pattern = 'test'; - const handler = sinon.spy(); - (server as any).messageHandlers = objectToMap({ - [pattern]: handler, - }); + (server as any).registeredPatterns = [pattern]; + + await server.bindEvents((server as any).consumer); + + expect(subscribe.called).to.be.true; + expect( + subscribe.calledWith({ + topics: [pattern], + }), + ).to.be.true; + + expect(run.called).to.be.true; + expect(connect.called).to.be.true; + }); + it('should call subscribe and run on consumer when there are RegExp messageHandlers', async () => { + (server as any).logger = new NoopLogger(); + await server.listen(callback); + + const pattern = /test/; + (server as any).registeredPatterns = [pattern]; await server.bindEvents((server as any).consumer); @@ -204,10 +220,7 @@ describe('ServerKafka', () => { await server.listen(callback); const pattern = 'test'; - const handler = sinon.spy(); - (server as any).messageHandlers = objectToMap({ - [pattern]: handler, - }); + (server as any).registeredPatterns = [pattern]; await server.bindEvents((server as any).consumer);