#1/ Loading data from the API (EXTRACTION) into tables

In [0]:
%python
from pyspark.sql.types import *

# Expected shape of a `users` record, assembled bottom-up from its nested parts.
geo_type = StructType([
    StructField("lat", StringType()),
    StructField("lng", StringType()),
])

address_type = StructType([
    StructField("street", StringType()),
    StructField("suite", StringType()),
    StructField("city", StringType()),
    StructField("zipcode", StringType()),
    StructField("geo", geo_type),
])

company_type = StructType([
    StructField("name", StringType()),
    StructField("catchPhrase", StringType()),
    StructField("bs", StringType()),
])

# NOTE(review): `schema` is only declared here (optional, to force a precise
# schema); nothing in this cell applies it, so user_bronze inherits the schema
# of the source `users` table unchanged.
schema = StructType([
    StructField("id", IntegerType()),
    StructField("name", StringType()),
    StructField("username", StringType()),
    StructField("email", StringType()),
    StructField("address", address_type),
    StructField("phone", StringType()),
    StructField("website", StringType()),
    StructField("company", company_type),
])

# Load the existing `users` table.
users_query = "SELECT * FROM `workspace`.`default`.`users`"
df = spark.sql(users_query)

# Preview every row, without truncating long values.
print("=== Contenu de la table users ===")
df.show(truncate=False)

# Rebuild the bronze table from scratch: drop any previous version first,
# then write df as a Delta table (overwriting both data and schema).
spark.sql("DROP TABLE IF EXISTS user_bronze")
bronze_writer = (
    df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
)
bronze_writer.saveAsTable("user_bronze")

# Read user_bronze back and display it to confirm the write.
print("=== Contenu de la table user_bronze ===")
spark.sql("SELECT * FROM user_bronze").show(truncate=False)


#2/ Silver data: clean the data and anonymize the table

In [0]:
import pyspark.sql.functions as F

# Read the raw records from the bronze table.
df = spark.read.table("user_bronze")

# Whitespace-separated tokens of the full name. Trimming first and splitting on
# runs of whitespace stops leading/trailing/double spaces from producing empty
# tokens (e.g. an empty lastname for "John " or an empty firstname for " John").
name_parts = F.split(F.trim(F.col("name")), "\\s+")

# Silver transformation:
# - firstname = first name token; lastname = last token, or NULL when the name
#   has a single token. element_at(..., -1) replaces the deprecated
#   getItem(<Column>) indexing used previously.
# - city is extracted from the address struct BEFORE address is overwritten
#   with the flattened "street, suite, zipcode" string.
# - email is replaced by its SHA-1 hex digest.
#   NOTE(review): an unsalted SHA-1 of an email is pseudonymization rather than
#   anonymization (it can be reversed by hashing candidate addresses), and
#   firstname/lastname/address remain in clear text — confirm this meets the
#   anonymization requirement.
# - creation_date and last_activity_date are both stamped with the load time.
silver_df = (
    df.withColumn("firstname", name_parts.getItem(0))
    .withColumn(
        "lastname",
        F.when(F.size(name_parts) > 1, F.element_at(name_parts, -1)).otherwise(
            F.lit(None)
        ),
    )
    .withColumn("city", F.col("address.city"))
    .withColumn(
        "address",
        F.concat_ws(
            ", ",
            F.col("address.street"),
            F.col("address.suite"),
            F.col("address.zipcode"),
        ),
    )
    .withColumn("email", F.sha1(F.col("email")))
    .withColumn("creation_date", F.current_timestamp())
    .withColumn("last_activity_date", F.current_timestamp())
    .select(
        "id",
        "email",
        "firstname",
        "lastname",
        "address",
        "city",
        "creation_date",
        "last_activity_date",
    )
)

# Drop any previous silver table so it is recreated from silver_df's schema.
spark.sql("DROP TABLE IF EXISTS user_silver")

# Write the silver table as Delta; saveAsTable creates it, no DDL needed.
silver_df.write.format("delta").mode("overwrite").saveAsTable("user_silver")

In [0]:
# Print the column layout of the bronze table.
# NOTE(review): this cell runs after the silver load; if the intent is to check
# the silver output, read "user_silver" instead — confirm.
bronze_table = spark.read.table("user_bronze")
bronze_table.printSchema()

#3/ ADD AND JOIN TABLES IN GOLD

In [0]:
import pyspark.sql.functions as F

# Read the cleaned records from the silver table.
silver_df = spark.read.table("user_silver")

# Inclusive lower bound on creation_date for rows promoted to gold
# (interpreted as midnight in the Spark session time zone).
GOLD_MIN_CREATION_DATE = "2025-09-01"

# Drop any previous gold table; it is fully rebuilt from silver below
# (this does not create an empty table — the write below creates it).
spark.sql("DROP TABLE IF EXISTS user_gold")

# Keep only rows created on/after the cut-off. The literal is converted to a
# timestamp explicitly rather than relying on implicit string/timestamp
# coercion inside the comparison.
# NOTE(review): user_silver fills creation_date with current_timestamp() at
# load time, so this filter selects by load date, not by a real user creation
# date — confirm this is the intended "remove old records" rule.
gold_df = silver_df.filter(
    F.col("creation_date") >= F.to_timestamp(F.lit(GOLD_MIN_CREATION_DATE))
)
gold_df.write.format("delta").mode("overwrite").saveAsTable("user_gold")