/
queuestatus.py
313 lines (270 loc) · 11.2 KB
/
queuestatus.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
# coding: utf-8
# Copyright (c) Max-Planck-Institut für Eisenforschung GmbH - Computational Materials Design (CM) Department
# Distributed under the terms of "New BSD License", see the LICENSE file.
"""
Set of functions to interact with the queuing system directly from within pyiron - optimized for the Sun grid engine.
"""
import pandas
import time
from pyiron_base.settings.generic import Settings
from pyiron_base.generic.util import static_isinstance
from pyiron_base.job.jobstatus import job_status_finished_lst
__author__ = "Jan Janssen"
__copyright__ = (
"Copyright 2020, Max-Planck-Institut für Eisenforschung GmbH - "
"Computational Materials Design (CM) Department"
)
__version__ = "1.0"
__maintainer__ = "Jan Janssen"
__email__ = "janssen@mpie.de"
__status__ = "production"
__date__ = "Sep 1, 2017"
QUEUE_SCRIPT_PREFIX = "pi_"
s = Settings()
def queue_table(job_ids=None, project_only=True, full_table=False):
    """
    Display the queuing system table as pandas.Dataframe

    Args:
        job_ids (list/None): check for a specific list of job IDs - None by default
        project_only (bool): Query only for jobs within the current project - True by default
        full_table (bool): Return all entries from the queuing system without filtering - False by default

    Returns:
        pandas.DataFrame/list/None: Output from the queuing system - optimized for the Sun grid engine;
            an empty list when project_only is set but no job_ids are given, None without a queue adapter.
    """
    # Fix: avoid the mutable default argument [] - use None as sentinel instead.
    if job_ids is None:
        job_ids = []
    if project_only and not job_ids:
        return []
    if s.queue_adapter is None:
        return None
    if full_table:
        # Lift the pandas display limits so the whole table is rendered.
        pandas.set_option('display.max_rows', None)
        pandas.set_option('display.max_columns', None)
    else:
        pandas.reset_option('display.max_rows')
        pandas.reset_option('display.max_columns')
    df = s.queue_adapter.get_status_of_my_jobs()
    if not project_only:
        # Keep every queue entry submitted by pyiron (job name carries the prefix).
        return df[[QUEUE_SCRIPT_PREFIX in job_name for job_name in df.jobname]]
    # Restrict the table to the requested pyiron job IDs.
    job_name_lst = [QUEUE_SCRIPT_PREFIX + str(job_id) for job_id in job_ids]
    return df[[job_name in job_name_lst for job_name in df.jobname]]
def queue_check_job_is_waiting_or_running(item):
    """
    Check if a job is still listed in the queue system as either waiting or running.

    Args:
        item (int, GenericJob): Provide either the job_ID or the full hamiltonian

    Returns:
        bool/None: True if the job is pending or running, False otherwise,
            None when no queue adapter is configured.
    """
    queue_id = validate_que_request(item)
    if s.queue_adapter is None:
        return None
    current_status = s.queue_adapter.get_status_of_job(process_id=queue_id)
    return current_status in ("pending", "running")
def queue_info_by_job_id(job_id):
    """
    Display the queuing system info of a job as dictionary

    Args:
        job_id (int): query for a specific job_id

    Returns:
        dict/None: Dictionary with the output from the queuing system - optimized for the Sun grid engine;
            None when no queue adapter is configured.
    """
    adapter = s.queue_adapter
    if adapter is None:
        return None
    return adapter.get_status_of_job(process_id=job_id)
def queue_is_empty():
    """
    Check if the queue table is currently empty - no more jobs to wait for.

    Returns:
        bool: True if the table is empty, else False - optimized for the Sun grid engine
    """
    adapter = s.queue_adapter
    if adapter is None:
        # Without a queue adapter there is nothing to wait for.
        return True
    return len(adapter.get_status_of_my_jobs()) == 0
def queue_delete_job(item):
    """
    Delete a job from the queuing system

    Args:
        item (int, pyiron_base.job.generic.GenericJob): Provide either the job_ID or the full hamiltonian

    Returns:
        str/None: Output from the queuing system as string - optimized for the Sun grid engine;
            None when no queue adapter is configured.
    """
    queue_id = validate_que_request(item)
    if s.queue_adapter is None:
        return None
    return s.queue_adapter.delete_job(process_id=queue_id)
def queue_enable_reservation(item):
    """
    Enable a reservation for a particular job within the queuing system

    Args:
        item (int, pyiron_base.job.generic.GenericJob): Provide either the job_ID or the full hamiltonian

    Returns:
        str/list/None: Output from the queuing system as string (or a list of strings for masters
            with several queue ids) - None when no queue adapter is configured.
    """
    queue_id = validate_que_request(item)
    if s.queue_adapter is None:
        return None
    if not isinstance(queue_id, list):
        return s.queue_adapter.enable_reservation(process_id=queue_id)
    # Masters may resolve to several queue ids - enable a reservation for each one.
    return [s.queue_adapter.enable_reservation(process_id=qid) for qid in queue_id]
def wait_for_job(job, interval_in_s=5, max_iterations=100):
    """
    Sleep until the job is finished but maximum interval_in_s * max_iterations seconds.

    Args:
        job (pyiron_base.job.generic.GenericJob): Job to wait for
        interval_in_s (int): interval when the job status is queried from the database - default 5 sec.
        max_iterations (int): maximum number of iterations - default 100

    Raises:
        ValueError: max_iterations reached, job still running
    """
    if job.status.string in job_status_finished_lst:
        # Nothing to wait for - the job already reached a final status.
        return
    if (
        s.queue_adapter is not None
        and s.queue_adapter.remote_flag
        and job.server.queue is not None
    ):
        _wait_for_job_remote(
            job=job, interval_in_s=interval_in_s, max_iterations=max_iterations
        )
    else:
        _wait_for_job_local(
            job=job, interval_in_s=interval_in_s, max_iterations=max_iterations
        )


def _wait_for_job_remote(job, interval_in_s, max_iterations):
    """Poll a job executed on a remote host and transfer the results once it finished."""
    for _ in range(max_iterations):
        if not queue_check_job_is_waiting_or_running(item=job):
            # The job left the queue - copy the HDF5 file back to read its final status.
            s.queue_adapter.transfer_file_to_remote(
                file=job.project_hdf5.file_name,
                transfer_back=True,
                delete_remote=False,
            )
            status_hdf5 = job.project_hdf5["status"]
            job.status.string = status_hdf5
        else:
            status_hdf5 = job.status.string
        if status_hdf5 in job_status_finished_lst:
            job.transfer_from_remote()
            return
        time.sleep(interval_in_s)
    raise ValueError("Maximum iterations reached, but the job was not finished.")


def _wait_for_job_local(job, interval_in_s, max_iterations):
    """Poll the local database until the job reaches a finished status."""
    for _ in range(max_iterations):
        if s.database_is_disabled:
            # File-based database mode - refresh the database view from disk first.
            job.project.db.update()
        job.refresh_job_status()
        if job.status.string in job_status_finished_lst:
            return
        time.sleep(interval_in_s)
    raise ValueError("Maximum iterations reached, but the job was not finished.")
def wait_for_jobs(project, interval_in_s=5, max_iterations=100, recursive=True):
    """
    Wait for the calculation in the project to be finished

    Args:
        project: Project instance the jobs is located in
        interval_in_s (int): interval when the job status is queried from the database - default 5 sec.
        max_iterations (int): maximum number of iterations - default 100
        recursive (bool): search subprojects [True/False] - default=True

    Raises:
        ValueError: max_iterations reached, but jobs still running
    """
    for _ in range(max_iterations):
        # Fix: honour the recursive flag here as well - it was previously
        # hard-coded to True while job_table() below already respected it.
        project.update_from_remote(recursive=recursive)
        df = project.job_table(recursive=recursive)
        if all(df.status.isin(job_status_finished_lst)):
            return
        time.sleep(interval_in_s)
    raise ValueError("Maximum iterations reached, but the job was not finished.")
def update_from_remote(project, recursive=True):
    """
    Update jobs from the remote server

    Args:
        project: Project instance the jobs is located in
        recursive (bool): search subprojects [True/False] - default=True
    """
    if s.queue_adapter is None or not s.queue_adapter.remote_flag:
        return
    df_project = project.job_table(recursive=recursive)
    df_submitted = df_project[df_project.status == "submitted"]
    df_combined = df_project[df_project.status.isin(["running", "submitted"])]
    df_queue = s.queue_adapter.get_status_of_my_jobs()
    # Consistency fix: use the module-level QUEUE_SCRIPT_PREFIX constant instead of
    # the previously hard-coded "pi_" literal (both are "pi_" - behavior unchanged).
    if len(df_queue) > 0:
        # .copy() avoids pandas SettingWithCopyWarning when adding the column below.
        df_queue = df_queue[df_queue.jobname.str.startswith(QUEUE_SCRIPT_PREFIX)].copy()
    if len(df_queue) > 0:
        # Recover the pyiron job id encoded in the queue job name "pi_<id>".
        df_queue["pyiron_id"] = df_queue.apply(
            lambda row: int(row["jobname"].split(QUEUE_SCRIPT_PREFIX)[1]),
            axis=1,
        )
        jobs_now_running_lst = df_queue[df_queue.status == "running"].pyiron_id.values
        # Promote submitted jobs that the queue reports as running.
        for job_id in df_submitted.id.values:
            if job_id in jobs_now_running_lst:
                project.set_job_status(job_specifier=job_id, status="running")
    else:
        jobs_now_running_lst = []
    for job_id in df_combined.id.values:
        if job_id not in jobs_now_running_lst:
            job = project.inspect(job_id)
            # Copy the job's HDF5 file back from the remote host to read the
            # status it was left in there.
            s.queue_adapter.transfer_file_to_remote(
                file=job.project_hdf5.file_name,
                transfer_back=True,
                delete_remote=False,
            )
            status_hdf5 = job.project_hdf5["status"]
            project.set_job_status(
                job_specifier=job.job_id,
                status=status_hdf5,
            )
            if status_hdf5 in job_status_finished_lst:
                # Finished on the remote side - fetch the full job directory.
                job_object = job.to_object()
                job_object.transfer_from_remote()
def validate_que_request(item):
    """
    Internal function to convert the job_ID or hamiltonian to the queuing system ID.

    Args:
        item (int, pyiron_base.job.generic.GenericJob): Provide either the job_ID or the full hamiltonian

    Returns:
        int/list: queuing system ID (a list of IDs for a master without its own queue ID)

    Raises:
        ValueError: the job has no queue ID assigned
        TypeError: item is neither an int nor a pyiron job object
    """
    if isinstance(item, int):
        # Plain integers are already queuing system IDs.
        return item
    if static_isinstance(item.__class__, "pyiron_base.master.generic.GenericMaster"):
        if item.server.queue_id:
            return item.server.queue_id
        # Master without its own queue id: collect the ids of all its children.
        child_queue_ids = [
            item.project.load(child_id).server.queue_id for child_id in item.child_ids
        ]
        queue_id_lst = [qid for qid in child_queue_ids if qid is not None]
        if not queue_id_lst:
            raise ValueError("This job does not have a queue ID.")
        return queue_id_lst
    if static_isinstance(item.__class__, "pyiron_base.job.generic.GenericJob"):
        if item.server.queue_id:
            return item.server.queue_id
        raise ValueError("This job does not have a queue ID.")
    if static_isinstance(item.__class__, "pyiron_base.job.core.JobCore"):
        # Inspect-only job objects store the queue id inside the HDF5 file.
        if "server" in item.project_hdf5.list_nodes():
            server_hdf_dict = item.project_hdf5["server"]
            if "qid" in server_hdf_dict.keys():
                return server_hdf_dict["qid"]
        raise ValueError("This job does not have a queue ID.")
    raise TypeError(
        "The queue can either query for IDs or for pyiron GenericJobObjects."
    )