In [1]:
%load_ext jupyter_black

In [1]:
import re
import os
from pathlib import Path
import xarray as xr

FILE_NAME_PATTERN = re.compile(r"/([A-Za-z]+(?:-|_)?[A-Za-z]+)+")
data = Path(os.path.abspath(__name__)).parents[1] / "data"
data

PosixPath('/workspaces/mmmpy/data')

In [2]:
files = sorted((data / "MRMS_MergedReflectivity").glob("*grib2"))[:4]
files

[PosixPath('/workspaces/mmmpy/data/MRMS_MergedReflectivity/MRMS_MergedReflectivityQC_00.50_20220819-202841.grib2'),
 PosixPath('/workspaces/mmmpy/data/MRMS_MergedReflectivity/MRMS_MergedReflectivityQC_00.50_20220819-203041.grib2'),
 PosixPath('/workspaces/mmmpy/data/MRMS_MergedReflectivity/MRMS_MergedReflectivityQC_00.75_20220819-202841.grib2'),
 PosixPath('/workspaces/mmmpy/data/MRMS_MergedReflectivity/MRMS_MergedReflectivityQC_00.75_20220819-203041.grib2')]

In [4]:
def dims(ds: xr.Dataset) -> xr.Dataset:
    duplicates = ["heightAboveSea"]
    # if more than one file was passed the valid_time should be greater than 1
    if ds.valid_time.size > 1:
        # for which we add a new validTime dimension
        ds = ds.expand_dims({"validTime": ds["valid_time"].to_numpy()})
        duplicates.append("validTime")

    return ds.drop("valid_time").drop_duplicates(duplicates)


def name(ds: xr.Dataset) -> xr.Dataset:
    if len(ds.data_vars) != 1:
        # mrms grib2 data should only have one variable
        raise Exception
    (ds_name,) = ds
    # not storing history, will use the history object to infer a name
    hist = ds.attrs.pop("history", None)
    # if a name was not explicility provided
    # if not name:
    # use the known name if unknow infer one from the file name
    if ds_name != "unknown":
        name = ds_name
    else:
        name_list = FILE_NAME_PATTERN.findall(hist)
        if name_list:
            name = name_list[-1]
        else:
            raise Exception

    return ds.rename({ds_name: name})


def main():
    ds = (
        xr.open_mfdataset(
            files,
            chunks={},
            engine="cfgrib",
            data_vars="minimal",
            combine="nested",
            concat_dim=["heightAboveSea"],
            backend_kwargs=dict(
                mask_and_scale=True,
                decode_times=True,
                concat_characters=True,
                decode_coords=True,
                # use_cftime="%Y-%m",
                decode_timedelta=None,
                lock=None,
                indexpath="{path}.{short_hash}.idx",
                filter_by_keys={},
                read_keys=[],
                encode_cf=("parameter", "time", "geography", "vertical"),
                squeeze=True,
                time_dims={"valid_time"},
            ),
        )
        .pipe(dims)
        .pipe(name)
    )
    return ds


# ds = main()
# ds

In [8]:
mfds = ds.copy()
mfds.to_zarr(data / "mfds")

<xarray.backends.zarr.ZarrStore at 0x7fa2dc88e8f0>

In [11]:
xr.open_dataset(data / "mfds", engine="zarr")

In [5]:
def generate():
    for file in files[:4]:
        ds1 = xr.open_dataset(
            file,
            engine="cfgrib",
            chunks={},
            # data_vars="minimal",
            # combine="nested",
            # concat_dim=["heightAboveSea"],
            backend_kwargs=dict(
                mask_and_scale=True,
                decode_times=True,
                concat_characters=True,
                decode_coords=True,
                # use_cftime="%Y-%m",
                decode_timedelta=None,
                lock=None,
                indexpath="{path}.{short_hash}.idx",
                filter_by_keys={},
                read_keys=[],
                encode_cf=("parameter", "time", "geography", "vertical"),
                squeeze=True,
                time_dims={"valid_time"},
            ),
        )
        yield ds1.expand_dims(
            {
                "validTime": [ds1["valid_time"].to_numpy()],
                "heightAboveSea": [ds1["heightAboveSea"].to_numpy()],
            }
        ).drop_duplicates(["validTime", "heightAboveSea"]).drop("valid_time").pipe(
            name
        )


datasets = tuple(generate())

In [110]:
for x in xr.concat(datasets,dim="validTime").drop_duplicates("validTime").groupby("validTime"):
    print(x)

(numpy.datetime64('2022-08-19T20:28:00.000000000'), <xarray.Dataset>
Dimensions:                    (latitude: 3500, longitude: 7000,
                                heightAboveSea: 2)
Coordinates:
  * latitude                   (latitude) float64 54.99 54.98 ... 20.02 20.01
  * longitude                  (longitude) float64 230.0 230.0 ... 300.0 300.0
  * heightAboveSea             (heightAboveSea) float64 500.0 750.0
    validTime                  datetime64[ns] 2022-08-19T20:28:00
Data variables:
    MRMS_MergedReflectivityQC  (heightAboveSea, latitude, longitude) float32 dask.array<chunksize=(2, 3500, 7000), meta=np.ndarray>
Attributes:
    GRIB_edition:            2
    GRIB_centre:             161
    GRIB_centreDescription:  161
    GRIB_subCentre:          0
    Conventions:             CF-1.7
    institution:             161)
(numpy.datetime64('2022-08-19T20:30:00.000000000'), <xarray.Dataset>
Dimensions:                    (latitude: 3500, longitude: 7000,
                   

In [159]:
import shutil
import zarr

teststore = data / "test-bucket"
group_name = "3DRefl"

if teststore.exists():
    shutil.rmtree(teststore)
import numpy as np
for vt, ds in xr.concat(datasets,dim="validTime").drop_duplicates("validTime").groupby("validTime"):
    (dsname,) = ds
    ds = ds.expand_dims({"validTime":[vt]}).fillna(np.nan)

    ds["MRMS_MergedReflectivityQC"].attrs.clear() 
    if not teststore.exists():
        ds.to_zarr(
            teststore,
            mode="a",
            group=group_name,
            compute=True,
            
        )
    else:
        ds.drop(["latitude","longitude","heightAboveSea"]).to_zarr(
            teststore,
            mode="a",
            group=group_name,
            append_dim="validTime",
            compute=True,
        )

xr.open_zarr(teststore / group_name, consolidated=False)

  return to_zarr(  # type: ignore


Unnamed: 0,Array,Chunk
Bytes,373.84 MiB,186.92 MiB
Shape,"(2, 2, 3500, 7000)","(1, 2, 3500, 7000)"
Count,2 Graph Layers,2 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 373.84 MiB 186.92 MiB Shape (2, 2, 3500, 7000) (1, 2, 3500, 7000) Count 2 Graph Layers 2 Chunks Type float32 numpy.ndarray",2  1  7000  3500  2,

Unnamed: 0,Array,Chunk
Bytes,373.84 MiB,186.92 MiB
Shape,"(2, 2, 3500, 7000)","(1, 2, 3500, 7000)"
Count,2 Graph Layers,2 Chunks
Type,float32,numpy.ndarray


In [204]:
import pandas as pd
paths = pd.Series((data / "MRMS_MergedReflectivity").glob("*.grib2"))
vt=pd.to_datetime(paths.apply(lambda p: p.name).str.extract(r"(\d{8}-\d{6})",expand=False))
for vt,x in paths.groupby(vt):
    print(tuple(x))
    # print(x)
# pd.to_datetime(for p in paths)#pd.Series(x.name for x in (data / "MRMS_MergedReflectivity").glob("*.grib2")).str.extract(r"(\d{8}-\d{6})",expand=False))

(PosixPath('/workspaces/mmmpy/data/MRMS_MergedReflectivity/MRMS_MergedReflectivityQC_00.50_20220819-202841.grib2'), PosixPath('/workspaces/mmmpy/data/MRMS_MergedReflectivity/MRMS_MergedReflectivityQC_00.75_20220819-202841.grib2'), PosixPath('/workspaces/mmmpy/data/MRMS_MergedReflectivity/MRMS_MergedReflectivityQC_01.00_20220819-202841.grib2'), PosixPath('/workspaces/mmmpy/data/MRMS_MergedReflectivity/MRMS_MergedReflectivityQC_01.25_20220819-202841.grib2'), PosixPath('/workspaces/mmmpy/data/MRMS_MergedReflectivity/MRMS_MergedReflectivityQC_01.50_20220819-202841.grib2'), PosixPath('/workspaces/mmmpy/data/MRMS_MergedReflectivity/MRMS_MergedReflectivityQC_01.75_20220819-202841.grib2'), PosixPath('/workspaces/mmmpy/data/MRMS_MergedReflectivity/MRMS_MergedReflectivityQC_02.00_20220819-202841.grib2'), PosixPath('/workspaces/mmmpy/data/MRMS_MergedReflectivity/MRMS_MergedReflectivityQC_02.25_20220819-202841.grib2'), PosixPath('/workspaces/mmmpy/data/MRMS_MergedReflectivity/MRMS_MergedReflectivi

In [46]:
# type(Group)
import zarr
root:zarr.Group = zarr.group(teststore / group)
def unpack_root()->tuple[zarr.Array,...]:
    yield from (root[key] for key in ("latitude","longitude","heightAboveSea","validTime"))
x,y,z,t = unpack_root()
tuple(z)
# x,y,z,t :tuple[zarr.Array,...]= root["latitude"], root["longitude"],  root["heightAboveSea"], root["validTime"],
# x,y
# z.values
# g:zarr.Group = zarr.open(teststore / group)
# tuple(g.array_keys())
# g.info.obj
# g.info_items()
# g.store.items()
# g.c
# vt: np.datetime64 = ds["validTime"].values[0]
# (vt,) = pd.to_datetime(ds["validTime"].values).strftime("%Y-%m-%dT")

# ds["validTime"].to_numpy().astype("datetime64[m]")
# vt
# np.str
# import zarr

# g: zarr.Group = zarr.open(teststore / "3DRefl")
# cs: zarr.DirectoryStore = g.chunk_store
# g.get("validTime").size
# # print(g.get("validTime"))
# # tuple(g)

(500.0,)