-
-
Notifications
You must be signed in to change notification settings - Fork 0
/
decode.clj
107 lines (98 loc) · 4.66 KB
/
decode.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
(ns piotr-yuxuan.slava.decode
"FIXME add cljdoc"
(:require [camel-snake-kebab.core :as csk]
[clojure.string :as str])
(:import (org.apache.avro Schema$Field Schema Schema$MapSchema Schema$RecordSchema Schema$ArraySchema Schema$UnionSchema)
(org.apache.avro.generic GenericData$Record GenericData$Array)))
(defn decoder-name
  "Returns the config key under which the decoder builder for `avro-schema`
  is looked up: a keyword in the `decoder` namespace whose name is `avro-`
  followed by the kebab-cased string form of the schema's type
  (presumably `:decoder/avro-record` for a record schema)."
  [^Schema avro-schema]
  (let [type-name (csk/->kebab-case-string (str (.getType avro-schema)))
        key-name (str/join "-" ["avro" type-name])]
    (keyword "decoder" key-name)))
;; Forward declaration: the schema-specific builders below call `-decoder-fn`
;; for their nested schemas, but it is only defined after them.
(declare -decoder-fn)
(defn avro-record
  "Builds a decoder for the record schema `reader-schema`.

  `config` must contain `:record-key-fn`, which is called with `config` and
  `reader-schema`. When it returns a function, each map key is that function
  applied to the field name; otherwise the field name itself is the key.

  Returns a function of one `GenericData$Record` that produces a persistent
  map with one entry per schema field. A field value goes through the decoder
  returned by `-decoder-fn` for the field's schema when there is one, and is
  kept as read otherwise. The resulting map carries
  `:piotr-yuxuan.slava/type` and `:piotr-yuxuan.slava/reader-schema`
  metadata."
  [config ^Schema$RecordSchema reader-schema]
  (let [{:keys [record-key-fn]} config
        record-key (record-key-fn config reader-schema)
        ;; Deliberately lazy (`map`, not `mapv`): nested decoders are only
        ;; built when the first record is decoded.
        ;; NOTE(review): an eager build would presumably never terminate on a
        ;; self-referential record schema -- confirm before changing.
        field-decoders (map (fn [^Schema$Field field]
                              (let [field-name (.name field)
                                    value-decoder (-decoder-fn (assoc config :field-name field-name)
                                                               (.schema field))
                                    ;; The key only depends on the field, so it is
                                    ;; computed once here rather than per datum.
                                    map-key (if record-key
                                              (record-key field-name)
                                              field-name)]
                                (if value-decoder
                                  (fn [m ^GenericData$Record data]
                                    (assoc! m map-key (value-decoder (.get data field-name))))
                                  (fn [m ^GenericData$Record data]
                                    (assoc! m map-key (.get data field-name))))))
                            (.getFields reader-schema))]
    (fn [data]
      ;; Transients must not be bashed in place: the value returned by
      ;; `assoc!` has to be threaded into the next call, otherwise entries are
      ;; lost once the map outgrows its initial representation.
      (-> (reduce (fn [m decoder!] (decoder! m data))
                  (transient {})
                  field-decoders)
          persistent!
          (vary-meta assoc
                     :piotr-yuxuan.slava/type :avro-record
                     :piotr-yuxuan.slava/reader-schema reader-schema)))))
(defn avro-array
  "Builds a decoder for the array schema `reader-schema`.

  Returns nil when `-decoder-fn` yields no decoder for the element schema.
  Otherwise returns a function of one `GenericData$Array` that maps the
  element decoder over the array and attaches `:piotr-yuxuan.slava/type` and
  `:piotr-yuxuan.slava/reader-schema` metadata to the result."
  [config ^Schema$ArraySchema reader-schema]
  (let [element-schema (.getElementType reader-schema)
        element-decoder (-decoder-fn config element-schema)]
    (when element-decoder
      (fn [^GenericData$Array data]
        ;; NOTE(review): `map` is lazy, so elements are decoded when the
        ;; result is consumed -- confirm callers realise it before `data` is
        ;; reused or mutated.
        (-> (map element-decoder data)
            (vary-meta assoc
                       :piotr-yuxuan.slava/type :avro-array
                       :piotr-yuxuan.slava/reader-schema reader-schema))))))
(defn avro-map
  "Builds a decoder for the map schema `reader-schema`.

  `config` must contain `:decoder/map-key-fn`, which is called with `config`
  and `reader-schema`. When it returns a function, that function is applied to
  every key. Values go through the decoder returned by `-decoder-fn` for the
  schema's value type when there is one.

  Returns a function of one map datum that produces a map carrying
  `:piotr-yuxuan.slava/type` and `:piotr-yuxuan.slava/reader-schema`
  metadata. When neither keys nor values need decoding, a datum that already
  supports metadata is returned as is (with the metadata added); any other
  datum, such as a plain `java.util.Map`, is first copied into a Clojure map
  because `vary-meta` cannot attach metadata to it."
  [config ^Schema$MapSchema reader-schema]
  (let [{:decoder/keys [map-key-fn]} config
        map-key (map-key-fn config reader-schema)
        value-decoder (-decoder-fn
                        ;; Don't propagate field-name any deeper
                        (dissoc config :field-name)
                        (.getValueType reader-schema))
        meta-wrapper #(vary-meta %
                                 assoc
                                 :piotr-yuxuan.slava/type :avro-map
                                 :piotr-yuxuan.slava/reader-schema reader-schema)
        ;; `vary-meta` requires a `clojure.lang.IObj`; anything else is copied
        ;; into a persistent map so metadata can be attached.
        as-clojure-map (fn [data]
                         (if (instance? clojure.lang.IObj data)
                           data
                           (into {} data)))]
    (cond (and map-key value-decoder)
          (comp meta-wrapper
                #(->> % (map (juxt (comp map-key key) (comp value-decoder val))) (into {})))

          value-decoder
          (comp meta-wrapper
                #(->> % (map (juxt key (comp value-decoder val))) (into {})))

          map-key
          (comp meta-wrapper
                #(->> % (map (juxt (comp map-key key) val)) (into {})))

          :else
          (comp meta-wrapper as-clojure-map))))
(defn avro-union
  "Builds a decoder for the union schema `reader-schema`.

  A decoder is requested from `-decoder-fn` for every member schema of the
  union; members without one are ignored. `:generic-concrete-types` from
  `config` is then restricted to the decoder names that remain; its values
  are used as predicates on the datum.

  Returns nil when no member needs decoding. Otherwise returns a function of
  one datum which applies the decoder of the first type whose predicate
  accepts the datum, or returns the datum unchanged when none does."
  [{:keys [generic-concrete-types] :as config} ^Schema$UnionSchema reader-schema]
  (let [;; NOTE(review): decoders are keyed by `decoder-name`, which only
        ;; depends on the schema type, so two members of the same type (e.g.
        ;; two records) share a key and the later one wins -- confirm that
        ;; such unions are not expected.
        decoders-by-name (into {}
                               (for [member-schema (.getTypes reader-schema)
                                     :let [member-decoder (-decoder-fn config member-schema)]
                                     :when (some? member-decoder)]
                                 [(decoder-name member-schema) member-decoder]))
        candidate-types (select-keys generic-concrete-types (keys decoders-by-name))]
    ;; Nothing to build when no member of the union needs a decoder.
    (when-not (empty? candidate-types)
      (fn [data]
        (let [matching-type (some (fn [[type-name matches?]]
                                    (when (matches? data) type-name))
                                  candidate-types)
              member-decoder (get decoders-by-name matching-type)]
          (if member-decoder
            (member-decoder data)
            ;; The concrete type needs no decoding: hand the datum back as is.
            data))))))
(defn -decoder-fn
  "Looks up in `config` the decoder builder registered under
  `(decoder-name reader-schema)` and calls it with `config` and
  `reader-schema`. Returns whatever the builder returns, or nil when `config`
  has no builder under that key."
  [config ^Schema reader-schema]
  (let [builder (get config (decoder-name reader-schema))]
    (when builder
      (builder config reader-schema))))
;; The assumption is that we won't see a lot of schemas here, so we can build
;; a decoder only once per distinct pair of arguments.
(def ^{:arglists '([config ^org.apache.avro.Schema reader-schema])}
  decoder-fn
  "Memoized `-decoder-fn`: returns the decoder built for `config` and
  `reader-schema`, or nil when there is none."
  (memoize -decoder-fn))
(defn decode
  "Decodes `data` according to `reader-schema`, using the decoder obtained
  from `decoder-fn` for `config` and `reader-schema`.

  When `decoder-fn` returns nil, meaning nothing has to be decoded for this
  schema, `data` is returned unchanged instead of invoking nil."
  [config data ^Schema reader-schema]
  (if-let [decoder (decoder-fn config reader-schema)]
    (decoder data)
    data))