# Análisis y Diseño de Algoritmos: Heaps en Top-K Ranking

**Autor:** Pedro Shiguihara

**Fecha:** 2 de febrero de 2026

---

Este notebook contiene la implementación completa del sistema de recomendación
Top-K basado en MinHeap, su versión naive, y el benchmark comparativo.
Está diseñado para ejecutarse en **Google Colab** haciendo streaming del dataset
[McAuley-Lab/Amazon-Reviews-2023](https://huggingface.co/datasets/McAuley-Lab/Amazon-Reviews-2023)
directamente desde HuggingFace, sin necesidad de descargar los archivos a disco.

## 1. Instalación de dependencias

Se instala `requests` (para streaming HTTP) y `matplotlib` (para el scatterplot).
Ambas suelen estar disponibles en Colab por defecto, pero se asegura su presencia.

In [None]:
!pip install -q requests matplotlib

## 2. Imports

In [None]:
import json
import math
import time

import matplotlib.pyplot as plt
import requests

## 3. MinHeap — Implementación CLRS con indexación 0-based

Estructura de datos pura que almacena tuplas `(score, item_id)` y mantiene
el elemento con menor score en la raíz. Es la pieza central de `SistemaRec1`
para seleccionar los Top-K productos en O(n log K).

In [None]:
class MinHeap:
    """Min-heap que almacena tuplas (score, item_id)."""

    def __init__(self):
        self._data: list = []

    @staticmethod
    def parent(i: int) -> int:
        return (i - 1) // 2

    @staticmethod
    def left(i: int) -> int:
        return 2 * i + 1

    @staticmethod
    def right(i: int) -> int:
        return 2 * i + 2

    def min_heapify(self, i: int) -> None:
        """Corrige el sub-árbol con raíz en i (recursivo, CLRS)."""
        n = len(self._data)
        smallest = i
        l = self.left(i)
        r = self.right(i)
        if l < n and self._data[l] < self._data[smallest]:
            smallest = l
        if r < n and self._data[r] < self._data[smallest]:
            smallest = r
        if smallest != i:
            self._data[i], self._data[smallest] = self._data[smallest], self._data[i]
            self.min_heapify(smallest)

    def build_min_heap(self, array: list) -> None:
        """Construye el heap in-place a partir de array en O(n)."""
        self._data = list(array)
        for i in range(len(self._data) // 2 - 1, -1, -1):
            self.min_heapify(i)

    def heap_minimum(self):
        if not self._data:
            raise IndexError("heap_minimum en heap vacío")
        return self._data[0]

    def heap_extract_min(self):
        if not self._data:
            raise IndexError("heap_extract_min en heap vacío")
        minimum = self._data[0]
        self._data[0] = self._data[-1]
        self._data.pop()
        if self._data:
            self.min_heapify(0)
        return minimum

    def heap_decrease_key(self, i: int, key) -> None:
        if key > self._data[i]:
            raise ValueError("La nueva clave es mayor que la clave actual")
        self._data[i] = key
        while i > 0 and self._data[self.parent(i)] > self._data[i]:
            p = self.parent(i)
            self._data[i], self._data[p] = self._data[p], self._data[i]
            i = p

    def min_heap_insert(self, key) -> None:
        self._data.append((float("inf"), ""))
        self.heap_decrease_key(len(self._data) - 1, key)

    def __len__(self) -> int:
        return len(self._data)

    def __repr__(self) -> str:
        return f"MinHeap({self._data})"

## 4. Función de score

Combina calidad (rating promedio) con popularidad (cantidad de reviews):

$$\text{score} = \overline{\text{rating}} \times \ln(1 + N_{\text{reviews}})$$

Se define como función independiente para que ambos sistemas la compartan.

In [None]:
def compute_score(sum_ratings: float, count: int) -> float:
    """Calcula score = mean_rating * log(1 + N_reviews)."""
    mean_rating = sum_ratings / count
    return mean_rating * math.log(1 + count)

## 5. Streaming del dataset desde HuggingFace

Lee el archivo JSONL de una categoría directamente desde HuggingFace
vía HTTP streaming, sin guardarlo en disco. Cada línea se parsea y
descarta inmediatamente, manteniendo en memoria solo el diccionario
de agregación `{parent_asin: [sum_ratings, count]}`.

Esto permite ejecutar el notebook en Google Colab sin necesidad de
espacio en disco para archivos de hasta 31 GB.

In [None]:
HF_BASE_URL = (
    "https://huggingface.co/datasets/McAuley-Lab/Amazon-Reviews-2023"
    "/resolve/main/raw/review_categories"
)


def stream_and_aggregate(category: str) -> dict:
    """Hace streaming del JSONL desde HuggingFace y agrega ratings.

    Retorna dict[parent_asin] -> [sum_ratings, count].
    """
    url = f"{HF_BASE_URL}/{category}.jsonl"
    print(f"  Streaming: {url}")

    aggregated: dict[str, list] = {}
    n_reviews = 0

    with requests.get(url, stream=True, timeout=300) as r:
        r.raise_for_status()
        for line in r.iter_lines(decode_unicode=True):
            if not line:
                continue
            review = json.loads(line)
            parent_asin = review["parent_asin"]
            rating = review["rating"]
            if parent_asin in aggregated:
                aggregated[parent_asin][0] += rating
                aggregated[parent_asin][1] += 1
            else:
                aggregated[parent_asin] = [rating, 1]
            n_reviews += 1

    print(f"  {n_reviews:,} reviews -> {len(aggregated):,} productos únicos")
    return aggregated

## 6. SistemaRec1 — Top-K con MinHeap O(n log K)

Mantiene un MinHeap de tamaño fijo K. Para cada producto:
- Si el heap tiene menos de K elementos: insertar.
- Si el score supera al mínimo del heap: extraer mínimo e insertar.

Al final, extrae todos los elementos y los invierte para orden descendente.

In [None]:
class SistemaRec1:
    """Top-K con MinHeap — O(n log K)."""

    def __init__(self, k: int = 10):
        self.k = k

    def top_k(self, aggregated: dict) -> list[tuple[float, str]]:
        heap = MinHeap()
        for parent_asin, (sum_ratings, count) in aggregated.items():
            score = compute_score(sum_ratings, count)
            if len(heap) < self.k:
                heap.min_heap_insert((score, parent_asin))
            elif score > heap.heap_minimum()[0]:
                heap.heap_extract_min()
                heap.min_heap_insert((score, parent_asin))
        result = []
        while len(heap) > 0:
            result.append(heap.heap_extract_min())
        result.reverse()
        return result

## 7. SistemaRecNaive — Top-K con sort completo O(n log n)

Calcula el score de todos los n productos, los ordena con `sorted()`
en O(n log n) y toma los primeros K. Es el enfoque directo pero
asintóticamente peor que SistemaRec1 cuando K << n.

In [None]:
class SistemaRecNaive:
    """Top-K con sort completo — O(n log n)."""

    def __init__(self, k: int = 10):
        self.k = k

    def top_k(self, aggregated: dict) -> list[tuple[float, str]]:
        scored = [
            (compute_score(sum_r, count), parent_asin)
            for parent_asin, (sum_r, count) in aggregated.items()
        ]
        scored.sort(reverse=True)
        return scored[: self.k]

## 8. Métricas de sistemas de recomendación

Se definen las métricas para comparar los rankings producidos por ambos sistemas:

| Métrica | Propósito |
|---|---|
| **Precision@K** | Fracción de items en común entre ambos top-K |
| **AP@K** | Penaliza items relevantes en posiciones tardías |
| **NDCG@K** | Calidad de ranking con descuento logarítmico |
| **Jaccard@K** | Solapamiento global de conjuntos |
| **Spearman ρ** | Correlación de orden entre rankings |

In [None]:
def precision_at_k(ref: list, evl: list) -> float:
    set_ref = {item_id for _, item_id in ref}
    set_evl = {item_id for _, item_id in evl}
    if not set_evl:
        return 0.0
    return len(set_ref & set_evl) / len(set_evl)


def average_precision_at_k(ref: list, evl: list) -> float:
    set_ref = {item_id for _, item_id in ref}
    if not set_ref:
        return 0.0
    hits = 0
    sum_precision = 0.0
    for i, (_, item_id) in enumerate(evl, 1):
        if item_id in set_ref:
            hits += 1
            sum_precision += hits / i
    return sum_precision / min(len(ref), len(evl)) if evl else 0.0


def _dcg(gains: list[float]) -> float:
    return sum(g / math.log2(i + 2) for i, g in enumerate(gains))


def ndcg_at_k(ref: list, evl: list) -> float:
    relevance = {item_id: score for score, item_id in ref}
    eval_gains = [relevance.get(item_id, 0.0) for _, item_id in evl]
    actual_dcg = _dcg(eval_gains)
    ideal_gains = sorted(relevance.values(), reverse=True)
    ideal_dcg = _dcg(ideal_gains)
    if ideal_dcg == 0:
        return 0.0
    return actual_dcg / ideal_dcg


def jaccard_at_k(ranking_a: list, ranking_b: list) -> float:
    set_a = {item_id for _, item_id in ranking_a}
    set_b = {item_id for _, item_id in ranking_b}
    union = set_a | set_b
    if not union:
        return 0.0
    return len(set_a & set_b) / len(union)


def spearman_rho(ranking_a: list, ranking_b: list) -> float:
    rank_a = {item_id: i for i, (_, item_id) in enumerate(ranking_a)}
    rank_b = {item_id: i for i, (_, item_id) in enumerate(ranking_b)}
    common = set(rank_a) & set(rank_b)
    n = len(common)
    if n < 2:
        return float("nan")
    d_sq_sum = sum((rank_a[item] - rank_b[item]) ** 2 for item in common)
    return 1 - (6 * d_sq_sum) / (n * (n ** 2 - 1))

## 9. Configuración del benchmark

Se definen las categorías con archivos JSONL menores a 2 GB y los
valores de K a evaluar (`range(5, 1000, 100)`).

El streaming se realiza una sola vez por categoría. Luego, la fase
de ranking se ejecuta para cada valor de K, midiendo únicamente el
costo algorítmico (sin I/O) para aislar la diferencia entre
O(n log K) y O(n log n).

In [None]:
CATEGORIES_UNDER_2GB = [
    "Subscription_Boxes",       # 8.95 MB
    "Magazine_Subscriptions",   # 33.3 MB
    "Gift_Cards",               # 50.2 MB
    "Digital_Music",            # 78.8 MB
    "Health_and_Personal_Care", # 227 MB
    "Handmade_Products",        # 289 MB
    "All_Beauty",               # 326.6 MB
    "Appliances",               # 929.5 MB
    "Amazon_Fashion",           # 1.05 GB
    "Musical_Instruments",      # 1.56 GB
    "Software",                 # 1.87 GB
]

K_VALUES = list(range(5, 1000, 100))

## 10. Streaming y agregación de todas las categorías

Se hace streaming de cada categoría una sola vez y se almacena el
diccionario agregado en memoria. Esto evita re-descargar los datos
en cada iteración del benchmark.

In [None]:
aggregated_data = {}

for cat in CATEGORIES_UNDER_2GB:
    print(f"\n[Streaming] {cat}")
    try:
        aggregated_data[cat] = stream_and_aggregate(cat)
    except Exception as e:
        print(f"  [ERROR] {e}")

print(f"\nCategorías cargadas: {len(aggregated_data)}")

## 11. Benchmark completo: variación de K

Para cada categoría cargada, se ejecutan ambos sistemas con cada valor
de K en `range(5, 1000, 100)`, midiendo únicamente el tiempo de la
fase de ranking (sin I/O). Se calculan también las métricas de
recomendación usando `SistemaRecNaive` como referencia.

Los resultados se almacenan en una lista para generar el scatterplot
y el reporte final.

In [None]:
benchmark_results = []

for cat, aggregated in aggregated_data.items():
    print(f"\n{'='*60}")
    print(f"Categoría: {cat} ({len(aggregated):,} productos)")
    print(f"{'='*60}")

    for k in K_VALUES:
        rec1 = SistemaRec1(k=k)
        naive = SistemaRecNaive(k=k)

        # Tiempo SistemaRec1
        t0 = time.perf_counter()
        results_rec1 = rec1.top_k(aggregated)
        time_rec1_ms = (time.perf_counter() - t0) * 1000

        # Tiempo SistemaRecNaive
        t0 = time.perf_counter()
        results_naive = naive.top_k(aggregated)
        time_naive_ms = (time.perf_counter() - t0) * 1000

        # Métricas
        prec = precision_at_k(results_naive, results_rec1)
        ap = average_precision_at_k(results_naive, results_rec1)
        ndcg = ndcg_at_k(results_naive, results_rec1)
        jacc = jaccard_at_k(results_rec1, results_naive)
        rho = spearman_rho(results_naive, results_rec1)

        row = {
            "category": cat,
            "k": k,
            "n_products": len(aggregated),
            "time_rec1_ms": round(time_rec1_ms, 4),
            "time_naive_ms": round(time_naive_ms, 4),
            "precision_at_k": round(prec, 4),
            "ap_at_k": round(ap, 4),
            "ndcg_at_k": round(ndcg, 4),
            "jaccard_at_k": round(jacc, 4),
            "spearman_rho": round(rho, 4) if not math.isnan(rho) else "NaN",
        }
        benchmark_results.append(row)

        print(f"  K={k:>4}  Rec1: {time_rec1_ms:>8.2f} ms  "
              f"Naive: {time_naive_ms:>8.2f} ms  "
              f"P@K={prec:.2f}  NDCG={ndcg:.4f}")

print(f"\nTotal de mediciones: {len(benchmark_results)}")

## 12. Reporte CSV

Se genera un CSV con todas las mediciones para su análisis posterior.
En Google Colab, el archivo queda disponible en el panel de archivos
de la izquierda para descargarlo.

In [None]:
import csv

CSV_PATH = "benchmark_completo_colab.csv"
CSV_COLUMNS = [
    "category", "k", "n_products",
    "time_rec1_ms", "time_naive_ms",
    "precision_at_k", "ap_at_k", "ndcg_at_k",
    "jaccard_at_k", "spearman_rho",
]

with open(CSV_PATH, "w", newline="", encoding="utf-8") as f:
    writer = csv.DictWriter(f, fieldnames=CSV_COLUMNS)
    writer.writeheader()
    writer.writerows(benchmark_results)

print(f"CSV guardado en: {CSV_PATH}")

## 13. Scatterplot: Tiempo de ejecución vs Top-K

Se genera un scatterplot por cada categoría evaluada. El eje X
representa el valor de K y el eje Y el tiempo de ejecución en
milisegundos. Dado que el streaming se realizó previamente, estos
tiempos reflejan **únicamente la fase de ranking**, lo que permite
observar la diferencia algorítmica real entre O(n log K) y O(n log n)
sin el ruido del I/O.

In [None]:
categories_in_results = list(dict.fromkeys(
    row["category"] for row in benchmark_results
))

for cat in categories_in_results:
    rows_cat = [r for r in benchmark_results if r["category"] == cat]
    ks = [r["k"] for r in rows_cat]
    t_rec1 = [r["time_rec1_ms"] for r in rows_cat]
    t_naive = [r["time_naive_ms"] for r in rows_cat]

    fig, ax = plt.subplots(figsize=(10, 6))
    ax.scatter(ks, t_rec1, label="SistemaRec1 (MinHeap)", marker="o")
    ax.scatter(ks, t_naive, label="SistemaRecNaive (sort)", marker="s")
    ax.set_xlabel("top-k")
    ax.set_ylabel("Tiempo de ejecución (ms)")
    ax.set_title(f"SistemaRec1 vs SistemaRecNaive — {cat}")
    ax.legend()
    ax.grid(True, alpha=0.3)
    fig.tight_layout()

    png_path = f"benchmark_{cat}.png"
    fig.savefig(png_path, dpi=150)
    plt.show()
    print(f"Guardado: {png_path}\n")

## 14. Tabla resumen de métricas por categoría (K=505)

Se muestra una tabla resumen con las métricas de recomendación
para el valor central de K como referencia rápida.

In [None]:
k_mid = K_VALUES[len(K_VALUES) // 2]

print(f"{'Categoría':<30} {'P@K':>6} {'AP@K':>6} {'NDCG':>7} "
      f"{'Jacc':>6} {'Spear':>7} {'Rec1(ms)':>10} {'Naive(ms)':>10}")
print("-" * 95)

for row in benchmark_results:
    if row["k"] == k_mid:
        print(f"{row['category']:<30} "
              f"{row['precision_at_k']:>6.2f} "
              f"{row['ap_at_k']:>6.2f} "
              f"{row['ndcg_at_k']:>7.4f} "
              f"{row['jaccard_at_k']:>6.2f} "
              f"{str(row['spearman_rho']):>7} "
              f"{row['time_rec1_ms']:>10.2f} "
              f"{row['time_naive_ms']:>10.2f}")