In [1]:
# util.models

import dataclasses
import pandas as pd
from typing import Dict, List


@dataclasses.dataclass(frozen=True)
# 추천 시스템의 학습과 평가에 사용하는 데이터셋
class Dataset:
    # 학습용 평갓값 데이터셋
    train: pd.DataFrame
    # 테스트용 평갓값 데이터셋
    test: pd.DataFrame
    # 순위 지표의 테스트 데이터셋. 키는 사용자 ID, 값은 사용자가 높이 평가한 아이템의 ID 리스트
    test_user2items: Dict[int, List[int]]
    # 아이템 콘텐츠 정보
    item_content: pd.DataFrame


@dataclasses.dataclass(frozen=True)
# 추천 시스템 예측 결과
class RecommendResult:
    # 테스트 데이터셋의 예측 평갓값. RMSE 평가
    rating: pd.DataFrame
    # 키는 사용자 ID, 값은 추천 아이템 ID 리스트. 순위 지표 평가.
    user2items: Dict[int, List[int]]


@dataclasses.dataclass(frozen=True)
# 추천 시스템 평가
class Metrics:
    rmse: float
    precision_at_k: float
    recall_at_k: float

    # 평가 결과는 소수 셋째 자리까지만 출력한다
    def __repr__(self):
        return f"rmse={self.rmse:.3f}, Precision@K={self.precision_at_k:.3f}, Recall@K={self.recall_at_k:.3f}"


In [2]:
# util.metric_calculator

import numpy as np
from sklearn.metrics import mean_squared_error
# from util.models import Metrics
from typing import Dict, List


class MetricCalculator:
    def calc(
        self,
        true_rating: List[float],
        pred_rating: List[float],
        true_user2items: Dict[int, List[int]],
        pred_user2items: Dict[int, List[int]],
        k: int,
    ) -> Metrics:
        rmse = self._calc_rmse(true_rating, pred_rating)
        precision_at_k = self._calc_precision_at_k(true_user2items, pred_user2items, k)
        recall_at_k = self._calc_recall_at_k(true_user2items, pred_user2items, k)
        return Metrics(rmse, precision_at_k, recall_at_k)

    def _precision_at_k(self, true_items: List[int], pred_items: List[int], k: int) -> float:
        if k == 0:
            return 0.0

        p_at_k = (len(set(true_items) & set(pred_items[:k]))) / k
        return p_at_k

    def _recall_at_k(self, true_items: List[int], pred_items: List[int], k: int) -> float:
        if len(true_items) == 0 or k == 0:
            return 0.0

        r_at_k = (len(set(true_items) & set(pred_items[:k]))) / len(true_items)
        return r_at_k

    def _calc_rmse(self, true_rating: List[float], pred_rating: List[float]) -> float:
        return np.sqrt(mean_squared_error(true_rating, pred_rating))

    def _calc_recall_at_k(
        self, true_user2items: Dict[int, List[int]], pred_user2items: Dict[int, List[int]], k: int
    ) -> float:
        scores = []
        # テストデータに存在する各ユーザーのrecall@kを計算
        for user_id in true_user2items.keys():
            r_at_k = self._recall_at_k(true_user2items[user_id], pred_user2items[user_id], k)
            scores.append(r_at_k)
        return np.mean(scores)

    def _calc_precision_at_k(
        self, true_user2items: Dict[int, List[int]], pred_user2items: Dict[int, List[int]], k: int
    ) -> float:
        scores = []
        # 테스트 데이터에 존재하는 각 사용자의 precision@k를 계산한다
        for user_id in true_user2items.keys():
            p_at_k = self._precision_at_k(true_user2items[user_id], pred_user2items[user_id], k)
            scores.append(p_at_k)
        return np.mean(scores)


In [3]:
# src.base_recommender

from abc import ABC, abstractmethod
# from util.data_loader import DataLoader
# from util.metric_calculator import MetricCalculator
# from util.models import Dataset, RecommendResult


class BaseRecommender(ABC):
    @abstractmethod
    def recommend(self, dataset: Dataset, **kwargs) -> RecommendResult:
        pass

    def run_sample(self) -> None:
        # Movielens 데이터 취득
        movielens = DataLoader(num_users=1000, num_test_items=5, data_path="../data/ml-10M100K/").load()
        # 추천 계산
        recommend_result = self.recommend(movielens)
        # 추천 결과 평가
        metrics = MetricCalculator().calc(
            movielens.test.rating.tolist(),
            recommend_result.rating.tolist(),
            movielens.test_user2items,
            recommend_result.user2items,
            k=10,
        )
        print(metrics)


In [5]:
# util.data_loader

import pandas as pd  # pandas 라이브러리를 pd 별칭으로 import
import os  # os 라이브러리를 import
# from util.models import Dataset  # util.models 모듈에서 Dataset 클래스 import

class DataLoader:  # DataLoader 클래스 정의
    def __init__(self, num_users: int = 1000, num_test_items: int = 5, data_path: str = "../data/ml-10M100K/"):  # 생성자 정의
        # 클래스 멤버 변수 초기화
        self.num_users = num_users
        self.num_test_items = num_test_items
        self.data_path = data_path

    def load(self) -> Dataset:  # load 메서드 정의
        ratings, movie_content = self._load()  # 데이터를 로드하는 함수 호출
        movielens_train, movielens_test = self._split_data(ratings)  # 데이터를 분할하는 함수 호출
        movielens_test_user2items = (
            movielens_test[movielens_test.rating >= 4].groupby("user_id").agg({"movie_id": list})["movie_id"].to_dict()
        )  # 사용자가 높게 평가한 영화로 구성된 딕셔너리 생성
        return Dataset(movielens_train, movielens_test, movielens_test_user2items, movie_content)  # Dataset 객체 반환

    def _split_data(self, movielens: pd.DataFrame) -> (pd.DataFrame, pd.DataFrame):  # 데이터 분할 메서드
        movielens["rating_order"] = movielens.groupby("user_id")["timestamp"].rank(ascending=False, method="first")  # 각 사용자의 평점 시간순으로 순위 부여
        movielens_train = movielens[movielens["rating_order"] > self.num_test_items]  # 학습 데이터 생성
        movielens_test = movielens[movielens["rating_order"] <= self.num_test_items]  # 테스트 데이터 생성
        return movielens_train, movielens_test  # 데이터 반환

    def _load(self) -> (pd.DataFrame, pd.DataFrame):  # 데이터 로드 메서드
        m_cols = ["movie_id", "title", "genre"]  # 필드 이름 정의
        movies = pd.read_csv(  # movies.dat 파일을 읽어 데이터프레임 생성
            os.path.join(self.data_path, "movies.dat"), names=m_cols, sep="::", encoding="latin-1", engine="python"
        )
        movies["genre"] = movies.genre.apply(lambda x: x.split("|"))  # 장르 정보를 리스트로 변환

        t_cols = ["user_id", "movie_id", "tag", "timestamp"]  # 필드 이름 정의
        user_tagged_movies = pd.read_csv(  # tags.dat 파일을 읽어 데이터프레임 생성
            os.path.join(self.data_path, "tags.dat"), names=t_cols, sep="::", engine="python"
        )
        user_tagged_movies["tag"] = user_tagged_movies["tag"].str.lower()  # 태그를 소문자로 변환
        movie_tags = user_tagged_movies.groupby("movie_id").agg({"tag": list})  # 영화별 태그 정보 생성

        movies = movies.merge(movie_tags, on="movie_id", how="left")  # 영화 정보에 태그 정보 결합

        r_cols = ["user_id", "movie_id", "rating", "timestamp"]  # 필드 이름 정의
        ratings = pd.read_csv(os.path.join(self.data_path, "ratings.dat"), names=r_cols, sep="::", engine="python")  # ratings.dat 파일을 읽어 데이터프레임 생성

        valid_user_ids = sorted(ratings.user_id.unique())[: self.num_users]  # 유효한 사용자 ID 범위 지정
        ratings = ratings[ratings.user_id <= max(valid_user_ids)]  # 유효한 사용자 ID만 사용

        movielens_ratings = ratings.merge(movies, on="movie_id")  # 영화 정보와 평점 정보 결합

        return movielens_ratings, movies  # 데이터 반환

In [6]:
# src.random

# from util.models import RecommendResult, Dataset
# from src.base_recommender import BaseRecommender
from collections import defaultdict
import numpy as np

np.random.seed(0)


class RandomRecommender(BaseRecommender):
    def recommend(self, dataset: Dataset, **kwargs) -> RecommendResult:
        # 사용자 ID와 아이템 ID에 대해 0부터 시작하는 인덱스를 할당한다
        unique_user_ids = sorted(dataset.train.user_id.unique())
        unique_movie_ids = sorted(dataset.train.movie_id.unique())
        user_id2index = dict(zip(unique_user_ids, range(len(unique_user_ids))))
        movie_id2index = dict(zip(unique_movie_ids, range(len(unique_movie_ids))))

        # 사용자 x 아이템의 행렬에서 각 셀의 예측 평갓값은 0.5~5.0의 균등 난수로 한다
        pred_matrix = np.random.uniform(0.5, 5.0, (len(unique_user_ids), len(unique_movie_ids)))

        # rmse 평가용으로 테스트 데이터에 나오는 사용자와 아이템의 예측 평갓값을 저장한다
        movie_rating_predict = dataset.test.copy()
        pred_results = []
        for i, row in dataset.test.iterrows():
            user_id = row["user_id"]
            # 테스트 데이터의 아이템 ID가 학습용으로 등장하지 않는 경우도 난수를 저장한다
            if row["movie_id"] not in movie_id2index:
                pred_results.append(np.random.uniform(0.5, 5.0))
                continue
            # 테스트 데이터에 나타나는 사용자 ID와 아이템 ID의 인덱스를 얻어, 평갓값 행렬값을 얻는다
            user_index = user_id2index[row["user_id"]]
            movie_index = movie_id2index[row["movie_id"]]
            pred_score = pred_matrix[user_index, movie_index]
            pred_results.append(pred_score)
        movie_rating_predict["rating_pred"] = pred_results

        # 순위 평가용 데이터 작성
        # 각 사용자에 대한 추천 영화는, 해당 사용자가 아직 평가하지 않은 영화 중에서 무작위로 10개 작품으로 한다
        # 키는 사용자 ID, 값은 추천 아이템의 ID 리스트
        pred_user2items = defaultdict(list)
        # 사용자가 이미 평가한 영화를 저장한다
        user_evaluated_movies = dataset.train.groupby("user_id").agg({"movie_id": list})["movie_id"].to_dict()
        for user_id in unique_user_ids:
            user_index = user_id2index[user_id]
            movie_indexes = np.argsort(-pred_matrix[user_index, :])
            for movie_index in movie_indexes:
                movie_id = unique_movie_ids[movie_index]
                if movie_id not in user_evaluated_movies[user_id]:
                    pred_user2items[user_id].append(movie_id)
                if len(pred_user2items[user_id]) == 10:
                    break
        return RecommendResult(movie_rating_predict.rating_pred, pred_user2items)


if __name__ == "__main__":
    RandomRecommender().run_sample()


rmse=1.883, Precision@K=0.000, Recall@K=0.001


In [7]:
# 부모 폴더의 경로 추가
import sys; sys.path.insert(0, '..')

from util.data_loader import DataLoader
from util.metric_calculator import MetricCalculator

In [8]:
# Movielens 데이터 로딩
data_loader = DataLoader(num_users=1000, num_test_items=5, data_path='../data/ml-10M100K/')
movielens = data_loader.load()

In [9]:
# 무작위 추천
from src.random import RandomRecommender  # RandomRecommender 클래스 import
recommender = RandomRecommender()  # RandomRecommender 객체 생성
recommend_result = recommender.recommend(movielens)  # RandomRecommender를 통해 무작위 추천 결과 생성

In [10]:
# 평가
metric_calculator = MetricCalculator()  # MetricCalculator 객체 생성
metrics = metric_calculator.calc(  # 메트릭 계산
    movielens.test.rating.tolist(), recommend_result.rating.tolist(),  # 실제 평점과 추천 결과 평점 전달
    movielens.test_user2items, recommend_result.user2items, k=10)  # 테스트 사용자-아이템 딕셔너리 및 추천 결과 사용자-아이템 딕셔너리 전달
print(metrics)  # 계산된 메트릭 출력

rmse=1.883, Precision@K=0.000, Recall@K=0.001
