From 2bbc0a0a10c40e83d5b1e9e0cbe676d7b19bdc79 Mon Sep 17 00:00:00 2001 From: marcin Date: Fri, 24 Feb 2017 12:12:04 +0100 Subject: [PATCH 1/4] Unit test for publishing message bigger than default 1MB --- .../manub/embeddedkafka/EmbeddedKafka.scala | 2 +- .../embeddedkafka/EmbeddedKafkaConfig.scala | 3 ++- .../EmbeddedKafkaCustomConfigSpec.scala | 22 +++++++++++++++++++ 3 files changed, 25 insertions(+), 2 deletions(-) create mode 100644 embedded-kafka/src/test/scala/net/manub/embeddedkafka/EmbeddedKafkaCustomConfigSpec.scala diff --git a/embedded-kafka/src/main/scala/net/manub/embeddedkafka/EmbeddedKafka.scala b/embedded-kafka/src/main/scala/net/manub/embeddedkafka/EmbeddedKafka.scala index 210db9f..6ff89d2 100644 --- a/embedded-kafka/src/main/scala/net/manub/embeddedkafka/EmbeddedKafka.scala +++ b/embedded-kafka/src/main/scala/net/manub/embeddedkafka/EmbeddedKafka.scala @@ -209,7 +209,7 @@ sealed trait EmbeddedKafkaSupport { ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> s"localhost:${config.kafkaPort}", ProducerConfig.MAX_BLOCK_MS_CONFIG -> 10000.toString, ProducerConfig.RETRY_BACKOFF_MS_CONFIG -> 1000.toString - ) + ) ++ config.customProducerProperties private def baseConsumerConfig(implicit config: EmbeddedKafkaConfig) : Properties = { val props = new Properties() diff --git a/embedded-kafka/src/main/scala/net/manub/embeddedkafka/EmbeddedKafkaConfig.scala b/embedded-kafka/src/main/scala/net/manub/embeddedkafka/EmbeddedKafkaConfig.scala index aeced8c..a8a3904 100644 --- a/embedded-kafka/src/main/scala/net/manub/embeddedkafka/EmbeddedKafkaConfig.scala +++ b/embedded-kafka/src/main/scala/net/manub/embeddedkafka/EmbeddedKafkaConfig.scala @@ -2,7 +2,8 @@ package net.manub.embeddedkafka case class EmbeddedKafkaConfig(kafkaPort: Int = 6001, zooKeeperPort: Int = 6000, - customBrokerProperties: Map[String, String] = Map.empty) + customBrokerProperties: Map[String, String] = Map.empty, + customProducerProperties: Map[String, String] = Map.empty) object EmbeddedKafkaConfig { implicit val defaultConfig = EmbeddedKafkaConfig() diff --git a/embedded-kafka/src/test/scala/net/manub/embeddedkafka/EmbeddedKafkaCustomConfigSpec.scala b/embedded-kafka/src/test/scala/net/manub/embeddedkafka/EmbeddedKafkaCustomConfigSpec.scala new file mode 100644 index 0000000..ac017a6 --- /dev/null +++ b/embedded-kafka/src/test/scala/net/manub/embeddedkafka/EmbeddedKafkaCustomConfigSpec.scala @@ -0,0 +1,22 @@ +package net.manub.embeddedkafka + +import scala.language.postfixOps + +class EmbeddedKafkaCustomConfigSpec extends EmbeddedKafkaSpecSupport with EmbeddedKafka { + + "the custom config" should { + "should allow pass additional producer parameters" in { + val customProducerConfig = Map("" -> "") + implicit val customKafkaConfig = EmbeddedKafkaConfig(customProducerProperties = customProducerConfig) + val bigMessage = generateMessageOfLength(2000000) + val topic = "big-message-topic" + + withRunningKafka { + publishStringMessageToKafka(topic, bigMessage) + consumeFirstStringMessageFrom(topic) shouldBe bigMessage + } + } + } + + def generateMessageOfLength(length: Int): String = Stream.continually(util.Random.nextPrintableChar) take length mkString +} \ No newline at end of file From 50af3b021c792a2d6863e9f4aa50ccfcb0259b47 Mon Sep 17 00:00:00 2001 From: kalondar Date: Fri, 24 Feb 2017 20:47:11 +0100 Subject: [PATCH 2/4] Add custom producer and consumer properties --- .../manub/embeddedkafka/EmbeddedKafka.scala | 1 + .../embeddedkafka/EmbeddedKafkaConfig.scala | 3 ++- .../EmbeddedKafkaCustomConfigSpec.scala | 18 +++++++++++++++--- 3 files changed, 18 insertions(+), 4 deletions(-) diff --git a/embedded-kafka/src/main/scala/net/manub/embeddedkafka/EmbeddedKafka.scala b/embedded-kafka/src/main/scala/net/manub/embeddedkafka/EmbeddedKafka.scala index 6ff89d2..cca278f 100644 --- a/embedded-kafka/src/main/scala/net/manub/embeddedkafka/EmbeddedKafka.scala +++ b/embedded-kafka/src/main/scala/net/manub/embeddedkafka/EmbeddedKafka.scala @@ -217,6 +217,7 @@ sealed trait EmbeddedKafkaSupport { props.put("bootstrap.servers", s"localhost:${config.kafkaPort}") props.put("auto.offset.reset", "earliest") props.put("enable.auto.commit", "false") + props.putAll(config.customConsumerProperties) props } diff --git a/embedded-kafka/src/main/scala/net/manub/embeddedkafka/EmbeddedKafkaConfig.scala b/embedded-kafka/src/main/scala/net/manub/embeddedkafka/EmbeddedKafkaConfig.scala index a8a3904..d656dee 100644 --- a/embedded-kafka/src/main/scala/net/manub/embeddedkafka/EmbeddedKafkaConfig.scala +++ b/embedded-kafka/src/main/scala/net/manub/embeddedkafka/EmbeddedKafkaConfig.scala @@ -3,7 +3,8 @@ package net.manub.embeddedkafka case class EmbeddedKafkaConfig(kafkaPort: Int = 6001, zooKeeperPort: Int = 6000, customBrokerProperties: Map[String, String] = Map.empty, - customProducerProperties: Map[String, String] = Map.empty) + customProducerProperties: Map[String, String] = Map.empty, + customConsumerProperties: Map[String, String] = Map.empty) object EmbeddedKafkaConfig { implicit val defaultConfig = EmbeddedKafkaConfig() diff --git a/embedded-kafka/src/test/scala/net/manub/embeddedkafka/EmbeddedKafkaCustomConfigSpec.scala b/embedded-kafka/src/test/scala/net/manub/embeddedkafka/EmbeddedKafkaCustomConfigSpec.scala index ac017a6..41c1394 100644 --- a/embedded-kafka/src/test/scala/net/manub/embeddedkafka/EmbeddedKafkaCustomConfigSpec.scala +++ b/embedded-kafka/src/test/scala/net/manub/embeddedkafka/EmbeddedKafkaCustomConfigSpec.scala @@ -3,12 +3,24 @@ package net.manub.embeddedkafka import scala.language.postfixOps class EmbeddedKafkaCustomConfigSpec extends EmbeddedKafkaSpecSupport with EmbeddedKafka { + val TWO_MEGABYTES = 2097152 + val THREE_MEGABYTES = 3145728 "the custom config" should { "should allow pass additional producer parameters" in { - val customProducerConfig = Map("" -> "") - implicit val customKafkaConfig = EmbeddedKafkaConfig(customProducerProperties = customProducerConfig) - val bigMessage = generateMessageOfLength(2000000) + val customBrokerConfig = Map( + "replica.fetch.max.bytes" -> s"$THREE_MEGABYTES", + "message.max.bytes" -> s"$THREE_MEGABYTES") + + val customProducerConfig = Map("max.request.size" -> s"$THREE_MEGABYTES") + val customConsumerConfig = Map("max.partition.fetch.bytes" -> s"$THREE_MEGABYTES") + + implicit val customKafkaConfig = EmbeddedKafkaConfig( + customBrokerProperties = customBrokerConfig, + customProducerProperties = customProducerConfig, + customConsumerProperties = customConsumerConfig) + + val bigMessage = generateMessageOfLength(TWO_MEGABYTES) val topic = "big-message-topic" withRunningKafka { From f2138a64c8520f5be739312300f1452a95cb511b Mon Sep 17 00:00:00 2001 From: kalondar Date: Tue, 28 Feb 2017 20:45:15 +0100 Subject: [PATCH 3/4] Update readme file with information about custom config properties --- README.md | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/README.md b/README.md index da5c8f0..256525f 100644 --- a/README.md +++ b/README.md @@ -77,6 +77,30 @@ class MySpec extends WordSpec with EmbeddedKafka { } ``` + +The same implicit `EmbeddedKafkaConfig` is used to define custom consumer or producer properties + +```scala +class MySpec extends WordSpec with EmbeddedKafka { + +"runs with custom producer and consumer properties" should { + val customBrokerConfig = Map("replica.fetch.max.bytes" -> "2000000", + "message.max.bytes" -> "2000000") + + val customProducerConfig = Map("max.request.size" -> "2000000") + val customConsumerConfig = Map("max.partition.fetch.bytes" -> "2000000") + + implicit val customKafkaConfig = EmbeddedKafkaConfig( + customBrokerProperties = customBrokerConfig, + customProducerProperties = customProducerConfig, + customConsumerProperties = customConsumerConfig) + + withRunningKafka { + // now a kafka broker is listening on port 12345 + } + +} +``` This works for both `withRunningKafka` and `EmbeddedKafka.start()` From c22209679bd9328d1ca75352b79fc97fcc97ddf2 Mon Sep 17 00:00:00 2001 From: marcin Date: Wed, 1 Mar 2017 14:29:00 +0100 Subject: [PATCH 4/4] Clean code fixes --- .../EmbeddedKafkaCustomConfigSpec.scala | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/embedded-kafka/src/test/scala/net/manub/embeddedkafka/EmbeddedKafkaCustomConfigSpec.scala b/embedded-kafka/src/test/scala/net/manub/embeddedkafka/EmbeddedKafkaCustomConfigSpec.scala index 41c1394..63bd6b5 100644 --- a/embedded-kafka/src/test/scala/net/manub/embeddedkafka/EmbeddedKafkaCustomConfigSpec.scala +++ b/embedded-kafka/src/test/scala/net/manub/embeddedkafka/EmbeddedKafkaCustomConfigSpec.scala @@ -3,24 +3,24 @@ package net.manub.embeddedkafka import scala.language.postfixOps class EmbeddedKafkaCustomConfigSpec extends EmbeddedKafkaSpecSupport with EmbeddedKafka { - val TWO_MEGABYTES = 2097152 - val THREE_MEGABYTES = 3145728 + val TwoMegabytes = 2097152 + val ThreeMegabytes = 3145728 "the custom config" should { - "should allow pass additional producer parameters" in { + "allow pass additional producer parameters" in { val customBrokerConfig = Map( - "replica.fetch.max.bytes" -> s"$THREE_MEGABYTES", - "message.max.bytes" -> s"$THREE_MEGABYTES") + "replica.fetch.max.bytes" -> s"$ThreeMegabytes", + "message.max.bytes" -> s"$ThreeMegabytes") - val customProducerConfig = Map("max.request.size" -> s"$THREE_MEGABYTES") - val customConsumerConfig = Map("max.partition.fetch.bytes" -> s"$THREE_MEGABYTES") + val customProducerConfig = Map("max.request.size" -> s"$ThreeMegabytes") + val customConsumerConfig = Map("max.partition.fetch.bytes" -> s"$ThreeMegabytes") implicit val customKafkaConfig = EmbeddedKafkaConfig( customBrokerProperties = customBrokerConfig, customProducerProperties = customProducerConfig, customConsumerProperties = customConsumerConfig) - val bigMessage = generateMessageOfLength(TWO_MEGABYTES) + val bigMessage = generateMessageOfLength(TwoMegabytes) val topic = "big-message-topic" withRunningKafka { @@ -31,4 +31,4 @@ class EmbeddedKafkaCustomConfigSpec extends EmbeddedKafkaSpecSupport with Embedd } def generateMessageOfLength(length: Int): String = Stream.continually(util.Random.nextPrintableChar) take length mkString -} \ No newline at end of file +}