In [1]:
from datetime import datetime, timedelta, timezone
from typing import cast

import icechunk
import numpy as np
import virtualizarr as vz
import xarray as xr
import zarr
from icechunk import VirtualChunkSpec
from obstore.store import MemoryStore, S3Store
from virtualizarr.manifests import ManifestArray
from virtualizarr.manifests.store import ObjectStoreRegistry
from zarr.abc.store import Store

from hrrrparser import HRRRParser
from hrrrparser.codecs import LEVEL_COORDINATES

#### AWS PDS HRRR bucket information

In [2]:
scheme = "s3://"
bucket = "noaa-hrrr-bdp-pds"
prefix = "hrrr.20250710/conus"

#### Configure the Virtualizarr parser to pre-generate all 18 values of the step dimension for appending.

In [3]:
parser = HRRRParser(steps=18)

#### Generate the ObjectStoreRegistry to be used for virtual chunk access.

In [4]:
object_store = S3Store(
    bucket=bucket,
    skip_signature=True,
)
registry = ObjectStoreRegistry({f"{scheme}{bucket}": object_store})

In [5]:
file_urls = [
    f"{scheme}{bucket}/{prefix.rstrip('/')}/hrrr.t22z.wrfsfcf16.grib2",
    f"{scheme}{bucket}/{prefix.rstrip('/')}/hrrr.t23z.wrfsfcf16.grib2",
]
file_urls

['s3://noaa-hrrr-bdp-pds/hrrr.20250710/conus/hrrr.t22z.wrfsfcf16.grib2',
 's3://noaa-hrrr-bdp-pds/hrrr.20250710/conus/hrrr.t23z.wrfsfcf16.grib2']

#### Use intermediate memory cache for scanning
The Gribberish library will need to scan every message in the GRIB file which can generate a lot of http requests to S3 and increase latency.
We can use obstore to create a MemoryStore, read the file into that MemoryStore and scan the data in the memory store as if it were actually coming from the S3 bucket but without the latency associated with a large number of requests

In [6]:
def cache_and_open_virtual_dataset(url, scheme, bucket, loadable_variables):
    store, path_in_store = registry.resolve(url)
    memory_store = MemoryStore()
    buffer = store.get(path_in_store).bytes()
    memory_store.put(path_in_store, buffer)
    cached_reg = ObjectStoreRegistry({f"{scheme}{bucket}": memory_store})
    vds = vz.open_virtual_dataset(
        url=url,
        parser=parser,
        registry=cached_reg,
        loadable_variables=loadable_variables,
    )
    return vds

#### Eagerly load GRIB file level variables
Several of the variables are extracted from message level metadata and are the same for all messages within the GRIB file.  For efficiency we eagerly load these variables so that we can later serialize them as native chunks rather than virtual chunks.

The GribberishCodec actually reads metadata from the first message in the file to extract values for LEVEL_COORDINATES and the other coordinate variables.  This is accomplished by having special logic for these variable keys in the codec's `_decode_single` method.  Icechunk expects to use the codec specified in the variable's `serializer` encoding key when serializing data, but since this codec is only intended for reading, the `_encode_single` method is not implemented in the codec so we need to remove this `serializer` key so Icechunk just writes the value directly.

In [7]:
loadable = LEVEL_COORDINATES + ["time", "step", "latitude", "longitude"]

vds1 = cache_and_open_virtual_dataset(
    url=file_urls[1],
    scheme=scheme,
    bucket=bucket,
    loadable_variables=loadable,
)

In [8]:
vds2 = cache_and_open_virtual_dataset(
    url=file_urls[0],
    scheme=scheme,
    bucket=bucket,
    loadable_variables=loadable,
)

In [9]:
def sanitize_variables(ds, loadable):
    for var in loadable:
        if var in ds:
            del ds[var].encoding["serializer"]

    for name, var in ds.variables.items():
        if "reference_date" in var.attrs:
            del var.attrs["reference_date"]
            del var.attrs["forecast_date"]
            del var.attrs["forecast_end_date"]

In [10]:
sanitize_variables(vds1, loadable)
sanitize_variables(vds2, loadable)

#### Configure Icechunk store
For testing we're using an in-memory Icechunk store.  We also need to configure the store so that it is authorized to read chunks data from the GRIB files stored in S3.

This is a multi-step process, we configure a `VirtualChunkContainer` that maps a url prefix to an Icechunk I/O store that can be used for reading chunks.  We also need to configure the container credentials that the store will use to authorize S3 requests.  In this case our S3 bucket allows anonymous access.

In [11]:
storage = icechunk.in_memory_storage()
config = icechunk.RepositoryConfig.default()
s3_chunk_store = icechunk.s3_store(
    region="us-east-1",
)
config.set_virtual_chunk_container(
    icechunk.VirtualChunkContainer(f"{scheme}{bucket}/", s3_chunk_store)
)
credentials = icechunk.containers_credentials(
    {f"{scheme}{bucket}/": icechunk.s3_anonymous_credentials()}
)
repo = icechunk.Repository.open_or_create(
    storage=storage, config=config, authorize_virtual_chunk_access=credentials
)
session = repo.writable_session("main")

#### Modify the time variable encoding
Icechunk doesn't yet support the extension `dtypes` recently added to Zarr so we can't directly serialize a `datetime64[s]` value.  We update the time variable encoding to use standard CF "seconds since" encoding.

In [12]:
encoding = vds1.time.encoding
encoding["units"] = "seconds since 1970-01-01"
encoding["calendar"] = "standard"
encoding["dtype"] = "int64"
vds1.time.encoding = encoding
vds2.time.encoding = encoding

#### Container validation.
Some of our `ChunkEntry`s use the Virtualizarr convention of `path=""` to represent an empty chunk.  Currently there is a small issue in `to_icechunk` which fails when validating these `ChunkEntry`s so we need to use `validate_containers=False` until a fix is applied.

In [13]:
vds1.vz.to_icechunk(session.store, validate_containers=False)

#### Utility functions for region writing

In [14]:
def generate_chunk_key(
    index: tuple[int, ...],
    time_index: int,
) -> list[int]:
    index_list = list(index)
    index_list[0] = time_index
    return index_list


def get_time_index(store: Store, time: np.datetime64):
    time_array = zarr.open_array(store, path="time", mode="r")
    epoch = np.datetime64("1970-01-01T00:00:00")
    seconds_since_epoch = (time - epoch) / np.timedelta64(1, "s")
    encoded_time = int(seconds_since_epoch)

    chunk_size = time_array.chunks[0] if time_array.chunks else 1000

    for i in range(0, time_array.shape[0], chunk_size):
        end_idx = min(i + chunk_size, time_array.shape[0])
        chunk = time_array[i:end_idx]

        # Find encoded value in current chunk
        local_indices = np.where(chunk == encoded_time)[0]
        if len(local_indices) > 0:
            return i + int(local_indices[0])

    return None


def extend_time_dimension(store: Store, time: np.datetime64):
    time_array = zarr.open_array(store, path="time", mode="a")
    old_len = time_array.shape[0]
    new_len = old_len + 1
    time_array.resize((new_len,))
    new_index = new_len - 1
    time_array[new_index] = time
    return new_index


def write_virtual_variable_region(
    name: str,
    var: xr.Variable,
    store: Store,
    time_index: int,
    increment_time: bool,
):
    ma = cast(ManifestArray, var.data)
    manifest = ma.manifest

    it = np.nditer(
        [manifest._paths, manifest._offsets, manifest._lengths],  # type: ignore[arg-type]
        flags=[
            "refs_ok",
            "multi_index",
            "c_index",
        ],
        op_flags=[["readonly"]] * 3,  # type: ignore
    )

    if increment_time:
        arr = zarr.open_array(store, path=name, mode="a")
        new_shape = list(arr.shape)
        new_shape[0] += 1
        arr.resize(tuple(new_shape))

    last_updated_at = datetime.now(timezone.utc) + timedelta(seconds=1)
    virtual_chunk_spec_list = [
        VirtualChunkSpec(
            index=generate_chunk_key(it.multi_index, time_index=time_index),
            location=path.item(),
            offset=offset.item(),
            length=length.item(),
            last_updated_at_checksum=last_updated_at,
        )
        for path, offset, length in it
        if path
    ]

    store.set_virtual_refs(
        array_path=name,
        chunks=virtual_chunk_spec_list,
        validate_containers=False,  # we already validated these before setting any refs
    )

#### Check if the GRIB forecast time already exists in the Icechunk store.
If it doesn't exist extend the time dimension with the value from the GRIB file to be appended.

In [15]:
time_index = get_time_index(store=session.store, time=vds2.time[0].values)
increment_time = False
if time_index is None:
    time_index = extend_time_dimension(store=session.store, time=vds2.time[0].values)
    increment_time = True

#### Write each virtual variable to the time/step region in the Icechunk store
If the Icechunk store variable needs to be extended with the new forecast time extend it.

In [16]:
virtual_variables = {
    name: var
    for name, var in vds2.variables.items()
    if isinstance(var.data, ManifestArray)
}

for name, var in virtual_variables.items():
    write_virtual_variable_region(
        name=name,
        var=var,
        store=session.store,
        time_index=time_index,
        increment_time=increment_time,
    )

In [18]:
ds = xr.open_zarr(session.store, group="/", zarr_format=3, consolidated=False)
ds

In [19]:
ds["tmp_isobar"].isel(y=100, x=100, step=16).values

array([[  0.        ,   0.        , 266.54208374, 283.99029541,
        295.74993896, 290.52233887, 291.21044922],
       [  0.        ,   0.        , 266.31001282, 283.92590332,
        295.52978516, 292.79974365, 290.33868408]])

In [20]:
ds["tmp_isobar"].isel(y=100, x=100).sel(
    time=np.datetime64("2025-07-10T18:00:00.000000000"), step=np.timedelta64(16, "h")
).values

array([  0.        ,   0.        , 266.31001282, 283.92590332,
       295.52978516, 292.79974365, 290.33868408])