From 55a266b5a14749523cf0918b3069651a1ac51f01 Mon Sep 17 00:00:00 2001 From: Emre Sahin Date: Thu, 4 Jul 2024 11:45:05 +0200 Subject: [PATCH] docs(microservices/kafka): Update commit offsets to use KafkaContext --- content/microservices/kafka.md | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/content/microservices/kafka.md b/content/microservices/kafka.md index 9e136ce165..fbeb3dd57a 100644 --- a/content/microservices/kafka.md +++ b/content/microservices/kafka.md @@ -435,7 +435,7 @@ throw new KafkaRetriableException('...'); #### Commit offsets -Committing offsets is essential when working with Kafka. Per default, messages will be automatically committed after a specific time. For more information visit [KafkaJS docs](https://kafka.js.org/docs/consuming#autocommit). `ClientKafka` offers a way to manually commit offsets that work like the [native KafkaJS implementation](https://kafka.js.org/docs/consuming#manual-committing). +Committing offsets is essential when working with Kafka. Per default, messages will be automatically committed after a specific time. For more information visit [KafkaJS docs](https://kafka.js.org/docs/consuming#autocommit). `KafkaContext` offers a way to access the active consumer for manually committing offsets. The consumer is the KafkaJS consumer and works as the [native KafkaJS implementation](https://kafka.js.org/docs/consuming#manual-committing). ```typescript @@filename() @@ -446,7 +446,8 @@ async handleUserCreated(@Payload() data: IncomingMessage, @Ctx() context: KafkaC const { offset } = context.getMessage(); const partition = context.getPartition(); const topic = context.getTopic(); - await this.client.commitOffsets([{ topic, partition, offset }]) + const consumer = context.getConsumer(); + await consumer.commitOffsets([{ topic, partition, offset }]) } @@switch @Bind(Payload(), Ctx()) @@ -457,7 +458,8 @@ async handleUserCreated(data, context) { const { offset } = context.getMessage(); const partition = context.getPartition(); const topic = context.getTopic(); - await this.client.commitOffsets([{ topic, partition, offset }]) + const consumer = context.getConsumer(); + await consumer.commitOffsets([{ topic, partition, offset }]) } ```