From 4f2ee7461405f03887df8584a3deca2964f1520f Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Wed, 26 Jul 2023 10:18:54 -0600 Subject: [PATCH 1/6] Add check to see if the process is still alive --- .../external_interfaces/communication.py | 10 +++ pympipool/external_interfaces/pool.py | 89 ++++++++++++------- .../shared_functions/external_interfaces.py | 4 + 3 files changed, 71 insertions(+), 32 deletions(-) diff --git a/pympipool/external_interfaces/communication.py b/pympipool/external_interfaces/communication.py index e70641ff..2049e724 100644 --- a/pympipool/external_interfaces/communication.py +++ b/pympipool/external_interfaces/communication.py @@ -95,6 +95,16 @@ def bootup(self, command_lst, cwd=None, cores=None): cwd=cwd, ) + def is_alive(self): + if self._process is not None and self._process.poll() is None: + return True + elif self._process is None: + return False + else: + raise subprocess.SubprocessError( + "The subprocess exited with error code: ", self._process.returncode + ) + def shutdown(self, wait=True): result = None if self._process is not None and self._process.poll() is None: diff --git a/pympipool/external_interfaces/pool.py b/pympipool/external_interfaces/pool.py index 9effff92..62b00eaa 100644 --- a/pympipool/external_interfaces/pool.py +++ b/pympipool/external_interfaces/pool.py @@ -1,4 +1,5 @@ from abc import ABC +import subprocess from pympipool.external_interfaces.communication import SocketInterface from pympipool.shared_functions.external_interfaces import ( @@ -107,14 +108,20 @@ def map(self, func, iterable, chunksize=None): # multiprocessing.pool.Pool and mpi4py.future.ExecutorPool have different defaults if chunksize is None: chunksize = 1 - return self._interface.send_and_receive_dict( - input_dict={ - "fn": func, - "iterable": iterable, - "chunksize": chunksize, - "map": True, - } - ) + if self._interface.is_alive(): + return self._interface.send_and_receive_dict( + input_dict={ + "fn": func, + "iterable": iterable, + "chunksize": chunksize, + "map": True, + } + ) + else: + raise subprocess.SubprocessError( + "The subprocess exited with error code: ", + self._interface._process.returncode, + ) def starmap(self, func, iterable, chunksize=None): """ @@ -131,14 +138,20 @@ def starmap(self, func, iterable, chunksize=None): # multiprocessing.pool.Pool and mpi4py.future.ExecutorPool have different defaults if chunksize is None: chunksize = 1 - return self._interface.send_and_receive_dict( - input_dict={ - "fn": func, - "iterable": iterable, - "chunksize": chunksize, - "map": False, - } - ) + if self._interface.is_alive(): + return self._interface.send_and_receive_dict( + input_dict={ + "fn": func, + "iterable": iterable, + "chunksize": chunksize, + "map": False, + } + ) + else: + raise subprocess.SubprocessError( + "The subprocess exited with error code: ", + self._interface._process.returncode, + ) class MPISpawnPool(PoolBase): @@ -214,14 +227,20 @@ def map(self, func, iterable, chunksize=None): # multiprocessing.pool.Pool and mpi4py.future.ExecutorPool have different defaults if chunksize is None: chunksize = 1 - return self._interface.send_and_receive_dict( - input_dict={ - "fn": func, - "iterable": iterable, - "chunksize": chunksize, - "map": True, - } - ) + if self._interface.is_alive(): + return self._interface.send_and_receive_dict( + input_dict={ + "fn": func, + "iterable": iterable, + "chunksize": chunksize, + "map": True, + } + ) + else: + raise subprocess.SubprocessError( + "The subprocess exited with error code: ", + self._interface._process.returncode, + ) def starmap(self, func, iterable, chunksize=None): """ @@ -238,11 +257,17 @@ def starmap(self, func, iterable, chunksize=None): # multiprocessing.pool.Pool and mpi4py.future.ExecutorPool have different defaults if chunksize is None: chunksize = 1 - return self._interface.send_and_receive_dict( - input_dict={ - "fn": func, - "iterable": iterable, - "chunksize": chunksize, - "map": False, - } - ) + if self._interface.is_alive(): + return self._interface.send_and_receive_dict( + input_dict={ + "fn": func, + "iterable": iterable, + "chunksize": chunksize, + "map": False, + } + ) + else: + raise subprocess.SubprocessError( + "The subprocess exited with error code: ", + self._interface._process.returncode, + ) diff --git a/pympipool/shared_functions/external_interfaces.py b/pympipool/shared_functions/external_interfaces.py index 9f1d819d..67c7b2f2 100644 --- a/pympipool/shared_functions/external_interfaces.py +++ b/pympipool/shared_functions/external_interfaces.py @@ -271,6 +271,8 @@ def _execute_parallel_tasks_loop(interface, future_queue): raise thread_exeception elif "fn" in task_dict.keys() and "init" in task_dict.keys(): interface.send_dict(input_dict=task_dict) + if not interface.is_alive(): + break def _execute_serial_tasks_loop( @@ -296,6 +298,8 @@ def _execute_serial_tasks_loop( _update_future_dict( interface=interface, future_dict=future_dict, sleep_interval=sleep_interval ) + if not interface.is_alive(): + break def _update_future_dict(interface, future_dict, sleep_interval=0.1): From a69076f1dbd3fb078ca06de971294619c5f08d87 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Wed, 26 Jul 2023 10:31:21 -0600 Subject: [PATCH 2/6] no wait on queue --- .../shared_functions/external_interfaces.py | 32 +++++++++++-------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/pympipool/shared_functions/external_interfaces.py b/pympipool/shared_functions/external_interfaces.py index 67c7b2f2..857dece2 100644 --- a/pympipool/shared_functions/external_interfaces.py +++ b/pympipool/shared_functions/external_interfaces.py @@ -257,20 +257,24 @@ def get_parallel_subprocess_command( def _execute_parallel_tasks_loop(interface, future_queue): while True: - task_dict = future_queue.get() - if "shutdown" in task_dict.keys() and task_dict["shutdown"]: - interface.shutdown(wait=task_dict["wait"]) - break - elif "fn" in task_dict.keys() and "future" in task_dict.keys(): - f = task_dict.pop("future") - if f.set_running_or_notify_cancel(): - try: - f.set_result(interface.send_and_receive_dict(input_dict=task_dict)) - except Exception as thread_exeception: - f.set_exception(exception=thread_exeception) - raise thread_exeception - elif "fn" in task_dict.keys() and "init" in task_dict.keys(): - interface.send_dict(input_dict=task_dict) + try: + task_dict = future_queue.get_nowait() + except queue.Empty: + pass + else: + if "shutdown" in task_dict.keys() and task_dict["shutdown"]: + interface.shutdown(wait=task_dict["wait"]) + break + elif "fn" in task_dict.keys() and "future" in task_dict.keys(): + f = task_dict.pop("future") + if f.set_running_or_notify_cancel(): + try: + f.set_result(interface.send_and_receive_dict(input_dict=task_dict)) + except Exception as thread_exeception: + f.set_exception(exception=thread_exeception) + raise thread_exeception + elif "fn" in task_dict.keys() and "init" in task_dict.keys(): + interface.send_dict(input_dict=task_dict) if not interface.is_alive(): break From 60ac1d968753d312b4ab93090c72fbe187ae2a91 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Wed, 26 Jul 2023 10:40:06 -0600 Subject: [PATCH 3/6] try to cancel futures --- pympipool/shared_functions/external_interfaces.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/pympipool/shared_functions/external_interfaces.py b/pympipool/shared_functions/external_interfaces.py index 857dece2..3647b9be 100644 --- a/pympipool/shared_functions/external_interfaces.py +++ b/pympipool/shared_functions/external_interfaces.py @@ -267,16 +267,20 @@ def _execute_parallel_tasks_loop(interface, future_queue): break elif "fn" in task_dict.keys() and "future" in task_dict.keys(): f = task_dict.pop("future") - if f.set_running_or_notify_cancel(): + if f.set_running_or_notify_cancel() and interface.is_alive(): try: - f.set_result(interface.send_and_receive_dict(input_dict=task_dict)) + f.set_result( + interface.send_and_receive_dict(input_dict=task_dict) + ) except Exception as thread_exeception: f.set_exception(exception=thread_exeception) raise thread_exeception + elif not interface.is_alive(): + f.cancel() + cancel_items_in_queue(que=future_queue) + break elif "fn" in task_dict.keys() and "init" in task_dict.keys(): interface.send_dict(input_dict=task_dict) - if not interface.is_alive(): - break def _execute_serial_tasks_loop( @@ -303,6 +307,7 @@ def _execute_serial_tasks_loop( interface=interface, future_dict=future_dict, sleep_interval=sleep_interval ) if not interface.is_alive(): + cancel_items_in_queue(que=future_queue) break From fd64f5bb29d82d5e4649d6f65b0dda7fd8ac98de Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Wed, 26 Jul 2023 10:59:46 -0600 Subject: [PATCH 4/6] add sleep --- pympipool/external_interfaces/communication.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pympipool/external_interfaces/communication.py b/pympipool/external_interfaces/communication.py index 2049e724..1a814b89 100644 --- a/pympipool/external_interfaces/communication.py +++ b/pympipool/external_interfaces/communication.py @@ -1,4 +1,5 @@ import subprocess +from time import sleep import cloudpickle import zmq @@ -96,8 +97,11 @@ def bootup(self, command_lst, cwd=None, cores=None): ) def is_alive(self): + sleep(1) # give the process time to start up if self._process is not None and self._process.poll() is None: return True + elif self._queue_adapter is not None: + return True elif self._process is None: return False else: From d1f9212af0fb18daf08c19a4edaf27e06dd6b500 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Wed, 26 Jul 2023 11:11:24 -0600 Subject: [PATCH 5/6] use zmq poller --- pympipool/external_interfaces/communication.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/pympipool/external_interfaces/communication.py b/pympipool/external_interfaces/communication.py index 1a814b89..f6e49a31 100644 --- a/pympipool/external_interfaces/communication.py +++ b/pympipool/external_interfaces/communication.py @@ -17,6 +17,8 @@ class SocketInterface(object): def __init__(self, queue_adapter=None, queue_adapter_kwargs=None): self._context = zmq.Context() self._socket = self._context.socket(zmq.PAIR) + self._poller = zmq.Poller() + self._poller.register(self._socket, zmq.POLLIN) self._process = None self._queue_adapter = queue_adapter self._queue_adapter_kwargs = queue_adapter_kwargs @@ -38,7 +40,12 @@ def receive_dict(self): Returns: dict: dictionary with response received from the connected client """ - output = cloudpickle.loads(self._socket.recv()) + output = {} + while self.is_alive(): + socks = dict(self._poller.poll(1000)) + if socks and socks.get(self._socket) == zmq.POLLIN: + output = cloudpickle.loads(self._socket.recv(zmq.NOBLOCK)) + break if "result" in output.keys(): return output["result"] else: @@ -97,7 +104,6 @@ def bootup(self, command_lst, cwd=None, cores=None): ) def is_alive(self): - sleep(1) # give the process time to start up if self._process is not None and self._process.poll() is None: return True elif self._queue_adapter is not None: From d709f61d1a392a38e828d5630186193feebfa344 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Wed, 26 Jul 2023 11:37:37 -0600 Subject: [PATCH 6/6] try poll() --- pympipool/external_interfaces/communication.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/pympipool/external_interfaces/communication.py b/pympipool/external_interfaces/communication.py index f6e49a31..2d3a9c8c 100644 --- a/pympipool/external_interfaces/communication.py +++ b/pympipool/external_interfaces/communication.py @@ -17,8 +17,6 @@ class SocketInterface(object): def __init__(self, queue_adapter=None, queue_adapter_kwargs=None): self._context = zmq.Context() self._socket = self._context.socket(zmq.PAIR) - self._poller = zmq.Poller() - self._poller.register(self._socket, zmq.POLLIN) self._process = None self._queue_adapter = queue_adapter self._queue_adapter_kwargs = queue_adapter_kwargs @@ -40,12 +38,10 @@ def receive_dict(self): Returns: dict: dictionary with response received from the connected client """ - output = {} while self.is_alive(): - socks = dict(self._poller.poll(1000)) - if socks and socks.get(self._socket) == zmq.POLLIN: - output = cloudpickle.loads(self._socket.recv(zmq.NOBLOCK)) + if self._socket.poll(timeout=1000) != 0: break + output = cloudpickle.loads(self._socket.recv()) if "result" in output.keys(): return output["result"] else: