# Parallel computing with Dask

## Authors & Contributors
### Authors
- Tina Odaka, Ifremer (France), [@tinaok](https://github.com/tinaok)
### Contributors
- Pier Lorenzo Marasco, Ispra (Italy), [@pl-marasco](https://github.com/pl-marasco)
- Anne Fouilloux, University of Oslo (Norway), [@annefou](https://github.com/annefou)

<div class="alert alert-info">
<i class="fa-question-circle fa" style="font-size: 22px;color:#666;"></i> Overview
    <br>
    <br>
    <b>Questions</b>
    <ul>
        <li>What is Dask?</li>
        <li>How can I parallelize my data analysis with Dask?</li>
    </ul>
    <b>Objectives</b>
    <ul>
        <li>Learn about Dask</li>
        <li>Learn about Dask Gateway, Dask client, scheduler, workers</li>
        <li>Understand out-of-core and speed-up limitations</li>
    </ul>
</div>

## Context


We will be using [Dask](https://docs.dask.org/) with [Xarray](https://docs.xarray.dev/en/stable/) to parallelize our data analysis. The analysis is very similar to what we have done in previous episodes, but this time we will use data with global coverage that we read from a shared catalog (stored online in the Pangeo EOSC OpenStack Object Storage).

### Data

In this episode, we will be using the Global Long Term Statistics (1999-2019) product provided by the [Copernicus Global Land Service](https://land.copernicus.eu/global/index.html) and access it through [S3-compatible storage](https://en.wikipedia.org/wiki/Amazon_S3) ([OpenStack Object Storage "Swift"](https://wiki.openstack.org/wiki/Swift)) with a data catalog we have created and made publicly available.

## Setup

This episode uses the following Python packages:

- pooch {cite:ps}`e-pooch-Uieda2020`
- s3fs {cite:ps}`e-s3fs-2016`
- xarray {cite:ps}`e-xarray-hoyer2017` with [`netCDF4`](https://pypi.org/project/netCDF4/) and [`h5netcdf`](https://pypi.org/project/h5netcdf/) engines
- hvplot {cite:ps}`e-holoviews-rudiger2020`
- dask {cite:ps}`e-dask-2016`
- graphviz {cite:ps}`e-graphviz-Ellson2003`
- numpy {cite:ps}`e-numpy-harris2020`
- pandas {cite:ps}`e-pandas-reback2020`
- geopandas {cite:ps}`e-geopandas-jordahl2020`

Please install these packages if not already available in your Python environment (you might want to take a look at [the Setup page of the tutorial](https://pangeo-data.github.io/foss4g-2022/before/setup.html)).

### Packages

In this episode, Python packages are imported when we start to use them. However, as a good software practice, we recommend that you install and import all the necessary libraries at the top of your Jupyter notebook.

## Parallelize with Dask

We know that chunking is key for analyzing large datasets. In this episode, we will learn how to take advantage of this chunking to parallelize our data analysis with Dask.

### What is Dask?

**Dask** accelerates the existing Python ecosystem: with very few or no changes in your code, you can speed up computation using Dask.

- Dask is a flexible library for parallel computing in Python.
- It is widely used for getting the necessary performance when handling large and complex Earth Science datasets.
- Dask is powerful, scalable and flexible. It is one of the most widely used platforms for parallel analytics in Python.
- It scales natively to clusters and the cloud, and bridges prototyping and production.
- The strength of Dask is that it accelerates the existing Python ecosystem, e.g. Numpy, Pandas and Scikit-learn, with little effort from end-users.

It is interesting to note that Dask was first created to handle data that is larger than memory on a single computer. It was then extended with Distributed to compute data in parallel over a cluster of computers.

#### How does Dask accelerate your data analysis?

- Dask chunks your big datasets and this is how we can easily parallelize and scale.

For instance, Dask can chunk a large Numpy array into smaller ones and compute each chunk independently.

![Dask and Numpy](https://examples.dask.org/_images/dask-array-black-text.svg)
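
As a minimal sketch of this idea, a Numpy array can be wrapped into a Dask array made of several chunks, and a reduction such as the mean is then evaluated chunk by chunk. The array size and the chunk size below are arbitrary illustrative choices, not values used in this episode.

```python
import numpy as np
import dask.array as da

# Illustrative data: one million values held in a regular Numpy array.
data = np.arange(1_000_000, dtype="float64")

# Split the array into chunks of 250,000 elements each.
chunked = da.from_array(data, chunks=250_000)
print(chunked.numblocks)  # number of chunks along each dimension

# The mean is evaluated per chunk and the partial results are combined.
chunk_mean = chunked.mean().compute()
print(chunk_mean, data.mean())
```

Both printed means should agree, since chunking only changes how the work is organised, not the result.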

`Xarray` uses Dask Arrays instead of Numpy when chunking is enabled, and thus all Xarray operations are distributed through Dask.

#### How does Xarray with dask distribute data analysis?

When we use chunks with `Xarray`, the real computation is only done when needed, for instance when invoking the `compute()` method. Dask generates a **task graph** describing the computation to be done and a **scheduler** executes these tasks across several **workers**.

![Xarray with dask](../figures/dask-xarray-explained.png)
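
The lazy behaviour described above can be sketched on a tiny synthetic array. The dimension names, sizes and chunking below are assumptions made for illustration only.

```python
import numpy as np
import xarray as xr

# Small illustrative array, split into two chunks along "lon".
arr = xr.DataArray(
    np.arange(12.0).reshape(3, 4), dims=("lat", "lon")
).chunk({"lon": 2})

# Nothing is computed here: the operations are only recorded in a Dask task graph.
lazy_total = (arr * 2).sum()
print(type(lazy_total.data))  # a Dask array, not a Numpy array

# compute() hands the task graph to the scheduler and returns an in-memory result.
total = lazy_total.compute()
print(float(total))
```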

:::{tip}
A Dask client can also be created on a single machine (for instance your laptop), i.e. there is no need to have dedicated computational resources. However, without dedicated computational resources, the speed-up is limited by the resources of that single machine!
:::

## Set up a local Dask cluster

There are different methods to use Dask depending on the underlying infrastructure. For this workshop, in line with the Pangeo EOSC deployment, you will learn how to set up Dask Gateway to manage Dask clusters and run our data analysis in parallel, i.e. distribute tasks across several workers.

However, you do not always need to access a multi-node Dask cluster. It is very handy to prototype and/or run data analysis on your own laptop or a small server. Let's keep it simple for now and learn how to create a local Dask cluster to distribute the work.

### Create a local dask cluster
 
The Dask client is what allows you to interact with Dask.
The Client will create the Directed Acyclic Graph (DAG) of tasks by analysing the code, and will be responsible for telling the scheduler what to compute. It will also gather the results from the workers and aggregate them in the Client process.

When called with no argument, `Client()` creates a local Dask cluster with a number of workers and threads per worker corresponding to the number of cores on the local machine. Here, we are running this notebook in the cloud, so the number of cores is the number of cores on the cloud computing resource (not on your laptop).

In [None]:
from distributed import Client

client = Client()   # create a local dask cluster on the local machine.
client

Inspecting the `Cluster Info` section gives us information about the created cluster: we have 4 workers and a total of 4 threads (i.e. 1 thread per worker). You can use `n_workers` and `threads_per_worker` whenever you want to create a local Dask cluster with fewer workers and threads than are available on the local machine. For instance, we could use `n_workers=2` and `threads_per_worker=2` (the total number of threads would still be 4, but each worker would have 2 threads). This is sometimes preferable in terms of performance, but tuning it is out of scope for this episode.
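
A minimal sketch of such a configuration is given below. The option `processes=False` is an assumption made for this sketch only: it runs the workers as threads inside the current process, which keeps the example lightweight; it is not required to use `n_workers` and `threads_per_worker`.

```python
from distributed import Client

# processes=False is an assumption for this sketch: workers run as threads
# inside the current process instead of as separate processes.
client = Client(n_workers=2, threads_per_worker=2, processes=False)

# Map each worker address to its number of threads.
threads = client.nthreads()
print(len(threads), "workers,", sum(threads.values()), "threads in total")

client.close()
```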

## Open a single file for parallel processing

We will first open a single file: we use the same syntax as earlier but this time, we pass the additional parameter `chunks` to explicitly define how the chunking (and then the parallel computing) needs to be done.
- `-1` for `time` means that the whole `time` dimension is loaded with Dask as a single chunk;
- `auto` lets Dask choose the chunk size automatically, taking into account the engine's preferred chunks.

In [None]:
import xarray as xr
import s3fs

In [None]:
fs = s3fs.S3FileSystem(anon=True,
      client_kwargs={
         'endpoint_url': 'https://object-store.cloud.muni.cz'
      })

In [None]:
%%time
s3path = 's3://foss4g-data/CGLS_LTS_1999_2019/c_gls_NDVI-LTS_1999-2019-1221_GLOBE_VGT-PROBAV_V3.0.1.nc'
LTS = xr.open_dataset(fs.open(s3path), chunks={'time':-1, 'lat':'auto', 'lon':5000})
LTS
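
To see what such a chunk specification produces without downloading any data, the same kind of mapping can be applied to a small synthetic dataset with `Dataset.chunk`. The variable name `ndvi`, the array shape and the chunk size of 50 along `lon` are hypothetical values chosen for illustration.

```python
import numpy as np
import xarray as xr

# Synthetic dataset (hypothetical shape): 36 time steps on a 100 x 200 grid.
ds = xr.Dataset(
    {"ndvi": (("time", "lat", "lon"), np.zeros((36, 100, 200), dtype="float32"))}
)

# Same style of specification as in the cell above:
# one chunk for all of "time", automatic chunking for "lat", fixed size for "lon".
chunked = ds.chunk({"time": -1, "lat": "auto", "lon": 50})

# One tuple of chunk sizes per dimension.
time_chunks, lat_chunks, lon_chunks = chunked["ndvi"].chunks
print(time_chunks, lat_chunks, lon_chunks)
```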

## Select a single location and visualize the task graph 

In [None]:
save=LTS.sel(lat=45.50, lon=9.36, method='nearest')['min']
save.data.visualize()

If you look at the task graph, you can see that only one chunk needs to be read to extract this location, yet the graph still contains read tasks for the other chunks, so most Dask workers would read data that is never used.
To avoid unnecessary operations, we optimize the task graph using `optimize` and then check the resulting graph.

## Optimize the task graph

In [None]:
import dask

In [None]:
(save,) = dask.optimize(save)
save.data.visualize()
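
The effect of `dask.optimize` can also be checked numerically by counting the tasks in the graph before and after optimization. This is a sketch on a synthetic Dask array (the shape and chunk sizes are arbitrary), not on the `LTS` dataset.

```python
import dask
import dask.array as da

# Synthetic array made of 10 x 10 = 100 chunks.
x = da.ones((1000, 1000), chunks=(100, 100))

# Selecting a single element only requires one of those chunks.
point = (x + 1)[5, 5]
n_before = len(dict(point.__dask_graph__()))

# Optimization removes the tasks that do not contribute to the result.
(optimized,) = dask.optimize(point)
n_after = len(dict(optimized.__dask_graph__()))

print(n_before, "tasks before,", n_after, "tasks after")
print(float(optimized.compute()))
```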

## Compute on the dask workers

In [None]:
save.compute()

### Close client to terminate local dask cluster

The client will be automatically closed when your Python session ends. When using Jupyter notebooks, we recommend closing it explicitly whenever you are done with your local Dask cluster.

In [None]:
client.close()
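
In a script, one possible alternative is to use the client as a context manager, so that it is closed automatically at the end of the `with` block. In the sketch below, the cluster size and `processes=False` (thread-based workers) are assumptions made to keep the example lightweight.

```python
import dask.array as da
from distributed import Client

# The client (and the local cluster it created) is closed when the block exits.
with Client(n_workers=1, threads_per_worker=2, processes=False) as client:
    total = da.ones((100, 100), chunks=(50, 50)).sum().compute()

print(total)
print(client.status)
```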

## Set up Dask Gateway

When we want to scale out our data analysis beyond the local machine, we need to be able to access a multi-node Dask cluster.
On the EOSC Pangeo infrastructure, we can use Dask Gateway to manage Dask clusters and run our data analysis in parallel, i.e. distribute tasks across several workers.

In [None]:
from dask_gateway import Gateway, BasicAuth
gateway = Gateway(
    "http://api-daskhub-dask-gateway.daskhub:8000/",
    auth = BasicAuth(password="pangeo_dask")
)

### Create a new Dask cluster on the Dask gateway

In [None]:
cluster = gateway.new_cluster()
cluster.scale(4)
cluster

## Get a client from the Dask Gateway Cluster

As with the local cluster, the Dask client is what allows you to interact with Dask. The Client will create the Directed Acyclic Graph (DAG) of tasks by analysing the code, and will be responsible for telling the scheduler what to compute. It will also gather the results from the workers and aggregate them in the Client process.

In [1]:
cluster = None # This is needed for building the Jupyter Book

In [None]:
from distributed import Client

if cluster:
    client = Client(cluster) # connect a client to the Dask Gateway cluster
else:
    client = Client()   # create a local dask cluster on the machine.
client

## Global LTS

In the previous episode, we used the Long Term Statistics (LTS) for the region of Lombardy, i.e. a very small area. Now we would like to use the original dataset, which has global coverage.

## Read from online kerchunked consolidated dataset

The JSON file generated by kerchunk to represent our dataset can be shared in the cloud and loaded from there too. We will access the Long Term Statistics of NDVI from OpenStack Object Storage using the Zarr metadata generated with kerchunk (see the previous episode on chunking).

In [None]:
import pandas as pd
import numpy as np

In [None]:
catalogue="https://object-store.cloud.muni.cz/swift/v1/foss4g-catalogue/c_gls_NDVI-LTS_1999-2019.json"
LTS = xr.open_mfdataset(
    "reference://", engine="zarr",
    backend_kwargs={
        "storage_options": {
            "fo":catalogue
                    },
        "consolidated": False
    }
)
LTS

### Fix time coordinate

In [None]:
dates_2022 = pd.date_range('20220101', '20221231')
decadie = dates_2022[np.isin(dates_2022.day, [1,11,21])]
LTS = LTS.assign_coords(time=decadie)
LTS
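
The cell above builds one timestamp per dekad, i.e. the 1st, 11th and 21st day of each month, and assigns them as the `time` coordinate. This only works if the dataset has exactly as many time steps as there are dekads in a year. The following sketch checks that count on its own, without touching `LTS`:

```python
import numpy as np
import pandas as pd

# All days of 2022, then keep only the first day of each dekad.
dates_2022 = pd.date_range("2022-01-01", "2022-12-31")
dekads = dates_2022[np.isin(dates_2022.day, [1, 11, 21])]

print(len(dekads))            # 3 dekads per month over 12 months
print(dekads[0], dekads[-1])  # first and last dekad start dates
```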

## Clip LTS over Lombardia

As in previous episodes, we use a shapefile over Italy to select data over this Area of Interest (AOI).

In [None]:
import geopandas as gpd

In [None]:
try:
    GAUL = gpd.read_file('Italy.geojson')
except Exception:
    # Local file not available: fall back to the remote GAUL archive.
    GAUL = gpd.read_file('zip+https://mars.jrc.ec.europa.eu/asap/files/gaul1_asap.zip')

In [None]:
AOI_name = 'Lombardia'
AOI = GAUL[GAUL.name1 == AOI_name]
AOI_poly = AOI.geometry
AOI_poly

We first select a geographical area that covers Lombardia and then clip using the shapefile.

In [None]:
LTS = LTS.sel(lat=slice(46.5,44.5), lon=slice(8.5,11.5))
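
Note that the latitude slice above goes from north to south (`46.5` down to `44.5`). Label-based slices must follow the order of the coordinate, which suggests that latitude is stored in descending order in this dataset (an assumption inferred from the slice, not verified here). The sketch below illustrates the behaviour on a synthetic grid with hypothetical coordinates:

```python
import numpy as np
import xarray as xr

# Synthetic grid: latitude decreasing (north to south), longitude increasing.
lat = np.arange(50.0, 40.0, -0.5)
lon = np.arange(5.0, 15.0, 0.5)
grid = xr.DataArray(
    np.zeros((lat.size, lon.size)),
    coords={"lat": lat, "lon": lon},
    dims=("lat", "lon"),
)

# The slice bounds follow the coordinate order: descending for lat, ascending for lon.
subset = grid.sel(lat=slice(46.5, 44.5), lon=slice(8.5, 11.5))
print(subset.shape)

# With the bounds reversed, the latitude selection is empty.
reversed_bounds = grid.sel(lat=slice(44.5, 46.5))
print(reversed_bounds.sizes["lat"])
```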

In [None]:
import rioxarray  # registers the `.rio` accessor on Xarray objects
LTS = LTS.rio.write_crs(4326)

In [None]:
LTS = LTS.rio.clip(AOI_poly, crs=4326)

### Print metadata
We can print metadata without performing any computation yet. This is what we call *lazy* computation.

In [None]:
LTS

## Compute

### Install additional packages in dask workers

In [None]:
from distributed.diagnostics.plugin import PipInstall
extra_packages=["xarray", "netCDF4", "s3fs", "h5netcdf", "numpy", "zip", "pandas", "geopandas", "rioxarray", "rasterio", "scipy", "zarr"]

plugin=PipInstall(extra_packages,restart=True)
client.register_worker_plugin(plugin)

Verify that the packages are installed on the Dask workers.

In [None]:
client.get_versions(packages=extra_packages,check=True)

In [None]:
%%time
LTS_min = LTS['min']
(LTS_min,)=dask.optimize(LTS_min)
LTS_min.data.visualize()

In [None]:
%%time
LTS_min.compute()

In [None]:
%%time
LTS_max = LTS['max']
(LTS_max,)=dask.optimize(LTS_max)
LTS_max.data.visualize()

In [None]:
%%time
LTS_max.compute()

## Get NDVI for 2022 over Lombardia

We re-use the file we created during the first episode. If the file is missing it will be downloaded from Zenodo.

In [None]:
import pooch
try:
    cgls_ds = xr.open_dataset('C_GLS_NDVI_20220101_20220701_Lombardia_S3_2_masked.nc')
except Exception:
    # Local file missing or unreadable: download it from Zenodo instead.
    cgls_file = pooch.retrieve(
        url="https://zenodo.org/record/6969999/files/C_GLS_NDVI_20220101_20220701_Lombardia_S3_2_masked.nc",
        known_hash="md5:be3f16913ebbdb4e7af227f971007b22",
        path=".",)
    cgls_ds = xr.open_dataset(cgls_file)

In [None]:
cgls_ds

In [None]:
NDVI_AOI = cgls_ds.NDVI.rio.write_crs(4326)

In [None]:
NDVI_AOI = NDVI_AOI.rio.clip(AOI_poly, crs=4326)

In [None]:
NDVI_AOI

The nominal spatial resolution of the Long Term Statistics is 1 km. As the current NDVI product has a nominal spatial resolution of 300 m, a reprojection is needed. RioXarray can take care of this through Rasterio, which wraps the corresponding GDAL method. More information about all the options can be found [in the Rasterio documentation](https://rasterio.readthedocs.io/en/stable/api/rasterio.warp.html#rasterio.warp.reproject).

In [None]:
NDVI_1k = NDVI_AOI.rio.reproject_match(LTS)

In [None]:
NDVI_1k = NDVI_1k.rename({'x': 'lon', 'y':'lat'})

In [None]:
VCI = ((NDVI_1k - LTS['min']) / (LTS['max'] - LTS['min'])) * 100
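
The formula used above rescales the current NDVI between the long-term minimum and maximum and expresses it as a percentage. A worked example on a few made-up values (these numbers are illustrative, not taken from the dataset):

```python
import numpy as np

# Hypothetical NDVI values and long-term statistics for three pixels.
ndvi = np.array([0.25, 0.50, 0.75])
ndvi_min = np.array([0.25, 0.25, 0.25])
ndvi_max = np.array([0.75, 0.75, 0.75])

# Same formula as in the cell above.
vci = (ndvi - ndvi_min) / (ndvi_max - ndvi_min) * 100
print(vci)
```

A value of 0 means that the current NDVI equals the long-term minimum, and a value of 100 means that it equals the long-term maximum.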

In [None]:
VCI

In [None]:
VCI.name = 'VCI'

In [None]:
VCI

In [None]:
%%time
VCI_c = VCI.compute()

In [None]:
VCI_c.isel(time=-1).plot()  # plot the already computed result instead of recomputing VCI

Both the catalogue and the original data source are now stored in the cloud, so the data remain accessible even from Dask workers that do not have access to your local (NFS) disk space.
You are now ready to parallelize your analysis using Dask workers from Dask Gateway!

In [None]:
client.close()

In [None]:
if cluster:  # only a Dask Gateway cluster needs an explicit shutdown
    cluster.shutdown()

## Packages citation

```{bibliography}
:style: alpha
:filter: topic % "dask" and topic % "package"
:keyprefix: e-
```