<center> <b> EXPERIMENTAL VERSION, NOT TESTED ON CLUSTER <b> </center>

---

# Climate analysis using structured data - HPC4DS

<picture>
  <source media="(prefers-color-scheme: dark)" srcset="https://docs.dask.org/en/stable/_static/images/dask-horizontal-white.svg">
  <img alt="dask'" src="https://docs.dask.org/en/latest/_images/dask_horizontal.svg" width="150">
</picture>

Student:
- Laiti Francesco
- Lobba Davide

---

This notebook contains the source code to read, process and write netCDF data using Dask framework to exploit parallel computational power of the [HPC@UniTN](https://sites.google.com/unitn.it/hpc/) cluster.

> The notebook has been tested on Python 3.7.7 using conda environment.

### Initialization

In this section we define the requirements to run correctly the notebook.

#### Import libraries

We import the necessary Python libraries.

In [None]:
import netCDF4 as nc
import numpy as np

#### Declare global constants

We declare the global constants used in this notebook.

In [None]:
# paths
INPUT_FILE = '/shares/HPC4DataScience/pta/CMCC-CM2-SR5_historical/pr_day_CMCC-CM2-SR5_historical_r1i1p1f1_gn_20000101-20141231.nc'
OUTPUT_FILE = 'pr_reduce.nc'

# specific data of our problem
NLAT = 192
NLON = 288
START_LAT = -90.0
START_LON = 0.0
LAT_NAME = "lat"
LON_NAME = "lon"
TIME_NAME = "time"
PR_NAME = "pr"
DEGREES_EAST = "degrees_east"
DEGREES_NORTH = "degrees_north"
LAT_UNITS = "degrees_north"
LON_UNITS = "degrees_east"
UNITS = "units"
PR_UNITS = "kg m-2 s-1"

# PBS confg
QUEUE = 'short_cpuQ'
WALLTIME = '00:02:00'
CORES = 2 # total cores per job
MEMORY = '2GB' # total RAM memory per job
RESOURCE_SPEC = 'select=1:ncpus=2:mem=2gb'
PORT_SERVER = 33567

### Setup PBS configuration

Check using `qstat` if your job has been submitted. If you cannot see it, open the error output file. In my case there was an error `ImportError: cannot import name '_unicodefun' from 'click'`. To solve it install the last version of `clock` by running `pip install --upgrade click==8.0.2` from the terminal

In [None]:
from dask_jobqueue import PBSCluster
cluster = PBSCluster(cores = CORES, 
                     memory = MEMORY, 
                     resource_spec = RESOURCE_SPEC, 
                     queue = QUEUE, 
                     walltime = WALLTIME,
                     scheduler_options={"dashboard_address": f":{PORT_SERVER}"}) # default port is already used by someone else on the cluster
cluster.scale(jobs=5)

from dask.distributed import Client
client = Client(cluster)

In [None]:
cluster.get_logs()

### Get info of netCDF data

In [None]:
ds = nc.Dataset(INPUT_FILE)
nrecord = ds.dimensions[TIME_NAME].size

print(f'--- INFO: found dim = {len(ds.dimensions)} and nrecord = {nrecord} ---')

### Reading step

In [None]:
# useful for the writing part
lats = np.array(ds.variables[LAT_NAME])
lons = np.array(ds.variables[LON_NAME])

In [None]:
import dask.array as da
pr_in = da.from_array(ds.variables[PR_NAME]) # chunks='auto' is enabled by default
pr_out = da.mean(pr_in, axis=0)
# pr_out.visualize()
%time pr_out = pr_out.compute()

In [None]:
print(pr_in.shape)
print(pr_out.shape)

Close the netCDF dataset to use, just to be safe that the variable is not assigned to an open dataset.

In [None]:
try: ds.close()
except: print(f"Error while closing the {INPUT_FILE}")

### Writing step

Reference: https://unidata.github.io/python-training/workshop/Bonus/netcdf-writing/

In [None]:
ds = nc.Dataset(OUTPUT_FILE, mode='w', format='NETCDF4_CLASSIC')

lat_dim = ds.createDimension(LAT_NAME, NLAT)
lon_dim = ds.createDimension(LON_NAME, NLON)
# time_dim = ds.createDimension(TIME_NAME, nrecord)

ds.title = "Average along time axis of file " + INPUT_FILE

lat = ds.createVariable(LAT_NAME, np.float32, (LAT_NAME,))
lat.units = DEGREES_NORTH
lat.long_name = 'latitude'

lon = ds.createVariable(LON_NAME, np.float32, (LON_NAME,))
lon.units = DEGREES_EAST
lon.long_name = 'longitude'

prec = ds.createVariable(PR_NAME, np.float32, (LAT_NAME, LON_NAME))
prec.units = PR_UNITS
prec.long_name = 'precipitation'
# time = ds.createVariable('time', np.float64, ('time',))
# time.units = 'hours since 1800-01-01'
# time.long_name = 'time'

In [None]:
nlats = len(lat_dim); nlons = len(lon_dim)

lat = lats
lon = lons
prec = pr_out

print("-- Wrote data, prec.shape is now ", prec.shape)

### Final step

In [None]:
print(ds)

ds.close()
client.shutdown()
cluster.close()