In [1]:
from pyspark.sql import SparkSession

In [2]:
SparkSession

pyspark.sql.session.SparkSession

In [2]:
# Initialisation de Spark
import findspark
import json 
import os 
findspark.init()

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("NotebookTest") \
    .master("spark://spark-master:7077") \
    .getOrCreate()



print("Session Spark connectée :", spark.version)


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/10 11:33:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Session Spark connectée : 3.5.7


In [4]:
base_nosql = spark.read.option("header", "true").csv("hdfs://namenode:8020/data_collection/base_nosql.csv")

                                                                                

In [5]:
base_nosql.show()

+----+-------+---+----+-----------+------------+--------------------+-----------------+----------+
| nom| prenom|age|sexe|      ville|     maladie|             hopital|date_consultation|   medecin|
+----+-------+---+----+-----------+------------+--------------------+-----------------+----------+
|Diop|    Awa| 32|   F|      Dakar|     Diabète|   Hôpital Principal|       2025-10-10|Dr. Ndiaye|
|Fall|Mamadou| 45|   M|      Thiès|Hypertension|Hôpital Régional ...|       2025-09-22|Dr. Diallo|
| Sow|  Fatou| 28|   F|Saint-Louis|   Paludisme|Centre de Santé d...|       2025-10-02|    Dr. Sy|
|Diop|    Awa| 32|   F|      Dakar|     Diabète|   Hôpital Principal|       2025-10-10|Dr. Ndiaye|
|Fall|Mamadou| 45|   M|      Thiès|Hypertension|Hôpital Régional ...|       2025-09-22|Dr. Diallo|
| Sow|  Fatou| 28|   F|Saint-Louis|   Paludisme|Centre de Santé d...|       2025-10-02|    Dr. Sy|
+----+-------+---+----+-----------+------------+--------------------+-----------------+----------+



In [13]:
#Voir la structure des données
base_nosql.printSchema()



root
 |-- nom: string (nullable = true)
 |-- prenom: string (nullable = true)
 |-- age: string (nullable = true)
 |-- sexe: string (nullable = true)
 |-- ville: string (nullable = true)
 |-- maladie: string (nullable = true)
 |-- hopital: string (nullable = true)
 |-- date_consultation: string (nullable = true)
 |-- medecin: string (nullable = true)



In [17]:
from pyspark.sql.functions import col, to_date
from pyspark.sql.types import IntegerType

# Convertir age en entier
base_nosql = base_nosql.withColumn("age", col("age").cast(IntegerType()))

# Convertir date_consultation en date
base_nosql = base_nosql.withColumn("date_consultation", to_date(col("date_consultation"), "yyyy-MM-dd"))

base_nosql.printSchema()

root
 |-- nom: string (nullable = true)
 |-- prenom: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- sexe: string (nullable = true)
 |-- ville: string (nullable = true)
 |-- maladie: string (nullable = true)
 |-- hopital: string (nullable = true)
 |-- date_consultation: date (nullable = true)
 |-- medecin: string (nullable = true)



In [19]:
#Voir le nombre total de lignes
base_nosql.count()

6

In [25]:
#supprimer les doublons

base_nosql = base_nosql.dropDuplicates()
base_nosql.show()



+----+-------+---+----+-----------+------------+--------------------+-----------------+----------+
| nom| prenom|age|sexe|      ville|     maladie|             hopital|date_consultation|   medecin|
+----+-------+---+----+-----------+------------+--------------------+-----------------+----------+
|Diop|    Awa| 32|   F|      Dakar|     Diabète|   Hôpital Principal|       2025-10-10|Dr. Ndiaye|
| Sow|  Fatou| 28|   F|Saint-Louis|   Paludisme|Centre de Santé d...|       2025-10-02|    Dr. Sy|
|Fall|Mamadou| 45|   M|      Thiès|Hypertension|Hôpital Régional ...|       2025-09-22|Dr. Diallo|
+----+-------+---+----+-----------+------------+--------------------+-----------------+----------+



In [32]:
#Nettoyer les espaces dans toutes les colonnes texte
from pyspark.sql.functions import trim

for col_name in base_nosql.columns:
    base_nosql = base_nosql.withColumn(col_name, trim(col(col_name)))
base_nosql.show()


+----+-------+---+----+-----------+------------+--------------------+-----------------+----------+
| nom| prenom|age|sexe|      ville|     maladie|             hopital|date_consultation|   medecin|
+----+-------+---+----+-----------+------------+--------------------+-----------------+----------+
|Diop|    Awa| 32|   F|      Dakar|     Diabète|   Hôpital Principal|       2025-10-10|Dr. Ndiaye|
| Sow|  Fatou| 28|   F|Saint-louis|   Paludisme|Centre De Santé D...|       2025-10-02|    Dr. Sy|
|Fall|Mamadou| 45|   M|      Thiès|Hypertension|Hôpital Régional ...|       2025-09-22|Dr. Diallo|
+----+-------+---+----+-----------+------------+--------------------+-----------------+----------+



In [32]:
#Nettoyer les espaces dans toutes les colonnes texte
from pyspark.sql.functions import trim

for col_name in base_nosql.columns:
    base_nosql = base_nosql.withColumn(col_name, trim(col(col_name)))
base_nosql.show()


+----+-------+---+----+-----------+------------+--------------------+-----------------+----------+
| nom| prenom|age|sexe|      ville|     maladie|             hopital|date_consultation|   medecin|
+----+-------+---+----+-----------+------------+--------------------+-----------------+----------+
|Diop|    Awa| 32|   F|      Dakar|     Diabète|   Hôpital Principal|       2025-10-10|Dr. Ndiaye|
| Sow|  Fatou| 28|   F|Saint-louis|   Paludisme|Centre De Santé D...|       2025-10-02|    Dr. Sy|
|Fall|Mamadou| 45|   M|      Thiès|Hypertension|Hôpital Régional ...|       2025-09-22|Dr. Diallo|
+----+-------+---+----+-----------+------------+--------------------+-----------------+----------+



In [31]:
#Mettre la première lettre en Majuscule pour les colonnes texte (nom, prenom, ville, maladie, hopital, medecin, sexe)
from pyspark.sql.functions import initcap

cols_to_format = ["nom", "prenom", "ville", "maladie", "hopital", "medecin", "sexe"]

for c in cols_to_format:
    base_nosql = base_nosql.withColumn(c, initcap(col(c)))

base_nosql.show()


+----+-------+---+----+-----------+------------+--------------------+-----------------+----------+
| nom| prenom|age|sexe|      ville|     maladie|             hopital|date_consultation|   medecin|
+----+-------+---+----+-----------+------------+--------------------+-----------------+----------+
|Diop|    Awa| 32|   F|      Dakar|     Diabète|   Hôpital Principal|       2025-10-10|Dr. Ndiaye|
| Sow|  Fatou| 28|   F|Saint-louis|   Paludisme|Centre De Santé D...|       2025-10-02|    Dr. Sy|
|Fall|Mamadou| 45|   M|      Thiès|Hypertension|Hôpital Régional ...|       2025-09-22|Dr. Diallo|
+----+-------+---+----+-----------+------------+--------------------+-----------------+----------+



In [33]:
base_sql = spark.read.option("header", "true").csv("hdfs://namenode:8020/data_collection/base_sql.csv")
base_sql.show()

+---+-----------------+---+----+---------------+-----------------+
| id|              nom|age|sexe|        maladie|date_consultation|
+---+-----------------+---+----+---------------+-----------------+
|  1|Catherine Michaud| 73|   F|         Asthme|       2024-10-28|
|  2|  Richard Thierry| 84|   M|    Tuberculose|       2024-07-25|
|  3|    Aurore Brunet| 26|   F|            VIH|       2024-09-10|
|  4|   Tristan Renaud| 31|   F|         Asthme|       2025-11-02|
|  5|     Marine Neveu| 38|   F|            VIH|       2025-04-29|
|  6| Marthe Letellier| 60|   M|        Diabète|       2024-04-22|
|  7|    Astrid Perret|  4|   F|Gastro-entérite|       2025-05-02|
|  8|   Marcelle Costa| 40|   F|       Covid-19|       2024-03-20|
|  9|   Nathalie David| 34|   F|        Choléra|       2024-08-27|
| 10|    Victor Menard| 55|   M|    Tuberculose|       2025-05-09|
| 11|  Édouard Vasseur| 61|   M|   Hypertension|       2024-01-01|
| 12|       David Blot| 38|   F|         Grippe|       2025-01

In [35]:
# voir la structure des données
base_sql.printSchema()

root
 |-- id: string (nullable = true)
 |-- nom: string (nullable = true)
 |-- age: string (nullable = true)
 |-- sexe: string (nullable = true)
 |-- maladie: string (nullable = true)
 |-- date_consultation: string (nullable = true)



In [44]:
from pyspark.sql.functions import col, to_date
from pyspark.sql.types import IntegerType

# Convertir age en entier
base_sql = base_sql.withColumn("age", col("age").cast(IntegerType()))

# Convertir id en entier
base_sql = base_sql.withColumn("id", col("id").cast(IntegerType()))


# Convertir date_consultation en date
base_sql = base_sql.withColumn("date_consultation", to_date(col("date_consultation"), "yyyy-MM-dd"))

base_sql.printSchema()

root
 |-- id: integer (nullable = true)
 |-- nom: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- sexe: string (nullable = true)
 |-- maladie: string (nullable = true)
 |-- date_consultation: date (nullable = true)



In [37]:
#voir le nombre total de lignes
base_sql.count()

200

In [38]:
# Supprimer les doublons
base_sql = base_sql.dropDuplicates()
base_sql.show()

+---+----------------+---+----+---------------+-----------------+
| id|             nom|age|sexe|        maladie|date_consultation|
+---+----------------+---+----+---------------+-----------------+
| 10|   Victor Menard| 55|   M|    Tuberculose|       2025-05-09|
| 94| Bertrand Rocher| 80|   M|       Covid-19|       2025-02-18|
|194|  Michèle Thomas| 39|   M|         Asthme|       2023-12-16|
|117|  Thomas Gautier| 53|   M|   Hypertension|       2024-05-15|
|143| Claudine Dubois| 53|   M|   Hypertension|       2025-03-24|
| 13| Michelle Martin| 24|   M|         Grippe|       2024-05-28|
| 42|  René Rodrigues| 75|   M|            VIH|       2025-06-09|
| 27|Georges Chrétien| 76|   M|        Choléra|       2025-08-03|
|144|    Alix Antoine| 80|   M|       Covid-19|       2024-01-04|
|197|Marianne Lemaire| 72|   M|   Hypertension|       2025-05-10|
|156| Charles Jacques| 14|   M|         Grippe|       2024-02-12|
| 14| Émilie Leclercq| 35|   M|Gastro-entérite|       2024-03-21|
|108|  Hél

In [39]:
base_sql.count()

200

In [40]:
#Nettoyer les espaces dans toutes les colonnes texte
from pyspark.sql.functions import trim

for col_name in base_sql.columns:
    base_sql = base_sql.withColumn(col_name, trim(col(col_name)))
base_sql.show()


+---+----------------+---+----+---------------+-----------------+
| id|             nom|age|sexe|        maladie|date_consultation|
+---+----------------+---+----+---------------+-----------------+
| 10|   Victor Menard| 55|   M|    Tuberculose|       2025-05-09|
| 94| Bertrand Rocher| 80|   M|       Covid-19|       2025-02-18|
|194|  Michèle Thomas| 39|   M|         Asthme|       2023-12-16|
|117|  Thomas Gautier| 53|   M|   Hypertension|       2024-05-15|
|143| Claudine Dubois| 53|   M|   Hypertension|       2025-03-24|
| 13| Michelle Martin| 24|   M|         Grippe|       2024-05-28|
| 42|  René Rodrigues| 75|   M|            VIH|       2025-06-09|
| 27|Georges Chrétien| 76|   M|        Choléra|       2025-08-03|
|144|    Alix Antoine| 80|   M|       Covid-19|       2024-01-04|
|197|Marianne Lemaire| 72|   M|   Hypertension|       2025-05-10|
|156| Charles Jacques| 14|   M|         Grippe|       2024-02-12|
| 14| Émilie Leclercq| 35|   M|Gastro-entérite|       2024-03-21|
|108|  Hél

In [41]:
#Mettre la première lettre en Majuscule pour les colonnes texte (nom, prenom, ville, maladie, hopital, medecin, sexe)
from pyspark.sql.functions import initcap

cols_to_format = ["nom", "maladie", "sexe"]

for c in cols_to_format:
    base_sql = base_sql.withColumn(c, initcap(col(c)))

base_sql.show()

+---+----------------+---+----+---------------+-----------------+
| id|             nom|age|sexe|        maladie|date_consultation|
+---+----------------+---+----+---------------+-----------------+
| 10|   Victor Menard| 55|   M|    Tuberculose|       2025-05-09|
| 94| Bertrand Rocher| 80|   M|       Covid-19|       2025-02-18|
|194|  Michèle Thomas| 39|   M|         Asthme|       2023-12-16|
|117|  Thomas Gautier| 53|   M|   Hypertension|       2024-05-15|
|143| Claudine Dubois| 53|   M|   Hypertension|       2025-03-24|
| 13| Michelle Martin| 24|   M|         Grippe|       2024-05-28|
| 42|  René Rodrigues| 75|   M|            Vih|       2025-06-09|
| 27|Georges Chrétien| 76|   M|        Choléra|       2025-08-03|
|144|    Alix Antoine| 80|   M|       Covid-19|       2024-01-04|
|197|Marianne Lemaire| 72|   M|   Hypertension|       2025-05-10|
|156| Charles Jacques| 14|   M|         Grippe|       2024-02-12|
| 14| Émilie Leclercq| 35|   M|Gastro-entérite|       2024-03-21|
|108|  Hél

In [42]:
donnee_sante = spark.read.option("header", "true").csv("hdfs://namenode:8020/data_collection/donnee_sante.csv")
donnee_sante.show()

+---+-----------+--------------------+----+---+------------+-----------------+-------------+-----------------+
| id|     region|             hopital|sexe|age|     maladie|date_consultation|   traitement|cout_consultation|
+---+-----------+--------------------+----+---+------------+-----------------+-------------+-----------------+
|  1| Ziguinchor|Hôpital Aristide ...|   M|  5|     Diabète|       2024-04-22|     Quinimax|            13727|
|  2|      Dakar|   Hôpital Principal|   F| 12|Hypertension|       2024-08-28|     Quinimax|             2920|
|  3|Tambacounda|Hôpital de Ziguin...|   M| 85|   Paludisme|       2024-01-08|  Paracétamol|             4863|
|  4|      Dakar|Hôpital Aristide ...|   F| 27|   Paludisme|       2024-02-07|Antibiotiques|            17624|
|  5|      Thiès|    Hôpital Régional|   M| 40| Tuberculose|       2024-08-21|Antibiotiques|             7540|
|  6|      Dakar|Centre de Santé T...|   M| 21|Hypertension|       2024-02-04|     Quinimax|             9731|
|

In [43]:
donnee_sante.printSchema()

root
 |-- id: string (nullable = true)
 |-- region: string (nullable = true)
 |-- hopital: string (nullable = true)
 |-- sexe: string (nullable = true)
 |-- age: string (nullable = true)
 |-- maladie: string (nullable = true)
 |-- date_consultation: string (nullable = true)
 |-- traitement: string (nullable = true)
 |-- cout_consultation: string (nullable = true)



In [45]:
from pyspark.sql.functions import col, to_date
from pyspark.sql.types import IntegerType

# Convertir age en entier
donnee_sante = donnee_sante.withColumn("age", col("age").cast(IntegerType()))

# Convertir id en entier
donnee_sante = donnee_sante.withColumn("id", col("id").cast(IntegerType()))

# Convertir cout_consultation en entier
donnee_sante = donnee_sante.withColumn("cout_consultation", col("cout_consultation").cast(IntegerType()))


# Convertir date_consultation en date
donnee_sante = donnee_sante.withColumn("date_consultation", to_date(col("date_consultation"), "yyyy-MM-dd"))

donnee_sante.printSchema()

root
 |-- id: integer (nullable = true)
 |-- region: string (nullable = true)
 |-- hopital: string (nullable = true)
 |-- sexe: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- maladie: string (nullable = true)
 |-- date_consultation: date (nullable = true)
 |-- traitement: string (nullable = true)
 |-- cout_consultation: integer (nullable = true)



In [46]:
# Supprimer les doublons
donnee_sante = donnee_sante.dropDuplicates()
donnee_sante.show()

+---+-----------+--------------------+----+---+------------+-----------------+-------------+-----------------+
| id|     region|             hopital|sexe|age|     maladie|date_consultation|   traitement|cout_consultation|
+---+-----------+--------------------+----+---+------------+-----------------+-------------+-----------------+
| 92|Saint-Louis|   Hôpital Principal|   F| 16|      Grippe|       2024-10-01|  Paracétamol|            12485|
| 97|      Dakar|Hôpital Aristide ...|   M| 46|     Diabète|       2024-09-24|        Repos|            17703|
| 22|    Kaolack|    Hôpital Régional|   M| 30|Hypertension|       2024-07-04|     Quinimax|             3894|
| 90|    Kaolack|    Hôpital Régional|   F| 11|    Covid-19|       2024-09-10|  Chloroquine|            16685|
|  6|      Dakar|Centre de Santé T...|   M| 21|Hypertension|       2024-02-04|     Quinimax|             9731|
| 59|      Dakar|   Hôpital Principal|   M|  8| Tuberculose|       2024-06-05|Antibiotiques|             4484|
|

In [47]:
donnee_sante.count()

100

In [48]:
#Nettoyer les espaces dans toutes les colonnes texte
from pyspark.sql.functions import trim

for col_name in donnee_sante.columns:
    donnee_sante = donnee_sante.withColumn(col_name, trim(col(col_name)))
donnee_sante.show()

+---+-----------+--------------------+----+---+------------+-----------------+-------------+-----------------+
| id|     region|             hopital|sexe|age|     maladie|date_consultation|   traitement|cout_consultation|
+---+-----------+--------------------+----+---+------------+-----------------+-------------+-----------------+
| 92|Saint-Louis|   Hôpital Principal|   F| 16|      Grippe|       2024-10-01|  Paracétamol|            12485|
| 97|      Dakar|Hôpital Aristide ...|   M| 46|     Diabète|       2024-09-24|        Repos|            17703|
| 22|    Kaolack|    Hôpital Régional|   M| 30|Hypertension|       2024-07-04|     Quinimax|             3894|
| 90|    Kaolack|    Hôpital Régional|   F| 11|    Covid-19|       2024-09-10|  Chloroquine|            16685|
|  6|      Dakar|Centre de Santé T...|   M| 21|Hypertension|       2024-02-04|     Quinimax|             9731|
| 59|      Dakar|   Hôpital Principal|   M|  8| Tuberculose|       2024-06-05|Antibiotiques|             4484|
|

In [49]:
#Mettre la première lettre en Majuscule pour les colonnes texte (nom, prenom, ville, maladie, hopital, medecin, sexe)
from pyspark.sql.functions import initcap

cols_to_format = ["region", "hopital", "sexe", "maladie", "traitement"]

for c in cols_to_format:
   donnee_sante = donnee_sante.withColumn(c, initcap(col(c)))

donnee_sante.show()

+---+-----------+--------------------+----+---+------------+-----------------+-------------+-----------------+
| id|     region|             hopital|sexe|age|     maladie|date_consultation|   traitement|cout_consultation|
+---+-----------+--------------------+----+---+------------+-----------------+-------------+-----------------+
| 92|Saint-louis|   Hôpital Principal|   F| 16|      Grippe|       2024-10-01|  Paracétamol|            12485|
| 97|      Dakar|Hôpital Aristide ...|   M| 46|     Diabète|       2024-09-24|        Repos|            17703|
| 22|    Kaolack|    Hôpital Régional|   M| 30|Hypertension|       2024-07-04|     Quinimax|             3894|
| 90|    Kaolack|    Hôpital Régional|   F| 11|    Covid-19|       2024-09-10|  Chloroquine|            16685|
|  6|      Dakar|Centre De Santé T...|   M| 21|Hypertension|       2024-02-04|     Quinimax|             9731|
| 59|      Dakar|   Hôpital Principal|   M|  8| Tuberculose|       2024-06-05|Antibiotiques|             4484|
|

In [50]:
donnees_sanitaires_api = spark.read.option("header", "true").csv("hdfs://namenode:8020/data_collection/donnees_sanitaires_api.csv")
donnees_sanitaires_api.show()

+-------------+-------------------+--------------------+--------+----------+------+-----------+---------+--------------+-------+--------+------------------+-------------------+---------+------------------+----------+-----------------+----------------+-----------------+----------------+-------------------+----------------------+---------------------+
|      updated|            country|         countryInfo|   cases|todayCases|deaths|todayDeaths|recovered|todayRecovered| active|critical|casesPerOneMillion|deathsPerOneMillion|    tests|testsPerOneMillion|population|        continent|oneCasePerPeople|oneDeathPerPeople|oneTestPerPeople|activePerOneMillion|recoveredPerOneMillion|criticalPerOneMillion|
+-------------+-------------------+--------------------+--------+----------+------+-----------+---------+--------------+-------+--------+------------------+-------------------+---------+------------------+----------+-----------------+----------------+-----------------+----------------+----------

In [51]:
donnees_sanitaires_api.printSchema()

root
 |-- updated: string (nullable = true)
 |-- country: string (nullable = true)
 |-- countryInfo: string (nullable = true)
 |-- cases: string (nullable = true)
 |-- todayCases: string (nullable = true)
 |-- deaths: string (nullable = true)
 |-- todayDeaths: string (nullable = true)
 |-- recovered: string (nullable = true)
 |-- todayRecovered: string (nullable = true)
 |-- active: string (nullable = true)
 |-- critical: string (nullable = true)
 |-- casesPerOneMillion: string (nullable = true)
 |-- deathsPerOneMillion: string (nullable = true)
 |-- tests: string (nullable = true)
 |-- testsPerOneMillion: string (nullable = true)
 |-- population: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- oneCasePerPeople: string (nullable = true)
 |-- oneDeathPerPeople: string (nullable = true)
 |-- oneTestPerPeople: string (nullable = true)
 |-- activePerOneMillion: string (nullable = true)
 |-- recoveredPerOneMillion: string (nullable = true)
 |-- criticalPerOneMillion: 

In [52]:
from pyspark.sql.functions import col

# Colonnes ENTIERES (int)
cols_int = [
    "cases","todayCases","deaths","todayDeaths","recovered","todayRecovered",
    "active","critical","tests","population","oneCasePerPeople",
    "oneDeathPerPeople","oneTestPerPeople"
]

# Colonnes DECIMALES (double)
cols_double = [
    "casesPerOneMillion","deathsPerOneMillion","testsPerOneMillion",
    "activePerOneMillion","recoveredPerOneMillion","criticalPerOneMillion"
]

# Conversion timestamp (long)
donnees_sanitaires_api = donnees_sanitaires_api.withColumn("updated", col("updated").cast("long"))

# Conversion INT
for c in cols_int:
   donnees_sanitaires_api = donnees_sanitaires_api.withColumn(c, col(c).cast("int"))

# Conversion DOUBLE
for c in cols_double:
    donnees_sanitaires_api = donnees_sanitaires_api.withColumn(c, col(c).cast("double"))

donnees_sanitaires_api.printSchema()


root
 |-- updated: long (nullable = true)
 |-- country: string (nullable = true)
 |-- countryInfo: string (nullable = true)
 |-- cases: integer (nullable = true)
 |-- todayCases: integer (nullable = true)
 |-- deaths: integer (nullable = true)
 |-- todayDeaths: integer (nullable = true)
 |-- recovered: integer (nullable = true)
 |-- todayRecovered: integer (nullable = true)
 |-- active: integer (nullable = true)
 |-- critical: integer (nullable = true)
 |-- casesPerOneMillion: double (nullable = true)
 |-- deathsPerOneMillion: double (nullable = true)
 |-- tests: integer (nullable = true)
 |-- testsPerOneMillion: double (nullable = true)
 |-- population: integer (nullable = true)
 |-- continent: string (nullable = true)
 |-- oneCasePerPeople: integer (nullable = true)
 |-- oneDeathPerPeople: integer (nullable = true)
 |-- oneTestPerPeople: integer (nullable = true)
 |-- activePerOneMillion: double (nullable = true)
 |-- recoveredPerOneMillion: double (nullable = true)
 |-- criticalPerO

In [53]:
# Supprimer les doublons
donnees_sanitaires_api = donnees_sanitaires_api.dropDuplicates()
donnees_sanitaires_api.show()

+-------------+--------------------+--------------------+--------+----------+------+-----------+---------+--------------+------+--------+------------------+-------------------+--------+------------------+----------+-------------+----------------+-----------------+----------------+-------------------+----------------------+---------------------+
|      updated|             country|         countryInfo|   cases|todayCases|deaths|todayDeaths|recovered|todayRecovered|active|critical|casesPerOneMillion|deathsPerOneMillion|   tests|testsPerOneMillion|population|    continent|oneCasePerPeople|oneDeathPerPeople|oneTestPerPeople|activePerOneMillion|recoveredPerOneMillion|criticalPerOneMillion|
+-------------+--------------------+--------------------+--------+----------+------+-----------+---------+--------------+------+--------+------------------+-------------------+--------+------------------+----------+-------------+----------------+-----------------+----------------+-------------------+-----

In [54]:
donnees_sanitaires_api.count()

231

In [55]:
#Nettoyer les espaces dans toutes les colonnes texte
from pyspark.sql.functions import trim

for col_name in donnees_sanitaires_api.columns:
    donnees_sanitaires_api = donnees_sanitaires_api.withColumn(col_name, trim(col(col_name)))
donnees_sanitaires_api.show()

+-------------+--------------------+--------------------+--------+----------+------+-----------+---------+--------------+------+--------+------------------+-------------------+--------+------------------+----------+-------------+----------------+-----------------+----------------+-------------------+----------------------+---------------------+
|      updated|             country|         countryInfo|   cases|todayCases|deaths|todayDeaths|recovered|todayRecovered|active|critical|casesPerOneMillion|deathsPerOneMillion|   tests|testsPerOneMillion|population|    continent|oneCasePerPeople|oneDeathPerPeople|oneTestPerPeople|activePerOneMillion|recoveredPerOneMillion|criticalPerOneMillion|
+-------------+--------------------+--------------------+--------+----------+------+-----------+---------+--------------+------+--------+------------------+-------------------+--------+------------------+----------+-------------+----------------+-----------------+----------------+-------------------+-----

In [23]:
# Paramètres de connexion
url = "jdbc:postgresql://postgres:5432/post_db_collect"
user = "admin"
password = "admin"
driver = "org.postgresql.Driver" 

In [25]:
# Lire une table existante (exemple : table 'clients')
df_pg = spark.read \
    .format("jdbc") \
    .option("url", url) \
    .option("dbtable", "information_schema.tables") \
    .option("user", user) \
    .option("password", password) \
    .option("driver", driver) \
    .load()

df_pg.show(5)


Py4JJavaError: An error occurred while calling o168.load.
: org.postgresql.util.PSQLException: FATAL: password authentication failed for user "('admin',)"
	at org.postgresql.core.v3.ConnectionFactoryImpl.doAuthentication(ConnectionFactoryImpl.java:698)
	at org.postgresql.core.v3.ConnectionFactoryImpl.tryConnect(ConnectionFactoryImpl.java:207)
	at org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:262)
	at org.postgresql.core.ConnectionFactory.openConnection(ConnectionFactory.java:54)
	at org.postgresql.jdbc.PgConnection.<init>(PgConnection.java:273)
	at org.postgresql.Driver.makeConnection(Driver.java:446)
	at org.postgresql.Driver.connect(Driver.java:298)
	at org.apache.spark.sql.execution.datasources.jdbc.connection.BasicConnectionProvider.getConnection(BasicConnectionProvider.scala:49)
	at org.apache.spark.sql.execution.datasources.jdbc.connection.ConnectionProviderBase.create(ConnectionProvider.scala:102)
	at org.apache.spark.sql.jdbc.JdbcDialect.$anonfun$createConnectionFactory$1(JdbcDialects.scala:161)
	at org.apache.spark.sql.jdbc.JdbcDialect.$anonfun$createConnectionFactory$1$adapted(JdbcDialects.scala:157)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.getQueryOutputSchema(JDBCRDD.scala:63)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:58)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation$.getSchema(JDBCRelation.scala:241)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:37)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:346)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:172)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:569)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:840)
