/
Throttle.h
324 lines (272 loc) · 7.94 KB
/
Throttle.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#ifndef CEPH_THROTTLE_H
#define CEPH_THROTTLE_H
#include "Cond.h"
#include <condition_variable>
class CephContext;
class PerfCounters;
/**
 * @class Throttle
 * Throttles the maximum number of active requests.
 *
 * This class defines the maximum number of slots currently taken away. Any
 * excess requests for more slots are delayed until enough slots are put
 * back, i.e. until @p get_current() drops back below the limit.
 */
class Throttle {
  CephContext *cct;
  const std::string name;
  PerfCounters *logger;
  ceph::atomic_t count, max;  // currently taken slots, and the configured limit
  Mutex lock;                 // protects the waiter list 'cond'
  list<Cond*> cond;           // one Cond per blocked waiter, in arrival order
  const bool use_perf;        // whether to publish stats through 'logger'

public:
  Throttle(CephContext *cct, const std::string& n, int64_t m = 0, bool _use_perf = true);
  ~Throttle();

private:
  void _reset_max(int64_t m);

  /**
   * Decide whether a request for @p c slots must block.
   *
   * Lock-free: reads both atomics without taking 'lock', so the answer can
   * be stale by the time the caller acts on it (callers re-check as needed).
   * A max of 0 means unlimited (never wait).
   */
  bool _should_wait(int64_t c) const {
    int64_t m = max.read();
    int64_t cur = count.read();
    return
      m &&
      ((c <= m && cur + c > m) || // normally stay under max
       (c >= m && cur > m));      // except for large c
  }

  bool _wait(int64_t c);

public:
  /**
   * gets the number of currently taken slots
   * @returns the number of taken slots
   */
  int64_t get_current() const {
    return count.read();
  }

  /**
   * get the max number of slots
   * @returns the max number of slots
   */
  int64_t get_max() const { return max.read(); }

  /**
   * return true if past midpoint
   */
  bool past_midpoint() const {
    return count.read() >= max.read() / 2;
  }

  /**
   * set the new max number, and wait until the number of taken slots drains
   * and drops below this limit.
   *
   * @param m the new max number
   * @returns true if this method is blocked, false if it returns immediately
   */
  bool wait(int64_t m = 0);

  /**
   * take the specified number of slots from the stock regardless the throttling
   * @param c number of slots to take
   * @returns the total number of taken slots
   */
  int64_t take(int64_t c = 1);

  /**
   * get the specified amount of slots from the stock, but will wait if the
   * total number taken by consumer would exceed the maximum number.
   * @param c number of slots to get
   * @param m new maximum number to set, ignored if it is 0
   * @returns true if this request is blocked due to the throttling, false
   * otherwise
   */
  bool get(int64_t c = 1, int64_t m = 0);

  /**
   * the unblocked version of @p get()
   * @returns true if it successfully got the requested amount,
   * or false if it would block.
   */
  bool get_or_fail(int64_t c = 1);

  /**
   * put slots back to the stock
   * @param c number of slots to return
   * @returns number of requests being held after this
   */
  int64_t put(int64_t c = 1);

  /**
   * reset the count of taken slots back to zero
   */
  void reset();

  /// lock-free variant of the wait decision; see _should_wait() for caveats
  bool should_wait(int64_t c) const {
    return _should_wait(c);
  }

  /// thread-safe wrapper around _reset_max(); takes 'lock' for the update
  void reset_max(int64_t m) {
    Mutex::Locker l(lock);
    _reset_max(m);
  }
};
/**
* BackoffThrottle
*
* Creates a throttle which gradually induces delays when get() is called
* based on params low_threshhold, high_threshhold, expected_throughput,
* high_multiple, and max_multiple.
*
* In [0, low_threshhold), we want no delay.
*
* In [low_threshhold, high_threshhold), delays should be injected based
* on a line from 0 at low_threshhold to
* high_multiple * (1/expected_throughput) at high_threshhold.
*
* In [high_threshhold, 1), we want delays injected based on a line from
* (high_multiple * (1/expected_throughput)) at high_threshhold to
* (high_multiple * (1/expected_throughput)) +
* (max_multiple * (1/expected_throughput)) at 1.
*
* Let the current throttle ratio (current/max) be r, low_threshhold be l,
* high_threshhold be h, high_delay (high_multiple / expected_throughput) be e,
 * and max_delay (max_multiple / expected_throughput) be m.
*
* delay = 0, r \in [0, l)
* delay = (r - l) * (e / (h - l)), r \in [l, h)
* delay = e + (r - h)((m - e)/(1 - h))
*/
class BackoffThrottle {
  CephContext *cct;
  const std::string name;
  PerfCounters *logger;

  std::mutex lock;
  using locker = std::unique_lock<std::mutex>;

  // index of the next condition variable to hand out (round-robin)
  unsigned next_cond = 0;

  /// allocated once to avoid constantly allocating new ones
  vector<std::condition_variable> conds;

  const bool use_perf;

  /// pointers into conds
  list<std::condition_variable*> waiters;

  // Append the next condition variable (round-robin over 'conds') to the
  // waiters FIFO; returns an iterator so the caller can erase its entry once
  // woken.  NOTE(review): presumably called with 'lock' held — the locking
  // lives in the .cc, confirm there.
  std::list<std::condition_variable*>::iterator _push_waiter() {
    unsigned next = next_cond++;
    if (next_cond == conds.size())
      next_cond = 0;
    return waiters.insert(waiters.end(), &(conds[next]));
  }

  // Wake the longest-waiting thread, if any.  notify_all() is used because
  // a condition variable may be shared by several waiters (round-robin reuse).
  void _kick_waiters() {
    if (!waiters.empty())
      waiters.front()->notify_all();
  }

  /// see above, values are in [0, 1].
  double low_threshhold = 0;
  double high_threshhold = 1;

  /// see above, values are in seconds
  double high_delay_per_count = 0;
  double max_delay_per_count = 0;

  /// Filled in in set_params
  double s0 = 0;  ///< e / (h - l), l != h, 0 otherwise
  double s1 = 0;  ///< (m - e)/(1 - h), 1 != h, 0 otherwise

  /// max
  uint64_t max = 0;
  uint64_t current = 0;

  /// delay to inject for a request of c slots, per the piecewise-linear
  /// function described in the class comment above
  std::chrono::duration<double> _get_delay(uint64_t c) const;

public:
  /**
   * set_params
   *
   * Sets params. If the params are invalid, returns false
   * and populates errstream (if non-null) with a user comprehensible
   * explanation.
   */
  bool set_params(
    double low_threshhold,
    double high_threshhold,
    double expected_throughput,
    double high_multiple,
    double max_multiple,
    uint64_t throttle_max,
    ostream *errstream);

  /// acquire c slots, possibly delayed/blocked; returns the duration waited
  std::chrono::duration<double> get(uint64_t c = 1);

  /// equivalent to get(0): goes through get()'s wait path without taking slots
  std::chrono::duration<double> wait() {
    return get(0);
  }

  uint64_t put(uint64_t c = 1);   ///< return c slots to the stock
  uint64_t take(uint64_t c = 1);  ///< take c slots regardless of throttling
  uint64_t get_current();
  uint64_t get_max();

  BackoffThrottle(CephContext *cct, const std::string& n,
    unsigned expected_concurrency, ///< [in] determines size of conds
    bool _use_perf = true);
  ~BackoffThrottle();
};
/**
* @class SimpleThrottle
* This is a simple way to bound the number of concurrent operations.
*
* It tracks the first error encountered, and makes it available
* when all requests are complete. wait_for_ret() should be called
* before the instance is destroyed.
*
* Re-using the same instance isn't safe if you want to check each set
* of operations for errors, since the return value is not reset.
*/
class SimpleThrottle {
public:
  SimpleThrottle(uint64_t max, bool ignore_enoent);
  ~SimpleThrottle();

  void start_op();
  void end_op(int r);
  bool pending_error() const;
  int wait_for_ret();

private:
  mutable Mutex m_lock;  // mutable so the const pending_error() can lock it
  Cond m_cond;
  uint64_t m_max;        // maximum number of concurrent ops
  uint64_t m_current;    // ops currently in flight
  int m_ret;             // first error encountered (see class doc); never reset
  bool m_ignore_enoent;  // NOTE(review): presumably -ENOENT from end_op() is
                         // not recorded as an error when set — confirm in .cc
};
class OrderedThrottle;
// Per-operation completion context handed out by OrderedThrottle::start_op().
class C_OrderedThrottle : public Context {
public:
  C_OrderedThrottle(OrderedThrottle *ordered_throttle, uint64_t tid)
    : m_ordered_throttle(ordered_throttle), m_tid(tid) {
  }

protected:
  // Implemented in the .cc; presumably forwards (m_tid, r) to
  // m_ordered_throttle->finish_op() — the friend declaration in
  // OrderedThrottle supports this. Confirm against the implementation.
  void finish(int r) override;

private:
  OrderedThrottle *m_ordered_throttle;  // non-owning back-pointer
  uint64_t m_tid;                       // tid assigned by start_op()
};
/**
* @class OrderedThrottle
* Throttles the maximum number of active requests and completes them in order
*
 * Operations can complete out-of-order, but their associated Context callbacks
 * will be completed in-order during invocation of start_op() and wait_for_ret()
*/
class OrderedThrottle {
public:
  OrderedThrottle(uint64_t max, bool ignore_enoent);

  C_OrderedThrottle *start_op(Context *on_finish);
  void end_op(int r);

  bool pending_error() const;
  int wait_for_ret();

protected:
  friend class C_OrderedThrottle;

  void finish_op(uint64_t tid, int r);

private:
  // Book-keeping for one tracked operation.
  struct Result {
    bool finished;       // has the operation completed?
    int ret_val;         // its result code, valid once finished
    Context *on_finish;  // in-order completion callback supplied to start_op()

    Result(Context *_on_finish = NULL)
      : finished(false), ret_val(0), on_finish(_on_finish) {
    }
  };

  // per-tid results, keyed by the tid assigned in start_op()
  typedef std::map<uint64_t, Result> TidResult;

  mutable Mutex m_lock;  // mutable so the const pending_error() can lock it
  Cond m_cond;
  uint64_t m_max;        // maximum number of concurrent ops
  uint64_t m_current;    // ops currently in flight
  int m_ret_val;         // first error encountered; reported by wait_for_ret()
  bool m_ignore_enoent;  // NOTE(review): presumably -ENOENT is not treated as
                         // an error when set — confirm in .cc

  uint64_t m_next_tid;      // NOTE(review): presumably next tid to assign
  uint64_t m_complete_tid;  // NOTE(review): presumably highest tid whose
                            // callback chain has been completed in order

  TidResult m_tid_result;

  void complete_pending_ops();
};
#endif