# (Homework) Week 6 - DataScience Bootcamp Fall 2025

All solution cells are replaced with `# TODO` placeholders so you can fill them in.

**Name:** \
**Email:**

---

### Problem 1: Dataset Splitting

1. You have recordings of 44 phones (distinct speech sounds) from 100 people; each person records ~200 phones per day for 5 days.
   - Design a valid training/validation/test split strategy that ensures the model generalizes to **new speakers**.

2. You now receive an additional dataset of 10,000 phone recordings from **Kilian**, a single speaker.
   - You must train a model that performs well **specifically for Kilian**, while also maintaining generalization.

*Describe your proposed split strategy and reasoning.* (Theory)

In [None]:

# Problem 1 — Dataset Splitting (Speaker-Generalization + Kilian Adaptation)
# This cell provides:
# 1) A clear written plan (displayed as Markdown) for how to split the data to ensure generalization to new speakers.
# 2) Reusable code utilities to perform strict speaker-level splits and a Kilian-specific split,
#    with unit-test style checks that verify speaker exclusivity across splits.

from IPython.display import Markdown, display
import numpy as np
import pandas as pd

# Written answer for Problem 1, kept as a raw-string Markdown document so it
# can be rendered as formatted text in the cell output below.
plan = r"""
### Proposed Split Strategy (Speaker Independence + Kilian Adaptation)

**Global (100 speakers) split — by speaker only (not by recording):**
- Train: 70 speakers (all their days/recordings)
- Validation: 15 speakers (all their days/recordings)
- Test: 15 speakers (all their days/recordings)

Rationale: Splitting *by speaker* ensures that validation/test contain **unseen people**, forcing generalization to new voices. 
Splitting within a speaker would leak vocal idiosyncrasies and inflate performance.

**Kilian (single speaker, 10,000 recordings) — within-speaker split:**
- Train: 80% (8,000)
- Validation: 10% (1,000)
- Test: 10% (1,000)

**Training approach (two-stage):**
1. Train a base model on the 100-speaker training set → learn robust, speaker-independent representations.
2. Fine-tune (or adapt) on Kilian-train (8k) using a small learning rate / last layers / adapter blocks.
   - Early stop on Kilian-val (1k).
   - Evaluate on _both_ the global test (15 unseen speakers) and Kilian-test (1k).

**Optional personalization:**
- Add a speaker embedding (e.g., x-vector) or adapter tuned on Kilian.
- Or multi-task: (global loss) + (Kilian-focused loss) with a small weight.

**Reporting:**
- Report **two metrics**: (a) Global test (unseen speakers) and (b) Kilian test.
"""

# Render the plan as Markdown in the notebook output.
display(Markdown(plan))

# --------------------------
# Utilities for strict splits
# --------------------------

def speaker_level_split(
    df: pd.DataFrame,
    speaker_col: str = "speaker_id",
    train_frac: float = 0.70,
    val_frac: float = 0.15,
    test_frac: float = 0.15,
    random_state: int = 42,
):
    """
    Partition a metadata DataFrame into train/val/test at the **speaker** level.

    Every row belonging to a given speaker lands in exactly one of the three
    outputs, so no speaker is shared between splits. Rows whose `speaker_col`
    value is missing are not assigned to any split.

    Args:
        df: Metadata table containing the column `speaker_col`.
        speaker_col: Name of the column holding the speaker identifier.
        train_frac, val_frac, test_frac: Fractions of *speakers* per split;
            they must sum to 1.
        random_state: Seed for the speaker shuffle.

    Returns: (df_train, df_val, df_test), each an independent copy.
    """
    frac_total = train_frac + val_frac + test_frac
    assert abs(frac_total - 1.0) < 1e-8, "Fractions must sum to 1."

    # Unique speakers in order of first appearance, then shuffled reproducibly.
    shuffled = df[speaker_col].dropna().unique().tolist()
    np.random.default_rng(random_state).shuffle(shuffled)

    n_speakers = len(shuffled)
    train_end = int(round(train_frac * n_speakers))
    val_end = train_end + int(round(val_frac * n_speakers))

    # Test takes whatever is left so that every speaker is assigned.
    groups = (
        set(shuffled[:train_end]),
        set(shuffled[train_end:val_end]),
        set(shuffled[val_end:]),
    )

    # Sanity check: no speaker may appear in two splits.
    for i, first in enumerate(groups):
        for second in groups[i + 1:]:
            assert first.isdisjoint(second)

    speaker_of_row = df[speaker_col]
    df_train, df_val, df_test = (
        df[speaker_of_row.isin(group)].copy() for group in groups
    )
    return df_train, df_val, df_test


def kilian_within_speaker_split(
    df_kilian: pd.DataFrame,
    train_frac: float = 0.80,
    val_frac: float = 0.10,
    test_frac: float = 0.10,
    random_state: int = 42,
):
    """
    Randomly partition a single speaker's recordings (rows) into three splits.

    Args:
        df_kilian: One row per recording of the single speaker.
        train_frac, val_frac, test_frac: Fractions of *rows* per split; they
            must sum to 1.
        random_state: Seed for the row shuffle.

    Returns: (kil_train, kil_val, kil_test), each an independent copy.
    """
    frac_total = train_frac + val_frac + test_frac
    assert abs(frac_total - 1.0) < 1e-8, "Fractions must sum to 1."

    # Shuffle positional row indices reproducibly.
    n_rows = len(df_kilian)
    order = np.arange(n_rows)
    np.random.default_rng(random_state).shuffle(order)

    train_end = int(round(train_frac * n_rows))
    val_end = train_end + int(round(val_frac * n_rows))

    # Test takes the remainder so every row is assigned exactly once.
    positions = (order[:train_end], order[train_end:val_end], order[val_end:])
    kil_train, kil_val, kil_test = (
        df_kilian.iloc[chunk].copy() for chunk in positions
    )
    return kil_train, kil_val, kil_test


# Example (toy) usage with synthetic metadata (run to sanity-check the split logic).
if __name__ == "__main__":
    # Synthetic metadata: 100 speakers x 5 days x 200 recordings per day.
    rows = []
    for spk in range(100):
        speaker = f"S{spk:03d}"
        for day in range(1, 6):
            for r in range(200):
                rows.append({"speaker_id": speaker, "day": day, "rec_id": f"{speaker}_d{day}_r{r}"})
    meta = pd.DataFrame(rows)

    df_tr, df_va, df_te = speaker_level_split(meta, "speaker_id", 0.70, 0.15, 0.15, random_state=123)

    # Speaker exclusivity checks:
    assert set(df_tr.speaker_id).isdisjoint(set(df_va.speaker_id))
    assert set(df_tr.speaker_id).isdisjoint(set(df_te.speaker_id))
    assert set(df_va.speaker_id).isdisjoint(set(df_te.speaker_id))

    # Kilian synthetic 10k rows. A seeded Generator (instead of the global
    # np.random state) keeps the demo data reproducible, like the splits.
    demo_rng = np.random.default_rng(123)
    kilian_meta = pd.DataFrame({
        "speaker_id": ["Kilian"] * 10_000,
        "day": demo_rng.integers(1, 6, size=10_000),  # days 1..5 (upper bound exclusive)
        "rec_id": [f"K_{i}" for i in range(10_000)],
    })
    ktr, kva, kte = kilian_within_speaker_split(kilian_meta, 0.80, 0.10, 0.10, random_state=123)

    # Recording exclusivity + coverage checks for the within-speaker split:
    assert set(ktr.rec_id).isdisjoint(set(kva.rec_id))
    assert set(ktr.rec_id).isdisjoint(set(kte.rec_id))
    assert set(kva.rec_id).isdisjoint(set(kte.rec_id))
    assert len(ktr) + len(kva) + len(kte) == len(kilian_meta)

    print("Global split shapes:", tuple(map(len, (df_tr, df_va, df_te))))
    print("Kilian split shapes:", tuple(map(len, (ktr, kva, kte))))


### Problem 2: K-Nearest Neighbors

1. **1-NN Classification:** Given dataset:

   Positive: (1,2), (1,4), (5,4)

   Negative: (3,1), (3,2)

   Plot the 1-NN decision boundary and classify new points visually.

2. **Feature Scaling:** Consider dataset:

   Positive: (100,2), (100,4), (500,4)

   Negative: (300,1), (300,2)

   How would 1-NN classify the point (500,1) **before and after scaling** each feature to [0,1]?

3. **Handling Missing Values:** How can you modify K-NN to handle missing features in a test point?

4. **High-dimensional Data:** Why can K-NN still work well for images even with thousands of pixels?


In [None]:

# Problem 2 — K-Nearest Neighbors: 1-NN boundary + Feature Scaling demo
import numpy as np
import matplotlib.pyplot as plt
from sklearn.neighbors import KNeighborsClassifier
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import MinMaxScaler, StandardScaler

# Dataset A (small coordinates)
Xa_pos = np.array([[1,2],[1,4],[5,4]])
Xa_neg = np.array([[3,1],[3,2]])
Xa = np.vstack([Xa_pos, Xa_neg])
ya = np.array([1]*len(Xa_pos) + [0]*len(Xa_neg))

# 1-NN decision boundary for Dataset A
knn1 = KNeighborsClassifier(n_neighbors=1)
knn1.fit(Xa, ya)

# grid for plotting
h = 0.05
x_min, x_max = Xa[:,0].min()-1, Xa[:,0].max()+1
y_min, y_max = Xa[:,1].min()-1, Xa[:,1].max()+1
xx, yy = np.meshgrid(np.arange(x_min, x_max, h),
                     np.arange(y_min, y_max, h))
Z = knn1.predict(np.c_[xx.ravel(), yy.ravel()]).reshape(xx.shape)

plt.figure()
plt.contourf(xx, yy, Z, alpha=0.2)
plt.scatter(Xa_pos[:,0], Xa_pos[:,1], marker='o', label='Positive')
plt.scatter(Xa_neg[:,0], Xa_neg[:,1], marker='x', label='Negative')
plt.legend()
plt.title("Problem 2 — 1-NN Decision Boundary (Dataset A)")
plt.xlabel("x1"); plt.ylabel("x2")
plt.show()

# Dataset B (same geometry but x1 scaled ~100x)
Xb_pos = np.array([[100,2],[100,4],[500,4]])
Xb_neg = np.array([[300,1],[300,2]])
Xb = np.vstack([Xb_pos, Xb_neg])
yb = np.array([1]*len(Xb_pos) + [0]*len(Xb_neg))

# Query point from part 2 of the problem statement.
x_query = np.array([[500, 1]])
label_names = {1: "Positive", 0: "Negative"}

# Without scaling
knn_no_scale = KNeighborsClassifier(n_neighbors=1).fit(Xb, yb)

x_min, x_max = Xb[:,0].min()-50, Xb[:,0].max()+50
y_min, y_max = Xb[:,1].min()-1,  Xb[:,1].max()+1
xx, yy = np.meshgrid(np.arange(x_min, x_max, 2.0),
                     np.arange(y_min, y_max, 0.05))
Z = knn_no_scale.predict(np.c_[xx.ravel(), yy.ravel()]).reshape(xx.shape)

plt.figure()
plt.contourf(xx, yy, Z, alpha=0.2)
plt.scatter(Xb_pos[:,0], Xb_pos[:,1], marker='o', label='Positive')
plt.scatter(Xb_neg[:,0], Xb_neg[:,1], marker='x', label='Negative')
plt.scatter(x_query[:,0], x_query[:,1], marker='*', s=120, label='Query (500,1)')
plt.legend()
plt.title("Problem 2 — 1-NN on Dataset B (No Scaling)")
plt.xlabel("x1"); plt.ylabel("x2")
plt.show()

# With min-max scaling to [0, 1] per feature, as the problem statement asks.
# The scaler lives inside the pipeline, so the plotting grid and the query
# point are transformed with the ranges learned from the training data.
pipe = Pipeline([("scaler", MinMaxScaler()), ("knn", KNeighborsClassifier(n_neighbors=1))])
pipe.fit(Xb, yb)

Zs = pipe.predict(np.c_[xx.ravel(), yy.ravel()]).reshape(xx.shape)

plt.figure()
plt.contourf(xx, yy, Zs, alpha=0.2)
plt.scatter(Xb_pos[:,0], Xb_pos[:,1], marker='o', label='Positive')
plt.scatter(Xb_neg[:,0], Xb_neg[:,1], marker='x', label='Negative')
plt.scatter(x_query[:,0], x_query[:,1], marker='*', s=120, label='Query (500,1)')
plt.legend()
plt.title("Problem 2 — 1-NN on Dataset B (With MinMaxScaler)")
plt.xlabel("x1"); plt.ylabel("x2")
plt.show()

# z-score standardization, shown only as a comparison to min-max scaling.
pipe_std = Pipeline([("scaler", StandardScaler()), ("knn", KNeighborsClassifier(n_neighbors=1))])
pipe_std.fit(Xb, yb)

# Explicit answer for the query point.
# Raw features: nearest neighbour is (500,4) at distance 3 -> Positive.
# Min-max scaled: the query maps to (1,0) and the nearest neighbour is (300,1),
# which maps to (0.5,0) at distance 0.5 -> Negative.
pred_raw = int(knn_no_scale.predict(x_query)[0])
pred_minmax = int(pipe.predict(x_query)[0])
pred_std = int(pipe_std.predict(x_query)[0])
print(f"1-NN label for (500,1) without scaling:        {label_names[pred_raw]}")
print(f"1-NN label for (500,1) after [0,1] scaling:    {label_names[pred_minmax]}")
print(f"1-NN label for (500,1) after z-score scaling:  {label_names[pred_std]}")

print("Observation: Without scaling, the large x1 range dominates distance; with scaling, axes contribute comparably.")


### Problem 3: Part 1

You are given a fully trained Perceptron model with weight vector **w**, along with training set **D_TR** and test set **D_TE**.

1. Your co-worker suggests evaluating $h(x) = \mathrm{sign}(w \cdot x)$ for every $(x, y)$ in $D_{TR}$ and $D_{TE}$. Does this help determine whether the test error is higher than the training error?
2. Why is there no need to compute training error explicitly for the Perceptron algorithm?

In [None]:

# Problem 3 — Part 1 (Perceptron evaluation theory)
from IPython.display import Markdown, display
# Written answer for Problem 3 (Part 1), kept as raw-string Markdown so the
# LaTeX backslashes survive and the text renders in the cell output.
txt = r"""
**1) Does evaluating $h(x)$ on both $D_{TR}$ and $D_{TE}$ help?**  
Only the $D_{TE}$ half is informative. A fully trained (converged) Perceptron classifies every training point
correctly, so its training error is already known to be **zero**. The test error can therefore never be lower than the
training error, and it is strictly higher exactly when at least one test point is misclassified, which evaluating
on $D_{TE}$ alone reveals. Note that zero training error does **not** imply low test error: generalization depends on
properties like margin, data radius, and sample size, so a held-out validation/test set is still needed to estimate it.

**Margin bound intuition:** If the data are separable with margin $\gamma$ and inputs are bounded by radius $R$,
the Perceptron makes at most $(R/\gamma)^2$ mistakes during training. This is **not** a direct test-error bound, but it
explains why larger margins typically correlate with better generalization.

**2) Why don't we need to compute training error explicitly?**  
The Perceptron only stops after a full pass over $D_{TR}$ with no mistakes. Once it has **converged** on separable
data, it classifies all training examples correctly; hence the training error is identically **0%**. If the data are
**not** separable, the algorithm does not converge; in that case, report the empirical training error by evaluating
$\mathrm{sign}(w \cdot x)$ on $D_{TR}$ (but typical Perceptron training loops monitor mistakes per epoch anyway).
"""
display(Markdown(txt))


### Problem 3: Two-point 2D Dataset (Part 2)

Run the Perceptron algorithm **by hand or in code** on the following data:

1. Positive class: (10, -2)
2. Negative class: (12, 2)

Start with $w_0 = (0, 0)$ and a learning rate of 1.

- Compute how many updates are required until convergence.
- Write down the sequence of $w_i$ vectors.

In [None]:

# Problem 3 — Part 2: Perceptron on two points
import numpy as np

X = np.array([[10, -2],   # positive
              [12,  2]])  # negative
y = np.array([+1, -1])

w = np.array([0.0, 0.0])
lr = 1.0

# history[i] is w_i; history[0] is the initial weight vector w_0.
history = [w.copy()]
updates = 0

def predict(w, x):
    """Return the predicted label for x: +1 if w·x >= 0, otherwise -1."""
    s = np.dot(w, x)
    return 1 if s >= 0 else -1

converged = False
# Iterate with a small safety cap
for epoch in range(100):
    mistakes = 0
    for xi, yi in zip(X, y):
        # Standard Perceptron mistake test: a point with y * (w·x) <= 0 is a
        # mistake, so points exactly on the boundary (including every point
        # when w = 0) trigger an update. Testing predict(w, xi) != yi instead
        # would treat w·x == 0 as a correct positive and skip the first
        # update on the positive point, giving a different w_i sequence.
        if yi * np.dot(w, xi) <= 0:
            w = w + lr * yi * xi
            history.append(w.copy())
            updates += 1
            mistakes += 1
    # A full pass without any update means the weights separate the data.
    if mistakes == 0:
        converged = True
        break

print("Converged:", converged)
print("Total updates:", updates)
print("Sequence of w:")
for i, wi in enumerate(history):
    print(f"w_{i} = {wi}")
print("Predictions on training points:", [predict(w, xi) for xi in X])


### Problem 4: Reconstructing the Weight Vector

Given the log of Perceptron updates:

| x | y | count |
|---|---|--------|
| (0, 0, 0, 0, 4) | +1 | 2 |
| (0, 0, 6, 5, 0) | +1 | 1 |
| (3, 0, 0, 0, 0) | -1 | 1 |
| (0, 9, 3, 6, 0) | -1 | 1 |
| (0, 1, 0, 2, 5) | -1 | 1 |

Assume learning rate = 1 and initial weight $w_0 = (0, 0, 0, 0, 0)$.

Compute the final weight vector after all updates.

In [None]:

# Problem 4 — Reconstruct final Perceptron weight vector from update log
import numpy as np

# Update log, one row per distinct misclassified example: (x, y, count)
entries = [
    ((0, 0, 0, 0, 4), +1, 2),
    ((0, 0, 6, 5, 0), +1, 1),
    ((3, 0, 0, 0, 0), -1, 1),
    ((0, 9, 3, 6, 0), -1, 1),
    ((0, 1, 0, 2, 5), -1, 1),
]

# With learning rate 1 and w_0 = 0, the final weights are the sum of
# count * y * x over the log, i.e. a signed-count vector times the
# matrix whose rows are the logged inputs.
features = np.array([vec for vec, _, _ in entries], dtype=float)
signed_counts = np.array([times * label for _, label, times in entries], dtype=float)
w = signed_counts @ features

print("Final weight vector w =", w)


### Problem 5: Visualizing Perceptron Convergence

Implement a Perceptron on a small 2D dataset with positive and negative examples.

- Plot the data points.
- After each update, visualize the decision boundary.
- Show how it converges to a stable separator.

In [None]:

# Problem 5 — Visualizing Perceptron Convergence on a 2D toy set
import numpy as np
import matplotlib.pyplot as plt

# Toy data: two small, linearly separable clusters
pos = np.array([[2, 2], [3, 3], [2.5, 3.5]])
neg = np.array([[0, 1], [1, 0], [1, 1]])

# Stack positives first, then negatives; labels follow the same order.
X = np.concatenate((pos, neg), axis=0)
y = np.repeat([+1, -1], [len(pos), len(neg)])

# Perceptron state: weight vector, bias, and learning rate.
w = np.zeros(2)
b = 0.0
lr = 1.0


def predict_raw(w, b, X):
    """Return the raw linear score X·w + b for each row of X."""
    scores = X @ w
    return scores + b


def sign(z):
    """Map scores to labels: +1 where z >= 0, otherwise -1."""
    return np.where(z >= 0, 1, -1)


def plot_boundary(w, b, title):
    """Draw the data, the shaded score >= 0 region, and the score == 0 contour for (w, b)."""
    # Plot window: bounding box of the data padded by 0.5 on every side.
    lo = X.min(axis=0) - 0.5
    hi = X.max(axis=0) + 0.5
    xx, yy = np.meshgrid(np.linspace(lo[0], hi[0], 200),
                         np.linspace(lo[1], hi[1], 200))
    grid_points = np.c_[xx.ravel(), yy.ravel()]
    Z = predict_raw(w, b, grid_points).reshape(xx.shape)

    plt.figure()
    plt.contourf(xx, yy, (Z>=0).astype(int), alpha=0.2)
    for cluster, marker, name in ((pos, 'o', 'Positive'), (neg, 'x', 'Negative')):
        plt.scatter(cluster[:,0], cluster[:,1], marker=marker, label=name)
    plt.contour(xx, yy, Z, levels=[0], linewidths=2)
    plt.title(title)
    plt.xlabel("x1")
    plt.ylabel("x2")
    plt.legend()
    plt.show()


# Boundary before any update
plot_boundary(w, b, "Perceptron — Initial boundary")

# Training: replot after each update, but only for the first few updates.
plots = 0
max_plots = 4
for epoch in range(10):
    mistakes = 0
    for xi, yi in zip(X, y):
        if yi * (np.dot(w, xi) + b) > 0:
            continue  # correctly classified with positive margin: no update
        w = w + lr * yi * xi
        b = b + lr * yi
        mistakes += 1
        if plots < max_plots:
            plot_boundary(w, b, f"After update {plots+1}: w={w}, b={b}")
            plots += 1
    # A clean pass over the data means the separator is stable.
    if mistakes == 0:
        break

# Boundary after training stops
plot_boundary(w, b, "Perceptron — Final boundary")
print("Final parameters:", w, b)
