Skip to content

Commit

Permalink
Use Mock Consumer (#2)
Browse files Browse the repository at this point in the history
We have begun to use MockConsumer in a future PR we will incorporate it into all of our tests but wanted to get some feedback on this approach.

Co-Authored-By: Derrick Carr <derrick@thoughtbot.com>
  • Loading branch information
newuser1992 and Derrick Carr committed Mar 8, 2019
1 parent a31fb86 commit 77c9bc6
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 14 deletions.
5 changes: 3 additions & 2 deletions src/main/scala/fable/Consumer.scala
Expand Up @@ -5,6 +5,7 @@ import cats.implicits._
import cats.Monad
import fs2.Stream
import io.chrisdavenport.log4cats.{slf4j, Logger}
import org.apache.kafka
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import scala.collection.JavaConverters._
Expand Down Expand Up @@ -46,7 +47,7 @@ import scala.collection.JavaConverters._
*/
class Consumer[F[_]: ContextShift: Monad: Sync, K, V] private[fable] (
config: Config.Consumer,
kafkaConsumer: KafkaConsumer[K, V]) {
kafkaConsumer: kafka.clients.consumer.Consumer[K, V]) {

/**
* Continuously [[poll]] Kafka for new records.
Expand Down Expand Up @@ -205,7 +206,7 @@ class Consumer[F[_]: ContextShift: Monad: Sync, K, V] private[fable] (
*
* @see [[https://kafka.apache.org/21/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html]] for available methods
*/
def eval[A](f: KafkaConsumer[K, V] => A): F[A] =
def eval[A](f: kafka.clients.consumer.Consumer[K, V] => A): F[A] =
ContextShift[F].evalOn(executionContext)(Sync[F].delay(f(kafkaConsumer)))

private val executionContext =
Expand Down
62 changes: 50 additions & 12 deletions src/test/scala/fable/ConsumerSpec.scala
Expand Up @@ -10,30 +10,34 @@ import org.apache.kafka.clients.producer.{
ProducerConfig,
ProducerRecord
}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringSerializer
import org.scalatest.AsyncFunSuite
import org.apache.kafka.clients.consumer.{MockConsumer, OffsetResetStrategy}
import scala.concurrent.ExecutionContext
import scala.collection.mutable.ArrayBuffer
import scala.collection.JavaConverters._

class ConsumerSpec extends AsyncFunSuite {
implicit val contextShift = IO.contextShift(implicitly[ExecutionContext])

test("poll") {
val topic = Topic("fable-test-example")
val consumer = Consumer.resource[IO, String, String](config)
val offset = 1.toLong
val topicPartition: TopicPartition = new TopicPartition(topic.name, 1)
val mockConsumer = createMockConsumer(offset, topicPartition)
val record = createRecordMock(mockConsumer,
topic,
topicPartition,
offset,
("one" -> "1"))
val consumer = new Consumer[IO, String, String](config, mockConsumer)

(for {
_ <- createTopic(topic.name)
_ <- sendRecords(topic.name, "one" -> "1", "two" -> "2")
records <- consumer.use { instance =>
for {
_ <- instance.subscribe(topic)
records <- instance.poll
} yield records.toSeq
}
records <- consumer.poll
} yield {
assert(
records.map(record => (record.key, record.value)) === Seq("one" -> "1",
"two" -> "2"))
assert(records.toSeq.head.key === "one")
assert(records.toSeq.head.value === "1")
}).unsafeToFuture
}

Expand Down Expand Up @@ -253,5 +257,39 @@ class ConsumerSpec extends AsyncFunSuite {
producer.close
}

private def createMockConsumer(offset: Long, topicPartition: TopicPartition): MockConsumer[String, String] = {
val mockConsumer =
new MockConsumer[String, String](OffsetResetStrategy.EARLIEST)
val partitionMap =
Map(topicPartition -> offset.asInstanceOf[java.lang.Long]).asJava

mockConsumer.subscribe(Seq(topicPartition.topic).asJava)
mockConsumer.rebalance(ArrayBuffer(topicPartition).asJava)
mockConsumer.updateBeginningOffsets(partitionMap)
mockConsumer.updateEndOffsets(partitionMap)

mockConsumer
}

private def createRecordMock(mockConsumer: MockConsumer[String, String],
topic: Topic,
topicPartition: TopicPartition,
offset: Long,
records: (String, String)*) = {
for {
record <- records
} yield {
mockConsumer.addRecord(
new ConsumerRecord(
topic.name,
topicPartition.partition,
offset,
record._1,
record._2
)
)
}
}

val config = TestConfig.consumer
}

0 comments on commit 77c9bc6

Please sign in to comment.