Skip to content

Commit

Permalink
feat: add an avro serde endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
jgiovaresco committed Apr 13, 2023
1 parent 278d8e0 commit c1a6d30
Show file tree
Hide file tree
Showing 9 changed files with 279 additions and 41 deletions.
9 changes: 9 additions & 0 deletions app/src/main/kotlin/io/apim/samples/avro/JsonSerDe.kt
@@ -1,7 +1,10 @@
package io.apim.samples.avro

import org.apache.avro.Schema
import org.apache.avro.generic.GenericDatumReader
import org.apache.avro.generic.GenericDatumWriter
import org.apache.avro.io.DatumReader
import org.apache.avro.io.DecoderFactory
import org.apache.avro.io.EncoderFactory
import java.io.ByteArrayOutputStream
import java.nio.charset.StandardCharsets
Expand All @@ -19,4 +22,10 @@ class JsonSerDe(private val schema: Schema) {

return output.toString(StandardCharsets.UTF_8)
}

fun deserialize(json: String): Any {
val decoder = DecoderFactory.get().jsonDecoder(schema, json)
val reader: DatumReader<Any> = GenericDatumReader(schema)
return reader.read(null, decoder)
}
}
14 changes: 0 additions & 14 deletions app/src/main/kotlin/io/apim/samples/rest/EchoHandler.kt
Expand Up @@ -6,7 +6,6 @@ import io.vertx.core.json.JsonObject
import io.vertx.ext.web.impl.ParsableMIMEValue
import io.vertx.kotlin.core.json.json
import io.vertx.kotlin.core.json.obj
import io.vertx.rxjava3.core.MultiMap
import io.vertx.rxjava3.ext.web.ParsedHeaderValues
import io.vertx.rxjava3.ext.web.RequestBody
import io.vertx.rxjava3.ext.web.RoutingContext
Expand Down Expand Up @@ -67,16 +66,3 @@ fun handleBody(body: RequestBody, headers: ParsedHeaderValues): JsonObject {
)
}
}

/** Transform a MultiMap into a simple map. Multiple values are joined in a string separated with ; */
fun MultiMap.toSimpleMap() = this.entries()
.groupBy { it.key }
.mapValues { it.value.joinToString(";") { h -> h.value } }

fun ParsableMIMEValue.isText(): Boolean {
return this.component() == "text"
}

fun ParsableMIMEValue.isJson(): Boolean {
return this.component() == "application" && this.subComponent().contains("json")
}
21 changes: 21 additions & 0 deletions app/src/main/kotlin/io/apim/samples/rest/RequestHelper.kt
@@ -0,0 +1,21 @@
package io.apim.samples.rest

import io.vertx.ext.web.impl.ParsableMIMEValue
import io.vertx.rxjava3.core.MultiMap

/** Transform a MultiMap into a simple map. Multiple values are joined in a string separated with ; */
fun MultiMap.toSimpleMap() = this.entries()
.groupBy { it.key }
.mapValues { it.value.joinToString(";") { h -> h.value } }

fun ParsableMIMEValue.isText(): Boolean {
return this.component() == "text"
}

fun ParsableMIMEValue.isJson(): Boolean {
return this.component() == "application" && this.subComponent().contains("json")
}

fun ParsableMIMEValue.isAvro(): Boolean {
return this.component() == "avro" || this.subComponent().contains("avro")
}
15 changes: 15 additions & 0 deletions app/src/main/kotlin/io/apim/samples/rest/ResponseHelper.kt
@@ -0,0 +1,15 @@
package io.apim.samples.rest

import io.vertx.kotlin.core.json.obj
import io.vertx.rxjava3.ext.web.RoutingContext

fun RoutingContext.sendError(statusCode: Int, title: String, detail: String? = null) {
this.response().statusCode = statusCode
this.end(io.vertx.kotlin.core.json.json {
obj(
"title" to title,
"detail" to detail
)
}
.toString()).subscribe()
}
Expand Up @@ -2,6 +2,7 @@ package io.apim.samples.rest

import io.apim.samples.httpPort
import io.apim.samples.rest.avro.avroGeneratorHandler
import io.apim.samples.rest.avro.avroSerDeHandler
import io.reactivex.rxjava3.core.Completable
import io.vertx.rxjava3.config.ConfigRetriever
import io.vertx.rxjava3.core.AbstractVerticle
Expand Down Expand Up @@ -40,6 +41,7 @@ class RestServerVerticle(
router.route().handler(BodyHandler.create())
router.route("/echo").handler(::echoHandler)
router.route("/avro/generate").handler(::avroGeneratorHandler)
router.route("/avro/serde").handler(::avroSerDeHandler)
router.route("/grpc*").handler(::protobufFileHandler)
router.route("/health*").handler(HealthCheckHandler.createWithHealthChecks(healthChecks))
router
Expand Down
Expand Up @@ -2,17 +2,15 @@ package io.apim.samples.rest.avro

import io.apim.samples.avro.AvroSerDeFactoryImpl
import io.apim.samples.avro.JsonSerDe
import io.apim.samples.avro.SerializationFormat
import io.apim.samples.avro.generate
import io.apim.samples.rest.isJson
import io.apim.samples.rest.sendError
import io.vertx.core.http.HttpHeaders
import io.vertx.ext.web.impl.ParsableMIMEValue
import io.vertx.kotlin.core.json.obj
import io.vertx.rxjava3.core.buffer.Buffer
import io.vertx.rxjava3.ext.web.RoutingContext
import org.apache.avro.Schema
import org.apache.avro.SchemaParseException
import java.util.*

val serdeFactory = AvroSerDeFactoryImpl()

Expand All @@ -31,19 +29,9 @@ fun avroGeneratorHandler(ctx: RoutingContext) {
generate(ctx)
}

private fun getOutputFormat(ctx: RoutingContext): OutputFormat {
val output = ctx.queryParam("output").elementAtOrNull(0) ?: "avro"
try {
return OutputFormat.valueOf(output.uppercase(Locale.getDefault()))
} catch (e: IllegalArgumentException) {
ctx.sendError(400, "Invalid output format", "Valid values are: avro, json")
throw e
}
}

private fun generate(ctx: RoutingContext) {
try {
val output = getOutputFormat(ctx)
val output = ctx.getOutputFormatFromQueryParam()
val schema = Schema.Parser().parse(ctx.body().asString())

when (output) {
Expand All @@ -59,8 +47,7 @@ private fun generate(ctx: RoutingContext) {
private fun generateAvro(schema: Schema, ctx: RoutingContext) {
val data = generate(schema)

val format = ctx.queryParam("format").elementAtOrNull(0)?.let { SerializationFormat.valueOf(it) }
?: SerializationFormat.CONFLUENT
val format = ctx.getSerializationFormatFromQueryParam()
val serde = serdeFactory.new(schema, format)

ctx.response().statusCode = 200
Expand All @@ -76,14 +63,3 @@ private fun generateJson(schema: Schema, ctx: RoutingContext) {
ctx.response().putHeader(HttpHeaders.CONTENT_TYPE, "application/json")
ctx.end(serde.serialize(data)).subscribe()
}

private fun RoutingContext.sendError(statusCode: Int, title: String, detail: String? = null) {
this.response().statusCode = statusCode
this.end(io.vertx.kotlin.core.json.json {
obj(
"title" to title,
"detail" to detail
)
}
.toString()).subscribe()
}
43 changes: 43 additions & 0 deletions app/src/main/kotlin/io/apim/samples/rest/avro/AvroRequestHelper.kt
@@ -0,0 +1,43 @@
package io.apim.samples.rest.avro

import io.apim.samples.avro.SerializationFormat
import io.apim.samples.rest.sendError
import io.vertx.rxjava3.ext.web.RoutingContext
import org.apache.avro.Schema
import org.apache.avro.SchemaParseException
import java.util.*

fun RoutingContext.getOutputFormatFromQueryParam(param: String = "output"): OutputFormat {
val output = queryParam(param).elementAtOrNull(0) ?: "avro"
try {
return OutputFormat.valueOf(output.uppercase(Locale.getDefault()))
} catch (e: IllegalArgumentException) {
sendError(400, "Invalid $param format", "Valid values are: ${OutputFormat.values().joinToString(", ") { it.name.lowercase() }}")
throw e
}
}

fun RoutingContext.getSerializationFormatFromQueryParam(param: String = "format"): SerializationFormat {
val format = queryParam(param).elementAtOrNull(0) ?: SerializationFormat.CONFLUENT.name
try {
return SerializationFormat.valueOf(format.uppercase(Locale.getDefault()))
} catch (e: IllegalArgumentException) {
sendError(400, "Invalid $param", "Valid values are: ${SerializationFormat.values().joinToString(", ") { it.name.lowercase() }}")
throw e
}
}

fun RoutingContext.getSchemaFromHeader(header: String = "X-Avro-Schema"): Schema {
val schemaString = request().getHeader(header)
if (schemaString == null) {
sendError(400, "Avro schema required in $header header")
throw IllegalArgumentException("Avro schema required in $header header")
}

return try {
Schema.Parser().parse(schemaString)
} catch (e: SchemaParseException) {
sendError(400, "Invalid avro schema", e.message)
throw e
}
}
37 changes: 37 additions & 0 deletions app/src/main/kotlin/io/apim/samples/rest/avro/AvroSerDeHandler.kt
@@ -0,0 +1,37 @@
package io.apim.samples.rest.avro

import io.apim.samples.avro.JsonSerDe
import io.apim.samples.rest.isAvro
import io.apim.samples.rest.isJson
import io.apim.samples.rest.sendError
import io.vertx.core.http.HttpHeaders
import io.vertx.ext.web.impl.ParsableMIMEValue
import io.vertx.rxjava3.core.buffer.Buffer
import io.vertx.rxjava3.ext.web.RoutingContext

fun avroSerDeHandler(ctx: RoutingContext) {
val contentType = (ctx.parsedHeaders().delegate.contentType() as ParsableMIMEValue).forceParse()

val schema = ctx.getSchemaFromHeader("X-Avro-Schema")
val jsonSerDe = JsonSerDe(schema)
val avroSerDe = serdeFactory.new(schema, ctx.getSerializationFormatFromQueryParam())

if(contentType.isJson()) {
val data = jsonSerDe.deserialize(ctx.body().asString())
ctx.response().statusCode = 200
ctx.response().putHeader(HttpHeaders.CONTENT_TYPE, "avro/binary")
ctx.end(Buffer.buffer(avroSerDe.serialize(data))).subscribe()
return
}

if(contentType.isAvro()) {
val data = avroSerDe.deserialize(ctx.body().buffer().bytes)
ctx.response().statusCode = 200
ctx.response().putHeader(HttpHeaders.CONTENT_TYPE, "application/json")
ctx.end(Buffer.buffer(jsonSerDe.serialize(data))).subscribe()
return
}

ctx.sendError(400, "Unsupported content type")
return
}

0 comments on commit c1a6d30

Please sign in to comment.