# Working with distributed

[Distributed](https://distributed.readthedocs.org) is a Python library for distributed execution. It's worth checking out if you haven't already.

Assuming you already have an IPython cluster running:

In [1]:
import ipyparallel as ipp
rc = ipp.Client()
rc.ids

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]

You can turn your IPython cluster into a distributed cluster by calling `Client.become_distributed()`:

In [2]:
# presumably here to stop any distributed cluster left over from an earlier run
rc.stop_distributed()

In [3]:
executor = rc.become_distributed(ncores=1)
executor

<Executor: scheduler=172.17.0.2:39898 workers=12 threads=12>

This will:

1. start a Scheduler on the Hub
2. start a Worker on each engine
3. return an Executor, the distributed client API

By default, distributed Workers use threads to run on all cores of a machine.
In this case, since I already have one *engine* per core,
I tell distributed to use one core per Worker with `ncores=1`.

We can now use our IPython cluster with distributed:

In [4]:
from distributed import progress

def square(x):
    return x ** 2

def neg(x):
    return -x

A = executor.map(square, range(1000))
B = executor.map(neg, A)
total = executor.submit(sum, B)
progress(total)

In [5]:
total.result()

-332833500
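As a sanity check, this computation is small enough to repeat serially without a cluster. The sketch below redefines `square` and `neg` so that it is self-contained; `local_total` is a name introduced here for illustration only.

```python
def square(x):
    return x ** 2

def neg(x):
    return -x

# Same pipeline as above, evaluated serially in the local process.
local_total = sum(neg(square(x)) for x in range(1000))
print(local_total)
```

If the printed value matches `total.result()`, the cluster is computing what we expect.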

I could also let distributed do its multithreading thing, and run one multi-threaded Worker per engine.

First, I need a mapping from each engine to the host it runs on:

In [6]:
import socket

engine_hosts = rc[:].apply_async(socket.gethostname).get_dict()
engine_hosts

{0: 'c1f0e6865bd0',
 1: 'c1f0e6865bd0',
 2: 'c1f0e6865bd0',
 3: 'c1f0e6865bd0',
 4: 'b0cd5ef58393',
 5: 'b0cd5ef58393',
 6: 'b0cd5ef58393',
 7: 'b0cd5ef58393',
 8: '5f5fe9720dc2',
 9: '5f5fe9720dc2',
 10: '5f5fe9720dc2',
 11: '5f5fe9720dc2'}

I can reverse this mapping, to get a list of engines on each host:

In [7]:
host_engines = {}
for engine_id, host in engine_hosts.items():
    if host not in host_engines:
        host_engines[host] = []
    host_engines[host].append(engine_id)

host_engines

{'5f5fe9720dc2': [8, 9, 10, 11],
 'b0cd5ef58393': [4, 5, 6, 7],
 'c1f0e6865bd0': [0, 1, 2, 3]}
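The same inversion can be written with `collections.defaultdict`, which removes the explicit membership test. This is only a sketch: `engine_hosts` is hardcoded here as a copy of the output shown earlier, so the block runs without a cluster.

```python
from collections import defaultdict

# Hardcoded copy of the engine -> host mapping shown above.
engine_hosts = {
    0: 'c1f0e6865bd0', 1: 'c1f0e6865bd0', 2: 'c1f0e6865bd0', 3: 'c1f0e6865bd0',
    4: 'b0cd5ef58393', 5: 'b0cd5ef58393', 6: 'b0cd5ef58393', 7: 'b0cd5ef58393',
    8: '5f5fe9720dc2', 9: '5f5fe9720dc2', 10: '5f5fe9720dc2', 11: '5f5fe9720dc2',
}

# Missing hosts get an empty list automatically on first access.
host_engines = defaultdict(list)
for engine_id, host in sorted(engine_hosts.items()):
    host_engines[host].append(engine_id)

host_engines = dict(host_engines)  # back to a plain dict for display
print(host_engines)
```

Sorting the items first keeps each host's engine list in ascending order, whatever order the original mapping was built in.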

Now I can get one engine per host:

In [8]:
one_engine_per_host = [engines[0] for engines in host_engines.values()]
one_engine_per_host

[4, 8, 0]

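*Here's a more concise, but more opaque, version. It also yields one engine per host, but because later entries overwrite earlier ones it keeps the last engine on each host rather than the first:*

In [9]:
one_engine_per_host = list({host: eid for eid, host in engine_hosts.items()}.values())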
one_engine_per_host

[7, 11, 3]
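Either choice of engine works for starting one Worker per host. For comparison, here is a sketch of a one-liner that keeps the first engine on each host: it walks the items in reverse, so the earliest entry is the last one written. `engine_hosts` is again a hardcoded copy of the output above, and `first_engine_per_host` is a name introduced for illustration.

```python
# Hardcoded copy of the engine -> host mapping shown above.
engine_hosts = {
    0: 'c1f0e6865bd0', 1: 'c1f0e6865bd0', 2: 'c1f0e6865bd0', 3: 'c1f0e6865bd0',
    4: 'b0cd5ef58393', 5: 'b0cd5ef58393', 6: 'b0cd5ef58393', 7: 'b0cd5ef58393',
    8: '5f5fe9720dc2', 9: '5f5fe9720dc2', 10: '5f5fe9720dc2', 11: '5f5fe9720dc2',
}

# Iterate from the highest engine ID down, so that for each host the
# lowest engine ID is written last and therefore wins.
first_engine_per_host = list(
    {host: eid for eid, host in reversed(list(engine_hosts.items()))}.values()
)
print(sorted(first_engine_per_host))
```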

I can now stop the first distributed cluster, and start a new one on just these engines, letting distributed allocate threads:

In [10]:
rc.stop_distributed()

executor = rc.become_distributed(one_engine_per_host)
executor

<Executor: scheduler=172.17.0.2:37811 workers=3 threads=12>

And submit the same tasks again:

In [11]:
A = executor.map(square, range(100))
B = executor.map(neg, A)
total = executor.submit(sum, B)
progress(total)

## Debugging distributed with IPython

In [12]:
rc.stop_distributed()

executor = rc.become_distributed(one_engine_per_host)
executor

<Executor: scheduler=172.17.0.2:50514 workers=3 threads=12>

Let's set the `%px` magics to run only on our one engine per host:

In [13]:
view = rc[one_engine_per_host]
view.block = True
view.activate()

Let's submit some work that's going to fail somewhere in the middle:

In [14]:
from IPython.display import display
from distributed import progress

def shift5(x):
    return x - 5

def inverse(x):
    return 1 / x

shifted = executor.map(shift5, range(1, 10))
inverted = executor.map(inverse, shifted)

total = executor.submit(sum, inverted)
display(progress(total))
total.result()

ZeroDivisionError: division by zero

We can see which task failed:

In [15]:
[f for f in inverted if f.status == 'error']

[<Future: status: error, key: inverse-bc7b7ed8f481797d9d45265f252f8ec0>]
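The failing future's key does not say which input produced the error. Because these functions are cheap and have no side effects, one way to narrow it down is to replay the pipeline serially in the local process. This is a minimal sketch of that idea, not a feature of distributed; `failures` is a name introduced here.

```python
def shift5(x):
    return x - 5

def inverse(x):
    return 1 / x

# Replay each input locally and record the ones that raise.
failures = []
for x in range(1, 10):
    try:
        inverse(shift5(x))
    except ZeroDivisionError as exc:
        failures.append((x, exc))

for x, exc in failures:
    print(f"input {x} failed: {exc!r}")
```

This only helps when the tasks are deterministic and safe to rerun. For anything heavier, inspecting the workers directly is the better route.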

When IPython starts a worker on each engine,
it stores it in the `distributed_worker` variable in the engine's namespace.
This lets us query the worker interactively.

We can check out the current data resident on each worker:

In [16]:
%%px
distributed_worker.data

Out[7:8]:
{'inverse-d6e0e00cb3347936f19e9df3d38015f4': 0.3333333333333333,
 'shift5-5eacd1e1be0e3976b644ab2a163dd111': 3}

Out[11:8]:
{'inverse-10c0d2c106d28ef201f314c962839478': -1.0,
 'inverse-6f68a726ebadebb024c30716b0ddbb33': -0.25,
 'inverse-a354f18c105e23ca2dc069587b14a5b8': 0.5,
 'shift5-3712885ebc1366e919cb866c6ce7e77c': -1,
 'shift5-4786df703d91632633e692bfcb47532e': -4,
 'shift5-a427120073a9ba8d2916dc408ec401c4': 0,
 'shift5-da44bc9d4f586c6a49b0d5822143e2a4': 2}

Out[3:8]:
{'inverse-59a0bf9ebd0226fb90bc0a5df5068b9f': 0.25,
 'inverse-65ef68148fccac8c4b06be72b955729a': -0.5,
 'inverse-6dda39073ba4c48681553145497ff0ad': 1.0,
 'inverse-8990646f88d288c4fbdb3b0be2017cf8': -0.3333333333333333,
 'shift5-17492f2a64f68132ebc986319e5dc9c3': 4,
 'shift5-5dbf6e302e056087bd8a55aa5ec44932': -2,
 'shift5-b0c68352cec6649a8b25dd9dded3679e': 1,
 'shift5-daecbbabc904f01ce981587adf933318': -3}

Now that we can poke around with each Worker,
we can have a slightly easier time figuring out what went wrong.
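As one example of such poking around, the sketch below scans a worker's data for `shift5` results equal to zero, since those are the values `inverse` cannot handle. `zero_valued_keys` is a hypothetical helper written for this illustration, and `worker_data` is a hardcoded copy of the `Out[11:8]` output above so that the block runs without a cluster.

```python
def zero_valued_keys(data, prefix="shift5-"):
    """Return the keys under `prefix` whose stored result is zero."""
    return [key for key, value in data.items()
            if key.startswith(prefix) and value == 0]

# Hardcoded copy of the data reported by engine 11 above.
worker_data = {
    'inverse-10c0d2c106d28ef201f314c962839478': -1.0,
    'inverse-6f68a726ebadebb024c30716b0ddbb33': -0.25,
    'inverse-a354f18c105e23ca2dc069587b14a5b8': 0.5,
    'shift5-3712885ebc1366e919cb866c6ce7e77c': -1,
    'shift5-4786df703d91632633e692bfcb47532e': -4,
    'shift5-a427120073a9ba8d2916dc408ec401c4': 0,
    'shift5-da44bc9d4f586c6a49b0d5822143e2a4': 2,
}

print(zero_valued_keys(worker_data))
```

On a live cluster, the same function could presumably be defined in a `%%px` cell and called on `distributed_worker.data` on each engine.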