# Load-balancing with IPython Parallel (`ipyparallel`)

In [None]:
import os
import sys
import time

import numpy as np
from IPython.display import display

import ipyparallel as parallel

# connect to the running cluster and build a DirectView on all engines
rc = parallel.Client()
dview = rc[:]

Create a LoadBalancedView:

In [None]:
lview = rc.load_balanced_view()
lview

A LoadBalancedView behaves much like a DirectView on a single engine:
each call to `apply()` results in a single remote computation,
and the result (or AsyncResult) of that call is returned directly,
rather than in a list, as it would be with a multi-engine DirectView.

In [None]:
e0 = rc[0]

In [None]:
from numpy.linalg import norm
A = np.random.random(1024)

e0.apply_sync(norm, A, 2)

In [None]:
lview.apply_sync(norm, A, 2)

However, unlike the DirectView of a single engine, you are letting the IPython Scheduler decide which engine should do the work:

In [None]:
e0.apply_sync(os.getpid)

In [None]:
for i in range(2*len(rc.ids)):
    pid = lview.apply_sync(os.getpid)
    print("task %i ran on: %i" % (i, pid))

# Map

The LoadBalancedView also has a load-balanced version of the builtin `map()`:

In [None]:
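lview.block = True

# In Python 3 the builtin map() returns a lazy iterator, so wrap it in list()
# before comparing it with the list returned by the blocking lview.map()
serial_result   =  list(map(lambda x:x**10, range(32)))
parallel_result = lview.map(lambda x:x**10, range(32))

serial_result == parallel_result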

Just as with `apply()`, you can run a non-blocking map by setting `block=False` or by calling `map_async()`:

In [None]:
amr = lview.map_async(lambda x:x**10, range(32))
amr.msg_ids

In [None]:
amr = lview.map_async(lambda x:x**10, range(32), chunksize=4)
amr.msg_ids
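
The second call should produce fewer message IDs, because each task then carries up to four items instead of one. A rough local model of that grouping, assuming the items are chunked in order (the helper `chunked` is hypothetical and not part of ipyparallel):

```python
def chunked(seq, chunksize):
    """Split seq into consecutive pieces of at most chunksize items."""
    seq = list(seq)
    return [seq[i:i + chunksize] for i in range(0, len(seq), chunksize)]

items = range(32)
one_per_item = chunked(items, 1)    # models the default, chunksize=1
four_per_task = chunked(items, 4)   # models chunksize=4

# number of tasks (and hence messages) under each setting
print(len(one_per_item), len(four_per_task))
```

Larger chunks mean fewer messages and less scheduling overhead, at the cost of coarser load balancing.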

## Map results are iterable!

AsyncResults with multiple results are actually iterable before their
results arrive.

This means that you can perform map/reduce operations on elements as
they come in:

In [None]:
lview.block = False

In [None]:
# scatter 'id', so id=0,1,2 on engines 0,1,2
dv = rc[:]
dv.scatter('id', rc.ids, flatten=True)
print(dv['id'])

# create a Reference to `id`. This will be a different value on each engine
ref = parallel.Reference('id')

tic = time.time()
ar = dv.apply(time.sleep, ref)
for i,r in enumerate(ar):
    print("%i: %.3f"%(i, time.time()-tic))
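
Because results can be consumed while later ones are still pending, a reduction can run alongside the computation. The sketch below shows that pattern locally, swapping in the standard library's `concurrent.futures.ThreadPoolExecutor` for the cluster. This is an assumption made so the example runs without engines; it does not reflect how ipyparallel is implemented.

```python
from concurrent.futures import ThreadPoolExecutor

def power(x):
    return x ** 10

total = 0
with ThreadPoolExecutor(max_workers=4) as pool:
    # pool.map returns a lazy iterator that yields results in input order,
    # each one as soon as it is ready, so the running sum is updated
    # while later tasks may still be executing
    for value in pool.map(power, range(32)):
        total += value

print(total == sum(x ** 10 for x in range(32)))
```

With a cluster, the loop body would stay the same and the iterable would be the AsyncResult returned by a non-blocking `map`.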

In [None]:
amr = lview.map_async(time.sleep, [1] * 12)

In [None]:
amr.wait_interactive()

In [None]:
amr.wall_time, amr.elapsed

In [None]:
amr.serial_time
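
`serial_time` adds up the time each task spent computing, while `wall_time` measures the span from submission to the last result, so their ratio approximates the speedup. A back-of-the-envelope model for the twelve one-second sleeps above, assuming equal-length tasks and no scheduling overhead (the function `ideal_times` is hypothetical):

```python
import math

def ideal_times(n_tasks, task_seconds, n_engines):
    """Idealized (serial, wall) times for equal-length tasks, ignoring overhead."""
    serial = n_tasks * task_seconds
    # tasks run in rounds of at most n_engines at a time
    wall = math.ceil(n_tasks / n_engines) * task_seconds
    return serial, wall

for n_engines in (1, 4, 5):
    serial, wall = ideal_times(12, 1.0, n_engines)
    print("engines=%i serial=%.1f wall=%.1f speedup=%.1f"
          % (n_engines, serial, wall, serial / wall))
```

Note that five engines do no better than four in this model, since twelve tasks still need three rounds.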

Now we submit a batch of tasks of increasing duration and
watch where they run, iterating through the results as they arrive.

In [None]:
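def sleep_here(t):
    """Sleep for t seconds, then return the ID of the engine that ran the task."""
    import time  # imported here because the function body runs on the engine
    time.sleep(t)
    # `id` resolves to the engine-side global scattered earlier, not the builtin id()
    return id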

amr = lview.map(sleep_here, [.01*t for t in range(100)])
tic = time.time()
for i,r in enumerate(amr):
    print("task %i on engine %i: %.3f" % (i, r, time.time()-tic))


In [None]:
amr.wall_time

In [None]:
amr.serial_time

In [None]:
amr.serial_time / amr.wall_time
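
The ratio above is the effective speedup. Why dynamic assignment helps when task lengths are uneven can be sketched with a small local simulation. It assumes four engines, a static split into contiguous blocks, and a simplified "next free engine" rule for the dynamic case; the real scheduler's policy may differ, and both function names are hypothetical.

```python
import heapq

def static_makespan(durations, n_engines):
    """Finish time when the list is cut into n_engines contiguous blocks."""
    size = -(-len(durations) // n_engines)  # ceiling division
    blocks = [durations[i:i + size] for i in range(0, len(durations), size)]
    return max(sum(block) for block in blocks)

def dynamic_makespan(durations, n_engines):
    """Finish time when each task goes to whichever engine frees up first."""
    free_at = [0.0] * n_engines  # min-heap of times at which engines become idle
    heapq.heapify(free_at)
    for d in durations:
        start = heapq.heappop(free_at)       # earliest available engine
        heapq.heappush(free_at, start + d)   # it is busy until start + d
    return max(free_at)

durations = [0.01 * t for t in range(100)]
static = static_makespan(durations, 4)
dynamic = dynamic_makespan(durations, 4)
print("static: %.2f s, dynamic: %.2f s" % (static, dynamic))
```

In the static split the last block holds all the longest tasks, so one engine finishes long after the others; the dynamic rule spreads them out.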

Unlike `DirectView.map()`, which always results in one task per engine,
`LoadBalancedView.map()` defaults to one task per *item* in the sequence.  This
can be changed by specifying the `chunksize` keyword argument.

In [None]:
amr = lview.map(sleep_here, [.01*t for t in range(100)], chunksize=4)
tic = time.time()
for i,r in enumerate(amr):
    print("task %i on engine %i: %.3f"%(i, r, time.time()-tic))

# Exercise

## Parallelize nested loops

Often we want to run a function with a variety of combinations of arguments.
A useful skill is the ability to express a nested loop in terms of a map.

In [None]:
def area(w,h):
    return w*h


widths = range(1,4)
heights = range(6,10)

areas = []
for w in widths:
    for h in heights:
        areas.append(area(w,h))
areas

In [None]:
%run ../hints
nesthint()

In [None]:
%load ../soln/nestedloop.py

Validate the result:

In [None]:
p_areas = ar.get()
p_areas

In [None]:
areas == p_areas
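
The flattening step itself can be checked locally without a cluster. The sketch below is one possible approach, not necessarily the one in the solution file: it uses `itertools.product` to enumerate the `(w, h)` pairs and the builtin `map()` in place of the view's `map`.

```python
from itertools import product

def area(w, h):
    return w * h

widths = range(1, 4)
heights = range(6, 10)

# Reference result from the nested loop
nested = [area(w, h) for w in widths for h in heights]

# Flatten the loop: product() yields the (w, h) pairs in the same order as
# the nested loop; zip(*...) splits them into one sequence per argument,
# because map() takes one sequence for each function argument
ws, hs = zip(*product(widths, heights))
flat = list(map(area, ws, hs))

print(flat == nested)
```

With a cluster, the same `ws` and `hs` could be passed to `lview.map(area, ws, hs)`.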

## Examples and Exercises

- [Counting Words!](../examples/Counting%20Words.ipynb)
- [Monte Carlo Options Pricing](../examples/MC%20Options.ipynb)

Now that we've seen multiplexing and load-balancing, let's see how they are [used together](All%20Together.ipynb).