In [None]:
import datetime
import glob
import os
import re
import shutil
import sys
import tempfile
import uuid
import warnings
from datetime import datetime, timedelta, timezone
from functools import reduce
from pathlib import Path, PurePath

import dask
import diskcache as dc
import netCDF4 as nc
import numpy as np
import pandas as pd
import xarray as xr  # >= 2022.11
import zarr
from tqdm.notebook import tqdm

# Overwrite defaults: ask xarray to keep `attrs` on objects through operations
# (global option, affects every xarray call made after this cell).
xr.set_options(keep_attrs=True)

In [None]:
# debug
import inspect
import types
from typing import cast

# this_function_name = cast(types.FrameType, inspect.currentframe()).f_code.co_name

In [None]:
def rebase_path(path_base="urbisphere-dm", path_root=None):
    """Return the absolute path of a higher-level directory.

    The path `path_root` is cut after the first component that equals
    `path_base`. When no component matches, the full path is returned.

    Parameters
    ----------
    path_base : str
        Name of the directory component to cut at (kept in the result).
    path_root : str or path-like, optional
        Path to truncate. When falsy, the resolved parent directory of the
        relative path "__file__" is used.

    Returns
    -------
    str
        The truncated path.
    """
    from pathlib import Path

    if not path_root:
        path_root = Path("__file__").parent.resolve()

    parts = Path(path_root).parts
    if path_base in parts:
        # keep everything up to and including the first match
        parts = parts[: parts.index(path_base) + 1]
    return str(Path(*parts))


# Make the project's notebook and source directories importable, relative to
# the repository root returned by `rebase_path()`.
sys.path.extend(
    os.path.join(rebase_path(), subdir)
    for subdir in (
        "interfaces/metadb/notebooks/",
        "interfaces/metadb/src/",
        "interfaces/filedb/notebooks/",
        "common/plotly/notebooks/",
    )
)

from ipynb.fs.defs.filedb_datastore import filedb_decode, get_data_dict

# from ipynb.fs.defs.metadb_query import googlesheet_query as vocabulary_query
from ipynb.fs.defs.metadb_vocabulary import (
    dataframe_query,
    metadb_units_conv,
    metadb_vocabulary_query,
)

# from ipynb.fs.defs.metadb_attributes import metadb_combine_globalattrs
from metadb_attributes import metadb_combine_globalattrs

In [None]:
class StopExecution(Exception):
    """Exception whose `_render_traceback_` hook renders nothing.

    NOTE(review): presumably raised to stop a notebook cell quietly, relying on
    IPython calling `_render_traceback_` -- confirm against the call sites.
    """

    def _render_traceback_(self):
        # Nothing to render: returns None.
        return None


def get_dictlist_permutations(input_subset):
    """Expand a dict of (strs|lists) into all possible permutations.

    Every value that is not a list is treated as a one-element list; the
    result is the cartesian product over all values.

    Parameters
    ----------
    input_subset : dict
        Mapping of key -> value or list of values.

    Returns
    -------
    list of dict
        One dict per combination, in `itertools.product` order. An empty
        input yields `[{}]` (the single empty combination); a key with an
        empty list yields `[]`.
    """
    import itertools

    # Read keys and values separately: `zip(*items)` cannot be unpacked when
    # the dict is empty.
    keys = list(input_subset)
    value_lists = [v if isinstance(v, list) else [v] for v in input_subset.values()]
    return [dict(zip(keys, combo)) for combo in itertools.product(*value_lists)]


def get_dictlist_flatten(input_subset, joinstr="+"):
    """Flatten dict of (strs|lists) into dict of (strs).

    Parameters
    ----------
    input_subset : dict
        Mapping of key -> value or list of strings.
    joinstr : str
        Separator used to join list values.

    Returns
    -------
    dict
        Same keys; list values are joined with `joinstr`, other values are
        passed through unchanged. An empty input yields an empty dict.
    """
    return {
        key: joinstr.join(value) if isinstance(value, list) else value
        for key, value in input_subset.items()
    }

In [None]:
def get_creation_time(d=None):
    """Return an ISO 8601 timestamp (UTC, whole seconds) for `d`.

    Parameters
    ----------
    d : datetime, optional
        Time to format. Its `tzinfo` is replaced by UTC without converting the
        clock time. When omitted, the current UTC time at call time is used
        (a `datetime.utcnow()` default argument would be evaluated only once,
        when the function is defined).

    Returns
    -------
    str
        E.g. "2020-01-02T03:04:05+00:00".
    """
    if d is None:
        d = datetime.now(timezone.utc)
    return d.replace(microsecond=0, tzinfo=timezone.utc).isoformat()


# def get_gattrs(gattrs):
#     ga = gattrs[0]
#     gs = {}
#     if len(ga) > 1:
#         for g in gattrs[1:]:
#             bo = False
#             for k in gs.keys():
#                 if k in g and not bo:
#                     bo = any(re.compile(m).match(gs[k]) for m in g[k])
#                     if bo:
#                         for k, v in g.items():
#                             if k in ga:
#                                 ga[k] = v

#     gd = {
#         "version_id": version["id"],
#         "version_date": version["time"],
#         "creation_time": get_creation_time(),
#     }
#     ga = {k: "; ".join(v) if isinstance(v, list) else v for k, v in ga.items()}
#     ga = {k: v.format(**gd) for k, v in ga.items()}
#     return ga

In [None]:
def get_gattrs(gattrs, sep=";\n"):
    """Build a dict of global attributes from the first entry of `gattrs`.

    List values are joined with `sep`; afterwards every value is passed
    through `str.format` with the placeholders `version_id`, `version_time`,
    `version_date`, `creation_time` and `creation_date`.

    Reads the module-level name `version` (keys "id" and "time"), which is not
    defined in this function.

    Parameters
    ----------
    gattrs : sequence of dict
        `gattrs[0]` supplies the attributes. Further entries are iterated but,
        as written, never change the result (see the review note below).
    sep : str
        Separator used to join list values.

    Returns
    -------
    dict
        New dict of formatted attribute values. Every value must provide a
        `.format` method after the join step.
    """
    ga = gattrs[0]
    gs = {}
    # NOTE(review): `gs` is never populated, so the `for k in gs.keys()` body
    # below never runs and the entries in `gattrs[1:]` have no effect.
    # NOTE(review): `len(ga) > 1` counts the keys of the first dict; presumably
    # `len(gattrs) > 1` was intended -- confirm before relying on overrides.
    if len(ga) > 1:
        for g in gattrs[1:]:
            bo = False
            for k in gs.keys():
                if k in g and not bo:
                    # values of `g[k]` are used as regex patterns against `gs[k]`
                    bo = any(re.compile(m).match(gs[k]) for m in g[k])
                    if bo:
                        # would overwrite matching keys of `gattrs[0]` in place
                        for k, v in g.items():
                            if k in ga:
                                ga[k] = v

    ct = get_creation_time()
    # placeholder values available to the attribute templates
    gd = {
        "version_id": version["id"],
        "version_time": version["time"],
        "version_date": version["time"],  # same value as `version_time`
        "creation_time": ct,
        "creation_date": ct[0:10],  # first 10 characters of the timestamp
    }
    ga = {k: sep.join(v) if isinstance(v, list) else v for k, v in ga.items()}
    ga = {k: v.format(**gd) for k, v in ga.items()}
    return ga


def get_gattrs_pub(gattrs):
    """Collect publication attributes for every entry with a production profile.

    For each dict in `gattrs` that has a "production_profile" key,
    `metadb_publication_query` is called with that profile and the publication
    name "datasets_default". Empty-string values are dropped from each response
    and the remainder is merged; later entries override earlier ones.

    Returns
    -------
    dict
        The merged publication attributes (empty when no entry qualifies).
    """
    merged = {}
    for entry in gattrs:
        if "production_profile" not in entry:
            continue
        response = metadb_publication_query(
            entry["production_profile"], publication_name="datasets_default"
        )
        merged.update(
            (key, value) for key, value in response.items() if value != ""
        )
    return merged

In [None]:
def parse_config(ioconf_file, ioconf_name=None, version_dict=None):
    """Read configuration file and extract dict that matches version['id'].

    Parameters
    ----------
    ioconf_file : str or path-like
        Path of the TOML configuration file.
    ioconf_name : str, optional
        Name of the top-level group to look up. Defaults to the file stem of
        `ioconf_file`.
    version_dict : dict, optional
        Mapping with an "id" key. When given, the entries of the group whose
        `version.id` is a prefix of `version_dict["id"]` are deep-merged, later
        entries overriding earlier ones. Entries without a "version" table
        match any id that starts with "v".

    Returns
    -------
    dict or None
        Without `version_dict`: the parsed file, or None when the file does
        not exist. With `version_dict`: the merged settings, or `{}` when the
        file, the group, or a matching entry is missing.
    """

    def read_config(path):
        """Parse the TOML file at `path`; return None if it does not exist."""
        if not os.path.exists(path):
            return None
        # deferred: only needed when there is a file to parse
        import toml

        with open(path) as f:
            return toml.load(f)

    # read TOML config file (the parameters are used here, not same-named globals)
    ioconf = read_config(ioconf_file)

    if not isinstance(version_dict, dict):
        return ioconf

    # lookup of settings
    group = ioconf_name if ioconf_name else Path(ioconf_file).stem
    if not ioconf or group not in ioconf:
        return {}

    conf_list = [
        d
        for d in ioconf[group]
        if version_dict["id"].startswith(d["version"]["id"] if "version" in d else "v")
    ]

    from mergedeep import merge

    # Merge into a fresh dict: an empty `conf_list` then yields `{}`, and the
    # parsed configuration is not modified in place.
    return merge({}, *conf_list)

## Definitions

In [None]:
def sort_prefix(c_list, prefix=None):
    """Order the names in `c_list` by the prefixes they start with.

    Names are grouped in the order of `prefix` (default: the lowercase ASCII
    letters), keeping their relative order within a group and dropping
    repeats. Names that match no prefix follow, in their original order.

    Parameters
    ----------
    c_list : list of str
        Names to reorder.
    prefix : iterable of str, optional
        Prefixes that define the group order.

    Returns
    -------
    list of str
    """
    if not prefix:
        from string import ascii_lowercase

        prefix = list(ascii_lowercase)

    # dict keys keep first-seen order and drop repeats
    ordered = {}
    for start in prefix:
        for name in c_list:
            if name.startswith(start):
                ordered.setdefault(name, None)

    matched = list(ordered)
    unmatched = [name for name in c_list if name not in matched]
    return matched + unmatched


def reset_coords(ds, sortby=None):
    """Reorder the variables of `ds` by name prefix and restore its coordinates.

    All coordinates are first turned into data variables, the variables are
    selected in `sort_prefix` order, and the former coordinates that are not
    dimensions are marked as coordinates again.

    Parameters
    ----------
    ds : xarray.Dataset
    sortby : iterable of str, optional
        Prefix order handed to `sort_prefix`.

    Returns
    -------
    xarray.Dataset
    """
    coord_order = sort_prefix(list(ds.coords), prefix=sortby)
    variable_order = sort_prefix(list(ds.variables), prefix=sortby)

    reordered = ds.reset_coords()[variable_order]
    non_dim_coords = [name for name in coord_order if name not in reordered.dims]
    return reordered.set_coords(non_dim_coords)

In [None]:
def preprocess_reduce(ds, drop_vars=None, drop_dims=None):
    """Drop dimensions/variables from `ds` and reduce coordinates to one dimension.

    Parameters
    ----------
    ds : xarray.Dataset
    drop_vars : list of str, optional
        Variables or coordinate variables to drop when present.
    drop_dims : list of str, optional
        Dimensions to drop when present.

    Returns
    -------
    xarray.Dataset
        Dataset in which every coordinate that had more than one dimension is
        replaced by its slice at index 0 along all dimensions other than the
        one named by the coordinate's prefix (the text before the first "_").
    """
    import logging

    drop_vars = drop_vars or []
    drop_dims = drop_dims or []

    # drop dims
    ds = ds.drop_dims([k for k in drop_dims if k in ds.dims])

    # drop coordinates/variables (best effort: a failure is logged and stops
    # further drops; it is not silently swallowed)
    try:
        for n in drop_vars:
            if n in ds.variables:
                ds = ds.drop_vars([n])  # drop variables and coordinate variables
    except Exception:
        logging.warning(
            "Unable to drop all of the variables `%s`.", drop_vars, exc_info=True
        )

    # drop dims from coords
    for c in list(ds.coords):
        c_dims = list(ds.coords[c].dims)
        if len(c_dims) > 1:
            # the coordinate name prefix names the dimension to keep
            c_dim = c.split("_", 1)[0]
            first_along_others = {k: 0 for k in c_dims if k != c_dim}
            ds = ds.assign_coords(
                **{c: ds[c].isel(**first_along_others, drop=True)}
            )

    return ds

In [None]:
def preprocess_merge(ds):
    """Deduplicate `time` and index the station/system/attributes coordinates.

    Coordinates are grouped by the prefixes "system_", "station_" and
    "attributes_". A group with several coordinates becomes the index of the
    dimension named by the prefix; the sole coordinate of a one-member group
    is (re)marked as a coordinate.

    Parameters
    ----------
    ds : xarray.Dataset
        Must contain a `time` variable.

    Returns
    -------
    xarray.Dataset
    """
    # deduplicate and sort (time)
    _, first_seen = np.unique(ds["time"], return_index=True)
    ds = ds.isel(time=first_seen).sortby("time")

    # collect the coordinates per prefix, skipping prefixes without members
    groups = {}
    for prefix in ("system", "station", "attributes"):
        members = [name for name in list(ds.coords) if name.startswith(f"{prefix}_")]
        if len(members) > 0:
            groups[prefix] = members

    # set multi-index
    multi = {dim: names for dim, names in groups.items() if len(names) > 1}
    single = [names[0] for names in groups.values() if len(names) == 1]
    ds = ds.set_index(multi)
    return ds.set_coords(single)


def postprocess_merge(ds):
    """Remove every index from `ds` except the one on `time`.

    Indexes on dimensions are reset to plain coordinates, indexes that do not
    belong to a dimension are dropped, and a warning is logged if anything
    other than `time` is still indexed afterwards.

    Parameters
    ----------
    ds : xarray.Dataset

    Returns
    -------
    xarray.Dataset
    """
    # `logging` is not imported in the module's import cell
    import logging

    # reset the indexes of all dimensions but `time`
    idx = [n for n in ds.indexes if n in ds.dims and n not in ["time"]]
    if idx:
        ds = ds.reset_index(idx)

    # drop single-index
    idx = [n for n in ds.indexes if n not in ds.dims]
    if idx:
        ds = ds.drop_indexes(idx)

    # warning exception
    idx = [n for n in ds.indexes if n not in ["time"]]
    if idx:
        logging.warning("Not all indexes could be reset.")

    return ds


def merge_reduce(ds_list, merge_opts=dict(combine_attrs="drop_conflicts")):
    """Merge a list of datasets pairwise, from left to right.

    Parameters
    ----------
    ds_list : list of xarray.Dataset
    merge_opts : dict
        Keyword arguments passed to every `xr.merge` call (not modified here).

    Returns
    -------
    xarray.Dataset or None
        The merged dataset; None for an empty list.
    """
    dx = None
    for ds in tqdm(ds_list, disable=len(ds_list) < 3, desc="merge_reduce"):
        # Compare with None: the truth value of a Dataset depends on its data
        # variables, so `not dx` would discard an accumulated dataset that
        # holds coordinates only.
        if dx is None:
            dx = ds
        else:
            dx = xr.merge([dx, ds], **merge_opts)

    return dx

In [None]:
def preprocess_nc(ds):
    """Remove the `station`/`system` indexes and rename the dimension coordinates.

    For each of the two dimensions: an index on the dimension is reset (if it
    is a `pd.MultiIndex`) or dropped, every index whose name starts with the
    dimension name is reset, and a remaining coordinate named like the
    dimension is renamed to "<dimension>_group".

    Parameters
    ----------
    ds : xarray.Dataset

    Returns
    -------
    xarray.Dataset
    """

    def is_dim_coord(obj, name):
        """True when `name` is both a dimension and a coordinate of `obj`."""
        return name in obj.dims and name in obj.coords

    for dim in ("station", "system"):
        # reset single or multi-index
        if is_dim_coord(ds, dim) and dim in ds.indexes:
            is_multi = isinstance(ds.indexes[dim], pd.MultiIndex)
            # multi-index levels become coordinates; a single index is dropped
            ds = ds.reset_index([dim]) if is_multi else ds.drop_indexes([dim])

        # reset single-index, while making no assumptions on the suffix
        prefixed = [name for name in ds.indexes if name.startswith(dim)]
        if prefixed:
            ds = ds.reset_index(prefixed)

        # reset single-index, while making assumptions on the suffix
        if is_dim_coord(ds, dim):
            ds = ds.rename_vars({dim: f"{dim}_group"})

    return ds


def preprocess_store(ds):
    """Remove the `station`/`system` dimension indexes before storing `ds`.

    For each of the two dimensions: an index on the dimension is reset (if it
    is a `pd.MultiIndex`) or dropped, and a remaining coordinate named like
    the dimension is renamed to "<dimension>_group".

    Parameters
    ----------
    ds : xarray.Dataset

    Returns
    -------
    xarray.Dataset
    """

    def is_dim_coord(obj, name):
        """True when `name` is both a dimension and a coordinate of `obj`."""
        return name in obj.dims and name in obj.coords

    for dim in ("station", "system"):
        # reset single or multi-index
        if is_dim_coord(ds, dim) and dim in ds.indexes:
            is_multi = isinstance(ds.indexes[dim], pd.MultiIndex)
            # multi-index levels become coordinates; a single index is dropped
            ds = ds.reset_index([dim]) if is_multi else ds.drop_indexes([dim])

        # reset single-index, while making assumptions on the suffix
        if is_dim_coord(ds, dim):
            ds = ds.rename_vars({dim: f"{dim}_group"})

    return ds


def preprocess_attrs(ds):
    """Remove the "coordinates" attribute from every data variable of `ds`.

    The attributes of each data variable are cleared and re-assigned without
    the "coordinates" entry; all other attributes are kept.

    Parameters
    ----------
    ds : xarray.Dataset
        Modified through item assignment and returned.

    Returns
    -------
    xarray.Dataset
    """
    for name in list(ds.data_vars):
        original = ds[name].attrs.copy()
        kept = {key: value for key, value in original.items() if key != "coordinates"}
        ds[name].attrs = {}
        ds[name] = ds[name].assign_attrs(kept)
    return ds

In [None]:
def get_input_files(input_subset, default_subset={}, custom_subset={}, debug=False):
    """Expand the input location templates into concrete file name patterns.

    Parameters
    ----------
    input_subset : dict
        Mapping of template field -> value or list of values; every
        permutation is substituted into the file templates.
    default_subset : dict, optional
        Mapping with the keys "path_base", "path" and "file", where "file" is
        a dict (or list of dicts) with the keys "type", "path" and "file".
        When empty, the module-level `input_path_base`, `input_path` and
        `input_file` are used.
    custom_subset : dict, optional
        Extra template fields; they override the fields of `input_subset`.
    debug : bool
        Unused.

    Returns
    -------
    tuple
        `(input_files, fn_list)`: the list of location templates (dicts with
        "type", "path", "file") and the list of formatted entries (dicts with
        "type", "file").
    """
    if not default_subset:
        default_subset = dict(
            path_base=input_path_base,
            path=input_path,
            file=input_file,
        )

    def force_list(x):
        if isinstance(x, list):
            return x
        return [x]

    location_grid = {
        "path_base": force_list(default_subset["path_base"]),
        "path": force_list(default_subset["path"]),
        "file": default_subset["file"],
    }

    # one template per combination of base path, path and file definition
    input_files = []
    for combo in get_dictlist_permutations(location_grid):
        file_def = combo["file"]
        input_files.append(
            {
                "type": file_def["type"],
                "path": os.path.join(combo["path_base"], combo["path"], file_def["path"]),
                "file": file_def["file"],
            }
        )

    # expand input_files
    fn_list = []
    for template in input_files:
        pattern = os.path.join(template["path"], template["file"])
        for fields in get_dictlist_permutations(input_subset):
            fn_list.append(
                {
                    "type": template["type"],
                    "file": pattern.format(**{**fields, **custom_subset}),
                }
            )
    return (input_files, fn_list)


def get_output_files(dx, default_subset={}, custom_subset={}):
    """Format the output file path for a dataset.

    The template fields are collected from, in increasing priority: the
    dataset `dx` together with the module-level `input_subset` and
    `query_range` (only when `dx` is given), `default_subset`, and
    `custom_subset`.

    Parameters
    ----------
    dx : xarray.Dataset or None
        Dataset whose `station_id`, `system_id` and `sensor_id` coordinates
        fill the fields of the same name. Pass None to use only the subsets.
    default_subset : dict, optional
        Template fields; usually carries the "path" and "file" templates.
    custom_subset : dict, optional
        Template fields that override all others.

    Returns
    -------
    str
        `os.path.join(path, file)` formatted with the collected fields.

    Raises
    ------
    KeyError
        If neither the subsets nor a module-level `output_files` provide the
        "path" and "file" templates, or a template field is missing.
    """
    if dx is not None:
        # dataset lookup; every id is guarded by the presence of its own coordinate
        coords = list(dx.coords)
        s_id = list(dx["station_id"].values) if "station_id" in coords else [""]
        i_id = list(dx["system_id"].values) if "system_id" in coords else [""]
        ii_id = list(dx["sensor_id"].values) if "sensor_id" in coords else [""]

        # modify output subset dict, based on input settings
        output_subset = get_dictlist_flatten(input_subset)
        output_subset["station_id"] = "".join(s_id) if len(s_id) == 1 else ""
        output_subset["system_id"] = "+".join(i_id)
        output_subset["sensor_id"] = "+".join(ii_id)
        output_subset["time_bounds"] = get_time_bounds(query_range)
        output_subset = {**output_subset, **default_subset}
    else:
        output_subset = default_subset

    output_subset = {**output_subset, **custom_subset}

    # Templates given by the caller take precedence; a module-level
    # `output_files` is only the fallback. It is looked up via `globals()`
    # because assigning the name locally would make it a local variable.
    if "path" in output_subset and "file" in output_subset:
        templates = {"path": output_subset["path"], "file": output_subset["file"]}
    elif "output_files" in globals():
        templates = globals()["output_files"]
    else:
        raise KeyError("path" if "path" not in output_subset else "file")

    output_file = os.path.join(templates["path"], templates["file"]).format(
        **output_subset
    )
    return output_file


def get_time_bounds(query_range):
    """Format the first two timestamps of `query_range` as a file-name token.

    Parameters
    ----------
    query_range : object providing `.strftime(fmt).tolist()`
        E.g. a `pandas.DatetimeIndex` with at least two entries.

    Returns
    -------
    str
        "<start>_<end>" in the format "%Y%m%dT%H%M%S%z", or only "<start>"
        when both formatted values are equal.
    """
    stamps = query_range.strftime("%Y%m%dT%H%M%S%z").tolist()
    start, end = stamps[0], stamps[1]
    if start == end:
        return start
    return f"{start}_{end}"

In [None]:
def datasets_fsspec_args(filepath):
    """Generate a compounded url for fsspec, if relevant, plus arguments for xarray.

    Parameters
    ----------
    filepath : str or path-like
        Local file path; it is resolved to an absolute path.

    Returns
    -------
    tuple
        `(url, args)`. `url` is "zip::file://<path>" when the name ends in
        ".zip" preceded by another suffix, otherwise the plain resolved path.
        `args` holds `engine="zarr"` for ".zarr" or `engine="netcdf4"` for
        ".nc" (the suffix before a trailing ".zip" counts); it is empty for
        any other or missing suffix.
    """
    url = ""
    args = {}

    p = Path(filepath).resolve()
    suffixes = p.suffixes

    # is the path an archive?
    if len(suffixes) > 1 and suffixes[-1] == ".zip":
        url += "zip::"

        # define the suffix, in case of a trailing archive suffix
        sufx = suffixes[-2]
    else:
        # `p.suffix` is "" for a name without any suffix, where indexing
        # `p.suffixes[-1]` would raise an IndexError
        sufx = p.suffix

    engine = {".zarr": "zarr", ".nc": "netcdf4"}.get(sufx)
    if engine is not None:
        args["engine"] = engine

    # add the file path to the url
    if url != "":
        url += "file://"

    url += f"{p}"  # local files

    return url, args

In [None]:
def get_ds_index(ds):
    """Collect the system/sensor identification of `ds` as a dict of strings.

    For each of the coordinates `system_group`, `system_name` and
    `sensor_name` present in `ds`, the squeezed values are joined into one
    string. When only `sensor_name` is available, its value is also used for
    `system_name`.

    Parameters
    ----------
    ds : xarray.Dataset

    Returns
    -------
    dict
        Key order follows the order in which the entries are added.
    """
    # equivalent: functionality similar to `preprocess_reduce`
    a_dict = {}
    for key in ("system_group", "system_name", "sensor_name"):
        if key in list(ds.coords):
            a_dict[key] = "".join(ds[key].squeeze().values.tolist())

    if "sensor_name" in a_dict and "system_name" not in a_dict:
        a_dict["system_name"] = a_dict["sensor_name"]
    return a_dict


def datastore_units_query(datastore_df):
    """Query the unit-conversion vocabulary for every file group in `datastore_df`.

    The first file of each (station_id, system_group, system_id) group is
    opened to read its system index (`get_ds_index`). The distinct indexes are
    sent to `metadb_vocabulary_query` (production level "L1") and the
    responses are assembled into one data frame.

    Parameters
    ----------
    datastore_df : pandas.DataFrame
        Frame with two-level columns; uses ("encoded", "station_id"),
        ("decoded", "system_group"), ("decoded", "system_id") and the
        ("location", ...) columns "path_base", "path" and "file".

    Returns
    -------
    pandas.DataFrame
        Indexed by the query fields, with (outer, inner) response keys as
        columns.
    """
    import logging

    # unit conversion meta data for each file group
    dx = []
    for datastore_id, datastore in datastore_df.groupby(
        [
            ("encoded", "station_id"),
            ("decoded", "system_group"),
            ("decoded", "system_id"),
        ]
    ):
        # the first file represents the group
        ifn_series = datastore.iloc[0]
        fn = Path(
            *ifn_series[
                [("location", "path_base"), ("location", "path"), ("location", "file")]
            ].tolist()
        )

        store, store_args = datasets_fsspec_args(fn)
        with xr.open_dataset(store, **store_args) as ds:
            try:
                a = get_ds_index(ds)
            except Exception:
                # best effort: the group is skipped, but no longer silently
                logging.warning(
                    "Unable to read the system index of `%s`; group `%s` skipped.",
                    fn,
                    datastore_id,
                    exc_info=True,
                )
                continue
            dx.append((datastore_id, a))

    # one regex pattern dict per distinct system index; any variable name matches
    svoc_pat_list = [
        {**{k: re.escape(v) for k, v in dict(t).items()}, **{"original_name": ".*"}}
        for t in {tuple(d[1].items()) for d in dx}
    ]
    units_list = metadb_vocabulary_query(
        svoc_pat_list=svoc_pat_list,
        production_level="L1",
    )
    units_df = pd.concat(
        [
            pd.DataFrame(
                index=pd.MultiIndex.from_frame(
                    pd.DataFrame.from_dict(
                        {k: [v] for k, v in list(d["query"].items())}
                    )
                ),
                data={
                    (oKey, iKey): values
                    for oKey, iDict in d["response"].items()
                    for iKey, values in iDict.items()
                },
            )
            for d in units_list
        ]
    )
    return units_df

In [None]:
def valid_zip(fn):
    """Check that `fn` is a readable zip archive without corrupt members.

    Parameters
    ----------
    fn : str or path-like
        Path of the archive.

    Returns
    -------
    bool
        True when the archive opens and `ZipFile.testzip()` reports no bad
        member. False otherwise; an exception while opening or testing is
        printed and also yields False.
    """
    import zipfile

    try:
        # the context manager closes the file handle in every case
        with zipfile.ZipFile(fn) as archive:
            return archive.testzip() is None
    except Exception as ex:
        print("Exception:", ex)
        return False


def strip_extension(x):
    """Remove all trailing ".zarr", ".zip" and ".nc" suffixes from a path.

    Parameters
    ----------
    x : str or path-like

    Returns
    -------
    str
        The path without the trailing data-store suffixes; any other suffix
        stops the stripping.
    """
    removable = (".zarr", ".zip", ".nc")
    stripped = Path(x)
    while True:
        if stripped.suffix not in removable:
            break
        stripped = stripped.with_suffix("")
    return str(stripped)


def datasets_to_zarr_zip(dx, output_file, zipstore_args={}, **zarr_args):
    """Write one dataset, or a dict of group -> dataset, into a `.zarr.zip` file.

    Parameters
    ----------
    dx : xarray.Dataset or dict
        A single dataset (written to the root group) or a mapping of group
        name -> dataset.
    output_file : str or path-like
        Target path. Unless the name already ends in ".zarr.zip", its last
        suffix is replaced by ".zarr.zip".
    zipstore_args : dict, optional
        Overrides for the `ZipStore` arguments (defaults: mode "w",
        `allowZip64=True`). Not modified.
    **zarr_args
        Passed to `Dataset.to_zarr`.

    Notes
    -----
    The first group is written with the configured mode; every further group
    uses mode "a". If the resulting file fails `valid_zip`, a warning is
    logged and the file is removed.
    """
    # `logging` is not imported in the module's import cell
    import logging
    from pathlib import Path

    import zarr

    # on the safe side, always start with an empty zarr.zip file.
    kw = {"mode": "w", "allowZip64": True, **zipstore_args}

    if isinstance(dx, xr.Dataset):
        dx = {None: dx}

    fn = Path(output_file)
    # slicing is safe for names with fewer than two suffixes
    if fn.suffixes[-2:] != [".zarr", ".zip"]:
        fn = fn.with_suffix(".zarr.zip")

    for k, ds in dx.items():
        store = zarr.storage.ZipStore(fn, **kw)
        try:
            ds.to_zarr(store, group=k, **zarr_args)
        finally:
            # release the archive handle even when the write fails
            store.close()

        # Any further group is to be updated/appended, by default
        kw["mode"] = "a"

    if not valid_zip(fn):
        logging.warning("File corrupt, removed: `%s`", str(fn))
        fn.unlink()

In [None]:
def datastore_conform(datastore_df, datastore_units):
    """Convert every file in `datastore_df` to convention names and units, and cache it.

    Each file is opened, its data variables are looked up in
    `datastore_units`, converted with `cfunits.Units.conform`, renamed to the
    convention name and written to the cache location derived from
    `get_output_files`.

    Reads the module-level names `gattrs`, `cache_path_base`, `cache_path`,
    `cache_file`, `query_latest` and `dim_list`.

    Parameters
    ----------
    datastore_df : pandas.DataFrame
        Frame with two-level columns, one row per input file; uses the
        ("location", ...) columns and groups by ("encoded", "station_id"),
        ("decoded", "system_group"), ("decoded", "system_id").
    datastore_units : pandas.DataFrame
        Unit-conversion table, indexed by the values of the system index plus
        the original variable name, with ("convention", ...) columns.

    Returns
    -------
    list of list
        One list per input row, holding `(rowid, cache_path)` tuples. The
        inner list is empty when the file could not be processed or written.
    """
    import logging

    import cfunits

    cf_attr_dict = {"production_level": "L1"}
    g_attr_dict = get_gattrs(gattrs)

    # @dask.delayed
    def datastore_conform_func(rowid, fn_series, rebase_p):
        """Conform a single file; return a list with its `(rowid, path)` tuple."""
        p_list = []

        # file paths
        fn = Path(
            *fn_series[
                [
                    ("location", "path_base"),
                    ("location", "path"),
                    ("location", "file"),
                ]
            ].tolist()
        )
        cache_fn = get_output_files(
            None,
            default_subset={
                "path": os.path.join(cache_path_base, cache_path),
                "file": cache_file,
            },
            custom_subset={
                **{n[1]: v for n, v in fn_series.items() if n[0] not in ["location"]},
                **cf_attr_dict,
            },
        )

        # avoid redundancy
        if Path(cache_fn).is_file():
            logging.warning("File exists: `%s`", cache_fn)

            # overwrite cache or not
            if not query_latest:
                p_list.append((rowid, Path(cache_fn)))
                logging.warning("Skipping file: `%s`", cache_fn)
                return p_list
            else:
                logging.info("Replacing file: `%s`", cache_fn)

        store, store_args = datasets_fsspec_args(fn)
        with xr.open_dataset(store, **store_args) as ds:
            # start from the coordinates only; converted variables are added below
            dx = ds.coords.to_dataset()

            try:
                s_ind = get_ds_index(ds)
            except Exception:
                logging.exception("Unable to open NetCDF file `%s`.", fn)
                # without the system index no variable can be looked up;
                # continuing would fail on the undefined `s_ind`
                return p_list

            for v_name in list(ds.data_vars):
                v_ind = {**s_ind, "original_name": v_name}
                logging.debug("Index : %s", tuple(v_ind.values()))
                if tuple(v_ind.values()) in datastore_units.index:
                    for _, c_ds in datastore_units.loc[
                        [tuple(v_ind.values())], :
                    ].iterrows():
                        c_name = c_ds[("convention", "name")]

                        # data array for the variable
                        da = ds[v_name].copy()

                        # unit value conversion
                        da.values = da.pipe(
                            cfunits.Units.conform,
                            from_units=c_ds.loc[
                                ("convention", "units_conversion_units_from")
                            ],
                            to_units=c_ds.loc[
                                ("convention", "units_conversion_units_to")
                            ],
                        )

                        # assign data array to output dataset
                        dx = dx.assign({c_name: da})

                        # update variable attributes
                        dx[c_name] = dx[c_name].assign_attrs(
                            **{
                                "long_name": c_ds.loc[("convention", "long_name")],
                                "standard_name": c_ds.loc[
                                    ("convention", "standard_name")
                                ],
                                "units": str(
                                    c_ds.loc[
                                        (
                                            "convention",
                                            "units_conversion_units_to",
                                        )
                                    ]
                                ),
                                "cell_methods": c_ds.loc[
                                    ("convention", "cell_methods")
                                ],
                            }
                        )

            # consolidate output dataset
            dx = xr.merge([dx[k] for k in sorted(list(dx.data_vars))])

            # update attributes
            src_path = os.path.join(rebase_p, "interfaces/metadb/src/")
            if src_path not in sys.path:
                # add the path once; this function runs for every file
                sys.path.append(src_path)
            from metadb_attributes import metadb_combine_globalattrs

            gattrs_list = [ds.attrs, g_attr_dict]
            gattrs_dict = metadb_combine_globalattrs(gattrs_list, module_base=rebase_p)
            dx.attrs = gattrs_dict
            dx = dx.assign_attrs(**cf_attr_dict)

            # cleanup
            dx = dx.transpose(
                "time",
                "station",
                "system",
                "sensor",
                "channel",
                "cell",
                "attributes",
                missing_dims="ignore",
            )
            dx = dx.chunk()
            dx = reset_coords(dx, sortby=dim_list)
            dx = dx.drop_encoding()

            # export to cache
            p = Path(cache_fn)
            p.parent.mkdir(parents=True, exist_ok=True)

            try:
                store, store_args = datasets_fsspec_args(cache_fn)
                if store_args["engine"] == "zarr" and p.suffix == ".zip":
                    datasets_to_zarr_zip(dx, p)
                    p_list.append((rowid, p))
                else:
                    dx.to_netcdf(str(p))
                    p_list.append((rowid, p))
            except Exception:
                # best effort: the failure is logged and the row yields no path
                funcname = cast(types.FrameType, inspect.currentframe()).f_code.co_name
                logging.exception(
                    "`%s`: unable to write NetCDF file in `%s`.", funcname, str(p)
                )

        return p_list

    # unit conversion meta data for each file group
    p_list = []
    datastore_gb = datastore_df.groupby(
        [
            ("encoded", "station_id"),
            ("decoded", "system_group"),
            ("decoded", "system_id"),
        ]
    )
    for datastore_id, datastore in tqdm(datastore_gb, desc="Groupby loop"):
        logging.info("Group `%s` Count `%s`", datastore_id, str(datastore.shape[0]))

        # a progress bar is shown only for groups with several files
        if datastore.shape[0] < 2:
            store_gb = datastore.iterrows()
        else:
            store_gb = tqdm(
                datastore.iterrows(), desc="Datastore loop", total=len(datastore)
            )
        for rowid, fn_series in store_gb:
            p_list.append(datastore_conform_func(rowid, fn_series, rebase_path()))

    # dask.compute(p_list)

    return p_list

In [None]:
def datastore_concat(datastore_df, debug=False):
    """Concatenate the files of every group along `time` and cache the result.

    The files of each (station_id, system_group, system_id) group are opened,
    reduced (`preprocess_reduce`) and concatenated along `time`. The result is
    deduplicated, sorted, restructured and written as a NetCDF file below the
    temporary directory `nc_temp`.

    Reads the module-level names `query_settings`, `enc_conf`, `query_range`,
    `dim_list`, `nc_temp`, `cache_path` and `cache_file`. If a module-level
    `dx_dict` exists, the raw concatenated dataset of every group is stored in
    it under the group key.

    Parameters
    ----------
    datastore_df : pandas.DataFrame
        Frame with two-level columns, one row per input file.
    debug : bool
        When True, every written file is also copied to "../data/tmp/".

    Returns
    -------
    list of tuple
        `(rowid, path)` for every file written; `rowid` is the row label of
        the last file of the group.
    """
    import logging
    from copy import deepcopy

    cf_attr_dict = {"production_level": "L1"}

    # unit conversion meta data for each file group
    p_list = []
    datastore_gb = datastore_df.groupby(
        [
            ("encoded", "station_id"),
            ("decoded", "system_group"),
            ("decoded", "system_id"),
        ]
    )
    for datastore_id, datastore in tqdm(datastore_gb, desc="Groupby loop"):
        logging.info("Group `%s` Count `%s`", datastore_id, str(datastore.shape[0]))
        file_dx = []
        file_attrs = []
        file_key = datastore_id
        for rowid, fn_series in tqdm(
            datastore.iterrows(), desc="Datastore loop", total=len(datastore)
        ):
            # file paths
            fn = Path(
                *fn_series[
                    [
                        ("location", "path_base"),
                        ("location", "path"),
                        ("location", "file"),
                    ]
                ].tolist()
            )

            # This branch runs when the file is missing; the open attempt
            # below then fails and is logged as well.
            if not Path(fn).is_file():
                logging.info("File not found: `%s`", fn)

            try:
                store, store_args = datasets_fsspec_args(fn)
                ds = xr.open_dataset(store, **store_args, mode="r")
                reduce_kwarg = (
                    query_settings["concat_reduce"]
                    if "concat_reduce" in query_settings
                    else dict(
                        drop_vars=["system_id", "system_name"], drop_dims=["channel"]
                    )
                )
                if "attributes" in ds.dims:
                    ds = ds.drop_dims(["attributes"])
                if "channel" in ds.dims:
                    ds = ds.mean(dim="channel")
                ds = preprocess_reduce(ds, **reduce_kwarg)
                # only datasets with a `time` dimension can be concatenated
                if "time" in ds.dims and "time" in ds.variables:
                    file_dx.append(ds)
                    file_attrs.append(deepcopy(ds.attrs))
            except Exception:
                logging.info("File could not be opened by `xarray`: '%s'", fn)

        # concatenate
        if not file_dx:
            continue

        ds = xr.concat(file_dx, dim="time", combine_attrs="drop_conflicts")

        # Keep the raw result inspectable from the notebook when a module-level
        # `dx_dict` exists; the name is never defined in this function.
        shared_results = globals().get("dx_dict")
        if shared_results is not None:
            shared_results[file_key] = ds

        # time
        try:
            _, time_index = np.unique(ds["time"], return_index=True)
            ds = ds.isel(time=time_index)
            ds = ds.sortby("time")
            ds.time.encoding.update(**enc_conf["time"])
        except Exception:
            logging.info("Skipping `concat` for group `%s`", str(datastore_id))
            continue

        # time helper variable: the query bound where the data extend beyond
        # it, otherwise the first/last time stamp of the data
        time_bounds = pd.Index(
            [
                (
                    query_range[0]
                    if np.any((ds["time"] < query_range[0]).any().values)
                    else pd.to_datetime(ds["time"].min().values)
                ),
                (
                    query_range[1]
                    if np.any((ds["time"] > query_range[1]).any().values)
                    else pd.to_datetime(ds["time"].max().values)
                ),
            ]
        )

        # Slice `time` to query range bounds. Note: excluded here, only within the combine routine.
        # ds = ds.sel(time=slice(time_bounds[0],time_bounds[1]))

        # update attributes
        gattrs_list = file_attrs
        gattrs_dict = metadb_combine_globalattrs(gattrs_list)
        ds.attrs = deepcopy(gattrs_dict)

        # pre-export cleanup
        # NOTE(review): after a failure here the partially restructured `ds`
        # is still exported below -- confirm that this is intended.
        try:
            ds = preprocess_attrs(ds)
            ds = preprocess_merge(ds)
            ds = preprocess_nc(ds)
            ds = ds.transpose(
                "time",
                "station",
                "system",
                "sensor",
                "channel",
                "cell",
                "attributes",
                missing_dims="ignore",
            )
            ds = ds.chunk()
            ds = reset_coords(ds, sortby=dim_list)
        except Exception as Argument:
            logging.exception(
                "Unable to restructure `datastore_concat` results for `%s`.",
                str(file_key),
            )
            logging.info("%s", Argument)

        # store concatenated result in (temporary) file
        if nc_temp:
            nc_path = nc_temp.name
            # `fn_series` is the last row of the group at this point
            cache_fn = get_output_files(
                ds,
                default_subset={
                    "path": os.path.join(nc_path, cache_path),
                    "file": cache_file,
                },
                custom_subset=reduce(
                    lambda a, b: {**a, **b},
                    [
                        {
                            n[1]: v
                            for n, v in fn_series.items()
                            if n[0] not in ["location"]
                        },
                        cf_attr_dict,
                        {"time_bounds": get_time_bounds(time_bounds)},
                    ],
                ),
            )

            # export to cache, force this stage to be NetCDF4 to shorten time
            p = Path(cache_fn)
            if p.suffix == ".zip":
                p = p.with_suffix("")
            if p.suffix == ".zarr":
                p = p.with_suffix(".nc")

            p.parent.mkdir(parents=True, exist_ok=True)

            try:
                store, store_args = datasets_fsspec_args(str(p))
                if store_args["engine"] == "zarr" and p.suffix == ".zip":
                    datasets_to_zarr_zip(ds, store)
                elif store_args["engine"] == "zarr" and p.suffix == ".zarr":
                    ds.to_zarr(str(store))
                else:
                    with warnings.catch_warnings():
                        warnings.simplefilter("ignore")
                        ds.to_netcdf(str(p))

                if debug:
                    p_debug = Path(os.path.join("../data/tmp/", p.name))
                    shutil.copy(
                        p,
                        p_debug,
                    )
                p_list.append((rowid, p))
            except Exception:
                # best effort: the failure is logged and the group yields no path
                funcname = cast(types.FrameType, inspect.currentframe()).f_code.co_name
                logging.exception(
                    "`%s`: unable to write NetCDF file in `%s`.", funcname, str(p)
                )

    return p_list

In [None]:
def datastore_combine(datastore_df):
    """Merge the files of every station/system group into one multi-group store.

    Rows of `datastore_df` are grouped by `("encoded", "station_id")` and
    `("decoded", "system_group")`. For each group the referenced files are
    opened and merged, `time` is made unique and sorted, the result is trimmed
    to the global `query_range` and written as one group (named after the
    station id) into a temporary store below `nc_temp`. Afterwards the
    coordinates of all groups are merged and written as one more group, named
    after the first `("decoded", "location_city")` value, and the temporary
    store is copied to the final output location.

    The function reads these notebook globals: `nc_temp`, `input_subset`,
    `enc_conf`, `query_range`, `query_latest`, `dx_dict`, `dim_list`,
    `output_path_base`, `output_path` and `output_file`.

    Parameters
    ----------
    datastore_df : pandas.DataFrame
        File database with two-level column labels. Columns read here are
        `("encoded", "station_id")`, `("decoded", "system_group")`,
        `("decoded", "location_city")`, `("pattern", "global_location")` and
        the `("location", "path_base" | "path" | "file")` triple.

    Returns
    -------
    tuple or None
        `(store_temp, store_output)`, i.e. the temporary store path and the
        path of the copy at the output destination (`store_output` is `None`
        when the copy could not be made). When `nc_temp` is falsy nothing is
        done and the function returns `None`.
    """
    import logging
    import uuid
    from copy import deepcopy

    import cfunits  # not referenced below; import kept as in the original cell

    def store_export(ds, p, store_path=None, store_mode="a"):
        """Write `ds` as group `store_path` into file `p` (zarr zip or NetCDF).

        Any other file suffix is ignored without writing. Failures are logged,
        not raised, so that one failing group does not stop the whole run.
        """
        p = Path(p)

        # DEBUG: export to cache, force this stage to be NetCDF4
        # if p.suffix == ".zip":
        #    p = p.with_suffix('')
        # if p.suffix == ".zarr":
        #    p = p.with_suffix('.nc')

        p.parent.mkdir(parents=True, exist_ok=True)
        try:
            store, store_args = datasets_fsspec_args(str(p))
            with warnings.catch_warnings():
                warnings.simplefilter("ignore")
                if store_args["engine"] == "zarr" and p.suffix == ".zip":
                    datasets_to_zarr_zip(
                        {store_path: ds},
                        p,
                        zipstore_args={"mode": store_mode},
                        zarr_version=2,
                    )
                elif p.suffix == ".nc":
                    ds.to_netcdf(path=str(p), mode=store_mode, group=store_path)
        except Exception:
            # `Exception` (not a bare except) so Ctrl-C / SystemExit still work
            funcname = cast(types.FrameType, inspect.currentframe()).f_code.co_name
            logging.exception(
                "`%s`: unable to write to file in `%s`.", funcname, str(p)
            )

    def store_pathname(dx, datastore_df, time_bounds, fn_series):
        """Build the output file name for the combined dataset `dx`."""
        # prefer the `network` attribute; otherwise derive the name from the
        # first `global_location` pattern with its last dotted part removed
        globloc_str = (
            dx.attrs["network"]
            if "network" in dx.attrs
            else datastore_df[("pattern", "global_location")]
            .str.split(".")
            .apply(lambda x: ".".join(x[0:-1]))
            .unique()
            .tolist()[0]
        )

        nc_attr_dict = {
            "production_level": dx.attrs["production_level"],
            "global_location": globloc_str,
            "station_id": "",
            "sensor_id": "",
        }

        output_fn = get_output_files(
            dx,
            default_subset={
                "path": os.path.join(output_path_base, output_path),
                "file": output_file,
            },
            # later dicts override earlier ones
            custom_subset=reduce(
                lambda a, b: {**a, **b},
                [
                    {n[1]: v for n, v in fn_series.items() if n[0] not in ["location"]},
                    nc_attr_dict,
                    {"time_bounds": get_time_bounds(time_bounds)},
                ],
            ),
        )

        return output_fn

    def store_copy(p, output_fn):
        """Copy store `p` to `output_fn`, keeping the suffixes of `p`.

        Returns the destination path, or `None` when `p` is not a file or the
        copy failed (the failure is logged).
        """
        if p.is_file():
            try:
                logging.info("Source     : `%s`", str(p))
                output_p = Path(output_fn).with_suffix("".join(p.suffixes))
                logging.info("Destination: `%s`", str(output_p))
                output_p.parent.mkdir(parents=True, exist_ok=True)
                shutil.copy(
                    p,
                    output_p,
                )

                # double check the store structure (the source `p` is inspected)
                if p.suffixes == [".zarr", ".zip"]:
                    logging.info("Summary report")
                    store, store_args = datasets_fsspec_args(str(p))
                    dz = zarr.open(store, mode="r")
                    print(dz.tree())  # instead of display()

                return output_p
            except Exception:
                funcname = cast(types.FrameType, inspect.currentframe()).f_code.co_name
                logging.exception(
                    "`%s`: unable to write to file in `%s`.", funcname, str(p)
                )

    # keys of the groups that were processed successfully, in processing order
    dx_keys = []
    datastore_gb = datastore_df.groupby(
        [
            ("encoded", "station_id"),
            ("decoded", "system_group"),
        ]
    )

    # store `time` concatenated results in (temporary) files
    if nc_temp:
        store_path = nc_temp.name
        store_file = "{}.{}".format(
            uuid.uuid4().hex, input_subset["extension"]
        )  # nc or zarr.zip
        store_temp = Path(*[store_path, store_file])
        store_mode = "w"  # first write creates the store, later writes append

        logging.info("Temporary file `%s`", store_file)

        nc_attrs = []

        for datastore_id, datastore in tqdm(datastore_gb, desc="Groupby loop"):
            file_dx = []
            file_attrs = []
            file_key = datastore_id

            logging.info("Group `%s` Count `%s`", datastore_id, str(datastore.shape[0]))

            for rowid, fn_series in tqdm(
                datastore.iterrows(), desc="Datastore loop", total=len(datastore)
            ):
                # file paths
                fn = Path(
                    *fn_series[
                        [
                            ("location", "path_base"),
                            ("location", "path"),
                            ("location", "file"),
                        ]
                    ].tolist()
                )

                try:
                    store, store_args = datasets_fsspec_args(fn)
                    ds = xr.open_dataset(store, **store_args)
                    file_dx.append(ds)
                    file_attrs.append(deepcopy(ds.attrs))
                except Exception:
                    logging.info("File could not be opened by `xarray`: '%s'", fn)

            # validate units before merge (todo)

            # combine, merge, store results in cache
            try:
                ds = xr.merge(
                    [preprocess_merge(d) for d in file_dx],
                    combine_attrs="drop_conflicts",
                )
                ds = postprocess_merge(ds)

            except Exception:
                # the group is still skipped, but the reason is now recorded
                logging.exception(
                    "Unable to merge files of group `%s`; group skipped.",
                    str(datastore_id),
                )
                continue

            # time coordinate needs to be sorted, unique
            try:
                _, time_index = np.unique(ds["time"], return_index=True)
                ds = ds.isel(time=time_index)
                ds = ds.sortby("time")
                ds.time.encoding.update(**enc_conf["time"])
            except Exception:
                logging.info("Skipping `concat` for group `%s`", str(datastore_id))
                continue

            # trim `time` to query bounds
            ds = ds.sel(time=slice(query_range[0], query_range[1]))

            # chunks
            # ds = ds.chunk({"time": 3600, "station": 1, "system": 1})

            # update attributes
            gattrs_list = file_attrs
            gattrs_dict = metadb_combine_globalattrs(gattrs_list)
            ds.attrs = gattrs_dict
            nc_attrs.append(deepcopy(gattrs_dict))

            # collect coordinates
            dx_keys.append(file_key)
            dx_dict[file_key] = ds.coords.to_dataset()

            # pre-export cleanup
            ds = preprocess_attrs(ds)
            ds = preprocess_merge(ds)
            ds = postprocess_merge(ds)
            # ds = ds.transpose(*[n for n in dim_list if n in list(ds.dims)])
            ds = ds.transpose(
                "time",
                "station",
                "system",
                "sensor",
                "channel",
                "cell",
                "attributes",
                missing_dims="ignore",
            )
            ds = ds.chunk()
            ds = reset_coords(ds, sortby=dim_list)
            ds = ds.drop_encoding()

            # export to cache; the group is named after the station id
            store_root = file_key[0]
            store_export(
                ds,
                store_temp,
                store_path=store_root,
                store_mode=store_mode,
            )

            # update mode for subsequent loop items
            store_mode = "a"

        # merge coordinates
        dx = xr.merge(
            [preprocess_merge(dx_dict[k]) for k in dx_keys if k in list(dx_dict)],
            compat="override",
        )

        # merge attributes
        gattrs_list = nc_attrs
        gattrs_dict = metadb_combine_globalattrs(gattrs_list)
        dx.attrs = deepcopy(gattrs_dict)

        # time
        try:
            _, time_index = np.unique(dx["time"], return_index=True)
            dx = dx.isel(time=time_index)
            dx = dx.sortby("time")
            dx.time.encoding.update(**enc_conf["time"])
        except Exception:
            logging.exception("Unable to sort `time`.")

        # time string helper variable: use the query bound whenever data lies
        # beyond it (or `query_latest` is set), otherwise the data extent
        time_bounds = pd.Index(
            [
                (
                    query_range[0]
                    if np.any((dx["time"] < query_range[0]).any().values)
                    else pd.to_datetime(dx["time"].min().values)
                ),
                (
                    query_range[1]
                    if np.any((dx["time"] > query_range[1]).any().values)
                    or query_latest
                    else pd.to_datetime(dx["time"].max().values)
                ),
            ]
        )
        # clip `time` ...needed?
        dx = dx.sel(time=slice(time_bounds[0], time_bounds[1]))

        # prepare export
        dx = preprocess_merge(dx)
        dx = postprocess_merge(dx)
        dx = dx.drop_encoding()

        # export merged result to cache
        store_root = datastore_df[("decoded", "location_city")].unique().tolist()[0]
        store_export(
            dx,
            store_temp,
            store_path=store_root,
            store_mode=store_mode,
        )

        # final destination output, copy from cache
        # NOTE(review): `fn_series` is the row left over from the last loop
        # iteration above; it is unbound when no row was iterated.
        logging.info("Copy `combine` store to final output destination.")
        store_output = store_pathname(dx, datastore_df, time_bounds, fn_series)
        store_output = store_copy(store_temp, store_output)

        return store_temp, store_output

In [None]:
def datastore_cube(input_file_list, cache_file_list=[], output_file_list=[]):
    """Flatten multi-group stores into one dataset per input file.

    For every input store the groups are read variable by variable, merged
    onto the combined coordinates, cached as individual zarr stores below
    `nc_temp`, merged again into a single dataset and written as a zipped
    zarr store.

    The function reads the notebook globals `nc_temp` and `input_subset`.

    Parameters
    ----------
    input_file_list : list of path-like, or a single path-like
        Stores to process. A single path is wrapped into a list.
    cache_file_list : list, optional
        Ignored: the cache locations are always derived from
        `input_file_list` and `nc_temp`.
    output_file_list : list, optional
        Destination per input file. When empty, `data/L1/<input name>` is
        used, replaced by a fresh temporary file name if that file exists.

    Returns
    -------
    tuple
        `(store_file_list, output_file_list)`; the first lists the files
        actually written, the second the planned destinations.
    """

    def get_store_temporary():
        """Return a new, unique file path for a temporary store."""
        if nc_temp:
            store_path = nc_temp.name
        else:
            store_path = Path("/tmp/")

        store_file = "{}.{}".format(
            uuid.uuid4().hex, input_subset["extension"]
        )  # nc or zarr.zip
        store_temp = Path(*[store_path, store_file])

        logging.info("Temporary file `%s`", store_temp)
        return store_temp

    def get_store_groups(fn, data_vars=["ta"], groups=None):
        """Open the groups of store `fn`, optionally subset to `data_vars`.

        `data_vars=[]` selects every variable found in any group; a falsy
        value other than `[]` (e.g. `None`) returns coordinates only.
        Returns `(ds_dict, data_vars, store_groups)`.
        """
        store, store_args = datasets_fsspec_args(str(fn))

        # group names of exactly two characters are skipped
        store_groups = [
            n for n in list(zarr.open(store, mode="r").group_keys()) if len(n) != 2
        ]

        if groups and isinstance(groups, list):
            store_groups = [n for n in store_groups if n in groups]

        if not store_groups:
            store_groups = [None]

        ds_dict = {
            n: xr.open_dataset(store, **store_args, group=n, mode="r")
            for n in store_groups
        }

        if data_vars == []:
            # ordered union of the variable names of all groups
            data_vars = list(
                dict.fromkeys(
                    [
                        item
                        for sublist in [list(ds.data_vars) for ds in ds_dict.values()]
                        for item in sublist
                    ]
                )
            )

        if data_vars:
            ds_dict = {
                n: d[[v for v in data_vars if v in list(d.data_vars)]]
                for n, d in ds_dict.items()
                if any([v in list(d.data_vars) for v in data_vars])
            }
        else:
            ds_dict = {n: d.coords.to_dataset() for n, d in ds_dict.items()}

        return ds_dict, data_vars, store_groups

    def cache_store_groups(dx, cache_path="tmp/tmp_dev/"):
        """Write every dataset in `dx` to its own zarr store in `cache_path`.

        Returns a list of `(store, group)` tuples for the stores written.
        """
        import uuid

        store_list = []
        mode = "w"
        for gp, ds in dx.items():
            if isinstance(ds, xr.Dataset):
                ds = preprocess_store(ds)
                ds = ds.transpose(
                    "time",
                    "station",
                    "system",
                    "sensor",
                    "channel",
                    "cell",
                    "attributes",
                    missing_dims="ignore",
                )
                ds = ds.chunk({"time": "auto"})
                ds = ds.drop_encoding()

                # export
                cache_file = os.path.join(cache_path, f"{gp}_{uuid.uuid4().hex}.zarr")
                store = Path(cache_file)
                store_args = dict(group=gp, mode=mode)
                ds.to_zarr(store, **store_args)

                # file mode
                if Path(cache_file).is_dir():
                    mode = "a"

                # report
                store_list.append((store, gp))

        return store_list

    def combine_store_groups(ifn, data_vars=[], cache=None):
        """Merge each variable of `ifn` across all groups, optionally caching.

        Returns a dict keyed by variable name. With `cache` set, the values
        are the `(store, group)` lists from `cache_store_groups`; otherwise
        they are the merged datasets.
        """
        # lazyload
        dc_dict, _, _ = get_store_groups(ifn, None, None)
        ds_dict, data_vars, store_groups = get_store_groups(ifn, data_vars, None)

        coords_list = [
            v for k, v in dc_dict.items() if isinstance(v, xr.Dataset)
        ]  # only coordinates

        # this first read was only needed for the variable and group names
        for v in ds_dict.values():
            if isinstance(v, xr.Dataset):
                v.close()

        # coordinates
        coords_ds = merge_reduce([preprocess_merge(d.copy()) for d in coords_list])

        # global attributes

        # data_vars
        dx = {}
        for dv in tqdm(data_vars, desc="data_vars"):
            dx[dv] = None
            for sg in tqdm(store_groups, desc="store groups"):
                if not isinstance(dx[dv], xr.Dataset):
                    # _, dx = xr.align(preprocess_merge(dc),preprocess_merge(ds),join='left')
                    dx[dv] = coords_ds.copy()

                ds_dict, _, _ = get_store_groups(ifn, [dv], [sg])
                ds_list = [
                    v for k, v in ds_dict.items() if v and isinstance(v, xr.Dataset)
                ]
                for ds in ds_list:
                    dx[dv] = merge_reduce([dx[dv], preprocess_merge(ds)])
                    ds.close()

            if cache:
                dx[dv] = cache_store_groups({dv: dx[dv]}, cache_path=cache)
                for d in ds_list:
                    d.close()

        return dx

    def merge_store_groups_cache(cache_file_list, idx=None, replace=False):
        """Read the cached zarr stores and merge them into one dataset each.

        `cache_file_list` is a cache folder or a list of them. Without `idx`
        the folders are scanned for `*.zarr` stores named `<group>_<hex>.zarr`.
        Returns one dataset for a single folder, else a list of datasets.
        `replace` is accepted but not used.
        """
        if not isinstance(cache_file_list, list):
            bool_single = True
            cache_file_list = [cache_file_list]
        else:
            bool_single = False

        # list cache
        if not idx:
            idx = {}
            for cfn in cache_file_list:
                for store in sorted(glob.glob(cfn + "/*.zarr")):
                    if cfn not in idx:
                        idx[cfn] = {}
                    # the group name is the file name without the `_<hex>` tail
                    gp = "_".join(Path(store).name.split("_")[:-1])
                    idx[cfn][gp] = [(store, gp)]

        # merge, write
        ds_list = []
        for cfn in idx.keys():

            dx = []
            for vn, pn in idx[cfn].items():
                for vfn, gp in pn:
                    store, store_args = datasets_fsspec_args(str(vfn))

                    ds = xr.open_dataset(store, **store_args, group=gp, mode="r")
                    # ds = ds.drop_encoding()
                    ds = ds.chunk()
                    dx.append(ds)

                    # print(f"{vn} {ds['system_id'].values}")

            ds = merge_reduce([preprocess_merge(ds) for ds in dx])

            if isinstance(ds, xr.Dataset):
                print("merge")
                ds = postprocess_merge(ds)
                print("sort")
                ds = ds.sortby("station_id")
                print("encoding")
                ds = ds.drop_encoding()
                print("chunk")
                ds = ds.chunk({"time": 2000000, "station": 100, "system": 1})
                print("transpose")
                ds = ds.transpose(
                    "time",
                    "station",
                    "system",
                    "sensor",
                    "channel",
                    "cell",
                    "attributes",
                    missing_dims="ignore",
                )
                print(ds.chunksizes)
                print("final")
                ds_list.append(ds)

        if bool_single:
            return ds_list[0]
        else:
            return ds_list

    idx = {}

    # accept a single path as well; a bare string would otherwise be
    # iterated character by character below
    if isinstance(input_file_list, (str, os.PathLike)):
        input_file_list = [input_file_list]

    # one cache folder per input file, named after the input without extension
    cache_path = nc_temp.name
    cache_file_list = [
        str(PurePath(cache_path, strip_extension(Path(fn).stem)))
        for fn in input_file_list
    ]

    if not output_file_list:
        output_path = "data/L1/"
        output_file_list = [
            str(PurePath(output_path, Path(fn).name)) for fn in input_file_list
        ]
        # never overwrite an existing output file: write to a temporary one
        output_file_list = [
            get_store_temporary() if Path(fn).is_file() else fn
            for fn in output_file_list
        ]
        # output_file_list = [get_store_temporary() for n in cache_file_list]

    # prepare cache
    for ifn, cfn in zip(input_file_list, cache_file_list):
        idx[cfn] = combine_store_groups(ifn, data_vars=[], cache=cfn)

    store_file_list = []
    for ifn, cfn, ofn in zip(input_file_list, cache_file_list, output_file_list):
        ds = merge_store_groups_cache(cfn)

        # root group = first two characters of the station ids; sorted so the
        # choice is reproducible when more than one prefix is present
        store_root = sorted(set(n[0:2] for n in ds.station_id.values.tolist()))[0]
        # shutil.copy(ifn, ofn)
        datasets_to_zarr_zip(
            {store_root: ds},
            ofn,
            zipstore_args={"mode": "w"},
            mode="w",
        )
        store_file_list.append(ofn)

    return (store_file_list, output_file_list)

## Configuration

## Static Configuration

In [None]:
# Version history (only the last entry is active):
#   v1.0.0 (2023-03-29): first version.
#   v1.0.1 (2023-09-27): updated version.
#   v1.0.2 (2025-02-21): updated version.
version = {
    "id": "v1.0.2",
    "time": "2025-02-21",
}


# Configuration file for input / output files: prefer `../conf/`, and fall
# back to `conf/` when the file is missing there (or cannot be checked).
ioconfig_name = "datasets_conjoin"
ioconfig_file = "../conf/{}.toml".format(ioconfig_name)
try:
    if not Path(ioconfig_file).exists():
        raise FileNotFoundError(ioconfig_file)
except OSError:
    ioconfig_file = "conf/{}.toml".format(ioconfig_name)

# ----- Papermill injection below this cell -----

In [None]:
# input/output config
# `parse_config` is not defined in this cell; it receives the configuration
# file path, the configuration name and the `version` dict. Later cells index
# the result like a nested mapping, e.g. `ioconf["query"]["start"]`.
ioconf = parse_config(ioconfig_file, ioconfig_name, version)

# validate config (to do)

In [None]:
"""
Note: the approach to set global helper variables should be revised. 
But was/is used in combination with papermill automation.
"""

# set global variables

# --- query: mandatory entries
query_from = ioconf["query"]["start"]
# `end` is optional; without it the end is derived from `period` later on
query_to = ioconf["query"]["end"] if "end" in ioconf["query"] else None
query_period, query_index, query_latest = (
    ioconf["query"][key] for key in ("period", "system_index", "latest")
)
query_cache, query_dask, query_tasks = (
    ioconf["query"][key] for key in ("cache", "dask", "tasks")
)
# `settings` is optional and defaults to an empty dict
query_settings = (
    ioconf["query"]["settings"] if "settings" in ioconf["query"] else {}
)

# --- file locations: input, cache, output
input_path_base, input_path, input_file, input_subset = (
    ioconf["input"][key] for key in ("path_base", "path", "file", "subset")
)
cache_path_base, cache_path, cache_file = (
    ioconf["cache"][key] for key in ("path_base", "path", "file")
)
output_path_base, output_path, output_file = (
    ioconf["output"][key] for key in ("path_base", "path", "file")
)

# --- logging; the file mode is optional and defaults to append
log_path, log_file, log_format = (
    ioconf["logging"][key] for key in ("path", "file", "format")
)
log_filemode = (
    ioconf["logging"]["filemode"] if "filemode" in ioconf["logging"] else "a"
)

# --- global attributes
gattrs = ioconf["gattrs"]

## Logging configuration

In [None]:
# create logger
import logging
import logging.handlers
from pprint import pformat

# All records go to one log file; the `{version_id}` placeholder in the
# configured path/file name is filled with the active version id.
logging.basicConfig(
    # `basicConfig` only applies `encoding` to a handler it creates itself
    # from `filename`; with `handlers=` given it is not used, so the
    # encoding is also set on the FileHandler below.
    encoding="utf-8",
    format=log_format,
    level=logging.INFO,
    # Declare handlers
    handlers=[
        logging.FileHandler(
            os.path.join(log_path, log_file).format(version_id=version["id"]),
            mode=log_filemode,
            encoding="utf-8",
        ),
        # logging.StreamHandler(sys.stdout),
    ],
)

## Dynamic Configuration

In [None]:
# summarize: write the configuration file name and the parsed configuration
# to the log, so that every run records the settings it was started with
logging.info("`ioconf` file: %s", ioconfig_file)
logging.info(
    "`ioconf` dict:\n# start of item\n%s\n# end of item\n",
    pformat(ioconf, sort_dicts=False),  # keep the key order of `ioconf`
)

In [None]:
# reset structure

# dimension names, in the order used for transposing and sorting
dim_list = list(
    (
        "time",
        "station",
        "system",
        "sensor",
        "channel",
        "cell",
        "attributes",
    )
)

# encoding applied to the `time` coordinate (no " +0000" suffix on the units)
enc_conf = dict(
    time=dict(
        units="nanoseconds since 1970-01-01 00:00:00",
        calendar="proleptic_gregorian",
    )
)

# start with an empty file list
nc_list = list()

In [None]:
if query_cache:
    # cache is stored in the temp: everything lives below one uniquely named
    # folder, so that a single `rmtree` removes it again
    temporary_path_base = "tmp/tmp_{}".format(uuid.uuid4().hex)
    Path(temporary_path_base).mkdir(parents=True, exist_ok=True)
    logging.info("Temporary folder created: `%s`", temporary_path_base)

    # cache: disk-backed mapping; `cull_limit` 0 so no entries are culled
    temporary_cache = PurePath(temporary_path_base, "cache")
    dx_dict = dc.Cache(temporary_cache)
    dx_dict.clear()
    dx_dict.reset("size_limit", int(12e9))  # in bytes
    dx_dict.reset("cull_limit", 0)

    # tmp files
    temporary_tmp = PurePath(temporary_path_base, "temp")
    Path(temporary_tmp).mkdir(parents=True, exist_ok=True)
    nc_temp = tempfile.TemporaryDirectory(dir=temporary_tmp, suffix=None)
    temporary_path = nc_temp.name

    # path redirections
    logging.info("Temporary subfolder created (cache): `%s`", temporary_cache)
    logging.info("Temporary subfolder created (temp): `%s`", temporary_tmp)
    logging.info("Temporary subfolder created (temp/*): `%s`", temporary_path)

    def temp_cleanup(temporary_path_base, temporary_path, nc_temp, dx_dict):
        """Close the cache and remove all temporary folders (best effort).

        Each of the three steps logs its own failure and the next step still
        runs, so one error does not leave the remaining folders behind.
        """
        logging.info("Cleanup routines:")
        try:
            logging.info("Closing Cache.")
            logging.info(" Cache volume (current): '%s'", str(dx_dict.volume()))
            dx_dict.clear()
            logging.info(" Cache volume (cleared): '%s'", str(dx_dict.volume()))
            dx_dict.close()
            logging.info("Cache closed.")
        except Exception:
            logging.exception("Unable to clear or close the cache.")

        try:
            nc_temp.cleanup()
            logging.info("Temporary subfolder removed (temp/*): `%s`", temporary_path)
        except Exception:
            logging.exception(
                "Unable to clean up temporary subfolder (temp/*): `%s`", temporary_path
            )
        finally:
            shutil.rmtree(temporary_path, ignore_errors=True)

        try:
            shutil.rmtree(temporary_path_base)
            logging.info("Temporary folder removed: `%s`", temporary_path_base)
        except Exception:
            logging.exception(
                "Unable to remove temporary folder: `%s`", temporary_path_base
            )
        finally:
            shutil.rmtree(nc_temp.name, ignore_errors=True)

    # register cleanup upon exit
    import atexit

    atexit.register(temp_cleanup, temporary_path_base, temporary_path, nc_temp, dx_dict)

else:
    # no cache requested: plain in-memory dict, no temporary directory
    dx_dict = {}
    nc_temp = None

nc_list = []
nc_dict = {}

In [None]:
# time related helper variables
if not query_to:
    if query_period:
        # no explicit end: the end is the start plus one period
        period_offset = pd.tseries.frequencies.to_offset(query_period)
        query_to = pd.to_datetime(query_from) + period_offset
        # month / year periods get one extra day
        if query_period.endswith(("M", "Y")):
            query_to = query_to + pd.tseries.frequencies.to_offset("1D")
        del period_offset
query_range = pd.to_datetime([query_from, query_to])

# MAIN

In [None]:
if __name__ == "__main__":
    # Runs the tasks listed in `query_tasks` in a fixed order:
    # conform -> concat -> combine -> conflate.
    if query_dask:
        from dask.distributed import Client, LocalCluster

        # Create cluster
        # cluster = LocalCluster(n_workers=1, processes=False, threads_per_worker=1, memory_limit="6GB")
        # client = Client(cluster)
        # client
        # the client has now been started
        # logging.info("Starting a new `dask` cluster and client.")

    else:
        from dask.distributed import Client

        # connect to a scheduler that is already running at a fixed address
        logging.info("Using an existing `dask` client, e.g., in jupyterlab.")
        client = Client("tcp://127.0.0.1:39743")

        # dask `enabled` is required
        query_dask = True

    ## Convert L0 to L1 (vocabulary only)
    if "conform" in query_tasks:
        logging.info("Task `conform`")
        # start empty, so the assignment below is defined when no file matches
        p_list = []

        # input file patterns
        _, fn_list = get_input_files(input_subset, custom_subset=input_subset)

        # decode input files
        logging.info("Query source file database.")
        datastore_df = pd.concat(
            [pd.DataFrame(filedb_decode(fn)) for fn in set(n["file"] for n in fn_list)],
            axis=0,
        )

        # input file query: keep files that lie completely within the range
        logging.info("Subset source file database.")
        datastore_df = datastore_df.loc[
            (datastore_df[("decoded", "time_start")] >= query_range[0])
            & (datastore_df[("decoded", "time_end")] <= query_range[1]),
            :,
        ]

        if len(datastore_df) > 0:
            # lookup vocabulary
            logging.info("Query vocabulary database.")
            datastore_units = datastore_units_query(datastore_df)

            logging.info("Apply vocabulary translations.")
            p_list = datastore_conform(datastore_df, datastore_units.sort_index())
        else:
            logging.info(
                "No source files found in query subset of the source file database."
            )
        nc_dict["conform"] = p_list

    ## Concatenate DataSets in each path, along dimension `time`.
    if "concat" in query_tasks:
        logging.info("Task `concat`")
        # start empty, so results of the `conform` task are never reused here
        p_list = []

        # input file patterns
        default_subset = dict(
            path_base=cache_path_base,
            path=input_path,
            file=input_file,
        )
        _, fn_list = get_input_files(
            input_subset,
            default_subset=default_subset,
            custom_subset={**input_subset, **{"production_level": "L1"}},
        )

        # decode input files
        logging.info("Query source file database.")
        datastore_df = pd.concat(
            [pd.DataFrame(filedb_decode(fn)) for fn in set(n["file"] for n in fn_list)],
            axis=0,
        )

        # input file query: keep files that lie completely within the range
        logging.info("Subset source file database.")
        datastore_df = datastore_df.loc[
            (datastore_df[("decoded", "time_start")] >= query_range[0])
            & (datastore_df[("decoded", "time_end")] <= query_range[1]),
            :,
        ]

        if len(datastore_df) > 0:
            # concatenate the files of each group
            logging.info("Concatenate grouped database.")
            p_list = datastore_concat(datastore_df)
        else:
            logging.info(
                "No source files found in query subset of the source file database."
            )
        nc_dict["concat"] = p_list

    ### Merge concatenated DataSets along Index `station` and `system`.
    if "combine" in query_tasks:
        logging.info("Task `combine`")
        # every `.nc` / `.zip` file found below the temporary directory
        nc_dict["combine"] = list(
            # set(glob.glob(pathname="tmp/tmp*/**/*.nc",recursive=True)) + glob.glob("../data/tmp/*.nc"))
            # set(glob.glob(pathname="{}/**/*.(nc|zarr.zip)".format(nc_temp.name), recursive=True))
            set(
                str(p)
                for p in Path(nc_temp.name).glob("**/*.*")
                if p.suffix in {".zip", ".nc"}
            )
        )

        datastore_df = pd.concat(
            [
                pd.DataFrame(filedb_decode(fn))
                for fn in set(str(n) for n in nc_dict["combine"])
            ],
            axis=0,
        )
        datastore_df = datastore_df.drop_duplicates().reset_index(drop=True)

        store_temp, store_output = datastore_combine(datastore_df)

    ### Merge DataSets along Index `station` and `system` and `time`, and variables.
    if "conflate" in query_tasks:
        logging.info("Task `conflate`")
        # prefer the result of a `combine` task from this run; otherwise use
        # the zip stores found directly in the temporary directory
        if "store_output" in locals():
            nc_dict["conflate"] = [Path(store_output)]
        elif "store_temp" in locals():
            nc_dict["conflate"] = [Path(store_temp)]
        else:
            nc_dict["conflate"] = list(
                set(
                    str(p)
                    for p in Path(nc_temp.name).glob("*.*")
                    if p.suffix in {".zip"}
                )
            )
        store_file_list, output_file_list = datastore_cube(nc_dict["conflate"])

    ### Cleanup
    if query_cache:
        # temp_cleanup()
        # no cleanup action here, look for the `atexit` instantiation.
        pass

# Temp files removed on (clean) exit in python (not jupyter)

# DEV

In [None]:
def datastore_combine_nc_to_zarr(fn):
    """Convert a NetCDF file with groups into a zarr store next to it.

    The root dataset and every top-level group of `fn` are written to
    `<fn without suffix>.zarr`; the group names are kept and printed.

    Parameters
    ----------
    fn : path-like
        NetCDF file to convert.
    """
    p = Path(fn)
    t = p.with_suffix(".zarr")

    # root
    with xr.open_dataset(p) as ds:
        ds.to_zarr(t)

    # groups: close the netCDF4 handle once the names are read
    with nc.Dataset(p) as root:
        groups = list(root.groups.keys())

    for group in groups:
        print(group)
        with xr.open_dataset(p, group=group) as dx:
            dx.to_zarr(t, group=group)

In [None]:
def export_ds(nc_ds, nc_mode="w", group=None):
    """Re-chunk and re-order `nc_ds`, then write it as NetCDF below `../data/L0/`.

    The file name is a template filled with the dict returned by
    `dattrs(...)` for the prepared dataset. `nc_mode` is passed on as the
    `mode` of `to_netcdf`; `group` is accepted but not used.
    """
    # fix
    prepared = nc_ds.chunk({"station": 1, "system": 1, "time": 3600})

    # reconstruct the dim and coordinate order according to dimensions.
    prepared = preprocess_nc(prepared)
    present_dims = [n for n in dim_list if n in list(prepared.dims)]
    prepared = prepared.transpose(*present_dims)
    prepared = reset_coords(prepared, sortby=dim_list)

    name_fields = dattrs(prepared)
    nc_path = "../data/L0/"
    nc_file = "urbisphere_set({global_location},{system_group},{time_bounds})_version({version}).{extension}"
    # path and file name are joined first, then the template is filled
    target = os.path.join(nc_path, nc_file).format(**name_fields)
    print("Output file: '{}'".format(target))

    print(prepared)

    prepared.to_netcdf(target, mode=nc_mode)
    # prepared.to_zarr(target)

In [None]:
def datasets_conjoin_cube_qc_1():
    """List the L1 files in the query range that match a given `system_id`.

    The L1 files are looked up the same way as in the `concat` task, limited
    to the global `query_range`, and each file is opened to compare its
    `system_id` with the ids listed below.

    Returns
    -------
    list of str
        Paths of the files for which the comparison is true.
    """
    # input file patterns
    default_subset = dict(
        path_base=cache_path_base,
        path=input_path,
        file=input_file,
    )
    _, fn_list = get_input_files(
        input_subset,
        default_subset=default_subset,
        custom_subset={**input_subset, **{"production_level": "L1"}},
    )

    # decode input files
    logging.info("Query source file database.")
    datastore_df = pd.concat(
        [pd.DataFrame(filedb_decode(fn)) for fn in set(n["file"] for n in fn_list)],
        axis=0,
    )

    # input file query
    logging.info("Subset source file database.")
    datastore_df = datastore_df.loc[
        (datastore_df[("decoded", "time_start")] >= query_range[0])
        & (datastore_df[("decoded", "time_end")] <= query_range[1]),
        :,
    ]

    # system ids to look for
    system_ids = [
        "10000000",
    ]

    subset = []
    for idg, group in datastore_df.groupby([("pattern", "global_location")]):
        print(idg)
        for idx, item in tqdm(group.iterrows()):
            fn = os.path.join(
                *(
                    item[
                        [
                            ("location", "path_base"),
                            ("location", "path"),
                            ("location", "file"),
                        ]
                    ]
                )
            )
            store, store_args = datasets_fsspec_args(fn)
            # the context manager closes the file even if the check fails
            with xr.open_dataset(store, **store_args) as ds:
                # NOTE(review): each comparison must reduce to a single
                # truth value, i.e. `system_id` is presumably scalar here.
                if any(ds["system_id"] == system_id for system_id in system_ids):
                    subset.append(fn)

    return subset

In [None]:
def datasets_conjoin_cube_qc_2(fn):
    """Print the `system_id` values of every group in store `fn`.

    Parameters
    ----------
    fn : str
        Path of a `.zip` (zarr) or `.nc` (NetCDF) store with groups.

    Raises
    ------
    ValueError
        If `fn` has neither of the two supported suffixes.
    """
    # fn = "tmp/tmp_bc3811dcdd4e414cb3d8d2dd3488fa4f/temp/tmpftfjpo7t/f2b71cf1150942b0b3e09a3be9cda500.zarr.zip"
    suffix = Path(fn).suffix
    if suffix == ".zip":
        groups = list(zarr.open(fn).group_keys())
    elif suffix == ".nc":
        # close the netCDF4 handle once the group names are read
        with nc.Dataset(fn) as root:
            groups = list(root.groups.keys())
    else:
        raise ValueError(
            "Unsupported store suffix `{}` in `{}`; expected `.zip` or `.nc`.".format(
                suffix, fn
            )
        )

    for g in groups:
        store, store_args = datasets_fsspec_args(fn)
        # the context manager closes the group even if printing fails
        with xr.open_dataset(store, group=g, **store_args) as ds:
            print({g: ds["system_id"].values.tolist()})

In [None]:
def datasets_conjoin_cube_qc_3(fn, variable="ta"):
    """Save an overview image of ``variable`` over time, one row per station.

    The variable is averaged over the ``system`` dimension, resampled to
    hourly means and drawn with ``time`` on the x-axis and ``station_id`` on
    the y-axis. The PNG is written to the current working directory; its
    name contains the dataset's ``network`` attribute, the variable name,
    the year of the first time stamp and today's date.

    Parameters
    ----------
    fn : str or path-like
        Location of the dataset; converted to ``str`` before being handed
        to ``datasets_fsspec_args``.
    variable : str, default "ta"
        Name of the data variable to plot.

    Notes
    -----
    Assumes a ``station`` dimension with a ``station_id`` coordinate remains
    after averaging over ``system`` -- TODO confirm against the data model.
    """
    import matplotlib.pyplot as plt

    mono_font = {"fontname": "monospace", "fontsize": 8}

    # data
    store, store_args = datasets_fsspec_args(str(fn))
    # Keep the dataset open until the figure is written (the data may be
    # lazily loaded), then close it so the file handle is not leaked.
    with xr.open_dataset(store, **store_args, group=None, mode="r") as ds:
        da = ds[variable].mean(dim="system").resample(time="h").mean()

        ## Figure, overview
        fig_title = str(da.time[0].values.astype("datetime64[Y]"))
        fig_network = ds.attrs["network"]
        # Formatted outside the f-string: reusing double quotes inside a
        # replacement field is a SyntaxError before Python 3.12.
        fig_version = pd.to_datetime("today").strftime("%Y%m%d")
        fig_name = (
            "tmp_datasets_conjoin_cube_qc_3_"
            f"set({fig_network},AWS_{variable},{fig_title})_"
            f"version({fig_version}).png"
        )

        fig, ax = plt.subplots(figsize=(8, 7))
        da.plot(
            x="time",
            y="station_id",
            vmin=-5,
            vmax=35,
            ax=ax,  # or figsize=[8, 8],
            yincrease=False,
            ylim=[-0.5, da.sizes["station"] - 0.5],
            # x-axis always spans the full calendar year of the first sample
            xlim=pd.to_datetime(
                [f"{fig_title}{n}" for n in ["-01-01 00:00:00", "-12-31 23:59:59"]]
            ),
            cbar_kwargs={"shrink": 0.5},
        )
        # Pin the tick positions before restyling their labels.
        ax.set_yticks(ax.get_yticks())
        ax.set_yticklabels(ax.get_yticklabels(), **mono_font)
        ax.invert_yaxis()

        fig.savefig(fig_name, dpi="figure")
        # plt.close(fig)

In [None]:
def datasets_conjoin_cube_qc_4(fn, variable="ta"):
    """Save an overview image of ``variable`` over time, one row per system.

    The variable is resampled to hourly means and drawn with ``time`` on the
    x-axis and ``system_id`` on the y-axis. The PNG is written to the current
    working directory; its name contains the dataset's ``network`` attribute,
    the variable name, the year of the first time stamp and today's date.

    Parameters
    ----------
    fn : str or path-like
        Location of the dataset; converted to ``str`` before being handed
        to ``datasets_fsspec_args``.
    variable : str, default "ta"
        Name of the data variable to plot.

    Notes
    -----
    Assumes the variable has a ``system`` dimension with a ``system_id``
    coordinate -- TODO confirm against the data model.
    """
    import matplotlib.pyplot as plt

    mono_font = {"fontname": "monospace", "fontsize": 8}

    # data
    store, store_args = datasets_fsspec_args(str(fn))
    # Keep the dataset open until the figure is written (the data may be
    # lazily loaded), then close it so the file handle is not leaked.
    with xr.open_dataset(store, **store_args, group=None, mode="r") as ds:
        da = ds[variable].resample(time="h").mean()

        ## Figure, overview
        fig_title = str(da.time[0].values.astype("datetime64[Y]"))
        fig_network = ds.attrs["network"]
        # Formatted outside the f-string: reusing double quotes inside a
        # replacement field is a SyntaxError before Python 3.12.
        fig_version = pd.to_datetime("today").strftime("%Y%m%d")
        fig_name = (
            "tmp_datasets_conjoin_cube_qc_4_"
            f"set({fig_network},AWS_{variable},{fig_title})_"
            f"version({fig_version}).png"
        )

        fig, ax = plt.subplots(figsize=(8, 7))
        da.plot(
            x="time",
            y="system_id",
            vmin=-5,
            vmax=35,
            ax=ax,  # or figsize=[8, 8],
            yincrease=False,
            ylim=[-0.5, da.sizes["system"] - 0.5],
            # x-axis always spans the full calendar year of the first sample
            xlim=pd.to_datetime(
                [f"{fig_title}{n}" for n in ["-01-01 00:00:00", "-12-31 23:59:59"]]
            ),
            cbar_kwargs={"shrink": 0.5},
        )
        # Pin the tick positions before restyling their labels.
        ax.set_yticks(ax.get_yticks())
        ax.set_yticklabels(ax.get_yticklabels(), **mono_font)
        ax.invert_yaxis()

        fig.savefig(fig_name, dpi="figure")
        # plt.close(fig)

In [None]:
# Optional QC pass over every entry of `store_file_list`; the plotting calls
# are disabled by default and can be re-enabled as needed.
try:
    for store_file in store_file_list:
        # datasets_conjoin_cube_qc_3(store_file)
        # datasets_conjoin_cube_qc_4(store_file)
        pass
except Exception as exc:
    # Still best-effort (e.g. `store_file_list` not defined in this session),
    # but the failure is reported instead of silently dropped, and
    # KeyboardInterrupt / SystemExit are no longer swallowed.
    warnings.warn(f"QC plotting skipped: {exc!r}")