Generate a dcb master dataframe (with Daily dcb and/or Monthly code P1P2 and P1C1)

In [None]:
## Build a master satellite DCB table (C2W–C1C) by combining daily SINEX-BIAS files and monthly CODE DCB products.
"""
Output: CSV with one row per (date, PRN) containing:
  date, year, doy, prn, C2W-C1C_sat_m, source,
  daily_file, monthly_p1c1, monthly_p1p2

Paths are configured via the CONFIG section below. Adapt them to your layout.
"""

from __future__ import annotations

from pathlib import Path
from datetime import date, timedelta
import re
import math
import pandas as pd

# ----------------------------------------------------------------------
# CONFIG (EDIT FOR YOUR SETUP)
# ----------------------------------------------------------------------

# Base project directory (by default: folder containing this script)
PROJECT_ROOT = Path(__file__).resolve().parent

# Directory with daily SINEX-BIAS / DCB files (e.g., CAS0MGXRAP_YYYYDOY....BIA/BSX)
DAILY_DIR = PROJECT_ROOT / "data" / "dcb_daily"

# Directory with monthly CODE DCB files (e.g., P1C1YYMM.DCB, P1P2YYMM_ALL.DCB)
MONTHLY_DIR = PROJECT_ROOT / "data" / "dcb_monthly"

# Output CSV
OUT_CSV = PROJECT_ROOT / "outputs" / "all_dcb_master.csv"

# Date range (inclusive)
START = date(2015, 10, 1)
END   = date(2025, 9, 26)

# Preference order for SINEX rows (DSB vs DCB)
PREFER_ORDER = ("DSB", "DCB")

# Import the DCB parser module (make sure this file is in the same repo / PYTHONPATH)
# NOTE: rename your parser file to a valid module name, e.g.:
#   latest_bsx_dsb_dcb_sat_c2w_c1c_parser_corr.py
import latest_bsx_dsb_dcb_sat_c2w_c1c_parser_corr as bsx_mod  # type: ignore

# ----------------------------------------------------------------------
# HELPER FUNCTIONS
# ----------------------------------------------------------------------

def yymm_from_ym(y: int, m: int) -> str:
    """Return YYMM string from year, month."""
    return f"{y % 100:02d}{m:02d}"


def monthly_paths(y: int, m: int) -> tuple[Path | None, Path | None]:
    """
    Return candidate monthly CODE paths for P1C1 and P1P2.

    Examples:
      P1C1YYMM.DCB
      P1P2YYMM_ALL.DCB
    """
    yymm = yymm_from_ym(y, m)
    p1c1 = MONTHLY_DIR / f"P1C1{yymm}.DCB"
    p1p2 = MONTHLY_DIR / f"P1P2{yymm}_ALL.DCB"
    return (p1c1 if p1c1.exists() else None,
            p1p2 if p1p2.exists() else None)


def cache_monthly(cache: dict, y: int, m: int):
    """
    Load and cache monthly P1C1 / P1P2 DCB tables for a given (year, month).

    Returns (month_p1c1, month_p1p2, p1c1_path, p1p2_path) where
    month_p1c1 and month_p1p2 are dict-like {prn: value_in_meters}.
    """
    key = (y, m)
    if key in cache:
        return cache[key]

    p1c1_path, p1p2_path = monthly_paths(y, m)
    m1 = bsx_mod.parse_code_monthly_table(str(p1c1_path)) if p1c1_path else None
    m2 = bsx_mod.parse_code_monthly_table(str(p1p2_path)) if p1p2_path else None

    cache[key] = (m1, m2, p1c1_path, p1p2_path)
    return cache[key]


def fn_candidates(y: int, doy: int) -> list[str]:
    """
    Return candidate daily filename stems for given (year, day-of-year).
    Adjust this if you use other IAACs or naming conventions.
    """
    return [
        f"CAS0OPSRAP_{y}{doy:03d}0000_01D_01D_DCB.BIA",
        f"CAS0MGXRAP_{y}{doy:03d}0000_01D_01D_DCB.BSX",
        f"GFZ0OPSRAP_{y}{doy:03d}0000_01D_01D_DCB.BIA",
    ]


def pick_daily_file(y: int, doy: int) -> Path | None:
    """
    Look for a daily file (.BIA/.BSX or .gz) in DAILY_DIR matching the IAAC patterns.
    Returns a Path or None if nothing is found.
    """
    for stem in fn_candidates(y, doy):
        p = DAILY_DIR / stem
        if p.exists():
            return p
        gz = p.with_suffix(p.suffix + ".gz")
        if gz.exists():
            return gz
    return None


def build_from_monthlies(month_p1c1: dict | None,
                         month_p1p2: dict | None) -> list[tuple[str, float, str]]:
    """
    Build a list of (prn, value_m, source) from monthly CODE products.

    Uses the relation:
      C2W–C1C = (P1–C1) – (P1–P2)
    when both monthly tables are available.
    """
    rows: list[tuple[str, float, str]] = []

    if not (month_p1c1 and month_p1p2):
        return rows

    common = set(month_p1c1.keys()) & set(month_p1p2.keys())
    for prn in sorted(common):
        val = month_p1c1[prn] - month_p1p2[prn]  # meters
        rows.append((prn, float(val), "derived:monthly(P1-C1)-(P1-P2)"))

    return rows


# ----------------------------------------------------------------------
# MAIN LOGIC
# ----------------------------------------------------------------------

def main() -> None:
    records: list[dict] = []
    monthly_cache: dict = {}

    cur = START
    while cur <= END:
        y = cur.year
        doy = int(cur.strftime("%j"))
        m = cur.month

        daily_path = pick_daily_file(y, doy)

        # Load monthly DCBs for this year/month (used if daily is missing or incomplete)
        month_p1c1, month_p1p2, p1c1_path, p1p2_path = cache_monthly(monthly_cache, y, m)

        if daily_path:
            # 1) Try daily SINEX-based table, possibly backed by monthlies
            try:
                rows = bsx_mod.parse_sinex_bias(str(daily_path))
            except Exception as exc:  # noqa: F841
                rows = []

            tbl = bsx_mod.build_table(
                rows,
                prefer=PREFER_ORDER,
                monthly_p1c1=month_p1c1,
                monthly_p1p2=None,  # we only use P1C1 here; P1P2 goes through build_from_monthlies if needed
            )

            if not tbl:
                # Fallback: no usable SINEX rows, derive from monthlies only (if available)
                tbl = build_from_monthlies(month_p1c1, month_p1p2)

            for prn, val_m, src in tbl:
                records.append(
                    {
                        "date": cur.isoformat(),
                        "year": y,
                        "doy": doy,
                        "prn": prn,
                        "C2W-C1C_sat_m": val_m,
                        "source": src,
                        "daily_file": daily_path.name if daily_path else "",
                        "monthly_p1c1": p1c1_path.name if p1c1_path else "",
                        "monthly_p1p2": p1p2_path.name if p1p2_path else "",
                    }
                )
        else:
            # No daily file: build from monthlies if possible
            tbl = build_from_monthlies(month_p1c1, month_p1p2)
            for prn, val_m, src in tbl:
                records.append(
                    {
                        "date": cur.isoformat(),
                        "year": y,
                        "doy": doy,
                        "prn": prn,
                        "C2W-C1C_sat_m": val_m,
                        "source": src,
                        "daily_file": "",
                        "monthly_p1c1": p1c1_path.name if p1c1_path else "",
                        "monthly_p1p2": p1p2_path.name if p1p2_path else "",
                    }
                )

        cur += timedelta(days=1)

    # Build DataFrame and export
    df = pd.DataFrame.from_records(
        records,
        columns=[
            "date",
            "year",
            "doy",
            "prn",
            "C2W-C1C_sat_m",
            "source",
            "daily_file",
            "monthly_p1c1",
            "monthly_p1p2",
        ],
    )

    OUT_CSV.parent.mkdir(parents=True, exist_ok=True)
    df.to_csv(OUT_CSV, index=False)
    print(f"Rows: {len(df)} | written: {OUT_CSV}")

    # Quick summary by source
    print("\nCounts by source:")
    print(df["source"].value_counts(dropna=False))


if __name__ == "__main__":
    main()

Generate a navigation master dataframe

In [None]:
"""
Mini-batch RINEX-2 Navigation --> single DataFrame (and optional CSV).

- Scans a directory for RINEX-2 navigation files (*.??n / *.n, case-insensitive)
- Uses the parser in nav_rinex2_to_df.py
- Outputs a master DataFrame with one row per (PRN, toc)
- Optionally writes the result to a CSV file

Adapt the CONFIG section to match your local layout.
"""

from __future__ import annotations

from pathlib import Path
import pandas as pd

# ----------------------------------------------------------------------
# CONFIG (EDIT FOR YOUR SETUP)
# ----------------------------------------------------------------------

# Base project directory (defaults to folder containing this script)
PROJECT_ROOT = Path(__file__).resolve().parent

# Directory containing RINEX-2 navigation files (*.??n / *.n)
IN_DIR = PROJECT_ROOT / "data" / "rinex_nav"

# Output CSV (if WRITE_CSV=True)
OUT_CSV = PROJECT_ROOT / "outputs" / "all_nav_master.csv"

# Whether to search subdirectories
RECURSIVE = False  # True = include subfolders

# Whether to write CSV to disk
WRITE_CSV = True

# Import the navigation parser module from this repository
# Make sure nav_rinex2_to_df.py is in the same folder (or on PYTHONPATH)
import nav_rinex2_to_df as nav_mod  # type: ignore

# ----------------------------------------------------------------------
# MAIN LOGIC
# ----------------------------------------------------------------------

def collect_nav_files(in_dir: Path, recursive: bool = False) -> list[Path]:
    """Collect RINEX-2 NAV files (*.??n, *.n) from a directory."""
    if not in_dir.is_dir():
        raise SystemExit(f"Input directory not found: {in_dir}")

    patterns = ["*.??n", "*.n", "*.??N", "*.N"]
    files: set[Path] = set()

    for pat in patterns:
        if recursive:
            files.update(p for p in in_dir.rglob(pat) if p.is_file())
        else:
            files.update(p for p in in_dir.glob(pat) if p.is_file())

    files_sorted = sorted(p.resolve() for p in files)
    print(f"Found NAV files: {len(files_sorted)}")
    return files_sorted


def parse_nav_files(files: list[Path]) -> pd.DataFrame:
    """
    Parse a list of RINEX-2 NAV files with nav_mod.parse_rinex2_nav()
    and aggregate into a single DataFrame.
    """
    rows_all: list[dict] = []

    for p in files:
        try:
            ephs = nav_mod.parse_rinex2_nav(str(p))  # list of dicts
            if not ephs:
                print(f"Warning: no ephemerides read from: {p.name}")
                continue

            for e in ephs:
                d = dict(e)
                # Convert datetime → ISO 8601 string for 'toc'
                if "toc" in d and hasattr(d["toc"], "isoformat"):
                    d["toc"] = d["toc"].isoformat()
                d["nav_file"] = p.name  # traceability
                rows_all.append(d)

        except Exception as ex:  # noqa: BLE001
            print(f"Warning: error while parsing {p.name}: {ex}")

    # Expected columns (order) based on parser structure
    cols_order = [
        "prn", "toc", "af0", "af1", "af2",
        "IODE", "Crs", "d_n", "M0", "Cuc", "e", "Cus", "sqrtA",
        "Toe", "Cic", "Omega0", "Cis", "i0", "Crc", "omega", "OMEGA_DOT",
        "IDOT", "week", "sv_accuracy", "sv_health", "Tgd", "IODC",
        "TxTime", "fit_int",
        "nav_file",
    ]

    df_nav = pd.DataFrame(rows_all)

    if df_nav.empty:
        print("No ephemerides parsed; DataFrame is empty.")
        return df_nav

    # Keep only known columns (if some are missing/NaN it's not fatal)
    df_nav = df_nav[[c for c in cols_order if c in df_nav.columns]].copy()

    # De-duplicate: one row per (PRN, toc)
    if {"prn", "toc"}.issubset(df_nav.columns):
        before = len(df_nav)
        df_nav.drop_duplicates(subset=["prn", "toc"], inplace=True)
        after = len(df_nav)
        if after != before:
            print(f"Deduplication: {before} → {after} rows (key: prn+toc)")

        df_nav.sort_values(["prn", "toc"], inplace=True, ignore_index=True)

    print(f"Aggregated rows: {len(df_nav)}")
    return df_nav


def main() -> None:
    files = collect_nav_files(IN_DIR, recursive=RECURSIVE)
    df_nav = parse_nav_files(files)

    # Show a small preview in the console
    if not df_nav.empty:
        print("\nHead of NAV master DataFrame:")
        print(df_nav.head(20))

    # Optional: write a single CSV to disk
    if WRITE_CSV and not df_nav.empty:
        OUT_CSV.parent.mkdir(parents=True, exist_ok=True)
        df_nav.to_csv(OUT_CSV, index=False)
        print(f"\nCSV written: {OUT_CSV}")


if __name__ == "__main__":
    main()

Generate Parquet files from RINEX files

In [None]:
# Build lean observation master from RINEX-2 OBS files
#
# This cell:
#   - Scans a directory for RINEX-2 observation files (*.??o / *.o)
#   - Uses `parse_rinex2_obs_lean()` to extract a minimal observable:
#       time (UTC), prn, P2_minus_C1_m
#   - Aggregates results into a single DataFrame
#   - Optionally writes CSV and Parquet master files
#
# Requirements:
#   - "rinex2_obs_lean.py" in the same folder (or on PYTHONPATH)
#   - The input directory contains standard RINEX-2 observation files

from pathlib import Path
import pandas as pd

from rinex2_obs_lean import parse_rinex2_obs_lean  # local parser module

# ----------------------------------------------------------------------
# CONFIG (EDIT THESE PATHS / FLAGS FOR YOUR SETUP)
# ----------------------------------------------------------------------

# Base project directory: for a notebook, this is usually the current working dir
PROJECT_ROOT = Path.cwd()

# Directory containing *.??o / *.o RINEX observation files
IN_DIR = PROJECT_ROOT / "data" / "rinex_obs"  # adapt to your layout

# Whether to search subdirectories
RECURSIVE = True  # set to False to only scan IN_DIR

# Output files
WRITE_CSV = True
WRITE_PARQUET = True
OUT_MASTER_CSV = PROJECT_ROOT / "outputs" / "obs_master_lean.csv"
OUT_PARQUET = PROJECT_ROOT / "outputs" / "obs_master_lean.parquet"

# Optional: restrict processing to a slice of the file list
# Set END_INDEX = None to process all files from START_INDEX
START_INDEX = 0
END_INDEX = None  # e.g., 278 to mimic files[1200:1478] with different offsets

# ----------------------------------------------------------------------
# COLLECT RINEX-2 OBS FILES
# ----------------------------------------------------------------------

patterns = ["*.??o", "*.o", "*.??O", "*.O"]
files = []

for pat in patterns:
    if RECURSIVE:
        files.extend(IN_DIR.rglob(pat))
    else:
        files.extend(IN_DIR.glob(pat))

files = sorted({p.resolve() for p in files if p.is_file()})
print(f"Found OBS files: {len(files)}")

# Apply optional slice
if END_INDEX is not None:
    files_to_process = files[START_INDEX:END_INDEX]
else:
    files_to_process = files[START_INDEX:]

print(f"Files to process: {len(files_to_process)}")

# ----------------------------------------------------------------------
# PARSE OBS FILES AND BUILD MASTER DATAFRAME
# ----------------------------------------------------------------------

dfs = []

for i, p in enumerate(files_to_process, 1):
    try:
        df = parse_rinex2_obs_lean(str(p))
        if not df.empty:
            df["source_file"] = p.name  # traceability
            df["source_file"] = df["source_file"].astype("category")
            dfs.append(df)
        else:
            print(f"  - {p.name}: no readable C1/P2 observations")
    except Exception as exc:  # noqa: BLE001
        print(f"Warning on {p.name}: {exc}")

if dfs:
    obs_lean = pd.concat(dfs, ignore_index=True)
    obs_lean.sort_values(["time", "prn"], inplace=True, ignore_index=True)
    print(f"Total rows: {len(obs_lean)}")
else:
    obs_lean = pd.DataFrame(
        columns=["time", "prn", "P2_minus_C1_m", "source_file"]
    )
    print("No data parsed.")

# Preview in the notebook
try:
    from IPython.display import display  # type: ignore
    display(obs_lean.head(20))
except Exception:
    print(obs_lean.head(20))

# ----------------------------------------------------------------------
# WRITE MASTER FILES (CSV / PARQUET)
# ----------------------------------------------------------------------

if (WRITE_CSV or WRITE_PARQUET) and not obs_lean.empty:
    if WRITE_CSV:
        OUT_MASTER_CSV.parent.mkdir(parents=True, exist_ok=True)
        obs_lean.to_csv(OUT_MASTER_CSV, index=False)
        print(f"CSV written: {OUT_MASTER_CSV}")

    if WRITE_PARQUET:
        OUT_PARQUET.parent.mkdir(parents=True, exist_ok=True)
        obs_lean.to_parquet(OUT_PARQUET, index=False)
        print(f"Parquet written: {OUT_PARQUET}")


Generate Receiver bias per day over a period: WLS vs Minimum-Scalloping

In [None]:
"""
Estimate one daily receiver code bias using two methods:
  - Weighted Least Squares (WLS, two-pass with residual clipping)
  - Minimum-Scalloping (MS) using nighttime VTEC smoothness

Inputs (expected layout, adapt as needed):

- Observation master (Parquet/CSV):
    columns: time (UTC, ISO8601), prn, P2_minus_C1_m
    directory: data/obs_master/

- Navigation master (CSV):
    columns: prn, toc, week, Toe, sqrtA, d_n, e, M0, Cuc, Cus, Crc, Crs, Cic, Cis, i0, IDOT, Omega0, OMEGA_DOT, omega
    path: data/nav/NEW_all_nav_master.csv

- DCB master (CSV):
    columns: date (or year + doy), prn, C2W-C1C_sat_m
    path: data/dcb/NEW_all_dcb_master.csv

Output:

- CSV with one row per day:
    date, D_rx_WLS_m, D_rx_MS_m, delta_MS_minus_WLS_m, N_obs_pass1, N_obs_pass2, N_epochs, Nsat_mean, MS_score_min

This script is self-contained except for the input files.
"""

from __future__ import annotations

import math
from pathlib import Path
from typing import Any, Dict, Optional

import numpy as np
import pandas as pd

# ============================================================================
# CONFIGURATION
# ============================================================================

PROJECT_ROOT = Path(__file__).resolve().parent

# Observation master directory (Parquet and/or CSV)
OBS_DIR = PROJECT_ROOT / "data" / "obs_master"

# Navigation master CSV (broadcast ephemerides)
NAV_FILE = PROJECT_ROOT / "data" / "nav" / "NEW_all_nav_master.csv"

# DCB master CSV (satellite code biases)
DCB_FILE = PROJECT_ROOT / "data" / "dcb" / "NEW_all_dcb_master.csv"

# Output CSV for daily receiver biases
OUT_DRX_CSV = PROJECT_ROOT / "outputs" / "Drx_WLS_vs_MS_daily.csv"

# Date range (UTC)
DATE_START = "2015-10-01"
DATE_END = "2025-09-26"

# Receiver coordinates (ECEF, meters)
RX_XYZ = (5411106.3084, -747628.5974, 3286931.3223)

# Geometry / QC parameters
ELEV_MIN_DEG = 10.0
H_IONO = 350_000.0
R_EARTH = 6_378_137.0
MAX_RESID_M = 5.0
MAX_VTEC_TECU = 200.0
MERGE_TOL = "2h"  # obs-ephemeris nearest tolerance

# GPS L1/L2
f1 = 1575.42e6
f2 = 1227.60e6
K_m_per_TECU = 40.3 * (1.0 / f2**2 - 1.0 / f1**2)  # ~0.105 m / TECU

# ============================================================================
# NAV / DCB LOADING
# ============================================================================


def load_nav(path: Path) -> pd.DataFrame:
    """
    Load broadcast ephemeris master CSV.

    Expected columns:
        prn, toc (ISO8601), week, Toe, sqrtA, d_n, e, M0,
        Cuc, Cus, Crc, Crs, Cic, Cis, i0, IDOT, Omega0, OMEGA_DOT, omega
    """
    nav = pd.read_csv(path)
    nav["prn"] = nav["prn"].astype(str).str.upper().str.strip()
    nav["toc"] = pd.to_datetime(nav["toc"], utc=True, errors="coerce")

    # Reference GPS time epoch
    gps_epoch = pd.to_datetime("1980-01-06", utc=True)

    nav["ephem_time"] = gps_epoch + pd.to_timedelta(nav["week"] * 7, unit="D") + pd.to_timedelta(
        nav["Toe"], unit="s"
    )

    keep = [
        "prn",
        "ephem_time",
        "week",
        "Toe",
        "sqrtA",
        "d_n",
        "e",
        "M0",
        "Cuc",
        "Cus",
        "Crc",
        "Crs",
        "Cic",
        "Cis",
        "i0",
        "IDOT",
        "Omega0",
        "OMEGA_DOT",
        "omega",
    ]
    nav = nav[keep].sort_values(["prn", "ephem_time"]).reset_index(drop=True)
    return nav


def load_dcb(path: Path) -> pd.DataFrame:
    """
    Load satellite DCB master CSV.

    Expected columns:
        - Either: 'date' (YYYY-MM-DD) and 'prn'
        - Or: 'year', 'doy' and 'prn'
        - 'C2W-C1C_sat_m' (meters)
    """
    dcb = pd.read_csv(path)
    dcb["prn"] = dcb["prn"].astype(str).str.upper().str.strip()

    if "date" in dcb.columns:
        dcb["dcb_date"] = pd.to_datetime(dcb["date"], utc=True).dt.floor("D")
    else:
        base = pd.to_datetime(dcb["year"].astype(int).astype(str), format="%Y", utc=True)
        dcb["dcb_date"] = (base + pd.to_timedelta(dcb["doy"].astype(int) - 1, unit="D")).dt.floor("D")

    dcb = dcb.rename(columns={"C2W-C1C_sat_m": "sat_dcb_m"})
    return dcb[["prn", "dcb_date", "sat_dcb_m"]]


# ============================================================================
# OBSERVATION LOADING
# ============================================================================


def _read_day_slice(fp: Path, t0: pd.Timestamp, t1: pd.Timestamp) -> pd.DataFrame:
    """
    Read a time slice [t0, t1) from a Parquet or CSV file,
    keeping only 'time', 'prn', 'P2_minus_C1_m'.
    """
    cols = ["time", "prn", "P2_minus_C1_m"]

    if fp.suffix.lower() == ".parquet":
        try:
            df = pd.read_parquet(
                fp,
                columns=cols,
                filters=[("time", ">=", t0), ("time", "<", t1)],
            )
        except Exception:  # noqa: BLE001
            df = pd.read_parquet(fp, columns=cols)
            df["time"] = pd.to_datetime(df["time"], utc=True)
            df = df[(df["time"] >= t0) & (df["time"] < t1)]
    else:
        df = pd.read_csv(fp, usecols=cols, parse_dates=["time"])
        df["time"] = pd.to_datetime(df["time"], utc=True)
        df = df[(df["time"] >= t0) & (df["time"] < t1)]

    return df


def stack_obs_for_day(t0: pd.Timestamp) -> Optional[pd.DataFrame]:
    """
    Stack all observation files for a given UTC day into a single DataFrame.
    """
    t1 = t0 + pd.Timedelta(days=1)
    files = sorted(list(OBS_DIR.glob("*.parquet")) + list(OBS_DIR.glob("*.csv")))

    if not files:
        raise SystemExit(f"No observation files found in: {OBS_DIR}")

    parts: list[pd.DataFrame] = []
    for fp in files:
        df = _read_day_slice(fp, t0, t1)
        if df is not None and len(df):
            parts.append(df)

    if not parts:
        return None

    obs = pd.concat(parts, ignore_index=True)
    obs["prn"] = obs["prn"].astype(str).str.upper().str.strip()
    obs["time"] = pd.to_datetime(obs["time"], utc=True)
    obs = obs.dropna(subset=["time", "prn", "P2_minus_C1_m"])

    return obs.sort_values(["prn", "time"]).reset_index(drop=True)


# ============================================================================
# GEOMETRY
# ============================================================================


def ecef_to_geodetic(x: float, y: float, z: float) -> tuple[float, float, float]:
    """
    Convert ECEF (x, y, z) -> (lat, lon, h) in radians, radians, meters.
    WGS84 ellipsoid.
    """
    a = 6378137.0
    f = 1.0 / 298.257223563
    e2 = f * (2.0 - f)
    b = a * (1.0 - f)
    ep2 = (a * a - b * b) / (b * b)

    r = math.hypot(x, y)
    E2 = a * a - b * b
    F = 54.0 * b * b * z * z
    G = r * r + (1.0 - e2) * z * z - e2 * E2
    c = (e2 * e2 * F * r * r) / (G * G * G)
    s = (1.0 + c + math.sqrt(c * c + 2.0 * c)) ** (1.0 / 3.0)
    P = F / (3.0 * (s + 1.0 / s + 1.0) ** 2 * G * G)
    Q = math.sqrt(1.0 + 2.0 * e2 * e2 * P)
    r0 = -(P * e2 * r) / (1.0 + Q) + math.sqrt(
        0.5 * a * a * (1.0 + 1.0 / Q)
        - (P * (1.0 - e2) * z * z) / (Q * (1.0 + Q))
        - 0.5 * P * r * r
    )
    U = math.sqrt((r - e2 * r0) ** 2 + z * z)
    V = math.sqrt((r - e2 * r0) ** 2 + (1.0 - e2) * z * z)
    Z0 = b * b * z / (a * V)
    h = U * (1.0 - b * b / (a * V))
    lat = math.atan2(z + ep2 * Z0, r)
    lon = math.atan2(y, x)
    return lat, lon, h


def elevation_deg(rx_xyz: tuple[float, float, float], sat_xyz: tuple[float, float, float]) -> float:
    """
    Compute elevation angle (degrees) of satellite relative to receiver.
    """
    x, y, z = rx_xyz
    lat, lon, _ = ecef_to_geodetic(x, y, z)

    sl, cl = math.sin(lat), math.cos(lat)
    slon, clon = math.sin(lon), math.cos(lon)

    R = np.array(
        [
            [-slon, clon, 0.0],
            [-clon * sl, -slon * sl, cl],
            [clon * cl, slon * cl, sl],
        ]
    )

    d = np.array([sat_xyz[0] - x, sat_xyz[1] - y, sat_xyz[2] - z])
    e, n, u = R @ d
    return math.degrees(math.atan2(u, math.hypot(e, n)))


def sat_ecef_from_navrow(t_utc: pd.Timestamp, rnav: pd.Series) -> tuple[float, float, float]:
    """
    Compute satellite ECEF position from a NAV ephemeris row at time t_utc.
    """
    MU = 3.986005e14
    OMEGA_E = 7.2921151467e-5

    t = pd.Timestamp(t_utc).tz_convert("UTC")
    toe_time = pd.to_datetime(rnav["ephem_time"], utc=True)
    tk = (t - toe_time).total_seconds()

    half = 302400.0
    if tk > half:
        tk -= 2.0 * half
    if tk < -half:
        tk += 2.0 * half

    A = float(rnav["sqrtA"]) ** 2
    dn = float(rnav["d_n"])
    ecc = float(rnav["e"])
    M0 = float(rnav["M0"])
    Cuc = float(rnav["Cuc"])
    Cus = float(rnav["Cus"])
    Crc = float(rnav["Crc"])
    Crs = float(rnav["Crs"])
    Cic = float(rnav["Cic"])
    Cis = float(rnav["Cis"])
    i0 = float(rnav["i0"])
    IDOT = float(rnav["IDOT"])
    OMG0 = float(rnav["Omega0"])
    OMG_DOT = float(rnav["OMEGA_DOT"])
    omg = float(rnav["omega"])
    toe_s = float(rnav["Toe"])

    n0 = math.sqrt(MU / A**3)
    n = n0 + dn
    M = M0 + n * tk

    # Solve Kepler's equation for E
    E = M
    for _ in range(12):
        dE = (M + ecc * math.sin(E) - E) / (1.0 - ecc * math.cos(E))
        E += dE
        if abs(dE) < 1e-13:
            break

    v = math.atan2(math.sqrt(1.0 - ecc * ecc) * math.sin(E), math.cos(E) - ecc)
    phi = v + omg

    du = Cuc * math.cos(2.0 * phi) + Cus * math.sin(2.0 * phi)
    dr = Crc * math.cos(2.0 * phi) + Crs * math.sin(2.0 * phi)
    di = Cic * math.cos(2.0 * phi) + Cis * math.sin(2.0 * phi)

    u = phi + du
    r = A * (1.0 - ecc * math.cos(E)) + dr
    i = i0 + IDOT * tk + di

    x_orb = r * math.cos(u)
    y_orb = r * math.sin(u)

    OMG = OMG0 + (OMG_DOT - OMEGA_E) * tk - OMEGA_E * toe_s
    cosO, sinO = math.cos(OMG), math.sin(OMG)
    cosi, sini = math.cos(i), math.sin(i)

    x = x_orb * cosO - y_orb * cosi * sinO
    y = x_orb * sinO + y_orb * cosi * cosO
    z = y_orb * sini

    return x, y, z


def merge_nav(
    obs_day: pd.DataFrame, nav: pd.DataFrame, tol: Optional[str] = None
) -> pd.DataFrame:
    """
    Merge observation and navigation by PRN via nearest-time (merge_asof).
    """
    out: list[pd.DataFrame] = []

    for prn, left in obs_day.groupby("prn", sort=False):
        right = nav[nav["prn"] == prn]
        if right.empty:
            continue

        left = left.sort_values("time")
        right = right.sort_values("ephem_time")

        kw: Dict[str, Any] = {"direction": "nearest"}
        if tol is not None:
            kw["tolerance"] = pd.Timedelta(tol)

        m = pd.merge_asof(
            left=left,
            right=right,
            left_on="time",
            right_on="ephem_time",
            **kw,
        )
        if m is None or m.empty:
            continue

        if "prn_x" in m.columns and "prn" not in m.columns:
            m = m.rename(columns={"prn_x": "prn"})
        if "prn_y" in m.columns:
            m = m.drop(columns=["prn_y"])

        out.append(m)

    if not out:
        return pd.DataFrame()

    return pd.concat(out, ignore_index=True)


# ============================================================================
# MAPPING + DCB
# ============================================================================


def mapping_M(el_deg: np.ndarray, Re: float = R_EARTH, H: float = H_IONO) -> np.ndarray:
    """
    Simple thin-shell mapping function M(elevation) = 1 / sqrt(1 - (Re / (Re + H) * cos el)^2).
    """
    el = np.radians(el_deg)
    arg = (Re * np.cos(el)) / (Re + H)
    M = 1.0 / np.sqrt(1.0 - np.clip(arg * arg, 0.0, 0.9999))
    return np.clip(M, 1.0, 20.0)


def attach_sat_dcb(m: pd.DataFrame, dcb_day: pd.DataFrame) -> pd.DataFrame:
    """
    Join satellite DCBs and build y_m = (P2-C1) - DCB_sat.
    """
    mm = m.merge(dcb_day, on="prn", how="inner").dropna(subset=["sat_dcb_m"])
    if mm.empty:
        return mm
    mm["y_m"] = mm["P2_minus_C1_m"] - mm["sat_dcb_m"]
    return mm


def mapping_block(df: pd.DataFrame) -> pd.DataFrame:
    """
    Add mapping function M, coefficient P = K * M, and weight w = 1 / M^2.
    """
    df["M"] = mapping_M(df["elev_deg"].values)
    df["P"] = K_m_per_TECU * df["M"].values
    df["w"] = 1.0 / (df["M"].values**2)
    return df


# ============================================================================
# WLS (CLOSED FORM, TWO PASSES)
# ============================================================================


def solve_closed_form(df: pd.DataFrame) -> tuple[Optional[float], Optional[pd.DataFrame], Optional[dict]]:
    """
    Closed-form solution for daily bias D and per-epoch VTEC
    using the block structure across satellites and epochs.
    """
    df = df.dropna(subset=["y_m", "M", "P", "w", "time"])
    if df.empty:
        return None, None, None

    d = df.assign(
        wP=df["w"] * df["P"],
        wP2=df["w"] * (df["P"] ** 2),
        wy=df["w"] * df["y_m"],
        wPy=df["w"] * df["P"] * df["y_m"],
    )

    G = d.groupby("time", sort=True)[["w", "wP", "wP2", "wPy"]].sum()
    S0, S1, S2, b_t = G["w"], G["wP"], G["wP2"], G["wPy"]
    b0 = d["wy"].sum()

    valid = (S2 > 0) & (S0 >= 3)
    if not valid.any():
        return None, None, None

    S0, S1, S2, b_t = S0[valid], S1[valid], S2[valid], b_t[valid]
    num = b0 - np.sum(S1 * b_t / S2)
    den = np.sum(S0) - np.sum((S1**2) / S2)

    if not np.isfinite(den) or den == 0:
        return None, None, None

    D = num / den
    V_t = (b_t - S1 * D) / S2

    times = pd.DatetimeIndex(G.index[valid])
    if times.tz is None:
        times = times.tz_localize("UTC")
    times = times.tz_convert("UTC").rename(None)

    vt = pd.DataFrame(
        {
            "time": times.to_numpy(copy=False),
            "VTEC_TECU": np.asarray(V_t, dtype=float),
        }
    )
    vt.sort_values(by="time", inplace=True, kind="mergesort")
    vt.reset_index(drop=True, inplace=True)

    return float(D), vt, {"N_obs": int(len(df))}


def wls_two_pass(
    df: pd.DataFrame,
    max_resid_m: float = MAX_RESID_M,
) -> tuple[Optional[float], Optional[pd.DataFrame], dict]:
    """
    Two-pass WLS:
      1) initial fit via closed form
      2) residual-based clipping and refit
    """
    D1, vt1, _ = solve_closed_form(df)
    if D1 is None:
        return None, None, {}

    df["time"] = pd.to_datetime(df["time"], utc=True)
    vt1["time"] = pd.to_datetime(vt1["time"], utc=True)

    vt1_idx = vt1.set_index("time")["VTEC_TECU"]
    d1 = df.join(vt1_idx, on="time", how="inner")

    pred = D1 + d1["P"].values * d1["VTEC_TECU"].values
    resid = d1["y_m"].values - pred

    keep = np.isfinite(resid) & (np.abs(resid) <= max_resid_m)
    d2 = d1.loc[keep].copy()

    D2, vt2, diag = solve_closed_form(d2)
    if D2 is None:
        return None, None, {}

    vt2 = vt2[(vt2["VTEC_TECU"] >= 0.0) & (vt2["VTEC_TECU"] <= MAX_VTEC_TECU)].copy()

    diag = {
        "N_obs_pass1": int(len(d1)),
        "N_obs_pass2": int(len(d2)),
        "N_epochs": int(len(vt2)),
        "Nsat_mean": float(d2.groupby("time").size().mean()) if len(d2) else np.nan,
    }
    return float(D2), vt2, diag


# ============================================================================
# MINIMUM-SCALLOPING (MS)
# ============================================================================


def _scalloping_score(D: float, df: pd.DataFrame, night_only: bool = True) -> float:
    """
    Measure nocturnal "ripples" of VTEC for a given D.
    Lower score = smoother (preferred).
    """
    vtec_i = (df["y_m"].values - D) / df["P"].values
    tmp = pd.DataFrame({"time": pd.to_datetime(df["time"], utc=True), "vtec": vtec_i})

    if night_only:
        lt = tmp["time"].dt.tz_convert("Africa/Casablanca").dt.hour
        tmp = tmp[(lt <= 4) | (lt >= 22)]
        if tmp.empty:
            return float("inf")

    def mad(a: np.ndarray) -> float:
        m = np.median(a)
        return float(np.median(np.abs(a - m)))

    disp = tmp.groupby("time")["vtec"].apply(lambda a: mad(a.to_numpy()))
    disp = disp[np.isfinite(disp)]

    if not len(disp):
        return float("inf")
    return float(np.median(disp))


def minimum_scalloping_D(
    df: pd.DataFrame,
    D_init: Optional[float] = None,
    span_m: float = 10.0,
    step_m: float = 0.1,
    night_only: bool = True,
) -> tuple[Optional[float], dict]:
    """
    Grid-search around D_init to minimize nocturnal scalloping.
    """
    if D_init is None:
        D0, _, _ = solve_closed_form(df)
        if D0 is None:
            return None, {}
        D_init = D0

    grid = np.arange(D_init - span_m, D_init + span_m + step_m, step_m)
    scores = [_scalloping_score(D, df, night_only=night_only) for D in grid]

    k = int(np.argmin(scores))
    return float(grid[k]), {
        "MS_score_min": float(scores[k]),
        "MS_grid_step": float(step_m),
    }


# ============================================================================
# ONE DAY WRAPPER
# ============================================================================


def drx_for_one_day(
    day_str: str,
    elev_min_deg: float = ELEV_MIN_DEG,
    night_only: bool = True,
    nav: Optional[pd.DataFrame] = None,
    dcb: Optional[pd.DataFrame] = None,
) -> Optional[dict]:
    """
    Estimate daily receiver bias for a single UTC day.
    """
    if nav is None:
        nav = load_nav(NAV_FILE)
    if dcb is None:
        dcb = load_dcb(DCB_FILE)

    t0 = pd.Timestamp(day_str, tz="UTC").floor("D")

    obs_day = stack_obs_for_day(t0)
    if obs_day is None or obs_day.empty:
        return None

    m = merge_nav(obs_day, nav, tol=MERGE_TOL)
    if m is None or m.empty:
        return None

    dcb_day = dcb[dcb["dcb_date"] == t0.floor("D")][["prn", "sat_dcb_m"]]
    if dcb_day.empty:
        return None

    # Satellite positions and elevation
    sv = np.vstack([sat_ecef_from_navrow(row["time"], row) for _, row in m.iterrows()])
    m["sv_x"], m["sv_y"], m["sv_z"] = sv[:, 0], sv[:, 1], sv[:, 2]
    m["elev_deg"] = [
        elevation_deg(RX_XYZ, (x, y, z)) for x, y, z in zip(m["sv_x"], m["sv_y"], m["sv_z"])
    ]

    m = attach_sat_dcb(m, dcb_day)
    if m.empty:
        return None

    m = m[m["elev_deg"] >= float(elev_min_deg)].copy()
    m = mapping_block(m)

    D_wls, vt, diag = wls_two_pass(m, max_resid_m=MAX_RESID_M)
    if D_wls is None:
        return None

    D_ms, info_ms = minimum_scalloping_D(
        m,
        D_init=D_wls,
        span_m=10.0,
        step_m=0.1,
        night_only=night_only,
    )

    delta = (D_ms - D_wls) if D_ms is not None else np.nan

    return {
        "date": t0.date(),
        "D_rx_WLS_m": float(D_wls),
        "D_rx_MS_m": float(D_ms) if D_ms is not None else np.nan,
        "delta_MS_minus_WLS_m": float(delta) if np.isfinite(delta) else np.nan,
        "N_obs_pass1": diag.get("N_obs_pass1", np.nan),
        "N_obs_pass2": diag.get("N_obs_pass2", np.nan),
        "N_epochs": diag.get("N_epochs", np.nan),
        "Nsat_mean": diag.get("Nsat_mean", np.nan),
        "MS_score_min": info_ms.get("MS_score_min", np.nan),
    }


# ============================================================================
# PERIOD RUNNER
# ============================================================================


def run_period(
    date_start: str = DATE_START,
    date_end: str = DATE_END,
    out_csv: Path = OUT_DRX_CSV,
    elev_min_deg: float = ELEV_MIN_DEG,
) -> None:
    """
    Loop over a date range and compute daily receiver bias for each day.
    """
    nav = load_nav(NAV_FILE)
    dcb = load_dcb(DCB_FILE)

    days = pd.date_range(
        pd.Timestamp(date_start, tz="UTC"),
        pd.Timestamp(date_end, tz="UTC"),
        freq="D",
    )

    rows: list[dict] = []

    for t0 in days:
        day_txt = str(t0.date())
        try:
            rec = drx_for_one_day(
                day_txt,
                elev_min_deg=elev_min_deg,
                night_only=True,
                nav=nav,
                dcb=dcb,
            )
            if rec is None:
                print(f"{day_txt}: no result (missing obs/nav/dcb or no solution).")
                continue

            rows.append(rec)
            print(
                f"{day_txt}  WLS={rec['D_rx_WLS_m']:.3f}  "
                f"MS={rec['D_rx_MS_m']:.3f}  "
                f"Δ={rec['delta_MS_minus_WLS_m']:.3f}"
            )
        except Exception as exc:  # noqa: BLE001
            print(f"{day_txt}: error → {exc}")

    if rows:
        out_csv.parent.mkdir(parents=True, exist_ok=True)
        df = pd.DataFrame(rows).sort_values("date")
        df.to_csv(out_csv, index=False)
        print(f"\nSaved receiver-bias table to: {out_csv}")
    else:
        print("No rows produced for the requested period.")


# ============================================================================
# MAIN
# ============================================================================

if __name__ == "__main__":
    pd.set_option("display.width", 160)
    pd.set_option("display.max_columns", 20)
    run_period()


Generate a vtec_gim dataframe

In [None]:
"""
Sample co-located Global Ionospheric Maps (GIM, IONEX format) at a given station latitude/longitude and build a continuous 30-minute VTEC 
time series over a specified date range.

Outputs a CSV:
    time (UTC), vtec_gim (TECU)

Assumptions:
- IONEX files are already decompressed into a single directory.
- Both legacy CODG (short) and new COD0OPSFIN/COD0OPSRAP (long-name) formats
  may be present; the script chooses the appropriate product per date.
- TEC is stored as integers multiplied by 10^EXPONENT (standard IONEX).

Dependencies:
    numpy, pandas

Author: Mohamed Kaab (MIT License)
"""

from __future__ import annotations

import io
import subprocess
from datetime import date, datetime, timedelta, timezone
from pathlib import Path
from typing import Optional, Tuple, List

import numpy as np
import pandas as pd

# =============================================================================
# CONFIGURATION
# =============================================================================

PROJECT_ROOT = Path(__file__).resolve().parent

# Directory containing ALL decompressed IONEX files (.i / .INX, possibly .Z/.gz)
DEC_DIR = PROJECT_ROOT / "data" / "ionex_decompressed"

# Output CSV
OUT_CSV = PROJECT_ROOT / "outputs" / "VTEC_GIM_30min_20151001_20250926.csv"

# Station coordinates (geodetic degrees)
STA_LAT = 31.206
STA_LON = -7.866

# Date range (inclusive)
START_DATE = date(2015, 10, 1)
END_DATE = date(2025, 9, 26)

# =============================================================================
# LOW-LEVEL I/O HELPERS
# =============================================================================


def _open_text(p: Path) -> io.StringIO:
    """
    Open an IONEX/IONEX-compressed file as text and return a StringIO.

    Supports:
      - plain text (.i, .INX)
      - .gz via `gzip -dc`
      - .Z via `gzip -dc` first, then `7z e -so` as fallback

    Requires `gzip` (and optionally `7z`) in PATH for compressed files.
    """
    p = Path(p)
    if not p.exists():
        raise FileNotFoundError(p)

    ext = p.suffix.lower()

    if ext == ".gz":
        out = subprocess.run(["gzip", "-dc", str(p)], capture_output=True)
        if out.returncode != 0:
            raise RuntimeError(f"gzip -dc failed on {p}")
        return io.StringIO(out.stdout.decode("ascii", "ignore"))

    if ext == ".z":
        # Try gzip first
        gz = subprocess.run(["gzip", "-dc", str(p)], capture_output=True)
        if gz.returncode == 0 and gz.stdout:
            return io.StringIO(gz.stdout.decode("ascii", "ignore"))

        # Fallback to 7z
        sz = subprocess.run(["7z", "e", "-so", str(p)], capture_output=True)
        if sz.returncode != 0:
            raise RuntimeError(f"7z -so failed on {p}")
        return io.StringIO(sz.stdout.decode("ascii", "ignore"))

    # Plain text
    return io.StringIO(p.read_text(encoding="ascii", errors="ignore"))


def ionex_first_epoch_date(path: Path) -> Optional[date]:
    """
    Read an IONEX file and return the date of the first map epoch,
    based on the 'EPOCH OF FIRST MAP' header line.
    """
    try:
        f = _open_text(path)
    except Exception:
        return None

    for _ in range(400):
        line = f.readline()
        if not line:
            break
        if "EPOCH OF FIRST MAP" in line:
            yr, mo, dy, hh, mm, ss = map(int, line[:60].split()[:6])
            return datetime(yr, mo, dy, hh, mm, ss, tzinfo=timezone.utc).date()
        if "END OF HEADER" in line:
            break

    return None


# =============================================================================
# IONEX PARSING
# =============================================================================


def read_ionex(path: Path) -> Tuple[pd.DatetimeIndex, np.ndarray, np.ndarray, np.ndarray]:
    """
    Parse a (possibly compressed) IONEX file and return:

        times (UTC DatetimeIndex), lats (deg), lons (deg), TEC (array)
    
    where:
        TEC has shape (Ntimes, Nlat, Nlon)
        TEC is in TECU (floats, NaN for missing grid points).
    """
    f = _open_text(path)

    exp = -1
    lat1 = lat2 = dlat = None
    lon1 = lon2 = dlon = None

    # --- header ---
    while True:
        line = f.readline()
        if not line:
            raise ValueError("Incomplete IONEX header")

        if "EXPONENT" in line:
            s = line[:8].strip()
            exp = int(s) if s else -1

        if "LAT1 / LAT2 / DLAT" in line:
            lat1, lat2, dlat = map(float, line[:60].split()[:3])

        if "LON1 / LON2 / DLON" in line:
            lon1, lon2, dlon = map(float, line[:60].split()[:3])

        if "END OF HEADER" in line:
            break

    if None in (lat1, lat2, dlat, lon1, lon2, dlon):
        raise ValueError("Missing IONEX grid definition")

    nlat = int(round((lat2 - lat1) / dlat)) + 1
    nlon = int(round((lon2 - lon1) / dlon)) + 1
    lats = np.linspace(lat1, lat2, nlat)
    lons = np.linspace(lon1, lon2, nlon)

    times: List[pd.Timestamp] = []
    maps: List[np.ndarray] = []

    # --- TEC maps ---
    while True:
        line = f.readline()
        if not line:
            break

        if "START OF TEC MAP" not in line:
            continue

        # Find EPOCH OF CURRENT MAP
        line = f.readline()
        while line and "EPOCH OF CURRENT MAP" not in line:
            line = f.readline()
        if not line:
            break

        yr, mo, dy, hh, mm, ss = map(int, line[:60].split()[:6])
        t = pd.Timestamp(datetime(yr, mo, dy, hh, mm, ss, tzinfo=timezone.utc))

        tec_map = np.full((nlat, nlon), np.nan, dtype=float)
        bad = False

        # One record per latitude
        for ilat in range(nlat):
            hdr = f.readline()
            if not hdr or "LAT/LON1/LON2/DLON/H" not in hdr:
                bad = True
                break

            vals: List[float] = []
            while len(vals) < nlon:
                data = f.readline()
                if (
                    not data
                    or ("START OF" in data)
                    or ("END OF" in data)
                    or ("LAT/LON1" in data)
                ):
                    bad = True
                    break

                chunks = [data[i : i + 5] for i in range(0, len(data.rstrip()), 5)]
                for c in chunks:
                    c = c.strip().upper()
                    if c == "" or c == "9999":
                        vals.append(np.nan)
                    else:
                        try:
                            vals.append(float(c) * (10.0**exp))
                        except Exception:
                            vals.append(np.nan)
                    if len(vals) == nlon:
                        break

            if bad:
                break
            if len(vals) < nlon:
                vals += [np.nan] * (nlon - len(vals))

            tec_map[ilat, :] = np.asarray(vals, dtype=float)

        if bad:
            # Skip until END OF TEC MAP
            x = hdr
            while x:
                if "END OF TEC MAP" in x:
                    break
                x = f.readline()
            continue

        times.append(t)
        maps.append(tec_map)

    if not maps:
        raise ValueError(f"No TEC maps found in IONEX: {path}")

    TEC = np.stack(maps, axis=0)

    # Normalize longitude range to [-180, 180) if needed
    if float(np.nanmin(lons)) >= 0.0 and float(np.nanmax(lons)) > 180.0:
        order = np.argsort(((lons + 180.0) % 360.0) - 180.0)
        lons = (((lons + 180.0) % 360.0) - 180.0)[order]
        TEC = TEC[:, :, order]

    # Ensure latitudes increasing
    if lats[0] > lats[-1]:
        lats = lats[::-1]
        TEC = TEC[:, ::-1, :]

    times_idx = pd.to_datetime(times, utc=True)
    return times_idx, lats, lons, TEC


# =============================================================================
# INTERPOLATION HELPERS
# =============================================================================


def bilinear(
    lats: np.ndarray,
    lons: np.ndarray,
    grid: np.ndarray,
    lat: float,
    lon: float,
) -> float:
    """
    Simple bilinear interpolation on a regular lat/lon grid.

    lats, lons are 1D arrays (ascending).
    grid is 2D (nlat, nlon) matching lats/lons.
    """
    i = np.searchsorted(lats, lat) - 1
    j = np.searchsorted(lons, lon) - 1
    i = np.clip(i, 0, len(lats) - 2)
    j = np.clip(j, 0, len(lons) - 2)

    y1, y2 = lats[i], lats[i + 1]
    x1, x2 = lons[j], lons[j + 1]

    Q11 = grid[i, j]
    Q12 = grid[i, j + 1]
    Q21 = grid[i + 1, j]
    Q22 = grid[i + 1, j + 1]

    if (x2 - x1) == 0.0 or (y2 - y1) == 0.0:
        return float(Q11)

    wx = (lon - x1) / (x2 - x1)
    wy = (lat - y1) / (y2 - y1)

    return float(
        (1.0 - wx) * (1.0 - wy) * Q11
        + wx * (1.0 - wy) * Q12
        + (1.0 - wx) * wy * Q21
        + wx * wy * Q22
    )


def to_utc_index(x) -> pd.DatetimeIndex:
    idx = pd.DatetimeIndex(x)
    if idx.tz is None:
        idx = idx.tz_localize("UTC")
    else:
        idx = idx.tz_convert("UTC")
    return idx.sort_values().unique()


def make_30min_grid(day: date) -> pd.DatetimeIndex:
    """
    Return a 30-min UTC grid for a given date (48 slots).
    """
    t0 = pd.Timestamp(day, tz="UTC")
    return pd.date_range(t0, periods=48, freq="30min")


# =============================================================================
# PRODUCT SELECTION FOR A GIVEN DATE
# =============================================================================


def product_window(day: date) -> str:
    """
    Decide which CODG/COD0OPSFIN/COD0OPSRAP product window applies
    for a given date.

    Returns:
        'OLD'    for legacy codgDDD0.yyi (short name),
        'OPSFIN' for COD0OPSFIN_*_GIM.INX,
        'OPSRAP' for COD0OPSRAP_*_GIM.INX.
    """
    if day <= date(2022, 11, 27):  # inclusive: 2022-DOY330
        return "OLD"
    if day <= date(2025, 9, 20):   # inclusive: 2025-DOY263
        return "OPSFIN"
    return "OPSRAP"                # after → RAPID


def pick_ionex_for_day(day: date) -> Optional[Path]:
    """
    Select the appropriate IONEX file in DEC_DIR for a given date.

    Logic:
      - For 'OLD', look for codgDDD0.yyi / CODGDDD0.yyi and validate by header.
      - For 'OPSFIN' or 'OPSRAP', prefer COD0OPSFIN or COD0OPSRAP long-name
        products for that year+DOY, again validated by header.
    """
    yy = f"{day.year % 100:02d}"
    doy = f"{int(pd.Timestamp(day).strftime('%j')):03d}"
    mode = product_window(day)

    # Legacy CODG (short name)
    if mode == "OLD":
        candidates = [f"codg{doy}0.{yy}i", f"CODG{doy}0.{yy}I"]
        for name in candidates:
            p = DEC_DIR / name
            if p.exists() and ionex_first_epoch_date(p) == day:
                return p

        # If several codg*.i exist, scan by header
        for q in DEC_DIR.glob(f"codg*{yy}i"):
            if ionex_first_epoch_date(q) == day:
                return q
        for q in DEC_DIR.glob(f"CODG*{yy}I"):
            if ionex_first_epoch_date(q) == day:
                return q
        return None

    # Long-name products
    patterns: list[str]
    if mode == "OPSFIN":
        patterns = [
            f"COD0OPSFIN_*{day.year}{doy}*_GIM.INX",
            f"COD0OPSRAP_*{day.year}{doy}*_GIM.INX",
        ]
    else:  # OPSRAP window
        patterns = [
            f"COD0OPSRAP_*{day.year}{doy}*_GIM.INX",
            f"COD0OPSFIN_*{day.year}{doy}*_GIM.INX",
        ]

    for pat in patterns:
        for q in DEC_DIR.glob(pat):
            if ionex_first_epoch_date(q) == day:
                return q

    return None


def vtec_30min_from_ionex(
    ionex_path: Path,
    day: date,
    lat: float,
    lon: float,
) -> pd.Series:
    """
    Sample a single IONEX file at (lat, lon) and interpolate in time
    to a 30-minute local grid for the given day.

    Returns a Series with:
        index: 30-min UTC timestamps,
        values: vtec_gim (TECU).
    """
    times, lats, lons, TEC = read_ionex(ionex_path)

    # If IONEX longitudes are [0, 360], convert station longitude accordingly
    lon_match = lon
    if float(np.nanmin(lons)) >= 0.0 and float(np.nanmax(lons)) > 180.0:
        lon_match = (lon + 360.0) % 360.0

    vals = [bilinear(lats, lons, TEC[k], lat, lon_match) for k in range(len(times))]

    ser = pd.Series(vals, index=to_utc_index(times), name="vtec_gim")

    # Interpolate onto local 30-min grid for this day
    t30 = make_30min_grid(day)
    union = ser.index.union(t30)
    ser30 = ser.reindex(union).interpolate("time").reindex(t30)

    return ser30


# =============================================================================
# MAIN LOOP
# =============================================================================


def main() -> None:
    all_rows: list[pd.DataFrame] = []

    d = START_DATE
    while d <= END_DATE:
        ionex_path = pick_ionex_for_day(d)

        if ionex_path is None:
            # Missing day → NaN series for this day
            t30 = make_30min_grid(d)
            df_day = pd.DataFrame({"time": t30, "vtec_gim": np.nan})
            all_rows.append(df_day)
            print(f"{d} : no IONEX file found, filled with NaN")
        else:
            ser30 = vtec_30min_from_ionex(ionex_path, d, STA_LAT, STA_LON)
            df_day = pd.DataFrame({"time": ser30.index, "vtec_gim": ser30.values})
            all_rows.append(df_day)
            print(f"{d} : processed {ionex_path.name}")

        d += timedelta(days=1)

    df = pd.concat(all_rows, axis=0, ignore_index=True)
    df.sort_values("time", inplace=True, ignore_index=True)

    OUT_CSV.parent.mkdir(parents=True, exist_ok=True)
    df.to_csv(OUT_CSV, index=False)

    print(f"\nWritten: {OUT_CSV}  |  rows: {len(df)}")


if __name__ == "__main__":
    main()


Cellule produisant deux fichiers csv par technique WLS ou MS

In [None]:
"""
VTEC statistics with fixed receiver bias (MS & WLS) --> four CSV products.

This script:
- Reads daily P2-C1 observations (Parquet/CSV) for one station.
- Merges them with broadcast ephemerides and satellite DCBs.
- Uses pre-estimated daily receiver biases (MS and WLS).
- Computes:
  * 30-min VTEC stats (two outputs: MS, WLS),
  * daily VTEC stats (two outputs: MS, WLS),
  * basic obs–ephemeris match statistics (one output).
"""

import math
from pathlib import Path
from typing import Optional, Tuple, Dict, Any, List

import numpy as np
import pandas as pd

# ---------- CONFIG (ADAPT THESE PATHS) ----------
# Directory with observation files containing: time, prn, P2_minus_C1_m
OBS_DIR = Path("data/obs")  # e.g., Path("data/All_Obs_master/Parquet")

# Broadcast ephemeris master file
NAV_FILE = Path("data/master/NEW_all_nav_master.csv")

# Satellite DCB master file (daily + monthly merged)
DCB_FILE = Path("data/master/NEW_all_dcb_master.csv")

# Daily receiver biases (output of the WLS vs MS script)
# Must contain columns: date, D_rx_MS_m, D_rx_WLS_m
DRX_FILE = Path("data/master/Drx_WLS_vs_MS_daily.csv")

# 30-min GIM VTEC at station (optional, for offset diagnostics)
GIM_FILE = Path("data/master/VTEC_GIM_30min_20151001_20250926.csv")

# Output CSV files
OUT_30MIN_MS = Path("outputs/2015_2025_MS_VTEC_30min_stats.csv")
OUT_30MIN_WL = Path("outputs/2015_2025_WLS_VTEC_30min_stats.csv")
OUT_DAILY_MS = Path("outputs/2015_2025_MS_VTEC_daily_stats.csv")
OUT_DAILY_WL = Path("outputs/2015_2025_WLS_VTEC_daily_stats.csv")
OUT_MATCH    = Path("outputs/2015_2025_MS_WLS_match_rates.csv")

# Date range to process
DATE_START = "2015-10-01"
DATE_END   = "2025-09-26"

# Receiver ECEF coordinates (m)
RX_XYZ = (5411106.3084, -747628.5974, 3286931.3223)

# Elevation masks (deg)
ELEV_MIN_DEG_BASE  = 10.0
ELEV_MIN_DEG_BOOST = 15.0

# Earth and ionospheric shell
R_EARTH = 6378137.0
H_IONO  = 350_000.0

# GPS L1/L2 frequencies
f1 = 1575.42e6
f2 = 1227.60e6

# Conversion factor: meters per TECU for P2-C1 combination
# (TEC in TECU, 1 TECU = 1e16 el/m^2)
K_m_per_TECU = 40.3e16 * (1.0 / f2**2 - 1.0 / f1**2)

# Obs–ephemeris merge tolerance
MERGE_TOL = "2h"

# VTEC bounds and outlier control
MAX_VTEC_TECU       = 200.0
Hampel_k            = 3.0
GIM_OFFSET_FLAG_TECU = 20.0

# ---------- GEO / NAV GEOMETRY ----------

def ecef_to_geodetic(x: float, y: float, z: float) -> Tuple[float, float, float]:
    """ECEF → (lat, lon, h) using a standard WGS-84 approximation."""
    a = 6378137.0
    f = 1 / 298.257223563
    e2 = f * (2 - f)
    b = a * (1 - f)
    ep2 = (a * a - b * b) / (b * b)
    r = math.hypot(x, y)
    E2 = a * a - b * b
    F = 54 * b * b * z * z
    G = r * r + (1 - e2) * z * z - e2 * E2
    c = (e2 * e2 * F * r * r) / (G * G * G)
    s = (1 + c + math.sqrt(c * c + 2 * c)) ** (1 / 3)
    P = F / (3 * (s + 1 / s + 1) ** 2 * G * G)
    Q = math.sqrt(1 + 2 * e2 * e2 * P)
    r0 = -(P * e2 * r) / (1 + Q) + math.sqrt(
        0.5 * a * a * (1 + 1 / Q) - (P * (1 - e2) * z * z) / (Q * (1 + Q)) - 0.5 * P * r * r
    )
    U = math.sqrt((r - e2 * r0) ** 2 + z * z)
    V = math.sqrt((r - e2 * r0) ** 2 + (1 - e2) * z * z)
    Z0 = b * b * z / (a * V)
    h = U * (1 - b * b / (a * V))
    lat = math.atan2(z + ep2 * Z0, r)
    lon = math.atan2(y, x)
    return lat, lon, h


def elevation_deg(rx_xyz: Tuple[float, float, float],
                  sat_xyz: Tuple[float, float, float]) -> float:
    """Elevation angle (deg) from receiver to satellite in ECEF."""
    x, y, z = rx_xyz
    lat, lon, _ = ecef_to_geodetic(x, y, z)
    sl, cl = math.sin(lat), math.cos(lat)
    slon, clon = math.sin(lon), math.cos(lon)
    R = np.array(
        [
            [-slon,        clon,       0.0],
            [-clon * sl, -slon * sl,   cl ],
            [ clon * cl,  slon * cl,   sl ],
        ]
    )
    d = np.array([sat_xyz[0] - x, sat_xyz[1] - y, sat_xyz[2] - z])
    e, n, u = R @ d
    return math.degrees(math.atan2(u, math.hypot(e, n)))


def mapping_M(el_deg: np.ndarray,
              Re: float = R_EARTH,
              H: float = H_IONO) -> np.ndarray:
    """Thin-shell mapping function M(elev)."""
    el = np.radians(el_deg)
    sin_zp = (Re / (Re + H)) * np.cos(el)
    sin_zp = np.clip(sin_zp, -1.0, 1.0)
    zp = np.arcsin(sin_zp)
    return 1.0 / np.cos(zp)


def sat_ecef_from_navrow(t_utc: pd.Timestamp,
                         rnav: pd.Series) -> Tuple[float, float, float]:
    """Compute satellite ECEF from broadcast ephemeris row at time t_utc."""
    MU = 3.986005e14
    OMEGA_E = 7.2921151467e-5

    t = pd.Timestamp(t_utc)
    t = t.tz_localize("UTC") if t.tzinfo is None else t.tz_convert("UTC")
    toe_time = pd.to_datetime(rnav["ephem_time"], utc=True)
    tk = (t - toe_time).total_seconds()
    half = 302400.0
    if tk > half:
        tk -= 2 * half
    if tk < -half:
        tk += 2 * half

    A = float(rnav["sqrtA"]) ** 2
    dn = float(rnav["d_n"])
    ecc = float(rnav["e"])
    M0 = float(rnav["M0"])
    Cuc = float(rnav["Cuc"])
    Cus = float(rnav["Cus"])
    Crc = float(rnav["Crc"])
    Crs = float(rnav["Crs"])
    Cic = float(rnav["Cic"])
    Cis = float(rnav["Cis"])
    i0 = float(rnav["i0"])
    IDOT = float(rnav["IDOT"])
    OMG0 = float(rnav["Omega0"])
    OMG_DOT = float(rnav["OMEGA_DOT"])
    omg = float(rnav["omega"])
    toe_s = float(rnav["Toe"])

    n0 = math.sqrt(MU / A**3)
    n = n0 + dn
    M = M0 + n * tk

    E = M
    for _ in range(12):
        dE = (M + ecc * math.sin(E) - E) / (1 - ecc * math.cos(E))
        E += dE
        if abs(dE) < 1e-13:
            break

    v = math.atan2(math.sqrt(1 - ecc * ecc) * math.sin(E),
                   math.cos(E) - ecc)
    phi = v + omg

    du = Cuc * math.cos(2 * phi) + Cus * math.sin(2 * phi)
    dr = Crc * math.cos(2 * phi) + Crs * math.sin(2 * phi)
    di = Cic * math.cos(2 * phi) + Cis * math.sin(2 * phi)

    u = phi + du
    r = A * (1 - ecc * math.cos(E)) + dr
    i = i0 + IDOT * tk + di

    x_orb = r * math.cos(u)
    y_orb = r * math.sin(u)

    OMG = OMG0 + (OMG_DOT - OMEGA_E) * tk - OMEGA_E * toe_s
    cosO, sinO = math.cos(OMG), math.sin(OMG)
    cosi, sini = math.cos(i), math.sin(i)

    x = x_orb * cosO - y_orb * cosi * sinO
    y = x_orb * sinO + y_orb * cosi * cosO
    z = y_orb * sini
    return x, y, z

# ---------- I/O HELPERS ----------

def load_nav(path: Path) -> pd.DataFrame:
    """Load merged RINEX-2 NAV master CSV."""
    nav = pd.read_csv(path)
    nav["prn"] = nav["prn"].str.upper().str.strip()
    nav["toc"] = pd.to_datetime(nav["toc"], utc=True, errors="coerce")
    nav["ephem_time"] = (
        pd.to_datetime("1980-01-06", utc=True)
        + pd.to_timedelta(nav["week"] * 7, unit="D")
        + pd.to_timedelta(nav["Toe"], unit="s")
    )
    keep = [
        "prn", "ephem_time", "week", "Toe", "sqrtA", "d_n", "e", "M0",
        "Cuc", "Cus", "Crc", "Crs", "Cic", "Cis", "i0", "IDOT",
        "Omega0", "OMEGA_DOT", "omega"
    ]
    return nav[keep].sort_values(["prn", "ephem_time"]).reset_index(drop=True)


def load_dcb(path: Path) -> pd.DataFrame:
    """Load satellite DCB master (daily C2W-C1C)."""
    dcb = pd.read_csv(path)
    dcb["prn"] = dcb["prn"].str.upper().str.strip()
    base = pd.to_datetime(dcb["year"].astype(int).astype(str),
                          format="%Y", utc=True)
    dcb["dcb_date"] = (
        base + pd.to_timedelta(dcb["doy"].astype(int) - 1, unit="D")
    ).dt.floor("D")
    dcb = dcb.rename(columns={"C2W-C1C_sat_m": "sat_dcb_m"})
    return dcb[["prn", "dcb_date", "sat_dcb_m"]]


def load_drx(path: Path) -> pd.DataFrame:
    """Load daily receiver bias file (must have date, D_rx_MS_m, D_rx_WLS_m)."""
    df = pd.read_csv(path)
    df["date"] = pd.to_datetime(df["date"], utc=True).dt.floor("D")

    # Try to standardize column names if slightly different
    if "D_rx_MS_m" not in df.columns:
        c = [c for c in df.columns if c.lower().startswith("d_rx_ms")] \
            or [c for c in df.columns if "MS" in c]
        if c:
            df = df.rename(columns={c[0]: "D_rx_MS_m"})
    if "D_rx_WLS_m" not in df.columns:
        c = [c for c in df.columns if c.lower().startswith("d_rx_wls")] \
            or [c for c in df.columns if "WLS" in c]
        if c:
            df = df.rename(columns={c[0]: "D_rx_WLS_m"})

    required = {"date", "D_rx_MS_m", "D_rx_WLS_m"}
    if not required.issubset(df.columns):
        raise ValueError("DRX_FILE must have: date, D_rx_MS_m, D_rx_WLS_m")

    return df[list(required)]


# Pre-resolve observation files
OBS_FILES: List[Path] = sorted(
    [*Path(OBS_DIR).glob("*.parquet"), *Path(OBS_DIR).glob("*.csv")]
)
print(f"Found observation files: {len(OBS_FILES)}")


def _read_day_slice(fp: Path,
                    t0: pd.Timestamp,
                    t1: pd.Timestamp) -> pd.DataFrame:
    """Read a single-day slice from one obs file."""
    cols = ["time", "prn", "P2_minus_C1_m"]
    if fp.suffix.lower() == ".parquet":
        try:
            df = pd.read_parquet(
                fp,
                columns=cols,
                engine="pyarrow",
                filters=[("time", ">=", t0), ("time", "<", t1)],
            )
        except Exception:
            df = pd.read_parquet(fp, columns=cols)
            df["time"] = pd.to_datetime(df["time"], utc=True)
            df = df[(df["time"] >= t0) & (df["time"] < t1)]
    else:
        df = pd.read_csv(fp, usecols=cols, parse_dates=["time"])
        df["time"] = pd.to_datetime(df["time"], utc=True)
        df = df[(df["time"] >= t0) & (df["time"] < t1)]
    return df


def stack_obs_for_day(t0: pd.Timestamp) -> Optional[pd.DataFrame]:
    """Stack all obs files for a given UTC day [t0, t0+1)."""
    t1 = t0 + pd.Timedelta(days=1)
    parts = []
    for fp in OBS_FILES:
        df = _read_day_slice(fp, t0, t1)
        if df is not None and len(df):
            parts.append(df)
    if not parts:
        return None
    obs = pd.concat(parts, ignore_index=True)
    obs["prn"] = obs["prn"].str.upper().str.strip()
    obs["time"] = pd.to_datetime(obs["time"], utc=True)
    obs = obs.dropna(subset=["time", "prn", "P2_minus_C1_m"])
    return obs.sort_values(["prn", "time"], kind="mergesort").reset_index(drop=True)


def merge_nav(obs_day: pd.DataFrame,
              nav: pd.DataFrame,
              tol: Optional[str]) -> pd.DataFrame:
    """Merge observations with ephemerides via nearest-ephem time per PRN."""
    if obs_day is None or obs_day.empty:
        return pd.DataFrame()

    out = []
    for prn_val, left in obs_day.groupby("prn", sort=False):
        right = nav[nav["prn"] == prn_val]
        if right.empty:
            continue
        left = left.sort_values("time", kind="mergesort")
        right = right.sort_values("ephem_time", kind="mergesort")
        merge_kwargs: Dict[str, Any] = dict(direction="nearest")
        if tol:
            merge_kwargs["tolerance"] = pd.Timedelta(tol)
        m = pd.merge_asof(
            left=left,
            right=right,
            left_on="time",
            right_on="ephem_time",
            **merge_kwargs,
        )
        if m is None or m.empty:
            continue
        if "prn_x" in m.columns and "prn" not in m.columns:
            m = m.rename(columns={"prn_x": "prn"})
        if "prn_y" in m.columns:
            m = m.drop(columns=["prn_y"])
        if "prn" not in m.columns:
            m["prn"] = prn_val
        out.append(m)

    if not out:
        return pd.DataFrame()
    m_all = pd.concat(out, ignore_index=True)
    return m_all.dropna(subset=["ephem_time"]).copy()

# ---------- DCB & MAPPING ----------

def attach_sat_dcb(m: pd.DataFrame,
                   dcb_day: pd.DataFrame) -> pd.DataFrame:
    """Attach satellite DCB and form the corrected observable y_m."""
    m2 = m.merge(dcb_day, on="prn", how="inner").dropna(subset=["sat_dcb_m"])
    if m2.empty:
        return m2
    m2["y_m"] = m2["P2_minus_C1_m"] - m2["sat_dcb_m"]
    return m2


def mapping_block(df: pd.DataFrame) -> pd.DataFrame:
    """Add mapping M and factor P = K*M used for VTEC inversion."""
    df["M"] = mapping_M(df["elev_deg"].values)
    df["P"] = K_m_per_TECU * df["M"].values
    return df

# ---------- GIM HANDLING ----------

def load_gim(path: Path) -> Optional[pd.DataFrame]:
    """Load 30-min GIM VTEC at the station (optional)."""
    if path is None or not path.exists():
        return None
    g = pd.read_csv(path, parse_dates=["time"])
    g["time"] = pd.to_datetime(g["time"], utc=True)
    return g[["time", "vtec_gim"]]


def daily_gim_offset(vt_df: pd.DataFrame,
                     gim: Optional[pd.DataFrame]) -> Optional[float]:
    """Median daily VTEC offset between local VTEC and GIM."""
    if gim is None or vt_df is None or vt_df.empty:
        return None
    g = gim.set_index("time")["vtec_gim"].sort_index()
    v = vt_df.set_index("time")["VTEC_TECU"].sort_index()
    g_aligned = g.reindex(
        v.index, method="nearest", tolerance=pd.Timedelta("15min")
    )
    d = (v - g_aligned).dropna()
    return float(d.median()) if not d.empty else None

# ---------- VTEC FROM FIXED D_rx ----------

def vtec_from_fixed_drx(df_day: pd.DataFrame,
                        D_rx_m: float,
                        elev_min: float) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    """
    Compute per-epoch VTEC with a fixed D_rx (MS or WLS) and elevation mask.
    Returns:
      - vt: DataFrame(time, VTEC_TECU)
      - diag: basic diagnostics
    """
    df = df_day[df_day["elev_deg"] >= elev_min].copy()
    if df.empty:
        return pd.DataFrame(), {}

    df = mapping_block(df)
    vtec_i = (df["y_m"].values - D_rx_m) / df["P"].values
    df["VTEC_i"] = vtec_i
    df = df[(df["VTEC_i"] >= 0) & (df["VTEC_i"] <= MAX_VTEC_TECU)]
    if df.empty:
        return pd.DataFrame(), {}

    vt = (
        df.groupby("time", sort=True)["VTEC_i"]
        .median()
        .rename("VTEC_TECU")
        .reset_index()
    )
    diag = {
        "N_obs_used": int(len(df)),
        "N_epochs":   int(len(vt)),
        "Nsat_mean":  float(df.groupby("time").size().mean()) if len(df) else np.nan,
    }
    return vt, diag

# ---------- 30-MIN BINNING + GIM ----------

def bin_30min_with_gim(vt_df: pd.DataFrame,
                       gim: Optional[pd.DataFrame]) -> pd.DataFrame:
    """Resample VTEC to 30-min bins and optionally attach GIM VTEC."""
    if vt_df is None or vt_df.empty:
        return pd.DataFrame()

    s = vt_df.set_index("time")["VTEC_TECU"].sort_index()
    r = s.resample("30min")

    v30 = pd.DataFrame(
        {
            "time":        r.median().index,
            "VTEC_median": r.median().values,
            "VTEC_mean":   r.mean().values,
            "VTEC_q25":    r.quantile(0.25).values,
            "VTEC_q75":    r.quantile(0.75).values,
            "N":           r.count().values,
        }
    ).dropna(subset=["VTEC_median", "N"])

    # Simple Hampel-type outlier suppression on 30-min medians
    med = v30["VTEC_median"]
    if med.notna().sum() >= 10:
        m0 = med.median()
        mad = (med - m0).abs().median()
        thresh = Hampel_k * 1.4826 * mad if mad > 0 else np.inf
        v30.loc[(med - m0).abs() > thresh, ["VTEC_median", "VTEC_mean"]] = np.nan

    # Attach GIM VTEC, if provided
    if gim is not None and not gim.empty:
        g = gim.set_index("time")["vtec_gim"].sort_index()
        g_al = g.reindex(
            v30["time"], method="nearest", tolerance=pd.Timedelta("15min")
        )
        v30["vtec_gim"] = g_al.to_numpy()
    else:
        v30["vtec_gim"] = np.nan

    return v30.dropna(how="all")


def append_csv(path: Path, df: pd.DataFrame) -> None:
    """Append a DataFrame to CSV, creating it with header if needed."""
    if df is None or df.empty:
        return
    path.parent.mkdir(parents=True, exist_ok=True)
    header = not path.exists()
    df.to_csv(path, mode="a", header=header, index=False)

# ---------- MAIN DRIVER ----------

def main() -> None:
    # Load inputs
    nav = load_nav(NAV_FILE)
    dcb = load_dcb(DCB_FILE)
    drx = load_drx(DRX_FILE)
    gim = load_gim(GIM_FILE)

    # Days: intersection of days in DCB and DRX, filtered by DATE_START/DATE_END
    days_dcb = pd.to_datetime(sorted(dcb["dcb_date"].unique()), utc=True)
    days_drx = pd.to_datetime(sorted(drx["date"].unique()), utc=True)
    all_days = pd.Index(days_dcb).intersection(days_drx)

    if DATE_START:
        all_days = all_days[all_days >= pd.Timestamp(DATE_START, tz="UTC")]
    if DATE_END:
        all_days = all_days[all_days <= pd.Timestamp(DATE_END, tz="UTC")]
    all_days = all_days.sort_values()

    if all_days.empty:
        raise SystemExit("No overlapping DCB/DRX days in the requested range.")

    print(f"Days: {len(all_days)} from {all_days[0].date()} to {all_days[-1].date()}")

    # Clear previous outputs
    for p in [OUT_30MIN_MS, OUT_30MIN_WL, OUT_DAILY_MS, OUT_DAILY_WL, OUT_MATCH]:
        if p.exists():
            p.unlink()

    match_rows: List[Dict[str, Any]] = []

    for t0 in all_days:
        day_str = str(t0.date())

        # Receiver biases (MS, WLS) for this day
        row = drx[drx["date"] == t0.floor("D")]
        if row.empty:
            print(f"Skip {day_str}: missing D_rx.")
            continue

        D_ms = float(row["D_rx_MS_m"].iloc[0])
        D_wls = float(row["D_rx_WLS_m"].iloc[0])

        # Observations + NAV merge
        obs_day = stack_obs_for_day(t0)
        if obs_day is None or obs_day.empty:
            print(f"Skip {day_str}: no observations.")
            continue

        m = merge_nav(obs_day, nav, tol=MERGE_TOL)
        total_obs = int(len(obs_day))
        matched = 0 if m is None or m.empty else int(m["ephem_time"].notna().sum())
        rate = 100.0 * matched / max(total_obs, 1)

        if matched > 0:
            sel = m["ephem_time"].notna()
            dt_min = (
                m.loc[sel, "time"] - m.loc[sel, "ephem_time"]
            ).dt.total_seconds() / 60.0
            p50 = float(np.nanmedian(np.abs(dt_min)))
            p95 = float(np.nanpercentile(np.abs(dt_min), 95))
        else:
            p50 = p95 = np.nan

        match_rows.append(
            {
                "date": t0.floor("D"),
                "total_obs": total_obs,
                "matched_obs": matched,
                "rate_pct": rate,
                "abs_dt_p50_min": p50,
                "abs_dt_p95_min": p95,
            }
        )

        if m is None or m.empty:
            print(f"Warning {day_str}: no NAV match.")
            continue

        # DCB for this day
        dcb_day = dcb[dcb["dcb_date"] == t0.floor("D")][["prn", "sat_dcb_m"]]
        if dcb_day.empty:
            print(f"Skip {day_str}: missing DCB.")
            continue

        # Geometry
        sv = np.vstack(
            [sat_ecef_from_navrow(row["time"], row) for _, row in m.iterrows()]
        )
        m["sv_x"], m["sv_y"], m["sv_z"] = sv[:, 0], sv[:, 1], sv[:, 2]
        m["elev_deg"] = [
            elevation_deg(RX_XYZ, (x, y, z))
            for x, y, z in zip(m["sv_x"], m["sv_y"], m["sv_z"])
        ]

        # Corrected observable y_m
        m = attach_sat_dcb(m, dcb_day)
        if m.empty:
            print(f"Skip {day_str}: no usable PRN.")
            continue

        # ---- MS branch ----
        vt_ms, diag_ms = vtec_from_fixed_drx(m, D_ms, elev_min=ELEV_MIN_DEG_BASE)
        if vt_ms is None or vt_ms.empty:
            print(f"Skip {day_str}: MS solution not usable.")
        else:
            gim_off_ms = daily_gim_offset(vt_ms, gim) if gim is not None else None
            used_mitig_ms = False

            # If |offset| >= 20 TECU, tighten elevation mask
            if gim_off_ms is not None and abs(gim_off_ms) >= GIM_OFFSET_FLAG_TECU:
                m2 = m[m["elev_deg"] >= ELEV_MIN_DEG_BOOST].copy()
                vt2, d2 = vtec_from_fixed_drx(m2, D_ms, elev_min=ELEV_MIN_DEG_BOOST)
                if vt2 is not None and not vt2.empty:
                    vt_ms, diag_ms, used_mitig_ms = vt2, d2, True

            # 30-min stats + GIM
            v30_ms = bin_30min_with_gim(vt_ms, gim)
            if not v30_ms.empty:
                append_csv(
                    OUT_30MIN_MS, v30_ms.assign(date_utc=t0.floor("D"))
                )

            # Daily stats
            vt_idx = vt_ms.set_index("time")["VTEC_TECU"].sort_index()
            gd = vt_idx.agg(
                VTEC_median="median",
                VTEC_mean="mean",
                VTEC_min="min",
                VTEC_max="max",
                VTEC_q25=lambda s: s.quantile(0.25),
                VTEC_q75=lambda s: s.quantile(0.75),
                VTEC_std="std",
                N="count",
            ).to_frame().T
            gd.insert(0, "date", t0.floor("D"))
            gd["D_rx_m_MS"] = float(D_ms)
            gd["gim_offset_tecu"] = (
                float(gim_off_ms) if gim_off_ms is not None else np.nan
            )
            gd["offset_flag_abs_ge_20"] = bool(
                gim_off_ms is not None and abs(gim_off_ms) >= GIM_OFFSET_FLAG_TECU
            )
            gd["mitigation_used"] = bool(used_mitig_ms)
            gd["N_epochs"] = diag_ms.get("N_epochs", np.nan)
            gd["Nsat_mean"] = diag_ms.get("Nsat_mean", np.nan)
            append_csv(OUT_DAILY_MS, gd)

        # ---- WLS branch ----
        vt_wl, diag_wl = vtec_from_fixed_drx(m, D_wls, elev_min=ELEV_MIN_DEG_BASE)
        if vt_wl is None or vt_wl.empty:
            print(f"Skip {day_str}: WLS solution not usable.")
        else:
            gim_off_wl = daily_gim_offset(vt_wl, gim) if gim is not None else None
            used_mitig_wl = False

            if gim_off_wl is not None and abs(gim_off_wl) >= GIM_OFFSET_FLAG_TECU:
                m2 = m[m["elev_deg"] >= ELEV_MIN_DEG_BOOST].copy()
                vt2, d2 = vtec_from_fixed_drx(m2, D_wls, elev_min=ELEV_MIN_DEG_BOOST)
                if vt2 is not None and not vt2.empty:
                    vt_wl, diag_wl, used_mitig_wl = vt2, d2, True

            v30_wl = bin_30min_with_gim(vt_wl, gim)
            if not v30_wl.empty:
                append_csv(
                    OUT_30MIN_WL, v30_wl.assign(date_utc=t0.floor("D"))
                )

            vt_idx = vt_wl.set_index("time")["VTEC_TECU"].sort_index()
            gd = vt_idx.agg(
                VTEC_median="median",
                VTEC_mean="mean",
                VTEC_min="min",
                VTEC_max="max",
                VTEC_q25=lambda s: s.quantile(0.25),
                VTEC_q75=lambda s: s.quantile(0.75),
                VTEC_std="std",
                N="count",
            ).to_frame().T
            gd.insert(0, "date", t0.floor("D"))
            gd["D_rx_m_WLS"] = float(D_wls)
            gd["gim_offset_tecu"] = (
                float(gim_off_wl) if gim_off_wl is not None else np.nan
            )
            gd["offset_flag_abs_ge_20"] = bool(
                gim_off_wl is not None and abs(gim_off_wl) >= GIM_OFFSET_FLAG_TECU
            )
            gd["mitigation_used"] = bool(used_mitig_wl)
            gd["N_epochs"] = diag_wl.get("N_epochs", np.nan)
            gd["Nsat_mean"] = diag_wl.get("Nsat_mean", np.nan)
            append_csv(OUT_DAILY_WL, gd)

        print(
            f"{day_str}  D_rx_MS={D_ms:.3f}  "
            f"D_rx_WLS={D_wls:.3f}  match={rate:.1f}%"
        )

    # Save match stats
    pd.DataFrame(match_rows).sort_values("date").to_csv(OUT_MATCH, index=False)
    print(
        f"\n→ 30-min MS : {OUT_30MIN_MS}\n"
        f"→ 30-min WLS : {OUT_30MIN_WL}\n"
        f"→ Daily MS   : {OUT_DAILY_MS}\n"
        f"→ Daily WLS  : {OUT_DAILY_WL}\n"
        f"→ Match      : {OUT_MATCH}"
    )


if __name__ == "__main__":
    pd.set_option("display.width", 160)
    pd.set_option("display.max_columns", 20)
    main()


Generate dataframe with solar and geo indices with daily max

In [None]:
"""
Merge daily VTEC statistics with solar/geomagnetic indices and the time of
maximum VTEC (UTC).

Inputs:
- Daily VTEC stats CSV (one row per day).
- 30-min VTEC stats CSV (to extract daily max VTEC and its time).
- F10.7 index (CSV).
- GFZ index file (Kp, Ap, sunspot number).
- Kyoto Dst index file (text).

Output:
- A single daily CSV with:
  * VTEC daily stats
  * F10.7 (obs/adj)
  * Kp (mean, max), Ap, SN
  * Dst (min, mean, count)
  * time / bin of max VTEC
  * basic solar label (Low/High/NA).
"""

from __future__ import annotations

import re
from pathlib import Path
from typing import Union

import numpy as np
import pandas as pd
from datetime import datetime, timezone

# ============================
# CONFIG (ADAPT THESE PATHS)
# ============================
STATS_CSV = Path("data/2015_2025_MS_CS_VTEC_daily_stats.csv")
CSV_30MIN = Path("data/2015_2025_MS_CS_VTEC_30min_stats.csv")  # time (UTC), VTEC_median at least
CSV_F107  = Path("data/indices/F10_7_102015-092025.csv")
FILE_KP   = Path("data/indices/GFZ_all_indices.txt")
FILE_DST  = Path("data/indices/Kyoto_DST_index.txt")

LOCAL_TZ  = "Africa/Casablanca"  # currently unused, kept for future local-time work

# Thresholds (edit as needed)
THR_F107_LOW = 120.0   # sfu, "low solar" vs "high solar"

# Additional thresholds, if you later add a geomagnetic label
THR_KP_Q  = 2.0     # Kp_max <= 2
THR_AP_Q  = 7.0     # Ap <= 7
THR_DST_Q = -20.0   # Dst_min >= -20 nT

THR_F107  = 120.0   # same as THR_F107_LOW, kept for compatibility
THR_KP    = 3.0
THR_DST   = -30.0

# ---------- Generic helpers ----------

def _to_utc_series(tcol: pd.Series) -> pd.Series:
    """Convert a time column to UTC timestamps."""
    if pd.api.types.is_numeric_dtype(tcol):
        # If numeric and expressed as (days since 1970-01-01) or similar,
        # adjust accordingly; here we use Unix seconds via 2440587.5 as example.
        return pd.to_datetime((tcol.astype(float) - 2440587.5) * 86400.0, unit="s", utc=True)
    return pd.to_datetime(tcol, utc=True, errors="coerce")


def _coerce_date_utc_ts(df: pd.DataFrame, col: str = "date_utc") -> pd.DataFrame:
    """Ensure df[col] is a normalized (midnight) UTC timestamp."""
    if col not in df.columns:
        return df
    s = pd.to_datetime(df[col], errors="coerce", utc=True)
    s = s.dt.tz_convert("UTC").dt.normalize()
    df[col] = s
    return df

# ---------- F10.7 (CSV) → daily UTC ----------

def load_f107_utc(csv_path: Union[str, Path]) -> pd.DataFrame:
    """
    Load F10.7 index from CSV and build one row per UTC day with:
      date_utc, f107_obs, f107_adj
    """
    df = pd.read_csv(csv_path)
    tcol, obs_col, adj_col = "time", "F10_7_obs", "F10_7_adj"
    if tcol not in df.columns:
        raise ValueError("F10.7: missing 'time' column.")

    if obs_col not in df.columns:
        # Try to infer observed column name
        cand = [
            c for c in df.columns
            if "obs" in c.lower().replace(" ", "")
            and any(k in c.lower() for k in ("10.7", "f107", "f10_7"))
        ]
        if not cand:
            raise ValueError("F10.7: cannot find an 'observed' column.")
        obs_col = cand[0]

    ts_utc = _to_utc_series(df[tcol])
    out = pd.DataFrame(
        {
            "date_utc": ts_utc.dt.floor("D"),
            "f107_obs": pd.to_numeric(df[obs_col], errors="coerce"),
            "f107_adj": pd.to_numeric(df[adj_col], errors="coerce") if adj_col in df.columns else np.nan,
        }
    ).dropna(subset=["date_utc"])

    out = (
        out.drop_duplicates(subset=["date_utc"], keep="last")
           .sort_values("date_utc")
    )
    return _coerce_date_utc_ts(out, "date_utc")

# ---------- GFZ text → daily Kp (mean/max), Ap, SN ----------

# GFZ format (classic all-indices file), per day:
# tokens:
# 0:Y 1:M 2:D 3:days 4:days_m 5:BSR 6:dB
# 7..14: 8 × Kp
# 15..22: 8 × ap
# 23: Ap
# 24: SN
# 25: Fobs
# 26: Fadj
# 27: D

def parse_gfz_kp_daily_utc(path: Path) -> pd.DataFrame:
    """Parse GFZ daily file to Kp, Ap, SN per UTC day."""
    if not path or not path.exists():
        return pd.DataFrame(columns=["date_utc"])

    rows = []
    with path.open("r", encoding="utf-8", errors="ignore") as f:
        for line in f:
            if not line.strip() or line.startswith("#"):
                continue
            toks = line.split()
            if len(toks) < 27:
                # Fallback for atypical line, try to read date from fixed positions
                try:
                    year = int(line[0:4])
                    month = int(line[5:7])
                    day = int(line[8:10])
                except Exception:
                    continue
                dt = pd.Timestamp(datetime(year, month, day, tzinfo=timezone.utc))
                rows.append(
                    {
                        "date_utc": dt,
                        "kp_daily_mean": np.nan,
                        "kp_daily_max": np.nan,
                        "Ap": np.nan,
                        "SN": np.nan,
                    }
                )
                continue

            try:
                year, month, day = map(int, toks[:3])
                kp_vals = np.array(list(map(float, toks[7:15])), float)   # 8 × Kp
                ap_vals = np.array(list(map(float, toks[15:23])), float)  # 8 × ap (unused here)
                Ap = float(toks[23]) if toks[23] not in ("-1", "9999") else np.nan
                SN = float(toks[24]) if toks[24] not in ("-1", "9999") else np.nan
            except Exception:
                continue

            dt = pd.Timestamp(datetime(year, month, day, tzinfo=timezone.utc))
            rows.append(
                {
                    "date_utc": dt,
                    "kp_daily_mean": float(np.nanmean(kp_vals)) if np.isfinite(np.nanmean(kp_vals)) else np.nan,
                    "kp_daily_max": float(np.nanmax(kp_vals))  if np.isfinite(np.nanmax(kp_vals))  else np.nan,
                    "Ap": Ap,
                    "SN": SN,
                }
            )

    df = pd.DataFrame(rows).sort_values("date_utc").reset_index(drop=True)
    return _coerce_date_utc_ts(df, "date_utc")

# ---------- Kyoto Dst text → daily stats (UTC) ----------

def parse_dst_kyoto_daily_utc(path: Path) -> pd.DataFrame:
    """
    Parse DST text file from WDC Kyoto and compute daily:
      Dst_min, Dst_mean, Dst_n (number of hourly samples).
    """
    if not path or not path.exists():
        return pd.DataFrame(columns=["date_utc"])

    pat = re.compile(r"^DST(?P<yy>\d{2})(?P<mm>\d{2})\*(?P<dd>\d{2})")
    rows = []

    with path.open("r", encoding="utf-8", errors="ignore") as f:
        for line in f:
            m = pat.match(line)
            if not m:
                continue

            yy = int(m.group("yy"))
            mm = int(m.group("mm"))
            dd = int(m.group("dd"))
            # For 2015–2025 use 2000+yy convention
            year = 2000 + yy

            # Extract values after the 'X###' marker
            after = re.split(r"X\d{3}", line, maxsplit=1)
            vals = []
            if len(after) == 2:
                nums = re.findall(r"-?\d+", after[1])
                vals = [float(n) for n in nums]
            if len(vals) < 24:
                continue

            hourly = np.array(vals[-24:], dtype=float)
            dt = pd.Timestamp(datetime(year, mm, dd, tzinfo=timezone.utc))
            rows.append(
                {
                    "date_utc": dt,
                    "Dst_min": float(np.nanmin(hourly)) if np.isfinite(np.nanmin(hourly)) else np.nan,
                    "Dst_mean": float(np.nanmean(hourly)) if np.isfinite(np.nanmean(hourly)) else np.nan,
                    "Dst_n": int(np.sum(~np.isnan(hourly))),
                }
            )

    df = pd.DataFrame(rows).sort_values("date_utc").reset_index(drop=True)
    return _coerce_date_utc_ts(df, "date_utc")

# ---------- Daily max from 30-min series (UTC) ----------

def daily_max_from_30min_utc(
    csv_30min: Union[str, Path],
    time_col: str = "time",
    tec_col: str = "VTEC_median",
) -> pd.DataFrame:
    """
    From a 30-min VTEC series, compute the daily maximum VTEC and its UTC time.

    Returns columns:
      date_utc, max_bin_utc, max_ts_utc, VTEC_max_from_30min,
      max_hour_utc, max_time_utc_str
    """
    raw = pd.read_csv(csv_30min)
    ts_utc = pd.to_datetime(raw[time_col], errors="coerce", utc=True)

    df = pd.DataFrame(
        {
            "date_utc": ts_utc.dt.floor("D"),
            "slot_utc": (ts_utc.dt.hour * 2 + (ts_utc.dt.minute // 30)).astype("Int64"),
            "ts_utc": ts_utc,
            "tec": pd.to_numeric(raw[tec_col], errors="coerce"),
        }
    ).dropna(subset=["tec"])

    # Keep the last record per (day, slot)
    df = (
        df.sort_values(["date_utc", "slot_utc", "ts_utc"])
          .drop_duplicates(["date_utc", "slot_utc"], keep="last")
    )

    if df.empty:
        return pd.DataFrame(
            columns=[
                "date_utc",
                "max_bin_utc",
                "max_ts_utc",
                "VTEC_max_from_30min",
                "max_hour_utc",
                "max_time_utc_str",
            ]
        )

    idx = df.groupby("date_utc")["tec"].idxmax()
    out = (
        df.loc[idx, ["date_utc", "slot_utc", "ts_utc", "tec"]]
        .rename(
            columns={
                "slot_utc": "max_bin_utc",
                "ts_utc": "max_ts_utc",
                "tec": "VTEC_max_from_30min",
            }
        )
        .sort_values("date_utc")
    )

    out["max_hour_utc"] = out["max_ts_utc"].dt.hour + out["max_ts_utc"].dt.minute / 60.0
    out["max_time_utc_str"] = out["max_ts_utc"].dt.strftime("%H:%M")
    return out

# ============================
#          MAIN
# ============================

def main() -> None:
    # 1) Daily VTEC stats (UTC)
    stats = pd.read_csv(STATS_CSV, parse_dates=["date"])
    if stats["date"].dt.tz is None:
        stats["date_utc"] = stats["date"].dt.tz_localize("UTC").dt.normalize()
    else:
        stats["date_utc"] = stats["date"].dt.tz_convert("UTC").dt.normalize()
    stats = _coerce_date_utc_ts(stats, "date_utc")

    # 2) Indices (UTC)
    f107 = load_f107_utc(CSV_F107)             # date_utc, f107_obs, f107_adj
    kp   = parse_gfz_kp_daily_utc(FILE_KP)     # date_utc, kp_daily_mean, kp_daily_max, Ap, SN
    dst  = parse_dst_kyoto_daily_utc(FILE_DST) # date_utc, Dst_min, Dst_mean, Dst_n

    # 3) Time of max VTEC (UTC)
    dmax = daily_max_from_30min_utc(CSV_30MIN)

    # Ensure all have consistent date_utc
    for _df in (stats, f107, kp, dst, dmax):
        _coerce_date_utc_ts(_df, "date_utc")

    # 4) Merge
    daily = (
        stats
        .merge(dmax, on="date_utc", how="left")
        .merge(f107, on="date_utc", how="left")
        .merge(kp,   on="date_utc", how="left")
        .merge(dst,  on="date_utc", how="left")
    )

    # 5) Column ordering (only if columns exist)
    first = [
        "date_utc",
        "VTEC_median", "VTEC_mean", "VTEC_min", "VTEC_max",
        "VTEC_q25", "VTEC_q75", "VTEC_std", "N",
        "D_rx_m",  # or D_rx_m_MS etc., depending on your input file
        "VTEC_max_from_30min", "max_bin_utc",
        "max_time_utc_str", "max_hour_utc", "max_ts_utc",
        "gim_offset_tecu", "offset_flag_abs_ge_20", "mitigation_used",
        "f107_obs", "f107_adj",
        "kp_daily_mean", "kp_daily_max", "Ap", "SN",
        "Dst_min", "Dst_mean", "Dst_n",
    ]
    cols = [c for c in first if c in daily.columns] + [
        c for c in daily.columns if c not in first
    ]
    daily = daily[cols]

    # 6) Solar label (Low / High / NA) based on F10.7 observed
    daily["solar_label"] = np.where(
        daily["f107_obs"].notna(),
        np.where(daily["f107_obs"] < THR_F107_LOW, "Low", "High"),
        "NA",
    )

    # If you want a geomagnetic label (Q/NQ/NA), uncomment and adapt:
    #
    # have_all = daily[["kp_daily_max", "Ap", "Dst_min"]].notna().all(axis=1)
    # quiet_mask = (
    #     (daily["kp_daily_max"] <= THR_KP_Q) &
    #     (daily["Ap"]          <= THR_AP_Q) &
    #     (daily["Dst_min"]     >= THR_DST_Q)
    # )
    # daily["geomag_label"] = np.where(
    #     ~have_all, "NA",
    #     np.where(quiet_mask, "Q", "NQ")
    # )

    # 7) Save
    out_path = STATS_CSV.with_name(STATS_CSV.stem + "_UTC_with_indices_and_max.csv")
    out_path.parent.mkdir(parents=True, exist_ok=True)
    daily.to_csv(out_path, index=False)

    # 8) Quick log
    print(f"UTC fusion saved: {out_path}")
    for col in [
        "f107_obs", "kp_daily_mean", "kp_daily_max",
        "Ap", "SN", "Dst_min", "VTEC_max_from_30min",
    ]:
        if col in daily.columns:
            print(f"Missing values — {col}: {daily[col].isna().sum()}")

    # Optional preview
    print(daily.head(5))


if __name__ == "__main__":
    main()


Générer une colonne indiquant le label Q ou D à partir des données GFZ

In [None]:
"""
Add GFZ Quiet/Disturbed labels (Q/D/NQ) to a daily VTEC stats CSV.

Inputs:
- Daily VTEC stats file (CSV) with a date column ("date_utc" or "date").
- GFZ Quiet/Disturbed days file, e.g. GFZ_Q_NQ_days_2010-2025.txt
  in the usual "10 Q + 5 D per month" format.

Output:
- A new CSV with additional columns:
    gfz_label  (Q / D / NaN)
    gfz_rank   (1..10 for Q, 1..5 for D)
    gfz_flag   (e.g. '*', 'A', 'K', etc.)
    gfz_code   (e.g. 'Q1', 'Q1*', 'D3', ...)
    geomag_label_gfz_QDNQ  (Q / D / NQ; NQ = days not listed by GFZ)
"""

from __future__ import annotations

import re
from pathlib import Path
from datetime import date

import numpy as np
import pandas as pd

# ============================
# CONFIG (ADAPT THESE PATHS)
# ============================

GFZ_FILE = Path("data/indices/GFZ_Q_NQ_days_2010-2025.txt")
CSV_DAILY_WITH_OFF = Path(
    "data/2015_2025_WLS_VTEC_daily_stats_UTC_with_indices_and_max.csv"
)

# ---------- Helpers for daily CSV ----------

def _load_daily_csv(path: Path) -> pd.DataFrame:
    """
    Load daily VTEC stats CSV and ensure we have a 'date_utc' column
    of type datetime.date.
    """
    df = pd.read_csv(path)

    if "date_utc" in df.columns:
        dt = pd.to_datetime(df["date_utc"], utc=True, errors="coerce")
    elif "date" in df.columns:
        dt = pd.to_datetime(df["date"], utc=True, errors="coerce")
    else:
        raise ValueError("Daily CSV must contain 'date_utc' or 'date' column.")

    df["date_utc"] = dt.dt.date
    return df

# ---------- GFZ Q/D file parser ----------

# Month name → month number
MONTHS = {
    "jan": 1, "feb": 2, "mar": 3, "apr": 4, "may": 5, "jun": 6,
    "jul": 7, "aug": 8, "sep": 9, "sept": 9, "oct": 10, "nov": 11, "dec": 12,
}

# Tokens look like "20", "20*", "7A", "12K", ...
token_re = re.compile(r"^(\d{1,2})([A-Za-z\*]?)$")

def parse_token(tok: str) -> tuple[int | None, str]:
    """
    Parse a single GFZ token.
    Returns (day:int, flag:str), or (None, "") if invalid.
    """
    tok = tok.strip()
    m = token_re.match(tok)
    if not m:
        return None, ""
    d = int(m.group(1))
    flag = m.group(2)  # '', '*', 'A', 'K', ...
    return d, flag


def parse_gfz_qd_file(path: Path) -> pd.DataFrame:
    """
    Parse a GFZ Q/NQ text file of the form:

        Month YYYY  d1 d2 ... (10 Quiet tokens) ... (5 Disturbed tokens)

    and return a DataFrame with columns:
        date_utc (datetime.date),
        gfz_label ('Q' or 'D'),
        gfz_rank  (1..10 for Q, 1..5 for D),
        gfz_flag  ('', '*', 'A', 'K', ...),
        gfz_code  (e.g. 'Q1', 'Q1*', 'D3', ...).
    """
    rows = []

    with path.open("r", encoding="utf-8", errors="ignore") as f:
        for raw in f:
            line = raw.strip()
            if not line or line.startswith("#"):
                continue

            parts = line.split()
            if len(parts) < 2:
                continue

            # Month key is the first token, e.g. 'Jan', 'Feb', 'Sept', ...
            mkey = parts[0].lower()[:4].strip()
            if mkey not in MONTHS:
                # Probably a header or malformed line
                continue

            try:
                yr = int(parts[1])
            except ValueError:
                continue

            mm = MONTHS[mkey]
            tokens = parts[2:]  # remaining tokens

            # Expected structure: 10 Quiet + 5 Disturbed = 15 tokens minimum
            # We still try to parse even if less, to be robust.
            q_tokens = tokens[:10]
            d_tokens = tokens[10:15]

            # Quiet days Q1..Q10
            for rank, tok in enumerate(q_tokens, start=1):
                day_num, flag = parse_token(tok)
                if day_num is None:
                    continue

                # Validate date (avoid 31 in short months)
                try:
                    dt = date(yr, mm, day_num)
                except ValueError:
                    continue

                rows.append(
                    {
                        "date_utc": dt,
                        "gfz_label": "Q",
                        "gfz_rank": rank,
                        "gfz_flag": flag,
                        "gfz_code": f"Q{rank}{flag}".strip(),
                    }
                )

            # Disturbed days D1..D5
            for rank, tok in enumerate(d_tokens, start=1):
                day_num, flag = parse_token(tok)
                if day_num is None:
                    continue
                try:
                    dt = date(yr, mm, day_num)
                except ValueError:
                    continue

                rows.append(
                    {
                        "date_utc": dt,
                        "gfz_label": "D",
                        "gfz_rank": rank,
                        "gfz_flag": flag,
                        "gfz_code": f"D{rank}{flag}".strip(),
                    }
                )

    df = pd.DataFrame(rows)
    if df.empty:
        return df

    # Keep at most one row per (date_utc, gfz_label) pair
    df = df.drop_duplicates(subset=["date_utc", "gfz_label"]).reset_index(drop=True)
    return df

# ---------- Main ----------

def main() -> None:
    # Load daily stats and GFZ Q/D info
    if not CSV_DAILY_WITH_OFF.exists():
        raise FileNotFoundError(f"Daily stats file not found: {CSV_DAILY_WITH_OFF}")
    if not GFZ_FILE.exists():
        raise FileNotFoundError(f"GFZ Q/D file not found: {GFZ_FILE}")

    daily = _load_daily_csv(CSV_DAILY_WITH_OFF)
    gfz = parse_gfz_qd_file(GFZ_FILE)

    # Merge GFZ labels into daily stats (do not overwrite existing columns)
    daily2 = daily.merge(
        gfz[["date_utc", "gfz_label", "gfz_rank", "gfz_flag", "gfz_code"]],
        on="date_utc",
        how="left",
    )

    # Build a simple "Q / D / NQ" label from GFZ only
    daily2["geomag_label_gfz_QDNQ"] = np.where(
        daily2["gfz_label"].eq("Q"),
        "Q",
        np.where(daily2["gfz_label"].eq("D"), "D", "NQ"),
    )

    # OPTIONAL: if you want to also drive an existing 'geomag_label' column:
    # daily2["geomag_label"] = daily2["geomag_label_gfz_QDNQ"]

    # Output file
    out_path = CSV_DAILY_WITH_OFF.with_name(
        CSV_DAILY_WITH_OFF.stem + "_with_GFZlabels.csv"
    )
    out_path.parent.mkdir(parents=True, exist_ok=True)
    daily2.to_csv(out_path, index=False)

    # Summary
    nQ = int((daily2["gfz_label"] == "Q").sum())
    nD = int((daily2["gfz_label"] == "D").sum())
    print(f"GFZ labels added. Written: {out_path}")
    print(f"Days recognized by GFZ: Q={nQ}, D={nD}, remaining = NQ")

if __name__ == "__main__":
    main()
