/
jobtable.py
431 lines (382 loc) · 14.9 KB
/
jobtable.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
# coding: utf-8
# Copyright (c) Max-Planck-Institut für Eisenforschung GmbH - Computational Materials Design (CM) Department
# Distributed under the terms of "New BSD License", see the LICENSE file.
import pandas
import sys
import os
import numpy as np
from pyiron.base.settings.generic import Settings
"""
The Jobtable module provides a set of top level functions to interact with the database.
"""
# Module metadata (authorship, version and release information).
__author__ = "Jan Janssen"
__copyright__ = (
"Copyright 2020, Max-Planck-Institut für Eisenforschung GmbH - "
"Computational Materials Design (CM) Department"
)
__version__ = "1.0"
__maintainer__ = "Jan Janssen"
__email__ = "janssen@mpie.de"
__status__ = "production"
__date__ = "Sep 1, 2017"
# Module-wide Settings instance, created once at import time; its logger is used by
# _job_dict() to emit the assembled database filter as a debug message.
s = Settings()
def _job_dict(
    database,
    sql_query,
    user,
    project_path,
    recursive,
    job=None,
    sub_job_name="%",
    element_lst=None,
):
    """
    Internal function to access the database from the project directly.

    The arguments are translated into a dictionary of column filters which is logged at debug level and then
    handed to ``database.get_items_dict()``.

    Args:
        database (DatabaseAccess): Database object
        sql_query (str): SQL-like filter to enter a more specific request, e.g. "status = 'finished'". Several
                         conditions can be combined with the keyword "and" (any capitalisation). From each
                         condition only the first (column) and the third (value) whitespace separated token
                         are used - None to skip this filter
        user (str): username of the user whose user space should be searched - None to skip this filter
        project_path (str): root_path - this is in contrast to the project_path in GenericPath
        recursive (bool): search subprojects [True/False] - if True a "%" wildcard is appended to the project
        job (str): job_name - by default None
        sub_job_name (str): path inside the HDF5 file - "%" by default to accept any path
        element_lst (list): list of elements required in the chemical formula - by default None

    Returns:
        list: the function returns a list of dicts like get_items_sql, but it does not format datetime:
             [{'chemicalformula': 'Ni108',
               'computer': 'mapc157',
               'hamilton': 'LAMMPS',
               'hamversion': '1.1',
               'id': 24,
               'job': 'DOF_1_0',
               'parentid': 21,
               'project': 'lammps.phonons.Ni_fcc',
               'projectpath': 'D:/PyIron/PyIron_data/projects',
               'status': 'finished',
               'timestart': datetime.datetime(2016, 6, 24, 10, 17, 3, 140000),
               'timestop': datetime.datetime(2016, 6, 24, 10, 17, 3, 173000),
               'totalcputime': 0.033,
               'username': 'test'},
              {'chemicalformula': 'Ni108',
               'computer': 'mapc157',
               'hamilton': 'LAMMPS',
               'hamversion': '1.1',
               'id': 21,
               'job': 'ref',
               'parentid': 20,
               'project': 'lammps.phonons.Ni_fcc',
               'projectpath': 'D:/PyIron/PyIron_data/projects',
               'status': 'finished',
               'timestart': datetime.datetime(2016, 6, 24, 10, 17, 2, 429000),
               'timestop': datetime.datetime(2016, 6, 24, 10, 17, 2, 463000),
               'totalcputime': 0.034,
               'username': 'test'},.......]

    Raises:
        IndexError: if a condition in sql_query consists of fewer than three whitespace separated tokens
    """
    import re  # function-scope import: only required to split the query string

    dict_clause = {}
    if user is not None:
        dict_clause["username"] = str(user)
    if sql_query is not None:
        # Split on the stand-alone keyword only. A plain substring test for "AND"/"and" would pick the
        # separator based on text inside a value (e.g. job = 'BAND_1') and silently drop conditions.
        conditions = re.split(r"\s+and\s+", sql_query, flags=re.IGNORECASE)
        dict_clause.update(
            {str(cond.split()[0]): cond.split()[2] for cond in conditions}
        )
    if job is not None:
        dict_clause["job"] = str(job)
    # The trailing "%" is a wildcard so that sub projects are matched as well.
    if recursive:
        dict_clause["project"] = str(project_path) + "%"
    else:
        dict_clause["project"] = str(project_path)
    if sub_job_name is None:
        dict_clause["subjob"] = None
    elif sub_job_name != "%":
        # "%" accepts any path, therefore no filter is added in that case.
        dict_clause["subjob"] = str(sub_job_name)
    if element_lst is not None:
        dict_clause["element_lst"] = element_lst
    s.logger.debug("sql_query: %s", str(dict_clause))
    return database.get_items_dict(dict_clause)
def get_db_columns(database):
    """
    Return the column names of the job table by forwarding to ``database.get_table_headings()``.

    Args:
        database (DatabaseAccess): Database object

    Returns:
        list: list of column names like:
            ['id',
             'parentid',
             'masterid',
             'projectpath',
             'project',
             'job',
             'subjob',
             'chemicalformula',
             'status',
             'hamilton',
             'hamversion',
             'username',
             'computer',
             'timestart',
             'timestop',
             'totalcputime']
    """
    table_headings = database.get_table_headings()
    return table_headings
def job_table(
    database,
    sql_query,
    user,
    project_path,
    recursive=True,
    columns=None,
    all_columns=False,
    sort_by="id",
    max_colwidth=200,
    full_table=False,
    element_lst=None,
    job_name_contains='',
):
    """
    Query the database and return the matching jobs as a table.

    Note: calling this function changes the global pandas display options "display.max_rows",
    "display.max_columns" and "display.max_colwidth".

    Args:
        database (DatabaseAccess): Database object
        sql_query (str): SQL query to enter a more specific request
        user (str): username of the user whose user space should be searched
        project_path (str): root_path - this is in contrast to the project_path in GenericPath
        recursive (bool): search subprojects [True/False]
        columns (list): by default only the columns ['job', 'project', 'chemicalformula'] are selected, but the
                        user can select a subset of ['id', 'status', 'chemicalformula', 'job', 'subjob', 'project',
                        'projectpath', 'timestart', 'timestop', 'totalcputime', 'computer', 'hamilton', 'hamversion',
                        'parentid', 'masterid']
        all_columns (bool): Select all columns - this overwrites the columns option.
        sort_by (str): Sort by a specific column - only applied if that column is part of the selection
        max_colwidth (int): set the column width
        full_table (bool): Whether to show the entire pandas table
        element_lst (list): list of elements required in the chemical formula - by default None
        job_name_contains (str): a string which should be contained in every job_name - it is passed to
                                 pandas ``Series.str.contains()``, the empty string disables the filter

    Returns:
        pandas.Dataframe: Return the result as a pandas.Dataframe object - if the query has no results the
                          empty DataFrame is returned without selecting any columns
    """
    if all_columns:
        selected = [
            "id",
            "status",
            "chemicalformula",
            "job",
            "subjob",
            "projectpath",
            "project",
            "timestart",
            "timestop",
            "totalcputime",
            "computer",
            "hamilton",
            "hamversion",
            "parentid",
            "masterid",
        ]
    elif columns is None:
        selected = ["job", "project", "chemicalformula"]
    else:
        selected = columns

    entries = _job_dict(
        database=database,
        sql_query=sql_query,
        user=user,
        project_path=project_path,
        recursive=recursive,
        element_lst=element_lst,
    )

    # Either lift the row / column display limits or restore the pandas defaults.
    for option in ('display.max_rows', 'display.max_columns'):
        if full_table:
            pandas.set_option(option, None)
        else:
            pandas.reset_option(option)
    pandas.set_option("display.max_colwidth", max_colwidth)

    table = pandas.DataFrame(entries)
    if len(entries) == 0:
        # Nothing found: the empty frame has no columns which could be selected.
        return table
    if job_name_contains != '':
        table = table[table.job.str.contains(job_name_contains)]
    result = table[selected]
    if sort_by in selected:
        result = result.sort_values(by=sort_by)
    return result
def get_jobs(database, sql_query, user, project_path, recursive=True, columns=None):
    """
    Internal function to return the jobs as dictionary rather than a pandas.Dataframe

    Args:
        database (DatabaseAccess): Database object
        sql_query (str): SQL query to enter a more specific request
        user (str): username of the user whose user space should be searched
        project_path (str): root_path - this is in contrast to the project_path in GenericPath
        recursive (bool): search subprojects [True/False]
        columns (list): by default only the columns ['id', 'project'] are selected, but the user can select a subset
                        of ['id', 'status', 'chemicalformula', 'job', 'subjob', 'project', 'projectpath', 'timestart',
                        'timestop', 'totalcputime', 'computer', 'hamilton', 'hamversion', 'parentid', 'masterid']

    Returns:
        dict: columns are used as keys and point to a list of the corresponding values - if no job matches,
              every requested column points to an empty list
    """
    if columns is None:
        columns = ["id", "project"]
    table = job_table(
        database, sql_query, user, project_path, recursive, columns=columns
    )
    if len(table) == 0:
        # The empty table does not carry the column names, so use the requested ones.
        return {name: [] for name in columns}
    return {name: table[name].tolist() for name in table.keys()}
def get_job_ids(database, sql_query, user, project_path, recursive=True):
    """
    Return the job IDs matching a specific query

    This is the "id" entry of the dictionary returned by get_jobs() with its default columns.

    Args:
        database (DatabaseAccess): Database object
        sql_query (str): SQL query to enter a more specific request
        user (str): username of the user whose user space should be searched
        project_path (str): root_path - this is in contrast to the project_path in GenericPath
        recursive (bool): search subprojects [True/False]

    Returns:
        list: a list of job IDs
    """
    jobs = get_jobs(database, sql_query, user, project_path, recursive=recursive)
    return jobs["id"]
def get_child_ids(database, sql_query, user, project_path, job_specifier, status=None):
    """
    Get the children of a specific master job

    The master job is resolved with get_job_id(), afterwards the database is searched for all entries whose
    "masterid" equals the ID of the master job.

    Args:
        database (DatabaseAccess): Database object
        sql_query (str): SQL query to enter a more specific request - only used to resolve the master job
        user (str): username of the user whose user space should be searched - only used to resolve the master job
        project_path (str): root_path - this is in contrast to the project_path in GenericPath
        job_specifier (str): name of the master job or the master jobs job ID
        status (str): filter children which match a specific status - None by default

    Returns:
        list: list of child IDs in ascending order - empty if the master job was not found
    """
    master_id = get_job_id(database, sql_query, user, project_path, job_specifier)
    if master_id is None:
        return []
    filters = {"masterid": str(master_id)}
    if status is not None:
        filters["status"] = status
    children = database.get_items_dict(filters, return_all_columns=False)
    child_ids = [entry["id"] for entry in children]
    child_ids.sort()
    return child_ids
def get_job_id(database, sql_query, user, project_path, job_specifier):
    """
    Get the job_id for the job named job_specifier in the local project path from the database

    Integer specifiers are interpreted as job IDs and returned unchanged without querying the database. For job
    names the project itself is searched first; only if this yields no result the search is repeated
    including all sub projects.

    Args:
        database (DatabaseAccess): Database object
        sql_query (str): SQL query to enter a more specific request
        user (str): username of the user whose user space should be searched
        project_path (str): root_path - this is in contrast to the project_path in GenericPath
        job_specifier (str, int): name of the job or job ID

    Returns:
        int: job ID of the job - None if no matching job was found

    Raises:
        ValueError: if more than one job matches the job name
    """
    if isinstance(job_specifier, (int, np.integer)):
        return job_specifier  # the specifier already is the job ID
    # Strings are immutable: str.replace() returns a new string, so the result has to be
    # assigned - otherwise dots in the job name are never converted to underscores.
    job_specifier = job_specifier.replace(".", "_")
    job_dict = []
    for recursive in (False, True):
        job_dict = _job_dict(
            database,
            sql_query,
            user,
            project_path,
            recursive=recursive,
            job=job_specifier,
        )
        if len(job_dict) != 0:
            # Matches in the project itself take precedence over matches in sub projects.
            break
    if len(job_dict) == 0:
        return None
    if len(job_dict) == 1:
        return job_dict[0]["id"]
    # Report the ambiguous job name rather than dumping all matching database entries.
    raise ValueError(
        "job name '{0}' in this project is not unique".format(job_specifier)
    )
def set_job_status(database, sql_query, user, project_path, job_specifier, status):
    """
    Set the status of a particular job

    The job is resolved with get_job_id() and the "status" column of the corresponding database entry is
    updated via ``database.item_update()``.

    Args:
        database (DatabaseAccess): Database object
        sql_query (str): SQL query to enter a more specific request
        user (str): username of the user whose user space should be searched
        project_path (str): root_path - this is in contrast to the project_path in GenericPath
        job_specifier (str): name of the job or job ID
        status (str): job status can be one of the following ['initialized', 'appended', 'created', 'submitted',
                      'running', 'aborted', 'collect', 'suspended', 'refresh', 'busy', 'finished']
    """
    new_values = {"status": str(status)}
    job_id = get_job_id(database, sql_query, user, project_path, job_specifier)
    database.item_update(new_values, job_id)
def get_job_status(database, sql_query, user, project_path, job_specifier):
    """
    Get the status of a particular job

    Args:
        database (DatabaseAccess): Database object
        sql_query (str): SQL query to enter a more specific request
        user (str): username of the user whose user space should be searched
        project_path (str): root_path - this is in contrast to the project_path in GenericPath
        job_specifier (str): name of the job or job ID

    Returns:
        str: job status can be one of the following ['initialized', 'appended', 'created', 'submitted', 'running',
             'aborted', 'collect', 'suspended', 'refresh', 'busy', 'finished'] - None if a KeyError is raised
             while looking up the job
    """
    try:
        job_id = get_job_id(database, sql_query, user, project_path, job_specifier)
        entry = database.get_item_by_id(job_id)
        return entry["status"]
    except KeyError:
        return None
def get_job_working_directory(database, sql_query, user, project_path, job_specifier):
    """
    Get the working directory of a particular job

    The path is assembled from the database entry of the job as
    <projectpath>/<project>/<job_name>_hdf5/<job_name>, where job_name is the "subjob" entry without its
    first character.

    Args:
        database (DatabaseAccess): Database object
        sql_query (str): SQL query to enter a more specific request
        user (str): username of the user whose user space should be searched
        project_path (str): root_path - this is in contrast to the project_path in GenericPath
        job_specifier (str): name of the job or job ID

    Returns:
        str: working directory (absolute as long as the stored projectpath is absolute) - None if the database
             entry is empty or a KeyError is raised while looking up the job
    """
    try:
        job_id = get_job_id(database, sql_query, user, project_path, job_specifier)
        entry = database.get_item_by_id(job_id)
        if not entry:
            return None
        # Drop the first character of the subjob entry (presumably a leading "/" - verify against the database).
        name = entry["subjob"][1:]
        return os.path.join(
            entry["projectpath"],
            entry["project"],
            name + "_hdf5",
            name,
        )
    except KeyError:
        return None