<a href="https://colab.research.google.com/github/jazaineam1/BigData2026/blob/main/Cuadernos/4_ETL_ELT_.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Sesión 5: Implementación de ETL mediante herramientas de flujos de trabajo

## Objetivos de la Sesión:
* Comparar los enfoques ETL (Extract, Transform, Load) y ELT (Extract, Load, Transform).
* Resolver un caso práctico de ingeniería de datos utilizando Python para construir un pipeline ETL/ELT.
* Entender conceptos clave como  ingesta de datos y herramientas de orquestación.
* Identificar y citar fuentes relevantes para el estudio de ETL/ELT.

---

## Bloque 1: Reto Estratégico y Arquitectura Conceptual (40 min)

### Reto Práctico: Consolidación de Datos Financieros Universitarios

Una universidad recibe **mensualmente tres archivos CSV** que contienen información financiera de sus estudiantes (por ejemplo, ingresos, gastos, becas, etc.).

**Características de los archivos:**
*   Pueden contener **registros duplicados**.
*   Pueden presentar **valores nulos** o errores en los datos.
*   El objetivo es **consolidar históricamente** estos datos para análisis financieros y de rendimiento estudiantil.

**Restricción Clave:**
*   **No se puede modificar el sistema de origen** de los archivos.

### Preguntas Iniciales para Discusión:

Para abordar este reto, es fundamental plantearse las siguientes preguntas que nos guiarán hacia el diseño de una arquitectura de datos robusta:

1.  **¿Dónde almacenar los datos crudos originales?**

2.  **Si llega un archivo corrupto, ¿qué hacemos?**

3.  **¿Cómo evitamos duplicados al procesar múltiples veces los mismos archivos? (Concepto de idempotencia).**
  
4.  **¿Dónde implementamos las reglas de negocio (cálculos, transformaciones)?**
   


In [None]:
import os
import pandas as pd
import numpy as np
import random
from datetime import datetime, timedelta

# Crear carpeta staging
os.makedirs("staging", exist_ok=True)

np.random.seed(42)

def generar_datos_mes(mes, anio=2024, n_registros=100000):

    ids = np.random.randint(1, 3000, n_registros)  # Puede generar duplicados
    # Inicializar salarios y gastos como float para poder asignar None (que se convierte a NaN)
    salarios = np.random.randint(-1000, 8000, n_registros).astype(float)  # Incluye negativos
    gastos = np.random.randint(-500, 5000, n_registros).astype(float)     # Incluye negativos

    # Fechas con formatos inconsistentes
    fechas = []
    for _ in range(n_registros):
        fecha = datetime(anio, mes, random.randint(1, 28))
        formato = random.choice([
            "%Y-%m-%d",
            "%d/%m/%Y",
            "%m-%d-%Y"
        ])
        fechas.append(fecha.strftime(formato))

    # Correos (PII)
    correos = [f"usuario{random.randint(1,5000)}@universidad.edu" for _ in range(n_registros)]

    # Introducir valores nulos aleatorios
    salarios[np.random.choice(n_registros, size=300, replace=False)] = None
    gastos[np.random.choice(n_registros, size=200, replace=False)] = None

    df = pd.DataFrame({
        "id": ids,
        "salario": salarios,
        "gastos": gastos,
        "fecha": fechas,
        "correo": correos
    })

    # Introducir filas completamente corruptas
    for _ in range(10):
        df.loc[random.randint(0, n_registros-1)] = [None, None, None, "fecha_invalida", "correo_invalido"]

    # Introducir duplicados completos
    df = pd.concat([df, df.sample(100)], ignore_index=True)

    return df


# Generar archivos mensuales
for mes in [1, 2, 3, 4, 5, 6, 7]:
    df_mes = generar_datos_mes(mes)
    df_mes.to_csv(f"staging/finanzas_mes_{mes}.csv", index=False)

print("Archivos problemáticos generados en carpeta 'staging/'")

## Bloque 3: ETL clásico (Extracción, Transformación, Carga)

El modelo ETL clásico implica transformar los datos antes de cargarlos al almacén. Los pasos son:

*   **Extracción (Extract)**
En esta primera etapa, se recolectan datos brutos de múltiples orígenes, que pueden incluir bases de datos relacionales (SQL), sistemas CRM, archivos planos (como CSV), APIs, sensores de IoT o aplicaciones SaaS. Los datos extraídos suelen colocarse inicialmente en un área de preparación temporal conocida como staging area
*  **Transformación (Transform)**
Aquí, los datos extraídos se someten a una serie de reglas de negocio y procesos de limpieza en un servidor de procesamiento secundario antes de ser movidos al sistema de destino. Las tareas principales incluyen:
>* Limpieza y validación: Se eliminan datos inconsistentes, se manejan valores nulos y se corrigen errores de formato.
>* Estandarización: Se unifican formatos, como fechas o unidades de medida.
>* Deduplicación: Se identifican y eliminan registros repetidos.
>* Seguridad y cumplimiento: Se aplican técnicas de enmascaramiento o anonimización de información personal identificable (PII) para cumplir con regulaciones como GDPR o HIPAA antes de que el dato toque el almacenamiento final.
> * Derivación y enriquecimiento: Construcción de nuevas variables, métricas y atributos que representan la lógica empresarial y permiten análisis posteriores.
*    Carga (Load)
En el paso final, los datos ya transformados, estructurados y limpios se guardan en el sistema de destino (como un Data Warehouse OLAP). Una vez cargados, los datos están listos para ser utilizados directamente por herramientas de Inteligencia de Negocios (BI), analistas y científicos de datos para generar informes y conocimientos estratégicos

### Características del ETL Clásico
*  Esquema de escritura (Schema-on-Write): Requiere que la estructura y el esquema del destino se definan meticulosamente antes de cargar los datos.
* Enfoque en TI: Tradicionalmente, estos procesos han sido diseñados y gestionados por equipos especializados de ingeniería de datos o departamentos de TI.
* Procesamiento por lotes: Por lo general, se ejecuta en intervalos programados (diarios, semanales), aunque existen implementaciones en tiempo real.
* Herramientas comunes: Algunas de las plataformas más conocidas para este flujo son Informatica PowerCenter, Talend y Microsoft SSIS.

### Implementación en Python

Utilizaremos `pandas` para procesar los CSV y una base de datos `SQLite` como almacén simulado. El siguiente código ejecuta un pipeline ETL:

### Extracción

In [None]:
import os
import pandas as pd

archivos = [f for f in os.listdir('staging') if f.endswith('.csv')]

dataframes = []

for fname in archivos:
    df = pd.read_csv(os.path.join('staging', fname))
    print(f"Archivo {fname} leído con {len(df)} registros")
    dataframes.append(df)

# Unificamos todos los meses
df_total = pd.concat(dataframes, ignore_index=True)

print(f"Total registros extraídos: {len(df_total)}")

In [None]:
df_total.head()

* El dato pasa de disco a memoria.

* Aún no hay reglas de negocio.

* Solo estamos recolectando.

### Transformación
1. Tratamiento de valores nulos

Durante la transformación, existen varias formas de abordar los datos faltantes:

*  Imputación: Consiste en rellenar los valores nulos con datos específicos para no perder la fila completa. Puede optar por valores predeterminados como "0" para campos numéricos, o etiquetas como "Desconocido" o "Otros" para categorías. También es común usar métodos estadísticos como imputar el promedio o la moda, pero depende de las reglas de negocio.
* Filtrado o Eliminación: Si un campo crítico (como un ID de cliente o una fecha de transacción) es nulo, la práctica común es descartar la fila para evitar análisis sesgados.
* Validación de Calidad: Implementar pruebas de nulos (como el test not_null en dbt) permite marcar registros obligatorios que faltan y generar alertas antes de que los datos lleguen a los informes finales.

**Campos críticos: id y fecha**: Es una regla estructural para este caso, si falta identidad o temporalidad, el registro no es analizable.

In [None]:
df_total.isna().any(axis=0)

In [None]:
registros_antes = len(df_total)
df_total = df_total.dropna(subset=['id', 'fecha'])
print("Registros eliminados por id/fecha nulos:", registros_antes - len(df_total))

La imputación masiva puede introducir ruido o sesgos artificiales en los modelos de aprendizaje automático si no se realiza con cuidado, alterando las líneas de ajuste y la exactitud de los resultados, para nuestro ejercico tener gastos nulos indicará que tienen valor 0.

Más información  en  Rubin, D. B. (1976). Inference and missing data. Biometrika, 63(3), 581–590.

In [None]:
df_total

In [None]:
df_total['gastos'].isna().sum()

In [None]:
df_total["gastos"] = df_total["gastos"].fillna(0)

In [None]:
df_total['gastos'].isna().sum()


2. Validar reglas básicas

In [None]:
df_total = df_total[df_total['salario'] > 0]
df_total = df_total[df_total['gastos'] >= 0]

* No puede haber salario negativo.

* Gastos no pueden ser negativos.
3. Estandarización


In [None]:

def estandarizar_fecha(valor):
    if pd.isnull(valor):
        return pd.NaT

    valor = str(valor).strip()

    # Caso 1: ISO estándar YYYY-MM-DD
    try:
        if "-" in valor and len(valor.split("-")[0]) == 4:
            return pd.to_datetime(valor, format="%Y-%m-%d", errors="coerce")
    except:
        pass

    # Caso 2: Formato europeo DD/MM/YYYY
    if "/" in valor:
        try:
            return pd.to_datetime(valor, format="%d/%m/%Y", errors="coerce")
        except:
            pass

    # Caso 3: Formato americano MM-DD-YYYY
    if "-" in valor:
        try:
            return pd.to_datetime(valor, format="%m-%d-%Y", errors="coerce")
        except:
            pass

    return pd.NaT


df_total["fecha_estandar"] = df_total["fecha"].apply(estandarizar_fecha)

In [None]:
df_total[['fecha','fecha_estandar']]# = pd.to_datetime(df_total['fecha'], errors='coerce')

In [None]:
df_total['fecha_estandar'].isna().sum()

In [None]:
df_total.drop(columns='fecha', inplace=True)

4. Seguridad y Cumplimiento (PII)
* Protegemos información sensible.

* Cumplimos regulación.

**Hasheo**
Aplicar una función matemática que convierte un dato en una cadena fija de caracteres (un código), de manera determinística.

Ejemplo:
```
usuario@universidad.edu
↓
3b1f7c4d9e8a...
```


Tiene longitud fija (ej: 64 caracteres en SHA-256).



In [None]:
import hashlib

def hash_correo(correo):
    if pd.isnull(correo):
        return None
    return hashlib.sha256(correo.encode("utf-8")).hexdigest()

df_total["correo_hash"] = df_total["correo"].apply(hash_correo)

# Eliminamos el correo original
df_total = df_total.drop(columns=["correo"])

In [None]:
df_total.head()

5. Derivación y enriquecimiento

In [None]:
df_total['utilidad'] = df_total['salario'] - df_total['gastos']

### CARGA (Load)

Ahora aplicamos schema-on-write.

Primero definimos estructura.

In [None]:
import duckdb

conn = duckdb.connect('dw.duckdb')

conn.execute("""
CREATE OR REPLACE TABLE fact_finanzas_etl (
    id INTEGER,
    salario DOUBLE,
    gastos DOUBLE,
    fecha DATE,
    utilidad DOUBLE,
    correo_hash STRING
)
""")
conn.execute("DELETE FROM fact_finanzas_etl")

In [None]:


print("Tablas en la base de datos:")
# Consultar las tablas existentes en la base de datos
# Utilizamos 'PRAGMA show_tables;' para DuckDB
# Si fuera SQLite, sería 'SELECT name FROM sqlite_master WHERE type='table';'
# O en DuckDB también se puede usar 'SHOW TABLES;'
tables_df = conn.execute("SHOW TABLES;").fetchdf()
display(tables_df)

# Si quieres ver el esquema de una tabla específica, por ejemplo, 'fact_finanzas_etl'
print("Esquema de la tabla fact_finanzas_etl:")
schema_etl_df = conn.execute("DESCRIBE fact_finanzas_etl;").fetchdf()
display(schema_etl_df)

# Si quieres ver algunas filas de una tabla, por ejemplo, 'fact_finanzas_etl'
print("Primeras 5 filas de fact_finanzas_etl:")
fact_finanzas_etl_df = conn.execute("SELECT * FROM fact_finanzas_etl LIMIT 5;").fetchdf()
display(fact_finanzas_etl_df)

# Cerrar la conexión cuando hayas terminado
conn.close()


In [None]:
conn = duckdb.connect('dw.duckdb')

conn.register("df_transformado", df_total)

conn.execute("""
INSERT INTO fact_finanzas_etl
SELECT id, salario, gastos, fecha_estandar, utilidad, correo_hash
FROM df_transformado
""")
df_total2=conn.execute("SELECT * FROM df_transformado LIMIT 5;").fetchdf()
display(df_total2)
conn.unregister("df_transformado")

conn.close()

## Bloque 4: ELT moderno (Extracción, Carga, Transformación)

En ELT, invertimos los pasos: cargamos primero los datos crudos al almacén y luego los transformamos dentro de él usando su motor (por ejemplo, SQL). Los pasos son:

ELT Moderno: El Cambio hacia la Flexibilidad y el Cómputo Elástico
En el paradigma ELT (Extracción, Carga, Transformación), se invierte el flujo tradicional para adaptarse a la era de la nube. A diferencia del ETL clásico, que depende de un servidor intermedio rígido (Schema-on-Write), el ELT adopta la filosofía de Schema-on-Read, cargando los datos brutos primero para decidir cómo estructurarlos después.
Las Fases del Proceso ELT
1. Extracción: Se recolectan los datos de fuentes diversas (APIs, bases de datos, logs) en su estado natural y sin procesar. En este paso, no se realiza ninguna limpieza profunda ni cambio de esquema para evitar la pérdida de información original.
2. Carga: Los datos se inyectan directamente en el destino final, ya sea un Data Warehouse (como BigQuery o Snowflake) o un Data Lake. Aquí se suelen organizar en una Arquitectura de Medallón, depositándose primero en una capa "Bronze" (cruda).
3. Transformación: Una vez que los datos residen en el destino, se ejecutan las consultas (usualmente en SQL) para limpiar, normalizar y enriquecer la información.
¿Por qué es el estándar actual?
*  Aprovechamiento del MPP: Se utiliza la Potencia de Procesamiento Paralelo Masivo de los almacenes de datos modernos, delegando el cómputo pesado a motores escalables en lugar de saturar la memoria de una aplicación cliente.
*  Agilidad y Democratización: Al tener los datos crudos disponibles de inmediato, diferentes equipos pueden aplicar sus propias transformaciones según sus necesidades específicas sin depender de procesos previos de TI.
*  Red de Seguridad: Mantener una copia fiel de los datos originales permite re-procesar la información meses después si se descubren errores en la lógica de transformación inicial.
•* Escalabilidad: Es el enfoque ideal para Big Data y datos no estructurados, permitiendo una ingesta de alta velocidad y baja latencia.


### Implementación en Python



### Extracción y Carga

In [None]:


conn = duckdb.connect("dw.duckdb")

# Crear tabla staging cargando CSV directamente
conn.execute("""
CREATE OR REPLACE TABLE staging_raw AS
SELECT *
FROM read_csv_auto('staging/*.csv')
""")

print("Datos crudos cargados en staging_raw")

In [None]:
conn.execute("""
CREATE OR REPLACE TABLE fact_finanzas_elt AS
SELECT
    id,

    -- Validación salario
    salario,

    -- Imputación gastos
    COALESCE(gastos, 0) AS gastos,

    -- Estandarización fecha
    CASE
        WHEN regexp_matches(fecha, '^[0-9]{4}-[0-9]{2}-[0-9]{2}$')
            THEN CAST(fecha AS DATE)
        WHEN regexp_matches(fecha, '^[0-9]{2}/[0-9]{2}/[0-9]{4}$')
            THEN STRPTIME(fecha, '%d/%m/%Y')
        WHEN regexp_matches(fecha, '^[0-9]{2}-[0-9]{2}-[0-9]{4}$')
            THEN STRPTIME(fecha, '%m-%d-%Y')
        ELSE NULL
    END AS fecha,

    -- Hash SHA256 dentro del motor
    CASE
        WHEN correo IS NOT NULL
            THEN sha256(correo)
        ELSE NULL
    END AS correo_hash,

    -- Regla de negocio
    salario - COALESCE(gastos, 0) AS utilidad

FROM staging_raw
WHERE
    id IS NOT NULL
    AND fecha IS NOT NULL
    AND salario > 0
    AND COALESCE(gastos, 0) >= 0
""")

print("Transformación ELT completada")

In [None]:
conn.execute("""
SELECT COUNT(*) FROM fact_finanzas_elt
""").fetchall()

## Bloque 5: Comparación de tiempos y conclusiones

Veamos un ejemplo práctico comparando ambos enfoques. Ejecutamos de nuevo ambos pipelines (ETL y ELT) para medir sus tiempos:

In [None]:

import time

# ==========================
# INICIO MEDICIÓN
# ==========================
start = time.time()

conn = duckdb.connect('dw.duckdb')
conn.execute("DROP TABLE IF EXISTS fact_finanzas_etl")

dataframes = []

# ==========================
# 1️ EXTRACCIÓN
# ==========================
for fname in os.listdir('staging'):
    if fname.endswith('.csv'):
        df = pd.read_csv(os.path.join('staging', fname))
        dataframes.append(df)

df_total = pd.concat(dataframes, ignore_index=True)

# ==========================
#  TRANSFORMACIÓN
# ==========================

#  Eliminación campos críticos
df_total = df_total.dropna(subset=['id', 'fecha'])

#  Imputación controlada
df_total['gastos'] = df_total['gastos'].fillna(0)

#  Validación estructural
df_total = df_total[df_total['salario'] > 0]
df_total = df_total[df_total['gastos'] >= 0]

#  Estandarización fecha
def estandarizar_fecha(valor):
    if pd.isnull(valor):
        return pd.NaT

    valor = str(valor).strip()

    if "-" in valor and len(valor.split("-")[0]) == 4:
        return pd.to_datetime(valor, format="%Y-%m-%d", errors="coerce")

    if "/" in valor:
        return pd.to_datetime(valor, format="%d/%m/%Y", errors="coerce")

    if "-" in valor:
        return pd.to_datetime(valor, format="%m-%d-%Y", errors="coerce")

    return pd.NaT

df_total['fecha_estandar'] = df_total['fecha'].apply(estandarizar_fecha)

# Eliminar fechas inválidas
df_total = df_total.dropna(subset=['fecha_estandar'])

#  Hash SHA256 del correo
def hash_correo(correo):
    if pd.isnull(correo):
        return None
    return hashlib.sha256(correo.encode("utf-8")).hexdigest()

df_total['correo_hash'] = df_total['correo'].apply(hash_correo)

# Eliminar columnas originales sensibles
df_total = df_total.drop(columns=['correo', 'fecha'])

#  Regla de negocio
df_total['utilidad'] = df_total['salario'] - df_total['gastos']

#  Deduplicación global
df_total = df_total.drop_duplicates()

# ==========================
# 3️ CARGA
# ==========================

conn.register("df_etl", df_total)

conn.execute("""
CREATE TABLE fact_finanzas_etl AS
SELECT
    id,
    salario,
    gastos,
    fecha_estandar AS fecha,
    utilidad,
    correo_hash
FROM df_etl
""")

conn.unregister("df_etl")
conn.close()

# ==========================
# FIN MEDICIÓN
# ==========================
etl_time = time.time() - start
print("Tiempo ETL completo:", round(etl_time, 3), "segundos")

In [None]:

start = time.time()

conn = duckdb.connect('dw.duckdb')

# Limpieza previa
conn.execute("DROP TABLE IF EXISTS staging_raw")
conn.execute("DROP TABLE IF EXISTS fact_finanzas_elt")

# ==========================
# 1 LOAD (Extract + Load)
# ==========================

conn.execute("""
CREATE TABLE staging_raw AS
SELECT *
FROM read_csv_auto('staging/*.csv')
""")

# ==========================
# TRANSFORM (Dentro del motor)
# ==========================

conn.execute("""
CREATE TABLE fact_finanzas_elt AS
SELECT DISTINCT
    id,

    -- Validación salario
    salario,

    -- Imputación gastos
    COALESCE(gastos, 0) AS gastos,

    -- Estandarización fecha
    CASE
        WHEN regexp_matches(fecha, '^[0-9]{4}-[0-9]{2}-[0-9]{2}$')
            THEN CAST(fecha AS DATE)
        WHEN regexp_matches(fecha, '^[0-9]{2}/[0-9]{2}/[0-9]{4}$')
            THEN STRPTIME(fecha, '%d/%m/%Y')
        WHEN regexp_matches(fecha, '^[0-9]{2}-[0-9]{2}-[0-9]{4}$')
            THEN STRPTIME(fecha, '%m-%d-%Y')
        ELSE NULL
    END AS fecha,

    -- Hash SHA256
    CASE
        WHEN correo IS NOT NULL
            THEN sha256(correo)
        ELSE NULL
    END AS correo_hash,

    -- Regla de negocio
    salario - COALESCE(gastos, 0) AS utilidad

FROM staging_raw
WHERE
    id IS NOT NULL
    AND fecha IS NOT NULL
    AND salario > 0
    AND COALESCE(gastos, 0) >= 0
""")

elt_time = time.time() - start

print("Tiempo ELT completo:", round(elt_time, 3), "segundos")

|Categoría|ETL (Tradicional)|ELT (Moderno)|
|---|---|---|
|Orden de operaciones|Extrae, transforma en un servidor intermedio y luego carga.|Extrae, carga los datos en bruto y luego transforma en el destino.|
|Arquitectura|Schema-on-Write: El esquema debe definirse meticulosamente antes de la carga.|Schema-on-Read: Los datos se guardan en bruto; el esquema se aplica al usarlos.|
|Ubicación de cómputo|Servidor de procesamiento secundario o motor de ETL externo.|Dentro del almacén de datos (Data Warehouse) o Data Lake.|
|Tipos de datos|Ideal para datos estructurados (tablas).|Maneja datos estructurados, semiestructurados y no estructurados (JSON, logs, imágenes).
|Escalabilidad|Limitada por el servidor de procesamiento (escalado vertical costoso).|Altamente escalable gracias al procesamiento paralelo masivo (MPP) de la nube.

 #### Rendimiento y Costos
* Velocidad: El ELT es generalmente más rápido al cargar grandes volúmenes de datos porque elimina la fase de transformación previa, permitiendo que el sistema de destino procese los datos en paralelo. El ETL puede convertirse en un cuello de botella al procesar registros uno a uno antes de moverlos.
* Costos: El ETL requiere una inversión mayor en infraestructura de servidores dedicada y planificación detallada. El ELT aprovecha la elasticidad de la nube (pago por uso) y requiere menos sistemas que mantener, lo que simplifica la pila de datos.
#### Seguridad y Cumplimiento
* ETL: Es el método preferido para industrias con regulaciones estrictas (como finanzas o salud). Permite limpiar, anonimizar o enmascarar información sensible (PII) antes de que toque el almacenamiento persistente.
* ELT: Almacenar datos en bruto requiere un gobierno de datos robusto. Sin embargo, ofrece características de seguridad integradas en los almacenes modernos, como control de acceso granular y auditorías de linaje de datos.

--------------------------------------------------------------------------------
#### ¿Cuándo usar cada uno?
Elige ETL cuando:
* Trabajes con bases de datos heredadas (on-premise) con potencia de procesamiento limitada.
* Necesites cumplir con normativas estrictas de privacidad (GDPR, HIPAA) mediante limpieza previa.
* El volumen de datos sea pequeño o mediano y altamente predecible.

Elige ELT cuando:
*  Utilices arquitecturas nativas de la nube como Snowflake, BigQuery o Amazon Redshift.
* Manejes Big Data o datos de fuentes cambiantes (sensores de IoT, redes sociales).
* Desees mantener una copia fiel de los datos originales (Bronze Layer) para permitir futuros análisis o re-procesamientos sin volver a extraer la información.

En la ingeniería de datos contemporánea, la tendencia es hacia un modelo híbrido que prioriza el ELT para la agilidad analítica, mientras utiliza procesos de ETL específicos para la ingesta de datos sensibles o la integración con sistemas antiguos.

## Bloque 6: Orquestación de flujos de trabajo


## ¿Qué es la Orquestación de Flujos de Trabajo?

Si el ETL/ELT es el proceso de transformar materias primas en productos, la orquestación es el "director de la orquesta" que asegura que cada instrumento toque en el momento justo y en armonía. Mientras que la automatización se limita a ejecutar una tarea individual, la orquestación gestiona la secuencia, la lógica de decisión y el intercambio de datos entre sistemas heterogéneos.

### Las 4 Funciones Críticas de un Orquestador

1.  **Programación (Scheduling):** Automatiza la ejecución basada en intervalos de tiempo (cron), eventos o disparadores externos.
2.  **Gestión de Dependencias:** Garantiza que la "Tarea B" (ej. transformar) solo inicie si la "Tarea A" (ej. extraer) finalizó con éxito. Se visualiza comúnmente como un DAG (Grafo Acíclico Dirigido), que representa el flujo de trabajo sin bucles.
3.  **Manejo de Errores y Reintentos:** Implementa lógicas de recuperación automática (retries) ante fallos temporales (como una caída de red) y genera alertas proactivas.
4.  **Observabilidad:** Proporciona tableros visuales para monitorear la salud de las tuberías en tiempo real, permitiendo identificar cuellos de botella rápidamente.

### Herramientas Líderes en el Mercado (2025-2026)

Dependiendo del entorno y la experiencia del equipo, existen varias opciones:

*   **Apache Airflow:** El estándar de la industria. Permite definir flujos mediante código Python puro, lo que ofrece flexibilidad total para tuberías complejas y de gran escala.
*   **Mage.ai:** Una alternativa moderna con un enfoque híbrido. Combina la interactividad de los notebooks con la robustez de una herramienta de producción, facilitando la visualización de datos en cada bloque de código.
*   **Prefect:** Diseñado para la simplicidad, utiliza decoradores de Python para convertir funciones estándar en tareas orquestadas, ideal para flujos dinámicos.
*   **Dagster:** Se centra en los activos de datos (tablas resultantes) en lugar de solo en las tareas, priorizando el linaje y la calidad del dato.

--------------------------------------------------------------------------------

### Idempotencia

En un entorno profesional, es imperativo que las tareas sean **idempotentes**. Esto significa que si una tarea se ejecuta varias veces con la misma entrada, el resultado final debe ser siempre el mismo.

*   **Por qué es vital:** Si una carga de datos falla a la mitad y el orquestador la reintenta, la idempotencia evita que se dupliquen registros o se corrompa la base de datos.
*   **Patrón común:** Usar la lógica de "Eliminar e Insertar" (Delete + Insert) para el periodo específico de tiempo que se está procesando.

In [None]:
!pip install apache-airflow --quiet


## Airflow

1. ¿Qué es Apache Airflow?
Apache Airflow es la plataforma estándar de la industria para orquestar flujos de trabajo de datos. Su filosofía es "Workflows as Code" (flujos como código), lo que permite definir procesos mediante Python, facilitando el control de versiones, pruebas y colaboración.
* DAG (Directed Acyclic Graph): Es el plano de tu flujo. Es una colección de tareas organizadas para definir su orden de ejecución y dependencias.
* Operadores: Plantillas que definen qué hace cada tarea (ej. ejecutar un script de Python, una consulta SQL o un comando de terminal).
* Tareas: La instancia real de un operador dentro de un DAG.

--------------------------------------------------------------------------------
2. Tutorial: Creando tu primer Pipeline (TaskFlow API)
La forma más moderna y "Pythonica" de escribir DAGs es mediante la TaskFlow API (introducida en la versión 2.0), que utiliza decoradores para simplificar el código.

La orquestación integra múltiples tareas automatizadas para completar un proceso de extremo a extremo, asegurando que los datos se compartan efectivamente y se activen en el orden correcto.

* @dag (Decorador de Grafo Acíclico Dirigido):
    >* Qué hace: Define el contenedor principal del flujo de trabajo. Establece la frecuencia de ejecución (schedule), la fecha de inicio y las propiedades globales del pipeline.
    >* Cómo funciona: Airflow lee este decorador para registrar el proceso en su planificador (Scheduler), permitiendo que el flujo sea visible y programable en la interfaz web.

* @task (Decorador de Tarea):
    >* Qué hace: Convierte una función de Python estándar en una unidad de trabajo atómica.
    >* Cómo funciona: Cada función marcada con @task se convierte en un nodo dentro del DAG. Airflow gestiona los reintentos (retries) y el aislamiento de esta tarea; si falla, se puede reintentar sin reiniciar todo el pipeline.

* Sensor (Operador Especializado):
    >* Qué hace: Es un tipo de operador que espera una condición externa antes de permitir que el flujo continúe.
    >* Cómo funciona: Por ejemplo, un sensor puede monitorear la llegada de un archivo a un bucket de S3 o la finalización de un proceso en una base de datos externa antes de activar la siguiente tarea.

* .expand() y .partial() (Mapeo Dinámico):
    >* Qué hace: Permite que una tarea se ejecute múltiples veces en paralelo, una por cada entrada de una lista.
    >* Cómo funciona: partial() define los parámetros que permanecen constantes (como credenciales), mientras que expand() crea n copias de la tarea para procesar archivos o modelos de forma concurrente.

3. Arquitectura y Ejecución
Para ejecutar Airflow, el sistema se apoya en componentes clave que interactúan de forma asíncrona:
> 1. Scheduler (Planificador): El cerebro que monitorea los DAGs y activa las tareas cuando sus dependencias se cumplen.
>2. Webserver: La interfaz de usuario para monitorear estados, ver logs y activar tareas manualmente.
>3. Metadata Database: Donde se guarda el historial de ejecuciones y estados (usualmente PostgreSQL o MySQL).


In [None]:
from datetime import datetime, timedelta
from airflow.decorators import dag, task # Revertido a airflow.decorators
import duckdb
import time


default_args = {
    "owner": "data_engineer",
    "retries": 1,
    "retry_delay": timedelta(minutes=2),
}


@dag(
    dag_id="elt_duckdb_pipeline",
    default_args=default_args,
    schedule="@daily", # Cambiado de schedule_interval a schedule
    start_date=datetime(2025, 1, 1),
    catchup=False,
    description="Pipeline ELT con DuckDB orquestado por Airflow",
)
def elt_pipeline():

    @task()
    def limpiar_tablas():
        conn = duckdb.connect("dw.duckdb")
        conn.execute("DROP TABLE IF EXISTS staging_raw")
        conn.execute("DROP TABLE IF EXISTS fact_finanzas_elt2")
        conn.close()
        return "Tablas limpiadas"

    @task()
    def cargar_staging(mensaje):
        conn = duckdb.connect("dw.duckdb")
        conn.execute("""
            CREATE TABLE staging_raw AS
            SELECT *
            FROM read_csv_auto('staging/*.csv')
        """)
        conn.close()
        return "Staging cargado"

    @task()
    def transformar_datos(mensaje):
        conn = duckdb.connect("dw.duckdb")
        conn.execute("""
            CREATE TABLE fact_finanzas_elt AS
            SELECT DISTINCT
                id,
                salario,
                COALESCE(gastos, 0) AS gastos,
                CASE
                    WHEN regexp_matches(fecha, '^[0-9]{4}-[0-9]{2}-[0-9]{2}$')
                        THEN CAST(fecha AS DATE)
                    WHEN regexp_matches(fecha, '^[0-9]{2}/[0-9]{2}/[0-9]{4}$')
                        THEN STRPTIME(fecha, '%d/%m/%Y')
                    WHEN regexp_matches(fecha, '^[0-9]{2}-[0-9]{2}-[0-9]{4}$')
                        THEN STRPTIME(fecha, '%m-%d-%Y')
                    ELSE NULL
                END AS fecha,
                CASE
                    WHEN correo IS NOT NULL
                        THEN sha256(correo)
                    ELSE NULL
                END AS correo_hash,
                salario - COALESCE(gastos, 0) AS utilidad
            FROM staging_raw
            WHERE
                id IS NOT NULL
                AND fecha IS NOT NULL
                AND salario > 0
                AND COALESCE(gastos, 0) >= 0
        """)
        conn.close()
        return "Transformación completada"

    @task()
    def medir_tiempo():
        # Esta tarea solo muestra que podemos agregar lógica adicional
        return f"Ejecución completada en {datetime.now()}"

    limpieza = limpiar_tablas()
    staging = cargar_staging(limpieza)
    transformacion = transformar_datos(staging)
    medir_tiempo()


elt_dag = elt_pipeline()

In [None]:
print(elt_dag)
print(type(elt_dag))

### Referencias

Rubin, D. B. (1976). Inference and missing data. *Biometrika*, *63*(3), 581–590.

Software Foundation. (s.f.). *Architecture Overview — Airflow 3.1.7 Documentation*. Apachecuperado de [https://airflow.apache.org/].