## pip install pyspark


In [None]:
#!pip install pyspark


## 1.Création d'une session Spark

In [1]:
from pyspark.sql import SparkSession        # SparkSession est le point d’entrée principal pour travailler avec DataFrames dans PySpark.

spark = SparkSession.builder \
    .appName("MonPremierPySpark") \
    .getOrCreate()


##2. Chargement des données

In [2]:
df = spark.read.csv("Espérance de vie.csv", header=True, inferSchema=True) # header=True : la première ligne contient les noms de colonnes et inferSchema=True : Spark devine les types de données


In [None]:
df.show(5) # afficher les données

+---+-----------+-----+----------+----------------+---------------------+--------------------+------+-----------------------+----------+--------+----+------------------------------------+-----+----------------+---------+---------+----------+-----------+---------------------+-------------------+--------------------------------------+-------------+
|_c0|       Pays|Année|    Statut|Espérance de vie|Mortalité des adultes|Décès de nourrissons|Alcool|Dépenses en pourcentage|Hépatite B|Rougeole| IMC|Décès d'enfants de moins de cinq ans|Polio|Dépenses totales|Diphtérie|VIH/SIDA |       PIB| Population| thinness  1-19 years| thinness 5-9 years|Composition des revenus des ressources|Scolarisation|
+---+-----------+-----+----------+----------------+---------------------+--------------------+------+-----------------------+----------+--------+----+------------------------------------+-----+----------------+---------+---------+----------+-----------+---------------------+-------------------+-------

In [None]:
type(df) # type de données

## 3. Manipulation de DataFrames

In [3]:
df.printSchema()  # Afficher le schéma




root
 |-- _c0: integer (nullable = true)
 |-- Pays: string (nullable = true)
 |-- Année: integer (nullable = true)
 |-- Statut: string (nullable = true)
 |-- Espérance de vie: double (nullable = true)
 |-- Mortalité des adultes: double (nullable = true)
 |-- Décès de nourrissons: integer (nullable = true)
 |-- Alcool: double (nullable = true)
 |-- Dépenses en pourcentage: double (nullable = true)
 |-- Hépatite B: double (nullable = true)
 |-- Rougeole: integer (nullable = true)
 |-- IMC: double (nullable = true)
 |-- Décès d'enfants de moins de cinq ans: integer (nullable = true)
 |-- Polio: double (nullable = true)
 |-- Dépenses totales: double (nullable = true)
 |-- Diphtérie: double (nullable = true)
 |-- VIH/SIDA : double (nullable = true)
 |-- PIB: double (nullable = true)
 |-- Population: double (nullable = true)
 |--  thinness  1-19 years: double (nullable = true)
 |--  thinness 5-9 years: double (nullable = true)
 |-- Composition des revenus des ressources: double (nullable = t

In [4]:
df.select("Pays", "Statut","Espérance de vie").show() # Sélection de colonnes


+-----------+----------+----------------+
|       Pays|    Statut|Espérance de vie|
+-----------+----------+----------------+
|Afghanistan|Developing|            65.0|
|Afghanistan|Developing|            59.9|
|Afghanistan|Developing|            59.9|
|Afghanistan|Developing|            59.5|
|Afghanistan|Developing|            59.2|
|Afghanistan|Developing|            58.8|
|Afghanistan|Developing|            58.6|
|Afghanistan|Developing|            58.1|
|Afghanistan|Developing|            57.5|
|Afghanistan|Developing|            57.3|
|Afghanistan|Developing|            57.3|
|Afghanistan|Developing|            57.0|
|Afghanistan|Developing|            56.7|
|Afghanistan|Developing|            56.2|
|Afghanistan|Developing|            55.3|
|Afghanistan|Developing|            54.8|
|    Albania|Developing|            77.8|
|    Albania|Developing|            77.5|
|    Albania|Developing|            77.2|
|    Albania|Developing|            76.9|
+-----------+----------+----------

In [5]:
from pyspark.sql.functions import col            # Ajout de colonnes

df = df.withColumn("Espérance de vie_plus_5", col("Espérance de vie") + 5)
df.show()


+---+-----------+-----+----------+----------------+---------------------+--------------------+------+-----------------------+----------+--------+----+------------------------------------+-----+----------------+---------+---------+-----------+-----------+---------------------+-------------------+--------------------------------------+-------------+-----------------------+
|_c0|       Pays|Année|    Statut|Espérance de vie|Mortalité des adultes|Décès de nourrissons|Alcool|Dépenses en pourcentage|Hépatite B|Rougeole| IMC|Décès d'enfants de moins de cinq ans|Polio|Dépenses totales|Diphtérie|VIH/SIDA |        PIB| Population| thinness  1-19 years| thinness 5-9 years|Composition des revenus des ressources|Scolarisation|Espérance de vie_plus_5|
+---+-----------+-----+----------+----------------+---------------------+--------------------+------+-----------------------+----------+--------+----+------------------------------------+-----+----------------+---------+---------+-----------+----------

## 4. Filtres, Sélections, Agrégations

In [6]:
df.filter(col("Année") > 2013).show()             # Filtrer les données


+---+-------------------+-----+----------+----------------+---------------------+--------------------+------+-----------------------+----------+--------+----+------------------------------------+-----+----------------+---------+---------+-----------+-----------+---------------------+-------------------+--------------------------------------+-------------+-----------------------+
|_c0|               Pays|Année|    Statut|Espérance de vie|Mortalité des adultes|Décès de nourrissons|Alcool|Dépenses en pourcentage|Hépatite B|Rougeole| IMC|Décès d'enfants de moins de cinq ans|Polio|Dépenses totales|Diphtérie|VIH/SIDA |        PIB| Population| thinness  1-19 years| thinness 5-9 years|Composition des revenus des ressources|Scolarisation|Espérance de vie_plus_5|
+---+-------------------+-----+----------+----------------+---------------------+--------------------+------+-----------------------+----------+--------+----+------------------------------------+-----+----------------+---------+--------

In [None]:
df.groupBy("Pays").count().show()                                    # Agréger



+--------------------+-----+
|                Pays|count|
+--------------------+-----+
|       Côte d'Ivoire|   16|
|                Chad|   16|
|Micronesia (Feder...|   16|
|            Paraguay|   16|
|               Yemen|   16|
|             Senegal|   16|
|          Cabo Verde|   16|
|              Sweden|   16|
|            Kiribati|   16|
|   Republic of Korea|   16|
|              Guyana|   16|
|             Eritrea|   16|
|         Philippines|   16|
|            Djibouti|   16|
|               Tonga|   16|
|            Malaysia|   16|
|           Singapore|   16|
|                Fiji|   16|
|              Turkey|   16|
|              Malawi|   16|
+--------------------+-----+
only showing top 20 rows



In [None]:
df.groupBy("Pays").avg("Espérance de vie").show()

+--------------------+---------------------+
|                Pays|avg(Espérance de vie)|
+--------------------+---------------------+
|       Côte d'Ivoire|              50.3875|
|                Chad|              50.3875|
|Micronesia (Feder...|                 68.2|
|            Paraguay|              73.1125|
|               Yemen|   63.862500000000004|
|             Senegal|             62.56875|
|          Cabo Verde|             72.51875|
|              Sweden|             82.51875|
|            Kiribati|    65.14999999999999|
|   Republic of Korea|              80.4875|
|              Guyana|              65.6375|
|             Eritrea|              60.6875|
|         Philippines|    67.57499999999999|
|            Djibouti|             60.75625|
|               Tonga|    72.53124999999999|
|            Malaysia|    73.75625000000001|
|           Singapore|    81.47500000000001|
|                Fiji|              68.7125|
|              Turkey|              73.9125|
|         

In [None]:
# Afficher uniquement les français avec Espérance de vie de plus de 70 ans
df.filter((col("Pays") == "France") & (col("Espérance de vie") > 70)).show()


+---+------+-----+----------+----------------+---------------------+--------------------+------+-----------------------+----------+--------+----+------------------------------------+-----+----------------+---------+---------+-----------+-----------+---------------------+-------------------+--------------------------------------+-------------+-----------------------+
|_c0|  Pays|Année|    Statut|Espérance de vie|Mortalité des adultes|Décès de nourrissons|Alcool|Dépenses en pourcentage|Hépatite B|Rougeole| IMC|Décès d'enfants de moins de cinq ans|Polio|Dépenses totales|Diphtérie|VIH/SIDA |        PIB| Population| thinness  1-19 years| thinness 5-9 years|Composition des revenus des ressources|Scolarisation|Espérance de vie_plus_5|
+---+------+-----+----------+----------------+---------------------+--------------------+------+-----------------------+----------+--------+----+------------------------------------+-----+----------------+---------+---------+-----------+-----------+-------------

In [None]:
df.select("Pays", "Espérance de vie").filter(col("Pays") == "France").filter(col("Espérance de vie") > 70).show()

+------+----------------+
|  Pays|Espérance de vie|
+------+----------------+
|France|            82.4|
|France|            82.2|
|France|            82.0|
|France|            81.5|
|France|            81.7|
|France|            81.3|
|France|            81.1|
|France|            89.0|
|France|            89.0|
|France|            86.0|
|France|            81.0|
|France|            82.0|
|France|            79.3|
|France|            79.2|
|France|            79.0|
|France|            78.8|
+------+----------------+



## 5.Transformations vs Actions

In [None]:
filtre = df.filter(col("Année") > 2014)  # Transformation



In [None]:
filtre.count()  # Action -> déclenche le calcul

183

##6. Qu’est-ce qu’un RDD ?
Un RDD est une collection distribuée d’objets, répartie sur plusieurs nœuds du cluster. Les RDDs permettent des opérations parallèles, tolérantes aux pannes

## a. Création d’un RDD à partir d’une liste Python

In [7]:
rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])


In [None]:
rdd2 = rdd.map(lambda x: x +2)                              #ransformation avec map
print(rdd2.collect())


[3, 4, 5, 6, 7, 8, 9, 10, 11, 12]


In [None]:
rdd_filtre = rdd2.filter(lambda x: x % 2 == 0)
print(rdd_filtre.collect())


[4, 6, 8, 10, 12]


## b. Création d’un RDDà partir d’un fichier texte

In [12]:
rdd3 = spark.sparkContext.textFile("Data_science.txt")   #  Charger le fichier .txt en RDD


In [13]:
rdd3.take(5)  # Affiche les 5 premières lignes


['La data science est un domaine interdisciplinaire qui utilise des méthodes scientifiques, des algorithmes et des systèmes pour extraire des connaissances et des insights à partir de données structurées et non structurées. Elle combine des compétences issues des mathématiques, de la statistique, de l’informatique et du domaine métier.',
 'Au cœur de la data science, on trouve des étapes clés : la collecte de données, le nettoyage, l’exploration, l’analyse et la modélisation prédictive. Les données peuvent provenir de diverses sources : bases de données, capteurs IoT, réseaux sociaux, fichiers log, etc. Une grande partie du travail du data scientist consiste à rendre ces données exploitables, ce qui inclut la gestion des valeurs manquantes, des doublons et des formats incohérents.',
 'Les outils et langages les plus courants sont Python, R, SQL, ainsi que des bibliothèques comme pandas, scikit-learn, TensorFlow, ou encore PySpark pour le traitement de données volumineuses. Les data sci

In [14]:
rdd3.count()   #  Compter le nombre de lignes


7

Traitement du texte

In [15]:
mots = rdd3.flatMap(lambda ligne: ligne.split(" "))  # Séparer en mots


In [16]:
import re
mots_nettoyes = mots.map(lambda mot: re.sub(r'\W+', '', mot.lower())).filter(lambda mot: mot != "")  # Nettoyer les mots (ponctuation + minuscules)


In [18]:

# Compter la fréquence des mots

paires = mots_nettoyes.map(lambda mot: (mot, 1))
frequence_mots = paires.reduceByKey(lambda a, b: a + b)

# Trier les mots par fréquence décroissante
mots_tries = frequence_mots.sortBy(lambda x: x[1], ascending=False)
mots_tries.take(10)  # Afficher les 10 mots les plus fréquents




[('des', 21),
 ('de', 17),
 ('la', 14),
 ('data', 12),
 ('et', 11),
 ('données', 9),
 ('les', 8),
 ('science', 6),
 ('à', 4),
 ('le', 4)]