/
runCatalog.go
413 lines (365 loc) · 16.6 KB
/
runCatalog.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
// Copyright (c) 2016 OpenM++
// This code is licensed under the MIT license (see LICENSE.txt for details)
package main
import (
"container/list"
"os"
"path/filepath"
"sync"
"time"
"github.com/openmpp/go/ompp/helper"
)
// RunCatalog is the most recent state of model runs for each model.
// It also holds run templates, run options presets, the jobs queue, active and history jobs
// and the state of computational servers or clusters.
type RunCatalog struct {
	rscLock         sync.Mutex                     // mutex to lock for model list operations
	models          map[string]modelRunBasic       // map model digest to basic info to run the model and manage log files
	etcDir          string                         // model run templates directory, if relative then must be relative to oms root directory
	runTemplates    []string                       // list of model run templates (file names)
	mpiTemplates    []string                       // list of model MPI run templates (file names)
	presets         []RunOptionsPreset             // list of preset run options
	runLst          *list.List                     // list of model runs state, elements are *runStateLog
	modelRuns       map[string]map[string]RunState // map each model digest to run stamps to run state and run log path
	JobServiceState                                // jobs service state: paused, resources usage and limits
	queueKeys       []string                       // run submission stamps of model runs waiting in the queue
	queueJobs       map[string]queueJobFile        // model run jobs waiting in the queue
	activeJobs      map[string]runJobFile          // active (currently running) model run jobs
	historyJobs     map[string]historyJobFile      // model run jobs history
	selectedKeys    []string                       // jobs selected from queue to run now
	computeState    map[string]computeItem         // map name of server or cluster to the state of its computational resources
	startupNames    []string                       // names of the servers which are starting now
	shutdownNames   []string                       // names of the servers which are stopping now
}
// theRunCatalog is the package-level run catalog: most recent state of model runs for each model.
var theRunCatalog RunCatalog
// modelRunBasic is the basic info required to run the model and to obtain model logs.
type modelRunBasic struct {
	name     string // model name
	binDir   string // database and .exe directory: directory part of models/bin/model.sqlite
	logDir   string // model log directory
	isLogDir bool   // if true then use model log directory for model run logs
}
// RunCatalogConfig is the "public" state of model run catalog for json import-export.
// It is built by RunCatalog.toPublicConfig as a copy of the catalog lists.
type RunCatalogConfig struct {
	RunTemplates       []string           // list of model run templates
	DefaultMpiTemplate string             // default template to run MPI model
	MpiTemplates       []string           // list of model MPI run templates
	Presets            []RunOptionsPreset // list of preset run options
}
// RunOptionsPreset is the "public" view of model run options preset.
type RunOptionsPreset struct {
	Name    string // name of preset, stem of the file name: run-options.RiskPaths.1-small.json => RiskPaths.1-small
	Options string // run options: content of the preset json file as a string
}
// RunRequest is a request to run the model with specified model options.
// Log to console is always enabled.
// Model run console output is redirected to log file: modelName.YYYY_MM_DD_hh_mm_ss_SSS.console.log
type RunRequest struct {
	ModelName   string            // model name to run
	ModelDigest string            // model digest to run
	RunStamp    string            // run stamp, if empty then auto-generated as timestamp
	Dir         string            // working directory to run the model, if relative then must be relative to oms root directory
	Opts        map[string]string // model run options
	Env         map[string]string // environment variables to set
	Threads     int               // number of modelling threads
	IsMpi       bool              // if true then use MPI to run the model
	Mpi         struct {
		Np          int  // if non-zero then number of MPI processes
		IsNotOnRoot bool // if true then do not run modelling threads on MPI root process
		IsNotByJob  bool // if true then do not allocate resources by job, use CPU, threads and memory as is
	}
	Template  string   // template file name to make run model command line
	Tables    []string // if not empty then output tables or table groups to retain, by default retain all tables
	Microdata struct {
		IsToDb     bool       // if true then store entity microdata in database: -Microdata.ToDb true
		IsInternal bool       // if true then allow to use internal attributes: -Microdata.UseInternal true
		Entity     []struct { // list of entities and attributes: -Microdata.Person age,income -Microdata.Other All
			Name string   // entity name
			Attr []string // list of microdata attributes, it can also be All
		}
	}
	RunNotes []struct {
		LangCode string // model language code
		Note     string // run notes
	}
}
// RunJob is a model run request and run job control: submission stamp and model process id.
type RunJob struct {
	SubmitStamp string // submission timestamp
	Pid         int    // process id
	CmdPath     string // executable path
	RunRequest         // model run request: model name, digest and run options
	Res         RunRes // model run resources: CPU cores and memory
	IsOverLimit bool   // if true then model run resource(s) exceed limit(s)
	QueuePos    int    // one-based position in global queue
	LogFileName string // log file name
	LogPath     string // log file path: log/dir/modelName.RunStamp.console.log
}
// RunRes is model run computational resources: CPU cores and memory.
type RunRes struct {
	Cpu int // cpu cores count
	Mem int // if not zero then memory size in gigabytes
}
// runJobFile is run job control file info: file path, error flag and file content.
type runJobFile struct {
	filePath string // job control file path
	isError  bool   // if true then ignore that file due to error
	RunJob          // job control file content
}
// queueJobFile is queue job control file info: run job control file info
// extended with resources required by preceding queue jobs and queue position.
type queueJobFile struct {
	runJobFile
	preRes   RunRes // model run resources required for queue jobs before this job
	position int    // part of file name: queue position
}
// historyJobFile is job control file info for history job: parts of file name.
type historyJobFile struct {
	filePath    string // job control file path
	isError     bool   // if true then ignore that file due to error
	SubmitStamp string // submission timestamp
	ModelName   string // model name
	ModelDigest string // model digest
	RunStamp    string // run stamp, if empty then auto-generated as timestamp
	JobStatus   string // run status
	RunTitle    string // model run title: run name, task run name or workset name
}
// RunState is model run state.
// Model run console output is redirected to log file: modelName.YYYY_MM_DD_hh_mm_ss_SSS.console.log
// Only exported fields are visible outside of the package, e.g. to json encoding.
type RunState struct {
	ModelName      string    // model name
	ModelDigest    string    // model digest
	RunStamp       string    // model run stamp, may be auto-generated as timestamp
	SubmitStamp    string    // submission timestamp
	IsFinal        bool      // final state, model completed
	UpdateDateTime string    // last update date-time
	RunName        string    // if not empty then run name
	TaskRunName    string    // if not empty then task run name
	IsLog          bool      // if true then use run log file
	LogFileName    string    // log file name
	logPath        string    // log file path: log/dir/modelName.RunStamp.console.log
	isHistory      bool      // if true then it is model run history or run done outside of oms service
	pid            int       // process id
	cmdPath        string    // executable path
	killC          chan bool // channel to kill model process
}
// runStateLog is model run state and log file lines.
// Pointers to runStateLog are the elements of RunCatalog.runLst.
type runStateLog struct {
	RunState            // model run state
	logLineLst []string // model run log lines
}
// RunStateLogPage is model run state and a page of the log lines.
type RunStateLogPage struct {
	RunState           // model run state
	Offset    int      // log page start line
	Size      int      // log page size
	TotalSize int      // total count of run log lines
	Lines     []string // page of log lines
}
// JobServiceState is service state and job control state:
// queue paused flag, resources usage and limits, compute servers timing limits and MPI hostfile settings.
type JobServiceState struct {
	IsQueuePaused     bool    // if true then jobs queue is paused, jobs are not selected from queue
	JobUpdateDateTime string  // last date-time jobs list updated
	MpiRes            RunRes  // MPI total available resources (CPU cores and memory) as sum of all servers or localhost resources
	ActiveTotalRes    RunRes  // MPI active model run resources (CPU cores and memory) used by all oms instances
	ActiveOwnRes      RunRes  // MPI active model run resources (CPU cores and memory) used by this oms instance
	QueueTotalRes     RunRes  // MPI queue model run resources (CPU cores and memory) requested by all oms instances
	QueueOwnRes       RunRes  // MPI queue model run resources (CPU cores and memory) requested by this oms instance
	topQueueRes       RunRes  // resources required for MPI queue first job
	MpiErrorRes       RunRes  // MPI computational resources on "error" servers
	LocalRes          RunRes  // localhost non-MPI jobs total resources limits
	LocalActiveRes    RunRes  // localhost non-MPI jobs resources used by this instance to run models
	LocalQueueRes     RunRes  // localhost non-MPI jobs queue resources for this oms instance
	isLeader          bool    // if true then this oms instance is a leader
	maxStartTime      int64   // max time in milliseconds to start compute server or cluster
	maxStopTime       int64   // max time in milliseconds to stop compute server or cluster
	maxIdleTime       int64   // max idle time in milliseconds before stopping server or cluster
	lastStartStopTs   int64   // last time when start or stop of computational servers was done
	maxComputeErrors  int     // errors threshold for compute server or cluster
	jobLastPosition   int     // last job position in the queue
	jobFirstPosition  int     // minimal job position in the queue
	hostFile          hostIni // MPI jobs hostfile settings
}
// computeItem is the state of a computational server or cluster.
type computeItem struct {
	name       string   // name of server or cluster
	state      string   // state: start, stop, ready, error, empty "" means power off
	totalRes   RunRes   // total computational resources (CPU cores and memory)
	usedRes    RunRes   // resources (CPU cores and memory) used by all oms instances
	ownRes     RunRes   // resources (CPU cores and memory) used by this instance
	errorCount int      // number of incomplete starts, stops and errors
	lastUsedTs int64    // last activity time (unix milliseconds): server start, stop or used
	startExe   string   // name of executable to start server, e.g.: /bin/sh
	startArgs  []string // arguments to start server, e.g.: -c start.sh my-server-name
	stopExe    string   // name of executable to stop server, e.g.: /bin/sh
	stopArgs   []string // arguments to stop server, e.g.: -c stop.sh my-server-name
}
// computeUse is computational server or cluster usage: resources used and optional usage file path.
type computeUse struct {
	name     string // name of server or cluster
	RunRes          // used computational resources (CPU cores and memory)
	filePath string // if not empty then compute use file path
}
// hostIni is MPI jobs process, threads and hostfile config from job.ini file.
// Examples in field comments show the job.ini key and a sample value.
type hostIni struct {
	maxThreads int    // max number of modelling threads per MPI process, zero means unlimited
	isUse      bool   // if true then create and use hostfile to run MPI jobs
	dir        string // HostFileDir = models/log
	hostName   string // HostName = @-HOST-@
	cpuCores   string // CpuCores = @-CORES-@
	rootLine   string // RootLine = cpm slots=1 max_slots=1
	hostLine   string // HostLine = @-HOST-@ slots=@-CORES-@
}
// jobControlState is job control state passed to refreshCatalog to restore the jobs queue.
type jobControlState struct {
	Queue []string // jobs queue, copied into run catalog queue keys by refreshCatalog
}
const (
	// logTickTimeout is timeout in msec, wait on stdout and stderr polling.
	logTickTimeout = 7

	// defaultMpiTemplate is file name of MPI model run template by default.
	defaultMpiTemplate = "mpi.ModelRun.template.txt"
)
// refreshCatalog resets the state of the most recent model runs for each model.
//
// It re-reads names of run templates (run.*.template.txt), MPI run templates (mpi.*.template.txt)
// and run options presets (run-options.*.json) from etcDir, rebuilds basic model info from theCatalog,
// keeps run history only for the models which still exist and resets job control state.
// Jobs queue is left paused until jobs state is updated from current files.
// If jsc is not nil then jsc.Queue is used as initial content of the queue keys.
//
// Directory scanning and file reading are done before rscLock is taken;
// all run catalog fields are updated only under the lock.
// Errors of listing or reading template and preset files are skipped, returned error is always nil.
func (rsc *RunCatalog) refreshCatalog(etcDir string, jsc *jobControlState) error {

	// collect templates and presets into local variables:
	// catalog fields are read under the lock by toPublicConfig() and must not be updated outside of the lock
	runTemplates := []string{}
	mpiTemplates := []string{}
	presets := []RunOptionsPreset{}

	if dirExist(etcDir) {

		// baseNames returns base names of the files in etc directory matching the pattern
		baseNames := func(pattern string) []string {
			names := []string{}
			fl, err := filepath.Glob(filepath.Join(etcDir, pattern))
			if err != nil {
				return names // best effort: skip on errors
			}
			for _, p := range fl {
				f := filepath.Base(p)
				if f != "." && f != ".." && f != "/" && f != "\\" {
					names = append(names, f)
				}
			}
			return names
		}

		// get list of template files
		runTemplates = baseNames("run.*.template.txt")
		mpiTemplates = baseNames("mpi.*.template.txt")

		// read all run options preset files
		// keep stem of preset file name: run-options.RiskPaths.1-small.json => RiskPaths.1-small
		// and file content as string
		if fl, err := filepath.Glob(filepath.Join(etcDir, "run-options.*.json")); err == nil {
			for _, p := range fl {
				f := filepath.Base(p)
				if len(f) < len("run-options.*.json") { // file name must be at least that size
					continue
				}
				bt, err := os.ReadFile(p) // read entire file
				if err != nil {
					continue // skip on errors
				}
				presets = append(presets,
					RunOptionsPreset{
						Name:    f[len("run-options.") : len(f)-len(".json")], // stem of the file: skip prefix and suffix
						Options: string(bt),
					})
			}
		}
	}

	// make all models basic info: name, digest and files location
	mbs := theCatalog.allModels()
	rbs := make(map[string]modelRunBasic, len(mbs))

	for idx := range mbs {
		rbs[mbs[idx].digest] = modelRunBasic{
			name:     mbs[idx].name,
			binDir:   mbs[idx].binDir,
			logDir:   mbs[idx].logDir,
			isLogDir: mbs[idx].isLogDir,
		}
	}

	// lock and update run state catalog
	rsc.rscLock.Lock()
	defer rsc.rscLock.Unlock()

	// update etc directory, list of templates and presets
	rsc.etcDir = etcDir
	rsc.runTemplates = runTemplates
	rsc.mpiTemplates = mpiTemplates
	rsc.presets = presets

	// copy existing models run history
	rLst := list.New()

	if rsc.runLst != nil {
		for re := rsc.runLst.Front(); re != nil; re = re.Next() {

			rs, ok := re.Value.(*runStateLog) // model run state expected
			if !ok || rs == nil {
				continue
			}
			if _, ok = rbs[rs.ModelDigest]; ok { // copy existing run history
				rLst.PushBack(rs)
			}
		}
	}
	rsc.runLst = rLst

	// model log history: add new models and delete existing models
	if rsc.modelRuns == nil {
		rsc.modelRuns = map[string]map[string]RunState{}
	}
	// if model deleted then delete model logs history
	for d := range rsc.modelRuns {
		if _, ok := rbs[d]; !ok {
			delete(rsc.modelRuns, d)
		}
	}
	// if new model added then add new empty logs history
	for d := range rbs {
		if _, ok := rsc.modelRuns[d]; !ok {
			rsc.modelRuns[d] = map[string]RunState{}
		}
	}
	rsc.models = rbs

	// cleanup jobs control state
	rsc.IsQueuePaused = true // pause jobs queue until jobs state is updated from current files
	rsc.JobUpdateDateTime = helper.MakeDateTime(time.Now())

	rsc.queueKeys = []string{}
	rsc.activeJobs = map[string]runJobFile{}
	rsc.queueJobs = map[string]queueJobFile{}
	rsc.historyJobs = make(map[string]historyJobFile, theCfg.runHistoryMaxSize)
	rsc.computeState = map[string]computeItem{}
	rsc.jobLastPosition = jobPositionDefault + 1
	rsc.jobFirstPosition = jobPositionDefault - 1

	if rsc.maxComputeErrors <= 1 {
		rsc.maxComputeErrors = maxComputeErrorsDefault
	}
	if rsc.selectedKeys == nil {
		rsc.selectedKeys = []string{}
	}
	if rsc.startupNames == nil {
		rsc.startupNames = []string{}
	}
	if rsc.shutdownNames == nil {
		rsc.shutdownNames = []string{}
	}

	// restore jobs queue from job control state, if supplied
	if jsc != nil && len(jsc.Queue) > 0 {
		rsc.queueKeys = append(rsc.queueKeys, jsc.Queue...)
	}

	return nil
}
// toPublicConfig returns the "public" configuration of model run catalog:
// copies of run templates, MPI run templates and run options presets lists
// and the default MPI template file name.
// Lists are copied under the catalog lock; result lists are never nil.
func (rsc *RunCatalog) toPublicConfig() *RunCatalogConfig {

	rsc.rscLock.Lock()
	defer rsc.rscLock.Unlock()

	pub := &RunCatalogConfig{DefaultMpiTemplate: defaultMpiTemplate}

	// append to an empty non-nil slice: detached copy, zero length source yields empty (not nil) list
	pub.RunTemplates = append([]string{}, rsc.runTemplates...)
	pub.MpiTemplates = append([]string{}, rsc.mpiTemplates...)
	pub.Presets = append([]RunOptionsPreset{}, rsc.presets...)

	return pub
}
// allModels returns a copy of basic info about all models in the run catalog, keyed by model digest.
// The copy is made under the catalog lock, result is never nil.
func (rsc *RunCatalog) allModels() map[string]modelRunBasic {

	rsc.rscLock.Lock()
	defer rsc.rscLock.Unlock()

	snapshot := make(map[string]modelRunBasic, len(rsc.models))

	for digest := range rsc.models {
		snapshot[digest] = rsc.models[digest]
	}
	return snapshot
}