From 2d631f0d16a5105ffbbe3694b5061aa22ae62be8 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Wed, 12 Jul 2023 16:37:15 -0600 Subject: [PATCH 1/9] Integration with pysqa --- pympipool/queue/__init__.py | 0 pympipool/queue/executor.py | 171 ++++++++++++++++++++++++++++++++++++ pympipool/queue/pool.py | 115 ++++++++++++++++++++++++ 3 files changed, 286 insertions(+) create mode 100644 pympipool/queue/__init__.py create mode 100644 pympipool/queue/executor.py create mode 100644 pympipool/queue/pool.py diff --git a/pympipool/queue/__init__.py b/pympipool/queue/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/pympipool/queue/executor.py b/pympipool/queue/executor.py new file mode 100644 index 00000000..48d3b3ab --- /dev/null +++ b/pympipool/queue/executor.py @@ -0,0 +1,171 @@ +from threading import Thread + +from pympipool.share.executor import ExecutorBase +from pympipool.share.communication import SocketInterface +from pympipool.share.serial import ( + execute_serial_tasks_loop, + get_parallel_subprocess_command, + execute_parallel_tasks_loop, +) + + +def execute_parallel_tasks( + future_queue, + cores, + oversubscribe=False, + enable_flux_backend=False, + cwd=None, + queue_adapter=None, + queue_adapter_kwargs=None, +): + interface = SocketInterface() + command_lst = get_parallel_subprocess_command( + port_selected=interface.bind_to_random_port(), + cores=cores, + cores_per_task=1, + oversubscribe=oversubscribe, + enable_flux_backend=enable_flux_backend, + enable_mpi4py_backend=False, + ) + if queue_adapter is not None: + queue_adapter.submit( + working_directory=cwd, + cores=cores, + command=' '.join(command_lst), + **queue_adapter_kwargs + ) + else: + interface.bootup(command_lst=command_lst, cwd=cwd) + execute_parallel_tasks_loop(interface=interface, future_queue=future_queue) + + +def execute_serial_tasks( + future_queue, + cores, + oversubscribe=False, + enable_flux_backend=False, + cwd=None, + sleep_interval=0.1, + queue_adapter=None, + queue_adapter_kwargs=None, +): + future_dict = {} + interface = SocketInterface() + command_lst = get_parallel_subprocess_command( + port_selected=interface.bind_to_random_port(), + cores=cores, + cores_per_task=1, + oversubscribe=oversubscribe, + enable_flux_backend=enable_flux_backend, + enable_mpi4py_backend=True, + ) + if queue_adapter is not None: + queue_adapter.submit( + working_directory=cwd, + cores=cores, + command=' '.join(command_lst), + **queue_adapter_kwargs + ) + else: + interface.bootup(command_lst=command_lst, cwd=cwd) + execute_serial_tasks_loop( + interface=interface, + future_queue=future_queue, + future_dict=future_dict, + sleep_interval=sleep_interval, + ) + + +class QueueExecutor(ExecutorBase): + """ + The pympipool.Executor behaves like the concurrent.futures.Executor but it uses mpi4py to execute parallel tasks. + In contrast to the mpi4py.futures.MPIPoolExecutor the pympipool.Executor can be executed in a serial python process + and does not require the python script to be executed with MPI. Still internally the pympipool.Executor uses the + mpi4py.futures.MPIPoolExecutor, consequently it is primarily an abstraction of its functionality to improve the + usability in particular when used in combination with Jupyter notebooks. + + Args: + cores (int): defines the number of MPI ranks to use for each function call + oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - default False + enable_flux_backend (bool): use the flux-framework as backend rather than just calling mpiexec + init_function (None): optional function to preset arguments for functions which are submitted later + cwd (str/None): current working directory where the parallel python task is executed + + Simple example: + ``` + import numpy as np + from pympipool import Executor + + def calc(i, j, k): + from mpi4py import MPI + size = MPI.COMM_WORLD.Get_size() + rank = MPI.COMM_WORLD.Get_rank() + return np.array([i, j, k]), size, rank + + def init_k(): + return {"k": 3} + + with Executor(cores=2, init_function=init_k) as p: + fs = p.submit(calc, 2, j=4) + print(fs.result()) + + >>> [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)] + ``` + """ + + def __init__( + self, + cores, + oversubscribe=False, + enable_flux_backend=False, + init_function=None, + cwd=None, + queue_adapter=None, + queue_adapter_kwargs=None, + ): + super().__init__() + self._process = Thread( + target=execute_parallel_tasks, + args=( + self._future_queue, + cores, + oversubscribe, + enable_flux_backend, + cwd, + queue_adapter, + queue_adapter_kwargs + ), + ) + self._process.start() + if init_function is not None: + self._future_queue.put( + {"init": True, "fn": init_function, "args": (), "kwargs": {}} + ) + + +class PoolExecutor(ExecutorBase): + def __init__( + self, + max_workers=1, + oversubscribe=False, + enable_flux_backend=False, + cwd=None, + sleep_interval=0.1, + queue_adapter=None, + queue_adapter_kwargs=None, + ): + super().__init__() + self._process = Thread( + target=execute_serial_tasks, + args=( + self._future_queue, + max_workers, + oversubscribe, + enable_flux_backend, + cwd, + sleep_interval, + queue_adapter, + queue_adapter_kwargs, + ), + ) + self._process.start() diff --git a/pympipool/queue/pool.py b/pympipool/queue/pool.py new file mode 100644 index 00000000..31267c86 --- /dev/null +++ b/pympipool/queue/pool.py @@ -0,0 +1,115 @@ +from pympipool.share.pool import Pool as PoolBase, MPISpawnPool as MPISpawnPoolBase +from pympipool.share.serial import get_parallel_subprocess_command + + +class Pool(PoolBase): + """ + The pympipool.Pool behaves like the multiprocessing.Pool but it uses mpi4py to distribute tasks. In contrast to the + mpi4py.futures.MPIPoolExecutor the pympipool.Pool can be executed in a serial python process and does not require + the python script to be executed with MPI. Still internally the pympipool.Pool uses the + mpi4py.futures.MPIPoolExecutor, consequently it is primarily an abstraction of its functionality to improve the + usability in particular when used in combination with Jupyter notebooks. + + Args: + max_workers (int): defines the total number of MPI ranks to use + cores_per_task (int): defines the number of MPI ranks per task + oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) + + Simple example: + ``` + import numpy as np + from pympipool import Pool + + def calc(i): + return np.array(i ** 2) + + with Pool(cores=2) as p: + print(p.map(func=calc, iterable=[1, 2, 3, 4])) + ``` + """ + def __init__( + self, + max_workers=1, + oversubscribe=False, + enable_flux_backend=False, + cwd=None, + queue_adapter=None, + queue_adapter_kwargs=None, + ): + super(PoolBase, self).__init__() + command_lst = get_parallel_subprocess_command( + port_selected=self._interface.bind_to_random_port(), + cores=max_workers, + cores_per_task=1, + oversubscribe=oversubscribe, + enable_flux_backend=enable_flux_backend, + enable_mpi4py_backend=True, + ) + if queue_adapter is not None: + queue_adapter.submit( + working_directory=cwd, + cores=max_workers, + command=' '.join(command_lst), + **queue_adapter_kwargs + ) + else: + self._interface.bootup( + command_lst=command_lst, + cwd=cwd, + ) + + +class MPISpawnPool(MPISpawnPoolBase): + """ + The pympipool.MPISpawnPool behaves like the multiprocessing.Pool but it uses mpi4py to distribute tasks. In contrast + to the mpi4py.futures.MPIPoolExecutor the pympipool.MPISpawnPool can be executed in a serial python process and does + not require the python script to be executed with MPI. Still internally the pympipool.Pool uses the + mpi4py.futures.MPIPoolExecutor, consequently it is primarily an abstraction of its functionality to improve the + usability in particular when used in combination with Jupyter notebooks. + + Args: + max_ranks (int): defines the total number of MPI ranks to use + ranks_per_task (int): defines the number of MPI ranks per task + oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) + + Simple example: + ``` + from pympipool import MPISpawnPool + + def calc(i, comm): + return i, comm.Get_size(), comm.Get_rank() + + with MPISpawnPool(max_ranks=4, ranks_per_task=2) as p: + print(p.map(func=calc, iterable=[1, 2, 3, 4])) + ``` + """ + def __init__( + self, + max_ranks=1, + ranks_per_task=1, + oversubscribe=False, + cwd=None, + queue_adapter=None, + queue_adapter_kwargs=None, + ): + super(MPISpawnPoolBase, self).__init__() + command_lst = get_parallel_subprocess_command( + port_selected=self._interface.bind_to_random_port(), + cores=max_ranks, + cores_per_task=ranks_per_task, + oversubscribe=oversubscribe, + enable_flux_backend=False, + enable_mpi4py_backend=True, + ) + if queue_adapter is not None: + queue_adapter.submit( + working_directory=cwd, + cores=max_ranks, + command=' '.join(command_lst), + **queue_adapter_kwargs + ) + else: + self._interface.bootup( + command_lst=command_lst, + cwd=cwd, + ) From 5a8946c8b066ff63ecb637ef7a5f55c1a02bac32 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Wed, 12 Jul 2023 16:44:12 -0600 Subject: [PATCH 2/9] black formatting --- pympipool/queue/executor.py | 6 +++--- pympipool/queue/pool.py | 6 ++++-- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/pympipool/queue/executor.py b/pympipool/queue/executor.py index 48d3b3ab..de562053 100644 --- a/pympipool/queue/executor.py +++ b/pympipool/queue/executor.py @@ -31,7 +31,7 @@ def execute_parallel_tasks( queue_adapter.submit( working_directory=cwd, cores=cores, - command=' '.join(command_lst), + command=" ".join(command_lst), **queue_adapter_kwargs ) else: @@ -63,7 +63,7 @@ def execute_serial_tasks( queue_adapter.submit( working_directory=cwd, cores=cores, - command=' '.join(command_lst), + command=" ".join(command_lst), **queue_adapter_kwargs ) else: @@ -133,7 +133,7 @@ def __init__( enable_flux_backend, cwd, queue_adapter, - queue_adapter_kwargs + queue_adapter_kwargs, ), ) self._process.start() diff --git a/pympipool/queue/pool.py b/pympipool/queue/pool.py index 31267c86..3cf95af8 100644 --- a/pympipool/queue/pool.py +++ b/pympipool/queue/pool.py @@ -27,6 +27,7 @@ def calc(i): print(p.map(func=calc, iterable=[1, 2, 3, 4])) ``` """ + def __init__( self, max_workers=1, @@ -49,7 +50,7 @@ def __init__( queue_adapter.submit( working_directory=cwd, cores=max_workers, - command=' '.join(command_lst), + command=" ".join(command_lst), **queue_adapter_kwargs ) else: @@ -83,6 +84,7 @@ def calc(i, comm): print(p.map(func=calc, iterable=[1, 2, 3, 4])) ``` """ + def __init__( self, max_ranks=1, @@ -105,7 +107,7 @@ def __init__( queue_adapter.submit( working_directory=cwd, cores=max_ranks, - command=' '.join(command_lst), + command=" ".join(command_lst), **queue_adapter_kwargs ) else: From b520ca3ae2d6288a7a82fee56b99fb27950453f7 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Wed, 12 Jul 2023 17:09:26 -0600 Subject: [PATCH 3/9] Move pysqa integration to socket interface --- pympipool/queue/__init__.py | 0 pympipool/queue/executor.py | 171 ------------------------------- pympipool/queue/pool.py | 117 --------------------- pympipool/share/communication.py | 28 +++-- pympipool/share/executor.py | 16 ++- pympipool/share/pool.py | 20 +++- pympipool/share/serial.py | 2 + 7 files changed, 52 insertions(+), 302 deletions(-) delete mode 100644 pympipool/queue/__init__.py delete mode 100644 pympipool/queue/executor.py delete mode 100644 pympipool/queue/pool.py diff --git a/pympipool/queue/__init__.py b/pympipool/queue/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/pympipool/queue/executor.py b/pympipool/queue/executor.py deleted file mode 100644 index de562053..00000000 --- a/pympipool/queue/executor.py +++ /dev/null @@ -1,171 +0,0 @@ -from threading import Thread - -from pympipool.share.executor import ExecutorBase -from pympipool.share.communication import SocketInterface -from pympipool.share.serial import ( - execute_serial_tasks_loop, - get_parallel_subprocess_command, - execute_parallel_tasks_loop, -) - - -def execute_parallel_tasks( - future_queue, - cores, - oversubscribe=False, - enable_flux_backend=False, - cwd=None, - queue_adapter=None, - queue_adapter_kwargs=None, -): - interface = SocketInterface() - command_lst = get_parallel_subprocess_command( - port_selected=interface.bind_to_random_port(), - cores=cores, - cores_per_task=1, - oversubscribe=oversubscribe, - enable_flux_backend=enable_flux_backend, - enable_mpi4py_backend=False, - ) - if queue_adapter is not None: - queue_adapter.submit( - working_directory=cwd, - cores=cores, - command=" ".join(command_lst), - **queue_adapter_kwargs - ) - else: - interface.bootup(command_lst=command_lst, cwd=cwd) - execute_parallel_tasks_loop(interface=interface, future_queue=future_queue) - - -def execute_serial_tasks( - future_queue, - cores, - oversubscribe=False, - enable_flux_backend=False, - cwd=None, - sleep_interval=0.1, - queue_adapter=None, - queue_adapter_kwargs=None, -): - future_dict = {} - interface = SocketInterface() - command_lst = get_parallel_subprocess_command( - port_selected=interface.bind_to_random_port(), - cores=cores, - cores_per_task=1, - oversubscribe=oversubscribe, - enable_flux_backend=enable_flux_backend, - enable_mpi4py_backend=True, - ) - if queue_adapter is not None: - queue_adapter.submit( - working_directory=cwd, - cores=cores, - command=" ".join(command_lst), - **queue_adapter_kwargs - ) - else: - interface.bootup(command_lst=command_lst, cwd=cwd) - execute_serial_tasks_loop( - interface=interface, - future_queue=future_queue, - future_dict=future_dict, - sleep_interval=sleep_interval, - ) - - -class QueueExecutor(ExecutorBase): - """ - The pympipool.Executor behaves like the concurrent.futures.Executor but it uses mpi4py to execute parallel tasks. - In contrast to the mpi4py.futures.MPIPoolExecutor the pympipool.Executor can be executed in a serial python process - and does not require the python script to be executed with MPI. Still internally the pympipool.Executor uses the - mpi4py.futures.MPIPoolExecutor, consequently it is primarily an abstraction of its functionality to improve the - usability in particular when used in combination with Jupyter notebooks. - - Args: - cores (int): defines the number of MPI ranks to use for each function call - oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - default False - enable_flux_backend (bool): use the flux-framework as backend rather than just calling mpiexec - init_function (None): optional function to preset arguments for functions which are submitted later - cwd (str/None): current working directory where the parallel python task is executed - - Simple example: - ``` - import numpy as np - from pympipool import Executor - - def calc(i, j, k): - from mpi4py import MPI - size = MPI.COMM_WORLD.Get_size() - rank = MPI.COMM_WORLD.Get_rank() - return np.array([i, j, k]), size, rank - - def init_k(): - return {"k": 3} - - with Executor(cores=2, init_function=init_k) as p: - fs = p.submit(calc, 2, j=4) - print(fs.result()) - - >>> [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)] - ``` - """ - - def __init__( - self, - cores, - oversubscribe=False, - enable_flux_backend=False, - init_function=None, - cwd=None, - queue_adapter=None, - queue_adapter_kwargs=None, - ): - super().__init__() - self._process = Thread( - target=execute_parallel_tasks, - args=( - self._future_queue, - cores, - oversubscribe, - enable_flux_backend, - cwd, - queue_adapter, - queue_adapter_kwargs, - ), - ) - self._process.start() - if init_function is not None: - self._future_queue.put( - {"init": True, "fn": init_function, "args": (), "kwargs": {}} - ) - - -class PoolExecutor(ExecutorBase): - def __init__( - self, - max_workers=1, - oversubscribe=False, - enable_flux_backend=False, - cwd=None, - sleep_interval=0.1, - queue_adapter=None, - queue_adapter_kwargs=None, - ): - super().__init__() - self._process = Thread( - target=execute_serial_tasks, - args=( - self._future_queue, - max_workers, - oversubscribe, - enable_flux_backend, - cwd, - sleep_interval, - queue_adapter, - queue_adapter_kwargs, - ), - ) - self._process.start() diff --git a/pympipool/queue/pool.py b/pympipool/queue/pool.py deleted file mode 100644 index 3cf95af8..00000000 --- a/pympipool/queue/pool.py +++ /dev/null @@ -1,117 +0,0 @@ -from pympipool.share.pool import Pool as PoolBase, MPISpawnPool as MPISpawnPoolBase -from pympipool.share.serial import get_parallel_subprocess_command - - -class Pool(PoolBase): - """ - The pympipool.Pool behaves like the multiprocessing.Pool but it uses mpi4py to distribute tasks. In contrast to the - mpi4py.futures.MPIPoolExecutor the pympipool.Pool can be executed in a serial python process and does not require - the python script to be executed with MPI. Still internally the pympipool.Pool uses the - mpi4py.futures.MPIPoolExecutor, consequently it is primarily an abstraction of its functionality to improve the - usability in particular when used in combination with Jupyter notebooks. - - Args: - max_workers (int): defines the total number of MPI ranks to use - cores_per_task (int): defines the number of MPI ranks per task - oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - - Simple example: - ``` - import numpy as np - from pympipool import Pool - - def calc(i): - return np.array(i ** 2) - - with Pool(cores=2) as p: - print(p.map(func=calc, iterable=[1, 2, 3, 4])) - ``` - """ - - def __init__( - self, - max_workers=1, - oversubscribe=False, - enable_flux_backend=False, - cwd=None, - queue_adapter=None, - queue_adapter_kwargs=None, - ): - super(PoolBase, self).__init__() - command_lst = get_parallel_subprocess_command( - port_selected=self._interface.bind_to_random_port(), - cores=max_workers, - cores_per_task=1, - oversubscribe=oversubscribe, - enable_flux_backend=enable_flux_backend, - enable_mpi4py_backend=True, - ) - if queue_adapter is not None: - queue_adapter.submit( - working_directory=cwd, - cores=max_workers, - command=" ".join(command_lst), - **queue_adapter_kwargs - ) - else: - self._interface.bootup( - command_lst=command_lst, - cwd=cwd, - ) - - -class MPISpawnPool(MPISpawnPoolBase): - """ - The pympipool.MPISpawnPool behaves like the multiprocessing.Pool but it uses mpi4py to distribute tasks. In contrast - to the mpi4py.futures.MPIPoolExecutor the pympipool.MPISpawnPool can be executed in a serial python process and does - not require the python script to be executed with MPI. Still internally the pympipool.Pool uses the - mpi4py.futures.MPIPoolExecutor, consequently it is primarily an abstraction of its functionality to improve the - usability in particular when used in combination with Jupyter notebooks. - - Args: - max_ranks (int): defines the total number of MPI ranks to use - ranks_per_task (int): defines the number of MPI ranks per task - oversubscribe (bool): adds the `--oversubscribe` command line flag (OpenMPI only) - - Simple example: - ``` - from pympipool import MPISpawnPool - - def calc(i, comm): - return i, comm.Get_size(), comm.Get_rank() - - with MPISpawnPool(max_ranks=4, ranks_per_task=2) as p: - print(p.map(func=calc, iterable=[1, 2, 3, 4])) - ``` - """ - - def __init__( - self, - max_ranks=1, - ranks_per_task=1, - oversubscribe=False, - cwd=None, - queue_adapter=None, - queue_adapter_kwargs=None, - ): - super(MPISpawnPoolBase, self).__init__() - command_lst = get_parallel_subprocess_command( - port_selected=self._interface.bind_to_random_port(), - cores=max_ranks, - cores_per_task=ranks_per_task, - oversubscribe=oversubscribe, - enable_flux_backend=False, - enable_mpi4py_backend=True, - ) - if queue_adapter is not None: - queue_adapter.submit( - working_directory=cwd, - cores=max_ranks, - command=" ".join(command_lst), - **queue_adapter_kwargs - ) - else: - self._interface.bootup( - command_lst=command_lst, - cwd=cwd, - ) diff --git a/pympipool/share/communication.py b/pympipool/share/communication.py index 51ca4fe0..79467683 100644 --- a/pympipool/share/communication.py +++ b/pympipool/share/communication.py @@ -5,10 +5,12 @@ class SocketInterface(object): - def __init__(self): + def __init__(self, queue_adapter=None, queue_adapter_kwargs=None): self._context = zmq.Context() self._socket = self._context.socket(zmq.PAIR) self._process = None + self._queue_adapter = queue_adapter + self._queue_adapter_kwargs = queue_adapter_kwargs def send_dict(self, input_dict): self._socket.send(cloudpickle.dumps(input_dict)) @@ -28,14 +30,22 @@ def send_and_receive_dict(self, input_dict): def bind_to_random_port(self): return self._socket.bind_to_random_port("tcp://*") - def bootup(self, command_lst, cwd=None): - self._process = subprocess.Popen( - args=command_lst, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - stdin=subprocess.PIPE, - cwd=cwd, - ) + def bootup(self, command_lst, cwd=None, cores=None): + if self._queue_adapter is not None: + self._queue_adapter.submit( + working_directory=cwd, + cores=cores, + command=" ".join(command_lst), + **self._queue_adapter_kwargs + ) + else: + self._process = subprocess.Popen( + args=command_lst, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + stdin=subprocess.PIPE, + cwd=cwd, + ) def shutdown(self, wait=True): result = None diff --git a/pympipool/share/executor.py b/pympipool/share/executor.py index 86014efc..eb97c2c8 100644 --- a/pympipool/share/executor.py +++ b/pympipool/share/executor.py @@ -96,11 +96,21 @@ def __init__( enable_flux_backend=False, init_function=None, cwd=None, + queue_adapter=None, + queue_adapter_kwargs=None, ): super().__init__() self._process = Thread( target=execute_parallel_tasks, - args=(self._future_queue, cores, oversubscribe, enable_flux_backend, cwd), + args=( + self._future_queue, + cores, + oversubscribe, + enable_flux_backend, + cwd, + queue_adapter, + queue_adapter_kwargs, + ), ) self._process.start() if init_function is not None: @@ -117,6 +127,8 @@ def __init__( enable_flux_backend=False, cwd=None, sleep_interval=0.1, + queue_adapter=None, + queue_adapter_kwargs=None, ): super().__init__() self._process = Thread( @@ -128,6 +140,8 @@ def __init__( enable_flux_backend, cwd, sleep_interval, + queue_adapter, + queue_adapter_kwargs, ), ) self._process.start() diff --git a/pympipool/share/pool.py b/pympipool/share/pool.py index bbf8d610..13c09043 100644 --- a/pympipool/share/pool.py +++ b/pympipool/share/pool.py @@ -3,9 +3,11 @@ class PoolBase(object): - def __init__(self): + def __init__(self, queue_adapter=None, queue_adapter_kwargs=None): self._future_dict = {} - self._interface = SocketInterface() + self._interface = SocketInterface( + queue_adapter=queue_adapter, queue_adapter_kwargs=queue_adapter_kwargs + ) cloudpickle_register(ind=3) def __enter__(self): @@ -51,8 +53,12 @@ def __init__( oversubscribe=False, enable_flux_backend=False, cwd=None, + queue_adapter=None, + queue_adapter_kwargs=None, ): - super().__init__() + super().__init__( + queue_adapter=queue_adapter, queue_adapter_kwargs=queue_adapter_kwargs + ) self._interface.bootup( command_lst=get_parallel_subprocess_command( port_selected=self._interface.bind_to_random_port(), @@ -63,6 +69,7 @@ def __init__( enable_mpi4py_backend=True, ), cwd=cwd, + cores=max_workers, ) def map(self, func, iterable, chunksize=None): @@ -145,8 +152,12 @@ def __init__( ranks_per_task=1, oversubscribe=False, cwd=None, + queue_adapter=None, + queue_adapter_kwargs=None, ): - super().__init__() + super().__init__( + queue_adapter=queue_adapter, queue_adapter_kwargs=queue_adapter_kwargs + ) self._interface.bootup( command_lst=get_parallel_subprocess_command( port_selected=self._interface.bind_to_random_port(), @@ -157,6 +168,7 @@ def __init__( enable_mpi4py_backend=True, ), cwd=cwd, + cores=max_ranks, ) def map(self, func, iterable, chunksize=None): diff --git a/pympipool/share/serial.py b/pympipool/share/serial.py index a17d91a5..00cfa6fe 100644 --- a/pympipool/share/serial.py +++ b/pympipool/share/serial.py @@ -129,6 +129,7 @@ def execute_parallel_tasks( enable_mpi4py_backend=False, ), cwd=cwd, + cores=cores, ) execute_parallel_tasks_loop(interface=interface, future_queue=future_queue) @@ -153,6 +154,7 @@ def execute_serial_tasks( enable_mpi4py_backend=True, ), cwd=cwd, + cores=cores, ) execute_serial_tasks_loop( interface=interface, From 15ebe1a835cfc500e0cb39ff284a4c863c001833 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Wed, 12 Jul 2023 17:31:14 -0600 Subject: [PATCH 4/9] fixes --- pympipool/share/serial.py | 18 +++++++++++++++--- tests/test_interface.py | 2 +- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/pympipool/share/serial.py b/pympipool/share/serial.py index 00cfa6fe..20ddbc8e 100644 --- a/pympipool/share/serial.py +++ b/pympipool/share/serial.py @@ -116,9 +116,17 @@ def execute_serial_tasks_loop(interface, future_queue, future_dict, sleep_interv def execute_parallel_tasks( - future_queue, cores, oversubscribe=False, enable_flux_backend=False, cwd=None + future_queue, + cores, + oversubscribe=False, + enable_flux_backend=False, + cwd=None, + queue_adapter=None, + queue_adapter_kwargs=None, ): - interface = SocketInterface() + interface = SocketInterface( + queue_adapter=queue_adapter, queue_adapter_kwargs=queue_adapter_kwargs + ) interface.bootup( command_lst=get_parallel_subprocess_command( port_selected=interface.bind_to_random_port(), @@ -141,9 +149,13 @@ def execute_serial_tasks( enable_flux_backend=False, cwd=None, sleep_interval=0.1, + queue_adapter=None, + queue_adapter_kwargs=None, ): future_dict = {} - interface = SocketInterface() + interface = SocketInterface( + queue_adapter=queue_adapter, queue_adapter_kwargs=queue_adapter_kwargs + ) interface.bootup( command_lst=get_parallel_subprocess_command( port_selected=interface.bind_to_random_port(), diff --git a/tests/test_interface.py b/tests/test_interface.py index 621ddd8b..6ae8aa97 100644 --- a/tests/test_interface.py +++ b/tests/test_interface.py @@ -12,7 +12,7 @@ class TestInterface(unittest.TestCase): def test_interface(self): cloudpickle_register(ind=1) task_dict = {"fn": calc, 'args': (), "kwargs": {"i": 2}} - interface = SocketInterface() + interface = SocketInterface(queue_adapter=None, queue_adapter_kwargs=None) interface.bootup( command_lst=get_parallel_subprocess_command( port_selected=interface.bind_to_random_port(), From b2d70c5deeca11aaf53f9d32d5608ce07b57880c Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Wed, 12 Jul 2023 17:48:24 -0600 Subject: [PATCH 5/9] black formatting --- pympipool/share/executor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pympipool/share/executor.py b/pympipool/share/executor.py index 6c242c6c..6b8c6aaa 100644 --- a/pympipool/share/executor.py +++ b/pympipool/share/executor.py @@ -109,7 +109,7 @@ def __init__( "enable_flux_backend": enable_flux_backend, "cwd": cwd, "queue_adapter": queue_adapter, - "queue_adapter_kwargs": queue_adapter_kwargs + "queue_adapter_kwargs": queue_adapter_kwargs, }, ) self._process.start() From 6d0307038d9ec94d9e13c8aadf349afa81d846a4 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Wed, 12 Jul 2023 19:06:30 -0600 Subject: [PATCH 6/9] submit_job() vs. submit() --- pympipool/share/communication.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pympipool/share/communication.py b/pympipool/share/communication.py index 79467683..8cc9575d 100644 --- a/pympipool/share/communication.py +++ b/pympipool/share/communication.py @@ -32,7 +32,7 @@ def bind_to_random_port(self): def bootup(self, command_lst, cwd=None, cores=None): if self._queue_adapter is not None: - self._queue_adapter.submit( + self._queue_adapter.submit_job( working_directory=cwd, cores=cores, command=" ".join(command_lst), From eadb5191132531adbf515a37219943cd410a8299 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Wed, 12 Jul 2023 19:19:56 -0600 Subject: [PATCH 7/9] always communicate hostname when queue_adapter is set --- pympipool/share/pool.py | 2 ++ pympipool/share/serial.py | 7 ++++++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/pympipool/share/pool.py b/pympipool/share/pool.py index 13c09043..dea4bf92 100644 --- a/pympipool/share/pool.py +++ b/pympipool/share/pool.py @@ -67,6 +67,7 @@ def __init__( oversubscribe=oversubscribe, enable_flux_backend=enable_flux_backend, enable_mpi4py_backend=True, + enable_multi_host=queue_adapter is not None, ), cwd=cwd, cores=max_workers, @@ -166,6 +167,7 @@ def __init__( oversubscribe=oversubscribe, enable_flux_backend=False, enable_mpi4py_backend=True, + enable_multi_host=queue_adapter is not None, ), cwd=cwd, cores=max_ranks, diff --git a/pympipool/share/serial.py b/pympipool/share/serial.py index 20ddbc8e..ccba2378 100644 --- a/pympipool/share/serial.py +++ b/pympipool/share/serial.py @@ -18,6 +18,7 @@ def command_line_options( oversubscribe=False, enable_flux_backend=False, enable_mpi4py_backend=True, + enable_multi_host=False ): if enable_flux_backend: command_lst = ["flux", "run"] @@ -34,7 +35,7 @@ def command_line_options( else: command_lst += ["-n", str(cores), "python"] command_lst += [path] - if enable_flux_backend: + if enable_flux_backend or enable_multi_host: command_lst += [ "--host", hostname, @@ -60,6 +61,7 @@ def get_parallel_subprocess_command( oversubscribe=False, enable_flux_backend=False, enable_mpi4py_backend=True, + enable_multi_host=False, ): if enable_mpi4py_backend: executable = "mpipool.py" @@ -74,6 +76,7 @@ def get_parallel_subprocess_command( oversubscribe=oversubscribe, enable_flux_backend=enable_flux_backend, enable_mpi4py_backend=enable_mpi4py_backend, + enable_multi_host=enable_multi_host, ) return command_lst @@ -135,6 +138,7 @@ def execute_parallel_tasks( oversubscribe=oversubscribe, enable_flux_backend=enable_flux_backend, enable_mpi4py_backend=False, + enable_multi_host=queue_adapter is not None, ), cwd=cwd, cores=cores, @@ -164,6 +168,7 @@ def execute_serial_tasks( oversubscribe=oversubscribe, enable_flux_backend=enable_flux_backend, enable_mpi4py_backend=True, + enable_multi_host=queue_adapter is not None, ), cwd=cwd, cores=cores, From a690e2cce281730fbb7a46f26344da5b0916079c Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Wed, 12 Jul 2023 19:20:18 -0600 Subject: [PATCH 8/9] black formatting --- pympipool/share/serial.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pympipool/share/serial.py b/pympipool/share/serial.py index ccba2378..f7d56e5f 100644 --- a/pympipool/share/serial.py +++ b/pympipool/share/serial.py @@ -18,7 +18,7 @@ def command_line_options( oversubscribe=False, enable_flux_backend=False, enable_mpi4py_backend=True, - enable_multi_host=False + enable_multi_host=False, ): if enable_flux_backend: command_lst = ["flux", "run"] From bf7a7f64f24e4f4265275230be9b1f544d918364 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Wed, 12 Jul 2023 19:30:32 -0600 Subject: [PATCH 9/9] fix shutdown --- pympipool/share/communication.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pympipool/share/communication.py b/pympipool/share/communication.py index 8cc9575d..ac29e4f0 100644 --- a/pympipool/share/communication.py +++ b/pympipool/share/communication.py @@ -54,6 +54,10 @@ def shutdown(self, wait=True): input_dict={"shutdown": True, "wait": wait} ) self._process_close(wait=wait) + elif self._queue_adapter is not None and self._socket is not None: + result = self.send_and_receive_dict( + input_dict={"shutdown": True, "wait": wait} + ) if self._socket is not None: self._socket.close() if self._context is not None: