# 🟣 ETL with Python

**Badge:** 🟣 Expert | ⏱ 60 min | 🔑 **Key concepts:** ETL pipeline, extract, transform, load, best practices

## Objectives

- Understand the architecture of an ETL pipeline
- Implement the 3 phases: Extract, Transform, Load
- Validate data with Pydantic
- Handle errors and logging
- Apply best practices (idempotence, configuration)
- Build a complete ETL pipeline

## Prerequisites

- Pandas, Pydantic, requests
- Databases (SQLite, DuckDB)
- Data formats (CSV, Parquet)
- Python logging

## 1. ETL: Extract, Transform, Load

### What is an ETL?

An **ETL pipeline** extracts data from one or more sources, transforms it, then loads it into a destination.

```
┌─────────┐     ┌───────────┐     ┌──────┐
│ Extract │ --> │ Transform │ --> │ Load │
└─────────┘     └───────────┘     └──────┘
    │               │                 │
   API           Validate          Database
   CSV           Clean             Parquet
   DB            Enrich            S3
```
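
The three boxes above can be sketched with the standard library alone. This is a minimal illustration, not the pipeline built later in this notebook: the inline CSV text, the `orders` table and the function names are assumptions made for the example.

```python
import csv
import io
import sqlite3

# Hypothetical in-memory source standing in for a real CSV file.
RAW_CSV = "order_id,quantity,unit_price\n1,2,10.0\n2,-1,5.0\n3,4,2.5\n"

def extract(text: str) -> list[dict]:
    """Extract: parse CSV text into a list of row dictionaries."""
    return list(csv.DictReader(io.StringIO(text)))

def transform(rows: list[dict]) -> list[tuple]:
    """Transform: cast types, drop invalid rows, add a computed column."""
    out = []
    for row in rows:
        quantity = int(row["quantity"])
        unit_price = float(row["unit_price"])
        if quantity <= 0 or unit_price <= 0:
            continue  # invalid row: skipped here, a real pipeline would log it
        out.append((int(row["order_id"]), quantity, unit_price, quantity * unit_price))
    return out

def load(rows: list[tuple], conn: sqlite3.Connection) -> int:
    """Load: write the rows into a SQLite table and return the row count."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS orders "
        "(order_id INTEGER PRIMARY KEY, quantity INTEGER, unit_price REAL, total REAL)"
    )
    conn.executemany("INSERT OR REPLACE INTO orders VALUES (?, ?, ?, ?)", rows)
    conn.commit()
    return conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0]

conn = sqlite3.connect(":memory:")
loaded = load(transform(extract(RAW_CSV)), conn)
print(loaded)
```

Each phase is a plain function with explicit inputs and outputs, which is what makes the phases easy to test and to swap independently.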

### Extract
- Read from sources: APIs, files, databases
- Handle connection errors
- Paginate through API results
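
Pagination usually comes down to requesting pages until the API returns an empty one. The sketch below assumes a page-number scheme; `fetch_all_pages`, `fake_fetch_page` and the sample data are hypothetical, and a real API may use cursors or offsets instead.

```python
from typing import Callable, Iterator

def fetch_all_pages(
    fetch_page: Callable[[int], list[dict]], max_pages: int = 100
) -> Iterator[dict]:
    """Yield records page by page until an empty page is returned."""
    for page in range(1, max_pages + 1):
        records = fetch_page(page)
        if not records:
            return  # an empty page marks the end of the collection
        yield from records

# Hypothetical stand-in for an HTTP call such as
# requests.get(url, params={"page": page}).json()
FAKE_DATA = [{"product_id": i} for i in range(1, 8)]
PAGE_SIZE = 3

def fake_fetch_page(page: int) -> list[dict]:
    start = (page - 1) * PAGE_SIZE
    return FAKE_DATA[start:start + PAGE_SIZE]

products = list(fetch_all_pages(fake_fetch_page))
print(len(products))
```

The `max_pages` cap is a safety net against an API that never returns an empty page.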

### Transform
- Clean: missing values, duplicates, formats
- Validate: types, constraints (Pydantic)
- Enrich: joins, calculations, aggregations
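
The cleaning step can be illustrated on plain dictionaries. This is a sketch under assumptions: the `status` and `order_id` fields mirror the data used later in this notebook, and dropping rows with a missing status is one policy among several (imputing a default is another).

```python
def clean_records(records: list[dict]) -> list[dict]:
    """Normalise formats, drop rows with a missing status, remove duplicates."""
    seen = set()
    cleaned = []
    for rec in records:
        status = (rec.get("status") or "").strip().lower()  # normalise the format
        if not status:
            continue  # missing value: drop the row
        key = rec["order_id"]
        if key in seen:
            continue  # duplicate: keep the first occurrence only
        seen.add(key)
        cleaned.append({**rec, "status": status})
    return cleaned

raw = [
    {"order_id": 1, "status": " Completed "},
    {"order_id": 1, "status": "completed"},  # duplicate order_id
    {"order_id": 2, "status": None},         # missing status
    {"order_id": 3, "status": "PENDING"},
]
cleaned = clean_records(raw)
print(cleaned)
```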

### Load
- Write to the destination: database, data lake, warehouse
- Handle conflicts (upsert)
- Partition the output
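
An upsert inserts new rows and updates the ones whose key already exists. The sketch below uses SQLite's `ON CONFLICT ... DO UPDATE` clause, which assumes SQLite 3.24 or later; the `orders` table and `upsert_orders` helper are made up for the example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (order_id INTEGER PRIMARY KEY, status TEXT)")

def upsert_orders(conn: sqlite3.Connection, rows: list[tuple]) -> None:
    """Insert new orders; update the status of orders that already exist."""
    conn.executemany(
        "INSERT INTO orders (order_id, status) VALUES (?, ?) "
        "ON CONFLICT(order_id) DO UPDATE SET status = excluded.status",
        rows,
    )
    conn.commit()

upsert_orders(conn, [(1, "pending"), (2, "pending")])
upsert_orders(conn, [(2, "completed"), (3, "pending")])  # 2 is updated, 3 is new

rows = conn.execute("SELECT order_id, status FROM orders ORDER BY order_id").fetchall()
print(rows)
```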

### ELT vs ETL

**ETL**: transform BEFORE load (on the pipeline's own machine)  
**ELT**: load THEN transform (inside the data warehouse)  

ELT has become popular with modern data warehouses (BigQuery, Snowflake) because they have enough compute power to run the transformations themselves.
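
To make the difference concrete, here is a small ELT-style sketch in which an in-memory SQLite database stands in for the warehouse. The table names and sample rows are assumptions for illustration; the point is only the order of operations.

```python
import sqlite3

conn = sqlite3.connect(":memory:")  # stands in for the warehouse

# Load: raw rows go in untouched, including the invalid one.
conn.execute("CREATE TABLE raw_orders (order_id INTEGER, quantity INTEGER, unit_price REAL)")
conn.executemany(
    "INSERT INTO raw_orders VALUES (?, ?, ?)",
    [(1, 2, 10.0), (2, -1, 5.0), (3, 4, 2.5)],
)

# Transform: done afterwards, in SQL, by the database engine itself.
conn.execute(
    """
    CREATE TABLE clean_orders AS
    SELECT order_id, quantity, unit_price, quantity * unit_price AS total_amount
    FROM raw_orders
    WHERE quantity > 0 AND unit_price > 0
    """
)

clean = conn.execute(
    "SELECT order_id, total_amount FROM clean_orders ORDER BY order_id"
).fetchall()
print(clean)
```

The raw table is kept, so the transformation can be rewritten and replayed later without going back to the source.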

## 2. Setup: configuration and logging

In [None]:
import pandas as pd
import numpy as np
import requests
import logging
from pathlib import Path
from typing import List, Optional
from datetime import datetime, timedelta
from pydantic import BaseModel, Field, field_validator, ValidationError
from pydantic_settings import BaseSettings
import sqlite3
import json

# Logging configuration with a consistent, pipe-delimited format
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s | %(levelname)-8s | %(name)s | %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)

logger = logging.getLogger('ETL')
logger.info("ETL logger initialized")

In [None]:
# Configuration with Pydantic Settings
from pydantic_settings import SettingsConfigDict

class ETLConfig(BaseSettings):
    """ETL pipeline configuration"""
    
    # Pydantic v2 style: values can be overridden by environment variables
    # prefixed with ETL_ (for example ETL_BATCH_SIZE)
    model_config = SettingsConfigDict(env_prefix="ETL_")
    
    # Sources
    source_csv_path: str = "etl_data/sales_raw.csv"
    source_api_url: str = "https://api.example.com/products"
    
    # Destinations
    output_parquet_path: str = "etl_data/sales_clean.parquet"
    output_db_path: str = "etl_data/analytics.db"
    
    # Parameters
    batch_size: int = 1000
    max_retries: int = 3
    
    # Dead letter queue (errors)
    error_log_path: str = "etl_data/errors.jsonl"

config = ETLConfig()
logger.info(f"Configuration loaded: {config.model_dump()}")

## 3. Extract: reading from multiple sources

In [None]:
# Create the source data
Path('etl_data').mkdir(exist_ok=True)

# Source 1: CSV with raw data (some of it deliberately invalid)
np.random.seed(42)
df_raw = pd.DataFrame({
    'order_id': range(1, 1001),
    'order_date': pd.date_range('2024-01-01', periods=1000, freq='h'),  # 'H' is deprecated in recent pandas
    'customer_id': np.random.randint(1, 200, 1000),
    'product_id': np.random.randint(1, 50, 1000),
    'quantity': np.random.randint(-2, 10, 1000),  # some values are zero or negative
    'unit_price': np.random.uniform(-10, 2000, 1000),  # some values are negative
    'status': np.random.choice(['completed', 'pending', 'cancelled', 'invalid', None], 1000)
})

df_raw.to_csv(config.source_csv_path, index=False)
logger.info(f"Source data created: {config.source_csv_path}")

In [None]:
def extract_from_csv(file_path: str) -> pd.DataFrame:
    """Extract data from a CSV file"""
    logger.info(f"Extract: reading CSV {file_path}")
    
    try:
        df = pd.read_csv(file_path, parse_dates=['order_date'])
        logger.info(f"Extract: {len(df)} rows extracted")
        return df
    except Exception as e:
        logger.error(f"Extract: error reading CSV - {e}")
        raise

# Extraction
df_extracted = extract_from_csv(config.source_csv_path)
print("Preview of the extracted data:")
print(df_extracted.head())
print(f"\nTypes:\n{df_extracted.dtypes}")

In [None]:
def extract_from_api(api_url: str, max_retries: int = 3) -> List[dict]:
    """Extract data from an API, with retries"""
    logger.info(f"Extract: calling API {api_url}")
    
    # The API is simulated here. A real call would use requests.get
    # (the retry loop below also needs `import time`):
    # for attempt in range(max_retries):
    #     try:
    #         response = requests.get(api_url, timeout=10)
    #         response.raise_for_status()
    #         data = response.json()
    #         logger.info(f"Extract: {len(data)} records from the API")
    #         return data
    #     except requests.RequestException as e:
    #         logger.warning(f"Extract: attempt {attempt+1}/{max_retries} failed - {e}")
    #         if attempt == max_retries - 1:
    #             raise
    #         time.sleep(2 ** attempt)  # Exponential backoff
    
    # Simulated data
    products = [
        {'product_id': i, 'product_name': f'Product_{i}', 'category': np.random.choice(['A', 'B', 'C'])}
        for i in range(1, 51)
    ]
    logger.info(f"Extract: {len(products)} simulated products")
    return products

# API extraction
products_data = extract_from_api(config.source_api_url)
df_products = pd.DataFrame(products_data)
print("\nProducts extracted from the API:")
print(df_products.head())

## 4. Transform: validation with Pydantic

In [None]:
# Pydantic models used for validation
class OrderSchema(BaseModel):
    """Validation schema for an order"""
    order_id: int = Field(gt=0, description="Positive order ID")
    order_date: datetime
    customer_id: int = Field(gt=0)
    product_id: int = Field(gt=0)
    quantity: int = Field(gt=0, description="Strictly positive quantity")
    unit_price: float = Field(gt=0, description="Strictly positive price")
    status: str = Field(pattern=r'^(completed|pending|cancelled)$')
    
    @field_validator('order_date')
    @classmethod
    def validate_date(cls, v):
        # The date must not be in the future
        if v > datetime.now():
            raise ValueError("Date is in the future")
        # The date must not be more than 5 years old
        if v < datetime.now() - timedelta(days=365*5):
            raise ValueError("Date is too old")
        return v

class ProductSchema(BaseModel):
    product_id: int = Field(gt=0)
    product_name: str = Field(min_length=1)
    category: str

logger.info("Validation schemas defined")

In [None]:
def transform_validate_orders(df: pd.DataFrame) -> tuple[pd.DataFrame, List[dict]]:
    """Validate the orders and separate valid rows from invalid ones"""
    logger.info(f"Transform: validating {len(df)} orders")
    
    valid_records = []
    invalid_records = []
    
    for idx, row in df.iterrows():
        try:
            # Validate with Pydantic
            validated = OrderSchema(**row.to_dict())
            valid_records.append(validated.model_dump())
        except ValidationError as e:
            # Record the error for the dead letter queue
            error = {
                'row_index': int(idx),
                'data': row.to_dict(),
                'errors': e.errors(),
                'timestamp': datetime.now().isoformat()
            }
            invalid_records.append(error)
            logger.warning(f"Transform: row {idx} invalid - {e.error_count()} errors")
    
    df_valid = pd.DataFrame(valid_records)
    logger.info(f"Transform: {len(df_valid)} valid, {len(invalid_records)} invalid")
    
    return df_valid, invalid_records

# Transformation and validation
df_valid, errors = transform_validate_orders(df_extracted)

print("\n✓ Validation complete:")
print(f"  Valid: {len(df_valid)}")
print(f"  Invalid: {len(errors)}")

if errors:
    print("\nFirst error (example):")
    print(json.dumps(errors[0], indent=2, default=str))

## 5. Transform: enrichment

In [None]:
def transform_enrich(df_orders: pd.DataFrame, df_products: pd.DataFrame) -> pd.DataFrame:
    """Enrich the orders with product information"""
    logger.info(f"Transform: enriching {len(df_orders)} orders")
    
    # Join with products
    df_enriched = df_orders.merge(
        df_products,
        on='product_id',
        how='left'
    )
    
    # Calculations
    df_enriched['total_amount'] = (df_enriched['quantity'] * df_enriched['unit_price']).round(2)
    
    # Derived columns
    df_enriched['year'] = df_enriched['order_date'].dt.year
    df_enriched['month'] = df_enriched['order_date'].dt.month
    df_enriched['day_of_week'] = df_enriched['order_date'].dt.day_name()
    df_enriched['is_weekend'] = df_enriched['order_date'].dt.dayofweek >= 5
    
    # Amount categories
    df_enriched['amount_category'] = pd.cut(
        df_enriched['total_amount'],
        bins=[0, 100, 500, 1000, float('inf')],
        labels=['Small', 'Medium', 'Large', 'XLarge']
    )
    
    logger.info(f"Transform: enrichment complete - {len(df_enriched)} rows")
    return df_enriched

# Enrichment
df_enriched = transform_enrich(df_valid, df_products)

print("\n✓ Enriched data:")
print(df_enriched.head())
print(f"\nColumns: {list(df_enriched.columns)}")

## 6. Load: writing to multiple destinations

In [None]:
def load_to_parquet(df: pd.DataFrame, output_path: str, partition_cols: Optional[List[str]] = None):
    """Load the data into a Parquet file"""
    logger.info(f"Load: writing Parquet {output_path}")
    
    try:
        if partition_cols:
            df.to_parquet(
                output_path,
                partition_cols=partition_cols,
                index=False,
                compression='snappy'
            )
            logger.info(f"Load: Parquet partitioned by {partition_cols}")
        else:
            df.to_parquet(
                output_path,
                index=False,
                compression='snappy'
            )
        
        # Size is only reported for a single file; a partitioned dataset is a directory
        file_size = Path(output_path).stat().st_size / 1024**2 if not partition_cols else 0
        logger.info(f"Load: {len(df)} rows written ({file_size:.2f} MB)")
        
    except Exception as e:
        logger.error(f"Load: error writing Parquet - {e}")
        raise

# Parquet load
load_to_parquet(df_enriched, config.output_parquet_path)
print(f"\n✓ Data loaded: {config.output_parquet_path}")

In [None]:
def load_to_database(df: pd.DataFrame, db_path: str, table_name: str, if_exists: str = 'replace'):
    """Load the data into a SQLite database"""
    logger.info(f"Load: writing database {db_path} table {table_name}")
    
    try:
        with sqlite3.connect(db_path) as conn:
            df.to_sql(
                name=table_name,
                con=conn,
                if_exists=if_exists,
                index=False
            )
            
            # Verify the row count
            cursor = conn.cursor()
            cursor.execute(f"SELECT COUNT(*) FROM {table_name}")
            count = cursor.fetchone()[0]
            
            logger.info(f"Load: {count} rows in {table_name}")
            
    except Exception as e:
        logger.error(f"Load: error writing database - {e}")
        raise

# Database load
load_to_database(df_enriched, config.output_db_path, 'orders_clean')
print(f"✓ Data loaded: {config.output_db_path}")

In [None]:
def load_errors_to_dlq(errors: List[dict], error_log_path: str):
    """Write the errors to a Dead Letter Queue (JSONL)"""
    logger.info(f"Load: writing errors {error_log_path}")
    
    try:
        with open(error_log_path, 'a') as f:
            for error in errors:
                f.write(json.dumps(error, default=str) + '\n')
        
        logger.info(f"Load: {len(errors)} errors recorded")
        
    except Exception as e:
        # A DLQ write failure is logged but does not stop the pipeline
        logger.error(f"Load: error writing DLQ - {e}")

# Dead Letter Queue
if errors:
    load_errors_to_dlq(errors, config.error_log_path)
    print(f"✓ Errors recorded: {config.error_log_path}")

## 7. Complete ETL pipeline: orchestration

In [None]:
from dataclasses import dataclass
from typing import Dict, Any

@dataclass
class ETLMetrics:
    """ETL pipeline metrics"""
    start_time: datetime
    end_time: datetime
    records_extracted: int
    records_valid: int
    records_invalid: int
    records_loaded: int
    
    @property
    def duration_seconds(self) -> float:
        return (self.end_time - self.start_time).total_seconds()
    
    def to_dict(self) -> Dict[str, Any]:
        return {
            'start_time': self.start_time.isoformat(),
            'end_time': self.end_time.isoformat(),
            'duration_seconds': self.duration_seconds,
            'records_extracted': self.records_extracted,
            'records_valid': self.records_valid,
            'records_invalid': self.records_invalid,
            'records_loaded': self.records_loaded,
            # Guard against division by zero when the source is empty
            'success_rate': round(self.records_valid / self.records_extracted * 100, 2) if self.records_extracted else 0.0
        }

class ETLPipeline:
    """Complete ETL pipeline"""
    
    def __init__(self, config: ETLConfig):
        self.config = config
        self.logger = logging.getLogger('ETLPipeline')
    
    def run(self) -> ETLMetrics:
        """Run the complete ETL pipeline"""
        start_time = datetime.now()
        self.logger.info("="*70)
        self.logger.info("Starting the ETL pipeline")
        self.logger.info("="*70)
        
        try:
            # 1. EXTRACT
            self.logger.info("PHASE 1: EXTRACT")
            df_orders = extract_from_csv(self.config.source_csv_path)
            products_data = extract_from_api(self.config.source_api_url)
            df_products = pd.DataFrame(products_data)
            records_extracted = len(df_orders)
            
            # 2. TRANSFORM
            self.logger.info("PHASE 2: TRANSFORM")
            df_valid, errors = transform_validate_orders(df_orders)
            df_enriched = transform_enrich(df_valid, df_products)
            
            # 3. LOAD
            self.logger.info("PHASE 3: LOAD")
            load_to_parquet(df_enriched, self.config.output_parquet_path)
            load_to_database(df_enriched, self.config.output_db_path, 'orders_clean')
            
            if errors:
                load_errors_to_dlq(errors, self.config.error_log_path)
            
            # Metrics
            end_time = datetime.now()
            metrics = ETLMetrics(
                start_time=start_time,
                end_time=end_time,
                records_extracted=records_extracted,
                records_valid=len(df_valid),
                records_invalid=len(errors),
                records_loaded=len(df_enriched)
            )
            
            self.logger.info("="*70)
            self.logger.info("ETL pipeline completed successfully")
            self.logger.info(f"Duration: {metrics.duration_seconds:.2f}s")
            self.logger.info("="*70)
            
            return metrics
            
        except Exception as e:
            self.logger.error(f"ETL pipeline failed: {e}", exc_info=True)
            raise

# Run the pipeline
pipeline = ETLPipeline(config)
metrics = pipeline.run()

print("\n" + "="*70)
print("PIPELINE METRICS")
print("="*70)
for key, value in metrics.to_dict().items():
    print(f"  {key}: {value}")

## 8. Best practices

### Idempotence

An **idempotent** pipeline produces the same result whether it is run once or several times.
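
The definition is easiest to see by replaying the same batch twice against two loading strategies. The toy sketch below uses plain Python containers instead of a database; `append_load` and `keyed_load` are hypothetical helpers written for the comparison.

```python
def append_load(table: list, rows: list[dict]) -> None:
    """Not idempotent: every run adds the rows again."""
    table.extend(rows)

def keyed_load(table: dict, rows: list[dict], key: str) -> None:
    """Idempotent: a row with a known key overwrites the previous version."""
    for row in rows:
        table[row[key]] = row

batch = [{"order_id": 1, "amount": 10.0}, {"order_id": 2, "amount": 20.0}]

appended: list = []
keyed: dict = {}
for _ in range(2):  # simulate the same run being replayed
    append_load(appended, batch)
    keyed_load(keyed, batch, "order_id")

print(len(appended), len(keyed))
```

After two runs the appended table holds four rows while the keyed one still holds two, which is the property a replayable pipeline needs.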

In [None]:
def idempotent_load(df: pd.DataFrame, db_path: str, table_name: str, unique_key: str):
    """Load with delete-then-insert semantics (idempotent)"""
    logger.info(f"Idempotent load: {table_name} with key {unique_key}")
    
    with sqlite3.connect(db_path) as conn:
        # Create the table if it does not exist yet
        df.head(0).to_sql(table_name, conn, if_exists='append', index=False)
        
        # Delete the rows that are about to be re-inserted
        # (very large batches should be chunked: SQLite caps the number of bound parameters)
        existing_ids = tuple(df[unique_key].tolist())
        if existing_ids:
            placeholders = ','.join(['?'] * len(existing_ids))
            conn.execute(f"DELETE FROM {table_name} WHERE {unique_key} IN ({placeholders})", existing_ids)
        
        # Insert the new rows
        df.to_sql(table_name, conn, if_exists='append', index=False)
        
        logger.info(f"Load: {len(df)} rows upserted")

# Idempotence test
idempotent_load(df_enriched, config.output_db_path, 'orders_idempotent', 'order_id')
print("✓ Idempotent load (can be replayed without duplicates)")

### Advanced error handling

In [None]:
from typing import Callable
import time

def retry_with_backoff(func: Callable, max_retries: int = 3, backoff_factor: int = 2):
    """Retry with exponential backoff"""
    for attempt in range(max_retries):
        try:
            return func()
        except Exception as e:
            if attempt == max_retries - 1:
                logger.error(f"Failed after {max_retries} attempts: {e}")
                raise
            
            wait_time = backoff_factor ** attempt
            logger.warning(f"Attempt {attempt+1}/{max_retries} failed ({e}). Retrying in {wait_time}s")
            time.sleep(wait_time)

# Usage example
def unreliable_operation():
    # Simulates an operation that fails some of the time
    import random
    if random.random() < 0.3:  # 30% failure rate
        raise Exception("Temporary error")
    return "Success"

# result = retry_with_backoff(unreliable_operation)
print("✓ Retry with exponential backoff implemented")

### Unit tests (example)

In [None]:
# Example test for the transformations
def test_transform_enrich():
    """Test the enrichment function"""
    # Test data
    df_orders_test = pd.DataFrame({
        'order_id': [1, 2],
        'order_date': [datetime(2024, 1, 1), datetime(2024, 1, 2)],
        'customer_id': [1, 2],
        'product_id': [1, 2],
        'quantity': [1, 2],
        'unit_price': [100.0, 200.0],
        'status': ['completed', 'completed']
    })
    
    df_products_test = pd.DataFrame({
        'product_id': [1, 2],
        'product_name': ['Product_1', 'Product_2'],
        'category': ['A', 'B']
    })
    
    # Transformation
    result = transform_enrich(df_orders_test, df_products_test)
    
    # Assertions
    assert len(result) == 2, "Should return 2 rows"
    assert 'total_amount' in result.columns, "Column total_amount is missing"
    assert result['total_amount'].iloc[0] == 100.0, "Incorrect calculation"
    assert result['total_amount'].iloc[1] == 400.0, "Incorrect calculation"
    assert 'product_name' in result.columns, "Join failed"
    
    print("✓ Test transform_enrich passed")

test_transform_enrich()
print("\n💡 Unit tests are essential for pipeline reliability")

## 9. Orchestration: towards Airflow / Prefect

For production pipelines, use an orchestrator: it schedules runs, enforces the order of tasks, and retries the ones that fail.
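
At its core, an orchestrator runs tasks in an order that respects their dependencies. The toy sketch below illustrates only that idea with the standard library's `graphlib` (Python 3.9+). It is not how Airflow or Prefect are implemented, and `run_task` is a placeholder.

```python
from graphlib import TopologicalSorter

# Each task maps to the set of tasks it depends on.
dependencies = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
}

executed = []

def run_task(name: str) -> None:
    # Placeholder body: a real task would call the matching pipeline function.
    executed.append(name)

# static_order() yields every task after all of its dependencies.
for task_name in TopologicalSorter(dependencies).static_order():
    run_task(task_name)

print(executed)
```

Real orchestrators add scheduling, retries, state tracking and a UI on top of this dependency resolution.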

### Apache Airflow
```python
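from datetime import datetime

import pandas as pd
from airflow import DAG
from airflow.operators.python import PythonOperator

# Tasks may run in separate processes, so each step hands its output to the
# next one through a file. The staging paths below are illustrative.
RAW_CSV = 'etl_data/sales_raw.csv'
EXTRACTED = 'etl_data/staging_extracted.parquet'
VALIDATED = 'etl_data/staging_validated.parquet'

def extract():
    extract_from_csv(RAW_CSV).to_parquet(EXTRACTED, index=False)

def transform():
    df_valid, _errors = transform_validate_orders(pd.read_parquet(EXTRACTED))
    df_valid.to_parquet(VALIDATED, index=False)

def load():
    load_to_database(pd.read_parquet(VALIDATED), 'etl_data/analytics.db', 'orders_clean')

with DAG(
    'etl_pipeline',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',  # Airflow 2.4+; earlier releases use schedule_interval
    catchup=False
) as dag:

    extract_task = PythonOperator(task_id='extract', python_callable=extract)
    transform_task = PythonOperator(task_id='transform', python_callable=transform)
    load_task = PythonOperator(task_id='load', python_callable=load)

    extract_task >> transform_task >> load_task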
```

### Prefect (modern)
```python
from prefect import flow, task

@task
def extract():
    return extract_from_csv('data.csv')

@task
def transform(df):
    return transform_validate_orders(df)

@task
def load(df):
    load_to_database(df, 'db.db', 'table')

@flow
def etl_pipeline():
    df = extract()
    df_clean, errors = transform(df)
    load(df_clean)
```

## Common pitfalls

### 1. No error handling

In [None]:
# ❌ Pipeline without error handling
# df = pd.read_csv('file.csv')  # Crashes if the file is missing
# df.to_sql('table', conn)  # Crashes if the DB is unreachable

# ✅ Always handle errors
try:
    df = pd.read_csv('file.csv')
except FileNotFoundError:
    logger.error("File not found")
    df = pd.DataFrame()  # Fallback: empty frame (or notify and abort)

print("✓ Handle ALL foreseeable errors")

### 2. No logging

In [None]:
# ❌ No logging = impossible to debug
# df = transform(df)

# ✅ Log at every step
df = df_extracted  # reuse the orders extracted in section 3
logger.info(f"Transform: {len(df)} rows before")
# df = transform(df)
logger.info(f"Transform: {len(df)} rows after")

print("✓ Logging is your best friend")

### 3. Tight coupling

In [None]:
# ❌ Tight coupling: the code is hard to test
# def etl():
#     df = pd.read_csv('hardcoded.csv')
#     df.to_sql('hardcoded_table', hardcoded_conn)

# ✅ Dependency injection
def etl(source: str, destination: str, conn):
    df = pd.read_csv(source)
    df.to_sql(destination, conn)

print("✓ Use configuration and dependency injection")

## Mini-exercises

### Exercise 1: Pipeline with logging

Create a mini pipeline that:  
1. Reads a CSV with 100 rows  
2. Keeps the rows where a column is above a threshold  
3. Writes the result to Parquet  
4. Logs each step with the number of rows

In [None]:
# Your code here


### Exercise 2: Validation with Pydantic

1. Create a Pydantic schema for a user (name, email, age)  
2. Create a DataFrame with valid and invalid data  
3. Validate the rows and separate valid from invalid  
4. Save the errors as JSONL

In [None]:
# Your code here


### Exercise 3: Complete end-to-end ETL pipeline

Create a complete ETL pipeline:  
1. Extract: generate 1000 e-commerce sales  
2. Transform: validate with Pydantic, enrich with categories  
3. Load: save to Parquet partitioned by month + SQLite  
4. Metrics: display the duration and the success rate

In [None]:
# Your code here


## Exercise solutions

In [None]:
# Solution to Exercise 1
logger_ex1 = logging.getLogger('Exercise1')

# 1. Create the CSV
df_ex1 = pd.DataFrame({
    'id': range(1, 101),
    'value': np.random.randint(1, 100, 100)
})
df_ex1.to_csv('etl_data/ex1_input.csv', index=False)
logger_ex1.info(f"CSV created: {len(df_ex1)} rows")

# 2. Read
df_read = pd.read_csv('etl_data/ex1_input.csv')
logger_ex1.info(f"CSV read: {len(df_read)} rows")

# 3. Filter
threshold = 50
df_filtered = df_read[df_read['value'] > threshold]
logger_ex1.info(f"Filter value > {threshold}: {len(df_filtered)} rows remaining")

# 4. Write Parquet
df_filtered.to_parquet('etl_data/ex1_output.parquet', index=False)
logger_ex1.info(f"Parquet written: {len(df_filtered)} rows")

print("\n✓ Exercise 1 complete")

In [None]:
# Solution to Exercise 2
# EmailStr needs the optional email-validator package: pip install "pydantic[email]"
from pydantic import EmailStr

class UserSchema(BaseModel):
    name: str = Field(min_length=1)
    email: EmailStr
    age: int = Field(ge=18, le=120)

# Mixed data (valid and invalid rows)
df_users = pd.DataFrame({
    'name': ['Alice', 'Bob', '', 'Diana', 'Eve'],
    'email': ['alice@test.com', 'invalid-email', 'bob@test.com', 'diana@test.com', 'eve@test.com'],
    'age': [25, 30, 17, 150, 22]
})

valid_users = []
invalid_users = []

for idx, row in df_users.iterrows():
    try:
        validated = UserSchema(**row.to_dict())
        valid_users.append(validated.model_dump())
    except ValidationError as e:
        invalid_users.append({
            'row': int(idx),
            'data': row.to_dict(),
            'errors': [err['msg'] for err in e.errors()]
        })

# Save the errors
with open('etl_data/ex2_errors.jsonl', 'w') as f:
    for error in invalid_users:
        f.write(json.dumps(error) + '\n')

print("\n✓ Exercise 2 complete:")
print(f"  Valid: {len(valid_users)}")
print(f"  Invalid: {len(invalid_users)}")

In [None]:
# Solution to Exercise 3 (complete pipeline)
class SaleSchema(BaseModel):
    sale_id: int = Field(gt=0)
    date: datetime
    product: str
    amount: float = Field(gt=0)

# 1. Extract
df_sales_ex3 = pd.DataFrame({
    'sale_id': range(1, 1001),
    'date': pd.date_range('2024-01-01', periods=1000, freq='6h'),
    'product': np.random.choice(['A', 'B', 'C'], 1000),
    'amount': np.random.uniform(-10, 500, 1000)  # some values are negative
})

logger.info(f"Extract: {len(df_sales_ex3)} sales generated")

# 2. Transform
valid_sales = []
invalid_sales = []

for idx, row in df_sales_ex3.iterrows():
    try:
        validated = SaleSchema(**row.to_dict())
        valid_sales.append(validated.model_dump())
    except ValidationError:
        invalid_sales.append(row.to_dict())

df_valid_ex3 = pd.DataFrame(valid_sales)
df_valid_ex3['month'] = df_valid_ex3['date'].dt.month
df_valid_ex3['category'] = df_valid_ex3['product'].map({'A': 'Cat1', 'B': 'Cat2', 'C': 'Cat3'})

logger.info(f"Transform: {len(df_valid_ex3)} valid, {len(invalid_sales)} invalid")

# 3. Load
df_valid_ex3.to_parquet(
    'etl_data/sales_ex3_partitioned',
    partition_cols=['month'],
    index=False
)
logger.info("Load: partitioned Parquet written")

with sqlite3.connect('etl_data/ex3.db') as conn:
    df_valid_ex3.to_sql('sales', conn, if_exists='replace', index=False)
logger.info("Load: SQLite written")

# 4. Metrics
success_rate = len(df_valid_ex3) / len(df_sales_ex3) * 100
print("\n✓ Exercise 3 complete:")
print(f"  Extracted: {len(df_sales_ex3)}")
print(f"  Valid: {len(df_valid_ex3)}")
print(f"  Success rate: {success_rate:.1f}%")

## Summary

### Key points

1. **ETL** = Extract (sources) → Transform (validation, enrichment) → Load (destinations)
2. **Pydantic**: schema validation is essential for data quality
3. **Logging**: indispensable for debugging and monitoring
4. **Error handling**: a Dead Letter Queue for invalid data
5. **Idempotence**: the pipeline can be replayed without creating duplicates
6. **Configuration**: externalize it with Pydantic Settings
7. **Metrics**: track duration, success rate, and volumes
8. **Orchestration**: Airflow, Prefect, or Dagster for production
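
A Dead Letter Queue is only useful if someone reads it. As a sketch, the helper below counts validation errors per field from JSONL records shaped like the ones written by `load_errors_to_dlq` (an `errors` list whose items carry a `loc`). The name `summarize_dlq` and the sample records are assumptions for illustration.

```python
import json
from collections import Counter

def summarize_dlq(lines) -> Counter:
    """Count validation errors per field across JSONL dead-letter records."""
    counts = Counter()
    for line in lines:
        line = line.strip()
        if not line:
            continue  # tolerate blank lines
        record = json.loads(line)
        for err in record.get("errors", []):
            loc = err.get("loc") or ["<unknown>"]
            counts[str(loc[0])] += 1  # first element of loc is the field name
    return counts

# Two made-up records in the same shape as the DLQ file.
sample = [
    json.dumps({"row_index": 0, "errors": [{"loc": ["quantity"], "msg": "..."}]}),
    json.dumps({"row_index": 5, "errors": [{"loc": ["quantity"], "msg": "..."},
                                           {"loc": ["status"], "msg": "..."}]}),
]
summary = summarize_dlq(sample)
print(summary.most_common())
```

Against the real file, the same function can be fed an open file handle, for example `with open(config.error_log_path) as f: summarize_dlq(f)`.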

### Production pipeline checklist

- [ ] Structured logging at every step
- [ ] Data validation (Pydantic)
- [ ] Complete error handling
- [ ] Dead Letter Queue for invalid data
- [ ] Idempotence (upsert)
- [ ] Externalized configuration
- [ ] Unit tests
- [ ] Metrics (duration, success rate)
- [ ] Monitoring and alerts
- [ ] Documentation
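
For the monitoring and alerts item, a simple starting point is to compare the run metrics against thresholds. The sketch below assumes a dictionary with the `success_rate` and `duration_seconds` keys produced by `ETLMetrics.to_dict()`. The function name, the default thresholds and the sample values are hypothetical.

```python
def check_metrics(
    metrics: dict,
    min_success_rate: float = 95.0,
    max_duration_seconds: float = 300.0,
) -> list[str]:
    """Return one alert message per threshold that the run violates."""
    alerts = []
    if metrics["success_rate"] < min_success_rate:
        alerts.append(
            f"success_rate {metrics['success_rate']}% is below {min_success_rate}%"
        )
    if metrics["duration_seconds"] > max_duration_seconds:
        alerts.append(
            f"duration {metrics['duration_seconds']}s exceeds {max_duration_seconds}s"
        )
    return alerts

# Made-up metrics for illustration.
sample_metrics = {"success_rate": 44.6, "duration_seconds": 1.8}
alerts = check_metrics(sample_metrics)
for message in alerts:
    print("ALERT:", message)
```

In production the messages would be sent to a notification channel rather than printed.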

### Next steps

- Cheatsheet: a recap of all the concepts
- Go deeper: Airflow, dbt, Great Expectations