<img src="https://docs.xarray.dev/en/stable/_static/dataset-diagram-logo.png" align="right" width="30%">

# Parallel computing with Dask

This notebook demonstrates one of Xarray's most powerful features: the ability
to wrap [dask arrays](https://docs.dask.org/en/stable/array.html) and allow users to seamlessly execute analysis code in
parallel.

By the end of this notebook, you will:

1. Understand that Xarray DataArrays and Datasets are "dask collections", i.e. you can execute
   top-level dask functions such as `dask.visualize(xarray_object)`
2. Learn that all xarray built-in operations can transparently use dask


**Important:** *Using Dask does not always make your computations run faster!* Performance will depend on the computational infrastructure you're using (for example, how many CPU cores), how the data you're working with is structured and stored, and the algorithms and code you're running. Be sure to review the [Dask best-practices](https://docs.dask.org/en/stable/best-practices.html) if you're new to Dask!

<a id='readwrite'></a>

## Reading data

The `chunks` argument to both `open_dataset` and `open_mfdataset` allows you to
read a dataset's variables as dask arrays.


In [None]:
import xarray as xr

In [None]:
ds = xr.tutorial.open_dataset(
    "air_temperature",
    chunks={  # this tells xarray to open the dataset as a dask array
        "lat": 25,
        "lon": 25,
        "time": -1,
    },
)
ds.air

The representation ("repr" in Python parlance) for the `air` DataArray shows the very nice HTML dask array repr. You can access the underlying chunk sizes using `.chunks`:

In [None]:
ds.air.chunks

**Tip**: The variables in a `Dataset` do _not_ all need to have the same chunk size along
common dimensions.
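
As a minimal sketch of that tip, the cell below builds a small synthetic `Dataset` (the names `a`, `b`, and `mixed` are made up for illustration and are not part of the tutorial data) whose two variables share the dimension `x` but use different chunk sizes along it:

```python
import numpy as np
import xarray as xr

# Two hypothetical variables on the same dimension "x", chunked differently.
a = xr.DataArray(np.arange(12.0), dims="x").chunk({"x": 4})
b = xr.DataArray(np.arange(12.0) * 2, dims="x").chunk({"x": 6})
mixed = xr.Dataset({"a": a, "b": b})

# Inspect the chunking of each variable separately.
print(mixed["a"].chunks)
print(mixed["b"].chunks)
```

Each variable keeps its own chunking; if a single consistent chunking is needed later, `Dataset.unify_chunks()` is one option.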

<a id='compute'></a>

## Lazy computation

Xarray seamlessly wraps dask so all computation is deferred until explicitly
requested.

In [None]:
mean = ds.air.mean("time")

Dask actually constructs a graph of the required computation. Here it's pretty simple: the full array is subdivided into 3 subarrays (chunks). Dask will load each of these subarrays in a separate thread using the default [single-machine scheduling](https://docs.dask.org/en/stable/scheduling.html). You can visualize dask 'task graphs', which represent the requested computation:

In [None]:
mean.data.visualize(optimize_graph=True)
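
Rendering the graph requires Graphviz. If it is not available, a rough alternative is to inspect the graph's size directly. The sketch below uses a small synthetic array as a stand-in for `ds.air` (the shape and the names `demo` and `lazy_mean` are assumptions for illustration), chunked the same way: one chunk along `time` and `lat`, three along `lon`.

```python
import dask.array
import xarray as xr

# Synthetic stand-in for ds.air (hypothetical shape): a single chunk along
# "time" and "lat", three chunks along "lon".
demo = xr.DataArray(
    dask.array.ones((8, 4, 9), chunks=(-1, 4, 3)),
    dims=("time", "lat", "lon"),
)
lazy_mean = demo.mean("time")  # builds a task graph; nothing is computed yet

n_blocks = lazy_mean.data.numblocks        # blocks per dimension of the result
n_tasks = len(lazy_mean.__dask_graph__())  # number of tasks in the graph
print(n_blocks, n_tasks)
```

The block count matches the number of chunks described above; the task count depends on the dask version, so treat it as a relative measure of graph complexity.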

### Getting concrete values from dask arrays

At some point, you will want to actually get concrete values (_usually_ a numpy array) from dask.

There are two ways to compute values on dask arrays.

1. `.compute()` returns a new xarray object holding the computed values; the
   original object is left unchanged
2. `.load()` replaces the dask array in the xarray object with a numpy array,
   in place. This is equivalent to `ds = ds.compute()`

## Distributed Clusters

As your data volumes grow and algorithms get more complex, it can be hard to print out task graph representations and understand what Dask is doing behind the scenes. Luckily, you can use Dask's 'Distributed' scheduler to get very useful diagnostic information.

First let's set up a `LocalCluster` using [dask.distributed](https://distributed.dask.org/).

You can use any kind of Dask cluster. This step is completely independent of
xarray. While not strictly necessary, the dashboard provides a nice learning
tool.

In [None]:
from dask.distributed import Client

# This piece of code is only needed to get a correct dashboard link on mybinder.org or other JupyterHub demos
import dask
import os

if os.environ.get('JUPYTERHUB_USER'):
    dask.config.set(**{"distributed.dashboard.link": "/user/{JUPYTERHUB_USER}/proxy/{port}/status"})

client = Client(local_directory='/tmp')
client

☝️ Click the Dashboard link above. 

👈 Or click the "Search" 🔍 button in the [dask-labextension](https://github.com/dask/dask-labextension) dashboard.

NOTE: if using the dask-labextension, you should disable the 'Simple' JupyterLab interface (`View -> Simple Interface`), so that you can drag and rearrange whichever dashboards you want. The `Workers` and `Task Stream` dashboards are good ones to open to make sure the dashboard is working!

In [None]:
import dask.array

dask.array.ones((1000, 4), chunks=(2, 1)).compute()  # should see activity in dashboard

## Examining a DataArray with dask

Let's go back to our xarray Dataset. In addition to computing the mean, other operations such as indexing will automatically use whichever Dask cluster we are connected to!

In [None]:
ds.air.isel(lon=1, lat=20)

and more complicated operations...


In [None]:
timeseries = ds.air.rolling(time=5).mean().isel(lon=1, lat=20)  # no activity on dashboard
timeseries  # contains dask array

In [None]:
timeseries = ds.air.rolling(time=5).mean()  # no activity on dashboard
timeseries  # contains dask array

In [None]:
computed = mean.compute()  # activity on dashboard
computed  # has real numpy values

Note that `mean` still contains a dask array.


In [None]:
mean

But if we call `.load()`, `mean` will now contain a numpy array.

In [None]:
mean.load()

Let's check that again...


In [None]:
mean

**Tip:** `.persist()` loads the values into distributed RAM. This is useful if
you will be repeatedly using a dataset for computation but it is too large to
load into local memory. You will see a persistent task on the dashboard.

See https://docs.dask.org/en/latest/api.html#dask.persist for more details.
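
A minimal sketch of `.persist()`, using a small synthetic array rather than the tutorial dataset (the names `demo`, `doubled`, and `persisted` are made up for illustration). Without a distributed client, the default scheduler simply keeps the results in local memory, but the returned object is still dask-backed:

```python
import dask
import dask.array
import xarray as xr

# Hypothetical dask-backed DataArray with four chunks.
demo = xr.DataArray(dask.array.ones((6, 6), chunks=(3, 3)), dims=("y", "x"))
doubled = demo * 2              # lazy: nothing computed yet

persisted = doubled.persist()   # chunks are computed and kept in memory
still_lazy = dask.is_dask_collection(persisted)  # still wraps a dask array
total = float(persisted.sum())  # later operations reuse the persisted chunks
print(still_lazy, total)
```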


### Extracting underlying data: `.values` vs `.data`

There are two ways to pull out the underlying data in an xarray object.

1. `.values` will always return a NumPy array. For dask-backed xarray objects,
   this means that compute will always be called
2. `.data` will return the underlying array without computing it. For
   dask-backed xarray objects, this is a Dask array
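
The difference can be checked on a small synthetic array (a sketch; `demo`, `lazy`, and `eager` are illustrative names, not part of the tutorial dataset):

```python
import dask.array
import numpy as np
import xarray as xr

# Hypothetical dask-backed DataArray.
demo = xr.DataArray(dask.array.ones((4, 6), chunks=(2, 3)), dims=("y", "x"))

lazy = demo.data     # the wrapped dask array; no computation happens
eager = demo.values  # triggers a compute and returns a NumPy array

print(type(lazy).__name__, type(eager).__name__)
```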

## Xarray data structures are first-class dask collections

This means you can do things like `dask.compute(xarray_object)`,
`dask.visualize(xarray_object)`, `dask.persist(xarray_object)`. This works for
both DataArrays and Datasets.
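
For instance, `dask.compute` can evaluate several xarray objects in one call and returns xarray objects. The sketch below uses a small synthetic array (the names `demo`, `total`, and `maximum` are illustrative assumptions):

```python
import dask
import dask.array
import xarray as xr

# Hypothetical dask-backed DataArray holding the integers 0..9 in two chunks.
demo = xr.DataArray(dask.array.arange(10, chunks=5), dims="x", name="demo")

# Pass the lazy xarray objects straight to the top-level dask function;
# both results are computed together and come back as a tuple.
total, maximum = dask.compute(demo.sum(), demo.max())

print(type(total).__name__, int(total), int(maximum))
```

Computing both results in a single call lets dask share any intermediate tasks the two graphs have in common.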

### Exercise

Visualize the task graph for a few different computations on `ds.air`!


Finally, gracefully shut down our connection to the Dask cluster. This becomes more important when you are running on large HPC or Cloud servers rather than a laptop!

In [None]:
client.close()