forked from alex023/clock
-
Notifications
You must be signed in to change notification settings - Fork 0
/
job.go
95 lines (86 loc) · 2.42 KB
/
job.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package clock
import (
"github.com/HuKeping/rbtree"
"log"
"runtime/debug"
"sync/atomic"
"time"
)
// Job External access interface for timed tasks
type Job interface {
C() <-chan Job //C Get a Chan,which can get message if Job is executed
Count() uint64 //计数器,表示已执行(或触发)的次数
Max() uint64 //允许执行的最大次数
Cancel() //撤销加载的任务,不再定时执行
isAvailable() bool //return true,if job not action or not cancel
}
// jobItem implementation of "Job" interface and "rbtree.Item" interface
type jobItem struct {
id uint64 //唯一键值,内部由管理器生成,以区分同一时刻的不同任务事件
actionCount uint64 //计数器,表示已执行(或触发)的次数
actionMax uint64 //允许执行的最大次数
intervalTime time.Duration //间隔时间
createTime time.Time //创建时间,略有误差
actionTime time.Time //计算得出的最近一次执行时间点
fn func() //事件函数
msgChan chan Job //消息通道,执行时,控制器通过该通道向外部传递消息
cancelFlag int32
clock *Clock
}
// Less Based rbtree ,implements Item interface for sort
func (je jobItem) Less(another rbtree.Item) bool {
item, ok := another.(*jobItem)
if !ok {
return false
}
if !je.actionTime.Equal(item.actionTime) {
return je.actionTime.Before(item.actionTime)
}
return je.id < item.id
}
func (je *jobItem) C() <-chan Job {
return je.msgChan
}
func (je *jobItem) action(async bool) {
je.actionCount++
if async {
go safeCall(je.fn)
} else {
safeCall(je.fn)
}
select {
case je.msgChan <- je:
default:
//some times,client should not receive msgChan,so must discard jobItem when blocking
}
}
func (je *jobItem) Cancel() {
if atomic.CompareAndSwapInt32(&je.cancelFlag, 0, 1) {
je.clock.rmJob(je)
je.innerCancel()
}
}
func (je *jobItem) isAvailable() bool {
return je.cancelFlag == 0
}
func (je *jobItem) innerCancel() {
je.clock = nil
close(je.msgChan)
}
// Count implement for Job
func (je jobItem) Count() uint64 {
return je.actionCount
}
// Max implement for Job
func (je jobItem) Max() uint64 {
return je.actionMax
}
func safeCall(fn func()) {
defer func() {
if err := recover(); err != nil {
log.Printf("[clock] recovering reason is %+v. More detail:", err)
log.Println(string(debug.Stack()))
}
}()
fn()
}