Skip to content

Commit

Permalink
Add Consumer.seek
Browse files Browse the repository at this point in the history
This is used to resume polling from an externally committed offset.
  • Loading branch information
jferris committed Mar 28, 2019
1 parent 00bcf92 commit 7961254
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 10 deletions.
10 changes: 10 additions & 0 deletions src/main/scala/fable/Consumer.scala
Expand Up @@ -195,6 +195,16 @@ class Consumer[F[_]: ContextShift: Monad: Sync, K, V] private[fable] (
}.toMap
}

/**
* Overrides the fetch offsets that the consumer will use on the next poll.
*
* @see [[https://kafka.apache.org/21/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#seek-org.apache.kafka.common.TopicPartition-long-]]
*/
def seek(partition: Partition, offset: Long): F[Unit] =
eval(
_.seek(new TopicPartition(partition.topic.name, partition.number),
offset))

/**
* Perform an operation using the underlying KafkaConsumer and return the
* result suspended in F.
Expand Down
42 changes: 32 additions & 10 deletions src/test/scala/fable/ConsumerSpec.scala
Expand Up @@ -29,8 +29,7 @@ class ConsumerSpec extends AsyncFunSuite {
val record = createRecordMock(mockConsumer,
topic,
topicPartition,
offset,
("one" -> "1"))
(offset, "one", "1"))
val consumer = new Consumer[IO, String, String](config, mockConsumer)

(for {
Expand Down Expand Up @@ -210,6 +209,30 @@ class ConsumerSpec extends AsyncFunSuite {
}).unsafeToFuture
}

test("seek") {
val topic = Topic("fable-test-example")
val firstOffset = 1.toLong
val secondOffset = 2.toLong
val partition = 1
val topicPartition: TopicPartition =
new TopicPartition(topic.name, partition)
val mockConsumer = createMockConsumer(firstOffset, topicPartition)
createRecordMock(mockConsumer,
topic,
topicPartition,
(firstOffset, "1", "First"),
(secondOffset, "2", "Second"))
val consumer = new Consumer[IO, String, String](config, mockConsumer)

(for {
_ <- consumer.subscribe(topic)
_ <- consumer.seek(Partition(topic, partition), secondOffset)
records <- consumer.poll
} yield {
assert(records.toSeq.map(_.value) === List("Second"))
}).unsafeToFuture
}

private def createTopic(topic: String): IO[Unit] =
IO.delay {
val properties = new Properties
Expand Down Expand Up @@ -257,7 +280,9 @@ class ConsumerSpec extends AsyncFunSuite {
producer.close
}

private def createMockConsumer(offset: Long, topicPartition: TopicPartition): MockConsumer[String, String] = {
private def createMockConsumer(
offset: Long,
topicPartition: TopicPartition): MockConsumer[String, String] = {
val mockConsumer =
new MockConsumer[String, String](OffsetResetStrategy.EARLIEST)
val partitionMap =
Expand All @@ -274,18 +299,15 @@ class ConsumerSpec extends AsyncFunSuite {
private def createRecordMock(mockConsumer: MockConsumer[String, String],
topic: Topic,
topicPartition: TopicPartition,
offset: Long,
records: (String, String)*) = {
for {
record <- records
} yield {
records: (Long, String, String)*): Unit = {
for (record <- records) {
mockConsumer.addRecord(
new ConsumerRecord(
topic.name,
topicPartition.partition,
offset,
record._1,
record._2
record._2,
record._3
)
)
}
Expand Down

0 comments on commit 7961254

Please sign in to comment.