
KafkaJs seems not maintained anymore #13223


Open · 1 task done

andreacioni opened this issue Feb 17, 2024 · 27 comments

@andreacioni

Is there an existing issue that is already proposing this?

  • I have searched the existing issues

Is your feature request related to a problem? Please describe it

Lately I have been using NestJS to build a Kafka microservice for a consumer application. I had to study in depth what NestJS uses as the underlying transport in order to communicate with Kafka brokers safely and efficiently, which meant tuning the underlying Kafka client implementation, KafkaJS. While checking the repository I saw that the project is no longer maintained, and the people who previously worked on it are no longer responding even to discussions they started (see tulios/kafkajs#1603). I think this situation could undermine the stability of the NestJS project too, considering that Kafka is a must-have integration for today's projects.

Describe the solution you'd like

The solution would be to migrate to a stable Kafka implementation that has strong support. node-rdkafka, which wraps the official C/C++ Confluent driver, would be a good choice.
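For reference, node-rdkafka exposes an event-driven, librdkafka-style API rather than KafkaJS's promise-based one, so the switch would not be a drop-in change. A minimal consumer sketch (broker address, group id, and topic name are placeholders):

import * as Kafka from 'node-rdkafka';

// Global config uses librdkafka property names, not KafkaJS-style options.
const consumer = new Kafka.KafkaConsumer(
  {
    'group.id': 'example-group',
    'metadata.broker.list': 'localhost:9092',
  },
  { 'auto.offset.reset': 'earliest' },
);

consumer
  .on('ready', () => {
    consumer.subscribe(['example-topic']);
    consumer.consume(); // flowing mode: emits a 'data' event per message
  })
  .on('data', message => {
    // message.value is a Buffer (or null for tombstone records)
    console.log(message.value?.toString());
  });

consumer.connect();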

Teachability, documentation, adoption, migration strategy

Dropping support for KafkaJS may not be the best solution for those using it without issues in production, but you could certainly allow people to start with, or switch to, a different implementation (https://www.npmjs.com/package/node-rdkafka). That could be the right way to begin phasing out KafkaJS support in case the project remains stale.

What is the motivation / use case for changing the behavior?

Avoiding lock-in to a Kafka implementation that is no longer going to be supported.

@andreacioni added the "needs triage" and "type: enhancement 🐺" labels on Feb 17, 2024
@samuelleach

Sam from Confluent here.

I know that the Confluent team are now officially supporting a javascript client with this early access release:

https://github.com/confluentinc/confluent-kafka-javascript

From the Readme:

confluent-kafka-javascript is Confluent's JavaScript client for Apache Kafka and the Confluent Platform. This is an early access library. The goal is to provide an highly performant, reliable and easy to use JavaScript client that is based on node-rdkafka yet also API compatible with KafkaJS to provide flexibility to users and streamline migrations from other clients.

Check it out and see if it meets your needs.

@andreacioni
Author

Hi @samuelleach, thanks for pointing it out. I didn't know about this work; on the Confluent website I had only seen a mention of node-rdkafka.

Anyway, given that this library is at an early development stage and that migrating from node-rdkafka later would be trivial (just switching the import statement), I'd suggest migrating from KafkaJS to node-rdkafka for starters. But that's just my personal opinion.

@trevordixon

+1. As a heavy user of NestJS and Kafka, I'd like to know that the framework uses the best lib. https://github.com/confluentinc/confluent-kafka-javascript says, "This library is currently in early access and not meant for production use." I'd like to see NestJS switch to node-rdkafka sooner rather than later while keeping an eye on confluent-kafka-javascript for the future.

@andreacioni
Author

Any thoughts from the maintainers? If you decide to change to or add node-rdkafka as the underlying implementation for Kafka microservices, I'd be glad to propose a PR.

@mkaufmaner
Contributor

Sam from Confluent here. I know that the Confluent team are now officially supporting a javascript client with this early access release: https://github.com/confluentinc/confluent-kafka-javascript […]

@samuelleach Looking forward to this library becoming production ready. It is disappointing to see the KafkaJS project become stale and I appreciate Confluent picking it up.

One thing to keep in mind is that NestJS uses a custom partitioner for assigning reply topics to support @MessagePattern. See https://github.com/nestjs/nest/blob/master/packages/microservices/helpers/kafka-reply-partition-assigner.ts
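For readers unfamiliar with that assigner, the idea is roughly this: the requesting client tracks which partitions of the reply topic it was assigned, advertises the lowest one in a request header, and the server produces the response directly to that partition so it reaches the same client instance. A simplified sketch of the concept (the helper functions are illustrative, not the actual NestJS internals; header names follow NestJS's KafkaHeaders values):

// Client side: after a group join/rebalance, remember the lowest
// assigned partition for each reply topic.
const assignments: Record<string, number> = {};

function onGroupJoin(topic: string, partitions: number[]): void {
  if (partitions.length) {
    assignments[topic] = Math.min(...partitions);
  }
}

// Client side: attach routing headers to the outgoing request.
function buildRequestHeaders(correlationId: string, replyTopic: string) {
  return {
    kafka_correlationId: correlationId,
    kafka_replyTopic: replyTopic,
    kafka_replyPartition: String(assignments[replyTopic]),
  };
}

// Server side: produce the reply to exactly the partition the client owns.
function buildReply(headers: Record<string, string>, response: unknown) {
  return {
    topic: headers.kafka_replyTopic,
    partition: Number(headers.kafka_replyPartition),
    headers: { kafka_correlationId: headers.kafka_correlationId },
    value: JSON.stringify(response),
  };
}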

@mkaufmaner
Contributor

Related to tulios/kafkajs#1603

@hendrikpeilke

Version 1.0.0 of confluent-kafka-javascript has been released (see https://github.com/confluentinc/confluent-kafka-javascript/releases/tag/v1.0.0).

Are there any plans to switch from the unmaintained kafkajs library to this now-stable one?

@phidang-bic

I’m really looking forward to this

@sirmonin

sirmonin commented Feb 7, 2025

Please do not replace kafkajs. It still provides useful functionality such as partition reassignment; there is no such functionality in the Node Kafka client libraries that use librdkafka.

As for issue #12703, I believe the reason messages are consumed one topic at a time is that in nest/microservices ServerKafka creates only one consumer for all topics. To consume topics concurrently, each topic must have its own consumer.

Current behavior:

  • Only one kafkajs consumer is created, subscribed to multiple topics
  • This consumer has an eachMessage handler that dynamically assigns the message handler based on the topic from the payload.

Proposed behavior:

  • A kafkajs consumer is created for each topic, to allow concurrency
  • Each eachMessage handler is assigned based on its consumer's topic

An option to create a dedicated consumer per topic would solve the issue of processing messages from different topics concurrently (see the sketch below).
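A minimal sketch of that idea with kafkajs, assuming a handlers map keyed by topic (broker address and handler bodies are placeholders):

import { Kafka } from 'kafkajs';

const kafka = new Kafka({ clientId: 'my-service', brokers: ['localhost:9092'] });

// One handler per topic; each topic gets its own consumer, so a slow
// handler on one topic cannot block consumption of the others.
const handlers: Record<string, (value: string) => Promise<void>> = {
  'topic-a': async value => { /* ... */ },
  'topic-b': async value => { /* ... */ },
};

async function start(): Promise<void> {
  await Promise.all(
    Object.entries(handlers).map(async ([topic, handle]) => {
      const consumer = kafka.consumer({ groupId: `my-service-${topic}` });
      await consumer.connect();
      await consumer.subscribe({ topics: [topic] });
      await consumer.run({
        // The handler is fixed per consumer: no dynamic dispatch on payload.topic.
        eachMessage: async ({ message }) =>
          handle(message.value?.toString() ?? ''),
      });
    }),
  );
}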

@Forceres

Forceres commented Feb 9, 2025

Please, do not replace kafkajs. It still provides useful functionality such as partition reassignment. […]

There is no point in keeping kafkajs; it is not maintained at all (issues will never be closed). Any mechanism such as partition reassignment could be implemented on top of the other libraries.

@nestjs nestjs deleted a comment from TorinAsakura Mar 18, 2025
@andreacioni
Author

Sam from Confluent here. I know that the Confluent team are now officially supporting a javascript client with this early access release: https://github.com/confluentinc/confluent-kafka-javascript […]

Regarding this, I took some time to play around with Confluent's kafka-javascript library and NestJS, and I find it very promising; it also recently reached a stable version. I didn't try to replace the internal microservice implementation, I just used it as a standalone module. If anyone is interested, I've written a short article (here) about my findings; I hope it helps someone.
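For the curious, standalone usage stays close to the KafkaJS API because the library ships a KafkaJS-compatible promisified wrapper. A rough sketch based on its README (the kafkaJS config block and broker address are assumptions; verify against the current docs):

import { KafkaJS } from '@confluentinc/kafka-javascript';

// KafkaJS-compatible configuration is nested under the `kafkaJS` key.
const kafka = new KafkaJS.Kafka({
  kafkaJS: { brokers: ['localhost:9092'] },
});

async function produceOnce(): Promise<void> {
  const producer = kafka.producer();
  await producer.connect();
  await producer.send({
    topic: 'example-topic',
    messages: [{ value: 'hello' }],
  });
  await producer.disconnect();
}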

Additionally, I'm still interested in trying to replace the KafkaJS implementation behind NestJS with the Confluent library, but as the maintainers' position is not very clear, I don't know if it's worth investing time in it.

@kamilmysliwiec
Member

but as the maintainers' position is not very clear, I don't know if it's worth investing time in it.

Ideally, we'd see a new transport strategy (as suggested here #13538) so folks can slowly migrate over time if they see a need for it.

@micalevisk
Member

another lib: @platformatic/kafka. Created by Node.js members. Article: Why we created another Kafka client for Node.js

@andreacioni
Author

another lib: @platformatic/kafka. Created by Node.js members. Article: Why we created another Kafka client for Node.js

Cool! I didn't know about this one. However, the comparison they made is against node-rdkafka and KafkaJS, not against the new Confluent kafka-javascript library, which should be the reference point for benchmarks and DX.

@frct1

frct1 commented May 3, 2025

Definitely +1 on this topic. KafkaJS seems to introduce a lot of unnecessary CPU/memory usage for no reason: we see maybe 2-3 messages per minute, yet the same service running with no Kafka strategy enabled uses 10x less CPU time and almost 3x less memory. You can spot the difference in the window when Kafka was disabled, from 11pm until 1am. After about 12 hours memory usage goes back to "normal" but is still 50MB higher for unknown reasons, and CPU usage stays high.

[Chart: CPU and memory usage of the service before and after disabling the Kafka strategy]

@kamilmysliwiec
Member

kamilmysliwiec commented May 15, 2025

I've been working on a custom transport strategy that integrates @platformatic/kafka, but sadly there's one feature missing to achieve full feature parity with the current built-in kafkajs transporter (platformatic/kafka#34).

Otherwise, the transporter seems to be working well; I will share it as soon as it's ready to use. It can be safely used by those who rely on @EventPattern handlers (event-driven communication). The consumer might lose responses every now and then for @MessagePattern handlers (due to partition reassignment, etc.).

@kamilmysliwiec
Member

@samuelleach is there any chance that Confluent's package will support custom partition assigners?

@kamilmysliwiec
Member

kamilmysliwiec commented May 19, 2025

For those who want to use @platformatic/kafka in their projects (even though it doesn't support setting a custom partition assigner yet), you can use the following code snippets (just copy & paste them into your project):


Server class (platformatic-kafka.strategy.ts)

import { Logger } from '@nestjs/common';
import {
  CustomTransportStrategy,
  KafkaHeaders,
  KafkaRetriableException,
  OutgoingResponse,
  Server,
} from '@nestjs/microservices';
import {
  KAFKA_DEFAULT_CLIENT,
  KAFKA_DEFAULT_GROUP,
  NO_EVENT_HANDLER,
  NO_MESSAGE_HANDLER,
} from '@nestjs/microservices/constants';
import {
  Broker,
  ConsumeOptions,
  Consumer,
  ConsumerOptions,
  Message,
  ProduceOptions,
  Producer,
  ProduceResult,
  ProducerOptions,
  stringDeserializers,
  stringSerializers,
} from '@platformatic/kafka';
import { isObservable, lastValueFrom, Observable, ReplaySubject } from 'rxjs';
import { PlatformaticKafkaContext } from './platformatic-kafka.context';

type PlatformaticClientEvent =
  | 'client:broker:connect'
  | 'client:broker:disconnect'
  | 'client:broker:failed'
  | 'client:broker:drain';

export enum PlatformaticKafkaStatus {
  CONNECTED = 'connected',
  DISCONNECTED = 'disconnected',
  FAILED = 'failed',
}

export interface PlatformaticKafkaOptions {
  brokers: string[] | Broker[];
  clientId?: string;
  groupId?: string;
  postfixId?: string;
  forceClose?: boolean;
  consumer?: Omit<
    ConsumerOptions<any, any, any, any>,
    'clientId' | 'bootstrapBrokers'
  >;
  producer?: Omit<
    ProducerOptions<any, any, any, any>,
    'clientId' | 'bootstrapBrokers' | 'groupId'
  >;
  consumeOptions?: ConsumeOptions<any, any, any, any>;
  produceOptions?: ProduceOptions<any, any, any, any>;
}

export class PlatformaticKafkaStrategy
  extends Server
  implements CustomTransportStrategy
{
  protected readonly logger = new Logger(PlatformaticKafkaStrategy.name);
  private readonly clientId: string;
  private readonly groupId: string;
  private _consumer: Consumer<string, string, string, string>;
  private _producer: Producer<string, string, string, string>;
  private readonly eventsMap: Record<
    PlatformaticKafkaStatus,
    PlatformaticClientEvent
  > = {
    [PlatformaticKafkaStatus.CONNECTED]: 'client:broker:connect',
    [PlatformaticKafkaStatus.DISCONNECTED]: 'client:broker:disconnect',
    [PlatformaticKafkaStatus.FAILED]: 'client:broker:failed',
  };

  get consumer(): Consumer<string, string, string, string> {
    if (!this._consumer) {
      throw new Error('No consumer initialized');
    }
    return this._consumer;
  }

  get producer(): Producer<string, string, string, string> {
    if (!this._producer) {
      throw new Error('No producer initialized');
    }
    return this._producer;
  }

  constructor(private readonly options: PlatformaticKafkaOptions) {
    super();

    const postfixId = this.getOptionsProp(this.options, 'postfixId', '-server');

    this.clientId = (options.clientId ?? KAFKA_DEFAULT_CLIENT) + postfixId;
    this.groupId = (options.groupId ?? KAFKA_DEFAULT_GROUP) + postfixId;
  }

  public async listen(callback: () => void): Promise<void> {
    this._consumer = this.createConsumer();
    this._producer = this.createProducer();
    this.registerConsumerEventListeners();
    this.registerProducerEventListeners();

    await this.consumeTopics();
    callback();
  }

  public async close() {
    this.logger.log('Closing Kafka connection...');
    await Promise.all([
      this._consumer.close(this.options.forceClose),
      this._producer.close(),
    ]);
  }

  public on<
    EventKey extends string = string,
    EventCallback extends Function = Function,
  >(event: EventKey, callback: EventCallback) {
    throw new Error(
      'Method not supported, register events using the "consumer" and "producer" attributes',
    );
  }

  public unwrap<T>(): T {
    return [this._consumer, this._producer] as T;
  }

  public async consumeTopics() {
    const registeredPatterns = [...this.messageHandlers.keys()];
    const consumeOptions = this.options.consumeOptions || {};

    if (registeredPatterns.length === 0) {
      return;
    }

    const hasBidirectionalTopics = Array.from(
      this.messageHandlers.values(),
    ).some(handler => !handler.isEventHandler);
    if (hasBidirectionalTopics) {
      // The "@platformatic/kafka" library does not support custom partition assignment
      // for bidirectional topics, so messages can be lost every now and then during
      // rebalancment.
      this.logger.warn(
        'Bidirectional communication is not supported by the "@platformatic/kafka" library. Messages can be lost during rebalancing. Avoid using the "@MessagePattern" decorator and instead consider processing messages asynchronously with the "@EventPattern" decorator.',
      );
    }

    const stream = await this._consumer!.consume({
      autocommit: true,
      sessionTimeout: 10000,
      heartbeatInterval: 500,
      topics: registeredPatterns,
      ...consumeOptions,
    });
    stream.on('data', message => this.handleMessage(message));
    stream.on('error', error =>
      this.handleError(typeof error === 'string' ? error : error.message),
    );
    this.logger.log('Kafka consumer ready to process messages');
  }

  public async handleMessage(payload: Message<string, string, string, string>) {
    const headers = payload.headers;
    const correlationId = headers.get(KafkaHeaders.CORRELATION_ID);
    const replyTopic = headers.get(KafkaHeaders.REPLY_TOPIC);
    const replyPartition = headers.get(KafkaHeaders.REPLY_PARTITION);
    const kafkaContext = new PlatformaticKafkaContext([
      payload,
      payload.partition,
      payload.topic,
      payload.headers,
    ]);
    const handler = this.getHandlerByPattern(payload.topic);

    // If the correlation id or reply topic is not set
    // then this is an event (events could still have correlation id)
    if (handler?.isEventHandler || !correlationId || !replyTopic) {
      return this.handleEvent(payload.topic, payload.value, kafkaContext);
    }

    const publish = this.getPublisher(
      replyTopic,
      replyPartition,
      correlationId,
    );

    if (!handler) {
      return publish({
        id: correlationId,
        err: NO_MESSAGE_HANDLER,
      });
    }

    try {
      const deserializedValue = JSON.parse(payload.value);
      const response$ = this.transformToObservable(
        handler(deserializedValue, kafkaContext),
      );

      const replayStream$ = new ReplaySubject();
      await this.combineStreamsAndThrowIfRetriable(response$, replayStream$);

      this.send(replayStream$, publish);
    } catch (err) {
      this.logger.error(err);
      return publish({
        id: correlationId,
        err: err,
      });
    }
  }

  public async handleEvent(
    topic: string,
    value: unknown,
    context: PlatformaticKafkaContext,
  ): Promise<any> {
    try {
      const handler = this.getHandlerByPattern(topic);
      if (!handler) {
        return this.logger.error(NO_EVENT_HANDLER`${topic}`);
      }
      const deserializedValue = JSON.parse(value as string);
      const resultOrStream = await handler(deserializedValue, context);
      if (isObservable(resultOrStream)) {
        await lastValueFrom(resultOrStream);
      }
    } catch (err) {
      this.logger.error(err);
    }
  }

  private registerConsumerEventListeners() {
    if (!this._consumer) {
      return;
    }
    this._consumer.on(this.eventsMap.connected, () =>
      this._status$.next(PlatformaticKafkaStatus.CONNECTED),
    );
    this._consumer.on(this.eventsMap.disconnected, () =>
      this._status$.next(PlatformaticKafkaStatus.DISCONNECTED),
    );
    this._consumer.on(this.eventsMap.failed, () =>
      this._status$.next(PlatformaticKafkaStatus.FAILED),
    );
  }

  private registerProducerEventListeners() {
    if (!this._producer) {
      return;
    }
    this._producer.on(this.eventsMap.connected, () =>
      this._status$.next(PlatformaticKafkaStatus.CONNECTED),
    );
    this._producer.on(this.eventsMap.disconnected, () =>
      this._status$.next(PlatformaticKafkaStatus.DISCONNECTED),
    );
    this._producer.on(this.eventsMap.failed, () =>
      this._status$.next(PlatformaticKafkaStatus.FAILED),
    );
  }

  private getPublisher(
    replyTopic: string,
    replyPartition: string,
    correlationId: string,
  ): (data: OutgoingResponse) => Promise<ProduceResult> {
    return (data: OutgoingResponse) =>
      this.sendMessage(data, replyTopic, replyPartition, correlationId);
  }

  private async sendMessage(
    message: OutgoingResponse,
    replyTopic: string,
    replyPartition: string | undefined | null,
    correlationId: string,
  ): Promise<ProduceResult> {
    const outgoingMessage = {
      topic: replyTopic,
      value: JSON.stringify(message.response),
      headers: {},
    } as Partial<Message<string, string, string, string>>;

    this.assignReplyPartition(replyPartition, outgoingMessage);
    this.assignCorrelationIdHeader(correlationId, outgoingMessage);
    this.assignErrorHeader(message, outgoingMessage);
    this.assignIsDisposedHeader(message, outgoingMessage);

    return this.producer!.send({
      ...this.options.produceOptions,
      messages: [outgoingMessage as Message<string, string, string, string>],
    });
  }

  private createConsumer() {
    return new Consumer({
      groupId: this.groupId,
      clientId: this.clientId,
      bootstrapBrokers: this.options.brokers,
      maxWaitTime: 1000,
      autocommit: 100,
      deserializers: stringDeserializers,
      ...this.options.consumer,
    });
  }

  private createProducer() {
    return new Producer({
      bootstrapBrokers: this.options.brokers,
      clientId: this.clientId,
      serializers: stringSerializers,
      ...this.options.producer,
    });
  }

  private combineStreamsAndThrowIfRetriable(
    response$: Observable<any>,
    replayStream$: ReplaySubject<unknown>,
  ) {
    return new Promise<void>((resolve, reject) => {
      let isPromiseResolved = false;
      response$.subscribe({
        next: val => {
          replayStream$.next(val);
          if (!isPromiseResolved) {
            isPromiseResolved = true;
            resolve();
          }
        },
        error: err => {
          if (err instanceof KafkaRetriableException && !isPromiseResolved) {
            isPromiseResolved = true;
            reject(err);
          } else {
            resolve();
          }
          replayStream$.error(err);
        },
        complete: () => replayStream$.complete(),
      });
    });
  }

  private assignIsDisposedHeader(
    outgoingResponse: OutgoingResponse,
    outgoingMessage: Partial<Message<string, string, string, string>>,
  ) {
    if (!outgoingResponse.isDisposed) {
      return;
    }
    outgoingMessage.headers![KafkaHeaders.NEST_IS_DISPOSED] = '1';
  }

  private assignErrorHeader(
    outgoingResponse: OutgoingResponse,
    outgoingMessage: Partial<Message<string, string, string, string>>,
  ) {
    if (!outgoingResponse.err) {
      return;
    }
    const stringifiedError =
      typeof outgoingResponse.err === 'object'
        ? JSON.stringify(outgoingResponse.err)
        : outgoingResponse.err;
    outgoingMessage.headers![KafkaHeaders.NEST_ERR] = stringifiedError;
  }

  private assignCorrelationIdHeader(
    correlationId: string,
    outgoingMessage: Partial<Message<string, string, string, string>>,
  ) {
    outgoingMessage.headers![KafkaHeaders.CORRELATION_ID] = correlationId;
  }

  private assignReplyPartition(
    replyPartition: string | null | undefined,
    outgoingMessage: Partial<Message<string, string, string, string>>,
  ) {
    if (typeof replyPartition === 'undefined' || replyPartition === null) {
      return;
    }
    outgoingMessage.partition = parseFloat(replyPartition);
  }
}

Client class (platformatic-kafka.client.ts)

import { Logger } from '@nestjs/common';
import { isNil, isUndefined } from '@nestjs/common/utils/shared.utils';
import {
  ClientProxy,
  IncomingResponse,
  MsPattern,
  OutgoingEvent,
  ReadPacket,
  WritePacket,
} from '@nestjs/microservices';
import {
  KAFKA_DEFAULT_CLIENT,
  KAFKA_DEFAULT_GROUP,
} from '@nestjs/microservices/constants';
import { KafkaHeaders } from '@nestjs/microservices/enums';
import { InvalidKafkaClientTopicException } from '@nestjs/microservices/errors/invalid-kafka-client-topic.exception';
import { InvalidMessageException } from '@nestjs/microservices/errors/invalid-message.exception';
import {
  Broker,
  ConsumeOptions,
  Consumer,
  ConsumerOptions,
  Message,
  ProduceOptions,
  Producer,
  ProducerOptions,
  stringDeserializers,
  stringSerializers,
} from '@platformatic/kafka';
import {
  connectable,
  defer,
  mergeMap,
  Observable,
  Subject,
  throwError,
} from 'rxjs';

type PlatformaticClientEvent =
  | 'client:broker:connect'
  | 'client:broker:disconnect'
  | 'client:broker:failed'
  | 'client:broker:drain';

export enum PlatformaticKafkaStatus {
  CONNECTED = 'connected',
  DISCONNECTED = 'disconnected',
  FAILED = 'failed',
}

export interface PlatformaticKafkaOptions {
  brokers: string[] | Broker[];
  clientId?: string;
  groupId?: string;
  postfixId?: string;
  forceClose?: boolean;
  consumer?: Omit<
    ConsumerOptions<any, any, any, any>,
    'clientId' | 'bootstrapBrokers'
  >;
  producer?: Omit<
    ProducerOptions<any, any, any, any>,
    'clientId' | 'bootstrapBrokers' | 'groupId'
  >;
  consumeOptions?: ConsumeOptions<any, any, any, any>;
  produceOptions?: ProduceOptions<any, any, any, any>;
  producerOnlyMode?: boolean;
}

export class PlatformaticKafkaClient extends ClientProxy<
  never,
  PlatformaticKafkaStatus
> {
  protected logger = new Logger(PlatformaticKafkaClient.name);
  protected initialized: Promise<void> | null = null;
  protected responsePatterns: string[] = [];
  protected consumerAssignments: { [key: string]: number } = {};
  protected clientId: string;
  protected groupId: string;
  protected producerOnlyMode: boolean;
  protected _consumer: Consumer<string, string, string, string> | null = null;
  protected _producer: Producer<string, string, string, string> | null = null;
  private readonly eventsMap: Record<
    PlatformaticKafkaStatus,
    PlatformaticClientEvent
  > = {
    [PlatformaticKafkaStatus.CONNECTED]: 'client:broker:connect',
    [PlatformaticKafkaStatus.DISCONNECTED]: 'client:broker:disconnect',
    [PlatformaticKafkaStatus.FAILED]: 'client:broker:failed',
  };

  get consumer(): Consumer<string, string, string, string> {
    if (!this._consumer) {
      throw new Error(
        'No consumer initialized. Please, call the "connect" method first.',
      );
    }
    return this._consumer;
  }

  get producer(): Producer<string, string, string, string> {
    if (!this._producer) {
      throw new Error(
        'No producer initialized. Please, call the "connect" method first.',
      );
    }
    return this._producer;
  }

  constructor(protected readonly options: PlatformaticKafkaOptions) {
    super();

    const postfixId = this.getOptionsProp(this.options, 'postfixId', '-client');
    this.producerOnlyMode = this.getOptionsProp(
      this.options,
      'producerOnlyMode',
      false,
    );
    this.clientId = (options.clientId ?? KAFKA_DEFAULT_CLIENT) + postfixId;
    this.groupId = (options.groupId ?? KAFKA_DEFAULT_GROUP) + postfixId;
  }

  public subscribeToResponseOf(pattern: unknown): void {
    const request = this.normalizePattern(pattern as MsPattern);
    this.responsePatterns.push(this.getResponsePatternName(request));
  }

  public async close(): Promise<void> {
    this._producer && (await this._producer.close());
    this._consumer && (await this._consumer.close());
    this._producer = null;
    this._consumer = null;
    this.initialized = null;
  }

  public async connect(): Promise<Producer<string, string, string, string>> {
    if (this.initialized) {
      return this.initialized.then(() => this._producer!);
    }
    this.initialized = new Promise(async (resolve, reject) => {
      try {
        if (!this.producerOnlyMode) {
          this._consumer = this.createConsumer();
          this.registerConsumerEventListeners();

          // Set member assignments on join and rebalance
          this._consumer.on(
            'consumer:group:join',
            this.setConsumerAssignments.bind(this),
          );
          await this.registerResponseTopicHandlers();
        }

        this._producer = this.createProducer();
        this.registerProducerEventListeners();

        resolve();
      } catch (err) {
        reject(err);
      }
    });
    return this.initialized.then(() => this._producer!);
  }

  public async registerResponseTopicHandlers(): Promise<void> {
    if (!this._consumer) {
      throw Error('No consumer initialized');
    }

    const consumeOptions = this.options.consumeOptions || {};
    if (this.responsePatterns.length === 0) {
      return;
    }
    const stream = await this._consumer!.consume({
      autocommit: true,
      sessionTimeout: 10000,
      heartbeatInterval: 500,
      topics: this.responsePatterns,
      ...consumeOptions,
    });

    stream.on('data', message => this.handleMessage(message));
  }

  public async handleMessage(payload: Message<string, string, string, string>) {
    if (isUndefined(payload.headers.get(KafkaHeaders.CORRELATION_ID))) {
      return;
    }
    const { err, response, isDisposed, id } = this.deserialize(payload);

    const callback = this.routingMap.get(id);
    if (!callback) {
      return;
    }
    if (err || isDisposed) {
      return callback({
        err,
        response,
        isDisposed,
      });
    }
    callback({
      err,
      response,
    });
  }

  public getConsumerAssignments() {
    return this.consumerAssignments;
  }

  public emitBatch<TResult = any, TInput = any>(
    pattern: any,
    data: { messages: TInput[] },
  ): Observable<TResult> {
    if (isNil(pattern) || isNil(data)) {
      return throwError(() => new InvalidMessageException());
    }
    const source = defer(async () => this.connect()).pipe(
      mergeMap(() => this.dispatchBatchEvent({ pattern, data })),
    );
    const connectableSource = connectable(source, {
      connector: () => new Subject(),
      resetOnDisconnect: false,
    });
    connectableSource.connect();
    return connectableSource;
  }

  public unwrap<T>(): T {
    return [this._consumer, this._producer] as unknown as T;
  }

  public on<
    EventKey extends string | number | symbol = string | number | symbol,
    EventCallback = any,
  >(event: EventKey, callback: EventCallback) {
    throw new Error(
      'Method not supported, register events using the "consumer" and "producer" attributes',
    );
  }

  protected registerConsumerEventListeners() {
    if (!this._consumer) {
      return;
    }
    this._consumer.on(this.eventsMap.connected, () =>
      this._status$.next(PlatformaticKafkaStatus.CONNECTED),
    );
    this._consumer.on(this.eventsMap.disconnected, () =>
      this._status$.next(PlatformaticKafkaStatus.DISCONNECTED),
    );
    this._consumer.on(this.eventsMap.failed, () =>
      this._status$.next(PlatformaticKafkaStatus.FAILED),
    );
  }

  protected registerProducerEventListeners() {
    if (!this._producer) {
      return;
    }
    this._producer.on(this.eventsMap.connected, () =>
      this._status$.next(PlatformaticKafkaStatus.CONNECTED),
    );
    this._producer.on(this.eventsMap.disconnected, () =>
      this._status$.next(PlatformaticKafkaStatus.DISCONNECTED),
    );
    this._producer.on(this.eventsMap.failed, () =>
      this._status$.next(PlatformaticKafkaStatus.FAILED),
    );
  }

  protected async dispatchBatchEvent<TInput = any>(
    packets: ReadPacket<{ messages: TInput[] }>,
  ): Promise<any> {
    if (packets.data.messages.length === 0) {
      return;
    }
    const pattern = this.normalizePattern(packets.pattern);
    return this.producer.send({
      ...this.options.produceOptions,
      messages: packets.data.messages.map(message => ({
        topic: pattern,
        value: JSON.stringify(message),
      })),
    });
  }

  protected async dispatchEvent(packet: OutgoingEvent): Promise<any> {
    const pattern = this.normalizePattern(packet.pattern);
    return this._producer!.send({
      ...this.options.produceOptions,
      messages: [
        {
          topic: pattern,
          value: JSON.stringify(packet.data),
        },
      ],
    });
  }

  protected getReplyTopicPartition(topic: string): string {
    const minimumPartition = this.consumerAssignments[topic];
    if (typeof minimumPartition === 'undefined') {
      throw new InvalidKafkaClientTopicException(topic);
    }

    // Get the minimum partition
    return minimumPartition.toString();
  }

  protected publish(
    partialPacket: ReadPacket,
    callback: (packet: WritePacket) => any,
  ): () => void {
    const packet = this.assignPacketId(partialPacket);
    this.routingMap.set(packet.id, callback);

    const cleanup = () => this.routingMap.delete(packet.id);
    const errorCallback = (err: unknown) => {
      cleanup();
      callback({ err });
    };

    try {
      const pattern = this.normalizePattern(partialPacket.pattern);
      const replyTopic = this.getResponsePatternName(pattern);
      const replyPartition = this.getReplyTopicPartition(replyTopic);
      const headers = {
        [KafkaHeaders.CORRELATION_ID]: packet.id,
        [KafkaHeaders.REPLY_TOPIC]: replyTopic,
        [KafkaHeaders.REPLY_PARTITION]: replyPartition,
      };

      this._producer!
        .send({
          ...this.options.produceOptions,
          messages: [
            {
              topic: pattern,
              value: JSON.stringify(packet.data),
              headers,
            },
          ],
        })
        .catch(err => errorCallback(err));
      return cleanup;
    } catch (err) {
      errorCallback(err);
      return () => null;
    }
  }

  protected createConsumer() {
    return new Consumer({
      groupId: this.groupId,
      clientId: this.clientId,
      bootstrapBrokers: this.options.brokers,
      maxWaitTime: 1000,
      autocommit: 100,
      deserializers: stringDeserializers,
      ...this.options.consumer,
    });
  }

  protected createProducer() {
    return new Producer({
      bootstrapBrokers: this.options.brokers,
      clientId: this.clientId,
      serializers: stringSerializers,
      ...this.options.producer,
    });
  }

  protected getResponsePatternName(pattern: string): string {
    return `${pattern}.reply`;
  }

  protected setConsumerAssignments(data: {
    groupId: string;
    memberId: string;
    assignments: Array<{ topic: string; partitions: number[] }>;
    isLeader: boolean;
    generationId: number;
  }): void {
    const consumerAssignments: { [key: string]: number } = {};

    // Only need to set the minimum
    data.assignments.forEach(({ topic, partitions }) => {
      if (partitions.length) {
        consumerAssignments[topic] = Math.min(...partitions);
      }
    });

    this.consumerAssignments = consumerAssignments;
  }

  protected deserialize(
    message: Message<string, string, string, string>,
  ): IncomingResponse {
    const id = message.headers.get(KafkaHeaders.CORRELATION_ID).toString();
    if (!isUndefined(message.headers.get(KafkaHeaders.NEST_ERR))) {
      return {
        id,
        err: message.headers.get(KafkaHeaders.NEST_ERR),
        isDisposed: true,
      };
    }
    if (!isUndefined(message.headers.get(KafkaHeaders.NEST_IS_DISPOSED))) {
      return {
        id,
        response: message.value,
        isDisposed: true,
      };
    }
    return {
      id,
      response: message.value,
      isDisposed: false,
    };
  }
}

Context class (platformatic-kafka.context.ts)

import { BaseRpcContext } from '@nestjs/microservices/ctx-host/base-rpc.context';
import { Message } from '@platformatic/kafka';

type PlatformaticKafkaContextArgs = [
  message: Message<string, string, string, string>,
  partition: number,
  topic: string,
  headers: Message<string, string, string, string>['headers'],
];

export class PlatformaticKafkaContext extends BaseRpcContext<PlatformaticKafkaContextArgs> {
  constructor(args: PlatformaticKafkaContextArgs) {
    super(args);
  }

  /**
   * Returns the reference to the original message.
   */
  getMessage() {
    return this.args[0];
  }

  /**
   * Returns the partition.
   */
  getPartition() {
    return this.args[1];
  }

  /**
   * Returns the name of the topic.
   */
  getTopic() {
    return this.args[2];
  }

  /**
   * Returns the headers map.
   */
  getHeaders() {
    return this.args[3];
  }
}

docker-compose (if needed)

services:
  kafka-bare-metal:
    image: apache/kafka:3.9.0
    ports:
      - '9092:9092' # PLAIN TEXT
      - '9093:9093' # SSL
      - '9094:9094' # SASL
    volumes:
      - ./data/ssl:/var/ssl/private
      - ./data/jaas:/var/jaas
    environment:
      _JAVA_OPTIONS: '-XX:UseSVE=0'
      KAFKA_NODE_ID: 1
      KAFKA_LISTENERS: 'CONTROLLER://:29093,PLAINTEXT://:19092,MAIN://:9092'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka-bare-metal:19092,MAIN://localhost:9092'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,MAIN:PLAINTEXT'
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka-bare-metal:29093'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_SHARE_COORDINATOR_STATE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_SHARE_COORDINATOR_STATE_TOPIC_MIN_ISR: 1
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
      # Enable the following to be able to test ConsumerHeartbeat
      # See: https://cwiki.apache.org/confluence/display/KAFKA/The+Next+Generation+of+the+Consumer+Rebalance+Protocol+%28KIP-848%29+-+Early+Access+Release+Notes
      KAFKA_GROUP_COORDINATOR_REBALANCE_PROTOCOLS: 'classic,consumer'
      KAFKA_TRANSACTION_PARTITION_VERIFICATION_ENABLE: 'false'

Usage

Your main.ts file:

import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { PlatformaticKafkaStrategy } from './platformatic-kafka.strategy';

const app = await NestFactory.createMicroservice(AppModule, {
  strategy: new PlatformaticKafkaStrategy({
    brokers: ['localhost:9092'],
  }),
});
await app.listen();

Example event handler:

@EventPattern('math.sum')
sum(@Payload() data: number[], @Ctx() ctx: PlatformaticKafkaContext) {
  // ^^^-- NOTE: PlatformaticKafkaContext instead of KafkaContext
  console.log(data, ctx);
}

Otherwise, everything else remains the same.
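If you also need the client side from the snippet above, here is a minimal wiring sketch (the provider token, topic name, and module layout are just examples):

import { Module } from '@nestjs/common';
import { PlatformaticKafkaClient } from './platformatic-kafka.client';

@Module({
  providers: [
    {
      provide: 'KAFKA_CLIENT',
      useFactory: () => {
        const client = new PlatformaticKafkaClient({
          brokers: ['localhost:9092'],
        });
        // For request-response, reply topics must be registered before connect().
        client.subscribeToResponseOf('math.sum');
        return client;
      },
    },
  ],
  exports: ['KAFKA_CLIENT'],
})
export class KafkaClientModule {}

Inject it with @Inject('KAFKA_CLIENT'); client.emit(...) is safe for events, while client.send(...) is subject to the rebalancing caveat below.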

Warning

Note that @platformatic/kafka does not support setting a custom partition assigner, which means that your "message handlers" (those annotated with @MessagePattern) may lose responses every now and then (when Kafka consumer rebalancing occurs). This package is perfectly safe for those who only use Kafka for async communication (using @EventPattern), which is the primary use case for Kafka anyway.

@Forceres

Forceres commented May 19, 2025

For those who want to use @platformatic/kafka in their projects (even though it doesn't support setting a custom partition assigner yet), you can use the following code snippets (just copy & paste them into your project)...truncated

Why not just use node-rdkafka?

@kamilmysliwiec
Member

I don't mind implementing strategies for confluent's wrapper and rdkafka as well, if needed

@Forceres

I don't mind implementing strategies for confluent's wrapper and rdkafka as well, if needed

I mean that node-rdkafka is a much more popular solution than platformatic/kafka (better maintenance, more opportunities).

@Forceres

Confluent's wrapper is also very promising.

@kamilmysliwiec
Member

Confluent's wrapper appears to have some significant shortcomings at the moment:

Given these issues, it doesn't seem reliable enough to use as a foundation for a transport layer, at least for now. I'd rather recommend that everyone stick with kafkajs (or consider the platformatic/kafka strategy I've shown above).

@saimon-moore

@kamilmysliwiec I think a wrapper for the official Confluent client makes sense at this stage, even if it's just for the brave to try it out and start discovering the kinks.

@kamilmysliwiec
Member

@saimon-moore

@kamilmysliwiec Yeah, I did see that comment and took a look at the issues you linked. I agree with your reading that it's better to stick with kafkajs for production (or try @platformatic/kafka), and while those issues would be a problem for a robust production-ready solution, I wouldn't mind exploring how the Confluent wrapper performs with our preview traffic (which, given more time, I would trust to be the better long-term bet), basically because I'm not really using the event hooks and would love to find issues early...

But I understand your sentiment...

@Forceres

@kamilmysliwiec yeah I did see that comment and took a look at the issues you linked. […]

node-rdkafka might be used (a Nest wrapper would be needed)
