diff --git a/.ci_support/environment-mpich.yml b/.ci_support/environment-mpich.yml index ccde13ed..411ecde6 100644 --- a/.ci_support/environment-mpich.yml +++ b/.ci_support/environment-mpich.yml @@ -12,3 +12,4 @@ dependencies: - networkx =3.4.2 - pygraphviz =1.14 - ipython =8.29.0 +- pysqa =0.2.0 diff --git a/.ci_support/environment-openmpi.yml b/.ci_support/environment-openmpi.yml index 973513fc..b2d7c810 100644 --- a/.ci_support/environment-openmpi.yml +++ b/.ci_support/environment-openmpi.yml @@ -11,4 +11,5 @@ dependencies: - matplotlib =3.9.2 - networkx =3.4.2 - pygraphviz =1.14 +- pysqa =0.2.0 - ipython =8.29.0 diff --git a/.github/workflows/unittest-flux-mpich.yml b/.github/workflows/unittest-flux-mpich.yml index 1ed22ca4..7d2b5b76 100644 --- a/.github/workflows/unittest-flux-mpich.yml +++ b/.github/workflows/unittest-flux-mpich.yml @@ -34,4 +34,4 @@ jobs: timeout-minutes: 5 run: > flux start - python -m unittest tests/test_flux_executor.py tests/test_executor_backend_flux.py; + python -m unittest tests/test_flux_executor.py tests/test_executor_backend_flux.py tests/test_cache_executor_pysqa_flux.py; diff --git a/.github/workflows/unittest-flux-openmpi.yml b/.github/workflows/unittest-flux-openmpi.yml index 5cf3315f..c954553e 100644 --- a/.github/workflows/unittest-flux-openmpi.yml +++ b/.github/workflows/unittest-flux-openmpi.yml @@ -34,7 +34,7 @@ jobs: timeout-minutes: 5 run: > flux start - coverage run -a --omit="executorlib/_version.py,tests/*" -m unittest tests/test_flux_executor.py tests/test_executor_backend_flux.py; + coverage run -a --omit="executorlib/_version.py,tests/*" -m unittest tests/test_flux_executor.py tests/test_executor_backend_flux.py tests/test_cache_executor_pysqa_flux.py; coverage xml env: PYMPIPOOL_PMIX: "pmix" diff --git a/executorlib/cache/executor.py b/executorlib/cache/executor.py index f9919680..a2191e38 100644 --- a/executorlib/cache/executor.py +++ b/executorlib/cache/executor.py @@ -18,6 +18,8 @@ def __init__( cwd: 
Optional[str] = None, execute_function: callable = execute_in_subprocess, terminate_function: Optional[callable] = None, + config_directory: Optional[str] = None, + backend: Optional[str] = None, ): """ Initialize the FileExecutor. @@ -27,7 +29,9 @@ def __init__( execute_function (callable, optional): The function to execute tasks. Defaults to execute_in_subprocess. cores_per_worker (int, optional): The number of CPU cores per worker. Defaults to 1. terminate_function (callable, optional): The function to terminate the tasks. - cwd (str/None): current working directory where the parallel python task is executed + cwd (str, optional): current working directory where the parallel python task is executed + config_directory (str, optional): path to the config directory. + backend (str, optional): name of the backend used to spawn tasks. """ super().__init__() if execute_function == execute_in_subprocess and terminate_function is None: @@ -44,6 +48,8 @@ def __init__( "cores_per_worker": cores_per_worker, "cwd": cwd, "terminate_function": terminate_function, + "config_directory": config_directory, + "backend": backend, }, ) ) diff --git a/executorlib/cache/shared.py b/executorlib/cache/shared.py index 2f8ba947..f89e279a 100644 --- a/executorlib/cache/shared.py +++ b/executorlib/cache/shared.py @@ -53,6 +53,8 @@ def execute_tasks_h5( cores_per_worker: int = 1, cwd: Optional[str] = None, terminate_function: Optional[callable] = None, + config_directory: Optional[str] = None, + backend: Optional[str] = None, ) -> None: """ Execute tasks stored in a queue using HDF5 files. @@ -64,6 +66,8 @@ def execute_tasks_h5( execute_function (callable): The function to execute the tasks. cwd (str/None): current working directory where the parallel python task is executed terminate_function (callable): The function to terminate the tasks. + config_directory (str, optional): path to the config directory. + backend (str, optional): name of the backend used to spawn tasks. 
Returns: None @@ -117,6 +121,8 @@ def execute_tasks_h5( process_dict[k] for k in future_wait_key_lst ], resource_dict=resource_dict, + config_directory=config_directory, + backend=backend, ) file_name_dict[task_key] = os.path.join( cache_directory, task_key + ".h5out" diff --git a/executorlib/standalone/cache/queue.py b/executorlib/standalone/cache/queue.py new file mode 100644 index 00000000..c1b6b176 --- /dev/null +++ b/executorlib/standalone/cache/queue.py @@ -0,0 +1,39 @@ +from typing import List, Optional + +from pysqa import QueueAdapter + + +def execute_with_pysqa( + command: list, + resource_dict: dict, + task_dependent_lst: List[int] = [], + config_directory: Optional[str] = None, + backend: Optional[str] = None, +) -> int: + """ + Execute a command by submitting it to the queuing system + + Args: + command (list): The command to be executed. + task_dependent_lst (list): A list of queuing system job IDs the current job depends on. Defaults to []. + resource_dict (dict): resource dictionary, which defines the resources used for the execution of the function. + Example resource dictionary: { + cwd: None, + } + config_directory (str, optional): path to the config directory. + backend (str, optional): name of the backend used to spawn tasks. 
+ + Returns: + int: queuing system ID + """ + if resource_dict is None: + resource_dict = {"cwd": "."} + qa = QueueAdapter(directory=config_directory, queue_type=backend) + submit_kwargs = { + "command": " ".join(command), + "dependency_list": [str(qid) for qid in task_dependent_lst], + "working_directory": resource_dict["cwd"], + } + resource_dict = {k: v for k, v in resource_dict.items() if k != "cwd"} + submit_kwargs.update(resource_dict) + return qa.submit_job(**submit_kwargs) diff --git a/executorlib/standalone/cache/spawner.py b/executorlib/standalone/cache/spawner.py index b4da5ae3..3a3b3184 100644 --- a/executorlib/standalone/cache/spawner.py +++ b/executorlib/standalone/cache/spawner.py @@ -7,6 +7,8 @@ def execute_in_subprocess( command: list, task_dependent_lst: list = [], resource_dict: Optional[dict] = None, + config_directory: Optional[str] = None, + backend: Optional[str] = None, ) -> subprocess.Popen: """ Execute a command in a subprocess. @@ -18,6 +20,8 @@ Example resource dictionary: { cwd: None, } + config_directory (str, optional): path to the config directory. + backend (str, optional): name of the backend used to spawn tasks. Returns: subprocess.Popen: The subprocess object. @@ -27,6 +31,12 @@ task_dependent_lst = [ task for task in task_dependent_lst if task.poll() is None ] + if config_directory is not None: + raise ValueError( + "config_directory parameter is not supported for subprocess spawner." 
+ ) + if backend is not None: + raise ValueError("backend parameter is not supported for subprocess spawner.") + if resource_dict is None: + resource_dict = {"cwd": None} + elif len(resource_dict) == 0: diff --git a/pyproject.toml b/pyproject.toml index 5fff829f..5a0563c6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -36,15 +36,14 @@ Repository = "https://github.com/pyiron/executorlib" [project.optional-dependencies] mpi = ["mpi4py==4.0.1"] -hdf = [ - "h5py==3.12.1", -] +hdf = ["h5py==3.12.1"] graph = [ "pygraphviz==1.14", "matplotlib==3.9.2", "networkx==3.4.2", "ipython==8.29.0", ] +queue = ["pysqa==0.2.0"] [tool.setuptools.packages.find] include = ["executorlib*"] diff --git a/tests/test_cache_executor_pysqa_flux.py b/tests/test_cache_executor_pysqa_flux.py new file mode 100644 index 00000000..eb71c794 --- /dev/null +++ b/tests/test_cache_executor_pysqa_flux.py @@ -0,0 +1,46 @@ +import os +import importlib.util +import unittest +import shutil + +try: + import flux.job + from executorlib import FileExecutor + from executorlib.standalone.cache.queue import execute_with_pysqa + + skip_flux_test = "FLUX_URI" not in os.environ + pmi = os.environ.get("PYMPIPOOL_PMIX", None) +except ImportError: + skip_flux_test = True + + +skip_mpi4py_test = importlib.util.find_spec("mpi4py") is None + + +def mpi_funct(i): + from mpi4py import MPI + + size = MPI.COMM_WORLD.Get_size() + rank = MPI.COMM_WORLD.Get_rank() + return i, size, rank + + +@unittest.skipIf( + skip_flux_test or skip_mpi4py_test, + "h5py or mpi4py or flux are not installed, so the h5py, flux and mpi4py tests are skipped.", +) +class TestCacheExecutorPysqa(unittest.TestCase): + def test_executor(self): + with FileExecutor( + cores_per_worker=2, + execute_function=execute_with_pysqa, + backend="flux", + ) as exe: + fs1 = exe.submit(mpi_funct, 1) + self.assertFalse(fs1.done()) + self.assertEqual(fs1.result(), [(1, 2, 0), (1, 2, 1)]) + self.assertTrue(fs1.done()) + + def tearDown(self): + if os.path.exists("cache"): + 
shutil.rmtree("cache")