# Sample Code - MF

In [3]:
import numpy as np

def matrix_factorization(R, K, steps=1000, alpha=0.002, beta=0.02):
    """
    R: 사용자×아이템 평점 행렬 (numpy array)
    K: 잠재 요인(latent factors) 개수
    steps: 학습 반복 횟수
    alpha: 학습률
    beta: 정규화 파라미터
    """
    m, n = R.shape
    # 사용자 및 아이템 잠재 행렬 초기화
    P = np.random.rand(m, K)
    Q = np.random.rand(n, K)

    # SGD를 통한 최적화
    for step in range(steps):
        for i in range(m):
            for j in range(n):
                if R[i, j] > 0:
                    # 예측 오차 계산
                    eij = R[i, j] - np.dot(P[i, :], Q[j, :])
                    # P, Q 업데이트
                    for k in range(K):
                        P[i, k] += alpha * (2 * eij * Q[j, k] - beta * P[i, k])
                        Q[j, k] += alpha * (2 * eij * P[i, k] - beta * Q[j, k])
        # 오차 계산 및 수렴 체크
        error = 0
        for i in range(m):
            for j in range(n):
                if R[i, j] > 0:
                    error += (R[i, j] - np.dot(P[i, :], Q[j, :]))**2
                    for k in range(K):
                        error += (beta / 2) * (P[i, k]**2 + Q[j, k]**2)
        if error < 0.001:
            break

    return P, Q

# 예시 사용
if __name__ == "__main__":
    # 희소 평점 행렬 (0: 평점 없음)
    R = np.array([
        [5, 3, 0, 1],
        [4, 0, 0, 1],
        [1, 1, 0, 5],
        [1, 0, 0, 4],
        [0, 1, 5, 4],
    ])
    K = 2  # 잠재 요인 수
    P, Q = matrix_factorization(R, K)

    # 예측된 평점 행렬
    reconstructed_R = np.dot(P, Q.T)
    print("원본 R:\n", R)
    print("예측된 R:\n", np.round(reconstructed_R, 2))

원본 R:
 [[5 3 0 1]
 [4 0 0 1]
 [1 1 0 5]
 [1 0 0 4]
 [0 1 5 4]]
예측된 R:
 [[4.98 2.97 5.45 1.  ]
 [3.98 2.38 4.53 1.  ]
 [1.04 0.91 5.41 4.97]
 [0.98 0.82 4.47 3.98]
 [1.48 1.11 4.95 4.01]]


# Sample Code - ALS

In [29]:
import numpy as np

def als_explicit(R, K, steps=10, reg=0.1):
    """
    Explicit feedback ALS
    R: 사용자×아이템 평점 행렬 (numpy array, 0은 missing)
    K: 잠재 요인(latent factors) 수
    steps: ALS 반복 횟수
    reg: 정규화 파라미터
    """
    m, n = R.shape
    # 사용자 및 아이템 잠재 행렬 초기화
    P = np.random.rand(m, K) # 사용자 행렬
    Q = np.random.rand(n, K) # 아이템 행렬

    for _ in range(steps):
        # 사용자 행렬 업데이트 (아이템 행렬을 고정하고 사용자 행렬 업데이트)
        QtQ = Q.T @ Q + reg * np.eye(K)
        for i in range(m):
            P[i] = np.linalg.solve(QtQ, Q.T @ R[i]) # QtQ @ P[i] = Q.T @ R[i] 를 풀어서 P[i]값을 알려줌.
        # 아이템 행렬 업데이트 (사용자 행렬을 고정하고 아이템 행렬 업데이트)
        PtP = P.T @ P + reg * np.eye(K)
        for j in range(n):
            Q[j] = np.linalg.solve(PtP, P.T @ R[:, j]) # PtP @ Q[j] = P.T @ R[:, j]

    return P, Q

# 예시 사용
if __name__ == "__main__":
    # explicit feedback 예시
    print("Explicit matrix R : ")
    print(R)
    R = np.array([
        [5, 3, 0, 1],
        [4, 0, 0, 1],
        [1, 1, 0, 5],
        [1, 0, 0, 4],
        [0, 1, 5, 4],
    ], dtype=float)
    print("Explicit ALS로 예측된 R:")
    P, Q = als_explicit(R, K=10, steps=10, reg=0.1)
    print(np.round(P @ Q.T, 2), "\n")

Explicit matrix R : 
[[5. 3. 0. 1.]
 [4. 0. 0. 1.]
 [1. 1. 0. 5.]
 [1. 0. 0. 4.]
 [0. 1. 5. 4.]]
Explicit ALS로 예측된 R:
[[ 4.96  2.93 -0.03  0.98]
 [ 3.94  0.05 -0.03  0.98]
 [ 1.01  0.98  0.02  4.96]
 [ 0.99  0.02  0.01  3.97]
 [-0.04  0.97  4.91  3.95]] 



MF와 다르게 0을 관측되지 않은 값이라고 여기고 추론의 대상으로 여기지 않음.   
0점이라고 여기고 포함되서 계산되고 있음

In [37]:
import numpy as np

def als_masked(R, K, steps=10, reg=0.1):
    """
    Masked ALS for explicit feedback
    R: 사용자×아이템 평점 행렬 (numpy array, 0은 missing)
    K: 잠재 요인(latent factors) 수
    steps: ALS 반복 횟수
    reg: 정규화 파라미터
    """
    m, n = R.shape
    # 사용자 및 아이템 잠재 행렬 초기화
    P = np.random.rand(m, K)
    Q = np.random.rand(n, K)
    mask = (R > 0)  # 관측된 평점만 True

    for _ in range(steps):
        # 사용자 행렬 업데이트 (아이템 행렬을 고정)
        for i in range(m):
            idx = mask[i]               # 사용자 i가 관측한 아이템 인덱스
            Q_i = Q[idx]                # 관측된 아이템 벡터 (p_i x K)
            R_i = R[i, idx]             # 관측된 평점 벡터 (p_i)
            A = Q_i.T @ Q_i + reg * np.eye(K)
            b = Q_i.T @ R_i
            P[i] = np.linalg.solve(A, b)

        # 아이템 행렬 업데이트 (사용자 행렬을 고정)
        for j in range(n):
            idx = mask[:, j]            # 아이템 j를 관측한 사용자 인덱스
            P_j = P[idx]                # 관측된 사용자 벡터 (q_j x K)
            R_j = R[idx, j]             # 관측된 평점 벡터 (q_j)
            A = P_j.T @ P_j + reg * np.eye(K)
            b = P_j.T @ R_j
            Q[j] = np.linalg.solve(A, b)

    return P, Q

# 예시 사용
if __name__ == "__main__":
    R = np.array([
        [5, 3, 0, 1],
        [4, 0, 0, 1],
        [1, 1, 0, 5],
        [1, 0, 0, 4],
        [0, 1, 5, 4],
    ], dtype=float)
    print("rating matrix : ")
    print(R)
    P_masked, Q_masked = als_masked(R, K=2, steps=20, reg=0.1)
    reconstructed = P_masked @ Q_masked.T
    print("Masked ALS로 예측된 R:\n", np.round(reconstructed, 2))

rating matrix : 
[[5. 3. 0. 1.]
 [4. 0. 0. 1.]
 [1. 1. 0. 5.]
 [1. 0. 0. 4.]
 [0. 1. 5. 4.]]
Masked ALS로 예측된 R:
 [[4.98 2.96 2.24 1.01]
 [3.94 2.36 2.   1.  ]
 [1.02 0.98 5.79 4.91]
 [0.99 0.89 4.67 3.93]
 [1.22 1.03 4.92 4.11]]


In [10]:
import numpy as np

def als_implicit(Cui, K, alpha=40, reg=0.1, steps=10):
    """
    Implicit feedback ALS (Hu, Koren, Volinsky)
    Cui: 사용자×아이템 신호 행렬 (numpy array, non-negative)
    K: 잠재 요인 수
    alpha: confidence scaling 파라미터
    reg: 정규화 파라미터
    steps: ALS 반복 횟수
    """
    m, n = Cui.shape
    # 선호(preference) 및 신뢰도(confidence)
    Pui = (Cui > 0).astype(float)
    C = 1 + alpha * Cui

    # 사용자 및 아이템 잠재 행렬 초기화
    X = np.random.rand(m, K)
    Y = np.random.rand(n, K)

    for _ in range(steps):
        # 사용자 행렬 업데이트
        for i in range(m):
            Cu = C[i]           # 사용자 i의 confidence
            Pu = Pui[i]         # 사용자 i의 preference
            A = Y.T @ (Cu[:, None] * Y) + reg * np.eye(K)
            b = Y.T @ (Cu * Pu)
            X[i] = np.linalg.solve(A, b)
        # 아이템 행렬 업데이트
        for j in range(n):
            Cj = C[:, j]        # 아이템 j의 confidence
            Pj = Pui[:, j]      # 아이템 j의 preference
            A = X.T @ (Cj[:, None] * X) + reg * np.eye(K)
            b = X.T @ (Cj * Pj)
            Y[j] = np.linalg.solve(A, b)

    return X, Y

# 예시 사용
if __name__ == "__main__":
    # implicit feedback 예시 (동일 행렬을 신호로 가정)
    Cui = np.array([
        [5, 3, 0, 1],
        [4, 0, 0, 1],
        [1, 1, 0, 5],
        [1, 0, 0, 4],
        [0, 1, 5, 4],
    ], dtype=float)
    print("rating matrix: ")
    print(Cui)
    print("Implicit ALS로 예측된 선호 점수:")
    X, Y = als_implicit(Cui, K=2, alpha=40, reg=0.1, steps=10)
    print(np.round(X @ Y.T, 2))


rating matrix: 
[[5. 3. 0. 1.]
 [4. 0. 0. 1.]
 [1. 1. 0. 5.]
 [1. 0. 0. 4.]
 [0. 1. 5. 4.]]
Implicit ALS로 예측된 선호 점수:
[[1.   0.99 0.12 1.02]
 [1.   0.94 0.06 0.97]
 [1.   0.98 0.1  1.01]
 [1.   0.96 0.08 0.99]
 [0.28 1.   1.   1.  ]]


# 데이터셋에 적용

In [9]:
import sys
import os

# 현재 파일이 있는 디렉토리 기준으로 상위 폴더 경로 추가
parent_dir = os.path.abspath(os.path.join(os.getcwd(), ".."))  # 한 단계 상위 폴더
sys.path.append(parent_dir)

print(f"추가된 경로: {parent_dir}")

추가된 경로: /Users/visuworks/Desktop/쩝쩝LAB/yamyam-lab


In [2]:
from src.tools.google_drive import ensure_data_files
import pandas as pd

# Ensure required data files are available
data_paths = ensure_data_files()

# Load data into Pandas DataFrames
print(data_paths)
review = pd.read_csv(data_paths["review"], index_col=0)

기존 data가 존재합니다. 파일 경로를 반환합니다.
{'reviewer': '/Users/visuworks/Desktop/쩝쩝LAB/yamyam-lab/data/reviewer.csv', 'category': '/Users/visuworks/Desktop/쩝쩝LAB/yamyam-lab/data/diner_category_raw.csv', 'review': '/Users/visuworks/Desktop/쩝쩝LAB/yamyam-lab/data/review.csv', 'diner': '/Users/visuworks/Desktop/쩝쩝LAB/yamyam-lab/data/diner.csv'}


In [3]:
print(review.isna().mean())
review.info()

diner_idx                0.000000
reviewer_id              0.000000
review_id                0.000000
reviewer_review          0.158498
reviewer_review_date     0.000000
reviewer_review_score    0.000000
dtype: float64
<class 'pandas.core.frame.DataFrame'>
Index: 2287474 entries, 0 to 2292954
Data columns (total 6 columns):
 #   Column                 Dtype  
---  ------                 -----  
 0   diner_idx              float64
 1   reviewer_id            int64  
 2   review_id              int64  
 3   reviewer_review        object 
 4   reviewer_review_date   object 
 5   reviewer_review_score  float64
dtypes: float64(2), int64(2), object(2)
memory usage: 122.2+ MB


In [8]:
import pandas as pd
from scipy.sparse import coo_matrix, csr_matrix

# 1) 사용할 칼럼만 뽑고 NaN 평점(drop) 처리
df2 = review[["reviewer_id", "diner_idx", "reviewer_review_score"]].dropna()

# 2) ID를 연속 정수 인덱스로 매핑
#    .astype("category")를 쓰면 고유값 순서대로 0,1,2,... 자동 매핑
user_cat = df2["reviewer_id"].astype("category")
item_cat = df2["diner_idx"].astype("category")

rows = user_cat.cat.codes           # 사용자 인덱스 (0 ~ num_users-1)
cols = item_cat.cat.codes           # 아이템 인덱스 (0 ~ num_items-1)
data = df2["reviewer_review_score"].values  # 평점 데이터

# 3) COO 포맷으로 희소행렬 생성 → CSR 포맷으로 변환
R_coo = coo_matrix(
    (data, (rows, cols)),
    shape=(len(user_cat.cat.categories), len(item_cat.cat.categories))
)
R_csr = R_coo.tocsr()

print("희소 행렬 크기:", R_csr.shape)
print("비제로 개수:", R_csr.nnz)

희소 행렬 크기: (668134, 163179)
비제로 개수: 2287452


In [1]:
# rating_matrix = review.pivot_table(
#     index="reviewer_id",
#     columns="diner_idx",
#     values="reviewer_review_score",
#     fill_value=0        # 미관측(원래 NaN)인 부분은 0으로 채움
# )

# # 3) numpy array로 변환이 필요하다면
# R = rating_matrix.values

# print("rating matrix shape:", R.shape)

## 직접 구현

In [None]:
import numpy as np
from scipy.sparse import coo_matrix
import pandas as pd

def als_implicit_sparse(Cui_csr, K, alpha=40, reg=0.1, steps=10):
    m, n = Cui_csr.shape
    X = np.random.rand(m, K)
    Y = np.random.rand(n, K)
    for _ in range(steps):
        # User updates
        for i in range(m):
            row = Cui_csr.getrow(i)
            idx, vals = row.indices, row.data
            C_u = 1 + alpha * vals
            # build A,b from only observed entries
            Q_i = Y[idx]
            A = Q_i.T @ (C_u[:, None] * Q_i) + reg * np.eye(K)
            b = Q_i.T @ C_u
            X[i] = np.linalg.solve(A, b)
        # Item updates (동일 패턴)
        for j in range(n):
            col = Cui_csr.getcol(j)
            idx, vals = col.indices, col.data
            C_j = 1 + alpha * vals
            P_j = X[idx]
            A = P_j.T @ (C_j[:, None] * P_j) + reg * np.eye(K)
            b = P_j.T @ C_j
            Y[j] = np.linalg.solve(A, b)
    return X, Y

# 사용 예시
df2 = review[["reviewer_id","diner_idx","reviewer_review_score"]].dropna()
user_cat = df2["reviewer_id"].astype("category")
item_cat = df2["diner_idx"].astype("category")
rows, cols = user_cat.cat.codes, item_cat.cat.codes
data = df2["reviewer_review_score"].values
Cui_csr = coo_matrix((data,(rows,cols))).tocsr()

X, Y = als_implicit_sparse(Cui_csr, K=10, alpha=40, reg=0.1, steps=5)

병렬처리를 하지 않으면 아무리 효율적으로 짜더라도 시간이 오래걸림

## 라이브러리 사용

In [None]:
import scipy.sparse as sp
from implicit.als import AlternatingLeastSquares
from scipy.sparse import coo_matrix, csr_matrix

df2 = review[["reviewer_id","diner_idx","reviewer_review_score"]].dropna()
user_cat = df2["reviewer_id"].astype("category")
item_cat = df2["diner_idx"].astype("category")
rows, cols = user_cat.cat.codes, item_cat.cat.codes
data = df2["reviewer_review_score"].values
Cui_csr = coo_matrix((data,(rows,cols))).tocsr()

alpha = 40
Cui_csr_conf = Cui_csr * alpha
Cui_csr_conf.data += 1.0  # now data = 1 + alpha * raw_count

model = AlternatingLeastSquares(
    factors=20,            # 잠재 요인 수 K
    regularization=0.1,    # reg
    iterations=15,         # steps
    use_gpu=False,         # GPU 없으면 False
    calculate_training_loss=True
)

# 이 메서드가 내부적으로 Cython/​OpenMP로 병렬 연산
model.fit(Cui_csr_conf)

# user_factors: (num_users, K), item_factors: (num_items, K)
user_factors = model.user_factors
item_factors = model.item_factors

100%|██████████| 15/15 [00:09<00:00,  1.63it/s, loss=0.00229]


In [8]:
import numpy as np

def predict_user_scores(user_index, user_factors, item_factors):
    """
    한 사용자(user_index)에 대한 모든 아이템의 예측 선호도 점수 계산
    """
    u = user_factors[user_index]         # shape: (K,)
    scores = u.dot(item_factors.T)       # shape: (num_items,)
    return scores

# -------------------------------------------------------------------
# 3) Top-N 추천 함수
# -------------------------------------------------------------------
def recommend_for_user(user_id, 
                       user_cat, item_cat, 
                       user_factors, item_factors, 
                       known_interactions,  # dict: user_index -> set(item_index)
                       N=10):
    """
    user_id: 실제 ID 값
    known_interactions: {user_idx: {item_idx, ...}, ...}
    """
    # 3.1 internal index
    try:
        user_index = user_cat.cat.categories.get_loc(user_id)
    except KeyError:
        raise ValueError(f"Unknown user_id: {user_id}")

    # 3.2 예측 점수
    scores = predict_user_scores(user_index, user_factors, item_factors)

    # 3.3 이미 본(평점 준) 아이템 필터링
    seen = known_interactions.get(user_index, set())
    scores[ list(seen) ] = -np.inf

    # 3.4 Top-N 인덱스 추출
    top_n_idx = np.argpartition(-scores, N)[:N]
    top_n_idx = top_n_idx[np.argsort(-scores[top_n_idx])]

    # 3.5 원래 ID로 매핑
    top_n_item_ids = item_cat.cat.categories[top_n_idx]

    return list(top_n_item_ids)

# -------------------------------------------------------------------
# 4) 예시 사용
# -------------------------------------------------------------------
# 4.1 known_interactions 사전 생성 (csr 행렬 이용)
#    Cui_csr: scipy.sparse.csr_matrix of shape (num_users, num_items)
known_interactions = {
    u: set(Cui_csr[u].indices)
    for u in range(Cui_csr.shape[0])
}

# 4.2 실제 추천 수행
user_id = 256114348  # 예시 사용자 ID
top10 = recommend_for_user(
    user_id, user_cat, item_cat,
    model.user_factors, model.item_factors,
    known_interactions, N=10
)
print(f"추천 Top-10 for user {user_id}:\n", top10)

추천 Top-10 for user 256114348:
 [1101448385.0, 21555096.0, 112760187.0, 377517929.0, 11027024.0, 1497730059.0, 1652929268.0, 1062153333.0, 2080551677.0, 1562029341.0]


# 구성한 파이프라인 사용

In [1]:
import sys
import os

# 현재 파일이 있는 디렉토리 기준으로 상위 폴더 경로 추가
parent_dir = os.path.abspath(os.path.join(os.getcwd(), ".."))  # 한 단계 상위 폴더
sys.path.append(parent_dir)

print(f"추가된 경로: {parent_dir}")

from src.tools.google_drive import ensure_data_files
import pandas as pd

# Ensure required data files are available
data_paths = ensure_data_files()

# Load data into Pandas DataFrames
print(data_paths)
review = pd.read_csv(data_paths["review"], index_col=0)

추가된 경로: /Users/visuworks/Desktop/쩝쩝LAB/yamyam-lab
기존 data가 존재합니다. 파일 경로를 반환합니다.
{'reviewer': '/Users/visuworks/Desktop/쩝쩝LAB/yamyam-lab/data/reviewer.csv', 'category': '/Users/visuworks/Desktop/쩝쩝LAB/yamyam-lab/data/diner_category_raw.csv', 'review': '/Users/visuworks/Desktop/쩝쩝LAB/yamyam-lab/data/review.csv', 'diner': '/Users/visuworks/Desktop/쩝쩝LAB/yamyam-lab/data/diner.csv'}


In [3]:
from src.model import ALSPipeline

als_pipeline = ALSPipeline()

In [4]:
als_pipeline.fit(review_df=review)

100%|██████████| 15/15 [00:09<00:00,  1.54it/s, loss=0.00229]


In [5]:
als_pipeline.recommend_for_user(user_id=256114348, N=10)

[8025375.0,
 11231325.0,
 25048467.0,
 26871883.0,
 13093208.0,
 47908415.0,
 11064200.0,
 1062153333.0,
 20557155.0,
 236724227.0]