# Analyse Big Data - Données COVID-19

**Projet UA3 - Traitement de données massives avec Apache Spark**

---

## Équipe
- Farid Bandoui
- Anis Hemaida
- N'Guessan Yves Guichard Allou
- Chetsong Ndontsop Chetsong

---

## Dataset
- **Source** : Données COVID-19 - Public Health Agency of Canada
- **Taille** : 76 MB, 903 597 lignes
- **Colonnes** : 13 variables (cas positifs, décès, provinces, dates, continents)

---
## 0. Installation et vérification des dépendances
Cette cellule vérifie et installe automatiquement PySpark si nécessaire.

In [1]:
# Verification et installation automatique des dependances
import subprocess
import sys

def install_package(package, version=None):
    """Installe un package Python si non present"""
    package_spec = f"{package}=={version}" if version else package
    try:
        __import__(package)
        print(f"{package} est deja installe")
    except ImportError:
        print(f"Installation de {package_spec}...")
        subprocess.check_call([sys.executable, "-m", "pip", "install", 
                              package_spec, "-q"])
        print(f"{package_spec} installe avec succes")

# Installation de PySpark (version 3.5.0 pour compatibilite)
install_package("pyspark", "3.5.0")

# Verification de matplotlib
install_package("matplotlib")

print("\nToutes les dependances sont installees")

pyspark est deja installe
matplotlib est deja installe

Toutes les dependances sont installees


---
## 1. Initialisation de Spark et des librairies

In [2]:
# Importation des librairies Spark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, avg, count, max, min, desc, asc
from pyspark.sql.functions import year, month, to_date, when, round as spark_round
from pyspark.sql.types import IntegerType, DoubleType, DateType

# Librairies pour la visualisation
import matplotlib.pyplot as plt
import pandas as pd
import warnings
warnings.filterwarnings('ignore')

# Configuration des graphiques
plt.style.use('seaborn-v0_8-whitegrid')
plt.rcParams['figure.figsize'] = (12, 6)
plt.rcParams['font.size'] = 10

# Création de la session Spark
spark = SparkSession.builder \
    .appName("COVID-19 Big Data Analysis") \
    .config("spark.driver.memory", "2g") \
    .getOrCreate()

print(f"Spark Version: {spark.version}")
print("Session Spark initialisée avec succès")

Spark Version: 3.5.0
Session Spark initialisée avec succès


---
## 2. Chargement et Exploration des Données
### 2.1 Importation du dataset

In [3]:
# Chargement du fichier CSV avec inférence automatique du schéma
df = spark.read.csv(
    "/home/jovyan/data/covid_data.csv",
    header=True,
    inferSchema=True
)

# Affichage des informations de base
nb_lignes = df.count()
nb_colonnes = len(df.columns)

print(f"Nombre de lignes : {nb_lignes:,}")
print(f"Nombre de colonnes : {nb_colonnes}")

Nombre de lignes : 903,596
Nombre de colonnes : 13


### 2.2 Schéma et aperçu des données

In [4]:
# Affichage du schéma
print("SCHEMA DES DONNEES")
print("-" * 50)
df.printSchema()

SCHEMA DES DONNEES
--------------------------------------------------
root
 |-- PEOPLE_POSITIVE_CASES_COUNT: integer (nullable = true)
 |-- COUNTY_NAME: string (nullable = true)
 |-- PROVINCE_STATE_NAME: string (nullable = true)
 |-- REPORT_DATE: date (nullable = true)
 |-- CONTINENT_NAME: string (nullable = true)
 |-- DATA_SOURCE_NAME: string (nullable = true)
 |-- PEOPLE_DEATH_NEW_COUNT: integer (nullable = true)
 |-- COUNTY_FIPS_NUMBER: integer (nullable = true)
 |-- COUNTRY_ALPHA_3_CODE: string (nullable = true)
 |-- COUNTRY_SHORT_NAME: string (nullable = true)
 |-- COUNTRY_ALPHA_2_CODE: string (nullable = true)
 |-- PEOPLE_POSITIVE_NEW_CASES_COUNT: integer (nullable = true)
 |-- PEOPLE_DEATH_COUNT: integer (nullable = true)



In [5]:
# Aperçu des premières lignes (ACTION: show)
print("APERCU DES DONNEES")
print("-" * 50)
df.show(10, truncate=False)

APERCU DES DONNEES
--------------------------------------------------
+---------------------------+-----------+-------------------------+-----------+--------------+------------------------------+----------------------+------------------+--------------------+------------------+--------------------+-------------------------------+------------------+
|PEOPLE_POSITIVE_CASES_COUNT|COUNTY_NAME|PROVINCE_STATE_NAME      |REPORT_DATE|CONTINENT_NAME|DATA_SOURCE_NAME              |PEOPLE_DEATH_NEW_COUNT|COUNTY_FIPS_NUMBER|COUNTRY_ALPHA_3_CODE|COUNTRY_SHORT_NAME|COUNTRY_ALPHA_2_CODE|PEOPLE_POSITIVE_NEW_CASES_COUNT|PEOPLE_DEATH_COUNT|
+---------------------------+-----------+-------------------------+-----------+--------------+------------------------------+----------------------+------------------+--------------------+------------------+--------------------+-------------------------------+------------------+
|17909                      |NULL       |Alberta                  |2020-09-29 |America    

In [None]:
# Liste des colonnes disponibles
print("LISTE DES COLONNES")
print("-" * 50)
for i, nom_col in enumerate(df.columns, 1):
    print(f"{i:2}. {nom_col}")

### 2.3 Vérification de la qualité des données

In [None]:
# Analyse des valeurs manquantes
from pyspark.sql.functions import sum as spark_sum

print("VALEURS MANQUANTES PAR COLONNE")
print("-" * 50)

null_counts = df.select([
    spark_sum(when(col(c).isNull(), 1).otherwise(0)).alias(c) 
    for c in df.columns
])

null_counts.show(truncate=False)

In [None]:
# Statistiques descriptives
print("STATISTIQUES DESCRIPTIVES")
print("-" * 50)
df.describe().show()

### 2.4 Nettoyage des données

In [None]:
# TRANSFORMATION: select() - Selection des colonnes pertinentes
df_clean = df.select(
    "COUNTRY_SHORT_NAME",
    "PROVINCE_STATE_NAME",
    "CONTINENT_NAME",
    "REPORT_DATE",
    "PEOPLE_POSITIVE_CASES_COUNT",
    "PEOPLE_POSITIVE_NEW_CASES_COUNT",
    "PEOPLE_DEATH_COUNT",
    "PEOPLE_DEATH_NEW_COUNT"
)

print(f"Colonnes selectionnees : {len(df_clean.columns)}")
df_clean.printSchema()

In [None]:
# TRANSFORMATION: withColumn() - Conversion de la date et extraction annee/mois
df_clean = df_clean \
    .withColumn("REPORT_DATE", to_date(col("REPORT_DATE"), "yyyy-MM-dd")) \
    .withColumn("YEAR", year(col("REPORT_DATE"))) \
    .withColumn("MONTH", month(col("REPORT_DATE")))

print("Colonnes YEAR et MONTH ajoutees")
df_clean.show(5)

In [None]:
# TRANSFORMATION: filter() - Suppression des valeurs nulles critiques
df_filtered = df_clean.filter(
    (col("COUNTRY_SHORT_NAME").isNotNull()) & 
    (col("PEOPLE_POSITIVE_CASES_COUNT").isNotNull())
)

# ACTION: count()
count_avant = df_clean.count()
count_apres = df_filtered.count()

print(f"Lignes avant filtrage : {count_avant:,}")
print(f"Lignes apres filtrage : {count_apres:,}")
print(f"Lignes supprimees : {count_avant - count_apres:,}")

In [None]:
# Preparation du DataFrame final avec mise en cache pour optimisation
df_covid = df_filtered
df_covid.cache()

print("Donnees nettoyees et mises en cache")
print(f"Dataset final : {df_covid.count():,} lignes")

---
## 3. Justification des Choix Techniques

### Pourquoi Apache Spark ?

**1. Spark vs Hadoop MapReduce :**
- Traitement en mémoire (in-memory computing) jusqu'à 100x plus rapide
- API Python (PySpark) intuitive et moderne
- Intégration native avec Jupyter Notebook

**2. DataFrames vs RDD :**
- Optimisation automatique via Catalyst Optimizer
- Syntaxe déclarative proche de SQL et Pandas
- Gestion mémoire optimisée avec Tungsten
- Inférence automatique des schémas

**3. Architecture Docker :**
- Environnement reproductible et portable
- Cluster distribué (Master + Worker)
- Isolation des dépendances

---
## 4. Analyse des Données

### Question 1 : Quels sont les 10 pays les plus touchés ?

In [None]:
# TRANSFORMATION: groupBy() + agg() - Agregation par pays
# TRANSFORMATION: orderBy() - Tri decroissant

top_10_pays = df_covid \
    .groupBy("COUNTRY_SHORT_NAME") \
    .agg(
        max("PEOPLE_POSITIVE_CASES_COUNT").alias("TOTAL_CAS"),
        max("PEOPLE_DEATH_COUNT").alias("TOTAL_DECES")
    ) \
    .orderBy(desc("TOTAL_CAS")) \
    .limit(10)

print("TOP 10 PAYS LES PLUS TOUCHES (CAS POSITIFS)")
print("-" * 60)
top_10_pays.show(truncate=False)

In [None]:
# ACTION: collect() - Recuperation des donnees pour analyse detaillee
top_10_data = top_10_pays.collect()

print("\nANALYSE DETAILLEE")
print("-" * 80)
print(f"{'Rang':<5} {'Pays':<25} {'Cas':>15} {'Deces':>12} {'Taux mortalite':>15}")
print("-" * 80)

for i, row in enumerate(top_10_data, 1):
    pays = row["COUNTRY_SHORT_NAME"]
    cas = row["TOTAL_CAS"] or 0
    deces = row["TOTAL_DECES"] or 0
    taux = (deces / cas * 100) if cas > 0 else 0
    print(f"{i:<5} {pays:<25} {cas:>15,} {deces:>12,} {taux:>14.2f}%")

In [None]:
# Visualisation : Graphique en barres horizontales - Top 10 pays
df_top10_pd = top_10_pays.toPandas()

fig, ax = plt.subplots(figsize=(12, 8))

colors = plt.cm.Reds(range(50, 250, 20))
bars = ax.barh(df_top10_pd['COUNTRY_SHORT_NAME'][::-1], 
               df_top10_pd['TOTAL_CAS'][::-1], 
               color=colors)

ax.set_xlabel('Nombre total de cas', fontsize=12)
ax.set_ylabel('Pays', fontsize=12)
ax.set_title('Top 10 des pays les plus touchés par la COVID-19', fontsize=14, fontweight='bold')

# Ajout des valeurs sur les barres
for bar, val in zip(bars, df_top10_pd['TOTAL_CAS'][::-1]):
    ax.text(bar.get_width() + bar.get_width()*0.01, bar.get_y() + bar.get_height()/2,
            f'{val:,.0f}', va='center', fontsize=9)

plt.tight_layout()
plt.savefig('/home/jovyan/output/graphique_top10_pays.png', dpi=150, bbox_inches='tight')
plt.show()

print("Graphique sauvegarde : graphique_top10_pays.png")

### Question 2 : Comment évoluent les cas dans le temps ?

In [None]:
# Evolution mensuelle des nouveaux cas
evolution_mensuelle = df_covid \
    .groupBy("YEAR", "MONTH") \
    .agg(
        sum("PEOPLE_POSITIVE_NEW_CASES_COUNT").alias("NOUVEAUX_CAS"),
        sum("PEOPLE_DEATH_NEW_COUNT").alias("NOUVEAUX_DECES")
    ) \
    .orderBy("YEAR", "MONTH")

print("EVOLUTION MENSUELLE DES CAS COVID-19")
print("-" * 60)
evolution_mensuelle.show(30, truncate=False)

In [None]:
# Resume par annee
print("RESUME PAR ANNEE")
print("-" * 60)

evolution_annuelle = df_covid \
    .groupBy("YEAR") \
    .agg(
        sum("PEOPLE_POSITIVE_NEW_CASES_COUNT").alias("TOTAL_NOUVEAUX_CAS"),
        sum("PEOPLE_DEATH_NEW_COUNT").alias("TOTAL_NOUVEAUX_DECES")
    ) \
    .orderBy("YEAR")

evolution_annuelle.show(truncate=False)

In [None]:
# Visualisation : Evolution temporelle des cas
df_evolution_pd = evolution_mensuelle.toPandas()
df_evolution_pd['DATE'] = pd.to_datetime(df_evolution_pd['YEAR'].astype(str) + '-' + 
                                          df_evolution_pd['MONTH'].astype(str) + '-01')
df_evolution_pd = df_evolution_pd.sort_values('DATE')

fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(14, 10))

# Graphique des nouveaux cas
ax1.fill_between(df_evolution_pd['DATE'], df_evolution_pd['NOUVEAUX_CAS'], 
                 alpha=0.3, color='steelblue')
ax1.plot(df_evolution_pd['DATE'], df_evolution_pd['NOUVEAUX_CAS'], 
         color='steelblue', linewidth=2)
ax1.set_ylabel('Nouveaux cas', fontsize=12)
ax1.set_title('Évolution mensuelle des nouveaux cas COVID-19', fontsize=14, fontweight='bold')
ax1.tick_params(axis='x', rotation=45)

# Graphique des nouveaux deces
ax2.fill_between(df_evolution_pd['DATE'], df_evolution_pd['NOUVEAUX_DECES'], 
                 alpha=0.3, color='indianred')
ax2.plot(df_evolution_pd['DATE'], df_evolution_pd['NOUVEAUX_DECES'], 
         color='indianred', linewidth=2)
ax2.set_xlabel('Date', fontsize=12)
ax2.set_ylabel('Nouveaux décès', fontsize=12)
ax2.set_title('Évolution mensuelle des nouveaux décès COVID-19', fontsize=14, fontweight='bold')
ax2.tick_params(axis='x', rotation=45)

plt.tight_layout()
plt.savefig('/home/jovyan/output/graphique_evolution_temporelle.png', dpi=150, bbox_inches='tight')
plt.show()

print("Graphique sauvegarde : graphique_evolution_temporelle.png")

### Question 3 : Quelle est la distribution par continent ?

In [None]:
# Distribution par continent
distribution_continent = df_covid \
    .groupBy("CONTINENT_NAME") \
    .agg(
        max("PEOPLE_POSITIVE_CASES_COUNT").alias("TOTAL_CAS"),
        max("PEOPLE_DEATH_COUNT").alias("TOTAL_DECES"),
        count("*").alias("NB_ENREGISTREMENTS")
    ) \
    .orderBy(desc("TOTAL_CAS"))

print("DISTRIBUTION DES CAS PAR CONTINENT")
print("-" * 60)
distribution_continent.show(truncate=False)

In [None]:
# Calcul du taux de mortalite par continent
print("TAUX DE MORTALITE PAR CONTINENT")
print("-" * 60)

taux_mortalite_continent = distribution_continent \
    .withColumn("TAUX_MORTALITE", 
                spark_round((col("TOTAL_DECES") / col("TOTAL_CAS") * 100), 2)) \
    .select("CONTINENT_NAME", "TOTAL_CAS", "TOTAL_DECES", "TAUX_MORTALITE") \
    .orderBy(desc("TAUX_MORTALITE"))

taux_mortalite_continent.show(truncate=False)

In [None]:
# Visualisation : Diagramme circulaire et barres pour les continents
df_continent_pd = distribution_continent.toPandas()
df_continent_pd = df_continent_pd[df_continent_pd['CONTINENT_NAME'].notna()]

fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(16, 7))

# Diagramme circulaire - Distribution des cas
colors_pie = plt.cm.Set3(range(len(df_continent_pd)))
explode = [0.02] * len(df_continent_pd)

ax1.pie(df_continent_pd['TOTAL_CAS'], labels=df_continent_pd['CONTINENT_NAME'],
        autopct='%1.1f%%', colors=colors_pie, explode=explode,
        shadow=True, startangle=90)
ax1.set_title('Répartition des cas par continent', fontsize=14, fontweight='bold')

# Graphique en barres - Comparaison cas et deces
x = range(len(df_continent_pd))
width = 0.35

bars1 = ax2.bar([i - width/2 for i in x], df_continent_pd['TOTAL_CAS'], 
               width, label='Cas totaux', color='steelblue')
bars2 = ax2.bar([i + width/2 for i in x], df_continent_pd['TOTAL_DECES'], 
               width, label='Décès totaux', color='indianred')

ax2.set_xlabel('Continent', fontsize=12)
ax2.set_ylabel('Nombre', fontsize=12)
ax2.set_title('Cas et décès par continent', fontsize=14, fontweight='bold')
ax2.set_xticks(x)
ax2.set_xticklabels(df_continent_pd['CONTINENT_NAME'], rotation=45, ha='right')
ax2.legend()
ax2.set_yscale('log')

plt.tight_layout()
plt.savefig('/home/jovyan/output/graphique_continents.png', dpi=150, bbox_inches='tight')
plt.show()

print("Graphique sauvegarde : graphique_continents.png")

### Question 4 : Quelles provinces canadiennes sont les plus touchées ?

In [None]:
# Filtrage pour le Canada et agregation par province
provinces_canada = df_covid \
    .filter(col("COUNTRY_SHORT_NAME") == "Canada") \
    .groupBy("PROVINCE_STATE_NAME") \
    .agg(
        max("PEOPLE_POSITIVE_CASES_COUNT").alias("TOTAL_CAS"),
        max("PEOPLE_DEATH_COUNT").alias("TOTAL_DECES")
    ) \
    .withColumn("TAUX_MORTALITE", 
                spark_round((col("TOTAL_DECES") / col("TOTAL_CAS") * 100), 2)) \
    .orderBy(desc("TOTAL_CAS"))

print("PROVINCES CANADIENNES LES PLUS TOUCHEES")
print("-" * 60)
provinces_canada.show(truncate=False)

In [None]:
# ACTION: take() - Recuperation des 5 premieres provinces
top_5_provinces = provinces_canada.take(5)

print("\nTOP 5 PROVINCES CANADIENNES")
print("-" * 60)
for i, row in enumerate(top_5_provinces, 1):
    province = row['PROVINCE_STATE_NAME']
    cas = row['TOTAL_CAS'] or 0
    deces = row['TOTAL_DECES'] or 0
    print(f"{i}. {province:<25} - Cas: {cas:>10,} - Deces: {deces:>8,}")

In [None]:
# Visualisation : Provinces canadiennes
df_provinces_pd = provinces_canada.toPandas()
df_provinces_pd = df_provinces_pd[df_provinces_pd['PROVINCE_STATE_NAME'].notna()]

fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(16, 8))

# Graphique en barres - Cas par province
colors_bar = plt.cm.Blues(range(50, 250, int(200/len(df_provinces_pd))))
bars = ax1.barh(df_provinces_pd['PROVINCE_STATE_NAME'][::-1], 
                df_provinces_pd['TOTAL_CAS'][::-1],
                color=colors_bar)
ax1.set_xlabel('Nombre de cas', fontsize=12)
ax1.set_ylabel('Province', fontsize=12)
ax1.set_title('Cas COVID-19 par province canadienne', fontsize=14, fontweight='bold')

# Graphique taux de mortalite
colors_mortality = ['indianred' if x > 2 else 'orange' if x > 1 else 'green' 
                    for x in df_provinces_pd['TAUX_MORTALITE'].fillna(0)]
ax2.barh(df_provinces_pd['PROVINCE_STATE_NAME'][::-1], 
         df_provinces_pd['TAUX_MORTALITE'][::-1].fillna(0),
         color=colors_mortality[::-1])
ax2.set_xlabel('Taux de mortalité (%)', fontsize=12)
ax2.set_ylabel('Province', fontsize=12)
ax2.set_title('Taux de mortalité par province canadienne', fontsize=14, fontweight='bold')
ax2.axvline(x=2, color='red', linestyle='--', alpha=0.5, label='Seuil critique (2%)')
ax2.legend()

plt.tight_layout()
plt.savefig('/home/jovyan/output/graphique_provinces_canada.png', dpi=150, bbox_inches='tight')
plt.show()

print("Graphique sauvegarde : graphique_provinces_canada.png")

---
## 5. Résumé des Transformations et Actions

### Transformations utilisées (6)
| Transformation | Description | Utilisation |
|---------------|-------------|-------------|
| `select()` | Sélection de colonnes | Réduction aux colonnes pertinentes |
| `filter()` | Filtrage des lignes | Suppression des nulls, filtre Canada |
| `withColumn()` | Ajout/modification de colonnes | Création YEAR, MONTH, taux mortalité |
| `groupBy()` | Regroupement | Agrégation par pays, continent, province |
| `orderBy()` | Tri | Classement décroissant |
| `agg()` | Agrégations multiples | sum(), max(), count() |

### Actions utilisées (3)
| Action | Description | Utilisation |
|--------|-------------|-------------|
| `count()` | Comptage des lignes | Vérification du dataset |
| `show()` | Affichage des données | Visualisation des résultats |
| `collect()`/`take()` | Récupération des données | Analyse personnalisée |

---
## 6. Sauvegarde des Résultats

In [None]:
# Sauvegarde des resultats en CSV
print("SAUVEGARDE DES RESULTATS")
print("-" * 60)

# Top 10 pays
top_10_pays.coalesce(1).write.mode("overwrite").csv(
    "/home/jovyan/output/top_10_pays", header=True)
print("Fichier sauvegarde : top_10_pays")

# Evolution mensuelle
evolution_mensuelle.coalesce(1).write.mode("overwrite").csv(
    "/home/jovyan/output/evolution_mensuelle", header=True)
print("Fichier sauvegarde : evolution_mensuelle")

# Distribution par continent
taux_mortalite_continent.coalesce(1).write.mode("overwrite").csv(
    "/home/jovyan/output/distribution_continent", header=True)
print("Fichier sauvegarde : distribution_continent")

# Provinces Canada
provinces_canada.coalesce(1).write.mode("overwrite").csv(
    "/home/jovyan/output/provinces_canada", header=True)
print("Fichier sauvegarde : provinces_canada")

print("\nTous les resultats ont ete sauvegardes dans /home/jovyan/output/")

---
## 7. Fermeture de la Session

In [None]:
# Liberation des ressources
df_covid.unpersist()
spark.stop()
print("Session Spark fermee")

---
## 8. Conclusions

### Résultats clés
1. **Pays les plus touchés** : Les grandes économies avec de fortes populations concentrent le plus de cas
2. **Évolution temporelle** : Des vagues successives sont observables avec des pics saisonniers
3. **Distribution continentale** : Des disparités importantes existent entre continents
4. **Canada** : Ontario et Québec concentrent la majorité des cas canadiens

### Limites de l'analyse
- Méthodes de comptage variables selon les pays
- Données potentiellement incomplètes pour certaines régions
- Absence de normalisation par population

### Pistes d'amélioration
- Normaliser les données par 100 000 habitants
- Croiser avec des données démographiques et économiques
- Implémenter des modèles prédictifs avec Spark MLlib