# CBU5201 Mini-Project
# 1. Author

**Student Name:** Minghui Pan

**Student ID:** 231220208

In [None]:
from __future__ import annotations

import os
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, Iterable, List, Tuple

# ---------------------------------------------------------------------
# 0) Environment setup (keeps CPU threading stable inside notebooks)
# ---------------------------------------------------------------------
# Cap the native numeric/threading back-ends at one thread each. setdefault()
# leaves any value that is already present in the environment untouched.
os.environ.setdefault("OMP_NUM_THREADS", "1")
os.environ.setdefault("OPENBLAS_NUM_THREADS", "1")
os.environ.setdefault("MKL_NUM_THREADS", "1")
os.environ.setdefault("VECLIB_MAXIMUM_THREADS", "1")
os.environ.setdefault("NUMEXPR_NUM_THREADS", "1")
# Turn numba JIT compilation off unless it has been configured already.
os.environ.setdefault("NUMBA_DISABLE_JIT", "1")

# Third-party imports deliberately follow the environment setup above.
# The __future__/os/dataclasses/pathlib/typing imports at the top of this cell
# are not repeated here: a `from __future__ import ...` statement that comes
# after other statements is a SyntaxError.
import librosa
import numpy as np

# ---------------------------------------------------------------------
# 1) Paths and constants (edit for your machine)
# ---------------------------------------------------------------------
# Folder holding the .wav recordings; the commented-out extraction run at the
# end of this cell passes it to collect_wav_paths().
DATASET_DIR = Path("/Users/panmingh/Code/ML_Coursework/Data/MLEndHWII_sample_800")
# Folder the same run hands to save_features(), which creates it if missing.
DEFAULT_OUTPUT_DIR = Path("/Users/panmingh/Code/ML_Coursework/MyCourse/results")

# tqdm is optional: batch_extract() falls back to periodic print() progress
# messages when the name is None.
try:
    from tqdm import tqdm
except Exception:  # pragma: no cover - fallback when tqdm is unavailable
    tqdm = None


# ---------------------------------------------------------------------
# 2) Configuration object
# ---------------------------------------------------------------------
@dataclass
class FeatureConfig:
    """Settings shared by the feature-extraction functions in this cell."""
    sr: int = 22050  # sampling rate passed to librosa.load and to every extractor
    n_mfcc: int = 13  # number of MFCC coefficients requested from librosa
    hop_length: int = 512  # hop length given to the MFCC, pyin and onset calls
    fmin: float = librosa.note_to_hz("C2")  # lower bound of the pyin pitch search
    fmax: float = librosa.note_to_hz("C7")  # upper bound of the pyin pitch search
    onset_backtrack: bool = True  # forwarded as `backtrack` to onset_detect


# ---------------------------------------------------------------------
# 3) Small numeric helpers
# ---------------------------------------------------------------------
def _safe_stats(x: np.ndarray) -> Tuple[float, float, float, float]:
    """Return ``(mean, std, max, min)`` of *x* as plain Python floats.

    An empty array yields four zeros rather than NaNs or a reduction error.
    """
    if not x.size:
        return (0.0,) * 4
    reducers = (np.mean, np.std, np.max, np.min)
    mean, std, highest, lowest = (float(reduce(x)) for reduce in reducers)
    return mean, std, highest, lowest


def _safe_mean_std(x: np.ndarray) -> Tuple[float, float]:
    """Return ``(mean, std)`` of *x* as Python floats, or zeros if *x* is empty."""
    if x.size:
        return float(np.mean(x)), float(np.std(x))
    return 0.0, 0.0


def _nan_to_num(x: np.ndarray) -> np.ndarray:
    """Return *x* with every NaN, +inf and -inf entry replaced by 0.0."""
    substitutes = {"nan": 0.0, "posinf": 0.0, "neginf": 0.0}
    return np.nan_to_num(x, **substitutes)


# ---------------------------------------------------------------------
# 4) Audio loading
# ---------------------------------------------------------------------
def load_audio(path: str, cfg: FeatureConfig) -> Tuple[np.ndarray, int]:
    """Load *path* as mono audio at ``cfg.sr`` and return ``(samples, rate)``.

    If the file decodes to zero samples, a single float32 zero is returned
    together with ``cfg.sr`` so callers never receive an empty signal.
    """
    samples, rate = librosa.load(path, sr=cfg.sr, mono=True)
    if samples.size:
        return samples, rate
    return np.zeros(1, dtype=np.float32), cfg.sr


# ---------------------------------------------------------------------
# 5) MFCC + delta features
# ---------------------------------------------------------------------
def mfcc_features(y: np.ndarray, cfg: FeatureConfig) -> List[float]:
    """Summarise MFCCs and their deltas over the time axis.

    Returns ``4 * cfg.n_mfcc`` values ordered as: MFCC means, MFCC standard
    deviations, delta means, delta standard deviations. Non-finite entries
    are zeroed before the statistics are taken.
    """
    coeffs = librosa.feature.mfcc(
        y=y, sr=cfg.sr, n_mfcc=cfg.n_mfcc, hop_length=cfg.hop_length
    )
    # Deltas are derived from the raw coefficients, before any clean-up.
    deltas = librosa.feature.delta(coeffs)

    summary: List[float] = []
    for matrix in (_nan_to_num(coeffs), _nan_to_num(deltas)):
        for reduce in (np.mean, np.std):
            summary.extend(reduce(matrix, axis=1).tolist())
    return summary


# ---------------------------------------------------------------------
# 6) F0 / pitch contour features (pyin)
# ---------------------------------------------------------------------
def f0_features(y: np.ndarray, cfg: FeatureConfig) -> List[float]:
    """Describe the pitch contour of *y* with 20 summary statistics.

    The pitch track comes from ``librosa.pyin``. NaN entries in it are set to
    0 before any statistic is taken, so those frames count as 0 Hz in the
    normalised-pitch statistics and as zero-semitone steps in the interval
    statistics.

    Returns, in order: mean/std/max/min/median of the z-scored pitch; ten
    statistics of the frame-to-frame semitone intervals; the up/down/flat
    contour ratios; the voiced-frame ratio; the mean voicing probability.
    """
    pitch, voiced, voiced_prob = librosa.pyin(
        y,
        fmin=cfg.fmin,
        fmax=cfg.fmax,
        sr=cfg.sr,
        hop_length=cfg.hop_length,
    )
    pitch = _nan_to_num(pitch)

    # Share of frames flagged as voiced.
    voiced_ratio = 0.0
    if voiced is not None:
        voiced = voiced.astype(np.float32)
        if voiced.size > 0:
            voiced_ratio = float(np.mean(voiced))

    # z-score the pitch track; with zero spread only the mean is removed.
    mu, sigma = _safe_mean_std(pitch)
    centred = pitch - mu
    pitch_z = centred / sigma if sigma > 0 else centred

    z_mean, z_std = _safe_mean_std(pitch_z)
    if pitch_z.size:
        z_max = float(np.max(pitch_z))
        z_min = float(np.min(pitch_z))
        z_median = float(np.median(pitch_z))
    else:
        z_max = z_min = z_median = 0.0

    # Semitone step between adjacent frames; steps touching a non-positive
    # pitch value come out as NaN and are then zeroed.
    positive_hz = np.where(pitch > 0, pitch, np.nan)
    steps = _nan_to_num(12.0 * np.log2(positive_hz[1:] / positive_hz[:-1]))
    step_mean, step_std, step_max, step_min = _safe_stats(steps)

    step_median = step_iqr = step_abs_mean = step_abs_std = 0.0
    rising = falling = up_ratio = down_ratio = flat_ratio = 0.0
    if steps.size:
        magnitude = np.abs(steps)
        tol = 1e-4  # steps within +/- tol count as "flat" in the contour ratios
        step_median = float(np.median(steps))
        step_iqr = float(np.percentile(steps, 75) - np.percentile(steps, 25))
        step_abs_mean = float(np.mean(magnitude))
        step_abs_std = float(np.std(magnitude))
        rising = float(np.mean(steps > 0))
        falling = float(np.mean(steps < 0))
        up_ratio = float(np.mean(steps > tol))
        down_ratio = float(np.mean(steps < -tol))
        flat_ratio = float(np.mean(magnitude <= tol))

    if voiced_prob is not None:
        prob_mean = float(np.mean(_nan_to_num(voiced_prob)))
    else:
        prob_mean = 0.0

    return [
        z_mean,
        z_std,
        z_max,
        z_min,
        z_median,
        step_mean,
        step_std,
        step_max,
        step_min,
        step_median,
        step_iqr,
        step_abs_mean,
        step_abs_std,
        rising,
        falling,
        up_ratio,
        down_ratio,
        flat_ratio,
        voiced_ratio,
        prob_mean,
    ]


# ---------------------------------------------------------------------
# 7) Rhythm / onset features
# ---------------------------------------------------------------------
def _estimate_tempo(onset_env: np.ndarray, cfg: FeatureConfig) -> float:
    """Estimate tempo in BPM from the autocorrelation of the onset envelope.

    The strongest autocorrelation lag inside the 30-240 BPM window is
    converted to beats per minute. Returns 0.0 when the envelope has fewer
    than two frames, is constant, or is too short to cover that window.
    """
    n_frames = onset_env.size
    if n_frames < 2:
        return 0.0
    centred = onset_env - np.mean(onset_env)
    if np.allclose(centred, 0.0):
        return 0.0

    # Keep only the non-negative lags of the full autocorrelation.
    autocorr = np.correlate(centred, centred, mode="full")[n_frames - 1 :]

    # lag_in_frames = 60 * sr / (bpm * hop): the fastest tempo gives the
    # shortest lag and the slowest tempo the longest.
    slowest_bpm, fastest_bpm = 30.0, 240.0
    shortest = int((60.0 * cfg.sr) / (fastest_bpm * cfg.hop_length))
    longest = int((60.0 * cfg.sr) / (slowest_bpm * cfg.hop_length))
    shortest = max(shortest, 1)
    longest = min(longest, autocorr.size - 1)
    if longest <= shortest:
        return 0.0

    window = autocorr[shortest : longest + 1]
    best_lag = int(np.argmax(window) + shortest)
    return float(60.0 * cfg.sr / (cfg.hop_length * best_lag))


def rhythm_features(y: np.ndarray, cfg: FeatureConfig) -> List[float]:
    """Return six rhythm descriptors derived from detected onsets.

    Output order: mean, std, max and min of the inter-onset intervals (in
    seconds), the autocorrelation tempo estimate in BPM, and the number of
    onsets per second of audio.
    """
    envelope = librosa.onset.onset_strength(
        y=y, sr=cfg.sr, hop_length=cfg.hop_length
    )
    frames = librosa.onset.onset_detect(
        onset_envelope=envelope,
        sr=cfg.sr,
        hop_length=cfg.hop_length,
        backtrack=cfg.onset_backtrack,
    )
    times = librosa.frames_to_time(frames, sr=cfg.sr, hop_length=cfg.hop_length)

    # Gaps between consecutive onsets; empty when fewer than two were found.
    gap_mean, gap_std, gap_max, gap_min = _safe_stats(np.diff(times))
    tempo = _estimate_tempo(envelope, cfg)

    if cfg.sr > 0:
        seconds = float(len(y)) / float(cfg.sr)
    else:
        seconds = 0.0
    if seconds > 0:
        onset_rate = float(len(times) / seconds)
    else:
        onset_rate = 0.0

    return [gap_mean, gap_std, gap_max, gap_min, tempo, onset_rate]


# ---------------------------------------------------------------------
# 8) Full feature vector helpers
# ---------------------------------------------------------------------
def extract_features(path: str, cfg: FeatureConfig | None = None) -> np.ndarray:
    """Load one audio file and return its float32 feature vector.

    The vector concatenates the MFCC, pitch and rhythm features, in that
    order. A default ``FeatureConfig`` is used when *cfg* is omitted.
    """
    cfg = cfg or FeatureConfig()
    # NOTE(review): this unpacking needs the load_audio() that returns
    # (samples, rate); a later cell rebinds load_audio to a version that
    # returns only the samples.
    samples, _ = load_audio(path, cfg)
    vector = [
        *mfcc_features(samples, cfg),
        *f0_features(samples, cfg),
        *rhythm_features(samples, cfg),
    ]
    return np.asarray(vector, dtype=np.float32)


def batch_extract(
    paths: Iterable[str], cfg: FeatureConfig | None = None
) -> Tuple[np.ndarray, List[str]]:
    """Extract features for every path and stack them into one matrix.

    Args:
        paths: Audio file paths; consumed once and materialised as a list.
        cfg: Extraction settings; a default ``FeatureConfig`` when omitted.

    Returns:
        ``(features, processed_paths)`` where row ``i`` of ``features``
        belongs to ``processed_paths[i]``. For an empty *paths* the result is
        an empty ``(0, 0)`` float32 matrix and an empty list.

    Progress is shown with tqdm when it is available, otherwise a message is
    printed every 50 files.
    """
    path_list = list(paths)
    if not path_list:
        # np.vstack([]) raises ValueError; return the same empty matrix shape
        # that augment_and_extract() uses when it has nothing to stack.
        return np.zeros((0, 0), dtype=np.float32), []

    cfg = cfg or FeatureConfig()
    features: List[np.ndarray] = []
    ok_paths: List[str] = []
    if tqdm is not None:
        iterator = tqdm(path_list, desc="Extracting features", unit="file")
    else:
        iterator = path_list
    for idx, p in enumerate(iterator, start=1):
        features.append(extract_features(p, cfg))
        ok_paths.append(p)
        if tqdm is None and idx % 50 == 0:
            print(f"Processed {idx}/{len(path_list)} files...")
    return np.vstack(features), ok_paths


def feature_names(cfg: FeatureConfig | None = None) -> List[str]:
    """Return the column names of the feature vector, in vector order.

    The list holds ``4 * cfg.n_mfcc`` MFCC names (numbered from 1), followed
    by the 20 pitch names and the 6 rhythm names.
    """
    cfg = cfg or FeatureConfig()

    mfcc_groups = ("mfcc_mean", "mfcc_std", "mfcc_delta_mean", "mfcc_delta_std")
    names: List[str] = [
        f"{group}_{number}"
        for group in mfcc_groups
        for number in range(1, cfg.n_mfcc + 1)
    ]

    pitch_names = [
        "f0n_mean",
        "f0n_std",
        "f0n_max",
        "f0n_min",
        "f0n_median",
        "interval_mean",
        "interval_std",
        "interval_max",
        "interval_min",
        "interval_median",
        "interval_iqr",
        "interval_abs_mean",
        "interval_abs_std",
        "interval_pos_ratio",
        "interval_neg_ratio",
        "contour_up_ratio",
        "contour_down_ratio",
        "contour_flat_ratio",
        "voiced_ratio",
        "voiced_prob_mean",
    ]
    rhythm_names = [
        "ioi_mean",
        "ioi_std",
        "ioi_max",
        "ioi_min",
        "tempo",
        "onsets_per_sec",
    ]
    return names + pitch_names + rhythm_names


def as_dict(path: str, cfg: FeatureConfig | None = None) -> Dict[str, float]:
    """Extract features from *path* and return them keyed by feature name."""
    cfg = cfg or FeatureConfig()
    values = extract_features(path, cfg)
    keys = feature_names(cfg)
    return dict(zip(keys, map(float, values)))


# ---------------------------------------------------------------------
# 9) Dataset helpers (metadata + saving)
# ---------------------------------------------------------------------
def _parse_metadata(path: Path) -> Dict[str, str]:
    """Split a file stem of the form ``subject_mode_take_song`` into fields.

    Everything after the third underscore is kept as the song name. A stem
    with fewer than four underscore-separated parts is returned whole as the
    song, with the other three fields left empty.
    """
    stem = path.stem
    fields = stem.split("_")
    if len(fields) < 4:
        return {"subject": "", "mode": "", "take": "", "song": stem}
    subject, mode, take, *song_parts = fields
    return {
        "subject": subject,
        "mode": mode,
        "take": take,
        "song": "_".join(song_parts),
    }


def collect_wav_paths(data_dir: Path) -> List[Path]:
    """Return the ``*.wav`` files directly inside *data_dir*, sorted by path."""
    wav_files = [entry for entry in data_dir.glob("*.wav") if entry.is_file()]
    wav_files.sort()
    return wav_files


def save_features(
    features: np.ndarray,
    paths: List[str],
    names: List[str],
    meta: List[Dict[str, str]],
    out_dir: Path,
    prefix: str,
) -> None:
    """Write the feature matrix to ``<prefix>.npz`` and ``<prefix>.csv``.

    Args:
        features: Matrix with one row per entry of *paths*.
        paths: Source path of each row.
        names: Column name of each feature.
        meta: One dict per row; the keys ``song``, ``subject``, ``mode`` and
            ``take`` are read, and missing keys default to "".
        out_dir: Destination folder, created if it does not exist.
        prefix: File name stem shared by both output files.

    The CSV has the columns ``path, label, subject, mode, take`` followed by
    the feature columns, with each feature formatted to 8 decimal places.
    """
    # Local import so the block does not depend on a file-level `import csv`.
    import csv

    out_dir.mkdir(parents=True, exist_ok=True)
    npz_path = out_dir / f"{prefix}.npz"
    csv_path = out_dir / f"{prefix}.csv"

    labels = [m.get("song", "") for m in meta]
    np.savez_compressed(
        npz_path,
        X=features,
        labels=np.asarray(labels),
        paths=np.asarray(paths),
        feature_names=np.asarray(names),
        subject=np.asarray([m.get("subject", "") for m in meta]),
        mode=np.asarray([m.get("mode", "") for m in meta]),
        take=np.asarray([m.get("take", "") for m in meta]),
    )

    header = ["path", "label", "subject", "mode", "take"] + names
    with csv_path.open("w", encoding="utf-8", newline="") as f:
        # csv.writer quotes fields that contain commas or quotes, so such a
        # path or song title no longer shifts the columns. "\n" keeps the line
        # ending the file had before; plain fields are written unchanged.
        writer = csv.writer(f, lineterminator="\n")
        writer.writerow(header)
        for row_idx, p in enumerate(paths):
            row_meta = meta[row_idx]
            row = [
                p,
                labels[row_idx],
                row_meta.get("subject", ""),
                row_meta.get("mode", ""),
                row_meta.get("take", ""),
            ]
            feat_str = [f"{v:.8f}" for v in features[row_idx].tolist()]
            writer.writerow(row + feat_str)
# NOTE: We already extracted and saved the features offline, so re-running
# this block is unnecessary and can be time-consuming.
# cfg = FeatureConfig()
# wav_paths = collect_wav_paths(DATASET_DIR)
# features, ok_paths = batch_extract([str(p) for p in wav_paths], cfg)
# meta = [_parse_metadata(Path(p)) for p in ok_paths]
# names = feature_names(cfg)
# save_features(features, ok_paths, names, meta, DEFAULT_OUTPUT_DIR, "features")


In [None]:
## Data augmentation code
from __future__ import annotations

from dataclasses import dataclass
from pathlib import Path
from typing import Iterable, List, Tuple

import librosa
import numpy as np
from scipy.io import wavfile

# ---------------------------------------------------------------------
# 1) Paths (edit for your machine)
# ---------------------------------------------------------------------
# Source recordings; same folder as the DATASET_DIR of the feature-extraction cell.
DATASET_DIR = Path("/Users/panmingh/Code/ML_Coursework/Data/MLEndHWII_sample_800")
# NOTE: rebinds DEFAULT_OUTPUT_DIR, which the feature-extraction cell set to
# the results folder; after this cell runs it points at the augmented-audio folder.
DEFAULT_OUTPUT_DIR = Path("/Users/panmingh/Code/ML_Coursework/MyCourse/data/augmented")


# ---------------------------------------------------------------------
# 2) Augmentation config
# ---------------------------------------------------------------------
@dataclass
class AugmentConfig:
    """Settings for the pitch-shift, time-stretch and noise augmentations."""
    sr: int = 22050  # sampling rate used when loading, pitch-shifting and saving
    pitch_shift_steps: Tuple[int, int] = (-3, 3)  # inclusive integer semitone range
    time_stretch_range: Tuple[float, float] = (0.9, 1.1)  # stretch rate is drawn uniformly from this range
    snr_db_range: Tuple[float, float] = (20.0, 40.0)  # target SNR in dB, drawn uniformly


# ---------------------------------------------------------------------
# 3) Audio loading
# ---------------------------------------------------------------------
def load_audio(path: str | Path, cfg: AugmentConfig) -> np.ndarray:
    """Load *path* as mono float32 audio resampled to ``cfg.sr``.

    A file that decodes to zero samples yields a single float32 zero.

    NOTE(review): this rebinds the ``load_audio`` of the feature-extraction
    cell, which returns ``(samples, rate)``; this version returns the samples
    only.
    """
    samples, _rate = librosa.load(str(path), sr=cfg.sr, mono=True)
    if not samples.size:
        return np.zeros(1, dtype=np.float32)
    return samples.astype(np.float32, copy=False)


# ---------------------------------------------------------------------
# 4) Augmentation ops
# ---------------------------------------------------------------------
def pitch_shift(y: np.ndarray, cfg: AugmentConfig, rng: np.random.Generator) -> np.ndarray:
    """Shift *y* by a random whole number of semitones.

    The step count is drawn from the inclusive range ``cfg.pitch_shift_steps``.
    A draw of 0 returns *y* itself, without copying.
    """
    lowest = cfg.pitch_shift_steps[0]
    highest = cfg.pitch_shift_steps[1]
    # rng.integers excludes the upper bound, hence the +1.
    n_steps = rng.integers(lowest, highest + 1)
    if n_steps != 0:
        return librosa.effects.pitch_shift(y, sr=cfg.sr, n_steps=int(n_steps))
    return y


def time_stretch(y: np.ndarray, cfg: AugmentConfig, rng: np.random.Generator) -> np.ndarray:
    """Stretch *y* in time by a rate drawn uniformly from ``cfg.time_stretch_range``.

    A rate that is numerically 1.0 returns *y* itself, without copying.
    """
    lowest = cfg.time_stretch_range[0]
    highest = cfg.time_stretch_range[1]
    rate = rng.uniform(lowest, highest)
    if not np.isclose(rate, 1.0):
        return librosa.effects.time_stretch(y, rate=float(rate))
    return y


def add_noise(y: np.ndarray, cfg: AugmentConfig, rng: np.random.Generator) -> np.ndarray:
    """Add white Gaussian noise to *y* at a random signal-to-noise ratio.

    The SNR (in dB) is drawn uniformly from ``cfg.snr_db_range``; the noise
    variance is the mean signal power divided by the linear SNR. An empty *y*
    is returned unchanged.
    """
    # The SNR is drawn before the emptiness check, so the generator advances
    # the same way whether or not noise is actually produced.
    snr_db = rng.uniform(cfg.snr_db_range[0], cfg.snr_db_range[1])
    if not y.size:
        return y

    # The small constant keeps the power strictly positive for silent input.
    signal_power = np.mean(y**2) + 1e-12
    noise_power = signal_power / 10 ** (snr_db / 10.0)
    sigma = np.sqrt(noise_power)
    noise = rng.normal(0.0, sigma, size=y.shape).astype(np.float32)
    return y + noise


def augment_sample(
    y: np.ndarray, cfg: AugmentConfig, rng: np.random.Generator
) -> List[np.ndarray]:
    """Return three augmented versions of *y*.

    The order is fixed: pitch-shifted, time-stretched, noise-added. Each
    operation is applied to the original *y*, not to the previous variant,
    and all of them draw from the same *rng* in that order.
    """
    operations = (pitch_shift, time_stretch, add_noise)
    return [operation(y, cfg, rng) for operation in operations]


# ---------------------------------------------------------------------
# 5) Batch helpers
# ---------------------------------------------------------------------
def augment_path(
    path: str | Path,
    cfg: AugmentConfig | None = None,
    seed: int | None = None,
) -> List[np.ndarray]:
    """Load one audio file and return its augmented variants.

    A default ``AugmentConfig`` is used when *cfg* is omitted, and a fresh
    random generator is seeded with *seed* on every call.
    """
    if not cfg:
        cfg = AugmentConfig()
    generator = np.random.default_rng(seed)
    samples = load_audio(path, cfg)
    return augment_sample(samples, cfg, generator)


def batch_augment(
    paths: Iterable[str | Path],
    cfg: AugmentConfig | None = None,
    seed: int | None = None,
) -> List[Tuple[Path, List[np.ndarray]]]:
    """Augment every file in *paths*.

    Returns one ``(path, variants)`` pair per input, in input order. A single
    generator seeded with *seed* is shared by all files, so the random draws
    for a file depend on its position in *paths*.
    """
    cfg = cfg or AugmentConfig()
    rng = np.random.default_rng(seed)

    def _augment_one(raw: str | Path) -> Tuple[Path, List[np.ndarray]]:
        source = Path(raw)
        samples = load_audio(source, cfg)
        return source, augment_sample(samples, cfg, rng)

    return [_augment_one(raw) for raw in paths]


# ---------------------------------------------------------------------
# 6) Saving to .wav (optional)
# ---------------------------------------------------------------------
def _normalize_to_int16(y: np.ndarray) -> np.ndarray:
    """Peak-normalise *y* and convert it to int16 samples.

    The signal is scaled so its largest absolute value maps to 32767. Empty
    input yields one zero sample; all-zero input yields zeros of the same
    shape.
    """
    if not y.size:
        return np.zeros(1, dtype=np.int16)
    peak = np.abs(y).max()
    if peak <= 0:
        return np.zeros_like(y, dtype=np.int16)
    scaled = (y / peak) * 32767.0
    return scaled.astype(np.int16)


def save_augmented(
    src_path: Path,
    variants: List[np.ndarray],
    out_dir: Path,
    cfg: AugmentConfig,
) -> None:
    """Write each variant to ``out_dir`` as a 16-bit WAV file.

    Files are named ``<source stem>_<tag>.wav``. The first three variants get
    the tags ``ps``, ``ts`` and ``noise``; any further ones get ``aug<N>``
    with N counted from 1. *out_dir* is created if needed.
    """
    out_dir.mkdir(parents=True, exist_ok=True)
    known_tags = ("ps", "ts", "noise")
    base = src_path.stem
    for position, audio in enumerate(variants):
        if position < len(known_tags):
            tag = known_tags[position]
        else:
            tag = f"aug{position+1}"
        target = out_dir / f"{base}_{tag}.wav"
        wavfile.write(target, cfg.sr, _normalize_to_int16(audio))


In [None]:
## Dataset splitting code
from __future__ import annotations

from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Tuple

import numpy as np
from sklearn.model_selection import GroupShuffleSplit

# ---------------------------------------------------------------------
# 1) Paths (edit for your machine)
# ---------------------------------------------------------------------
# Source recordings; same folder as in the earlier cells.
DATASET_DIR = Path("/Users/panmingh/Code/ML_Coursework/Data/MLEndHWII_sample_800")
# Location of the features.npz file; it matches the output folder and
# "features" prefix used by the commented-out save_features() call.
DEFAULT_FEATURES = Path("/Users/panmingh/Code/ML_Coursework/MyCourse/results/features.npz")
# Folder intended for the per-split .npz files.
DEFAULT_OUT_DIR = Path("/Users/panmingh/Code/ML_Coursework/MyCourse/results/splits")

try:
    from tqdm import tqdm
except Exception:  # pragma: no cover - fallback when tqdm is unavailable
    tqdm = None


# ---------------------------------------------------------------------
# 2) Config for subject-wise split
# ---------------------------------------------------------------------
@dataclass
class SplitConfig:
    """Ratios and seed for the subject-wise train/validation/test split."""
    # split_by_subject() raises ValueError unless the three ratios sum to 1.0.
    train_ratio: float = 0.7
    val_ratio: float = 0.15
    test_ratio: float = 0.15
    seed: int = 42  # random_state given to both GroupShuffleSplit passes


# ---------------------------------------------------------------------
# 3) IO helpers for .npz features
# ---------------------------------------------------------------------
def load_features_npz(path: Path) -> Dict[str, np.ndarray]:
    """Read every array stored in the ``.npz`` file at *path* into a dict.

    The archive is closed before returning; the returned arrays are already
    loaded into memory and stay valid.
    """
    # NOTE(security): allow_pickle=True lets NumPy unpickle object arrays,
    # which can execute arbitrary code. Only load files you created yourself.
    with np.load(path, allow_pickle=True) as data:
        return {key: data[key] for key in data.files}


def save_features_npz(
    out_path: Path,
    X: np.ndarray,
    labels: np.ndarray,
    paths: np.ndarray,
    feature_names: np.ndarray,
    subject: np.ndarray,
    mode: np.ndarray,
    take: np.ndarray,
) -> None:
    """Save the given arrays to a compressed ``.npz`` file at *out_path*.

    Each array is stored under the name of its parameter. The parent folder
    of *out_path* is created if it does not exist.
    """
    out_path.parent.mkdir(parents=True, exist_ok=True)
    arrays = {
        "X": X,
        "labels": labels,
        "paths": paths,
        "feature_names": feature_names,
        "subject": subject,
        "mode": mode,
        "take": take,
    }
    np.savez_compressed(out_path, **arrays)


# ---------------------------------------------------------------------
# 4) Split by subject (group split)
# ---------------------------------------------------------------------
def split_by_subject(
    subjects: np.ndarray, cfg: SplitConfig
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Split sample indices into train/val/test, grouped by subject.

    Two ``GroupShuffleSplit`` passes are used with *subjects* as the groups:
    the first separates the training set from a hold-out set, the second
    divides the hold-out set into validation and test.

    Returns:
        ``(train_idx, val_idx, test_idx)`` as index arrays into *subjects*.

    Raises:
        ValueError: If the three ratios in *cfg* do not sum to 1.0.
    """
    ratio_total = cfg.train_ratio + cfg.val_ratio + cfg.test_ratio
    if not np.isclose(ratio_total, 1.0):
        raise ValueError("Train/val/test ratios must sum to 1.0")

    all_idx = np.arange(subjects.shape[0])
    outer = GroupShuffleSplit(
        n_splits=1, train_size=cfg.train_ratio, random_state=cfg.seed
    )
    train_idx, holdout_idx = next(outer.split(all_idx, groups=subjects))

    # Share of the hold-out set that goes to validation.
    val_share = cfg.val_ratio / (cfg.val_ratio + cfg.test_ratio)
    inner = GroupShuffleSplit(n_splits=1, train_size=val_share, random_state=cfg.seed)
    # The inner split returns positions within holdout_idx, not sample indices.
    val_pos, test_pos = next(inner.split(holdout_idx, groups=subjects[holdout_idx]))
    return train_idx, holdout_idx[val_pos], holdout_idx[test_pos]


# ---------------------------------------------------------------------
# 5) Augment audio and extract features
# ---------------------------------------------------------------------
def extract_features_from_audio(y: np.ndarray, cfg: FeatureConfig) -> np.ndarray:
    """Return the float32 feature vector of an in-memory signal.

    Concatenates the MFCC, pitch and rhythm features in that order, the same
    layout ``extract_features`` produces for a file.
    """
    extractors = (mfcc_features, f0_features, rhythm_features)
    values = [value for extract in extractors for value in extract(y, cfg)]
    return np.asarray(values, dtype=np.float32)


def augment_and_extract(
    paths: List[str],
    subjects: np.ndarray,
    modes: np.ndarray,
    takes: np.ndarray,
    cfg_feat: FeatureConfig,
    cfg_aug: AugmentConfig,
    seed: int,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    """Augment each audio file and extract features from every variant.

    Args:
        paths: Audio files to augment.
        subjects, modes, takes: Metadata arrays indexed like *paths*; the
            value at position ``i`` is copied to every variant of file ``i``.
        cfg_feat: Feature-extraction settings.
        cfg_aug: Augmentation settings.
        seed: Seed of the single random generator shared by all files.

    Returns:
        Six arrays ``(X, labels, paths, subject, mode, take)`` with one row or
        entry per augmented variant. Variant paths have the form
        ``"<source path>::aug<N>"`` with N counted from 1. For empty *paths*,
        ``X`` has shape ``(0, 0)`` and the other arrays are empty.
    """
    if len(paths) == 0:
        # Nothing to do: skip the progress-bar set-up entirely.
        empties = tuple(np.asarray([]) for _ in range(5))
        return (np.zeros((0, 0), dtype=np.float32), *empties)

    rng = np.random.default_rng(seed)
    feats_list: List[np.ndarray] = []
    labels_list: List[str] = []
    paths_list: List[str] = []
    subj_list: List[str] = []
    mode_list: List[str] = []
    take_list: List[str] = []

    iterator = tqdm(paths, desc="Augmenting train", unit="file") if tqdm else paths
    for i, p in enumerate(iterator):
        y = load_audio(p, cfg_aug)
        variants = augment_sample(y, cfg_aug, rng)
        # Use the same label rule as _parse_metadata(): everything after the
        # third underscore, or the whole stem when it has fewer than four
        # parts, so augmented rows get the same label as their source row.
        stem = Path(p).stem
        parts = stem.split("_")
        base_label = "_".join(parts[3:]) if len(parts) >= 4 else stem
        for j, y_aug in enumerate(variants):
            feats_list.append(extract_features_from_audio(y_aug, cfg_feat))
            labels_list.append(base_label)
            paths_list.append(f"{p}::aug{j+1}")
            subj_list.append(str(subjects[i]))
            mode_list.append(str(modes[i]))
            take_list.append(str(takes[i]))
        if tqdm is None and (i + 1) % 50 == 0:
            print(f"Augmented {i+1}/{len(paths)} files...")

    return (
        np.vstack(feats_list) if feats_list else np.zeros((0, 0), dtype=np.float32),
        np.asarray(labels_list),
        np.asarray(paths_list),
        np.asarray(subj_list),
        np.asarray(mode_list),
        np.asarray(take_list),
    )

In [None]:
## Feature visualization code
from __future__ import annotations

from dataclasses import dataclass
from pathlib import Path
from typing import Iterable, List

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE

# ---------------------------------------------------------------------
# 1) Paths (keep original .npz path)
# ---------------------------------------------------------------------
# NOTE: rebinds DEFAULT_FEATURES; the splitting cell pointed it at
# results/features.npz, here it points at the train_full.npz split file.
DEFAULT_FEATURES = Path(
    "/Users/panmingh/Code/ML_Coursework/MyCourse/results/splits/train_full.npz"
)
# Folder for saved figures. The calls at the end of this cell pass
# out_dir=None, so nothing is written there by default.
DEFAULT_FIG_DIR = Path("/Users/panmingh/Code/ML_Coursework/MyCourse/results/figures")


# ---------------------------------------------------------------------
# 2) Config
# ---------------------------------------------------------------------
@dataclass
class VizConfig:
    """Settings for the PCA / t-SNE scatter plots."""
    max_points: int = 2000  # larger data frames are randomly sub-sampled to this many rows
    random_seed: int = 42  # seed for the sub-sampling, PCA and t-SNE
    tsne_perplexity: float = 30.0  # perplexity passed to TSNE
    tsne_iter: int = 1000  # iteration count passed to TSNE as max_iter (or n_iter)


# ---------------------------------------------------------------------
# 3) Data loading
# ---------------------------------------------------------------------
def load_features_npz(path: Path) -> pd.DataFrame:
    """Load a feature ``.npz`` file into a DataFrame.

    The frame has one column per entry of the stored ``feature_names`` array
    plus a final ``label`` column taken from the stored ``labels`` array. The
    archive is closed before returning.

    NOTE(review): this rebinds the ``load_features_npz`` of the splitting
    cell, which returns a dict of arrays instead of a DataFrame.
    """
    # NOTE(security): allow_pickle=True lets NumPy unpickle object arrays,
    # which can execute arbitrary code. Only load files you created yourself.
    with np.load(path, allow_pickle=True) as data:
        features = data["X"]
        labels = data["labels"].astype(str)
        names = data["feature_names"].astype(str).tolist()
    df = pd.DataFrame(features, columns=names)
    df["label"] = labels
    return df


def ensure_dir(path: Path) -> None:
    """Create the directory *path*, including missing parents, if it does not exist."""
    path.mkdir(exist_ok=True, parents=True)


def _subset_df(df: pd.DataFrame, cfg: VizConfig) -> pd.DataFrame:
    """Return *df* itself, or a random sample of ``cfg.max_points`` rows if it is larger.

    The sample is reproducible because it is seeded with ``cfg.random_seed``.
    """
    if len(df) > cfg.max_points:
        return df.sample(n=cfg.max_points, random_state=cfg.random_seed)
    return df


# ---------------------------------------------------------------------
# 4) Plotting helpers (show inline + optional save)
# ---------------------------------------------------------------------
def _finish_plot(fig: plt.Figure, out_path: Path | None, show: bool) -> None:
    """Lay out, optionally save, optionally show, and then close *fig*.

    Args:
        fig: Figure to finalise.
        out_path: Destination image file; nothing is saved when None.
        show: Whether to call ``plt.show()``.
    """
    fig.tight_layout()
    if out_path is not None:
        fig.savefig(out_path, dpi=150)
    if show:
        plt.show()
    # Always close the figure so repeated calls do not accumulate open figures.
    plt.close(fig)


def plot_box_violin_kde(
    df: pd.DataFrame,
    features: Iterable[str],
    out_dir: Path | None = None,
    show: bool = True,
) -> None:
    """Draw box, violin and KDE plots per label for each requested feature.

    Args:
        df: Feature table that contains a ``label`` column.
        features: Column names to plot; names missing from *df* are skipped
            silently and constant columns are skipped with a warning.
        out_dir: Folder for ``<feature>_dist.png`` files; created if needed.
            Nothing is saved when None.
        show: Whether to display each figure.
    """
    if out_dir is not None:
        ensure_dir(out_dir)
    for feat in features:
        if feat not in df.columns:
            continue
        values = df[feat].to_numpy()
        # A constant column has no distribution worth plotting.
        if np.nanstd(values) == 0.0:
            print(f"[warn] skip {feat}: constant values")
            continue
        # One figure per feature with three side-by-side panels.
        fig, axes = plt.subplots(1, 3, figsize=(16, 4))
        sns.boxplot(data=df, x="label", y=feat, ax=axes[0])
        axes[0].set_title(f"Box Plot - {feat}")
        axes[0].tick_params(axis="x", rotation=45)

        sns.violinplot(data=df, x="label", y=feat, ax=axes[1], cut=0)
        axes[1].set_title(f"Violin Plot - {feat}")
        axes[1].tick_params(axis="x", rotation=45)

        # common_norm=False: each label's density is normalised on its own.
        sns.kdeplot(
            data=df,
            x=feat,
            hue="label",
            ax=axes[2],
            fill=False,
            common_norm=False,
            warn_singular=False,
        )
        axes[2].set_title(f"KDE - {feat}")

        out_path = out_dir / f"{feat}_dist.png" if out_dir is not None else None
        _finish_plot(fig, out_path, show)


def plot_corr_heatmap(
    df: pd.DataFrame, out_dir: Path | None = None, show: bool = True
) -> None:
    """Plot a heatmap of the pairwise correlations between feature columns.

    Args:
        df: Feature table; its ``label`` column is dropped before correlating.
        out_dir: Folder for ``correlation_heatmap.png``; created if needed.
            Nothing is saved when None.
        show: Whether to display the figure.
    """
    if out_dir is not None:
        ensure_dir(out_dir)
    corr = df.drop(columns=["label"]).corr()
    fig, ax = plt.subplots(figsize=(12, 10))
    # center=0.0 puts zero correlation at the midpoint of the colour map.
    sns.heatmap(corr, cmap="coolwarm", center=0.0, ax=ax)
    ax.set_title("Feature Correlation Heatmap")
    out_path = out_dir / "correlation_heatmap.png" if out_dir is not None else None
    _finish_plot(fig, out_path, show)


def plot_pca_tsne(
    df: pd.DataFrame,
    out_dir: Path | None = None,
    cfg: VizConfig | None = None,
    show: bool = True,
) -> None:
    """Plot 2-D PCA and t-SNE embeddings of the features, coloured by label.

    Args:
        df: Feature table that contains a ``label`` column.
        out_dir: Folder for ``pca_2d.png`` and ``tsne_2d.png``; created if
            needed. Nothing is saved when None.
        cfg: Plot settings; a default ``VizConfig`` when omitted.
        show: Whether to display the figures.

    Degenerate inputs (fewer than two samples, fewer than two non-constant
    features, or a single unique sample) are skipped with a printed warning
    instead of raising.
    """
    if out_dir is not None:
        ensure_dir(out_dir)
    cfg = cfg or VizConfig()
    df_sub = _subset_df(df, cfg)
    X = df_sub.drop(columns=["label"]).to_numpy()
    y = df_sub["label"].to_numpy()
    if X.shape[0] < 2:
        print("[warn] skip PCA/t-SNE: not enough samples")
        return

    # Drop (near-)constant columns; they carry no information for either method.
    std = np.nanstd(X, axis=0)
    keep = std > 1e-8
    if not np.any(keep):
        print("[warn] skip PCA/t-SNE: all features are constant")
        return
    X = X[:, keep]
    if X.shape[1] < 2:
        # A 2-component PCA (also used to initialise t-SNE) needs >= 2 features.
        print("[warn] skip PCA/t-SNE: fewer than two non-constant features")
        return
    if np.unique(X, axis=0).shape[0] < 2:
        print("[warn] skip PCA/t-SNE: only one unique sample")
        return

    pca = PCA(n_components=2, random_state=cfg.random_seed)
    X_pca = pca.fit_transform(X)

    fig, ax = plt.subplots(figsize=(7, 6))
    sns.scatterplot(x=X_pca[:, 0], y=X_pca[:, 1], hue=y, s=30, ax=ax)
    ax.set_title("PCA (2D)")
    ax.legend(bbox_to_anchor=(1.05, 1), loc="upper left")
    out_path = out_dir / "pca_2d.png" if out_dir is not None else None
    _finish_plot(fig, out_path, show)

    # t-SNE requires perplexity < n_samples; clamp it so that small data sets
    # are plotted instead of raising. Larger data sets are unaffected.
    perplexity = min(float(cfg.tsne_perplexity), float(X.shape[0] - 1))
    tsne_kwargs = dict(
        n_components=2,
        perplexity=perplexity,
        random_state=cfg.random_seed,
        init="pca",
        learning_rate="auto",
    )
    # The iteration keyword is `max_iter` in newer scikit-learn releases and
    # `n_iter` in older ones; try the new name first.
    try:
        tsne = TSNE(max_iter=cfg.tsne_iter, **tsne_kwargs)
    except TypeError:
        tsne = TSNE(n_iter=cfg.tsne_iter, **tsne_kwargs)
    X_tsne = tsne.fit_transform(X)
    fig, ax = plt.subplots(figsize=(7, 6))
    sns.scatterplot(x=X_tsne[:, 0], y=X_tsne[:, 1], hue=y, s=30, ax=ax)
    ax.set_title("t-SNE (2D)")
    ax.legend(bbox_to_anchor=(1.05, 1), loc="upper left")
    out_path = out_dir / "tsne_2d.png" if out_dir is not None else None
    _finish_plot(fig, out_path, show)


def default_feature_list() -> List[str]:
    """Return the feature columns plotted by default in the distribution plots."""
    selected = (
        "interval_mean",
        "interval_std",
        "tempo",
        "ioi_std",
        "f0n_std",
        "mfcc_mean_1",
    )
    return list(selected)
# Run the visualisations on the default feature file. These statements execute
# whenever the cell runs; with out_dir=None the figures are shown, not saved.
df = load_features_npz(DEFAULT_FEATURES)
feats = default_feature_list()
plot_box_violin_kde(df, feats, out_dir=None, show=True)
plot_corr_heatmap(df, out_dir=None, show=True)
plot_pca_tsne(df, out_dir=None, show=True)

In [None]:
## Unsupervised clustering evaluation code
from __future__ import annotations

import json
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, Tuple

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from sklearn.cluster import KMeans
from sklearn.metrics import (
    adjusted_rand_score,
    normalized_mutual_info_score,
    silhouette_score,
)
from sklearn.metrics.cluster import contingency_matrix

# ---------------------------------------------------------------------
# 1) Paths (keep original .npz path)
# ---------------------------------------------------------------------
# Feature file that is clustered at the end of this cell.
DEFAULT_FEATURES = Path(
    "/Users/panmingh/Code/ML_Coursework/MyCourse/results/splits/train_full.npz"
)
# NOTE: rebinds DEFAULT_OUT_DIR, which the splitting cell set to results/splits;
# save_metrics() writes unsupervised_metrics.json into this folder.
DEFAULT_OUT_DIR = Path("/Users/panmingh/Code/ML_Coursework/MyCourse/results")


# ---------------------------------------------------------------------
# 2) Config
# ---------------------------------------------------------------------
@dataclass
class UnsupervisedConfig:
    """Settings for the k-means clustering evaluation."""
    n_clusters: int = 8  # number of k-means clusters
    random_seed: int = 42  # seed for the row sub-sampling and for KMeans
    max_points: int = 2000  # larger data sets are randomly sub-sampled to this many rows


# ---------------------------------------------------------------------
# 3) Data helpers
# ---------------------------------------------------------------------
def load_features_npz(path: Path) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Load ``(X, labels, feature_names)`` from a feature ``.npz`` file.

    ``labels`` and ``feature_names`` are converted to string arrays. The
    archive is closed before returning.

    NOTE(review): this rebinds the ``load_features_npz`` of the earlier cells,
    which return a dict and a DataFrame respectively.
    """
    # NOTE(security): allow_pickle=True lets NumPy unpickle object arrays,
    # which can execute arbitrary code. Only load files you created yourself.
    with np.load(path, allow_pickle=True) as data:
        X = data["X"]
        labels = data["labels"].astype(str)
        feature_names = data["feature_names"].astype(str)
    return X, labels, feature_names


def subset_data(
    X: np.ndarray, y: np.ndarray, cfg: UnsupervisedConfig
) -> Tuple[np.ndarray, np.ndarray]:
    """Return ``(X, y)`` limited to at most ``cfg.max_points`` rows.

    Smaller inputs are returned as they are. Larger ones are sampled without
    replacement, using the same row selection for *X* and *y* and a generator
    seeded with ``cfg.random_seed``.
    """
    n_rows = X.shape[0]
    if n_rows > cfg.max_points:
        picker = np.random.default_rng(cfg.random_seed)
        chosen = picker.choice(n_rows, size=cfg.max_points, replace=False)
        return X[chosen], y[chosen]
    return X, y


# ---------------------------------------------------------------------
# 4) Metrics + plots (show inline + optional save)
# ---------------------------------------------------------------------
def compute_metrics(X: np.ndarray, y: np.ndarray, clusters: np.ndarray) -> Dict[str, float]:
    """Score a clustering against the true labels.

    Returns a dict with ``nmi`` (normalised mutual information), ``ari``
    (adjusted Rand index) and ``silhouette``. The silhouette score is reported
    as 0.0 when only one cluster was found.
    """
    scores = {
        "nmi": float(normalized_mutual_info_score(y, clusters)),
        "ari": float(adjusted_rand_score(y, clusters)),
    }
    if len(np.unique(clusters)) > 1:
        scores["silhouette"] = float(silhouette_score(X, clusters))
    else:
        scores["silhouette"] = float(0.0)
    return scores


def _finish_plot(fig: plt.Figure, out_path: Path | None, show: bool) -> None:
    """Lay out, optionally save, optionally show, and then close *fig*.

    Args:
        fig: Figure to finalise.
        out_path: Destination image file; nothing is saved when None.
        show: Whether to call ``plt.show()``.
    """
    fig.tight_layout()
    if out_path is not None:
        fig.savefig(out_path, dpi=150)
    if show:
        plt.show()
    # Always close the figure so repeated calls do not accumulate open figures.
    plt.close(fig)


def plot_confusion(
    y: np.ndarray,
    clusters: np.ndarray,
    out_path: Path | None = None,
    show: bool = True,
) -> None:
    """Plot a row-normalised label-versus-cluster contingency heatmap.

    Each row is one true label and each column one cluster id; a cell holds
    the fraction of that label's samples assigned to that cluster.

    Args:
        y: True label of each sample.
        clusters: Cluster id assigned to each sample.
        out_path: Destination image file; nothing is saved when None.
        show: Whether to display the figure.
    """
    labels = np.unique(y)
    # contingency_matrix orders its columns by the sorted unique cluster ids,
    # so the column names are built from those ids rather than from 0..n-1.
    # The two agree whenever every id in 0..n-1 occurs.
    cluster_ids = np.unique(clusters)
    conf = contingency_matrix(y, clusters)
    row_sums = conf.sum(axis=1, keepdims=True)
    row_sums[row_sums == 0] = 1  # avoid dividing by zero for an empty row
    conf_norm = conf / row_sums
    df = pd.DataFrame(
        conf_norm, index=labels, columns=[f"C{cid}" for cid in cluster_ids]
    )
    fig, ax = plt.subplots(figsize=(8, 6))
    sns.heatmap(
        df,
        annot=True,
        fmt=".2f",
        cmap="Blues",
        vmin=0.0,
        vmax=1.0,
        ax=ax,
    )
    ax.set_title("Cluster vs Label Confusion (Row-Normalized)")
    ax.set_xlabel("Cluster")
    ax.set_ylabel("Label")
    _finish_plot(fig, out_path, show)


def save_metrics(metrics: Dict[str, float], out_dir: Path | None = None) -> None:
    """Write ``metrics`` to ``unsupervised_metrics.json`` inside ``out_dir``.

    Nothing is written when ``out_dir`` is ``None``. The directory (and any
    missing parents) is created on demand.
    """
    if out_dir is not None:
        out_dir.mkdir(parents=True, exist_ok=True)
        target = out_dir / "unsupervised_metrics.json"
        with target.open("w", encoding="utf-8") as handle:
            json.dump(metrics, handle, ensure_ascii=False, indent=2)
# ---------------------------------------------------------------------
# 5) Example "notebook flow" (copy into cells if desired)
# ---------------------------------------------------------------------
# Cluster the saved feature matrix into 8 groups and compare with the labels.
cfg = UnsupervisedConfig(n_clusters=8)
# The third returned value (feature names) is not needed in this flow.
X, y, _ = load_features_npz(DEFAULT_FEATURES)
# Cap the number of points at cfg.max_points (seeded random subset).
X, y = subset_data(X, y, cfg)
kmeans = KMeans(n_clusters=cfg.n_clusters, random_state=cfg.random_seed, n_init=10)
clusters = kmeans.fit_predict(X)
metrics = compute_metrics(X, y, clusters)
# NOTE(review): a bare expression is only echoed by Jupyter when it is the
# last statement of a cell, so this line has no visible effect here —
# confirm whether a print was intended.
metrics
plot_confusion(y, clusters, out_path=None, show=True)
save_metrics(metrics, DEFAULT_OUT_DIR)


In [None]:
### Supervised classification training code

from __future__ import annotations

import json
import warnings
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Tuple

import numpy as np
from joblib import dump
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, f1_score, roc_auc_score
from sklearn.model_selection import StratifiedKFold
from sklearn.neighbors import KNeighborsClassifier
from sklearn.neural_network import MLPClassifier
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import LabelEncoder, StandardScaler

try:
    import torch
    import torch.nn as nn
    from torch.utils.data import DataLoader, Dataset
except Exception:
    torch = None

try:
    from tqdm import tqdm
except Exception:  # pragma: no cover
    tqdm = None

import librosa

# ---------------------------------------------------------------------
# 1) Paths (keep original .npz paths)
# ---------------------------------------------------------------------
# NOTE(review): these are absolute, machine-specific paths; edit them before
# running on another machine.
# Train/validation feature archives (.npz) read by load_features.
DEFAULT_TRAIN = Path("/Users/panmingh/Code/ML_Coursework/MyCourse/results/splits/train_full.npz")
DEFAULT_VAL = Path("/Users/panmingh/Code/ML_Coursework/MyCourse/results/splits/val.npz")
# Directory that receives train_metrics.json.
DEFAULT_OUT = Path("/Users/panmingh/Code/ML_Coursework/MyCourse/results")
# File that receives the best-scoring model payload.
DEFAULT_MODEL = Path("/Users/panmingh/Code/ML_Coursework/MyCourse/models/final_model.pkl")
DEFAULT_MODEL_DIR = DEFAULT_MODEL.parent
# One output file per model family; "final" is the same file as DEFAULT_MODEL.
DEFAULT_MODEL_PATHS = {
    "knn": DEFAULT_MODEL_DIR / "knn_model.pkl",
    "rf": DEFAULT_MODEL_DIR / "rf_model.pkl",
    "mlp": DEFAULT_MODEL_DIR / "mlp_model.pkl",
    "cnn": DEFAULT_MODEL_DIR / "cnn_model.pkl",
    "final": DEFAULT_MODEL,
}


# ---------------------------------------------------------------------
# 2) Config
# ---------------------------------------------------------------------
@dataclass
class TrainConfig:
    """Settings for the cross-validated hyper-parameter search in ``cv_score``."""

    # Number of folds handed to StratifiedKFold.
    n_splits: int = 5
    # Seed handed to StratifiedKFold as random_state.
    random_seed: int = 42


@dataclass
class MLPStableConfig:
    """Hyper-parameters forwarded to ``MLPClassifier`` by the MLP training code."""

    # Passed as hidden_layer_sizes.
    hidden_layer_sizes: Tuple[int, int] = (128, 64)
    # Passed as alpha.
    alpha: float = 1e-3
    # Passed as learning_rate_init.
    lr: float = 1e-3
    # Passed as max_iter.
    epochs: int = 200


# ---------------------------------------------------------------------
# 3) Loading + logging
# ---------------------------------------------------------------------
def load_features(path: Path) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Load the feature matrix, labels and source paths from a ``.npz`` split.

    Args:
        path: Archive holding the arrays ``X``, ``labels`` and ``paths``.

    Returns:
        ``(X, y, paths)`` with ``X`` cast to float32 and the other two arrays
        cast to ``str``.

    Raises:
        KeyError: If one of the expected arrays is missing from the archive.
    """
    # NOTE(review): allow_pickle=True unpickles object arrays, which can run
    # arbitrary code; only load archives produced by this project.
    # The context manager closes the archive's file handle once the arrays
    # have been copied into memory.
    with np.load(path, allow_pickle=True) as data:
        X = data["X"].astype(np.float32)
        y = data["labels"].astype(str)
        paths = data["paths"].astype(str)
    return X, y, paths


def log_feature_stats(name: str, X: np.ndarray) -> None:
    """Print the shape, share of finite entries and NaN-ignoring min/max of ``X``."""
    share_finite = float(np.isfinite(X).mean())
    lowest = np.nanmin(X)
    highest = np.nanmax(X)
    summary = (
        f"[{name}] shape={X.shape} finite={share_finite:.3f} "
        f"min={lowest:.4f} max={highest:.4f}"
    )
    print(summary)


def log_label_stats(name: str, y: np.ndarray) -> None:
    """Print the sample count, class count and per-class frequencies of ``y``."""
    values, freqs = np.unique(y, return_counts=True)
    pairs = (f"{value}:{freq}" for value, freq in zip(values, freqs))
    dist = ", ".join(pairs)
    print(f"[{name}] samples={len(y)} classes={len(values)} dist={{ {dist} }}")


# ---------------------------------------------------------------------
# 4) Preprocessing helpers
# ---------------------------------------------------------------------
def sanitize_features(
    X_train: np.ndarray, X_val: np.ndarray
) -> Tuple[np.ndarray, np.ndarray]:
    """Replace non-finite values with 0 and clip both splits to training percentiles.

    The per-column 1st and 99th percentiles are taken from the cleaned
    training matrix only and then applied to both matrices.
    """

    def _finite(arr: np.ndarray) -> np.ndarray:
        return np.nan_to_num(arr, nan=0.0, posinf=0.0, neginf=0.0)

    train_clean = _finite(X_train)
    val_clean = _finite(X_val)
    lower = np.percentile(train_clean, 1, axis=0)
    upper = np.percentile(train_clean, 99, axis=0)
    return np.clip(train_clean, lower, upper), np.clip(val_clean, lower, upper)


def compute_clip_bounds(
    X: np.ndarray, low_q: float = 1.0, high_q: float = 99.0
) -> Tuple[np.ndarray, np.ndarray]:
    """Return the per-column ``low_q`` and ``high_q`` percentiles of ``X``."""
    lower, upper = (np.percentile(X, q, axis=0) for q in (low_q, high_q))
    return lower, upper


def apply_clip(X: np.ndarray, low: np.ndarray, high: np.ndarray) -> np.ndarray:
    """Clip ``X`` element-wise to the interval given by ``low`` and ``high``."""
    bounded = np.clip(X, low, high)
    return bounded


def preprocess_mlp(
    X_train: np.ndarray, X_val: np.ndarray
) -> Tuple[np.ndarray, np.ndarray, StandardScaler, Tuple[np.ndarray, np.ndarray]]:
    """Prepare both splits for the MLP: clean, clip, standardize, clip again.

    Non-finite values become 0 and the data is cast to float64. Percentile
    bounds (1st/99th) and the scaler are fitted on the training split only and
    applied to both splits; standardized values are finally limited to
    [-3, 3].

    Returns:
        ``(X_train, X_val, scaler, (low, high))`` so the same transform can be
        reproduced later.
    """

    def _as_finite64(arr: np.ndarray) -> np.ndarray:
        return np.nan_to_num(arr, nan=0.0, posinf=0.0, neginf=0.0).astype(np.float64)

    train64 = _as_finite64(X_train)
    val64 = _as_finite64(X_val)

    low, high = compute_clip_bounds(train64, low_q=1.0, high_q=99.0)
    train64 = apply_clip(train64, low, high)
    val64 = apply_clip(val64, low, high)

    scaler = StandardScaler()
    train_std = scaler.fit_transform(train64)
    val_std = scaler.transform(val64)

    z_limit = 3.0
    train_out = np.clip(train_std, -z_limit, z_limit)
    val_out = np.clip(val_std, -z_limit, z_limit)
    return train_out, val_out, scaler, (low, high)


# ---------------------------------------------------------------------
# 5) Metrics
# ---------------------------------------------------------------------
def macro_auc(y_true: np.ndarray, y_proba: np.ndarray, classes: List[int]) -> float:
    """Return the macro-averaged one-vs-rest ROC AUC, or NaN if it cannot be computed.

    Any exception raised while scoring is swallowed on purpose so that a
    single unusable evaluation does not abort the surrounding run.
    """
    auc_options = {"multi_class": "ovr", "average": "macro", "labels": classes}
    try:
        return float(roc_auc_score(y_true, y_proba, **auc_options))
    except Exception:
        return float("nan")


def evaluate_metrics(
    y_true: np.ndarray, y_pred: np.ndarray, y_proba: np.ndarray
) -> Dict[str, float]:
    """Compute macro AUC, accuracy and macro F1 for one set of predictions.

    The class list handed to ``macro_auc`` is the sorted set of labels that
    occur in ``y_true``.
    """
    present = sorted(np.unique(y_true).tolist())
    scores: Dict[str, float] = {}
    scores["macro_auc"] = macro_auc(y_true, y_proba, present)
    scores["accuracy"] = float(accuracy_score(y_true, y_pred))
    scores["macro_f1"] = float(f1_score(y_true, y_pred, average="macro"))
    return scores


# ---------------------------------------------------------------------
# 6) CV search
# ---------------------------------------------------------------------
def cv_score(
    build_model: Any,
    param_grid: List[Dict[str, Any]],
    X: np.ndarray,
    y: np.ndarray,
    cfg: TrainConfig,
) -> Tuple[Dict[str, Any], float, int]:
    """Pick the parameter set with the best mean cross-validated macro AUC.

    Args:
        build_model: Callable that turns one parameter dict into an unfitted
            estimator exposing ``fit`` and ``predict_proba``.
        param_grid: Candidate parameter dicts, tried in order.
        X: Feature matrix.
        y: Target vector used for stratification and scoring.
        cfg: Supplies the fold count and the shuffle seed.

    Returns:
        ``(best_params, best_score, failed_folds)``. ``best_params`` is ``{}``
        and ``best_score`` is ``-inf`` when no candidate produced a usable
        score. ``failed_folds`` counts every fold that contributed no score,
        whether fitting raised ``ValueError`` or the AUC came back as NaN.
    """
    skf = StratifiedKFold(n_splits=cfg.n_splits, shuffle=True, random_state=cfg.random_seed)
    # The class list depends on neither the fold nor the candidate: compute once.
    classes = sorted(np.unique(y).tolist())
    best_params: Dict[str, Any] = {}
    best_score = -np.inf
    failed_folds = 0
    iterator = tqdm(param_grid, desc="CV params", unit="cfg") if tqdm else param_grid
    for params in iterator:
        scores = []
        for train_idx, val_idx in skf.split(X, y):
            model = build_model(params)
            try:
                model.fit(X[train_idx], y[train_idx])
                y_proba = model.predict_proba(X[val_idx])
            except ValueError:
                failed_folds += 1
                continue
            score = macro_auc(y[val_idx], y_proba, classes)
            if np.isnan(score):
                # macro_auc signals "could not score" with NaN; report the
                # fold as failed instead of dropping it silently.
                failed_folds += 1
                continue
            scores.append(score)
        mean_score = float(np.mean(scores)) if scores else float("-inf")
        if mean_score > best_score:
            best_score = mean_score
            best_params = params
    return best_params, best_score, failed_folds


def build_knn(params: Dict[str, Any]) -> Pipeline:
    """Build an unfitted scaler + k-nearest-neighbours pipeline from ``params``."""
    steps = [
        ("scaler", StandardScaler()),
        ("clf", KNeighborsClassifier(**params)),
    ]
    return Pipeline(steps)


def build_rf(params: Dict[str, Any]) -> RandomForestClassifier:
    """Build an unfitted random forest from ``params`` with a fixed seed of 42."""
    forest = RandomForestClassifier(random_state=42, **params)
    return forest


def train_mlp_stable(
    X_train: np.ndarray,
    y_train: np.ndarray,
    X_val: np.ndarray,
    y_val: np.ndarray,
    cfg: MLPStableConfig,
) -> Tuple[MLPClassifier, Dict[str, float]]:
    """Fit an ``MLPClassifier`` and score it on the validation split.

    Args:
        X_train: Training features (already preprocessed by the caller).
        y_train: Training targets.
        X_val: Validation features, preprocessed the same way.
        y_val: Validation targets.
        cfg: Architecture and optimizer settings.

    Returns:
        ``(model, metrics)`` where ``metrics`` comes from ``evaluate_metrics``.
    """
    model = MLPClassifier(
        hidden_layer_sizes=cfg.hidden_layer_sizes,
        solver="adam",
        activation="relu",
        learning_rate_init=cfg.lr,
        learning_rate="adaptive",
        alpha=cfg.alpha,
        max_iter=cfg.epochs,
        early_stopping=True,
        validation_fraction=0.1,
        n_iter_no_change=10,
        random_state=42,
    )
    model.fit(X_train, y_train)
    y_proba = model.predict_proba(X_val)
    # predict_proba columns follow model.classes_; map the winning column back
    # to its label instead of assuming the labels are exactly 0..k-1.
    y_pred = model.classes_[np.argmax(y_proba, axis=1)]
    metrics = evaluate_metrics(y_val, y_pred, y_proba)
    return model, metrics


# ---------------------------------------------------------------------
# 7) CNN utilities (optional)
# ---------------------------------------------------------------------
# The torch import above only sets ``torch = None`` on failure, so ``Dataset``
# may be undefined. The conditional base keeps this optional class definable
# without torch; it is only instantiated from train_cnn, which checks torch.
class MFCCDataset(Dataset if torch is not None else object):
    """Dataset yielding ``(mfcc, encoded_label)`` pairs computed from audio files.

    Labels are encoded once in the constructor; audio is loaded and converted
    to MFCCs on every item access.
    """

    def __init__(
        self,
        paths: List[str],
        labels: np.ndarray,
        label_encoder: LabelEncoder,
        sr: int,
        n_mfcc: int,
    ):
        """Store the file list and encode ``labels`` with ``label_encoder``.

        Args:
            paths: Audio file paths, one per sample.
            labels: Raw labels aligned with ``paths``.
            label_encoder: Fitted encoder used to map labels to integers.
            sr: Sampling rate passed to ``librosa.load``.
            n_mfcc: Number of MFCC coefficients to extract.
        """
        self.paths = paths
        self.labels = label_encoder.transform(labels)
        self.sr = sr
        self.n_mfcc = n_mfcc

    def __len__(self) -> int:
        """Return the number of audio files."""
        return len(self.paths)

    def __getitem__(self, idx: int) -> Tuple[np.ndarray, int]:
        """Load file ``idx`` and return its float32 MFCC matrix and integer label."""
        path = self.paths[idx]
        y, _ = librosa.load(path, sr=self.sr, mono=True)
        mfcc = librosa.feature.mfcc(y=y, sr=self.sr, n_mfcc=self.n_mfcc)
        return mfcc.astype(np.float32), int(self.labels[idx])


# ``nn`` is undefined when the optional torch import failed. The conditional
# base lets the notebook cell still run without torch; the class is only
# instantiated from train_cnn, which checks for torch first.
class SimpleCNN(nn.Module if torch is not None else object):
    """Two-layer 1-D convolutional classifier over MFCC sequences.

    Input is ``(batch, n_mfcc, time)``; the time axis is collapsed by an
    adaptive max pool, so sequences of different lengths are accepted.
    """

    def __init__(self, n_mfcc: int, n_classes: int):
        """Build the convolutional stack and the final linear layer.

        Args:
            n_mfcc: Number of input channels (MFCC coefficients).
            n_classes: Number of output logits.
        """
        super().__init__()
        self.net = nn.Sequential(
            nn.Conv1d(n_mfcc, 32, kernel_size=5, padding=2),
            nn.BatchNorm1d(32),
            nn.ReLU(),
            nn.MaxPool1d(2),
            nn.Conv1d(32, 64, kernel_size=5, padding=2),
            nn.BatchNorm1d(64),
            nn.ReLU(),
            nn.AdaptiveMaxPool1d(1),
        )
        self.fc = nn.Linear(64, n_classes)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return class logits of shape ``(batch, n_classes)``."""
        x = self.net(x)
        # Drop the length-1 time axis left by the adaptive pool.
        x = x.squeeze(-1)
        return self.fc(x)


def collate_pad(batch: List[Tuple[np.ndarray, int]]) -> Tuple[torch.Tensor, torch.Tensor]:
    """Zero-pad a batch of MFCC matrices to a common length and stack them.

    Returns a float32 tensor of shape ``(batch, n_mfcc, max_len)`` and an
    int64 tensor holding the labels.
    """
    longest = max(item[0].shape[1] for item in batch)
    n_coeffs = batch[0][0].shape[0]
    padded = np.zeros((len(batch), n_coeffs, longest), dtype=np.float32)
    labels = np.zeros(len(batch), dtype=np.int64)
    for row, (features, target) in enumerate(batch):
        width = features.shape[1]
        padded[row, :, :width] = features
        labels[row] = target
    return torch.from_numpy(padded), torch.from_numpy(labels)


def train_cnn(
    train_paths: List[str],
    train_labels: np.ndarray,
    val_paths: List[str],
    val_labels: np.ndarray,
    label_encoder: LabelEncoder,
    epochs: int = 20,
    batch_size: int = 16,
    lr: float = 1e-3,
    device: str = "cpu",
    n_mfcc: int = 13,
    sr: int = 22050,
) -> Tuple[Dict[str, float], Dict[str, Any]]:
    """Train ``SimpleCNN`` on MFCCs of the given audio files and score it.

    Args:
        train_paths: Audio files used for training.
        train_labels: Raw labels aligned with ``train_paths``.
        val_paths: Audio files used for validation.
        val_labels: Raw labels aligned with ``val_paths``.
        label_encoder: Fitted encoder mapping raw labels to integers.
        epochs: Number of passes over the training set.
        batch_size: Batch size for both loaders.
        lr: Adam learning rate.
        device: Torch device string the model and batches are moved to.
        n_mfcc: Number of MFCC coefficients (model input channels).
        sr: Sampling rate used when loading audio.

    Returns:
        ``(metrics, artifacts)`` where ``metrics`` comes from
        ``evaluate_metrics`` on the validation set and ``artifacts`` holds the
        trained model and the label encoder.

    Raises:
        RuntimeError: If PyTorch is not installed.
    """
    if torch is None:
        raise RuntimeError("PyTorch is required for CNN training.")

    n_classes = len(label_encoder.classes_)
    train_ds = MFCCDataset(train_paths, train_labels, label_encoder, sr=sr, n_mfcc=n_mfcc)
    val_ds = MFCCDataset(val_paths, val_labels, label_encoder, sr=sr, n_mfcc=n_mfcc)
    train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True, collate_fn=collate_pad)
    val_loader = DataLoader(val_ds, batch_size=batch_size, shuffle=False, collate_fn=collate_pad)

    model = SimpleCNN(n_mfcc=n_mfcc, n_classes=n_classes).to(device)
    optim = torch.optim.Adam(model.parameters(), lr=lr)
    criterion = nn.CrossEntropyLoss()

    for epoch in range(1, epochs + 1):
        model.train()
        batch_iter = train_loader
        if tqdm:
            # Progress bar per epoch; removed once the epoch finishes.
            batch_iter = tqdm(
                train_loader,
                desc=f"CNN train {epoch}/{epochs}",
                unit="batch",
                leave=False,
            )
        for xb, yb in batch_iter:
            xb = xb.to(device)
            yb = yb.to(device)
            optim.zero_grad()
            logits = model(xb)
            loss = criterion(logits, yb)
            loss.backward()
            optim.step()

    model.eval()
    all_probs = []
    all_true = []
    with torch.no_grad():
        for xb, yb in val_loader:
            xb = xb.to(device)
            logits = model(xb)
            probs = torch.softmax(logits, dim=1).cpu().numpy()
            all_probs.append(probs)
            # Labels never leave the CPU, so they convert directly.
            all_true.append(yb.numpy())

    y_true = np.concatenate(all_true)
    y_proba = np.vstack(all_probs)
    y_pred = np.argmax(y_proba, axis=1)
    metrics = evaluate_metrics(y_true, y_pred, y_proba)
    return metrics, {"model": model, "label_encoder": label_encoder}


def filter_existing_audio(paths: np.ndarray, labels: np.ndarray) -> Tuple[List[str], np.ndarray]:
    """Keep only the entries whose path is a real file on disk.

    Paths containing ``"::"`` are dropped without touching the filesystem.
    Returns the surviving paths as a list and their labels as an array.
    """
    kept = [
        (candidate, label)
        for candidate, label in zip(paths, labels)
        if "::" not in candidate and Path(candidate).is_file()
    ]
    kept_paths: List[str] = [candidate for candidate, _ in kept]
    kept_labels: List[str] = [label for _, label in kept]
    return kept_paths, np.asarray(kept_labels)


def save_payload(payload: Dict[str, Any], path: Path) -> None:
    """Serialize ``payload`` to ``path`` with joblib, creating parent folders first."""
    folder = path.parent
    folder.mkdir(parents=True, exist_ok=True)
    dump(payload, path)


def retrain_and_save_sklearn_models(
    results: Dict[str, Any],
    X_train: np.ndarray,
    X_val: np.ndarray,
    y_train_enc: np.ndarray,
    y_val_enc: np.ndarray,
    label_encoder: LabelEncoder,
    mlp_cfg: MLPStableConfig,
    model_paths: Dict[str, Path] | None = None,
) -> Dict[str, Path]:
    """Refit KNN, RF and MLP on train+validation data and save each to disk.

    A model is refitted only when ``results`` has its entry: ``"knn"`` and
    ``"rf"`` need a ``"best_params"`` key, ``"mlp"`` needs a ``"config"`` key.
    The MLP payload additionally stores the fitted scaler and clip bounds.

    Returns:
        Mapping from model name to the file it was written to.
    """
    targets = model_paths or DEFAULT_MODEL_PATHS
    written: Dict[str, Path] = {}
    X_full = np.vstack([X_train, X_val])
    y_full = np.concatenate([y_train_enc, y_val_enc])

    # KNN and RF share the same refit-and-save recipe.
    for key, builder in (("knn", build_knn), ("rf", build_rf)):
        if key not in results or "best_params" not in results[key]:
            continue
        estimator = builder(results[key]["best_params"])
        estimator.fit(X_full, y_full)
        save_payload({"model": estimator, "label_encoder": label_encoder}, targets[key])
        written[key] = targets[key]

    if "mlp" in results and "config" in results["mlp"]:
        # Preprocessing is refitted on the combined data; the second output is unused.
        X_full_mlp, _, mlp_scaler, clip_bounds = preprocess_mlp(X_full, X_full)
        mlp_options = {
            "hidden_layer_sizes": mlp_cfg.hidden_layer_sizes,
            "solver": "adam",
            "activation": "relu",
            "learning_rate_init": mlp_cfg.lr,
            "learning_rate": "adaptive",
            "alpha": mlp_cfg.alpha,
            "max_iter": mlp_cfg.epochs,
            "early_stopping": True,
            "validation_fraction": 0.1,
            "n_iter_no_change": 10,
            "random_state": 42,
        }
        mlp = MLPClassifier(**mlp_options)
        mlp.fit(X_full_mlp, y_full)
        save_payload(
            {
                "model": mlp,
                "label_encoder": label_encoder,
                "scaler": mlp_scaler,
                "clip_bounds": clip_bounds,
            },
            targets["mlp"],
        )
        written["mlp"] = targets["mlp"]

    return written


def save_cnn_artifacts(
    cnn_artifacts: Dict[str, Any] | None, model_paths: Dict[str, Path] | None = None
) -> Path | None:
    """Save the CNN payload under the ``"cnn"`` path and return that path.

    Returns ``None`` without writing anything when there are no artifacts.
    """
    if cnn_artifacts is not None:
        destination = (model_paths or DEFAULT_MODEL_PATHS)["cnn"]
        save_payload(cnn_artifacts, destination)
        return destination
    return None

In [None]:
# Training execution code
# ---------------------------------------------------------------------
# 8) Example "notebook flow" (copy into cells if desired)
# ---------------------------------------------------------------------
# Load both splits, clean/clip them with training percentiles, and log a summary.
X_train, y_train, train_paths = load_features(DEFAULT_TRAIN)
X_val, y_val, val_paths = load_features(DEFAULT_VAL)
X_train, X_val = sanitize_features(X_train, X_val)
log_feature_stats("train", X_train)
log_feature_stats("val", X_val)
log_label_stats("train", y_train)
log_label_stats("val", y_val)
print(f"[paths] train_paths={len(train_paths)} val_paths={len(val_paths)}")

# Fit the encoder on the labels of both splits so each split can be transformed.
le = LabelEncoder()
le.fit(np.concatenate([y_train, y_val]))
y_train_enc = le.transform(y_train)
y_val_enc = le.transform(y_val)

cfg = TrainConfig()
# Collects per-model parameters, scores and errors; dumped to JSON later.
results: Dict[str, Any] = {}

# Candidate hyper-parameters: 8 KNN settings and 6 random-forest settings.
knn_grid = [
    {"n_neighbors": k, "weights": w}
    for k in [3, 5, 7, 9]
    for w in ["uniform", "distance"]
]
rf_grid = [
    {"n_estimators": n, "max_depth": d}
    for n in [100, 200]
    for d in [5, 10, None]
]
# NOTE(review): these filters are process-wide and stay active for later cells.
warnings.filterwarnings("ignore", category=RuntimeWarning)
warnings.filterwarnings("ignore", category=UserWarning)

# For each classical model: search the grid with CV on the training split,
# then refit the best setting on all of train and score it on validation.
for name, builder, grid in [
    ("knn", build_knn, knn_grid),
    ("rf", build_rf, rf_grid),
]:
    if tqdm is None:
        print(f"Training {name}...")
    print(f"[{name}] cv_folds={cfg.n_splits} grid_size={len(grid)}")
    best_params, cv_auc, failed_folds = cv_score(builder, grid, X_train, y_train_enc, cfg)
    try:
        model = builder(best_params)
        model.fit(X_train, y_train_enc)
        y_proba = model.predict_proba(X_val)
        # assumes the probability columns line up with encoded labels 0..k-1 — TODO confirm
        y_pred = np.argmax(y_proba, axis=1)
        metrics = evaluate_metrics(y_val_enc, y_pred, y_proba)
        results[name] = {"best_params": best_params, "cv_macro_auc": cv_auc, "val": metrics}
        print(
            f"[{name}] cv_macro_auc={cv_auc:.4f} "
            f"val_auc={metrics['macro_auc']:.4f} "
            f"val_acc={metrics['accuracy']:.4f} "
            f"val_f1={metrics['macro_f1']:.4f} "
            f"failed_folds={failed_folds}"
        )
    except ValueError as exc:
        # Keep the CV outcome and record the failure instead of aborting the run.
        results[name] = {
            "best_params": best_params,
            "cv_macro_auc": cv_auc,
            "error": str(exc),
        }
        print(f"[{name}] failed after CV: {exc} failed_folds={failed_folds}")

# The MLP is trained once with a fixed configuration (no grid search) on
# features that are additionally standardized and clipped.
mlp_cfg = MLPStableConfig()
X_train_mlp, X_val_mlp, mlp_scaler, clip_bounds = preprocess_mlp(X_train, X_val)
try:
    mlp_model, mlp_metrics = train_mlp_stable(
        X_train_mlp, y_train_enc, X_val_mlp, y_val_enc, mlp_cfg
    )
    results["mlp"] = {
        "config": mlp_cfg.__dict__,
        "val": mlp_metrics,
    }
    print(
        f"[mlp] val_auc={mlp_metrics['macro_auc']:.4f} "
        f"val_acc={mlp_metrics['accuracy']:.4f} "
        f"val_f1={mlp_metrics['macro_f1']:.4f}"
    )
except ValueError as exc:
    results["mlp"] = {"config": mlp_cfg.__dict__, "error": str(exc)}
    print(f"[mlp] failed: {exc}")

# Optional CNN (requires torch)
# NOTE(review): when torch is unavailable no "cnn" entry is recorded at all,
# and cnn_artifacts stays undefined unless training actually ran.
include_cnn = True
if include_cnn and torch is not None:
    # The CNN works on raw audio, so only samples whose file exists are used.
    train_audio, y_train_audio = filter_existing_audio(train_paths, y_train)
    val_audio, y_val_audio = filter_existing_audio(val_paths, y_val)
    if train_audio and val_audio:
        cnn_metrics, cnn_artifacts = train_cnn(
            train_audio,
            y_train_audio,
            val_audio,
            y_val_audio,
            le,
            epochs=20,
        )
        results["cnn"] = {"val": cnn_metrics}
    else:
        results["cnn"] = {"error": "No valid audio paths for CNN."}

def _score(name: str) -> float:
    """Return the validation macro AUC recorded for ``name`` in ``results``.

    Missing entries and NaN scores both map to ``-inf`` so they rank last.
    """
    entry = results.get(name, {})
    val_metrics = entry.get("val", {})
    auc_value = val_metrics.get("macro_auc", float("-inf"))
    if np.isnan(auc_value):
        return float("-inf")
    return auc_value

# Pick the model with the highest validation macro AUC; entries without a
# usable score rank last (see _score).
best_name = max(results, key=_score)
results["best_model"] = best_name

DEFAULT_OUT.mkdir(parents=True, exist_ok=True)
with (DEFAULT_OUT / "train_metrics.json").open("w", encoding="utf-8") as f:
    json.dump(results, f, ensure_ascii=False, indent=2)

if best_name == "cnn":
    # The CNN is not refitted; its trained artifacts are saved as they are.
    payload = cnn_artifacts
else:
    # Retrain best sklearn model on Train+Val
    X_full = np.vstack([X_train, X_val])
    y_full = np.concatenate([y_train_enc, y_val_enc])
    if best_name == "knn":
        final_model = build_knn(results["knn"]["best_params"])
        final_model.fit(X_full, y_full)
        payload = {"model": final_model, "label_encoder": le}
    elif best_name == "rf":
        final_model = build_rf(results["rf"]["best_params"])
        final_model.fit(X_full, y_full)
        payload = {"model": final_model, "label_encoder": le}
    else:
        # MLP: refit the preprocessing on the combined data and keep it in the
        # payload so the same transform can be reproduced at inference time.
        X_full_mlp, _, mlp_scaler, clip_bounds = preprocess_mlp(X_full, X_full)
        final_model = MLPClassifier(
            hidden_layer_sizes=mlp_cfg.hidden_layer_sizes,
            solver="adam",
            activation="relu",
            learning_rate_init=mlp_cfg.lr,
            learning_rate="adaptive",
            alpha=mlp_cfg.alpha,
            max_iter=mlp_cfg.epochs,
            early_stopping=True,
            validation_fraction=0.1,
            n_iter_no_change=10,
            random_state=42,
        )
        final_model.fit(X_full_mlp, y_full)
        payload = {
            "model": final_model,
            "label_encoder": le,
            "scaler": mlp_scaler,
            "clip_bounds": clip_bounds,
        }

# Write the winner in every case. Previously this ran only in the sklearn
# branch, so a winning CNN left DEFAULT_MODEL missing or stale.
DEFAULT_MODEL.parent.mkdir(parents=True, exist_ok=True)
dump(payload, DEFAULT_MODEL)

# Save all sklearn models (KNN/RF/MLP) for multi-model evaluation
saved_paths = retrain_and_save_sklearn_models(
    results,
    X_train,
    X_val,
    y_train_enc,
    y_val_enc,
    le,
    mlp_cfg,
    model_paths=DEFAULT_MODEL_PATHS,
)
print(f"saved sklearn models: {saved_paths}")

# Save CNN if trained
if "cnn" in results and "val" in results["cnn"]:
    cnn_path = save_cnn_artifacts(cnn_artifacts, model_paths=DEFAULT_MODEL_PATHS)
    print(f"saved cnn model: {cnn_path}")

[train] shape=(2252, 78) finite=1.000 min=-741.9331 max=258.3984
[val] shape=(115, 78) finite=1.000 min=-715.3228 max=258.3984
[train] samples=2252 classes=8 dist={ Feeling:276, Friend:292, Happy:284, Married:288, Necessities:284, NewYork:272, RememberMe:284, TryEverything:272 }
[val] samples=115 classes=8 dist={ Feeling:14, Friend:15, Happy:14, Married:12, Necessities:14, NewYork:16, RememberMe:15, TryEverything:15 }
[paths] train_paths=2252 val_paths=115
[knn] cv_folds=5 grid_size=8


CV params: 100%|██████████| 8/8 [00:00<00:00, 36.92cfg/s]


[knn] cv_macro_auc=0.9430 val_auc=0.5652 val_acc=0.2000 val_f1=0.1948 failed_folds=0
[rf] cv_folds=5 grid_size=6


CV params: 100%|██████████| 6/6 [00:19<00:00,  3.21s/cfg]


[rf] cv_macro_auc=0.9775 val_auc=0.7795 val_acc=0.4000 val_f1=0.3937 failed_folds=0
[mlp] val_auc=0.7301 val_acc=0.3217 val_f1=0.3163


                                                                   

saved sklearn models: {'knn': PosixPath('/Users/panmingh/Code/ML_Coursework/MyCourse/models/knn_model.pkl'), 'rf': PosixPath('/Users/panmingh/Code/ML_Coursework/MyCourse/models/rf_model.pkl'), 'mlp': PosixPath('/Users/panmingh/Code/ML_Coursework/MyCourse/models/mlp_model.pkl')}
saved cnn model: /Users/panmingh/Code/ML_Coursework/MyCourse/models/cnn_model.pkl


In [None]:
# Evaluation code

from __future__ import annotations

import json
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Tuple

import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
from sklearn.metrics import (
    ConfusionMatrixDisplay,
    accuracy_score,
    auc,
    confusion_matrix,
    f1_score,
    roc_auc_score,
    roc_curve,
)

try:
    from joblib import load
except Exception:  # pragma: no cover
    load = None

try:
    import torch
    import torch.nn as nn
    from torch.utils.data import DataLoader, Dataset
except Exception:  # pragma: no cover
    torch = None
    nn = None
    DataLoader = None
    Dataset = object

try:
    import librosa
except Exception:  # pragma: no cover
    librosa = None

# ---------------------------------------------------------------------
# 1) Paths (keep original .npz/model paths)
# ---------------------------------------------------------------------
# NOTE(review): absolute, machine-specific paths; edit before running elsewhere.
# Test split archive (.npz) used as the default input of evaluate_models.
DEFAULT_TEST = Path("/Users/panmingh/Code/ML_Coursework/MyCourse/results/splits/test.npz")
DEFAULT_MODEL = Path("/Users/panmingh/Code/ML_Coursework/MyCourse/models/final_model.pkl")
DEFAULT_OUT = Path("/Users/panmingh/Code/ML_Coursework/MyCourse/results")
# Name -> model file mapping; only the final model is listed by default.
DEFAULT_MODELS = {"final": DEFAULT_MODEL}
# Column order used by print_metrics_table.
METRIC_ORDER = ("accuracy", "macro_f1", "macro_auc")


# ---------------------------------------------------------------------
# 2) Config
# ---------------------------------------------------------------------
@dataclass
class EvalConfig:
    """Evaluation settings."""

    # Output directory; presumably where evaluation artifacts are written —
    # confirm against the callers.
    out_dir: Path = DEFAULT_OUT


# ---------------------------------------------------------------------
# 3) Loading + preprocessing
# ---------------------------------------------------------------------
def load_features(path: Path) -> Tuple[np.ndarray, np.ndarray]:
    """Load the feature matrix and labels from a ``.npz`` split.

    Args:
        path: Archive holding the arrays ``X`` and ``labels``.

    Returns:
        ``(X, y)`` with ``X`` cast to float32 and ``y`` cast to ``str``.

    Raises:
        KeyError: If one of the expected arrays is missing from the archive.
    """
    # NOTE(review): allow_pickle=True unpickles object arrays, which can run
    # arbitrary code; only load archives produced by this project.
    # The context manager closes the archive's file handle once the arrays
    # have been copied into memory.
    with np.load(path, allow_pickle=True) as data:
        X = data["X"].astype(np.float32)
        y = data["labels"].astype(str)
    return X, y


def load_features_with_paths(path: Path) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Load the feature matrix, labels and source paths from a ``.npz`` split.

    Args:
        path: Archive holding the arrays ``X``, ``labels`` and ``paths``.

    Returns:
        ``(X, y, paths)`` with ``X`` cast to float32 and the other two arrays
        cast to ``str``.

    Raises:
        KeyError: If one of the expected arrays is missing from the archive.
    """
    # NOTE(review): allow_pickle=True unpickles object arrays, which can run
    # arbitrary code; only load archives produced by this project.
    # The context manager closes the archive's file handle once the arrays
    # have been copied into memory.
    with np.load(path, allow_pickle=True) as data:
        X = data["X"].astype(np.float32)
        y = data["labels"].astype(str)
        paths = data["paths"].astype(str)
    return X, y, paths


def sanitize_features(
    X: np.ndarray,
    clip_bounds: Tuple[np.ndarray, np.ndarray] | None = None,
) -> np.ndarray:
    """Replace non-finite values with 0 and clip every column.

    Args:
        X: Feature matrix.
        clip_bounds: Optional ``(low, high)`` per-column bounds, e.g. the ones
            computed on the training split. When omitted, the 1st and 99th
            percentiles of ``X`` itself are used.

    Returns:
        The cleaned and clipped matrix.
    """
    X = np.nan_to_num(X, nan=0.0, posinf=0.0, neginf=0.0)
    if clip_bounds is None:
        # NOTE(review): the training cell clips with percentiles taken from
        # the training split; bounds derived from the evaluated data differ
        # from that. Pass clip_bounds to reproduce the training transform.
        low = np.percentile(X, 1, axis=0)
        high = np.percentile(X, 99, axis=0)
    else:
        low, high = clip_bounds
    return np.clip(X, low, high)


def load_model(path: Path):
    """Load a saved model and, when present, its label encoder.

    Dict payloads with a ``"model"`` key yield ``(model, label_encoder)``,
    where the encoder may be ``None``; any other keys of the payload are not
    returned. Every other payload is returned as ``(payload, None)``.

    Raises:
        RuntimeError: If joblib could not be imported.
    """
    if load is None:
        raise RuntimeError("joblib is required to load the model.")
    payload = load(path)
    wrapped = isinstance(payload, dict) and "model" in payload
    if not wrapped:
        return payload, None
    return payload["model"], payload.get("label_encoder")


# ---------------------------------------------------------------------
# 4) Metrics + plots (show inline + optional save)
# ---------------------------------------------------------------------
def compute_metrics(
    y_true: np.ndarray, y_pred: np.ndarray, y_proba: np.ndarray
) -> Dict[str, float]:
    """Compute accuracy, macro F1 and macro one-vs-rest AUC.

    The AUC uses the sorted labels found in ``y_true`` and falls back to NaN
    when scoring raises for any reason.
    """
    present = sorted(np.unique(y_true).tolist())
    accuracy = float(accuracy_score(y_true, y_pred))
    macro_f1 = float(f1_score(y_true, y_pred, average="macro"))
    try:
        macro_auc_value = float(
            roc_auc_score(
                y_true,
                y_proba,
                multi_class="ovr",
                average="macro",
                labels=present,
            )
        )
    except Exception:
        macro_auc_value = float("nan")
    return {"accuracy": accuracy, "macro_f1": macro_f1, "macro_auc": macro_auc_value}


def _finish_plot(fig: plt.Figure, out_path: Path | None, show: bool) -> None:
    """Lay out ``fig``, optionally save and show it, then close it.

    Args:
        fig: Figure to finalize.
        out_path: Destination image file, or ``None`` to skip saving.
        show: Whether to call ``plt.show()`` before the figure is closed.
    """
    fig.tight_layout()
    if out_path is not None:
        # Create the target folder first so saving into a fresh results
        # directory does not fail with FileNotFoundError.
        Path(out_path).parent.mkdir(parents=True, exist_ok=True)
        fig.savefig(out_path, dpi=150)
    if show:
        plt.show()
    # Close in every case so repeated calls do not accumulate open figures.
    plt.close(fig)


def plot_confusion_matrix(
    y_true: np.ndarray,
    y_pred: np.ndarray,
    out_path: Path | None = None,
    show: bool = True,
    title_suffix: str | None = None,
) -> None:
    """Plot the raw-count confusion matrix over the labels found in ``y_true``.

    ``title_suffix``, when given, is appended to the plot title.
    """
    classes = sorted(np.unique(y_true).tolist())
    counts = confusion_matrix(y_true, y_pred, labels=classes)
    fig, ax = plt.subplots(figsize=(8, 6))
    display = ConfusionMatrixDisplay(confusion_matrix=counts, display_labels=classes)
    display.plot(ax=ax, cmap="Blues", values_format="d", colorbar=False)
    base_title = "Confusion Matrix (Test)"
    ax.set_title(f"{base_title} - {title_suffix}" if title_suffix else base_title)
    _finish_plot(fig, out_path, show)


def plot_confusion_normalized(
    y_true: np.ndarray,
    y_pred: np.ndarray,
    out_path: Path | None = None,
    show: bool = True,
    title_suffix: str | None = None,
) -> None:
    """Plot the confusion matrix with every row scaled to sum to 1.

    Rows with no samples are divided by 1 instead of 0. ``title_suffix``,
    when given, is appended to the plot title.
    """
    classes = sorted(np.unique(y_true).tolist())
    counts = confusion_matrix(y_true, y_pred, labels=classes)
    totals = counts.sum(axis=1, keepdims=True)
    safe_totals = np.where(totals == 0, 1, totals)
    rates = counts / safe_totals
    fig, ax = plt.subplots(figsize=(8, 6))
    heatmap_options = {
        "cmap": "Blues",
        "vmin": 0.0,
        "vmax": 1.0,
        "annot": True,
        "fmt": ".2f",
        "xticklabels": classes,
        "yticklabels": classes,
        "ax": ax,
    }
    sns.heatmap(rates, **heatmap_options)
    base_title = "Confusion Matrix (Row-Normalized)"
    ax.set_title(f"{base_title} - {title_suffix}" if title_suffix else base_title)
    ax.set_xlabel("Predicted")
    ax.set_ylabel("True")
    _finish_plot(fig, out_path, show)


def plot_roc_curves(
    y_true: np.ndarray,
    y_proba: np.ndarray,
    out_path: Path | None = None,
    show: bool = True,
    title_suffix: str | None = None,
) -> None:
    """Plot one one-vs-rest ROC curve per label found in ``y_true``.

    The i-th sorted label is paired with column ``i`` of ``y_proba``.
    ``title_suffix``, when given, is appended to the plot title.
    """
    classes = sorted(np.unique(y_true).tolist())
    fig, ax = plt.subplots(figsize=(8, 6))
    for column, label in enumerate(classes):
        is_label = (y_true == label).astype(int)
        fpr, tpr, _ = roc_curve(is_label, y_proba[:, column])
        area = auc(fpr, tpr)
        ax.plot(fpr, tpr, label=f"{label} (AUC {area:.2f})")
    # Chance-level diagonal for reference.
    ax.plot([0, 1], [0, 1], "k--", linewidth=1)
    base_title = "ROC Curves (One-vs-Rest)"
    ax.set_title(f"{base_title} - {title_suffix}" if title_suffix else base_title)
    ax.set_xlabel("False Positive Rate")
    ax.set_ylabel("True Positive Rate")
    ax.legend(fontsize=8, loc="lower right")
    _finish_plot(fig, out_path, show)


def save_metrics(metrics: Dict[str, float], out_dir: Path | None = None) -> None:
    """Write ``metrics`` to ``test_metrics.json`` inside ``out_dir``.

    Nothing is written when ``out_dir`` is ``None``. The directory (and any
    missing parents) is created on demand.
    """
    if out_dir is not None:
        out_dir.mkdir(parents=True, exist_ok=True)
        target = out_dir / "test_metrics.json"
        with target.open("w", encoding="utf-8") as handle:
            json.dump(metrics, handle, ensure_ascii=False, indent=2)


def save_metrics_report(
    metrics_by_model: Dict[str, Dict[str, float]], out_dir: Path | None = None
) -> None:
    """Write the per-model metrics to ``test_metrics_all.json`` inside ``out_dir``.

    Nothing is written when ``out_dir`` is ``None``. The directory (and any
    missing parents) is created on demand.
    """
    if out_dir is not None:
        out_dir.mkdir(parents=True, exist_ok=True)
        target = out_dir / "test_metrics_all.json"
        with target.open("w", encoding="utf-8") as handle:
            json.dump(metrics_by_model, handle, ensure_ascii=False, indent=2)


def print_metrics_table(metrics_by_model: Dict[str, Dict[str, float]]) -> None:
    """Print an aligned text table with one row per model.

    Columns follow ``METRIC_ORDER`` and are limited to metrics reported by at
    least one model. Missing or non-finite values are shown as ``nan``.
    Nothing is printed for an empty mapping.
    """
    if not metrics_by_model:
        return

    columns = [
        key
        for key in METRIC_ORDER
        if any(key in per_model for per_model in metrics_by_model.values())
    ]

    def _cell(value: float) -> str:
        return f"{value:.4f}" if np.isfinite(value) else "nan"

    header = ["model", *columns]
    body = [
        [model_name, *(_cell(scores.get(key, float("nan"))) for key in columns)]
        for model_name, scores in metrics_by_model.items()
    ]
    table = [header, *body]
    widths = [max(len(line[col]) for line in table) for col in range(len(header))]

    def _render(cells: List[str]) -> str:
        return " | ".join(cell.ljust(width) for cell, width in zip(cells, widths))

    print(_render(header))
    print("-+-".join("-" * width for width in widths))
    for line in body:
        print(_render(line))


class MFCCDataset(Dataset):
    """Dataset yielding ``(mfcc, label)`` pairs computed from audio files on access."""

    def __init__(self, paths: List[str], labels: np.ndarray, sr: int, n_mfcc: int):
        """Store the file list, the aligned labels and the MFCC settings."""
        self.paths, self.labels = paths, labels
        self.sr, self.n_mfcc = sr, n_mfcc

    def __len__(self) -> int:
        """Return the number of audio files."""
        return len(self.paths)

    def __getitem__(self, idx: int) -> Tuple[np.ndarray, int]:
        """Load file ``idx`` and return its float32 MFCC matrix and integer label."""
        waveform, _ = librosa.load(self.paths[idx], sr=self.sr, mono=True)
        coeffs = librosa.feature.mfcc(y=waveform, sr=self.sr, n_mfcc=self.n_mfcc)
        features = coeffs.astype(np.float32)
        return features, int(self.labels[idx])


def collate_pad(batch: List[Tuple[np.ndarray, int]]) -> Tuple[torch.Tensor, torch.Tensor]:
    """Zero-pad MFCC matrices along time to the longest one and stack them.

    Returns a float32 tensor of shape ``(batch, n_mfcc, max_len)`` and an
    int64 tensor holding the labels.
    """
    frame_counts = [sample.shape[1] for sample, _ in batch]
    target_len = max(frame_counts)
    n_coeffs = batch[0][0].shape[0]
    stacked = np.zeros((len(batch), n_coeffs, target_len), dtype=np.float32)
    targets = np.zeros(len(batch), dtype=np.int64)
    for pos, ((sample, target), n_frames) in enumerate(zip(batch, frame_counts)):
        stacked[pos, :, :n_frames] = sample
        targets[pos] = target
    return torch.from_numpy(stacked), torch.from_numpy(targets)


def _filter_existing_audio(paths: np.ndarray, labels: np.ndarray) -> Tuple[List[str], np.ndarray]:
    out_paths: List[str] = []
    out_labels: List[str] = []
    for p, y in zip(paths, labels):
        if "::" in p:
            continue
        if Path(p).is_file():
            out_paths.append(p)
            out_labels.append(y)
    return out_paths, np.asarray(out_labels)


def is_torch_model(model: object) -> bool:
    """Return True when ``model`` is a PyTorch ``nn.Module``.

    The module-level names ``torch`` and ``nn`` may be ``None`` (presumably
    when the optional PyTorch import failed); in that case nothing can be a
    torch model and the answer is always False.
    """
    torch_available = torch is not None and nn is not None
    return torch_available and isinstance(model, nn.Module)


def predict_proba_cnn(
    model: nn.Module,
    paths: np.ndarray,
    y_true: np.ndarray,
    batch_size: int = 16,
    device: str = "cpu",
    sr: int = 22050,
    n_mfcc: int = 13,
) -> Tuple[np.ndarray, np.ndarray]:
    """Run a CNN over MFCC features of the given audio files.

    Samples are first passed through ``_filter_existing_audio``, so the
    returned arrays can be shorter than ``paths`` / ``y_true``; the returned
    labels are aligned with the returned probabilities, not with the inputs.

    Args:
        model: Network mapping a padded MFCC batch to per-class logits. It is
            moved to ``device`` and switched to eval mode; neither change is
            undone afterwards.
        paths: Audio file locations, one per sample.
        y_true: Integer-convertible labels indexed in parallel with ``paths``.
        batch_size: Number of samples per forward pass.
        device: Torch device string used for the model and the inputs.
        sr: Sampling rate used when loading audio and computing MFCCs. Must
            match the setting the model was trained with.
        n_mfcc: Number of MFCC coefficients per frame. Must match the setting
            the model was trained with.

    Returns:
        ``(labels, probabilities)`` -- note the order: labels come first.
        ``labels`` holds the labels of the samples that were actually
        evaluated, and ``probabilities`` holds one softmax row per such
        sample, in the same order.

    Raises:
        RuntimeError: If PyTorch or librosa is unavailable, or if no sample
            survives the file-existence filter.
    """
    if torch is None or librosa is None:
        raise RuntimeError("PyTorch and librosa are required for CNN evaluation.")
    audio_paths, labels = _filter_existing_audio(paths, y_true)
    if not audio_paths:
        raise RuntimeError("No valid audio paths found for CNN evaluation.")

    dataset = MFCCDataset(audio_paths, labels, sr=sr, n_mfcc=n_mfcc)
    # shuffle=False keeps predictions in the same order as the filtered labels.
    loader = DataLoader(dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_pad)

    model = model.to(device)
    model.eval()
    all_probs: List[np.ndarray] = []
    all_true: List[np.ndarray] = []
    with torch.no_grad():
        for xb, yb in loader:
            logits = model(xb.to(device))
            # Softmax over the class axis turns logits into probabilities.
            all_probs.append(torch.softmax(logits, dim=1).cpu().numpy())
            all_true.append(yb.numpy())
    return np.concatenate(all_true), np.vstack(all_probs)


def evaluate_models(
    model_paths: Dict[str, Path],
    test_path: Path = DEFAULT_TEST,
    out_dir: Path | None = DEFAULT_OUT,
    show: bool = True,
) -> Dict[str, Dict[str, float]]:
    """Evaluate every saved model on the test set and report the results.

    For each entry in ``model_paths`` the model is loaded, class
    probabilities are obtained (``predict_proba`` when the model offers it,
    otherwise the CNN path for torch modules), metrics are computed and three
    figures are drawn: confusion matrix, normalised confusion matrix and ROC
    curves. Models whose file is missing or whose type is unsupported are
    skipped with a ``[warn]`` message.

    Args:
        model_paths: Mapping from a display name to the saved model file.
        test_path: Location of the test features, read with
            ``load_features_with_paths``.
        out_dir: Root directory for figures (``<out_dir>/figures/<name>/``)
            and the metrics report; ``None`` disables all file output.
        show: Forwarded unchanged to the plotting helpers.

    Returns:
        Mapping from model name to the metrics returned by
        ``compute_metrics`` for that model (skipped models are absent).
    """
    features, raw_labels, sample_paths = load_features_with_paths(test_path)
    features = sanitize_features(features)
    write_files = out_dir is not None
    results: Dict[str, Dict[str, float]] = {}

    for model_name, model_path in model_paths.items():
        if not model_path.exists():
            print(f"[warn] skip {model_name}: missing {model_path}")
            continue

        model, label_encoder = load_model(model_path)
        if label_encoder is None:
            y_true = raw_labels
        else:
            y_true = label_encoder.transform(raw_labels)

        if hasattr(model, "predict_proba"):
            y_proba = model.predict_proba(features)
        elif is_torch_model(model):
            # The CNN path may drop samples, so it returns its own labels.
            y_true, y_proba = predict_proba_cnn(model, sample_paths, y_true)
        else:
            print(f"[warn] skip {model_name}: unsupported model type")
            continue
        y_pred = np.argmax(y_proba, axis=1)

        results[model_name] = compute_metrics(y_true, y_pred, y_proba)

        if write_files:
            fig_dir = out_dir / "figures" / model_name
            fig_dir.mkdir(parents=True, exist_ok=True)
        else:
            fig_dir = None

        # (plot function, its second positional argument, output file name)
        plot_jobs = (
            (plot_confusion_matrix, y_pred, "test_confusion.png"),
            (plot_confusion_normalized, y_pred, "test_confusion_norm.png"),
            (plot_roc_curves, y_proba, "test_roc_curves.png"),
        )
        for plot_fn, second_arg, filename in plot_jobs:
            plot_fn(
                y_true,
                second_arg,
                out_path=None if fig_dir is None else fig_dir / filename,
                show=show,
                title_suffix=model_name,
            )

    print_metrics_table(results)
    if write_files:
        save_metrics_report(results, out_dir)
    return results
# ---------------------------------------------------------------------
# 5) Example "notebook flow" (copy into cells if desired)
# ---------------------------------------------------------------------
# Display name -> saved model file. The locations are machine-specific and
# must be edited before running this cell elsewhere.
models = {
    name: Path(location)
    for name, location in (
        ("knn", "/Users/panmingh/Code/ML_Coursework/MyCourse/models/knn_model.pkl"),
        ("rf", "/Users/panmingh/Code/ML_Coursework/MyCourse/models/rf_model.pkl"),
        ("mlp", "/Users/panmingh/Code/ML_Coursework/MyCourse/models/mlp_model.pkl"),
        ("cnn", "/Users/panmingh/Code/ML_Coursework/MyCourse/models/cnn_model.pkl"),
    )
}
# Evaluate every listed model on the default test set, showing the figures
# and writing them (plus the metrics report) under the default output root.
evaluate_models(
    models,
    test_path=DEFAULT_TEST,
    out_dir=DEFAULT_OUT,
    show=True,
)