# Tutorial

This is a run-through example for how to use this package. We scan a set of netCDF4/HDF5 files, and create a single emsemble, virtual dataset, which can be read in parallel from remote using zarr.

## Single file JSONs

The Kerchunk.hdf.SingleHdf5ToZarr is used to create a single .json file for each file passed to it. Here we use it to create a number of .json files pointing towrds the ERA5 pubic dataset on [AWS](https://registry.opendata.aws/ecmwf-era5/). We will compute a number of different times and variables to demonstrate different methods of combining them.

In [62]:
from kerchunk.hdf import SingleHdf5ToZarr 
import fsspec

# ERA5-pds is located in us-west-2
fs = fsspec.filesystem('s3', anon=True)
flist = (fs.glob('s3://era5-pds/2020/*/data/air_pressure_at_mean_sea_level.nc')[:2]
        + fs.glob('s3://era5-pds/2020/*/data/*sea_surface_temperature.nc')[:2])


In [21]:
fs2 = fsspec.filesystem('')  

so = dict(anon=True, default_fill_cache=False, default_cache_type='first')

In [31]:
from pathlib import Path
import os
import ujson

so = dict(mode='rb', anon=True, default_fill_cache=False, default_cache_type='first')

def gen_json(file):
    with fs.open(file, **so) as infile:
        h5chunks = SingleHdf5ToZarr(infile, file, inline_threshold=300) 
        # inline threshold adjusts the Size below which binary blocks are included directly in the output
        # a higher inline threshold can result in a larger json file but faster loading time
        variable = file.split('/')[-1].split('.')[0]
        month = file.split('/')[2] 
        outf = f'{month}_{variable}.json'
        with fs2.open(outf, 'wb') as f:
            f.write(ujson.dumps(h5chunks.translate()).encode());

In [32]:
%%time
for file in flist:
    gen_json(file)

CPU times: user 23.8 s, sys: 2.83 s, total: 26.7 s
Wall time: 18min


These files can now be opened as virtual datasets through xarray

In [38]:
import xarray as xr
fs_ = fsspec.filesystem("reference", fo='01_air_pressure_at_mean_sea_level.json', ref_storage_args={'skip_instance_cache':True},
                       remote_protocol='s3', remote_options={'anon':True})
m = fs_.get_mapper("")
ds = xr.open_dataset(m, engine="zarr", backend_kwargs={'consolidated':False}, chunks={})
ds

Unnamed: 0,Array,Chunk
Bytes,2.88 GiB,0.92 MiB
Shape,"(744, 721, 1440)","(24, 100, 100)"
Count,3721 Tasks,3720 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 2.88 GiB 0.92 MiB Shape (744, 721, 1440) (24, 100, 100) Count 3721 Tasks 3720 Chunks Type float32 numpy.ndarray",1440  721  744,

Unnamed: 0,Array,Chunk
Bytes,2.88 GiB,0.92 MiB
Shape,"(744, 721, 1440)","(24, 100, 100)"
Count,3721 Tasks,3720 Chunks
Type,float32,numpy.ndarray


## Combine multiple kerchunk'd datasets into a single logical aggregate dataset

The Kerchunk.combine.MultiZarrtoZarr method uses the output generated above to create a single ensemble dataset, with one set of references pointing to all of the chunks in the individual files.

MultiZarrtoZarr provides a number of convenience methods to combine these files. The simplest is to concatenate along a specified dimension. Time0 in this instance

In [136]:
from kerchunk.combine import MultiZarrToZarr

In [137]:
json_list = fs2.glob("*_air_pressure_at_mean_sea_level.json")

mzz = MultiZarrToZarr(json_list,                
    remote_protocol='s3',
    remote_options={'anon':True},
    concat_dims=['time0'],
    identical_dims = ['lat', 'lon'])

d = mzz.translate()

with fs2.open('air_pressure_at_mean_sea_level_combined.json', 'wb') as f:
    f.write(ujson.dumps(d).encode())

In [138]:
m = fsspec.filesystem("reference", fo=d).get_mapper("")
ds = xr.open_dataset(m, engine="zarr", consolidated=False)
ds

If instead we wanted to concatenate these two files together along another dimension we can make use of coo_map to do so:

In [139]:
import re
ex = re.compile(r'.*(\d+)_air')

In [140]:
ex.match(json_list[0]).groups()[0]

'1'

In [141]:
mzz = MultiZarrToZarr(json_list,                
    remote_protocol='s3',
    remote_options={'anon':True},
    coo_map = {'new_time':ex},
    concat_dims=['new_time'],
    identical_dims = ['lat', 'lon']
)

d = mzz.translate()

In [142]:
m = fsspec.filesystem("reference", fo=d).get_mapper("")
ds = xr.open_dataset(m, engine="zarr", consolidated=False)
ds

Similarly if we wanted each file to constitute a new variable name:

In [143]:
mzz = MultiZarrToZarr(json_list,                
    remote_protocol='s3',
    remote_options={'anon':True},
    coo_map = {"var":ex},
    concat_dims=['time0'],
    identical_dims = ['lat', 'lon']
)

d = mzz.translate()

In [144]:
m = fsspec.filesystem("reference", fo=d).get_mapper("")
ds = xr.open_dataset(m, engine="zarr", consolidated=False)
ds

Or instead we can use the internal file attributes to create new variables:

In [145]:
mzz = MultiZarrToZarr(json_list,                
    remote_protocol='s3',
    remote_options={'anon':True},
    coo_map = {"var":"attr:institution"},
    concat_dims=['time0'],
    identical_dims = ['lat', 'lon']
)

d = mzz.translate()

In [146]:
m = fsspec.filesystem("reference", fo=d).get_mapper("")
ds = xr.open_dataset(m, engine="zarr", consolidated=False)
ds

To merge two variables into one we can simply concatenate along an existing dimension

In [149]:
json_list = fs2.glob("01_sea_surface_temperature.json") + fs2.glob("01_air_pressure_at_mean_sea_level.json")

mzz = MultiZarrToZarr(json_list,                
    remote_protocol='s3',
    remote_options={'anon':True},
    concat_dims=['time0'],
    identical_dims = ['lat', 'lon'])

d = mzz.translate()

m = fsspec.filesystem("reference", fo=d).get_mapper("")
ds = xr.open_dataset(m, engine="zarr", consolidated=False)
ds

This however iterates across the concatenating variable in both datasets and thus can be slow, if we know the two datasets share identical coordinates we can add the relevant dictionary items from the one variable to the other:

In [152]:
with fs2.open(json_list[0], 'rb') as f:
    var_1 = ujson.load(f)

with fs2.open(json_list[1], 'rb') as f:
    var_2 = ujson.load(f)

var_1['refs'].update(var_2['refs'])

with fs2.open('vars_combined.json', 'wb') as f:
    f.write(ujson.dumps(var_1).encode())

In [153]:
m = fsspec.filesystem("reference", fo=var_1).get_mapper("")
ds = xr.open_dataset(m, engine="zarr", consolidated=False)
ds

Pre-process can be used to apply arbitrary functions to the refs item in the input jsons before combining, in this case dropping the air_pressure_at_mean_sea_level variable before combining both sea_surface_temperature variables

In [168]:
def pre_process(refs):
    for k in list(refs):
        if k.startswith('air_pressure_at_mean_sea_level'):
            refs.pop(k)
    return refs

In [169]:
json_list = fs2.glob("vars_combined.json") + fs2.glob("02_sea_surface_temperature.json")

In [171]:
mzz = MultiZarrToZarr(json_list,                
    remote_protocol='s3',
    remote_options={'anon':True},
    concat_dims=['time0'],
    identical_dims = ['lat', 'lon'],
    preprocess = pre_process)

d = mzz.translate()


with fs2.open('sea_surface_temperature_combined.json', 'wb') as f:
    f.write(ujson.dumps(d).encode())
    
m = fsspec.filesystem("reference", fo=d).get_mapper("")
ds = xr.open_dataset(m, engine="zarr", consolidated=False)
ds

Similarly post-process can be used to apply an arbitrary function to the final dictionary before returning.
A known issue with this partcular dataset is that no fill value has been assigned to the lat and lon coordinates and thus default to 0, here we use post process to change the zarr fill_value attribute by opening the final json as a zarr store.

In [172]:
import zarr
def modify_fill_value(out):
    out_ = zarr.open(out)
    out_.lon.fill_value = -999
    out_.lat.fill_value = -999
    return out

def postprocess(out):
    out = modify_fill_value(out)
    return out

In [178]:
json_list = fs2.glob("air_pressure_at_mean_sea_level_combined.json") + fs2.glob("sea_surface_temperature_combined.json")

mzz = MultiZarrToZarr(json_list,                
    remote_protocol='s3',
    remote_options={'anon':True},
    concat_dims=['time0'],
    identical_dims = ['lat', 'lon'],
    postprocess = postprocess)

d = mzz.translate()

with fs2.open('combined.json', 'wb') as f:
    f.write(ujson.dumps(d).encode())

m = fsspec.filesystem("reference", fo=d).get_mapper("")
ds = xr.open_dataset(m, engine="zarr", consolidated=False)
ds

## Using the output

This is what a user of the generated dataset would do. This person does not need to have kerchunk installed, or even h5py (the library we used to initially scan the files). 

Here opening a previously generated kerchunk sidecar file that contains 10 ERA5 variables across a 2 and a half year span.

In [179]:
#The sidecar file has been compressed to make it small enough (~14M)to store on github, this decompresses it 
import zstandard
with open('ERA5_2020_2022_multivar.json.zst', 'rb') as compressed:
    decomp = zstandard.ZstdDecompressor()
    with open('kerchunk.json', 'wb') as destination:
        decomp.copy_stream(compressed, destination)

In [186]:
%%time
fs = fsspec.filesystem("reference", fo='kerchunk.json', ref_storage_args={'skip_instance_cache':True},
                       remote_protocol='s3', remote_options={'anon':True})
m = fs.get_mapper("")
ds = xr.open_dataset(m, engine="zarr", backend_kwargs={'consolidated':False}, chunks={'time0':96})
ds

CPU times: user 1.26 s, sys: 23.4 ms, total: 1.28 s
Wall time: 1.28 s


Unnamed: 0,Array,Chunk
Bytes,81.87 GiB,3.66 MiB
Shape,"(21168, 721, 1440)","(96, 100, 100)"
Count,26521 Tasks,26520 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 81.87 GiB 3.66 MiB Shape (21168, 721, 1440) (96, 100, 100) Count 26521 Tasks 26520 Chunks Type float32 numpy.ndarray",1440  721  21168,

Unnamed: 0,Array,Chunk
Bytes,81.87 GiB,3.66 MiB
Shape,"(21168, 721, 1440)","(96, 100, 100)"
Count,26521 Tasks,26520 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,81.87 GiB,3.66 MiB
Shape,"(21168, 721, 1440)","(96, 100, 100)"
Count,26521 Tasks,26520 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 81.87 GiB 3.66 MiB Shape (21168, 721, 1440) (96, 100, 100) Count 26521 Tasks 26520 Chunks Type float32 numpy.ndarray",1440  721  21168,

Unnamed: 0,Array,Chunk
Bytes,81.87 GiB,3.66 MiB
Shape,"(21168, 721, 1440)","(96, 100, 100)"
Count,26521 Tasks,26520 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,81.87 GiB,3.66 MiB
Shape,"(21168, 721, 1440)","(96, 100, 100)"
Count,26521 Tasks,26520 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 81.87 GiB 3.66 MiB Shape (21168, 721, 1440) (96, 100, 100) Count 26521 Tasks 26520 Chunks Type float32 numpy.ndarray",1440  721  21168,

Unnamed: 0,Array,Chunk
Bytes,81.87 GiB,3.66 MiB
Shape,"(21168, 721, 1440)","(96, 100, 100)"
Count,26521 Tasks,26520 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,81.87 GiB,3.66 MiB
Shape,"(21168, 721, 1440)","(96, 100, 100)"
Count,26521 Tasks,26520 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 81.87 GiB 3.66 MiB Shape (21168, 721, 1440) (96, 100, 100) Count 26521 Tasks 26520 Chunks Type float32 numpy.ndarray",1440  721  21168,

Unnamed: 0,Array,Chunk
Bytes,81.87 GiB,3.66 MiB
Shape,"(21168, 721, 1440)","(96, 100, 100)"
Count,26521 Tasks,26520 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,81.87 GiB,3.66 MiB
Shape,"(21168, 721, 1440)","(96, 100, 100)"
Count,26521 Tasks,26520 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 81.87 GiB 3.66 MiB Shape (21168, 721, 1440) (96, 100, 100) Count 26521 Tasks 26520 Chunks Type float32 numpy.ndarray",1440  721  21168,

Unnamed: 0,Array,Chunk
Bytes,81.87 GiB,3.66 MiB
Shape,"(21168, 721, 1440)","(96, 100, 100)"
Count,26521 Tasks,26520 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,81.87 GiB,3.66 MiB
Shape,"(21168, 721, 1440)","(96, 100, 100)"
Count,26521 Tasks,26520 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 81.87 GiB 3.66 MiB Shape (21168, 721, 1440) (96, 100, 100) Count 26521 Tasks 26520 Chunks Type float32 numpy.ndarray",1440  721  21168,

Unnamed: 0,Array,Chunk
Bytes,81.87 GiB,3.66 MiB
Shape,"(21168, 721, 1440)","(96, 100, 100)"
Count,26521 Tasks,26520 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,81.87 GiB,3.66 MiB
Shape,"(21168, 721, 1440)","(96, 100, 100)"
Count,26521 Tasks,26520 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 81.87 GiB 3.66 MiB Shape (21168, 721, 1440) (96, 100, 100) Count 26521 Tasks 26520 Chunks Type float32 numpy.ndarray",1440  721  21168,

Unnamed: 0,Array,Chunk
Bytes,81.87 GiB,3.66 MiB
Shape,"(21168, 721, 1440)","(96, 100, 100)"
Count,26521 Tasks,26520 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,81.87 GiB,3.66 MiB
Shape,"(21168, 721, 1440)","(96, 100, 100)"
Count,26521 Tasks,26520 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 81.87 GiB 3.66 MiB Shape (21168, 721, 1440) (96, 100, 100) Count 26521 Tasks 26520 Chunks Type float32 numpy.ndarray",1440  721  21168,

Unnamed: 0,Array,Chunk
Bytes,81.87 GiB,3.66 MiB
Shape,"(21168, 721, 1440)","(96, 100, 100)"
Count,26521 Tasks,26520 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,81.87 GiB,3.66 MiB
Shape,"(21168, 721, 1440)","(96, 100, 100)"
Count,26521 Tasks,26520 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 81.87 GiB 3.66 MiB Shape (21168, 721, 1440) (96, 100, 100) Count 26521 Tasks 26520 Chunks Type float32 numpy.ndarray",1440  721  21168,

Unnamed: 0,Array,Chunk
Bytes,81.87 GiB,3.66 MiB
Shape,"(21168, 721, 1440)","(96, 100, 100)"
Count,26521 Tasks,26520 Chunks
Type,float32,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,81.87 GiB,3.66 MiB
Shape,"(21168, 721, 1440)","(96, 100, 100)"
Count,26521 Tasks,26520 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 81.87 GiB 3.66 MiB Shape (21168, 721, 1440) (96, 100, 100) Count 26521 Tasks 26520 Chunks Type float32 numpy.ndarray",1440  721  21168,

Unnamed: 0,Array,Chunk
Bytes,81.87 GiB,3.66 MiB
Shape,"(21168, 721, 1440)","(96, 100, 100)"
Count,26521 Tasks,26520 Chunks
Type,float32,numpy.ndarray


In [187]:
ds = ds.sel(time0 = '2021-01-01T00:00:00')

import hvplot.xarray
import panel as pn
import cartopy.crs as ccrs

variables = list(ds.data_vars)

sel = pn.widgets.Select(options=variables, name='Data Variable')
pn.Column(sel, ds.hvplot.image(z=sel, cmap = 'viridis', 
                               coastline=True,projection=ccrs.Orthographic(25, 5),
                               global_extent=True, frame_height=540))