/
rolling_policy.go
96 lines (86 loc) · 2 KB
/
rolling_policy.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
package stat
import (
"sync"
"time"
)
// RollingPolicy is a policy for a ring window based on time duration; it
// moves the bucket offset forward as time passes.
type RollingPolicy struct {
	// mu is write-locked by add and read-locked by Reduce.
	mu sync.RWMutex
	// size is the bucket count, taken from window.Size() in NewRollingPolicy.
	size int
	// window is the underlying bucket storage that gets reset, written and iterated.
	window *Window
	// offset is the bucket index handed to the write callback in add.
	offset int
	// bucketDuration is the length of time one bucket stands for; elapsed
	// time is divided by it to count how many buckets have passed.
	bucketDuration time.Duration
	// lastAppendTime is the reference point for measuring elapsed time; add
	// advances it in whole multiples of bucketDuration.
	lastAppendTime time.Time
}
// timespan returns how many whole bucket durations have elapsed since
// lastAppendTime. If the computed count is negative (clock rolled back?),
// the full window size is returned instead.
func (r *RollingPolicy) timespan() int {
	elapsed := time.Since(r.lastAppendTime)
	buckets := int(elapsed / r.bucketDuration)
	if buckets < 0 {
		// Clock rollback? Report the whole window as elapsed.
		return r.size
	}
	return buckets
}
// add rotates the window up to the current time and then applies f to the
// current bucket.
//
// When one or more whole bucket durations have elapsed since lastAppendTime,
// lastAppendTime is advanced by that many durations, the buckets following
// the current offset are reset (at most size of them, wrapping around to
// index 0), and offset moves to the last bucket that was reset. Finally f is
// called with the resulting offset and val.
//
// Everything runs under the write lock, which is released with defer so that
// a panic in f or in the window cannot leave the mutex held.
func (r *RollingPolicy) add(f func(offset int, val float64), val float64) {
	r.mu.Lock()
	defer r.mu.Unlock()

	if timespan := r.timespan(); timespan > 0 {
		// Multiply as time.Duration (int64) rather than int so the product
		// cannot be truncated on platforms where int is 32 bits wide.
		r.lastAppendTime = r.lastAppendTime.Add(time.Duration(timespan) * r.bucketDuration)

		// Never reset more buckets than the window holds.
		if timespan > r.size {
			timespan = r.size
		}

		// Expired buckets begin right after the current offset: first the
		// range [start, end), then the wrapped-around range [0, wrapped).
		offset := r.offset
		start := offset + 1
		end, wrapped := start+timespan, 0
		if end > r.size {
			wrapped = end - r.size
			end = r.size
		}
		for i := start; i < end; i++ {
			r.window.ResetBucket(i)
			offset = i
		}
		for i := 0; i < wrapped; i++ {
			r.window.ResetBucket(i)
			offset = i
		}
		r.offset = offset
	}

	f(r.offset, val)
}
// Append appends the given point to the window. It hands the window's
// Append method to add, which invokes it with the current bucket offset
// and val.
func (r *RollingPolicy) Append(val float64) {
	appendPoint := r.window.Append
	r.add(appendPoint, val)
}
// Add adds the given value to the latest point in the current bucket. It
// hands the window's Add method to add, which invokes it with the current
// bucket offset and val.
func (r *RollingPolicy) Add(val float64) {
	addToPoint := r.window.Add
	r.add(addToPoint, val)
}
// Reduce applies the reduction function f to the buckets of the window that
// have not yet expired and returns f's result.
//
// The number of buckets still valid is size minus the number of bucket
// durations elapsed since the last write; when that is zero or negative, f
// is not called and 0 is returned. Otherwise f receives an iterator that
// starts just past the buckets that would have expired (wrapping around the
// ring) and covers the remaining count.
//
// The read lock is released with defer so that a panic inside f cannot leave
// it held and block later writers.
func (r *RollingPolicy) Reduce(f func(Iterator) float64) (val float64) {
	r.mu.RLock()
	defer r.mu.RUnlock()

	timespan := r.timespan()
	count := r.size - timespan
	if count <= 0 {
		return 0
	}

	// Skip the current bucket and every bucket that has elapsed since.
	offset := r.offset + timespan + 1
	if offset >= r.size {
		offset -= r.size
	}
	return f(r.window.Iterator(offset, count))
}
// NewRollingPolicy creates a RollingPolicy over window. The bucket count is
// read from window.Size(), the offset starts at 0, each bucket spans
// bucketDuration, and the last-append time is initialised to the current
// time.
func NewRollingPolicy(window *Window, bucketDuration time.Duration) *RollingPolicy {
	policy := new(RollingPolicy)
	policy.window = window
	policy.size = window.Size()
	policy.offset = 0
	policy.bucketDuration = bucketDuration
	policy.lastAppendTime = time.Now()
	return policy
}