Parallel Computing with Python using a cluster
================================

**[Rodrigo Nemmen](http://rodrigonemmen.com), IAG USP**

This IPython notebook illustrates a few simple ways of doing parallel computing. It assumes you have a parallel IPython cluster running over SSH as described in this [tutorial](http://astropython.blogspot.com.br/2016/02/how-to-setup-ipython-parallel-cluster.html). Launch the cluster:

    ipcluster start --profile='ssh'

Practical examples included in this notebook:

1. Parallel function mapping to a list of arguments (multiprocessing module)
2. Parallel execution of array function (scatter/gather) + parallel execution of scripts
4. Easy parallel Monte Carlo (parallel magics)

In [1]:
%pylab inline

Populating the interactive namespace from numpy and matplotlib


In [2]:
import scipy
import nemmen

# 1. Mapping a model to a grid of parameters 

<!---
Inspired on "useful parallel".
-->

Uses the *multiprocessing* module that comes by default with python, i.e. method independent of IPython.

Idea: you have a function $f(\mathbf{x},\mathbf{y})$ of two parameters (e.g., $f$ may represent your model) stored in the arrays $(\mathbf{x},\mathbf{y})$. Given the arrays $\mathbf{x}$ and $\mathbf{y}$, you want to compute the values of $f(\mathbf{x},\mathbf{y})$. Let's assume for simplicity that there is no dependence on the neighbours. This is an embarassingly parallel problem.

<!---
### TODO

* Random sampling of parameter space if desired
-->

In [2]:
import multiprocessing

Time wasting function that depends on two parameters. Here, I generate a lot of random numbers based on the normal distribution. The two parameters are $\mu$ and $\sigma$.

In [3]:
def f(z):
    x=nemmen.random_normal(z[0], z[1], 100000)
    return x.sum()

Arrays of input parameters. You could easily modify this to take as input a matrix, not two arrays.

In [4]:
n=3000
X=numpy.linspace(-1,1,n) # mean
Y=numpy.linspace(0.1,1,n) # std. dev.

In [5]:
# creates list of arguments [Xi, Yi]
pargs=[]	# this is a list of lists!
for i in range(X.size):
	pargs.append([X[i],Y[i]])

Parallel execution. Check out all the cores being used.

In [6]:
ncores=multiprocessing.cpu_count() # number of cores
pool = multiprocessing.Pool(processes=ncores) # initializes parallel engine

In [10]:
%%time 
t=pool.map(f, pargs)	# parallel function map

CPU times: user 43.6 ms, sys: 1.88 ms, total: 45.5 ms
Wall time: 3.94 s


In [11]:
pool.close()	# close the parallel engine

Serial execution

In [12]:
%time t=map(f, pargs)

CPU times: user 9.88 s, sys: 19 ms, total: 9.89 s
Wall time: 9.94 s


In [None]:
# if you want to convert list to matrix 
#y=array(t)

Note that there is a similar map method for IPython parallel.

# 2. Parallel execution of array function

Uses IPython parallel. Consider a function $f(x)$ which takes an array $x$ containing the grid of input parameters. We want to split the function calls ("split the array") to the different cores in our machine:

![test](http://computing.llnl.gov/tutorials/parallel_comp/images/array_proc1.gif)
![test](http://computing.llnl.gov/tutorials/parallel_comp/images/array_proc2.gif)

Our time-waster function $f(x)$ that can be applied to an array of integers

In [3]:
# test if n is prime
def isprime(n):
    for i in range(3, n):
        if n % i == 0:
            return False
    return True

# tests each element of an array if it is prime
def f(x):
    return map(isprime,x) 

Generates big array of integers (in this case, 10k elements)

In [4]:
x = scipy.random.randint(0,100000, (10000,)) 

Serial execution

In [5]:
%time y=f(x)

CPU times: user 19.8 s, sys: 236 ms, total: 20 s
Wall time: 20.1 s


Now explain how IPython parallel works **(slides)**

In [6]:
import ipyparallel
client = ipyparallel.Client(profile='ssh')

We are going to use the direct view, which means that commands always run on all nodes. This as opposed to a balanced view, which asynchronously executes code on nodes which are idle. In addition, we are going to turn blocking on. This means that jobs will block further execution until all nodes have finished.

In [8]:
direct = client[:]
direct.block = True

Splits the input array $x$ between the cores

In [9]:
direct.scatter('x',x)

Verify that the array was indeed divided equally among the 24 engines

In [10]:
direct['x.size']

[417,
 417,
 417,
 417,
 417,
 417,
 417,
 417,
 417,
 417,
 417,
 417,
 417,
 417,
 417,
 417,
 416,
 416,
 416,
 416,
 416,
 416,
 416,
 416]

In [11]:
direct['x']

[array([43609, 71158,  3753, 34684, 95499, 85862, 18642, 22492, 68964,
        76654, 28322,  5383, 10417, 70534, 33174, 84461, 91996, 74484,
        85981, 89913, 45067, 26499, 12274, 22477, 67491, 82092, 93669,
        61269, 24715, 30040, 74499,  3159, 92823, 27064, 97127, 81057,
        57558, 57227, 14720, 97548, 60105, 32947, 40596, 18095, 41710,
        11929,  3393,  2379, 99779, 44112, 15009, 18991, 75538, 18589,
        65596, 20400, 29623, 89350, 18159, 39343, 20068, 55423, 27784,
        66954, 95358,  1854, 13241, 20355, 74891, 85421, 67552, 35079,
        64141, 12869, 80213, 18247, 20648, 14626,  8468,  2453, 38690,
         3688, 38945, 76869,  5997,   677, 61467, 77392, 38949, 54048,
        56890, 30225, 34041, 70058, 88233, 71794, 62115,  2693, 80626,
        86700, 66882, 72946, 96093, 17044, 16985, 96545,  8040,  4961,
        80393, 75950, 41741, 37924, 72517, 58094, 66147, 54210, 46269,
        26827, 42693, 79816, 89265, 21135,  6118,  4449, 48058, 45208,
      

Let's try to apply the function in each different core

In [12]:
%%px
y=f(x)

CompositeError: one or more exceptions from call to method: execute
[0:execute]: NameError: name 'f' is not defined
[1:execute]: NameError: name 'f' is not defined
[2:execute]: NameError: name 'f' is not defined
[3:execute]: NameError: name 'f' is not defined
.... 20 more exceptions ...

Why the errors above? Because each core does not see the local engine. They work as separate machines and you have to load all variables and modules in each engine. That's easy.

In [13]:
%%file myscript.py
# test if n is prime
def isprime(n):
    for i in range(3, n):
        if n % i == 0:
            return False
    return True

# tests each element of an array if it is prime
def f(x):
    return map(isprime,x) 

Overwriting myscript.py


Execute code which defines the methods on the different engines

In [14]:
direct.run("myscript.py")

<AsyncResult: finished>

Now compute the "model grid" correctly

In [15]:
%%time
%px y=f(x)

CPU times: user 55 ms, sys: 6.79 ms, total: 61.8 ms
Wall time: 2.7 s


Alternatively to the command above, you could use

    direct.apply(f,x)
or

    direct.execute('y=f(x)')
You should time this in your watch.

Now we have the separate arrays $y$ containing the results on each engine. How to get it back to the local engine?

In [16]:
%%px 
import numpy
numpy.size(y)

[0;31mOut[0:3]: [0m417

[0;31mOut[1:3]: [0m417

[0;31mOut[2:3]: [0m417

[0;31mOut[3:3]: [0m417

[0;31mOut[4:3]: [0m417

[0;31mOut[5:3]: [0m417

[0;31mOut[6:3]: [0m417

[0;31mOut[7:3]: [0m417

[0;31mOut[8:3]: [0m417

[0;31mOut[9:3]: [0m417

[0;31mOut[10:3]: [0m417

[0;31mOut[11:3]: [0m417

[0;31mOut[12:3]: [0m417

[0;31mOut[13:3]: [0m417

[0;31mOut[14:3]: [0m417

[0;31mOut[15:3]: [0m417

[0;31mOut[16:3]: [0m416

[0;31mOut[17:3]: [0m416

[0;31mOut[18:3]: [0m416

[0;31mOut[19:3]: [0m416

[0;31mOut[20:3]: [0m416

[0;31mOut[21:3]: [0m416

[0;31mOut[22:3]: [0m416

[0;31mOut[23:3]: [0m416

In [17]:
y=direct.gather('y')

We have it back in the local engine.  :)

# 3. Easy parallel Monte Carlo

Suppose you need to do 100k Monte Carlo simulations. Wouldn't it be great if you could easily split them among your (hopefully many) cores?

In this example, I have a dataset and I will just do bootstrapping resamples on the dataset. 

In [18]:
import astropy.io.ascii as ascii

Load dataset ([available in the repository](https://github.com/rsnemmen/parallel-python-tutorial/blob/master/allrb.dat))

In [20]:
data=ascii.read('/Users/nemmen/work/projects/finished/jetpower/data/allrb.dat')

xdata=data['pb']
ydata=data['pj']

In [21]:
# number of desired bootstraps
nboot=400000

# number of bootstraps that will go to each core
n=nboot/size(client.ids)

## Passes variables to the engines

First make sure all of them see the path to any custom modules

In [22]:
%%px
import sys, os
home=os.environ['HOME']
sys.path.append(home+'/work/software/python') # make sure pythonpath is updated

In [23]:
direct.push(dict(n=n,xdata=xdata,ydata=ydata))

with direct.sync_imports():
    import numpy, nemmen, scipy

importing numpy on engine(s)
importing nemmen on engine(s)
importing scipy on engine(s)


Now everything below is executed in parallel! (IPython magic)

<!---
Have a look also at the %autopx command.
-->

In [24]:
%%time
%%px
r=numpy.zeros(n)

for i in range(n):
    [xsim,ysim]=nemmen.bootstrap([xdata,ydata]) # data bootstrapping
    r[i]=scipy.stats.pearsonr(xsim,ysim)[0]	 # just compute simple Pearson coefficient

CPU times: user 65.9 ms, sys: 6.92 ms, total: 72.8 ms
Wall time: 8 s


Assemble result from cores

In [25]:
r=direct.gather('r')

For comparison, how long does it take to do the same simulation in serial mode?

In [26]:
%%time
for i in range(nboot):
    [xsim,ysim]=nemmen.bootstrap([xdata,ydata])
    t=scipy.stats.pearsonr(xsim,ysim)[0]

CPU times: user 1min 33s, sys: 742 ms, total: 1min 34s
Wall time: 1min 35s


# Useful documentation

## IPython video tutorials

* [IPython in depth: high productivity interactive and parallel python - PyCon 2014](http://youtu.be/XFw1JVXKJss)
* [The IPython Notebook Revolution](http://youtu.be/t_TzRaK9kpU)

## Parallel computing

### General

* [Introduction to parallel programming](https://computing.llnl.gov/tutorials/parallel_comp)

### Python

* [AstroPython blog](http://astropython.blogspot.com.br), maintained (not so often anymore) by the speaker
* [Using IPython for parallel computing](http://ipython.org/ipython-doc/stable/parallel/index.html)
* [Parallel computing with IPython](http://www.astro.washington.edu/users/vanderplas/Astr599/notebooks/21_IPythonParallel)
* [Simple python parallelism](http://scottsievert.github.io/blog/2014/07/30/simple-python-parallelism/) and [easily distributing a parallel IPy Notebook on a cluster](http://twiecki.github.io/blog/2014/02/24/ipython-nb-cluster/). Some ideas on making a function parallel easily

### Executing general (non-python) jobs in parallel

* [Submit non-python commands to a running IPython cluster](https://gist.github.com/zonca/8994544)
* [GNU Parallel](http://www.gnu.org/software/parallel/)
* [xjobs](http://www.maier-komor.de/xjobs.html)

In [7]:
client.ids

[0,
 1,
 2,
 3,
 4,
 5,
 6,
 7,
 8,
 9,
 10,
 11,
 12,
 13,
 14,
 15,
 16,
 17,
 18,
 19,
 20,
 21,
 22,
 23]