diff --git a/executorlib/__init__.py b/executorlib/__init__.py
index 2bf66ec7..bd8153c9 100644
--- a/executorlib/__init__.py
+++ b/executorlib/__init__.py
@@ -12,6 +12,9 @@
 pandas.DataFrame.
 """
 
+from typing import Optional
+
+import executorlib._version
 from executorlib.executor.base import BaseExecutor
 from executorlib.executor.flux import (
     FluxClusterExecutor,
@@ -22,12 +25,48 @@
     SlurmClusterExecutor,
     SlurmJobExecutor,
 )
-from executorlib.standalone.cache import get_cache_data
-from . import _version
+
+
+def get_cache_data(cache_directory: str) -> list[dict]:
+    """
+    Collect all HDF5 files in the cache directory
+
+    Args:
+        cache_directory (str): The directory to store cache files.
+
+    Returns:
+        list[dict]: List of dictionaries each representing one of the HDF5 files in the cache directory.
+    """
+    from executorlib.standalone.hdf import get_cache_data
+
+    return get_cache_data(cache_directory=cache_directory)
+
+
+def terminate_tasks_in_cache(
+    cache_directory: str,
+    config_directory: Optional[str] = None,
+    backend: Optional[str] = None,
+):
+    """
+    Delete all jobs stored in the cache directory from the queuing system
+
+    Args:
+        cache_directory (str): The directory to store cache files.
+        config_directory (str, optional): path to the config directory.
+        backend (str, optional): name of the backend used to spawn tasks ["slurm", "flux"].
+    """
+    from executorlib.task_scheduler.file.queue_spawner import terminate_tasks_in_cache
+
+    return terminate_tasks_in_cache(
+        cache_directory=cache_directory,
+        config_directory=config_directory,
+        backend=backend,
+    )
+
+
 __all__: list[str] = [
     "get_cache_data",
+    "terminate_tasks_in_cache",
     "BaseExecutor",
     "FluxJobExecutor",
     "FluxClusterExecutor",
@@ -36,11 +75,4 @@
     "SlurmClusterExecutor",
 ]
 
-try:
-    from executorlib.task_scheduler.file.queue_spawner import terminate_tasks_in_cache
-
-    __all__ += ["terminate_tasks_in_cache"]
-except ImportError:
-    pass
-
-__version__ = _version.__version__
+__version__ = executorlib._version.__version__
diff --git a/executorlib/standalone/cache.py b/executorlib/standalone/cache.py
deleted file mode 100644
index 92e1ae68..00000000
--- a/executorlib/standalone/cache.py
+++ /dev/null
@@ -1,57 +0,0 @@
-import os
-
-import cloudpickle
-
-group_dict = {
-    "fn": "function",
-    "args": "input_args",
-    "kwargs": "input_kwargs",
-    "output": "output",
-    "error": "error",
-    "runtime": "runtime",
-    "queue_id": "queue_id",
-    "error_log_file": "error_log_file",
-}
-
-
-def get_cache_files(cache_directory: str) -> list[str]:
-    """
-    Recursively find all HDF5 files in the cache_directory which contain outputs.
-
-    Args:
-        cache_directory (str): The directory to store cache files.
-
-    Returns:
-        list[str]: List of HDF5 file in the cache directory which contain outputs.
-    """
-    file_lst = []
-    cache_directory_abs = os.path.abspath(cache_directory)
-    for dirpath, _, filenames in os.walk(cache_directory_abs):
-        file_lst += [os.path.join(dirpath, f) for f in filenames if f.endswith("_o.h5")]
-    return file_lst
-
-
-def get_cache_data(cache_directory: str) -> list[dict]:
-    """
-    Collect all HDF5 files in the cache directory
-
-    Args:
-        cache_directory (str): The directory to store cache files.
-
-    Returns:
-        list[dict]: List of dictionaries each representing on of the HDF5 files in the cache directory.
-    """
-    import h5py
-    import numpy as np
-
-    file_lst = []
-    for file_name in get_cache_files(cache_directory=cache_directory):
-        with h5py.File(file_name, "r") as hdf:
-            file_content_dict = {
-                key: cloudpickle.loads(np.void(hdf["/" + key]))
-                for key in group_dict.values()
-                if key in hdf
-            }
-            file_content_dict["filename"] = file_name
-            file_lst.append(file_content_dict)
-    return file_lst
diff --git a/executorlib/task_scheduler/file/hdf.py b/executorlib/standalone/hdf.py
similarity index 67%
rename from executorlib/task_scheduler/file/hdf.py
rename to executorlib/standalone/hdf.py
index 812198c9..5e8970f4 100644
--- a/executorlib/task_scheduler/file/hdf.py
+++ b/executorlib/standalone/hdf.py
@@ -5,7 +5,16 @@
 import h5py
 import numpy as np
 
-from executorlib.standalone.cache import group_dict
+group_dict = {
+    "fn": "function",
+    "args": "input_args",
+    "kwargs": "input_kwargs",
+    "output": "output",
+    "error": "error",
+    "runtime": "runtime",
+    "queue_id": "queue_id",
+    "error_log_file": "error_log_file",
+}
 
 
 def dump(file_name: Optional[str], data_dict: dict) -> None:
@@ -110,3 +119,54 @@ def get_queue_id(file_name: Optional[str]) -> Optional[int]:
             if "queue_id" in hdf:
                 return cloudpickle.loads(np.void(hdf["/queue_id"]))
     return None
+
+
+def get_cache_data(cache_directory: str) -> list[dict]:
+    """
+    Collect all HDF5 files in the cache directory
+
+    Args:
+        cache_directory (str): The directory to store cache files.
+
+    Returns:
+        list[dict]: List of dictionaries each representing one of the HDF5 files in the cache directory.
+    """
+    return [
+        _get_content_of_file(file_name=file_name) | {"filename": file_name}
+        for file_name in get_cache_files(cache_directory=cache_directory)
+    ]
+
+
+def get_cache_files(cache_directory: str) -> list[str]:
+    """
+    Recursively find all HDF5 files in the cache_directory which contain outputs.
+
+    Args:
+        cache_directory (str): The directory to store cache files.
+
+    Returns:
+        list[str]: List of HDF5 files in the cache directory which contain outputs.
+    """
+    file_lst = []
+    cache_directory_abs = os.path.abspath(cache_directory)
+    for dirpath, _, filenames in os.walk(cache_directory_abs):
+        file_lst += [os.path.join(dirpath, f) for f in filenames if f.endswith("_o.h5")]
+    return file_lst
+
+
+def _get_content_of_file(file_name: str) -> dict:
+    """
+    Get content of an HDF5 file
+
+    Args:
+        file_name (str): file name
+
+    Returns:
+        dict: Content of HDF5 file
+    """
+    with h5py.File(file_name, "r") as hdf:
+        return {
+            key: cloudpickle.loads(np.void(hdf["/" + key]))
+            for key in group_dict.values()
+            if key in hdf
+        }
diff --git a/executorlib/task_scheduler/file/backend.py b/executorlib/task_scheduler/file/backend.py
index cbe869cc..a945d30c 100644
--- a/executorlib/task_scheduler/file/backend.py
+++ b/executorlib/task_scheduler/file/backend.py
@@ -3,7 +3,7 @@
 from typing import Any
 
 from executorlib.standalone.error import backend_write_error_file
-from executorlib.task_scheduler.file.hdf import dump, load
+from executorlib.standalone.hdf import dump, load
 from executorlib.task_scheduler.file.shared import FutureItem
diff --git a/executorlib/task_scheduler/file/queue_spawner.py b/executorlib/task_scheduler/file/queue_spawner.py
index 0cc2364f..66c5e4f4 100644
--- a/executorlib/task_scheduler/file/queue_spawner.py
+++ b/executorlib/task_scheduler/file/queue_spawner.py
@@ -3,9 +3,9 @@
 from pysqa import QueueAdapter
 
+from executorlib.standalone.hdf import dump, get_queue_id
 from executorlib.standalone.inputcheck import check_file_exists
 from executorlib.standalone.scheduler import pysqa_execute_command, terminate_with_pysqa
-from executorlib.task_scheduler.file.hdf import dump, get_queue_id
 
 
 def execute_with_pysqa(
diff --git a/executorlib/task_scheduler/file/shared.py b/executorlib/task_scheduler/file/shared.py
index 5902d8e1..c712c863 100644
--- a/executorlib/task_scheduler/file/shared.py
+++ b/executorlib/task_scheduler/file/shared.py
@@ -4,10 +4,9 @@
 from concurrent.futures import Future
 from typing import Any, Callable, Optional
 
-from executorlib.standalone.cache import get_cache_files
 from executorlib.standalone.command import get_cache_execute_command
+from executorlib.standalone.hdf import get_cache_files, get_output
 from executorlib.standalone.serialize import serialize_funct
-from executorlib.task_scheduler.file.hdf import get_output
 from executorlib.task_scheduler.file.subprocess_spawner import terminate_subprocess
diff --git a/executorlib/task_scheduler/file/subprocess_spawner.py b/executorlib/task_scheduler/file/subprocess_spawner.py
index d418a3d4..c1b2157f 100644
--- a/executorlib/task_scheduler/file/subprocess_spawner.py
+++ b/executorlib/task_scheduler/file/subprocess_spawner.py
@@ -3,8 +3,8 @@
 import time
 from typing import Optional
 
+from executorlib.standalone.hdf import dump
 from executorlib.standalone.inputcheck import check_file_exists
-from executorlib.task_scheduler.file.hdf import dump
 
 
 def execute_in_subprocess(
diff --git a/executorlib/task_scheduler/interactive/shared.py b/executorlib/task_scheduler/interactive/shared.py
index d4b57982..e64db61b 100644
--- a/executorlib/task_scheduler/interactive/shared.py
+++ b/executorlib/task_scheduler/interactive/shared.py
@@ -4,7 +4,6 @@
 import time
 from typing import Callable, Optional
 
-from executorlib.standalone.cache import get_cache_files
 from executorlib.standalone.command import get_interactive_execute_command
 from executorlib.standalone.interactive.communication import (
     SocketInterface,
@@ -130,7 +129,7 @@ def _execute_task_with_cache(
         cache_key (str, optional): By default the cache_key is generated based on the function hash, this can be overwritten by setting the cache_key.
     """
-    from executorlib.task_scheduler.file.hdf import dump, get_output
+    from executorlib.standalone.hdf import dump, get_cache_files, get_output
 
     task_key, data_dict = serialize_funct(
         fn=task_dict["fn"],
diff --git a/tests/test_cache_backend_execute.py b/tests/test_cache_backend_execute.py
index 30dd7238..4e7d681d 100644
--- a/tests/test_cache_backend_execute.py
+++ b/tests/test_cache_backend_execute.py
@@ -7,7 +7,7 @@
 try:
     from executorlib.task_scheduler.file.backend import backend_execute_task_in_file
     from executorlib.task_scheduler.file.shared import _check_task_output, FutureItem
-    from executorlib.task_scheduler.file.hdf import dump, get_runtime
+    from executorlib.standalone.hdf import dump, get_runtime
     from executorlib.standalone.serialize import serialize_funct
 
     skip_h5io_test = False
diff --git a/tests/test_fluxclusterexecutor.py b/tests/test_fluxclusterexecutor.py
index 107d5add..0968fabb 100644
--- a/tests/test_fluxclusterexecutor.py
+++ b/tests/test_fluxclusterexecutor.py
@@ -10,8 +10,9 @@
 try:
     import flux.job
 
-    from executorlib.task_scheduler.file.hdf import dump
-    from executorlib.task_scheduler.file.queue_spawner import terminate_tasks_in_cache, execute_with_pysqa
+    from executorlib import terminate_tasks_in_cache
+    from executorlib.standalone.hdf import dump
+    from executorlib.task_scheduler.file.queue_spawner import execute_with_pysqa
     from executorlib.standalone.scheduler import terminate_with_pysqa
 
     skip_flux_test = "FLUX_URI" not in os.environ
diff --git a/tests/test_slurmclusterexecutor.py b/tests/test_slurmclusterexecutor.py
index 4973037d..a26524e7 100644
--- a/tests/test_slurmclusterexecutor.py
+++ b/tests/test_slurmclusterexecutor.py
@@ -14,7 +14,7 @@
 skip_mpi4py_test = importlib.util.find_spec("mpi4py") is None
 
 try:
-    from executorlib.task_scheduler.file.hdf import dump
+    from executorlib.standalone.hdf import dump
 
     skip_h5py_test = False
 except ImportError:
diff --git a/tests/test_standalone_hdf.py b/tests/test_standalone_hdf.py
index fd77d8df..3327e583 100644
--- a/tests/test_standalone_hdf.py
+++ b/tests/test_standalone_hdf.py
@@ -4,7 +4,7 @@
 try:
-    from executorlib.task_scheduler.file.hdf import (
+    from executorlib.standalone.hdf import (
         dump,
         load,
         get_output,
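
Usage note (not part of the patch): a minimal sketch of how the two top-level helpers exposed by this change are intended to be called. The "cache" directory and the "slurm" backend below are illustrative values chosen for the example, not taken from the diff.

import executorlib

# get_cache_data() is re-exported from the package root; executorlib.standalone.hdf
# (and therefore h5py) is only imported when the wrapper is actually called.
for entry in executorlib.get_cache_data(cache_directory="cache"):
    # "filename" is always set by get_cache_data(); "runtime" is only present
    # if the corresponding group exists in the HDF5 file, hence .get().
    print(entry["filename"], entry.get("runtime"))

# terminate_tasks_in_cache() is now always importable; the pysqa-backed
# implementation in executorlib.task_scheduler.file.queue_spawner is loaded lazily.
executorlib.terminate_tasks_in_cache(cache_directory="cache", backend="slurm")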