# Distributed task graph execution

In [1]:
from pi_workload import define_pi_workload

Note that we only need to import the Dask distributed package:

In [2]:
import dask.distributed

In [3]:
client = dask.distributed.Client(n_workers=1)

In [4]:
client

Client:  Scheduler: tcp://127.0.0.1:42547  Dashboard: http://127.0.0.1:8787/status
Cluster: Workers: 1  Cores: 16  Memory: 135.09 GB


The rest of the code is essentially identical to the previous example:

In [5]:
import time, numpy

In [6]:
start = time.time()
pi = define_pi_workload().compute()
elapse = time.time() - start

print('pi computed:   ', pi)
print('pi from numpy: ', numpy.pi)
print('wall time: ', elapse, 'sec')

workload in giga bytes: 250.0
500.0 chunks to process
pi computed:    3.141595629056
pi from numpy:  3.141592653589793
wall time:  80.39229726791382 sec


Note that the task graph was still executed locally, because we didn't specify a remote setup of Dask workers!
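
The `pi_workload` module itself is not shown in this notebook. As a rough illustration only, and assuming the workload is a chunked Monte Carlo estimate (an assumption, not something the notebook confirms), the idea can be sketched in pure Python without Dask. The names `count_hits` and `estimate_pi` are hypothetical.

```python
import random


def count_hits(n_samples, seed):
    """Count random points in the unit square that fall inside the quarter circle."""
    rng = random.Random(seed)
    hits = 0
    for _ in range(n_samples):
        x, y = rng.random(), rng.random()
        if x * x + y * y <= 1.0:
            hits += 1
    return hits


def estimate_pi(n_chunks, samples_per_chunk):
    """Combine independent per-chunk counts into one estimate of pi."""
    # Each chunk is an independent piece of work; this sketch runs them
    # one after another, whereas a scheduler could hand them to workers.
    hits = sum(count_hits(samples_per_chunk, seed) for seed in range(n_chunks))
    return 4.0 * hits / (n_chunks * samples_per_chunk)


pi_estimate = estimate_pi(n_chunks=8, samples_per_chunk=25_000)
print("pi estimate:", pi_estimate)
```

Because the chunks do not depend on each other, this kind of workload is a natural fit for spreading across several workers.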

## Really going distributed...

Only two ingredients are needed for this to work properly. First, each machine needs local access to a Dask software environment. Second, the machines need to be able to communicate with each other. (If these kinds of examples don't work on your machines, a likely reason is that the necessary ports are blocked. In this case, your admins might be able to help.)
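
One quick way to test the second ingredient is to try opening a TCP connection to the port in question. The following is a minimal sketch using only the standard library; the helper name `port_is_open` is hypothetical, and the host and port in the usage line are placeholders to replace with your own scheduler address.

```python
import socket


def port_is_open(host, port, timeout=2.0):
    """Return True if a TCP connection to (host, port) can be established."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name resolution errors.
        return False


# Placeholder target: 8786 is the scheduler port used later in this notebook.
print(port_is_open("127.0.0.1", 8786, timeout=1.0))
```

A `False` result only says that no connection could be made from this machine; it does not tell you whether a firewall or a missing scheduler process is the cause.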

### Create a Dask scheduler process on a second machine
First, we need to set up a Dask scheduler process that is reachable from the local machine running the client process. This only sets up the scheduler, without any actual Dask workers.

In [9]:
!hostname

scalc15


In a Jupyter terminal, start the Dask scheduler on a remote machine via SSH:


```bash
# Quote the remote command so that `&&` and `$HOME` are evaluated on the remote host.
$ ssh khoeflich@scalc03.geomar.de \
    'source $HOME/Course-Data-Science-with-Dask/04_distributed/conda.sh && dask-scheduler'
```
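
If you would rather assemble such a command from Python, the sketch below shows one way to do it. The helper `build_ssh_command` is hypothetical. The point it illustrates is that the whole remote command is passed to `ssh` as a single argument, so `&&` is interpreted by the remote shell and not by the local one.

```python
import shlex


def build_ssh_command(login, remote_command):
    """Return an argv list that runs remote_command on the remote host via ssh."""
    # The remote command stays one argument; ssh hands it to the remote shell.
    return ["ssh", login, remote_command]


remote = (
    "source $HOME/Course-Data-Science-with-Dask/04_distributed/conda.sh"
    " && dask-scheduler"
)
argv = build_ssh_command("khoeflich@scalc03.geomar.de", remote)

# Shell-quoted form, suitable for pasting into a terminal.
print(shlex.join(argv))

# To actually start the scheduler from Python (blocks until it exits):
# import subprocess; subprocess.run(argv, check=True)
```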


In [10]:
client = dask.distributed.Client('tcp://10.199.124.115:8786')

In [11]:
client

Client:  Scheduler: tcp://10.199.124.115:8786  Dashboard: http://10.199.124.115:36571/status
Cluster: Workers: 0  Cores: 0  Memory: 0 B


### Create a Dask worker on a third machine
To be able to execute task graphs, we need at least one Dask worker.

```bash
# Quote the remote command so that `&&` and `$HOME` are evaluated on the remote host.
$ ssh khoeflich@scalc08.geomar.de \
    'source $HOME/Course-Data-Science-with-Dask/04_distributed/conda.sh && dask-worker 10.199.124.115:8786'
```

In [12]:
client

Client:  Scheduler: tcp://10.199.124.115:8786  Dashboard: http://10.199.124.115:36571/status
Cluster: Workers: 1  Cores: 16  Memory: 135.09 GB


Now we can repeat the workload from above, but on a scaled out / distributed cluster.

In [13]:
start = time.time()
pi = define_pi_workload().compute()
elapse = time.time() - start

print('pi computed:   ', pi)
print('pi from numpy: ', numpy.pi)
print('wall time: ', elapse, 'sec')

workload in giga bytes: 250.0
500.0 chunks to process
pi computed:    3.141594410496
pi from numpy:  3.141592653589793
wall time:  77.30971550941467 sec


### Of course, we could add further Dask workers manually...

But doing this would be rather tedious. Luckily, Dask is a relatively mature collection of tools and provides many high-level "cluster managers", such as the `dask-ssh` command-line tool or the `dask.distributed.SSHCluster()` interface. (For an incomplete list of more Dask deployment options, see the slides!)

## Using a cluster manager interface

We could scale out a Dask cluster with `dask-ssh scalc{06..09}.geomar.de` in the terminal, but let's look at how cluster management works from within a Jupyter notebook. (This is very similar for many other Dask cluster deployment options, so migrating a Dask task graph calculation to a differently managed set of machines becomes simple from a user perspective.)

In [15]:
ssh_cluster = dask.distributed.SSHCluster(
    ["scalc05.geomar.de", # scheduler
     "scalc06.geomar.de", # workers...
     "scalc07.geomar.de",
     "scalc08.geomar.de",
     "scalc09.geomar.de",
    ],
)

distributed.deploy.ssh - INFO - distributed.scheduler - INFO - -----------------------------------------------
distributed.deploy.ssh - INFO - distributed.http.proxy - INFO - To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
distributed.deploy.ssh - INFO - distributed.scheduler - INFO - -----------------------------------------------
distributed.deploy.ssh - INFO - distributed.scheduler - INFO - Clear task state
distributed.deploy.ssh - INFO - distributed.scheduler - INFO -   Scheduler at: tcp://10.199.124.105:8786
distributed.deploy.ssh - INFO - distributed.nanny - INFO -         Start Nanny at: 'tcp://10.199.124.109:37151'
distributed.deploy.ssh - INFO - distributed.nanny - INFO -         Start Nanny at: 'tcp://10.199.124.106:37817'
distributed.deploy.ssh - INFO - distributed.nanny - INFO -         Start Nanny at: 'tcp://10.199.124.108:33053'
distributed.deploy.ssh - INFO - distributed.nanny - INFO -         Sta

In [16]:
client = dask.distributed.Client(ssh_cluster)

In [17]:
client

Client:  Scheduler: tcp://10.199.124.105:8786  Dashboard: http://10.199.124.105:8787/status
Cluster: Workers: 4  Cores: 64  Memory: 540.37 GB


In [18]:
start = time.time()
pi = define_pi_workload().compute()
elapse = time.time() - start

print('computed:   ', pi)
print('from numpy: ', numpy.pi)
print('wall time: ', elapse, 'sec')

workload in giga bytes: 250.0
500.0 chunks to process
computed:    3.141587140864
from numpy:  3.141592653589793
wall time:  23.551914930343628 sec


Note that the wall time for executing the task graph dropped substantially, from about 77 s on one remote worker to about 24 s on four workers!
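
To put a number on this, the wall times printed above can be turned into a speedup and a parallel efficiency. This is a small arithmetic sketch; it assumes the single remote worker run is the fair baseline, since those workers have the same core count as each machine in the SSH cluster.

```python
# Wall times in seconds, copied from the outputs above.
wall_time_one_remote = 77.30971550941467    # 1 remote worker
wall_time_ssh_cluster = 23.551914930343628  # 4 remote workers

n_workers = 4
speedup = wall_time_one_remote / wall_time_ssh_cluster
efficiency = speedup / n_workers  # 1.0 would mean perfect linear scaling

print(f"speedup with {n_workers} workers: {speedup:.2f}x")
print(f"parallel efficiency: {efficiency:.0%}")
```

The efficiency stays below 100 %, which is expected: scheduling, communication between machines, and the final reduction all add overhead that does not shrink with more workers.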

In [19]:
ssh_cluster.close() # don't forget this, especially with remote workers!

## Python environment

In [20]:
!conda list --explicit

# This file may be used to create an environment using:
# $ conda create --name <env> --file <this file>
# platform: linux-64
@EXPLICIT
https://conda.anaconda.org/conda-forge/linux-64/_libgcc_mutex-0.1-conda_forge.tar.bz2
https://conda.anaconda.org/conda-forge/linux-64/ca-certificates-2020.11.8-ha878542_0.tar.bz2
https://conda.anaconda.org/conda-forge/linux-64/ld_impl_linux-64-2.35.1-hed1e6ac_0.tar.bz2
https://conda.anaconda.org/conda-forge/linux-64/libgfortran5-9.3.0-he4bcb1c_17.tar.bz2
https://conda.anaconda.org/conda-forge/linux-64/libstdcxx-ng-9.3.0-h2ae2ef3_17.tar.bz2
https://conda.anaconda.org/conda-forge/linux-64/pandoc-2.11.1.1-h36c2ea0_0.tar.bz2
https://conda.anaconda.org/conda-forge/linux-64/libgfortran-ng-9.3.0-he4bcb1c_17.tar.bz2
https://conda.anaconda.org/conda-forge/linux-64/libgomp-9.3.0-h5dbcf3e_17.tar.bz2
https://conda.anaconda.org/conda-forge/linux-64/_openmp_mutex-4.5-1_gnu.tar.bz2
https://conda.anaconda.org/conda-forge/linux-64/libgcc-ng-9.3.0-h5dbcf3e_17.tar.bz2
ht

In [21]:
!pip list

Package                       Version
----------------------------- -------------------
argon2-cffi                   20.1.0
async-generator               1.10
asyncssh                      2.4.2
attrs                         20.3.0
backcall                      0.2.0
backports.functools-lru-cache 1.6.1
bcrypt                        3.2.0
bleach                        3.2.1
bokeh                         2.2.3
brotlipy                      0.7.0
certifi                       2020.11.8
cffi                          1.14.3
chardet                       3.0.4
click                         7.1.2
cloudpickle                   1.6.0
conda                         4.9.2
conda-package-handling        1.7.2
cryptography                  3.2.1
cytoolz                       0.11.0
dask                          2.30.0
dask-jobqueue                 0.7.1
decorator                     4.4.2
defusedxml                    0.6.0
distributed                   2.30.1
entrypoints                   0.3
fsspe