;; window_extensions.cljc -- 249 lines (200 loc), 8.16 KB
;; Note: the upstream repository was archived by its owner on Jan 6, 2023
;; and is now read-only.
(ns onyx.windowing.window-extensions
(:require [onyx.windowing.units :refer [to-standard-units coerce-key] :as units]
[onyx.windowing.window-id :as wid]
[onyx.static.default-vals :as d]))
(defn window-id-impl-extents
  "Delegates to `wid/wids` to compute the window ids for `window-time`,
  using `w-range` and `w-slide` as given.

  `min-value` falls back to 0 when it is nil (or false). `units` is
  accepted but is not used by this function."
  [units min-value w-range w-slide window-time]
  (wid/wids (or min-value 0) w-range w-slide window-time))
(defprotocol IWindow
  "Operations that every window type in this namespace provides."
  (extent-operations [this all-extents segment time-index]
    "Given all extents, a segment and its time index, return the vector of
     operations that should be performed on the windows.
     Operations take the form [action arg1 arg2].
     Supported actions are:
     [:merge-extents extent1 extent2 merged-extent]
     [:alter-extents old-extent new-extent]
     [:update extent]")
  (event-time [this segment]
    "Given a segment, return its window time. The records in this file read
     it from the segment under the window key.")
  (extents [this time-index]
    "Given a time index, return the extents that the time index belongs to.")
  (time-index [this time]
    "Given a window time, return the coerced time index for the window.")
  (time-index->extent [this time]
    "Given a time in milliseconds, compute the corresponding extent.")
  (bounds [this window-id]
    "Returns a vector of two elements. The first is the lower bound that this window
     id accepts, and the second is the upper."))
;; Fixed windows: the window range is also used as the slide, so the same
;; `w-range` value is passed for both when computing extents and bounds.
(defrecord FixedWindow
  [id task type init window-key min-value range w-range units slide timeout-gap doc window]
  IWindow
  (event-time [_ segment]
    ;; The window time is whatever the segment holds under `window-key`.
    (get segment window-key))
  (time-index [_ time]
    (units/coerce-key time units))
  (extents [_ time-index]
    (window-id-impl-extents units min-value w-range w-range time-index))
  (extent-operations [this _ _ time-index]
    ;; Every extent the time index falls into receives a plain :update.
    (for [extent (extents this time-index)]
      [:update extent]))
  (bounds [_ window-id]
    (let [lowest (or min-value (get d/default-vals :onyx.windowing/min-value))
          lower (wid/extent-lower lowest w-range w-range window-id)
          upper (wid/extent-upper lowest w-range window-id)]
      [lower upper])))
;; Sliding windows: extents and bounds are computed from both the window
;; range (`w-range`) and the slide (`w-slide`).
(defrecord SlidingWindow
  [id task type init window-key min-value range slide units w-range w-slide timeout-gap doc window]
  IWindow
  (event-time [_ segment]
    ;; The window time is whatever the segment holds under `window-key`.
    (get segment window-key))
  (time-index [_ time]
    (units/coerce-key time units))
  (extents [_ time-index]
    (window-id-impl-extents units min-value w-range w-slide time-index))
  (extent-operations [this _ _ time-index]
    ;; Every extent the time index falls into receives a plain :update.
    (for [extent (extents this time-index)]
      [:update extent]))
  (bounds [_ window-id]
    (let [lowest (or min-value (get d/default-vals :onyx.windowing/min-value))
          lower (wid/extent-lower lowest w-range w-slide window-id)
          upper (wid/extent-upper lowest w-slide window-id)]
      [lower upper])))
;; Global window: a single extent that accepts every segment.
(defrecord GlobalWindow
  [id task type init window-key min-value range slide timeout-gap doc window]
  IWindow
  (extent-operations [this _ _ _]
    ;; Always return the same window ID, the actual number
    ;; doesn't matter - as long as it's constant.
    [[:update 1]])
  (event-time [this segment]
    ;; Returns nil when this record has no window-key.
    (when window-key
      (get segment window-key)))
  (extents [this _]
    ;; Same constant extent id that extent-operations updates.
    [1])
  (time-index [this time]
    ;; The time is ignored; every input maps to index 0.
    0)
  (bounds [this window-id]
    ;; Everything is in bounds.
    #?(:clj  [Double/NEGATIVE_INFINITY Double/POSITIVE_INFINITY]
       :cljs [(.-NEGATIVE_INFINITY js/Number) (.-POSITIVE_INFINITY js/Number)])))
(defn bounding-extents
  "Returns a two-element vector `[below above]`.

  `below` is the extent whose lower bound (its first element) is at or
  before `session-time` and nearest to it; `above` is the extent whose lower
  bound is at or after `session-time` and nearest to it. Either slot is nil
  when no extent qualifies. Only lower bounds are compared. On equal
  distance the extent seen first wins. Scanning stops at the first nil
  extent."
  [extents session-time]
  (let [unset #?(:clj Long/MAX_VALUE
                 :cljs (.-POSITIVE_INFINITY js/Number))
        ;; Accumulator is [[distance extent] [distance extent]] for the best
        ;; below / above candidates found so far.
        step (fn [[below above] [lower :as candidate]]
               (let [gap-below (- session-time lower)
                     gap-above (- lower session-time)]
                 [(if (and (<= lower session-time)
                           (< gap-below (first below)))
                    [gap-below candidate]
                    below)
                  (if (and (>= lower session-time)
                           (< gap-above (first above)))
                    [gap-above candidate]
                    above)]))
        [[_ nearest-below] [_ nearest-above]]
        (reduce step
                [[unset nil] [unset nil]]
                (take-while some? extents))]
    [nearest-below nearest-above]))
;; Session window: extents are [lower upper] pairs that grow, or merge with a
;; neighbour, when a new time index lands within `gap` of them.
(defrecord SessionWindow
  [id task type init window-key min-value range slide gap timeout-gap units doc window]
  IWindow
  (extent-operations [this all-extents _ time-index]
    ;; `all-extents` is dereferenced here, so it must be a derefable
    ;; holding the current collection of [lower upper] extents.
    (let [[below-extent above-extent] (bounding-extents @all-extents time-index)
          [below-lower below-upper] below-extent
          [above-lower above-upper] above-extent
          ;; The extent below reaches to within `gap` of the time index.
          below-contains? (and below-upper (>= below-upper (- time-index gap)))
          ;; The extent above starts within `gap` after the time index.
          above-contains? (and above-lower (>= (+ time-index gap) above-lower))]
      (cond
        ;; matches point exactly: both neighbours are the same extent
        (and below-extent above-extent (= below-extent above-extent))
        [[:update below-extent]]

        ;; the time index bridges both neighbours: merge them into one
        (and below-contains? above-contains?)
        [[:merge-extents
          [below-lower below-upper]
          [above-lower above-upper]
          [below-lower above-upper]]
         [:update [below-lower above-upper]]]

        ;; past the end of the lower extent but within the gap: extend it
        (and below-contains? (> time-index below-upper))
        [[:alter-extents
          [below-lower below-upper]
          [below-lower time-index]]
         [:update [below-lower time-index]]]

        ;; already inside the lower extent
        below-contains?
        [[:update [below-lower (max below-upper time-index)]]]

        ;; before the start of the upper extent but within the gap:
        ;; pull its lower bound down to the time index
        (and above-contains? (< time-index above-lower))
        [[:alter-extents
          [above-lower above-upper]
          [time-index above-upper]]
         [:update [time-index above-upper]]]

        above-contains?
        [[:update [above-lower (max time-index above-upper)]]]

        ;; no windows matched: start a new single-point session
        :else
        [[:update [time-index time-index]]])))
  (event-time [this segment]
    (get segment window-key))
  (extents [this time-index]
    ;; Always throws: this record does not support looking extents up by time index.
    (throw (ex-info "Direct time-index->extents lookup is not supported for session windows." {})))
  (time-index [this t]
    (units/coerce-key t units))
  (bounds [this window-id]
    ;; Returned unchanged; the extents produced above are [lower upper] pairs.
    window-id))
(defmulti extent-serializer
  "Returns a keyword naming the extent serializer for `window`,
  dispatching on the window's `:window/type`."
  (fn [window]
    (get window :window/type)))

;; Fixed and sliding windows both map to :long.
(defmethod extent-serializer :fixed [_] :long)

(defmethod extent-serializer :sliding [_] :long)

;; Global windows map to the keyword :nil (not the value nil).
(defmethod extent-serializer :global [_] :nil)

;; Session windows map to :long-long.
(defmethod extent-serializer :session [_] :long-long)
(defmulti windowing-builder
  "Selects, by the window's `:window/type`, the builder for the concrete
  window type that operations are performed against."
  (fn [window]
    (get window :window/type)))
(defn ordered-log?
  "True when `:ordered-log` appears among the window's
  `:window/storage-strategy` entries, false otherwise."
  [window]
  (let [strategies (get window :window/storage-strategy)]
    (true? (some #(= :ordered-log %) strategies))))
;; Returns a function that turns a window map into a FixedWindow, after
;; adding :units (derived from the last element of `range`) and :w-range
;; (`range` converted to standard units).
(defmethod windowing-builder :fixed
  [_]
  (fn [{:keys [range] :as m}]
    (map->FixedWindow
     (assoc m
            :units (units/standard-units-for (last range))
            :w-range (apply units/to-standard-units range)))))
;; Returns a function that turns a window map into a SlidingWindow.
;; Throws when the window asks for the :ordered-log storage strategy.
;; When no `slide` is given, `range` is used as the slide.
(defmethod windowing-builder :sliding
  [window]
  (if (ordered-log? window)
    (throw (ex-info "Ordered log computation of windows is not supported for sliding windows." {}))
    (fn [{:keys [range slide] :as m}]
      (let [slide-spec (or slide range)]
        (map->SlidingWindow
         (assoc m
                :units (units/standard-units-for (last range))
                :w-range (apply units/to-standard-units range)
                :w-slide (apply units/to-standard-units slide-spec)))))))
;; Global windows need no derived fields, so the record's map constructor
;; is returned as the builder directly.
(defmethod windowing-builder :global [_]
  map->GlobalWindow)
;; Returns a function that turns a window map into a SessionWindow, after
;; adding :units (derived from the last element of `timeout-gap`) and :gap
;; (`timeout-gap` converted to standard units).
;; Throws when the window asks for the :ordered-log storage strategy.
(defmethod windowing-builder :session
  [window]
  (if (ordered-log? window)
    (throw (ex-info "Ordered log computation of windows is not supported for session windows." {}))
    (fn [{:keys [timeout-gap] :as m}]
      (map->SessionWindow
       (assoc m
              :units (units/standard-units-for (last timeout-gap))
              :gap (apply units/to-standard-units timeout-gap))))))