-
Notifications
You must be signed in to change notification settings - Fork 1
/
udf.clj
93 lines (74 loc) · 2.01 KB
/
udf.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
(ns louna.datasets.udf
(:require louna.state.settings
[louna.datasets.sql :as sql])
(:import (org.apache.spark.sql.api.java UDF1 UDF0 UDF2 UDF3 UDF4)
[org.apache.spark.sql functions]))
;;---------UDF---------------------------------
(defrecord UF0 [f]
UDF0
(call [this]
(f)))
(defrecord UF1 [f]
UDF1
(call [this x]
(f x)))
(defrecord UF2 [f]
UDF2
(call [this x y]
(f x y)))
(defrecord UF3 [f]
UDF3
(call [this x y z]
(f x y z)))
(defrecord UF4 [f]
UDF4
(call [this x y z w]
(f x y z w)))
(defn get-udf-class [f nargs]
(cond
(= nargs 0)
(UF0. f)
(= nargs 1)
(UF1. f)
(= nargs 2)
(UF2. f)
(= nargs 3)
(UF3. f)
(= nargs 4)
(UF4. f)
:else
(do (prn "unknown arity") (System/exit 0))))
(defn call-udf [name & col-names]
(functions/callUDF name (sql/ca col-names)))
(defn register-udf [f nargs rtype]
(let [rtype (if (keyword? rtype)
(get louna.datasets.schema/schema-types rtype)
rtype)
new-udf (get-udf-class f nargs)
name (louna.state.settings/new-udf-name)
- (-> (louna.state.settings/get-session)
(.udf)
(.register name new-udf rtype))]
(partial call-udf name)))
(defmacro defudf [arg1 arg2 arg3 arg4]
(if (vector? arg3)
;;define "spark" function
(let [fname arg1
rtype arg2
args arg3
body arg4]
`(def ~fname (register-udf (fn ~args ~body) ~(count args) ~rtype)))
;;use already defined clojure function
(let [fname arg1
f arg2
nargs arg3
rtype arg4]
`(def ~fname (register-udf ~f ~nargs ~rtype)))))
(defn register-udaf [udaf-object]
(let [name (louna.state.settings/new-udf-name)
- (.register (.udf (louna.state.settings/get-session))
name
udaf-object)]
(partial louna.datasets.udf/call-udf name)))
(defmacro defudaf [fname udaf-object]
`(def ~fname (register-udaf ~udaf-object)))