In [12]:
import os
import re
import requests
from urllib.parse import urljoin

# ------------------------------------------------------------
# 1) Proxy setup (isolated, reusable)
# ------------------------------------------------------------
def setup_dwd_proxy(proxy="http://ofsquid.dwd.de:8080"):
    """
    Export ``proxy`` through the HTTP/HTTPS proxy environment variables.

    Both the upper-case and the lower-case spelling of each variable are set,
    so the setting applies process-wide via ``os.environ``.

    Args:
      proxy: proxy URL to use for both schemes.

    Returns:
      dict with the keys "http" and "https", each mapped to ``proxy``.
    """
    schemes = ("http", "https")
    for scheme in schemes:
        env_name = f"{scheme}_proxy"
        os.environ[env_name.upper()] = proxy
        os.environ[env_name] = proxy
    return {scheme: proxy for scheme in schemes}

# Apply the default proxy to the environment and keep the returned
# {"http": ..., "https": ...} mapping as a notebook-level global.
# NOTE(review): presumably passed to requests calls in cells not shown here — confirm.
proxies = setup_dwd_proxy()

In [22]:
# ============================================================
# DWD OpenData NWP – per-leaf sampling crawler (fast + complete)
# - visits all directories up to MAX_DEPTH
# - if a directory contains files: sample only N files, do NOT go deeper
# - optionally skips runtime dirs like 2025123100/
# - shows live progress (overwrites line)
# ============================================================

# Upper bound on the number of files collected from each directory that contains files.
SAMPLE_FILES_PER_LEAF = 3
# When True, sub-directories whose name is purely numeric with 8, 10 or 12 digits
# (e.g. 2025123100/) are not descended into by the crawler.
SKIP_RUNTIME_DIRS = True

def show_progress(msg: str):
    """
    Write ``msg`` as a live status line that overwrites the previous one.

    The text is preceded by a carriage return, padded to at least 140
    characters, printed without a trailing newline and flushed immediately.
    """
    padded = format(msg, "140")
    print("\r" + padded, end="", flush=True)

def is_runtime_dir(name: str) -> bool:
    """
    Tell whether ``name`` looks like a numeric run-time directory such as ``2025123100/``.

    Trailing slashes are ignored; the remainder must consist of digits only
    and be exactly 8, 10 or 12 characters long.
    """
    stem = name.rstrip("/")
    if not stem.isdigit():
        return False
    return len(stem) in (8, 10, 12)

def crawl_sampled(url: str, max_depth=MAX_DEPTH):
    """
    Crawl directories (depth-first) and collect *sampled* files from every leaf directory
    (leaf := directory containing any files).

    A directory that contains files is never descended into further; only its first
    SAMPLE_FILES_PER_LEAF files are recorded. Directories deeper than ``max_depth``
    (the start URL is depth 0) are ignored, and each directory is visited at most once.
    When SKIP_RUNTIME_DIRS is set, sub-directories accepted by ``is_runtime_dir`` are skipped.

    Fetching a directory listing is best-effort: a failure does not abort the crawl.
    The directory still counts as visited, and a one-line warning summarising the
    failures is printed once the crawl has finished.

    Args:
      url: root directory URL to start from.
      max_depth: maximum recursion depth below ``url``.

    Returns:
      dirs: list of visited directories
      files: list of sampled file URLs (up to SAMPLE_FILES_PER_LEAF per leaf dir)
    """
    seen_dirs = set()
    dirs = []
    files = []
    failed = []  # (directory URL, exception) for every listing that could not be fetched

    def _rec(u, depth):
        if depth > max_depth:
            return
        if not u.endswith("/"):
            u += "/"
        if u in seen_dirs:
            return

        # Report only directories that are actually going to be fetched
        show_progress(f"Crawling: {u}")

        seen_dirs.add(u)
        dirs.append(u)

        try:
            links = fetch_links(u)
        except Exception as exc:
            # Keep crawling, but remember the failure instead of dropping it silently
            failed.append((u, exc))
            return

        # Split into dirs/files
        local_files = [l for l in links if not is_dir(l)]
        local_dirs  = [l for l in links if is_dir(l)]

        # If files exist here, sample and STOP descending below this directory
        if local_files:
            for f in local_files[:SAMPLE_FILES_PER_LEAF]:
                files.append(urljoin(u, f))
            return

        # Otherwise recurse into subdirectories
        for d in local_dirs:
            if SKIP_RUNTIME_DIRS and is_runtime_dir(d):
                continue
            _rec(urljoin(u, d), depth + 1)

    _rec(url, 0)

    # Reuse show_progress so the final message is padded to the same width as the
    # progress lines and fully overwrites the last one; then end the line.
    show_progress("Crawling complete.")
    print()

    if failed:
        first_url, first_exc = failed[0]
        print(
            f"WARNING: {len(failed)} directory listing(s) could not be fetched "
            f"(first: {first_url} -> {first_exc!r})"
        )

    return dirs, files


def summarize_model(model: str):
    """
    Crawl one model's directory tree and summarise the sampled files per product directory.

    Only sampled files whose format (as reported by ``file_format``) is "grib" or
    "netcdf" contribute to the product list. For each product directory the
    summary holds the formats seen, up to SAMPLE_FILES_PER_LEAF sample URLs per
    format, and the filename templates / variables derived from those samples.

    Args:
      model: model name; it is appended to BASE_ROOT to form the crawl root.

    Returns:
      dict with the keys "model", "base_url", "crawl" (crawl statistics and
      settings), "products" (list sorted by product directory) and
      "generated_at" (UTC timestamp string).
    """
    model_url = urljoin(BASE_ROOT, f"{model}/")
    visited_dirs, sampled_urls = crawl_sampled(model_url, max_depth=MAX_DEPTH)

    by_dir = {}

    for url in sampled_urls:
        pieces = url.rsplit("/", 1)
        filename = pieces[-1]

        kind = file_format(filename)
        if kind != "grib" and kind != "netcdf":
            continue

        # Directory part of the URL, with the trailing slash restored
        directory = pieces[0] + "/"

        bucket = by_dir.get(directory)
        if bucket is None:
            bucket = {
                "product_dir": directory,
                "formats": set(),
                "sample_files": {"grib": [], "netcdf": []},
                "variables": set(),
                "templates": set(),
            }
            by_dir[directory] = bucket

        bucket["formats"].add(kind)

        # Cap the retained samples per format; files beyond the cap only
        # contribute their format
        kept = bucket["sample_files"][kind]
        if len(kept) >= SAMPLE_FILES_PER_LEAF:
            continue

        kept.append(url)
        bucket["templates"].add(make_template(filename))
        variable = extract_variable_from_filename(filename)
        if variable:
            bucket["variables"].add(variable)

    # Convert the sets to sorted lists for a stable, serialisable summary
    products = [
        {
            "product_dir": directory,
            "formats": sorted(bucket["formats"]),
            "variables": sorted(bucket["variables"]),
            "templates": sorted(bucket["templates"]),
            "sample_files": bucket["sample_files"],
        }
        for directory, bucket in sorted(by_dir.items())
    ]

    crawl_stats = {
        "max_depth": MAX_DEPTH,
        "dirs_visited": len(visited_dirs),
        "leaf_dirs_with_samples": len(by_dir),
        "files_sampled_total": len(sampled_urls),
        "sample_files_per_leaf": SAMPLE_FILES_PER_LEAF,
        "skip_runtime_dirs": SKIP_RUNTIME_DIRS,
    }

    return {
        "model": model,
        "base_url": model_url,
        "crawl": crawl_stats,
        "products": products,
        "generated_at": datetime.now(UTC).strftime("%Y-%m-%d %H:%M:%S UTC"),
    }




In [23]:
# ---------- run ----------
# Crawl every model listed in MODELS and store its summary under inventory["models"].
# MODELS and BASE_ROOT are not defined in this cell; they must already exist in the kernel.
inventory = {"root": BASE_ROOT, "models": {}}
for m in MODELS:
    print(f"\n=== Scanning {m} ===")
    inventory["models"][m] = summarize_model(m)

# ---------- save machine-readable JSON ----------
# Written relative to the current working directory; ensure_ascii=False keeps
# non-ASCII characters unescaped in the UTF-8 file.
json_path = "dwd_nwp_inventory.json"
with open(json_path, "w", encoding="utf-8") as f:
    json.dump(inventory, f, indent=2, ensure_ascii=False)

print(f"\n✅ Saved machine-readable inventory: {json_path}")


=== Scanning icon ===
Crawling complete.                                                                                                                          

=== Scanning icon-eu ===
Crawling complete.                                                                                                                          

=== Scanning icon-d2 ===
Crawling complete.                                                                                                                          

✅ Saved machine-readable inventory: dwd_nwp_inventory.json

--- Human-readable summary (Markdown) ---

| model   | product_dir                          | formats   |   #vars | vars (sample)                              | template (sample)                                                                               |
|:--------|:-------------------------------------|:----------|--------:|:-------------------------------------------|:----------------------------------------------------------------

In [24]:
def classify_product(product_dir: str):
    """
    Classify a product directory path by level type and grid type.

    Matching is case-insensitive and purely substring based.

    Level: the explicit keywords "single-level", "model-level" and
    "pressure-level" are checked first, in that order. Only when none of them
    occurs does a path containing "/00/" fall back to "single-level"; anything
    else is "unknown".

    Grid: "icosahedral" if the path contains "icosa", "lat-lon" if it contains
    "lat-lon", otherwise "unknown".

    Args:
      product_dir: product directory path or URL.

    Returns:
      (level, grid) tuple of strings.
    """
    p = product_dir.lower()

    # Explicit keywords must win over the "/00/" heuristic; otherwise a path such
    # as ".../00/model-level/..." would be reported as single-level.
    for keyword in ("single-level", "model-level", "pressure-level"):
        if keyword in p:
            level = keyword
            break
    else:
        # NOTE(review): "/00/" is kept only as a backward-compatible fallback; the
        # path segment alone does not establish the level type — confirm it is wanted.
        level = "single-level" if "/00/" in p else "unknown"

    if "icosa" in p:
        grid = "icosahedral"
    elif "lat-lon" in p:
        grid = "lat-lon"
    else:
        grid = "unknown"

    return level, grid

In [26]:
# ---------- build human-readable markdown table ----------
# One row per (model, product directory) taken from the in-memory `inventory`.
rows = []

for m in MODELS:
    md = inventory["models"][m]
    for p in md["products"]:
        vars_ = p.get("variables", [])
        templs = p.get("templates", [])

        rows.append({
            "model": m,
            # Show the product path relative to BASE_ROOT
            "product_dir": p["product_dir"].replace(BASE_ROOT, ""),
            "formats": ", ".join(p.get("formats", [])),
            "#vars": len(vars_),
            # At most 10 variable names; an ellipsis marks a truncated list
            "vars (sample)": ", ".join(vars_[:10]) + (" …" if len(vars_) > 10 else ""),
            # Only the first template is shown
            "template (sample)": templs[0] if templs else "",
        })

df = pd.DataFrame(rows)

# Sort for readability
# (an empty frame has no columns to sort by, hence the guard)
if not df.empty:
    df = df.sort_values(["model", "product_dir"]).reset_index(drop=True)

print("\n--- Human-readable summary (Markdown) ---\n")
print(df.to_markdown(index=False))


--- Human-readable summary (Markdown) ---

| model   | product_dir                          | formats   |   #vars | vars (sample)                              | template (sample)                                                                               |
|:--------|:-------------------------------------|:----------|--------:|:-------------------------------------------|:------------------------------------------------------------------------------------------------|
| icon    | icon/grib/00/alb_rad/                | grib      |       1 | SINGLE-LEVEL_ALB_RAD                       | icon_global_icosahedral_single-level_{ref_time}_{lead}_ALB_RAD.grib2.bz2                        |
| icon    | icon/grib/00/alb_seaice/             | grib      |       1 | TIME-INVARIANT_ALB_SEAICE                  | icon_global_icosahedral_time-invariant_{ref_time}_ALB_SEAICE.grib2.bz2                          |
| icon    | icon/grib/00/alhfl_s/                | grib      |       1 | SINGLE-LEVEL_ALHFL_

In [28]:
# ============================================================
# Condensation helpers for DWD NWP inventory
# ============================================================

import re
from collections import defaultdict


def canonical_variable(var: str) -> str:
    """
    Strip structural prefixes to get canonical variable names.

    The name is upper-cased, then every leading structural prefix (grid or
    level marker) is removed. Stripping repeats until no prefix matches, so
    the result does not depend on the order in which the prefixes occur.

    Args:
      var: variable name as extracted from a filename.

    Returns:
      The upper-cased name without leading structural prefixes.
    """
    prefixes = (
        "ICOSAHEDRAL_",
        "SINGLE-LEVEL_",
        "MODEL-LEVEL_",
        "PRESSURE-LEVEL_",
        "SOIL-LEVEL_",
        "TIME-INVARIANT_",
    )

    v = var.upper()

    # A single fixed-order pass would leave e.g. "ICOSAHEDRAL_" behind in
    # "SINGLE-LEVEL_ICOSAHEDRAL_X"; keep going until the name is stable.
    changed = True
    while changed:
        changed = False
        for prefix in prefixes:
            if v.startswith(prefix):
                v = v[len(prefix):]
                changed = True
    return v


def infer_level(var: str) -> str:
    """
    Derive the level type from markers embedded in a variable name.

    The check is case-insensitive. The first marker found (in the order
    model-level, pressure-level, soil-level, time-invariant) decides the
    result; a name without any marker is reported as "single-level".
    """
    upper = var.upper()
    marker_to_level = (
        ("MODEL-LEVEL", "model-level"),
        ("PRESSURE-LEVEL", "pressure-level"),
        ("SOIL-LEVEL", "soil-level"),
        ("TIME-INVARIANT", "time-invariant"),
    )
    for marker, level in marker_to_level:
        if marker in upper:
            return level
    return "single-level"


def infer_grid(template: str) -> str:
    """
    Derive the grid type from a filename template (case-insensitive).

    "regular-lat-lon" is checked before "icosahedral"; a template containing
    neither yields "unknown".
    """
    lowered = template.lower()
    known_grids = ("regular-lat-lon", "icosahedral")
    return next((grid for grid in known_grids if grid in lowered), "unknown")


def infer_domain(model: str, template: str) -> str:
    """
    Derive the spatial domain from the model name and a filename template.

    "germany" wins if the template mentions it (case-insensitive) or the model
    is exactly "icon-d2"; otherwise "europe" if the template mentions it or the
    model is exactly "icon-eu"; everything else is "global".
    """
    lowered = template.lower()

    covers_germany = model == "icon-d2" or "germany" in lowered
    if covers_germany:
        return "germany"

    covers_europe = model == "icon-eu" or "europe" in lowered
    if covers_europe:
        return "europe"

    return "global"


def infer_ref_time_pattern(template: str) -> str:
    """
    Report how the reference (cycle) time appears in a filename template.

    Templates carrying the ``{ref_time}`` placeholder are reported as
    "YYYYMMDDHH"; all others as "unknown".
    """
    if "{ref_time}" not in template:
        return "unknown"
    return "YYYYMMDDHH"


def infer_lead_time_pattern(template: str) -> str:
    """
    Report how the forecast lead time appears in a filename template.

    Templates carrying the ``{lead}`` placeholder are reported as
    "forecast_hour"; all others as "unknown".
    """
    if "{lead}" not in template:
        return "unknown"
    return "forecast_hour"


def condense_inventory(inventory: dict) -> list[dict]:
    """
    Collapse the product-level inventory into one row per (model, canonical variable).

    For every variable of every product the level is inferred from the raw
    variable name, the product's formats are merged in, and each template of
    that product contributes a grid, a domain and the reference/lead time
    patterns. The collected values are emitted as sorted, comma-separated strings.

    Args:
      inventory: dict with a "models" mapping; each model entry provides a
        "products" list whose items have "variables", "formats" and "templates".

    Returns:
      List of row dicts with the keys "model", "variable", "levels", "domain",
      "grid", "formats", "reference_time" and "forecast_lead_time". Rows are
      grouped by model and, within a model, ordered by first appearance of the
      canonical variable.
    """
    def _joined(values) -> str:
        # Deterministic, human-readable rendering of a set of strings
        return ", ".join(sorted(values))

    condensed = []

    for model, model_data in inventory["models"].items():
        per_variable = {}

        for product in model_data["products"]:
            for raw_name in product["variables"]:
                name = canonical_variable(raw_name)

                record = per_variable.get(name)
                if record is None:
                    record = {
                        "model": model,
                        "variable": name,
                        "levels": set(),
                        "domains": set(),
                        "grids": set(),
                        "formats": set(),
                        "ref_time_pattern": set(),
                        "lead_time_pattern": set(),
                    }
                    per_variable[name] = record

                record["levels"].add(infer_level(raw_name))
                record["formats"].update(product["formats"])

                for template in product["templates"]:
                    record["grids"].add(infer_grid(template))
                    record["domains"].add(infer_domain(model, template))
                    record["ref_time_pattern"].add(infer_ref_time_pattern(template))
                    record["lead_time_pattern"].add(infer_lead_time_pattern(template))

        condensed.extend(
            {
                "model": record["model"],
                "variable": record["variable"],
                "levels": _joined(record["levels"]),
                "domain": _joined(record["domains"]),
                "grid": _joined(record["grids"]),
                "formats": _joined(record["formats"]),
                "reference_time": _joined(record["ref_time_pattern"]),
                "forecast_lead_time": _joined(record["lead_time_pattern"]),
            }
            for record in per_variable.values()
        )

    return condensed


In [29]:
# ============================================================
# Execute condensation and display result
# ============================================================

import json
import pandas as pd

# ---------- load inventory ----------
# Read the inventory back from the JSON file in the current working directory
with open("dwd_nwp_inventory.json", "r", encoding="utf-8") as f:
    inventory = json.load(f)

# ---------- condense ----------
rows = condense_inventory(inventory)

df_condensed = pd.DataFrame(rows)

# An inventory without any variables yields a frame with no columns; sorting it by
# "model"/"variable" would raise KeyError, so only sort when there is data.
if not df_condensed.empty:
    df_condensed = (
        df_condensed
        .sort_values(["model", "variable"])
        .reset_index(drop=True)
    )

print("\n--- Condensed variable availability (Markdown) ---\n")
print(df_condensed.to_markdown(index=False))


--- Condensed variable availability (Markdown) ---

| model   | variable                               | levels                   | domain   | grid                         | formats   | reference_time   | forecast_lead_time   |
|:--------|:---------------------------------------|:-------------------------|:---------|:-----------------------------|:----------|:-----------------|:---------------------|
| icon    | ALB_RAD                                | single-level             | global   | icosahedral                  | grib      | YYYYMMDDHH       | forecast_hour        |
| icon    | ALB_SEAICE                             | time-invariant           | global   | icosahedral                  | grib      | YYYYMMDDHH       | unknown              |
| icon    | ALHFL_S                                | single-level             | global   | icosahedral                  | grib      | YYYYMMDDHH       | forecast_hour        |
| icon    | ASHFL_S                                | single-level  