In [None]:
%spark
println("="*60)
println("TP ZEPPELIN - ANALYSE DES VENTES")
println("Dataset: sales_large.csv (500+ lignes)")
println("="*60)

// Créer la session Spark
val spark = org.apache.spark.sql.SparkSession.builder()
    .appName("TP_Analyse_Ventes_Large")
    .config("spark.executor.memory", "1g")
    .config("spark.driver.memory", "1g")
    .getOrCreate()

// Imports nécessaires
import spark.implicits._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

spark.sparkContext.setLogLevel("WARN")
println("Session Spark initialisée")

In [1]:
%spark
// Définir le chemin HDFS
val hdfs_path = "hdfs:///user/zeppelin/sales_data/"

// Lister les fichiers disponibles
println("Fichiers disponibles dans HDFS:")
try {
    val fs = org.apache.hadoop.fs.FileSystem.get(spark.sparkContext.hadoopConfiguration)
    val files = fs.listStatus(new org.apache.hadoop.fs.Path(hdfs_path))
    files.foreach(f => println(s"  - ${f.getPath.getName}"))
} catch {
    case e: Exception => println(s"Erreur: ${e.getMessage}")
}

// Charger le premier fichier CSV trouvé
val df = spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv(hdfs_path + "*")

println(s"\n Dataset chargé avec succès!")
println(s"   Nombre de lignes: ${df.count()}")
println(s"   Nombre de colonnes: ${df.columns.length}")
println("\n Schéma des données:")
df.printSchema()
println("\n Aperçu des données:")
df.show(5, truncate=false)

In [2]:
%spark
println("Colonnes disponibles:")
df.columns.foreach(println)

// Vérifier si nous avons les colonnes nécessaires
val hasTotalColumn = df.columns.contains("Total")
val hasDateColumn = df.columns.contains("Date")

println(s"\nVérifications:")
println(s"Colonne 'Total' existe? $hasTotalColumn")
println(s"Colonne 'Date' existe? $hasDateColumn")

// Ajouter la colonne Total si elle n'existe pas
val df_with_total = if (!hasTotalColumn && df.columns.contains("Price") && df.columns.contains("Quantity")) {
    println("Ajout de la colonne Total = Price * Quantity")
    df.withColumn("Total", round($"Price" * $"Quantity", 2))
} else if (hasTotalColumn) {
    println("Colonne 'Total' déjà présente")
    df
} else {
    println("Impossible de calculer Total: colonnes Price et/ou Quantity manquantes")
    df.withColumn("Total", lit(0.0))
}

// Convertir et organiser les dates si elles existent
val df_final = if (hasDateColumn) {
    println("Transformation des dates...")
    df_with_total
        .withColumn("OrderDate", to_date($"Date", "yyyy-MM-dd"))
        .withColumn("Month", month($"OrderDate"))
        .withColumn("Year", year($"OrderDate"))
        .withColumn("MonthYear", date_format($"OrderDate", "yyyy-MM"))
        .drop("Date")  // Supprimer la colonne Date originale pour éviter la duplication
} else {
    println("Colonne 'Date' non trouvée, utilisation de dates par défaut")
    df_with_total
        .withColumn("OrderDate", current_date())
        .withColumn("Month", month($"OrderDate"))
        .withColumn("Year", year($"OrderDate"))
        .withColumn("MonthYear", date_format($"OrderDate", "yyyy-MM"))
}

println(s"\n Données préparées (${df_final.count()} lignes):")
println(s"Colonnes finales: ${df_final.columns.mkString(", ")}")

// Afficher le schéma
println("\n Schéma final:")
df_final.printSchema()

println("\n Aperçu des données (format amélioré):")
df_final.select("OrderID", "Product", "Category", "Price", "Quantity", "Total", "OrderDate", "Region")
    .show(5, truncate=false)

In [3]:
%spark
println("="*60)
println("ANALYSE EXPLORATOIRE DES DONNÉES")
println("="*60)

// 1. Statistiques générales
println("STATISTIQUES GÉNÉRALES")
println("-"*40)

val totalCA = df_final.select(sum("Total")).first().getDouble(0)
val avgCA = df_final.select(avg("Total")).first().getDouble(0)
val totalProfit = df_final.select(sum("Profit")).first().getDouble(0)
val totalOrders = df_final.select(countDistinct("OrderID")).first().getLong(0)
val totalCustomers = df_final.select(countDistinct("CustomerID")).first().getLong(0)
val marginPercentage = (totalProfit / totalCA * 100)

// Créer un DataFrame pour visualisation des KPI
val kpiDF = Seq(
  ("Chiffre d'Affaires", totalCA),
  ("Profit Total", totalProfit),
  ("Panier Moyen", avgCA),
  ("Marge (%)", marginPercentage)
).toDF("KPI", "Valeur")

println(f"Chiffre d'Affaires Total: $totalCA%.2f €")
println(f"Profit Total: $totalProfit%.2f €")
println(f"Panier Moyen: $avgCA%.2f €")
println(s"Nombre de Commandes: $totalOrders")
println(s"Nombre de Clients Uniques: $totalCustomers")
println(f"Marge Moyenne: $marginPercentage%.1f%%")

// VISUALISATION 1: Graphique à barres des KPI
println("\n VISUALISATION 1: KPI Principaux")
z.show(kpiDF)

// 2. Statistiques par colonnes numériques
println("\n STATISTIQUES DES COLONNES NUMÉRIQUES:")

val statsDF = df_final.select("Price", "Quantity", "Total", "Profit")
  .describe()

statsDF.show()

// VISUALISATION 2: Distribution des prix
println("\n VISUALISATION 2: Distribution des Prix")
val priceDistribution = df_final
  .select("Price")
  .withColumn("Price_Range", 
    when($"Price" < 50, "0-50€")
    .when($"Price" < 100, "50-100€")
    .when($"Price" < 200, "100-200€")
    .when($"Price" < 500, "200-500€")
    .otherwise("500€+")
  )
  .groupBy("Price_Range")
  .agg(count("*").alias("Nombre_Produits"))
  .orderBy("Price_Range")

z.show(priceDistribution)

// VISUALISATION 3: Histogramme des quantités
println("\n VISUALISATION 3: Distribution des Quantités")
val quantityDist = df_final
  .groupBy("Quantity")
  .agg(count("*").alias("Nombre_Commandes"))
  .orderBy("Quantity")

z.show(quantityDist)

// VISUALISATION 4: Nuage de points Prix vs Profit
println("\n VISUALISATION 4: Relation Prix vs Profit")
val sampleForScatter = df_final
  .select("Price", "Profit", "Category")
  .limit(100)  // Limiter pour une meilleure visualisation

z.show(sampleForScatter)

// VISUALISATION 5: Boîte à moustaches des métriques
println("\n VISUALISATION 5: Comparaison des Métriques")
val metricsComparison = Seq(
  ("Prix Moyen", df_final.select(avg("Price")).first().getDouble(0)),
  ("Quantité Moyenne", df_final.select(avg("Quantity")).first().getDouble(0)),
  ("Total Moyen", df_final.select(avg("Total")).first().getDouble(0)),
  ("Profit Moyen", df_final.select(avg("Profit")).first().getDouble(0))
).toDF("Métrique", "Valeur")

z.show(metricsComparison)

In [4]:
%spark
println("DISTRIBUTION PAR CATÉGORIE")
println("="*50)

// Analyse par catégorie
val categoryAnalysis = df_final
  .groupBy("Category")
  .agg(
    count("OrderID").alias("Nb_Commandes"),
    sum("Total").alias("CA_Categorie"),
    sum("Profit").alias("Profit_Categorie"),
    sum("Quantity").alias("Quantite_Vendue"),
    avg("Total").alias("Panier_Moyen"),
    (sum("Profit") / sum("Total") * 100).alias("Marge_Pourcentage")
  )
  .orderBy(desc("CA_Categorie"))

println("Tableau d'analyse par catégorie:")
categoryAnalysis.show()

// VISUALISATION 1: Camembert des catégories par CA
println("\n VISUALISATION 1: Répartition du CA par Catégorie")
z.show(categoryAnalysis.select("Category", "CA_Categorie"))

// VISUALISATION 2: Barres groupées (CA vs Profit)
println("\n VISUALISATION 2: CA et Profit par Catégorie")
val caProfitByCategory = categoryAnalysis
  .select("Category", "CA_Categorie", "Profit_Categorie")
  .withColumn("CA_K", $"CA_Categorie" / 1000)
  .withColumn("Profit_K", $"Profit_Categorie" / 1000)

z.show(caProfitByCategory.select("Category", "CA_K", "Profit_K"))

// VISUALISATION 3: Graphique à barres empilées
println("\n VISUALISATION 3: Quantité Vendue par Catégorie")
z.show(categoryAnalysis.select("Category", "Quantite_Vendue"))

// VISUALISATION 4: Graphique en radar (marge et panier moyen)
println("\n VISUALISATION 4: Performance des Catégories")
val performanceMatrix = categoryAnalysis
  .select("Category", "Marge_Pourcentage", "Panier_Moyen")
  .withColumn("Marge_Normalisee", $"Marge_Pourcentage" / 100)
  .withColumn("Panier_Normalise", $"Panier_Moyen" / 1000)

z.show(performanceMatrix.select("Category", "Marge_Normalisee", "Panier_Normalise"))

// VISUALISATION 5: Treemap hiérarchique
println("\n VISUALISATION 5: Treemap des Catégories")
val treemapData = categoryAnalysis
  .select("Category", "CA_Categorie", "Nb_Commandes")
  .withColumn("CA_Per_Commande", $"CA_Categorie" / $"Nb_Commandes")

z.show(treemapData)

In [5]:
%spark
println("TOP 10 PRODUITS")
println("="*50)

// Importer Window pour les fonctions de fenêtrage
import org.apache.spark.sql.expressions.Window

val topProducts = df_final
  .groupBy("Product", "Category")
  .agg(
    sum("Total").alias("CA_Produit"),
    sum("Quantity").alias("Quantite_Vendue"),
    sum("Profit").alias("Profit_Produit"),
    countDistinct("OrderID").alias("Nb_Commandes"),
    avg("Price").alias("Prix_Moyen")
  )
  .withColumn("Marge_Produit", ($"Profit_Produit" / $"CA_Produit") * 100)
  .orderBy(desc("CA_Produit"))
  .limit(10)

println("Top 10 produits par Chiffre d'Affaires:")
topProducts.select("Product", "Category", "CA_Produit", "Quantite_Vendue", "Marge_Produit")
  .show(false)

// VISUALISATION 1: Top produits par CA (barres horizontales)
println("\n VISUALISATION 1: Top 10 Produits par CA")
val topProductsViz = topProducts
  .select("Product", "CA_Produit")
  .withColumn("CA_K", round($"CA_Produit" / 1000, 2))
  .orderBy(desc("CA_K"))

z.show(topProductsViz)

// VISUALISATION 2: Graphique bulles (CA vs Quantité vs Marge)
println("\n VISUALISATION 2: Graphique Bulles - CA vs Quantité")
val bubbleData = topProducts
  .select("Product", "CA_Produit", "Quantite_Vendue", "Marge_Produit")
  .withColumn("Taille_Bulle", $"Quantite_Vendue" * 10)
  .withColumn("CA_K", $"CA_Produit" / 1000)

z.show(bubbleData.select("Product", "CA_K", "Quantite_Vendue", "Taille_Bulle"))

// VISUALISATION 3: Heatmap par catégorie et produit
println("\n VISUALISATION 3: Heatmap CA des Produits")
val heatmapData = topProducts
  .groupBy("Category")
  .pivot("Product")
  .agg(first("CA_Produit"))
  .na.fill(0)

z.show(heatmapData)

// VISUALISATION 4: Graphique en cascade (cumul CA)
println("\n VISUALISATION 4: Cascade du CA par Produit")
val windowSpec = Window.orderBy(desc("CA_Produit"))
val waterfallData = topProducts
  .select("Product", "CA_Produit")
  .withColumn("CA_Cumul", sum("CA_Produit").over(windowSpec))
  .orderBy(desc("CA_Produit"))

z.show(waterfallData.select("Product", "CA_Produit", "CA_Cumul"))

// VISUALISATION 5: Graphique combiné (barres + ligne)
println("\n VISUALISATION 5: CA et Marge des Top Produits")
val combinedData = topProducts
  .select("Product", "CA_Produit", "Marge_Produit")
  .withColumn("CA_Normalise", $"CA_Produit" / 1000)
  .orderBy(desc("CA_Normalise"))

z.show(combinedData.select("Product", "CA_Normalise", "Marge_Produit"))

// VISUALISATION 6: Graphique radar pour comparaison multi-critères
println("\n VISUALISATION 6: Radar Chart - Comparaison Multi-Critères")
val radarData = topProducts
  .select("Product", "CA_Produit", "Quantite_Vendue", "Marge_Produit", "Nb_Commandes")
  .withColumn("CA_Score", ($"CA_Produit" / 10000).cast("double"))
  .withColumn("Quantite_Score", ($"Quantite_Vendue" / 10).cast("double"))
  .withColumn("Marge_Score", ($"Marge_Produit" / 100).cast("double"))
  .withColumn("Commandes_Score", ($"Nb_Commandes" / 5).cast("double"))

z.show(radarData.select("Product", "CA_Score", "Quantite_Score", "Marge_Score", "Commandes_Score"))

// VISUALISATION 7: Treemap hiérarchique
println("\n VISUALISATION 7: Treemap Hiérarchique")
val treemapData = topProducts
  .select("Category", "Product", "CA_Produit", "Marge_Produit")
  .withColumn("Taille", log($"CA_Produit" + 1))

z.show(treemapData.select("Category", "Product", "CA_Produit", "Taille"))

In [6]:
%spark
println("PRODUITS LES PLUS RENTABLES")
println("="*50)

val profitableProducts = df_final
  .groupBy("Product", "Category")
  .agg(
    (sum("Profit") / sum("Total") * 100).alias("Marge_Pourcentage"),
    sum("Profit").alias("Profit_Total"),
    sum("Total").alias("CA_Total"),
    sum("Quantity").alias("Quantite_Vendue")
  )
  .filter($"Marge_Pourcentage" > 0)
  .orderBy(desc("Marge_Pourcentage"))
  .limit(10)

println("Top 10 produits par rentabilité:")
profitableProducts.show()

// VISUALISATION 1: Graphique à barres des marges
println("\n VISUALISATION 1: Marges par Produit")
z.show(profitableProducts.select("Product", "Marge_Pourcentage"))

// VISUALISATION 2: Nuage de points Marge vs CA
println("\n VISUALISATION 2: Marge vs Chiffre d'Affaires")
val scatterData = profitableProducts
  .select("Product", "Marge_Pourcentage", "CA_Total", "Category")
  .withColumn("CA_K", $"CA_Total" / 1000)

z.show(scatterData.select("Product", "Marge_Pourcentage", "CA_K", "Category"))

// VISUALISATION 3: Graphique en radar comparatif
println("\n VISUALISATION 3: Comparaison Multi-Critères")
val radarData = profitableProducts
  .select("Product", "Marge_Pourcentage", "CA_Total", "Quantite_Vendue")
  .withColumn("Marge_Norm", $"Marge_Pourcentage" / 100)
  .withColumn("CA_Norm", $"CA_Total" / 10000)
  .withColumn("Qte_Norm", $"Quantite_Vendue" / 10)

z.show(radarData.select("Product", "Marge_Norm", "CA_Norm", "Qte_Norm"))

In [7]:
%spark
