-
Notifications
You must be signed in to change notification settings - Fork 18
/
join_concat_ds.clj
183 lines (158 loc) · 7.8 KB
/
join_concat_ds.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
(ns tablecloth.api.join-concat-ds
(:refer-clojure :exclude [concat])
(:require [tech.v3.dataset :as ds]
[tech.v3.dataset.join :as j]
[tech.v3.dataset.column :as col]
[clojure.set :as s]
[tablecloth.api.dataset :refer [dataset?]]
[tablecloth.api.rows :refer [select-rows drop-rows]]
[tablecloth.api.join-separate :refer [join-columns]]
[tablecloth.api.columns :refer [drop-columns select-columns]]
[tablecloth.api.utils :refer [column-names grouped? process-group-data]])
(:import [org.roaringbitmap RoaringBitmap]))
;; joins
(defn- multi-join
  "Joins two datasets on several columns at once.

  The key columns of each side are first combined by `join-columns` into one
  temporary column (named by a fresh gensym, built with `:result-type` set to
  `hashing` and the source columns kept). `join-fn` is then called on that
  temporary column, and finally the temporary column and its right-side
  counterpart (looked up in the `:right-column-names` metadata of the join
  result) are dropped."
  [ds-left ds-right join-fn cols-left cols-right {:keys [hashing]
                                                  :or {hashing identity} :as options}]
  (let [hash-col (gensym "^___join_column_hash")
        hash-opts (assoc options
                         :result-type hashing
                         :drop-columns? false)
        ;; adds the temporary key column to one side of the join
        with-hash-col (fn [ds cols] (join-columns ds hash-col cols hash-opts))
        keyed-left (with-hash-col ds-left cols-left)
        keyed-right (with-hash-col ds-right cols-right)
        joined (join-fn hash-col keyed-left keyed-right options)
        right-hash-col (get-in (meta joined) [:right-column-names hash-col])]
    (ds/drop-columns joined [hash-col right-hash-col])))
(defn- resolve-join-column-names
  "Resolves `columns-selector` into a map with `:left` and `:right` entries
  holding the column names selected from `ds-left` and `ds-right`.

  When `columns-selector` is a map, its `:left` and `:right` values are used
  as separate selectors for each side (other keys are kept as they are);
  otherwise the same selector is applied to both datasets."
  [ds-left ds-right columns-selector]
  (if-not (map? columns-selector)
    {:left (column-names ds-left columns-selector)
     :right (column-names ds-right columns-selector)}
    (assoc columns-selector
           :left (column-names ds-left (:left columns-selector))
           :right (column-names ds-right (:right columns-selector)))))
(defmacro make-join-fns
  "Defines one public join function per `[name impl]` pair in `join-fns-list`.

  Each generated function accepts `ds-left ds-right columns-selector` and an
  optional `options` map. Column names are resolved with
  `resolve-join-column-names`. When exactly one left column is selected and
  no `:hashing` option is given, `impl` is called directly with the
  `[left-column right-column]` pair; in every other case the call goes
  through `multi-join`."
  [join-fns-list]
  (let [emit-join-fn
        (fn [[fn-name join-impl]]
          `(defn ~fn-name
             ([~'ds-left ~'ds-right ~'columns-selector]
              (~fn-name ~'ds-left ~'ds-right ~'columns-selector nil))
             ([~'ds-left ~'ds-right ~'columns-selector ~'options]
              (let [{left-cols# :left right-cols# :right}
                    (resolve-join-column-names ~'ds-left ~'ds-right ~'columns-selector)
                    opts# (or ~'options {})]
                (if (or (not= 1 (count left-cols#)) (:hashing opts#))
                  (multi-join ~'ds-left ~'ds-right ~join-impl left-cols# right-cols# opts#)
                  (~join-impl [(first left-cols#) (first right-cols#)] ~'ds-left ~'ds-right opts#))))))]
    `(do ~@(map emit-join-fn join-fns-list))))
;; Generate the public functions left-join, right-join, inner-join and
;; asof-join. Each one wraps the tech.v3.dataset.join function listed next to
;; its name (asof-join wraps j/left-join-asof).
(make-join-fns [[left-join j/left-join]
[right-join j/right-join]
[inner-join j/inner-join]
[asof-join j/left-join-asof]])
(defn full-join
  "Join keeping all rows.

  Concatenates the `left-join` result with the `right-join` result, removes
  duplicated rows and returns the dataset carrying the right-join metadata
  with `:name` set to full-join."
  ([ds-left ds-right columns-selector]
   (full-join ds-left ds-right columns-selector nil))
  ([ds-left ds-right columns-selector options]
   (let [right-part (right-join ds-left ds-right columns-selector options)
         left-part (left-join ds-left ds-right columns-selector options)
         all-rows (ds/unique-by (ds/concat left-part right-part) identity)]
     (with-meta all-rows (assoc (meta right-part) :name "full-join")))))
(defn- anti-semi-join-fn
  "Shared implementation of anti- and semi-join.

  Runs a left join, then builds the set of row indexes where the right-side
  join column is missing but the left-side join column is not. `rows-fn`
  is called with the joined dataset and that index set. Afterwards all
  right-side columns are dropped, duplicated rows are removed and the
  dataset `:name` metadata is set to `nm`.

  `join-column-name` is either a single column name or a
  `[left-name right-name]` vector."
  ([nm rows-fn join-column-name dsl dsr]
   (anti-semi-join-fn nm rows-fn join-column-name dsl dsr nil))
  ([nm rows-fn join-column-name dsl dsr options]
   (let [[left-key right-key] (if (vector? join-column-name)
                                join-column-name
                                [join-column-name join-column-name])
         joined (j/left-join join-column-name dsl dsr options)
         right-names (:right-column-names (meta joined))
         ^RoaringBitmap right-missing (col/missing (joined (right-names right-key)))
         ^RoaringBitmap left-missing (col/missing (joined left-key))
         ;; rows missing on the right side only
         unmatched (RoaringBitmap/andNot right-missing left-missing)]
     (-> (rows-fn joined unmatched)
         (drop-columns (vals right-names))
         (ds/unique-by identity)
         (vary-meta assoc :name nm)))))
;; Generate anti-join and semi-join on top of anti-semi-join-fn.
;; anti-join selects (select-rows) the rows whose right-side join column is
;; missing after a left join; semi-join drops (drop-rows) those same rows.
(make-join-fns [[anti-join (partial anti-semi-join-fn "anti-join" select-rows)]
[semi-join (partial anti-semi-join-fn "semi-join" drop-rows)]])
(defn cross-join
  "Cross product of the selected columns of two datasets.

  `columns-selector` defaults to `:all`. When the `:unique?` option is truthy
  (default `false`), duplicated rows are removed from each side before the
  product is built. `options` are passed on to `j/pd-merge` together with
  `:how :cross`."
  ([ds-left ds-right]
   (cross-join ds-left ds-right :all))
  ([ds-left ds-right columns-selector]
   (cross-join ds-left ds-right columns-selector nil))
  ([ds-left ds-right columns-selector {:keys [unique?] :or {unique? false} :as options}]
   (let [{:keys [left right]} (resolve-join-column-names ds-left ds-right columns-selector)
         ;; narrows a dataset to its selected columns, optionally deduplicated
         side (fn [ds cols]
                (cond-> (select-columns ds cols)
                  unique? (ds/unique-by identity)))]
     (j/pd-merge (side ds-left left)
                 (side ds-right right)
                 (merge options {:how :cross})))))
(defn expand
  "tidyr-style expand.

  Creates all combinations of the given column selections. Each selector
  contributes the unique rows of the columns it selects from `ds`; a selector
  that is itself a dataset is used as given. The contributions are combined
  with `cross-join`. Grouped datasets are processed group by group."
  [ds columns-selector & r]
  (cond
    (grouped? ds)
    (process-group-data ds #(apply expand % columns-selector r) true)

    :else
    (let [head (if (dataset? columns-selector)
                 columns-selector
                 (-> ds
                     (select-columns (column-names ds columns-selector))
                     (ds/unique-by identity)))]
      (if (seq r)
        (cross-join head (apply expand ds r))
        head))))
(defn complete
  "tidyr-style complete.

  Builds every combination of the selected columns with `expand`, left-joins
  the original dataset onto those combinations (so combinations absent from
  `ds` end up with missing values) and drops the right-side copies of the
  key columns. Grouped datasets are processed group by group."
  [ds columns-selector & r]
  (if-not (grouped? ds)
    (let [combinations (apply expand ds columns-selector r)
          key-names (column-names combinations)
          joined (left-join combinations ds key-names)
          right-key-copies (-> (meta joined)
                               :right-column-names
                               (select-keys key-names)
                               vals)]
      (drop-columns joined right-key-copies))
    (process-group-data ds #(apply complete % columns-selector r) true)))
;; set operations
(defn intersect
  "Semi-join of `ds-left` and `ds-right` using the distinct column names of
  both datasets as the selector. The result's `:name` metadata is set to
  intersection."
  ([ds-left ds-right]
   (intersect ds-left ds-right nil))
  ([ds-left ds-right options]
   (let [all-names (->> (ds/column-names ds-right)
                        (clojure.core/concat (ds/column-names ds-left))
                        (distinct))
         result (semi-join ds-left ds-right all-names options)]
     (vary-meta result assoc :name "intersection"))))
(defn difference
  "Anti-join of `ds-left` and `ds-right` using the distinct column names of
  both datasets as the selector. The result's `:name` metadata is set to
  difference."
  ([ds-left ds-right]
   (difference ds-left ds-right nil))
  ([ds-left ds-right options]
   (let [all-names (->> (ds/column-names ds-right)
                        (clojure.core/concat (ds/column-names ds-left))
                        (distinct))
         result (anti-join ds-left ds-right all-names options)]
     (vary-meta result assoc :name "difference"))))
(defn union
  "Concatenates `ds` with `datasets`, removes duplicated rows and sets the
  result's `:name` metadata to union."
  [ds & datasets]
  (let [stacked (apply ds/concat ds datasets)
        deduplicated (ds/unique-by stacked identity)]
    (vary-meta deduplicated assoc :name "union")))
(defn- add-empty-missing-column
  "Adds to `ds` a column called `name` holding one `nil` per row."
  [ds name]
  (let [nils (repeat (ds/row-count ds) nil)
        empty-column (col/new-column name nils)]
    (ds/add-column ds empty-column)))
(defn- add-empty-missing-columns
  "Concatenates `ds-left` and `ds-right` after giving each of them an
  all-nil column for every column name present only in the other one."
  [ds-left ds-right]
  (let [left-names (set (ds/column-names ds-left))
        right-names (set (ds/column-names ds-right))
        ;; adds one nil-filled column per name in `absent`
        pad (fn [ds absent] (reduce add-empty-missing-column ds absent))
        padded-left (pad ds-left (s/difference right-names left-names))
        padded-right (pad ds-right (s/difference left-names right-names))]
    (ds/concat padded-left padded-right)))
(defn bind
  "Concatenates `ds` with each of `datasets` in turn. Columns present in only
  one of the two datasets being combined are filled with nils in the other."
  [ds & datasets]
  (reduce add-empty-missing-columns ds datasets))
;; column-wise append
(defn append
  "Appends the columns of every dataset in `datasets` to `ds`, in order."
  [ds & datasets]
  (reduce (fn [acc other]
            (ds/append-columns acc (ds/columns other)))
          ds
          datasets))