BeMF, Bernoulli Matrix Factorization

What does it add?
  - It splits the problem into five binary classifiers (rating=1? … rating=5?).  
  - It applies a sigmoid to obtain a **probability** for each score, which makes the predictions interpretable.  
  - It combines the discrete (integer ratings) with the continuous (probabilities).  
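
To make the first point concrete, the sketch below shows how a single observed rating becomes five binary targets, one per classifier. The helper name `binary_targets` is hypothetical and is not part of the notebook.

```python
SCORES = [1, 2, 3, 4, 5]

def binary_targets(rating, scores=SCORES):
    """Return one 0/1 target per score: 1 where the score equals the rating."""
    return [1 if rating == s else 0 for s in scores]

targets = binary_targets(4)
print(targets)  # [0, 0, 0, 1, 0]
```

Each classifier therefore sees every observed rating, either as a positive example (its own score) or as a negative one (any other score).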

In [1]:
import os, numpy as np, pandas as pd
from sklearn.metrics import mean_squared_error, mean_absolute_error, precision_score, recall_score, f1_score


In [2]:
train_df = pd.read_csv('./data/processed/train.csv')
test_df = pd.read_csv('./data/processed/test.csv')

### Model initialization

We declare the hyperparameters (number of latent factors, step size for gradient descent, and L2 penalty) and the dimensions of the rating matrix (num_users x num_items).

The `ratings` matrix starts out filled with None; each observed entry is then replaced by its real rating.

In [3]:
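import random

# Hyperparameters
NUM_FACTORS    = 7
LEARNING_RATE  = 0.001
REGULARIZATION = 0.1

NUM_USERS = int(max(train_df.user_id.max(), test_df.user_id.max()))
NUM_ITEMS = int(max(train_df.book_id.max(), test_df.book_id.max()))

ratings = [[None for _ in range(NUM_ITEMS)] for _ in range(NUM_USERS)]

# Fill in the observed training ratings (ids are 1-based, indices 0-based).
# Assumption: train_df has the same 'rating' column that test_df uses below.
for row in train_df.itertuples(index=False):
    ratings[int(row.user_id) - 1][int(row.book_id) - 1] = row.rating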

#### Generating U and V as uniform arrays in [0,1)

We create five pairs of latent matrices `U[s]` and `V[s]`, one for each rating in `SCORES`, randomly initialized with uniform values in \([0,1)\).  

In [4]:
SCORES = [1, 2, 3, 4, 5]

In [5]:
U = { 
    s: np.random.rand(NUM_USERS, NUM_FACTORS) 
    for s in SCORES 
}

V = { 
    s: np.random.rand(NUM_ITEMS, NUM_FACTORS) 
    for s in SCORES 
}

#### Computing predictions

The function `logit` applies the logistic sigmoid to the dot product of the latent factors to obtain a probability (despite its name, it computes the sigmoid, which is the inverse of the logit). `compute_prediction` loops over each score 1–5 and returns the most probable rating together with its probability.  


In [6]:
import math

def logit(x):
    return 1.0 / (1.0 + math.exp(-x))

def compute_prediction(u, i):
    """
    Return (predicted_score, probability) for user u and item i.
    u and i are 0-based indices.
    """
    best_s, best_p = None, 0.0
    for s in SCORES:
        # Dot product U^s_u · V^s_i
        dot = np.dot(U[s][u], V[s][i])
        p   = logit(dot)
        if p > best_p:
            best_p = p
            best_s = s
    return best_s, best_p
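
Because the five sigmoids are trained independently, their outputs do not necessarily sum to 1. One possible way to turn them into a distribution over scores is to divide each output by their sum, as in the minimal sketch below. The helper `score_distribution`, the dictionary layout, and the toy factor values are assumptions for illustration, not part of the notebook. The most probable score is unchanged by this rescaling; only the reported probability differs.

```python
import numpy as np

def sigmoid(x):
    """Logistic sigmoid."""
    return 1.0 / (1.0 + np.exp(-x))

def score_distribution(user_factors, item_factors):
    """Map dicts score -> 1-D factor vector to dict score -> normalized probability."""
    raw = {s: sigmoid(np.dot(user_factors[s], item_factors[s])) for s in user_factors}
    total = sum(raw.values())
    return {s: p / total for s, p in raw.items()}

# Toy factors for a single (user, item) pair, three latent factors per score
rng = np.random.default_rng(0)
toy_u = {s: rng.random(3) for s in range(1, 6)}
toy_v = {s: rng.random(3) for s in range(1, 6)}

dist = score_distribution(toy_u, toy_v)
best = max(dist, key=dist.get)   # most probable score
print(best, round(dist[best], 3))
```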

#### Learning the latent factors
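
The updates in the training loop of this section are consistent with stochastic gradient descent on an L2-regularized binary cross-entropy loss. As a sketch, for one score \(s\) and one observed rating \(r_{ui}\), write \(p = \sigma(P_u \cdot Q_i)\) and \(y = 1\) if \(r_{ui} = s\), otherwise \(y = 0\). The symbols \(\lambda\) (for `REGULARIZATION`) and \(\eta\) (for `LEARNING_RATE`) are introduced here for illustration only.

\[
\ell = -\bigl[\, y \log p + (1 - y) \log (1 - p) \,\bigr] + \frac{\lambda}{2} \bigl( \lVert P_u \rVert^2 + \lVert Q_i \rVert^2 \bigr)
\]

\[
\frac{\partial \ell}{\partial P_u} = (p - y)\, Q_i + \lambda P_u,
\qquad
\frac{\partial \ell}{\partial Q_i} = (p - y)\, P_u + \lambda Q_i
\]

\[
P_u \leftarrow P_u - \eta\, \frac{\partial \ell}{\partial P_u},
\qquad
Q_i \leftarrow Q_i - \eta\, \frac{\partial \ell}{\partial Q_i}
\]

Here \(p - y\) corresponds to the variable `e` in the code, and the two gradients to `grad_p` and `grad_q`.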

In [7]:
NUM_ITERATIONS = 10

for it in range(NUM_ITERATIONS):
    print(f"Iteration {it+1} of {NUM_ITERATIONS}")
    # For each score class
    for s in SCORES:
        P = U[s]
        Q = V[s]

        # Update the user factors
        for u in range(NUM_USERS):
            for i in range(NUM_ITEMS):
                r_ui = ratings[u][i]
                if r_ui is None:
                    continue

                # Predicted probability that rating == s
                z = np.dot(P[u], Q[i])
                p = logit(z)
                y = 1.0 if r_ui == s else 0.0
                e = p - y

                # Gradients and update
                for f in range(NUM_FACTORS):
                    grad_p = e * Q[i][f] + REGULARIZATION * P[u][f]
                    P[u][f] -= LEARNING_RATE * grad_p

        # Update the item factors
        for i in range(NUM_ITEMS):
            for u in range(NUM_USERS):
                r_ui = ratings[u][i]
                if r_ui is None:
                    continue

                z = np.dot(P[u], Q[i])
                p = logit(z)
                y = 1.0 if r_ui == s else 0.0
                e = p - y

                for f in range(NUM_FACTORS):
                    grad_q = e * P[u][f] + REGULARIZATION * Q[i][f]
                    Q[i][f] -= LEARNING_RATE * grad_q

        # Store the factors back (P and Q were updated in place)
        U[s], V[s] = P, Q

Iteration 1 of 10
Iteration 2 of 10
Iteration 3 of 10
Iteration 4 of 10
Iteration 5 of 10
Iteration 6 of 10
Iteration 7 of 10
Iteration 8 of 10
Iteration 9 of 10
Iteration 10 of 10
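
The loops above visit every (user, item) cell and skip the empty ones, so each pass costs time proportional to `NUM_USERS * NUM_ITEMS` per score even when few ratings are observed. A possible alternative, sketched here on toy data, iterates only over the observed ratings and updates the user and item factors in the same pass. This changes the update order relative to the two-pass loop, so the resulting factors would not be identical. The function `sgd_epoch` and the toy triples are hypothetical.

```python
import numpy as np

def sigmoid(x):
    """Logistic sigmoid."""
    return 1.0 / (1.0 + np.exp(-x))

def sgd_epoch(triples, P, Q, score, lr=0.001, reg=0.1):
    """One pass over observed (user, item, rating) triples for one score class.

    P and Q are updated in place.
    """
    for u, i, r in triples:
        y = 1.0 if r == score else 0.0
        e = sigmoid(P[u] @ Q[i]) - y
        # Compute both gradients before touching either factor vector
        grad_p = e * Q[i] + reg * P[u]
        grad_q = e * P[u] + reg * Q[i]
        P[u] -= lr * grad_p
        Q[i] -= lr * grad_q

# Toy problem: 3 users, 2 items, 4 latent factors, 0-based indices
rng = np.random.default_rng(0)
P = rng.random((3, 4))
Q = rng.random((2, 4))
toy_triples = [(0, 0, 5), (1, 1, 3), (2, 0, 5)]

# User 1 rated item 1 with a 3, so the "rating == 5" probability should fall
before = sigmoid(P[1] @ Q[1])
for _ in range(200):
    sgd_epoch(toy_triples, P, Q, score=5, lr=0.05)
after = sigmoid(P[1] @ Q[1])
print(before, after)
```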


#### Computing recommendations

`get_recommendations` ranks all items by predicted score, filters out those the user has already rated, and returns the top-N unseen items.  


In [2]:
def get_recommendations(user_id, pred_matrix, train_df, N=5):
    u = user_id - 1
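    # Items the user has already rated
    seen = set(train_df.loc[train_df.user_id == user_id, 'book_id'])
    # Predicted scores for this user; NaN marks items without a prediction,
    # so map them to -inf to keep them at the bottom of the ranking
    scores = np.where(np.isnan(pred_matrix[u]), -np.inf, pred_matrix[u])
    # Item indices sorted by descending score
    ranked = np.argsort(scores)[::-1]
    # Collect the first N items the user has not seen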
    recs = []
    for idx in ranked:
        book_id = idx + 1
        if book_id not in seen:
            recs.append(book_id)
            if len(recs) >= N:
                break
    return recs
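
One detail worth checking when `pred_matrix` contains NaN for items without a prediction: `np.argsort` places NaN values last, so reversing the result moves them to the front of a descending ranking. The toy array below illustrates this, together with one possible workaround (mapping NaN to -inf before sorting). The values are made up.

```python
import numpy as np

scores = np.array([3.0, np.nan, 5.0, np.nan, 4.0])

# Naive descending ranking: the NaN entries (indices 1 and 3) come first
naive = np.argsort(scores)[::-1]

# Workaround: treat missing predictions as the lowest possible score
safe_scores = np.where(np.isnan(scores), -np.inf, scores)
ranked = np.argsort(safe_scores)[::-1]

print(naive[:2])   # the two NaN indices, in some order
print(ranked[:3])  # [2 4 0]
```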

#### Predictions and metrics

In [None]:
from math import floor

predictions = [[None for _ in range(NUM_ITEMS)] for _ in range(NUM_USERS)]

# Only where we have a test rating
for _, row in test_df.iterrows():
    u = int(row.user_id) - 1
    i = int(row.book_id) - 1
    pred_s, _ = compute_prediction(u, i)
    predictions[u][i] = pred_s
    
y_true = []
y_pred = []
for _, row in test_df.iterrows():
    u, i, true_r = int(row.user_id)-1, int(row.book_id)-1, row.rating
    y_true.append(true_r)
    y_pred.append(predictions[u][i])

mae = mean_absolute_error(y_true, y_pred)
rmse = np.sqrt(mean_squared_error(y_true, y_pred))

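# Binarize: ratings at or above the threshold count as relevant.
# Assumption: the threshold is the floor of the mean test rating.
threshold = floor(np.mean(y_true))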
y_true_bin = (np.array(y_true) >= threshold).astype(int)
y_pred_bin = (np.array(y_pred) >= threshold).astype(int)

# Precision, recall and F1
precision = precision_score(y_true_bin, y_pred_bin, zero_division=0)
recall    = recall_score   (y_true_bin, y_pred_bin, zero_division=0)
f1        = f1_score       (y_true_bin, y_pred_bin, zero_division=0)

pred_matrix = np.full((NUM_USERS, NUM_ITEMS), np.nan)
for u in range(NUM_USERS):
    for i in range(NUM_ITEMS):
        if predictions[u][i] is not None:
            pred_matrix[u, i] = predictions[u][i]
            
K_NDCG = 10

def get_user_ndcg(u, pred_matrix, test_df, K=K_NDCG, rel_col='rating'):
    user_ratings = test_df[test_df.user_id == (u+1)]
    if user_ratings.empty:
        return None
    true_rels = {int(row.book_id)-1: row[rel_col] for _, row in user_ratings.iterrows()}
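    # Items without a prediction are NaN; send them to the bottom of the ranking
    scores = np.where(np.isnan(pred_matrix[u]), -np.inf, pred_matrix[u])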
    top_k = np.argsort(scores)[::-1][:K]
    dcg = sum((2**true_rels.get(item, 0) - 1) / np.log2(rank+1)
              for rank, item in enumerate(top_k, start=1))
    ideal_rels = sorted(true_rels.values(), reverse=True)[:K]
    idcg = sum((2**rel - 1) / np.log2(idx+1)
               for idx, rel in enumerate(ideal_rels, start=1))
    return (dcg / idcg) if idcg > 0 else 0.0

def get_ndcg(pred_matrix, test_df, K=K_NDCG, rel_col='rating'):
    total, count = 0.0, 0
    for u in range(pred_matrix.shape[0]):
        ndcg_u = get_user_ndcg(u, pred_matrix, test_df, K, rel_col)
        if ndcg_u is not None:
            total += ndcg_u
            count += 1
    return total/count if count else 0.0

# K=10
ndcg_10 = get_ndcg(pred_matrix, test_df)
print(f"nDCG@{K_NDCG} = {ndcg_10:.4f}")
print("BeMF (Bernoulli MF) Metrics:")
print(f"  MAE       = {mae:.4f}")
print(f"  RMSE      = {rmse:.4f}")
print(f"  Precision = {precision:.4f}")
print(f"  Recall    = {recall:.4f}")
print(f"  F1        = {f1:.4f}")
print(f"  nDCG@10    = {ndcg_10:.4f}")
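
As a small worked example of the metric computed by `get_user_ndcg`, the sketch below evaluates the same formula, with gain \(2^{rel} - 1\) and discount \(\log_2(rank + 1)\), on a made-up list of true ratings ordered by predicted score. The helper names `dcg` and `ndcg` are hypothetical.

```python
import numpy as np

def dcg(rels):
    """Discounted cumulative gain for relevances listed in ranked order."""
    return sum((2**rel - 1) / np.log2(rank + 1)
               for rank, rel in enumerate(rels, start=1))

def ndcg(rels_in_predicted_order):
    """DCG of the predicted order divided by the DCG of the ideal order."""
    ideal = sorted(rels_in_predicted_order, reverse=True)
    idcg = dcg(ideal)
    return dcg(rels_in_predicted_order) / idcg if idcg > 0 else 0.0

toy_ranking = [3, 5, 4]          # true ratings, in the order the model ranked them
print(round(ndcg(toy_ranking), 4))
print(ndcg([5, 4, 3]))           # 1.0, since the order is already ideal
```

Because the gain is exponential in the rating, placing a 5-star item below a 3-star item is penalized much more heavily than swapping a 4 and a 3.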