-
Notifications
You must be signed in to change notification settings - Fork 28
/
dataset.clj
78 lines (65 loc) · 2.6 KB
/
dataset.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
(ns zero-one.geni.dataset
(:require
[zero-one.geni.interop :as interop]
[zero-one.geni.utils :refer [vector-of-numbers?]])
(:import
(org.apache.spark.sql RowFactory)
(org.apache.spark.sql.types ArrayType DataTypes)
(org.apache.spark.ml.linalg VectorUDT)))
(defn ->row
  "Builds a Spark SQL Row from a Clojure collection, converting each
  element to its Scala representation first."
  [coll]
  (->> coll
       (map interop/->scala-coll)
       (into-array Object)
       (RowFactory/create)))
(defn ->java-list
  "Copies the elements of `coll` into a mutable java.util.ArrayList,
  as required by Spark's Java-facing APIs."
  [coll]
  (doto (java.util.ArrayList.)
    (.addAll coll)))
(def java-type->spark-type
  "Maps a Java class to the corresponding Spark SQL DataType.
  `nil` maps to NullType; classes absent from this map fall back to
  BinaryType in `infer-spark-type`."
  {java.lang.Boolean DataTypes/BooleanType
   java.lang.Byte DataTypes/ByteType
   java.lang.Double DataTypes/DoubleType
   java.lang.Float DataTypes/FloatType
   java.lang.Integer DataTypes/IntegerType
   java.lang.Long DataTypes/LongType
   java.lang.Short DataTypes/ShortType
   java.lang.String DataTypes/StringType
   java.sql.Timestamp DataTypes/TimestampType
   java.util.Date DataTypes/DateType
   nil DataTypes/NullType})
(defn infer-spark-type
  "Infers the Spark SQL data type of a single value.
  Vectors of numbers become an ML VectorUDT; other collections become an
  ArrayType whose element type is inferred from the first element; scalars
  are looked up in `java-type->spark-type`, defaulting to BinaryType."
  [value]
  (cond
    (vector-of-numbers? value) (VectorUDT.)
    (coll? value) (ArrayType. (-> value first infer-spark-type) true)
    :else (java-type->spark-type (type value) DataTypes/BinaryType)))
(defn infer-struct-field
  "Builds a nullable StructField named `col-name` whose data type is
  inferred from `value`."
  [col-name value]
  (DataTypes/createStructField col-name (infer-spark-type value) true))
(defn infer-schema
  "Builds a StructType schema by pairing each column name with a sample
  value from which its field type is inferred."
  [col-names values]
  (->> (map infer-struct-field col-names values)
       (into [])
       (DataTypes/createStructType)))
(defn first-non-nil
  "Returns the first element of `values` that is not nil, or nil if
  every element is nil.
  Uses `some?` rather than `identity` so that `false` counts as a real
  value: with `(filter identity …)` a boolean column whose non-nil
  values are all `false` would yield nil here and be mis-inferred as
  NullType by `infer-schema`."
  [values]
  (first (filter some? values)))
(defn transpose
  "Transposes a row-major table into column-major form (and vice versa).
  Truncates to the length of the shortest row."
  [table]
  (apply map list table))
(defn table->dataset
  "Creates a Spark DataFrame from a row-major `table` and `col-names`.
  Column names may be keywords or strings; the schema of each column is
  inferred from its first non-nil value."
  [spark table col-names]
  (let [names   (map name col-names)
        samples (->> table transpose (map first-non-nil))
        schema  (infer-schema names samples)
        rows    (->java-list (map ->row table))]
    (.createDataFrame spark rows schema)))
(defn map->dataset
  "Creates a Spark DataFrame from a map of column name -> seq of values."
  [spark map-of-values]
  (table->dataset spark
                  (transpose (vals map-of-values))
                  (keys map-of-values)))
(defn conj-record
  "Appends one record's values onto the column collections in
  `map-of-values`. Keys present in the columns but missing from `record`
  contribute nil, keeping all columns the same length."
  [map-of-values record]
  (reduce-kv
    (fn [acc col-name col-values]
      (assoc acc col-name (conj col-values (get record col-name))))
    map-of-values
    map-of-values))
(defn records->dataset
  "Creates a Spark DataFrame from a seq of record maps. The column set
  is the union of all keys in first-seen order; values missing from a
  record become nil."
  [spark records]
  (let [col-names     (->> records (mapcat keys) distinct)
        empty-columns (zipmap col-names (repeat []))
        map-of-values (reduce conj-record empty-columns records)]
    (map->dataset spark map-of-values)))
(comment
true)