// Copyright (c) 2016 OpenM++
// This code is licensed under the MIT license (see LICENSE.txt for details)

package main

import (
    "context"
    "os/exec"
    "path/filepath"
    "strings"
    "time"

    "github.com/openmpp/go/ompp/db"
    "github.com/openmpp/go/ompp/helper"
    "github.com/openmpp/go/ompp/omppLog"

    ps "github.com/keybase/go-ps"
)

const jobScanInterval = 1123             // timeout in msec, sleep interval between scanning all job directories
const computeStartStopInterval = 3373    // timeout in msec, interval between starting or stopping computational servers, must be at least 2 * jobScanInterval
const jobQueueScanInterval = 107         // timeout in msec, sleep interval between getting the next job from the queue
const jobOuterScanInterval = 5021        // timeout in msec, sleep interval between scanning the active job directory
const serverTimeoutDefault = 60          // default time in seconds to start or stop a compute server
const minJobTickMs int64 = 1597707959000 // unix milliseconds of 2020-08-17 23:45:59
const jobPositionDefault = 20220817      // default queue job position, e.g. if the queue is empty
const maxComputeErrorsDefault = 8        // default error threshold for a compute server or cluster
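
// isExitSleep (defined elsewhere in this package) is the building block of every
// scan loop below: it sleeps for the given number of milliseconds and reports
// whether the done channel was signalled during the wait. A minimal sketch of a
// helper with that contract, assuming this behavior (the real implementation may
// differ):
//
//    func isExitSleep(ms time.Duration, doneC <-chan bool) bool {
//        select {
//        case <-doneC:
//            return true // shutdown requested: the caller should return
//        case <-time.After(ms * time.Millisecond):
//            return false // interval elapsed: the caller continues its loop
//        }
//    }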

/*
scan the active job directory to find active model run files without run state.
This can be the result of an oms restart or a server reboot.

if an active job file is found and there is no run state then:
    create a run job from the active file
    add it to the list of "outer" jobs (active jobs without run state)

for each job in the outer list:
    find the model process by pid and executable name
    if the process exists then wait until it is done
    check if the file still exists
    read the run_lst row
    if there is no run_lst row then move the job file to history as an error
    else
        if the run state is not completed then update the run state as an error
        and move the file to history according to its status
*/
func scanOuterJobs(doneC <-chan bool) {
    if !theCfg.isJobControl {
        return // job control disabled
    }

    // map active job file path to file content (run job); these are the only jobs with no run state in RunCatalog
    outerJobs := map[string]RunJob{}

    activeDir := filepath.Join(theCfg.jobDir, "active")
    nActive := len(activeDir)
    ptrn := activeDir + string(filepath.Separator) + "*-#-" + theCfg.omsName + "-#-*.json"

    for {
        // find active job files
        fLst := filesByPattern(ptrn, "Error at active job files search")
        if len(fLst) <= 0 {
            if isExitSleep(jobOuterScanInterval, doneC) {
                return
            }
            continue // no active jobs
        }

        // find new active jobs since the last scan which do not exist in the run state list of RunCatalog
        for k := range fLst {

            if _, ok := outerJobs[fLst[k]]; ok {
                continue // this file is already in the outer jobs list
            }

            // get submission stamp, model name, digest and process id from the active job file name
            stamp, _, mn, dgst, _, _, _, pid := parseActivePath(fLst[k][nActive+1:])
            if stamp == "" || mn == "" || dgst == "" || pid <= 0 {
                continue // file name is not an active job file name
            }

            // find run state by model digest and submission stamp
            isFound, _ := theRunCatalog.getRunStateBySubmitStamp(dgst, stamp)
            if isFound {
                continue // this is an active job under oms control
            }

            // run state not found: create run state from the active job file
            var jc RunJob

            isOk, err := helper.FromJsonFile(fLst[k], &jc)
            if err != nil {
                omppLog.Log(err)
            }
            if !isOk || err != nil {
                moveActiveJobToHistory(fLst[k], "", stamp, mn, dgst, "no-model-run-time-stamp") // invalid file content: move to history with unknown status
                continue
            }

            // add the job into the outer jobs list
            outerJobs[fLst[k]] = jc
        }

        // for outer jobs find the process by pid and executable name;
        // if the process has completed then move the job file into the history
        for fp, jc := range outerJobs {

            proc, err := ps.FindProcess(jc.Pid)

            if err == nil && proc != nil &&
                strings.HasSuffix(strings.ToLower(jc.CmdPath), strings.ToLower(proc.Executable())) {
                continue // model still running
            }

            // if the job file no longer exists then remove it from the outer jobs list
            if !fileExist(fp) {
                delete(outerJobs, fp)
                continue
            }

            // get the run_lst row and move to job history according to status:
            // the model process does not exist, so run status must be completed: s=success, x=exit, e=error;
            // if the model status is not completed then it is an error
            var rStat string
            rp, ok := theCatalog.RunStatus(jc.ModelDigest, jc.RunStamp)
            if ok && rp != nil {
                rStat = rp.Status
                if !db.IsRunCompleted(rStat) {
                    rStat = db.ErrorRunStatus
                    _, e := theCatalog.UpdateRunStatus(jc.ModelDigest, jc.RunStamp, db.ErrorRunStatus)
                    if e != nil {
                        omppLog.Log(e)
                    }
                }
            }
            moveActiveJobToHistory(fp, rStat, jc.SubmitStamp, jc.ModelName, jc.ModelDigest, jc.RunStamp)
            delete(outerJobs, fp)
        }

        // wait for doneC or sleep
        if isExitSleep(jobOuterScanInterval, doneC) {
            return
        }
    }
}
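
// How these scanners are typically wired up at service startup, as a minimal
// sketch (the actual startup code lives elsewhere in this package; the channel
// name and shutdown sequence here are illustrative assumptions):
//
//    doneC := make(chan bool)
//    go scanOuterJobs(doneC)
//    go scanRunJobs(doneC)
//    // ... serve requests ...
//    close(doneC) // on shutdown: wakes every isExitSleep so the loops return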

// scan the model run queue, start model runs, start or stop MPI servers
func scanRunJobs(doneC <-chan bool) {
    if !theCfg.isJobControl {
        return // job control disabled: no queue
    }

    lastStartStopTs := time.Now().UnixMilli() // last time when start and stop of computational servers was done

    for {
        // get a job from the queue and run it
        if job, isFound, qPath, compUse, hf, e := theRunCatalog.selectJobFromQueue(); isFound && e == nil {

            _, e = theRunCatalog.runModel(job, qPath, hf, compUse)
            if e != nil {
                omppLog.Log(e)
            }
        } else {
            if e != nil {
                omppLog.Log(e)
                if qPath != "" {
                    moveJobQueueToFailed(qPath, job.SubmitStamp, job.ModelName, job.ModelDigest, "") // cannot run this job: remove it from the queue
                }
            }
        }

        // check whether it is time to start or stop computational servers:
        // the interval must be at least double the job files scan interval to make sure server state files are updated (3373 > 2 * 1123)
        if lastStartStopTs+computeStartStopInterval < time.Now().UnixMilli() {

            // start computational servers or clusters
            startNames, maxStatTime, startExes, startArgs := theRunCatalog.selectToStartCompute()

            for k := range startNames {
                if startExes[k] != "" {
                    go doStartStopCompute(startNames[k], "start", startExes[k], startArgs[k], maxStatTime)
                } else {
                    doStartOnceCompute(startNames[k]) // special case: server is always ready
                }
            }

            // stop computational servers or clusters
            stopNames, maxStopTime, stopExes, stopArgs := theRunCatalog.selectToStopCompute()

            for k := range stopNames {
                if stopExes[k] != "" {
                    go doStartStopCompute(stopNames[k], "stop", stopExes[k], stopArgs[k], maxStopTime)
                } else {
                    doStopCleanupCompute(stopNames[k]) // special case: server never stops
                }
            }
            lastStartStopTs = time.Now().UnixMilli()
        }

        // wait for doneC or sleep
        if isExitSleep(jobQueueScanInterval, doneC) {
            return
        }
    }
}
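
// Go scoping note for the queue loop above: variables declared in the if
// statement's init clause (job, qPath, e, ...) stay in scope in the else
// branch, which is why the error path can still log e and read qPath. A
// generic illustration of the same pattern (produce, use and log are
// hypothetical names):
//
//    if v, err := produce(); err == nil {
//        use(v)
//    } else {
//        log(err) // err and v are both still visible here
//    }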

// start (or stop) a computational server or cluster and create (or delete) the ready state file
func doStartStopCompute(name, state, exe string, args []string, maxTime int64) {

    omppLog.Log(state, ": ", name)

    // create a server start or stop file to signal the server state
    sf := createCompStateFile(name, state)
    if sf == "" {
        omppLog.Log("FAILED to create state file: ", state, " ", name)
        return
    }

    ctx, cancel := context.WithTimeout(context.Background(), time.Duration(maxTime)*time.Millisecond)
    defer cancel()

    // build the command, run it and capture the combined output
    out, err := exec.CommandContext(ctx, exe, args...).CombinedOutput()
    if len(out) > 0 {
        omppLog.Log(string(out))
    }

    // create or delete the server ready file and delete the state file
    isOk := false

    if err == nil {

        readyPath := compReadyPath(name)
        if state == "start" {
            isOk = fileCreateEmpty(false, readyPath)
            if !isOk {
                omppLog.Log("FAILED to create server ready file: ", readyPath)
            }
        } else {
            isOk = fileDeleteAndLog(false, readyPath)
            if !isOk {
                omppLog.Log("FAILED to delete server ready file: ", readyPath)
            }
        }
        if isOk {
            okStart := deleteCompStateFiles(name, "start")
            okStop := deleteCompStateFiles(name, "stop")
            okErr := deleteCompStateFiles(name, "error")
            isOk = okStart && okStop && okErr
        }
        if isOk {
            omppLog.Log("Done: ", state, " ", name)
        }
    } else {
        if createCompStateFile(name, "error") == "" {
            omppLog.Log("FAILED to create error state file: ", name)
        }
        if ctx.Err() == context.DeadlineExceeded {
            omppLog.Log("ERROR server timeout: ", state, " ", name)
        }
        omppLog.Log("Error: ", err)
        omppLog.Log("FAILED: ", state, " ", name)
    }

    // remove this server from the startup or shutdown list
    if state == "start" {
        theRunCatalog.startupCompleted(isOk, name)
    } else {
        theRunCatalog.shutdownCompleted(isOk, name)
    }
}
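
// Note on the timeout above: exec.CommandContext kills the child process when
// the context deadline expires, so CombinedOutput returns a non-nil error and
// ctx.Err() reports context.DeadlineExceeded. A minimal standalone illustration
// of the same pattern (the command and timeout are arbitrary examples, not
// values used by this package):
//
//    ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
//    defer cancel()
//    out, err := exec.CommandContext(ctx, "sleep", "10").CombinedOutput()
//    if ctx.Err() == context.DeadlineExceeded {
//        omppLog.Log("killed after timeout, err: ", err) // the child did not finish
//    }
//    _ = out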

// start a computational server, special case: if the server is always ready then only create the ready state file
func doStartOnceCompute(name string) {

    omppLog.Log("Start: ", name)

    readyPath := compReadyPath(name)

    isOk := fileCreateEmpty(false, readyPath)
    if !isOk {
        omppLog.Log("FAILED to create server ready file: ", readyPath)
    }
    if isOk {
        okStart := deleteCompStateFiles(name, "start")
        okStop := deleteCompStateFiles(name, "stop")
        okErr := deleteCompStateFiles(name, "error")
        isOk = okStart && okStop && okErr
    }
    if isOk {
        omppLog.Log("Done start of: ", name)
    }
}

// stop a computational server, special case: if the server never stops then only clean up the state files
func doStopCleanupCompute(name string) {
    deleteCompStateFiles(name, "start")
    deleteCompStateFiles(name, "stop")
    deleteCompStateFiles(name, "error")
}
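
// Compute server state files, as used by the three functions above (the
// lifecycle is inferred from createCompStateFile / deleteCompStateFiles /
// compReadyPath; exact file naming is defined elsewhere in this package):
//
//    "start" state file - present while a start command is in progress
//    "stop"  state file - present while a stop command is in progress
//    "error" state file - created when a start or stop command fails
//    ready file         - present while the server is ready to accept model runs
//
// On success all transient state files are removed, leaving only the ready
// file (after a start); on failure the error state file is left behind,
// presumably to be counted against the error threshold (maxComputeErrorsDefault).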