## Data Quality: Confianza en tus Datos

La Calidad de Datos es el proceso de asegurar que los datos sean **precisos, consistentes, completos y fiables** a lo largo de todo su ciclo de vida. Es el sistema inmunitario de tu Lakehouse, protegiéndolo del famoso principio: **"Basura entra, basura sale" (Garbage In, Garbage Out)**.

Sin un framework de Data Quality, podrías estar tomando decisiones de negocio críticas basadas en datos incorrectos, duplicados o incompletos, lo que erosionaría por completo la confianza en tu plataforma de datos.

-----

### ¿Dónde se Aplican las Reglas de Calidad?

El punto más crítico para aplicar las reglas de calidad es en la transición de la **capa Bronce a la Plata**. La capa Bronce es el "salvaje oeste"; acepta todo tal como viene. La capa Plata, en cambio, es nuestra "ciudadela de la confianza". El pipeline que pasa datos de Bronce a Plata actúa como el **guardián en la puerta**, inspeccionando cada registro antes de dejarlo entrar.

-----

### Las Dimensiones de la Calidad de Datos

"Calidad" es un término abstracto. En la práctica, lo medimos a través de varias dimensiones. Aquí las más comunes aplicadas a nuestro dataset de Olist:

| Dimensión | Pregunta que Responde | Ejemplo en Nuestro Proyecto |
| :--- | :--- | :--- |
| **Completitud** | ¿Están presentes todos los datos que necesito? | ¿Hay algún pedido en la tabla `orders` que tenga un `customer_id` nulo? (¡No debería\!) |
| **Unicidad** | ¿Hay registros duplicados? | ¿Existe el mismo `product_id` más de una vez en nuestra `dim_productos`? (¡No debería\!) |
| **Validez** | ¿Los datos se ajustan al formato o a las reglas de negocio? | ¿El `order_status` es uno de los valores permitidos ('delivered', 'shipped', etc.)? ¿Hay algún `precio` negativo? |
| **Consistencia** | ¿Mis datos son coherentes entre diferentes tablas? | ¿Todos los `customer_id` en la tabla `fact_pedidos` existen en `dim_clientes`? (Las llaves foráneas nos ayudan con esto). |
| **Precisión** | ¿El dato es correcto y refleja la realidad? | ¿La suma de los `payment_value` de un pedido coincide con la suma de `price` + `freight_value` de sus items? |

-----

###  ¿Cómo Implementarlo en Databricks?

Databricks ofrece una solución muy potente y moderna para esto llamada **Delta Live Tables (DLT)** con **Expectations**.

**¿Qué son las "Expectations"?**
Son reglas o "contratos" que tú defines sobre tus datos. Le dices a DLT qué esperas que sea verdad, y DLT se encarga de validar los datos y tomar acciones.

**Ejemplo de cómo se vería en un pipeline de DLT:**

```sql
-- En un notebook de Delta Live Tables
CREATE STREAMING LIVE TABLE customers_silver (
  CONSTRAINT customer_id_not_null EXPECT (customer_id IS NOT NULL) ON VIOLATION DROP ROW,
  CONSTRAINT valid_state EXPECT (customer_state IN ('SP', 'RJ', 'MG', ...)) ON VIOLATION FAIL UPDATE
)
AS SELECT * FROM STREAM(sesion_5.bronze.customers_raw);
```

En este ejemplo:

  * `EXPECT (customer_id IS NOT NULL) ON VIOLATION DROP ROW`: Si un cliente llega con un ID nulo, esa fila se descarta y no llega a la capa Plata.
  * `EXPECT (customer_state IN ('SP', ...)) ON VIOLATION FAIL UPDATE`: Si llega un cliente con un estado inválido, el pipeline se detiene por completo y alerta a los desarrolladores para que investiguen el problema.

### Añadiendo un Paso de Calidad a Nuestro Notebook Actual

Aunque DLT es la mejor herramienta, podemos simular un paso de validación simple en nuestro notebook actual para entender el concepto. Podríamos añadir una fase nueva después de la exploración (EDA) y antes de la transformación a Silver.


## Fase 3.5: Implementación de Guardianes de Calidad (Data Quality Gates)

**Objetivo:** Validar la calidad de nuestros datos de la capa Bronce antes de que pasen a la Plata. Si los datos no cumplen con nuestras reglas de negocio, detendremos el pipeline para evitar la corrupción de datos.

Para lograr esto, crearemos una función de ayuda en Python que:
1.  Ejecuta una consulta SQL para contar el número de "filas malas" según una regla.
2.  Si el conteo es mayor que cero, lanza una excepción con un mensaje claro, lo que detendrá la ejecución del notebook y hará que el Job falle.
3.  Si el conteo es cero, imprime un mensaje de éxito.

Este es nuestro "guardián" automatizado.

In [0]:
%sql
USE sesion_5.bronze

In [0]:
# 1. Definir la configuración usando nuestra nueva estructura de catálogo
catalog_name = "sesion_5"
bronze_schema = "bronze"
silver_schema = "silver"

# 2. Definir la ruta al volumen de la capa Bronze
bronze_volume_path = f"/Volumes/{catalog_name}/{bronze_schema}/raw_files"

# 3. Cargar todos los datasets desde el volumen Bronze en DataFrames
print(f"Leyendo archivos desde: {bronze_volume_path}...")
options = {"header": "true", "inferSchema": "true"}

customers_df = spark.read.options(**options).csv(f"{bronze_volume_path}/olist_customers_dataset.csv")
order_items_df = spark.read.options(**options).csv(f"{bronze_volume_path}/olist_order_items_dataset.csv")
order_payments_df = spark.read.options(**options).csv(f"{bronze_volume_path}/olist_order_payments_dataset.csv")
orders_df = spark.read.options(**options).csv(f"{bronze_volume_path}/olist_orders_dataset.csv")

print("\n¡Carga completada! Los DataFrames de la capa Bronze están en memoria.")

In [0]:
# 1. Registrar todos los DataFrames relacionados con los pedidos como vistas temporales
orders_df.createOrReplaceTempView("orders_bronze_vw")
order_items_df.createOrReplaceTempView("order_items_bronze_vw")
order_payments_df.createOrReplaceTempView("order_payments_bronze_vw")
customers_df.createOrReplaceTempView("customers_bronze_vw")

print("Vistas temporales para pedidos, items y pagos creadas.")

In [0]:
%sql
-- Chequeo 1: Verificar que no haya pedidos sin cliente (Completitud)
SELECT
  COUNT(*) AS pedidos_sin_cliente
FROM orders_bronze_vw
WHERE customer_id IS NULL;

-- Chequeo 2: Verificar que no haya precios negativos (Validez)
SELECT
  COUNT(*) AS items_con_precio_negativo
FROM order_items_bronze_vw
WHERE price < 0;

-- Chequeo 3: Verificar duplicados en clientes (Unicidad)
SELECT
  customer_id,
  COUNT(*) as numero_de_apariciones
FROM customers_bronze_vw
GROUP BY customer_id
HAVING COUNT(*) > 1;

En un pipeline real, si alguna de estas consultas devuelve un resultado mayor a cero, podríamos decidir detener la ejecución del notebook y enviar una alerta. Esto asegura que solo datos que cumplen con un estándar mínimo de calidad sean procesados y guardados en la capa Silver.

In [0]:
# Función de ayuda para ejecutar nuestras validaciones de calidad de datos
def ejecutar_validacion_calidad(query, descripcion_validacion):
  """
  Ejecuta una consulta SQL que se espera que devuelva 0.
  Si devuelve un valor mayor a 0, lanza una excepción para detener el pipeline.
  
  :param query: La consulta SQL que cuenta las filas que violan la regla.
  :param descripcion_validacion: Una descripción clara de lo que se está validando.
  """
  print(f"Ejecutando validación: '{descripcion_validacion}'...")
  
  # Ejecutamos la consulta y obtenemos el resultado
  df_resultado = spark.sql(query)
  conteo_filas_malas = df_resultado.first()[0]

  display(df_resultado)

  display(conteo_filas_malas)
  
  # Verificamos si la validación falló
  if conteo_filas_malas > 0:
    # Si hay filas malas, lanzamos una excepción para detener todo.
    # El mensaje de la excepción aparecerá en los logs del Job.
    raise Exception(f"VALIDACIÓN FALLIDA: {descripcion_validacion}. Se encontraron {conteo_filas_malas} filas que violan la regla.")
  else:
    # Si todo está bien, imprimimos un mensaje de éxito.
    print(f"VALIDACIÓN EXITOSA: {descripcion_validacion}. No se encontraron problemas. ✔️")

# --- ¡Ahora usamos nuestra función para definir nuestras reglas! ---

# REGLA 1: La llave primaria de un cliente NUNCA debe ser nula.
query_pk_nula_cliente = "SELECT COUNT(*) FROM customers_bronze_vw WHERE customer_id IS NULL"
ejecutar_validacion_calidad(query_pk_nula_cliente, "La columna 'customer_id' en la tabla de clientes no debe contener nulos.")

# REGLA 2: El precio de un item NUNCA debe ser negativo.
query_precio_negativo = "SELECT COUNT(*) FROM order_items_bronze_vw WHERE price < 0"
ejecutar_validacion_calidad(query_precio_negativo, "La columna 'price' en la tabla de items no debe contener valores negativos.")

# REGLA 3: El estado de un pedido debe ser uno de los valores conocidos.
query_estado_invalido = """
  SELECT COUNT(*) 
  FROM orders_bronze_vw 
  WHERE order_status NOT IN ('delivered', 'shipped', 'canceled', 'invoiced', 'processing', 'approved', 'unavailable', 'created')
"""
ejecutar_validacion_calidad(query_estado_invalido, "La columna 'order_status' contiene valores no permitidos.")

# REGLA 4: No debe haber pedidos sin al menos un item.
query_pedidos_sin_items = """
  SELECT COUNT(o.order_id)
  FROM orders_bronze_vw o
  LEFT JOIN order_items_bronze_vw i ON o.order_id = i.order_id
  WHERE i.order_id IS NULL
"""
ejecutar_validacion_calidad(query_pedidos_sin_items, "Existen pedidos en la tabla 'orders' que no tienen ningún item asociado en 'order_items'.")

## Desglose de la Función `ejecutar_validacion_calidad`

Aquí está el código comentado línea por línea para entender qué hace cada parte:

```python
# La función acepta dos argumentos:
# 1. 'query': Una cadena de texto que contiene una consulta SQL.
# 2. 'descripcion_validacion': Una explicación en lenguaje natural de la regla que estamos probando.
def ejecutar_validacion_calidad(query, descripcion_validacion):
  """
  Ejecuta una consulta SQL que se espera que devuelva 0.
  Si devuelve un valor mayor a 0, lanza una excepción para detener el pipeline.
  """
  
  # Imprimimos un mensaje para saber qué regla se está ejecutando en este momento.
  # Esto es muy útil para depurar los logs de un Job.
  print(f"Ejecutando validación: '{descripcion_validacion}'...")
  
  # Aquí usamos Spark para ejecutar la consulta SQL que le pasamos como argumento.
  # El resultado de la consulta (que es una sola fila con una sola columna de conteo)
  # se guarda en un DataFrame llamado 'df_resultado'.
  df_resultado = spark.sql(query)
  
  # Un DataFrame es como una tabla. Para obtener el valor real del conteo, hacemos dos cosas:
  # 1. .first(): Obtenemos la primera (y única) fila del DataFrame.
  # 2. [0]: De esa fila, obtenemos el valor de la primera (y única) columna.
  # Ahora, 'conteo_filas_malas' es un número (ej. 0, 5, 100).
  conteo_filas_malas = df_resultado.first()[0]
  
  # --- AQUÍ ESTÁ EL CORAZÓN DE LA LÓGICA ---
  # Comparamos el número que obtuvimos con 0.
  if conteo_filas_malas > 0:
    
    # Si el conteo es MAYOR que cero, significa que encontramos datos que violan la regla.
    # Esta es la parte crítica: 'raise Exception(...)'.
    # 'raise' es la palabra clave de Python para generar un error intencionadamente.
    # Al hacer esto, la ejecución del notebook se detiene por completo y se marca como "Fallida".
    # El mensaje que escribimos aquí es el que aparecerá en el error del Job,
    # diciéndonos exactamente qué falló y cuántas filas malas se encontraron.
    raise Exception(f"VALIDACIÓN FALLIDA: {descripcion_validacion}. Se encontraron {conteo_filas_malas} filas que violan la regla.")
  
  else:
    # Si el conteo es igual a cero, significa que no se encontraron filas malas.
    # La regla se cumplió, así que imprimimos un mensaje de éxito y la función termina.
    # El notebook continúa su ejecución hacia la siguiente celda.
    print(f"VALIDACIÓN EXITOSA: {descripcion_validacion}. No se encontraron problemas. ✔️")
```

-----

## La Lógica en Acción: Un Ejemplo Práctico

Imagina que llamamos a la función así:

```python
# REGLA: El precio de un item NUNCA debe ser negativo.
query_precio_negativo = "SELECT COUNT(*) FROM order_items_bronze_vw WHERE price < 0"
ejecutar_validacion_calidad(query_precio_negativo, "El precio no debe ser negativo.")
```

**Escenario 1: Los datos están limpios**

1.  La consulta `SELECT COUNT(*)...` se ejecuta y devuelve `0`.
2.  `conteo_filas_malas` se convierte en `0`.
3.  La condición `if 0 > 0` es **falsa**.
4.  Se ejecuta el bloque `else`, imprimiendo el mensaje de "VALIDACIÓN EXITOSA".
5.  El pipeline continúa felizmente hacia la siguiente fase.

**Escenario 2: Hay 3 items con precio negativo**

1.  La consulta `SELECT COUNT(*)...` se ejecuta y devuelve `3`.
2.  `conteo_filas_malas` se convierte en `3`.
3.  La condición `if 3 > 0` es **verdadera**.
4.  Se ejecuta el bloque `if`, y la línea `raise Exception(...)` detiene todo.
5.  El Job de Databricks se marca como **"Failed"** y en la notificación de error recibirás el mensaje: `"VALIDACIÓN FALLIDA: El precio no debe ser negativo. Se encontraron 3 filas que violan la regla."`

En resumen, esta función es un patrón de diseño simple pero increíblemente poderoso que te permite construir **pipelines de datos defensivos**, es decir, pipelines que se protegen a sí mismos de datos de mala calidad.


## ¿Qué Acaba de Pasar?

El mensaje de error es muy claro:
`"VALIDACIÓN FALLIDA: Existen pedidos en la tabla 'orders' que no tienen ningún item asociado en 'order_items'. Se encontraron 775 filas que violan la regla."`

Esto significa que tu guardián de calidad de datos revisó los datos crudos de la capa Bronce y descubrió que hay **775 pedidos que no tienen ni un solo producto asociado**.

### ¿Por Qué Esto es un Problema de Negocio?

Un pedido sin productos es un dato anómalo. Podría significar varias cosas:

  * **Registros Incompletos**: Un error en el sistema de origen que creó el pedido pero no registró los artículos.
  * **Carritos Abandonados**: Podrían ser carritos que se iniciaron pero nunca se completaron.
  * **Fraude o Pruebas**: Podrían ser registros de prueba o intentos de fraude.

Si permitiéramos que estos 775 "pedidos fantasma" llegaran a nuestra capa Silver, cualquier análisis sobre el "número total de pedidos" estaría **inflado y sería incorrecto**. Un ejecutivo podría pensar que hubo 775 ventas más de las que realmente ocurrieron.

-----

## ¿Cómo se Soluciona en un Entorno Real?

Ahora que el guardián ha hecho su trabajo, como arquitecto de datos, tienes que tomar una decisión. El siguiente paso es actuar.

1.  **Investigar (¿Quiénes son?)**: El primer paso sería analizar esas 775 filas. ¿Son de un período de tiempo específico? ¿De un tipo de cliente? Esto nos daría pistas sobre la causa raíz.

2.  **Decidir la Estrategia (¿Qué hacemos con ellos?)**:

      * **Opción A (La más común): Filtrarlos.** Decidimos que estos registros no son válidos para el análisis y los excluimos de la capa Silver. Nos aseguramos de que solo los pedidos completos y válidos lleguen a nuestra fuente de la verdad.
      * **Opción B (Avanzada): Ponerlos en Cuarentena.** Movemos estas 775 filas a una "tabla de registros rechazados" para que un equipo pueda analizarlos más tarde o corregirlos manualmente.
      * **Opción C (Ideal): Corregir el Origen.** Contactamos al equipo que gestiona el sistema de e-commerce para informarles del problema y que lo solucionen en el sistema de origen.

### La Solución para Nuestro Taller (Opción A)

Para nuestro taller, la decisión es simple: **vamos a filtrar estos registros**.

La buena noticia es que el código que escribimos para crear la `fact_pedidos` **ya hace esto implícitamente**. Observa la consulta:

```sql
FROM orders_bronze_vw o
JOIN order_items_bronze_vw i ON o.order_id = i.order_id -- ¡Aquí está la clave!
...
```

Al usar un `JOIN` (que por defecto es un `INNER JOIN`), ya estamos diciendo "solo quiero los pedidos que tengan una correspondencia en la tabla de items". Es decir, ya estábamos filtrando correctamente. La validación que creamos fue simplemente una forma de **hacer explícito y consciente** este problema de calidad de datos.

**Para continuar, simplemente elimina o comenta la celda de validación que falló.** Hemos aprendido la lección: los datos crudos no son confiables, y nuestro `INNER JOIN` es la decisión de diseño correcta para asegurar la calidad en la capa Silver.