# **DIA5. Manejo de errores y logging en ETL**
## Ejercicio: Construir pipeline ETL con manejo de errores completo
----

### 1.- Configurar logging

In [1]:
import logging
import time
from functools import wraps

# Configurar logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('etl_ecommerce.log'),
        logging.StreamHandler()
    ]
)

logger = logging.getLogger('etl_ecommerce')

def log_etapa(etapa):
    """Decorator para logging de etapas"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            logger.info(f"üöÄ Iniciando {etapa}")
            start_time = time.time()

            try:
                result = func(*args, **kwargs)
                duration = time.time() - start_time
                logger.info(f"‚úÖ {etapa} completada en {duration:.2f}s")
                return result
            except Exception as e:
                duration = time.time() - start_time
                logger.error(f"üí• {etapa} fall√≥ en {duration:.2f}s: {e}")
                raise e

        return wrapper
    return decorator

### 2.- Pipeline ETL con error handling

In [2]:
import pandas as pd
import numpy as np
from typing import Dict, Any

class ETLPipeline:
    def __init__(self):
        self.logger = logger
        self.errores = []

    @log_etapa("extracci√≥n de datos")
    def extract(self) -> pd.DataFrame:
        """Extraer datos con manejo de errores"""
        try:
            # Simular extracci√≥n (podr√≠a fallar)
            if np.random.random() < 0.1:  # 10% chance de error
                raise ConnectionError("Error de conexi√≥n a fuente de datos")

            # Datos de ejemplo
            datos = pd.DataFrame({
                'orden_id': range(1, 101),
                'cliente_id': np.random.randint(1, 21, 100),
                'producto': np.random.choice(['A', 'B', 'C', 'D'], 100),
                'cantidad': np.random.randint(1, 6, 100),
                'precio': np.round(np.random.uniform(10, 200, 100), 2)
            })

            self.logger.info(f"Extra√≠dos {len(datos)} registros")
            return datos

        except Exception as e:
            self.errores.append(f"Extract: {e}")
            raise e

    @log_etapa("transformaci√≥n de datos")
    def transform(self, datos: pd.DataFrame) -> pd.DataFrame:
        """Transformar datos con validaciones"""
        try:
            df = datos.copy()

            # Validar datos de entrada
            if df.empty:
                raise ValueError("No hay datos para transformar")

            # Transformaciones
            df['total'] = df['cantidad'] * df['precio']
            df['categoria_precio'] = pd.cut(
                df['precio'],
                bins=[0, 50, 100, 200],
                labels=['Bajo', 'Medio', 'Alto']
            )

            # Validar transformaciones
            if df['total'].isnull().any():
                raise ValueError("Transformaci√≥n produjo valores nulos")

            self.logger.info(f"Transformados {len(df)} registros")
            return df

        except Exception as e:
            self.errores.append(f"Transform: {e}")
            raise e

    @log_etapa("carga de datos")
    def load(self, datos: pd.DataFrame) -> bool:
        """Cargar datos con verificaci√≥n"""
        try:
            # Simular carga (podr√≠a fallar)
            if np.random.random() < 0.05:  # 5% chance de error
                raise Exception("Error de conexi√≥n a base de datos")

            # En producci√≥n: datos.to_sql('ventas', engine, if_exists='append')
            self.logger.info(f"Cargados {len(datos)} registros exitosamente")

            # Validar carga
            registros_esperados = len(datos)
            registros_cargados = len(datos)  # Simulado

            if registros_cargados != registros_esperados:
                raise ValueError(f"Carga incompleta: {registros_cargados}/{registros_esperados}")

            return True

        except Exception as e:
            self.errores.append(f"Load: {e}")
            raise e

    def ejecutar_pipeline(self) -> Dict[str, Any]:
        """Ejecutar pipeline completo con manejo de errores"""
        self.logger.info("üéØ Iniciando pipeline ETL completo")

        try:
            # Extract
            datos_crudo = self.extract()

            # Transform
            datos_transformados = self.transform(datos_crudo)

            # Load
            exito = self.load(datos_transformados)

            resultado = {
                'exito': True,
                'registros_procesados': len(datos_transformados),
                'errores': self.errores
            }

            self.logger.info("üéâ Pipeline ETL completado exitosamente")
            return resultado

        except Exception as e:
            self.logger.error(f"üö® Pipeline ETL fall√≥: {e}")

            return {
                'exito': False,
                'error_principal': str(e),
                'errores': self.errores
            }

### 3.- Ejecutar y validar pipeline

In [3]:
# Ejecutar pipeline con diferentes escenarios
pipeline = ETLPipeline()

# Ejecuci√≥n exitosa
resultado = pipeline.ejecutar_pipeline()

print(f"\nResultado del pipeline:")
print(f"√âxito: {resultado['exito']}")
if resultado['exito']:
    print(f"Registros procesados: {resultado['registros_procesados']}")
else:
    print(f"Error principal: {resultado['error_principal']}")

print(f"Errores registrados: {len(resultado['errores'])}")
for error in resultado['errores']:
    print(f"  - {error}")

# Ejecutar m√∫ltiples veces para probar robustez
resultados_multiples = []
for i in range(5):
    print(f"\n--- Ejecuci√≥n {i+1} ---")
    pipeline_i = ETLPipeline()
    resultado_i = pipeline_i.ejecutar_pipeline()
    resultados_multiples.append(resultado_i['exito'])

exito_rate = sum(resultados_multiples) / len(resultados_multiples)
print(".1%")

ERROR:etl_ecommerce:üí• extracci√≥n de datos fall√≥ en 0.00s: Error de conexi√≥n a fuente de datos
ERROR:etl_ecommerce:üö® Pipeline ETL fall√≥: Error de conexi√≥n a fuente de datos



Resultado del pipeline:
√âxito: True
Registros procesados: 100
Errores registrados: 0

--- Ejecuci√≥n 1 ---

--- Ejecuci√≥n 2 ---

--- Ejecuci√≥n 3 ---

--- Ejecuci√≥n 4 ---

--- Ejecuci√≥n 5 ---
.1%
