# Load Input Data in Parallel with Dask and UXarray 

## Overview

This usage example demonstrates how to load unstructured input data with UXarray and Dask to minimize memory usage. It also showcases chunking and loading in parallel, along with the performance of each.



In [None]:
import uxarray as ux
from dask.distributed import Client, LocalCluster
import xarray as xr
import warnings

warnings.filterwarnings("ignore")

### Data

Data loaded in this notebook is simulated output from the Department of Energy (DOE) Energy Exascale Earth System Model (E3SM) version 2. The case is set up as an atmosphere-only (AMIP) simulation with present-day control forcing (F2010) at a 1-degree horizontal resolution (ne30pg2), where sea surface temperatures and sea ice are set to the E3SMv2 defaults. The case is run for 6 years.

## Chunking

Chunks are small pieces of the array of interest. Dask divides an array into chunks that are each small enough to fit in memory.

UXarray inherits the chunking feature from Dask, so the chunking of the data can be specified when loading.

### Loading Data with Chunking

The following example demonstrates loading one monthly output file from E3SM. When the `chunks` argument is supplied, the loaded data is split as specified in the given dictionary. Here, the data is split along the atmospheric vertical level dimension `lev`, in chunks of 4 levels, as specified by the dictionary `{"lev": 4}`.


In [None]:
data_file_monthonly = "/glade/campaign/cisl/vast/uxarray/data/e3sm_keeling/ENSO_ctl_1std/unstructured/20231220.F2010.ENSO_ctl.lagreg.ne30pg2_EC30to60E2r2.keeling.eam.h0.0006-12.nc"
grid_file = (
    "/glade/campaign/cisl/vast/uxarray/data/e3sm_keeling/E3SM_grid/ne30pg2_grd.nc"
)
uxds_e3sm_mon = ux.open_dataset(grid_file, data_file_monthonly, chunks={"lev": 4})

Now look at one of the data arrays in the loaded dataset. 

By calling one of the variables, `Q` (specific humidity), we can look at the data array dimensions. The full data array has 1 point in time, 72 vertical levels, and a total of 21600 faces in the simulation grid, corresponding to the single monthly file we loaded. This matches the info shown below: `time: 1, lev: 72, n_face: 21600`.

The chunk size is shown in the second line. Each chunk contains 4 vertical levels instead of 72 (see `chunksize=(1,4,21600)`), confirming that the data has been chunked as requested.

In [None]:
uxds_e3sm_mon.Q
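
The chunk layout reported for `Q` can also be checked by hand. The following is a minimal sketch using only the standard library; `chunk_summary` is a hypothetical helper, not part of UXarray or Dask, and the 4-byte item size assumes the variable is stored as `float32`, which should be confirmed against the dtype in the output above.

```python
import math


def chunk_summary(shape, chunks, itemsize):
    """Return (blocks per dimension, total blocks, bytes in one full chunk)."""
    blocks = tuple(math.ceil(n / c) for n, c in zip(shape, chunks))
    return blocks, math.prod(blocks), math.prod(chunks) * itemsize


# Dimensions of Q as reported above: (time, lev, n_face)
shape = (1, 72, 21600)
# chunks={"lev": 4} splits only `lev`; the other dimensions stay whole
chunks = (1, 4, 21600)
# Assumption: 4-byte (float32) values; check the dtype before relying on this
blocks, n_chunks, chunk_bytes = chunk_summary(shape, chunks, itemsize=4)

print(blocks)       # blocks per dimension
print(n_chunks)     # total number of chunks
print(chunk_bytes)  # bytes held by one chunk
```

Under these assumptions, the 72 levels are split into 18 chunks of 4 levels each, so only a small fraction of the array needs to be in memory at once.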

UXarray also supports the same feature when loading multiple files at once with `open_mfdataset`, using the same `chunks` argument shown above.

Chunk size matters because it can significantly affect performance, depending on the algorithm and usage. There are multiple possible chunking configurations, such as a uniform chunk size along a dimension or an explicit chunk shape. Chunking can also be left to the defaults or to Dask's automatic chunking, using the special values `-1` for no chunking along a dimension, `None` for no change to the existing chunking (when rechunking), and `"auto"` for automatic chunking toward the ideal chunk size, which defaults to 128 MiB in Dask.

More details on the possible configurations and guidelines on deciding chunk size can be found on [Dask's Page about Array Chunks](https://docs.dask.org/en/stable/array-chunks.html).
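
As a rough illustration of these options, the sketch below applies them to a plain Dask array with the same shape as `Q`. It uses `dask.array` directly rather than UXarray, and the zero-filled `float32` array is a stand-in for real data, so it only shows how the chunk specifications are interpreted.

```python
import dask.array as da

# Same shape as the Q array above: (time, lev, n_face)
shape = (1, 72, 21600)

# Uniform chunk size of 4 along the `lev` dimension (axis 1)
by_level = da.zeros(shape, chunks=(1, 4, 21600), dtype="float32")

# -1 means "do not split this dimension"; other axes keep their chunking
whole_lev = by_level.rechunk({1: -1})

# "auto" lets Dask choose chunk sizes against its configured target size
auto = da.zeros(shape, chunks="auto", dtype="float32")

print(by_level.numblocks)  # number of blocks along each dimension
print(whole_lev.chunks)    # chunk sizes along each dimension
print(auto.chunks)         # chunk sizes chosen by Dask
```

Inspecting `.chunks` or `.numblocks` before running a computation is a cheap way to confirm that a chunk specification did what was intended.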

## Loading Data with the `parallel` argument

Similar to Xarray, UXarray also supports loading data in parallel. The performance gain may not be significant for the dataset chosen in this notebook, and the Dask client configuration needs to be customized to the data. Loading data in parallel with Dask can be helpful when the dataset of interest does not fit in memory and/or when execution is to be distributed over several CPU cores or machines.

### Loading 6-year monthly data in serial 

In [None]:
%%time
# Regular Load
data_files = "/glade/campaign/cisl/vast/uxarray/data/e3sm_keeling/ENSO_ctl_1std/unstructured/*.nc"
uxds_e3sm_basic_load = ux.open_mfdataset(grid_file, data_files, parallel=False)

### Loading 6-year monthly data in parallel

The following code sets up a local cluster with 128 worker processes (`n_workers`), each running 2 threads (`threads_per_worker`). A local cluster allows multi-process computation on a single machine (e.g. a laptop) and provides a diagnostic dashboard for monitoring performance. These values should be adjusted to the number of cores actually available.

In [None]:
cluster = LocalCluster(n_workers=128, threads_per_worker=2)
client = Client(cluster)
client
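
The cluster above requests 128 workers with 2 threads each, which suits a large compute node but not a typical laptop. One possible way to pick values for a given machine is sketched below. `worker_layout` is a hypothetical helper, not a Dask function, and dividing the core count evenly by the threads per worker is only one simple heuristic.

```python
import os


def worker_layout(total_cores, threads_per_worker=2):
    """Hypothetical helper: split cores into (n_workers, threads_per_worker)."""
    if total_cores < 1 or threads_per_worker < 1:
        raise ValueError("core and thread counts must be positive")
    # Never ask for more threads per worker than there are cores
    threads = min(threads_per_worker, total_cores)
    # Integer division keeps workers * threads within the core count
    n_workers = max(1, total_cores // threads)
    return n_workers, threads


# os.cpu_count() can return None, so fall back to a single core
cores = os.cpu_count() or 1
n_workers, threads = worker_layout(cores)
print(f"{cores} cores -> {n_workers} workers x {threads} threads")
```

The resulting values could then be passed as `LocalCluster(n_workers=n_workers, threads_per_worker=threads)`.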

In [None]:
%%time
# Parallel load
uxds_e3sm_parallel_load = ux.open_mfdataset(grid_file, data_files, parallel=True)

In [None]:
uxds_e3sm_parallel_load

Loading the data in parallel results in a 1.5x speedup over the serial load.
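
Outside a notebook, where the `%%time` magic is not available, the same comparison can be made with the standard library. The sketch below is an illustration only: `timed` and `speedup` are hypothetical helpers, the workload is a stand-in for the `open_mfdataset` calls, and the numbers passed to `speedup` are made up rather than measured.

```python
import time


def timed(func, *args, **kwargs):
    """Call func and return (result, elapsed wall-clock seconds)."""
    start = time.perf_counter()
    result = func(*args, **kwargs)
    return result, time.perf_counter() - start


def speedup(serial_seconds, parallel_seconds):
    """Ratio of serial to parallel time; values above 1 mean parallel is faster."""
    return serial_seconds / parallel_seconds


# Stand-in workload; in the notebook this would be a data loading call
total, elapsed = timed(sum, range(1000))
print(f"sum took {elapsed:.6f} s")

# Illustrative timings only, not measurements from this notebook
print(speedup(3.0, 2.0))
```

Timings of this kind vary between runs and depend on file system caching, so repeating each measurement a few times gives a more reliable ratio.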

After all computations are done, it is best practice to explicitly clean up the Dask workers and scheduler by shutting down the cluster.

In [10]:
client.shutdown()