-
-
Notifications
You must be signed in to change notification settings - Fork 26
/
rate_limiter.clj
103 lines (93 loc) · 3.77 KB
/
rate_limiter.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
(ns diehard.rate-limiter)
(defprotocol IRateLimiter
  "Contract for a permit-based rate limiter. The one-argument arities are
  implemented by `TokenBucketRateLimiter` as a request for a single permit."
  (acquire!
    [this]
    [this permits]
    "Acquire the given number of permits, blocking until they are available.")
  (try-acquire
    [this]
    [this permits]
    [this permits wait-ms]
    "Try to acquire the given number of permits, blocking for at most `wait-ms`
    milliseconds. Returns true if the permits could be obtained within the
    permitted time, false otherwise."))
;; Forward declarations: these private functions are defined after the
;; TokenBucketRateLimiter record (their parameters are type-hinted with it),
;; but `do-acquire` / `do-try-acquire` below need to reference them first.
(declare refill acquire-sleep-ms try-acquire-sleep-ms)
(defn- do-acquire
  "Refills `rate-limiter` and then reserves `permits` unconditionally.
  Returns whatever `acquire-sleep-ms` returns: the number of milliseconds the
  caller should sleep before proceeding (0 when no wait is needed)."
  [rate-limiter permits]
  (-> rate-limiter
      (doto refill)
      (acquire-sleep-ms permits)))
(defn- do-try-acquire
  "Refills `rate-limiter` and then attempts to reserve `permits` within
  `max-wait-ms`. Returns whatever `try-acquire-sleep-ms` returns: the
  milliseconds to sleep on success, or false when the request is rejected."
  [rate-limiter permits max-wait-ms]
  (-> rate-limiter
      (doto refill)
      (try-acquire-sleep-ms permits max-wait-ms)))
;; Token-bucket implementation of IRateLimiter.
;;   rate       - refill rate; `refill` multiplies it by elapsed milliseconds,
;;                and the `rate-limiter` constructor passes permits-per-second / 1000
;;   max-tokens - `refill` never lets :reserved-tokens drop below (- max-tokens)
;;   state      - mutable state; the `rate-limiter` constructor passes an atom
;;                holding {:reserved-tokens .. :last-refill-ts ..}
(defrecord TokenBucketRateLimiter [rate max-tokens
                                   ;; internal state
                                   state]
  IRateLimiter
  ;; Acquire a single permit.
  (acquire! [this]
    (acquire! this 1))
  ;; Reserve `permits`, then sleep for the computed delay if it is positive.
  (acquire! [this permits]
    (let [sleep (do-acquire this permits)]
      (when (> sleep 0)
        ;; `(long ...)` pins the Thread/sleep(long) overload so the call is not
        ;; reflective on JDKs that also offer Thread/sleep(Duration).
        (Thread/sleep (long sleep)))))
  ;; Try to acquire a single permit without waiting.
  (try-acquire [this]
    (try-acquire this 1))
  ;; Try to acquire `permits` without waiting.
  (try-acquire [this permits]
    (try-acquire this permits 0))
  ;; Try to acquire `permits`, waiting at most `wait-ms` milliseconds.
  ;; `do-try-acquire` yields false on rejection, otherwise the delay to sleep.
  (try-acquire [this permits wait-ms]
    (let [sleep (do-try-acquire this permits wait-ms)]
      (if (false? sleep)
        false
        (do
          (when (> sleep 0)
            (Thread/sleep (long sleep)))
          true)))))
(defn- refill
  "Credits the bucket for the time elapsed since the previous refill.

  :reserved-tokens is decreased by (elapsed-ms * rate), but never below
  (- max-tokens). When :last-refill-ts is not positive (no refill recorded
  yet) the token count is left as is. :last-refill-ts is always set to the
  current wall-clock time. Returns the new state map."
  [^TokenBucketRateLimiter rate-limiter]
  (let [now (System/currentTimeMillis)
        rate (.-rate rate-limiter)]
    (swap! (.-state rate-limiter)
           (fn [{:keys [reserved-tokens last-refill-ts] :as current}]
             (assoc current
                    :reserved-tokens
                    (if (> last-refill-ts 0)
                      ;; lower bound keeps idle credit capped at max-tokens
                      (max (- (.-max-tokens rate-limiter))
                           (- reserved-tokens
                              (* (- now last-refill-ts) rate)))
                      reserved-tokens)
                    :last-refill-ts now)))))
(defn- acquire-sleep-ms
  "Unconditionally adds `permits` to :reserved-tokens and returns how long the
  caller must wait, in milliseconds: 0 when the resulting reservation count is
  not positive, otherwise (reserved / rate) truncated to a long."
  [^TokenBucketRateLimiter rate-limiter permits]
  (let [updated (swap! (.-state rate-limiter)
                       (fn [current]
                         (update current :reserved-tokens + permits)))
        owed (:reserved-tokens updated)]
    (if-not (<= owed 0)
      ;; time as milliseconds
      (long (/ owed (.-rate rate-limiter)))
      0)))
(defn- try-acquire-sleep-ms
  "Attempts to reserve `permits` provided the resulting wait would fit within
  `max-wait-ms` milliseconds.

  Returns the number of milliseconds the caller should sleep (0 when no wait is
  needed) if the reservation was made, or false if it was rejected. A rejected
  request leaves the state untouched.

  The reservation is applied with an explicit compare-and-set! retry loop, so
  a rejection is reported as a plain value rather than by throwing and catching
  an exception."
  [^TokenBucketRateLimiter rate-limiter permits max-wait-ms]
  (let [state-atom (.-state rate-limiter)
        rate (.-rate rate-limiter)]
    (loop []
      (let [current @state-atom
            wanted (+ (:reserved-tokens current) permits)]
        ;; test if we can pass in wait period
        (if (<= (- wanted (* max-wait-ms rate)) 0)
          (if (compare-and-set! state-atom current
                                (assoc current :reserved-tokens wanted))
            (if (<= wanted 0)
              0
              ;; time as milliseconds
              (long (/ wanted rate)))
            ;; another thread changed the state in between; re-evaluate
            (recur))
          false)))))
;; Set of option keys recognised for the `rate-limiter` constructor.
;; NOTE(review): not referenced in the visible part of this namespace;
;; presumably consumed elsewhere to validate option maps - confirm.
(def ^{:const true :no-doc true}
allowed-rate-limiter-option-keys
#{:rate :max-cached-tokens})
(defn rate-limiter
  "Build a token-bucket rate limiter from the option map `opts`:

  * `rate`: permits per second (required)
  * `max-cached-tokens`: the most tokens the bucket may accumulate while idle;
    defaults to `rate`

  Throws IllegalArgumentException when `:rate` is missing."
  [opts]
  (let [rate (:rate opts)]
    (when-not rate
      (throw (IllegalArgumentException. ":rate is required for rate-limiter")))
    (let [bucket-size (get opts :max-cached-tokens rate)
          ;; stored per millisecond, since refill works with millisecond timestamps
          tokens-per-ms (/ (double rate) 1000)
          initial-state {:reserved-tokens 0.0
                         :last-refill-ts -1}]
      (TokenBucketRateLimiter. tokens-per-ms bucket-size (atom initial-state)))))