# Introduction to Dask

- Workshop: **Tutorial: High performance computing with Python and RS-DAT, EO summer school 2025**

- Date: September 3, 2025 

Dask is a Python library for parallel and distributed computing. In a nutshell, Dask allows you to:

- set up a **task graph** from several APIs (arrays, data frames, tasks, etc.).
- **run the graph in parallel** on different types of infrastructure (multi-core CPU, cloud, HPC).

From the [Dask documentation](https://docs.dask.org):

<img src="https://docs.dask.org/en/stable/_images/dask-overview.svg" width=70%>

## 1. Creating task graphs

### 1.1 Dask Delayed

The [Dask delayed](https://docs.dask.org/en/stable/delayed.html) interface is generic: it allows you to build task graphs directly from Python functions.

In [None]:
from dask.delayed import delayed

In [None]:
@delayed
def add(x, y):
    print(x, " + ", y)
    return x + y

In [None]:
a_p = add(1, 2)

In [None]:
b_p = add(a_p, 3)

In [None]:
c_p = add(a_p, b_p)

Visualize the task graph using `dask.visualize()` (this requires the optional `graphviz` dependency):

In [None]:
import dask
dask.visualize(c_p, rankdir="LR")

Execute the graph (using the default thread-based scheduler) with the `.compute()` method:

In [None]:
c_p.compute()
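
To make the laziness and the reuse of shared results visible, here is a minimal sketch of the same graph. The names `add_logged` and `calls` are hypothetical helpers introduced only for this illustration, and the `"synchronous"` scheduler is chosen here simply to keep the run single-threaded:

```python
from dask.delayed import delayed

calls = []  # hypothetical helper: records the arguments of every executed task


@delayed
def add_logged(x, y):
    calls.append((x, y))
    return x + y


a = add_logged(1, 2)   # nothing runs yet: this only records a task
b = add_logged(a, 3)   # depends on a
c = add_logged(a, b)   # depends on a and b; a is reused, not duplicated

assert calls == []     # building the graph did not execute anything

# Run the graph in the calling thread (single-threaded scheduler).
result = c.compute(scheduler="synchronous")
print(result, calls)
```

Although `a` appears twice in the graph, its function is called only once: the graph contains three tasks, and three calls are recorded.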

### 1.2 Dask Arrays

When working with more complex data structures, we can use Dask's high-level APIs, e.g. for [array data](https://docs.dask.org/en/stable/array.html).

From the [Dask documentation](https://docs.dask.org/en/stable/array.html):

<img src="https://docs.dask.org/en/stable/_images/dask-array.svg" width=70%>



In [None]:
import dask.array as da
x = da.random.random((2000, 1000), chunks=(500, 500))
x

In [None]:
y = da.dot(x, x.T)

In [None]:
z = y.mean()

Again, calling `dask.visualize()` on a Dask object shows the task graph:

In [None]:
dask.visualize(z)

And again, calling the `.compute()` method executes the task graph (with the default thread-based scheduler).

In [None]:
z.compute()
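
Because the random input above gives a different value on every run, a deterministic variant can help check the mechanics. The sketch below uses `da.ones` instead of random data (an assumption made only so the result is predictable) and keeps the same shape and chunking:

```python
import dask.array as da

# Same shape and chunking as above, but filled with ones.
ones = da.ones((2000, 1000), chunks=(500, 500))
print(ones.numblocks)  # number of chunks along each axis

gram = da.dot(ones, ones.T)   # lazy: shape is known, values are not computed
print(gram.shape)

mean_value = gram.mean().compute()  # triggers the chunk-wise computation
print(mean_value)
```

Each entry of the product is a sum of 1000 ones, so the mean is 1000. The shape and chunk layout are available before any computation because they are part of the array's metadata.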

### 1.3 Xarray

[Xarray](https://docs.xarray.dev) is a widely used Python library for labelled multi-dimensional arrays.

From the [Xarray documentation](https://docs.xarray.dev): 

<img src="https://docs.xarray.dev/en/stable/_images/dataset-diagram.png" width=70%>

Xarray supports Dask Array as an internal data structure, enabling parallel computing and the handling of larger-than-memory datasets.

In [None]:
import rioxarray  # rasterio-enabled xarray

raster_path = '/project/remotesensing/Data/eo-summer-school/sentinel2_rgb_mosaic.tif'

raster = rioxarray.open_rasterio(raster_path, chunks="auto", lock=False)
raster

As with Dask Array, calculations are executed lazily, and only the data that is actually needed will be loaded:

In [None]:
cutout = raster[:, -8192:-1, -8192:-1]
cutout_max = cutout.max()

Like Dask Array objects, Xarray objects also have the `.compute()` method:

In [None]:
cutout_max.compute()
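
The raster file above is only available on the workshop system, so the following sketch reproduces the same pattern on a small synthetic array. The names `toy_raster`, `toy_cutout` and `toy_max`, the array size, and the chunking are all assumptions made for illustration:

```python
import dask
import dask.array as da
import numpy as np
import xarray as xr

# Synthetic 3-band "raster" backed by a chunked Dask array.
values = np.arange(3 * 100 * 100).reshape(3, 100, 100)
toy_raster = xr.DataArray(
    da.from_array(values, chunks=(1, 50, 50)),
    dims=("band", "y", "x"),
)

# Same slicing pattern as above: a stop of -1 excludes the last row/column.
toy_cutout = toy_raster[:, -64:-1, -64:-1]
toy_max = toy_cutout.max()

print(toy_cutout.shape)
print(dask.is_dask_collection(toy_max))  # still lazy at this point

toy_result = toy_max.compute()           # now the needed chunks are processed
print(toy_result.item())
```

Note that a slice such as `-64:-1` yields 63 elements per axis, not 64, because the stop index is exclusive. To include the last row and column, the stop can be left out (`-64:`).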

## 2. Executing task graphs

Dask supports [several types of infrastructure](https://docs.dask.org/en/stable/deploying.html) to execute the task graph.

### 2.1 Multi-threading/processing

One can run the graph locally using multiple threads or multiple processes:

In [None]:
cutout_max.compute(scheduler="threads", num_workers=2)

In [None]:
cutout_max.compute(scheduler="processes", num_workers=2)
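
For Dask's local schedulers, the size of the worker pool is set with the `num_workers` keyword. The scheduler can also be selected for a whole block of code through `dask.config.set`. The sketch below illustrates both on a small synthetic array; the array and the variable names are assumptions for illustration, and the `"synchronous"` scheduler is shown because it is convenient for debugging:

```python
import dask
import dask.array as da

data = da.ones((1000, 1000), chunks=(250, 250))
total = data.sum()

# Per-call choice: thread pool with two workers.
threaded = total.compute(scheduler="threads", num_workers=2)

# Block-wide choice: everything inside the context runs single-threaded,
# which makes it easier to step through tasks with a debugger.
with dask.config.set(scheduler="synchronous"):
    serial = total.compute()

print(threaded, serial)
```

Both runs execute the same graph and return the same value; only the way the tasks are scheduled differs.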

### 2.2 Distributed scheduler

Alternatively, one can run the graph on Dask's "distributed" scheduler, which, despite the name, also supports local deployments (see [`LocalCluster`](https://distributed.dask.org/en/stable/api.html#distributed.LocalCluster)).

On HPC systems, Dask clusters are most easily created via [Dask-Jobqueue](https://jobqueue.dask.org), which integrates nicely with the Jupyter environment (see the sidebar).

In [None]:
# connect to cluster here

In [None]:
cutout_max.compute()

Dask distributed also provides a dashboard for monitoring the execution of the task graph, along with detailed resource usage information.
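
To try the distributed scheduler without access to an HPC system, a local cluster is enough. The following is a minimal sketch, assuming the `distributed` package is installed. The worker counts, `processes=False` (workers as threads in the current process) and `dashboard_address=":0"` (a random free port) are choices made for this illustration, not the workshop's setup:

```python
import dask.array as da
from dask.distributed import Client, LocalCluster

# Start a small in-process cluster and connect a client to it.
with LocalCluster(
    n_workers=2,
    threads_per_worker=1,
    processes=False,         # assumption: keep everything in one process
    dashboard_address=":0",  # assumption: let the dashboard pick a free port
) as cluster, Client(cluster) as client:
    print(client.dashboard_link)  # URL of the monitoring dashboard

    data = da.ones((1000, 1000), chunks=(250, 250))
    # With an active client, .compute() is sent to the cluster by default.
    cluster_mean = data.mean().compute()

print(cluster_mean)
```

Once a `Client` is active, it becomes the default scheduler, which is why the earlier `cutout_max.compute()` call needs no extra arguments after connecting to a cluster.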