# More efficient data movement with MPI

Just as [we did](memmap.ipynb) manually with memmap,
you can move data more efficiently with MPI: send it to just one engine,
then use MPI to broadcast it to the rest of the engines.


In [1]:
import socket
import os, sys, re

import numpy as np

import ipyparallel as parallel

For this demo, I will connect to a cluster whose engines were started with MPI.
If you have MPI and mpi4py installed on your machine, you can start a local MPI cluster with:

    ipcluster start -n 8 --engines=MPI --profile mpi

In [2]:
# Connect to the MPI-backed cluster. `eall` addresses every engine, while
# `root` is the single engine that will receive full payloads from the client.
mpi_profile = 'mpi'
rc = parallel.Client(profile=mpi_profile)
eall, root = rc[:], rc[-1]

In [3]:
# Run on the engines (via %px): bind the world communicator as `MPI`.
# NOTE: `MPI` is the COMM_WORLD communicator object, not the mpi4py.MPI module.
%px from mpi4py.MPI import COMM_WORLD as MPI

In [4]:
# Map engine id -> MPI rank. As the output shows, the two numberings
# need not coincide, so the root engine's rank is looked up explicitly.
mpi_ranks = (
    eall.apply_async(lambda: MPI.Get_rank())
        .get_dict()
)
root_rank = root.apply_async(lambda: MPI.Get_rank()).get()
mpi_ranks

{0: 0, 1: 2, 2: 3, 3: 1}

In [5]:
# Build a symmetric (Gram) matrix to ship around the cluster.
# The generator is seeded so that Restart & Run All reproduces the norms
# checked at the end. `data` is derived from `samples` rather than overwritten
# in place, so re-running this cell always yields the same array.
sz = 256
rng = np.random.RandomState(42)
samples = rng.random_sample((sz, sz))
data = samples.dot(samples.T)

In [6]:
%%time 
ar = eall.push({'data': data}, block=False)
ar.wait_interactive()

   4/4 tasks finished after    0 s
done
CPU times: user 16.4 ms, sys: 6.95 ms, total: 23.3 ms
Wall time: 79.1 ms


In [7]:
@parallel.interactive
def _bcast(key, root_rank):
    """Engine-side half of `broadcast`.

    This is an MPI collective: every rank of the communicator is expected to
    call it. Rank `root_rank` contributes its global named `key` (None if it
    has no such global); every calling rank then stores the value it receives
    under that same global name.
    """
    # @interactive makes globals() refer to the engine's user namespace.
    namespace = globals()
    namespace[key] = MPI.bcast(namespace.get(key), root_rank)

def broadcast(key, obj, dv, root, root_rank):
    """Distribute `obj` to the engines in `dv` under the global name `key`.

    Rather than pushing the (possibly large) object to every engine, push it
    once to `root`, then have every engine in `dv` run `_bcast` so MPI copies
    it from `root_rank` to the others.

    Still O(N) messages, but all but one message is always small.

    Args:
        key: global name the object is stored under on each engine.
        obj: the object to distribute.
        dv: view over the engines that should take part in the broadcast.
        root: single-engine view that receives `obj` from the client.
        root_rank: MPI rank of `root`.
            NOTE(review): presumably this must match `root` and `dv` must
            include every rank, or the collective may hang or send the
            wrong value. Confirm.

    Returns:
        The async result of the `_bcast` call. The push to `root` is
        fire-and-forget and its result is not tracked.
    """
    payload = {key: obj}
    root.push(payload, block=False)
    pending = dv.apply_async(_bcast, key, root_rank)
    return pending

In [8]:
%%time
ar = broadcast('data', data, eall, root, root_rank)
ar.wait_interactive()

   4/4 tasks finished after    0 s
done
CPU times: user 22.4 ms, sys: 7 ms, total: 29.4 ms
Wall time: 75.8 ms


In [9]:
# Sanity check: every engine should now hold an identical copy of `data`,
# so the 2-norm computed remotely must agree with the one computed locally.
engine_norms = eall.apply_sync(np.linalg.norm, parallel.Reference('data'), 2)
assert np.allclose(engine_norms, np.linalg.norm(data, 2)), engine_norms
engine_norms

[16429.257994180392,
 16429.257994180392,
 16429.257994180392,
 16429.257994180392]