

# Pipeline ETL Automatisé: CSV et API vers MySQL

Ce notebook implémente un pipeline ETL complet qui:
1. Extrait des données depuis deux fichiers CSV (orders.csv et details.csv)
2. Extrait des données supplémentaires depuis une API publique
3. Transforme et nettoie les données
4. Charge les données dans une base de données MySQL
5. Automatise le processus avec des fonctions réutilisables


### Configuration initiale

Ici, je prépare le terrain : 

j’importe les bibliothèques, je récupère les infos de connexion depuis un fichier *.env*, et je vérifie que je peux bien 
me connecter à ma base de données MySQL. Si tout se passe bien, un petit message de succès s’affiche. Sinon, j’ai une
erreur claire pour savoir ce qui cloche.

In [75]:
# !pip install pymysql pandas sqlalchemy python-dotenv requests
import pandas as pd
import numpy as np
from sqlalchemy import create_engine, exc
import requests
from dotenv import load_dotenv
import os
from datetime import datetime

# Chargement des variables d'environnement
load_dotenv('config/.env')

# Récupération des infos de connexion MySQL
user = os.getenv('MYSQL_USER', 'root')
password = os.getenv('MYSQL_PASSWORD', '')
host = os.getenv('MYSQL_HOST', 'localhost')
port = os.getenv('MYSQL_PORT', '3306')
db = os.getenv('MYSQL_DB', 'etl_project')

# Création de la connexion
engine = create_engine(f'mysql+pymysql://{user}:{password}@{host}:{port}/{db}')

# Test de connexion
try:
    with engine.connect() as conn:
        print("Connexion à MySQL réussie.")
except exc.SQLAlchemyError as e:
    print(f"Erreur de connexion : {e}")


Connexion à MySQL réussie.


#### Remarque

Il faut savoir que avant que ce code ne marche, il vous faut avoir créer votre base de donnée et le nommer **etl_project** avec Mysql.

### D'abord c’est quoi un processus ETL ?

**ETL** signifie **Extract – Transform – Load**.

C’est un processus classique utilisé en data engineering pour manipuler des données :

***Extract*** : on récupère les données depuis une source (fichier CSV, base de données, API, etc.).

***Transform***: on nettoie, filtre, reformate ou enrichit les données pour qu’elles soient exploitables.

***Load*** : on charge les données transformées dans une base de données ou un entrepôt de données pour analyse ou visualisation.

### Extraction des données CSV

On commence par extraire les fichiers CSV contenant les commandes et leurs détails, puis j’affiche un aperçu du contenu.

In [81]:
def extract_csv_data(file_path):
    """Lecture d'un fichier CSV et retour d'un DataFrame"""
    try:
        df = pd.read_csv(file_path)
        print(f"Données extraites depuis {file_path}")
        return df
    except Exception as e:
        print(f"Erreur lors de l'extraction : {e}")
        return None
# Extraction des données CSV
orders_df = extract_csv_data('data/orders.csv')
details_df = extract_csv_data('data/details.csv')
# Aperçus rapides
print("\nCommandes :")
display(orders_df.head())
print("\nDétails :")
display(details_df.head())

Données extraites depuis data/orders.csv
Données extraites depuis data/details.csv

Commandes :


Unnamed: 0,Order ID,Order Date,CustomerName,State,City
0,B-26055,10-03-2018,Harivansh,Uttar Pradesh,Mathura
1,B-25993,03-02-2018,Madhav,Delhi,Delhi
2,B-25973,24-01-2018,Madan Mohan,Uttar Pradesh,Mathura
3,B-25923,27-12-2018,Gopal,Maharashtra,Mumbai
4,B-25757,21-08-2018,Vishakha,Madhya Pradesh,Indore



Détails :


Unnamed: 0,Order ID,Amount,Profit,Quantity,Category,Sub-Category,PaymentMode
0,B-25681,1096,658,7,Electronics,Electronic Games,COD
1,B-26055,5729,64,14,Furniture,Chairs,EMI
2,B-25955,2927,146,8,Furniture,Bookcases,EMI
3,B-26093,2847,712,8,Electronics,Printers,Credit Card
4,B-25602,2617,1151,4,Electronics,Phones,Credit Card


### Extraction des données depuis une API

In [83]:
def extract_api_data(limit=None):
    """Extrait des données de base sur les pays depuis l'API REST Countries"""
    url = "https://restcountries.com/v3.1/all"
    
    try:
        response = requests.get(url)
        response.raise_for_status()
        data = response.json()

        countries_data = []
        for country in data[:limit]:  # On limite si besoin
            name = country.get('name', {}).get('common', 'Inconnu')
            region = country.get('region', 'Inconnu')
            subregion = country.get('subregion', 'Inconnu')
            population = country.get('population', 0)
            currency_keys = list(country.get('currencies', {}).keys())
            currency = currency_keys[0] if currency_keys else 'Inconnu'
            timezones = ', '.join(country.get('timezones', [])) if country.get('timezones') else 'Inconnu'

            countries_data.append({
                'country_name': name,
                'region': region,
                'subregion': subregion,
                'population': population,
                'currency': currency,
                'timezones': timezones
            })

        df = pd.DataFrame(countries_data)
        print(f"{len(df)} pays extraits depuis l'API.")
        return df

    except requests.exceptions.RequestException as e:
        print(f"Erreur réseau ou API : {e}")
    except Exception as e:
        print(f"Erreur inattendue : {e}")
    
    return pd.DataFrame()  # Retourne un DataFrame vide si erreur
# test Extraction des données
countries_df = extract_api_data(limit=10)  
print("\nAperçu des données de l'API :")
display(countries_df.head())


10 pays extraits depuis l'API.

Aperçu des données de l'API :


Unnamed: 0,country_name,region,subregion,population,currency,timezones
0,Eritrea,Africa,Eastern Africa,5352000,ERN,UTC+03:00
1,Cameroon,Africa,Middle Africa,26545864,XAF,UTC+01:00
2,Montenegro,Europe,Southeast Europe,621718,EUR,UTC+01:00
3,Fiji,Oceania,Melanesia,896444,FJD,UTC+12:00
4,Tunisia,Africa,Northern Africa,11818618,TND,UTC+01:00


### Transformation des données

Dans notre logique ETL (Extract – Transform – Load), cette étape correspond à la transformation : on prend les données brutes, on les fusionne, on nettoie, on enrichit avec des infos externes (ici, les pays), et on ajoute des indicateurs utiles pour l’analyse.

In [88]:
def transform_data(orders_df, details_df, countries_df):
    """
    Fusionne et prépare les données pour l'analyse :
    - jointure commandes + détails
    - nettoyage et enrichissement
    - indicateurs utiles ajoutés
    """
    try:
        # On commence par fusionner les commandes et les détails
        merged_df = pd.merge(orders_df, details_df, on='Order ID', how='inner')
        
        # Conversion des dates : on transforme le champ texte en datetime exploitable
        merged_df['Order Date'] = pd.to_datetime(merged_df['Order Date'], errors='coerce')
        
        # Calcul du pourcentage de profit
        merged_df['Profit Margin'] = (merged_df['Profit'] / merged_df['Amount'] * 100).round(2)
        
        # Marquage des commandes "importantes" selon le montant
        seuil = merged_df['Amount'].quantile(0.75)
        merged_df['Important Order'] = np.where(merged_df['Amount'] > seuil, 'Oui', 'Non')
        
        # Ajout d'une colonne "Année-Mois" pour faciliter les analyses temporelles
        merged_df['Year Month'] = merged_df['Order Date'].dt.to_period('M')
        
        # On enrichit avec les infos pays via une jointure sur le champ "State"
        # (Remarque : dans la vraie vie, on ferait une jointure plus précise avec les bons codes pays)
        merged_df = pd.merge(
            merged_df,
            countries_df[['country_name', 'region', 'subregion']],
            left_on='State',
            right_on='country_name',
            how='left'
        ).drop(columns=['country_name'])  # Pas besoin de garder ce champ ensuite
        
        # On remplit les régions manquantes par défaut
        merged_df['region'] = merged_df['region'].fillna('Inconnu')
        merged_df['subregion'] = merged_df['subregion'].fillna('Inconnu')
        
        print("Données transformées avec succès")
        return merged_df

    except Exception as e:
        print(f"Problème pendant la transformation : {e}")
        return None

# test Transformation des données
transformed_df = transform_data(orders_df, details_df, countries_df)
print("\nAperçu des données finales :")
display(transformed_df.head())


Données transformées avec succès

Aperçu des données finales :


Unnamed: 0,Order ID,Order Date,CustomerName,State,City,Amount,Profit,Quantity,Category,Sub-Category,PaymentMode,Profit Margin,Important Order,Year Month,region,subregion
0,B-26055,2018-10-03,Harivansh,Uttar Pradesh,Mathura,5729,64,14,Furniture,Chairs,EMI,1.12,Oui,2018-10,Inconnu,Inconnu
1,B-26055,2018-10-03,Harivansh,Uttar Pradesh,Mathura,671,114,9,Electronics,Phones,Credit Card,16.99,Oui,2018-10,Inconnu,Inconnu
2,B-26055,2018-10-03,Harivansh,Uttar Pradesh,Mathura,443,11,1,Clothing,Saree,COD,2.48,Oui,2018-10,Inconnu,Inconnu
3,B-26055,2018-10-03,Harivansh,Uttar Pradesh,Mathura,57,7,2,Clothing,Shirt,UPI,12.28,Non,2018-10,Inconnu,Inconnu
4,B-26055,2018-10-03,Harivansh,Uttar Pradesh,Mathura,227,48,5,Clothing,Stole,COD,21.15,Non,2018-10,Inconnu,Inconnu


### Chargement des données dans MySQL

Cette fonction prend un DataFrame, un nom de table, et un moteur SQLAlchemy pour envoyer les données dans une base MySQL.

In [91]:
from sqlalchemy import text
def load_data_to_mysql(df, table_name, engine):
    """Charge les données dans MySQL en gérant automatiquement la création des tables"""
    try:
        with engine.connect() as conn:
            # Vérification si la table existe déjà
            table_exists = conn.execute(
                text(f"SHOW TABLES LIKE '{table_name}'")
            ).fetchone()
            
            if not table_exists:
                # Création de la table basée sur le DataFrame
                print(f"Création de la table '{table_name}'...")
                df.head(0).to_sql(
                    name=table_name,
                    con=engine,
                    index=False,
                    if_exists='fail',  # Échoue si la table existe déjà
                    chunksize=1000
                )
                print(f"Table '{table_name}' créée avec succès")
            else:
                print(f"Table '{table_name}' existe déjà, ajout des données...")
            
            # Chargement des données
            df.to_sql(
                name=table_name,
                con=engine,
                index=False,
                if_exists='append',  # Ajoute aux données existantes
                chunksize=1000,
                method='multi'
            )
            
            print(f"Données chargées avec succès dans '{table_name}'")
            
            # Vérification
            result = conn.execute(text(f"SELECT COUNT(*) FROM {table_name}"))
            count_before = result.fetchone()[0] - len(df)
            count_after = count_before + len(df)
            print(f"Nombre d'enregistrements: {count_before} → {count_after} (+{len(df)})")
            
    except Exception as e:
        print(f"Erreur lors du chargement: {e}")
        raise

##on ajoute un schéma précis selon le nom de la table. Cela donne plus de contrôle sur la structure SQL
def load_data_to_mysql_typed(df, table_name, engine):
    """Charge les données avec un schéma spécifique, sans doublons"""
    try:
        with engine.connect() as conn:
            # Vérification si la table existe
            table_exists = conn.execute(
                text(f"SHOW TABLES LIKE '{table_name}'")
            ).fetchone()

            if not table_exists:
                print(f"Création de la table typée '{table_name}'...")

                if table_name == 'orders_details':
                    create_sql = """
                    CREATE TABLE orders_details (
                        `Order ID` VARCHAR(50) PRIMARY KEY,
                        `Order Date` DATETIME,
                        `CustomerName` VARCHAR(100),
                        `State` VARCHAR(50),
                        `City` VARCHAR(50),
                        `Amount` DECIMAL(10, 2),
                        `Profit` DECIMAL(10, 2),
                        `Quantity` INT,
                        `Category` VARCHAR(50),
                        `Sub-Category` VARCHAR(50),
                        `PaymentMode` VARCHAR(50),
                        `Profit Margin` DECIMAL(5, 2),
                        `Important Order` VARCHAR(3),
                        `Year Month` VARCHAR(7),
                        `region` VARCHAR(50),
                        `subregion` VARCHAR(50)
                    ) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci
                    """
                elif table_name == 'countries':
                    create_sql = """
                    CREATE TABLE countries (
                        id INT AUTO_INCREMENT PRIMARY KEY,
                        country_name VARCHAR(100),
                        region VARCHAR(50),
                        subregion VARCHAR(50),
                        population BIGINT,
                        currency VARCHAR(10),
                        timezones VARCHAR(50)
                    ) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci
                    """
                else:
                    create_sql = None

                if create_sql:
                    conn.execute(text(create_sql))
                    print(f"Table '{table_name}' créée avec le schéma spécifique")
                else:
                    df.head(0).to_sql(
                        name=table_name,
                        con=engine,
                        index=False,
                        if_exists='fail'
                    )
                    print(f"Table '{table_name}' créée avec schéma automatique")

            # Étape clé : suppression des doublons dans df
            if 'Order ID' in df.columns:
                # Lecture des identifiants existants
                existing_ids = pd.read_sql(f"SELECT `Order ID` FROM {table_name}", con=engine)
                before_filter = len(df)
                df = df[~df['Order ID'].isin(existing_ids['Order ID'])]
                after_filter = len(df)
                print(f"{before_filter - after_filter} doublons détectés et ignorés.")
                
                if df.empty:
                    print("Aucune nouvelle donnée à insérer.")
                    return

            # Insertion finale
            df.to_sql(
                name=table_name,
                con=engine,
                index=False,
                if_exists='append',
                chunksize=1000,
                method='multi'
            )
            result = conn.execute(text(f"SELECT COUNT(*) FROM {table_name}"))
            count = result.fetchone()[0]
            print(f"Total d'enregistrements dans '{table_name}': {count}")

    except Exception as e:
        print(f"Erreur lors du chargement: {e}")
        raise


### Automatisation complète du pipeline

Le petit chef d’orchestre suivant va lancer tout le processus ETL, étape par étape :

**1.Démarrage**
Il affiche que le pipeline commence, et il lance un chronomètre pour mesurer combien de temps tout le processus va prendre.

**2.Extraction**
Il va chercher les données brutes :
depuis le fichier orders.csv
depuis le fichier details.csv
et via une API pour récupérer les données des pays

**3.Transformation**
Il nettoie et transforme les données extraites : il fusionne les tableaux, calcule les marges, ajoute des indicateurs utiles, et associe les données aux pays.

**4.Chargement**
Une fois les données prêtes, il les insère dans la base MySQL :
les données transformées vont dans la table orders_details
les pays vont dans la table countries

**5.Rapport final**
Il affiche un message de succès avec le temps total mis pour exécuter le pipeline.

Ce script te permet donc de tout faire d’un coup : extraire, transformer, charger sans devoir lancer chaque étape manuellement.

In [97]:
def run_etl_pipeline():
    """Exécute le pipeline ETL complet"""
    print("Démarrage du pipeline ETL...")
    start_time = datetime.now()
    
    # Extraction
    print("\nÉtape 1: Extraction des données...")
    orders = extract_csv_data('data/orders.csv')
    details = extract_csv_data('data/details.csv')
    countries = extract_api_data()
    
    # Transformation
    print("\nÉtape 2: Transformation des données...")
    transformed_data = transform_data(orders, details, countries)
    
    # Chargement
    print("\nÉtape 3: Chargement des données...")
    if transformed_data is not None:
        load_data_to_mysql(transformed_data, 'orders_details', engine)
    if countries is not None:
        load_data_to_mysql(countries, 'countries', engine)
    
    # Rapport final
    end_time = datetime.now()
    duration = end_time - start_time
    print(f"\nPipeline ETL terminé avec succès en {duration.total_seconds():.2f} secondes")

# Exécution du pipeline complet
run_etl_pipeline()

Démarrage du pipeline ETL...

Étape 1: Extraction des données...
Données extraites depuis data/orders.csv
Données extraites depuis data/details.csv
250 pays extraits depuis l'API.

Étape 2: Transformation des données...
Données transformées avec succès

Étape 3: Chargement des données...
Création de la table 'orders_details'...
Table 'orders_details' créée avec succès
Données chargées avec succès dans 'orders_details'
Nombre d'enregistrements: 0 → 1500 (+1500)
Création de la table 'countries'...
Table 'countries' créée avec succès
Données chargées avec succès dans 'countries'
Nombre d'enregistrements: 0 → 250 (+250)

Pipeline ETL terminé avec succès en 1.44 secondes


### Requêtes SQL de validation

La fonction advanced_validation() exécute des requêtes SQL pour vérifier les données dans les tables orders_details et countries. Elle récupère le nombre total de commandes, le chiffre d'affaires total, le profit total, les 5 catégories les plus rentables, et le nombre de pays enregistrés. Chaque requête est exécutée avec gestion d'erreurs, et les résultats sont affichés proprement à la fin.

In [100]:
def advanced_validation():
    """Validation avancée avec gestion des erreurs par requête"""
    validation_queries = {
        'commandes_total': "SELECT COUNT(*) FROM orders_details",
        'ca_total': "SELECT ROUND(SUM(Amount), 2) FROM orders_details",
        'profit_total': "SELECT ROUND(SUM(Profit), 2) FROM orders_details",
        'top_categories': """
            SELECT Category, ROUND(SUM(Profit), 2) as profit 
            FROM orders_details 
            GROUP BY Category 
            ORDER BY profit DESC 
            LIMIT 5
        """,
        'pays_total': "SELECT COUNT(*) FROM countries"
    }
    
    results = {}
    
    try:
        with engine.connect() as conn:
            print(" VALIDATION AVANCÉE")
            
            for name, query in validation_queries.items():
                try:
                    result = conn.execute(text(query))
                    
                    if name == 'top_categories':
                        results[name] = result.fetchall()
                    else:
                        results[name] = result.fetchone()[0]
                        
                    print(f" {name.replace('_', ' ').title()} - Succès")
                    
                except Exception as e:
                    print(f" {name.replace('_', ' ').title()} - Erreur: {str(e)[:50]}...")
                    results[name] = None
    
    except Exception as e:
        print(f"\nErreur majeure: {e}")
        return None
    # Affichage structuré des résultats
    print("\nRÉSULTATS DE VALIDATION:")
    print(f"- Commandes totales: {results.get('commandes_total', 'N/A')}")
    print(f"- CA total: {results.get('ca_total', 'N/A')} €")
    print(f"- Profit total: {results.get('profit_total', 'N/A')} €")
    print(f"- Pays référencés: {results.get('pays_total', 'N/A')}")
    if results.get('top_categories'):
        print("\nTop Catégories:")
        for cat, profit in results['top_categories']:
            print(f"- {cat}: {profit} €")

# Exécution
advanced_validation()

 VALIDATION AVANCÉE
 Commandes Total - Succès
 Ca Total - Succès
 Profit Total - Succès
 Top Categories - Succès
 Pays Total - Succès

RÉSULTATS DE VALIDATION:
- Commandes totales: 1500
- CA total: 437771.00 €
- Profit total: 36963.00 €
- Pays référencés: 250

Top Catégories:
- Clothing: 13325.00 €
- Electronics: 13162.00 €
- Furniture: 10476.00 €


### Résumé

Ce notebook met en œuvre un pipeline ETL complet qui extrait des données depuis des fichiers CSV et une API, les transforme, puis les charge dans une base de données MySQL. Le chargement est géré intelligemment pour éviter les erreurs de duplication. La création des tables peut se faire automatiquement à partir de la structure du DataFrame ou en définissant un schéma SQL précis. Un module de validation avancée permet ensuite de vérifier les agrégats clés (nombre total de commandes, chiffre d’affaires, bénéfices, etc.) ainsi que des insights comme les catégories les plus rentables.

Ce pipeline est réutilisable pour tout autre jeu de données, à condition d’adapter les noms de colonnes, les règles de transformation, et les requêtes SQL de validation.