In [8]:
# Create sample training and validation data for the optimizer comparison
import numpy as np

# Generate a simple synthetic classification dataset without sklearn
np.random.seed(42)

# Dataset dimensions: 1000 samples, 10 features, 3 classes
n_samples, n_features, n_classes = 1000, 10, 3

# Standard-normal feature matrix
X = np.random.randn(n_samples, n_features)

# Label every row at once. The rules are checked in priority order:
#   class 0 when x0 + x1 > 0.5, otherwise class 1 when x2 - x3 > 0, otherwise class 2.
# Labels are kept as floats, matching an array initialised with np.zeros.
rule_class0 = X[:, 0] + X[:, 1] > 0.5
rule_class1 = X[:, 2] - X[:, 3] > 0
y = np.where(rule_class0, 0.0, np.where(rule_class1, 1.0, 2.0))

# First 800 rows train the model, the remaining 200 validate it
train_size = 800
X_train, X_val = X[:train_size], X[train_size:]
y_train, y_val = y[:train_size], y[train_size:]

# Standardise with statistics taken from the training split only, so no
# information from the validation rows leaks into the scaling.
X_train_mean = X_train.mean(axis=0)
X_train_std = X_train.std(axis=0) + 1e-8  # small offset guards against division by zero
X_train = (X_train - X_train_mean) / X_train_std
X_val = (X_val - X_train_mean) / X_train_std

# Convert labels to one-hot encoding
def to_one_hot(y, num_classes):
    """Encode a 1-D array of class labels as a one-hot matrix.

    Args:
        y: NumPy array of class labels; values are cast to int before use.
        num_classes: Number of columns in the encoded matrix.

    Returns:
        Float array of shape (len(y), num_classes) holding a single 1 per row,
        in the column given by that row's label.
    """
    row_idx = np.arange(len(y))
    col_idx = y.astype(int)
    encoded = np.zeros((len(y), num_classes))
    encoded[row_idx, col_idx] = 1
    return encoded

# One-hot encode both label vectors with the same class count
Y_train, Y_val = (to_one_hot(labels, n_classes) for labels in (y_train, y_val))

# Report the resulting array shapes as a quick sanity check
print(f"Training data shape: X_train={X_train.shape}, Y_train={Y_train.shape}")
print(f"Validation data shape: X_val={X_val.shape}, Y_val={Y_val.shape}")
print("Data arrays created successfully!")

Training data shape: X_train=(800, 10), Y_train=(800, 3)
Validation data shape: X_val=(200, 10), Y_val=(200, 3)
Data arrays created successfully!


In [9]:
# Simple Neural Network Model for the optimizer comparison
class SimpleMLPModel:
    """Two-layer perceptron (ReLU hidden layer, softmax output) written in NumPy.

    Trainable arrays are stored in the ``params`` dict under the keys
    'W1', 'b1', 'W2' and 'b2'; ``backward`` returns gradients under the same keys.
    """

    def __init__(self, in_dim=10, hidden_dim=64, out_dim=3):
        """Store the layer sizes and draw small random initial weights."""
        self.in_dim, self.hidden_dim, self.out_dim = in_dim, hidden_dim, out_dim

        # W1 is sampled before W2 so a fixed global seed reproduces the same weights.
        hidden_weights = np.random.randn(in_dim, hidden_dim) * 0.1
        output_weights = np.random.randn(hidden_dim, out_dim) * 0.1
        self.params = {
            'W1': hidden_weights,
            'b1': np.zeros((1, hidden_dim)),
            'W2': output_weights,
            'b2': np.zeros((1, out_dim)),
        }

    def relu(self, x):
        """Element-wise rectified linear unit."""
        return np.maximum(0, x)

    def softmax(self, x):
        """Row-wise softmax; the row maximum is subtracted first for numerical stability."""
        shifted = x - np.max(x, axis=1, keepdims=True)
        exp_x = np.exp(shifted)
        return exp_x / np.sum(exp_x, axis=1, keepdims=True)

    def forward(self, X):
        """Run the network on ``X``.

        Returns:
            (probs, cache): the softmax output and a dict with the input and every
            intermediate ('X', 'z1', 'a1', 'z2', 'a2') needed by ``backward``.
        """
        weights_1, bias_1 = self.params['W1'], self.params['b1']
        weights_2, bias_2 = self.params['W2'], self.params['b2']

        hidden_pre = np.dot(X, weights_1) + bias_1
        hidden_act = self.relu(hidden_pre)
        logits = np.dot(hidden_act, weights_2) + bias_2
        probs = self.softmax(logits)

        cache = {'X': X, 'z1': hidden_pre, 'a1': hidden_act, 'z2': logits, 'a2': probs}
        return probs, cache

    def compute_loss(self, probs, Y_true):
        """Mean cross-entropy between predicted probabilities and one-hot targets."""
        batch_size = Y_true.shape[0]
        # 1e-15 keeps log() finite when a probability is exactly zero
        neg_log_probs = -np.log(probs + 1e-15)
        return np.sum(neg_log_probs * Y_true) / batch_size

    def backward(self, cache, Y_true):
        """Backpropagate the cross-entropy loss through both layers.

        Args:
            cache: Dict produced by ``forward``.
            Y_true: One-hot targets with one row per cached sample.

        Returns:
            Dict of batch-averaged gradients keyed 'W1', 'b1', 'W2', 'b2'.
        """
        batch_size = Y_true.shape[0]
        inputs, hidden_pre = cache['X'], cache['z1']
        hidden_act, probs = cache['a1'], cache['a2']

        # Softmax combined with cross-entropy gives the simple (probs - targets) error
        output_error = probs - Y_true
        grad_w2 = np.dot(hidden_act.T, output_error) / batch_size
        grad_b2 = np.sum(output_error, axis=0, keepdims=True) / batch_size

        # Push the error back through W2, then mask units whose ReLU was inactive
        hidden_error = np.dot(output_error, self.params['W2'].T) * (hidden_pre > 0)
        grad_w1 = np.dot(inputs.T, hidden_error) / batch_size
        grad_b1 = np.sum(hidden_error, axis=0, keepdims=True) / batch_size

        return {'W1': grad_w1, 'b1': grad_b1, 'W2': grad_w2, 'b2': grad_b2}

    def predict(self, X):
        """Return the index of the most probable class for every row of ``X``."""
        return np.argmax(self.forward(X)[0], axis=1)

# Create the model instance
# Size the input layer from the training features; 64 hidden units, 3 output classes
model = SimpleMLPModel(X_train.shape[1], 64, 3)
print("Model created successfully!")

Model created successfully!


In [10]:
# ---------- Appended cell: Optimizers + 1000-epoch training & comparison ----------
# Paste this cell at the end of your backpropagation.ipynb (after your network, forward/backward, and data).

import numpy as np
import copy
import warnings
# Installs a global "ignore" filter, so every warning raised through the `warnings`
# module after this point is suppressed.
# NOTE(review): this presumably also hides NumPy RuntimeWarnings (overflow, invalid
# value) during training - confirm that silencing them is intended.
warnings.filterwarnings("ignore")

# ----------------- Unified Optimizer -----------------
class Optimizer:
    """
    Unified Optimizer supporting 'sgd', 'momentum', and 'adagrad'.
    lr_decay is applied with formula: lr = lr0 / (1 + lr_decay * epoch)
    NOTE: According to the hint, lr decay is computed BEFORE FP/BP in each epoch.

    Args:
        params: Dict of parameter arrays; only used to size the per-parameter state.
        optimizer_type: One of SUPPORTED_TYPES (case-insensitive).
        lr: Initial learning rate (lr0 in the decay formula).
        lr_decay: Decay coefficient; 0.0 keeps the learning rate constant.
        momentum: Velocity retention factor, used by 'momentum' only.
        epsilon: Small constant added to the Adagrad denominator.

    Raises:
        ValueError: If optimizer_type is not a supported name.
    """
    SUPPORTED_TYPES = ('sgd', 'momentum', 'adagrad')

    def __init__(self, params, optimizer_type='sgd', lr=0.1, lr_decay=0.0, momentum=0.9, epsilon=1e-8):
        self.optimizer_type = optimizer_type.lower()
        # Fail fast: reject a misspelled optimizer name here rather than on the
        # first step(), which only happens after a full forward/backward pass.
        if self.optimizer_type not in self.SUPPORTED_TYPES:
            raise ValueError("Unknown optimizer_type: " + str(self.optimizer_type))
        self.lr0 = lr
        self.lr_decay = lr_decay
        self.momentum = momentum
        self.epsilon = epsilon
        # state initialization (match params keys)
        self.state_v = {k: np.zeros_like(v) for k, v in params.items()}   # momentum velocity
        self.state_ss = {k: np.zeros_like(v) for k, v in params.items()}  # Adagrad sum of squared grads

    def get_lr(self, epoch):
        """Return the decayed learning rate for the given epoch index."""
        return self.lr0 / (1.0 + self.lr_decay * epoch)

    def step(self, params, grads, epoch):
        """Update ``params`` in place from ``grads`` using the configured rule.

        Args:
            params: Dict of parameter arrays, modified in place.
            grads: Dict of gradients holding an entry for every key of ``params``.
            epoch: Epoch index used to compute the decayed learning rate.

        Raises:
            ValueError: If ``optimizer_type`` was changed to an unsupported name
                after construction.
        """
        lr = self.get_lr(epoch)
        if self.optimizer_type == 'sgd':
            for k in params.keys():
                params[k] -= lr * grads[k]
        elif self.optimizer_type == 'momentum':
            for k in params.keys():
                # The velocity accumulates a decaying sum of past (negative) gradient steps
                self.state_v[k] = self.momentum * self.state_v[k] - lr * grads[k]
                params[k] += self.state_v[k]
        elif self.optimizer_type == 'adagrad':
            for k in params.keys():
                # Per-element step size shrinks as squared gradients accumulate
                self.state_ss[k] += grads[k] * grads[k]
                adjusted_lr = lr / (np.sqrt(self.state_ss[k]) + self.epsilon)
                params[k] -= adjusted_lr * grads[k]
        else:
            raise ValueError("Unknown optimizer_type: " + str(self.optimizer_type))

# ----------------- Utilities -----------------
def compute_accuracy(model, X, Y_onehot):
    """Return the fraction of rows whose predicted class matches the one-hot target.

    Uses ``model.predict(X)`` when the model provides it; otherwise falls back to the
    argmax of the probabilities returned as the first item of ``model.forward(X)``.
    """
    if not hasattr(model, 'predict'):
        probabilities, _ = model.forward(X)
        predicted = np.argmax(probabilities, axis=1)
    else:
        predicted = model.predict(X)
    expected = np.argmax(Y_onehot, axis=1)
    return np.mean(predicted == expected)

def detect_stabilization(loss_history, window=20, tol=1e-5):
    """Check whether the loss has flattened over the most recent epochs.

    Compares the mean of the last ``window`` losses with the mean of the ``window``
    losses just before them.

    Returns:
        The index where the last window starts when the two means differ by less
        than ``tol``; None when they differ more or fewer than 2 * window losses exist.
    """
    n_recorded = len(loss_history)
    if n_recorded < window * 2:
        return None
    recent_mean = np.mean(loss_history[-window:])
    earlier_mean = np.mean(loss_history[-2*window:-window])
    return n_recorded - window if abs(recent_mean - earlier_mean) < tol else None

# ----------------- Find / prepare model and data in notebook namespace -----------------
# This cell expects the notebook to already define:
# - training data arrays: X_train, Y_train (one-hot), X_val, Y_val (one-hot)
# - a model instance 'model' OR a model class in globals() that can be instantiated
# Stop early with a clear message when any of the four data arrays is missing
if not {'X_train', 'Y_train', 'X_val', 'Y_val'} <= set(globals()):
    raise RuntimeError("Training/validation arrays X_train, Y_train, X_val, Y_val must exist in the notebook. Define them and re-run this cell.")

# Try to find a ready model instance
_base_model = None
if 'model' in globals():
    _base_model = model
else:
    # attempt to instantiate a model class (try common names)
    # Iterate over a snapshot: the loop targets `name` and `obj` are module-level
    # names, so binding them adds keys to globals(). Looping over the live dict view
    # would raise "RuntimeError: dictionary changed size during iteration".
    for name, obj in list(globals().items()):
        if isinstance(obj, type) and any(key in name.lower() for key in ('mlp','network','net','model')):
            try:
                # try common constructor signatures (in_dim, hidden_dim, out_dim) - fallback if not available
                _base_model = obj(in_dim=X_train.shape[1], hidden_dim=64, out_dim=Y_train.shape[1])
                print(f"Instantiated model from class: {name}")
                break
            except Exception:
                # try empty constructor
                try:
                    _base_model = obj()
                    print(f"Instantiated model from class (no-arg): {name}")
                    break
                except Exception:
                    # best-effort discovery: this candidate cannot be built, try the next one
                    continue

if _base_model is None:
    raise RuntimeError("No model instance found. Either set a `model` variable in the notebook or ensure a Model class with an instantiable constructor exists.")

# Verify model has params dict access. If not, try to build one from W/b attributes.
if not hasattr(_base_model, 'params'):
    # Collect whichever of the conventional weight attributes the model exposes
    params = {
        attr: getattr(_base_model, attr)
        for attr in ('W1', 'b1', 'W2', 'b2', 'weights', 'bias')
        if hasattr(_base_model, attr)
    }
    if not params:
        raise RuntimeError("Model does not expose `params` dict or named weight attributes (W1, b1, W2, b2). Edit model to expose params or set model.params = {...}).")
    # Attach the collected arrays as model.params so the optimizer can use them
    _base_model.params = params

# ----------------- Training function -----------------
def train_copy_with_optimizer(base_model, optimizer_name='sgd', lr=0.1, lr_decay=1e-4, momentum=0.9, epochs=1000, print_every=100):
    """Train a deep copy of ``base_model`` with full-batch updates and report the run.

    Reads the notebook-level arrays X_train, Y_train, X_val and Y_val. The model
    passed in is never modified; all updates are applied to the copy.

    Args:
        base_model: Model exposing ``params``, ``forward``, ``compute_loss`` and ``backward``.
        optimizer_name: Optimizer type forwarded to ``Optimizer``.
        lr: Initial learning rate.
        lr_decay: Learning-rate decay coefficient.
        momentum: Momentum factor forwarded to ``Optimizer``.
        epochs: Number of full-batch training iterations.
        print_every: A progress line is printed on the first epoch and then every
            ``print_every`` epochs.

    Returns:
        Dict with keys 'model' (the trained copy), 'loss_history', 'stabilize_epoch'
        (first value reported by ``detect_stabilization``, or None) and 'final_val_acc'.

    Raises:
        RuntimeError: If the copy has no ``params`` or ``forward`` does not return a 2-tuple.
    """
    trained = copy.deepcopy(base_model)
    if not hasattr(trained, 'params'):
        raise RuntimeError("Model copy missing params.")

    optimizer = Optimizer(trained.params, optimizer_type=optimizer_name, lr=lr, lr_decay=lr_decay, momentum=momentum)
    losses = []
    stable_from = None

    for epoch in range(epochs):
        # Learning rate for this epoch, computed before the forward/backward pass
        epoch_lr = optimizer.get_lr(epoch)

        # forward() must hand back (probs, cache); without the cache backprop is impossible
        forward_result = trained.forward(X_train)
        if not (isinstance(forward_result, tuple) and len(forward_result) == 2):
            raise RuntimeError("model.forward must return (probs, cache). Adjust your forward to return cache used by backward().")
        probs, cache = forward_result

        # The recorded loss is the one measured before this epoch's weight update
        loss = trained.compute_loss(probs, Y_train)
        losses.append(loss)

        gradients = trained.backward(cache, Y_train)
        optimizer.step(trained.params, gradients, epoch)

        # Keep only the first stabilization point that is detected
        if stable_from is None:
            stable_from = detect_stabilization(losses, window=20, tol=1e-5)

        should_report = (epoch + 1) % print_every == 0 or epoch == 0
        if should_report:
            acc_train = compute_accuracy(trained, X_train, Y_train)
            acc_val = compute_accuracy(trained, X_val, Y_val)
            print(f"[{optimizer_name}] Epoch {epoch+1}/{epochs} | lr={epoch_lr:.6f} | loss={loss:.6f} | train_acc={acc_train:.4f} | val_acc={acc_val:.4f}")

    return {
        'model': trained,
        'loss_history': losses,
        'stabilize_epoch': stable_from,
        'final_val_acc': compute_accuracy(trained, X_val, Y_val),
    }

# ----------------- Run experiments: compare two optimizers (you can pick any two) -----------------
# In this example we run Momentum vs Adagrad to compare - change the names if you prefer.
# One entry per optimizer run; every run starts from a copy of the same base model
experiments_to_run = [
    dict(name='momentum', lr=0.05, lr_decay=1e-4, momentum=0.9),
    dict(name='adagrad', lr=0.5, lr_decay=1e-4, momentum=0.0),
]

# Maps optimizer name -> result dict returned by train_copy_with_optimizer
results = {}
for exp in experiments_to_run:
    print("\n" + "="*60)
    print(f"Starting experiment: {exp['name']}")
    res = train_copy_with_optimizer(
        _base_model,
        optimizer_name=exp['name'],
        lr=exp['lr'],
        lr_decay=exp['lr_decay'],
        momentum=exp['momentum'],
        epochs=1000,
        print_every=100,
    )
    results[exp['name']] = res

# ----------------- Summarize comparison -----------------
# Print one table row per optimizer: stabilization epoch and final validation accuracy
print("\n" + "="*60)
print("Comparison summary (stabilize epoch = start of the last stable window):")
print("{:12s} | {:16s} | {:12s}".format("Optimizer","StabilizeEpoch","FinalValAcc"))
for k, v in results.items():
    # Show 'N/A' when the loss never met the stabilization criterion
    se = 'N/A' if v['stabilize_epoch'] is None else v['stabilize_epoch']
    print("{:12s} | {:16s} | {:.4f}".format(k, str(se), v['final_val_acc']))


Starting experiment: momentum
[momentum] Epoch 1/1000 | lr=0.050000 | loss=1.081990 | train_acc=0.4150 | val_acc=0.3800
[momentum] Epoch 100/1000 | lr=0.049510 | loss=0.108952 | train_acc=0.9775 | val_acc=0.9600
[momentum] Epoch 200/1000 | lr=0.049024 | loss=0.061799 | train_acc=0.9950 | val_acc=0.9750
[momentum] Epoch 300/1000 | lr=0.048548 | loss=0.042032 | train_acc=1.0000 | val_acc=0.9750
[momentum] Epoch 200/1000 | lr=0.049024 | loss=0.061799 | train_acc=0.9950 | val_acc=0.9750
[momentum] Epoch 300/1000 | lr=0.048548 | loss=0.042032 | train_acc=1.0000 | val_acc=0.9750
[momentum] Epoch 400/1000 | lr=0.048082 | loss=0.031271 | train_acc=1.0000 | val_acc=0.9850
[momentum] Epoch 500/1000 | lr=0.047624 | loss=0.024449 | train_acc=1.0000 | val_acc=0.9900
[momentum] Epoch 600/1000 | lr=0.047174 | loss=0.019783 | train_acc=1.0000 | val_acc=0.9900
[momentum] Epoch 400/1000 | lr=0.048082 | loss=0.031271 | train_acc=1.0000 | val_acc=0.9850
[momentum] Epoch 500/1000 | lr=0.047624 | loss=0.02