# Apache Airflow: Conceptos Fundamentales

Este notebook introduce los conceptos fundamentales de Apache Airflow para estudiantes de Data Engineering.

## ¿Qué es Apache Airflow?

Apache Airflow es una plataforma de código abierto para programar, monitorear y gestionar flujos de trabajo complejos. Fue desarrollado originalmente por Airbnb en 2014 y se convirtió en un proyecto de la Apache Software Foundation en 2019.

### Características principales:

- **Programación en Python**: Los flujos de trabajo se definen como código Python
- **Interfaz web intuitiva**: Para monitorear y gestionar los flujos de trabajo
- **Escalabilidad**: Puede orquestar miles de tareas
- **Extensibilidad**: Amplio ecosistema de plugins y operadores
- **Integración con múltiples fuentes de datos y plataformas**

## Arquitectura de Airflow

Airflow tiene una arquitectura modular compuesta por varios componentes clave:

### 1. Webserver
- Interfaz de usuario que permite visualizar DAGs, tareas, logs y métricas
- Facilita la activación/desactivación de DAGs, retrying de tareas, etc.

### 2. Scheduler
- Componente que determina qué tareas necesitan ser ejecutadas y cuándo
- Monitorea el estado de las tareas y DAGs

### 3. Executor
- Define cómo se ejecutan las tareas
- Tipos comunes: LocalExecutor, CeleryExecutor, KubernetesExecutor

### 4. Metastore (Base de datos)
- Almacena metadata sobre DAGs, tareas, variables, conexiones, etc.
- Puede ser SQLite (desarrollo), PostgreSQL, MySQL (producción)

### 5. Workers
- Procesos que realmente ejecutan las tareas (cuando se usa un ejecutor distribuido como Celery)

### 6. Queue
- Sistema de mensajería que permite la comunicación entre el Scheduler y los Workers, utilizado para distribuir tareas a los Workers en arquitecturas distribuidas(Ejemplos comunes: RabbitMQ, Redis)



Airflow ejecuta DAGs en seis pasos diferentes:

- El programador explora constantemente el directorio de DAGs en busca de nuevos archivos. El tiempo por defecto es cada 5 minutos.   
- Una vez que el programador detecta un nuevo DAG, éste se procesa y se serializa en la base de datos de metadatos.   
- El programador busca DAGs que estén listos para ejecutarse en la base de datos de metadatos. El tiempo predeterminado es cada 5 segundos.   
- Una vez que un DAG está listo para ejecutarse, sus tareas se ponen en la cola del ejecutor.   
- Una vez que un trabajador está disponible, recuperará una tarea de la cola para ejecutarla.   
- El trabajador ejecutará la tarea.   



## Conceptos clave en Airflow

### 1. DAG (Directed Acyclic Graph)

Un DAG es un grafo dirigido acíclico que representa un flujo de trabajo. Es una colección de tareas con dependencias específicas entre ellas, pero sin ciclos (una tarea no puede depender directa o indirectamente de sí misma).

In [None]:
# Ejemplo de definición de un DAG
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.dummy import DummyOperator

# Argumentos por defecto para el DAG
default_args = {
    'owner': 'data_engineer',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Definición del DAG
with DAG(
    'ejemplo_basico',
    default_args=default_args,
    description='Un DAG de ejemplo',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2023, 1, 1),
    catchup=False,
    tags=['ejemplo'],
) as dag:
    
    inicio = DummyOperator(task_id='inicio')
    tarea1 = DummyOperator(task_id='tarea1')
    tarea2 = DummyOperator(task_id='tarea2')
    fin = DummyOperator(task_id='fin')
    
    # Definir dependencias entre tareas
    inicio >> tarea1 >> fin
    inicio >> tarea2 >> fin

### Parámetros importantes de un DAG

- **dag_id**: Identificador único del DAG
- **description**: Descripción del propósito del DAG
- **schedule_interval**: Frecuencia de ejecución (cron, timedelta, etc.)
- **start_date**: Fecha desde la que se considera el DAG activo
- **catchup**: Si debe ejecutar ejecuciones pasadas pendientes
- **default_args**: Argumentos por defecto para todas las tareas del DAG
- **tags**: Etiquetas para categorizar el DAG

### 2. Operadores

Los operadores determinan qué se hace en cada tarea. Airflow proporciona muchos operadores predefinidos para tareas comunes.

#### Tipos comunes de operadores:

1. **BashOperator**: Ejecuta un comando bash
2. **PythonOperator**: Ejecuta una función Python
3. **SQLOperator**: Ejecuta consultas SQL
4. **EmailOperator**: Envía un email
5. **SimpleHttpOperator**: Envía una solicitud HTTP
6. **DummyOperator**: No hace nada, útil para estructurar DAGs
7. **Operadores específicos para servicios**: S3, GCS, BigQuery, Spark, etc.

In [None]:
# Ejemplos de diferentes operadores
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator

# Función Python para el PythonOperator
def procesar_datos(**kwargs):
    print("Procesando datos...")
    return "Datos procesados"

# En un DAG:
tarea_bash = BashOperator(
    task_id='ejecutar_script',
    bash_command='echo "Ejecutando script" && date',
    dag=dag,
)

tarea_python = PythonOperator(
    task_id='procesar_datos',
    python_callable=procesar_datos,
    dag=dag,
)

tarea_sql = PostgresOperator(
    task_id='crear_tabla',
    postgres_conn_id='postgres_default',
    sql="""CREATE TABLE IF NOT EXISTS usuarios (id SERIAL PRIMARY KEY, nombre VARCHAR);""",
    dag=dag,
)

### 3. Sensores

Los sensores son un tipo especial de operador que espera a que ocurra un determinado evento.

#### Tipos comunes de sensores:

1. **FileSensor**: Espera la aparición de un archivo
2. **SqlSensor**: Espera que una consulta SQL devuelva resultados
3. **HttpSensor**: Espera que un endpoint HTTP devuelva un resultado específico
4. **ExternalTaskSensor**: Espera la finalización de una tarea en otro DAG
5. **TimeSensor**: Espera hasta un momento específico del día

In [None]:
# Ejemplo de un sensor
from airflow.sensors.filesystem import FileSensor

# En un DAG:
esperar_archivo = FileSensor(
    task_id='esperar_archivo_csv',
    filepath='/data/eventos.csv',
    poke_interval=300,  # Verificar cada 5 minutos
    timeout=60 * 60 * 24,  # Timeout después de 24 horas
    mode='poke',  # Modo de sensado
    dag=dag,
)

### 4. Tasks

Una tarea es una instancia de un operador y representa un trabajo específico que debe realizarse como parte de un DAG.

#### Ciclo de vida de una tarea:

1. **none**: Estado inicial
2. **scheduled**: Programada para ejecución
3. **queued**: En cola para ejecución
4. **running**: En ejecución
5. **success/failed/skipped/upstream_failed**: Estados finales

### 5. Task dependencies

Las dependencias entre tareas determinan el orden de ejecución. En Airflow, estas se pueden definir usando diferentes sintaxis:

In [None]:
# Diferentes formas de definir dependencias entre tareas

# Método 1: Operador de bit shift
tarea_a >> tarea_b >> tarea_c  # A se ejecuta antes que B, B antes que C
tarea_a << tarea_b << tarea_c  # C se ejecuta antes que B, B antes que A

# Método 2: Método set_upstream/set_downstream
tarea_b.set_upstream(tarea_a)   # A se ejecuta antes que B
tarea_b.set_downstream(tarea_c) # C se ejecuta después que B

# Dependencias complejas
tarea_inicio >> [tarea_a, tarea_b, tarea_c] >> tarea_union  # Bifurcación y unión

### 6. XComs (Cross-Communication)

XComs (Cross-Communications) permite que las tareas intercambien pequeñas cantidades de datos entre sí.

In [None]:
# Ejemplo de uso de XComs
def tarea_emisora(**kwargs):
    # Guardar un valor en XCom
    kwargs['ti'].xcom_push(key='resultado_analisis', value={'estado': 'completado', 'registros': 1250})
    return "Tarea completada"

def tarea_receptora(**kwargs):
    # Recuperar valor de XCom
    ti = kwargs['ti']
    resultado = ti.xcom_pull(task_ids='tarea_emisora', key='resultado_analisis')
    print(f"Resultado recuperado: {resultado}")
    return f"Procesados {resultado['registros']} registros"

# En un DAG:
emisor = PythonOperator(
    task_id='tarea_emisora',
    python_callable=tarea_emisora,
    provide_context=True,
    dag=dag,
)

receptor = PythonOperator(
    task_id='tarea_receptora',
    python_callable=tarea_receptora,
    provide_context=True,
    dag=dag,
)

emisor >> receptor

### 7. Variables

Las variables de Airflow permiten almacenar y recuperar valores que se utilizan en los DAGs. Se almacenan en la base de datos de Airflow y se pueden acceder a través del código.

In [None]:
# Ejemplo de uso de Variables de Airflow
from airflow.models import Variable

# En un operador Python:
def procesar_con_config(**kwargs):
    # Recuperar una variable
    api_key = Variable.get("api_key")
    entorno = Variable.get("entorno", default_var="desarrollo")
    
    # Variables pueden ser JSON
    config = Variable.get("config_json", deserialize_json=True)
    umbral = config.get('umbral', 100)
    
    print(f"Procesando en entorno {entorno} con umbral {umbral}")
    return "Procesamiento completo"

### 8. Conexiones

Las conexiones almacenan información para conectarse a sistemas externos (bases de datos, APIs, etc.).

In [None]:
# Ejemplo de uso de una conexión en un operador
from airflow.hooks.postgres_hook import PostgresHook

def consultar_base_datos(**kwargs):
    # Crear un hook usando el ID de conexión
    postgres_hook = PostgresHook(postgres_conn_id='postgres_data_warehouse')
    
    # Ejecutar una consulta
    registros = postgres_hook.get_records("SELECT * FROM ventas LIMIT 10")
    
    print(f"Se encontraron {len(registros)} registros")
    return registros

## Conceptos avanzados

### 1. Branching

El branching permite tomar diferentes caminos en un DAG basado en alguna lógica o condición.

In [None]:
# Ejemplo de branching
from airflow.operators.python import BranchPythonOperator

def elegir_camino(**kwargs):
    # Alguna lógica para decidir qué camino tomar
    if Variable.get("entorno") == "produccion":
        return 'tarea_produccion'
    else:
        return 'tarea_desarrollo'

# En un DAG:
rama = BranchPythonOperator(
    task_id='elegir_camino',
    python_callable=elegir_camino,
    provide_context=True,
    dag=dag,
)

tarea_prod = DummyOperator(task_id='tarea_produccion', dag=dag)
tarea_dev = DummyOperator(task_id='tarea_desarrollo', dag=dag)
final = DummyOperator(task_id='tarea_final', trigger_rule='one_success', dag=dag)

rama >> [tarea_prod, tarea_dev] >> final

### 2. Trigger Rules

Las reglas de disparo determinan cuándo una tarea debe ejecutarse basado en el estado de sus tareas upstream.

Opciones comunes:

- **all_success**: Todas las tareas upstream deben tener éxito (default)
- **all_failed**: Todas las tareas upstream deben fallar
- **all_done**: Todas las tareas upstream deben completarse (éxito o fallo)
- **one_success**: Al menos una tarea upstream debe tener éxito
- **one_failed**: Al menos una tarea upstream debe fallar
- **none_failed**: Todas las tareas upstream no deben fallar (pueden tener éxito o ser omitidas)
- **none_skipped**: Ninguna tarea upstream debe ser omitida

In [None]:
# Ejemplo de trigger rule
tarea_final = DummyOperator(
    task_id='tarea_final',
    trigger_rule='one_success',  # Se ejecuta si al menos una tarea upstream tiene éxito
    dag=dag,
)

### 3. SubDAGs

Los SubDAGs permiten encapsular un conjunto de tareas como una sola tarea dentro de un DAG principal, lo que ayuda a organizar flujos de trabajo complejos.

In [None]:
# Ejemplo de SubDAG
from airflow.operators.subdag import SubDagOperator

# Función que crea y devuelve un subDAG
def crear_subdag_procesamiento(parent_dag_id, child_dag_id, args):
    with DAG(
        dag_id=f'{parent_dag_id}.{child_dag_id}',
        default_args=args,
        schedule_interval=None,
    ) as dag_hijo:
        tarea1 = DummyOperator(task_id='subtarea1', dag=dag_hijo)
        tarea2 = DummyOperator(task_id='subtarea2', dag=dag_hijo)
        tarea1 >> tarea2
        return dag_hijo

# En el DAG principal:
procesamiento = SubDagOperator(
    task_id='procesar_datos',
    subdag=crear_subdag_procesamiento('dag_principal', 'procesar_datos', default_args),
    dag=dag_principal,
)

### 4. TaskGroups

TaskGroups es una alternativa más moderna a SubDAGs, y proporciona una manera de agrupar visualmente las tareas en la interfaz de usuario sin las complejidades de los SubDAGs.

In [None]:
# Ejemplo de TaskGroup
from airflow.utils.task_group import TaskGroup

# En un DAG:
with TaskGroup(group_id='procesar_datos') as grupo_procesamiento:
    tarea1 = DummyOperator(task_id='extraer')
    tarea2 = DummyOperator(task_id='transformar')
    tarea3 = DummyOperator(task_id='cargar')
    
    tarea1 >> tarea2 >> tarea3

inicio >> grupo_procesamiento >> fin

### 5. Pools

Los pools limitan el número de tareas que pueden ejecutarse simultáneamente en un grupo específico, lo que permite controlar el uso de recursos.

In [None]:
# Ejemplo de uso de pool
tarea_intensiva = BashOperator(
    task_id='tarea_intensiva_cpu',
    bash_command='echo "Procesando datos" && sleep 30',
    pool='cpu_intensive',  # Asignar a un pool específico
    pool_slots=2,          # Usar 2 slots del pool
    dag=dag,
)

## Mejores prácticas en Airflow

### 1. Idempotencia

Las tareas deben ser idempotentes, es decir, deben producir el mismo resultado independientemente de cuántas veces se ejecuten con los mismos parámetros.

```python
# Malo: Insertar directamente sin verificar
INSERT INTO tabla VALUES (1, 'valor');

# Bueno: Usar INSERT IGNORE o ON CONFLICT
INSERT INTO tabla VALUES (1, 'valor') ON CONFLICT (id) DO NOTHING;
```

### 2. Atomicidad

Cada tarea debe hacer una sola cosa bien definida. Evita tareas que hagan demasiadas cosas diferentes.

```python
# Malo: Una sola tarea que hace todo
def extraer_transformar_cargar():
    datos = extraer()
    datos_transformados = transformar(datos)
    cargar(datos_transformados)

# Bueno: Dividir en tareas atómicas
def extraer():
    return obtener_datos()

def transformar(**kwargs):
    ti = kwargs['ti']
    datos = ti.xcom_pull(task_ids='extraer')
    return procesar_datos(datos)

def cargar(**kwargs):
    ti = kwargs['ti']
    datos_transformados = ti.xcom_pull(task_ids='transformar')
    guardar_en_bd(datos_transformados)
```

### 3. Evitar dependencias innecesarias entre DAGs

Minimiza el acoplamiento entre DAGs diferentes para facilitar el mantenimiento y la escalabilidad.

### 4. Manejo adecuado de errores

Implementa manejo de errores y estrategias de retry apropiadas para cada tarea.

```python
tarea = PythonOperator(
    task_id='tarea_con_retry',
    python_callable=funcion_propensa_a_fallos,
    retries=3,
    retry_delay=timedelta(minutes=5),
    retry_exponential_backoff=True,
    max_retry_delay=timedelta(hours=1),
    dag=dag,
)
```

### 5. No usar datos modificables entre tareas

Evita compartir datos modificables (como listas o diccionarios globales) entre tareas, ya que puede llevar a comportamientos inesperados.