-
Notifications
You must be signed in to change notification settings - Fork 1.1k
/
bucketing.go
359 lines (325 loc) · 12.9 KB
/
bucketing.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
/*
Copyright 2019 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package aggregation
import (
"math"
"sync"
"time"
"github.com/davecgh/go-spew/spew"
)
type (
	// TimedFloat64Buckets keeps buckets that have been collected at a certain time.
	TimedFloat64Buckets struct {
		// bucketsMutex guards all fields below; readers take RLock, writers Lock.
		bucketsMutex sync.RWMutex
		// buckets is a ring buffer indexed by timeToIndex() % len(buckets).
		// Each element represents a certain granularity of time, and the total
		// represented duration adds up to a window length of time.
		buckets []float64
		// firstWrite holds the time when the first write has been made.
		// This time is reset to `now` when the very first write happens,
		// or when a first write happens after `window` time of inactivity.
		// The difference between `now` and `firstWrite` is used to compute
		// the number of eligible buckets for computation of average values.
		firstWrite time.Time
		// lastWrite stores the time when the last write was made.
		// This is used to detect when we have gaps in the data (i.e. more than a
		// granularity has expired since the last write) so that we can zero those
		// entries in the buckets array. It is also used when calculating the
		// WindowAverage to know how much of the buckets array represents valid data.
		lastWrite time.Time
		// granularity is the duration represented by each bucket in the buckets ring buffer.
		granularity time.Duration
		// window is the total time represented by the buckets ring buffer.
		window time.Duration
		// windowTotal is the running sum of all buckets within the window. This
		// total includes invalid buckets, e.g. buckets written to before
		// firstWrite or after lastWrite are included in this total.
		windowTotal float64
	}
	// WeightedFloat64Buckets is the implementation of buckets, that
	// uses weighted average algorithm.
	WeightedFloat64Buckets struct {
		*TimedFloat64Buckets
		// smoothingCoeff contains the speed with which the importance
		// of items in the past decays. The larger the faster weights decay.
		// It is autocomputed from window size and weightPrecision constant
		// and is bounded by minExponent below.
		smoothingCoeff float64
	}
)
// String implements the fmt.Stringer interface by dumping the contents
// of the underlying ring buffer.
func (t *TimedFloat64Buckets) String() string {
	dump := spew.Sdump(t.buckets)
	return dump
}
// computeSmoothingCoeff derives the exponential-decay multiplier for a
// window of nb buckets, using the weightPrecision and minExponent
// constants defined in this file.
func computeSmoothingCoeff(nb float64) float64 {
	// Given the number of buckets, infer the desired multiplier m so
	// that the weights of all buckets sum to at least weightPrecision,
	// i.e. 1-(1-m)^nb >= weightPrecision.
	inferred := 1 - math.Pow(1-weightPrecision, 1/nb)
	// Never drop below minExponent: with extremely large windows the
	// inferred multiplier would otherwise make this essentially a
	// simple average.
	if inferred < minExponent {
		return minExponent
	}
	return inferred
}
// NewTimedFloat64Buckets generates a new TimedFloat64Buckets covering
// `window` time, sliced into buckets of `granularity` each.
func NewTimedFloat64Buckets(window, granularity time.Duration) *TimedFloat64Buckets {
	// The bucket count is window/granularity, rounded up.
	// e.g. 60s / 2s = 30.
	numBuckets := int(math.Ceil(float64(window) / float64(granularity)))
	return &TimedFloat64Buckets{
		buckets:     make([]float64, numBuckets),
		granularity: granularity,
		window:      window,
	}
}
// NewWeightedFloat64Buckets generates a new WeightedFloat64Buckets with the
// given window and granularity, deriving the smoothing coefficient from the
// resulting bucket count.
func NewWeightedFloat64Buckets(window, granularity time.Duration) *WeightedFloat64Buckets {
	// The bucket count is window/granularity, rounded up.
	// e.g. 60s / 2s = 30.
	numBuckets := math.Ceil(float64(window) / float64(granularity))
	return &WeightedFloat64Buckets{
		TimedFloat64Buckets: NewTimedFloat64Buckets(window, granularity),
		smoothingCoeff:      computeSmoothingCoeff(numBuckets),
	}
}
// IsEmpty returns true if no data has been recorded for the `window` period
// preceding `now`.
func (t *TimedFloat64Buckets) IsEmpty(now time.Time) bool {
	truncated := now.Truncate(t.granularity)
	t.bucketsMutex.RLock()
	defer t.bucketsMutex.RUnlock()
	return t.isEmptyLocked(truncated)
}
// isEmptyLocked reports whether the last write happened more than a full
// window before `now`. It expects `now` to be truncated to granularity and
// at least a read lock to be held.
func (t *TimedFloat64Buckets) isEmptyLocked(now time.Time) bool {
	sinceLastWrite := now.Sub(t.lastWrite)
	return sinceLastWrite > t.window
}
// roundToNDigits truncates f to n digits after the decimal point.
// NOTE: despite the name, this floors rather than rounding to nearest,
// e.g. roundToNDigits(2, 1.239) == 1.23.
func roundToNDigits(n int, f float64) float64 {
	scale := math.Pow10(n)
	return math.Floor(f*scale) / scale
}
const (
	// minExponent is the minimal decay multiplier for the weighted average
	// computations; see computeSmoothingCoeff.
	minExponent = 0.2
	// weightPrecision is the minimum sum of bucket weights that a window
	// must account for when deriving the smoothing coefficient.
	weightPrecision = 0.9999
	// precision is the number of decimal digits kept by roundToNDigits
	// when computing window averages.
	precision = 6
)
// WindowAverage returns the exponential weighted average. This means
// that more recent items have much greater impact on the average than
// the older ones.
// TODO(vagababov): optimize for O(1) computation, if possible.
// E.g. with data [10, 10, 5, 5] (newest last), then
// the simple `WindowAverage` would return (10+10+5+5)/4 = 7.5
// This with exponent of 0.6 would return 5*0.6+5*0.6*0.4+10*0.6*0.4^2+10*0.6*0.4^3 = 5.544
// If we reverse the data to [5, 5, 10, 10] the simple average would remain the same,
// but this one would change to 9.072.
func (t *WeightedFloat64Buckets) WindowAverage(now time.Time) float64 {
	now = now.Truncate(t.granularity)
	t.bucketsMutex.RLock()
	defer t.bucketsMutex.RUnlock()
	if t.isEmptyLocked(now) {
		return 0
	}
	totalB := len(t.buckets)
	numB := len(t.buckets)
	multiplier := t.smoothingCoeff
	// If there were no writes between lastWrite and now, those trailing
	// buckets hold zeroes. We know we have _some_ data because
	// isEmptyLocked returned false, so rather than iterating over the
	// zero buckets, jump the weight directly to the newest real data.
	if now.After(t.lastWrite) {
		numZ := now.Sub(t.lastWrite) / t.granularity
		// Skip to this multiplier directly: m*(1-m)^nz.
		// (The code computes Pow(1-m, numZ); the weight of the bucket at
		// lastWrite, numZ steps behind `now`, is m*(1-m)^numZ.)
		multiplier = multiplier * math.Pow(1-t.smoothingCoeff, float64(numZ))
		// Reduce effective number of buckets.
		numB -= int(numZ)
	}
	// Walk backwards in time from the newest written bucket, decaying the
	// weight by (1-m) per step.
	startIdx := t.timeToIndex(t.lastWrite) + totalB // To ensure always positive % operation.
	ret := 0.
	for i := 0; i < numB; i++ {
		effectiveIdx := (startIdx - i) % totalB
		v := t.buckets[effectiveIdx] * multiplier
		ret += v
		multiplier *= (1 - t.smoothingCoeff)
		// TODO(vagababov): bail out if sm > weightPrecision?
	}
	return ret
}
// WindowAverage returns the average bucket value over the window.
//
// If the first write was less than the window length ago, an average is
// returned over the partial window. For example, if firstWrite was 6 seconds
// ago, the average will be over these 6 seconds worth of buckets, even if the
// window is 60s. If a window passes with no data being received, the first
// write time is reset so this behaviour takes effect again.
//
// Similarly, if we have not received recent data, the average is based on a
// partial window. For example, if the window is 60 seconds but we last
// received data 10 seconds ago, the window average will be the average over
// the first 50 seconds.
//
// In other cases, for example if there are gaps in the data shorter than the
// window length, the missing data is assumed to be 0 and the average is over
// the whole window length inclusive of the missing data.
func (t *TimedFloat64Buckets) WindowAverage(now time.Time) float64 {
	now = now.Truncate(t.granularity)
	t.bucketsMutex.RLock()
	defer t.bucketsMutex.RUnlock()
	switch d := now.Sub(t.lastWrite); {
	case d <= 0:
		// lastWrite is equal to or after `now`: the running windowTotal is
		// current, so just divide it by the number of valid buckets.
		numB := math.Min(
			float64(t.lastWrite.Sub(t.firstWrite)/t.granularity)+1, // +1 since the times are inclusive.
			float64(len(t.buckets)))
		return roundToNDigits(precision, t.windowTotal/numB)
	case d < t.window:
		// We haven't received metrics for some time (but less than a full
		// window): subtract the now-expired buckets from the running total
		// and divide by the remaining number of valid buckets.
		stIdx := t.timeToIndex(t.lastWrite)
		eIdx := t.timeToIndex(now)
		ret := t.windowTotal
		for i := stIdx + 1; i <= eIdx; i++ {
			ret -= t.buckets[i%len(t.buckets)]
		}
		numB := math.Min(
			float64(t.lastWrite.Sub(t.firstWrite)/t.granularity)+1, // +1 since the times are inclusive.
			float64(len(t.buckets)-(eIdx-stIdx)))
		return roundToNDigits(precision, ret/numB)
	default: // Nothing written for more than a window's time, so just 0.
		return 0.
	}
}
// timeToIndex converts a time to a monotonically increasing bucket number;
// callers take it modulo len(buckets) to find the slot in the ring buffer.
// bucketMutex needs to be held.
func (t *TimedFloat64Buckets) timeToIndex(tm time.Time) int {
	// Dividing Unix seconds by the bucket width in seconds compresses the
	// mapping so the index advances by one per granularity step.
	// (I don't think this will still run in 2038 :-))
	epochSeconds := int(tm.Unix())
	bucketSeconds := int(t.granularity.Seconds())
	return epochSeconds / bucketSeconds
}
// Record adds a value with an associated time to the correct bucket.
// If this record would introduce a gap in the data, any intervening times
// between the last write and this one will be recorded as zero. If an entire
// window length has expired without data, the firstWrite time is reset,
// meaning the WindowAverage will be of a partial window until enough data is
// received to fill it again.
func (t *TimedFloat64Buckets) Record(now time.Time, value float64) {
	bucketTime := now.Truncate(t.granularity)
	t.bucketsMutex.Lock()
	defer t.bucketsMutex.Unlock()
	// Note: indexing on `now` is equivalent to indexing on `bucketTime`,
	// since timeToIndex divides by the granularity anyway.
	writeIdx := t.timeToIndex(now)
	if t.lastWrite != bucketTime {
		if bucketTime.Add(t.window).After(t.lastWrite) {
			// If it is the first write, or it happened before the earliest
			// write we have on record, update firstWrite.
			if t.firstWrite.IsZero() || t.firstWrite.After(bucketTime) {
				t.firstWrite = bucketTime
			}
			if bucketTime.After(t.lastWrite) {
				if bucketTime.Sub(t.lastWrite) >= t.window {
					// This means we had no writes for the duration of `window`. So reset the firstWrite time.
					t.firstWrite = bucketTime
					// Reset all the buckets.
					for i := range t.buckets {
						t.buckets[i] = 0
					}
					t.windowTotal = 0
				} else {
					// In theory we might lose buckets between stats gathering.
					// Thus we need to clean not only the current index, but also
					// all the ones from the last write. This is slower than the loop above
					// due to possible wrap-around, so they are not merged together.
					for i := t.timeToIndex(t.lastWrite) + 1; i <= writeIdx; i++ {
						idx := i % len(t.buckets)
						t.windowTotal -= t.buckets[idx]
						t.buckets[idx] = 0
					}
				}
				// Update the last write time.
				t.lastWrite = bucketTime
			}
			// The else case is t.lastWrite - t.window < bucketTime < t.lastWrite:
			// a late write into a still-valid bucket, so we can simply add
			// the value to that bucket below.
		} else {
			// Ignore this value because it happened more than a window size ago.
			return
		}
	}
	t.buckets[writeIdx%len(t.buckets)] += value
	t.windowTotal += value
}
// min returns the smaller of two ints.
func min(a, b int) int {
	if b < a {
		return b
	}
	return a
}
// ResizeWindow implements window resizing for the weighted averaging buckets
// object: it resizes the embedded buckets and recomputes the smoothing
// coefficient for the new bucket count.
func (t *WeightedFloat64Buckets) ResizeWindow(w time.Duration) {
	t.TimedFloat64Buckets.ResizeWindow(w)
	numBuckets := math.Ceil(float64(w) / float64(t.granularity))
	t.smoothingCoeff = computeSmoothingCoeff(numBuckets)
}
// ResizeWindow resizes the window. This is an O(N) operation,
// and is not supposed to be executed very often.
func (t *TimedFloat64Buckets) ResizeWindow(w time.Duration) {
	// Same window size, bail out.
	sameWindow := func() bool {
		t.bucketsMutex.RLock()
		defer t.bucketsMutex.RUnlock()
		return w == t.window
	}()
	if sameWindow {
		return
	}
	numBuckets := int(math.Ceil(float64(w) / float64(t.granularity)))
	newBuckets := make([]float64, numBuckets)
	newTotal := 0.
	// We need the write lock here,
	// so that we can copy the existing buckets into the new array.
	t.bucketsMutex.Lock()
	defer t.bucketsMutex.Unlock()
	// If we had written any data within `window` time, then exercise the O(N)
	// copy algorithm. Otherwise, just assign zeroes.
	if time.Now().Truncate(t.granularity).Sub(t.lastWrite) <= t.window {
		// Copy backwards from the last written bucket. If the window is
		// shrinking, then we need to copy only `newBuckets` buckets.
		oldNumBuckets := len(t.buckets)
		tIdx := t.timeToIndex(t.lastWrite)
		for i := 0; i < min(numBuckets, oldNumBuckets); i++ {
			oi := tIdx % oldNumBuckets
			ni := tIdx % numBuckets
			newBuckets[ni] = t.buckets[oi]
			// In case we're shrinking, make sure the total
			// window sum will match. This is no-op in case if
			// window is getting bigger.
			newTotal += t.buckets[oi]
			tIdx--
		}
		// Reset firstWrite to the earliest time whose bucket could have
		// survived the copy above, i.e. oldNumBuckets-1 granularity steps
		// before lastWrite. NOTE(review): the original comment trailed off
		// here ("...if it is"); presumably this should only move firstWrite
		// backwards when it was later than this bound — confirm intent.
		t.firstWrite = t.lastWrite.Add(-time.Duration(oldNumBuckets-1) * t.granularity)
	} else {
		// No valid data so far, so reset to initial value.
		t.firstWrite = time.Time{}
	}
	t.window = w
	t.buckets = newBuckets
	t.windowTotal = newTotal
}