Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ The `EmbeddedKafka` trait provides also some utility methods to interact with th

def consumeFirstMessageFrom(topic: String): String

def createCustomTopic(topic: String, topicConfig: Map[String,String]): Unit
def createCustomTopic(topic: String, topicConfig: Map[String,String], partitions: Int, replicationFactor: Int): Unit

## Custom producers

Expand Down
14 changes: 9 additions & 5 deletions src/main/scala/net/manub/embeddedkafka/EmbeddedKafka.scala
Original file line number Diff line number Diff line change
Expand Up @@ -242,15 +242,19 @@ sealed trait EmbeddedKafkaSupport {
/**
* Creates a topic with a custom configuration
*
* @param topic the topic name
* @param topicConfig per topic configuration [[Map]]
* @param config an implicit [[EmbeddedKafkaConfig]]
* @param topic the topic name
* @param topicConfig per topic configuration [[Map]]
* @param partitions number of partitions [[Int]]
* @param replicationFactor replication factor [[Int]]
* @param config an implicit [[EmbeddedKafkaConfig]]
*/
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please add some ScalaDocs here?

def createCustomTopic(topic: String, topicConfig: Map[String,String] = Map.empty)(implicit config: EmbeddedKafkaConfig): Unit = {
def createCustomTopic(topic: String, topicConfig: Map[String,String] = Map.empty,
partitions: Int = 1, replicationFactor: Int = 1)(implicit config: EmbeddedKafkaConfig): Unit = {

val zkUtils = ZkUtils(s"localhost:${config.zooKeeperPort}", zkSessionTimeoutMs, zkConnectionTimeoutMs, zkSecurityEnabled)
val topicProperties = topicConfig.foldLeft(new Properties){case (props, (k,v)) => props.put(k,v); props}

try AdminUtils.createTopic(zkUtils, topic, 1, 1, topicProperties) finally zkUtils.close()
try AdminUtils.createTopic(zkUtils, topic, partitions, replicationFactor, topicProperties) finally zkUtils.close()
}

}
18 changes: 18 additions & 0 deletions src/test/scala/net/manub/embeddedkafka/EmbeddedKafkaSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,24 @@ class EmbeddedKafkaSpec extends EmbeddedKafkaSpecSupport with EmbeddedKafka {

}
}

"create a topic with custom number of partitions" in {
implicit val config = EmbeddedKafkaConfig()
val topic = "test_custom_topic"

withRunningKafka {

createCustomTopic(topic, Map("cleanup.policy"->"compact"), partitions = 2)

val zkSessionTimeoutMs = 10000
val zkConnectionTimeoutMs = 10000
val zkSecurityEnabled = false

val zkUtils = ZkUtils(s"localhost:${config.zooKeeperPort}", zkSessionTimeoutMs, zkConnectionTimeoutMs, zkSecurityEnabled)
try { AdminUtils.fetchTopicMetadataFromZk(topic, zkUtils).partitionsMetadata.size shouldBe 2 } finally zkUtils.close()

}
}
}

"the consumeFirstStringMessageFrom method" should {
Expand Down