/
job_sbatch.py
106 lines (93 loc) · 3.13 KB
/
job_sbatch.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# -*- coding: utf-8 -*-
u"""Operations run inside the report directory to extract data.
:copyright: Copyright (c) 2019 RadiaSoft LLC. All Rights Reserved.
:license: http://www.apache.org/licenses/LICENSE-2.0.html
"""
from __future__ import absolute_import, division, print_function
from pykern import pkcollections
from pykern import pkio
from pykern import pkjson
from pykern import pksubprocess
from pykern.pkcollections import PKDict
from pykern.pkdebug import pkdp, pkdexc, pkdc
import sirepo.template
from sirepo import job, simulation_db
import time
import subprocess
import sirepo.template
import re
def default_command(in_file):
    """Read the job message from `in_file` and dispatch to ``_do_<jobProcessCmd>``.

    Must be called in run_dir. Writes its output on stdout.

    Args:
        in_file (str): path to a json file parsed into the job message
    Returns:
        str: json-serialized result of the dispatched command, with opDone=True
    """
    in_path = pkio.py_path(in_file)
    msg = pkjson.load_any(in_path)
    # TODO(e-carlin): find common place to serialize/deserialize paths
    msg.runDir = pkio.py_path(msg.runDir)
    # message consumed; remove the handoff file before doing any work
    in_path.remove()
    handler = globals()['_do_' + msg.jobProcessCmd]
    res = handler(
        msg,
        sirepo.template.import_module(msg.simulationType),
    )
    return pkjson.dump_pretty(
        PKDict(res).pkupdate(opDone=True),
        pretty=False,
    )
def _do_compute(msg, template):
    """Submit the compute job described by `msg` to Slurm and wait for it to finish.

    Recreates the run directory, prepares the simulation, pipes an sbatch
    script to ``sbatch`` on stdin, then polls ``scontrol`` until the job
    leaves the running/pending states.

    Args:
        msg (PKDict): job message; uses runDir and data
        template (module): simulation-type template module (currently unused)
    Returns:
        PKDict: state=job.COMPLETED on success; state=job.ERROR with error text on failure
    """
    def wait_for_job_completion(job_id):
        # Poll scontrol until the job leaves running/pending, then require 'completed'
        s = 'pending'
        while s in ('running', 'pending'):
            o = subprocess.check_output(
                ('scontrol', 'show', 'job', job_id)
            ).decode('utf-8')
            r = re.search(r'(?<=JobState=)(.*)(?= Reason)', o)
            # BUG FIX: message formatted the stale state `s`, not the output `o`
            assert r, 'output={}'.format(o)
            s = r.group().lower()
            time.sleep(2) # TODO(e-carlin): cfg
        assert s == 'completed', 'output={}'.format(o)
    msg.runDir = pkio.py_path(msg.runDir)
    # start from a clean run directory; chdir away first so removal can't fail on cwd
    with pkio.save_chdir('/'):
        pkio.unchecked_remove(msg.runDir)
        pkio.mkdir_parent(msg.runDir)
    msg.simulationStatus = PKDict(
        computeJobStart=int(time.time()),
        state=job.RUNNING,
    )
    try:
        a = _get_sbatch_script(
            simulation_db.prepare_simulation(
                msg.data,
                run_dir=msg.runDir
            )[0])
        # keep a copy of the submitted script for debugging
        with open('slurmscript', 'w') as x:
            x.write(a)
        o, e = subprocess.Popen(
            # BUG FIX: ('sbatch') was a bare string, not a tuple
            ('sbatch',),
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            # BUG FIX: script is str; without text mode, communicate(input=str)
            # raises on Python 3 and o/e come back as bytes, breaking the
            # str comparisons and str-pattern regex below
            universal_newlines=True,
        ).communicate(
            input=a
        )
        assert e == '', 'error={}'.format(e)
        r = re.search(r'\d+$', o)
        assert r is not None, 'output={} did not contain job id'.format(o)
        wait_for_job_completion(r.group())
        # TODO(e-carlin): parallel status
    except Exception as e:
        pkdc(pkdexc())
        return PKDict(state=job.ERROR, error=str(e))
    return PKDict(state=job.COMPLETED)
def _get_sbatch_script(cmd):
# TODO(e-carlin): configure the SBATCH* parameters
return'''#!/bin/bash
#SBATCH --partition=compute
#SBATCH --ntasks=1
#SBATCH --ntasks-per-node=1
#SBATCH --cpus-per-task=4
#SBATCH --mem-per-cpu=128M
{}
'''.format(' '.join(cmd)) # TODO(e-carlin): quote?