diff --git a/executorlib/__init__.py b/executorlib/__init__.py
index 3e95579f..54b2f6c3 100644
--- a/executorlib/__init__.py
+++ b/executorlib/__init__.py
@@ -13,14 +13,6 @@
 __all__ = []
 
 
-try:
-    from executorlib.cache.executor import FileExecutor
-
-    __all__ += [FileExecutor]
-except ImportError:
-    pass
-
-
 class Executor:
     """
     The executorlib.Executor leverages either the message passing interface (MPI), the SLURM workload manager or
@@ -47,6 +39,7 @@ class Executor:
         flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux
         flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only)
         flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted function.
+        pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
         hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
                                       context of an HPC cluster this essential to be able to communicate to an
                                       Executor running on a different compute node within the same allocation. And
@@ -95,6 +88,7 @@ def __init__(
         flux_executor=None,
         flux_executor_pmi_mode: Optional[str] = None,
         flux_executor_nesting: bool = False,
+        pysqa_config_directory: Optional[str] = None,
         hostname_localhost: Optional[bool] = None,
         block_allocation: bool = True,
         init_function: Optional[callable] = None,
@@ -115,6 +109,7 @@ def __new__(
         flux_executor=None,
         flux_executor_pmi_mode: Optional[str] = None,
         flux_executor_nesting: bool = False,
+        pysqa_config_directory: Optional[str] = None,
         hostname_localhost: Optional[bool] = None,
         block_allocation: bool = True,
         init_function: Optional[callable] = None,
@@ -162,6 +157,7 @@ def __new__(
                                   of the individual function.
            init_function (None): optional function to preset arguments for functions which are submitted later
            disable_dependencies (boolean): Disable resolving future objects during the submission.
+           pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
            refresh_rate (float): Set the refresh rate in seconds, how frequently the input queue is checked.
            plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them.
                                          For debugging purposes and to get an overview of the specified dependencies.
@@ -180,7 +176,28 @@ def __new__(
         resource_dict.update(
             {k: v for k, v in default_resource_dict.items() if k not in resource_dict}
         )
-        if not disable_dependencies:
+        if "pysqa_" in backend and not plot_dependency_graph:
+            from executorlib.cache.executor import create_file_executor
+
+            return create_file_executor(
+                max_workers=max_workers,
+                backend=backend,
+                max_cores=max_cores,
+                cache_directory=cache_directory,
+                resource_dict=resource_dict,
+                flux_executor=flux_executor,
+                flux_executor_pmi_mode=flux_executor_pmi_mode,
+                flux_executor_nesting=flux_executor_nesting,
+                pysqa_config_directory=pysqa_config_directory,
+                hostname_localhost=hostname_localhost,
+                block_allocation=block_allocation,
+                init_function=init_function,
+            )
+        elif not disable_dependencies:
+            if pysqa_config_directory is not None:
+                raise ValueError(
+                    "The pysqa_config_directory is only required for the pysqa backend."
+                )
             return ExecutorWithDependencies(
                 max_workers=max_workers,
                 backend=backend,
@@ -199,6 +216,10 @@
         else:
             _check_plot_dependency_graph(plot_dependency_graph=plot_dependency_graph)
             _check_refresh_rate(refresh_rate=refresh_rate)
+            if pysqa_config_directory is not None:
+                raise ValueError(
+                    "The pysqa_config_directory is only required for the pysqa backend."
+                )
             return create_executor(
                 max_workers=max_workers,
                 backend=backend,
diff --git a/executorlib/cache/executor.py b/executorlib/cache/executor.py
index 0195a14e..c389c5be 100644
--- a/executorlib/cache/executor.py
+++ b/executorlib/cache/executor.py
@@ -7,6 +7,14 @@
     execute_in_subprocess,
     terminate_subprocess,
 )
+from executorlib.standalone.inputcheck import (
+    check_command_line_argument_lst,
+    check_executor,
+    check_gpus_per_worker,
+    check_nested_flux_executor,
+    check_oversubscribe,
+    check_threads_per_core,
+)
 from executorlib.standalone.thread import RaisingThread
 
 try:
@@ -20,10 +28,10 @@ class FileExecutor(ExecutorBase):
     def __init__(
         self,
         cache_directory: str = "cache",
-        resource_dict: Optional[dict] = None,
+        resource_dict: dict = {},
         execute_function: callable = execute_with_pysqa,
         terminate_function: Optional[callable] = None,
-        config_directory: Optional[str] = None,
+        pysqa_config_directory: Optional[str] = None,
         backend: Optional[str] = None,
     ):
         """
@@ -36,19 +44,24 @@ def __init__(
                              - cwd (str/None): current working directory where the parallel python task is executed
            execute_function (callable, optional): The function to execute tasks. Defaults to execute_in_subprocess.
            terminate_function (callable, optional): The function to terminate the tasks.
-           config_directory (str, optional): path to the config directory.
+           pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
            backend (str, optional): name of the backend used to spawn tasks.
        """
        super().__init__()
-        default_resource_dict = {
-            "cores": 1,
-            "cwd": None,
-        }
-        if resource_dict is None:
-            resource_dict = {}
-        resource_dict.update(
-            {k: v for k, v in default_resource_dict.items() if k not in resource_dict}
-        )
+        if "openmpi_oversubscribe" in resource_dict:
+            check_oversubscribe(oversubscribe=resource_dict["openmpi_oversubscribe"])
+            del resource_dict["openmpi_oversubscribe"]
+        if "slurm_cmd_args" in resource_dict:
+            check_command_line_argument_lst(
+                command_line_argument_lst=resource_dict["slurm_cmd_args"]
+            )
+            del resource_dict["slurm_cmd_args"]
+        if "threads_per_core" in resource_dict:
+            check_threads_per_core(threads_per_core=resource_dict["threads_per_core"])
+            del resource_dict["threads_per_core"]
+        if "gpus_per_core" in resource_dict:
+            check_gpus_per_worker(gpus_per_worker=resource_dict["gpus_per_core"])
+            del resource_dict["gpus_per_core"]
         if execute_function == execute_in_subprocess and terminate_function is None:
             terminate_function = terminate_subprocess
         cache_directory_path = os.path.abspath(cache_directory)
@@ -62,8 +75,58 @@ def __init__(
                     "cache_directory": cache_directory_path,
                     "resource_dict": resource_dict,
                     "terminate_function": terminate_function,
-                    "config_directory": config_directory,
+                    "pysqa_config_directory": pysqa_config_directory,
                     "backend": backend,
                 },
             )
         )
+
+
+def create_file_executor(
+    max_workers: int = 1,
+    backend: str = "pysqa_flux",
+    max_cores: int = 1,
+    cache_directory: Optional[str] = None,
+    resource_dict: Optional[dict] = None,
+    flux_executor=None,
+    flux_executor_pmi_mode: Optional[str] = None,
+    flux_executor_nesting: bool = False,
+    pysqa_config_directory: Optional[str] = None,
+    hostname_localhost: Optional[bool] = None,
+    block_allocation: bool = False,
+    init_function: Optional[callable] = None,
+):
+    if cache_directory is None:
+        cache_directory = "executorlib_cache"
+    if max_workers != 1:
+        raise ValueError(
+            "The number of workers cannot be controlled with the pysqa based backend."
+        )
+    if max_cores != 1:
+        raise ValueError(
+            "The number of cores cannot be controlled with the pysqa based backend."
+        )
+    if hostname_localhost is not None:
+        raise ValueError(
+            "The option to connect to hosts based on their hostname is not available with the pysqa based backend."
+        )
+    if block_allocation:
+        raise ValueError(
+            "The option block_allocation is not available with the pysqa based backend."
+        )
+    if init_function is not None:
+        raise ValueError(
+            "The option to specify an init_function is not available with the pysqa based backend."
+        )
+    if flux_executor_pmi_mode is not None:
+        raise ValueError(
+            "The option to specify the flux pmi mode is not available with the pysqa based backend."
+        )
+    check_executor(executor=flux_executor)
+    check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
+    return FileExecutor(
+        cache_directory=cache_directory,
+        resource_dict=resource_dict,
+        pysqa_config_directory=pysqa_config_directory,
+        backend=backend.split("pysqa_")[-1],
+    )
diff --git a/executorlib/cache/shared.py b/executorlib/cache/shared.py
index dd09542b..22177b32 100644
--- a/executorlib/cache/shared.py
+++ b/executorlib/cache/shared.py
@@ -52,7 +52,7 @@ def execute_tasks_h5(
     execute_function: callable,
     resource_dict: dict,
     terminate_function: Optional[callable] = None,
-    config_directory: Optional[str] = None,
+    pysqa_config_directory: Optional[str] = None,
     backend: Optional[str] = None,
 ) -> None:
     """
@@ -66,7 +66,7 @@ def execute_tasks_h5(
                          - cwd (str/None): current working directory where the parallel python task is executed
        execute_function (callable): The function to execute the tasks.
        terminate_function (callable): The function to terminate the tasks.
-       config_directory (str, optional): path to the config directory.
+       pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
        backend (str, optional): name of the backend used to spawn tasks.
 
     Returns:
@@ -120,7 +120,7 @@ def execute_tasks_h5(
                         process_dict[k] for k in future_wait_key_lst
                     ],
                     resource_dict=task_resource_dict,
-                    config_directory=config_directory,
+                    config_directory=pysqa_config_directory,
                     backend=backend,
                 )
                 file_name_dict[task_key] = os.path.join(
diff --git a/tests/test_cache_executor_mpi.py b/tests/test_cache_executor_mpi.py
index bc5e1226..becba0f2 100644
--- a/tests/test_cache_executor_mpi.py
+++ b/tests/test_cache_executor_mpi.py
@@ -32,7 +32,15 @@ def mpi_funct(i):
 class TestCacheExecutorMPI(unittest.TestCase):
     def test_executor(self):
         with FileExecutor(
-            resource_dict={"cores": 2}, execute_function=execute_in_subprocess
+            resource_dict={
+                "cores": 2,
+                "threads_per_core": 1,
+                "gpus_per_core": 0,
+                "cwd": None,
+                "openmpi_oversubscribe": False,
+                "slurm_cmd_args": [],
+            },
+            execute_function=execute_in_subprocess,
         ) as exe:
             fs1 = exe.submit(mpi_funct, 1)
             self.assertFalse(fs1.done())
diff --git a/tests/test_cache_executor_pysqa_flux.py b/tests/test_cache_executor_pysqa_flux.py
index 545f9f25..cd4d6b70 100644
--- a/tests/test_cache_executor_pysqa_flux.py
+++ b/tests/test_cache_executor_pysqa_flux.py
@@ -5,7 +5,7 @@
 try:
     import flux.job
 
-    from executorlib import FileExecutor
+    from executorlib import Executor
 
     skip_flux_test = "FLUX_URI" not in os.environ
     pmi = os.environ.get("PYMPIPOOL_PMIX", None)
@@ -30,9 +30,11 @@ def mpi_funct(i):
 )
 class TestCacheExecutorPysqa(unittest.TestCase):
     def test_executor(self):
-        with FileExecutor(
-            resource_dict={"cores": 2},
-            backend="flux",
+        with Executor(
+            resource_dict={"cores": 2, "cwd": "executorlib_cache"},
+            backend="pysqa_flux",
+            cache_directory="executorlib_cache",
+            block_allocation=False,
         ) as exe:
             fs1 = exe.submit(mpi_funct, 1)
             self.assertFalse(fs1.done())
@@ -40,5 +42,5 @@ def test_executor(self):
             self.assertTrue(fs1.done())
 
     def tearDown(self):
-        if os.path.exists("cache"):
-            shutil.rmtree("cache")
+        if os.path.exists("executorlib_cache"):
+            shutil.rmtree("executorlib_cache")
diff --git a/tests/test_cache_executor_serial.py b/tests/test_cache_executor_serial.py
index 96aa2df0..24081bf7 100644
--- a/tests/test_cache_executor_serial.py
+++ b/tests/test_cache_executor_serial.py
@@ -11,7 +11,7 @@
 from executorlib.standalone.thread import RaisingThread
 
 try:
-    from executorlib import FileExecutor
+    from executorlib.cache.executor import FileExecutor
     from executorlib.cache.shared import execute_tasks_h5
 
     skip_h5py_test = False
@@ -32,14 +32,26 @@ def list_files_in_working_directory():
 )
 class TestCacheExecutorSerial(unittest.TestCase):
     def test_executor_mixed(self):
-        with FileExecutor(execute_function=execute_in_subprocess) as exe:
+        with FileExecutor(
+            execute_function=execute_in_subprocess,
+            resource_dict={
+                "cores": 1,
+                "cwd": None,
+            },
+        ) as exe:
             fs1 = exe.submit(my_funct, 1, b=2)
             self.assertFalse(fs1.done())
             self.assertEqual(fs1.result(), 3)
             self.assertTrue(fs1.done())
 
     def test_executor_dependence_mixed(self):
-        with FileExecutor(execute_function=execute_in_subprocess) as exe:
+        with FileExecutor(
+            execute_function=execute_in_subprocess,
+            resource_dict={
+                "cores": 1,
+                "cwd": None,
+            },
+        ) as exe:
             fs1 = exe.submit(my_funct, 1, b=2)
             fs2 = exe.submit(my_funct, 1, b=fs1)
             self.assertFalse(fs2.done())
@@ -49,7 +61,11 @@ def test_executor_dependence_mixed(self):
     def test_executor_working_directory(self):
         cwd = os.path.join(os.path.dirname(__file__), "executables")
         with FileExecutor(
-            resource_dict={"cwd": cwd}, execute_function=execute_in_subprocess
+            resource_dict={
+                "cores": 1,
+                "cwd": cwd,
+            },
+            execute_function=execute_in_subprocess,
         ) as exe:
             fs1 = exe.submit(list_files_in_working_directory)
             self.assertEqual(fs1.result(), os.listdir(cwd))
@@ -74,7 +90,10 @@ def test_executor_function(self):
                 "future_queue": q,
                 "cache_directory": cache_dir,
                 "execute_function": execute_in_subprocess,
-                "resource_dict": {"cores": 1, "cwd": None},
+                "resource_dict": {
+                    "cores": 1,
+                    "cwd": None,
+                },
                 "terminate_function": terminate_subprocess,
             },
         )
@@ -115,7 +134,10 @@ def test_executor_function_dependence_kwargs(self):
                 "future_queue": q,
                 "cache_directory": cache_dir,
                 "execute_function": execute_in_subprocess,
-                "resource_dict": {"cores": 1, "cwd": None},
+                "resource_dict": {
+                    "cores": 1,
+                    "cwd": None,
+                },
                 "terminate_function": terminate_subprocess,
             },
         )
@@ -156,7 +178,10 @@ def test_executor_function_dependence_args(self):
                 "future_queue": q,
                 "cache_directory": cache_dir,
                 "execute_function": execute_in_subprocess,
-                "resource_dict": {"cores": 1, "cwd": None},
+                "resource_dict": {
+                    "cores": 1,
+                    "cwd": None,
+                },
                 "terminate_function": terminate_subprocess,
            },
        )
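
Usage sketch (not part of the patch): after this change the pysqa code path is reached through the public Executor class rather than by importing FileExecutor directly, as the updated tests/test_cache_executor_pysqa_flux.py shows. A minimal example along those lines, assuming pysqa and h5py are installed and a Flux allocation is available; the task function add_funct is a hypothetical stand-in for the test's mpi_funct:

    from executorlib import Executor

    def add_funct(a, b):  # hypothetical task function
        return a + b

    # backend="pysqa_flux" routes Executor.__new__() to create_file_executor(),
    # which strips the "pysqa_" prefix and hands "flux" on to the FileExecutor.
    # block_allocation must be False; create_file_executor() raises ValueError otherwise.
    with Executor(
        backend="pysqa_flux",
        cache_directory="executorlib_cache",
        resource_dict={"cores": 1, "cwd": "executorlib_cache"},
        block_allocation=False,
    ) as exe:
        future = exe.submit(add_funct, 1, b=2)
        print(future.result())  # 3, once the queued job has finished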