<small><i>Ce notebook a été créé par Yann Vernaz (2017).</i></small>

<center>
<a href="https://www.meetup.com/fr-FR/Meetup-Machine-Learning-Pau/" ><img src="img/meetup_logo.png" style="float:left; max-width: 100px; display: inline" alt="Meetup"/></a> 
<a href="https://www.meetup.com/fr-FR/Meetup-Machine-Learning-Pau/" ><img src="img/meetup_ML_pau.png" style="float:center; max-width: 250px; display: inline"  alt="Meetup Machine Learning Pau"/></a>
<a href="http://www.helioparc.com" ><img src="img/helioparc_logo.svg" style="float:right; max-width: 200px; display: inline" alt="Technopole Héloparc"/> </a>
</center>
<br>
<hr>
<center><h1>Optimisation distribuée avec Apache Spark</h1></center>
<hr>
<center><h2>Lab 0 - Test de l'image Docker</h2></center>
<hr>

Dans ce _Notebook_ nous allons tester que notre image _Docker_ fonctionne correctement. Nous allons vérifier que l'on peut créer l'environnement _Spark_ et ensuite explorer les données fournies pour les travaux pratiques.

<center>
<h1>Création de l'environnement</h1>
<br>
<a href="http://spark.apache.org" ><img src="img/spark_logo.png" style="float:center; max-width: 320px; display: inline" alt="Apache Spark"/></a>
</center>

**Rappel**
>Dans un _Notebook Jupyter_, il faut appuyez sur `[Ctrl-Enter]` pour exécuter le contenu d'une cellule ou appuyer sur le bouton `run cell` dans la barre des outils.

In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext

# using Spark local mode set to # cores on your machine
conf = SparkConf()
conf.setMaster("local[*]")
conf.setAppName("Meetup Machine Learning Pau")

sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

print("PySpark version:" + str(sc.version))

PySpark version:2.2.0


L'interface **Spark UI** est alors accessible dans votre navigateur web à partir de l'adresse http://localhost:4040. Cette interface affiche des informations utiles sur l'application (on en discutera lors des travaux pratiques).

- Une liste des étapes et des tâches du planificateur.
- Informations sur l'utilisation de la mémoire (taille des `RDD`, ...).
- Informations sur l'environnement.
- Informations sur les exécuteurs/noeuds (_workers_) en cours d'exécution.
- ...

Note - Cette interface permet de réaliser un _monitoring_ à minima de notre session _Spark_ en cours d'exécution. Plusieurs outils externes peuvent être également utilisés pour profiler les performances de _Spark_, par exemple [Ganglia](http://ganglia.sourceforge.net).

Normalement vous devez voir ceci dans votre navigateur :

<br>
<center>
<a href="http://spark.apache.org/docs/latest/monitoring.html" ><img src="img/screen_shot_Spark_UI.png" style="float:center; display: inline" alt="Apache Spark UI"/></a>
</center>

Maintenant vous pouvez utiliser _Spark_ en mode _local_ (sur votre ordinateur). Dans la suite, nous allons découvrir les données que nous utiliserons lors des travaux pratiques.

<center>
<h1>Les données</h1>
<br>
</center>
<p>Les données (fichier [retail.csv](retail.csv)) sont issues du programme de fidélité d'une grande chaîne de distribution alimentaire. Ce programme (_i.e._ carte de fidélité) permet de collecter des données sur les transactions des clients. Seule une petite fraction des données sont fournies (8 Mo sur plusieurs centaines de To). L'objectif est de prédire la probabilité du risque de désabonnement et ainsi détecter au plus tôt les clients succeptibles de quitter l'enseigne. Les données sont labéllisées (on connait les clients infidèles) et lors des travaux pratiques nous mettrons en oeuvre une méthode de classification supervisée simple (la régression logistique).</p>

Chaque ligne représente un client et chaque colonne contient les attributs suivants:

| Variable  | Description  |  Example |
|:--------------------- |:-------------- | ---------- |
| _clientId_  | Identifiant unique du client. | 901000010282532503 |
| _meanAcquiredPts_  | Nombre moyen de points acquis. | 0.40 |
| _meanConvertedPts_ | Nombre moyen de points convertis. | 0.0 |
| _relationLength_   | Durée de la relation (en nombre de jours). | 638 |
| _stdFreq_          | Écart-type de la fréquence des achats (en jours). | 47.60 |
| _meanArticle_      | Nombre d'articles moyen achetés à chaque visite. | 12.33 |
| _meanAmount_       | Panier moyen (en Euros). | 33.78 |
| _nbVisits_         | Nombre de visites depuis le début de la relation. | 24 |
| _meanFreq_         | Fréquence moyenne des achats (en jours). | 26.58 |
| _lastTime_         | Temps écoulé depuis la dernière visite. | 33|
| _stdAmount_        | Écart-type sur le montant du panier (en Euros). | 21.36 | 
| _stdArticle_       | Écart-type sur le nombre d'articles dans le panier. | 7.98 |
    
La variable cible (_label_) est donnée par _churn?_ qui vaut $\color{red}{1}$ si le client a quitté l'enseigne et $\color{blue}{0}$ sinon.

## Chargement des données

In [2]:
import os.path

inputFile = "retail.csv"

if os.path.isfile(inputFile) != True:
    print("Le fichier de données "+ inputFile + " n'existe pas.")
else:
    print("Le fichier de données " + inputFile + " existe.")
    print("Vous pouvez continuer.")

Le fichier de données retail.csv existe.
Vous pouvez continuer.


In [3]:
retailRDD = sc.textFile(inputFile).map(lambda line: line.split(','))

**Remarque** - Cette ligne de code crée une variable `retailRDD` (techniquement un `RDD`) qui pointe vers le fichier de données. La nature paresseuse (i.e. _lazy_) de `Spark` signifie qu'il n'exécute pas le code. Il attend une _action_ qui nécessite un calcul.

In [4]:
# nous supprimons l'en-tête du fichier
header = retailRDD.first()
retailRDD = retailRDD.filter(lambda line: line != header)

On vient de réaliser une _action_ (une exécution) en utilisant la fonction `.first()` qui récupère le premier élément du `RDD`, ici l'en-tête du fichier. Le `.filter()`comme le `.map()` sont des _transformations_ qui n'exécutent pas de code. 

Si vous regardez l'interface **Spark UI** vous devez voir ceci (ce qui correspond à l'action `.first()`) :

<br>
<center>
<img src="img/screen_shot_Spark_UI_2.png" style="float:center; display: inline" alt="Apache Spark UI"/>
</center>

Comptons le nombre d'exemples, de variables (ou _features_) et la proportion de la classe "1" (clients qui ont quitté l'enseigne).

In [5]:
numExamples = retailRDD.count()
numFeatures = len(retailRDD.take(1)[0])-1
numClass1   = retailRDD.map(lambda row: row[0]=="1").sum()
numClass0   = retailRDD.map(lambda row: row[0]=="0").sum()

print("Le jeux de données contient {} clients et a {} attributs. \
\nIl y a {:.2f}% des clients qui ont quittés l'enseigne." \
      .format(numExamples, numFeatures, numClass1/float(numExamples)*100))

Le jeux de données contient 57930 clients et a 12 attributs. 
Il y a 4.87% des clients qui ont quittés l'enseigne.


## Pré-traitements

Dans un premier temps nous allons transformer notre `RDD` pour pouvoir utiliser le format <a href="http://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#pyspark.mllib.regression.LabeledPoint">`LabeledPoint`</a> qui est plus simple à manipuler. Un `LabeledPoint` se  compose d'un _label_ et d'un vecteur de _features_.

In [6]:
from pyspark.mllib.regression import LabeledPoint

# On transforme notre RDD de tuple en un RDD de LabelPoint.
# Par le même occasion on retire clientID des variables.
labelPointRDD = retailRDD.map(lambda line: LabeledPoint(line[0],[line[2:]]))

In [7]:
labelPointRDD.take(1)

[LabeledPoint(0.0, [1.30818181818,0.0,595.0,81.4560673559,36.0,114.705454545,11.0,54.0909090909,10.0,103.729869022,33.9352324288])]

Nous avons à présent un `RDD` dont chaque élément est un `LabelPoint` qui contient le _label_ et le vecteur des 11 _features_.

La seconde étape (importante) de l'apprentissage par régression consiste à :

* Ajouter un _intercept_, c'est une variable supplémentaire égale à $1$.

* Normaliser les données pour avoir une moyenne nulle et une variance unitaire pour chaque variable (sauf l'_intercept_). 

In [8]:
from operator import add
import numpy as np

# mean vector
mean = labelPointRDD.map(lambda row: row.features.toArray()).reduce(add)/numExamples

# std
std = np.sqrt(labelPointRDD.map(lambda row: np.power(row.features.toArray()-mean, 2)).reduce(add))

# scaled features
data_scaled = labelPointRDD.map(lambda row: LabeledPoint(row.label, np.append((row.features.toArray()-mean)/std, 1.0)))

In [9]:
data_scaled.take(1)

[LabeledPoint(0.0, [-0.000414035031732,-0.0024740538601,-0.00341258853829,0.0124919073001,0.00520893750325,0.00738817026779,-0.00443563405927,0.0186510548985,-0.00103584854704,0.0131807431244,0.0109816170485,1.0])]

Pour terminer nous allons séparer notre jeux de données en deux parties : 

* Un ensemble de données d'entraînement sur lequel nous estimerons le modèle ($70\%$).
* Un ensemble de données de test sur lequel nous testerons nos prédictions ($30\%$). 

Nous utilisons la fonction <a href="http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.randomSplit"> `randomSplit` </a> qui divise aléatoirement un RDD.

In [10]:
weights = [.7, .3]
seed = 42

trainRDD, testRDD = data_scaled.randomSplit(weights, seed)

numExamplestrain = trainRDD.count()
numClass1train   = trainRDD.map(lambda ex: ex.label==1.0).sum()

numExamplestest = testRDD.count()
numClass1test   = testRDD.map(lambda ex: ex.label==1.0).sum()

print("[Total set]     %d examples (%d in class +1)." % (numExamples, numClass1))
print("[Training set]  %d examples (%d in class +1)." % (numExamplestrain, numClass1train))
print("[Testing set]   %d examples (%d in class +1)." % (numExamplestest, numClass1test))

[Total set]     57930 examples (2822 in class +1).
[Training set]  40561 examples (2001 in class +1).
[Testing set]   17369 examples (821 in class +1).


In [11]:
trainRDD.take(1)

[LabeledPoint(0.0, [-0.000414035031732,-0.0024740538601,-0.00341258853829,0.0124919073001,0.00520893750325,0.00738817026779,-0.00443563405927,0.0186510548985,-0.00103584854704,0.0131807431244,0.0109816170485,1.0])]

In [12]:
# pour arrêter Spark
sc.stop()