# PROJET 8 : "Déployer un modèle dans le cloud"

Les objectifs de ce notebook Databricks sont :
  - connecter Databricks avec une instance de stockage AWS S3
  - charger les images stockées sur AWS S3
  - effectuer un préprocessing de ces images
  - effectuer une réduction de dimension
  - écrire le résultat de ce processing dans un fichier accessible sur AWS S3

In [0]:
!pip install tensorflow

In [0]:
# pyspark functions
from pyspark.sql.functions import *
# URL processing
import urllib

# Chargement des librairies
import datetime
import io
import sys
import time
from datetime import datetime
import pandas as pd
import numpy as np

# Visualisation
import matplotlib
import matplotlib.pyplot as plt
import seaborn as sns

# Pyspark
import pyspark
from pyspark.sql.functions import element_at, split, col, pandas_udf, PandasUDFType, udf
from pyspark.sql.types import StringType
from pyspark.sql import SparkSession


# Gestion des images
import PIL
from PIL import Image, ImageOps

# Taches ML
from pyspark.ml.image import ImageSchema
import tensorflow as tf
from tensorflow.keras.applications.resnet50 import ResNet50, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from pyspark.sql.functions import col, pandas_udf, PandasUDFType

from pyspark.sql.functions import split, element_at

# Réduction de dimension - PCA
from pyspark.ml.feature import PCA
from pyspark.ml.feature import StandardScaler

# Conversion des données en vecteurs
from pyspark.ml.linalg import Vectors, VectorUDT, DenseVector
from pyspark.sql.functions import udf

## 1. Connecter AWS S3 avec Databricks

##### 1.1. Chargement du fichier "credentials" que nous avons téléchargé depuis AWS IAM et contenant ID de connection et le code secret pour se connecter au bucket AWS S3

In [0]:
# Check the contents in tables folder
dbutils.fs.ls("/FileStore/tables")

Out[5]: [FileInfo(path='dbfs:/FileStore/tables/credentials.csv', name='credentials.csv', size=200),
 FileInfo(path='dbfs:/FileStore/tables/house_prices.csv', name='house_prices.csv', size=2016),
 FileInfo(path='dbfs:/FileStore/tables/new_user_credentials-1.csv', name='new_user_credentials-1.csv', size=207),
 FileInfo(path='dbfs:/FileStore/tables/new_user_credentials.csv', name='new_user_credentials.csv', size=205)]

In [0]:
# Define file type
file_type = "csv"
# Whether the file has a header
first_row_is_header = "true"
# Delimiter used in the file
delimiter = ","
# Read the CSV file to spark dataframe
aws_keys_df = spark.read.format(file_type)\
.option("header", first_row_is_header)\
.option("sep", delimiter)\
.load("/FileStore/tables/credentials.csv")
#display(aws_keys_df)

##### 1.2. Extraction de l'ID de connexion et le code secret du dataframe spark

In [0]:
dbutils.fs.unmount("/mnt/dbp8v2")

/mnt/dbp8v2 has been unmounted.
Out[7]: True

In [0]:
# Get the AWS access key and secret key from the spark dataframe
ACCESS_KEY = aws_keys_df.where(col('User name')=='databricks_P8').select('Access key ID').collect()[0]['Access key ID']
SECRET_KEY = aws_keys_df.where(col('User name')=='databricks_P8').select('Secret access key').collect()[0]['Secret access key']
# Encode the secrete key
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

##### 1.3. Etablissement de la connexion entre Databricks et le bucket AWS S3

In [0]:
# AWS S3 bucket name
AWS_S3_BUCKET = "dbp8v2"
# Mount name for the bucket
MOUNT_NAME = "/mnt/dbp8v2"
# Source url
SOURCE_URL = "s3n://{0}:{1}@{2}".format(ACCESS_KEY, ENCODED_SECRET_KEY, AWS_S3_BUCKET)
# Mount the drive
dbutils.fs.mount(SOURCE_URL, MOUNT_NAME)

Out[9]: True

In [0]:
# Check if the AWS S3 bucket was mounted successfully
display(dbutils.fs.ls("/mnt/dbp8v2/"))

path,name,size
dbfs:/mnt/dbp8v2/output_features_parquet/,output_features_parquet/,0
dbfs:/mnt/dbp8v2/training_sample/,training_sample/,0


## 2. Chargement et lecture des données stockées sur le bucket AWS S3

In [0]:
# Chemin de stockage des images du jeu de données
path_train_set = "/mnt/dbp8v2/training_sample/*/*"

In [0]:
# Chargement des images du train set au format "binaryFile"
df_binary_train = spark.read.format("binaryFile") \
  .option("pathGlobFilter", "*.jpg") \
  .option("recursiveFileLookup", "true") \
  .load(path_train_set)

In [0]:
# Nombre d'images? (pour ce notebook j'ai pris un sample des données)
df_binary_train.count()

Out[13]: 1263

In [0]:
# Affichage des premières lignes du  dataframe contenant les images
df_binary_train.show(5)

+--------------------+-------------------+------+--------------------+
|                path|   modificationTime|length|             content|
+--------------------+-------------------+------+--------------------+
|dbfs:/mnt/dbp8v2/...|2022-01-14 04:40:27|106833|[FF D8 FF E0 00 1...|
|dbfs:/mnt/dbp8v2/...|2022-01-14 04:40:23|104151|[FF D8 FF E0 00 1...|
|dbfs:/mnt/dbp8v2/...|2022-01-14 04:40:29|104055|[FF D8 FF E0 00 1...|
|dbfs:/mnt/dbp8v2/...|2022-01-14 04:40:24|103763|[FF D8 FF E0 00 1...|
|dbfs:/mnt/dbp8v2/...|2022-01-14 04:40:21|102207|[FF D8 FF E0 00 1...|
+--------------------+-------------------+------+--------------------+
only showing top 5 rows



## 3. Processing des données

##### 3.1. Extraction des features

* Plusieurs méthodes existent pour extraires les features les plus importantes des images (parmis lesquelles les méthodes ORB, SIFT, SURF, modèle de transfert learning)
* Nous allons utiliser sur algorithme Resnet50 pré-entrainé sur ImageNet afin d'extraire les features de nos images
* Pour cela nous supprimerons la dernière couche (softmax) de notre algorithme (à l'aide du paramètre : include_top=False)

In [0]:
# Préparation du dataframe
df_images = df_binary_train.select("path", "content")
df_images.show(5)

+--------------------+--------------------+
|                path|             content|
+--------------------+--------------------+
|dbfs:/mnt/dbp8v2/...|[FF D8 FF E0 00 1...|
|dbfs:/mnt/dbp8v2/...|[FF D8 FF E0 00 1...|
|dbfs:/mnt/dbp8v2/...|[FF D8 FF E0 00 1...|
|dbfs:/mnt/dbp8v2/...|[FF D8 FF E0 00 1...|
|dbfs:/mnt/dbp8v2/...|[FF D8 FF E0 00 1...|
+--------------------+--------------------+
only showing top 5 rows



###### 3.2.1. Préparation du modèle ResNet50

* Utilisation d'un ResNet50 pré-entrainé
* On retire la couche fully-connected (include_top=False)

In [0]:
# Instanciation du modèle
model = ResNet50(include_top=False)

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/resnet/resnet50_weights_tf_dim_ordering_tf_kernels_notop.h5


In [0]:
# Permettre aux workers Spark d'accéder aux poids utilisés par le modèle
bc_model_weights = sc.broadcast(model.get_weights())

# On utilise le broadcast de SparkContext pour partager dans le cluster des valeurs  (ICI les poids du modéle) en lecture-seule
# Cela permet de réduire les coûts de communication 

def model_fn():
    """
    Retourne un modèle ResNet50 avec la couche supérieure enlevée (fully-connected) et 
    des pondérations en broadcast déjà pré-entraînés. 
    On retire la couche supérieure car c'est celle qui permet de faire une classification, or 
    ici on se sert du modéle pour extraire des features.
    
    Pooling = "avg" est utilisé pour réduire le nombre de caractéristiques à 2048
    """

    model = ResNet50(weights=None, include_top=False, pooling='avg')
    #on ajoute les pondérations
    model.set_weights(bc_model_weights.value)
    
    return model

In [0]:
def preprocess(content):
    """
    Fonction de pre-processing d'images.
    """
    # Redimensionnement de l'image : Resnet 50 ne prend en charge que des images de taille 224x224
    img = Image.open(io.BytesIO(content)).resize([224, 224])   
    
    # Changer le type d'image en matrice  
    img2 = np.asarray(img) 
    
     # Préparation au pre-process de Keras
    arr = img_to_array(img2)
    return preprocess_input(arr) 
  
  
def featurize_series(model, content_series):
    """
    Retourne un pd.Series des features de l'image
    """
    
    input = np.stack(content_series.map(preprocess))
  # Extraction des features des images
    preds = model.predict(input)
  # Pour certaines couches, les caractéristiques de sortie sont des tensors multidimensionnels
  # On aplatit les caractéristiques de tensors en vecteurs pour faciliter le stockage dans les dataframes Spark
  # la fonction flatten() envoie une copie du tableau réduit à une seule dimension 
    output = [p.flatten() for p in preds]
    return pd.Series(output)  


In [0]:
@pandas_udf('array<double>', PandasUDFType.SCALAR_ITER)

def featurize_udf(content_series_iter):
    '''
    Cette méthode est un Itérateur Scalaire (pandas UDF signifiant User-Defined Functions) qui complète
    la fonction de featurisation.
    Cela renvoie une colonne Spark DataFrame de type ArrayType(FloatType).
  
    param content_series_iter : Cet argument est un itérateur sur des lots de données, où chaque lot
    est une série de données d'images pandas.
    '''
    
  # Avec les pandas UDF Scalar Iterator, on peut charger le modèle une fois et le réutiliser ensuite
  # pour plusieurs lots de données. Cela permet d'amortir les frais liés au chargement de grands modèles.
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)



In [0]:
# On lance la recherche de features sur notre dataframe Spark 

df_features = df_images.repartition(4).select(col("path"), featurize_udf("content").alias("features"))

##### 3.4. Labélisation

* Il est mportant de créer une colonne label dans notre dataframe car ces données serviront d'entrée à des algorithmes de classication.
* Nous pouvons constater que la classe de l'image est définie dans le nom de son répertoire.
* Nous allons donc extraire le nom de classe depuis chaque répertoire.

In [0]:
# Ajout dans la colonne label pour chaque image traitée de l'avant dernier élément du nom du répertoire de stockage de l'image==>df_binary_train["path"]
# On ajoute une colonne 'label' au dataframe des features

#on récupére le type de fruit à partir du chemin de l'image.
df_features_label = df_features.withColumn('label', element_at(split(df_features['path'],"/"),-2))

In [0]:
# Visualisation des 5 premières images avec la classe
df_features_label.show(5)

+--------------------+--------------------+---------------+
|                path|            features|          label|
+--------------------+--------------------+---------------+
|dbfs:/mnt/dbp8v2/...|[0.73054784536361...|cabbage_white_1|
|dbfs:/mnt/dbp8v2/...|[1.64908504486083...|cabbage_white_1|
|dbfs:/mnt/dbp8v2/...|[1.95849287509918...|cabbage_white_1|
|dbfs:/mnt/dbp8v2/...|[0.54736912250518...|cabbage_white_1|
|dbfs:/mnt/dbp8v2/...|[0.58331966400146...|cabbage_white_1|
+--------------------+--------------------+---------------+
only showing top 5 rows



## 4. Réduction de dimension avec une Analyse en composantes principales (PCA)

##### 4.1. Préparation des données (conversion des features en vecteurs)

In [0]:
#Construction d'une User defined Fonction qui transforme les listes de features en vecteurs denses.
list_to_vector_udf = udf(lambda l: Vectors.dense(l), VectorUDT())

# On applique cette UDF à la colonne "features" de notre dataframe 
df_with_vectors = df_features_label.select( df_features_label["path"],
                                            df_features_label["label"],
                                           list_to_vector_udf( df_features_label["features"]).alias("features"))

##### 4.2. Détermination du nombre optimal de composantes principales

In [0]:
# Recherche du nombre optimal de dimensions pour la réduction dimensionnelle des features

num_components = 2048
pca_opt = PCA(k = num_components,
         inputCol="features", 
         outputCol="features_pca")

model = pca_opt.fit(df_with_vectors)
scree = model.explainedVariance# Pré-processing (vecteur dense, standardisation)

In [0]:
# Scree plot pour déterminer le nombre optimal de composantes principales

import matplotlib.pyplot as plt
import numpy as np
plt.figure(figsize=(11,6))
plt.bar(np.arange(len(scree))+1, scree)
plt.plot(np.arange(len(scree))+1, scree.cumsum(),c="red",marker='o')
plt.xlabel("Nombre de composantes principales")
plt.ylabel("Variance expliquée en pourcentage")
plt.title("Scree plot")
plt.grid()
plt.show(block=False)

##### 4.2. Recherche du nombre composantes expliquant 95 % de la variance

In [0]:
# On chercher à  trouver le nombre de composantes principales à conserver.
# On se fixe ici de converser au moins 95 % de l'inertie .
nbr_pca=0
for i in range(50):
    a = scree.cumsum()[i]
    if a >= 0.95:
        print("{} composantes principales expliquent au moins 95% de la variance totale".format(i))
        print("Valeur exacte de variance expliquée:{}%".format(a*100))
        nbr_pca=i
        break

##### 4.3. Réduction de dimension suivant le nombre de dimensions choisies (k)

In [0]:
# On pratique la PCA  sur la colonne "Feature" de notre dataframe et on recuperera le resultat dans une nouvelle colonne "Feature_pca"
pca = PCA(k=40,inputCol="features",outputCol="features_pca")

# Entraînement du PCA
model = pca.fit(df_with_vectors)

In [0]:
# Dataframe post-PCA

df_pca = model.transform(df_with_vectors)
df_pca.show(5)

+--------------------+---------------+--------------------+--------------------+
|                path|          label|            features|        features_pca|
+--------------------+---------------+--------------------+--------------------+
|dbfs:/mnt/dbp8v2/...|cabbage_white_1|[0.73054784536361...|[2.58817404745408...|
|dbfs:/mnt/dbp8v2/...|cabbage_white_1|[1.64908504486083...|[3.21223116140212...|
|dbfs:/mnt/dbp8v2/...|cabbage_white_1|[1.95849287509918...|[1.89440665733666...|
|dbfs:/mnt/dbp8v2/...|cabbage_white_1|[0.54736912250518...|[2.68806227425664...|
|dbfs:/mnt/dbp8v2/...|cabbage_white_1|[0.58331966400146...|[2.28526925334920...|
+--------------------+---------------+--------------------+--------------------+
only showing top 5 rows



In [0]:
# Dataframe final avec chemin vers images, labels et features obtenus par PCA (on enlève la colonne features)

df_final = df_pca[['path','label','features_pca']]
df_final.show(5)

+--------------------+---------------+--------------------+
|                path|          label|        features_pca|
+--------------------+---------------+--------------------+
|dbfs:/mnt/dbp8v2/...|cabbage_white_1|[2.58817404745408...|
|dbfs:/mnt/dbp8v2/...|cabbage_white_1|[3.21223116140212...|
|dbfs:/mnt/dbp8v2/...|cabbage_white_1|[1.89440665733666...|
|dbfs:/mnt/dbp8v2/...|cabbage_white_1|[2.68806227425664...|
|dbfs:/mnt/dbp8v2/...|cabbage_white_1|[2.28526925334920...|
+--------------------+---------------+--------------------+
only showing top 5 rows



## 5. Sauvegarde des données sur le bucket AWS S3

On sauvegarde nos données finales directement sur notre bucket Amazon S3

In [0]:
# Sauvegarde des données
#df_final.write.mode("overwrite").parquet("/mnt/dbp8v2/output_features_parquet/df_final")

In [0]:
# Vérification : Est-ce-que le fichier a bien été enregistré dans le repertoire "output_features_parquet" de notre bucket S3 ?
#display(dbutils.fs.ls("/mnt/dbp8v2/output_features_parquet/df_final"))

## Conclusion

Nous avons bien réalisé les objectifs fixés :

* connecter Databricks avec une instance de stockage AWS S3 ("/mnt/dbp8v2")
* charger les images stockées sur un bucket AWS S3 (https://dbp8v2.s3.us-west-1.amazonaws.com/training_sample/)
* effectuer un préprocessing de ces images (extraction des features grâce à ResNet50, resizing des images (224x224), labelisation des images )
* effectuer une réduction de dimension (transformation des features en vecteur, détermination du nombre optimal de composantes principales, puis PCA)
* écrire le résultat de ce processing dans un fichier accessible sur AWS S3  (https://dbp8v2.s3.us-west-1.amazonaws.com/output_features_parquet/)