# In [1]:  (notebook cell marker left over from export; commented out so the module parses)
# ────────────────────────────────────────────────────────────────
#  eis_helpers.py  ·  companion module for “Unified EIS v9”
# ────────────────────────────────────────────────────────────────
#
#  This **stand-alone** helper library implements every symbol that the
#  new v9 training / inference script imports.  Drop this file next to
#  your v9 main script (or add its directory to PYTHONPATH) and you’re
#  good to go.  External dependencies are limited to NumPy / Pandas /
#  SciPy (for .mat I/O and simple interpolation), plus openpyxl when
#  reading .xlsx files.
#
#  Key capabilities
#  ----------------
#  • Canonical frequency grid + feature engineering  (magnitude, phase,
#    optional DRT placeholder, room for custom shape-model features)
#  • Robust filename metadata parsing for both EIS and capacity files
#  • Capacity-vs-cycle loader that derives SoH and builds a CPP→cycles
#    heuristic map
#  • Dataset assembly (“long table” with one feature vector per EIS file)
#  • Minimal plotting helper for projection curves (keeps PNG output)
#  • JSON-friendly serialisation helpers
#
#  Anything not needed by v9 but retained from earlier versions
#  (e.g. `train_models`, `load_bundle`) is stubbed so import errors vanish.
#  Replace the stubs with your own advanced logic whenever required,
#  without touching the main v9 script.
# ────────────────────────────────────────────────────────────────
from __future__ import annotations

import json
import math
import random
import re
import warnings
from functools import lru_cache
from pathlib import Path
from typing import Any, Dict, List, Sequence, Tuple

import numpy as np
import pandas as pd
from scipy.interpolate import interp1d
from scipy.io import loadmat

# ╭───────────────────────────────╮
# │ 1 · GLOBAL CONSTANTS & SEEDS  │
# ╰───────────────────────────────╯
# Shared frequency axis used for every feature vector: 50 points, evenly
# spaced in log10(f), running from 10**-2 Hz (0.01 Hz) to 10**4 Hz (10 kHz).
CANON_FREQ: np.ndarray = np.logspace(start=-2, stop=4, num=50)


def set_seed(seed: int = 123) -> None:  # noqa: D401
    """Seed the global RNGs used by this module.

    Seeds Python's ``random`` module and NumPy's global generator, and
    writes *seed* to the ``PYTHONHASHSEED`` environment variable.
    """
    from os import environ

    environ["PYTHONHASHSEED"] = f"{seed}"
    for seeder in (random.seed, np.random.seed):
        seeder(seed)


# ╭─────────────────────────╮
# │ 2 · JSON SERIALISATION  │
# ╰─────────────────────────╯
def _jsonable_key(key: Any) -> Any:
    """Convert a dict key to something ``json.dumps`` accepts as a key."""
    if isinstance(key, np.generic):
        return key.item()
    if isinstance(key, Path):
        return str(key)
    return key


def to_jsonable(obj: Any) -> Any:  # noqa: D401
    """Recursively convert NumPy / Path / set values into vanilla JSON types.

    Conversions performed:

    * NumPy scalars (ints, floats, ``np.bool_``, ``np.str_``, ...) become the
      matching Python scalar via ``.item()``.
    * ``np.ndarray`` (including 0-d arrays), ``set``, ``frozenset``, ``tuple``
      and ``list`` become lists, converted element by element.
    * ``dict`` values are converted recursively; NumPy-scalar and ``Path``
      keys are converted too, because ``json.dumps`` rejects them as keys.
    * ``Path`` becomes ``str``.

    Anything else is returned unchanged.
    """
    if isinstance(obj, np.generic):
        return obj.item()
    if isinstance(obj, np.ndarray):
        # tolist() copes with 0-d arrays, which cannot be iterated directly.
        return to_jsonable(obj.tolist())
    if isinstance(obj, (set, frozenset, tuple, list)):
        return [to_jsonable(x) for x in obj]
    if isinstance(obj, dict):
        return {_jsonable_key(k): to_jsonable(v) for k, v in obj.items()}
    if isinstance(obj, Path):
        return str(obj)
    return obj


# ╭────────────────────────────╮
# │ 3 · FILENAME META PARSERS │
# ╰────────────────────────────╯
# Preferred patterns: an explicit keyword set off from the cell id by "_" or
# "-".  Anchoring on the keyword stops the greedy cell group (which itself
# allows "-") from swallowing the cycle number in names such as
# "Cell01-Cycle100-SoC50", and lets a separator sit between keyword and
# number ("Cycle_100").
_EIS_KEYWORD_RE = re.compile(
    r"(?P<cell>[A-Za-z0-9\-]+)[_\-](?:Cycle|CYL)[_\-]?(?P<cycle>\d+)[_\-]?(?:SoC)?(?P<soc>\d+)?",
    re.I,
)
# For capacity files the cycle number after the keyword is optional, so a
# per-cell file like "Cell01_capacity.csv" keeps its full cell id.
_CAP_KEYWORD_RE = re.compile(
    r"(?P<cell>[A-Za-z0-9\-]+)[_\-](?:Cycle|CYC|Cap)[_\-]?(?P<cycle>\d+)?",
    re.I,
)
# Legacy patterns, kept unchanged as the fallback for keyword-less names
# such as "Cell01_100".
_EIS_RE = re.compile(
    r"(?P<cell>[A-Za-z0-9\-]+)[_\-]?(?:Cycle|CYL)?(?P<cycle>\d+)[_\-]?(?:SoC)?(?P<soc>\d+)?",
    re.I,
)
_CAP_RE = re.compile(
    r"(?P<cell>[A-Za-z0-9\-]+)[_\-]?(?:Cap|CYC|Cycle)?(?P<cycle>\d+)",
    re.I,
)


def parse_eis_metadata(fp: Path) -> Dict[str, Any]:
    """Extract cell id, cycle index and SoC from an EIS file name.

    The file stem is matched first against the keyword pattern
    (``<cell>[_-]Cycle|CYL<n>[...SoC<m>]``, case-insensitive) and, failing
    that, against the legacy pattern.

    Returns a dict with keys ``cell_id`` (str), ``cycle_idx`` (int, or NaN
    when absent) and ``soc`` (float as written in the name, or NaN when
    absent).  When nothing matches, ``cell_id`` is the whole stem and the
    other two values are NaN.
    """
    stem = Path(fp).stem
    m = _EIS_KEYWORD_RE.search(stem) or _EIS_RE.search(stem)
    if not m:
        return dict(cell_id=stem, cycle_idx=np.nan, soc=np.nan)
    d = m.groupdict()
    return dict(
        cell_id=d["cell"],
        cycle_idx=int(d["cycle"]) if d["cycle"] else np.nan,
        soc=float(d["soc"]) if d["soc"] else np.nan,
    )


def parse_cap_metadata(fp: Path) -> Dict[str, Any]:
    """Extract cell id and (optional) cycle index from a capacity file name.

    Uses the keyword pattern (``<cell>[_-]Cap|CYC|Cycle[<n>]``) first and
    the legacy pattern as fallback.  Returns a dict with keys ``cell_id``
    and ``cycle_idx`` (int, or NaN when the name carries no cycle number).
    When nothing matches, ``cell_id`` is the whole stem.
    """
    stem = Path(fp).stem
    m = _CAP_KEYWORD_RE.search(stem) or _CAP_RE.search(stem)
    if not m:
        return dict(cell_id=stem, cycle_idx=np.nan)
    d = m.groupdict()
    return dict(cell_id=d["cell"], cycle_idx=int(d["cycle"]) if d["cycle"] else np.nan)


# ╭───────────────────────────────────────────────────╮
# │ 4 · CAPACITY FILES  →  SoH & CPP HEURISTICS      │
# ╰───────────────────────────────────────────────────╯
def _load_one_capacity(fp: Path) -> pd.DataFrame:
    """Load one capacity file into a ``cell_id | cycle | capacity`` frame.

    Supports .csv, .xls and .xlsx.  Column names are matched after
    stripping whitespace and lower-casing; ``capacity_ah``, ``cap_ah`` and
    ``capacity (ah)`` are accepted as aliases for ``capacity``.  The cell id
    comes from the file name via ``parse_cap_metadata``.

    Raises
    ------
    ValueError
        If the file has no cycle column or no recognisable capacity column.
    """
    suffix = fp.suffix.lower()
    if suffix == ".csv":
        df = pd.read_csv(fp)
    else:
        df = pd.read_excel(fp, engine="openpyxl" if suffix == ".xlsx" else None)

    # str() keeps non-string headers (e.g. numeric Excel header cells) from
    # crashing the normalisation.
    df = df.rename(columns=lambda s: str(s).strip().lower())
    if "capacity" not in df.columns:
        for alt in ("capacity_ah", "cap_ah", "capacity (ah)"):
            if alt in df.columns:
                df["capacity"] = df[alt]
                break
    if "cycle" not in df.columns:
        raise ValueError(f"File {fp.name} has no 'Cycle' column")
    if "capacity" not in df.columns:
        # Fail with the same kind of message as the cycle check instead of an
        # opaque KeyError from the column selection below.
        raise ValueError(f"File {fp.name} has no 'Capacity' column")

    meta = parse_cap_metadata(fp)
    df = df[["cycle", "capacity"]].copy()
    df.insert(0, "cell_id", meta["cell_id"])
    return df


def load_capacity_info(cap_dir: Path) -> pd.DataFrame:
    """Concatenate all capacity-check files under *cap_dir* into one tidy frame.

    Every .csv/.xls/.xlsx file found recursively is loaded with
    ``_load_one_capacity``; files that fail to load are skipped with a
    warning.  The result is sorted by ``cell_id`` and ``cycle`` and carries
    the columns ``cell_id``, ``cycle``, ``capacity``, ``init_cap`` (first
    non-null capacity of the cell in cycle order) and ``soh_percent``
    (``100 * capacity / init_cap``).

    Raises
    ------
    FileNotFoundError
        If no capacity file could be loaded.
    """
    cap_dir = Path(cap_dir)
    dfs: list[pd.DataFrame] = []
    # sorted() makes the load order independent of the filesystem.
    for fp in sorted(cap_dir.rglob("*")):
        if not fp.is_file() or fp.suffix.lower() not in {".csv", ".xls", ".xlsx"}:
            continue
        try:
            dfs.append(_load_one_capacity(fp))
        except Exception as exc:  # noqa: BLE001
            warnings.warn(f"Skipping capacity file {fp.name}: {exc}")
    if not dfs:
        raise FileNotFoundError(f"No capacity files found under {cap_dir}")

    cap_df = pd.concat(dfs, ignore_index=True)
    for col in ("cycle", "capacity"):
        # Non-numeric cells become NaN rather than breaking the arithmetic.
        cap_df[col] = pd.to_numeric(cap_df[col], errors="coerce")
    # "first" below must mean "earliest cycle", not "first row read".
    cap_df = cap_df.sort_values(["cell_id", "cycle"]).reset_index(drop=True)
    cap_df["init_cap"] = cap_df.groupby("cell_id")["capacity"].transform("first")
    cap_df["soh_percent"] = 100 * cap_df["capacity"] / cap_df["init_cap"]
    return cap_df


def build_cpp_map(cap_dir: Path) -> Dict[float, float]:
    """
    Map each rounded SoH percentage to the median number of cycles remaining.

    For every capacity row, "cycles remaining" is the cell's highest recorded
    cycle minus the row's cycle.  Rows are grouped by ``soh_percent`` rounded
    to the nearest integer and the per-group median is returned as a plain
    dict.  Intended as a heuristic fallback during inference.
    """
    caps = load_capacity_info(cap_dir)
    last_cycle = caps.groupby("cell_id")["cycle"].transform("max")
    remaining = (last_cycle - caps["cycle"]).rename("cycles_remaining")
    soh_bin = caps["soh_percent"].round().rename("cap_bin")  # 1 ppt bins
    medians = remaining.groupby(soh_bin).median().dropna()
    return medians.to_dict()


def get_cpp(cap_pct: float, cpp_map: Dict[float, float]) -> float:
    """Return cycles-remaining via nearest-neighbour lookup in *cpp_map*.

    Parameters
    ----------
    cap_pct
        Capacity / SoH percentage to look up.  ``None`` or NaN yields NaN.
    cpp_map
        Mapping of percentage bin to cycles remaining.  Keys may be numbers
        or numeric strings (as produced by a JSON round-trip).

    Returns
    -------
    float
        Value of the bin closest to *cap_pct* (first one on ties), or NaN
        when the map is empty or no bin is comparable.
    """
    if not cpp_map or cap_pct is None:
        return float("nan")
    cap_pct = float(cap_pct)
    if math.isnan(cap_pct):
        return float("nan")
    bins = np.array([float(k) for k in cpp_map], dtype=float)
    values = list(cpp_map.values())
    dist = np.abs(bins - cap_pct)
    if np.all(np.isnan(dist)):
        return float("nan")
    # nanargmin ignores NaN bins, which would otherwise win a plain argmin.
    return float(values[int(np.nanargmin(dist))])


# ╭──────────────────────────────╮
# │ 5 · EIS → FEATURE VECTORS    │
# ╰──────────────────────────────╯
def _read_eis(fp: Path) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Return (freq [Hz], Zreal [Ω], Zimag [Ω]) read from one EIS file.

    * ``.csv`` – comma-separated table.
    * ``.txt`` – delimited table; the delimiter (comma, tab, ...) is sniffed.
    * ``.xls`` / ``.xlsx`` – first worksheet.
    * ``.mat`` – variables ``freq``, ``Zreal`` and ``Zimag``.

    Tabular files are read positionally: the first three columns are taken
    as frequency, real part and imaginary part, whatever their headers say.

    Raises
    ------
    ValueError
        For an unsupported extension or a table with fewer than 3 columns.
    """
    suf = fp.suffix.lower()
    if suf == ".mat":
        mdict = loadmat(fp)
        # ravel() (rather than squeeze) keeps a single-point sweep 1-D.
        return tuple(  # type: ignore[return-value]
            np.asarray(mdict[key], dtype=float).ravel() for key in ("freq", "Zreal", "Zimag")
        )
    if suf == ".csv":
        df = pd.read_csv(fp)
    elif suf == ".txt":
        # sep=None lets pandas sniff the delimiter, so tab-separated
        # instrument exports work as well as comma-separated ones.
        df = pd.read_csv(fp, sep=None, engine="python")
    elif suf in {".xls", ".xlsx"}:
        df = pd.read_excel(fp, engine="openpyxl" if suf == ".xlsx" else None)
    else:
        raise ValueError(f"Unsupported EIS file type: {fp.suffix}")

    if df.shape[1] < 3:
        raise ValueError(
            f"EIS file {fp.name} needs at least 3 columns (freq, Zreal, Zimag); "
            f"found {df.shape[1]}"
        )
    freq = df.iloc[:, 0].to_numpy(float)
    zre = df.iloc[:, 1].to_numpy(float)
    zim = df.iloc[:, 2].to_numpy(float)
    return freq, zre, zim


def _interp_to_grid(freq: np.ndarray, arr: np.ndarray, grid: np.ndarray | None = None) -> np.ndarray:
    """Log-log interpolate ``|arr|`` from *freq* onto a target frequency grid.

    Parameters
    ----------
    freq
        Measured frequencies; need not be sorted.
    arr
        Values measured at *freq*.  Only the magnitude ``|arr|`` is used.
    grid
        Target frequencies; defaults to ``CANON_FREQ``.

    Returns
    -------
    np.ndarray
        Interpolated magnitudes on the grid, linearly extrapolated in
        log-log space outside the measured range.  All-NaN when fewer than
        two usable points remain after filtering.

    Raises
    ------
    ValueError
        If *freq* and *arr* differ in length or hold fewer than two points.
    """
    target = CANON_FREQ if grid is None else np.asarray(grid, dtype=float)
    freq = np.asarray(freq, dtype=float).ravel()
    mag = np.abs(np.asarray(arr, dtype=float)).ravel()
    if freq.shape != mag.shape:
        raise ValueError("freq and arr must have the same length")
    if freq.size < 2:
        raise ValueError("Need at least two points to interpolate")

    # log10 is only defined for strictly positive, finite inputs; dropping
    # the rest keeps -inf/NaN out of the interpolation.
    usable = np.isfinite(freq) & np.isfinite(mag) & (freq > 0) & (mag > 0)
    # np.unique sorts the abscissa and removes repeated frequencies, which
    # would otherwise create zero-width segments.
    log_f, first = np.unique(np.log10(freq[usable]), return_index=True)
    log_v = np.log10(mag[usable])[first]
    if log_f.size < 2:
        return np.full(target.shape, np.nan)

    f = interp1d(log_f, log_v, bounds_error=False, fill_value="extrapolate")
    return 10 ** f(np.log10(target))


def _drt_stub(freq: np.ndarray, zre: np.ndarray, zim: np.ndarray) -> np.ndarray:
    """
    Stand-in for distribution-of-relaxation-times (DRT) features.

    The arguments are ignored; the result is an all-zero vector with the
    shape and dtype of ``CANON_FREQ``.  Swap in a real DRT calculator here
    if one is available.
    """
    return np.zeros(CANON_FREQ.shape, dtype=CANON_FREQ.dtype)


def featurize_any(fp: Path, include_drt: bool = True) -> np.ndarray:
    """Generate one concatenated feature vector for an EIS file.

    Layout (each segment has ``len(CANON_FREQ)`` entries):

    1. impedance magnitude ``|Z|``, log-log interpolated onto ``CANON_FREQ``;
    2. phase angle in degrees, interpolated linearly against ``log10(f)``;
    3. DRT placeholder (only when *include_drt* is true).

    NOTE(review): the phase used to go through the log-|x| interpolation as
    well, which discarded its sign and broke at 0°.  Models trained on the
    old features need retraining.

    Returns a 1-D ``float32`` array.
    """
    freq, zre, zim = (np.atleast_1d(np.asarray(a, dtype=float)) for a in _read_eis(fp))
    mag = np.sqrt(zre**2 + zim**2)
    pha = np.degrees(np.arctan2(zim, zre))

    # Phase is signed and crosses zero, so it is interpolated in value
    # (not log-value) against log-frequency.
    usable = np.isfinite(freq) & (freq > 0) & np.isfinite(pha)
    phase_on_grid = interp1d(
        np.log10(freq[usable]), pha[usable], bounds_error=False, fill_value="extrapolate"
    )(np.log10(CANON_FREQ))

    vecs = [
        _interp_to_grid(freq, mag),
        phase_on_grid,
    ]
    if include_drt:
        vecs.append(_drt_stub(freq, zre, zim))
    return np.concatenate(vecs).astype(np.float32)


# ╭─────────────────────────────╮
# │ 6 · DATASET CONSTRUCTION    │
# ╰─────────────────────────────╯
def build_dataset(eis_dir: Path, cap_dir: Path, include_drt: bool = True) -> pd.DataFrame:
    """Return DataFrame with columns features | cell_id | cycle_idx | soc | file_id | final_cycle.

    One row is produced per EIS file (.csv/.txt/.xls/.xlsx/.mat) found
    recursively under *eis_dir*, visited in sorted path order.  Files that
    cannot be featurized are skipped with a warning, mirroring how
    ``load_capacity_info`` treats unreadable capacity files.  ``soc`` is the
    file-name SoC divided by 100 (NaN when absent).  ``final_cycle`` is the
    highest capacity-file cycle of the cell, or NaN when no capacity data
    exists for it.

    Raises
    ------
    FileNotFoundError
        If no EIS file under *eis_dir* could be featurized.
    """
    eis_dir = Path(eis_dir)
    rows: list[dict[str, Any]] = []
    for fp in sorted(eis_dir.rglob("*")):
        if fp.suffix.lower() not in {".csv", ".txt", ".xls", ".xlsx", ".mat"}:
            continue
        try:
            feats = featurize_any(fp, include_drt)
        except Exception as exc:  # noqa: BLE001
            # One malformed or non-EIS file must not abort the whole build.
            warnings.warn(f"Skipping EIS file {fp.name}: {exc}")
            continue
        meta = parse_eis_metadata(fp)
        soc = meta["soc"]
        rows.append(
            dict(
                features=feats,
                cell_id=meta["cell_id"],
                cycle_idx=meta["cycle_idx"],
                soc=np.nan if math.isnan(soc) else soc / 100.0,
                file_id=fp.stem,
            )
        )
    if not rows:
        raise FileNotFoundError(f"No EIS files found under {eis_dir}")
    df = pd.DataFrame(rows)

    # merge final_cycle from capacity data (if available)
    try:
        caps = load_capacity_info(cap_dir)
    except FileNotFoundError:
        df["final_cycle"] = np.nan
    else:
        fin = caps.groupby("cell_id")["cycle"].max().rename("final_cycle").reset_index()
        df = df.merge(fin, on="cell_id", how="left")
    return df


# ╭──────────────────────────────────────────╮
# │ 7 · LEGACY STUBS (v7/v8 compatibility)   │
# ╰──────────────────────────────────────────╯
def train_models(*args, **kwargs):  # noqa: D401
    """Legacy v7/v8 entry point kept so old imports resolve; always raises."""
    message = "train_models() not used in v9"
    raise NotImplementedError(message)


def load_bundle(*args, **kwargs):  # noqa: D401
    """Legacy v7/v8 entry point kept so old imports resolve; always raises."""
    message = "load_bundle() not used in v9"
    raise NotImplementedError(message)


# ╭──────────────────────────────╮
# │ 8 · PROJECTION PLOT HELPER   │
# ╰──────────────────────────────╯
@lru_cache(None)
def _cell_soh_curve(cell_id: str, cap_dir: Path) -> Tuple[np.ndarray, np.ndarray]:
    """Return (cycles, SoH%) for one cell, else a generic decay curve.

    The capacity files under *cap_dir* are loaded and filtered to *cell_id*;
    the curve is returned in ascending cycle order.  If loading fails or the
    cell is unknown, a warning is issued and a generic curve is returned:
    240 points on 0..1200 cycles with ``SoH = 80 + 20 * exp(-cycles / 400)``.

    Results (including the fallback) are memoised per ``(cell_id, cap_dir)``
    and the cached arrays are shared between callers, so treat them as
    read-only.
    """
    try:
        df = load_capacity_info(cap_dir)
        sub = df[df["cell_id"] == cell_id].sort_values("cycle")
        if sub.empty:
            raise ValueError(f"no capacity rows for cell {cell_id!r}")
        return sub["cycle"].to_numpy(), sub["soh_percent"].to_numpy()
    except Exception as exc:  # noqa: BLE001
        # Best-effort by design, but say so: a silent fallback looks exactly
        # like real data on a plot.
        warnings.warn(f"Using generic SoH decay curve for cell {cell_id!r}: {exc}")
        cycles = np.linspace(0, 1_200, 240)
        soh = 80 + 20 * np.exp(-cycles / 400)
        return cycles, soh


def plot_projection(test_fp: Path) -> Tuple[np.ndarray, np.ndarray]:
    """Return the reference projection curve (cycles, SoH%) for the cell in *test_fp*.

    The cell id is parsed from the file name.  Capacity data is searched
    under the directory three levels above the file (``parents[2]``); when
    the path is too shallow for that, the file's own directory is used
    instead of raising ``IndexError``.

    NOTE(review): ``build_feature_vector`` looks for capacity data one level
    higher (``parents[3]``) — confirm which root is intended.
    """
    test_fp = Path(test_fp)
    meta = parse_eis_metadata(test_fp)
    # absolute() gives relative paths their real ancestors without resolving
    # symlinks, so deep-enough paths address the same directory as before.
    abs_fp = test_fp.absolute()
    ancestors = abs_fp.parents
    cap_dir = ancestors[2] if len(ancestors) > 2 else abs_fp.parent
    cycles, soh = _cell_soh_curve(meta["cell_id"], cap_dir)
    return cycles, soh


# ╭──────────────────────────────────────────────────╮
# │ 9 · SINGLE-FILE FEATURE BUILDER (FOR INFERENCE) │
# ╰──────────────────────────────────────────────────╯
def build_feature_vector(test_fp: Path, include_drt: bool = True) -> Dict[str, Any]:
    """Build the inference input for a single EIS file.

    Returns a dict with:

    * ``features`` – the vector from ``featurize_any``;
    * ``cpp`` – the cell's SoH percentage at the file's cycle, taken from
      capacity data found under ``test_fp.parents[3]``.  Defaults to 100.0
      when no matching, finite capacity entry is available.

    A failing capacity lookup is reported with a warning and never raises;
    errors from featurizing *test_fp* itself do propagate.
    """
    test_fp = Path(test_fp)
    feats = featurize_any(test_fp, include_drt)
    meta = parse_eis_metadata(test_fp)
    cpp = 100.0  # default if no capacity data
    try:
        cap_dir = test_fp.parents[3]
        caps = load_capacity_info(cap_dir)
        row = caps[(caps["cell_id"] == meta["cell_id"]) & (caps["cycle"] == meta["cycle_idx"])]
        if not row.empty:
            soh = float(row["soh_percent"].iloc[0])
            if math.isfinite(soh):
                cpp = soh
    except Exception as exc:  # noqa: BLE001
        # Still best-effort, but no longer silent.
        warnings.warn(
            f"Capacity lookup failed for {test_fp.name}; using default cpp={cpp}: {exc}"
        )
    return {"features": feats, "cpp": cpp}
