In [None]:
# Display the active SparkSession to check it is available. It is used here
# without being defined, so it is presumably pre-created by the notebook runtime.
spark

## Import des bibliothèques

In [114]:
import os
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

## DATAFRAME OPERATIONS

### Utility functions

In [115]:
def load_table(table_name: str, db_name=None, raise_on_error: bool = False):
    """Load a metastore table as a Spark DataFrame.

    The table is resolved as ``<db_name>.<table_name>``. When ``db_name`` is
    falsy (``None`` or ``""``), the ``default`` database is used.

    Args:
        table_name (str): The name of the table to load.
        db_name (str, optional): The database holding the table. Defaults to
            None, meaning the ``default`` database.
        raise_on_error (bool, optional): When True, re-raise the read error
            after reporting it. Defaults to False, which keeps the best-effort
            behaviour: report the error and return None.

    Returns:
        DataFrame | None: The loaded table. Returns None if it could not be read
        and ``raise_on_error`` is False.

    Raises:
        Exception: Whatever ``spark.read.table`` raised, only when
            ``raise_on_error`` is True.
    """
    qualified_name = f"{db_name or 'default'}.{table_name}"

    try:
        # `spark` is the notebook-level SparkSession (presumably provided by the runtime).
        return spark.read.table(qualified_name)
    except Exception as e:
        print(f"Error loading data: {e}")
        # Report the fully qualified name so the user knows which database was searched.
        print(f"Please ensure the {qualified_name} table exists.")
        if raise_on_error:
            raise
        # Explicit None: callers must check for it instead of assuming a DataFrame.
        return None

### DataFrame - Cas Pratique 1 - Schéma de quelques tables

In [116]:
# posixpath rather than os.path: these are abfss URIs, so the separator must
# always be "/", whatever OS the driver runs on.
import posixpath

# Base URI of the Contoso CSV extracts in ADLS Gen2.
contoso_dataset_path = "abfss://dlsfssynwformwtwfrctrl@dlsaccsynwformwtwfrctrl.dfs.core.windows.net/synapse/workspaces/synw-formwtw-frctrl/warehouse/dataset/contoso/"

# Full URI of each source file.
dim_customer_path = posixpath.join(contoso_dataset_path, "DimCustomer.csv")
dim_product_path = posixpath.join(contoso_dataset_path, "DimProduct.csv")
dim_store_path = posixpath.join(contoso_dataset_path, "DimStore.csv")
# Despite the "dim_" prefix, this is the online-sales fact table.
dim_factSales_path = posixpath.join(contoso_dataset_path, "FactOnlineSales.csv")

In [117]:
%%pyspark

# Read the four Contoso extracts. header=True takes column names from the first
# row; inferSchema=True makes Spark scan the data to infer column types.
try:
    df_dim_customer = spark.read.csv(dim_customer_path, header=True, inferSchema=True)
    df_dim_product = spark.read.csv(dim_product_path, header=True, inferSchema=True)
    df_dim_store = spark.read.csv(dim_store_path, header=True, inferSchema=True)
    df_factSales = spark.read.csv(dim_factSales_path, header=True, inferSchema=True)
except Exception as e:
    # Best-effort: report and continue. Cells that use these DataFrames will
    # then fail with a NameError.
    print(f"Error loading data: {e}")
    print("Please ensure contoso_dataset_path is correct and the CSV files exist.")

In [118]:
# Print the inferred schema of the customer dimension, then preview 5 rows.
df_dim_customer.printSchema()
display(df_dim_customer.limit(5))

In [119]:
# Print the inferred schema of the product dimension, then preview 5 rows.
df_dim_product.printSchema()
display(df_dim_product.limit(5))

In [120]:
# Print the inferred schema of the store dimension, then preview 5 rows.
df_dim_store.printSchema()
display(df_dim_store.limit(5))

In [121]:
# Print the inferred schema of the online-sales fact table, then preview 5 rows.
df_factSales.printSchema()
display(df_factSales.limit(5))

In [122]:
# Register each DataFrame as a temporary view so it can be queried with
# spark.sql(). A view with the same name is replaced.
try:
    df_dim_customer.createOrReplaceTempView("table_customer")
    df_dim_product.createOrReplaceTempView("table_product")
    df_dim_store.createOrReplaceTempView("table_store")
    df_factSales.createOrReplaceTempView("table_fact_sales")
except Exception as view_error:
    print(f"Error creating table: {view_error}")
    print("Please ensure the dataframe is already created and available in memory.")

In [123]:
# Spark SQL queries against the temporary views registered earlier.

# Every column of the customer view, first 3 rows.
display(spark.sql("SELECT * FROM table_customer").limit(3))

# A few customer attributes, first 5 rows.
display(spark.sql("SELECT FirstName, LastName, NumberChildrenAtHome, Education FROM table_customer").limit(5))

#### Jointures

In [124]:
# Attach customer attributes to each online sale. The inner join keeps only
# sales whose CustomerKey exists in the customer dimension.
sales_with_customers = df_factSales.join(
    df_dim_customer,
    on="CustomerKey",
    how="inner",
)
display(
    sales_with_customers
    .select("OnlineSalesKey", "FirstName", "LastName", "ProductKey", "TotalCost")
    .limit(20)
)

In [125]:
# Variant: a left join keeps every sale, even when its CustomerKey has no match
# in the customer dimension (the customer columns are then null).
# This rebinds sales_with_customers, replacing the inner-join result.
sales_with_customers = df_factSales.join(
    df_dim_customer,
    how="left",
    on="CustomerKey",
)
display(sales_with_customers.limit(10))

In [126]:
# Attach product attributes to each sale to support per-product analysis
# (inner join on ProductKey).
sales_with_products = df_factSales.join(
    df_dim_product,
    on="ProductKey",
    how="inner",
)

# List the columns of the joined result.
sales_with_products.columns

In [127]:
# Attach store attributes to each sale (inner join on StoreKey), then print a
# few columns.
sales_with_store = df_factSales.join(df_dim_store, "StoreKey", "inner")

sales_with_store.select(
    "OnlineSalesKey", "StoreName", "GeographyKey", "TotalCost"
).show()

In [128]:
# Chain three joins to enrich each sale with product, customer and store
# attributes. The inner joins keep only sales matched in every dimension.
# NOTE(review): customer and store both appear to carry GeographyKey, so
# full_sales would hold two such columns. Selecting it by name would then be
# ambiguous — verify.
full_sales = (
    df_factSales
    .join(df_dim_product, on="ProductKey", how="inner")
    .join(df_dim_customer, on="CustomerKey", how="inner")
    .join(df_dim_store, on="StoreKey", how="inner")
)

full_sales.select("OnlineSalesKey", "FirstName", "ProductKey", "StoreName", "TotalCost").show()


### Cas Pratique 2 - Ingestion + Lecture des données dans l’ADLS

In [129]:
%%pyspark

# Mark the four source DataFrames for caching. cache() is lazy: each one is
# only persisted the first time an action computes it.
try:
    df_dim_customer.cache()
    df_dim_product.cache()
    df_dim_store.cache()
    df_factSales.cache()
except Exception as cache_error:
    print(f"Error caching dataframe: {cache_error}")
    print("Please ensure the dataframe exist in memory.")

In [130]:
# Re-register the temporary views so spark.sql() queries use the current
# DataFrame objects (a view with the same name is replaced).
try:
    df_dim_customer.createOrReplaceTempView("table_customer")
    df_dim_product.createOrReplaceTempView("table_product")
    df_dim_store.createOrReplaceTempView("table_store")
    df_factSales.createOrReplaceTempView("table_fact_sales")
except Exception as view_error:
    print(f"Error creating table: {view_error}")
    print("Please ensure the dataframe is already created and available in memory.")

### Cas Pratique 3 - Manipulations simples de dataframe

In [131]:
import os 

In [132]:
# Case 3: build the full path of a single CSV file from a base directory.
# NOTE(review): this reassigns contoso_dataset_path to a different storage
# account (synapsestorageskw) from the one used in Cas Pratique 1. When the
# notebook runs in order, later cells that build paths from
# contoso_dataset_path (DimDate / DimGeography in Cas Pratique 8) read from this
# account, while the other Case 8 files come from the original one. Confirm
# this is intended.
# NOTE(review): os.path.join on a URI relies on the driver OS using "/".
contoso_dataset_path = "abfss://synapsestorage-fs-skw@synapsestorageskw.dfs.core.windows.net/contoso_dataset/"

filename = "DimCustomer.csv"

file_path = os.path.join(contoso_dataset_path, filename)

print(f"file_path {file_path}")

In [133]:
%%pyspark

# Re-read the customer CSV and preview 10 rows.
# NOTE(review): this reads dim_customer_path (the Cas Pratique 1 location), not
# the file_path built in the previous cell. Confirm which one is intended.
# It also rebinds df_dim_customer to a new DataFrame, which is not the object
# cache() was called on earlier.
df_dim_customer = spark.read.csv(dim_customer_path, header=True, inferSchema=True)
display(df_dim_customer.limit(10))

### Cas Pratique 4 - Utilisation de Spark SQL

In [144]:
# Case 4, step 1: enrich each sale with product attributes (inner join on
# ProductKey), then expose the result to Spark SQL as the temporary view
# "sales_view".
sales_with_products = df_factSales.join(
    df_dim_product, on="ProductKey", how="inner"
)
sales_with_products.createOrReplaceTempView("sales_view")

##### Faire du caching, au besoin
- df_dim_customer.cache()
- df_dim_product.cache()
- df_dim_store.cache()
- df_factSales.cache()

In [135]:
# Column names of the sales/product join behind "sales_view".
sales_with_products.columns

##### Interroger la vue temporaire avec Spark SQL

In [142]:
%%sql
-- Revenue per product from the enriched sales view.
-- Revenue is measured with SalesAmount, the amount this notebook's KPI section
-- uses for revenue ("chiffre d'affaires"). TotalCost is, by its name, a cost,
-- not a sales amount.
SELECT 
    ProductName, 
    SUM(SalesAmount) AS total_revenue
FROM 
    sales_view
GROUP BY 
    ProductName

In [143]:
# Run the per-product revenue aggregation through the Python API and keep the
# result as a DataFrame for the next steps.
# Revenue is measured with SalesAmount, the amount this notebook's KPI section
# uses for revenue ("chiffre d'affaires"). TotalCost is, by its name, a cost,
# not a sales amount.
df_top_products = spark.sql("""
    SELECT 
        ProductName, 
        SUM(SalesAmount) AS total_revenue
    FROM 
        sales_view
    GROUP BY 
        ProductName
""")

In [138]:
# Keep the 10 products with the highest total_revenue and display them.
df_top_10_products = df_top_products.sort("total_revenue", ascending=False).limit(10)
display(df_top_10_products)

### Cas Pratique 5 - Spécification de schéma

In [139]:
# Explicit schema for the customer columns used in this exercise. Every field
# is declared nullable.

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

customer_schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ("CustomerKey", IntegerType()),
        ("FirstName", StringType()),
        ("LastName", StringType()),
        ("YearlyIncome", DoubleType()),
        ("TotalChildren", IntegerType()),
    )
])

In [141]:
# Read the customer CSV and enforce the explicit customer_schema.
#
# Spark applies a user-supplied CSV schema by column POSITION, not by header
# name. Judging by the joined column list selected later in this notebook, the
# customer file has more columns than customer_schema (an index column "_c0"
# and GeographyKey come before FirstName). Passing schema=customer_schema to
# the reader would therefore silently shift values into the wrong fields and
# null out the numeric ones.
# Instead, read every column as a string by header name, then select and cast
# exactly the fields declared in the schema.
from pyspark.sql.functions import col

df_dim_customer_custom = spark.read.csv(
    path=dim_customer_path,
    header=True,
    sep=",",
    inferSchema=False,
).select(
    [col(field.name).cast(field.dataType) for field in customer_schema.fields]
)

In [145]:
# Print the schema and the first 5 rows of the explicitly typed customer DataFrame.

df_dim_customer_custom.printSchema()
df_dim_customer_custom.show(5)

### Cas Pratique 6 - Utilisation des jointures

In [146]:
# 1. Customers together with their purchases. The inner join keeps only
# customers that appear in the sales fact table.
clients_avec_achats = df_dim_customer.join(df_factSales, on="CustomerKey", how="inner")

# Show the requested columns for the first 10 matches.
clients_avec_achats.select("FirstName", "LastName", "SalesOrderNumber", "SalesAmount").show(10)

In [147]:
# 2. Customers with no purchase at all. A left anti join keeps only the
# customer rows with no matching CustomerKey in the sales fact table, and only
# the customer columns.
clients_sans_achat = df_dim_customer.join(df_factSales, on="CustomerKey", how="left_anti")

clients_sans_achat.select("FirstName", "LastName").show(10)

In [148]:
# 3. Same as 1., enriched with the purchased product's name:
# Customer -> OnlineSales (on CustomerKey) -> Product (on ProductKey).
clients_achats_produits = (
    df_dim_customer
    .join(df_factSales, on="CustomerKey", how="inner")
    .join(df_dim_product, on="ProductKey", how="inner")
)

clients_achats_produits.select(
    "FirstName",
    "LastName",
    "SalesOrderNumber",
    "SalesAmount",
    "ProductName",
).show(10)

### Cas Pratique 7 - Cas pratique complet

##### Sauvegarde des résultats au format Parquet

In [149]:
# List the columns produced by the customer/sales/product join.
clients_achats_produits.columns

In [150]:
# Keep the join keys plus the customer attributes.
# NOTE(review): if the sales or product CSV also has an index column "_c0",
# the "_c0" reference below is ambiguous — verify.
new_df = clients_achats_produits.select(
    "ProductKey", "CustomerKey", "_c0", "GeographyKey",
    "FirstName", "LastName", "BirthDate", "MaritalStatus",
    "Gender", "YearlyIncome", "TotalChildren", "NumberChildrenAtHome", "Education",
)

new_df.printSchema()

new_df.show(5)

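##### Modes d'enregistrement des DataFrames au format Parquet :
- `overwrite` : remplace les données existantes ;
- `append` : ajoute aux données existantes ;
- `ignore` : n'écrit rien si des données existent déjà ;
- `error` (alias `errorifexists`, mode par défaut) : échoue si des données existent déjà.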


In [151]:
output_dataset_path = "abfss://dlsfssynwformwtwfrctrl@dlsaccsynwformwtwfrctrl.dfs.core.windows.net/synapse/workspaces/synw-formwtw-frctrl/warehouse/dataset/out_contoso/"

# Persist new_df as Parquet. mode="overwrite" replaces any previous output in
# that folder.
# NOTE(review): the folder name suggests revenue per product, but new_df holds
# customer attributes. Confirm the intended content.
new_df.write.parquet(f"{output_dataset_path}CA_par_produits", mode="overwrite")

### Cas Pratique 8 - Cas pratique complet

##### 1. Chargement des fichiers CSV

In [152]:
# Cas Pratique 8: load the four Contoso extracts again, into the variable names
# used by the rest of this case study. header=True takes column names from the
# first row; inferSchema=True makes Spark scan the data to infer column types.

try:
    dim_customer_df = spark.read.csv(dim_customer_path, header=True, inferSchema=True)
    dim_product_df = spark.read.csv(dim_product_path, header=True, inferSchema=True)
    dim_store_df = spark.read.csv(dim_store_path, header=True, inferSchema=True)
    fact_sales_df = spark.read.csv(dim_factSales_path, header=True, inferSchema=True)
except Exception as e:
    # Best-effort: report and continue. Later cells will fail on the missing names.
    print(f"Error loading data: {e}")
    print("Please ensure contoso_dataset_path is correct and the CSV files exist.")

In [158]:
# Load the additional dimensions the KPIs need: dates and geography.
# NOTE(review): Cas Pratique 3 reassigns contoso_dataset_path to a different
# storage account. When cells run in order, these two files are read from that
# account, while the other Case 8 files come from the original one. Confirm
# this is intended.

# posixpath rather than os.path: these are URIs, so the separator must be "/".
import posixpath

dim_date_path = posixpath.join(contoso_dataset_path, "DimDate.csv")
dim_geo_path = posixpath.join(contoso_dataset_path, "DimGeography.csv")

try:
    dim_date_df = spark.read.csv(dim_date_path, header=True, inferSchema=True)
    dim_geo_df = spark.read.csv(dim_geo_path, header=True, inferSchema=True)
except Exception as e:
    # Best-effort: report and continue. Later cells will fail on the missing names.
    print(f"Error loading data: {e}")
    print("Please ensure contoso_dataset_path is correct and the CSV files exist.")

##### 2. Nettoyage et préparation

In [154]:
# Inspect the fact table's columns before cleaning.
fact_sales_df.columns

In [None]:
# Clean and prepare the fact and date data for the KPI computations.

from pyspark.sql.functions import to_date, col, concat_ws, lpad

# Make amounts numeric BEFORE filtering. Values that cannot be cast to double
# become null and are dropped together with genuine nulls.
fact_sales_df = fact_sales_df.withColumn("SalesAmount", col("SalesAmount").cast("double"))

# Drop fact rows missing a field the joins and KPIs rely on.
fact_sales_df = fact_sales_df.dropna(subset=["ProductKey", "DateKey", "SalesAmount"])

# Calendar date for each DateKey. Casting to string first lets to_date() accept
# DateKey whether inferSchema read it as a DATE, a TIMESTAMP
# ("yyyy-MM-dd HH:mm:ss") or a "yyyy-M-d" string. A fixed "yyyy-M-d" pattern
# would not parse the timestamp form.
dim_date_df = dim_date_df.withColumn(
    "FullDate", to_date(col("DateKey").cast("string"))
)

# "YYYY-MM" month label (e.g. 2024-06). The month is zero-padded so the labels
# sort chronologically as plain strings; KPI 3 orders by this column.
# Assumes Month holds the month number (1-12), as the example implies.
dim_date_df = dim_date_df.withColumn(
    "YearMonth", concat_ws("-", col("Year"), lpad(col("Month").cast("string"), 2, "0"))
)

display(dim_date_df.limit(10))

##### 3. Jointure — Création de la vue enrichie des ventes

In [None]:

# Enriched sales: each fact row with its calendar date and month, the
# customer's geography and country, the product and the store.
# The date join is inner, so sales whose DateKey is absent from DimDate are
# dropped. The other joins are left joins, so sales are kept even when a
# dimension row is missing.
# NOTE(review): confirm dim_product_df really has a ProductCategoryName column.
sales_enriched_df = (
    fact_sales_df
    .join(dim_date_df.select("DateKey", "FullDate", "YearMonth"), on="DateKey", how="inner")
    .join(dim_customer_df.select("CustomerKey", "GeographyKey"), on="CustomerKey", how="left")
    .join(dim_geo_df.select("GeographyKey", "RegionCountryName"), on="GeographyKey", how="left")
    .join(dim_product_df.select("ProductKey", "ProductName", "ProductCategoryName"), on="ProductKey", how="left")
    .join(dim_store_df.select("StoreKey", "StoreName", "StoreType"), on="StoreKey", how="left")
)

##### 4. KPI à calculer

In [None]:
# KPI 1 – Monthly revenue per product and region.
# An explicit alias replaces renaming the auto-generated "sum(SalesAmount)"
# column: withColumnRenamed silently does nothing if that generated name differs.
from pyspark.sql.functions import sum as _sum

monthly_revenue = (
    sales_enriched_df
    .groupBy("YearMonth", "ProductName", "RegionCountryName")
    .agg(_sum("SalesAmount").alias("MonthlyRevenue"))
)

In [None]:
# KPI 2 – Products ranked by sales value, with units sold alongside.

from pyspark.sql.functions import sum as _sum

top_products = (
    sales_enriched_df
    .groupBy("ProductName")
    .agg(
        _sum("SalesAmount").alias("TotalSales"),
        _sum("SalesQuantity").alias("TotalUnits"),
    )
    .orderBy(col("TotalSales").desc())
)

# Only the first 10 rows are printed. top_products itself keeps every product.
top_products.show(10)

In [None]:
# KPI 3 – Month-over-month sales growth rate, in percent.

from pyspark.sql.window import Window
from pyspark.sql.functions import col, lag, split
# Aliased so Python's built-in round() is not shadowed for later cells.
from pyspark.sql.functions import round as spark_round
from pyspark.sql.functions import sum as _sum

monthly_sales = sales_enriched_df.groupBy("YearMonth").agg(_sum("SalesAmount").alias("TotalSales"))

# Order months chronologically by their numeric year and month parts.
# Sorting the "YYYY-M" string directly would put "2024-10" before "2024-2".
# There is no partitionBy: one global ordering over one row per month.
year_month_parts = split(col("YearMonth"), "-")
w = Window.orderBy(
    year_month_parts.getItem(0).cast("int"),
    year_month_parts.getItem(1).cast("int"),
)

# The first month has no predecessor, so its PreviousMonthSales and GrowthRate are null.
monthly_growth = (
    monthly_sales
    .withColumn("PreviousMonthSales", lag("TotalSales").over(w))
    .withColumn(
        "GrowthRate",
        spark_round(
            (col("TotalSales") - col("PreviousMonthSales")) / col("PreviousMonthSales") * 100,
            2,
        ),
    )
)

In [None]:
# KPI 4 – Sales broken down by StoreType (intended as physical store vs online
# channel), largest first.

channel_split = (
    sales_enriched_df
    .groupBy("StoreType")
    .agg(_sum("SalesAmount").alias("TotalSales"))
    .orderBy("TotalSales", ascending=False)
)


In [None]:
# KPI 5 – RFM analysis (Recency, Frequency, Monetary) per customer.

# Import every function used below. max and sum are aliased so Python's
# built-in max() and sum() are not shadowed for later cells.
from pyspark.sql.functions import countDistinct, datediff, lit
from pyspark.sql.functions import max as spark_max
from pyspark.sql.functions import sum as _sum

# Reference date for Recency: the most recent purchase date in the whole dataset.
# If FullDate is null everywhere this is None, and Recency is then null.
latest_date = sales_enriched_df.agg(spark_max("FullDate")).collect()[0][0]

rfm = (
    sales_enriched_df
    .groupBy("CustomerKey")
    .agg(
        # Recency: days between the reference date and the customer's last purchase.
        datediff(lit(latest_date), spark_max("FullDate")).alias("Recency"),
        # Frequency: number of distinct orders.
        countDistinct("SalesOrderNumber").alias("Frequency"),
        # Monetary: total amount spent.
        _sum("SalesAmount").alias("Monetary"),
    )
)


In [None]:
# Demo Python UDF returning a random integer between 0 and 99 inclusive
# (int(random.random() * 100)).
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType, UserDefinedType
import random

# asNondeterministic() marks the UDF as nondeterministic, so Spark's optimizer
# does not treat repeated calls as interchangeable.
random_udf = udf(lambda: int(random.random() * 100), IntegerType()).asNondeterministic()

Have fun with PySpark !