This section presents a few Machine Learning algorithms. There are many, but the three below are among the most popular:

- 1/ Regression - Linear Regression
- 2/ Classification - Random Forest
- 3/ Clustering - KMeans

This notebook focuses on the first one: we take a dataset and build a linear regression model on it.

"Linear regression is the most basic type of regression and commonly used predictive analysis.  The overall idea of regression is to examine two things: (1) does a set of predictor variables do a good job in predicting an outcome variable?  Is the model using the predictors accounting for the variability in the changes in the dependent variable? (2) Which variables in particular are significant predictors of the dependent variable?  And in what way do they--indicated by the magnitude and sign of the beta estimates--impact the dependent variable?  These regression estimates are used to explain the relationship between one dependent variable and one or more independent variables. (3) What is the regression equation that shows how the set of predictor variables can be used to predict the outcome?  The simplest form of the equation with one dependent and one independent variable is defined by the formula y = c + b*x, where y = estimated dependent score, c = constant, b = regression coefficients, and x = independent variable."

(source : http://www.statisticssolutions.com/what-is-linear-regression/)
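
As a minimal sketch of the equation y = c + b*x quoted above, the constant c and the coefficient b can be estimated by ordinary least squares. The function name `fit_simple_linear_regression` and the toy data are illustrative assumptions; they are not part of the notebook's dataset.

```python
from statistics import mean


def fit_simple_linear_regression(xs, ys):
    """Return (c, b) minimising the squared error of y = c + b*x."""
    x_bar, y_bar = mean(xs), mean(ys)
    # Slope: co-variation of x and y divided by the variation of x
    sxy = sum((x - x_bar) * (y - y_bar) for x, y in zip(xs, ys))
    sxx = sum((x - x_bar) ** 2 for x in xs)
    b = sxy / sxx
    # Intercept: the fitted line passes through the point of means
    c = y_bar - b * x_bar
    return c, b


# Toy data generated exactly from y = 1 + 2*x (hypothetical values)
xs = [0.0, 1.0, 2.0, 3.0]
ys = [1.0, 3.0, 5.0, 7.0]
c, b = fit_simple_linear_regression(xs, ys)
print(f"c = {c}, b = {b}")
```

With several independent variables, the same idea applies with one coefficient per variable, which is what Spark's `LinearRegression` estimates later in this notebook.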

# Installation and imports

In [2]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317145 sha256=14194eb146feae8986f88d9e887bf07471c2f47f8a6712fced7f3c5342686c26
  Stored in directory: /root/.cache/pip/wheels/9f/34/a4/159aa12d0a510d5ff7c8f0220abbea42e5d81ecf588c4fd884
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


In [22]:
import os
import pandas as pd
import numpy as np

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, SQLContext

from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql.functions import udf, col

from pyspark.rdd import RDD
from pyspark.mllib.evaluation import RegressionMetrics
# Aliased so that the pyspark.ml.linalg import below does not shadow it
from pyspark.mllib.linalg import Vectors as MLlibVectors
from pyspark.mllib.stat import MultivariateStatisticalSummary, Statistics

from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.linalg import Vectors
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, CrossValidatorModel

# Download datasets

In [5]:
!gdown https://drive.google.com/uc?id=1sp0G0pLgqn7hk7v6Sx2Il3Tg_nqZuSG4 -O dataset.zip

Downloading...
From: https://drive.google.com/uc?id=1sp0G0pLgqn7hk7v6Sx2Il3Tg_nqZuSG4
To: /content/dataset.zip
100% 831k/831k [00:00<00:00, 104MB/s]


In [6]:
!unzip dataset.zip

Archive:  dataset.zip
   creating: dataset/
  inflating: dataset/iris.csv        
  inflating: __MACOSX/dataset/._iris.csv  
  inflating: dataset/titanic.csv     
  inflating: __MACOSX/dataset/._titanic.csv  
  inflating: dataset/wine.csv        
  inflating: __MACOSX/dataset/._wine.csv  
  inflating: dataset/house.csv       
  inflating: __MACOSX/dataset/._house.csv  
  inflating: dataset/mtcars.csv      
  inflating: __MACOSX/dataset/._mtcars.csv  
  inflating: dataset/diabetes.csv    
  inflating: __MACOSX/dataset/._diabetes.csv  


In [7]:
ls ./dataset/

diabetes.csv  house.csv  iris.csv  mtcars.csv  titanic.csv  wine.csv


# Regression model with PySpark

Here we use the dataset from https://gist.github.com/seankross/a412dfbd88b3db70b74b#file-mtcars-csv

The data was extracted from the 1974 Motor Trend US magazine, and comprises fuel consumption and 10 aspects of automobile design and performance for 32 automobiles (1973–74 models).

A data frame with 32 observations on 11 (numeric) variables.

- mpg : Miles/(US) gallon
- cyl : Number of cylinders
- disp : Displacement (cu.in.)
- hp : Gross horsepower
- drat : Rear axle ratio
- wt : Weight (1000 lbs)
- qsec : 1/4 mile time
- vs : Engine (0 = V-shaped, 1 = straight)
- am : Transmission (0 = automatic, 1 = manual)
- gear : Number of forward gears
- carb : Number of carburetors

The target variable is the vehicles' fuel consumption, `mpg` (miles per gallon), modelled as a function of the explanatory variables.

In [8]:
spark = (SparkSession.builder
                    .master("local[2]")
                    .appName("regression_model")
                    .getOrCreate())

In [9]:
df = (spark.read.format("csv")
              .option("header", "true")
              .option("inferSchema", "true") 
              .load("./dataset/mtcars.csv"))

## Data analysis

1. Show the dataframe
2. Print the schema of the dataframe
3. Statistics summary 
4. Correlations of variables
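
To make step 4 concrete, here is a minimal pure-Python sketch of Pearson's correlation coefficient, the default method used for the correlation matrix in this notebook. The helper name `pearson` is an assumption made for illustration; the values are the first ten `mpg` and `wt` entries of the dataset as displayed by `df.show()`.

```python
from math import sqrt


def pearson(xs, ys):
    """Pearson correlation coefficient between two equally long sequences."""
    n = len(xs)
    x_bar = sum(xs) / n
    y_bar = sum(ys) / n
    # Co-variation of the two variables around their means
    cov = sum((x - x_bar) * (y - y_bar) for x, y in zip(xs, ys))
    # Variation of each variable around its own mean
    var_x = sum((x - x_bar) ** 2 for x in xs)
    var_y = sum((y - y_bar) ** 2 for y in ys)
    return cov / sqrt(var_x * var_y)


# First ten rows of the mtcars dataset (mpg and wt columns)
mpg = [21.0, 21.0, 22.8, 21.4, 18.7, 18.1, 14.3, 24.4, 22.8, 19.2]
wt = [2.62, 2.875, 2.32, 3.215, 3.44, 3.46, 3.57, 3.19, 3.15, 3.44]

r = pearson(mpg, wt)
print(f"Pearson correlation between mpg and wt (first 10 rows): {r:.3f}")
```

A coefficient close to -1 or 1 indicates a strong linear relationship, and the sign gives its direction. On these ten rows the coefficient is negative: heavier cars tend to have a lower mpg.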

In [10]:
df.show()

+-------------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|              model| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|
+-------------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|          Mazda RX4|21.0|  6|160.0|110| 3.9| 2.62|16.46|  0|  1|   4|   4|
|      Mazda RX4 Wag|21.0|  6|160.0|110| 3.9|2.875|17.02|  0|  1|   4|   4|
|         Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|   4|   1|
|     Hornet 4 Drive|21.4|  6|258.0|110|3.08|3.215|19.44|  1|  0|   3|   1|
|  Hornet Sportabout|18.7|  8|360.0|175|3.15| 3.44|17.02|  0|  0|   3|   2|
|            Valiant|18.1|  6|225.0|105|2.76| 3.46|20.22|  1|  0|   3|   1|
|         Duster 360|14.3|  8|360.0|245|3.21| 3.57|15.84|  0|  0|   3|   4|
|          Merc 240D|24.4|  4|146.7| 62|3.69| 3.19| 20.0|  1|  0|   4|   2|
|           Merc 230|22.8|  4|140.8| 95|3.92| 3.15| 22.9|  1|  0|   4|   2|
|           Merc 280|19.2|  6|167.6|123|3.92| 3.44| 18.3|  1|  0|   4|   4|
|          M

In [None]:
df.printSchema()

root
 |-- model: string (nullable = true)
 |-- mpg: double (nullable = true)
 |-- cyl: integer (nullable = true)
 |-- disp: double (nullable = true)
 |-- hp: integer (nullable = true)
 |-- drat: double (nullable = true)
 |-- wt: double (nullable = true)
 |-- qsec: double (nullable = true)
 |-- vs: integer (nullable = true)
 |-- am: integer (nullable = true)
 |-- gear: integer (nullable = true)
 |-- carb: integer (nullable = true)



In [None]:
# Summary statistics of every column
df.describe().show()

+-------+-----------+------------------+------------------+------------------+-----------------+------------------+------------------+------------------+------------------+-------------------+------------------+------------------+
|summary|      model|               mpg|               cyl|              disp|               hp|              drat|                wt|              qsec|                vs|                 am|              gear|              carb|
+-------+-----------+------------------+------------------+------------------+-----------------+------------------+------------------+------------------+------------------+-------------------+------------------+------------------+
|  count|         32|                32|                32|                32|               32|                32|                32|                32|                32|                 32|                32|                32|
|   mean|       null|20.090624999999996|            6.1875|230.7218750000000

In [16]:
# Convert df to an RDD so that the RDD-based Statistics.colStats
# (MultivariateStatisticalSummary) can be used.
from pyspark.mllib.linalg import Vectors as MLlibVectors

# Drop the string column and cast every remaining value to float
rdd = df.drop("model").rdd.map(lambda row: [float(v) for v in row])

# Convert the RDD to an RDD of dense vectors, keeping the first five
# columns (mpg, cyl, disp, hp, drat)
observations = rdd.map(lambda values: MLlibVectors.dense(values[:5]))

# Compute column summary statistics.
summary = Statistics.colStats(observations)
print("Mean of each column:", summary.mean())
print("Variance of each column:", summary.variance())
print("Number of non-zero values in each column:", summary.numNonzeros())

In [None]:
# Calculate the correlation matrix using Pearson's method. Use "spearman" for Spearman's method
# If a method is not specified, Pearson's method will be used by default.
correlMatrix = Statistics.corr(observations, method="pearson")
print(correlMatrix)

## Modelling

In this example there are few explanatory variables, so we assume that all of them can be used to build the regression model. Otherwise, we would need feature selection to keep the variables that most affect the target variable. Different methods apply depending on the type of the variables. Spark provides some basic tools for this, for example https://spark.apache.org/docs/latest/ml-features.html#feature-selectors.


###  Vector Assembler

To build a linear regression with the ML library, the training data needs two columns: "label" and "features". To get there, all the explanatory variables are combined into a single vector column named "features", and the column of the target variable is renamed "label".

In [None]:
assembler = VectorAssembler(
    inputCols=["cyl", "disp", "hp", "drat", "wt", "qsec", "vs", "am", "gear", "carb"],
    outputCol="features",
)

# df is the DataFrame loaded from mtcars.csv above
training = (assembler.transform(df)
                     .select("mpg", "features")
                     .withColumnRenamed("mpg", "label"))

# Split of the raw DataFrame into training and test sets
train, test = df.randomSplit([0.8, 0.2])

### Build a linear regression model 

To get the best model, we can vary parameters such as the maximum number of iterations, the regularization parameters, etc. The parameters supported by Spark are listed here: https://spark.apache.org/docs/1.6.2/api/scala/index.html#org.apache.spark.ml.regression.LinearRegression
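
For reference, the two regularization parameters used in this notebook enter the training objective roughly as follows, based on the Spark ML documentation on linear methods. The symbols are notation introduced here for illustration: n is the number of observations, x_i and y_i are the features and label of observation i, w is the coefficient vector, λ stands for `regParam` and α for `elasticNetParam`. The intercept and Spark's internal standardization are left out for brevity.

```latex
$$
\min_{w}\; \frac{1}{2n}\sum_{i=1}^{n}\left(w^{\top}x_i - y_i\right)^2
\;+\; \lambda\left(\alpha\,\lVert w\rVert_1 + \frac{1-\alpha}{2}\,\lVert w\rVert_2^2\right)
$$
```

With α = 0 the penalty is a pure L2 (Ridge) penalty, with α = 1 it is a pure L1 (Lasso) penalty, and values in between give an elastic net. λ = 0 disables regularization altogether.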

In [None]:
lr = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)

# Fit the model
lrModel = lr.fit(training)

# Print the coefficients and intercept for linear regression
print(f"Coefficients: {lrModel.coefficients} Intercept: {lrModel.intercept}")

### Model evaluation

Other metrics that can be computed are listed here: https://spark.apache.org/docs/1.6.2/api/scala/index.html#org.apache.spark.ml.regression.LinearRegressionTrainingSummary

In [None]:
# Summarize the model over the training set and print out some metrics
trainingSummary = lrModel.summary
print(f"numIterations: {trainingSummary.totalIterations}")
print(f"objectiveHistory: {trainingSummary.objectiveHistory}")
trainingSummary.residuals.show()
print(f"RMSE: {trainingSummary.rootMeanSquaredError}")
print(f"r2: {trainingSummary.r2}")

### Conclusion

Without any optimization, the quality of the model is fairly good (r2 = 0.76). In practice, we can try to improve this metric by removing anomalies, selecting the most important features to train the model, adding more observations or more variables, and tuning the parameters used for training.

### Note

All models created in Spark can be saved in HDFS by calling:

* `model.save(sc, "file:///Apps/spark/data/mllib/testModelPath")`

To load it for future use:

* `sameModel = SVMModel.load(sc, "file:///Apps/spark/data/mllib/testModelPath")`

This example uses an SVM model, hence `SVMModel.load`.

In addition, some models can be converted to PMML format. If you already know PMML, great; if not, you can read about it here: https://www.ibm.com/developerworks/library/ba-ind-PMML1/index.html.

The list of models that Spark can export to PMML is here: https://spark.apache.org/docs/2.0.0-preview/mllib-pmml-model-export.html

# Exercise
## How can this code be turned into a pipeline?
## How can the model be improved with cross-validation?


### Pre-processing
Prepare the `assembler` stage and add a data `standardization` stage.
Two objects are therefore needed:
- VectorAssembler
- StandardScaler (https://spark.apache.org/docs/3.2.1/ml-features.html#standardscaler)

Look up the required functions in the documentation: https://spark.apache.org/docs/3.2.1/ml-guide.html

We will call these two objects *assembler* and *scaler*. These first two stages of the pipeline transform and format the data for the model, and standardize the numeric variables so that they are comparable with one another.
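
As a rough sketch of what standardization does to a single numeric column, the snippet below centers the values on their mean and divides by their standard deviation. It assumes the sample standard deviation (n - 1 denominator); the helper name `standardize` is hypothetical. The values are the first ten `hp` entries of the dataset as displayed by `df.show()`.

```python
from statistics import mean, stdev


def standardize(values):
    """Center values on their mean and scale them to unit standard deviation."""
    mu = mean(values)
    sigma = stdev(values)  # sample standard deviation (n - 1 denominator)
    return [(v - mu) / sigma for v in values]


# First ten hp values of the mtcars dataset
hp = [110, 110, 93, 110, 175, 105, 245, 62, 95, 123]

hp_scaled = standardize(hp)
print([round(v, 2) for v in hp_scaled])
```

After this transformation every column has mean 0 and standard deviation 1, so a variable such as `hp` (tens to hundreds) no longer dominates a variable such as `wt` (a few units) purely because of its scale.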

In [None]:
assembler =

In [None]:
scaler =

### Model
Create the linear regression model of your choice (plain linear, Lasso, Ridge, ElasticNet).
Create the training and test sets.

In [None]:
lr =

In [None]:
train, test =

### Pipeline
Create the pipeline from its successive `stages`.
Then train the model.

In [None]:
pipeline =

In [None]:
lrModel =

### Metrics
Compute the usual error metrics: the RMSE and the coefficient of determination (denoted r2).
The coefficient of determination measures the quality of a linear regression's predictions. It is the share of the total variance of the actual variable that is explained by the regression (i.e. by its predictions). It therefore indicates how closely the predictions follow the actual values. The closer it is to 1, the better the model performs.
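
The two metrics can be written out in a few lines of plain Python. This is only an illustrative sketch: the function names `rmse_score` and `r2_score` are assumptions, the `actual` values are `mpg` entries taken from the dataset, and the `predicted` values are hypothetical.

```python
from math import sqrt


def rmse_score(actual, predicted):
    """Root mean squared error between actual and predicted values."""
    squared_errors = [(a - p) ** 2 for a, p in zip(actual, predicted)]
    return sqrt(sum(squared_errors) / len(actual))


def r2_score(actual, predicted):
    """Coefficient of determination: 1 - residual variation / total variation."""
    a_bar = sum(actual) / len(actual)
    ss_res = sum((a - p) ** 2 for a, p in zip(actual, predicted))
    ss_tot = sum((a - a_bar) ** 2 for a in actual)
    return 1 - ss_res / ss_tot


actual = [21.0, 22.8, 18.7, 14.3]      # mpg values from the dataset
predicted = [20.0, 23.0, 19.0, 16.0]   # hypothetical predictions

print(f"RMSE: {rmse_score(actual, predicted):.3f}")
print(f"r2:   {r2_score(actual, predicted):.3f}")
```

The RMSE is expressed in the unit of the target (here miles per gallon), whereas r2 is unitless, which is why the two are usually reported together.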

**Warning:**
Here, calling `lrModel.summary` is not enough, because `lrModel` is a fitted pipeline. The stage that holds the regression model must first be extracted from the pipeline.
To do so, you will need:
- the `.stages[stage_index]` attribute, giving the index of the stage to extract
- in Scala, the function `.asInstanceOf[type_stage]`, giving the type of the object extracted from the pipeline (no cast is needed in Python)
- the class of the stage to extract: `pyspark.ml.regression.LinearRegressionModel` (`org.apache.spark.ml.regression.LinearRegressionModel` in Scala)


In [None]:
trainingSummary =

### Test
Apply the trained model to the test set. Use the `.transform` function, which applies the model together with all the required pipeline stages automatically.

Then evaluate the predictions with the **RMSE** metric. To do so, you will need:
- the `pyspark.ml.evaluation` module, which contains `RegressionEvaluator` (`org.apache.spark.ml.evaluation.RegressionEvaluator` in Scala)
- an object, called *evaluator*, that computes the RMSE between the actual variable `mpg` (as labelCol) and the predicted variable `prediction` (as predictionCol).

Follow this documentation: https://spark.apache.org/docs/2.1.1/api/scala/index.html#org.apache.spark.ml.evaluation.RegressionEvaluator

Finally, apply the *evaluator* object to *predictions* (the DataFrame holding the predictions made on the test set) using the `.evaluate` function.
Then print the resulting RMSE.

In [None]:
predictions =

In [None]:
evaluator =

In [None]:
rmse =
print(f"Root Mean Squared Error (RMSE) on test data = {rmse}")

### Cross validation
The goal is to use **cross-validation** to tune the parameters of the linear regression.
The parameters to tune are:
- regParam
- elasticNetParam

To do this, we first create a search grid.
For example, here are the values to test for each parameter:
- For `regParam`: 0.1, 0.01, 0.2, 0.3
- For `elasticNetParam`: 0.1, 0.8

Once the grid is created, build the cross-validation model, then fit it on the training set.

Using the documentation, build the following objects:
- paramGrid
- cv
- cvModel

Finally, evaluate the new model `cvModel` on the test set again.
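
To get a feel for the size of the search described above, the grid can be enumerated in plain Python. This sketch only counts the combinations and does not replace Spark's `ParamGridBuilder`. The variable names are illustrative, and `num_folds = 3` is an assumption matching the default number of folds of `CrossValidator`.

```python
from itertools import product

# Candidate values listed in the exercise
reg_params = [0.1, 0.01, 0.2, 0.3]
elastic_net_params = [0.1, 0.8]

# Every (regParam, elasticNetParam) pair is one candidate model
grid = [
    {"regParam": reg, "elasticNetParam": enet}
    for reg, enet in product(reg_params, elastic_net_params)
]

num_folds = 3  # assumption: default number of folds of CrossValidator
total_fits = len(grid) * num_folds

print(f"{len(grid)} parameter combinations, {total_fits} model fits in total")
for params in grid:
    print(params)
```

Each combination is trained once per fold, so the cost of cross-validation grows with the product of the grid size and the number of folds. On a 32-row dataset this is negligible, but it matters on larger data.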

In [None]:
paramGrid =

In [None]:
cv =

In [None]:
cvModel = 

In [None]:
predictionsCvModel =

In [None]:
evaluatorCvModel =

In [None]:
rmse =
print(f"Root Mean Squared Error (RMSE) on test data = {rmse}")

# Solution
Coming soon.

In [None]:
from pyspark.ml.feature import StandardScaler
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegressionModel
from pyspark.ml.evaluation import RegressionEvaluator

assembler = VectorAssembler(
    inputCols=["cyl", "disp", "hp", "drat", "wt", "qsec", "vs", "am", "gear", "carb"],
    outputCol="features",
)

scaler = StandardScaler(
    inputCol="features",
    outputCol="scaledFeatures",
    withStd=True,
    withMean=True,
)

# featuresCol points at the scaler output so that the standardized
# features are the ones actually used for training
lr = LinearRegression(
    maxIter=10,
    regParam=0.3,
    elasticNetParam=0.8,
    labelCol="mpg",
    featuresCol="scaledFeatures",
)

train, test = df.randomSplit([0.8, 0.2])

pipeline = Pipeline(stages=[assembler, scaler, lr])

lrModel = pipeline.fit(train)

# Extract the fitted regression stage (last stage) to access its summary
trainingSummary = lrModel.stages[-1].summary

# Summarize the model over the training set and print out some metrics
print(f"numIterations: {trainingSummary.totalIterations}")
print(f"objectiveHistory: {trainingSummary.objectiveHistory}")
trainingSummary.residuals.show()
print(f"RMSE: {trainingSummary.rootMeanSquaredError}")
print(f"r2: {trainingSummary.r2}")

# Make predictions.
predictions = lrModel.transform(test)

# Select example rows to display.
predictions.select("prediction", "mpg", "features").show(5)

# Select (prediction, true label) and compute test error.
evaluator = RegressionEvaluator(
    labelCol="mpg",
    predictionCol="prediction",
    metricName="rmse",
)

rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE) on test data = {rmse}")