Skip to content

Commit

Permalink
refactor: Job.calcState 复用 now
Browse files Browse the repository at this point in the history
  • Loading branch information
caixw committed Sep 20, 2023
1 parent d33f8bf commit c57ced7
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 17 deletions.
17 changes: 6 additions & 11 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@ func (j *Job) Name() string { return j.name }

// Next 返回下次执行的时间点
//
// 如果返回值的 IsZero() 为 true,则表示该任务不需要再执行,
// 一般为 At 之类的一次任务。
// 如果返回值的 IsZero() 为 true,则表示该任务不需要再执行。
func (j *Job) Next() time.Time {
j.locker.RLock()
defer j.locker.RUnlock()
Expand Down Expand Up @@ -71,15 +70,13 @@ func (j *Job) Err() error {
// 即从任务执行完成的时间点计算下一次执行时间。
func (j *Job) Delay() bool { return j.delay }

// goroutine 启动需要时间,短时间任务,可能存在 goroutine 未初始化完成,
// 第二次调用已经开始,所以此处先初始化相关的状态信息,使第二次调用处理非法状态。
func (j *Job) calcState() {
func (j *Job) calcState(now time.Time) {
j.locker.Lock()
defer j.locker.Unlock()

j.state = Running
j.prev = j.next
j.next = j.s.Next(time.Now()) // 先计算 next,保证调用者重复调用 run 时能获取正确的 next。
j.next = j.s.Next(now) // 先计算 next,保证调用者重复调用 run 时能获取正确的 next。
}

// 运行当前的任务
Expand All @@ -100,7 +97,6 @@ func (j *Job) run(at time.Time, errlog, infolog Logger) {
} else {
j.err = fmt.Errorf("%v", msg)
}

j.state = Failed

if errlog != nil {
Expand All @@ -109,8 +105,7 @@ func (j *Job) run(at time.Time, errlog, infolog Logger) {
}
}()

j.err = j.f(at)
if j.err != nil {
if j.err = j.f(at); j.err != nil {
j.state = Failed

if errlog != nil {
Expand Down Expand Up @@ -163,7 +158,7 @@ func (s *Server) Tick(name string, f JobFunc, dur time.Duration, imm, delay bool

// Cron 使用 cron 表达式新建一个定时任务
//
// 具体文件可以参考 schedulers/cron.Parse
// 具体文件可以参考 [cron.Parse]
func (s *Server) Cron(name string, f JobFunc, spec string, delay bool) {
scheduler, err := cron.Parse(spec, s.Location())
if err != nil {
Expand All @@ -174,7 +169,7 @@ func (s *Server) Cron(name string, f JobFunc, spec string, delay bool) {

// At 添加 At 类型的定时器
//
// 具体文件可以参考 schedulers/at.At
// 具体文件可以参考 [at.At]
func (s *Server) At(name string, f JobFunc, t time.Time, delay bool) {
s.New(name, f, at.At(t), delay)
}
Expand Down
2 changes: 1 addition & 1 deletion job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func TestServer_Jobs(t *testing.T) {
a.Equal(len(jobs), len(srv.jobs))
}

func TestServer_NewCron(t *testing.T) {
func TestServer_Cron(t *testing.T) {
a := assert.New(t, false)

srv := NewServer(nil, nil, nil)
Expand Down
7 changes: 4 additions & 3 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,16 +113,17 @@ func (s *Server) schedule(ctx context.Context) {

now := time.Now()
for _, j := range s.jobs {
next := j.Next() // j.Next() 需要锁,防止反复计算。
if next.After(now) || next.IsZero() {
if next := j.Next(); next.After(now) || next.IsZero() {
break
}

if j.State() == Running && j.Delay() {
continue
}

j.calcState() // 计算关键信息
// j.run 启动需要时间,可能存在 j.run 未初始化完成,第二次调用已经开始,
// 所以此处先初始化相关的状态信息,使第二次调用处理非法状态。
j.calcState(now)
go j.run(now, s.erro, s.info)
}

Expand Down
4 changes: 2 additions & 2 deletions server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,8 @@ func TestServer_Serve_delay(t *testing.T) {
for i := 1; i < len(tickers1); i++ {
prev := tickers1[i-1].Unix()
curr := tickers1[i].Unix()
delta := math.Abs(float64(curr - prev)) // 不可能正好相差 2 秒,但应该不会超过 3 秒。
a.True(delta <= 3, "%d != %d", prev, curr)
delta := math.Abs(float64(curr - prev)) // 缺失一次执行,应该介于 4-6 之间?
a.True(delta >= 4 && delta < 6, "%d != %d", prev, curr)
}
for i := 1; i < len(tickers2); i++ {
prev := tickers2[i-1].Unix()
Expand Down

0 comments on commit c57ced7

Please sign in to comment.