-
Notifications
You must be signed in to change notification settings - Fork 1
/
runModel.go
740 lines (638 loc) · 22.9 KB
/
runModel.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
// Copyright (c) 2016 OpenM++
// This code is licensed under the MIT license (see LICENSE.txt for details)
package main
import (
"bufio"
"errors"
"io"
"os"
"os/exec"
"path/filepath"
"sort"
"strconv"
"strings"
"text/template"
"time"
"github.com/openmpp/go/ompp/db"
"github.com/openmpp/go/ompp/helper"
"github.com/openmpp/go/ompp/omppLog"
)
// runModel starts new model run and returns run state.
// If run stamp not specified in the run job then unique timestamp is used as run stamp.
// Model run console output redirected to log file: models/log/modelName.runStamp.console.log
//
// Run options from the job are converted into model command line arguments.
// Tables to retain, microdata options, run description and run notes are passed to the model
// through an ini-file created in the model log directory.
// On any error before the model process started the job file is moved from the queue to failed
// and returned run state is marked as final (IsFinal = true).
// After the model process started a background goroutine collects console output,
// waits for the model to finish, deletes computational resources usage files
// and moves the job file from active to history.
func (rsc *RunCatalog) runModel(job *RunJob, queueJobPath string, hfCfg hostIni, compUse []computeUse) (*RunState, error) {

	// make model process run stamp, if not specified then use timestamp by default
	ts, tNow := theCatalog.getNewTimeStamp()
	rStamp := helper.CleanPath(job.RunStamp)
	if rStamp == "" {
		rStamp = ts
	}

	// new run state
	rs := &RunState{
		ModelName:      job.ModelName,
		ModelDigest:    job.ModelDigest,
		RunStamp:       rStamp,
		SubmitStamp:    job.SubmitStamp,
		UpdateDateTime: helper.MakeDateTime(tNow),
	}

	// set directories: work directory and bin model.exe directory
	// if bin directory is relative then it must be relative to oms root directory
	// re-base it to model work directory
	binRoot, _ := theCatalog.getModelDir()

	mb, ok := theCatalog.modelBasicByDigestOrName(rs.ModelDigest)
	if !ok {
		err := errors.New("Model not found: " + rs.ModelName + ": " + rs.ModelDigest)
		omppLog.Log("Model run error: ", err)
		moveJobQueueToFailed(queueJobPath, rs.SubmitStamp, rs.ModelName, rs.ModelDigest, rStamp)
		rs.IsFinal = true
		return rs, err // exit with error: model failed to start
	}

	binDir := mb.binDir
	wDir := binDir
	if job.Dir != "" {
		wDir = filepath.Join(binRoot, job.Dir)
	}
	binDir, err := filepath.Rel(wDir, binDir)
	if err != nil {
		binDir = binRoot
	}

	// make file path for model console log output
	if mb.isLogDir {
		if mb.logDir == "" {
			mb.logDir, mb.isLogDir = theCatalog.getModelLogDir()
		}
	}
	if mb.isLogDir {
		if mb.logDir == "" {
			mb.logDir = binDir
		}
		rs.IsLog = mb.isLogDir
		rs.LogFileName = rs.ModelName + "." + rStamp + ".console.log"
		rs.logPath = filepath.Join(mb.logDir, rs.LogFileName)
	}

	// make model run command line arguments, starting from process run stamp and log options
	mArgs := []string{}
	mArgs = append(mArgs, "-OpenM.RunStamp", rStamp)
	mArgs = append(mArgs, "-OpenM.LogToConsole", "true")
	mArgs = append(mArgs, "-OpenM.LogToFile", "false")

	importDbLcDot := strings.ToLower("-ImportDb.")
	microdataLcDot := strings.ToLower("-microdata.")
	dotRunDescrLc := strings.ToLower(".RunDescription")

	entAttrs := theCatalog.entityAttrsByDigest(rs.ModelDigest)
	descrNotes := []db.DescrNote{}

	// append model run options from run request
	for krq, val := range job.Opts {

		if len(krq) < 1 { // skip empty run options
			continue
		}

		// command line argument key starts with "-" ex: "-OpenM.Threads"
		key := krq
		if krq[0] != '-' {
			key = "-" + krq
		}

		// save run name and task run name to return as part of run state
		if strings.EqualFold(key, "-OpenM.RunName") {
			rs.RunName = val
		}
		if strings.EqualFold(key, "-OpenM.TaskRunName") {
			rs.TaskRunName = val
		}

		// thread count MUST be specified using request Threads
		if strings.EqualFold(key, "-OpenM.Threads") {
			continue // skip number of threads option: use request Threads value instead
		}
		// MPI "not on root" flag
		if strings.EqualFold(key, "-OpenM.NotOnRoot") {
			continue // skip MPI "not on root" flag: use request Mpi.IsNotOnRoot boolean instead
		}
		if strings.EqualFold(key, "-OpenM.LogToConsole") {
			continue // skip log to console input run option: it is already on
		}
		if strings.EqualFold(key, "-OpenM.LogToFile") {
			continue // skip log to file input run option: replaced by console output
		}
		if strings.EqualFold(key, "-OpenM.LogFilePath") {
			continue // skip log file path input run option: replaced by console output
		}
		if strings.EqualFold(key, "-OpenM.Database") {
			continue // database connection string not allowed as run option
		}
		if strings.HasPrefix(strings.ToLower(key), importDbLcDot) {
			continue // import database connection string not allowed as run option
		}

		// run description option: -LangCode.RunDescription, it is passed to the model through ini-file
		if strings.HasSuffix(strings.ToLower(key), dotRunDescrLc) {

			if 1+len(dotRunDescrLc) >= len(key) {
				err = errors.New("Model run error: invalid run description key: " + key)
				omppLog.Log(err)
				moveJobQueueToFailed(queueJobPath, rs.SubmitStamp, rs.ModelName, rs.ModelDigest, rStamp)
				rs.IsFinal = true
				return rs, err // exit with error: invalid run description key, no language code
			}
			lc := key[1:(len(key) - len(dotRunDescrLc))] // language code

			idx := -1
			for k := range descrNotes {
				if descrNotes[k].LangCode == lc {
					idx = k
					break
				}
			}
			if idx < 0 {
				idx = len(descrNotes)
				descrNotes = append(descrNotes, db.DescrNote{LangCode: lc})
			}
			descrNotes[idx].Descr = helper.QuoteForIni(val)

			continue // use ini-file for run description
		}

		// if this is microdata run option then microdata must be enabled
		// do not allow microdata options which are part of Microdata run request:
		//   -Microdata.ToDb -Microdata.UseInternal
		//   -Microdata.All  -Microdata.anyEntityName
		if strings.HasPrefix(strings.ToLower(key), microdataLcDot) {

			if !theCfg.isMicrodata {
				err = errors.New("Model run error: microdata not allowed: " + rs.ModelName + ": " + rs.ModelDigest)
				omppLog.Log(err)
				moveJobQueueToFailed(queueJobPath, rs.SubmitStamp, rs.ModelName, rs.ModelDigest, rStamp)
				rs.IsFinal = true
				return rs, err // exit with error: microdata not allowed
			}

			subKey := key[len(microdataLcDot):]

			if strings.EqualFold(subKey, "All") || strings.EqualFold(subKey, "ToDb") || strings.EqualFold(subKey, "UseInternal") {
				err = errors.New("Model run error: incorrect use of run option: " + key + ": " + rs.ModelName + ": " + rs.ModelDigest)
				omppLog.Log(err)
				moveJobQueueToFailed(queueJobPath, rs.SubmitStamp, rs.ModelName, rs.ModelDigest, rStamp)
				rs.IsFinal = true
				return rs, err // exit with error: incorrect microdata option
			}
			for k := range entAttrs {
				if subKey == entAttrs[k].Name {
					err = errors.New("Model run error: incorrect use of run option: " + key + ": " + rs.ModelName + ": " + rs.ModelDigest)
					omppLog.Log(err)
					moveJobQueueToFailed(queueJobPath, rs.SubmitStamp, rs.ModelName, rs.ModelDigest, rStamp)
					rs.IsFinal = true
					return rs, err // exit with error: incorrect microdata option
				}
			}
		}

		mArgs = append(mArgs, key, val) // append command line argument key and value
	}

	// use job control resources if not explicitly disabled and create hostfile
	hfPath := ""

	if job.IsMpi && !job.Mpi.IsNotByJob {

		hfPath, err = createHostFile(job, hfCfg, compUse)
		if err != nil {
			omppLog.Log("Model run error: ", err)
			moveJobQueueToFailed(queueJobPath, rs.SubmitStamp, rs.ModelName, rs.ModelDigest, rStamp)
			rs.IsFinal = true
			return rs, err
		}
	}

	// append threads number if required
	if job.Res.ThreadCount > 1 {
		mArgs = append(mArgs, "-OpenM.Threads", strconv.Itoa(job.Res.ThreadCount))
	}
	if job.IsMpi && job.Mpi.IsNotOnRoot {
		mArgs = append(mArgs, "-OpenM.NotOnRoot")
	}

	// if list of tables to retain is not empty then put the list into ini-file:
	//
	//   [Tables]
	//   Retain = ageSexIncome, AdditionalTables
	//
	// if microdata requested then put microdata options into ini-file:
	//
	//   [Microdata]
	//   ToDb = true
	//   UseInternal = true
	//   Person = age,income
	//   Other = All
	//
	iniContent := ""

	// append tables to retain to ini file content
	if len(job.Tables) > 0 {
		iniContent += "[Tables]" + "\n" + "Retain = " + strings.Join(job.Tables, ", ") + "\n"
	}

	// append microdata run options to ini file content
	if theCfg.isMicrodata && len(entAttrs) > 0 && job.Microdata.IsToDb && len(job.Microdata.Entity) > 0 {

		iniContent += "[Microdata]" + "\n" + "ToDb = true\n"

		if job.Microdata.IsInternal {
			iniContent += "UseInternal = true\n"
		}

		// for each entity check if All attributes included or attributes must be specified as comma separated list
		for k := range job.Microdata.Entity {

			// find entity name in the list of model entities
			eIdx := -1
			for j := range entAttrs {
				if entAttrs[j].Name == job.Microdata.Entity[k].Name {
					eIdx = j
					break
				}
			}
			if eIdx < 0 || eIdx >= len(entAttrs) {
				err = errors.New("Model run error: invalid microdata entity: " + job.Microdata.Entity[k].Name + ": " + rs.ModelName + ": " + rs.ModelDigest)
				omppLog.Log(err)
				moveJobQueueToFailed(queueJobPath, rs.SubmitStamp, rs.ModelName, rs.ModelDigest, rStamp)
				rs.IsFinal = true
				return rs, err // exit with error: microdata entity name not found
			}

			// check if all entity attributes included in run microdata
			na := len(job.Microdata.Entity[k].Attr)
			isAll := na == 1 && job.Microdata.Entity[k].Attr[0] == "All"

			if !isAll {
				attrs := make([]string, na)
				copy(attrs, job.Microdata.Entity[k].Attr)
				sort.Strings(attrs)

				for j := range entAttrs[eIdx].Attr {

					if !job.Microdata.IsInternal && entAttrs[eIdx].Attr[j].IsInternal {
						continue // skip: this model run does not using internal attributes
					}
					n := sort.SearchStrings(attrs, entAttrs[eIdx].Attr[j].Name)
					isAll = n >= 0 && n < na && attrs[n] == entAttrs[eIdx].Attr[j].Name
					if !isAll {
						break
					}
				}
			}

			// append entity attributes to ini file content: EntityName = All or EntityName = AttrA, AttrB
			if isAll {
				iniContent += job.Microdata.Entity[k].Name + " = All\n"
			} else {
				iniContent += job.Microdata.Entity[k].Name + " = " + strings.Join(job.Microdata.Entity[k].Attr, ",") + "\n"
			}
		}
	}

	// isModelLang returns true if language code is one of the model languages
	isModelLang := func(lc string) bool {
		langLst, _ := theCatalog.LangListByDigestOrName(rs.ModelDigest)
		for j := range langLst {
			if langLst[j].LangCode == lc {
				return true
			}
		}
		return false
	}

	// if run description or notes specified then use ini-file:
	//
	//   [EN]
	//   RunDescription = "model run #7 with 50,000 cases"
	//   RunNotesPath = run_notes-in-english.md
	//
	// save run notes into the file(s) and append that file path to the ini-file content
	for _, rn := range job.RunNotes {

		if rn.Note == "" {
			continue
		}
		if !rs.IsLog {
			e := errors.New("Unable to save run notes: " + rs.ModelName + ": " + rs.ModelDigest)
			omppLog.Log("Model run error: ", e)
			moveJobQueueToFailed(queueJobPath, rs.SubmitStamp, rs.ModelName, rs.ModelDigest, rStamp)
			rs.IsFinal = true
			return rs, e
		}

		// validate language code BEFORE it is used as part of the notes file name:
		// language code comes from the run request and otherwise could direct the file outside of log directory
		if !isModelLang(rn.LangCode) {
			err = errors.New("Model run error: invalid language code: " + rn.LangCode + ": " + rs.ModelName + ": " + rs.ModelDigest)
			omppLog.Log(err)
			moveJobQueueToFailed(queueJobPath, rs.SubmitStamp, rs.ModelName, rs.ModelDigest, rStamp)
			rs.IsFinal = true
			return rs, err // exit with error: language code not found
		}

		p, e := filepath.Abs(filepath.Join(mb.logDir, rStamp+".run_notes."+rn.LangCode+".md"))
		if e == nil {
			e = os.WriteFile(p, []byte(rn.Note), 0644)
		}
		awd := ""
		if e == nil {
			if awd, e = filepath.Abs(wDir); e == nil {
				p, e = filepath.Rel(awd, p)
			}
		}
		if e != nil {
			omppLog.Log("Model run error: ", e)
			moveJobQueueToFailed(queueJobPath, rs.SubmitStamp, rs.ModelName, rs.ModelDigest, rStamp)
			rs.IsFinal = true
			return rs, e
		}

		// store path to notes .md file instead of actual notes
		dnIdx := -1
		for k := range descrNotes {
			if descrNotes[k].LangCode == rn.LangCode {
				dnIdx = k
				break
			}
		}
		if dnIdx < 0 {
			dnIdx = len(descrNotes)
			descrNotes = append(descrNotes, db.DescrNote{LangCode: rn.LangCode})
		}
		descrNotes[dnIdx].Note = p
	}

	// validate run description and notes language code and append it to ini file content
	for k := range descrNotes {

		if !isModelLang(descrNotes[k].LangCode) {
			err = errors.New("Model run error: invalid language code: " + descrNotes[k].LangCode + ": " + rs.ModelName + ": " + rs.ModelDigest)
			omppLog.Log(err)
			moveJobQueueToFailed(queueJobPath, rs.SubmitStamp, rs.ModelName, rs.ModelDigest, rStamp)
			rs.IsFinal = true
			return rs, err // exit with error: language code not found
		}

		iniContent += "[" + descrNotes[k].LangCode + "]" + "\n"

		if descrNotes[k].Descr != "" {
			iniContent += "RunDescription = " + descrNotes[k].Descr + "\n"
		}
		if descrNotes[k].Note != "" {
			iniContent += "RunNotesPath = " + descrNotes[k].Note + "\n" // path to notes .md file
		}
	}

	// create ini file and append -ini fileName.ini to model run options
	if iniContent != "" {

		p, e := filepath.Abs(filepath.Join(mb.logDir, rStamp+"."+mb.model.Name+".ini"))
		if e == nil {
			e = os.WriteFile(p, []byte(iniContent), 0644)
		}
		awd := ""
		if e == nil {
			if awd, e = filepath.Abs(wDir); e == nil {
				p, e = filepath.Rel(awd, p)
			}
		}
		if e != nil {
			omppLog.Log("Model run error: ", e)
			moveJobQueueToFailed(queueJobPath, rs.SubmitStamp, rs.ModelName, rs.ModelDigest, rStamp)
			rs.IsFinal = true
			return rs, e
		}
		mArgs = append(mArgs, "-ini", p) // append ini file path to command line arguments
	}

	// cleanup helpers
	delComputeUse := func(cuLst []computeUse) {
		for _, cu := range cuLst {
			if cu.filePath != "" {
				fileDeleteAndLog(false, cu.filePath)
			}
		}
	}
	cleanAndReturn := func(e error, rState *RunState, qPath string, cuLst []computeUse) (*RunState, error) {
		omppLog.Log("Error at starting model: ", e)
		delComputeUse(cuLst)
		moveJobQueueToFailed(qPath, rState.SubmitStamp, rState.ModelName, rState.ModelDigest, rState.RunStamp)
		rState.IsFinal = true
		return rState, errors.New("Error at starting model " + rState.ModelName + ": " + e.Error())
	}

	// assume model exe name is the same as model name
	mExe := helper.CleanPath(rs.ModelName)

	cmd, err := rsc.makeCommand(mExe, binDir, wDir, mb.dbPath, mArgs, job.RunRequest, job.Res.ProcessCount, hfPath)
	if err != nil {
		omppLog.Log("Error at starting model: ", err)
		moveJobQueueToFailed(queueJobPath, rs.SubmitStamp, rs.ModelName, rs.ModelDigest, rStamp)
		rs.IsFinal = true
		return rs, errors.New("Error at starting model " + rs.ModelName + ": " + err.Error())
	}

	// create job usage file for each computational server
	isErr := false
	for k := 0; !isErr && k < len(compUse); k++ {
		compUse[k].filePath = compUsedPath(compUse[k].name, rs.SubmitStamp, compUse[k].Cpu, compUse[k].Mem)
		isErr = !fileCreateEmpty(false, compUse[k].filePath)
	}
	if isErr {
		omppLog.Log("Error at starting model: ", rs.ModelName, " ", rs.ModelDigest, " ", rs.SubmitStamp)
		delComputeUse(compUse)
		moveJobQueueToFailed(queueJobPath, rs.SubmitStamp, rs.ModelName, rs.ModelDigest, rStamp)
		rs.IsFinal = true
		return rs, errors.New("Error at starting model " + rs.ModelName + " " + rs.ModelDigest)
	}

	// connect console output to log line array
	outPipe, err := cmd.StdoutPipe()
	if err != nil {
		return cleanAndReturn(err, rs, queueJobPath, compUse)
	}
	errPipe, err := cmd.StderrPipe()
	if err != nil {
		return cleanAndReturn(err, rs, queueJobPath, compUse)
	}
	outDoneC := make(chan bool, 1)
	errDoneC := make(chan bool, 1)
	rs.killC = make(chan bool, 1)

	// append console output to log lines array
	doLog := func(rState *RunState, r io.Reader, done chan<- bool) {
		sc := bufio.NewScanner(r)
		for sc.Scan() {
			rsc.updateRunStateLog(rState, false, sc.Text())
		}
		// scanner stops on error, e.g. output line is too long:
		// keep reading the pipe until EOF, otherwise model process may block on a full output pipe and never exit
		if e := sc.Err(); e != nil {
			omppLog.Log("Model console output error: ", rState.ModelName, " ", rState.RunStamp, ": ", e)
			_, _ = io.Copy(io.Discard, r) // best effort drain, output is discarded
		}
		done <- true
		close(done)
	}

	// run state initialized: append it to the run state list
	// create model run log file
	rsc.createRunStateLog(rs)

	// start console output listners
	go doLog(rs, outPipe, outDoneC)
	go doLog(rs, errPipe, errDoneC)

	// start the model
	omppLog.Log("Run model: ", mExe, " in directory: ", wDir)
	if rs.logPath != "" {
		omppLog.Log("Run model: ", mExe, " log: ", rs.logPath)
	}
	omppLog.Log(strings.Join(cmd.Args, " "))

	rs.cmdPath = cmd.Path
	rsc.updateRunStateProcess(rs, false)

	err = cmd.Start()
	if err != nil {
		omppLog.Log("Model run error: ", err)
		delComputeUse(compUse)
		moveJobQueueToFailed(queueJobPath, rs.SubmitStamp, rs.ModelName, rs.ModelDigest, rStamp)
		rsc.updateRunStateLog(rs, true, err.Error())
		rs.IsFinal = true
		return rs, err // exit with error: model failed to start
	}
	// else model started
	rs.pid = cmd.Process.Pid
	rsc.updateRunStateProcess(rs, false)

	// move job file form queue to active
	activeJobPath, _ := moveJobToActive(queueJobPath, rs, job.Res, rs.RunStamp)

	// wait until run completed or terminated
	go func(rState *RunState, cmd *exec.Cmd, jobPath string, cuLst []computeUse) {

		// periodic wake up of the wait loop, ticker is stopped when this goroutine exits to release its resources
		logTck := time.NewTicker(logTickTimeout * time.Millisecond)
		defer logTck.Stop()

		// wait until stdout and stderr closed
		for outDoneC != nil || errDoneC != nil {
			select {
			case _, ok := <-outDoneC:
				if !ok {
					outDoneC = nil
				}
			case _, ok := <-errDoneC:
				if !ok {
					errDoneC = nil
				}
			case isKill, ok := <-rState.killC:
				if !ok {
					rState.killC = nil
				}
				if isKill && ok {
					omppLog.Log("Kill run: ", rState.ModelName, " ", rState.ModelDigest, " ", rState.RunName, " ", rState.RunStamp)
					if e := cmd.Process.Kill(); e != nil {
						omppLog.Log(e)
					}
				}
			case <-logTck.C:
			}
		}

		// wait for model run to be completed
		e := cmd.Wait()
		if e != nil {
			omppLog.Log("Model run error: ", e)
			delComputeUse(cuLst)
			rsc.updateRunStateLog(rState, true, e.Error())
			moveActiveJobToHistory(jobPath, db.ErrorRunStatus, rState.SubmitStamp, rState.ModelName, rState.ModelDigest, rState.RunStamp)
			_, e = theCatalog.UpdateRunStatus(rState.ModelDigest, rState.RunStamp, db.ErrorRunStatus)
			if e != nil {
				omppLog.Log(e)
			}
			return
		}
		// else: completed OK
		rsc.updateRunStateLog(rState, true, "")
		delComputeUse(cuLst)
		moveActiveJobToHistory(jobPath, db.DoneRunStatus, rState.SubmitStamp, rState.ModelName, rState.ModelDigest, rState.RunStamp)
	}(rs, cmd, activeJobPath, compUse)

	return rs, nil
}
// makeCommand builds the command used to start the model process.
//
// An MPI run always goes through a template. When the request has no template, the model-specific
// "mpi.ModelName.template.txt" is used if it is in the known MPI templates list; otherwise
// defaultMpiTemplate is used.
// With a template, the file from the etc directory is executed. Its first non-empty output line
// becomes the executable and every following non-empty line becomes one argument.
// Without a template (non-MPI run), the model executable is started directly from binDir
// with mArgs as its arguments.
// For a non-MPI run, the work directory is set on the command and the request environment
// variables (empty keys or values are skipped) are appended to the current process environment.
// For an MPI run, both are only passed to the template as Dir and Env.
func (rsc *RunCatalog) makeCommand(mExe, binDir, workDir, dbPath string, mArgs []string, req RunRequest, procCount int, hfPath string) (*exec.Cmd, error) {

	// MPI run requires a template: look for model-specific one first, fall back to default
	if req.IsMpi && req.Template == "" {
		modelTmpl := "mpi." + req.ModelName + ".template.txt"
		for _, name := range theRunCatalog.mpiTemplates {
			if name == modelTmpl {
				req.Template = modelTmpl
				break
			}
		}
		if req.Template == "" {
			req.Template = defaultMpiTemplate
		}
	}

	var cmd *exec.Cmd

	switch {
	case req.Template != "":
		// template based command line
		tmpl, err := template.ParseFiles(filepath.Join(rsc.etcDir, req.Template))
		if err != nil {
			return nil, err
		}
		absDir, err := filepath.Abs(workDir)
		if err != nil {
			return nil, err
		}

		// "not on root" MPI run needs one extra process for the root
		mpiNp := procCount
		if req.IsMpi && req.Mpi.IsNotOnRoot {
			mpiNp = procCount + 1
		}

		params := struct {
			ModelName string            // model name
			ExeStem   string            // base part of model exe name, usually modelName
			Dir       string            // work directory to run the model
			BinDir    string            // bin directory where model.exe is located
			DbPath    string            // path to sqlite database file: models/bin/model.sqlite
			MpiNp     int               // number of MPI processes
			HostFile  string            // if not empty then absolute path to hostfile
			Args      []string          // model command line arguments
			Env       map[string]string // environment variables to run the model
		}{
			ModelName: req.ModelName,
			ExeStem:   mExe,
			Dir:       absDir,
			BinDir:    binDir,
			DbPath:    dbPath,
			MpiNp:     mpiNp,
			HostFile:  hfPath,
			Args:      mArgs,
			Env:       req.Env,
		}

		var out strings.Builder
		if err = tmpl.Execute(&out, params); err != nil {
			return nil, err
		}

		// first non-empty line is executable, each other non-empty line is one argument
		exeName := ""
		var exeArgs []string
		for _, line := range strings.Split(strings.ReplaceAll(out.String(), "\r", "\n"), "\n") {
			line = strings.TrimSpace(line)
			switch {
			case line == "":
				// skip empty lines
			case exeName == "":
				exeName = line
			default:
				exeArgs = append(exeArgs, line)
			}
		}
		if exeName == "" {
			return nil, errors.New("Error: empty template processing results, cannot run the model: " + req.ModelName)
		}
		cmd = exec.Command(exeName, exeArgs...)

	case !req.IsMpi:
		// regular model run without template: ./modelExe -OpenM.LogToFile true ...etc...
		exePath := "./" + mExe
		if binDir != "" && binDir != "." && binDir != "./" {
			exePath = filepath.Join(binDir, mExe)
		}
		cmd = exec.Command(exePath, mArgs...)
	}

	// MPI run: work directory and environment are passed to the template only
	if req.IsMpi {
		return cmd, nil
	}

	cmd.Dir = workDir

	if len(req.Env) == 0 {
		return cmd, nil
	}
	env := os.Environ()
	for k, v := range req.Env {
		if k == "" || v == "" {
			continue
		}
		env = append(env, k+"="+v)
	}
	cmd.Env = env

	return cmd, nil
}
// stopModelRun kills a model run by run stamp,
// or removes a run request from the queue by submit stamp or by run stamp.
//
// Return values:
//   - true if a model run state or a queue job file was found, false if nothing found;
//   - submission stamp;
//   - job file path: active job file if found, else queue job file if found, else empty;
//   - true if model run has kill channel (model is running) and kill request was sent.
func (rsc *RunCatalog) stopModelRun(modelDigest string, stamp string) (bool, string, string, bool) {

	tNow := time.Now()

	rsc.rscLock.Lock()
	defer rsc.rscLock.Unlock()

	// find model run state by digest and run stamp
	rsl := rsc.findRunStateLog(modelDigest, stamp)
	if rsl == nil { // if model run stamp and submit stamp not found then check if there is a job file in the queue

		if qj, ok := rsc.queueJobs[stamp]; ok {
			return true, stamp, qj.filePath, false // job file found in the queue
		}
		return false, "", "", false // no model run stamp and no submit stamp found
	}

	// find model in the active job list or if not active then find it in job queue
	jobPath := ""
	if aj, ok := rsc.activeJobs[rsl.SubmitStamp]; ok {
		jobPath = aj.filePath
	} else {
		if qj, ok := rsc.queueJobs[rsl.SubmitStamp]; ok {
			jobPath = qj.filePath
		}
	}

	rsl.UpdateDateTime = helper.MakeDateTime(tNow)

	// kill model run if model is running
	if rsl.killC != nil {
		// non-blocking send: kill channel is created with buffer size one and it is not read anymore
		// after model console output is closed; a blocking send here would hold rscLock forever.
		// If the buffer is already full then a kill request is already pending.
		select {
		case rsl.killC <- true:
		default:
		}
		return true, rsl.SubmitStamp, jobPath, true
	}

	// else remove request from the queue
	rsl.IsFinal = true
	return true, rsl.SubmitStamp, jobPath, false
}