Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions content/microservices/kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ throw new KafkaRetriableException('...');

#### Commit offsets

Committing offsets is essential when working with Kafka. Per default, messages will be automatically committed after a specific time. For more information visit [KafkaJS docs](https://kafka.js.org/docs/consuming#autocommit). `ClientKafka` offers a way to manually commit offsets that work like the [native KafkaJS implementation](https://kafka.js.org/docs/consuming#manual-committing).
Committing offsets is essential when working with Kafka. Per default, messages will be automatically committed after a specific time. For more information visit [KafkaJS docs](https://kafka.js.org/docs/consuming#autocommit). `KafkaContext` offers a way to access the active consumer for manually committing offsets. The consumer is the KafkaJS consumer and works as the [native KafkaJS implementation](https://kafka.js.org/docs/consuming#manual-committing).

```typescript
@@filename()
Expand All @@ -446,7 +446,8 @@ async handleUserCreated(@Payload() data: IncomingMessage, @Ctx() context: KafkaC
const { offset } = context.getMessage();
const partition = context.getPartition();
const topic = context.getTopic();
await this.client.commitOffsets([{ topic, partition, offset }])
const consumer = context.getConsumer();
await consumer.commitOffsets([{ topic, partition, offset }])
}
@@switch
@Bind(Payload(), Ctx())
Expand All @@ -457,7 +458,8 @@ async handleUserCreated(data, context) {
const { offset } = context.getMessage();
const partition = context.getPartition();
const topic = context.getTopic();
await this.client.commitOffsets([{ topic, partition, offset }])
const consumer = context.getConsumer();
await consumer.commitOffsets([{ topic, partition, offset }])
}
```

Expand Down