diff --git a/README.md b/README.md index a648e0b..25b84dc 100644 --- a/README.md +++ b/README.md @@ -77,6 +77,30 @@ class MySpec extends WordSpec with EmbeddedKafka { } ``` + +The same implicit `EmbeddedKafkaConfig` is used to define custom consumer or producer properties. + +```scala +class MySpec extends WordSpec with EmbeddedKafka { + +"runs with custom producer and consumer properties" should { + val customBrokerConfig = Map("replica.fetch.max.bytes" -> "2000000", + "message.max.bytes" -> "2000000") + + val customProducerConfig = Map("max.request.size" -> "2000000") + val customConsumerConfig = Map("max.partition.fetch.bytes" -> "2000000") + + implicit val customKafkaConfig = EmbeddedKafkaConfig( + customBrokerProperties = customBrokerConfig, + customProducerProperties = customProducerConfig, + customConsumerProperties = customConsumerConfig) + + withRunningKafka { + // now a kafka broker is listening on the default port 6001 + } +} +} +``` This works for both `withRunningKafka` and `EmbeddedKafka.start()` diff --git a/embedded-kafka/src/main/scala/net/manub/embeddedkafka/EmbeddedKafka.scala b/embedded-kafka/src/main/scala/net/manub/embeddedkafka/EmbeddedKafka.scala index e2f2312..46bfaee 100644 --- a/embedded-kafka/src/main/scala/net/manub/embeddedkafka/EmbeddedKafka.scala +++ b/embedded-kafka/src/main/scala/net/manub/embeddedkafka/EmbeddedKafka.scala @@ -219,7 +219,7 @@ sealed trait EmbeddedKafkaSupport { ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> s"localhost:${config.kafkaPort}", ProducerConfig.MAX_BLOCK_MS_CONFIG -> 10000.toString, ProducerConfig.RETRY_BACKOFF_MS_CONFIG -> 1000.toString - ) + ) ++ config.customProducerProperties private def baseConsumerConfig( implicit config: EmbeddedKafkaConfig): Properties = { @@ -228,6 +228,7 @@ sealed trait EmbeddedKafkaSupport { props.put("bootstrap.servers", s"localhost:${config.kafkaPort}") props.put("auto.offset.reset", "earliest") props.put("enable.auto.commit", "false") + props.putAll(config.customConsumerProperties) props } 
diff --git a/embedded-kafka/src/main/scala/net/manub/embeddedkafka/EmbeddedKafkaConfig.scala b/embedded-kafka/src/main/scala/net/manub/embeddedkafka/EmbeddedKafkaConfig.scala index 9d44de3..d656dee 100644 --- a/embedded-kafka/src/main/scala/net/manub/embeddedkafka/EmbeddedKafkaConfig.scala +++ b/embedded-kafka/src/main/scala/net/manub/embeddedkafka/EmbeddedKafkaConfig.scala @@ -2,8 +2,9 @@ package net.manub.embeddedkafka case class EmbeddedKafkaConfig(kafkaPort: Int = 6001, zooKeeperPort: Int = 6000, - customBrokerProperties: Map[String, String] = - Map.empty) + customBrokerProperties: Map[String, String] = Map.empty, + customProducerProperties: Map[String, String] = Map.empty, + customConsumerProperties: Map[String, String] = Map.empty) object EmbeddedKafkaConfig { implicit val defaultConfig = EmbeddedKafkaConfig() diff --git a/embedded-kafka/src/test/scala/net/manub/embeddedkafka/EmbeddedKafkaCustomConfigSpec.scala b/embedded-kafka/src/test/scala/net/manub/embeddedkafka/EmbeddedKafkaCustomConfigSpec.scala new file mode 100644 index 0000000..63bd6b5 --- /dev/null +++ b/embedded-kafka/src/test/scala/net/manub/embeddedkafka/EmbeddedKafkaCustomConfigSpec.scala @@ -0,0 +1,34 @@ +package net.manub.embeddedkafka + +import scala.language.postfixOps + +class EmbeddedKafkaCustomConfigSpec extends EmbeddedKafkaSpecSupport with EmbeddedKafka { + val TwoMegabytes = 2097152 + val ThreeMegabytes = 3145728 + + "the custom config" should { + "allow pass additional producer parameters" in { + val customBrokerConfig = Map( + "replica.fetch.max.bytes" -> s"$ThreeMegabytes", + "message.max.bytes" -> s"$ThreeMegabytes") + + val customProducerConfig = Map("max.request.size" -> s"$ThreeMegabytes") + val customConsumerConfig = Map("max.partition.fetch.bytes" -> s"$ThreeMegabytes") + + implicit val customKafkaConfig = EmbeddedKafkaConfig( + customBrokerProperties = customBrokerConfig, + customProducerProperties = customProducerConfig, + customConsumerProperties = customConsumerConfig) + + 
val bigMessage = generateMessageOfLength(TwoMegabytes) + val topic = "big-message-topic" + + withRunningKafka { + publishStringMessageToKafka(topic, bigMessage) + consumeFirstStringMessageFrom(topic) shouldBe bigMessage + } + } + } + + def generateMessageOfLength(length: Int): String = Stream.continually(util.Random.nextPrintableChar) take length mkString +}