forked from voldemort/voldemort
/
RequestCounter.java
350 lines (303 loc) · 11.8 KB
/
RequestCounter.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
package voldemort.store.stats;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.log4j.Logger;
import voldemort.utils.SystemTime;
import voldemort.utils.Time;
/**
* A thread-safe request counter that calculates throughput for a specified
* duration of time.
*
*
*/
public class RequestCounter {

    private static final Logger logger = Logger.getLogger(RequestCounter.class.getName());

    /**
     * Maximum number of compare-and-set attempts the detailed
     * {@code addRequest} makes before it gives up and drops the sample.
     */
    private static final int MAX_UPDATE_ATTEMPTS = 3;

    /**
     * Immutable snapshot of the current window. Updates build a new snapshot
     * and swap it in with compare-and-set.
     */
    private final AtomicReference<Accumulator> values;

    /** Length of one counting window, in milliseconds. */
    private final long durationMs;

    /** Clock used to stamp window starts and to detect window expiry. */
    private final Time time;

    /** Latency histogram; {@code null} when histogram tracking is disabled. */
    private final Histogram histogram;

    private final boolean useHistogram;

    /** 95th percentile latency captured the last time the window rolled over. */
    private volatile long q95LatencyMs;

    /** 99th percentile latency captured the last time the window rolled over. */
    private volatile long q99LatencyMs;

    /**
     * Creates a counter without a latency histogram.
     *
     * @param durationMs specifies for how long you want to maintain this
     *        counter (in milliseconds).
     */
    public RequestCounter(long durationMs) {
        this(durationMs, SystemTime.INSTANCE, false);
    }

    /**
     * Creates a counter that optionally tracks a latency histogram.
     *
     * @param durationMs specifies for how long you want to maintain this
     *        counter (in milliseconds).
     * @param useHistogram whether this counter should also maintain a
     *        histogram, which feeds {@link #getQ95LatencyMs()} and
     *        {@link #getQ99LatencyMs()}.
     */
    public RequestCounter(long durationMs, boolean useHistogram) {
        this(durationMs, SystemTime.INSTANCE, useHistogram);
    }

    /**
     * For testing request expiration via an injected time provider.
     */
    RequestCounter(long durationMs, Time time) {
        this(durationMs, time, false);
    }

    /**
     * Full constructor; the first window starts at the time reported by
     * {@code time} during construction.
     */
    RequestCounter(long durationMs, Time time, boolean useHistogram) {
        this.time = time;
        this.durationMs = durationMs;
        this.useHistogram = useHistogram;
        this.values = new AtomicReference<Accumulator>(new Accumulator(time.getMilliseconds()));
        this.q95LatencyMs = 0;
        this.q99LatencyMs = 0;
        // NOTE(review): the arguments are presumably bucket count and bucket
        // width -- confirm against Histogram.
        this.histogram = useHistogram ? new Histogram(10000, 1) : null;
    }

    /** Returns the number of requests recorded in the current window. */
    public long getCount() {
        return getValidAccumulator().count;
    }

    /**
     * Returns the number of requests recorded since this counter was created;
     * unlike {@link #getCount()} this value is carried across window resets.
     */
    public long getTotalCount() {
        return getValidAccumulator().total;
    }

    /**
     * Returns requests per second over the elapsed part of the current
     * window, or 0 when no time has elapsed yet.
     */
    public float getThroughput() {
        Accumulator window = getValidAccumulator();
        return ratePerSecond(window, window.count);
    }

    /**
     * Returns bytes per second over the elapsed part of the current window,
     * or 0 when no time has elapsed yet.
     */
    public float getThroughputInBytes() {
        Accumulator window = getValidAccumulator();
        return ratePerSecond(window, window.totalBytes);
    }

    /** Divides {@code amount} by the seconds elapsed since the window started. */
    private float ratePerSecond(Accumulator window, long amount) {
        double elapsedSeconds = (time.getMilliseconds() - window.startTimeMS)
                                / (double) Time.MS_PER_SECOND;
        if(elapsedSeconds > 0) {
            return (float) (amount / elapsedSeconds);
        }
        return 0f;
    }

    /** Returns {@link #getThroughput()} formatted with two decimals. */
    public String getDisplayThroughput() {
        return String.format("%.2f", getThroughput());
    }

    /**
     * Returns the mean request time of the current window in milliseconds, or
     * 0 when the window is empty.
     */
    public double getAverageTimeInMs() {
        return getValidAccumulator().getAverageTimeNS() / Time.NS_PER_MS;
    }

    /** Returns {@link #getAverageTimeInMs()} formatted with four decimals. */
    public String getDisplayAverageTimeInMs() {
        return String.format("%.4f", getAverageTimeInMs());
    }

    /** Returns the configured window length in milliseconds. */
    public long getDuration() {
        return durationMs;
    }

    /**
     * Returns the largest request time seen in the current window, in whole
     * milliseconds (truncated).
     */
    public long getMaxLatencyInMs() {
        return getValidAccumulator().maxLatencyNS / Time.NS_PER_MS;
    }

    /**
     * Captures the 95th/99th percentile latencies from the histogram and then
     * clears it. Does nothing when histogram tracking is disabled.
     *
     * This is invoked only by the thread that wins the window roll-over in
     * {@link #getValidAccumulator()}, so the quantiles are snapshotted once
     * per expired window. Samples inserted by other threads between the
     * roll-over and the reset are attributed to the closing window.
     * NOTE(review): relies on Histogram being safe for concurrent
     * insert/getQuantile/reset -- confirm.
     */
    private void rollHistogram() {
        if(!useHistogram) {
            return;
        }

        // timing instrumentation (debug only)
        boolean debugEnabled = logger.isDebugEnabled();
        long startTimeNs = debugEnabled ? System.nanoTime() : 0;

        q95LatencyMs = histogram.getQuantile(0.95);
        q99LatencyMs = histogram.getQuantile(0.99);
        histogram.reset();

        if(debugEnabled) {
            logger.debug("Histogram (" + System.identityHashCode(histogram)
                         + ") : reset, Q95, & Q99 took " + (System.nanoTime() - startTimeNs)
                         + " ns.");
        }
    }

    /**
     * Returns the accumulator for the current window, first replacing it with
     * a fresh one (keeping only the lifetime total) when the window has
     * expired.
     */
    private Accumulator getValidAccumulator() {
        Accumulator current = values.get();
        long now = time.getMilliseconds();

        // still inside the window: nothing to do
        if(now - current.startTimeMS <= durationMs) {
            return current;
        }

        Accumulator replacement = current.newWithTotal(time.getMilliseconds());
        if(values.compareAndSet(current, replacement)) {
            // Only the winner of the roll-over touches the histogram, so it is
            // reset exactly once per expired window no matter whether a
            // reader or a writer noticed the expiry first.
            rollHistogram();
            return replacement;
        }

        // somebody else rolled the window over; use theirs
        return values.get();
    }

    /**
     * Records one operation for which only the elapsed time is tracked.
     * Equivalent to calling the detailed {@code addRequest} with zero empty
     * responses, zero bytes and a zero getAll count.
     *
     * @param timeNS time of operation, in nanoseconds
     */
    public void addRequest(long timeNS) {
        addRequest(timeNS, 0, 0, 0);
    }

    /**
     * Records one operation together with additional data about PUT, GET and
     * GET_ALL.
     *
     * The sample is added to the current, non-expired window. The update is a
     * compare-and-set on an immutable snapshot and is retried a bounded number
     * of times; if other threads keep winning, the accumulator update is
     * dropped and an info message is logged. When histogram tracking is
     * enabled the latency has already been inserted into the histogram by
     * then.
     *
     * @see #addRequest(long)
     *
     * @param timeNS time of operation, in nanoseconds
     * @param numEmptyResponses For GET and GET_ALL, how many keys had no
     *        values found
     * @param bytes Total number of bytes across all versions of values' bytes
     * @param getAllAggregatedCount Total number of keys returned for getAll
     *        calls
     */
    public void addRequest(long timeNS,
                           long numEmptyResponses,
                           long bytes,
                           long getAllAggregatedCount) {
        // timing instrumentation (trace only)
        boolean traceEnabled = logger.isTraceEnabled();
        long startTimeNs = traceEnabled ? System.nanoTime() : 0;

        if(useHistogram) {
            long timeMs = timeNS / Time.NS_PER_MS;
            histogram.insert(timeMs);
        }

        boolean recorded = false;
        for(int attempt = 0; attempt < MAX_UPDATE_ATTEMPTS && !recorded; attempt++) {
            // getValidAccumulator() also rolls the window (and histogram) over
            // if it has expired
            Accumulator oldv = getValidAccumulator();
            Accumulator newv = oldv.plus(timeNS, numEmptyResponses, bytes, getAllAggregatedCount);
            recorded = values.compareAndSet(oldv, newv);
        }

        if(!recorded) {
            logger.info("addRequest lost timing instrumentation data because three retries was insufficient to update the accumulator.");
        }

        // timing instrumentation (trace only)
        if(traceEnabled) {
            logger.trace("addRequest (histogram.insert and accumulator update) took "
                         + (System.nanoTime() - startTimeNs) + " ns.");
        }
    }

    /**
     * Return the number of requests that have returned no value for the
     * requested key. Tracked only for GET.
     */
    public long getNumEmptyResponses() {
        return getValidAccumulator().numEmptyResponses;
    }

    /**
     * Return the size of the largest response or request in bytes returned.
     * Tracked only for GET, GET_ALL and PUT.
     */
    public long getMaxSizeInBytes() {
        return getValidAccumulator().maxBytes;
    }

    /**
     * Return the average size of all the versioned values returned, or 0 when
     * the current window is empty. Tracked only for GET, GET_ALL and PUT.
     */
    public double getAverageSizeInBytes() {
        return getValidAccumulator().getAverageBytes();
    }

    /**
     * Return the aggregated number of keys returned across all getAll calls,
     * taking into account multiple values returned per call.
     */
    public long getGetAllAggregatedCount() {
        return getValidAccumulator().getAllAggregatedCount;
    }

    /**
     * Return the maximum number of keys returned across all getAll calls.
     */
    public long getGetAllMaxCount() {
        return getValidAccumulator().getAllMaxCount;
    }

    /**
     * Return the 95th percentile latency in milliseconds as captured at the
     * most recent window roll-over; 0 until the first roll-over or when
     * histogram tracking is disabled.
     */
    public long getQ95LatencyMs() {
        return q95LatencyMs;
    }

    /**
     * Return the 99th percentile latency in milliseconds as captured at the
     * most recent window roll-over; 0 until the first roll-over or when
     * histogram tracking is disabled.
     */
    public long getQ99LatencyMs() {
        return q99LatencyMs;
    }

    /**
     * Immutable statistics snapshot for one window. Static so that the
     * per-request instances do not each carry a reference to the enclosing
     * counter.
     */
    private static final class Accumulator {

        final long startTimeMS;
        final long count;
        final long totalTimeNS;
        final long total;
        // GET and GET_ALL: number of empty responses that have been returned
        final long numEmptyResponses;
        // GET_ALL: a single call to GET_ALL can return multiple k-v pairs.
        // Track total requested.
        final long getAllAggregatedCount;
        // GET_ALL: track max number of keys requested
        final long getAllMaxCount;
        final long maxLatencyNS;
        // Maximum single value
        final long maxBytes;
        // Sum of all the values
        final long totalBytes;

        /** Creates an empty accumulator whose window starts at {@code startTimeMS}. */
        Accumulator(long startTimeMS) {
            this(startTimeMS, 0, 0, 0, 0, 0, 0, 0, 0, 0);
        }

        Accumulator(long startTimeMS,
                    long count,
                    long totalTimeNS,
                    long total,
                    long numEmptyResponses,
                    long maxLatencyNS,
                    long totalBytes,
                    long maxBytes,
                    long getAllAggregatedCount,
                    long getAllMaxCount) {
            this.startTimeMS = startTimeMS;
            this.count = count;
            this.totalTimeNS = totalTimeNS;
            this.total = total;
            this.numEmptyResponses = numEmptyResponses;
            this.maxLatencyNS = maxLatencyNS;
            this.totalBytes = totalBytes;
            this.maxBytes = maxBytes;
            this.getAllAggregatedCount = getAllAggregatedCount;
            this.getAllMaxCount = getAllMaxCount;
        }

        /**
         * Starts a new window: every per-window statistic is zeroed and only
         * the lifetime {@code total} is carried over.
         *
         * @param newStartTimeMS start time of the new window
         * @return the accumulator for the new window
         */
        Accumulator newWithTotal(long newStartTimeMS) {
            return new Accumulator(newStartTimeMS, 0, 0, total, 0, 0, 0, 0, 0, 0);
        }

        /**
         * Returns a copy of this accumulator with one more request folded in;
         * the window start time is unchanged.
         */
        Accumulator plus(long timeNS,
                         long emptyResponses,
                         long bytes,
                         long getAllCount) {
            return new Accumulator(startTimeMS,
                                   count + 1,
                                   totalTimeNS + timeNS,
                                   total + 1,
                                   numEmptyResponses + emptyResponses,
                                   Math.max(timeNS, maxLatencyNS),
                                   totalBytes + bytes,
                                   Math.max(maxBytes, bytes),
                                   getAllAggregatedCount + getAllCount,
                                   Math.max(getAllCount, getAllMaxCount));
        }

        /** Mean request time in nanoseconds, or 0 when no request was recorded. */
        double getAverageTimeNS() {
            // divide in double: float arithmetic cannot represent nanosecond
            // totals above 2^24 exactly
            return count > 0 ? (double) totalTimeNS / count : 0d;
        }

        /** Mean bytes per request, or 0 when no request was recorded. */
        double getAverageBytes() {
            return count > 0 ? (double) totalBytes / count : 0d;
        }
    }
}