From eeecff78c8bd94d70b918f6ad0a765e84940f196 Mon Sep 17 00:00:00 2001
From: Jan Janssen
Date: Tue, 5 Aug 2025 09:17:28 +0200
Subject: [PATCH 1/5] Refactor HDF5 interface

---
 executorlib/__init__.py                       | 50 ++++++++++++---
 executorlib/standalone/cache.py               | 57 -----------------
 .../file => standalone}/hdf.py                | 62 ++++++++++++++++++-
 executorlib/task_scheduler/file/backend.py    |  2 +-
 .../task_scheduler/file/queue_spawner.py      |  2 +-
 executorlib/task_scheduler/file/shared.py     |  3 +-
 .../task_scheduler/file/subprocess_spawner.py |  2 +-
 .../task_scheduler/interactive/shared.py      |  3 +-
 tests/test_cache_backend_execute.py           |  2 +-
 tests/test_fluxclusterexecutor.py             |  2 +-
 tests/test_slurmclusterexecutor.py            |  2 +-
 tests/test_standalone_hdf.py                  |  2 +-
 12 files changed, 110 insertions(+), 79 deletions(-)
 delete mode 100644 executorlib/standalone/cache.py
 rename executorlib/{task_scheduler/file => standalone}/hdf.py (67%)

diff --git a/executorlib/__init__.py b/executorlib/__init__.py
index 2bf66ec7..bc464cfd 100644
--- a/executorlib/__init__.py
+++ b/executorlib/__init__.py
@@ -22,12 +22,49 @@
     SlurmClusterExecutor,
     SlurmJobExecutor,
 )
-from executorlib.standalone.cache import get_cache_data
+import executorlib._version
+
+
+def get_cache_data(cache_directory: str) -> list[dict]:
+    """
+    Collect all HDF5 files in the cache directory
+
+    Args:
+        cache_directory (str): The directory to store cache files.
+
+    Returns:
+        list[dict]: List of dictionaries each representing one of the HDF5 files in the cache directory.
+    """
+    from executorlib.standalone.hdf import get_cache_data
+
+    return get_cache_data(cache_directory=cache_directory)
+
+
+def terminate_tasks_in_cache(
+    cache_directory: str,
+    config_directory: Optional[str] = None,
+    backend: Optional[str] = None,
+):
+    """
+    Delete all jobs stored in the cache directory from the queuing system
+
+    Args:
+        cache_directory (str): The directory to store cache files.
+        config_directory (str, optional): path to the config directory.
+        backend (str, optional): name of the backend used to spawn tasks ["slurm", "flux"].
+    """
+    from executorlib.task_scheduler.file.queue_spawner import terminate_tasks_in_cache
+
+    return terminate_tasks_in_cache(
+        cache_directory=cache_directory,
+        config_directory=config_directory,
+        backend=backend,
+    )
 
-from . import _version
 
 __all__: list[str] = [
     "get_cache_data",
+    "terminate_tasks_in_cache",
     "BaseExecutor",
     "FluxJobExecutor",
     "FluxClusterExecutor",
@@ -36,11 +73,4 @@
     "SlurmClusterExecutor",
 ]
 
-try:
-    from executorlib.task_scheduler.file.queue_spawner import terminate_tasks_in_cache
-
-    __all__ += ["terminate_tasks_in_cache"]
-except ImportError:
-    pass
-
-__version__ = _version.__version__
+__version__ = executorlib._version.__version__
diff --git a/executorlib/standalone/cache.py b/executorlib/standalone/cache.py
deleted file mode 100644
index 92e1ae68..00000000
--- a/executorlib/standalone/cache.py
+++ /dev/null
@@ -1,57 +0,0 @@
-import os
-
-import cloudpickle
-
-group_dict = {
-    "fn": "function",
-    "args": "input_args",
-    "kwargs": "input_kwargs",
-    "output": "output",
-    "error": "error",
-    "runtime": "runtime",
-    "queue_id": "queue_id",
-    "error_log_file": "error_log_file",
-}
-
-
-def get_cache_files(cache_directory: str) -> list[str]:
-    """
-    Recursively find all HDF5 files in the cache_directory which contain outputs.
-
-    Args:
-        cache_directory (str): The directory to store cache files.
-
-    Returns:
-        list[str]: List of HDF5 file in the cache directory which contain outputs.
-    """
-    file_lst = []
-    cache_directory_abs = os.path.abspath(cache_directory)
-    for dirpath, _, filenames in os.walk(cache_directory_abs):
-        file_lst += [os.path.join(dirpath, f) for f in filenames if f.endswith("_o.h5")]
-    return file_lst
-
-
-def get_cache_data(cache_directory: str) -> list[dict]:
-    """
-    Collect all HDF5 files in the cache directory
-
-    Args:
-        cache_directory (str): The directory to store cache files.
-
-    Returns:
-        list[dict]: List of dictionaries each representing on of the HDF5 files in the cache directory.
-    """
-    import h5py
-    import numpy as np
-
-    file_lst = []
-    for file_name in get_cache_files(cache_directory=cache_directory):
-        with h5py.File(file_name, "r") as hdf:
-            file_content_dict = {
-                key: cloudpickle.loads(np.void(hdf["/" + key]))
-                for key in group_dict.values()
-                if key in hdf
-            }
-            file_content_dict["filename"] = file_name
-            file_lst.append(file_content_dict)
-    return file_lst
diff --git a/executorlib/task_scheduler/file/hdf.py b/executorlib/standalone/hdf.py
similarity index 67%
rename from executorlib/task_scheduler/file/hdf.py
rename to executorlib/standalone/hdf.py
index 812198c9..97666055 100644
--- a/executorlib/task_scheduler/file/hdf.py
+++ b/executorlib/standalone/hdf.py
@@ -5,7 +5,16 @@
 import h5py
 import numpy as np
 
-from executorlib.standalone.cache import group_dict
+group_dict = {
+    "fn": "function",
+    "args": "input_args",
+    "kwargs": "input_kwargs",
+    "output": "output",
+    "error": "error",
+    "runtime": "runtime",
+    "queue_id": "queue_id",
+    "error_log_file": "error_log_file",
+}
 
 
 def dump(file_name: Optional[str], data_dict: dict) -> None:
@@ -110,3 +119,54 @@ def get_queue_id(file_name: Optional[str]) -> Optional[int]:
         if "queue_id" in hdf:
             return cloudpickle.loads(np.void(hdf["/queue_id"]))
     return None
+
+
+def get_content_of_file(file_name: str) -> dict:
+    """
+    Get content of an HDF5 file
+
+    Args:
+        file_name (str): file name
+
+    Returns:
+        dict: Content of HDF5 file
+    """
+    with h5py.File(file_name, "r") as hdf:
+        return {
+            key: cloudpickle.loads(np.void(hdf["/" + key]))
+            for key in group_dict.values()
+            if key in hdf
+        }
+
+
+def get_cache_data(cache_directory: str) -> list[dict]:
+    """
+    Collect all HDF5 files in the cache directory
+
+    Args:
+        cache_directory (str): The directory to store cache files.
+
+    Returns:
+        list[dict]: List of dictionaries each representing one of the HDF5 files in the cache directory.
+    """
+    return [
+        get_content_of_file(file_name=file_name) | {"filename": file_name}
+        for file_name in get_cache_files(cache_directory=cache_directory)
+    ]
+
+
+def get_cache_files(cache_directory: str) -> list[str]:
+    """
+    Recursively find all HDF5 files in the cache_directory which contain outputs.
+
+    Args:
+        cache_directory (str): The directory to store cache files.
+
+    Returns:
+        list[str]: List of HDF5 files in the cache directory which contain outputs.
+    """
+    file_lst = []
+    cache_directory_abs = os.path.abspath(cache_directory)
+    for dirpath, _, filenames in os.walk(cache_directory_abs):
+        file_lst += [os.path.join(dirpath, f) for f in filenames if f.endswith("_o.h5")]
+    return file_lst
diff --git a/executorlib/task_scheduler/file/backend.py b/executorlib/task_scheduler/file/backend.py
index cbe869cc..a945d30c 100644
--- a/executorlib/task_scheduler/file/backend.py
+++ b/executorlib/task_scheduler/file/backend.py
@@ -3,7 +3,7 @@
 from typing import Any
 
 from executorlib.standalone.error import backend_write_error_file
-from executorlib.task_scheduler.file.hdf import dump, load
+from executorlib.standalone.hdf import dump, load
 from executorlib.task_scheduler.file.shared import FutureItem
 
 
diff --git a/executorlib/task_scheduler/file/queue_spawner.py b/executorlib/task_scheduler/file/queue_spawner.py
index 0cc2364f..1a727660 100644
--- a/executorlib/task_scheduler/file/queue_spawner.py
+++ b/executorlib/task_scheduler/file/queue_spawner.py
@@ -5,7 +5,7 @@
 
 from executorlib.standalone.inputcheck import check_file_exists
 from executorlib.standalone.scheduler import pysqa_execute_command, terminate_with_pysqa
-from executorlib.task_scheduler.file.hdf import dump, get_queue_id
+from executorlib.standalone.hdf import dump, get_queue_id
 
 
 def execute_with_pysqa(
diff --git a/executorlib/task_scheduler/file/shared.py b/executorlib/task_scheduler/file/shared.py
index 5902d8e1..4f89dbdb 100644
--- a/executorlib/task_scheduler/file/shared.py
+++ b/executorlib/task_scheduler/file/shared.py
@@ -4,10 +4,9 @@
 from concurrent.futures import Future
 from typing import Any, Callable, Optional
 
-from executorlib.standalone.cache import get_cache_files
 from executorlib.standalone.command import get_cache_execute_command
 from executorlib.standalone.serialize import serialize_funct
-from executorlib.task_scheduler.file.hdf import get_output
+from executorlib.standalone.hdf import get_cache_files, get_output
 from executorlib.task_scheduler.file.subprocess_spawner import terminate_subprocess
 
 
diff --git a/executorlib/task_scheduler/file/subprocess_spawner.py b/executorlib/task_scheduler/file/subprocess_spawner.py
index d418a3d4..f6cddb82 100644
--- a/executorlib/task_scheduler/file/subprocess_spawner.py
+++ b/executorlib/task_scheduler/file/subprocess_spawner.py
@@ -4,7 +4,7 @@
 from typing import Optional
 
 from executorlib.standalone.inputcheck import check_file_exists
-from executorlib.task_scheduler.file.hdf import dump
+from executorlib.standalone.hdf import dump
 
 
 def execute_in_subprocess(
diff --git a/executorlib/task_scheduler/interactive/shared.py b/executorlib/task_scheduler/interactive/shared.py
index d4b57982..416efa9d 100644
--- a/executorlib/task_scheduler/interactive/shared.py
+++ b/executorlib/task_scheduler/interactive/shared.py
@@ -4,7 +4,6 @@
 import time
 from typing import Callable, Optional
 
-from executorlib.standalone.cache import get_cache_files
 from executorlib.standalone.command import get_interactive_execute_command
 from executorlib.standalone.interactive.communication import (
     SocketInterface,
@@ -130,7 +129,7 @@ def _execute_task_with_cache(
         cache_key (str, optional): By default the cache_key is generated based on the function hash, this can be overwritten by setting the cache_key.
""" - from executorlib.task_scheduler.file.hdf import dump, get_output + from executorlib.standalone.hdf import dump, get_output, get_cache_files task_key, data_dict = serialize_funct( fn=task_dict["fn"], diff --git a/tests/test_cache_backend_execute.py b/tests/test_cache_backend_execute.py index 30dd7238..4e7d681d 100644 --- a/tests/test_cache_backend_execute.py +++ b/tests/test_cache_backend_execute.py @@ -7,7 +7,7 @@ try: from executorlib.task_scheduler.file.backend import backend_execute_task_in_file from executorlib.task_scheduler.file.shared import _check_task_output, FutureItem - from executorlib.task_scheduler.file.hdf import dump, get_runtime + from executorlib.standalone.hdf import dump, get_runtime from executorlib.standalone.serialize import serialize_funct skip_h5io_test = False diff --git a/tests/test_fluxclusterexecutor.py b/tests/test_fluxclusterexecutor.py index 107d5add..52e4536a 100644 --- a/tests/test_fluxclusterexecutor.py +++ b/tests/test_fluxclusterexecutor.py @@ -10,7 +10,7 @@ try: import flux.job - from executorlib.task_scheduler.file.hdf import dump + from executorlib.standalone.hdf import dump from executorlib.task_scheduler.file.queue_spawner import terminate_tasks_in_cache, execute_with_pysqa from executorlib.standalone.scheduler import terminate_with_pysqa diff --git a/tests/test_slurmclusterexecutor.py b/tests/test_slurmclusterexecutor.py index 4973037d..a26524e7 100644 --- a/tests/test_slurmclusterexecutor.py +++ b/tests/test_slurmclusterexecutor.py @@ -14,7 +14,7 @@ skip_mpi4py_test = importlib.util.find_spec("mpi4py") is None try: - from executorlib.task_scheduler.file.hdf import dump + from executorlib.standalone.hdf import dump skip_h5py_test = False except ImportError: diff --git a/tests/test_standalone_hdf.py b/tests/test_standalone_hdf.py index fd77d8df..3327e583 100644 --- a/tests/test_standalone_hdf.py +++ b/tests/test_standalone_hdf.py @@ -4,7 +4,7 @@ try: - from executorlib.task_scheduler.file.hdf import ( + from executorlib.standalone.hdf import ( dump, load, get_output, From 2702fcbfd2c36e17107417d4ee840a9ee6edc5a7 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 5 Aug 2025 07:18:03 +0000 Subject: [PATCH 2/5] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- executorlib/__init__.py | 2 +- executorlib/task_scheduler/file/queue_spawner.py | 2 +- executorlib/task_scheduler/file/shared.py | 2 +- executorlib/task_scheduler/file/subprocess_spawner.py | 2 +- executorlib/task_scheduler/interactive/shared.py | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/executorlib/__init__.py b/executorlib/__init__.py index bc464cfd..2310c0a7 100644 --- a/executorlib/__init__.py +++ b/executorlib/__init__.py @@ -12,6 +12,7 @@ pandas.DataFrame. 
""" +import executorlib._version from executorlib.executor.base import BaseExecutor from executorlib.executor.flux import ( FluxClusterExecutor, @@ -22,7 +23,6 @@ SlurmClusterExecutor, SlurmJobExecutor, ) -import executorlib._version def get_cache_data(cache_directory: str) -> list[dict]: diff --git a/executorlib/task_scheduler/file/queue_spawner.py b/executorlib/task_scheduler/file/queue_spawner.py index 1a727660..66c5e4f4 100644 --- a/executorlib/task_scheduler/file/queue_spawner.py +++ b/executorlib/task_scheduler/file/queue_spawner.py @@ -3,9 +3,9 @@ from pysqa import QueueAdapter +from executorlib.standalone.hdf import dump, get_queue_id from executorlib.standalone.inputcheck import check_file_exists from executorlib.standalone.scheduler import pysqa_execute_command, terminate_with_pysqa -from executorlib.standalone.hdf import dump, get_queue_id def execute_with_pysqa( diff --git a/executorlib/task_scheduler/file/shared.py b/executorlib/task_scheduler/file/shared.py index 4f89dbdb..c712c863 100644 --- a/executorlib/task_scheduler/file/shared.py +++ b/executorlib/task_scheduler/file/shared.py @@ -5,8 +5,8 @@ from typing import Any, Callable, Optional from executorlib.standalone.command import get_cache_execute_command -from executorlib.standalone.serialize import serialize_funct from executorlib.standalone.hdf import get_cache_files, get_output +from executorlib.standalone.serialize import serialize_funct from executorlib.task_scheduler.file.subprocess_spawner import terminate_subprocess diff --git a/executorlib/task_scheduler/file/subprocess_spawner.py b/executorlib/task_scheduler/file/subprocess_spawner.py index f6cddb82..c1b2157f 100644 --- a/executorlib/task_scheduler/file/subprocess_spawner.py +++ b/executorlib/task_scheduler/file/subprocess_spawner.py @@ -3,8 +3,8 @@ import time from typing import Optional -from executorlib.standalone.inputcheck import check_file_exists from executorlib.standalone.hdf import dump +from executorlib.standalone.inputcheck import check_file_exists def execute_in_subprocess( diff --git a/executorlib/task_scheduler/interactive/shared.py b/executorlib/task_scheduler/interactive/shared.py index 416efa9d..e64db61b 100644 --- a/executorlib/task_scheduler/interactive/shared.py +++ b/executorlib/task_scheduler/interactive/shared.py @@ -129,7 +129,7 @@ def _execute_task_with_cache( cache_key (str, optional): By default the cache_key is generated based on the function hash, this can be overwritten by setting the cache_key. """ - from executorlib.standalone.hdf import dump, get_output, get_cache_files + from executorlib.standalone.hdf import dump, get_cache_files, get_output task_key, data_dict = serialize_funct( fn=task_dict["fn"], From f8dd9392947491a6e1561214027415141e6652df Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Tue, 5 Aug 2025 09:19:39 +0200 Subject: [PATCH 3/5] fixes --- executorlib/__init__.py | 1 + executorlib/standalone/hdf.py | 38 +++++++++++++++++------------------ 2 files changed, 20 insertions(+), 19 deletions(-) diff --git a/executorlib/__init__.py b/executorlib/__init__.py index bc464cfd..86e1194a 100644 --- a/executorlib/__init__.py +++ b/executorlib/__init__.py @@ -11,6 +11,7 @@ Finally, the get_cache_data() function allows users to cache the content of their current cache directory in one pandas.DataFrame. 
""" +from typing import Optional from executorlib.executor.base import BaseExecutor from executorlib.executor.flux import ( diff --git a/executorlib/standalone/hdf.py b/executorlib/standalone/hdf.py index 97666055..5e8970f4 100644 --- a/executorlib/standalone/hdf.py +++ b/executorlib/standalone/hdf.py @@ -121,24 +121,6 @@ def get_queue_id(file_name: Optional[str]) -> Optional[int]: return None -def get_content_of_file(file_name: str) -> dict: - """ - Get content of an HDF5 file - - Args: - file_name (str): file name - - Returns: - dict: Content of HDF5 file - """ - with h5py.File(file_name, "r") as hdf: - return { - key: cloudpickle.loads(np.void(hdf["/" + key])) - for key in group_dict.values() - if key in hdf - } - - def get_cache_data(cache_directory: str) -> list[dict]: """ Collect all HDF5 files in the cache directory @@ -150,7 +132,7 @@ def get_cache_data(cache_directory: str) -> list[dict]: list[dict]: List of dictionaries each representing on of the HDF5 files in the cache directory. """ return [ - get_content_of_file(file_name=file_name) | {"filename": file_name} + _get_content_of_file(file_name=file_name) | {"filename": file_name} for file_name in get_cache_files(cache_directory=cache_directory) ] @@ -170,3 +152,21 @@ def get_cache_files(cache_directory: str) -> list[str]: for dirpath, _, filenames in os.walk(cache_directory_abs): file_lst += [os.path.join(dirpath, f) for f in filenames if f.endswith("_o.h5")] return file_lst + + +def _get_content_of_file(file_name: str) -> dict: + """ + Get content of an HDF5 file + + Args: + file_name (str): file name + + Returns: + dict: Content of HDF5 file + """ + with h5py.File(file_name, "r") as hdf: + return { + key: cloudpickle.loads(np.void(hdf["/" + key])) + for key in group_dict.values() + if key in hdf + } From d86766403e1c55f80cb6995b36f170f86afa288f Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 5 Aug 2025 07:19:52 +0000 Subject: [PATCH 4/5] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- executorlib/__init__.py | 1 + 1 file changed, 1 insertion(+) diff --git a/executorlib/__init__.py b/executorlib/__init__.py index 081f5635..bd8153c9 100644 --- a/executorlib/__init__.py +++ b/executorlib/__init__.py @@ -11,6 +11,7 @@ Finally, the get_cache_data() function allows users to cache the content of their current cache directory in one pandas.DataFrame. """ + from typing import Optional import executorlib._version From 8b4589d422e6949b9bad76c7bf0c488409a758f8 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Tue, 5 Aug 2025 09:27:24 +0200 Subject: [PATCH 5/5] fix import --- tests/test_fluxclusterexecutor.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/test_fluxclusterexecutor.py b/tests/test_fluxclusterexecutor.py index 52e4536a..0968fabb 100644 --- a/tests/test_fluxclusterexecutor.py +++ b/tests/test_fluxclusterexecutor.py @@ -10,8 +10,9 @@ try: import flux.job + from executorlib import terminate_tasks_in_cache from executorlib.standalone.hdf import dump - from executorlib.task_scheduler.file.queue_spawner import terminate_tasks_in_cache, execute_with_pysqa + from executorlib.task_scheduler.file.queue_spawner import execute_with_pysqa from executorlib.standalone.scheduler import terminate_with_pysqa skip_flux_test = "FLUX_URI" not in os.environ