
Merge pull request #1477 from pyiron/executablecontainer_runstatic
Introduce calculate() function
jan-janssen authored Jun 20, 2024
2 parents 64c5158 + c1addc8 commit db1f546
Showing 9 changed files with 420 additions and 134 deletions.
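The heart of the change is a uniform three-step protocol that replaces the old write_input() / run_static() / collect_output() chain. A hedged sketch of that protocol, assembled from the docstrings in this diff (`job` stands for any configured GenericJob-derived object, which this sketch does not construct):

```python
# Step 1: ask the job for a plain callable that writes input, runs the
# executable, and parses output.
calculate_function = job.get_calculate_function()

# Step 2: call it with the keyword arguments the job assembles itself.
shell_output, parsed_output, job_crashed = calculate_function(**job.calculate_kwargs)

# Step 3: persist the parsed output dictionary and raw shell output in HDF5.
job.save_output(output_dict=parsed_output, shell_output=shell_output)
```

The docstrings below repeat this triple verbatim, so the sketch doubles as the contract every job type now implements.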
2 changes: 1 addition & 1 deletion pyiron_base/jobs/datamining.py
@@ -522,7 +522,7 @@ def __init__(self, project, job_name):
        )
        self._enforce_update = False
        self._job_status = ["finished"]
-        self._python_only_job = True
+        self._job_with_calculate_function = True
        self.analysis_project = project.project

    @property
61 changes: 46 additions & 15 deletions pyiron_base/jobs/flex/executablecontainer.py
@@ -1,6 +1,8 @@
import cloudpickle
import numpy as np

from pyiron_base.jobs.job.template import TemplateJob
from pyiron_base.jobs.job.runfunction import CalculateFunctionCaller


class ExecutableContainerJob(TemplateJob):
@@ -42,7 +44,35 @@ class ExecutableContainerJob(TemplateJob):
    def __init__(self, project, job_name):
        super().__init__(project, job_name)
        self._write_input_funct = None
        self._collect_output_funct = None
        # Set the _job_with_calculate_function flag to True so that run_static() executes the Python function
        # returned by job.get_calculate_function() with the arguments job.calculate_kwargs, instead of calling
        # the old interface with write_input() and collect_output(). Finally, the output dictionary is stored
        # in the HDF5 file via self.save_output(output_dict, shell_output).
        self._job_with_calculate_function = True

    @property
    def calculate_kwargs(self) -> dict:
        """
        Generate keyword arguments for the calculate() function.

        Example:

        >>> calculate_function = job.get_calculate_function()
        >>> shell_output, parsed_output, job_crashed = calculate_function(**job.calculate_kwargs)
        >>> job.save_output(output_dict=parsed_output, shell_output=shell_output)

        Returns:
            dict: keyword arguments for the calculate() function
        """
        kwargs = super().calculate_kwargs
        kwargs.update(
            {
                "input_parameter_dict": self.input.to_builtin(),
                "executable_script": self.executable.executable_path,
                "shell_parameter": True,
            }
        )
        return kwargs
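For orientation, the three entries this override adds on top of GenericJob.calculate_kwargs look roughly as follows (all values hypothetical):

```python
# Hypothetical values; the real ones come from job.input and job.executable.
extra_kwargs = {
    "input_parameter_dict": {"energy_cutoff": 320.0},  # self.input.to_builtin()
    "executable_script": "/home/user/run_code.sh",     # self.executable.executable_path
    "shell_parameter": True,                           # always invoke through the shell
}
```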

    def set_job_type(
        self,
@@ -72,22 +102,23 @@ def set_job_type(
        if default_input_dict is not None:
            self.input.update(default_input_dict)

-    def write_input(self):
-        if self._write_input_funct is not None:
-            self._write_input_funct(
-                input_dict=self.input.to_builtin(),
-                working_directory=self.working_directory,
-            )
-
-    def run_static(self):
-        self.storage.output.stdout = super().run_static()
-
-    def collect_output(self):
-        if self._collect_output_funct is not None:
-            self.output.update(
-                self._collect_output_funct(working_directory=self.working_directory)
-            )
-            self.to_hdf()
+    def get_calculate_function(self) -> callable:
+        """
+        Generate calculate() function
+
+        Example:
+
+        >>> calculate_function = job.get_calculate_function()
+        >>> shell_output, parsed_output, job_crashed = calculate_function(**job.calculate_kwargs)
+        >>> job.save_output(output_dict=parsed_output, shell_output=shell_output)
+
+        Returns:
+            callable: calculate() function
+        """
+        return CalculateFunctionCaller(
+            write_input_funct=self._write_input_funct,
+            collect_output_funct=self._collect_output_funct,
+        )

    def to_dict(self):
        job_dict = super().to_dict()
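A minimal end-to-end sketch for this class, under stated assumptions: the two helper functions are user-defined, and set_job_type() is assumed to accept them alongside the executable string (its full signature is folded out of this diff):

```python
import os

def write_input(input_dict, working_directory):
    # Serialize the job input into the code-specific input file.
    with open(os.path.join(working_directory, "input.txt"), "w") as f:
        f.writelines(f"{key} {value}\n" for key, value in input_dict.items())

def collect_output(working_directory):
    # Parse the executable's output back into a plain dictionary.
    with open(os.path.join(working_directory, "output.txt")) as f:
        return {"result": f.read().strip()}

# Hypothetical wiring; run() then goes through the calculate() protocol:
# job.set_job_type(
#     executable_str="cat input.txt > output.txt",
#     write_input_funct=write_input,
#     collect_output_funct=collect_output,
# )
# job.run()
```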
180 changes: 129 additions & 51 deletions pyiron_base/jobs/job/generic.py
@@ -11,6 +11,7 @@
import os
import platform
import posixpath
from typing import Optional
import warnings

from h5io_browser.base import _read_hdf, _write_hdf
@@ -27,6 +28,9 @@
from pyiron_base.jobs.job.extension.files import File
from pyiron_base.jobs.job.extension.jobstatus import JobStatus
from pyiron_base.jobs.job.runfunction import (
    CalculateFunctionCaller,
    execute_job_with_calculate_function,
    execute_job_with_external_executable,
    run_job_with_parameter_repair,
    run_job_with_status_initialized,
    run_job_with_status_created,
@@ -39,7 +43,6 @@
    run_job_with_status_finished,
    run_job_with_runmode_modal,
    run_job_with_runmode_queue,
-    execute_job_with_external_executable,
    write_input_files_from_input_dict,
)
from pyiron_base.jobs.job.util import (
@@ -160,10 +163,11 @@ def __init__(self, project, job_name):
        self._restart_file_dict = dict()
        self._exclude_nodes_hdf = list()
        self._exclude_groups_hdf = list()
+        self._collect_output_funct = None
        self._executor_type = None
        self._process = None
        self._compress_by_default = False
-        self._python_only_job = False
+        self._job_with_calculate_function = False
        self._write_work_dir_warnings = True
        self.interactive_cache = None
        self.error = GenericError(working_directory=self.project_hdf5.working_directory)
@@ -417,24 +421,110 @@ def executor_type(self, exe):
f"Expected an executor class or string representing one, but got {exe}"
)

    @property
    def calculate_kwargs(self) -> dict:
        """
        Generate keyword arguments for the calculate() function. A new simulation code only has to extend the
        get_input_parameter_dict() function, which by default returns a hierarchical dictionary with
        files_to_create and files_to_copy.

        Example:

        >>> calculate_function = job.get_calculate_function()
        >>> shell_output, parsed_output, job_crashed = calculate_function(**job.calculate_kwargs)
        >>> job.save_output(output_dict=parsed_output, shell_output=shell_output)

        Returns:
            dict: keyword arguments for the calculate() function
        """
        executable, shell = self.executable.get_input_for_subprocess_call(
            cores=self.server.cores, threads=self.server.threads, gpus=self.server.gpus
        )
        return {
            "working_directory": self.working_directory,
            "input_parameter_dict": self.get_input_parameter_dict(),
            "executable_script": executable,
            "shell_parameter": shell,
            "cores": self.server.cores,
            "threads": self.server.threads,
            "gpus": self.server.gpus,
            "conda_environment_name": self.server.conda_environment_name,
            "conda_environment_path": self.server.conda_environment_path,
            "accept_crash": self.server.accept_crash,
            "accepted_return_codes": self.executable.accepted_return_codes,
            "output_parameter_dict": self.get_output_parameter_dict(),
        }
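The keys above define the implicit signature of the calculate function. The real callable is CalculateFunctionCaller from pyiron_base/jobs/job/runfunction.py, which this diff does not show; a hedged, simplified stand-in accepting the same keywords might look like:

```python
import subprocess
from typing import Optional, Tuple

def calculate_sketch(
    working_directory: str,
    input_parameter_dict: dict,
    executable_script: str,
    shell_parameter: bool,
    cores: int = 1,
    threads: int = 1,
    gpus: Optional[int] = None,
    conda_environment_name: Optional[str] = None,
    conda_environment_path: Optional[str] = None,
    accept_crash: bool = False,
    accepted_return_codes: Optional[list] = None,
    output_parameter_dict: Optional[dict] = None,
) -> Tuple[str, dict, bool]:
    # Illustrative core only: run the executable in the working directory.
    # In the real implementation the resource and conda parameters configure
    # the environment, and input_parameter_dict / output_parameter_dict feed
    # the input-writing and output-parsing steps omitted here.
    process = subprocess.run(
        executable_script,
        cwd=working_directory,
        shell=shell_parameter,
        capture_output=True,
        text=True,
    )
    job_crashed = process.returncode not in (accepted_return_codes or [0])
    return process.stdout, {}, job_crashed
```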

    def clear_job(self):
        """
        Convenience function to clear job info after suspend. Mimics deletion of all the job info after suspend in a
        local test environment.
        """
        del self.__name__
        del self.__version__
        del self._executable
        del self._server
        del self._logger
        del self._import_directory
        del self._status
        del self._restart_file_list
        del self._restart_file_dict

    def copy(self):
        """
        Copy the GenericJob object which links to the job and its HDF5 file

        Returns:
            GenericJob: New GenericJob object pointing to the same job
        """
        # Store all job arguments in the HDF5 file
        delete_file_after_copy = _job_store_before_copy(job=self)

        # Copy Python object - super().copy() causes recursion error for serial master
        copied_self = self.__class__(
            job_name=self.job_name, project=self.project_hdf5.open("..")
        )
        copied_self.reset_job_id()

        # Reload object from HDF5 file
        _job_reload_after_copy(
            job=copied_self, delete_file_after_copy=delete_file_after_copy
        )

        # Copy executor - it cannot be copied and is just linked instead
        if self.server.executor is not None:
            copied_self.server.executor = self.server.executor
            if self.server.future is not None and not self.server.future.done():
                raise RuntimeError(
                    "Jobs whose server has executor and future attributes cannot be copied unless the future is `done()`"
                )
        return copied_self

    def collect_logfiles(self):
        """
        Collect the log files of the external executable and store the information in the HDF5 file. This method has
        to be implemented in the individual hamiltonians.
        """
        pass

-    def write_input(self):
-        """
-        Call routines that generate the code specific input files
-        """
-        write_input_files_from_input_dict(
-            input_dict=self.get_input_file_dict(),
-            working_directory=self.working_directory,
-        )
+    def get_calculate_function(self) -> callable:
+        """
+        Generate calculate() function
+
+        Example:
+
+        >>> calculate_function = job.get_calculate_function()
+        >>> shell_output, parsed_output, job_crashed = calculate_function(**job.calculate_kwargs)
+        >>> job.save_output(output_dict=parsed_output, shell_output=shell_output)
+
+        Returns:
+            callable: calculate() function
+        """
+        return CalculateFunctionCaller(
+            collect_output_funct=self._collect_output_funct,
+        )

-    def get_input_file_dict(self) -> dict:
+    def get_input_parameter_dict(self) -> dict:
        """
        Get a hierarchical dictionary of input files. On the first level the dictionary is divided into files_to_create
        and files_to_copy. Both are dictionaries that use the file names as keys. In files_to_create the values are strings
@@ -447,7 +537,7 @@ def get_input_file_dict(self) -> dict:
        if (
            state.settings.configuration["write_work_dir_warnings"]
            and self._write_work_dir_warnings
-            and not self._python_only_job
+            and not self._job_with_calculate_function
        ):
            content = [
                "Files in this directory are intended to be written and read by pyiron. \n\n",
@@ -469,6 +559,9 @@
"files_to_copy": _get_restart_copy_dict(job=self),
}

    def get_output_parameter_dict(self):
        return {}
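A hedged sketch of how a new simulation code plugs into these two hooks (class name, file names, and parser settings are hypothetical; the files_to_create / files_to_copy keys follow the docstring above):

```python
class MySimulationJob(GenericJob):
    def get_input_parameter_dict(self) -> dict:
        parameter_dict = super().get_input_parameter_dict()
        # files_to_create maps file names to the string content pyiron
        # writes into the working directory before the executable runs.
        parameter_dict["files_to_create"]["control.inp"] = "steps 1000\n"
        return parameter_dict

    def get_output_parameter_dict(self) -> dict:
        # Extra settings forwarded to the output parser; empty by default.
        return {"output_file": "log.out"}
```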

    def collect_output(self):
        """
        Collect the output files of the external executable and store the information in the HDF5 file. This method has
@@ -478,6 +571,23 @@
"read procedure must be defined for derived Hamilton!"
)

    def save_output(
        self, output_dict: Optional[dict] = None, shell_output: Optional[str] = None
    ):
        """
        Store output of the calculate function in the HDF5 file.

        Args:
            output_dict (dict): hierarchical output dictionary to be stored in the HDF5 file.
            shell_output (str): shell output from calling the external executable to be stored in the HDF5 file.
        """
        if shell_output is not None:
            self.storage.output.stdout = shell_output
        if output_dict is not None:
            self.output.update(output_dict)
        if shell_output is not None or output_dict is not None:
            self.to_hdf()

    def suspend(self):
        """
        Suspend the job by storing the object and its state persistently in HDF5 file and exit it.
@@ -517,51 +627,16 @@ def refresh_job_status(self):
        else:
            self.status.finished = True

-    def clear_job(self):
-        """
-        Convenience function to clear job info after suspend. Mimics deletion of all the job info after suspend in a
-        local test environment.
-        """
-        del self.__name__
-        del self.__version__
-        del self._executable
-        del self._server
-        del self._logger
-        del self._import_directory
-        del self._status
-        del self._restart_file_list
-        del self._restart_file_dict
-
-    def copy(self):
-        """
-        Copy the GenericJob object which links to the job and its HDF5 file
-
-        Returns:
-            GenericJob: New GenericJob object pointing to the same job
-        """
-        # Store all job arguments in the HDF5 file
-        delete_file_after_copy = _job_store_before_copy(job=self)
-
-        # Copy Python object - super().copy() causes recursion error for serial master
-        copied_self = self.__class__(
-            job_name=self.job_name, project=self.project_hdf5.open("..")
-        )
-        copied_self.reset_job_id()
-
-        # Reload object from HDF5 file
-        _job_reload_after_copy(
-            job=copied_self, delete_file_after_copy=delete_file_after_copy
-        )
-
-        # Copy executor - it cannot be copied and is just linked instead
-        if self.server.executor is not None:
-            copied_self.server.executor = self.server.executor
-            if self.server.future is not None and not self.server.future.done():
-                raise RuntimeError(
-                    "Jobs whose server has executor and future attributes cannot be copied unless the future is `done()`"
-                )
-        return copied_self
+    def write_input(self):
+        """
+        Call routines that generate the code specific input files
+        """
+        write_input_files_from_input_dict(
+            input_dict=self.get_input_parameter_dict(),
+            working_directory=self.working_directory,
+        )

    def _internal_copy_to(
        self,
        project=None,
@@ -838,7 +913,10 @@ def run_static(self):
"""
The run static function is called by run to execute the simulation.
"""
-        return execute_job_with_external_executable(job=self)
+        if self._job_with_calculate_function:
+            execute_job_with_calculate_function(job=self)
+        else:
+            return execute_job_with_external_executable(job=self)
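execute_job_with_calculate_function() itself lives in runfunction.py and is not shown in this diff; judging from the documented protocol it presumably reduces to the familiar triple:

```python
def execute_job_with_calculate_function(job):
    # Hedged reconstruction from the docstrings; the real function also
    # handles status updates and working-directory setup.
    calculate_function = job.get_calculate_function()
    shell_output, parsed_output, job_crashed = calculate_function(**job.calculate_kwargs)
    job.save_output(output_dict=parsed_output, shell_output=shell_output)
```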

    def run_if_scheduler(self):
        """
@@ -1502,7 +1580,7 @@ def _create_job_structure(self, debug=False):
        self.save()

    def _check_if_input_should_be_written(self):
-        if self._python_only_job:
+        if self._job_with_calculate_function:
            return False
        else:
            return not (
(The remaining 6 changed files are not shown.)
