forked from ouqiang/timewheel
-
Notifications
You must be signed in to change notification settings - Fork 0
/
timewheel.go
179 lines (155 loc) · 4.19 KB
/
timewheel.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
package timewheel
import (
"container/list"
"time"
)
// @author qiang.ou<qingqianludao@gmail.com>
// Job 延时任务回调函数
type Job func(interface{})
// TaskData 回调函数参数类型
// TimeWheel 时间轮
type TimeWheel struct {
interval time.Duration // 指针每隔多久往前移动一格
ticker *time.Ticker
slots []*list.List // 时间轮槽
// key: 定时器唯一标识 value: 定时器所在的槽, 主要用于删除定时器, 不会出现并发读写,不加锁直接访问
timer map[interface{}]int
currentPos int // 当前指针指向哪一个槽
slotNum int // 槽数量
job Job // 定时器回调函数
addTaskChannel chan Task // 新增任务channel
removeTaskChannel chan interface{} // 删除任务channel
stopChannel chan bool // 停止定时器channel
}
// Task 延时任务
type Task struct {
delay time.Duration // 延迟时间
circle int // 时间轮需要转动几圈
key interface{} // 定时器唯一标识, 用于删除定时器
data interface{} // 回调函数参数
}
// New 创建时间轮
func New(interval time.Duration, slotNum int, job Job) *TimeWheel {
if interval <= 0 || slotNum <= 0 || job == nil {
return nil
}
tw := &TimeWheel{
interval: interval,
slots: make([]*list.List, slotNum),
timer: make(map[interface{}]int),
currentPos: 0,
job: job,
slotNum: slotNum,
addTaskChannel: make(chan Task),
removeTaskChannel: make(chan interface{}),
stopChannel: make(chan bool),
}
tw.initSlots()
return tw
}
// 初始化槽,每个槽指向一个双向链表
func (tw *TimeWheel) initSlots() {
for i := 0; i < tw.slotNum; i++ {
tw.slots[i] = list.New()
}
}
// Start 启动时间轮
func (tw *TimeWheel) Start() {
tw.ticker = time.NewTicker(tw.interval)
go tw.start()
}
// Stop 停止时间轮
func (tw *TimeWheel) Stop() {
tw.stopChannel <- true
}
// AddTimer 添加定时器 key为定时器唯一标识
func (tw *TimeWheel) AddTimer(delay time.Duration, key interface{}, data interface{}) {
if delay < 0 {
return
}
tw.addTaskChannel <- Task{delay: delay, key: key, data: data}
}
// RemoveTimer 删除定时器 key为添加定时器时传递的定时器唯一标识
func (tw *TimeWheel) RemoveTimer(key interface{}) {
if key == nil {
return
}
tw.removeTaskChannel <- key
}
func (tw *TimeWheel) start() {
for {
select {
case <-tw.ticker.C:
tw.tickHandler()
case task := <-tw.addTaskChannel:
tw.addTask(&task)
case key := <-tw.removeTaskChannel:
tw.removeTask(key)
case <-tw.stopChannel:
tw.ticker.Stop()
return
}
}
}
func (tw *TimeWheel) tickHandler() {
l := tw.slots[tw.currentPos]
tw.scanAndRunTask(l)
if tw.currentPos == tw.slotNum-1 {
tw.currentPos = 0
} else {
tw.currentPos++
}
}
// 扫描链表中过期定时器, 并执行回调函数
func (tw *TimeWheel) scanAndRunTask(l *list.List) {
for e := l.Front(); e != nil; {
task := e.Value.(*Task)
if task.circle > 0 {
task.circle--
e = e.Next()
continue
}
go tw.job(task.data)
next := e.Next()
l.Remove(e)
if task.key != nil {
delete(tw.timer, task.key)
}
e = next
}
}
// 新增任务到链表中
func (tw *TimeWheel) addTask(task *Task) {
pos, circle := tw.getPositionAndCircle(task.delay)
task.circle = circle
tw.slots[pos].PushBack(task)
if task.key != nil {
tw.timer[task.key] = pos
}
}
// 获取定时器在槽中的位置, 时间轮需要转动的圈数
func (tw *TimeWheel) getPositionAndCircle(d time.Duration) (pos int, circle int) {
delaySeconds := int(d.Seconds())
intervalSeconds := int(tw.interval.Seconds())
circle = int(delaySeconds / intervalSeconds / tw.slotNum)
pos = int(tw.currentPos+delaySeconds/intervalSeconds) % tw.slotNum
return
}
// 从链表中删除任务
func (tw *TimeWheel) removeTask(key interface{}) {
// 获取定时器所在的槽
position, ok := tw.timer[key]
if !ok {
return
}
// 获取槽指向的链表
l := tw.slots[position]
for e := l.Front(); e != nil; {
task := e.Value.(*Task)
if task.key == key {
delete(tw.timer, task.key)
l.Remove(e)
}
e = e.Next()
}
}