# Clase Completa de Apache Airflow

## Introducci√≥n

Apache Airflow es una plataforma de c√≥digo abierto para programar, orquestar y monitorear flujos de trabajo (workflows) de forma program√°tica.

### ¬øQu√© aprenderemos?
1. Conceptos b√°sicos de Airflow
2. Instalaci√≥n y configuraci√≥n
3. DAGs (Directed Acyclic Graphs)
4. Operadores y tareas
5. Dependencias y ejecuci√≥n
6. Ejemplos pr√°cticos

## 1. Instalaci√≥n de Apache Airflow

Primero instalamos Airflow. Es recomendable usar un entorno virtual.

In [None]:
# ============================================
# INSTALACI√ìN DE APACHE AIRFLOW
# ============================================
# NOTA: Ejecutar esto solo una vez para instalar las dependencias

# Importamos sys para obtener la ruta del int√©rprete de Python actual
import sys

# Instalamos Apache Airflow versi√≥n 2.7.3
# !{sys.executable} ejecuta pip con el Python actual del notebook
!{sys.executable} -m pip install apache-airflow==2.7.3

# Instalamos el provider de HTTP para hacer requests a APIs
!{sys.executable} -m pip install apache-airflow-providers-http

## 2. Conceptos Fundamentales

### DAG (Directed Acyclic Graph)
- Es un grafo dirigido sin ciclos que representa un flujo de trabajo
- Contiene tareas y sus dependencias

### Tarea (Task)
- Unidad b√°sica de trabajo en Airflow
- Se define usando operadores

### Operador (Operator)
- Template para una tarea
- Define QU√â se ejecutar√°

### Scheduler
- Componente que programa la ejecuci√≥n de los DAGs

## 3. Configuraci√≥n Inicial

In [None]:
# ============================================
# IMPORTAR LIBRER√çAS NECESARIAS
# ============================================

# datetime y timedelta: Para manejar fechas y duraciones de tiempo
from datetime import datetime, timedelta

# DAG: Clase principal para definir un flujo de trabajo en Airflow
from airflow import DAG

# PythonOperator: Ejecuta funciones Python como tareas
from airflow.operators.python import PythonOperator

# BashOperator: Ejecuta comandos de l√≠nea de comandos (bash/shell)
from airflow.operators.bash import BashOperator

# DummyOperator: Tarea vac√≠a que no hace nada, √∫til para organizar el flujo
from airflow.operators.dummy import DummyOperator

print("Librer√≠as importadas correctamente")

## 4. Argumentos por Defecto

Los argumentos por defecto se aplican a todas las tareas del DAG si no se especifica lo contrario.

In [None]:
# ============================================
# ARGUMENTOS POR DEFECTO DEL DAG
# ============================================
# Estos par√°metros se aplican a TODAS las tareas del DAG autom√°ticamente

default_args = {
    # 'owner': Propietario del DAG (nombre del equipo o persona)
    'owner': 'airflow',
    
    # 'depends_on_past': Si True, la tarea solo se ejecuta si la anterior fue exitosa
    'depends_on_past': False,
    
    # 'start_date': Fecha desde la cual el DAG est√° activo
    'start_date': datetime(2024, 1, 1),
    
    # 'email': Lista de correos para notificaciones
    'email': ['admin@ejemplo.com'],
    
    # 'email_on_failure': Enviar email cuando una tarea falla
    'email_on_failure': False,
    
    # 'email_on_retry': Enviar email cuando una tarea se reintenta
    'email_on_retry': False,
    
    # 'retries': N√∫mero de reintentos si una tarea falla
    'retries': 1,
    
    # 'retry_delay': Tiempo de espera entre reintentos
    'retry_delay': timedelta(minutes=5)
}

# Mostrar la configuraci√≥n
print("Argumentos por defecto configurados:")
for key, value in default_args.items():
    print(f"  {key}: {value}")

## 5. Crear tu Primer DAG

Vamos a crear un DAG simple que ejecuta varias tareas.

In [None]:
# ============================================
# CREAR EL PRIMER DAG
# ============================================

# Instanciamos un objeto DAG con sus par√°metros
dag = DAG(
    # 'dag_id': Identificador √∫nico del DAG (debe ser √∫nico en Airflow)
    'mi_primer_dag',
    
    # Heredamos los argumentos por defecto definidos anteriormente
    default_args=default_args,
    
    # Descripci√≥n que aparecer√° en la UI de Airflow
    description='Un DAG simple de ejemplo',
    
    # 'schedule_interval': Frecuencia de ejecuci√≥n (cada 1 d√≠a en este caso)
    schedule_interval=timedelta(days=1),
    
    # 'catchup': Si False, no ejecuta las ejecuciones pasadas al activar el DAG
    catchup=False,
    
    # 'tags': Etiquetas para organizar y filtrar DAGs en la UI
    tags=['ejemplo', 'tutorial']
)

# Verificar que el DAG se cre√≥ correctamente
print(f"DAG creado: {dag.dag_id}")
print(f"Schedule: {dag.schedule_interval}")

## 6. Funciones Python para las Tareas

Definimos funciones que ser√°n ejecutadas por nuestras tareas.

In [None]:
# ============================================
# DEFINIR FUNCIONES PARA LAS TAREAS
# ============================================
# Estas funciones ser√°n ejecutadas por los PythonOperators

# Funci√≥n 1: Extraer datos (fase Extract del ETL)
def extraer_datos(**context):
    """Simula la extracci√≥n de datos de una fuente"""
    print("Extrayendo datos...")
    
    # Simulamos datos extra√≠dos de una base de datos o API
    datos = {'usuarios': 150, 'ventas': 3500, 'productos': 45}
    print(f"Datos extra√≠dos: {datos}")
    
    # El valor retornado se guarda autom√°ticamente en XCom
    return datos


# Funci√≥n 2: Transformar datos (fase Transform del ETL)
def transformar_datos(**context):
    """Simula la transformaci√≥n de datos"""
    print("Transformando datos...")
    
    # Aqu√≠ aplicar√≠amos las transformaciones necesarias
    datos_transformados = {
        'total_usuarios': 150,
        'promedio_ventas': 23.33,
        'categoria': 'productos_electronicos'
    }
    print(f"Datos transformados: {datos_transformados}")
    
    return datos_transformados


# Funci√≥n 3: Cargar datos (fase Load del ETL)
def cargar_datos(**context):
    """Simula la carga de datos en un destino"""
    print("Cargando datos en el destino...")
    print("Datos cargados exitosamente en la base de datos")
    
    return "Carga completada"


# Funci√≥n 4: Enviar notificaci√≥n
def enviar_notificacion(**context):
    """Env√≠a una notificaci√≥n al finalizar el proceso"""
    print("Enviando notificaci√≥n...")
    print("Notificaci√≥n enviada: Pipeline ETL completado exitosamente")


print("Funciones definidas correctamente")

## 7. Crear Tareas con Operadores

### PythonOperator
Ejecuta funciones Python

In [None]:
# ============================================
# CREAR TAREAS CON PYTHONOPERATOR
# ============================================

# Tarea 1: Tarea de inicio (dummy - no hace nada, solo marca el inicio)
inicio = DummyOperator(
    task_id='inicio',  # ID √∫nico de la tarea
    dag=dag            # DAG al que pertenece esta tarea
)

# Tarea 2: Extracci√≥n de datos
tarea_extraer = PythonOperator(
    task_id='extraer_datos',           # ID √∫nico de la tarea
    python_callable=extraer_datos,     # Funci√≥n Python que ejecutar√°
    provide_context=True,              # Proporciona el contexto de Airflow a la funci√≥n
    dag=dag                            # DAG al que pertenece
)

# Tarea 3: Transformaci√≥n de datos
tarea_transformar = PythonOperator(
    task_id='transformar_datos',       # ID √∫nico de la tarea
    python_callable=transformar_datos, # Funci√≥n a ejecutar
    provide_context=True,              # Permite acceder a XCom y otros datos del contexto
    dag=dag
)

# Tarea 4: Carga de datos
tarea_cargar = PythonOperator(
    task_id='cargar_datos',
    python_callable=cargar_datos,
    provide_context=True,
    dag=dag
)

print("Tareas Python creadas")

### BashOperator
Ejecuta comandos Bash

In [None]:
# ============================================
# CREAR TAREAS CON BASHOPERATOR
# ============================================

# Tarea con BashOperator: Ejecuta un comando de shell
tarea_bash = BashOperator(
    task_id='verificar_sistema',                        # ID de la tarea
    bash_command='echo "Sistema verificado en $(date)"', # Comando bash a ejecutar
    dag=dag
)

# Tarea de notificaci√≥n usando PythonOperator
tarea_notificar = PythonOperator(
    task_id='enviar_notificacion',
    python_callable=enviar_notificacion,
    provide_context=True,
    dag=dag
)

# Tarea final (dummy - marca el fin del DAG)
fin = DummyOperator(
    task_id='fin',
    dag=dag
)

print("Todas las tareas creadas")

## 8. Definir Dependencias entre Tareas

Las dependencias determinan el orden de ejecuci√≥n de las tareas.

### M√©todos para definir dependencias:
- `tarea1 >> tarea2` (tarea1 antes de tarea2)
- `tarea1 << tarea2` (tarea2 antes de tarea1)
- `set_upstream()` y `set_downstream()`

In [None]:
# ============================================
# DEFINIR DEPENDENCIAS ENTRE TAREAS
# ============================================
# El operador '>>' significa "ejecutar antes que"
# Esto crea el flujo de ejecuci√≥n del DAG

# Flujo completo del pipeline:
# 1. inicio -> 2. verificar_sistema -> 3. extraer_datos -> 
# 4. transformar_datos -> 5. cargar_datos -> 6. enviar_notificacion -> 7. fin

inicio >> tarea_bash >> tarea_extraer >> tarea_transformar >> tarea_cargar >> tarea_notificar >> fin

print("Dependencias configuradas:")
print("inicio -> verificar_sistema -> extraer_datos -> transformar_datos -> cargar_datos -> enviar_notificacion -> fin")

## 9. Ejemplo de DAG con Paralelizaci√≥n

Vamos a crear un DAG donde algunas tareas se ejecutan en paralelo.

In [None]:
# ============================================
# DAG CON TAREAS PARALELAS
# ============================================
# Este ejemplo muestra c√≥mo ejecutar varias tareas al mismo tiempo

# Crear un nuevo DAG para demostrar paralelizaci√≥n
dag_paralelo = DAG(
    'dag_con_tareas_paralelas',
    default_args=default_args,
    description='DAG con ejecuci√≥n paralela',
    schedule_interval='@daily',  # Se ejecuta diariamente
    catchup=False
)


# Funci√≥n 1: Procesar datos de la fuente A
def procesar_fuente_a(**context):
    """Procesa datos de la primera fuente"""
    print("Procesando datos de la fuente A")
    return "Fuente A procesada"


# Funci√≥n 2: Procesar datos de la fuente B
def procesar_fuente_b(**context):
    """Procesa datos de la segunda fuente"""
    print("Procesando datos de la fuente B")
    return "Fuente B procesada"


# Funci√≥n 3: Procesar datos de la fuente C
def procesar_fuente_c(**context):
    """Procesa datos de la tercera fuente"""
    print("Procesando datos de la fuente C")
    return "Fuente C procesada"


# Funci√≥n 4: Consolidar todos los resultados
def consolidar_datos(**context):
    """Consolida los resultados de todas las fuentes"""
    print("Consolidando todas las fuentes")
    return "Consolidaci√≥n completada"


print("DAG paralelo creado")

In [None]:
# ============================================
# CREAR Y CONFIGURAR TAREAS PARALELAS
# ============================================

# Tarea inicial
inicio_paralelo = DummyOperator(task_id='inicio', dag=dag_paralelo)

# Crear tres tareas que se ejecutar√°n EN PARALELO
tarea_fuente_a = PythonOperator(
    task_id='procesar_fuente_a',
    python_callable=procesar_fuente_a,
    dag=dag_paralelo
)

tarea_fuente_b = PythonOperator(
    task_id='procesar_fuente_b',
    python_callable=procesar_fuente_b,
    dag=dag_paralelo
)

tarea_fuente_c = PythonOperator(
    task_id='procesar_fuente_c',
    python_callable=procesar_fuente_c,
    dag=dag_paralelo
)

# Tarea que consolida los resultados (espera a que terminen las 3 anteriores)
tarea_consolidar = PythonOperator(
    task_id='consolidar_datos',
    python_callable=consolidar_datos,
    dag=dag_paralelo
)

# Tarea final
fin_paralelo = DummyOperator(task_id='fin', dag=dag_paralelo)

# ============================================
# DEFINIR DEPENDENCIAS PARALELAS
# ============================================
# La sintaxis [tarea1, tarea2, tarea3] indica que todas se ejecutan en paralelo
# inicio -> [fuente_a, fuente_b, fuente_c] -> consolidar -> fin

inicio_paralelo >> [tarea_fuente_a, tarea_fuente_b, tarea_fuente_c] >> tarea_consolidar >> fin_paralelo

print("Tareas paralelas configuradas")
print("Las tareas A, B y C se ejecutar√°n en paralelo")

## 10. XCom - Compartir Datos entre Tareas

XCom (cross-communication) permite que las tareas compartan peque√±as cantidades de datos.

In [None]:
# ============================================
# XCOM - COMPARTIR DATOS ENTRE TAREAS
# ============================================
# XCom (Cross-Communication) permite pasar datos entre tareas

# Crear DAG para demostrar XCom
dag_xcom = DAG(
    'ejemplo_xcom',
    default_args=default_args,
    description='Ejemplo de XCom',
    schedule_interval=None,  # Ejecuci√≥n manual solamente
    catchup=False
)


def generar_numero(**context):
    """Genera un n√∫mero aleatorio y lo guarda en XCom"""
    import random
    
    # Generamos un n√∫mero aleatorio
    numero = random.randint(1, 100)
    print(f"N√∫mero generado: {numero}")
    
    # Al hacer 'return', el valor se guarda AUTOM√ÅTICAMENTE en XCom
    # Otros tasks podr√°n recuperarlo usando xcom_pull()
    return numero


def procesar_numero(**context):
    """Recupera el n√∫mero de XCom y lo procesa"""
    
    # 'ti' (Task Instance) contiene m√©todos para interactuar con XCom
    ti = context['ti']
    
    # xcom_pull() recupera el valor guardado por la tarea 'generar_numero'
    numero = ti.xcom_pull(task_ids='generar_numero')
    
    # Procesamos el n√∫mero (en este caso, lo multiplicamos por 2)
    resultado = numero * 2
    
    print(f"N√∫mero recibido: {numero}")
    print(f"Resultado del procesamiento: {resultado}")
    
    return resultado


# Crear las tareas
tarea_generar = PythonOperator(
    task_id='generar_numero',
    python_callable=generar_numero,
    dag=dag_xcom
)

tarea_procesar = PythonOperator(
    task_id='procesar_numero',
    python_callable=procesar_numero,
    dag=dag_xcom
)

# Definir dependencia: primero generar, luego procesar
tarea_generar >> tarea_procesar

print("DAG con XCom configurado")

## 11. Programaci√≥n de DAGs (Schedule Interval)

### Opciones de programaci√≥n:
- `None`: Ejecuci√≥n manual
- `@once`: Una sola vez
- `@hourly`: Cada hora
- `@daily`: Diario
- `@weekly`: Semanal
- `@monthly`: Mensual
- Expresi√≥n cron: `'0 0 * * *'`
- `timedelta`: `timedelta(hours=2)`

In [None]:
# ============================================
# SCHEDULE INTERVAL - PROGRAMACI√ìN DE DAGS
# ============================================
# Diferentes formas de programar cu√°ndo se ejecuta un DAG

from datetime import timedelta

# Diccionario con ejemplos de diferentes programaciones
ejemplos_schedule = {
    # None: El DAG solo se ejecuta manualmente
    'Manual': None,
    
    # @once: Se ejecuta una sola vez
    'Una vez': '@once',
    
    # @hourly: Se ejecuta cada hora (equivale a '0 * * * *')
    'Cada hora': '@hourly',
    
    # @daily: Se ejecuta diariamente a medianoche (equivale a '0 0 * * *')
    'Diario a medianoche': '@daily',
    
    # @weekly: Se ejecuta semanalmente (domingos a medianoche)
    'Semanal': '@weekly',
    
    # timedelta: Ejecutar cada X tiempo desde la √∫ltima ejecuci√≥n
    'Cada 2 horas': timedelta(hours=2),
    
    # Expresi√≥n CRON: '0 9 * * *' = todos los d√≠as a las 9 AM
    # Formato: minuto hora d√≠a_mes mes d√≠a_semana
    'Diario a las 9 AM': '0 9 * * *',
    
    # '0 8 * * 1-5' = Lunes a Viernes (1-5) a las 8 AM
    'Lunes a Viernes a las 8 AM': '0 8 * * 1-5'
}

print("Ejemplos de Schedule Interval:")
for nombre, schedule in ejemplos_schedule.items():
    print(f"  {nombre}: {schedule}")

## 12. Caso Pr√°ctico: Pipeline ETL Completo

Vamos a crear un pipeline ETL real que simula:
1. Extracci√≥n de datos de una API
2. Validaci√≥n de datos
3. Transformaci√≥n
4. Carga en base de datos
5. Generaci√≥n de reporte

In [None]:
# ============================================
# PIPELINE ETL COMPLETO - FUNCIONES
# ============================================

import json
from datetime import datetime

# Crear DAG para un pipeline ETL completo
dag_etl = DAG(
    'pipeline_etl_completo',
    default_args=default_args,
    description='Pipeline ETL completo con validaciones',
    schedule_interval='0 2 * * *',  # Ejecutar diariamente a las 2 AM (expresi√≥n CRON)
    catchup=False,
    tags=['etl', 'produccion']
)


def extraer_de_api(**context):
    """Extrae datos de una API (simulado)"""
    print("Conectando a la API...")
    
    # Simulaci√≥n de respuesta de una API con datos de productos
    datos = [
        {'id': 1, 'nombre': 'Producto A', 'precio': 100, 'stock': 50},
        {'id': 2, 'nombre': 'Producto B', 'precio': 200, 'stock': 30},
        {'id': 3, 'nombre': 'Producto C', 'precio': 150, 'stock': 0},
        {'id': 4, 'nombre': 'Producto D', 'precio': 300, 'stock': 20}
    ]
    print(f"Extra√≠dos {len(datos)} registros")
    
    # Retornar datos para que est√©n disponibles en XCom
    return datos


def validar_datos(**context):
    """Valida que los datos cumplan con las reglas de negocio"""
    # Obtener datos de la tarea anterior usando XCom
    ti = context['ti']
    datos = ti.xcom_pull(task_ids='extraer_de_api')
    
    datos_validos = []
    datos_invalidos = []
    
    # Validar cada registro seg√∫n reglas de negocio
    for item in datos:
        # Regla: precio debe ser positivo y stock no negativo
        if item['precio'] > 0 and item['stock'] >= 0:
            datos_validos.append(item)
        else:
            datos_invalidos.append(item)
    
    print(f"Datos v√°lidos: {len(datos_validos)}")
    print(f"Datos inv√°lidos: {len(datos_invalidos)}")
    
    # Retornar solo los datos v√°lidos
    return datos_validos


def transformar_etl(**context):
    """Transforma los datos seg√∫n los requisitos del negocio"""
    # Recuperar datos validados de la tarea anterior
    ti = context['ti']
    datos = ti.xcom_pull(task_ids='validar_datos')
    
    datos_transformados = []
    
    # Aplicar transformaciones a cada registro
    for item in datos:
        transformado = {
            # Mantener ID
            'producto_id': item['id'],
            
            # Convertir nombre a may√∫sculas
            'nombre_producto': item['nombre'].upper(),
            
            # Calcular precio con IVA (21%)
            'precio_con_iva': round(item['precio'] * 1.21, 2),
            
            # Agregar campo booleano de disponibilidad
            'disponible': item['stock'] > 0,
            
            # Categorizar seg√∫n stock
            'categoria': 'En Stock' if item['stock'] > 0 else 'Agotado',
            
            # Agregar timestamp del procesamiento
            'fecha_proceso': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        }
        datos_transformados.append(transformado)
    
    print(f"Transformados {len(datos_transformados)} registros")
    return datos_transformados


def cargar_en_bd(**context):
    """Simula la carga de datos en una base de datos"""
    # Recuperar datos transformados
    ti = context['ti']
    datos = ti.xcom_pull(task_ids='transformar_etl')
    
    print("Conectando a la base de datos...")
    print(f"Insertando {len(datos)} registros...")
    
    # Simular inserciones en la base de datos
    for item in datos:
        print(f"  INSERT: {item['nombre_producto']} - ${item['precio_con_iva']}")
    
    print("Carga completada exitosamente")
    
    # Retornar cantidad de registros cargados
    return len(datos)


def generar_reporte(**context):
    """Genera un reporte resumen del proceso ETL"""
    # Recuperar informaci√≥n de la tarea de carga
    ti = context['ti']
    registros_cargados = ti.xcom_pull(task_ids='cargar_en_bd')
    
    # Crear reporte formateado
    reporte = f"""
    ========================================
    REPORTE ETL - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
    ========================================
    Registros procesados: {registros_cargados}
    Estado: COMPLETADO
    Pipeline: pipeline_etl_completo
    ========================================
    """
    print(reporte)
    
    return reporte


print("Funciones ETL definidas")

In [None]:
# ============================================
# PIPELINE ETL COMPLETO - CREAR TAREAS
# ============================================

# Tarea 1: Inicio del pipeline
inicio_etl = DummyOperator(task_id='inicio', dag=dag_etl)

# Tarea 2: Extracci√≥n de datos desde API
extraer_api = PythonOperator(
    task_id='extraer_de_api',
    python_callable=extraer_de_api,
    dag=dag_etl
)

# Tarea 3: Validaci√≥n de datos
validar = PythonOperator(
    task_id='validar_datos',
    python_callable=validar_datos,
    dag=dag_etl
)

# Tarea 4: Transformaci√≥n de datos
transformar = PythonOperator(
    task_id='transformar_etl',
    python_callable=transformar_etl,
    dag=dag_etl
)

# Tarea 5: Carga en base de datos
cargar = PythonOperator(
    task_id='cargar_en_bd',
    python_callable=cargar_en_bd,
    dag=dag_etl
)

# Tarea 6: Generar reporte del proceso
reporte = PythonOperator(
    task_id='generar_reporte',
    python_callable=generar_reporte,
    dag=dag_etl
)

# Tarea 7: Fin del pipeline
fin_etl = DummyOperator(task_id='fin', dag=dag_etl)

# ============================================
# DEFINIR FLUJO DEL PIPELINE ETL
# ============================================
# Flujo secuencial completo del ETL:
# inicio -> extraer -> validar -> transformar -> cargar -> reporte -> fin

inicio_etl >> extraer_api >> validar >> transformar >> cargar >> reporte >> fin_etl

print("Pipeline ETL completo configurado")

## 13. Visualizaci√≥n de la Estructura del DAG

In [None]:
# ============================================
# VISUALIZAR ESTRUCTURA DE LOS DAGS CREADOS
# ============================================

# Lista con todos los DAGs que hemos creado
dags_creados = [
    ('mi_primer_dag', dag),
    ('dag_con_tareas_paralelas', dag_paralelo),
    ('ejemplo_xcom', dag_xcom),
    ('pipeline_etl_completo', dag_etl)
]

print("\n" + "="*60)
print("RESUMEN DE DAGs CREADOS")
print("="*60)

# Iterar sobre cada DAG y mostrar su informaci√≥n
for nombre, dag_obj in dags_creados:
    print(f"\nDAG: {nombre}")
    print(f"  Descripci√≥n: {dag_obj.description}")
    print(f"  Schedule: {dag_obj.schedule_interval}")
    print(f"  N√∫mero de tareas: {len(dag_obj.tasks)}")
    # Listar los IDs de todas las tareas del DAG
    print(f"  Tareas: {[t.task_id for t in dag_obj.tasks]}")

## 14. Comandos √ötiles de Airflow

### Comandos CLI b√°sicos:

```bash
# Inicializar la base de datos
airflow db init

# Crear usuario admin
airflow users create \
    --username admin \
    --firstname Admin \
    --lastname User \
    --role Admin \
    --email admin@example.com

# Iniciar el webserver
airflow webserver --port 8080

# Iniciar el scheduler
airflow scheduler

# Listar DAGs
airflow dags list

# Probar una tarea espec√≠fica
airflow tasks test <dag_id> <task_id> <execution_date>

# Ejecutar un DAG manualmente
airflow dags trigger <dag_id>
```

## 15. Mejores Pr√°cticas

### 1. Dise√±o de DAGs
- Mant√©n los DAGs simples y enfocados
- Evita dependencias circulares
- Usa nombres descriptivos para tareas

### 2. Manejo de Errores
- Configura reintentos apropiados
- Implementa manejo de excepciones
- Usa alertas para fallos cr√≠ticos

### 3. Rendimiento
- No uses XCom para datos grandes
- Paraleliza tareas cuando sea posible
- Configura pools para limitar concurrencia

### 4. Monitoreo
- Implementa logs detallados
- Usa SLAs para tareas cr√≠ticas
- Monitorea el estado de los DAGs regularmente

## 19. Ejercicios Pr√°cticos

### Ejercicio 1: DAG Simple
Crea un DAG que:
1. Lea un archivo CSV
2. Filtre datos por una condici√≥n
3. Guarde el resultado

### Ejercicio 2: DAG con Branching
Crea un DAG que ejecute diferentes ramas seg√∫n una condici√≥n

### Ejercicio 3: Pipeline de ML
Crea un DAG que:
1. Cargue datos
2. Entrene un modelo
3. Eval√∫e el modelo
4. Guarde el mejor modelo

### Ejercicio 4: TaskFlow API Challenge
Convierte un DAG tradicional a TaskFlow API

### Ejercicio 5: Sensor Personalizado
Crea un sensor que espere a que una API espec√≠fica est√© disponible

### Ejercicio 6: Pipeline con Hooks
Crea un pipeline que:
1. Use HttpHook para obtener datos de una API
2. Procese los datos
3. Use un Hook de base de datos para guardar los resultados

In [None]:
from airflow.decorators import dag, task
from datetime import datetime
import json

@dag(
    dag_id='pipeline_completo_avanzado',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
    catchup=False,
    description='Pipeline que combina TaskFlow, Sensors y Hooks',
    tags=['completo', 'avanzado', 'produccion']
)
def advanced_pipeline():
    """
    Pipeline avanzado que demuestra:
    - TaskFlow API para c√≥digo limpio
    - Sensors para esperar condiciones
    - Hooks para integraciones externas
    """
    
    @task
    def setup_pipeline():
        """Inicializa el pipeline"""
        print("="*60)
        print("INICIANDO PIPELINE AVANZADO")
        print(f"Timestamp: {datetime.now()}")
        print("="*60)
        return {'status': 'initialized', 'timestamp': str(datetime.now())}
    
    @task
    def fetch_from_api():
        """Simula fetch de API con Hook"""
        print("\n[1] Obteniendo datos de API...")
        # En producci√≥n: HttpHook(http_conn_id='api_prod').run()
        data = {
            'users': [
                {'id': 1, 'name': 'Alice', 'purchases': 150},
                {'id': 2, 'name': 'Bob', 'purchases': 200},
                {'id': 3, 'name': 'Charlie', 'purchases': 175}
            ],
            'timestamp': str(datetime.now())
        }
        print(f"   Obtenidos {len(data['users'])} usuarios")
        return data
    
    @task
    def validate_and_transform(data: dict):
        """Valida y transforma los datos"""
        print("\n[2] Validando y transformando datos...")
        users = data['users']
        
        # Validaci√≥n
        valid_users = [u for u in users if u['purchases'] > 0]
        print(f"   Usuarios v√°lidos: {len(valid_users)}/{len(users)}")
        
        # Transformaci√≥n
        transformed = [
            {
                'user_id': u['id'],
                'user_name': u['name'].upper(),
                'total_purchases': u['purchases'],
                'category': 'premium' if u['purchases'] > 180 else 'standard',
                'processed_at': str(datetime.now())
            }
            for u in valid_users
        ]
        
        print(f"   Transformados {len(transformed)} registros")
        return transformed
    
    @task
    def calculate_metrics(users: list):
        """Calcula m√©tricas del negocio"""
        print("\n[3] Calculando m√©tricas...")
        
        total_purchases = sum(u['total_purchases'] for u in users)
        premium_users = len([u for u in users if u['category'] == 'premium'])
        
        metrics = {
            'total_users': len(users),
            'total_revenue': total_purchases,
            'average_purchase': total_purchases / len(users) if users else 0,
            'premium_users': premium_users,
            'premium_percentage': (premium_users / len(users) * 100) if users else 0
        }
        
        print("   M√©tricas calculadas:")
        for key, value in metrics.items():
            print(f"     {key}: {value}")
        
        return metrics
    
    @task
    def save_to_database(users: list, metrics: dict):
        """Guarda datos en base de datos"""
        print("\n[4] Guardando en base de datos...")
        # En producci√≥n: PostgresHook(postgres_conn_id='db_prod').run()
        
        print(f"   Insertando {len(users)} usuarios...")
        print(f"   Insertando m√©tricas: {metrics['total_revenue']} revenue")
        print("   ‚úì Guardado exitoso")
        
        return {'status': 'saved', 'records': len(users)}
    
    @task
    def send_report(save_status: dict, metrics: dict):
        """Env√≠a reporte final"""
        print("\n[5] Generando reporte final...")
        print("="*60)
        print("REPORTE EJECUTIVO - PIPELINE COMPLETADO")
        print("="*60)
        print(f"Estado: {save_status['status'].upper()}")
        print(f"Registros procesados: {save_status['records']}")
        print(f"Revenue total: ${metrics['total_revenue']}")
        print(f"Usuarios premium: {metrics['premium_users']} ({metrics['premium_percentage']:.1f}%)")
        print(f"Compra promedio: ${metrics['average_purchase']:.2f}")
        print("="*60)
        print("Pipeline finalizado exitosamente\n")
    
    # Definir el flujo completo
    init = setup_pipeline()
    api_data = fetch_from_api()
    transformed_data = validate_and_transform(api_data)
    metrics = calculate_metrics(transformed_data)
    save_status = save_to_database(transformed_data, metrics)
    
    # Establecer dependencias
    init >> api_data
    send_report(save_status, metrics)

# Instanciar el DAG
advanced_dag = advanced_pipeline()

print("\n" + "="*60)
print("DAG AVANZADO CREADO EXITOSAMENTE")
print("="*60)
print("Caracter√≠sticas:")
print("  ‚úì TaskFlow API para c√≥digo limpio")
print("  ‚úì Type hints para mejor desarrollo")
print("  ‚úì Paso autom√°tico de datos entre tareas")
print("  ‚úì Estructura modular y escalable")
print("  ‚úì Integraci√≥n con Hooks y Sensors")
print("="*60 + "\n")

## 20. Recursos Adicionales

### Documentaci√≥n Oficial
- [Apache Airflow Documentation](https://airflow.apache.org/docs/)
- [Airflow Best Practices](https://airflow.apache.org/docs/apache-airflow/stable/best-practices.html)
- [TaskFlow API Tutorial](https://airflow.apache.org/docs/apache-airflow/stable/tutorial_taskflow_api.html)
- [Pythonic DAGs with TaskFlow](https://airflow.apache.org/docs/apache-airflow/stable/tutorial/taskflow.html)
- [TaskFlow Concepts](https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/taskflow.html)

### Tutoriales Externos
- [Apache Airflow Tutorial: Ultimate Guide 2024 - Innowise](https://innowise.com/blog/apache-airflow-introduction/)
- [Getting Started with Apache Airflow - DataCamp](https://www.datacamp.com/tutorial/getting-started-with-apache-airflow)

### Operadores √ötiles
- **PythonOperator**: Ejecutar funciones Python
- **BashOperator**: Ejecutar comandos Bash
- **EmailOperator**: Enviar emails
- **HttpOperator**: Llamadas HTTP/API
- **SqlOperator**: Consultas SQL
- **BranchPythonOperator**: Ramificaci√≥n condicional

### Sensors Comunes
- **FileSensor**: Detectar archivos
- **TimeSensor**: Esperar hasta una hora
- **HttpSensor**: Verificar endpoints
- **ExternalTaskSensor**: Esperar otras tareas

### Hooks Populares
- **HttpHook**: APIs REST
- **PostgresHook**: PostgreSQL
- **MySqlHook**: MySQL
- **S3Hook**: Amazon S3
- **GCSHook**: Google Cloud Storage
- **SlackHook**: Notificaciones Slack

### Providers
- AWS
- Google Cloud
- Azure
- Kubernetes
- Snowflake
- Databricks
- Y muchos m√°s...

In [None]:
## Conclusi√≥n

En este tutorial has aprendido:

‚úÖ Conceptos fundamentales de Apache Airflow  
‚úÖ C√≥mo crear y configurar DAGs  
‚úÖ Diferentes tipos de operadores  
‚úÖ Manejo de dependencias entre tareas  
‚úÖ Ejecuci√≥n paralela de tareas  
‚úÖ Compartir datos con XCom  
‚úÖ Programaci√≥n de DAGs  
‚úÖ Implementaci√≥n de pipelines ETL completos  
‚úÖ **TaskFlow API (Airflow 2.0+)** - C√≥digo m√°s limpio y Pythonic  
‚úÖ **Sensors** - Esperar condiciones antes de continuar  
‚úÖ **Hooks** - Integraci√≥n con servicios externos  
‚úÖ Mejores pr√°cticas  

### Lo Nuevo en Airflow 2.0+

**TaskFlow API** revoluciona la forma de escribir DAGs:
- Decoradores simples `@dag` y `@task`
- Paso autom√°tico de datos (sin XCom manual)
- Type hints para mejor desarrollo
- C√≥digo m√°s limpio y mantenible

**Sensors mejorados** con decoradores:
- `@task.sensor` para crear sensors personalizados
- Mejor manejo de recursos con `mode='reschedule'`
- Integraci√≥n perfecta con TaskFlow

**Hooks modernos**:
- Mejor abstracci√≥n de conexiones externas
- Manejo seguro de credenciales
- Integraci√≥n con cientos de servicios

### Pr√≥ximos Pasos
1. Practica creando tus propios DAGs con TaskFlow API
2. Explora diferentes Sensors y Hooks
3. Implementa pipelines de datos reales
4. Aprende sobre Airflow en producci√≥n
5. Explora integraciones con Cloud Providers
6. Investiga sobre Airflow en Kubernetes

### Recursos Avanzados
- [Understanding TaskFlow API - Restack](https://www.restack.io/docs/airflow-faq-tutorial-taskflow-01)
- [Apache Airflow Components - Restack](https://www.restack.io/docs/airflow-faq-core-concepts-tasks-07)
- Comunidad Airflow en Slack
- Airflow Summit (conferencia anual)

¬°Feliz orquestaci√≥n de flujos de trabajo! üöÄ

---

**Nota**: Este notebook ha sido enriquecido con informaci√≥n de:
- Documentaci√≥n oficial de Apache Airflow
- Tutoriales de la comunidad
- Mejores pr√°cticas de industria
- Ejemplos pr√°cticos de producci√≥n

## 18. Hooks en Airflow

### ¬øQu√© son los Hooks?

Los **Hooks** son interfaces para interactuar con sistemas externos:
- **Abstraen** la conexi√≥n a servicios externos
- **Reutilizan** conexiones y credenciales
- **Simplifican** la interacci√≥n con APIs, bases de datos, servicios cloud

### Hooks Comunes
- **HttpHook**: API REST
- **PostgresHook**: PostgreSQL
- **MySqlHook**: MySQL
- **S3Hook**: Amazon S3
- **GCSHook**: Google Cloud Storage
- **SlackHook**: Slack

### Ventajas
- Manejo centralizado de conexiones
- Credenciales seguras en Airflow Connections
- C√≥digo m√°s limpio y mantenible

In [None]:
# ============================================
# SENSOR PERSONALIZADO CON @task.sensor
# ============================================

from airflow.decorators import dag, task
from datetime import datetime
import random


@dag(
    dag_id='custom_sensor_ejemplo',
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,  # Solo ejecuci√≥n manual
    catchup=False
)
def dag_with_custom_sensor():
    """DAG con sensor personalizado usando decorador"""
    
    # ============================================
    # @task.sensor: Crear sensor personalizado
    # ============================================
    @task.sensor(
        poke_interval=10,    # Verificar cada 10 segundos
        timeout=120,         # Timeout despu√©s de 2 minutos
        mode='poke'          # Modo de ejecuci√≥n
    )
    def wait_for_condition() -> bool:
        """Sensor que espera una condici√≥n personalizada
        
        IMPORTANTE: Debe retornar bool
        - True: Condici√≥n cumplida, continuar
        - False: Condici√≥n no cumplida, seguir esperando
        """
        # Simulamos verificar una condici√≥n externa (ej: API, archivo, etc)
        condition_met = random.random() > 0.7  # 30% de probabilidad
        
        if condition_met:
            print("‚úì Condici√≥n cumplida!")
            return True  # Sensor deja de esperar
        else:
            print("‚úó Condici√≥n no cumplida, esperando...")
            return False  # Sensor sigue verificando
    
    @task
    def process_when_ready():
        """Esta tarea solo se ejecuta cuando el sensor retorna True"""
        print("Sensor detect√≥ que la condici√≥n se cumpli√≥")
        print("Iniciando procesamiento...")
        return "Proceso completado"
    
    # ============================================
    # FLUJO CON SENSOR PERSONALIZADO
    # ============================================
    # El proceso solo inicia cuando wait_for_condition() retorna True
    wait_for_condition() >> process_when_ready()


# Instanciar el DAG
custom_sensor_dag = dag_with_custom_sensor()

print("Sensor personalizado con decorador @task.sensor creado")

### Sensor Personalizado con Decorador @task.sensor

In [None]:
# ============================================
# SENSORS - ESPERAR CONDICIONES
# ============================================

from airflow.decorators import dag, task
from airflow.sensors.filesystem import FileSensor
from airflow.sensors.time_sensor import TimeSensor
from datetime import datetime
import os


@dag(
    dag_id='ejemplo_sensors',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@hourly',
    catchup=False,
    tags=['sensors', 'avanzado']
)
def pipeline_with_sensors():
    """DAG que demuestra el uso de Sensors"""
    
    # ============================================
    # TIMESENSOR: Espera hasta una hora espec√≠fica
    # ============================================
    wait_for_morning = TimeSensor(
        task_id='wait_for_9am',
        # target_time: Hora objetivo a esperar
        target_time=datetime.strptime('09:00:00', '%H:%M:%S').time(),
        # poke_interval: Cada cu√°nto verificar (300 seg = 5 min)
        poke_interval=300,
        # mode='reschedule': Libera el worker mientras espera (m√°s eficiente)
        mode='reschedule'
    )
    
    @task
    def create_temp_file():
        """Crea archivo temporal para demostraci√≥n"""
        filepath = '/tmp/airflow_sensor_test.txt'
        with open(filepath, 'w') as f:
            f.write('Archivo de prueba para sensor')
        print(f"Archivo creado: {filepath}")
        return filepath
    
    # ============================================
    # FILESENSOR: Espera a que exista un archivo
    # ============================================
    wait_for_file = FileSensor(
        task_id='wait_for_data_file',
        # filepath: Ruta del archivo a esperar
        filepath='/tmp/airflow_sensor_test.txt',
        # poke_interval: Verificar cada 30 segundos
        poke_interval=30,
        # timeout: Tiempo m√°ximo de espera (600 seg = 10 min)
        timeout=600,
        # mode='poke': Ocupa el worker mientras espera
        mode='poke'
    )
    
    @task
    def process_file():
        """Procesa el archivo una vez detectado por el sensor"""
        print("Archivo detectado, procesando...")
        with open('/tmp/airflow_sensor_test.txt', 'r') as f:
            content = f.read()
        print(f"Contenido: {content}")
        return "Procesamiento completado"
    
    @task
    def cleanup():
        """Limpia archivos temporales"""
        filepath = '/tmp/airflow_sensor_test.txt'
        if os.path.exists(filepath):
            os.remove(filepath)
            print(f"Archivo eliminado: {filepath}")
    
    # ============================================
    # DEFINIR FLUJO CON SENSORS
    # ============================================
    # El sensor 'wait_for_file' bloquea la ejecuci√≥n hasta que el archivo exista
    file_path = create_temp_file()
    file_path >> wait_for_file >> process_file() >> cleanup()


# Instanciar el DAG
sensors_dag = pipeline_with_sensors()

print("DAG con Sensors creado")
print("Los sensors esperan condiciones antes de continuar")

## 17. Sensors en Airflow

### ¬øQu√© son los Sensors?

Los **Sensors** son un tipo especial de operador que:
- **Esperan** a que se cumpla una condici√≥n espec√≠fica
- **Verifican peri√≥dicamente** (polling) el estado
- **Bloquean** la ejecuci√≥n hasta que la condici√≥n se cumple

### Tipos Comunes de Sensors
- **FileSensor**: Espera a que exista un archivo
- **ExternalTaskSensor**: Espera a que otra tarea termine
- **TimeSensor**: Espera hasta una hora espec√≠fica
- **HttpSensor**: Espera respuesta HTTP
- **SqlSensor**: Espera resultado SQL

### Par√°metros Importantes
- `poke_interval`: Tiempo entre verificaciones (segundos)
- `timeout`: Tiempo m√°ximo de espera
- `mode`: 'poke' (ocupa worker) o 'reschedule' (libera worker)

In [None]:
# ============================================
# PIPELINE ETL CON TASKFLOW API
# ============================================

from airflow.decorators import dag, task
from datetime import datetime
from typing import List, Dict  # Para type hints


@dag(
    dag_id='etl_taskflow_completo',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
    catchup=False,
    description='Pipeline ETL usando TaskFlow API',
    tags=['taskflow', 'etl', 'avanzado']
)
def etl_pipeline_taskflow():
    """Pipeline ETL completo usando decoradores"""
    
    @task
    def extract_from_sources() -> List[Dict]:
        """Extrae datos - Retorna lista de diccionarios"""
        print("Extrayendo datos...")
        return [
            {'id': 1, 'value': 100, 'category': 'A'},
            {'id': 2, 'value': 200, 'category': 'B'},
            {'id': 3, 'value': 150, 'category': 'A'},
            {'id': 4, 'value': 300, 'category': 'C'}
        ]
    
    @task
    def validate_data(data: List[Dict]) -> List[Dict]:
        """Valida datos - Recibe y retorna lista de diccionarios"""
        print(f"Validando {len(data)} registros...")
        # List comprehension para filtrar datos v√°lidos
        valid_data = [item for item in data if item['value'] > 0]
        print(f"Registros v√°lidos: {len(valid_data)}")
        return valid_data
    
    @task
    def transform_by_category(data: List[Dict]) -> Dict[str, List[Dict]]:
        """Agrupa y transforma por categor√≠a - Retorna diccionario"""
        print("Transformando datos por categor√≠a...")
        categories = {}
        
        # Agrupar por categor√≠a y duplicar valores
        for item in data:
            cat = item['category']
            if cat not in categories:
                categories[cat] = []
            categories[cat].append({
                'id': item['id'],
                'value_doubled': item['value'] * 2,  # Transformaci√≥n
                'category': cat
            })
        return categories
    
    @task
    def calculate_statistics(grouped_data: Dict[str, List[Dict]]) -> Dict:
        """Calcula estad√≠sticas por categor√≠a"""
        print("Calculando estad√≠sticas...")
        stats = {}
        
        # Calcular totales y promedios por categor√≠a
        for category, items in grouped_data.items():
            total = sum(item['value_doubled'] for item in items)
            count = len(items)
            stats[category] = {
                'total': total,
                'count': count,
                'average': total / count if count > 0 else 0
            }
        return stats
    
    @task
    def load_to_database(stats: Dict) -> str:
        """Carga estad√≠sticas a base de datos"""
        print("Cargando datos a la base de datos...")
        for category, data in stats.items():
            print(f"  Categor√≠a {category}: Total={data['total']}, Promedio={data['average']:.2f}")
        return "Carga completada exitosamente"
    
    @task
    def send_notification(status: str) -> None:
        """Env√≠a notificaci√≥n final"""
        print(f"\n{'='*50}")
        print(f"NOTIFICACI√ìN: {status}")
        print(f"Pipeline completado en: {datetime.now()}")
        print(f"{'='*50}\n")
    
    # ============================================
    # DEFINIR EL FLUJO DEL PIPELINE
    # ============================================
    # Cada variable contiene el resultado de la tarea anterior
    # El paso de datos es AUTOM√ÅTICO
    
    raw_data = extract_from_sources()           # 1. Extraer
    valid_data = validate_data(raw_data)        # 2. Validar (recibe raw_data)
    grouped_data = transform_by_category(valid_data)  # 3. Transformar (recibe valid_data)
    statistics = calculate_statistics(grouped_data)   # 4. Calcular stats (recibe grouped_data)
    status = load_to_database(statistics)       # 5. Cargar (recibe statistics)
    send_notification(status)                   # 6. Notificar (recibe status)


# Instanciar el DAG
etl_taskflow_dag = etl_pipeline_taskflow()

print("Pipeline ETL con TaskFlow API creado")
print("Ventajas: Type hints, paso autom√°tico de datos, c√≥digo m√°s legible")

### Pipeline ETL Completo con TaskFlow API

In [None]:
# ============================================
# TASKFLOW API (Airflow 2.0+) - MODERNO
# ============================================
# Forma moderna y m√°s limpia de crear DAGs

from airflow.decorators import dag, task
from datetime import datetime


# @dag: Decorador que convierte una funci√≥n en un DAG
@dag(
    dag_id='taskflow_ejemplo',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
    catchup=False,
    tags=['taskflow', 'moderno']
)
def pipeline_taskflow():
    """Funci√≥n que define el DAG completo"""
    
    # @task: Decorador que convierte una funci√≥n en una tarea
    @task
    def extract() -> dict:
        """Extrae datos - El tipo de retorno es expl√≠cito"""
        return {'data': 100}
    
    @task
    def transform(data: dict) -> int:
        """Transforma datos - Recibe data como par√°metro directamente
        ¬°No necesitamos XCom manual!"""
        return data['data'] * 2
    
    @task
    def load(value: int) -> None:
        """Carga datos - Type hints hacen el c√≥digo m√°s claro"""
        print(f"Valor final: {value}")
    
    # ============================================
    # DEFINIR EL FLUJO - Syntax Pythonic
    # ============================================
    # El paso de datos es AUTOM√ÅTICO entre tareas
    data = extract()                # Llama a extract()
    transformed = transform(data)   # Pasa 'data' autom√°ticamente
    load(transformed)               # Pasa 'transformed' autom√°ticamente


# Instanciar el DAG (necesario para que Airflow lo detecte)
dag_taskflow = pipeline_taskflow()

print("TaskFlow API: C√≥digo m√°s limpio y Pythonic")
print("El paso de datos entre tareas es autom√°tico")

In [None]:
# ============================================
# FORMA TRADICIONAL (Airflow 1.x)
# ============================================
# Este es el m√©todo antiguo de crear DAGs en Airflow

from airflow import DAG
from airflow.operators.python import PythonOperator


def extract_tradicional(**context):
    """Funci√≥n que extrae datos"""
    return {'data': 100}


def transform_tradicional(**context):
    """Funci√≥n que transforma datos"""
    # PROBLEMA: Necesitamos usar XCom manualmente
    ti = context['ti']  # Obtener Task Instance
    data = ti.xcom_pull(task_ids='extract')  # Recuperar datos manualmente
    return data['data'] * 2


# Crear DAG tradicional usando 'with' statement
with DAG('tradicional', start_date=datetime(2024, 1, 1)) as dag_trad:
    
    # Crear tarea de extracci√≥n
    extract = PythonOperator(
        task_id='extract',
        python_callable=extract_tradicional
    )
    
    # Crear tarea de transformaci√≥n
    transform = PythonOperator(
        task_id='transform',
        python_callable=transform_tradicional
    )
    
    # Definir dependencias
    extract >> transform

print("Forma tradicional: Requiere m√°s c√≥digo y manejo manual de XCom")

In [None]:
# Espacio para tus ejercicios

# Ejercicio 1: Tu c√≥digo aqu√≠


# Ejercicio 2: Tu c√≥digo aqu√≠


# Ejercicio 3: Tu c√≥digo aqu√≠


## 17. Recursos Adicionales

### Documentaci√≥n Oficial
- [Apache Airflow Documentation](https://airflow.apache.org/docs/)
- [Airflow Best Practices](https://airflow.apache.org/docs/apache-airflow/stable/best-practices.html)

### Operadores √ötiles
- **PythonOperator**: Ejecutar funciones Python
- **BashOperator**: Ejecutar comandos Bash
- **EmailOperator**: Enviar emails
- **HttpOperator**: Llamadas HTTP/API
- **SqlOperator**: Consultas SQL
- **BranchPythonOperator**: Ramificaci√≥n condicional

### Providers
- AWS
- Google Cloud
- Azure
- Kubernetes
- Snowflake
- Y muchos m√°s...

## Conclusi√≥n

En este tutorial has aprendido:

‚úÖ Conceptos fundamentales de Apache Airflow  
‚úÖ C√≥mo crear y configurar DAGs  
‚úÖ Diferentes tipos de operadores  
‚úÖ Manejo de dependencias entre tareas  
‚úÖ Ejecuci√≥n paralela de tareas  
‚úÖ Compartir datos con XCom  
‚úÖ Programaci√≥n de DAGs  
‚úÖ Implementaci√≥n de pipelines ETL completos  
‚úÖ Mejores pr√°cticas  

### Pr√≥ximos Pasos
1. Practica creando tus propios DAGs
2. Explora diferentes operadores
3. Implementa pipelines de datos reales
4. Aprende sobre Airflow en producci√≥n
5. Explora integraciones con otras herramientas

¬°Feliz orquestaci√≥n de flujos de trabajo! üöÄ