forked from logrhythm/clj-headlights
-
Notifications
You must be signed in to change notification settings - Fork 2
/
pipeline.clj
233 lines (202 loc) · 9.71 KB
/
pipeline.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
(ns clj-headlights.pipeline
(:require [schema.core :as s]
[clj-headlights.pardo :as pardo]
[clj-headlights.clj-fn-call :as clj-fn-call]
[clj-headlights.pcollections :as pcollections]
[clojure.tools.logging :as log]
[clj-headlights.utils :as utils])
(:import (java.io Serializable ByteArrayOutputStream ObjectOutputStream)
(clojure.lang Var IFn)
(org.apache.beam.sdk Pipeline)
(org.apache.beam.sdk.values PCollection PCollectionList TupleTag PDone PCollectionTuple POutput PBegin KV)
(org.apache.beam.sdk.transforms Create$Values Flatten PTransform DoFn$ProcessContext GroupByKey)
(org.apache.beam.sdk.transforms.windowing Window GlobalWindows)
(org.apache.beam.sdk.transforms.join KeyedPCollectionTuple CoGroupByKey)
(org.apache.beam.sdk.testing TestPipeline)
(org.apache.beam.sdk.options PipelineOptionsFactory)
(org.apache.beam.sdk.coders KvCoder IterableCoder)
(org.apache.beam.sdk.extensions.protobuf ByteStringCoder ProtoCoder)
(com.google.bigtable.v2 Mutation)
(clj_headlights PipelineOptions EdnCoder NippyCoder)))
;; Alias of `pardo/get-side-output`, exposed from this namespace.
(def get-side-output pardo/get-side-output)
;; Alias of `pardo/get-side-outputs`, exposed from this namespace.
(def get-side-outputs pardo/get-side-outputs)
(s/defn create :- Pipeline
  "Builds a Beam `Pipeline` from `options`; every Beam pipeline starts here.
  The `Pipeline/create` call is wrapped in `utils/retry-on-error` with the
  arguments `3` and `IllegalStateException`."
  [options :- PipelineOptions]
  (utils/retry-on-error
    3
    IllegalStateException
    (Pipeline/create options)))
(s/defn make-pipeline-options :- PipelineOptions
  "Returns a fresh `PipelineOptions` instance obtained from
  `PipelineOptionsFactory/as`. Takes no arguments."
  []
  (PipelineOptionsFactory/as PipelineOptions))
(defn replace-keywords
  "Returns `clj-call` with its `:params` rewritten: every occurrence of the
  placeholder keyword `:with-context` becomes `ctx` and every `:with-window`
  becomes `window`. All other params are left alone and order is preserved."
  [clj-call ^DoFn$ProcessContext ctx window]
  (let [substitutions {:with-context ctx
                       :with-window  window}
        resolved      (replace substitutions (:params clj-call))]
    (assoc clj-call :params resolved)))
(defn side-output-function-wrapper
  "Invokes `clj-call` on `val` (after substituting the `:with-context` /
  `:with-window` placeholders), discards nil entries from the result, and
  emits every remaining `[tag output]` pair: pairs tagged `:main` go to the
  main output, all others go to the side output named by their tag."
  [context val window state clj-call]
  (let [prepared-call  (replace-keywords clj-call context window)
        tagged-outputs (->> (pardo/invoke-with-optional-state prepared-call val state)
                            (remove nil?))]
    (doseq [[tag payload] tagged-outputs]
      (if (= :main tag)
        (pardo/emit-main-output context payload)
        (pardo/emit-side-output context tag payload)))))
(s/defn df-map-cat-with-side-outputs :- PCollectionTuple
  "Map-cat style transform whose results are split across tagged outputs.
  `clj-call` must return a sequence of `[:tag value]` tuples; use the tag
  `:main` for the main output. Nil entries in that sequence are dropped.
  `tags` lists the side-output tags; `:main` is always added, and every tag
  is registered with `pardo/default-coder`."
  [pcoll :- pcollections/PCollectionType
   name :- s/Str
   clj-call :- clj-fn-call/CljCall
   tags :- [s/Keyword]]
  (let [wrapped-call  [#'side-output-function-wrapper
                       (clj-fn-call/to-serializable-clj-call clj-call)]
        all-tags      (conj tags :main)
        coders-by-tag (into {}
                            (for [tag all-tags]
                              [tag (pardo/default-coder)]))]
    (pardo/create-and-apply pcoll name wrapped-call {:outputs coders-by-tag})))
(defn process-pcoll
  "Applies a ParDo named `name` to `pcoll`. The ParDo's call is
  `wrapping-call` with the serializable form of `clj-call` appended as its
  last argument. No extra ParDo options are passed."
  [pcoll name wrapping-call clj-call]
  (let [dofn-call (->> clj-call
                       (clj-fn-call/to-serializable-clj-call)
                       (clj-fn-call/append-argument-to-clj-call wrapping-call))]
    (pardo/create-and-apply pcoll name dofn-call {})))
(defn apply-to-value-and-output
  "Invokes `clj-call` on `value` (with `:with-context` / `:with-window`
  placeholders substituted) and outputs the single result on `context`."
  [^DoFn$ProcessContext context value window state clj-call]
  (let [prepared-call (replace-keywords clj-call context window)]
    (.output context
             (pardo/invoke-with-optional-state prepared-call value state))))
(s/defn df-map :- PCollection
  "One-to-one transform: produces a `PCollection` holding the result of
  `clj-call` for each element of `pcoll`. The params of `clj-call` may
  contain the placeholders `:with-context` and `:with-window`, which are
  replaced by the process context and the window at call time."
  [pcoll :- pcollections/PCollectionType
   name :- s/Str
   clj-call :- clj-fn-call/CljCall]
  (process-pcoll pcoll
                 name
                 #'apply-to-value-and-output
                 clj-call))
(s/defn df-map-with-side-input :- PCollection
  "Applies a ParDo named `name` to `pcoll` with `side-input-view` registered
  as a side input. The view is also appended to `clj-call` as its last
  argument. NOTE(review): unlike `df-map`, `clj-call` is passed to the ParDo
  directly, without a wrapping function or serialization step -- confirm the
  calling convention it is expected to follow."
  [pcoll :- pcollections/PCollectionType
   name :- s/Str
   side-input-view
   clj-call :- clj-fn-call/CljCall]
  (let [call-with-view (clj-fn-call/append-argument-to-clj-call clj-call side-input-view)]
    (pardo/create-and-apply pcoll
                            name
                            call-with-view
                            {:side-inputs [side-input-view]})))
(defn apply-to-value-and-seq-outputs
  "Invokes `clj-call` on `value` and outputs every element of the returned
  sequence on `context`. The window argument is ignored."
  [^DoFn$ProcessContext context value _window state clj-call]
  (let [results (pardo/invoke-with-optional-state clj-call value state)]
    (doseq [item results]
      (.output context item))))
(s/defn df-mapcat :- PCollection
  "Like `df-map`, but `clj-call` returns a sequence for each input element
  and the elements of those sequences are flattened into the output
  `PCollection`. Use it when the transformation is not strictly 1-to-1."
  [pcoll :- pcollections/PCollectionType
   name :- s/Str
   clj-call :- clj-fn-call/CljCall]
  (process-pcoll pcoll
                 name
                 #'apply-to-value-and-seq-outputs
                 clj-call))
(defn filterer
  "Outputs `value` unchanged on `context` when invoking `clj-call` on it
  yields a truthy result; otherwise outputs nothing. The window argument is
  ignored."
  [^DoFn$ProcessContext context value _window state clj-call]
  (let [keep? (pardo/invoke-with-optional-state clj-call value state)]
    (when keep?
      (.output context value))))
(s/defn df-filter :- PCollection
  "Produces a `PCollection` holding only those elements of `pcoll` for which
  `clj-call` returns a truthy value."
  [pcoll :- pcollections/PCollectionType
   name :- s/Str
   clj-call :- clj-fn-call/CljCall]
  (process-pcoll pcoll
                 name
                 #'filterer
                 clj-call))
(defn apply-to-value-and-output-kv
  "Invokes `clj-call` on `value` and outputs a `KV` built from the first and
  second items of the result. The window argument is ignored."
  [^DoFn$ProcessContext context value _window state clj-call]
  (let [pair (pardo/invoke-with-optional-state clj-call value state)
        k    (first pair)
        v    (second pair)]
    (.output context (KV/of k v))))
(defn process-pcoll-kv
  "Same as `process-pcoll`, except the main output is given a `KvCoder`
  made of an `EdnCoder` for keys and a `NippyCoder` for values."
  [pcoll name wrapping-call clj-call]
  (let [dofn-call (->> clj-call
                       (clj-fn-call/to-serializable-clj-call)
                       (clj-fn-call/append-argument-to-clj-call wrapping-call))
        kv-coder  (KvCoder/of (EdnCoder.) (NippyCoder.))]
    (pardo/create-and-apply pcoll name dofn-call {:outputs {:main kv-coder}})))
(s/defn df-map-kv
  "Like `df-map`, but `clj-call` must return a two-element sequence, which is
  turned into a `KV` (first item is the key, second the value). Typically
  used right before a group-by-key step."
  [pcoll :- pcollections/PCollectionType
   name :- s/Str
   clj-call :- clj-fn-call/CljCall]
  (process-pcoll-kv pcoll
                    name
                    #'apply-to-value-and-output-kv
                    clj-call))
(defn apply-to-value-and-seq-outputs-kv
  "Invokes `clj-call` on `value` and, for every `[k v]` pair in the returned
  sequence, outputs `(KV/of k v)` on `context`. The window argument is
  ignored."
  [^DoFn$ProcessContext context value _window state clj-call]
  (let [pairs (pardo/invoke-with-optional-state clj-call value state)]
    (doseq [[k v] pairs]
      (.output context (KV/of k v)))))
(s/defn df-mapcat-kv
  "Like `df-map-kv`, but `clj-call` returns a sequence of length-2 sequences;
  each one is converted into a `KV` and all of them are flattened into the
  output."
  [pcoll :- pcollections/PCollectionType
   name :- s/Str
   clj-call :- clj-fn-call/CljCall]
  (process-pcoll-kv pcoll
                    name
                    #'apply-to-value-and-seq-outputs-kv
                    clj-call))
(s/defn df-apply-dofn
  "Applies `clj-call` to `pcoll` as a ParDo named `name`, passing it to
  `pardo/create-and-apply` as-is with an empty options map."
  [pcoll name clj-call]
  (let [no-options {}]
    (pardo/create-and-apply pcoll name clj-call no-options)))
(defn filterer-kv
  "When invoking `clj-call` on `value` yields a truthy result, outputs a `KV`
  built from the first and second items of `value`; otherwise outputs
  nothing. The window argument is ignored."
  [^DoFn$ProcessContext context value _window state clj-call]
  (let [keep? (pardo/invoke-with-optional-state clj-call value state)]
    (when keep?
      (let [k (first value)
            v (second value)]
        (.output context (KV/of k v))))))
(s/defn df-filter-kv :- PCollection
  "Produces a `PCollection` holding the elements of `pcoll` for which
  `clj-call` returns a truthy value. Each kept element is re-emitted as a
  `KV` built from its first and second items."
  [pcoll :- pcollections/PCollectionType
   name :- s/Str
   clj-call :- clj-fn-call/CljCall]
  (process-pcoll-kv pcoll
                    name
                    #'filterer-kv
                    clj-call))
(defn apply-to-value-and-output-with-timestamp
  "Invokes `clj-call` on `value`, expecting a `[result instant]` pair back,
  and outputs `result` on `context` with `instant` as its timestamp. The
  window argument is ignored."
  [^DoFn$ProcessContext context value _window state clj-call]
  (let [timestamped      (pardo/invoke-with-optional-state clj-call value state)
        [element instant] timestamped]
    (.outputWithTimestamp context element instant)))
(s/defn df-map-with-timestamp
  "Assigns timestamps to the rows of `pcoll`: `clj-call` must return a
  `[value instant]` pair for each element, and `value` is emitted with
  `instant` as its timestamp."
  [pcoll :- pcollections/PCollectionType
   name :- s/Str
   clj-call :- clj-fn-call/CljCall]
  (process-pcoll pcoll
                 name
                 #'apply-to-value-and-output-with-timestamp
                 clj-call))
(defn ^PCollection output-is-kv
  "Sets the coder of `pcoll` to a `KvCoder` made of an `EdnCoder` for keys and
  a `NippyCoder` for values, and returns the result of `.setCoder`."
  [^PCollection pcoll]
  (let [kv-coder (KvCoder/of (EdnCoder/of) (NippyCoder/of))]
    (.setCoder pcoll kv-coder)))
(s/defn flatten-pcollections :- PCollection
  "Merges the PCollections in `pcolls` into one by wrapping them in a
  `PCollectionList` and applying `Flatten/pCollections` under `name`."
  [pcolls :- [PCollection]
   name :- s/Str]
  (let [pcoll-list (PCollectionList/of ^Iterable pcolls)]
    (.apply pcoll-list name (Flatten/pCollections))))
(s/defn co-group-by-key :- PCollection
  "Takes one PCollection, or a sequential collection of PCollections, that are
  already in KV form and joins them on their keys with `CoGroupByKey`.
  Output pcollection will look like:
  [key [first-pcoll-vals second-pcoll-vals third-pcoll-vals &c]]

  Each input is registered under a `TupleTag` whose id is its position in
  `pcolls` as a string (\"0\", \"1\", ...).

  Throws IllegalArgumentException when `pcolls` is an empty collection."
  [pcolls :- (s/cond-pre PCollection [PCollection])
   name :- s/Str]
  ;; Only a lone PCollection gets wrapped. The previous `vector?` test also
  ;; wrapped lists and lazy seqs, which the `[PCollection]` schema accepts, so
  ;; a whole sequence was handed to KeyedPCollectionTuple/of as if it were a
  ;; single PCollection.
  ;; TODO: do not allow co-group-by-key of one pcollection
  (let [pcolls (if (instance? PCollection pcolls) [pcolls] pcolls)]
    ;; Fail early with a clear message instead of passing nil to
    ;; KeyedPCollectionTuple/of.
    (when-not (seq pcolls)
      (throw (IllegalArgumentException.
               "co-group-by-key requires at least one PCollection")))
    (loop [keyed-pcoll-tuple (KeyedPCollectionTuple/of (TupleTag. "0") (first pcolls))
           remaining-pcolls (rest pcolls)
           counter 1]
      (if (empty? remaining-pcolls)
        (.apply keyed-pcoll-tuple name (CoGroupByKey/create))
        (recur (.and keyed-pcoll-tuple (TupleTag. (str counter)) (first remaining-pcolls))
               (rest remaining-pcolls)
               (inc counter))))))
(s/defn group-by-key :- PCollection
  "Applies Beam's `GroupByKey` transform to `pcoll` under the step name
  `name` and returns the resulting `PCollection`."
  [pcoll :- PCollection
   name :- s/Str]
  (let [gbk (GroupByKey/create)]
    (.apply pcoll name gbk)))
(s/defn composite
  "Groups the transforms performed inside `f` into one composite transform
  named `name`.

  With exactly one element in `inputs`, that element is the transform input
  and `f` is called with it. Otherwise the inputs are wrapped in a
  `PCollectionList` and `f` is applied to its members as separate arguments.
  A collection returned by `f` is wrapped in a `PCollectionList`.

  Returns a vector of the resulting PCollections when the applied transform
  yields a `PCollectionList`; otherwise returns the result as-is."
  [name :- s/Str
   inputs :- [pcollections/PCollectionType]
   f :- IFn]
  (let [;; exact class match, not an instance? check
        pcoll-list? (fn [x] (= PCollectionList (class x)))
        transform   (proxy [PTransform] []
                      (expand [pcoll-like]
                        (let [produced (if (pcoll-list? pcoll-like)
                                         (apply f (.getAll pcoll-like))
                                         (f pcoll-like))]
                          (if-not (coll? produced)
                            produced
                            (PCollectionList/of ^Iterable produced)))))
        input       (if (= 1 (count inputs))
                      (first inputs)
                      (PCollectionList/of ^Iterable inputs))
        applied     (.apply input name transform)]
    (if (pcoll-list? applied)
      (into [] (.getAll ^PCollectionList applied))
      applied)))
(defn context->ms
  "Returns the timestamp of process context `c` in milliseconds, read with
  `.getMillis` from the value of `.timestamp`."
  [^DoFn$ProcessContext c]
  (-> c
      .timestamp
      .getMillis))