Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,30 @@ class MySpec extends WordSpec with EmbeddedKafka {

}
```

The same implicit `EmbeddedKafkaConfig` is used to define custom consumer or producer properties:

```scala
class MySpec extends WordSpec with EmbeddedKafka {

"runs with custom producer and consumer properties" should {
val customBrokerConfig = Map("replica.fetch.max.bytes" -> "2000000",
"message.max.bytes" -> "2000000")

val customProducerConfig = Map("max.request.size" -> "2000000")
val customConsumerConfig = Map("max.partition.fetch.bytes" -> "2000000")

implicit val customKafkaConfig = EmbeddedKafkaConfig(
customBrokerProperties = customBrokerConfig,
customProducerProperties = customProducerConfig,
customConsumerProperties = customConsumerConfig)

withRunningKafka {
      // now a kafka broker is listening on the default port 6001
}

}
```

This works for both `withRunningKafka` and `EmbeddedKafka.start()`

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ sealed trait EmbeddedKafkaSupport {
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> s"localhost:${config.kafkaPort}",
ProducerConfig.MAX_BLOCK_MS_CONFIG -> 10000.toString,
ProducerConfig.RETRY_BACKOFF_MS_CONFIG -> 1000.toString
)
) ++ config.customProducerProperties

private def baseConsumerConfig(
implicit config: EmbeddedKafkaConfig): Properties = {
Expand All @@ -228,6 +228,7 @@ sealed trait EmbeddedKafkaSupport {
props.put("bootstrap.servers", s"localhost:${config.kafkaPort}")
props.put("auto.offset.reset", "earliest")
props.put("enable.auto.commit", "false")
props.putAll(config.customConsumerProperties)
props
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ package net.manub.embeddedkafka

/** Configuration for an embedded Kafka broker and its ZooKeeper instance.
  *
  * @param kafkaPort                port the embedded Kafka broker listens on
  * @param zooKeeperPort            port the embedded ZooKeeper instance listens on
  * @param customBrokerProperties   extra properties merged into the broker configuration
  * @param customProducerProperties extra properties merged into the producer configuration
  * @param customConsumerProperties extra properties merged into the consumer configuration
  */
case class EmbeddedKafkaConfig(kafkaPort: Int = 6001,
                               zooKeeperPort: Int = 6000,
                               customBrokerProperties: Map[String, String] = Map.empty,
                               customProducerProperties: Map[String, String] = Map.empty,
                               customConsumerProperties: Map[String, String] = Map.empty)

object EmbeddedKafkaConfig {
implicit val defaultConfig = EmbeddedKafkaConfig()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package net.manub.embeddedkafka

import scala.language.postfixOps

class EmbeddedKafkaCustomConfigSpec extends EmbeddedKafkaSpecSupport with EmbeddedKafka {
  // Byte sizes used to exercise the custom size-limit settings.
  val TwoMegabytes = 2097152
  val ThreeMegabytes = 3145728

  "the custom config" should {
    "allow pass additional producer parameters" in {
      // Raise the broker-side message limits above the 2 MB payload used below.
      val brokerProps = Map(
        "replica.fetch.max.bytes" -> ThreeMegabytes.toString,
        "message.max.bytes" -> ThreeMegabytes.toString)

      // Producer and consumer limits are raised to match the broker's.
      val producerProps = Map("max.request.size" -> ThreeMegabytes.toString)
      val consumerProps = Map("max.partition.fetch.bytes" -> ThreeMegabytes.toString)

      implicit val kafkaConfig: EmbeddedKafkaConfig = EmbeddedKafkaConfig(
        customBrokerProperties = brokerProps,
        customProducerProperties = producerProps,
        customConsumerProperties = consumerProps)

      val bigMessage = generateMessageOfLength(TwoMegabytes)
      val topic = "big-message-topic"

      withRunningKafka {
        // Round-trip a 2 MB message through the embedded broker.
        publishStringMessageToKafka(topic, bigMessage)
        consumeFirstStringMessageFrom(topic) shouldBe bigMessage
      }
    }
  }

  // Builds a random printable string of the requested length.
  def generateMessageOfLength(length: Int): String =
    List.fill(length)(util.Random.nextPrintableChar).mkString
}