In [None]:
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report, roc_auc_score, f1_score
from sklearn.preprocessing import LabelEncoder
import pandas as pd
import numpy as np

# ==== Load both tables and join them on differently named keys ====
# Feature table; per the original author's note it holds User_ID and the target column.
Xy = pd.read_csv("clean_numeric_model.csv")
# Split assignment; per the original author's note it holds row_id and split ∈ {train, val, test}.
splits = pd.read_csv("splits_70_15_15_k5.csv")

# Align User_ID (left) with row_id (right).
# validate="many_to_one" makes pandas raise a MergeError when a row_id occurs
# more than once in `splits`. Without the check such a duplicate would silently
# multiply feature rows and could place the same row in two different splits.
df = Xy.merge(
    splits[["row_id", "split"]],
    left_on="User_ID",
    right_on="row_id",
    how="inner",
    validate="many_to_one",
)

# An inner join discards feature rows that have no split label; say so instead
# of letting the dataset shrink unnoticed.
n_unmatched = len(Xy) - len(df)
if n_unmatched:
    print(f"[Warn] {n_unmatched} of {len(Xy)} rows had no matching row_id in splits and were dropped")

# ==== Target column (pick the one your dataset actually uses) ====
TARGET = "Severity_ord"       # change to "severity_level" if that is the column in use

# Integer targets are used as-is; anything else is encoded to 0..K-1.
target_kind = df[TARGET].dtype.kind
if target_kind not in "iu":
    le = LabelEncoder()
    if target_kind == "f":
        # Encode floats on their numeric values so the codes follow numeric
        # order; stringifying first would sort "10.0" before "2.0".
        df[TARGET] = le.fit_transform(df[TARGET].astype("float64"))
    else:
        # Non-numeric labels: stringify so mixed-type object columns sort consistently.
        df[TARGET] = le.fit_transform(df[TARGET].astype(str))
    # .tolist() yields plain Python scalars, keeping the printed mapping readable.
    print("Encoded target classes:", dict(enumerate(le.classes_.tolist())))

# ==== Assemble features and labels ====
# Every column except the target, the split label and the two ID columns is a feature.
drop_cols = {TARGET, "split", "row_id", "User_ID"}
feature_cols = [col for col in df.columns if col not in drop_cols]
X = df[feature_cols]
y = df[TARGET].astype(int)

# Train / test partitions, selected with boolean masks built once from the split label.
train_mask = df["split"] == "train"
test_mask = df["split"] == "test"
X_train, y_train = X[train_mask], y[train_mask]
X_test, y_test = X[test_mask], y[test_mask]
print(f"Shapes -> X_train: {X_train.shape}, X_test: {X_test.shape}")

# ==== Model: L2-regularised logistic regression ====
# `multi_class` is deliberately not passed: it is deprecated in recent
# scikit-learn releases (FutureWarning, scheduled for removal). With the lbfgs
# solver a target with three or more classes is fitted as a multinomial model
# by default, so the fitted model is unchanged for a multi-level target.
# `n_jobs` is omitted as well: per the scikit-learn docs it only parallelises
# over classes in one-vs-rest mode.
clf = LogisticRegression(
    penalty="l2",
    solver="lbfgs",
    max_iter=1000,
    C=1.0,
    random_state=42,
)
clf.fit(X_train, y_train)

# ==== Predict and evaluate on the test split ====
y_pred, y_prob = clf.predict(X_test), clf.predict_proba(X_test)

print("\n=== Classification report ===")
print(classification_report(y_test, y_pred, zero_division=0, digits=4))

macro_f1 = f1_score(y_test, y_pred, average="macro", zero_division=0)

# Macro-averaged one-vs-rest AUC for the multi-class case. Best effort: if
# scikit-learn cannot compute it, the reason is printed and the value is NaN.
try:
    macro_auc = roc_auc_score(y_test, y_prob, multi_class="ovr", average="macro")
except Exception as e:
    macro_auc = np.nan
    print("[Warn] AUC failed:", e)

auc_text = "NaN" if np.isnan(macro_auc) else f"{macro_auc:.4f}"
print("\n=== Macro metrics ===")
print(f"Macro-F1       : {macro_f1:.4f}")
print("Macro ROC-AUC  :", auc_text)

ModuleNotFoundError: No module named 'sklearn'

In [None]:
# ==== Assemble features and labels (continues from the code run so far) ====
drop_cols = {TARGET, "split", "row_id", "User_ID"}
feature_cols = [name for name in df.columns if name not in drop_cols]
X = df[feature_cols]
y = df[TARGET].astype(int)

# ===== Partition by split label (SMOTE and fitting use the train part only) =====
in_train = df["split"].eq("train")
in_test = df["split"].eq("test")
X_train, X_test = X[in_train], X[in_test]
y_train, y_test = y[in_train], y[in_test]
print(f"Shapes -> X_train: {X_train.shape}, X_test: {X_test.shape}")

# ====== SMOTENC column setup ======
# 1) Prefer categorical_indices / numeric_indices from smote_config.json.
# 2) If the file does not exist, infer from column-name suffixes:
#    *_lbl / *_bin are categorical, every other feature is numeric.
import json, os
from pathlib import Path


def _valid_indices(raw_indices, n_cols, label):
    """Keep the entries of raw_indices that are integer positions in [0, n_cols).

    Anything else (out of range, non-integer) is discarded, and a warning with
    the number of discarded entries is printed so that a config written for a
    different feature layout does not go unnoticed.
    """
    kept = [
        i for i in raw_indices
        if isinstance(i, int) and not isinstance(i, bool) and 0 <= i < n_cols
    ]
    n_bad = len(raw_indices) - len(kept)
    if n_bad:
        print(f"[Warn] smote_config.json: ignored {n_bad} invalid {label} index value(s)")
    return kept


smote_cfg_path = Path("smote_config.json")
n_features = len(feature_cols)
# Always defined, so later cells can read it even when no config file exists.
smote_cfg = {}

if smote_cfg_path.exists():
    with open(smote_cfg_path, "r", encoding="utf-8") as f:
        smote_cfg = json.load(f)
    # Indices are positions in feature_cols.
    cat_idx = _valid_indices(smote_cfg.get("categorical_indices", []), n_features, "categorical")
    if "numeric_indices" in smote_cfg:
        num_idx = _valid_indices(smote_cfg["numeric_indices"], n_features, "numeric")
    else:
        # No numeric list in the config: treat every non-categorical feature as
        # numeric. An empty list would make the later ColumnTransformer
        # (remainder="drop") discard all of those features.
        cat_positions = set(cat_idx)
        num_idx = [i for i in range(n_features) if i not in cat_positions]
else:
    cat_suffixes = ("_lbl", "_bin")
    # enumerate gives each column its own position in one pass, including
    # columns whose name appears more than once.
    cat_idx = [i for i, c in enumerate(feature_cols) if c.endswith(cat_suffixes)]
    num_idx = [i for i, c in enumerate(feature_cols) if not c.endswith(cat_suffixes)]

# Column-name lists for the ColumnTransformer built later.
cat_cols = [feature_cols[i] for i in cat_idx]
num_cols = [feature_cols[i] for i in num_idx]

print(f"[Info] Categorical columns: {len(cat_cols)} -> {cat_cols[:8]}{'...' if len(cat_cols)>8 else ''}")
print(f"[Info] Numeric columns    : {len(num_cols)} -> {num_cols[:8]}{'...' if len(num_cols)>8 else ''}")



In [None]:
# ====== 建 Pipeline：SMOTENC(仅 fit 时在训练集触发) -> 预处理 -> 逻辑回归 ======
from imblearn.over_sampling import SMOTENC
from imblearn.pipeline import Pipeline as ImbPipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report, roc_auc_score, f1_score
import numpy as np

In [None]:


# Over-sampler parameters: conservative defaults, overridden by the matching
# keys from smote_config.json when that file provides them.
smote_params = {
    "categorical_features": cat_idx,
    "sampling_strategy": "auto",   # may be replaced by a float or a per-class dict
    "k_neighbors": 5,
    "random_state": 42,
}
if smote_cfg_path.exists():
    # Carry over the commonly used keys from the existing config, if present.
    for k in ["sampling_strategy", "k_neighbors", "random_state"]:
        if k in smote_cfg:
            smote_params[k] = smote_cfg[k]

# JSON object keys are always strings, while the labels were cast to int, so a
# per-class dict loaded from the config would never match a class. Convert
# integer-looking keys back to int.
strategy = smote_params["sampling_strategy"]
if isinstance(strategy, dict):
    smote_params["sampling_strategy"] = {
        (int(label) if isinstance(label, str) and label.lstrip("-").isdigit() else label): count
        for label, count in strategy.items()
    }

# SMOTENC needs a mix of categorical and numeric columns and raises ValueError
# otherwise; fall back to the sampler that fits the column mix.
common_params = {k: v for k, v in smote_params.items() if k != "categorical_features"}
n_categorical = len(set(cat_idx))
if n_categorical == 0:
    from imblearn.over_sampling import SMOTE
    print("[Info] No categorical columns: using SMOTE instead of SMOTENC")
    smote = SMOTE(**common_params)
elif n_categorical == len(feature_cols):
    from imblearn.over_sampling import SMOTEN
    print("[Info] Only categorical columns: using SMOTEN instead of SMOTENC")
    smote = SMOTEN(**common_params)
else:
    smote = SMOTENC(**smote_params)

# Preprocessing: categorical columns are one-hot encoded, numeric columns are
# standardised (zero mean, unit variance), which helps logistic regression converge.
cat_encoder = OneHotEncoder(handle_unknown="ignore", sparse_output=False)
num_scaler = StandardScaler()
preprocess = ColumnTransformer(
    transformers=[
        ("cat", cat_encoder, cat_cols),
        ("num", num_scaler, num_cols),
    ],
    remainder="drop",  # columns in neither list are not passed to the model
)

# Logistic regression: multi-class, L2 penalty. If the classes are still
# imbalanced, class_weight="balanced" is an alternative to SMOTE (author's
# note: pick one of the two, using both together is not recommended).
# `multi_class` is deliberately not passed: it is deprecated in recent
# scikit-learn releases (FutureWarning, scheduled for removal). With the lbfgs
# solver a target with three or more classes is fitted as a multinomial model
# by default. `n_jobs` is omitted because it only parallelises over classes in
# one-vs-rest mode.
clf = LogisticRegression(
    penalty="l2",
    solver="lbfgs",
    max_iter=1000,
    C=1.0,
    random_state=42,
)

# Pipeline order: over-sampling -> preprocessing -> classifier.
# Author's note: the sampler's fit_resample is only invoked during
# fit(X_train, y_train), so prediction data is never over-sampled.
pipeline_steps = [
    ("smote", smote),
    ("prep", preprocess),
    ("clf", clf),
]
pipe = ImbPipeline(steps=pipeline_steps)

# ====== Train on the training split only ======
pipe.fit(X_train, y_train)

# ====== Predict and evaluate on the test split ======
y_pred, y_prob = pipe.predict(X_test), pipe.predict_proba(X_test)

print("\n=== Classification report ===")
print(classification_report(y_test, y_pred, zero_division=0, digits=4))

macro_f1 = f1_score(y_test, y_pred, average="macro", zero_division=0)

# Best-effort macro one-vs-rest AUC: on failure print the reason and keep NaN.
macro_auc = np.nan
try:
    macro_auc = roc_auc_score(y_test, y_prob, multi_class="ovr", average="macro")
except Exception as e:
    print("[Warn] AUC failed:", e)

auc_text = "NaN" if np.isnan(macro_auc) else f"{macro_auc:.4f}"
print("\n=== Macro metrics ===")
print(f"Macro-F1       : {macro_f1:.4f}")
print("Macro ROC-AUC  :", auc_text)
