-
Notifications
You must be signed in to change notification settings - Fork 46
/
generic.py
491 lines (387 loc) · 17.4 KB
/
generic.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
# coding: utf-8
# Copyright (c) Max-Planck-Institut für Eisenforschung GmbH - Computational Materials Design (CM) Department
# Distributed under the terms of "New BSD License", see the LICENSE file.
import inspect
from pyiron.base.job.generic import GenericJob
"""
The GenericMaster is the template class for all meta jobs
"""
__author__ = "Jan Janssen"
__copyright__ = "Copyright 2019, Max-Planck-Institut für Eisenforschung GmbH - " \
"Computational Materials Design (CM) Department"
__version__ = "1.0"
__maintainer__ = "Jan Janssen"
__email__ = "janssen@mpie.de"
__status__ = "production"
__date__ = "Sep 1, 2017"
# Python 2 compatibility: FileExistsError is only a built-in on Python 3. On Python 2 the self-assignment below
# raises NameError and an OSError-based stand-in with the same name is defined instead.
try:
    FileExistsError = FileExistsError
except NameError:

    class FileExistsError(OSError):
        pass
class GenericMaster(GenericJob):
    """
    The GenericMaster is the template class for all meta jobs - meaning all jobs which contain multiple other jobs. It
    defines the shared functionality of the different kind of job series.

    Args:
        project (ProjectHDFio): ProjectHDFio instance which points to the HDF5 file the job is stored in
        job_name (str): name of the job, which has to be unique within the project

    Attributes:

        .. attribute:: job_name

            name of the job, which has to be unique within the project

        .. attribute:: status

            execution status of the job, can be one of the following [initialized, appended, created, submitted,
                                                                      running, aborted, collect, suspended, refresh,
                                                                      busy, finished]

        .. attribute:: job_id

            unique id to identify the job in the pyiron database

        .. attribute:: parent_id

            job id of the predecessor job - the job which was executed before the current one in the current job series

        .. attribute:: master_id

            job id of the master job - a meta job which groups a series of jobs, which are executed either in parallel
            or in serial.

        .. attribute:: child_ids

            list of child job ids - only meta jobs have child jobs - jobs which list the meta job as their master

        .. attribute:: project

            Project instance the job is located in

        .. attribute:: project_hdf5

            ProjectHDFio instance which points to the HDF5 file the job is stored in

        .. attribute:: job_info_str

            short string to describe the job by its job_name and job ID - mainly used for logging

        .. attribute:: working_directory

            working directory the job is executed in - outside the HDF5 file

        .. attribute:: path

            path to the job as a combination of absolute file system path and path within the HDF5 file.

        .. attribute:: version

            Version of the hamiltonian, which is also the version of the executable unless a custom executable is used.

        .. attribute:: executable

            Executable used to run the job - usually the path to an external executable.

        .. attribute:: library_activated

            For job types which offer a Python library pyiron can use the python library instead of an external
            executable.

        .. attribute:: server

            Server object to handle the execution environment for the job.

        .. attribute:: queue_id

            the ID returned from the queuing system - it is most likely not the same as the job ID.

        .. attribute:: logger

            logger object to monitor the external execution and internal pyiron warnings.

        .. attribute:: restart_file_list

            list of files which are used to restart the calculation from these files.

        .. attribute:: job_type

            Job type object with all the available job types: ['ExampleJob', 'SerialMaster', 'ParallelMaster',
                                                               'ScriptJob', 'ListMaster']

        .. attribute:: child_names

            Dictionary matching the child ID to the child job name.
    """

    def __init__(self, project, job_name):
        super(GenericMaster, self).__init__(project, job_name=job_name)
        # Names of the appended child jobs, in the order they were appended.
        self._job_name_lst = []
        # In-memory cache of child job objects, keyed by job name.
        self._job_object_dict = {}
        # Optional user supplied callable (and its source code) which overrides the child_ids lookup.
        self._child_id_func = None
        self._child_id_func_str = None

    @property
    def child_names(self):
        """
        Dictionary matching the child ID to the child job name

        Returns:
            dict: {child_id: child job name }
        """
        database = self.project.db
        return {child_id: database.get_item_by_id(child_id)["job"] for child_id in self.child_ids}

    @property
    def child_ids(self):
        """
        list of child job ids - only meta jobs have child jobs - jobs which list the meta job as their master

        If a custom function was registered with set_child_id_func() it is called with the master job as its only
        argument, otherwise the implementation of the parent class is used.

        Returns:
            list: list of child job ids
        """
        if self._child_id_func:
            return self._child_id_func(self)
        return super(GenericMaster, self).child_ids

    # def copy(self):
    #     """
    #     Copy the GenericJob object which links to the job and its HDF5 file
    #
    #     Returns:
    #         GenericJob: New GenericJob object pointing to the same job
    #     """
    #     self_copied = super(GenericMaster, self).copy()
    #     self_copied._job_name_lst = self._job_name_lst[:]
    #     self._load_all_child_jobs(job_to_load=self)
    #     self_copied._job_object_dict = {key: value.copy() for key, value in self._job_object_dict.items()}
    #     self_copied._child_id_func = self._child_id_func
    #     self_copied._child_id_func_str = self._child_id_func_str
    #     return self_copied

    def first_child_name(self):
        """
        Get the name of the first child job

        Returns:
            str: name of the first child job
        """
        return self.project.db.get_item_by_id(self.child_ids[0])['job']

    def validate_ready_to_run(self):
        """
        Validate that the calculation is ready to be executed. By default no generic checks are performed, but one could
        check that the input information is complete or validate the consistency of the input at this point.
        """
        pass

    def append(self, job):
        """
        Append a job to the GenericMaster - just like you would append an element to a list.

        The number of cores of the master is raised to the number of cores of the appended job if the latter is at
        least as large. A job whose name is already part of the master is not appended a second time.

        Args:
            job (GenericJob): job to append
        """
        if job.server.cores >= self.server.cores:
            self.server.cores = job.server.cores
        if job.job_name not in self._job_name_lst:
            self._job_name_lst.append(job.job_name)
            self._child_job_update_hdf(parent_job=self, child_job=job)

    def pop(self, i=-1):
        """
        Pop a job from the GenericMaster - just like you would pop an element from a list

        Args:
            i (int): position of the job. (Default is last element, -1.)

        Returns:
            GenericJob: job
        """
        job_name_to_return = self._job_name_lst[i]
        # Load the job together with all of its own children before its HDF5 group is removed below.
        job_to_return = self._load_all_child_jobs(self._load_job_from_cache(job_name_to_return))
        del self._job_name_lst[i]
        with self.project_hdf5.open("input") as hdf5_input:
            hdf5_input["job_list"] = self._job_name_lst
        # Detach the job from the HDF5 file of the master and point it to an HDF5 location of its own.
        job_to_return.project_hdf5.remove_group()
        job_to_return.project_hdf5 = self.project_hdf5.__class__(self.project, job_to_return.job_name,
                                                                 h5_path='/' + job_to_return.job_name)
        if isinstance(job_to_return, GenericMaster):
            for sub_job in job_to_return._job_object_dict.values():
                self._child_job_update_hdf(parent_job=job_to_return, child_job=sub_job)
        job_to_return.status.initialized = True
        # NOTE(review): the popped job stays in self._job_object_dict, so a later to_hdf() of the master still calls
        # to_hdf() on it - confirm whether this is intended before removing the cache entry here.
        return job_to_return

    def move_to(self, project):
        """
        Move the content of the job including the HDF5 file to a new location

        If the master already has a job ID, all child jobs are moved to the sub project '<job_name>_hdf5' of the
        target project before the master itself is moved.

        Args:
            project (ProjectHDFio): project to move the job to
        """
        if self._job_id:
            for child_id in self.child_ids:
                child = self.project.load(child_id)
                child.move_to(project.open(self.job_name + '_hdf5'))
        super(GenericMaster, self).move_to(project)

    def copy_to(self, project=None, new_job_name=None, input_only=False, new_database_entry=True):
        """
        Copy the content of the job including the HDF5 file to a new location

        Args:
            project (ProjectHDFio): project to copy the job to
            new_job_name (str): to duplicate the job within the same project it is necessary to modify the job name
                                - optional
            input_only (bool): [True/False] to copy only the input - default False
            new_database_entry (bool): [True/False] to create a new database entry - default True

        Returns:
            GenericJob: GenericJob object pointing to the new location.
        """
        new_generic_job = super(GenericMaster, self).copy_to(project=project, new_job_name=new_job_name,
                                                             input_only=input_only,
                                                             new_database_entry=new_database_entry)
        # The child jobs are only copied when both the original and the copy are registered in the database.
        if new_generic_job.job_id and new_database_entry and self._job_id:
            for child_id in self.child_ids:
                child = self.project.load(child_id)
                # NOTE(review): project is dereferenced here although it defaults to None - confirm how the callers
                # which only provide new_job_name are expected to behave for masters with children.
                new_child = child.copy_to(project.open(self.job_name + '_hdf5'),
                                          new_database_entry=new_database_entry)
                # Re-link the copied child to the copied master instead of the original one.
                if child.parent_id:
                    new_child.parent_id = new_generic_job.job_id
                if child.master_id:
                    new_child.master_id = new_generic_job.job_id
        return new_generic_job

    def to_hdf(self, hdf=None, group_name=None):
        """
        Store the GenericMaster in an HDF5 file

        Besides the content written by the parent class, the list of child job names and the child ID function are
        stored in the 'input' group and to_hdf() is called for every child job which is currently cached in memory.

        Args:
            hdf (ProjectHDFio): HDF5 group object - optional
            group_name (str): HDF5 subgroup name - optional
        """
        super(GenericMaster, self).to_hdf(hdf=hdf, group_name=group_name)
        with self.project_hdf5.open("input") as hdf5_input:
            hdf5_input["job_list"] = self._job_name_lst
            self._to_hdf_child_function(hdf=hdf5_input)
        for job in self._job_object_dict.values():
            job.to_hdf()

    def from_hdf(self, hdf=None, group_name=None):
        """
        Restore the GenericMaster from an HDF5 file

        Args:
            hdf (ProjectHDFio): HDF5 group object - optional
            group_name (str): HDF5 subgroup name - optional
        """
        super(GenericMaster, self).from_hdf(hdf=hdf, group_name=group_name)
        with self.project_hdf5.open("input") as hdf5_input:
            job_list_tmp = hdf5_input["job_list"]
            self._from_hdf_child_function(hdf=hdf5_input)
            self._job_name_lst = job_list_tmp

    def set_child_id_func(self, child_id_func):
        """
        Add an external function to derive a list of child IDs - experimental feature

        The job is saved and its status is set to finished right after the function is registered.

        Args:
            child_id_func (Function): Python function which returns the list of child IDs
        """
        self._child_id_func = child_id_func
        self.save()
        self.status.finished = True

    def get_child_cores(self):
        """
        Calculate the currently active number of cores, by summing over all children which are neither finished nor
        aborted.

        The number of cores of a child is read from the second '#' separated field of its 'computer' database entry.

        Returns:
            (int): number of cores used
        """
        return sum(int(db_entry['computer'].split('#')[1])
                   for db_entry in self.project.db.get_items_dict({'masterid': self.job_id})
                   if db_entry['status'] not in ['finished', 'aborted'])

    def __len__(self):
        """
        Length of the GenericMaster equals the number of children appended.

        Returns:
            int: length of the GenericMaster
        """
        return len(self._job_name_lst)

    def __getitem__(self, item):
        """
        Get/ read data from the GenericMaster

        Args:
            item (str, int): path to the data, key of the data object or position of an appended child job

        Returns:
            dict, list, float, int: data or data object
        """
        # Query the child IDs only once, so the list of IDs and the list of names always belong together.
        child_id_lst = self.child_ids
        child_name_lst = [self.project.db.get_item_by_id(child_id)["job"] for child_id in child_id_lst]
        if isinstance(item, int):
            item = self._job_name_lst[item]
        return self._get_item_when_str(item=item, child_id_lst=child_id_lst, child_name_lst=child_name_lst)

    def __getattr__(self, item):
        """
        Check if a job with the specific name exists

        Args:
            item (str): name of the job

        Returns:
            the object returned by __getitem__() for the given name

        Raises:
            AttributeError: if no such job or data object exists
        """
        # __getattr__ is only called after the regular attribute lookup failed. Special method probes (for example
        # from copy or pickle) and lookups on an instance whose __init__() has not finished yet can never refer to
        # a child job - and forwarding them to __getitem__() would recurse endlessly, because __getitem__() itself
        # reads instance attributes which do not exist at that point.
        is_special_name = item.startswith("__") and item.endswith("__")
        if is_special_name or "_job_name_lst" not in self.__dict__:
            raise AttributeError("'{}' object has no attribute '{}'".format(type(self).__name__, item))
        item_from_get_item = self.__getitem__(item=item)
        if item_from_get_item is None:
            raise AttributeError("'{}' object has no attribute '{}'".format(type(self).__name__, item))
        return item_from_get_item

    def _load_all_child_jobs(self, job_to_load):
        """
        Helper function to load all child jobs to memory - like it was done in the previous implementation

        Args:
            job_to_load (GenericJob): job to be reloaded

        Returns:
            GenericJob: job to be reloaded - including all the child jobs and their child jobs
        """
        if isinstance(job_to_load, GenericMaster):
            for sub_job_name in job_to_load._job_name_lst:
                job_to_load._job_object_dict[sub_job_name] = \
                    self._load_all_child_jobs(job_to_load._load_job_from_cache(sub_job_name))
        return job_to_load

    def _load_job_from_cache(self, job_name):
        """
        Helper function to load a job either from the _job_object_dict or from the HDF5 file

        Args:
            job_name (str): name of the job

        Returns:
            GenericJob: the reloaded job
        """
        if job_name in self._job_object_dict:
            return self._job_object_dict[job_name]
        # Not cached yet: recreate the object from the type stored in the HDF5 file and restore its content.
        ham_obj = self.project_hdf5.create_object(class_name=self._hdf5[job_name + '/TYPE'], project=self._hdf5,
                                                  job_name=job_name)
        ham_obj.from_hdf()
        return ham_obj

    def _to_hdf_child_function(self, hdf):
        """
        Helper function to store the child function in HDF5

        Args:
            hdf: HDF5 file object
        """
        hdf["job_list"] = self._job_name_lst
        if self._child_id_func is not None:
            try:
                hdf["child_id_func"] = inspect.getsource(self._child_id_func)
            except IOError:
                # The source code cannot be retrieved (e.g. the function was restored from a string), so the
                # previously stored source code string is written instead.
                hdf["child_id_func"] = self._child_id_func_str
        else:
            hdf["child_id_func"] = "None"

    def _from_hdf_child_function(self, hdf):
        """
        Helper function to load the child function from HDF5

        Args:
            hdf: HDF5 file object
        """
        try:
            child_id_func_str = hdf["child_id_func"]
        except ValueError:
            child_id_func_str = "None"
        if child_id_func_str == "None":
            self._child_id_func = None
        else:
            self._child_id_func_str = child_id_func_str
            self._child_id_func = self.get_function_from_string(child_id_func_str)

    def _get_item_when_str(self, item, child_id_lst, child_name_lst):
        """
        Helper function for __getitem__ when item is type string

        The first path element is looked up in the child jobs stored in the database first, then in the child jobs
        appended to the master and finally the request is forwarded to the parent class.

        Args:
            item (str): path to the data or name of the child job, the path elements are separated by '/'
            child_id_lst (list): a list containing all child job ids
            child_name_lst (list): a list containing the names of all child jobs

        Returns:
            anything
        """
        name_lst = item.split("/")
        item_obj = name_lst[0]
        if item_obj in child_name_lst:
            child_id = child_id_lst[child_name_lst.index(item_obj)]
            if len(name_lst) > 1:
                return self.project.inspect(child_id)['/'.join(name_lst[1:])]
            else:
                return self.project.load(child_id, convert_to_object=True)
        elif item_obj in self._job_name_lst:
            child = self._load_job_from_cache(job_name=item_obj)
            if len(name_lst) == 1:
                return child
            else:
                return child['/'.join(name_lst[1:])]
        else:
            return super(GenericMaster, self).__getitem__(item)

    def _child_job_update_hdf(self, parent_job, child_job):
        """
        Helper function to relocate a child job - and recursively its own children - into the HDF5 file of the
        parent job and to register the child job in the job cache of the parent job.

        Args:
            parent_job (GenericMaster): job which takes over the child job
            child_job (GenericJob): job to be stored below the HDF5 path of the parent job
        """
        child_job.project_hdf5.file_name = parent_job.project_hdf5.file_name
        child_job.project_hdf5.h5_path = parent_job.project_hdf5.h5_path + '/' + child_job.job_name
        if isinstance(child_job, GenericMaster):
            for sub_job_name in child_job._job_name_lst:
                self._child_job_update_hdf(parent_job=child_job, child_job=child_job._load_job_from_cache(sub_job_name))
        parent_job._job_object_dict[child_job.job_name] = child_job

    def _executable_activate_mpi(self):
        """
        Internal helper function to switch the executable to MPI mode
        """
        pass

    @staticmethod
    def get_function_from_string(function_str):
        """
        Convert a string of source code to a function

        The function name is taken from the text between the leading 'def ' and the first opening bracket, so the
        source code has to start with the function definition.

        Args:
            function_str (str): function source code

        Returns:
            function: the function defined by the source code
        """
        # NOTE(security): the source code is executed as is - only use it with trusted input.
        # An explicit namespace is used, because exec() can not reliably create local variables of the calling
        # function (since Python 3.13 every locals() call returns an independent snapshot).
        namespace = {}
        exec(function_str, globals(), namespace)
        return eval(function_str.split("(")[0][4:], globals(), namespace)