# Multi-GPU with Dask

Leveraging GPUs to accelerate your workloads can increase performance by orders of magnitude, but once your workload is fully utilizing the device you will start to hit a new performance ceiling.

This is where multi-GPU and multi-node workloads come in. It is possible to use many GPUs together and see another step change in performance.

Before we dive into multi-GPU workloads, I do want to caution that distributed computation can increase the complexity of your code. The tools discussed in this chapter do everything they can to ease the burden of distributed computing, but we should be sure that we have squeezed every last drop of performance out of a single GPU before we start scaling out.

## Dask

[Dask](https://dask.org) is a Python library for scaling out your Python code. At its core, Dask takes your Python code and converts it into a computation graph of function calls, inputs and outputs. It then has a selection of schedulers that can execute this graph in parallel. Here we are going to focus on Dask's distributed scheduler.
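To make the idea of a computation graph concrete, here is a minimal sketch using `dask.delayed`. Decorated functions record each call as a task instead of running it, and `compute` hands the finished graph to a scheduler. The `inc` and `add` functions are purely illustrative, and we use the single-threaded `"synchronous"` scheduler so the sketch runs without starting a cluster.

```python
import dask


@dask.delayed
def inc(x):
    # Illustrative task: add one to its input
    return x + 1


@dask.delayed
def add(x, y):
    # Illustrative task: combine two upstream results
    return x + y


# Calling delayed functions only records tasks and their dependencies;
# nothing has been executed yet.
a = inc(1)
b = inc(2)
total = add(a, b)

# The graph holds one entry per recorded call: two `inc` tasks feeding one `add` task
graph = dict(total.__dask_graph__())
print(len(graph))

# Execute the graph; the "synchronous" scheduler runs tasks one by one in this process
result = total.compute(scheduler="synchronous")
print(result)  # 5
```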

In [None]:
from dask.distributed import Client

client = Client()
client

In [None]:
# Submit a function to be executed on the Dask cluster
f = client.submit(lambda: 10 + 1)
f.result()

In [None]:
# Use a high level collection API to distribute familiar work on the cluster
import dask.array as da
arr = da.random.random((1000, 1000), chunks=(100, 100))
arr.mean().compute()

Dask doesn't much care what your code is doing; it just attempts to run through the graph as quickly as possible with its pool of workers. Because we've written all of our GPU code in Python, Dask can distribute it too.

In [None]:
client.close()

### Distributed clusters

In order for Dask to distribute a graph across many machines, it needs a scheduler process and a number of worker processes. We can start these manually via the CLI commands `dask-scheduler` and `dask-worker`, or let one of Dask's many cluster managers start them for us.
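Whichever way the processes are started, a client only needs the scheduler's address to connect. In this sketch an in-process `LocalCluster` listening on TCP stands in for a scheduler and two workers started by hand; against a real deployment you would instead pass an address like `"tcp://<scheduler-host>:8786"`, where the host is a placeholder for your own machine.

```python
from dask.distributed import Client, LocalCluster

# Stand-in for a scheduler and workers started elsewhere (for example with
# `dask-scheduler` and `dask-worker`): an in-process cluster with two workers
# that communicates over TCP.
cluster = LocalCluster(
    n_workers=2,
    threads_per_worker=1,
    processes=False,
    protocol="tcp://",
    dashboard_address=":0",  # pick any free port for the dashboard
)

# The client is given nothing but the scheduler's address
address = cluster.scheduler_address
client = Client(address)
client.wait_for_workers(2)

n_workers = len(client.scheduler_info()["workers"])
print(address, n_workers)

client.close()
cluster.close()
```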

#### Cluster managers

Dask can be run in any number of compute environments as long as you have a Python environment, network connectivity and can start the scheduler and worker processes.

To make creating a Dask cluster a consistent experience, there are a number of cluster manager classes that you can import and instantiate to construct a cluster for you.

The first that most folks interact with is `LocalCluster`. When you create an instance of this class it will inspect the CPU and memory resources available on the local computer and create subprocesses for the scheduler and an appropriate number of workers automatically.

In [None]:
from dask.distributed import LocalCluster

cluster = LocalCluster()
cluster

This is great for trying Dask out and using it to leverage all of the CPU cores available on your local machine.
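If the automatic sizing doesn't suit you, `LocalCluster` also accepts explicit settings. The numbers below are arbitrary examples, and `processes=False` keeps the workers as threads in the current process so the sketch starts quickly.

```python
from dask.distributed import Client, LocalCluster

cluster = LocalCluster(
    n_workers=2,             # number of workers
    threads_per_worker=2,    # threads inside each worker
    memory_limit="1GiB",     # per-worker memory limit
    processes=False,         # run workers in this process
    dashboard_address=":0",  # pick any free port for the dashboard
)
client = Client(cluster)
client.wait_for_workers(2)

# Ask the scheduler what each worker reported about itself
workers = client.scheduler_info()["workers"].values()
summary = [(w["nthreads"], w["memory_limit"]) for w in workers]
print(summary)

client.close()
cluster.close()
```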

Once you are ready to break beyond the confines of your computer, there are cluster managers for HPC platforms like SLURM, PBS and SGE, as well as for Kubernetes, Hadoop and public cloud providers including Amazon Web Services, Microsoft Azure and Google Cloud Platform.

The most popular cluster manager we see folks using is the `SSHCluster` class which opens secure shell connections to other machines on your network and starts Dask processes on them. This can be great for remotely leveraging servers or even other desktops that aren't being used.

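```python
from dask.distributed import Client, SSHCluster

# The first host runs the scheduler; every following host runs a worker
cluster = SSHCluster([
    "localhost",  # Hostname to start the scheduler on
    "10.0.0.2",   # Hostname to start the first worker on
    "10.0.0.3",   # Hostname to start the second worker on
    # ...add one hostname per additional worker
])
client = Client(cluster)
```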

In [None]:
cluster.close()

#### Dask CUDA

When it comes to using GPUs with Dask there are a few things we need to bear in mind. Each Dask worker should be assigned exactly one GPU, so if your machine has multiple GPUs you'll need one worker per device. Each worker also needs some extra configuration, such as being restricted to its own device, before it can use a GPU reliably. To take care of this for you, you can use the tools in the Python package `dask-cuda`.

The Dask CUDA package has a cluster manager called `LocalCUDACluster` and an alternative worker CLI command called `dask-cuda-worker`. Both inspect your hardware, start one worker per GPU and configure each worker to use only its allocated device.
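dask-cuda does this pinning through the `CUDA_VISIBLE_DEVICES` environment variable, which CUDA uses to decide which physical GPUs a process can see and in what order. Our understanding is that each worker receives a rotated list with its own GPU first, so that GPU becomes the worker's device 0. The helper below is a hypothetical re-creation of that idea for illustration, not dask-cuda's own code. (`LocalCUDACluster` also accepts a `CUDA_VISIBLE_DEVICES` argument if you only want to use some of your GPUs.)

```python
def visible_devices_for_worker(worker_index, gpu_ids):
    """Hypothetical helper: rotate the GPU list so this worker's GPU comes first.

    CUDA renumbers visible devices from zero in the order given by
    CUDA_VISIBLE_DEVICES, so the first entry becomes the worker's device 0.
    """
    i = worker_index % len(gpu_ids)
    rotated = gpu_ids[i:] + gpu_ids[:i]
    return ",".join(str(gpu) for gpu in rotated)


gpu_ids = [0, 1, 2, 3]  # assumption: a machine with four GPUs
for worker_index in range(len(gpu_ids)):
    # Each worker process would be started with this value in its environment
    value = visible_devices_for_worker(worker_index, gpu_ids)
    print(f"worker {worker_index}: CUDA_VISIBLE_DEVICES={value}")
```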

In [None]:
from dask_cuda import LocalCUDACluster

cluster = LocalCUDACluster()
cluster

In [None]:
client = Client(cluster)

It is also possible to configure the other cluster managers for HPC and cloud platforms to use the Dask CUDA worker instead of the regular worker.

Once we have a Dask cluster with GPU workers, we could manually submit some CUDA kernels written with Numba to be executed on those GPUs.

In [None]:
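from numba import cuda

@cuda.jit
def some_kernel():
    # Busy loop standing in for real GPU work
    i = 0
    while i < 1_000_000:
        i += 1

def launch_kernel():
    # Kernel launches are asynchronous, so wait for the GPU to finish
    # before the task (and therefore the future) is marked as complete
    some_kernel[1024 * 1024, 1024]()
    cuda.synchronize()

f = client.submit(launch_kernel)
f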

In [None]:
f

### High level collections

Thankfully we don't have to do everything manually with Dask. Dask has a concept of high-level collections, which implement the API of a popular Python package but chunk or partition data structures and tasks so that they can be run on a Dask cluster. Commonly used ones are `dask.array`, which follows the NumPy API, `dask.dataframe`, which follows the Pandas API, and the separate `dask-ml` package, which follows the Scikit-Learn API.

This approach may sound familiar: we've already seen RAPIDS libraries that mimic the APIs of these libraries to provide accelerated computation. Dask does the same, but for distributed computation. One of the benefits of this approach is that we can combine the two to get distributed, accelerated computation with the tools we already know and love.

When `dask.dataframe` creates a DataFrame, it constructs a task graph made up of many smaller Pandas DataFrames (partitions). Operations like taking the mean of a Series are first performed on each Pandas DataFrame (for a mean, a partial sum and count per partition) before the partial results are aggregated to get the overall mean. But Dask is not limited to using Pandas in its DataFrame collection; it can also leverage other libraries that follow the Pandas API, like cuDF.
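The aggregation step can't simply average the per-partition means when partitions differ in size. Here is a simplified, hand-rolled sketch of the partition-then-aggregate idea, using three small pandas DataFrames as stand-in partitions; Dask's real reduction machinery is more general, but the arithmetic for a mean is the same: combine partial sums and counts.

```python
import pandas as pd

# Three pandas DataFrames standing in for the partitions of a Dask DataFrame
partitions = [
    pd.DataFrame({"x": [1.0, 2.0, 3.0]}),
    pd.DataFrame({"x": [4.0, 5.0]}),
    pd.DataFrame({"x": [6.0]}),
]

# Step 1, per partition (these could run on different workers): sum and count
partials = [(part["x"].sum(), part["x"].count()) for part in partitions]

# Step 2, aggregation: combine the partials into the global mean
total = sum(s for s, _ in partials)
count = sum(c for _, c in partials)
mean = total / count
print(mean)  # 3.5

# Averaging the per-partition means instead gives the wrong answer here
mean_of_means = sum(part["x"].mean() for part in partitions) / len(partitions)
print(mean_of_means)  # 4.1666..., not 3.5
```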

cuDF comes with a companion library, `dask_cudf`, for constructing Dask DataFrames made up of cuDF DataFrames, and we can load our data and perform operations just as we have seen before.

In [None]:
import dask
import cudf
import dask_cudf

In [None]:
@dask.delayed
def gen_partition():
    return cudf.datasets.timeseries()

gddf = dask_cudf.from_delayed([gen_partition() for i in range(30)])
gddf

In [None]:
gddf.head()

In [None]:
len(gddf)

In [None]:
gddf.groupby("name").x.mean().compute()

Unlike a single cuDF DataFrame, this DataFrame is distributed across all of our GPUs, so computations can leverage the performance of all of our hardware.

### Communication

In Chapter 1, when we were exploring Numba CUDA, we saw that there are penalties for moving data from CPU memory into GPU memory. The same applies to moving data between GPUs, whether they are in the same machine or on different machines.

By default Dask uses a TCP-based protocol for communication between workers. This means that any memory transfer from one GPU to another has to make its way back up the PCI-E lanes to the CPU and into the operating system's network stack to be routed to its destination. If the destination GPU is in the same machine, the data heads back down the PCI-E lanes and into that GPU. If it is located on another machine, the data makes its way out via the IP network, most likely over an Ethernet connection.

When our two GPUs sit next to each other on the motherboard, this is very wasteful. They may even be connected to each other directly via NVLink, or at least be attached to the same PCI-E switch. Rather than routing every transfer via the CPU, we would like to use these faster paths, and that's where UCX comes in.

#### UCX

[UCX](https://openucx.org/) (Unified Communication X) is a communication framework that can inspect the topology of the system and find an optimal route via accelerated hardware. If two GPUs are connected via NVLink, UCX will use that to transfer data; if they are connected to the same PCI-E switch, that is the next best bet. If the GPUs are in two separate machines but those machines have InfiniBand network cards, UCX can use RDMA over InfiniBand to transfer data directly between the GPUs.

UCX will do everything in its power to transfer data as directly and performantly as possible between two locations before ultimately falling back to TCP.
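The preference order described above can be summarized as a toy model. This is only an illustration of "use the best path both ends have, else fall back to TCP"; the transport names and the ranking list are our own assumptions, not UCX's real selection logic or API.

```python
# Toy model of the preference order described above. The transport names and
# ranking are illustrative assumptions, not UCX's real selection logic.
PREFERENCE = ["nvlink", "pcie_switch", "infiniband_rdma", "tcp"]


def pick_transport(available):
    """Return the most preferred transport in the set of available paths."""
    for transport in PREFERENCE:
        if transport in available:
            return transport
    raise ValueError("no usable transport between the two endpoints")


# Two GPUs in one machine sharing a PCI-E switch but without NVLink
print(pick_transport({"pcie_switch", "tcp"}))      # pcie_switch
# GPUs in different machines with InfiniBand cards
print(pick_transport({"infiniband_rdma", "tcp"}))  # infiniband_rdma
# Nothing accelerated available: fall back to TCP
print(pick_transport({"tcp"}))                     # tcp
```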

#### Dask communication protocols

Dask supports alternative communication protocols which can be configured by the user. These include UCX, which we can leverage for more performance, but also other protocols like WebSockets, which may be more flexible in modern system architectures because they are easier to proxy.

If we use UCX with our GPU workers and have accelerated networking hardware like NVLink or InfiniBand, we can see much shorter memory transfer times between our GPU workers.
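In Dask the protocol shows up as the scheme of the scheduler address, and cluster managers let you pick it with a `protocol` argument. This sketch starts an in-process cluster that talks over WebSockets (`"ws://"`) so it runs without a GPU. On a GPU machine with `dask-cuda` and UCX-Py installed, the analogous setting is `LocalCUDACluster(protocol="ucx")`; that call is not run here, so check the dask-cuda documentation for the UCX options your version supports.

```python
from dask.distributed import Client, LocalCluster

# Pick the communication protocol explicitly. WebSockets are used here so the
# sketch runs anywhere; "tcp://" is the usual default for multi-process clusters.
cluster = LocalCluster(
    n_workers=1,
    threads_per_worker=1,
    processes=False,
    protocol="ws://",
    dashboard_address=":0",
)
client = Client(cluster)

# The chosen protocol appears as the scheme of the scheduler address
address = cluster.scheduler_address
print(address)

# Work still flows normally over the alternative protocol
answer = client.submit(sum, [1, 2, 3]).result()

client.close()
cluster.close()
```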

### Resource annotations

The last topic I want to cover with Dask and GPUs is resource annotations. Dask lets us annotate each task in the task graph with resources that a worker must have in order to run it.

When we start up a worker, we can also tell it which resources it has so that the scheduler can place appropriate tasks on appropriate workers. This feature is most powerful when our workers are a mixed bag of configurations.

```console
$ dask-cuda-worker scheduler:8786 --resources "GPU=1"
```
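The same idea can be tried from Python without any GPUs, because a resource is only a name and a count: Dask does not check that the hardware exists. This sketch gives each in-process worker one unit of a resource named `"GPU"` (the Python counterpart of the `--resources` flag) and submits a task that requires it.

```python
from dask.distributed import Client, LocalCluster

# Every worker in this cluster advertises one unit of a resource named "GPU".
# The label is arbitrary; no real GPU is needed or checked.
cluster = LocalCluster(
    n_workers=2,
    threads_per_worker=1,
    processes=False,
    resources={"GPU": 1},
    dashboard_address=":0",
)
client = Client(cluster)
client.wait_for_workers(2)

# What each worker reported to the scheduler
advertised = [w["resources"] for w in client.scheduler_info()["workers"].values()]
print(advertised)

# This task only runs on a worker with a free "GPU" unit
future = client.submit(lambda x: x * 2, 21, resources={"GPU": 1})
result = future.result()
print(result)  # 42

client.close()
cluster.close()
```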

There may be steps in your task graph where memory usage increases heavily during an intermediate calculation. It can be helpful to steer these tasks towards workers that have more memory than the rest.

We can also use this for GPU work if not all of our workers have GPUs. It would be reasonable to have a number of regular Dask workers that take on most of your tasks but also have a couple of GPU workers to run steps that have been optimized to run on the GPU.

This would likely be most useful if you have an existing workload that leverages Dask and you want to experiment with GPUs. You could add another worker that has a GPU, choose some tasks in your workflow to optimize with Numba and annotate those tasks to only run on your GPU worker.

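```python
import dask

# some_non_gpu_function, a_gpu_function and another_non_gpu_function stand in for your own code
foo = client.submit(some_non_gpu_function)

with dask.annotate(resources={"GPU": 1}):
    # Tasks created inside this block need a worker with a free "GPU" resource
    bar = client.submit(a_gpu_function, foo)

# With futures the requirement can also be passed directly:
# bar = client.submit(a_gpu_function, foo, resources={"GPU": 1})

baz = client.submit(another_non_gpu_function, bar)
```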
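Annotations are not limited to futures; they also apply to tasks built with `dask.delayed` and the high-level collections. A minimal sketch, assuming a single in-process worker that advertises a `"GPU"` resource; `load` and `gpu_step` are hypothetical stand-ins for your own CPU and GPU steps. We compute with `optimize_graph=False` because graph optimization can fuse tasks and may lose their annotations.

```python
import dask
from dask.distributed import Client, LocalCluster

# One in-process worker advertising a "GPU" resource (the label is only a name)
cluster = LocalCluster(
    n_workers=1,
    threads_per_worker=1,
    processes=False,
    resources={"GPU": 1},
    dashboard_address=":0",
)
client = Client(cluster)


@dask.delayed
def load():
    # Hypothetical CPU step
    return list(range(10))


@dask.delayed
def gpu_step(values):
    # Hypothetical stand-in for a Numba or CuPy step that would need the GPU
    return [v * 2 for v in values]


data = load()
with dask.annotate(resources={"GPU": 1}):
    doubled = gpu_step(data)  # only this task carries the resource annotation
total = dask.delayed(sum)(doubled)

# Skip graph optimization so the annotation is not fused away
result = total.compute(optimize_graph=False)
print(result)  # 90

client.close()
cluster.close()
```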