See the tutorial [here](https://ipython.org/ipython-doc/dev/parallel/parallel_multiengine.html). To install ipyparallel, see [here](https://github.com/ipython/ipyparallel).

Make sure your `jupyter_notebook_config.json` loads `ipyparallel.nbextension` as a server extension, so that you can start clusters from the **IPython Clusters** tab.

In [46]:
%%writefile /home/user/.jupyter/jupyter_notebook_config.json
{
  "NotebookApp": {
    "extra_template_paths": [
      "/home/user/.local/share/jupyter/templates"
    ],
    "server_extensions": [
      "nbextensions",
      "ipyparallel.nbextension"
    ]
  },
  "version": 1
}

Overwriting /home/user/.jupyter/jupyter_notebook_config.json


## Start an IPython cluster from the notebook's main page

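Note that you can use the `%%px` magic to send code to every engine, but each engine is a separate kernel with its own namespace: names defined in the notebook, such as imported modules, are not available on the engines.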
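As a rough stdlib analogy (not ipyparallel itself), the Python 3 sketch below starts a second interpreter with `subprocess` and shows that it has its own pid and cannot see a name defined in the parent process. This is the same reason `os` has to be imported on the engines in the cells below. The variable `shared_name` is purely illustrative.

```python
import os
import subprocess
import sys

# Defined only in this (parent) process, like `os` imported in the notebook.
shared_name = "visible only in the parent"

# Program for a fresh interpreter: print its pid and whether it sees the name.
child_code = "import os; print(os.getpid()); print('shared_name' in globals())"

result = subprocess.run(
    [sys.executable, "-c", child_code],
    capture_output=True, text=True, check=True,
)
child_pid_str, child_sees_name_str = result.stdout.split()
child_pid = int(child_pid_str)
child_sees_name = child_sees_name_str == "True"

print("parent pid:", os.getpid(), "| child pid:", child_pid)
print("child sees shared_name:", child_sees_name)  # separate namespace
```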

## Basics

In [9]:
import ipyparallel as ipp
import numpy as np
rc = ipp.Client()
dv = rc.direct_view()
print "available engines", rc.ids

available engines [0, 1, 2, 3]


In [10]:
import os
print "this notebook kernel pid", os.getpid()

this notebook kernel pid 95223


In [11]:
%%px
print "my pid", os.getpid()

CompositeError: one or more exceptions from call to method: execute
[0:execute]: NameError: name 'os' is not defined
[1:execute]: NameError: name 'os' is not defined
[2:execute]: NameError: name 'os' is not defined
[3:execute]: NameError: name 'os' is not defined

In [12]:
%%px
import os
print "my pid", os.getpid()

[stdout:0]  my pid 95669
[stdout:1]  my pid 95670
[stdout:2]  my pid 95672
[stdout:3]  my pid 95674


In [13]:
%px import numpy as np

In [14]:
a = %px np.random.random()

[0;31mOut[0:4]: [0m0.29635249096204574

[0;31mOut[1:4]: [0m0.6770112089408589

[0;31mOut[2:4]: [0m0.40379879994810375

[0;31mOut[3:4]: [0m0.6650611172167279

## Working with views

In [15]:
dv = rc.direct_view()

In [16]:
dv = rc.direct_view()
dr = dv.apply(lambda: "hello")

In [17]:
dr.get()

['hello', 'hello', 'hello', 'hello']

In [26]:
def f(x):
    return "hello "+str(x)
dr = dv.apply(f,"magics")

In [27]:
dr.get()

['hello magics', 'hello magics', 'hello magics', 'hello magics']

In [28]:
dr.metadata

[{'after': [],
  'completed': datetime.datetime(2016, 5, 23, 20, 19, 25, 760084),
  'data': {},
  'engine_id': 0,
  'engine_uuid': u'520a8066-0f74-441c-8725-9753aa89ea7e',
  'error': None,
  'execute_input': None,
  'execute_result': None,
  'follow': [],
  'msg_id': u'032413f6-595d-45d9-b431-097406efa391',
  'outputs': [],
  'received': datetime.datetime(2016, 5, 23, 20, 19, 25, 769991),
  'started': datetime.datetime(2016, 5, 23, 20, 19, 25, 759128),
  'status': u'ok',
  'stderr': '',
  'stdout': '',
  'submitted': datetime.datetime(2016, 5, 23, 20, 19, 25, 753508)},
 {'after': [],
  'completed': datetime.datetime(2016, 5, 23, 20, 19, 25, 764070),
  'data': {},
  'engine_id': 1,
  'engine_uuid': u'17f55a12-5b19-4018-862c-46b2816d33d6',
  'error': None,
  'execute_input': None,
  'execute_result': None,
  'follow': [],
  'msg_id': u'66f1b0ed-46e9-4067-a2f2-936ae8faa558',
  'outputs': [],
  'received': datetime.datetime(2016, 5, 23, 20, 19, 25, 771836),
  'started': datetime.datetime(2

With `map` we apply an operation to every element of a list in parallel.
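A rough mental model of `map`, written as a sequential Python 3 sketch: the input is cut into one contiguous chunk per engine, each engine maps the function over its own chunk, and the results are concatenated in engine order. The chunk sizes (the first `len(seq) % n_engines` chunks get one extra element) are an assumption chosen to match the grouping visible in the `is_prime` output further down, where 9 inputs land on the engines as 3, 2, 2, 2. `partition` and `simulated_map` are hypothetical helpers, not ipyparallel functions.

```python
def partition(seq, n_engines):
    """Split seq into n_engines contiguous chunks; the first
    len(seq) % n_engines chunks get one extra element."""
    base, extra = divmod(len(seq), n_engines)
    chunks, start = [], 0
    for i in range(n_engines):
        size = base + (1 if i < extra else 0)
        chunks.append(seq[start:start + size])
        start += size
    return chunks


def simulated_map(f, seq, n_engines=4):
    """Sequential stand-in for a parallel map: each 'engine' maps f over
    its own chunk, and the per-chunk results are concatenated in order."""
    results = []
    for chunk in partition(list(seq), n_engines):
        results.extend(f(x) for x in chunk)
    return results


def f(x):
    return "hello " + str(x)


print([len(c) for c in partition(list(range(9)), 4)])  # [3, 2, 2, 2]
print(simulated_map(f, range(10))[:2])                 # ['hello 0', 'hello 1']
```

Because the chunks are contiguous and reassembled in order, the result list lines up with the input list, just like the `map_sync` outputs below.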

In [29]:
dr = dv.map_sync(f, range(10))

In [30]:
dr

['hello 0',
 'hello 1',
 'hello 2',
 'hello 3',
 'hello 4',
 'hello 5',
 'hello 6',
 'hello 7',
 'hello 8',
 'hello 9']

In [31]:
dr = dv.map_sync(lambda x: x**2, range(10))

In [32]:
dr

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

We can read and define variables on each engine.
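A toy model of what `push`, `pull` (`dv["a"]`) and `clear` do to the engines' namespaces, modelling each engine as a plain dict. `ToyDirectView` is a hypothetical class for illustration only; it copies values in-process, whereas the real `DirectView` sends them over the network to separate processes.

```python
import copy


class ToyDirectView:
    """Hypothetical stand-in for a DirectView: one dict per engine namespace."""

    def __init__(self, n_engines):
        self.namespaces = [{} for _ in range(n_engines)]

    def push(self, mapping):
        # Every engine gets its own deep copy of the same name/value pairs.
        for ns in self.namespaces:
            ns.update(copy.deepcopy(mapping))

    def pull(self, name):
        # One value per engine, in engine order; a missing name is an error.
        missing = [i for i, ns in enumerate(self.namespaces) if name not in ns]
        if missing:
            raise NameError("name %r is not defined on engines %s" % (name, missing))
        return [ns[name] for ns in self.namespaces]

    def __getitem__(self, name):
        # view["a"] is shorthand for view.pull("a"), mirroring dv["a"].
        return self.pull(name)

    def clear(self):
        # Empty every namespace, as dv.clear() does in the cell below.
        for ns in self.namespaces:
            ns.clear()


view = ToyDirectView(4)
view.push({"a": [2, 3, 4, 5]})
print(view["a"])  # [[2, 3, 4, 5], [2, 3, 4, 5], [2, 3, 4, 5], [2, 3, 4, 5]]
```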

In [33]:
dv.clear()
dv["a"]

CompositeError: one or more exceptions from call to method: _pull
[Engine Exception]NameError: name 'a' is not defined
[Engine Exception]NameError: name 'a' is not defined
[Engine Exception]NameError: name 'a' is not defined
[Engine Exception]NameError: name 'a' is not defined

In [34]:
a = np.array([1,2,3,4])
dv.push({"a": a+1})

<AsyncResult: _push>

In [35]:
dv["a"]

[array([2, 3, 4, 5]),
 array([2, 3, 4, 5]),
 array([2, 3, 4, 5]),
 array([2, 3, 4, 5])]

We scatter and gather data, and inspect the namespace of each engine.

In [36]:
import numpy as np
data = np.random.randint(10, size=16)
print data
dv.scatter('a',data);

[3 3 0 5 9 4 1 0 6 4 5 5 4 3 4 3]


In [37]:
dv['a']

[array([3, 3, 0, 5]),
 array([9, 4, 1, 0]),
 array([6, 4, 5, 5]),
 array([4, 3, 4, 3])]

In [38]:
dv.apply(lambda: a+1).get()

[array([4, 4, 1, 6]),
 array([10,  5,  2,  1]),
 array([7, 5, 6, 6]),
 array([5, 4, 5, 4])]

In [39]:
dv.execute("import numpy as np")    
def create_b():
    global b
    b = np.copy(a)+1

In [40]:
dv.apply(create_b)

<AsyncResult: create_b>

In [41]:
dv['b']

[array([4, 4, 1, 6]),
 array([10,  5,  2,  1]),
 array([7, 5, 6, 6]),
 array([5, 4, 5, 4])]
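Why does `create_b` need `global b`? Assuming, as these outputs suggest, that a function sent with `apply` runs with the engine's namespace as its globals, a plain assignment would only create a local variable that disappears when the call returns. The stdlib sketch below imitates this by re-creating each function with a dict (`engine_ns`, hypothetical) as its globals via `types.FunctionType`; it is an analogy, not how ipyparallel is implemented.

```python
import builtins
import types


def create_b():
    global b                  # write `b` into the function's globals
    b = [x + 1 for x in a]    # `a` is also looked up in the globals


def create_b_local():
    b = [x + 1 for x in a]    # local only: discarded when the call returns


# Hypothetical stand-in for one engine's namespace after scatter('a', ...).
engine_ns = {"__builtins__": builtins, "a": [3, 3, 0, 5]}

# Re-create each function with engine_ns as its globals and call it "there".
types.FunctionType(create_b_local.__code__, engine_ns)()
print("b" in engine_ns)   # False
types.FunctionType(create_b.__code__, engine_ns)()
print(engine_ns["b"])     # [4, 4, 1, 6]
```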

In [42]:
dv.gather("b").get()

array([ 4,  4,  1,  6, 10,  5,  2,  1,  7,  5,  6,  6,  5,  4,  5,  4])
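The scatter, apply, gather round trip of the last few cells can be replayed locally with NumPy, using the 16 values printed above. `np.array_split` and `np.concatenate` are only stand-ins here: `array_split` happens to produce the same four chunks that `scatter` placed on the engines, and concatenating the per-engine results in engine order reproduces the `gather` output. Four engines are assumed.

```python
import numpy as np

# The 16 values printed by the scatter cell above.
data = np.array([3, 3, 0, 5, 9, 4, 1, 0, 6, 4, 5, 5, 4, 3, 4, 3])

# "scatter": one contiguous chunk per engine (4 engines assumed).
chunks = np.array_split(data, 4)

# "apply" on every engine: b = a + 1, computed on each chunk independently.
b_chunks = [a + 1 for a in chunks]

# "gather": concatenate the per-engine chunks back in engine order.
gathered = np.concatenate(b_chunks)
print(gathered)
```

Because each engine only touches its own chunk, the gathered result equals `data + 1` computed in one piece.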

Notice that an array is distributed by rows. To distribute it by columns, you would scatter its transpose instead.
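A local NumPy sketch of rows versus columns. `np.array_split` is used only as a stand-in: it cuts a 10 x 5 array into the same 3, 3, 2, 2 row blocks that `scatter` produces in the next cell. Splitting the transpose yields column blocks instead; on the cluster, the analogous (untested here) move would be to scatter `data.T`. The array contents (`np.arange`) are arbitrary.

```python
import numpy as np

data = np.arange(50).reshape(10, 5)   # 10 rows x 5 columns, arbitrary values

# Splitting along axis 0 gives blocks of whole rows, one per engine.
row_blocks = np.array_split(data, 4)
print([blk.shape for blk in row_blocks])   # [(3, 5), (3, 5), (2, 5), (2, 5)]

# Splitting the transpose gives blocks of whole columns (stored as rows of data.T).
col_blocks = np.array_split(data.T, 4)
print([blk.shape for blk in col_blocks])   # [(2, 10), (1, 10), (1, 10), (1, 10)]

# Transposing each block back recovers ordinary column slices of data.
restored = np.concatenate([blk.T for blk in col_blocks], axis=1)
```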

In [43]:
data = np.random.randint(10, size=(10,5))
print data
dv.scatter('a',data);
print dv['a']

[[9 3 7 5 7]
 [8 3 1 8 0]
 [6 3 9 6 0]
 [8 6 3 3 2]
 [9 4 6 8 6]
 [9 9 7 8 0]
 [4 8 3 1 5]
 [4 2 6 5 0]
 [2 1 1 6 2]
 [8 6 6 2 0]]
[array([[9, 3, 7, 5, 7],
       [8, 3, 1, 8, 0],
       [6, 3, 9, 6, 0]]), array([[8, 6, 3, 3, 2],
       [9, 4, 6, 8, 6],
       [9, 9, 7, 8, 0]]), array([[4, 8, 3, 1, 5],
       [4, 2, 6, 5, 0]]), array([[2, 1, 1, 6, 2],
       [8, 6, 6, 2, 0]])]


Note that arrays sent to the engines (here `a`, from the `scatter` above) are read-only on the engines, so in-place updates such as `a += 1` fail:
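One plausible explanation, offered here as an assumption rather than something the notebook shows, is that the engines rebuild received arrays directly on top of immutable message buffers, and NumPy marks such arrays read-only. The sketch below reproduces the same `ValueError` locally with `np.frombuffer` on a `bytes` object. It also shows the two usual ways around it: rebinding to a new array (`a + 1`), or updating a writable copy.

```python
import numpy as np

# Immutable bytes standing in for a received message buffer.
payload = np.array([3, 3, 0, 5], dtype=np.int64).tobytes()
a = np.frombuffer(payload, dtype=np.int64)   # read-only view on the bytes

error_message = None
try:
    a += 1                        # in-place update: fails on a read-only array
except ValueError as exc:
    error_message = str(exc)      # NumPy reports that the output is read-only

b = a + 1                         # new, writable array; `a` is untouched
c = a.copy()                      # writable copy that can be updated in place
c += 1
print(error_message, b, c)
```

On an engine, the equivalent fix would be to write `a = a + 1` (still with `global a`) instead of `a += 1`.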

In [44]:
def incr_a():
    global a
    a += 1

In [45]:
dv.apply(incr_a).get()

CompositeError: one or more exceptions from call to method: incr_a
[Engine Exception]ValueError: output array is read-only
[Engine Exception]ValueError: output array is read-only
[Engine Exception]ValueError: output array is read-only
[Engine Exception]ValueError: output array is read-only

Measure scalability by comparing local and distributed execution.

In [47]:
def gen_data(its):
    r = 0
    for i in xrange(int(its)):
        r += 1
    return r

In [50]:
%time gen_data(1e8)

CPU times: user 6.48 s, sys: 98.6 ms, total: 6.58 s
Wall time: 6.83 s


100000000

In [51]:
%time dv.apply(gen_data, 1e8).get()

CPU times: user 35.4 ms, sys: 79.4 ms, total: 115 ms
Wall time: 12.8 s


[100000000, 100000000, 100000000, 100000000]
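Reading the two timings above: `dv.apply` runs the whole `gen_data(1e8)` call on each of the 4 engines (hence the four results), so the distributed cell performs four times the total work of the local one. A quick back-of-the-envelope calculation with the wall times reported above:

```python
local_iters, local_wall = 1e8, 6.83     # %time gen_data(1e8): wall time in s
n_engines, dist_wall = 4, 12.8          # %time dv.apply(gen_data, 1e8): wall time in s
dist_iters = n_engines * local_iters    # every engine runs the whole loop

local_rate = local_iters / local_wall   # iterations per second, one process
dist_rate = dist_iters / dist_wall      # iterations per second, whole cluster

speedup = dist_rate / local_rate        # throughput gain over one process
efficiency = speedup / n_engines        # 1.0 would be perfect scaling

print("speedup    %.2f" % speedup)      # speedup    2.13
print("efficiency %.2f" % efficiency)   # efficiency 0.53
```

With a single measurement of each cell, these figures are only a rough indication of how well the work scales.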

## Parallel magics

In [52]:
%px print "hola"

[stdout:0] hola
[stdout:1] hola
[stdout:2] hola
[stdout:3] hola


In [53]:
%px import numpy as np

In [54]:
%px r = np.random.randint(10)

In [55]:
dv.gather("r").get()

[6, 9, 5, 7]

Run code on each engine. Notice how it collects both the stdout and the final output of every engine. **THIS IS USEFUL FOR DEBUGGING**.
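To make the two kinds of output concrete, here is a sequential, single-process sketch of what `%%px` reports: for each "engine" it captures what the code prints separately from what it returns, then labels both with the engine id. `run_everywhere` and `get_length_tag` are hypothetical names; the real magic runs the cell in separate processes and formats the output itself.

```python
import contextlib
import io


def run_everywhere(func, n_engines, *args):
    """Hypothetical helper: call func once per 'engine', capturing what it
    prints separately from what it returns."""
    records = []
    for engine_id in range(n_engines):
        buf = io.StringIO()
        with contextlib.redirect_stdout(buf):
            result = func(*args)
        records.append((engine_id, buf.getvalue(), result))
    return records


def get_length_tag(l):
    print("calling with arg", l)
    return "len=%d" % l


for engine_id, out, result in run_everywhere(get_length_tag, 4, 10):
    print("[stdout:%d] %s" % (engine_id, out.rstrip()))
    print("Out[%d]: %r" % (engine_id, result))
```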

In [56]:
%%px
def get_rnd_vector(l):
    print "calling with arg",l
    return np.random.randint(10, size=l)
get_rnd_vector(10)

[stdout:0] calling with arg 10
[stdout:1] calling with arg 10
[stdout:2] calling with arg 10
[stdout:3] calling with arg 10


[0;31mOut[0:8]: [0marray([1, 8, 4, 3, 4, 8, 1, 9, 5, 5])

[0;31mOut[1:8]: [0marray([9, 8, 8, 3, 3, 2, 6, 5, 2, 4])

[0;31mOut[2:8]: [0marray([0, 7, 6, 2, 7, 7, 5, 7, 8, 8])

[0;31mOut[3:8]: [0marray([5, 1, 1, 8, 6, 3, 8, 9, 0, 1])

In [57]:
dv.map(lambda x: get_rnd_vector(x), [2,3,4,5]).get()

[array([8, 5]), array([3, 4, 4]), array([8, 3, 2, 5]), array([1, 6, 9, 7, 4])]

Note that the function does not exist on the client:

In [58]:
get_rnd_vector(5)

NameError: name 'get_rnd_vector' is not defined

The next cell uses `%%px --local`, which runs the cell on the engines and also in the local kernel, so `is_prime` is defined on both sides.

In [169]:
%%px --local
import numpy as np
import os
def is_prime(a):
    prime = True    
    for i in np.arange(2, a):
        if float(a)/i % 1 == 0:
            prime = False
    return os.getpid(),prime

In [170]:
%%px
print "my pid", os.getpid(), is_prime(3)

[stdout:0] my pid 82973 (82973, True)
[stdout:1] my pid 82971 (82971, True)
[stdout:2] my pid 82972 (82972, True)
[stdout:3] my pid 82970 (82970, True)


`%px` is meant for simple execution with stdout and stderr capture. For real work you would typically use the methods of the `DirectView`, such as `map_sync`:

In [171]:
dv.map_sync(is_prime,[1,2,3,4,5,6,7,8,9])

[(82973, True),
 (82973, True),
 (82973, True),
 (82971, False),
 (82971, True),
 (82972, False),
 (82972, True),
 (82970, False),
 (82970, False)]
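The `is_prime` above reports 1 as prime (first line of the output) and tests every divisor up to `a - 1`. A corrected sketch in plain Python 3 (hypothetical name `is_prime_fixed`, without the pid) rejects numbers below 2 and stops trial division at the integer square root:

```python
import math


def is_prime_fixed(a):
    """Trial division: a > 1 is prime if no d in [2, isqrt(a)] divides it."""
    if a < 2:
        return False
    for d in range(2, math.isqrt(a) + 1):
        if a % d == 0:
            return False
    return True


print([n for n in range(1, 10) if is_prime_fixed(n)])  # [2, 3, 5, 7]
```

Defined with `%%px --local`, as in the notebook, it could be passed to `dv.map_sync` in the same way.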

### Measuring execution time in IPython

In [59]:
def long_loop(N):
    for i in xrange(int(N)):
        a = 1

In [60]:
%timeit long_loop(1e3)

10000 loops, best of 3: 48.2 µs per loop


In [61]:
%timeit -n 100 -r 4 long_loop(1e3)

100 loops, best of 4: 43.4 µs per loop


In [62]:
t = %timeit -n 2000 -r 4 -o long_loop(1e3)

2000 loops, best of 4: 47.4 µs per loop


In [63]:
import numpy as np
print "loops       ", t.loops
print "repeats     ", t.repeat
print "compile time", t.compile_time
print "best        ", t.best
print "all         ", t.all_runs
print "all/nloops  ", np.array(t.all_runs)/t.loops

loops        2000
repeats      4
compile time 0.00019
best         4.74104881287e-05
all          [0.09482097625732422, 0.09891605377197266, 0.14472198486328125, 0.10009288787841797]
all/nloops   [  4.74104881e-05   4.94580269e-05   7.23609924e-05   5.00464439e-05]


Note that each run executes the code 2000 times (the _loops_). `t.all_runs` reports the total time of each run, so dividing `t.all_runs` by the number of _loops_ gives the mean time of one call in each run, and `t.best` is the smallest of those values.
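The same bookkeeping can be reproduced outside IPython with the standard-library `timeit.repeat`, which returns one total time per run, analogous to `t.all_runs`. Per-loop times come from dividing by `number`, and the best time is the minimum of those. A sketch in Python 3 (`range` replaces `xrange`); the timings themselves depend on the machine.

```python
import timeit


def long_loop(N):
    for i in range(int(N)):
        a = 1  # trivial body, as in the notebook


loops, repeats = 2000, 4
# Each entry is the total time (s) of one run of `loops` calls, like t.all_runs.
all_runs = timeit.repeat(lambda: long_loop(1e3), repeat=repeats, number=loops)

per_loop = [run / loops for run in all_runs]   # like np.array(t.all_runs)/t.loops
best = min(per_loop)                           # like t.best

print("all/nloops", per_loop)
print("best      ", best)
```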