Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix rpc exception kafka #10379

Closed
wants to merge 2 commits into from
Closed

Conversation

RaiMX
Copy link

@RaiMX RaiMX commented Oct 7, 2022

PR Checklist

Please check if your PR fulfills the following requirements:

PR Type

What kind of change does this PR introduce?

  • Bugfix
  • Feature
  • Code style update (formatting, local variables)
  • Refactoring (no functional changes, no api changes)
  • Build related changes
  • CI related changes
  • Other... Please describe:

What is the current behavior?

#10113

Issue Number: 10113

What is the new behavior?

Microservice return custom RPC error through Kafka transport

Does this PR introduce a breaking change?

  • Yes
  • No

Other information

Bug caused microservise not to return error to Kafka transport when using custom RPC exception filter

Fixes issue nestjs#10113
@coveralls
Copy link

Pull Request Test Coverage Report for Build d8d9a5ab-12b5-4810-9ec6-34797051705d

  • 1 of 3 (33.33%) changed or added relevant lines in 1 file are covered.
  • No unchanged relevant lines lost coverage.
  • Overall coverage decreased (-0.003%) to 93.784%

Changes Missing Coverage Covered Lines Changed/Added Lines %
packages/microservices/server/server-kafka.ts 1 3 33.33%
Totals Coverage Status
Change from base Build 6b637f64-9c7a-4d7b-8537-b6a8d098f58e: -0.003%
Covered Lines: 6110
Relevant Lines: 6515

💛 - Coveralls

@EugeneKorshenko
Copy link
Contributor

Hey @kamilmysliwiec could you take a quick look at this PR? It seems to fix an issue introduced by this commit: cfe1191

Comment on lines -212 to -215
if (!isPromiseResolved) {
isPromiseResolved = true;
resolve();
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this condition is removed? What if a stream emits multiple values (streaming)?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe @kimvladis can help with this?
Credits for this commit belongs to @kimvladis because he suggested it in #10113 (comment)
I just checked for his solution and it worked for me.

Copy link

@kimvladis kimvladis Oct 31, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this condition is removed? What if a stream emits multiple values (streaming)?

Reasoned, but as I can see, there are no supporting for multiple values emitting in handlers.
I think this condition can be returned, just in case. @RaiMX

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kimvladis emitting multiple values is supported

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this condition is removed? What if a stream emits multiple values (streaming)?

Reasoned, but as I can see, there are no supporting for multiple values emitting in handlers. I think this condition can be returned, just in case. @RaiMX

Honestly I can't seem to figure this out. Can you take it from here @kimvladis? Or maybe suggest edit, then I will merge it?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I think I realised scenario, but we never used it like that in our code. So the handler can return observable, and the client can receive multiple values, and multiple values will be emitted to Kafka(same topic, same correlationId)?

Copy link

@kimvladis kimvladis Nov 1, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By the way, this condition is redundant, because resolving already resolved promises has no effects. Or I missed something?
https://tc39.es/ecma262/#sec-promise-objects

@RaiMX
Copy link
Author

RaiMX commented Nov 18, 2022

So no updates at the moment?

@TheVoid-0
Copy link

TheVoid-0 commented Nov 18, 2022

I was about to open a PR to add that resolve that is blocking the execution after an error is thrown. There's something missing in this PR so that it can be merged? This PR would fix my issue as well

@RaiMX
Copy link
Author

RaiMX commented Dec 2, 2022

@kamilmysliwiec no updates yet?
because of this bug I can't use version 9 in my production...sad

@JKKGBE
Copy link

JKKGBE commented Dec 14, 2022

+1 Please someone look into it. @kamilmysliwiec

@TheVoid-0
Copy link

TheVoid-0 commented Dec 28, 2022

I added the following code to my application as a workaround for now:

// created file kafka-server.ts


// HACK - This is a hack to fix the issue with the @nestjs/microservices package tracked in https://github.com/nestjs/nest/pull/10379
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
export class KafkaServer extends ServerKafka {
  private combineStreamsAndThrowIfRetriable(
    response$: { subscribe: (arg0: { next: (val: any) => void; error: (err: any) => void; complete: () => any }) => void },
    replayStream$: { next: (arg0: any) => void; error: (arg0: any) => void; complete: () => any },
  ) {
    return new Promise<void>((resolve, reject) => {
      let isPromiseResolved = false;
      response$.subscribe({
        next: val => {
          replayStream$.next(val);
          if (!isPromiseResolved) {
            isPromiseResolved = true;
            resolve();
          }
        },
        error: err => {
          if (err instanceof KafkaRetriableException && !isPromiseResolved) {
            isPromiseResolved = true;
            reject(err);
          }
          replayStream$.error(err);
          resolve(); // This line is needed to prevent server stall
        },
        complete: () => replayStream$.complete(),
      });
    });
  }
}
// on main.ts

 app.connectMicroservice<MicroserviceOptions>(
      {
        strategy: new KafkaServer({
          client: {
            brokers: environment.KAFKA_URL.map(url => `${url}:${environment.KAFKA_PORT}`),
            clientId: 'logs',
          },
          consumer: { groupId: 'logs' },
        }),
      },
      { inheritAppConfig: true },
    );

    await app.startAllMicroservices();

@kamilmysliwiec
Copy link
Member

Fixed in this PR #10982

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

7 participants