forked from jarohen/chime
-
Notifications
You must be signed in to change notification settings - Fork 0
/
schedule.clj
373 lines (323 loc) · 13.3 KB
/
schedule.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
(ns chime.schedule
"Lightweight scheduling library."
(:require [clojure.tools.logging :as log]
[chime.times :as times])
(:import (clojure.lang IDeref IBlockingDeref IPending ISeq IAtom2 PersistentQueue IAtom)
(java.time Instant Clock ZonedDateTime)
(java.time.temporal ChronoUnit)
(java.util.concurrent ThreadFactory TimeUnit ScheduledFuture ScheduledThreadPoolExecutor)
(java.lang AutoCloseable)
(java.util.concurrent.atomic AtomicLong)))
(def ^:private default-thread-factory
(let [thread-no (AtomicLong. 0)]
(reify ThreadFactory
(newThread [_ r]
(doto (Thread. r)
(.setName (str "chime-" (.incrementAndGet thread-no))))))))
(defn default-error-handler
([e]
(default-error-handler nil e))
([id e]
(log/warn e (cond-> "Error running scheduled fn"
(some? id) (str \space id)))
true))
(def ^:private seq-step
(juxt first next))
(defn- atom-step [a]
(let [[old _] (swap-vals! a pop)]
[(first old) a]))
(defn- mutable-times
[times]
(atom (into PersistentQueue/EMPTY times)))
(defn chime-at
"Calls <f> with the current time, at every time in the <times> sequence.
```
(:require [chime.schedule :as chime])
(:import [java.time Instant])
(let [now (Instant/now)]
(chime/chime-at
[(.plusSeconds now 2)
(.plusSeconds now 4)]
(fn [time]
(println \"Chiming at\" time)))
```
If one of those times is in the past (e.g. by accident), or a job spills over to
the next time (i.e. overunning), it will (by default) be scheduled with no delay
(i.e. 'push-forward' semantics). If you don't want that to happen, use the <drop-overruns?>
option (i.e. 'catch-up' semantics).
Providing a custom <thread-factory> is supported, but optional (see `default-thread-factory`).
Providing a custom <clock> is supported, but optional (see `times/*clock*`).
Providing a custom (1-arg) `error-handler` is supported, but optional (see `default-error-handler`).
Return truthy from this function to continue the schedule (the default), or falsy to shut it down.
Returns an AutoCloseable that you can `.close` in order to shutdown the schedule.
You can also deref the return value to wait for the schedule to finish.
When the schedule is either manually closed (w/o an <on-aborted> callback) or exhausted,
the <on-finished> callback will be called. If <on-aborted> has been provided it will be
called instead (only on manual close)."
(^AutoCloseable [times f] (chime-at times f nil))
(^AutoCloseable [times f {:keys [error-handler on-finished on-aborted thread-factory clock drop-overruns? mutable?]
:or {error-handler default-error-handler
thread-factory default-thread-factory ;; loom-friendly (i.e. virtual threads)
clock times/*clock*
mutable? false}}]
(let [times (cond-> times mutable? mutable-times)
step* (if mutable? atom-step seq-step)
pool (doto (ScheduledThreadPoolExecutor. 1 ^ThreadFactory thread-factory)
(.setRemoveOnCancelPolicy true))
!latch (promise)
done? (partial realized? !latch)
current (atom nil)
f (bound-fn* f)
error! (bound-fn* error-handler)
next-times (when-not mutable? (atom nil))]
(letfn [(close [finished?]
(.shutdown pool)
(when (and (deliver !latch ::done) finished? on-finished)
(on-finished)))
(schedule-loop [times]
(let [[curr-time times] (step* times)]
(letfn [(task []
(if (try
(when-not (done?)
;; provide the object provided by the user
;; (as opposed to Instant converted)
(f curr-time)
true)
(catch Exception e
(try
(error! e)
(catch Exception e
(log/error e "error calling chime error-handler, stopping schedule")))))
(do
(cond->> times next-times (reset! next-times))
(schedule-loop times))
(close true)))]
(if curr-time
(let [dlay (->> (times/to-instant curr-time)
(.between ChronoUnit/MILLIS (times/now clock)))]
(if (or (pos? dlay)
(not drop-overruns?))
(->> (.schedule pool ^Runnable task dlay TimeUnit/MILLISECONDS)
(reset! current))
(recur times)))
(close true)))))]
;; kick-off the schedule loop
(schedule-loop times)
(reify ;; the returned object represents 2 things
AutoCloseable ;; whole-schedule
(close [_]
(when-not (done?) ;; aborting a finished schedule is meaningless
(close false) ;; false here means the `on-finished` will NOT be called
;; if we have an abort handler, use it now - otherwise use the finish one
(when-some [f (or on-aborted on-finished)]
(f))))
IDeref ;; whole-schedule
(deref [_] (deref !latch))
IBlockingDeref ;; whole-schedule
(deref [_ ms timeout-val] (deref !latch ms timeout-val))
IPending ;; whole-schedule
(isRealized [_] (done?))
ScheduledFuture ;; next-job
(cancel [_ interrupt?] ;; expose interrupt facilities (opt-in)
(when-let [^ScheduledFuture fut @current]
(let [ret (or (.isCancelled fut)
(.cancel fut interrupt?))]
(when (and (true? ret)
(not (done?)))
;; don't forget to re-schedule starting
;; with the job AFTER the cancelled one
(if mutable?
(schedule-loop times)
(schedule-loop (swap! next-times next))))
ret)))
(getDelay [_ time-unit] ;; expose remaining time until next chime
(when-let [^ScheduledFuture fut @current]
(.getDelay fut time-unit)))
IAtom
(swap [_ f]
(if mutable?
(swap! times f)
(throw (UnsupportedOperationException. "Schedule NOT mutable!"))))
(swap [_ f arg1]
(if mutable?
(swap! times f arg1)
(throw (UnsupportedOperationException. "Schedule NOT mutable!"))))
(swap [_ f arg1 arg2]
(if mutable?
(swap! times f arg1 arg2)
(throw (UnsupportedOperationException. "Schedule NOT mutable!"))))
(swap [_ f arg1 arg2 more]
(if mutable?
(swap! times (partial apply f) arg1 arg2 more)
(throw (UnsupportedOperationException. "Schedule NOT mutable!"))))
IAtom2
(swapVals [_ f]
(if mutable?
(swap-vals! times f)
(throw (UnsupportedOperationException. "Schedule NOT mutable!"))))
(swapVals [_ f arg1]
(if mutable?
(swap-vals! times f arg1)
(throw (UnsupportedOperationException. "Schedule NOT mutable!"))))
(swapVals [_ f arg1 arg2]
(if mutable?
(swap-vals! times f arg1 arg2)
(throw (UnsupportedOperationException. "Schedule NOT mutable!"))))
(swapVals [_ f arg1 arg2 more]
(if mutable?
(swap-vals! times (partial apply f) arg1 arg2 more)
(throw (UnsupportedOperationException. "Schedule NOT mutable!"))))
;ISeq ;; whole schedule
;(first [_] (some-> next-times deref first))
;;; not to be used directly (danger of holding the head)
;;; but more to support seq fns like `second`, `nth` etc
;(next [_] (some-> next-times deref next))
;(more [_] (some-> next-times deref rest))
)))))
;; HIGH-LEVEL API REFLECTING THE SEMANTICS OF THE CONSTRUCT ABOVE
;; ==============================================================
(defn skip-next!
"Cancels the upcoming chime, potentially abruptly,
as it may have already started. The rest of the schedule
will remain unaffected, unless the interruption is handled
by the error-handler (i.e. `InterruptedException`), and it
returns falsy, or throws (the default one returns true).
Returns true if already cancelled."
[sched]
(future-cancel sched))
(defn until-next
"Returns the remaining time (in millis by default)
until the next non-cancelled chime."
(^long [sched]
(until-next sched TimeUnit/MILLISECONDS))
(^long [^ScheduledFuture sched time-unit]
(.getDelay sched time-unit)))
(defn skip-next?!
"Like `skip-next!`, but only if the upcoming task
hasn't already started (with millisecond tolerance)."
[^ScheduledFuture sched]
(when (pos? (until-next sched))
(skip-next! sched)))
(defn skip-next-n!
"Cancels the next <n> tasks.
Returns a vector of booleans (per `skip-next!`)"
[^ScheduledFuture sched n]
(into []
(comp (map (fn [_] (skip-next! sched)))
(take-while true?))
(range n)))
(defn shutdown!
"Gracefully closes the entire schedule (per `pool.shutdown()`).
If the next task hasn't started yet, it will be cancelled,
otherwise it will be allowed to finish."
[^AutoCloseable sched]
(-> (doto sched (.close))
skip-next?!))
(defn shutdown-now!
"Attempts a graceful shutdown (per `shutdown!`), but if the latest task
is already happening attempts to interrupt it. Semantically equivalent
to `pool.shutdownNow()`."
[sched]
(or (shutdown! sched)
(skip-next! sched)))
(defn finished?
"Returns true if the entire schedule has finished, false otherwise."
[sched]
(realized? sched))
(defn wait-for
"Blocking call for waiting until the schedule finishes,
or the provided <timeout-ms> has elapsed. Useful as the
last expression in `with-open`."
([sched]
(deref sched))
([sched timeout-ms timeout-val]
(deref sched timeout-ms timeout-val)))
(defn next-at
"Returns the (future) `ZonedDateTime` when the next chime will occur,
or nil if it has already started (with millisecond tolerance), or cancelled."
(^ZonedDateTime [sched]
(next-at sched times/*clock*))
(^ZonedDateTime [sched ^Clock clock]
(let [remaining (until-next sched)]
(when (pos? remaining)
(-> (ZonedDateTime/now clock)
(.plus remaining ChronoUnit/MILLIS))))))
(defn- append-with*
[f sched]
(doto sched (swap! f)))
(defn append-absolute!
"Assuming a mutable schedule which has not finished,
appends the specified <times> to it. These should be
*after* the last one already in."
[sched & times]
(-> (fn [tsq]
(cond-> tsq
(seq times)
(into times)))
(append-with* sched)))
(defn append-relative-to-last!
"Assuming a mutable schedule which has not finished,
appends the result of `(offset-fn last-chime)` into it."
[sched offset-fn]
(-> (fn [ts]
(if-let [t (last ts)]
(conj ts (offset-fn t))
ts))
(append-with* sched)))
(defn interruptable*
"Returns a function which wraps <f>
with a `Thread.isInterrupted()` check.
Worth considering when using `shutdown-now!` or `skip-next!`."
[f]
(fn [x]
(when-not (.isInterrupted (Thread/currentThread))
(f x))))
(defmacro interruptable
"Like `interruptable*`, but for arbitrary code that
doesn't care about the argument passed to job-fns."
[& body]
`(interruptable* (fn [~'_] ~@body)))
(comment
(def vthread-factory
"A `ThreadFactory` that produces virtual-threads."
(-> (Thread/ofVirtual)
(.name "chime-" 0)
.factory))
;; MUTABLE TIMES EXAMPLE
(defn times []
(->> (times/every-n-seconds 5)
(take 10)
))
(def sched (chime-at (times) println {:thread-factory vthread-factory
:mutable? true
}))
(skip-next?! sched)
(until-next sched)
(append-relative-to-last! sched #(.plusSeconds ^Instant % 2))
(shutdown! sched)
(future-done? sched)
(with-open [sch (chime-at
(take 10 (times/every-n-seconds 1))
println
{:thread-factory vthread-factory
:on-finished #(println "done")
:mutable? true})]
(append-relative-to-last! sch #(.plusSeconds ^Instant % 2))
;(wait-for sch)
(wait-for sch 12000 :timeout) ;; occasionally succeeds
)
(require '[clj-async-profiler.core :as prof])
(prof/profile
(let [sch (chime-at
(take 100 (times/every-n-seconds 1))
println
{:on-finished #(println "done")
:mutable? true})]
;; simulate async mutation
(chime-at [(-> (times/now)
(.plusSeconds 20))]
(fn [_] (append-relative-to-last! sch #(.plusSeconds ^Instant % 2)))
{:on-finished #(println "mutated `sch`")})
(wait-for sch))
)
)