-
Notifications
You must be signed in to change notification settings - Fork 28
/
core.clj
430 lines (411 loc) · 8.04 KB
/
core.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
(ns zero-one.geni.core
(:refer-clojure :exclude [*
+
-
/
<
<=
>
>=
alias
cast
concat
count
distinct
drop
empty?
filter
first
flatten
group-by
hash
last
map
max
min
mod
not
partition-by
rand
remove
replace
reverse
second
sequence
shuffle
sort
struct
take
when])
(:require
[potemkin :refer [import-vars]]
[zero-one.geni.column]
[zero-one.geni.dataset]
[zero-one.geni.data-sources]
[zero-one.geni.interop :as interop]
[zero-one.geni.sql]
[zero-one.geni.window])
(:import
(org.apache.spark.sql Dataset
RelationalGroupedDataset
SparkSession
functions)))
(import-vars
[zero-one.geni.column
col
->col-array
->column])
(import-vars
[zero-one.geni.sql
&&
*
+
-
->date-col
->timestamp-col
->utc-timestamp
/
<
<=
===
>
>=
abs
acos
add-months
approx-count-distinct
array
array-contains
array-distinct
array-except
array-intersect
array-join
array-max
array-min
array-position
array-remove
array-repeat
array-sort
array-union
arrays-overlap
arrays-zip
asc
asc-nulls-first
asc-nulls-last
ascii
asin
atan
atan2
base64
between
bin
bitwise-not
broadcast
bround
cast
cbrt
ceil
collect-list
collect-set
concat
concat-ws
contains
conv
corr
cos
cosh
count-distinct
covar
covar-pop
covar-samp
crc32
cube-root
cume-dist
current-date
current-timestamp
date-add
date-diff
date-format
date-sub
date-trunc
datediff
day-of-month
day-of-week
day-of-year
decode
degrees
dense-rank
desc
desc-nulls-first
desc-nulls-last
element-at
encode
ends-with
exp
explode
expr
factorial
flatten
floor
format-number
format-string
from-unixtime
greatest
grouping
hash
hex
hour
hypot
initcap
input-file-name
instr
isin
kurtosis
lag
last
last-day
lead
least
length
levenshtein
like
lit
locate
log
log1p
log2
lower
lpad
ltrim
md5
minute
mod
monotonically-increasing-id
month
months-between
nan?
nanvl
negate
next-day
not
ntile
null-count
null-rate
null?
percent-rank
pi
pmod
posexplode
posexplode-outer
pow
quarter
radians
rand
randn
rank
regexp-extract
regexp-replace
reverse
rint
rlike
round
row-number
rpad
rtrim
second
sequence
sha1
sha2
shift-left
shift-right
shift-right-unsigned
shuffle
signum
sin
sinh
size
skewness
slice
sort-array
spark-partition-id
split
sqr
sqrt
starts-with
stddev
stddev-pop
stddev-samp
struct
substring
sum-distinct
tan
tanh
to-date
to-timestamp
to-utc-timestamp
translate
trim
unbase64
unhex
unix-timestamp
upper
var-pop
var-samp
variance
week-of-year
when
year
||])
(import-vars
[zero-one.geni.dataset
->row
agg
agg-all
approx-quantile
cache
col-regex
collect
collect-col
collect-vals
column-names
columns
cross-join
cube
describe
distinct
drop
drop-duplicates
drop-na
dtypes
empty?
except
except-all
explain
fill-na
filter
first-vals
group-by
infer-schema
infer-struct-field
intersect
intersect-all
is-empty
is-local
java-type->spark-type
join
join-with
limit
local?
map->dataset
order-by
partitions
persist
pivot
print-schema
random-split
records->dataset
remove
rename-columns
repartition
repartition-by-range
replace
rollup
sample
select
select-expr
show
show-vertical
sort
sort-within-partitions
spark-session
sql-context
summary
table->dataset
take
take-vals
union
union-by-name
where
with-column
with-column-renamed])
(import-vars
[zero-one.geni.window
over
window])
(import-vars
[zero-one.geni.data-sources
read-csv!
read-json!
read-libsvm!
read-parquet!
read-text!
write-csv!
write-json!
write-libsvm!
write-parquet!
write-text!])
;; Aliasing: dispatch on the class of the first argument.
(defmulti as (fn [head & _] (class head)))
(defmethod as Dataset
  ;; Datasets support .as directly for table aliasing.
  [dataset dataset-alias]
  (.as dataset dataset-alias))
(defmethod as :default
  ;; Anything else is coerced to a Column before aliasing.
  [expr col-alias]
  (.as (->column expr) col-alias))
;; `alias` shadows clojure.core/alias (excluded in the ns form).
(def alias as)
(defmulti count
  "Row count for a Dataset or grouped Dataset; otherwise builds a
  count aggregate Column over the given expression."
  class)
;; Coerce to a Column so keywords/strings work as column references,
;; consistent with the `first` :default method below.
(defmethod count :default [expr] (functions/count (->column expr)))
(defmethod count Dataset [dataset] (.count dataset))
(defmethod count RelationalGroupedDataset [grouped] (.count grouped))
;; Mean aggregation: Column expression or grouped-Dataset form.
(defmulti mean (fn [head & _] (class head)))
(defmethod mean RelationalGroupedDataset
  ;; Grouped form: mean of each named column within each group.
  [grouped & col-names]
  (.mean grouped (interop/->scala-seq col-names)))
(defmethod mean :default
  ;; Column form: aggregate mean over the expression; extra args ignored.
  [expr & _]
  (functions/mean expr))
(def avg mean)
(defmulti max
  "Maximum aggregation: Column expression or grouped-Dataset form."
  (fn [head & _] (class head)))
;; Accept-and-ignore trailing args so the :default arity matches the
;; dispatch fn (and `mean`'s :default) — otherwise extra args dispatch
;; here and then fail with a confusing arity error.
(defmethod max :default [expr & _] (functions/max expr))
(defmethod max RelationalGroupedDataset
  [grouped & col-names]
  (.max grouped (interop/->scala-seq col-names)))
(defmulti min
  "Minimum aggregation: Column expression or grouped-Dataset form."
  (fn [head & _] (class head)))
;; Accept-and-ignore trailing args so the :default arity matches the
;; dispatch fn (and `mean`'s :default) — otherwise extra args dispatch
;; here and then fail with a confusing arity error.
(defmethod min :default [expr & _] (functions/min expr))
(defmethod min RelationalGroupedDataset
  [grouped & col-names]
  (.min grouped (interop/->scala-seq col-names)))
(defmulti sum
  "Sum aggregation: Column expression or grouped-Dataset form."
  (fn [head & _] (class head)))
;; Accept-and-ignore trailing args so the :default arity matches the
;; dispatch fn (and `mean`'s :default) — otherwise extra args dispatch
;; here and then fail with a confusing arity error.
(defmethod sum :default [expr & _] (functions/sum expr))
(defmethod sum RelationalGroupedDataset
  [grouped & col-names]
  (.sum grouped (interop/->scala-seq col-names)))
;; Overloaded coalesce: partition coalescing for Datasets, SQL COALESCE
;; for column expressions.
(defmulti coalesce (fn [head & _] (class head)))
(defmethod coalesce :default
  ;; Column form: first non-null value among the given expressions.
  [& exprs]
  (let [cols (->col-array exprs)]
    (functions/coalesce cols)))
(defmethod coalesce Dataset
  ;; Dataset form: reduce the Dataset to `n-partitions` partitions.
  [dataframe n-partitions]
  (.coalesce dataframe n-partitions))
;; `first` shadows clojure.core/first (excluded in the ns form).
(defmulti first class)
(defmethod first :default
  ;; Column form: aggregate taking the first value of the expression.
  [expr]
  (functions/first (->column expr)))
(defmethod first Dataset
  ;; Dataset form: the first row, or nil when the Dataset is empty.
  [dataframe]
  (clojure.core/first (zero-one.geni.dataset/take dataframe 1)))
(defn create-spark-session
  "Builds (or retrieves, via getOrCreate) a SparkSession.

  Options map keys (all optional):
    :app-name  - application name        (default \"Geni App\")
    :master    - Spark master URL        (default \"local[*]\")
    :configs   - map of config key/value pairs applied via .config
    :log-level - SparkContext log level  (default \"ERROR\")"
  [{:keys [app-name master configs log-level]
    :or   {app-name  "Geni App"
           master    "local[*]"
           configs   {}
           log-level "ERROR"}}]
  (let [base-builder (-> (SparkSession/builder)
                         (.appName app-name)
                         (.master master))
        ;; Fold every config entry onto the builder before building.
        session      (-> (reduce (fn [builder [k v]] (.config builder k v))
                                 base-builder
                                 configs)
                         (.getOrCreate))]
    (-> session .sparkContext (.setLogLevel log-level))
    session))
;; REPL scratchpad — forms inside `comment` are never evaluated, so the
;; free names below (`temp`, `pprint`) are harmless here.
(comment
  (require '[zero-one.geni.test-resources :refer [spark melbourne-df]])
  (-> melbourne-df count)
  (-> melbourne-df print-schema)
  ;; Run the midje test watcher, skipping tests tagged :slow.
  (require '[midje.repl :refer [autotest]])
  (autotest :filter (complement :slow))
  ;; TODO: add remove (i.e. filter not)
  ;; Reflection helper for exploring Spark's Java API surface.
  (require '[clojure.reflect :as r])
  (->> (r/reflect temp)
       :members
       ;(clojure.core/filter #(= (:name %) 'approxQuantile))
       ;(mapv :parameter-types)
       (mapv :name)
       pprint)
  0)