Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Documents usage of transactional producer #440

Closed
wants to merge 5 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 50 additions & 1 deletion docs/gettingStarted.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ Consumer.subscribeAnd(Subscription.topics("topic150"))

## Example: consuming, producing and committing offset

This example shows how to consume messages from topic `topic_a` and produce transformed messages to `topic_b`, after which consumer offsets are committed. Processing is done in chunks using `ZStreamChunk` for more efficiency.
This example shows how to consume messages from topic `my-input-topic` and produce transformed messages to `my-output-topic`, after which consumer offsets are committed. Processing is done in chunks using `ZStreamChunk` for more efficiency.

```scala
import zio.ZLayer
Expand Down Expand Up @@ -147,6 +147,55 @@ val consumeProduceStream = Consumer
.provideSomeLayer(consumerAndProducer)
```

## Example: consuming, producing and committing offset transactionally

This example does the same as the previous, except that messages are produced in a transaction. A transactional message is committed by committing the corresponding consumer offset. Consumers that use the isolation strategy "read-committed", will only read committed messages.

Messages are consumed from topic `my-input-topic` and produced to `my-output-topic`. Also here processing is done in chunks for more efficiency.

```scala
import zio.Chunk
import zio.ZLayer
import zio.kafka.consumer._
import zio.kafka.producer._
import zio.kafka.serde._
import org.apache.kafka.clients.producer.ProducerRecord

val consumerSettings: ConsumerSettings =
ConsumerSettings(List("localhost:9092"))
.withGroupId("group")
// Prevent commit problems after a re-balance (see https://github.com/zio/zio-kafka/pull/427)
.withRestartStreamOnRebalancing(true)
val producerSettings: TransactionalProducerSettings =
TransactionalProducerSettings(List("localhost:9092"), "transaction-1")

val consumerAndProducer =
ZLayer.fromManaged(Consumer.make(consumerSettings)) ++
ZLayer.fromManaged(TransactionalProducer.make(producerSettings))

val consumeProduceStream = Consumer
.subscribeAnd(Subscription.topics("my-input-topic"))
.plainStream(Serde.int, Serde.long)
.map { record =>
val key: Int = record.record.key()
val value: Long = record.record.value()
val newValue: String = value.toString

val producerRecord: ProducerRecord[Int, String] = new ProducerRecord("my-output-topic", key, newValue)
(producerRecord, record.offset)
}
.mapChunksZIO { chunk =>
val records = chunk.map(_._1)
val offsetBatch = OffsetBatch(chunk.map(_._2).toSeq)

TransactionalProducer.createTransaction.use { t =>
t.produceChunkBatch[Any, Int, String](records, Serde.int, Serde.string, offsetBatch) *>
offsetBatch.commit.as(Chunk(()))
}
.runDrain
.provideSomeLayer(consumerAndProducer)
```

## Partition assignment and offset retrieval

`zio-kafka` offers several ways to control which Kafka topics and partitions are assigned to your application.
Expand Down