Skip to content

Commit

Permalink
feat(microservices): dont catch errors thrown by kafka event handling
Browse files Browse the repository at this point in the history
Error thrown by event handling method are no longer being caught by RcpExecptionFilter.
Instead errors are passed to kafkajs's eachMessage.
This results in proper interaction with kafka.

This commit closes Kafka commitOffsets #9283.
  • Loading branch information
davidschuette committed Mar 4, 2022
1 parent 5b24f81 commit 77b7bd6
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 3 deletions.
14 changes: 14 additions & 0 deletions packages/microservices/context/kafka-rpc-proxy.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import { RpcExceptionsHandler } from 'exceptions/rpc-exceptions-handler';
import { Observable } from 'rxjs';
import { RpcProxy } from './rpc-proxy';

export class KafkaRpcProxy extends RpcProxy {
public create(
targetCallback: (...args: unknown[]) => Promise<Observable<any>>,
exceptionsHandler: RpcExceptionsHandler,
): (...args: unknown[]) => Promise<Observable<unknown>> {
return (...args: unknown[]) => {
return targetCallback(...args);
};
}
}
11 changes: 9 additions & 2 deletions packages/microservices/microservices-module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ import { PipesContextCreator } from '@nestjs/core/pipes/pipes-context-creator';
import { ClientProxyFactory } from './client';
import { ClientsContainer } from './container';
import { ExceptionFiltersContext } from './context/exception-filters-context';
import { KafkaRpcProxy } from './context/kafka-rpc-proxy';
import { RpcContextCreator } from './context/rpc-context-creator';
import { RpcProxy } from './context/rpc-proxy';
import { Transport } from './enums';
import { CustomTransportStrategy } from './interfaces';
import { ListenersController } from './listeners-controller';
import { Server } from './server/server';
Expand All @@ -23,8 +25,13 @@ export class MicroservicesModule {
private readonly clientsContainer = new ClientsContainer();
private listenersController: ListenersController;

public register(container: NestContainer, config: ApplicationConfig) {
const rpcProxy = new RpcProxy();
public register(
container: NestContainer,
config: ApplicationConfig,
transport?: Transport,
) {
const rpcProxy =
transport === Transport.KAFKA ? new KafkaRpcProxy() : new RpcProxy();
const exceptionFiltersContext = new ExceptionFiltersContext(
container,
config,
Expand Down
7 changes: 6 additions & 1 deletion packages/microservices/nest-microservice.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,12 @@ export class NestMicroservice
) {
super(container);

this.microservicesModule.register(container, this.applicationConfig);
this.microservicesModule.register(
container,
this.applicationConfig,
// @ts-expect-error transport does not exist on type of config??
config.transport,
);
this.createServer(config);
this.selectContextModule();
}
Expand Down

0 comments on commit 77b7bd6

Please sign in to comment.