-
Notifications
You must be signed in to change notification settings - Fork 0
/
fixed.go
139 lines (117 loc) · 3.75 KB
/
fixed.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package fixed
import (
"context"
"encoding/json"
"fmt"
"github.com/robfig/cron/v3"
"github.com/xpwu/go-log/log"
"github.com/xpwu/timer/scheduler"
"github.com/xpwu/timer/task/callback"
"github.com/xpwu/timer/task/flag"
"time"
)
type Fixed struct {
TryCount uint16 `json:"try"`
Id string `json:"id"`
TimePoint scheduler.UnixTimeSecond `json:"tp"`
OpFlag string `json:"op"`
}
func NewFixedTask(f *Fixed) scheduler.Task {
return append([]byte{flag.Fixed}, f.ToBytes()...)
}
/**
可能的情况分析:
1、id1的fixed 执行时间点依次为 t1 t2 t3 t4 t5 ... ,t1时运行Run(), Run()中添加时间点为t2的任务,但是scheduler在
删除t1时,出现异常,则scheduler中同时存在id1的时间点为t1与t2的两个任务。
分析:异常恢复后,可能会同时执行t1与t2,t2可能会连续执行到t5,t1会再次生成t2 t3 t4 t5的任务,但是t1生成的t5与t2生成的t5
是一样的,在合并task时,最终会变成只有一个task。中间多执行的t2 t3 t4 通过回调方的幂等而过滤(因为重试机制,回调方必须实现
幂等)。//todo 后续可以考虑尽可能少的出现重复执行的时间点
*/
func (f *Fixed) Run(ctx context.Context, schedulerTime scheduler.UnixTimeSecond) {
ctx, logger := log.WithCtx(ctx)
logger.PushPrefix(fmt.Sprintf("run fixed. id=%s, timepoint=%d", f.Id, f.TimePoint))
logger.Debug("start")
defer func() {
logger.Info("end")
}()
cronTimeB, opF, ok := db.Get(f.Id)
// 已经删除或者OpFlag不相同的task都不真正的执行
if !ok || opF != f.OpFlag {
logger.Info("not run because of being deleted")
return
}
// 只有非重试的情况下,才添加下一次的scheduler
if f.TryCount == 0 {
cronTime := NewCronTimeFromBytes(cronTimeB)
// 增加一个小的偏移,以防端点处的bug
next := scheduler.UnixTimeSecond(cronTime.Next(time.Unix(int64(f.TimePoint), 1000)).Unix())
fixed := &Fixed{
TryCount: 0,
Id: f.Id,
TimePoint: next,
OpFlag: f.OpFlag,
}
tk := NewFixedTask(fixed)
scheduler.AddTask(next, []scheduler.Task{tk})
}
req := &callback.Request{
TimePoint: f.TimePoint,
Id: f.Id,
}
ok = callback.Callback(ctx, confValue.CallbackUrl, req)
if ok {
return
}
// retry, 超过最大重试时间,直接放弃
if int(f.TryCount) >= len(callback.ReTryDuration)-1 {
logger.Warning("discard because of exceeding max retries(~20day)")
return
}
tc := f.TryCount + 1
// 从当前时间计算下次重试的时间
next := scheduler.UnixTimeSecond(time.Now().Unix()) + callback.ReTryDuration[tc]
newF := &Fixed{
TryCount: tc,
Id: f.Id,
TimePoint: f.TimePoint,
OpFlag: f.OpFlag,
}
scheduler.AddTask(next, []scheduler.Task{NewFixedTask(newF)})
}
func (f *Fixed) ToBytes() []byte {
r, err := json.Marshal(f)
if err != nil {
panic(err)
}
return r
}
func FromBytes(b []byte) *Fixed {
f := &Fixed{}
_ = json.Unmarshal(b, f)
return f
}
type CronTime struct {
cron.SpecSchedule
// json:"Location" 覆盖嵌套的 Location 域
LocationStr string `json:"Location"`
StartTime scheduler.UnixTimeSecond
RawString string `json:"raw"`
}
func NewCronTimeFromSpec(s *cron.SpecSchedule, start scheduler.UnixTimeSecond, rawString string) *CronTime {
return &CronTime{
SpecSchedule: *s,
LocationStr: s.Location.String(),
StartTime: start,
RawString: rawString,
}
}
func NewCronTimeFromBytes(j []byte) *CronTime {
ret := &CronTime{}
_ = json.Unmarshal(j, ret)
ret.Location, _ = time.LoadLocation(ret.LocationStr)
return ret
}
func (c *CronTime) ToBytes() []byte {
d, _ := json.Marshal(c)
return d
}