From 0fa5cf34c57c48dcef91d39341e4a2ba91c0738c Mon Sep 17 00:00:00 2001 From: galarragas Date: Mon, 3 Aug 2015 19:48:59 +0100 Subject: [PATCH] Some refactoring to support: - Definition of publisher using just the class type (and using implicits) - Type-specific publish and receive methods - AVRO support --- build.sbt | 1 + .../manub/embeddedkafka/EmbeddedKafka.scala | 40 ++++++---- .../marshalling/avro/marshallers.scala | 60 ++++++++++++++ .../marshalling/avro/package.scala | 19 +++++ .../marshalling/marshalling.scala | 17 ++++ .../embeddedkafka/EmbeddedKafkaSpec.scala | 79 +++++++++++++++++-- .../manub/embeddedkafka/TestAvroClass.scala | 37 +++++++++ 7 files changed, 232 insertions(+), 21 deletions(-) create mode 100644 src/main/scala/net/manub/embeddedkafka/marshalling/avro/marshallers.scala create mode 100644 src/main/scala/net/manub/embeddedkafka/marshalling/avro/package.scala create mode 100644 src/main/scala/net/manub/embeddedkafka/marshalling/marshalling.scala create mode 100644 src/test/scala/net/manub/embeddedkafka/TestAvroClass.scala diff --git a/build.sbt b/build.sbt index dfdf14f..2ab611f 100644 --- a/build.sbt +++ b/build.sbt @@ -10,6 +10,7 @@ lazy val commonSettings = Seq( "org.scalatest" %% "scalatest" % "2.2.5", "org.apache.kafka" %% "kafka" % "0.8.2.1", "org.apache.zookeeper" % "zookeeper" % "3.4.6", + "org.apache.avro" % "avro" % "1.7.7", "com.typesafe.akka" %% "akka-actor" % "2.3.11" % "test", "com.typesafe.akka" %% "akka-testkit" % "2.3.11" % "test" diff --git a/src/main/scala/net/manub/embeddedkafka/EmbeddedKafka.scala b/src/main/scala/net/manub/embeddedkafka/EmbeddedKafka.scala index ecd4fe9..709ae04 100644 --- a/src/main/scala/net/manub/embeddedkafka/EmbeddedKafka.scala +++ b/src/main/scala/net/manub/embeddedkafka/EmbeddedKafka.scala @@ -5,10 +5,10 @@ import java.util.Properties import java.util.concurrent.Executors import kafka.consumer.{Consumer, ConsumerConfig, Whitelist} -import kafka.serializer.StringDecoder +import kafka.serializer.{Decoder, StringDecoder} import kafka.server.{KafkaConfig, KafkaServer} import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} -import org.apache.kafka.common.serialization.{Serializer, StringSerializer} +import org.apache.kafka.common.serialization.{Deserializer, Serializer, StringSerializer} import org.apache.zookeeper.server.{ServerCnxnFactory, ZooKeeperServer} import org.scalatest.Suite @@ -46,6 +46,9 @@ trait EmbeddedKafka { } } + def publishStringMessageToKafka(topic: String, message: String)(implicit config: EmbeddedKafkaConfig): Unit = + publishToKafka(topic, message)(config, new StringSerializer) + /** * Publishes asynchronously a message to the running Kafka broker. 
* @@ -55,17 +58,15 @@ trait EmbeddedKafka { * @throws KafkaUnavailableException if unable to connect to Kafka */ @throws(classOf[KafkaUnavailableException]) - def publishToKafka(topic: String, message: String)(implicit config: EmbeddedKafkaConfig): Unit = { + def publishToKafka[T](topic: String, message: T)(implicit config: EmbeddedKafkaConfig, serializer: Serializer[T]): Unit = { - val kafkaProducer = new KafkaProducer[String, String](Map[String, String]( + val kafkaProducer = new KafkaProducer(Map( ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> s"localhost:${config.kafkaPort}", - ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> classOf[StringSerializer].getName, - ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> classOf[StringSerializer].getName, ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG -> 3000.toString, ProducerConfig.RETRY_BACKOFF_MS_CONFIG -> 1000.toString - )) + ), new StringSerializer, serializer) - val sendFuture = kafkaProducer.send(new ProducerRecord[String, String](topic, message)) + val sendFuture = kafkaProducer.send(new ProducerRecord(topic, message)) val sendResult = Try { sendFuture.get(3, SECONDS) } kafkaProducer.close() @@ -73,6 +74,9 @@ trait EmbeddedKafka { if (sendResult.isFailure) throw new KafkaUnavailableException } + def consumeFirstStringMessageFrom(topic: String)(implicit config: EmbeddedKafkaConfig): String = + consumeFirstMessageFrom(topic)(config, new StringDecoder()) + /** * Consumes the first message available in a given topic, deserializing it as a String. @@ -85,7 +89,7 @@ trait EmbeddedKafka { */ @throws(classOf[TimeoutException]) @throws(classOf[KafkaUnavailableException]) - def consumeFirstMessageFrom(topic: String)(implicit config: EmbeddedKafkaConfig): String = { + def consumeFirstMessageFrom[T](topic: String)(implicit config: EmbeddedKafkaConfig, decoder: Decoder[T]): T = { val props = new Properties() props.put("group.id", s"embedded-kafka-spec-$suiteId") props.put("zookeeper.connect", s"localhost:${config.zooKeeperPort}") @@ -98,7 +102,7 @@ trait EmbeddedKafka { val filter = Whitelist(topic) val messageStreams = - consumer.createMessageStreamsByFilter(filter, keyDecoder = new StringDecoder, valueDecoder = new StringDecoder) + consumer.createMessageStreamsByFilter(filter, keyDecoder = new StringDecoder, valueDecoder = decoder) val messageFuture = Future { messageStreams.headOption .getOrElse(throw new KafkaSpecException("Unable to find a message stream")).iterator().next().message() @@ -113,13 +117,21 @@ trait EmbeddedKafka { object aKafkaProducer { def thatSerializesValuesWith[V](serializer: Class[_ <: Serializer[V]])(implicit config: EmbeddedKafkaConfig) = { - new KafkaProducer[String, V](Map( - ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> s"localhost:${config.kafkaPort}", + new KafkaProducer[String, V]( basicKafkaConfig(config) + ( ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> classOf[StringSerializer].getName, - ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> serializer.getName, + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> serializer.getName + )) + } + + def apply[V](implicit valueSerializer: Serializer[V], config: EmbeddedKafkaConfig) = + new KafkaProducer[String, V](basicKafkaConfig(config), new StringSerializer, valueSerializer) + + def basicKafkaConfig[V](config: EmbeddedKafkaConfig): Map[String, String] = { + Map( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> s"localhost:${config.kafkaPort}", ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG -> 3000.toString, ProducerConfig.RETRY_BACKOFF_MS_CONFIG -> 1000.toString - )) + ) } } diff --git 
a/src/main/scala/net/manub/embeddedkafka/marshalling/avro/marshallers.scala b/src/main/scala/net/manub/embeddedkafka/marshalling/avro/marshallers.scala new file mode 100644 index 0000000..9266dcc --- /dev/null +++ b/src/main/scala/net/manub/embeddedkafka/marshalling/avro/marshallers.scala @@ -0,0 +1,60 @@ +package net.manub.embeddedkafka.marshalling.avro + +import java.io.ByteArrayOutputStream + +import kafka.serializer.{Decoder, Encoder} +import kafka.utils.VerifiableProperties +import org.apache.avro.Schema +import org.apache.avro.io._ +import org.apache.avro.specific.{SpecificDatumReader, SpecificDatumWriter, SpecificRecord} +import org.apache.kafka.common.serialization.{Deserializer, Serializer} + +class KafkaAvroDecoder[T <: SpecificRecord](schema: Schema, props: VerifiableProperties = null) extends Decoder[T]{ + private[this] val NoInstanceReuse = null.asInstanceOf[T] + private[this] val NoDecoderReuse = null.asInstanceOf[BinaryDecoder] + private[this] val reader: SpecificDatumReader[T] = new SpecificDatumReader[T](schema) + + override def fromBytes(bytes: Array[Byte]): T = { + val decoder = DecoderFactory.get().binaryDecoder(bytes, NoDecoderReuse) + reader.read(NoInstanceReuse, decoder) + } +} + +class KafkaAvroEncoder[T <: SpecificRecord](props: VerifiableProperties = null) extends Encoder[T] { + private[this] val NoEncoderReuse = null.asInstanceOf[BinaryEncoder] + + override def toBytes(nullableData: T): Array[Byte] = { + Option(nullableData).fold[Array[Byte]](null) { data => + val writer: DatumWriter[T] = new SpecificDatumWriter[T](data.getSchema) + val out = new ByteArrayOutputStream() + val encoder = EncoderFactory.get.binaryEncoder(out, NoEncoderReuse) + + writer.write(data, encoder) + encoder.flush() + out.close() + + out.toByteArray + } + + } +} + +class KafkaAvroDeserializer[T <: SpecificRecord](schema: Schema) extends Deserializer[T]{ + private[this] val decoder = new KafkaAvroDecoder[T](schema = schema) + + override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = {} + + override def close(): Unit = {} + + override def deserialize(topic: String, data: Array[Byte]): T = decoder.fromBytes(data) +} + +class KafkaAvroSerializer[T <: SpecificRecord]() extends Serializer[T] { + private[this] val encoder = new KafkaAvroEncoder[T]() + + override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = {} + + override def serialize(topic: String, data: T): Array[Byte] = encoder.toBytes(data) + + override def close(): Unit = {} +} \ No newline at end of file diff --git a/src/main/scala/net/manub/embeddedkafka/marshalling/avro/package.scala b/src/main/scala/net/manub/embeddedkafka/marshalling/avro/package.scala new file mode 100644 index 0000000..a5fee34 --- /dev/null +++ b/src/main/scala/net/manub/embeddedkafka/marshalling/avro/package.scala @@ -0,0 +1,19 @@ +package net.manub.embeddedkafka.marshalling + +import kafka.serializer.{Encoder, Decoder} +import kafka.utils.VerifiableProperties +import org.apache.avro.Schema +import org.apache.avro.specific.SpecificRecord +import org.apache.kafka.common.serialization.{Deserializer, Serializer} + +package object avro { + implicit def specificAvroSerializer[T <: SpecificRecord] : Serializer[T] = new KafkaAvroSerializer[T] + implicit def specificAvroEncoder[T <: SpecificRecord] : Encoder[T] = new KafkaAvroEncoder[T] + + def specificAvroDeserializer[T <: SpecificRecord](schema: Schema) : Deserializer[T] = + new KafkaAvroDeserializer[T](schema) + + def specificAvroDecoder[T <: SpecificRecord](schema: 
Schema, props: VerifiableProperties = null) : Decoder[T] = + new KafkaAvroDecoder[T](schema, props) +} + diff --git a/src/main/scala/net/manub/embeddedkafka/marshalling/marshalling.scala b/src/main/scala/net/manub/embeddedkafka/marshalling/marshalling.scala new file mode 100644 index 0000000..5a606f0 --- /dev/null +++ b/src/main/scala/net/manub/embeddedkafka/marshalling/marshalling.scala @@ -0,0 +1,17 @@ +package net.manub.embeddedkafka + +import kafka.serializer._ +import org.apache.kafka.common.serialization._ + + +package object marshalling { + implicit val stringEncoder: Encoder[String] = new StringEncoder() + implicit val nullEncoder: Encoder[Array[Byte]] = new DefaultEncoder() + implicit val stringSerializer: Serializer[String] = new StringSerializer() + implicit val nullSerializer: Serializer[Array[Byte]] = new ByteArraySerializer() + + implicit val stringDecoder: Decoder[String] = new StringDecoder() + implicit val nullDecoder: Decoder[Array[Byte]] = new DefaultDecoder() + implicit val stringDeserializer: Deserializer[String] = new StringDeserializer() + implicit val nullDeserializer: Deserializer[Array[Byte]] = new ByteArrayDeserializer() +} diff --git a/src/test/scala/net/manub/embeddedkafka/EmbeddedKafkaSpec.scala b/src/test/scala/net/manub/embeddedkafka/EmbeddedKafkaSpec.scala index 5b391ae..eb2308c 100644 --- a/src/test/scala/net/manub/embeddedkafka/EmbeddedKafkaSpec.scala +++ b/src/test/scala/net/manub/embeddedkafka/EmbeddedKafkaSpec.scala @@ -107,14 +107,14 @@ class EmbeddedKafkaSpec "the publishToKafka method" should { - "publishes asynchronously a message to Kafka as String" in { + "publishes asynchronously a message to Kafka" in { withRunningKafka { val message = "hello world!" val topic = "test_topic" - publishToKafka(topic, message) + publishStringMessageToKafka(topic, message) val consumer = Consumer.create(consumerConfigForEmbeddedKafka) @@ -142,9 +142,8 @@ class EmbeddedKafkaSpec } "throws a KafkaUnavailableException when Kafka is unavailable when trying to publish" in { - a[KafkaUnavailableException] shouldBe thrownBy { - publishToKafka("non_existing_topic", "a message") + publishStringMessageToKafka("non_existing_topic", "a message") } } } @@ -165,7 +164,50 @@ class EmbeddedKafkaSpec )) whenReady(producer.send(new ProducerRecord[String, String](topic, message))) { _ => - consumeFirstMessageFrom(topic) shouldBe message + consumeFirstStringMessageFrom(topic) shouldBe message + } + + producer.close() + } + } + + "returns a message published to a topic with implicit decoder" in { + + withRunningKafka { + + val message = "hello world!" 
+ val topic = "test_topic" + + val producer = new KafkaProducer[String, String](Map( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> s"localhost:6001", + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> classOf[StringSerializer].getName, + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> classOf[StringSerializer].getName + )) + + import marshalling._ + whenReady(producer.send(new ProducerRecord[String, String](topic, message))) { _ => + consumeFirstMessageFrom[Array[Byte]](topic) shouldBe message.getBytes + } + + producer.close() + } + } + + "return a message published to a topic with custom decoder" in { + + import marshalling.avro._ + withRunningKafka { + + val message = TestAvroClass("name") + val topic = "test_topic" + implicit val testAvroClassDecoder = specificAvroDecoder[TestAvroClass](TestAvroClass.SCHEMA$) + + val producer = new KafkaProducer[String, TestAvroClass](Map( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> s"localhost:6001" + ), new StringSerializer, specificAvroSerializer[TestAvroClass]) + + whenReady(producer.send(new ProducerRecord(topic, message))) { _ => + consumeFirstMessageFrom[TestAvroClass](topic) shouldBe message } producer.close() @@ -176,7 +218,7 @@ class EmbeddedKafkaSpec withRunningKafka { a[TimeoutException] shouldBe thrownBy { - consumeFirstMessageFrom("non_existing_topic") + consumeFirstStringMessageFrom("non_existing_topic") } } } @@ -184,7 +226,7 @@ class EmbeddedKafkaSpec "throws a KafkaUnavailableException when there's no running instance of Kafka" in { a[KafkaUnavailableException] shouldBe thrownBy { - consumeFirstMessageFrom("non_existing_topic") + consumeFirstStringMessageFrom("non_existing_topic") } } } @@ -201,6 +243,27 @@ class EmbeddedKafkaSpec } } + "the aKafkaProducer object" should { + + "return a producer that encodes messages for the given type" in { + import marshalling._ + withRunningKafka { + val producer = aKafkaProducer[String] + producer.send(new ProducerRecord[String, String]("a topic", "a message")) + } + } + + + "return a producer that encodes messages for a custom type" in { + import marshalling.avro._ + + withRunningKafka { + val producer = aKafkaProducer[TestAvroClass] + producer.send(new ProducerRecord[String, TestAvroClass]("a topic", TestAvroClass("name"))) + } + } + } + lazy val consumerConfigForEmbeddedKafka: ConsumerConfig = { val props = new Properties() props.put("group.id", "test") @@ -211,6 +274,8 @@ class EmbeddedKafkaSpec } } + + object TcpClient { def props(remote: InetSocketAddress, replies: ActorRef) = Props(classOf[TcpClient], remote, replies) } diff --git a/src/test/scala/net/manub/embeddedkafka/TestAvroClass.scala b/src/test/scala/net/manub/embeddedkafka/TestAvroClass.scala new file mode 100644 index 0000000..62e9f3e --- /dev/null +++ b/src/test/scala/net/manub/embeddedkafka/TestAvroClass.scala @@ -0,0 +1,37 @@ +package net.manub.embeddedkafka + +import org.apache.avro.specific.SpecificRecordBase +import org.apache.avro.{AvroRuntimeException, Schema} + +case class TestAvroClass(var name: String) extends SpecificRecordBase { + def this() = this("") + + override def get(i: Int): AnyRef = i match { + case 0 => name + case _ => throw new AvroRuntimeException("Bad index") + } + + override def put(i: Int, v: scala.Any): Unit = i match { + case 0 => name = v match { + case (utf8: org.apache.avro.util.Utf8) => utf8.toString + case _ => v.asInstanceOf[String] + } + case _ => throw new AvroRuntimeException("Bad index") + } + + override def getSchema: Schema = TestAvroClass.SCHEMA$ +} + +object TestAvroClass { + val SCHEMA$ = 
(new Schema.Parser).parse(
+    """
+      |{
+      | "type": "record",
+      | "namespace": "net.manub.embeddedkafka",
+      | "name": "TestAvroClass",
+      | "fields": [
+      |   {"name": "name", "type": "string"}
+      | ]
+      |}
+    """.stripMargin )
+}
\ No newline at end of file
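
A minimal usage sketch of the new typed publish/consume API introduced by this patch (not part of the diff itself). It assumes a ScalaTest suite mixing in EmbeddedKafka and that EmbeddedKafkaConfig() defaults to the ports used in the spec above; the suite name, topics, and messages are illustrative. Method and implicit names are taken directly from the diff.

import net.manub.embeddedkafka.marshalling._
import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.scalatest.{Matchers, WordSpec}

class TypedApiUsageSpec extends WordSpec with Matchers with EmbeddedKafka {

  // Assumption: EmbeddedKafkaConfig() defaults to the ports used in
  // EmbeddedKafkaSpec (Kafka on 6001, ZooKeeper on 6000); override if not.
  implicit val config = EmbeddedKafkaConfig()

  "the typed API" should {
    "publish and consume messages with implicitly resolved (de)serializers" in {
      withRunningKafka {
        // String-specific helpers added by this patch
        publishStringMessageToKafka("test_topic", "hello world!")
        consumeFirstStringMessageFrom("test_topic") shouldBe "hello world!"

        // The generic variants resolve Serializer[T] / Decoder[T] from
        // the implicits in net.manub.embeddedkafka.marshalling
        publishToKafka("bytes_topic", "hello".getBytes)
        consumeFirstMessageFrom[Array[Byte]]("bytes_topic") shouldBe "hello".getBytes
      }
    }
  }
}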
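A companion sketch for the Avro support, mirroring the TestAvroClass round trip in EmbeddedKafkaSpec; the topic name is illustrative and the same EmbeddedKafkaConfig assumption applies. Serializers for SpecificRecord types are resolved implicitly, while decoders need the writer schema and are declared explicitly.

import net.manub.embeddedkafka.marshalling.avro._
import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig, TestAvroClass}
import org.scalatest.{Matchers, WordSpec}

class AvroUsageSpec extends WordSpec with Matchers with EmbeddedKafka {

  implicit val config = EmbeddedKafkaConfig()

  // Decoders are not provided implicitly because they need the record's
  // schema; declare one explicitly for the type under test.
  implicit val testAvroClassDecoder =
    specificAvroDecoder[TestAvroClass](TestAvroClass.SCHEMA$)

  "the Avro support" should {
    "round-trip a SpecificRecord through the embedded broker" in {
      withRunningKafka {
        // specificAvroSerializer[TestAvroClass] is picked up implicitly
        publishToKafka("avro_topic", TestAvroClass("name"))
        consumeFirstMessageFrom[TestAvroClass]("avro_topic") shouldBe TestAvroClass("name")
      }
    }
  }
}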