<a href="https://colab.research.google.com/github/nadiaSylla/Classification-of-flu-vaccine-centers-in-France/blob/main/Projet_IBDS_Sylla_Nadia.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Projet IBDS
## Classification des centres de vaccins contre la grippe en France
Par SYLLA Nadia

Les données proviennent respectivement de:

- https://www.data.gouv.fr/fr/datasets/lieux-de-vaccination-contre-la-grippe-pharmacies-sante-fr/ (données des centres de vaccination)

- https://explore.data.gouv.fr/tableau/?url=https%3A%2F%2Fwww.data.gouv.fr%2Ffr%2Fdatasets%2Fr%2Fdbe8a621-a9c4-4bc3-9cae-be1699c5ff25 (données des codes postaux)

## Importation des librairies PySpark et initialisation de la session Spark

In [None]:
## please don't load this code if you are not using google colab
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

In [None]:
## please don't load this code if you are not using google colab
import os

# Point findspark/PySpark at the JDK and at the Spark distribution unpacked
# into the Colab working directory.
os.environ.update(
    JAVA_HOME="/usr/lib/jvm/java-8-openjdk-amd64",
    SPARK_HOME="/content/spark-3.1.1-bin-hadoop3.2",
)

In [None]:
## please don't load this code if you are not using google colab
# List the working directory to check that both CSV inputs and the unpacked
# Spark distribution are present before starting the session.
!ls

communes-departement-region.csv  sample_data		    spark-3.1.1-bin-hadoop3.2.tgz
data_IBDS.csv			 spark-3.1.1-bin-hadoop3.2


In [None]:
import findspark
findspark.init()

import pyspark

from pyspark.sql import SparkSession

from pyspark.sql.functions import col, monotonically_increasing_id
from datetime import datetime

from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier, LogisticRegression, MultilayerPerceptronClassifier

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, StopWordsRemover, StringIndexer, VectorAssembler
from pyspark.ml.linalg import Vectors
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.mllib.util import MLUtils
from pyspark.mllib.regression import LabeledPoint

from pyspark import SparkConf, SparkContext, SQLContext


# Create (or reuse) the Spark session shared by every cell below.
# The warehouse directory is a placeholder: the setting has to be supplied but
# the location itself is not used by this notebook.
spark = (
    SparkSession.builder
    .appName("Projet")
    .config('spark.sql.warehouse.dir', 'file:///random/path/as/we/need/to/config/this/but/dont/use/it')
    .config('spark.executor.instances', 10)
    .getOrCreate()
)


## Lecture des fichiers CSV avec Spark et création des DataFrames

In [None]:
## Load the two CSV sources with a header row and inferred column types.
fileVaccin = 'data_IBDS.csv'
fileCommune = 'communes-departement-region.csv'

# The vaccination-centre export is semicolon-separated ...
DataVaccin = spark.read.options(header=True, inferSchema=True, sep=";").csv(fileVaccin)

# ... while the commune / postal-code reference table is comma-separated.
DataCommune = spark.read.options(header=True, inferSchema=True, sep=",").csv(fileCommune)

In [None]:
# Row count, inferred schema and a preview of the vaccination-centre DataFrame.
print(DataVaccin.count())
DataVaccin.printSchema()
DataVaccin.show()

2825
root
 |-- Finess: string (nullable = true)
 |-- Titre: string (nullable = true)
 |-- Adresse_voie 1: string (nullable = true)
 |-- Adresse_voie 2: string (nullable = true)
 |-- Adresse_codepostal: integer (nullable = true)
 |-- Adresse_ville: string (nullable = true)
 |-- Modalites_accueil: string (nullable = true)
 |-- Adresse_latitude: string (nullable = true)
 |-- Adresse_longitude: string (nullable = true)

+---------+--------------------+--------------------+--------------+------------------+--------------------+--------------------+--------------------+-----------------+
|   Finess|               Titre|      Adresse_voie 1|Adresse_voie 2|Adresse_codepostal|       Adresse_ville|   Modalites_accueil|    Adresse_latitude|Adresse_longitude|
+---------+--------------------+--------------------+--------------+------------------+--------------------+--------------------+--------------------+-----------------+
|750020927|Pharmacie Dang Trung|237 Rue de Charenton|          null|     

In [None]:
# Row count, inferred schema and a preview of the commune / postal-code DataFrame.
print(DataCommune.count())
DataCommune.printSchema()
DataCommune.show()

39201
root
 |-- code_commune_INSEE: string (nullable = true)
 |-- nom_commune_postal: string (nullable = true)
 |-- code_postal: integer (nullable = true)
 |-- libelle_acheminement: string (nullable = true)
 |-- ligne_5: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- code_commune: integer (nullable = true)
 |-- article: string (nullable = true)
 |-- nom_commune: string (nullable = true)
 |-- nom_commune_complet: string (nullable = true)
 |-- code_departement: string (nullable = true)
 |-- nom_departement: string (nullable = true)
 |-- code_region: integer (nullable = true)
 |-- nom_region: string (nullable = true)

+------------------+--------------------+-----------+--------------------+---------+-------------+-------------+------------+-------+--------------------+--------------------+----------------+---------------+-----------+--------------------+
|code_commune_INSEE|  nom_commune_postal|code_postal|libelle_achemineme

## Préparation des données

In [None]:
## Remove the columns that are not used in the study.

# Vaccination centres: keep only the title, postal code and coordinates.
DataVaccin = DataVaccin.drop(
    'Finess', 'Modalites_accueil', 'Adresse_ville', 'Adresse_voie 2', 'Adresse_voie 1',
)

# Communes: keep the postal code, coordinates and the department/region names.
# ('code_departement' used to be listed twice; listing it once gives the same result.)
DataCommune = DataCommune.drop(
    'code_commune_INSEE', 'libelle_acheminement', 'ligne_5', 'code_commune', 'article',
    'nom_commune', 'nom_commune_complet', 'code_departement',
)

In [None]:
## Force the latitude / longitude columns to double.
# Both were inferred as string when the CSV was read (see the printed schema).
# NOTE(review): with Spark's default (non-ANSI) cast, values that cannot be
# parsed presumably become null and are removed by the later null drop - confirm.
for coord_col in ("Adresse_latitude", "Adresse_longitude"):
    DataVaccin = DataVaccin.withColumn(coord_col, col(coord_col).cast("Double"))

In [None]:
# Show both schemas first, then a preview of both frames, to check the column
# drop and the coordinate cast.
for frame in (DataVaccin, DataCommune):
    frame.printSchema()
for frame in (DataVaccin, DataCommune):
    frame.show()

root
 |-- Titre: string (nullable = true)
 |-- Adresse_codepostal: integer (nullable = true)
 |-- Adresse_latitude: double (nullable = true)
 |-- Adresse_longitude: double (nullable = true)

root
 |-- nom_commune_postal: string (nullable = true)
 |-- code_postal: integer (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- nom_departement: string (nullable = true)
 |-- code_region: integer (nullable = true)
 |-- nom_region: string (nullable = true)

+--------------------+------------------+----------------+-----------------+
|               Titre|Adresse_codepostal|Adresse_latitude|Adresse_longitude|
+--------------------+------------------+----------------+-----------------+
|Pharmacie Dang Trung|             75012|       48.837516|         2.391121|
|Pharmacie Lemaitr...|             75018|       48.891153|         2.340128|
| Pharmacie Du Relais|             91210|       48.677829|         2.411785|
| Pharmacie Caumartin|           

In [None]:
## Drop every vaccination-centre row that has a null in any remaining column.
DataVaccin = DataVaccin.dropna()

In [None]:
# Check how many rows are left after the null drop.
DataVaccin.count()

2632

In [None]:
## Add the region code / name to the main DataVaccin frame by joining on the postal code.

# Unique id per centre, used to order the result. The ids are unique and
# increasing but not guaranteed to be consecutive.
DataVaccin = DataVaccin.withColumn("index", monotonically_increasing_id())

# The commune table has many rows per postal code (one per commune). Reduce it
# to one (code_postal, code_region, nom_region) row per postal code BEFORE
# joining, so a centre can never be duplicated - or given two conflicting
# region labels - by the join. Rows without a region are discarded first so
# they cannot be the one kept by dropDuplicates.
commune_regions = (
    DataCommune
    .select('code_postal', 'code_region', 'nom_region')
    .na.drop()
    .dropDuplicates(['code_postal'])
)

# Left outer join: centres whose postal code is unknown are kept with null
# region columns.
joined_dataset = DataVaccin.join(
    commune_regions,
    DataVaccin['Adresse_codepostal'] == commune_regions['code_postal'],
    "left_outer",
)

# Column that identifies each centre uniquely.
column_to_select_distinct = 'index'

# Centres with their region code and name, ordered by id.
df = (
    joined_dataset
    .select(column_to_select_distinct, 'Titre', 'Adresse_codepostal', 'Adresse_latitude',
            'Adresse_longitude', 'code_region', 'nom_region')
    .distinct()
    .orderBy(column_to_select_distinct)
)


In [None]:
## Check the result of the join: row count, schema and a preview.

print(df.count())
df.printSchema()
df.show()

2632
root
 |-- index: long (nullable = false)
 |-- Titre: string (nullable = true)
 |-- Adresse_codepostal: integer (nullable = true)
 |-- Adresse_latitude: double (nullable = true)
 |-- Adresse_longitude: double (nullable = true)
 |-- code_region: integer (nullable = true)
 |-- nom_region: string (nullable = true)

+-----+--------------------+------------------+----------------+-----------------+-----------+-------------+
|index|               Titre|Adresse_codepostal|Adresse_latitude|Adresse_longitude|code_region|   nom_region|
+-----+--------------------+------------------+----------------+-----------------+-----------+-------------+
|    0|Pharmacie Dang Trung|             75012|       48.837516|         2.391121|         11|Île-de-France|
|    1|Pharmacie Lemaitr...|             75018|       48.891153|         2.340128|         11|Île-de-France|
|    2| Pharmacie Du Relais|             91210|       48.677829|         2.411785|         11|Île-de-France|
|    3| Pharmacie Caumartin|

In [None]:
## Drop the rows that still contain a null after the join, then report how many remain.

df = df.dropna()
print(df.count())


2630


In [None]:
from pyspark.sql.functions import countDistinct

# Number of vaccination centres per region.
result = df.groupBy("nom_region").agg({"index": "count"})

# Number of distinct regions present in the grouped result.
distinct_row = result.agg(countDistinct("nom_region").alias('distinct_count')).first()
distinct_count = distinct_row['distinct_count']

# Per-region table, then the region total.
result.show()
print(f"The number of region is: {distinct_count}")

+--------------------+------------+
|          nom_region|count(index)|
+--------------------+------------+
|    Pays de la Loire|         161|
|Bourgogne-Franche...|         157|
| Centre-Val de Loire|         114|
|Auvergne-Rhône-Alpes|         305|
|           Grand Est|         194|
|              Guyane|           2|
|          Martinique|           2|
|     Hauts-de-France|         260|
|  Nouvelle-Aquitaine|         260|
|               Corse|          26|
|       Île-de-France|         380|
|          Guadeloupe|          16|
|           Occitanie|         213|
|             Mayotte|           1|
|            Bretagne|         154|
|           Normandie|         124|
|          La Réunion|          26|
|Provence-Alpes-Cô...|         235|
+--------------------+------------+

The number of region is: 18


## Création des transformers
TF-IDF, VectorAssembler et StringIndexer

### TF-IDF

In [None]:
## Transformers for the TF-IDF representation of the centre title.

# Split "Titre" into lower-cased tokens.
tokenizer = Tokenizer(inputCol="Titre", outputCol="words")

# The titles are French (e.g. "Pharmacie Du Relais"), so filter with the French
# stop-word list; the default list is English and would leave words such as
# "du" / "de" / "la" in place.
remover = StopWordsRemover(inputCol="words", outputCol="filtered_words",
                           stopWords=StopWordsRemover.loadDefaultStopWords("french"))

# Hash the remaining tokens to term-frequency vectors, then rescale by IDF.
hashingTF = HashingTF(inputCol="filtered_words", outputCol="rawFeatures")
idf = IDF(inputCol="rawFeatures", outputCol="WordFeature")


### VectorAssembler

In [None]:
# Assemble the numeric inputs into a single Spark feature vector.
# Only the postal code and the coordinates are listed here; the TF-IDF column
# "WordFeature" is not among the inputs.
feature_columns = ["Adresse_codepostal", "Adresse_latitude", "Adresse_longitude"]

assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

### StringIndexer

In [None]:
# Turn 'code_region' into a numeric class index, written to "label", which is
# the label column both classifiers read.

indexer = StringIndexer(inputCol="code_region", outputCol="label")

## Classification
(Random Forest Classifier et Logistic Regression)

In [None]:
# Two candidate classifiers, both reading the assembled "features" vector and
# the indexed "label" column.

# An explicit seed pins the forest's random sampling so the reported metrics
# are repeatable from one kernel session to the next, like the seeded
# train/test split.
rf = RandomForestClassifier(labelCol="label", featuresCol="features", impurity='gini', maxBins=32, seed=123)
lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10, regParam=0.01)

## Pipeline

In [None]:
# Pre-processing pipeline (transformers only). The classifiers are kept out of
# it so the same transformed frame can be reused for every model.
# NOTE(review): the pipeline is fitted on the full frame, i.e. before the
# train/test split performed later.
preprocessing_stages = [tokenizer, remover, hashingTF, idf, assembler, indexer]
pipeline_transformers = Pipeline(stages=preprocessing_stages)

model = pipeline_transformers.fit(df)
df_transformed = model.transform(df)

## Entrainement des modèles et évaluations

In [None]:
## Create the train and test sets (80% training, 20% testing); the seed fixes the split.

(training_data, test_data) = df_transformed.randomSplit([0.8, 0.2], seed=123)

In [None]:
# Single-stage Pipeline wrapping the RandomForestClassifier.
pipeline_rf = Pipeline(stages=[rf])

# Time only the fit on the training set.
start_time = datetime.now()
model_rf = pipeline_rf.fit(training_data)
end_time = datetime.now()
elapsed_time_1 = (end_time - start_time).total_seconds()
print(f"Model fitting took {elapsed_time_1} seconds.")

# Predict on the held-out test set.
predictions_rf = model_rf.transform(test_data)

# One evaluator; each metric is selected by overriding metricName per call.
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy_rf, precision_rf, recall_rf, f1_rf = (
    evaluator.evaluate(predictions_rf, {evaluator.metricName: metric})
    for metric in ("accuracy", "weightedPrecision", "weightedRecall", "f1")
)

print(f"Accuracy on Test Set: {accuracy_rf}")
print(f"Precision on Test Set: {precision_rf}")
print(f"Recall on Test Set: {recall_rf}")
print(f"F1 Score on Test Set: {f1_rf}")

Model fitting took 56.193381 seconds.
Accuracy on Test Set: 0.9505703422053232
Precision on Test Set: 0.9519212000930103
Recall on Test Set: 0.9505703422053231
F1 Score on Test Set: 0.9508601201533273


In [None]:
# Single-stage Pipeline wrapping the LogisticRegression classifier.
pipeline_lr = Pipeline(stages=[lr])

# Time only the fit on the training set.
start_time = datetime.now()
model_lr = pipeline_lr.fit(training_data)
end_time = datetime.now()
elapsed_time_2 = (end_time - start_time).total_seconds()
print(f"Model fitting took {elapsed_time_2} seconds.")

# Predict on the held-out test set.
predictions_lr = model_lr.transform(test_data)

# One evaluator; each metric is selected by overriding metricName per call.
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy_lr, precision_lr, recall_lr, f1_lr = (
    evaluator.evaluate(predictions_lr, {evaluator.metricName: metric})
    for metric in ("accuracy", "weightedPrecision", "weightedRecall", "f1")
)

print(f"Accuracy on Test Set: {accuracy_lr}")
print(f"Precision on Test Set: {precision_lr}")
print(f"Recall on Test Set: {recall_lr}")
print(f"F1 Score on Test Set: {f1_lr}")

Model fitting took 39.508834 seconds.
Accuracy on Test Set: 0.4011406844106464
Precision on Test Set: 0.2611600069790855
Recall on Test Set: 0.4011406844106464
F1 Score on Test Set: 0.30415005249286464


In [None]:
import pandas as pd

# One tuple per model, in the column order of the summary table.
summary_columns = ["Model", "Accuracy", "Precision", "recall", "F1 Score", "run Time"]
summary_rows = [
    ("RandomForest", accuracy_rf, precision_rf, recall_rf, f1_rf, elapsed_time_1),
    ("LogisticRegression", accuracy_lr, precision_lr, recall_lr, f1_lr, elapsed_time_2),
]

# Pivot the rows into a column-name -> values mapping for the DataFrame constructor.
data = {
    column: [row[position] for row in summary_rows]
    for position, column in enumerate(summary_columns)
}

pandas_df = pd.DataFrame(data)

# Side-by-side comparison of the two models.
print(pandas_df)

                Model  Accuracy  Precision    recall  F1 Score   run Time
0        RandomForest  0.950570   0.951921  0.950570   0.95086  56.193381
1  LogisticRegression  0.401141   0.261160  0.401141   0.30415  39.508834


In [None]:
# Stop the Spark session created at the top of the notebook.
spark.stop()