***
# Let's try it out!

### Import `kerchunk` and make sure it's at the latest version (`0.0.6` at the time of writing)

In [None]:
import kerchunk
kerchunk.__version__
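
The version check above can also be done programmatically. The following is a minimal sketch using only the standard library; the helper names `parse_version` and `is_at_least` are made up for this example and are not part of Kerchunk, and the parsing only looks at the first three numeric components of the version string.

```python
import re
from importlib.metadata import PackageNotFoundError, version


def parse_version(text):
    """Return the first three numeric components of a version string."""
    public = text.split("+")[0]  # drop any local build suffix such as "+4.gabc"
    return tuple(int(part) for part in re.findall(r"\d+", public)[:3])


def is_at_least(installed, required):
    """True when `installed` is the same as or newer than `required`."""
    return parse_version(installed) >= parse_version(required)


try:
    installed = version("kerchunk")
except PackageNotFoundError:
    installed = None  # kerchunk is not installed in this environment

if installed is None:
    print("kerchunk is not installed")
elif is_at_least(installed, "0.0.6"):
    print(f"kerchunk {installed} is recent enough")
else:
    print(f"kerchunk {installed} is older than 0.0.6; upgrade and restart the kernel")
```

Comparing tuples of integers rather than raw strings matters because `"0.0.10"` sorts before `"0.0.6"` as text.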

_If Kerchunk is not at the latest version, update it with pip or conda and **restart the kernel**._

In [None]:
# !pip install --upgrade kerchunk

In [None]:
import xarray as xr
from kerchunk.hdf import SingleHdf5ToZarr
from kerchunk.combine import MultiZarrToZarr
import fsspec
from glob import glob
import ujson

## `fsspec` -- What is it?
* Provides a unified interface to different filesystem types
* Local, cloud, HTTP, Dropbox, Google Drive, etc.
    * All accessible with the same API

In [None]:
from fsspec.registry import known_implementations
known_implementations.keys()

### Open a new filesystem, of type `s3` (Amazon Web Services storage)
This tells `fsspec` what type of storage system to use (AWS S3) and any authentication options (this is a public dataset, so use anonymous mode `anon=True`)

In [None]:
fs = fsspec.filesystem('s3', anon=True)

Use `fs.glob()` to generate a list of files in a given directory. GOES data is stored in `s3://noaa-goes16/<product>/<year>/<day_of_year>/<hour>/<datafile>.nc` format.

This `glob()` returns all files for the 50th day of 2024 (February 19, 2024).

In [None]:
flist_full = fs.glob('s3://noaa-goes16/ABI-L1b-RadC/2024/050/*/*.nc')

In [None]:
print(f"{len(flist_full)} files. First 10:")
flist_full[:10]
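
The `<year>/<day_of_year>/<hour>` layout described above can be handled with the standard library. This is a small sketch, not part of the original workflow: the helper names are invented for illustration, and the file name in the example key is a placeholder rather than a real GOES file.

```python
from datetime import date, datetime


def day_of_year_to_date(year, day_of_year):
    """Convert a year and a 1-based day of year into a calendar date."""
    return datetime.strptime(f"{year} {day_of_year:03d}", "%Y %j").date()


def goes_glob(product, day, bucket="noaa-goes16"):
    """Build a glob pattern covering every hour of one day."""
    day_of_year = day.timetuple().tm_yday
    return f"s3://{bucket}/{product}/{day.year}/{day_of_year:03d}/*/*.nc"


def parse_key(key):
    """Split '<bucket>/<product>/<year>/<day_of_year>/<hour>/<datafile>' into parts."""
    bucket, product, year, day_of_year, hour, filename = key.split("/")
    return {
        "bucket": bucket,
        "product": product,
        "date": day_of_year_to_date(int(year), int(day_of_year)),
        "hour": int(hour),
        "filename": filename,
    }


print(day_of_year_to_date(2024, 50))  # 2024-02-19
print(goes_glob("ABI-L1b-RadC", date(2024, 2, 19)))

# Placeholder key in the same shape that fs.glob() returns (no 's3://' prefix)
example_key = "noaa-goes16/ABI-L1b-RadC/2024/050/00/example_file.nc"
print(parse_key(example_key))
```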

### Prepend `s3://` to the URLs and keep every 20th file

In [None]:
flist = ['s3://' + f for f in flist_full[::20]]

In [None]:
len(flist)

### Opening directly with xarray is slow, even in the same region:

If you're running in AWS, this will tell you your region name.

In [None]:
import boto3
boto3.client('s3').meta.region_name

The cell below opens the first 10 files with `xr.open_mfdataset()` and takes ~50 seconds to run

In [None]:
%%time
n_files_to_open = 10
ds = xr.open_mfdataset(
    [
        fs.open(f) for f in flist[:n_files_to_open]
    ], 
    combine='nested',
    concat_dim='t'
)
ds

## Example of creating a single reference

In [None]:
u = flist[0]
u

Pass the file's URL to Kerchunk's `SingleHdf5ToZarr` class, which opens it through `fsspec` using the given `storage_options` (anonymous access here). The `inline_threshold` parameter is a size in bytes: chunks smaller than this are stored directly in the metadata file instead of being referenced by byte range.

`.translate()` returns a `dict` of extracted metadata for use with `ReferenceFileSystem`. This can be aggregated with other metadata or written to disk.

In [None]:
%%time
reference = SingleHdf5ToZarr(u, inline_threshold=100, storage_options={'anon':True}).translate()

In [None]:
with open('test.json', 'w') as outf:
    outf.write(ujson.dumps(reference))
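
To make the shape of a reference dictionary and the effect of `inline_threshold` more concrete, here is a hand-written toy example. It is only a sketch: the URL, offsets, and chunk contents are invented, `make_entry` is a hypothetical helper, and the real logic inside Kerchunk is more involved. What it does show is the two kinds of entries found under `refs`: inline strings for small chunks and `[url, offset, length]` byte ranges for large ones.

```python
import base64
import json

INLINE_THRESHOLD = 100  # bytes, same role as inline_threshold above


def make_entry(url, offset, data, threshold=INLINE_THRESHOLD):
    """Inline chunks smaller than the threshold; reference the rest by byte range."""
    if len(data) < threshold:
        # Binary inline data is marked with a "base64:" prefix
        return "base64:" + base64.b64encode(data).decode("ascii")
    return [url, offset, len(data)]


url = "s3://example-bucket/example.nc"  # hypothetical file
small_chunk = b"\x00" * 8      # e.g. a scalar coordinate value
large_chunk = b"\x01" * 4096   # e.g. one tile of an image variable

toy_reference = {
    "version": 1,
    "refs": {
        ".zgroup": json.dumps({"zarr_format": 2}),
        "t/0": make_entry(url, 1024, small_chunk),
        "Rad/0.0": make_entry(url, 2048, large_chunk),
    },
}

for key, value in toy_reference["refs"].items():
    kind = "byte range" if isinstance(value, list) else "inline"
    print(f"{key}: {kind}")
```

A higher threshold makes the JSON file larger but saves a remote read for every chunk that is inlined.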

## Create references for all files in `flist`

## With a Dask cluster
[Dask](https://dask.org/) is a Python package for parallelizing Python code. This section starts a local client, using whatever processors are available on the current machine. The same approach works with a remote [Dask cluster](https://docs.dask.org/en/stable/deploying.html).

In [None]:
from dask.distributed import Client

In [None]:
client = Client()

### Using ESIP JupyterHub

This should be used instead of the above local Dask cluster if running on ESIP's JupyterHub at https://nebari.esipfed.org

In [None]:
import sys
sys.path.insert(1, '/shared/users/lib/')

from nebari_tools import start_dask_cluster, stop_dask_cluster

client, cluster = start_dask_cluster(region='us-east-1', worker_min=2)

In [None]:
client

## Define a function to return a reference dictionary for a given S3 file URL

This function does the following:
1. Passes the URL `f` to `SingleHdf5ToZarr`, which opens the file through `fsspec`
2. Uses `storage_options={'anon': True}` for anonymous access to the public bucket
3. Generates the reference with `.translate()` and returns it together with the URL as a `(url, reference)` tuple

In [None]:
def gen_ref(f):
    return f, SingleHdf5ToZarr(
        f, 
        inline_threshold=300, 
        storage_options={
            'anon' : True
        }
    ).translate()

### Map `gen_ref` to each member of `flist` and compute
Dask bag is a way to map a function over a collection of inputs. The next few cells tell Dask to take all the files in `flist`, split them into partitions, and apply `gen_ref()` to every file path. Calling `bag.compute()` then runs `gen_ref()` in parallel on as many workers as the Dask client has available.

_Note: if running interactively on Binder, this will take a while, since only one worker is available and the references have to be generated serially. See the option for loading from JSON files below._

In [None]:
import dask.bag as db
bag = db.from_sequence(flist).map(gen_ref)

In [None]:
bag.visualize()

In [None]:
%time dicts = bag.compute()

Now each URL in `flist` has been used to generate a dictionary of reference data. `dicts` is a list of `(url, reference)` tuples.
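
The map-then-collect pattern used above can be tried without Dask or S3. The sketch below swaps in `concurrent.futures.ThreadPoolExecutor` from the standard library as a stand-in for Dask bag, and `fake_gen_ref` is a hypothetical placeholder for `gen_ref()` that returns a dummy dictionary instead of reading a file. The URLs are invented.

```python
from concurrent.futures import ThreadPoolExecutor


def fake_gen_ref(url):
    """Stand-in for gen_ref(): returns (url, dummy reference) without touching S3."""
    return url, {"version": 1, "refs": {"source": url}}  # placeholder content


# Hypothetical URLs, in the same form as the entries of flist
urls = [f"s3://example-bucket/file_{i:02d}.nc" for i in range(6)]

# pool.map applies the function to every URL and keeps the input order
with ThreadPoolExecutor(max_workers=3) as pool:
    results = list(pool.map(fake_gen_ref, urls))

# Same unpacking step as the notebook: keep only the reference dictionaries
refs_only = [ref for _, ref in results]
print(len(results), "results;", len(refs_only), "references")
```

Returning the URL alongside each dictionary keeps every result traceable to its source file, which is what the file-naming step later in this notebook relies on.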

### _Save/load references to/from JSON files (optional)_
The individual dictionaries can be saved as JSON files if desired

In [None]:
import os
os.makedirs('./jsons', exist_ok=True)

In [None]:
import ujson
for (url, d) in dicts:
    # Generate name from corresponding URL:
    # Grab URL, strip everything but the filename, 
    # and replace .nc with .json
    json_name = url.split('/')[-1].replace('.nc','.json')
    with open(f'./jsons/{json_name}', 'w') as outf:
        outf.write(ujson.dumps(d))

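These generated JSON files can then be loaded back in as a list of dicts. Note that the loaded entries are plain reference dictionaries rather than `(url, reference)` tuples, so after loading this way use `references = dicts` instead of the `d[1]` unpacking in the cell that follows.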

In [None]:
# import ujson
# dicts = []

# for f in sorted(glob('./example_jsons/individual/*.json')):
#     with open(f,'r') as fin:
#         dicts.append(ujson.load(fin))
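
The save and load steps can be checked end to end with a small round trip. This sketch uses the standard library `json` module in place of `ujson`, writes to a temporary directory rather than `./jsons`, and works on invented `(url, reference)` pairs, so all names and values here are illustrative only.

```python
import json
import tempfile
from pathlib import Path


def json_name_for(url):
    """Derive the output file name: keep the file name, swap .nc for .json."""
    return url.split("/")[-1].replace(".nc", ".json")


# Hypothetical (url, reference) tuples shaped like the output of bag.compute()
toy_dicts = [
    (f"s3://example-bucket/file_{i:02d}.nc", {"version": 1, "refs": {"index": str(i)}})
    for i in range(3)
]

with tempfile.TemporaryDirectory() as tmp:
    out_dir = Path(tmp)
    for url, ref in toy_dicts:
        (out_dir / json_name_for(url)).write_text(json.dumps(ref))

    # Sorting the file names restores a deterministic order on load
    loaded = [json.loads(p.read_text()) for p in sorted(out_dir.glob("*.json"))]

print(json_name_for(toy_dicts[0][0]))
print(loaded == [ref for _, ref in toy_dicts])
```

The loaded list contains only the dictionaries, because the URL was used for the file name and not stored inside the JSON.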

In [None]:
references = [d[1] for d in dicts]

***
### Use `MultiZarrToZarr` to combine the individual references into a single reference
In this example we pass a list of reference dictionaries, but you can also give it a list of `.json` file paths (commented out).

_Note: the Kerchunk `MultiZarrToZarr` API changed between versions 0.0.5 and 0.0.6. This part assumes the latest version at this time (0.0.6). Please see https://fsspec.github.io/kerchunk/reference.html#kerchunk.combine.MultiZarrToZarr for more details_

In [None]:
mzz = MultiZarrToZarr(
    references,
    # sorted((glob('./example_jsons/individual/*.json'))),
    remote_protocol='s3',
    remote_options={'anon':True},
    concat_dims='t',
    inline_threshold=0
)

References can be saved to a file (`combined.json`) or passed back as a dictionary (`mzz_dict`)

In [None]:
%time mzz.translate('./combined.json')
# mzz_dict = mzz.translate()
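
As a rough mental model of what combining along `t` means at the reference level, the sketch below stacks the chunk keys of one variable from several toy references along a new leading dimension. This is a deliberately simplified illustration and not Kerchunk's implementation: the function `combine_along_new_dim` is hypothetical, the inputs are assumed to be already sorted in time, and the real `MultiZarrToZarr` also reads coordinate values and rewrites the array metadata.

```python
def combine_along_new_dim(references, variable):
    """Renumber the chunk keys of `variable` so file i becomes index i on a new axis."""
    prefix = variable + "/"
    combined = {}
    for index, ref in enumerate(references):
        for key, value in ref["refs"].items():
            if not key.startswith(prefix):
                continue
            chunk_id = key[len(prefix):]
            if chunk_id.startswith("."):
                continue  # skip metadata keys such as .zarray and .zattrs
            combined[f"{prefix}{index}.{chunk_id}"] = value
    return {"version": 1, "refs": combined}


# Two hypothetical single-file references, each with one 2D chunk of "Rad"
toy_references = [
    {"version": 1, "refs": {"Rad/.zarray": "{}", "Rad/0.0": ["s3://example-bucket/a.nc", 2048, 4096]}},
    {"version": 1, "refs": {"Rad/.zarray": "{}", "Rad/0.0": ["s3://example-bucket/b.nc", 2048, 4096]}},
]

toy_combined = combine_along_new_dim(toy_references, "Rad")
for key, value in toy_combined["refs"].items():
    print(key, "->", value[0])
```

Each chunk still points at a byte range in its original file; only the keys change, which is why the combined reference stays small compared with the data it describes.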