# Content-Based Recommender System

Note: similar to the task in Week 9, for this part, you are free to only
use a subset of the metadata file, e.g., only using items that are in the
train/test splits or using the first n characters/words of the text content.
You are also welcome to use additional metadata (e.g., by crawling the
internet). Please report and justify your choice.

In [116]:
import pandas as pd

preprocessed_meta_file = 'preprocessed_metadata.tsv'
df_preprocessed_meta = pd.read_csv(preprocessed_meta_file, sep='\t')

Transform the description column of each item into a TF-IDF represen-
tation or other numerical value, e.g., token-count based, that can represent
the summaries. Select at least one other factor that can be used as an
item feature, for example title. Apply the appropriate preprocessing on
the features. These choices should also be reported and justified.

In [117]:
import pickle
with open('map_asin_id.pickle', 'rb') as handle:
    map_asin_id = pickle.load(handle)

map_id_asin = {v: k for k, v in map_asin_id.items()}

In [118]:
import pandas as pd
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.preprocessing import LabelEncoder
import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from nltk.stem import PorterStemmer
import string
from sklearn.feature_extraction.text import TfidfVectorizer

original_vocab_de = set()
for desc in df_preprocessed_meta['description']:
    original_vocab_de.update(word_tokenize(desc.lower()))
print(f"Vocabulary size of description before preprocessing: {len(original_vocab_de)}")

original_vocab_ti = set()
for desc in df_preprocessed_meta['title']:
    original_vocab_ti.update(word_tokenize(desc.lower()))
print(f"Vocabulary size of title before preprocessing: {len(original_vocab_ti)}")

def preprocess_title(text):
    text = text.lower()
    text = text.translate(str.maketrans('', '', string.punctuation))
    tokens = word_tokenize(text)
    stop_words = set(stopwords.words('english'))
    tokens = [word for word in tokens if word not in stop_words]

    stemmer = PorterStemmer()
    tokens = [stemmer.stem(word) for word in tokens]
    return tokens

df_preprocessed_meta['processed_title'] = df_preprocessed_meta['title'].apply(preprocess_title)
df_preprocessed_meta['processed_description'] = df_preprocessed_meta['description'].apply(preprocess_title)

processed_vocab_de = set()
for tokens in df_preprocessed_meta['processed_description']:
    processed_vocab_de.update(tokens)
print(f"Vocabulary size of descriptipn after preprocessing: {len(processed_vocab_de)}")

processed_vocab_ti = set()
for tokens in df_preprocessed_meta['processed_title']:
    processed_vocab_ti.update(tokens)
print(f"Vocabulary size of title after preprocessing: {len(processed_vocab_ti)}")


Vocabulary size of description before preprocessing: 5996
Vocabulary size of title before preprocessing: 1630
Vocabulary size of descriptipn after preprocessing: 4470
Vocabulary size of title after preprocessing: 1476


In [119]:
df_preprocessed_meta['processed_title'] = df_preprocessed_meta['processed_title'].apply(
    lambda x: ' '.join(x) if isinstance(x, list) else x
)


vectorizer = TfidfVectorizer()
tit_tfidf_matrix = vectorizer.fit_transform(df_preprocessed_meta['processed_title'])
df_preprocessed_meta['processed_description'] = df_preprocessed_meta['processed_description'].apply(
    lambda x: ' '.join(x) if isinstance(x, list) else x
)

des_tfidf_matrix = vectorizer.fit_transform(df_preprocessed_meta['processed_description'])


print(f"description TF-IDF matrix shape: {des_tfidf_matrix.shape}")
des_tfidf_matrix.data
print(f"title TF-IDF matrix shape: {tit_tfidf_matrix.shape}")
tit_tfidf_matrix.data


description TF-IDF matrix shape: (421, 4420)
title TF-IDF matrix shape: (421, 1455)


array([0.12066681, 0.2901492 , 0.16270751, ..., 0.21471708, 0.21471708,
       0.21471708])

In [120]:
# Combine Representations (Concatenation)
from scipy.sparse import hstack
item_feature_matrix = hstack([des_tfidf_matrix, tit_tfidf_matrix])
print(f"item_vectors matrix shape: {item_feature_matrix.shape}")
item_feature_matrix.data

item_vectors matrix shape: (421, 5875)


array([0.01969378, 0.02450674, 0.09562047, ..., 0.21471708, 0.21471708,
       0.21471708])

After you represent each item in a vector space, represent each user in
the same vector space. This can be done by using a simple average of the
items the user rated. Note that you can also try to implement better ways
to build the user representation.

Calculate the user-item rating prediction for an item by using a simi-
larity/distance metric between the user and the item representations. A
metric such as cosine similarity or Euclidean distance can be used. Moti-
vate your choice.

In [121]:
import numpy as np
from typing import Dict, Any, List
from collections import defaultdict
# Load train and test data
train_df = pd.read_pickle("train_dataframe.pkl") 
test_df = pd.read_pickle("test_dataframe.pkl")

# Build user ratings dictionary
def extract_user_ratings(df: pd.DataFrame) -> Dict[str, Dict[str, float]]:

    user_ratings = defaultdict(dict)
    for _, row in df.iterrows():
        user_ratings[row["user_id"]][row["item_id"]] = row["rating"]
    
    return user_ratings

user_ratings = extract_user_ratings(train_df)

def compute_user_mean_ratings(user_ratings: Dict[str, Dict[str, float]]) -> Dict[str, float]:

    user_mean_ratings = {}

    for user_id, ratings in user_ratings.items():
        if ratings:
            mean_rating = sum(ratings.values()) / len(ratings)
            user_mean_ratings[user_id] = mean_rating
        else:
            user_mean_ratings[user_id] = 0.0

    return user_mean_ratings
    
def get_top_k_user_i(predictions: Dict[str, Dict[str, float]], user_id: str, k: int) -> List[Any]:
    top_k = []

    if user_id in predictions:
        sorted_items = sorted(predictions[user_id].items(), key=lambda x: x[1], reverse=True)
        top_k = [(asin, round(rating, 3)) for asin, rating in sorted_items[:k]]  

    return top_k


In [122]:
from sklearn.metrics.pairwise import cosine_similarity
from pprint import pprint
from typing import Dict, Any, List
# Compute User Profile
def compute_user_profile(user_id: str, user_ratings: Dict[str, Dict[str, float]], item_feature_matrix) -> np.ndarray:
    if user_id not in user_ratings:
        return np.zeros(item_feature_matrix.shape[1])  # Return a zero vector for missing users
    # Items rated by the user
    rated_items = user_ratings[user_id]  
    item_vectors = []
    weights = []

    for asin, rating in rated_items.items():
        if asin in map_asin_id:
            item_id = map_asin_id[asin]
            item_vectors.append(item_feature_matrix[item_id].toarray()) 
            weights.append(rating)  # Use rating as weight

    if not item_vectors:
        return np.zeros(item_feature_matrix.shape[1])  # Empty profile

    item_vectors = np.vstack(item_vectors)
    weights = np.array(weights).reshape(-1, 1)
    user_profile = np.sum(item_vectors * weights, axis=0) / np.sum(weights)

    return user_profile  


# Compute predictions
def compute_recommendations(user_ratings, item_feature_matrix, map_id_asin):
    recommendations = {}

    user_mean_ratings = compute_user_mean_ratings(user_ratings)

    for user_id in user_ratings.keys():
        user_profile = compute_user_profile(user_id, user_ratings, item_feature_matrix)

        # Compute cosine similarity
        similarities = cosine_similarity(user_profile.reshape(1, -1), item_feature_matrix).flatten()
        # Get items the user has already rated
        rated_items = set(user_ratings[user_id].keys())
        # excluding rated items
        recommendations[user_id] = {
            map_id_asin[i]: float(np.clip(user_mean_ratings[user_id] + similarities[i] * 4, 1.0, 5.0)) 
            for i in range(len(similarities))
            if map_id_asin[i] not in rated_items  # Exclude rated items
        }

    return recommendations

# Compute cosine similarity between users and items
predictions = compute_recommendations(user_ratings, item_feature_matrix, map_id_asin)


user_id = 'AFIOMYWY7N3H6KHV3EHENDBAN7MA' 
topk_5 = get_top_k_user_i(predictions , user_id, 5 )
print(f"Top-5 recommended items for user '{user_id}':")
pprint(topk_5)

topk_10 = get_top_k_user_i(predictions , user_id, 10 )
print(f"Top-10 recommended items for user '{user_id}':")
pprint(topk_10)

topk_20 = get_top_k_user_i(predictions , user_id, 20 )
print(f"Top-20 recommended items for user '{user_id}':")
pprint(topk_20)

Top-5 recommended items for user 'AFIOMYWY7N3H6KHV3EHENDBAN7MA':
[('B07D5W5X3Z', 3.91),
 ('B07DWLYGKH', 3.459),
 ('B098KXQJVY', 3.444),
 ('B07B16JL73', 3.43),
 ('B07DWY7R2X', 3.415)]
Top-10 recommended items for user 'AFIOMYWY7N3H6KHV3EHENDBAN7MA':
[('B07D5W5X3Z', 3.91),
 ('B07DWLYGKH', 3.459),
 ('B098KXQJVY', 3.444),
 ('B07B16JL73', 3.43),
 ('B07DWY7R2X', 3.415),
 ('B079Y9L1G1', 3.409),
 ('B07V46KRD8', 3.392),
 ('B06XB3FQKB', 3.385),
 ('B01C5TBX68', 3.385),
 ('B0B95V41NR', 3.369)]
Top-20 recommended items for user 'AFIOMYWY7N3H6KHV3EHENDBAN7MA':
[('B07D5W5X3Z', 3.91),
 ('B07DWLYGKH', 3.459),
 ('B098KXQJVY', 3.444),
 ('B07B16JL73', 3.43),
 ('B07DWY7R2X', 3.415),
 ('B079Y9L1G1', 3.409),
 ('B07V46KRD8', 3.392),
 ('B06XB3FQKB', 3.385),
 ('B01C5TBX68', 3.385),
 ('B0B95V41NR', 3.369),
 ('B07B4S63NN', 3.36),
 ('B0B2T1LF1L', 3.358),
 ('B0BZ1XQX97', 3.358),
 ('B0BKZ5F8BS', 3.343),
 ('B0064RTS0G', 3.326),
 ('B09BF8XDF4', 3.31),
 ('B09G5KDVWW', 3.294),
 ('B095XZJ99J', 3.274),
 ('B015QK3GUO', 3.2

Report Precision@10, MAP@10, MRR@10, hit rate and coverage using
ratings ≥4 in the test set. Compare the results with the models from
previous weeks.

In [123]:
from collections import defaultdict

def get_top_k_for_all_users(predictions: Dict[str, Dict[str, float]], k: int) -> defaultdict:
    
    top_k_recommendations = defaultdict(list)

    for user_id, user_predictions in predictions.items():
        sorted_items = sorted(user_predictions.items(), key=lambda x: x[1], reverse=True)
        top_k_items = [(item_id, round(rating, 3)) for item_id, rating in sorted_items[:k]]
        top_k_recommendations[user_id] = top_k_items

    return top_k_recommendations

k = 10
top10_tfidf = get_top_k_for_all_users(predictions, k)



In [124]:
test_df['new_label'] = test_df['rating'].apply(lambda x: 1 if x >= 3 else 0)

In [125]:
import numpy as np
from __future__ import (absolute_import, division, print_function, unicode_literals)
from collections import defaultdict
from surprise import Dataset


def precision_at_k(top_k: Dict[str, List[str]], df_test: pd.DataFrame, k: int) -> Dict[str, float]:
    """Compute precision at k for each user
    Args:
        top_k: A dictionary where keys are user ids (str) and values are lists of (item_id, rating_estimation) tuples.
        df_test: Pandas DataFrame containing user-item ratings in the test split.
        k: The number of recommendations to output for each user.
    Returns:
        A dictionary where keys are user ids (str) and values are P@k (float) for each user.
    """
    
    precisions = defaultdict(float)
    
    # Only consider relevant items (rating ≥ 4.0)
    relevant_items = df_test[df_test['new_label'] == 1].groupby("user_id")["item_id"].apply(set).to_dict()
    
    for user, recommended_items in top_k.items():
        recommended_set = {item for item, _ in recommended_items[:k]}  # Take top-k items
        
        if user in relevant_items:
            num_relevant_at_k = len(recommended_set & relevant_items.get(user, set()))  # Intersection count
            if k > 0:  # Avoid division by zero
                precisions[user] = round(num_relevant_at_k / min(len(recommended_items), k), 3)  # Compute Precision@k

    return precisions



def mean_average_precision(top_k: Dict[str, List[str]], df_test: pd.DataFrame, k: int) -> float:
    """Compute mean average precision (MAP@k)
    Args:
        top_k: A dictionary where keys are user ids (str) and values are lists of (item_id, rating_estimation) tuples.
        df_test: Pandas DataFrame containing user-item ratings in the test split.
        k: The number of recommendations to output for each user.
    Returns:
        MAP@k (float)
    """
    
    average_precision_users = []
    
    # Get relevant items per user
    relevant_items = df_test[df_test['new_label'] == 1].groupby("user_id")["item_id"].apply(set).to_dict()

    for user, recommended_items in top_k.items():
        relevant_set = relevant_items.get(user, set())  # Get relevant items, default to empty set
        
        num_relevant = 0
        precision_sum = 0.0
        
        for i, (item, _) in enumerate(recommended_items[:k]):  # Iterate over top-K items
            if item in relevant_set:
                num_relevant += 1
                precision_sum += num_relevant / (i + 1)  # Precision at each relevant item

        # Avoid division by zero
        avg_precision = precision_sum / min(len(recommended_items), k) if num_relevant > 0 else 0
        average_precision_users.append(avg_precision)

    return np.mean(average_precision_users) if average_precision_users else 0.0


def mean_reciprocal_rank(top_k: Dict[str, List[str]], df_test: pd.DataFrame, k: int) -> float:
    """Compute mean reciprocal rank (MRR@k)
    Args:
        top_k: A dictionary where keys are user ids (str) and values are lists of (item_id, rating_estimation) tuples.
        df_test: Pandas DataFrame containing user-item ratings in the test split.
        k: The number of recommendations to output for each user.
    Returns:
        MRR@k (float)
    """
    
    reciprocal_ranks = []
    
    # Get relevant items per user
    relevant_items = df_test[df_test['new_label'] == 1].groupby("user_id")["item_id"].apply(set).to_dict()

    for user, recommended_items in top_k.items():
        relevant_set = relevant_items.get(user, set())  # Get relevant items, default to empty set
        found_relevant = False
        
        for i, (item, _) in enumerate(recommended_items[:k]):  # Iterate over top-K items
            if item in relevant_set:  # Find first relevant item
                reciprocal_ranks.append(1 / (i + 1))
                found_relevant = True
                break  # Stop after first relevant item

        if not found_relevant:
            reciprocal_ranks.append(0)  # Assign 0 if no relevant item is found

    return np.mean(reciprocal_ranks) if reciprocal_ranks else 0.0



def hit_rate(top_k: Dict[str, List[str]],
             df_test: pd.DataFrame) -> float:
    """Compute the hit rate
    Args:
        top_k: A dictionary where keys are user (raw) ids and values are lists of tuples:
        [(raw item id, rating estimation), ...] of size n (output of get_top_k())
        df_test: Pandas DataFrame containing user-item ratings in 
            the test split.
    Returns:
        The average hit rate
    """

    hits = 0
    # Get relevant items per user
    relevant_items = df_test[df_test['new_label'] == 1].groupby("user_id")["item_id"].apply(set).to_dict()
    total_users = len(df_test[df_test['new_label'] == 1]['user_id'].unique())

    for user, recommended_items in top_k.items():
        recommended_set = {item for item, _ in recommended_items}  # Extract recommended item IDs
        if user in relevant_items:
            if recommended_set & relevant_items[user]:  # Check if there is any intersection
                hits += 1  

    return round(hits / total_users, 3) if total_users > 0 else 0.0



def coverage(top_k: Dict[str, List[str]], df_test: pd.DataFrame, df_train: pd.DataFrame) -> float:
    """
    Compute catalog coverage.

    Args:
        top_k: A dictionary where keys are user (raw) ids and values are lists of tuples:
               [(raw item id, rating estimation), ...] (output of get_top_k()).
        df_test: Pandas DataFrame containing the training data (user-item interactions).

    Returns:
        Coverage as a float (rounded to 3 decimals).
    """
    if not top_k:
        return 0.0  # No recommendations made

    recommended_items = {item for recommendations in top_k.values() for item, _ in recommendations}
    all_items = set(df_train["item_id"].unique()) | set(df_test["item_id"].unique()) 

    coverage_score = len(recommended_items) / len(all_items) if all_items else 0

    return round(coverage_score, 3)  # Round to 3 decimal places

In [126]:
print("Metrics for TF IDF:")
# PRECISION
precisions_nb = precision_at_k(top10_tfidf, test_df, k=10)
print("Averaged P@10: {:.3f}".format(sum(prec for prec in precisions_nb.values()) / len(precisions_nb)))
# MAP 
map_nb = mean_average_precision(top10_tfidf, test_df, k=10)
print("MAP@10: {:.3f}".format(map_nb))
# MRR
mrr_nb = mean_reciprocal_rank(top10_tfidf, test_df, k=10)
print("MRR@10: {:.3f}".format(mrr_nb))
# hit rate
hit_r = hit_rate(top10_tfidf, test_df)
print("Hit rate@10: {:.3f}".format(hit_r))
# coverage
cover = coverage(top10_tfidf, test_df, train_df)
print("Coverage@10: {:.3f}".format(cover))

Metrics for TF IDF:
Averaged P@10: 0.013
MAP@10: 0.002
MRR@10: 0.021
Hit rate@10: 0.132
Coverage@10: 0.633


In [127]:
import pickle

with open('prediction_TFIDF.pickle', 'wb') as file:
    pickle.dump(predictions, file)

### Long tail analysis

In [128]:
from typing import Dict, List, Tuple
import pandas as pd

def hit_rate(top_k: Dict[str, List[tuple]], df_test: pd.DataFrame) -> float:
    hits = 0
    relevant_items = df_test[df_test['new_label'] == 1].groupby("user_id")["item_id"].apply(set).to_dict()
    total_users = len(relevant_items)

    for user, recommended_items in top_k.items():
        if user in relevant_items:
            recommended_set = {item for item, _ in recommended_items}
            if recommended_set & relevant_items[user]:
                hits += 1

    return round(hits / total_users, 3) if total_users > 0 else 0.0

def top_and_last_20_hit_rate(top_k: Dict[str, List[tuple]], df_test: pd.DataFrame, df_train: pd.DataFrame) -> Tuple[float, float]:
    user_interaction_counts = df_train.groupby('user_id').size().sort_values(ascending=False)
    num_users = len(user_interaction_counts)
    top_20_users = set(user_interaction_counts.head(int(0.2 * num_users)).index)
    last_20_users = set(user_interaction_counts.tail(int(0.2 * num_users)).index)

    df_test_top_20 = df_test[df_test['user_id'].isin(top_20_users)]
    df_test_last_20 = df_test[df_test['user_id'].isin(last_20_users)]

    top_20_hr = hit_rate(top_k, df_test_top_20)
    last_20_hr = hit_rate(top_k, df_test_last_20)

    return top_20_hr, last_20_hr

top20_hit_tf, last20_hit_tf= top_and_last_20_hit_rate(top10_tfidf, test_df, train_df)

print("TF-IDF Hit Rate - Top 20%: {:.3f}, Last 20%: {:.3f}".format(top20_hit_tf, last20_hit_tf))

TF-IDF Hit Rate - Top 20%: 0.167, Last 20%: 0.163


In [129]:
from typing import Dict, List, Set, Tuple
def coverage(top_k: Dict[str, List[str]], relevant_items: Set[str]) -> float:

    recommended_items = {item for recs in top_k.values() for item, _ in recs}
    matched = recommended_items & relevant_items
    return round(len(matched)/len(relevant_items), 3) if relevant_items else 0

def get_item_groups(df_train: pd.DataFrame) -> Tuple[Set[str], Set[str]]:

    item_counts = df_train['item_id'].value_counts()
    split_idx = int(len(item_counts) * 0.2)
    return set(item_counts.head(split_idx).index), set(item_counts.tail(split_idx).index)

# Correct usage
top_items, tail_items = get_item_groups(train_df)  # Use actual training data

# Calculate coverage for different groups
def calculate_group_coverage(top_k: Dict[str, List[str]], items: Set[str]) -> float:
    return coverage(top_k, items)

top20_cov_tf = calculate_group_coverage(top10_tfidf, top_items)
last20_cov_tf = calculate_group_coverage(top10_tfidf, tail_items)

print(f"TF-ODF coverage - Top 20%: {top20_cov_tf}, Tail 20%: {last20_cov_tf}")

TF-ODF coverage - Top 20%: 0.782, Tail 20%: 0.564
