From c19c29f673e08f9911c31180ef6e99c10c253c39 Mon Sep 17 00:00:00 2001 From: Benjamin Sproule Date: Thu, 9 Mar 2017 09:14:52 +0000 Subject: [PATCH 1/3] Updated to allow passing in maximum number of attempts and poll timeout --- build.sbt | 17 +++--- .../embeddedkafka/ConsumerExtensions.scala | 26 +++++---- .../manub/embeddedkafka/ConsumerOpsTest.scala | 53 +++++++++++++++++++ 3 files changed, 77 insertions(+), 19 deletions(-) create mode 100644 embedded-kafka/src/test/java/net/manub/embeddedkafka/ConsumerOpsTest.scala diff --git a/build.sbt b/build.sbt index bd9bb98..3fc5285 100644 --- a/build.sbt +++ b/build.sbt @@ -20,13 +20,13 @@ lazy val commonSettings = Seq( ) lazy val commonLibrarySettings = libraryDependencies ++= Seq( - "org.scalatest" %% "scalatest" % "3.0.1", - "org.apache.kafka" %% "kafka" % kafkaVersion exclude (slf4jLog4jOrg, slf4jLog4jArtifact), - "org.apache.zookeeper" % "zookeeper" % "3.4.8" exclude (slf4jLog4jOrg, slf4jLog4jArtifact), - "org.apache.avro" % "avro" % "1.8.1" exclude (slf4jLog4jOrg, slf4jLog4jArtifact), - "com.typesafe.akka" %% "akka-actor" % akkaVersion % Test, - "com.typesafe.akka" %% "akka-testkit" % akkaVersion % Test - ) + "org.scalatest" %% "scalatest" % "3.0.1", + "org.apache.kafka" %% "kafka" % kafkaVersion exclude(slf4jLog4jOrg, slf4jLog4jArtifact), + "org.apache.zookeeper" % "zookeeper" % "3.4.8" exclude(slf4jLog4jOrg, slf4jLog4jArtifact), + "org.apache.avro" % "avro" % "1.8.1" exclude(slf4jLog4jOrg, slf4jLog4jArtifact), + "com.typesafe.akka" %% "akka-actor" % akkaVersion % Test, + "com.typesafe.akka" %% "akka-testkit" % akkaVersion % Test +) lazy val publishSettings = Seq( licenses += ("MIT", url("http://opensource.org/licenses/MIT")), @@ -69,6 +69,7 @@ lazy val embeddedKafka = (project in file("embedded-kafka")) .settings(publishSettings: _*) .settings(commonSettings: _*) .settings(commonLibrarySettings) + .settings(libraryDependencies += "org.mockito" % "mockito-core" % "1.8.5" % Test) .settings(releaseSettings: _*) lazy val kafkaStreams = (project in file("kafka-streams")) @@ -78,6 +79,6 @@ lazy val kafkaStreams = (project in file("kafka-streams")) .settings(commonLibrarySettings) .settings(releaseSettings: _*) .settings(libraryDependencies ++= Seq( - "org.apache.kafka" % "kafka-streams" % kafkaVersion exclude (slf4jLog4jOrg, slf4jLog4jArtifact) + "org.apache.kafka" % "kafka-streams" % kafkaVersion exclude(slf4jLog4jOrg, slf4jLog4jArtifact) )) .dependsOn(embeddedKafka) diff --git a/embedded-kafka/src/main/scala/net/manub/embeddedkafka/ConsumerExtensions.scala b/embedded-kafka/src/main/scala/net/manub/embeddedkafka/ConsumerExtensions.scala index a8f6d84..742b873 100644 --- a/embedded-kafka/src/main/scala/net/manub/embeddedkafka/ConsumerExtensions.scala +++ b/embedded-kafka/src/main/scala/net/manub/embeddedkafka/ConsumerExtensions.scala @@ -6,9 +6,9 @@ import org.apache.log4j.Logger import scala.util.Try -/** Method extensions for Kafka's [[KafkaConsumer]] API allowing easy testing.*/ +/** Method extensions for Kafka's [[KafkaConsumer]] API allowing easy testing. */ object ConsumerExtensions { - val MaximumAttempts = 3 + implicit class ConsumerOps[K, V](val consumer: KafkaConsumer[K, V]) { private val logger = Logger.getLogger(classOf[ConsumerOps[K, V]]) @@ -18,14 +18,16 @@ object ConsumerExtensions { * to consume batches from the given topic, until it reaches the number of desired messages or * return otherwise. * - * @param topic the topic from which to consume messages + * @param topic the topic from which to consume messages + * @param maximumAttempts the maximum number of attempts to try and get the batch (defaults to 3) + * @param poll the amount of time, in milliseconds, to wait in the buffer for any messages to be available (defaults to 2000) * @return the stream of consumed messages that you can do `.take(n: Int).toList` * to evaluate the requested number of messages. */ - def consumeLazily(topic: String): Stream[(K, V)] = { - val attempts = 1 to MaximumAttempts + def consumeLazily(topic: String, maximumAttempts: Int = 3, poll: Long = 2000): Stream[(K, V)] = { + val attempts = 1 to maximumAttempts attempts.toStream.flatMap { attempt => - val batch: Seq[(K, V)] = getNextBatch(topic) + val batch: Seq[(K, V)] = getNextBatch(topic, poll) logger.debug(s"----> Batch $attempt ($topic) | ${batch.mkString("|")}") batch } @@ -34,18 +36,20 @@ object ConsumerExtensions { /** Get the next batch of messages from Kafka. * * @param topic the topic to consume + * @param poll the amount of time, in milliseconds, to wait in the buffer for any messages to be available * @return the next batch of messages */ - def getNextBatch(topic: String): Seq[(K, V)] = + private def getNextBatch(topic: String, poll: Long): Seq[(K, V)] = Try { - import scala.collection.JavaConversions._ - consumer.subscribe(List(topic)) + import scala.collection.JavaConverters._ + consumer.subscribe(List(topic).asJava) consumer.partitionsFor(topic) - val records = consumer.poll(2000) + val records = consumer.poll(poll) // use toList to force eager evaluation. toSeq is lazy - records.iterator().toList.map(r => r.key -> r.value) + records.iterator().asScala.toList.map(r => r.key -> r.value) }.recover { case ex: KafkaException => throw new KafkaUnavailableException(ex) }.get } + } diff --git a/embedded-kafka/src/test/java/net/manub/embeddedkafka/ConsumerOpsTest.scala b/embedded-kafka/src/test/java/net/manub/embeddedkafka/ConsumerOpsTest.scala new file mode 100644 index 0000000..3b2c8d8 --- /dev/null +++ b/embedded-kafka/src/test/java/net/manub/embeddedkafka/ConsumerOpsTest.scala @@ -0,0 +1,53 @@ +package net.manub.embeddedkafka + +import net.manub.embeddedkafka.ConsumerExtensions._ +import org.apache.kafka.clients.consumer.{ConsumerRecord, ConsumerRecords, KafkaConsumer} +import org.apache.kafka.common.TopicPartition +import org.mockito.Mockito.{times, verify, when} +import org.scalatest.mockito.MockitoSugar + +import scala.collection.JavaConverters._ + +class ConsumerOpsTest extends EmbeddedKafkaSpecSupport with MockitoSugar { + + "ConsumeLazily " should { + "retry to get messages with the configured maximum number of attempts when poll fails" in { + val consumer: KafkaConsumer[String, String] = mock[KafkaConsumer[String, String]] + val consumerRecords = new ConsumerRecords[String, String](mapAsJavaMap(Map.empty)) + + when(consumer.poll(1)).thenReturn(consumerRecords) + + val maximumAttempts = 2 + consumer.consumeLazily("topic", maximumAttempts, 1) + + verify(consumer, times(maximumAttempts)).poll(1) + } + + "not retry to get messages with the configured maximum number of attempts when poll succeeds" in { + val consumer: KafkaConsumer[String, String] = mock[KafkaConsumer[String, String]] + val consumerRecord: ConsumerRecord[String, String] = mock[ConsumerRecord[String, String]] + val consumerRecords = new ConsumerRecords[String, String]( + mapAsJavaMap(Map[TopicPartition, java.util.List[ConsumerRecord[String, String]]](new TopicPartition("topic", 1) -> List(consumerRecord).asJava)) + ) + + when(consumer.poll(1)).thenReturn(consumerRecords) + + consumer.consumeLazily("topic", 1, 1) + + verify(consumer).poll(1) + } + + "poll to get messages with the configured poll timeout" in { + val consumer: KafkaConsumer[String, String] = mock[KafkaConsumer[String, String]] + val consumerRecords = new ConsumerRecords[String, String](mapAsJavaMap(Map.empty)) + + val pollTimeout = 10 + when(consumer.poll(pollTimeout)).thenReturn(consumerRecords) + + consumer.consumeLazily("topic", 1, pollTimeout) + + verify(consumer).poll(pollTimeout) + } + } + +} From a5913983744266b96542cea80974eeb4f627f980 Mon Sep 17 00:00:00 2001 From: Benjamin Sproule Date: Thu, 9 Mar 2017 09:32:06 +0000 Subject: [PATCH 2/3] Updated after code review comments (upgraded mockito and minor refactoring) --- build.sbt | 2 +- ...merOpsTest.scala => ConsumerOpsSpec.scala} | 28 +++++++++++-------- 2 files changed, 17 insertions(+), 13 deletions(-) rename embedded-kafka/src/test/java/net/manub/embeddedkafka/{ConsumerOpsTest.scala => ConsumerOpsSpec.scala} (59%) diff --git a/build.sbt b/build.sbt index 3fc5285..ed587a2 100644 --- a/build.sbt +++ b/build.sbt @@ -69,7 +69,7 @@ lazy val embeddedKafka = (project in file("embedded-kafka")) .settings(publishSettings: _*) .settings(commonSettings: _*) .settings(commonLibrarySettings) - .settings(libraryDependencies += "org.mockito" % "mockito-core" % "1.8.5" % Test) + .settings(libraryDependencies += "org.mockito" % "mockito-core" % "2.7.14" % Test) .settings(releaseSettings: _*) lazy val kafkaStreams = (project in file("kafka-streams")) diff --git a/embedded-kafka/src/test/java/net/manub/embeddedkafka/ConsumerOpsTest.scala b/embedded-kafka/src/test/java/net/manub/embeddedkafka/ConsumerOpsSpec.scala similarity index 59% rename from embedded-kafka/src/test/java/net/manub/embeddedkafka/ConsumerOpsTest.scala rename to embedded-kafka/src/test/java/net/manub/embeddedkafka/ConsumerOpsSpec.scala index 3b2c8d8..ffa9c94 100644 --- a/embedded-kafka/src/test/java/net/manub/embeddedkafka/ConsumerOpsTest.scala +++ b/embedded-kafka/src/test/java/net/manub/embeddedkafka/ConsumerOpsSpec.scala @@ -8,43 +8,47 @@ import org.scalatest.mockito.MockitoSugar import scala.collection.JavaConverters._ -class ConsumerOpsTest extends EmbeddedKafkaSpecSupport with MockitoSugar { +class ConsumerOpsSpec extends EmbeddedKafkaSpecSupport with MockitoSugar { "ConsumeLazily " should { "retry to get messages with the configured maximum number of attempts when poll fails" in { - val consumer: KafkaConsumer[String, String] = mock[KafkaConsumer[String, String]] + val consumer = mock[KafkaConsumer[String, String]] val consumerRecords = new ConsumerRecords[String, String](mapAsJavaMap(Map.empty)) - when(consumer.poll(1)).thenReturn(consumerRecords) + val pollTimeout = 1 + when(consumer.poll(pollTimeout)).thenReturn(consumerRecords) val maximumAttempts = 2 - consumer.consumeLazily("topic", maximumAttempts, 1) + consumer.consumeLazily("topic", maximumAttempts, pollTimeout) - verify(consumer, times(maximumAttempts)).poll(1) + verify(consumer, times(maximumAttempts)).poll(pollTimeout) } "not retry to get messages with the configured maximum number of attempts when poll succeeds" in { - val consumer: KafkaConsumer[String, String] = mock[KafkaConsumer[String, String]] - val consumerRecord: ConsumerRecord[String, String] = mock[ConsumerRecord[String, String]] + val consumer = mock[KafkaConsumer[String, String]] + val consumerRecord = mock[ConsumerRecord[String, String]] val consumerRecords = new ConsumerRecords[String, String]( mapAsJavaMap(Map[TopicPartition, java.util.List[ConsumerRecord[String, String]]](new TopicPartition("topic", 1) -> List(consumerRecord).asJava)) ) - when(consumer.poll(1)).thenReturn(consumerRecords) + val pollTimeout = 1 + when(consumer.poll(pollTimeout)).thenReturn(consumerRecords) - consumer.consumeLazily("topic", 1, 1) + val maximumAttempts = 2 + consumer.consumeLazily("topic", maximumAttempts, pollTimeout) - verify(consumer).poll(1) + verify(consumer).poll(pollTimeout) } "poll to get messages with the configured poll timeout" in { - val consumer: KafkaConsumer[String, String] = mock[KafkaConsumer[String, String]] + val consumer = mock[KafkaConsumer[String, String]] val consumerRecords = new ConsumerRecords[String, String](mapAsJavaMap(Map.empty)) val pollTimeout = 10 when(consumer.poll(pollTimeout)).thenReturn(consumerRecords) - consumer.consumeLazily("topic", 1, pollTimeout) + val maximumAttempts = 1 + consumer.consumeLazily("topic", maximumAttempts, pollTimeout) verify(consumer).poll(pollTimeout) } From 983c7e7642e2267330f616e3aedb744943af6fe2 Mon Sep 17 00:00:00 2001 From: Benjamin Sproule Date: Thu, 9 Mar 2017 20:53:43 +0000 Subject: [PATCH 3/3] Changed mapAsJavaMap to asJava, so it can compile in 2.11 without using a deprecated object --- .../java/net/manub/embeddedkafka/ConsumerOpsSpec.scala | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/embedded-kafka/src/test/java/net/manub/embeddedkafka/ConsumerOpsSpec.scala b/embedded-kafka/src/test/java/net/manub/embeddedkafka/ConsumerOpsSpec.scala index ffa9c94..4d5ba43 100644 --- a/embedded-kafka/src/test/java/net/manub/embeddedkafka/ConsumerOpsSpec.scala +++ b/embedded-kafka/src/test/java/net/manub/embeddedkafka/ConsumerOpsSpec.scala @@ -13,7 +13,8 @@ class ConsumerOpsSpec extends EmbeddedKafkaSpecSupport with MockitoSugar { "ConsumeLazily " should { "retry to get messages with the configured maximum number of attempts when poll fails" in { val consumer = mock[KafkaConsumer[String, String]] - val consumerRecords = new ConsumerRecords[String, String](mapAsJavaMap(Map.empty)) + val consumerRecords = + new ConsumerRecords[String, String](Map.empty[TopicPartition, java.util.List[ConsumerRecord[String, String]]].asJava) val pollTimeout = 1 when(consumer.poll(pollTimeout)).thenReturn(consumerRecords) @@ -28,7 +29,7 @@ class ConsumerOpsSpec extends EmbeddedKafkaSpecSupport with MockitoSugar { val consumer = mock[KafkaConsumer[String, String]] val consumerRecord = mock[ConsumerRecord[String, String]] val consumerRecords = new ConsumerRecords[String, String]( - mapAsJavaMap(Map[TopicPartition, java.util.List[ConsumerRecord[String, String]]](new TopicPartition("topic", 1) -> List(consumerRecord).asJava)) + Map[TopicPartition, java.util.List[ConsumerRecord[String, String]]](new TopicPartition("topic", 1) -> List(consumerRecord).asJava).asJava ) val pollTimeout = 1 @@ -42,7 +43,8 @@ class ConsumerOpsSpec extends EmbeddedKafkaSpecSupport with MockitoSugar { "poll to get messages with the configured poll timeout" in { val consumer = mock[KafkaConsumer[String, String]] - val consumerRecords = new ConsumerRecords[String, String](mapAsJavaMap(Map.empty)) + val consumerRecords = + new ConsumerRecords[String, String](Map.empty[TopicPartition, java.util.List[ConsumerRecord[String, String]]].asJava) val pollTimeout = 10 when(consumer.poll(pollTimeout)).thenReturn(consumerRecords)