In [1]:
import fsspec
import kerchunk.hdf
import ujson
import xarray as xr
import dask
import os

from tempfile import TemporaryDirectory

from kerchunk.combine import MultiZarrToZarr, merge_vars, drop
from kerchunk.hdf import SingleHdf5ToZarr
from tqdm import tqdm

### ALARO 4km output as netCDF files
Rerun of 3 years of 6-hourly ALARO forecasts run on ATOS
- forecast range: 60h
- hourly output

Stored as one netCDF-file per:
- starthour
- variable

Data available in s3-bucket at [LUMI-O](https://docs.lumi-supercomputer.eu/storage/lumio/)

In [2]:
# Create the S3 filesystem
fs = fsspec.filesystem('s3',endpoint_url="https://465001235.lumidata.eu",anon=True)

# Check the netCDF-files
nc_files = fs.glob("s3://public/*nc")
print(f"We have a total of {len(nc_files)} netCDF-files")
print("files:")
for file in nc_files[:10]:
    print(f"  - {os.path.basename(file)}")

# Create urls
urls = [f"s3://{url}" for url in nc_files]

We have a total of 8 netCDF-files
files:
  - msl_BE-04_AO40_EXT_WFP1_1hr_2022010100-2022010312.nc
  - msl_BE-04_AO40_EXT_WFP1_1hr_2022010106-2022010318.nc
  - msl_BE-04_AO40_EXT_WFP1_1hr_2022010112-2022010400.nc
  - msl_BE-04_AO40_EXT_WFP1_1hr_2022010118-2022010406.nc
  - t2m_BE-04_AO40_EXT_WFP1_1hr_2022010100-2022010312.nc
  - t2m_BE-04_AO40_EXT_WFP1_1hr_2022010106-2022010318.nc
  - t2m_BE-04_AO40_EXT_WFP1_1hr_2022010112-2022010400.nc
  - t2m_BE-04_AO40_EXT_WFP1_1hr_2022010118-2022010406.nc


In [3]:
# Check the contents of a netCDF-file
with fs.open(urls[0]) as fileObj:
    ds = xr.open_dataset(fileObj)

ds

## Using `kerchunk`
### 1. Creating a single file `.json` reference file

In [4]:
# Storage options
so = dict(
    endpoint_url="https://465001235.lumidata.eu",
    mode="rb",
    anon=True, 
    default_fill_cache=False, 
    default_cache_type='first'
)

# Function to generate json file
def generate_json_reference(url, out_path):
    # two options:
    # 1. open file directly with fsspec.open giving the storage options as dictionary (example here)
    # 2. Use the already created filesystem: fs.open() as done above
    with fsspec.open(url,**so) as infile:  
        h5chunks = SingleHdf5ToZarr(infile, url, inline_threshold=300)
        # inline threshold adjusts the Size below which binary blocks are included directly in the output
        # a higher inline threshold can result in a larger json file but faster loading 
        var = os.path.basename(url).split("_")[0]
        reftime = os.path.basename(url).split("_")[-1].split("-")[0]
        outfname = os.path.join(out_path,f"{var}-{reftime}.json")
        with open(outfname, "wb") as outfile:
            outfile.write(ujson.dumps(h5chunks.translate()).encode())
    return outfname


# Check for 1 file
# - create
reference_file = generate_json_reference(urls[0], "./")

# - read
fs_zarr = fsspec.filesystem(
    "reference",
    fo=reference_file,
    fs = fs,
    skip_instance_cache=True,
)

ds = xr.open_dataset(
    fs_zarr.get_mapper(""),
    engine="zarr",
    backend_kwargs={"consolidated": False}
)
ds
    

### 2. Creating a multifile `.json` reference file

In [5]:
# Mean Sea Level Pressure
nc_files = fs.glob("s3://public/msl*.nc")

# Create urls
urls = [f"s3://{url}" for url in nc_files]

# Create temporary directory
td = TemporaryDirectory()
temp_dir = td.name

In [6]:
# Create single reference files
output_files = []
for fil in tqdm(urls):
    outf = generate_json_reference(fil, temp_dir)
    output_files.append(outf)
output_files

# combine individual references into single consolidated reference
mzz = MultiZarrToZarr(
    output_files,
    identical_dims=["x", "y", "leadtime","Lambert_Conformal","lon","lat"], # dimensions/coordinate to keep constant
    coo_map={"reftime":"data:reftime"}, # special syntax to create a new dimension called "reftime" with values "reftime" from the individual netCDFs (data)
    concat_dims=["reftime"] # dimension along which to concatenate
)

multizarr = './msl_along_reftime.json'
with open(multizarr, 'wb') as f:
    f.write(ujson.dumps(mzz.translate()).encode())

100%|██████████| 4/4 [00:01<00:00,  3.03it/s]


In [7]:
# Check the json reference filie
fs_zarr = fsspec.filesystem(
    "reference",
    fo=multizarr,
    fs = fs,
    skip_instance_cache=True,
)

ds = xr.open_dataset(
    fs_zarr.get_mapper(""),
    engine="zarr",
    backend_kwargs={"consolidated": False}
)
ds


### 3. Merging different variables

In [8]:
# Get the different variables
nc_files = fs.glob("s3://public/*nc")
n_total_files = len(nc_files)
variables = []
for file in nc_files:
    var = os.path.basename(file).split("_")[0]
    if var not in variables:
        variables.append(var)


In [9]:
zarrs = []
#progress_bar = tqdm(
#    desc="creating zarrs",
#    total=n_total_files + len(variables), 
#    unit="tasks"
#)

for var in variables:
    # Create urls
    urls = fs.glob("s3://public/*nc")
    urls = [f"s3://{url}" for url in urls]
    output_files = []

    # Create single reference files
    for fil in urls:
        outf = generate_json_reference(fil, temp_dir)
        output_files.append(outf)
#        progress_bar.update(1)

    # Concatenate to 1 reference file per variable
    mzz = MultiZarrToZarr(
        output_files,
        identical_dims=["x", "y", "leadtime","Lambert_Conformal","lon","lat"], 
        coo_map={"reftime":"data:reftime"}, 
        concat_dims=["reftime"] 
    )

    zarr = os.path.join(temp_dir,f"{var}_along_reftime.json")
    zarrs.append(zarr)
    with open(zarr, 'wb') as f:
        f.write(ujson.dumps(mzz.translate()).encode())
#    progress_bar.update(1)


# Merge zarrs of different variables to 1 zarr
output_file = "alaro-4p0km.json"
mz = merge_vars(zarrs)
with open(output_file, "wb") as f:
    f.write(ujson.dumps(mz).encode())




In [10]:
# Check the json reference filie
fs_zarr = fsspec.filesystem(
    "reference",
    fo=output_file,
    fs = fs,
    skip_instance_cache=True,
)

ds = xr.open_dataset(
    fs_zarr.get_mapper(""),
    engine="zarr",
    backend_kwargs={"consolidated": False}
)
ds


You might notice that there has appeared an additional (useless) *height* coordinate.  
This coordinate is coming from the *t2m* netCDF files, we can get rid of it by applying by defining a preprocessing function  
and giving it to the `multiZarrToZarr` function

In [11]:
# drop is a pre-defined preprocessing function of kerchunk.combine 
# that drops the specified variables from the json reference file before combining them.
preproc = drop("height") 


In [12]:
# Same as above but now with pre-processing function
zarrs = []
#progress_bar = tqdm(
#    desc="creating zarrs",
#    total=n_total_files + len(variables), 
#    unit="tasks"
#)

for var in variables:
    # Create urls
    urls = fs.glob("s3://public/*nc")
    urls = [f"s3://{url}" for url in urls]
    output_files = []

    # Create single reference files
    for fil in urls:
        outf = generate_json_reference(fil, temp_dir)
        output_files.append(outf)
#        progress_bar.update(1)

    # Concatenate to 1 reference file per variable
    mzz = MultiZarrToZarr(
        output_files,
        identical_dims=["x", "y", "leadtime","Lambert_Conformal","lon","lat"], 
        coo_map={"reftime":"data:reftime"}, 
        concat_dims=["reftime"],
        preprocess=preproc  # <---------- Here we added the pre-processing function
    )

    zarr = os.path.join(temp_dir,f"{var}_along_reftime.json")
    zarrs.append(zarr)
    with open(zarr, 'wb') as f:
        f.write(ujson.dumps(mzz.translate()).encode())
#    progress_bar.update(1)


# Merge zarrs of different variables to 1 zarr
output_file = "alaro-4p0km.json"
mz = merge_vars(zarrs)
with open(output_file, "wb") as f:
    f.write(ujson.dumps(mz).encode())


In [13]:
# Check the json reference filie
fs_zarr = fsspec.filesystem(
    "reference",
    fo=output_file,
    fs = fs,
    skip_instance_cache=True,
)

ds = xr.open_dataset(
    fs_zarr.get_mapper(""),
    engine="zarr",
    backend_kwargs={"consolidated": False}
)
ds