-
Notifications
You must be signed in to change notification settings - Fork 107
/
time.clj
277 lines (242 loc) · 8.94 KB
/
time.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
(ns
^{:author "Zach Tellman"
:doc "This namespace contains methods for converting units of time, with milliseconds as the base representation, and for deferring execution of functions to some time in the future. In practice, the methods here are not necessary to use Manifold effectively - `manifold.deferred/timeout` and `manifold.stream/periodically` are more directly useful - but they are available for anyone who should need them."}
manifold.time
(:require
[clojure.tools.logging :as log]
[manifold.executor :as ex]
[clojure.string :as str]
[manifold.utils :refer [definterface+ defprotocol+]])
(:import
[java.util
Calendar
TimeZone]
[java.util.concurrent
Future
Executor
Executors
TimeUnit
ScheduledExecutorService
ScheduledThreadPoolExecutor
TimeoutException]))
(defn nanoseconds
"Converts nanoseconds -> milliseconds"
[n]
(/ n 1e6))
(defn microseconds
"Converts microseconds -> milliseconds"
[n]
(/ n 1e3))
(defn milliseconds
"Converts milliseconds -> milliseconds"
[n]
n)
(defn seconds
"Converts seconds -> milliseconds"
[n]
(* n 1e3))
(defn minutes
"Converts minutes -> milliseconds"
[n]
(* n 6e4))
(defn hours
"Converts hours -> milliseconds"
[n]
(* n 36e5))
(defn days
"Converts days -> milliseconds"
[n]
(* n 864e5))
(defn hz
"Converts frequency -> period in milliseconds"
[n]
(/ 1e3 n))
(let [intervals (partition 2
["d" (days 1)
"h" (hours 1)
"m" (minutes 1)
"s" (seconds 1)])]
(defn format-duration
"Takes a duration in milliseconds, and returns a formatted string
describing the interval, i.e. '5d 3h 1m'"
[n]
(loop [s "", n n, intervals intervals]
(if (empty? intervals)
(if (empty? s)
"0s"
(str/trim s))
(let [[desc val] (first intervals)]
(if (>= n val)
(recur
(str s (int (/ n val)) desc " ")
(rem n val)
(rest intervals))
(recur s n (rest intervals))))))))
(let [sorted-units [:millisecond Calendar/MILLISECOND
:second Calendar/SECOND
:minute Calendar/MINUTE
:hour Calendar/HOUR
:day Calendar/DAY_OF_YEAR
:week Calendar/WEEK_OF_MONTH
:month Calendar/MONTH]
unit->calendar-unit (apply hash-map sorted-units)
units (->> sorted-units (partition 2) (map first))
unit->cleared-fields (zipmap
units
(map
#(->> (take % units) (map unit->calendar-unit))
(range (count units))))]
(defn floor
"Takes a `timestamp`, and rounds it down to the nearest even multiple of the `unit`.
(floor 1001 :second) => 1000
(floor (seconds 61) :minute) => 60000
"
[timestamp unit]
(assert (contains? unit->calendar-unit unit))
(let [^Calendar cal (doto (Calendar/getInstance (TimeZone/getTimeZone "UTC"))
(.setTimeInMillis timestamp))]
(doseq [field (unit->cleared-fields unit)]
(.set cal field 0))
(.getTimeInMillis cal)))
(defn add
"Takes a `timestamp`, and adds `value` multiples of `unit` to the value."
[timestamp value unit]
(assert (contains? unit->calendar-unit unit))
(let [^Calendar cal (doto (Calendar/getInstance (TimeZone/getTimeZone "UTC"))
(.setTimeInMillis timestamp))]
(.add cal (unit->calendar-unit unit) value)
(.getTimeInMillis cal))))
;;;
(in-ns 'manifold.deferred)
(clojure.core/declare success! error! deferred)
(in-ns 'manifold.time)
;;;
(definterface+ IClock
(in [^double interval-millis ^Runnable f])
(every [^double delay-millis ^double period-millis ^Runnable f]))
(defprotocol+ IMockClock
(now [clock] "Returns the current time for the clock")
(advance [clock time]
"Advances the mock clock by the specified interval of `time`.
Advancing the clock is a continuous action - the clock doesn't just jump
from `now` to `new-now = (+ (now clock) time)`. Rather, for each scheduled
event within `[now; new-now]` the clock is reset to the time of the event
and the event function is executed.
For example, if you have a periodic function scheduled with
(every 1 #(swap! counter inc))
and advance the clock by 5, the counter will be incremented 6 times in
total: once initially, as the initial delay is 0 and 5 times for every 1 ms
step of the clock."))
(defn- cancel-on-exception [f cancel-fn]
(fn []
(try (f)
(catch Throwable t
(cancel-fn)
(throw t)))))
(defn scheduled-executor->clock [^ScheduledExecutorService e]
(reify IClock
(in [_ interval-millis f]
(.schedule e f (long (* interval-millis 1e3)) TimeUnit/MICROSECONDS))
(every [_ delay-millis period-millis f]
(let [future-ref (promise)
cancel-fn (fn []
(let [^Future future @future-ref]
(.cancel future false)))]
(deliver future-ref
(.scheduleAtFixedRate e
^Runnable (cancel-on-exception f cancel-fn)
(long (* delay-millis 1e3))
(long (* period-millis 1e3))
TimeUnit/MICROSECONDS))
cancel-fn))))
(defn mock-clock
"Creates a clock designed for testing scheduled behaviors. It can replace the default
scheduler using `with-clock`, and can be advanced to a particular time via `advance`. By
default, the initial time is `0`."
([]
(mock-clock 0))
([initial-time]
(let [now (atom initial-time)
events (atom (sorted-map))]
(reify
IClock
(in [this interval-millis f]
(swap! events update-in [(+ @now interval-millis)] #(conj (or % []) f))
(advance this 0))
(every [this delay-millis period-millis f]
(assert (< 0 period-millis))
(let [period (atom period-millis)
cancel-fn #(reset! period -1)]
(->> (with-meta (cancel-on-exception f cancel-fn) {::period period})
(.in this (max 0 delay-millis)))
cancel-fn))
IMockClock
(now [_] @now)
(advance
[this time]
(let [limit (+ @now time)]
(loop []
(if (or (empty? @events)
(< limit (key (first @events))))
(do
(reset! now limit)
nil)
(let [[t fs] (first @events)]
(swap! events dissoc t)
(reset! now t)
(doseq [f fs]
(let [period (some-> f meta ::period deref)]
(when (or (nil? period) (pos? period))
(try
(f)
(when period (.in this period f))
(catch Throwable e
(log/warn e "error in mock clock"))))))
(recur))))))))))
(let [num-cores (.availableProcessors (Runtime/getRuntime))
cnt (atom 0)
clock (delay
(scheduled-executor->clock
(ScheduledThreadPoolExecutor.
1
(ex/thread-factory
(fn []
(str "manifold-scheduler-pool-" (swap! cnt inc)))
(deliver (promise) nil)))))]
(def ^:dynamic ^IClock *clock*
(reify IClock
(in [_ f interval] (.in ^IClock @clock f interval))
(every [_ f delay period] (.every ^IClock @clock f delay period)))))
(defmacro with-clock
"Ensures that all calls to `every` and `in` are made through the specified clock, rather
than the default one."
[clock & body]
`(binding [*clock* ~clock]
~@body))
(defn in
"Schedules no-arg function `f` to be invoked in `interval` milliseconds. Returns a deferred
representing the returned value of the function."
[^double interval f]
(let [d (manifold.deferred/deferred)
f (fn []
(try
(manifold.deferred/success! d (f))
(catch Throwable e
(manifold.deferred/error! d e))))]
(.in *clock* interval f)
d))
(defn every
"Schedules no-arg function `f` to be invoked every `period` milliseconds, after `initial-delay`
milliseconds, which defaults to `0`. Returns a zero-argument function which, when invoked,
cancels the repeated invocation.
If the invocation of `f` ever throws an exception, repeated invocation is automatically
cancelled."
([period f]
(every period 0 f))
([period initial-delay f]
(.every *clock* initial-delay period f)))
(defn at
"Schedules no-arg function `f` to be invoked at `timestamp`, which is the milliseconds
since the epoch. Returns a deferred representing the returned value of the function."
[timestamp f]
(in (max 0 (- timestamp (System/currentTimeMillis))) f))