-
Notifications
You must be signed in to change notification settings - Fork 13
/
jobloader.py
120 lines (93 loc) · 3.57 KB
/
jobloader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# coding: utf-8
# Copyright (c) Max-Planck-Institut für Eisenforschung GmbH - Computational Materials Design (CM) Department
# Distributed under the terms of "New BSD License", see the LICENSE file.
"""
A helper class to be assigned to the project, which facilitates tab-completion when
loading jobs.
"""
from __future__ import annotations
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING
import numpy as np
from pyiron_base.state import state
from pyiron_base.database.jobtable import get_job_id
from pyiron_base.jobs.job.util import _get_safe_job_name
if TYPE_CHECKING:
from pyiron_base.jobs.job.generic import GenericJob
from pyiron_base.jobs.job.path import JobPath
from pyiron_base.project.generic import Project
class _JobNotFoundError(AttributeError, IndexError):
    """
    Raised when a job name cannot be resolved in the project's job table.

    It is an ``AttributeError`` so that attribute-style access honours the
    ``getattr``/``hasattr`` protocol (and everything built on it, e.g. ``copy`` and
    ``pickle``), and an ``IndexError`` so that callers written against the previous
    behaviour - a bare ``IndexError`` for unknown names - keep working.
    """


class _JobByAttribute(ABC):
    """
    A parent class for accessing project jobs by a call and a job specifier, or by tab
    completion.

    Three access styles are offered:

    - ``obj(job_specifier)``: resolve a job name or job id via the database
    - ``obj.job_name``: attribute access by job name (supports tab completion)
    - ``obj["job_name"]``: item access by job name

    Subclasses have to provide ``convert_to_object``, which is forwarded to
    ``Project.load_from_jobpath``.
    """

    def __init__(self, project: Project):
        """
        Args:
            project (Project): the project whose jobs are made accessible
        """
        self._project = project

    @property
    def _job_table(self):
        """The project's job table, requested with ``columns=["job"]``."""
        return self._project.job_table(columns=["job"])

    @property
    def _job_names(self):
        """Values of the ``"job"`` column of the job table."""
        return self._job_table["job"].values

    def __dir__(self):
        # Only the job names are advertised, so tab completion lists jobs.
        return self._job_names

    def _id_from_name(self, name):
        """
        Look up the id of the first job table row whose ``"job"`` entry equals ``name``.

        Args:
            name (str): job name as it appears in the job table

        Returns:
            The ``"id"`` entry of the first matching row.

        Raises:
            _JobNotFoundError: if no row matches (also an ``IndexError`` and an
                ``AttributeError``)
        """
        # Query the table once and derive both the mask and the ids from the same
        # snapshot; two separate queries cost twice as much and may disagree.
        table = self._job_table
        matching_ids = table.loc[table["job"].values == name, "id"].values
        if len(matching_ids) == 0:
            raise _JobNotFoundError(f"No job named '{name}' found in the job table")
        return matching_ids[0]

    def _load_by_name(self, name):
        """Load the job called ``name`` using this class' ``convert_to_object``."""
        return self._project.load_from_jobpath(
            job_id=self._id_from_name(name), convert_to_object=self.convert_to_object
        )

    def __getattr__(self, item):
        """
        Fallback attribute access: interpret ``item`` as a job name and load that job.

        Raises:
            AttributeError: if ``item`` is not a known job name (the raised exception
                is also an ``IndexError`` for backwards compatibility)
        """
        # __getattr__ is only reached after normal lookup failed. If the name is
        # '_project' (instance not initialised yet, e.g. while being copied or
        # unpickled) or exists on the class (its descriptor raised AttributeError),
        # a job lookup would re-enter this method through `_job_table` forever.
        if item == "_project" or hasattr(type(self), item):
            raise AttributeError(
                f"'{type(self).__name__}' object has no attribute '{item}'"
            )
        return self._load_by_name(item)

    def __getitem__(self, item):
        """
        Item access by job name.

        Raises:
            IndexError: if ``item`` is not a known job name
        """
        return self._load_by_name(item)

    def __call__(self, job_specifier, convert_to_object=None):
        """
        Resolve ``job_specifier`` via the database and load the corresponding job.

        Args:
            job_specifier (str, int): name of the job or job ID
            convert_to_object (bool, None): forwarded to ``load_from_jobpath``; when
                ``None`` the class level ``convert_to_object`` is used

        Returns:
            The object returned by ``Project.load_from_jobpath`` or ``None`` if the
            job could not be found (a warning is logged in that case).
        """
        if self._project.sql_query is not None:
            state.logger.warning(
                f"SQL filter '{self._project.sql_query}' is active (may exclude job)"
            )
        # Integer specifiers are passed on as they are; everything else is treated
        # as a job name and run through _get_safe_job_name first.
        if not isinstance(job_specifier, (int, np.integer)):
            job_specifier = _get_safe_job_name(name=job_specifier)
        job_id = get_job_id(
            database=self._project.db,
            sql_query=self._project.sql_query,
            user=self._project.user,
            project_path=self._project.project_path,
            job_specifier=job_specifier,
        )
        if job_id is None:
            state.logger.warning(
                f"Job '{job_specifier}' does not exist and cannot be loaded"
            )
            return None
        return self._project.load_from_jobpath(
            job_id=job_id,
            convert_to_object=convert_to_object
            if convert_to_object is not None
            else self.convert_to_object,
        )

    @property
    @abstractmethod
    def convert_to_object(self):
        """Default ``convert_to_object`` value handed to ``load_from_jobpath``."""
        pass
class JobLoader(_JobByAttribute):
    """
    Load an existing pyiron object - most commonly a job - from the database.

    The loader can be called with a job specifier; access by job name via attribute
    or item lookup is inherited from the parent class.

    Args:
        job_specifier (str, int): name of the job or job ID
        convert_to_object (bool, None): optional override of the class default
            (``True``); ``None`` keeps the default

    Returns:
        GenericJob, JobCore: Either the full GenericJob object or just a reduced
            JobCore object
    """

    # Class level default handed on to the parent implementation.
    convert_to_object = True

    def __call__(self, job_specifier, convert_to_object=None) -> GenericJob:
        # Pure delegation; redefined only to attach the return annotation.
        loaded = super().__call__(job_specifier, convert_to_object=convert_to_object)
        return loaded
class JobInspector(_JobByAttribute):
    """
    Inspect an existing pyiron object - most commonly a job - from the database.

    Unlike the parent class, calling the inspector accepts only the job specifier;
    ``convert_to_object`` is fixed to the class default (``False``).

    Args:
        job_specifier (str, int): name of the job or job ID

    Returns:
        JobCore: Access to the HDF5 object - not a GenericJob object - use
            :meth:`~.Project.load()` instead.
    """

    # Class level default handed on to the parent implementation.
    convert_to_object = False

    def __call__(self, job_specifier) -> JobPath:
        # No override is passed on, so the parent falls back to the class default.
        inspected = super().__call__(job_specifier)
        return inspected