## Spark pour le Machine learning

Depuis Spark 2.0, on utilise des DataFrame au lieu des RDD combiné à la bibliothèque spark.ml

Attention : ce notebook ne fonctionne que si vous le lancez depuis un serveur sur lequel spark et pyspark sont installés avec une version >2.0

In [None]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import LogisticRegression


On va commencer par générer des données

In [None]:
train = spark.createDataFrame([
    (1.0, Vectors.dense([0.0, 1.1, 0.1])),
    (0.0, Vectors.dense([2.0, 1.0, -1.0])),
    (0.0, Vectors.dense([2.0, 1.3, 1.0])),
    (1.0, Vectors.dense([0.0, 1.2, -0.5]))], ["cible", "variables_expl"])


On crée un modèle de régression logistique et on fait l'apprentissage sur les données

In [None]:
lr = LogisticRegression(maxIter=10, regParam=0.01)

In [None]:
model_lr = lr.fit(train)

On va créer un jeu de test

In [None]:
test = spark.createDataFrame([
    (1.0, Vectors.dense([-1.0, 1.5, 1.3])),
    (0.0, Vectors.dense([3.0, 2.0, -0.1])),
    (1.0, Vectors.dense([0.0, 2.2, -1.5]))], ["cible", "variables_expl"])

On prédit avec le modèle et on récupère les résultats

In [None]:
prediction = model2.transform(test)
selected = prediction.select("cible", "variables_expl", "probability", "prediction")
for row in selected.collect():
    print row

### Construction d'un pipeline

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer

On veut enchainer plusieurs traitements, on va donc construire un pipeline

On génère de nouvelles données

In [None]:
training = spark.createDataFrame([
    (0L, "a b c d e spark", 1.0),
    (1L, "b d", 0.0),
    (2L, "spark f g h", 1.0),
    (3L, "hadoop mapreduce", 0.0)], ["id", "text", "label"])

test = spark.createDataFrame([
    (4L, "spark i j k"),
    (5L, "l m n"),
    (6L, "mapreduce spark"),
    (7L, "apache hadoop")], ["id", "text"])

On peut créer le pipeline de traitement

In [None]:
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.01)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

model = pipeline.fit(training)

On prédit à partir de notre modèle

In [None]:
prediction = model.transform(test)
selected = prediction.select("id", "text", "prediction")
for row in selected.collect():
    print(row)

### Ajustement des hyper-paramètres

On va utiliser un Grid Search pour ajuster les hyper-paramètres

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

On crée les données

In [None]:
training = spark.createDataFrame([
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 0.0),
    (4, "b spark who", 1.0),
    (5, "g d a y", 0.0),
    (6, "spark fly", 1.0),
    (7, "was mapreduce", 0.0),
    (8, "e spark program", 1.0),
    (9, "a e c l", 0.0),
    (10, "spark compile", 1.0),
    (11, "hadoop software", 0.0)
], ["id", "text", "label"])

test = spark.createDataFrame([
    (4L, "spark i j k"),
    (5L, "l m n"),
    (6L, "mapreduce spark"),
    (7L, "apache hadoop")
], ["id", "text"])

On crée les modèles

In [None]:
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

On va construire la grille

In [None]:
paramGrid = ParamGridBuilder() \
    .addGrid(hashingTF.numFeatures, [10, 100, 1000]) \
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .build()

On applique cette grille avec une méthode de validation croisée

In [None]:
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=3)  

cvModel = crossval.fit(training)

On fait de la prédiction en utilisant le modèle de validation croisée

In [None]:
prediction = cvModel.transform(test)
selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
    print(row)