In [1]:
import numpy as np

In [2]:
class MatrixFactorization:
    def __init__(self, num_items, num_users, num_factors, learning_rate, regularization_rate, num_iterations):
        """
        Initialize the Matrix Factorization model.

        Args:
            num_items (int): Number of items.
            num_users (int): Number of users.
            num_factors (int): Number of latent factors.
            learning_rate (float): Learning rate for gradient descent.
            regularization_rate (float): Regularization rate for L2 regularization.
            num_iterations (int): Number of iterations for training.
        """
        self.num_items = num_items
        self.num_users = num_users
        self.num_factors = num_factors
        self.learning_rate = learning_rate
        self.regularization_rate = regularization_rate
        self.num_iterations = num_iterations


        # Initialize Q and P matrices with random values
        # Start your code
        #self.P = np.random.normal(
          #  scale=1./self.num_factors, size=(self.num_users, self.num_factors))
        #self.Q = np.random.normal(
         #   scale=1./self.num_factors, size=(self.num_items, self.num_factors))
        self.P = np.random.rand(self.num_users, self.num_factors)
        self.Q = np.random.rand(self.num_items, self.num_factors)
        # End your code

    def sigmoid(self, x):
        """
        Compute the sigmoid function.

        Args:
            x (float): Input value.

        Returns:
            float: Sigmoid value.
        """
        return 1 / (1 + np.exp(-x))

    def update_parameters(self, R):
        """
        Update the parameters Q and P using Stochastic Gradient Descent.

        Args:
            R (ndarray): Rating matrix.
        """
        # Start your code

        self.R = R
        self.samples = [
            (i, j, self.R[i, j])
            for i in range(self.num_users)
            for j in range(self.num_items)
            if self.R[i, j] > 0
        ]
        for n in range(self.num_iterations):
            np.random.shuffle(self.samples)
            for u, i, r in self.samples:
              # Computer prediction and error
              prediction = self.predict_rating(i, u)
              sigmoid_grad = 1 - self.sigmoid(r - prediction)
              # Update user and item latent feature matrices
              self.Q[i, :] -= self.learning_rate * (((-1) * sigmoid_grad * self.P[u, :]) + 2 * self.regularization_rate * self.Q[i, :])
              self.P[u, :] -= self.learning_rate * (((-1) * sigmoid_grad * self.Q[i, :]) + 2 * self.regularization_rate * self.P[u, :])
        # End your code

    def train(self, R):
        """
        Train the Matrix Factorization model.

        Args:
            R (ndarray): Rating matrix.
        """
        self.update_parameters(R)

    def predict_rating(self, i, u):
        """
        Predict the rating for item i and user u.

        Args:
            i (int): Item index.
            u (int): User index.

        Returns:
            float: Predicted rating.
        """
        # Start your code
        return self.Q[i, :].dot(self.P[u, :].T)
        # End your code

    def evaluate(self, users_list, groundTruth_list, topk=10):
        """
        Evaluate trained model for item i and user u

        Args:
            users_list (list): Users indexes list.
            groundTruth_list (list) : list of items in users test set
            topk (int): threshold for top item selection

        Returns:
            float: sum(Intersection between topk predicted items and user profile in test set / user profile size in test set) / len(users_list)
        """
        # Start your code

        num_users = len(users_list)
        total_precision = 0

        for u in users_list:

           ground_truth = groundTruth_list[u]
           predicted_ratings = np.dot(self.Q , self.P[u, :].T)
           sorted_indices = np.argsort(predicted_ratings)[::-1]
           top_items = sorted_indices[:topk]
           intersection = set(top_items).intersection(ground_truth)
           precision = len(intersection) / len(ground_truth)
           total_precision += precision

        average_precision = total_precision / num_users

        return average_precision

        # End your code

In [3]:
def read_data(filename):
    data = []

    with open(filename, 'r') as file:
        for line in file:
            items = line.strip().split(' ')
            user_id = int(items[0])
            item_ids = [int(item) for item in items[1:]]
            data.append((user_id, item_ids))

    return data

In [4]:
from google.colab import drive
import pandas as pd
drive.mount('/content/drive')

path = "/content/drive/My Drive/data-bd.txt"
data = read_data(path)

Mounted at /content/drive


In [5]:
num_items = None
num_users = len(data)
num_factors = 200
regularization_rate = 0.1
num_iterations = 50

In [6]:
item_set = set()

for user_id, item_ids in data:
  item_set.update(item_ids)

num_items = len(item_set)

R = np.zeros((num_users, num_items))
for i, (user_id, item_ids) in enumerate(data):
        for item_id in item_ids:
            R[i, item_id] = 1


In [7]:
import random

R_train = R
user_test_list = random.sample(range(0, num_users), int(num_users*0.4))
for i in user_test_list:
   (user_id, item_ids) = data[i]
   item_test_list = random.sample(item_ids, int(len(item_ids)*0.2))
   for item_id in item_test_list:
            R_train[i, item_id] = 0


In [8]:
R_train.shape

(29858, 40981)

In [9]:
learning_rate_list = [0.0001,0.001,0.015,0.1]

for learning_rate in learning_rate_list:

    model = MatrixFactorization(num_items, num_users, num_factors, learning_rate, regularization_rate,
                            num_iterations)
    model.train(R_train)

    print(f"Learning_rate = {learning_rate}")
    # Test prediction for item 0 and user 0
    item_index = 0
    user_index = 0
    prediction = model.predict_rating(item_index, user_index)
    print(f"Predicted rating for item {item_index} and user {user_index}: {prediction}")

    # Evaluate model for users in test set


    user_indexes = user_test_list
    groudTruths = {}

    for i in user_test_list :
        (user_id, item_ids) = data[i]
        groudTruths[user_id]=item_ids

    result = model.evaluate(user_indexes, groudTruths)
    print(f"Accuracy for model: {result}")
    print("********************************************")

Learning_rate = 0.0001
Predicted rating for item 0 and user 0: 75.71717332965171
Accuracy for model: 0.016721959788646965
********************************************


  return 1 / (1 + np.exp(-x))


Learning_rate = 0.001
Predicted rating for item 0 and user 0: 1296.0185373579154
Accuracy for model: 0.006240015502583984
********************************************
Learning_rate = 0.015
Predicted rating for item 0 and user 0: 1.1173554975690635e+32
Accuracy for model: 0.0064323211414480455
********************************************
Learning_rate = 0.1
Predicted rating for item 0 and user 0: 3.5574048719900194e+235
Accuracy for model: 0.004659644174148825
********************************************
