# Load-balancing with IPython.parallel

In [1]:
import os,sys,time
import numpy as np

from IPython.core.display import display
import ipyparallel as ipp
rc = ipp.Client()
dview = rc[:]

Create a LoadBalancedView

In [2]:
lview = rc.load_balanced_view()
lview

<LoadBalancedView None>

LoadBalancedViews behave very much like a DirectView on a single engine:

Each call to `apply()` results in a single remote computation,
and the result (or AsyncResult) of that call is returned directly,
rather than in a list, as in the multi-engine DirectView.

In [3]:
e0 = rc[0]

In [4]:
from numpy.linalg import norm
A = np.random.random(1024)

e0.apply_sync(norm, A, 2)

18.595714829840038

In [5]:
lview.apply_sync(norm, A, 2)

18.595714829840038

However, unlike the DirectView of a single engine, you are letting the IPython Scheduler decide which engine should do the work:

In [7]:
e0.apply_sync(os.getpid)

23698

In [8]:
for i in range(2*len(rc.ids)):
    pid = lview.apply_sync(os.getpid)
    print("task %i ran on: %i" % (i, pid))

task 0 ran on: 23698
task 1 ran on: 23699
task 2 ran on: 23701
task 3 ran on: 23700
task 4 ran on: 23698
task 5 ran on: 23699
task 6 ran on: 23701
task 7 ran on: 23700


# Map

The LoadBalancedView also has a load-balanced version of the builtin `map()`

In [13]:
lview.block = True

serial_result   =  list(map(lambda x:x**10, range(32)))
parallel_result = lview.map(lambda x:x**10, range(32))

serial_result==parallel_result

True

Just like `apply()`, you can use non-blocking map with `block=False` or `map_async`

In [15]:
amr = lview.map_async(lambda x:x**10, range(32))
len(amr.msg_ids)

32

In [16]:
amr = lview.map_async(lambda x:x**10, range(32), chunksize=4)
len(amr.msg_ids)

8

In [18]:
amr = lview.map_async(lambda x:x**10, range(3200))
len(amr.msg_ids)
amr.wait_interactive()
amr.wall_time()

3200/3200 tasks finished after   40 s
done


TypeError: 'float' object is not callable

In [19]:
amr.wall_time

40.066079

In [20]:
amr.serial_time

4.272494999999998

In [21]:
amr = lview.map_async(lambda x:x**10, range(3200), chunksize=100)
len(amr.msg_ids)
amr.wait_interactive()
amr.serial_time, amr.wall_time

  32/32 tasks finished after    0 s
done


(0.046026, 0.604445)

## Map results are iterable!

AsyncResults with multiple results are actually iterable before their
results arrive.

This means that you can perform map/reduce operations on elements as
they come in:

In [22]:
lview.block = False

In [24]:
# scatter 'id', so id=0,1,2 on engines 0,1,2
dv = rc[:]
dv.scatter('id', rc.ids, flatten=True)
print(dv['id'])

# create a Reference to `id`. This will be a different value on each engine
ref = ipp.Reference('id')

tic = time.time()
ar = dv.apply(time.sleep, ref)
for i,r in enumerate(ar):
    print("%i: %.3f"%(i, time.time()-tic))

[0, 1, 2, 3]
0: 0.027
1: 1.034
2: 2.059
3: 3.051


In [25]:
amr = lview.map_async(time.sleep, [1] * 12)

In [26]:
amr.wait_interactive()

  12/12 tasks finished after    3 s
done


In [27]:
amr.wall_time, amr.elapsed

(3.171044, 3.171044)

In [28]:
amr.serial_time

12.041780999999999

In [30]:
(amr.serial_time / amr.wall_time) / len(rc)

0.9493546131810215

Now we submit a bunch of tasks of increasing magnitude, and
watch where they happen, iterating through the results as they come.

In [31]:
def sleep_here(t):
    """sleep here for a time, return where it happened"""
    import time
    time.sleep(t)
    return id

amr = lview.map(sleep_here, [.01*t for t in range(100)])
tic = time.time()
for i,r in enumerate(amr):
    print("task %i on engine %i: %.3f" % (i, r, time.time()-tic))


task 0 on engine 0: 0.002
task 1 on engine 1: 0.016
task 2 on engine 3: 0.019
task 3 on engine 2: 0.019
task 4 on engine 0: 0.049
task 5 on engine 2: 0.049
task 6 on engine 1: 0.069
task 7 on engine 3: 0.100
task 8 on engine 2: 0.175
task 9 on engine 0: 0.206
task 10 on engine 1: 0.214
task 11 on engine 3: 0.237
task 12 on engine 2: 0.311
task 13 on engine 0: 0.398
task 14 on engine 1: 0.405
task 15 on engine 3: 0.431
task 16 on engine 2: 0.506
task 17 on engine 0: 0.579
task 18 on engine 1: 0.644
task 19 on engine 3: 0.670
task 20 on engine 2: 0.744
task 21 on engine 0: 0.826
task 22 on engine 1: 0.883
task 23 on engine 3: 0.962
task 24 on engine 2: 1.026
task 25 on engine 0: 1.110
task 26 on engine 1: 1.179
task 27 on engine 3: 1.253
task 28 on engine 2: 1.318
task 29 on engine 0: 1.452
task 30 on engine 1: 1.517
task 31 on engine 3: 1.587
task 32 on engine 2: 1.654
task 33 on engine 0: 1.837
task 34 on engine 1: 1.901
task 35 on engine 3: 1.975
task 36 on engine 2: 2.050
task 37 on 

In [32]:
amr.wall_time

13.781203

In [33]:
amr.serial_time

49.838634

In [34]:
amr.serial_time / amr.wall_time

3.616421149880747

Unlike `DirectView.map()`, which always results in one task per engine,
LoadBalance map defaults to one task per *item* in the sequence.  This
can be changed by specifying the `chunksize` keyword arg.

In [35]:
amr = lview.map(sleep_here, [.01*t for t in range(100)], chunksize=4)
tic = time.time()
for i,r in enumerate(amr):
    print("task %i on engine %i: %.3f"%(i, r, time.time()-tic))

task 0 on engine 2: 0.083
task 1 on engine 2: 0.083
task 2 on engine 2: 0.084
task 3 on engine 2: 0.084
task 4 on engine 0: 0.265
task 5 on engine 0: 0.265
task 6 on engine 0: 0.265
task 7 on engine 0: 0.265
task 8 on engine 1: 0.426
task 9 on engine 1: 0.426
task 10 on engine 1: 0.426
task 11 on engine 1: 0.426
task 12 on engine 3: 0.602
task 13 on engine 3: 0.602
task 14 on engine 3: 0.602
task 15 on engine 3: 0.602
task 16 on engine 2: 0.829
task 17 on engine 2: 0.829
task 18 on engine 2: 0.829
task 19 on engine 2: 0.830
task 20 on engine 0: 1.151
task 21 on engine 0: 1.151
task 22 on engine 0: 1.151
task 23 on engine 0: 1.151
task 24 on engine 1: 1.470
task 25 on engine 1: 1.471
task 26 on engine 1: 1.471
task 27 on engine 1: 1.471
task 28 on engine 3: 1.841
task 29 on engine 3: 1.841
task 30 on engine 3: 1.841
task 31 on engine 3: 1.841
task 32 on engine 2: 2.215
task 33 on engine 2: 2.215
task 34 on engine 2: 2.216
task 35 on engine 2: 2.216
task 36 on engine 0: 2.692
task 37 on 

# Excercise

## Parallelize nested loops

Often we want to run a function with a variety of combinations of arguments.
A useful skill is the ability to express a nested loop in terms of a map.

In [36]:
def area(w,h):
    return w*h


widths = range(1,4)
heights = range(6,10)

areas = []
for w in widths:
    for h in heights:
        areas.append(area(w,h))
areas

[6, 7, 8, 9, 12, 14, 16, 18, 18, 21, 24, 27]

In [38]:
for w in widths:
    for h in heights:
        print((w, h))


(1, 6)
(1, 7)
(1, 8)
(1, 9)
(2, 6)
(2, 7)
(2, 8)
(2, 9)
(3, 6)
(3, 7)
(3, 8)
(3, 9)


In [37]:
%run ../hints
nesthint()

In [39]:
from itertools import product
product?

[0;31mInit signature:[0m [0mproduct[0m[0;34m([0m[0mself[0m[0;34m,[0m [0;34m/[0m[0;34m,[0m [0;34m*[0m[0margs[0m[0;34m,[0m [0;34m**[0m[0mkwargs[0m[0;34m)[0m[0;34m[0m[0m
[0;31mDocstring:[0m     
product(*iterables, repeat=1) --> product object

Cartesian product of input iterables.  Equivalent to nested for-loops.

For example, product(A, B) returns the same as:  ((x,y) for x in A for y in B).
The leftmost iterators are in the outermost for-loop, so the output tuples
cycle in a manner similar to an odometer (with the rightmost element changing
on every iteration).

To compute the product of an iterable with itself, specify the number
of repetitions with the optional repeat keyword argument. For example,
product(A, repeat=4) means the same as product(A, A, A, A).

product('ab', range(3)) --> ('a',0) ('a',1) ('a',2) ('b',0) ('b',1) ('b',2)
product((0,1), (0,1), (0,1)) --> (0,0,0) (0,0,1) (0,1,0) (0,1,1) (1,0,0) ...
[0;31mType:[0m           type


In [41]:
list(product(widths, heights))

[(1, 6),
 (1, 7),
 (1, 8),
 (1, 9),
 (2, 6),
 (2, 7),
 (2, 8),
 (2, 9),
 (3, 6),
 (3, 7),
 (3, 8),
 (3, 9)]

In [48]:
all_widths = []
all_heights = []
for width in widths:
    for height in heights:
        all_widths.append(width)
        all_heights.append(height)
all_widths, all_heights

([1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3], [6, 7, 8, 9, 6, 7, 8, 9, 6, 7, 8, 9])

In [47]:
list(zip((1, 6), (1, 7)))

[(1, 1), (6, 7)]

In [44]:
list(zip((1, 'a'), (2, 'b')))

[(1, 2), ('a', 'b')]

In [52]:
a = [1, 2, 3]

In [54]:
print(a)

[1, 2, 3]


In [55]:
print(*a)

1 2 3


In [50]:
# %load ../soln/nestedloop.py
# To parallelize every call with map, you just need to get a list for each argument.
# You can use `itertools.product` + `zip` to get this:


import itertools

product = list(itertools.product(widths, heights))
# [(1, 6), (1, 7), (2, 6), (2, 7), (3, 6), (3, 7)]

# So we have a "list of pairs", 
# but what we really want is a single list for each argument, i.e. a "pair of lists".
# This is exactly what the slightly weird `zip(*product)` syntax gets us:

allwidths, allheights = zip(*itertools.product(widths, heights))

print(" widths", allwidths)
print("heights", allheights)

# Now we just map our function onto those two lists, to parallelize nested for loops:

ar = lview.map_async(area, allwidths, allheights)


 widths (1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3)
heights (6, 7, 8, 9, 6, 7, 8, 9, 6, 7, 8, 9)


Validate the result:

In [51]:
p_areas = ar.get()
p_areas

[6, 7, 8, 9, 12, 14, 16, 18, 18, 21, 24, 27]

In [None]:
areas == p_areas

## Examples and Exercises

- [Counting Words!](../examples/Counting%20Words.ipynb)
- [Monte Carlo Options Pricing](../examples/MC%20Options.ipynb)

Now that we've seen multiplexing and load-balancing, let's see how they are [used together](All%20Together.ipynb).