# Projet 8: Déployez un modèle dans le cloud

## Install Packages

In [0]:
!pip install Pandas 
!pip install pillow 
!pip install tensorflow 
!pip install pyspark 
!pip install pyarrow
!pip install mlflow
!pip install hyperopt
!pip install databricks-cli
!pip install fastparquet
!pip install opencv-python
!pip install xgboost

dbutils.library.restartPython()

## Import Librairies

In [0]:
import pandas as pd
import numpy as np
import io as io
import seaborn as sns
import math as math
import cv2 as cv2
import timeit as timeit
import matplotlib.pyplot as plt
import plotly.graph_objects as go
import plotly.express as px
import pickle as pickle
import time as time
import logging as logging
import tensorflow as tf
import tensorflow.keras
import os as os
import re as re
import IPython.display as display
import matplotlib.pyplot as plt
import xgboost as xgb

from PIL import Image
from PIL import Image, ImageOps, ImageEnhance
from PIL import ImageFilter

from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras import Model
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split
from pyspark.sql import SparkSession
from tensorflow.keras.utils import load_img, img_to_array
from keras.applications.vgg16 import preprocess_input
from keras.applications.vgg16 import decode_predictions
from keras.applications.vgg16 import VGG16
from keras.models import Model

from tensorflow.keras import backend as K
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras import metrics as kmetrics
from tensorflow.keras.layers import *
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense
from tensorflow.keras.models import Model

from os import listdir




## Initialisation

Pour la 1e version, qui sera déployé en locale, nous partons du principe que les données sont stockées dans le même répertoire que le notebook. Nous n'utilisons qu'un extrait de 50 images à traiter dans une première version, qui sera déployé en local. L'extrait des images à charger est stockée dans le dossier Test1. Nous enregistrerons le résultat de notre traitement dans le dossier "Results_Local". Avant de commencer il est nécesair d'installer et de deployer Apache Spark sur son poste.

In [0]:
%fs mkdirs /FileStore/data 

In [0]:
%fs mkdirs /FileStore/data/Train

In [0]:
%fs mkdirs /FileStore/data/Train/fraise

In [0]:
%fs mkdirs /FileStore/data/Train/figue

In [0]:
%fs mkdirs /FileStore/data/Train/banana

In [0]:
%fs mkdirs /FileStore/data/Test

In [0]:
%fs mkdirs /FileStore/data/Test/fraise

In [0]:
%fs mkdirs /FileStore/data/Test/figue

In [0]:
%fs mkdirs /FileStore/data/Test/banana

In [0]:
%fs mkdirs /FileStore/data/Results_Train

In [0]:
%fs mkdirs /FileStore/data/Results_Test

In [0]:
# Définition  PATH pour charger les images et enregistrer les résultats

PATH = '/FileStore'
PATH_Train = PATH+'/data/Train'
PATH_Test = PATH+'/data/Test'
PATH_Train_Results = PATH+'/data/Results_Train'
PATH_Test_Results = PATH+'/data/Results_Test'
print('PATH:        '+\
      PATH+'\nPATH_Train:  '+\
      PATH_Train+'\nPATH_Train_Results: '+PATH_Train_Results)
print('PATH:        '+\
      PATH+'\nPATH_Test:   '+\
      PATH_Test+'\nPATH_Test_Results: '+PATH_Test_Results)

Comme decrit plus haut, l’application Spark est contrôlée grâce à un processus de pilotage (driver process) appelé SparkSession. Une instance de SparkSession est la façon dont Spark exécute les fonctions définies par l’utilisateur dans l’ensemble du cluster. Une SparkSession correspond toujours à une application Spark. Dans notre example nous allons créer une session Spark en spécifiant les paramètres suivants:

 1. appName: un nom pour l'application, qui sera affichée dans l'interface utilisateur Web Spark "**P8**"
 2. master: un paramètre pour indiquer que l'application doit s'exécuter localement. 
 3. config: une option de configuration supplémentaire permettant d'utiliser le **format "parquet"** 
 4. getOrCreate: un paramètre pour obtenir une session spark existante ou si aucune n'existe, en créer une nouvelle

## Traitement des données

<u>Dans la suite de notre flux de travail, 
nous allons successivement</u> :
1. Préparer nos données
    1. Importer les images dans un dataframe **pandas UDF**
    2. Associer aux images leur **label**
    3. Préprocesser en **redimensionnant nos images pour 
       qu'elles soient compatibles avec notre modèle**
2. Préparer notre modèle
    1. Importer le modèle **MobileNetV2**
    2. Créer un **nouveau modèle** dépourvu de la dernière couche de MobileNetV2
3. Définir le processus de chargement des images et l'application 
   de leur featurisation à travers l'utilisation de pandas UDF
3. Exécuter les actions d'extraction de features
4. Enregistrer le résultat de nos actions
5. Tester le bon fonctionnement en chargeant les données enregistrées

In [0]:
images_Train = spark.read.format("binaryFile") \
  .option("pathGlobFilter", "*.jpg") \
  .option("recursiveFileLookup", "true") \
  .load(PATH_Train)
  
images_Test = spark.read.format("binaryFile") \
  .option("pathGlobFilter", "*.jpg") \
  .option("recursiveFileLookup", "true") \
  .load(PATH_Test)

In [0]:
images_Train = images_Train.withColumn('label', element_at(split(images_Train['path'], '/'),-2))
print(images_Train.select('path','label').show(10,False))

images_Test = images_Test.withColumn('label', element_at(split(images_Test['path'], '/'),-2))
print(images_Test.select('path','label').show(10,False))

### Préparation du modèle

Je vais utiliser la technique du **transfert learning** pour extraire les features des images.
J'ai choisi d'utiliser le modèle **MobileNetV2** pour sa rapidité d'exécution comparée 
à d'autres modèles comme *VGG16* par exemple.

Pour en savoir plus sur la conception et le fonctionnement de MobileNetV2, 
je vous invite à lire [cet article](https://towardsdatascience.com/review-mobilenetv2-light-weight-model-image-classification-8febb490e61c).

<u>Voici le schéma de son architecture globale</u> : 

![Architecture de MobileNetV2](img/mobilenetv2_architecture.png)

Il existe une dernière couche qui sert à classer les images 
selon 1000 catégories que nous ne voulons pas utiliser.
L'idée dans ce projet est de récupérer le **vecteur de caractéristiques 
de dimensions (1,1,1280)** qui servira, plus tard, au travers d'un moteur 
de classification à reconnaitre les différents fruits du jeu de données.

Comme d'autres modèles similaires, **MobileNetV2**, lorsqu'on l'utilise 
en incluant toutes ses couches, attend obligatoirement des images 
de dimension (224,224,3). Nos images étant toutes de dimension (100,100,3), 
nous devrons simplement les **redimensionner** avant de les confier au modèle.

<u>Dans l'odre</u> :
 1. Nous chargeons le modèle **MobileNetV2** avec les poids **précalculés** 
    issus d'**imagenet** et en spécifiant le format de nos images en entrée
 2. Nous créons un nouveau modèle avec:
  - <u>en entrée</u> : l'entrée du modèle MobileNetV2
  - <u>en sortie</u> : l'avant dernière couche du modèle MobileNetV2

In [0]:
model = MobileNetV2(weights='imagenet',
                    include_top=True,
                    input_shape=(224, 224, 3))
new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)
new_model.summary()


Tous les workeurs doivent pouvoir accéder au modèle ainsi qu'à ses poids. 
Une bonne pratique consiste à charger le modèle sur le driver puis à diffuser 
ensuite les poids aux différents workeurs.

In [0]:
brodcast_weights = sc.broadcast(new_model.get_weights())

In [0]:
def model_fn():
    """
    Returns a MobileNetV2 model with top layer removed 
    and broadcasted pretrained weights.
    """
    model = MobileNetV2(weights='imagenet',
                        include_top=True,
                        input_shape=(224, 224, 3))
    for layer in model.layers:
        layer.trainable = False
    new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)
    new_model.set_weights(brodcast_weights.value)
    return new_model

In [0]:
#Preprocessing: resize the images to 224*224
def preprocess(content):
    """
    Preprocesses raw image bytes for prediction.
    """
    img = Image.open(io.BytesIO(content)).resize([224, 224])
    arr = img_to_array(img)
    return preprocess_input(arr)

def featurize_series(model, content_series):
    """
    Featurize a pd.Series of raw images using the input model.
    :return: a pd.Series of image features
    """
    input = np.stack(content_series.map(preprocess))
    preds = model.predict(input)
    # For some layers, output features will be multi-dimensional tensors.
    # We flatten the feature tensors to vectors for easier storage in Spark DataFrames.
    output = [p.flatten() for p in preds]
    return pd.Series(output)

@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    '''
    This method is a Scalar Iterator pandas UDF wrapping our featurization function.
    The decorator specifies that this returns a Spark DataFrame column of type ArrayType(FloatType).

    :param content_series_iter: This argument is an iterator over batches of data, where each batch
                              is a pandas Series of image data.
    '''
    # With Scalar Iterator pandas UDFs, we can load the model once and then re-use it
    # for multiple data batches.  This amortizes the overhead of loading big models.
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)

### Extraction de features

Les Pandas UDF, sur de grands enregistrements (par exemple, de très grandes images), 
peuvent rencontrer des erreurs de type Out Of Memory (OOM).
Si vous rencontrez de telles erreurs dans la cellule ci-dessous, 
essayez de réduire la taille du lot Arrow via 'maxRecordsPerBatch'

Je n'utiliserai pas cette commande dans ce projet 
et je laisse donc la commande en commentaire.

In [0]:
# spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1024")

Nous pouvons maintenant exécuter la featurisation sur l'ensemble de notre DataFrame Spark.
<u>REMARQUE</u> : Cela peut prendre beaucoup de temps, tout dépend du volume de données à traiter. 

Notre jeu de données de **Test** contient **22819 images**. 
Cependant, dans l'exécution en mode **local**, 
nous <u>traiterons un ensemble réduit de **330 images**</u>.

In [0]:
features_df_Train = images_Train.repartition(20).select(col("path"),
                                            col("label"),
                                            featurize_udf("content").alias("features")
                                           )
features_df_Test = images_Test.repartition(20).select(col("path"),
                                            col("label"),
                                            featurize_udf("content").alias("features")
                                           )

<u>Rappel du PATH où seront inscrits les fichiers au format "**parquet**" 
contenant nos résultats, à savoir, un DataFrame contenant 3 colonnes</u> :
 1. Path des images
 2. Label de l'image
 3. Vecteur de caractéristiques de l'image

In [0]:
print(PATH_Train_Results)
print(PATH_Test_Results)

<u>Enregistrement des données traitées au format "**parquet**"</u> :

In [0]:
features_df_Train.write.mode("overwrite").parquet(PATH_Train_Results)
features_df_Test.write.mode("overwrite").parquet(PATH_Test_Results)

## Chargement des données enregistrées et validation du résultat

<u>On charge les données fraichement enregistrées dans un **DataFrame Pandas**</u> :

In [0]:
df_Train = spark.read.parquet(PATH_Train_Results)
df_Train.show()
df_Train = df_Train.toPandas()
df_Train

In [0]:
df_Test = spark.read.parquet(PATH_Test_Results)
df_Test.show()
df_Test = df_Test.toPandas()
df_Test

In [0]:
print("Train: ",df_Train.loc[0,'features'].shape)
print("Test: ",df_Test.loc[0,'features'].shape)


In [0]:
print("Train features vector: \n")
train_images=np.array(df_Train['features'].values.tolist())
print(train_images)
print("\n")
print("Test features vector: \n")
test_images=np.array(df_Test['features'].values.tolist())
print(test_images)

Nous venons de valider le processus sur un jeu de données allégé en local 
où nous avons simulé un cluster de machines en répartissant la charge de travail 
sur différents cœurs de processeur au sein d'une même machine.

Nous allons maintenant généraliser le processus en déployant notre solution 
sur un réel cluster de machines et nous travaillerons désormais sur la totalité 
des 22819 images de notre dossier "Test".

In [0]:
train_labels=df_Train['label']
test_labels=df_Test['label']

#Encode labels from text to integers.
from sklearn import preprocessing
le = preprocessing.LabelEncoder()

le.fit(train_labels)
train_labels_encoded = le.transform(train_labels)
le.fit(test_labels)
test_labels_encoded = le.transform(test_labels)

In [0]:

#Split data into test and train datasets (already split but assigning to meaningful convention)
X_train, y_train, X_test, y_test = train_images, train_labels_encoded, test_images, test_labels_encoded


In [0]:
X_train

In [0]:
import mlflow
# Enable MLflow autologging for this notebook
mlflow.autolog()

In [0]:
with mlflow.start_run(run_name='gradient_boost') as run:
    
# Train the XGBoost model on the training data 
    import xgboost as xgb
    models = xgb.XGBClassifier(random_state=42)
    models.fit(X_train, y_train)

    #Now predict the class using the test dataset by means of the trained XGBoost model. 
    prediction = models.predict(X_test)

    #Transform the integer label back to its original name by means of the label encoder inverse function
    prediction_decoded = le.inverse_transform(prediction)

    #Print overall accuracy
    from sklearn import metrics
    print ("Accuracy = ", metrics.accuracy_score(test_labels, prediction_decoded))



In [0]:
#Confusion Matrix - verify accuracy of each class
from sklearn.metrics import confusion_matrix

cm = confusion_matrix(test_labels, prediction_decoded)
sns.heatmap(cm, fmt='g', annot=True)

In [0]:
import sklearn.datasets
import sklearn.metrics
import sklearn.model_selection
import sklearn.ensemble
from hyperopt import fmin, tpe, hp, SparkTrials, Trials, STATUS_OK
from hyperopt.pyll import scope

# Define the search space to explore
search_space = {
  'n_estimators': scope.int(hp.quniform('n_estimators', 20, 1000, 1)),
  'learning_rate': hp.loguniform('learning_rate', -3, 0),
  'max_depth': scope.int(hp.quniform('max_depth', 2, 5, 1)),
}
 
def train_model(params):
  # Enable autologging on each worker
  mlflow.autolog()
  with mlflow.start_run(nested=True):
    model_hp = xgb.XGBClassifier(
      random_state=0,
      **params
    )
    model_hp.fit(X_train, y_train)
    prediction = model_hp.predict(X_test)
    prediction_decoded = le.inverse_transform(prediction)
    # Tune based on the test AUC
    # In production settings, you could use a separate validation set instead
    accuracy = sklearn.metrics.accuracy_score(test_labels, prediction_decoded)
    mlflow.log_metric('test_accuracy', accuracy)
    
    # Set the loss to -1*auc_score so fmin maximizes the auc_score
    return {'status': STATUS_OK, 'loss': -1*accuracy}
 
# SparkTrials distributes the tuning using Spark workers
# Greater parallelism speeds processing, but each hyperparameter trial has less information from other trials
# On smaller clusters or Databricks Community Edition try setting parallelism=2
spark_trials = SparkTrials(
  parallelism=8
)
 
with mlflow.start_run(run_name='gb_hyperopt') as run:
  # Use hyperopt to find the parameters yielding the highest AUC
  best_params = fmin(
    fn=train_model, 
    space=search_space, 
    algo=tpe.suggest, 
    max_evals=8,
    trials=spark_trials)

In [0]:
# Sort runs by their test auc; in case of ties, use the most recent run
best_run = mlflow.search_runs(
  order_by=['metrics.test_accuracy DESC', 'start_time DESC'],
  max_results=10,
).iloc[0]
print('Best Run')
print('Accuracy: {}'.format(best_run["metrics.test_accuracy"]))
#print('Num Estimators: {}'.format(best_run["params.n_estimators"]))
print('Max Depth: {}'.format(best_run["params.max_depth"]))
print('Learning Rate: {}'.format(best_run["params.learning_rate"]))
 
best_model_pyfunc = mlflow.pyfunc.load_model(
  'runs:/{run_id}/model'.format(
    run_id=best_run.run_id
  )
)


In [0]:
with mlflow.start_run(run_name='gradient_boost') as run:
    
# Train the XGBoost model on the training data 
    import xgboost as xgb
    models = xgb.XGBClassifier(random_state=42, max_depth=4, learning_rate=0.22334119485819626)
    models.fit(X_train, y_train)

    #Now predict the class using the test dataset by means of the trained XGBoost model. 
    prediction = models.predict(X_test)

    #Transform the integer label back to its original name by means of the label encoder inverse function
    prediction_decoded = le.inverse_transform(prediction)

    #Print overall accuracy
    from sklearn import metrics
    print ("Accuracy = ", metrics.accuracy_score(test_labels, prediction_decoded))