(ns datasplash.bq
  (:require [cheshire.core :as json]
            [clojure.java.shell :refer [sh]]
            [clojure.string :as str]
            [clojure.walk :as walk]
            [clj-time [coerce :as tc]
             [format :as tf]]
            [clojure.tools.logging :as log]
            [datasplash.core :refer :all])
  (:import
   [org.codehaus.jackson.map ObjectMapper]
   [com.google.api.services.bigquery.model
    TableRow TableFieldSchema TableSchema TimePartitioning]
   [org.apache.beam.sdk.transforms SerializableFunction]
   [org.apache.beam.sdk Pipeline]
   [org.apache.beam.sdk.io.gcp.bigquery
    BigQueryIO BigQueryIO$Read BigQueryIO$Write
    BigQueryIO$Write$WriteDisposition BigQueryIO$Write$SchemaUpdateOption
    BigQueryIO$Write$CreateDisposition TableRowJsonCoder TableDestination InsertRetryPolicy
    BigQueryIO$Write$Method]
   [org.apache.beam.sdk.values PBegin PCollection]))

(defn read-bq-raw
  "Reads rows from BigQuery as raw TableRow objects, from either a :table or a :query."
  [{:keys [query table standard-sql?] :as options} p]
  (let [opts (assoc options :label :read-bq-table-raw)
        ptrans (cond
                 query (.fromQuery (BigQueryIO/readTableRows) query)
                 table (.from (BigQueryIO/readTableRows) table)
                 :else (throw (ex-info
                               "Error with options of read-bq: should specify one of :table or :query"
                               {:options options})))]
    (-> p
        (cond-> (instance? Pipeline p) (PBegin/in))
        (apply-transform
         (if (and standard-sql? query)
           (.usingStandardSql ptrans)
           ptrans)
         named-schema opts))))

(defn auto-parse-val
[v]
(cond
(and (string? v) (re-find #"^\d+$" v)) (Long/parseLong v)
:else v))
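
;; e.g. (auto-parse-val "42") => 42, (auto-parse-val "3.14") => "3.14"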

(defn table-row->clj
  "Converts a BigQuery TableRow into a Clojure map with keyword keys, recursing
  into nested records and repeated fields; with :auto-parse, digit-only string
  values are parsed into longs."
  ([{:keys [auto-parse]} ^TableRow row]
(let [keyset (.keySet row)]
(persistent!
(reduce
(fn [acc k]
(assoc! acc (keyword k)
(let [raw-v (get row k)]
(cond
(instance? java.util.List raw-v) (if (instance? java.util.AbstractMap (first raw-v))
(map #(table-row->clj {:auto-parse auto-parse} %) raw-v)
(map #(if auto-parse (auto-parse-val %) %) raw-v))
(instance? java.util.AbstractMap raw-v) (table-row->clj {:auto-parse auto-parse} raw-v)
:else (if auto-parse (auto-parse-val raw-v) raw-v)))))
(transient {}) keyset))))
([row] (table-row->clj {} row)))
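
;; Usage sketch (field names are illustrative): converting a TableRow into a
;; Clojure map, parsing digit-only strings into longs.
(comment
  (let [row (doto (TableRow.)
              (.set "user_id" "42")
              (.set "label" "foo"))]
    (table-row->clj {:auto-parse true} row)))
;; => {:user_id 42, :label "foo"}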

(defn coerce-by-bq-val
  "Coerces a Clojure value into a BigQuery-friendly representation: dates become
  yyyy-MM-dd HH:mm:ss strings, sets become lists, keywords and symbols their names."
  [v]
  (cond
    (instance? java.util.Date v) (try (->> (tc/from-long (.getTime v))
                                           (tf/unparse (tf/formatter "yyyy-MM-dd HH:mm:ss")))
                                      (catch Exception e (log/errorf "error when formatting date %s" v)))
    (set? v) (into '() v)
    (keyword? v) (name v)
    (symbol? v) (name v)
    :else v))

(defn clean-name
  "Normalizes a key into a BigQuery-compatible column name: stringifies numbers,
  takes the name of keywords and symbols, replaces dashes with underscores and
  drops question marks."
  [s]
  (-> s
      (cond-> (number? s) (str))
      (name)
      (str/replace #"-" "_")
      (str/replace #"\?" "")))

(defn bqize-keys
  "Recursively transforms all map keys into BigQuery-compatible string names
  (see clean-name)."
  {:added "1.1"}
  [m]
  ;; only apply to maps
  (walk/postwalk (fn [x] (if (map? x)
                           (persistent!
                            (reduce
                             (fn [acc [k v]]
                               (assoc! acc (clean-name k) v))
                             (transient {}) x))
                           x))
                 m))
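
;; e.g. (bqize-keys {:user-id 1 :meta {:is-new? true}})
;; => {"user_id" 1, "meta" {"is_new" true}}
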
(defn ^TableRow clj->table-row
[hmap]
(let [^TableRow row (TableRow.)]
(doseq [[k v] hmap]
(.set row (clean-name k) (coerce-by-bq-val v)))
row))

(defn ^TableRow clj-nested->table-row
  [hmap]
  (let [clean-map (->> hmap
                       (walk/prewalk coerce-by-bq-val)
                       (bqize-keys))
        my-mapper (ObjectMapper.)
        ^TableRow row (.readValue my-mapper (json/encode clean-map) TableRow)]
    row))
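
;; Usage sketch (keys and values are illustrative): nested maps, sets and
;; keywords are coerced and keys cleaned before the TableRow is rebuilt
;; through Jackson.
(comment
  (clj-nested->table-row {:user-id 42
                          :tags #{:clojure :beam}
                          :meta {:is-new? true}}))
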
(defn- read-bq-clj-transform
[options]
(let [safe-opts (dissoc options :name)]
(ptransform
:read-bq-to-clj
[pcoll]
(->> pcoll
(read-bq-raw safe-opts)
(dmap (partial table-row->clj safe-opts) safe-opts)))))

(defn read-bq
  "Reads rows from BigQuery (from either a :table or a :query) and returns them
  as Clojure maps; see read-bq-raw and table-row->clj for the available options."
  [options ^Pipeline p]
  (let [opts (assoc options :label :read-bq-table)]
    (apply-transform p (read-bq-clj-transform opts) base-schema opts)))
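
;; Usage sketch, assuming make-pipeline and run-pipeline from datasplash.core
;; (referred above); the query and table names are illustrative.
(comment
  (let [p (make-pipeline [])]
    (read-bq {:query "SELECT user_id, label FROM `my-project.my_dataset.my_table`"
              :standard-sql? true
              :auto-parse true}
             p)
    (run-pipeline p)))
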
(defn- clj->TableFieldSchema
[defs transform-keys]
(for [{:keys [type mode description] field-name :name nested-fields :fields} defs]
(-> (TableFieldSchema.)
(.setName (transform-keys (clean-name field-name)))
(.setType (str/upper-case (name type)))
(cond-> mode (.setMode mode))
(cond-> description (.setDescription description))
(cond-> nested-fields (.setFields (clj->TableFieldSchema nested-fields transform-keys))))))
(defn ^TableSchema ->schema
([defs transform-keys]
(if (instance? TableSchema defs)
defs
(let [fields (clj->TableFieldSchema defs transform-keys)]
(-> (TableSchema.) (.setFields fields)))))
([defs] (->schema defs name)))
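
;; Sketch of a schema described as data (field names are illustrative); nested
;; :fields entries produce RECORD sub-schemas.
(comment
  (->schema [{:name :event-id :type :string :mode "REQUIRED"}
             {:name :metadata :type :record
              :fields [{:name :source :type :string}]}]))
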
(defn ^TimePartitioning ->time-partitioning
[{:keys [type expiration-ms field require-partition-filter]
:or {type :day}}]
(let [tp (doto (TimePartitioning.) (.setType (-> type name .toUpperCase)))]
(cond-> tp
(int? expiration-ms) (.setExpirationMs expiration-ms)
(string? field) (.setField field)
(boolean? require-partition-filter) (.setRequirePartitionFilter require-partition-filter))))
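
;; Sketch: daily partitioning on a timestamp column, expiring partitions after
;; 30 days (the field name is illustrative).
(comment
  (->time-partitioning {:type :day
                        :field "event_time"
                        :expiration-ms (* 30 24 60 60 1000)}))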

(defn get-bq-table-schema
  "Beware: shells out to the `bq` command-line tool to fetch the table schema,
  so it must be installed and authenticated."
  [table-spec]
(let [{:keys [exit out] :as return} (sh "bq" "--format=json" "show" (name table-spec))]
(if (zero? exit)
(-> (json/decode out true) (:schema) (:fields))
(throw (ex-info (str "Could not get bq table schema for table " table-spec)
{:table table-spec
:bq-return return})))))
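
;; Sketch: requires the bq CLI to be installed and authenticated; the table
;; spec is illustrative.
(comment
  (get-bq-table-schema "my_dataset.my_table"))
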
(def write-disposition-enum
{:append BigQueryIO$Write$WriteDisposition/WRITE_APPEND
:empty BigQueryIO$Write$WriteDisposition/WRITE_EMPTY
:truncate BigQueryIO$Write$WriteDisposition/WRITE_TRUNCATE})
(def create-disposition-enum
{:if-needed BigQueryIO$Write$CreateDisposition/CREATE_IF_NEEDED
:never BigQueryIO$Write$CreateDisposition/CREATE_NEVER})
(def retry-policy-enum
{:never (InsertRetryPolicy/neverRetry)
:always (InsertRetryPolicy/alwaysRetry)
:retry-transient (InsertRetryPolicy/retryTransientErrors)})
(def write-method-enum
{:default BigQueryIO$Write$Method/DEFAULT
:load BigQueryIO$Write$Method/FILE_LOADS
:streaming BigQueryIO$Write$Method/STREAMING_INSERTS})
(def schema-update-options-enum
{:allow-field-addition BigQueryIO$Write$SchemaUpdateOption/ALLOW_FIELD_ADDITION
:allow-field-relaxation BigQueryIO$Write$SchemaUpdateOption/ALLOW_FIELD_RELAXATION})
(def write-bq-table-schema
(merge
base-schema
{:schema {:docstr "Specifies bq schema."
:action (fn [transform schema] (.withSchema transform (->schema schema)))}
    :json-schema {:docstr "Specifies bq schema in JSON."
                  :action (fn [transform json-schema] (let [sch (json/decode json-schema)
                                                            full-sch (if (get sch "fields")
                                                                       (json/encode sch)
                                                                       (json/encode {"fields" sch}))]
                                                        (.withJsonSchema transform full-sch)))}
:table-description {:docstr "Specifies table description"
:action (fn [transform description] (.withTableDescription transform description))}
:write-disposition {:docstr "Choose write disposition."
:enum write-disposition-enum
:action (select-enum-option-fn
:write-disposition
write-disposition-enum
(fn [transform enum] (.withWriteDisposition transform enum)))}
:create-disposition {:docstr "Choose create disposition."
:enum create-disposition-enum
:action (select-enum-option-fn
:create-disposition
create-disposition-enum
(fn [transform enum] (.withCreateDisposition transform enum)))}
:write-method {:docstr "Choose write method."
:enum write-method-enum
:action (select-enum-option-fn
:write-method
write-method-enum
(fn [transform enum] (.withMethod transform enum)))}
:without-validation {:docstr "Disables validation until runtime."
:action (fn [transform without-validation]
(if without-validation
(.withoutValidation transform)
transform))}
    :ignore-unknown-values {:docstr "Ignores fields which do not match the schema."
:action (fn [transform ignore-unknown-values]
(if ignore-unknown-values
(.ignoreUnknownValues transform)
transform))}
    :schema-update-options {:docstr "Specifies schema update options (pass in a list of options)."
:enum schema-update-options-enum
:action (select-enum-option-fn-set
:schema-update-options
schema-update-options-enum
(fn [transform enum] (.withSchemaUpdateOptions transform enum)))}
:skip-invalid-rows {:docstr "Skips invalid rows. Only works with :streaming write method."
:action (fn [transform skip-invalid-rows]
(if skip-invalid-rows
(.skipInvalidRows transform)
transform))}
    :retry-policy {:docstr "Specifies the retry policy for failed inserts when streaming."
:action (select-enum-option-fn
:retry-policy
retry-policy-enum
(fn [transform retrypolicy] (.withFailedInsertRetryPolicy transform retrypolicy)))}
    :time-partitioning {:docstr "Configures time partitioning for the destination table."
:action (fn [transform opts]
(.withTimePartitioning transform (->time-partitioning opts)))}}))
(defn custom-output-fn [cust-fn]
(sfn (fn [elt]
(let [^String out (cust-fn elt)]
(TableDestination. out nil)))))
(def format-fn (sfn clj-nested->table-row))
(defn write-bq-table-raw
([to options ^PCollection pcoll]
(let [opts (assoc options :label :write-bq-table-raw)]
(apply-transform pcoll (-> (BigQueryIO/write)
(.to to)
(.withFormatFunction format-fn))
write-bq-table-schema opts)))
([to pcoll] (write-bq-table-raw to {} pcoll)))
(defn- write-bq-table-clj-transform
[to options]
(let [safe-opts (dissoc options :name)]
(ptransform
:write-bq-table-from-clj
[^PCollection pcoll]
(write-bq-table-raw to safe-opts pcoll))))

(defn write-bq-table
  "Writes a PCollection of Clojure maps to the given BigQuery table spec,
  converting each element with clj-nested->table-row; see write-bq-table-schema
  for the available options."
  ([to options ^PCollection pcoll]
(let [opts (assoc options :label :write-bq-table)]
(apply-transform pcoll (write-bq-table-clj-transform to opts) named-schema opts)))
([to pcoll] (write-bq-table to {} pcoll)))
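
;; Usage sketch (table spec, schema and options are illustrative), where pcoll
;; is an upstream PCollection of Clojure maps:
(comment
  (write-bq-table "my-project:my_dataset.events"
                  {:schema [{:name :user-id :type :integer :mode "REQUIRED"}
                            {:name :label :type :string}]
                   :create-disposition :if-needed
                   :write-disposition :append
                   :write-method :load}
                  pcoll))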