-
Notifications
You must be signed in to change notification settings - Fork 57
/
slurm.py
244 lines (206 loc) · 8.11 KB
/
slurm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
'''
This module contains functionality related to integration with the SLURM HPC
resource manager.
'''
import datetime
import logging
import re
import time
import sciluigi.parameter
import sciluigi.task
import subprocess as sub
# ================================================================================
# Setup logging: one named logger for this module's messages.
log = logging.getLogger('sciluigi-interface')

# A few 'constants': the supported values of SlurmInfo.runmode.
RUNMODE_LOCAL, RUNMODE_HPC, RUNMODE_MPI = ('runmode_local',
                                           'runmode_hpc',
                                           'runmode_mpi')
# ================================================================================
class SlurmInfo():
    '''
    Plain value object holding the parameters of a SLURM run: run mode,
    project/account, partition, number of cores, time limit, job name and
    number of threads.
    '''
    runmode = None  # One of RUNMODE_LOCAL|RUNMODE_HPC|RUNMODE_MPI
    project = None
    partition = None
    cores = None
    time = None
    jobname = None
    threads = None

    def __init__(self, runmode, project, partition, cores, time, jobname, threads):
        '''
        Store the given SLURM settings on the instance, as passed (typically strings).
        Time is on format: [[[d-]HH:]MM:]SS
        '''
        (self.runmode, self.project, self.partition, self.cores,
         self.time, self.jobname, self.threads) = (runmode, project, partition, cores,
                                                   time, jobname, threads)

    def __str__(self):
        '''
        Human-readable summary of the stored settings (runmode is not included).
        '''
        fields = dict(t=self.time, pt=self.partition, c=self.cores,
                      thr=self.threads, j=self.jobname, pr=self.project)
        template = '(time: {t}, partition: {pt}, cores: {c}, threads: {thr}, jobname: {j}, project: {pr})'
        return template.format(**fields)

    def get_argstr_hpc(self):
        '''
        Build the option string placed after salloc/sbatch for a non-MPI HPC
        job: the allocation flags followed by an ``srun`` launching one task
        with ``threads`` CPUs.
        '''
        fields = dict(pr=self.project, pt=self.partition, c=self.cores,
                      t=self.time, j=self.jobname, thr=self.threads)
        return ' -A {pr} -p {pt} -n {c} -t {t} -J {j} srun -n 1 -c {thr} '.format(**fields)

    def get_argstr_mpi(self):
        '''
        Build the option string placed after salloc/sbatch for an MPI job:
        the allocation flags followed by ``mpirun`` with one process per core.
        '''
        ncores = self.cores
        fields = dict(pr=self.project, pt=self.partition, c1=ncores,
                      t=self.time, j=self.jobname, c2=ncores)
        return ' -A {pr} -p {pt} -n {c1} -t {t} -J {j} mpirun -v -np {c2} '.format(**fields)
# ================================================================================
class SlurmInfoParameter(sciluigi.parameter.Parameter):
    '''
    A specialized luigi parameter, taking SlurmInfo objects.

    Only ready-made SlurmInfo instances are accepted; any other value
    (including strings) is rejected by ``parse``.
    '''
    def parse(self, x):
        '''
        Return ``x`` unchanged if it is a SlurmInfo instance.

        :raises Exception: If ``x`` is not a SlurmInfo instance.
        '''
        if isinstance(x, SlurmInfo):
            return x
        # Wrap x in a 1-tuple: a bare "% x" raises TypeError (or formats only
        # part of the value) when x is itself a tuple, masking this message.
        raise Exception('Parameter is not instance of SlurmInfo: %s' % (x,))
# ================================================================================
class SlurmHelpers():
    '''
    Mixin with various convenience methods for executing jobs via SLURM.

    Commands run either locally or through ``salloc``, depending on
    ``self.slurminfo.runmode``. The host class must provide
    ``ex_local(command)`` returning a ``(retcode, stdout, stderr)`` tuple,
    ``add_auditinfo(key, value)`` and ``instance_name``. These are presumably
    supplied by sciluigi.task.Task (see SlurmTask); verify if mixing in elsewhere.
    '''
    # Other class-fields
    slurminfo = SlurmInfoParameter(default=None)  # Class: SlurmInfo

    # Main Execution methods
    def ex(self, command):
        '''
        Execute a command either locally or via SLURM, depending on
        ``self.slurminfo.runmode``.

        :param command: Shell command as a string, or a list of words that is
            joined with single spaces (no shell quoting is applied).
        :returns: The ``(retcode, stdout, stderr)`` tuple of the executor used
            (``ex_local``, ``ex_hpc`` or ``ex_mpi``).
        :raises ValueError: If the runmode is not RUNMODE_LOCAL, RUNMODE_HPC
            or RUNMODE_MPI.
        '''
        if isinstance(command, list):
            command = ' '.join(command)
        runmode = self.slurminfo.runmode
        if runmode == RUNMODE_LOCAL:
            log.info('Executing command in local mode: %s', command)
            return self.ex_local(command)  # Defined in task.py
        if runmode == RUNMODE_HPC:
            log.info('Executing command in HPC mode: %s', command)
            return self.ex_hpc(command)
        if runmode == RUNMODE_MPI:
            log.info('Executing command in MPI mode: %s', command)
            return self.ex_mpi(command)
        # Without this, an unrecognised runmode would silently skip the command.
        raise ValueError('Unknown runmode {rm!r}; expected one of: {modes}'.format(
            rm=runmode, modes=', '.join([RUNMODE_LOCAL, RUNMODE_HPC, RUNMODE_MPI])))

    def ex_hpc(self, command):
        '''
        Execute a command in HPC mode (non-MPI): ``salloc`` with the options
        from ``self.slurminfo.get_argstr_hpc()``.

        :param command: Command string, or a list converted with
            ``subprocess.list2cmdline``.
        :returns: ``(retcode, stdout, stderr)`` from ``ex_local``.
        '''
        return self._ex_salloc(self.slurminfo.get_argstr_hpc(), command)

    def ex_mpi(self, command):
        '''
        Execute a command in HPC mode with MPI support (multi-node, message
        passing interface): ``salloc`` with the options from
        ``self.slurminfo.get_argstr_mpi()``.

        :param command: Command string, or a list converted with
            ``subprocess.list2cmdline``.
        :returns: ``(retcode, stdout, stderr)`` from ``ex_local``.
        '''
        return self._ex_salloc(self.slurminfo.get_argstr_mpi(), command)

    def _ex_salloc(self, argstr, command):
        '''Run ``salloc <argstr> <command>`` via ex_local, then log SLURM job info.'''
        if isinstance(command, list):
            command = sub.list2cmdline(command)
        fullcommand = 'salloc %s %s' % (argstr, command)
        (retcode, stdout, stderr) = self.ex_local(fullcommand)
        self.log_slurm_info(stderr)
        return (retcode, stdout, stderr)

    # Various convenience methods
    def assert_matches_character_class(self, char_class, a_string):
        '''
        Raise an Exception unless ``a_string`` consists only of (one or more)
        characters from the regex character class ``char_class``,
        e.g. ``'[A-Za-z0-9_]'``. An empty string never matches.
        '''
        # \Z rather than $: $ would also accept a trailing newline.
        if not re.match(r'^{c}+\Z'.format(c=char_class), a_string):
            raise Exception('String {s} does not match character class {cc}'.format(
                s=a_string, cc=char_class))

    def clean_filename(self, filename):
        '''
        Clean up a string to make it suitable for use as a file name: every
        character of ``str(filename)`` other than an ASCII letter, digit or
        underscore (spaces included) is replaced by ``_``.
        '''
        return re.sub(r'[^A-Za-z0-9_]', '_', str(filename))

    @staticmethod
    def _as_text(output):
        '''Return command output as str: None becomes '', bytes are decoded as UTF-8.'''
        # NOTE(review): ex_local is defined outside this file and may return
        # bytes under Python 3; decode defensively so the str regexes work.
        if output is None:
            return ''
        if isinstance(output, bytes):
            return output.decode('utf-8', 'replace')
        return output

    @staticmethod
    def _elapsed_to_seconds(elapsed):
        '''
        Convert a sacct elapsed-time string of the form [DD-[HH:]]MM:SS
        (e.g. "05:30", "01:02:03", "40-01:02:03") to whole seconds.

        :raises ValueError: If the string is not in that form.
        '''
        days = 0
        if '-' in elapsed:
            days_str, elapsed = elapsed.split('-', 1)
            days = int(days_str)
        fields = [int(field) for field in elapsed.split(':')]
        if not 2 <= len(fields) <= 3:
            raise ValueError('Unexpected sacct elapsed time format: %r' % (elapsed,))
        # Right-aligned: the last field is seconds, then minutes, then hours.
        seconds, minutes, hours = (list(reversed(fields)) + [0])[:3]
        return days * 86400 + hours * 3600 + minutes * 60 + seconds

    def log_slurm_info(self, slurm_stderr):
        '''
        Extract the SLURM job id from salloc's stderr, look up the job's
        elapsed time with ``sacct``, and record both in the audit info.

        Expects stderr of the following example form:
            salloc: Granted job allocation 5836263
            srun: Job step created
            salloc: Relinquishing job allocation 5836263
            salloc: Job allocation 5836263 has been revoked.

        When a job id is found, this sets ``self.slurm_exectime_sec`` and adds
        the audit key ``slurm_exectime_sec``, but only if the sacct output
        could be parsed. It then always adds ``slurm_jobid``. If no job id is
        found, nothing is done.
        '''
        slurm_stderr = self._as_text(slurm_stderr)
        # Prefer salloc's explicit "job allocation N" message; fall back to the
        # first integer anywhere in stderr.
        matches = (re.search(r'job allocation ([0-9]+)', slurm_stderr)
                   or re.search(r'([0-9]+)', slurm_stderr))
        if not matches:
            return
        jobid = matches.group(1)

        # Write slurm execution time to audit log
        cmd = 'sacct -j {jobid} --noheader --format=elapsed'.format(jobid=jobid)
        (_, jobinfo_stdout, _) = self.ex_local(cmd)
        sacct_matches = re.findall(r'([0-9:\-]+)', self._as_text(jobinfo_stdout))
        if len(sacct_matches) < 2:
            log.warning('Not enough matches from sacct for task %s: %s',
                        self.instance_name,
                        ', '.join(['Match: %s' % m for m in sacct_matches]))
        else:
            # The second sacct value is used, as before. NOTE(review): presumably
            # the first line is the allocation itself and the second its job
            # step; confirm.
            self.slurm_exectime_sec = self._elapsed_to_seconds(sacct_matches[1])
            log.info('Slurm execution time for task %s was %ss',
                     self.instance_name,
                     self.slurm_exectime_sec)
            self.add_auditinfo('slurm_exectime_sec', int(self.slurm_exectime_sec))
        # Write this last, so as to get the main task exectime and slurm exectime
        # together in audit log later
        self.add_auditinfo('slurm_jobid', jobid)
# ================================================================================
class SlurmTask(SlurmHelpers, sciluigi.task.Task):
    '''
    A sciluigi Task combined with the SlurmHelpers mixin, so that its commands
    can be run locally or through SLURM via ``self.ex(...)``.
    '''