forked from bearded-web/bearded
-
Notifications
You must be signed in to change notification settings - Fork 0
/
scheduler.go
150 lines (135 loc) · 3.47 KB
/
scheduler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
package scheduler
import (
"sync"
"github.com/Sirupsen/logrus"
"github.com/bearded-web/bearded/models/scan"
"github.com/bearded-web/bearded/pkg/manager"
)
type Scheduler interface {
// this method blocks until context is done or returns pack of jobs
// GetJobs(context.Context, *agent.Agent) ([]*agent.Job, error)
AddScan(*scan.Scan) error
GetSession() (*scan.Session, error)
UpdateScan(*scan.Scan) error
}
type MemoryScheduler struct {
mgr *manager.Manager
scans map[string]*scan.Scan
rw sync.RWMutex
}
var _ Scheduler = &MemoryScheduler{} // check interface compatibility
// Memory scheduler is just a prototype of scheduler, it mustn't be used in production environment
func NewMemoryScheduler(mgr *manager.Manager) *MemoryScheduler {
return &MemoryScheduler{
scans: map[string]*scan.Scan{},
mgr: mgr,
}
}
//func (s *MemoryScheduler) GetJobs(ctx context.Context, agnt *agent.Agent) ([]*agent.Job, error) {
//
//}
func (s *MemoryScheduler) AddScan(sc *scan.Scan) error {
return s.UpdateScan(sc)
}
func (s *MemoryScheduler) UpdateScan(sc *scan.Scan) error {
s.rw.Lock()
s.scans[s.mgr.FromId(sc.Id)] = sc
s.rw.Unlock()
return nil
}
func (s *MemoryScheduler) GetSession() (*scan.Session, error) {
s.rw.RLock()
defer s.rw.RUnlock()
scans:
for id, sc := range s.scans {
sessions:
for _, sess := range sc.Sessions {
switch sess.Status {
case scan.StatusCreated:
sess.Status = scan.StatusQueued
err := s.mgr.Scans.UpdateSession(sc, sess)
if err != nil {
if s.mgr.IsNotFound(err) {
delete(s.scans, id)
continue scans
}
logrus.Error(err)
continue scans
}
return sess, nil
case scan.StatusQueued:
// all scans session run in sequence order
continue scans
case scan.StatusFinished:
// go to the next session
continue sessions
case scan.StatusWorking:
// if session has children, then we take one which is just created and return it
if len(sess.Children) > 0 {
child := s.GetChild(sc, sess.Children)
if child != nil {
child.Status = scan.StatusQueued
err := s.mgr.Scans.UpdateSession(sc, child)
if err != nil {
if s.mgr.IsNotFound(err) {
delete(s.scans, id)
continue scans
}
logrus.Error(err)
continue scans
}
return child, nil
}
}
// this scan is still working go to the next one
continue scans
case scan.StatusPaused:
continue scans
case scan.StatusFailed:
delete(s.scans, id)
continue scans
}
}
// it looks like all session is finished, delete scan from queue
delete(s.scans, id)
}
return nil, nil
}
func (s *MemoryScheduler) GetChild(sc *scan.Scan, sessions []*scan.Session) *scan.Session {
id := s.mgr.FromId(sc.Id)
sessions:
for _, sess := range sessions {
switch sess.Status {
case scan.StatusCreated:
sess.Status = scan.StatusQueued
err := s.mgr.Scans.UpdateSession(sc, sess)
if err != nil {
if s.mgr.IsNotFound(err) {
delete(s.scans, id)
return nil
}
logrus.Error(err)
return nil
}
return sess
case scan.StatusQueued:
// all scans session run in sequence order
return nil
case scan.StatusFinished:
// go to the next session
continue sessions
case scan.StatusWorking:
if len(sess.Children) > 0 {
return s.GetChild(sc, sess.Children)
}
return nil
case scan.StatusPaused:
return nil
case scan.StatusFailed:
// sub session might fail
// delete(s.scans, id)
return nil
}
}
return nil
}