-
Notifications
You must be signed in to change notification settings - Fork 0
/
delay_task.go
72 lines (59 loc) · 1.33 KB
/
delay_task.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package cron
import (
"fmt"
"time"
. "github.com/lybxkl/gmqtt/common/log"
)
var DelayTaskManager = NewMemDelayTaskManage()
type ID = string
type DelayTask struct {
ID ID
DealTime time.Duration // 处理时间
Data interface{}
Fn func(data interface{})
CancelCallback func()
icron Icron
}
func (g *DelayTask) Run() {
defer func() {
if err := recover(); err != nil {
Log.Error(err)
}
g.icron.Remove(g.ID)
}()
g.Fn(g.Data)
}
type DelayTaskManage interface {
Run(*DelayTask) error
Cancel(ID)
}
type memDelayTaskManage struct {
icron Icron
}
func NewMemDelayTaskManage() DelayTaskManage {
return &memDelayTaskManage{icron: Get()}
}
func (d *memDelayTaskManage) Run(task *DelayTask) error {
Log.Debugf("添加%s的延迟发送任务, 延迟时间:%ds", task.ID, task.DealTime)
if task.DealTime <= 0 {
//task.DealTime = 1
go func() {
task.Fn(task.Data)
}()
return nil
}
task.icron = d.icron
err := d.icron.AddJob(fmt.Sprintf("@every %ds", task.DealTime), task.ID, task)
return err
}
func (d *memDelayTaskManage) Cancel(id ID) {
Log.Debugf("取消%s的延迟发送任务", id)
job, exist := d.icron.GetJob(id)
if !exist {
return
}
d.icron.Remove(id)
if task, ok := job.(*DelayTask); ok {
task.CancelCallback() // 执行取消任务回调方法
}
}