
# ***Introduction:***

Climate change is one of the major challenges of our time, and reducing CO2 emissions is crucial to mitigating its effects. In this project, we analyze the CO2 emissions of vehicles in Canada using machine learning techniques, specifically clustering. Our goal is to segment vehicles into clusters based on their characteristics and their CO2 emissions. To do so, we use Apache Spark, a massively parallel data processing framework that lets us manage and process large amounts of data efficiently.

**Team:**


*   Dounia KAMEL
*   Selsabile KACHA



## Initializing the Spark Session
This code block imports the required library and initializes a Spark session in local mode using all available cores, naming the application "Intro".

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master('local[*]') \
    .appName("Intro") \
    .getOrCreate()

## Defining the Data Schema

In [None]:
from pyspark.sql.types import StructField, StructType, StringType, DoubleType

custom_schema = StructType([
    StructField("Make", StringType(), True),
    StructField("Model", StringType(), True),
    StructField("Vehicle Class", StringType(), True),
    StructField("Cylinders", DoubleType(), True),
    StructField("Transmission", StringType(), True),
    StructField("Fuel Type", StringType(), True),
    StructField("Fuel Consumption City (L/100 km)", DoubleType(), True),
    StructField("Fuel Consumption Hwy (L/100 km)", DoubleType(), True),
    StructField("Fuel Consumption Comb (L/100 km)", DoubleType(), True),
    StructField("Fuel Consumption Comb (mpg)", DoubleType(), True),
    StructField("CO2", DoubleType(), True)])



## Loading the Data with a Custom Schema

In [None]:
co2_data = spark.read.format("csv")\
    .schema(custom_schema) \
    .option("header", True) \
    .load("CO2_Emissions_Canada.csv")

In [None]:
co2_data.take(2)

[Row(Make='ACURA', Model='ILX', Vehicle Class='COMPACT', Cylinders=2.0, Transmission='4', Fuel Type='AS5', Fuel Consumption City (L/100 km)=None, Fuel Consumption Hwy (L/100 km)=9.9, Fuel Consumption Comb (L/100 km)=6.7, Fuel Consumption Comb (mpg)=8.5, CO2=33.0),
 Row(Make='ACURA', Model='ILX', Vehicle Class='COMPACT', Cylinders=2.4, Transmission='4', Fuel Type='M6', Fuel Consumption City (L/100 km)=None, Fuel Consumption Hwy (L/100 km)=11.2, Fuel Consumption Comb (L/100 km)=7.7, Fuel Consumption Comb (mpg)=9.6, CO2=29.0)]
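In the rows above, `Transmission='4'`, `Fuel Type='AS5'`, and the city consumption is `None`, which suggests that the schema may declare fewer fields than the file has columns, so that values land in the wrong fields. One way to check is to compare the file's header with the schema's field names. The sketch below does this in plain Python; `compare_header` and `sample_header` are hypothetical names, and the sample header is an illustrative assumption, not the verified content of `CO2_Emissions_Canada.csv`.

```python
import csv
import io

# Field names declared in custom_schema above.
schema_names = [
    "Make", "Model", "Vehicle Class", "Cylinders", "Transmission", "Fuel Type",
    "Fuel Consumption City (L/100 km)", "Fuel Consumption Hwy (L/100 km)",
    "Fuel Consumption Comb (L/100 km)", "Fuel Consumption Comb (mpg)", "CO2",
]


def compare_header(header_line, expected):
    """Return (position, file_name, schema_name) for every mismatch."""
    file_names = next(csv.reader(io.StringIO(header_line)))
    mismatches = []
    for i in range(max(len(file_names), len(expected))):
        file_name = file_names[i] if i < len(file_names) else None
        schema_name = expected[i] if i < len(expected) else None
        if file_name != schema_name:
            mismatches.append((i, file_name, schema_name))
    return mismatches


# Hypothetical header with one extra column, for illustration only.
sample_header = "Make,Model,Vehicle Class,Engine Size(L),Cylinders"
for position, file_name, schema_name in compare_header(sample_header, schema_names[:4]):
    print(position, file_name, schema_name)
```

In practice, the header line would be read from the first line of the CSV file itself and compared against all of `schema_names`.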

In [None]:
cols_only_continues_values = {'Fuel Consumption City (L/100 km)':0}
#                               "Fuel Consumption Hwy (L/100 km)",
#         "Fuel Consumption Comb (L/100 km)"}

In [None]:
co2_data = co2_data.fillna(0.0)

In [None]:
co2_data.printSchema()

root
 |-- Make: string (nullable = true)
 |-- Model: string (nullable = true)
 |-- Vehicle Class: string (nullable = true)
 |-- Cylinders: double (nullable = false)
 |-- Transmission: string (nullable = true)
 |-- Fuel Type: string (nullable = true)
 |-- Fuel Consumption City (L/100 km): double (nullable = false)
 |-- Fuel Consumption Hwy (L/100 km): double (nullable = false)
 |-- Fuel Consumption Comb (L/100 km): double (nullable = false)
 |-- Fuel Consumption Comb (mpg): double (nullable = false)
 |-- CO2: double (nullable = false)



In [None]:
co2_data.take(2)

[Row(Make='ACURA', Model='ILX', Vehicle Class='COMPACT', Cylinders=2.0, Transmission='4', Fuel Type='AS5', Fuel Consumption City (L/100 km)=0.0, Fuel Consumption Hwy (L/100 km)=9.9, Fuel Consumption Comb (L/100 km)=6.7, Fuel Consumption Comb (mpg)=8.5, CO2=33.0),
 Row(Make='ACURA', Model='ILX', Vehicle Class='COMPACT', Cylinders=2.4, Transmission='4', Fuel Type='M6', Fuel Consumption City (L/100 km)=0.0, Fuel Consumption Hwy (L/100 km)=11.2, Fuel Consumption Comb (L/100 km)=7.7, Fuel Consumption Comb (mpg)=9.6, CO2=29.0)]

## Preparing the Data for Clustering

Hash the feature columns into a single indexed feature vector:

In [None]:
from pyspark.ml.feature import FeatureHasher
from pyspark.sql.functions import col

cols = ["Make", "Model", "Vehicle Class","Cylinders","Transmission","Fuel Type",
        "Fuel Consumption City (L/100 km)", "Fuel Consumption Hwy (L/100 km)",
        "Fuel Consumption Comb (L/100 km)","Fuel Consumption Comb (mpg)"]

cols_only_continues = ["Fuel Consumption City (L/100 km)", "Fuel Consumption Hwy (L/100 km)",
        "Fuel Consumption Comb (L/100 km)"]

hasher = FeatureHasher(outputCol="hashed_features", inputCols=cols_only_continues)
data = hasher.transform(co2_data)



In [None]:
data.select("hashed_features").show(5, truncate=False)

+---------------------------------------------+
|hashed_features                              |
+---------------------------------------------+
|(262144,[38607,109231,228390],[0.0,9.9,6.7]) |
|(262144,[38607,109231,228390],[0.0,11.2,7.7])|
|(262144,[38607,109231,228390],[0.0,6.0,5.8]) |
|(262144,[38607,109231,228390],[0.0,12.7,9.1])|
|(262144,[38607,109231,228390],[0.0,12.1,8.7])|
+---------------------------------------------+
only showing top 5 rows



In [None]:
data.select("hashed_features").take(1)

[Row(hashed_features=SparseVector(262144, {38607: 0.0, 109231: 9.9, 228390: 6.7}))]
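For numeric columns, `FeatureHasher` hashes the column name to an index and stores the column's value at that index, which is why the same three indices appear in every row of the output above. The sketch below mimics this idea in plain Python. It uses `zlib.crc32` as a stand-in hash (an assumption for illustration; Spark uses a different hash function, so the indices will not match), and `hash_numeric_row` is a hypothetical helper name.

```python
import zlib

NUM_FEATURES = 2 ** 18  # 262144, the vector size shown in the output above


def hash_numeric_row(row, num_features=NUM_FEATURES):
    """Map each numeric column to a slot chosen by hashing its name."""
    sparse = {}
    for name, value in row.items():
        index = zlib.crc32(name.encode("utf-8")) % num_features
        # Colliding columns would add up in the same slot.
        sparse[index] = sparse.get(index, 0.0) + float(value)
    return sparse


row_1 = {
    "Fuel Consumption City (L/100 km)": 0.0,
    "Fuel Consumption Hwy (L/100 km)": 9.9,
    "Fuel Consumption Comb (L/100 km)": 6.7,
}
row_2 = {
    "Fuel Consumption City (L/100 km)": 0.0,
    "Fuel Consumption Hwy (L/100 km)": 11.2,
    "Fuel Consumption Comb (L/100 km)": 7.7,
}

# Both rows use the same indices because the column names are the same.
print(sorted(hash_numeric_row(row_1)) == sorted(hash_numeric_row(row_2)))
```

Because the index depends only on the column name, hashing purely numeric columns mostly relocates them inside a very wide sparse vector.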

In [None]:
data.printSchema()

root
 |-- Make: string (nullable = true)
 |-- Model: string (nullable = true)
 |-- Vehicle Class: string (nullable = true)
 |-- Cylinders: double (nullable = false)
 |-- Transmission: string (nullable = true)
 |-- Fuel Type: string (nullable = true)
 |-- Fuel Consumption City (L/100 km): double (nullable = false)
 |-- Fuel Consumption Hwy (L/100 km): double (nullable = false)
 |-- Fuel Consumption Comb (L/100 km): double (nullable = false)
 |-- Fuel Consumption Comb (mpg): double (nullable = false)
 |-- CO2: double (nullable = false)
 |-- hashed_features: vector (nullable = true)



# Selecting the Most Important Features

This code block uses the univariate feature selector (UnivariateFeatureSelector) to select the most relevant features from hashed_features based on their relationship with the CO2 label. The selected features are stored in a new column named selectedFeatures.

In [None]:
from pyspark.ml.feature import UnivariateFeatureSelector

selector = UnivariateFeatureSelector(outputCol="selectedFeatures", featuresCol="hashed_features", labelCol="CO2")

selector.setFeatureType("continuous")
selector.setLabelType("continuous")

model = selector.fit(data)
data = model.transform(data)
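With both the feature type and the label type set to `"continuous"`, the selector scores each feature with an F-test for regression and, by default, keeps the 50 top-scoring features, which is consistent with the 50 features reported by the models in the outputs of this notebook. The sketch below computes that kind of score for a single feature in plain Python, as `F = r² / (1 - r²) × (n - 2)` where `r` is the Pearson correlation. The helper `f_value` and the toy label values are hypothetical and only illustrate the ranking idea.

```python
import math


def f_value(xs, ys):
    """F-statistic of a one-feature linear regression of ys on xs."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    sxx = sum((x - mean_x) ** 2 for x in xs)
    syy = sum((y - mean_y) ** 2 for y in ys)
    if sxx == 0 or syy == 0:
        return 0.0  # a constant column carries no information
    r2 = sxy ** 2 / (sxx * syy)
    if r2 >= 1.0:
        return math.inf  # perfect linear relationship
    return r2 / (1.0 - r2) * (n - 2)


# Toy data: one informative feature, one unrelated feature, made-up labels.
informative = [6.7, 7.7, 5.8, 9.1, 8.7]
unrelated = [3.0, 1.0, 4.0, 1.0, 5.0]
label = [150.0, 180.0, 140.0, 210.0, 200.0]

print(f_value(informative, label) > f_value(unrelated, label))
```

Features are then ranked by this score, and the highest-ranked ones are kept.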

## Modeling with Latent Dirichlet Allocation (LDA)

In [None]:
from pyspark.ml.linalg import Vectors, SparseVector
from pyspark.ml.clustering import LDA


lda = LDA(k=2, seed=1, optimizer="em", featuresCol="selectedFeatures")
lda.setMaxIter(100)

# Note: clear() removes the explicitly set value, so maxIter falls back to its default
lda.clear(lda.maxIter)
lda_model = lda.fit(data)
lda_model.setSeed(1)

# Check whether the model itself is distributed across Spark executors
lda_model.isDistributed()

True

In [None]:
lda_model.describeTopics().show()

+-----+-----------+--------------------+
|topic|termIndices|         termWeights|
+-----+-----------+--------------------+
|    0|   [48, 49]|[0.58104675033297...|
|    1|   [48, 49]|[0.58168999987474...|
+-----+-----------+--------------------+



In [None]:
lda_model.vocabSize()

50

In [None]:
lda_predictions = lda_model.transform(data)

In [None]:
lda_predictions.printSchema()

root
 |-- Make: string (nullable = true)
 |-- Model: string (nullable = true)
 |-- Vehicle Class: string (nullable = true)
 |-- Cylinders: double (nullable = false)
 |-- Transmission: string (nullable = true)
 |-- Fuel Type: string (nullable = true)
 |-- Fuel Consumption City (L/100 km): double (nullable = false)
 |-- Fuel Consumption Hwy (L/100 km): double (nullable = false)
 |-- Fuel Consumption Comb (L/100 km): double (nullable = false)
 |-- Fuel Consumption Comb (mpg): double (nullable = false)
 |-- CO2: double (nullable = false)
 |-- hashed_features: vector (nullable = true)
 |-- selectedFeatures: vector (nullable = true)
 |-- topicDistribution: vector (nullable = true)



In [None]:
lda_predictions.select("topicDistribution").show(2,truncate=False)

+----------------------------------------+
|topicDistribution                       |
+----------------------------------------+
|[0.4999956369386229,0.5000043630613772] |
|[0.49999497253338004,0.5000050274666199]|
+----------------------------------------+
only showing top 2 rows



## Modeling with KMeans

In [None]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.clustering import KMeans



kmeans = KMeans(k=3)
kmeans.setSeed(10)
kmeans.setFeaturesCol("selectedFeatures")

kmeans_model = kmeans.fit(data)
kmeans_model.getDistanceMeasure()




'euclidean'

In [None]:
kmeans_predictions = kmeans_model.transform(data)

In [None]:
kmeans_predictions.select("prediction").show(5, truncate=True)

+----------+
|prediction|
+----------+
|         0|
|         0|
|         0|
|         2|
|         2|
+----------+
only showing top 5 rows



In [None]:
kmeans_predictions.select("prediction").distinct().show()

+----------+
|prediction|
+----------+
|         1|
|         2|
|         0|
+----------+



In [None]:
summary = kmeans_model.summary

In [None]:
summary.cluster.printSchema()

root
 |-- prediction: integer (nullable = false)



## Modeling with GaussianMixture

In [None]:
from pyspark.ml.clustering import GaussianMixture

gm = GaussianMixture(k=42, tol=0.01, seed=10, featuresCol="selectedFeatures", maxIter=100)
gm_model = gm.fit(data)

gm_predictions = gm_model.transform(data)

Display the model's parameters using `explainParams()`:

In [None]:
import pprint
pp = pprint.PrettyPrinter(indent=4)
params = gm_model.explainParams()
pp.pprint(params)

('aggregationDepth: suggested depth for treeAggregate (>= 2). (default: 2)\n'
 'featuresCol: features column name. (default: features, current: '
 'selectedFeatures)\n'
 'k: Number of independent Gaussians in the mixture model. Must be > 1. '
 '(default: 2, current: 42)\n'
 'maxIter: max number of iterations (>= 0). (default: 100, current: 100)\n'
 'predictionCol: prediction column name. (default: prediction)\n'
 'probabilityCol: Column name for predicted class conditional probabilities. '
 'Note: Not all models output well-calibrated probability estimates! These '
 'probabilities should be treated as confidences, not precise probabilities. '
 '(default: probability)\n'
 'seed: random seed. (default: 259027761374774626, current: 10)\n'
 'tol: the convergence tolerance for iterative algorithms (>= 0). (default: '
 '0.01, current: 0.01)\n'
 'weightCol: weight column name. If this is not set or empty, we treat all '
 'instance weights as 1.0. (undefined)')


# Creating and Fitting the Pipeline

In [None]:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[hasher,selector, gm])
# Fit the pipeline to training data.
pipeline_model = pipeline.fit(co2_data)

In [None]:
transformed_by_pipeline = pipeline_model.transform(co2_data)

In [None]:
transformed_by_pipeline.printSchema()

root
 |-- Make: string (nullable = true)
 |-- Model: string (nullable = true)
 |-- Vehicle Class: string (nullable = true)
 |-- Cylinders: double (nullable = false)
 |-- Transmission: string (nullable = true)
 |-- Fuel Type: string (nullable = true)
 |-- Fuel Consumption City (L/100 km): double (nullable = false)
 |-- Fuel Consumption Hwy (L/100 km): double (nullable = false)
 |-- Fuel Consumption Comb (L/100 km): double (nullable = false)
 |-- Fuel Consumption Comb (mpg): double (nullable = false)
 |-- CO2: double (nullable = false)
 |-- hashed_features: vector (nullable = true)
 |-- selectedFeatures: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: integer (nullable = false)



## Evaluating the Clustering Models

This code uses PySpark's ClusteringEvaluator class to evaluate the clustering models produced by the KMeans (kmeans) and GaussianMixture (GM) algorithms.

- First, an evaluator object is created with the selected-features column (selectedFeatures) as input.
- Next, the prediction column (prediction) is set with the setPredictionCol method.
- The models are then scored by calling the evaluator's evaluate method on the predictions of the KMeans (kmeans_predictions) and GaussianMixture (gm_predictions) models respectively, using the evaluator's default distance measure, the squared Euclidean distance.
- This quantifies the quality of the clusters produced by the two models with the silhouette metric computed by ClusteringEvaluator.
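To make the silhouette metric concrete: for each point, `a` is the mean distance to the other points of its own cluster, `b` is the lowest mean distance to the points of any other cluster, and the point's score is `(b - a) / max(a, b)`. The overall score is the mean over all points, so it lies between -1 and 1. The following is a minimal plain-Python sketch of that definition with the squared Euclidean distance. It is not Spark's implementation, and `silhouette` and the toy points are hypothetical.

```python
def squared_euclidean(p, q):
    return sum((a - b) ** 2 for a, b in zip(p, q))


def silhouette(points, labels, distance=squared_euclidean):
    """Mean silhouette score; expects at least two distinct labels."""
    clusters = {}
    for point, label in zip(points, labels):
        clusters.setdefault(label, []).append(point)

    scores = []
    for point, label in zip(points, labels):
        own = clusters[label]
        if len(own) == 1:
            scores.append(0.0)  # convention for singleton clusters
            continue
        # The distance of the point to itself is 0, so divide by len(own) - 1.
        a = sum(distance(point, other) for other in own) / (len(own) - 1)
        b = min(
            sum(distance(point, other) for other in members) / len(members)
            for other_label, members in clusters.items()
            if other_label != label
        )
        denominator = max(a, b)
        scores.append((b - a) / denominator if denominator > 0 else 0.0)
    return sum(scores) / len(scores)


toy_points = [(0.0, 0.0), (0.0, 1.0), (10.0, 10.0), (10.0, 11.0)]
print(silhouette(toy_points, [0, 0, 1, 1]))  # well-separated clusters, close to 1
print(silhouette(toy_points, [0, 1, 0, 1]))  # mixed-up clusters, negative
```

A negative score, as obtained for GM in this notebook, means that points are on average closer to another cluster than to their own.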



In [None]:
from pyspark.ml.evaluation import ClusteringEvaluator

evaluator = ClusteringEvaluator(featuresCol='selectedFeatures')
evaluator.setPredictionCol("prediction")

# Evaluate with the default squared Euclidean distance
print("kmeans: "+str(evaluator.evaluate(kmeans_predictions)))
print("GM: "+ str(evaluator.evaluate(gm_predictions)))

kmeans: 0.6791002214675337
GM: -0.1517797715036008


In [None]:
evaluator.isLargerBetter()

True

In [None]:
evaluator.setDistanceMeasure("cosine")
print("kmeans: "+str(evaluator.evaluate(kmeans_predictions)))
print("GM: "+ str(evaluator.evaluate(gm_predictions)))

kmeans: -0.07958234502129219
GM: -0.19012403274289733


In [None]:
evaluator.isLargerBetter()

True

In [None]:
evaluator.explainParams()

"distanceMeasure: The distance measure. Supported options: 'squaredEuclidean' and 'cosine'. (default: squaredEuclidean, current: cosine)\nfeaturesCol: features column name. (default: features, current: selectedFeatures)\nmetricName: metric name in evaluation (silhouette) (default: silhouette)\npredictionCol: prediction column name. (default: prediction, current: prediction)\nweightCol: weight column name. If this is not set or empty, we treat all instance weights as 1.0. (undefined)"

#### Since `isLargerBetter` returns `True`, a higher score means a better clustering. KMeans scores higher than GM under both distance measures, so we can conclude that KMeans produced a better model than GM.

# Experimenting with Hyperparameters and Tuning


In [None]:
from pyspark.ml.tuning import TrainValidationSplit , ParamGridBuilder

grid = ParamGridBuilder().addGrid(kmeans.maxIter, [20,50,100]).build()

tvs = TrainValidationSplit(estimator=kmeans, estimatorParamMaps=grid, evaluator=evaluator,
                           collectSubModels=True, parallelism=1, seed=42)
tvs_model = tvs.fit(data)
tvs_model.getTrainRatio()



0.75

In [None]:
tvs_model.validationMetrics

[-0.06270405194965402, -0.06402059325959049, -0.06402059325959049]

In [None]:
from pyspark.ml.tuning import TrainValidationSplit , ParamGridBuilder

grid = ParamGridBuilder().addGrid(kmeans.maxIter, [20,50,100]) \
        .addGrid(kmeans.distanceMeasure, ['euclidean','cosine']).build()


In [None]:
tvs_model.validationMetrics

[-0.06270405194965402, -0.06402059325959049, -0.06402059325959049]

In [None]:
from pyspark.ml.tuning import TrainValidationSplit , ParamGridBuilder

# The evaluator's distanceMeasure accepts 'squaredEuclidean' or 'cosine'
grid = ParamGridBuilder().addGrid(kmeans.maxIter, [20,50,100]) \
        .addGrid(kmeans.distanceMeasure, ['euclidean','cosine']) \
        .addGrid(evaluator.distanceMeasure, ['squaredEuclidean','cosine']).build()


tvs = TrainValidationSplit(estimator=kmeans, estimatorParamMaps=grid, evaluator=evaluator,
                           collectSubModels=True, parallelism=1, seed=42, trainRatio=0.8)
tvs_model = tvs.fit(data)
tvs_model.validationMetrics



[-0.06292946960479909,
 -0.06292946960479909,
 0.5520132682136769,
 0.5520132682136769,
 -0.06292946960479909,
 -0.06292946960479909,
 0.5520132682136769,
 0.5520132682136769,
 -0.06292946960479909,
 -0.06292946960479909,
 0.5520132682136769,
 0.5520132682136769]
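The 12 validation metrics above match the size of the parameter grid: 3 values of `maxIter` × 2 KMeans distance measures × 2 evaluator distance measures. The sketch below expands such a grid in plain Python with `itertools.product`. The dictionary keys are descriptive labels rather than Spark parameter objects, `expand_grid` is a hypothetical helper, and the ordering (first key varying slowest) is an assumption that happens to fit the pattern of the output.

```python
from itertools import product

grid_values = {
    "kmeans.maxIter": [20, 50, 100],
    "kmeans.distanceMeasure": ["euclidean", "cosine"],
    # Option names as listed by the evaluator's explainParams() output.
    "evaluator.distanceMeasure": ["squaredEuclidean", "cosine"],
}


def expand_grid(values):
    """Return one dict per combination; the first key varies slowest."""
    names = list(values)
    return [dict(zip(names, combo)) for combo in product(*values.values())]


combinations = expand_grid(grid_values)
print(len(combinations))
for combo in combinations[:4]:
    print(combo)
```

Under this ordering, the metrics come in identical pairs that differ only in the evaluator entry, which suggests that the evaluator entries in the grid do not change the score. This reading is an inference from the output, not something the run itself confirms.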

## Adding the Evaluator to the Parameter Grid

In [None]:
from pyspark.ml.tuning import TrainValidationSplit , ParamGridBuilder


# The evaluator's distanceMeasure accepts 'squaredEuclidean' or 'cosine'
grid = ParamGridBuilder().addGrid(kmeans.maxIter, [20,50,100]) \
        .addGrid(kmeans.distanceMeasure, ['euclidean','cosine']) \
        .addGrid(evaluator.distanceMeasure, ['squaredEuclidean','cosine']) \
        .baseOn({kmeans.featuresCol: 'selectedFeatures'}) \
        .build()

tvs = TrainValidationSplit(estimator=kmeans, estimatorParamMaps=grid, evaluator=evaluator,
                           collectSubModels=True, parallelism=1, seed=42, trainRatio=0.8)
tvs_model = tvs.fit(data)
tvs_model.validationMetrics

[-0.06292946960479909,
 -0.06292946960479909,
 0.5520132682136769,
 0.5520132682136769,
 -0.06292946960479909,
 -0.06292946960479909,
 0.5520132682136769,
 0.5520132682136769,
 -0.06292946960479909,
 -0.06292946960479909,
 0.5520132682136769,
 0.5520132682136769]

In [None]:
tvs_model.subModels

[KMeansModel: uid=KMeans_792430fa1bb8, k=3, distanceMeasure=euclidean, numFeatures=50,
 KMeansModel: uid=KMeans_792430fa1bb8, k=3, distanceMeasure=euclidean, numFeatures=50,
 KMeansModel: uid=KMeans_792430fa1bb8, k=3, distanceMeasure=cosine, numFeatures=50,
 KMeansModel: uid=KMeans_792430fa1bb8, k=3, distanceMeasure=cosine, numFeatures=50,
 KMeansModel: uid=KMeans_792430fa1bb8, k=3, distanceMeasure=euclidean, numFeatures=50,
 KMeansModel: uid=KMeans_792430fa1bb8, k=3, distanceMeasure=euclidean, numFeatures=50,
 KMeansModel: uid=KMeans_792430fa1bb8, k=3, distanceMeasure=cosine, numFeatures=50,
 KMeansModel: uid=KMeans_792430fa1bb8, k=3, distanceMeasure=cosine, numFeatures=50,
 KMeansModel: uid=KMeans_792430fa1bb8, k=3, distanceMeasure=euclidean, numFeatures=50,
 KMeansModel: uid=KMeans_792430fa1bb8, k=3, distanceMeasure=euclidean, numFeatures=50,
 KMeansModel: uid=KMeans_792430fa1bb8, k=3, distanceMeasure=cosine, numFeatures=50,
 KMeansModel: uid=KMeans_792430fa1bb8, k=3, distanceMeasur

In [None]:
arr_models = tvs_model.subModels

# Cross-Validating the Models

This part of the code applies cross-validation to tune the model's parameters. Cross-validation, performed here with PySpark's CrossValidator class, divides the data into several subsets (folds) and trains and evaluates the model repeatedly, each time holding out a different fold for validation. This gives a more reliable estimate of the model's performance than a single split and guides the choice of hyperparameters. Given the model (kmeans), the parameters to test (grid), the evaluator (evaluator), and other settings such as the number of folds (numFolds), cross-validation compares the model's performance across configurations on the same footing. Finally, the sub-models (subModels) expose the individual models trained in each fold, one per parameter combination, which shows how the model behaves under each configuration.
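The fold mechanics can be sketched in plain Python: the row indices are shuffled and dealt into `num_folds` groups, and each group serves once as the validation set while the others form the training set. This is an illustration of the general k-fold idea under simple assumptions, not Spark's exact splitting procedure, and `k_fold_splits` is a hypothetical helper.

```python
import random


def k_fold_splits(n_rows, num_folds, seed=42):
    """Return a list of (train_indices, validation_indices) pairs."""
    indices = list(range(n_rows))
    random.Random(seed).shuffle(indices)
    # Deal the shuffled indices into num_folds groups of near-equal size.
    folds = [indices[i::num_folds] for i in range(num_folds)]
    splits = []
    for i, validation in enumerate(folds):
        train = [idx for j, fold in enumerate(folds) if j != i for idx in fold]
        splits.append((train, validation))
    return splits


for train, validation in k_fold_splits(n_rows=10, num_folds=3):
    print(len(train), len(validation))
```

With `numFolds=3` and a grid of 12 combinations, this yields 3 × 12 trained models, which matches the shape of `cv_model.subModels` shown in this section.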

In [None]:
from pyspark.ml.tuning import CrossValidator, CrossValidatorModel


cv = CrossValidator(estimator=kmeans, estimatorParamMaps=grid, evaluator=evaluator,
                           collectSubModels=True,  parallelism=2, numFolds=3)

cv_model = cv.fit(data)
cv_model.subModels

[[KMeansModel: uid=KMeans_792430fa1bb8, k=3, distanceMeasure=euclidean, numFeatures=50,
  KMeansModel: uid=KMeans_792430fa1bb8, k=3, distanceMeasure=euclidean, numFeatures=50,
  KMeansModel: uid=KMeans_792430fa1bb8, k=3, distanceMeasure=cosine, numFeatures=50,
  KMeansModel: uid=KMeans_792430fa1bb8, k=3, distanceMeasure=cosine, numFeatures=50,
  KMeansModel: uid=KMeans_792430fa1bb8, k=3, distanceMeasure=euclidean, numFeatures=50,
  KMeansModel: uid=KMeans_792430fa1bb8, k=3, distanceMeasure=euclidean, numFeatures=50,
  KMeansModel: uid=KMeans_792430fa1bb8, k=3, distanceMeasure=cosine, numFeatures=50,
  KMeansModel: uid=KMeans_792430fa1bb8, k=3, distanceMeasure=cosine, numFeatures=50,
  KMeansModel: uid=KMeans_792430fa1bb8, k=3, distanceMeasure=euclidean, numFeatures=50,
  KMeansModel: uid=KMeans_792430fa1bb8, k=3, distanceMeasure=euclidean, numFeatures=50,
  KMeansModel: uid=KMeans_792430fa1bb8, k=3, distanceMeasure=cosine, numFeatures=50,
  KMeansModel: uid=KMeans_792430fa1bb8, k=3, di

In [None]:
len(cv_model.subModels)

3

In [None]:
len(cv_model.subModels[0])

12

In [None]:
cv_model.avgMetrics

[-0.08053007809073798,
 -0.08053007809073798,
 0.5989818334335287,
 0.5989818334335287,
 -0.07970600680061901,
 -0.07970600680061901,
 0.5989818334335287,
 0.5989818334335287,
 -0.07970600680061901,
 -0.07970600680061901,
 0.5989818334335287,
 0.5989818334335287]
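Since the evaluator reports that larger is better, the preferred configuration is the one with the highest average metric. The short sketch below picks that index from the values printed above, in plain Python. In a live session, the refit winner is also available as `cv_model.bestModel`.

```python
# Average metrics copied from the cv_model.avgMetrics output above.
avg_metrics = [
    -0.08053007809073798, -0.08053007809073798,
    0.5989818334335287, 0.5989818334335287,
    -0.07970600680061901, -0.07970600680061901,
    0.5989818334335287, 0.5989818334335287,
    -0.07970600680061901, -0.07970600680061901,
    0.5989818334335287, 0.5989818334335287,
]

# max() returns the first index that reaches the highest value.
best_index = max(range(len(avg_metrics)), key=avg_metrics.__getitem__)
print(best_index, avg_metrics[best_index])
```

The highest value is shared by several grid entries, all of which appear as `distanceMeasure=cosine` models in the sub-model listing, so the choice among them is arbitrary.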

# ***Conclusion***

This clustering project on the CO2 emissions of vehicles in Canada showed how machine learning techniques can segment data into groups. Using Apache Spark, we processed and analyzed the data in a way that scales to large volumes, which is essential for large-scale applications. The results obtained with clustering algorithms such as KMeans and GaussianMixture, together with the evaluation of these models, show promising prospects for identifying the characteristics that influence CO2 emissions. In conclusion, this project illustrates how big data and machine learning technologies can contribute to in-depth environmental analyses and to informed decision-making for reducing CO2 emissions.