-
Notifications
You must be signed in to change notification settings - Fork 38
/
at_at.clj
409 lines (362 loc) · 15 KB
/
at_at.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
(ns overtone.at-at
(:require
[clojure.pprint :as pprint])
(:import
(java.io Writer)
(java.util.concurrent Executors Future ScheduledThreadPoolExecutor ThreadFactory ThreadPoolExecutor TimeUnit)))
;; Forward declaration: job-string is defined near the end of this file but is
;; already called by uncaught-exception-handler below.
(declare job-string)
(defn uncaught-exception-handler
  "Default handler invoked when a scheduled function throws.

  Prints a one-line summary naming the job, prints the stack trace of
  throwable, and then rethrows throwable. Replace it with `alter-var-root`
  to install different behaviour."
  [throwable job]
  (let [summary (str throwable " thrown by at-at task: " (job-string job))]
    (println summary)
    (.printStackTrace throwable)
    (throw throwable)))
;; PoolInfo: one executor plus its bookkeeping.
;;   thread-pool  - the ScheduledThreadPoolExecutor the jobs are submitted to
;;   jobs-ref     - ref holding a map of job id -> job record
;;   id-count-ref - ref holding the most recently issued job id
(defrecord PoolInfo [thread-pool jobs-ref id-count-ref])
;; MutablePool: the handle passed to the public API. pool-atom is an atom
;; holding the current PoolInfo; stop-and-reset-pool! swaps in a fresh one.
(defrecord MutablePool [pool-atom])
;; RecurringJob: a periodically executed job.
;;   created-at    - System/currentTimeMillis value taken right after scheduling
;;   ms-period     - period in milliseconds
;;   initial-delay - delay before the first run, in milliseconds
;;   job           - the future returned by the executor
;;   scheduled?    - atom holding true until the job is cancelled or its pool is shut down
(defrecord RecurringJob [id created-at ms-period initial-delay job pool-info desc scheduled?])
;; ScheduledJob: a one-shot job; same fields as RecurringJob minus ms-period.
(defrecord ScheduledJob [id created-at initial-delay job pool-info desc scheduled?])
(defn- format-date
  "Formats date (a java.util.Date or an epoch-millisecond number) as an
  abbreviated day name followed by a 24-hour time, e.g. \"Tue 15:23:35s\".
  The default time zone and locale are used."
  [date]
  ;; HH is the 24-hour field. The previous hh pattern printed 12-hour values
  ;; without an AM/PM marker, so 15:23 and 03:23 rendered identically.
  ;; A new SimpleDateFormat is built per call; instances are not thread-safe.
  (.format (java.text.SimpleDateFormat. "EEE HH':'mm':'ss's'") date))
(defmethod pprint/simple-dispatch PoolInfo [obj]
  ;; Pretty-printer form: the executor followed by the number of registered jobs.
  (let [{:keys [thread-pool jobs-ref]} obj
        job-count (count (deref jobs-ref))]
    (println (str "#<PoolInfo: " thread-pool " " job-count " jobs>"))))
(defmethod print-method PoolInfo
  [obj ^Writer w]
  ;; Printed form: the executor followed by the number of registered jobs.
  (let [job-count    (count (deref (:jobs-ref obj)))
        ^String text (str "#<PoolInfo: " (:thread-pool obj) " " job-count " jobs>")]
    (.write w text)))
(defmethod pprint/simple-dispatch MutablePool [obj]
  ;; Pretty-printer form: only the job count of the PoolInfo currently held
  ;; in the pool atom is shown.
  (let [job-count (-> obj :pool-atom deref :jobs-ref deref count)]
    (println (str "#<MutablePool - "
                  "jobs: " job-count
                  ">"))))
(defmethod print-method MutablePool
  [obj ^Writer w]
  ;; Printed form: only the job count of the PoolInfo currently held in the
  ;; pool atom is shown.
  (let [job-count    (-> obj :pool-atom deref :jobs-ref deref count)
        ^String text (str "#<MutablePool - "
                          "jobs: " job-count
                          ">")]
    (.write w text)))
(defmethod pprint/simple-dispatch RecurringJob [obj]
  ;; Pretty-printer form: every descriptive field plus the current value of
  ;; the scheduled? atom.
  (let [{:keys [id created-at ms-period initial-delay desc scheduled?]} obj]
    (println (str "#<RecurringJob id: " id
                  ", created-at: " (format-date created-at)
                  ", ms-period: " ms-period
                  ", initial-delay: " initial-delay
                  ", desc: \"" desc "\""
                  ", scheduled? " (deref scheduled?) ">"))))
(defmethod print-method RecurringJob
  [obj ^Writer w]
  ;; Printed form: every descriptive field plus the current value of the
  ;; scheduled? atom.
  (let [{:keys [id created-at ms-period initial-delay desc scheduled?]} obj
        ^String text (str "#<RecurringJob id: " id
                          ", created-at: " (format-date created-at)
                          ", ms-period: " ms-period
                          ", initial-delay: " initial-delay
                          ", desc: \"" desc "\""
                          ", scheduled? " (deref scheduled?) ">")]
    (.write w text)))
(defmethod pprint/simple-dispatch ScheduledJob [obj]
  ;; Pretty-printer form: every descriptive field plus the current value of
  ;; the scheduled? atom.
  (let [{:keys [id created-at initial-delay desc scheduled?]} obj]
    (println (str "#<ScheduledJob id: " id
                  ", created-at: " (format-date created-at)
                  ", initial-delay: " initial-delay
                  ", desc: \"" desc "\""
                  ", scheduled? " (deref scheduled?) ">"))))
(defmethod print-method ScheduledJob
  [obj ^Writer w]
  ;; Printed form: every descriptive field plus the current value of the
  ;; scheduled? atom.
  (let [{:keys [id created-at initial-delay desc scheduled?]} obj
        ^String text (str "#<ScheduledJob id: " id
                          ", created-at: " (format-date created-at)
                          ", initial-delay: " initial-delay
                          ", desc: \"" desc "\""
                          ", scheduled? " (deref scheduled?) ">")]
    (.write w text)))
(defn- switch!
  "Atomically replaces the value held by atom with new-val and returns the
  value that was replaced. Retries the compare-and-set until it succeeds."
  [atom new-val]
  (loop []
    (let [previous (deref atom)]
      (if (compare-and-set! atom previous new-val)
        previous
        (recur)))))
(defn- cpu-count
  "Number of processors the JVM runtime reports as available."
  []
  (let [runtime (Runtime/getRuntime)]
    (.availableProcessors runtime)))
(defn- wrap-fun-with-exception-handler
  "Returns a variadic fn that applies fun to its arguments. Anything thrown
  is passed to uncaught-exception-handler together with the job record read
  from the promise job-info-prom."
  [fun job-info-prom]
  (letfn [(guarded [& args]
            (try
              (apply fun args)
              (catch Throwable err
                ;; The handler is looked up through its var on every call, so
                ;; a replacement installed with alter-var-root takes effect.
                (uncaught-exception-handler err (deref job-info-prom)))))]
    guarded))
(defn- schedule-job
  "Submits fun to pool-info's executor for repeated execution and registers
  a RecurringJob record for it, which is returned.

  initial-delay and ms-period are coerced to long milliseconds. When
  interspaced? is truthy the executor's scheduleWithFixedDelay is used,
  otherwise scheduleAtFixedRate."
  [pool-info fun initial-delay ms-period desc interspaced?]
  (let [delay-ms  (long initial-delay)
        period-ms (long ms-period)
        ^ScheduledThreadPoolExecutor executor (:thread-pool pool-info)
        ;; The exception handler needs the job record, which only exists
        ;; after scheduling, so it receives it through a promise.
        info-promise   (promise)
        ^Callable task (wrap-fun-with-exception-handler fun info-promise)
        fut (if interspaced?
              (.scheduleWithFixedDelay executor task delay-ms period-ms TimeUnit/MILLISECONDS)
              (.scheduleAtFixedRate executor task delay-ms period-ms TimeUnit/MILLISECONDS))
        created (System/currentTimeMillis)
        {:keys [jobs-ref id-count-ref]} pool-info
        record (dosync
                 ;; Allocate the next id and register the record in one transaction.
                 (let [id  (commute id-count-ref inc)
                       rec (RecurringJob. id created period-ms delay-ms fut pool-info desc (atom true))]
                   (commute jobs-ref assoc id rec)
                   rec))]
    (deliver info-promise record)
    record))
(defn- wrap-fun-to-remove-itself
  "Returns a variadic fn that, when invoked, first marks the job delivered
  through job-info-prom as no longer scheduled and removes its id from
  jobs-ref, then applies fun to the arguments and returns the result."
  [fun jobs-ref job-info-prom]
  (fn [& args]
    ;; Dereferencing the promise blocks until the job record is delivered.
    (let [{:keys [id scheduled?]} (deref job-info-prom)]
      (reset! scheduled? false)
      (dosync
        (commute jobs-ref dissoc id))
      (apply fun args))))
(defn- schedule-at
  "Submits fun to pool-info's executor for a single execution after
  initial-delay (coerced to long milliseconds) and registers a ScheduledJob
  record for it, which is returned.

  The submitted function is wrapped so that it reports exceptions through
  the exception handler and unregisters the job before running."
  [pool-info fun initial-delay desc]
  (let [delay-ms (long initial-delay)
        {:keys [jobs-ref id-count-ref]} pool-info
        ^ScheduledThreadPoolExecutor executor (:thread-pool pool-info)
        ;; Both wrappers read the job record from this promise; the outer one
        ;; blocks on it, so the task cannot proceed before registration.
        info-promise   (promise)
        guarded        (wrap-fun-with-exception-handler fun info-promise)
        ^Callable task (wrap-fun-to-remove-itself guarded jobs-ref info-promise)
        fut     (.schedule executor task delay-ms TimeUnit/MILLISECONDS)
        created (System/currentTimeMillis)
        record  (dosync
                  ;; Allocate the next id and register the record in one transaction.
                  (let [id  (commute id-count-ref inc)
                        rec (ScheduledJob. id created delay-ms fut pool-info desc (atom true))]
                    (commute jobs-ref assoc id rec)
                    rec))]
    (deliver info-promise record)
    record))
(defn- shutdown-pool-now!
  "Calls shutdownNow on pool-info's executor and then marks every job
  registered in its job map as no longer scheduled. Returns nil."
  [pool-info]
  (let [^ScheduledThreadPoolExecutor executor (:thread-pool pool-info)]
    (.shutdownNow executor))
  (doseq [{:keys [scheduled?]} (vals (deref (:jobs-ref pool-info)))]
    (reset! scheduled? false)))
(defn- shutdown-pool-gracefully!
  "Calls shutdown on pool-info's executor and starts a background future
  that polls the jobs registered at the time of the call, clearing each
  job's scheduled? flag once its Future reports cancelled or done.

  Polling repeats every 500 ms while at least one job is still flagged as
  scheduled, and stops as soon as none is. Returns the monitoring future,
  whose value is nil."
  [pool-info]
  (.shutdown ^ScheduledThreadPoolExecutor (:thread-pool pool-info))
  (let [jobs (vals @(:jobs-ref pool-info))]
    (future
      (loop [jobs jobs]
        (doseq [job jobs]
          (let [^Future fut (:job job)]
            (when (and @(:scheduled? job)
                       (or (.isCancelled fut)
                           (.isDone fut)))
              (reset! (:scheduled? job) false))))
        ;; filter returns a lazy seq, which is truthy even when empty, so the
        ;; result goes through seq before when-let tests it. Otherwise the
        ;; monitor sleeps 500 ms with nothing left to wait for. doall fixes
        ;; the pending set before sleeping.
        (when-let [pending (seq (doall (filter (fn [j] @(:scheduled? j)) jobs)))]
          (Thread/sleep 500)
          (recur pending))))))
(defn- mk-sched-thread-pool
  "Creates a ScheduledThreadPoolExecutor with a core size of num-threads.
  Its threads come from the default thread factory and have \"at-at-\"
  prepended to their names."
  [num-threads]
  (let [base-factory (Executors/defaultThreadFactory)
        prefixing-factory (reify ThreadFactory
                            (newThread [_ runnable]
                              (let [thread (.newThread base-factory runnable)]
                                (.setName thread (str "at-at-" (.getName thread)))
                                thread)))]
    (ScheduledThreadPoolExecutor. num-threads prefixing-factory)))
(defn- mk-pool-info
  "Wraps the executor t-pool in a PoolInfo with an empty job map and an id
  counter starting at 0N."
  [t-pool]
  (let [jobs       (ref {})
        id-counter (ref 0N)]
    (PoolInfo. t-pool jobs id-counter)))
(defn mk-pool
  "Creates a new scheduling pool and returns it as a MutablePool record,
  i.e. an atom wrapping a fresh PoolInfo.

  Options (keyword arguments):
    :cpu-count - number of threads in the pool; defaults to the machine's
                 processor count plus 2."
  ;; NOTE(review): stop-delayed? and stop-periodic? are accepted here but are
  ;; not used anywhere in this function.
  [& {:keys [cpu-count stop-delayed? stop-periodic?]
      :or {cpu-count (+ 2 (cpu-count))}}]
  (let [executor  (mk-sched-thread-pool cpu-count)
        pool-info (mk-pool-info executor)]
    (MutablePool. (atom pool-info))))
(defn every
  "Schedules fun on pool to run at a fixed rate of one call per ms-period
  milliseconds, starting after the optional initial-delay (ms). Returns the
  job record, which can be cancelled with stop / kill.

  Default options are
  {:initial-delay 0 :desc \"\"}"
  [ms-period fun pool & {:keys [initial-delay desc]
                         :or {initial-delay 0
                              desc ""}}]
  (let [pool-info    (deref (:pool-atom pool))
        interspaced? false]
    (schedule-job pool-info fun initial-delay ms-period desc interspaced?)))
(defn interspaced
  "Schedules fun on pool to run repeatedly with a gap of ms-period
  milliseconds between the end of one call and the start of the next,
  starting after the optional initial-delay (ms). Returns the job record,
  which can be cancelled with stop / kill.

  Default options are
  {:initial-delay 0 :desc \"\"}"
  [ms-period fun pool & {:keys [initial-delay desc]
                         :or {initial-delay 0
                              desc ""}}]
  (let [pool-info    (deref (:pool-atom pool))
        interspaced? true]
    (schedule-job pool-info fun initial-delay ms-period desc interspaced?)))
(defn now
  "Current wall-clock time as milliseconds since the epoch."
  []
  (let [current-ms (System/currentTimeMillis)]
    current-ms))
(defn at
  "Schedules fun to run once on pool at the absolute time ms-time
  (milliseconds, on the same clock as (now)). Returns the job record.

  Option: :desc - description of the job, defaults to \"\".

  Example usage:
  (at (+ 1000 (now))
      #(println \"hello from the past\")
      pool
      :desc \"Message from the past\") ;=> prints 1s from now"
  [ms-time fun pool & {:keys [desc]
                       :or {desc ""}}]
  ;; The executor takes a relative delay, so convert the absolute time first.
  (let [delay-ms (- ms-time (now))]
    (-> pool :pool-atom deref (schedule-at fun delay-ms desc))))
(defn after
  "Schedules fun to run once on pool after delay-ms milliseconds. Returns
  the job record.

  Option: :desc - description of the job, defaults to \"\".

  Example usage:
  (after 1000
      #(println \"hello from the past\")
      pool
      :desc \"Message from the past\") ;=> prints 1s from now"
  [delay-ms fun pool & {:keys [desc]
                        :or {desc ""}}]
  (-> pool :pool-atom deref (schedule-at fun delay-ms desc)))
(defn shutdown-pool!
  "Shuts down pool-info using strategy: :stop delegates to
  shutdown-pool-gracefully!, :kill to shutdown-pool-now!. Any other
  strategy makes the case expression throw."
  [pool-info strategy]
  (let [shutdown! (case strategy
                    :stop shutdown-pool-gracefully!
                    :kill shutdown-pool-now!)]
    (shutdown! pool-info)))
(defn stop-and-reset-pool!
  "Replaces the PoolInfo inside the given MutablePool with a fresh one whose
  executor has the same core size, then shuts the old one down on a
  separate thread using the chosen strategy (default :stop). Returns the
  old pool-info.

  Strategies for the old pool:
  :stop - shut down via shutdown-pool-gracefully!
  :kill - shut down via shutdown-pool-now!

  Throws an Exception for any other strategy.

  Example usage:
  (stop-and-reset-pool! pool)            ;=> pool is reset gracefully
  (stop-and-reset-pool! pool
                        :strategy :kill) ;=> pool is reset forcefully"
  [pool & {:keys [strategy]
           :or {strategy :stop}}]
  (when-not (contains? #{:stop :kill} strategy)
    (throw (Exception. (str "Error: unknown pool stopping strategy: " strategy ". Expecting one of :stop or :kill"))))
  (let [pool-atom (:pool-atom pool)
        ^ThreadPoolExecutor current-executor (:thread-pool (deref pool-atom))
        fresh-info    (-> (.getCorePoolSize current-executor)
                          mk-sched-thread-pool
                          mk-pool-info)
        ;; Swap first so new jobs land in the fresh pool while the old one
        ;; is being shut down in the background.
        previous-info (switch! pool-atom fresh-info)]
    (future (shutdown-pool! previous-info strategy))
    previous-info))
(defn- cancel-job
  "Cancels the Future behind job-info and removes the job from its pool's
  job map, but only while the job is still flagged as scheduled.

  cancel-immediately? is handed to Future.cancel as its
  may-interrupt-if-running argument.

  Returns true when the job was still registered in the pool and has now
  been removed. Returns false when job-info is nil, when the job is no
  longer flagged as scheduled, or when it was not found in the job map."
  [job-info cancel-immediately?]
  ;; :scheduled? holds an atom, which is truthy whatever it contains, so the
  ;; flag must be dereferenced. The nil guard keeps a lookup of an unknown id
  ;; (job-info = nil) returning false instead of throwing.
  (let [scheduled? (:scheduled? job-info)]
    (if (and scheduled? @scheduled?)
      (let [id       (:id job-info)
            jobs-ref (:jobs-ref (:pool-info job-info))]
        (.cancel ^Future (:job job-info) cancel-immediately?)
        (reset! scheduled? false)
        (dosync
          ;; Record whether the job was registered before removing it; that
          ;; is the success value reported to the caller.
          (let [registered? (contains? @jobs-ref id)]
            (commute jobs-ref dissoc id)
            registered?)))
      false)))
(defn- cancel-job-id
  "Looks up the job registered under id in pool's current job map and
  passes it, with cancel-immediately?, to cancel-job."
  [id pool cancel-immediately?]
  (-> pool
      :pool-atom deref
      :jobs-ref deref
      (get id)
      (cancel-job cancel-immediately?)))
(defn stop
  "Cancels a recurring or scheduled job without interrupting a run that is
  in progress. Accepts either the job record, or a job id together with the
  pool it belongs to."
  ([job]
   (let [interrupt? false]
     (cancel-job job interrupt?)))
  ([id pool]
   (let [interrupt? false]
     (cancel-job-id id pool interrupt?))))
(defn kill
  "Cancels a recurring or scheduled job, allowing a run that is in progress
  to be interrupted. Accepts either the job record, or a job id together
  with the pool it belongs to."
  ([job]
   (let [interrupt? true]
     (cancel-job job interrupt?)))
  ([id pool]
   (let [interrupt? true]
     (cancel-job-id id pool interrupt?))))
(defn scheduled-jobs
  "Returns a sequence of the job records (both scheduled and recurring)
  currently registered in pool's job map, or nil when there are none."
  [pool]
  (-> pool
      :pool-atom deref
      :jobs-ref deref
      vals))
(defn- format-start-time
  "Returns \", starts at: \" followed by the formatted date when date (epoch
  milliseconds) is not earlier than the current time; otherwise returns the
  empty string."
  [date]
  (if (>= date (now))
    (str ", starts at: " (format-date date))
    ""))
(defn- recurring-job-string
  "One-line summary of a recurring job: id, creation time, start time (only
  when still in the future), period, and the description when it is not
  the empty string."
  [job]
  (let [{:keys [id created-at initial-delay ms-period desc]} job
        desc-part (when (not= "" desc)
                    (str ", desc: \"" desc "\""))]
    (str "[" id "]"
         "[RECUR] created: " (format-date created-at)
         (format-start-time (+ created-at initial-delay))
         ", period: " ms-period "ms"
         desc-part)))
(defn- scheduled-job-string
  "One-line summary of a one-shot job: id, creation time, start time (only
  when still in the future), and the description when it is not the empty
  string."
  [job]
  (let [{:keys [id created-at initial-delay desc]} job
        desc-part (when (not= "" desc)
                    (str ", desc: \"" desc "\""))]
    (str "[" id "]"
         "[SCHED] created: " (format-date created-at)
         (format-start-time (+ created-at initial-delay))
         desc-part)))
(defn- job-string
  "Summary string for job, chosen by its record type. Returns nil when job
  is neither a RecurringJob nor a ScheduledJob."
  [job]
  (condp = (type job)
    RecurringJob (recurring-job-string job)
    ScheduledJob (scheduled-job-string job)
    nil))
(defn show-schedule
  "Prints one summary line per job registered in pool, or a notice when
  the pool has no jobs. Returns nil."
  ([pool]
   (if-let [jobs (seq (scheduled-jobs pool))]
     (doseq [job jobs]
       (println (job-string job)))
     (println "No jobs are currently scheduled."))))