Skip to content

Commit

Permalink
Add method to fetch endOffsets for consumers
Browse files Browse the repository at this point in the history
  • Loading branch information
jferris committed Feb 8, 2019
1 parent 51a1f9b commit 95b4754
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 0 deletions.
20 changes: 20 additions & 0 deletions src/main/scala/fable/Consumer.scala
Expand Up @@ -155,6 +155,26 @@ class Consumer[F[_]: ContextShift: Monad: Sync, K, V] private[fable] (
.toList
}

/**
* Query Kafka for the end offsets for the given partitions.
*
* @see [[https://kafka.apache.org/21/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#endOffsets-java.util.Collection- KafkaConsumer.endOffsets]]
*/
def endOffsets(partitions: Seq[Partition]): F[Map[Partition, Long]] =
for {
offsets <- eval(
_.endOffsets(
partitions
.map(partition =>
new TopicPartition(partition.topic.name, partition.number))
.asJava))
} yield {
offsets.asScala.map {
case (partition, offset) =>
(Partition(Topic(partition.topic), partition.partition), offset: Long)
}.toMap
}

/**
* Perform an operation using the underlying KafkaConsumer and return the
* result suspended in F.
Expand Down
19 changes: 19 additions & 0 deletions src/test/scala/fable/ConsumerSpec.scala
Expand Up @@ -165,6 +165,25 @@ class ConsumerSpec extends AsyncFunSuite {
}).unsafeToFuture
}

test("endOffsets") {
val topic = Topic("fable-test")
val consumer = Consumer.resource[IO, String, String](config)

(for {
_ <- createTopic(topic.name)
offsets <- consumer.use { instance =>
for {
_ <- instance.subscribe(topic)
_ <- instance.poll
partitions <- instance.assignment
result <- instance.endOffsets(partitions)
} yield result
}
} yield {
assert(offsets.keys.toList === List(Partition(topic, 0)))
}).unsafeToFuture
}

private def createTopic(topic: String): IO[Unit] =
IO.delay {
val properties = new Properties
Expand Down

0 comments on commit 95b4754

Please sign in to comment.