In [24]:
import logging
import requests
import pandas as pd
from datetime import datetime, timedelta
import concurrent.futures
import mysql.connector
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, substring, to_date, concat_ws, lit

In [25]:
# === CONFIGURATION DU LOGGING ===
logging.basicConfig(
    filename='etl_execution.log',
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

In [43]:
# === ÉTAPE 1 : EXTRACTION === 
#logging.info("Début de l'extraction des données")
# Définir les dates AVANT toute utilisation
date_actuelle = datetime.today()
date_delay = date_actuelle - timedelta(days=8)
end_date = date_delay.strftime("%Y%m%d")

start_date_delay = date_delay - timedelta(days=75)
start_date = start_date_delay.strftime("%Y%m%d")

# Dictionnaire des coordonnées
pays_coordinee = {
    "senegal": {
        "dakar": (14.6928, -17.4467),
        "thies": (14.7894, -16.926),
        "saint-louis": (16.0333, -16.5),
        "kaolack": (14.151, -16.0726),
        "ziguinchor": (12.5833, -16.2667),
        "tambacounda": (13.77, -13.6672),
        "kedougou": (12.5535, -12.1743),
    },
    "mali": {
        "bamako": (12.6392, -8.0029),
        "segou": (13.4317, -6.2157),
        "timbuktu": (16.7666, -3.0026),
        "mopti": (14.4843, -4.1827),
    },
    "cote_d_ivoire": {
        "abidjan": (5.3364, -4.0261),
        "bouake": (7.6833, -5.0333),
        "yamoussoukro": (6.8161, -5.2742),
        "san_pedro": (4.7485, -6.6363),
    },
    "guinee": {
        "conakry": (9.6412, -13.5784),
        "kankan": (10.3842, -9.3057),
        "n_zerekore": (7.7594, -8.8174),
        "labé": (11.3167, -12.2833),
    },
    "nigeria": {
        "lagos": (6.5244, 3.3792),
        "abuja": (9.0579, 7.4951),
        "kano": (12.0022, 8.5919),
    },
    "ghana": {
        "accra": (5.6037, -0.187),
        "kumasi": (6.6666, -1.6163),
        "tamale": (9.4075, -0.8531),
        "takoradi": (4.8975, -1.7603),
    },
    "burkina faso": {
        "ouagadougou": (12.3714, -1.5197),
        "bobo dioulasso": (11.1786, -4.2979),
        "koudougou": (12.2542, -2.3625),
    },
}


# Fonction principale
def get_data(pays_list, start_date, end_date):
    dfs = []
    # Générer l’URL NASA
    def __get_url__(lat:str, long:str, start_date:str, end_date:str)->str:
        return f"https://power.larc.nasa.gov/api/temporal/hourly/point?parameters=T2M,RH2M,T2MWET,PRECTOTCORR,WS10M,WD10M,T2MDEW,V10M,PS,QV2M,U10M&community=AG&longitude={long}&latitude={lat}&start={start_date}&end={end_date}&format=json"

    # Convertir les données JSON en DataFrame
    def __convert_to_df_optimized__(parameters, city, county):
        dates = list(parameters['T2M'].keys())
        n_dates = len(dates)
        data = {
            'date': dates,
            'ville': [city] * n_dates,
            'pays': [county] * n_dates
        }
        for param in parameters:
            data[param] = [parameters[param].get(date, None) for date in dates]
        return pd.DataFrame(data)
    
    def fetch_city_data(pays, ville, coordonate):
        lat, long = coordonate
        url = __get_url__(lat=lat, long=long, start_date=start_date, end_date=end_date)
        try:
            response = requests.get(url)
            data = response.json()["properties"]["parameter"]
            return __convert_to_df_optimized__(parameters=data, city=ville, county=pays)
        except Exception as e:
            print(f"Erreur pour {ville}, {pays}: {e}")
            return None

    tasks = []
    for pays_loop in pays_list:
        if pays_loop.lower() not in [p.lower() for p in pays_coordinee.keys()]:
            print(f"Ce pays n'est pas pris en compte: {pays_loop}")
            continue
        villes = pays_coordinee[pays_loop.lower()]
        for ville, coordonate in villes.items():
            tasks.append((pays_loop, ville, coordonate))

    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
        futures = [executor.submit(fetch_city_data, pays, ville, coord) for pays, ville, coord in tasks]
        for future in concurrent.futures.as_completed(futures):
            result = future.result()
            if result is not None:
                dfs.append(result)

    return pd.concat(dfs, ignore_index=True) if dfs else pd.DataFrame()

# Exécution
df = get_data(["Senegal","mali","cote_d_ivoire","guinee","nigeria","ghana","burkina faso"], start_date, end_date)
df.to_csv("Nasa_Power_data_77PAYS.csv", index=False)

#logging.info("Fin de l'extraction des données")

In [44]:
df = pd.read_csv("Nasa_Power_data_77PAYS.csv")
df.head()

Unnamed: 0,date,ville,pays,T2M,RH2M,T2MWET,PRECTOTCORR,WS10M,WD10M,T2MDEW,V10M,PS,QV2M,U10M
0,2025071200,kedougou,Senegal,24.15,94.51,23.7,1.43,5.14,260.5,23.24,0.85,98.55,18.17,5.07
1,2025071201,kedougou,Senegal,23.81,94.93,23.39,0.88,5.1,257.4,22.98,1.11,98.49,17.89,4.98
2,2025071202,kedougou,Senegal,23.49,95.08,23.08,0.57,5.0,255.4,22.67,1.26,98.45,17.58,4.84
3,2025071203,kedougou,Senegal,23.2,95.02,22.79,0.78,4.95,253.7,22.38,1.39,98.45,17.26,4.75
4,2025071204,kedougou,Senegal,22.99,94.75,22.55,1.06,5.02,252.5,22.12,1.51,98.46,16.99,4.79


In [28]:
# === ÉTAPE 2 : TRANSFORMATION ===
#logging.info("Début de la transformation des données")
    
spark = SparkSession.builder \
    .appName("Météo") \
    .config("spark.python.worker.reuse", "true") \
    .config("spark.sql.shuffle.partitions", "4") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "4g") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .config("spark.network.timeout", "600s") \
    .getOrCreate()

df_read = pd.read_csv("Nasa_Power_data_77PAYS.csv")


def transformer_header(df):
    def date_hour_colonne(dataframe):
        # Créer le DataFrame Spark depuis Pandas
        df = spark.createDataFrame(pd.DataFrame(dataframe))
        # Sauvegarder la date brute dans une nouvelle colonne temporaire
        df = df.withColumn("date_str", col("date").cast("string"))
        # Extraire la date et l'heure
        df = df.withColumn("date_formatted", to_date(substring(col("date_str"), 1, 8), "yyyyMMdd")) \
           .withColumn("heure_formatted", concat_ws(":", substring(col("date_str"), 9, 2), lit("00"), lit("00"))) \
           #.withColumn("heure_formatted", concat_ws(" ", col("date_formatted"), col("heure_str"))) \
        # Obtenir une liste de colonnes sans les colonnes temporaires et les colonnes à remplacer
        all_columns = [c for c in df.columns if c not in ["date_str", "heure_str", "date", "heure", "date_formatted", "heure_formatted"]]
        # Sélectionner les colonnes originales plus les nouvelles colonnes transformées
        df = df.select(
            *all_columns,
            col("date_formatted").alias("date"),
            col("heure_formatted").alias("heure"),
            )
    
        return df
    def nan_value_manage(df):
        return df.dropna()

    def duplicate_value_manage(df):
        return df.dropDuplicates()

    def remove_invalid_rows(df):
        # Supprimer les lignes où T2MWET < -30 ou RH2M < -30
        df = df.filter((col("T2MWET") >= -30) & (col("RH2M") >= -30))
        return df
    # Dictionnaire de correspondance entre les anciennes colonnes et les nouvelles colonnes
    header_map = {
        'ville': 'ville',
        'pays': 'pays',
        'T2M': 'temperature_air',
        'PS': 'pression',
        'WS10M': 'intensite_vent',
        'QV2M': 'humidite_specifique',
        'T2MDEW': 'temperature_point_rosee',
        'U10M': 'composante_est_ouest_vent',
        'V10M': 'vitesse_vent',
        'RH2M': 'humidite_relative',
        'WD10M': 'direction_vent',
        'T2MWET': 'temperature_humide',
        'PRECTOTCORR': 'precipitations_corrigees',
        'date': 'date',
        'heure': 'heure'
    }
    
    # Renommer les colonnes du DataFrame
    df.rename(columns=header_map, inplace=True)
    
    return df

df_cleaned = df.toPandas()

#Transforme header
df_cleaned = transformer_header(df_cleaned)

df_cleaned.shape
spark.stop()

df_cleaned.to_csv("Nasa_Power_data_cleaned_77PAYS.csv", index=False)
df_cleaned.head()

Unnamed: 0,ville,pays,temperature_humide,direction_vent,intensite_vent,vitesse_vent,composante_est_ouest_vent,humidite_specifique,pression,humidite_relative,temperature_point_rosee,temperature_air,precipitations_corrigees,date,heure
0,kaolack,Senegal,24.65,245.4,3.89,1.62,3.54,18.43,101.07,91.12,23.88,25.42,3.35,2025-07-12,05:00:00
1,kaolack,Senegal,27.55,244.9,7.6,3.23,6.88,17.27,100.91,57.25,22.78,32.31,1.16,2025-07-12,14:00:00
2,kaolack,Senegal,26.13,248.2,6.7,2.49,6.22,17.39,100.97,68.28,22.91,29.35,0.9,2025-07-12,17:00:00
3,kaolack,Senegal,25.7,248.4,5.84,2.15,5.43,17.72,101.03,74.57,23.24,28.16,1.84,2025-07-12,18:00:00
4,kaolack,Senegal,25.32,244.1,4.1,1.79,3.69,19.06,100.99,89.88,24.43,26.2,5.03,2025-07-13,00:00:00


In [29]:
dataset_path = "Nasa_Power_data_cleaned_77PAYS.csv"
df_cleaned = pd.read_csv(dataset_path)
df_cleaned.head(5)
print(df_cleaned.columns.tolist())

['ville', 'pays', 'temperature_humide', 'direction_vent', 'intensite_vent', 'vitesse_vent', 'composante_est_ouest_vent', 'humidite_specifique', 'pression', 'humidite_relative', 'temperature_point_rosee', 'temperature_air', 'precipitations_corrigees', 'date', 'heure']


In [63]:
#logging.info("Début du chargement des données")
# Paramètres de connexion
DB_NAME = "meteo_db_77PAYS"
TABLE_NAME = "meteo_data_77PAYS"
MYSQL_USER = "root"
MYSQL_PASSWORD = "MYCCA"
MYSQL_HOST = "localhost"

def insert_connector_data(dataframe, batch_size=200):  
    #Charger le fichier CSV
    df_cleaned = pd.read_csv("Nasa_Power_data_cleaned_77PAYS.csv")

    # Étape 1 : Connexion sans base pour créer la base si elle n'existe pas
    conn = mysql.connector.connect(
        host=MYSQL_HOST,
        user=MYSQL_USER,
        password=MYSQL_PASSWORD
    )
    cursor = conn.cursor()
    cursor.execute(f"CREATE DATABASE IF NOT EXISTS {DB_NAME}")
    cursor.close()
    conn.close()
    
    # Étape 2 : Connexion à la base nouvellement créée
    conn = mysql.connector.connect(
        host=MYSQL_HOST,
        user=MYSQL_USER,
        password=MYSQL_PASSWORD,
        database=DB_NAME
    )
    cursor = conn.cursor() 
    
    # Connexion à PostgreSQL
    conn = mysql.connector.connect(
    host=MYSQL_HOST,
    user=MYSQL_USER,
    password=MYSQL_PASSWORD,
    database=DB_NAME
    )
    cursor = conn.cursor()
    
    # Créer la table si elle n'existe pas (en respectant le bon ordre)
    cursor.execute(f"""
        CREATE TABLE IF NOT EXISTS {TABLE_NAME} (
            id SERIAL PRIMARY KEY,
            date DATE,
            heure VARCHAR(255),
            ville VARCHAR(255),
            pays VARCHAR(255),
            temperature_air FLOAT,
            pression FLOAT,
            intensite_vent FLOAT,
            humidite_specifique FLOAT,
            temperature_point_rosee FLOAT,
            composante_est_ouest_vent FLOAT,
            vitesse_vent FLOAT,
            humidite_relative FLOAT,
            direction_vent FLOAT,
            temperature_humide FLOAT,
            precipitations_corrigees FLOAT
        );
    """)
    conn.commit()
    
    # Vérifier le nombre de colonnes dans le DataFrame
    column_count = len(dataframe.columns)
    print(f"DataFrame has {column_count} columns")
    print(f"DataFrame columns: {dataframe.columns.tolist()}")
    
    # Liste des colonnes attendues dans le bon ordre
    expected_columns = [
        'date', 'heure', 'ville', 'pays', 'temperature_air', 'pression', 'intensite_vent',
        'humidite_specifique', 'temperature_point_rosee', 'composante_est_ouest_vent',
        'vitesse_vent', 'humidite_relative', 'direction_vent',
        'temperature_humide', 'precipitations_corrigees'
    ]
    
    # Ajuster le DataFrame
    dataframe = dataframe[expected_columns]
        
    # Préparer les données pour l'insertion
    batch = []
    for _, row in dataframe.iterrows():
        row_tuple = tuple(row.values)
        if len(row_tuple) != 15:
            print(f"Warning: Row has {len(row_tuple)} values, expected 15")
            continue
        batch.append(row_tuple)
        
        if len(batch) >= batch_size:
            try:
                query = f"""
                    INSERT INTO {TABLE_NAME} 
                    (
                        date, heure, ville, pays, temperature_air, pression, intensite_vent,
                        humidite_specifique, temperature_point_rosee, composante_est_ouest_vent,
                        vitesse_vent, humidite_relative, direction_vent,
                        temperature_humide, precipitations_corrigees
                    )
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                """
                cursor.executemany(query, batch)
                conn.commit()
                print(f"Inserted batch of {len(batch)} rows")
                batch = []
            except Exception as e:
                print(f"Error inserting batch: {e}")
                if batch:
                    print(f"First row in batch: {batch[0]}")
                conn.rollback()
    
    # Insérer le dernier batch
    if batch:
        try:
            query = f"""
                INSERT INTO {TABLE_NAME} 
                (
                    date, heure, ville, pays, temperature_air, pression, intensite_vent,
                    humidite_specifique, temperature_point_rosee, composante_est_ouest_vent,
                    vitesse_vent, humidite_relative, direction_vent,
                    temperature_humide, precipitations_corrigees
                )
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            """
            cursor.executemany(query, batch)
            conn.commit()
            print(f"Inserted final batch of {len(batch)} rows")
        except Exception as e:
            print(f"Error inserting final batch: {e}")
            if batch:
                print(f"First row in batch: {batch[0]}")
            conn.rollback()
    
    print("Data insertion completed")
    cursor.close()
    conn.close()


# Exemple d'appel à la fonction
insert_connector_data(df_cleaned, batch_size=200)

DataFrame has 15 columns
DataFrame columns: ['ville', 'pays', 'temperature_humide', 'direction_vent', 'intensite_vent', 'vitesse_vent', 'composante_est_ouest_vent', 'humidite_specifique', 'pression', 'humidite_relative', 'temperature_point_rosee', 'temperature_air', 'precipitations_corrigees', 'date', 'heure']
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batc

In [66]:
# === MAIN ===
if __name__ == "__main__":
    try:
        #date_actuelle = datetime.today()
        ##date_delay = date_actuelle - timedelta(days=8)
        #end_date = date_delay.strftime("%Y%m%d")
        #start_date = (date_delay - timedelta(days=75)).strftime("%Y%m%d")

        pays_list = ["Senegal","mali","cote_d_ivoire","guinee","nigeria","ghana","burkina faso"]
        logging.info("Début de l'extraction des données")
        df_extrait=get_data(pays_list, start_date, end_date)
        df_extrait.to_csv("Nasa_Power_data_77PAYS.csv", index=False)
        logging.info("Fin de l'extraction des données")
        logging.info("Début de la transformation des données")
        df_cleaned = transformer_header(df_cleaned)
        df_cleaned = transformer_header(df_cleaned)
        df_cleaned.to_csv("Nasa_Power_data_cleaned_77PAYS.csv", index=False)
        logging.info("Fin de la transformation des données")
        logging.info("Début du chargement des données")
        insert_connector_data(df_cleaned, batch_size=200)
        logging.info("Fin du chargement des données")
        logging.info("Pipeline ETL terminé avec succès")
    except Exception as e:
        logging.error(f"Erreur globale du pipeline ETL : {e}")

DataFrame has 15 columns
DataFrame columns: ['ville', 'pays', 'temperature_humide', 'direction_vent', 'intensite_vent', 'vitesse_vent', 'composante_est_ouest_vent', 'humidite_specifique', 'pression', 'humidite_relative', 'temperature_point_rosee', 'temperature_air', 'precipitations_corrigees', 'date', 'heure']
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batch of 200 rows
Inserted batc