<img src="https://gateway.dask.org/_static/images/dask-horizontal-white.svg"
     alt="Dask Logo"
     style="margin-right: 10px; width: 50%" />
# Distributed computing with Dask

EODC offers Dask as a service by utilising [Dask Gateway](https://gateway.dask.org/). Users can launch a Dask cluster in a shared and managed cluster environment without requiring direct access to any cloud infrastructure resources such as VMs or Kubernetes clusters. The objective is to lower the barrier to entry for users who want to run large-scale data analysis on demand in a scalable environment.

A generic introduction to using Dask Gateway can be found in the official [Dask Gateway documentation](https://gateway.dask.org/usage.html). The following sections demonstrate how to use the Dask service at EODC.

As a prerequisite, Dask Gateway must be installed in your environment:
```bash
pip install dask-gateway
```
or 
```bash
conda install -c conda-forge dask-gateway
```

Note that the Python environment running your code and the environment used by Dask Gateway have to be almost identical, otherwise results sent between the two may fail to serialise or behave differently.
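
One way to spot environment drift is to compare package versions on both sides. The sketch below is a minimal illustration: the helper names `installed_versions` and `version_mismatches` and the version numbers are hypothetical, not part of Dask Gateway or the EODC SDK. Once a client is connected, `client.get_versions(check=True)` from `distributed` performs a comparable check against the scheduler and workers.

```python
from importlib.metadata import PackageNotFoundError, version


def installed_versions(packages):
    """Return {package: version or None} for the local environment."""
    found = {}
    for name in packages:
        try:
            found[name] = version(name)
        except PackageNotFoundError:
            found[name] = None  # package is not installed locally
    return found


def version_mismatches(local, remote):
    """Return {package: (local, remote)} for every package whose versions differ."""
    return {
        name: (local.get(name), remote.get(name))
        for name in sorted(set(local) | set(remote))
        if local.get(name) != remote.get(name)
    }


# Hypothetical values: what the local environment and the cluster image might report.
local = {"dask": "2023.8.1", "distributed": "2023.7.0"}
remote = {"dask": "2023.8.1", "distributed": "2023.8.1"}
print(version_mismatches(local, remote))
```

In practice `local` would come from `installed_versions(["dask", "distributed", "dask-gateway"])`, and `remote` from whatever the cluster image documents or reports.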

We will install some additional packages used in this demo afterwards.

## Authentication via OIDC password grant flow
Access to the EODC Dask service requires authentication. The [EODC SDK](https://github.com/eodcgmbh/eodc-sdk) therefore provides a helper class that authenticates a user against the EODC identity management system.
The user's password is passed directly to the request object and is not stored.
When the access token expires, the authenticator automatically uses the refresh token to request a new one.
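
The refresh behaviour described above can be pictured roughly as follows. This is a sketch under stated assumptions, not the EODC SDK implementation: `TokenCache`, `request_token` and `fake_idp` are hypothetical names, and the request payloads only follow the standard OAuth 2.0 `password` and `refresh_token` grant types. The identity provider is replaced by a stand-in function so the example runs without network access.

```python
import time


class TokenCache:
    """Keep an access token fresh using a refresh token (illustrative only)."""

    def __init__(self, request_token, clock=time.monotonic):
        # request_token(payload: dict) -> dict is a hypothetical callable that
        # would POST the payload to the identity provider's token endpoint.
        self._request_token = request_token
        self._clock = clock
        self._access_token = None
        self._refresh_token = None
        self._expires_at = 0.0

    def login(self, username, password):
        # The password goes straight into the request and is never stored.
        self._store(self._request_token(
            {"grant_type": "password", "username": username, "password": password}
        ))

    def access_token(self):
        if self._access_token is None:
            raise RuntimeError("call login() first")
        if self._clock() >= self._expires_at:
            # Token expired: exchange the refresh token for a new access token.
            self._store(self._request_token(
                {"grant_type": "refresh_token", "refresh_token": self._refresh_token}
            ))
        return self._access_token

    def _store(self, response):
        if "access_token" not in response:
            raise RuntimeError(f"authentication failed: {response}")
        self._access_token = response["access_token"]
        self._refresh_token = response.get("refresh_token", self._refresh_token)
        self._expires_at = self._clock() + response.get("expires_in", 0)


# Stand-in for the identity provider, so the sketch runs offline.
def fake_idp(payload):
    return {"access_token": f"token-via-{payload['grant_type']}",
            "refresh_token": "r1", "expires_in": 300}


cache = TokenCache(fake_idp)
cache.login("alice", "secret")
print(cache.access_token())
```

Checking explicitly for a missing `access_token` turns a rejected login into a readable error instead of a bare `KeyError`.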

## Connect to EODC Dask

Authenticating and connecting to EODC Dask can be done with a few lines of Python code.

Run the following in order to make sure all dependencies are met.

In [6]:
from eodc.dask import EODCDaskGateway
from rich.console import Console
from rich.prompt import Prompt
console = Console()
your_username = Prompt.ask(prompt="Enter your Username")
gateway = EODCDaskGateway(username=your_username)

## Change Cluster configuration if needed

In [None]:
cluster_options = gateway.cluster_options()
cluster_options

Options<worker_cores=2,
        worker_memory=2.0,
        image='registry.eodc.eu/eodc/clusters/dedl-deployment/dedl-dask:2023.08.3'>


## Create a Dask Cluster

Now we are going to create a Dask Cluster in order to run compute jobs.
To communicate with the cluster we have to instantiate a client as well.
By default, no worker nodes are spawned. Workers can be added manually or by enabling adaptive scaling of the cluster.

**Important: Please use the widget to add or scale the Dask workers. By default no workers are spawned, so the cluster cannot perform any computations until workers have been added.**

In [None]:
cluster = gateway.new_cluster(cluster_options)
client = cluster.get_client()
cluster

To spawn workers adaptively from Python instead, use the following method call. The cluster is scaled to 2 workers initially.
Depending on the load, Dask adds additional workers, up to a maximum of 5.

In [None]:
cluster.adapt(minimum=2, maximum=5)
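
To make the effect of `minimum` and `maximum` concrete, the sketch below models adaptive scaling as a load-based estimate clamped to the configured bounds. It is a deliberately simplified model, not Dask's actual scaling algorithm; `target_workers` and `tasks_per_worker` are hypothetical names introduced for illustration.

```python
import math


def target_workers(pending_tasks, tasks_per_worker, minimum, maximum):
    """Clamp a load-based worker estimate to the range [minimum, maximum]."""
    wanted = math.ceil(pending_tasks / tasks_per_worker)
    return max(minimum, min(maximum, wanted))


# With minimum=2 and maximum=5, as in the call above.
for pending in (0, 12, 40):
    print(pending, target_workers(pending, tasks_per_worker=4, minimum=2, maximum=5))
```

An idle cluster stays at the minimum, moderate load lands in between, and heavy load is capped at the maximum.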

## List clusters if available

In [None]:
console.print(gateway.list_clusters())

We can connect to already running clusters again.

In [None]:
cluster = gateway.connect(gateway.list_clusters()[0].name)
console.print(cluster)
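
Indexing the cluster list with `[0]` raises an `IndexError` when no cluster is running. A small guard, sketched here with a hypothetical helper `first_cluster_name` and stand-in objects that only model the `name` attribute used above, avoids that:

```python
from types import SimpleNamespace


def first_cluster_name(clusters):
    """Return the name of the first cluster report, or None if the list is empty."""
    return clusters[0].name if clusters else None


# Stand-ins for the objects returned by gateway.list_clusters().
running = [SimpleNamespace(name="example-cluster-1")]
print(first_cluster_name(running))
print(first_cluster_name([]))
```

With a real gateway, the result of `first_cluster_name(gateway.list_clusters())` can decide between `gateway.connect(name)` and `gateway.new_cluster(cluster_options)`.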

## Display Dask Dashboard to monitor execution of computations
Copy the following link into a browser of your choice. Note that the dashboard URL uses http, not https.

In [None]:
cluster.dashboard_link

In [None]:
import s3fs
import xarray as xr

s3fs_central = s3fs.S3FileSystem(
    anon=True,
    use_ssl=True,
    client_kwargs={"endpoint_url": "https://s3.central.data.destination-earth.eu"})

s3fs_lumi = s3fs.S3FileSystem(
    anon=True,
    use_ssl=True,
    client_kwargs={"endpoint_url": "https://s3.lumi.data.destination-earth.eu"})

In [None]:
s3fs_central.ls("increment1-testdata")

Read data stored in an S3 bucket at the central site (Poland).
The data we want to read is a single Zarr data store representing GFM flood data over Pakistan for 2022-08-30.

In [None]:
flood_map = xr.open_zarr(store=s3fs.S3Map(root="increment1-testdata/2022-08-30.zarr", s3=s3fs_central, check=False),
                         decode_coords="all",)["flood"].assign_attrs(location="central", resolution=20)
flood_map

Run a simple computation to derive the flooded area.

In [None]:
# each pixel covers 20 m x 20 m; divide by 1e6 to convert m2 to km2
flooded_area_ = flood_map.sum() * 20 * 20 / 1e6
flooded_area_
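
The arithmetic behind this step can be checked on a tiny example. The sketch assumes that flooded pixels carry the value 1 and all others 0, and that each pixel has an edge length of 20 m; `flooded_area_km2` is a hypothetical helper written in plain Python rather than xarray.

```python
def flooded_area_km2(flood_mask, pixel_size_m=20):
    """Sum a 0/1 flood mask and convert the pixel count to square kilometres."""
    flooded_pixels = sum(sum(row) for row in flood_mask)
    area_m2 = flooded_pixels * pixel_size_m * pixel_size_m
    return area_m2 / 1_000_000  # 1 km2 = 1,000,000 m2


mask = [
    [0, 1, 1],
    [1, 1, 0],
]
print(flooded_area_km2(mask))
```

Four flooded pixels of 400 m2 each give 1600 m2; 2500 such pixels would cover exactly one square kilometre.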

So far nothing has been computed, so let's run the computation on the Dask cluster now.

In [None]:
flooded_area = client.compute(flooded_area_, sync=True)
console.print(f"Flooded area: {flooded_area.data}km2")

Read data stored in an S3 bucket at the LUMI bridge (Finland).
The data we want to read is a datacube generated from ERA-5 representing predicted rainfall.

In [None]:
rainfall = xr.open_zarr(store=s3fs.S3Map(root="increment1-testdata/predicted_rainfall.zarr",
                                         s3=s3fs_lumi,
                                         check=False),
                        decode_coords="all",)["tp"].assign_attrs(location="lumi", resolution=20)
rainfall

In [None]:
from datetime import datetime
from dataclasses import dataclass

def accum_rain_predictions(rain_data, startdate, enddate, extent):
    rain_ = rain_data.sel(time=slice(startdate, enddate),
                          latitude=slice(extent.max_y, extent.min_y),
                          longitude=slice(extent.min_x, extent.max_x))
    return rain_.cumsum(dim="time", keep_attrs=True)*1000

@dataclass
class Extent:
    min_x: float
    min_y: float
    max_x: float
    max_y: float
    crs: str

# compute accumulated rainfall over Pakistan
roi_extent = Extent(65, 21, 71, 31, crs='EPSG:4326')
acc_rain_ = accum_rain_predictions(rainfall, startdate=datetime(2022, 8, 18),
                                             enddate=datetime(2022, 8, 30),
                                             extent=roi_extent)

# compute average rainfall for August 2022
rain_ = rainfall.sel(time=slice(datetime(2022, 8, 1), datetime(2022, 8, 30))).mean(dim="time", keep_attrs=True)*1000
rain_
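
The two reductions defined above differ in shape: a cumulative sum keeps the time axis and yields a running total, while a mean collapses it to a single value. The plain-Python sketch below shows this for one grid cell. The sample values are hypothetical, and the factor of 1000 is read as a conversion from metres to millimetres, assuming `tp` is stored in metres as is usual for ERA5 total precipitation.

```python
from itertools import accumulate
from statistics import fmean

M_TO_MM = 1000  # metres to millimetres, matching the *1000 factor used above

# Hypothetical precipitation values in metres for four time steps at one grid cell.
tp_m = [0.002, 0.0, 0.010, 0.004]

# Running total: one value per time step, never decreasing for non-negative input.
accumulated_mm = [total * M_TO_MM for total in accumulate(tp_m)]

# Mean: a single value for the whole period.
mean_mm = fmean(tp_m) * M_TO_MM

print(accumulated_mm)
print(mean_mm)
```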

And again run the computation on our EODC Dask cluster.
First we compute the accumulated rainfall over Pakistan.
Secondly we compute the average rainfall for August 2022 (monthly mean) at global scale.

In [None]:
acc_rain = client.compute(acc_rain_, sync=True)
acc_rain
mean_rain = client.compute(rain_, sync=True)
mean_rain

Plot a histogram of the accumulated rainfall computed for Pakistan.

In [None]:
acc_rain.plot()

In [None]:
cluster.close(shutdown=True)