# 🏗️ Atelier Data Lake - Pipeline Bronze → Silver → Gold

Ce notebook vous guide à travers l'architecture Medallion du Data Lake.


In [None]:
# Notebook setup: make the project root importable, load the pipeline
# configuration and open the Spark session used by every later cell.
import os
import sys

# Resolve the parent directory to an absolute path once. A bare '..' entry
# would be re-resolved against whatever the CWD is at import time, and
# re-running this cell would keep appending duplicate entries.
_project_root = os.path.abspath(os.pardir)
if _project_root not in sys.path:
    sys.path.append(_project_root)

from src.spark_session import get_spark_session
from config.settings import SOURCES, BRONZE_PATH, SILVER_PATH, GOLD_PATH
from pyspark.sql.functions import (
    lit, current_timestamp, explode, col, trim, upper, lower,
    regexp_replace, coalesce, when, to_date,
)

spark = get_spark_session("DataLake_Pipeline")
print("✅ Session Spark créée!")


## 🥉 Étape 1: Ingestion vers Bronze

La couche Bronze contient les données brutes, telles qu'elles arrivent des sources.


In [None]:
# Ingest the clients CSV into Bronze.
# Bronze keeps the source rows and only appends lineage columns
# (_source_file, _ingestion_timestamp, _source_type).
# NOTE(review): inferSchema may turn digit-only strings (e.g. phone numbers
# with a leading 0) into numbers, losing data before Silver cleans
# "telephone"; consider reading Bronze columns as strings.
clients_df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(str(SOURCES["csv"] / "clients.csv"))
    .withColumn("_source_file", lit("clients.csv"))
    .withColumn("_ingestion_timestamp", current_timestamp())
    .withColumn("_source_type", lit("csv"))
)

_bronze_clients_path = str(BRONZE_PATH / "clients")
clients_df.write.mode("overwrite").parquet(_bronze_clients_path)
# Count what was actually persisted: clients_df.count() would re-read and
# re-parse the CSV, while a Parquet count is typically cheap (row counts are
# stored in the file footers).
print(f"✅ Clients ingérés: {spark.read.parquet(_bronze_clients_path).count()} enregistrements")


In [None]:
# Ingest the products CSV into Bronze.
# Bronze keeps the source rows and only appends lineage columns
# (_source_file, _ingestion_timestamp, _source_type).
produits_df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(str(SOURCES["csv"] / "produits.csv"))
    .withColumn("_source_file", lit("produits.csv"))
    .withColumn("_ingestion_timestamp", current_timestamp())
    .withColumn("_source_type", lit("csv"))
)

_bronze_produits_path = str(BRONZE_PATH / "produits")
produits_df.write.mode("overwrite").parquet(_bronze_produits_path)
# Count the persisted Parquet instead of re-reading and re-parsing the CSV.
print(f"✅ Produits ingérés: {spark.read.parquet(_bronze_produits_path).count()} enregistrements")


In [None]:
# Ingest the orders JSON into Bronze.
# The file is read as a single multi-line JSON document whose "commandes"
# field is an array; explode() emits one row per array element and
# select("c.*") flattens each element's fields into top-level columns.
commandes_raw = spark.read.option("multiline", True).json(str(SOURCES["json"] / "commandes.json"))
commandes_df = (
    commandes_raw
    .select(explode("commandes").alias("c"))
    .select("c.*")
    .withColumn("_source_file", lit("commandes.json"))
    .withColumn("_ingestion_timestamp", current_timestamp())
    .withColumn("_source_type", lit("json"))
)

_bronze_commandes_path = str(BRONZE_PATH / "commandes")
commandes_df.write.mode("overwrite").parquet(_bronze_commandes_path)
# Count the persisted Parquet instead of re-parsing and re-exploding the JSON.
print(f"✅ Commandes ingérées: {spark.read.parquet(_bronze_commandes_path).count()} enregistrements")


## 🥈 Étape 2: Transformation Bronze → Silver

La couche Silver contient les données nettoyées et standardisées.


In [None]:
# Clean clients from Bronze into Silver:
# - upper-case and trim names, lower-case and trim e-mails;
# - keep only digits in phone numbers;
# - default a missing or blank city to "INCONNU";
# - keep one row per e-mail address.
from pyspark.sql import Window
from pyspark.sql.functions import row_number

bronze_clients = spark.read.parquet(str(BRONZE_PATH / "clients"))

# Blank e-mails become NULL so they are not mistaken for one shared address.
_email_clean = trim(lower(col("email")))
_ville_clean = trim(col("ville"))

# Deduplicate per e-mail, keeping the lowest client_id so reruns pick the same
# survivor (dropDuplicates keeps an arbitrary row). Clients without an e-mail
# are all kept: dropDuplicates(["email"]) treated every NULL e-mail as the
# same key and silently collapsed those clients into a single row.
_email_window = Window.partitionBy("email").orderBy(col("client_id").asc())

silver_clients = (
    bronze_clients
    .withColumn("nom", trim(upper(col("nom"))))
    .withColumn("prenom", trim(upper(col("prenom"))))
    .withColumn("email", when(_email_clean != "", _email_clean))
    .withColumn("telephone", regexp_replace(col("telephone"), "[^0-9]", ""))
    .withColumn("ville", coalesce(when(_ville_clean != "", _ville_clean), lit("INCONNU")))
    .withColumn("_rang_email", row_number().over(_email_window))
    .filter(col("email").isNull() | (col("_rang_email") == 1))
    .drop("_rang_email")
    .withColumn("_silver_timestamp", current_timestamp())
)

_silver_clients_path = str(SILVER_PATH / "clients")
silver_clients.write.mode("overwrite").parquet(_silver_clients_path)
# Inspect the persisted result rather than re-running the transformation
# (window included) once for count() and again for show().
_silver_clients_written = spark.read.parquet(_silver_clients_path)
print(f"✅ Clients Silver: {_silver_clients_written.count()} enregistrements")
_silver_clients_written.show(3)


In [None]:
# Clean products from Bronze into Silver:
# - trim product names, upper-case and trim categories;
# - clamp negative prices/stock to 0;
# - keep one row per produit_id.
bronze_produits = spark.read.parquet(str(BRONZE_PATH / "produits"))

silver_produits = (
    bronze_produits
    .withColumn("nom_produit", trim(col("nom_produit")))
    .withColumn("categorie", trim(upper(col("categorie"))))
    # NULL values fail the `< 0` test, fall through to otherwise() and stay NULL.
    .withColumn("prix", when(col("prix") < 0, 0).otherwise(col("prix")))
    .withColumn("stock", when(col("stock") < 0, 0).otherwise(col("stock")))
    # NOTE(review): dropDuplicates keeps an arbitrary row per produit_id and
    # presumably treats all NULL ids as a single key -- confirm this is intended.
    .dropDuplicates(["produit_id"])
    .withColumn("_silver_timestamp", current_timestamp())
)

_silver_produits_path = str(SILVER_PATH / "produits")
silver_produits.write.mode("overwrite").parquet(_silver_produits_path)
# Count the persisted output instead of re-running the whole transformation.
print(f"✅ Produits Silver: {spark.read.parquet(_silver_produits_path).count()} enregistrements")


In [None]:
# Clean orders from Bronze into Silver:
# - parse the order date;
# - clamp negative amounts to 0;
# - upper-case and trim the status;
# - keep one row per commande_id.
bronze_commandes = spark.read.parquet(str(BRONZE_PATH / "commandes"))

silver_commandes = (
    bronze_commandes
    # NOTE(review): values to_date() cannot parse presumably become NULL
    # (behaviour depends on the Spark ANSI setting) -- confirm the input format.
    .withColumn("date_commande", to_date(col("date_commande")))
    # Negative amounts are clamped to 0; NULL amounts stay NULL.
    # NOTE(review): if negatives represent refunds, this inflates Gold revenue.
    .withColumn("montant_total", when(col("montant_total") < 0, 0).otherwise(col("montant_total")))
    .withColumn("statut", upper(trim(col("statut"))))
    .dropDuplicates(["commande_id"])
    .withColumn("_silver_timestamp", current_timestamp())
)

_silver_commandes_path = str(SILVER_PATH / "commandes")
silver_commandes.write.mode("overwrite").parquet(_silver_commandes_path)
# Count the persisted output instead of re-running the whole transformation.
print(f"✅ Commandes Silver: {spark.read.parquet(_silver_commandes_path).count()} enregistrements")


## 🥇 Étape 3: Agrégation Silver → Gold

La couche Gold contient les données agrégées prêtes pour l'analyse business.


In [None]:
# NOTE: `sum`, `avg` and `count` keep their pyspark bindings at notebook level
# because later cells may rely on them; be aware they shadow Python's built-in
# `sum` for the rest of the session. `max`/`min` are aliased instead, so the
# Python built-ins stay usable.
from pyspark.sql.functions import (
    sum, avg, count, year, month,
    max as spark_max, min as spark_min, round as spark_round,
)

# Gold: sales per client, built from the persisted Silver tables.
silver_clients = spark.read.parquet(str(SILVER_PATH / "clients"))
silver_commandes = spark.read.parquet(str(SILVER_PATH / "commandes"))

ventes_par_client = (
    silver_commandes
    .groupBy("client_id")
    .agg(
        count("commande_id").alias("nombre_commandes"),
        sum("montant_total").alias("total_achats"),
        avg("montant_total").alias("panier_moyen"),
        spark_max("date_commande").alias("derniere_commande"),
        spark_min("date_commande").alias("premiere_commande"),
    )
)

gold_ventes_client = (
    silver_clients
    # Left join: every Silver client appears, including those with no order.
    .join(ventes_par_client, "client_id", "left")
    .select(
        "client_id", "nom", "prenom", "email", "ville",
        # Clients without orders get 0 instead of the NULL the left join yields.
        coalesce(col("nombre_commandes"), lit(0)).alias("nombre_commandes"),
        # Rounded like panier_moyen (and like the monthly chiffre_affaires).
        spark_round(coalesce(col("total_achats"), lit(0)), 2).alias("total_achats"),
        # Stays NULL when there is no order: an average of nothing is undefined.
        spark_round("panier_moyen", 2).alias("panier_moyen"),
        "premiere_commande", "derniere_commande",
    )
    .withColumn("_gold_timestamp", current_timestamp())
)

_gold_ventes_client_path = str(GOLD_PATH / "ventes_par_client")
gold_ventes_client.write.mode("overwrite").parquet(_gold_ventes_client_path)
# Inspect the persisted table rather than recomputing the join twice.
_gold_ventes_client_written = spark.read.parquet(_gold_ventes_client_path)
print(f"✅ Gold Ventes par Client: {_gold_ventes_client_written.count()} enregistrements")
_gold_ventes_client_written.orderBy(col("total_achats").desc()).show(10)


In [None]:
# Gold: monthly KPIs (order count, revenue, average basket, items sold).
# The aggregate functions are imported under explicit aliases so this cell does
# not depend on another cell having shadowed Python's built-in `sum`.
from pyspark.sql.functions import (
    avg as spark_avg,
    count as spark_count,
    month,
    round as spark_round,
    sum as spark_sum,
    year,
)

# Re-read Silver so the cell also works when run on its own.
silver_commandes = spark.read.parquet(str(SILVER_PATH / "commandes"))

gold_kpi = (
    silver_commandes
    .withColumn("annee", year("date_commande"))
    .withColumn("mois", month("date_commande"))
    # Orders whose date is NULL are grouped into a single NULL/NULL row
    # rather than being dropped, so no revenue disappears from the totals.
    .groupBy("annee", "mois")
    .agg(
        spark_count("commande_id").alias("nombre_commandes"),
        spark_sum("montant_total").alias("chiffre_affaires"),
        spark_avg("montant_total").alias("panier_moyen"),
        # NOTE(review): assumes Silver orders carry a "quantite" column.
        spark_sum("quantite").alias("articles_vendus"),
    )
    .withColumn("chiffre_affaires", spark_round("chiffre_affaires", 2))
    .withColumn("panier_moyen", spark_round("panier_moyen", 2))
    .orderBy("annee", "mois")
    .withColumn("_gold_timestamp", current_timestamp())
)

_gold_kpi_path = str(GOLD_PATH / "kpi_mensuel")
gold_kpi.write.mode("overwrite").parquet(_gold_kpi_path)
print("✅ Gold KPIs Mensuels créés!")
# Display the persisted table (re-sorted, since read order is not guaranteed)
# instead of recomputing the aggregation.
spark.read.parquet(_gold_kpi_path).orderBy("annee", "mois").show()


In [None]:
# Close the Spark session; any later cell needing Spark must create a new one.
spark.stop()
print("✅ Pipeline terminé! Session Spark fermée.")
