# MPI for Python (mpi4py)
It allows the program to be parallely executed with messages being passed between nodes.

After struggling with the installation, start the cluster with:

ipcluster start --engines=MPI -n 4 [link](https://github.com/ipython/ipyparallel/blob/master/examples/Using%20MPI%20with%20IPython%20Parallel.ipynb)

In [1]:
import ipyparallel as ipp
rc = ipp.Client()
view = rc[:]

In [2]:
@view.remote(block=True)
def mpi_rank():
    from mpi4py import MPI
    comm = MPI.COMM_WORLD
    return comm.Get_rank()

### test that is working

In [3]:
mpi_rank()


[1, 2, 3, 0]

In [13]:
mpi_rank.block = False
ar = mpi_rank()
ar.get_dict()

{0: 0, 1: 1, 2: 2, 3: 3}

With %%px cell magic, the next cell will actually execute entirely on each engine

In [14]:
%%px
from mpi4py import MPI

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

if rank == 0:
   data = [(i+1)**2 for i in range(size)]
else:
   data = None
data = comm.scatter(data, root=0)

assert data == (rank+1)**2, 'data=%s, rank=%s' % (data, rank)
{
    'data': data,
    'rank': rank,
}

[0;31mOut[0:2]: [0m{'data': 1, 'rank': 0}

[0;31mOut[1:2]: [0m{'data': 4, 'rank': 1}

[0;31mOut[2:2]: [0m{'data': 9, 'rank': 2}

[0;31mOut[3:2]: [0m{'data': 16, 'rank': 3}

In [32]:
%%px
from mpi4py import MPI
comm=MPI.COMM_WORLD
print 'nodes', MPI.COMM_WORLD.size
rank = comm.rank
print rank

[stdout:0] 
nodes 4
3
[stdout:1] 
nodes 4
0
[stdout:2] 
nodes 4
2
[stdout:3] 
nodes 4
1


# Hello world!
basic commands to communicate with the cluster

### Accessors

In [33]:
%%px
rank = MPI.COMM_WORLD.Get_rank()
size = MPI.COMM_WORLD.Get_size()
name = MPI.Get_processor_name()

print ("Hello, World! "
       "I am process %d of %d on %s" %
       (rank, size, name))

[stdout:0] Hello, World! I am process 3 of 4 on Joses-Mac.local
[stdout:1] Hello, World! I am process 0 of 4 on Joses-Mac.local
[stdout:2] Hello, World! I am process 2 of 4 on Joses-Mac.local
[stdout:3] Hello, World! I am process 1 of 4 on Joses-Mac.local


### Split the data

In [34]:
%%px
if rank < size//2:
    color = 55
    key = -rank
else:
    color = 77
    key = +rank
newcomm = MPI.COMM_WORLD.Split(color, key)
newcomm.Free()
print rank, color, key

[stdout:0] 3 77 3
[stdout:1] 0 55 0
[stdout:2] 2 77 2
[stdout:3] 1 55 -1


### Exchange values
use dest and source from and to where send values

In [35]:
%%px
comm = MPI.COMM_WORLD

sendmsg = [comm.rank]*3
right = (comm.rank + 1) % comm.size
left = (comm.rank - 1) % comm.size

req1 = comm.isend(sendmsg, dest=right)
req2 = comm.isend(sendmsg, dest=left)
lmsg = comm.recv(source=left)
rmsg = comm.recv(source=right)

MPI.Request.Waitall([req1, req2])
assert lmsg == [left] * 3
assert rmsg == [right] * 3
print comm.rank, left, sendmsg

[stdout:0] 3 2 [3, 3, 3]
[stdout:1] 0 3 [0, 0, 0]
[stdout:2] 2 1 [2, 2, 2]
[stdout:3] 1 0 [1, 1, 1]


In [None]:
%%px
import numpy
comm = MPI.COMM_WORLD
assert comm.size == 2

if comm.rank == 0:
    array1 = numpy.arange(10000, dtype='f')
    array2 = numpy.empty(10000, dtype='f')
    target = 1
else:
    array1 = numpy.ones(10000, dtype='f')
    array2 = numpy.empty(10000, dtype='f')
    target = 0

request = comm.Isend([array1, MPI.FLOAT], dest=target)
comm.Recv([array2, MPI.FLOAT], source=target)
request.Wait()

### Broadcast
(one to all)

In [16]:
%%px
comm = MPI.COMM_WORLD

if comm.rank == 0:
    sendmsg = (7, "abc", [1.0,2+3j], {3:4})
else:
    sendmsg = None

recvmsg = comm.bcast(sendmsg, root=0)
print comm.rank, recvmsg

[stdout:0] 0 (7, 'abc', [1.0, (2+3j)], {3: 4})
[stdout:1] 1 (7, 'abc', [1.0, (2+3j)], {3: 4})
[stdout:2] 2 (7, 'abc', [1.0, (2+3j)], {3: 4})
[stdout:3] 3 (7, 'abc', [1.0, (2+3j)], {3: 4})


### Scatter

In [18]:
%%px
comm = MPI.COMM_WORLD

if comm.rank == 0:
    sendmsg = [i**2 for i in range(comm.size)]
else:
    sendmsg = None

recvmsg = comm.scatter(sendmsg, root=0)
print comm.rank, recvmsg

[stdout:0] 0 0
[stdout:1] 1 1
[stdout:2] 2 4
[stdout:3] 3 9


### Gather & Gather to All

In [19]:
%%px
comm = MPI.COMM_WORLD

sendmsg = comm.rank**2

recvmsg1 = comm.gather(sendmsg, root=0)
recvmsg2 = comm.allgather(sendmsg)
print comm.rank, recvmsg1, recvmsg2

[stdout:0] 0 [0, 1, 4, 9] [0, 1, 4, 9]
[stdout:1] 1 None [0, 1, 4, 9]
[stdout:2] 2 None [0, 1, 4, 9]
[stdout:3] 3 None [0, 1, 4, 9]


### Reduce & Reduce to All

In [20]:
%%px
comm = MPI.COMM_WORLD

sendmsg = comm.rank

recvmsg1 = comm.reduce(sendmsg, op=MPI.SUM, root=0)
recvmsg2 = comm.allreduce(sendmsg)
print comm.rank, sendmsg, recvmsg1, recvmsg2

[stdout:0] 0 0 6 6
[stdout:1] 1 1 None 6
[stdout:2] 2 2 None 6
[stdout:3] 3 3 None 6


## Compute Pi

In [30]:
%%px
import math

def compute_pi(n, start=0, step=1):
    h = 1.0 / n
    s = 0.0
    for i in range(start, n, step):
        x = h * (i + 0.5)
        s += 4.0 / (1.0 + x**2)
    return s * h

comm = MPI.COMM_WORLD
nprocs = comm.Get_size()
myrank = comm.Get_rank()

if myrank == 0:
    n = 10
else:
    n = None

n = comm.bcast(n, root=0)

mypi = compute_pi(n, myrank, nprocs)
print myrank, mypi
pi = comm.reduce(mypi, op=MPI.SUM, root=0)

if myrank == 0:
    error = abs(pi - math.pi)
    print ("pi is approximately %.16f, "
        "error is %.16f" % (pi, error))

[stdout:0] 
0 0.963863435985
pi is approximately 3.1424259850010983, error is 0.0008333314113051
[stdout:1] 1 0.908549442942
[stdout:2] 2 0.657665667321
[stdout:3] 3 0.612347438753


### an example
Having a vector of size N an elements x_i, want to compute a function f to each element
and return another vector of size N with values f(x_i)

In [71]:
%%timeit
%%px
import numpy as np
import scipy as sp
from mpi4py import MPI

def funct(x):
    return np.random.normal(0., 1.0)

def par_fucnt(N):
    N= N
    x = np.arange(N)
    #print x, map(funct, x)

    comm = MPI.COMM_WORLD
    nprocs = comm.Get_size()
    myrank = comm.Get_rank()


    chunks = [[] for _ in np.arange(nprocs)]
    n= np.ceil(float(len(x))/nprocs)

    for i, chunk in enumerate(x):
        chunks[int(i//n)].append(chunk)
      
    #print [map(funct, chuck) for chuck in chunks ]

    scatter= comm.scatter(chunks, root=0)
    result_per_node = map(funct, scatter)
    result_gather   = comm.gather(result_per_node, root=0)

    #final_result= np.array(result_gather)
    #print final_result.flatten()  # only work if sublist have same # elements

    if myrank == 0:
        final_result = [item for sublist in result_gather for item in sublist]
    else:
        final_result = None

    #print final_result 
    #print comm.rank, scatter, result_per_node
    #print result_gather

par_fucnt(1000000)

1 loop, best of 3: 5.57 s per loop


In [69]:
%%timeit
par_fucnt(1000000)

1 loop, best of 3: 9.93 s per loop


In [70]:
%%timeit 
x = np.arange(1000000)
map(funct, x)

1 loop, best of 3: 289 ms per loop


In [75]:

def like(x):
    return -((x[0])**2+(x[1])**2/1.0-1.5*x[0]*x[1])/2.0

x = [(1, 2),(2,3)]
print map(like, x)

[-1.0, -2.0]
