From 77c9bc675e11de03d489d54b72fedc3436a620f2 Mon Sep 17 00:00:00 2001 From: rmathew1992 Date: Fri, 8 Mar 2019 08:51:25 -0800 Subject: [PATCH] Use Mock Consumer (#2) We have begun to use MockConsumer in a future PR we will incorporate it into all of our tests but wanted to get some feedback on this approach. Co-Authored-By: Derrick Carr --- src/main/scala/fable/Consumer.scala | 5 +- src/test/scala/fable/ConsumerSpec.scala | 62 ++++++++++++++++++++----- 2 files changed, 53 insertions(+), 14 deletions(-) diff --git a/src/main/scala/fable/Consumer.scala b/src/main/scala/fable/Consumer.scala index cd1a968..95d1bde 100644 --- a/src/main/scala/fable/Consumer.scala +++ b/src/main/scala/fable/Consumer.scala @@ -5,6 +5,7 @@ import cats.implicits._ import cats.Monad import fs2.Stream import io.chrisdavenport.log4cats.{slf4j, Logger} +import org.apache.kafka import org.apache.kafka.clients.consumer.KafkaConsumer import org.apache.kafka.common.TopicPartition import scala.collection.JavaConverters._ @@ -46,7 +47,7 @@ import scala.collection.JavaConverters._ */ class Consumer[F[_]: ContextShift: Monad: Sync, K, V] private[fable] ( config: Config.Consumer, - kafkaConsumer: KafkaConsumer[K, V]) { + kafkaConsumer: kafka.clients.consumer.Consumer[K, V]) { /** * Continuously [[poll]] Kafka for new records. @@ -205,7 +206,7 @@ class Consumer[F[_]: ContextShift: Monad: Sync, K, V] private[fable] ( * * @see [[https://kafka.apache.org/21/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html]] for available methods */ - def eval[A](f: KafkaConsumer[K, V] => A): F[A] = + def eval[A](f: kafka.clients.consumer.Consumer[K, V] => A): F[A] = ContextShift[F].evalOn(executionContext)(Sync[F].delay(f(kafkaConsumer))) private val executionContext = diff --git a/src/test/scala/fable/ConsumerSpec.scala b/src/test/scala/fable/ConsumerSpec.scala index d60df03..0e988f1 100644 --- a/src/test/scala/fable/ConsumerSpec.scala +++ b/src/test/scala/fable/ConsumerSpec.scala @@ -10,30 +10,34 @@ import org.apache.kafka.clients.producer.{ ProducerConfig, ProducerRecord } +import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.serialization.StringSerializer import org.scalatest.AsyncFunSuite +import org.apache.kafka.clients.consumer.{MockConsumer, OffsetResetStrategy} import scala.concurrent.ExecutionContext +import scala.collection.mutable.ArrayBuffer +import scala.collection.JavaConverters._ class ConsumerSpec extends AsyncFunSuite { implicit val contextShift = IO.contextShift(implicitly[ExecutionContext]) test("poll") { val topic = Topic("fable-test-example") - val consumer = Consumer.resource[IO, String, String](config) + val offset = 1.toLong + val topicPartition: TopicPartition = new TopicPartition(topic.name, 1) + val mockConsumer = createMockConsumer(offset, topicPartition) + val record = createRecordMock(mockConsumer, + topic, + topicPartition, + offset, + ("one" -> "1")) + val consumer = new Consumer[IO, String, String](config, mockConsumer) (for { - _ <- createTopic(topic.name) - _ <- sendRecords(topic.name, "one" -> "1", "two" -> "2") - records <- consumer.use { instance => - for { - _ <- instance.subscribe(topic) - records <- instance.poll - } yield records.toSeq - } + records <- consumer.poll } yield { - assert( - records.map(record => (record.key, record.value)) === Seq("one" -> "1", - "two" -> "2")) + assert(records.toSeq.head.key === "one") + assert(records.toSeq.head.value === "1") }).unsafeToFuture } @@ -253,5 +257,39 @@ class ConsumerSpec extends AsyncFunSuite { producer.close } + private def createMockConsumer(offset: Long, topicPartition: TopicPartition): MockConsumer[String, String] = { + val mockConsumer = + new MockConsumer[String, String](OffsetResetStrategy.EARLIEST) + val partitionMap = + Map(topicPartition -> offset.asInstanceOf[java.lang.Long]).asJava + + mockConsumer.subscribe(Seq(topicPartition.topic).asJava) + mockConsumer.rebalance(ArrayBuffer(topicPartition).asJava) + mockConsumer.updateBeginningOffsets(partitionMap) + mockConsumer.updateEndOffsets(partitionMap) + + mockConsumer + } + + private def createRecordMock(mockConsumer: MockConsumer[String, String], + topic: Topic, + topicPartition: TopicPartition, + offset: Long, + records: (String, String)*) = { + for { + record <- records + } yield { + mockConsumer.addRecord( + new ConsumerRecord( + topic.name, + topicPartition.partition, + offset, + record._1, + record._2 + ) + ) + } + } + val config = TestConfig.consumer }