In [12]:
import numpy as np
import pandas as pd
from collections import deque
from typing import Tuple, Optional

# ------------------------------------------------------------------------------
# 基础工具与公共常量
# ------------------------------------------------------------------------------

FLAG_Good = 0             # 正常
FLAG_Erroneous = 1        # 异常
FLAG_Suspect = 2          # 可疑
FLAG_Interpolated = 3     # 插值

# 阈值
Z_THR = 9
RB_THR = 100
df_metadata = pd.read_csv(r"../Metadata.csv")


# ------------------------------------------------------------------------------
# (1) 频率判定 + 规范化时间轴 + 同日/同月聚合
# ------------------------------------------------------------------------------

def detect_and_convert(df, date_col= "date", monthly_threshold = 2):
    """
    判断日频或月频，并做规范化处理：
    1) 将时间标准化到日期（去时分秒）；
    2) 先按“日”聚合（同日取均值）；
    3) 如果任一 (year, month) 的不同“日期”数 >= monthly_threshold，则判为日频(D)，否则判为月频(M)；
    4) 月频时将日期标准化到“月初”，并再次聚合（同月取均值）。

    返回: (freq, df_out) 其中 freq ∈ {'D','M'}
    """
    out = df.copy()
    out[date_col] = pd.to_datetime(out[date_col], errors="coerce").dt.normalize()
    out = out.dropna(subset=[date_col]).sort_values(date_col)

    # 先按天聚合（同日多值求均值）
    out = out.groupby(date_col, as_index=False).mean(numeric_only=True)

    # 统计每月包含的不同日期数量
    d = out[date_col]
    counts = d.groupby([d.dt.year, d.dt.month]).nunique()

    if (counts >= monthly_threshold).any():
        # 日频
        freq = "D"
    else:
        # 月频：标准化到月初，再聚合一次避免同月多条
        freq = "M"
        out[date_col] = out[date_col].dt.to_period("M").dt.to_timestamp(how="start")
        out = out.groupby(date_col, as_index=False).mean(numeric_only=True)

    out = out.sort_values(date_col).reset_index(drop=True)
    return freq, out


# ------------------------------------------------------------------------------
# (2) 在连续片段内补齐规则化时间索引（跨大空档不补）
# ------------------------------------------------------------------------------

def ensure_regular_time_index(df, date_col, freq, segment_break_d = 60, segment_break_m = 12):
    """
    将各“连续片段”内部补齐规则化的时间索引（不跨大空档）：
    - freq='D'：按日补，若相邻日期间隔 > segment_break_d 视为断段；
    - freq='M'：按月初补，若相邻月份间隔 > segment_break_m 视为断段。
    """
    out = df.copy()
    out[date_col] = pd.to_datetime(out[date_col], errors="coerce")
    out = out.dropna(subset=[date_col]).set_index(date_col).sort_index()

    if freq.upper().startswith("D"):
        segment_break = segment_break_d
        gaps = out.index.to_series().diff().dt.days.fillna(0).astype(int)
        new_segment = gaps > segment_break

        def make_full_index(idx):
            return pd.date_range(idx.min().normalize(), idx.max().normalize(), freq="D", name=out.index.name)

    elif freq.upper().startswith("M"):
        segment_break = segment_break_m
        this_p = out.index.to_period("M")
        prev_p = this_p.shift(1)
        gaps = ((this_p.year - prev_p.year) * 12 + (this_p.month - prev_p.month)).fillna(0).astype(int)
        new_segment = gaps > segment_break

        def make_full_index(idx):
            start = pd.Period(idx.min(), freq="M").start_time
            end = pd.Period(idx.max(), freq="M").start_time
            return pd.date_range(start, end, freq="MS", name=out.index.name)
    else:
        raise ValueError("freq 仅支持 'D' 或 'M'")

    seg_id = new_segment.cumsum()
    dtypes = out.dtypes

    pieces = []
    for _, seg_df in out.groupby(seg_id, sort=True):
        full_idx = make_full_index(seg_df.index)
        seg_re = seg_df.reindex(full_idx, copy=False)
        # 尝试恢复原列 dtype
        for col, dtype in dtypes.items():
            if col in seg_re.columns:
                try:
                    seg_re[col] = seg_re[col].astype(dtype)
                except Exception:
                    pass
        pieces.append(seg_re)

    out2 = (
        pd.concat(pieces, axis=0)
        .sort_index()
        .reset_index()
        .rename(columns={"index": date_col})
        .sort_values(date_col)
        .reset_index(drop=True)
    )
    return out2


# ------------------------------------------------------------------------------
# (3) 标准 Z 分数——FLAG_Erroneous
# ------------------------------------------------------------------------------

def std_z(x):
    """标准 Z 分数（总体标准差，ddof=0）。全 NaN 或 std=0 时返回 0。"""
    x = pd.to_numeric(x, errors="coerce")
    mean = np.nanmean(x.values)
    std = np.nanstd(x.values, ddof=0)
    if not np.isfinite(std) or std == 0:
        return pd.Series(np.zeros(len(x)), index=x.index)
    return pd.Series((x - mean) / std, index=x.index)


# ------------------------------------------------------------------------------
# (4) 稳健 Z 分数（Median/MAD）——FLAG_Erroneous
# ------------------------------------------------------------------------------

def robust_z(x, eps=1e-12):
    """
    基于 Median/MAD 的稳健 Z 分数，对极端值更不敏感。
    """
    x = pd.to_numeric(x, errors="coerce")
    if x.notna().sum() == 0:
        return pd.Series(np.zeros(len(x)), index=x.index)
    med = np.nanmedian(x)
    mad = np.nanmedian(np.abs(x - med))
    denom = 1.4826 * mad if mad > 0 else eps  # 1.4826 使正态分布下与 std 可比
    return (x - med) / denom



# ------------------------------------------------------------------------------
# (5) 大跳变检测与清洗（容忍平台跃迁）——FLAG_Erroneous
# ------------------------------------------------------------------------------

def big_jump_flag_one_clean(df, val_col, flag_col, date_col="date", max_gap_days=60, baseline_k=10, confirm_k=1, confirm_support=1, multiple=1.0, rebase_on_step=True ):
    """
    标记“异常尖峰（spike）”，并容忍“平台跃迁（step change）”。

    规则（动态阈值）：
    - 基线 = 最近 baseline_k 个“正常点”中位数；
    - 阈值 = multiple * |上一个正常值|（若无上一个正常值则跳过判定）；
    - |当前值 - 基线| ≥ 阈值 → 候选跳变；
        - 向前看 confirm_k 个点（间隔均 ≤ max_gap_days）；
        - 若其中 ≥ confirm_support 个与“当前值”接近（差 < 阈值），则判为 step（不置异常；如 rebase_on_step=True 则切换基线）；
        - 否则判为 spike（置 flag=1）。
    """
    out = df.copy().sort_values(date_col).reset_index(drop=True)
    out[flag_col] = out[flag_col].fillna(0).astype(int)

    dt = pd.to_datetime(out[date_col], errors="coerce")
    prev_dt: Optional[pd.Timestamp] = None
    clean_vals: deque[float] = deque(maxlen=baseline_k)
    prev_val: Optional[float] = None

    for i in range(len(out)):
        val = out.at[i, val_col]
        cur_dt = dt.iat[i]

        # 连续性检查
        if prev_dt is None:
            gap_ok = False
        else:
            gap_ok = (cur_dt - prev_dt) <= pd.Timedelta(days=max_gap_days)
        prev_dt = cur_dt

        # 满足条件才进行判定
        if gap_ok and len(clean_vals) > 0 and pd.notna(val):
            baseline = float(np.median(clean_vals))

            if prev_val is not None and np.isfinite(prev_val):
                threshold_i = max(abs(prev_val) * multiple, 1e-9)  # 避免 0 阈值
            else:
                threshold_i = 0.0

            if abs(float(val) - baseline) >= threshold_i and threshold_i > 0:
                # 候选跳变 → 前视确认
                support = 0
                last_dt = cur_dt
                for j in range(1, confirm_k + 1):
                    if i + j >= len(out):
                        break
                    if (dt.iat[i + j] - last_dt) > pd.Timedelta(days=max_gap_days):
                        break
                    nxt = out.at[i + j, val_col]
                    if pd.notna(nxt) and abs(float(nxt) - float(val)) < threshold_i:
                        support += 1
                    last_dt = dt.iat[i + j]

                if support >= confirm_support:
                    # step：不标异常，必要时重置基线
                    if rebase_on_step:
                        clean_vals.clear()
                    clean_vals.append(float(val))
                    prev_val = float(val)
                    continue
                else:
                    # spike：置异常
                    out.at[i, flag_col] = FLAG_Erroneous
                    continue

        # 正常点纳入基线
        if out.at[i, flag_col] == FLAG_Good and pd.notna(val):
            clean_vals.append(float(val))
            prev_val = float(val)

    return out



# ------------------------------------------------------------------------------
# (6) Level & Storage 单调性不匹配（简单规则版）——FLAG_Suspect
# ------------------------------------------------------------------------------

def flag_storage_to_level(df, level_col="level", storage_col="storage", flag_level_col="flag_level", flag_storage_col="flag_storage"):

    out = df.copy()
    for col in [flag_level_col, flag_storage_col]:
        out[col] = out[col].fillna(0).astype(int)

    # 只挑选两个 flag 都为 0 的数据
    mask_ok = (out[flag_level_col].eq(0)) & (out[flag_storage_col].eq(0))
    valid = out.loc[mask_ok, [level_col, storage_col]].dropna()

    if valid.empty:
        return out

    # 按 storage 升序排序
    sorted_vals = valid.sort_values(storage_col).reset_index()

    # 检查 level 是否随 storage 单调递增
    level_diff = sorted_vals[level_col].diff()

    for i in sorted_vals.index:
        if i == 0:
            continue
        if level_diff[i] < 0:  
            idx_bad = sorted_vals.loc[i, "index"]
            # 决策：到底是 level 错，还是 storage 错？
            # 简单规则：如果 storage 跟前后点很接近，但 level 明显偏离 → level 错
            # 否则认为是 storage 错
            prev_level = sorted_vals.loc[i-1, level_col]
            cur_level = sorted_vals.loc[i, level_col]
            prev_storage = sorted_vals.loc[i-1, storage_col]
            cur_storage = sorted_vals.loc[i, storage_col]

            if abs(cur_storage - prev_storage) < 1.0 and abs(cur_level - prev_level) > 5.0:
                out.loc[idx_bad, flag_level_col] = FLAG_Suspect

    return out


def flag_level_to_storage(df, level_col="level", storage_col="storage", flag_level_col="flag_level", flag_storage_col="flag_storage", abs_tol=0.1, rel_tol=0.002, side_k=5, require_both_sides=True):
    """
    在 level 升序视角下，使用左右各 side_k 个邻居的中位数作为“正常参考”，
    当当前 storage 同时偏离两侧参考值（或只要一侧，取决于 require_both_sides）超过
      abs_tol + rel_tol * median_side
    时，标记为异常（flag=3）。
    """
    g = df.copy()
    mask_ok = g[level_col].notna() & g[storage_col].notna() & g[flag_storage_col].eq(FLAG_Good) & g[flag_level_col].eq(FLAG_Good)
    if mask_ok.sum() <= 2:
        return g

    tmp = g.loc[mask_ok, [level_col, storage_col]].copy()
    order = tmp[level_col].argsort(kind="mergesort").to_numpy()
    idx_sorted = tmp.index.to_numpy()[order]
    L = tmp[level_col].to_numpy()[order].astype(float)
    S = tmp[storage_col].to_numpy()[order].astype(float)
    n = S.size

    bad_sorted = np.zeros(n, dtype=bool)

    for i in range(n):
        left_vals = S[max(0, i - side_k):i]
        right_vals = S[i + 1:min(n, i + 1 + side_k)]

        if left_vals.size == 0 or right_vals.size == 0:
            continue

        left_med = np.median(left_vals[np.isfinite(left_vals)])
        right_med = np.median(right_vals[np.isfinite(right_vals)])

        dl = abs(S[i] - left_med)
        dr = abs(S[i] - right_med)
        thr_l = abs_tol + rel_tol * max(left_med, 0.0)
        thr_r = abs_tol + rel_tol * max(right_med, 0.0)

        cond_l = dl > thr_l
        cond_r = dr > thr_r

        bad_sorted[i] = (cond_l and cond_r) if require_both_sides else (cond_l or cond_r)

    if bad_sorted.any():
        g.loc[idx_sorted[bad_sorted], flag_storage_col] = FLAG_Suspect

    return g


# ------------------------------------------------------------------------------
# (7) 单调映射互补：storage ↔ level——FLAG_Interpolated
# ------------------------------------------------------------------------------

def fill_by_counterpart(df, level_col="level", storage_col="storage", flag_level_col="flag_level",flag_storage_col="flag_storage"):
    """
    利用“单调映射”在 flag=0 的已知样本中建立两列的经验映射，互相补值。
    - 仅在另一列 flag=0 且当前列缺失的情况下尝试补值；
    - 命中精确/四舍五入匹配，否则用“邻区平均”兜底；
    - 成功补值且原 flag=0 → 置 flag=3（FLAG_Interpolated）。
    """
    g = df.copy()

    clean_both = g[flag_level_col].eq(FLAG_Good) & g[flag_storage_col].eq(FLAG_Good)

    # storage -> level（两列都干净）
    base_L = g.loc[clean_both, [level_col, storage_col]].dropna()
    s2l = base_L.groupby(storage_col)[level_col].median().sort_index()

    # level -> storage（两列都干净）
    base_S = g.loc[clean_both, [level_col, storage_col]].dropna()
    l2s = base_S.groupby(level_col)[storage_col].median().sort_index()


    def _round_layers_lookup(val, src, tgt, okmask):
        for rd in (2, 1, 0):
            m = okmask & g[src].notna() & g[tgt].notna() & (g[src].round(rd) == round(val, rd))
            if m.any():
                return g.loc[m, tgt].median()
        return np.nan

    def _bracket_average(series_map: pd.Series, val: float):
        if pd.isna(val) or series_map.empty:
            return np.nan
        ix = series_map.index.to_numpy(dtype=float)
        pos = np.searchsorted(ix, val, side="left")

        left_v = series_map.loc[ix[pos - 1]] if pos > 0 else np.nan
        right_v = series_map.loc[ix[pos]] if pos < ix.size else np.nan

        if np.isfinite(left_v) and np.isfinite(right_v):
            return 0.5 * (left_v + right_v)
        return left_v if np.isfinite(left_v) else (right_v if np.isfinite(right_v) else np.nan)

    def _find_level_from_storage(s):
        if s in s2l.index:
            return s2l.loc[s]
        v = _round_layers_lookup(s, storage_col, level_col, g[flag_storage_col].eq(FLAG_Good))
        return v if pd.notna(v) else _bracket_average(s2l, float(s))

    def _find_storage_from_level(lv):
        if lv in l2s.index:
            return l2s.loc[lv]
        v = _round_layers_lookup(lv, level_col, storage_col, g[flag_level_col].eq(FLAG_Good))
        return v if pd.notna(v) else _bracket_average(l2s, float(lv))

    # 补 level（要求 storage 有值且 storage 的 flag=Good）
    need_L = g[level_col].isna() & g[storage_col].notna()# & g[flag_storage_col].eq(FLAG_Good)
    for i, row in g.loc[need_L].iterrows():
        nv = _find_level_from_storage(row[storage_col])
        if pd.notna(nv):
            g.at[i, level_col] = nv
            if g.at[i, flag_level_col] == FLAG_Good:
                g.at[i, flag_level_col] = FLAG_Interpolated

    # 补 storage（要求 level 有值且 level 的 flag≠Erroneous--后续对 level 和 storage 都缺失的情况下进行单列插值）
    need_S = g[storage_col].isna() & g[level_col].notna()# & ~g[flag_level_col].eq(FLAG_Erroneous)
    for i, row in g.loc[need_S].iterrows():
        nv = _find_storage_from_level(row[level_col])
        if pd.notna(nv):
            g.at[i, storage_col] = nv
            if g.at[i, flag_storage_col] == FLAG_Good:
                g.at[i, flag_storage_col] = FLAG_Interpolated

    return g


# ------------------------------------------------------------------------------
# (8) 用 storage 推断缺失的 level——FLAG_Interpolated
# ------------------------------------------------------------------------------

def fill_level_from_storage(df, storage_col="storage", level_col="level", flag_level_col="flag_level", storage_tol=0.0 ):
    """
    仅在当前行 level 缺失且 storage 有值时尝试补值：
    1) 先看“相同 storage（容差内）”的已知 level：
       - 命中 1 个 → 直接用；
       - 命中 2 个 → 取均值；
       - 命中 ≥3 个 → 取中位数；
    2) 否则在“已知 (storage, level)”样本的 storage 升序上，找左右邻居：
       - 若左右均存在且 storage 不同 → 线性插值，并将 flag_level=2；
       - 若仅一侧存在 → 取该侧 level；
       - 否则不填。
    """
    out = df.copy()
    out[flag_level_col] = out[flag_level_col].fillna(FLAG_Good).astype(int)

    # 仅用“已知 level & storage”的点作为参考
    known_mask = out[level_col].notna() & out[storage_col].notna()
    if known_mask.sum() == 0:
        return out

    known = out.loc[known_mask, [storage_col, level_col]]
    order = np.argsort(known[storage_col].to_numpy(dtype=float))
    ks = known[storage_col].to_numpy(dtype=float)[order]
    kl = known[level_col].to_numpy(dtype=float)[order]

    miss_mask = out[level_col].isna() & out[storage_col].notna()

    for idx in out.index[miss_mask]:
        s = float(out.at[idx, storage_col])

        # 1) 相同 storage
        exact_hits = np.isclose(ks, s, atol=storage_tol, rtol=0.0) if storage_tol > 0 else (ks == s)
        if np.any(exact_hits):
            vals = kl[exact_hits]
            n = vals.size
            fill_val = float(vals[0]) if n == 1 else (float((vals[0] + vals[1]) / 2.0) if n == 2 else float(np.median(vals)))
            out.at[idx, level_col] = fill_val
            continue

        # 2) 上下邻居
        pos = np.searchsorted(ks, s, side="left")
        left_ok = (pos - 1) >= 0
        right_ok = pos < ks.size

        if left_ok and right_ok:
            sL, lL = ks[pos - 1], kl[pos - 1]
            sR, lR = ks[pos], kl[pos]

            if not np.isclose(sL, sR):
                t = (s - sL) / (sR - sL)
                out.at[idx, level_col] = float(lL + t * (lR - lL))
                out.at[idx, flag_level_col] = FLAG_Interpolated
            else:
                out.at[idx, level_col] = float(lL if abs(s - sL) <= abs(s - sR) else lR)
        elif left_ok:
            out.at[idx, level_col] = float(kl[pos - 1])
        elif right_ok:
            out.at[idx, level_col] = float(kl[pos])

    return out


# ------------------------------------------------------------------------------
# (9) 统一时间插值（只补内部缺口；可选补边缘）——FLAG_Interpolated
# ------------------------------------------------------------------------------

def final_time_interp(df, date_col="date", val_col="level", flag_col="flag_level", limit=3, fill_edges=False):
    """
    在规则化时间轴上，对指定列做线性插值：
    - 默认仅补内部缺口（不外推首尾）；
    - 成功补值且原 flag=0 → 置 flag=2。
    """
    g = df.copy()
    g[date_col] = pd.to_datetime(g[date_col], errors="coerce")
    g = g.dropna(subset=[date_col]).sort_values(date_col).set_index(date_col)

    if isinstance(g.index, pd.PeriodIndex):
        g.index = g.index.to_timestamp(how="start")
    else:
        g.index = pd.to_datetime(g.index, errors="coerce")
        g = g[~g.index.isna()]
        if getattr(g.index, "tz", None) is not None:
            g.index = g.index.tz_localize(None)

    if val_col in g.columns:
        before = g[val_col].isna()

        # ① 仅内部插值（limit_area="inside"）
        g[val_col] = g[val_col].interpolate(method="linear", limit=limit, limit_area="inside")
        filled_inside = before & g[val_col].notna()
        m = filled_inside & ~g[flag_col].isin([FLAG_Erroneous, FLAG_Suspect])
        g.loc[m, flag_col] = FLAG_Interpolated

        # ② 可选补边缘（前/后向内），仍不做外推
        if fill_edges:
            before_edges = g[val_col].isna()
            g[val_col] = g[val_col].interpolate(method="linear", limit=limit, limit_direction="forward")
            g[val_col] = g[val_col].interpolate(method="linear", limit=limit, limit_direction="backward")
            filled_edge = before_edges & g[val_col].notna()
            m = filled_edge & ~g[flag_col].isin([FLAG_Erroneous, FLAG_Suspect])
            g.loc[m, flag_col] = FLAG_Interpolated

    return g.reset_index().rename(columns={"index": date_col})



# ===========================================
# 单变量质控函数
# ===========================================

def qc_level_or_storage(df, station_col, date_col, value_col):
    """
    针对单一变量（level 或 storage）的质控与插补。
    """

    name = value_col.lower()
    if "level" in name:
        var, flag, raw = "level", "flag_level", "level_raw"
    elif "stor" in name:
        var, flag, raw = "storage", "flag_storage", "storage_raw"
    else:
        var, flag, raw = "level", "flag_level", "level_raw"

    rename_map = {station_col: "id", date_col: "date", value_col: var}
    g = df.rename(columns=rename_map).copy()[["id", "date", var]]

    id_val = g['id'].unique().item()
    g[var] = pd.to_numeric(g[var], errors="coerce")
    g[raw] = g[var]
    g[flag] = FLAG_Good
    g.loc[g[var] == 0, var] = np.nan
    
    frequency, g = detect_and_convert(g, "date")
    max_gap = 60 if str(frequency).upper().startswith('D') else 12

    # 标准差检测
    g[f"std_{var}"] = std_z(g[var]).abs()
    g.loc[g[f"std_{var}"] > Z_THR, flag] = FLAG_Erroneous
    g.loc[g[flag].isin([FLAG_Erroneous, FLAG_Suspect]), "level"] = np.nan
    
    # 稳健标准差检测
    g[f"rb_{var}"] = np.nan
    g[f"rb_{var}"] = robust_z(g[var]).abs()
    g.loc[g[f"rb_{var}"] > RB_THR, flag] = FLAG_Erroneous
    g.loc[g[flag].isin([FLAG_Erroneous, FLAG_Suspect]), var] = np.nan
    
    # 跳变检测
    g = big_jump_flag_one_clean(g, val_col=var, flag_col=flag, max_gap_days=max_gap, baseline_k=5)
    g.loc[g[flag].isin([FLAG_Erroneous, FLAG_Suspect]), var] = np.nan
    
    # 时间规则化与插值
    g = ensure_regular_time_index(g, "date", frequency)
    g[flag] = g[flag].fillna(FLAG_Interpolated)
    g = final_time_interp(g, val_col=var, flag_col=flag, limit=max_gap)

    # 标记已插补
    g.loc[g[raw].isna() & g[var].notna(), flag] = FLAG_Interpolated
    g = g.dropna(subset=[var], how="all")
    
    g["id"] = id_val
    cols_out = ["id", "date", raw, flag, var]
    return g[cols_out]


# ===========================================
# 双变量质控函数
# ===========================================

def qc_level_and_storage(df, station_col, date_col, level_col, storage_col):
    """
    对同一测站的水位(Level)和库容(Storage)时间序列进行联合质控与插补。
    """

    # ---- 1) 重命名列 ----
    rename_cols = {station_col: "id", date_col: "date", level_col: "level", storage_col: "storage"}
    g = df.rename(columns=rename_cols).copy()[["id", "date", "level", "storage"]]
    id_val = g['id'].unique().item()

    # ---- 2) 转换类型 & 初始化 ----
    g['level'] = pd.to_numeric(g['level'], errors='coerce')
    g['storage'] = pd.to_numeric(g['storage'], errors='coerce')
    g["level_raw"] = g["level"]
    g["storage_raw"] = g["storage"]
    g["flag_level"] = FLAG_Good
    g["flag_storage"] = FLAG_Good

    # 零值设为缺测
    g.loc[g["level"] == 0, "level"] = np.nan
    g.loc[g["storage"] == 0, "storage"] = np.nan

    g.loc[g["level_raw"] == 0, "flag_level"] = FLAG_Erroneous
    g.loc[g["level_raw"] == 0, "flag_storage"] = FLAG_Erroneous
    
    # ---- 3) 检测频率 ----
    frequency, g = detect_and_convert(g, "date")
    max_gap = 60 if str(frequency).upper().startswith('D') else 12

    # ---- 4) 标准差检测（std_z）----FLAG_Erroneous
    g["std_level"] = std_z(g["level"]).abs()
    g["std_storage"] = std_z(g["storage"]).abs()
    g.loc[g["std_level"] > Z_THR, "flag_level"] = FLAG_Erroneous
    g.loc[g["std_storage"] > Z_THR, "flag_storage"] = FLAG_Erroneous
    
    g.loc[g["flag_level"].isin([FLAG_Erroneous, FLAG_Suspect]), "level"] = np.nan
    g.loc[g["flag_storage"].isin([FLAG_Erroneous, FLAG_Suspect]), "storage"] = np.nan

    # ---- 5) 稳健标准差检测（robust_z）----FLAG_Erroneous
    g["rb_level"] = np.nan
    g["rb_storage"] = np.nan
    
    g["rb_level"] = robust_z(g["level"]).abs()
    g["rb_storage"] = robust_z(g["storage"]).abs()
    g.loc[g["rb_level"] > RB_THR, "flag_level"] = FLAG_Erroneous
    g.loc[g["rb_storage"] > RB_THR, "flag_storage"] = FLAG_Erroneous
    
    g.loc[g["flag_level"].isin([FLAG_Erroneous, FLAG_Suspect]), "level"] = np.nan
    g.loc[g["flag_storage"].isin([FLAG_Erroneous, FLAG_Suspect]), "storage"] = np.nan
    
    # ---- 6) 跳变检测 ----FLAG_Erroneous
    source = df_metadata.loc[df_metadata["RES_ID"] == int(id_val), "Source"].iloc[0]
    if source == 'DAHITI':
        g = big_jump_flag_one_clean(g, val_col="level", flag_col="flag_level", max_gap_days=max_gap, baseline_k=5)
    else:
        g = big_jump_flag_one_clean(g, val_col="level", flag_col="flag_level", max_gap_days=max_gap, baseline_k=5)
        g = big_jump_flag_one_clean(g, val_col="storage", flag_col="flag_storage", max_gap_days=max_gap, baseline_k=5, multiple=3)

    g.loc[g["flag_level"].isin([FLAG_Erroneous, FLAG_Suspect]), "level"] = np.nan
    g.loc[g["flag_storage"].isin([FLAG_Erroneous, FLAG_Suspect]), "storage"] = np.nan

    # ---- 7) Level 与 Storage 对应关系异常 ----FLAG_Suspect
    g = flag_storage_to_level(g, level_col="level", storage_col="storage", flag_level_col="flag_level", flag_storage_col="flag_storage")
    # g = flag_level_to_storage(g,level_col="level", storage_col="storage", flag_level_col="flag_level", flag_storage_col="flag_storage")

    g.loc[g["flag_level"].isin([FLAG_Erroneous, FLAG_Suspect]), "level"] = np.nan
    g.loc[g["flag_storage"].isin([FLAG_Erroneous, FLAG_Suspect]), "storage"] = np.nan

    # ---- 8) 规则化时间索引 ----
    g = ensure_regular_time_index(g, "date", frequency)
    g["flag_level"]   = g["flag_level"].fillna(FLAG_Good)
    g["flag_storage"] = g["flag_storage"].fillna(FLAG_Good)

    # ---- 9) 互补：仅在两 flag=0 的行，用单调映射互补 ----
    g = fill_by_counterpart(g)

    # ---- 10) 对level缺失，但storage存在的进行插值，因为 9) 不能全部补全，这个时候不能随便插，要考虑storage ----
    g = fill_level_from_storage(g, storage_col="storage", level_col="level", flag_level_col="flag_level", storage_tol=1e-6)

    # ---- 11) 对 level 和 storage 都缺失或者利用对应关系也不能补全的进行插值补全 ----
    g = final_time_interp(g, val_col="level", flag_col="flag_level", limit=max_gap)
    g = fill_by_counterpart(g)

    # ---- 12) 标记已插补 ----
    mL = g["level_raw"].isna()   & g["level"].notna()    & ~g["flag_level"].isin([FLAG_Erroneous, FLAG_Suspect])
    g.loc[mL, "flag_level"] = FLAG_Interpolated

    mS = g["storage_raw"].isna() & g["storage"].notna()  & ~g["flag_storage"].isin([FLAG_Erroneous, FLAG_Suspect])
    g.loc[mS, "flag_storage"] = FLAG_Interpolated

    # ---- 13) 清理输出 ----
    g = g.dropna(subset=["level", "storage"], how="all")
    g["id"] = id_val
    cols_out = ["id", "date", "level_raw", "flag_level", "level", "storage_raw", "flag_storage", "storage"]

    return g[cols_out]

In [14]:
#29 3549
df = pd.read_csv(r"D:\Zhang-MY\Python-Code\paper\全球水库水位数据下载和处理\Global Reservoir Long Time Series Water Level and Water Storage Data\4245.csv",dtype={0: str})
dfff = qc_level_or_storage(df, 'RES_ID', 'Date', 'Level').rename(columns={'id':'ID','date':'Date','level':'Level','flag_level':'Flag_Level','storage':'Storage','flag_storage':'Flag_Storage'})
dfff

# df = pd.read_csv(r"D:\Zhang-MY\Python-Code\paper\全球水库水位数据下载和处理\Global Reservoir Long Time Series Water Level and Water Storage Data\196.csv",dtype={0: str})
# dfff = qc_one_dataframe(df, 'RES_ID', 'Date', 'Storage').rename(columns={'id':'ID','date':'Date','level':'Level','flag_level':'Flag_Level','storage':'Storage','flag_storage':'Flag_Storage'})
# dfff

# /1022/69/1006/1003   1414没有同时存在level和storage的数据  检验最后一个插值37   3712、3427
# df = pd.read_csv(r"D:\Zhang-MY\Python-Code\paper\全球水库水位数据下载和处理\Global Reservoir Long Time Series Water Level and Water Storage Data\37.csv")
# dfff = qc_level_and_storage(df, 'RES_ID', 'Date', 'Level','Storage').rename(columns={'id':'RES_ID','date':'Date','level_raw':'Level_Raw','level':'Level','flag_level':'Flag_Level','storage_raw':'Storage_Raw','storage':'Storage','flag_storage':'Flag_Storage'})
# dfff.to_csv(r"D:\Desktop\aaa.csv",index = False)
# # dfff = dfff[dfff['Date'] > '1988-09-01']
# dfff

Unnamed: 0,ID,Date,level_raw,Flag_Level,Level
0,4245,1992-11-30,356.94,0.0,356.940
1,4245,1992-12-01,,3.0,356.930
2,4245,1992-12-02,,3.0,356.920
3,4245,1992-12-03,,3.0,356.910
4,4245,1992-12-04,,3.0,356.900
...,...,...,...,...,...
6844,4245,2022-04-02,,3.0,356.758
6845,4245,2022-04-03,,3.0,356.781
6846,4245,2022-04-04,,3.0,356.804
6847,4245,2022-04-05,,3.0,356.827


In [None]:
import os
import shutil
import pandas as pd
import numpy as np
from tqdm import tqdm
from pathlib import Path

folder_in = r"../Global Reservoir Long Time Series Water Level and Water Storage Data/"
bad_id = []
files = [f for f in os.listdir(folder_in) if f.endswith(".csv")]

for file in tqdm(files, desc="Processing CSV files", unit="file"):
    file_id = os.path.splitext(file)[0]  # 提取文件名（RES_ID_raw）
    path = os.path.join(folder_in, file)
    df = pd.read_csv(path)
    df = df.replace("0", np.nan).replace(0, np.nan)
    
    frequency, g = detect_and_convert(df, "Date")
    max_gap = 30 if str(frequency).upper().startswith('D') else 12

    if "Level" in df.columns and "Storage" in df.columns:
        length = max(len(df["Level"].dropna()), len(df["Storage"].dropna()))
    elif "Level" in df.columns:
        length = len(df["Level"].dropna())
    elif "Storage" in df.columns:
        length = len(df["Storage"].dropna())
    else:
        length = 0
        
    if length < max_gap:
        bad_id.append(int(file_id))
        continue
print(f"数据过少 {len(bad_id)} 个：")

df_Metadata = pd.read_csv(r"../Metadata.csv")
df_Metadata = df_Metadata[~df_Metadata["RES_ID"].isin(bad_id)].rename(columns={"RES_ID":"RES_ID_raw"})
df_Metadata["RES_ID"] = range(1, len(df_Metadata) + 1)
df_Metadata.to_csv("../Metadata.csv", index=False, encoding="utf-8-sig")
df_Metadata

In [5]:
import os
import shutil
import pandas as pd

df_Metadata = pd.read_csv(r"../Metadata.csv")
folder_in = r"../Global Reservoir Long Time Series Water Level and Water Storage Data/"
folder_out = r"../Global Reservoir Long Time Series Water Level and Water Storage Data New/"
os.makedirs(folder_out, exist_ok=True)

for _, row in df_Metadata.iterrows():
    old_name = f"{row['RES_ID_raw']}.csv"
    new_name = f"{row['RES_ID']}.csv"
    
    old_path = os.path.join(folder_in, old_name)
    new_path = os.path.join(folder_out, new_name)
    
    if os.path.exists(old_path):
        shutil.copy2(old_path, new_path)  # ✅ 复制文件
        
        # ✅ 修改 CSV 内部的 RES_ID 列为新文件名（去掉 .csv）
        try:
            df = pd.read_csv(new_path)
            if "RES_ID" in df.columns:
                df["RES_ID"] = row["RES_ID"]  # 或者用 new_name.replace(".csv", "")
                df.to_csv(new_path, index=False)
            else:
                print(f"⚠️ 文件 {new_name} 中未找到 'RES_ID' 列，跳过修改。")
        except Exception as e:
            print(f"❌ 修改 {new_name} 时出错：{e}")
    else:
        print(f"⚠️ 未找到文件：{old_name}")


In [3]:
import os
import pandas as pd
import numpy as np
from tqdm import tqdm
from pathlib import Path

folder_in = r"../Global Reservoir Long Time Series Water Level and Water Storage Data New/"
folder_out = r"../GROWL/GROWL_timeseries/"
os.makedirs(folder_out, exist_ok=True)
bad_id = []
files = [f for f in os.listdir(folder_in) if f.endswith(".csv")]
existing_ids = {os.path.splitext(f)[0] for f in os.listdir(folder_out) if f.endswith(".csv")}

for file in tqdm(files, desc="Processing CSV files", unit="file"):
    file_id = os.path.splitext(file)[0]
    if file_id in existing_ids:
        continue
        
    path = os.path.join(folder_in, file)
    # print(f"正在处理 {os.path.basename(file)}")
    df = pd.read_csv(path).replace(0, np.nan)

    has_level = "Level" in df.columns and df["Level"].nunique() > 1
    has_storage = "Storage" in df.columns and df["Storage"].nunique() > 1

    # 按列的存在情况调用 qc_and_dataframe / qc_one_dataframe
    if has_level and has_storage:
        df_clear = qc_level_and_storage(df, 'RES_ID', "Date", "Level", "Storage")
    elif has_level:
        df_clear = qc_level_or_storage(df, 'RES_ID', "Date", "Level")
    elif has_storage:
        df_clear = qc_level_or_storage(df, 'RES_ID', "Date", "Storage")
    
    else:
        bad_id.append(os.path.splitext(file)[0])
        continue

    df_clear = df_clear.rename(columns={
        "id": 'RES_ID',
        "date": "Date",
        "level_raw": "Level_Raw",
        "flag_level": "Flag_Level",
        "level": "Level",
        "storage_raw": "Storage_Raw",
        "storage": "Storage",
        "flag_storage": "Flag_Storage",
    })
    df_clear['RES_ID'] = df_clear['RES_ID'].astype(str)
    
    if "Storage" in df_clear.columns and df_clear["Storage"].isna().all():
        df_clear = df_clear.drop(columns=["Storage","Flag_Storage"])
    if "Level" in df_clear.columns and df_clear["Level"].isna().all():
        df_clear = df_clear.drop(columns=["Level","Flag_Level"])
        
    out_path = os.path.join(folder_out, file)
    df_clear.to_csv(out_path, index=False, encoding="utf-8-sig")
bad_id

Processing CSV files: 100%|██████████| 4200/4200 [1:42:52<00:00,  1.47s/file]  


[]

In [None]:
# import os
# import pandas as pd
# 
# folder_path = r"../GROWL/GROWL_timeseries/"
# for filename in os.listdir(folder_path):
#     if filename.endswith(".csv"):
#         file_path = os.path.join(folder_path, filename)
#         # 读取 csv
#         g = pd.read_csv(file_path).rename(columns={
#             "id": 'RES_ID',
#             "date": "Date",
#             "level_raw": "Level_Raw",
#             "flag_level": "Flag_Level",
#             "level": "Level",
#             "storage_raw": "Storage_Raw",
#             "storage": "Storage",
#             "flag_storage": "Flag_Storage"})
#         
#         g['RES_ID'] = int(os.path.splitext(filename)[0])
# 
#         cols_out = ['RES_ID', "Date","Level_Raw", "Flag_Level", "Level","Storage_Raw", "Flag_Storage", "Storage"]
#         cols_present = [c for c in cols_out if c in g.columns]
#         g = g[cols_present]
#         g.to_csv(file_path, index=False)
# 
#         print(f"✅ 已处理: {filename}")


In [6]:
# -------- 配置 --------
data_dir = Path("../GROWL/GROWL_timeseries")

# 最终导出的列（学术化命名）
FINAL_COLS = [
    "RES_ID", "Station", "Name", "Latitude", "Longitude",
    "Temporal_Resolution",
    "Level_Periods", "Storage_Periods",
    "Level_Record_Length_Years", "Storage_Record_Length_Years",
    "In-situ_Level_Ratio", "In-situ_Storage_Ratio",
    "Station_Elevation_m", "Vertical_Datum",
    "Hydrolakes_Match_Type", "Hydrolakes_Hylak_id", "Hydrolakes_Match_Distance_m",
    "Reservoirs_Match_Type", "Reservoirs_fid_1", "Reservoirs_Match_Distance_m",
    "GDW_Match_Type", "GDW_GDW_ID", "GDW_Match_Distance_m",
    "Type", "Country", "Source", "Source_Link", "Other_Metadata"]

# 若原始 metadata 的部分列名需要映射到最终名，这里一次性给出
ORIG_TO_FINAL = {
    "Altitude": "Station_Elevation_m",
    "AltitudeReference": "Vertical_Datum",
    "Hydrolakes_match_type": "Hydrolakes_Match_Type",
    "Hydrolakes_Hylak_id": "Hydrolakes_Hylak_id",
    "Hydrolakes_match_distance_m": "Hydrolakes_Match_Distance_m",
    "reservoirs_match_type": "Reservoirs_Match_Type",
    "reservoirs_fid_1": "Reservoirs_fid_1",
    "reservoirs_match_distance_m": "Reservoirs_Match_Distance_m",
    "GDW_match_type": "GDW_Match_Type",
    "GDW_GDW_ID": "GDW_GDW_ID",
    "GDW_match_distance_m": "GDW_Match_Distance_m",
    "Link": "Source_Link",
    "Other": "Other_Metadata"}

# -------- 工具函数 --------
def get_continuous_periods(dates, freq="D"):
    """输入日期序列，返回连续区间字符串"""
    dates = pd.Series(dates).dropna().sort_values().dt.date
    if dates.empty:
        return None

    periods = []
    start = dates.iloc[0]
    prev = start

    for current in dates.iloc[1:]:
        if freq == "D":
            continuous = ((current - prev).days == 1)
        elif freq == "M":
            # 月频的“相邻一月”判断
            continuous = (
                (current.year == prev.year and current.month == prev.month + 1) or
                (prev.month == 12 and current.year == prev.year + 1 and current.month == 1)
            )
        else:
            continuous = False

        if not continuous:
            periods.append(f"{start:%Y-%m-%d} – {prev:%Y-%m-%d}")
            start = current
        prev = current

    periods.append(f"{start:%Y-%m-%d} – {prev:%Y-%m-%d}")
    return ", ".join(periods)

def count_to_years(count, freq):
    """根据数据点数和频率换算成年数（保留 1 位小数）"""
    if not count or count == 0:
        return None
    if freq == "D":
        return round(count / 365, 1)
    if freq == "M":
        return round(count / 12, 1)
    return None

def infer_temporal_resolution(dates) -> str:
    """根据单月内样本量粗判日/月，并返回 'Daily' 或 'Monthly'"""
    s = pd.Series(dates)
    counts = s.groupby([s.dt.year, s.dt.month]).count()
    if (counts >= 10).any():
        return "Daily"
    return "Monthly"

# 读取原始 metadata 并把能直接映射的列换名到最终名（其余原样保留）
df_meta_raw = pd.read_csv(r"../Metadata.csv")
df_meta = df_meta_raw.rename(columns=ORIG_TO_FINAL).copy()

# 先确保最终列都存在，默认 None（不会覆盖已有同名列）
for col in FINAL_COLS:
    if col not in df_meta.columns:
        df_meta[col] = None

# 遍历逐站填充“计算类”字段
for idx, row in df_meta.iterrows():
    ID = str(row["RES_ID"])
    file_path = data_dir / f"{ID}.csv"
    if not file_path.exists():
        continue

    df = pd.read_csv(file_path, dtype={0: str})
    if "Date" not in df.columns:
        continue
    df["Date"] = pd.to_datetime(df["Date"], errors="coerce")

    # 判定时间分辨率
    temporal_res = infer_temporal_resolution(df["Date"])
    df_meta.at[idx, "Temporal_Resolution"] = temporal_res

    # Level
    if {"Level", "Flag_Level"}.issubset(df.columns):
        total = df["Flag_Level"].notna().sum()
        zero_ratio = (df["Flag_Level"] == 0).sum() / total if total > 0 else 0
        df_meta.at[idx, "In-situ_Level_Ratio"] = zero_ratio

        non_na = df["Level"].notna()
        if non_na.any():
            # 连续区间（按判定的时间分辨率计算相邻）
            freq_code = "D" if temporal_res == "Daily" else "M"
            df_meta.at[idx, "Level_Periods"] = get_continuous_periods(df.loc[non_na, "Date"], freq=freq_code)
            # 年数
            df_meta.at[idx, "Level_Record_Length_Years"] = count_to_years(non_na.sum(), freq_code)

    # Storage
    if {"Storage", "Flag_Storage"}.issubset(df.columns):
        total = df["Flag_Storage"].notna().sum()
        zero_ratio = (df["Flag_Storage"] == 0).sum() / total if total > 0 else 0
        df_meta.at[idx, "In-situ_Storage_Ratio"] = zero_ratio

        non_na = df["Storage"].notna()
        if non_na.any():
            freq_code = "D" if temporal_res == "Daily" else "M"
            df_meta.at[idx, "Storage_Periods"] = get_continuous_periods(df.loc[non_na, "Date"], freq=freq_code)
            df_meta.at[idx, "Storage_Record_Length_Years"] = count_to_years(non_na.sum(), freq_code)

# 只保留最终列并导出
df_Metadata = df_meta[FINAL_COLS].copy()
df_Metadata.to_csv(r"../GROWL/GROWL_metadata.csv", index=False, encoding="utf-8-sig")
df_Metadata

Unnamed: 0,RES_ID,Station,Name,Latitude,Longitude,Temporal_Resolution,Level_Periods,Storage_Periods,Level_Record_Length_Years,Storage_Record_Length_Years,...,Reservoirs_fid_1,Reservoirs_Match_Distance_m,GDW_Match_Type,GDW_GDW_ID,GDW_Match_Distance_m,Type,Country,Source,Source_Link,Other_Metadata
0,1,001886,Kagalurpak,60.970000,-164.010000,Daily,"1992-09-29 – 2001-10-14, 2001-12-22 – 2002-07-...",,25.1,,...,,,,,,Remote sensing,USA,G-REALM,https://ipad.fas.usda.gov/cropexplorer/global_...,lat_range_min:60.992;lat_range_max:60.952;sat_...
1,2,000497,Dall,60.330000,-163.800000,Daily,"1992-09-27 – 2002-10-13, 2002-12-22 – 2003-02-...",,28.4,,...,,,,,,Remote sensing,USA,G-REALM,https://ipad.fas.usda.gov/cropexplorer/global_...,lat_range_min:60.256;lat_range_max:60.374;sat_...
2,3,001469,Baird_Inlet,60.790000,-163.490000,Daily,"1992-10-09 – 2002-08-07, 2003-02-11 – 2003-02-...",,24.5,,...,,,,,,Remote sensing,USA,G-REALM,https://ipad.fas.usda.gov/cropexplorer/global_...,lat_range_min:60.813;lat_range_max:60.792;sat_...
3,4,001063,Nunavakpak,60.770000,-162.570000,Daily,"1992-09-27 – 2002-08-05, 2004-12-24 – 2005-01-...",,24.5,,...,,,,,,Remote sensing,USA,G-REALM,https://ipad.fas.usda.gov/cropexplorer/global_...,lat_range_min:60.756;lat_range_max:60.795;sat_...
4,5,16094150,"Ka Loko Reservoir near Kilauea, Kauai, HI",22.178968,-159.379061,Daily,2006-03-20 – 2025-07-20,,19.4,,...,19680.0,0.000000,buffer,26844.0,58.879466,Station,USA,USGS,https://waterdata.usgs.gov/nwis/sw,"Kauai County, Hawaii, Hydrologic Unit 20070000"
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
4195,4196,142108A,Sideling Ck at Lake Kurwongbah Dam HW,-27.250000,152.950000,Daily,"2009-12-12 – 2010-02-07, 2012-08-09 – 2021-05-04","2009-12-12 – 2010-02-07, 2012-08-09 – 2021-05-04",8.9,8.9,...,78451.0,0.000000,within,5385.0,0.000000,Station,Australia,WDO,http://www.bom.gov.au/waterdata/,QLD - Queensland Bulk Water Supply Authority (...
4196,4197,146033A,Nerang R at Hinze Dam HW,-28.050000,153.280000,Daily,"1986-09-30 – 2009-07-29, 2012-10-23 – 2013-01-...","1986-09-30 – 2009-07-29, 2012-10-23 – 2013-01-...",31.2,31.2,...,84659.0,1.328184,buffer,5397.0,291.396217,Station,Australia,WDO,http://www.bom.gov.au/waterdata/,QLD - Queensland Bulk Water Supply Authority (...
4197,4198,146034A,Little Nerang,-28.145197,153.284782,Daily,"1995-12-31 – 2008-09-17, 2012-10-15 – 2019-07-...","1995-12-31 – 2008-09-17, 2012-10-15 – 2019-07-...",20.5,20.5,...,,,,,,Station,Australia,WDO,http://www.bom.gov.au/waterdata/,QLD - Queensland Bulk Water Supply Authority (...
4198,4199,100265,Wakatipu,-44.960000,168.400000,Daily,"2008-12-13 – 2016-09-28, 2016-12-06 – 2016-12-...",,12.7,,...,,,,,,Remote sensing,New Zealand,G-REALM,https://ipad.fas.usda.gov/cropexplorer/global_...,lat_range_min:-44.987;lat_range_max:-44.934;sa...
