/
tasks.cljs
191 lines (160 loc) · 7.55 KB
/
tasks.cljs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
(ns keechma.toolbox.tasks
(:require [cljs.core.async :refer [<! put! take! chan close! timeout]]
[medley.core :refer [dissoc-in]]
[promesa.core :as p]
[keechma.toolbox.pipeline.core :as pp])
(:import [goog.async AnimationDelay Throttle])
(:require-macros [cljs.core.async.macros :refer [go-loop]]))
(defrecord TaskStateUpdate [app-db id state])
(defn raf-producer [res-chan _]
(let [is-running? (atom true)
wait-delay (fn wait-delay []
(.start (AnimationDelay.
(fn [val]
(put! res-chan val)
(when @is-running? (wait-delay))))))]
(wait-delay)
(fn []
(reset! is-running? false)
nil)))
(defn throttle [f interval]
(let [t (Throttle. f interval)]
(fn [& args] (.apply (.-fire t) t (to-array args)))))
(defn app-db-change-producer [res-chan app-db-watcher]
(let [throttled-put! (throttle #(put! res-chan true) 1)]
(app-db-watcher
(fn [data]
(throttled-put! data)))
(fn [])))
(defn finish-task! [app-db id state]
(let [current (get-in app-db [:kv ::tasks id])
version (:version current)
stopper (get-in current [:stoppers version])]
(dissoc-in (if stopper (stopper app-db state) app-db) [:kv ::tasks id])))
(defn make-app-db-watcher [app-db-atom watcher-id]
(fn [cb]
(add-watch app-db-atom watcher-id
(fn [_ _ old-state new-state]
(when (not= old-state new-state)
(cb new-state))))))
(defn clear-task-version [app-db id version]
(-> app-db
(dissoc-in [:kv ::tasks id :states version])
(dissoc-in [:kv ::tasks id :stoppers version])))
(defn register-task!
([app-db-atom id producer reducer resolve runner-chan]
(let [version (gensym id)
watcher-id [id version]
finisher (producer runner-chan (make-app-db-watcher app-db-atom watcher-id))
stopper (fn [app-db state]
(close! runner-chan)
(remove-watch app-db-atom watcher-id)
(if (= ::stopped state)
(resolve)
(resolve :keechma.toolbox.pipeline.core/break))
(if-let [finisher-res (finisher)]
(let [new-app-db (reducer {:state state :value finisher-res :id id} app-db)]
(if (instance? TaskStateUpdate new-app-db)
(throw (ex-info "It's impossible to change task state when the task is not running"
{:task id :state state}))
(clear-task-version new-app-db id version)))
(clear-task-version app-db id version)))]
(reset! app-db-atom (-> (finish-task! @app-db-atom id ::cancelled)
(assoc-in [:kv ::tasks id :version] version)
(assoc-in [:kv ::tasks id :states version] ::running)
(assoc-in [:kv ::tasks id :stoppers version] stopper)))
{:stopper stopper
:version version})))
(defn update-task-state! [state]
(fn [app-db id]
(pp/commit! (finish-task! app-db id state))))
(def stop-task! (update-task-state! ::stopped))
(def cancel-task! (update-task-state! ::cancelled))
(defn update-task-state [state]
(fn [app-db id]
(->TaskStateUpdate app-db id state)))
(def stop-task (update-task-state ::stopped))
(def cancel-task (update-task-state ::cancelled))
(defn update-app-db-atom! [payload app-db-atom]
(if (nil? payload)
app-db-atom
(reset! app-db-atom payload)))
(defn ex-task-cancelled [id version]
(ex-info "Task cancelled" {::task {:id id :version version :state ::cancelled}}))
(defn task-running? [app-db id]
(let [task (get-in app-db [:kv ::tasks id])
current-version (:version task)]
(not (nil? (get-in task [:states current-version])))))
(defn task-loop [{:keys [producer reducer ctrl app-db-atom value resolve reject id pipelines$]}]
(let [runner-chan (chan)
pipelines-watcher-key (keyword (gensym :w))
parent-pipeline-id (:pipeline/running ctrl)
{:keys [stopper version]} (register-task! app-db-atom id producer reducer resolve runner-chan)]
(add-watch pipelines$ pipelines-watcher-key
(fn [_ _ _ new-val]
(when-not (get-in new-val (conj parent-pipeline-id :running?))
(reset! app-db-atom (finish-task! @app-db-atom id ::cancelled)))))
(let [started-at (.getTime (js/Date.))]
(go-loop [times-invoked 0]
(if-let [runner-value (<! runner-chan)]
(let [app-db @app-db-atom
reducer-result (reducer {:times-invoked times-invoked
:started-at started-at
:id id
:value runner-value
:state ::running}
app-db)
task-state-update? (instance? TaskStateUpdate reducer-result)]
(if task-state-update?
(do
(remove-watch pipelines$ pipelines-watcher-key)
(reset! app-db-atom (finish-task! (:app-db reducer-result) (:id reducer-result) (:state reducer-result))))
(do
(when (not= app-db-atom reducer-result)
(reset! app-db-atom reducer-result))
(recur (inc times-invoked)))))
(remove-watch pipelines$ pipelines-watcher-key))))))
(defn blocking-task-producer
([producer id reducer ctrl app-db-atom value]
(blocking-task-producer producer id reducer ctrl app-db-atom value nil))
([producer id reducer ctrl app-db-atom value pipelines$]
(p/promise (fn [resolve reject]
(task-loop {:reducer reducer
:producer producer
:ctrl ctrl
:app-db-atom app-db-atom
:value value
:resolve resolve
:reject reject
:id id
:pipelines$ pipelines$})))))
(defn blocking-task! [producer id reducer]
(with-meta (partial blocking-task-producer producer id reducer) {:pipeline? true}))
(defn non-blocking-task-producer
([producer id reducer ctrl app-db-atom value]
(non-blocking-task-producer producer id reducer ctrl app-db-atom value nil))
([producer id reducer ctrl app-db-atom value pipelines$]
(let [parent-pipeline-id (:pipeline/running ctrl)
running-path (conj parent-pipeline-id :running?)]
(task-loop {:reducer reducer
:producer producer
:ctrl ctrl
:app-db-atom app-db-atom
:value value
:resolve identity
:reject identity
:id id
;; Detach this task from the pipeline, it will continue running even if the pipeline is done or cancelled
:pipelines$ (atom (assoc-in {} running-path true))}))
nil))
(defn non-blocking-task! [producer id reducer]
(with-meta (partial non-blocking-task-producer producer id reducer) {:pipeline? true}))
(def blocking-raf! (partial blocking-task! raf-producer))
(def non-blocking-raf! (partial non-blocking-task! raf-producer))
(defn block-until! [id predicate-fn]
(blocking-task! app-db-change-producer id
(fn [_ app-db]
(let [res (predicate-fn app-db)]
(if res
(stop-task app-db id)
app-db)))))