# CESM Timeseries on the Cloud Example

Here, we use a subset of the CESM2-Large Ensemble to take a look at performance of reading in datasets.

Here, we use a single ensemble member historical experiment with daily data from 1850 to 2009, with a total dataset size of 600+ GB, from 13 netCDF4 files.

| File Access Method | Dataset Lazy Read In Wall Time | Visualization Wall Time |
| --- | --- | --- |
| s3 netcdf | 21.3 s | 6 min 21 s |
| kerchunk | 2.2 s | 40.2 s |

The **"s3netcdf"** refers to accessing the data remotely using the traditional s3 api, the interface with Amazon's cloud storage. There are Google and Microsoft equivalents of this

**Kerchunk** is a new package, developed within the fsspec python community, which aims to improve io performance when reading from cloud hosted datasets which are not neccessarily in a ***"cloud optimized*** format (ex. netCDF vs. Zarr) where netCDF was engineered to performant on regular POSIX filesystems


## Imports

In [2]:
import os
import fsspec
import s3fs
import ujson   # fast json
from fsspec_reference_maker.hdf import SingleHdf5ToZarr 
from fsspec_reference_maker.combine import MultiZarrToZarr
import xarray as xr
import dask
import hvplot.xarray
import fsspec_reference_maker
from ncar_jobqueue import NCARCluster
from distributed import Client
import cftime
import glob

## Gather a list of files available from an Amazon s3 bucket
As mentioned previously, s3 is Amazon's object store file system. For this example, I moved a single ensemble member's worth of **daily historical timeseries output** from the CESM2-LE dataset. 

In [5]:
fs = fsspec.filesystem('s3', anon=True, skip_instance_cache=True)

In [6]:
filelist = fs.glob(f'ncar-cesm2-lens/tseries/*nc')
filelist

['ncar-cesm2-lens/tseries/b.e21.BHISTcmip6.f09_g17.LE2-1001.001.cam.h6.T.18500101-18591231.nc',
 'ncar-cesm2-lens/tseries/b.e21.BHISTcmip6.f09_g17.LE2-1001.001.cam.h6.T.18600101-18691231.nc',
 'ncar-cesm2-lens/tseries/b.e21.BHISTcmip6.f09_g17.LE2-1001.001.cam.h6.T.18700101-18791231.nc',
 'ncar-cesm2-lens/tseries/b.e21.BHISTcmip6.f09_g17.LE2-1001.001.cam.h6.T.18800101-18891231.nc',
 'ncar-cesm2-lens/tseries/b.e21.BHISTcmip6.f09_g17.LE2-1001.001.cam.h6.T.18900101-18991231.nc',
 'ncar-cesm2-lens/tseries/b.e21.BHISTcmip6.f09_g17.LE2-1001.001.cam.h6.T.19000101-19091231.nc',
 'ncar-cesm2-lens/tseries/b.e21.BHISTcmip6.f09_g17.LE2-1001.001.cam.h6.T.19100101-19191231.nc',
 'ncar-cesm2-lens/tseries/b.e21.BHISTcmip6.f09_g17.LE2-1001.001.cam.h6.T.19200101-19291231.nc',
 'ncar-cesm2-lens/tseries/b.e21.BHISTcmip6.f09_g17.LE2-1001.001.cam.h6.T.19300101-19391231.nc',
 'ncar-cesm2-lens/tseries/b.e21.BHISTcmip6.f09_g17.LE2-1001.001.cam.h6.T.19400101-19491231.nc',
 'ncar-cesm2-lens/tseries/b.e21.BHISTcmi

We can check the size of a single dataset here

In [7]:
print(fs.size(filelist[0])/1e9, 'gb')

37.649093521 gb


We need to add the `s3://` portion at the beginning to specify this is from the s3 filesystem

In [8]:
urls = ["s3://" + f for f in filelist]

so = dict(mode='rb', anon=True, default_fill_cache=False, default_cache_type='first')

## Generate the Mapper Files

Set the output directory of the json mapping files

In [9]:
json_dir = 'test_cesm2_le'

We set up a sample file to help with generating these files

In [10]:
def gen_json(u):
    with fs.open(u, **so) as infile:
        h5chunks = SingleHdf5ToZarr(infile, u, inline_threshold=300)
        p = u.split('/')
        fname = p[-1]
        outf = f'{json_dir}.{fname}.json'
        print(outf)
        with open(f"test_cesm2_le/{outf}", 'wb') as f:
            f.write(ujson.dumps(h5chunks.translate()).encode());

### Spin up a Cluster
We will spin up a dask cluster to add distributed/parallel computing - we are spinning up 10 workers, each with 25 GB of RAM and a single processor

In [11]:
cluster = NCARCluster()
cluster.scale(10)
client = Client(cluster)
client

0,1
Connection method: Cluster object,Cluster type: dask_jobqueue.PBSCluster
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/mgrover/proxy/8787/status,

0,1
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/mgrover/proxy/8787/status,Workers: 0
Total threads: 0,Total memory: 0 B

0,1
Comm: tcp://10.12.206.52:37201,Workers: 0
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/mgrover/proxy/8787/status,Total threads: 0
Started: Just now,Total memory: 0 B


Now that we have our cluster spun up, we can compute these reference files in parallel!

In [12]:
%%time
dask.compute(*[dask.delayed(gen_json)(u) for u in urls], retries=10);

CPU times: user 7.61 s, sys: 923 ms, total: 8.53 s
Wall time: 4min 55s


(None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None)

### Merge the reference files together
The previous process produced a **single reference file for each netCDF file** - let's merge these all together to create single store.

In [13]:
furls = sorted(glob.glob('test_cesm2_le/*'))
furls

['test_cesm2_le/test_cesm2_le.b.e21.BHISTcmip6.f09_g17.LE2-1001.001.cam.h6.T.18500101-18591231.nc.json',
 'test_cesm2_le/test_cesm2_le.b.e21.BHISTcmip6.f09_g17.LE2-1001.001.cam.h6.T.18600101-18691231.nc.json',
 'test_cesm2_le/test_cesm2_le.b.e21.BHISTcmip6.f09_g17.LE2-1001.001.cam.h6.T.18700101-18791231.nc.json',
 'test_cesm2_le/test_cesm2_le.b.e21.BHISTcmip6.f09_g17.LE2-1001.001.cam.h6.T.18800101-18891231.nc.json',
 'test_cesm2_le/test_cesm2_le.b.e21.BHISTcmip6.f09_g17.LE2-1001.001.cam.h6.T.18900101-18991231.nc.json',
 'test_cesm2_le/test_cesm2_le.b.e21.BHISTcmip6.f09_g17.LE2-1001.001.cam.h6.T.19000101-19091231.nc.json',
 'test_cesm2_le/test_cesm2_le.b.e21.BHISTcmip6.f09_g17.LE2-1001.001.cam.h6.T.19100101-19191231.nc.json',
 'test_cesm2_le/test_cesm2_le.b.e21.BHISTcmip6.f09_g17.LE2-1001.001.cam.h6.T.19200101-19291231.nc.json',
 'test_cesm2_le/test_cesm2_le.b.e21.BHISTcmip6.f09_g17.LE2-1001.001.cam.h6.T.19300101-19391231.nc.json',
 'test_cesm2_le/test_cesm2_le.b.e21.BHISTcmip6.f09_g17.

We need to apply a small fix, specifying the `_FillValue` as a string instead of a bytes object. This is applied to each file upon read-in. We also subset for a single variable, 3D air temperature (`T`)

In [14]:
def fix_attrs(ds):
    for var in ds:
        ds[var].attrs['_FillValue'] = ds[var]._FillValue.astype(str)
    return ds[['T']]

Now, we can generate the single store! We add a few extra arguements here... no need to worry about the details here.

In [15]:
mzz = MultiZarrToZarr(furls, 
    storage_options={'anon':False}, 
    remote_protocol='s3',
    remote_options={'anon' : 'True'},   #JSON files  
    xarray_open_kwargs={
        'decode_cf' : False,
        'mask_and_scale' : False,
        'decode_times' : False,
        'use_cftime' : False,
        'drop_variables': ['reference_time'],
        'decode_coords' : False
    },
    xarray_concat_args={
        "join": "override",
        "combine_attrs": "override",
        "dim": "time"
    },
    preprocess=fix_attrs
)

In [16]:
%%time
#%%prun -D multizarr_profile 
mzz.translate('cesm2-le-tseries-aws.json')

CPU times: user 12.3 s, sys: 411 ms, total: 12.7 s
Wall time: 17.1 s


---
## Read directly from S3 (without `kerchunk`)
We went through all that work previously, but let's take a look at what the performance would be ***without*** using this new `Kerchunk` package.

In [17]:
fs_s3 = s3fs.S3FileSystem(anon=True)

In [18]:
def read_through_xarray(filelist):
    ds_list = []

    for file in filelist:
        remote_file_obj = fs_s3.open(file, mode='rb')
        ds_list.append(xr.open_dataset(remote_file_obj, engine='h5netcdf', chunks={})[['T']])
        
    return xr.concat(ds_list, dim='time')

Our wall time for lazily reading in the datasets isn't **too** bad...

In [19]:
%%time
ds_read_directly = read_through_xarray(filelist)

CPU times: user 7.45 s, sys: 979 ms, total: 8.43 s
Wall time: 25.6 s


But, if we actually load in some data to plot, you'll see that it is pretty slow...

In [20]:
%%time
ds_read_directly.T[:1460, 0, 100, 100].hvplot(x='time', grid=True)



CPU times: user 18.1 s, sys: 1.07 s, total: 19.2 s
Wall time: 6min 45s


## Benchmark Reading in from `Kerchunk` mappings
`fsspec` recently implemented 

In [21]:
rpath = 'cesm2-le-tseries-aws.json'
s_opts = {'skip_instance_cache':True}
r_opts = {'anon':True}
fs = fsspec.filesystem("reference", fo=rpath,
                       remote_protocol='s3', remote_options=r_opts)

In [22]:
m = fs.get_mapper("")

In [25]:
%%time
ds = xr.open_dataset(m, engine="zarr", decode_times=False)

1. Consolidating metadata in this existing store with zarr.consolidate_metadata().
2. Explicitly setting consolidated=False, to avoid trying to read consolidate metadata, or
3. Explicitly setting consolidated=True, to raise an error in this case instead of falling back to try reading non-consolidated metadata.


CPU times: user 2.76 s, sys: 82.7 ms, total: 2.84 s
Wall time: 3.32 s


### Plot the output from the Mapper Method

In [26]:
%%time
ds.T[:1460, 0, 100, 100].hvplot(x='time', grid=True)

CPU times: user 21.3 s, sys: 1.94 s, total: 23.2 s
Wall time: 40.3 s
