diff --git a/executorlib/__init__.py b/executorlib/__init__.py
index 7b47910b..3674d0f0 100644
--- a/executorlib/__init__.py
+++ b/executorlib/__init__.py
@@ -8,8 +8,10 @@
     SlurmClusterExecutor,
     SlurmJobExecutor,
 )
+from executorlib.standalone.cache import get_cache_data
 
 __all__: list[str] = [
+    "get_cache_data",
     "FluxJobExecutor",
     "FluxClusterExecutor",
     "SingleNodeExecutor",
@@ -17,11 +19,4 @@
     "SlurmClusterExecutor",
 ]
 
-try:
-    from executorlib.standalone.hdf import get_cache_data
-except ImportError:
-    pass
-else:
-    __all__ += ["get_cache_data"]
-
 __version__ = _get_versions()["version"]
diff --git a/executorlib/executor/flux.py b/executorlib/executor/flux.py
index c7568eb9..21f51719 100644
--- a/executorlib/executor/flux.py
+++ b/executorlib/executor/flux.py
@@ -1,4 +1,3 @@
-import contextlib
 from typing import Callable, Optional, Union
 
 from executorlib.executor.base import ExecutorBase
@@ -17,12 +16,6 @@
 from executorlib.task_scheduler.interactive.dependency import DependencyTaskScheduler
 from executorlib.task_scheduler.interactive.onetoone import OneProcessTaskScheduler
 
-with contextlib.suppress(ImportError):
-    from executorlib.task_scheduler.interactive.fluxspawner import (
-        FluxPythonSpawner,
-        validate_max_workers,
-    )
-
 
 class FluxJobExecutor(ExecutorBase):
     """
@@ -440,6 +433,11 @@ def create_flux_executor(
     Returns:
         InteractiveStepExecutor/ InteractiveExecutor
     """
+    from executorlib.task_scheduler.interactive.fluxspawner import (
+        FluxPythonSpawner,
+        validate_max_workers,
+    )
+
     if resource_dict is None:
         resource_dict = {}
     cores_per_worker = resource_dict.get("cores", 1)
diff --git a/executorlib/standalone/cache.py b/executorlib/standalone/cache.py
new file mode 100644
index 00000000..dae1334e
--- /dev/null
+++ b/executorlib/standalone/cache.py
@@ -0,0 +1,42 @@
+import os
+
+import cloudpickle
+import numpy as np
+
+group_dict = {
+    "fn": "function",
+    "args": "input_args",
+    "kwargs": "input_kwargs",
+    "output": "output",
+    "error": "error",
+    "runtime": "runtime",
+    "queue_id": "queue_id",
+}
+
+
+def get_cache_data(cache_directory: str) -> list[dict]:
+    """
+    Collect all HDF5 files in the cache directory
+
+    Args:
+        cache_directory (str): The directory to store cache files.
+
+    Returns:
+        list[dict]: List of dictionaries, each representing one of the HDF5 files in the cache directory.
+    """
+    import h5py
+
+    file_lst = []
+    for task_key in os.listdir(cache_directory):
+        file_name = os.path.join(cache_directory, task_key, "cache.h5out")
+        os.makedirs(os.path.join(cache_directory, task_key), exist_ok=True)
+        if os.path.exists(file_name):
+            with h5py.File(file_name, "r") as hdf:
+                file_content_dict = {
+                    key: cloudpickle.loads(np.void(hdf["/" + key]))
+                    for key in group_dict.values()
+                    if key in hdf
+                }
+                file_content_dict["filename"] = file_name
+                file_lst.append(file_content_dict)
+    return file_lst
diff --git a/executorlib/task_scheduler/file/backend.py b/executorlib/task_scheduler/file/backend.py
index 63fe6ea5..f5c846f8 100644
--- a/executorlib/task_scheduler/file/backend.py
+++ b/executorlib/task_scheduler/file/backend.py
@@ -2,7 +2,7 @@
 import time
 from typing import Any
 
-from executorlib.standalone.hdf import dump, load
+from executorlib.task_scheduler.file.hdf import dump, load
 from executorlib.task_scheduler.file.shared import FutureItem
 
 
diff --git a/executorlib/standalone/hdf.py b/executorlib/task_scheduler/file/hdf.py
similarity index 76%
rename from executorlib/standalone/hdf.py
rename to executorlib/task_scheduler/file/hdf.py
index 8fb26f72..3e9829b7 100644
--- a/executorlib/standalone/hdf.py
+++ b/executorlib/task_scheduler/file/hdf.py
@@ -1,19 +1,10 @@
-import os
 from typing import Any, Optional
 
 import cloudpickle
 import h5py
 import numpy as np
 
-group_dict = {
-    "fn": "function",
-    "args": "input_args",
-    "kwargs": "input_kwargs",
-    "output": "output",
-    "error": "error",
-    "runtime": "runtime",
-    "queue_id": "queue_id",
-}
+from executorlib.standalone.cache import group_dict
 
 
 def dump(file_name: Optional[str], data_dict: dict) -> None:
@@ -98,25 +89,17 @@ def get_runtime(file_name: str) -> float:
 
 
 def get_queue_id(file_name: Optional[str]) -> Optional[int]:
+    """
+    Get queuing system id from HDF5 file
+
+    Args:
+        file_name (str): file name of the HDF5 file as absolute path
+
+    Returns:
+        int: queuing system id from the execution of the python function
+    """
     if file_name is not None:
         with h5py.File(file_name, "r") as hdf:
             if "queue_id" in hdf:
                 return cloudpickle.loads(np.void(hdf["/queue_id"]))
     return None
-
-
-def get_cache_data(cache_directory: str) -> list[dict]:
-    file_lst = []
-    for task_key in os.listdir(cache_directory):
-        file_name = os.path.join(cache_directory, task_key, "cache.h5out")
-        os.makedirs(os.path.join(cache_directory, task_key), exist_ok=True)
-        if os.path.exists(file_name):
-            with h5py.File(file_name, "r") as hdf:
-                file_content_dict = {
-                    key: cloudpickle.loads(np.void(hdf["/" + key]))
-                    for key in group_dict.values()
-                    if key in hdf
-                }
-                file_content_dict["filename"] = file_name
-                file_lst.append(file_content_dict)
-    return file_lst
diff --git a/executorlib/task_scheduler/file/queue_spawner.py b/executorlib/task_scheduler/file/queue_spawner.py
index 4cee4754..ac6ecc5b 100644
--- a/executorlib/task_scheduler/file/queue_spawner.py
+++ b/executorlib/task_scheduler/file/queue_spawner.py
@@ -4,8 +4,8 @@
 
 from pysqa import QueueAdapter
 
-from executorlib.standalone.hdf import dump, get_queue_id
 from executorlib.standalone.inputcheck import check_file_exists
+from executorlib.task_scheduler.file.hdf import dump, get_queue_id
 
 
 def execute_with_pysqa(
diff --git a/executorlib/task_scheduler/file/shared.py b/executorlib/task_scheduler/file/shared.py
index 36c3f693..313a8b68 100644
--- a/executorlib/task_scheduler/file/shared.py
+++ b/executorlib/task_scheduler/file/shared.py
@@ -7,8 +7,8 @@
 from typing import Any, Callable, Optional
 
 from executorlib.standalone.command import get_command_path
-from executorlib.standalone.hdf import dump, get_output
 from executorlib.standalone.serialize import serialize_funct_h5
+from executorlib.task_scheduler.file.hdf import dump, get_output
 
 
 class FutureItem:
diff --git a/executorlib/task_scheduler/interactive/shared.py b/executorlib/task_scheduler/interactive/shared.py
index cc582347..4761d928 100644
--- a/executorlib/task_scheduler/interactive/shared.py
+++ b/executorlib/task_scheduler/interactive/shared.py
@@ -140,7 +140,7 @@ def _execute_task_with_cache(
         future_queue (Queue): Queue for receiving new tasks.
         cache_directory (str): The directory to store cache files.
     """
-    from executorlib.standalone.hdf import dump, get_output
+    from executorlib.task_scheduler.file.hdf import dump, get_output
 
     task_key, data_dict = serialize_funct_h5(
         fn=task_dict["fn"],
diff --git a/tests/test_cache_backend_execute.py b/tests/test_cache_backend_execute.py
index 2e38f2cf..f780070f 100644
--- a/tests/test_cache_backend_execute.py
+++ b/tests/test_cache_backend_execute.py
@@ -7,7 +7,7 @@
 try:
     from executorlib.task_scheduler.file.backend import backend_execute_task_in_file
     from executorlib.task_scheduler.file.shared import _check_task_output, FutureItem
-    from executorlib.standalone.hdf import dump, get_runtime
+    from executorlib.task_scheduler.file.hdf import dump, get_runtime
     from executorlib.standalone.serialize import serialize_funct_h5
 
     skip_h5io_test = False
diff --git a/tests/test_singlenodeexecutor_cache.py b/tests/test_singlenodeexecutor_cache.py
index bded9cab..b850992c 100644
--- a/tests/test_singlenodeexecutor_cache.py
+++ b/tests/test_singlenodeexecutor_cache.py
@@ -2,11 +2,11 @@
 import shutil
 import unittest
 
-from executorlib import SingleNodeExecutor
+from executorlib import SingleNodeExecutor, get_cache_data
 from executorlib.standalone.serialize import cloudpickle_register
 
 try:
-    from executorlib import get_cache_data
+    import h5py
 
     skip_h5py_test = False
 except ImportError:
diff --git a/tests/test_standalone_hdf.py b/tests/test_standalone_hdf.py
index 95d1aa6e..4dc56168 100644
--- a/tests/test_standalone_hdf.py
+++ b/tests/test_standalone_hdf.py
@@ -4,7 +4,7 @@
 
 
 try:
-    from executorlib.standalone.hdf import (
+    from executorlib.task_scheduler.file.hdf import (
         dump,
         load,
         get_output,
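
A minimal usage sketch of the now unconditionally exported get_cache_data (not part of the patch): it assumes h5py is installed, since the import now happens lazily inside the function, and the cache directory path and the submitted task below are illustrative only; the cache_directory keyword of SingleNodeExecutor is used here the same way the adjusted tests/test_singlenodeexecutor_cache.py uses it.

    from executorlib import SingleNodeExecutor, get_cache_data


    def add(a, b):
        return a + b


    # Submit a task through an executor that writes HDF5 cache files.
    with SingleNodeExecutor(cache_directory="./file_cache") as exe:
        print(exe.submit(add, 1, b=2).result())  # -> 3

    # Collect the cached records; each dict holds the entries listed in
    # group_dict ("function", "input_args", "output", ...) plus the
    # "filename" of the underlying cache.h5out file.
    for record in get_cache_data(cache_directory="./file_cache"):
        print(record["filename"], record.get("output"))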