From 4fc1f69501c63c3be6b23195db5ddbd00197587b Mon Sep 17 00:00:00 2001 From: akushka Date: Tue, 19 Sep 2017 12:59:20 +0200 Subject: [PATCH] Implemented circe deserializer with fallback --- .../circe/CirceSerialization.scala | 16 +++++++++------ .../circe/CirceSerializationSpec.scala | 20 +++++++++++++++++-- 2 files changed, 28 insertions(+), 8 deletions(-) diff --git a/circe/src/main/scala/com/ovoenergy/kafka/serialization/circe/CirceSerialization.scala b/circe/src/main/scala/com/ovoenergy/kafka/serialization/circe/CirceSerialization.scala index 5c6961b..7a26211 100644 --- a/circe/src/main/scala/com/ovoenergy/kafka/serialization/circe/CirceSerialization.scala +++ b/circe/src/main/scala/com/ovoenergy/kafka/serialization/circe/CirceSerialization.scala @@ -15,12 +15,16 @@ private[circe] trait CirceSerialization { data.asJson.noSpaces.getBytes(StandardCharsets.UTF_8) } - def circeJsonDeserializer[T: Decoder]: KafkaDeserializer[T] = deserializer { (_, data) => - (for { - json <- parse(new String(data, StandardCharsets.UTF_8)): Either[Error, Json] - t <- json.as[T]: Either[Error, T] - } yield - t).fold(error => throw new RuntimeException(s"Deserialization failure: ${error.getMessage}", error), identity _) + def circeJsonDeserializer[T: Decoder]: KafkaDeserializer[T] = + circeJsonDeserializerWithFallback { error => + throw new RuntimeException(s"Deserialization failure: ${error.getMessage}", error) + } + + def circeJsonDeserializerWithFallback[T: Decoder](fallback: Error => T): KafkaDeserializer[T] = deserializer { (_, data) => + parse(new String(data, StandardCharsets.UTF_8)) + .flatMap(_.as[T]) + .fold(fallback, identity _) } + } diff --git a/circe/src/test/scala/com/ovoenergy/kafka/serialization/circe/CirceSerializationSpec.scala b/circe/src/test/scala/com/ovoenergy/kafka/serialization/circe/CirceSerializationSpec.scala index 2bd31a9..c5442d0 100644 --- a/circe/src/test/scala/com/ovoenergy/kafka/serialization/circe/CirceSerializationSpec.scala +++ b/circe/src/test/scala/com/ovoenergy/kafka/serialization/circe/CirceSerializationSpec.scala @@ -11,7 +11,7 @@ import io.circe.syntax._ class CirceSerializationSpec extends UnitSpec with CirceSerialization { "CirceSerialization" when { - "serializing" should { + "circeJsonSerializer" should { "write the Json body" in forAll { event: Event => val serializer = circeJsonSerializer[Event] val bytes = serializer.serialize("Does not matter", event) @@ -20,15 +20,31 @@ class CirceSerializationSpec extends UnitSpec with CirceSerialization { } } - "deserializing" should { + "circeJsonDeserializer" should { "parse the json" in forAll { event: Event => val jsonBytes = event.asJson.noSpaces.getBytes(UTF_8) val deserializer = circeJsonDeserializer[Event] + val deserialized = deserializer.deserialize("does not matter", jsonBytes) + + deserialized shouldBe event + } + } + "circeJsonDeserializerWithFallback" should { + "parse the json" in forAll { event: Event => + val jsonBytes = event.asJson.noSpaces.getBytes(UTF_8) + val deserializer = circeJsonDeserializerWithFallback[Event](_ => Event("", "")) val deserialized = deserializer.deserialize("does not matter", jsonBytes) deserialized shouldBe event } + "execute fallback function in case of failure" in forAll { event: Event => + val jsonBytes = "{}".getBytes(UTF_8) + val deserializer = circeJsonDeserializerWithFallback[Event](_ => Event("", "")) + val deserialized = deserializer.deserialize("does not matter", jsonBytes) + + deserialized shouldBe Event("", "") + } } }