### Example notebook for remote Dask execution on Zeus

#### Prerequisite: prepare the environment 

1. Connect to the Zeus VPN using a VPN client, e.g. Tunnelblick.

2. Add your local public key to the .ssh/authorized_keys file of your Zeus account.

3. Copy the zeus_util.py script into the same directory as this notebook. The module requires the *IPython*, *ipywidgets*, *Dask* and *Dask.distributed* Python libraries, as well as *Xarray* and *Cartopy* for the map. If not already available, these can be installed with one of the following commands:

`pip3 install IPython ipywidgets dask==2.12.0 distributed==2.12.0 xarray==0.15.0 cartopy`

`conda install -c conda-forge IPython ipywidgets dask==2.12.0 distributed==2.12.0 xarray==0.15.0 cartopy`

Alternatively, create the full conda environment from the requirements listed in *env_ds.yml* by running the following commands:

`conda env create -f env_ds.yml`

`source activate data-science-cmcc-v1-local`
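
Before moving on, it can help to confirm that the libraries listed above are importable in the active environment. The following is a minimal sketch that uses only the Python standard library (Python 3.8 or later is assumed for `importlib.metadata`); the helper name `check_packages` is hypothetical and is not part of the Zeus utilities.

```python
from importlib import metadata, util

# Mapping of import name -> distribution name as installed by pip/conda.
REQUIRED = {
    "IPython": "ipython",
    "ipywidgets": "ipywidgets",
    "dask": "dask",
    "distributed": "distributed",
    "xarray": "xarray",
    "cartopy": "cartopy",
}

def check_packages(required=REQUIRED):
    """Return {import_name: version string, or None if not importable}."""
    report = {}
    for import_name, dist_name in required.items():
        if util.find_spec(import_name) is None:
            report[import_name] = None  # module cannot be imported
            continue
        try:
            report[import_name] = metadata.version(dist_name)
        except metadata.PackageNotFoundError:
            # Importable, but no distribution metadata under this name
            report[import_name] = "unknown"
    return report

for name, version in check_packages().items():
    print(f"{name:12s} {version or 'MISSING'}")
```

Any line reporting `MISSING` points to a package that still has to be installed with one of the commands above.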

Import the Python utilities and set up your Zeus username (replace the value of `user` in the cell below with your own account name).

In [3]:
import os, sys
sys.path.append("../")
import zeus
zeus.init(user="tl28319")

ssh_askpass: exec(/usr/X11R6/bin/ssh-askpass): No such file or directory
Permission denied, please try again.
ssh_askpass: exec(/usr/X11R6/bin/ssh-askpass): No such file or directory
Permission denied, please try again.
ssh_askpass: exec(/usr/X11R6/bin/ssh-askpass): No such file or directory
tl28319@192.168.118.11: Permission denied (publickey,gssapi-keyex,gssapi-with-mic,password).


UnboundLocalError: local variable 'res' referenced before assignment
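
The `Permission denied` messages in the output above suggest that key-based SSH login was not accepted for this account, which is what prerequisite 2 is meant to set up. The following is a minimal sketch for testing the login before calling `zeus.init`, assuming the OpenSSH `ssh` client is available on the local machine. The helper names are hypothetical; the host address is the one that appears in the error message above.

```python
import subprocess

def build_ssh_check_command(user, host, timeout_s=10):
    """Build an ssh command that attempts a non-interactive login and runs `true`."""
    return [
        "ssh",
        "-o", "BatchMode=yes",                # never prompt for a password
        "-o", f"ConnectTimeout={timeout_s}",  # give up quickly if the host is unreachable
        f"{user}@{host}",
        "true",
    ]

def can_login_with_key(user, host, timeout_s=10):
    """Return True if ssh exits with status 0, i.e. the non-interactive login worked."""
    try:
        completed = subprocess.run(
            build_ssh_check_command(user, host, timeout_s),
            capture_output=True, text=True, timeout=timeout_s + 5,
        )
    except (FileNotFoundError, subprocess.TimeoutExpired):
        # ssh is not installed, or the connection attempt hung
        return False
    if completed.returncode != 0:
        print(completed.stderr.strip())
    return completed.returncode == 0
```

For example, `can_login_with_key("YOURUSERNAME", "192.168.118.11")` should return `True` once the VPN is connected and the public key is in place. In batch mode an unconfirmed host key also counts as a failure, so a first manual `ssh` login may be needed.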

Check the function documentation

In [None]:
help(zeus.start_dask)

Start a new Dask cluster on Zeus with 1 worker node (*n_workers = 1*)

In [None]:
from distributed import Client, get_client
client = zeus.start_dask(name="Test",
              project="R000",
              cores=36,
              processes=12,
              memory="50 GB",
              interface="ib0",
              walltime="00:30", 
              job_extra=["-x"], 
              n_workers=1
             )
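
The `cores`, `processes` and `memory` arguments resemble those of the `LSFCluster` class in dask-jobqueue. Assuming `zeus.start_dask` forwards them with the same meaning (the text printed by `help(zeus.start_dask)` is the authoritative reference), each job is split into `processes` worker processes that share the cores and the memory evenly. The arithmetic can be sketched as follows; `per_process_resources` is a hypothetical helper used only for illustration.

```python
def per_process_resources(cores, processes, memory_gb):
    """Split one job's cores and memory evenly across its worker processes."""
    threads_per_process = cores // processes
    memory_per_process_gb = memory_gb / processes
    return threads_per_process, memory_per_process_gb

# Values taken from the cell above: cores=36, processes=12, memory="50 GB"
threads, mem_gb = per_process_resources(cores=36, processes=12, memory_gb=50)
print(f"{threads} threads and about {mem_gb:.2f} GB per worker process")
# prints: 3 threads and about 4.17 GB per worker process
```

Under that assumption, tasks that need more than roughly 4 GB at once would call for fewer processes per job or a larger `memory` value.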

Check whether the cluster has been allocated on LSF. It can take some time before LSF provides the resources, so if the workers are not yet listed, run the cell again after a few seconds. Once the cluster is available, click on the **Dashboard** link to open the Dask monitoring interface.

In [None]:
client
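
Instead of re-running the cell by hand, the wait can be automated by polling the scheduler. The following is a minimal sketch, assuming `client` is a `distributed.Client` (as the import in the previous cell suggests). `poll_for_workers` is a hypothetical helper that relies only on `Client.scheduler_info()`; the timeout and interval defaults are arbitrary.

```python
import time

def poll_for_workers(client, n_workers=1, timeout_s=300, interval_s=5):
    """Poll the scheduler until at least n_workers are connected.

    Returns the number of connected workers, or raises TimeoutError
    if the deadline passes first.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        connected = len(client.scheduler_info()["workers"])
        if connected >= n_workers:
            return connected
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"only {connected} of {n_workers} workers after {timeout_s} s"
            )
        time.sleep(interval_s)
```

Calling `poll_for_workers(client)` right after `zeus.start_dask(...)` blocks until LSF has started at least one worker process, or fails after five minutes.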

Let's load the input dataset in parallel using Xarray and compute the average of *tasmax* over the time dimension. The dataset contains the *tasmax* variable and consists of 156 files (25 GB), spanning the whole historical period from 1850 to 2005.

In [None]:
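def mf_compute(n):
    # Imported inside the function because it runs on a Dask worker on Zeus
    import xarray as xr
    # Get the client connected to the running cluster from within the task
    client = get_client()
    # Open all the files lazily as a single dataset, combined by their coordinates
    multi_file_dataset = xr.open_mfdataset(
        '/work/asc/ophidia/demo/cmip5/CMCC-CM/historical/day/tasmax/tasmax_day_CMCC-CM_historical_r1i1p1_*.nc',
        combine='by_coords', parallel=True
    )
    return multi_file_dataset

# Open the dataset on the cluster (the argument is not used by mf_compute)
future = client.submit(mf_compute, 1)
multi_file_dataset = future.result()
# Average tasmax over time; the work is only executed when compute() is called
mean = multi_file_dataset['tasmax'].mean(dim='time')
result = mean.compute()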
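
Because the input is selected with a glob pattern, it can be useful to check how many files the pattern matches before opening them (156 are expected according to the description above). The following is a minimal sketch; `count_matching_files` is a hypothetical helper, not part of the Zeus utilities.

```python
import glob

def count_matching_files(pattern):
    """Return how many paths match a shell-style glob pattern."""
    return len(glob.glob(pattern))
```

Since the files live on the Zeus file system, the check has to run on the cluster rather than locally, for example with `client.submit(count_matching_files, pattern).result()`, where `pattern` is the path string passed to `open_mfdataset`.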

Show the computation result

In [None]:
print(result)

Using Cartopy, create a map of the computed average temperature.

In [None]:
%matplotlib inline
import cartopy.crs as ccrs
import matplotlib.pyplot as plt
from cartopy.mpl.geoaxes import GeoAxes
from cartopy.util import add_cyclic_point
import numpy as np
import warnings
warnings.filterwarnings("ignore")

lat = result['lat'].values
lon = result['lon'].values
var = result.values

fig = plt.figure(figsize=(15, 6), dpi=100)

#Add Geo axes to the figure with the specified projection (PlateCarree)
projection = ccrs.PlateCarree()
ax = plt.axes(projection=projection)

#Draw coastline and gridlines
ax.coastlines()
gl = ax.gridlines(crs=projection, draw_labels=True, linewidth=1, color='black', alpha=0.9, linestyle=':')
gl.xlabels_top = False
gl.ylabels_right = False

var = np.reshape(var, (len(lat), len(lon)))

#Wraparound points in longitude
var_cyclic, lon_cyclic = add_cyclic_point(var, coord=lon)
x, y = np.meshgrid(lon_cyclic,lat)

#Define color levels for color bar
levStep = (np.max(var)-np.min(var))/20
clevs = np.arange(np.min(var),np.max(var)+levStep,levStep)

#Set filled contour plot
cnplot = ax.contourf(x, y, var_cyclic, clevs, transform=projection,cmap=plt.cm.jet)
plt.colorbar(cnplot,ax=ax)

ax.set_aspect('auto', adjustable=None)
plt.title('1850-2005 tasmax average (deg K)')
plt.show()
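
The contour levels in the cell above are built with `np.arange` and a floating-point step, in which case the number of values returned can vary by one because of rounding. As an alternative sketch, `np.linspace` fixes the number of levels and always includes both end points; `make_levels` is a hypothetical helper and the sample array is made-up data used only to exercise it.

```python
import numpy as np

def make_levels(values, n_intervals=20):
    """Return n_intervals + 1 evenly spaced contour levels spanning the data."""
    vmin, vmax = np.min(values), np.max(values)
    return np.linspace(vmin, vmax, n_intervals + 1)

# Made-up temperatures in kelvin, only to show the shape of the result
clevs = make_levels(np.array([[250.0, 260.0], [280.0, 310.0]]))
print(len(clevs), clevs[0], clevs[-1])
```

In the plotting cell this would correspond to `clevs = make_levels(var)`, passed to `ax.contourf` in the same way as before.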

Shut down the cluster and release the resources on LSF. Note that the function may occasionally raise warnings due to synchronization effects.

In [None]:
zeus.stop_dask(client)