In [63]:
import numpy as np
import pandas as pd
from sklearn.metrics.pairwise import cosine_similarity
from typing import Tuple , Dict
from sklearn.decomposition import TruncatedSVD

In [64]:
# Ratings file is tab-separated; column names are supplied explicitly via `names`.
# (Path suggests the MovieLens 100k dataset -- confirm provenance.)
columns = ['user_id', 'item_id', 'rating', 'timestamp']
data = pd.read_csv(
    'ml-100k/u.data',
    sep='\t',
    names=columns,
)

In [65]:
# Preview the loaded ratings table (the rendered output below is truncated in the middle).
data

Unnamed: 0,user_id,item_id,rating,timestamp
0,196,242,3,881250949
1,186,302,3,891717742
2,22,377,1,878887116
3,244,51,2,880606923
4,166,346,1,886397596
...,...,...,...,...
99995,880,476,3,880175444
99996,716,204,5,879795543
99997,276,1090,1,874795795
99998,13,225,2,882399156


In [84]:
class MRS:
    """Movie recommender with a k-nearest-neighbour model and an optional SVD model.

    The rating table is pivoted into ``user_item_matrix``:

    * ``mode="user-based"``: rows are users, columns are movies; similarity is
      computed between users.
    * ``mode="item-based"``: rows are movies, columns are users; similarity is
      computed between movies.

    Similarity is the cosine similarity of the rows after missing ratings have
    been replaced by 0. Public attributes: ``recommend_count``, ``k_value``,
    ``mode`` and ``user_item_matrix``.
    """

    def __init__(self , data : pd.DataFrame , user_column : str , movie_column : str , rating_column : str , recommend_count : int = 10 , k_value : int = 5 , mode : str = "user-based") -> None:
        """Build the rating matrix and the row-to-row similarity matrix.

        Args:
            data: Long-format ratings, one row per (user, movie, rating).
            user_column: Name of the column holding user ids.
            movie_column: Name of the column holding movie ids.
            rating_column: Name of the column holding ratings.
            recommend_count: Number of movies returned by ``recommend``.
            k_value: Number of nearest neighbours used for a prediction.
            mode: ``"user-based"`` or ``"item-based"``.

        Raises:
            ValueError: If a column is missing from ``data`` or ``mode`` is unknown.
        """
        if not all(col in data.columns for col in [user_column, movie_column, rating_column]):
            raise ValueError("One or more specified columns not found in DataFrame")
        if mode not in ("user-based", "item-based"):
            raise ValueError("mode must be 'user-based' or 'item-based'")
        self.recommend_count : int = recommend_count
        self.k_value : int = k_value
        self.mode = mode

        # The similarity is always computed between the ROWS of the matrix, so
        # the mode decides which entity becomes the row index.
        if mode == "user-based":
            row_column, header_column = user_column, movie_column
        else:
            row_column, header_column = movie_column, user_column
        # pivot_table averages duplicate (row, column) pairs (default aggfunc).
        self.user_item_matrix = data.pivot_table(index=row_column, columns=header_column, values=rating_column)

        filled = self.user_item_matrix.fillna(0)
        self.__similarity = pd.DataFrame(
            cosine_similarity(filled),
            index=filled.index,
            columns=filled.index,
        )

    def __user_ids(self) -> pd.Index:
        """Return the ids of all users present in the rating matrix (mode-aware)."""
        if self.mode == "user-based":
            return self.user_item_matrix.index
        return self.user_item_matrix.columns

    def __ratings_of(self, user_id: int) -> pd.Series:
        """Return one user's ratings as a Series indexed by movie id (NaN = unrated)."""
        if self.mode == "user-based":
            return self.user_item_matrix.loc[user_id]
        return self.user_item_matrix[user_id]

    def fit_svd(self, n_components: int = 20):
        """Fit a truncated SVD on the zero-filled matrix and store its low-rank reconstruction.

        Args:
            n_components: Number of latent factors to keep.
        """
        matrix = self.user_item_matrix.fillna(0).values
        svd = TruncatedSVD(n_components=n_components, random_state=42)
        # fit_transform returns the rows projected onto the components, so
        # multiplying by components_ maps them back to the original space.
        projected = svd.fit_transform(matrix)
        reconstructed = np.dot(projected, svd.components_)

        self.__predicted_svd = pd.DataFrame(
            reconstructed,
            index=self.user_item_matrix.index,
            columns=self.user_item_matrix.columns
        )

        print(f"SVD fitted with {n_components} latent factors.")

    def __knn(self ,user_id : int) -> Dict[str, np.ndarray]:
        """Return the ``k_value`` most similar rows to ``user_id``, never the row itself.

        ``user_id`` is a row label of ``user_item_matrix``: a user id in
        user-based mode, a movie id in item-based mode.

        Returns:
            ``{'index': neighbour labels, 'values': their similarities}``,
            ordered from most to least similar.
        """
        # Remove the target by label instead of assuming it sorts first: ties
        # or floating-point round-off can rank another row above it.
        scores = self.__similarity.loc[user_id].drop(labels=user_id)
        nearest = scores.sort_values(ascending=False).iloc[:max(self.k_value, 0)]
        return {'index': nearest.index.to_numpy(),
                'values': nearest.values}

    def __predict(self , user_id: int, col: int, neighbours: Dict[str, np.ndarray] = None) -> float:
        """Predict the rating of user ``user_id`` for movie ``col``.

        An existing rating is returned unchanged. Otherwise the prediction is
        the similarity-weighted mean of the neighbours' ratings, using only the
        neighbours that actually have a rating; 0 when none of them has one.

        Args:
            user_id: Id of the user.
            col: Id of the movie.
            neighbours: Optional pre-computed result of ``__knn`` for the target
                row; lets callers reuse one lookup for many predictions.
        """
        # Map (user, movie) onto (row, column) of the mode-dependent matrix.
        if self.mode == "user-based":
            row, column = user_id, col
        else:
            row, column = col, user_id
        ratings = self.user_item_matrix[column]

        known = ratings.loc[row]
        if not pd.isna(known):
            return known

        if neighbours is None:
            neighbours = self.__knn(row)
        neighbour_ratings = ratings.loc[neighbours['index']].to_numpy(dtype=float)
        weights = np.asarray(neighbours['values'], dtype=float)
        rated = ~np.isnan(neighbour_ratings)
        denominator = weights[rated].sum()
        if denominator > 0:
            return float(np.dot(neighbour_ratings[rated], weights[rated]) / denominator)
        return 0

    def __top_unrated(self, user_id: int, count: int) -> pd.Series:
        """Score every movie the user has not rated and return the ``count`` best.

        Returns:
            Series of predicted ratings indexed by movie id, highest first.

        Raises:
            ValueError: If ``user_id`` is not in the rating matrix.
        """
        if user_id not in self.__user_ids():
            raise ValueError(f"User {user_id} not found in data")
        own_ratings = self.__ratings_of(user_id)
        unrated_movies = own_ratings.index[own_ratings.isna()]
        # In user-based mode the neighbours depend on the user only, so look
        # them up once instead of re-sorting the similarity row per movie.
        shared = self.__knn(user_id) if self.mode == "user-based" else None
        scores = pd.Series(
            [self.__predict(user_id, movie, shared) for movie in unrated_movies],
            index=unrated_movies,
            dtype=float,
        )
        return scores.sort_values(ascending=False).head(count)

    def recommend(self , user_id: int) -> Tuple[pd.Index, np.ndarray]:
        """Recommend up to ``recommend_count`` movies the user has not rated.

        Returns:
            ``(movie ids, predicted ratings)``, best first.

        Raises:
            ValueError: If ``user_id`` is not in the rating matrix.
        """
        top_movies = self.__top_unrated(user_id, self.recommend_count)
        return top_movies.index, top_movies.values

    def recommend_svd(self, user_id: int, top_n: int = 10):
        """Recommend up to ``top_n`` unrated movies using the SVD reconstruction.

        Returns:
            ``(movie ids, reconstructed scores)``, best first.

        Raises:
            ValueError: If ``fit_svd`` has not been called yet.
            KeyError: If ``user_id`` is not in the rating matrix.
        """
        if not hasattr(self, '_MRS__predicted_svd'):
            raise ValueError("SVD model not fitted. Call fit_svd() first.")
        # Users are rows in user-based mode but columns in item-based mode.
        if self.mode == "user-based":
            user_ratings = self.user_item_matrix.loc[user_id]
            estimates = self.__predicted_svd.loc[user_id]
        else:
            user_ratings = self.user_item_matrix[user_id]
            estimates = self.__predicted_svd[user_id]
        unseen = user_ratings.index[user_ratings.isna()]
        top_movies = estimates.loc[unseen].sort_values(ascending=False).head(top_n)
        return top_movies.index, top_movies.values

    def evaluate_svd(self, test_df: pd.DataFrame, n_components: int = 20, k: int = 10,
                     user_column: str = 'user_id', item_column: str = 'item_id',
                     rating_column: str = 'rating', min_rating: float = 3) -> float:
        """Refit the SVD model and return its pooled precision on ``test_df``.

        The model is always refitted with ``n_components``, replacing any
        earlier ``fit_svd`` result. A recommended movie is a hit when the user
        rated it ``>= min_rating`` in ``test_df``. The score is total hits
        divided by the total number of recommendations over all known users.

        Args:
            test_df: Held-out ratings.
            n_components: Number of latent factors for the refit.
            k: Number of recommendations per user.
            user_column: Column of ``test_df`` holding user ids.
            item_column: Column of ``test_df`` holding movie ids.
            rating_column: Column of ``test_df`` holding ratings.
            min_rating: Lowest test rating that counts as relevant.

        Returns:
            Precision in [0, 1]; 0 when nothing could be evaluated.
        """
        self.fit_svd(n_components=n_components)
        known_users = self.__user_ids()
        hits = 0
        total = 0
        for user_id, group in test_df.groupby(user_column):
            if user_id not in known_users:
                continue
            recommended, _ = self.recommend_svd(user_id, top_n=k)
            relevant = group.loc[group[rating_column] >= min_rating, item_column]
            hits += len(set(recommended).intersection(relevant))
            total += len(recommended)
        return hits / total if total > 0 else 0

    def evaluate(self, test_data: pd.DataFrame, k: int = 10,
                 user_column: str = 'user_id', item_column: str = 'item_id') -> float:
        """Return the mean precision@k of the neighbourhood model on ``test_data``.

        For each test user known to the model, the top ``k`` predictions are
        compared with the movies that user has in ``test_data``; every such
        movie counts as relevant regardless of its rating. The per-user value
        is ``hits / k``. ``k`` is independent of ``recommend_count``.

        Args:
            test_data: Held-out ratings.
            k: Number of recommendations scored per user.
            user_column: Column of ``test_data`` holding user ids.
            item_column: Column of ``test_data`` holding movie ids.

        Returns:
            Mean precision over the evaluated users; 0.0 when there are none.

        Raises:
            ValueError: If ``k`` is not positive.
        """
        if k <= 0:
            raise ValueError("k must be a positive integer")
        known_users = self.__user_ids()
        precisions = []
        # sort=False keeps users in order of first appearance.
        for user_id, group in test_data.groupby(user_column, sort=False):
            if user_id not in known_users:
                continue
            top_movies = self.__top_unrated(user_id, k)
            hits = len(set(top_movies.index).intersection(group[item_column]))
            precisions.append(hits / k)
        if len(precisions) == 0:
            return 0.0
        return sum(precisions) / len(precisions)


In [33]:
# Default configuration: user-based neighbours, k_value=5, recommend_count=10.
mrs = MRS(
    data,
    user_column='user_id',
    movie_column='item_id',
    rating_column='rating',
)

In [36]:
# Recommendations for user 1 among the movies they have not rated.
# NOTE(review): the saved output below lists 5 items although recommend_count defaults to 10
# and is not overridden above -- it looks stale; re-run after Restart Kernel -> Run All.
mrs.recommend(1)

(Index([943, 512, 527, 302, 640], dtype='int64', name='item_id'),
 array([5., 5., 5., 5., 5.]))

In [58]:
# Pre-made train/test split files, read with the same column layout as the full ratings file.
train = pd.read_csv(
    'ml-100k/u1.base',
    sep='\t',
    names=columns,
)
test = pd.read_csv(
    'ml-100k/u1.test',
    sep='\t',
    names=columns,
)

In [59]:
# Item-based model trained on the training split, using only the single nearest neighbour.
mrs = MRS(
    data=train,
    user_column='user_id',
    movie_column='item_id',
    rating_column='rating',
    mode="item-based",
    recommend_count=10,
    k_value=1,
)

In [60]:
# Mean precision@10 (evaluate's default k) of the item-based model on the held-out test split.
mrs.evaluate(test)

0.23028322440087146

In [61]:
# User-based model trained on the training split, using only the single nearest neighbour.
mrs = MRS(
    data=train,
    user_column='user_id',
    movie_column='item_id',
    rating_column='rating',
    mode="user-based",
    recommend_count=10,
    k_value=1,
)

In [62]:
# Mean precision@10 (evaluate's default k) of the user-based model on the held-out test split.
mrs.evaluate(test)

0.2586056644880174

In [112]:
# Fresh user-based model (same settings as the previous one) to carry the SVD fit below.
mrs = MRS(
    data=train,
    user_column='user_id',
    movie_column='item_id',
    rating_column='rating',
    mode="user-based",
    recommend_count=10,
    k_value=1,
)

In [113]:
# Fit a 40-factor truncated SVD on the training matrix, then list the 5 highest-scoring
# movies that user 1 has not rated.
mrs.fit_svd(n_components=40)
mrs.recommend_svd(1,top_n = 5)

SVD fitted with 40 latent factors.


(Index([174, 183, 423, 318, 100], dtype='int64', name='item_id'),
 array([2.95667371, 2.82274834, 2.64457909, 2.54274636, 2.33055542]))

In [114]:
# NOTE(review): evaluate_svd() refits the model with its own n_components argument (default 20),
# so the 40-factor fit from the previous cell is replaced here; pass n_components=40 to score that model.
# k=5 also differs from the k=10 default used by evaluate() above, and evaluate_svd() only counts
# test ratings >= 3 as relevant, so this score is not directly comparable to the kNN scores.
mrs.evaluate_svd(test, k = 5)

SVD fitted with 20 latent factors.


0.5298474945533769