diff --git a/executorlib/executor/flux.py b/executorlib/executor/flux.py
index be8fa27c..7a40ba48 100644
--- a/executorlib/executor/flux.py
+++ b/executorlib/executor/flux.py
@@ -358,28 +358,48 @@ def __init__(
         if not plot_dependency_graph:
             import pysqa  # noqa
 
-            from executorlib.task_scheduler.file.task_scheduler import (
-                create_file_executor,
-            )
+            if block_allocation:
+                from executorlib.task_scheduler.interactive.spawner_pysqa import (
+                    create_pysqa_block_allocation_scheduler,
+                )
 
-            super().__init__(
-                executor=create_file_executor(
-                    max_workers=max_workers,
-                    backend="flux",
-                    max_cores=max_cores,
-                    cache_directory=cache_directory,
-                    resource_dict=resource_dict,
-                    flux_executor=None,
-                    pmi_mode=pmi_mode,
-                    flux_executor_nesting=False,
-                    flux_log_files=False,
-                    pysqa_config_directory=pysqa_config_directory,
-                    hostname_localhost=hostname_localhost,
-                    block_allocation=block_allocation,
-                    init_function=init_function,
-                    disable_dependencies=disable_dependencies,
+                super().__init__(
+                    executor=create_pysqa_block_allocation_scheduler(
+                        max_cores=max_cores,
+                        cache_directory=cache_directory,
+                        hostname_localhost=hostname_localhost,
+                        log_obj_size=log_obj_size,
+                        pmi_mode=pmi_mode,
+                        init_function=init_function,
+                        max_workers=max_workers,
+                        resource_dict=resource_dict,
+                        pysqa_config_directory=pysqa_config_directory,
+                        backend="flux",
+                    )
+                )
+            else:
+                from executorlib.task_scheduler.file.task_scheduler import (
+                    create_file_executor,
+                )
+
+                super().__init__(
+                    executor=create_file_executor(
+                        max_workers=max_workers,
+                        backend="flux",
+                        max_cores=max_cores,
+                        cache_directory=cache_directory,
+                        resource_dict=resource_dict,
+                        flux_executor=None,
+                        pmi_mode=pmi_mode,
+                        flux_executor_nesting=False,
+                        flux_log_files=False,
+                        pysqa_config_directory=pysqa_config_directory,
+                        hostname_localhost=hostname_localhost,
+                        block_allocation=block_allocation,
+                        init_function=init_function,
+                        disable_dependencies=disable_dependencies,
+                    )
                 )
-            )
         else:
             super().__init__(
                 executor=DependencyTaskScheduler(
diff --git a/executorlib/executor/slurm.py b/executorlib/executor/slurm.py
index e8244d1b..97b27c49 100644
--- a/executorlib/executor/slurm.py
+++ b/executorlib/executor/slurm.py
@@ -166,28 +166,49 @@ def __init__(
         if not plot_dependency_graph:
             import pysqa  # noqa
 
-            from executorlib.task_scheduler.file.task_scheduler import (
-                create_file_executor,
-            )
+            if block_allocation:
+                from executorlib.task_scheduler.interactive.spawner_pysqa import (
+                    create_pysqa_block_allocation_scheduler,
+                )
 
-            super().__init__(
-                executor=create_file_executor(
-                    max_workers=max_workers,
-                    backend="slurm",
-                    max_cores=max_cores,
-                    cache_directory=cache_directory,
-                    resource_dict=resource_dict,
-                    pmi_mode=pmi_mode,
-                    flux_executor=None,
-                    flux_executor_nesting=False,
-                    flux_log_files=False,
-                    pysqa_config_directory=pysqa_config_directory,
-                    hostname_localhost=hostname_localhost,
-                    block_allocation=block_allocation,
-                    init_function=init_function,
-                    disable_dependencies=disable_dependencies,
+                super().__init__(
+                    executor=create_pysqa_block_allocation_scheduler(
+                        max_cores=max_cores,
+                        cache_directory=cache_directory,
+                        hostname_localhost=hostname_localhost,
+                        log_obj_size=log_obj_size,
+                        pmi_mode=pmi_mode,
+                        init_function=init_function,
+                        max_workers=max_workers,
+                        resource_dict=resource_dict,
+                        pysqa_config_directory=pysqa_config_directory,
+                        backend="slurm",
+                    ),
+                )
+
+            else:
+                from executorlib.task_scheduler.file.task_scheduler import (
+                    create_file_executor,
+                )
+
+                super().__init__(
+                    executor=create_file_executor(
+                        max_workers=max_workers,
+                        backend="slurm",
+                        max_cores=max_cores,
+                        cache_directory=cache_directory,
+                        resource_dict=resource_dict,
+                        pmi_mode=pmi_mode,
+                        flux_executor=None,
+                        flux_executor_nesting=False,
+                        flux_log_files=False,
+                        pysqa_config_directory=pysqa_config_directory,
+                        hostname_localhost=hostname_localhost,
+                        block_allocation=block_allocation,
+                        init_function=init_function,
+                        disable_dependencies=disable_dependencies,
+                    )
                 )
-            )
         else:
             super().__init__(
                 executor=DependencyTaskScheduler(
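For orientation between the two executor hunks above and the new module below: with `block_allocation=True`, both `FluxClusterExecutor` and `SlurmClusterExecutor` now build an interactive `BlockAllocationTaskScheduler` on top of pysqa, instead of the file-based `create_file_executor()` path used for per-task submission. A minimal usage sketch, assuming pysqa is installed and a working SLURM queue is available (the cache directory name and resource values here are illustrative, mirroring the tests further down):

```python
from executorlib import SlurmClusterExecutor

# block_allocation=True takes the new interactive code path above;
# block_allocation=False keeps the established file-based submission.
with SlurmClusterExecutor(
    max_workers=1,                        # size of the persistent worker pool
    cache_directory="executorlib_cache",  # illustrative cache location
    resource_dict={"cores": 2},           # cores per block allocation
    block_allocation=True,
) as exe:
    future = exe.submit(sum, [1, 2, 3])
    print(future.result())  # 6
```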
+ """ + self._queue_adapter = QueueAdapter( + directory=self._config_directory, + queue_type=self._backend, + execute_command=pysqa_execute_command, + ) + self._process = self._start_process_helper( + command_lst=command_lst, + queue_adapter=self._queue_adapter, + ) + while True: + if self._check_process_helper(command_lst=command_lst): + return True + elif stop_function is not None and stop_function(): + self.shutdown(wait=True) + return False + else: + sleep(1) # Wait for the process to start + + def generate_command(self, command_lst: list[str]) -> list[str]: + """ + Method to generate the command list. + + Args: + command_lst (list[str]): The command list. + + Returns: + list[str]: The generated command list. + """ + if self._cores > 1 and self._backend == "slurm": + command_prepend = ["srun", "-n", str(self._cores)] + if self._pmi_mode is not None: + command_prepend += ["--mpi=" + self._pmi_mode] + if self._num_nodes is not None: + command_prepend += ["-N", str(self._num_nodes)] + if self._threads_per_core > 1: + command_prepend += ["--cpus-per-task=" + str(self._threads_per_core)] + if self._gpus_per_core > 0: + command_prepend += ["--gpus-per-task=" + str(self._gpus_per_core)] + if self._exclusive: + command_prepend += ["--exact"] + if self._openmpi_oversubscribe: + command_prepend += ["--oversubscribe"] + if self._slurm_cmd_args is not None and len(self._slurm_cmd_args) > 0: + command_prepend += self._slurm_cmd_args + elif self._cores > 1 and self._backend == "flux": + command_prepend = ["flux", "run", "-n", str(self._cores)] + if self._pmi_mode is not None: + command_prepend += ["-o", "pmi=" + self._pmi_mode] + if self._num_nodes is not None: + raise ValueError() + if self._threads_per_core > 1: + raise ValueError() + if self._gpus_per_core > 0: + raise ValueError() + if self._exclusive: + raise ValueError() + if self._openmpi_oversubscribe: + raise ValueError() + elif self._cores > 1: + raise ValueError( + f"backend should be None, slurm or flux, not {self._backend}" + ) + else: + command_prepend = [] + return command_prepend + command_lst + + def shutdown(self, wait: bool = True): + """ + Method to shutdown the subprocess interface. + + Args: + wait (bool, optional): Whether to wait for the interface to shutdown. Defaults to True. + """ + if self._process is not None: + terminate_with_pysqa( + queue_id=self._process, + config_directory=self._config_directory, + backend=self._backend, + ) + self._process = None + + def poll(self) -> bool: + """ + Method to check if the subprocess interface is running. + + Returns: + bool: True if the interface is running, False otherwise. 
+ """ + if self._process is not None and self._queue_adapter is not None: + status = self._queue_adapter.get_status_of_job(process_id=self._process) + return status in ["running", "pending"] + else: + return False + + def _start_process_helper( + self, command_lst: list[str], queue_adapter: QueueAdapter + ) -> int: + hash = hashlib.md5(str(self).encode()).hexdigest() + if self._cwd is not None: + working_directory = os.path.join(self._cwd, hash) + else: + working_directory = os.path.abspath(hash) + return queue_adapter.submit_job( + command=" ".join(self.generate_command(command_lst=command_lst)), + working_directory=working_directory, + cores=int(self._cores * self._threads_per_core), + **self._pysqa_submission_kwargs, + ) + + def _check_process_helper(self, command_lst: list[str]) -> bool: + if self._queue_adapter is not None: + status = self._queue_adapter.get_status_of_job(process_id=self._process) + else: + status = None + if status == "running": + return True + elif status is None: + raise RuntimeError( + f"Failed to start the process with command: {command_lst}" + ) + elif status == "error": + self._process = self._start_process_helper( + command_lst=command_lst, queue_adapter=self._queue_adapter + ) + return False + + def __del__(self): + self.shutdown(wait=True) + + +def create_pysqa_block_allocation_scheduler( + max_cores: Optional[int] = None, + cache_directory: Optional[str] = None, + hostname_localhost: Optional[bool] = None, + log_obj_size: bool = False, + pmi_mode: Optional[str] = None, + init_function: Optional[Callable] = None, + max_workers: Optional[int] = None, + resource_dict: Optional[dict] = None, + pysqa_config_directory: Optional[str] = None, + backend: Optional[str] = None, +): + if resource_dict is None: + resource_dict = {} + cores_per_worker = resource_dict.get("cores", 1) + if "cwd" in resource_dict and resource_dict["cwd"] is not None: + resource_dict["cwd"] = os.path.abspath(resource_dict["cwd"]) + if cache_directory is not None: + resource_dict["cache_directory"] = os.path.abspath(cache_directory) + else: + resource_dict["cache_directory"] = os.path.abspath(".") + resource_dict["hostname_localhost"] = hostname_localhost + resource_dict["log_obj_size"] = log_obj_size + resource_dict["pmi_mode"] = pmi_mode + resource_dict["init_function"] = init_function + resource_dict["config_directory"] = pysqa_config_directory + resource_dict["backend"] = backend + max_workers = validate_number_of_cores( + max_cores=max_cores, + max_workers=max_workers, + cores_per_worker=cores_per_worker, + set_local_cores=False, + ) + return BlockAllocationTaskScheduler( + max_workers=max_workers, + executor_kwargs=resource_dict, + spawner=PysqaSpawner, + ) diff --git a/tests/test_fluxclusterexecutor.py b/tests/test_fluxclusterexecutor.py index a1ff411b..04680c34 100644 --- a/tests/test_fluxclusterexecutor.py +++ b/tests/test_fluxclusterexecutor.py @@ -14,6 +14,7 @@ from executorlib.standalone.hdf import dump from executorlib.task_scheduler.file.spawner_pysqa import execute_with_pysqa from executorlib.standalone.scheduler import terminate_with_pysqa + from executorlib.task_scheduler.interactive.spawner_pysqa import PysqaSpawner skip_flux_test = "FLUX_URI" not in os.environ pmi = os.environ.get("EXECUTORLIB_PMIX", None) @@ -24,6 +25,11 @@ skip_mpi4py_test = importlib.util.find_spec("mpi4py") is None +def echo(i): + sleep(1) + return i + + def mpi_funct(i): from mpi4py import MPI @@ -32,6 +38,10 @@ def mpi_funct(i): return i, size, rank +def stop_function(): + return True + + 
diff --git a/tests/test_fluxclusterexecutor.py b/tests/test_fluxclusterexecutor.py
index a1ff411b..04680c34 100644
--- a/tests/test_fluxclusterexecutor.py
+++ b/tests/test_fluxclusterexecutor.py
@@ -14,6 +14,7 @@
     from executorlib.standalone.hdf import dump
     from executorlib.task_scheduler.file.spawner_pysqa import execute_with_pysqa
     from executorlib.standalone.scheduler import terminate_with_pysqa
+    from executorlib.task_scheduler.interactive.spawner_pysqa import PysqaSpawner
 
 skip_flux_test = "FLUX_URI" not in os.environ
 pmi = os.environ.get("EXECUTORLIB_PMIX", None)
@@ -24,6 +25,11 @@
 skip_mpi4py_test = importlib.util.find_spec("mpi4py") is None
 
 
+def echo(i):
+    sleep(1)
+    return i
+
+
 def mpi_funct(i):
     from mpi4py import MPI
 
@@ -32,6 +38,10 @@ def mpi_funct(i):
     return i, size, rank
 
 
+def stop_function():
+    return True
+
+
 @unittest.skipIf(
     skip_flux_test or skip_mpi4py_test,
     "h5py or mpi4py or flux are not installed, so the h5py, flux and mpi4py tests are skipped.",
 )
@@ -51,6 +61,40 @@ def test_executor(self):
         self.assertEqual(len(os.listdir("executorlib_cache")), 4)
         self.assertTrue(fs1.done())
 
+    def test_executor_blockallocation(self):
+        with FluxClusterExecutor(
+            resource_dict={"cores": 2, "cwd": "executorlib_cache"},
+            block_allocation=True,
+            cache_directory="executorlib_cache",
+            pmi_mode=pmi,
+            max_workers=1,
+        ) as exe:
+            cloudpickle_register(ind=1)
+            fs1 = exe.submit(mpi_funct, 1)
+            self.assertFalse(fs1.done())
+            self.assertEqual(fs1.result(), [(1, 2, 0), (1, 2, 1)])
+            self.assertEqual(len(os.listdir("executorlib_cache")), 2)
+            self.assertTrue(fs1.done())
+
+    def test_executor_blockallocation_echo(self):
+        with FluxClusterExecutor(
+            resource_dict={"cores": 1, "cwd": "executorlib_cache"},
+            block_allocation=True,
+            cache_directory="executorlib_cache",
+            pmi_mode=pmi,
+            max_workers=2,
+        ) as exe:
+            cloudpickle_register(ind=1)
+            fs1 = exe.submit(echo, 1)
+            fs2 = exe.submit(echo, 2)
+            self.assertFalse(fs1.done())
+            self.assertFalse(fs2.done())
+            self.assertEqual(fs1.result(), 1)
+            self.assertEqual(fs2.result(), 2)
+            self.assertEqual(len(os.listdir("executorlib_cache")), 4)
+            self.assertTrue(fs1.done())
+            self.assertTrue(fs2.done())
+
     def test_executor_no_cwd(self):
         with FluxClusterExecutor(
             resource_dict={"cores": 2},
@@ -122,3 +166,24 @@ def test_terminate_tasks_in_cache(self):
 
     def tearDown(self):
         shutil.rmtree("executorlib_cache", ignore_errors=True)
+
+
+@unittest.skipIf(
+    skip_flux_test,
+    "flux is not installed, so the flux tests are skipped.",
+)
+class TestPysqaSpawner(unittest.TestCase):
+    def test_pysqa_spawner_sleep(self):
+        interface_flux = PysqaSpawner(backend="flux", cores=1)
+        self.assertTrue(interface_flux.bootup(command_lst=["sleep", "1"]))
+        self.assertTrue(interface_flux._check_process_helper(command_lst=[]))
+        self.assertTrue(interface_flux.poll())
+        process_id = interface_flux._process
+        interface_flux.shutdown(wait=True)
+        interface_flux._process = process_id
+        self.assertFalse(interface_flux.poll())
+        self.assertFalse(interface_flux._check_process_helper(command_lst=["sleep", "1"]))
+
+    def test_pysqa_spawner_big(self):
+        interface_flux = PysqaSpawner(backend="flux", cores=100)
+        self.assertFalse(interface_flux.bootup(command_lst=["sleep", "1"], stop_function=stop_function))
diff --git a/tests/test_slurmclusterexecutor.py b/tests/test_slurmclusterexecutor.py
index a26524e7..41057119 100644
--- a/tests/test_slurmclusterexecutor.py
+++ b/tests/test_slurmclusterexecutor.py
@@ -20,6 +20,13 @@
 except ImportError:
     skip_h5py_test = True
 
+try:
+    import pysqa
+
+    skip_pysqa_test = False
+except ImportError:
+    skip_pysqa_test = True
+
 submission_template = """\
 #!/bin/bash
 #SBATCH --output=time.out
@@ -108,3 +115,13 @@ def test_executor_existing_files(self):
 
     def tearDown(self):
         shutil.rmtree("executorlib_cache", ignore_errors=True)
+
+
+@unittest.skipIf(skip_pysqa_test, "pysqa is not installed, so the pysqa tests are skipped.")
+class TestSlurmClusterInit(unittest.TestCase):
+    def test_slurm_cluster_block_allocation(self):
+        with self.assertRaises(ValueError):
+            SlurmClusterExecutor(block_allocation=True)
+
+    def test_slurm_cluster_file(self):
+        self.assertTrue(SlurmClusterExecutor(block_allocation=False))
\ No newline at end of file
diff --git a/tests/test_standalone_interactive_backend.py b/tests/test_standalone_interactive_backend.py
index f4b46e30..ed3745e3 100644
--- a/tests/test_standalone_interactive_backend.py
+++ b/tests/test_standalone_interactive_backend.py
@@ -6,6 +6,13 @@
 from executorlib.standalone.interactive.spawner import MpiExecSpawner
 from executorlib.task_scheduler.interactive.spawner_slurm import SrunSpawner
 
+try:
+    from executorlib.task_scheduler.interactive.spawner_pysqa import PysqaSpawner, create_pysqa_block_allocation_scheduler
+
+    skip_pysqa_test = False
+except ImportError:
+    skip_pysqa_test = True
+
 
 class TestParser(unittest.TestCase):
     def test_command_local(self):
@@ -121,3 +128,49 @@ def test_command_slurm_user_command(self):
             ),
         )
         self.assertEqual(result_dict, parse_arguments(command_lst))
+
+    @unittest.skipIf(skip_pysqa_test, "pysqa is not installed, so the pysqa tests are skipped.")
+    def test_command_pysqa(self):
+        interface_slurm = PysqaSpawner(backend="slurm", cores=2, pmi_mode="pmix", num_nodes=2, threads_per_core=2, gpus_per_core=1, exclusive=True, openmpi_oversubscribe=True, slurm_cmd_args=["test"])
+        output = ['srun', '-n', '2', '--mpi=pmix', '-N', '2', '--cpus-per-task=2', '--gpus-per-task=1', '--exact', '--oversubscribe', 'test']
+        self.assertEqual(interface_slurm.generate_command(command_lst=[]), output)
+
+        with self.assertRaises(FileNotFoundError):
+            interface_slurm.bootup(command_lst=["sleep", "1"])
+
+        interface_flux = PysqaSpawner(backend="flux", cores=2, pmi_mode="pmix")
+        output = ['flux', 'run', '-n', '2', '-o', 'pmi=pmix']
+        self.assertEqual(interface_flux.generate_command(command_lst=[]), output)
+
+        interface_flux = PysqaSpawner(backend="flux", cores=2, pmi_mode="pmix", num_nodes=2)
+        with self.assertRaises(ValueError):
+            interface_flux.generate_command(command_lst=[])
+
+        interface_flux = PysqaSpawner(backend="flux", cores=2, pmi_mode="pmix", threads_per_core=2)
+        with self.assertRaises(ValueError):
+            interface_flux.generate_command(command_lst=[])
+
+        interface_flux = PysqaSpawner(backend="flux", cores=2, pmi_mode="pmix", gpus_per_core=1)
+        with self.assertRaises(ValueError):
+            interface_flux.generate_command(command_lst=[])
+
+        interface_flux = PysqaSpawner(backend="flux", cores=2, pmi_mode="pmix", exclusive=True)
+        with self.assertRaises(ValueError):
+            interface_flux.generate_command(command_lst=[])
+
+        interface_flux = PysqaSpawner(backend="flux", cores=2, pmi_mode="pmix", openmpi_oversubscribe=True)
+        with self.assertRaises(ValueError):
+            interface_flux.generate_command(command_lst=[])
+
+        interface_nobackend = PysqaSpawner(cores=2)
+        with self.assertRaises(ValueError):
+            interface_nobackend.generate_command(command_lst=[])
+
+        with self.assertRaises(RuntimeError):
+            interface_nobackend._check_process_helper(command_lst=[])
+
+        with self.assertRaises(ValueError):
+            create_pysqa_block_allocation_scheduler()
+
+        with self.assertRaises(ValueError):
+            create_pysqa_block_allocation_scheduler(resource_dict={"cwd": "."})
\ No newline at end of file
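Finally, the `ValueError` asserted in `TestSlurmClusterInit` and in the last two `create_pysqa_block_allocation_scheduler()` checks above presumably originates in `validate_number_of_cores(..., set_local_cores=False)`: without `max_workers` or `max_cores`, the block-allocation scheduler cannot size its worker pool. A minimal sketch of that check, reproducible with pysqa installed but no SLURM cluster (the exact error message is not asserted by the tests):

```python
from executorlib import SlurmClusterExecutor

# Construction fails before any job is submitted, because neither
# max_workers nor max_cores is given to size the worker pool.
try:
    SlurmClusterExecutor(block_allocation=True)
except ValueError as error:
    print(f"rejected as expected: {error}")
```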