/
main.go
214 lines (186 loc) · 6.16 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
package server
import (
"fmt"
"sync"
"time"
"github.com/gorilla/mux"
"github.com/labbcb/rnnr/models"
log "github.com/sirupsen/logrus"
)
// Main is the central rnnr server instance. It bundles the HTTP router, the
// database handle and the service metadata; NewMain builds and populates it.
type Main struct {
// Router is the gorilla/mux HTTP router; NewMain creates it and calls register
// on the Main (presumably to install the API routes — confirm in register).
Router *mux.Router
// DB is the database handle returned by MongoConnect in NewMain.
DB *DB
// ServiceInfo describes this service (ID, name, type, storage backends,
// version); NewMain fills it with fixed values.
ServiceInfo *models.ServiceInfo
}
// NewMain connects to MongoDB, builds a Main with a fresh router and the
// service metadata, calls register to set up the Task and Node endpoints,
// and launches the task manager (StartTaskManager) in a background goroutine.
//
// database is the MongoDB connection URI without a database name; the
// "rnnr" database is always used. sleepTime is the pause between
// task-management iterations; it is a time.Duration, not a number of seconds.
//
// The background task manager loops forever and has no stop mechanism, so it
// lives for the remainder of the process.
func NewMain(database string, sleepTime time.Duration) (*Main, error) {
	connection, err := MongoConnect(database, "rnnr")
	if err != nil {
		return nil, fmt.Errorf("connecting to MongoDB: %w", err)
	}

	m := &Main{
		Router: mux.NewRouter(),
		DB:     connection,
		ServiceInfo: &models.ServiceInfo{
			ID:   "rnnr",
			Name: "RNNR",
			Type: &models.ServiceType{
				Group:    "org.bcblab",
				Artifact: "tes",
				Version:  "1.0.0",
			},
			Description: "Distributed task execution system for scaling reproducible workflows",
			Storage:     []string{"Local", "NFS"},
			UpdatedAt:   time.Now(),
			Version:     "1.4.1",
		},
	}
	m.register()
	go m.StartTaskManager(sleepTime)
	return m, nil
}
// StartTaskManager runs the task-management loop forever. Each iteration
// performs, in order:
//  1. InitializeTasks: assign a node to every Queued task;
//  2. RunTasks: start every Initializing task on its assigned node;
//  3. CheckTasks: poll every Running task for completion.
//
// An error from any step is logged as a warning and does not stop the loop or
// skip the later steps. After each iteration the loop pauses for sleepTime
// (a time.Duration). The function never returns, so callers run it in its own
// goroutine.
func (m *Main) StartTaskManager(sleepTime time.Duration) {
	for {
		if err := m.InitializeTasks(); err != nil {
			log.WithError(err).Warn("Unable to initialize tasks.")
		}
		if err := m.RunTasks(); err != nil {
			log.WithError(err).Warn("Unable to run tasks.")
		}
		if err := m.CheckTasks(); err != nil {
			log.WithError(err).Warn("Unable to check tasks.")
		}
		time.Sleep(sleepTime)
	}
}
// InitializeTasks tries to place every Queued task on a computing node.
//
// For each queued task it asks RequestNode for a node that can satisfy the
// task's resources. On success the task is bound to that node's host, moved to
// the Initializing state, given a start time in its first log entry, and saved.
// Otherwise the task stays queued and is retried on the next pass:
//   - *NoActiveNodes: a warning is logged;
//   - *NoEnoughResources: nothing is logged (an expected, transient condition);
//   - any other error: logged with the task's id and name.
//
// The returned error is non-nil only if the queued tasks cannot be listed;
// per-task failures are logged and do not abort the loop.
func (m *Main) InitializeTasks() error {
	queued, err := m.DB.ListTasks(0, 0, models.Full, nil, []models.State{models.Queued})
	if err != nil {
		return err
	}

	for _, t := range queued {
		node, reqErr := m.RequestNode(t.Resources)
		if reqErr != nil {
			switch reqErr.(type) {
			case *NoActiveNodes:
				log.Warn("No active nodes")
			case *NoEnoughResources:
				// Not enough capacity right now; leave the task queued.
			default:
				log.WithError(reqErr).WithFields(log.Fields{"id": t.ID, "name": t.Name}).Error("Unable to request node.")
			}
			continue
		}

		t.Host = node.Host
		t.State = models.Initializing
		startedAt := time.Now()
		t.Logs[0].StartTime = &startedAt

		if updErr := m.DB.UpdateTask(t); updErr != nil {
			log.WithError(updErr).WithFields(log.Fields{"id": t.ID, "name": t.Name}).Error("Unable to update task.")
			continue
		}
		log.WithFields(log.Fields{"id": t.ID, "name": t.Name, "host": t.Host}).Info("Task initialized.")
	}
	return nil
}
// RunTasks tries to start every task in the Initializing state.
//
// For each such task it looks up the node assigned to task.Host and calls
// RunTask on it in a separate goroutine. Each goroutine sends the (possibly
// updated) task back over a channel, and this goroutine saves it with
// DB.UpdateTask. Tasks whose node cannot be found are logged and skipped.
//
// The returned error is non-nil only if the Initializing tasks cannot be
// listed; per-task failures are logged.
func (m *Main) RunTasks() error {
	tasks, err := m.DB.ListTasks(0, 0, models.Full, nil, []models.State{models.Initializing})
	if err != nil {
		return err
	}

	ch := make(chan *models.Task)
	wg := &sync.WaitGroup{}
	for _, task := range tasks {
		node, err := m.DB.GetNode(task.Host)
		if err != nil {
			log.WithError(err).WithFields(log.Fields{"id": task.ID, "name": task.Name, "host": task.Host}).Error("Unable to get node.")
			continue
		}
		// Count only goroutines that are actually started. Adding len(tasks) up
		// front left the counter above zero whenever a node lookup failed, so
		// wg.Wait never returned, ch was never closed, and the loop below hung.
		wg.Add(1)
		go m.RunTask(task, node, ch, wg)
	}

	// Close ch once every RunTask goroutine has reported, ending the range below.
	go func() {
		wg.Wait()
		close(ch)
	}()

	for task := range ch {
		if err := m.DB.UpdateTask(task); err != nil {
			log.WithFields(log.Fields{"id": task.ID, "name": task.Name, "error": err}).Warn("Unable to update task.")
		}
	}
	return nil
}
// RunTask asks node to start task via RemoteRun and reports the outcome by
// sending task on res. It calls wg.Done when it returns.
//
// Outcomes:
//   - success: task becomes Running with empty Metrics;
//   - *NetworkError: task is left unchanged (so it is retried later) and a
//     warning is logged;
//   - any other error: task becomes SystemError, its first log entry gets an
//     end time, and its system logs are replaced with the error text.
func (m *Main) RunTask(task *models.Task, node *models.Node, res chan<- *models.Task, wg *sync.WaitGroup) {
	defer wg.Done()

	runErr := RemoteRun(task, node.Address())
	if runErr == nil {
		task.State = models.Running
		task.Metrics = &models.Metrics{}
		log.WithFields(log.Fields{"id": task.ID, "name": task.Name, "host": task.Host}).Info("Task running.")
	} else if _, isNetErr := runErr.(*NetworkError); isNetErr {
		log.WithError(runErr).WithFields(log.Fields{"id": task.ID, "host": task.Host}).Warn("Network error.")
	} else {
		task.State = models.SystemError
		finishedAt := time.Now()
		entry := task.Logs[0]
		entry.EndTime = &finishedAt
		entry.SystemLogs = []string{runErr.Error()}
		log.WithFields(log.Fields{"id": task.ID, "name": task.Name, "host": task.Host, "state": task.State, "error": runErr}).Error("Unable to run task.")
	}

	res <- task
}
// CheckTasks polls every task in the Running state to see whether it has
// finished, and whether it finished successfully.
//
// For each running task it looks up the node assigned to task.Host and calls
// CheckTask on it in a separate goroutine. Each goroutine sends the (possibly
// updated) task back over a channel, and this goroutine saves it with
// DB.UpdateTask. Tasks whose node cannot be found are logged and skipped.
//
// The returned error is non-nil only if the Running tasks cannot be listed;
// per-task failures are logged.
func (m *Main) CheckTasks() error {
	tasks, err := m.DB.ListTasks(0, 0, models.Full, nil, []models.State{models.Running})
	if err != nil {
		return err
	}

	ch := make(chan *models.Task)
	wg := &sync.WaitGroup{}
	for _, task := range tasks {
		node, err := m.DB.GetNode(task.Host)
		if err != nil {
			log.WithError(err).WithFields(log.Fields{"id": task.ID, "name": task.Name, "host": task.Host}).Error("Unable to get node.")
			continue
		}
		// Count only goroutines that are actually started. Adding len(tasks) up
		// front left the counter above zero whenever a node lookup failed, so
		// wg.Wait never returned, ch was never closed, and the loop below hung.
		wg.Add(1)
		go CheckTask(task, node, ch, wg)
	}

	// Close ch once every CheckTask goroutine has reported, ending the range below.
	go func() {
		wg.Wait()
		close(ch)
	}()

	for task := range ch {
		if err := m.DB.UpdateTask(task); err != nil {
			log.WithFields(log.Fields{"id": task.ID, "name": task.Name, "error": err}).Error("Unable to update task.")
		}
	}
	return nil
}
// CheckTask asks node for the status of a running task via RemoteCheck and
// reports the outcome by sending task on res. It calls wg.Done when it returns.
//
// Outcomes:
//   - success: task holds whatever RemoteCheck wrote into it; if its state is
//     no longer Running, the end time of its first log entry is recorded;
//   - *NetworkError: task is left as is (so it is checked again later) and a
//     warning is logged;
//   - any other error: task becomes SystemError, its first log entry gets an
//     end time, and the error text is appended to its system logs.
func CheckTask(task *models.Task, node *models.Node, res chan<- *models.Task, wg *sync.WaitGroup) {
	defer wg.Done()

	switch err := RemoteCheck(task, node.Address()).(type) {
	case nil:
		if task.State != models.Running {
			now := time.Now()
			task.Logs[0].EndTime = &now
			log.WithFields(log.Fields{"id": task.ID, "name": task.Name, "host": task.Host, "state": task.State}).Info("Task finished.")
		}
	case *NetworkError:
		log.WithError(err).WithFields(log.Fields{"id": task.ID, "host": task.Host}).Warn("Network error.")
	default:
		task.State = models.SystemError
		// SystemError ends the task, so record an end time, as RunTask does
		// on its own SystemError path.
		now := time.Now()
		task.Logs[0].EndTime = &now
		task.Logs[0].SystemLogs = append(task.Logs[0].SystemLogs, err.Error())
		log.WithError(err).WithFields(log.Fields{"id": task.ID, "name": task.Name, "host": task.Host, "state": task.State}).Error("Unable to check task.")
	}

	res <- task
}