-
Notifications
You must be signed in to change notification settings - Fork 0
/
metric.go
117 lines (91 loc) · 2.41 KB
/
metric.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package main
import (
"sort"
"sync"
"time"
)
// SlidingWindowSize is the length of the sliding window in seconds. It is
// subtracted from the current Unix timestamp to decide which buckets are
// still inside the window.
const SlidingWindowSize = 60
// WindowMetric implements a sliding window over the durations of client
// requests by grouping them into buckets with one-second granularity.
//
// The design follows Hystrix's BucketedRollingCounterStream:
// https://github.com/Netflix/Hystrix/blob/master/hystrix-core/src/main/java/com/netflix/hystrix/metric/consumer/BucketedRollingCounterStream.java
type WindowMetric struct {
// Buckets maps a Unix timestamp (in seconds) to the bucket that holds the
// durations recorded during that second.
Buckets map[int64]*Bucket
// Mutex is held by the methods of WindowMetric while they read or modify
// Buckets and the durations stored in each bucket.
Mutex sync.RWMutex
}
// Bucket collects the durations recorded under a single timestamp key of a
// WindowMetric.
type Bucket struct {
// Durations holds the recorded durations in the order they were appended.
Durations []time.Duration
}
// NewWindowMetric returns an empty WindowMetric that is ready for use: the
// bucket map is allocated and the mutex is left at its zero (unlocked) value.
func NewWindowMetric() *WindowMetric {
	return &WindowMetric{
		Buckets: make(map[int64]*Bucket),
	}
}
// Durations is a slice of time.Duration that implements sort.Interface,
// ordering its elements from shortest to longest.
type Durations []time.Duration

// Len reports the number of durations in the slice.
func (d Durations) Len() int {
	return len(d)
}

// Swap exchanges the durations at positions i and j.
func (d Durations) Swap(i, j int) {
	tmp := d[i]
	d[i] = d[j]
	d[j] = tmp
}

// Less reports whether the duration at position i is strictly shorter than
// the one at position j.
func (d Durations) Less(i, j int) bool {
	return d[j] > d[i]
}
// SortDurations returns every duration stored in buckets whose timestamp is
// no older than SlidingWindowSize seconds before now, sorted in ascending
// order. The result is a fresh slice (nil when nothing qualifies), so callers
// may keep or modify it freely.
//
// TODO: cache the sorted durations once computed to avoid duplicate work.
func (wm *WindowMetric) SortDurations() Durations {
	// Compute the cutoff once instead of on every loop iteration.
	cutoff := time.Now().Unix() - SlidingWindowSize

	var durations Durations

	// Collecting only reads the map and the buckets, so a read lock is
	// enough and does not block other readers.
	wm.Mutex.RLock()
	for ts, b := range wm.Buckets {
		if ts >= cutoff {
			durations = append(durations, b.Durations...)
		}
	}
	wm.Mutex.RUnlock()

	// The copy is private to this call, so sort it after releasing the lock
	// to keep writers from waiting on the sort.
	sort.Sort(durations)
	return durations
}
// getBucketSlow returns the bucket for the second containing now, creating
// it if it does not exist yet. It takes the write lock itself, so the caller
// must not hold wm.Mutex.
//
// The existence check is repeated under the write lock: several goroutines
// may reach this function for the same second, and blindly installing a new
// bucket would replace one that another goroutine has already obtained,
// losing the durations appended to it.
func (wm *WindowMetric) getBucketSlow(now time.Time) *Bucket {
	key := now.Unix()

	wm.Mutex.Lock()
	defer wm.Mutex.Unlock()

	if b := wm.Buckets[key]; b != nil {
		return b
	}
	b := &Bucket{}
	wm.Buckets[key] = b
	return b
}
// getBucket returns the bucket for the current second. The lookup is done
// under the read lock; when no bucket is found, getBucketSlow is called with
// the same timestamp to provide one under the write lock.
func (wm *WindowMetric) getBucket() *Bucket {
	wm.Mutex.RLock()
	now := time.Now()
	current, found := wm.Buckets[now.Unix()]
	wm.Mutex.RUnlock()

	if found {
		return current
	}
	return wm.getBucketSlow(now)
}
// del removes every bucket whose timestamp is SlidingWindowSize seconds or
// more before the current time. It does not take wm.Mutex itself; Add calls
// it while holding the write lock.
func (wm *WindowMetric) del() {
	expiry := time.Now().Unix() - SlidingWindowSize
	for ts := range wm.Buckets {
		if ts > expiry {
			continue
		}
		delete(wm.Buckets, ts)
	}
}
// Add records duration in the bucket for the current second and then prunes
// buckets that have fallen out of the sliding window.
func (wm *WindowMetric) Add(duration time.Duration) {
	// getBucket does its own locking, so fetch the bucket before taking the
	// write lock below.
	bucket := wm.getBucket()

	wm.Mutex.Lock()
	defer wm.Mutex.Unlock()

	bucket.Durations = append(bucket.Durations, duration)
	wm.del()
}
// Mean returns the average of the durations currently inside the sliding
// window, in whole milliseconds (fractions are truncated). It returns 0 when
// the window holds no durations.
func (wm *WindowMetric) Mean() uint32 {
	durations := wm.SortDurations()
	if len(durations) == 0 {
		return 0
	}

	var sum time.Duration
	for _, d := range durations {
		sum += d
	}

	// Reduce to milliseconds before narrowing to uint32. Converting the
	// nanosecond average first would wrap for any mean above roughly 4.29s
	// (2^32 ns) and yield a wrong result.
	mean := sum / time.Duration(len(durations))
	return uint32(mean / time.Millisecond)
}