### **Cuaderno: Módulo 2 - Más Allá del `GROUP BY`: Desbloqueando Análisis con Funciones de Ventana**

**Objetivo del Módulo:** Al finalizar este cuaderno, los participantes dominarán el uso de funciones de ventana para realizar cálculos sobre conjuntos de filas relacionadas. Podrán calcular rankings (`RANK`, `DENSE_RANK`), análisis de series temporales (`LEAD`, `LAG`) y agregados móviles de manera eficiente, eliminando la necesidad de costosos `self-joins`.

#### **Configuración del Entorno y Datos**

**Objetivo:** Primero, se necesita un entorno de datos robusto. Este script asegura que la base de datos `curso_arquitecturas` esté en uso y que las tablas `usuarios_silver`, `productos_silver`, `pedidos_silver` y `dim_fecha` estén disponibles y pobladas con datos consistentes.

**Acción:** Ejecutar la siguiente celda de código para preparar el entorno para los ejercicios del módulo.


In [0]:
# Celda 1: Script para generar y configurar el conjunto de datos del taller
from pyspark.sql import SparkSession
from pyspark.sql.types import DecimalType # Importante: Importar el tipo de dato necesario
from faker import Faker
import random
from datetime import datetime, timedelta

# --- SEMILLA PARA REPRODUCIBILIDAD ---
# Garantiza que los datos generados sean siempre los mismos.
SEED = 2025
Faker.seed(SEED)
random.seed(SEED)

# Inicializar Faker
fake = Faker('es_ES')

# --- FUNCIONES DE GENERACIÓN DE DATOS ---
def generar_usuarios(n=1000):
    """Genera una lista de diccionarios de usuarios."""
    return [{'id_usuario': 1000 + i, 'nombre': fake.name(), 'email': fake.email(), 'fecha_registro': fake.date_time_between(start_date='-2y'), 'ciudad': fake.city()} for i in range(n)]

def generar_productos(n=500):
    """Genera una lista de productos."""
    categorias = ['Electrónica', 'Hogar', 'Ropa', 'Libros', 'Deportes', 'Juguetes', 'Alimentos']
    data = []
    for i in range(n):
        data.append({
            'id_producto': 2000 + i,
            'nombre_producto': fake.word().capitalize() + " " + fake.word(),
            'categoria': random.choice(categorias),
            'precio_unitario': round(random.uniform(5.0, 350.0), 2)
        })
    return data

def generar_pedidos(usuarios, productos, n=10000):
    """Genera una lista de pedidos, vinculando usuarios y productos."""
    data = []
    for i in range(n):
        usuario = random.choice(usuarios)
        producto = random.choice(productos)
        cantidad = random.randint(1, 5)
        data.append({
            'id_pedido': 3000 + i,
            'id_usuario': usuario['id_usuario'],
            'id_producto': producto['id_producto'],
            'cantidad': cantidad,
            'monto': round(cantidad * producto['precio_unitario'], 2),
            'fecha_pedido': fake.date_time_between(start_date=usuario['fecha_registro'])
        })
    return data

# --- CREACIÓN DE DATAFRAMES ---
print("Generando datos simulados...")
usuarios_data = generar_usuarios()
productos_data = generar_productos()
pedidos_data = generar_pedidos(usuarios_data, productos_data)

usuarios_df = spark.createDataFrame(usuarios_data)
productos_df = spark.createDataFrame(productos_data)
pedidos_df = spark.createDataFrame(pedidos_data)

# --- CREACIÓN DE BASE DE DATOS Y TABLAS SILVER ---
db_name = "curso_arquitecturas"
spark.sql(f"CREATE DATABASE IF NOT EXISTS {db_name}")
spark.sql(f"USE {db_name}")

print(f"Usando la base de datos: '{db_name}'")

# Guardar usuarios
usuarios_df.selectExpr("id_usuario", "nombre as nombre_usuario", "email", "fecha_registro", "ciudad") \
    .write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable("usuarios_silver")

# ==================== LA CORRECCIÓN ESTÁ AQUÍ ====================
# 1. Definimos explícitamente el tipo de dato para 'precio_unitario' antes de escribir.
productos_df_casteado = productos_df.withColumn("precio_unitario", productos_df["precio_unitario"].cast(DecimalType(10, 2)))

# 2. Escribimos el DataFrame corregido, permitiendo la sobreescritura del esquema para robustez.
productos_df_casteado.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable("productos_silver")
# =================================================================

# Convertimos el DataFrame de pedidos a una tabla SQL temporal para poder usar funciones SQL
pedidos_df.createOrReplaceTempView("pedidos_temp")

# Guardar pedidos con el esquema bien definido usando SQL
spark.sql("""
CREATE OR REPLACE TABLE pedidos_silver AS
SELECT 
  id_pedido, 
  id_usuario, 
  id_producto, 
  cantidad, 
  CAST(monto AS DECIMAL(18, 2)) AS monto_total, 
  fecha_pedido AS ts_pedido,
  TO_DATE(fecha_pedido) as id_fecha 
FROM pedidos_temp
""")

# --- CREACIÓN DE DIM_FECHA ---
spark.sql("""
CREATE OR REPLACE TABLE dim_fecha (
  id_fecha DATE NOT NULL, anio INT, mes INT, dia INT, trimestre INT,
  nombre_mes STRING, nombre_dia_semana STRING, tipo_dia STRING
);
""")
spark.sql("""
INSERT INTO dim_fecha
SELECT
  fecha AS id_fecha, YEAR(fecha) AS anio, MONTH(fecha) AS mes, DAY(fecha) AS dia,
  QUARTER(fecha) AS trimestre, DATE_FORMAT(fecha, 'MMMM') AS nombre_mes,
  DATE_FORMAT(fecha, 'EEEE') AS nombre_dia_semana,
  CASE WHEN DAYOFWEEK(fecha) IN (1, 7) THEN 'Fin de Semana' ELSE 'Día de Semana' END AS tipo_dia
FROM (SELECT EXPLODE(SEQUENCE(TO_DATE('2022-01-01'), TO_DATE('2026-12-31'), INTERVAL 1 DAY)) AS fecha);
""")

print("Tablas Silver y dim_fecha creadas/actualizadas exitosamente.")
print("\n¡Entorno listo para el Módulo 2!")

#### **El Porqué (Problema de Negocio)**

**Contexto:** El equipo de producto necesita realizar análisis más sofisticados que una simple agregación. Específicamente, quieren responder dos preguntas clave:

1.  **Análisis de Crecimiento:** ¿Cómo varían las ventas de un producto de un mes a otro? Necesitan calcular el crecimiento porcentual mensual para cada producto.
2.  **Ranking de Productos:** ¿Cuál fue el producto más vendido (por monto total) dentro de cada categoría, para cada mes?

#### **La "Mala" Solución (Anti-Patrón con `SELF JOIN`)**

**Análisis:** Para calcular el crecimiento mes a mes, la aproximación tradicional en SQL implica un `self-join`.

**Problemas:**
* **Complejidad:** La lógica del `JOIN` es difícil de escribir y entender.
* **Rendimiento:** Los `self-joins` en tablas grandes son extremadamente costosos.

In [0]:
%sql
-- La "Mala" Solución para el crecimiento mes a mes
WITH VentasMensuales AS (
  SELECT
    p.id_producto,
    d.anio,
    d.mes,
    SUM(p.monto_total) AS ventas_mes
  FROM pedidos_silver p
  JOIN dim_fecha d ON p.id_fecha = d.id_fecha
  GROUP BY p.id_producto, d.anio, d.mes
)
SELECT
  actual.id_producto,
  actual.anio,
  actual.mes,
  actual.ventas_mes,
  anterior.ventas_mes AS ventas_mes_anterior
FROM
  VentasMensuales actual
  LEFT JOIN VentasMensuales anterior ON actual.id_producto = anterior.id_producto
  AND actual.anio = anterior.anio 
  AND actual.mes = anterior.mes + 1
LIMIT 10;

#### **Celda 5: La "Buena" Solución - Introduciendo las Funciones de Ventana**

**Análisis:** Las funciones de ventana nos permiten realizar cálculos sobre un conjunto de filas (`la ventana`) que están relacionadas con la fila actual, sin tener que colapsar las filas como lo hace `GROUP BY`. Usamos `LAG` para "mirar hacia atrás" una fila.

In [0]:
%sql
-- La "Buena" Solución para el crecimiento mes a mes
WITH VentasMensuales AS (
  SELECT
    p.id_producto,
    d.anio,
    d.mes,
    SUM(p.monto_total) AS ventas_mes
  FROM pedidos_silver p
  JOIN dim_fecha d ON p.id_fecha = d.id_fecha
  GROUP BY p.id_producto, d.anio, d.mes
)
SELECT
  id_producto,
  anio,
  mes,
  ventas_mes,
  LAG(ventas_mes, 1, 0) OVER (PARTITION BY id_producto ORDER BY anio, mes) AS ventas_mes_anterior
FROM
  VentasMensuales
ORDER BY
  id_producto, anio, mes
LIMIT 10;

#### **El "Cómo Funciona" - Anatomía de una Función de Ventana**

`LAG(ventas_mes, 1, 0) OVER (PARTITION BY id_producto ORDER BY anio, mes)`

1.  **`LAG(ventas_mes, 1, 0)`**: La función a aplicar.
2.  **`OVER (...)`**: La cláusula que define la ventana.
    * **`PARTITION BY id_producto`**: Divide los datos en grupos. Es el `GROUP BY` de las funciones de ventana.
    * **`ORDER BY anio, mes`**: Ordena las filas *dentro* de cada grupo. Es **crucial** para `LAG`.

#### **Aplicando la Solución de Crecimiento Completa**

Ahora que tenemos las ventas del mes anterior, podemos calcular fácilmente el crecimiento porcentual.

In [0]:
%sql
-- Celda 9: Solución completa con cálculo de crecimiento
WITH VentasMensuales AS (
  SELECT
    p.id_producto,
    d.anio,
    d.mes,
    SUM(p.monto_total) AS ventas_mes
  FROM pedidos_silver p
  JOIN dim_fecha d ON p.id_fecha = d.id_fecha
  GROUP BY p.id_producto, d.anio, d.mes
),
VentasConMesAnterior AS (
  SELECT
    id_producto,
    anio,
    mes,
    ventas_mes,
    LAG(ventas_mes, 1, 0) OVER (PARTITION BY id_producto ORDER BY anio, mes) AS ventas_mes_anterior
  FROM
    VentasMensuales
)
SELECT
  id_producto,
  anio,
  mes,
  ventas_mes,
  ventas_mes_anterior,
  ROUND((ventas_mes - ventas_mes_anterior) / ventas_mes_anterior * 100, 2) AS crecimiento_pct
FROM
  VentasConMesAnterior
WHERE
  ventas_mes_anterior > 0
ORDER BY
  id_producto, anio, mes
LIMIT 10;

#### **Resolviendo el Segundo Problema - Ranking de Productos**

**Contexto:** Ahora, se necesita identificar el producto más vendido (por monto) en cada categoría, cada mes.

#### **La "Buena" Solución con Funciones de Ranking**

**Análisis:** Las funciones de ventana de ranking (`RANK`, `DENSE_RANK`, `ROW_NUMBER`) están diseñadas precisamente para este problema.

In [0]:
%sql
-- La "Buena" Solución para Ranking
WITH VentasMensualesPorProducto AS (
  SELECT
    pr.categoria,
    d.anio,
    d.mes,
    pr.nombre_producto,
    SUM(p.monto_total) AS ventas_producto_mes
  FROM pedidos_silver p
  JOIN productos_silver pr ON p.id_producto = pr.id_producto
  JOIN dim_fecha d ON p.id_fecha = d.id_fecha
  GROUP BY pr.categoria, d.anio, d.mes, pr.nombre_producto
),
RankingDeVentas AS (
  SELECT
    categoria,
    anio,
    mes,
    nombre_producto,
    ventas_producto_mes,
    RANK() OVER (PARTITION BY categoria, anio, mes ORDER BY ventas_producto_mes DESC) AS ranking
  FROM
    VentasMensualesPorProducto
)
SELECT
  categoria,
  anio,
  mes,
  nombre_producto,
  ventas_producto_mes
FROM
  RankingDeVentas
WHERE
  ranking = 1
ORDER BY
  anio, mes, categoria;

#### **¡A Practicar! 🏋️**

**Tu Tarea:** Escribe una consulta que, para cada pedido de cada cliente, calcule la **diferencia en días** entre la fecha de ese pedido y la fecha de su pedido **inmediatamente anterior**.

In [0]:
%sql
-- Celda 14: Escribe tu solución aquí...

#### **Resumen del Módulo**

**Ideas Clave:**
* **Funciones de Ventana** realizan cálculos sobre un conjunto de filas (`la ventana`) sin colapsarlas.
* **`OVER()`** es la cláusula que define la ventana.
* **`PARTITION BY`** crea los grupos.
* **`ORDER BY`** es crucial para ordenar las filas dentro de cada grupo.

#### **Solución del Ejercicio de Frecuencia de Compra**

In [0]:
%sql
-- Celda 16: Solución del Ejercicio de Frecuencia de Compra
WITH PedidosConFechaAnterior AS (
  SELECT
    id_usuario,
    id_pedido,
    ts_pedido,
    LAG(ts_pedido, 1) OVER (PARTITION BY id_usuario ORDER BY ts_pedido) AS fecha_pedido_anterior
  FROM
    pedidos_silver
)
SELECT
  id_usuario,
  id_pedido,
  ts_pedido,
  DATEDIFF(ts_pedido, fecha_pedido_anterior) AS dias_desde_ultimo_pedido
FROM
  PedidosConFechaAnterior
WHERE
  id_usuario BETWEEN 1000 AND 1002 -- Limitar para una visualización clara
ORDER BY
  id_usuario, ts_pedido;