1 change: 1 addition & 0 deletions build.sbt
@@ -10,6 +10,7 @@ lazy val commonSettings = Seq(
"org.scalatest" %% "scalatest" % "2.2.5",
"org.apache.kafka" %% "kafka" % "0.8.2.1",
"org.apache.zookeeper" % "zookeeper" % "3.4.6",
"org.apache.avro" % "avro" % "1.7.7",

"com.typesafe.akka" %% "akka-actor" % "2.3.11" % "test",
"com.typesafe.akka" %% "akka-testkit" % "2.3.11" % "test"
40 changes: 26 additions & 14 deletions src/main/scala/net/manub/embeddedkafka/EmbeddedKafka.scala
@@ -5,10 +5,10 @@ import java.util.Properties
import java.util.concurrent.Executors

import kafka.consumer.{Consumer, ConsumerConfig, Whitelist}
import kafka.serializer.StringDecoder
import kafka.serializer.{Decoder, StringDecoder}
import kafka.server.{KafkaConfig, KafkaServer}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.{Serializer, StringSerializer}
import org.apache.kafka.common.serialization.{Deserializer, Serializer, StringSerializer}
import org.apache.zookeeper.server.{ServerCnxnFactory, ZooKeeperServer}
import org.scalatest.Suite

@@ -46,6 +46,9 @@ trait EmbeddedKafka {
}
}

def publishStringMessageToKafka(topic: String, message: String)(implicit config: EmbeddedKafkaConfig): Unit =
publishToKafka(topic, message)(config, new StringSerializer)

/**
* Publishes asynchronously a message to the running Kafka broker.
*
@@ -55,24 +58,25 @@
* @throws KafkaUnavailableException if unable to connect to Kafka
*/
@throws(classOf[KafkaUnavailableException])
def publishToKafka(topic: String, message: String)(implicit config: EmbeddedKafkaConfig): Unit = {
def publishToKafka[T](topic: String, message: T)(implicit config: EmbeddedKafkaConfig, serializer: Serializer[T]): Unit = {

val kafkaProducer = new KafkaProducer[String, String](Map[String, String](
val kafkaProducer = new KafkaProducer(Map(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> s"localhost:${config.kafkaPort}",
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> classOf[StringSerializer].getName,
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> classOf[StringSerializer].getName,
ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG -> 3000.toString,
ProducerConfig.RETRY_BACKOFF_MS_CONFIG -> 1000.toString
))
), new StringSerializer, serializer)

val sendFuture = kafkaProducer.send(new ProducerRecord[String, String](topic, message))
val sendFuture = kafkaProducer.send(new ProducerRecord(topic, message))
val sendResult = Try { sendFuture.get(3, SECONDS) }

kafkaProducer.close()

if (sendResult.isFailure) throw new KafkaUnavailableException
}

def consumeFirstStringMessageFrom(topic: String)(implicit config: EmbeddedKafkaConfig): String =
consumeFirstMessageFrom(topic)(config, new StringDecoder())


/**
* Consumes the first message available in a given topic, deserializing it as a String.
@@ -85,7 +89,7 @@
*/
@throws(classOf[TimeoutException])
@throws(classOf[KafkaUnavailableException])
def consumeFirstMessageFrom(topic: String)(implicit config: EmbeddedKafkaConfig): String = {
def consumeFirstMessageFrom[T](topic: String)(implicit config: EmbeddedKafkaConfig, decoder: Decoder[T]): T = {
val props = new Properties()
props.put("group.id", s"embedded-kafka-spec-$suiteId")
props.put("zookeeper.connect", s"localhost:${config.zooKeeperPort}")
@@ -98,7 +102,7 @@

val filter = Whitelist(topic)
val messageStreams =
consumer.createMessageStreamsByFilter(filter, keyDecoder = new StringDecoder, valueDecoder = new StringDecoder)
consumer.createMessageStreamsByFilter(filter, keyDecoder = new StringDecoder, valueDecoder = decoder)

val messageFuture = Future { messageStreams.headOption
.getOrElse(throw new KafkaSpecException("Unable to find a message stream")).iterator().next().message()
@@ -113,13 +117,21 @@

object aKafkaProducer {
def thatSerializesValuesWith[V](serializer: Class[_ <: Serializer[V]])(implicit config: EmbeddedKafkaConfig) = {
new KafkaProducer[String, V](Map(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> s"localhost:${config.kafkaPort}",
new KafkaProducer[String, V]( basicKafkaConfig(config) + (
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> classOf[StringSerializer].getName,
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> serializer.getName,
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> serializer.getName
))
}

def apply[V](implicit valueSerializer: Serializer[V], config: EmbeddedKafkaConfig) =
new KafkaProducer[String, V](basicKafkaConfig(config), new StringSerializer, valueSerializer)

def basicKafkaConfig[V](config: EmbeddedKafkaConfig): Map[String, String] = {
Map(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> s"localhost:${config.kafkaPort}",
ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG -> 3000.toString,
ProducerConfig.RETRY_BACKOFF_MS_CONFIG -> 1000.toString
))
)
}
}
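
With these changes, publishToKafka and consumeFirstMessageFrom are generic in the message type: the producer side takes an implicit org.apache.kafka.common.serialization.Serializer[T] and the consumer side an implicit kafka.serializer.Decoder[T]. A minimal round-trip sketch of the new API (the spec name and topic are illustrative, and EmbeddedKafkaConfig is assumed to provide a default apply()):

import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import net.manub.embeddedkafka.marshalling._
import org.scalatest.{Matchers, WordSpec}

// Hypothetical spec: Serializer[String] and Decoder[String] are resolved
// from the marshalling package object imported above.
class StringRoundTripSpec extends WordSpec with Matchers with EmbeddedKafka {
  implicit val config = EmbeddedKafkaConfig()

  "the generic API" should {
    "round-trip a String message" in {
      withRunningKafka {
        publishToKafka("round_trip_topic", "a message")
        consumeFirstMessageFrom[String]("round_trip_topic") shouldBe "a message"
      }
    }
  }
}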

@@ -0,0 +1,60 @@
package net.manub.embeddedkafka.marshalling.avro

import java.io.ByteArrayOutputStream

import kafka.serializer.{Decoder, Encoder}
import kafka.utils.VerifiableProperties
import org.apache.avro.Schema
import org.apache.avro.io._
import org.apache.avro.specific.{SpecificDatumReader, SpecificDatumWriter, SpecificRecord}
import org.apache.kafka.common.serialization.{Deserializer, Serializer}

class KafkaAvroDecoder[T <: SpecificRecord](schema: Schema, props: VerifiableProperties = null) extends Decoder[T]{
private[this] val NoInstanceReuse = null.asInstanceOf[T]
private[this] val NoDecoderReuse = null.asInstanceOf[BinaryDecoder]
private[this] val reader: SpecificDatumReader[T] = new SpecificDatumReader[T](schema)

override def fromBytes(bytes: Array[Byte]): T = {
val decoder = DecoderFactory.get().binaryDecoder(bytes, NoDecoderReuse)
reader.read(NoInstanceReuse, decoder)
}
}

class KafkaAvroEncoder[T <: SpecificRecord](props: VerifiableProperties = null) extends Encoder[T] {
private[this] val NoEncoderReuse = null.asInstanceOf[BinaryEncoder]

override def toBytes(nullableData: T): Array[Byte] = {
Option(nullableData).fold[Array[Byte]](null) { data =>
val writer: DatumWriter[T] = new SpecificDatumWriter[T](data.getSchema)
val out = new ByteArrayOutputStream()
val encoder = EncoderFactory.get.binaryEncoder(out, NoEncoderReuse)

writer.write(data, encoder)
encoder.flush()
out.close()

out.toByteArray
}

}
}

class KafkaAvroDeserializer[T <: SpecificRecord](schema: Schema) extends Deserializer[T]{
private[this] val decoder = new KafkaAvroDecoder[T](schema = schema)

override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = {}

override def close(): Unit = {}

override def deserialize(topic: String, data: Array[Byte]): T = decoder.fromBytes(data)
}

class KafkaAvroSerializer[T <: SpecificRecord]() extends Serializer[T] {
private[this] val encoder = new KafkaAvroEncoder[T]()

override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = {}

override def serialize(topic: String, data: T): Array[Byte] = encoder.toBytes(data)

override def close(): Unit = {}
}
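
The encoder/decoder pair can be exercised without a broker; a minimal round-trip sketch, using the TestAvroClass record added later in this PR:

import net.manub.embeddedkafka.TestAvroClass
import net.manub.embeddedkafka.marshalling.avro.{KafkaAvroDecoder, KafkaAvroEncoder}

// Encode a SpecificRecord to Avro binary and decode it back; writer and
// reader use the same schema, so the round trip preserves the record.
object AvroRoundTripSketch extends App {
  val encoder = new KafkaAvroEncoder[TestAvroClass]()
  val decoder = new KafkaAvroDecoder[TestAvroClass](TestAvroClass.SCHEMA$)

  val original = TestAvroClass("name")
  val bytes = encoder.toBytes(original)    // raw Avro binary, no schema header
  assert(decoder.fromBytes(bytes) == original)
}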
@@ -0,0 +1,19 @@
package net.manub.embeddedkafka.marshalling

import kafka.serializer.{Encoder, Decoder}
import kafka.utils.VerifiableProperties
import org.apache.avro.Schema
import org.apache.avro.specific.SpecificRecord
import org.apache.kafka.common.serialization.{Deserializer, Serializer}

package object avro {
implicit def specificAvroSerializer[T <: SpecificRecord] : Serializer[T] = new KafkaAvroSerializer[T]
implicit def specificAvroEncoder[T <: SpecificRecord] : Encoder[T] = new KafkaAvroEncoder[T]

def specificAvroDeserializer[T <: SpecificRecord](schema: Schema) : Deserializer[T] =
new KafkaAvroDeserializer[T](schema)

def specificAvroDecoder[T <: SpecificRecord](schema: Schema, props: VerifiableProperties = null) : Decoder[T] =
new KafkaAvroDecoder[T](schema, props)
}
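
Serializer and Encoder resolve implicitly because they need no schema, while the decoding side takes the reader schema explicitly and is marked implicit at the call site. A short sketch of how a spec would wire this up:

import kafka.serializer.Decoder
import net.manub.embeddedkafka.TestAvroClass
import net.manub.embeddedkafka.marshalling.avro._
import org.apache.kafka.common.serialization.Serializer

object AvroImplicitsSketch extends App {
  // Picked up from the implicit defs above.
  val serializer = implicitly[Serializer[TestAvroClass]]

  // Built explicitly (the schema is required), then made implicit so
  // consumeFirstMessageFrom[TestAvroClass] can find it.
  implicit val decoder: Decoder[TestAvroClass] =
    specificAvroDecoder[TestAvroClass](TestAvroClass.SCHEMA$)

  println(s"${serializer.getClass.getSimpleName} / ${decoder.getClass.getSimpleName}")
}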

@@ -0,0 +1,17 @@
package net.manub.embeddedkafka

import kafka.serializer._
import org.apache.kafka.common.serialization._


package object marshalling {
implicit val stringEncoder: Encoder[String] = new StringEncoder()
implicit val nullEncoder: Encoder[Array[Byte]] = new DefaultEncoder()
implicit val stringSerializer: Serializer[String] = new StringSerializer()
implicit val nullSerializer: Serializer[Array[Byte]] = new ByteArraySerializer()

implicit val stringDecoder: Decoder[String] = new StringDecoder()
implicit val nullDecoder: Decoder[Array[Byte]] = new DefaultDecoder()
implicit val stringDeserializer: Deserializer[String] = new StringDeserializer()
implicit val nullDeserializer: Deserializer[Array[Byte]] = new ByteArrayDeserializer()
}
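
These defaults mean a plain import marshalling._ is enough for String and Array[Byte] payloads on both the new-producer (Serializer/Deserializer) and old-consumer (Encoder/Decoder) APIs. A quick round-trip sketch:

import net.manub.embeddedkafka.marshalling._

object DefaultMarshallingSketch extends App {
  // New-producer API: Serializer/Deserializer work per topic.
  val bytes = stringSerializer.serialize("any_topic", "hello")
  assert(stringDeserializer.deserialize("any_topic", bytes) == "hello")

  // Old-consumer API: Decoder works on raw byte arrays.
  assert(stringDecoder.fromBytes(bytes) == "hello")
}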
79 changes: 72 additions & 7 deletions src/test/scala/net/manub/embeddedkafka/EmbeddedKafkaSpec.scala
@@ -107,14 +107,14 @@ class EmbeddedKafkaSpec

"the publishToKafka method" should {

"publishes asynchronously a message to Kafka as String" in {
"publishes asynchronously a message to Kafka" in {

withRunningKafka {

val message = "hello world!"
val topic = "test_topic"

publishToKafka(topic, message)
publishStringMessageToKafka(topic, message)

val consumer = Consumer.create(consumerConfigForEmbeddedKafka)

@@ -142,9 +142,8 @@
}

"throws a KafkaUnavailableException when Kafka is unavailable when trying to publish" in {

a[KafkaUnavailableException] shouldBe thrownBy {
publishToKafka("non_existing_topic", "a message")
publishStringMessageToKafka("non_existing_topic", "a message")
}
}
}
@@ -165,7 +164,50 @@
))

whenReady(producer.send(new ProducerRecord[String, String](topic, message))) { _ =>
consumeFirstMessageFrom(topic) shouldBe message
consumeFirstStringMessageFrom(topic) shouldBe message
}

producer.close()
}
}

"returns a message published to a topic with implicit decoder" in {

withRunningKafka {

val message = "hello world!"
val topic = "test_topic"

val producer = new KafkaProducer[String, String](Map(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> s"localhost:6001",
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> classOf[StringSerializer].getName,
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> classOf[StringSerializer].getName
))

import marshalling._
whenReady(producer.send(new ProducerRecord[String, String](topic, message))) { _ =>
consumeFirstMessageFrom[Array[Byte]](topic) shouldBe message.getBytes
}

producer.close()
}
}

"return a message published to a topic with custom decoder" in {

import marshalling.avro._
withRunningKafka {

val message = TestAvroClass("name")
val topic = "test_topic"
implicit val testAvroClassDecoder = specificAvroDecoder[TestAvroClass](TestAvroClass.SCHEMA$)

val producer = new KafkaProducer[String, TestAvroClass](Map(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> s"localhost:6001"
), new StringSerializer, specificAvroSerializer[TestAvroClass])

whenReady(producer.send(new ProducerRecord(topic, message))) { _ =>
consumeFirstMessageFrom[TestAvroClass](topic) shouldBe message
}

producer.close()
@@ -176,15 +218,15 @@

withRunningKafka {
a[TimeoutException] shouldBe thrownBy {
consumeFirstMessageFrom("non_existing_topic")
consumeFirstStringMessageFrom("non_existing_topic")
}
}
}

"throws a KafkaUnavailableException when there's no running instance of Kafka" in {

a[KafkaUnavailableException] shouldBe thrownBy {
consumeFirstMessageFrom("non_existing_topic")
consumeFirstStringMessageFrom("non_existing_topic")
}
}
}
@@ -201,6 +243,27 @@
}
}

"the aKafkaProducer object" should {

"return a producer that encodes messages for the given type" in {
import marshalling._
withRunningKafka {
val producer = aKafkaProducer[String]
producer.send(new ProducerRecord[String, String]("a topic", "a message"))
}
}


"return a producer that encodes messages for a custom type" in {
import marshalling.avro._

withRunningKafka {
val producer = aKafkaProducer[TestAvroClass]
producer.send(new ProducerRecord[String, TestAvroClass]("a topic", TestAvroClass("name")))
}
}
}

lazy val consumerConfigForEmbeddedKafka: ConsumerConfig = {
val props = new Properties()
props.put("group.id", "test")
@@ -211,6 +274,8 @@
}
}



object TcpClient {
def props(remote: InetSocketAddress, replies: ActorRef) = Props(classOf[TcpClient], remote, replies)
}
37 changes: 37 additions & 0 deletions src/test/scala/net/manub/embeddedkafka/TestAvroClass.scala
@@ -0,0 +1,37 @@
package net.manub.embeddedkafka

import org.apache.avro.specific.SpecificRecordBase
import org.apache.avro.{AvroRuntimeException, Schema}

case class TestAvroClass(var name: String) extends SpecificRecordBase {
def this() = this("")

override def get(i: Int): AnyRef = i match {
case 0 => name
case _ => throw new AvroRuntimeException("Bad index")
}

override def put(i: Int, v: scala.Any): Unit = i match {
case 0 => name = v match {
case (utf8: org.apache.avro.util.Utf8) => utf8.toString
case _ => v.asInstanceOf[String]
}
case _ => throw new AvroRuntimeException("Bad index")
}

override def getSchema: Schema = TestAvroClass.SCHEMA$
}

object TestAvroClass {
val SCHEMA$ = (new Schema.Parser).parse(
"""
|{"namespace": "example",
| "type": "record",
| "namespace": "net.manub.embeddedkafka",
| "name": "TestAvroClass",
| "fields": [
| {"name": "name", "type": "string"}
| ]
|}
""".stripMargin )
}
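
A hand-written SpecificRecordBase has to keep get/put indices aligned with the field order in SCHEMA$, and put must accept Avro's Utf8 as well as String. A small sketch of that contract:

import net.manub.embeddedkafka.TestAvroClass
import org.apache.avro.util.Utf8

object RecordContractSketch extends App {
  val record = new TestAvroClass()      // no-arg constructor, used by Avro
  record.put(0, new Utf8("from-avro"))  // Avro may hand back Utf8, not String
  assert(record.get(0) == "from-avro")
  assert(record.getSchema.getFields.get(0).name == "name")
}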