Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions pympipool/external_interfaces/communication.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import subprocess
from time import sleep

import cloudpickle
import zmq
Expand Down Expand Up @@ -37,6 +38,9 @@ def receive_dict(self):
Returns:
dict: dictionary with response received from the connected client
"""
while self.is_alive():
if self._socket.poll(timeout=1000) != 0:
break
output = cloudpickle.loads(self._socket.recv())
if "result" in output.keys():
return output["result"]
Expand Down Expand Up @@ -95,6 +99,18 @@ def bootup(self, command_lst, cwd=None, cores=None):
cwd=cwd,
)

def is_alive(self):
if self._process is not None and self._process.poll() is None:
return True
elif self._queue_adapter is not None:
return True
elif self._process is None:
return False
else:
raise subprocess.SubprocessError(
"The subprocess exited with error code: ", self._process.returncode
)

def shutdown(self, wait=True):
result = None
if self._process is not None and self._process.poll() is None:
Expand Down
89 changes: 57 additions & 32 deletions pympipool/external_interfaces/pool.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from abc import ABC
import subprocess

from pympipool.external_interfaces.communication import SocketInterface
from pympipool.shared_functions.external_interfaces import (
Expand Down Expand Up @@ -107,14 +108,20 @@ def map(self, func, iterable, chunksize=None):
# multiprocessing.pool.Pool and mpi4py.future.ExecutorPool have different defaults
if chunksize is None:
chunksize = 1
return self._interface.send_and_receive_dict(
input_dict={
"fn": func,
"iterable": iterable,
"chunksize": chunksize,
"map": True,
}
)
if self._interface.is_alive():
return self._interface.send_and_receive_dict(
input_dict={
"fn": func,
"iterable": iterable,
"chunksize": chunksize,
"map": True,
}
)
else:
raise subprocess.SubprocessError(
"The subprocess exited with error code: ",
self._interface._process.returncode,
)

def starmap(self, func, iterable, chunksize=None):
"""
Expand All @@ -131,14 +138,20 @@ def starmap(self, func, iterable, chunksize=None):
# multiprocessing.pool.Pool and mpi4py.future.ExecutorPool have different defaults
if chunksize is None:
chunksize = 1
return self._interface.send_and_receive_dict(
input_dict={
"fn": func,
"iterable": iterable,
"chunksize": chunksize,
"map": False,
}
)
if self._interface.is_alive():
return self._interface.send_and_receive_dict(
input_dict={
"fn": func,
"iterable": iterable,
"chunksize": chunksize,
"map": False,
}
)
else:
raise subprocess.SubprocessError(
"The subprocess exited with error code: ",
self._interface._process.returncode,
)


class MPISpawnPool(PoolBase):
Expand Down Expand Up @@ -214,14 +227,20 @@ def map(self, func, iterable, chunksize=None):
# multiprocessing.pool.Pool and mpi4py.future.ExecutorPool have different defaults
if chunksize is None:
chunksize = 1
return self._interface.send_and_receive_dict(
input_dict={
"fn": func,
"iterable": iterable,
"chunksize": chunksize,
"map": True,
}
)
if self._interface.is_alive():
return self._interface.send_and_receive_dict(
input_dict={
"fn": func,
"iterable": iterable,
"chunksize": chunksize,
"map": True,
}
)
else:
raise subprocess.SubprocessError(
"The subprocess exited with error code: ",
self._interface._process.returncode,
)

def starmap(self, func, iterable, chunksize=None):
"""
Expand All @@ -238,11 +257,17 @@ def starmap(self, func, iterable, chunksize=None):
# multiprocessing.pool.Pool and mpi4py.future.ExecutorPool have different defaults
if chunksize is None:
chunksize = 1
return self._interface.send_and_receive_dict(
input_dict={
"fn": func,
"iterable": iterable,
"chunksize": chunksize,
"map": False,
}
)
if self._interface.is_alive():
return self._interface.send_and_receive_dict(
input_dict={
"fn": func,
"iterable": iterable,
"chunksize": chunksize,
"map": False,
}
)
else:
raise subprocess.SubprocessError(
"The subprocess exited with error code: ",
self._interface._process.returncode,
)
41 changes: 27 additions & 14 deletions pympipool/shared_functions/external_interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,20 +257,30 @@ def get_parallel_subprocess_command(

def _execute_parallel_tasks_loop(interface, future_queue):
while True:
task_dict = future_queue.get()
if "shutdown" in task_dict.keys() and task_dict["shutdown"]:
interface.shutdown(wait=task_dict["wait"])
break
elif "fn" in task_dict.keys() and "future" in task_dict.keys():
f = task_dict.pop("future")
if f.set_running_or_notify_cancel():
try:
f.set_result(interface.send_and_receive_dict(input_dict=task_dict))
except Exception as thread_exeception:
f.set_exception(exception=thread_exeception)
raise thread_exeception
elif "fn" in task_dict.keys() and "init" in task_dict.keys():
interface.send_dict(input_dict=task_dict)
try:
task_dict = future_queue.get_nowait()
except queue.Empty:
pass
else:
if "shutdown" in task_dict.keys() and task_dict["shutdown"]:
interface.shutdown(wait=task_dict["wait"])
break
elif "fn" in task_dict.keys() and "future" in task_dict.keys():
f = task_dict.pop("future")
if f.set_running_or_notify_cancel() and interface.is_alive():
try:
f.set_result(
interface.send_and_receive_dict(input_dict=task_dict)
)
except Exception as thread_exeception:
f.set_exception(exception=thread_exeception)
raise thread_exeception
elif not interface.is_alive():
f.cancel()
cancel_items_in_queue(que=future_queue)
break
elif "fn" in task_dict.keys() and "init" in task_dict.keys():
interface.send_dict(input_dict=task_dict)


def _execute_serial_tasks_loop(
Expand All @@ -296,6 +306,9 @@ def _execute_serial_tasks_loop(
_update_future_dict(
interface=interface, future_dict=future_dict, sleep_interval=sleep_interval
)
if not interface.is_alive():
cancel_items_in_queue(que=future_queue)
break


def _update_future_dict(interface, future_dict, sleep_interval=0.1):
Expand Down