-
Notifications
You must be signed in to change notification settings - Fork 2.9k
/
quantile.go
120 lines (101 loc) · 2.5 KB
/
quantile.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package quantile
import (
"strings"
"sync"
"time"
"github.com/bmizerany/perks/quantile"
"github.com/nsqio/nsq/internal/stringy"
)
// Result is a snapshot of a Quantile: a sample count plus one entry per
// configured percentile. It is serialised to JSON using the tags below.
type Result struct {
	// Count is the sample count reported by the stream the snapshot was built from.
	Count int `json:"count"`
	// Percentiles holds one map per percentile. Quantile.Result fills each map
	// with the keys "quantile" (the requested percentile) and "value" (the
	// stream's answer for it); String reads the "value" key.
	Percentiles []map[string]float64 `json:"percentiles"`
}
// String formats the "value" entry of every percentile with
// stringy.NanoSecondToHuman and joins the results with ", ".
// A Result without percentiles yields the empty string.
func (r *Result) String() string {
	parts := make([]string, len(r.Percentiles))
	for i := range r.Percentiles {
		parts[i] = stringy.NanoSecondToHuman(r.Percentiles[i]["value"])
	}
	return strings.Join(parts, ", ")
}
// Quantile keeps percentile estimates over a moving time window by
// alternating between two quantile streams. The embedded mutex is taken by
// Insert, QueryHandler and Merge while they touch the unexported fields.
type Quantile struct {
	sync.Mutex
	// streams are the two underlying streams; QueryHandler merges the samples of both.
	streams [2]quantile.Stream
	// currentIndex is the index into streams selected by the last rotation (0 after New).
	currentIndex uint8
	// lastMoveWindow is set to time.Now() by New and advanced by MoveWindowTime
	// on every rotation; IsDataStale compares against it.
	lastMoveWindow time.Time
	// currentStream points at streams[currentIndex]; Insert writes to it.
	currentStream *quantile.Stream
	// Percentiles are the target quantiles given to New and reported by Result.
	Percentiles []float64
	// MoveWindowTime is the interval between rotations; New sets it to half of
	// the requested window time.
	MoveWindowTime time.Duration
}
// New returns a Quantile that targets the given Percentiles.
//
// Both internal streams are created with quantile.NewTargeted(Percentiles...),
// the first stream is made current, the rotation clock starts at time.Now(),
// and the rotation interval (MoveWindowTime) is set to WindowTime / 2.
// The Percentiles slice is stored as given, not copied.
func New(WindowTime time.Duration, Percentiles []float64) *Quantile {
	q := &Quantile{
		lastMoveWindow: time.Now(),
		MoveWindowTime: WindowTime / 2,
		Percentiles:    Percentiles,
	}
	for i := range q.streams {
		q.streams[i] = *quantile.NewTargeted(Percentiles...)
	}
	// currentIndex is left at its zero value, so this selects streams[0].
	q.currentStream = &q.streams[q.currentIndex]
	return q
}
// Result returns a snapshot of the current estimates.
//
// A nil receiver yields an empty Result. Otherwise the stream returned by
// QueryHandler is asked for its count and then queried once per configured
// percentile, in order; each answer is stored as
// {"quantile": p, "value": answer}. Percentiles is always non-nil for a
// non-nil receiver, even when no percentiles are configured.
func (q *Quantile) Result() *Result {
	if q == nil {
		return &Result{}
	}

	stream := q.QueryHandler()
	res := &Result{Count: stream.Count()}
	res.Percentiles = make([]map[string]float64, 0, len(q.Percentiles))
	for _, p := range q.Percentiles {
		res.Percentiles = append(res.Percentiles, map[string]float64{
			"quantile": p,
			"value":    stream.Query(p),
		})
	}
	return res
}
// Insert records one latency sample.
//
// msgStartTime is subtracted from time.Now().UnixNano(), so it is expected to
// be a Unix timestamp in nanoseconds; the difference is inserted into the
// current stream. Before inserting, the window is rotated for as long as
// IsDataStale reports the data as stale.
//
// The mutex is released with defer so that a panic in the rotation or in the
// underlying stream cannot leave the Quantile locked forever.
func (q *Quantile) Insert(msgStartTime int64) {
	q.Lock()
	defer q.Unlock()

	now := time.Now()
	for q.IsDataStale(now) {
		q.moveWindow()
	}
	q.currentStream.Insert(float64(now.UnixNano() - msgStartTime))
}
// QueryHandler returns a new stream, targeted at q.Percentiles, that contains
// the samples of both internal streams. The window is first rotated for as
// long as IsDataStale reports the data as stale, so stale samples are dropped
// before merging.
//
// The returned stream is independent of q and may be queried without holding
// q's lock. The mutex is released with defer so a panic while rotating or
// merging cannot leave the Quantile locked.
func (q *Quantile) QueryHandler() *quantile.Stream {
	q.Lock()
	defer q.Unlock()

	now := time.Now()
	for q.IsDataStale(now) {
		q.moveWindow()
	}

	merged := quantile.NewTargeted(q.Percentiles...)
	for i := range q.streams {
		merged.Merge(q.streams[i].Samples())
	}
	return merged
}
// IsDataStale reports whether now lies strictly after the end of the current
// rotation interval, i.e. after lastMoveWindow + MoveWindowTime.
//
// A non-positive MoveWindowTime is never considered stale. Insert and
// QueryHandler call moveWindow in a loop until this method returns false, and
// moveWindow advances lastMoveWindow by MoveWindowTime; with a step of zero or
// less that loop would never terminate while holding the lock.
func (q *Quantile) IsDataStale(now time.Time) bool {
	if q.MoveWindowTime <= 0 {
		return false
	}
	return now.After(q.lastMoveWindow.Add(q.MoveWindowTime))
}
// Merge folds the samples of them into q.
//
// The current stream of them is merged into the current stream of q, and the
// other stream of them into the other stream of q. q then adopts the later of
// the two lastMoveWindow timestamps. them is only read.
//
// Merge is a no-op when q or them is nil, or when them is q itself: the mutex
// is not reentrant, so locking the same Quantile twice would deadlock.
//
// NOTE: the locks are taken receiver first, then argument. Concurrent calls
// of a.Merge(b) and b.Merge(a) can therefore deadlock; callers must not merge
// in both directions at the same time.
func (q *Quantile) Merge(them *Quantile) {
	if q == nil || them == nil || q == them {
		return
	}

	q.Lock()
	defer q.Unlock()
	them.Lock()
	defer them.Unlock()

	ours, theirs := q.currentIndex, them.currentIndex
	q.streams[ours].Merge(them.streams[theirs].Samples())
	// Flip the low bit to address the non-current stream on each side.
	q.streams[ours^1].Merge(them.streams[theirs^1].Samples())

	if q.lastMoveWindow.Before(them.lastMoveWindow) {
		q.lastMoveWindow = them.lastMoveWindow
	}
}
// moveWindow rotates to the other stream: it flips currentIndex between 0 and
// 1, points currentStream at that stream, advances lastMoveWindow by one
// MoveWindowTime, and finally resets the newly current stream so it starts
// empty. It does no locking itself; the callers in this file hold the mutex.
func (q *Quantile) moveWindow() {
	next := q.currentIndex ^ 1
	active := &q.streams[next]

	q.currentIndex, q.currentStream = next, active
	q.lastMoveWindow = q.lastMoveWindow.Add(q.MoveWindowTime)
	active.Reset()
}