In [0]:
# Sélectionner ta base de données (exemple : default)
spark.sql("USE default")

# Lister les tables disponibles dans la base
spark.sql("SHOW TABLES").show()

# Récupération de la table raw
df1 = spark.table("default.clean_table")

In [0]:
# identifier les 10 zones de départ les plus fréquentées chaque mois
spark.sql("""
    CREATE TABLE IF NOT EXISTS default.Top_10_zones_depart AS
    WITH trajets_par_mois AS (
        SELECT 
            YEAR(tpep_pickup_datetime) AS annee,
            MONTH(tpep_pickup_datetime) AS mois,
            PULocationID,
            COUNT(*) AS nb_trajets
        FROM default.clean_table
        GROUP BY annee, mois, PULocationID
    ),
    classement AS (
        SELECT 
            annee,
            mois,
            PULocationID,
            nb_trajets,
            RANK() OVER (PARTITION BY annee, mois ORDER BY nb_trajets DESC) AS rang
        FROM trajets_par_mois
    )
    SELECT *
    FROM classement
    WHERE rang <= 10
    ORDER BY annee, mois, rang
""")
display(spark.table("default.Top_10_zones_depart"))

In [0]:
# calculer la durée moyenne des trajets par mois

df=spark.sql("""
            CREATE TABLE IF NOT EXISTS default.temps_moyen_trajet_mois
           SELECT concat(year(tpep_pickup_datetime), '-', month(tpep_pickup_datetime)) as datemonth,
           round(avg(UNIX_TIMESTAMP(tpep_dropoff_datetime) - UNIX_TIMESTAMP(tpep_pickup_datetime)), 2) AS duree
           FROM default.clean_table
           GROUP BY datemonth
           ORDER BY datemonth""")

df.show()

In [0]:
# déterminer la distance moyenne par type de paiement

spark.sql("""
    CREATE TABLE IF NOT EXISTS default.distance_moyenne_par_type_paiement AS
        SELECT payment_type, avg(trip_distance) as distance_moyenne
        FROM default.clean_table
        GROUP BY payment_type
        ORDER BY distance_moyenne""")
display(spark.table("default.distance_moyenne_par_type_paiement"))


In [0]:
# estimer le montant moyen des courses en fonction du nombre de passagers
spark.sql("""
        DROP TABLE IF EXISTS default.montant_moyen_nb_passager""")
spark.sql(""" 
        CREATE TABLE IF NOT EXISTS default.montant_moyen_nb_passager AS
        SELECT round(avg(total_amount), 2) as total_amount, passenger_count
        FROM default.clean_table
        GROUP BY passenger_count
        ORDER BY passenger_count;""")
display(spark.table("default.montant_moyen_nb_passager"))



In [0]:
# mesurer la somme totale des pourboires versés chaque moisspark.sql("""
spark.sql("""
        DROP TABLE IF EXISTS default.somme_pourboire_mois""")
spark.sql(""" 
        CREATE TABLE IF NOT EXISTS default.somme_pourboire_mois AS
        SELECT round(SUM(tip_amount), 2) as tip_amount, concat(year(tpep_pickup_datetime), '-', month(tpep_pickup_datetime)) as month_year
        FROM default.clean_table
        GROUP BY concat(year(tpep_pickup_datetime), '-', month(tpep_pickup_datetime))
        ORDER BY concat(year(tpep_pickup_datetime), '-', month(tpep_pickup_datetime));""")
display(spark.table("default.somme_pourboire_mois"))


In [0]:
import os

jdbc_username = os.environ.get("LOGIN")
jdbc_password = os.environ.get("PASSWORD")

print("Utilisateur connecté :", jdbc_username)

jdbc_url = "jdbc:sqlserver://cacfsql.database.windows.net:1433;database=brief"

connection_properties = {
    "user": jdbc_username,
    "password": jdbc_password,
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}

In [0]:
# Écrire un DataFrame dans une table d’une base de données via JDBC
tables = ["somme_pourboire_mois", "distance_moyenne_par_type_paiement", "temps_moyen_trajet_mois", "montant_moyen_nb_passager", "top_10_zones_depart"]

for table in tables : 
    df = spark.table("default."+table)
    df.write.mode("overwrite").jdbc(
        url=jdbc_url,
        table="lr_"+table,
        properties={
            "user": jdbc_username,
            "password": jdbc_password,
            "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
        }
)