# Working with IPython and dask.distributed

[dask.distributed](https://distributed.readthedocs.io) is a library for distributed execution in Python. If you haven't tried it yet, it is worth a look.

Assuming you already have an IPython cluster running:

In [2]:
import ipyparallel as ipp
rc = ipp.Client()
rc.ids

[0, 1, 2, 3]

You can turn your IPython cluster into a distributed cluster by calling `Client.become_dask()`:

In [3]:
executor = rc.become_dask(ncores=1)
executor

| Client | Cluster |
|---|---|
| Scheduler: tcp://192.168.1.19:51609 | Workers: 4, Cores: 4, Memory: 8.47 GB |


This will:

1. start a Scheduler on the Hub
2. start a Worker on each engine
3. return a distributed `Client` (named `executor` here), the entry point to the distributed client API

By default, distributed Workers will use threads to run on all cores of a machine. 
In this case, since I already have one *engine* per core,
I tell distributed to run one core per Worker with `ncores=1`.

We can now use our IPython cluster with distributed:

In [1]:
from distributed import progress

def square(x):
    return x ** 2

def neg(x):
    return -x

A = executor.map(square, range(1000))
B = executor.map(neg, A)
total = executor.submit(sum, B)
progress(total)


In [16]:
total.result()

-332833500
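As a quick sanity check on that result, the same pipeline can be evaluated locally without a cluster. This is only a sketch in plain Python (no distributed involved); the `closed_form` line is an addition that uses the standard sum-of-squares identity.

```python
def square(x):
    return x ** 2

def neg(x):
    return -x

# Same computation as the A -> B -> total pipeline, run serially in one process.
local_total = sum(neg(square(x)) for x in range(1000))

# The sum of squares 0..n-1 is (n - 1) * n * (2n - 1) / 6; the total is its negative.
n = 1000
closed_form = -((n - 1) * n * (2 * n - 1) // 6)

print(local_total, closed_form)
```

Both values should agree with what `total.result()` returned from the cluster.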

I could also let distributed do its multithreading thing, and run one multi-threaded Worker per engine.

First, I need a mapping from each engine to the host it runs on:

In [17]:
import socket

engine_hosts = rc[:].apply_async(socket.gethostname).get_dict()
engine_hosts

{0: 'DESKTOP-9O127PB',
 1: 'DESKTOP-9O127PB',
 2: 'DESKTOP-9O127PB',
 3: 'DESKTOP-9O127PB'}

I can invert this mapping to get a list of engines on each host:

In [18]:
host_engines = {}
for engine_id, host in engine_hosts.items():
    if host not in host_engines:
        host_engines[host] = []
    host_engines[host].append(engine_id)

host_engines

{'DESKTOP-9O127PB': [0, 1, 2, 3]}
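The inversion can also be written with `collections.defaultdict`, which removes the explicit membership check. A minimal sketch, using the mapping from the output above as sample data so that it runs without a cluster:

```python
from collections import defaultdict

# Sample data copied from the output above; on a live cluster this would come
# from rc[:].apply_async(socket.gethostname).get_dict().
engine_hosts = {
    0: 'DESKTOP-9O127PB',
    1: 'DESKTOP-9O127PB',
    2: 'DESKTOP-9O127PB',
    3: 'DESKTOP-9O127PB',
}

host_engines = defaultdict(list)
# Sort by engine id so each host's list comes out in a predictable order.
for engine_id, host in sorted(engine_hosts.items()):
    host_engines[host].append(engine_id)

host_engines = dict(host_engines)  # plain dict for display
print(host_engines)
```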

Now I can get one engine per host:

In [19]:
one_engine_per_host = [engines[0] for engines in host_engines.values()]
one_engine_per_host

[0]

*Here's a concise, but more opaque version that also picks one engine per host (the last one listed for each host rather than the first):*

In [20]:
one_engine_per_host = list({host: eid for eid, host in engine_hosts.items()}.values())
one_engine_per_host

[3]
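The two versions return different engines (`[0]` versus `[3]`) because a dict comprehension keeps the last value assigned to each key. The sketch below shows both behaviours side by side; the two-host mapping is made up for illustration and is not from the cluster above.

```python
# Hypothetical mapping with two hosts, for illustration only.
engine_hosts = {0: 'host-a', 1: 'host-a', 2: 'host-b', 3: 'host-b'}

# Later engines overwrite earlier ones, so this keeps the LAST engine per host.
last_per_host = list({host: eid for eid, host in engine_hosts.items()}.values())

# setdefault only stores a value if the key is new, so this keeps the FIRST.
first = {}
for eid, host in engine_hosts.items():
    first.setdefault(host, eid)
first_per_host = list(first.values())

print(last_per_host, first_per_host)
```

Either choice works here, since all that matters is ending up with exactly one engine on each host.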

I can now stop the first distributed cluster, and start a new one on just these engines, letting distributed allocate threads:

In [21]:
rc.stop_dask()

executor = rc.become_dask(one_engine_per_host)
executor

tornado.application - ERROR - Exception in callback <bound method Client._heartbeat of <Client: scheduler='tcp://192.168.1.19:63890' processes=24 cores=24>>
Traceback (most recent call last):
  File "C:\Users\tedal\Anaconda3\lib\site-packages\tornado\ioloop.py", line 1229, in _run
    return self.callback()
  File "C:\Users\tedal\Anaconda3\lib\site-packages\distributed\client.py", line 900, in _heartbeat
    self.scheduler_comm.send({'op': 'heartbeat-client'})
  File "C:\Users\tedal\Anaconda3\lib\site-packages\distributed\batched.py", line 117, in send
    raise CommClosedError
distributed.comm.core.CommClosedError

| Client | Cluster |
|---|---|
| Scheduler: tcp://192.168.1.19:50390 | Workers: 1, Cores: 1, Memory: 2.12 GB |


And submit the same tasks again:

In [23]:
A = executor.map(square, range(100))
B = executor.map(neg, A)
total = executor.submit(sum, B)
progress(total)


## Debugging distributed with IPython

In [24]:
rc.stop_dask()

executor = rc.become_dask(one_engine_per_host)
executor

tornado.application - ERROR - Exception in callback <bound method Client._heartbeat of <Client: scheduler='tcp://192.168.1.19:50390' processes=1 cores=1>>
Traceback (most recent call last):
  File "C:\Users\tedal\Anaconda3\lib\site-packages\tornado\ioloop.py", line 1229, in _run
    return self.callback()
  File "C:\Users\tedal\Anaconda3\lib\site-packages\distributed\client.py", line 900, in _heartbeat
    self.scheduler_comm.send({'op': 'heartbeat-client'})
  File "C:\Users\tedal\Anaconda3\lib\site-packages\distributed\batched.py", line 117, in send
    raise CommClosedError
distributed.comm.core.CommClosedError


| Client | Cluster |
|---|---|
| Scheduler: tcp://192.168.1.19:53103 | Workers: 1, Cores: 1, Memory: 2.12 GB |


tornado.application - ERROR - Exception in callback functools.partial(<function wrap.<locals>.null_wrapper at 0x00000167A97E9488>, <Future finished exception=CommClosedError('in <closed TCP>: Stream is closed')>)
Traceback (most recent call last):
  File "C:\Users\tedal\Anaconda3\lib\site-packages\distributed\comm\tcp.py", line 177, in read
    n_frames = yield stream.read_bytes(8)
  File "C:\Users\tedal\Anaconda3\lib\site-packages\tornado\gen.py", line 1133, in run
    value = future.result()
tornado.iostream.StreamClosedError: Stream is closed

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\tedal\Anaconda3\lib\site-packages\tornado\ioloop.py", line 758, in _run_callback
    ret = callback()
  File "C:\Users\tedal\Anaconda3\lib\site-packages\tornado\stack_context.py", line 300, in null_wrapper
    return fn(*args, **kwargs)
  File "C:\Users\tedal\Anaconda3\lib\site-packages\tornado\ioloop.py", line 779, in _disc

Let's set the `%px` magics to run only on our one engine per host:

In [12]:
view = rc[one_engine_per_host]
view.block = True
view.activate()

Let's submit some work that's going to fail somewhere in the middle:

In [1]:
from IPython.display import display
from distributed import progress

def shift5(x):
    return x - 5

def inverse(x):
    return 1 / x

shifted = executor.map(shift5, range(1, 10))
inverted = executor.map(inverse, shifted)

total = executor.submit(sum, inverted)
display(progress(total))
total.result()


We can see which task failed:

In [14]:
[f for f in inverted if f.status == 'error']

[<Future: status: error, key: inverse-f8907aa30adc310cc8168553500ca8bb>]
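To see the same failure pattern without a cluster, here is a local stand-in built on the standard library's `concurrent.futures`. It is not the distributed API: these futures have no `.status` attribute, so the sketch filters on `.exception()` instead, and it pairs each future with its input so the offending value is visible.

```python
from concurrent.futures import ThreadPoolExecutor

def shift5(x):
    return x - 5

def inverse(x):
    return 1 / x

inputs = list(range(1, 10))

with ThreadPoolExecutor(max_workers=2) as pool:
    shifted = [pool.submit(shift5, x) for x in inputs]
    # .result() blocks until each shifted value is ready.
    inverted = [pool.submit(inverse, f.result()) for f in shifted]

# All futures are finished once the pool has shut down.
failed = [
    (x, f.exception())
    for x, f in zip(inputs, inverted)
    if f.exception() is not None
]
print(failed)
```

The only failure is the input `5`, because `shift5(5)` is `0` and `inverse(0)` divides by zero.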

When IPython starts a worker on each engine,
it stores it in the `dask_worker` variable in the engine's namespace.
This lets us query the worker interactively.

We can check out the current data resident on each worker:

In [15]:
%%px
dask_worker.data

Out[7:2]:
{'inverse-07072811957c38188d819607f8020bed': 0.3333333333333333,
 'inverse-0994af96c984b7254e2437daa46df6c8': 1.0,
 'inverse-1934b1ad8662540a6b1a321502d3d81e': 0.25,
 'inverse-2e0af360f3e400c0360eaa3351e80a4d': -1.0,
 'inverse-8ef20ef722160668e84ab435b8293751': -0.5,
 'inverse-bee9906329afc3cb86cc241209453f56': -0.3333333333333333,
 'inverse-cfd3e5b72a33fd2fa85c683107287cf9': -0.25,
 'inverse-d9ed866e67ebc068f6561f9263c4cf73': 0.5,
 'shift5-17c829bc866d38df11bb25ffc7ea887f': -3,
 'shift5-3035396f215ce921eda38f8f36ca3e90': 4,
 'shift5-4951afd99368d41997f42a2f823f566f': 2,
 'shift5-5c9f9254c4a34e7571d53ee4839ea6f2': 1,
 'shift5-8458d8715078405cb9dfed60d1c3d26a': -2,
 'shift5-899e24c059f86698e06254cfd5f3f4ea': -1,
 'shift5-9326e9993993cb1c08355c6e5b8e5970': -4,
 'shift5-cabacfd5aaf525d183d932617b8eac5a': 0,
 'shift5-e233bf13876414d6a0a4817695ac7ca1': 3}
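The worker holds nine `shift5` results but only eight `inverse` results. Task keys are hashes, so they cannot be matched up directly, but the values can. The sketch below copies the dictionary from the output above and looks for a shifted value whose reciprocal is absent; the helper `has_inverse` is a name introduced here for illustration.

```python
# Copied from the dask_worker.data output above.
worker_data = {
    'inverse-07072811957c38188d819607f8020bed': 0.3333333333333333,
    'inverse-0994af96c984b7254e2437daa46df6c8': 1.0,
    'inverse-1934b1ad8662540a6b1a321502d3d81e': 0.25,
    'inverse-2e0af360f3e400c0360eaa3351e80a4d': -1.0,
    'inverse-8ef20ef722160668e84ab435b8293751': -0.5,
    'inverse-bee9906329afc3cb86cc241209453f56': -0.3333333333333333,
    'inverse-cfd3e5b72a33fd2fa85c683107287cf9': -0.25,
    'inverse-d9ed866e67ebc068f6561f9263c4cf73': 0.5,
    'shift5-17c829bc866d38df11bb25ffc7ea887f': -3,
    'shift5-3035396f215ce921eda38f8f36ca3e90': 4,
    'shift5-4951afd99368d41997f42a2f823f566f': 2,
    'shift5-5c9f9254c4a34e7571d53ee4839ea6f2': 1,
    'shift5-8458d8715078405cb9dfed60d1c3d26a': -2,
    'shift5-899e24c059f86698e06254cfd5f3f4ea': -1,
    'shift5-9326e9993993cb1c08355c6e5b8e5970': -4,
    'shift5-cabacfd5aaf525d183d932617b8eac5a': 0,
    'shift5-e233bf13876414d6a0a4817695ac7ca1': 3,
}

shifted_values = [v for k, v in worker_data.items() if k.startswith('shift5-')]
inverse_values = {v for k, v in worker_data.items() if k.startswith('inverse-')}

def has_inverse(v):
    """Return True if 1 / v is among the stored inverse results."""
    try:
        return 1 / v in inverse_values
    except ZeroDivisionError:
        # No reciprocal exists, so no result could have been stored.
        return False

missing = sorted(v for v in shifted_values if not has_inverse(v))
print(missing)
```

The one shifted value with no stored reciprocal is `0`, which is the input that made `inverse` fail.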

Now that we can poke around inside each Worker,
we can have a slightly easier time figuring out what went wrong.