Skip to content

Commit

Permalink
Add method for fetching current offsets
Browse files Browse the repository at this point in the history
  • Loading branch information
jferris committed Feb 8, 2019
1 parent c7b25a5 commit 0de8af4
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 0 deletions.
9 changes: 9 additions & 0 deletions src/main/scala/fable/Consumer.scala
Expand Up @@ -105,6 +105,15 @@ class Consumer[F[_]: ContextShift: Monad: Sync, K, V] private[fable] (
.traverse(topic => logger.info(s"Subscribed to ${topic.name}"))
.void

/**
* Fetch the current offset for this consumer for the given topic and
* partition.
*
* @see [[https://kafka.apache.org/21/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#position-org.apache.kafka.common.TopicPartition- KafkaConsumer.position]]
*/
def position(partition: Partition): F[Long] =
eval(_.position(new TopicPartition(partition.topic.name, partition.number)))

/**
* Fetch information about partitions for a specific topic.
*
Expand Down
19 changes: 19 additions & 0 deletions src/test/scala/fable/ConsumerSpec.scala
Expand Up @@ -146,6 +146,25 @@ class ConsumerSpec extends AsyncFunSuite {
}).unsafeToFuture
}

test("position") {
val topic = Topic("fable-test")
val consumer = Consumer.resource[IO, String, String](config)

(for {
_ <- createTopic(topic.name)
position <- consumer.use { instance =>
for {
_ <- instance.subscribe(topic)
_ <- instance.poll
partitions <- instance.assignment
result <- instance.position(partitions.head)
} yield result
}
} yield {
assert(position >= 0)
}).unsafeToFuture
}

private def createTopic(topic: String): IO[Unit] =
IO.delay {
val properties = new Properties
Expand Down

0 comments on commit 0de8af4

Please sign in to comment.