# Preamble

In [1]:
%matplotlib inline

# Import necessary libraries
import csv
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
from numpy import dot
from math import floor
import copy
from abc import ABC, abstractmethod
from typing import TypeVar, Generic
from scipy.stats import multivariate_normal


#-----------------------------------------------------------------------------


# !!!IMPORTANT!!!
Insert your details below. You should see a green checkmark.



In [2]:
# Input student details (replace these with actual input values)
student = {
    "name": "Shaaz Feerasta",                    # Replace with your name
    "email": "feerasta@ualberta.ca",       # Replace with your email
    "ccid": "feerasta",                    # Replace with your CCID
    "idnumber": 1704756,                  # Replace with your ID number
}

In [3]:
# Define the default and user-provided student dictionaries
def_student = {
    "name": "Shaaz Feerasta",
    "email": "feerasta@ualberta.ca",
    "ccid": "feerasta",
    "idnumber": 1704756
}


# Validation checks
assert set(def_student.keys()) == set(student.keys()),   "You don't have all the right entries! Make sure you have `name`, `email`, `ccid`, `idnumber`. ❌"
assert not any(value == "" for value in student.values()), "You haven't filled in all your details! No field should be empty. ❌"
assert all(isinstance(student[k], type(def_student[k])) for k in def_student),    "Your types seem to be off: `name::String`, `email::String`, `ccid::String`, `idnumber::Int`. ❌"
assert student["email"].endswith("@ualberta.ca"), "Your email must end with '@ualberta.ca'. ❌"

print(f"Welcome {student['name']}! ✅")


Welcome Shaaz Feerasta! ✅


# Models

## The model interface
- AbstractModel: This is an abstract type which is used to derive all the model types in this assignment.

- predict: This takes a matrix of samples and returns the prediction doing the proper data transforms.

- get_features: This transforms the features according to the non-linear transform of the model (which is the identity for linear).

- update_transform: This "trains" the transform. Using the data provided we update the PCA or Kernel prototypes used according to the strategy.

- get_linear_model: All models are based on a linear model with transformed features, and thus have a linear model.

- copy: This returns a new copy of the model.



	AbstractModel

Used as the root for all models in this notebook. We provide a helper `predict` function for `AbstractVectors` which transposes the features to a row vector. We also provide a default `update_transform` which does nothing.

In [5]:
class AbstractModel(ABC):
    @abstractmethod
    def predict(self, x):
        """
        Abstract method for prediction. Subclasses must implement this.
        """
        pass
    def update_transform(self, *args):
        """
        Default update_transform method which does nothing.
        """
        pass

In [6]:
# Helper function
def predict(model, x):
    """
    Helper predict function for AbstractModel.
    Transposes the input vector and takes the first element of the result.
    """
    if not isinstance(model, AbstractModel):
        raise TypeError("model must be an instance of AbstractModel.")

    # Convert x to a row vector and call the model's predict
    x_row = np.array(x).reshape(1, -1)  # Transpose to row vector
    return model.predict(x_row)[0]  # Return the first element of the result


## Linear Model


We define a linear model as a linear map:

$$
\hat{y} = \mathbf{x} \mathbf{w}
$$

$\mathbf{x}$ is a row vector. This row vector comes from data matrix $\mathbf{X}$ of size $(\text{samples}, \text{features})$. We can also write the linear model as

$$
\hat{Y} = \mathbf{X} \mathbf{w}
$$

To simplify the `predict` function, we provide a utility function that ensures a `Vector` (interpreted as a column vector) is transformed into a row vector (or a $1 \times n$ matrix). This way, you can call `predict(model, np.random.rand(10))` without worrying about whether \(x\) is a column or row vector.


In [7]:
class LinearModel:
    """
    A linear model that maps inputs to outputs using a weight matrix.
    """
    def __init__(self, in_features, out_features=1):
        # Initialize the weight matrix with zeros (feature_size x output_size)
        self.W = np.zeros((in_features, out_features))

    def predict(self, X):
        """
        Perform a linear transformation: Ŷ = np.dot(X, W)
        """
        if X.ndim == 1:
          X = X.reshape(1, -1)

        return X@self.W

    def copy(self):
        """
        Create a copy of the current model.
        """
        new_model = LinearModel(self.W.shape[0], self.W.shape[1])
        new_model.W = self.W.copy()
        return new_model



## Kernel Functions for Similarity Features

In this section, we will be defining the components to use kernel functions as similarity features to transform our input data. To implement the `KernelModel`, we need to implement a helper function and some similarity functions to use with the model.

- `get_features`
- `cosine_similarity`
- `RBF`


In [8]:
class KernelModel:
    """
    This model transforms the features and then uses a linear model to learn.
    The structure has three components:
    `kern` is the kernel function used to build the features.
    """
    def __init__(self, kern, prototypes=None, out=1, prototype_selection_strategy=None):
        if callable(kern):
            # Check that the kernel function produces a float64 result when applied to random data
            assert isinstance(kern(np.random.rand(5), np.random.rand(5)), float), \
                "Kernel function must return a float64 result."


        # If prototypes are provided, initialize the model with prototypes
        if prototypes is not None:
            assert len(prototypes) > 0, "Prototypes list must have at least one element."
            self.kern = kern
            self.prototype_selection_strategy = prototype_selection_strategy
            self.prototypes = prototypes
            self.model = LinearModel(len(prototypes[1]), out)

        else:
            # If no prototypes, initialize a blank kernel model
            self.kern = kern
            self.prototype_selection_strategy = prototype_selection_strategy
            self.prototypes = []  # Empty prototypes initially
            self.model = LinearModel(0, out)

    def get_linear_model(self):
        """
        Return the linear model component.
        """
        return self.model

    def copy(self):
        """
        Create a copy of the current KernelModel.
        """
        new_kernel_model = KernelModel(
            self.kern,
            list(self.prototypes),
            self.model.W.shape[1],
            self.prototype_selection_strategy
        )
        new_kernel_model.model = self.model.copy()
        return new_kernel_model



### `get_features`
The first function you need to implement is `get_features` which transforms a matrix of features `X` with dimensions `(num_samples, features)` according to the kernel function $K$ and the collection of $N$ prototypes $C = \{\mathbf{c}_1, \mathbf{c}_2, \ldots, \mathbf{c}_N\}$. For each sample $\mathbf{x}$, the new feature vector $\tilde{\mathbf{x}}$ is of dimension $N$ and is constructed using

$$
\tilde{x}_i = K(\mathbf{x}, \mathbf{c}_i)
$$


In [9]:
def get_features(km, X):
    """
    Transforms the matrix of features X with dimensions (num_samples, features)
    using the kernel function and the collection of prototypes.

    Args:
    - km: KernelModel instance containing the kernel function and prototypes.
    - X: Matrix of features (num_samples, num_features).

    Returns:
    - Transformed features matrix (num_samples, len(prototypes)).
    """
    kern, prototypes = km.kern, km.prototypes

    # Create an empty matrix to store the new features
    new_features = np.zeros((X.shape[0], len(prototypes)))

    # BEGIN SOLUTION
    # Fill the new features matrix with kernel function results
    for i in range(X.shape[0]):
        for j in range(len(prototypes)):
            new_features[i][j] = kern(X[i], prototypes[j])
    # END SOLUTION

    return new_features


In [10]:
# #####################
# Test Block
# #####################

# Example kernel function
def simple_kernel(x, c):
    return np.dot(x, c)  # A simple dot product kernel



km = KernelModel(kern=simple_kernel, prototypes=[np.array([1, 2]), np.array([3, 4])])

# Example input matrix X
X = np.array([[1, 2], [3, 4]])

# Get the transformed features
features = get_features(km, X)

print("Transformed Features:\n", features)


assert np.allclose(features, np.array([[5, 11], [11, 25]]), 0.1), "Test Failed!'. ❌"

print(f"Test passed! ✅")




Transformed Features:
 [[ 5. 11.]
 [11. 25.]]
Test passed! ✅


Notice how `get_features` interacts with predict.


In [11]:
def predict(self, x):
        """
        Predicts the output for the KernelModel using the transformed features.

        Args:
        - km: KernelModel instance containing the kernel function, prototypes, and the model.
        - x: Matrix of features (num_samples, num_features).

        Returns:
        - Predictions based on the kernel-transformed features.
        """
        # Transform the input features using the kernel function
        transformed_features = get_features(self, x)

        # Use the linear model to predict the output based on the transformed features
        return (km.model.predict(transformed_features))

# Add the method to MyClass
KernelModel.predict = predict

In [12]:
def update_transform(km, X, Y):
    """
    Updates the kernel model by selecting prototypes and adjusting the linear model.

    Args:
    - km: KernelModel instance containing the kernel function, prototype selection strategy, and model.
    - X: Matrix of features (num_samples, num_features).
    - Y: Vector of target values (num_samples,).

    Updates the KernelModel prototypes and model based on the given data.
    """
    # If nothing, just use the prototypes that already exist.
    try:
      css = km.prototype_selection_strategy
      prototypes = select_prototypes(css, km.kern, X, Y)

        # Update the prototypes and the linear model
      km.prototypes = prototypes
      km.model = LinearModel(len(prototypes), km.model.W.shape[1])

    except:
        pass


### `cosine_similarity`
The cosine similarity measures the angle between two vectors. It is defined as

$$
\frac{\langle x, c \rangle}{\vert\vert x \vert\vert_2 \vert\vert c \vert\vert_2}
$$



In [13]:
def cosine_similarity():
    """
    Returns a function that calculates the cosine similarity between two vectors.
    """
    def similarity(x, c):
        """
        Calculates the cosine similarity between two vectors.

        Args:
        - x: First vector (numpy array).
        - c: Second vector (numpy array).

        Returns:
        - Cosine similarity between the vectors.
        """

        # BEGIN SOLUTION
        return np.dot(x, c)/(np.linalg.norm(x, 2) * np.linalg.norm(c, 2))
        # END SOLUTION

    return similarity


In [15]:
# #####################
# Test Block
# #####################

# Test cosine similarity with two vectors of ones

assert np.allclose(cosine_similarity()(np.ones(5), np.ones(5)), 1), "Test Failed! ❌"

print(f"Test passed! ✅")


Test passed! ✅


### `RBF`
The RBF kernel is widely used in kernel regression and has many appealing properties. Check the Fixed Representations section of the notes for how to implement this function.

In [16]:
def RBF(σ):
    """
    Returns a function that calculates the RBF kernel similarity between two vectors.

    Args:
    - σ: The standard deviation (σ) for the RBF kernel.

    Returns:
    - A function that takes two vectors and returns the RBF similarity.
    """
    def similarity(x, c):
        # BEGIN SOLUTION
        return np.exp(-(np.linalg.norm(x-c)**2/(2*σ**2)))
        # END SOLUTION

    return similarity


In [17]:
# #####################
# Test Block
# #####################


assert np.isclose(RBF(0.1)(np.ones(5), np.ones(5)), 1.0), "First Test Failed! Check your RBF implementation. ❌"
assert np.isclose(RBF(1.0)(np.ones(5), np.zeros(5)), 0.08208499862389876), "Second Test Failed! Check your RBF implementation. ❌"

print(f"Tests passed! ✅")

Tests passed! ✅


In [18]:
# #####################
# Test Block
# #####################

n = 5
rng = np.random.default_rng(10)

# 10 prototypes with dimensions 5
km = KernelModel(cosine_similarity(), [rng.random(n) for _ in range(10)], 1)

X = rng.random((7, n))

# Define the feature function gf
gf = lambda x: [km.kern(x, c) for c in km.prototypes]

# Generate features using list comprehensions, transposing the result for each row of X
feats = np.vstack([gf(x) for x in X])
feats_2 = get_features(km, X)
feats_3 = np.vstack([get_features(km, x.reshape(1, -1)) for x in X])
feats_4 = np.vstack([get_features(km, X[i:i+1, :]) for i in range(X.shape[0])])

# Test if the features are approximately equal and not all zeros

assert np.allclose(feats, feats_2) and np.allclose(feats, feats_3) and np.allclose(feats, feats_4), "Values are not approximately equal! ❌"
assert not np.all(feats == 0.0), "All values are zero! ❌"

print(f"Test passed! ✅")

Test passed! ✅


# Learning

In this section, you will be implementing Lasso regression. We provide implementations of ordinary least squares (OLS) and ridge regression as examples for how the interface works.

## Recap: OLS, Ridge, and Lasso

Before we get into gradient descent, lets review OLS and two common regularization techniques. The first, Ridge, uses $p=2$ (the $\ell_2$ norm), and you implemented this in Assignment 1. We provide the implementation here, since you do not need to do it again. The second, Lasso, is for regularization when $p=1$ (the $\ell_1$ norm). You will be completing this implementation.


## Ridge

Remember ridge regression corresponds to L2 regularization with the cost function

$$
c(\mathbf{w}) = \lVert \mathbf{X} \mathbf{w} - \mathbf{y} \rVert_2^2 + \lambda \lVert \mathbf{w} \rVert_2
$$

and with solution

$$
\mathbf{w}_{\text{MAP}} = (\mathbf{X}^\top \mathbf{X} + \lambda I)^{-1} \mathbf{X}^\top \mathbf{y}
$$

where $I$ is the identity matrix. We can get the OLS solution by setting `λ=0.0`, so we re-use the code for Ridge to get the OLS solution.


In [19]:
class Ridge:
    def __init__(self, λ: float):
        self.λ = λ

    def copy(self):
        return Ridge(self.λ)

# Convenience constructor for OLS (λ=0.0)
def OLS():
    return Ridge(0.0)

def train_ridge(ridge: Ridge, model, X, Y):
    λ = ridge.λ
    n = X.shape[1]
    model.W = np.linalg.inv(X.T @ X + λ * np.eye(n)) @ X.T @ Y


## Lasso regression
Lasso regression corresponds to the $\ell_1$ regularized problem with the cost function:

$$
c(\mathbf{w}) = \lVert \mathbf{X} \mathbf{w} - \mathbf{y} \rVert_2^2 + \lambda \lVert \mathbf{w} \rVert_1
$$

Unlike $\ell_2$ regularization, there is no closed-form solution. So, we have to solve iteratively. Further, this objective is non-differentiable when $ \mathbf{w} = \mathbf{0}$, making gradient descent perform poorly. Instead, we use proximal gradient descent, to get the Lasso regressor. See your notes for the algorithm. You will need to fill in two functions for this portion:
- `prox_l1` : the proximal operator taking a weight, a step size, and the regularization parameter
- `train` : which performs Lasso regression using `prox_l1`


In [20]:
class Lasso:
    def __init__(self, λ: float, τ: float):
        self.λ = λ
        self.τ = τ  # Tolerance

    def copy(self):
        return Lasso(self.λ, self.τ)


In [21]:
def prox_l1(w, η, λ):
  #BEGIN SOLUTION
  if w > η * λ:
    return w - (η * λ)
  elif w < -η * λ:
    return w + (η * λ)
  else:
    return 0
  #END SOLUTION


In [22]:
# ##############
# Test Block
# ##############

assert prox_l1(0.1, 1.0, 0.15) == 0.0, "First test Failed! ❌"
assert prox_l1(1.0, 0.1, 0.15) == 0.985, "Second test Failed! ❌"
assert prox_l1(-1.3, 0.03, 0.2) == -1.294, "Third test Failed! ❌"



print(f"Tests passed! ✅")



Tests passed! ✅


In [23]:
def train_lasso(lasso, model, X, Y):
    n = X.shape[0]  # number of samples
    λ = lasso.λ
    τ = lasso.τ  # tolerance
    err = float('inf')  # Initialize error as infinity
    η = 1/(2*np.linalg.norm((1 / n) * np.dot(X.T, X))) # Learning rate
    Y = np.atleast_2d(Y).T

    def c(x):
        # Calculate the cost function
        e = model.predict(x) - Y
        return np.dot(e.T,e) + λ * np.sum(np.abs(model.W))  # L1 regularization term

    #### BEGIN SOLUTION

    # Covariance matrix of input data X
    XX = (1/n) * (X.T @ X)
    # Calculate XY
    XY = (1/n) * (X.T @ Y)
    while np.abs(c(X) - err) > τ:
        # Update the error with the current loss value
        err = c(X)
        # Compute the gradient and update the weights (ΔW is the weight update)
        g = XX @ model.W - XY
        # Apply the soft thresholding function to each weight in the model
        w = model.W - η*g
        for i, _ in enumerate(w):
            w[i] = prox_l1(w[i], η, λ)
        # Apply L1 regularization to all weights
        model.W = w

    #### END SOLUTION
Lasso.train = train_lasso

In [24]:
# ##############
# Test Block
# ##############
np.random.seed(2)

# Creating Lasso object with λ = 0.1, τ = 0.01
ols = Lasso(0.1, 0.01)

# Generating data
X = np.random.rand(1000, 6)
W = np.random.rand(6, 1)
Y = np.dot(X, W) + np.random.randn(1000, 1) * 0.1  # Adding noise
Y = Y.flatten()

# Linear model
m = LinearModel(6, 1)

# Train the model using Lasso regularization
Lasso.train(ols, m, X, Y)

# Testing mean squared error (MSE)
mse = np.mean((np.dot(X, m.W) - np.atleast_2d(Y).T) ** 2)

print(f"Computed MSE: {mse}")  # Debug: Print the computed MSE

# Assert the computed MSE is close to the expected value
assert np.isclose(mse, 0.049461019642855916, atol=1e-6), "Check your train_lasso function! ❌"  # Compare with the expected value


print(f"Tests passed! ✅")


Computed MSE: 0.04946101964285601
Tests passed! ✅


# Moving to the KernelModel and Prototype Representations

The `KernelModel` does nonlinear regression by first transforming the features using a kernel representation (which we also call a prototype representation) and then calling our standard linear regression algorithms. But, for these models, instead of using the closed-form solution above, we will move to gradient descent. We do so because the dimension of our features can be much higher, and so the closed-form solution is more expensive that doing gradient descent. Note also that with more features it is more likely for $\mathbf{X}^\top \mathbf{X}$ to be ill-conditioned or not invertible, and stochastic gradient descent provides some robustness to this ill-conditioning.


In [25]:
def train(ls, model, X, Y):
	# build K matrix
	K = get_features(model, X)
	train(ls, model.model, K, Y)

## Gradient Descent
In this notebook, we will focus on minibatch gradient descent wiht a constant learning rate, implemented in `ConstantLR`. We also provide `RMSProp` for you, primarily to run as a comparison and for your interest.

Below, you need to implement the function `epoch` which goes through the dataset in minibatches of size `mbgd.n`. Remember to randomize how you go through the data **and** ensure you are using the correct targets for the data passed to the learning update. In this implementation, you will use:

```python
update(model, lossfunc, opt, X_batch, Y_batch)


In [26]:
class MiniBatchGD:
    def __init__(self, n: int):
        self.n = n

class AbstractModel(ABC):
    @abstractmethod
    def predict(self, X):
        pass
class Optimizer(ABC): #added by me
    @abstractmethod
    def copy(self):
        pass

In [27]:
def epoch(mbgd, model, lossfunc, opt, X, Y):
    """
    Perform one epoch of mini-batch gradient descent.

    Args:
        mbgd (MiniBatchGD): The MiniBatchGD object containing batch size.
        model (AbstractModel): The model to update.
        lossfunc (function): The loss function to compute the gradient.
        opt (Optimizer): The optimizer to use for the update.
        X (numpy.ndarray): Input features.
        Y (numpy.ndarray): Target values.
    """

    rand_idx = np.random.RandomState(seed=42).permutation(Y.shape[0])  # Randomize the indices

   	#### BEGIN SOLUTION

    # Calculate number of batches
    num_batches = mbgd.n

    for i in range(num_batches):

        # Determine the batch indices
        batch = np.array_split(rand_idx, num_batches)[i]
        X_batch = X[batch]
        Y_batch = Y[batch]
        # Update the model using the current batch
        opt.update(model, lossfunc, opt, X_batch, Y_batch)


    #### END SOLUTION


In [28]:
def train(mbgd, model, lossfunc, opt, X, Y, num_epochs):
    if isinstance(model, AbstractModel):
        model = model.get_linear_model()
        X = model.get_features(X)

    # Initialize loss array
    L = np.zeros(num_epochs + 1)

    L[0] = calculate_loss(model, lossfunc, X, Y)

    # Training loop
    for i in range(num_epochs):
        epoch(mbgd, model, lossfunc, opt, X, Y)
        L[i + 1] = calculate_loss(model, lossfunc, X, Y)

    return L

In [29]:
# ################
# Test Block
# ################
class LR():
    pass

class LF():
    pass

def gradient(lm, lf, X, Y):
    return X.sum(axis=0)

def update(self, lm, lf, opt, X, Y):
    ΔW = gradient(lm, lf, X, Y)
    lm.W -= ΔW.reshape(-1,1)
LR.update = update

lm = LinearModel(3, 1)
opt = LR()
lf = LF()
X = np.ones((10, 3))
Y = np.arange(0.0, 1.0, 0.1)
mbgd = MiniBatchGD(5)
epoch(mbgd, lm, lf, opt, X, Y)

assert np.all(lm.W == -10.0),  "Test Failed! ❌"

print(f"Tests passed! ✅")

Tests passed! ✅


In [30]:
class TestModel:
    def __init__(self, X, Y):
        self.X = np.zeros_like(X)
        self.Y = np.zeros_like(Y)
        self.position = 1
        self.W = np.zeros((X.shape[1], 1))  # Initialize weights (W) as zeros, assuming linear model


    def update(self, X, Y):
        end_idx_X = self.position + X.shape[0] - 1
        end_idx_Y = self.position + Y.shape[0] - 1

        self.X[self.position:end_idx_X + 1, :] = X
        self.Y[self.position:end_idx_Y + 1, :] = Y
        self.position += X.shape[0]


In [31]:
# Initialize objects for the test
__epoch_opt = LR()   # Optimizer
__epoch_lf = LF()    # Loss function

# Random data generation for X and Y
__epoch_X = np.random.RandomState(seed=42).rand(10, 3)
__epoch_Y = np.random.RandomState(seed=42).rand(10)

# Create the model instance
__epoch_model = TestModel(__epoch_X, __epoch_Y)

# Create MiniBatchGD instance
__epoch_mbgd = MiniBatchGD(5)

# Call the epoch function (this is the core logic to test)
epoch(__epoch_mbgd, __epoch_model, __epoch_lf, __epoch_opt, __epoch_X, __epoch_Y)

# Check if the model data has been updated (asserting that model X and Y are not equal to input X and Y)
test_passed = not (np.array_equal(__epoch_model.X, __epoch_X) or np.array_equal(__epoch_model.Y, __epoch_Y))
assert test_passed,  "Test Failed! ❌"

print(f"Tests passed! ✅")



Tests passed! ✅


## Loss Functions
For this notebook, we will only be using MSE, but we still introduce the abstract type `LossFunction` for future use. Below, you will need to implement the `loss`  function and the `gradient` function for MSE.

Please use this scaled MSE for the loss and gradient:

$$
\text{MSE}(\hat{\mathbf{y}}, \mathbf{y}) = \frac{1}{2n} \sum_{i=1}^n (\hat{y_i} - y_i)^2
$$


In [32]:
class LossFunction(ABC):
    pass

class MSE(LossFunction):
    """
    MSE computes the mean squared error loss.
    """
    pass


In [33]:
def loss(self, y_hat, y):
    """
    Compute the mean squared error loss between target `y` and prediction `y_hat`
    """
    #### BEGIN SOLUTION
    # Compute the error between predictions and actual values

    mse = np.mean(np.square(y_hat - y))

    # Return the mean squared error (scaled by 1 / (2))

    return mse/2
    #### END SOLUTION

MSE.loss = loss

In [34]:
def calculate_loss(self, lf, X, Y):
    """
    Compute the loss between the target `Y` and the prediction of `lm` on input data `X`
    """
    Y_hat = 0  # Initialize the predicted values to 0


    Y_hat = self.predict(X)  # Compute the predictions using the model

    # Calculate and return the loss using the provided loss function
    return lf.loss(Y_hat, Y)

LinearModel.calculate_loss = calculate_loss


In [71]:
def gradient(self, mse, X, Y):
    """
    Compute the gradient of the loss function with respect to the model weights.
    """
    deltaW = np.zeros_like(self.W)  # gradients should be the size of the weights

    #### BEGIN SOLUTION


    # Calculate the error (predictions - actual values)
    prediction = self.predict(X) - Y.reshape(-1,1)
    # Compute the gradient
    deltaW = (X.T @ prediction) / len(Y)
    #### END SOLUTION


    # Ensure that the gradient shape matches the weights shape
    assert deltaW.shape == self.W.shape
    return deltaW

LinearModel.gradient = gradient

In [72]:
# ############
# Test Block
# ############


result_mseloss = LinearModel(3, 1).calculate_loss(MSE(), np.ones((4, 3)), np.array([1, 2, 3, 4]))
print(f"Computed MSE loss: {result_mseloss}")
assert result_mseloss == 3.75, "Error in MSE loss calculation. ❌"


gradient_result = LinearModel(3, 1).gradient( MSE(), np.ones((4, 3)), np.array([1, 2, 3, 4]))
print(f"Computed gradient: {gradient_result}")
__check_msegrad = np.all(gradient_result == -2.5)
assert  __check_msegrad, "Error in MSE gradient calculation. ❌"

print(f"Tests passed! ✅")


Computed MSE loss: 3.75
Computed gradient: [[-2.5]
 [-2.5]
 [-2.5]]
Tests passed! ✅


## Optimizers
Below you will need to implement  optimizer:

- Constant learning rate


#### Constant Learning Rate

`ConstantLR` updates the weights using a constant learning rate $η$

$$
\mathbf{w} = \mathbf{w} - η \cdot \mathbf{g}
$$

where $g$ is the gradient defined by the loss function.


In [73]:
class ConstantLR:
    """
    ConstantLR represents an optimizer with a constant learning rate.
    """
    def __init__(self, eta):
        self.eta = float(eta)

    def copy(self):
        """
        Create a copy of the ConstantLR instance.
        """
        return ConstantLR(self.eta)


In [74]:
def update(self, lm, lf, opt, x, y):
    """
    Update the weights of the LinearModel using the provided loss function, optimizer,
    input data, and target values.

    Parameters:
        lm: LinearModel
            The linear model to be updated.
        lf: LossFunction
            The loss function to compute gradients.
        opt: ConstantLR
            The optimizer with a constant learning rate.
        x: numpy.ndarray
            The input data matrix.
        y: numpy.ndarray
            The target values vector.
    """
    # Compute the gradient
    g = gradient(lm, lf, x, y)

    #### BEGIN SOLUTION
    # Update weights
    lm.W -= opt.eta * g

    #### END SOLUTION
ConstantLR.update = update

In [75]:
# ##############
# Test Block
# ##############

lm = LinearModel(3, 1)  # Create a LinearModel with 3 features and 1 output
opt = ConstantLR(0.1)   # Optimizer with a learning rate of 0.1
lf = MSE()              # Mean Squared Error loss function
X = np.ones((4, 3))     # Input data: 4 samples with 3 features each
Y = np.array([0.1, 0.2, 0.3, 0.4])  # Target values
opt.update(lm, lf, opt, X, Y)  # Perform a single update step
assert np.all(lm.W == 0.025), "Failed ConstantLR test. ❌"


print(f"Tests passed! ✅")

Tests passed! ✅


#### RMSProp

RMSProp is a first-order adaptive stepsize optimizer proposed by Geoff Hinton [in this lecture](http://www.cs.toronto.edu/~tijmen/csc321/slides/lecture_slides_lec6.pdf), similar to ADAM but without Momentum. We provide this optimizer for you, as it is only here as a comparator for your interest. The update equations are as follows:

$
v_i = \rho \cdot v_i + (1 - \rho) \cdot g_i^2
$

$
W_i = W_i - \frac{\eta}{\sqrt{v_i + \epsilon}} \cdot g_i
$


In [76]:
class RMSProp:
    """
    RMSProp optimizer class.

    Attributes:
    - eta (float): Step size
    - rho (float): Decay parameter
    - v (np.ndarray): Exponentially decaying average
    - epsilon (float): Small constant to prevent division by zero
    """
    def __init__(self, eta, rho, v=None, epsilon=1e-5):
        self.eta = eta  # Step size
        self.rho = rho  # Decay parameter
        self.v = v if v is not None else np.zeros((1, 1))  # Default v is a 1x1 zero matrix
        self.epsilon = epsilon  # Small constant for numerical stability

    @classmethod
    def from_linear_model(cls, eta, rho, lm):
        """
        Alternative constructor for initializing RMSProp with a LinearModel.
        """
        return cls(eta, rho, np.zeros_like(lm.W), 1e-5)


    def copy(self):
        """
        Create a copy of the RMSProp instance, resetting the exponential average to zeros.
        """
        return RMSProp(self.eta, self.rho, np.zeros_like(self.v), self.epsilon)


In [77]:
def update_rmsprop(self, lm, lf, opt, x, y):
    """
    Update the weights of a LinearModel using the RMSProp optimizer.

    Parameters:
    - lm: LinearModel instance.
    - lf: LossFunction instance.
    - opt: RMSProp optimizer instance.
    - x: Input matrix (features).
    - y: Target vector.
    """

    # Compute gradient
    g = gradient(lm, lf, x, y)

    # Ensure `v` is of the correct shape
    if g.shape != opt.v.shape:
        opt.v = np.zeros_like(g)

    # Unpack optimizer parameters
    eta, rho, v, epsilon = opt.eta, opt.rho, opt.v, opt.epsilon

    # Update `v` and `lm.W`
    # Element-wise update for `v`
    v[:] = rho * v + (1 - rho) * np.square(g)
    # Update weights with RMSProp rule
    lm.W[:] -= eta * (g / np.sqrt(v + epsilon))

RMSProp.update = update_rmsprop

In [78]:
# #################
# Test Block:
# #################

# Create a LinearModel, RMSProp optimizer, and sample data
lm = LinearModel(2, 1)
opt = RMSProp.from_linear_model(0.1, 0.9, lm)
X = np.array([[0.1, 0.5],
              [0.5, 0.0],
              [1.0, 0.2]])
Y = np.array([1, 2, 3])

# Perform the update
opt.update(lm, MSE(), opt, X, Y)


# Define expected values for `v` and `W`
true_v = np.array([[0.18677777777777768], [0.013444444444444445]])

true_W = np.array([[0.31621930100905377], [0.3161102262149725]])

# Check if the computed values match the expected ones
assert np.allclose(opt.v, true_v, atol=1e-3), "Wrong values for v. ❌"
assert np.allclose(lm.W, true_W, atol=1e-4), "Wrong values for W. ❌"


print(f"Tests passed! ✅")


Tests passed! ✅


# Prototype Selection for Kernels

Now that we have some learning methods defined, we can explore prototype selection strategies for our kernel models. Prototype selection involves picking samples $\mathbf{x}_i$ from our dataset as representative or prototypical points.

1. **Random Prototype Selection**: A random subset of prototypes are selected. This naive baseline is often hard to beat.
2. **Lasso-Based Prototype Selection**: Leverages the sparsity induced by Lasso regression to identify prototypes.


In [79]:
class PrototypeSelectionStrategy(ABC):
    pass


### Random Prototypes from Data

This algorithm selects a random set of `p` prototypes from the data. Surprisingly, this approach is quite effe`tive, as demonstrated in the experiments below.


In [80]:
class RandomPrototypes:
    """
    A prototype selection strategy that selects a random set of p prototypes from the data.
    """
    def __init__(self, p):
        """
        Parameters:
            p (int): Number of prototypes to select.
        """
        self.p = p

    def select_prototypes(self, kern_func, X, Y):
        """
        Selects random prototypes from the data.

        Parameters:
            kern_func: Kernel function (not used in this implementation).
            X: Data matrix.
            Y: Labels vector (not used in this implementation).

        Returns:
            List of selected prototypes.
        """
        # Randomly select 'p' prototypes from X
        prototype_idx = np.random.choice(X.shape[0], self.p, replace=False)
        return [X[i, :] for i in prototype_idx]


### L1 Strategy for selecting prototypes

In this section we will make a novel center selection algorithm based on using L1 regularization. You will need to implement `select_prototypes` which returns an array of prototypes (see `RandomPrototypes` for an example). These prototypes will be selected based on our implementation of `Lasso`. You will use `Lasso` with parameters `L1Prototypes.λ` and `L1Prototypes.τ`, and do a regression on a random set of prototypes `start_p`. Once you do this regression you will use `maxk_idx` to return the top `p` prototypes based off the absolute values of the model weights.

In [81]:
class L1Prototypes:
    """
    A prototype selection strategy that uses L1 regularization to select prototypes.
    """
    def __init__(self, λ, τ, p, start_p):
        """
        Parameters:
            λ (float): Regularization parameter for Lasso regression.
            τ (float): Decay factor.
            p (int): Number of prototypes to select.
            start_p (int): Starting number of prototypes.
        """
        self.λ = λ
        self.τ = τ
        self.p = p
        self.start_p = start_p


In [82]:
def select_prototypes(self, kern_func, X, Y):

    #### BEGIN SOLUTION

    # Randomly select initial prototypes
    prototype_idx = np.random.choice(X.shape[0], self.p, replace=False)
    cur_prototypes = X[prototype_idx, :]

    # Create the KernelModel
    km = KernelModel(kern_func, prototype_idx)
    # Use previously defined Lasso model
    lasso = Lasso(0.1, 0.1)
    # Train the Lasso model
    train_lasso(lasso, km, X, Y)
    # Get the indices of the n largest coefficients (Lasso weights)

    # Return the selected prototypes based on the sorted indices

    #### END SOLUTION

    return X[sorted_idx]

L1Prototypes.select_prototypes = select_prototypes

In [83]:
# #############
# Test Block
# #############

# Setting random seed for reproducibility
np.random.seed(2)

# Kernel model with RBF kernel and L1Prototypes
kern_func = RBF(0.9)
# Generate random input data
rng = np.random.default_rng(2)
X = rng.random((10, 4))  # Example: generates a 10x4 random array

# Define the target function
def m(x):
    return np.sin(np.pi * x[0]) + np.sin(np.pi * x[1]) + np.sin(np.pi * (x[2] + x[3]) / 2)

# Generate target labels
Y = np.array([[m(x) for x in X]])

# Select prototypes using L1Prototypes first
prototype_selection_strategy = L1Prototypes(0.1, 0.001, 4, 10)
selected_prototypes = prototype_selection_strategy.select_prototypes(kern_func, X, Y)

# Now, pass the selected prototypes to the KernelModel
km = KernelModel(kern_func, prototypes=selected_prototypes, prototype_selection_strategy=prototype_selection_strategy)


# Update the model with the new data and check the number of prototypes
update_transform(km, X, Y.T)

assert len(km.prototypes) == 4, "Wrong values for W. ❌"

print(f"Tests passed! ✅")


TypeError: object of type 'numpy.int64' has no len()

# Evaluating models

In the following section, we provide a few helper functions and structs to make evaluating methods straightforward. The abstract type `LearningProblem` with children `GDLearningProblem` is used to construct a learning problem. You will notice these structs contain all the information needed to `train` a model for both gradient descent and for OLS. We also provide the `run_GD` function. These will update the transform according to the provided data and train the model. `run_GD` does this with a copy of the learning problem.

In [None]:
class LearningProblem(ABC):
    """
    Abstract base class for defining a learning problem.
    """
    @abstractmethod
    def train(self):
        pass


: 

: 

: 

: 

: 

: 

: 

: 

: 

: 

: 

: 

: 

In [None]:
# Define placeholder types for the model, optimizer, and loss function
M = TypeVar('M', bound='AbstractModel')
O = TypeVar('O', bound='Optimizer')
LF = TypeVar('LF', bound='LossFunction')

class GDLearningProblem:
    """
    This is a class for keeping the necessary gradient descent learning setting components together.
    """
    def __init__(self, gd, model, opt, loss):
        self.gd = gd
        self.model = model
        self.opt = opt
        self.loss = loss

    def copy(self):
        """
        Create a copy of the GDLearningProblem instance.

        Returns:
            GDLearningProblem: A new instance with copied components.
        """
        return GDLearningProblem(
            self.gd,
            self.model.copy(),
            self.opt.copy(),
            self.loss
        )



: 

: 

: 

: 

: 

: 

: 

: 

: 

: 

: 

: 

: 

In [None]:
def run_GD(lp, X, Y, num_epochs):
    # Update the model transformation with the provided data
    update_transform(lp.model, X, Y)

    # Train the model using the gradient descent optimizer
    loss = train(lp.gd, lp.model, lp.loss, lp.opt, X, Y, num_epochs)

    return lp, loss


: 

: 

: 

: 

: 

: 

: 

: 

: 

: 

: 

: 

: 

In this section you will implement two functions to implement cross validation with random repeated subsampling.`random_dataset_split`: randomly splits the data `X` and `Y` into a training set and a validation set. This will also be used to split out data into a training set and test set. This function returns two tuples `(X_train, Y_train), (X_test, Y_test)`. `cross_validation` $(_check_complete(__check_cross_validation)): This does `k` independent experiments of the given LearningProblem. The function trains the model and then stores the error according to the **Root Mean Squared Error** (no matter what the loss is in the learning problem), storing this in `cv_err`. `cv_err` is then returned. Because these functions require randomness, the check marks which have been used to check correctness before only check for returning the correct datatypes (for `random_dataset_split`) or that the returned vector of numbers `cv_err` is non-zero.


In [None]:
def random_dataset_split(X, Y, n_train):
    # Randomly shuffle the indices
    indices = np.random.permutation(len(Y))

    # BEGIN SOLUTION

    # Split the indices into training and testing sets


    # Split the data into training and testing sets


    # Return two tuples: (X_train, Y_train), (X_test, Y_test)
    return
    # END SOLUTION


: 

: 

: 

: 

: 

: 

: 

: 

: 

: 

: 

: 

: 

In [None]:
# #############
# Test Block
# #############
# Set the random seed
rng = np.random.default_rng(10)

# Generate random X and Y
X = rng.random((10, 3))
Y = rng.random(10)

# Call the random dataset split function
data = random_dataset_split(X, Y, 8)

assert data is not None, "Data is None. ❌"

d_train = data[0]
d_test = data[1]

# Check dimensions of training and test sets
assert    d_train[0].shape == (8, 3), "Wrong shape for X train. ❌"
assert    d_train[1].shape == (8,),   "Wrong shape for Y train. ❌"
assert    d_test[0].shape == (2, 3), "Wrong shape for X test. ❌"
assert    d_test[1].shape == (2,), "Wrong shape for Y test. ❌"

print(f"Tests passed! ✅")


: 

: 

: 

: 

: 

: 

: 

: 

: 

: 

: 

: 

: 

In [None]:
def cross_validation(lp, X, Y, num_epochs, k, train_size=None):
    """
    Perform k-fold cross-validation for a given LearningProblem.

    Parameters:
        lp: LearningProblem object.
        X: np.ndarray - Input data.
        Y: np.ndarray - Target labels.
        num_epochs: int - Number of epochs for training.
        k: int - Number of folds.
        train_size: int (optional) - Size of the training set (default 90% of data).

    Returns:
        np.ndarray - Cross-validation errors (MSE for each fold).
    """
    if train_size is None:
        train_size = int(floor(X.shape[0] * 0.9))

    cv_err = np.zeros(k)

    for i in range(k):
        # BEGIN SOLUTION
        # Split data into training and validation sets

        # Train the model (use run_GD function)


        # Predict on validation data

        # Compute MSE and store in cv_err
        cv_err[i] =

        # END SOLUTION

    return cv_err


: 

: 

: 

: 

: 

: 

: 

: 

: 

: 

: 

: 

: 

In [None]:
#############
# Test Block
# #############

np.random.seed(2)
num_epochs = 100

# Generate random data
X = np.random.rand(5000, 10)
X[:, 1] = 0.1 * X[:, 2]
X[:, 4] = 0.5 * (X[:, 5] + X[:, 6])

# Generate weights and target Y
w = np.random.rand(10)
Y = (X @ w) + np.random.randn(5000) * 0.001

# Set up the learning problem
lp = GDLearningProblem(
    MiniBatchGD(30),          # Mini-batch gradient descent with batch size 30
    LinearModel(X.shape[1], 1),          # Linear model with input size equal to X's columns
    ConstantLR(0.01),                    # Constant learning rate of 0.01
    MSE()                                # Mean Squared Error loss function
)

# Perform cross-validation
cv_err = cross_validation(lp, X, Y, 100, 10)

# Check if all errors are non-zero
assert np.all(cv_err != 0.0), "CV error cannot be 0. ❌"
print("Mean error: " + str(np.mean(cv_err)))
assert np.isclose(4.997695483209219e-07, np.mean(cv_err), atol = 1e-07), "Wrong values. ❌"

print(f"Tests passed! ✅")

: 

: 

: 

: 

: 

: 

: 

: 

: 

: 

: 

: 

: 

# Experiments

In this section, we will run three experiments on the different algorithms we implemented above. We provide the data in the `Data` section, and then follow with the three experiments and their descriptions. You will need to analyze and understand the first two experiments for the written portion of this assignment.

## Data

This section creates the datasets we will use in our comparisons. Feel free to play with them.

In [None]:
def unit_normalize_columns(df):
    """
    Normalizes each column in the DataFrame to the range [0, 1].

    Args:
    df (pandas.DataFrame): The DataFrame to normalize.

    Modifies:
    df: The DataFrame is modified in place with normalized columns.
    """
    for column in df.columns:
        min_val = df[column].min()
        max_val = df[column].max()
        # Avoid division by zero in case all values in the column are the same
        if max_val != min_val:
            df[column] = (df[column] - min_val) / (max_val - min_val)
        else:
            df[column] = 0  # or df[column] = 1 based on how you want to handle this case
    return df



: 

: 

: 

: 

: 

: 

: 

: 

: 

: 

: 

: 

: 

### [Boston Housing dataset](https://www.kaggle.com/vikrishnan/boston-house-prices)

The Boston Housing dataset is a dataset from the UCI Machine Learning Repository. It contains information collected by the U.S Census Service concerning housing in the area of Boston Mass, published in 1978. This dataset includes attributes like per capita crime rate, average number of rooms per dwelling, accessibility to highways, and a median value of owner-occupied homes.


In [None]:
# Reading the CSV file with custom column names
names = ["CRIM", "ZN", "INDUS", "CHAS", "NOX", "RM", "AGE", "DIS", "RAD", "TAX", "PTRATIO", "B", "LSTAT", "MEDV"]
housing_data = pd.read_csv("housing.csv", delim_whitespace=True, names=names)

# Normalize the data using the existing function
housing_data = unit_normalize_columns(housing_data)

housing_data.head()

: 

: 

: 

: 

: 

: 

: 

: 

: 

: 

: 

: 

: 

### [SUSY dataset](https://archive.ics.uci.edu/ml/datasets/SUSY)

The SUSY dataset consists of 5 million samples of a signal process that produces supersymmetric particles in proton-proton collisions. It is used in particle physics and machine learning research to distinguish between a signal process and a background process. The dataset contains 18 features, with the first column being the label (1 for the signal, 0 for the background) and the other 17 features being kinematic properties measured by the particle detectors in the accelerator.


In [None]:
# Read the SUSY dataset
susy_dataset = pd.read_csv("susysubset.csv", header=None)

# Normalize the dataset using the existing function
susy_dataset = unit_normalize_columns(susy_dataset)
susy_dataset.head()

: 

: 

: 

: 

: 

: 

: 

: 

: 

: 

: 

: 

: 

### Highly Correlated dataset

This dataset is similar to the examples found in the original
[Lasso paper](https://rss.onlinelibrary.wiley.com/doi/epdf/10.1111/j.2517-6161.1996.tb02080.x). This dataset simulates heavily correlated data. Below we will use this to test `OLS` (Ordinary Least Squares), `Ridge`, and `Lasso` regression techniques.


In [None]:
def simulate_dataset(rho, N):
    s = 8
    # Create the covariance matrix
    Sigma = np.array([[rho**abs(i - j) for i in range(s)] for j in range(s)])
    # Create a multivariate normal distribution
    mv_g = multivariate_normal(mean=np.zeros(s), cov=Sigma)

    # Coefficients
    beta = np.array([1, 0, 0, 1.5, 1, 0, 0, 1])
    # Standard deviation of the noise
    sigma = 3

    # Generate data points
    X = mv_g.rvs(N)
    # Generate response variable
    Y = X@beta + sigma * np.random.randn(N)

    return X, Y


: 

: 

: 

: 

: 

: 

: 

: 

: 

: 

: 

: 

: 

## Plotting our data

The `plot_data` function produces two plots that can be displayed horizontally or vertically. The left or top plot is a box plot over the cross-validation (CV) errors, while the right or bottom plot is a bar graph displaying average CV errors with standard error bars. This function will be used for all the experiments, and you should use this to finish your written experiments.


In [None]:
def plot_data(algs, errs):
    """
    Generates a box plot and a bar graph to display MSE errors.

    Args:
    algs (list): List of algorithm names.
    errs (list of lists): List containing lists of errors for each algorithm.
    vert (bool): If True, plots are displayed vertically. Otherwise, horizontally.

    Returns:
    None
    """
    # Ensure errs is a 2D array/list
    errs = np.array(errs)

    # Standard error calculation
    stderr = lambda x: np.sqrt(np.var(x) / len(x))

    # Prepare data for plotting
    mean_errs = [np.mean(err) for err in errs]
    std_errs = [stderr(err) for err in errs]


    # Creating the box plot and bar graph

    fig, axs = plt.subplots(1, 2, figsize=(10, 5))   # 1 row, 2 columns

    # Box plot
    axs[0].boxplot(errs.T, vert=True, patch_artist=True, showfliers=True, labels=algs)
    axs[0].set_ylabel('MSE')

    # Bar graph
    axs[1].bar(algs, mean_errs, yerr=std_errs, capsize=5)
    axs[1].set_ylabel('MSE' if True else '')

    # Adjusting layout
    plt.tight_layout()

    # Show the plots
    plt.show()


: 

: 

: 

: 

: 

: 

: 

: 

: 

: 

: 

: 

: 

## OLS, Ridge, and Lasso

We will compare OLS, Ridge, and Lasso on simulated data mentioned above.

Below we use two plot types to compare the the MSE with 20 training samples.

In [None]:
ols_settings = {
    "ρ": 0.7,
    "k": 100,
    "N": 20
}


: 

: 

: 

: 

: 

: 

: 

: 

: 

: 

: 

: 

: 

In [None]:
ρ, N, k = ols_settings['ρ'], ols_settings['N'], ols_settings['k']
algs_ols = ['OLS', 'Ridge', 'Lasso']

np.random.seed(5)


n_features = 8

errs = [[],[],[]]
for i in range(k):
    # Simulating 30 samples
    sim_X, sim_Y = simulate_dataset(ρ, 30)

    # Train test split
    train_X = sim_X[:N]
    test_X = sim_X[N:]

    train_Y = sim_Y[:N]
    test_Y = sim_Y[N:]

    # Ordinary linear Regression
    # Using rigde with λ=0
    λ=0
    ols = Ridge(λ)
    m = LinearModel(n_features, 1)
    train_ridge(ols, m, train_X, train_Y)
    err = np.mean(np.square(test_X@m.W - test_Y))
    errs[0].append(err)

    # Ridge
    λ=0.5
    ols = Ridge(λ)
    m = LinearModel(n_features, 1)
    train_ridge( ols, m, train_X, train_Y)
    err = np.mean(np.square(test_X@m.W - test_Y))
    m2 = m.W
    errs[1].append(err)

    # Lasso
    λ = 0.5
    τ = 0.0005
    ols = Lasso(λ, τ)
    m = LinearModel(n_features, 1)
    train_lasso( ols, m, train_X, train_Y)
    err = np.mean(np.square(np.dot(test_X, m.W) - np.atleast_2d(test_Y).T))
    errs[2].append(err)


plot_data(algs_ols, np.array(errs))


: 

: 

: 

: 

: 

: 

: 

: 

: 

: 

: 

: 

: 

## Non-linear feature transforms

We will compare linear to non-linear models using the [Boston Housing dataset](https://www.kaggle.com/vikrishnan/boston-house-prices). We compare a linear representation, cosine similarity with random prototypes, RBF kernels with random prototypes, and RBF kernels with L1 selection. We use cross validation with `k folds=50` and `train_size=450`.



In [None]:
algs = ["Linear", "Cos-Rand", "RBF-Rand", "RBF-L1"]

np.random.seed(43)

# Convert to NumPy array
housing_data_np = housing_data.to_numpy()

# X is all rows, columns 1 to the second-to-last column (exclusive of the last column)
X = housing_data_np[:, :-1]

# Y is all rows, last column
Y = housing_data_np[:, -1]

errs = []

batch_size = 256
n_features = 13
train_size = 475
k_folds = 50
learning_rate = 0.01
epochs = 3

# Normal GD
GD =  GDLearningProblem(
        MiniBatchGD(batch_size),
        LinearModel(n_features, 1),
        ConstantLR(learning_rate),
        MSE())

cv_err = cross_validation(GD, X, Y, epochs, k_folds, train_size=train_size)
errs.append(cv_err)


# Cosine transformation
GD =  GDLearningProblem(
        MiniBatchGD(batch_size),
        LinearModel(n_features, 1),
        ConstantLR(learning_rate),
        MSE())

km = KernelModel(cosine_similarity(), RandomPrototypes(n_features).select_prototypes( cosine_similarity , X , Y ))
gf = lambda x: [km.kern(x, c) for c in km.prototypes]
feats = np.vstack([gf(x) for x in X])

cv_err = cross_validation(GD, feats, Y, epochs, k_folds, train_size=train_size)
errs.append(cv_err)


# RBF transformation
σ = 3.0
GD =  GDLearningProblem(
        MiniBatchGD(batch_size),
        LinearModel(n_features, 1),
        ConstantLR(learning_rate),
        MSE())

km = KernelModel(RBF(σ), RandomPrototypes(n_features).select_prototypes(RBF(σ) , X , Y ))
gf = lambda x: [km.kern(x, c) for c in km.prototypes]
feats = np.vstack([gf(x) for x in X])

cv_err = cross_validation(GD, feats, Y, epochs, k_folds, train_size=train_size)
errs.append(cv_err)


np.random.seed(17)

# L1Prototypes
# Reducing the number of features to 10
new_n = 10
λ = 0.5
τ = 0.9
GD =  GDLearningProblem(
        MiniBatchGD(batch_size),
        LinearModel(new_n, 1),
        ConstantLR(learning_rate),
        MSE())

km = KernelModel(RBF(σ), prototypes = L1Prototypes(λ, τ, new_n, n_features).select_prototypes(RBF(σ) , X ,Y ) , prototype_selection_strategy=L1Prototypes(λ, τ, new_n, n_features) )
gf = lambda x: [km.kern(x, c) for c in km.prototypes]
feats = np.vstack([gf(x) for x in X])

cv_err = cross_validation(GD, feats, Y, epochs, k_folds, train_size=train_size)
errs.append(cv_err)




plot_data(algs, errs)


: 

: 

: 

: 

: 

: 

: 

: 

: 

: 

: 

: 

: 


## Learning Rate Adaptation

We will compare the different learning rate algorithms on a subset of the [Susy dataset](https://archive.ics.uci.edu/ml/datasets/SUSY). We will be predicting the first component.

In this experiment we compare constant learning rates and RMSProp with cross validation with `k folds=10` and `train_size=95000`.

In [None]:
susy_dataset.head()

: 

: 

: 

: 

: 

: 

: 

: 

: 

: 

: 

: 

: 

In [None]:
algs_lr = ["ConstantLR", "RMSProp"]

np.random.seed(34)

susy_dataset_np = susy_dataset.to_numpy()

# X is all rows, columns 1 to the second-to-last column
X = susy_dataset_np[:, 1:]

# Y is all rows, first
Y = susy_dataset_np[:, 0]

train_size = 95000
k_folds = 10
epochs = 1
batch_size = 512
n_features = 8
learning_rate = 0.01
decay_parameter = 0.9

lr_adapt_problems = [
    GDLearningProblem(
        MiniBatchGD(batch_size),
        LinearModel(n_features, 1),
        ConstantLR(learning_rate),
        MSE()),
    GDLearningProblem(
        MiniBatchGD(batch_size),
        LinearModel(n_features, 1),
        RMSProp(learning_rate, decay_parameter),
        MSE())
]

errs = []
for idx, problems in enumerate(lr_adapt_problems):
    cv_err = cross_validation(problems, X, Y, epochs, k_folds, train_size=train_size)
    errs.append(cv_err)

plot_data(algs_lr, errs)

: 

: 

: 

: 

: 

: 

: 

: 

: 

: 

: 

: 

: 