In [1]:
#intro
from pyspark import SparkContext

sc = SparkContext("local", "Exemple RDD")
data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)
rdd_squared = rdd.map(lambda x: x ** 2)
print(rdd_squared.collect())  # Résultat : [1, 4, 9, 16, 25]

# Arrêter proprement SparkContext
sc.stop()

[1, 4, 9, 16, 25]


In [2]:
#ex1

from pyspark import SparkContext

# Création du SparkContext
sc = SparkContext("local", "Exercice 1")

# Création d'un RDD à partir d'une liste de nombres
numbers = range(1, 11)  # [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
rdd = sc.parallelize(numbers)

# Filtrer les nombres pairs
even_numbers = rdd.filter(lambda x: x % 2 == 0)

# Calculer le carré de chaque nombre filtré
squared_numbers = even_numbers.map(lambda x: x ** 2)

# Calculer la somme des carrés
sum_squared = squared_numbers.reduce(lambda x, y: x + y)

# Afficher les résultats
print("Nombres pairs :", even_numbers.collect())
print("Carrés des nombres pairs :", squared_numbers.collect())
print("Somme des carrés :", sum_squared)

# Arrêter le SparkContext
sc.stop()

Nombres pairs : [2, 4, 6, 8, 10]
Carrés des nombres pairs : [4, 16, 36, 64, 100]
Somme des carrés : 220


In [3]:
#ex2

from pyspark.sql import SparkSession

# Création de la SparkSession
spark = SparkSession.builder.appName("Exercice 2").getOrCreate()

# Création des données
employee_data = [
    ("Alice", "IT", 60000),
    ("Bob", "HR", 45000),
    ("Charlie", "IT", 75000),
    ("David", "Marketing", 48000),
    ("Eve", "IT", 55000)
]

# Création du DataFrame
df = spark.createDataFrame(employee_data, ["nom", "departement", "salaire"])

# 1. Afficher le schéma du DataFrame
print("Schéma du DataFrame :")
df.printSchema()

# 2. Filtrer les employés avec un salaire > 50000
print("\nEmployés avec un salaire > 50000 :")
df.filter(df.salaire > 50000).show()

# 3. Calculer le salaire moyen par département
print("\nSalaire moyen par département :")
df.groupBy("departement").avg("salaire").show()

# Arrêter la SparkSession
spark.stop()

Schéma du DataFrame :
root
 |-- nom: string (nullable = true)
 |-- departement: string (nullable = true)
 |-- salaire: long (nullable = true)


Employés avec un salaire > 50000 :
+-------+-----------+-------+
|    nom|departement|salaire|
+-------+-----------+-------+
|  Alice|         IT|  60000|
|Charlie|         IT|  75000|
|    Eve|         IT|  55000|
+-------+-----------+-------+


Salaire moyen par département :
+-----------+------------------+
|departement|      avg(salaire)|
+-----------+------------------+
|         IT|63333.333333333336|
|         HR|           45000.0|
|  Marketing|           48000.0|
+-----------+------------------+



In [4]:
#ex3
# 

from pyspark.sql import SparkSession

# Création de la SparkSession
spark = SparkSession.builder.appName("Exercice 3").getOrCreate()

# Création des données
products_data = [
    (1, "Laptop", "Electronics", 999.99),
    (2, "Smartphone", "Electronics", 699.99),
    (3, "T-shirt", "Clothing", 19.99),
    (4, "Jeans", "Clothing", 49.99),
    (5, "Headphones", "Electronics", 149.99),
    (6, "Book", "Books", 14.99),
    (7, "Tablet", "Electronics", 299.99)
]

# Création du DataFrame
df = spark.createDataFrame(products_data, ["id", "nom", "categorie", "prix"])

# Au lieu d'écrire en CSV, travaillons directement avec le DataFrame
print("Les 5 premiers produits :")
df.show(5)

print("\nProduits Electronics :")
df.filter(df.categorie == "Electronics").show()

print("\nPrix moyen par catégorie :")
df.groupBy("categorie").avg("prix").show()

# Arrêter la SparkSession
spark.stop()

Les 5 premiers produits :
+---+----------+-----------+------+
| id|       nom|  categorie|  prix|
+---+----------+-----------+------+
|  1|    Laptop|Electronics|999.99|
|  2|Smartphone|Electronics|699.99|
|  3|   T-shirt|   Clothing| 19.99|
|  4|     Jeans|   Clothing| 49.99|
|  5|Headphones|Electronics|149.99|
+---+----------+-----------+------+
only showing top 5 rows


Produits Electronics :
+---+----------+-----------+------+
| id|       nom|  categorie|  prix|
+---+----------+-----------+------+
|  1|    Laptop|Electronics|999.99|
|  2|Smartphone|Electronics|699.99|
|  5|Headphones|Electronics|149.99|
|  7|    Tablet|Electronics|299.99|
+---+----------+-----------+------+


Prix moyen par catégorie :
+-----------+---------+
|  categorie|avg(prix)|
+-----------+---------+
|Electronics|   537.49|
|   Clothing|    34.99|
|      Books|    14.99|
+-----------+---------+



In [6]:
#ex3 après hadoop
# 
from pyspark.sql import SparkSession

# Création de la SparkSession
spark = SparkSession.builder.appName("Exercice 3").getOrCreate()

# Création des données
products_data = [
    (1, "Laptop", "Electronics", 999.99),
    (2, "Smartphone", "Electronics", 699.99),
    (3, "T-shirt", "Clothing", 19.99),
    (4, "Jeans", "Clothing", 49.99),
    (5, "Headphones", "Electronics", 149.99),
    (6, "Book", "Books", 14.99),
    (7, "Tablet", "Electronics", 299.99)
]

# Création du DataFrame
df = spark.createDataFrame(products_data, ["id", "nom", "categorie", "prix"])

# 1. Sauvegarde en CSV avec chemin complet et mode overwrite
print("Sauvegarde du fichier CSV...")
df.write.mode("overwrite").csv("C:/EXERCICES/pyspark/PySpark/products.csv", header=True)

# 2. Lecture du fichier CSV
print("\nLecture du fichier CSV...")
products_df = spark.read.csv("C:/EXERCICES/pyspark/PySpark/products.csv", header=True, inferSchema=True)

# 3. Afficher les 5 premières lignes
print("\nLes 5 premiers produits :")
products_df.show(5)

# 4. Filtrer les produits Electronics
print("\nProduits Electronics :")
products_df.filter(products_df.categorie == "Electronics").show()

# 5. Calculer le prix moyen par catégorie
print("\nPrix moyen par catégorie :")
products_df.groupBy("categorie").avg("prix").show()

# Arrêter la SparkSession
spark.stop()

Sauvegarde du fichier CSV...

Lecture du fichier CSV...

Les 5 premiers produits :
+---+----------+-----------+------+
| id|       nom|  categorie|  prix|
+---+----------+-----------+------+
|  2|Smartphone|Electronics|699.99|
|  5|Headphones|Electronics|149.99|
|  1|    Laptop|Electronics|999.99|
|  7|    Tablet|Electronics|299.99|
|  3|   T-shirt|   Clothing| 19.99|
+---+----------+-----------+------+
only showing top 5 rows


Produits Electronics :
+---+----------+-----------+------+
| id|       nom|  categorie|  prix|
+---+----------+-----------+------+
|  2|Smartphone|Electronics|699.99|
|  5|Headphones|Electronics|149.99|
|  1|    Laptop|Electronics|999.99|
|  7|    Tablet|Electronics|299.99|
+---+----------+-----------+------+


Prix moyen par catégorie :
+-----------+---------+
|  categorie|avg(prix)|
+-----------+---------+
|Electronics|   537.49|
|   Clothing|    34.99|
|      Books|    14.99|
+-----------+---------+



In [7]:
#ex4

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Création de la SparkSession
spark = SparkSession.builder.appName("Exercice 4").getOrCreate()

# Utiliser le DataFrame de produits de l'exercice précédent
products_df = spark.read.csv("C:/EXERCICES/pyspark/PySpark/products.csv", header=True, inferSchema=True)

# Définir une fonction pour classifier les prix
def classify_price(price):
    if price >= 500:
        return "Premium"
    elif price >= 100:
        return "Standard"
    else:
        return "Budget"

# Créer une UDF (User Defined Function)
price_classifier_udf = udf(classify_price, StringType())

# Appliquer l'UDF pour ajouter une nouvelle colonne
products_with_category = products_df.withColumn(
    "gamme_prix", 
    price_classifier_udf(products_df.prix)
)

# Afficher le résultat
print("Produits avec classification de prix :")
products_with_category.show()

# Analyser la distribution des produits par gamme de prix
print("\nNombre de produits par gamme de prix :")
products_with_category.groupBy("gamme_prix").count().show()

# Arrêter la SparkSession
spark.stop()

Produits avec classification de prix :
+---+----------+-----------+------+----------+
| id|       nom|  categorie|  prix|gamme_prix|
+---+----------+-----------+------+----------+
|  2|Smartphone|Electronics|699.99|   Premium|
|  5|Headphones|Electronics|149.99|  Standard|
|  1|    Laptop|Electronics|999.99|   Premium|
|  7|    Tablet|Electronics|299.99|  Standard|
|  3|   T-shirt|   Clothing| 19.99|    Budget|
|  4|     Jeans|   Clothing| 49.99|    Budget|
|  6|      Book|      Books| 14.99|    Budget|
+---+----------+-----------+------+----------+


Nombre de produits par gamme de prix :
+----------+-----+
|gamme_prix|count|
+----------+-----+
|   Premium|    2|
|  Standard|    2|
|    Budget|    3|
+----------+-----+



In [8]:
#ex5

from pyspark.sql import SparkSession

# Création de la SparkSession
spark = SparkSession.builder.appName("Exercice 5").getOrCreate()

# Création des données pour les commandes
orders_data = [
    (1, 101, 999.99, "2024-01-15"),
    (2, 102, 149.99, "2024-01-16"),
    (3, 101, 49.99, "2024-01-17"),
    (4, 103, 699.99, "2024-01-18"),
    (5, 102, 299.99, "2024-01-19")
]

# Création des données pour les clients
customers_data = [
    (101, "Alice Martin", "France"),
    (102, "Bob Smith", "USA"),
    (103, "Charlie Brown", "UK"),
    (104, "David Wilson", "Canada")
]

# Création des DataFrames
orders_df = spark.createDataFrame(orders_data, ["order_id", "customer_id", "amount", "date"])
customers_df = spark.createDataFrame(customers_data, ["customer_id", "name", "country"])

# Jointure des DataFrames
orders_with_customers = orders_df.join(
    customers_df,
    orders_df.customer_id == customers_df.customer_id,
    "left"
)

# Afficher les résultats
print("Commandes avec informations clients :")
orders_with_customers.show()

# Calculer le montant total des commandes par pays
print("\nMontant total des commandes par pays :")
orders_with_customers.groupBy("country") \
    .sum("amount") \
    .orderBy("country") \
    .show()

# Trouver le client avec le montant total le plus élevé
print("\nTop clients par montant total des commandes :")
orders_with_customers.groupBy("name", "country") \
    .sum("amount") \
    .orderBy("sum(amount)", ascending=False) \
    .show()

# Arrêter la SparkSession
spark.stop()

Commandes avec informations clients :
+--------+-----------+------+----------+-----------+-------------+-------+
|order_id|customer_id|amount|      date|customer_id|         name|country|
+--------+-----------+------+----------+-----------+-------------+-------+
|       1|        101|999.99|2024-01-15|        101| Alice Martin| France|
|       2|        102|149.99|2024-01-16|        102|    Bob Smith|    USA|
|       3|        101| 49.99|2024-01-17|        101| Alice Martin| France|
|       4|        103|699.99|2024-01-18|        103|Charlie Brown|     UK|
|       5|        102|299.99|2024-01-19|        102|    Bob Smith|    USA|
+--------+-----------+------+----------+-----------+-------------+-------+


Montant total des commandes par pays :
+-------+-----------+
|country|sum(amount)|
+-------+-----------+
| France|    1049.98|
|     UK|     699.99|
|    USA|     449.98|
+-------+-----------+


Top clients par montant total des commandes :
+-------------+-------+-----------+
|       

In [10]:
#ex 5 optimisé car lent 

from pyspark.sql import SparkSession

# Créer une nouvelle SparkSession
spark = SparkSession.builder.appName("Exercice 5").getOrCreate()

# Création des données
orders_data = [
    (1, 101, 999.99, "2024-01-15"),
    (2, 102, 149.99, "2024-01-16"),
    (3, 101, 49.99, "2024-01-17"),
    (4, 103, 699.99, "2024-01-18"),
    (5, 102, 299.99, "2024-01-19")
]

customers_data = [
    (101, "Alice Martin", "France"),
    (102, "Bob Smith", "USA"),
    (103, "Charlie Brown", "UK"),
    (104, "David Wilson", "Canada")
]

# Création des DataFrames avec cache()
orders_df = spark.createDataFrame(orders_data, ["order_id", "customer_id", "amount", "date"]).cache()
customers_df = spark.createDataFrame(customers_data, ["customer_id", "name", "country"]).cache()

# Jointure avec optimisation
orders_with_customers = orders_df.join(
    customers_df,
    "customer_id",
    "left"
).cache()

# Afficher les résultats
print("Commandes avec informations clients :")
orders_with_customers.show()

print("\nMontant total des commandes par pays :")
orders_with_customers.groupBy("country").sum("amount").show()

print("\nTop clients par montant total :")
orders_with_customers.groupBy("name", "country").sum("amount").orderBy("sum(amount)", ascending=False).show()

# Ne pas arrêter la SparkSession cette fois-ci pour pouvoir continuer avec d'autres exercices

# Ne pas arrêter la SparkSession ici si vous voulez continuer avec d'autres exercices

Commandes avec informations clients :
+-----------+--------+------+----------+-------------+-------+
|customer_id|order_id|amount|      date|         name|country|
+-----------+--------+------+----------+-------------+-------+
|        103|       4|699.99|2024-01-18|Charlie Brown|     UK|
|        101|       1|999.99|2024-01-15| Alice Martin| France|
|        101|       3| 49.99|2024-01-17| Alice Martin| France|
|        102|       2|149.99|2024-01-16|    Bob Smith|    USA|
|        102|       5|299.99|2024-01-19|    Bob Smith|    USA|
+-----------+--------+------+----------+-------------+-------+


Montant total des commandes par pays :
+-------+-----------+
|country|sum(amount)|
+-------+-----------+
|     UK|     699.99|
| France|    1049.98|
|    USA|     449.98|
+-------+-----------+


Top clients par montant total :
+-------------+-------+-----------+
|         name|country|sum(amount)|
+-------------+-------+-----------+
| Alice Martin| France|    1049.98|
|Charlie Brown|     UK

In [11]:
#ex6
# 
# Utilisons les données des commandes de l'exercice précédent et ajoutons plus de données

# Création de nouvelles commandes avec plus de données
more_orders_data = [
    (6, 101, 1299.99, "2024-01-20", "Electronics"),
    (7, 102, 79.99, "2024-01-20", "Books"),
    (8, 103, 549.99, "2024-01-21", "Electronics"),
    (9, 101, 29.99, "2024-01-21", "Clothing"),
    (10, 104, 899.99, "2024-01-22", "Electronics")
]

# Créer un nouveau DataFrame avec les commandes étendues
extended_orders_df = spark.createDataFrame(
    more_orders_data, 
    ["order_id", "customer_id", "amount", "date", "category"]
).cache()

# Joindre avec les informations clients
orders_complete = extended_orders_df.join(
    customers_df,
    "customer_id",
    "left"
).cache()

# 1. Montant total des commandes par pays et catégorie
print("Montant total par pays et catégorie :")
orders_complete.groupBy("country", "category") \
    .agg({"amount": "sum"}) \
    .orderBy("country", "category") \
    .show()

# 2. Nombre de commandes par pays
print("\nNombre de commandes par pays :")
orders_complete.groupBy("country") \
    .count() \
    .orderBy("count", ascending=False) \
    .show()

# 3. Moyenne des commandes par catégorie
print("\nMoyenne des commandes par catégorie :")
orders_complete.groupBy("category") \
    .agg({"amount": "avg"}) \
    .show()

Montant total par pays et catégorie :
+-------+-----------+-----------+
|country|   category|sum(amount)|
+-------+-----------+-----------+
| Canada|Electronics|     899.99|
| France|   Clothing|      29.99|
| France|Electronics|    1299.99|
|     UK|Electronics|     549.99|
|    USA|      Books|      79.99|
+-------+-----------+-----------+


Nombre de commandes par pays :
+-------+-----+
|country|count|
+-------+-----+
| France|    2|
|    USA|    1|
|     UK|    1|
| Canada|    1|
+-------+-----+


Moyenne des commandes par catégorie :
+-----------+-----------------+
|   category|      avg(amount)|
+-----------+-----------------+
|Electronics|916.6566666666668|
|      Books|            79.99|
|   Clothing|            29.99|
+-----------+-----------------+



In [12]:
#ex7

# Créer un DataFrame avec des valeurs manquantes
data_with_nulls = [
    (1, "Laptop", "Electronics", 999.99, 4.5),
    (2, "Smartphone", None, 699.99, None),
    (3, "T-shirt", "Clothing", None, 4.0),
    (4, "Jeans", "Clothing", 49.99, 4.2),
    (5, None, "Electronics", 149.99, 3.8),
    (6, "Book", None, 14.99, None)
]

# Créer le DataFrame
df_nulls = spark.createDataFrame(
    data_with_nulls, 
    ["id", "product", "category", "price", "rating"]
).cache()

print("DataFrame original avec valeurs manquantes :")
df_nulls.show()

# 1. Compter les valeurs manquantes par colonne
from pyspark.sql.functions import col, count, when, isnan

print("\nNombre de valeurs manquantes par colonne :")
for column in df_nulls.columns:
    null_count = df_nulls.filter(
        col(column).isNull() | isnan(col(column))
    ).count()
    print(f"{column}: {null_count} valeurs manquantes")

# 2. Remplacer les valeurs manquantes
from pyspark.sql.functions import avg, coalesce, lit

# Calculer le prix moyen
avg_price = df_nulls.select(avg("price")).collect()[0][0]

# Remplacer les valeurs manquantes
df_cleaned = df_nulls \
    .fillna("Unknown", subset=["product"]) \
    .fillna("Other", subset=["category"]) \
    .fillna(avg_price, subset=["price"]) \
    .fillna(0.0, subset=["rating"])

print("\nDataFrame après traitement des valeurs manquantes :")
df_cleaned.show()

# 3. Statistiques après nettoyage
print("\nStatistiques par catégorie après nettoyage :")
df_cleaned.groupBy("category") \
    .agg(
        {"price": "avg", "rating": "avg", "product": "count"}
    ) \
    .show()


DataFrame original avec valeurs manquantes :
+---+----------+-----------+------+------+
| id|   product|   category| price|rating|
+---+----------+-----------+------+------+
|  1|    Laptop|Electronics|999.99|   4.5|
|  2|Smartphone|       NULL|699.99|  NULL|
|  3|   T-shirt|   Clothing|  NULL|   4.0|
|  4|     Jeans|   Clothing| 49.99|   4.2|
|  5|      NULL|Electronics|149.99|   3.8|
|  6|      Book|       NULL| 14.99|  NULL|
+---+----------+-----------+------+------+


Nombre de valeurs manquantes par colonne :
id: 0 valeurs manquantes
product: 1 valeurs manquantes
category: 2 valeurs manquantes
price: 1 valeurs manquantes
rating: 2 valeurs manquantes

DataFrame après traitement des valeurs manquantes :
+---+----------+-----------+------+------+
| id|   product|   category| price|rating|
+---+----------+-----------+------+------+
|  1|    Laptop|Electronics|999.99|   4.5|
|  2|Smartphone|      Other|699.99|   0.0|
|  3|   T-shirt|   Clothing|382.99|   4.0|
|  4|     Jeans|   Clothin

In [13]:
#ex8

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Créer un plus grand jeu de données pour démontrer le partitionnement
large_data = [(i, f"product_{i}", i % 5 * 100.0) for i in range(1000)]

# Créer le DataFrame
large_df = spark.createDataFrame(large_data, ["id", "name", "price"])

# Vérifier le nombre de partitions par défaut
print("Nombre de partitions par défaut :", large_df.rdd.getNumPartitions())

# Repartitionner le DataFrame
df_repartitioned = large_df.repartition(4)
print("Nombre de partitions après repartition :", df_repartitioned.rdd.getNumPartitions())

# Comparer les performances
def show_execution_time(df, description):
    import time
    start_time = time.time()
    # Forcer l'exécution avec count()
    count = df.count()
    end_time = time.time()
    print(f"{description}: {end_time - start_time:.2f} secondes")

# Test avec différentes configurations
print("\nTest de performance avec différentes partitions:")
show_execution_time(large_df, "DataFrame original")
show_execution_time(df_repartitioned, "DataFrame repartitionné")

# Démonstration de partitionnement par colonne
df_partitioned = large_df.repartition("price")
print("\nExemple de données par partition (prix):")
df_partitioned.show(5)

Nombre de partitions par défaut : 20
Nombre de partitions après repartition : 4

Test de performance avec différentes partitions:
DataFrame original: 20.03 secondes
DataFrame repartitionné: 20.29 secondes

Exemple de données par partition (prix):
+---+----------+-----+
| id|      name|price|
+---+----------+-----+
|  0| product_0|  0.0|
|  5| product_5|  0.0|
| 10|product_10|  0.0|
| 15|product_15|  0.0|
| 20|product_20|  0.0|
+---+----------+-----+
only showing top 5 rows



In [14]:
#ex9

# Créer des données pour les produits et les ventes
products_data = [
    (1, "Laptop", "Electronics", 999.99),
    (2, "Smartphone", "Electronics", 699.99),
    (3, "T-shirt", "Clothing", 19.99),
    (4, "Jeans", "Clothing", 49.99),
    (5, "Headphones", "Electronics", 149.99)
]

sales_data = [
    (1, 1, "2024-01-01", 2),
    (2, 2, "2024-01-01", 1),
    (3, 1, "2024-01-02", 1),
    (4, 3, "2024-01-02", 3),
    (5, 2, "2024-01-03", 2)
]

# Créer les DataFrames
products_df = spark.createDataFrame(products_data, ["product_id", "name", "category", "price"])
sales_df = spark.createDataFrame(sales_data, ["sale_id", "product_id", "date", "quantity"])

# Créer des vues temporaires pour utiliser SQL
products_df.createOrReplaceTempView("products")
sales_df.createOrReplaceTempView("sales")

# 1. Requête simple
print("Liste des produits avec leur prix:")
spark.sql("""
    SELECT name, price
    FROM products
    ORDER BY price DESC
""").show()

# 2. Jointure avec agrégation
print("\nTotal des ventes par catégorie:")
spark.sql("""
    SELECT 
        p.category,
        SUM(p.price * s.quantity) as total_sales,
        COUNT(s.sale_id) as number_of_sales
    FROM products p
    JOIN sales s ON p.product_id = s.product_id
    GROUP BY p.category
    ORDER BY total_sales DESC
""").show()

# 3. Analyse plus complexe
print("\nProduits les plus vendus:")
spark.sql("""
    WITH product_sales AS (
        SELECT 
            p.name,
            p.category,
            SUM(s.quantity) as total_quantity,
            SUM(p.price * s.quantity) as total_revenue
        FROM products p
        JOIN sales s ON p.product_id = s.product_id
        GROUP BY p.name, p.category
    )
    SELECT 
        name,
        category,
        total_quantity,
        total_revenue,
        RANK() OVER (ORDER BY total_revenue DESC) as revenue_rank
    FROM product_sales
    ORDER BY total_revenue DESC
""").show()

# 4. Statistiques par catégorie
print("\nStatistiques par catégorie:")
spark.sql("""
    SELECT 
        category,
        COUNT(*) as product_count,
        ROUND(AVG(price), 2) as avg_price,
        MIN(price) as min_price,
        MAX(price) as max_price
    FROM products
    GROUP BY category
""").show()

Liste des produits avec leur prix:
+----------+------+
|      name| price|
+----------+------+
|    Laptop|999.99|
|Smartphone|699.99|
|Headphones|149.99|
|     Jeans| 49.99|
|   T-shirt| 19.99|
+----------+------+


Total des ventes par catégorie:
+-----------+------------------+---------------+
|   category|       total_sales|number_of_sales|
+-----------+------------------+---------------+
|Electronics|5099.9400000000005|              4|
|   Clothing|             59.97|              1|
+-----------+------------------+---------------+


Produits les plus vendus:
+----------+-----------+--------------+------------------+------------+
|      name|   category|total_quantity|     total_revenue|revenue_rank|
+----------+-----------+--------------+------------------+------------+
|    Laptop|Electronics|             3|2999.9700000000003|           1|
|Smartphone|Electronics|             3|2099.9700000000003|           2|
|   T-shirt|   Clothing|             3|             59.97|           

In [16]:
#ex10

from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, rank, dense_rank, lag, lead, col, avg, sum, percent_rank

# Créer des données pour les employés
employees_data = [
    (1, "Alice", "IT", 75000, "2022-01-01"),
    (2, "Bob", "IT", 65000, "2021-06-15"),
    (3, "Charlie", "HR", 55000, "2023-03-01"),
    (4, "David", "IT", 85000, "2021-01-01"),
    (5, "Eve", "HR", 60000, "2022-07-01"),
    (6, "Frank", "Finance", 70000, "2023-01-01"),
    (7, "Grace", "Finance", 80000, "2021-12-01"),
    (8, "Henry", "IT", 90000, "2022-09-15")
]

# Créer le DataFrame
employees_df = spark.createDataFrame(
    employees_data, 
    ["id", "name", "department", "salary", "hire_date"]
)

# 1. Classement des salaires par département
print("Classement des salaires par département:")
window_spec = Window.partitionBy("department").orderBy(col("salary").desc())

ranked_salaries = employees_df.withColumn("rank", rank().over(window_spec)) \
    .withColumn("dense_rank", dense_rank().over(window_spec)) \
    .withColumn("row_number", row_number().over(window_spec))

ranked_salaries.show()

# 2. Calcul du salaire moyen mobile par département
print("\nSalaire moyen mobile par département:")
window_avg = Window.partitionBy("department").orderBy("salary") \
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)

avg_salaries = employees_df.withColumn(
    "running_avg_salary", 
    avg("salary").over(window_avg)
)

avg_salaries.show()

# 3. Comparaison avec le salaire précédent et suivant
print("\nComparaison avec salaires précédent et suivant:")
window_lag_lead = Window.partitionBy("department").orderBy("salary")

comparison = employees_df.withColumn(
    "prev_salary", 
    lag("salary").over(window_lag_lead)
) \
.withColumn(
    "next_salary", 
    lead("salary").over(window_lag_lead)
) \
.withColumn(
    "diff_from_prev", 
    col("salary") - col("prev_salary")
)

comparison.show()

# 4. Statistiques cumulatives
print("\nStatistiques cumulatives par département:")
window_cumulative = Window.partitionBy("department").orderBy("hire_date") \
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)

cumulative_stats = employees_df.withColumn(
    "cumulative_employees", 
    count("*").over(window_cumulative)
) \
.withColumn(
    "cumulative_salary_cost", 
    sum("salary").over(window_cumulative)
) \
.withColumn(
    "avg_salary_so_far", 
    avg("salary").over(window_cumulative)
)

cumulative_stats.orderBy("department", "hire_date").show()

# 5. Calcul des percentiles par département
print("\nPercentiles des salaires par département:")
window_ntile = Window.partitionBy("department").orderBy("salary")

percentiles = employees_df.withColumn(
    "percentile", 
    percent_rank().over(window_ntile)
)

percentiles.orderBy("department", "salary").show()


Classement des salaires par département:
+---+-------+----------+------+----------+----+----------+----------+
| id|   name|department|salary| hire_date|rank|dense_rank|row_number|
+---+-------+----------+------+----------+----+----------+----------+
|  7|  Grace|   Finance| 80000|2021-12-01|   1|         1|         1|
|  6|  Frank|   Finance| 70000|2023-01-01|   2|         2|         2|
|  5|    Eve|        HR| 60000|2022-07-01|   1|         1|         1|
|  3|Charlie|        HR| 55000|2023-03-01|   2|         2|         2|
|  8|  Henry|        IT| 90000|2022-09-15|   1|         1|         1|
|  4|  David|        IT| 85000|2021-01-01|   2|         2|         2|
|  1|  Alice|        IT| 75000|2022-01-01|   3|         3|         3|
|  2|    Bob|        IT| 65000|2021-06-15|   4|         4|         4|
+---+-------+----------+------+----------+----+----------+----------+


Salaire moyen mobile par département:
+---+-------+----------+------+----------+------------------+
| id|   name|depa