## FUNCTIONS

In [None]:
import os

def create_directory_if_not_exists(directory_path):
    if not os.path.exists(directory_path):
        os.makedirs(directory_path)
        print(f"Répertoire '{directory_path}' créé avec succès.")
    else:
        print(f"Le répertoire '{directory_path}' existe déjà.")


In [None]:
import requests
from urllib.parse import urlparse
import os


def download_file(url, destination_path):

  # Extraction du nom du fichier zippé
  zip_filename = os.path.basename(urlparse(url).path)
  csv_filename = zip_filename[:-3]

  # Chemin où vous souhaitez enregistrer le fichier téléchargé
  destination_file_path = f"{destination_path}{zip_filename}"

  # Téléchargement du fichier
  response = requests.get(url)

  # Vérification du code de réponse HTTP
  if response.status_code == 200:
      # Écriture du contenu téléchargé dans le fichier local
      with open(destination_file_path, 'wb') as f:
          f.write(response.content)
      print(f"Le fichier {zip_filename} a été téléchargé avec succès.")
  else:
      print(f"Échec du téléchargement du fichier {zip_filename}. Code de réponse HTTP :", response.status_code)

  return zip_filename, csv_filename


In [None]:
import gzip
import shutil


def unzip_file(gzip_file, output_file):
    with gzip.open(gzip_file, 'rb') as f_in:
        with open(output_file, 'wb') as f_out:
            shutil.copyfileobj(f_in, f_out)
    print(f"Le fichier {gzip_file} a été dézippé avec succès.")


In [None]:
from pyspark.sql.functions import col, to_date, lit


def load_file_as_df(filepath_or_folderpath, verbose=False):

  # Charger le(s) fichier(s) CSV
  df = spark.read.csv(filepath_or_folderpath, header=True, sep=";", inferSchema=True)

  # Ajouter le nom du fichier comme première colonne du DataFrame
  df = df.withColumn("Source", lit(filepath_or_folderpath))

  # Compter les lignes du DataFrame
  print(f"Le DataFrame contient {df.count()} lignes.")

  # Convertir le champ AAAAMMJJ en datetime
  df = df.withColumn("AAAAMMJJ", to_date(col("AAAAMMJJ").cast("string"), "yyyyMMdd"))

  # Afficher le schéma et quelques lignes du dataframe
  if verbose:
    df.printSchema()
    display(df)

  return df


In [None]:
def save_df_as_parquet(output_parquet_path):
  # Sauvegarder le DataFrame au format Parquet
  df.write.mode("overwrite").parquet(output_parquet_path)

  # Afficher un message de confirmation
  print(f"Le DataFrame a été sauvegardé au format Parquet avec succès dans le répertoire {output_parquet_path}.")

In [None]:
def save_df_as_delta(output_delta_path):
  # Sauvegarder le DataFrame au format Parquet
  df.write.format("delta").mode("overwrite").save(output_delta_path)

  # Afficher un message de confirmation
  print(f"Le DataFrame a été sauvegardé au format Delta avec succès dans le répertoire {output_delta_path}.")

## Liste des URLs pour tous les départements

In [None]:
# Utilisation d'une compréhension de liste avec formatage de chaîne
dept_list = ["{:02d}".format(i) for i in range(1, 100)]

#remove 96, 97, 98
dept_list.remove('96')
#add 971, 972, 973, 974, 975µ
dept_list.remove('97')
dept_list.extend(['971', '972', '973', '974', '975'])
#add 984, 985, 986, 987, 988
dept_list.remove('98')
dept_list.extend(['984', '985', '986', '987', '988'])

# Affichage de la liste résultante
print(dept_list)

In [None]:
for department_number in dept_list:
  create_directory_if_not_exists(f"/lakehouse/default/Files/download/{department_number}/")
  create_directory_if_not_exists(f"/lakehouse/default/Files/unzip/{department_number}/")
  create_directory_if_not_exists(f"/lakehouse/default/Files/refine/{department_number}/")
  print(f"Répertoires créés pour le département numéro {department_number}")

In [None]:
url_template_previous = "https://object.files.data.gouv.fr/meteofrance/data/synchro_ftp/BASE/QUOT/Q_%s_previous-1950-2022_RR-T-Vent.csv.gz"
url_template_latest = "https://object.files.data.gouv.fr/meteofrance/data/synchro_ftp/BASE/QUOT/Q_%s_latest-2023-2024_RR-T-Vent.csv.gz"

url_list = []

for i in dept_list:
  print(i)
  url_list.append(url_template_previous % i)
  url_list.append(url_template_latest % i)

# Afficher la liste d'URLs générée
for url in url_list:
    print(url)

In [None]:
for department_number in dept_list:

  print(f"* Début de traitement pour le département numéro {department_number}")
  destination_path = f"/lakehouse/default/Files/download/{department_number}/"

  url_template_previous = "https://object.files.data.gouv.fr/meteofrance/data/synchro_ftp/BASE/QUOT/Q_%s_previous-1950-2022_RR-T-Vent.csv.gz"
  url_template_latest = "https://object.files.data.gouv.fr/meteofrance/data/synchro_ftp/BASE/QUOT/Q_%s_latest-2023-2024_RR-T-Vent.csv.gz"

  url_list = []
  url_list.append(url_template_previous % department_number)
  url_list.append(url_template_latest % department_number)

  for url in url_list:
    print(f"*** Début de traitement pour l'URL {url} ***")
    zip_filename, csv_filename = download_file(url, destination_path)

    gzip_file_path = f"/lakehouse/default/Files/download/{department_number}/{zip_filename}"
    output_file_path = f"/lakehouse/default/Files/unzip/{department_number}/{csv_filename}"
    unzip_file(gzip_file_path, output_file_path)

    # TODO : catcher l'erreur sur le second fichier 99
    
    print(f"*** Fin de traitement pour l'URL {url} ***")

  print("** Chargement du DataFrame départemental")

  unzipped_file_path = f"Files/unzip/{department_number}/"
  df = load_file_as_df(unzipped_file_path)

  print("** Sauvegarde du Parquet départemental")

  output_parquet_path = f"Files/refine/{department_number}/Q_{department_number}_RR-T-Vent.parquet"
  save_df_as_parquet(output_parquet_path)

  print(f"* Fin de traitement pour le département numéro {department_number}")


In [None]:
unzipped_file_path = f"Files/unzip/*/*.csv"
df = load_file_as_df(unzipped_file_path)

# output_parquet_path = f"Files/refine/Q_FRANCE_RR-T-Vent.parquet"
# save_df_as_parquet(output_parquet_path)

output_delta_path = f"Files/refine/Q_FRANCE_RR-T-Vent.delta"
save_df_as_delta(output_delta_path)

In [None]:
display(df)

In [None]:
display(df.select("NUM_POSTE").distinct())

## Descriptif des champs

In [None]:
# import pandas as pd


# url_descriptif = "https://object.files.data.gouv.fr/meteofrance/data/synchro_ftp/BASE/QUOT/Q_descriptif_champs_RR-T-Vent.csv"

# # Charger le DataFrame à partir de l'URL
# pdf = pd.read_csv(url_descriptif, header=None, sep=':', skipfooter=7, engine='python', error_bad_lines=False)

# # Afficher les premières lignes du DataFrame
# print(pdf.head())