In [1]:
import time
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split as sklearn_train_test_split
from sklearn.metrics import mean_squared_error, mean_absolute_error
from sklearn.decomposition import NMF
from surprise import SVD, Dataset, Reader
from surprise.model_selection import train_test_split as surprise_train_test_split
from surprise import accuracy

import warnings
warnings.filterwarnings("ignore", category=UserWarning)

In [2]:
class MatrixFactorizationRecommender:
    def __init__(
        self,
        n_factors=50,
        learning_rate=0.005,
        regularization=0.02,
        n_epochs=10,
        min_learning_rate=0.0001,
        patience=5,
        random_state=42,
    ):
        self.n_factors = n_factors
        self.learning_rate = learning_rate
        self.regularization = regularization
        self.n_epochs = n_epochs
        self.min_learning_rate = min_learning_rate
        self.patience = patience
        self.random_state = random_state
        np.random.seed(random_state)

        self.user_factors = None
        self.item_factors = None
        self.user_bias = None
        self.item_bias = None
        self.global_bias = None
        self.n_users = None
        self.n_items = None

        self.train_losses = []
        self.val_losses = []

    def _initialize_parameters(self, n_users, n_items):
        self.n_users = n_users
        self.n_items = n_items
        scale = np.sqrt(2.0 / (n_users + self.n_factors))
        self.user_factors = np.random.normal(0, scale, (n_users, self.n_factors))
        self.item_factors = np.random.normal(0, scale, (n_items, self.n_factors))
        self.user_bias = np.zeros(n_users)
        self.item_bias = np.zeros(n_items)
        self.global_bias = 0.0

    def _validate_indices(self, user_idx, item_idx):
        user_idx = int(user_idx)
        item_idx = int(item_idx)

        if user_idx < 0 or user_idx >= self.n_users:
            raise ValueError(
                f"User index {user_idx} out of bounds [0, {self.n_users - 1}]"
            )
        if item_idx < 0 or item_idx >= self.n_items:
            raise ValueError(
                f"Item index {item_idx} out of bounds [0, {self.n_items - 1}]"
            )

        return user_idx, item_idx

    def _predict_single(self, user_idx, item_idx):
        user_idx, item_idx = self._validate_indices(user_idx, item_idx)
        return (
            self.global_bias
            + self.user_bias[user_idx]
            + self.item_bias[item_idx]
            + np.dot(self.user_factors[user_idx], self.item_factors[item_idx])
        )

    def _compute_loss(self, data):
        predictions = []
        actuals = []
        for u, i, r in data:

            pred = self._predict_single(int(u), int(i))
            predictions.append(pred)
            actuals.append(r)

        if not predictions:
            return float("inf")
        return np.sqrt(mean_squared_error(actuals, predictions))

    def _adaptive_learning_rate(self, epoch):
        return max(self.learning_rate * (0.9**epoch), self.min_learning_rate)

    def fit(self, train_data, val_data=None, verbose=True):
        n_users = int(np.max(train_data[:, 0])) + 1
        n_items = int(np.max(train_data[:, 1])) + 1

        if val_data is not None:
            n_users = max(n_users, int(np.max(val_data[:, 0])) + 1)
            n_items = max(n_items, int(np.max(val_data[:, 1])) + 1)

        self._initialize_parameters(n_users, n_items)
        self.global_bias = np.mean(train_data[:, 2])

        best_val_loss = float("inf")
        patience_counter = 0

        for epoch in range(self.n_epochs):
            np.random.shuffle(train_data)

            current_lr = self._adaptive_learning_rate(epoch)

            epoch_loss = 0.0
            valid_updates = 0

            for user_idx, item_idx, rating in train_data:

                user_idx, item_idx = self._validate_indices(
                    int(user_idx), int(item_idx)
                )

                prediction = self._predict_single(user_idx, item_idx)
                error = rating - prediction
                epoch_loss += error**2
                valid_updates += 1

                self.user_bias[user_idx] += current_lr * (
                    error - self.regularization * self.user_bias[user_idx]
                )
                self.item_bias[item_idx] += current_lr * (
                    error - self.regularization * self.item_bias[item_idx]
                )

                user_factor = self.user_factors[user_idx].copy()
                item_factor = self.item_factors[item_idx].copy()

                self.user_factors[user_idx] += current_lr * (
                    error * item_factor - self.regularization * user_factor
                )
                self.item_factors[item_idx] += current_lr * (
                    error * user_factor - self.regularization * item_factor
                )

            train_loss = np.sqrt(epoch_loss / valid_updates)
            self.train_losses.append(train_loss)

            if val_data is not None:
                val_loss = self._compute_loss(val_data)
                self.val_losses.append(val_loss)

                if verbose:
                    print(
                        f"Epoch {epoch + 1}/{self.n_epochs} - "
                        f"Train RMSE: {train_loss:.4f} - "
                        f"Val RMSE: {val_loss:.4f}"
                    )

                if val_loss < best_val_loss:
                    best_val_loss = val_loss
                    patience_counter = 0
                else:
                    patience_counter += 1
                    if patience_counter >= self.patience:
                        if verbose:
                            print(f"Early stopping at epoch {epoch + 1}")
                        break
            elif verbose:
                print(
                    f"Epoch {epoch + 1}/{self.n_epochs} - Train RMSE: {train_loss:.4f}"
                )

        return self

    def predict(self, user_idx, item_idx):
        return self._predict_single(user_idx, item_idx)

    def evaluate(self, test_data):
        predictions = []
        actuals = []

        for u, i, r in test_data:

            pred = self._predict_single(int(u), int(i))
            predictions.append(pred)
            actuals.append(r)

        if not predictions:
            return {"rmse": float("inf"), "mae": float("inf")}

        return {
            "rmse": np.sqrt(mean_squared_error(actuals, predictions)),
            "mae": mean_absolute_error(actuals, predictions),
        }

In [3]:
data = pd.read_csv(
    "../../ml-100k/u.data",
    sep="\t",
    names=["user_id", "item_id", "rating", "timestamp"],
)

print(f"Number of users: {data['user_id'].nunique()}")
print(f"Number of items: {data['item_id'].nunique()}")
print(f"Number of ratings: {len(data)}")

Number of users: 943
Number of items: 1682
Number of ratings: 100000


In [4]:
user_ids = sorted(data["user_id"].unique())
item_ids = sorted(data["item_id"].unique())

user_id_map = {old_id: new_id for new_id, old_id in enumerate(user_ids)}
item_id_map = {old_id: new_id for new_id, old_id in enumerate(item_ids)}

In [5]:
data["user_id"] = data["user_id"].map(user_id_map)
data["item_id"] = data["item_id"].map(item_id_map)

In [6]:
train_data, test_data = sklearn_train_test_split(
    data[["user_id", "item_id", "rating"]].values, test_size=0.2, random_state=42
)

val_data, test_data = sklearn_train_test_split(
    test_data, test_size=0.5, random_state=42
)

In [7]:
print(f"Training set: {len(train_data)} samples")
print(f"Validation set: {len(val_data)} samples")
print(f"Test set: {len(test_data)} samples")

Training set: 80000 samples
Validation set: 10000 samples
Test set: 10000 samples


In [8]:
start_time = time.time()
model = MatrixFactorizationRecommender(
    n_factors=50,
    learning_rate=0.005,
    regularization=0.02,
    n_epochs=10,
    patience=5,
)

model.fit(train_data, val_data, verbose=True)
custom_time = time.time() - start_time

custom_metrics = model.evaluate(test_data)

print(f"Training time: {custom_time:.2f} seconds")
print(f"Test RMSE: {custom_metrics['rmse']:.4f}")
print(f"Test MAE: {custom_metrics['mae']:.4f}")

Epoch 1/10 - Train RMSE: 1.0480 - Val RMSE: 1.0075
Epoch 2/10 - Train RMSE: 0.9840 - Val RMSE: 0.9818
Epoch 3/10 - Train RMSE: 0.9631 - Val RMSE: 0.9707
Epoch 4/10 - Train RMSE: 0.9518 - Val RMSE: 0.9644
Epoch 5/10 - Train RMSE: 0.9445 - Val RMSE: 0.9602
Epoch 6/10 - Train RMSE: 0.9394 - Val RMSE: 0.9576
Epoch 7/10 - Train RMSE: 0.9354 - Val RMSE: 0.9556
Epoch 8/10 - Train RMSE: 0.9323 - Val RMSE: 0.9541
Epoch 9/10 - Train RMSE: 0.9297 - Val RMSE: 0.9530
Epoch 10/10 - Train RMSE: 0.9276 - Val RMSE: 0.9520
Training time: 15.04 seconds
Test RMSE: 0.9430
Test MAE: 0.7466


In [9]:
n_users = len(user_id_map)
n_items = len(item_id_map)
R = np.zeros((n_users, n_items))
for user, item, rating in train_data:
    R[int(user), int(item)] = rating

start_time = time.time()
sklearn_model = NMF(
    n_components=50,
    init="random",
    random_state=42,
    max_iter=10,
)

sklearn_model.fit(R)
sklearn_time = time.time() - start_time

sklearn_preds = sklearn_model.transform(R) @ sklearn_model.components_
test_preds = []
test_actuals = []
for user, item, rating in test_data:
    test_preds.append(sklearn_preds[int(user), int(item)])
    test_actuals.append(rating)

sklearn_rmse = np.sqrt(mean_squared_error(test_actuals, test_preds))
sklearn_mae = mean_absolute_error(test_actuals, test_preds)

print(f"Training time: {sklearn_time:.2f} seconds")
print(f"Test RMSE: {sklearn_rmse:.4f}")
print(f"Test MAE: {sklearn_mae:.4f}")

Training time: 0.15 seconds
Test RMSE: 2.6800
Test MAE: 2.4184


In [10]:
reader = Reader(rating_scale=(1, 5))
surprise_data = Dataset.load_from_df(data[["user_id", "item_id", "rating"]], reader)

trainset, testset = surprise_train_test_split(
    surprise_data, test_size=0.2, random_state=42
)

start_time = time.time()
surprise_model = SVD(
    n_factors=50, n_epochs=10, lr_all=0.005, reg_all=0.02, random_state=42
)

surprise_model.fit(trainset)
surprise_time = time.time() - start_time

surprise_predictions = surprise_model.test(testset)
surprise_rmse = accuracy.rmse(surprise_predictions, verbose=False)
surprise_mae = accuracy.mae(surprise_predictions, verbose=False)

print(f"Training time: {surprise_time:.2f} seconds")
print(f"Test RMSE: {surprise_rmse:.4f}")
print(f"Test MAE: {surprise_mae:.4f}")

Training time: 0.33 seconds
Test RMSE: 0.9465
Test MAE: 0.7500


In [11]:
print(f"{'Metric':<15} {'Custom Model':<15} {'Scikit-learn':<15} {'Surprise':<15}")
print("-" * 60)
print(
    f"{'Training Time':<15} {custom_time:.2f}s{'':<9} {sklearn_time:.2f}s{'':<10} {surprise_time:.2f}s"
)
print(
    f"{'Test RMSE':<15} {custom_metrics['rmse']:.4f}{'':<9} {sklearn_rmse:.4f}{'':<9} {surprise_rmse:.4f}"
)
print(
    f"{'Test MAE':<15} {custom_metrics['mae']:.4f}{'':<9} {sklearn_mae:.4f}{'':<9} {surprise_mae:.4f}"
)

Metric          Custom Model    Scikit-learn    Surprise       
------------------------------------------------------------
Training Time   15.04s          0.15s           0.33s
Test RMSE       0.9430          2.6800          0.9465
Test MAE        0.7466          2.4184          0.7500
