# Prueba Técnica: Pipeline de Procesamiento de Datos con PySpark y OmegaConf

Este notebook implementa un pipeline ETL completo para procesar datos de entregas de productos. El proceso es totalmente parametrizable a través de un archivo de configuración `config.yaml`, utiliza PySpark para las transformaciones y está diseñado para ser robusto y auditable.

**Entorno de Ejecución:**
*   **Databricks Runtime:** 17.0
*   **Apache Spark:** 4.0.0
*   **OmegaConf:** 2.3.0

**El flujo de trabajo se divide en las siguientes fases:**
1.  **Configuración y Herramientas:** Se definen y cargan la configuración y las funciones de utilidad.
2.  **Definición del Pipeline:** Se define la lógica ETL principal encapsulada en una clase.
3.  **Ejecución del Pipeline:** Se ejecutan varios escenarios de prueba para demostrar la flexibilidad del pipeline.
4.  **Auditoría de Resultados:** Se verifican los datos escritos en la capa final.

## Fase 0: Configuración y Herramientas

En esta fase inicial, importamos las librerías necesarias, establecemos la estructura de directorios y generamos/cargamos la configuración `config.yaml` que gobernará todo el proceso.

#### 0.1 - Importación de Librerías

In [0]:
# --- Librerías Principales de PySpark ---
from pyspark.sql import SparkSession, DataFrame # Para iniciar Spark y manejar su estructura de datos principal (el DataFrame).
from pyspark.sql.functions import col, lit, when, to_date, current_timestamp, sum as spark_sum, count, isnull, concat_ws # Herramientas para manipular y transformar columnas.
from pyspark.sql.types import StringType, IntegerType, DoubleType, DateType, TimestampType, StructType, StructField # Para definir la estructura (schema) de nuestros datos.

# --- Librerías para Configuración y Sistema de Archivos ---
from omegaconf import OmegaConf # Ayuda a leer y manejar nuestro archivo de configuración YAML de forma sencilla.
import yaml # La librería base para trabajar con archivos YAML en Python.
from pyspark.dbutils import DBUtils # Utilidades de Databricks para interactuar con su sistema de archivos (DBFS).

# --- Librerías Auxiliares de Python ---
import os
from datetime import datetime

# Inicialización de utilidades de Databricks
dbutils = DBUtils(spark)

print("✅ Librerías y utilidades cargadas correctamente.")

In [0]:
# Versión de Apache Spark que está corriendo en el clúster
print(spark.version)

In [0]:
# Versión de la librería OmegaConf previamente instalada en el clúster
import omegaconf
print(omegaconf.__version__)

#### 0.2 - Creación de la Estructura de Directorios

Aseguramos que la estructura de carpetas necesaria para el pipeline exista en DBFS.

In [0]:
# Definimos la ruta base para todo el proyecto
base_path = "/FileStore/etl_pipeline_final"

# Lista de directorios a crear
directories = [
    f"{base_path}/config",
    f"{base_path}/bronze",      # Para los datos de entrada (crudos)
    f"{base_path}/silver",      # Para los datos procesados y limpios
    f"{base_path}/quarantine",  # Para los registros que no pasen las validaciones
    f"{base_path}/logs"
]

# Creamos los directorios
for directory in directories:
    dbutils.fs.mkdirs(directory)

print(f"✅ Estructura de directorios creada o verificada en: {base_path}")

#### 0.3 - Generación Dinámica del Archivo `config.yaml`

Para esta demostración, el archivo `config.yaml` se genera y guarda directamente desde el notebook. En un entorno productivo, este archivo existiría de forma independiente en el repositorio.

In [0]:
# Contenido completo del archivo de configuración
yaml_config_content = f"""
# =============================================================================
# Configuración Central para el Pipeline ETL de Entregas
# =============================================================================

# --- Rutas del Sistema de Archivos ---
paths:
  base: "{base_path}"
  input_data: "${{paths.base}}/bronze/entregas_productos_prueba.csv"
  output_processed: "${{paths.base}}/silver/entregas_productos_procesados"
  output_quarantine: "${{paths.base}}/quarantine/entregas_productos_invalidos"

# --- Parámetros de Ejecución ---
# Estos son los parámetros que típicamente se cambian en cada ejecución.
run_params:
  start_date: "2025-01-01"   # Formato: YYYY-MM-DD
  end_date: "2025-06-30"     # Formato: YYYY-MM-DD
  country_code: "GT"         # Código de país a procesar (ej: GT, PE, SV)

# --- Reglas de Negocio y Transformaciones ---
business_logic:
  unit_conversion:
    CS: 20
    ST: 1
  
  delivery_types:
    routine: ["ZPRE", "ZVE1"]
    bonus: ["Z04", "Z05"]

# --- Controles de Calidad de Datos ---
data_quality:
  allow_duplicates: false # Si es 'true', no se eliminarán filas duplicadas.
  
  # Columnas que no pueden ser nulas o vacías para que un registro sea válido.
  required_columns: ["pais", "fecha_proceso", "material", "cantidad", "unidad", "tipo_entrega"]

# --- Estructura y Enriquecimiento del Resultado Final ---
schema_and_enrichment:
  # Mapeo para estandarizar los nombres de las columnas.
  column_rename_map:
    pais: "codigo_pais"
    fecha_proceso: "fecha_proceso"
    transporte: "id_transporte"
    ruta: "id_ruta"
    material: "id_material"
    precio: "precio"
    cantidad: "cantidad_original"
    unidad: "unidad_original"
  
  # Mapeo para añadir el nombre completo del país.
  country_mapping:
    GT: "Guatemala"
    PE: "Perú"
    EC: "Ecuador"
    SV: "El Salvador"
    HN: "Honduras"
    JM: "Jamaica"

# --- Configuración de Salida ---
output:
  partition_column: "fecha_proceso"
  format: "parquet"
  mode: "overwrite"
"""

# Guardamos el contenido en un archivo en DBFS
config_path_dbfs = f"{base_path}/config/config.yaml"
dbutils.fs.put(config_path_dbfs, yaml_config_content, overwrite=True)

print(f"✅ Archivo config.yaml generado y guardado en: {config_path_dbfs}")
print("\n--- Vista Previa del Archivo ---")
print(dbutils.fs.head(config_path_dbfs, 500))

#### 0.4 - Carga de la Configuración

Leemos el archivo `config.yaml` y lo cargamos en un objeto de OmegaConf para un fácil acceso durante el pipeline.

In [0]:
def dbfs_to_local_path(dbfs_path: str) -> str:
    """Convierte una ruta DBFS a una ruta de sistema de archivos local."""
    return dbfs_path.replace("dbfs:", "/dbfs")

try:
    # Convertimos la ruta DBFS a una que OmegaConf pueda leer
    local_config_path = dbfs_to_local_path(config_path_dbfs)
    
    # Cargamos la configuración en una variable global para el notebook
    CONFIG = OmegaConf.load(local_config_path)
    
    print("✅ Configuración cargada exitosamente en la variable 'CONFIG'.")
    print(f"País a procesar por defecto: {CONFIG.run_params.country_code}")
    print(f"Ruta de entrada: {CONFIG.paths.input_data}")
    
except Exception as e:
    print(f"❌ Error al cargar la configuración: {e}")
    # Detenemos la ejecución si la configuración no se puede cargar
    dbutils.notebook.exit("Fallo en la carga de configuración.")

## Fase 1: Definición del Pipeline (Lógica ETL)

Encapsulamos toda la lógica de extracción, transformación y carga en una clase `DataPipeline`. Esto mantiene el código organizado, reutilizable y fácil de entender.

In [0]:
class DataPipeline:
    """
    Clase que contiene toda la lógica para procesar los datos de entregas.
    """
    def __init__(self, spark_session: SparkSession, config: OmegaConf):
        """
        Inicializa el pipeline con la sesión de Spark y la configuración.
        """
        self.spark = spark_session
        self.config = config
        
        # Definimos el esquema de entrada para una lectura de datos más robusta
        self.input_schema = StructType([
            StructField("pais", StringType(), True),
            StructField("fecha_proceso", StringType(), True), # Leemos como string para manejar el formato yyyyMMdd
            StructField("transporte", StringType(), True),
            StructField("ruta", StringType(), True),
            StructField("tipo_entrega", StringType(), True),
            StructField("material", StringType(), True),
            StructField("precio", DoubleType(), True),
            StructField("cantidad", DoubleType(), True),
            StructField("unidad", StringType(), True)
        ])

    def read_source_data(self) -> DataFrame:
        """Lee los datos desde la ruta de entrada especificada en la configuración."""
        print(f"📖 Leyendo datos desde {self.config.paths.input_data}...")
        df = self.spark.read \
            .option("header", "true") \
            .schema(self.input_schema) \
            .csv(self.config.paths.input_data)
        print(f"   Total de registros leídos: {df.count()}")
        return df

    def clean_and_quarantine_data(self, df: DataFrame) -> (DataFrame, DataFrame):
        """
        Aplica reglas de calidad, elimina duplicados y separa los datos válidos
        de los inválidos (cuarentena).
        """
        print("🧹 Aplicando limpieza y separando datos inválidos...")
        
        # 1. Manejo de duplicados
        if not self.config.data_quality.allow_duplicates:
            initial_count = df.count()
            df = df.dropDuplicates()
            print(f"   - Duplicados eliminados: {initial_count - df.count()} filas.")

        # 2. Identificar registros inválidos
        reasons = []
        # Chequeo de nulos en columnas requeridas
        for col_name in self.config.data_quality.required_columns:
            reasons.append(when(col(col_name).isNull(), f"Columna '{col_name}' es nula"))
        
        # Chequeo de tipos de entrega válidos
        valid_delivery_types = self.config.business_logic.delivery_types.routine + self.config.business_logic.delivery_types.bonus
        reasons.append(when(~col("tipo_entrega").isin(valid_delivery_types), "Tipo de entrega no válido"))

        # Chequeo de material vacío
        reasons.append(when(col("material") == "", "Material está vacío"))

        # Concatenar todas las razones de rechazo
        df_with_reason = df.withColumn("rejection_reason",
            concat_ws("; ", *[r for r in reasons if r is not None])
        )
        # Un registro es válido si no tiene razón de rechazo
        df_with_validity = df_with_reason.withColumn("is_valid", col("rejection_reason") == "")

        # 3. Separar en dos DataFrames
        df_clean = df_with_validity.filter(col("is_valid")).drop("is_valid", "rejection_reason")
        df_quarantine = df_with_validity.filter(~col("is_valid")).select(df.columns + ["rejection_reason"])

        print(f"   - Registros válidos para procesar: {df_clean.count()}")
        print(f"   - Registros enviados a cuarentena: {df_quarantine.count()}")
        
        return df_clean, df_quarantine

    def transform_data(self, df: DataFrame) -> DataFrame:
        """Aplica todas las transformaciones de negocio al DataFrame limpio."""
        print("✨ Aplicando transformaciones de negocio...")

        # 1. Convertir fecha al formato correcto y filtrar por rango
        df_transformed = df.withColumn("fecha_proceso_dt", to_date(col("fecha_proceso"), "yyyyMMdd"))
        
        df_filtered = df_transformed.filter(
            (col("fecha_proceso_dt") >= lit(self.config.run_params.start_date)) &
            (col("fecha_proceso_dt") <= lit(self.config.run_params.end_date)) &
            (col("pais") == lit(self.config.run_params.country_code))
        )
        print(f"   - Registros después de filtrar por fecha y país: {df_filtered.count()}")

        # 2. Renombrar columnas
        for old_name, new_name in self.config.schema_and_enrichment.column_rename_map.items():
            df_filtered = df_filtered.withColumnRenamed(old_name, new_name)
        print("   - Columnas renombradas según estándar.")

        # 3. Normalizar unidades
        unit_map = self.config.business_logic.unit_conversion
        df_filtered = df_filtered.withColumn("cantidad_total_unidades",
            when(col("unidad_original") == "CS", col("cantidad_original") * unit_map.CS)
            .when(col("unidad_original") == "ST", col("cantidad_original") * unit_map.ST)
            .otherwise(col("cantidad_original"))
        )
        print("   - Cantidades normalizadas a unidades.")

        # 4. Clasificar tipo de entrega
        delivery_map = self.config.business_logic.delivery_types
        df_filtered = df_filtered \
            .withColumn("es_entrega_rutina", col("tipo_entrega").isin(delivery_map.routine).cast("boolean")) \
            .withColumn("es_entrega_bonificacion", col("tipo_entrega").isin(delivery_map.bonus).cast("boolean"))
        print("   - Tipos de entrega clasificados.")

        # 5. Enriquecer con nombre del país
        country_map = self.config.schema_and_enrichment.country_mapping
        mapping_expr = when(col("codigo_pais") == "GT", country_map.GT)
        for code, name in country_map.items():
            mapping_expr = mapping_expr.when(col("codigo_pais") == code, name)
        df_filtered = df_filtered.withColumn("nombre_pais", mapping_expr.otherwise("Desconocido"))
        print("   - Nombre del país añadido.")

        # 6. Añadir metadatos de ejecución
        df_final = df_filtered.withColumn("fecha_ejecucion_etl", current_timestamp())
        print("   - Metadatos de ejecución añadidos.")

        return df_final
        
    def write_data(self, df: DataFrame, path: str):
        """Escribe un DataFrame en la ruta especificada, particionado por fecha."""
        if df.count() == 0:
            print(f"⚠️ No hay datos para escribir en {path}. Se omite la escritura.")
            return

        print(f"💾 Escribiendo {df.count()} registros en {path}...")
        
        df.write \
          .mode(self.config.output.mode) \
          .partitionBy(self.config.output.partition_column) \
          .format(self.config.output.format) \
          .save(path)
        
        print("   - Escritura completada.")

print("✅ Clase DataPipeline definida correctamente.")

## Fase 2: Orquestación y Ejecución del Pipeline

Aquí definimos una función `run_pipeline` que orquesta todos los pasos: instancia la clase `DataPipeline`, llama a sus métodos en orden y maneja la escritura de los datos procesados y de cuarentena.

In [0]:
def run_pipeline(config: OmegaConf, **overrides):
    """
    Orquesta la ejecución completa del pipeline para una configuración dada.
    Permite sobrescribir parámetros de la configuración para ejecuciones específicas.
    """
    run_config = config.copy()
    if overrides:
        # Si se pasan parámetros extra (ej: pais="PE"), se fusionan con la config
        print(f"🔩 Sobrescribiendo configuración con: {overrides}")
        override_conf = OmegaConf.create({"run_params": overrides})
        run_config = OmegaConf.merge(run_config, override_conf)

    execution_name = f"Ejecución para {run_config.run_params.country_code} ({run_config.run_params.start_date} a {run_config.run_params.end_date})"
    print(f"\n🚀 --- Iniciando: {execution_name} --- 🚀")
    
    # 1. Instanciar el pipeline
    pipeline = DataPipeline(spark, run_config)
    
    # 2. Leer datos
    df_raw = pipeline.read_source_data()
    
    # 3. Limpiar y separar
    df_clean, df_quarantine = pipeline.clean_and_quarantine_data(df_raw)
    
    # 4. Transformar
    df_transformed = pipeline.transform_data(df_clean)
    
    # 5. Inspección Pre-Escritura
    print("\n🕵️ --- Inspección Pre-Escritura (Datos Procesados) ---")
    if df_transformed.count() > 0:
        df_transformed.select(
            "codigo_pais", "fecha_proceso", "id_material", "cantidad_total_unidades", 
            "es_entrega_rutina", "es_entrega_bonificacion", "nombre_pais"
        ).show(5, truncate=False)
    else:
        print("No hay datos procesados para mostrar.")

    # 6. Escribir resultados
    pipeline.write_data(df_transformed, run_config.paths.output_processed)
    pipeline.write_data(df_quarantine, run_config.paths.output_quarantine)
    
    print(f"🏁 --- Finalizado: {execution_name} --- 🏁\n")


print("✅ Función de orquestación `run_pipeline` definida.")

## Fase 3: Ejecuciones de Prueba

Ahora usamos la función `run_pipeline` para ejecutar el proceso con diferentes configuraciones, demostrando su parametrización y flexibilidad.

#### 3.1 - Ejecución con Configuración por Defecto

In [0]:
# Ejecutar el pipeline con la configuración cargada por defecto desde el config.yaml (País: GT)
run_pipeline(CONFIG)

#### 3.2 - Ejecución para Múltiples Escenarios

Se define una lista de escenarios para procesar diferentes países y rangos de fechas en un solo bucle.

In [0]:
scenarios = [
    {"country_code": "PE", "start_date": "2025-01-01", "end_date": "2025-01-31"},
    {"country_code": "EC", "start_date": "2025-02-01", "end_date": "2025-02-28"},
    {"country_code": "SV", "start_date": "2025-03-01", "end_date": "2025-03-31"},
    {"country_code": "HN", "start_date": "2025-03-01", "end_date": "2025-03-31"},
    {"country_code": "JM", "start_date": "2025-06-01", "end_date": "2025-06-30"},
]

for scenario in scenarios:
    run_pipeline(CONFIG, **scenario)

## Fase 4: Auditoría de Resultados

Finalmente, verificamos los resultados escritos en el sistema de archivos. Creamos una función de auditoría para inspeccionar las particiones y leer una muestra de los datos guardados, tanto de los procesados como de los que están en cuarentena.

In [0]:
def audit_output_folder(path: str, description: str):
    """
    Lista las particiones de una ruta de salida y muestra una muestra
    de la primera partición que encuentre.
    """
    print(f"\n🔎 --- Auditoría de la Carpeta: {description} ---")
    print(f"Ruta: {path}")
    
    try:
        partitions = dbutils.fs.ls(path)
        dir_partitions = [p for p in partitions if p.isDir() and p.name.startswith(f"{CONFIG.output.partition_column}=")]
        
        if not dir_partitions:
            print("   - No se encontraron particiones.")
            return

        print(f"   - Se encontraron {len(dir_partitions)} particiones.")
        for p in dir_partitions[:3]: # Mostramos las primeras 3
            print(f"     - {p.name}")
        
        # Leer y mostrar una muestra de la primera partición
        first_partition_path = dir_partitions[0].path
        print(f"\n   - Leyendo muestra de la partición: {dir_partitions[0].name}")
        
        df_sample = spark.read.format(CONFIG.output.format).load(first_partition_path)
        df_sample.show(5, truncate=False)

    except Exception as e:
        if "java.io.FileNotFoundException" in str(e):
            print(f"   - La carpeta de salida no existe o está vacía.")
        else:
            print(f"   - ❌ Error durante la auditoría: {e}")

# Ejecutar la auditoría en las dos carpetas de salida
audit_output_folder(CONFIG.paths.output_processed, "Datos Procesados (Silver)")
audit_output_folder(CONFIG.paths.output_quarantine, "Datos en Cuarentena")

## Conclusión de la Solución y Resumen de Hallazgos

El pipeline desarrollado implementa una solución ETL completa y robusta, cumpliendo con todos los requisitos de la prueba técnica. Su diseño modular y parametrizable garantiza que la solución es mantenible, escalable y adaptable a futuras necesidades de negocio o cambios en las fuentes de datos.

### Manejo de Calidad de Datos y Hallazgos

Durante el análisis exploratorio, se identificaron varias irregularidades en los datos de origen:
- **Tipos de entrega no especificados:** Presencia del código `COBR` que no correspondía a `rutina` ni a `bonificación`.
- **Registros incompletos:** Filas con la columna `material` vacía.
- **Datos duplicados:** Existencia de registros idénticos.
- **Valores inválidos:** Precios con valor cero.

Para manejar estos casos de forma sistemática, se implementó un **mecanismo de cuarentena**. En lugar de simplemente descartar los datos, los registros que no cumplen con las reglas de validación son desviados a una ruta de salida separada. Crucialmente, se les añade una columna (`rejection_reason`) que detalla el motivo exacto del rechazo. Este enfoque permite una auditoría posterior completa y evita la pérdida de información.

### Características Destacadas de la Implementación

Para asegurar la robustez y calidad de la solución, se incorporaron las siguientes prácticas:

- **Diseño Orientado a Objetos:** La lógica ETL se encapsuló en la clase `DataPipeline`, promoviendo un código limpio, reutilizable y fácil de mantener.
- **Definición Explícita de Esquema:** Se definió un esquema estricto (`StructType`) para la lectura de los datos. Esto mejora el rendimiento y previene errores de inferencia de tipos que podrían ocurrir en un entorno productivo.
- **Estandarización y Enriquecimiento:** Se aplicó un estándar de nomenclatura consistente a todas las columnas finales y se enriquecieron los datos con información de valor, como el nombre completo del país y timestamps de procesamiento para auditoría.
- **Orquestación Flexible:** La función `run_pipeline` actúa como un orquestador que permite ejecutar el proceso para múltiples escenarios con diferentes parámetros, demostrando la alta flexibilidad de la solución.

### Consideraciones de Diseño

En un entorno de producción, el archivo `config.yaml` se gestionaría como un artefacto independiente dentro del control de versiones.

Para esta prueba técnica, se ha incluido la capacidad de generar y modificar la configuración de manera programática directamente desde el notebook. Esta decisión se tomó deliberadamente para demostrar un dominio avanzado de las herramientas (`dbutils`, `OmegaConf`, `PyYAML`) y para entregar un notebook interactivo y auto-contenido que permite probar diferentes escenarios sin necesidad de editar archivos manualmente.