forked from riemann/riemann
-
Notifications
You must be signed in to change notification settings - Fork 0
/
pool.clj
117 lines (105 loc) · 4.18 KB
/
pool.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
(ns riemann.pool
"A generic thread-safe resource pool."
(:use clojure.tools.logging
[slingshot.slingshot :only [throw+]])
(:import [java.util.concurrent LinkedBlockingQueue TimeUnit]))
; THIS IS A MUTABLE STATE OF AFFAIRS. WHICH IS TO SAY, IT IS FUCKING TERRIBLE.
(defprotocol Pool
(grow [pool]
"Adds an element to the pool.")
(claim [pool] [pool timeout]
"Take a thingy from the pool. Timeout in seconds; if unspecified, 0.
Returns nil if no thingy available.")
(release [pool thingy]
"Returns a thingy to the pool.")
(invalidate [pool thingy]
"Tell the pool a thingy is no longer valid."))
(defrecord FixedQueuePool [queue open close regenerate-interval]
Pool
(grow [this]
(loop []
(if-let [thingy (try (open) (catch Exception t nil))]
(.put ^LinkedBlockingQueue queue thingy)
(do
(Thread/sleep (* 1000 regenerate-interval))
(recur)))))
(claim [this]
(claim this nil))
(claim [this timeout]
(let [timeout (* 1000 (or timeout 0))]
(or
(try
(.poll ^LinkedBlockingQueue queue timeout TimeUnit/MILLISECONDS)
(catch java.lang.InterruptedException e
nil))
(throw+
{:type ::timeout
:message (str "Couldn't claim a resource from the pool within "
timeout " ms")}))))
(release [this thingy]
(when thingy
(.put ^LinkedBlockingQueue queue thingy)))
(invalidate [this thingy]
(when thingy
(try (close thingy)
(catch Exception t
(warn t "Closing" thingy "threw")))
(future (grow this)))))
(defn fixed-pool
"A fixed pool of thingys. (open) is called to generate a thingy. (close
thingy) is called when a thingy is invalidated. When thingys are invalidated,
the pool will immediately try to open a new one; if open throws or returns
nil, the pool will sleep for regenerate-interval seconds before retrying
(open).
- :regenerate-interval How long to wait between retrying (open).
- :size Number of thingys in the pool.
- :block-start Should (fixed-pool) wait until the pool is full
before returning?
Note that fixed-pool is correct only if every successful (claim) is followed
by exactly one of either (invalidate) or (release). If calls are unbalanced;
e.g. resources are not released, doubly released, or released *and*
invalidated, starvation or unbounded blocking could occur. (with-pool)
provides this guarantee."
([open]
(fixed-pool open {}))
([open opts]
(fixed-pool open identity opts))
([open close opts]
(let [^int size (or (:size opts) (* 2 (.availableProcessors
(Runtime/getRuntime))))
regenerate-interval (or (:regenerate-interval opts) 5)
block-start (get opts :block-start true)
pool (FixedQueuePool.
(LinkedBlockingQueue. size)
open
close
regenerate-interval)
openers (doall
(map (fn open-pool [_]
(future (grow pool)))
(range size)))]
(when block-start
(doseq [worker openers] @worker))
pool)))
(defmacro with-pool
"Evaluates body in a try expression with a symbol 'thingy claimed from the
given pool, with specified claim timeout. Releases thingy at the end of the
body, or if an exception is thrown, invalidates them and rethrows. Example:
```clojure
; With client, taken from connection-pool, waiting 5 seconds to claim, send
; client a message.
(with-pool [client connection-pool 5]
(send client a-message))
```"
[[thingy pool timeout] & body]
; Destructuring bind could change nil to a, say, vector, and cause
; unbalanced claim/release.
`(let [thingy# (claim ~pool ~timeout)
~thingy thingy#]
(try
(let [res# (do ~@body)]
(release ~pool thingy#)
res#)
(catch Exception t#
(invalidate ~pool thingy#)
(throw t#)))))