# Proceso ETL: Una Gu√≠a Pr√°ctica con Python

## üìö Objetivos de Aprendizaje

Al completar este notebook, los estudiantes ser√°n capaces de:
- Comprender los conceptos fundamentales de ETL (Extract, Transform, Load)
- Implementar cada fase del proceso ETL usando Python y pandas
- Aplicar t√©cnicas de limpieza y transformaci√≥n de datos
- Manejar errores comunes en pipelines de datos
- Dise√±ar pipelines ETL escalables y mantenibles

## üéØ ¬øQu√© es ETL?

**ETL** es un proceso fundamental en ingenier√≠a de datos que permite:
- **Extract (Extraer)**: Obtener datos de m√∫ltiples fuentes
- **Transform (Transformar)**: Limpiar, validar y estructurar los datos
- **Load (Cargar)**: Almacenar los datos procesados en el destino final

Este notebook es una gu√≠a completa que cubre desde conceptos b√°sicos hasta temas avanzados, con ejemplos pr√°cticos y ejercicios para trabajar con pipelines ETL en entornos reales de ingenier√≠a de datos.

## El Pipeline ETL de Referencia

Nos basaremos en la siguiente estructura de pipeline, que representa un flujo ETL cl√°sico:

In [None]:
def etl_pipeline():
    # 1. Extraer datos desde una fuente original
    raw_data = extract_from_database()
    
    # 2. Transformar los datos aplicando reglas y limpiando
    cleaned_data = apply_business_rules(raw_data)
    
    # 3. Normalizar el esquema para que sea consistente
    structured_data = normalize_schema(cleaned_data)
    
    # 4. Cargar los datos transformados a su destino final
    load_to_warehouse(structured_data)

A continuaci√≥n, implementaremos cada una de estas funciones con ejemplos claros.

## üì¶ Librer√≠as y Configuraci√≥n Inicial

Para nuestros ejemplos, usaremos las siguientes librer√≠as esenciales en ingenier√≠a de datos:

In [25]:
import pandas as pd          # Manipulaci√≥n y an√°lisis de datos
import sqlite3              # Base de datos SQL ligera
import numpy as np          # Operaciones num√©ricas
import json                 # Manejo de datos JSON
import logging              # Sistema de logs
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')

# Configuraci√≥n de logging para monitorear el pipeline
logging.basicConfig(level=logging.INFO, 
                    format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

## üß† Conceptos Fundamentales de ETL

### Tipos de Procesamiento ETL

1. **Batch Processing (Procesamiento por lotes)**
   - Procesa grandes vol√∫menes de datos en intervalos programados
   - Ideal para reportes diarios, semanales o mensuales
   - Ejemplo: Procesar todas las transacciones del d√≠a anterior

2. **Real-time Processing (Procesamiento en tiempo real)**
   - Procesa datos tan pronto como llegan
   - Cr√≠tico para aplicaciones que requieren respuesta inmediata
   - Ejemplo: Detecci√≥n de fraude en transacciones

3. **Near Real-time Processing (Procesamiento casi en tiempo real)**
   - Procesa datos con un peque√±o retraso (segundos o minutos)
   - Balance entre velocidad y eficiencia de recursos
   - Ejemplo: Actualizaci√≥n de dashboards cada 5 minutos

### Calidad de Datos - Las 6 Dimensiones

1. **Exactitud**: Los datos reflejan la realidad
2. **Completitud**: No hay valores faltantes cr√≠ticos
3. **Consistencia**: Los datos son coherentes entre sistemas
4. **Validez**: Los datos cumplen con reglas de negocio
5. **Unicidad**: No hay duplicados no deseados
6. **Actualidad**: Los datos est√°n actualizados

## 1. Extract: Extracci√≥n de Datos

### üéØ Objetivos de la Fase Extract
- Conectar con m√∫ltiples fuentes de datos
- Extraer datos de manera eficiente
- Manejar diferentes formatos y protocolos
- Implementar estrategias de extracci√≥n incremental

### üìä Fuentes de Datos Comunes
- **Bases de datos relacionales**: MySQL, PostgreSQL, SQL Server
- **Bases de datos NoSQL**: MongoDB, Cassandra, Redis
- **Archivos**: CSV, JSON, XML, Parquet
- **APIs REST**: Servicios web y microservicios
- **Sistemas de streaming**: Kafka, Kinesis

**Ejemplo:** Vamos a simular la extracci√≥n de datos de una base de datos SQL. Crearemos una peque√±a base de datos en memoria con `sqlite3` y luego la leeremos usando `pandas`.

In [37]:
def extract_from_database():
    """
    Simula la extracci√≥n de datos desde una base de datos.
    En un entorno real, esto se conectar√≠a a una base de datos real.
    
    Returns:
        pd.DataFrame: Datos crudos extra√≠dos
    """
    try:
        logger.info("Iniciando extracci√≥n de datos...")
        
        # Simulamos datos con problemas t√≠picos de calidad
        data = {
            'ID_USER': [101, 102, 103, 104, 105, 106],
            'user_name': ['Ana', 'Luis', 'Marta', 'Juan', 'Eva', None],  # Valor nulo
            'registration_date': ['2025-01-15', '2025-02-20', '2025-03-01', '2025-04-10', '2025-05-19', '2025-06-30'],
            'total_spent': [150.5, 80.0, -999, 200.0, 45.25, 'invalid'],  # Valor inv√°lido
            'country_code': ['ES', 'MX', 'ES', 'CO', 'es', 'ES'],  # Inconsistencia en may√∫sculas
            'email': ['ana@email.com', 'luis@email.com', 'marta@email.com', 'juan@email.com', 'eva@email.com', 'pedro@email.com']
        }
        
        df = pd.DataFrame(data)
        
        print("--- 1. Datos Crudos Extra√≠dos ---")
        print(f"Registros extra√≠dos: {len(df)}")
        print(f"Columnas: {list(df.columns)}")
        print("\nPrimeras filas:")
        print(df)
        print("\nInformaci√≥n del DataFrame:")
        print(df.info())
        print("\n geometria del DataFrame:")
        print(df.shape)
        print("\n Estad√≠sticas descriptivas:")
        print (df.describe())
        print('')
        
        logger.info(f"Extracci√≥n completada: {len(df)} registros")
        return df
        
    except Exception as e:
        logger.error(f"Error en la extracci√≥n: {str(e)}")
        raise

In [32]:
raw_data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 6 entries, 0 to 5
Data columns (total 6 columns):
 #   Column             Non-Null Count  Dtype 
---  ------             --------------  ----- 
 0   ID_USER            6 non-null      int64 
 1   user_name          5 non-null      object
 2   registration_date  6 non-null      object
 3   total_spent        6 non-null      object
 4   country_code       6 non-null      object
 5   email              6 non-null      object
dtypes: int64(1), object(5)
memory usage: 420.0+ bytes


### ‚ñ∂Ô∏è Ejecutemos la Extracci√≥n

Ahora vamos a ejecutar la funci√≥n de extracci√≥n para ver los datos:

In [38]:
# Ejecutar la funci√≥n de extracci√≥n
raw_data = extract_from_database()

2025-11-06 19:30:15,671 - INFO - Iniciando extracci√≥n de datos...
2025-11-06 19:30:15,682 - INFO - Extracci√≥n completada: 6 registros


--- 1. Datos Crudos Extra√≠dos ---
Registros extra√≠dos: 6
Columnas: ['ID_USER', 'user_name', 'registration_date', 'total_spent', 'country_code', 'email']

Primeras filas:
   ID_USER user_name registration_date total_spent country_code  \
0      101       Ana        2025-01-15       150.5           ES   
1      102      Luis        2025-02-20        80.0           MX   
2      103     Marta        2025-03-01        -999           ES   
3      104      Juan        2025-04-10       200.0           CO   
4      105       Eva        2025-05-19       45.25           es   
5      106      None        2025-06-30     invalid           ES   

             email  
0    ana@email.com  
1   luis@email.com  
2  marta@email.com  
3   juan@email.com  
4    eva@email.com  
5  pedro@email.com  

Informaci√≥n del DataFrame:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 6 entries, 0 to 5
Data columns (total 6 columns):
 #   Column             Non-Null Count  Dtype 
---  ------             -----------

### üìÅ Ejemplo: Extracci√≥n desde Archivo CSV

Veamos c√≥mo extraer datos desde un archivo CSV (otra fuente com√∫n):

In [None]:
def extract_from_csv(file_path='sample_products.csv'):
    """
    Extrae datos desde un archivo CSV.
    
    Args:
        file_path (str): Ruta del archivo CSV
    
    Returns:
        pd.DataFrame: Datos extra√≠dos del CSV
    """
    try:
        # Crear un archivo CSV de ejemplo
        sample_data = {
            'product_id': [1, 2, 3, 4, 5],
            'product_name': ['Laptop', 'Mouse', 'Keyboard', 'Monitor', 'Headphones'],
            'price': [999.99, 25.50, 75.00, 299.99, 89.99],
            'category': ['Electronics', 'Accessories', 'Accessories', 'Electronics', 'Accessories']
        }
        
        df_sample = pd.DataFrame(sample_data)
        df_sample.to_csv(file_path, index=False)
        
        # Leer el archivo CSV
        df = pd.read_csv(file_path)
        
        print("--- Extracci√≥n desde CSV ---")
        print(df)
        return df
        
    except FileNotFoundError:
        logger.error(f"Archivo no encontrado: {file_path}")
        return pd.DataFrame()
    except Exception as e:
        logger.error(f"Error leyendo CSV: {str(e)}")
        return pd.DataFrame()

# Ejemplo de uso
products_df = extract_from_csv()

## 2. Transform: Aplicaci√≥n de Reglas de Negocio

### üéØ Objetivos de la Fase Transform
- Limpiar datos inconsistentes o err√≥neos
- Validar que los datos cumplan reglas de negocio
- Enriquecer datos con informaci√≥n adicional
- Aplicar transformaciones matem√°ticas o l√≥gicas
- Normalizar formatos y estructuras

### üîß T√©cnicas Comunes de Transformaci√≥n
- **Limpieza**: Eliminar duplicados, corregir errores tipogr√°ficos
- **Validaci√≥n**: Verificar rangos, formatos, tipos de datos
- **Enriquecimiento**: Agregar datos calculados o de referencia
- **Normalizaci√≥n**: Estandarizar formatos y escalas
- **Agregaci√≥n**: Resumir datos por grupos o per√≠odos

**Ejemplo:** Aplicaremos las siguientes reglas:
1. Corregir valores err√≥neos: El valor `-999` en `total_spent` debe ser 0
2. Corregir inconsistencias: El c√≥digo de pa√≠s `es` debe ser `ES`
3. Filtrar datos: Solo usuarios con gasto mayor a 50
4. Manejar valores nulos en nombres de usuario

In [None]:
def apply_business_rules(raw_data):
    """
    Aplica reglas de negocio y limpieza de datos.
    
    Args:
        raw_data (pd.DataFrame): Datos crudos a transformar
    
    Returns:
        pd.DataFrame: Datos limpios y transformados
    """
    try:
        logger.info("Iniciando transformaci√≥n de datos...")
        cleaned_data = raw_data.copy()
        
        print("--- An√°lisis de Calidad de Datos ---")
        print(f"Registros iniciales: {len(cleaned_data)}")
        print(f"Valores nulos por columna:")
        print(cleaned_data.isnull().sum())
        print()
        
        # 1. Manejar valores nulos en user_name
        null_names = cleaned_data['user_name'].isnull().sum()
        if null_names > 0:
            print(f"‚ö†Ô∏è  Encontrados {null_names} nombres de usuario nulos")
            cleaned_data['user_name'] = cleaned_data['user_name'].fillna('Usuario_Desconocido')
        
        # 2. Corregir valores err√≥neos en total_spent
        # Convertir valores no num√©ricos a NaN, luego reemplazar -999 por 0
        cleaned_data['total_spent'] = pd.to_numeric(cleaned_data['total_spent'], errors='coerce')
        invalid_spent = cleaned_data['total_spent'].isnull().sum()
        if invalid_spent > 0:
            print(f"‚ö†Ô∏è  Encontrados {invalid_spent} valores inv√°lidos en total_spent")
            cleaned_data['total_spent'] = cleaned_data['total_spent'].fillna(0)
        
        cleaned_data['total_spent'] = cleaned_data['total_spent'].replace(-999, 0)
        
        # 3. Normalizar c√≥digos de pa√≠s
        cleaned_data['country_code'] = cleaned_data['country_code'].str.upper()
        
        # 4. Aplicar regla de negocio: solo usuarios con gasto > 50
        initial_count = len(cleaned_data)
        cleaned_data = cleaned_data[cleaned_data['total_spent'] > 50]
        filtered_count = initial_count - len(cleaned_data)
        
        print(f"üìä Registros filtrados (gasto <= 50): {filtered_count}")
        print(f"üìä Registros finales: {len(cleaned_data)}")
        
        print("\n--- 2. Datos Limpios (Reglas Aplicadas) ---")
        print(cleaned_data)
        print('')
        
        logger.info(f"Transformaci√≥n completada: {len(cleaned_data)} registros v√°lidos")
        return cleaned_data
        
    except Exception as e:
        logger.error(f"Error en la transformaci√≥n: {str(e)}")
        raise

### üîç T√©cnicas Avanzadas de Transformaci√≥n

Veamos algunas t√©cnicas adicionales de transformaci√≥n:

In [None]:
def advanced_transformations(df):
    """
    Aplica transformaciones avanzadas a los datos.
    
    Args:
        df (pd.DataFrame): DataFrame a transformar
    
    Returns:
        pd.DataFrame: DataFrame con transformaciones aplicadas
    """
    df_transformed = df.copy()
    
    # 1. Crear columnas derivadas
    df_transformed['registration_year'] = pd.to_datetime(df_transformed['registration_date']).dt.year
    df_transformed['registration_month'] = pd.to_datetime(df_transformed['registration_date']).dt.month
    
    # 2. Categorizar gastos
    def categorize_spending(amount):
        if amount < 100:
            return 'Bajo'
        elif amount < 200:
            return 'Medio'
        else:
            return 'Alto'
    
    df_transformed['spending_category'] = df_transformed['total_spent'].apply(categorize_spending)
    
    # 3. Validar emails (ejemplo b√°sico)
    if 'email' in df_transformed.columns:
        df_transformed['email_valid'] = df_transformed['email'].str.contains('@', na=False)
    
    print("--- Transformaciones Avanzadas ---")
    print(df_transformed[['user_name', 'total_spent', 'spending_category', 'registration_year']].head())
    
    return df_transformed

# Esta funci√≥n se puede usar despu√©s de apply_business_rules
# df_advanced = advanced_transformations(cleaned_data)

## 3. Transform: Normalizaci√≥n del Esquema

### üéØ Objetivos de la Normalizaci√≥n
- Estandarizar nombres de columnas
- Asegurar tipos de datos correctos
- Mantener consistencia entre sistemas
- Facilitar la integraci√≥n con el destino

El objetivo es asegurar que el esquema de los datos (nombres de columnas, tipos de datos) sea consistente.

**Ejemplo:** Vamos a:
1. Cambiar nombres de columnas a un formato est√°ndar (snake_case)
2. Asegurar que `registration_date` sea de tipo fecha
3. Reordenar columnas seg√∫n el esquema del destino

In [None]:
def normalize_schema(cleaned_data):
    """
    Normaliza el esquema de datos seg√∫n est√°ndares del data warehouse.
    
    Args:
        cleaned_data (pd.DataFrame): Datos limpios a normalizar
    
    Returns:
        pd.DataFrame: Datos con esquema normalizado
    """
    try:
        logger.info("Iniciando normalizaci√≥n del esquema...")
        structured_data = cleaned_data.copy()
        
        print("--- An√°lisis del Esquema Original ---")
        print(f"Columnas originales: {list(structured_data.columns)}")
        print(f"Tipos de datos originales:")
        print(structured_data.dtypes)
        print()
        
        # 1. Normalizar nombres de columnas (snake_case)
        column_mapping = {
            'ID_USER': 'user_id',
            'user_name': 'user_name',
            'registration_date': 'registration_date',
            'total_spent': 'total_spent',
            'country_code': 'country_code',
            'email': 'email_address'
        }
        
        # Solo renombrar columnas que existen
        existing_columns = {k: v for k, v in column_mapping.items() if k in structured_data.columns}
        structured_data = structured_data.rename(columns=existing_columns)
        
        # 2. Convertir tipos de datos
        structured_data['registration_date'] = pd.to_datetime(structured_data['registration_date'])
        structured_data['user_id'] = structured_data['user_id'].astype('int64')
        structured_data['total_spent'] = structured_data['total_spent'].astype('float64')
        
        # 3. Agregar metadatos de procesamiento
        structured_data['processed_at'] = datetime.now()
        structured_data['data_source'] = 'user_database'
        
        # 4. Reordenar columnas seg√∫n esquema del data warehouse
        column_order = ['user_id', 'user_name', 'email_address', 'country_code', 
                       'registration_date', 'total_spent', 'processed_at', 'data_source']
        
        # Solo incluir columnas que existen
        available_columns = [col for col in column_order if col in structured_data.columns]
        structured_data = structured_data[available_columns]
        
        print("--- 3. Datos Estructurados (Esquema Normalizado) ---")
        print(f"Columnas finales: {list(structured_data.columns)}")
        print(structured_data)
        print("\nInformaci√≥n del esquema final:")
        print(structured_data.info())
        print('')
        
        logger.info(f"Normalizaci√≥n completada: {len(structured_data)} registros estructurados")
        return structured_data
        
    except Exception as e:
        logger.error(f"Error en la normalizaci√≥n: {str(e)}")
        raise

## 4. Load: Carga de Datos

### üéØ Objetivos de la Fase Load
- Cargar datos en el sistema de destino
- Mantener integridad referencial
- Optimizar el rendimiento de carga
- Implementar estrategias de carga incremental

### üìä Estrategias de Carga
- **Full Load**: Carga completa de todos los datos
- **Incremental Load**: Solo datos nuevos o modificados
- **Upsert**: Insertar nuevos registros, actualizar existentes
- **SCD (Slowly Changing Dimensions)**: Manejo de cambios hist√≥ricos

La fase final es cargar los datos transformados en el sistema de destino (Data Warehouse, Data Lake, etc.).

**Ejemplo:** Simularemos diferentes tipos de carga:

In [None]:
def load_to_warehouse(structured_data, load_type='full'):
    """
    Carga datos al data warehouse con diferentes estrategias.
    
    Args:
        structured_data (pd.DataFrame): Datos estructurados a cargar
        load_type (str): Tipo de carga ('full', 'incremental', 'upsert')
    
    Returns:
        bool: True si la carga fue exitosa
    """
    try:
        logger.info(f"Iniciando carga de datos - Tipo: {load_type}")
        
        # Validaciones pre-carga
        if structured_data.empty:
            logger.warning("No hay datos para cargar")
            return False
        
        print("--- Validaciones Pre-Carga ---")
        print(f"Registros a cargar: {len(structured_data)}")
        print(f"Columnas: {list(structured_data.columns)}")
        
        # Verificar duplicados
        duplicates = structured_data.duplicated(subset=['user_id']).sum()
        if duplicates > 0:
            print(f"‚ö†Ô∏è  Encontrados {duplicates} registros duplicados")
            structured_data = structured_data.drop_duplicates(subset=['user_id'], keep='last')
        
        # Simular diferentes tipos de carga
        if load_type == 'full':
            # Carga completa - reemplaza todos los datos
            filename = 'data_warehouse_users_full.csv'
            structured_data.to_csv(filename, index=False)
            print(f"‚úÖ Carga completa realizada: {filename}")
            
        elif load_type == 'incremental':
            # Carga incremental - solo nuevos registros
            filename = 'data_warehouse_users_incremental.csv'
            # En un caso real, verificar√≠amos qu√© registros ya existen
            structured_data.to_csv(filename, mode='a', header=False, index=False)
            print(f"‚úÖ Carga incremental realizada: {filename}")
            
        elif load_type == 'upsert':
            # Upsert - insertar nuevos, actualizar existentes
            filename = 'data_warehouse_users_upsert.csv'
            structured_data.to_csv(filename, index=False)
            print(f"‚úÖ Upsert realizado: {filename}")
        
        print("\n--- 4. Datos Cargados ---")
        print(f"Registros cargados exitosamente: {len(structured_data)}")
        print("\nPrimeras filas del archivo de destino:")
        
        # Mostrar contenido del archivo
        with open(filename, 'r') as f:
            lines = f.readlines()[:6]  # Mostrar solo las primeras 6 l√≠neas
            print(''.join(lines))
        
        # Estad√≠sticas de carga
        print("\n--- Estad√≠sticas de Carga ---")
        print(f"üìä Total de registros: {len(structured_data)}")
        print(f"üìä Pa√≠ses √∫nicos: {structured_data['country_code'].nunique()}")
        print(f"üìä Gasto promedio: ${structured_data['total_spent'].mean():.2f}")
        print(f"üìä Gasto total: ${structured_data['total_spent'].sum():.2f}")
        
        logger.info(f"Carga completada exitosamente: {len(structured_data)} registros")
        return True
        
    except Exception as e:
        logger.error(f"Error en la carga: {str(e)}")
        return False

## Ejecuci√≥n del Pipeline Completo

Finalmente, orquestamos todas las funciones en el pipeline principal.

In [None]:
def etl_pipeline_with_monitoring():
    """
    Pipeline ETL completo con monitoreo y manejo de errores.
    """
    start_time = datetime.now()
    
    try:
        print("üöÄ Iniciando Pipeline ETL")
        print(f"‚è∞ Hora de inicio: {start_time}")
        print("=" * 50)
        
        # Extract
        raw_data = extract_from_database()
        
        # Transform
        cleaned_data = apply_business_rules(raw_data)
        
        # Normalize
        structured_data = normalize_schema(cleaned_data)
        
        # Load
        success = load_to_warehouse(structured_data, load_type='full')
        
        # M√©tricas finales
        end_time = datetime.now()
        duration = end_time - start_time
        
        print("\n" + "=" * 50)
        print("üìä RESUMEN DEL PIPELINE ETL")
        print("=" * 50)
        print(f"‚úÖ Estado: {'EXITOSO' if success else 'FALLIDO'}")
        print(f"‚è±Ô∏è  Duraci√≥n total: {duration.total_seconds():.2f} segundos")
        print(f"üì• Registros extra√≠dos: {len(raw_data)}")
        print(f"üîÑ Registros procesados: {len(cleaned_data)}")
        print(f"üì§ Registros cargados: {len(structured_data)}")
        print(f"üìâ Tasa de filtrado: {((len(raw_data) - len(structured_data)) / len(raw_data) * 100):.1f}%")
        
        return structured_data
        
    except Exception as e:
        logger.error(f"Pipeline fall√≥: {str(e)}")
        print(f"‚ùå Error en el pipeline: {str(e)}")
        return None

# Ejecutar el pipeline completo
result = etl_pipeline_with_monitoring()

## üîß Ejecuci√≥n Paso a Paso

Si quieres ver cada paso por separado, ejecuta las siguientes celdas una por una:

In [None]:
# Paso 1: Extracci√≥n
print("=== PASO 1: EXTRACCI√ìN ===")
raw_data_step = extract_from_database()

In [None]:
# Paso 2: Transformaci√≥n
print("=== PASO 2: TRANSFORMACI√ìN ===")
cleaned_data_step = apply_business_rules(raw_data_step)

In [None]:
# Paso 3: Normalizaci√≥n
print("=== PASO 3: NORMALIZACI√ìN ===")
structured_data_step = normalize_schema(cleaned_data_step)

In [None]:
# Paso 4: Carga
print("=== PASO 4: CARGA ===")
success = load_to_warehouse(structured_data_step, load_type='full')
print(f"\n‚úÖ Pipeline completado exitosamente: {success}")