# (Homework) Week 6 - DataScience Bootcamp Fall 2025

All solution cells are replaced with `# TODO` placeholders so you can fill them in.

Name: Keith Wang

Email: hw3345@nyu.edu

---

### Problem 1: Dataset Splitting

1. You have recordings of 44 phones from 100 people; each person records ~200 phones/day for 5 days.
   - Design a valid training/validation/test split strategy that ensures the model generalizes to **new speakers**.

2. You now receive an additional dataset of 10,000 phone recordings from **Kilian**, a single speaker.
   - You must train a model that performs well **specifically for Kilian**, while also maintaining generalization.

*Describe your proposed split strategy and reasoning.* (Theory)

In [None]:
#Todo
#1
import numpy as np
from collections import defaultdict
from typing import Dict, List, Tuple, Iterable

def split_by_speaker(
    speaker_ids: Iterable[str],
    train_ratio=0.70, val_ratio=0.15, seed=2025
) -> Dict[str, List[str]]:
    
    speaker_ids = np.array(list(speaker_ids))
    rng = np.random.default_rng(seed)
    perm = rng.permutation(len(speaker_ids))
    speaker_ids = speaker_ids[perm]

    n = len(speaker_ids)
    n_train = int(round(train_ratio * n))
    n_val   = int(round(val_ratio   * n))
    n_test  = n - n_train - n_val

    splits = {
        "train": list(speaker_ids[:n_train]),
        "val":   list(speaker_ids[n_train:n_train+n_val]),
        "test":  list(speaker_ids[n_train+n_val:]),
    }
    return splits

#2
from typing import Tuple
def split_kilian_utterances(
    kilian_ids: Iterable[str],  # e.g., utterance IDs for Kilian
    train_ratio=0.80, val_ratio=0.10, seed=2025
) -> Dict[str, List[str]]:
   
    arr = np.array(list(kilian_ids))
    rng = np.random.default_rng(seed)
    perm = rng.permutation(len(arr))
    arr = arr[perm]

    n = len(arr)
    n_train = int(round(train_ratio * n))
    n_val   = int(round(val_ratio   * n))
    splits = {
        "train": list(arr[:n_train]),
        "val":   list(arr[n_train:n_train+n_val]),
        "test":  list(arr[n_train+n_val:])
    }
    return splits

class Model:
    def __init__(self):
        pass
    def load_state_dict(self, sd): pass
    def state_dict(self): return {}

def train_global(model, train_data, val_data, max_epochs=20, early_stop_patience=3):
    best_sd, best_val = None, -np.inf
    no_improve = 0
    for epoch in range(max_epochs):
        # train_one_epoch(model, train_data)
        # val_metric = evaluate(model, val_data)
        val_metric = np.random.rand()
        if val_metric > best_val:
            best_val = val_metric; best_sd = model.state_dict(); no_improve = 0
        else:
            no_improve += 1
            if no_improve >= early_stop_patience:
                break
    model.load_state_dict(best_sd)
    return model

def finetune_kilian(model, kilian_train, kilian_val, max_epochs=10, lr=1e-4, freeze_backbone=True):
    if freeze_backbone:
        pass
    best_sd, best_val = None, -np.inf
    no_improve = 0
    for epoch in range(max_epochs):
        # train_one_epoch(model, kilian_train, lr=lr)
        # val_metric = evaluate(model, kilian_val)
        val_metric = np.random.rand()
        if val_metric > best_val:
            best_val = val_metric; best_sd = model.state_dict(); no_improve = 0
        else:
            no_improve += 1
            if no_improve >= 3:
                break
    model.load_state_dict(best_sd)
    return model

global_model = Model()
global_model = train_global(global_model, train_data="global-train", val_data="global-val")
personalized_for_kilian = finetune_kilian(global_model, kilian_train="kilian-train", kilian_val="kilian-val")

### Problem 2: K-Nearest Neighbors

1. **1-NN Classification:** Given dataset:

   Positive: (1,2), (1,4), (5,4)

   Negative: (3,1), (3,2)

   Plot the 1-NN decision boundary and classify new points visually.

2. **Feature Scaling:** Consider dataset:

   Positive: (100,2), (100,4), (500,4)

   Negative: (300,1), (300,2)

   What would the 1-NN classify point (500,1) as **before and after scaling** to [0,1] per feature?

3. **Handling Missing Values:** How can you modify K-NN to handle missing features in a test point?

4. **High-dimensional Data:** Why can K-NN still work well for images even with thousands of pixels?


In [None]:
#Todo
# 1
import numpy as np
import matplotlib.pyplot as plt

# dataset from the prompt
X = np.array([[1,2],[1,4],[5,4],   # positive
              [3,1],[3,2]])        # negative
y = np.array([+1,+1,+1,-1,-1], dtype=int)

def grid_1nn_plot(X, y, k=1, step=0.05):
    x1_min, x1_max = X[:,0].min()-1, X[:,0].max()+1
    x2_min, x2_max = X[:,1].min()-1, X[:,1].max()+1
    xs = np.arange(x1_min, x1_max, step)
    ys = np.arange(x2_min, x2_max, step)
    xx, yy = np.meshgrid(xs, ys)
    Z = np.zeros_like(xx, dtype=int)
    for i in range(xx.shape[0]):
        for j in range(xx.shape[1]):
            q = np.array([xx[i,j], yy[i,j]])
            Z[i,j] = knn_predict(X, y, q, k=k)
    # plot
    plt.figure()
    plt.contourf(xx, yy, Z, alpha=0.2, levels=[-1,0,1])
    plt.scatter(X[y==1,0], X[y==1,1], label="+1")
    plt.scatter(X[y==-1,0], X[y==-1,1], label="-1")
    plt.legend(); plt.title("1-NN decision regions"); plt.xlabel("x1"); plt.ylabel("x2")
    plt.show()

grid_1nn_plot(X, y, k=1)

test_points = np.array([[2,3],[4,3]])
for q in test_points:
    print("q=", q, " -> 1-NN:", knn_predict(X, y, q, k=1))

# 2
X2_pos = np.array([[100,2],[100,4],[500,4]])
X2_neg = np.array([[300,1],[300,2]])
X2 = np.vstack([X2_pos, X2_neg])
y2 = np.array([+1,+1,+1,-1,-1], dtype=int)
q = np.array([500,1])

# before scaling
pred_before = knn_predict(X2, y2, q, k=1)

# after scaling to [0,1] per feature
mins = X2.min(axis=0); maxs = X2.max(axis=0)
def minmax_scale(A): return (A - mins) / (maxs - mins + 1e-12)
pred_after = knn_predict(minmax_scale(X2), y2, minmax_scale(q), k=1)

print("P2(2) (500,1): before scaling ->", pred_before, "; after scaling ->", pred_after)

# 3
def knn_predict_with_mask(X_train, y_train, x_query, mask, k=1):
    # 1 for present feature, 0 for missing
    M = mask.astype(float)
    diffs = (X_train - x_query[None,:]) * M[None,:]
    dists = np.linalg.norm(diffs, axis=1) / (M.sum() + 1e-12)  # normalize by #present dims
    nn_idx = np.argpartition(dists, kth=k-1)[:k]
    votes = y_train[nn_idx].sum()
    return 1 if votes >= 0 else -1

# example: query with x2 missing
mask = np.array([1,0], dtype=int)
q_missing = np.array([2.5, 0.0])  # second dim ignored by mask
print("P2(3) with missing x2 ->", knn_predict_with_mask(X, y, q_missing, mask, k=3))

# 4
rng = np.random.default_rng(0)
X_high = np.hstack([X + rng.normal(scale=0.05, size=X.shape) for _ in range(100)])  # ~200D
from numpy.linalg import svd
Xc = X_high - X_high.mean(axis=0, keepdims=True)
U,S,Vt = svd(Xc, full_matrices=False)
X_pca2 = U[:, :2] * S[:2]     # top-2 PCs scores
q_high = np.hstack([np.array([2.5,3.0]) for _ in range(100)])
q_high = q_high + rng.normal(scale=0.05, size=q_high.shape)
# project query to PC space
q_pca2 = (q_high - X_high.mean(axis=0)) @ Vt[:2].T
print("P2(4) PCA-then-1NN ->", knn_predict(X_pca2, y, q_pca2, k=1))


### Problem 3: Part 1

You are given a fully trained Perceptron model with weight vector **w**, along with training set **D_TR** and test set **D_TE**.

1. Your co-worker suggests evaluating $h(x) = sign(w \cdot x)$ for every $(x, y)$ in D_TR and D_TE. Does this help determine whether test error is higher than training error?
2. Why is there no need to compute training error explicitly for the Perceptron algorithm?

In [None]:
#Todo
import numpy as np

def sign(z: float) -> int:
    return 1 if z >= 0 else -1  # tie -> +1

def predict_linear(w: np.ndarray, x: np.ndarray) -> int:
    return sign(float(w @ x))

def training_error(X: np.ndarray, y: np.ndarray, w: np.ndarray) -> float:
    # X: shape (n, d) [augment with 1 if you want bias inside w]
    # y: shape (n,) in {-1, +1}
    # w: shape (d,)
    
    preds = np.array([predict_linear(w, xi) for xi in X])
    wrong = np.count_nonzero(preds != y)
    return wrong / len(y)


### Problem 3: Two-point 2D Dataset (Part 2)

Run the Perceptron algorithm **by hand or in code** on the following data:

1. Positive class: (10, -2)
2. Negative class: (12, 2)

Start with $w_0 = (0, 0)$ and a learning rate of 1.

- Compute how many updates are required until convergence.
- Write down the sequence of $w_i$ vectors.

In [None]:
import numpy as np

X = np.array([[10., -2.],   # positive
              [12.,  2.]])  # negative
y = np.array([+1, -1])
w = np.array([0., 0.])

sequence = [w.copy()]
changed = True
steps = 0
while changed and steps < 50:
    changed = False
    for xi, yi in zip(X, y):
        margin = yi * float(w @ xi)
        if margin <= 0:
            w = w + yi * xi
            sequence.append(w.copy())
            changed = True
    steps += 1

print("Weight sequence:", sequence)
print("Total updates:", len(sequence)-1)


### Problem 4: Reconstructing the Weight Vector

Given the log of Perceptron updates:

| x | y | count |
|---|---|--------|
| (0, 0, 0, 0, 4) | +1 | 2 |
| (0, 0, 6, 5, 0) | +1 | 1 |
| (3, 0, 0, 0, 0) | -1 | 1 |
| (0, 9, 3, 6, 0) | -1 | 1 |
| (0, 1, 0, 2, 5) | -1 | 1 |

Assume learning rate = 1 and initial weight $w_0 = (0, 0, 0, 0, 0)$.

Compute the final weight vector after all updates.

In [None]:
import numpy as np

# Given update log: (x, y, count)
updates = [
    ((0, 0, 0, 0, 4), +1, 2),
    ((0, 0, 6, 5, 0), +1, 1),
    ((3, 0, 0, 0, 0), -1, 1),
    ((0, 9, 3, 6, 0), -1, 1),
    ((0, 1, 0, 2, 5), -1, 1),
]

w = np.zeros(5, dtype=int)
for x, y, c in updates:
    w += y * c * np.array(x, dtype=int)

print("Final weight vector w =", w.tolist())

### Problem 5: Visualizing Perceptron Convergence

Implement a Perceptron on a small 2D dataset with positive and negative examples.

- Plot the data points.
- After each update, visualize the decision boundary.
- Show how it converges to a stable separator.

In [None]:
#Todo
import numpy as np
import matplotlib.pyplot as plt
from pathlib import Path

# 1
X_pos = np.array([[2.0, 3.0],
                  [3.0, 2.0],
                  [4.0, 3.0]])
X_neg = np.array([[0.0, 0.5],
                  [1.0, 1.0],
                  [1.0, 0.0]])
X = np.vstack([X_pos, X_neg])
y = np.array([+1]*len(X_pos) + [-1]*len(X_neg), dtype=int)

# Augment features to include bias: x' = (x1, x2, 1)
Xa = np.hstack([X, np.ones((X.shape[0], 1))])

# 2
w = np.zeros(3, dtype=float)  # (w1, w2, b)
history = [] # store weights after each update

max_epochs = 50
for _ in range(max_epochs):
    updated = False
    for xi, yi in zip(Xa, y):
        if yi * float(w @ xi) <= 0:
            w = w + yi * xi
            history.append(w.copy())
            updated = True
    if not updated:
        break

print("Number of updates:", len(history))
print("Final w (w1, w2, b):", w)

# 3
out_dir = Path("perceptron_frames")
out_dir.mkdir(parents=True, exist_ok=True)

def plot_points(ax):
    ax.scatter(X_pos[:, 0], X_pos[:, 1], label="+1")
    ax.scatter(X_neg[:, 0], X_neg[:, 1], label="-1")
    ax.set_xlabel("x1")
    ax.set_ylabel("x2")
    ax.set_title("Perceptron update step")
    ax.legend(loc="best")
    # fix view so boundaries are comparable across frames
    ax.set_xlim(-1, 5)
    ax.set_ylim(-0.5, 4)

def plot_boundary(ax, w_vec):
    # Boundary: w1*x + w2*y + b = 0  ->  y = -(w1/w2)*x - b/w2
    xs = np.linspace(-1, 5, 200)
    if abs(w_vec[1]) < 1e-12:
        # vertical line x = -b/w1
        x0 = -w_vec[2] / (w_vec[0] if abs(w_vec[0]) > 1e-12 else 1e-12)
        ax.axvline(x=x0)
    else:
        ys = - (w_vec[0] / w_vec[1]) * xs - (w_vec[2] / w_vec[1])
        ax.plot(xs, ys)

frame_paths = []
for i, wi in enumerate(history, start=1):
    fig, ax = plt.subplots()
    plot_points(ax)
    plot_boundary(ax, wi)
    p = out_dir / f"update_{i:02d}.png"
    fig.savefig(p, bbox_inches="tight")
    plt.close(fig)
    frame_paths.append(str(p))

print("Saved frames:", frame_paths[:3], "... total:", len(frame_paths))