#### 평점 기반 추천

In [1]:
# -*- coding: utf-8 -*-
"""
간단한 Item-based CF 추천 시스템 (Jupyter Notebook용)
"""

import json
import math
from collections import defaultdict

import numpy as np
import pandas as pd
from tqdm import tqdm


def load_rating(path):
    """Load user/business star ratings from a JSON Lines file.

    Every non-blank line is parsed as one JSON object, from which the
    ``user_id``, ``business_id`` and ``stars`` fields are read.

    Args:
        path: Path to a UTF-8 encoded file with one JSON object per line.

    Returns:
        A ``pandas.DataFrame`` with the columns ``user``, ``biz`` and
        ``stars``, one row per record in file order. The columns are present
        even when the file contains no records.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
        KeyError: If a record lacks one of the three fields.
    """
    rows = []
    with open(path, encoding="utf-8") as f:
        for line in f:
            # Tolerate blank lines (e.g. a stray newline at the end of the
            # file) instead of failing inside json.loads.
            if not line.strip():
                continue
            d = json.loads(line)
            rows.append(
                {"user": d["user_id"], "biz": d["business_id"], "stars": d["stars"]}
            )
    # Naming the columns keeps the schema stable for an empty file, so a later
    # groupby("user") does not fail with a KeyError.
    return pd.DataFrame(rows, columns=["user", "biz", "stars"])


def leave_one_out(ratings):
    """Split ratings into train and test sets with a leave-one-out protocol.

    For each user with more than one rating, one of that user's rows is chosen
    at random and held out; its ``biz`` value becomes the user's test target.
    Users with a single rating stay entirely in the training set and get no
    test entry.

    The choice is reproducible (fixed seed 42). A private ``RandomState`` is
    used so that calling this function does not reseed NumPy's global random
    generator; it draws the same values as seeding the global generator with
    42 would.

    Args:
        ratings: DataFrame with at least ``user`` and ``biz`` columns. Rows
            are selected by index label, so labels are assumed to be unique.

    Returns:
        A ``(train, test)`` tuple. ``train`` is a DataFrame of the rows that
        were not held out, with its index reset to 0..n-1. ``test`` is a dict
        mapping each user to the held-out ``biz`` value.
    """
    rng = np.random.RandomState(42)
    train_idx = []
    test = {}  # user -> held-out biz

    for u, grp in ratings.groupby("user"):
        idx = grp.index.values
        if len(idx) > 1:
            hold = rng.choice(idx)
            test[u] = ratings.loc[hold, "biz"]
            train_idx.extend([i for i in idx if i != hold])
        else:
            train_idx.extend(idx)

    return ratings.loc[train_idx].reset_index(drop=True), test


def build_item_user_maps(ratings):
    """Build item->user and user->item rating lookups from a ratings frame.

    Args:
        ratings: DataFrame with ``user``, ``biz`` and ``stars`` columns.

    Returns:
        A ``(item_users, user_items)`` tuple of ``defaultdict(dict)`` objects:
        ``item_users[biz][user]`` and ``user_items[user][biz]`` both hold the
        star rating. If a (user, biz) pair occurs more than once, the last
        row wins.
    """
    item_users = defaultdict(dict)
    user_items = defaultdict(dict)

    # Walk the three columns in lockstep instead of DataFrame.iterrows(),
    # which builds a new Series for every row and may change value dtypes.
    for user, biz, stars in zip(ratings["user"], ratings["biz"], ratings["stars"]):
        item_users[biz][user] = stars
        user_items[user][biz] = stars

    return item_users, user_items


def cosine_similarity(a, b):
    """Return the cosine similarity of two sparse vectors stored as dicts.

    The dot product runs over the keys present in both dicts, while each norm
    is taken over all values of its own dict. A small constant (1e-9) is added
    to the denominator. Returns 0.0 when the dicts share no key.
    """
    shared = set(a) & set(b)
    if shared:
        dot = sum(a[key] * b[key] for key in shared)
        norm_a = math.sqrt(sum(x * x for x in a.values()))
        norm_b = math.sqrt(sum(x * x for x in b.values()))
        return dot / (norm_a * norm_b + 1e-9)
    return 0.0


def precompute_item_sim(item_users):
    """Compute the similarity of every unordered pair of items.

    Each pair is scored once with ``cosine_similarity`` and, when the score is
    positive, stored in both directions.

    Returns a ``defaultdict(dict)`` where ``result[a][b]`` is the similarity
    between items ``a`` and ``b``; pairs with a non-positive score are absent.
    """
    item_ids = list(item_users)
    similarity = defaultdict(dict)

    progress = tqdm(enumerate(item_ids), total=len(item_ids), desc="아이템 유사도 계산")
    for pos, left in progress:
        left_ratings = item_users[left]
        # Only items after `left` are visited, so each pair is scored once.
        for right in item_ids[pos + 1:]:
            score = cosine_similarity(left_ratings, item_users[right])
            if not score > 0:
                continue
            similarity[left][right] = score
            similarity[right][left] = score

    return similarity


def recommend(user, user_items, item_sims, n=5):
    """Return the top-N unseen items for a user, ranked by item-based CF score.

    A candidate's score is the sum, over the items the user has rated, of
    ``similarity(rated_item, candidate) * rating``. Items the user has already
    rated are never recommended.

    Args:
        user: Identifier of the user to recommend for.
        user_items: Mapping ``user -> {item: rating}``.
        item_sims: Mapping ``item -> {similar_item: similarity}``.
        n: Maximum number of items to return.

    Returns:
        A list of at most ``n`` item ids, highest score first; candidates with
        equal scores keep the order in which they were first scored. The list
        is empty for an unknown user or when no rated item has neighbours.
    """
    # .get keeps an unknown user from raising KeyError on a plain dict and
    # from inserting an empty entry into a defaultdict.
    rated = user_items.get(user, {})
    scores = defaultdict(float)

    for item, rating in rated.items():
        for similar_item, sim in item_sims.get(item, {}).items():
            # Dict membership is already O(1); no separate "seen" set needed.
            if similar_item not in rated:
                scores[similar_item] += sim * rating

    ranked = sorted(scores.items(), key=lambda x: -x[1])
    return [item for item, _ in ranked[:n]]


def precision_at_k(test, recs, k=5):
    """Return the fraction of test users whose held-out item is in their top-k.

    Args:
        test: Mapping ``user -> held-out item``.
        recs: Mapping ``user -> ranked list of recommended items``. Users
            missing from ``recs`` count as misses.
        k: Number of leading recommendations to inspect.

    Returns:
        Hits divided by the number of test users, or 0.0 when ``test`` is
        empty.

    NOTE(review): the denominator is the number of test users, not ``k``, so
    this is a hit rate rather than the textbook precision@k (hits / k) --
    confirm this is the intended definition.
    """
    if not test:
        # Avoid ZeroDivisionError when there is nothing to evaluate.
        return 0.0
    hits = sum(1 for u, gt in test.items() if gt in recs.get(u, [])[:k])
    return hits / len(test)


def recall_at_k(test, recs, k=5):
    """Return recall@k for a test set with one held-out item per user.

    With a single relevant item per user, a user's recall is 1 when that item
    appears in the top-k recommendations and 0 otherwise, so the result is the
    share of test users whose held-out item was retrieved.

    Args:
        test: Mapping ``user -> held-out item``.
        recs: Mapping ``user -> ranked list of recommended items``. Users
            missing from ``recs`` count as misses.
        k: Number of leading recommendations to inspect.

    Returns:
        Hits divided by the number of test users, or 0.0 when ``test`` is
        empty.
    """
    if not test:
        # Avoid ZeroDivisionError when there is nothing to evaluate.
        return 0.0
    # Computed here rather than by delegating to precision_at_k, so recall
    # does not depend on how precision is defined.
    hits = sum(1 for u, gt in test.items() if gt in recs.get(u, [])[:k])
    return hits / len(test)


def ndcg_at_k(test, recs, k=5):
    """Return the mean NDCG@k for a test set with one held-out item per user.

    A user contributes ``1 / log2(rank + 1)`` (rank counted from 1) when the
    held-out item appears in the top-k recommendations, and 0 otherwise. With
    a single relevant item the ideal DCG is 1, so no further normalisation is
    applied.

    Args:
        test: Mapping ``user -> held-out item``.
        recs: Mapping ``user -> ranked list of recommended items``. Users
            missing from ``recs`` contribute 0.
        k: Number of leading recommendations to inspect.

    Returns:
        The average gain over all test users, or 0.0 when ``test`` is empty.
    """
    if not test:
        # Avoid ZeroDivisionError when there is nothing to evaluate.
        return 0.0

    total = 0.0
    for u, gt in test.items():
        rec_list = recs.get(u, [])[:k]
        if gt in rec_list:
            # index() is 0-based, hence +2 to obtain log2(rank + 1).
            total += 1 / math.log2(rec_list.index(gt) + 2)
    return total / len(test)


# ================================
# Execution code for the Jupyter environment
# ================================
rating_path = "review_5up_5aspect_3sentiment_vectorized_clean.json"  # adjust to the actual file location
topn = 5  # length of each recommendation list and cutoff k of the metrics
min_ratings = 5  # users with fewer ratings than this are dropped

print(f"데이터 로드 중: {rating_path}")
ratings = load_rating(rating_path)

# Keep only users that have at least `min_ratings` ratings.
user_counts = ratings.groupby("user").size()
valid_users = user_counts[user_counts >= min_ratings].index
ratings = ratings[ratings["user"].isin(valid_users)]

print(
    f"총 {len(ratings):,}개 평점, {ratings['user'].nunique():,}명 사용자, {ratings['biz'].nunique():,}개 아이템"
)

print("학습/테스트 데이터 분리 중...")
train, test = leave_one_out(ratings)
print(f"학습: {len(train):,}개, 테스트: {len(test):,}개")

print("아이템-사용자 맵 구성 중...")
item_users, user_items = build_item_user_maps(train)

print("아이템 유사도 계산 중...")
item_sims = precompute_item_sim(item_users)

print(f"상위 {topn}개 아이템 추천 중...")
recommendations = {}
for user in tqdm(test.keys()):
    # Skip test users that have no training ratings to recommend from.
    if user_items.get(user):
        recommendations[user] = recommend(user, user_items, item_sims, topn)

print("성능 평가 중...")
# Evaluate at the same cutoff the lists were generated with, instead of a
# hard-coded 5 that would silently diverge if `topn` is changed.
p5 = precision_at_k(test, recommendations, topn)
r5 = recall_at_k(test, recommendations, topn)
n5 = ndcg_at_k(test, recommendations, topn)

print(f"Precision@{topn}: {p5:.4f}")
print(f"Recall@{topn}: {r5:.4f}")
print(f"NDCG@{topn}: {n5:.4f}")


데이터 로드 중: review_5up_5aspect_3sentiment_vectorized_clean.json
총 451,185개 평점, 28,465명 사용자, 6,832개 아이템
학습/테스트 데이터 분리 중...
학습: 422,720개, 테스트: 28,465개
아이템-사용자 맵 구성 중...
아이템 유사도 계산 중...


아이템 유사도 계산: 100%|██████████| 6828/6828 [03:30<00:00, 32.50it/s]  


상위 5개 아이템 추천 중...


100%|██████████| 28465/28465 [06:04<00:00, 78.00it/s] 

성능 평가 중...
Precision@5: 0.0561
Recall@5: 0.0561
NDCG@5: 0.0370





#### 하이브리드 추천 (평점 0.8 / ABSA 0.2)

가중치를 바꾸려면 아래 실행 코드의 `alpha` 값(평점 유사도 가중치)을 조정하세요. 이 값은 `precompute_hybrid_sims`에 전달되며, ABSA 감성 유사도의 가중치는 `1 - alpha`가 됩니다.

In [None]:
import json
import math
from collections import defaultdict

import numpy as np
import pandas as pd
from tqdm import tqdm

# ----- Load data with sentiment vector -----
def load_rating_with_sentiment(path):
    """Load ratings together with their sentiment vectors from a JSON Lines file.

    Every non-blank line is parsed as one JSON object, from which the
    ``user_id``, ``business_id``, ``stars`` and ``sentiment_vector`` fields
    are read.

    Args:
        path: Path to a UTF-8 encoded file with one JSON object per line.

    Returns:
        A ``pandas.DataFrame`` with the columns ``user``, ``biz``, ``stars``
        and ``sentiment_vector`` (each cell a NumPy array), one row per record
        in file order. The columns are present even when the file contains no
        records.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
        KeyError: If a record lacks one of the four fields.
    """
    rows = []
    with open(path, encoding="utf-8") as f:
        for line in f:
            # Tolerate blank lines (e.g. a stray newline at the end of the
            # file) instead of failing inside json.loads.
            if not line.strip():
                continue
            d = json.loads(line)
            rows.append({
                "user": d["user_id"],
                "biz": d["business_id"],
                "stars": d["stars"],
                "sentiment_vector": np.array(d["sentiment_vector"]),
            })
    # Naming the columns keeps the schema stable for an empty file, so a later
    # groupby("user") does not fail with a KeyError.
    return pd.DataFrame(rows, columns=["user", "biz", "stars", "sentiment_vector"])

# ----- Leave-One-Out split -----
def leave_one_out(ratings):
    """Split ratings into train and test sets with a leave-one-out protocol.

    For each user with more than one rating, one of that user's rows is chosen
    at random and held out; its ``biz`` value becomes the user's test target.
    Users with a single rating stay entirely in the training set and get no
    test entry.

    The choice is reproducible (fixed seed 42). A private ``RandomState`` is
    used so that calling this function does not reseed NumPy's global random
    generator; it draws the same values as seeding the global generator with
    42 would.

    Args:
        ratings: DataFrame with at least ``user`` and ``biz`` columns. Rows
            are selected by index label, so labels are assumed to be unique.

    Returns:
        A ``(train, test)`` tuple. ``train`` is a DataFrame of the rows that
        were not held out, with its index reset to 0..n-1. ``test`` is a dict
        mapping each user to the held-out ``biz`` value.
    """
    rng = np.random.RandomState(42)
    train_idx = []
    test = {}  # user -> held-out biz
    for u, grp in ratings.groupby("user"):
        idx = grp.index.values
        if len(idx) > 1:
            hold = rng.choice(idx)
            test[u] = ratings.loc[hold, "biz"]
            train_idx.extend([i for i in idx if i != hold])
        else:
            train_idx.extend(idx)
    return ratings.loc[train_idx].reset_index(drop=True), test

# ----- Build mappings -----
def build_maps(train_df):
    """Build rating lookups and per-item mean sentiment vectors.

    Args:
        train_df: DataFrame with ``user``, ``biz``, ``stars`` and
            ``sentiment_vector`` columns.

    Returns:
        A ``(item_users, user_items, item_sentiment_avg)`` tuple.
        ``item_users[biz][user]`` and ``user_items[user][biz]`` hold the star
        rating (the last row wins for a repeated pair), and
        ``item_sentiment_avg[biz]`` is the element-wise mean of all sentiment
        vectors recorded for that item.
    """
    item_users = defaultdict(dict)
    user_items = defaultdict(dict)
    item_sentiments = defaultdict(list)
    # Walk the columns in lockstep instead of DataFrame.iterrows(), which
    # builds a new Series for every row.
    columns = zip(
        train_df["user"],
        train_df["biz"],
        train_df["stars"],
        train_df["sentiment_vector"],
    )
    for user, biz, stars, vector in columns:
        item_users[biz][user] = stars
        user_items[user][biz] = stars
        item_sentiments[biz].append(vector)
    # Average the collected vectors of each item.
    item_sentiment_avg = {biz: np.mean(vectors, axis=0) for biz, vectors in item_sentiments.items()}
    return item_users, user_items, item_sentiment_avg

# ----- Cosine similarity -----
def cosine_similarity_dict(a, b):
    """Return the cosine similarity of two sparse vectors stored as dicts.

    The dot product runs over the keys present in both dicts, while each norm
    is taken over all values of its own dict. A small constant (1e-9) is added
    to the denominator. Returns 0.0 when the dicts share no key.
    """
    shared = set(a) & set(b)
    if shared:
        dot = sum(a[key] * b[key] for key in shared)
        norm_a = math.sqrt(sum(x * x for x in a.values()))
        norm_b = math.sqrt(sum(x * x for x in b.values()))
        return dot / (norm_a * norm_b + 1e-9)
    return 0.0

def cosine_similarity_vec(a, b):
    """Return the cosine similarity of two dense vectors as a Python float.

    Args:
        a: First vector (array-like accepted by ``np.linalg.norm``/``np.dot``).
        b: Second vector, of the same length as ``a``.

    Returns:
        ``dot(a, b) / (|a| * |b|)``, or 0.0 when either vector has zero norm.
    """
    # Each norm is computed once and reused for both the zero check and the
    # denominator; this function sits in the inner loop of the pairwise scan.
    norm_a = np.linalg.norm(a)
    norm_b = np.linalg.norm(b)
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return float(np.dot(a, b) / (norm_a * norm_b))

# ----- Precompute hybrid item similarities -----
def precompute_hybrid_sims(item_users, item_sentiment_avg, alpha=0.8):
    """Compute a blended rating/sentiment similarity for every pair of items.

    For each unordered pair the score is
    ``alpha * rating_similarity + (1 - alpha) * sentiment_similarity``, where
    the rating part comes from ``cosine_similarity_dict`` on the items' user
    ratings and the sentiment part from ``cosine_similarity_vec`` on their
    averaged sentiment vectors.

    Args:
        item_users: Mapping ``item -> {user: rating}``; its keys define the
            set of items that are compared.
        item_sentiment_avg: Mapping ``item -> averaged sentiment vector``.
            Items missing here fall back to a 15-element zero vector.
        alpha: Weight of the rating similarity; the sentiment similarity gets
            ``1 - alpha``.

    Returns:
        A ``defaultdict(dict)`` where ``result[a][b]`` is the blended
        similarity; pairs whose score is not positive are omitted.

    NOTE(review): a pair is stored whenever its sentiment similarity alone is
    positive, even if the two items share no raters, so the result can grow
    towards a full items x items table.
    """
    items = list(item_users)
    sims = defaultdict(dict)
    # Fallback for items without a sentiment vector: allocated once and shared
    # rather than built twice per pair. It is only read, never modified.
    zero_vec = np.zeros(15)

    for i, a in tqdm(enumerate(items), total=len(items), desc="하이브리드 유사도 계산"):
        # Lookups that depend only on `a` are hoisted out of the inner loop.
        ratings_a = item_users[a]
        sent_a = item_sentiment_avg.get(a, zero_vec)
        for b in items[i + 1:]:
            rating_sim = cosine_similarity_dict(ratings_a, item_users[b])
            sent_sim = cosine_similarity_vec(
                sent_a,
                item_sentiment_avg.get(b, zero_vec),
            )
            hybrid_sim = alpha * rating_sim + (1 - alpha) * sent_sim
            if hybrid_sim > 0:
                sims[a][b] = hybrid_sim
                sims[b][a] = hybrid_sim
    return sims

# ----- Recommendation -----
def recommend(user, user_items, item_sims, n=5):
    """Return the top-N unseen items for a user, ranked by item-based CF score.

    A candidate's score is the sum, over the items the user has rated, of
    ``similarity(rated_item, candidate) * rating``. Items the user has already
    rated are never recommended.

    Args:
        user: Identifier of the user to recommend for.
        user_items: Mapping ``user -> {item: rating}``.
        item_sims: Mapping ``item -> {similar_item: similarity}``.
        n: Maximum number of items to return.

    Returns:
        A list of at most ``n`` item ids, highest score first; candidates with
        equal scores keep the order in which they were first scored. The list
        is empty for an unknown user or when no rated item has neighbours.
    """
    # .get keeps an unknown user from raising KeyError on a plain dict and
    # from inserting an empty entry into a defaultdict.
    rated = user_items.get(user, {})
    scores = defaultdict(float)
    for item, rating in rated.items():
        for similar_item, sim in item_sims.get(item, {}).items():
            # Dict membership is already O(1); no separate "seen" set needed.
            if similar_item not in rated:
                scores[similar_item] += sim * rating
    ranked = sorted(scores.items(), key=lambda x: -x[1])
    return [item for item, _ in ranked[:n]]

# ----- Evaluation -----
def precision_at_k(test, recs, k=5):
    """Return the fraction of test users whose held-out item is in their top-k.

    Args:
        test: Mapping ``user -> held-out item``.
        recs: Mapping ``user -> ranked list of recommended items``. Users
            missing from ``recs`` count as misses.
        k: Number of leading recommendations to inspect.

    Returns:
        Hits divided by the number of test users, or 0.0 when ``test`` is
        empty.

    NOTE(review): the denominator is the number of test users, not ``k``, so
    this is a hit rate rather than the textbook precision@k (hits / k) --
    confirm this is the intended definition.
    """
    if not test:
        # Avoid ZeroDivisionError when there is nothing to evaluate.
        return 0.0
    hits = sum(1 for u, gt in test.items() if gt in recs.get(u, [])[:k])
    return hits / len(test)

def recall_at_k(test, recs, k=5):
    """Return recall@k for a test set with one held-out item per user.

    With a single relevant item per user, a user's recall is 1 when that item
    appears in the top-k recommendations and 0 otherwise, so the result is the
    share of test users whose held-out item was retrieved.

    Args:
        test: Mapping ``user -> held-out item``.
        recs: Mapping ``user -> ranked list of recommended items``. Users
            missing from ``recs`` count as misses.
        k: Number of leading recommendations to inspect.

    Returns:
        Hits divided by the number of test users, or 0.0 when ``test`` is
        empty.
    """
    if not test:
        # Avoid ZeroDivisionError when there is nothing to evaluate.
        return 0.0
    # Computed here rather than by delegating to precision_at_k, so recall
    # does not depend on how precision is defined.
    hits = sum(1 for u, gt in test.items() if gt in recs.get(u, [])[:k])
    return hits / len(test)

def ndcg_at_k(test, recs, k=5):
    """Return the mean NDCG@k for a test set with one held-out item per user.

    A user contributes ``1 / log2(rank + 1)`` (rank counted from 1) when the
    held-out item appears in the top-k recommendations, and 0 otherwise. With
    a single relevant item the ideal DCG is 1, so no further normalisation is
    applied.

    Args:
        test: Mapping ``user -> held-out item``.
        recs: Mapping ``user -> ranked list of recommended items``. Users
            missing from ``recs`` contribute 0.
        k: Number of leading recommendations to inspect.

    Returns:
        The average gain over all test users, or 0.0 when ``test`` is empty.
    """
    if not test:
        # Avoid ZeroDivisionError when there is nothing to evaluate.
        return 0.0
    total = 0.0
    for u, gt in test.items():
        rec_list = recs.get(u, [])[:k]
        if gt in rec_list:
            # index() is 0-based, hence +2 to obtain log2(rank + 1).
            total += 1 / math.log2(rec_list.index(gt) + 2)
    return total / len(test)


# ================================
# Execution code (run inside a Jupyter cell)
# ================================
rating_path = "review_5up_5aspect_3sentiment_vectorized_clean.json"
topn = 5  # length of each recommendation list and cutoff k of the metrics
min_ratings = 5  # users with fewer ratings than this are dropped
alpha = 0.8  # weight of the rating similarity; sentiment gets 1 - alpha

print(f"데이터 로드 중: {rating_path}")
ratings = load_rating_with_sentiment(rating_path)

# Keep only users that have at least `min_ratings` ratings.
user_counts = ratings.groupby("user").size()
valid_users = user_counts[user_counts >= min_ratings].index
ratings = ratings[ratings["user"].isin(valid_users)]

print(f"총 {len(ratings):,}개 평점, {ratings['user'].nunique():,}명 사용자, {ratings['biz'].nunique():,}개 아이템")

print("학습/테스트 데이터 분리 중...")
train, test = leave_one_out(ratings)
print(f"학습: {len(train):,}개, 테스트: {len(test):,}개")

print("맵 및 감성 벡터 평균 계산 중...")
item_users, user_items, item_sentiment_avg = build_maps(train)

print("하이브리드 유사도 계산 중...")
item_sims = precompute_hybrid_sims(item_users, item_sentiment_avg, alpha=alpha)

print(f"상위 {topn}개 아이템 추천 중...")
recommendations = {}
for user in tqdm(test.keys()):
    # Skip test users that have no training ratings to recommend from.
    if user_items.get(user):
        recommendations[user] = recommend(user, user_items, item_sims, topn)

print("성능 평가 중...")
# Evaluate at the same cutoff the lists were generated with, instead of a
# hard-coded 5 that would silently diverge if `topn` is changed.
p5 = precision_at_k(test, recommendations, topn)
r5 = recall_at_k(test, recommendations, topn)
n5 = ndcg_at_k(test, recommendations, topn)

print(f"Precision@{topn}: {p5:.4f}")
print(f"Recall@{topn}: {r5:.4f}")
print(f"NDCG@{topn}: {n5:.4f}")


데이터 로드 중: review_5up_5aspect_3sentiment_vectorized_clean.json
총 451,185개 평점, 28,465명 사용자, 6,832개 아이템
학습/테스트 데이터 분리 중...
학습: 422,720개, 테스트: 28,465개
맵 및 감성 벡터 평균 계산 중...


KeyboardInterrupt: 