# What is behind auto-kerchunk?

Auto-kerchunk was developed in 2021, following [this blog post example of applying kerchunk to NetCDF4 data stored in S3](https://medium.com/pangeo/fake-it-until-you-make-it-reading-goes-netcdf4-data-on-aws-s3-as-zarr-for-rapid-data-access-61e33f8fe685). It adapts that approach to an HPC environment and automates the creation of a kerchunk catalog for the many existing datasets in an HPC data lake.

This notebook explains what is behind auto-kerchunk by manually creating a kerchunk catalog, using the recently added kerchunk capability `auto_dask`.

For ODATIS datasets, which are available both on the DATARMOR cluster and through online access, this example creates the kerchunk catalogue in two formats: 'HPC' and 'cloud'.



auto-kerchunk was created to:
- convert multiple NetCDF files to a kerchunk catalogue,
- make an intake catalogue so that a job can be submitted to the PBS scheduler,
- automate that using a bash script and submit it to the PBS scheduler.

Please refer to [this documentation](https://pangeo-data.github.io/clivar-2022/pangeo101/chunking_introduction.html) to understand what kerchunk is (as well as what zarr and 'chunk' mean).
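
As a quick reminder of what a kerchunk catalogue contains, here is a minimal sketch of a version 1 reference set written by hand. The file names, byte offsets and the variable `thetao` layout are made up for illustration; the helper `referenced_files` is hypothetical and not part of kerchunk. Each chunk key maps to `[url, offset, length]`, while metadata keys hold JSON strings.

```python
import json

# Hand-written toy reference set (hypothetical paths and byte ranges).
refs = {
    "version": 1,
    "refs": {
        # Zarr metadata is stored inline as JSON strings.
        ".zgroup": json.dumps({"zarr_format": 2}),
        "thetao/.zarray": json.dumps(
            {
                "shape": [2, 4],
                "chunks": [1, 4],
                "dtype": "<f4",
                "compressor": None,
                "fill_value": None,
                "filters": None,
                "order": "C",
                "zarr_format": 2,
            }
        ),
        # Each chunk points to a byte range inside an existing NetCDF file:
        # [url, offset, length]. Here 16 bytes = 4 float32 values.
        "thetao/0.0": ["file:///data/example_a.nc", 8192, 16],
        "thetao/1.0": ["file:///data/example_b.nc", 8192, 16],
    },
}


def referenced_files(reference_set):
    """Return the sorted list of files that the chunk references point to."""
    return sorted(
        {v[0] for v in reference_set["refs"].values() if isinstance(v, list)}
    )


print(referenced_files(refs))
```

The point of the format is that the NetCDF files are never copied: the catalogue only records where each chunk lives.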


## Starting Dask cluster on HPC

Please refer to the [Dask-hpcconfig datarmor example Jupyter notebook](https://github.com/umr-lops/dask-hpcconfig/tree/main/docs/tutorials) to understand what the next three cells mean.  

In [None]:
import dask_hpcconfig
from distributed import Client

In [None]:
overrides = {}
# overrides = { "cluster.cores": 28 , "cluster.processes": 6 }

cluster = dask_hpcconfig.cluster("datarmor", **overrides)
client = Client(cluster)
cluster.scale(20)
client


In [None]:
!qstat -u todaka

## Converting Multiple NetCDF Files to a Kerchunk Catalogue

We use `kerchunk.combine.auto_dask` with `kerchunk.hdf.SingleHdf5ToZarr`.

`kerchunk.combine.auto_dask` converts each NetCDF file into a kerchunk catalogue and concatenates them into one kerchunk catalogue, all at once, in parallel using dask.

## We will use [Atlantic - European North West Shelf - Ocean Physics Analysis and Forecast from Copernicus Marine services](https://data.marine.copernicus.eu/product/NORTHWESTSHELF_ANALYSIS_FORECAST_PHY_004_013/services) 

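This dataset provides several files, with different file names, for the same date range. The part of the file name that distinguishes them is called `tag` below.
We open one file per tag to check whether the files can be concatenated easily.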


In [None]:
# Transformation of
# /home/datawork-taos-s/public/data_tmp/NORTHWESTSHELF_ANALYSIS_FORECAST_PHY_004_013/NWS-MFC_004_013_mdt.nc

import glob

import dask.bag as db
import fsspec
import xarray as xr
from kerchunk.hdf import SingleHdf5ToZarr


base_url='/home/datawork-taos-s/public/data_tmp/NORTHWESTSHELF_ANALYSIS_FORECAST_PHY_004_013/'
# The mdt file sits directly in the base directory.
dir_url=base_url


file_paths=[]

file_pattern = "*.nc"
newpath=glob.glob(dir_url + file_pattern)
print(newpath)
display(xr.open_mfdataset(newpath,chunks={}))#,engine='h5netcdf'))
file_paths.extend([newpath[0]])


def translate_dask(file):
    # Build the kerchunk references for a single local NetCDF4/HDF5 file.
    url = "file://" + file
    #print("working on ", file)
    with fsspec.open(url) as inf:
        h5chunks = SingleHdf5ToZarr(inf, url, inline_threshold=100)
        return h5chunks.translate()


b = db.from_sequence(file_paths)
result_indask = b.map(translate_dask)
# Only one file is translated here, so keep its reference set directly.
a = result_indask.compute()[0]

In [None]:
# Transformation of the 3D variables, then the 2D variables

import glob
import xarray as xr


base_url='/home/datawork-taos-s/public/data_tmp/NORTHWESTSHELF_ANALYSIS_FORECAST_PHY_004_013/'


years=[str(i)  for i in range(2022,2023)]
months=["0"+str(i)  for i in range(6,9)]


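# Run this cell once per group and keep exactly one of the two `tags` lines active.
# As written, the second assignment overrides the first.
tags=["CUR",  "SAL",  "TEM", ]  # 3D variables
tags=["SSS", "SST", "SSC","BED","MLD","SSH",]  # 2D variables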



file_paths=[]
for tag in tags:
    for year in years:
        newpath=[]
        for month in months:
            dir_url=base_url
            file_pattern = year+"/" + month + "/*"+ tag +"*.nc"
            newpath.extend(glob.glob(dir_url + file_pattern))
    print([newpath[0]])
    display(xr.open_mfdataset([newpath[0]],chunks={},engine='h5netcdf'))
    file_paths.extend(newpath)
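
The loop above builds the month strings with `"0"+str(i)`, which only works for months 1 to 9. The sketch below shows one way to write the same file listing with zero-padded months and sorted results. It is only an illustration: the helper name `list_files` and the demo file names are hypothetical, and only the `<year>/<month>/*<tag>*.nc` layout is taken from the cell above.

```python
import glob
import os
import tempfile


def list_files(base_dir, years, months, tag):
    """Collect, in sorted order, the files of one tag under <base_dir>/<year>/<month>/."""
    paths = []
    for year in years:
        for month in months:
            # f"{month:02d}" gives "06" for 6 and "11" for 11.
            pattern = os.path.join(base_dir, str(year), f"{month:02d}", f"*{tag}*.nc")
            paths.extend(sorted(glob.glob(pattern)))
    return paths


# Demo on a throwaway directory with made-up file names.
with tempfile.TemporaryDirectory() as tmp:
    for month in (6, 7):
        folder = os.path.join(tmp, "2022", f"{month:02d}")
        os.makedirs(folder)
        for tag in ("SST", "SSH"):
            open(os.path.join(folder, f"example-{tag}-2022{month:02d}.nc"), "w").close()
    demo = [os.path.basename(p) for p in list_files(tmp, [2022], [6, 7], "SST")]

print(demo)
```

Sorting each month's listing keeps the files in time order, which makes the concatenation along `time` easier to check.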
            

### Method 1

We use `kerchunk.hdf.SingleHdf5ToZarr` with `dask.bag` to convert each NetCDF file to a kerchunk catalog, and then concatenate the results with `kerchunk.combine.MultiZarrToZarr` to create a single kerchunk catalog. This workflow was used in the first version of auto-kerchunk.


In [None]:
%%time
import dask.bag as db
import fsspec
from kerchunk.hdf import SingleHdf5ToZarr


def translate_dask(file):
    url = "file://" + file
    #print("working on ", file)
    with fsspec.open(url) as inf:
        h5chunks = SingleHdf5ToZarr(inf, url, inline_threshold=100)
        return h5chunks.translate()


b = db.from_sequence(file_paths)
result_indask = b.map(translate_dask)
result = result_indask.compute()

from kerchunk.combine import MultiZarrToZarr

mzz = MultiZarrToZarr(
    result,
    concat_dims=["time"],
)
a = mzz.translate()

### Method 2

We use `kerchunk.combine.auto_dask`  instead of `kerchunk.combine.MultiZarrToZarr`  as described above.


### Method 3

We use `kerchunk.combine.auto_dask` with `kerchunk.hdf.SingleHdf5ToZarr`.

`kerchunk.combine.auto_dask` converts each NetCDF file into a kerchunk catalogue and concatenates them into one kerchunk catalogue, all at once, in parallel using dask.
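
The `auto_dask` approach can be sketched as follows. This is a minimal sketch, assuming `file_paths` is the list built earlier and that a dask client is already running. The helper names `auto_dask_arguments` and `combine_with_auto_dask` and the batch count are ours, and `inline_threshold=100` and `concat_dims=["time"]` are simply copied from Method 1; none of this is confirmed to be the exact configuration used by auto-kerchunk.

```python
def auto_dask_arguments(file_paths, n_batches=4):
    """Build the keyword arguments we pass to kerchunk.combine.auto_dask."""
    urls = ["file://" + path for path in file_paths]
    return {
        "urls": urls,
        # Forwarded to SingleHdf5ToZarr for every input file.
        "single_kwargs": {"inline_threshold": 100},
        # Forwarded to MultiZarrToZarr when the batches are combined.
        "mzz_kwargs": {"concat_dims": ["time"]},
        # Precaution (our assumption): never ask for more batches than files.
        "n_batches": max(1, min(n_batches, len(urls))),
    }


def combine_with_auto_dask(file_paths, n_batches=4):
    """Translate and combine all files in one call, in parallel with dask."""
    # Imported here so the helper above can be inspected without kerchunk installed.
    from kerchunk.combine import auto_dask
    from kerchunk.hdf import SingleHdf5ToZarr

    return auto_dask(
        single_driver=SingleHdf5ToZarr,
        **auto_dask_arguments(file_paths, n_batches),
    )


example_args = auto_dask_arguments(["/a.nc", "/b.nc"], n_batches=8)
print(example_args["urls"], example_args["n_batches"])
```

In the notebook, `a = combine_with_auto_dask(file_paths)` would then play the same role as `a = mzz.translate()` in Method 1.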

## Loading the data into Xarray using kerchunk and verifying it

In [None]:
%%time
import xarray as xr

test = xr.open_dataset(
    "reference://",
    engine="zarr",
    backend_kwargs={
        "storage_options": {
            "fo": a,
        },
        "consolidated": False,
    },
    chunks={},
)
test

## Publishing the kerchunk catalogue for use from HPC


In [None]:
import os
#base_url='/home/ref-cersat-public/sea-surface-temperature/odyssea/'
base_url='/home/datawork-taos-s/intranet/kerchunk/NORTHWESTSHELF_ANALYSIS_FORECAST_PHY_004_013/'
#base_url='/home/datawork-taos-s/public/data/NORTHWESTSHELF_ANALYSIS_FORECAST_PHY_004_013/MetO-NWS-PHY-hi-TEM/'

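# Choose the name matching the reference set currently held in `a`
# and keep exactly one line active. As written, the last assignment wins.
name='datatmp_mdt'
name='datatmp_2022_3D'
name='datatmp_2022_2D'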


name=base_url+name.replace('/', '_')
os.makedirs(name, exist_ok=True)
print(name)

In [None]:
import json

# Note: despite the '.json.zstd' suffix, this file is written as plain
# (uncompressed) JSON. The translation step further below reads it with empty
# storage options and writes the zstd-compressed copy for cloud access.
jsonfile=name+'.json.zstd'
with open(jsonfile, mode='w') as f :
    json.dump(a, f)

# Repeat the JSON creation above for each group of kerchunk files (2D, 3D, mdt).

## Publishing the kerchunk catalogue for use from the cloud


In [None]:
names=!find /home/datawork-taos-s/intranet/kerchunk/NORTHWESTSHELF_ANALYSIS_FORECAST_PHY_004_013/ |grep json.zst |grep datatmp
#names=[names[0]]
names

In [None]:

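import os

# Unique directories that contain the intranet catalogues.
paths = list({os.path.dirname(name) for name in names})


def createpath(path):
    # Mirror the intranet directory tree under the public tree.
    newpath = path.replace('intranet', 'public')
    os.makedirs(newpath, exist_ok=True)
    return newpath


# Use a separate variable so that the function name is not overwritten.
created_paths = [createpath(path) for path in paths]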

In [None]:
%%time
import xarray as xr
test= xr.open_dataset(
    "reference://", engine="zarr",
    backend_kwargs={
        "storage_options": {
            "fo":'file://'+names[0],
        },
        "consolidated": False
    },chunks={}
)


In [None]:
lat=48.43
lon=-5.021
test.sel(lat=lat,lon=lon,depth=0,method='nearest').thetao.plot()


## Translating the kerchunk catalogue from intranet access to HTTPS access

In [None]:
import base64  # needed by consolidate() below

import fsspec
import ujson
import dask



def match_keys(mapping, value):
    for k in mapping:
        if k in value: 
            return k
        
    raise ValueError(f"could not find {value} in mapping") 
    
def match_in_keys(mapping,value):
    try:
        match_keys(mapping,value)
        return True
    except ValueError:
        return False    

def rename_target(refs, renames):
    """Utility to change URLs in a reference set in a predictable way

    For reference sets including templates, this is more easily done by
    using template overrides at access time; but rewriting the references
    and saving a new file means not having to do that every time.

    Parameters
    ----------
    refs: dict
        Reference set
    renames: dict[str, str]
        Mapping from the old URL (including protocol, if this is how they appear
        in the original) to new URL

    Returns
    -------
    dict: the altered reference set, which can be saved
    """
    # The print comes after the docstring so that the docstring stays attached
    # to the function. A local consolidate() is used instead of
    # kerchunk.utils.consolidate.
    print('in rename_target')
    fs = fsspec.filesystem("reference", fo=refs)  # to produce normalised refs
    refs = fs.references
    out = {}
    for k, v in refs.items():
        if isinstance(v, list) and v[0] in renames:
            out[k] = [renames[v[0]]] + v[1:]
        elif isinstance(v, list) and match_in_keys(renames, v[0]) :
            url = v[0]
            #print(url)
            key = match_keys(renames, url)
            new_url = url.replace(key, renames[key])
            out[k] = [new_url] + v[1:]
            #print(new_url)
        else:
            out[k] = v
        #    print('boo')
    return consolidate(out)

def rename_target_files(
    url_in, renames, url_out=None, storage_options_in=None, storage_options_out=None):
    #print('in rename_target_files',url_in,renames,url_out,storage_options_in, storage_options_out)

    """Perform URL renames on a reference set - read and write from JSON

    Parameters
    ----------
    url_in: str
        Original JSON reference set
    renames: dict
        URL renamings to perform (see ``rename_target``)
    url_out: str | None
        Where to write to. If None, overwrites original
    storage_options_in: dict | None
        passed to fsspec for opening url_in
    storage_options_out: dict | None
        passed to fsspec for opening url_out. If None, storage_options_in is used.

    Returns
    -------
    None
    """
    with fsspec.open(url_in, **(storage_options_in or {})) as f:
        print(url_in,storage_options_in)
        old = ujson.load(f)
    new = rename_target(old, renames)
    if url_out is None:
        url_out = url_in
    if storage_options_out is None:
        storage_options_out = storage_options_in
    with fsspec.open(url_out, mode="wt", **(storage_options_out or {})) as f:
        ujson.dump(new, f)

def consolidate(refs):
    """Turn raw references into output"""
    out = {}
    for k, v in refs.items():
        if isinstance(v, bytes):
            try:
                # easiest way to test if data is ascii
                out[k] = v.decode("ascii")
            except UnicodeDecodeError:
                out[k] = (b"base64:" + base64.b64encode(v)).decode()
        else:
            out[k] = v
    return {"version": 1, "refs": out}

#@dask.delayed
def translate(name):
    # Root of the intranet (HPC) catalogues and of their public (cloud) copies.
    in_path='file:///home/datawork-taos-s/intranet/kerchunk/NORTHWESTSHELF_ANALYSIS_FORECAST_PHY_004_013/'
    out_path='file:///home/datawork-taos-s/public/kerchunk/NORTHWESTSHELF_ANALYSIS_FORECAST_PHY_004_013/'

    # Keep only the path relative to the intranet root.
    name=name.replace('/home/datawork-taos-s/intranet/kerchunk/NORTHWESTSHELF_ANALYSIS_FORECAST_PHY_004_013/','')
    url_in = in_path+name
    url_out = out_path+name

    # Chunk URLs under the public tree are served over HTTPS.
    renames={'file:///home/datawork-taos-s/public/':'https://data-taos.ifremer.fr/'}

    storage_options_in= {}  # the intranet JSON is read uncompressed
    storage_options_out= {"compression": "zstd"}
    return rename_target_files(
        url_in, renames, url_out, storage_options_in=storage_options_in
        , storage_options_out=storage_options_out )

In [None]:
translated= [ translate(name)   for name in names]
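
To make the effect of the renaming concrete, here is a small self-contained sketch that applies the same prefix substitution to a toy reference set. The function `rename_prefix` and the file name `example.nc` are hypothetical. Unlike `rename_target` above, which matches the old URL anywhere in the string, this simplified version only replaces a leading prefix.

```python
def rename_prefix(refs, old_prefix, new_prefix):
    """Return a new reference set with old_prefix replaced in every chunk URL."""
    out = {}
    for key, value in refs["refs"].items():
        # Chunk references are lists of the form [url, offset, length].
        if isinstance(value, list) and value[0].startswith(old_prefix):
            new_url = new_prefix + value[0][len(old_prefix):]
            out[key] = [new_url] + value[1:]
        else:
            # Inline metadata and data are copied unchanged.
            out[key] = value
    return {"version": 1, "refs": out}


# Toy reference set with a made-up file name and byte range.
toy = {
    "version": 1,
    "refs": {
        ".zgroup": '{"zarr_format": 2}',
        "thetao/0.0.0": [
            "file:///home/datawork-taos-s/public/data_tmp/example.nc",
            1024,
            512,
        ],
    },
}

renamed = rename_prefix(
    toy,
    "file:///home/datawork-taos-s/public/",
    "https://data-taos.ifremer.fr/",
)
print(renamed["refs"]["thetao/0.0.0"])
```

Only the URL changes; the byte offset and length stay the same because the HTTPS server exposes the very same files.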

## Create intake catalogue

In [None]:
!cat /home/datawork-taos-s/public/kerchunk/ref-copernicus.yaml
!cat /home/datawork-taos-s/intranet/kerchunk/ref-copernicus.yaml


In [None]:
import fsspec
import intake
import xarray as xr


In [None]:
catalogue = "/home/datawork-taos-s/intranet/kerchunk/ref-copernicus.yaml"
#catalogue = "https://data-taos.ifremer.fr/kerchunk/ref-copernicus.yaml"
#catalogue = "https://data-taos.ifremer.fr/kerchunk/ref-cersat.yaml"

cat = intake.open_catalog(catalogue)
cat.data_tmp

In [None]:
test=cat.data_tmp(type='mdt').to_dask()
lat=48.43
lon=-5.021
test#.sel().sel(lat=lat,lon=lon,method='nearest',depth=0).thetao.plot()


In [None]:
test=cat.data_tmp(year='2022').to_dask()
lat=48.43
lon=-5.021
test#.sel().sel(lat=lat,lon=lon,method='nearest',depth=0).thetao.plot()
