# NCAR Earth System Data Science WIP Talk
__This presentation is based on work I did during the [NCAR Summer Internship in Parallel Computational Science (SIParCS) program](https://www2.cisl.ucar.edu/siparcs)__
### Lucas Sterzinger -- Atmospheric Science PhD Candidate at UC Davis
* [Twitter](https://twitter.com/lucassterzinger)
* [GitHub](https://github.com/lsterzinger)
* [Website](https://lucassterzinger.com)



# Motivation
* NetCDF is not cloud optimized
* Other formats, like Zarr, aim to make accessing and reading data from the cloud fast and painless
* However, most geoscience datasets available in the cloud are still in their native NetCDF/HDF5 format, so a different access method is needed

# What do I mean when I say "Cloud Optimized"?
![Move to cloud diagram](images/cloud-move.png)

In traditional scientific workflows, data is archived in a repository and downloaded to a separate computer for analysis (left). However, datasets are becoming much too large to fit on personal computers, and transferring full datasets from an archive to a separate machine can use lots of bandwidth.

In a cloud environment, the data can live in object storage (e.g. AWS S3), and analysis can be done on adjacent compute instances, allowing for low-latency, high-bandwidth access to the dataset.

## Why NetCDF doesn't work well in this workflow

NetCDF is probably the most common binary data format in the atmospheric/earth sciences, and it has a lot of official and community support. However, reading NetCDF from object storage requires either a) loading the entire file in order to access the header/metadata and retrieve a chunk of data, or b) a server-side utility like THREDDS/OPeNDAP.

![NetCDF File Object](images/single_file_object.png)

## The Zarr Solution
The [Zarr data format](https://zarr.readthedocs.io/en/stable/) alleviates this problem by storing the metadata and chunks in separate files that can be accessed as needed and in parallel.
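To make that layout concrete, here is a toy, standard-library-only sketch of the idea. It is not the `zarr` library: the helpers `write_toy_store` and `read_toy_chunk` are hypothetical, the metadata file is only loosely modelled on Zarr v2's `.zarray`, and unlike real Zarr the chunks are uncompressed and the last chunk is not padded. What it does show is the access pattern: read a small metadata file, then open only the one chunk file you need.

```python
import json
import os
import struct
import tempfile

# Toy, Zarr-v2-inspired store: one JSON metadata file plus one file per chunk.
# This is NOT the zarr library; it only mimics the on-disk idea.

def write_toy_store(root, data, chunk_len):
    """Write a 1-D list of floats as fixed-size chunks plus a '.zarray'-style metadata file."""
    meta = {"shape": [len(data)], "chunks": [chunk_len], "dtype": "<f8"}
    with open(os.path.join(root, ".zarray"), "w") as f:
        json.dump(meta, f)
    for i in range(0, len(data), chunk_len):
        chunk = data[i:i + chunk_len]
        # Each chunk is its own file, named by its chunk index ("0", "1", ...)
        with open(os.path.join(root, str(i // chunk_len)), "wb") as f:
            f.write(struct.pack(f"<{len(chunk)}d", *chunk))

def read_toy_chunk(root, index):
    """Read the small metadata file, then only the single chunk file requested."""
    with open(os.path.join(root, ".zarray")) as f:
        meta = json.load(f)
    with open(os.path.join(root, str(index)), "rb") as f:
        raw = f.read()
    # 8 bytes per little-endian float64 value
    return list(struct.unpack(f"<{len(raw) // 8}d", raw)), meta

root = tempfile.mkdtemp()
write_toy_store(root, [float(x) for x in range(10)], chunk_len=4)
chunk, meta = read_toy_chunk(root, 1)
print(meta["chunks"], chunk)
```

Because every chunk is an independent object, several readers can fetch different chunks at the same time, which is what makes the format friendly to object storage.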

![Zarr](images/zarr.png)

## _However_
Zarr works very well for this cloud-centric workflow, but most cloud-available data is currently only available in NetCDF/HDF5/GRIB2 format. While it would be _wonderful_ if all this data were converted to Zarr overnight, it would be great if, in the meantime, there were a way to use some of the Zarr spec, right?

# Introducing `fsspec-reference-maker`
[Github page](https://github.com/intake/fsspec-reference-maker)

`fsspec-reference-maker` works by analyzing the NetCDF header/metadata info, extracting byte ranges for each variable chunk, and creating a Zarr-spec metadata file. This file is plain text and can be opened and analyzed with xarray very quickly. When a user requests a certain chunk of data, the NetCDF4 API is bypassed entirely and the Zarr API is used to extract the specified byte range.
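A reference file is essentially a lookup table from Zarr keys to byte ranges inside the original file. The sketch below is a simplified, stdlib-only illustration under stated assumptions: a fake local file stands in for a NetCDF/HDF5 file, chunk keys map to `[url, offset, length]` (loosely following the fsspec reference format), metadata is stored inline as text, and `read_ref` is a hypothetical helper. Reading a chunk is then just a seek plus a read of exactly `length` bytes; against S3 the equivalent is a ranged GET request.

```python
import os
import struct
import tempfile

# Hypothetical stand-in for a NetCDF/HDF5 file: a header followed by two raw chunks.
header = b"HEADER--"  # 8 bytes of pretend metadata
chunk0 = struct.pack("<4d", 1.0, 2.0, 3.0, 4.0)
chunk1 = struct.pack("<4d", 5.0, 6.0, 7.0, 8.0)

path = os.path.join(tempfile.mkdtemp(), "fake.nc")
with open(path, "wb") as f:
    f.write(header + chunk0 + chunk1)

# Reference mapping: Zarr key -> inline text, or [url, byte offset, byte length].
# The chunk data itself never leaves the original file.
refs = {
    "sst/.zarray": '{"shape": [8], "chunks": [4], "dtype": "<f8"}',
    "sst/0": [path, len(header), len(chunk0)],
    "sst/1": [path, len(header) + len(chunk0), len(chunk1)],
}

def read_ref(refs, key):
    """Return the bytes for a key: inline text as-is, or a byte-range read from the file."""
    value = refs[key]
    if isinstance(value, str):
        return value.encode()
    url, offset, length = value
    with open(url, "rb") as f:
        f.seek(offset)         # jump straight to the chunk
        return f.read(length)  # read only that chunk's bytes

print(struct.unpack("<4d", read_ref(refs, "sst/1")))
```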

![reference-maker vs zarr](images/referencemaker_v_zarr.png)

## How much of a difference does this make, really?
Testing this method on 24 hours of 5-minute GOES-16 data, accessed via native NetCDF, Zarr, and NetCDF + ReferenceMaker:

![workflow results](images/workflow_results.png)

***
# Let's try it out!

### Import `fsspec-reference-maker` and make sure it's at the latest version (`0.0.3` at the time of writing)

```python
import fsspec_reference_maker
fsspec_reference_maker.__version__
```

```python
import xarray as xr
import matplotlib.pyplot as plt
from fsspec_reference_maker.hdf import SingleHdf5ToZarr
from fsspec_reference_maker.combine import MultiZarrToZarr
import fsspec
from glob import glob
```

## `fsspec` -- What is it?
* Provides a unified interface to different filesystem types
* Local, cloud, HTTP, Dropbox, Google Drive, etc.
    * All accessible with the same API

```python
from fsspec.registry import known_implementations
known_implementations.keys()
```

```
dict_keys(['file', 'memory', 'dropbox', 'http', 'https', 'zip', 'tar', 'gcs', 'gs', 'gdrive', 'sftp', 'ssh', 'ftp', 'hdfs', 'arrow_hdfs', 'webhdfs', 's3', 's3a', 'wandb', 'oci', 'adl', 'abfs', 'az', 'cached', 'blockcache', 'filecache', 'simplecache', 'dask', 'dbfs', 'github', 'git', 'smb', 'jupyter', 'jlab', 'libarchive', 'reference'])
```

### Open a new filesystem, of type `s3` (Amazon Web Services storage)

```python
# Anonymous access works because the NOAA GOES-16 bucket is public
fs = fsspec.filesystem('s3', anon=True)
```

Use `fs.glob()` to generate a list of files in a certain directory

```python
# Every hourly subdirectory of day-of-year 210, 2020, for the ABI-L2-SSTF product
flist = fs.glob("s3://noaa-goes16/ABI-L2-SSTF/2020/210/*/*.nc")
```

```python
flist
```

### Prepend `s3://` to the URLs

```python
# fs.glob() returns paths without the protocol prefix, so add it back
flist = ['s3://' + f for f in flist]
```

```python
flist[:3]
```
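The `s3://` prefix matters because fsspec uses a URL's protocol to decide which filesystem implementation handles it, and a path with no protocol is treated as a local file. The toy dispatcher below sketches that idea with `urllib.parse.urlsplit`; `toy_open`, `BACKENDS`, the two handlers, and the `example.nc` filename are all hypothetical, and this is not how fsspec is implemented internally.

```python
from urllib.parse import urlsplit

# Hypothetical toy registry: the protocol prefix of a URL picks the backend.
# The handlers just return a description instead of opening anything.
def open_local(path):
    return f"local file {path}"

def open_s3(path):
    return f"S3 object {path}"

BACKENDS = {"file": open_local, "s3": open_s3}

def toy_open(url):
    parts = urlsplit(url)
    protocol = parts.scheme or "file"  # no prefix -> treat as a local path
    path = parts.netloc + parts.path   # bucket + key for S3-style URLs
    return BACKENDS[protocol](path)

print(toy_open("s3://noaa-goes16/ABI-L2-SSTF/2020/210/00/example.nc"))
print(toy_open("noaa-goes16/ABI-L2-SSTF/2020/210/00/example.nc"))
```

The second call shows what happens to an un-prefixed path: it is routed to the local backend, which is why the list comprehension adds `s3://` back.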

### Start a dask cluster
[Dask](https://dask.org/) is a Python package that makes it easy to parallelize Python code.

```python
from dask.distributed import Client
client = Client()  # starts a local cluster with default settings
client
```

### Define a function to return a reference dictionary for a given S3 file URL

This function does the following:
1. `so` is a dictionary of options passed to `fsspec.open()`
2. `fsspec.open()` opens the file at URL `f`
3. `SingleHdf5ToZarr()` (from `fsspec_reference_maker.hdf`), given the file object `infile` and the URL `f`, generates the references with `.translate()`

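```python
def gen_ref(f):
    # Options forwarded by fsspec.open() to the S3 filesystem:
    # binary read mode, anonymous access, and no read-ahead caching
    so = dict(
        mode="rb", anon=True, default_fill_cache=False, default_cache_type="none"
    )
    with fsspec.open(f, **so) as infile:
        # Scan the HDF5 structure and record each chunk's byte range;
        # chunks smaller than inline_threshold bytes are embedded directly
        return SingleHdf5ToZarr(infile, f, inline_threshold=300).translate()
```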

### Map `gen_ref` to each member of `flist_bag` and compute
_Note: if running interactively on Binder, this will take a while since only one worker is available and the references will have to be generated in serial. See the option for loading from JSON files below._

```python
import dask.bag as db
# One partition per file, so each file can be processed by a separate task
flist_bag = db.from_sequence(flist, npartitions=len(flist))
flist_bag
```

```python
%time dicts = flist_bag.map(gen_ref).compute()
```

```python
dicts[0]
```
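Dask is not the only way to fan out this per-file work. As an alternative that the talk itself does not use, the same map-over-URLs pattern can be sketched with the standard library's `concurrent.futures`; `fake_gen_ref` and the URLs below are hypothetical placeholders standing in for `gen_ref` and `flist`.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-in for gen_ref: returns a tiny reference-like dict.
def fake_gen_ref(url):
    return {"version": 1, "templates": {"u": url}}

urls = [f"s3://example-bucket/file_{i:02d}.nc" for i in range(24)]  # hypothetical URLs

# Threads suit I/O-bound work like this, where each call mostly waits on reads.
# executor.map returns results in the same order as the input URLs.
with ThreadPoolExecutor(max_workers=8) as executor:
    refs_list = list(executor.map(fake_gen_ref, urls))

print(len(refs_list), refs_list[0]["templates"]["u"])
```

The trade-off: a thread pool stays inside one process on one machine, whereas a Dask cluster can spread the same tasks across many workers.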

### _Save/load references to/from JSON files (optional)_
The individual dictionaries can be saved as JSON files if desired

```python
# import ujson
# for d in dicts:
#     # Generate name from corresponding URL:
#     # Grab URL, strip everything but the filename,
#     # and replace .nc with .json
#     name = d['templates']['u'].split('/')[-1].replace('.nc', '.json')

#     with open(name, 'w') as outf:
#         outf.write(ujson.dumps(d))
```

These generated JSON files can then be loaded back in as dictionaries

```python
# import ujson
# dicts = []

# for f in sorted(glob('./example_jsons/individual/*.json')):
#     with open(f,'r') as fin:
#         dicts.append(ujson.load(fin))
```
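For readers who prefer the standard library over `ujson`, the same save/load round trip can be sketched with `json` and `pathlib`. The helper names are hypothetical, and the demo references are trimmed to the one field the naming logic reads (`templates['u']`, as in the commented code).

```python
import json
import tempfile
from pathlib import Path

def ref_filename(ref):
    """Derive 'name.json' from the source URL stored in the reference's templates."""
    url = ref["templates"]["u"]
    return Path(url.split("/")[-1]).with_suffix(".json").name

def save_refs(refs, outdir):
    for ref in refs:
        (Path(outdir) / ref_filename(ref)).write_text(json.dumps(ref))

def load_refs(indir):
    # Sorting by filename keeps the references in time order for GOES-style names
    return [json.loads(p.read_text()) for p in sorted(Path(indir).glob("*.json"))]

# Hypothetical, heavily trimmed references
demo = [
    {"version": 1, "templates": {"u": f"s3://bucket/OR_ABI_{h:02d}.nc"}, "refs": {}}
    for h in range(3)
]
outdir = tempfile.mkdtemp()
save_refs(demo, outdir)
print(load_refs(outdir) == demo)
```

`Path.with_suffix` replaces only the final extension, whereas `str.replace('.nc', '.json')` would also rewrite any `.nc` appearing elsewhere in a filename.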

### Use `MultiZarrToZarr` to combine the 24 individual references into a single reference
In this example we pass a list of reference dictionaries, but you can also give it a list of `.json` file paths (commented out)

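```python
mzz = MultiZarrToZarr(
    dicts,  # the per-file reference dictionaries
    # sorted(glob('./example_jsons/individual/*.json')),  # or a list of JSON paths
    remote_protocol='s3',          # the referenced byte ranges live on S3
    remote_options={'anon':True},  # public bucket, no credentials needed
    # Turn off all decoding so values are combined as stored in the files
    xarray_open_kwargs={
        "decode_cf" : False,
        "mask_and_scale" : False,
        "decode_times" : False,
        "decode_timedelta" : False,
        "use_cftime" : False,
        "decode_coords" : False
    },
    xarray_concat_args={'dim' : 't'}  # concatenate the files along time
)
```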
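Conceptually, combining references along `t` keeps every file's byte ranges unchanged and only renames chunk keys so that each file occupies its own slot along the new dimension. The toy below is not how `MultiZarrToZarr` is implemented: it ignores coordinates and attributes, `combine_refs` is a hypothetical helper, and it assumes each file holds its variable as a single 1-D chunk and uses Zarr v2's dotted chunk keys (`i.0`).

```python
import json

def combine_refs(ref_list, var):
    """Toy combine: stack single-chunk 1-D refs along a new leading dimension.

    Each input maps f"{var}/0" -> [url, offset, length] and has a
    f"{var}/.zarray" JSON string whose shape equals its single chunk.
    """
    first_meta = json.loads(ref_list[0][f"{var}/.zarray"])
    n = first_meta["shape"][0]
    combined = {
        f"{var}/.zarray": json.dumps({
            "shape": [len(ref_list), n],  # new leading dimension, one slot per file
            "chunks": [1, n],
            "dtype": first_meta["dtype"],
        })
    }
    for i, refs in enumerate(ref_list):
        # Byte ranges are copied unchanged; only the chunk key is renamed
        combined[f"{var}/{i}.0"] = refs[f"{var}/0"]
    return combined

# Hypothetical per-file references (one chunk of 4 float64 values per file)
files = [
    {
        "sst/.zarray": json.dumps({"shape": [4], "chunks": [4], "dtype": "<f8"}),
        "sst/0": [f"s3://bucket/hour_{h:02d}.nc", 8, 32],
    }
    for h in range(3)
]
combined = combine_refs(files, "sst")
print(json.loads(combined["sst/.zarray"])["shape"], combined["sst/2.0"])
```

No data is copied or re-encoded: the combined reference set is still just pointers into the original files.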

References can be saved to a file (`combined.json`) or passed back as a dictionary (`mzz_dict`)

```python
# Write the combined references to disk...
%time mzz.translate('./combined.json')
# ...or keep them in memory as a dictionary instead:
# mzz_dict = mzz.translate()
```
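A quick way to sanity-check the output is to count chunk references per variable. This sketch assumes the references are either a flat key mapping or nested under a top-level `refs` key; `count_chunks` and the example dictionary are hypothetical.

```python
from collections import Counter

def count_chunks(ref_dict):
    """Count chunk references per variable in a reference dict.

    Accepts a flat mapping or one nested under a top-level "refs" key
    (the assumed layout of a version-1 reference file).
    """
    refs = ref_dict.get("refs", ref_dict)
    counts = Counter()
    for key in refs:
        if "/" not in key:
            continue  # top-level metadata such as ".zgroup"
        var, leaf = key.rsplit("/", 1)
        if not leaf.startswith("."):  # skip .zarray / .zattrs metadata keys
            counts[var] += 1
    return counts

# Usage on the real output (uncomment once combined.json exists):
# import json
# with open("./combined.json") as f:
#     print(count_chunks(json.load(f)))

example_refs = {
    "version": 1,
    "refs": {
        ".zgroup": "{}",
        "SST/.zarray": "{}",
        "SST/0.0.0": ["s3://bucket/a.nc", 0, 10],
        "SST/1.0.0": ["s3://bucket/b.nc", 0, 10],
        "t/0": ["s3://bucket/a.nc", 100, 8],
    },
}
print(count_chunks(example_refs))
```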