-
Notifications
You must be signed in to change notification settings - Fork 6
/
bulkhead.clj
238 lines (195 loc) · 10 KB
/
bulkhead.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
(ns resilience.bulkhead
(:refer-clojure :exclude [name])
(:require [resilience.util :as u]
[resilience.spec :as s])
(:import (io.github.resilience4j.bulkhead BulkheadConfig BulkheadConfig$Builder BulkheadRegistry Bulkhead)
(io.github.resilience4j.bulkhead.event BulkheadEvent BulkheadEvent$Type)
(io.github.resilience4j.core EventConsumer)))
(defn ^BulkheadConfig bulkhead-config
  "Create a BulkheadConfig from the options map `opts`.

  Allowed options are:
  * :max-concurrent-calls
    Configures the max amount of concurrent calls the bulkhead will support.

  * :max-wait-millis
    Configures a maximum amount of time in ms the calling thread will wait
    to enter the bulkhead. If bulkhead has space available, entry is
    guaranteed and immediate. If bulkhead is full, calling threads will
    contest for space, if it becomes available. :max-wait-millis can be set to 0.

    Note: for threads running on an event-loop or equivalent (rx computation
    pool, etc), setting :max-wait-millis to 0 is highly recommended. Blocking an
    event-loop thread will most likely have a negative effect on application
    throughput.

  `opts` is first handed to `s/verify-opt-map-keys-with-spec` together with
  the :bulkhead/bulkhead-config spec key.
  Throws IllegalArgumentException when `opts` is empty.
  Options that are absent from `opts` are not set on the builder."
  [opts]
  (s/verify-opt-map-keys-with-spec :bulkhead/bulkhead-config opts)
  (if (empty? opts)
    (throw (IllegalArgumentException. "please provide not empty configuration for bulkhead."))
    (let [^BulkheadConfig$Builder config (BulkheadConfig/custom)]
      (when-let [max-calls (:max-concurrent-calls opts)]
        (.maxConcurrentCalls config (int max-calls)))
      (when-let [wait-millis (:max-wait-millis opts)]
        ;; explicit coercion, mirroring the int coercion used above
        (.maxWaitTime config (long wait-millis)))
      (.build config))))
(defn ^BulkheadRegistry registry-with-config
  "Build a BulkheadRegistry from `config`.

  `config` may be a ready-made BulkheadConfig instance, which is used as is,
  or a bulkhead configurations map, which is converted with `bulkhead-config`
  first. Please refer to `bulkhead-config` for allowed key value pairs
  within the bulkhead configuration map."
  [^BulkheadConfig config]
  (let [registry-config (if-not (instance? BulkheadConfig config)
                          (bulkhead-config config)
                          config)]
    (BulkheadRegistry/of registry-config)))
(defmacro defregistry
  "Define a var called `name` that holds a BulkheadRegistry built from the
  bulkhead configurations map `config`. The defined symbol is tagged as
  BulkheadRegistry.

  Please refer to `bulkhead-config` for allowed key value pairs
  within the bulkhead configuration map."
  [name config]
  (let [tagged-name (with-meta (symbol name) {:tag `BulkheadRegistry})]
    ;; the map is turned into a BulkheadConfig before the registry is created
    `(def ~tagged-name
       (registry-with-config (bulkhead-config ~config)))))
(defn get-all-bulkheads
  "Return every bulkhead registered to `registry`.

  The registry's bulkhead collection is walked through its iterator and
  exposed as a sequence via `u/lazy-seq-from-iterator`."
  [^BulkheadRegistry registry]
  (-> registry
      (.getAllBulkheads)
      (.iterator)
      (u/lazy-seq-from-iterator)))
(defn ^Bulkhead bulkhead
  "Create a bulkhead called `name` from the bulkhead configurations map `config`.

  Please refer to `bulkhead-config` for allowed key value pairs
  within the bulkhead configurations map. In addition, `config` may carry
  a :registry key holding a BulkheadRegistry. Three cases are distinguished:

  1. :registry only. The bulkhead is obtained from the registry under `name`
     and inherits the bulkhead configurations of that registry.

     Example:
     (bulkhead my-bulkhead {:registry my-registry})

  2. :registry plus other options. The bulkhead is obtained from the registry
     under `name`, and the remaining options overwrite the configurations
     inherited from the registry.

     Example:
     (bulkhead my-bulkhead {:registry my-registry
                            :max-wait-millis 50})

  3. No :registry. A stand-alone bulkhead is created from the given options
     and is not registered to any BulkheadRegistry."
  [^String name config]
  (let [^BulkheadRegistry registry (:registry config)
        overrides (dissoc config :registry)]
    (if registry
      (if (not-empty overrides)
        ;; registry given together with options that overwrite its defaults
        (.bulkhead registry name ^BulkheadConfig (bulkhead-config overrides))
        ;; registry given alone: use the registry's own configuration
        (.bulkhead registry name))
      ;; no registry: build an unregistered bulkhead
      (Bulkhead/of name ^BulkheadConfig (bulkhead-config overrides)))))
(defmacro defbulkhead
  "Define a var called `name` that holds a bulkhead created by `bulkhead`.
  The defined symbol is tagged as Bulkhead, and the bulkhead itself is named
  with the current namespace followed by a slash and `name`.

  Please refer to `bulkhead-config` for allowed key value pairs
  within the bulkhead configurations map.

  To register the bulkhead to a BulkheadRegistry, put a :registry key with
  that BulkheadRegistry in `config`. Without any other configurations the
  newly created bulkhead inherits the bulkhead configurations of the registry.

  Example:
  (defbulkhead my-bulkhead {:registry my-registry})

  To register the bulkhead and overwrite some of the configurations inherited
  from the registry, provide those configurations next to the :registry key.

  Example:
  (defbulkhead my-bulkhead {:registry my-registry
                            :max-wait-millis 50})

  To create a bulkhead that is not registered to any BulkheadRegistry,
  provide only bulkhead configurations in `config`."
  [name config]
  (let [qualified-name (str *ns* "/" name)
        tagged-name (with-meta (symbol name) {:tag `Bulkhead})]
    `(def ~tagged-name (bulkhead ~qualified-name ~config))))
(defn ^String name
  "Return the name of `bulkhead`, as reported by its `getName` method."
  [^Bulkhead bulkhead]
  (. bulkhead (getName)))
(defn ^BulkheadConfig config
  "Get the BulkheadConfig of this Bulkhead"
  [^Bulkhead bulkhead]
  (.getBulkheadConfig bulkhead))
(defn metrics
  "Get the Metrics of this Bulkhead.

  Returns a map with the key :available-concurrent-calls, whose value is
  read from the bulkhead's metrics object."
  [^Bulkhead bulkhead]
  (let [metric (.getMetrics bulkhead)]
    {:available-concurrent-calls (.getAvailableConcurrentCalls metric)}))
;; Both vars are left unbound here; `with-context` binds them from the event
;; being consumed.
(def ^:dynamic
  ^{:doc "Contextual value representing the bulkhead name of the consumed event"}
  *bulkhead-name*)

(def ^:dynamic
  ^{:doc "Contextual value representing the creation time of the consumed event"}
  *creation-time*)
(defmacro ^{:private true :no-doc true} with-context
  [abstract-event & body]
  ;; Tag the event symbol as BulkheadEvent for the two getter calls below,
  ;; then run `body` with *bulkhead-name* and *creation-time* bound to the
  ;; event's bulkhead name and creation time.
  (let [tagged-event (vary-meta abstract-event assoc :tag `BulkheadEvent)]
    `(binding [*bulkhead-name* (.getBulkheadName ~tagged-event)
               *creation-time* (.getCreationTime ~tagged-event)]
       ~@body)))
(defn- create-consumer
  "Wrap the zero-argument function `consumer-fn` in an EventConsumer.
  For each consumed event, `consumer-fn` is called inside `with-context`,
  so `*bulkhead-name*` and `*creation-time*` are bound from that event."
  [consumer-fn]
  (reify EventConsumer
    (consumeEvent [_this evt]
      (with-context evt
        (consumer-fn)))))
(defn set-on-call-rejected-event-consumer!
  "Set a consumer to consume the `on-call-rejected` event, which is emitted when
  a request is rejected by the bulkhead.

  `consumer-fn` accepts a function which takes no arguments.

  Please note that in `consumer-fn` you can get the bulkhead name and the creation time of the
  consumed event by accessing `*bulkhead-name*` and `*creation-time*` under this namespace."
  [^Bulkhead bulkhead consumer-fn]
  (let [pub (.getEventPublisher bulkhead)]
    (.onCallRejected pub (create-consumer consumer-fn))))
(defn set-on-call-permitted-event-consumer!
  "Register `consumer-fn` for the `on-call-permitted` event, which is emitted
  when a request is permitted by the bulkhead.

  `consumer-fn` is a function which takes no arguments. While it runs, the
  bulkhead name and the creation time of the consumed event are available
  through `*bulkhead-name*` and `*creation-time*` under this namespace."
  [^Bulkhead bulkhead consumer-fn]
  (-> bulkhead
      (.getEventPublisher)
      (.onCallPermitted (create-consumer consumer-fn))))
(defn set-on-call-finished-event-consumer!
  "Set a consumer to consume the `on-call-finished` event, which is emitted when
  a request finishes and leaves this bulkhead.

  `consumer-fn` accepts a function which takes no arguments.

  Please note that in `consumer-fn` you can get the bulkhead name and the creation time of the
  consumed event by accessing `*bulkhead-name*` and `*creation-time*` under this namespace."
  [^Bulkhead bulkhead consumer-fn]
  (let [pub (.getEventPublisher bulkhead)]
    (.onCallFinished pub (create-consumer consumer-fn))))
(defn set-on-all-event-consumer!
  "Set a consumer to consume all available events emitted from the bulkhead.

  `consumer-fn-map` accepts a map which contains following key and function pairs:
  * :on-call-rejected accepts a function which takes no arguments
  * :on-call-permitted accepts a function which takes no arguments
  * :on-call-finished accepts a function which takes no arguments

  A key may be left out of the map; events whose key has no function are
  ignored.

  Please note that in each of these functions you can get the bulkhead name and
  the creation time of the consumed event by accessing `*bulkhead-name*` and
  `*creation-time*` under this namespace."
  [^Bulkhead bulkhead consumer-fn-map]
  (let [pub (.getEventPublisher bulkhead)]
    (.onEvent pub
              (reify EventConsumer
                (consumeEvent [_ event]
                  (with-context event
                    ;; translate the event type into the keyword used in
                    ;; consumer-fn-map, then call the matching function if any
                    (when-let [consumer-fn (->> (u/case-enum (.getEventType ^BulkheadEvent event)
                                                             BulkheadEvent$Type/CALL_REJECTED
                                                             :on-call-rejected

                                                             BulkheadEvent$Type/CALL_PERMITTED
                                                             :on-call-permitted

                                                             BulkheadEvent$Type/CALL_FINISHED
                                                             :on-call-finished)
                                                (get consumer-fn-map))]
                      (consumer-fn))))))))