/
common_aggregative_flow.go
165 lines (138 loc) · 4.7 KB
/
common_aggregative_flow.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
package metrics
import (
"math"
)
const (
// iterationsRequiredPerSecond is the divisor applied to the event count
// when guessPercentileValue computes the "inertness" of a percentile value.
//
// If this value is too large, too many events per second are required
// to calculate percentiles correctly (Per50, Per99, etc.).
// If this value is too small, the percentiles are calculated inaccurately.
iterationsRequiredPerSecond = 20
)
// commonAggregativeFlow embeds commonAggregative and provides the "Flow"
// implementation of aggregative statistics (see NewAggregativeStatistics).
type commonAggregativeFlow struct {
commonAggregative
}
// init initializes the embedded commonAggregative, passing the registry,
// parent metric, key and tags through unchanged.
func (m *commonAggregativeFlow) init(r *Registry, parent Metric, key string, tags AnyTags) {
m.commonAggregative.init(r, parent, key, tags)
}
// NewAggregativeStatistics creates a "Flow" implementation of AggregativeStatistics
// (see "Flow" in README.md).
//
// The new statistics track the default percentiles of the metric's registry.
func (m *commonAggregativeFlow) NewAggregativeStatistics() AggregativeStatistics {
	flow := newAggregativeStatisticsFlow()
	flow.percentiles = m.registry.defaultPercentiles
	return flow
}
// guessPercentileValue is a so-so correct way of correcting the percentile value for big amount of events
// (> iterationsRequiredPerSecond)
//
// See "Flow" in README.md
func guessPercentileValue(curValue, newValue float64, count uint64, perc float64) float64 {
// The more events we received the more precise value we should get,
// so will should change the value slower if the count is higher
inertness := float64(count) / iterationsRequiredPerSecond
// See "How the calculation of percentile values works" in README.md
requireGreater := float64(randIntn(math.MaxUint32))/float64(math.MaxUint32) > perc
if newValue > curValue {
if requireGreater {
return curValue
}
} else {
if !requireGreater {
return curValue
}
}
if requireGreater {
inertness *= perc
} else {
inertness *= 1 - perc
}
return (curValue*inertness + newValue) / (inertness + 1)
}
// aggregativeStatisticsFlow is the state of the "Flow" percentile calculation.
type aggregativeStatisticsFlow struct {
// tickID counts the considered values: it is incremented by considerValue
// and summed up by MergeStatistics.
tickID uint64
// locker is taken by Set and GetDefaultPercentiles while they access the values.
locker Spinlock
// percentiles is the list of tracked percentiles; percentiles[i] corresponds
// to percentileValues[i].
percentiles []float64
// percentileValues holds the current value of each tracked percentile.
percentileValues [maxPercentileValues]float64
}
// GetPercentile looks up the current value of the given percentile
// (see https://en.wikipedia.org/wiki/Percentile).
//
// The returned pointer refers to the value stored inside the statistics
// (it is not a copy).
//
// It returns nil if the receiver is nil or if the percentile is not one of
// the percentiles tracked by these statistics.
func (s *aggregativeStatisticsFlow) GetPercentile(percentile float64) *float64 {
	if s == nil {
		return nil
	}

	for i := 0; i < len(s.percentiles); i++ {
		if s.percentiles[i] != percentile {
			continue
		}
		return &s.percentileValues[i]
	}

	return nil
}
// GetPercentiles looks up the values of several percentiles at once.
//
// The result has the same length and order as the input slice. An element of
// the result is nil if the corresponding percentile is not available
// (see GetPercentile). For an empty input the result is nil.
//
// There's no performance profit to prefer either of GetPercentile/GetPercentiles
// for any case (because it's a "Flow" method of percentile calculation), so just
// use what is more convenient.
func (s *aggregativeStatisticsFlow) GetPercentiles(percentiles []float64) []*float64 {
	if len(percentiles) == 0 {
		return nil
	}

	result := make([]*float64, len(percentiles))
	for i := range percentiles {
		result[i] = s.GetPercentile(percentiles[i])
	}
	return result
}
// considerValue accounts one more observed value.
//
// The very first value initializes every tracked percentile; each following
// value adjusts every percentile through guessPercentileValue.
//
// Note! considerValue should be called only for locked items
func (s *aggregativeStatisticsFlow) considerValue(v float64) {
	s.tickID++
	isFirst := s.tickID == 1

	for i, perc := range s.percentiles {
		if isFirst {
			s.percentileValues[i] = v
			continue
		}
		s.percentileValues[i] = guessPercentileValue(s.percentileValues[i], v, s.tickID, perc)
	}
}
// GetDefaultPercentiles returns the tracked percentiles together with their
// current values.
//
// The first result is the internal slice of percentiles (not a copy), the
// second one is a copy of the values made while holding the lock; both are
// ordered the same way.
func (s *aggregativeStatisticsFlow) GetDefaultPercentiles() ([]float64, []float64) {
	s.locker.Lock()
	defer s.locker.Unlock()

	keys := s.percentiles
	values := make([]float64, len(keys))
	copy(values, s.percentileValues[:])

	return keys, values
}
// ConsiderValue is an analog of Prometheus' observe (see "Aggregative metrics" in README.md).
//
// It passes the value to considerValue without taking s.locker: the locking
// calls below are commented out.
//
// NOTE(review): considerValue is documented as callable only for locked
// items -- presumably the callers provide the locking; confirm before
// re-enabling or removing the commented-out calls.
func (s *aggregativeStatisticsFlow) ConsiderValue(v float64) {
//s.locker.Lock()
s.considerValue(v)
//s.locker.Unlock()
}
// Set overwrites the value of every tracked percentile with the passed value,
// so all percentile values become equal to it.
//
// The assignment is done while holding the lock. The events counter (tickID)
// is left untouched.
func (s *aggregativeStatisticsFlow) Set(value float64) {
	s.locker.Lock()
	for i, n := 0, len(s.percentiles); i < n; i++ {
		s.percentileValues[i] = value
	}
	s.locker.Unlock()
}
// MergeStatistics adds statistics of the argument to the own one.
//
// Every percentile value becomes the average of the own value and the value
// of the argument, weighted by the amounts of events (tickID) of each side.
// After that the events counter of the argument is added to the own one.
//
// Nothing is done if the argument is nil (either a nil interface or a nil
// *aggregativeStatisticsFlow wrapped into the interface) or if both sides
// have no events. The argument is expected to be an *aggregativeStatisticsFlow;
// any other implementation causes a panic on the type assertion.
//
// See "Attention!" of "How the calculation of percentile values works" in README.md.
func (s *aggregativeStatisticsFlow) MergeStatistics(oldSI AggregativeStatistics) {
	if oldSI == nil {
		return
	}
	oldS := oldSI.(*aggregativeStatisticsFlow)
	if oldS == nil {
		// An interface holding a typed nil pointer is not equal to nil,
		// so it passes the check above; there is nothing to merge from it.
		return
	}

	total := s.tickID + oldS.tickID
	if total == 0 {
		// No events on both sides: avoid division by zero.
		return
	}

	// The weights do not change within the loop, so convert them once.
	ownWeight := float64(s.tickID)
	oldWeight := float64(oldS.tickID)
	totalWeight := float64(total)
	for idx := range s.percentiles {
		s.percentileValues[idx] = (s.percentileValues[idx]*ownWeight + oldS.percentileValues[idx]*oldWeight) / totalWeight
	}
	s.tickID = total
}