diff --git a/pympipool/scheduler/__init__.py b/pympipool/scheduler/__init__.py index 4492553d..1c8215c5 100644 --- a/pympipool/scheduler/__init__.py +++ b/pympipool/scheduler/__init__.py @@ -71,6 +71,7 @@ def create_executor( gpus_per_worker (int): number of GPUs per worker - defaults to 0 oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI and SLURM only) - default False cwd (str/None): current working directory where the parallel python task is executed + executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the context of an HPC cluster this essential to be able to communicate to an Executor running on a different compute node within the same allocation. And in principle @@ -94,56 +95,47 @@ def create_executor( backend=backend, flux_installed=flux_installed, slurm_installed=slurm_installed ) check_pmi(backend=backend, pmi=pmi) + executor_kwargs = { + "cores": cores_per_worker, + "hostname_localhost": hostname_localhost, + "cwd": cwd, + } if backend == "flux": check_oversubscribe(oversubscribe=oversubscribe) check_command_line_argument_lst( command_line_argument_lst=command_line_argument_lst ) + executor_kwargs["threads_per_core"] = threads_per_core + executor_kwargs["gpus_per_core"] = int(gpus_per_worker / cores_per_worker) + executor_kwargs["executor"] = executor + executor_kwargs["pmi"] = pmi if block_allocation: + executor_kwargs["init_function"] = init_function return PyFluxExecutor( max_workers=int(max_cores / cores_per_worker), - cores_per_worker=cores_per_worker, - threads_per_core=threads_per_core, - gpus_per_worker=gpus_per_worker, - init_function=init_function, - cwd=cwd, - executor=executor, - hostname_localhost=hostname_localhost, - pmi=pmi, + executor_kwargs=executor_kwargs, ) else: return PyFluxStepExecutor( max_cores=max_cores, - cores_per_worker=cores_per_worker, - threads_per_core=threads_per_core, - gpus_per_worker=gpus_per_worker, - cwd=cwd, - executor=executor, - hostname_localhost=hostname_localhost, - pmi=pmi, + executor_kwargs=executor_kwargs, ) elif backend == "slurm": check_executor(executor=executor) + executor_kwargs["threads_per_core"] = threads_per_core + executor_kwargs["gpus_per_core"] = int(gpus_per_worker / cores_per_worker) + executor_kwargs["command_line_argument_lst"] = command_line_argument_lst + executor_kwargs["oversubscribe"] = oversubscribe if block_allocation: + executor_kwargs["init_function"] = init_function return PySlurmExecutor( max_workers=int(max_cores / cores_per_worker), - cores_per_worker=cores_per_worker, - threads_per_core=threads_per_core, - gpus_per_worker=gpus_per_worker, - oversubscribe=oversubscribe, - init_function=init_function, - cwd=cwd, - hostname_localhost=hostname_localhost, + executor_kwargs=executor_kwargs, ) else: return PySlurmStepExecutor( max_cores=max_cores, - cores_per_worker=cores_per_worker, - threads_per_core=threads_per_core, - gpus_per_worker=gpus_per_worker, - oversubscribe=oversubscribe, - cwd=cwd, - hostname_localhost=hostname_localhost, + executor_kwargs=executor_kwargs, ) else: # backend="local" check_threads_per_core(threads_per_core=threads_per_core) @@ -152,18 +144,15 @@ def create_executor( command_line_argument_lst=command_line_argument_lst ) check_executor(executor=executor) + executor_kwargs["oversubscribe"] = oversubscribe if block_allocation: + executor_kwargs["init_function"] = init_function return PyLocalExecutor( 
max_workers=int(max_cores / cores_per_worker), - cores_per_worker=cores_per_worker, - init_function=init_function, - cwd=cwd, - hostname_localhost=hostname_localhost, + executor_kwargs=executor_kwargs, ) else: return PyLocalStepExecutor( max_cores=max_cores, - cores_per_worker=cores_per_worker, - cwd=cwd, - hostname_localhost=hostname_localhost, + executor_kwargs=executor_kwargs, ) diff --git a/pympipool/scheduler/flux.py b/pympipool/scheduler/flux.py index d032b5cf..dc8403a7 100644 --- a/pympipool/scheduler/flux.py +++ b/pympipool/scheduler/flux.py @@ -22,20 +22,7 @@ class PyFluxExecutor(ExecutorBroker): Args: max_workers (int): defines the number workers which can execute functions in parallel - cores_per_worker (int): number of MPI cores to be used for each function call - threads_per_core (int): number of OpenMP threads to be used for each function call - gpus_per_worker (int): number of GPUs per worker - defaults to 0 - init_function (None): optional function to preset arguments for functions which are submitted later - cwd (str/None): current working directory where the parallel python task is executed - executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux - pmi (str): PMI interface to use (OpenMPI v5 requires pmix) default is None - hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the - context of an HPC cluster this essential to be able to communicate to an - Executor running on a different compute node within the same allocation. And - in principle any computer should be able to resolve that their own hostname - points to the same address as localhost. Still MacOS >= 12 seems to disable - this look up for security reasons. So on MacOS it is required to set this - option to true + executor_kwargs (dict): keyword arguments for the executor Examples: @@ -51,7 +38,7 @@ class PyFluxExecutor(ExecutorBroker): >>> def init_k(): >>> return {"k": 3} >>> - >>> with PyFluxExecutor(max_workers=2, init_function=init_k) as p: + >>> with PyFluxExecutor(max_workers=2, executor_kwargs={"init_function": init_k}) as p: >>> fs = p.submit(calc, 2, j=4) >>> print(fs.result()) [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)] @@ -61,34 +48,16 @@ class PyFluxExecutor(ExecutorBroker): def __init__( self, max_workers: int = 1, - cores_per_worker: int = 1, - threads_per_core: int = 1, - gpus_per_worker: int = 0, - init_function: Optional[callable] = None, - cwd: Optional[str] = None, - executor: Optional[flux.job.FluxExecutor] = None, - pmi: Optional[str] = None, - hostname_localhost: Optional[bool] = False, + executor_kwargs: dict = {}, ): super().__init__() + executor_kwargs["future_queue"] = self._future_queue + executor_kwargs["interface_class"] = FluxPythonInterface self._set_process( process=[ RaisingThread( target=execute_parallel_tasks, - kwargs={ - # Executor Arguments - "future_queue": self._future_queue, - "cores": cores_per_worker, - "interface_class": FluxPythonInterface, - "hostname_localhost": hostname_localhost, - "init_function": init_function, - # Interface Arguments - "threads_per_core": threads_per_core, - "gpus_per_core": int(gpus_per_worker / cores_per_worker), - "cwd": cwd, - "executor": executor, - "pmi": pmi, - }, + kwargs=executor_kwargs, ) for _ in range(max_workers) ], @@ -104,19 +73,7 @@ class PyFluxStepExecutor(ExecutorSteps): Args: max_cores (int): defines the number workers which can execute functions in parallel - cores_per_worker (int): number of MPI cores to be used for each function 
call - threads_per_core (int): number of OpenMP threads to be used for each function call - gpus_per_worker (int): number of GPUs per worker - defaults to 0 - cwd (str/None): current working directory where the parallel python task is executed - executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux - pmi (str): PMI interface to use (OpenMPI v5 requires pmix) default is None - hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the - context of an HPC cluster this essential to be able to communicate to an - Executor running on a different compute node within the same allocation. And - in principle any computer should be able to resolve that their own hostname - points to the same address as localhost. Still MacOS >= 12 seems to disable - this look up for security reasons. So on MacOS it is required to set this - option to true + executor_kwargs (dict): keyword arguments for the executor Examples: @@ -140,32 +97,16 @@ class PyFluxStepExecutor(ExecutorSteps): def __init__( self, max_cores: int = 1, - cores_per_worker: int = 1, - threads_per_core: int = 1, - gpus_per_worker: int = 0, - cwd: Optional[str] = None, - executor: Optional[flux.job.FluxExecutor] = None, - pmi: Optional[str] = None, - hostname_localhost: Optional[bool] = False, + executor_kwargs: dict = {}, ): super().__init__() + executor_kwargs["future_queue"] = self._future_queue + executor_kwargs["interface_class"] = FluxPythonInterface + executor_kwargs["max_cores"] = max_cores self._set_process( RaisingThread( target=execute_separate_tasks, - kwargs={ - # Executor Arguments - "future_queue": self._future_queue, - "cores": cores_per_worker, - "interface_class": FluxPythonInterface, - "max_cores": max_cores, - "hostname_localhost": hostname_localhost, - # Interface Arguments - "threads_per_core": threads_per_core, - "gpus_per_core": int(gpus_per_worker / cores_per_worker), - "cwd": cwd, - "executor": executor, - "pmi": pmi, - }, + kwargs=executor_kwargs, ) ) diff --git a/pympipool/scheduler/local.py b/pympipool/scheduler/local.py index 1e38a2aa..2de08953 100644 --- a/pympipool/scheduler/local.py +++ b/pympipool/scheduler/local.py @@ -1,5 +1,3 @@ -from typing import Optional - from pympipool.shared.executorbase import ( execute_parallel_tasks, execute_separate_tasks, @@ -12,7 +10,7 @@ class PyLocalExecutor(ExecutorBroker): """ - The pympipool.mpi.PyMPIExecutor leverages the message passing interface MPI to distribute python tasks on a + The pympipool.mpi.PyLocalExecutor leverages the message passing interface MPI to distribute python tasks on a workstation. In contrast to the mpi4py.futures.MPIPoolExecutor the pympipool.mpi.PyLocalExecutor can be executed in a serial python process and does not require the python script to be executed with MPI. 
Consequently, it is primarily an abstraction of its functionality to improve the usability in particular when used in combination with @@ -20,17 +18,7 @@ class PyLocalExecutor(ExecutorBroker): Args: max_workers (int): defines the number workers which can execute functions in parallel - cores_per_worker (int): number of MPI cores to be used for each function call - oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - default False - init_function (None): optional function to preset arguments for functions which are submitted later - cwd (str/None): current working directory where the parallel python task is executed - hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the - context of an HPC cluster this essential to be able to communicate to an - Executor running on a different compute node within the same allocation. And - in principle any computer should be able to resolve that their own hostname - points to the same address as localhost. Still MacOS >= 12 seems to disable - this look up for security reasons. So on MacOS it is required to set this - option to true + executor_kwargs (dict): keyword arguments for the executor Examples: @@ -46,38 +34,22 @@ class PyLocalExecutor(ExecutorBroker): >>> def init_k(): >>> return {"k": 3} >>> - >>> with PyLocalExecutor(max_workers=2, init_function=init_k) as p: + >>> with PyLocalExecutor(max_workers=2, executor_kwargs={"init_function": init_k}) as p: >>> fs = p.submit(calc, 2, j=4) >>> print(fs.result()) [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)] """ - def __init__( - self, - max_workers: int = 1, - cores_per_worker: int = 1, - oversubscribe: bool = False, - init_function: Optional[callable] = None, - cwd: Optional[str] = None, - hostname_localhost: bool = False, - ): + def __init__(self, max_workers: int = 1, executor_kwargs: dict = {}): super().__init__() + executor_kwargs["future_queue"] = self._future_queue + executor_kwargs["interface_class"] = MpiExecInterface self._set_process( process=[ RaisingThread( target=execute_parallel_tasks, - kwargs={ - # Executor Arguments - "future_queue": self._future_queue, - "cores": cores_per_worker, - "interface_class": MpiExecInterface, - "hostname_localhost": hostname_localhost, - "init_function": init_function, - # Interface Arguments - "cwd": cwd, - "oversubscribe": oversubscribe, - }, + kwargs=executor_kwargs, ) for _ in range(max_workers) ], @@ -94,16 +66,7 @@ class PyLocalStepExecutor(ExecutorSteps): Args: max_cores (int): defines the number cores which can be used in parallel - cores_per_worker (int): number of MPI cores to be used for each function call - oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - default False - cwd (str/None): current working directory where the parallel python task is executed - hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the - context of an HPC cluster this essential to be able to communicate to an - Executor running on a different compute node within the same allocation. And - in principle any computer should be able to resolve that their own hostname - points to the same address as localhost. Still MacOS >= 12 seems to disable - this look up for security reasons. 
So on MacOS it is required to set this - option to true + executor_kwargs (dict): keyword arguments for the executor Examples: @@ -116,7 +79,7 @@ class PyLocalStepExecutor(ExecutorSteps): >>> rank = MPI.COMM_WORLD.Get_rank() >>> return np.array([i, j, k]), size, rank >>> - >>> with PyMPIStepExecutor(max_cores=2) as p: + >>> with PyLocalStepExecutor(max_cores=2) as p: >>> fs = p.submit(calc, 2, j=4, k=3, resource_dict={"cores": 2}) >>> print(fs.result()) @@ -127,25 +90,15 @@ class PyLocalStepExecutor(ExecutorSteps): def __init__( self, max_cores: int = 1, - cores_per_worker: int = 1, - oversubscribe: bool = False, - cwd: Optional[str] = None, - hostname_localhost: bool = False, + executor_kwargs: dict = {}, ): super().__init__() + executor_kwargs["future_queue"] = self._future_queue + executor_kwargs["interface_class"] = MpiExecInterface + executor_kwargs["max_cores"] = max_cores self._set_process( RaisingThread( target=execute_separate_tasks, - kwargs={ - # Executor Arguments - "future_queue": self._future_queue, - "cores": cores_per_worker, - "interface_class": MpiExecInterface, - "max_cores": max_cores, - "hostname_localhost": hostname_localhost, - # Interface Arguments - "cwd": cwd, - "oversubscribe": oversubscribe, - }, + kwargs=executor_kwargs, ) ) diff --git a/pympipool/scheduler/slurm.py b/pympipool/scheduler/slurm.py index 1f3dc79c..a70110f4 100644 --- a/pympipool/scheduler/slurm.py +++ b/pympipool/scheduler/slurm.py @@ -1,4 +1,3 @@ -from typing import Optional from pympipool.shared.executorbase import ( execute_parallel_tasks, execute_separate_tasks, @@ -18,20 +17,7 @@ class PySlurmExecutor(ExecutorBroker): Args: max_workers (int): defines the number workers which can execute functions in parallel - cores_per_worker (int): number of MPI cores to be used for each function call - threads_per_core (int): number of OpenMP threads to be used for each function call - gpus_per_worker (int): number of GPUs per worker - defaults to 0 - oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - default False - init_function (None): optional function to preset arguments for functions which are submitted later - cwd (str/None): current working directory where the parallel python task is executed - hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the - context of an HPC cluster this essential to be able to communicate to an - Executor running on a different compute node within the same allocation. And - in principle any computer should be able to resolve that their own hostname - points to the same address as localhost. Still MacOS >= 12 seems to disable - this look up for security reasons. 
So on MacOS it is required to set this - option to true - command_line_argument_lst (list): Additional command line arguments for the srun call + executor_kwargs (dict): keyword arguments for the executor Examples: @@ -47,7 +33,7 @@ class PySlurmExecutor(ExecutorBroker): >>> def init_k(): >>> return {"k": 3} >>> - >>> with PySlurmExecutor(max_workers=2, init_function=init_k) as p: + >>> with PySlurmExecutor(max_workers=2, executor_kwargs={"init_function": init_k}) as p: >>> fs = p.submit(calc, 2, j=4) >>> print(fs.result()) @@ -57,34 +43,16 @@ class PySlurmExecutor(ExecutorBroker): def __init__( self, max_workers: int = 1, - cores_per_worker: int = 1, - threads_per_core: int = 1, - gpus_per_worker: int = 0, - oversubscribe: bool = False, - init_function: Optional[callable] = None, - cwd: Optional[str] = None, - hostname_localhost: bool = False, - command_line_argument_lst: list[str] = [], + executor_kwargs: dict = {}, ): super().__init__() + executor_kwargs["future_queue"] = self._future_queue + executor_kwargs["interface_class"] = SrunInterface self._set_process( process=[ RaisingThread( target=execute_parallel_tasks, - kwargs={ - # Executor Arguments - "future_queue": self._future_queue, - "cores": cores_per_worker, - "interface_class": SrunInterface, - "hostname_localhost": hostname_localhost, - "init_function": init_function, - # Interface Arguments - "threads_per_core": threads_per_core, - "gpus_per_core": int(gpus_per_worker / cores_per_worker), - "cwd": cwd, - "oversubscribe": oversubscribe, - "command_line_argument_lst": command_line_argument_lst, - }, + kwargs=executor_kwargs, ) for _ in range(max_workers) ], @@ -100,19 +68,7 @@ class PySlurmStepExecutor(ExecutorSteps): Args: max_cores (int): defines the number cores which can be used in parallel - cores_per_worker (int): number of MPI cores to be used for each function call - threads_per_core (int): number of OpenMP threads to be used for each function call - gpus_per_worker (int): number of GPUs per worker - defaults to 0 - oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - default False - cwd (str/None): current working directory where the parallel python task is executed - hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the - context of an HPC cluster this essential to be able to communicate to an - Executor running on a different compute node within the same allocation. And - in principle any computer should be able to resolve that their own hostname - points to the same address as localhost. Still MacOS >= 12 seems to disable - this look up for security reasons. 
So on MacOS it is required to set this - option to true - command_line_argument_lst (list): Additional command line arguments for the srun call + executor_kwargs (dict): keyword arguments for the executor Examples: @@ -135,31 +91,15 @@ class PySlurmStepExecutor(ExecutorSteps): def __init__( self, max_cores: int = 1, - cores_per_worker: int = 1, - threads_per_core: int = 1, - gpus_per_worker: int = 0, - oversubscribe: bool = False, - cwd: Optional[str] = None, - hostname_localhost: bool = False, - command_line_argument_lst: list[str] = [], + executor_kwargs: dict = {}, ): super().__init__() + executor_kwargs["future_queue"] = self._future_queue + executor_kwargs["interface_class"] = SrunInterface + executor_kwargs["max_cores"] = max_cores self._set_process( RaisingThread( target=execute_separate_tasks, - kwargs={ - # Executor Arguments - "future_queue": self._future_queue, - "cores": cores_per_worker, - "interface_class": SrunInterface, - "max_cores": max_cores, - "hostname_localhost": hostname_localhost, - # Interface Arguments - "threads_per_core": threads_per_core, - "gpus_per_core": int(gpus_per_worker / cores_per_worker), - "cwd": cwd, - "oversubscribe": oversubscribe, - "command_line_argument_lst": command_line_argument_lst, - }, + kwargs=executor_kwargs, ) ) diff --git a/pympipool/shared/executorbase.py b/pympipool/shared/executorbase.py index 20e43157..ea1f50fd 100644 --- a/pympipool/shared/executorbase.py +++ b/pympipool/shared/executorbase.py @@ -14,7 +14,7 @@ from pympipool.shared.communication import interface_bootup from pympipool.shared.thread import RaisingThread -from pympipool.shared.interface import BaseInterface +from pympipool.shared.interface import BaseInterface, MpiExecInterface from pympipool.shared.inputcheck import ( check_resource_dict, check_resource_dict_is_empty, @@ -255,8 +255,8 @@ def cloudpickle_register(ind: int = 2): def execute_parallel_tasks( future_queue: queue.Queue, - cores: int, - interface_class: BaseInterface, + cores: int = 1, + interface_class: BaseInterface = MpiExecInterface, hostname_localhost: bool = False, init_function: Optional[callable] = None, **kwargs, @@ -309,8 +309,8 @@ def execute_parallel_tasks( def execute_separate_tasks( future_queue: queue.Queue, - interface_class: BaseInterface, - max_cores: int, + interface_class: BaseInterface = MpiExecInterface, + max_cores: int = 1, hostname_localhost: bool = False, **kwargs, ): @@ -331,6 +331,8 @@ def execute_separate_tasks( """ active_task_dict = {} process_lst, qtask_lst = [], [] + if "cores" not in kwargs.keys(): + kwargs["cores"] = 1 while True: task_dict = future_queue.get() if "shutdown" in task_dict.keys() and task_dict["shutdown"]: @@ -548,7 +550,7 @@ def _submit_function_to_separate_process( qtask: queue.Queue, interface_class: BaseInterface, executor_kwargs: dict, - max_cores: int, + max_cores: int = 1, hostname_localhost: bool = False, ): """ diff --git a/tests/test_flux_executor.py b/tests/test_flux_executor.py index 14f6413d..eecc3d3c 100644 --- a/tests/test_flux_executor.py +++ b/tests/test_flux_executor.py @@ -49,7 +49,10 @@ def setUp(self): self.executor = flux.job.FluxExecutor() def test_flux_executor_serial(self): - with PyFluxExecutor(max_workers=2, executor=self.executor) as exe: + with PyFluxExecutor( + max_workers=2, + executor_kwargs={"executor": self.executor}, + ) as exe: fs_1 = exe.submit(calc, 1) fs_2 = exe.submit(calc, 2) self.assertEqual(fs_1.result(), 1) @@ -59,7 +62,8 @@ def test_flux_executor_serial(self): def test_flux_executor_threads(self): with 
PyFluxExecutor( - max_workers=1, threads_per_core=2, executor=self.executor + max_workers=1, + executor_kwargs={"executor": self.executor, "threads_per_core": 2}, ) as exe: fs_1 = exe.submit(calc, 1) fs_2 = exe.submit(calc, 2) @@ -70,7 +74,8 @@ def test_flux_executor_threads(self): def test_flux_executor_parallel(self): with PyFluxExecutor( - max_workers=1, cores_per_worker=2, executor=self.executor, pmi=pmi + max_workers=1, + executor_kwargs={"executor": self.executor, "cores": 2, "pmi": pmi}, ) as exe: fs_1 = exe.submit(mpi_funct, 1) self.assertEqual(fs_1.result(), [(1, 2, 0), (1, 2, 1)]) @@ -78,7 +83,8 @@ def test_flux_executor_parallel(self): def test_single_task(self): with PyFluxExecutor( - max_workers=1, cores_per_worker=2, executor=self.executor, pmi=pmi + max_workers=1, + executor_kwargs={"executor": self.executor, "cores": 2, "pmi": pmi}, ) as p: output = p.map(mpi_funct, [1, 2, 3]) self.assertEqual( @@ -120,9 +126,11 @@ def test_execute_task_threads(self): def test_internal_memory(self): with PyFluxExecutor( max_workers=1, - cores_per_worker=1, - init_function=set_global, - executor=self.executor, + executor_kwargs={ + "executor": self.executor, + "cores": 1, + "init_function": set_global, + }, ) as p: f = p.submit(get_global) self.assertFalse(f.done()) diff --git a/tests/test_local_executor.py b/tests/test_local_executor.py index 4d5c72e0..25c2c5b4 100644 --- a/tests/test_local_executor.py +++ b/tests/test_local_executor.py @@ -61,7 +61,10 @@ def sleep_one(i): class TestPyMpiExecutorSerial(unittest.TestCase): def test_pympiexecutor_two_workers(self): - with PyLocalExecutor(max_workers=2, hostname_localhost=True) as exe: + with PyLocalExecutor( + max_workers=2, + executor_kwargs={"hostname_localhost": True}, + ) as exe: cloudpickle_register(ind=1) fs_1 = exe.submit(calc, 1) fs_2 = exe.submit(calc, 2) @@ -71,7 +74,9 @@ def test_pympiexecutor_two_workers(self): self.assertTrue(fs_2.done()) def test_pympiexecutor_one_worker(self): - with PyLocalExecutor(max_workers=1, hostname_localhost=True) as exe: + with PyLocalExecutor( + max_workers=1, executor_kwargs={"hostname_localhost": True} + ) as exe: cloudpickle_register(ind=1) fs_1 = exe.submit(calc, 1) fs_2 = exe.submit(calc, 2) @@ -80,26 +85,12 @@ def test_pympiexecutor_one_worker(self): self.assertTrue(fs_1.done()) self.assertTrue(fs_2.done()) - def test_pympiexecutor_errors(self): - with self.assertRaises(TypeError): - PyLocalExecutor( - max_workers=1, - cores_per_worker=1, - threads_per_core=2, - hostname_localhost=True, - ) - with self.assertRaises(TypeError): - PyLocalExecutor( - max_workers=1, - cores_per_worker=1, - gpus_per_worker=1, - hostname_localhost=True, - ) - class TestPyMpiExecutorStepSerial(unittest.TestCase): def test_pympiexecutor_two_workers(self): - with PyLocalStepExecutor(max_cores=2, hostname_localhost=True) as exe: + with PyLocalStepExecutor( + max_cores=2, executor_kwargs={"hostname_localhost": True} + ) as exe: cloudpickle_register(ind=1) fs_1 = exe.submit(calc, 1) fs_2 = exe.submit(calc, 2) @@ -109,7 +100,9 @@ def test_pympiexecutor_two_workers(self): self.assertTrue(fs_2.done()) def test_pympiexecutor_one_worker(self): - with PyLocalStepExecutor(max_cores=1, hostname_localhost=True) as exe: + with PyLocalStepExecutor( + max_cores=1, executor_kwargs={"hostname_localhost": True} + ) as exe: cloudpickle_register(ind=1) fs_1 = exe.submit(calc, 1) fs_2 = exe.submit(calc, 2) @@ -118,22 +111,6 @@ def test_pympiexecutor_one_worker(self): self.assertTrue(fs_1.done()) self.assertTrue(fs_2.done()) - def 
test_pympiexecutor_errors(self): - with self.assertRaises(TypeError): - PyLocalStepExecutor( - max_cores=1, - cores_per_worker=1, - threads_per_core=2, - hostname_localhost=True, - ) - with self.assertRaises(TypeError): - PyLocalStepExecutor( - max_cores=1, - cores_per_worker=1, - gpus_per_worker=1, - hostname_localhost=True, - ) - @unittest.skipIf( skip_mpi4py_test, "mpi4py is not installed, so the mpi4py tests are skipped." @@ -141,7 +118,8 @@ def test_pympiexecutor_errors(self): class TestPyMpiExecutorMPI(unittest.TestCase): def test_pympiexecutor_one_worker_with_mpi(self): with PyLocalExecutor( - max_workers=1, cores_per_worker=2, hostname_localhost=True + max_workers=1, + executor_kwargs={"cores": 2, "hostname_localhost": True}, ) as exe: cloudpickle_register(ind=1) fs_1 = exe.submit(mpi_funct, 1) @@ -150,7 +128,8 @@ def test_pympiexecutor_one_worker_with_mpi(self): def test_pympiexecutor_one_worker_with_mpi_multiple_submissions(self): with PyLocalExecutor( - max_workers=1, cores_per_worker=2, hostname_localhost=True + max_workers=1, + executor_kwargs={"cores": 2, "hostname_localhost": True}, ) as p: cloudpickle_register(ind=1) fs1 = p.submit(mpi_funct, 1) @@ -168,7 +147,8 @@ def test_pympiexecutor_one_worker_with_mpi_multiple_submissions(self): def test_pympiexecutor_one_worker_with_mpi_echo(self): with PyLocalExecutor( - max_workers=1, cores_per_worker=2, hostname_localhost=True + max_workers=1, + executor_kwargs={"cores": 2, "hostname_localhost": True}, ) as p: cloudpickle_register(ind=1) output = p.submit(echo_funct, 2).result() @@ -181,7 +161,8 @@ def test_pympiexecutor_one_worker_with_mpi_echo(self): class TestPyMpiStepExecutorMPI(unittest.TestCase): def test_pympiexecutor_one_worker_with_mpi(self): with PyLocalStepExecutor( - max_cores=2, cores_per_worker=2, hostname_localhost=True + max_cores=2, + executor_kwargs={"cores": 2, "hostname_localhost": True}, ) as exe: cloudpickle_register(ind=1) fs_1 = exe.submit(mpi_funct, 1) @@ -190,7 +171,8 @@ def test_pympiexecutor_one_worker_with_mpi(self): def test_pympiexecutor_one_worker_with_mpi_multiple_submissions(self): with PyLocalStepExecutor( - max_cores=2, cores_per_worker=2, hostname_localhost=True + max_cores=2, + executor_kwargs={"cores": 2, "hostname_localhost": True}, ) as p: cloudpickle_register(ind=1) fs1 = p.submit(mpi_funct, 1) @@ -208,7 +190,8 @@ def test_pympiexecutor_one_worker_with_mpi_multiple_submissions(self): def test_pympiexecutor_one_worker_with_mpi_echo(self): with PyLocalStepExecutor( - max_cores=2, cores_per_worker=2, hostname_localhost=True + max_cores=2, + executor_kwargs={"cores": 2, "hostname_localhost": True}, ) as p: cloudpickle_register(ind=1) output = p.submit(echo_funct, 2).result() @@ -219,9 +202,11 @@ class TestPyMpiExecutorInitFunction(unittest.TestCase): def test_internal_memory(self): with PyLocalExecutor( max_workers=1, - cores_per_worker=1, - init_function=set_global, - hostname_localhost=True, + executor_kwargs={ + "cores": 1, + "init_function": set_global, + "hostname_localhost": True, + }, ) as p: f = p.submit(get_global) self.assertFalse(f.done()) @@ -258,7 +243,8 @@ def test_execute_task(self): class TestFuturePool(unittest.TestCase): def test_pool_serial(self): with PyLocalExecutor( - max_workers=1, cores_per_worker=1, hostname_localhost=True + max_workers=1, + executor_kwargs={"cores": 1, "hostname_localhost": True}, ) as p: output = p.submit(calc_array, i=2) self.assertEqual(len(p), 1) @@ -271,7 +257,8 @@ def test_pool_serial(self): def test_executor_multi_submission(self): with 
PyLocalExecutor( - max_workers=1, cores_per_worker=1, hostname_localhost=True + max_workers=1, + executor_kwargs={"cores": 1, "hostname_localhost": True}, ) as p: fs_1 = p.submit(calc_array, i=2) fs_2 = p.submit(calc_array, i=2) @@ -281,7 +268,10 @@ def test_executor_multi_submission(self): self.assertTrue(fs_2.done()) def test_shutdown(self): - p = PyLocalExecutor(max_workers=1, cores_per_worker=1, hostname_localhost=True) + p = PyLocalExecutor( + max_workers=1, + executor_kwargs={"cores": 1, "hostname_localhost": True}, + ) fs1 = p.submit(sleep_one, i=2) fs2 = p.submit(sleep_one, i=4) sleep(1) @@ -294,7 +284,8 @@ def test_shutdown(self): def test_pool_serial_map(self): with PyLocalExecutor( - max_workers=1, cores_per_worker=1, hostname_localhost=True + max_workers=1, + executor_kwargs={"cores": 1, "hostname_localhost": True}, ) as p: output = p.map(calc_array, [1, 2, 3]) self.assertEqual(list(output), [np.array(1), np.array(4), np.array(9)]) @@ -302,14 +293,16 @@ def test_pool_serial_map(self): def test_executor_exception(self): with self.assertRaises(RuntimeError): with PyLocalExecutor( - max_workers=1, cores_per_worker=1, hostname_localhost=True + max_workers=1, + executor_kwargs={"cores": 1, "hostname_localhost": True}, ) as p: p.submit(raise_error) def test_executor_exception_future(self): with self.assertRaises(RuntimeError): with PyLocalExecutor( - max_workers=1, cores_per_worker=1, hostname_localhost=True + max_workers=1, + executor_kwargs={"cores": 1, "hostname_localhost": True}, ) as p: fs = p.submit(raise_error) fs.result() @@ -328,7 +321,14 @@ def test_meta(self): "max_workers": 1, } with PyLocalExecutor( - max_workers=1, cores_per_worker=2, hostname_localhost=True + max_workers=1, + executor_kwargs={ + "cores": 2, + "hostname_localhost": True, + "init_function": None, + "cwd": None, + "oversubscribe": False, + }, ) as exe: for k, v in meta_data_exe_dict.items(): if k != "interface_class": @@ -348,7 +348,13 @@ def test_meta_step(self): "max_cores": 2, } with PyLocalStepExecutor( - max_cores=2, cores_per_worker=2, hostname_localhost=True + max_cores=2, + executor_kwargs={ + "cores": 2, + "hostname_localhost": True, + "cwd": None, + "oversubscribe": False, + }, ) as exe: for k, v in meta_data_exe_dict.items(): if k != "interface_class": @@ -361,7 +367,8 @@ def test_meta_step(self): ) def test_pool_multi_core(self): with PyLocalExecutor( - max_workers=1, cores_per_worker=2, hostname_localhost=True + max_workers=1, + executor_kwargs={"cores": 2, "hostname_localhost": True}, ) as p: output = p.submit(mpi_funct, i=2) self.assertEqual(len(p), 1) @@ -377,7 +384,8 @@ def test_pool_multi_core(self): ) def test_pool_multi_core_map(self): with PyLocalExecutor( - max_workers=1, cores_per_worker=2, hostname_localhost=True + max_workers=1, + executor_kwargs={"cores": 2, "hostname_localhost": True}, ) as p: output = p.map(mpi_funct, [1, 2, 3]) self.assertEqual( diff --git a/tests/test_local_executor_future.py b/tests/test_local_executor_future.py index 64a4ec5a..a7adf7f1 100644 --- a/tests/test_local_executor_future.py +++ b/tests/test_local_executor_future.py @@ -18,7 +18,8 @@ def calc(i): class TestFuture(unittest.TestCase): def test_pool_serial(self): with PyLocalExecutor( - max_workers=1, cores_per_worker=1, hostname_localhost=True + max_workers=1, + executor_kwargs={"hostname_localhost": True, "cores": 1}, ) as p: output = p.submit(calc, i=2) self.assertTrue(isinstance(output, Future)) @@ -32,7 +33,8 @@ def test_pool_serial(self): ) def test_pool_serial_multi_core(self): with 
PyLocalExecutor( - max_workers=1, cores_per_worker=2, hostname_localhost=True + max_workers=1, + executor_kwargs={"hostname_localhost": True, "cores": 2}, ) as p: output = p.submit(calc, i=2) self.assertTrue(isinstance(output, Future)) @@ -62,7 +64,9 @@ def callback(future): def submit(): # Executor only exists in this scope and can get garbage collected after # this function is exits - future = PyLocalExecutor(hostname_localhost=True).submit(slow_callable) + future = PyLocalExecutor( + executor_kwargs={"hostname_localhost": True} + ).submit(slow_callable) future.add_done_callback(callback) return future @@ -99,9 +103,9 @@ def __init__(self): def run(self): self.running = True - future = PyLocalExecutor(hostname_localhost=True).submit( - self.return_42 - ) + future = PyLocalExecutor( + executor_kwargs={"hostname_localhost": True} + ).submit(self.return_42) future.add_done_callback(self.finished) return future
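
Taken together, the patch replaces the long per-class keyword lists (`cores_per_worker`, `threads_per_core`, `gpus_per_worker`, ...) with a single `executor_kwargs` dictionary that `create_executor` assembles once per backend and forwards unchanged to the worker threads. A minimal usage sketch of the new calling convention, modeled on the updated docstrings and tests above; the import path and a working local MPI setup are assumptions not shown in this diff:

```python
from pympipool.scheduler.local import PyLocalExecutor  # import path assumed from the file layout above


def calc(i, j, k):
    return i + j + k


# Resource-related options now travel in one executor_kwargs dict;
# only max_workers remains an explicit constructor argument.
with PyLocalExecutor(
    max_workers=2,
    executor_kwargs={
        "cores": 1,                  # replaces the old cores_per_worker keyword
        "hostname_localhost": True,  # same meaning as before, new location
    },
) as exe:
    fs = exe.submit(calc, 1, j=2, k=3)
    print(fs.result())  # 6
```

Note that the dictionary keys are the argument names of `execute_parallel_tasks` and of the selected interface class, so `cores` (not `cores_per_worker`) and `gpus_per_core` (not `gpus_per_worker`) are expected when the dictionary is built by hand rather than by `create_executor`.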
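
The step executors keep the same pattern, with per-call resources supplied through `resource_dict` at submit time, as in the updated `PyLocalStepExecutor` docstring. A sketch under the same assumptions (mpi4py installed, two local cores available):

```python
from pympipool.scheduler.local import PyLocalStepExecutor  # import path assumed


def calc(i, j, k):
    # Runs inside the MPI worker, so mpi4py is imported in the function body.
    from mpi4py import MPI
    import numpy as np

    size = MPI.COMM_WORLD.Get_size()
    rank = MPI.COMM_WORLD.Get_rank()
    return np.array([i, j, k]), size, rank


# max_cores caps the total core budget; each submission requests its own
# cores via resource_dict instead of a per-executor cores_per_worker.
with PyLocalStepExecutor(
    max_cores=2,
    executor_kwargs={"hostname_localhost": True},
) as p:
    fs = p.submit(calc, 2, j=4, k=3, resource_dict={"cores": 2})
    print(fs.result())  # e.g. [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)]
```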