In [None]:
# svm_classification.py
import numpy as np
import pandas as pd
from sklearn.svm import SVC
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report

# Paths of the precomputed embedding matrix and of the CSV holding the labels.
EMBEDDING_FILE = "data/text_embeddings.npy"
DATA_FILE = "data/data.csv"

# X: array stored in the .npy file; y: the "label" column of the CSV as an array.
X = np.load(EMBEDDING_FILE)
y = pd.read_csv(DATA_FILE)["label"].values

# X and y are indexed with the same row indices further down, so they must be
# the same length. Without this check a longer y would be silently misaligned.
if len(X) != len(y):
    raise ValueError(
        f"{EMBEDDING_FILE} has {len(X)} rows but {DATA_FILE} has {len(y)} labels"
    )


embedding shape: (10000, 384), label: [0 1]


In [16]:
# Report the shape of the embedding matrix and the distinct label values.
print(f"embedding shape: {X.shape}, label: {np.unique(y)}")

# Count the samples per label so that class imbalance is visible at a glance.
unique, counts = np.unique(y, return_counts=True)
label_dist = {value: n for value, n in zip(unique, counts)}
print("Label Distribution:")
for label in label_dist:
    count = label_dist[label]
    print(f"  Label {label}: {count} samples")

embedding shape: (10000, 384), label: [0 1]
Label Distribution:
  Label 0: 6455 samples
  Label 1: 3545 samples


In [None]:
import numpy as np
from sklearn.svm import SVC
from itertools import product
from collections import Counter

# ============== Configurable section ==============
# Candidate values tried for each SVC hyper-parameter during the grid search.
PARAM_GRID_SVC = dict(
    C=[0.1, 1.0, 10.0, 15.0],
    gamma=[0.0001, 0.001, 0.01, 0.1, 1.0],
    kernel=["rbf"],
)

OUTER_K, INNER_K = 10, 3  # default number of outer / inner folds
SEED = 42                 # default seed of the shuffling random generator
# ==================================================


# ---- kfold indices ----
def simple_kfold_indices(n_samples, k, rng):
    """Shuffle ``0 .. n_samples-1`` with ``rng`` and cut the result into ``k`` folds.

    Returns a list of ``k`` index arrays (as produced by ``np.array_split``);
    the i-th array holds the sample indices belonging to fold i.
    """
    shuffled = np.arange(n_samples)
    rng.shuffle(shuffled)  # in-place, consumes randomness from the caller's rng
    return np.array_split(shuffled, k)


# ---- 准确率计算 ----
def accuracy_score(y_true, y_pred):
    """Return the fraction of positions where ``y_pred`` equals ``y_true``."""
    matches = np.asarray(y_true) == np.asarray(y_pred)
    return matches.mean()


# ---- 构造模型 ----
def make_svc(params):
    """Build an ``SVC`` from ``params``, ignoring every key it does not support.

    Only the keys ``C``, ``kernel`` and ``gamma`` are forwarded to the
    constructor; any other entry of ``params`` is dropped.
    """
    supported = {"C", "kernel", "gamma"}
    svc_kwargs = {}
    for name, value in params.items():
        if name not in supported:
            continue
        svc_kwargs[name] = value
    return SVC(**svc_kwargs)


# ---- 生成所有参数组合 ----
def param_grid_iter(param_grid):
    """Yield one dict per combination of the value lists in ``param_grid``.

    Combinations follow ``itertools.product`` order over the grid's keys in
    their insertion order, so the last key varies fastest.
    """
    names = list(param_grid)
    value_lists = [param_grid[name] for name in names]
    for combo in product(*value_lists):
        yield dict(zip(names, combo))


# ---- 内层交叉验证：选择最优参数 ----
def inner_cv_select_params(X, y, train_idx, inner_k, param_grid, rng, make_model_fn):
    """Return the hyper-parameter combination with the best inner-CV accuracy.

    The samples listed in ``train_idx`` are shuffled into ``inner_k`` folds.
    Every combination produced from ``param_grid`` is fitted on all folds but
    one and scored on the held-out fold. The combination with the highest
    mean accuracy is returned; on ties the earliest combination wins because
    the comparison is strict. Returns ``None`` if the grid yields nothing.
    """
    # Fold entries are positions inside train_idx; map them to global row indices.
    global_folds = [
        train_idx[local]
        for local in simple_kfold_indices(len(train_idx), inner_k, rng)
    ]

    winner, winner_score = None, -np.inf
    for candidate in param_grid_iter(param_grid):
        fold_scores = []
        for held_out, val_idx in enumerate(global_folds):
            fit_idx = np.concatenate(
                global_folds[:held_out] + global_folds[held_out + 1:]
            )

            clf = make_model_fn(candidate)
            clf.fit(X[fit_idx], y[fit_idx])
            predictions = clf.predict(X[val_idx])
            fold_scores.append(accuracy_score(y[val_idx], predictions))

        # A candidate replaces the current leader only if its mean accuracy
        # is strictly higher than the best mean seen so far.
        candidate_score = np.mean(fold_scores)
        if candidate_score > winner_score:
            winner, winner_score = candidate, candidate_score

    return winner


# ---- 嵌套交叉验证 ----
def nested_cv(X, y,
              outer_k=OUTER_K,
              inner_k=INNER_K,
              param_grid=PARAM_GRID_SVC,
              make_model_fn=make_svc,
              seed=SEED):
    """Run nested cross-validation and report the accuracy of each outer fold.

    For each of the ``outer_k`` folds, hyper-parameters are selected with
    ``inner_cv_select_params`` on the remaining folds, a model is refitted on
    those folds with the selected parameters, and its accuracy on the
    held-out fold is recorded. Progress, the mean/std of the outer scores and
    how often each parameter combination was selected are printed.

    Returns a dict with the keys ``outer_scores``, ``mean_acc``, ``std_acc``
    and ``chosen_params_each_fold``.
    """
    rng = np.random.default_rng(seed)
    outer_folds = simple_kfold_indices(len(X), outer_k, rng)
    outer_scores, chosen_params_each_fold = [], []

    for i, test_idx in enumerate(outer_folds):
        # Everything outside the current outer fold is available for tuning/training.
        train_idx = np.concatenate(outer_folds[:i] + outer_folds[i + 1:])

        best_param = inner_cv_select_params(
            X, y, train_idx, inner_k, param_grid, rng, make_model_fn
        )
        chosen_params_each_fold.append(best_param)

        final_model = make_model_fn(best_param)
        final_model.fit(X[train_idx], y[train_idx])
        score = accuracy_score(y[test_idx], final_model.predict(X[test_idx]))
        outer_scores.append(score)

        print(f"[Outer Fold {i+1}/{outer_k}] best_params={best_param}, test_acc={score:.4f}")

    mean_acc, std_acc = np.mean(outer_scores), np.std(outer_scores)

    print("\n=== Final Results ===")
    print("Outer Fold Scores:", [round(s, 4) for s in outer_scores])
    print(f"Mean={mean_acc:.4f}, Std={std_acc:.4f}")

    # Dicts are unhashable, so each choice is counted as a sorted tuple of items.
    selections = Counter(
        tuple(sorted(p.items())) for p in chosen_params_each_fold
    )
    print("\n=== Chosen Hyperparameters Across Folds ===")
    for combo, count in selections.items():
        print(f"{dict(combo)}: chosen {count} times")

    return {
        "outer_scores": outer_scores,
        "mean_acc": mean_acc,
        "std_acc": std_acc,
        "chosen_params_each_fold": chosen_params_each_fold,
    }


# ============== Usage example ==============
# Assumes X and y are already defined as numpy arrays with labels in {0, 1}.
results = nested_cv(X, y)
print(*(results[key] for key in ("mean_acc", "std_acc")))


In [None]:
import numpy as np
from sklearn.svm import SVC
from itertools import product
from collections import Counter

# ============== Configurable section ==============
# Candidate values tried for each SVC hyper-parameter during the grid search.
PARAM_GRID_SVC = {}
PARAM_GRID_SVC["C"] = [0.1, 1.0, 10.0]
PARAM_GRID_SVC["gamma"] = [0.01, 0.1, 1.0]
PARAM_GRID_SVC["kernel"] = ["rbf"]

# Default outer/inner fold counts and the seed of the shuffling generator.
OUTER_K, INNER_K, SEED = 10, 3, 42
# ==================================================


# -------------------------
# 基础工具：K折、二分类度量（从零实现）
# -------------------------
def simple_kfold_indices(n_samples, k, rng):
    """Return ``k`` index arrays that partition a shuffled ``range(n_samples)``.

    The shuffle is done in place with ``rng``; the split is the one produced
    by ``np.array_split``.
    """
    order = np.arange(n_samples)
    rng.shuffle(order)
    folds = np.array_split(order, k)
    return folds

def confusion_matrix_binary(y_true, y_pred):
    """Count the four confusion-matrix cells for labels coded as 0 and 1.

    Both inputs are cast to int arrays first. Returns plain Python ints in
    the order ``(tp, fp, fn, tn)``.
    """
    truth = np.asarray(y_true).astype(int)
    guess = np.asarray(y_pred).astype(int)

    is_pos, is_neg = truth == 1, truth == 0
    said_pos, said_neg = guess == 1, guess == 0

    tp = int(np.count_nonzero(is_pos & said_pos))
    tn = int(np.count_nonzero(is_neg & said_neg))
    fp = int(np.count_nonzero(is_neg & said_pos))
    fn = int(np.count_nonzero(is_pos & said_neg))
    return tp, fp, fn, tn

def precision_recall_f1_acc_from_conf(tp, fp, fn, tn, eps=1e-12):
    """Derive precision, recall, F1 and accuracy from confusion-matrix counts.

    ``eps`` is added to every denominator so that empty denominators give
    0.0 instead of raising. Returns ``(precision, recall, f1, accuracy)``.
    """
    predicted_pos = tp + fp
    actual_pos = tp + fn
    total = tp + tn + fp + fn

    precision = tp / (predicted_pos + eps)
    recall = tp / (actual_pos + eps)
    accuracy = (tp + tn) / (total + eps)

    f1_denominator = precision + recall + eps
    f1 = 2 * precision * recall / f1_denominator
    return precision, recall, f1, accuracy

def predict_from_scores(y_scores, threshold=0.5):
    """Turn positive-class scores into 0/1 predictions.

    Parameters:
        y_scores: array-like of scores for class 1 (higher means more
            positive). Lists and tuples are accepted as well as arrays.
        threshold: scores greater than or equal to this value map to 1.

    Returns:
        Integer numpy array of the same shape as ``y_scores``.
    """
    # np.asarray lets plain Python sequences through; comparing a list with a
    # float would otherwise raise TypeError.
    return (np.asarray(y_scores) >= threshold).astype(int)

def roc_curve_points(y_true, y_scores):
    """Compute the points of a ROC curve from scratch.

    Parameters:
        y_true: array-like of labels in {0, 1}.
        y_scores: array-like of positive-class scores or probabilities
            (higher means more positive).

    Returns:
        ``(fpr, tpr)`` as numpy arrays. The curve starts at (0, 0), ends at
        (1, 1) and has one point per distinct score, scanning the threshold
        from the highest score to the lowest. If only one class is present
        (including empty input) the diagonal ``[0, 1], [0, 1]`` is returned.

    Samples that share a score are crossed by the threshold together, so they
    contribute a single point. Adding them one at a time would make the curve
    (and its area) depend on the arbitrary order of tied samples.
    """
    y_true = np.asarray(y_true).astype(int)
    y_scores = np.asarray(y_scores)

    n_pos = int(np.sum(y_true == 1))
    n_neg = int(np.sum(y_true == 0))
    if n_pos == 0 or n_neg == 0:
        # Degenerate single-class data: fall back to the chance diagonal.
        return np.array([0.0, 1.0]), np.array([0.0, 1.0])

    # Highest score first. The order inside a run of equal scores does not
    # matter because only the last sample of each run produces a point.
    order = np.argsort(y_scores, kind="stable")[::-1]
    sorted_true = y_true[order]
    sorted_scores = y_scores[order]

    # Running counts of true / false positives as the threshold is lowered.
    cum_tp = np.cumsum(sorted_true == 1)
    cum_fp = np.cumsum(sorted_true == 0)

    # Positions where the score changes mark the end of a run of ties; the
    # final sample always closes the last run.
    run_ends = np.flatnonzero(np.diff(sorted_scores) != 0)
    run_ends = np.append(run_ends, len(sorted_scores) - 1)

    fpr = np.concatenate(([0.0], cum_fp[run_ends] / n_neg))
    tpr = np.concatenate(([0.0], cum_tp[run_ends] / n_pos))
    return fpr, tpr

def auc_trapezoid(x, y):
    """Area under a curve by the trapezoidal rule, implemented from scratch.

    Parameters:
        x: array-like of abscissas (e.g. FPR), expected to be non-decreasing.
        y: array-like of ordinates (e.g. TPR), same length as ``x``.

    Returns:
        The area as a Python float; 0.0 for fewer than two points.
    """
    x = np.asarray(x)
    y = np.asarray(y)

    # Defensive sort by x. It must be stable: a ROC curve has many points that
    # share an x value (vertical steps), and reordering them would pair the
    # wrong y with the neighbouring segment and change the area.
    order = np.argsort(x, kind="stable")
    x, y = x[order], y[order]

    widths = np.diff(x)
    mean_heights = (y[1:] + y[:-1]) / 2.0
    return float(np.sum(widths * mean_heights))


# -------------------------
# 调参 & 嵌套交叉验证（以 F1 为内层目标）
# -------------------------
def make_svc(params):
    """Build an ``SVC`` with probability estimates enabled.

    Only the keys ``C``, ``kernel``, ``gamma`` and ``random_state`` of
    ``params`` are forwarded; any other entry is dropped.
    ``probability=True`` is always set because the callers need
    ``predict_proba`` for the ROC/AUC computation.

    The probability calibration of ``SVC`` shuffles the data internally, so
    fits are not repeatable unless ``random_state`` is fixed. It defaults to
    the module-level ``SEED`` and can be overridden through ``params``.
    """
    allowed = {"C", "kernel", "gamma", "random_state"}
    kwargs = {k: v for k, v in params.items() if k in allowed}
    kwargs["probability"] = True
    kwargs.setdefault("random_state", SEED)
    return SVC(**kwargs)

def param_grid_iter(param_grid):
    """Lazily generate every hyper-parameter combination as a dict.

    Keys keep the insertion order of ``param_grid``; combinations are
    produced in ``itertools.product`` order.
    """
    keys = tuple(param_grid.keys())
    axes = (param_grid[key] for key in keys)
    yield from (dict(zip(keys, point)) for point in product(*axes))

def inner_cv_select_params(X, y, train_idx, inner_k, param_grid, rng, make_model_fn):
    """Return the hyper-parameter combination with the best inner-CV F1.

    The samples listed in ``train_idx`` are shuffled into ``inner_k`` folds.
    Each combination from ``param_grid`` is fitted on all folds but one; the
    held-out fold is predicted by thresholding the class-1 probability at
    0.5 and scored with F1. The combination with the highest mean F1 is
    returned. Ties are not resolved specially: the first combination to
    reach the best mean is kept. Returns ``None`` if the grid yields nothing.
    """
    # Fold entries are positions inside train_idx; map them to global row indices.
    local_folds = simple_kfold_indices(len(train_idx), inner_k, rng)
    inner_folds = [train_idx[part] for part in local_folds]

    def held_out_f1(params, fold_no):
        """Fit on every inner fold except ``fold_no`` and return F1 on that fold."""
        val_idx = inner_folds[fold_no]
        tr_idx = np.concatenate(
            [fold for j, fold in enumerate(inner_folds) if j != fold_no]
        )

        model = make_model_fn(params)
        model.fit(X[tr_idx], y[tr_idx])

        # Column 1 of predict_proba is used as the positive-class score.
        y_scores = model.predict_proba(X[val_idx])[:, 1]
        y_pred = predict_from_scores(y_scores, threshold=0.5)

        tp, fp, fn, tn = confusion_matrix_binary(y[val_idx], y_pred)
        _, _, f1, _ = precision_recall_f1_acc_from_conf(tp, fp, fn, tn)
        return f1

    best_param, best_score = None, -np.inf
    for params in param_grid_iter(param_grid):
        avg_f1 = float(np.mean([held_out_f1(params, f) for f in range(inner_k)]))
        if avg_f1 > best_score:
            best_score, best_param = avg_f1, params

    return best_param

def nested_cv(X, y,
              outer_k=OUTER_K,
              inner_k=INNER_K,
              param_grid=PARAM_GRID_SVC,
              make_model_fn=make_svc,
              seed=SEED):
    """Run nested cross-validation and collect detailed metrics per outer fold.

    For each of the ``outer_k`` folds, hyper-parameters are selected with
    ``inner_cv_select_params`` on the remaining folds, a model is refitted on
    those folds, and the held-out fold is evaluated: confusion matrix,
    precision, recall, F1, accuracy (predictions obtained by thresholding the
    class-1 probability at 0.5), the ROC points and the AUC.

    Parameters:
        X, y: feature array and label array, indexed with the same row indices.
        outer_k, inner_k: number of outer / inner folds.
        param_grid: mapping of hyper-parameter name to candidate values.
        make_model_fn: callable turning one parameter dict into an unfitted
            model that provides ``fit`` and ``predict_proba``.
        seed: seed of the generator used for all fold shuffling.

    Returns:
        Dict with ``outer_metrics`` (one dict per fold), the list
        ``chosen_params_each_fold`` and ``summary`` holding
        ``<metric>_mean`` / ``<metric>_std`` for accuracy, precision, recall,
        f1 and auc.
    """
    rng = np.random.default_rng(seed)
    folds = simple_kfold_indices(len(X), outer_k, rng)

    outer_metrics = []            # one metrics dict per outer fold
    chosen_params_each_fold = []

    for i in range(outer_k):
        test_idx = folds[i]
        train_idx = np.concatenate([folds[j] for j in range(outer_k) if j != i])

        # 1) Inner CV picks the hyper-parameters.
        best_param = inner_cv_select_params(X, y, train_idx, inner_k, param_grid, rng, make_model_fn)
        chosen_params_each_fold.append(best_param)

        # 2) Refit with the chosen parameters and evaluate on the outer test fold.
        model = make_model_fn(best_param)
        model.fit(X[train_idx], y[train_idx])

        y_scores = model.predict_proba(X[test_idx])[:, 1]
        y_pred = predict_from_scores(y_scores, threshold=0.5)

        tp, fp, fn, tn = confusion_matrix_binary(y[test_idx], y_pred)
        precision, recall, f1, acc = precision_recall_f1_acc_from_conf(tp, fp, fn, tn)
        fpr, tpr = roc_curve_points(y[test_idx], y_scores)
        auc = auc_trapezoid(fpr, tpr)

        outer_metrics.append({
            "fold": i + 1,
            "best_params": best_param,
            "tp": tp, "fp": fp, "fn": fn, "tn": tn,
            "precision": precision,
            "recall": recall,
            "f1": f1,
            "accuracy": acc,
            "fpr": fpr,     # numpy array
            "tpr": tpr,     # numpy array
            "auc": auc,
        })

        print(f"[Outer Fold {i+1}/{outer_k}] "
              f"best_params={best_param} | "
              f"Acc={acc:.4f} Prec={precision:.4f} Rec={recall:.4f} F1={f1:.4f} AUC={auc:.4f}")

    # Mean and std of every metric over the outer folds. Each statistic is
    # computed once and used for both the printout and the returned summary.
    # All rows use four decimals; the accuracy row previously printed its std
    # with 44 decimals because of a format-spec typo.
    summary = {}
    print("\n=== Final (Outer-CV) Summary ===")
    for label, key in (("Accuracy", "accuracy"),
                       ("Precision", "precision"),
                       ("Recall", "recall"),
                       ("F1", "f1"),
                       ("AUC", "auc")):
        values = [m[key] for m in outer_metrics]
        mean, std = float(np.mean(values)), float(np.std(values))
        print(f"{label:<10}: mean={mean:.4f}, std={std:.4f}")
        summary[f"{key}_mean"] = mean
        summary[f"{key}_std"] = std

    return {
        "outer_metrics": outer_metrics,  # per-fold details (confusion matrix, ROC points, AUC, ...)
        "chosen_params_each_fold": chosen_params_each_fold,
        "summary": summary,
    }


# ============== Usage example ==============
# Assumes X and y are already defined (y taking values in {0, 1}).
results = nested_cv(X, y)
print(results["summary"])

# The ROC points of a single fold can be plotted like this (a third-party
# library is fine for plotting):
import matplotlib.pyplot as plt
fpr, tpr = (results["outer_metrics"][0][axis] for axis in ("fpr", "tpr"))
plt.plot(fpr, tpr, label=f"Fold1 AUC={results['outer_metrics'][0]['auc']:.3f}")
plt.plot([0, 1], [0, 1], '--')  # chance diagonal for reference
plt.xlabel("FPR")
plt.ylabel("TPR")
plt.title("ROC Curve (Fold 1)")
plt.legend()
plt.show()
