# Lecture 10B — Notebook 10B.4: Mini-App — Simple Classification with MFCC Features

**Purpose:** Use MFCC features from your recorded clips to build a tiny end-to-end ML demo (vowel vs fricative, or speaker style).


In [None]:
import os, json, math, re
from pathlib import Path

import numpy as np
import scipy.signal as sig
import scipy.fft as fft
import matplotlib.pyplot as plt

# Optional audio playback
# HAS_IPY_AUDIO records whether Audio/display could be imported; play_audio()
# checks it and prints a message instead of failing when they are missing.
try:
    from IPython.display import Audio, display
    HAS_IPY_AUDIO = True
except Exception:
    HAS_IPY_AUDIO = False

# Optional recording (works only if your environment supports it)
# HAS_SD records whether sounddevice could be imported; record_clip() checks it.
# Any import-time failure only disables recording and is reported below; it
# does not stop the notebook.
try:
    import sounddevice as sd
    HAS_SD = True
except Exception as e:
    HAS_SD = False
    print("sounddevice not available (recording disabled).", e)

# ============================================================
# 0) Robust project/manifest discovery
# ============================================================
# We try a few common locations so the notebook runs:
# - inside a shared folder (manifest.json next to the notebook)
# - inside the provided course folder EE519_L10B_Project
# - in this sandbox environment (/mnt/data/manifest.json)
# The list is searched in order; the first path that exists wins.
CANDIDATE_MANIFESTS = [
    Path("manifest.json"),  # relative path, i.e. looked up in the current working directory
    Path.cwd() / "manifest.json",  # same location, spelled as an absolute path
    Path.cwd() / "EE519_L10B_Project" / "manifest.json",
    Path("/mnt/data/manifest.json"),
]

def first_existing(paths):
    """Return the first entry of *paths* whose ``exists()`` is truthy, else None.

    An entry whose ``exists()`` call raises is treated as absent, so one
    unreadable location does not abort the search.
    """
    def _is_present(candidate):
        try:
            return bool(candidate.exists())
        except Exception:
            return False

    return next((candidate for candidate in paths if _is_present(candidate)), None)

# Pick the first manifest that already exists (None if no candidate exists).
MANIFEST_PATH = first_existing(CANDIDATE_MANIFESTS)

# If nothing exists yet, we create a default course folder structure
# Only the folder is created here; the manifest file itself is written later
# by save_manifest().
if MANIFEST_PATH is None:
    PROJECT_ROOT = Path.cwd() / "EE519_L10B_Project"
    PROJECT_ROOT.mkdir(parents=True, exist_ok=True)
    MANIFEST_PATH = PROJECT_ROOT / "manifest.json"
else:
    # The project root is the folder that contains the discovered manifest.
    PROJECT_ROOT = MANIFEST_PATH.parent

# Default subfolders (created if you want to record/organize locally)
REC_DIR = PROJECT_ROOT / "recordings"
FIG_DIR = PROJECT_ROOT / "figures"
RES_DIR = PROJECT_ROOT / "results"
# exist_ok=True makes re-running this cell harmless.
for d in [REC_DIR, FIG_DIR, RES_DIR]:
    d.mkdir(parents=True, exist_ok=True)

print("Using PROJECT_ROOT:", PROJECT_ROOT)
print("Using MANIFEST_PATH:", MANIFEST_PATH)

# ============================================================
# 1) Manifest helpers
# ============================================================
def load_manifest(path=MANIFEST_PATH):
    """Load the clip manifest from *path*.

    Args:
        path: Location of ``manifest.json`` (``Path`` or ``str``). Defaults to
            the MANIFEST_PATH value at the time this function was defined.

    Returns:
        The parsed JSON content if the file exists, otherwise a fresh default
        manifest with an empty ``"clips"`` list and ``created_utc`` set to None.
    """
    path = Path(path)  # also accept plain string paths
    if path.exists():
        # Read as UTF-8 explicitly: JSON files are UTF-8, while the platform
        # default encoding (e.g. cp1252 on Windows) would garble or reject
        # non-ASCII notes in a shared or hand-edited manifest.
        return json.loads(path.read_text(encoding="utf-8"))
    return {"course":"EE519","lecture":"10B","created_utc":None,"clips":[]}

def save_manifest(manifest, path=MANIFEST_PATH):
    """Write *manifest* to *path* as indented JSON and print the location.

    If ``created_utc`` is missing or None, it is first set on the passed dict
    to the string form of ``np.datetime64("now")``.
    """
    created = manifest.get("created_utc")
    if created is None:
        manifest["created_utc"] = str(np.datetime64("now"))
    payload = json.dumps(manifest, indent=2)
    path.write_text(payload)
    print("Saved manifest:", path)

def save_fig(fig, name, dpi=150):
    """Save *fig* as FIG_DIR/*name* (tight bounding box) and return that path."""
    target = FIG_DIR / name
    fig.savefig(target, dpi=dpi, bbox_inches="tight")
    print("Saved figure:", target)
    return target

# Module-level manifest dict; list_clips() reads it and add_clip_to_manifest()
# appends to it and saves it.
manifest = load_manifest()
print("Clips in manifest:", len(manifest.get("clips", [])))

def list_clips():
    """Print one line per clip in the module-level manifest.

    Each line shows the clip index, label, filename, sample rate and notes.
    A placeholder message is printed when the manifest has no clips.
    """
    clips = manifest.get("clips", [])
    # `not clips` also covers a manifest whose "clips" value is null.
    if not clips:
        print("(No clips in manifest yet)")
        return
    for i, c in enumerate(clips):
        # Convert to str first: the ':14s' format spec only accepts strings,
        # so a null or numeric label in the manifest would otherwise raise.
        label = str(c.get("label", "?"))
        print(f"[{i}] {label:14s}  {c.get('filename','?')}  fs={c.get('fs','?')}  notes={c.get('notes','')}")

# ============================================================
# 2) WAV I/O (pure-Python, robust)
# ============================================================
import wave

def write_wav(path: Path, x: np.ndarray, fs: int):
    """Write *x* to *path* as a mono 16-bit PCM WAV file at sample rate *fs*.

    Samples are converted to float32, clipped to [-1, 1], scaled by 32767 and
    cast to int16 before being written.
    """
    samples = np.clip(np.asarray(x, dtype=np.float32), -1.0, 1.0)
    pcm = (samples * 32767.0).astype(np.int16)
    with wave.open(str(path), "wb") as out:
        out.setnchannels(1)       # mono
        out.setsampwidth(2)       # 2 bytes per sample = 16-bit
        out.setframerate(int(fs))
        out.writeframes(pcm.tobytes())

def read_wav(path: Path):
    """Load a mono 16-bit PCM WAV file.

    Returns:
        ``(fs, x)``: the sample rate as int and the samples as float32,
        scaled by 1/32768.

    Raises:
        ValueError: if the file is not single-channel with 2-byte samples.
    """
    with wave.open(str(path), "rb") as src:
        fs = src.getframerate()
        n_frames = src.getnframes()
        n_channels = src.getnchannels()
        width = src.getsampwidth()
        if (n_channels, width) != (1, 2):
            raise ValueError(f"Expected mono 16-bit PCM WAV. Got channels={n_channels}, sampwidth={width} bytes")
        raw = src.readframes(n_frames)
    pcm = np.frombuffer(raw, dtype=np.int16)
    return int(fs), pcm.astype(np.float32) / 32768.0

def peak_normalize(x, target=0.98):
    """Scale *x* so that its largest absolute sample equals *target*.

    Args:
        x: Audio samples (array or any sequence of numbers).
        target: Desired peak magnitude after scaling.

    Returns:
        The scaled samples as a NumPy array. An empty input is returned
        unchanged (as an array) and an all-zero input stays all-zero.
    """
    x = np.asarray(x)  # lets plain lists through the division below
    if x.size == 0:
        # np.max raises on a zero-size array (e.g. a WAV with no frames).
        return x
    # The tiny offset avoids dividing by zero for a silent clip.
    m = float(np.max(np.abs(x)) + 1e-12)
    return (x / m) * target

def play_audio(x, fs, label="audio"):
    """Display an audio widget for *x* at rate *fs*, if IPython audio is available.

    When HAS_IPY_AUDIO is false, a notice that includes *label* is printed instead.
    """
    if HAS_IPY_AUDIO:
        display(Audio(x, rate=fs))
    else:
        print("(Audio playback not available)", label)

def record_clip(seconds=2.0, fs=16000):
    """Record *seconds* of single-channel float32 audio at *fs* Hz via sounddevice.

    Returns:
        ``(fs, x)`` with ``fs`` as int and ``x`` the squeezed recording buffer.

    Raises:
        RuntimeError: if sounddevice could not be imported (HAS_SD is false).
    """
    if not HAS_SD:
        raise RuntimeError("sounddevice not available. Load wav files instead.")
    print(f"Recording {seconds:.1f}s @ {fs} Hz ...")
    n_samples = int(seconds*fs)
    buffer = sd.rec(n_samples, samplerate=fs, channels=1, dtype="float32")
    sd.wait()  # wait for the recording to finish before using the buffer
    return int(fs), buffer.squeeze()

def add_clip_to_manifest(filename, label, fs, notes=""):
    """Append a clip entry to the module-level manifest, save it, and return its index.

    The entry stores the filename, label, integer sample rate, notes, an
    ``added_utc`` timestamp string and an empty ``selections`` dict.
    """
    entry = dict(
        filename=filename,
        label=label,
        fs=int(fs),
        notes=notes,
        added_utc=str(np.datetime64("now")),
        selections={},
    )
    clip_list = manifest.setdefault("clips", [])
    clip_list.append(entry)
    save_manifest(manifest)
    return len(manifest["clips"]) - 1

# ============================================================
# 3) Audio path resolver (important for sharing notebooks)
# ============================================================
def resolve_audio_path(filename: str) -> Path:
    """Try common locations for audio files referenced by the manifest.

    Args:
        filename: File name or path as stored in a manifest clip entry.

    Returns:
        The first candidate that is an existing regular file. If none is
        found, ``PROJECT_ROOT / filename`` is returned so the caller can
        report the most likely intended location.
    """
    p = Path(filename)
    # is_file() rather than exists(): a directory must never be returned as an
    # audio file (an empty filename, for example, resolves to a folder).
    if p.is_absolute() and p.is_file():
        return p

    candidates = [
        PROJECT_ROOT / filename,
        PROJECT_ROOT / "recordings" / filename,
        REC_DIR / filename,
        Path.cwd() / filename,
        Path.cwd() / "recordings" / filename,
        Path("/mnt/data") / filename,
    ]
    for c in candidates:
        if c.is_file():
            return c
    # If nothing found, return the most likely path (helps debug)
    return PROJECT_ROOT / filename

# ---------- Selection + framing helpers ----------
def seconds_to_samples(t0, t1, fs, xlen):
    """Convert a time selection [t0, t1] in seconds to sample indices (s0, s1).

    The start is clamped to be at least 0 and the end to be at most *xlen*.

    Raises:
        ValueError: if the clamped end is not greater than the clamped start.
    """
    raw_start = round(float(t0)*fs)
    raw_stop = round(float(t1)*fs)
    limit = int(xlen)
    start = int(raw_start) if raw_start > 0 else 0
    stop = int(raw_stop) if raw_stop < limit else limit
    if stop <= start:
        raise ValueError("Bad selection: t1 must be > t0")
    return start, stop

def samples_to_frame_range(s0, s1, N, H, xlen):
    """Map the sample range [s0, s1] to a pair of frame indices (f0, f1).

    N is the frame length, H the hop and xlen the signal length, all in
    samples. If the first estimate gives f1 < f0, a fallback based on
    ``s0 // H`` is used instead.
    """
    last_full_frame = int((xlen - N)//H)
    first = max(0, int((s0 - N)//H) + 1)
    last = min(int(s1//H), last_full_frame)
    if last >= first:
        return first, last
    # Fallback: anchor both indices at the frame that starts at or before s0.
    first = max(0, int(s0//H))
    return first, min(last_full_frame, first)

def frame_signal(x, N, H):
    """Slice *x* into overlapping frames of length N taken every H samples.

    The input is converted to float64. A signal shorter than N is zero-padded
    at the end to length N. Returns an array of shape (num_frames, N).
    """
    samples = np.asarray(x, dtype=np.float64)
    shortfall = N - len(samples)
    if shortfall > 0:
        samples = np.pad(samples, (0, shortfall))
    n_frames = 1 + (len(samples) - N)//H
    starts = H*np.arange(n_frames)
    offsets = np.arange(N)
    # Broadcasting builds the (n_frames, N) table of sample indices.
    return samples[offsets[None, :] + starts[:, None]]

def db(x):
    """Return ``20*log10(x)``, with *x* floored at 1e-12 to keep the log finite."""
    floored = np.maximum(x, 1e-12)
    return 20*np.log10(floored)

# ---------- Auditory scale helpers ----------
def hz_to_mel(hz):
    """Convert frequency in Hz to mel: ``2595 * log10(1 + hz/700)``."""
    return 2595.0 * np.log10(1.0 + hz/700.0)

def mel_to_hz(mel):
    """Convert mel to frequency in Hz (inverse of hz_to_mel)."""
    return 700.0 * (10**(mel/2595.0) - 1.0)

def mel_filterbank(fs, nfft, n_mels=26, fmin=0.0, fmax=None):
    """Build a bank of triangular filters spaced evenly on the mel scale.

    Args:
        fs: Sample rate in Hz.
        nfft: FFT size; the filters cover the ``nfft//2 + 1`` rFFT bins.
        n_mels: Number of filters.
        fmin: Lowest band edge in Hz.
        fmax: Highest band edge in Hz (defaults to ``fs/2``).

    Returns:
        ``(fb, freqs, hz_pts)``: the filter matrix of shape
        ``(n_mels, nfft//2 + 1)``, the bin centre frequencies in Hz, and the
        ``n_mels + 2`` band-edge frequencies in Hz.
    """
    if fmax is None:
        fmax = fs/2
    mmin, mmax = hz_to_mel(fmin), hz_to_mel(fmax)
    m_pts = np.linspace(mmin, mmax, n_mels+2)
    hz_pts = mel_to_hz(m_pts)

    freqs = np.linspace(0, fs/2, nfft//2 + 1)
    bins = np.floor((nfft+1) * hz_pts / fs).astype(int)

    top = len(freqs) - 1  # highest valid bin index
    fb = np.zeros((n_mels, len(freqs)), dtype=np.float64)
    for i in range(n_mels):
        b0 = int(np.clip(bins[i], 0, top))
        b1 = int(np.clip(bins[i+1], 0, top))
        b2 = int(np.clip(bins[i+2], 0, top))
        # Force strictly increasing edges b0 < b1 < b2 where the bin range
        # allows it. With a coarse FFT several mel edges can land on the same
        # bin; bumping only on equality could push b1 past b2, leaving the
        # falling slope empty and the whole filter row zero.
        b1 = min(max(b1, b0 + 1), top)
        b2 = min(max(b2, b1 + 1), top)

        # Rising slope on [b0, b1), falling slope on [b1, b2); the small
        # offset guards the division if an edge pair is still degenerate.
        fb[i, b0:b1] = (np.arange(b0, b1) - b0) / (b1 - b0 + 1e-12)
        fb[i, b1:b2] = (b2 - np.arange(b1, b2)) / (b2 - b1 + 1e-12)
    return fb, freqs, hz_pts

# quick peek
# Print the clips currently in the manifest (or a placeholder if there are none).
list_clips()


## 0) What this mini-app does

We will:
1) choose two (or three) clips (e.g., vowel vs fricative)
2) compute MFCC features per frame
3) aggregate statistics per clip (mean + std)
4) train a simple classifier (logistic regression) and evaluate

**Note:** This is a teaching “toy” pipeline: it’s about feature intuition, not SOTA accuracy.


## 1) Load clips + assign class labels

This notebook supports two common workflows:

1) **Use an existing `manifest.json` + wav files** (recommended for grading / sharing).  
2) **Record new clips locally** (if `sounddevice` is available).

### How class labels work (for ML)
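We will build a dataset of labeled feature vectors. In the simplest mode (`DATASET_MODE = "clip"`) **each clip becomes one training example**; the default `"window"` mode (Section 3) instead cuts each selected segment into overlapping windows and creates one example per window. In either case: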
- Input features: MFCC summary statistics over your selected segment (mean + std).
- Target label: derived from the clip's `label` field (e.g., `vowel_a`, `vowel_i`, `fricative_s`, `sentence`).

You can:
- **Use automatic mapping** from the `label` strings (default), or  
- **Override** with a custom `LABEL_MAP` / `KEEP_LABELS` list below.


In [None]:
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report

# Show the available clips so you can decide which labels to keep or merge.
list_clips()

# ============================================================
# Class-label control
# ============================================================
# The manifest clip field `label` is a string, e.g.:
#   "vowel_a", "vowel_i", "fricative_s", "sentence"
#
# Option A (default): use labels as-is (maybe filtered by KEEP_LABELS).
# Option B: map multiple raw labels into a smaller set of classes via LABEL_MAP.
#
# Both settings are read by should_keep() / clip_to_class() at call time, so
# re-run this cell after editing them and before rebuilding the dataset.

# Keep only these labels (set to None to keep everything)
KEEP_LABELS = None
# Example:
# KEEP_LABELS = ["vowel_a", "vowel_i", "fricative_s"]

# Map labels -> merged classes (set to None to disable)
# When a map is set, clips whose label is not a key of the map get class None
# and are skipped when the dataset is built.
LABEL_MAP = None
# Example (binary vowel vs fricative):
# LABEL_MAP = {
#     "vowel_a": "vowel",
#     "vowel_i": "vowel",
#     "fricative_s": "fricative",
# }

def clip_to_class(label: str) -> str:
    """Translate a raw manifest label into the class name used for training.

    The label is stringified and stripped. Without a LABEL_MAP it is returned
    as-is; with one, the mapped value is returned, or None if the label has
    no entry in the map.
    """
    key = str(label).strip()
    return key if LABEL_MAP is None else LABEL_MAP.get(key, None)

def should_keep(label: str) -> bool:
    """Return True if *label* passes the KEEP_LABELS filter.

    With KEEP_LABELS set to None every label is kept; otherwise the stripped
    string form of the label must be one of the KEEP_LABELS entries.
    """
    if KEEP_LABELS is not None:
        allowed = set(KEEP_LABELS)
        return str(label).strip() in allowed
    return True

# Echo the active label settings so the cell output records what was used.
print("KEEP_LABELS:", KEEP_LABELS)
print("LABEL_MAP:", LABEL_MAP)


## 2) MFCC feature extractor (reusing earlier steps)


In [None]:
def dct2_matrix(N, K):
    """Return the first K rows of the orthonormal DCT-II matrix for length N.

    The result has shape (K, N): row 0 is scaled by 1/sqrt(N) and the
    remaining rows by sqrt(2/N).
    """
    col_phase = np.pi/N * (np.arange(N) + 0.5)
    row_index = np.arange(K)
    basis = np.cos(col_phase[None, :] * row_index[:, None])
    dc_gain = 1/np.sqrt(N)
    ac_gain = np.sqrt(2/N)
    basis[0, :] = basis[0, :] * dc_gain
    basis[1:, :] = basis[1:, :] * ac_gain
    return basis

def compute_mfcc_for_segment(fs, xseg, win_ms=25, hop_ms=10, n_mels=40, n_ceps=13):
    """Compute MFCCs for one audio segment.

    Pipeline: Hann-windowed frames -> power spectrum -> mel filterbank ->
    natural log -> DCT-II, keeping the first *n_ceps* coefficients.

    Args:
        fs: Sample rate in Hz.
        xseg: Audio samples of the segment.
        win_ms: Analysis window length in milliseconds.
        hop_ms: Hop between successive frames in milliseconds.
        n_mels: Number of mel filters.
        n_ceps: Number of cepstral coefficients returned per frame.

    Returns:
        Array of shape (T, n_ceps), one row per frame.

    Raises:
        ValueError: if the window or hop is shorter than one sample at *fs*.
    """
    N = int(win_ms*1e-3*fs)
    H = int(hop_ms*1e-3*fs)
    if N < 1 or H < 1:
        # A zero hop would otherwise show up as a ZeroDivisionError during framing.
        raise ValueError(
            f"win_ms={win_ms} and hop_ms={hop_ms} must each span at least one sample at fs={fs}"
        )
    window = sig.windows.hann(N, sym=False)
    frames = frame_signal(xseg, N, H)
    NFFT = 2048 if fs <= 16000 else 4096
    if NFFT < N:
        # rfft crops its input when n is smaller than the frame length, which
        # would silently discard the end of every frame. Grow to the next
        # power of two that holds a full frame.
        NFFT = 1 << (N - 1).bit_length()
    X = fft.rfft(frames * window[None,:], n=NFFT, axis=1)
    P = (np.abs(X)**2) / NFFT
    fb, _, _ = mel_filterbank(fs, NFFT, n_mels=n_mels)
    # The small offset keeps the log finite for silent frames or empty bands.
    logM = np.log(P @ fb.T + 1e-12)
    D = dct2_matrix(n_mels, n_ceps)
    mfcc = logM @ D.T
    return mfcc  # (T, n_ceps)

def aggregate_stats(feat):
    """Summarize a (frames x coefficients) array as [per-column mean, per-column std]."""
    # simple clip-level representation: mean and std of each coefficient
    column_mean = np.mean(feat, axis=0)
    column_std = np.std(feat, axis=0)
    return np.concatenate((column_mean, column_std), axis=0)


## 3) Build dataset (X, y)


In [None]:
# ============================================================
# 3) Build dataset (choose your granularity)
# ============================================================
# Why your dataset can look "very small":
# - In the simplest setting we create **one feature vector per clip**
#   by summarizing MFCCs across time (mean+std). That yields only as
#   many samples as you have recorded clips (often just a few).
#
# But MFCC analysis produces many *frames* per clip. To leverage that,
# we can create **multiple training examples per clip** by chunking a
# segment into overlapping windows (recommended), or by using each MFCC
# frame as an example (very large, can overfit, more care needed).
#
# Choose one of:
#   DATASET_MODE = "clip"    -> 1 example per clip (smallest)
#   DATASET_MODE = "window"  -> many examples per clip (best demo)
#   DATASET_MODE = "frame"   -> 1 example per MFCC frame (largest)
# In every mode the feature vector has length 2 * n_ceps; in "frame" mode the
# second half (the "std" slots) is filled with zeros.
DATASET_MODE = "window"

# Windowing settings (only used when DATASET_MODE == "window")
# Both values are converted to whole MFCC frames using each segment's hop_ms.
WINDOW_SEC = 0.25       # seconds per window
WINDOW_HOP_SEC = 0.05   # seconds hop between windows

# Use all saved analysis segments (seg1, seg2, ...) instead of only seg1
USE_ALL_SEGMENTS = True

# If a clip has no saved selections, fall back to using the full clip
FALLBACK_TO_FULL_CLIP = True

# Minimum number of MFCC frames required to keep an example
MIN_MFCC_FRAMES = 3

def aggregate_stats(feat_TxD):
    """Reduce a T x D feature array to a length-2D vector: column means, then column stds."""
    summaries = [reduce_fn(feat_TxD, axis=0) for reduce_fn in (np.mean, np.std)]
    return np.concatenate(summaries, axis=0)

# Build the dataset: one feature vector (length 2 * n_ceps) per example,
# collected in parallel lists of features, class names and debug names.
X_list, y_list, name_list = [], [], []

# `or []` also covers a manifest whose "clips" value is null.
clips = manifest.get("clips", []) or []
for i, clip in enumerate(clips):
    raw_label = clip.get("label", "")
    if not should_keep(raw_label):
        continue

    yname = clip_to_class(raw_label)
    if yname is None:
        # label is not covered by LABEL_MAP
        continue

    wav_path = resolve_audio_path(clip.get("filename", ""))
    # is_file() rather than exists(): a missing or empty filename can resolve
    # to an existing directory, which read_wav cannot open.
    if not wav_path.is_file():
        print(f"⚠️ Missing wav for clip {i}: {wav_path}")
        continue

    # One unreadable file (stereo, non-16-bit, truncated, ...) should not
    # abort the whole dataset build; report it and move on.
    try:
        fs, x = read_wav(wav_path)
    except (ValueError, EOFError, wave.Error) as err:
        print(f"⚠️ Skipping unreadable wav for clip {i}: {wav_path} ({err})")
        continue
    if len(x) == 0:
        print(f"⚠️ Skipping empty wav for clip {i}: {wav_path}")
        continue
    x = peak_normalize(x)

    segs = (clip.get("selections", {}) or {}).get("analysis_segments", {}) or {}

    # If no saved selections exist, optionally use the full clip
    if len(segs) == 0 and FALLBACK_TO_FULL_CLIP:
        segs = {
            "full": {"s0": 0, "s1": len(x), "win_ms": 25.0, "hop_ms": 10.0, "n_mels": 40, "n_ceps": 13}
        }

    if len(segs) == 0:
        # nothing to train on
        continue

    # Decide which segment(s) to use
    seg_keys = list(segs.keys())
    if not USE_ALL_SEGMENTS:
        seg_keys = ["seg1"] if "seg1" in segs else [seg_keys[0]]

    # Debug name prefix for this clip (does not change per segment or window).
    clip_name = clip.get('name', 'clip' + str(i))

    for seg_name in seg_keys:
        sel = segs[seg_name]

        # Clamp the stored sample range to the clip so the slice is never empty.
        s0 = int(sel.get("s0", 0))
        s1 = int(sel.get("s1", len(x)))
        s0 = max(0, min(s0, len(x)-1))
        s1 = max(s0+1, min(s1, len(x)))
        xseg = x[s0:s1]

        win_ms = float(sel.get("win_ms", 25.0))
        hop_ms = float(sel.get("hop_ms", 10.0))
        n_mels = int(sel.get("n_mels", 40))
        n_ceps = int(sel.get("n_ceps", 13))

        mfcc = compute_mfcc_for_segment(
            fs, xseg,
            win_ms=win_ms, hop_ms=hop_ms,
            n_mels=n_mels, n_ceps=n_ceps
        )  # (T, n_ceps)

        T = mfcc.shape[0]
        if T < MIN_MFCC_FRAMES:
            continue

        if DATASET_MODE == "clip":
            X_list.append(aggregate_stats(mfcc))
            y_list.append(yname)
            name_list.append(f"{clip_name}:{seg_name}")

        elif DATASET_MODE == "frame":
            # Each MFCC frame is one sample (D = n_ceps)
            # To keep the feature length equal to the other modes (2 * n_ceps),
            # the "std" half is filled with zeros (or you can use deltas).
            padded = np.concatenate([mfcc, np.zeros((T, n_ceps))], axis=1)
            X_list.extend(padded)
            y_list.extend([yname] * T)
            name_list.extend(f"{clip_name}:{seg_name}:t{t}" for t in range(T))

        elif DATASET_MODE == "window":
            # Make overlapping windows in MFCC-frame units
            frames_per_window = max(1, int(round(WINDOW_SEC / (hop_ms * 1e-3))))
            hop_frames = max(1, int(round(WINDOW_HOP_SEC / (hop_ms * 1e-3))))

            if T < frames_per_window:
                # If segment shorter than window, just keep one example
                X_list.append(aggregate_stats(mfcc))
                y_list.append(yname)
                name_list.append(f"{clip_name}:{seg_name}:short")
            else:
                for t0 in range(0, T - frames_per_window + 1, hop_frames):
                    block = mfcc[t0:t0 + frames_per_window, :]
                    if block.shape[0] < MIN_MFCC_FRAMES:
                        continue
                    X_list.append(aggregate_stats(block))
                    y_list.append(yname)
                    name_list.append(f"{clip_name}:{seg_name}:w{t0}")

        else:
            raise ValueError("Unknown DATASET_MODE: " + str(DATASET_MODE))

# Segments saved with different n_ceps give vectors of different length, which
# cannot be stacked into one matrix; fail with a message that says so.
feature_sizes = {len(v) for v in X_list}
if len(feature_sizes) > 1:
    raise ValueError(
        f"Examples have different feature lengths {sorted(feature_sizes)}; "
        "use the same n_ceps for every analysis segment."
    )

X = np.array(X_list, dtype=np.float64)
y = np.array(y_list)

print(f"Built dataset with DATASET_MODE='{DATASET_MODE}':")
print("  X shape:", X.shape, "(N_samples, N_features)")
print("  y shape:", y.shape)

# Show per-class counts
if len(y) > 0:
    labs, cnt = np.unique(y, return_counts=True)
    print("\nSamples per class:")
    for lab, c in zip(labs, cnt):
        print(f"  {lab:>15s}: {c}")


## 4) Train + evaluate (toy demo)

If you have only 2–4 samples, evaluation is unstable.  
**Tip:** record multiple vowels/fricatives (fast/slow/loud/soft) to create more samples.


In [None]:
# Train a standardize + logistic-regression pipeline on a 50/50 split of
# (X, y), report accuracy, and save a confusion-matrix figure.
if len(y) < 6:
    print("⚠️ Very small dataset. Record more clips for a meaningful ML demo.")

# Class names and per-class sample counts (computed once, reused below)
uniq, cnt = np.unique(y, return_counts=True)
if len(uniq) < 2:
    raise ValueError("Need at least 2 classes to train a classifier. Check KEEP_LABELS/LABEL_MAP and your manifest labels.")

if DATASET_MODE != "clip":
    # Overlapping windows (or neighbouring frames) cut from the same recording
    # are nearly identical, and the random split below puts them on both sides.
    print("Note: examples from the same clip appear in both train and test, so the accuracy below is optimistic.")

# Stratify only if every class has >= 2 samples
can_stratify = bool(np.all(cnt >= 2))

Xtr, Xte, ytr, yte = train_test_split(
    X, y,
    test_size=0.5,
    random_state=0,
    stratify=y if can_stratify else None
)

# `multi_class` is no longer passed: "auto" was its default value and the
# argument is deprecated in recent scikit-learn releases.
clf = Pipeline([
    ("scaler", StandardScaler()),
    ("lr", LogisticRegression(max_iter=500))
])

clf.fit(Xtr, ytr)
pred = clf.predict(Xte)

print("Accuracy:", accuracy_score(yte, pred))
print()
print("Classification report:")
# zero_division=0 reports 0 (without a warning) for a class that is never
# predicted, which is common with tiny test sets.
print(classification_report(yte, pred, zero_division=0))

# Fix the label order so rows/columns match the tick labels in the plot.
labs = uniq
cm = confusion_matrix(yte, pred, labels=labs)
print("Confusion matrix (rows=true, cols=pred):")
print(cm)

# Simple confusion matrix plot
fig, ax = plt.subplots(figsize=(4.5, 4.0))
im = ax.imshow(cm, aspect="auto")
ax.set_title("Confusion Matrix")
ax.set_xticks(range(len(labs))); ax.set_xticklabels(labs, rotation=45, ha="right")
ax.set_yticks(range(len(labs))); ax.set_yticklabels(labs)
ax.set_xlabel("Predicted"); ax.set_ylabel("True")

# Write the count into each cell.
for r in range(cm.shape[0]):
    for c in range(cm.shape[1]):
        ax.text(c, r, str(cm[r, c]), ha="center", va="center")

plt.tight_layout()
save_fig(fig, "mfcc_confusion_matrix.png")
plt.show()


## Wrap-up
**What you learned:** MFCC features can feed directly into a simple classifier; clip-level mean/std captures coarse “spectral shape” differences.  

## Reflection
1) Which MFCC statistics (mean vs std) do you think separate vowel vs fricative better? Why?  
2) What would you change to make this a better experiment (more clips, cross-validation, per-frame classification, etc.)?  
3) How would noise affect MFCC features, and what could you do about it?
