-
Notifications
You must be signed in to change notification settings - Fork 60
/
timewheel.go
173 lines (149 loc) · 3.79 KB
/
timewheel.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
package timewheel
import (
"context"
"sync"
"time"
)
// Task 时间轮任务
type Task struct {
ctx context.Context
uniqID string // 任务唯一标识
attempts uint16 // 当前尝试的次数
maxAttempts uint16 // 最大尝试次数
round int // 延迟执行的轮数
remainder time.Duration // 任务执行前的剩余延迟(小于时间轮精度)
deferFn func(attempts uint16) time.Duration // 返回任务下一次延迟执行的时间
callback func(ctx context.Context, taskID string) error // 任务回调函数
}
// TimeWheel 单时间轮
type TimeWheel interface {
// AddTask 添加一个任务,到期被执行,默认仅执行一次;若指定了重试次数,则在返回`error`后重试;
// 注意:任务是异步执行的,`ctx`一旦被取消,则任务也随之取消;如要保证任务不被取消,可以使用`yiigo.DetachContext`(如果您的Go版本>=1.21.0,请使用 `context.WithoutCancel`)
AddTask(ctx context.Context, taskID string, fn func(ctx context.Context, taskID string) error, options ...Option)
// Run 运行时间轮
Run()
// Stop 终止时间轮
Stop()
}
type timewheel struct {
slot int
tick time.Duration
size int
bucket []sync.Map
stop chan struct{}
}
func (tw *timewheel) AddTask(ctx context.Context, taskID string, fn func(ctx context.Context, taskID string) error, options ...Option) {
task := &Task{
ctx: ctx,
uniqID: taskID,
callback: fn,
maxAttempts: 1,
deferFn: func(attempts uint16) time.Duration {
return 0
},
}
for _, fn := range options {
fn(task)
}
tw.requeue(task)
}
func (tw *timewheel) Run() {
go tw.scheduler()
}
func (tw *timewheel) Stop() {
select {
case <-tw.stop: // 时间轮已停止
return
default:
}
close(tw.stop)
}
func (tw *timewheel) requeue(task *Task) {
select {
case <-task.ctx.Done(): // 任务被取消
return
case <-tw.stop: // 时间轮已停止
return
default:
}
// 任务已达到最大尝试次数
if task.attempts >= task.maxAttempts {
return
}
task.attempts++
tick := tw.tick.Nanoseconds()
delay := task.deferFn(task.attempts)
duration := delay.Nanoseconds()
// 圈数
task.round = int(duration / (tick * int64(tw.size)))
// 槽位
slot := (int(duration/tick)%tw.size + tw.slot) % tw.size
if slot == tw.slot {
if task.round == 0 {
task.remainder = delay
go tw.do(task)
return
}
task.round--
}
// 剩余延迟
task.remainder = time.Duration(duration % tick)
tw.bucket[slot].Store(task.uniqID, task)
}
func (tw *timewheel) scheduler() {
ticker := time.NewTicker(tw.tick)
defer ticker.Stop()
for {
select {
case <-tw.stop: // 时间轮已停止
return
case <-ticker.C:
tw.slot = (tw.slot + 1) % tw.size
go tw.process(tw.slot)
}
}
}
func (tw *timewheel) process(slot int) {
tw.bucket[slot].Range(func(key, value any) bool {
select {
case <-tw.stop: // 时间轮已停止
return false
default:
}
task := value.(*Task)
if task.round > 0 {
task.round--
return true
}
go tw.do(task)
tw.bucket[slot].Delete(key)
return true
})
}
func (tw *timewheel) do(task *Task) {
defer func() {
if recover() != nil {
tw.requeue(task)
}
}()
if task.remainder > 0 {
time.Sleep(task.remainder)
}
select {
case <-task.ctx.Done(): // 任务被取消
return
default:
}
if err := task.callback(task.ctx, task.uniqID); err != nil {
tw.requeue(task)
}
}
// New 返回一个时间轮实例
func New(tick time.Duration, size int) TimeWheel {
return &timewheel{
tick: tick,
size: size,
bucket: make([]sync.Map, size),
stop: make(chan struct{}),
}
}