# Python distributed computing

<br/>
<div align="center">18th of June, 2021</div>
<br/>
<div align="center">
    Thomas Arildsen<br/>
    <a href="mailto:tari@its.aau.dk">tari@its.aau.dk</a>
</div>
<br/>
<div align="center">
CLAAUDIA<br/>
Aalborg University
</div>

## Agenda

* Using the `distributed` module
* Integrating with `dask`
* Alternative: IPython parallel

## Introduction

- We have seen some theory and practice of parallel computing on *one* physical computer this morning.
- Now we turn to distributed computing, that is, computing across two or more physical computers (each of which can have several CPUs/cores).

### *Symmetric Multi-Processing (SMP)* vs. distributed processing

![Symmetric Multi-Processing](figures/SMP_-_Symmetric_Multiprocessor_System.svg)
*Image source: [Wikimedia Commons](https://commons.wikimedia.org/wiki/File:SMP_-_Symmetric_Multiprocessor_System.svg) (CC-BY-SA)*

### Symmetric Multi-Processing (SMP) vs. *distributed processing*

![Distributed processing (Beowulf cluster)](figures/Beowulf.png)
*Image source: [Wikimedia Commons](https://commons.wikimedia.org/wiki/File:Beowulf.png) (public domain)*

How can we do distributed computing in Python?

## `distributed`

### Set up cluster

We are going to cheat a bit and set up a scheduler and workers on our own computer and pretend these are running on separate computers...

In [2]:
from distributed import LocalCluster
lc = LocalCluster()

In [None]:
lc

- The scheduler and workers can also be started from the command line - see: http://distributed.readthedocs.io/en/latest/setup.html#using-the-command-line

### Using the cluster - Dask

In order to distribute tasks on the workers, we create a `dask.distributed.Client`:

In [3]:
from dask.distributed import Client

client = Client(lc.scheduler_address)
client

- Client: Scheduler: tcp://127.0.0.1:51118, Dashboard: http://127.0.0.1:8787/status
- Cluster: Workers: 4, Cores: 8, Memory: 8.59 GB


We can now submit work on the cluster in the same way as we do when using a pool of workers in `concurrent.futures`.

**SPMD-style processing**
- We re-use an example from earlier today from `concurrent.futures` - now adapted to `dask.distributed`.

In [4]:
import os, time

def _f(d):
    # Defines the f(d) function (f(d) is a single task)
    time.sleep(float(d)/10.)
    pid = os.getpid()
    print(" _f argument: {:2d}, process id: {:7d} ".format(d, pid))
    return pid

print("Parent process id: {:7d}".format(os.getpid()))
futures = client.map(_f, (30, 15, 2))
client.gather(futures)

Parent process id:    2764


[2776, 2778, 2777]

**MPMD-style processing**
- Again, we re-use an example from earlier today from `concurrent.futures`.

In [5]:
def _f1(d):
    # Defines the f1(d) function (f1(d) is a single task)
    return d + 1

def _f2(d):
    # Defines the f2(d) function (f2(d) is a single task)
    return d + 2

def _f3(d):
    # Defines the f3(d) function (f3(d) is a single task)
    return d + 3

In [6]:
if __name__ == '__main__': # We have to use this to make it work in this interactive interpreter
    futurelist = []
    futurelist.append(client.submit(_f1, 1))
    futurelist.append(client.submit(_f2, 1))
    futurelist.append(client.submit(_f3, 1))
    #print([future.result() for future in futurelist])
    print(client.gather(futurelist))

[2, 3, 4]


- Notice how we could use `client.gather` on a list of result futures to get them all.
- We could also call `.result()` on each of the individual result futures to retrieve them individually.
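
For comparison, here is a minimal sketch of the same MPMD pattern written against the standard library's `concurrent.futures`, whose `submit` / `.result()` style the client mirrors. It collects the results by calling `.result()` on each future in turn. The function names and the `run_mpmd` helper are made up for this illustration, and a thread pool is used only so that the sketch runs without a cluster.

```python
from concurrent.futures import ThreadPoolExecutor


def add_one(d):
    return d + 1


def add_two(d):
    return d + 2


def add_three(d):
    return d + 3


def run_mpmd(value):
    """Submit three different functions on the same input.

    Results are returned in submission order, because we ask each
    future for its result in the order the futures were created.
    """
    with ThreadPoolExecutor(max_workers=3) as pool:
        futures = [pool.submit(func, value)
                   for func in (add_one, add_two, add_three)]
        return [future.result() for future in futures]


print(run_mpmd(1))
```

With `dask.distributed`, `client.gather(futurelist)` plays the role of the list comprehension over `.result()` in the last line of `run_mpmd`.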

### Caveats on distribution of data

- When we submit tasks to the workers, the functions and data get serialised (pickled) and transmitted to the workers.
- This is relatively inexpensive in the cases we saw earlier today where all processing takes place on one computer; objects and functions get copied in local memory.
- In the case of distributed computing, the serialised data is transmitted across a network to the workers $\rightarrow$ generally much slower.
- Generally, the tasks we want to perform on the workers should take (much) longer than it takes to transmit the data to the workers. Otherwise, distributing the work is not worthwhile.
- For large amounts of data, it is often better to keep this stored in files in a network location accessible to all the workers.
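
To get a rough feeling for these costs, the sketch below compares the pickled size of a data set with the pickled size of a file name pointing to it, and turns both into an estimated transfer time. The file path, the data size and the bandwidth of 100 MB/s are assumptions chosen for illustration only; real serialisation in `distributed` may be more compact for some data types.

```python
import pickle


def payload_bytes(obj):
    """Size in bytes of the pickled object, i.e. roughly what must be sent."""
    return len(pickle.dumps(obj))


def transfer_seconds(obj, bandwidth_bytes_per_s):
    """Estimated time to send the pickled object over the network."""
    return payload_bytes(obj) / bandwidth_bytes_per_s


# Hypothetical inputs: a data set held in memory on the client ...
data = [float(i) for i in range(100_000)]
# ... and the name of a file on shared storage that holds the same data.
path = "/shared/storage/datafile_0.hdf5"

ASSUMED_BANDWIDTH = 100e6  # assumed network bandwidth: 100 MB/s

for label, obj in (("data", data), ("path", path)):
    print("{}: {} bytes, about {:.6f} s to transmit".format(
        label, payload_bytes(obj), transfer_seconds(obj, ASSUMED_BANDWIDTH)))
```

If the task itself runs for less time than the estimated transfer of its input, sending the data is unlikely to pay off, whereas sending the file name costs next to nothing.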

### Load data from file on workers

In this example we demonstrate how the workers can load the data they need from files they can access locally.

In [None]:
ls datafile*

In [None]:
import glob
import os

# Absolute paths of all HDF5 files in the current directory
files = [os.path.abspath(name) for name in glob.glob('*.hdf5')]
files

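We convert the file names to absolute paths to mitigate issues with relative paths, for example if a worker does not run in the same working directory as the client.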
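
The following sketch illustrates one such issue using only the standard library: a relative file name stops resolving as soon as the working directory changes, whereas the absolute path keeps working. The temporary directories stand in for the client's and a worker's working directories, and the empty placeholder file is not a real HDF5 file; both are assumptions made for this illustration.

```python
import os
import tempfile


def to_absolute(paths):
    """Return absolute versions of the given paths (relative to the current directory)."""
    return [os.path.abspath(p) for p in paths]


def demo():
    original_cwd = os.getcwd()
    with tempfile.TemporaryDirectory() as data_dir, \
            tempfile.TemporaryDirectory() as other_dir:
        try:
            # "Client" side: we are in the directory that contains the file.
            os.chdir(data_dir)
            relative = "datafile_0.hdf5"
            with open(relative, "wb"):
                pass  # empty placeholder file
            absolute = to_absolute([relative])[0]

            # "Worker" side: a different working directory.
            os.chdir(other_dir)
            return os.path.exists(relative), os.path.exists(absolute)
        finally:
            # Restore the working directory before the temporary ones are removed.
            os.chdir(original_cwd)


relative_found, absolute_found = demo()
print("relative path found:", relative_found)
print("absolute path found:", absolute_found)
```

Note that absolute paths only help if the workers can actually see the same file system location, as is the case here where everything runs on one computer.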

Define a function to load a file on a worker:

In [None]:
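import h5py

def load_file(filename):
    # Open read-only and close the file again once the data has been read
    with h5py.File(filename, 'r') as f:
        # [...] reads the whole '/data' dataset into a NumPy array in memory
        return f['/data'][...]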

In [None]:
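import dask.array as da

def load_file2(filename):
    # Alternative that needs no data file: ignores filename and returns
    # a 100 x 100 Dask array of normally distributed random numbers
    return da.random.normal(size=(100,100))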

Let us try it:

In [None]:
load_file(files[0])

Now we can map the `load_file` function to the workers to let each of them load its own data set from file:

In [None]:
futures = client.map(load_file, files)
futures

### Bring the function to the data

When you need to do further processing on a result that has been computed on the workers, it is usually much more efficient to keep the data on the workers and push the function you need computed to them instead of bringing the data back to the client computer.

Slow:

In [None]:
futures[0].result().shape

The above fetches the $100 \times 100$ NumPy array (large) that was loaded from file on the worker and queries its shape on the client computer.

Fast:

In [None]:
client.submit(lambda a: a.shape, futures[0]).result()

The above sends a function to the worker containing the data loaded in `futures[0]`. The function queries the shape property of the array. The resulting (small) shape tuple is then sent back to the client computer.
- Note how this example uses a lambda function to access the `shape` attribute of the array, since `shape` is an attribute and not a function we could submit directly.

## Integration with Dask

We can use Dask arrays (which we saw earlier today) directly on top of `distributed`'s futures and thereby transparently use Dask in a distributed way.

**Example**

In [None]:
import dask.array as da

concat_future = client.submit(da.concatenate, futures)

In [None]:
array = concat_future.result()
array

`array` is now one big Dask array which we can manipulate on the client computer.
- The data is not on the client computer; it sits on the worker nodes - distributed across them.
- Dask now transparently takes care of the subsequent operations we do on the combined array:

In [None]:
client.compute(array[:10,:10]).result()

So far we have approached Dask from the `distributed` side: the data had already been loaded on the workers, and we put a Dask interface on top of it afterwards.

We can also start from Dask if we like and set that up to use `distributed` "under the hood":

In [None]:
y = da.arange(100, chunks=(25,))
y

In [None]:
client.compute(y).result()

### Considerations about network and cluster set-up

- For the sake of security, it is recommended to use a distributed set-up like the one sketched here only on a secure network.
- That is, you will typically run a set-up like this on a dedicated cluster where you control who can access the scheduler and worker nodes, and where the network between them is closed to outsiders.
- This avoids the risk of unauthorised users interacting with the scheduler and, for example, submitting malicious tasks.
- So, a distributed worker set-up like this makes sense if you control the entire cluster or can at least trust the other users on it.
- If you do not have access to a cluster, `distributed` also lets you run distributed work on for example Amazon EC2: https://distributed.readthedocs.io/en/latest/ec2.html

## IPython parallel

IPython parallel (the `ipyparallel` package) also provides functionality to run a cluster for distributed computing.
- The security aspects mentioned before are particularly important for IPython parallel: http://minrk-parallel.readthedocs.io/en/latest/security.html

Using IPython parallel is similar to `distributed`. We illustrate it with a small example.

### Install `ipyparallel`

```bash
conda install ipyparallel
```

### Set up cluster

As before, we are going to cheat and set up a scheduler and workers on our own computer and pretend these are running on separate computers...

Start a controller and 4 engines (the IPython parallel counterparts of the scheduler and workers) using the following command:
```bash
ipcluster start -n 4
```
- I will demonstrate alongside so you can see what it should look like.
- The above starts a controller and 4 engines on the local computer. To start controller and engines on different computers in a cluster see further set-up details here: http://minrk-parallel.readthedocs.io/en/latest/process.html

### Using the cluster

In order to distribute tasks on the engines, we need to import `ipyparallel` on the client computer.

In [None]:
import ipyparallel as ipp

c = ipp.Client()

c.ids

c[:].apply_sync(lambda : "Hello, World")

To distribute tasks to the engines in a way similar to the previous examples, we use `ipyparallel.LoadBalancedView`.

**SPMD-style processing**
- We re-use an example from earlier today from `concurrent.futures` - now adapted to `ipyparallel`.

In [None]:
import os, time

@ipp.require('os', 'time')
def _f(d):
    # Defines the f(d) function (f(d) is a single task)
    time.sleep(float(d)/10.)
    pid = os.getpid()
    print(" _f argument: {:2d}, process id: {:7d} ".format(d, pid))
    return pid

print("Parent process id: {:7d}".format(os.getpid()))
lview = c.load_balanced_view()
result = lview.map_async(_f, (30, 15, 2))
result

In [None]:
result.result()

- Note the `@ipp.require` decorator to ensure that necessary modules are available on the workers.
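
An alternative that is sometimes used instead of the decorator is to import the modules inside the function body, so that the imports are executed wherever the function runs. The sketch below shows the idea with a hypothetical `task` function; it is called locally here so that it runs without any engines, and the sleep is shortened for the same reason.

```python
def task(d):
    # Imports inside the body are executed in the process that runs the
    # function, so they do not depend on what the caller has imported.
    import os
    import time

    time.sleep(float(d) / 1000.0)
    return os.getpid()


# Local call for illustration; on a cluster the function would be handed
# to a view (for example with map_async) instead.
pids = [task(d) for d in (30, 15, 2)]
print(pids)
```

The decorator has the advantage of stating the dependencies up front, while local imports keep the function self-contained.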

**MPMD-style processing**
- Again, we re-use an example from earlier today from `concurrent.futures`.

In [None]:
def _f1(d):
    # Defines the f1(d) function (f1(d) is a single task)
    return d + 1

def _f2(d):
    # Defines the f2(d) function (f2(d) is a single task)
    return d + 2

def _f3(d):
    # Defines the f3(d) function (f3(d) is a single task)
    return d + 3

futurelist = []
futurelist.append(lview.apply_async(_f1, 1))
futurelist.append(lview.apply_async(_f2, 1))
futurelist.append(lview.apply_async(_f3, 1))
print([future.result() for future in futurelist])

### Integration with Dask

IPython parallel can also provide a cluster for Dask:

* Similar to how we saw it done with `dask.distributed`
* See: https://distributed.readthedocs.io/en/latest/ipython.html