diff --git a/pympipool/scheduler/__init__.py b/pympipool/scheduler/__init__.py
index 1c8215c5..d8dbffe7 100644
--- a/pympipool/scheduler/__init__.py
+++ b/pympipool/scheduler/__init__.py
@@ -1,15 +1,15 @@
 import os
 import shutil
 from typing import Optional
-from pympipool.scheduler.local import (
-    PyLocalExecutor,
-    PyLocalStepExecutor,
+from pympipool.scheduler.universal import (
+    UniversalExecutor,
+    UniversalStepExecutor,
 )
-from pympipool.scheduler.slurm import (
-    PySlurmExecutor,
-    PySlurmStepExecutor,
+from pympipool.scheduler.interface import (
+    MpiExecInterface,
+    SLURM_COMMAND,
+    SrunInterface,
 )
-from pympipool.shared.interface import SLURM_COMMAND
 from pympipool.shared.inputcheck import (
     check_command_line_argument_lst,
     check_gpus_per_worker,
@@ -23,10 +23,7 @@
 )

 try:  # The PyFluxExecutor requires flux-core to be installed.
-    from pympipool.scheduler.flux import (
-        PyFluxExecutor,
-        PyFluxStepExecutor,
-    )
+    from pympipool.scheduler.flux import FluxPythonInterface

     flux_installed = "FLUX_URI" in os.environ
 except ImportError:
@@ -111,14 +108,16 @@ def create_executor(
         executor_kwargs["pmi"] = pmi
         if block_allocation:
             executor_kwargs["init_function"] = init_function
-            return PyFluxExecutor(
+            return UniversalExecutor(
                 max_workers=int(max_cores / cores_per_worker),
                 executor_kwargs=executor_kwargs,
+                interface_class=FluxPythonInterface,
             )
         else:
-            return PyFluxStepExecutor(
+            return UniversalStepExecutor(
                 max_cores=max_cores,
                 executor_kwargs=executor_kwargs,
+                interface_class=FluxPythonInterface,
             )
     elif backend == "slurm":
         check_executor(executor=executor)
@@ -128,14 +127,16 @@ def create_executor(
         executor_kwargs["oversubscribe"] = oversubscribe
         if block_allocation:
             executor_kwargs["init_function"] = init_function
-            return PySlurmExecutor(
+            return UniversalExecutor(
                 max_workers=int(max_cores / cores_per_worker),
                 executor_kwargs=executor_kwargs,
+                interface_class=SrunInterface,
             )
         else:
-            return PySlurmStepExecutor(
+            return UniversalStepExecutor(
                 max_cores=max_cores,
                 executor_kwargs=executor_kwargs,
+                interface_class=SrunInterface,
             )
     else:  # backend="local"
         check_threads_per_core(threads_per_core=threads_per_core)
@@ -147,12 +148,14 @@ def create_executor(
         executor_kwargs["oversubscribe"] = oversubscribe
         if block_allocation:
             executor_kwargs["init_function"] = init_function
-            return PyLocalExecutor(
+            return UniversalExecutor(
                 max_workers=int(max_cores / cores_per_worker),
                 executor_kwargs=executor_kwargs,
+                interface_class=MpiExecInterface,
             )
         else:
-            return PyLocalStepExecutor(
+            return UniversalStepExecutor(
                 max_cores=max_cores,
                 executor_kwargs=executor_kwargs,
+                interface_class=MpiExecInterface,
             )
diff --git a/pympipool/scheduler/flux.py b/pympipool/scheduler/flux.py
index dc8403a7..c1e36754 100644
--- a/pympipool/scheduler/flux.py
+++ b/pympipool/scheduler/flux.py
@@ -3,112 +3,7 @@

 import flux.job

-from pympipool.shared.executorbase import (
-    execute_parallel_tasks,
-    execute_separate_tasks,
-    ExecutorBroker,
-    ExecutorSteps,
-)
-from pympipool.shared.interface import BaseInterface
-from pympipool.shared.thread import RaisingThread
-
-
-class PyFluxExecutor(ExecutorBroker):
-    """
-    The pympipool.flux.PyFluxExecutor leverages the flux framework to distribute python tasks within a queuing system
-    allocation. In analogy to the pympipool.slurm.PySlurmExecutur it provides the option to specify the number of
-    threads per worker as well as the number of GPUs per worker in addition to specifying the number of cores per
-    worker.
-
-    Args:
-        max_workers (int): defines the number workers which can execute functions in parallel
-        executor_kwargs (dict): keyword arguments for the executor
-
-    Examples:
-
-    >>> import numpy as np
-    >>> from pympipool.scheduler.flux import PyFluxExecutor
-    >>>
-    >>> def calc(i, j, k):
-    >>>     from mpi4py import MPI
-    >>>     size = MPI.COMM_WORLD.Get_size()
-    >>>     rank = MPI.COMM_WORLD.Get_rank()
-    >>>     return np.array([i, j, k]), size, rank
-    >>>
-    >>> def init_k():
-    >>>     return {"k": 3}
-    >>>
-    >>> with PyFluxExecutor(max_workers=2, executor_kwargs={"init_function": init_k}) as p:
-    >>>     fs = p.submit(calc, 2, j=4)
-    >>>     print(fs.result())
-    [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)]
-
-    """
-
-    def __init__(
-        self,
-        max_workers: int = 1,
-        executor_kwargs: dict = {},
-    ):
-        super().__init__()
-        executor_kwargs["future_queue"] = self._future_queue
-        executor_kwargs["interface_class"] = FluxPythonInterface
-        self._set_process(
-            process=[
-                RaisingThread(
-                    target=execute_parallel_tasks,
-                    kwargs=executor_kwargs,
-                )
-                for _ in range(max_workers)
-            ],
-        )
-
-
-class PyFluxStepExecutor(ExecutorSteps):
-    """
-    The pympipool.flux.PyFluxStepExecutor leverages the flux framework to distribute python tasks within a queuing
-    system allocation. In analogy to the pympipool.slurm.PySlurmExecutur it provides the option to specify the number
-    of threads per worker as well as the number of GPUs per worker in addition to specifying the number of cores per
-    worker.
-
-    Args:
-        max_cores (int): defines the number workers which can execute functions in parallel
-        executor_kwargs (dict): keyword arguments for the executor
-
-    Examples:
-
-    >>> import numpy as np
-    >>> from pympipool.scheduler.flux import PyFluxStepExecutor
-    >>>
-    >>> def calc(i, j, k):
-    >>>     from mpi4py import MPI
-    >>>     size = MPI.COMM_WORLD.Get_size()
-    >>>     rank = MPI.COMM_WORLD.Get_rank()
-    >>>     return np.array([i, j, k]), size, rank
-    >>>
-    >>> with PyFluxStepExecutor(max_cores=2) as p:
-    >>>     fs = p.submit(calc, 2, j=4, k=3, resource_dict={"cores": 2})
-    >>>     print(fs.result())
-
-    [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)]
-
-    """
-
-    def __init__(
-        self,
-        max_cores: int = 1,
-        executor_kwargs: dict = {},
-    ):
-        super().__init__()
-        executor_kwargs["future_queue"] = self._future_queue
-        executor_kwargs["interface_class"] = FluxPythonInterface
-        executor_kwargs["max_cores"] = max_cores
-        self._set_process(
-            RaisingThread(
-                target=execute_separate_tasks,
-                kwargs=executor_kwargs,
-            )
-        )
+from pympipool.scheduler.interface import BaseInterface


 class FluxPythonInterface(BaseInterface):
diff --git a/pympipool/shared/interface.py b/pympipool/scheduler/interface.py
similarity index 100%
rename from pympipool/shared/interface.py
rename to pympipool/scheduler/interface.py
diff --git a/pympipool/scheduler/local.py b/pympipool/scheduler/local.py
deleted file mode 100644
index 2de08953..00000000
--- a/pympipool/scheduler/local.py
+++ /dev/null
@@ -1,104 +0,0 @@
-from pympipool.shared.executorbase import (
-    execute_parallel_tasks,
-    execute_separate_tasks,
-    ExecutorBroker,
-    ExecutorSteps,
-)
-from pympipool.shared.interface import MpiExecInterface
-from pympipool.shared.thread import RaisingThread
-
-
-class PyLocalExecutor(ExecutorBroker):
-    """
-    The pympipool.mpi.PyLocalExecutor leverages the message passing interface MPI to distribute python tasks on a
-    workstation. In contrast to the mpi4py.futures.MPIPoolExecutor the pympipool.mpi.PyLocalExecutor can be executed
-    in a serial python process and does not require the python script to be executed with MPI. Consequently, it is
-    primarily an abstraction of its functionality to improve the usability in particular when used in combination with
-    Jupyter notebooks.
-
-    Args:
-        max_workers (int): defines the number workers which can execute functions in parallel
-        executor_kwargs (dict): keyword arguments for the executor
-
-    Examples:
-
-    >>> import numpy as np
-    >>> from pympipool.scheduler.local import PyLocalExecutor
-    >>>
-    >>> def calc(i, j, k):
-    >>>     from mpi4py import MPI
-    >>>     size = MPI.COMM_WORLD.Get_size()
-    >>>     rank = MPI.COMM_WORLD.Get_rank()
-    >>>     return np.array([i, j, k]), size, rank
-    >>>
-    >>> def init_k():
-    >>>     return {"k": 3}
-    >>>
-    >>> with PyLocalExecutor(max_workers=2, executor_kwargs={"init_function": init_k}) as p:
-    >>>     fs = p.submit(calc, 2, j=4)
-    >>>     print(fs.result())
-    [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)]
-
-    """
-
-    def __init__(self, max_workers: int = 1, executor_kwargs: dict = {}):
-        super().__init__()
-        executor_kwargs["future_queue"] = self._future_queue
-        executor_kwargs["interface_class"] = MpiExecInterface
-        self._set_process(
-            process=[
-                RaisingThread(
-                    target=execute_parallel_tasks,
-                    kwargs=executor_kwargs,
-                )
-                for _ in range(max_workers)
-            ],
-        )
-
-
-class PyLocalStepExecutor(ExecutorSteps):
-    """
-    The pympipool.mpi.PyLocalStepExecutor leverages the message passing interface MPI to distribute python tasks on a
-    workstation. In contrast to the mpi4py.futures.MPIPoolExecutor the pympipool.mpi.PyLocalStepExecutor can be executed
-    in a serial python process and does not require the python script to be executed with MPI. Consequently, it is
-    primarily an abstraction of its functionality to improve the usability in particular when used in combination with
-    Jupyter notebooks.
-
-    Args:
-        max_cores (int): defines the number cores which can be used in parallel
-        executor_kwargs (dict): keyword arguments for the executor
-
-    Examples:
-
-    >>> import numpy as np
-    >>> from pympipool.scheduler.local import PyLocalStepExecutor
-    >>>
-    >>> def calc(i, j, k):
-    >>>     from mpi4py import MPI
-    >>>     size = MPI.COMM_WORLD.Get_size()
-    >>>     rank = MPI.COMM_WORLD.Get_rank()
-    >>>     return np.array([i, j, k]), size, rank
-    >>>
-    >>> with PyLocalStepExecutor(max_cores=2) as p:
-    >>>     fs = p.submit(calc, 2, j=4, k=3, resource_dict={"cores": 2})
-    >>>     print(fs.result())
-
-    [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)]
-
-    """
-
-    def __init__(
-        self,
-        max_cores: int = 1,
-        executor_kwargs: dict = {},
-    ):
-        super().__init__()
-        executor_kwargs["future_queue"] = self._future_queue
-        executor_kwargs["interface_class"] = MpiExecInterface
-        executor_kwargs["max_cores"] = max_cores
-        self._set_process(
-            RaisingThread(
-                target=execute_separate_tasks,
-                kwargs=executor_kwargs,
-            )
-        )
diff --git a/pympipool/scheduler/slurm.py b/pympipool/scheduler/universal.py
similarity index 56%
rename from pympipool/scheduler/slurm.py
rename to pympipool/scheduler/universal.py
index a70110f4..b6dd8531 100644
--- a/pympipool/scheduler/slurm.py
+++ b/pympipool/scheduler/universal.py
@@ -4,25 +4,28 @@
     ExecutorBroker,
     ExecutorSteps,
 )
-from pympipool.shared.interface import SrunInterface
+from pympipool.scheduler.interface import BaseInterface
 from pympipool.shared.thread import RaisingThread
+from pympipool.scheduler.interface import MpiExecInterface


-class PySlurmExecutor(ExecutorBroker):
+class UniversalExecutor(ExecutorBroker):
     """
-    The pympipool.slurm.PySlurmExecutor leverages the srun command to distribute python tasks within a SLURM queuing
-    system allocation. In analogy to the pympipool.flux.PyFluxExecutor it provides the option to specify the number of
-    threads per worker as well as the number of GPUs per worker in addition to specifying the number of cores per
-    worker.
+    The pympipool.scheduler.universal.UniversalExecutor leverages the pympipool interfaces to distribute python tasks on
+    a workstation or inside a queuing system allocation. In contrast to the mpi4py.futures.MPIPoolExecutor the
+    pympipool.scheduler.universal.UniversalExecutor can be executed in a serial python process and does not require the
+    python script to be executed with MPI. Consequently, it is primarily an abstraction of its functionality to improve
+    the usability in particular when used in combination with Jupyter notebooks.

     Args:
         max_workers (int): defines the number workers which can execute functions in parallel
         executor_kwargs (dict): keyword arguments for the executor
+        interface_class (BaseInterface): interface class to initiate python processes

     Examples:

     >>> import numpy as np
-    >>> from pympipool.scheduler.slurm import PySlurmExecutor
+    >>> from pympipool.scheduler.flux import PyFluxExecutor
     >>>
     >>> def calc(i, j, k):
     >>>     from mpi4py import MPI
@@ -33,21 +36,22 @@ class PySlurmExecutor(ExecutorBroker):
     >>> def init_k():
     >>>     return {"k": 3}
     >>>
-    >>> with PySlurmExecutor(max_workers=2, executor_kwargs={"init_function": init_k}) as p:
+    >>> with PyFluxExecutor(max_workers=2, executor_kwargs={"init_function": init_k}) as p:
     >>>     fs = p.submit(calc, 2, j=4)
     >>>     print(fs.result())
-    [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)]
+

     """

     def __init__(
         self,
         max_workers: int = 1,
         executor_kwargs: dict = {},
+        interface_class: BaseInterface = MpiExecInterface,
     ):
         super().__init__()
         executor_kwargs["future_queue"] = self._future_queue
-        executor_kwargs["interface_class"] = SrunInterface
+        executor_kwargs["interface_class"] = interface_class
         self._set_process(
             process=[
                 RaisingThread(
@@ -59,21 +63,21 @@ def __init__(
         )


-class PySlurmStepExecutor(ExecutorSteps):
+class UniversalStepExecutor(ExecutorSteps):
     """
-    The pympipool.slurm.PySlurmStepExecutor leverages the srun command to distribute python tasks within a SLURM queuing
-    system allocation. In analogy to the pympipool.flux.PyFluxExecutor it provides the option to specify the number of
-    threads per worker as well as the number of GPUs per worker in addition to specifying the number of cores per
+    The pympipool.flux.PyFluxStepExecutor leverages the flux framework to distribute python tasks within a queuing
+    system allocation. In analogy to the pympipool.slurm.PySlurmExecutur it provides the option to specify the number
+    of threads per worker as well as the number of GPUs per worker in addition to specifying the number of cores per
     worker.

     Args:
-        max_cores (int): defines the number cores which can be used in parallel
+        max_cores (int): defines the number workers which can execute functions in parallel
         executor_kwargs (dict): keyword arguments for the executor

     Examples:

     >>> import numpy as np
-    >>> from pympipool.scheduler.slurm import PySlurmStepExecutor
+    >>> from pympipool.scheduler.flux import PyFluxStepExecutor
     >>>
     >>> def calc(i, j, k):
     >>>     from mpi4py import MPI
@@ -81,21 +85,23 @@ class PySlurmStepExecutor(ExecutorSteps):
     >>>     rank = MPI.COMM_WORLD.Get_rank()
     >>>     return np.array([i, j, k]), size, rank
     >>>
-    >>> with PySlurmStepExecutor(max_cores=2) as p:
+    >>> with PyFluxStepExecutor(max_cores=2) as p:
     >>>     fs = p.submit(calc, 2, j=4, k=3, resource_dict={"cores": 2})
     >>>     print(fs.result())

     [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)]
+
     """

     def __init__(
         self,
         max_cores: int = 1,
         executor_kwargs: dict = {},
+        interface_class: BaseInterface = MpiExecInterface,
     ):
         super().__init__()
         executor_kwargs["future_queue"] = self._future_queue
-        executor_kwargs["interface_class"] = SrunInterface
+        executor_kwargs["interface_class"] = interface_class
         executor_kwargs["max_cores"] = max_cores
         self._set_process(
             RaisingThread(
diff --git a/pympipool/shared/__init__.py b/pympipool/shared/__init__.py
index 2693e2b4..73723ea0 100644
--- a/pympipool/shared/__init__.py
+++ b/pympipool/shared/__init__.py
@@ -8,7 +8,7 @@
 )
 from pympipool.shared.executorbase import cancel_items_in_queue
 from pympipool.shared.thread import RaisingThread
-from pympipool.shared.interface import MpiExecInterface, SrunInterface
+from pympipool.scheduler.interface import MpiExecInterface, SrunInterface


 __all__ = [
diff --git a/pympipool/shared/executorbase.py b/pympipool/shared/executorbase.py
index ea1f50fd..d0b9ea28 100644
--- a/pympipool/shared/executorbase.py
+++ b/pympipool/shared/executorbase.py
@@ -14,7 +14,7 @@

 from pympipool.shared.communication import interface_bootup
 from pympipool.shared.thread import RaisingThread
-from pympipool.shared.interface import BaseInterface, MpiExecInterface
+from pympipool.scheduler.interface import BaseInterface, MpiExecInterface
 from pympipool.shared.inputcheck import (
     check_resource_dict,
     check_resource_dict_is_empty,
diff --git a/tests/test_executor_backend_flux.py b/tests/test_executor_backend_flux.py
index 1eb599ee..ec5fe768 100644
--- a/tests/test_executor_backend_flux.py
+++ b/tests/test_executor_backend_flux.py
@@ -8,10 +8,7 @@

 try:
     import flux.job
-    from pympipool.scheduler.flux import (
-        PyFluxExecutor,
-        FluxPythonInterface,
-    )
+    from pympipool.scheduler.flux import FluxPythonInterface

     skip_flux_test = "FLUX_URI" not in os.environ
     pmi = os.environ.get("PYMPIPOOL_PMIX", None)
diff --git a/tests/test_flux_executor.py b/tests/test_flux_executor.py
index eecc3d3c..292f4a83 100644
--- a/tests/test_flux_executor.py
+++ b/tests/test_flux_executor.py
@@ -5,15 +5,13 @@

 import numpy as np

+from pympipool.scheduler.universal import UniversalExecutor
 from pympipool.shared.executorbase import cloudpickle_register, execute_parallel_tasks

 try:
     import flux.job
-    from pympipool.scheduler.flux import (
-        PyFluxExecutor,
-        FluxPythonInterface,
-    )
+    from pympipool.scheduler.flux import FluxPythonInterface

     skip_flux_test = "FLUX_URI" not in os.environ
     pmi = os.environ.get("PYMPIPOOL_PMIX", None)
@@ -49,9 +47,10 @@ def setUp(self):
         self.executor = flux.job.FluxExecutor()

     def test_flux_executor_serial(self):
-        with PyFluxExecutor(
+        with UniversalExecutor(
             max_workers=2,
             executor_kwargs={"executor": self.executor},
+            interface_class=FluxPythonInterface,
         ) as exe:
             fs_1 = exe.submit(calc, 1)
             fs_2 = exe.submit(calc, 2)
@@ -61,9 +60,10 @@ def test_flux_executor_serial(self):
         self.assertTrue(fs_2.done())

     def test_flux_executor_threads(self):
-        with PyFluxExecutor(
+        with UniversalExecutor(
             max_workers=1,
             executor_kwargs={"executor": self.executor, "threads_per_core": 2},
+            interface_class=FluxPythonInterface,
         ) as exe:
             fs_1 = exe.submit(calc, 1)
             fs_2 = exe.submit(calc, 2)
@@ -73,18 +73,20 @@ def test_flux_executor_threads(self):
         self.assertTrue(fs_2.done())

     def test_flux_executor_parallel(self):
-        with PyFluxExecutor(
+        with UniversalExecutor(
             max_workers=1,
             executor_kwargs={"executor": self.executor, "cores": 2, "pmi": pmi},
+            interface_class=FluxPythonInterface,
         ) as exe:
             fs_1 = exe.submit(mpi_funct, 1)
             self.assertEqual(fs_1.result(), [(1, 2, 0), (1, 2, 1)])
             self.assertTrue(fs_1.done())

     def test_single_task(self):
-        with PyFluxExecutor(
+        with UniversalExecutor(
             max_workers=1,
             executor_kwargs={"executor": self.executor, "cores": 2, "pmi": pmi},
+            interface_class=FluxPythonInterface,
         ) as p:
             output = p.map(mpi_funct, [1, 2, 3])
         self.assertEqual(
@@ -124,13 +126,14 @@ def test_execute_task_threads(self):
         q.join()

     def test_internal_memory(self):
-        with PyFluxExecutor(
+        with UniversalExecutor(
             max_workers=1,
             executor_kwargs={
                 "executor": self.executor,
                 "cores": 1,
                 "init_function": set_global,
             },
+            interface_class=FluxPythonInterface,
         ) as p:
             f = p.submit(get_global)
             self.assertFalse(f.done())
diff --git a/tests/test_local_executor.py b/tests/test_local_executor.py
index 25c2c5b4..84496738 100644
--- a/tests/test_local_executor.py
+++ b/tests/test_local_executor.py
@@ -6,10 +6,10 @@

 import numpy as np

-from pympipool.scheduler.local import (
-    PyLocalExecutor,
-    PyLocalStepExecutor,
-    MpiExecInterface,
+from pympipool.scheduler.interface import MpiExecInterface
+from pympipool.scheduler.universal import (
+    UniversalExecutor,
+    UniversalStepExecutor,
 )
 from pympipool.shared.backend import call_funct
 from pympipool.shared.executorbase import (
@@ -61,9 +61,10 @@ def sleep_one(i):

 class TestPyMpiExecutorSerial(unittest.TestCase):
     def test_pympiexecutor_two_workers(self):
-        with PyLocalExecutor(
+        with UniversalExecutor(
             max_workers=2,
             executor_kwargs={"hostname_localhost": True},
+            interface_class=MpiExecInterface,
         ) as exe:
             cloudpickle_register(ind=1)
             fs_1 = exe.submit(calc, 1)
@@ -74,8 +75,10 @@ def test_pympiexecutor_two_workers(self):
         self.assertTrue(fs_2.done())

     def test_pympiexecutor_one_worker(self):
-        with PyLocalExecutor(
-            max_workers=1, executor_kwargs={"hostname_localhost": True}
+        with UniversalExecutor(
+            max_workers=1,
+            executor_kwargs={"hostname_localhost": True},
+            interface_class=MpiExecInterface,
         ) as exe:
             cloudpickle_register(ind=1)
             fs_1 = exe.submit(calc, 1)
@@ -88,8 +91,10 @@

 class TestPyMpiExecutorStepSerial(unittest.TestCase):
     def test_pympiexecutor_two_workers(self):
-        with PyLocalStepExecutor(
-            max_cores=2, executor_kwargs={"hostname_localhost": True}
+        with UniversalStepExecutor(
+            max_cores=2,
+            executor_kwargs={"hostname_localhost": True},
+            interface_class=MpiExecInterface,
         ) as exe:
             cloudpickle_register(ind=1)
             fs_1 = exe.submit(calc, 1)
@@ -100,8 +105,10 @@ def test_pympiexecutor_two_workers(self):
         self.assertTrue(fs_2.done())

     def test_pympiexecutor_one_worker(self):
-        with PyLocalStepExecutor(
-            max_cores=1, executor_kwargs={"hostname_localhost": True}
+        with UniversalStepExecutor(
+            max_cores=1,
+            executor_kwargs={"hostname_localhost": True},
+            interface_class=MpiExecInterface,
         ) as exe:
             cloudpickle_register(ind=1)
             fs_1 = exe.submit(calc, 1)
@@ -117,9 +124,10 @@ def test_pympiexecutor_one_worker(self):
 )
 class TestPyMpiExecutorMPI(unittest.TestCase):
     def test_pympiexecutor_one_worker_with_mpi(self):
-        with PyLocalExecutor(
+        with UniversalExecutor(
             max_workers=1,
             executor_kwargs={"cores": 2, "hostname_localhost": True},
+            interface_class=MpiExecInterface,
         ) as exe:
             cloudpickle_register(ind=1)
             fs_1 = exe.submit(mpi_funct, 1)
@@ -127,9 +135,10 @@ def test_pympiexecutor_one_worker_with_mpi(self):
         self.assertTrue(fs_1.done())

     def test_pympiexecutor_one_worker_with_mpi_multiple_submissions(self):
-        with PyLocalExecutor(
+        with UniversalExecutor(
             max_workers=1,
             executor_kwargs={"cores": 2, "hostname_localhost": True},
+            interface_class=MpiExecInterface,
         ) as p:
             cloudpickle_register(ind=1)
             fs1 = p.submit(mpi_funct, 1)
@@ -146,9 +155,10 @@ def test_pympiexecutor_one_worker_with_mpi_multiple_submissions(self):
         )

     def test_pympiexecutor_one_worker_with_mpi_echo(self):
-        with PyLocalExecutor(
+        with UniversalExecutor(
             max_workers=1,
             executor_kwargs={"cores": 2, "hostname_localhost": True},
+            interface_class=MpiExecInterface,
         ) as p:
             cloudpickle_register(ind=1)
             output = p.submit(echo_funct, 2).result()
@@ -160,9 +170,10 @@ def test_pympiexecutor_one_worker_with_mpi_echo(self):
 )
 class TestPyMpiStepExecutorMPI(unittest.TestCase):
     def test_pympiexecutor_one_worker_with_mpi(self):
-        with PyLocalStepExecutor(
+        with UniversalStepExecutor(
             max_cores=2,
             executor_kwargs={"cores": 2, "hostname_localhost": True},
+            interface_class=MpiExecInterface,
         ) as exe:
             cloudpickle_register(ind=1)
             fs_1 = exe.submit(mpi_funct, 1)
@@ -170,9 +181,10 @@ def test_pympiexecutor_one_worker_with_mpi(self):
         self.assertTrue(fs_1.done())

     def test_pympiexecutor_one_worker_with_mpi_multiple_submissions(self):
-        with PyLocalStepExecutor(
+        with UniversalStepExecutor(
             max_cores=2,
             executor_kwargs={"cores": 2, "hostname_localhost": True},
+            interface_class=MpiExecInterface,
         ) as p:
             cloudpickle_register(ind=1)
             fs1 = p.submit(mpi_funct, 1)
@@ -189,9 +201,10 @@ def test_pympiexecutor_one_worker_with_mpi_multiple_submissions(self):
         )

     def test_pympiexecutor_one_worker_with_mpi_echo(self):
-        with PyLocalStepExecutor(
+        with UniversalStepExecutor(
             max_cores=2,
             executor_kwargs={"cores": 2, "hostname_localhost": True},
+            interface_class=MpiExecInterface,
         ) as p:
             cloudpickle_register(ind=1)
             output = p.submit(echo_funct, 2).result()
@@ -200,13 +213,14 @@

 class TestPyMpiExecutorInitFunction(unittest.TestCase):
     def test_internal_memory(self):
-        with PyLocalExecutor(
+        with UniversalExecutor(
             max_workers=1,
             executor_kwargs={
                 "cores": 1,
                 "init_function": set_global,
                 "hostname_localhost": True,
             },
+            interface_class=MpiExecInterface,
         ) as p:
             f = p.submit(get_global)
             self.assertFalse(f.done())
@@ -242,9 +256,10 @@ def test_execute_task(self):

 class TestFuturePool(unittest.TestCase):
     def test_pool_serial(self):
-        with PyLocalExecutor(
+        with UniversalExecutor(
             max_workers=1,
             executor_kwargs={"cores": 1, "hostname_localhost": True},
+            interface_class=MpiExecInterface,
         ) as p:
             output = p.submit(calc_array, i=2)
             self.assertEqual(len(p), 1)
@@ -256,9 +271,10 @@ def test_pool_serial(self):
         self.assertEqual(output.result(), np.array(4))

     def test_executor_multi_submission(self):
-        with PyLocalExecutor(
+        with UniversalExecutor(
             max_workers=1,
             executor_kwargs={"cores": 1, "hostname_localhost": True},
+            interface_class=MpiExecInterface,
         ) as p:
             fs_1 = p.submit(calc_array, i=2)
             fs_2 = p.submit(calc_array, i=2)
@@ -268,9 +284,10 @@ def test_executor_multi_submission(self):
         self.assertTrue(fs_2.done())

     def test_shutdown(self):
-        p = PyLocalExecutor(
+        p = UniversalExecutor(
             max_workers=1,
             executor_kwargs={"cores": 1, "hostname_localhost": True},
+            interface_class=MpiExecInterface,
         )
         fs1 = p.submit(sleep_one, i=2)
         fs2 = p.submit(sleep_one, i=4)
@@ -283,26 +300,29 @@ def test_shutdown(self):
             fs2.result()

     def test_pool_serial_map(self):
-        with PyLocalExecutor(
+        with UniversalExecutor(
             max_workers=1,
             executor_kwargs={"cores": 1, "hostname_localhost": True},
+            interface_class=MpiExecInterface,
         ) as p:
             output = p.map(calc_array, [1, 2, 3])
             self.assertEqual(list(output), [np.array(1), np.array(4), np.array(9)])

     def test_executor_exception(self):
         with self.assertRaises(RuntimeError):
-            with PyLocalExecutor(
+            with UniversalExecutor(
                 max_workers=1,
                 executor_kwargs={"cores": 1, "hostname_localhost": True},
+                interface_class=MpiExecInterface,
             ) as p:
                 p.submit(raise_error)

     def test_executor_exception_future(self):
         with self.assertRaises(RuntimeError):
-            with PyLocalExecutor(
+            with UniversalExecutor(
                 max_workers=1,
                 executor_kwargs={"cores": 1, "hostname_localhost": True},
+                interface_class=MpiExecInterface,
             ) as p:
                 fs = p.submit(raise_error)
                 fs.result()
@@ -313,14 +333,14 @@ def test_executor_exception_future(self):
     def test_meta(self):
         meta_data_exe_dict = {
             "cores": 2,
-            "interface_class": "<class 'pympipool.shared.interface.MpiExecInterface'>",
+            "interface_class": "<class 'pympipool.scheduler.interface.MpiExecInterface'>",
             "hostname_localhost": True,
             "init_function": None,
             "cwd": None,
             "oversubscribe": False,
             "max_workers": 1,
         }
-        with PyLocalExecutor(
+        with UniversalExecutor(
             max_workers=1,
             executor_kwargs={
                 "cores": 2,
@@ -329,6 +349,7 @@ def test_meta(self):
                 "cwd": None,
                 "oversubscribe": False,
             },
+            interface_class=MpiExecInterface,
         ) as exe:
             for k, v in meta_data_exe_dict.items():
                 if k != "interface_class":
@@ -341,13 +362,13 @@
     def test_meta_step(self):
         meta_data_exe_dict = {
             "cores": 2,
-            "interface_class": "<class 'pympipool.shared.interface.MpiExecInterface'>",
+            "interface_class": "<class 'pympipool.scheduler.interface.MpiExecInterface'>",
             "hostname_localhost": True,
             "cwd": None,
             "oversubscribe": False,
             "max_cores": 2,
         }
-        with PyLocalStepExecutor(
+        with UniversalStepExecutor(
             max_cores=2,
             executor_kwargs={
                 "cores": 2,
@@ -355,6 +376,7 @@ def test_meta_step(self):
                 "cwd": None,
                 "oversubscribe": False,
             },
+            interface_class=MpiExecInterface,
         ) as exe:
             for k, v in meta_data_exe_dict.items():
                 if k != "interface_class":
@@ -366,9 +388,10 @@
         skip_mpi4py_test, "mpi4py is not installed, so the mpi4py tests are skipped."
     )
     def test_pool_multi_core(self):
-        with PyLocalExecutor(
+        with UniversalExecutor(
             max_workers=1,
             executor_kwargs={"cores": 2, "hostname_localhost": True},
+            interface_class=MpiExecInterface,
         ) as p:
             output = p.submit(mpi_funct, i=2)
             self.assertEqual(len(p), 1)
@@ -383,9 +406,10 @@ def test_pool_multi_core(self):
         skip_mpi4py_test, "mpi4py is not installed, so the mpi4py tests are skipped."
     )
     def test_pool_multi_core_map(self):
-        with PyLocalExecutor(
+        with UniversalExecutor(
             max_workers=1,
             executor_kwargs={"cores": 2, "hostname_localhost": True},
+            interface_class=MpiExecInterface,
         ) as p:
             output = p.map(mpi_funct, [1, 2, 3])
             self.assertEqual(
diff --git a/tests/test_local_executor_future.py b/tests/test_local_executor_future.py
index a7adf7f1..a3e19e54 100644
--- a/tests/test_local_executor_future.py
+++ b/tests/test_local_executor_future.py
@@ -5,7 +5,8 @@

 import numpy as np

-from pympipool.scheduler.local import PyLocalExecutor
+from pympipool.scheduler.universal import UniversalExecutor
+from pympipool.scheduler.interface import MpiExecInterface


 skip_mpi4py_test = importlib.util.find_spec("mpi4py") is None
@@ -17,9 +18,10 @@ def calc(i):

 class TestFuture(unittest.TestCase):
     def test_pool_serial(self):
-        with PyLocalExecutor(
+        with UniversalExecutor(
             max_workers=1,
             executor_kwargs={"hostname_localhost": True, "cores": 1},
+            interface_class=MpiExecInterface,
         ) as p:
             output = p.submit(calc, i=2)
             self.assertTrue(isinstance(output, Future))
@@ -32,9 +34,10 @@ def test_pool_serial(self):
         skip_mpi4py_test, "mpi4py is not installed, so the mpi4py tests are skipped."
     )
     def test_pool_serial_multi_core(self):
-        with PyLocalExecutor(
+        with UniversalExecutor(
             max_workers=1,
             executor_kwargs={"hostname_localhost": True, "cores": 2},
+            interface_class=MpiExecInterface,
         ) as p:
             output = p.submit(calc, i=2)
             self.assertTrue(isinstance(output, Future))
@@ -64,8 +67,9 @@ def callback(future):
         def submit():
             # Executor only exists in this scope and can get garbage collected after
             # this function is exits
-            future = PyLocalExecutor(
-                executor_kwargs={"hostname_localhost": True}
+            future = UniversalExecutor(
+                executor_kwargs={"hostname_localhost": True},
+                interface_class=MpiExecInterface,
             ).submit(slow_callable)
             future.add_done_callback(callback)
             return future
@@ -103,8 +107,9 @@ def __init__(self):

     def run(self):
         self.running = True
-        future = PyLocalExecutor(
-            executor_kwargs={"hostname_localhost": True}
+        future = UniversalExecutor(
+            executor_kwargs={"hostname_localhost": True},
+            interface_class=MpiExecInterface,
         ).submit(self.return_42)
         future.add_done_callback(self.finished)
diff --git a/tests/test_shared_backend.py b/tests/test_shared_backend.py
index 18e00bef..f9c1d0a2 100644
--- a/tests/test_shared_backend.py
+++ b/tests/test_shared_backend.py
@@ -3,7 +3,7 @@
 import unittest

 from pympipool.shared.backend import parse_arguments
-from pympipool.shared.interface import SrunInterface, MpiExecInterface
+from pympipool.scheduler.interface import SrunInterface, MpiExecInterface


 class TestParser(unittest.TestCase):
diff --git a/tests/test_shared_communication.py b/tests/test_shared_communication.py
index f9b800e2..6eca60d7 100644
--- a/tests/test_shared_communication.py
+++ b/tests/test_shared_communication.py
@@ -14,7 +14,7 @@
     SocketInterface,
 )
 from pympipool.shared.executorbase import cloudpickle_register
-from pympipool.shared.interface import MpiExecInterface
+from pympipool.scheduler.interface import MpiExecInterface


 skip_mpi4py_test = importlib.util.find_spec("mpi4py") is None
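
Usage note (appended for review, not part of the patch): after this refactoring the per-backend executors are replaced by `UniversalExecutor`/`UniversalStepExecutor`, and the backend is selected through the new `interface_class` argument. A minimal sketch of the resulting call pattern, mirroring the updated tests — the `calc` function and the chosen keyword values are illustrative and assume the patched pympipool is installed:

```python
from pympipool.scheduler.interface import MpiExecInterface
from pympipool.scheduler.universal import UniversalExecutor


def calc(i):
    # Placeholder workload; any picklable function works here.
    return i + 1


if __name__ == "__main__":
    # Swap MpiExecInterface for SrunInterface (SLURM) or FluxPythonInterface
    # (flux) to change the backend without changing the executor class.
    with UniversalExecutor(
        max_workers=1,
        executor_kwargs={"cores": 1, "hostname_localhost": True},
        interface_class=MpiExecInterface,
    ) as exe:
        print(exe.submit(calc, 1).result())  # expected output: 2
```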