70 changes: 70 additions & 0 deletions README.md
@@ -94,3 +94,73 @@ It is possible to create producers for custom types in two ways:

For more information about how to use the utility methods, you can either look at the Scaladocs or at the tests of this project.

## Custom consumers

Use the `Consumers` trait to easily create consumers of arbitrary key-value types and manage their lifecycle (via a loaner pattern).
* For basic String consumption use `withStringConsumer { your code here }`.
* For arbitrary key and value types, expose an implicit `Deserializer` for each type and use `withConsumer { your code here }` (see the sketch below).
* If you want to create a consumer and manage its lifecycle yourself, use `newConsumer()`.
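
A minimal sketch of the loaner pattern in a test, assuming the default `EmbeddedKafkaConfig` ports (the spec name and topic usage are hypothetical):

```scala
import net.manub.embeddedkafka.{Consumers, EmbeddedKafkaConfig}
import org.apache.kafka.common.serialization.{Deserializer, StringDeserializer}
import org.scalatest.{Matchers, WordSpec}

class CustomConsumerSpec extends WordSpec with Matchers with Consumers {

  implicit val config: EmbeddedKafkaConfig = EmbeddedKafkaConfig()

  // expose an implicit Deserializer for each of the key and value types
  implicit val stringDeserializer: Deserializer[String] = new StringDeserializer

  "withConsumer" should {
    "provide a consumer and close it once the block completes" in {
      withConsumer[String, String, Unit] { consumer =>
        // use the consumer here; there is no need to close it yourself
      }
    }
  }
}
```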


## Easy message consumption
With `ConsumerExtensions` you can turn a consumer into a lazily evaluated Scala `Stream` of key-value pairs and treat it as a collection for easy assertions.
* Just import the extensions.
* On any `KafkaConsumer` instance you can now do:

```scala
import net.manub.embeddedkafka.ConsumerExtensions._
...
consumer.consumeLazily("from-this-topic").take(3).toList should be (Seq(
  "1" -> "one",
  "2" -> "two",
  "3" -> "three"
))
```


# scalatest-embedded-kafka-streams

A library that builds on top of `scalatest-embedded-kafka` to offer easy testing of [Kafka Streams](https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams) with ScalaTest.
It uses Kafka Streams 0.10.0.1.
It takes care of instantiating and starting your streams as well as closing them after running your test-case code.

## How to use

* In your `build.sbt` file add the following dependency: `"net.manub" %% "scalatest-embedded-kafka-streams" % "0.8.1" % "test"`
* Have a look at the [example test](kafka-streams/src/test/scala/net/manub/embeddedkafka/streams/ExampleKafkaStreamsSpec.scala)
* In most cases, have your `Spec` extend the `EmbeddedKafkaStreamsAllInOne` trait. This offers both streams management and easy creation of consumers for asserting resulting messages in output/sink topics.
* If you only want the streams management without the test consumers, have your `Spec` extend the `EmbeddedKafkaStreams` trait.
* Use `runStreamsWithStringConsumer` to:
  * Create any topics that need to exist for the streams to operate (usually sources and sinks).
  * Pass the Stream or Topology builder that will then be used to instantiate and start the Kafka Streams. This is done internally via the `withRunningKafka` closure, so that your stream runs with an embedded Kafka and ZooKeeper.
  * Pass the `{code block}` that needs a running instance of your streams. This is where your actual test code will sit. You can publish messages to your source topics and consume messages from your sink topics that the Kafka Streams should have generated. This method also offers a pre-instantiated consumer that can read String keys and values.
* For more flexibility, use `runStreams` and `withConsumer`. This allows you to create your own consumers of custom types, as seen in the [example test](kafka-streams/src/test/scala/net/manub/embeddedkafka/streams/ExampleKafkaStreamsSpec.scala) and in the sketch after the example below.

```scala
import net.manub.embeddedkafka.Codecs._ // implicit String serializers/deserializers
import net.manub.embeddedkafka.ConsumerExtensions._
import net.manub.embeddedkafka.streams.EmbeddedKafkaStreamsAllInOne
import org.apache.kafka.streams.kstream.KStreamBuilder
import org.scalatest.{Matchers, WordSpec}

class MySpec extends WordSpec with Matchers with EmbeddedKafkaStreamsAllInOne {
  "my kafka stream" should {
    "be easy to test" in {
      val inputTopic = "input-topic"
      val outputTopic = "output-topic"
      // your code for building the stream goes here, e.g.
      val streamBuilder = new KStreamBuilder
      streamBuilder.stream(inputTopic).to(outputTopic)
      // tell the stream test:
      // 1. what topics need to be created before the stream starts
      // 2. the builder to be used for initializing and starting the stream
      runStreamsWithStringConsumer(
        topicsToCreate = Seq(inputTopic, outputTopic),
        builder = streamBuilder
      ) { consumer =>
        // your test code goes here
        publishToKafka(inputTopic, key = "hello", message = "world")
        consumer.consumeLazily(outputTopic).head should be ("hello" -> "world")
      }
    }
  }
}
```
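
For the more flexible `runStreams` with `withConsumer` combination mentioned above, here is a minimal sketch along the same lines. Topic names are hypothetical, and `Codecs._` is assumed to supply the implicit String serializers and deserializers; custom types would bring their own implicit `Serializer`s and `Deserializer`s.

```scala
import net.manub.embeddedkafka.Codecs._
import net.manub.embeddedkafka.ConsumerExtensions._
import org.apache.kafka.streams.kstream.KStreamBuilder

// inside a Spec extending EmbeddedKafkaStreamsAllInOne
val streamBuilder = new KStreamBuilder
streamBuilder.stream("input-topic").to("output-topic")

runStreams(Seq("input-topic", "output-topic"), streamBuilder) {
  withConsumer[String, String, Unit] { consumer =>
    publishToKafka("input-topic", key = "hello", message = "world")
    consumer.consumeLazily("output-topic").head should be ("hello" -> "world")
  }
}
```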
41 changes: 32 additions & 9 deletions build.sbt
@@ -1,23 +1,27 @@
import sbtrelease.Version

+val kafkaVersion = "0.10.0.1"
+
val slf4jLog4jOrg = "org.slf4j"
val slf4jLog4jArtifact = "slf4j-log4j12"

lazy val commonSettings = Seq(
  name := "scalatest-embedded-kafka",
  organization := "net.manub",
  scalaVersion := "2.11.8",
  crossScalaVersions := Seq("2.10.6", "2.11.8"),
  homepage := Some(url("https://github.com/manub/scalatest-embedded-kafka")),
  parallelExecution in Test := false,
-  libraryDependencies ++= Seq(
-    "org.scalatest" %% "scalatest" % "3.0.0",
-    "org.apache.kafka" %% "kafka" % "0.10.0.1" exclude(slf4jLog4jOrg, slf4jLog4jArtifact),
-    "org.apache.zookeeper" % "zookeeper" % "3.4.7" exclude(slf4jLog4jOrg, slf4jLog4jArtifact),
-    "org.apache.avro" % "avro" % "1.7.7" exclude(slf4jLog4jOrg, slf4jLog4jArtifact),
-    "com.typesafe.akka" %% "akka-actor" % "2.3.14" % Test,
-    "com.typesafe.akka" %% "akka-testkit" % "2.3.14" % Test
-  )
+  logBuffered in Test := false,
+  fork in Test := true
)

+lazy val commonLibrarySettings = libraryDependencies ++= Seq(
+  "org.scalatest" %% "scalatest" % "3.0.0",
+  "org.apache.kafka" %% "kafka" % kafkaVersion exclude(slf4jLog4jOrg, slf4jLog4jArtifact),
+  "org.apache.zookeeper" % "zookeeper" % "3.4.7" exclude(slf4jLog4jOrg, slf4jLog4jArtifact),
+  "org.apache.avro" % "avro" % "1.7.7" exclude(slf4jLog4jOrg, slf4jLog4jArtifact),
+  "com.typesafe.akka" %% "akka-actor" % "2.3.14" % Test,
+  "com.typesafe.akka" %% "akka-testkit" % "2.3.14" % Test
+)

lazy val publishSettings = Seq(
@@ -47,6 +51,25 @@ lazy val releaseSettings = Seq(
)

lazy val root = (project in file("."))
  .settings(name := "scalatest-embedded-kafka-root")
  .settings(commonSettings: _*)
  .aggregate(embeddedKafka, kafkaStreams)


lazy val embeddedKafka = (project in file("embedded-kafka"))
  .settings(name := "scalatest-embedded-kafka")
  .settings(publishSettings: _*)
  .settings(commonSettings: _*)
  .settings(commonLibrarySettings)
  .settings(releaseSettings: _*)

lazy val kafkaStreams = (project in file("kafka-streams"))
  .settings(name := "scalatest-embedded-kafka-streams")
  .settings(publishSettings: _*)
  .settings(commonSettings: _*)
  .settings(commonLibrarySettings)
  .settings(releaseSettings: _*)
  .settings(libraryDependencies ++= Seq(
    "org.apache.kafka" % "kafka-streams" % kafkaVersion exclude(slf4jLog4jOrg, slf4jLog4jArtifact)
  ))
  .dependsOn(embeddedKafka)
51 changes: 51 additions & 0 deletions embedded-kafka/src/main/scala/net/manub/embeddedkafka/ConsumerExtensions.scala
@@ -0,0 +1,51 @@
package net.manub.embeddedkafka

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.KafkaException
import org.apache.log4j.Logger

import scala.util.Try

/** Method extensions for Kafka's [[KafkaConsumer]] API allowing easy testing. */
object ConsumerExtensions {
  val MaximumAttempts = 3

  implicit class ConsumerOps[K, V](val consumer: KafkaConsumer[K, V]) {

    private val logger = Logger.getLogger(classOf[ConsumerOps[K, V]])

    /** Consume messages from a given topic and return them as a lazily evaluated Scala Stream.
      * Depending on how many messages are taken from the Stream, it will poll the given topic
      * for up to `MaximumAttempts` batches, stopping early once the desired number of messages
      * has been consumed.
      *
      * @param topic the topic from which to consume messages
      * @return a stream of consumed messages; call `.take(n).toList` on it
      *         to evaluate the requested number of messages
      */
    def consumeLazily(topic: String): Stream[(K, V)] = {
      val attempts = 1 to MaximumAttempts
      attempts.toStream.flatMap { attempt =>
        val batch: Seq[(K, V)] = getNextBatch(topic)
        logger.debug(s"----> Batch $attempt ($topic) | ${batch.mkString("|")}")
        batch
      }
    }

    /** Get the next batch of messages from Kafka.
      *
      * @param topic the topic to consume
      * @return the next batch of messages
      */
    def getNextBatch(topic: String): Seq[(K, V)] = Try {
      import scala.collection.JavaConversions._
      consumer.subscribe(List(topic))
      consumer.partitionsFor(topic)
      val records = consumer.poll(2000)
      // use toList to force eager evaluation. toSeq is lazy
      records.iterator().toList.map(r => r.key -> r.value)
    }.recover {
      case ex: KafkaException => throw new KafkaUnavailableException(ex)
    }.get
  }
}
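
A quick usage sketch of the extension. This assumes an embedded Kafka instance is running and a `KafkaConsumer[String, String]` named `consumer` is in scope:

```scala
import net.manub.embeddedkafka.ConsumerExtensions._

// take(2) lazily drives the underlying polling: batches are only
// fetched (up to MaximumAttempts times) as elements are demanded
val firstTwo: List[(String, String)] =
  consumer.consumeLazily("my-topic").take(2).toList
```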

59 changes: 59 additions & 0 deletions embedded-kafka/src/main/scala/net/manub/embeddedkafka/Consumers.scala
@@ -0,0 +1,59 @@
package net.manub.embeddedkafka

import java.util.Properties

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.Deserializer

/** Utility trait for easily creating Kafka consumers and accessing their consumed messages. */
trait Consumers {

  /** Loaner pattern that allows running a code block with a newly created consumer.
    * The consumer's lifecycle will be automatically handled and closed at the end of the
    * given code block.
    *
    * @param block the code block to be executed with the instantiated consumer
    *              passed as an argument
    * @tparam K the type of the consumer's Key
    * @tparam V the type of the consumer's Value
    * @tparam T the type of the block's returning result
    * @return the result of the executed block
    */
  def withConsumer[K: Deserializer, V: Deserializer, T](block: KafkaConsumer[K, V] => T)
                                                       (implicit config: EmbeddedKafkaConfig): T = {
    val consumer = newConsumer[K, V]()
    try {
      block(consumer)
    } finally {
      consumer.close()
    }
  }

  /** Convenience alternative to `withConsumer` that offers a consumer for String keys and values.
    *
    * @param block the block to be executed with the consumer
    * @tparam T the type of the result of the code block
    * @return the code block result
    */
  def withStringConsumer[T](block: KafkaConsumer[String, String] => T)
                           (implicit config: EmbeddedKafkaConfig): T = {
    import net.manub.embeddedkafka.Codecs.stringDeserializer
    withConsumer(block)
  }

  /** Create a new Kafka consumer.
    *
    * @tparam K the type of the consumer's Key
    * @tparam V the type of the consumer's Value
    * @return the new consumer
    */
  def newConsumer[K: Deserializer, V: Deserializer]()
                 (implicit config: EmbeddedKafkaConfig): KafkaConsumer[K, V] = {
    val props = new Properties()
    props.put("group.id", UUIDs.newUuid().toString)
    props.put("bootstrap.servers", s"localhost:${config.kafkaPort}")
    props.put("auto.offset.reset", "earliest")

    new KafkaConsumer[K, V](props, implicitly[Deserializer[K]], implicitly[Deserializer[V]])
  }
}
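
A short sketch of the manage-it-yourself route via `newConsumer`, assuming an implicit `EmbeddedKafkaConfig` is in scope (`Codecs.stringDeserializer` supplies the implicit `Deserializer[String]`):

```scala
import net.manub.embeddedkafka.Codecs.stringDeserializer

val consumer = newConsumer[String, String]()
try {
  // subscribe, poll and assert as needed
} finally {
  consumer.close() // you own the lifecycle here
}
```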
14 changes: 14 additions & 0 deletions embedded-kafka/src/main/scala/net/manub/embeddedkafka/UUIDs.scala
@@ -0,0 +1,14 @@
package net.manub.embeddedkafka

import java.util.UUID

/** Utility object for creating unique test IDs.
  * Useful for separating IDs and directories across test cases.
  */
object UUIDs {

  /** Create a new unique ID.
    *
    * @return the unique ID
    */
  def newUuid(): UUID = UUID.randomUUID()
}
4 changes: 2 additions & 2 deletions log4j.properties
@@ -1,7 +1,7 @@
-log4j.rootLogger=info, stdout
+log4j.rootLogger=off

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout

# Pattern to output the caller's file name and line number.
-log4j.appender.stdout.layout.ConversionPattern=%5p [%t] (%F:%L) - %m%n
+log4j.appender.stdout.layout.ConversionPattern=%5p [%t] (%F:%L) - %m%n
56 changes: 56 additions & 0 deletions kafka-streams/src/main/scala/net/manub/embeddedkafka/streams/EmbeddedKafkaStreams.scala
@@ -0,0 +1,56 @@
package net.manub.embeddedkafka.streams

import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig, UUIDs}
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.processor.TopologyBuilder
import org.apache.log4j.Logger
import org.scalatest.Suite

/** Helper trait for testing Kafka Streams.
  * It creates an embedded Kafka instance for each test case.
  * Use `runStreams` to execute your streams.
  */
trait EmbeddedKafkaStreams extends EmbeddedKafka with TestStreamsConfig {
  this: Suite =>

  private val logger = Logger.getLogger(classOf[EmbeddedKafkaStreams])

  /** Execute Kafka streams and pass a block of code that can
    * operate while the streams are active.
    * The code block can be used for publishing and consuming messages in Kafka.
    * (A pre-initialized String consumer is offered by
    * `EmbeddedKafkaStreamsAllInOne.runStreamsWithStringConsumer`.)
    *
    * e.g.
    *
    * {{{
    * runStreams(Seq("inputTopic", "outputTopic"), streamBuilder) {
    *   // here you can publish and consume messages and make assertions
    *   publishToKafka(in, Seq("one-string", "another-string"))
    *   consumeFirstStringMessageFrom(in) should be ("one-string")
    * }
    * }}}
    *
    * @param topicsToCreate the topics that should be created in Kafka before launching the streams.
    * @param builder the streams builder that will be used to instantiate the streams with
    *                a default configuration (all state directories are different and
    *                in temp folders)
    * @param block the code block that will be executed while the streams are active.
    *              Once the block has been executed the streams will be closed.
    */
  def runStreams(topicsToCreate: Seq[String], builder: TopologyBuilder)
                (block: => Any)
                (implicit config: EmbeddedKafkaConfig): Any =
    withRunningKafka {
      topicsToCreate.foreach(topic => createCustomTopic(topic))
> **Review comment (owner):** in my opinion it would be more idiomatic `.foreach(createCustomTopic)`
>
> **Reply (collaborator, author):** I agree. I tried that originally but unfortunately it does not work:
>
>     Error:(45, 30) type mismatch;
>      found   : (String, Map[String,String], Int, Int) => Unit
>      required: String => ?
>           topicsToCreate.foreach(createCustomTopic)
>
> An alternative is `foreach(createCustomTopic(_))`. In my opinion that's not particularly pretty and I prefer being explicit as per above. I think it reads better. What do you think?
      val streamId = UUIDs.newUuid().toString
      logger.debug(s"Creating stream with Application ID: [$streamId]")
      val streams = new KafkaStreams(builder, streamConfig(streamId))
      streams.start()
      try {
        block
      } finally {
        streams.close()
      }
    }(config)
}
46 changes: 46 additions & 0 deletions kafka-streams/src/main/scala/net/manub/embeddedkafka/streams/EmbeddedKafkaStreamsAllInOne.scala
@@ -0,0 +1,46 @@
package net.manub.embeddedkafka.streams

import net.manub.embeddedkafka.{Consumers, EmbeddedKafkaConfig}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.streams.processor.TopologyBuilder
import org.scalatest.Suite

/** Convenience trait for testing Kafka Streams with ScalaTest.
  * It exposes `EmbeddedKafkaStreams.runStreams` as well as the `Consumers` API
  * for easily creating and querying consumers in tests.
  *
  * e.g.
  * {{{
  * runStreams(Seq("inputTopic", "outputTopic"), streamBuilder) {
  *   withConsumer[String, String, Unit] { consumer =>
  *     // here you can publish and consume messages and make assertions
  *     publishToKafka(in, Seq("one-string", "another-string"))
  *     consumer.consumeLazily(out).take(2).toList should be (
  *       Seq("one-string" -> "true", "another-string" -> "true")
  *     )
  *   }
  * }
  * }}}
  *
  * @see [[Consumers]]
  * @see [[EmbeddedKafkaStreams]]
  */
trait EmbeddedKafkaStreamsAllInOne extends EmbeddedKafkaStreams with Consumers {
  this: Suite =>

  /** Run Kafka Streams while offering a String-based consumer for easy testing of stream output.
    *
    * @param topicsToCreate the topics that should be created. Usually these should be the topics
    *                       that the streams-under-test use for inputs and outputs. They need to be
    *                       created before running the streams and this is automatically
    *                       taken care of.
    * @param builder the streams builder that contains the stream topology that will be instantiated
    * @param block the block of testing code that will be executed by passing the simple
    *              String-based consumer.
    * @return the result of the testing code
    */
  def runStreamsWithStringConsumer(topicsToCreate: Seq[String], builder: TopologyBuilder)
                                  (block: KafkaConsumer[String, String] => Any)
                                  (implicit config: EmbeddedKafkaConfig): Any =
    runStreams(topicsToCreate, builder)(withStringConsumer[Any](block))(config)
}