Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable the consumer to commit an offset with metadata #1067

Merged
merged 7 commits into the base branch from the contributor's branch on
Oct 27, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number | Diff line number | Diff line change
Expand Up @@ -416,6 +416,29 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
.provideSomeLayer[Kafka](consumer(client, Some(group)))
} yield assert(offsets.values.map(_.map(_.offset)))(forall(isSome(equalTo(nrMessages.toLong / nrPartitions))))
},
test("commits an offset with metadata") {
for {
topic <- randomTopic
group <- randomGroup
metadata <- randomThing("metadata")
client <- randomClient
_ <- ZIO.attempt(EmbeddedKafka.createCustomTopic(topic, partitions = 1))
_ <- produceOne(topic, "key", "msg")

// Consume messages
subscription = Subscription.topics(topic)
offsets <- (Consumer
.partitionedStream(subscription, Serde.string, Serde.string)
.flatMap(_._2.map(_.offset.withMetadata(metadata)))
.take(1)
.transduce(Consumer.offsetBatches)
.take(1)
.mapZIO(_.commit)
.runDrain *>
erikvanoosten marked this conversation as resolved.
Show resolved Hide resolved
Consumer.committed(Set(new TopicPartition(topic, 0))))
.provideSomeLayer[Kafka](consumer(client, Some(group)))
erikvanoosten marked this conversation as resolved.
Show resolved Hide resolved
} yield assert(offsets.values.headOption.flatten.map(_.metadata))(isSome(equalTo(metadata)))
},
test("handle rebalancing by completing topic-partition streams") {
val nrMessages = 50
val nrPartitions = 6 // Must be even and strictly positive
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package zio.kafka.consumer

import org.apache.kafka.clients.consumer.{ ConsumerGroupMetadata, ConsumerRecord }
import org.apache.kafka.clients.consumer.{ ConsumerGroupMetadata, ConsumerRecord, OffsetAndMetadata }
import org.apache.kafka.common.TopicPartition
import zio.kafka.serde.Deserializer
import zio.{ RIO, Task }

final case class CommittableRecord[K, V](
record: ConsumerRecord[K, V],
private val commitHandle: Map[TopicPartition, Long] => Task[Unit],
private val commitHandle: Map[TopicPartition, OffsetAndMetadata] => Task[Unit],
private val consumerGroupMetadata: Option[ConsumerGroupMetadata]
) {
def deserializeWith[R, K1, V1](
Expand Down Expand Up @@ -44,14 +44,15 @@ final case class CommittableRecord[K, V](
partition = record.partition(),
offset = record.offset(),
commitHandle = commitHandle,
consumerGroupMetadata = consumerGroupMetadata
consumerGroupMetadata = consumerGroupMetadata,
metadata = None
)
}

object CommittableRecord {
def apply[K, V](
record: ConsumerRecord[K, V],
commitHandle: Map[TopicPartition, Long] => Task[Unit],
commitHandle: Map[TopicPartition, OffsetAndMetadata] => Task[Unit],
consumerGroupMetadata: Option[ConsumerGroupMetadata]
): CommittableRecord[K, V] =
new CommittableRecord(
Expand Down
21 changes: 16 additions & 5 deletions zio-kafka/src/main/scala/zio/kafka/consumer/Offset.scala
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
package zio.kafka.consumer

import org.apache.kafka.clients.consumer.{ ConsumerGroupMetadata, RetriableCommitFailedException }
import org.apache.kafka.clients.consumer.{ ConsumerGroupMetadata, OffsetAndMetadata, RetriableCommitFailedException }
import org.apache.kafka.common.TopicPartition
import zio.{ RIO, Schedule, Task }

sealed trait Offset {

def topic: String
def partition: Int
def offset: Long
def commit: Task[Unit]
def batch: OffsetBatch
def consumerGroupMetadata: Option[ConsumerGroupMetadata]
def withMetadata(metadata: String): Offset
flavienbert marked this conversation as resolved.
Show resolved Hide resolved

private[consumer] def metadata: Option[String]
private[consumer] def asJavaOffsetAndMetadata: OffsetAndMetadata = new OffsetAndMetadata(offset, metadata.orNull)

/**
* Attempts to commit and retries according to the given policy when the commit fails with a
Expand Down Expand Up @@ -39,9 +44,15 @@ private final case class OffsetImpl(
topic: String,
partition: Int,
offset: Long,
commitHandle: Map[TopicPartition, Long] => Task[Unit],
consumerGroupMetadata: Option[ConsumerGroupMetadata]
commitHandle: Map[TopicPartition, OffsetAndMetadata] => Task[Unit],
consumerGroupMetadata: Option[ConsumerGroupMetadata],
metadata: Option[String] = None
) extends Offset {
def commit: Task[Unit] = commitHandle(Map(topicPartition -> offset))
def batch: OffsetBatch = OffsetBatchImpl(Map(topicPartition -> offset), commitHandle, consumerGroupMetadata)
def commit: Task[Unit] = commitHandle(Map(topicPartition -> asJavaOffsetAndMetadata))
def batch: OffsetBatch = OffsetBatchImpl(
Map(topicPartition -> asJavaOffsetAndMetadata),
commitHandle,
consumerGroupMetadata
)
def withMetadata(metadata: String): OffsetImpl = copy(metadata = Some(metadata))
}
31 changes: 19 additions & 12 deletions zio-kafka/src/main/scala/zio/kafka/consumer/OffsetBatch.scala
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package zio.kafka.consumer

import org.apache.kafka.clients.consumer.ConsumerGroupMetadata
import org.apache.kafka.clients.consumer.{ ConsumerGroupMetadata, OffsetAndMetadata }
import org.apache.kafka.common.TopicPartition
import zio.{ RIO, Schedule, Task, ZIO }

sealed trait OffsetBatch {
def offsets: Map[TopicPartition, Long]
def offsets: Map[TopicPartition, OffsetAndMetadata]
def commit: Task[Unit]
def add(offset: Offset): OffsetBatch
@deprecated("Use add(Offset) instead", "2.1.4")
Expand All @@ -28,35 +28,42 @@ object OffsetBatch {
}

private final case class OffsetBatchImpl(
offsets: Map[TopicPartition, Long],
commitHandle: Map[TopicPartition, Long] => Task[Unit],
offsets: Map[TopicPartition, OffsetAndMetadata],
commitHandle: Map[TopicPartition, OffsetAndMetadata] => Task[Unit],
consumerGroupMetadata: Option[ConsumerGroupMetadata]
) extends OffsetBatch {
override def commit: Task[Unit] = commitHandle(offsets)

override def add(offset: Offset): OffsetBatch =
override def add(offset: Offset): OffsetBatch = {
val maxOffsetAndMetadata = offsets.get(offset.topicPartition) match {
case Some(existing) if existing.offset > offset.offset => existing
case _ => offset.asJavaOffsetAndMetadata
}

copy(
offsets = offsets + (offset.topicPartition -> (offsets
.getOrElse(offset.topicPartition, -1L) max offset.offset))
offsets = offsets + (offset.topicPartition -> maxOffsetAndMetadata)
)
}

override def merge(offset: Offset): OffsetBatch = add(offset)

override def merge(otherOffsets: OffsetBatch): OffsetBatch = {
val newOffsets = Map.newBuilder[TopicPartition, Long]
val newOffsets = Map.newBuilder[TopicPartition, OffsetAndMetadata]
newOffsets ++= offsets
otherOffsets.offsets.foreach { case (tp, offset) =>
val existing = offsets.getOrElse(tp, -1L)
if (existing < offset)
newOffsets += tp -> offset
val laterOffset = offsets.get(tp) match {
case Some(existing) => if (existing.offset < offset.offset) offset else existing
case None => offset
erikvanoosten marked this conversation as resolved.
Show resolved Hide resolved
}
newOffsets += tp -> laterOffset
}

copy(offsets = newOffsets.result())
}
}

case object EmptyOffsetBatch extends OffsetBatch {
override val offsets: Map[TopicPartition, Long] = Map.empty
override val offsets: Map[TopicPartition, OffsetAndMetadata] = Map.empty
override val commit: Task[Unit] = ZIO.unit
override def add(offset: Offset): OffsetBatch = offset.batch
override def merge(offset: Offset): OffsetBatch = add(offset)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ object DiagnosticEvent {

sealed trait Commit extends DiagnosticEvent
object Commit {
final case class Started(offsets: Map[TopicPartition, Long]) extends Commit
final case class Started(offsets: Map[TopicPartition, OffsetAndMetadata]) extends Commit
final case class Success(offsets: Map[TopicPartition, OffsetAndMetadata]) extends Commit
final case class Failure(offsets: Map[TopicPartition, OffsetAndMetadata], cause: Throwable) extends Commit
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ private[consumer] final class Runloop private (
}

/** This is the implementation behind the user facing api `Offset.commit`. */
private val commit: Map[TopicPartition, Long] => Task[Unit] =
private val commit: Map[TopicPartition, OffsetAndMetadata] => Task[Unit] =
offsets =>
for {
p <- Promise.make[Throwable, Unit]
Expand All @@ -127,16 +127,21 @@ private[consumer] final class Runloop private (
commits: Chunk[RunloopCommand.Commit]
): (JavaMap[TopicPartition, OffsetAndMetadata], OffsetCommitCallback, Throwable => UIO[Unit]) = {
val offsets = commits
.foldLeft(mutable.Map.empty[TopicPartition, Long]) { case (acc, commit) =>
.foldLeft(mutable.Map.empty[TopicPartition, OffsetAndMetadata]) { case (acc, commit) =>
commit.offsets.foreach { case (tp, offset) =>
acc += (tp -> acc.get(tp).map(_ max offset).getOrElse(offset))
acc += (tp -> acc
.get(tp)
.map(current => if (current.offset() > offset.offset()) current else offset)
.getOrElse(offset))
erikvanoosten marked this conversation as resolved.
Show resolved Hide resolved
}
acc
}
.toMap
val offsetsWithMetaData = offsets.map { case (tp, offset) => tp -> new OffsetAndMetadata(offset + 1) }
val cont = (e: Exit[Throwable, Unit]) => ZIO.foreachDiscard(commits)(_.cont.done(e))
val onSuccess = cont(Exit.unit) <* diagnostics.emit(DiagnosticEvent.Commit.Success(offsetsWithMetaData))
val offsetsWithMetaData = offsets.map { case (tp, offset) =>
tp -> new OffsetAndMetadata(offset.offset + 1, offset.leaderEpoch, offset.metadata)
}
val cont = (e: Exit[Throwable, Unit]) => ZIO.foreachDiscard(commits)(_.cont.done(e))
val onSuccess = cont(Exit.unit) <* diagnostics.emit(DiagnosticEvent.Commit.Success(offsetsWithMetaData))
val onFailure: Throwable => UIO[Unit] = {
case _: RebalanceInProgressException =>
for {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package zio.kafka.consumer.internal

import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition
import zio._
import zio.kafka.consumer.{ InvalidSubscriptionUnion, Subscription }
Expand All @@ -19,7 +20,8 @@ object RunloopCommand {
case object StopRunloop extends Control
case object StopAllStreams extends StreamCommand

final case class Commit(offsets: Map[TopicPartition, Long], cont: Promise[Throwable, Unit]) extends RunloopCommand {
final case class Commit(offsets: Map[TopicPartition, OffsetAndMetadata], cont: Promise[Throwable, Unit])
extends RunloopCommand {
@inline def isDone: UIO[Boolean] = cont.isDone
@inline def isPending: UIO[Boolean] = isDone.negate
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ object TransactionalProducer {
case Some(consumerGroupMetadata) =>
val offsets: util.Map[TopicPartition, OffsetAndMetadata] =
offsetBatch.offsets.map { case (topicPartition, offset) =>
topicPartition -> new OffsetAndMetadata(offset + 1)
topicPartition -> new OffsetAndMetadata(offset.offset + 1, offset.metadata)
erikvanoosten marked this conversation as resolved.
Show resolved Hide resolved
}.asJava

ZIO.attemptBlocking(live.p.sendOffsetsToTransaction(offsets, consumerGroupMetadata))
Expand Down
Loading