-
Notifications
You must be signed in to change notification settings - Fork 3
/
lsf.py
321 lines (260 loc) · 10 KB
/
lsf.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
import os
import re
import math
from string import Template
from datetime import timedelta
from subprocess import Popen, PIPE, check_output
from .mycluster import get_data
from .mycluster import load_template
"""
bjobs -u all -q emerald
bqueues -l emerald
"""
def scheduler_type():
    """Identify this scheduler plugin as LSF."""
    return 'lsf'
def name():
    """Return the LSF cluster name reported by ``lsid``.

    Scans ``lsid`` output for the line ``My cluster name is <name>``
    and returns ``<name>``; returns 'undefined' if no such line exists.
    """
    # check_output returns bytes on Python 3; decode so the str
    # startswith/rsplit below work on both Python 2 and 3.
    lsid_output = check_output(['lsid']).decode().splitlines()
    for line in lsid_output:
        if line.startswith('My cluster name is'):
            return line.rsplit(' ', 1)[1].strip()
    return 'undefined'
def queues():
    """Return the queue names visible to the current user (``bqueues -w``)."""
    with os.popen('bqueues -w -u `whoami`') as stream:
        stream.readline()  # discard the header row
        return [row.split(' ')[0].strip() for row in stream]
def accounts():
    """LSF has no account concept in this plugin; always an empty list."""
    return []
def available_tasks(queue_id):
    """Return slot availability for an LSF queue.

    Parses ``bqueues <queue>`` output and returns a dict with keys
    'available' (MAX minus running slots) and 'max tasks' (the MAX
    column; 0 when unlimited/unparsable).
    """
    max_tasks = 0
    run_tasks = 0
    queue_name = queue_id
    # decode: check_output returns bytes on Python 3.
    q_output = check_output(['bqueues', queue_name]).decode().splitlines()
    for line in q_output:
        if line.startswith(queue_name):
            fields = re.sub(' +', ' ', line).strip().split(' ')
            try:
                # MAX column; LSF prints '-' for unlimited, which
                # leaves max_tasks at 0.
                max_tasks = int(fields[4])
            except (ValueError, IndexError):
                pass
            run_tasks = int(fields[9])
    return {'available': max_tasks - run_tasks, 'max tasks': max_tasks}
def tasks_per_node(queue_id):
    """Return the slot count of the host group serving *queue_id*.

    Reads the ``HOSTS:`` line from ``bqueues -l`` and queries
    ``bhosts -l`` for that host group's MAX slots.  Returns 0 when the
    queue has no hosts (or no HOSTS: line is present at all).
    """
    host_list = None
    q_output = check_output(['bqueues', '-l', queue_id]).decode().splitlines()
    for line in q_output:
        if line.startswith('HOSTS:'):
            host_list = line.strip().rsplit(' ', 1)[1].replace('/', '')
            if host_list == 'none':
                return 0
    if host_list is None:
        # No HOSTS: line found -- the original would have passed None
        # to bhosts and crashed; treat as an empty queue instead.
        return 0
    bhosts_output = check_output(['bhosts', '-l', host_list]).decode().splitlines()
    # Row 2 of bhosts -l holds the MAX slot figure in column 3.
    line = re.sub(' +', ' ', bhosts_output[2]).strip()
    return int(line.split(' ')[3])
def node_config(queue_id):
    """Describe the first host group serving *queue_id*.

    Returns a dict with 'max task' and 'max thread' (both the host
    group's slot count) and 'max memory' (GB, parsed from bhosts).
    All-zero config when the queue has no hosts.
    """
    host_list = None
    config = {}
    q_output = check_output(['bqueues', '-l', queue_id]).decode().splitlines()
    for line in q_output:
        if line.startswith('HOSTS:'):
            host_list = line.strip().rsplit(' ', 1)[1].replace('/', '')
            if host_list == 'none':
                config['max task'] = 0
                config['max thread'] = 0
                config['max memory'] = 0
                return config
    if host_list is None:
        # No HOSTS: line at all -- the original crashed inside bhosts;
        # report an empty node config instead.
        return {'max task': 0, 'max thread': 0, 'max memory': 0}
    bhosts_output = check_output(['bhosts', '-l', host_list]).decode().splitlines()
    # Row 2: MAX slots in column 3; row 6: total memory ("<n>G") in
    # column 11 -- positions taken from the original parse.
    tasks = int(re.sub(' +', ' ', bhosts_output[2]).strip().split(' ')[3])
    memory = float(re.sub(' +', ' ', bhosts_output[6]).strip()
                   .split(' ')[11].replace('G', ''))
    config['max task'] = tasks
    config['max thread'] = tasks
    config['max memory'] = memory
    return config
def create_submit(queue_id, **kwargs):
    """Render an LSF job script for *queue_id* from the ``lsf.jinja`` template.

    Recognised kwargs (all optional): num_tasks, tasks_per_node,
    num_threads_per_task, my_name, my_output, my_script, user_email,
    project_name, wall_clock, no_syscribe, openmpi_args, qos.
    Returns the rendered script text.
    """
    queue_name = queue_id

    num_tasks = kwargs.get('num_tasks', 1)

    # queue_tpn keeps the queue's full per-node slot count; tpn may be
    # reduced by the caller's tasks_per_node request.
    tpn = tasks_per_node(queue_id)
    queue_tpn = tpn
    if 'tasks_per_node' in kwargs:
        tpn = min(tpn, kwargs['tasks_per_node'])

    nc = node_config(queue_id)
    qc = available_tasks(queue_id)

    # Clamp the request to what the queue can ever run.
    if qc['max tasks'] > 0:
        num_tasks = min(num_tasks, qc['max tasks'])

    num_threads_per_task = nc['max thread']
    if 'num_threads_per_task' in kwargs:
        num_threads_per_task = kwargs['num_threads_per_task']
    # NOTE(review): divides by tpn -- assumes the queue has at least one
    # slot per node; a zero tpn raises ZeroDivisionError here (as in the
    # original).
    num_threads_per_task = min(num_threads_per_task, int(
        math.ceil(float(nc['max thread']) / float(tpn))))

    my_name = kwargs.get('my_name', "myclusterjob")
    my_output = kwargs.get('my_output', "myclusterjob.out")
    my_script = kwargs.get('my_script', None)
    # Guard against the default None before the substring test -- the
    # original raised TypeError when no script was supplied.
    if my_script is not None and 'mycluster-' in my_script:
        my_script = get_data(my_script)

    user_email = kwargs.get('user_email', None)
    project_name = kwargs.get('project_name', 'default')

    wall_clock = kwargs.get('wall_clock', '12:00')
    if ':' not in wall_clock:
        wall_clock = wall_clock + ':00'

    num_nodes = int(math.ceil(float(num_tasks) / float(tpn)))
    # Whole nodes are reserved, so slots are requested per full node.
    num_queue_slots = num_nodes * queue_tpn

    no_syscribe = kwargs.get('no_syscribe', False)
    record_job = not no_syscribe

    openmpi_args = kwargs.get('openmpi_args', "-bysocket -bind-to-socket")
    qos = kwargs.get('qos', None)

    template = load_template('lsf.jinja')
    return template.render(my_name=my_name,
                           my_script=my_script,
                           my_output=my_output,
                           user_email=user_email,
                           queue_name=queue_name,
                           num_queue_slots=num_queue_slots,
                           num_tasks=num_tasks,
                           tpn=tpn,
                           num_threads_per_task=num_threads_per_task,
                           num_nodes=num_nodes,
                           project_name=project_name,
                           wall_clock=wall_clock,
                           record_job=record_job,
                           openmpi_args=openmpi_args,
                           qos=qos)
def _parse_job_id(output):
    """Extract the numeric job id from a bsub response line.

    bsub prints e.g. ``Job <12345> is submitted to queue <q>.``; returns
    the int id, or None (after printing a diagnostic) on parse failure.
    """
    try:
        return int(output.split(' ')[1].replace('<', '').replace('>', ''))
    except (IndexError, ValueError):
        print('Job submission failed: ' + output)
        return None


def submit(script_name, immediate, depends_on=None,
           depends_on_always_run=False):
    """Submit *script_name* with bsub, optionally chained to another job.

    depends_on: job id this submission waits for.
    depends_on_always_run: wait for ended() (run regardless of the
    predecessor's outcome) instead of done() (run only on success).
    Returns the new job id, or None if submission output could not be
    parsed.  (immediate is accepted for interface compatibility.)
    """
    # The three original branches differed only in the command string;
    # parsing of the bsub response is shared via _parse_job_id.
    if depends_on and depends_on_always_run:
        cmd = 'bsub -w "ended(%s)" < %s ' % (depends_on, script_name)
    elif depends_on is not None:
        cmd = 'bsub -w "done(%s)" < %s ' % (depends_on, script_name)
    else:
        cmd = 'bsub <' + script_name
    with os.popen(cmd) as f:
        return _parse_job_id(f.readline())
def delete(job_id):
    """Kill a job via ``bkill``.

    Accepts int or str job ids -- status() yields ints, and the
    original string concatenation raised TypeError for them.
    """
    with os.popen('bkill ' + str(job_id)) as f:
        pass
def status():
    """Map job id -> state for the current user's jobs (``bjobs -w``).

    Running jobs are reported as 'r'; any other LSF state string
    (PEND, SUSP, ...) is passed through unchanged.
    """
    status_dict = {}
    with os.popen('bjobs -w') as f:
        try:
            f.readline()  # skip header
            for line in f:
                fields = re.sub(' +', ' ', line.strip()).split(' ')
                job_id = int(fields[0])
                state = fields[2]
                status_dict[job_id] = 'r' if state == 'RUN' else state
        # The original `except e:` was a NameError at runtime (and a
        # syntax error on Python 3).
        except Exception as e:
            print(e)
    return status_dict
def job_stats(job_id):
    """Return coarse accounting stats for a job via ``bacct -l``.

    NOTE(review): only the first output line of bacct is parsed, which
    usually carries banner text rather than the summary numbers --
    verify against the actual bacct output format before trusting
    'wallclock'/'cpu'/'queue'.
    """
    stats_dict = {}
    with os.popen('bacct -l ' + str(job_id)) as f:
        try:
            fields = re.sub(' +', ' ', f.readline().strip()).split(' ')
            stats_dict['wallclock'] = fields[0]
            stats_dict['cpu'] = fields[1]
            stats_dict['queue'] = fields[2]
            # Memory is not derivable from this line; placeholder kept.
            stats_dict['mem'] = '-'
        except Exception:  # was a bare except
            print('LSF: Error reading job stats')
    return stats_dict
def job_stats_enhanced(job_id):
    """
    Get full job and step stats for job_id.

    Primary path parses the pipe-delimited output of ``bjobs -o``;
    on any failure it falls back to scraping ``bhist -l`` for a final
    DONE/EXIT status.  Returns a dict; 'status' is 'UNKNOWN' when
    nothing could be read.
    """
    stats_dict = {}
    with os.popen('bjobs -o "jobid run_time cpu_used queue slots stat exit_code start_time estimated_start_time finish_time delimiter=\'|\'" -noheader ' + str(job_id)) as f:
        try:
            line = f.readline()
            cols = line.split('|')
            stats_dict['job_id'] = cols[0]
            # run_time / cpu_used look like "<secs> second(s)";
            # '-' means not yet available.
            if cols[1] != '-':
                stats_dict['wallclock'] = timedelta(
                    seconds=float(cols[1].split(' ')[0]))
            if cols[2] != '-':
                stats_dict['cpu'] = timedelta(
                    seconds=float(cols[2].split(' ')[0]))
            stats_dict['queue'] = cols[3]
            stats_dict['status'] = cols[5]
            stats_dict['exit_code'] = cols[6]
            stats_dict['start'] = cols[7]
            stats_dict['start_time'] = cols[8]
            if stats_dict['status'] in ['DONE', 'EXIT']:
                stats_dict['end'] = cols[9]
            stats_dict['steps'] = []
        except Exception:  # was a bare except; don't swallow SystemExit
            # Fallback: older LSF without bjobs -o support.  Renamed the
            # inner handle -- the original shadowed the outer `f`.
            with os.popen('bhist -l ' + str(job_id)) as fallback:
                try:
                    for line in fallback.readlines():
                        if "Done successfully" in line:
                            stats_dict['status'] = 'DONE'
                            return stats_dict
                        elif "Completed <exit>" in line:
                            stats_dict['status'] = 'EXIT'
                            return stats_dict
                        else:
                            stats_dict['status'] = 'UNKNOWN'
                except Exception as e:
                    print(e)
                    print('LSF: Error reading job stats')
                    stats_dict['status'] = 'UNKNOWN'
    return stats_dict
def running_stats(job_id):
    """Best-effort live stats for a running job via ``bjobs -W``.

    Returns a dict that may contain 'wallclock', 'mem' and 'cpu';
    fields are simply omitted when the output cannot be parsed.
    NOTE(review): parses the first output line, which for bjobs is
    normally the header row -- confirm against real bjobs -W output.
    """
    stats_dict = {}

    with os.popen('bjobs -W ' + str(job_id)) as stream:
        try:
            first = re.sub(' +', ' ', stream.readline().strip())
            stats_dict['wallclock'] = first.split(' ')[0]
        except Exception:  # was a bare except
            pass

    with os.popen('bjobs -W ' + str(job_id)) as stream:
        try:
            fields = re.sub(' +', ' ', stream.readline().strip()).split(' ')
            ntasks = int(fields[2])
            # mem is set before cpu so a cpu parse failure still leaves
            # mem populated (matches the original assignment order).
            stats_dict['mem'] = float(fields[1]) * ntasks
            stats_dict['cpu'] = float(fields[0]) * ntasks
        except Exception:  # was a bare except
            pass

    return stats_dict