In [1]:
# Cell 1

import os
import csv
import sys
import re
import math
import numpy as np
import random
from collections import defaultdict

import torch
import matplotlib.pyplot as plt

# 멀티스레드 설정
torch.set_num_threads(8)

print("Environment Ready.")

Environment Ready.


In [2]:
# Cell 2

!pip install scikit-surprise > /dev/null

from surprise import Dataset, Reader

class MovieLens:
    def __init__(self, ratings_path, movies_path):
        self.ratingsPath = ratings_path
        self.moviesPath = movies_path
        self.movieID_to_name = {}
        self.name_to_movieID = {}

    def loadMovieLensLatestSmall(self):
        reader = Reader(line_format='user item rating timestamp', sep=',', skip_lines=1, rating_scale=(1,5))
        ratingsDataset = Dataset.load_from_file(self.ratingsPath, reader=reader)

        with open(self.moviesPath, newline='', encoding='ISO-8859-1') as csvfile:
            movieReader = csv.reader(csvfile)
            next(movieReader)
            for row in movieReader:
                movieID = int(row[0])
                movieName = row[1]
                self.movieID_to_name[movieID] = movieName
                self.name_to_movieID[movieName] = movieID

        return ratingsDataset

    def getPopularityRanks(self):
        ratings = defaultdict(int)
        rankings = {}
        with open(self.ratingsPath, newline='', encoding='utf-8') as csvfile:
            ratingReader = csv.reader(csvfile)
            next(ratingReader)
            for row in ratingReader:
                movieID = int(row[1])
                ratings[movieID] += 1
        rank = 1
        for movieID, ratingCount in sorted(ratings.items(), key=lambda x: x[1], reverse=True):
            rankings[movieID] = rank
            rank += 1
        return rankings

    def getGenres(self):
        genres = defaultdict(list)
        genreIDs = {}
        maxGenreID = 0
        with open(self.moviesPath, newline='', encoding='ISO-8859-1') as csvfile:
            movieReader = csv.reader(csvfile)
            next(movieReader)
            for row in movieReader:
                movieID = int(row[0])
                genreList = row[2].split('|')
                genreIDList = []
                for genre in genreList:
                    if genre in genreIDs:
                        genreID = genreIDs[genre]
                    else:
                        genreID = maxGenreID
                        genreIDs[genre] = genreID
                        maxGenreID += 1
                    genreIDList.append(genreID)
                genres[movieID] = genreIDList

        for movieID, genreIDList in genres.items():
            bitfield = [0]*maxGenreID
            for g in genreIDList:
                bitfield[g] = 1
            genres[movieID] = bitfield
        return genres

def surprise_dataset_to_list(surprise_dataset):
    raw_data = surprise_dataset.raw_ratings
    data_list = []
    for (user, item, rating, _) in raw_data:
        data_list.append((int(user), int(item), float(rating)))
    return data_list

def train_test_split(data_list, test_ratio=0.2, seed=42):
    random.Random(seed).shuffle(data_list)
    cutoff = int(len(data_list)*(1 - test_ratio))
    train_data = data_list[:cutoff]
    test_data = data_list[cutoff:]
    return train_data, test_data

# 파일 경로 예시
ratings_path = "/content/ratings.csv"
movies_path = "/content/movies.csv"

ml = MovieLens(ratings_path, movies_path)
surprise_dataset = ml.loadMovieLensLatestSmall()
data_list = surprise_dataset_to_list(surprise_dataset)

train_data, test_data = train_test_split(data_list, test_ratio=0.2, seed=42)
print("Data Loaded.")
print("Train size:", len(train_data), "Test size:", len(test_data))

Data Loaded.
Train size: 80003 Test size: 20001


In [3]:
# Cell 3

import math
import numpy as np
from collections import defaultdict

def rmse(predictions, targets):
    return math.sqrt(np.mean((np.array(predictions) - np.array(targets))**2))

def mae(predictions, targets):
    return np.mean(np.abs(np.array(predictions) - np.array(targets)))

def prune_items_by_popularity(all_items, popularity_ranks, top_k=2000):
    items_with_rank = [(itm, popularity_ranks.get(itm,9999999)) for itm in all_items]
    items_with_rank.sort(key=lambda x: x[1])  # rank 오름차순(1이 가장 인기)
    pruned = [x[0] for x in items_with_rank[:top_k]]
    return set(pruned)

def get_topN_for_all_users(model, all_users, candidate_items, user_items_dict, N=10):
    user_topN = {}
    for u in all_users:
        rated_items = user_items_dict[u]
        cands = [i for i in candidate_items if i not in rated_items]
        scores = []
        for i in cands:
            pred = model.predict(u,i)
            scores.append((i,pred))
        scores.sort(key=lambda x:x[1], reverse=True)
        user_topN[u] = [x[0] for x in scores[:N]]
    return user_topN

def coverage_with_topN(user_topN, all_items):
    recommended = set()
    for u, top_items in user_topN.items():
        for i in top_items:
            recommended.add(i)
    return len(recommended)/len(all_items) if len(all_items)>0 else 0.0

def diversity_with_topN(user_topN, movie_genres):
    distances = []
    for u, top_items in user_topN.items():
        if len(top_items)<2:
            continue
        pair_sum=0.0
        pair_count=0
        for i1 in range(len(top_items)):
            for i2 in range(i1+1,len(top_items)):
                it1 = top_items[i1]
                it2 = top_items[i2]
                g1 = movie_genres.get(it1, [])
                g2 = movie_genres.get(it2, [])
                if len(g1)==len(g2):
                    inter=0
                    union=0
                    for x,y in zip(g1,g2):
                        if x==1 and y==1: inter+=1
                        if x==1 or y==1: union+=1
                    if union>0:
                        jacc_dist = 1 - (inter/union)
                        pair_sum+= jacc_dist
                        pair_count+=1
        if pair_count>0:
            distances.append(pair_sum/pair_count)
    if len(distances)==0:
        return 0.0
    return float(np.mean(distances))

def novelty_with_topN(user_topN, popularity_ranks):
    ranks=[]
    for u, top_items in user_topN.items():
        for i in top_items:
            ranks.append(popularity_ranks.get(i,9999999))
    if len(ranks)==0:
        return 0.0
    return float(np.mean(ranks))

def evaluate_topN_metrics(user_topN, test_data, rating_threshold=4.0, N=10):
    hits_hr = 0
    total_hr = 0
    user_hits_count=defaultdict(int)
    user_above_thr=defaultdict(int)
    sum_ranks=0
    count_ranks=0

    user_topN_rank={}
    for u, items in user_topN.items():
        rank_map={}
        for idx, it in enumerate(items):
            rank_map[it] = idx+1
        user_topN_rank[u]=rank_map

    for (u,i,r) in test_data:
        if r>=rating_threshold:
            total_hr+=1
            if u in user_topN and i in user_topN[u]:
                hits_hr+=1

            user_above_thr[u]+=1
            if u in user_topN_rank and i in user_topN_rank[u]:
                rank_i = user_topN_rank[u][i]
                user_hits_count[u]+=1
            else:
                rank_i = N+1
            sum_ranks += rank_i
            count_ranks+=1

    hr_val = hits_hr/total_hr if total_hr>0 else 0.0
    total_users_chr = sum(1 for x in user_above_thr if user_above_thr[x]>0)
    total_hits_chr = sum(user_hits_count.values())
    chr_val = total_hits_chr/total_users_chr if total_users_chr>0 else 0.0
    ahar_val = sum_ranks/count_ranks if count_ranks>0 else 0.0

    return hr_val, chr_val, ahar_val

In [4]:
# Cell 4

import pandas as pd
from surprise import KNNBasic, SVD, KNNWithMeans, KNNWithZScore, KNNBaseline

from surprise import Dataset, Reader

class ClassicRecommender:
    """
    A wrapper for various Surprise-based classic CF methods:
     - userKNN: user-based KNN
     - itemKNN: item-based KNN
     - svd: SVD
     etc.
    """

    def __init__(self, method='userKNN'):
        self.method = method
        self.algo = None

        if method == 'userKNN':
            sim_options = {'name': 'cosine', 'user_based': True}
            self.algo = KNNBasic(sim_options=sim_options)
        elif method == 'itemKNN':
            sim_options = {'name': 'cosine', 'user_based': False}
            self.algo = KNNBasic(sim_options=sim_options)
        elif method == 'svd':
            # 예시 파라미터
            self.algo = SVD(n_factors=50, n_epochs=20, lr_all=0.005, reg_all=0.02)
        else:
            # default
            self.algo = SVD()

    def train(self, train_data):
        df = pd.DataFrame(train_data, columns=['user','item','rating'])
        df['user'] = df['user'].astype(str)
        df['item'] = df['item'].astype(str)

        reader = Reader(rating_scale=(1,5))
        dataset = Dataset.load_from_df(df, reader=reader)
        trainset = dataset.build_full_trainset()
        self.algo.fit(trainset)

    def predict(self, user, item):
        user_str = str(user)
        item_str = str(item)
        est = self.algo.predict(user_str, item_str).est
        return est

In [5]:
# Cell 5

# 1) 모델 생성
userKNN_model = ClassicRecommender(method='userKNN')

# 2) 학습
userKNN_model.train(train_data)

# 3) 평가
all_users = set([d[0] for d in train_data] + [d[0] for d in test_data])
all_items = set([d[1] for d in train_data] + [d[1] for d in test_data])

user_items_dict = defaultdict(set)
for (u,i,r) in train_data:
    user_items_dict[u].add(i)

ml_obj = MovieLens(ratings_path, movies_path)
popularity_ranks = ml_obj.getPopularityRanks()
movie_genres = ml_obj.getGenres()

# -- RMSE, MAE
preds, trues = [], []
for (u,i,r) in test_data:
    p = userKNN_model.predict(u,i)
    preds.append(p)
    trues.append(r)
userKNN_rmse = rmse(preds,trues)
userKNN_mae  = mae(preds,trues)

# -- Top-N
pruned_items = prune_items_by_popularity(all_items, popularity_ranks, top_k=2000)
user_topN = get_topN_for_all_users(userKNN_model, all_users, pruned_items, user_items_dict, N=10)

hr_val, chr_val, ahar_val = evaluate_topN_metrics(user_topN, test_data, rating_threshold=4.0, N=10)
cov_val = coverage_with_topN(user_topN, all_items)
div_val = diversity_with_topN(user_topN, movie_genres)
nov_val = novelty_with_topN(user_topN, popularity_ranks)

print("\n=== [UserKNN] Evaluation ===")
print(f"RMSE = {userKNN_rmse:.4f}")
print(f"MAE  = {userKNN_mae:.4f}")
print(f"HR   = {hr_val:.4f}")
print(f"cHR  = {chr_val:.4f}")
print(f"AHAR = {ahar_val:.4f}")
print(f"Coverage  = {cov_val:.4f}")
print(f"Diversity = {div_val:.4f}")
print(f"Novelty   = {nov_val:.4f}")

# 4) 특정 사용자에게 추천된 영화 리스트 출력
sample_user = 1  # 예: user=1
if sample_user in user_topN:
    recommended_ids = user_topN[sample_user]
    recommended_titles = [ml_obj.movieID_to_name.get(mid, "Unknown") for mid in recommended_ids]
    print(f"\n[User {sample_user}] Top-10 Recommended Movie IDs:", recommended_ids)
    print(f"[User {sample_user}] Top-10 Recommended Movie Titles:", recommended_titles)
else:
    print(f"\nUser {sample_user} not found in user_topN.")

Computing the cosine similarity matrix...
Done computing similarity matrix.

=== [UserKNN] Evaluation ===
RMSE = 0.9822
MAE  = 0.7610
HR   = 0.0231
cHR  = 0.3698
AHAR = 10.8643
Coverage  = 0.0116
Diversity = 0.7305
Novelty   = 1027.5379

[User 1] Top-10 Recommended Movie IDs: [105844, 7502, 1948, 1147, 1260, 3067, 31410, 905, 306, 1873]
[User 1] Top-10 Recommended Movie Titles: ['Unknown', 'Unknown', 'Unknown', 'Unknown', 'Unknown', 'Unknown', 'Unknown', 'Unknown', 'Unknown', 'Unknown']


In [6]:
# Cell 6

itemKNN_model = ClassicRecommender(method='itemKNN')
itemKNN_model.train(train_data)

preds, trues = [], []
for (u,i,r) in test_data:
    p = itemKNN_model.predict(u,i)
    preds.append(p)
    trues.append(r)
itemKNN_rmse = rmse(preds,trues)
itemKNN_mae  = mae(preds,trues)

pruned_items = prune_items_by_popularity(all_items, popularity_ranks, top_k=2000)
user_topN = get_topN_for_all_users(itemKNN_model, all_users, pruned_items, user_items_dict, N=10)

hr_val, chr_val, ahar_val = evaluate_topN_metrics(user_topN, test_data, rating_threshold=4.0, N=10)
cov_val = coverage_with_topN(user_topN, all_items)
div_val = diversity_with_topN(user_topN, movie_genres)
nov_val = novelty_with_topN(user_topN, popularity_ranks)

print("\n=== [ItemKNN] Evaluation ===")
print(f"RMSE = {itemKNN_rmse:.4f}")
print(f"MAE  = {itemKNN_mae:.4f}")
print(f"HR   = {hr_val:.4f}")
print(f"cHR  = {chr_val:.4f}")
print(f"AHAR = {ahar_val:.4f}")
print(f"Coverage  = {cov_val:.4f}")
print(f"Diversity = {div_val:.4f}")
print(f"Novelty   = {nov_val:.4f}")

# 특정 사용자 예시
sample_user = 1
if sample_user in user_topN:
    recommended_ids = user_topN[sample_user]
    recommended_titles = [ml_obj.movieID_to_name.get(mid, "Unknown") for mid in recommended_ids]
    print(f"\n[User {sample_user}] Top-10 Recommended Movie IDs:", recommended_ids)
    print(f"[User {sample_user}] Top-10 Recommended Movie Titles:", recommended_titles)
else:
    print(f"\nUser {sample_user} not found in user_topN.")

Computing the cosine similarity matrix...
Done computing similarity matrix.

=== [ItemKNN] Evaluation ===
RMSE = 0.9856
MAE  = 0.7673
HR   = 0.0097
cHR  = 0.1556
AHAR = 10.9477
Coverage  = 0.1660
Diversity = 0.7955
Novelty   = 1311.8376

[User 1] Top-10 Recommended Movie IDs: [106696, 73, 60126, 105844, 66097, 546, 81932, 95167, 1873, 6867]
[User 1] Top-10 Recommended Movie Titles: ['Unknown', 'Unknown', 'Unknown', 'Unknown', 'Unknown', 'Unknown', 'Unknown', 'Unknown', 'Unknown', 'Unknown']


In [7]:
# Cell 7

svd_model = ClassicRecommender(method='svd')
svd_model.train(train_data)

preds, trues = [], []
for (u,i,r) in test_data:
    p = svd_model.predict(u,i)
    preds.append(p)
    trues.append(r)

svd_rmse = rmse(preds,trues)
svd_mae  = mae(preds,trues)

pruned_items = prune_items_by_popularity(all_items, popularity_ranks, top_k=2000)
user_topN = get_topN_for_all_users(svd_model, all_users, pruned_items, user_items_dict, N=10)

hr_val, chr_val, ahar_val = evaluate_topN_metrics(user_topN, test_data, rating_threshold=4.0, N=10)
cov_val = coverage_with_topN(user_topN, all_items)
div_val = diversity_with_topN(user_topN, movie_genres)
nov_val = novelty_with_topN(user_topN, popularity_ranks)

print("\n=== [SVD] Evaluation ===")
print(f"RMSE = {svd_rmse:.4f}")
print(f"MAE  = {svd_mae:.4f}")
print(f"HR   = {hr_val:.4f}")
print(f"cHR  = {chr_val:.4f}")
print(f"AHAR = {ahar_val:.4f}")
print(f"Coverage  = {cov_val:.4f}")
print(f"Diversity = {div_val:.4f}")
print(f"Novelty   = {nov_val:.4f}")

# 특정 사용자 예시
sample_user = 1
if sample_user in user_topN:
    recommended_ids = user_topN[sample_user]
    recommended_titles = [ml_obj.movieID_to_name.get(mid, "Unknown") for mid in recommended_ids]
    print(f"\n[User {sample_user}] Top-10 Recommended Movie IDs:", recommended_ids)
    print(f"[User {sample_user}] Top-10 Recommended Movie Titles:", recommended_titles)
else:
    print(f"\nUser {sample_user} not found in user_topN.")


=== [SVD] Evaluation ===
RMSE = 0.8878
MAE  = 0.6850
HR   = 0.0267
cHR  = 0.4284
AHAR = 10.8354
Coverage  = 0.0246
Diversity = 0.7556
Novelty   = 424.4048

[User 1] Top-10 Recommended Movie IDs: [318, 527, 1228, 858, 898, 1221, 1148, 1203, 912, 608]
[User 1] Top-10 Recommended Movie Titles: ['Unknown', 'Unknown', 'Unknown', 'Unknown', 'Unknown', 'Unknown', 'Unknown', 'Unknown', 'Unknown', 'Unknown']
