-
Notifications
You must be signed in to change notification settings - Fork 18
/
join_separate.clj
191 lines (173 loc) · 8.82 KB
/
join_separate.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
(ns tablecloth.api.join-separate
(:refer-clojure :exclude [pmap])
(:require [tech.v3.dataset :as ds]
[tech.v3.dataset.column :as col]
[tech.v3.tensor :as tens]
[tech.v3.datatype :as dtt]
[clojure.string :as str]
[tech.v3.parallel.for :refer [pmap]]
[tech.v3.dataset.tensor]
[tablecloth.api.utils :refer [iterable-sequence? column-names grouped? process-group-data ->str]]
[tablecloth.api.columns :refer [select-columns drop-columns add-column]]))
(defn- process-join-columns
  "Adds `target-column` to `ds`, built by applying `join-function` to every row
  of the columns named in `col-names`. When no columns are selected the new
  column is created from `nil`. Source columns are removed afterwards when
  `drop-columns?` is truthy."
  [ds target-column join-function col-names drop-columns?]
  (let [selected    (select-columns ds col-names)
        joined      (when (seq selected)
                      (map join-function (ds/value-reader selected)))
        with-target (add-column ds target-column joined)]
    (cond-> with-target
      drop-columns? (drop-columns col-names))))
(defn join-columns
  "Join columns of dataset into a single `target-column`. Accepts:

  dataset
  target column name
  column selector (as in select-columns)
  options
  `:separator` (default \"-\") - a string, or a sequence of separators which is
     cycled between consecutive values; an empty sequence joins values with no
     separator at all
  `:drop-columns?` - whether to drop source columns or not (default true)
  `:result-type`
     `:map` - packs data into map
     `:seq` - packs data into sequence
     `:string` - join strings with separator (default)
     or custom function which gets row as a vector
  `:missing-subst` - substitution for missing value
  `:parallel?` - passed through to group processing when the dataset is grouped"
  ([ds target-column columns-selector] (join-columns ds target-column columns-selector nil))
  ([ds target-column columns-selector {:keys [separator missing-subst drop-columns? result-type parallel?]
                                       :or {separator "-" drop-columns? true result-type :string}
                                       :as _conf}]
   (let [;; nil values are replaced before the row reaches the packing function
         missing-subst-fn #(map (fn [v] (if (nil? v) missing-subst v)) %)
         col-names (column-names ds columns-selector)
         join-function (comp (cond
                               (= :map result-type) #(zipmap col-names %)
                               (= :seq result-type) seq
                               (fn? result-type) result-type
                               :else (if (iterable-sequence? separator)
                                       (if-let [seps (seq separator)]
                                         ;; :empty is a placeholder in front of the first value;
                                         ;; it is removed again by `rest` after interleaving.
                                         (let [sep (cons :empty (cycle seps))]
                                           (fn [row] (->> row
                                                          (remove nil?)
                                                          (interleave sep)
                                                          (rest)
                                                          (apply str))))
                                         ;; Empty separator sequence: `interleave` would stop
                                         ;; after the first value and drop the rest, so simply
                                         ;; concatenate all remaining values.
                                         (fn [row] (->> row
                                                        (remove nil?)
                                                        (apply str))))
                                       (fn [row] (->> row
                                                      (remove nil?)
                                                      (str/join separator)))))
                             missing-subst-fn)]
     (if (grouped? ds)
       (process-group-data ds #(process-join-columns % target-column join-function col-names drop-columns?) parallel?)
       (process-join-columns ds target-column join-function col-names drop-columns?)))))
;;
(defn- infer-target-columns
  "Generates target column names of the form \"<column-name>-<index>\", one per
  element of the first item in `res`. The first item is counted, so it must be
  a finite collection."
  [col res]
  (let [base  (col/column-name col)
        width (count (first res))]
    (for [idx (range width)]
      (str base "-" idx))))
(defn- separate-column->columns
  "Applies `separator-fn` to every value of `col` and turns the results into a
  dataset.

  When the first result is a map, the results are converted to a dataset
  directly. Otherwise each result is treated as an indexed collection: the
  value at position `idx` goes to the column named by the `idx`-th entry of
  `target-columns` (after passing through `replace-missing`). Falsey entries in
  `target-columns` skip that position. When `target-columns` is `:infer` or
  falsey, names are inferred from the first result."
  [col target-columns replace-missing separator-fn]
  (let [parts (pmap separator-fn col)]
    (if (map? (first parts))
      (ds/->dataset parts) ;; ds/column->dataset functionality
      (let [infer? (or (= :infer target-columns)
                       (not target-columns))
            names  (if infer?
                     (infer-target-columns col parts)
                     target-columns)]
        (->> names
             ;; emit [name values] pairs, skipping positions without a name
             (keep-indexed (fn [idx colname]
                             (when colname
                               [colname (pmap #(replace-missing (nth % idx)) parts)])))
             (mapcat identity)
             (apply array-map)
             (ds/->dataset))))))
(defn- prepare-missing-subst-fn
  "Builds a function which returns `nil` for values recognised as missing and
  the value itself otherwise. `missing-subst` may be a set or a function (used
  as the predicate), a sequence (converted to a set), or any other value
  (compared with `=`)."
  [missing-subst]
  (let [missing? (cond
                   (set? missing-subst) missing-subst
                   (fn? missing-subst) missing-subst
                   (iterable-sequence? missing-subst) (set missing-subst)
                   :else #(= missing-subst %))]
    (fn [v]
      (when-not (missing? v) v))))
(defn- process-separate-columns
  "Separates `column` of `ds` into new columns and places them where the source
  column was.

  With `drop-column?` equal to `:all` only the newly created columns are
  returned. Otherwise the result consists of the columns preceding `column`,
  the source column itself (only when `drop-column?` is falsey), the new
  columns, and finally the columns that followed `column`."
  [ds column target-columns replace-missing separator-fn drop-column?]
  (let [separated (separate-column->columns (ds column) target-columns replace-missing separator-fn)]
    (if (= :all drop-column?)
      separated
      (let [[names-before names-from] (split-with #(not= % column) (ds/column-names ds))
            before (ds/->dataset (ds/select-columns ds names-before))
            ;; `names-from` starts with `column` itself, which is dropped below
            from   (ds/select-columns ds names-from)
            start  (if drop-column?
                     before
                     (ds/add-column before (ds column)))]
        (-> start
            (ds/append-columns (seq (ds/columns separated)))
            (ds/append-columns (ds/columns (ds/drop-columns from [column]))))))))
(defn separate-column
  "Separates the values of `column` into several new columns.

  `separator` can be:
    a string - converted to a regex pattern and used to split the stringified value
    a regex pattern - matched against the whole stringified value; the
      elements after the first one in the match result become the parts
    anything else - used directly as a function of a single value
      (defaults to `identity`)
  String and regex results are padded with an endless run of empty strings.

  `target-columns` names the resulting columns; `nil` or `:infer` infers the
  names from the first separated value.

  Options:
  `:missing-subst` (default \"\") - value, set, sequence or predicate describing
     parts which should become missing (`nil`); a falsey value disables it
  `:drop-column?` (default true) - drop the source column; `:all` keeps only
     the newly created columns
  `:parallel?` - passed through to group processing when the dataset is grouped"
  ([ds column] (separate-column ds column identity))
  ([ds column separator] (separate-column ds column nil separator))
  ([ds column target-columns separator] (separate-column ds column target-columns separator nil))
  ([ds column target-columns separator {:keys [missing-subst drop-column? parallel?]
                                        :or {missing-subst ""}
                                        :as _conf}]
   (let [pad             (fn [parts] (concat parts (repeat "")))
         separator-fn    (cond
                           (string? separator)
                           (let [pat (re-pattern separator)]
                             (fn [v] (pad (str/split (str v) pat))))

                           (instance? java.util.regex.Pattern separator)
                           (fn [v] (pad (rest (re-matches separator (str v)))))

                           :else separator)
         replace-missing (if missing-subst
                           (prepare-missing-subst-fn missing-subst)
                           identity)
         drop-column?    (if (nil? drop-column?) true drop-column?)
         run             (fn [dataset]
                           (process-separate-columns dataset column target-columns
                                                     replace-missing separator-fn drop-column?))]
     (if (grouped? ds)
       (process-group-data ds run parallel?)
       (run ds)))))
(defn- prefix
  "Builds the name \"<prefix-name>-<value>\". The result is a keyword when
  `prefix-name` is a keyword and a string otherwise."
  [prefix-name value]
  (let [label (str (->str prefix-name) "-" value)]
    (cond-> label
      (keyword? prefix-name) keyword)))
(defn array-column->columns
  "Converts a column of type java array into several columns,
  one for each element of the array of all rows. The source column is dropped afterwards.

  The function assumes that arrays in all rows have same type and length and are numeric.

  `ds` Dataset to operate on.
  `src-column` The (array) column to convert
  `opts` can contain:
    `:prefix` newly created columns are named \"<prefix>-<index>\" (a keyword
              when the prefix is a keyword); without it they keep the names
              produced by the tensor conversion
  "
  ([ds src-column opts]
   (assert (not (grouped? ds)) "Not supported on grouped datasets")
   (let [;; Look the column up through the dataset. Threading `src-column` as a
         ;; function, `(-> ds src-column ...)`, would call the column name itself
         ;; and fail for string or numeric column names.
         len-arrays (-> (ds src-column) first count)
         new-ds
         (->
          (dtt/concat-buffers (ds src-column))
          (tens/reshape [(ds/row-count ds) len-arrays])
          (tech.v3.dataset.tensor/tensor->dataset))
         new-ds-renamed (if (:prefix opts)
                          (ds/rename-columns new-ds
                                             (zipmap (range len-arrays)
                                                     (map #(prefix (:prefix opts) %) (range len-arrays))))
                          new-ds)]
     (-> ds
         (ds/append-columns (ds/columns new-ds-renamed))
         (ds/drop-columns [src-column]))))
  ([ds src-column]
   (array-column->columns ds src-column {})))
(defn columns->array-column
  "Packs several columns into one column whose values are arrays, one array
  per row. The source columns are removed from the result.

  `ds` Dataset to operate on.
  `column-selector` anything supported by [[select-columns]]
  `new-column` name of the column to create
  "
  [ds column-selector new-column]
  (assert (not (grouped? ds)) "Not supported on grouped datasets")
  (let [selected  (select-columns ds column-selector)
        n-columns (ds/column-count selected)
        n-rows    (ds/row-count selected)
        ;; columns are concatenated one after another, so reshape to
        ;; [columns rows] first and transpose to iterate row by row
        row-data  (-> (ds/columns selected)
                      (dtt/concat-buffers)
                      (tens/reshape [n-columns n-rows])
                      (tens/transpose [1 0])
                      (tens/rows))
        remaining (drop-columns ds (column-names selected))]
    (ds/add-column remaining
                   (ds/new-column new-column (map dtt/->array row-data)))))