forked from damballa/abracad
-
Notifications
You must be signed in to change notification settings - Fork 0
/
avro.clj
386 lines (338 loc) · 13.6 KB
/
avro.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
(ns abracad.avro
"Functions for de/serializing data with Avro."
(:refer-clojure :exclude [compare spit slurp])
(:require [clojure.java.io :as io]
[clojure.walk :refer [postwalk]]
[cheshire.core :as json]
[abracad.avro.util :refer [returning mangle unmangle coerce]]
[abracad.avro.conversion :as c])
(:import [java.io
ByteArrayOutputStream EOFException
File InputStream OutputStream]
[clojure.lang Named]
[org.apache.avro Schema Schema$Parser]
[org.apache.avro.file
CodecFactory DataFileWriter DataFileReader DataFileStream SeekableInput
SeekableFileInput SeekableByteArrayInput]
[org.apache.avro.io
DatumReader DatumWriter Decoder DecoderFactory
Encoder EncoderFactory]
[abracad.avro ClojureDatumReader ClojureDatumWriter ClojureData]))
(defn schema?
  "Return true when `schema` is an already-parsed Avro `Schema` object, and
  false for anything else (including nil)."
  [schema]
  (instance? Schema
             schema))
(defn ^:private named?
  "True iff `x` may be passed to `name`: either a string or a
  `clojure.lang.Named` value such as a keyword or symbol."
  [x]
  (cond
    (string? x) true
    (instance? Named x) true
    :else false))
(defn ^:private schema-mangle
  "Normalize one `form` visited while walking a Clojure-data schema. Named
  values (strings, keywords, symbols) become their mangled name. Parsed Avro
  `Schema` objects are converted back into Clojure data through their JSON
  text. Anything else is returned unchanged."
  [form]
  (condp #(%1 %2) form
    named? (mangle (name form))
    schema? (json/parse-string (str form))
    form))
(defn ^:private clj->json
  "Render the Clojure-data schema `schema` as a JSON string. Every nested
  form is first normalized with `schema-mangle`."
  [schema]
  (->> schema
       (postwalk schema-mangle)
       json/generate-string))
(defn ^:private codec-for
  "Return an Avro `CodecFactory` for `codec`.

  A string (e.g. \"snappy\") or a keyword (e.g. `:snappy`) is resolved by name
  with `CodecFactory/fromString`. Any other value, such as an existing
  `CodecFactory`, is returned unchanged."
  {:tag `CodecFactory}
  [codec]
  (cond
    (string? codec) (CodecFactory/fromString codec)
    ;; Keywords are the usual Clojure spelling for option values; resolve
    ;; them by their unqualified name instead of passing them through, which
    ;; would fail later when handed to `DataFileWriter#setCodec`.
    (keyword? codec) (CodecFactory/fromString (name codec))
    :else codec))
(defprotocol PSeekableInput
  "Protocol for coercing values to an Avro `SeekableInput`. This namespace
  extends it to byte arrays, `SeekableInput`, `File`, and `String` file paths.
  Use `seekable-input` as the public entry point."
  (-seekable-input [x opts]
    "Attempt to coerce `x` to an Avro `SeekableInput`. `opts` carries
    implementation-specific options and may be nil."))
(defn seekable-input
  "Coerce `x` to an Avro `SeekableInput` through the `PSeekableInput`
  protocol. `opts` (nil when omitted) is passed through to the
  implementation."
  {:tag `SeekableInput}
  ([x] (seekable-input nil x))
  ([opts x] (-seekable-input x opts)))
;; Byte arrays have no class literal in Clojure, so the array class is looked
;; up by its JVM name.
(extend-type (Class/forName "[B")
  PSeekableInput
  (-seekable-input [data _opts]
    (SeekableByteArrayInput. data)))

;; Already seekable: hand it back untouched.
(extend-type SeekableInput
  PSeekableInput
  (-seekable-input [input _opts]
    input))

(extend-type File
  PSeekableInput
  (-seekable-input [file _opts]
    (SeekableFileInput. file)))

;; Strings are file paths; delegate through the `File` implementation.
(extend-type String
  PSeekableInput
  (-seekable-input [path opts]
    (seekable-input opts (io/file path))))
(defn ^:private raw-schema?
  "True if schema `source` should be handed to the Avro parser as-is. That is
  the case for an `InputStream`, or for a string whose first non-whitespace
  character is `[`, `{`, or `\"` (i.e. JSON text). Any other value is treated
  as Clojure schema data."
  [source]
  (or (instance? InputStream source)
      (and (string? source)
           ;; Allow leading whitespace. JSON read from a file, or written as a
           ;; multi-line literal, often begins with a newline or indentation,
           ;; and an Avro type name can never start with whitespace.
           (boolean (re-find #"^\s*[\[\{\"]" source)))))
(defn ^:private parse-schema-raw
  "Feed `source` (a JSON string or an `InputStream` of JSON) to `parser`.
  Return the parsed `Schema`."
  [^Schema$Parser parser source]
  (if-not (string? source)
    (.parse parser ^InputStream source)
    (.parse parser ^String source)))
(defn ^:private parse-schema*
  "Parse each of `sources` in order with a single shared `Schema$Parser`.
  Each source is first turned into something the parser accepts:
  - a `Schema` becomes its JSON text;
  - raw JSON (see `raw-schema?`) is used as-is;
  - anything else is rendered with `clj->json`.
  Returns the `Schema` parsed from the last source, or nil when `sources` is
  empty."
  {:tag `Schema}
  [& sources]
  (let [parser (Schema$Parser.)
        ->parseable (fn [source]
                      (cond (schema? source) (str source)
                            (raw-schema? source) source
                            :else (clj->json source)))]
    (loop [remaining sources
           parsed nil]
      (if (seq remaining)
        (recur (rest remaining)
               (parse-schema-raw parser (->parseable (first remaining))))
        parsed))))
(defn parse-schema
  "Parse Avro schemas in `source` and `sources`. Each schema source may be:
  - a JSON string;
  - an input stream containing a JSON schema;
  - a Clojure data structure which may be converted to a JSON schema;
  - an already-parsed Avro schema object.
  Each later source may refer to types defined in the earlier ones. Returns
  the schema parsed from the final source. A lone, already-parsed schema is
  returned as-is."
  {:tag `Schema}
  ([source]
   (cond-> source
     (not (schema? source)) parse-schema*))
  ([source & sources]
   (apply parse-schema* source sources)))
(defn unparse-schema
  "Return `schema` as Avro-normalized Clojure data with keyword keys. A schema
  that is not already parsed is first normalized and parsed with
  `parse-schema`."
  [schema]
  (let [parsed (parse-schema schema)]
    (json/parse-string (str parsed) true)))
(defn tuple-schema
  "Return a Clojure-data Avro record schema with one field per entry of
  `types`, named \"field0\", \"field1\", ... in order. Metadata on each type is
  merged into that field's spec, and may override its `:name` or `:type`. The
  record is tagged with `:abracad.reader \"vector\"`. When `name` is omitted,
  a unique one is generated with `gensym`."
  ([types]
   (tuple-schema (name (gensym "abracad.avro.tuple")) types))
  ([name types]
   (let [field-spec (fn [idx type]
                      (merge {:name (str "field" idx), :type type}
                             (meta type)))]
     {:name name
      :type "record"
      :abracad.reader "vector"
      :fields (mapv field-spec (range) types)})))
(defn ^:private order-ignore
  "Return `fields` as a vector in which every record-field spec at index `n`
  or later has `:order` set to \"ignore\". The first `n` specs are left
  unchanged."
  [fields n]
  (let [mark (fn [idx field]
               (cond-> field
                 (not (< idx n)) (assoc :order "ignore")))]
    (into [] (map-indexed mark fields))))
(defn grouping-schema
  "Return the Clojure-data form of record `schema` in which every field after
  the first `n` has `:order \"ignore\"`. Only the first `n` fields then take
  part in sort comparisons."
  [n schema]
  (let [unparsed (unparse-schema schema)]
    (update-in unparsed [:fields] order-ignore n)))
(defn datum-reader
  "Return a `ClojureDatumReader` which produces Clojure data structures.
  `expected` and `actual` are schemas normalized with `parse-schema`, and
  either may be nil. With one argument, the same schema is used for both.
  With none, both are nil."
  {:tag `ClojureDatumReader}
  ([] (datum-reader nil nil))
  ([schema] (datum-reader schema schema))
  ([expected actual]
   (let [expected-schema (some-> expected parse-schema)
         actual-schema (some-> actual parse-schema)]
     (ClojureDatumReader. expected-schema
                          actual-schema
                          (c/create-clojure-data)))))
(defn data-file-reader
  "Return an Avro `DataFileReader` which produces Clojure data structures.

  `source` is anything `seekable-input` accepts: a byte array, a `File`, a
  file-path string, or an existing `SeekableInput`. `expected` (possibly nil)
  is the schema passed to `datum-reader`.

  If the reader cannot be created, a file this function opened itself (from a
  `String` or `File` source) is closed before the exception is rethrown. This
  covers an invalid schema or a source that is not an Avro data file. A
  caller-supplied `SeekableInput` is left open."
  {:tag `DataFileReader}
  ([source] (data-file-reader nil source))
  ([expected source]
   ;; Build the datum reader first so a bad schema fails before any file is
   ;; opened.
   (let [reader (datum-reader expected)
         input (seekable-input source)]
     (try
       (DataFileReader/openReader input reader)
       (catch Throwable t
         ;; Only inputs created here from a path or `File` are owned by this
         ;; function; close them so a failed open does not leak a file handle.
         (when (or (string? source) (instance? File source))
           (try
             (.close input)
             (catch Throwable e (.addSuppressed t e))))
         (throw t))))))
(defn data-file-stream
  "Return an Avro `DataFileStream` which produces Clojure data structures.

  The stream reads from `(io/input-stream source)`. `expected` (possibly nil)
  is the schema passed to `datum-reader`.

  If the stream cannot be created, an input stream this function opened from a
  `String` or `File` source is closed before the exception is rethrown. This
  covers an invalid schema or a bad data-file header. Streams derived from
  other caller-owned sources are left open."
  {:tag `DataFileStream}
  ([source] (data-file-stream nil source))
  ([expected source]
   ;; Build the datum reader first so a bad schema fails before any file is
   ;; opened.
   (let [reader (datum-reader expected)
         in (io/input-stream source)]
     (try
       (DataFileStream. in reader)
       (catch Throwable t
         ;; Only streams opened here from a path or `File` are owned by this
         ;; function; closing a wrapper around a caller's stream would close
         ;; the caller's stream too.
         (when (or (string? source) (instance? File source))
           (try
             (.close in)
             (catch Throwable e (.addSuppressed t e))))
         (throw t))))))
(defmacro ^:private decoder-factory
  "Expand to a call of instance method `method`, with argument forms `args`,
  on the default Avro `DecoderFactory` (`DecoderFactory/get`). Argument
  forms, including any type hints on them, are spliced in unchanged."
  [method & args]
  `(.. (DecoderFactory/get) (~method ~@args)))
(defn binary-decoder
  "Return a binary-encoding decoder for `source`. `source` may be:
  - an `InputStream`;
  - a byte array;
  - a vector `[bytes offset length]` selecting a slice of a byte array."
  {:tag `Decoder}
  [source]
  (cond
    (vector? source)
    (let [[buf offset length] source]
      (decoder-factory binaryDecoder buf offset length nil))

    (instance? InputStream source)
    (decoder-factory binaryDecoder ^InputStream source nil)

    :else
    (decoder-factory binaryDecoder ^bytes source nil)))
(defn direct-binary-decoder
  "Return a non-buffered binary-encoding decoder for `source`. `source` is
  passed unchanged to the default `DecoderFactory`'s `directBinaryDecoder`."
  {:tag `Decoder}
  [source]
  (.directBinaryDecoder (DecoderFactory/get) source nil))
(defn json-decoder
  "Return a JSON-encoding decoder that reads `source` according to `schema`.
  `source` is an `InputStream` or a `String`. `schema` is first normalized
  with `parse-schema`."
  {:tag `Decoder}
  [schema source]
  (let [parsed (parse-schema schema)]
    (if-not (instance? InputStream source)
      (decoder-factory jsonDecoder parsed ^String source)
      (decoder-factory jsonDecoder parsed ^InputStream source))))
(defn decode
  "Decode and return a single object from `source` using `schema`.
  `schema` may be an existing `DatumReader` or anything `datum-reader`
  accepts. `source` may be an existing `Decoder` or anything on which a
  binary-encoding decoder can be opened with `binary-decoder`."
  [schema source]
  ;; Method-call target is evaluated before the arguments, so the reader is
  ;; still built before the decoder.
  (.read (coerce DatumReader datum-reader schema)
         nil
         (coerce Decoder binary-decoder source)))
(defn decode-seq
  "Like `decode`, but return a lazy sequence of all objects decoded, one
  after another, from `source`. The sequence ends at the first
  `EOFException` raised while reading; nil datums are kept in the sequence."
  [schema source]
  (let [reader (coerce DatumReader datum-reader schema)
        decoder (coerce Decoder binary-decoder source)]
    (letfn [(read-next []
              (lazy-seq
                ;; Box each datum in a vector so a decoded nil is not
                ;; mistaken for end-of-input.
                (when-let [[datum] (try
                                     [(.read reader nil decoder)]
                                     (catch EOFException _ nil))]
                  (cons datum (read-next)))))]
      (read-next))))
(defn datum-writer
  "Return a `ClojureDatumWriter` which consumes Clojure data structures.
  `schema`, when non-nil, is normalized with `parse-schema`."
  {:tag `ClojureDatumWriter}
  ([] (datum-writer nil))
  ([schema]
   (let [parsed (some-> schema parse-schema)]
     (ClojureDatumWriter. parsed (c/create-clojure-data)))))
(defn data-file-writer
  "Return an Avro `DataFileWriter` which consumes Clojure data structures.

  - `[]`: return a writer that has not been opened yet.
  - `[sink]`: call `.appendTo` with `(io/file sink)`.
  - `[schema sink]`: like the three-argument form with a nil `codec`.
  - `[codec schema sink]`: create a new data file for `schema` on `sink`.
    `sink` is an `OutputStream` or anything `io/output-stream` accepts.
    `codec` is anything `codec-for` accepts; when nil, no codec is set.

  In the three-argument form, the schema and codec are resolved before `sink`
  is opened. An invalid schema or codec therefore fails before a target file
  is opened and truncated. If `.create` fails, a stream this function opened
  is closed before the exception is rethrown. A caller-supplied
  `OutputStream` is left open."
  {:tag `DataFileWriter}
  ([] (DataFileWriter. (datum-writer)))
  ([sink]
   (let [^DataFileWriter dfw (data-file-writer)]
     (doto dfw (.appendTo (io/file sink)))))
  ([schema sink]
   (data-file-writer nil schema sink))
  ([codec schema sink]
   (let [^DataFileWriter dfw (data-file-writer)
         schema (parse-schema schema)
         codec-factory (when codec (codec-for codec))
         ;; Only close what we open: a caller's OutputStream stays theirs.
         owned? (not (instance? OutputStream sink))
         ^OutputStream out (coerce OutputStream io/output-stream sink)]
     (when codec-factory
       (.setCodec dfw codec-factory))
     (try
       (.create dfw schema out)
       (catch Throwable t
         (when owned?
           (try
             (.close out)
             (catch Throwable e (.addSuppressed t e))))
         (throw t)))
     dfw)))
(defmacro ^:private encoder-factory
  "Expand to a call of instance method `method`, with argument forms `args`,
  on the default Avro `EncoderFactory` (`EncoderFactory/get`). Argument
  forms, including any type hints on them, are spliced in unchanged."
  [method & args]
  `(.. (EncoderFactory/get) (~method ~@args)))
(defn binary-encoder
  "Return a buffered binary-encoding encoder writing to `sink`. `sink` is
  passed unchanged to the default `EncoderFactory`'s `binaryEncoder`."
  {:tag `Encoder}
  [sink]
  (.binaryEncoder (EncoderFactory/get) sink nil))
(defn direct-binary-encoder
  "Return an unbuffered binary-encoding encoder writing to `sink`. `sink` is
  passed unchanged to the default `EncoderFactory`'s `directBinaryEncoder`."
  {:tag `Encoder}
  [sink]
  (.directBinaryEncoder (EncoderFactory/get) sink nil))
(defn json-encoder
  "Return a JSON-encoding encoder that writes to the `OutputStream` `sink`
  according to `schema`, which is normalized with `parse-schema`."
  {:tag `Encoder}
  [schema sink]
  (encoder-factory jsonEncoder (parse-schema schema) ^OutputStream sink))
(defn encode
  "Encode each of `records`, in order, to `sink` using `schema`, then flush
  the encoder. `schema` may be an existing `DatumWriter` or anything
  `datum-writer` accepts. `sink` may be an existing `Encoder` or anything on
  which `binary-encoder` can open one."
  [schema sink & records]
  (let [writer (coerce DatumWriter datum-writer schema)
        encoder (coerce Encoder binary-encoder sink)]
    (loop [pending (seq records)]
      (when pending
        (.write writer (first pending) encoder)
        (recur (next pending))))
    (.flush encoder)))
(defn binary-encoded
  "Return the byte array produced by binary-encoding `records` with `schema`
  through `encode`."
  [schema & records]
  (let [buffer (ByteArrayOutputStream.)]
    (try
      (apply encode schema buffer records)
      (.toByteArray buffer)
      (finally
        (.close buffer)))))
(defn json-encoded
  "Return the string produced by JSON-encoding `records` with `schema` through
  `encode`. The encoded bytes are decoded as UTF-8."
  [schema & records]
  (with-open [out (ByteArrayOutputStream.)]
    (apply encode schema (json-encoder schema out) records)
    ;; Avro's JsonEncoder emits UTF-8. Decoding with the platform default
    ;; charset (as `(String. bytes)` does) corrupts non-ASCII text on
    ;; platforms whose default is not UTF-8.
    (.toString out "UTF-8")))
(defn compare
  "Compare `x` and `y` according to `schema`, which is normalized with
  `parse-schema`. Comparison uses `ClojureData/withoutConversions`."
  [schema x y]
  (let [parsed (parse-schema schema)
        data (ClojureData/withoutConversions)]
    (.compare data x y parsed)))
(defn spit
  "Like core `spit`, but writes `content` to `f` as a single-record Avro data
  file using `schema`.

  Keyword options in `opts`:
    :codec  passed to `data-file-writer`; defaults to \"snappy\". Pass
            `:codec nil` to set no codec."
  [schema f content & opts]
  ;; `opts` is a rest-arg seq of alternating keys and values. `get` does not
  ;; look keys up in a seq (it always returns the not-found value), so
  ;; destructure it as keyword arguments instead.
  (let [{:keys [codec] :or {codec "snappy"}} opts]
    (with-open [dfw (data-file-writer codec schema f)]
      (.append dfw content))))
(defn slurp
  "Like core `slurp`, but opens `f` as an Avro data file and returns its first
  record. `opts` is accepted for signature compatibility and ignored."
  [f & opts]
  (let [dfr (data-file-reader f)]
    (try
      (.next dfr)
      (finally
        (.close dfr)))))
(defn mspit
  "Like Avro `spit`, but writes each element of the sequence `content` as a
  separate record.

  Keyword options in `opts`:
    :codec  passed to `data-file-writer`; defaults to \"snappy\". Pass
            `:codec nil` to set no codec."
  [schema f content & opts]
  ;; `opts` is a rest-arg seq of alternating keys and values. `get` does not
  ;; look keys up in a seq (it always returns the not-found value), so
  ;; destructure it as keyword arguments instead.
  (let [{:keys [codec] :or {codec "snappy"}} opts]
    (with-open [dfw (data-file-writer codec schema f)]
      (doseq [record content]
        (.append dfw record)))))
(defn mslurp
  "Like Avro `slurp`, but returns a vector of every record in the Avro data
  file `f`. `opts` is accepted for signature compatibility and ignored."
  [f & opts]
  (let [dfr (data-file-reader f)]
    (try
      (reduce conj [] dfr)
      (finally
        (.close dfr)))))
;; NOTE(review): nothing in this file implements or calls this protocol;
;; presumably the Java-side datum writer consults it. Confirm.
(defprotocol AvroSerializable
  "Protocol for customizing how an object is serialized to Avro."
  (schema-name [this]
    "Return the full package-/namespace-qualified name used for this object
    for Avro purposes.")
  (field-get [this field]
    "Return the value of keyword `field` to use in this object's Avro
    serialization.")
  (field-list [this]
    "Return the list of keyword fields this object provides."))
;; The reader-loading code below is adapted almost verbatim from Clojure's
;; own `*data-readers*` implementation in clojure.core.
(def ^:dynamic *avro-readers*
  "Like `clojure.core/*data-readers*`, but for reading Avro records.

  When this namespace loads, it is initialized with the merged contents of
  every `avro_readers.clj` resource visible to the current thread's context
  class loader. Keys are symbols; values are the vars named by the mapped
  symbols.

  Whenever an Avro record is deserialized, the Clojure datum reader checks
  this map for a key matching the Avro record name as a namespace-qualified
  symbol. When one is found, the datum reader invokes the associated value as
  a function, passing the deserialized record's fields as positional
  arguments. The return value is used as the deserialization result."
  {})
(defn ^:private avro-reader-urls
  "Return a seq of the URLs of every `avro_readers.clj` resource visible to the
  current thread's context class loader, or nil when there are none."
  []
  (let [loader (.getContextClassLoader (Thread/currentThread))]
    (enumeration-seq (.getResources loader "avro_readers.clj"))))
(defn ^:private avro-reader-var
  "Return the var named by the namespace-qualified symbol `sym`. The
  namespace is created with `create-ns` and the var interned with `intern` if
  they do not exist yet. No namespace is loaded.

  Throws `ex-info` (with the offending value under `:form`) when `sym` is not
  a namespace-qualified symbol. Without this check such a value fails with an
  opaque NullPointerException or ClassCastException."
  [sym]
  (when-not (and (symbol? sym) (namespace sym))
    (throw (ex-info "avro-reader target must be a namespace-qualified symbol"
                    {:form sym})))
  (intern (create-ns (symbol (namespace sym))) (symbol (name sym))))
(defn ^:private load-avro-reader-file
  "Read the map form in the `avro_readers.clj` resource at `url` (as UTF-8)
  and merge it into `mappings`, returning the merged map. Each value symbol is
  resolved to a var with `avro-reader-var`. `*file*` is bound to the URL's
  file part while reading.

  Throws `ex-info` when:
  - the resource does not contain a map;
  - a key is not a symbol;
  - a key already maps to a different var in `mappings`."
  [mappings ^java.net.URL url]
  (with-open [rdr (-> url
                      .openStream
                      (java.io.InputStreamReader. "UTF-8")
                      clojure.lang.LineNumberingPushbackReader.)]
    (binding [*file* (.getFile url)]
      (let [new-mappings (read rdr false nil)
            add-entry (fn [m [k v]]
                        (when-not (symbol? k)
                          (throw (ex-info "Invalid form in avro-reader file"
                                          {:url url
                                           :form k})))
                        (let [v-var (avro-reader-var v)]
                          ;; Conflicts are checked against the mappings from
                          ;; earlier files, not entries added from this one.
                          (when (and (contains? mappings k)
                                     (not= (get mappings k) v-var))
                            (throw (ex-info "Conflicting avro-reader mapping"
                                            {:url url
                                             :conflict k
                                             :mappings m})))
                          (assoc m k v-var)))]
        (when-not (map? new-mappings)
          (throw (ex-info "Not a valid avro-reader map" {:url url})))
        (reduce add-entry mappings new-mappings)))))
(defn ^:private load-avro-readers
  "Merge the mappings from every `avro_readers.clj` resource found by
  `avro-reader-urls` into the root binding of `*avro-readers*`."
  []
  (alter-var-root #'*avro-readers*
                  #(reduce load-avro-reader-file % (avro-reader-urls))))
;; Populate `*avro-readers*` when this namespace is loaded. A failure is
;; printed with its stack trace and then rethrown, so loading the namespace
;; fails.
(try
  (load-avro-readers)
  (catch Throwable error
    (throw (doto error (.printStackTrace)))))