/
cron.go
115 lines (94 loc) · 2.63 KB
/
cron.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package cron
import (
"sync"
"github.com/mdouchement/logger"
"github.com/mdouchement/shigoto/pkg/runner"
"github.com/mdouchement/shigoto/pkg/shigoto"
"github.com/robfig/cron/v3"
)
// A Pool contains a pool of crons and carries lots of helping methods.
type Pool struct {
mu sync.Mutex
logger logger.Logger
running map[string]bool
cron map[string]*cron.Cron
shigoto map[string]*shigoto.Shigoto
}
// New returns a new Pool.
func New(l logger.Logger) *Pool {
return &Pool{
logger: l,
running: make(map[string]bool),
cron: make(map[string]*cron.Cron),
shigoto: make(map[string]*shigoto.Shigoto),
}
}
// Register adds the given shigoto to the cron pool.
func (p *Pool) Register(s *shigoto.Shigoto) {
p.mu.Lock()
defer p.mu.Unlock()
p.shigoto[s.Name] = s
cron := cron.New(cron.WithLogger(
cron.PrintfLogger(p.logger)),
cron.WithChain(
cron.SkipIfStillRunning(cron.PrintfLogger(p.logger)),
),
)
p.cron[s.Name] = cron
for _, baito := range s.Baito {
chain := runner.Chain(baito.Commands()...)
chain.AttachLogger(p.logger)
cron.Schedule(baito.Schedule(), chain)
p.logger.Infof(`New job registered "%s" - "%s"`, baito.Name(), baito.Schedule())
}
}
// Get returns the registred shigoto according the given name.
func (p *Pool) Get(name string) *shigoto.Shigoto {
p.mu.Lock()
defer p.mu.Unlock()
return p.shigoto[name]
}
// Start start all schedulers.
func (p *Pool) Start() {
p.mu.Lock()
defer p.mu.Unlock()
for name, cron := range p.cron {
if p.running[name] {
continue
}
p.logger.Infof("Starting scheduler '%s' with %d jobs", name, len(p.shigoto[name].Baito))
cron.Start()
p.running[name] = true
}
}
// Stop stops and waits for the termination of the tasks of all schedulers.
func (p *Pool) Stop() {
p.mu.Lock()
defer p.mu.Unlock()
for name, cron := range p.cron {
p.logger.Infof("Shuting down the scheduler '%s'...", name)
<-cron.Stop().Done() // Wait for the termination of the tasks.
delete(p.running, name)
}
}
// StartShigoto starts the scheduler of the given shigoto's name.
func (p *Pool) StartShigoto(name string) {
p.mu.Lock()
defer p.mu.Unlock()
if p.running[name] {
return
}
p.logger.Infof("Starting scheduler '%s' with %d jobs", name, len(p.shigoto[name].Baito))
p.cron[name].Start()
p.running[name] = true
}
// StopShigoto stops and waits for the termination of the tasks of the given shigoto.
func (p *Pool) StopShigoto(name string) {
p.mu.Lock()
defer p.mu.Unlock()
p.logger.Infof("Shuting down the scheduler '%s'...", name)
<-p.cron[name].Stop().Done() // Wait for the termination of the tasks.
delete(p.shigoto, name)
delete(p.cron, name)
delete(p.running, name)
}