# ESIP Cloud Computing Meeting Kerchunk Tutorial 2022-04-25
__This presentation is based on work I did during the [NCAR Summer Internship in Parallel Computational Science (SIParCS) program](https://www2.cisl.ucar.edu/siparcs)__
### Lucas Sterzinger -- Atmospheric Science PhD Candidate at UC Davis
* [Twitter](https://twitter.com/lucassterzinger)
* [GitHub](https://github.com/lsterzinger)
* [Website](https://lucassterzinger.com)



#  Motivation:
* NetCDF is not cloud optimized
* Other formats, like Zarr, aim to make accessing and reading data from the cloud fast and painless
* However, most geoscience datasets available in the cloud are still in their native NetCDF/HDF5, so a different access method is needed

# What do I mean when I say "Cloud Optimized"?
![Move to cloud diagram](images/cloud-move.png)

In traditional scientific workflows, data is archived in a repository and downloaded to a separate computer for analysis (left). However, datasets are becoming much too large to fit on personal computers, and transferring full datasets from an archive to a seperate machine can use lots of bandwidth.

In a cloud environment, the data can live in object storage (e.g. AWS S3), and analysis can be done in an adjacent compute instances, allowing for low-latency and high-bandwith access to the dataset.

## Why NetCDF doesn't work well in this workflow

NetCDF is probably the most common binary data format for atmospheric/earth sciences, and has a lot of official and community support. However, the NetCDF format/API requires either a) many small reads to access the metadata for a single file or b) use a serverside utility like THREDDS/OPeNDAP to extract metadata.

![NetCDF File Object](images/single_file_object.png)

## The Zarr Solution
The [Zarr data format](https://zarr.readthedocs.io/en/stable/) alleviates this problem by storing the metadata and chunks in seperate files that can be accessed as-needed and in parallel. Having consolidated metadata means that all the information about the dataset can be loaded and interpreted in a single read of a small plaintext file. With this metadata in-hand, a program can request exactly which chunks of data are needed for a given operation.

![Zarr](images/zarr.png)

## _However_
While Zarr proves to be very good for this cloud-centric workflow, most cloud-available data is currently only available in NetCDF/HDF5/GRIB2 format. While it would be _wonderful_ if all this data converted to Zarr overnight, it would be great if in the meantime there was a way to use some of the Zarr spec, right?

# Introducting `kerchunk`
[Github page](https://github.com/intake/kerchunk)

`kerchunk` works by doing all the heavy lifting of extracting the metadata, generating byte-ranges for each variable chunk, and creating a Zarr-spec metadata file. This file is plaintext and can opened and analyzed with xarray very quickly. When a user requests a certain chunk of data, the NetCDF4 API is bypassed entirely and the Zarr API is used to extract the specified byte-range.

![reference-maker vs zarr](images/referencemaker_v_zarr.png)

## How much of a difference does this make, really?
Testing this method on workflow processing of 24 hours of 5-minute GOES-16 data and accessing via native NetCDF, Zarr, and NetCDF + ReferenceMaker:

![workflow results](images/workflow_results.png)

*Notebooks used to benchmark these times are available here: https://github.com/lsterzinger/cloud-optimized-satellite-data-tests*

***
# Let's try it out!

### Import `kerchunk` and make sure it's at the latest version (`0.1.1` at the time of writing)

In [None]:
import kerchunk
kerchunk.__version__

_If Kerchunk is not at the latest version, update with pip/conda: and **restart the kernel**_

In [None]:
# !pip install --upgrade kerchunk

In [None]:
import xarray as xr
from kerchunk.hdf import SingleHdf5ToZarr
from kerchunk.combine import MultiZarrToZarr
import fsspec
from glob import glob

## `fsspec` -- What is it?
* Provides unified interface to different filesystem types
* Local, cloud, http, dropbox, Google Drive, etc
    * All accessible with the same API

In [None]:
from fsspec.registry import known_implementations
known_implementations.keys()

### Open a new filesystem, of type `s3` (Amazon Web Services storage)
This tells `fsspec` what type of storage system to use (AWS S3) and any authentication options (this is a public dataset, so use anonymous mode `anon=True`)

In [None]:
fs = fsspec.filesystem('s3', anon=True)

Use `fs.glob()` to generate a list of files in a certain directory. Goes data is stored in `s3://noaa-goes16/<product>/<year>/<day_of_year>/<hour>/<datafile>.nc` format.

This `glob()` returns all files in the 210th day of 2020 (July 28th, 2020)

In [None]:
flist = fs.glob("s3://noaa-goes16/ABI-L2-SSTF/2020/210/*/*.nc")

In [None]:
flist

### Prepend `s3://` to the URLS

In [None]:
flist = ['s3://' + f for f in flist]

In [None]:
flist

## Example of creating a single reference

In [None]:
u = flist[0]
u

In [None]:
with fsspec.open(u, mode="rb", anon=True) as infile:
    reference = SingleHdf5ToZarr(infile, u, inline_threshold=100).translate()

In [None]:
reference

## Create references for all 24 files in `flist`

## With a local Dask cluster
[Dask](https://dask.org/) is a python package that allows for easily parallelizing python code. This section starts a local client (using whatever processors are available on the current machine). This can also be done just as easily using [Dask clusters](https://docs.dask.org/en/stable/deploying.html). 

In [None]:
#from dask.distributed import Client
#client = Client()
#client

### Using ESIP Jupyterhub

This should be used instead of the above local Dask cluster if running on ESIP's JupyterHub at https://jupyter.qhub.esipfed.org

In [None]:
# first get the esip-qhub credentials via
# !cp -r ~/shared/users/.aws/ ~/

In [None]:
# import os
# import sys
# sys.path.append(os.path.join(os.environ['HOME'],'shared','users','lib'))
# import ebdpy as ebd

# ebd.set_credentials(profile='esip-qhub')

# profile = 'esip-qhub'
# region = 'us-west-2'
# endpoint = f's3.{region}.amazonaws.com'
# ebd.set_credentials(profile=profile, region=region, endpoint=endpoint)
# worker_max = 10
# client, cluster = ebd.start_dask_cluster(profile=profile,
#                                          worker_max=worker_max,
#                                          region=region,
#                                          use_existing_cluster=True,
#                                          adaptive_scaling=False,
#                                          wait_for_cluster=False,
#                                          worker_profile='Small Worker',
#                                          propagate_env=True)

In [None]:
%run /shared/users/environment_set_up/Start_Dask_Cluster_Nebari.ipynb

## Definte function to return a reference dictionary for a given S3 file URL

This function does the following:
1. `so` is a dictionary of options for `fsspec.open()`
2. Use `fsspec.open()` to open the file given by URL `f`
3. Using `kerchunk.SingleHdf5ToZarr()` and supplying the file object `infile` and URL `f`, generate reference with `.translate()`

In [None]:
def gen_ref(f):
    with fsspec.open(f, mode="rb", anon=True) as infile:
        return SingleHdf5ToZarr(infile, f, inline_threshold=300).translate()

### Map `gen_ref` to each member of `flist_bag` and compute
Dask bag is a way to map a function to a set of inputs. This next couple blocks of code tell Dask to take all the files in `flist`, break them up into the same amount of partitions and map each partition to the `gen_ref()` function -- essentially mapping each file path to `gen_ref()`. Calling `bag.compute()` on this runs `gen_ref()` in parallel with as many workers as are available in Dask client.

_Note: if running interactively on Binder, this will take a while since only one worker is available and the references will have to be generated in serial. See option for loading from jsons below_

In [None]:
import dask.bag as db
bag = db.from_sequence(flist, npartitions=len(flist)).map(gen_ref)

In [None]:
bag.visualize()

In [None]:
%time dicts = bag.compute()

Now, each url in `flist` has been used to generate a dictionary of reference data in `dicts`

In [None]:
dicts[0]

### _Save/load references to/from JSON files (optional)_
The individual dictionaries can be saved as JSON files if desired

In [None]:
# import ujson
# for d in dicts:
#     # Generate name from corresponding URL:
#     # Grab URL, strip everything but the filename, 
#     # and replace .nc with .json
#     name = './example_jsons/individual/'+ d['templates']['u'].split('/')[-1].replace('.nc', '.json')

#     with open(name, 'w') as outf:
#         outf.write(ujson.dumps(d))

These generated jsons can then be loaded back in as a dict

In [None]:
# import ujson
# dicts = []

# for f in sorted(glob('./example_jsons/individual/*.json')):
#     with open(f,'r') as fin:
#         dicts.append(ujson.load(fin))

In [None]:
dicts

***
### Use `MultiZarrToZarr` to combine the 24 individual references into a single reference
In this example we passed a list of reference dictionaries, but you can also give it a list of `.json` filepaths (commented out)

_Note: the Kerchunk `MultiZarrToZarr` API changed between versions 0.0.5 and 0.0.6. This part assumes the latest version at this time (0.0.6). Please see https://fsspec.github.io/kerchunk/reference.html#kerchunk.combine.MultiZarrToZarr for more details_

In [None]:
mzz = MultiZarrToZarr(
    dicts,
    # sorted((glob('./example_jsons/individual/*.json'))),
    remote_protocol='s3',
    remote_options={'anon':True},
    concat_dims='t',
    inline_threshold=0
)

References can be saved to a file (`combined.json`) or passed back as a dictionary (`mzz_dict`)

In [None]:
%time mzz.translate('./combined.json')
# mzz_dict = mzz.translate()