Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[MRG] BUG: send length of data on MPI child completion #196

Merged
merged 10 commits into from Oct 21, 2020
16 changes: 12 additions & 4 deletions hnn_core/mpi_child.py
Expand Up @@ -69,18 +69,26 @@ def run_mpi_simulation():
sys.stderr.flush()

if rank == 0:
# the parent process is waiting for the string "sim_complete"
# the parent process is waiting for the string "end_of_sim"
# to signal that the output will only contain sim_data
sys.stdout.write('sim_complete')
sys.stdout.write('end_of_sim')
sys.stdout.flush() # flush to ensure signal is not buffered

# pickle the data and encode as base64 before sending to stderr
pickled_str = pickle.dumps(sim_data)
pickled_bytes = codecs.encode(pickled_str,
'base64')
pickled_bytes = codecs.encode(pickled_str, 'base64')

# base64 encoding requires data padded to a multiple of 4
padding = len(pickled_bytes) % 4
pickled_bytes += b"===" * padding
sys.stderr.write(pickled_bytes.decode())
sys.stderr.flush()

# the parent process is waiting for the string "end_of_sim:"
# with the expected length of data it that it received
sys.stdout.write('end_of_data:%d' % len(pickled_bytes))
sys.stdout.flush()

MPI.Finalize()
return 0

Expand Down
55 changes: 38 additions & 17 deletions hnn_core/parallel_backends.py
Expand Up @@ -12,6 +12,7 @@
from warnings import warn
from subprocess import Popen
import selectors
import binascii

_BACKEND = None

Expand Down Expand Up @@ -230,20 +231,20 @@ def _read_data(self, fd, mask):

# only _read_stdout gets a signal from the fd. the end of simulation
# data is signalled by the process terminating
return False
return None

def _read_stdout(self, fd, mask):
"""read from fd until receiving the process simulation is complete"""
data = os.read(fd, 4096)
if data:
str_data = data.decode()
if str_data == 'sim_complete':
return True
if str_data == 'end_of_sim' or str_data.startswith('end_of_data'):
return str_data

# output from process includes newlines
sys.stdout.write(str_data)

return False
return None

def simulate(self, net):
"""Simulate the HNN model in parallel on all cores
Expand Down Expand Up @@ -317,20 +318,30 @@ def simulate(self, net):

# loop while the process is running
while proc.poll() is None:
# wait for an event on the selector, timeout after 1s and check
# that process is still running
# wait for an event on the selector, timeout after 1s
events = self.sel.select(timeout=1)
for key, mask in events:
callback = key.data
finished = callback(key.fileobj, mask)
if finished:
# finishied receiving printable output
self.sel.unregister(pipe_stdout_r)
self.sel.unregister(pipe_stderr_r)
# everything received should now be handled by _read_data
self.sel.register(pipe_stderr_r, selectors.EVENT_READ,
self._read_data)

completion_singal = callback(key.fileobj, mask)
blakecaldwell marked this conversation as resolved.
Show resolved Hide resolved
if completion_singal is not None:
if completion_singal == "end_of_sim":
# finishied receiving printable output
# everything else received is data
self.sel.unregister(pipe_stderr_r)
self.sel.register(pipe_stderr_r, selectors.EVENT_READ,
self._read_data)
elif completion_singal.startswith("end_of_data"):
split_string = completion_singal.split(':')
if len(split_string) > 1:
data_length = int(split_string[1])
self.sel.unregister(pipe_stdout_r)
else:
raise ValueError("Invalid data send completion "
"signal from child MPI process")
# there could still be data in stderr, so we return
# to waiting until the process ends

# cleanup the selector
self.sel.unregister(pipe_stderr_r)
self.sel.close()

Expand All @@ -344,11 +355,21 @@ def simulate(self, net):
if proc.returncode != 0:
raise RuntimeError("MPI simulation failed")

if not data_length == len(self.proc_data_bytes):
raise RuntimeError("Failed to receive all data from the child MPI"
" process. Expecting %d bytes, got %d" %
(data_length, len(self.proc_data_bytes)))

if len(self.proc_data_bytes) == 0:
raise RuntimeError("MPI simulation didn't return any data")

# decode base64 object
data_pickled = codecs.decode(self.proc_data_bytes, "base64")
# decode base64 byte string
try:
data_pickled = codecs.decode(self.proc_data_bytes, "base64")
except binascii.Error:
raise ValueError("Incorrect padding for data length %d bytes" %
len(self.proc_data_bytes), "(mod 4 = %d)" %
len(self.proc_data_bytes) % 4)
blakecaldwell marked this conversation as resolved.
Show resolved Hide resolved

# unpickle the data
sim_data = pickle.loads(data_pickled)
Expand Down