-
Notifications
You must be signed in to change notification settings - Fork 2
/
schedule_manager.go
160 lines (136 loc) · 3.3 KB
/
schedule_manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
// Copyright 2020 The GoSchedule Authors. All rights reserved.
// Use of this source code is governed by BSD
// license that can be found in the LICENSE file.
package core
import (
"context"
"errors"
"io"
"sync"
"time"
u "github.com/jasonjoo2010/goschedule/core/utils"
"github.com/jasonjoo2010/goschedule/definition"
"github.com/jasonjoo2010/goschedule/log"
"github.com/jasonjoo2010/goschedule/store"
"github.com/jasonjoo2010/goschedule/types"
"github.com/jasonjoo2010/goschedule/utils"
)
type ScheduleManager struct {
io.Closer
mu sync.Mutex
wg sync.WaitGroup
cfg types.ScheduleConfig
ctx context.Context
ctxCancel context.CancelFunc
store store.Store
scheduler *definition.Scheduler
workerSet *u.WorkerSet
}
func initCfg(cfg *types.ScheduleConfig) error {
if cfg.DeathTimeout <= 0 {
cfg.DeathTimeout = 60 * time.Second
}
if cfg.HeartbeatInterval <= 0 {
cfg.HeartbeatInterval = 5 * time.Second
}
if cfg.StallAfterStartup <= 0 {
cfg.StallAfterStartup = 10 * time.Second
}
if cfg.ScheduleInterval <= 0 {
cfg.ScheduleInterval = 10 * time.Second
}
if cfg.HeartbeatInterval*2 > cfg.DeathTimeout {
return errors.New("Heartbeat interval should be no more than half of the death timeout")
}
return nil
}
func New(cfg types.ScheduleConfig, store store.Store) (*ScheduleManager, error) {
if err := initCfg(&cfg); err != nil {
return nil, err
}
// generate uuid
seq, err := store.Sequence()
if err != nil {
return nil, err
}
uuid := utils.GenerateUUID(seq)
s := &definition.Scheduler{
ID: uuid,
Enabled: true,
}
m := &ScheduleManager{
store: store,
scheduler: s,
workerSet: u.NewWorkerSet(),
cfg: cfg,
}
return m, nil
}
func (s *ScheduleManager) Store() store.Store {
return s.store
}
func (s *ScheduleManager) Scheduler() definition.Scheduler {
return *s.scheduler
}
func (s *ScheduleManager) Start() error {
s.mu.Lock()
defer s.mu.Unlock()
if s.ctx != nil {
return errors.New("Manager has already been started")
}
s.ctx, s.ctxCancel = context.WithCancel(context.Background())
s.wg.Add(2)
go utils.LoopContext(s.ctx,
s.cfg.HeartbeatInterval,
s.registerInfo,
func() {
defer s.wg.Done()
defer s.cleanScheduler(s.scheduler.ID)
})
go utils.LoopContext(s.ctx,
s.cfg.ScheduleInterval,
s.schedule,
func() {
defer s.wg.Done()
defer s.stopAllWorkers()
})
return nil
}
func (s *ScheduleManager) Shutdown() {
s.Close()
}
func (s *ScheduleManager) cleanup() {
s.cleanScheduler(s.scheduler.ID)
log.Info("Manager has been shutdown")
}
// Shutdown close the manager. Please use `Close()` instead
// And it won't close the store it uses anymore. The lifecycle of the store should be maintained outside.
func (s *ScheduleManager) Close() error {
s.mu.Lock()
defer s.mu.Unlock()
if utils.ContextDone(s.ctx) {
return errors.New("Manager has already been closed")
}
s.ctxCancel()
defer s.cleanup()
if s.cfg.ShutdownTimeout == 0 {
s.wg.Wait()
return nil
}
// wait with a timeout
notifyC := make(chan int, 1)
timeout := time.NewTimer(s.cfg.ShutdownTimeout)
go func() {
// wait for heartbeat and schedule loops to stop
s.wg.Wait()
close(notifyC)
}()
select {
case <-notifyC:
timeout.Stop()
return nil
case <-timeout.C:
log.Warn("Fail to wait for all loops to stop, force to quit")
return nil
}
}