In [0]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Integrate and manage Public Transport data").getOrCreate()


In [0]:
spark.conf.set(
    f"fs.azure.account.key.omardbstorageaccount.dfs.core.windows.net", 
    "n/M6dGAvjc8505kkOdZWCcfky+UKdckTs1aAhVFqb/J7Ck1yTg+meYzFXNAb/oz1mxXbizdVQdB5+ASt8wSp4w=="
)


In [0]:
import os
import time
from pyspark.sql import SparkSession
from pyspark.sql.functions import date_format, year, month, dayofmonth, dayofweek, unix_timestamp, when, col
from pyspark.sql import functions as F

# Montage du Data Lake
storageAccountName = "omardbstorageaccount"
storageAccountAccessKey = "n/M6dGAvjc8505kkOdZWCcfky+UKdckTs1aAhVFqb/J7Ck1yTg+meYzFXNAb/oz1mxXbizdVQdB5+ASt8wSp4w=="
sasToken = "?sv=2022-11-02&ss=bfqt&srt=sco&sp=rwdlacupyx&se=2023-09-28T15:43:37Z&st=2023-09-27T07:43:37Z&spr=https&sig=13kFGISyoUEheYmR4g9YC3qa4hCoO8YDOEQzIorKfCw%3D"
blobContainerName = "public-transport-data"
mountPoint = "/mnt/public-transport-data/"

# Vérification de l'existence du point de montage
if not any(mount.mountPoint == mountPoint for mount in dbutils.fs.mounts()):
    try:
        dbutils.fs.mount(
            source="wasbs://{}@{}.blob.core.windows.net".format(blobContainerName, storageAccountName),
            mount_point=mountPoint,
            extra_configs={'fs.azure.sas.' + blobContainerName + '.' + storageAccountName + '.blob.core.windows.net': sasToken}
        )
        print("Montage réussi !")
    except Exception as e:
        print("Exception lors du montage :", e)
else:
    print("Le point de montage existe déjà.")

# Définition du répertoire d'entrée dans le Data Lake Storage Gen2
raw_data_dir = "abfss://public-transport-data@omardbstorageaccount.dfs.core.windows.net/raw/"

# Définition du répertoire de sortie local
local_output_dir = "/dbfs/mnt/public-transport-data/processed/"

# Liste des mois et des noms de fichiers correspondants
months = ["January", "February", "March", "April", "May"]

# Définition de la taille du lot et de l'intervalle de sommeil en secondes
batch_size = 2
sleep_interval = 30

# Traitement des fichiers par lots de deux
for i in range(0, len(months), batch_size):
    batch = months[i:i+batch_size]  # Get a batch of files
    for month_name in batch:
        # Définition des chemins des fichiers d'entrée et de sortie locaux
        input_file = os.path.join(raw_data_dir, f"public_transport_data_{month_name}.csv")
        local_output_file = os.path.join(local_output_dir, f"public_transport_data_{month_name}_cleaned.csv")

        # Check if the processed file already exists, and if it does, skip processing
        if os.path.exists(local_output_file):
            print(f"Fichier  {local_output_file} déjà traité. Ignorer...")
            continue

        # Lecture des données
        df = spark.read.csv(input_file, header=True, inferSchema=True)

        # Conversion de "ArrivalTime" et "DepartureTime" au format "HH:mm"
        df = df.withColumn("ArrivalTime", date_format(df["ArrivalTime"], "HH:mm"))
        df = df.withColumn("DepartureTime", date_format(df["DepartureTime"], "HH:mm"))

        # Ajout des colonnes Année, Mois, Jour et JourDeLaSemaine
        df = df.withColumn("Year", year(df["Date"]))
        df = df.withColumn("Month", month(df["Date"]))
        df = df.withColumn("Day", dayofmonth(df["Date"]))
        df = df.withColumn("DayOfWeek", dayofweek(df["Date"]))

        # Conversion des colonnes HeureDepart et HeureArrivee en horodatage
        df = df.withColumn("DepartureTimeTimestamp", unix_timestamp(df["DepartureTime"], "HH:mm").cast("timestamp"))
        df = df.withColumn("ArrivalTimeTimestamp", unix_timestamp(df["ArrivalTime"], "HH:mm").cast("timestamp"))

        # Calcul de la durée du trajet en minutes et en heures
        df = df.withColumn("TripDurationMinutes", 
            when(col("ArrivalTimeTimestamp") >= col("DepartureTimeTimestamp"), 
                (col("ArrivalTimeTimestamp").cast("long") - col("DepartureTimeTimestamp").cast("long")) / 60)
            .otherwise(1440 - (col("DepartureTimeTimestamp").cast("long") - col("ArrivalTimeTimestamp").cast("long")) / 60))

        # Suppression des colonnes de timestamp intermédiaires
        df = df.drop("DepartureTimeTimestamp", "ArrivalTimeTimestamp")

        # Ajout de la colonne "CatégorieRetard"
        df = df.withColumn("DelayCategory", 
            when(col("Delay") == 0, "Pas de Retard")
            .when((col("Delay") >= 1) & (col("Delay") <= 10), "Retard Court")
            .when((col("Delay") >= 11) & (col("Delay") <= 20), "Retard Moyen")
            .when(col("Delay") > 20, "Long Retard")
            .otherwise("Unknown"))

        # Analyse des itinéraires 
        route_analysis = df.groupBy("Route").agg(
            F.round(F.avg("Delay"), 2).alias("AverageDelay"),
            F.when((F.avg("Passengers") % 1) >= 0.5, F.ceil(F.avg("Passengers"))).otherwise(F.floor(F.avg("Passengers"))).cast("int").alias("AveragePassengers"),
            F.count("*").alias("TotalTrips")
        )

        df = df.join(route_analysis, on="Route", how="left")

        # Colonne "HeureDePointe"
        threshold = 50
        df = df.withColumn("HeureDePointe", when(col("Passengers") >= threshold, "Peak").otherwise("Off-Peak"))

        data = df.toPandas()
        data.to_csv(local_output_file, index=False)
        print(f"Traitement terminé et enregistré dans {local_output_file}")

    if i + batch_size < len(months):
        print(f"Pausing for {sleep_interval} seconds before the next batch...")
        time.sleep(sleep_interval)

# Démontage du Data Lake
dbutils.fs.unmount(mountPoint)


## Description des Données

L'ensemble de données utilisé dans ce script est destiné à la gestion des données de transports publics. Il comprend des informations sur les départs, les arrivées, les retards, les passagers, les itinéraires, et plus encore, pour plusieurs mois.

## Transformations

Les transformations suivantes sont appliquées aux données brutes :

1. **Conversion des Horaires** : Les colonnes "ArrivalTime" et "DepartureTime" sont converties au format "HH:mm" pour représenter l'heure d'arrivée et de départ respectivement.

2. **Ajout de Colonnes de Date** : Les colonnes "Year", "Month", "Day" et "DayOfWeek" sont ajoutées pour décomposer les dates et faciliter les analyses basées sur le temps.

3. **Conversion en Horodatage** : Les colonnes "DepartureTime" et "ArrivalTime" sont converties en horodatages pour permettre des calculs de durée de trajet précis.

4. **Calcul de la Durée de Trajet** : La colonne "TripDurationMinutes" est calculée en minutes en fonction des horodatages de départ et d'arrivée. Une gestion spéciale est effectuée pour les trajets qui passent minuit.

5. **Catégorisation des Retards** : La colonne "DelayCategory" est ajoutée pour catégoriser les retards en "Pas de Retard," "Retard Court," "Retard Moyen," "Long Retard," ou "Inconnu."

6. **Analyse des Itinéraires** : Une analyse des itinéraires est effectuée pour calculer la moyenne des retards, le nombre moyen de passagers et le nombre total de trajets par itinéraire.

7. **Identification des Heures de Pointe** : La colonne "HeureDePointe" est ajoutée pour identifier les heures de pointe en fonction du nombre de passagers.

## Lignage des Données

Les données sont générées à partir d'un script Python qui simule les données de transport public. Les données générées sont stockées dans le stockage Data Lake Azure à l'emplacement spécifié dans "raw." Elles représentent des informations sur les départs, les arrivées, les retards, les passagers, les itinéraires, etc., pour plusieurs mois. Les sources simulées comprennent différents modes de transport public, tels que les bus, les trains, les tramways et les métros. Ces données sont ensuite traitées à l'aide d'Azure Databricks pour effectuer diverses transformations et analyses.

## Directives d'Utilisation

Les données traitées peuvent être utilisées pour divers cas d'utilisation, notamment :

- Analyse des horaires de transport public.
- Suivi des retards et de la ponctualité.
- Analyse de l'utilisation des itinéraires.
- Identification des heures de pointe.
- Études sur l'impact de la météo sur les transports publics.

Il est important de prendre en compte les catégories de retard pour une meilleure compréhension des performances du système de transport. Il est également important de noter que les données sont générées synthétiquement à des fins de démonstration et de test, ce qui peut entraîner des variations dans les données simulées.

