In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr, when, lpad, concat
from pyspark.sql import functions as F
from pyspark.sql.functions import to_timestamp, unix_timestamp, col, when, hour, avg

def process_public_transport_data(input_file):
    # Création de la session Spark
    spark = SparkSession.builder.appName("DataLakeIntegration").getOrCreate()

    # Configuration de l'accès au stockage Azure
    spark.conf.set(f"fs.azure.account.key.tahasrhstorageaccount.dfs.core.windows.net",
                   "VSOLoQmP3NugMwrGCd/C7ZHaz+7/e6bjB4jI1rNp1GaWeGIS9MvvzYChfyR1xXhJWNaC0qQLICnQ+ASttWp/+w==")

    # Emplacement du fichier source dans Azure Data Lake Storage Gen2
    file_location = f"abfss://public-transport-data@tahasrhstorageaccount.dfs.core.windows.net/raw/{input_file}"

    # Lecture du fichier CSV dans un DataFrame
    df = spark.read.format("csv").option("inferSchema", "True").option("header", "True").option("delimiter", ",").load(
        file_location)

    # Transformation de l'heure d'arrivée
    def transform_arrival_time(time_col):
        hours = expr("CAST(SUBSTRING_INDEX({}, ':', 1) AS INT)".format(time_col))
        minutes = expr("SUBSTRING({}, -2, 2)".format(time_col))

        transformed_hours = when(hours >= 24, hours - 24).otherwise(hours)
        transformed_hours_padded = lpad(transformed_hours, 2, '0')

        return concat(transformed_hours_padded, expr("':'"), minutes)

    # Application de la transformation à la colonne ArrivalTime
    df = df.withColumn("ArrivalTime", transform_arrival_time("ArrivalTime"))

    # Transformation pour extraire l'année, le mois, le jour et le jour de la semaine
    df = df.withColumn("Year", F.year(df["Date"]))
    df = df.withColumn("Month", F.month(df["Date"]))
    df = df.withColumn("Day", F.dayofmonth(df["Date"]))
    df = df.withColumn("DayOfWeek", F.date_format(df["Date"], "EEEE"))

    # Ajustement des heures d'arrivée si elles sont antérieures à l'heure de départ
    df = df.withColumn(
        "ArrivalTime",
        F.when(df["ArrivalTime"] < df["DepartureTime"], df["ArrivalTime"] + F.expr("INTERVAL 1 DAY"))
        .otherwise(df["ArrivalTime"])
    )

    # Conversion de DepartureTime et ArrivalTime en timestamp
    df = df.withColumn("DepartureTime", to_timestamp(df["DepartureTime"]))
    df = df.withColumn("ArrivalTime", to_timestamp(df["ArrivalTime"]))

    # Calcul de la durée du voyage en heures
    df = df.withColumn("TripDurationHours", (unix_timestamp("ArrivalTime") - unix_timestamp("DepartureTime")) / 3600)

    # Catégorisation de la durée du voyage en "Pas de Retard", "Retard Court", "Retard Moyen" ou "Long Retard"
    df = df.withColumn(
        "DelayCategory",
        when(col("TripDurationHours") <= 0, "Pas de Retard")
        .when((col("TripDurationHours") > 0) & (col("TripDurationHours") <= 0.1667), "Retard Court")
        .when((col("TripDurationHours") > 0.1667) & (col("TripDurationHours") <= 0.3333), "Retard Moyen")
        .otherwise("Long Retard")
    )

    # Extraction de l'heure de départ
    df = df.withColumn("DepartureHour", hour("DepartureTime"))

    # Calcul de la moyenne des passagers par heure de départ
    passenger_avg_by_hour = df.groupBy("DepartureHour").agg(avg("Passengers").alias("AvgPassengers"))

    # Définition du seuil pour les heures de pointe
    pointe_threshold = 50  # Par exemple, supposons que toute heure avec une moyenne de passagers > 50 est considérée comme une heure de pointe

    # Catégorisation des heures en "Heure de Pointe" ou "Heure Hors Pointe"
    passenger_avg_by_hour = passenger_avg_by_hour.withColumn(
        "HourCategory",
        when(col("AvgPassengers") > pointe_threshold, "Heure de Pointe").otherwise("Heure Hors Pointe")
    )

    # Spécification de l'emplacement de destination dans Azure Data Lake Storage Gen2
    output_path = f"abfss://public-transport-data@tahasrhstorageaccount.dfs.core.windows.net/processed/"

    # Écriture du DataFrame dans Azure Data Lake Storage Gen2 avec le nom de fichier spécifié
    df.coalesce(1).write.format("csv").mode("overwrite").option("header", "True").option("path", output_path + input_file).save()

    # Affichage du DataFrame
    # display(df)

#### --------------- ####

# Spécifiez le chemin du "répertoire brut" dans Azure Data Lake Storage Gen2
raw_directory = "abfss://public-transport-data@tahasrhstorageaccount.dfs.core.windows.net/raw/"

# Spécifiez le chemin du "répertoire traité" où vous souhaitez vérifier les noms de répertoires
processed_directory = "abfss://public-transport-data@tahasrhstorageaccount.dfs.core.windows.net/processed/"

# Listez tous les fichiers dans le "répertoire brut"
file_list = [fichier.name for fichier in dbutils.fs.ls(raw_directory)]

# Listez tous les répertoires dans le "répertoire traité"
processed_dirs = [dossier.name for dossier in dbutils.fs.ls(processed_directory) if dossier.isDir]

# Vérifiez si les noms de fichiers existent en tant que noms de répertoires dans le "répertoire traité"
counter = 0
total_files = len(file_list)

for idx, nom_fichier in enumerate(file_list):
    if nom_fichier + "/" in processed_dirs:
        print(f"'{nom_fichier}' déjà traité.")
    else:
        # Traitez les données de transport public (remplacez ceci par votre logique de traitement réelle)
        process_public_transport_data(nom_fichier)
        print(f"'{nom_fichier}' traité avec succès.")
        counter += 1

    if counter == 2 or idx == total_files - 1:
        if idx == total_files - 1 and counter == 0:
            print("Tous les fichiers sont traité")
        else:
            print("Limite de traitement atteinte. Sortie de la boucle.")
        break



'public_transport_data_month_1' déjà traité.
'public_transport_data_month_2' déjà traité.
'public_transport_data_month_3' déjà traité.
'public_transport_data_month_4' déjà traité.
'public_transport_data_month_5' déjà traité.
Tous les fichiers sont traité
