In [3]:
import pandas as pd

ratings = pd.read_csv('../datasets/ml-latest-small/ratings.csv')

ratings.head()

Unnamed: 0,userId,movieId,rating,timestamp
0,1,1,4.0,964982703
1,1,3,4.0,964981247
2,1,6,4.0,964982224
3,1,47,5.0,964983815
4,1,50,5.0,964982931


In [4]:
from surprise import Dataset, Reader
from surprise.model_selection import train_test_split

reader = Reader(rating_scale=(1,5))
data = Dataset.load_from_df(
    ratings[['userId','movieId','rating']],
    reader
)

trainset, testset = train_test_split(
    data, 
    test_size= 0.2,
    random_state= 42
)

In [5]:
from surprise import SVD

model = SVD(
    n_factors = 100,
    n_epochs = 100,
    lr_all = 0.005,
    reg_all = 0.02
)
model.fit(trainset)

<surprise.prediction_algorithms.matrix_factorization.SVD at 0x164815430>

In [6]:
from collections import defaultdict
train_user_items = defaultdict(set)

for uid, iid, rating in trainset.build_testset():
    train_user_items[uid].add(iid)

In [7]:
all_items = set(ratings['movieId'].unique())

In [8]:
def generate_top_k(model, trainset, k=10):
    top_k = defaultdict(list)

    for uid in trainset.all_users():
        raw_uid = trainset.to_raw_uid(uid)

        rated_items = {
            trainset.to_raw_iid(iid)
            for iid, _ in trainset.ur[uid]
        }

        for iid in all_items:
            if iid not in rated_items:
                pred = model.predict(raw_uid, iid)
                top_k[raw_uid].append((iid, pred.est))

        top_k[raw_uid].sort(key=lambda x: x[1], reverse=True)
        top_k[raw_uid] = top_k[raw_uid][:k]

    return top_k

In [9]:
K = 10
top_k_recommendations = generate_top_k(model, trainset, k=K)

In [10]:
top_k_recommendations[1]

[(260, 5),
 (306, 5),
 (318, 5),
 (356, 5),
 (474, 5),
 (497, 5),
 (527, 5),
 (670, 5),
 (750, 5),
 (66297, 5)]

In [11]:
from surprise import accuracy
predictions = model.test(testset)
rmse = accuracy.rmse(predictions)

RMSE: 0.8915


In [12]:
from collections import defaultdict

test_user_items = defaultdict(set)

for uid, iid, rating in testset:
    if rating >= 3:  # relevance threshold
        test_user_items[uid].add(iid)

In [13]:
def precision_at_k(top_k, ground_truth, k):
    precisions = []

    for user in top_k:
        if user not in ground_truth:
            continue

        recommended_items = [iid for iid, _ in top_k[user][:k]]
        relevant_items = ground_truth[user]

        hits = len(set(recommended_items) & relevant_items)
        precisions.append(hits / k)

    return sum(precisions) / len(precisions)

In [14]:
def recall_at_k(top_k, ground_truth, k):
    recalls = []

    for user in top_k:
        if user not in ground_truth or len(ground_truth[user]) == 0:
            continue

        recommended_items = [iid for iid, _ in top_k[user][:k]]
        relevant_items = ground_truth[user]

        hits = len(set(recommended_items) & relevant_items)
        recalls.append(hits / len(relevant_items))

    return sum(recalls) / len(recalls)

In [15]:
import math

def ndcg_at_k(top_k, ground_truth, k):
    ndcgs = []

    for user in top_k:
        if user not in ground_truth:
            continue

        dcg = 0.0
        recommended_items = [iid for iid, _ in top_k[user][:k]]

        for i, item in enumerate(recommended_items):
            if item in ground_truth[user]:
                dcg += 1 / math.log2(i + 2)

        ideal_hits = min(len(ground_truth[user]), k)
        idcg = sum(1 / math.log2(i + 2) for i in range(ideal_hits))

        if idcg > 0:
            ndcgs.append(dcg / idcg)

    return sum(ndcgs) / len(ndcgs)

In [16]:
K = 10

precision = precision_at_k(top_k_recommendations, test_user_items, K)
recall = recall_at_k(top_k_recommendations, test_user_items, K)
ndcg = ndcg_at_k(top_k_recommendations, test_user_items, K)

print(f"Precision@{K}: {precision:.4f}")
print(f"Recall@{K}: {recall:.4f}")
print(f"NDCG@{K}: {ndcg:.4f}")

Precision@10: 0.0509
Recall@10: 0.0277
NDCG@10: 0.0643
