![SPARK_PYSPARK](img/spark_pyspark.jpeg)

# 💫 Apache Spark : le moteur du Big Data moderne

Le **Big Data** désigne des volumes de données trop grands pour être traités sur une seule machine.
Historiquement, on utilisait **Hadoop MapReduce**, mais celui-ci lisait et écrivait souvent sur disque → lent.

**Apache Spark** est né pour répondre à ce problème.
- **Traitement en mémoire (RAM)** → bien plus rapide
- **API haut niveau** (SQL, DataFrame, MLlib, etc.)
- **Multi-langages** : Scala, Java, Python (PySpark), R

Spark repose sur une architecture driver / executors.
- **Driver** : programme principal. Gère le plan d’exécution et collecte les résultats
- **Cluster Manager** : alloue les ressources (CPU, RAM) aux tâches Spark (YARN, Kubernetes, Standalone…)
- **Executors** : exécutent les tâches sur les partitions de données.

Spark distribue automatiquement les données et le calcul sur plusieurs machines.

# ✨ PySpark : l’interface Python de Spark

**PySpark** est l’interface de programmation en Python pour Apache Spark.\
Elle permet d’utiliser la puissance du moteur Spark tout en restant dans un environnement familier pour les développeurs Python.

Concrètement :
- Le code écrit en Python est traduit en instructions que le moteur Spark exécute sur le cluster.
- L’exécution réelle ne se fait pas dans le processus Python, mais dans la JVM de Spark (Java Virtual Machine).
- Cette communication passe par l’API Py4J, une passerelle entre Python et Java.

Ainsi, PySpark combine :
- la simplicité du Python (syntaxe, intégration avec pandas, notebooks, etc.)
- la scalabilité de Spark, capable de traiter des téraoctets de données sur des centaines de noeuds.

# ⌨️ Hands-On

**Objectifs**  
- Comprendre comment démarrer une `SparkSession` en local  
- Manipuler des DataFrames PySpark (lecture, inspection, transformations)  
- Comprendre `lazy evaluation`, `action`, `job / stage / task`  
- Sauvegarder/relire des données (CSV, Parquet)

## 1. Démarrer Spark en local

Avant de pouvoir manipuler des données avec PySpark, il faut créer une session Spark.\
Cette session est le point d’entrée principal vers toutes les fonctionnalités de Spark (lecture de données, SQL, transformations, machine learning, etc.).

En pratique, on la crée à l’aide de la classe **SparkSession** qui permet de configurer et d’initialiser le moteur Spark.\
Une fois cette cellule exécutée, Spark démarre un contexte local et affiche des informations dans la console.
C’est le signe que le driver est prêt à exécuter des tâches et à communiquer avec d’éventuels executors.

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("IntroSparkLocal") \
    .master("local[*]") \
    .getOrCreate()

# SparkSession.builder crée un objet constructeur pour définir les paramètres de la session.
# .appName("IntroSparkLocal") définit un nom d’application visible dans l’interface Spark (utile pour identifier ton job).
# .master("local[*]") indique le mode d’exécution :
#       - local signifie que le calcul se fera sur la machine locale.
#       - [*] indique que Spark utilisera tous les cœurs CPU disponibles.
# .getOrCreate() crée une nouvelle session Spark si elle n’existe pas encore, ou récupère celle déjà active.

Aussi, Spark démarre une interface web accessible à l’adresse suivante : http://localhost:4040

Cette interface permet de :
- visualiser les jobs Spark en cours ou terminés
- consulter les stages et tasks
- analyser les plans d’exécution
- suivre l’utilisation des ressources locales (mémoire, CPU)

### 🧭 Exploration recommandée

Ouvrir l’URL http://localhost:4040 dans le navigateur.
Parcourir les différentes sections pour comprendre comment Spark gère un traitement distribué :
- Jobs : résumé des traitements lancés,
- Stages : décomposition logique des jobs,
- Tasks : exécution concrète sur les partitions de données,
- Storage : aperçu du cache et des données en mémoire.

Prendre quelques minutes pour parcourir cette interface avant de continuer car elle aidera à visualiser ce qui se passe “sous le capot” à chaque fois qu'une opération est exécutée dans PySpark.

## 2. Charger le dataset de test :  `employees.csv`

Pour travailler concrètement avec Spark, nous allons charger un petit dataset de test nommé `employees.csv`.\
Ce fichier contient des informations simples sur des employés : identifiant, nom, âge, département, salaire et années d’expérience.

Dans PySpark, la lecture d’un fichier CSV s’effectue via la méthode `spark.read.csv()`.
Cette méthode renvoie un **DataFrame Spark,** c’est-à-dire une **structure tabulaire distribuée** (semblable à un tableau SQL ou à un DataFrame Pandas, mais répartie sur plusieurs noeuds).

In [2]:
# Charge le fichier csv
# header=True indique que la première ligne contient les noms des colonnes
# inferSchema=True demande à Spark de déduire automatiquement les types de données sinon toutes les colonnes seraient lues comme des chaînes de caractères
df = spark.read.csv("./data/employees.csv", header=True, inferSchema=True)

# Affiche la structure (schéma) du DataFrame, c’est-à-dire les noms de colonnes et leurs types 
df.printSchema()

root
 |-- employee_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: double (nullable = true)
 |-- department: string (nullable = true)
 |-- salary: double (nullable = true)
 |-- experience_years: integer (nullable = true)



🔍 **Détaillons ce qu’il se passe en arrière-plan.**

Lors de la lecture du CSV :
1. Le driver Spark reçoit la requête de lecture du fichier csv. Il construit un plan logique pour lire le fichier CSV. Le plan logique est une représentation interne (sous forme d’un graphe `DAG`) qui décrit les étapes nécessaires pour obtenir le résultat final : *"quel fichier lire"*, *"avec quel format (ici CSV)"*, *"quelles colonnes ou types de données"*, *"quelles transformations appliquer (filtres, jointures, agrégations, etc.)"*\
Mais à ce stade, **rien n’est encore exécuté**. Spark se contente de noter l’**intention du programme**.
2. Quand une action est déclenchée (.show(), .count(), etc.), le driver envoie les tâches aux executors.
3. Les executors lisent réellement les données (leurs partitions), exécutent les transformations, et renvoient leurs résultats partiels au driver. (En mode local, les executors sont simplement des threads dans le même processus JVM que le driver.)
4. Le driver agrège ces résultats et les renvoie au programme Python.

Même pour un petit CSV local, le mécanisme est le même que sur un cluster : Spark applique toujours un modèle distribué. Cela rend le passage à grande échelle quasiment transparent pour le code.

### 2.1. Inspection de base

Avant de transformer les données, il est souvent utile de vérifier la structure du DataFrame.\
L'opération est immédiate : elle interroge seulement les métadonnées du DataFrame (pas les données elles-mêmes).

In [3]:
df.columns

['employee_id', 'name', 'age', 'department', 'salary', 'experience_years']

### 2.2. Lazy Evaluation

⚠️ Contrairement à df.columns, les instructions suivantes déclenchent des actions Spark.\
Le moteur Spark exécute réellement des jobs distribués pour lire les données, parcourir toutes les partitions et agréger les résultats partiels.

🔍 **Détaillons ce qu’il se passe en arrière-plan.**
- `df.count()` parcourt l’ensemble du DataFrame pour compter le nombre de lignes. Pour cela : \
Spark crée un job composé de plusieurs stages et tasks, chacun exécuté par les executors sur une partition de données.\
Une fois terminées, les executors envoient leurs résultats partiels (leurs counts locaux) au driver.\
Le driver additionne ces valeurs pour obtenir le compte global du DataFrame,
puis renvoie le résultat final au programme Python.
- `df.show(5)` demande à Spark d’afficher les 5 premières lignes du DataFrame.
Cela nécessite de lire physiquement les données (au moins partiellement) → Spark déclenche donc aussi un job distribué.

In [4]:
display(df.count())
display(df.show(5))

10000

+-----------+---------------+----+----------+-------+----------------+
|employee_id|           name| age|department| salary|experience_years|
+-----------+---------------+----+----------+-------+----------------+
|          1|      Ugo Petit|36.0|        IT|6364.39|              10|
|          2|  Claire Girard|21.0|       r&d|   NULL|               3|
|          3|Wassim Lefebvre|45.0|   support|   NULL|              13|
|          4|Wassim Bertrand|36.0|Operations|6504.21|              14|
|          5|    Liam Garcia|21.0|     Sales|3631.12|               3|
+-----------+---------------+----+----------+-------+----------------+
only showing top 5 rows


None

## 3. Sélections, filtres, tris

Dans Spark, les DataFrames offrent une interface haut niveau pour manipuler les données de manière déclarative, un peu comme en SQL.
Les opérations de sélection, filtrage et tri sont des transformations qui **ne modifient pas les données originales** : elles produisent de **nouveaux DataFrames**.

💡 Chaque transformation crée un nouveau plan logique dans Spark (**aucune lecture ou exécution réelle n’a lieu** tant qu’une action, comme .show() ou .count(), n’est pas appelée).\
Ainsi, même un simple tri ou filtre suit la même logique de planification distribuée que sur un cluster.

In [5]:
from pyspark.sql.functions import col

# Sélection de colonnes
df_sel = df.select("name", "age", "department")
df_sel.show(5)

# Filtres
employees_filtered = df_sel.filter(col("age") >= 30) # Ne conserver que les employés d'un certain âge
employees_filtered = employees_filtered.orderBy(col("age").desc())
employees_filtered.show(5)

+---------------+----+----------+
|           name| age|department|
+---------------+----+----------+
|      Ugo Petit|36.0|        IT|
|  Claire Girard|21.0|       r&d|
|Wassim Lefebvre|45.0|   support|
|Wassim Bertrand|36.0|Operations|
|    Liam Garcia|21.0|     Sales|
+---------------+----+----------+
only showing top 5 rows
+---------------+----+----------+
|           name| age|department|
+---------------+----+----------+
|  Giulia Girard|64.0|operations|
|Wassim Lefebvre|64.0|   Finance|
|   Farid Durand|64.0| Support  |
| Alice Lefebvre|64.0| Marketing|
|    Ziad Martin|64.0|       R&D|
+---------------+----+----------+
only showing top 5 rows


### 3.1. Spark SQL

Il est possible d’écrire directement du ***SQL*** grâce à ***Spark SQL***, un module intégré à Apache Spark qui permet d’interroger les données via des requêtes SQL standard.\
Pour cela, il suffit d’enregistrer un DataFrame comme vue temporaire à l’aide de `createOrReplaceTempView()`. Cette vue agit comme une table virtuelle accessible uniquement durant la session en cours.\
On peut ensuite exécuter des requêtes SQL sur cette vue avec la méthode `spark.sql()`, comme dans l’exemple ci-dessous, qui fait exactement la même chose que le job précédent.

In [6]:
# Création ou remplacement d'une vue temporaire à partir du DataFrame
df.createOrReplaceTempView("employees")

# Requête SQL exécutée via Spark SQL
result = spark.sql("""
    SELECT department,
           AVG(salary) AS avg_salary
    FROM employees
    GROUP BY department
""")

# Affichage du résultat
result.show()

+------------+------------------+
|  department|        avg_salary|
+------------+------------------+
|     Legal  | 7160.863068181826|
|         r&d| 6943.013473684209|
|       Sales| 5572.902567567568|
|Operations  |  5422.08880794702|
|     finance|6706.1631499999985|
|       legal| 7365.044111675121|
|       Legal| 7523.037461928936|
|   MARKETING| 5363.743197674419|
|          HR| 4828.136005509641|
|     Sales  | 5597.308164556963|
|     Support| 4397.905555555553|
|  OPERATIONS| 5444.523483146067|
|     Finance|  6891.88554216867|
|       SALES| 5713.909941860465|
|       R&D  | 6886.110502793296|
|          Hr|  4954.37892655367|
|   marketing| 5268.799746192894|
|        HR  | 4892.545178571427|
|       sales| 5668.184748603351|
|         R&D| 6728.667552083339|
+------------+------------------+
only showing top 20 rows


### 🧩 Exercices

> Essayez de modifier et d’expérimenter :
> - sélectionnez d’autres colonnes avec .select().
> - changez la condition du filtre (age < 25, department == "HR", etc.).
> - triez par une autre colonne, ou combinez plusieurs tris
> - ajoutez plusieurs conditions avec les opérateurs logiques `&` (et), `|` (ou)

In [9]:
# Sélection de colonnes
df_sel = df.select("name", "age", "department", "salary", "experience_years")
df_sel.show(5)

# Filtres
employees_filtered = df_sel.filter((col("age") >= 30) & (col("age") <= 60) & (col("salary") >= 6000)) # Ne conserver que les employés d'un certain âge
employees_filtered = employees_filtered.orderBy(col("salary").desc())
employees_filtered.show(5)

+---------------+----+----------+-------+----------------+
|           name| age|department| salary|experience_years|
+---------------+----+----------+-------+----------------+
|      Ugo Petit|36.0|        IT|6364.39|              10|
|  Claire Girard|21.0|       r&d|   NULL|               3|
|Wassim Lefebvre|45.0|   support|   NULL|              13|
|Wassim Bertrand|36.0|Operations|6504.21|              14|
|    Liam Garcia|21.0|     Sales|3631.12|               3|
+---------------+----+----------+-------+----------------+
only showing top 5 rows
+------------+----+----------+--------+----------------+
|        name| age|department|  salary|experience_years|
+------------+----+----------+--------+----------------+
| Maya Garcia|53.0|   Legal  | 11254.6|              26|
|  Xia Martin|55.0|     Legal|10839.19|              22|
|   Qi Martin|58.0|     Legal|10609.12|              24|
|Noah Bernard|58.0|     R&D  |10350.71|              24|
|  Val Moreau|60.0|     LEGAL|10271.61|       

> Pensez à tester aussi Spark SQL dans cet exercice.

In [13]:
# Création ou remplacement d'une vue temporaire à partir du DataFrame
df.createOrReplaceTempView("employees")

# Requête SQL exécutée via Spark SQL
result = spark.sql("""
    SELECT department,
           AVG(salary) AS avg_salary,
           AVG(experience_years) as avg_exp_years
    FROM employees
    GROUP BY department
""")

# Affichage du résultat
result.show()

+------------+------------------+------------------+
|  department|        avg_salary|     avg_exp_years|
+------------+------------------+------------------+
|     Legal  | 7160.863068181826|10.292817679558011|
|         r&d| 6943.013473684209|11.614213197969542|
|       Sales| 5572.902567567568|10.835978835978835|
|Operations  |  5422.08880794702|11.265822784810126|
|     finance|6706.1631499999985|10.573529411764707|
|       legal| 7365.044111675121|11.078817733990148|
|       Legal| 7523.037461928936|11.722772277227723|
|   MARKETING| 5363.743197674419|10.938547486033519|
|          HR| 4828.136005509641|10.803763440860216|
|     Sales  | 5597.308164556963|10.894409937888199|
|     Support| 4397.905555555553|11.452261306532664|
|  OPERATIONS| 5444.523483146067|11.311475409836065|
|     Finance|  6891.88554216867|11.151428571428571|
|       SALES| 5713.909941860465|11.387640449438202|
|       R&D  | 6886.110502793296|11.097826086956522|
|          Hr|  4954.37892655367|11.8571428571

## 4. Nettoyage des données

Dans Spark, nous pouvons appliquer des fonctions de transformation directement sur les colonnes d’un DataFrame de manière distribuée.

Dans l’exemple ci-dessous :
- nous normalisons les noms de département
- nous traitons les valeurs manquantes

💡 Ces opérations sont aussi "lazy" : elles ne modifient pas les données tant qu’aucune action n’est exécutée.

In [14]:
from pyspark.sql.functions import when, trim, upper

# Normaliser le nom du département: trim + upper
df_sel = df
df_clean = df_sel.withColumn("department_norm", upper(trim(col("department"))))

# Gérer les valeurs manquantes d'âge et de salaire
df_clean = df_clean.withColumn("age", when(col("age").isNull(), 0).otherwise(col("age")))
df_clean = df_clean.na.fill({"salary": 0.0})

df_clean.select("employee_id","name","department","department_norm","age","salary").show(5)

+-----------+---------------+----------+---------------+----+-------+
|employee_id|           name|department|department_norm| age| salary|
+-----------+---------------+----------+---------------+----+-------+
|          1|      Ugo Petit|        IT|             IT|36.0|6364.39|
|          2|  Claire Girard|       r&d|            R&D|21.0|    0.0|
|          3|Wassim Lefebvre|   support|        SUPPORT|45.0|    0.0|
|          4|Wassim Bertrand|Operations|     OPERATIONS|36.0|6504.21|
|          5|    Liam Garcia|     Sales|          SALES|21.0|3631.12|
+-----------+---------------+----------+---------------+----+-------+
only showing top 5 rows


### 4.1. Créer des colonnes calculées

La préparation des données consiste aussi à créer de nouvelles colonnes dérivées à partir des colonnes existantes.\
Dans Spark, cela se fait avec la méthode `withColumn()`, qui permet d’ajouter une colonne calculée sans modifier le DataFrame original.

👉 Si le nom de colonne existe déjà, la colonne est remplacée (écrasée) par la nouvelle version.\
👉 Si le nom n’existe pas, une nouvelle colonne est simplement ajoutée au DataFrame, c'est le cas dans l'exemple suivant.

In [15]:
from pyspark.sql.functions import avg

# Catégoriser les revenus
df_feat = df_clean.withColumn(
    "salary_band",
    when(col("salary") < 3000, "LOW").when(col("salary") < 6000, "MID").otherwise("HIGH")
)

df_feat.show(5)


+-----------+---------------+----+----------+-------+----------------+---------------+-----------+
|employee_id|           name| age|department| salary|experience_years|department_norm|salary_band|
+-----------+---------------+----+----------+-------+----------------+---------------+-----------+
|          1|      Ugo Petit|36.0|        IT|6364.39|              10|             IT|       HIGH|
|          2|  Claire Girard|21.0|       r&d|    0.0|               3|            R&D|        LOW|
|          3|Wassim Lefebvre|45.0|   support|    0.0|              13|        SUPPORT|        LOW|
|          4|Wassim Bertrand|36.0|Operations|6504.21|              14|     OPERATIONS|       HIGH|
|          5|    Liam Garcia|21.0|     Sales|3631.12|               3|          SALES|        MID|
+-----------+---------------+----+----------+-------+----------------+---------------+-----------+
only showing top 5 rows


## 5. Regroupement et agrégation

La phase d’agrégation permet de résumer, regrouper et interpréter les informations issues de grands volumes de données.\
Avec PySpark, ces opérations s’effectuent facilement grâce aux fonctions de groupement (`groupBy`), de comptage (`count`), et d’agrégation (`agg`).

In [16]:
# Nombre d'employés par catégorie de revenus
df_feat\
    .groupBy("salary_band")\
    .count()\
    .orderBy(col("count").desc())\
    .show()

+-----------+-----+
|salary_band|count|
+-----------+-----+
|        MID| 5476|
|       HIGH| 4175|
|        LOW|  349|
+-----------+-----+



In [17]:
from pyspark.sql.functions import round as sround

# Analyse des salaires moyens par département 
dept_stats = df_feat\
                .groupBy("department_norm")\
                .agg(sround(avg("salary"),2).alias("avg_salary"))\
                .orderBy(col("avg_salary").desc())

dept_stats.show()


+---------------+----------+
|department_norm|avg_salary|
+---------------+----------+
|          LEGAL|   7098.72|
|            R&D|    6621.4|
|        FINANCE|   6598.94|
|             IT|   6207.31|
|          SALES|   5437.99|
|     OPERATIONS|   5227.45|
|      MARKETING|   5185.57|
|             HR|   4690.68|
|        SUPPORT|    4177.1|
+---------------+----------+



### 🧩 Exercices

> Comment Spark découpe les transformations ci-dessus en **stages** ?  
> Ouvrez l'onglet **SQL / DAG** dans l'UI Spark puis analysez les informations présentées.


## 6. Sauvegarde de résultats (CSV / Parquet)

Une fois les traitements réalisés nous pouvons sauvegarder les résultats pour les réutiliser ou les partager.\
PySpark permet d’écrire facilement un DataFrame dans différents formats, notamment CSV et Parquet (format binaire optimisé pour le Big Data).

- Le format CSV est universel mais peu efficace sur de gros volumes, car il ne conserve pas les types de données.
- Le format Parquet, lui, conserve le schéma, compresse les données et accélère les lectures : il est donc privilégié dans les environnements distribués.

### 🔎 Format Parquet
Pour aller plus loin concernant le format Parquet, voici quelques ressources :
- 📘 [Documentation officielle Apache Parquet](https://parquet.apache.org/docs/)  
- 🎥 [Qu’est-ce qu'une base de données orientée colonne ?](https://www.youtube.com/watch?v=1MnvuNg33pA)

In [18]:
# Écriture en CSV (dossier de sortie partitionné)
dept_stats.write.mode("overwrite").csv("./outputs/dept_stats_csv")

# Écriture en Parquet (conserve le schéma, plus efficace)
dept_stats.write.mode("overwrite").parquet("./outputs/dept_stats_parquet")

# Relecture du Parquet
reloaded = spark.read.parquet("./outputs/dept_stats_parquet")
reloaded.printSchema()
reloaded.show()


Py4JJavaError: An error occurred while calling o218.csv.
: java.lang.RuntimeException: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://cwiki.apache.org/confluence/display/HADOOP2/WindowsProblems
	at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:789)
	at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:298)
	at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:314)
	at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:1116)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkOneDirWithMode(RawLocalFileSystem.java:798)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:838)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:810)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:837)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:810)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:837)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:810)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:837)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:810)
	at org.apache.hadoop.fs.ChecksumFileSystem.mkdirs(ChecksumFileSystem.java:988)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.setupJob(FileOutputCommitter.java:356)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.setupJob(HadoopMapReduceCommitProtocol.scala:190)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(FileFormatWriter.scala:268)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeWrite(FileFormatWriter.scala:306)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:189)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:195)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:117)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:115)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:129)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$executeCollect$1(AdaptiveSparkPlanExec.scala:402)
	at org.apache.spark.sql.execution.adaptive.ResultQueryStageExec.$anonfun$doMaterialize$1(QueryStageExec.scala:325)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$4(SQLExecution.scala:322)
	at org.apache.spark.sql.execution.SQLExecution$.withSessionTagsApplied(SQLExecution.scala:272)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$3(SQLExecution.scala:320)
	at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$2(SQLExecution.scala:316)
	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at org.apache.spark.util.Utils$.getTryWithCallerStacktrace(Utils.scala:1439)
	at org.apache.spark.util.LazyTry.get(LazyTry.scala:58)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:131)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:192)
	at org.apache.spark.sql.classic.DataFrameWriter.runCommand(DataFrameWriter.scala:622)
	at org.apache.spark.sql.classic.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273)
	at org.apache.spark.sql.classic.DataFrameWriter.saveInternal(DataFrameWriter.scala:241)
	at org.apache.spark.sql.classic.DataFrameWriter.save(DataFrameWriter.scala:118)
	at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:426)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:569)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:184)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:108)
	at java.base/java.lang.Thread.run(Thread.java:840)
	Suppressed: org.apache.spark.util.Utils$OriginalTryStackTraceException: Full stacktrace of original doTryWithCallerStacktrace caller
		at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:789)
		at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:298)
		at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:314)
		at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:1116)
		at org.apache.hadoop.fs.RawLocalFileSystem.mkOneDirWithMode(RawLocalFileSystem.java:798)
		at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:838)
		at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:810)
		at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:837)
		at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:810)
		at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:837)
		at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:810)
		at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:837)
		at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:810)
		at org.apache.hadoop.fs.ChecksumFileSystem.mkdirs(ChecksumFileSystem.java:988)
		at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.setupJob(FileOutputCommitter.java:356)
		at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.setupJob(HadoopMapReduceCommitProtocol.scala:190)
		at org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(FileFormatWriter.scala:268)
		at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeWrite(FileFormatWriter.scala:306)
		at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:189)
		at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:195)
		at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:117)
		at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:115)
		at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:129)
		at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$executeCollect$1(AdaptiveSparkPlanExec.scala:402)
		at org.apache.spark.sql.execution.adaptive.ResultQueryStageExec.$anonfun$doMaterialize$1(QueryStageExec.scala:325)
		at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$4(SQLExecution.scala:322)
		at org.apache.spark.sql.execution.SQLExecution$.withSessionTagsApplied(SQLExecution.scala:272)
		at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$3(SQLExecution.scala:320)
		at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
		at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$2(SQLExecution.scala:316)
		at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768)
		at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
		at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
		... 1 more
Caused by: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://cwiki.apache.org/confluence/display/HADOOP2/WindowsProblems
	at org.apache.hadoop.util.Shell.fileNotFoundException(Shell.java:601)
	at org.apache.hadoop.util.Shell.getHadoopHomeDir(Shell.java:622)
	at org.apache.hadoop.util.Shell.getQualifiedBin(Shell.java:645)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:742)
	at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:80)
	at org.apache.hadoop.conf.Configuration.getTimeDurationHelper(Configuration.java:1954)
	at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1912)
	at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1885)
	at org.apache.hadoop.util.ShutdownHookManager.getShutdownTimeout(ShutdownHookManager.java:183)
	at org.apache.hadoop.util.ShutdownHookManager$HookEntry.<init>(ShutdownHookManager.java:207)
	at org.apache.hadoop.util.ShutdownHookManager.addShutdownHook(ShutdownHookManager.java:304)
	at org.apache.spark.util.SparkShutdownHookManager.$anonfun$install$1(ShutdownHookManager.scala:194)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
	at scala.Option.fold(Option.scala:263)
	at org.apache.spark.util.SparkShutdownHookManager.install(ShutdownHookManager.scala:195)
	at org.apache.spark.util.ShutdownHookManager$.shutdownHooks$lzycompute(ShutdownHookManager.scala:55)
	at org.apache.spark.util.ShutdownHookManager$.shutdownHooks(ShutdownHookManager.scala:53)
	at org.apache.spark.util.ShutdownHookManager$.addShutdownHook(ShutdownHookManager.scala:159)
	at org.apache.spark.util.ShutdownHookManager$.<clinit>(ShutdownHookManager.scala:63)
	at org.apache.spark.util.Utils$.createTempDir(Utils.scala:250)
	at org.apache.spark.util.SparkFileUtils.createTempDir(SparkFileUtils.scala:103)
	at org.apache.spark.util.SparkFileUtils.createTempDir$(SparkFileUtils.scala:102)
	at org.apache.spark.util.Utils$.createTempDir(Utils.scala:99)
	at org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:379)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:961)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:204)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:227)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:96)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1132)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1141)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset.
	at org.apache.hadoop.util.Shell.checkHadoopHomeInner(Shell.java:521)
	at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:492)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:569)
	... 27 more


## 7. RDD (Resilient Distributed Dataset)

Jusqu’à présent, vous avez manipulé des **DataFrames** : une interface structurée, avec des colonnes nommées, des types de données, et des opérations familières (sélection, groupBy, agg, etc.).\
Les DataFrames sont très pratiques, car ils permettent d’écrire du code lisible et optimisé grâce au moteur **Catalyst** de Spark.

Mais sous cette interface haut niveau se cache la brique fondamentale de Spark : le **RDD (Resilient Distributed Dataset)**.\
Comprendre ce concept vous permettra de mieux saisir comment Spark distribue, exécute et reconstruit vos données à grande échelle.

Un RDD est une collection d’objets distribuée à travers le cluster Spark.
- **Resilient** : il peut être reconstruit automatiquement en cas de panne.
- **Distributed** : les données sont découpées en partitions réparties sur plusieurs nœuds.
- **Dataset** : c’est simplement un ensemble d’éléments (lignes, objets, etc.).

Autrement dit, un RDD est une collection répartie sur plusieurs machines, que Spark sait manipuler en parallèle.

### 7.1. Manipulation de base

In [None]:
sc = spark.sparkContext

# Exemple d'un RDD à partir d’une liste locale
# Spark crée un RDD contenant ces 5 éléments.
rdd = sc.parallelize([1, 2, 3, 4, 5])

In [None]:
# Rien n’est exécuté tant qu’on ne déclenche pas une action
display(rdd.collect())

In [None]:
# On peut transformer les données
rdd2 = rdd.map(lambda x: x * 2)
display(rdd2.collect())

#### 🧩 Exercices

> Soit la liste de produits :

```bash
ventes = [
    ("chaise", 200),
    ("table", 450),
    ("lampe", 100),
    ("chaise", 150),
    ("table", 300),
    ("lampe", 80)
]
```

> - Créez un RDD à partir de cette liste.
> - Récupérez les produits dont le prix dépasse 100€
> - Calculez le CA total par produit (possible d'utiliser .reduceByKey())
> - Triez par CA décroissant (possible d'utiliser .sortBy())

In [None]:
# Votre code ici

### 7.2. Manipulation de fichiers

In [None]:
# Spark permet de lire et de manipuler des fichiers sous forme de RDD.
employees_rdd = sc.textFile("./data/employees.csv")
employees_rdd.collect()

#### 🧩 Exercices

> Ne récupérer que les employés du département IT.\
> Indice : .map(), .filter(), .split()

```bash
Exemple :

[['22', 'Alice Bernard', '33.0', 'IT', '6231.15', '10'],
 ['32', 'Hugo Girard', '58.0', 'IT', '8616.31', '20'],
 ['40', 'Emma Morel', '42.0', 'IT', '6115.84', '10'],
 ['72', 'Tom Petit', '41.0', 'IT', '5449.79', '7'],
 ['83', 'Ines Durand', '54.0', 'IT', '7555.4', '13']]
 ```

In [None]:
# Votre code ici

## 8. Approfondir

Ce notebook propose une **introduction pratique et progressive** à Apache Spark et PySpark.  
Il ne couvre pas toutes les subtilités du framework, notamment celles liées à l’**optimisation des performances**, la **gestion des clusters**, ou les **fonctionnalités avancées**, MLlib, ou Structured Streaming.  

Pour compléter votre apprentissage, il est **fortement recommandé de poursuivre l’exploration** à travers d'autres ressources.\
Il existe **de nombreuses ressources de qualité disponibles en ligne**, il suffit de chercher un peu pour trouver des explications, tutoriels et exemples adaptés à votre niveau.

#### Ressources officielles
- [Documentation officielle de Spark](https://spark.apache.org/docs/latest/) — la source principale pour comprendre les modules (SQL, MLlib, Streaming, GraphX).  
- [API PySpark sur spark.apache.org](https://spark.apache.org/docs/latest/api/python/) — référence complète des fonctions disponibles.  
- [Apache Spark sur GitHub](https://github.com/apache/spark) — pour explorer le code source et les évolutions du projet.  

#### Tutoriels et cours gratuits
- [Spark Tutorial (DataCamp)](https://www.datacamp.com/tutorial/pyspark-tutorial-getting-started-with-pyspark) — bonnes bases pour pratiquer avec PySpark.  
- [YouTube – “Apache Spark Full Course” by Simplilearn](https://www.youtube.com/watch?v=_C8kWso4ne4) — formation vidéo complète et vulgarisée.   

In [2]:
import os
from pyspark.sql import SparkSession

print("Test avec timeout augmenté...")

# Créer Spark avec configurations spéciales pour Windows
spark = SparkSession.builder \
    .appName("Test") \
    .config("spark.python.worker.timeout", "600") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "false") \
    .config("spark.executorEnv.PYTHONHASHSEED", "0") \
    .getOrCreate()

print(f"✓ SparkSession créée (Spark {spark.version})")

# Test simple
try:
    test_data = [(1, "test"), (2, "spark")]
    df = spark.createDataFrame(test_data, ["id", "nom"])
    
    df.write.mode("overwrite").csv("./test_output")
    print("✓ Écriture CSV réussie !")
    
    # Nettoyage
    import shutil
    shutil.rmtree("./test_output", ignore_errors=True)
    
    print("\n✓ SUCCÈS !")
    
except Exception as e:
    print(f"✗ Erreur : {e}")

spark.stop()

Test avec timeout augmenté...
✓ SparkSession créée (Spark 4.0.1)
✗ Erreur : An error occurred while calling o76.csv.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 1.0 failed 1 times, most recent failure: Lost task 2.0 in stage 1.0 (TID 14) (HDF-PC-PF1AVK92 executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:252)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:143)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:158)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:178)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:261)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:374)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:338)
	at org.apach

In [3]:
import sys
import os

print(f"Python utilisé : {sys.executable}")
print(f"Version Python : {sys.version}")
print(f"PYSPARK_PYTHON défini ? {os.environ.get('PYSPARK_PYTHON', 'Non')}")

Python utilisé : c:\Users\Utilisateur\Desktop\Big Data\Spark\SPARK_initiation\SPARK_initiation\venv\Scripts\python.exe
Version Python : 3.13.5 (tags/v3.13.5:6cb20a2, Jun 11 2025, 16:15:46) [MSC v.1943 64 bit (AMD64)]
PYSPARK_PYTHON défini ? Non


In [4]:
import os
import sys
from pyspark.sql import SparkSession

# Définir le Python à utiliser
python_path = sys.executable
os.environ['PYSPARK_PYTHON'] = python_path
os.environ['PYSPARK_DRIVER_PYTHON'] = python_path

print(f"PYSPARK_PYTHON défini : {python_path}")

# Créer Spark
spark = SparkSession.builder \
    .appName("Test") \
    .config("spark.python.worker.timeout", "600") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "false") \
    .getOrCreate()

print(f"✓ SparkSession créée")

# Test
try:
    test_data = [(1, "test"), (2, "spark")]
    df = spark.createDataFrame(test_data, ["id", "nom"])
    
    df.write.mode("overwrite").csv("./test_output")
    print("✓ SUCCÈS - Écriture CSV réussie !")
    
    import shutil
    shutil.rmtree("./test_output", ignore_errors=True)
    
except Exception as e:
    print(f"✗ Erreur : {type(e).__name__}")

spark.stop()

PYSPARK_PYTHON défini : c:\Users\Utilisateur\Desktop\Big Data\Spark\SPARK_initiation\SPARK_initiation\venv\Scripts\python.exe
✓ SparkSession créée
✗ Erreur : Py4JJavaError


In [1]:
import os
import sys
from pyspark.sql import SparkSession

print("=" * 60)
print("TEST FINAL - CONFIGURATION COMPLÈTE")
print("=" * 60)

# Définir les variables Python
python_path = sys.executable
os.environ['PYSPARK_PYTHON'] = python_path
os.environ['PYSPARK_DRIVER_PYTHON'] = python_path

print(f"\n1. Version Python : {sys.version.split()[0]}")
print(f"   Chemin : {python_path}")

# Créer SparkSession
spark = SparkSession.builder \
    .appName("Test Final") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "false") \
    .getOrCreate()

print(f"\n2. Spark version : {spark.version}")
print(f"   Hadoop version : {spark.sparkContext._jvm.org.apache.hadoop.util.VersionInfo.getVersion()}")

# TEST D'ÉCRITURE (le test qui plantait avant !)
try:
    test_data = [(1, "Alice"), (2, "Bob"), (3, "Charlie")]
    df = spark.createDataFrame(test_data, ["id", "nom"])
    
    # Écriture CSV
    df.write.mode("overwrite").csv("./test_final_csv")
    print("\n3. ✅ Écriture CSV réussie !")
    
    # Écriture Parquet
    df.write.mode("overwrite").parquet("./test_final_parquet")
    print("   ✅ Écriture Parquet réussie !")
    
    # Nettoyage
    import shutil
    shutil.rmtree("./test_final_csv", ignore_errors=True)
    shutil.rmtree("./test_final_parquet", ignore_errors=True)
    
    print("\n" + "=" * 60)
    print("🎉 SUCCÈS TOTAL ! TOUT FONCTIONNE ! 🎉")
    print("=" * 60)
    
except Exception as e:
    print(f"\n❌ Erreur : {e}")

spark.stop()

TEST FINAL - CONFIGURATION COMPLÈTE

1. Version Python : 3.12.10
   Chemin : c:\Users\Utilisateur\Desktop\Big Data\Spark\SPARK_initiation\SPARK_initiation\venv\Scripts\python.exe

2. Spark version : 4.0.1
   Hadoop version : 3.4.1

❌ Erreur : An error occurred while calling o51.csv.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 0.0 failed 1 times, most recent failure: Lost task 7.0 in stage 0.0 (TID 7) (HDF-PC-PF1AVK92 executor driver): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed). Consider setting 'spark.sql.execution.pyspark.udf.faulthandler.enabled' or'spark.python.worker.faulthandler.enabled' configuration to 'true' for the better Python traceback.
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:621)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:599)
	at scala.runtime.AbstractPartialFunction.appl

In [1]:
import os
import sys
from pyspark.sql import SparkSession

print("=" * 60)
print("TEST AVEC PYSPARK 3.5.3")
print("=" * 60)

python_path = sys.executable
os.environ['PYSPARK_PYTHON'] = python_path
os.environ['PYSPARK_DRIVER_PYTHON'] = python_path

print(f"\n1. Python : {sys.version.split()[0]}")

spark = SparkSession.builder \
    .appName("Test Final 3.5.3") \
    .getOrCreate()

print(f"2. Spark : {spark.version}")

# TEST D'ÉCRITURE
try:
    df = spark.createDataFrame([(1, "Alice"), (2, "Bob")], ["id", "nom"])
    df.write.mode("overwrite").csv("./test_final")
    print("\n✅ SUCCÈS ! Écriture CSV réussie !")
    
    import shutil
    shutil.rmtree("./test_final", ignore_errors=True)
    
    print("🎉 TOUT FONCTIONNE PARFAITEMENT ! 🎉")
    
except Exception as e:
    print(f"\n❌ Erreur : {e}")

spark.stop()

ImportError: cannot import name '_with_origin' from 'pyspark.errors.utils' (c:\Users\Utilisateur\Desktop\Big Data\Spark\SPARK_initiation\SPARK_initiation\venv\Lib\site-packages\pyspark\errors\utils.py)

In [2]:
import os
import sys
from pyspark.sql import SparkSession

print("=" * 60)
print("TEST FINAL - PYSPARK 3.5.3")
print("=" * 60)

python_path = sys.executable
os.environ['PYSPARK_PYTHON'] = python_path
os.environ['PYSPARK_DRIVER_PYTHON'] = python_path

print(f"\n1. Python : {sys.version.split()[0]}")

spark = SparkSession.builder \
    .appName("Test Final") \
    .getOrCreate()

print(f"2. Spark : {spark.version}")

# TEST D'ÉCRITURE (celui qui plantait !)
try:
    df = spark.createDataFrame([(1, "Alice"), (2, "Bob")], ["id", "nom"])
    df.write.mode("overwrite").csv("./test_final")
    print("\n✅ SUCCÈS ! Écriture CSV réussie !")
    
    import shutil
    shutil.rmtree("./test_final", ignore_errors=True)
    
    print("\n" + "=" * 60)
    print("🎉 CONFIGURATION COMPLÈTE ET FONCTIONNELLE ! 🎉")
    print("=" * 60)
    
except Exception as e:
    print(f"\n❌ Erreur : {e}")

spark.stop()

TEST FINAL - PYSPARK 3.5.3

1. Python : 3.12.10
2. Spark : 3.5.3

❌ Erreur : An error occurred while calling o44.csv.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 9 in stage 0.0 failed 1 times, most recent failure: Lost task 9.0 in stage 0.0 (TID 9) (HDF-PC-PF1AVK92 executor driver): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:612)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:594)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:789)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:766)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.ha

In [3]:
import os
import sys
from pyspark.sql import SparkSession

python_path = sys.executable
os.environ['PYSPARK_PYTHON'] = python_path
os.environ['PYSPARK_DRIVER_PYTHON'] = python_path

spark = SparkSession.builder.appName("Test").getOrCreate()

# Test SANS écriture - juste affichage
df = spark.createDataFrame([(1, "Alice"), (2, "Bob")], ["id", "nom"])
df.show()  # Juste afficher, pas écrire

spark.stop()

Py4JJavaError: An error occurred while calling o90.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0) (HDF-PC-PF1AVK92 executor driver): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:612)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:594)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:789)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:766)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:893)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:893)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.io.EOFException
	at java.base/java.io.DataInputStream.readInt(DataInputStream.java:386)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:774)
	... 26 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2414)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2433)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:530)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:483)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:61)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4333)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3316)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4323)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4321)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4321)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:3316)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:3539)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:280)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:315)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:569)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:612)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:594)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:789)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:766)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:893)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:893)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	... 1 more
Caused by: java.io.EOFException
	at java.base/java.io.DataInputStream.readInt(DataInputStream.java:386)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:774)
	... 26 more
