-
Notifications
You must be signed in to change notification settings - Fork 7
/
timer.go
98 lines (79 loc) · 2.18 KB
/
timer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
package localcache
import (
"sync"
"time"
"github.com/RussellLuo/timingwheel"
)
// expireQueue stores tasks that are automatically deleted after the key expires
type expireQueue struct {
tick time.Duration
wheelSize int64
// The time wheel stores tasks that are scheduled to expire and be deleted.
tw *timingwheel.TimingWheel
mu sync.Mutex
timers map[string]*timingwheel.Timer
}
// newExpireQueue generates an expireQueue object.
// The queue is implemented through a time wheel.
// The elements in the queue are deleted regularly according to the expiration time.
func newExpireQueue(tick time.Duration, wheelSize int64) *expireQueue {
queue := &expireQueue{
tick: tick,
wheelSize: wheelSize,
tw: timingwheel.NewTimingWheel(tick, wheelSize),
timers: make(map[string]*timingwheel.Timer),
}
// Start a goroutine to handle expired entries
queue.tw.Start()
return queue
}
// add scheduled expired tasks.
// When each scheduled task expires, it will be executed as an independent goroutine.
func (q *expireQueue) add(key string, expireTime time.Time, f func()) {
q.mu.Lock()
defer q.mu.Unlock()
d := expireTime.Sub(currentTime())
timer := q.tw.AfterFunc(d, q.task(key, f))
q.timers[key] = timer
return
}
// update the expiration time of the key element
func (q *expireQueue) update(key string, expireTime time.Time, f func()) {
q.mu.Lock()
defer q.mu.Unlock()
if timer, ok := q.timers[key]; ok {
timer.Stop()
}
d := expireTime.Sub(currentTime())
timer := q.tw.AfterFunc(d, q.task(key, f))
q.timers[key] = timer
}
// remove element key
func (q *expireQueue) remove(key string) {
q.mu.Lock()
defer q.mu.Unlock()
if timer, ok := q.timers[key]; ok {
timer.Stop()
delete(q.timers, key)
}
}
// clear the queue
func (q *expireQueue) clear() {
q.tw.Stop()
q.tw = timingwheel.NewTimingWheel(q.tick, q.wheelSize)
q.timers = make(map[string]*timingwheel.Timer)
// Restart a goroutine to process expired entries
q.tw.Start()
}
// stop the running of the time wheel queue
func (q *expireQueue) stop() {
q.tw.Stop()
}
func (q *expireQueue) task(key string, f func()) func() {
return func() {
f()
q.mu.Lock()
delete(q.timers, key)
q.mu.Unlock()
}
}