It is not possible to use RegEx for event matching on the @EventPattern() and @MessagePattern() #3083

Open
alfredoperez opened this issue Oct 1, 2019 · 31 comments

Comments

@alfredoperez

Bug Report


Current behavior

It is not possible to use a RegEx for event and message patterns. This is possible when using the KafkaJS library directly, but not in NestJS.

Input Code

Given a controller that emits the message 'pre-notify-post':

@Post('notify-with-regex')
  async sendNotificationForRegExMatching(): Promise<any> {
    return this.client.emit('pre-notify-post', { notify: true });
  }

And a controller with an event handler that expects messages matching the regular expression /.*notify.*/:

 @EventPattern(/.*notify.*/)
  secondEventHandler(data: any) {
    KafkaController.IS_NOTIFIED_WITH_REGEX = data.value.notify;
  }

Currently, the event handler never gets called, because the handler is not matched using regular expressions.

Expected behavior

The expected behavior is that events and messages can be matched using RegEx patterns. This is possible in the KafkaJS library, and it is useful when a consumer wants to listen to a set of topics.

Alternatively, you can subscribe to multiple topics at once using a RegExp:

await consumer.connect()
await consumer.subscribe({ topic: /topic-(eu|us)-.*/i })

https://kafka.js.org/docs/consuming

Possible Solution

This could be fixed by getting the matching routes.
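
One way to read "getting the matching routes" is to compare the incoming topic against every registered pattern instead of doing a single exact lookup. The sketch below only illustrates that idea and is not Nest's routing code; `Route` and `getMatchingRoutes` are hypothetical names.

```typescript
// A registered route is either an exact topic name or a regular expression.
type Route = string | RegExp;

// Return every registered route that accepts the incoming topic.
// Note: regexes with the `g` or `y` flag keep state between test() calls,
// so this sketch assumes patterns are registered without those flags.
function getMatchingRoutes(routes: Route[], topic: string): Route[] {
  return routes.filter((route) =>
    typeof route === 'string' ? route === topic : route.test(topic),
  );
}

const routes: Route[] = ['pre-notify-post', /.*notify.*/, 'other-topic'];
const matched = getMatchingRoutes(routes, 'pre-notify-post');
console.log(matched.length); // 2: the exact string and the regex both match
```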

Environment

Nest version: 6.7

@alfredoperez alfredoperez added the needs triage This issue has not been looked into label Oct 1, 2019
@alfredoperez
Author

I can work on the fix for this, but I want to get your opinion first.

@kamilmysliwiec
Member

I'd love to see the draft PR. Would you like to create one with your idea?

@Vetm

Vetm commented May 10, 2021

Any updates on this? Without this feature kafka is pretty much useless for us.

@inmativ

inmativ commented Jul 15, 2021

I also need this feature. Some microservices need to handle different kafka topics in the same way. We have to write a separate listener for each topic.

@vviital

vviital commented Jul 29, 2021

Hello! Any updates? We are also stuck on this.

@kuskoman

kuskoman commented Aug 3, 2021

I'm just a random person with some experience building a custom microservice transport (at work we replaced nearly all of nest/microservices to match our needs), but I can try to implement this in my free time.
@alfredoperez if I understand correctly, this kind of pattern matching would require keeping an array of regexes and checking whether any of them matches a given event's pattern.
However, unlike string matching, which is exact, more than one regex can match the same string.
What should happen in that case (and what currently happens in the Kafka implementation you mentioned)?

For example, suppose the event is "someString" and there are two handlers: someHandler matching /some.*/ and stringHandler matching /.*string/i. Should only the first handler be executed (and which one counts as first?), or both of them?
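
The two options in this question can be written down explicitly. The sketch below is hypothetical (the `RegisteredHandler` shape and both function names are assumptions, not Nest or KafkaJS APIs) and only shows how "first match wins" differs from "every match runs" for the `someString` example.

```typescript
interface RegisteredHandler {
  label: string;
  pattern: RegExp;
}

// Registration order is the only notion of "first" in this sketch.
const registered: RegisteredHandler[] = [
  { label: 'someHandler', pattern: /some.*/ },
  { label: 'stringHandler', pattern: /.*string/i },
];

// Option 1: only the first registered handler whose pattern matches.
function firstMatch(eventName: string): RegisteredHandler | undefined {
  for (const handler of registered) {
    if (handler.pattern.test(eventName)) return handler;
  }
  return undefined;
}

// Option 2: every handler whose pattern matches, in registration order.
function allMatches(eventName: string): RegisteredHandler[] {
  return registered.filter((handler) => handler.pattern.test(eventName));
}

const first = firstMatch('someString');
const all = allMatches('someString').map((handler) => handler.label);
console.log(first ? first.label : 'none'); // someHandler
console.log(all); // both handlers match
```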

Sorry for the mention, but @kamilmysliwiec how would you handle this situation? Is there any place in the code with similar (regex-based) pattern matching?

@Idan747

Idan747 commented Oct 13, 2021

Hi, any update on this?

@Jaikant

Jaikant commented Nov 22, 2021

We have a similar requirement: either having the MessagePattern decorator accept an array of strings or supporting a regex would work.

@fjodor-rybakov

If anyone needs it, there is a workaround for regexp support: add a custom strategy and override the bindEvents method as shown below.

import { CustomTransportStrategy, ServerKafka } from '@nestjs/microservices';
import { Consumer } from 'kafkajs';

export class KafkaCustomTransport
  extends ServerKafka
  implements CustomTransportStrategy
{
  override async bindEvents(consumer: Consumer): Promise<void> {
    // Patterns written as '/.../' become RegExp objects; anything else stays a plain topic name.
    const registeredPatterns = [...this.messageHandlers.entries()].map(
      ([pattern, handler]) =>
        pattern.startsWith('/') && pattern.endsWith('/')
          ? new RegExp(
              // Strip only the leading and trailing slash (length - 2 would also drop the last regex character).
              pattern.slice(1, pattern.length - 1),
              handler.extras.flags,
            )
          : pattern,
    );
    const consumerSubscribeOptions = this.options.subscribe || {};
    const subscribeToPattern = async (pattern: string | RegExp) =>
      consumer.subscribe({
        topic: pattern,
        ...consumerSubscribeOptions,
      });
    await Promise.all(registeredPatterns.map(subscribeToPattern));

    const consumerRunOptions = Object.assign(this.options.run || {}, {
      eachMessage: this.getMessageHandler(),
    });
    await consumer.run(consumerRunOptions);
  }
}

Then use a regexp like this:

@EventPattern('/my-super-topic.retry.[0-9]+$/', { flags: 'i' })
async mySuperTopicRetry() {}
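
The convention used here is that a pattern wrapped in slashes is treated as a regular expression and anything else as a literal topic name. Below is a standalone sketch of that parsing step, assuming a hypothetical helper `parsePattern` that is not part of Nest. The capture group keeps everything between the first and the last slash, so the final character of the expression (here `$`) is preserved.

```typescript
// Turn '/source/' into a RegExp; return any other string unchanged.
function parsePattern(pattern: string, flags?: string): string | RegExp {
  const delimited = /^\/(.+)\/$/.exec(pattern);
  return delimited ? new RegExp(delimited[1], flags) : pattern;
}

const retryTopics = parsePattern('/my-super-topic.retry.[0-9]+$/', 'i');
const plainTopic = parsePattern('my-super-topic');

if (retryTopics instanceof RegExp) {
  console.log(retryTopics.test('MY-SUPER-TOPIC.retry.42')); // true, because of the 'i' flag
  console.log(retryTopics.test('my-super-topic.retry.x')); // false, digits are required
}
console.log(plainTopic); // my-super-topic
```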

@jfelicianiats

jfelicianiats commented Jul 22, 2022

@fjodor-rybakov Was there anything else you had to change? I copy / pasted your class and added it as the strategy for my microservice to just try it and it doesn't call the regular expression event pattern. I get the following Error instead.

@EventPattern('/myevent-.*/', { flags: 'i' })
async onSomeEvent(data: any) {
  // this never gets called
  console.log('->', data);
}

// ... elsewhere in the code I call the emit
this.client.emit('myevent-1', obj);
ERROR [ServerKafka] There is no matching event handler defined in the remote service. Event pattern: myevent-1

If I hard-code the event pattern it works...

@EventPattern('myevent-1')
async onSomeEvent(data: any) {
  // this gets called
  console.log('->', data);
}

@fjodor-rybakov
Sorry, I forgot...

You also need to override the getHandlerByPattern method so that it looks for a handler whose regex matches the consumer topic.

@jfelicianiats

Thanks for the reply @fjodor-rybakov. I got it working somewhat now, but there are definitely still issues. For example, if multiple EventPatterns use regular expressions that overlap on the same event (e.g. /myevent-[0-9]+/ and /myevent-.*/), only the first one found will execute; and if there is an exact string event name, only that handler fires.

Here is the updated class that I was testing with...

export class KafkaCustomTransport
  extends ServerKafka
  implements CustomTransportStrategy
{
  override async bindEvents(consumer: Consumer): Promise<void> {
    const registeredPatterns = [...this.messageHandlers.entries()].map(
      ([pattern, handler]) =>
        pattern.startsWith('/') && pattern.endsWith('/')
          ? new RegExp(
              pattern.slice(1, pattern.length - 1),
              handler.extras.flags,
            )
          : pattern,
    );
    const consumerSubscribeOptions = this.options.subscribe || {};
    const subscribeToPattern = async (pattern: string | RegExp) =>
      consumer.subscribe({
        topic: pattern,
        ...consumerSubscribeOptions,
      });
    await Promise.all(registeredPatterns.map(subscribeToPattern));

    const consumerRunOptions = Object.assign(this.options.run || {}, {
      eachMessage: this.getMessageHandler(),
    });
    await consumer.run(consumerRunOptions);
  }

  override getHandlerByPattern(pattern: string) {
    const route = this.getRouteFromPattern(pattern);

    return this.messageHandlers.has(route)
      ? this.messageHandlers.get(route)
      : this.testRegularExpressions(route) || null;
  }

  private testRegularExpressions(pattern: string) {
    for (const [key, val] of this.messageHandlers.entries()) {
      if (!key.startsWith('/') || !key.endsWith('/')) continue;

      const regex = new RegExp(
        key.slice(1, key.length - 1),
        val.extras.flags,
      );
      if (regex.test(pattern)) {
        return val;
      }
    }
  }
}

I also attempted to resolve these issues, but this is over my head at the moment given the amount of time I have to work on it. I took a peek at the source code and attempted to mimic chaining the functions via the next property, but it still doesn't work correctly. There is probably something else happening behind the scenes with the way next is set up and used that I didn't see or understand.

Here is the code for that if anyone wants to expand off the idea...

export class KafkaCustomTransport
  extends ServerKafka
  implements CustomTransportStrategy
{
  override async bindEvents(consumer: Consumer): Promise<void> {
    const registeredPatterns = [...this.messageHandlers.entries()].map(
      ([pattern, handler]) =>
        pattern.startsWith('/') && pattern.endsWith('/')
          ? new RegExp(
              pattern.slice(1, pattern.length - 1),
              handler.extras.flags,
            )
          : pattern,
    );
    const consumerSubscribeOptions = this.options.subscribe || {};
    const subscribeToPattern = async (pattern: string | RegExp) =>
      consumer.subscribe({
        topic: pattern,
        ...consumerSubscribeOptions,
      });
    await Promise.all(registeredPatterns.map(subscribeToPattern));

    const consumerRunOptions = Object.assign(this.options.run || {}, {
      eachMessage: this.getMessageHandler(),
    });
    await consumer.run(consumerRunOptions);
  }

  override getHandlerByPattern(pattern: string) {
    const route = this.getRouteFromPattern(pattern);
    const handlers: MessageHandler[] = [];

    if (this.messageHandlers.has(route))
      handlers.push(this.messageHandlers.get(route));

    for (const [key, val] of this.messageHandlers.entries()) {
      if (!key.startsWith('/') || !key.endsWith('/')) continue;

      const regex = new RegExp(key.slice(1, key.length - 1), val.extras.flags);

      if (regex.test(pattern)) {
        handlers.push(val);
      }
    }

    const allHandlers: MessageHandler[][] = [];
    for (let i = 0; i < handlers.length; i++) {
      const handler = handlers[i];
      const hierarchy: MessageHandler[] = [];

      let nextChild = handler;
      while (nextChild) {
        hierarchy.push(this.cloneHandle(nextChild));
        nextChild = nextChild.next;
      }

      allHandlers.push(hierarchy);
    }

    const flattened = allHandlers.flat();
    for (let i = flattened.length - 1; i >= 0; i--) {
      const handler = flattened[i];
      const prev = flattened[i - 1];

      if (prev) prev.next = handler;
    }

    return flattened.length > 0 ? flattened[0] : null;
  }

  private cloneHandle(handle: MessageHandler) {
    const dup = handle.bind({}) as MessageHandler;
    dup.isEventHandler = handle.isEventHandler;
    dup.extras = { ...handle.extras };
    return dup;
  }
}

@fjodor-rybakov

@jfelicianiats

This should work

import { CustomTransportStrategy, ServerKafka } from '@nestjs/microservices';
import { Consumer } from 'kafkajs';

export class KafkaCustomTransport
  extends ServerKafka
  implements CustomTransportStrategy
{
  override async bindEvents(consumer: Consumer): Promise<void> {
    const registeredPatterns = [...this.messageHandlers.entries()].map(
      ([pattern, handler]) =>
        pattern.startsWith('/') && pattern.endsWith('/')
          ? new RegExp(
              pattern.slice(1, pattern.length - 1),
              handler.extras.flags,
            )
          : pattern,
    );
    const consumerSubscribeOptions = this.options.subscribe || {};
    const subscribeToPattern = async (pattern: string | RegExp) =>
      consumer.subscribe({
        topic: pattern,
        ...consumerSubscribeOptions,
      });
    await Promise.all(registeredPatterns.map(subscribeToPattern));

    const consumerRunOptions = Object.assign(this.options.run || {}, {
      eachMessage: this.getMessageHandler(),
    });
    await consumer.run(consumerRunOptions);
  }

  public override getHandlerByPattern(pattern: string) {
    const handler = super.getHandlerByPattern(pattern);
    if (handler) {
      return handler;
    }

    return this.getHandlerByRegExp(pattern);
  }

  private getHandlerByRegExp(pattern: string) {
    const route = this.getRouteFromPattern(pattern);

    const keys = this.messageHandlers.keys();
    for (const key of keys) {
      const regexp = new RegExp(key);
      if (regexp.test(route)) return this.messageHandlers.get(key);
    }

    return null;
  }
}

@ww917352

ww917352 commented Aug 4, 2022

Hi everybody,

Thanks for the discussion and posting workarounds.

My question is: do these workarounds also support topics that are created in the future? If my app is deployed, and then a topic whose name matches the regex is created, will the app get subscribed to this new topic?

Thank you!

Cheers,
Wei

@fjodor-rybakov
No, subscribing to topics only happens when the consumer starts.

You can pass a list of matching topics at once:

@EventPattern(['topic.one', 'topic.two'])

I think this will solve your problem
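
Because the subscription is resolved only once, picking up topics created later needs an extra step outside these workarounds. Below is a sketch of the detection part only, assuming the current topic list is fetched elsewhere (for example with the KafkaJS admin client's `listTopics()`). `findNewMatchingTopics` is a hypothetical helper, and how to re-subscribe afterwards is left out.

```typescript
// Return topics that match the pattern but were not part of the original subscription.
function findNewMatchingTopics(
  allTopics: string[],
  pattern: RegExp,
  subscribed: string[],
): string[] {
  return allTopics.filter(
    (topic) => pattern.test(topic) && subscribed.indexOf(topic) === -1,
  );
}

// Made-up topic names for illustration.
const subscribedAtStartup = ['topic.one', 'topic.two'];
const topicsNow = ['topic.one', 'topic.two', 'topic.three', 'unrelated'];

const missing = findNewMatchingTopics(topicsNow, /^topic\..+$/, subscribedAtStartup);
console.log(missing); // only 'topic.three' is new and matching
```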

@ww917352

ww917352 commented Aug 4, 2022

Thank you @fjodor-rybakov

We have a topic for each customer such as <<customerId>>.traffic. It is impossible to list them all in @EventPattern. Also when new customers are onboarded, new topics will be added.

We tried @EventPattern('/*.traffic/'), but it does not work.

Or, do I misunderstand your suggestion?
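
One likely contributing factor, even with one of the custom strategies from this thread in place: `*.traffic` is glob syntax, and as a JavaScript regular expression it is invalid because the leading `*` has nothing to repeat. A regex equivalent would be along the lines of `.+\.traffic`. A small sketch (the topic names are made up, and `isValidRegExpSource` is a hypothetical helper):

```typescript
// Check whether a string can be compiled as a JavaScript regular expression.
function isValidRegExpSource(source: string): boolean {
  try {
    new RegExp(source);
    return true;
  } catch {
    return false;
  }
}

// Regex form of "<customerId>.traffic": at least one character, a literal dot, then "traffic".
const customerTraffic = /^.+\.traffic$/;

console.log(isValidRegExpSource('*.traffic')); // false
console.log(customerTraffic.test('acme.traffic')); // true
console.log(customerTraffic.test('acme.billing')); // false
```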

@lfowlie

lfowlie commented Sep 1, 2022

Same issue as @ww917352, any updates on this?

@NishKebab

Adding +1, this is greatly needed since any message that comes in with a malformed pattern will stop the app from consuming completely.

@NenadJovicic

+1 for this

@g-capasso

+1

@beebeebeeebeee

+1

@katrotz

katrotz commented Nov 2, 2022

Before deciding on an official API for this, please also consider partial matching in Event/MessagePattern when the pattern is an object.

E.g. the Nest microservice consumes messages from a queue, and a handler qualifies based on a subset of the message properties/attributes.

Given message handler definition

@MessagePattern({ foo: 'foo' }, { partialMatch: true }) // ?
public handlerMethod() {}

it qualifies incoming message patterns that partially match

{ foo: 'foo', bar: 'bar', ... }
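
A sketch of what such a partial match could mean, assuming a shallow comparison of the handler's pattern against the incoming pattern. The `partialMatch` option above and the `isPartialMatch` helper here are both hypothetical, not existing Nest APIs.

```typescript
type PatternObject = { [key: string]: unknown };

// True when every property of the handler pattern is present, with the same
// value, on the incoming pattern. Extra incoming properties are ignored.
function isPartialMatch(
  handlerPattern: PatternObject,
  incoming: PatternObject,
): boolean {
  return Object.keys(handlerPattern).every(
    (key) => incoming[key] === handlerPattern[key],
  );
}

console.log(isPartialMatch({ foo: 'foo' }, { foo: 'foo', bar: 'bar' })); // true
console.log(isPartialMatch({ foo: 'foo' }, { foo: 'other', bar: 'bar' })); // false
```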

@eugene-chernyshenko

+1

@eugene-chernyshenko

export class KafkaCustomTransport
  extends ServerKafka
  implements CustomTransportStrategy
{
  override async bindEvents(consumer: Consumer): Promise<void> {
    const registeredPatterns = [...this.messageHandlers.entries()].map(
      ([pattern, handler]) =>
        pattern.startsWith('/') && pattern.endsWith('/')
          ? new RegExp(
              pattern.slice(1, pattern.length - 1),
              handler.extras.flags,
            )
          : pattern,
    );
    const consumerSubscribeOptions = this.options.subscribe || {};
    const subscribeToPattern = async (pattern: string | RegExp) =>
      consumer.subscribe({
        topic: pattern,
        ...consumerSubscribeOptions,
      });
    await Promise.all(registeredPatterns.map(subscribeToPattern));

    const consumerRunOptions = Object.assign(this.options.run || {}, {
      eachMessage: this.getMessageHandler(),
    });
    await consumer.run(consumerRunOptions);
  }

  public override getHandlerByPattern(pattern: string) {
    const handler = super.getHandlerByPattern(pattern);
    if (handler) {
      return handler;
    }

    return this.getHandlerByRegExp(pattern);
  }

  private getHandlerByRegExp(pattern: string) {
    const route = this.getRouteFromPattern(pattern);

    const keys = this.messageHandlers.keys();
    for (const key of keys) {
      const regexp = new RegExp(key);
      if (regexp.test(route)) return this.messageHandlers.get(key);
    }

    return null;
  }
}

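A small fix for getHandlerByRegExp above: strip the surrounding slashes from the key before building the RegExp.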

const regexp = new RegExp(key.slice(1, key.length - 1));

@bozorgmehr96

Any update?

@ryakoviv

+1

@johansenja

I would also like something like this (more specifically with RabbitMQ). I was slightly misled by the docs here https://docs.nestjs.com/microservices/basics#decorators, which imply to me that

@MessagePattern('time.us.*')
getDate(@Payload() data: number[], @Ctx() context: NatsContext) {
  console.log(`Subject: ${context.getSubject()}`); // e.g. "time.us.east"
  return new Date().toLocaleTimeString(...);
}

would match "time.us.west", "time.us.east" etc.
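
For reference, the docs example uses `NatsContext`, and in NATS `*` matches exactly one dot-separated token while `>` matches one or more trailing tokens. Below is a sketch of those semantics as a regular expression, assuming a hypothetical helper `subjectToRegExp`. It only illustrates the expected matching and says nothing about how the RabbitMQ transport routes messages.

```typescript
// Convert a NATS-style subject with wildcards into an anchored RegExp.
// '*' stands for exactly one token; '>' (meant to be the last token) stands
// for one or more remaining tokens. All other tokens are matched literally.
function subjectToRegExp(subject: string): RegExp {
  const source = subject
    .split('.')
    .map((token) => {
      if (token === '*') return '[^.]+';
      if (token === '>') return '.+';
      return token.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
    })
    .join('\\.');
  return new RegExp('^' + source + '$');
}

const usTime = subjectToRegExp('time.us.*');
console.log(usTime.test('time.us.east')); // true
console.log(usTime.test('time.us.east.atlanta')); // false, '*' covers one token only
console.log(subjectToRegExp('time.>').test('time.us.east')); // true
```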

@abrudane

@kamilmysliwiec Hello! Do you know if this was fixed, or was no solution found?
We could really use this feature since we have different environments, and hence different topics to consume from.

@PiotrOwsiak

NestJS feels like a collection of leaky abstractions, with missing features and edge cases creeping in to surprise you every now and then. The official docs don't go into enough detail to prepare you for those things, so you discover them only as you go :(

@buccfer-knauf

Updates?

@vtereshyn

any updates here?

@nestjs nestjs locked and limited conversation to collaborators Feb 1, 2024
edeesis added a commit to edeesis/nest that referenced this issue May 1, 2024