This repository has been archived by the owner on Jan 6, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 204
/
triggers.cljc
123 lines (99 loc) · 4.1 KB
/
triggers.cljc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
(ns onyx.triggers
(:require [onyx.windowing.units :refer [coerce-key to-standard-units standard-units-for]]
[onyx.static.util :refer [kw->fn now ms->ns]]))
;;; State helper functions
(defn next-fire-time
  "Returns the clock reading at which a timer trigger should next fire.

  `trigger` must carry `:trigger/period`, a sequence whose second element is
  the unit. The period is only accepted when `standard-units-for` maps that
  unit to `:milliseconds`; otherwise an `ex-info` holding the trigger is
  thrown.

  On Clojure the result is `System/nanoTime` plus the period converted with
  `ms->ns`; on ClojureScript it is `(now)` plus the period in milliseconds."
  [{:keys [trigger/period] :as trigger}]
  ;; Reject unsupported units up front so the happy path reads straight through.
  (when-not (= :milliseconds (standard-units-for (second period)))
    (throw (ex-info ":trigger/period must be a unit that can be converted to :milliseconds"
                    {:trigger trigger})))
  (let [period-ms (apply to-standard-units period)]
    #?(;; use monotonically increasing clock for Java
       :clj (+ (System/nanoTime) (ms->ns period-ms))
       ;; cljs clock is susceptible to time changes
       :cljs (+ (now) period-ms))))
(defn exceeds-watermark?
  "Returns true when the segment's window-key value, passed through
  `coerce-key` with `:milliseconds`, is greater than or equal to
  `upper-extent-bound`.

  The value is looked up in `segment` under the key named by
  `:window/window-key` of `window`."
  [window upper-extent-bound segment]
  (-> segment
      (get (:window/window-key window))
      (coerce-key :milliseconds)
      (>= upper-extent-bound)))
(defn exceeds-percentile-watermark?
  "Returns true when the segment's window-key value, passed through
  `coerce-key` with `:milliseconds`, has reached the percentile cutoff of
  the extent.

  The cutoff is `lower-extent-bound` plus the extent width
  (`upper-extent-bound` minus `lower-extent-bound`) multiplied by the
  trigger's `:trigger/watermark-percentage`."
  [window trigger lower-extent-bound upper-extent-bound segment]
  (let [extent-width (- upper-extent-bound lower-extent-bound)
        cutoff       (+ lower-extent-bound
                        (* extent-width (:trigger/watermark-percentage trigger)))
        segment-time (coerce-key (get segment (:window/window-key window))
                                 :milliseconds)]
    (>= segment-time cutoff)))
;;; State initialization functions
(defn segment-init-state
  "Initial state for the segment trigger: always the integer 0.
  The single argument is ignored."
  [_]
  0)
(defn timer-init-state
  "Initial state for the timer trigger: a two-element vector holding
  `false` (not firing yet) and the first fire time computed by
  `next-fire-time` for `trigger`."
  [trigger]
  (let [first-deadline (next-fire-time trigger)]
    [false first-deadline]))
(defn punctuation-init-state
  "Initial state for the punctuation trigger: a map whose `:fire?` flag is
  false. The `trigger` argument is not consulted."
  [trigger]
  {:fire? false})
;;; Locals initialization functions
(defn punctuation-init-locals
  "Builds the locals map for the punctuation trigger.

  Passes the trigger's `:trigger/pred` value to `kw->fn` and stores the
  result under `:pred-fn`.
  NOTE(review): presumably `kw->fn` resolves a keyword to the function it
  names -- confirm against onyx.static.util."
  [trigger]
  (let [pred (-> trigger :trigger/pred kw->fn)]
    {:pred-fn pred}))
;;; State transition functions
(defn segment-next-state
  "Advances the segment trigger's counter.

  On a `:new-segment` event the state becomes `(inc (mod state n))`, where
  `n` is the first element of `:trigger/threshold`, so the counter runs
  1..n and wraps back to 1 after reaching n. Any other event type leaves
  the state unchanged."
  [{:keys [trigger/threshold]} state {:keys [event-type] :as state-event}]
  (case event-type
    :new-segment (-> state
                     (mod (first threshold))
                     inc)
    state))
(defn timer-next-state
  "Computes the next `[fire? fire-time]` state for the timer trigger.

  The trigger fires when the current clock reading (`System/nanoTime` on
  Clojure, `(now)` on ClojureScript) is past the stored `fire-time`, or when
  the event type is `:job-completed` or `:recovered`. When it fires, a new
  fire time is scheduled via `next-fire-time`; otherwise the stored fire
  time is carried over."
  [{:keys [trigger/period] :as trigger}
   [_ fire-time]
   {:keys [event-type] :as state-event}]
  (let [deadline-passed? (> #?(:clj (System/nanoTime)
                               :cljs (now))
                            fire-time)
        fire?            (or deadline-passed?
                             (contains? #{:job-completed :recovered} event-type))
        upcoming         (if fire?
                           (next-fire-time trigger)
                           fire-time)]
    [fire? upcoming]))
(defn punctuation-next-state
  "Computes the next punctuation trigger state.

  Calls the function stored under `:pred-fn` in `trigger` with the trigger
  and the state event, and returns a map holding its result under `:fire?`.
  The previous `state` is not consulted."
  [trigger state state-event]
  (let [predicate (:pred-fn trigger)
        verdict   (predicate trigger state-event)]
    {:fire? verdict}))
;;; Fire predicate functions
(defn segment-fire?
  "Decides whether the segment trigger should fire.

  Returns true when the event is a `:new-segment` and the trigger state
  equals the first element of `:trigger/threshold`. Otherwise returns the
  event type itself when it is `:job-completed` or `:recovered` (a truthy
  keyword), and nil in every other case."
  [{:keys [trigger/threshold] :as trigger}
   trigger-state
   {:keys [event-type] :as state-event}]
  (if (and (= :new-segment event-type)
           (= (first threshold) trigger-state))
    true
    ;; Set lookup yields the keyword itself or nil, matching a set call.
    (get #{:job-completed :recovered} event-type)))
(defn timer-fire?
  "Returns the `fire?` flag stored as the first element of the timer
  trigger's state vector. The trigger and the state event are ignored."
  [trigger [fire? _] state-event]
  fire?)
(defn punctuation-fire?
  "Returns the value stored under `:fire?` in the punctuation trigger's
  state. The trigger and the state event are ignored."
  [trigger state state-event]
  (get state :fire?))
(defn watermark-init-locals
  "Builds the locals map for the watermark trigger.

  When the trigger has a `:trigger/delay`, it is applied to
  `to-standard-units` and the result stored under `:delay`; without a delay
  the stored value is 0."
  [{:keys [trigger/delay]}]
  {:delay (if-not delay
            0
            (apply to-standard-units delay))})
(defn watermark-fire?
  "Decides whether the watermark trigger should fire.

  Always fires on `:job-completed`. On a `:watermark` event it fires when
  the `:input` watermark is strictly greater than `upper-bound` plus the
  trigger's `:delay`. Every other event type yields false. The second
  argument (trigger state) is ignored."
  [trigger _ {:keys [event-type upper-bound watermarks] :as state-event}]
  (case event-type
    :job-completed true
    :watermark     (> (:input watermarks)
                      (+ upper-bound (:delay trigger)))
    false))
(defn percentile-watermark-fire?
  "Decides whether the percentile watermark trigger should fire.

  Returns true when the event carries a segment and
  `exceeds-percentile-watermark?` holds for it. Otherwise returns the event
  type itself when it is `:job-completed` or `:recovered` (a truthy
  keyword), and nil in every other case. The second argument (trigger
  state) is ignored."
  [trigger _ {:keys [lower-bound upper-bound event-type segment window]}]
  (if (and segment
           ;; Only events that carry a segment are checked against the cutoff.
           (exceeds-percentile-watermark? window trigger lower-bound upper-bound segment))
    true
    ;; A completed or recovered job always fires, regardless of any segment.
    (get #{:job-completed :recovered} event-type)))
;;; Top level vars to bundle the functions together
(def ^:export segment
  "Function bundle for the segment trigger: initial state, state
  transition, and fire predicate."
  {:trigger/init-state    segment-init-state
   :trigger/next-state    segment-next-state
   :trigger/trigger-fire? segment-fire?})
(def ^:export timer
  "Function bundle for the timer trigger: initial state, state transition,
  and fire predicate."
  {:trigger/init-state    timer-init-state
   :trigger/next-state    timer-next-state
   :trigger/trigger-fire? timer-fire?})
(def ^:export punctuation
  "Function bundle for the punctuation trigger: initial state, locals
  initialization, state transition, and fire predicate."
  {:trigger/init-state    punctuation-init-state
   :trigger/init-locals   punctuation-init-locals
   :trigger/next-state    punctuation-next-state
   :trigger/trigger-fire? punctuation-fire?})
(def ^:export watermark
  "Function bundle for the watermark trigger: locals initialization and
  fire predicate. It defines no state functions."
  {:trigger/init-locals   watermark-init-locals
   :trigger/trigger-fire? watermark-fire?})
(def ^:export percentile-watermark
  "Function bundle for the percentile watermark trigger: only a fire
  predicate is defined."
  {:trigger/trigger-fire? percentile-watermark-fire?})