# CUHK-STAT3009: Homework 1 **(due Oct 9)**

## **Q1: RS Metrics**


### Q1.1: Compute the Root Mean Squared Error (RMSE)

The Root Mean Squared Error (RMSE) is a commonly used metric to evaluate the accuracy of predictions. It is defined as:

$$
\text{RMSE} = \sqrt{\frac{1}{n} \sum_{i=1}^{n} (y_i - \hat{y}_i)^2}
$$

where $y_i$ is the true value and $\hat{y}_i$ is the predicted value.
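
As a quick sanity check of the formula, consider a small hypothetical example (illustrative values only, not the assignment data) with $y = (1, 2, 3)$ and $\hat{y} = (1, 2, 6)$, so that only the last prediction is off, by 3:

$$
\text{RMSE} = \sqrt{\frac{(1-1)^2 + (2-2)^2 + (3-6)^2}{3}} = \sqrt{\frac{9}{3}} = \sqrt{3} \approx 1.732
$$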

Given the following arrays `truth` and `pred`, compute the RMSE:

```python
import numpy as np

truth = np.array([3.0, -0.5, 2.0, 7.0])
pred = np.array([2.5, 0.0, 2.0, 8.0])
```

Implement the solution to calculate the RMSE:

In [None]:
import numpy as np
truth = np.array([3.0, -0.5, 2.0, 7.0])
pred = np.array([2.5, 0.0, 2.0, 8.0])

In [None]:
## Your solution here
def rmse(truth, predictions):
    return np.sqrt(np.mean(np.square(truth - predictions)))

print(rmse(truth, pred))

### Q1.2: Define and Test the Mean Absolute Error (MAE) Function

The Mean Absolute Error (MAE) is another popular metric to evaluate the accuracy of predictions. It is defined as:

$$
\text{MAE} = \frac{1}{n} \sum_{i=1}^{n} | y_i - \hat{y}_i |
$$

where $y_i$ is the true value and $\hat{y}_i$ is the predicted value.
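
To see how MAE differs from RMSE in practice, the sketch below compares the two on made-up data. The arrays and the helper names `mae_demo` and `rmse_demo` are hypothetical and chosen only for illustration. Both prediction vectors have the same total absolute error, but one concentrates it in a single large miss.

```python
import numpy as np

# Hypothetical values for illustration only (not the assignment data).
y_true = np.array([1.0, 2.0, 3.0, 4.0])
y_small = np.array([1.5, 2.5, 3.5, 4.5])  # four errors of size 0.5
y_spike = np.array([1.0, 2.0, 3.0, 6.0])  # one error of size 2.0

def mae_demo(y, y_hat):
    # Mean of the absolute errors
    return np.mean(np.abs(y - y_hat))

def rmse_demo(y, y_hat):
    # Square root of the mean squared error
    return np.sqrt(np.mean((y - y_hat) ** 2))

for name, y_hat in [("small errors", y_small), ("one large error", y_spike)]:
    print(name, "| MAE:", mae_demo(y_true, y_hat), "| RMSE:", rmse_demo(y_true, y_hat))
```

In this example the MAE is 0.5 in both cases, while the RMSE rises from 0.5 to 1.0 when the error is concentrated in one prediction. Squaring penalizes large individual errors more heavily.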

#### Tasks:
1. Define a function `mae(true_ratings, pred_ratings)` that calculates the MAE given the true ratings and predicted ratings.
2. Test your function using the provided `truth` and `pred` arrays, and print the MAE.

Given arrays:

```python
import numpy as np

truth = np.array([3.0, -0.5, 2.0, 7.0])
pred = np.array([2.5, 0.0, 2.0, 8.0])
```

Define and test the `mae` function:

In [None]:
## Your solution here
def mae(true_ratings, pred_ratings):
    # Mean of the absolute differences between true and predicted ratings
    return np.mean(np.abs(true_ratings - pred_ratings))

print(mae(truth, pred))

## **Q2: Your First Custom sklearn-type RS**

**Task Description**

In this task, you will implement a user-item average based recommender system using the Netflix dataset from the CUHK-STAT3009 GitHub repository.

```python
import numpy as np
import pandas as pd

# Load the Netflix dataset from the CUHK-STAT3009 GitHub repository
# Repository link: https://github.com/statmlben/CUHK-STAT3009/tree/main/dataset/netflix

train = pd.read_csv('https://raw.githubusercontent.com/statmlben/CUHK-STAT3009/main/dataset/netflix/train.csv')
test = pd.read_csv('https://raw.githubusercontent.com/statmlben/CUHK-STAT3009/main/dataset/netflix/test.csv')

# Convert DataFrame to NumPy arrays
```

**New Recommender System - User-Item Average**

Create a custom class `UserItemAverageRS` that inherits from `sklearn.base.BaseEstimator`. Implement the `fit` method to compute the parameters (the user and item mean ratings), and the `predict` method to generate predictions based on the user-item average formula:

$$\widehat{r}_{ui} = \frac{\bar{r}_u + \bar{r}_i}{2}$$

where

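$$\bar{r}_u = \frac{1}{|\mathcal{I}_u|} \sum_{i \in \mathcal{I}_u} r_{ui}, \quad \bar{r}_i = \frac{1}{|\mathcal{U}_i|} \sum_{u \in \mathcal{U}_i} r_{ui}$$

Here $\mathcal{I}_u$ denotes the set of items rated by user $u$, and $\mathcal{U}_i$ denotes the set of users who rated item $i$, both taken from the training data.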
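
The formula can be traced by hand on a tiny example. The sketch below uses hypothetical toy ratings (not the Netflix data) and assumes every user and item id has at least one rating, so no mean is undefined.

```python
import numpy as np

# Hypothetical toy ratings: position k holds one (user, item, rating) record.
user_id = np.array([0, 0, 1, 1, 2])
item_id = np.array([0, 1, 0, 2, 1])
rating = np.array([5.0, 3.0, 4.0, 2.0, 1.0])

# Per-user and per-item means: sum of ratings divided by the number of ratings.
user_mean = np.bincount(user_id, weights=rating) / np.bincount(user_id)
item_mean = np.bincount(item_id, weights=rating) / np.bincount(item_id)

# Predict the unseen pair (user 2, item 0) as the average of the two means.
u, i = 2, 0
r_hat = (user_mean[u] + item_mean[i]) / 2

print("user means:", user_mean)
print("item means:", item_mean)
print("prediction for (2, 0):", r_hat)
```

Here user 2 has mean rating 1.0 and item 0 has mean rating 4.5, so the prediction is $(1.0 + 4.5)/2 = 2.75$.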

**Evaluate the Recommender System**

Fit the custom recommender system to the training data and generate predictions for the test data. Compute and report the Root Mean Squared Error (RMSE) for the predictions.

**Note**: Make sure to follow the sklearn API guidelines for implementing custom estimators.
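
As a rough reminder of the main conventions, the sketch below shows a deliberately trivial estimator. `GlobalMeanRS` and its `offset` hyperparameter are hypothetical names invented for this illustration, and the data are made up. The points to note are that `__init__` only stores hyperparameters, that learned attributes end with an underscore and are set in `fit`, and that `fit` returns `self`.

```python
import numpy as np
from sklearn.base import BaseEstimator, clone

class GlobalMeanRS(BaseEstimator):
    """Hypothetical toy estimator: predicts a (shifted) global mean rating."""

    def __init__(self, offset=0.0):
        # __init__ only stores hyperparameters, unchanged, under the same name.
        self.offset = offset

    def fit(self, X, y):
        # Learned quantities get a trailing underscore and are set in fit.
        self.mean_ = float(np.mean(y)) + self.offset
        return self  # returning self allows chaining: est.fit(X, y).predict(X)

    def predict(self, X):
        return np.full(len(X), self.mean_)

X = np.array([[0, 0], [0, 1], [1, 0]])  # made-up (user_id, item_id) pairs
y = np.array([4.0, 2.0, 3.0])

est = GlobalMeanRS(offset=0.5).fit(X, y)
print(est.get_params())         # hyperparameters, read from the __init__ signature
print(est.predict(X))
fresh = clone(est)              # same hyperparameters, but no fitted state
print(hasattr(fresh, "mean_"))
```

Following these conventions is what lets utilities such as `get_params` and `clone` work without extra code.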

In [None]:
import pandas as pd

# Load the Netflix dataset from the CUHK-STAT3009 GitHub repository
# Repository link: https://github.com/statmlben/CUHK-STAT3009/tree/main/dataset/netflix

train = pd.read_csv('https://raw.githubusercontent.com/statmlben/CUHK-STAT3009/main/dataset/netflix/train.csv')
test = pd.read_csv('https://raw.githubusercontent.com/statmlben/CUHK-STAT3009/main/dataset/netflix/test.csv')


In [None]:
## Your solution here
from sklearn.base import BaseEstimator
class UserItemAverageRS(BaseEstimator):
    def fit(self, X):
        self.user_average = X.groupby('user_id')['rating'].mean().to_dict()
        self.item_average = X.groupby('movie_id')['rating'].mean().to_dict()
        self.global_average = X['rating'].mean()
        return self
    
    def predict(self, X):
        user_means = X['user_id'].map(self.user_average).fillna(self.global_average)
        item_means = X['movie_id'].map(self.item_average).fillna(self.global_average)
        return ((user_means + item_means) / 2).values
    
model = UserItemAverageRS()
model.fit(train)

y_true = test['rating'].values
y_pred = model.predict(test)

print("RMSE:", round(rmse(y_true, y_pred), 4))

## **Q3: GridSearch CV for Ridge Regression**

**Task Description**

In this question, you will use the California Housing dataset to explore hyperparameter tuning with `GridSearchCV` for *Ridge Regression* (similar to OLS, but with a penalty on the squared L2 norm of the linear coefficients).

**Ridge Regression Formula**

Ridge regression is a linear regression technique that adds a regularization term to the cost function to reduce overfitting. Its prediction function is the same linear model as in OLS:

$$\hat{y} = \mathbf{w}^T \mathbf{x} + b$$

where:

* $\hat{y}$ is the predicted value
* $\mathbf{w}$ is the weight vector
* $\mathbf{x}$ is the feature vector
* $b$ is the bias term

The cost function for ridge regression is:

$$J(\mathbf{w}, b) = \frac{1}{2} \sum_{i=1}^{n} (y_i - \hat{y}_i)^2 + \frac{\alpha}{2} \|\mathbf{w}\|^2$$

where:

* $y_i$ is the actual value
* $\hat{y}_i$ is the predicted value
* $n$ is the number of samples
* $\alpha$ is the regularization strength (hyperparameter)
* $\|\mathbf{w}\|^2$ is the squared L2 norm of the weight vector

**Hyperparameter to Tune**

The hyperparameter to tune in ridge regression is $\alpha$, which controls the strength of the regularization. A larger value of $\alpha$ will result in stronger regularization, which can help reduce overfitting but may also lead to underfitting. A smaller value of $\alpha$ will result in weaker regularization, which can improve model performance on the training data but may lead to overfitting.
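
The shrinking effect of $\alpha$ can be seen directly from the closed-form minimizer of the cost function above. The sketch below uses synthetic data, and `ridge_fit` is a hypothetical helper written for this illustration (it is not the scikit-learn implementation). It centers the data so that the bias $b$ is not penalized, then solves $(\mathbf{X}_c^T \mathbf{X}_c + \alpha \mathbf{I})\mathbf{w} = \mathbf{X}_c^T \mathbf{y}_c$.

```python
import numpy as np

# Synthetic data for illustration only.
rng = np.random.default_rng(0)
X = rng.normal(size=(50, 3))
y = X @ np.array([2.0, -1.0, 0.5]) + rng.normal(scale=0.1, size=50)

def ridge_fit(X, y, alpha):
    """Minimize 0.5 * sum((y - Xw - b)^2) + 0.5 * alpha * ||w||^2, b unpenalized."""
    x_mean, y_mean = X.mean(axis=0), y.mean()
    Xc, yc = X - x_mean, y - y_mean  # centering takes care of the bias term
    w = np.linalg.solve(Xc.T @ Xc + alpha * np.eye(X.shape[1]), Xc.T @ yc)
    b = y_mean - x_mean @ w
    return w, b

norms = []
for alpha in [0.0, 1.0, 10.0, 100.0, 1000.0]:
    w, b = ridge_fit(X, y, alpha)
    norms.append(np.linalg.norm(w))
    print(f"alpha={alpha:>7}: ||w|| = {norms[-1]:.4f}")
```

The printed norms decrease as $\alpha$ grows: the weights are pulled toward zero, which lowers variance at the cost of added bias.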

The goal of hyperparameter tuning is to find the optimal value of $\alpha$ that balances the trade-off between model complexity and goodness of fit.


**Your task** is to find the optimal value of $\alpha$ for Ridge Regression using `GridSearchCV` and evaluate the tuned model's performance on the test set.
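
To make the idea behind a cross-validated grid search concrete, here is a minimal hand-written sketch on synthetic data. It is only an approximation of what a library routine does. The helpers `ridge_fit` and `cv_mse` are hypothetical names, the folds are contiguous and unshuffled, and the score is the plain mean squared error (to be minimized) rather than its negative.

```python
import numpy as np

# Synthetic data for illustration only.
rng = np.random.default_rng(0)
X = rng.normal(size=(100, 5))
y = X @ np.array([1.5, 0.0, -2.0, 0.0, 0.5]) + rng.normal(scale=0.5, size=100)

def ridge_fit(X, y, alpha):
    # Closed-form ridge solution with an unpenalized bias (via centering).
    x_mean, y_mean = X.mean(axis=0), y.mean()
    Xc, yc = X - x_mean, y - y_mean
    w = np.linalg.solve(Xc.T @ Xc + alpha * np.eye(X.shape[1]), Xc.T @ yc)
    return w, y_mean - x_mean @ w

def cv_mse(X, y, alpha, n_folds=5):
    # Average validation MSE over contiguous, unshuffled folds.
    folds = np.array_split(np.arange(len(y)), n_folds)
    scores = []
    for val_idx in folds:
        train_mask = np.ones(len(y), dtype=bool)
        train_mask[val_idx] = False  # hold out the current fold
        w, b = ridge_fit(X[train_mask], y[train_mask], alpha)
        resid = y[val_idx] - (X[val_idx] @ w + b)
        scores.append(np.mean(resid ** 2))
    return np.mean(scores)

alphas = [0.01, 0.1, 1, 10, 100]
cv_scores = {a: cv_mse(X, y, a) for a in alphas}
best_alpha = min(cv_scores, key=cv_scores.get)  # smallest average validation MSE

for a in alphas:
    print(f"alpha={a:>6}: CV MSE = {cv_scores[a]:.4f}")
print("Selected alpha:", best_alpha)
```

Each candidate $\alpha$ is scored only on data held out from fitting, so the selected value reflects generalization rather than training fit.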

Please use the following code to load the dataset.

```python
import numpy as np
import pandas as pd
from sklearn.linear_model import Ridge
from sklearn.model_selection import GridSearchCV

train = pd.read_csv('https://raw.githubusercontent.com/statmlben/CUHK-STAT3009/refs/heads/main/dataset/housing/train.csv')
test = pd.read_csv('https://raw.githubusercontent.com/statmlben/CUHK-STAT3009/refs/heads/main/dataset/housing/test.csv')

feat_col = ['MedInc', 'HouseAge',
            'AveRooms', 'AveBedrms',
            'Population', 'AveOccup',
            'Latitude', 'Longitude']

target = 'MedHouseVal'

X_train, y_train = train[feat_col].values, train[target].values
X_test, y_test = test[feat_col].values, test[target].values
```


In [None]:
## Your solution here
from sklearn.linear_model import Ridge
from sklearn.model_selection import GridSearchCV

train = pd.read_csv('https://raw.githubusercontent.com/statmlben/CUHK-STAT3009/refs/heads/main/dataset/housing/train.csv')
test = pd.read_csv('https://raw.githubusercontent.com/statmlben/CUHK-STAT3009/refs/heads/main/dataset/housing/test.csv')

feat_col = ['MedInc', 'HouseAge',
            'AveRooms', 'AveBedrms',
            'Population', 'AveOccup',
            'Latitude', 'Longitude']

target = 'MedHouseVal'

X_train, y_train = train[feat_col].values, train[target].values
X_test, y_test = test[feat_col].values, test[target].values

param_grid = {'alpha': [0.01, 0.1, 1, 10, 100]}
ridge = Ridge()

grid = GridSearchCV(ridge, param_grid, cv=5, scoring='neg_mean_squared_error')
grid.fit(X_train, y_train)

best_alpha = grid.best_params_['alpha']
best_model = grid.best_estimator_

# --- performance ---
y_pred = best_model.predict(X_test)

print("Best alpha:", best_alpha)
print("Test RMSE:", round(rmse(y_test, y_pred), 4))

## **Q4 (Bonus): Generalized Sequential RS**

**Task Description:**

Design and implement a general `sklearn.base.BaseEstimator`-type class `seqRS` that supports sequential fitting and prediction based on a list of recommender system (RS) methods. Test the class using `UserMeanRS` and `ItemMeanRS` with custom hyperparameters, and report the RMSE of the predictions.

**Motivation:**

As demonstrated in the lecture, we can first fit a `UserMeanRS` model, then fit an `ItemMeanRS` model on the residuals, and so on. This approach can be generalized to a sequence of RS methods.
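
The two-stage idea can be traced on a tiny example before generalizing it. The sketch below uses hypothetical toy data and plain NumPy rather than the `UserMeanRS` and `ItemMeanRS` classes, and it omits the `min_data` fallback for simplicity.

```python
import numpy as np

# Hypothetical toy data: columns of X are (user_id, item_id).
X = np.array([[0, 0], [0, 1], [1, 0], [1, 1], [2, 0]])
y = np.array([5.0, 3.0, 4.0, 2.0, 3.0])

# Stage 1: user means fitted on the raw ratings.
user_mean = np.bincount(X[:, 0], weights=y) / np.bincount(X[:, 0])
residual = y - user_mean[X[:, 0]]

# Stage 2: item means fitted on what stage 1 left unexplained.
item_effect = np.bincount(X[:, 1], weights=residual) / np.bincount(X[:, 1])

# The final prediction is the sum of the stage-wise predictions.
y_fit = user_mean[X[:, 0]] + item_effect[X[:, 1]]

rmse_stage1 = np.sqrt(np.mean(residual ** 2))
rmse_stage2 = np.sqrt(np.mean((y - y_fit) ** 2))
print("residual after stage 1:", residual)
print("item effects on residuals:", item_effect)
print("training RMSE after stage 1:", rmse_stage1)
print("training RMSE after stage 2:", rmse_stage2)
```

The second stage models only the residuals, so the final prediction adds its output to that of the first stage. On this toy data the training RMSE drops after the second stage.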

**Requirements:**

* The `seqRS` class should take a list of RS methods (`RS_list`) as an argument. (Each RS has `fit` and `predict` methods)
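* The `fit` method should sequentially fit each RS method in the list to the training data, with each method after the first fitted to the residuals left by the preceding ones.
* The `predict` method should generate predictions for the test data by summing the predictions of the fitted RS models.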

**Example Usage:**
```python
test_seqRS = seqRS(RS_list=[UserMeanRS(n_users, min_data=5), ItemMeanRS(n_items, min_data=3)])

test_seqRS.fit(X_train, y_train)
y_pred = test_seqRS.predict(X_test)
```
**Goal:** Implement the `seqRS` class to support this sequential fitting and prediction workflow.

**Note:** Use the following Python code to load the data:

```python
import numpy as np
import pandas as pd

train = pd.read_csv('https://raw.githubusercontent.com/statmlben/CUHK-STAT3009/main/dataset/netflix/train.csv')
test = pd.read_csv('https://raw.githubusercontent.com/statmlben/CUHK-STAT3009/main/dataset/netflix/test.csv')

## RS data casting with ML format
X_train = train[['user_id', 'movie_id']].values
y_train = train['rating'].values

X_test = test[['user_id', 'movie_id']].values
y_test = test['rating'].values
```

The baseline methods are defined as:

```python
import numpy as np
from sklearn.base import BaseEstimator, RegressorMixin

class UserMeanRS(BaseEstimator, RegressorMixin):
    def __init__(self, n_users, min_data=3):
        self.n_users = n_users
        self.global_mean_ = 0
        self.min_data = min_data
        self.user_means_ = np.zeros(n_users)

    def fit(self, X, y):
        self.global_mean_ = np.mean(y)
        for user in range(self.n_users):
            user_indices = np.where(X[:, 0] == user)[0]
            if len(user_indices) <= self.min_data:
                self.user_means_[user] = self.global_mean_
            else:
                self.user_means_[user] = np.mean(y[user_indices])
        return self

    def predict(self, X):
        user_indices = X[:, 0]
        return self.user_means_[user_indices]

class ItemMeanRS(BaseEstimator, RegressorMixin):
    def __init__(self, n_items, min_data=3):
        self.n_items = n_items
        self.global_mean_ = 0
        self.min_data = min_data
        self.item_means_ = np.zeros(n_items)

    def fit(self, X, y):
        self.global_mean_ = np.mean(y)
        for item in range(self.n_items):
            item_indices = np.where(X[:, 1] == item)[0]
            if len(item_indices) <= self.min_data:
                self.item_means_[item] = self.global_mean_
            else:
                self.item_means_[item] = np.mean(y[item_indices])
        return self

    def predict(self, X):
        item_indices = X[:, 1]
        return self.item_means_[item_indices]
```

In [None]:
## Your solution here
train = pd.read_csv('https://raw.githubusercontent.com/statmlben/CUHK-STAT3009/main/dataset/netflix/train.csv')
test = pd.read_csv('https://raw.githubusercontent.com/statmlben/CUHK-STAT3009/main/dataset/netflix/test.csv')

X_train = train[['user_id', 'movie_id']].values
y_train = train['rating'].values

X_test = test[['user_id', 'movie_id']].values
y_test = test['rating'].values

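# Size the lookup arrays to cover every id in train and test, so predict never indexes out of range
n_users = int(max(train['user_id'].max(), test['user_id'].max()) + 1)
n_items = int(max(train['movie_id'].max(), test['movie_id'].max()) + 1)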

from sklearn.base import BaseEstimator, RegressorMixin

class UserMeanRS(BaseEstimator, RegressorMixin):
    def __init__(self, n_users, min_data=3):
        self.n_users = n_users
        self.global_mean_ = 0
        self.min_data = min_data
        self.user_means_ = np.zeros(n_users)

    def fit(self, X, y):
        self.global_mean_ = np.mean(y)
        for user in range(self.n_users):
            user_indices = np.where(X[:, 0] == user)[0]
            if len(user_indices) <= self.min_data:
                self.user_means_[user] = self.global_mean_
            else:
                self.user_means_[user] = np.mean(y[user_indices])
        return self

    def predict(self, X):
        user_indices = X[:, 0]
        return self.user_means_[user_indices]

class ItemMeanRS(BaseEstimator, RegressorMixin):
    def __init__(self, n_items, min_data=3):
        self.n_items = n_items
        self.global_mean_ = 0
        self.min_data = min_data
        self.item_means_ = np.zeros(n_items)

    def fit(self, X, y):
        self.global_mean_ = np.mean(y)
        for item in range(self.n_items):
            item_indices = np.where(X[:, 1] == item)[0]
            if len(item_indices) <= self.min_data:
                self.item_means_[item] = self.global_mean_
            else:
                self.item_means_[item] = np.mean(y[item_indices])
        return self

    def predict(self, X):
        item_indices = X[:, 1]
        return self.item_means_[item_indices]
    
class seqRS(BaseEstimator, RegressorMixin):
    def __init__(self, RS_list):
        self.RS_list = RS_list

    def fit(self, X, y):
        self.fitted_models_ = []
        # Each model is fitted to what the previous models left unexplained
        residual = np.asarray(y, dtype=float).copy()
        for RS in self.RS_list:
            RS.fit(X, residual)
            residual = residual - RS.predict(X)
            self.fitted_models_.append(RS)
        return self

    def predict(self, X):
        # The final prediction is the sum of the stage-wise predictions
        total_pred = np.zeros(X.shape[0])
        for RS in self.fitted_models_:
            total_pred += RS.predict(X)
        return total_pred

In [None]:
test_seqRS = seqRS(RS_list=[
    UserMeanRS(n_users, min_data=5),
    ItemMeanRS(n_items, min_data=3)
])

test_seqRS.fit(X_train, y_train)
y_pred = test_seqRS.predict(X_test)
print("RMSE:", rmse(y_test, y_pred))