In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, avg, max, min, lit

In [None]:
spark = SparkSession.builder.appName("Exercises Modeles AutoHeberge").getOrCreate()

In [None]:
# --- Exercice 1: Analyse des ventes ---
pathCSVVentes = "/Users/ayaelyaagoubi/Downloads/ventes.csv"
ventes = spark.read.csv(pathCSVVentes, header=True, inferSchema=True)
ventes = ventes.withColumn("Chiffre_d_affaires", col("Quantité") * col("Prix_unitaire"))
chiffre_affaires_total = ventes.agg(sum("Chiffre_d_affaires").alias("Chiffre_affaires_total")).collect()[0][0]

produit_plus_vendu = ventes.groupBy("Produit").sum("Quantité").withColumnRenamed("sum(Quantité)", "Total_Quantité") \
    .orderBy(col("Total_Quantité").desc()).first()

print(f"Chiffre d'affaires total : {chiffre_affaires_total} ")
print(f"Produit le plus vendu : {produit_plus_vendu['Produit']} ({produit_plus_vendu['Total_Quantité']} unités)")


In [None]:
# --- Exercice 2: Analyse des utilisateurs ---
pathJSONUtilisateurs =  "/Users/ayaelyaagoubi/Downloads/utilisateurs.json"
utilisateurs = spark.read.json("utilisateurs.json")

age_moyen = utilisateurs.select(avg("âge").alias("Âge_moyen")).collect()[0][0]

utilisateurs_par_ville = utilisateurs.groupBy("ville").count()

plus_jeune_utilisateur = utilisateurs.orderBy(col("âge").asc()).first()

print(f"Âge moyen : {age_moyen} ans")
utilisateurs_par_ville.show()
print(f"Plus jeune utilisateur : {plus_jeune_utilisateur['nom']} ({plus_jeune_utilisateur['âge']} ans)")


In [None]:
# --- Exercice 3: Nettoyage de données ---
pathCSVClients = "/Users/ayaelyaagoubi/Downloads/clients.csv"
clients = spark.read.csv("clients.csv", header=True, inferSchema=True)

age_moyen = clients.select(mean("Âge").alias("moyenne_age")).collect()[0][0]

clients = clients.fillna({"Âge": age_moyen})

clients = clients.fillna({"Ville": "Inconnue"})

clients = clients.filter(col("Revenu").isNotNull())

clients.show()

In [None]:
# --- Exercice 4: Produits les plus chers ---
pathCSVProduits = "/Users/ayaelyaagoubi/Downloads/produits.csv"
produits = spark.read.csv(pathCSVProduits, header=True, inferSchema=True)

produits_plus_chers = produits.orderBy(col("Prix").desc()).limit(3)

produits_plus_chers.show()


In [None]:
# --- Exercice 5: Analyse des transactions ---
pathCSVTransaction= "/Users/ayaelyaagoubi/Downloads/transactions.csv"

transactions = spark.read.csv(pathCSVTransaction, header=True, inferSchema=True)

depenses_totales = transactions.groupBy("Client").sum("Montant").withColumnRenamed("sum(Montant)", "Dépenses_totales")

client_plus_depense = depenses_totales.orderBy(col("Dépenses_totales").desc()).first()

depenses_totales.show()
print(f"Client ayant dépensé le plus : {client_plus_depense['Client']} ({client_plus_depense['Dépenses_totales']} €)")

In [None]:
# --- Exercice 6: Agrégation de données par catégorie ---
pathCSVProduits = "/Users/ayaelyaagoubi/Downloads/produits.csv"

produits = spark.read.csv(pathCSVProduits, header=True, inferSchema=True)

statistiques_categorie = produits.groupBy("Catégorie").agg(
    avg("Prix").alias("Prix_moyen"),
    sum("Prix").alias("Prix_total")
)

statistiques_categorie.show()