> Created by Teodora Szasz, tszasz@uchicago.edu

# <h1 align="center">5. Distributing tasks on several cores with `IPython.parallel` </h1>

- First, install `ipyparallel` (for example, with `pip install ipyparallel`).

- Next, launch 4 IPython engines by running `ipcluster start -n 4` in a terminal.
- In general, you can launch as many engines as there are CPU cores on your machine.

In [1]:
import numpy as np
# ipyparallel was IPython.parallel before IPython 4.0
from ipyparallel import Client

- Once the engines have been launched, we create a `Client` instance.
- This object gives us access to the engines:

In [2]:
rc = Client()

- There are two ways to access the engines:
    - Direct interface: we address each engine explicitly and decide which engine runs which code.
    - Load-balanced interface: we submit jobs to a scheduler, which dynamically assigns them to the engines depending on their current load.
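
To see why dynamic assignment can matter, here is a toy model in plain Python. It is not `ipyparallel`'s actual scheduler, which has its own policies; it only compares a fixed round-robin assignment (the kind you might set up by hand with the direct interface) against a greedy "next free worker" policy. The helper names and the task durations are hypothetical.

```python
import heapq

def static_makespan(durations, n_workers):
    """Round-robin: task i is pinned to worker i % n_workers before anything runs."""
    loads = [0.0] * n_workers
    for i, d in enumerate(durations):
        loads[i % n_workers] += d
    return max(loads)  # total time = time taken by the busiest worker

def dynamic_makespan(durations, n_workers):
    """Greedy: each task goes to whichever worker becomes free first."""
    free_at = [0.0] * n_workers  # min-heap of times at which each worker is free
    heapq.heapify(free_at)
    for d in durations:
        t = heapq.heappop(free_at)      # earliest available worker
        heapq.heappush(free_at, t + d)  # that worker is now busy until t + d
    return max(free_at)

# Hypothetical task durations in seconds: two long tasks among short ones.
durations = [8, 1, 1, 1, 8, 1, 1, 1]
print(static_makespan(durations, 4))   # 16.0: both long tasks land on worker 0
print(dynamic_makespan(durations, 4))  # 9.0
```

With round-robin assignment both long tasks end up on the same worker, while the greedy policy spreads them out, which is the kind of imbalance a load-balancing scheduler is meant to avoid.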
    

### A. Direct interface

In [3]:
rc.ids

[0, 1, 2, 3]

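- There are several ways to run code in parallel on the engines.
- Here we use the `%px` magic command, which runs one line of code on every engine (so the imports below happen on the engines, not in the notebook):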

In [4]:
%px import os, time

In [5]:
%px print(os.getpid())

[stdout:0] 13885
[stdout:1] 13888
[stdout:2] 13889
[stdout:3] 13894


- The code passed to the `%px` magic command is executed on all engines.
- Here we displayed the PID (process identifier) of every engine.
- Every engine is an independent Python process.
- The `%%px` cell magic runs a whole cell, and its `--targets` option restricts execution to chosen engines. `:-1` is a slice of the engine IDs, so the cell below runs on every engine except the last one:

In [6]:
%%px --targets :-1
print(os.getpid())

[stdout:0] 13885
[stdout:1] 13888
[stdout:2] 13889


- The `%pxconfig` magic command sets default options, such as `--targets` or `--noblock`, for subsequent `%px` and `%%px` calls.

### B. Load-balanced interface

- This interface provides higher-level parallel routines, such as a parallel `map()`, whose tasks are assigned dynamically to the engines by a scheduler.

- Let's estimate pi in parallel using the Monte Carlo method.

- We sample a large number of points uniformly in the unit square and count the proportion that falls inside the quarter disc of radius 1 centered at the origin. This proportion should approach `pi/4`, the ratio of the two areas, so multiplying it by 4 gives an estimate of pi.
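
Written out, with `(x, y)` uniform on the unit square and `n_in` of the `n` sampled points falling in the quarter disc, the reasoning is:

```latex
P\left(x^2 + y^2 \le 1\right)
  = \frac{\text{area of quarter disc}}{\text{area of unit square}}
  = \frac{\pi \cdot 1^2 / 4}{1}
  = \frac{\pi}{4}
\quad\Longrightarrow\quad
\pi \approx 4\,\frac{n_{\text{in}}}{n}
```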

- First, create a load-balanced view:

In [7]:
v = rc.load_balanced_view()

- The `sample()` function draws `n` random points and counts how many fall in the quarter disc:

In [8]:
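def sample(n):
    # Import inside the function: it runs on the engines, which do not
    # share this notebook's namespace.
    import numpy as np
    # Random coordinates in the unit square.
    x, y = np.random.rand(2, n)
    # Squared distances to the origin.
    r_square = x ** 2 + y ** 2
    # Number of points in the quarter disc.
    return (r_square <= 1).sum()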
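
One optional variation, not part of the original notebook: `np.random.rand` draws from each process's global random state, so results change from run to run. The sketch below assumes NumPy 1.17 or later and uses a hypothetical `sample_seeded()` helper that gives every subtask its own independent stream, spawned from a single `SeedSequence` (the root seed 12345 is arbitrary).

```python
import numpy as np

def sample_seeded(n, seed):
    """Like sample(), but draws from its own Generator built from `seed`."""
    import numpy as np  # re-imported so the function also works when sent to an engine
    rng = np.random.default_rng(seed)
    # Random coordinates in the unit square.
    x, y = rng.random((2, n))
    # Number of points in the quarter disc.
    return int((x ** 2 + y ** 2 <= 1).sum())

# Spawn 100 independent child seeds from one root seed.
seeds = np.random.SeedSequence(12345).spawn(100)
counts = [sample_seeded(10_000, s) for s in seeds]
estimate = 4 * sum(counts) / (10_000 * len(counts))
print(estimate)  # close to pi, and identical on every run
```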

- The `pi()` function estimates pi from the number of points in the quarter disc, `n_in`, and the total number of points, `n`:

In [9]:
def pi(n_in, n):
    return 4. * float(n_in) / n

- Here is an example:

In [10]:
n = 100000000

In [11]:
pi(sample(n),n)

3.14178932
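
How close should a single estimate be? Each point lands in the quarter disc with probability `p = pi/4`, so `n_in` is binomial and the estimate `4 * n_in / n` has a standard error of `4 * sqrt(p * (1 - p) / n)`. This error analysis is an addition to the notebook; the check below uses only its `n` and the printed estimate.

```python
import math

n = 100_000_000          # same number of points as above
p = math.pi / 4          # probability that one point falls in the quarter disc
std_err = 4 * math.sqrt(p * (1 - p) / n)
print(std_err)           # about 1.6e-4

observed = 3.14178932    # the estimate printed above
print((observed - math.pi) / std_err)  # about 1.2 standard errors
```

So only the first three or four decimals of the estimate are meaningful at this `n`, and the error shrinks only as `1 / sqrt(n)`.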

- Let's time this function on a single core:

In [12]:
%timeit pi(sample(n),n)

1 loop, best of 3: 4.57 s per loop


- We will now run this simulation in parallel.
- First, we divide the task into 100 smaller subtasks, each using 1/100 of the points:

In [13]:
args = [n // 100] * 100

- We use the parallel `map()` method of the load-balanced view to run these tasks.
- Our `sample()` function is called 100 times, each time with `n // 100` as its argument.
- We then combine the 100 results.
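
Before using the engines, the same split, map, and combine pattern can be checked serially with Python's built-in `map()`, which `v.map()` mirrors. This sketch redefines `sample()` and `pi()` so it runs on its own, and uses a much smaller, hypothetical `n_small` with a fixed seed to keep it fast and repeatable.

```python
import numpy as np

def sample(n):
    """Count how many of n uniform points in the unit square fall in the quarter disc."""
    x, y = np.random.rand(2, n)
    return (x ** 2 + y ** 2 <= 1).sum()

def pi(n_in, n):
    """Turn a hit count into an estimate of pi."""
    return 4. * float(n_in) / n

np.random.seed(0)               # fixed seed so the run is repeatable
n_small = 1_000_000             # hypothetical, much smaller than the notebook's n
args = [n_small // 100] * 100   # 100 subtasks of 10,000 points each

counts = list(map(sample, args))             # serial stand-in for v.map(sample, args)
estimate = pi(np.sum(counts), sum(args))     # divide by the points actually drawn
print(estimate)
```

Dividing by `sum(args)` rather than `n_small` is a small safeguard: if the total were not a multiple of 100, `n // 100 * 100` points would be drawn, and that is the correct denominator.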

In [14]:
ar = v.map(sample, args)

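- `v.map()` returns immediately with an `AsyncMapResult`, `ar`. `ar.ready()` tells whether all tasks have finished, and `ar.progress` counts the completed tasks:

In [17]: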
ar.ready(), ar.progress

(True, 100)

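- `ar.elapsed` is the wall-clock time (in seconds) since submission, and `ar.serial_time` is the sum of the compute times of all tasks, i.e. roughly how long they would take one after another:

In [18]: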
ar.elapsed, ar.serial_time

(2.508833, 8.963200000000006)
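
These timings give a rough speedup. The arithmetic below uses only the values printed above and the 4.57 s single-core `%timeit` result; which baseline to divide by is a judgment call.

```python
elapsed = 2.508833               # ar.elapsed: wall-clock seconds for the whole map
serial_time = 8.963200000000006  # ar.serial_time: summed compute time of all 100 tasks
single_core = 4.57               # %timeit result on one core, in seconds
n_engines = 4

speedup_vs_serial_time = serial_time / elapsed   # about 3.6
efficiency = speedup_vs_serial_time / n_engines  # about 0.89
speedup_vs_single_core = single_core / elapsed   # about 1.8
print(speedup_vs_serial_time, efficiency, speedup_vs_single_core)
```

The two speedups differ because the summed task time on the engines (about 9 s) is roughly twice the single-core timing, so `single_core / elapsed` is the more conservative figure for this run.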

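- Finally, we combine the 100 counts into a single estimate of pi. In `ipyparallel`, `ar.result` is a method rather than a list, so we retrieve the list of counts with `ar.get()` before summing them:

In [20]:
pi(np.sum(ar.get()), n)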

### Some useful references:

- Documentation of `ipyparallel` (formerly `IPython.parallel`): https://ipyparallel.readthedocs.io/en/latest/
- MPI with IPython: https://ipython.org/ipython-doc/3/parallel/parallel_mpi.html