Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[MRG] BUG: send length of data on MPI child completion #196

Merged
merged 10 commits into from Oct 21, 2020
13 changes: 12 additions & 1 deletion doc/parallel.rst
Expand Up @@ -32,7 +32,7 @@ This backend will use MPI (Message Passing Interface) on the system to split neu

**MacOS Dependencies**::

$ conda install yes openmpi mpi4py
$ conda install -y openmpi mpi4py
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the contributing guide still recommends the pip install. Perhaps we should update it and point here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't really tell why mpi4py is included in "Building the Documentation". It seems like it would fail on macs. Also, it may not even be necessary for just building the docs. I think the plot_simulate_evoked.py example will fall back to the Joblib backend.

I did add a link to this page at the top of the contribution guide.

$ pip install psutil

**MacOS Environment**::
Expand Down Expand Up @@ -70,3 +70,14 @@ Verifies that MPI, NEURON, and Python are all working together.
# set n_procs to the number of processors MPI can use (up to number of cores on system)
with MPIBackend(n_procs=2):
dpls = simulate_dipole(net, n_trials=1)

**Notes for contributors**::

MPI parallelization with NEURON requires that the simulation be launched with the ``nrniv`` binary from the command-line. The ``mpiexec`` command is used to launch multiple ``nrniv`` processes which communicate via MPI. This is done using ``subprocess.Popen()`` in ``MPIBackend.simulate()`` to launch parallel child processes (``MPISimulation``) to carry out the simulation. The communication sequence between ``MPIBackend`` and ``MPISimulation`` is outlined below.

#. In order to pass the parameters from ``MPIBackend`` the child ``MPISimulation`` processes' ``stdin`` is used. Parameters are pickled and base64 encoded before being written to the processes' ``stdin``. Closing ``stdin`` (from the ``MPIBackend`` side) signals to the child processes that it is done sending parameters and the parallel simulation should begin.
#. Output from the simulation (either to ``stdout`` or ``stderr``) is communicated back to ``MPIBackend``, where it will be printed to the console. Typical output at this point would be simulation progress messages as well as any MPI warnings/errors during the simulation.
#. Once the simulation has completed, the child process with rank 0 (in ``MPISimulation.run()``) sends a signal to ``MPIBackend`` that the simulation has completed and simulation data will be written to ``stderr``. The data is pickled and base64 encoded before it is written to ``stderr`` in ``MPISimulation._write_data_stderr()``. No other output (e.g. raised exceptions) can go to ``stderr`` during this step.
#. At this point, the child process with rank 0 (the only rank with complete simulation results) will send another signal that includes the expected length of the pickled and encoded data (in bytes) to ``stdout``. ``MPIBackend`` will use this signal to know that data transfer has completed and check the length of data it receives.

It is important that ``MPISimulation`` uses the ``flush()`` method on ``stdout`` before and after each signal to ensure that the signal will immediately be available for reading by ``MPIBackend`` and not buffered with other output.
170 changes: 108 additions & 62 deletions hnn_core/mpi_child.py
Expand Up @@ -5,6 +5,8 @@
# Authors: Blake Caldwell <blake_caldwell@brown.edu>

import sys
import pickle
import codecs


def _read_all_bytes(stream_in, chunk_size=4096):
Expand All @@ -18,79 +20,123 @@ def _read_all_bytes(stream_in, chunk_size=4096):
return all_data


def run_mpi_simulation():
from mpi4py import MPI

import pickle
import codecs

from hnn_core import Network
from hnn_core.network_builder import NetworkBuilder, _simulate_single_trial

# using template for reading stdin from:
# https://github.com/cloudpipe/cloudpickle/blob/master/tests/testutils.py

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

# get parameters from stdin
if rank == 0:
stream_in = sys.stdin
# Force the use of bytes streams under Python 3
if hasattr(sys.stdin, 'buffer'):
stream_in = sys.stdin.buffer
input_bytes = _read_all_bytes(stream_in)
stream_in.close()

params = pickle.loads(codecs.decode(input_bytes, "base64"))
else:
params = None

params = comm.bcast(params, root=0)
net = Network(params)
# XXX store the initial prng_seedcore params to be referenced in each trial
prng_seedcore_initial = net.params['prng_*'].copy()

sim_data = []
for trial_idx in range(params['N_trials']):
# XXX this should be built into NetworkBuilder
# update prng_seedcore params to provide jitter between trials
for param_key in prng_seedcore_initial.keys():
net.params[param_key] = (prng_seedcore_initial[param_key] +
trial_idx)
neuron_net = NetworkBuilder(net)
dpl = _simulate_single_trial(neuron_net, trial_idx)
if rank == 0:
spikedata = neuron_net.get_data_from_neuron()
sim_data.append((dpl, spikedata))

# flush output buffers from all ranks
sys.stdout.flush()
sys.stderr.flush()

if rank == 0:
# the parent process is waiting for the string "sim_complete"
# to signal that the output will only contain sim_data
sys.stdout.write('sim_complete')
sys.stdout.flush() # flush to ensure signal is not buffered
class MPISimulation(object):
"""The MPISimulation class.
Parameters
----------
skip_mpi_import : bool | None
Skip importing MPI. Only useful for testing with pytest.

Attributes
----------
comm : mpi4py.Comm object
The handle used for communicating among MPI processes
rank : int
The rank for each processor part of the MPI communicator
"""

def __init__(self, skip_mpi_import=False):
self.skip_mpi_import = skip_mpi_import
if skip_mpi_import:
self.rank = 0
else:
from mpi4py import MPI

self.comm = MPI.COMM_WORLD
self.rank = self.comm.Get_rank()

def __enter__(self):
return self

def __exit__(self, type, value, traceback):
# skip Finalize() if we didn't import MPI on __init__
if hasattr(self, 'comm'):
from mpi4py import MPI
MPI.Finalize()

def _read_params(self):
"""Read params broadcasted to all ranks on stdin"""

# get parameters from stdin
if self.rank == 0:
input_bytes = _read_all_bytes(sys.stdin.buffer)
sys.stdin.close()

params = pickle.loads(codecs.decode(input_bytes, "base64"))
else:
params = None

params = self.comm.bcast(params, root=0)
return params

def _write_data_stderr(self, sim_data):
"""write base64 encoded data to stderr"""

# only have rank 0 write to stdout/stderr
if self.rank > 0:
return

# pickle the data and encode as base64 before sending to stderr
pickled_str = pickle.dumps(sim_data)
pickled_bytes = codecs.encode(pickled_str,
'base64')
pickled_bytes = codecs.encode(pickled_str, 'base64')

# base64 encoding requires data padded to a multiple of 4
padding = len(pickled_bytes) % 4
pickled_bytes += b"===" * padding
sys.stderr.write(pickled_bytes.decode())
sys.stderr.flush()

MPI.Finalize()
return 0
# the parent process is waiting for "end_of_sim:[#bytes]" with the
# length of data
sys.stdout.write('end_of_data:%d' % len(pickled_bytes))
sys.stdout.flush() # flush to ensure signal is not buffered

def run(self, params):
"""Run MPI simulation(s) and write results to stderr"""

from hnn_core import Network
from hnn_core.parallel_backends import _clone_and_simulate

prng_seedcore_initial = params['prng_*']

net = Network(params)
sim_data = []
for trial_idx in range(params['N_trials']):
single_sim_data = _clone_and_simulate(net, trial_idx,
prng_seedcore_initial)

# go ahead and append trial data for each rank, though
# only rank 0 has data that should be sent back to MPIBackend
sim_data.append(single_sim_data)

# flush output buffers from all ranks (any errors or status mesages)
sys.stdout.flush()
sys.stderr.flush()

if self.rank == 0:
# the parent process is waiting for "end_of_sim" to signal that
# the following stderr will only contain sim_data
sys.stdout.write('end_of_sim')
sys.stdout.flush() # flush to ensure signal is not buffered

return sim_data


if __name__ == '__main__':
"""This file is called on command-line from nrniv"""

import traceback
rc = 0

try:
rc = run_mpi_simulation()
except Exception as e:
with MPISimulation() as mpi_sim:
params = mpi_sim._read_params()
sim_data = mpi_sim.run(params)
mpi_sim._write_data_stderr(sim_data)
except Exception:
# This can be useful to indicate the problem to the
# caller (in parallel_backends.py)
print("Exception: %s" % e)
traceback.print_exc(file=sys.stdout)
blakecaldwell marked this conversation as resolved.
Show resolved Hide resolved
rc = 2

sys.exit(rc)