Scala Library for Unit-Testing Processing Topologies in Apache Kafka / Kafka Streams
Scala
Latest commit 890eeee Feb 13, 2017 @jpzk committed on GitHub Merge pull request #9 from hrafzali/feature/state
Added stateTable
Permalink
Failed to load latest commit information.
project Removed scoverage plugin Oct 24, 2016
src Updated stateTable tests Feb 13, 2017
.gitignore added .gitignore Oct 3, 2016
.travis.yml added travis.yml Sep 28, 2016
CHANGELOG sbt: changed version of Kafka Jan 3, 2017
LICENSE added LICENSE Sep 28, 2016
README.md Update README.md Jan 3, 2017
build.sbt sbt: version bump Jan 3, 2017

README.md

Mocked Streams

Build Status License GitHub stars

Mocked Streams 1.0 (git) is a library for Scala >= 2.11.8 which allows you to unit-test processing topologies of Kafka Streams applications (since Apache Kafka >=0.10.1) without Zookeeper and Kafka Brokers. Further, you can use your favourite Scala testing framework e.g. ScalaTest and Specs2. Mocked Streams is located at the Maven Central Repository, therefore you just have to add the following to your SBT dependencies:

libraryDependencies += "com.madewithtea" %% "mockedstreams" % "1.1.0" % "test"

Apache Kafka Compatibility

Mocked Streams Version Apache Kafka Version
1.1.0 0.10.1.1
1.0.0 0.10.1.0

Simple Example

It wraps the org.apache.kafka.test.ProcessorTopologyTestDriver class, but adds more syntactic sugar to keep your test code simple:

import com.madewithtea.mockedstreams.MockedStreams

val input = Seq(("x", "v1"), ("y", "v2"))
val exp = Seq(("x", "V1"), ("y", "V2"))
val strings = Serdes.String()

MockedStreams()
  .topology { builder => builder.stream(...) [...] }
  .input("topic-in", strings, strings, input)
  .output("topic-out", strings, strings, exp.size) shouldEqual exp

Multiple Input / Output Example and State

It also allows you to have multiple input and output streams. If your topology uses state stores you need to define them using .stores(Seq[String]):

import com.madewithtea.mockedstreams.MockedStreams

val mstreams = MockedStreams()
  .topology { builder => builder.stream(...) [...] }
  .input("in-a", strings, ints, inputA)
  .input("in-b", strings, ints, inputB)
  .stores(Seq("store-name"))

mstreams.output("out-a", strings, ints, expA.size) shouldEqual(expectedA)
mstreams.output("out-b", strings, ints, expB.size) shouldEqual(expectedB)

Custom Streams Configuration

Sometimes you need to pass a custom configuration to Kafka Streams:

import com.madewithtea.mockedstreams.MockedStreams

  val props = new Properties
  props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, classOf[CustomExtractor].getName)

  val mstreams = MockedStreams()
  .topology { builder => builder.stream(...) [...] }
  .config(props)
  .input("in-a", strings, ints, inputA)
  .input("in-b", strings, ints, inputB)
  .stores(Seq("store-name"))

mstreams.output("out-a", strings, ints, expA.size) shouldEqual(expectedA)
mstreams.output("out-b", strings, ints, expB.size) shouldEqual(expectedB)