/
ticker.go
116 lines (101 loc) · 1.88 KB
/
ticker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package scrape
import (
"context"
"sync"
"time"
"github.com/pingcap/ng-monitoring/utils"
)
type Ticker struct {
sync.Mutex
interval time.Duration
subscribers map[int]chan time.Time
idAlloc int
cancel context.CancelFunc
lastTime time.Time
}
func NewTicker(d time.Duration) *Ticker {
if d == 0 {
panic("should never happen")
}
ctx, cancel := context.WithCancel(context.Background())
t := &Ticker{
interval: d,
subscribers: make(map[int]chan time.Time),
cancel: cancel,
}
go utils.GoWithRecovery(func() {
t.run(ctx)
}, nil)
return t
}
type TickerChan struct {
id int
ch chan time.Time
ticker *Ticker
}
func (tc *TickerChan) Stop() {
tc.ticker.Lock()
defer tc.ticker.Unlock()
delete(tc.ticker.subscribers, tc.id)
}
func (t *Ticker) Subscribe() *TickerChan {
ch := make(chan time.Time, 1)
t.Lock()
defer t.Unlock()
t.idAlloc += 1
id := t.idAlloc
t.subscribers[id] = ch
return &TickerChan{
id: id,
ch: ch,
ticker: t,
}
}
func (t *Ticker) Reset(d time.Duration) {
if t.interval == d {
return
}
t.Stop()
ctx, cancel := context.WithCancel(context.Background())
t.cancel = cancel
t.interval = d
go utils.GoWithRecovery(func() {
t.run(ctx)
}, nil)
}
func (t *Ticker) run(ctx context.Context) {
nextStart := int64(t.interval) - time.Now().UnixNano()%int64(t.interval)
select {
case <-time.After(time.Duration(nextStart)):
// Continue after the scraping offset.
case <-ctx.Done():
return
}
t.notify(time.Now())
ticker := time.NewTicker(t.interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case now := <-ticker.C:
t.notify(now)
}
}
}
func (t *Ticker) notify(now time.Time) {
t.Lock()
defer t.Unlock()
t.lastTime = now
for _, ch := range t.subscribers {
select {
case ch <- now:
default:
}
}
}
func (t *Ticker) Stop() {
if t.cancel != nil {
t.cancel()
}
}