-
-
Notifications
You must be signed in to change notification settings - Fork 13
/
core.cljc
272 lines (223 loc) · 11.3 KB
/
core.cljc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
(ns superlifter.core
(:require [urania.core :as u]
[promesa.core :as prom]
#?(:clj [superlifter.logging :refer [log]]
:cljs [superlifter.logging :refer-macros [log]]))
(:refer-clojure :exclude [resolve]))
;; ClojureScript only: bind the name `Throwable` to js/Error so that the
;; `(catch Throwable t ...)` form used later in this file has a class to refer to.
#?(:cljs (def Throwable js/Error))
;; Bridge between a caller-supplied cache and the cache value handed to urania.
;; update-bucket! passes (->urania cache) as urania's :cache option and, once the
;; execution finishes, calls (urania-> cache new-value) with the cache value that
;; came back alongside the result.
(defprotocol Cache
;; Returns the current cache contents in the form passed to urania as :cache.
(->urania [this])
;; Stores new-value (the cache value returned from a urania execution) into this cache.
(urania-> [this new-value]))
;; Atom-backed Cache: urania is given the atom's current value, and the cache
;; value produced by an execution replaces the atom's contents wholesale.
(extend-protocol Cache
  #?(:clj clojure.lang.Atom
     :cljs cljs.core/Atom)
  (->urania [cache-atom]
    @cache-atom)
  (urania-> [cache-atom fresh-value]
    (reset! cache-atom fresh-value)))
;; Id of the bucket used when a caller does not name one; update-bucket! also
;; falls back to it when asked for a bucket id that does not exist.
(def default-bucket-id :default)
(defn- clear-ready
  "Returns `bucket` with its `[:queue :ready]` entry reset to an empty vector."
  [bucket]
  (assoc-in bucket [:queue :ready] []))
(defn- ready-all
  "Returns `bucket` with everything under `[:queue :waiting]` moved to
  `[:queue :ready]`, leaving `:waiting` as an empty vector."
  [bucket]
  (let [pending (get-in bucket [:queue :waiting])]
    (update bucket :queue assoc :ready pending :waiting [])))
(defn- update-bucket!
  "Applies `f` to the bucket stored under `bucket-id` and fetches whatever ends up
  in that bucket's `[:queue :ready]`.

  The ready list is emptied (`clear-ready`) before `f` runs, so only muses that
  this call marks as ready are executed. If `bucket-id` is not present in the
  buckets atom, a warning is logged and `default-bucket-id` is used instead.

  Returns a promise of the execution result, or a promise resolved with nil when
  nothing is ready. When the bucket's `:urania-opts` hold a `:cache`, urania is
  given `(->urania cache)` and the cache value returned next to the result is
  written back with `urania->`."
  [context bucket-id f]
  (let [buckets (:buckets context)
        target-id (if (contains? @buckets bucket-id)
                    bucket-id
                    (do (log :warn "Bucket" bucket-id "does not exist, using default bucket")
                        default-bucket-id))
        ;; swap! returns the whole updated map; pick out the bucket we just changed
        bucket (get (swap! buckets update target-id (comp f clear-ready))
                    target-id)
        muses (not-empty (get-in bucket [:queue :ready]))]
    (if-not muses
      (do (log :debug "Nothing ready to fetch for" target-id)
          (prom/resolved nil))
      (let [cache (get-in bucket [:urania-opts :cache])]
        (log :info "Fetching" (count muses) "muses from bucket" target-id)
        (prom/then
         (u/execute! (u/collect muses)
                     (merge (:urania-opts bucket)
                            (when cache
                              {:cache (->urania cache)})))
         (fn [[result new-cache-value]]
           (when cache
             (urania-> cache new-cache-value))
           result))))))
;; Marks every waiting muse in the bucket as ready (via ready-all) and lets
;; update-bucket! fetch them. Returns whatever update-bucket! returns.
(defn- fetch-bucket! [context bucket-id]
(update-bucket! context bucket-id ready-all))
(defn fetch!
  "Moves every waiting muse of a bucket to ready and fetches them.
  Targets `default-bucket-id` when no `bucket-id` is supplied.
  Returns the promise produced by `update-bucket!`."
  ([context]
   (fetch! context default-bucket-id))
  ([context bucket-id]
   (update-bucket! context bucket-id ready-all)))
(defn fetch-all!
  "Calls `fetch!` for every bucket currently in the context and returns a promise
  of a single vector holding the concatenated results of all buckets."
  [context]
  (let [bucket-ids (keys @(:buckets context))
        fetches (map #(fetch! context %) bucket-ids)]
    (prom/then (prom/all fetches)
               (fn [per-bucket-results]
                 ;; buckets with nothing to fetch yield nil, which contributes no elements
                 (into [] cat per-bucket-results)))))
(defn enqueue!
  "Adds `muse` to the waiting queue of a bucket (`default-bucket-id` unless
  `bucket-id` is given) and returns a promise that is resolved with the muse's
  result once it has been fetched.

  After the muse is queued, every `:enqueue-fn` found on the bucket's triggers is
  applied to the bucket in turn; a trigger may move muses to ready, in which case
  they are fetched together as part of this call."
  ([context muse]
   (enqueue! context default-bucket-id muse))
  ([context bucket-id muse]
   (let [result-promise (prom/deferred)
         deliver (fn [result]
                   (prom/resolve! result-promise result)
                   result)
         delivering-muse (u/map deliver muse)
         queue-and-trigger (fn [bucket]
                             (let [enqueue-fns (keep :enqueue-fn (vals (:triggers bucket)))
                                   queued (update-in bucket [:queue :waiting] conj delivering-muse)]
                               (reduce (fn [b enqueue-fn] (enqueue-fn b))
                                       queued
                                       enqueue-fns)))]
     (log :debug "Enqueuing muse into" bucket-id (:id muse))
     (update-bucket! context bucket-id queue-and-trigger)
     result-promise)))
(defn- fetch-all-handling-errors!
  "Fetches everything waiting in the bucket, logging a warning instead of
  propagating a failure - both when the fetch promise is rejected and when
  starting the fetch throws."
  [context bucket-id]
  (try
    (-> (fetch-bucket! context bucket-id)
        (prom/catch (fn [rejection]
                      (log :warn "Fetch failed" rejection))))
    (catch Throwable thrown
      (log :warn "Fetch failed" thrown))))
;; Starts a trigger of the given kind for a bucket. Dispatches on `kind`.
;; A method returns the trigger opts, optionally extended with:
;;   :enqueue-fn  (fn [bucket] bucket') applied by enqueue! after a muse is queued
;;   :stop-fn     zero-arg fn invoked by stop-bucket!
(defmulti start-trigger! (fn [kind _context _bucket-id _opts] kind))

;; Queue-size trigger: when the waiting queue holds exactly `threshold` muses,
;; the first `threshold` of them become ready and the rest stay waiting.
(defmethod start-trigger! :queue-size [_ _context _bucket-id {:keys [threshold] :as opts}]
  (assoc opts
         :enqueue-fn
         (fn [{:keys [queue] :as bucket}]
           (let [waiting (:waiting queue)]
             (if (= threshold (count waiting))
               (-> bucket
                   ;; Build fresh vectors: leaving :waiting as the lazy seq returned by
                   ;; `drop` would make later `conj` calls prepend, reversing queue order.
                   (assoc-in [:queue :ready] (into [] (take threshold) waiting))
                   (assoc-in [:queue :waiting] (into [] (drop threshold) waiting)))
               bucket)))))
;; Elastic trigger: the threshold is read from the bucket's own trigger entry on
;; every enqueue (defaulting to 0). Once the waiting queue holds at least that
;; many muses, all of them become ready and the stored threshold is reset to 0.
(defmethod start-trigger! :elastic [kind _context _bucket-id opts]
  (let [threshold-path [:triggers kind :threshold]
        flush-when-reached (fn [{:keys [queue] :as bucket}]
                             (let [threshold (get-in bucket threshold-path 0)
                                   waiting (:waiting queue)]
                               (if-not (<= threshold (count waiting))
                                 bucket
                                 (-> bucket
                                     (assoc-in [:queue :ready] waiting)
                                     (assoc-in [:queue :waiting] [])
                                     (assoc-in threshold-path 0)))))]
    (assoc opts :enqueue-fn flush-when-reached)))
(defn update-trigger!
  "Replaces the opts of the `trigger-kind` trigger on a bucket with
  `(opts-fn current-opts)`. Goes through `update-bucket!`, so it returns that
  function's promise."
  [context bucket-id trigger-kind opts-fn]
  (update-bucket! context
                  bucket-id
                  #(update-in % [:triggers trigger-kind] opts-fn)))
;; Interval trigger: fetches the bucket every `(:interval opts)` milliseconds.
;; On :clj a future sleeps and fetches in an endless loop; on :cljs a
;; js/setInterval timer does the same. The returned opts carry a :stop-fn that
;; cancels the future / clears the timer.
(defmethod start-trigger! :interval [_ context bucket-id opts]
  (let [fetch-now! (fn [] (fetch-all-handling-errors! context bucket-id))
        watcher #?(:clj (future
                          (loop []
                            (Thread/sleep (:interval opts))
                            (fetch-now!)
                            (recur)))
                   :cljs (js/setInterval fetch-now! (:interval opts)))
        stop! #?(:clj (fn [] (future-cancel watcher))
                 :cljs (fn [] (js/clearInterval watcher)))]
    (assoc opts :stop-fn stop!)))
#?(:cljs
   (defn- check-debounced
     "One step of the ClojureScript debounce loop; reschedules itself with
     js/setTimeout until `last-updated` holds :exit.

     `last-updated` is an atom holding nil (nothing enqueued since the last
     fetch), :exit (stop), or the time of the most recent enqueue. Once at least
     `interval` ms have passed since that time the bucket is fetched and the atom
     is reset to nil, unless it changed in the meantime."
     [context bucket-id interval last-updated]
     (let [lu @last-updated
           check-again-in (fn [delay-ms]
                            (js/setTimeout check-debounced delay-ms
                                           context bucket-id interval last-updated))]
       (cond
         (nil? lu)
         (check-again-in interval)

         (= :exit lu)
         nil

         (<= interval (- (js/Date.) lu))
         (do (fetch-all-handling-errors! context bucket-id)
             (compare-and-set! last-updated lu nil)
             (check-again-in 0))

         :else
         (check-again-in (- interval (- (js/Date.) lu)))))))
;; Debounce trigger: fetches the bucket once `(:interval opts)` milliseconds have
;; passed since the most recent enqueue.
;;
;; `last-updated` holds nil (nothing enqueued since the last fetch), the time of
;; the latest enqueue, or :exit once the trigger has been stopped.
;;   :enqueue-fn records the enqueue time and returns the bucket unchanged.
;;   :stop-fn    cancels the watcher and marks `last-updated` as :exit.
(defmethod start-trigger! :debounced [_ context bucket-id opts]
  (let [interval (:interval opts)
        last-updated (atom nil)
        watcher #?(:clj (future
                          (loop []
                            (let [lu @last-updated]
                              (cond
                                (nil? lu) (do (Thread/sleep interval)
                                              (recur))
                                (= :exit lu) nil
                                :else
                                ;; Read the clock once: checking the deadline and computing the
                                ;; sleep from two separate reads can yield a negative sleep,
                                ;; which makes Thread/sleep throw and ends the watcher.
                                (let [remaining (- interval (- (System/currentTimeMillis) lu))]
                                  (if (pos? remaining)
                                    (do (Thread/sleep remaining)
                                        (recur))
                                    (do (fetch-all-handling-errors! context bucket-id)
                                        ;; only clear if no newer enqueue arrived during the fetch
                                        (compare-and-set! last-updated lu nil)
                                        (recur))))))))
                   :cljs (js/setTimeout check-debounced 0 context bucket-id interval last-updated))]
    (assoc opts
           :enqueue-fn (fn [bucket]
                         (let [now #?(:clj (System/currentTimeMillis)
                                      :cljs (js/Date.))]
                           ;; keep :exit sticky so an enqueue after stop cannot revive the watcher
                           (swap! last-updated (fn [lu] (if (= :exit lu) lu now))))
                         bucket)
           :stop-fn #(do #?(:clj (future-cancel watcher)
                            ;; the handle comes from js/setTimeout, so clear it as a timeout
                            :cljs (js/clearTimeout watcher))
                         (reset! last-updated :exit)))))
;; Fallback for trigger kinds without a dedicated method: nothing is started and
;; the trigger opts are returned unchanged.
(defmethod start-trigger! :default [_kind _context _bucket-id opts]
  opts)
(defn- start-triggers!
  "Starts every trigger listed under `:triggers` in the bucket `opts` and returns
  `opts` with `:triggers` replaced by a map of trigger-kind to whatever
  `start-trigger!` returned for it."
  [context bucket-id {:keys [triggers] :as opts}]
  (let [start-one (fn [started trigger-kind trigger-opts]
                    (log :debug "Starting trigger" trigger-kind "for bucket" bucket-id trigger-opts)
                    (assoc started
                           trigger-kind
                           (start-trigger! trigger-kind context bucket-id trigger-opts)))]
    (log :debug "Starting" (count triggers) "triggers for bucket" bucket-id)
    (assoc opts :triggers (reduce-kv start-one {} (:triggers opts)))))
(defn- start-bucket!
  "Builds the runtime state for a bucket: adds an empty queue and the bucket id,
  merges the bucket's `:urania-opts` over the context-level ones, then starts the
  bucket's triggers. Returns the started bucket map."
  [context bucket-id opts]
  (log :debug "Starting bucket" bucket-id)
  (let [bucket (assoc opts
                      :queue {:ready [] :waiting []}
                      :id bucket-id)
        ;; bucket-level urania opts take precedence over the context-level ones
        urania-opts (merge (:urania-opts context) (:urania-opts bucket))]
    (start-triggers! context bucket-id (assoc bucket :urania-opts urania-opts))))
(defn- start-buckets!
  "Wraps the context's `:buckets` map in an atom, starts each configured bucket
  against that context, stores the started buckets in the atom and returns the
  context."
  [{:keys [buckets] :as context}]
  (let [live-context (update context :buckets atom)
        start-one (fn [started id opts]
                    (assoc started id (start-bucket! live-context id opts)))]
    (reset! (:buckets live-context)
            (reduce-kv start-one {} buckets))
    live-context))
(defn- stop-bucket!
  "Invokes the `:stop-fn` of every trigger on the bucket that has one.
  Returns nil."
  [context bucket-id]
  (let [triggers (get-in @(:buckets context) [bucket-id :triggers])]
    (->> (vals triggers)
         (map :stop-fn)
         (filter identity)
         (run! (fn [stop-fn] (stop-fn))))))
(defn add-bucket!
  "Starts a new bucket with `opts` and registers it under `bucket-id`, unless a
  bucket with that id already exists, in which case a warning is logged and
  nothing changes. Returns `context`."
  [context bucket-id opts]
  (locking (:sync-lock context)
    ;; Several threads may try to add the same bucket at once. A bucket must only
    ;; be started once, so the existence check, the start and the registration all
    ;; happen while holding the lock.
    (let [buckets (:buckets context)]
      (if (contains? @buckets bucket-id)
        (log :warn "Bucket" bucket-id "already exists")
        (do (log :debug "Adding bucket" bucket-id opts)
            (let [started (start-bucket! context bucket-id opts)]
              (swap! buckets assoc bucket-id started)))))
    context))
;; Baseline options that start! merges the caller's opts over.
;; Written as a function so that each call builds a fresh, empty cache atom.
(defn default-opts []
{:urania-opts {:cache (atom {})}})
(defn start!
  "Starts a superlifter with the supplied options, which can contain:

  :buckets     {:default   bucket-opts
                :my-bucket bucket-opts
                ...}
  :urania-opts The options map supplied to urania for running muses.
               Contains :env, :cache and :executor.
               :cache must implement the Cache protocol.
               See the urania documentation for details.

  The `:default` bucket is used for all activity not associated with a named
  bucket; it is created with empty options when not supplied.

  Bucket options can contain the following:

  :triggers    Conditions to perform a fetch of the muses in the queue.
               A map of trigger-kind to trigger options, for example:
                 {:queue-size {:threshold 10}
                  :interval   {:interval 100}}
               A fetch is performed whenever any single trigger condition is met.
               Built-in trigger kinds:
                 {:queue-size {:threshold n}}  fetch when the queue reaches n items
                 {:interval {:interval n}}     fetch every n milliseconds
                 {:elastic {:threshold n}}     fetch everything waiting once at least n
                                               items are queued (n defaults to 0), then
                                               reset the threshold to 0; the threshold
                                               can be changed with `update-trigger!`
                 {:debounced {:interval n}}    fetch once n milliseconds have passed
                                               since the most recent enqueue
               If no triggers are supplied, superlifter runs in 'manual' mode and
               fetches are only performed when you call `fetch!`.
               You can supply your own trigger kind by adding a method to the
               `start-trigger!` multimethod.
  :urania-opts Overrides the top-level urania-opts at the bucket level.

  Returns a context which can be used to stop superlifter, enqueue muses and
  trigger fetches."
  [opts]
  (let [merged (merge (default-opts) opts)
        ;; add-bucket! locks on :sync-lock; it is only created on the JVM
        lockable #?(:clj (assoc merged :sync-lock (Object.))
                    :default merged)
        with-default-bucket (update-in lockable
                                       [:buckets default-bucket-id]
                                       (fn [bucket-opts] (or bucket-opts {})))]
    (start-buckets! with-default-bucket)))
(defn stop!
  "Stops superlifter by stopping the triggers of every bucket in `context`.
  Returns `context`."
  [context]
  (doseq [bucket-id (keys @(:buckets context))]
    (stop-bucket! context bucket-id))
  context)