# Título: El Modelo de Documentos en el Lakehouse

**Objetivo:** En esta sección, tomaremos nuestros datos de noticias crudos y los transformaremos en una tabla Delta Lake que imita la estructura y flexibilidad de una base de datos documental como MongoDB o Cosmos DB. Aprenderemos a manejar datos semi-estructurados usando tipos complejos y a aprovechar los beneficios del Lakehouse.

**Fases del Taller:**
1.  **Ingesta a Bronce:** Cargar los datos crudos del CSV a nuestra capa de ingesta.
2.  **Transformación a Plata:** Limpiar, estructurar y enriquecer los datos, creando una tabla que se comporte como una colección de documentos.
3.  **Consulta y Evolución:** Demostrar cómo consultar y evolucionar el esquema de nuestra tabla "documental".

In [0]:
# Configuración del Entorno

# Es una buena práctica definir nuestras variables de configuración al inicio.
# Esto hace que el cuaderno sea más fácil de mantener y reutilizar.

db_name = "curso_arquitecturas"
bronze_table_name = "noticias_bronze"
silver_table_name = "noticias_silver"
csv_path = "/Volumes/sesion_5/bronze/raw_files/Noticias.csv"

# Usamos la base de datos que hemos creado a lo largo del curso.
spark.sql(f"USE {db_name}")

print(f"Base de datos activa: '{db_name}'")
print(f"Ruta del archivo de noticias: '{csv_path}'")

### Paso 1: Ingesta a la Capa Bronce (Bronze)

Recordemos el principio de la capa Bronce: **es un archivo fiel y sin procesar de los datos de origen**.

Nuestra primera tarea es tomar el archivo `Noticias.csv` y cargarlo en una tabla Delta sin aplicar ninguna transformación. Esto nos da un punto de partida auditable y nos permite reprocesar los datos en el futuro si las reglas de negocio cambian.

In [0]:
# Leer el archivo CSV y guardarlo como una tabla Delta en la capa Bronce

# Leemos el archivo CSV.
# - "header=True": Usa la primera fila como nombres de columna.
# - "inferSchema=True": Spark intentará adivinar los tipos de datos. Para la capa Bronce, esto es aceptable.
noticias_raw_df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load(csv_path)

# Guardamos el DataFrame como una tabla Delta.
# - "mode("overwrite")": Si la tabla ya existe, la reemplazará.
# - "saveAsTable(...)": Guarda los datos y registra la tabla en el catálogo.
noticias_raw_df.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable(bronze_table_name)

print(f"Tabla '{bronze_table_name}' creada exitosamente en la capa Bronce.")

In [0]:
%sql
-- Verificación Rápida de la Capa Bronce

-- Echemos un vistazo a los datos tal como fueron ingeridos.
-- Observa que los datos están crudos, con los nombres de columna originales
-- y sin ninguna limpieza aplicada.

SELECT * FROM noticias_bronze LIMIT 5;

### Paso 2: Hacia la Capa Plata (Silver) - Limpieza y Estructuración

La capa Plata es donde ocurre la magia. Aquí transformamos los datos crudos en un activo de datos confiable y bien estructurado.

Para replicar un **modelo de documentos**, no solo limpiaremos los datos, sino que también los reestructuraremos. Nuestro objetivo es agrupar campos relacionados en una sola columna `STRUCT`, creando una entidad cohesiva y fácil de consultar.

**Plan de Transformación:**
1.  **Renombrar columnas:** Usar nombres claros y consistentes (ej. `Enlaces` -> `enlace`).
2.  **Generar un ID único:** Crear una clave primaria (`id_noticia`) para identificar cada documento.
3.  **Extraer y crear metadatos:** Extraer la fecha (simulada por ahora) y la etiqueta (`Etiqueta`) para anidarlas en una nueva columna `metadatos`.
4.  **Limpieza:** Eliminar filas donde el contenido de la noticia esté vacío.
5.  **Tipado de datos:** Asegurar que cada columna tenga el tipo de dato correcto.

In [0]:
# Cargar datos de la capa Bronce para iniciar la transformación

df_bronze = spark.read.table(bronze_table_name)

print("Datos de la capa Bronce cargados y listos para transformar.")
df_bronze.printSchema()

In [0]:
from pyspark.sql.functions import col
df_bronze.groupBy("Etiqueta").count().orderBy(col("Count").desc()).show()

In [0]:
categories = [
    "archivo",
    "colombia",
    "opinion",
    "deportes",
    "cultura",
    "economia",
    "bogota",
    "justicia",
    "mundo",
    "vida",
    "politica",
    "tecnosfera",
    "salud",
    "historias-el-tiempo",
    "contenido-comercial",
    "mundial",
    "elecciones",
    "podcast"
]
df_bronze=df_bronze.filter(col("Etiqueta").isin(categories))
df_bronze.groupBy("Etiqueta").count().orderBy(col("Count").desc()).show()


In [0]:
df_bronze.groupBy("Etiqueta").count().orderBy(col("Count").desc()).show()# Aplicar las transformaciones de limpieza y estructuración

from pyspark.sql.functions import col, sha2, concat_ws, current_timestamp, struct

# Aplicamos la lógica de transformación
df_silver = df_bronze \
    .withColumnRenamed("Enlaces", "enlace") \
    .withColumnRenamed("Título", "titulo") \
    .withColumnRenamed("Etiqueta", "etiqueta") \
    .withColumn("id_noticia", sha2(concat_ws("||", col("titulo"), col("enlace")), 256)) \
    .withColumn("metadatos", struct(
        col("etiqueta"),
        current_timestamp().alias("fecha_ingesta") # Simulamos una fecha
    )) \
    .select(
        "id_noticia",
        "enlace",
        "titulo",
        "info",
        "contenido",
        "metadatos"
    ) \
    .filter(col("contenido").isNotNull())

print("Transformaciones a la capa Plata completadas.")
df_silver.printSchema()

### El Poder de `STRUCT`: Datos Anidados en una Tabla

La celda anterior es el corazón de nuestro modelo documental. La función `struct()` nos permitió crear una columna (`metadatos`) que es, en sí misma, una pequeña tabla. Contiene los campos `etiqueta` y `fecha_ingesta`.

Esto es análogo a un objeto JSON anidado en un documento de MongoDB. Nos da la capacidad de agrupar información contextual sin necesidad de crear una tabla separada y un `JOIN`.

In [0]:
# Guardar la tabla transformada en la Capa Plata

df_silver.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable(silver_table_name)

print(f"Tabla '{silver_table_name}' creada exitosamente en la capa Plata.")

In [0]:
%sql
-- Explorando la Tabla Silver: Nuestro "Documento" Estructurado

-- Ahora, consultemos nuestra nueva tabla.
-- Fíjate en la columna `metadatos`: es una estructura compleja.
-- ¡Hemos creado una colección de documentos dentro de nuestro Lakehouse!

SELECT * FROM noticias_silver LIMIT 5;

In [0]:
%sql DESCRIBE HISTORY noticias_silver

### Paso 3: Consultando y Evolucionando el Modelo

Ahora que tenemos nuestra tabla `noticias_silver` con datos anidados, podemos explotar su flexibilidad.

#### Consultando Campos Anidados
Una de las grandes ventajas de Spark SQL es que puede consultar campos dentro de un `STRUCT` de manera muy intuitiva, usando la **notación de punto**, igual que harías en muchos lenguajes de programación.

In [0]:
%sql
-- Usando la notación de punto para acceder a campos dentro del STRUCT

SELECT
  id_noticia,
  titulo,
  metadatos.etiqueta AS categoria, -- Accedemos al campo 'etiqueta'
  metadatos.fecha_ingesta -- Accedemos al campo 'fecha_ingesta'
FROM noticias_silver
WHERE metadatos.etiqueta = 'colombia'

LIMIT 10;