<p style="margin: 5px 0 0 0; color: #666;"><em>Desarrollado con Claude - Anthropic</em></p>

# 12. Conceptos de Ingenier√≠a de Datos B√°sicos

In [5]:
import pandas as pd
import numpy as np
import json
import requests
import time
from datetime import datetime
import os
import warnings
import pyarrow
warnings.filterwarnings('ignore')

# Pre-registrar tipos de extensi√≥n pyarrow para evitar conflictos al re-ejecutar celdas
try:
    import pandas.core.arrays.arrow.extension_types
except Exception:
    pass

print("‚úì M√≥dulos importados correctamente")
print(f"Fecha actual: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

‚úì M√≥dulos importados correctamente
Fecha actual: 2026-02-21 22:22:15


## Introducci√≥n a la Ingenier√≠a de Datos

### ¬øQu√© es?

La **ingenier√≠a de datos** es la disciplina que se encarga de dise√±ar, construir y mantener los sistemas e infraestructuras necesarios para recopilar, almacenar, transformar y distribuir datos. Es el puente entre las fuentes de datos en bruto y los equipos de an√°lisis que extraen valor de ellos.

**Ingeniero de Datos vs Analista de Datos:**

| Aspecto | Ingeniero de Datos | Analista de Datos |
|---|---|---|
| Enfoque | Infraestructura y arquitectura | Estad√≠stica y visualizaci√≥n |
| Construye | Pipelines y automatizaciones | Insights y reportes |
| Tecnolog√≠as | Spark, Airflow, Kafka | Python, SQL, Power BI |
| Objetivo | Datos disponibles y confiables | Valor y decisiones basadas en datos |

**Habilidades compartidas:** SQL avanzado, Python, comprensi√≥n de datos, resoluci√≥n de problemas y documentaci√≥n.

**Ciclo de vida de los datos:**

1. **Fuentes de datos** ‚Üí Bases de datos, APIs, archivos, streaming
2. **Ingesta** ‚Üí Extracci√≥n batch o streaming
3. **Almacenamiento crudo** ‚Üí Data Lake con formatos eficientes (Parquet, Avro)
4. **Transformaci√≥n** ‚Üí Limpieza, normalizaci√≥n, enriquecimiento
5. **Almacenamiento procesado** ‚Üí Data Warehouse optimizado para an√°lisis
6. **Consumo** ‚Üí Dashboards, ML, APIs, an√°lisis ad-hoc

### ¬øPara qu√© sirve?

Comprender los fundamentos de ingenier√≠a de datos sirve para:

- **Entender de d√≥nde vienen los datos** y c√≥mo llegan hasta las herramientas de an√°lisis
- **Ser un analista m√°s vers√°til** con capacidad de gestionar el ciclo completo de los datos
- **Manejar datos de forma m√°s eficiente** eligiendo formatos y arquitecturas adecuadas
- **Automatizar tareas repetitivas** como reportes, descargas y limpiezas peri√≥dicas
- **Colaborar mejor con ingenieros de datos** al compartir vocabulario t√©cnico
- **Crear pipelines b√°sicos** que automatizan la recolecci√≥n y transformaci√≥n de datos

### ¬øC√≥mo se usa?

En el c√≥digo siguiente, simulamos un mini flujo de datos que recorre las etapas del ciclo de vida: desde la extracci√≥n hasta el an√°lisis.

In [6]:
print("=" * 70)
print("EJEMPLO: MINI CICLO DE VIDA DE DATOS".center(70))
print("=" * 70)

# 1. FUENTE: Simular datos crudos de un sistema transaccional
print("\n1Ô∏è‚É£ FUENTE - Datos crudos del sistema:")
datos_crudos = pd.DataFrame({
    'id': [1, 2, 3, 4, 5],
    'producto': ['Laptop', 'mouse', 'TECLADO', 'Monitor', None],
    'precio': [1200, 25, 'N/A', 350, 80],
    'fecha': ['2024-01-15', '2024/01/16', '15-01-2024', '2024-01-17', '2024-01-18']
})
print(datos_crudos)
print(f"   Problemas: may√∫sculas inconsistentes, nulo, tipo incorrecto, fechas mixtas")

# 2. TRANSFORMACI√ìN: Limpiar y normalizar
print("\n2Ô∏è‚É£ TRANSFORMACI√ìN - Limpieza y normalizaci√≥n:")
datos_limpios = datos_crudos.copy()
datos_limpios['producto'] = datos_limpios['producto'].fillna('Desconocido').str.capitalize()
datos_limpios['precio'] = pd.to_numeric(datos_limpios['precio'], errors='coerce').fillna(0)
datos_limpios['fecha'] = pd.to_datetime(datos_limpios['fecha'], format='mixed', dayfirst=False)
print(datos_limpios)
print(f"   Tipos: {dict(datos_limpios.dtypes)}")

# 3. CONSUMO: An√°lisis r√°pido
print("\n3Ô∏è‚É£ CONSUMO - An√°lisis listo para dashboards:")
print(f"   Total productos: {len(datos_limpios)}")
print(f"   Precio promedio: ${datos_limpios['precio'].mean():.2f}")
print(f"   Rango de fechas: {datos_limpios['fecha'].min().date()} a {datos_limpios['fecha'].max().date()}")

print("\n‚úÖ Datos pasaron de crudos a listos para an√°lisis")

                 EJEMPLO: MINI CICLO DE VIDA DE DATOS                 

1Ô∏è‚É£ FUENTE - Datos crudos del sistema:
   id producto precio       fecha
0   1   Laptop   1200  2024-01-15
1   2    mouse     25  2024/01/16
2   3  TECLADO    N/A  15-01-2024
3   4  Monitor    350  2024-01-17
4   5      NaN     80  2024-01-18
   Problemas: may√∫sculas inconsistentes, nulo, tipo incorrecto, fechas mixtas

2Ô∏è‚É£ TRANSFORMACI√ìN - Limpieza y normalizaci√≥n:
   id     producto  precio      fecha
0   1       Laptop  1200.0 2024-01-15
1   2        Mouse    25.0 2024-01-16
2   3      Teclado     0.0 2024-01-15
3   4      Monitor   350.0 2024-01-17
4   5  Desconocido    80.0 2024-01-18
   Tipos: {'id': dtype('int64'), 'producto': <StringDtype(na_value=nan)>, 'precio': dtype('float64'), 'fecha': dtype('<M8[us]')}

3Ô∏è‚É£ CONSUMO - An√°lisis listo para dashboards:
   Total productos: 5
   Precio promedio: $331.00
   Rango de fechas: 2024-01-15 a 2024-01-18

‚úÖ Datos pasaron de crudos a listos para an

## 1. ETL vs ELT

### ¬øQu√© es?

**ETL (Extract, Transform, Load)** y **ELT (Extract, Load, Transform)** son los dos paradigmas principales para mover datos desde sus fuentes hasta un destino anal√≠tico.

**ETL** ‚Äî Paradigma tradicional:
> `[Fuentes] ‚Üí EXTRACT ‚Üí TRANSFORM ‚Üí LOAD ‚Üí [Data Warehouse]`

Transforma los datos **antes** de cargarlos. Los datos llegan ya procesados al destino.

**ELT** ‚Äî Paradigma moderno (cloud):
> `[Fuentes] ‚Üí EXTRACT ‚Üí LOAD ‚Üí TRANSFORM ‚Üí [Consultas]`

Carga los datos en bruto y los transforma **dentro** del warehouse aprovechando su potencia.

| Aspecto | ETL | ELT |
|---|---|---|
| Transformaci√≥n | Antes de cargar | Despu√©s de cargar |
| Velocidad | M√°s lento | M√°s r√°pido |
| Datos crudos | No disponibles | Disponibles |
| Flexibilidad | Menor | Mayor |
| Infraestructura | Servidor propio | Cloud warehouse |
| Privacidad | Mejor (transforma antes) | Requiere gobernanza |

**¬øCu√°l usar?**

- **ETL** cuando hay transformaciones complejas, preocupaciones de privacidad o el warehouse tiene recursos limitados
- **ELT** cuando usas plataformas cloud (Snowflake, BigQuery) y necesitas flexibilidad
- **Tendencia actual:** ELT gana popularidad. Herramientas modernas: dbt, Dataform, Matillion

### ¬øPara qu√© sirve?

Conocer ETL y ELT sirve para:

- **Elegir la arquitectura correcta** seg√∫n los recursos disponibles y requisitos del proyecto
- **Dise√±ar flujos de datos** que lleven informaci√≥n desde m√∫ltiples fuentes hasta un repositorio centralizado
- **Limpiar y transformar datos** de forma sistem√°tica antes o despu√©s de almacenarlos
- **Comprender las tendencias actuales** del ecosistema de datos
- **Implementar pipelines reproducibles** que garanticen la calidad y consistencia de los datos
- **Aprovechar plataformas cloud** modernas con el enfoque ELT

### ¬øC√≥mo se usa?

En el c√≥digo siguiente, implementamos un proceso ETL completo en Python: extracci√≥n de datos de ventas, transformaci√≥n con limpieza y columnas calculadas, y carga al destino final.

In [7]:
print("=" * 70)
print("EJEMPLO: ETL SIMPLE EN PYTHON".center(70))
print("=" * 70)

# 1. EXTRACT: Simular extracci√≥n de datos de m√∫ltiples fuentes
print("\n1Ô∏è‚É£ EXTRACT - Extrayendo datos...")

ventas_raw = {
    'id': [1, 2, 3, 4, 5],
    'producto': ['laptop', 'mouse', 'TECLADO', 'Monitor', 'laptop'],
    'cantidad': [2, 5, None, 1, 3],
    'precio': [1000, 25, 50, 300, 1000],
    'fecha': ['2024-01-15', '2024-01-16', '2024-01-16', None, '2024-01-17']
}

df_ventas = pd.DataFrame(ventas_raw)
print(f"‚úì Extra√≠dos {len(df_ventas)} registros de ventas")
print(df_ventas)

# 2. TRANSFORM: Limpiar y transformar
print("\n2Ô∏è‚É£ TRANSFORM - Transformando y limpiando datos...")

df_transformado = df_ventas.copy()

# Normalizar nombres de productos
df_transformado['producto'] = df_transformado['producto'].str.lower().str.strip()

# Rellenar valores faltantes
df_transformado['cantidad'].fillna(1, inplace=True)
df_transformado['fecha'].fillna('2024-01-15', inplace=True)

# Crear columnas calculadas
df_transformado['total'] = df_transformado['cantidad'] * df_transformado['precio']
df_transformado['fecha'] = pd.to_datetime(df_transformado['fecha'])

# Agregar informaci√≥n adicional
df_transformado['a√±o'] = df_transformado['fecha'].dt.year
df_transformado['mes'] = df_transformado['fecha'].dt.month

print("‚úì Datos transformados:")
print(df_transformado)

# 3. LOAD: Cargar a destino
print("\n3Ô∏è‚É£ LOAD - Cargando datos procesados...")

# En producci√≥n: cargar√≠as a base de datos
# df_transformado.to_sql('ventas', con=engine, if_exists='append')

# Para ejemplo: guardar a CSV
output_file = 'ventas_procesadas.csv'
df_transformado.to_csv(output_file, index=False)
print(f"‚úì Datos cargados en {output_file}")

# Resumen del proceso
print("\n" + "=" * 70)
print("RESUMEN DEL PROCESO ETL".center(70))
print("=" * 70)
print(f"""
Registros extra√≠dos:    {len(df_ventas)}
Registros transformados: {len(df_transformado)}
Registros cargados:     {len(df_transformado)}
Columnas originales:    {len(df_ventas.columns)}
Columnas finales:       {len(df_transformado.columns)}
Valores nulos corregidos: {df_ventas.isna().sum().sum()}
""")

# Limpiar archivo de ejemplo
if os.path.exists(output_file):
    os.remove(output_file)
    print(f"‚úì Archivo temporal {output_file} eliminado")

                    EJEMPLO: ETL SIMPLE EN PYTHON                     

1Ô∏è‚É£ EXTRACT - Extrayendo datos...
‚úì Extra√≠dos 5 registros de ventas
   id producto  cantidad  precio       fecha
0   1   laptop       2.0    1000  2024-01-15
1   2    mouse       5.0      25  2024-01-16
2   3  TECLADO       NaN      50  2024-01-16
3   4  Monitor       1.0     300         NaN
4   5   laptop       3.0    1000  2024-01-17

2Ô∏è‚É£ TRANSFORM - Transformando y limpiando datos...
‚úì Datos transformados:
   id producto  cantidad  precio      fecha   total     a√±o  mes
0   1   laptop       2.0    1000 2024-01-15  2000.0  2024.0  1.0
1   2    mouse       5.0      25 2024-01-16   125.0  2024.0  1.0
2   3  teclado       NaN      50 2024-01-16     NaN  2024.0  1.0
3   4  monitor       1.0     300        NaT   300.0     NaN  NaN
4   5   laptop       3.0    1000 2024-01-17  3000.0  2024.0  1.0

3Ô∏è‚É£ LOAD - Cargando datos procesados...
‚úì Datos cargados en ventas_procesadas.csv

                     

## 2. Pipelines de Datos

### ¬øQu√© es?

Un **pipeline de datos** es una secuencia automatizada de pasos que mueve y transforma datos desde un origen hasta un destino. Funciona como una cadena de montaje digital: cada etapa recibe datos, los procesa y los pasa a la siguiente.

**Componentes t√≠picos:**

1. **Source** ‚Üí Base de datos, API, archivos, sensores/IoT
2. **Ingestion** ‚Üí Conectores, schedulers, validaci√≥n inicial
3. **Processing** ‚Üí Transformaciones, limpieza, agregaciones
4. **Storage** ‚Üí Data Lake, Data Warehouse, cach√©
5. **Serving** ‚Üí APIs, dashboards, reportes, modelos ML

**Tipos de pipelines:**

| Tipo | Descripci√≥n | Ejemplo |
|---|---|---|
| **Batch** | Procesa en intervalos (horario, diario) | Reporte diario de ventas |
| **Streaming** | Procesa continuamente, baja latencia | Detecci√≥n de fraude en tiempo real |
| **H√≠brido** | Combina batch y streaming | Alertas streaming + an√°lisis hist√≥rico batch |

**Herramientas comunes:** Scripts Python (simple), Apache Airflow (est√°ndar industria), Prefect/Dagster (modernas), Cloud (AWS Glue, GCP Dataflow, Azure Data Factory), Streaming (Kafka, Spark Streaming).

**Mejores pr√°cticas:** Idempotencia, manejo de errores con retries, logging completo, testing con datos de prueba, monitoreo con alertas, documentaci√≥n y versionado.

### ¬øPara qu√© sirve?

Construir pipelines de datos sirve para:

- **Automatizar flujos repetitivos** eliminando la intervenci√≥n manual en procesos de datos
- **Conectar m√∫ltiples sistemas** desde fuentes de origen hasta almacenamiento y herramientas de BI
- **Elegir entre batch y streaming** seg√∫n las necesidades de latencia del negocio
- **Escalar el procesamiento** conforme crecen los vol√∫menes de datos
- **Monitorear y alertar** cuando algo falla en el procesamiento de datos
- **Garantizar reproducibilidad** donde los mismos inputs siempre producen los mismos outputs

### ¬øC√≥mo se usa?

En el c√≥digo siguiente, construimos un pipeline con una clase Python que implementa los pasos de extracci√≥n, validaci√≥n, transformaci√≥n y carga, con logging de eventos y manejo de errores.

In [8]:
print("=" * 70)
print("EJEMPLO: PIPELINE DE PROCESAMIENTO".center(70))
print("=" * 70)

class DataPipeline:
    """Pipeline simple de procesamiento de datos"""
    
    def __init__(self, name):
        self.name = name
        self.logs = []
        
    def log(self, message):
        """Registrar eventos del pipeline"""
        timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        log_entry = f"[{timestamp}] {message}"
        self.logs.append(log_entry)
        print(f"üìù {log_entry}")
    
    def extract(self, source):
        """Paso 1: Extraer datos"""
        self.log(f"EXTRACT: Extrayendo datos de {source}...")
        time.sleep(0.5)  # Simular tiempo de extracci√≥n
        
        # Simular datos extra√≠dos
        data = pd.DataFrame({
            'id': range(1, 101),
            'valor': np.random.randint(10, 1000, 100),
            'categoria': np.random.choice(['A', 'B', 'C'], 100),
            'fecha': pd.date_range('2024-01-01', periods=100, freq='D')
        })
        
        self.log(f"‚úì Extra√≠dos {len(data)} registros")
        return data
    
    def validate(self, data):
        """Paso 2: Validar datos"""
        self.log("VALIDATE: Validando calidad de datos...")
        time.sleep(0.3)
        
        # Validaciones
        checks = {
            'No hay datos': len(data) == 0,
            'Valores nulos': data.isna().any().any(),
            'Duplicados': data.duplicated().any()
        }
        
        issues = [check for check, failed in checks.items() if failed]
        
        if issues:
            self.log(f"‚ö†Ô∏è Problemas encontrados: {', '.join(issues)}")
        else:
            self.log("‚úì Validaci√≥n exitosa, datos limpios")
            
        return data, issues
    
    def transform(self, data):
        """Paso 3: Transformar datos"""
        self.log("TRANSFORM: Aplicando transformaciones...")
        time.sleep(0.5)
        
        # Agregar columnas calculadas
        data['valor_normalizado'] = (data['valor'] - data['valor'].mean()) / data['valor'].std()
        
        # Agregar informaci√≥n temporal
        data['dia_semana'] = data['fecha'].dt.day_name()
        
        # Agrupar y agregar
        resumen = data.groupby('categoria').agg({
            'valor': ['count', 'mean', 'sum'],
            'valor_normalizado': 'mean'
        }).round(2)
        
        self.log(f"‚úì Transformaciones aplicadas")
        return data, resumen
    
    def load(self, data, destination):
        """Paso 4: Cargar datos"""
        self.log(f"LOAD: Cargando datos a {destination}...")
        time.sleep(0.3)
        
        # En producci√≥n: cargar a base de datos
        # data.to_sql(destination, con=engine, if_exists='replace')
        
        self.log(f"‚úì {len(data)} registros cargados exitosamente")
        return True
    
    def run(self, source, destination):
        """Ejecutar pipeline completo"""
        print(f"\n{'='*70}")
        print(f"üöÄ Iniciando Pipeline: {self.name}".center(70))
        print(f"{'='*70}\n")
        
        try:
            # 1. Extract
            data = self.extract(source)
            
            # 2. Validate
            data, issues = self.validate(data)
            
            # 3. Transform
            data_transformed, resumen = self.transform(data)
            
            # 4. Load
            success = self.load(data_transformed, destination)
            
            # Resumen
            print(f"\n{'='*70}")
            print("‚úÖ PIPELINE COMPLETADO EXITOSAMENTE".center(70))
            print(f"{'='*70}\n")
            
            print("üìä RESUMEN POR CATEGOR√çA:")
            print(resumen)
            
            return data_transformed
            
        except Exception as e:
            self.log(f"‚ùå ERROR: {str(e)}")
            print(f"\n{'='*70}")
            print("‚ùå PIPELINE FALL√ì".center(70))
            print(f"{'='*70}")
            raise
        
        finally:
            print(f"\nüìã Total de eventos registrados: {len(self.logs)}")

# Ejecutar pipeline
pipeline = DataPipeline("Procesamiento Ventas Diario")
datos_finales = pipeline.run(
    source="base_datos_ventas",
    destination="warehouse_ventas_procesadas"
)

print("\nüí° Este es un pipeline simple. En producci√≥n:")
print("  ‚Ä¢ Usar√≠as Airflow para orquestaci√≥n")
print("  ‚Ä¢ Tendr√≠as m√∫ltiples pipelines interconectados")
print("  ‚Ä¢ Monitorear√≠as con herramientas como Grafana")
print("  ‚Ä¢ Usar√≠as colas de mensajes (Kafka, RabbitMQ)")
print("  ‚Ä¢ Implementar√≠as reintentos autom√°ticos")

                  EJEMPLO: PIPELINE DE PROCESAMIENTO                  

          üöÄ Iniciando Pipeline: Procesamiento Ventas Diario           

üìù [2026-02-21 22:22:53] EXTRACT: Extrayendo datos de base_datos_ventas...
üìù [2026-02-21 22:22:53] ‚úì Extra√≠dos 100 registros
üìù [2026-02-21 22:22:53] VALIDATE: Validando calidad de datos...
üìù [2026-02-21 22:22:54] ‚úì Validaci√≥n exitosa, datos limpios
üìù [2026-02-21 22:22:54] TRANSFORM: Aplicando transformaciones...
üìù [2026-02-21 22:22:54] ‚úì Transformaciones aplicadas
üìù [2026-02-21 22:22:54] LOAD: Cargando datos a warehouse_ventas_procesadas...
üìù [2026-02-21 22:22:54] ‚úì 100 registros cargados exitosamente

                  ‚úÖ PIPELINE COMPLETADO EXITOSAMENTE                  

üìä RESUMEN POR CATEGOR√çA:
          valor                valor_normalizado
          count    mean    sum              mean
categoria                                       
A            37  519.03  19204              0.08
B            

## 3. Formatos de Datos

### ¬øQu√© es?

Los **formatos de datos** son las estructuras y codificaciones que definen c√≥mo se almacenan, organizan y transmiten los datos en archivos. Los m√°s comunes en ingenier√≠a de datos son CSV (texto plano tabular), JSON (pares clave-valor), Parquet (binario columnar) y Avro (binario orientado a filas), cada uno con caracter√≠sticas distintas de velocidad, tama√±o y compatibilidad.

**Comparaci√≥n de formatos:**

| Formato | Tipo | Legible | Compresi√≥n | Velocidad | Uso principal |
|---|---|---|---|---|---|
| **CSV** | Texto plano | S√≠ | No | Lenta | Intercambio, exploraci√≥n |
| **JSON** | Texto clave-valor | S√≠ | No | Lenta | APIs, datos anidados |
| **Parquet** | Binario columnar | No | Excelente (90%‚Üì) | Muy r√°pida | Analytics, Data Lake |
| **Avro** | Binario filas | No | Buena | R√°pida escritura | Streaming, Kafka |

**Detalles por formato:**

- **CSV**: Universal y editable manualmente. Sin tipos de datos (todo es texto), sin compresi√≥n nativa. Problemas con separadores dentro de los datos. Ideal para compatibilidad m√°xima.
- **JSON**: Estructura flexible con datos anidados. Est√°ndar para APIs web y configuraciones. Soporta tipos b√°sicos pero puede ser verboso para grandes vol√∫menes.
- **Parquet**: Lee solo las columnas necesarias (column pruning). Preserva esquema y tipos. Excelente compresi√≥n (hasta 90% menos que CSV). Soporta particionamiento. Compatible con Spark, Hive, etc.
- **Avro**: Esquema auto-contenido que evoluciona sin romper compatibilidad. Optimizado para escritura r√°pida y streaming (Kafka). Menos com√∫n que Parquet en analytics.

**Regla general de uso:**

- **Desarrollo/Exploraci√≥n** ‚Üí CSV, JSON (f√°cil de inspeccionar)
- **Producci√≥n/Analytics** ‚Üí Parquet (eficiencia y velocidad)
- **APIs y microservicios** ‚Üí JSON (est√°ndar web)
- **Streaming** ‚Üí Avro o Parquet
- **Intercambio con externos** ‚Üí CSV (compatibilidad universal)

### ¬øPara qu√© sirve?

Conocer los formatos de datos sirve para:

- **Elegir el formato √≥ptimo** seg√∫n el caso de uso: exploraci√≥n, producci√≥n, intercambio o streaming
- **Reducir costos de almacenamiento** usando formatos comprimidos como Parquet (hasta 90% menos que CSV)
- **Acelerar la lectura y escritura** seleccionando formatos binarios para grandes vol√∫menes
- **Preservar tipos de datos y esquemas** evitando p√©rdida de informaci√≥n en conversiones
- **Mejorar el rendimiento de queries** con formatos columnares que leen solo las columnas necesarias
- **Facilitar la interoperabilidad** entre diferentes sistemas, plataformas y herramientas

La elecci√≥n del formato correcto impacta directamente en la eficiencia de todo el pipeline de datos.

### ¬øC√≥mo se usa?

En el c√≥digo siguiente, crearemos un dataset de ejemplo y lo guardaremos en CSV, JSON y Parquet, comparando tiempos de escritura, lectura, tama√±o de archivo y rendimiento en lectura selectiva de columnas.

In [9]:
print("=" * 70)
print("COMPARACI√ìN PR√ÅCTICA DE FORMATOS".center(70))
print("=" * 70)

# Crear dataset de ejemplo
np.random.seed(42)
df = pd.DataFrame({
    'id': range(1, 10001),
    'nombre': [f'Producto_{i}' for i in range(1, 10001)],
    'categoria': np.random.choice(['Electr√≥nica', 'Hogar', 'Deportes', 'Ropa'], 10000),
    'precio': np.round(np.random.uniform(10, 1000, 10000), 2),
    'cantidad': np.random.randint(1, 100, 10000),
    'fecha': pd.date_range('2023-01-01', periods=10000, freq='D')
})

print(f"\nüìä Dataset de ejemplo: {len(df)} filas, {len(df.columns)} columnas")
print(df.head())

# Funci√≥n para obtener tama√±o de archivo
def get_file_size(filename):
    if os.path.exists(filename):
        size_bytes = os.path.getsize(filename)
        size_mb = size_bytes / (1024 * 1024)
        return size_bytes, size_mb
    return 0, 0

# 1. Guardar como CSV
print("\n1Ô∏è‚É£ Guardando como CSV...")
csv_file = 'datos_ejemplo.csv'
start_time = time.time()
df.to_csv(csv_file, index=False)
csv_time = time.time() - start_time
csv_bytes, csv_mb = get_file_size(csv_file)
print(f"   ‚úì Tiempo: {csv_time:.3f}s | Tama√±o: {csv_mb:.2f} MB")

# 2. Guardar como JSON
print("\n2Ô∏è‚É£ Guardando como JSON...")
json_file = 'datos_ejemplo.json'
start_time = time.time()
df.to_json(json_file, orient='records', date_format='iso')
json_time = time.time() - start_time
json_bytes, json_mb = get_file_size(json_file)
print(f"   ‚úì Tiempo: {json_time:.3f}s | Tama√±o: {json_mb:.2f} MB")

# 3. Guardar como Parquet
print("\n3Ô∏è‚É£ Guardando como PARQUET...")
parquet_file = 'datos_ejemplo.parquet'
start_time = time.time()
df.to_parquet(parquet_file, engine='pyarrow', compression='snappy')
parquet_time = time.time() - start_time
parquet_bytes, parquet_mb = get_file_size(parquet_file)
print(f"   ‚úì Tiempo: {parquet_time:.3f}s | Tama√±o: {parquet_mb:.2f} MB")

# Comparaci√≥n de resultados
print("\n" + "=" * 70)
print("üìä COMPARACI√ìN DE RESULTADOS".center(70))
print("=" * 70)

resultados = pd.DataFrame({
    'Formato': ['CSV', 'JSON', 'Parquet'],
    'Tama√±o (MB)': [csv_mb, json_mb, parquet_mb],
    'Tiempo Escritura (s)': [csv_time, json_time, parquet_time],
    '% vs CSV': [100, (json_mb/csv_mb)*100, (parquet_mb/csv_mb)*100]
})

print(resultados.to_string(index=False))
print(f"\nüí° Parquet es {csv_mb/parquet_mb:.1f}x m√°s peque√±o que CSV")

# Velocidad de lectura
print("\n" + "=" * 70)
print("‚ö° VELOCIDAD DE LECTURA".center(70))
print("=" * 70)

start_time = time.time()
df_csv = pd.read_csv(csv_file)
csv_read_time = time.time() - start_time
print(f"CSV:     {csv_read_time:.3f}s")

start_time = time.time()
df_json = pd.read_json(json_file)
json_read_time = time.time() - start_time
print(f"JSON:    {json_read_time:.3f}s")

start_time = time.time()
df_parquet = pd.read_parquet(parquet_file)
parquet_read_time = time.time() - start_time
print(f"Parquet: {parquet_read_time:.3f}s")

print(f"\nüí° Parquet es {csv_read_time/parquet_read_time:.1f}x m√°s r√°pido que CSV en lectura")

# Lectura selectiva de columnas (ventaja clave de Parquet)
print("\n" + "=" * 70)
print("üìñ LECTURA SELECTIVA DE COLUMNAS".center(70))
print("=" * 70)

columnas_necesarias = ['id', 'categoria', 'precio']

start_time = time.time()
df_csv_cols = pd.read_csv(csv_file, usecols=columnas_necesarias)
csv_selective_time = time.time() - start_time

start_time = time.time()
df_parquet_cols = pd.read_parquet(parquet_file, columns=columnas_necesarias)
parquet_selective_time = time.time() - start_time

print(f"CSV (lectura selectiva):     {csv_selective_time:.3f}s")
print(f"Parquet (lectura selectiva): {parquet_selective_time:.3f}s")
print(f"\nüí° Para lectura selectiva, Parquet es {csv_selective_time/parquet_selective_time:.1f}x m√°s eficiente")

# Limpiar archivos temporales
for file in [csv_file, json_file, parquet_file]:
    if os.path.exists(file):
        os.remove(file)

print("\n‚úÖ Archivos temporales eliminados")

                   COMPARACI√ìN PR√ÅCTICA DE FORMATOS                   

üìä Dataset de ejemplo: 10000 filas, 6 columnas
   id      nombre    categoria  precio  cantidad      fecha
0   1  Producto_1     Deportes  399.70        28 2023-01-01
1   2  Producto_2         Ropa  478.70        22 2023-01-02
2   3  Producto_3  Electr√≥nica  856.00        16 2023-01-03
3   4  Producto_4     Deportes  346.60        13 2023-01-04
4   5  Producto_5     Deportes  870.95        62 2023-01-05

1Ô∏è‚É£ Guardando como CSV...
   ‚úì Tiempo: 0.033s | Tama√±o: 0.47 MB

2Ô∏è‚É£ Guardando como JSON...
   ‚úì Tiempo: 0.022s | Tama√±o: 1.18 MB

3Ô∏è‚É£ Guardando como PARQUET...
   ‚úì Tiempo: 0.118s | Tama√±o: 0.28 MB

                     üìä COMPARACI√ìN DE RESULTADOS                      
Formato  Tama√±o (MB)  Tiempo Escritura (s)   % vs CSV
    CSV     0.465096              0.032845 100.000000
   JSON     1.180449              0.022075 253.807762
Parquet     0.279115              0.117951  60.012344



## 4. APIs y Web Scraping

### ¬øQu√© es?

Las **APIs (Application Programming Interfaces)** son interfaces estructuradas y oficiales que permiten a las aplicaciones comunicarse entre s√≠ para intercambiar datos. El **web scraping** es la t√©cnica de extraer datos directamente del HTML de p√°ginas web cuando no existe una API disponible. Ambos son m√©todos esenciales para obtener datos de la web.

**Tipos de APIs:**

| Tipo | Descripci√≥n | Formato | Uso actual |
|---|---|---|---|
| **REST** | HTTP est√°ndar (GET, POST, PUT, DELETE) | JSON | M√°s com√∫n en la web |
| **GraphQL** | Solicitas exactamente los datos necesarios | JSON | Moderno y eficiente |
| **SOAP** | Protocolo formal basado en XML | XML | Legacy, menos com√∫n hoy |

**Conceptos clave de APIs:**

- **Endpoint**: URL espec√≠fica de la API (ej: `api.ejemplo.com/users`)
- **Authentication**: API key, OAuth, tokens de acceso
- **Rate limiting**: L√≠mite de requests por tiempo (ej: 100 req/minuto)
- **Pagination**: Dividir resultados grandes en p√°ginas
- **Status codes**: 200 OK, 401 Unauthorized, 404 Not Found, 429 Too Many Requests

**Herramientas Python para Web Scraping:**

- **requests**: Hacer peticiones HTTP y obtener contenido HTML
- **BeautifulSoup**: Parsear HTML/XML, navegar estructura DOM
- **Selenium**: Automatizar navegador real, JavaScript din√°mico
- **Scrapy**: Framework completo para spiders y crawlers

**¬øAPI o Web Scraping?**

- **Usa API cuando**: est√° disponible, necesitas datos estructurados, es un proyecto en producci√≥n
- **Usa Scraping cuando**: no hay API, datos son p√∫blicos, es investigaci√≥n puntual y legal

**Consideraciones legales del scraping:**

- Revisar t√©rminos de servicio del sitio y respetar `robots.txt`
- No sobrecargar servidores (usar rate limiting con delays)
- Evitar scraping de datos personales sensibles o datos detr√°s de login
- Dar atribuci√≥n cuando se usen los datos obtenidos

### ¬øPara qu√© sirve?

Dominar APIs y web scraping sirve para:

- **Obtener datos de fuentes externas** como redes sociales, servicios financieros, APIs gubernamentales y sitios web
- **Automatizar la recolecci√≥n** de datos que se actualizan frecuentemente
- **Integrar m√∫ltiples fuentes** en un mismo pipeline de an√°lisis
- **Construir datasets enriquecidos** combinando datos internos con informaci√≥n externa
- **Implementar manejo robusto de errores** con reintentos, timeouts y rate limiting
- **Conocer las consideraciones legales y √©ticas** del scraping para actuar responsablemente

En an√°lisis de datos, saber consumir APIs es una habilidad fundamental para acceder a datos actualizados y estructurados.

### ¬øC√≥mo se usa?

En el c√≥digo siguiente, consumiremos APIs p√∫blicas con la librer√≠a `requests`, manejaremos par√°metros de consulta e implementaremos funciones robustas con reintentos y manejo de errores.

In [10]:
print("=" * 70)
print("EJEMPLO 1: CONSUMIR API P√öBLICA".center(70))
print("=" * 70)

# API p√∫blica gratuita (no requiere key)
print("\nüì° Consumiendo API p√∫blica: JSONPlaceholder")
print("   (API de prueba con datos ficticios)")

try:
    url = "https://jsonplaceholder.typicode.com/users"
    response = requests.get(url, timeout=10)
    
    print(f"\n Status code: {response.status_code}", end="")
    if response.status_code == 200:
        print(" ‚úÖ (OK)")
    else:
        print(f" ‚ùå (Error)")
    
    usuarios = response.json()
    print(f"‚úì Obtenidos {len(usuarios)} usuarios")
    
    df_usuarios = pd.DataFrame(usuarios)
    
    print("\nüìä Datos obtenidos:")
    print(df_usuarios[['id', 'name', 'email', 'company']].head())
    
    print("\nüìà An√°lisis r√°pido:")
    print(f"  ‚Ä¢ Total usuarios: {len(df_usuarios)}")
    print(f"  ‚Ä¢ Campos disponibles: {', '.join(df_usuarios.columns)}")
    
    df_usuarios['company_name'] = df_usuarios['company'].apply(lambda x: x['name'])
    print(f"\nüè¢ Empresas √∫nicas: {df_usuarios['company_name'].nunique()}")
    print(df_usuarios['company_name'].value_counts().head())
    
except requests.exceptions.RequestException as e:
    print(f"‚ùå Error al conectar con API: {e}")

# EJEMPLO 2: API con par√°metros de consulta
print("\n" + "=" * 70)
print("EJEMPLO 2: API CON PAR√ÅMETROS".center(70))
print("=" * 70)

print("\nüîç Obteniendo posts de un usuario espec√≠fico")

try:
    url = "https://jsonplaceholder.typicode.com/posts"
    params = {
        'userId': 1,
        '_limit': 5
    }
    
    response = requests.get(url, params=params, timeout=10)
    
    if response.status_code == 200:
        posts = response.json()
        df_posts = pd.DataFrame(posts)
        
        print(f"‚úì Obtenidos {len(df_posts)} posts")
        print("\nüìù T√≠tulos de posts:")
        for idx, post in df_posts.iterrows():
            print(f"  {idx+1}. {post['title'][:50]}...")
            
except requests.exceptions.RequestException as e:
    print(f"‚ùå Error: {e}")

# EJEMPLO 3: Funci√≥n robusta para APIs con reintentos
print("\n" + "=" * 70)
print("EJEMPLO 3: FUNCI√ìN ROBUSTA PARA APIs".center(70))
print("=" * 70)

def obtener_datos_api(url, max_retries=3):
    """Funci√≥n robusta para consumir APIs con reintentos"""
    
    for intento in range(max_retries):
        try:
            response = requests.get(url, timeout=10)
            
            if response.status_code == 200:
                return response.json()
            elif response.status_code == 429:
                print(f"‚ö†Ô∏è Rate limit alcanzado, esperando...")
                time.sleep(5)
            elif response.status_code == 404:
                print(f"‚ùå Recurso no encontrado: {url}")
                return None
            else:
                print(f"‚ö†Ô∏è Status code: {response.status_code}")
                
        except requests.exceptions.Timeout:
            print(f"‚è±Ô∏è Timeout en intento {intento+1}/{max_retries}")
        except requests.exceptions.ConnectionError:
            print(f"üîå Error de conexi√≥n en intento {intento+1}/{max_retries}")
        except Exception as e:
            print(f"‚ùå Error inesperado: {e}")
            
        if intento < max_retries - 1:
            time.sleep(2)
    
    print(f"‚ùå Fallaron todos los intentos")
    return None

# Probar funci√≥n robusta
print("\nüß™ Probando funci√≥n robusta:")
datos = obtener_datos_api("https://jsonplaceholder.typicode.com/users/1")
if datos:
    print(f"‚úÖ Datos obtenidos: {datos['name']}")

print("\n‚úÖ Ejemplos de APIs completados")

                   EJEMPLO 1: CONSUMIR API P√öBLICA                    

üì° Consumiendo API p√∫blica: JSONPlaceholder
   (API de prueba con datos ficticios)

 Status code: 200 ‚úÖ (OK)
‚úì Obtenidos 10 usuarios

üìä Datos obtenidos:
   id              name                      email  \
0   1     Leanne Graham          Sincere@april.biz   
1   2      Ervin Howell          Shanna@melissa.tv   
2   3  Clementine Bauch         Nathan@yesenia.net   
3   4  Patricia Lebsack  Julianne.OConner@kory.org   
4   5  Chelsey Dietrich   Lucio_Hettinger@annie.ca   

                                             company  
0  {'name': 'Romaguera-Crona', 'catchPhrase': 'Mu...  
1  {'name': 'Deckow-Crist', 'catchPhrase': 'Proac...  
2  {'name': 'Romaguera-Jacobson', 'catchPhrase': ...  
3  {'name': 'Robel-Corkery', 'catchPhrase': 'Mult...  
4  {'name': 'Keebler LLC', 'catchPhrase': 'User-c...  

üìà An√°lisis r√°pido:
  ‚Ä¢ Total usuarios: 10
  ‚Ä¢ Campos disponibles: id, name, username, email, addres

## 5. Automatizaci√≥n de Procesos

### ¬øQu√© es?

La **automatizaci√≥n de procesos de datos** consiste en programar la ejecuci√≥n de tareas repetitivas (reportes, descargas, limpiezas, cargas) para que se realicen de forma aut√≥noma, sin intervenci√≥n manual. Utiliza herramientas como scripts Python con `schedule`, planificadores del sistema operativo (Task Scheduler, cron) u orquestadores profesionales como Apache Airflow.

**¬øQu√© automatizar?**

- **Buenos candidatos**: Reportes peri√≥dicos, descarga de datos de APIs, limpieza y transformaci√≥n, carga a base de datos, env√≠o de alertas, validaci√≥n de calidad, backups
- **No automatizar (a√∫n)**: Tareas √∫nicas, procesos que cambian constantemente, an√°lisis exploratorio, tareas que requieren juicio humano

> **Regla de tres**: Si haces algo m√°s de 3 veces ‚Üí automat√≠zalo.

**Herramientas de automatizaci√≥n:**

| Herramienta | Tipo | Complejidad | Uso |
|---|---|---|---|
| **Scripts Python** | Lo m√°s simple | Baja | Ejecuci√≥n manual o programada |
| **schedule** (Python) | Librer√≠a | Baja | Programaci√≥n simple en el script |
| **Cron / Task Scheduler** | Sistema operativo | Media | Ejecuci√≥n a horas espec√≠ficas |
| **Apache Airflow** | Orquestador | Alta | DAGs, monitoreo, retries, alertas |
| **Prefect / Dagster** | Orquestadores modernos | Media | Alternativa pythonic a Airflow |
| **AWS Lambda / Cloud Functions** | Cloud serverless | Media | Pago por uso, escala autom√°tica |

**Componentes de un proceso automatizado:**

1. **Trigger** (Disparador): Tiempo ("cada d√≠a a las 8 AM"), evento ("nuevo archivo"), o manual
2. **Ejecuci√≥n**: El trabajo real (ETL, an√°lisis), con manejo de errores y logging
3. **Notificaci√≥n**: √âxito ‚Üí log; Fallo ‚Üí email, Slack, SMS
4. **Monitoreo**: Dashboard de estado, historial, alertas de SLA

**Mejores pr√°cticas:**

- **Idempotencia**: Ejecutar m√∫ltiples veces = mismo resultado
- **Manejo de errores**: try/except, logging, reintentos autom√°ticos
- **Parametrizaci√≥n**: Configuraci√≥n en archivos externos, no hardcoded
- **Testing**: Probar con datos peque√±os antes de producci√≥n
- **Documentaci√≥n**: README, comentarios, diagramas de flujo

### ¬øPara qu√© sirve?

Automatizar procesos de datos sirve para:

- **Eliminar tareas manuales repetitivas** que consumen tiempo y son propensas a errores
- **Garantizar consistencia** ejecutando los mismos pasos de forma id√©ntica cada vez
- **Escalar operaciones** a cualquier volumen sin necesidad de m√°s personal
- **Liberar tiempo para an√°lisis** dedicando recursos a tareas de mayor valor
- **Implementar logging y monitoreo** para saber cu√°ndo algo falla y reaccionar r√°pido
- **Programar ejecuciones peri√≥dicas** con reportes diarios, semanales o mensuales autom√°ticos

### ¬øC√≥mo se usa?

En el c√≥digo siguiente, crearemos un script de reporte autom√°tico de ventas con manejo de errores e implementaremos logging profesional para monitoreo.

In [11]:
print("=" * 70)
print("EJEMPLO 1: REPORTE AUTOM√ÅTICO DE VENTAS".center(70))
print("=" * 70)

def generar_reporte_ventas(fecha=None):
    """Genera reporte autom√°tico de ventas"""
    if fecha is None:
        fecha = datetime.now().strftime('%Y-%m-%d')
    
    print(f"\n{'='*60}")
    print(f"ü§ñ REPORTE AUTOM√ÅTICO DE VENTAS - {fecha}")
    print(f"{'='*60}")
    
    try:
        # 1. EXTRAER DATOS (simular)
        print("\n[1/5] üì• Extrayendo datos de ventas...")
        time.sleep(0.5)
        
        ventas = pd.DataFrame({
            'producto': ['Laptop', 'Mouse', 'Teclado', 'Monitor', 'Laptop'],
            'categoria': ['Electr√≥nica', 'Accesorios', 'Accesorios', 'Electr√≥nica', 'Electr√≥nica'],
            'cantidad': [2, 15, 8, 3, 1],
            'precio': [1000, 25, 50, 300, 1000],
            'fecha': [fecha] * 5
        })
        print(f"      ‚úì {len(ventas)} registros extra√≠dos")
        
        # 2. TRANSFORMAR
        print("\n[2/5] üîÑ Transformando datos...")
        time.sleep(0.3)
        ventas['total'] = ventas['cantidad'] * ventas['precio']
        print(f"      ‚úì Columnas calculadas agregadas")
        
        # 3. AN√ÅLISIS
        print("\n[3/5] üìä Generando an√°lisis...")
        time.sleep(0.3)
        
        total_ventas = ventas['total'].sum()
        total_items = ventas['cantidad'].sum()
        ticket_promedio = total_ventas / len(ventas)
        
        print(f"      ‚úì M√©tricas calculadas")
        
        # 4. GUARDAR REPORTE
        print("\n[4/5] üíæ Guardando reporte...")
        time.sleep(0.2)
        
        reporte_filename = f'reporte_ventas_{fecha}.csv'
        ventas.to_csv(reporte_filename, index=False)
        
        print(f"      ‚úì Guardado en {reporte_filename}")
        
        # 5. RESUMEN
        print("\n[5/5] üìã Resumen del d√≠a:")
        print(f"      ‚Ä¢ Total ventas: ${total_ventas:,.2f}")
        print(f"      ‚Ä¢ Items vendidos: {total_items}")
        print(f"      ‚Ä¢ Ticket promedio: ${ticket_promedio:,.2f}")
        print(f"      ‚Ä¢ Categor√≠a top: {ventas.groupby('categoria')['total'].sum().idxmax()}")
        
        # Limpiar archivo temporal
        if os.path.exists(reporte_filename):
            os.remove(reporte_filename)
        
        print(f"\n{'='*60}")
        print("‚úÖ REPORTE COMPLETADO EXITOSAMENTE")
        print(f"{'='*60}")
        
        return True
        
    except Exception as e:
        print(f"\n‚ùå ERROR al generar reporte: {e}")
        return False

# Ejecutar reporte
resultado = generar_reporte_ventas()

# EJEMPLO 2: Logging profesional para automatizaci√≥n
print("\n" + "=" * 70)
print("EJEMPLO 2: LOGGING PROFESIONAL".center(70))
print("=" * 70)

import logging

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler()
    ]
)

logger = logging.getLogger(__name__)

def proceso_automatizado_con_logging():
    """Ejemplo de proceso con logging completo"""
    
    logger.info("="*60)
    logger.info("Iniciando proceso automatizado")
    logger.info("="*60)
    
    try:
        logger.info("Paso 1: Conectando a base de datos...")
        time.sleep(0.3)
        logger.info("‚úì Conexi√≥n establecida")
        
        logger.info("Paso 2: Extrayendo datos...")
        registros = 1000
        time.sleep(0.3)
        logger.info(f"‚úì {registros} registros extra√≠dos")
        
        logger.info("Paso 3: Procesando datos...")
        time.sleep(0.3)
        logger.info("‚úì Transformaciones aplicadas")
        
        logger.info("Paso 4: Generando reporte...")
        time.sleep(0.2)
        logger.info("‚úì Reporte generado")
        
        logger.info("="*60)
        logger.info("‚úÖ Proceso completado exitosamente")
        logger.info("="*60)
        
        return True
        
    except Exception as e:
        logger.error(f"‚ùå ERROR en el proceso: {e}")
        logger.exception("Traceback completo:")
        return False

# Ejecutar
proceso_automatizado_con_logging()

print("\n‚úÖ Ejemplos de automatizaci√≥n completados")

               EJEMPLO 1: REPORTE AUTOM√ÅTICO DE VENTAS                

ü§ñ REPORTE AUTOM√ÅTICO DE VENTAS - 2026-02-21

[1/5] üì• Extrayendo datos de ventas...
      ‚úì 5 registros extra√≠dos

[2/5] üîÑ Transformando datos...
      ‚úì Columnas calculadas agregadas

[3/5] üìä Generando an√°lisis...
      ‚úì M√©tricas calculadas

[4/5] üíæ Guardando reporte...
      ‚úì Guardado en reporte_ventas_2026-02-21.csv

[5/5] üìã Resumen del d√≠a:
      ‚Ä¢ Total ventas: $4,675.00
      ‚Ä¢ Items vendidos: 29
      ‚Ä¢ Ticket promedio: $935.00
      ‚Ä¢ Categor√≠a top: Electr√≥nica

‚úÖ REPORTE COMPLETADO EXITOSAMENTE

                    EJEMPLO 2: LOGGING PROFESIONAL                    


2026-02-21 22:23:47,518 - INFO - Iniciando proceso automatizado
2026-02-21 22:23:47,520 - INFO - Paso 1: Conectando a base de datos...
2026-02-21 22:23:47,821 - INFO - ‚úì Conexi√≥n establecida
2026-02-21 22:23:47,822 - INFO - Paso 2: Extrayendo datos...
2026-02-21 22:23:48,124 - INFO - ‚úì 1000 registros extra√≠dos
2026-02-21 22:23:48,126 - INFO - Paso 3: Procesando datos...
2026-02-21 22:23:48,427 - INFO - ‚úì Transformaciones aplicadas
2026-02-21 22:23:48,428 - INFO - Paso 4: Generando reporte...
2026-02-21 22:23:48,630 - INFO - ‚úì Reporte generado
2026-02-21 22:23:48,631 - INFO - ‚úÖ Proceso completado exitosamente



‚úÖ Ejemplos de automatizaci√≥n completados


## 6. Conceptos de Data Warehousing

### ¬øQu√© es?

Un **Data Warehouse** es un sistema centralizado de almacenamiento que integra datos de m√∫ltiples fuentes, optimizado para consultas anal√≠ticas y reportes. Se basa en el **modelado dimensional** (esquema estrella o copo de nieve) con tablas de hechos que contienen m√©tricas y tablas de dimensiones que proporcionan contexto descriptivo como productos, clientes, tiempo y ubicaciones.

**OLTP vs OLAP:**

| Aspecto | OLTP (Transaccional) | OLAP (Anal√≠tico) |
|---|---|---|
| Prop√≥sito | Operaciones del d√≠a a d√≠a | An√°lisis hist√≥rico |
| Operaciones | INSERT, UPDATE, DELETE frecuentes | Principalmente SELECT |
| Optimizado para | Escritura | Lectura |
| Datos | Actuales | Hist√≥ricos + actuales |
| Dise√±o | Normalizado (3NF) | Desnormalizado (estrella) |
| Ejemplo | Sistema de ventas, CRM, ERP | Data Warehouse, dashboards |

> **OLTP**: "Registrar esta venta de $100" ‚Üí **OLAP**: "¬øCu√°l fue el total de ventas por regi√≥n el a√±o pasado?"

**Esquema Estrella (Star Schema):**

La tabla **FACT** (hechos) central contiene m√©tricas num√©ricas (monto, cantidad, descuento) y claves for√°neas hacia las tablas **DIM** (dimensiones) que aportan contexto descriptivo:

- **Fact_Ventas**: venta_id, producto_id (FK), cliente_id (FK), fecha_id (FK), cantidad, monto, costo
- **Dim_Producto**: producto_id, nombre, categor√≠a, marca, precio_lista
- **Dim_Cliente**: cliente_id, nombre, ciudad, segmento
- **Dim_Tiempo**: fecha_id, a√±o, mes, d√≠a, trimestre, d√≠a_semana
- **Dim_Tienda**: tienda_id, nombre, ciudad, canal

**Slowly Changing Dimensions (SCD):**

| Tipo | Estrategia | Historial | Ejemplo |
|---|---|---|---|
| **SCD 0** | No cambiar | Original siempre | Fecha de nacimiento |
| **SCD 1** | Sobrescribir | Sin historial | Correcci√≥n de errores |
| **SCD 2** | Nueva fila con versi√≥n | Completo | Cliente cambia de ciudad |
| **SCD 3** | Columna anterior/actual | Limitado | Solo √∫ltimo cambio |

El **SCD Tipo 2** es el m√°s com√∫n: agrega una nueva fila con campos `version`, `fecha_inicio`, `fecha_fin` e `is_current` para rastrear todo el historial.

**Plataformas cloud modernas:**

- **Snowflake**: Separaci√≥n storage/compute, auto-scaling, pay-per-use
- **Google BigQuery**: Serverless, escalable, integraci√≥n Google Cloud
- **Amazon Redshift**: Basado en PostgreSQL, clusters configurables
- **Azure Synapse**: Integraci√≥n Microsoft, analytics a gran escala

### ¬øPara qu√© sirve?

Comprender data warehousing sirve para:

- **Entender la diferencia entre OLTP y OLAP** y por qu√© los sistemas anal√≠ticos se dise√±an diferente
- **Dise√±ar modelos dimensionales** con tablas de hechos y dimensiones para an√°lisis eficiente
- **Escribir queries m√°s eficientes** al comprender c√≥mo est√°n estructurados los datos en el warehouse
- **Manejar cambios hist√≥ricos** con Slowly Changing Dimensions (SCD Tipo 1, 2 y 3)
- **Conocer plataformas cloud modernas** como Snowflake, BigQuery, Redshift y Azure Synapse
- **Calcular KPIs y m√©tricas** usando la estructura fact-dimension de forma natural y escalable

El data warehouse es donde convergen todos los datos procesados para alimentar dashboards, reportes y modelos anal√≠ticos.

### ¬øC√≥mo se usa?

En el c√≥digo siguiente, crearemos un modelo estrella completo con tablas de dimensiones (producto, cliente, tiempo, tienda) y una tabla de hechos de ventas, para luego realizar an√°lisis por categor√≠a, canal y clientes top.

In [12]:
print("=" * 70)
print("EJEMPLO: MODELO ESTRELLA DE VENTAS".center(70))
print("=" * 70)

# 1. TABLAS DE DIMENSIONES
print("\n1Ô∏è‚É£ CREANDO TABLAS DE DIMENSIONES")

dim_producto = pd.DataFrame({
    'producto_id': [1, 2, 3, 4, 5],
    'nombre': ['Laptop HP', 'Mouse Logitech', 'Teclado Mec√°nico', 'Monitor LG', 'Webcam'],
    'categoria': ['Computadoras', 'Accesorios', 'Accesorios', 'Monitores', 'Accesorios'],
    'marca': ['HP', 'Logitech', 'Corsair', 'LG', 'Logitech'],
    'precio_lista': [1000, 25, 150, 300, 80]
})

dim_cliente = pd.DataFrame({
    'cliente_id': [101, 102, 103, 104],
    'nombre': ['Juan P√©rez', 'Mar√≠a Garc√≠a', 'Carlos L√≥pez', 'Ana Mart√≠nez'],
    'ciudad': ['Madrid', 'Barcelona', 'Madrid', 'Valencia'],
    'segmento': ['Premium', 'Regular', 'Premium', 'Regular']
})

dim_tiempo = pd.DataFrame({
    'fecha_id': pd.date_range('2024-01-01', periods=10, freq='D'),
    'a√±o': 2024,
    'mes': 1,
    'dia': range(1, 11),
    'dia_semana': pd.date_range('2024-01-01', periods=10, freq='D').day_name(),
    'trimestre': 1
})

dim_tienda = pd.DataFrame({
    'tienda_id': [1, 2, 3],
    'nombre_tienda': ['Tienda Centro', 'Tienda Norte', 'Online'],
    'ciudad': ['Madrid', 'Barcelona', 'Online'],
    'canal': ['F√≠sica', 'F√≠sica', 'Online']
})

print("‚úì Dim_Producto:", dim_producto.shape)
print("‚úì Dim_Cliente:", dim_cliente.shape)
print("‚úì Dim_Tiempo:", dim_tiempo.shape)
print("‚úì Dim_Tienda:", dim_tienda.shape)

# 2. TABLA DE HECHOS
print("\n2Ô∏è‚É£ CREANDO TABLA DE HECHOS")

np.random.seed(42)
fact_ventas = pd.DataFrame({
    'venta_id': range(1, 51),
    'producto_id': np.random.choice(dim_producto['producto_id'], 50),
    'cliente_id': np.random.choice(dim_cliente['cliente_id'], 50),
    'fecha_id': np.random.choice(dim_tiempo['fecha_id'], 50),
    'tienda_id': np.random.choice(dim_tienda['tienda_id'], 50),
    'cantidad': np.random.randint(1, 5, 50),
    'monto': np.random.uniform(20, 2000, 50).round(2),
    'costo': np.random.uniform(10, 1500, 50).round(2)
})

fact_ventas['margen'] = fact_ventas['monto'] - fact_ventas['costo']

print("‚úì Fact_Ventas:", fact_ventas.shape)
print("\nüìä Primeras filas de Fact_Ventas:")
print(fact_ventas.head())

# 3. AN√ÅLISIS CON MODELO DIMENSIONAL
print("\n" + "=" * 70)
print("3Ô∏è‚É£ AN√ÅLISIS CON MODELO DIMENSIONAL".center(70))
print("=" * 70)

# Join fact con dimensiones
ventas_completo = (
    fact_ventas
    .merge(dim_producto, on='producto_id', how='left')
    .merge(dim_cliente, on='cliente_id', how='left')
    .merge(dim_tienda, on='tienda_id', how='left')
)

# Ventas por categor√≠a
print("\nüìà Ventas por Categor√≠a de Producto:")
ventas_categoria = ventas_completo.groupby('categoria').agg({
    'monto': 'sum',
    'cantidad': 'sum',
    'venta_id': 'count',
    'margen': 'sum'
}).round(2)
ventas_categoria.columns = ['Monto Total', 'Unidades', 'Num Ventas', 'Margen Total']
print(ventas_categoria)

# Ventas por canal
print("\nüìä Ventas por Canal:")
ventas_canal = ventas_completo.groupby('canal').agg({
    'monto': ['sum', 'mean'],
    'venta_id': 'count'
}).round(2)
print(ventas_canal)

# Top clientes
print("\nüë• Top 5 Clientes:")
top_clientes = ventas_completo.groupby(['cliente_id', 'nombre_x']).agg({
    'monto': 'sum',
    'venta_id': 'count'
}).round(2)
top_clientes.columns = ['Monto Total', 'Num Compras']
print(top_clientes.nlargest(5, 'Monto Total'))

# KPIs
print("\n" + "=" * 70)
print("üìä M√âTRICAS CLAVE (KPIs)".center(70))
print("=" * 70)

total_ventas = fact_ventas['monto'].sum()
total_margen = fact_ventas['margen'].sum()
margen_pct = (total_margen / total_ventas * 100)
ticket_promedio = fact_ventas['monto'].mean()
unidades_totales = fact_ventas['cantidad'].sum()

print(f"""
  üí∞ Total Ventas:      ${total_ventas:,.2f}
  üìà Total Margen:      ${total_margen:,.2f}
  üìä Margen %:          {margen_pct:.1f}%
  üõí Ticket Promedio:   ${ticket_promedio:,.2f}
  üì¶ Unidades Vendidas: {unidades_totales:,}
  üî¢ N√∫mero de Ventas:  {len(fact_ventas):,}
""")

print("‚úÖ Modelo dimensional completado")

                  EJEMPLO: MODELO ESTRELLA DE VENTAS                  

1Ô∏è‚É£ CREANDO TABLAS DE DIMENSIONES
‚úì Dim_Producto: (5, 5)
‚úì Dim_Cliente: (4, 4)
‚úì Dim_Tiempo: (10, 6)
‚úì Dim_Tienda: (3, 4)

2Ô∏è‚É£ CREANDO TABLA DE HECHOS
‚úì Fact_Ventas: (50, 9)

üìä Primeras filas de Fact_Ventas:
   venta_id  producto_id  cliente_id   fecha_id  tienda_id  cantidad    monto  \
0         1            4         104 2024-01-09          1         4   951.20   
1         2            5         103 2024-01-09          2         4  1967.18   
2         3            3         104 2024-01-01          1         4   809.67   
3         4            5         103 2024-01-09          3         1  1636.54   
4         5            5         104 2024-01-07          3         4  1600.72   

     costo   margen  
0   733.56   217.64  
1   678.15  1289.03  
2  1491.74  -682.07  
3   272.13  1364.41  
4    36.93  1563.79  

                 3Ô∏è‚É£ AN√ÅLISIS CON MODELO DIMENSIONAL                  

ü

## Resumen y Pr√≥ximos Pasos

### ¬øQu√© es?

El **resumen del m√≥dulo** consolida todos los conceptos de ingenier√≠a de datos cubiertos: ETL/ELT, pipelines, formatos de datos, APIs, automatizaci√≥n y data warehousing. Proporciona una visi√≥n integral de c√≥mo estos elementos se conectan en la pr√°ctica profesional y c√≥mo aplicarlos en el rol de analista de datos.

**Lo que has aprendido:**

| Tema | Conceptos Clave |
|---|---|
| **ETL vs ELT** | Diferencias entre paradigmas, cu√°ndo usar cada uno, tendencia ELT en cloud |
| **Pipelines** | Componentes, batch vs streaming, herramientas (Airflow, schedule) |
| **Formatos** | CSV, JSON, Parquet, Avro ‚Äî trade-offs de velocidad, tama√±o y compatibilidad |
| **APIs** | Consumir APIs REST, manejo de errores, rate limiting, consideraciones legales |
| **Automatizaci√≥n** | Scripts automatizados, logging profesional, Task Scheduler, Airflow |
| **Data Warehousing** | OLTP vs OLAP, modelado estrella, SCD, plataformas cloud |

**Conexi√≥n con tu rol de analista:**

- **Entender de d√≥nde vienen tus datos** y comunicarte mejor con ingenieros
- **Ser m√°s aut√≥nomo** creando pipelines simples y automatizando an√°lisis repetitivos
- **Mejorar tus an√°lisis** escribiendo queries eficientes sobre modelos dimensionales
- **Avanzar en tu carrera** hacia roles h√≠bridos como **Analytics Engineer**

**Rol emergente ‚Äî Analytics Engineer:**

Combina lo mejor de ambos mundos: transforma datos en el warehouse (dbt), modela datos para an√°lisis, crea m√©tricas centralizadas y documenta/testea modelos. Requiere SQL avanzado, Python, dbt, Git y pensamiento anal√≠tico.

**Checklist de autoevaluaci√≥n:**

- [ ] Entiendo la diferencia entre ETL y ELT
- [ ] S√© qu√© es un pipeline de datos y sus componentes
- [ ] Conozco los trade-offs entre CSV, JSON y Parquet
- [ ] Puedo consumir APIs con Python y manejar errores
- [ ] S√© implementar logging y automatizar procesos
- [ ] Entiendo OLTP vs OLAP y el modelado dimensional
- [ ] Conozco las plataformas cloud de Data Warehousing

> Si marcaste m√°s de 5, ¬°vas muy bien! Si menos de 4, repasa las secciones espec√≠ficas.

**Pr√≥ximos pasos sugeridos:**

1. **Proyecto ETL Personal**: Elegir API p√∫blica ‚Üí extraer ‚Üí limpiar ‚Üí guardar en Parquet ‚Üí automatizar ‚Üí dashboard
2. **Explorar herramientas**: Postman para APIs, free tier de Snowflake/BigQuery, dbt para transformaciones
3. **Lecturas recomendadas**: "Fundamentals of Data Engineering" (Reis & Housley), "The Data Warehouse Toolkit" (Kimball)

### ¬øPara qu√© sirve?

Este resumen sirve para:

- **Repasar los conceptos clave** de cada secci√≥n y verificar tu comprensi√≥n
- **Conectar la ingenier√≠a de datos con el rol de analista** entendiendo c√≥mo te hace m√°s vers√°til
- **Identificar √°reas de mejora** con el checklist de autoevaluaci√≥n
- **Planificar tus pr√≥ximos pasos** con proyectos pr√°cticos y recursos de aprendizaje
- **Conocer el rol de Analytics Engineer** como evoluci√≥n natural que combina an√°lisis e ingenier√≠a

### ¬øC√≥mo se usa?

Revisa el checklist arriba y marca los conceptos que domines. El c√≥digo siguiente muestra un mensaje de finalizaci√≥n del m√≥dulo.

In [13]:
print("=" * 70)
print("‚úÖ NOTEBOOK 12 COMPLETADO".center(70))
print("=" * 70)

print(f"\nüìÖ Completado: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

print("\nüöÄ Has cubierto los fundamentos de Ingenier√≠a de Datos:")
print("   1. ETL vs ELT")
print("   2. Pipelines de Datos")
print("   3. Formatos de Datos")
print("   4. APIs y Web Scraping")
print("   5. Automatizaci√≥n de Procesos")
print("   6. Data Warehousing")

print("\nüí° 'Data is the new oil, but pipelines make it flow.'")

                       ‚úÖ NOTEBOOK 12 COMPLETADO                       

üìÖ Completado: 2026-02-21 22:24:23

üöÄ Has cubierto los fundamentos de Ingenier√≠a de Datos:
   1. ETL vs ELT
   2. Pipelines de Datos
   3. Formatos de Datos
   4. APIs y Web Scraping
   5. Automatizaci√≥n de Procesos
   6. Data Warehousing

üí° 'Data is the new oil, but pipelines make it flow.'


---

## Referencias y Recursos Adicionales

### üìö Documentaci√≥n Oficial
- **Apache Airflow**: https://airflow.apache.org/docs/
- **Pandas** (Parquet): https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_parquet.html
- **Requests**: https://docs.python-requests.org/
- **Schedule**: https://schedule.readthedocs.io/

### üéì Cursos Recomendados
- Data Engineering Zoomcamp (DataTalks.Club) - Gratuito
- Airflow Fundamentals (Astronomer) - Gratuito
- dbt Fundamentals (dbt Labs) - Gratuito

### üìñ Libros
- "Fundamentals of Data Engineering" - Joe Reis & Matt Housley
- "The Data Warehouse Toolkit" - Ralph Kimball
- "Designing Data-Intensive Applications" - Martin Kleppmann

### üåê APIs P√∫blicas para Practicar
- https://github.com/public-apis/public-apis
- https://jsonplaceholder.typicode.com/
- https://rapidapi.com/

### üõ†Ô∏è Herramientas
- **Airflow**: Orquestaci√≥n de pipelines
- **dbt**: Transformaci√≥n de datos en warehouse
- **Prefect**: Alternativa moderna a Airflow
- **Great Expectations**: Validaci√≥n de calidad de datos

### üë• Comunidades
- r/dataengineering
- dbt Community Slack
- Data Engineering Weekly Newsletter
- Locally Optimistic (Blog)

---

**Notebook desarrollado como parte de la Ruta de Analista de Datos con Python**

*√öltima actualizaci√≥n: Febrero 2026*