# Homework 1 (Programming): From Linear Regression (Scratch) to a 3-Layer MLP (Scratch)



- Plotting: `matplotlib`
- You may use GPU if available.

1. Completed TODOs (code).
2. Plots + short written answers in the designated markdown cells.
3. export notebook as `lastname_firstname_hw_mlp.pdf` and submit to gradescope.


## 0. Setup
Run the following cell first. It defines imports and small utilities used throughout the homework.

In [1]:
# === Imports ===
import math
import time
from typing import Dict, Tuple, Callable, Optional, List

import torch
from torch.utils.data import Dataset, DataLoader

import matplotlib.pyplot as plt

# === Device ===
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)

# === Reproducibility ===
def set_seed(seed: int = 0):
    import random
    random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)

set_seed(0)

# === Plot helper ===
def plot_curves(curves: Dict[str, List[float]], title: str = "", xlabel: str = "step", ylabel: str = "value"):
    plt.figure()
    for k, v in curves.items():
        plt.plot(v, label=k)
    plt.xlabel(xlabel)
    plt.ylabel(ylabel)
    if title:
        plt.title(title)
    plt.legend()
    plt.show()


Using device: cpu


## 1. Data: Synthetic Regression + Classification

You will use the following datasets to test your models. Feel free to modify the data functions **only if you clearly explain why**.

In [None]:
class SyntheticRegression(Dataset):
    '''
    Nonlinear regression dataset:
        y = sin(x0) + x1^2 - 0.5*x2 + noise
    '''
    def __init__(self, n=4000, d=3, noise_std=0.1):
        assert d >= 3, "d must be >= 3 for this formula"
        X = torch.randn(n, d)
        y = torch.sin(X[:, 0]) + (X[:, 1] ** 2) - 0.5 * X[:, 2]
        y = y + noise_std * torch.randn_like(y)
        self.X = X.float()
        self.y = y.float().reshape(-1, 1)

    def __len__(self):
        return self.X.shape[0]

    def __getitem__(self, i):
        return self.X[i], self.y[i]


class SyntheticBinaryClassification(Dataset):
    '''
    Nonlinear binary classification dataset:
        logits = sin(x0) + x1^2 - 0.25*x2 + noise
        y = 1[logits > 0]
    '''
    def __init__(self, n=4000, d=3, noise_std=0.2):
        assert d >= 3, "d must be >= 3 for this formula"
        X = torch.randn(n, d)
        logits = torch.sin(X[:, 0]) + (X[:, 1] ** 2) - 0.25 * X[:, 2]
        logits = logits + noise_std * torch.randn_like(logits)
        y = (logits > 0).float().reshape(-1, 1)
        self.X = X.float()
        self.y = y.float()

    def __len__(self):
        return self.X.shape[0]

    def __getitem__(self, i):
        return self.X[i], self.y[i]


def make_loaders(dataset: Dataset, batch_size=64, val_ratio=0.2, shuffle=True):
    n = len(dataset)
    n_val = int(n * val_ratio)
    n_train = n - n_val
    train_ds, val_ds = torch.utils.data.random_split(
        dataset,
        [n_train, n_val],
        generator=torch.Generator().manual_seed(0),
    )
    train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=shuffle)
    val_loader = DataLoader(val_ds, batch_size=batch_size, shuffle=False)
    return train_loader, val_loader


# quick sanity check
train_loader, val_loader = make_loaders(SyntheticRegression(n=512, d=3), batch_size=64)
Xb, yb = next(iter(train_loader))
print("Regression batch shapes:", Xb.shape, yb.shape)

train_loader, val_loader = make_loaders(SyntheticBinaryClassification(n=512, d=3), batch_size=64)
Xb, yb = next(iter(train_loader))
print("Classification batch shapes:", Xb.shape, yb.shape)


Regression batch shapes: torch.Size([64, 3]) torch.Size([64, 1])
Classification batch shapes: torch.Size([64, 3]) torch.Size([64, 1])


# Part 1 — Refactor Training into Reusable Functions (10 pts)

### Task
Implement:
- `train_epoch(model, dataloader, loss_fn, updater)`
- `evaluate(model, dataloader, loss_fn, metric_fn=None)`
- `fit(...)` that calls the above and returns histories

**Required:** keep `loss_history` as **per-step** loss (like your linear regression notebook).


In [None]:
# ===== Part 1 TODO =====

@torch.no_grad()
def evaluate(model: Callable, dataloader: DataLoader, loss_fn: Callable, metric_fn: Optional[Callable] = None):
    '''
    Returns:
        avg_loss: float
        avg_metric: float or None
    '''
    model_mode = getattr(model, "train_mode", None)
    if callable(model_mode):
        model.train_mode(False)

    total_loss, total_metric, n = 0.0, 0.0, 0
    for X, y in dataloader:
        X, y = X.to(device), y.to(device)
        yhat = model(X)
        loss = loss_fn(yhat, y).item()
        bs = X.shape[0]
        total_loss += loss * bs
        n += bs
        if metric_fn is not None:
            total_metric += metric_fn(yhat, y) * bs

    avg_loss = total_loss / max(n, 1)
    avg_metric = (total_metric / max(n, 1)) if metric_fn is not None else None
    return avg_loss, avg_metric


def train_epoch(model: Callable, dataloader: DataLoader, loss_fn: Callable, updater: Callable, log_every: int = 50):
    '''
    Runs one training epoch.
    Returns:
        loss_history_step: list of float (per step)
    '''
    model_mode = getattr(model, "train_mode", None)
    if callable(model_mode):
        model.train_mode(True)

    loss_history = []
    for step, (X, y) in enumerate(dataloader):
        X, y = X.to(device), y.to(device)
        yhat = model(X)
        loss = loss_fn(yhat, y)

        # TODO: zero grads
        # TODO: backward
        # TODO: updater()

        raise NotImplementedError("Implement the training step (zero_grad/backward/update).")

        # loss_history.append(loss.item())
        # if (step + 1) % log_every == 0:
        #     print(f"  step {step+1:4d} | loss {loss_history[-1]:.4f}")

    return loss_history


def fit(
    model: Callable,
    train_loader: DataLoader,
    val_loader: DataLoader,
    loss_fn: Callable,
    updater: Callable,
    epochs: int = 10,
    metric_fn: Optional[Callable] = None,
):
    '''
    Returns:
        history dict with keys: train_loss_step, train_loss_epoch, val_loss_epoch, val_metric_epoch
    '''
    train_loss_step = []
    train_loss_epoch = []
    val_loss_epoch = []
    val_metric_epoch = []

    for ep in range(1, epochs + 1):
        step_losses = train_epoch(model, train_loader, loss_fn, updater)
        train_loss_step.extend(step_losses)
        train_loss_epoch.append(sum(step_losses) / max(len(step_losses), 1))

        vloss, vmetric = evaluate(model, val_loader, loss_fn, metric_fn=metric_fn)
        val_loss_epoch.append(vloss)
        val_metric_epoch.append(vmetric)

        msg = f"Epoch {ep:02d} | train_loss {train_loss_epoch[-1]:.4f} | val_loss {vloss:.4f}"
        if metric_fn is not None and vmetric is not None:
            msg += f" | val_metric {vmetric:.4f}"
        print(msg)

    return {
        "train_loss_step": train_loss_step,
        "train_loss_epoch": train_loss_epoch,
        "val_loss_epoch": val_loss_epoch,
        "val_metric_epoch": val_metric_epoch,
    }


# Part 2 — Implement a 3-Layer MLP for Regression (20 pts)

### Model
- Two hidden layers + ReLU
- Output layer is linear (for regression)

### Tasks
1. Implement `init_params_mlp(d, h1, h2)` (returns a dict of tensors with `requires_grad=True`).
2. Implement `relu(x)`.
3. Implement `MLP3` forward.
4. Implement MSE loss from scratch.
5. Implement SGD updater from scratch.

### Report
- Plot per-step training loss for **Linear Model vs MLP3** on the nonlinear regression dataset.
- Explain (2–4 sentences) why MLP improves.


In [None]:
# ===== Part 1 TODO =====

def relu(x: torch.Tensor) -> torch.Tensor:
    # TODO: implement ReLU without nn.ReLU
    raise NotImplementedError

def mse_loss(yhat: torch.Tensor, y: torch.Tensor) -> torch.Tensor:
    # TODO: implement mean squared error
    raise NotImplementedError

def init_params_mlp(d: int, h1: int, h2: int, seed: int = 0, scale: float = 0.01) -> Dict[str, torch.Tensor]:
    '''
    Baseline init: Normal(0, scale)
    Return dict with W1,b1,W2,b2,W3,b3 on correct device and requires_grad=True.
    '''
    set_seed(seed)
    # TODO: implement
    raise NotImplementedError

class MLP3:
    def __init__(self, d: int, h1: int, h2: int, out_dim: int = 1):
        self.params = init_params_mlp(d, h1, h2, seed=0, scale=0.01)
        self._train = True

    def train_mode(self, is_train: bool = True):
        self._train = is_train

    def __call__(self, X: torch.Tensor) -> torch.Tensor:
        P = self.params
        # TODO: implement forward:
        # H1 = relu(X @ P["W1"] + P["b1"])
        # H2 = relu(H1 @ P["W2"] + P["b2"])
        # yhat = H2 @ P["W3"] + P["b3"]
        raise NotImplementedError

    def zero_grad(self):
        for p in self.params.values():
            if p.grad is not None:
                p.grad.zero_()

def sgd_updater(params: Dict[str, torch.Tensor], lr: float, weight_decay: float = 0.0):
    '''
    Returns a function updater() that applies SGD to all params.
    If weight_decay > 0, include L2 regularization (Part 3A).
    '''
    def step():
        # TODO: implement SGD update: p -= lr * grad (and + weight_decay * p if enabled)
        raise NotImplementedError
    return step



class LinearModel:
    def __init__(self, d: int):
        self.W = (0.01 * torch.randn(d, 1, device=device)).requires_grad_(True)
        self.b = torch.zeros(1, device=device, requires_grad=True)
        self._train = True

    def train_mode(self, is_train: bool = True):
        self._train = is_train

    def __call__(self, X: torch.Tensor) -> torch.Tensor:
        return X @ self.W + self.b

    def zero_grad(self):
        if self.W.grad is not None: self.W.grad.zero_()
        if self.b.grad is not None: self.b.grad.zero_()


def linear_mse(yhat, y):
    return ((yhat - y) ** 2).mean()


def linear_sgd_updater(model: LinearModel, lr: float):
    def step():
        with torch.no_grad():
            model.W -= lr * model.W.grad
            model.b -= lr * model.b.grad
    return step


In [None]:
# === Regression experiment runner ===
# After TODOs are done, run this cell.

set_seed(0)
d = 3
train_loader, val_loader = make_loaders(SyntheticRegression(n=4000, d=d, noise_std=0.1), batch_size=64)

lin = LinearModel(d)
lin_updater = linear_sgd_updater(lin, lr=0.05)

mlp = MLP3(d=d, h1=64, h2=32, out_dim=1)
mlp_updater = sgd_updater(mlp.params, lr=0.05)

print("\n--- Linear ---")
hist_lin = fit(lin, train_loader, val_loader, loss_fn=linear_mse, updater=lin_updater, epochs=20)

print("\n--- MLP3 ---")
hist_mlp = fit(mlp, train_loader, val_loader, loss_fn=mse_loss, updater=mlp_updater, epochs=20)

plot_curves({"linear": hist_lin["train_loss_step"], "mlp": hist_mlp["train_loss_step"]},
            title="Regression: loss vs step", ylabel="MSE")


# Part 3 — Initialization Experiments (20 pts)


Compare:
- Tiny Gaussian
- Xavier
- He/Kaiming 

### Report
- Provide a small table (can be markdown) with best validation metric for each setting.
- 3–6 sentences: What helped most and why?


In [None]:


def init_params_xavier(d: int, h1: int, h2: int, seed: int = 0) -> Dict[str, torch.Tensor]:
    # TODO: implement Xavier init for W1,W2,W3; zeros for biases
    raise NotImplementedError

def init_params_he(d: int, h1: int, h2: int, seed: int = 0) -> Dict[str, torch.Tensor]:
    # TODO: implement He/Kaiming init for ReLU layers
    raise NotImplementedError


class MLP3_DropoutInit(MLP3):
    def __init__(self, d: int, h1: int, h2: int, out_dim: int = 1, p_drop: float = 0.0, init: str = "gauss"):
        self._train = True
        self.p_drop = p_drop
        if init == "gauss":
            self.params = init_params_mlp(d, h1, h2, seed=0, scale=0.01)
        elif init == "xavier":
            self.params = init_params_xavier(d, h1, h2, seed=0)
        elif init == "he":
            self.params = init_params_he(d, h1, h2, seed=0)
        else:
            raise ValueError("init must be one of: gauss, xavier, he")

    def __call__(self, X: torch.Tensor) -> torch.Tensor:
        P = self.params
        # TODO: forward with dropout on H1 and/or H2 (training only)
        raise NotImplementedError


In [None]:
# === Part 3 experiment  (fill after TODOs) ===


inits = ["gauss", "xavier", "he"]

results = []
for init in inits:
            set_seed(0)
            train_loader, val_loader = make_loaders(SyntheticBinaryClassification(n=4000, d=3), batch_size=64)
            m = MLP3_Init(d=3, h1=64, h2=32,  init=init)
            up = sgd_updater(m.params, lr=0.05)
            hist = fit(m, train_loader, val_loader, loss_fn=bce_with_logits_loss, updater=up,
                       epochs=10, metric_fn=accuracy_from_logits)
            best_acc = max([v for v in hist["val_metric_epoch"] if v is not None])
            results.append((wd, p, init, best_acc))

results_sorted = sorted(results, key=lambda x: x[-1], reverse=True)
results_sorted[:10]


## Part 3 Report (write here)

Create a markdown table like:

| weight_decay | dropout_p | init | best_val_acc |
|---:|---:|---|---:|
| ... | ... | ... | ... |

Then discuss what helped most.


In [None]:
# === Tiny gradient check example (run after your Part 1/2 are working) ===
# set_seed(0)
# d = 3
# X = torch.randn(8, d, device=device)
# y = (torch.randn(8, 1, device=device) > 0).float()

# m_tiny = MLP3(d=d, h1=4, h2=3, out_dim=1)

# result = finite_difference_check(m_tiny, bce_with_logits_loss, X, y, param_name="W1", idx=(0, 0), eps=1e-4)
# print(result)

# Expected: rel_error ~ 1e-2 or smaller (often much smaller).
