This repository has been archived by the owner on Dec 8, 2022. It is now read-only.
/
time_window_sample.go
207 lines (163 loc) · 4.64 KB
/
time_window_sample.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
package metrics
import (
"sync"
"time"
)
type timedValue struct {
timestamp int64
value int64
}
type TimeWindowSample struct {
startWindowSize int
maxWindowSize int
timeWindow time.Duration
scaleFactor float64
mutex sync.Mutex // guards everything below
count int64 // Total number of updates seen by the sample
dropped int64 // Total number of dropped points that are in the time window
values []timedValue
numValues int // number of active values being tracked by this sample
latest int
earliest int
}
func NewTimeWindowSample(startWindowSize int, maxWindowSize int, timeWindow time.Duration) Sample {
return &TimeWindowSample{
startWindowSize: startWindowSize,
maxWindowSize: maxWindowSize,
timeWindow: timeWindow,
scaleFactor: 1.5,
values: make([]timedValue, startWindowSize),
}
}
func (sample *TimeWindowSample) resize(size int) {
if size < sample.numValues {
panic("size must be greater than sample numValues")
}
previous := sample.values
sample.values = make([]timedValue, size)
copy(sample.values, previous)
sample.earliest = 0
sample.latest = int(sample.numValues - 1)
}
func (sample *TimeWindowSample) drop(forced int, nowNano int64) {
numValues := sample.numValues
permitAfterNano := nowNano - sample.timeWindow.Nanoseconds()
advance := func() {
forced--
sample.numValues--
if sample.values[sample.earliest].timestamp > permitAfterNano {
sample.dropped++
}
sample.earliest = (sample.earliest + 1) % len(sample.values)
}
for i := 0; i < numValues; i++ {
if forced > 0 || sample.values[sample.earliest].timestamp <= permitAfterNano {
advance()
} else {
break
}
}
}
func (sample *TimeWindowSample) add(value int64, nowNano int64) {
if sample.numValues >= len(sample.values) {
panic("cannot add to sample that is already full")
}
if sample.numValues == 0 {
sample.earliest, sample.latest = 0, 0
} else {
sample.latest = (sample.latest + 1) % len(sample.values)
}
sample.numValues++
sample.values[sample.latest].value = value
sample.values[sample.latest].timestamp = nowNano
}
func (sample *TimeWindowSample) Update(value int64) {
sample.mutex.Lock()
defer sample.mutex.Unlock()
sample.count++
now := time.Now().UnixNano()
// scale up if needed
if sample.numValues == len(sample.values) {
if sample.numValues < sample.maxWindowSize {
newSize := int(float64(len(sample.values)) * sample.scaleFactor)
if newSize > sample.maxWindowSize {
newSize = sample.maxWindowSize
}
sample.resize(newSize)
} else {
sample.drop(1, now)
}
}
sample.add(value, now)
// scale down if needed
if int(float64(3*sample.numValues)*sample.scaleFactor) < len(sample.values) {
newSize := int(float64(len(sample.values)) / sample.scaleFactor)
if newSize < sample.startWindowSize {
newSize = sample.startWindowSize
}
sample.resize(newSize)
}
}
func (sample *TimeWindowSample) Clear() {
sample.mutex.Lock()
defer sample.mutex.Unlock()
sample.numValues = 0
sample.count = 0
}
func (sample *TimeWindowSample) Count() int64 {
sample.mutex.Lock()
defer sample.mutex.Unlock()
return sample.count
}
func (sample *TimeWindowSample) Dropped() int64 {
sample.mutex.Lock()
defer sample.mutex.Unlock()
return sample.dropped
}
func (sample *TimeWindowSample) Size() int {
sample.mutex.Lock()
defer sample.mutex.Unlock()
return sample.numValues
}
func (sample *TimeWindowSample) Values() []int64 {
sample.mutex.Lock()
defer sample.mutex.Unlock()
sample.drop(0, time.Now().UnixNano())
values := make([]int64, sample.numValues)
for i, idx := 0, sample.earliest; i < sample.numValues; i, idx = i+1, (idx+1)%len(sample.values) {
values[i] = sample.values[idx].value
}
return values
}
func (sample *TimeWindowSample) Max() int64 {
return SampleMax(sample.Values())
}
func (sample *TimeWindowSample) Mean() float64 {
return SampleMean(sample.Values())
}
func (sample *TimeWindowSample) Min() int64 {
return SampleMin(sample.Values())
}
func (sample *TimeWindowSample) Percentile(percentile float64) float64 {
return SamplePercentile(sample.Values(), percentile)
}
func (sample *TimeWindowSample) Percentiles(percentiles []float64) []float64 {
return SamplePercentiles(sample.Values(), percentiles)
}
func (sample *TimeWindowSample) Snapshot() Sample {
values := sample.Values()
return &SampleSnapshot{
count: sample.count,
dropped: sample.dropped,
values: values,
}
}
func (sample *TimeWindowSample) StdDev() float64 {
return SampleStdDev(sample.Values())
}
func (sample *TimeWindowSample) Sum() int64 {
return SampleSum(sample.Values())
}
func (sample *TimeWindowSample) Variance() float64 {
return SampleVariance(sample.Values())
}