# A버전 LightGCN (implicit)
- rating 무시, 관측된 user-item을 positive edge로 간주
- 유저별 8/1/1 스플릿, popularity-aware 네거티브 샘플링
- 손실: BPR 기본, 필요 시 BCE로 교체 가능
- 평가지표: Recall@K, NDCG@K, Hit@K


In [None]:
# 환경 준비
import sys
from pathlib import Path
sys.path.append('..')  # 상위 디렉토리 import 허용

import numpy as np
import pandas as pd
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader

from common import (
    load_interactions,
    encode_ids,
    split_userwise,
    build_user_pos_dict,
    item_popularity_weights,
    sample_negatives_popular,
)

print(torch.__version__)


## 설정

In [None]:
# Run configuration: file locations, split ratios, model/training hyper-parameters.
# Both paths are relative, so they resolve against the current working directory.
DATA_PATH = Path('../data/train.csv')
MODEL_DIR = Path('../codex_models')
MODEL_DIR.mkdir(parents=True, exist_ok=True)  # create the checkpoint directory if it is missing

SEED = 42
TRAIN_RATIO = 0.8
VAL_RATIO = 0.1  # NOTE(review): the remaining share presumably becomes the test split - confirm in split_userwise

EMBED_DIM = 64
N_LAYERS = 3  # LightGCN propagation depth
LR = 1e-3
BATCH_SIZE = 1024
EPOCHS = 10  # adjust as needed
POP_RATIO = 0.7  # probability of drawing a negative from the popularity distribution
N_NEG = 1  # number of negative samples per positive
K_EVAL = 10  # cutoff K for Recall@K and NDCG@K
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'

# Seeded NumPy generator; it is not referenced by any of the other visible cells.
rng = np.random.default_rng(SEED)


## 데이터 로드 및 인덱싱

In [None]:
# Load the raw interaction table from DATA_PATH and preview its first rows.
df_raw = load_interactions(DATA_PATH)
print(df_raw.head())

# Encode ids; the two returned mappings are used below for their sizes
# (number of users / items) and, at inference time, as raw-id -> index lookups.
# NOTE(review): presumably `df` carries 'user_idx' / 'item_idx' columns - confirm in common.encode_ids.
df, user2idx, item2idx = encode_ids(df_raw)
print(f"users={len(user2idx)}, items={len(item2idx)}, interactions={len(df)}")


## 유저별 스플릿

In [None]:
# Split the encoded interactions into train / validation / test frames.
# NOTE(review): the split is presumably performed per user with the leftover
# share going to test - confirm against common.split_userwise.
train_df, val_df, test_df = split_userwise(
    df, train_ratio=TRAIN_RATIO, val_ratio=VAL_RATIO, seed=SEED
)
# Report the number of rows in each split.
print(len(train_df), len(val_df), len(test_df))


## 유틸: 그래프/샘플러/지표

In [None]:
def make_normalized_adj(num_users: int, num_items: int, edges: pd.DataFrame):
    """Build the symmetrically normalized user-item adjacency matrix for LightGCN.

    Users occupy node ids ``0 .. num_users - 1`` and items are shifted to
    ``num_users .. num_users + num_items - 1``. The result is
    ``D^-1/2 (A + A^T) D^-1/2`` where ``A`` holds one unit entry per row of
    ``edges``.

    Args:
        num_users: Number of user nodes.
        num_items: Number of item nodes.
        edges: Frame with integer ``user_idx`` and ``item_idx`` columns, one
            row per observed interaction.

    Returns:
        A coalesced ``torch.sparse_coo_tensor`` of shape
        ``(num_users + num_items, num_users + num_items)`` with float32 values.
        Rows and columns of nodes without any edge are all zero.
    """
    import scipy.sparse as sp

    rows = edges['user_idx'].to_numpy()
    cols = edges['item_idx'].to_numpy() + num_users  # shift item ids past the user block
    data = np.ones(len(edges), dtype=np.float32)

    n_nodes = num_users + num_items
    mat = sp.coo_matrix((data, (rows, cols)), shape=(n_nodes, n_nodes))
    mat = mat + mat.T  # make the bipartite graph undirected

    deg = np.asarray(mat.sum(axis=1)).ravel()
    # Only invert positive degrees: isolated nodes keep a zero scale, and no
    # divide-by-zero RuntimeWarning is emitted for them.
    deg_inv_sqrt = np.zeros_like(deg)
    connected = deg > 0
    deg_inv_sqrt[connected] = np.power(deg[connected], -0.5)

    D_inv_sqrt = sp.diags(deg_inv_sqrt)
    norm_mat = (D_inv_sqrt @ mat @ D_inv_sqrt).tocoo()

    indices = torch.tensor(np.vstack((norm_mat.row, norm_mat.col)), dtype=torch.long)
    values = torch.tensor(norm_mat.data, dtype=torch.float32)
    shape = torch.Size(norm_mat.shape)
    return torch.sparse_coo_tensor(indices, values, shape).coalesce()


def recall_at_k(ranked_items: np.ndarray, ground_truth: set, k: int) -> float:
    """Return the share of relevant items found in the first ``k`` ranks.

    The hit count is divided by ``min(k, len(ground_truth))``, so a perfect
    top-k list scores 1.0 even when there are more than ``k`` relevant items.
    An empty ``ground_truth`` yields 0.0.
    """
    if len(ground_truth) == 0:
        return 0.0
    matched = [candidate for candidate in ranked_items[:k] if candidate in ground_truth]
    best_possible = min(k, len(ground_truth))
    return len(matched) / best_possible


def ndcg_at_k(ranked_items: np.ndarray, ground_truth: set, k: int) -> float:
    """Return binary-relevance NDCG over the first ``k`` ranked items.

    Each hit at 0-based rank ``r`` contributes ``1 / log2(r + 2)``. The ideal
    DCG places ``min(len(ground_truth), k)`` hits at the top ranks. When the
    ideal DCG is zero (no ground truth or ``k == 0``) the result is 0.0.
    """
    gained = 0.0
    # Start counting at 2 so the discount for position p is simply log2(p).
    for position, candidate in enumerate(ranked_items[:k], start=2):
        if candidate in ground_truth:
            gained += 1.0 / np.log2(position)

    n_ideal = min(len(ground_truth), k)
    best = sum(1.0 / np.log2(position) for position in range(2, n_ideal + 2))
    if best > 0:
        return gained / best
    return 0.0


## 데이터셋/샘플러 정의

In [None]:
class BPRDataset(Dataset):
    """Yield ``(user, positive_item, negative_item)`` triples for BPR training.

    Each row of ``df`` is one positive pair. Negatives are drawn by rejection
    sampling: with probability ``pop_ratio`` from the ``pop_prob`` distribution,
    otherwise uniformly over all items, skipping items listed for the user in
    ``user_pos``.

    Args:
        df: Frame with integer ``user_idx`` and ``item_idx`` columns.
        num_items: Size of the item vocabulary; negatives are drawn from
            ``0 .. num_items - 1``.
        user_pos: Mapping from user index to the collection of item indices
            that must not be sampled as negatives (assumed to contain indices
            inside the item range - confirm against the caller).
        pop_prob: Probability vector of length ``num_items`` handed to
            ``Generator.choice``.
        n_neg: Number of negatives drawn per positive. Only the first one is
            returned by ``__getitem__``.
        pop_ratio: Probability of using ``pop_prob`` for a draw.
        seed: Seed for the dataset's private NumPy generator.

    Raises:
        ValueError: If ``n_neg`` is smaller than 1.
    """

    def __init__(self, df: pd.DataFrame, num_items: int, user_pos: dict, pop_prob: np.ndarray,
                 n_neg: int = 1, pop_ratio: float = 0.7, seed: int = 42):
        if n_neg < 1:
            # __getitem__ returns the first sampled negative, so at least one is needed.
            raise ValueError(f"n_neg must be >= 1, got {n_neg}")
        self.users = df['user_idx'].to_numpy()
        self.pos_items = df['item_idx'].to_numpy()
        self.num_items = num_items
        self.user_pos = user_pos
        self.pop_prob = pop_prob
        self.n_neg = n_neg
        self.pop_ratio = pop_ratio
        self.rng = np.random.default_rng(seed)

    def __len__(self):
        return len(self.users)

    def __getitem__(self, idx):
        """Return ``(user, positive_item, negative_item)`` as Python ints.

        Raises:
            ValueError: If the user has interacted with every item, in which
                case rejection sampling could never terminate.
        """
        u = int(self.users[idx])
        i_pos = int(self.pos_items[idx])
        seen = self.user_pos.get(u, set())
        if len(seen) >= self.num_items:
            raise ValueError(
                f"user {u} has interacted with all {self.num_items} items; "
                "no negative item can be sampled"
            )

        negs = []
        while len(negs) < self.n_neg:
            if self.rng.random() < self.pop_ratio:
                i_neg = int(self.rng.choice(self.num_items, p=self.pop_prob))
            else:
                i_neg = int(self.rng.integers(0, self.num_items))
            if i_neg in seen:
                continue  # reject items the user already interacted with
            negs.append(i_neg)
        # The training loop consumes a single negative per positive, so only
        # the first draw is returned even when n_neg > 1.
        return u, i_pos, negs[0]


## LightGCN 모델

In [None]:
class LightGCN(nn.Module):
    """LightGCN encoder over a joint user+item node table.

    One embedding table holds users in rows ``0 .. num_users - 1`` followed by
    the items. ``adj`` is kept as a plain attribute (not a registered buffer),
    so it must already live on the device the model will run on.
    """

    def __init__(self, num_users: int, num_items: int, embed_dim: int, n_layers: int, adj: torch.Tensor):
        super().__init__()
        self.num_users, self.num_items = num_users, num_items
        self.n_layers = n_layers
        self.adj = adj
        total_nodes = num_users + num_items
        self.embedding = nn.Embedding(total_nodes, embed_dim)
        nn.init.normal_(self.embedding.weight, std=0.1)

    def forward(self):
        """Propagate embeddings and return ``(user_embeddings, item_embeddings)``.

        The output is the mean of the initial embeddings and the result of each
        of the ``n_layers`` multiplications by ``adj``.
        """
        current = self.embedding.weight
        per_layer = [current]
        for _layer in range(self.n_layers):
            current = torch.sparse.mm(self.adj, current)
            per_layer.append(current)
        fused = torch.stack(per_layer, dim=0).mean(dim=0)
        split_at = self.num_users
        return fused[:split_at], fused[split_at:]

    def predict(self, users):
        """Return dot-product scores of the given users against every item."""
        all_users, all_items = self.forward()
        return torch.matmul(all_users[users], all_items.t())


## 학습/평가 루프

In [None]:
def bpr_loss(u_emb, i_emb, j_emb, reg=1e-4):
    """Return the BPR ranking loss plus an L2 penalty on the batch embeddings.

    Args:
        u_emb: User embeddings, one row per training triple.
        i_emb: Positive-item embeddings, same shape as ``u_emb``.
        j_emb: Negative-item embeddings, same shape as ``u_emb``.
        reg: Weight of the L2 penalty.

    Returns:
        Scalar tensor: mean of ``-log sigmoid(pos_score - neg_score)`` plus
        ``reg`` times the summed squared norms divided by the batch size.
    """
    pos_scores = (u_emb * i_emb).sum(dim=1)
    neg_scores = (u_emb * j_emb).sum(dim=1)
    # logsigmoid stays accurate (and keeps a useful gradient) when the
    # negative heavily outscores the positive; log(sigmoid(x) + eps) saturates.
    loss = -nn.functional.logsigmoid(pos_scores - neg_scores).mean()
    sq_norm = u_emb.pow(2).sum() + i_emb.pow(2).sum() + j_emb.pow(2).sum()
    reg_loss = reg * sq_norm / u_emb.shape[0]
    return loss + reg_loss


def train_one_epoch(model, loader, optimizer, device):
    """Run one optimisation pass over ``loader`` and return the mean loss.

    Every batch is a ``(users, positive_items, negative_items)`` triple of
    index tensors. The full embedding propagation is recomputed per batch, and
    the returned value is the sample-weighted loss sum divided by
    ``len(loader.dataset)``.
    """
    model.train()
    running = 0.0
    for users, pos_items, neg_items in loader:
        users = users.to(device)
        pos_items = pos_items.to(device)
        neg_items = neg_items.to(device)

        all_user_emb, all_item_emb = model.forward()
        loss = bpr_loss(
            all_user_emb[users],
            all_item_emb[pos_items],
            all_item_emb[neg_items],
        )

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Weight by batch size so a shorter final batch is averaged correctly.
        batch_size = users.shape[0]
        running += loss.item() * batch_size
    return running / len(loader.dataset)


def evaluate(model, user_pos_train: dict, eval_df: pd.DataFrame, k: int = 10, device='cpu'):
    """Compute mean Recall@k and NDCG@k over the users present in ``eval_df``.

    For each user all items are scored by dot product, items listed for that
    user in ``user_pos_train`` are masked out, and the top-k remaining items
    are compared with the user's held-out items.

    Args:
        model: Model whose ``forward()`` returns ``(user_emb, item_emb)``.
        user_pos_train: Mapping from user index to the training items to
            exclude from the ranking.
        eval_df: Held-out interactions, grouped per user by
            ``build_user_pos_dict``.
        k: Ranking cutoff. Clamped to the number of items for the top-k call.
        device: Device on which scoring is performed.

    Returns:
        ``(mean_recall, mean_ndcg)`` as Python floats; ``(0.0, 0.0)`` when
        ``eval_df`` yields no users.
    """
    model.eval()
    user_pos_eval = build_user_pos_dict(eval_df)
    if not user_pos_eval:
        # Nothing to score; avoid averaging an empty list (NaN + warning).
        return 0.0, 0.0

    recalls = []
    ndcgs = []
    with torch.no_grad():
        # Propagate inside no_grad so no autograd graph is kept for evaluation.
        user_emb, item_emb = model.forward()
        user_emb = user_emb.to(device)
        item_emb = item_emb.to(device)
        top_n = min(k, item_emb.shape[0])  # torch.topk rejects k > number of items

        for u, gt_items in user_pos_eval.items():
            scores = user_emb[int(u)] @ item_emb.T
            seen = user_pos_train.get(u, set())
            if len(seen) > 0:
                seen_idx = torch.tensor(list(seen), dtype=torch.long, device=device)
                scores[seen_idx] = float('-inf')  # never recommend training items
            topk = torch.topk(scores, k=top_n).indices.cpu().numpy()
            recalls.append(recall_at_k(topk, gt_items, k))
            ndcgs.append(ndcg_at_k(topk, gt_items, k))
    return float(np.mean(recalls)), float(np.mean(ndcgs))


## 데이터 준비 및 모델 초기화

In [None]:
# Vocabulary sizes come from the id mappings built during encoding.
num_users = len(user2idx)
num_items = len(item2idx)
# Per-user training positives (used to reject negatives and to mask seen items at eval time).
user_pos_train = build_user_pos_dict(train_df)
# Passed to the dataset as the `p=` vector for popularity-based negative draws.
pop_prob = item_popularity_weights(train_df)

# Built from the training split only. It is moved to DEVICE here because
# LightGCN stores it as a plain attribute, which `model.to(...)` does not move.
adj = make_normalized_adj(num_users, num_items, train_df).to(DEVICE)

train_dataset = BPRDataset(
    train_df, num_items=num_items, user_pos=user_pos_train, pop_prob=pop_prob,
    n_neg=N_NEG, pop_ratio=POP_RATIO, seed=SEED,
)
# Shuffle every epoch and keep the final partial batch.
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, drop_last=False)

model = LightGCN(num_users, num_items, EMBED_DIM, N_LAYERS, adj).to(DEVICE)
optimizer = torch.optim.Adam(model.parameters(), lr=LR)


## 학습 실행 (필요 시 반복 횟수/하이퍼 수정)

In [None]:
# Train for EPOCHS passes, reporting validation metrics along the way.
for epoch in range(1, EPOCHS + 1):
    loss = train_one_epoch(model, train_loader, optimizer, DEVICE)
    # `epoch % 1 == 0` is always true, so validation runs after every epoch;
    # raise the modulus to evaluate less often.
    if epoch % 1 == 0:
        rec, ndcg = evaluate(model, user_pos_train, val_df, k=K_EVAL, device=DEVICE)
        print(f"Epoch {epoch} loss={loss:.4f} recall@{K_EVAL}={rec:.4f} ndcg@{K_EVAL}={ndcg:.4f}")


## 모델 저장

In [None]:
# Save the learned weights together with the constructor arguments needed to
# rebuild the model. The adjacency matrix is not part of the checkpoint
# (it is not in state_dict), so it must be rebuilt when the model is reloaded.
ckpt_path = MODEL_DIR / 'codexa1_lightgcn.pth'
torch.save({
    'model_state': model.state_dict(),
    'num_users': num_users,
    'num_items': num_items,
    'embed_dim': EMBED_DIM,
    'n_layers': N_LAYERS,
}, ckpt_path)
print('saved to', ckpt_path)


## 추론 유틸 (추천 여부 O/X)

In [None]:
def predict_ox(model, user_enc: dict, item_enc: dict, csv_path: Path, k: int = 10):
    """Label every (user, item) row of a CSV as recommended ('O') or not ('X').

    A row is labelled 'O' when the item is among the user's ``k`` highest
    scoring items. Items the user interacted with during training are not
    excluded from that ranking.

    Args:
        model: Model whose ``forward()`` returns ``(user_emb, item_emb)``.
        user_enc: Mapping from raw user id to user index.
        item_enc: Mapping from raw item id to item index.
        csv_path: CSV file with ``user`` and ``item`` columns (further columns
            are kept untouched).
        k: Ranking cutoff. Clamped to the number of items.

    Returns:
        The input frame with an added ``recommend`` column of 'O' / 'X'.

    Raises:
        ValueError: If any user or item in the CSV is missing from the mappings.
    """
    df_in = pd.read_csv(csv_path)
    users = df_in['user'].map(user_enc)
    items = df_in['item'].map(item_enc)
    if users.isnull().any() or items.isnull().any():
        raise ValueError('미등록 user/item 존재. 매핑 필요.')

    user_ids = users.astype('int64').tolist()
    item_ids = items.astype('int64').tolist()

    results = []
    with torch.no_grad():
        # Propagate inside no_grad so inference does not build an autograd graph.
        user_emb, item_emb = model.forward()
        user_emb = user_emb.to(DEVICE)
        item_emb = item_emb.to(DEVICE)
        top_n = min(k, item_emb.shape[0])  # torch.topk rejects k > number of items

        topk_by_user = {}  # each user's top-k set is computed once and reused
        for u_idx, i_idx in zip(user_ids, item_ids):
            if u_idx not in topk_by_user:
                all_scores = user_emb[u_idx] @ item_emb.T
                topk_by_user[u_idx] = set(torch.topk(all_scores, k=top_n).indices.tolist())
            results.append('O' if i_idx in topk_by_user[u_idx] else 'X')

    df_in['recommend'] = results
    total_o = df_in['recommend'].eq('O').sum()
    total = len(df_in)
    print(df_in)
    # Escaped newlines: raw line breaks inside a single-quoted f-string do not parse.
    print(f"====================\nTotal recommends = {total_o}/{total}\nNot recommend = {total - total_o}/{total}")
    return df_in


## 추론 예시 (sample1.csv)

In [None]:
# 학습 및 체크포인트 로드 후 실행 예시
# predict_ox(model, user2idx, item2idx, Path('../data/sample1.csv'), k=10)
