# 四轮驱动转向架正交参数化动力学仿真结果分析
核心功能覆盖 7 个步骤：
1) 数据整形 data reshaping（转置 Excel → 整洁表 tidy DataFrame）、中英列名别名解析与模糊匹配  
2) 派生变量重算：D_calc = Lx1 - Lx2、DiffWear_calc = RW Wear - IRW Wear  
3) 质量校验与范围检查 data validation  
4) 皮尔逊相关 Pearson correlation 与 部分相关 partial correlation（控制 controls，默认 Kpx）  
5) 一阶回归与交互（statsmodels OLS + HC3 标准误），含 poly(Lx3,2) 与交互（Lx3×Kpx/Lx1/Lx2/Chx）  
6) 必要散点与拟合线
7) VIF（方差膨胀因子 Variance Inflation Factor）与随机森林 Random Forest

In [None]:
# 环境与显示设置
import os
import sys
import json
import math
import re
import logging
import platform
import warnings
from pathlib import Path
from dataclasses import dataclass
from typing import Dict, List, Optional, Sequence, Tuple

import numpy as np
import pandas as pd

# 可选依赖检测（不自动安装，保持与脚本一致的“优雅降级”）
try:
    import statsmodels.api as sm
    import statsmodels.formula.api as smf
    HAVE_SM = True
except Exception:
    HAVE_SM = False

try:
    from sklearn.ensemble import RandomForestRegressor
    from sklearn.inspection import permutation_importance
    HAVE_SK = True
except Exception:
    HAVE_SK = False

try:
    from scipy import stats as spstats
    HAVE_SCIPY = True
except Exception:
    HAVE_SCIPY = False

# patsy Q：公式中安全引用含空格/符号的列名
try:
    from patsy.builtins import Q
    HAVE_Q = True
except Exception:
    HAVE_Q = False

# Matplotlib：启用 inline 或 widget（交互）
from IPython import get_ipython
import matplotlib
import matplotlib.pyplot as plt
from IPython.display import display, Markdown

# —— 字体与负号：中文显示（Chinese font fallback） ——
def _pick_first_installed(candidates):
    installed = {f.name for f in matplotlib.font_manager.fontManager.ttflist}
    for name in candidates:
        if name in installed:
            return name
    return None

def setup_matplotlib_chinese():
    sys_name = platform.system()
    if sys_name == "Windows":
        candidates = ["Microsoft YaHei", "SimHei"]
    elif sys_name == "Darwin":
        candidates = ["PingFang SC", "Heiti SC", "Songti SC"]
    else:
        candidates = ["Noto Sans CJK SC", "WenQuanYi Zen Hei", "AR PL UMing CN"]
    chosen = _pick_first_installed(candidates) or "DejaVu Sans"
    matplotlib.rcParams["font.sans-serif"] = [chosen, "DejaVu Sans"]
    matplotlib.rcParams["axes.unicode_minus"] = False
    return sys_name, chosen

# 默认 inline；如需 3D 交互可改为 "widget"
BACKEND_MODE = "inline"  # 或 "widget"
ip = get_ipython()
if ip:
    ip.run_line_magic("matplotlib", BACKEND_MODE)

sys_name, font_used = setup_matplotlib_chinese()
print(f"[Info] OS={sys_name}, Matplotlib backend={matplotlib.get_backend()}, 中文字体={font_used}")
print(f"[Info] HAVE_SM={HAVE_SM}, HAVE_SK={HAVE_SK}, HAVE_SCIPY={HAVE_SCIPY}, HAVE_Q={HAVE_Q}")

# 基本参数（路径、表名、ROI、控制变量等）

- 请根据实际文件修改 `INPUT_XLSX`、`SHEET_NAME`、`OUTDIR`。  
- ROI（感兴趣区间 region of interest）字符串示例：`"Lx1:[0.60,0.64];Lx2:[0,0.04];Lx3:[-0.6,-0.1]"`  
- 控制变量 controls 默认 ["Kpx"]（可多选）

In [None]:
# 设置路径与参数
INPUT_XLSX = Path(r"F:\ResearchMainStream\0.ResearchBySection\C.动力学模型\C23参数优化\参数优化实现\ParallelSweepSimpack\ParallelSweep_OrthogonalDoE.xlsx")
SHEET_NAME = "Sheet1"   # 或 None 使用首个工作表
OUTDIR = Path.cwd() / "Results_OrthogonalDoE"
KEEP_POLICY = "any"     # 'any' 或 'opt_derived'
CONTROLS = ["Kpx"]      # 部分相关/回归中的控制变量
ROI_STRING = "Lx1:[0.60,0.64];Lx2:[0,0.04];Lx3:[-0.6,-0.1]"  # 可为空字符串

OUTDIR.mkdir(parents=True, exist_ok=True)
print("[Check] INPUT:", INPUT_XLSX, "存在:", INPUT_XLSX.exists())
print("[Check] OUTDIR:", OUTDIR)


# 函数库 · 读取与整形（Utilities: Reshaping）
工具函数（日志、保存、别名解析、读取转置 Excel、转整洁表）
- 别名解析 alias resolution：对列名进行标准化（大小写/空白/符号清理）并支持前缀/子串匹配，以应对“列名截断”问题。
- 转置 Excel → 整洁表：A–D 列为元信息（`row_type/eng/zh/flag`），E.. 为数值；按 `eng` 去重，优先保留 `row_type ∈ {DerivedQuantity, Derived, opt}`。

In [None]:
# 读取/整形工具函数
def setup_logging(outdir: Path, level=logging.INFO) -> None:
    outdir.mkdir(parents=True, exist_ok=True)
    logging.basicConfig(
        level=level,
        format="%(asctime)s [%(levelname)s] %(message)s",
        handlers=[
            logging.FileHandler(outdir / "run.log", encoding="utf-8"),
            logging.StreamHandler(sys.stdout),
        ],
    )

def save_csv(df: pd.DataFrame, path: Path) -> None:
    df.to_csv(path, index=False)

def save_json(obj: dict, path: Path) -> None:
    with open(path, "w", encoding="utf-8") as f:
        json.dump(obj, f, indent=2, ensure_ascii=False)

def normalize_token(s: str) -> str:
    if s is None:
        return ""
    s = str(s).strip().lower()
    s = re.sub(r"[\s_()\[\]\-–—]+", "", s)
    return s

def first_present(candidates: List[str], available: List[str]) -> Optional[str]:
    avail_norm = {normalize_token(a): a for a in available}
    cand_norm = [normalize_token(c) for c in candidates]
    for cn in cand_norm:
        if cn in avail_norm:
            return avail_norm[cn]
    for cn in cand_norm:
        for norm_a, a in avail_norm.items():
            if norm_a.startswith(cn) or cn.startswith(norm_a) or (cn in norm_a):
                return a
    return None

CANONICAL_SYNONYMS: Dict[str, List[str]] = {
    # Geometry / design knobs
    "Lx1": ["$_Lx1", "Lx1", "Lx_1", "Lx 1", "Lx1 (mm)", "Lx1(mm)"],
    "Lx2": ["$_Lx2", "Lx2", "Lx_2", "Lx 2", "Lx2 (mm)", "Lx2(mm)"],
    "Lx3": ["$_Lx3", "Lx3", "Lx_3", "Lx 3", "Local_Lx3", "Local Lx3", "Local-Lx3"],
    "Kpx": ["$_Kpx", "Kpx", "K_px", "K px"],
    "Chx": ["$_Chx", "Chx", "C_hx", "C hx"],
    "Cld": ["$_Cld", "Cld", "C_ld", "C ld"],
    "Kld": ["$_Kld", "Kld", "K_ld", "K ld"],
    "Ksx": ["$_Ksx", "Ksx", "K_sx", "K sx"],
    # Wear
    "RW Wear - Curve": ["RW Wear - Curve","RW Wear–Curve","RW Wear – Curve","RW_Wear_Curve","RW Wear(Curve)","RW Wear Curve","RW Wear-Curve"],
    "IRW Wear - Curve": ["IRW Wear - Curve","IRW Wear–Curve","IRW Wear – Curve","IRW_Wear_Curve","IRW Wear(Curve)","IRW Wear Curve","IRW Wear-Curve"],
    # Derived
    "DiffWear": ["DiffWear","Diff Wear","RW-IRW Wear","RW-IRW","RW minus IRW","DiffWear=RW-IRW Wear","ΔWear","DeltaWear"],
    "D": ["D","Lx1-Lx2","Lx1_minus_Lx2","D=Lx1-Lx2"],
    # Performance / comfort
    "RW Critical Vel - STR": ["RW Critical Vel - STR","RW Critical Velocity - STR","RW_crit_vel_STR","RW Critical Vel STR","RW Critical Vel-STR"],
    "RW SperlingY - STR": ["RW SperlingY - STR","RW Sperling Y - STR","RW_SperlingY_STR","SperlingY - STR (RW)","SperlingY_STR_RW"],
    "RW SperlingZ - STR": ["RW SperlingZ - STR","RW Sperling Z - STR","RW_SperlingZ_STR","SperlingZ - STR (RW)","SperlingZ_STR_RW"],
    "RBC": ["RBC","R.B.C","RBC_value"],
}

def resolve_aliases(columns: List[str], custom_aliases: Optional[Dict[str, List[str]]] = None) -> Dict[str, Optional[str]]:
    aliases = dict(CANONICAL_SYNONYMS)
    if custom_aliases:
        for k, v in custom_aliases.items():
            aliases.setdefault(k, [])
            aliases[k] = list(dict.fromkeys(aliases[k] + v))
    mapping = {}
    for canon, cands in aliases.items():
        found = first_present(cands + [canon], columns)
        mapping[canon] = found
    return mapping

def numeric_coerce(df: pd.DataFrame) -> pd.DataFrame:
    return df.apply(pd.to_numeric, errors="coerce")

def load_transposed_excel(path: Path, sheet: Optional[str]) -> pd.DataFrame:
    df = pd.read_excel(path, sheet_name=sheet, header=None, engine="openpyxl")
    df = df.replace({r"^\s*$": np.nan}, regex=True)
    return df

@dataclass
class TidyResult:
    tidy: pd.DataFrame
    row_meta: pd.DataFrame
    used_rows: pd.DataFrame
    info: dict

def transposed_to_tidy(
    df_raw: pd.DataFrame,
    keep_policy: str = "any",
    prefer_row_type: Tuple[str, ...] = ("DerivedQuantity", "Derived", "opt")
) -> TidyResult:
    meta = df_raw.iloc[:, :4].copy()
    meta.columns = ["row_type", "eng", "zh", "flag"]
    runs = df_raw.iloc[:, 4:].copy()
    runs_num = numeric_coerce(runs)

    m_eng = meta["eng"].notna()
    m_numeric = runs_num.apply(lambda r: r.notna().any(), axis=1)
    if keep_policy == "opt_derived":
        m_type = meta["row_type"].astype(str).str.lower().str.startswith(("opt", "derived"))
        keep = m_eng & m_numeric & m_type
    else:
        keep = m_eng & m_numeric

    df_keep = pd.concat([meta.loc[keep, ["row_type","eng","zh","flag"]],
                         runs_num.loc[keep, :]], axis=1)

    def pref_order(s):
        s = str(s) if pd.notna(s) else ""
        s_low = s.lower()
        for i, p in enumerate(prefer_row_type):
            if s_low.startswith(p.lower()):
                return i
        return len(prefer_row_type)

    df_keep["pref_rank"] = df_keep["row_type"].map(pref_order)
    df_keep = df_keep.sort_values(by=["eng","pref_rank"]).drop_duplicates(subset=["eng"], keep="first")
    df_keep = df_keep.drop(columns=["pref_rank"])

    run_values = df_keep.iloc[:, 4:].copy()
    run_values.index = df_keep["eng"].astype(str).str.strip()
    wide = run_values.T.copy()
    wide.index.name = "sample_id"
    wide.reset_index(inplace=True)
    wide["sample_id"] = wide.index.astype(int)
    tidy = wide.copy()

    info = {"n_runs": int(tidy.shape[0]), "n_vars": int(tidy.shape[1]-1),
            "keep_policy": keep_policy, "prefer_row_type": list(prefer_row_type)}

    return TidyResult(
        tidy=tidy,
        row_meta=df_keep[["row_type","eng","zh","flag"]].reset_index(drop=True),
        used_rows=df_keep,
        info=info
    )


# 函数库 · 分析与可视化（Utilities: Analytics）
分析函数（相关/部分相关、VIF、散点拟合、OLS、RF、ROI）

- partial correlation：残差化 residualization 后计算相关系数；可选 `SciPy` 给出 p 值。  
- OLS（HC3）：`y ~ poly(Lx3,2) + controls + interactions(Lx3:Z)`；保存系数表与摘要。  
- 随机森林：若 `scikit-learn` 可用，计算置换重要性 permutation importance。

In [None]:
# 分析工具函数
def pearson_corr(df: pd.DataFrame, cols: Sequence[str]) -> pd.DataFrame:
    sub = df[list(cols)].copy()
    return sub.corr(method="pearson")

def partial_corr(df: pd.DataFrame, x: str, y: str, controls: Sequence[str]) -> Tuple[float, Optional[float], int]:
    cols = [x, y] + list(controls)
    sub = df[cols].dropna()
    n = sub.shape[0]
    if n < (len(controls) + 3):
        return np.nan, None, n

    Xc = sub[list(controls)].values
    Xc = np.column_stack([np.ones(len(Xc)), Xc]) if Xc.size else np.ones((len(sub), 1))

    def resid(v: np.ndarray) -> np.ndarray:
        beta, *_ = np.linalg.lstsq(Xc, v, rcond=None)
        return v - Xc @ beta

    rx = resid(sub[x].values.astype(float))
    ry = resid(sub[y].values.astype(float))
    r = np.corrcoef(rx, ry)[0, 1]

    pval = None
    if HAVE_SCIPY and np.isfinite(r):
        k = len(controls)
        dfree = n - k - 2
        if dfree > 0 and abs(r) < 1:
            tstat = r * math.sqrt(dfree / (1 - r**2))
            pval = 2 * spstats.t.sf(abs(tstat), df=dfree)
    return float(r), (float(pval) if pval is not None else None), int(n)

def vif_table(df: pd.DataFrame, cols: Sequence[str]) -> Optional[pd.DataFrame]:
    if not HAVE_SM:
        return None
    X = df[list(cols)].dropna()
    if X.shape[1] < 2 or X.shape[0] <= X.shape[1]:
        return None
    X = sm.add_constant(X)
    from statsmodels.stats.outliers_influence import variance_inflation_factor
    vifs = []
    for i, name in enumerate(X.columns):
        if name == "const":
            continue
        vifs.append({"feature": name, "VIF": variance_inflation_factor(X.values, i)})
    return pd.DataFrame(vifs)

def scatter_with_fit(df: pd.DataFrame, x: str, y: str, outpath: Path, title: Optional[str] = None, show: bool = True):
    sub = df[[x, y]].dropna()
    if sub.shape[0] < 2:
        return
    xvals = sub[x].values
    yvals = sub[y].values
    coeff = np.polyfit(xvals, yvals, 1)
    xgrid = np.linspace(np.nanmin(xvals), np.nanmax(xvals), 200)
    yfit = coeff[0] * xgrid + coeff[1]

    plt.figure()
    plt.scatter(xvals, yvals, s=16)
    plt.plot(xgrid, yfit, linewidth=2)
    plt.xlabel(x); plt.ylabel(y)
    plt.title(title or f"{x} vs {y}")
    plt.tight_layout()
    plt.savefig(outpath, dpi=200)
    if show:
        plt.show()
    plt.close()

def ols_lx3_model(df: pd.DataFrame, y: str, controls: Sequence[str], interactions: Sequence[Tuple[str, str]], out_prefix: Path) -> Optional[dict]:
    if not HAVE_SM:
        logging.warning("statsmodels not installed; OLS step skipped.")
        return None

    pieces = ["1", "np.power(Lx3,1)", "np.power(Lx3,2)"]
    for c in controls:
        pieces.append(c)
    for a, b in interactions:
        pieces.append(f"{a}:{b}")
        if a not in controls and a != "Lx3":
            pieces.append(a)
        if b not in controls and b != "Lx3":
            pieces.append(b)

    uniq = []
    for p in pieces:
        if p not in uniq:
            uniq.append(p)

    # 若 patsy.Q 可用，用 Q("y") 防止目标名含空格
    lhs = f'Q("{y}")' if HAVE_Q else y
    formula = f'{lhs} ~ ' + " + ".join(uniq)

    need_cols = ["Lx3", y] + list(controls) + [b for a, b in interactions if b not in controls and b != "Lx3"]
    sub = df[need_cols].dropna()
    if sub.shape[0] < 12:
        logging.warning("Too few rows for OLS on %s (n=%d); skipped.", y, sub.shape[0])
        return None

    model = smf.ols(formula, data=sub)
    res = model.fit(cov_type="HC3")

    coef = res.params.to_frame("coef").join(res.bse.to_frame("se")).join(res.pvalues.to_frame("pvalue")).reset_index().rename(columns={"index":"term"})
    coef_path = out_prefix.with_suffix("").as_posix() + "_coef.csv"
    coef.to_csv(coef_path, index=False)

    summary_path = out_prefix.with_suffix("").as_posix() + "_summary.txt"
    with open(summary_path, "w", encoding="utf-8") as f:
        f.write(res.summary().as_text())

    return {"formula": formula, "n": int(res.nobs), "rsq": float(res.rsquared), "adj_rsq": float(res.rsquared_adj),
            "coef_csv": coef_path, "summary_txt": summary_path}

def rf_permutation_importance(df: pd.DataFrame, y: str, features: Sequence[str], out_csv: Path) -> Optional[pd.DataFrame]:
    if not HAVE_SK:
        logging.info("scikit-learn not installed; RF skipped.")
        return None
    sub = df[list(features)+[y]].dropna()
    if sub.shape[0] < 50:
        logging.info("Too few rows for RF on %s (n=%d); skipped.", y, sub.shape[0])
        return None
    X = sub[features].values
    Y = sub[y].values.astype(float)
    rf = RandomForestRegressor(n_estimators=400, max_depth=None, min_samples_leaf=3, random_state=42, n_jobs=-1)
    rf.fit(X, Y)
    imp = permutation_importance(rf, X, Y, n_repeats=10, random_state=42, n_jobs=-1)
    tab = pd.DataFrame({"feature": features, "perm_importance": imp.importances_mean}).sort_values("perm_importance", ascending=False)
    tab.to_csv(out_csv, index=False)
    return tab

def parse_roi(s: Optional[str]) -> Dict[str, Tuple[Optional[float], Optional[float]]]:
    if not s:
        return {}
    out = {}
    parts = [p.strip() for p in s.split(";") if p.strip()]
    for p in parts:
        m = re.match(r"^([A-Za-z0-9_]+)\s*:\s*\[\s*([-\d\.eE]+)?\s*,\s*([-\d\.eE]+)?\s*\]\s*$", p)
        if not m:
            continue
        k = m.group(1)
        lo = float(m.group(2)) if m.group(2) else None
        hi = float(m.group(3)) if m.group(3) else None
        out[k] = (lo, hi)
    return out

def apply_roi(df: pd.DataFrame, roi: Dict[str, Tuple[Optional[float], Optional[float]]]) -> pd.DataFrame:
    sub = df.copy()
    for k, (lo, hi) in roi.items():
        if k not in sub.columns:
            continue
        if lo is not None:
            sub = sub[sub[k] >= lo]
        if hi is not None:
            sub = sub[sub[k] <= hi]
    return sub


# 读取与整形（Load & Tidy）
读取转置 Excel 并预览

- A–D 列为元信息（`row_type/eng/zh/flag`），E.. 为各试验列（数值）。  
- 展示原始形状与前几行，确认数据结构。

In [None]:
# 读取与预览
setup_logging(OUTDIR)
logging.info("Loading Excel: %s [sheet=%s]", INPUT_XLSX, SHEET_NAME)
df_raw = load_transposed_excel(INPUT_XLSX, SHEET_NAME)
print("df_raw.shape =", df_raw.shape)
# display(df_raw.head(8)) # 维度太大，暂不打印


# 转换为整洁表 tidy DataFrame（去重规则与元信息）

- 依据 `eng` 去重；优先保留 `row_type ∈ {DerivedQuantity, Derived, opt}`。  
- 输出：整洁表（每行一组试验）、行元信息 `row_meta`、摘要 `info`。

In [None]:
# 转换为 tidy
tidy_res = transposed_to_tidy(df_raw, keep_policy=KEEP_POLICY)
tidy = tidy_res.tidy.copy()          # 含 'sample_id' 列
row_meta = tidy_res.row_meta.copy()  # 选中的行字典
info = tidy_res.info

print("tidy.shape =", tidy.shape, "| n_runs =", info["n_runs"], "| n_vars =", info["n_vars"])
display(tidy.head(5))
display(row_meta.head(10))

# 保存中间结果
save_csv(tidy, OUTDIR / "tidy_raw.csv")
save_csv(row_meta, OUTDIR / "row_meta.csv")
print("[Saved] tidy_raw.csv, row_meta.csv")


# 别名解析与重命名（Alias Resolution & Renaming）
别名映射并重命名到“规范名”（不丢失原列）

- 从 `tidy` 提取全部变量列（除 `sample_id`），与内置 `CANONICAL_SYNONYMS` 做匹配。  
- 保存 `aliases_used.json` 以便溯源；展示映射表的前若干行。

In [None]:
# 解析别名与重命名
cols_all = [c for c in tidy.columns if c != "sample_id"]

# 可选：加载自定义别名 JSON（若有）
CUSTOM_ALIASES_PATH = None  # Path("custom_aliases.json")
custom_aliases = None
if CUSTOM_ALIASES_PATH and Path(CUSTOM_ALIASES_PATH).is_file():
    with open(CUSTOM_ALIASES_PATH, "r", encoding="utf-8") as f:
        custom_aliases = json.load(f)

alias_map = resolve_aliases(cols_all, custom_aliases)
save_json(alias_map, OUTDIR / "aliases_used.json")

# 将“真实列名”→“规范名”，形成重命名映射
canon_to_actual = {k: v for k, v in alias_map.items() if v is not None}
actual_to_canon = {v: k for k, v in canon_to_actual.items()}
renamed = tidy.rename(columns=actual_to_canon)
df = renamed.copy()

# 展示映射表
alias_df = pd.DataFrame([
    {"canonical": k, "actual": v if v is not None else "(not found)"}
    for k, v in alias_map.items()
]).sort_values("canonical")
display(alias_df.head(20))

# 列名概览
display(pd.DataFrame({"columns": df.columns.tolist()}).head(40))


# 派生变量与范围校验（Derived & Validation）
计算派生变量并进行范围检查

- `D_calc = Lx1 - Lx2`  
- `DiffWear_calc = (RW Wear - Curve) - (IRW Wear - Curve)`  
- 若存在原生 `DiffWear`，给出与 `DiffWear_calc` 的差值统计。  
- 根据预设 `RANGES` 做宽松范围校验，保存 `range_checks.json`。

In [None]:
# 派生变量与校验
RANGES = {
    "Lx1": (0.0, 0.64), "Lx2": (0.0, 0.64), "Lx3": (-0.6, 0.4),
    "Kpx": (50000, 15000000), "Chx": (100, 300000), "Cld": (10000, 200000),
    "Kld": (2000000, 20000000), "Ksx": (15000, 500000),
}

'''
参量项             定义	            是否优化	 基准值	     下限	     上限
$_sprCpz    一系悬挂阻尼-垂向	        1	    10000.00 	5000.00 	60000.00 
$_Kpx	    一系悬挂刚度-纵向	        1	    800000.00 	50000.00 	15000000.00 
$_Kpz	    一系悬挂刚度-垂向	        1	    600000.00 	500000.00 	2000000.00 
$_Ksx 	    二系悬挂刚度-纵向 	        1	    120000.00 	15000.00 	500000.00 
$_Ksz	    二系悬挂刚度-垂向	        1	    150000.00 	50000.00 	600000.00 
$_Csz	    二系悬挂阻尼-垂向	        1	    20000.00 	5000.00 	60000.00 
$_Kld	    横向减振器刚度	            1	    8000000.00 	2000000.00 	20000000.00  # 此下限值可能被善意突破
$_Cld	    横向减振器阻尼	            1	    50000.00 	10000.00 	200000.00 
$_Chx	    抗蛇行减振器阻尼	        1	    600000.00 	100.00 	    300000.00 
$_Lx1	    构架侧横向定位间距	        1	    0.20 	    0	        0.64
$_Lx2	    轴桥侧横向定位间距	        1	    0.58 	    0	        0.64
$_Lx3	    纵向定位调整值，可为负数     1	     0.00 	    -0.6	     0.4

'''

# D_calc
if "Lx1" in df.columns and "Lx2" in df.columns:
    df["D_calc"] = df["Lx1"] - df["Lx2"]

# DiffWear_calc
rw_col = "RW Wear - Curve" if "RW Wear - Curve" in df.columns else None
irw_col = "IRW Wear - Curve" if "IRW Wear - Curve" in df.columns else None
if rw_col and irw_col:
    df["DiffWear_calc"] = df[rw_col] - df[irw_col]

if "DiffWear" in df.columns and "DiffWear_calc" in df.columns:
    delta = (df["DiffWear"] - df["DiffWear_calc"]).abs()
    df["DiffWear_match"] = delta
    print("DiffWear vs DiffWear_calc | median abs diff:", float(delta.median()))

# 范围检查
validations = []
for k, (lo, hi) in RANGES.items():
    if k in df.columns:
        v = df[k].dropna()
        if v.empty:
            continue
        vmin, vmax = float(v.min()), float(v.max())
        ok = True
        if lo is not None and vmin < lo: ok = False
        if hi is not None and vmax > hi: ok = False
        validations.append({"var": k, "min": vmin, "max": vmax, "range_ok": ok})

valid_df = pd.DataFrame(validations)
display(valid_df)
save_json({"range_checks": validations}, OUTDIR / "range_checks.json")

# 保存整洁表（含派生列）
save_csv(df, OUTDIR / "tidy.csv")
print("[Saved] tidy.csv, range_checks.json")


# 相关分析（Correlation）
皮尔逊相关（设计变量 & 输出变量）

- 设计变量 *design knobs*：`Lx1, Lx2, Lx3, Kpx, Chx, Cld, Kld, Ksx`（若存在）  
- 输出/性能指标：磨耗与速度/舒适度（若存在）  
- 同时输出 `Lx3` 与磨耗目标的成对相关。

In [None]:
# 相关矩阵
design_knobs = [c for c in ["Lx1","Lx2","Lx3","Kpx","Chx","Cld","Kld","Ksx"] if c in df.columns]
outcomes = [c for c in ["RW Wear - Curve","IRW Wear - Curve","DiffWear","DiffWear_calc",
                        "RW Critical Vel - STR","RW SperlingY - STR","RW SperlingZ - STR"] if c in df.columns]

if design_knobs:
    corr_knobs = pearson_corr(df, design_knobs)
    display(Markdown("**设计变量相关矩阵（Pearson）**"))
    display(corr_knobs.round(3))
    save_csv(corr_knobs, OUTDIR / "corr_design_knobs.csv")
if outcomes:
    corr_outcomes = pearson_corr(df, outcomes)
    display(Markdown("**输出变量相关矩阵（Pearson）**"))
    display(corr_outcomes.round(3))
    save_csv(corr_outcomes, OUTDIR / "corr_outcomes.csv")

pairs = []
if "Lx3" in df.columns:
    for y in [c for c in ["RW Wear - Curve","IRW Wear - Curve","DiffWear","DiffWear_calc"] if c in df.columns]:
        r = df[["Lx3", y]].dropna().corr().iloc[0,1]
        pairs.append({"X":"Lx3","Y":y,"pearson_r": float(r)})
pairs_df = pd.DataFrame(pairs)
display(Markdown("**Lx3 与磨耗目标的成对相关**"))
display(pairs_df)
save_csv(pairs_df, OUTDIR / "corr_Lx3_vs_targets.csv")

# 部分相关（Partial Correlation）
 部分相关（控制变量 *controls*）

- 方法：对 `x` 与 `y` 分别以 `controls` 作 OLS 残差化，然后计算皮尔逊相关。  
- `SciPy` 可用时，给出 t 统计与双尾 p 值（自由度 `n - k - 2`）。

In [None]:
# 计算部分相关
pc_rows = []
controls_used = [c for c in CONTROLS if c in df.columns]
for (x, y) in [("Lx3","RW Wear - Curve"),("Lx3","IRW Wear - Curve"),
               ("Lx3","DiffWear"),("Lx3","DiffWear_calc")]:
    if x in df.columns and y in df.columns:
        r, p, n = partial_corr(df, x, y, controls=controls_used)
        pc_rows.append({"X":x, "Y":y, "controls":"+".join(controls_used), "pcorr": r, "pval": p, "n": n})
pc_df = pd.DataFrame(pc_rows)
display(pc_df)
save_csv(pc_df, OUTDIR / "partial_corr.csv")


# 散点与拟合线（Scatter & Fit）
绘制散点与线性拟合（并保存）

- X 固定为 `Lx3`，Y 依次为：`RW Wear - Curve / IRW Wear - Curve / DiffWear / DiffWear_calc`（若存在）  
- 每张图**先显示**，再保存至 `fig/` 目录。

In [None]:
# 逐一绘制并保存散点-拟合
figdir = OUTDIR / "fig"
figdir.mkdir(parents=True, exist_ok=True)

targets = [c for c in ["RW Wear - Curve","IRW Wear - Curve","DiffWear","DiffWear_calc"] if c in df.columns]
for y in targets:
    if "Lx3" in df.columns:
        outpath = figdir / f"Lx3_vs_{y.replace(' ', '_').replace('/', '-')}.png"
        scatter_with_fit(df, "Lx3", y, outpath, title=f"Lx3 vs {y}", show=True)
        print("[Saved]", outpath)


# OLS 回归（多项式与交互，HC3）
OLS（含 poly(Lx3,2) 与交互），保存摘要与系数并展示关键结果

- 目标 `y`：`RW Wear - Curve`、`IRW Wear - Curve`、`DiffWear_calc`  
- 控制变量：`["Lx1","Lx2","Kpx","Chx"]`（存在即用）  
- 交互项：`Lx3:Z`，其中 `Z` 来自以上控制变量  
- 输出：`ols_<y>_coef.csv` 与 `ols_<y>_summary.txt`；在下方**打印摘要片段并展示系数表头

In [None]:
# 运行 OLS 与展示结果
ols_info = {}
if "Lx3" in df.columns and HAVE_SM:
    base_controls = [c for c in ["Lx1","Lx2","Kpx","Chx"] if c in df.columns]
    inters = [("Lx3", z) for z in base_controls if z != "Lx3"]
    for y in [c for c in ["RW Wear - Curve","IRW Wear - Curve","DiffWear_calc"] if c in df.columns]:
        info = ols_lx3_model(df, y=y, controls=base_controls, interactions=inters,
                             out_prefix=OUTDIR / f"ols_{y.replace(' ', '_')}")
        if info:
            ols_info[y] = info
            # 展示系数表头
            coef_df = pd.read_csv(info["coef_csv"])
            display(Markdown(f"**OLS 系数表（{y}）**"))
            display(coef_df.head(10))
            # 打印摘要的前若干行
            with open(info["summary_txt"], "r", encoding="utf-8") as f:
                summary_text = "".join(f.readlines()[:40])
            print("—— 摘要片段 ——")
            print(summary_text)

# 保存元信息
save_json({"ols": ols_info}, OUTDIR / "ols_meta.json")
print("[Saved] ols_meta.json")

# 共线性与随机森林（VIF & Random Forest）
VIF（方差膨胀因子）与随机森林置换重要性（可选）
- VIF：需 `statsmodels`；当样本量足够且变量数 ≥ 2 时计算。  
- RF 重要性：需 `scikit-learn`；样本量不足则跳过。

In [None]:
# VIF 与 RF
vif_features = [c for c in ["Lx1","Lx2","Lx3","Kpx","Chx","Cld","Kld","Ksx"] if c in df.columns]
vifdf = vif_table(df, vif_features)
if vifdf is not None:
    save_csv(vifdf, OUTDIR / "vif_design_knobs.csv")
    display(Markdown("**VIF（设计变量）**"))
    display(vifdf)

rf_features = [c for c in ["Lx1","Lx2","Lx3","Kpx","Chx","Cld","Kld","Ksx"] if c in df.columns]
if rf_features:
    for y in [c for c in ["RW Wear - Curve","IRW Wear - Curve","DiffWear_calc"] if c in df.columns]:
        tab = rf_permutation_importance(df, y, rf_features, OUTDIR / f"rf_perm_importance_{y.replace(' ', '_')}.csv")
        if tab is not None:
            display(Markdown(f"**RF 置换重要性（{y}）**"))
            display(tab.head(10))


## RF 置换重要性（Permutation Importance）解读
功能：衡量模型对某一特征的依赖度（预测贡献）。做法是把该特征打乱（破坏其信息），比较模型分数（回归常用 R^2）的下降量：

Ij = Score(X,y)−Score(πj(X),y), πj表示只打乱第 j 列。可读信息：数值越大，模型越“离不开”该特征。与 线性回归的显著性不同，它是预测层面的度量，不是“系数为 0 否”的假设检验。

## 结果摘要：
- RW：Kpx(1.52) ≫ Lx1(0.60) ≳ Lx3(0.50) ≫ 其余 → Kpx 是最强驱动因子；
- IRW：Lx2(1.19) ≈ Kpx(1.18) > Lx1(0.83) > Lx3(0.67) → Lx2 与 Kpx 双强；
- DiffWear：Kpx(1.52) > Lx1(0.86) > Lx3(0.45) → 仍是 Kpx 主导。

提醒：这些数值是在各自目标 y 下的下降量，不同 y 之间不可直接比较；只在同一目标内比较相对大小。

# ROI 斜率估计（Slope within ROI）
在 ROI 内拟合 `DiffWear`（或 `DiffWear_calc`）对 `Lx3` 的线性斜率

- 解析 ROI 字符串，筛选子集后用最小二乘估计斜率。  
- 若 `statsmodels` 不可用，则回退至 `numpy.linalg.lstsq`。

In [None]:
# 计算 ROI 斜率并保存
roi = parse_roi(ROI_STRING)
roi_meta = {}
if roi and "Lx3" in df.columns and ("DiffWear_calc" in df.columns or "DiffWear" in df.columns):
    yvar = "DiffWear_calc" if "DiffWear_calc" in df.columns else "DiffWear"
    sub = apply_roi(df, roi)[["Lx3", yvar]].dropna()
    if sub.shape[0] >= 10:
        if HAVE_SM:
            X = sm.add_constant(sub["Lx3"].values)
        else:
            X = np.column_stack([np.ones(len(sub)), sub["Lx3"].values])
        beta, *_ = np.linalg.lstsq(X, sub[yvar].values.astype(float), rcond=None)
        slope = float(beta[-1])
        roi_meta = {"roi": roi, "n": int(sub.shape[0]), "y": yvar, "slope_dDeltaWear_dLx3": slope}
        save_json(roi_meta, OUTDIR / "roi_slope_DiffWear_vs_Lx3.json")
        print("[ROI] d(ΔWear)/d(Lx3) =", slope, "| n =", sub.shape[0])
    else:
        print("[ROI] 样本不足，跳过（n<10）")
else:
    print("[ROI] 未设定或缺少必要列，跳过。")


# 输出清单（Artifacts）
输出文件总览（按文件大小降序）

- 便于快速定位已生成的 CSV/JSON/PNG 等文件。

In [None]:
# 列出 OUTDIR 内容
files = []
for p in OUTDIR.rglob("*"):
    if p.is_file():
        files.append({"path": str(p.relative_to(OUTDIR)), "size_kb": round(p.stat().st_size/1024, 1)})
out_df = pd.DataFrame(files).sort_values("size_kb", ascending=False)
display(out_df.head(50))
print(f"[Done] 输出目录：{OUTDIR}")