# Thinking through CESM data access

We want to read a large number of netCDF files, combine them to form a single dataset, and then analyze that. How do we think about it?

In pseudocode we want
```python
# loop over every file and read in metadata
datasets = [xr.open_dataset(file) for file in files]
# optionally make modifications
preprocessed = [preprocess(dataset) for dataset in datasets]
# combine to create a single dataset
combined = xr.combine_XXX(preprocessed, ...)
```

Xarray's [`open_mfdataset`](https://docs.xarray.dev/en/stable/generated/xarray.open_mfdataset.html#xarray.open_mfdataset) implements this pattern with the option of parallelizing the loop over all files using `dask`. This can be quite handy.

## Creating a new data pipeline

### First create a list of files

The `glob` package is good for this ([docs](https://docs.python.org/3/library/glob.html))

> The glob module finds all the pathnames matching a specified pattern according to the rules used by the Unix shell, although results are returned in arbitrary order. No tilde expansion is done, but *, ?, and character ranges expressed with [] will be correctly matched. 

```{important}
-  The tilde `~` is not expanded to the user's home directory. Use [`os.path.expanduser`](https://docs.python.org/3/library/os.path.html#os.path.expanduser) for that.
- The list of files returned by `glob` is not sorted ! Use [`sorted`](https://docs.python.org/3/library/functions.html#sorted) to sort the list.
```

Here's a list of files from the [CESM2 Large Ensemble](https://www.cesm.ucar.edu/community-projects/lens2): these are decadal timeseries files with daily precipitation output for years 1850-2100, and 50 ensemble members.

In [7]:
import glob

files = glob.glob(
    #"/glade/scratch/anukesh/CESM-LE/PRECT/ENSEMBLE/PRECT/*smbb**h1**18500101-21001231*"
    "/glade/campaign/cgd/cesm/CESM2-LE/timeseries/atm/proc/tseries/day_1/PRECT/*smbb*"
)
files[:10] # list the first 10 files

['/glade/campaign/cgd/cesm/CESM2-LE/timeseries/atm/proc/tseries/day_1/PRECT/b.e21.BHISTsmbb.f09_g17.LE2-1231.019.cam.h1.PRECT.18500101-18591231.nc',
 '/glade/campaign/cgd/cesm/CESM2-LE/timeseries/atm/proc/tseries/day_1/PRECT/b.e21.BHISTsmbb.f09_g17.LE2-1011.001.cam.h1.PRECT.19800101-19891231.nc',
 '/glade/campaign/cgd/cesm/CESM2-LE/timeseries/atm/proc/tseries/day_1/PRECT/b.e21.BSSP370smbb.f09_g17.LE2-1301.018.cam.h1.PRECT.20150101-20241231.nc',
 '/glade/campaign/cgd/cesm/CESM2-LE/timeseries/atm/proc/tseries/day_1/PRECT/b.e21.BSSP370smbb.f09_g17.LE2-1131.007.cam.h1.PRECT.20250101-20341231.nc',
 '/glade/campaign/cgd/cesm/CESM2-LE/timeseries/atm/proc/tseries/day_1/PRECT/b.e21.BHISTsmbb.f09_g17.LE2-1301.013.cam.h1.PRECT.19200101-19291231.nc',
 '/glade/campaign/cgd/cesm/CESM2-LE/timeseries/atm/proc/tseries/day_1/PRECT/b.e21.BSSP370smbb.f09_g17.LE2-1231.019.cam.h1.PRECT.20450101-20541231.nc',
 '/glade/campaign/cgd/cesm/CESM2-LE/timeseries/atm/proc/tseries/day_1/PRECT/b.e21.BHISTsmbb.f09_g17.

In [26]:
sorted(files)

['/glade/campaign/cgd/cesm/CESM2-LE/timeseries/atm/proc/tseries/day_1/PRECT/b.e21.BHISTsmbb.f09_g17.LE2-1011.001.cam.h1.PRECT.18500101-18591231.nc',
 '/glade/campaign/cgd/cesm/CESM2-LE/timeseries/atm/proc/tseries/day_1/PRECT/b.e21.BHISTsmbb.f09_g17.LE2-1011.001.cam.h1.PRECT.18600101-18691231.nc',
 '/glade/campaign/cgd/cesm/CESM2-LE/timeseries/atm/proc/tseries/day_1/PRECT/b.e21.BHISTsmbb.f09_g17.LE2-1011.001.cam.h1.PRECT.18700101-18791231.nc',
 '/glade/campaign/cgd/cesm/CESM2-LE/timeseries/atm/proc/tseries/day_1/PRECT/b.e21.BHISTsmbb.f09_g17.LE2-1011.001.cam.h1.PRECT.18800101-18891231.nc',
 '/glade/campaign/cgd/cesm/CESM2-LE/timeseries/atm/proc/tseries/day_1/PRECT/b.e21.BHISTsmbb.f09_g17.LE2-1011.001.cam.h1.PRECT.18900101-18991231.nc',
 '/glade/campaign/cgd/cesm/CESM2-LE/timeseries/atm/proc/tseries/day_1/PRECT/b.e21.BHISTsmbb.f09_g17.LE2-1011.001.cam.h1.PRECT.19000101-19091231.nc',
 '/glade/campaign/cgd/cesm/CESM2-LE/timeseries/atm/proc/tseries/day_1/PRECT/b.e21.BHISTsmbb.f09_g17.LE2-10

There are 1300 files, 26 decades per ensemble member (1300/26 = 50)

In [9]:
len(files)

1300

### Start by opening a single file

In [10]:
import xarray as xr

single = xr.open_dataset(files[0])
single

First check data size

In [11]:
single.nbytes / 1e9  # approx GB

0.807722764

Each single file is 0.8GB and we have 1300 of them, so approximately a terabyte in total. We will have to use dask.

That means we need to make chunking decisions.

Later on, we will extract time series at a single point, so let's chunk in space, and choosing chunksizes for the data variable `PRECT`.

Start by looking at dimension names for `PRECT`

In [12]:
single.PRECT

### Choosing a chunk size

This is a timeseries file with daily average output using the `noleap` calendar.

We will need to concatenante along two dimensions: `time` along decades, and ensemble members along a new dimension `"ensemble"`. Today, we *cannot* create an xarray dataset with chunksizes that span files. In other words, because there is one file per ensemble member, and we are concatenating ensemble members along a new dimension, the chunksize for the new dimension **will** be one. It is possible to rechunk later, but that will involve expensive communication that is best to avoid unless you really need to do so.

We *could* chunk along space because we want to plot time series at a single point later. After some experimenting we choose a size of 64 along `lat`, 128 along `lon`, and all timesteps in a single chunk, for a chunksize of ~114MB.

```{tip}
Many other chunking choices are possible, it all depends on what you want to do later.  For example we could have bigger spatial chunks, and smaller chunks along time. Here is some reading material on chunking/xarray/dask:
- [dask docs on best practices](https://docs.dask.org/en/stable/array-best-practices.html#select-a-good-chunk-size)
- [xarray docs](https://docs.xarray.dev/en/stable/user-guide/dask.html#optimization-tips)
- [dask docs on chunking](https://docs.dask.org/en/latest/array-chunks.html)
- [dask blog](https://blog.dask.org/2020/07/30/beginners-config)
```

> When choosing the size of chunks it is best to make them neither too small, nor too big (around 100MB is often reasonable). Each chunk needs to be able to fit into the worker memory and operations on that chunk should take some non-trivial amount of time (more than 100ms). For many more recommendations take a look at the docs on [chunks](https://docs.dask.org/en/latest/array-chunks.html)...




In [16]:
single.PRECT.chunk({"lat": 64, "lon": 128})

Unnamed: 0,Array,Chunk
Bytes,769.92 MiB,114.06 MiB
Shape,"(3650, 192, 288)","(3650, 64, 128)"
Dask graph,9 chunks in 2 graph layers,9 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 769.92 MiB 114.06 MiB Shape (3650, 192, 288) (3650, 64, 128) Dask graph 9 chunks in 2 graph layers Data type float32 numpy.ndarray",288  192  3650,

Unnamed: 0,Array,Chunk
Bytes,769.92 MiB,114.06 MiB
Shape,"(3650, 192, 288)","(3650, 64, 128)"
Dask graph,9 chunks in 2 graph layers,9 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray


### Test with a small subset

Let's trying reading just 3 files to make sure the output looks as we expect.

We choose `combine="nested"` instead of `combine="by_coords"`. This will simply concatenate the files in the order provided. The term "nested" is used because it can accept a nested list-of-lists as input and concatenate along multiple dimensions. In contrast, `by_coords` will look at coordinate location values and make decisions about which dimensions to concatenate along. This can sometimes backfire, and it is almost always better to be explicit by providing the files in the right order, and specify `combine="nested"`.


In [21]:
sorted(files)[:30]

['/glade/campaign/cgd/cesm/CESM2-LE/timeseries/atm/proc/tseries/day_1/PRECT/b.e21.BHISTsmbb.f09_g17.LE2-1011.001.cam.h1.PRECT.18500101-18591231.nc',
 '/glade/campaign/cgd/cesm/CESM2-LE/timeseries/atm/proc/tseries/day_1/PRECT/b.e21.BHISTsmbb.f09_g17.LE2-1011.001.cam.h1.PRECT.18600101-18691231.nc',
 '/glade/campaign/cgd/cesm/CESM2-LE/timeseries/atm/proc/tseries/day_1/PRECT/b.e21.BHISTsmbb.f09_g17.LE2-1011.001.cam.h1.PRECT.18700101-18791231.nc',
 '/glade/campaign/cgd/cesm/CESM2-LE/timeseries/atm/proc/tseries/day_1/PRECT/b.e21.BHISTsmbb.f09_g17.LE2-1011.001.cam.h1.PRECT.18800101-18891231.nc',
 '/glade/campaign/cgd/cesm/CESM2-LE/timeseries/atm/proc/tseries/day_1/PRECT/b.e21.BHISTsmbb.f09_g17.LE2-1011.001.cam.h1.PRECT.18900101-18991231.nc',
 '/glade/campaign/cgd/cesm/CESM2-LE/timeseries/atm/proc/tseries/day_1/PRECT/b.e21.BHISTsmbb.f09_g17.LE2-1011.001.cam.h1.PRECT.19000101-19091231.nc',
 '/glade/campaign/cgd/cesm/CESM2-LE/timeseries/atm/proc/tseries/day_1/PRECT/b.e21.BHISTsmbb.f09_g17.LE2-10

In [25]:
xr.open_mfdataset(
    # make sure we sort
    sorted(files)[:30],
    # concatenate along a new dimension called "ensemble"
    concat_dim=["time", None, None, "ensemble"],
    # just concatenate them together
    combine="nested",
    chunks={"lat": 64, "lon": 128},
    parallel=True,
)

ValueError: concat_dims has length 4 but the datasets passed are nested in a 1-dimensional structure

Notice that *all* variables have been concatenated along the `ensemble` dimension even if we know it to be a constant: e.g. `P0`.


### Choosing combine options


Xarray has a number of options to control this concatenation behaviour. The [normal recommendation](https://docs.xarray.dev/en/stable/user-guide/io.html#reading-multi-file-datasets) is the hard-to-interpret sequence `data_vars="minimal", coords="minimal", compat="override"`. What does this mean?
1. `"minimal"` for `data_vars` and `coords` means only concatenate variables that have the concatenation dimension already.
2. For those variables without the concatenation dimension, xarray will look at the `compat` kwarg. For `compat="different"`, the default, Xarray will check for equality of the variable across all files. Those that are different get concatenated, those that are the same, are simply copied over. This can get quite expensive, so `compat="override"` allows you to skip equality checking and simply pick the variable from the first file. This is great for so-called 'static variables' such as grid variables that are invariant in time (and ensemble member).

Let's try that

In [8]:
combined = xr.open_mfdataset(
    # make sure we sort
    sorted(files[:3]),
    # concatenate along a new dimension called "ensemble"
    concat_dim="ensemble",
    chunks={"lat": 16, "lon": 32},
    data_vars="minimal",
    coords="minimal",
    compat="override",
    # just concatenate them together
    combine="nested",
    parallel=True,
)
combined

Unnamed: 0,Array,Chunk
Bytes,136.89 kiB,136.89 kiB
Shape,"(8761, 2)","(8761, 2)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,object numpy.ndarray,object numpy.ndarray
"Array Chunk Bytes 136.89 kiB 136.89 kiB Shape (8761, 2) (8761, 2) Dask graph 1 chunks in 2 graph layers Data type object numpy.ndarray",2  8761,

Unnamed: 0,Array,Chunk
Bytes,136.89 kiB,136.89 kiB
Shape,"(8761, 2)","(8761, 2)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,object numpy.ndarray,object numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,1.80 GiB,17.11 MiB
Shape,"(8761, 192, 288)","(8761, 16, 32)"
Dask graph,108 chunks in 2 graph layers,108 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 1.80 GiB 17.11 MiB Shape (8761, 192, 288) (8761, 16, 32) Dask graph 108 chunks in 2 graph layers Data type float32 numpy.ndarray",288  192  8761,

Unnamed: 0,Array,Chunk
Bytes,1.80 GiB,17.11 MiB
Shape,"(8761, 192, 288)","(8761, 16, 32)"
Dask graph,108 chunks in 2 graph layers,108 chunks in 2 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray


Oops this doesn't work for us! We didn't concatenate `PRECT` along the new `ensemble` dimension.

In [9]:
combined.PRECT.dims

('time', 'lat', 'lon')

### Try preprocessing the dataset to make it work better

Our dataset doesn't really fit the assumptions of `open_mfdataset`. Luckily we can modify our datasets before the concatenation stage using the `preprocess` kwarg ([docs](https://docs.xarray.dev/en/stable/generated/xarray.open_mfdataset.html#xarray.open_mfdataset))
>  `preprocess`: If provided, call this function on each dataset prior to concatenation. You can find the file-name from which each dataset was loaded in ds.encoding["source"].

What we'll do is to add a new dimension `ensemble` to the `PRECT` variable using [`expand_dims`](https://docs.xarray.dev/en/stable/generated/xarray.DataArray.expand_dims.html#xarray.DataArray.expand_dims).

This makes it clear that only `PRECT` should be concatenated along the new `ensemble` dimension

In [10]:
def add_ensemble_dim(ds):
    ds["PRECT"] = ds.PRECT.expand_dims("ensemble")
    return ds


combined = xr.open_mfdataset(
    # make sure we sort
    sorted(files)[:3],
    # chunk the dataset from each file properly
    chunks={"lat": 16, "lon": 32},
    # concatenate along a new dimension called "ensemble"
    concat_dim="ensemble",
    data_vars="minimal",
    coords="minimal",
    compat="override",
    combine="nested",
    parallel=True,
    preprocess=add_ensemble_dim,
)
combined

Unnamed: 0,Array,Chunk
Bytes,136.89 kiB,136.89 kiB
Shape,"(8761, 2)","(8761, 2)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,object numpy.ndarray,object numpy.ndarray
"Array Chunk Bytes 136.89 kiB 136.89 kiB Shape (8761, 2) (8761, 2) Dask graph 1 chunks in 2 graph layers Data type object numpy.ndarray",2  8761,

Unnamed: 0,Array,Chunk
Bytes,136.89 kiB,136.89 kiB
Shape,"(8761, 2)","(8761, 2)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,object numpy.ndarray,object numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,5.41 GiB,17.11 MiB
Shape,"(3, 8761, 192, 288)","(1, 8761, 16, 32)"
Dask graph,324 chunks in 10 graph layers,324 chunks in 10 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 5.41 GiB 17.11 MiB Shape (3, 8761, 192, 288) (1, 8761, 16, 32) Dask graph 324 chunks in 10 graph layers Data type float32 numpy.ndarray",3  1  288  192  8761,

Unnamed: 0,Array,Chunk
Bytes,5.41 GiB,17.11 MiB
Shape,"(3, 8761, 192, 288)","(1, 8761, 16, 32)"
Dask graph,324 chunks in 10 graph layers,324 chunks in 10 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray


Much better!

In [11]:
combined.PRECT

Unnamed: 0,Array,Chunk
Bytes,5.41 GiB,17.11 MiB
Shape,"(3, 8761, 192, 288)","(1, 8761, 16, 32)"
Dask graph,324 chunks in 10 graph layers,324 chunks in 10 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 5.41 GiB 17.11 MiB Shape (3, 8761, 192, 288) (1, 8761, 16, 32) Dask graph 324 chunks in 10 graph layers Data type float32 numpy.ndarray",3  1  288  192  8761,

Unnamed: 0,Array,Chunk
Bytes,5.41 GiB,17.11 MiB
Shape,"(3, 8761, 192, 288)","(1, 8761, 16, 32)"
Dask graph,324 chunks in 10 graph layers,324 chunks in 10 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray


## Read and concatenate the whole dataset

### Create a dask cluster

We'll use an adaptive cluster to be polite.

The dask cluster helps by parallelizing the initial reading of every file.

In [12]:
import dask_jobqueue

cluster = dask_jobqueue.PBSCluster(
    cores=4,  # The number of cores you want
    memory="23GB",  # Amount of memory
    processes=1,  # How many processes
    queue="casper",  # The type of queue to utilize (/glade/u/apps/dav/opt/usr/bin/execcasper)
    local_directory="/local_scratch/pbs.$PBS_JOBID/dask/spill",
    log_directory="/glade/scratch/dcherian/dask/",
    resource_spec="select=1:ncpus=4:mem=23GB",  # Specify resources
    project="ncgd0011",  # Input your project ID here
    walltime="02:00:00",  # Amount of wall time
    interface="ib0",  # Interface to use
)
# create an adaptive cluster with one job always requested,
# scale to a maximum of 6 jobs
# and hold on to each job for 600 seconds of idle time
cluster.adapt(minimum_jobs=1, maximum_jobs=6, wait_count=600)

import distributed

client = distributed.Client(cluster)

client

  from distributed.utils import tmpfile
Perhaps you already have a cluster running?
Hosting the HTTP server on port 45848 instead


0,1
Connection method: Cluster object,Cluster type: dask_jobqueue.PBSCluster
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/dcherian/casper/proxy/45848/status,

0,1
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/dcherian/casper/proxy/45848/status,Workers: 0
Total threads: 0,Total memory: 0 B

0,1
Comm: tcp://10.12.206.35:45783,Workers: 0
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/dcherian/casper/proxy/45848/status,Total threads: 0
Started: Just now,Total memory: 0 B


In [13]:
import distributed

client = distributed.Client(cluster)

client

0,1
Connection method: Cluster object,Cluster type: dask_jobqueue.PBSCluster
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/dcherian/casper/proxy/45848/status,

0,1
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/dcherian/casper/proxy/45848/status,Workers: 0
Total threads: 0,Total memory: 0 B

0,1
Comm: tcp://10.12.206.35:45783,Workers: 0
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/dcherian/casper/proxy/45848/status,Total threads: 0
Started: Just now,Total memory: 0 B


### Read

Now we can scale it up. 

We generalize a little by having `add_ensemble_dim` expand the dimensions of any variable with 3 or more dimensions.

In [14]:
def add_ensemble_dim(ds):
    # find all 3D variables
    names = [name for name, variable in ds.variables.items() if variable.ndim >= 3]
    # add a new dimension `ensemble` of size 1
    # and replace the existing 3D variables.
    ds = ds.update(ds[names].expand_dims("ensemble"))
    return ds


combined = xr.open_mfdataset(
    # make sure we sort
    sorted(files),
    # chunk each individual file
    chunks={"time": 365 * 5},
    # Add the ensemble dimension to 3D variables
    preprocess=add_ensemble_dim,
    # concatenate along a new dimension called "ensemble"
    concat_dim="ensemble",
    # only concatenate variables with the `ensemble` dimension.
    data_vars="minimal",
    coords="minimal",
    compat="override",
    combine="nested",
    # parallelize reading of each file using dask
    parallel=True,
)
combined

Unnamed: 0,Array,Chunk
Bytes,136.89 kiB,28.52 kiB
Shape,"(8761, 2)","(1825, 2)"
Dask graph,5 chunks in 2 graph layers,5 chunks in 2 graph layers
Data type,object numpy.ndarray,object numpy.ndarray
"Array Chunk Bytes 136.89 kiB 28.52 kiB Shape (8761, 2) (1825, 2) Dask graph 5 chunks in 2 graph layers Data type object numpy.ndarray",2  8761,

Unnamed: 0,Array,Chunk
Bytes,136.89 kiB,28.52 kiB
Shape,"(8761, 2)","(1825, 2)"
Dask graph,5 chunks in 2 graph layers,5 chunks in 2 graph layers
Data type,object numpy.ndarray,object numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,90.24 GiB,384.96 MiB
Shape,"(50, 8761, 192, 288)","(1, 1825, 192, 288)"
Dask graph,250 chunks in 151 graph layers,250 chunks in 151 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 90.24 GiB 384.96 MiB Shape (50, 8761, 192, 288) (1, 1825, 192, 288) Dask graph 250 chunks in 151 graph layers Data type float32 numpy.ndarray",50  1  288  192  8761,

Unnamed: 0,Array,Chunk
Bytes,90.24 GiB,384.96 MiB
Shape,"(50, 8761, 192, 288)","(1, 1825, 192, 288)"
Dask graph,250 chunks in 151 graph layers,250 chunks in 151 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray


## Note that on-disk chunking matters

Running
```
ncdump -sh /glade/scratch/anukesh/CESM-LE/PRECT/ENSEMBLE/b.e21.BHISTsmbb.f09_g17.LE2-1251.011.cam.h1.PRECT.18500101-21001231.nc
```
shows
```
float PRECT(time, lat, lon) ;
		PRECT:units = "m/s" ;
		PRECT:long_name = "Total (convective and large-scale) precipitation rate (liq + ice)" ;
		PRECT:cell_methods = "time: mean" ;
		PRECT:_Storage = "chunked" ;
		PRECT:_ChunkSizes = 1, 192, 288 ;
		PRECT:_DeflateLevel = 1 ;
		PRECT:_Shuffle = "true" ;
		PRECT:_Endianness = "little" ;
```

This bit is important: `PRECT:_ChunkSizes = 1, 192, 288 ;` The data on-disk is chunked to have a chunksize of 1 along time, and all spatial points in one chunk. This is orthogonal to our proposed chunking scheme of chunking small in space and big in time (`chunks={"lat": 16, "lon": 32}`).

Actually reading data with `chunks={"lat": 16, "lon": 32}` will be quite slow, because we will end up reading the whole file to extract data for a single chunk.


```{tip}
See this Unidata [blogpost](https://www.unidata.ucar.edu/blogs/developer/entry/chunking_data_why_it_matters) on netCDF chunking for more.
```