# Classification binaire avec MLIB


Ce notebook est conçu pour vous aider à démarrer avec Apache Spark MLlib.

Nous allons étudier un problème de classification binaire:
**Prédire si le revenu d'un individu est supérieur à 50 000 $** à partir d'une base de données démographiques? 

L'ensemble de données provient du référentiel [UCI Machine Learning Repository](https://archive.ics.uci.edu/ml/datasets/Adult) et est fourni avec Databricks Runtime. 

Ce bloc-notes présente certaines des fonctionnalités disponibles dans MLlib: notamment des outils de prétraitement des données, des pipelines d'apprentissage automatique et plusieurs algorithmes d'apprentissage automatique.

<b>Ce cahier comprend les étapes suivantes:</b> 

0. Charger le jeu de données
0. Nettoayage des données
0. Définir le modèle
0. Construisez le pipeline
0. Évaluer le modèle
0. Réglage des hyperparamètres
0. Faire des prédictions et évaluer les performances du modèle

# Étape 0. Chargez l'ensemble de données

Afficher les premières lignes des données.

In [None]:
%fs head --maxBytes=1024 databricks-datasets/adult/adult.data

Créer un schéma pour attribuer des noms de colonne et des types de données.

In [None]:
schema = """`age` DOUBLE,
`workclass` STRING,
`fnlwgt` DOUBLE,
`education` STRING,
`education_num` DOUBLE,
`marital_status` STRING,
`occupation` STRING,
`relationship` STRING,
`race` STRING,
`sex` STRING,
`capital_gain` DOUBLE,
`capital_loss` DOUBLE,
`hours_per_week` DOUBLE,
`native_country` STRING,
`income` STRING"""

dataset = spark.read.csv("/databricks-datasets/adult/adult.data", schema=schema)

Divisez aléatoirement les données en ensembles d'apprentissage et de test.

In [None]:
trainDF, testDF = dataset.randomSplit([0.8, 0.2], seed=42)
print(trainDF.cache().count()) # Cache car accès aux données d'entraînement plusieurs fois
print(testDF.count())

# Visualiser les données

In [None]:
display(trainDF.select("hours_per_week").summary())

In [None]:
display(trainDF
        .groupBy("education")
        .count()
        .sort("count", ascending=False))

# Transformers, estimators, and pipelines

-**Transformer:** prend un DataFrame comme entrée et renvoie un nouveau DataFrame. Les transformateurs n'apprennent aucun paramètre à partir des données et appliquent simplement des transformations basées sur des règles pour **préparer les données** pour l'apprentissage du modèle **ou générer des prédictions** à l'aide d'un modèle MLlib entraîné. Vous appelez un transformateur avec une méthode .transform ().

-**Estimator:** **apprend** (ou «ajuste») les paramètres de votre DataFrame via une méthode .fit () et renvoie un modèle, qui est un transformateur.

-**Pipeline:** combine plusieurs étapes dans un flux de travail unique qui peut être facilement exécuté. La création d'un modèle d'apprentissage automatique implique généralement la configuration de nombreuses étapes différentes et leur itération. Les pipelines vous aident à automatiser ce processus.

[ML Pipelines](https://spark.apache.org/docs/latest/ml-pipeline.html#ml-pipelines)

# Étape 1. Nettoayage des données
On veut construire un modèle qui prédit le niveau de revenu à partir des caractéristiques incluses dans l'ensemble de données (niveau d'éducation, état matrimonial, profession, etc.). La première étape consiste à manipuler ou prétraiter les données afin qu'elles soient au format requis par MLlib.

## Convertir les variables catégorielles en variables numériques

Certains algorithmes d'apprentissage automatique, tels que la régression linéaire et logistique, nécessitent des fonctionnalités numériques. L'ensemble de données Adultes comprend des caractéristiques catégoriques telles que l'éducation, la profession et l'état matrimonial.

Le bloc de code suivant illustre comment utiliser `StringIndexer` et` OneHotEncoder` pour convertir des variables catégorielles en un ensemble de variables numériques qui ne prennent que les valeurs 0 et 1.

- `StringIndexer` convertit une colonne de valeurs strings en une colonne d'index d'étiquette. Par exemple, il peut convertir les valeurs «rouge», «bleu» et «vert» en 0, 1 et 2.
- `OneHotEncoder` mappe une colonne d'indices de catégorie à une colonne de vecteurs binaires, avec au plus un" 1 "dans chaque ligne qui indique l'index de catégorie pour cette ligne.

L'encodage One-hot dans Spark est un processus en deux étapes. Vous utilisez d'abord le StringIndexer, suivi de OneHotEncoder. Le bloc de code suivant définit le StringIndexer et OneHotEncoder mais ne l'applique pas encore à des données.

[StringIndexer](http://spark.apache.org/docs/latest/ml-features.html#stringindexer)   
[OneHotEncoder](https://spark.apache.org/docs/latest/ml-features.html#onehotencoder)

In [None]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder

categoricalCols = ["workclass", "education", "marital_status", "occupation", "relationship", "race", "sex"]

# The following two lines are estimators. They return functions that we will later apply to transform the dataset.
stringIndexer = StringIndexer(inputCols=categoricalCols, outputCols=[x + "Index" for x in categoricalCols]) 
encoder = OneHotEncoder(inputCols=stringIndexer.getOutputCols(), outputCols=[x + "OHE" for x in categoricalCols]) 

# The label column ("income") is also a string value - it has two possible values, "<=50K" and ">50K". 
# Convert it to a numeric value using StringIndexer.
labelToIndex = StringIndexer(inputCol="income", outputCol="label")

Nous allons créer un pipeline combinant toutes nos étapes de nettoayage et apprentisage. Mais prenons une minute pour examiner de plus près le fonctionnement des estimateurs et des transformateurs en appliquant l'estimateur `stringIndexer` que nous avons créé dans le bloc de code précédent.

Vous pouvez appeler la méthode `.fit ()` pour renvoyer un `StringIndexerModel`, que vous pouvez ensuite utiliser pour transformer l'ensemble de données.

La méthode `.transform ()` de `StringIndexerModel` renvoie un nouveau DataFrame avec les nouvelles colonnes ajoutées. Faites défiler vers la droite pour voir les nouvelles colonnes si nécessaire.

Pour plus d'informations: [StringIndexerModel]

In [None]:
stringIndexerModel = stringIndexer.fit(trainDF)
display(stringIndexerModel.transform(trainDF))

### Combine toutes les colonnes d'entités en un seul vecteur d'entités

La plupart des algorithmes MLlib nécessitent une seule colonne de fonctionnalités en entrée. Chaque ligne de cette colonne contient un vecteur de points de données correspondant à l'ensemble des caractéristiques utilisées pour la prédiction.

MLlib fournit le transformateur `VectorAssembler` pour créer une seule colonne vectorielle à partir d'une liste de colonnes.

Le bloc de code suivant montre comment utiliser VectorAssembler.

Pour plus d'informations: [VectorAssembler] (https://spark.apache.org/docs/latest/ml-features.html#vectorassembler)

In [1]:
from pyspark.ml.feature import VectorAssembler

# This includes both the numeric columns and the one-hot encoded binary vector columns in our dataset.
numericCols = ["age", "fnlwgt", "education_num", "capital_gain", "capital_loss", "hours_per_week"]
assemblerInputs = [c + "OHE" for c in categoricalCols] + numericCols
vecAssembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")

ModuleNotFoundError: No module named 'pyspark'

## Étape 2. Définir le modèle d'apprentisage

On utilise un modèle de [régression logistique] (https://spark.apache.org/docs/latest/ml-classification-regression.html#logistic-regression).

## Étape 3. Construisez le pipeline

Un «Pipeline» est une liste ordonnée de transformateurs et d'estimateurs. Vous pouvez définir un pipeline pour automatiser et garantir la répétabilité des transformations à appliquer à un ensemble de données. Dans cette étape, nous définissons le pipeline, puis nous l'appliquons à l'ensemble de données de test.

Semblable à ce que nous avons vu avec `StringIndexer`, un` Pipeline` est un estimateur. La méthode `pipeline.fit ()` renvoie un `PipelineModel`, qui est un transformateur.

Pour plus d'informations:
[Pipeline] (https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.Pipeline)
[PipelineModel] (https://spark.apache.org/docs/latest/api/java/org/apache/spark/ml/PipelineModel.html)

In [2]:
from pyspark.ml import Pipeline

# Define the pipeline based on the stages created in previous steps.
pipeline = Pipeline(stages=[stringIndexer, encoder, labelToIndex, vecAssembler, lr])

# Define the pipeline model.
pipelineModel = pipeline.fit(trainDF)

# Apply the pipeline model to the test dataset.
predDF = pipelineModel.transform(testDF)

ModuleNotFoundError: No module named 'pyspark'

Display the predictions from the model. The features column is a sparse vector, which is often the case after one-hot encoding, because there are so many 0 values.

In [3]:
display(predDF.select("features", "label", "prediction", "probability"))

NameError: name 'predDF' is not defined

## Étape 4. Évaluer le modèle

La commande `display` a une option de courbe ROC intégrée.

To evaluate the model, we use the `BinaryClassificationEvaluator` to evalute the area under the ROC curve and the `MulticlassClassificationEvaluator` to evalute the accuracy.

For more information:  
[BinaryClassificationEvaluator](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.evaluation.BinaryClassificationEvaluator)  
[MulticlassClassificationEvaluator](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.evaluation.MulticlassClassificationEvaluator)

In [None]:
display(pipelineModel.stages[-1], predDF.drop("prediction", "rawPrediction", "probability"), "ROC")

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

bcEvaluator = BinaryClassificationEvaluator(metricName="areaUnderROC")
print(f"Area under ROC curve: {bcEvaluator.evaluate(predDF)}")

mcEvaluator = MulticlassClassificationEvaluator(metricName="accuracy")
print(f"Accuracy: {mcEvaluator.evaluate(predDF)}")

## Étape 5. Réglage des hyperparamètres

MLlib fournit des méthodes pour faciliter le réglage des hyperparamètres et la validation croisée.
- Pour le réglage des hyperparamètres, `ParamGridBuilder` vous permet de définir une recherche de grille sur un ensemble d'hyperparamètres de modèle.
- Pour la validation croisée, `CrossValidator` vous permet de spécifier un estimateur (le pipeline à appliquer à l'ensemble de données d'entrée), un évaluateur, un espace de grille d'hyperparamètres et le nombre de plis à utiliser pour la validation croisée.
  
Pour plus d'informations:
[Sélection du modèle par validation croisée] (https://spark.apache.org/docs/latest/ml-tuning.html)
[ParamGridBuilder] (https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#module-pyspark.ml.tuning)
[CrossValidator] (https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.tuning.CrossValidator)

Utilisez `ParamGridBuilder` et` CrossValidator` pour régler le modèle. Cet exemple utilise trois valeurs pour `regParam` et trois pour` elasticNetParam`, pour un total de 3 x 3 = 9 combinaisons d'hyperparamètres pour `CrossValidator` à examiner.

In [None]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5, 2.0])
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
             .build())

Chaque fois que vous appelez `CrossValidator` dans MLlib, Databricks suit automatiquement toutes les exécutions en utilisant [MLflow] (https://mlflow.org/). Vous pouvez utiliser l'interface utilisateur de MLflow ([AWS] (https://docs.databricks.com/applications/mlflow/index.html) | [Azure] (https://docs.microsoft.com/azure/databricks/applications/ mlflow /)) pour comparer les performances de chaque modèle.

Dans cet exemple, nous utilisons le pipeline que nous avons créé comme estimateur.

In [None]:
# Create a 3-fold CrossValidator
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid, evaluator=bcEvaluator, numFolds=3, parallelism = 4)

# Run cross validations. This step takes a few minutes and returns the best model found from the cross validation.
cvModel = cv.fit(trainDF)

## Étape 6. Faire des prédictions et évaluer les performances du modèle
Utilisez le meilleur modèle identifié par la validation croisée pour faire des prédictions sur l'ensemble de données de test, puis évaluez les performances du modèle en utilisant la zone sous la courbe ROC.

In [None]:
# Use the model identified by the cross-validation to make predictions on the test dataset
cvPredDF = cvModel.transform(testDF)

# Evaluate the model's performance based on area under the ROC curve and accuracy 
print(f"Area under ROC curve: {bcEvaluator.evaluate(cvPredDF)}")
print(f"Accuracy: {mcEvaluator.evaluate(cvPredDF)}")