-
Notifications
You must be signed in to change notification settings - Fork 5
/
deserializers.clj
42 lines (37 loc) · 1.58 KB
/
deserializers.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
(ns kafka-avro-confluent.deserializers
"Avro deserializers that fetch schemas from the Confluent Schema Registry.
They all implement org.apache.kafka.common.serialization.Deserializer"
(:require [abracad.avro :as avro]
[kafka-avro-confluent.magic :as magic]
[kafka-avro-confluent.schema-registry-client :as registry])
(:import java.nio.ByteBuffer
org.apache.kafka.common.serialization.Deserializer))
(defn ^"[B" byte-buffer->bytes
  "Copies the bytes remaining in `buffer` (from its current position up to
  its limit) into a freshly allocated byte array and returns that array.

  The read advances `buffer`'s position, so afterwards the buffer has no
  bytes remaining."
  [^ByteBuffer buffer]
  ;; The `[B` tag is spelled with `^`; the older `#^` reader form is deprecated
  ;; and produces the same metadata.
  (let [out (byte-array (.remaining buffer))]
    ;; Bulk relative get: fills `out` completely from the buffer.
    (.get buffer out)
    out))
(defn- -deserialize
  "Decodes a single framed Avro payload.

  `data` is wrapped in a ByteBuffer and consumed in order: one magic byte,
  one int schema id, and then every remaining byte as the Avro-encoded body.
  The schema is looked up in `schema-registry` by that id and the body is
  decoded against it.

  Returns nil when `data` is nil. Throws AssertionError when the leading byte
  differs from `magic/magic`."
  [schema-registry data]
  (when data
    (let [buffer (ByteBuffer/wrap data)
          magic  (.get buffer)]
      ;; Explicit throw instead of `assert`: assertions are elided when the
      ;; code is compiled with *assert* bound to false, which would let a
      ;; payload with the wrong framing fall through to Avro decoding.
      ;; AssertionError is kept so the thrown class matches what `assert`
      ;; raised before.
      (when-not (= magic/magic magic)
        (throw (AssertionError. (str "Found different magic byte: " magic))))
      (let [schema-id (.getInt buffer)
            schema    (registry/get-schema-by-id schema-registry schema-id)]
        ;; Whatever is left after the header is the Avro body.
        (avro/decode schema (byte-buffer->bytes buffer))))))
;; Kafka Deserializer implementation that holds a schema registry handle and
;; hands every payload to -deserialize.
(deftype AvroDeserializer [schema-registry]
  Deserializer
  ;; Configuration arguments are ignored; the registry is supplied at
  ;; construction time.
  (configure [_this _configs _is-key]
    nil)
  ;; The topic argument is ignored; only the payload is used.
  (deserialize [_this _topic data]
    (-deserialize schema-registry data))
  ;; Nothing is done on close.
  (close [_this]
    nil))
(defn ->avro-deserializer
  "Builds an AvroDeserializer around `schema-registry`: an Avro deserializer
  for Apache Kafka keys and values that uses Confluent's Schema Registry.

  See https://avro.apache.org/
  See http://docs.confluent.io/current/schema-registry/docs
  See https://github.com/damballa/abracad"
  ;; FIXME https://github.com/miner/eastwood#wrong-tag---an-incorrect-type-tag
  ;; The return hint is written as the fully-qualified class name of the
  ;; deftype and sits on the argument vector.
  ^kafka_avro_confluent.deserializers.AvroDeserializer
  [schema-registry]
  (new AvroDeserializer schema-registry))