;; service.clj -- riemann.service (forked from riemann/riemann)
(ns riemann.service
"Lifecycle protocol for stateful services bound to a core."
(:require wall.hack
riemann.instrumentation)
(:use clojure.tools.logging
[riemann.time :only [unix-time every! cancel]])
(:import (riemann.instrumentation Instrumented)
(java.util.concurrent TimeUnit
ThreadFactory
AbstractExecutorService
Executor
ExecutorService
BlockingQueue
LinkedBlockingQueue
RejectedExecutionException
ArrayBlockingQueue
SynchronousQueue
ThreadPoolExecutor)))
(defprotocol Service
  "A component of a core whose lifecycle is managed for it. Intended for
  stateful things such as connection pools, network servers, and background
  threads."
  (reload! [service core]
    "Tells the service that the core has changed.")
  (start! [service]
    "Starts the service. Implementations must be idempotent.")
  (stop! [service]
    "Stops the service. Implementations must be idempotent.")
  (conflict? [service1 service2]
    "Returns whether these two services conflict with one another. Adding a
    service to a core *replaces* any services it conflicts with."))
(defprotocol ServiceEquiv
  "Equivalence between services across a core transition."
  (equiv? [service1 service2]
    "Decides which services may keep running through a core transition such
    as a reload. When the old service is equivalent to the new one, the old
    service may be preserved and handed to the new core; otherwise the old
    service may be shut down and replaced by the new one."))
;; A missing (nil) service is equivalent only to another missing service.
(extend-type nil
  ServiceEquiv
  (equiv? [_ other]
    (nil? other)))
(defprotocol ServiceStatus
  "Introspection of a service's lifecycle state."
  (running? [service]
    "Reports whether the service is currently running."))
;; A Service that schedules (f @core) through riemann.time/every!.
;;   name, equiv-key  - identity used by conflict? and equiv?
;;   interval, delay  - handed straight to every!
;;   f                - function called with the current core
;;   core, task       - atoms holding the current core and the scheduled task
(defrecord ScheduledTaskService [name equiv-key interval delay f core task]
  ServiceEquiv
  (equiv? [this other]
    ;; Same type, and identical name/interval/delay/equiv-key.
    (and (instance? ScheduledTaskService other)
         (= [name interval delay equiv-key]
            ((juxt :name :interval :delay :equiv-key) other))))

  Service
  (conflict? [this other]
    ;; Two scheduled task services clash when they share a name.
    (if (instance? ScheduledTaskService other)
      (= name (:name other))
      false))

  (reload! [this new-core]
    (reset! core new-core))

  (start! [this]
    (locking this
      ;; Starting again cancels whatever was scheduled before.
      (when-let [previous @task]
        (cancel previous))
      (reset! task (every! interval delay #(f @core)))))

  (stop! [this]
    (locking this
      (when-let [scheduled @task]
        (cancel scheduled)))))
;; A Service that runs (f @core) in a loop on a dedicated thread.
;;   name, equiv-key - identity used by conflict? and equiv?
;;   f               - function called repeatedly with the current core
;;   core            - atom holding the current core (updated by reload!)
;;   running         - atom holding the loop's run flag
;;   thread          - atom holding the worker Thread once started
(defrecord ThreadService [name equiv-key f core running thread]
  ServiceEquiv
  (equiv? [this other]
    (and
      (instance? ThreadService other)
      (= name (:name other))
      (= equiv-key (:equiv-key other))))

  Service
  (conflict? [this other]
    (and
      (instance? ThreadService other)
      (= name (:name other))))

  (reload! [this new-core]
    (reset! core new-core))

  (start! [this]
    (locking this
      ;; No-op when already running, which keeps start! idempotent.
      (when-not @running
        (reset! running true)
        (let [t (Thread. (fn thread-service-runner []
                           (while @running
                             (try
                               (f @core)
                               ;; An interrupt only ends the current call to
                               ;; f; the loop then re-checks the run flag.
                               (catch InterruptedException e
                                 :interrupted)))))]
          (reset! thread t)
          (.start t)))))

  (stop! [this]
    (locking this
      ;; No-op when not running, which keeps stop! idempotent.
      (when @running
        (reset! running false)
        (let [^Thread t @thread]
          (.interrupt t)
          ;; Block until the worker exits. join replaces the previous
          ;; sleep-and-poll loop on isAlive and returns as soon as the
          ;; thread terminates.
          (.join t))))))
(defn thread-service
  "Builds a ThreadService that, once started, calls (f core) over and over on
  its own thread. The loop checks its run flag between calls to f; stopping
  interrupts the thread and waits for it to exit, so start and stop are
  blocking operations. Equivalent to other ThreadServices with the same name
  and equivalence key (nil when not provided). Conflicts with other
  ThreadServices of the same name."
  ([name f]
   (thread-service name nil f))
  ([name equiv-key f]
   (let [core    (atom nil)
         running (atom false)
         thread  (atom nil)]
     (->ThreadService name equiv-key f core running thread))))
(defn scheduled-task-service
  "Builds a ScheduledTaskService which, once started, schedules a task that
  calls (f core) repeatedly every `interval`, after `delay` seconds.
  Equivalent to other ScheduledTaskServices with the same `name`,
  `equiv-key`, `delay` and `interval`.
  Conflicts with other ScheduledTaskServices of the same name."
  [name equiv-key interval delay f]
  (let [core (atom nil)
        task (atom nil)]
    (->ScheduledTaskService name equiv-key interval delay f core task)))
(defmacro all-equal?
  "Takes two objects to compare and a list of forms to compare them by. Each
  form is applied like `->`: the object being compared is inserted as the
  first argument, followed by the form's own arguments. A bare symbol or
  keyword is treated as a form with no extra arguments. With no forms the
  result is true.

  ```clojure
  (all-equal? foo bar
    (class)
    (nth 2)
    .getSize)
  ```

  becomes

  ```clojure
  (let [a foo
        b bar]
    (and (= (class a) (class b))
         (= (nth a 2) (nth b 2))
         (= (.getSize a) (.getSize b))))
  ```"
  [a b & forms]
  ;; Explicit gensyms: the same two symbols must be shared by every generated
  ;; comparison. ~a and ~b are spliced in untouched so any type hint the
  ;; caller put on them carries over to the bindings.
  (let [asym (gensym "a__")
        bsym (gensym "b__")]
    `(let [~asym ~a
           ~bsym ~b]
       (and ~@(for [form forms
                    :let [[fun & args] (if (seq? form) form (list form))]]
                `(= (~fun ~asym ~@args) (~fun ~bsym ~@args)))))))
;; Accessor protocol for the type that wraps an ExecutorService with a
;; start/stop lifecycle.
(defprotocol IExecutorServiceService
  (getExecutor [this]
    "Returns the wrapped executor."))
(defn- safe-ratio
  "Divides numerator by denominator, returning 0 rather than throwing when the
  denominator is zero."
  [numerator denominator]
  (if (zero? denominator)
    0
    (/ numerator denominator)))

;; Wraps an ExecutorService with a start/stop lifecycle.
;;   name, equiv-key - identity used by conflict? and equiv?
;;   f               - zero-argument function that builds the executor
;;   executor        - the live executor, or nil while stopped
;;   stats           - atom of counters used to compute the events' rates
(deftype ExecutorServiceService
  [name equiv-key f ^:volatile-mutable ^ExecutorService executor stats]
  IExecutorServiceService
  (getExecutor [this] executor)

  ServiceEquiv
  (equiv? [a b]
    ;; The class check comes first, so the hinted field accesses are only
    ;; reached when b really is an ExecutorServiceService.
    (all-equal? a ^ExecutorServiceService b
                (class)
                (.name)
                (.equiv-key)))

  Service
  (conflict? [a b]
    (all-equal? a ^ExecutorServiceService b
                (class)
                (.name)))

  (reload! [this new-core])

  (start! [this]
    (locking this
      ;; No-op when an executor already exists.
      (when-not executor
        (info "Executor Service" name "starting")
        (let [x (f)
              ;; Capacity is sampled once, from the freshly built executor's
              ;; queue; non-ThreadPoolExecutors are treated as unbounded.
              queue-capacity (if (instance? ThreadPoolExecutor x)
                               (.remainingCapacity
                                 (.getQueue ^ThreadPoolExecutor x))
                               Integer/MAX_VALUE)]
          (reset! stats {:accepted 0
                         :completed 0
                         :rejected 0
                         :time (unix-time)
                         :queue-capacity queue-capacity})
          (set! executor x)))))

  (stop! [this]
    (locking this
      (when executor
        (info "Executor Service" name "stopping")
        (.shutdown executor)
        (set! executor nil))))

  Executor
  (execute [this runnable]
    ;; Read the volatile field once into a local before using it.
    (if-let [x executor]
      (try
        (.execute x runnable)
        (catch RejectedExecutionException e
          ; Update rejected stats and rethrow
          (swap! stats update-in [:rejected] inc)
          (throw e)))
      (throw (RejectedExecutionException.
               (str "ExecutorServiceService " name
                    " isn't running.")))))

  Instrumented
  (events [this]
    ;; Read the volatile field once: a concurrent stop! may set it to nil
    ;; between the instance check and the calls below.
    (let [x executor]
      (when (instance? ThreadPoolExecutor x)
        (let [^ThreadPoolExecutor pool x
              time            (unix-time)
              tasks-completed (.getCompletedTaskCount pool)
              tasks-accepted  (.getTaskCount pool)
              ;; Atomically roll the counters forward and get the deltas
              ;; since the previous call.
              {:keys [queue-capacity
                      dcompleted
                      daccepted
                      drejected
                      dtime]}
              (swap! stats
                     (fn [prev]
                       (merge prev
                              {:dcompleted (- tasks-completed
                                              (:completed prev))
                               :completed  tasks-completed
                               :daccepted  (- tasks-accepted
                                              (:accepted prev))
                               :accepted   tasks-accepted
                               :drejected  (:rejected prev)
                               :rejected   0
                               :dtime      (- time (:time prev))
                               :time       time})))
              threads-active (.getActiveCount pool)
              queue-size     (max 0 (- tasks-accepted
                                       tasks-completed
                                       threads-active))
              ;; A zero-capacity queue (e.g. SynchronousQueue) reports 0 used
              ;; instead of dividing by zero.
              queue-used       (safe-ratio queue-size queue-capacity)
              queue-used-state (condp < queue-used
                                 3/4 "critical"
                                 1/2 "warning"
                                 "ok")
              s     (partial str "riemann executor " (clojure.core/name name) " ")
              event (fn [suffix metric state]
                      {:service (s suffix)
                       :metric  metric
                       :tags    ["riemann"]
                       :state   state
                       :time    time})]
          ;; Rates fall back to 0 when no time has elapsed since the last
          ;; sample, rather than throwing.
          [(event "accepted rate"  (safe-ratio daccepted dtime)  "ok")
           (event "completed rate" (safe-ratio dcompleted dtime) "ok")
           (event "rejected rate"  (safe-ratio drejected dtime)
                  (if (pos? drejected) "warning" "ok"))
           (event "queue capacity" queue-capacity "ok")
           (event "queue size"     queue-size queue-used-state)
           (event "queue used"     queue-used queue-used-state)
           (event "threads active" threads-active "ok")
           (event "threads alive"  (.getPoolSize pool) "ok")])))))
(defn executor-service
  "Creates a new threadpool executor service ... service! Takes a function
  of no arguments which generates an ExecutorService. Returns an
  ExecutorServiceService which provides start/stop/reload/equiv? lifecycle
  management of that service.

  Equivalence-key controls how services are compared to tell if they are
  equivalent: services are equivalent when their class, name, and equiv-key
  are equal. The key defaults to nil; because keys are compared with `=`, two
  services with the same name that both use a nil key compare as equivalent,
  so supply a key describing the executor's configuration when that matters.

  ```clojure
  (executor-service :graphite {:foo 4}
                    #(ThreadPoolExecutor. 2 ...))
  ```"
  ([name f]
   (executor-service name nil f))
  ([name equiv-key f]
   ;; The executor starts out nil (stopped); stats are filled in on start!.
   (ExecutorServiceService. name equiv-key f nil (atom nil))))
(defmacro literal-executor-service
  "Like executor-service, but uses the *expression* it is given, quoted, as
  the equivalence key. This only works when the expression is literal: any
  variables or function calls in it are compared as code, not as the values
  they evaluate to.

  - OK:  (literal-executor-service :io (ThreadPoolExecutor. 2 ...))
  - OK:  (literal-executor-service :io (ThreadPoolExecutor. (inc 1) ...))
  - BAD: (literal-executor-service :io (ThreadPoolExecutor. x ...))"
  [name executor-service-expr]
  (let [equiv-key (list 'quote executor-service-expr)
        factory   `(fn [] ~executor-service-expr)]
    `(executor-service ~name ~equiv-key ~factory)))
(defn threadpool-service
  "Builds an ExecutorServiceService backed by a ThreadPoolExecutor with the
  given core and maximum pool sizes and a LinkedBlockingQueue of the given
  size. The options map, tagged with a :type entry, serves as the service's
  equivalence key. Options:

  - :core-pool-size   Default 1
  - :max-pool-size    Default 128
  - :keep-alive-time  Default 10
  - :keep-alive-unit  Default MILLISECONDS
  - :queue-size       Default 1000"
  ([name]
   (threadpool-service name {}))
  ([name {:keys [core-pool-size
                 max-pool-size
                 keep-alive-time
                 keep-alive-unit
                 queue-size]
          :as   opts
          :or   {core-pool-size  1
                 max-pool-size   128
                 keep-alive-time 10
                 keep-alive-unit TimeUnit/MILLISECONDS
                 queue-size      1000}}]
   (let [equiv-key (merge opts {:type `threadpool-service})
         make-pool (fn []
                     (ThreadPoolExecutor.
                       core-pool-size
                       max-pool-size
                       keep-alive-time
                       keep-alive-unit
                       (LinkedBlockingQueue. ^int queue-size)))]
     (executor-service name equiv-key make-pool))))