In [None]:
import pandas as pd
import numpy as np
from math import sqrt

from sklearn.decomposition import NMF
from sklearn.preprocessing import MinMaxScaler, normalize
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics import precision_score, recall_score, f1_score, mean_squared_error, mean_absolute_error

import warnings
warnings.filterwarnings('ignore')

In [None]:
def load_data(ratings_file_path, features_file_path):
    """
    Read the ratings table and the movie-features table from two CSV files.

    Parameters
    ----------
    ratings_file_path : str
        Location of the ratings CSV. Its userId and movieId columns are read
        as int32 and its rating column as float32.
    features_file_path : str
        Location of the movie-features CSV. Its movieId column is read as int32.

    Returns
    -------
    tuple of pd.DataFrame
        ``(ratings, features)``, in that order.

    """
    ratings = pd.read_csv(
        ratings_file_path,
        dtype={'userId': 'int32', 'movieId': 'int32', 'rating': 'float32'},
    )
    features = pd.read_csv(features_file_path, dtype={'movieId': 'int32'})

    return ratings, features

In [None]:
traindf, df_movies = load_data('./datasets/training_data.csv', './datasets/movies.csv')

In [None]:
# Ensure 'movieId' columns are of type int
traindf['movieId'] = traindf['movieId'].astype('int')
df_movies['movieId'] = df_movies['movieId'].astype('int')

In [None]:
# Align movie features to training data
df_movies_aligned = df_movies[df_movies['movieId'].isin(traindf['movieId'].unique())]
df_movies_aligned = df_movies_aligned.dropna(subset=['genres'])

In [None]:
# Convert user and movie IDs to categorical types
user_categories = pd.Categorical(traindf['userId'])
item_categories = pd.Categorical(traindf['movieId'])

In [None]:
# Convert user and item IDs to numerical codes
user_ids = user_categories.codes
item_ids = item_categories.codes

In [None]:
# Create rating matrix
rating_matrix = np.zeros((user_categories.categories.size, item_categories.categories.size))
rating_matrix[user_ids, item_ids] = traindf['rating']

In [None]:
# Scale the rating matrix
scaler = MinMaxScaler(feature_range=(0.5, 5))
rating_matrix_scaled = scaler.fit_transform(rating_matrix)

In [None]:
# Repeat alignment of movie features to training data
df_movies_aligned = df_movies[df_movies['movieId'].isin(traindf['movieId'].unique())]
df_movies_aligned = df_movies_aligned.dropna(subset=['genres'])

In [None]:
# Vectorize movie genres using TF-IDF
vectorizer = TfidfVectorizer(max_features=100)
tags_features = vectorizer.fit_transform(df_movies_aligned['genres'].fillna(''))

In [None]:
# Calculate user weights based on rating counts
# (each user's number of training ratings divided by the largest such count)
user_counts = traindf['userId'].value_counts()
user_weights = user_counts / user_counts.max()

# Adjust scaled rating matrix by user weights
# Start again from the unweighted scaled matrix so that re-running this cell
# does not compound the weights.
rating_matrix_scaled = scaler.transform(rating_matrix)

# One weight per training row, looked up by that row's userId.
rating_weights = traindf['userId'].map(user_weights).to_numpy()

# Vectorized in-place multiply of every rated (user, item) cell.
# np.multiply.at is unbuffered: a (user, item) pair that occurs several times
# in traindf is multiplied once per occurrence.
np.multiply.at(rating_matrix_scaled, (user_ids, item_ids), rating_weights)

In [None]:
importance_of_genre = 0.5
content_weighted_features = tags_features.multiply(importance_of_genre).toarray()
full_features_matrix = np.hstack([rating_matrix_scaled.T, content_weighted_features]).T

In [None]:
# NMF with importance_of_genre = 0.5
model = NMF(n_components=15, init='nndsvd', max_iter=30, random_state=42)
W = model.fit_transform(full_features_matrix)
H = model.components_

In [None]:
def get_top_n_recommendations(user_id, n, W, H):
    """
    Generates top N movie recommendations for a given user based on NMF model predictions.

    The scores are the dot product of the user's row of ``W`` with ``H``. The
    result is ordered from the highest to the lowest predicted score. Movies
    the user has already rated are not filtered out.

    Relies on the notebook-level ``user_categories``, ``item_categories`` and
    ``df_movies`` objects.

    Parameters
    ----------
    user_id : int
        The user ID for whom recommendations are to be made.
    n : int
        Number of top recommendations to generate.
    W : np.array
        Factor matrix returned by NMF ``fit_transform``; row ``i`` is used for
        the user at position ``i`` of ``user_categories.categories``.
    H : np.array
        NMF components matrix; column ``j`` corresponds to the movie at
        position ``j`` of ``item_categories.categories``.

    Returns
    -------
    pd.DataFrame
        DataFrame containing top N recommended movies with columns: movieId, title, and genres,
        best recommendation first. An empty ``pd.DataFrame()`` is returned for a
        user that is not in the training data, and an empty frame with the three
        columns when ``n`` is not positive. Recommended movies that are missing
        from ``df_movies`` are omitted.
    """
    columns = ['movieId', 'title', 'genres']

    if user_id not in user_categories.categories:
        return pd.DataFrame()

    # A slice such as [-0:] would select every item, so handle n <= 0 explicitly.
    if n <= 0:
        return df_movies.iloc[0:0][columns]

    # Predict ratings for the user
    user_idx = user_categories.categories.get_loc(user_id)
    predicted_ratings = np.dot(W[user_idx, :], H)

    # Item positions sorted by descending predicted score, truncated to n
    top_n_indices = np.argsort(predicted_ratings)[::-1][:n]
    ranking = pd.DataFrame({'movieId': item_categories.categories[top_n_indices]})

    # An inner merge keeps the order of the left (ranking) keys, so the
    # recommendation order survives the join with the movie metadata.
    return ranking.merge(df_movies[columns], on='movieId', how='inner')

In [None]:
# Top-20 recommendations for user 45, shown as the cell output
recommended_movies = get_top_n_recommendations(user_id=45, n=20, W=W, H=H)
recommended_movies

In [None]:
def load_test_data(test_file_path):
    """
    Read the test CSV and re-encode its ID columns with the training categories.

    The userId and movieId columns become categoricals whose categories are
    those of the notebook-level ``user_categories`` and ``item_categories``.
    IDs that are not among those categories become missing values.

    Parameters
    ----------
    test_file_path : str
        Location of the CSV file holding the test dataset.

    Returns
    -------
    pd.DataFrame
        The test dataset with categorical userId and movieId columns.
    """
    df_test = pd.read_csv(test_file_path)

    for column, training_categories in (
        ('userId', user_categories.categories),
        ('movieId', item_categories.categories),
    ):
        df_test[column] = pd.Categorical(df_test[column], categories=training_categories)

    return df_test

In [None]:
testdf = load_test_data('./datasets/testing_data.csv')

# Users or movies that never occur in the training data have no category, so
# their categorical code is -1. Used as an array index, -1 would silently
# address the LAST row/column of the matrices below. Keep only the test rows
# the model can actually score.
known_rows = testdf['userId'].notna() & testdf['movieId'].notna()
testdf = testdf.loc[known_rows].reset_index(drop=True)

test_rating_matrix = np.zeros((user_categories.categories.size, item_categories.categories.size))

# Positions of each test row's user and movie within the training categories
test_user_ids = testdf['userId'].cat.codes
test_item_ids = testdf['movieId'].cat.codes

test_rating_matrix[test_user_ids, test_item_ids] = testdf['rating']

# Full reconstruction W @ H; only the cells addressed by the test rows are read
predicted_test_ratings = np.dot(W, H)
test_predicted_ratings = predicted_test_ratings[test_user_ids, test_item_ids]

In [None]:
# Squared-error metrics between the true test ratings and the model predictions
mse_test = mean_squared_error(y_true=testdf['rating'], y_pred=test_predicted_ratings)
rmse_test = sqrt(mse_test)
for label, value in (("Test MSE:", mse_test), ("Test RMSE:", rmse_test)):
    print(label, value)

In [None]:
def calculate_precision_recall_f1(testdf, predicted_ratings, k=10, threshold=4):
    """
    Calculates precision, recall, and F1-score for top-k recommendations.

    For every user the k test rows with the highest predicted rating form the
    recommendation list; a row is relevant when its true rating is at least
    ``threshold``. All three values are pooled over users (micro-averaged).
    The input DataFrame is not modified.

    Parameters
    ----------
    testdf : pd.DataFrame
        DataFrame containing the test dataset, with columns userId and rating.
    predicted_ratings : np.array
        Array of predicted ratings, one per row of ``testdf``.
    k : int, optional
        Number of top recommendations to consider, default is 10.
    threshold : float, optional
        Minimum true rating for a row to count as relevant, default is 4.

    Returns
    -------
    tuple
        Precision, recall, and F1-score. Precision divides the hits by
        ``k * number of users``, so a user with fewer than k test rows still
        contributes k slots. Each value is 0.0 when its denominator is zero.
    """
    # assign() returns a new frame, so the caller's DataFrame is left untouched.
    scored = testdf.assign(predicted_rating=predicted_ratings)

    # A stable sort keeps tied predictions in their original order.
    ranked = scored.sort_values(by='predicted_rating', ascending=False, kind='mergesort')
    top_k_recommendations = ranked.groupby('userId').head(k)

    true_positives = int((top_k_recommendations['rating'] >= threshold).sum())
    recommendation_slots = k * scored['userId'].nunique()
    relevant_total = int((scored['rating'] >= threshold).sum())

    precision = true_positives / recommendation_slots if recommendation_slots > 0 else 0.0
    recall = true_positives / relevant_total if relevant_total > 0 else 0.0

    f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0.0

    return precision, recall, f1

precision, recall, f1 = calculate_precision_recall_f1(testdf, test_predicted_ratings)
print(f"Precision: {precision}, Recall: {recall}, F1-Score: {f1}")

In [None]:
mae_test = mean_absolute_error(testdf['rating'], test_predicted_ratings)
print("Test MAE:", mae_test)

In [None]:
def calculate_coverage(top_n_recommendations, all_items):
    """
    Calculates the coverage of recommendations.

    Coverage is the share of distinct catalogue movies (``all_items``) that
    appear among the recommended movies.

    Parameters
    ----------
    top_n_recommendations : pd.DataFrame
        DataFrame containing the top N recommended movies (column movieId).
        May be empty, including a column-less ``pd.DataFrame()``.
    all_items : pd.DataFrame
        DataFrame containing all items (column movieId).

    Returns
    -------
    float
        Coverage value in [0, 1]; 0.0 when there are no recommendations or the
        catalogue is empty.
    """
    all_items_set = set(all_items['movieId'].unique())

    # An empty recommendation frame may have no movieId column at all, and an
    # empty catalogue would make the ratio undefined.
    if top_n_recommendations.empty or not all_items_set:
        return 0.0

    recommended_items = set(top_n_recommendations['movieId'].unique())

    # Only recommendations that exist in the catalogue count, so the ratio
    # cannot exceed 1.
    return len(recommended_items & all_items_set) / len(all_items_set)

# Catalogue coverage of the recommendation list held in recommended_movies
coverage = calculate_coverage(top_n_recommendations=recommended_movies, all_items=df_movies)
print("Coverage:", coverage)

In [None]:
def calculate_hit_rate(testdf, predicted_ratings, k=10, threshold=4):
    """
    Calculates the hit rate of top-k recommendations.

    A user counts as a hit when at least one of their k test rows with the
    highest predicted rating has a true rating of at least ``threshold``.
    The hit rate is the fraction of test users that are hits, so it always
    lies in [0, 1]. The input DataFrame is not modified.

    Parameters
    ----------
    testdf : pd.DataFrame
        DataFrame containing the test dataset, with columns userId and rating.
    predicted_ratings : np.array
        Array of predicted ratings, one per row of ``testdf``.
    k : int, optional
        Number of top recommendations to consider, default is 10.
    threshold : float, optional
        Minimum true rating for a row to count as relevant, default is 4.

    Returns
    -------
    float
        Hit rate value; 0.0 when the test set contains no users.
    """
    # assign() returns a new frame, so the caller's DataFrame is left untouched.
    scored = testdf.assign(predicted_rating=predicted_ratings)

    user_count = scored['userId'].nunique()
    if user_count == 0:
        return 0.0

    # A stable sort keeps tied predictions in their original order.
    ranked = scored.sort_values(by='predicted_rating', ascending=False, kind='mergesort')
    top_k_recommendations = ranked.groupby('userId').head(k)

    # Count each user once, however many relevant rows are in their top k.
    relevant_rows = top_k_recommendations['rating'] >= threshold
    users_with_hit = top_k_recommendations.loc[relevant_rows, 'userId'].nunique()

    return users_with_hit / user_count

# Hit rate on the test set with the function's default k
hit_rate = calculate_hit_rate(testdf=testdf, predicted_ratings=test_predicted_ratings)
print("Hit Rate:", hit_rate)

In [None]:
def precision_at_k(testdf, predicted_ratings, k=10, threshold=4):
    """
    Calculates precision at top-k recommendations.

    For every user the k test rows with the highest predicted rating form the
    recommendation list; a row is a hit when its true rating is at least
    ``threshold``. The input DataFrame is not modified.

    Parameters
    ----------
    testdf : pd.DataFrame
        DataFrame containing the test dataset, with columns userId and rating.
    predicted_ratings : np.array
        Array of predicted ratings, one per row of ``testdf``.
    k : int, optional
        Number of top recommendations to consider, default is 10.
    threshold : float, optional
        Minimum true rating for a row to count as relevant, default is 4.

    Returns
    -------
    float
        Precision at top-k value: hits divided by ``k * number of users`` (a
        user with fewer than k test rows still contributes k slots). 0.0 when
        that denominator is zero.
    """
    # assign() returns a new frame, so the caller's DataFrame is left untouched.
    scored = testdf.assign(predicted_rating=predicted_ratings)

    # A stable sort keeps tied predictions in their original order.
    ranked = scored.sort_values(by='predicted_rating', ascending=False, kind='mergesort')
    top_k_recommendations = ranked.groupby('userId').head(k)

    hits = int((top_k_recommendations['rating'] >= threshold).sum())
    recommendation_slots = k * scored['userId'].nunique()

    return hits / recommendation_slots if recommendation_slots > 0 else 0.0

def recall_at_k(testdf, predicted_ratings, k=10, threshold=4):
    """
    Calculates recall at top-k recommendations.

    For every user the k test rows with the highest predicted rating form the
    recommendation list; a row is relevant when its true rating is at least
    ``threshold``. The input DataFrame is not modified.

    Parameters
    ----------
    testdf : pd.DataFrame
        DataFrame containing the test dataset, with columns userId and rating.
    predicted_ratings : np.array
        Array of predicted ratings, one per row of ``testdf``.
    k : int, optional
        Number of top recommendations to consider, default is 10.
    threshold : float, optional
        Minimum true rating for a row to count as relevant, default is 4.

    Returns
    -------
    float
        Recall at top-k value: relevant rows inside the top-k lists divided by
        all relevant rows in the test set. 0.0 when the test set has no
        relevant rows.
    """
    # assign() returns a new frame, so the caller's DataFrame is left untouched.
    scored = testdf.assign(predicted_rating=predicted_ratings)

    # A stable sort keeps tied predictions in their original order.
    ranked = scored.sort_values(by='predicted_rating', ascending=False, kind='mergesort')
    top_k_recommendations = ranked.groupby('userId').head(k)

    hits = int((top_k_recommendations['rating'] >= threshold).sum())
    relevant_total = int((scored['rating'] >= threshold).sum())

    return hits / relevant_total if relevant_total > 0 else 0.0

# Precision and recall of the per-user top-10 lists on the test set
top_k_arguments = dict(testdf=testdf, predicted_ratings=test_predicted_ratings, k=10)
precision_k = precision_at_k(**top_k_arguments)
recall_k = recall_at_k(**top_k_arguments)
print(f"Precision@10: {precision_k}, Recall@10: {recall_k}")