In [43]:
import logging
import requests
import pandas as pd
from datetime import datetime, timedelta
import concurrent.futures
import mysql.connector
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, substring, to_date, concat_ws, lit

# CONFIGURATION DU LOGGING 

In [44]:
logging.basicConfig(
    filename='etl_execution.log',
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

#  ÉTAPE 1 : EXTRACTION 

In [None]:
# Définir les dates AVANT toute utilisation
date_actuelle = datetime.today()
date_delay = date_actuelle - timedelta(days=8)
end_date = date_delay.strftime("%Y%m%d")

start_date_delay = date_delay - timedelta(days=75)
start_date = start_date_delay.strftime("%Y%m%d")

# Dictionnaire des coordonnées
pays_coordinee = {
    "senegal": {
        "dakar": (14.6928, -17.4467),
        "thies": (14.7894, -16.926),
        "saint-louis": (16.0333, -16.5),
        "kaolack": (14.151, -16.0726),
        "ziguinchor": (12.5833, -16.2667),
        "tambacounda": (13.77, -13.6672),
        "kedougou": (12.5535, -12.1743),
    },
    "mali": {
        "bamako": (12.6392, -8.0029),
        "segou": (13.4317, -6.2157),
        "timbuktu": (16.7666, -3.0026),
        "mopti": (14.4843, -4.1827),
    },
    "cote_d_ivoire": {
        "abidjan": (5.3364, -4.0261),
        "bouake": (7.6833, -5.0333),
        "yamoussoukro": (6.8161, -5.2742),
        "san_pedro": (4.7485, -6.6363),
    },
    "guinee": {
        "conakry": (9.6412, -13.5784),
        "kankan": (10.3842, -9.3057),
        "n_zerekore": (7.7594, -8.8174),
        "labé": (11.3167, -12.2833),
    },
    "nigeria": {
        "lagos": (6.5244, 3.3792),
        "abuja": (9.0579, 7.4951),
        "kano": (12.0022, 8.5919),
    },
    "ghana": {
        "accra": (5.6037, -0.187),
        "kumasi": (6.6666, -1.6163),
        "tamale": (9.4075, -0.8531),
        "takoradi": (4.8975, -1.7603),
    },
    "burkina faso": {
        "ouagadougou": (12.3714, -1.5197),
        "bobo dioulasso": (11.1786, -4.2979),
        "koudougou": (12.2542, -2.3625),
    },
}


# Fonction principale
def get_data(pays_list, start_date, end_date):
    dfs = []
    # Générer l’URL NASA
    def __get_url__(lat:str, long:str, start_date:str, end_date:str)->str:
        return f"https://power.larc.nasa.gov/api/temporal/hourly/point?parameters=T2M,RH2M,T2MWET,PRECTOTCORR,WS10M,WD10M,T2MDEW,V10M,PS,QV2M,U10M&community=AG&longitude={long}&latitude={lat}&start={start_date}&end={end_date}&format=json"

    # Convertir les données JSON en DataFrame
    def __convert_to_df_optimized__(parameters, city, county):
        dates = list(parameters['T2M'].keys())
        n_dates = len(dates)
        data = {
            'date': dates,
            'ville': [city] * n_dates,
            'pays': [county] * n_dates
        }
        for param in parameters:
            data[param] = [parameters[param].get(date, None) for date in dates]
        return pd.DataFrame(data)
    
    def fetch_city_data(pays, ville, coordonate):
        lat, long = coordonate
        url = __get_url__(lat=lat, long=long, start_date=start_date, end_date=end_date)
        try:
            response = requests.get(url)
            data = response.json()["properties"]["parameter"]
            return __convert_to_df_optimized__(parameters=data, city=ville, county=pays)
        except Exception as e:
            print(f"Erreur pour {ville}, {pays}: {e}")
            return None

    tasks = []
    for pays_loop in pays_list:
        if pays_loop.lower() not in [p.lower() for p in pays_coordinee.keys()]:
            print(f"Ce pays n'est pas pris en compte: {pays_loop}")
            continue
        villes = pays_coordinee[pays_loop.lower()]
        for ville, coordonate in villes.items():
            tasks.append((pays_loop, ville, coordonate))

    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
        futures = [executor.submit(fetch_city_data, pays, ville, coord) for pays, ville, coord in tasks]
        for future in concurrent.futures.as_completed(futures):
            result = future.result()
            if result is not None:
                dfs.append(result)

    return pd.concat(dfs, ignore_index=True) if dfs else pd.DataFrame()

# Exécution
df = get_data(["Senegal","mali","cote_d_ivoire","guinee","nigeria","ghana","burkina faso"], start_date, end_date)
df.to_csv("Nasa_Power_data_PAYS.csv", index=False)


In [26]:
df = pd.read_csv("Nasa_Power_data_PAYS.csv")
df.head()

Unnamed: 0,date,ville,pays,T2M,RH2M,T2MWET,PRECTOTCORR,WS10M,WD10M,T2MDEW,V10M,PS,QV2M,U10M
0,2025072600,bamako,mali,24.21,85.41,22.92,0.14,2.39,200.9,21.62,2.23,96.76,16.77,0.85
1,2025072601,bamako,mali,23.76,87.03,22.62,0.15,2.23,201.8,21.49,2.07,96.71,16.64,0.83
2,2025072602,bamako,mali,23.47,88.29,22.46,0.19,2.23,202.4,21.45,2.06,96.69,16.59,0.85
3,2025072603,bamako,mali,23.24,89.63,22.35,0.24,2.29,205.6,21.46,2.07,96.7,16.61,0.99
4,2025072604,bamako,mali,23.08,90.6,22.29,0.22,2.5,212.4,21.49,2.11,96.74,16.62,1.34


#  ÉTAPE 2 : TRANSFORMATION 

In [27]:
from pyspark.sql.functions import col, to_date, substring, concat_ws, lit

# Initialisation Spark
spark = SparkSession.builder \
    .appName("Météo") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "4g") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .getOrCreate()

# Lecture du fichier CSV avec pandas
df_read = pd.read_csv("Nasa_Power_data_PAYS.csv")

# Conversion en DataFrame Spark
df = spark.createDataFrame(df_read)

def transformer_header(df):
    # 1. Extraire la date et l'heure depuis la colonne 'date'
    df = df.withColumn("date_str", col("date").cast("string"))
    df = df.withColumn("date_formatted", to_date(substring(col("date_str"), 1, 8), "yyyyMMdd")) \
           .withColumn("heure_formatted", concat_ws(":", substring(col("date_str"), 9, 2), lit("00"), lit("00")))

    # 2. Sélectionner les colonnes utiles
    colonnes_a_exclure = ["date_str", "heure_str", "date", "heure", "date_formatted", "heure_formatted"]
    #all_columns = [c for c in df.columns if c not in colonnes_a_exclure]
    all_columns = [c for c in df.columns if c not in colonnes_a_exclure]
    df = df.select(*all_columns, col("date_formatted").alias("date"), col("heure_formatted").alias("heure"))

    # 3. Nettoyage des données
    df = df.dropna()
    df = df.dropDuplicates()
    df = df.filter((col("T2MWET") >= -30) & (col("RH2M") >= -30))

    # 4. Renommer les colonnes
    header_map = {
        'ville': 'ville',
        'pays': 'pays',
        'T2M': 'temperature_air',
        'PS': 'pression',
        'WS10M': 'intensite_vent',
        'QV2M': 'humidite_specifique',
        'T2MDEW': 'temperature_point_rosee',
        'U10M': 'composante_est_ouest_vent',
        'V10M': 'vitesse_vent',
        'RH2M': 'humidite_relative',
        'WD10M': 'direction_vent',
        'T2MWET': 'temperature_humide',
        'PRECTOTCORR': 'precipitations_corrigees',
        'date': 'date',
        'heure': 'heure'
    }

    for old_col, new_col in header_map.items():
        df = df.withColumnRenamed(old_col, new_col)

    return df

#df_cleaned = df.toPandas()
# Transformation
df_cleaned = transformer_header(df)

# Export vers CSV
df_cleaned.toPandas().to_csv("Nasa_Power_data_cleaned_PAYS.csv", index=False)

# Arrêt de Spark
spark.stop()



In [45]:
dataset_path = "Nasa_Power_data_cleaned_PAYS.csv"
df_cleaned = pd.read_csv(dataset_path)
df_cleaned.head(5)

Unnamed: 0,ville,pays,temperature_air,humidite_relative,temperature_humide,precipitations_corrigees,intensite_vent,direction_vent,temperature_point_rosee,vitesse_vent,pression,humidite_specifique,composante_est_ouest_vent,date,heure
0,saint-louis,Senegal,26.74,88.8,25.75,0.03,3.15,278.0,24.76,-0.44,101.07,19.43,3.12,2025-08-05,00:00:00
1,saint-louis,Senegal,29.36,75.95,27.03,5.62,4.24,275.4,24.7,-0.4,101.06,19.36,4.22,2025-08-07,09:00:00
2,saint-louis,Senegal,26.79,90.79,25.98,3.75,3.48,218.9,25.18,2.71,101.27,19.89,2.19,2025-08-10,23:00:00
3,saint-louis,Senegal,25.91,93.62,25.36,2.31,3.98,227.6,24.82,2.68,101.16,19.49,2.94,2025-08-11,06:00:00
4,saint-louis,Senegal,31.55,63.82,27.79,1.82,4.02,248.9,24.03,1.45,100.72,18.49,3.75,2025-09-06,12:00:00


In [46]:
print(df_cleaned.columns.tolist())

['ville', 'pays', 'temperature_air', 'humidite_relative', 'temperature_humide', 'precipitations_corrigees', 'intensite_vent', 'direction_vent', 'temperature_point_rosee', 'vitesse_vent', 'pression', 'humidite_specifique', 'composante_est_ouest_vent', 'date', 'heure']


# ÉTAPE 3 : CHARGEMENT 

In [40]:
import pandas as pd
import mysql.connector

# Paramètres de connexion
DB_NAME = "meteo_db_PAYS"
TABLE_NAME = "meteo_data_PAYS"
MYSQL_USER = "root"
MYSQL_PASSWORD = "MYCCA"
MYSQL_HOST = "localhost"

def insert_connector_data(dataframe, batch_size=200):  
    # Vérifier que le DataFrame est bien un Pandas DataFrame
    if not isinstance(dataframe, pd.DataFrame):
        try:
            dataframe = dataframe.toPandas()
        except Exception as e:
            print(f"Erreur de conversion en DataFrame Pandas : {e}")
            return

    # Étape 1 : Créer la base si elle n'existe pas
    conn = mysql.connector.connect(
        host=MYSQL_HOST,
        user=MYSQL_USER,
        password=MYSQL_PASSWORD
    )
    cursor = conn.cursor()
    cursor.execute(f"CREATE DATABASE IF NOT EXISTS {DB_NAME}")
    cursor.close()
    conn.close()
    
    # Étape 2 : Connexion à la base
    conn = mysql.connector.connect(
        host=MYSQL_HOST,
        user=MYSQL_USER,
        password=MYSQL_PASSWORD,
        database=DB_NAME
    )
    cursor = conn.cursor()
    
    # Créer la table si elle n'existe pas
    cursor.execute(f"""
        CREATE TABLE IF NOT EXISTS {TABLE_NAME} (
            id INT AUTO_INCREMENT PRIMARY KEY,
            date DATE,
            heure VARCHAR(255),
            ville VARCHAR(255),
            pays VARCHAR(255),
            temperature_air FLOAT,
            pression FLOAT,
            intensite_vent FLOAT,
            humidite_specifique FLOAT,
            temperature_point_rosee FLOAT,
            composante_est_ouest_vent FLOAT,
            vitesse_vent FLOAT,
            humidite_relative FLOAT,
            direction_vent FLOAT,
            temperature_humide FLOAT,
            precipitations_corrigees FLOAT
        );
    """)
    conn.commit()
    
    # Colonnes attendues
    expected_columns = [
        'date', 'heure', 'ville', 'pays', 'temperature_air', 'pression', 'intensite_vent',
        'humidite_specifique', 'temperature_point_rosee', 'composante_est_ouest_vent',
        'vitesse_vent', 'humidite_relative', 'direction_vent',
        'temperature_humide', 'precipitations_corrigees'
    ]
    
    # Vérifier que toutes les colonnes sont présentes
    missing = [col for col in expected_columns if col not in dataframe.columns]
    if missing:
        print("Colonnes manquantes :", missing)
        return
    # Vérifier le nombre de colonnes dans le DataFrame
    column_count = len(dataframe.columns)
    print(f"DataFrame has {column_count} columns")
    print(f"DataFrame columns: {dataframe.columns.tolist()}")
    
    # Réordonner les colonnes
    dataframe = dataframe[expected_columns]
    
    # Insertion par batch
    batch = []
    for _, row in dataframe.iterrows():
        row_tuple = tuple(row.values)
        batch.append(row_tuple)
        
        if len(batch) >= batch_size:
            try:
                cursor.executemany(
                    f"""INSERT INTO {TABLE_NAME} ({', '.join(expected_columns)}) 
                        VALUES ({', '.join(['%s'] * len(expected_columns))})""",
                    batch
                )
                conn.commit()
                print(f"Inserted batch of {len(batch)} rows")
                batch = []
            except Exception as e:
                print(f"Error inserting batch: {e}")
                conn.rollback()

    # Dernier batch
    if batch:
        try:
            cursor.executemany(
                f"""INSERT INTO {TABLE_NAME} ({', '.join(expected_columns)}) 
                    VALUES ({', '.join(['%s'] * len(expected_columns))})""",
                batch
            )
            conn.commit()
            print(f"Inserted final batch of {len(batch)} rows")
        except Exception as e:
            print(f"Error inserting final batch: {e}")
            conn.rollback()

    print("Data insertion completed")
    cursor.close()
    conn.close()
    
# Exemple d'appel à la fonction
insert_connector_data(df_cleaned, batch_size=200)

DataFrame has 15 columns
DataFrame columns: ['ville', 'pays', 'temperature_air', 'humidite_relative', 'temperature_humide', 'precipitations_corrigees', 'intensite_vent', 'direction_vent', 'temperature_point_rosee', 'vitesse_vent', 'pression', 'humidite_specifique', 'composante_est_ouest_vent', 'date', 'heure']
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batc

# Ce script Python permet t'exécuter toutes les étapes du pipeline E.T.L (Extraction,Transformation, Chargement) pour des données météorologiques provenant de la NASA

In [41]:
import pandas as pd
import logging
from datetime import datetime, timedelta
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, substring, concat_ws, lit

# Configuration du logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

# === MAIN ===
if __name__ == "__main__":
    try:
        # Définir les dates
        date_actuelle = datetime.today()
        date_delay = date_actuelle - timedelta(days=8)
        end_date = date_delay.strftime("%Y%m%d")
        start_date = (date_delay - timedelta(days=75)).strftime("%Y%m%d")

        pays_list = ["Senegal", "mali", "cote_d_ivoire", "guinee", "nigeria", "ghana", "burkina faso"]

        logging.info("Début de l'extraction des données")
        df_extrait = get_data(pays_list, start_date, end_date) 
        df_extrait.to_csv("Nasa_Power_data_PAYS.csv", index=False)
        logging.info("Fin de l'extraction des données")

        logging.info("Initialisation de Spark")
        spark = SparkSession.builder \
            .appName("Météo") \
            .config("spark.executor.memory", "4g") \
            .config("spark.driver.memory", "4g") \
            .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
            .getOrCreate()

        logging.info("Début de la transformation des données")
        df_spark = spark.createDataFrame(df_extrait)
        df_cleaned = transformer_header(df_spark)
        df_cleaned.toPandas().to_csv("Nasa_Power_data_cleaned_PAYS.csv", index=False)
        logging.info("Fin de la transformation des données")

        logging.info("Début du chargement des données")
        insert_connector_data(df_cleaned, batch_size=200)  # Assurez-vous que cette fonction existe
        logging.info("Fin du chargement des données")

        logging.info("Pipeline ETL terminé avec succès")
        spark.stop()

    except Exception as e:
        logging.error(f"Erreur globale du pipeline ETL : {e}")


DataFrame has 15 columns
DataFrame columns: ['ville', 'pays', 'temperature_air', 'humidite_relative', 'temperature_humide', 'precipitations_corrigees', 'intensite_vent', 'direction_vent', 'temperature_point_rosee', 'vitesse_vent', 'pression', 'humidite_specifique', 'composante_est_ouest_vent', 'date', 'heure']
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batc