                                Apache Airflow: Workflow Orchestration

La définition DAG (Directed Acyclic Graph) d’Apache Airflow est un moyen puissant de structurer les pipelines ETL. 

Principaux avantages :

-  Visualisation des workflows : les DAG offrent une représentation visuelle claire de votre processus ETL
-  Gestion des dépendances : définissez facilement les dépendances des tâches et leur ordre d’exécution
-  Ordonnancement : fonctionnalités de planification intégrées pour les workflows récurrents
-  Paramétrage : exécution dynamique basée sur des dates, des variables et des configurations
-  Surveillance : suivez l’état des tâches, leur durée d’exécution et les échecs
-  Relances et gestion des erreurs : relancez automatiquement les tâches ayant échoué et gérez les erreurs correctement

En ingénierie des données, les DAG sont essentiels pour :

-  Organisation des processus : décomposez les workflows ETL complexes en tâches gérables
-  Gestion des dépendances : assurez-vous que les tâches s’exécutent dans le bon ordre
-  Allocation des ressources : contrôlez le parallélisme et l’utilisation des ressources
-  Surveillance et alertes : suivez l’état du pipeline et recevez des notifications en cas d’échec
-  Documentation : workflows auto-documentés avec des relations claires entre les tâches

L’exemple illustre un pipeline ETL complet avec plusieurs bonnes pratiques :

-  Modularité des tâches : chaque étape du processus ETL est une tâche distincte
-  Transfert de données : utilisez XComs pour Transférer des données entre les tâches
-  Gestion des erreurs : Implémentation de nouvelles tentatives et de délais d’expiration
-  Contrôles de la qualité des données : Validation des données chargées
-  Notifications : Alerte en cas d’achèvement ou d’échec du pipeline
-  Documentation : Inclure une documentation détaillée dans la définition du DAG


Ce modèle est courant en ingénierie des données, où les pipelines de données complexes doivent être fiables, maintenables et observables.

In [None]:
# Nom du fichier : example_etl_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.models import Variable

import pandas as pd
import logging

# Arguments par défaut pour toutes les tâches du DAG
default_args = {
    'owner': 'data_engineering',
    'depends_on_past': False,
    'email': ['data_alerts@example.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'execution_timeout': timedelta(hours=1),
    'start_date': datetime(2023, 1, 1),
    'catchup': False,
}

# Définition du DAG
dag = DAG(
    'customer_order_etl_pipeline',
    default_args=default_args,
    description='Pipeline ETL pour les données clients et commandes',
    schedule_interval='0 2 * * *',  # S'exécute tous les jours à 2h du matin
    max_active_runs=1,
    tags=['etl', 'customer', 'orders'],
    doc_md="""
    # Pipeline ETL Client-Commande
    
    Ce DAG effectue les opérations suivantes :
    1. Extraction des données clients et commandes
    2. Transformation : nettoyage, enrichissement, agrégation
    3. Chargement dans l'entrepôt de données
    4. Contrôles qualité
    5. Notification de succès
    
    ## Dépendances
    - Nécessite que la base opérationnelle soit disponible
    - Dépend de la complétion du DAG `daily_data_sync`
    
    ## Équipe
    Équipe Data Engineering
    """
)

# Fonction pour extraire les données clients
def extract_customer_data(**context):
    logging.info("Extraction des données clients")
    execution_date = context['execution_date']
    pg_hook = PostgresHook(postgres_conn_id='postgres_oltp')

    sql = """
    SELECT customer_id, name, email, signup_date, last_login_date, customer_segment
    FROM customers
    WHERE updated_at >= %s
    """
    cutoff_date = execution_date - timedelta(days=1)
    df = pg_hook.get_pandas_df(sql, parameters=[cutoff_date])
    logging.info(f"{len(df)} lignes client extraites")
    temp_file_path = f"/tmp/customer_extract_{execution_date.strftime('%Y%m%d')}.csv"
    df.to_csv(temp_file_path, index=False)
    context['ti'].xcom_push(key='customer_extract_path', value=temp_file_path)
    return temp_file_path

# Fonction pour extraire les données commandes
def extract_order_data(**context):
    logging.info("Extraction des données commandes")
    execution_date = context['execution_date']
    pg_hook = PostgresHook(postgres_conn_id='postgres_oltp')

    sql = """
    SELECT o.order_id, o.customer_id, o.order_date, o.total_amount, o.status,
           oi.product_id, oi.quantity, oi.unit_price
    FROM orders o
    JOIN order_items oi ON o.order_id = oi.order_id
    WHERE o.order_date >= %s
    """
    cutoff_date = execution_date - timedelta(days=1)
    df = pg_hook.get_pandas_df(sql, parameters=[cutoff_date])
    logging.info(f"{len(df)} lignes commande extraites")
    temp_file_path = f"/tmp/order_extract_{execution_date.strftime('%Y%m%d')}.csv"
    df.to_csv(temp_file_path, index=False)
    context['ti'].xcom_push(key='order_extract_path', value=temp_file_path)
    return temp_file_path

# Fonction de transformation des données
def transform_data(**context):
    logging.info("Transformation des données")
    ti = context['ti']
    customer_file = ti.xcom_pull(task_ids='extract_customer_data', key='customer_extract_path')
    order_file = ti.xcom_pull(task_ids='extract_order_data', key='order_extract_path')

    customers_df = pd.read_csv(customer_file)
    orders_df = pd.read_csv(order_file)

    # Nettoyage : suppression des doublons, valeurs manquantes
    customers_df = customers_df.drop_duplicates(subset=['customer_id'])
    orders_df = orders_df.drop_duplicates(subset=['order_id', 'product_id'])
    customers_df['customer_segment'] = customers_df['customer_segment'].fillna('Unknown')

    # Conversions de dates
    customers_df['signup_date'] = pd.to_datetime(customers_df['signup_date'])
    customers_df['last_login_date'] = pd.to_datetime(customers_df['last_login_date'])
    orders_df['order_date'] = pd.to_datetime(orders_df['order_date'])

    # Jours depuis dernière connexion
    customers_df['days_since_last_login'] = (datetime.now() - customers_df['last_login_date']).dt.days

    # Valeur vie client (LTV)
    customer_ltv = orders_df.groupby('customer_id')['total_amount'].sum().reset_index()
    customer_ltv.columns = ['customer_id', 'lifetime_value']
    enriched_customers = customers_df.merge(customer_ltv, on='customer_id', how='left')
    enriched_customers['lifetime_value'] = enriched_customers['lifetime_value'].fillna(0)

    # Résumé des commandes
    order_summary = orders_df.groupby('order_id').agg({
        'customer_id': 'first',
        'order_date': 'first',
        'total_amount': 'first',
        'status': 'first',
        'quantity': 'sum',
        'unit_price': 'mean'
    }).reset_index()

    # Sauvegarde des fichiers transformés
    execution_date = context['execution_date'].strftime('%Y%m%d')
    customer_output = f"/tmp/transformed_customers_{execution_date}.csv"
    order_output = f"/tmp/transformed_orders_{execution_date}.csv"
    enriched_customers.to_csv(customer_output, index=False)
    order_summary.to_csv(order_output, index=False)
    ti.xcom_push(key='transformed_customers_path', value=customer_output)
    ti.xcom_push(key='transformed_orders_path', value=order_output)
    logging.info(f"Transformation terminée : {len(enriched_customers)} clients, {len(order_summary)} commandes")
    return {'customers_path': customer_output, 'orders_path': order_output}

# Fonction de chargement des clients
def load_customer_data(**context):
    logging.info("Chargement des clients dans le DWH")
    ti = context['ti']
    file_path = ti.xcom_pull(task_ids='transform_data', key='transformed_customers_path')
    df = pd.read_csv(file_path)
    pg_hook = PostgresHook(postgres_conn_id='postgres_dwh')

    temp_table = 'temp_customers'
    target_table = 'dim_customers'

    pg_hook.run(f"DROP TABLE IF EXISTS {temp_table}")
    pg_hook.run(f"""
    CREATE TABLE {temp_table} (
        customer_id INTEGER PRIMARY KEY,
        name VARCHAR(100),
        email VARCHAR(100),
        signup_date DATE,
        last_login_date TIMESTAMP,
        customer_segment VARCHAR(50),
        days_since_last_login INTEGER,
        lifetime_value NUMERIC(10,2)
    )
    """)
    pg_hook.insert_rows(temp_table, df.values.tolist(), df.columns.tolist(), commit_every=1000)
    pg_hook.run(f"""
    INSERT INTO {target_table}
    SELECT * FROM {temp_table}
    ON CONFLICT (customer_id)
    DO UPDATE SET
        name = EXCLUDED.name,
        email = EXCLUDED.email,
        signup_date = EXCLUDED.signup_date,
        last_login_date = EXCLUDED.last_login_date,
        customer_segment = EXCLUDED.customer_segment,
        days_since_last_login = EXCLUDED.days_since_last_login,
        lifetime_value = EXCLUDED.lifetime_value
    """)
    pg_hook.run(f"DROP TABLE IF EXISTS {temp_table}")
    row_count = pg_hook.get_records(f"SELECT COUNT(*) FROM {target_table}")[0][0]
    logging.info(f"Chargement clients terminé, {row_count} lignes dans {target_table}")
    return row_count

# Fonction de chargement des commandes
def load_order_data(**context):
    logging.info("Chargement des commandes dans le DWH")
    ti = context['ti']
    file_path = ti.xcom_pull(task_ids='transform_data', key='transformed_orders_path')
    df = pd.read_csv(file_path)
    pg_hook = PostgresHook(postgres_conn_id='postgres_dwh')

    temp_table = 'temp_orders'
    target_table = 'fact_orders'

    pg_hook.run(f"DROP TABLE IF EXISTS {temp_table}")
    pg_hook.run(f"""
    CREATE TABLE {temp_table} (
        order_id INTEGER PRIMARY KEY,
        customer_id INTEGER,
        order_date DATE,
        total_amount NUMERIC(10,2),
        status VARCHAR(50),
        total_quantity INTEGER,
        average_unit_price NUMERIC(10,2)
    )
    """)
    pg_hook.insert_rows(temp_table, df.values.tolist(), df.columns.tolist(), commit_every=1000)
    pg_hook.run(f"""
    INSERT INTO {target_table}
    SELECT * FROM {temp_table}
    ON CONFLICT (order_id)
    DO UPDATE SET
        customer_id = EXCLUDED.customer_id,
        order_date = EXCLUDED.order_date,
        total_amount = EXCLUDED.total_amount,
        status = EXCLUDED.status,
        total_quantity = EXCLUDED.total_quantity,
        average_unit_price = EXCLUDED.average_unit_price
    """)
    pg_hook.run(f"DROP TABLE IF EXISTS {temp_table}")
    row_count = pg_hook.get_records(f"SELECT COUNT(*) FROM {target_table}")[0][0]
    logging.info(f"Chargement commandes terminé, {row_count} lignes dans {target_table}")
    return row_count

# Vérification qualité des données
def run_data_quality_checks(**context):
    logging.info("Lancement des contrôles qualité")
    pg_hook = PostgresHook(postgres_conn_id='postgres_dwh')
    checks = [
        {
            'name': 'Emails clients manquants',
            'query': "SELECT COUNT(*) FROM dim_customers WHERE email IS NULL OR email = ''",
            'threshold': 0,
            'operator': 'lte'
        },
        {
            'name': 'Commandes sans clients',
            'query': """
                SELECT COUNT(*) FROM fact_orders o
                LEFT JOIN dim_customers c ON o.customer_id = c.customer_id
                WHERE c.customer_id IS NULL
            """,
            'threshold': 0,
            'operator': 'lte'
        },
        {
            'name': 'Montants négatifs',
            'query': "SELECT COUNT(*) FROM fact_orders WHERE total_amount < 0",
            'threshold': 0,
            'operator': 'lte'
        },
        {
            'name': 'Nombre minimum de clients',
            'query': "SELECT COUNT(*) FROM dim_customers",
            'threshold': 100,
            'operator': 'gte'
        }
    ]
    failed_checks = []
    for check in checks:
        result = pg_hook.get_first(check['query'])[0]
        if check['operator'] == 'lte' and result > check['threshold']:
            failed_checks.append(f"{check['name']} : {result} > {check['threshold']}")
        elif check['operator'] == 'gte' and result < check['threshold']:
            failed_checks.append(f"{check['name']} : {result} < {check['threshold']}")
        logging.info(f"Résultat de {check['name']} : {result}")

    if failed_checks:
        msg = "Échec des contrôles qualité :\n" + "\n".join(failed_checks)
        logging.error(msg)
        raise ValueError(msg)

    logging.info("Tous les contrôles qualité sont passés")
    return True

# Notification en fin de process
def send_success_notification(**context):
    execution_date = context['execution_date']
    dag_id = context['dag'].dag_id
    message = f"Le pipeline ETL {dag_id} a terminé avec succès pour la date {execution_date}"
    logging.info(f"NOTIFICATION DE SUCCÈS : {message}")
    return message

# Définition des tâches
wait_for_data_sync = ExternalTaskSensor(
    task_id='wait_for_data_sync',
    external_dag_id='daily_data_sync',
    external_task_id='complete_sync',
    timeout=3600,
    mode='reschedule',
    dag=dag
)

create_tables = PostgresOperator(
    task_id='create_tables',
    postgres_conn_id='postgres_dwh',
    sql="""CREATE TABLE IF NOT EXISTS dim_customers (...); CREATE TABLE IF NOT EXISTS fact_orders (...);""",
    dag=dag
)

extract_customer_task = PythonOperator(
    task_id='extract_customer_data',
    python_callable=extract_customer_data,
    provide_context=True,
    dag=dag
)

extract_order_task = PythonOperator(
    task_id='extract_order_data',
    python_callable=extract_order_data,
    provide_context=True,
    dag=dag
)

transform_task = PythonOperator(
    task_id='transform_data',
    python_callable=transform_data,
    provide_context=True,
    dag=dag
)

load_customer_task = PythonOperator(
    task_id='load_customer_data',
    python_callable=load_customer_data,
    provide_context=True,
    dag=dag
)

load_order_task = PythonOperator(
    task_id='load_order_data',
    python_callable=load_order_data,
    provide_context=True,
    dag=dag
)

data_quality_task = PythonOperator(
    task_id='run_data_quality_checks',
    python_callable=run_data_quality_checks,
    provide_context=True,
    dag=dag
)

cleanup_task = BashOperator(
    task_id='cleanup_temp_files',
    bash_command='rm -f /tmp/customer_extract_*.csv /tmp/order_extract_*.csv /tmp/transformed_*.csv',
    dag=dag
)

notification_task = PythonOperator(
    task_id='send_success_notification',
    python_callable=send_success_notification,
    provide_context=True,
    dag=dag
)

# Dépendances entre les tâches
wait_for_data_sync >> create_tables
create_tables >> [extract_customer_task, extract_order_task]
[extract_customer_task, extract_order_task] >> transform_task
transform_task >> [load_customer_task, load_order_task]
[load_customer_task, load_order_task] >> data_quality_task
data_quality_task >> cleanup_task >> notification_task
