# Going parallel with IPython: the basics

## Preliminaries

IPython parallel used to be an integral part of IPython, but it is now a separate package, `ipyparallel`. Installation instructions are here:
https://ipyparallel.readthedocs.io/en/latest/index.html

## Checking execution in a cluster

Here we are dealing with clusters of IPython engines **on your own computer**.

You can start clusters from the IPython Clusters tab if you have enabled the Notebook server extension:

$ ipcluster nbextension enable 

Alternatively, there is a command line interface to start clusters, for example:

$ ipcluster start -n 4

If you execute the following code, you will see the ids of the four engines you have started up.

In [45]:
import ipyparallel as ipp
rc = ipp.Client()
rc.ids

[0, 1, 2, 3]

For a simple test of execution on the cluster, we will use the following function.

In [2]:
def longrun(n):
    # Import inside the function so it is also available when run on an engine
    from random import randint
    bignumber = 0
    for _ in range(n):
        bignumber += randint(1, 100)
    return bignumber

In [22]:
%timeit longrun(100000)

10 loops, best of 3: 152 ms per loop


The code has been executed in the **default kernel**; we can see its process id as follows.

In [6]:
from os import getpid
print(getpid())

14577


Now it is time to use the cluster to execute the function. `apply_sync` runs it synchronously, i.e. it blocks until the engine returns the result.

In [7]:
%timeit rc[2].apply_sync(longrun, 100000)

The slowest run took 4.33 times longer than the fastest. This could mean that an intermediate result is being cached.
1 loop, best of 3: 151 ms per loop


In [23]:
rc[0].apply_sync(getpid)

14552

You can do parallel execution simply by referring to a view of nodes, i.e. a list of engines.

In [24]:
rc[:].apply_sync(getpid)

[14552, 14557, 14558, 14559]

In [25]:
%timeit ns = rc[:].apply_sync(longrun, 100000)

1 loop, best of 3: 308 ms per loop


In [12]:
print(len(rc[1:3]))
print(type(rc[:]))

2
<class 'ipyparallel.client.view.DirectView'>


Note that the function has been executed once per engine in the view.

In [26]:
ns = rc[:].apply_sync(longrun, 10000)
print(ns)

[508856, 501081, 508670, 503339]


This is called a <b>direct view</b>; it basically works as a multiplexer, i.e. it broadcasts the computation (the function, in this case) to all the specified engines.
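As a rough analogy only (this is not how ipyparallel is implemented), the broadcast behaviour of a direct view can be sketched with standard-library threads: the same call runs once per target, the caller blocks until all of them finish (much like `apply_sync`), and the results come back as a list in target order. The helper name `broadcast` is hypothetical.

```python
import threading

def broadcast(targets, func, *args):
    """Call func(*args) once per target, concurrently; return results in target order."""
    results = [None] * len(targets)

    def run(i):
        results[i] = func(*args)  # every target runs the same call

    threads = [threading.Thread(target=run, args=(i,)) for i in range(len(targets))]
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # block until every target has answered, like apply_sync
    return results

results = broadcast([0, 1, 2, 3], sum, range(10))
print(results)  # [45, 45, 45, 45]
```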

## Mapping computations

The <a href="https://docs.python.org/2/library/functions.html#map">map built-in</a> applies a function to **each element** of a sequence. The following example calls longrun four times, once for each element in the list, using **regular non-parallel Python**.

In [29]:
# This is the regular map built-in in Python:
r = map(longrun, [1000, 500, 1000, 200])
print(list(r))

[48435, 25688, 50247, 9824]


Views have a **map** method that works similarly to the map built-in but does parallel execution.

In [30]:
view = rc.load_balanced_view()
view.map_sync(longrun, [1000, 500, 1000, 200, 400, 800])

[49961, 24407, 48716, 10029, 20708, 41518]

Note that parallel execution is not necessarily faster than plain Python here: we are using a local cluster on a single machine, and the overhead of communicating with the engines adds time to a relatively short computation.

In [31]:
type(view)

ipyparallel.client.view.LoadBalancedView

A load-balanced view uses the controller to schedule the tasks, leaving the decision of which engine runs each task to the underlying infrastructure.
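To picture what load balancing means, here is a toy sketch with threads standing in for engines (an assumption for illustration; in ipyparallel the scheduling happens in the controller, not like this). Tasks wait in a shared queue and whichever worker is idle takes the next one, while results are stored by position so they come back in input order, as with `map_sync`. `load_balanced_map` is a hypothetical name.

```python
import queue
import threading

def load_balanced_map(func, items, n_workers=4):
    """Apply func to every item; idle workers pull the next pending task."""
    tasks = queue.Queue()
    for index, item in enumerate(items):
        tasks.put((index, item))
    results = [None] * len(items)

    def worker():
        while True:
            try:
                index, item = tasks.get_nowait()
            except queue.Empty:
                return  # no work left for this worker
            results[index] = func(item)  # keep the input order

    threads = [threading.Thread(target=worker) for _ in range(n_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results

print(load_balanced_map(lambda n: n * 2, [1000, 500, 1000, 200, 400, 800]))
# [2000, 1000, 2000, 400, 800, 1600]
```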

`map` is different from **parallel functions**: a parallel function splits the input into as many chunks as there are engines, not one task per element of the list.

In [32]:
v = rc[:]

@v.parallel(block=True)
def longrunparallel(x):
    # x is this engine's chunk of the input sequence
    return len(x)

In [39]:
ret = longrunparallel(range(102))
print (len(range(102)))
print(ret, sum(ret))

102
[26, 26, 25, 25] 102


Details on running parallel functions are here:
http://ipython.org/ipython-doc/rel-0.11/api/generated/IPython.parallel.client.remotefunction.html
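The chunk sizes above ([26, 26, 25, 25] for 102 elements on 4 engines) are consistent with splitting the sequence into contiguous blocks whose sizes differ by at most one, with the leftover elements going to the first engines. A minimal sketch of that partitioning, assuming this block layout (the helper `partition` is hypothetical, not an ipyparallel function):

```python
def partition(seq, n_parts):
    """Split seq into n_parts contiguous chunks whose sizes differ by at most one.

    The first len(seq) % n_parts chunks get one extra element.
    """
    base, extra = divmod(len(seq), n_parts)
    chunks, start = [], 0
    for p in range(n_parts):
        size = base + (1 if p < extra else 0)
        chunks.append(seq[start:start + size])
        start += size
    return chunks

chunks = partition(range(102), 4)
print([len(c) for c in chunks])  # [26, 26, 25, 25]
```

Applying `len` to each chunk mimics what `longrunparallel` returned.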

## Engines are dicts

Dict-style indexing on a view works as a push/pull of variables to and from particular engine processes.

In [40]:
rc[1]["myVarIn1"] = "I am in 1"

In [42]:
print (myVarIn1)

NameError: name 'myVarIn1' is not defined

In [44]:
print(rc[1]["myVarIn1"])
print (rc[0]["myVarIn1"])

I am in 1


RemoteError: NameError(name 'myVarIn1' is not defined)

If we pass `ipp.Reference("x")` as an argument to `apply()`, it refers to a variable `x` that already lives on the engine. This way we avoid copying the same variable over the network in every call to `apply`; if `x` is a large array, this can save a lot of time.
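A toy model may help with both ideas, assuming engines are nothing more than namespaces: indexing pushes or pulls a name, and a reference is a placeholder resolved against the engine's own namespace just before the function is called. The classes `ToyEngine` and `Ref` are hypothetical illustrations, not ipyparallel's API; a missing name raises `KeyError` here where a real engine reports a `NameError`.

```python
class Ref:
    """Hypothetical placeholder: the name of a variable that already lives on an engine."""

    def __init__(self, name):
        self.name = name


class ToyEngine:
    """Hypothetical engine modelled as a namespace dict with an apply method."""

    def __init__(self):
        self.namespace = {}

    def __setitem__(self, key, value):  # like rc[i]["x"] = value (push)
        self.namespace[key] = value

    def __getitem__(self, key):  # like rc[i]["x"] (pull)
        return self.namespace[key]

    def apply(self, func, *args):
        # Swap each Ref for the value stored on this engine, so the data
        # itself never has to travel with the call
        resolved = [self.namespace[arg.name] if isinstance(arg, Ref) else arg
                    for arg in args]
        return func(*resolved)


engines = [ToyEngine(), ToyEngine()]
engines[1]["x"] = list(range(1000))     # push once
print(engines[1].apply(sum, Ref("x")))  # 499500
print("x" in engines[0].namespace)      # False: engine 0 never received x
```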

## Scatter and gather

Clusters start to become useful when you can partition the data and operate on each piece separately.

In [45]:
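import numpy as np
# The %px magic runs the import on every engine
# (alternatively, you could place the import inside the function)
%px import numpy as np

def CostlyOperation():
    # `a` is looked up in the namespace where the function runs:
    # on each engine, that is the engine's own scattered piece of `a`
    array = np.copy(a)
    return array * array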

`scatter` partitions an array among the available engines, and `gather` collects the pieces back into a single array.

In [47]:
a = np.random.randn(200000)+1
print (a, len(a))
other_view = rc[:]
print (other_view)

other_view.scatter('a', a)
# With other_view['a'] you can see the partitions
other_view['a']


[ 0.76282399 -0.36440919  1.59916077 ...,  2.70057731  0.11940081
  2.68023101] 200000
<DirectView [0, 1, 2, 3]>


[array([ 0.76282399, -0.36440919,  1.59916077, ...,  0.89453239,
         0.70317073,  0.59536623]),
 array([-0.14962874,  2.17051201,  2.30378551, ..., -0.26951869,
         1.89702123, -2.16316327]),
 array([-1.04528056, -0.42361677,  2.69030795, ...,  1.15064081,
         0.29789342,  1.58629635]),
 array([ 0.10011466,  0.516968  ,  0.41366228, ...,  2.70057731,
         0.11940081,  2.68023101])]

In [48]:

# In this case, parallel execution is slower: everything runs on the
# same machine, and there is extra data transfer to and from the engines.
%timeit other_view.apply_sync(CostlyOperation)
%timeit CostlyOperation()

other_view.apply_sync(CostlyOperation)

100 loops, best of 3: 11.8 ms per loop
1000 loops, best of 3: 868 µs per loop


[array([ 0.58190045,  0.13279406,  2.55731518, ...,  0.80018819,
         0.49444908,  0.35446094]),
 array([ 0.02238876,  4.71112238,  5.30742768, ...,  0.07264033,
         3.59868954,  4.67927535]),
 array([ 1.09261146,  0.17945117,  7.23775689, ...,  1.32397427,
         0.08874049,  2.51633612]),
 array([ 0.01002295,  0.26725591,  0.17111648, ...,  7.29311782,
         0.01425655,  7.18363827])]

Now you can recover values with gather:

In [50]:
other_view.block = True  # Needed so that gather waits for all the results

# We can get the original a that was distributed this way:
b = other_view.gather("a")
print (b, len(b))

[ 0.76282399 -0.36440919  1.59916077 ...,  2.70057731  0.11940081
  2.68023101] 200000
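Locally, the scatter/compute/gather round trip can be sketched with NumPy alone, using `np.array_split` as a stand-in for `scatter` and `np.concatenate` as a stand-in for `gather` (an assumption for illustration; a real view sends each piece to a different engine). Squaring each piece and joining the results gives the same array as squaring the whole thing, which is why `CostlyOperation` can run on the pieces independently.

```python
import numpy as np

a = np.random.default_rng(0).standard_normal(10) + 1

# "scatter": split into one contiguous piece per (pretend) engine
chunks = np.array_split(a, 4)
print([len(c) for c in chunks])  # [3, 3, 2, 2]

# each piece is processed independently, like CostlyOperation on an engine
squared = [np.copy(c) * c for c in chunks]

# "gather": join the pieces back in their original order
b = np.concatenate(squared)
print(np.allclose(b, a * a))  # True
```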


## Working with multiple (Internet) data, same program

In some cases, the same program needs to repeat a task on different parts of the data, and these data are external to our system. This is a good case for ad hoc parallel computation.

For example, suppose we want to count the lines in n-gram data files from Google:

http://storage.googleapis.com/books/ngrams/books/datasetsv2.html

We can distribute the list of files to the cluster, and each node will download a different piece of data from Google.

In [46]:
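# Some files from Google; this could be a complete list generated using loops.
files = ["googlebooks-eng-all-5gram-20120701-zk.gz",
         "googlebooks-eng-all-5gram-20120701-zl.gz",
         "googlebooks-eng-all-5gram-20120701-zm.gz",
         "googlebooks-eng-all-5gram-20120701-zn.gz",
        ]

# wget (a third-party package) and gzip must be imported on every engine
%px import wget
%px import gzip


clus = rc[:]

def count_lines(filenames):
    """Download each file and count its lines; return one count per file."""
    base_url = "http://storage.googleapis.com/books/ngrams/books/"
    counts = []
    for filename in filenames:
        wget.download(base_url + filename)
        lines = 0
        with gzip.open(filename, 'rb') as f:
            for line in f:
                lines += 1
        counts.append(lines)
    return counts

# Send the work asynchronously: the parallel function splits `files` among
# the engines, and each engine receives its chunk as a list of file names
countngrams = clus.parallel(block=False)(count_lines)

# This is the blocking sequential version (it needs `import wget` and
# `import gzip` in the local kernel as well):
# for f in files:
#     print(count_lines([f]))

import time

res = countngrams(files)
# Poll until the computation finishes, then print the results
while not res.ready():
    time.sleep(5)

print(res.result())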


[422, 4810, 1951, 312798]


In this case the call was non-blocking, so we got a handle (`res`) to retrieve the data later.
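The non-blocking pattern has a close analogue in the standard library's `concurrent.futures`: submitting work returns a handle immediately, `done()` plays the role of `ready()`, and `result()` collects the value. The sketch below is only that analogy, not ipyparallel code; `slow_square` and its `time.sleep` are hypothetical stand-ins for a slow download.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def slow_square(n):
    time.sleep(0.1)  # hypothetical stand-in for a slow download
    return n * n

with ThreadPoolExecutor(max_workers=4) as pool:
    # submit() returns a Future right away, without waiting for the result
    handles = [pool.submit(slow_square, n) for n in [1, 2, 3, 4]]
    # Poll the handles, as the notebook does with res.ready()
    while not all(h.done() for h in handles):
        time.sleep(0.05)
    results = [h.result() for h in handles]

print(results)  # [1, 4, 9, 16]
```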

In [47]:
%ls
# %rm *.gz   (uncomment to delete the downloaded .gz files)


Out-of-core con Dask.ipynb  log
bloque6.zip                 mydask.png
bloque6_guia.docx           myfile.hdf5
bloque6_guia.pdf            noaa/
datos_sensor.hdf5           parallel.ipynb
ipython-parallel.PPT
