Skip to content

Commit

Permalink
CalcJob: Allow to define order of copying of input files (aiidateam…
Browse files Browse the repository at this point in the history
…#6285)

There are three different sources of files that are copied to the working
directory on the remote computer where a calculation job is executed:

* Sandbox: files written to a temporary sandbox folder on the local file
  system written by the `CalcJob` plugin, first in the
  `prepare_for_submission` method, followed by the `presubmit` of the
  base class.
* Local: files written to the temporary sandbox folder by the engine
  based on the `local_copy_list` defined by the plugin in the
  `prepare_for_submission` method.
* Remote: files written directly to the remote working directory by the
  engine base on the `remote_copy_list` defined by the plugin in the
  `prepare_for_submission` method.

Historically, these operations were performed in the order of sandbox,
local and remote. For certain use cases, however, this was deemed
non-ideal, for example because files from the remote would override files
written by the plugin itself in the sandbox. This enum can be assigned to
the `CalcInfo` instance returned by the calculation job plugin
`prepare_for_submission` implementation to change the order.

Here, a new attribute `file_copy_operation_order` is added to the
`CalcInfo` datastructure. It takes a list of instance of the
`FileCopyOperation` enum. The enum defines the types of file copy
operations that exist, and the `CalcInfo` attribute specifies the order
in which they should be executed. This design makes it flexible to the
potential addition of other file copy operations in the future.

The default value is set to the current order of sandbox, local, remote.
It is set both in the `CalcJob.presubmit` in case the plugin
implementation did not explicitly specify it (where it is also type
checked) as well in the `upload_calculation` function, which is
sometimes directly without going through `CalcJob.presubmit` in which
case the default wouldn't be set.

To simplify the implementation, the code to copy these three types of
files are factored out of the `upload_calculation` function to the
functions `_copy_remote_files`, `_copy_remote_files` and
`_copy_sandbox_files` functions, respectively.

Finally, the original approach would not actually copy the local and
sandbox files separately directly to the remote working directory.
Rather, it would copy the local files to the sandbox, and then the
contents of the entire sandbox would be copied over. This required
updating the `provenance_exclude_list` with the files added from the
`local_copy_list` because they should not be copied to the node's
repository. The implementation is simplified by copying the local files
directly to the working directory, instead of going through the sandbox.
Since the transport interface only works with files and not streams in
memory, the implementation is still forced to first copy the local files
to a temporary directory on disk before copying them over to the working
directory using the transport.
  • Loading branch information
sphuber committed Mar 27, 2024
1 parent 58c0d84 commit 6898ff4
Show file tree
Hide file tree
Showing 6 changed files with 318 additions and 94 deletions.
37 changes: 37 additions & 0 deletions docs/source/topics/calculations/usage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ In this simple example, we define four simple attributes:
* ``remote_copy_list``: a list of tuples that instruct what files to copy to the working directory from the machine on which the job will run
* ``retrieve_list``: a list of tuples instructing which files should be retrieved from the working directory and stored in the local repository after the job has finished
See :ref:`topics:calculations:usage:calcjobs:file-copy-order` for details on the order in which input files are copied to the working directory.
In this example we only need to run a single code, so the ``codes_info`` list has a single ``CodeInfo`` datastructure.
This datastructure needs to define which code it needs to run, which is one of the inputs passed to the ``CalcJob``, and does so by means of its UUID.
Through the ``stdout_name`` attribute, we tell the engine where the output of the executable should be redirected to.
Expand Down Expand Up @@ -563,6 +564,42 @@ The parser implementation can then parse these files and store the relevant info
After the parser terminates, the engine will automatically clean up the sandbox folder with the temporarily retrieved files.
The concept of the ``retrieve_temporary_list`` is essentially that the files will be available during parsing and will be destroyed immediately afterwards.
.. _topics:calculations:usage:calcjobs:file-copy-order:
Controlling order of file copying
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
.. versionadded:: 2.6
Input files can come from three sources in a calculations job:
#. Sandbox: files that are written by the ``CalcJob`` plugin in the :meth:`~aiida.engine.processes.calcjobs.calcjob.CalcJob.prepare_for_submission` to the sandbox folder
#. Local: files of input nodes that are defined through the ``local_copy_list``
#. Remote: files of ``RemoteData`` input nodes that are defined through the ``remote_copy_list``
By default, these files are copied in the order of sandbox, local, and finally remote.
The order can be controlled through the ``file_copy_operation_order`` attribute of the :class:`~aiida.common.datastructures.CalcInfo` which takes a list of :class:`~aiida.common.datastructures.FileCopyOperation` instances, for example:
.. code-block:: python
class CustomFileCopyOrder(CalcJob)
def prepare_for_submission(self, _):
from aiida.common.datastructures import CalcInfo, CodeInfo, FileCopyOperation
code_info = CodeInfo()
code_info.code_uuid = self.inputs.code.uuid
calc_info = CalcInfo()
calc_info.codes_info = [code_info]
calc_info.file_copy_operation_order = [
FileCopyOperation.LOCAL,
FileCopyOperation.REMOTE,
FileCopyOperation.SANDBOX,
]
return calc_info
.. _topics:calculations:usage:calcjobs:stashing:
Stashing on the remote
Expand Down
28 changes: 28 additions & 0 deletions src/aiida/common/datastructures.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,30 @@ class CalcJobState(Enum):
PARSING = 'parsing'


class FileCopyOperation(IntEnum):
"""Enum to represent the copy operations that are used when creating the working directory of a ``CalcJob``.
There are three different sources of files that are copied to the working directory on the remote computer where a
calculation job is executed:
* Local: files written to the temporary sandbox folder by the engine based on the ``local_copy_list`` defined
by the plugin in the ``prepare_for_submission`` method.
* Remote: files written directly to the remote working directory by the engine base on the ``remote_copy_list``
defined by the plugin in the ``prepare_for_submission`` method.
* Sandbox: files written to a temporary sandbox folder on the local file system written by the ``CalcJob``
plugin, first in the ``prepare_for_submission`` method, followed by the ``presubmit`` of the base class.
Historically, these operations were performed in the order of sandbox, local and remote. For certain use cases,
however, this was deemed non-ideal, for example because files from the remote would override files written by the
plugin itself in the sandbox. The ``CalcInfo.file_copy_operation_order`` attribute can be used to specify a list
of this enum to indicate the desired order for file copy operations.
"""

LOCAL = 0
REMOTE = 1
SANDBOX = 2


class CalcInfo(DefaultFieldsAttributeDict):
"""This object will store the data returned by the calculation plugin and to be
passed to the ExecManager.
Expand Down Expand Up @@ -85,6 +109,8 @@ class CalcInfo(DefaultFieldsAttributeDict):
but can also be `CodeRunMode.PARALLEL`)
* skip_submit: a flag that, when set to True, orders the engine to skip the submit/update steps (so no code will
run, it will only upload the files and then retrieve/parse).
* file_copy_operation_order: Order in which input files are copied to the working directory. Should be a list of
:class:`aiida.common.datastructures.FileCopyOperation` instances.
"""

_default_fields = (
Expand All @@ -110,6 +136,7 @@ class CalcInfo(DefaultFieldsAttributeDict):
'codes_info',
'codes_run_mode',
'skip_submit',
'file_copy_operation_order',
)

if TYPE_CHECKING:
Expand All @@ -135,6 +162,7 @@ class CalcInfo(DefaultFieldsAttributeDict):
codes_info: None | list[CodeInfo]
codes_run_mode: None | CodeRunMode
skip_submit: None | bool
file_copy_operation_order: None | list[FileCopyOperation]


class CodeInfo(DefaultFieldsAttributeDict):
Expand Down
205 changes: 118 additions & 87 deletions src/aiida/engine/daemon/execmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@
import shutil
from collections.abc import Mapping
from logging import LoggerAdapter
from tempfile import NamedTemporaryFile
from tempfile import NamedTemporaryFile, TemporaryDirectory
from typing import TYPE_CHECKING, Any, List, Optional, Tuple, Union
from typing import Mapping as MappingType

from aiida.common import AIIDA_LOGGER, exceptions
from aiida.common.datastructures import CalcInfo
from aiida.common.datastructures import CalcInfo, FileCopyOperation
from aiida.common.folders import SandboxFolder
from aiida.common.links import LinkType
from aiida.engine.processes.exit_code import ExitCode
Expand Down Expand Up @@ -201,96 +201,30 @@ def upload_calculation(
remote_symlink_list = calc_info.remote_symlink_list or []
provenance_exclude_list = calc_info.provenance_exclude_list or []

for uuid, filename, target in local_copy_list:
logger.debug(f'[submission of calculation {node.uuid}] copying local file/folder to {target}')

try:
data_node = load_node(uuid=uuid)
except exceptions.NotExistent:
data_node = _find_data_node(inputs, uuid) if inputs else None

if data_node is None:
logger.warning(f'failed to load Node<{uuid}> specified in the `local_copy_list`')
file_copy_operation_order = calc_info.file_copy_operation_order or [
FileCopyOperation.SANDBOX,
FileCopyOperation.LOCAL,
FileCopyOperation.REMOTE,
]

for file_copy_operation in file_copy_operation_order:
if file_copy_operation is FileCopyOperation.LOCAL:
_copy_local_files(logger, node, transport, inputs, local_copy_list)
elif file_copy_operation is FileCopyOperation.REMOTE:
if not dry_run:
_copy_remote_files(logger, node, computer, transport, remote_copy_list, remote_symlink_list)
elif file_copy_operation is FileCopyOperation.SANDBOX:
if not dry_run:
_copy_sandbox_files(logger, node, transport, folder)
else:
# If no explicit source filename is defined, we assume the top-level directory
filename_source = filename or '.'
filename_target = target or ''

# Make the target filepath absolute and create any intermediate directories if they don't yet exist
filepath_target = pathlib.Path(folder.abspath) / filename_target
filepath_target.parent.mkdir(parents=True, exist_ok=True)

if data_node.base.repository.get_object(filename_source).file_type == FileType.DIRECTORY:
# If the source object is a directory, we copy its entire contents
data_node.base.repository.copy_tree(filepath_target, filename_source)
sources = data_node.base.repository.list_object_names(filename_source)
if filename_target:
sources = [str(pathlib.Path(filename_target) / subpath) for subpath in sources]
provenance_exclude_list.extend(sources)
else:
# Otherwise, simply copy the file
with folder.open(target, 'wb') as handle:
with data_node.base.repository.open(filename, 'rb') as source:
shutil.copyfileobj(source, handle)

provenance_exclude_list.append(target)
raise RuntimeError(f'file copy operation {file_copy_operation} is not yet implemented.')

# In a dry_run, the working directory is the raw input folder, which will already contain these resources
if not dry_run:
for filename in folder.get_content_list():
logger.debug(f'[submission of calculation {node.pk}] copying file/folder {filename}...')
transport.put(folder.get_abs_path(filename), filename)

for remote_computer_uuid, remote_abs_path, dest_rel_path in remote_copy_list:
if remote_computer_uuid == computer.uuid:
logger.debug(
f'[submission of calculation {node.pk}] copying {dest_rel_path} '
f'remotely, directly on the machine {computer.label}'
)
try:
transport.copy(remote_abs_path, dest_rel_path)
except FileNotFoundError:
logger.warning(
f'[submission of calculation {node.pk}] Unable to copy remote '
f'resource from {remote_abs_path} to {dest_rel_path}! NOT Stopping but just ignoring!.'
)
except (IOError, OSError):
logger.warning(
f'[submission of calculation {node.pk}] Unable to copy remote '
f'resource from {remote_abs_path} to {dest_rel_path}! Stopping.'
)
raise
else:
raise NotImplementedError(
f'[submission of calculation {node.pk}] Remote copy between two different machines is '
'not implemented yet'
)

for remote_computer_uuid, remote_abs_path, dest_rel_path in remote_symlink_list:
if remote_computer_uuid == computer.uuid:
logger.debug(
f'[submission of calculation {node.pk}] copying {dest_rel_path} remotely, '
f'directly on the machine {computer.label}'
)
remote_dirname = pathlib.Path(dest_rel_path).parent
try:
transport.makedirs(remote_dirname, ignore_existing=True)
transport.symlink(remote_abs_path, dest_rel_path)
except (IOError, OSError):
logger.warning(
f'[submission of calculation {node.pk}] Unable to create remote symlink '
f'from {remote_abs_path} to {dest_rel_path}! Stopping.'
)
raise
else:
raise IOError(
f'It is not possible to create a symlink between two different machines for calculation {node.pk}'
)
else:
if dry_run:
if remote_copy_list:
filepath = os.path.join(workdir, '_aiida_remote_copy_list.txt')
with open(filepath, 'w', encoding='utf-8') as handle: # type: ignore[assignment]
for remote_computer_uuid, remote_abs_path, dest_rel_path in remote_copy_list:
for _, remote_abs_path, dest_rel_path in remote_copy_list:
handle.write(
f'would have copied {remote_abs_path} to {dest_rel_path} in working '
f'directory on remote {computer.label}'
Expand All @@ -299,7 +233,7 @@ def upload_calculation(
if remote_symlink_list:
filepath = os.path.join(workdir, '_aiida_remote_symlink_list.txt')
with open(filepath, 'w', encoding='utf-8') as handle: # type: ignore[assignment]
for remote_computer_uuid, remote_abs_path, dest_rel_path in remote_symlink_list:
for _, remote_abs_path, dest_rel_path in remote_symlink_list:
handle.write(
f'would have created symlinks from {remote_abs_path} to {dest_rel_path} in working'
f'directory on remote {computer.label}'
Expand Down Expand Up @@ -346,6 +280,103 @@ def upload_calculation(
return None


def _copy_remote_files(logger, node, computer, transport, remote_copy_list, remote_symlink_list):
"""Perform the copy instructions of the ``remote_copy_list`` and ``remote_symlink_list``."""
for remote_computer_uuid, remote_abs_path, dest_rel_path in remote_copy_list:
if remote_computer_uuid == computer.uuid:
logger.debug(
f'[submission of calculation {node.pk}] copying {dest_rel_path} '
f'remotely, directly on the machine {computer.label}'
)
try:
transport.copy(remote_abs_path, dest_rel_path)
except FileNotFoundError:
logger.warning(
f'[submission of calculation {node.pk}] Unable to copy remote '
f'resource from {remote_abs_path} to {dest_rel_path}! NOT Stopping but just ignoring!.'
)
except (IOError, OSError):
logger.warning(
f'[submission of calculation {node.pk}] Unable to copy remote '
f'resource from {remote_abs_path} to {dest_rel_path}! Stopping.'
)
raise
else:
raise NotImplementedError(
f'[submission of calculation {node.pk}] Remote copy between two different machines is '
'not implemented yet'
)

for remote_computer_uuid, remote_abs_path, dest_rel_path in remote_symlink_list:
if remote_computer_uuid == computer.uuid:
logger.debug(
f'[submission of calculation {node.pk}] copying {dest_rel_path} remotely, '
f'directly on the machine {computer.label}'
)
remote_dirname = pathlib.Path(dest_rel_path).parent
try:
transport.makedirs(remote_dirname, ignore_existing=True)
transport.symlink(remote_abs_path, dest_rel_path)
except (IOError, OSError):
logger.warning(
f'[submission of calculation {node.pk}] Unable to create remote symlink '
f'from {remote_abs_path} to {dest_rel_path}! Stopping.'
)
raise
else:
raise IOError(
f'It is not possible to create a symlink between two different machines for calculation {node.pk}'
)


def _copy_local_files(logger, node, transport, inputs, local_copy_list):
"""Perform the copy instrctions of the ``local_copy_list``."""
with TemporaryDirectory() as tmpdir:
dirpath = pathlib.Path(tmpdir)

# The transport class can only copy files directly from the file system, so the files in the source node's repo
# have to first be copied to a temporary directory on disk.
for uuid, filename, target in local_copy_list:
logger.debug(f'[submission of calculation {node.uuid}] copying local file/folder to {target}')

try:
data_node = load_node(uuid=uuid)
except exceptions.NotExistent:
data_node = _find_data_node(inputs, uuid) if inputs else None

if data_node is None:
logger.warning(f'failed to load Node<{uuid}> specified in the `local_copy_list`')
continue

# If no explicit source filename is defined, we assume the top-level directory
filename_source = filename or '.'
filename_target = target or ''

# Make the target filepath absolute and create any intermediate directories if they don't yet exist
filepath_target = (dirpath / filename_target).resolve().absolute()
filepath_target.parent.mkdir(parents=True, exist_ok=True)

if data_node.base.repository.get_object(filename_source).file_type == FileType.DIRECTORY:
# If the source object is a directory, we copy its entire contents
data_node.base.repository.copy_tree(filepath_target, filename_source)
else:
# Otherwise, simply copy the file
with filepath_target.open('wb') as handle:
with data_node.base.repository.open(filename_source, 'rb') as source:
shutil.copyfileobj(source, handle)

# Now copy the contents of the temporary folder to the remote working directory using the transport
for filepath in dirpath.iterdir():
transport.put(str(filepath), filepath.name)


def _copy_sandbox_files(logger, node, transport, folder):
"""Copy the contents of the sandbox folder to the working directory."""
for filename in folder.get_content_list():
logger.debug(f'[submission of calculation {node.pk}] copying file/folder {filename}...')
transport.put(folder.get_abs_path(filename), filename)


def submit_calculation(calculation: CalcJobNode, transport: Transport) -> str | ExitCode:
"""Submit a previously uploaded `CalcJob` to the scheduler.
Expand Down
18 changes: 17 additions & 1 deletion src/aiida/engine/processes/calcjobs/calcjob.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

from aiida import orm
from aiida.common import AttributeDict, exceptions
from aiida.common.datastructures import CalcInfo
from aiida.common.datastructures import CalcInfo, FileCopyOperation
from aiida.common.folders import Folder
from aiida.common.lang import classproperty, override
from aiida.common.links import LinkType
Expand Down Expand Up @@ -974,6 +974,22 @@ def presubmit(self, folder: Folder) -> CalcInfo:
codes_run_mode = calc_info.codes_run_mode

job_tmpl.codes_run_mode = codes_run_mode

if calc_info.file_copy_operation_order is not None:
if not isinstance(calc_info.file_copy_operation_order, list) or any( # type: ignore[redundant-expr]
not isinstance(e, FileCopyOperation) for e in calc_info.file_copy_operation_order
):
raise PluginInternalError(
'calc_info.file_copy_operation_order is not a list of `FileCopyOperation` enums.'
)
else:
# Set the default
calc_info.file_copy_operation_order = [
FileCopyOperation.SANDBOX,
FileCopyOperation.LOCAL,
FileCopyOperation.REMOTE,
]

########################################################################

custom_sched_commands = self.node.get_option('custom_scheduler_commands')
Expand Down
Loading

0 comments on commit 6898ff4

Please sign in to comment.