# Déploiement de la solution sur le cloud

Maintenant que nous avons vérifié que notre solution fonctionne, <br />
il est temps de la <u>déployer à plus grande échelle sur un vrai cluster de machines</u>.

## Exécution du code

Je décide d'exécuter cette partie du code depuis **JupyterHub hébergé sur notre cluster EMR**.<br />

<u>Avant de commencer</u>, il faut s'assurer d'utiliser le **kernel pyspark**.

**En utilisant ce kernel, une session spark est créé à l'exécution de la première cellule**. <br />
Il n'est donc **plus nécessaire d'exécuter le code "spark = (SparkSession ..."** comme lors <br />
de l'exécution de notre notebook en local sur notre VM Ubuntu.

### 1. Démarrage de la session Spark

L’application Spark est contrôlée grâce à un processus de pilotage (driver process) appelé **SparkSession**. <br />
<u>Une instance de **SparkSession** est la façon dont Spark exécute les fonctions définies par l’utilisateur <br />
dans l’ensemble du cluster</u>. <u>Une SparkSession correspond toujours à une application Spark</u>.

In [1]:
# L'exécution de cette cellule démarre l'application Spark

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
1,application_1708632367491_0002,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<u>Affichage des informations sur la session en cours et liens vers Spark UI</u> :

In [2]:
%%info

ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
1,application_1708632367491_0002,pyspark,idle,Link,Link,,✔


### 2. Installation des packages

Les packages nécessaires ont été installé via l'étape de **bootstrap** à l'instanciation du serveur.

### 3. Import des librairies

In [3]:
import pandas as pd

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
#import pandas as pd
import numpy as np
import io
import os
from PIL import Image

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
import tensorflow as tf
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras import Model

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split

from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import PandasUDFType, col, element_at, pandas_udf, split, udf

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### 4. Définition des PATH pour charger les images et enregistrer les résultats

Nous accédons directement à nos **données sur S3** comme si elles étaient **stockées localement**.

In [13]:
PATH = 's3://p8-data-juliesbo'
PATH_Data = PATH+'/Test'
PATH_Result = PATH+'/ResultsPCA'
print('PATH:        '+\
      PATH+'\nPATH_Data:   '+\
      PATH_Data+'\nPATH_Result: '+PATH_Result)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

PATH:        s3://p8-data-juliesbo
PATH_Data:   s3://p8-data-juliesbo/Test
PATH_Result: s3://p8-data-juliesbo/ResultsPCA

### 5. Traitement des données

#### 5.1 Chargement des données

In [29]:
images = spark.read.format("binaryFile") \
  .option("pathGlobFilter", "*.jpg") \
  .option("recursiveFileLookup", "true") \
  .load(PATH_Data)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [30]:
images.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+-------------------+------+--------------------+
|                path|   modificationTime|length|             content|
+--------------------+-------------------+------+--------------------+
|s3://p8-data-juli...|2024-02-19 14:34:10|  7353|[FF D8 FF E0 00 1...|
|s3://p8-data-juli...|2024-02-19 14:34:10|  7350|[FF D8 FF E0 00 1...|
|s3://p8-data-juli...|2024-02-19 14:34:10|  7349|[FF D8 FF E0 00 1...|
|s3://p8-data-juli...|2024-02-19 14:34:10|  7348|[FF D8 FF E0 00 1...|
|s3://p8-data-juli...|2024-02-19 14:34:10|  7328|[FF D8 FF E0 00 1...|
+--------------------+-------------------+------+--------------------+
only showing top 5 rows

<u>Affichage des 5 premières images contenant</u> :
 - le path de l'image
 - la date et heure de sa dernière modification
 - sa longueur
 - son contenu encodé en valeur hexadécimal

<u>J'ajoute une colonne contenant les **labels** de chaque image</u> :

In [31]:
images = images.withColumn('label', element_at(split(images['path'], '/'),-2))
print(images.printSchema())
print(images.select('path','label').show(5,False))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)

None
+---------------------------------------------------+----------+
|path                                               |label     |
+---------------------------------------------------+----------+
|s3://p8-data-juliesbo/Test/Watermelon/r_106_100.jpg|Watermelon|
|s3://p8-data-juliesbo/Test/Watermelon/r_109_100.jpg|Watermelon|
|s3://p8-data-juliesbo/Test/Watermelon/r_108_100.jpg|Watermelon|
|s3://p8-data-juliesbo/Test/Watermelon/r_107_100.jpg|Watermelon|
|s3://p8-data-juliesbo/Test/Watermelon/r_95_100.jpg |Watermelon|
+---------------------------------------------------+----------+
only showing top 5 rows

None

<u>Je supprime les colonnes contenant le **path**, **date** et la **longueur** de chaque image</u> :

In [32]:
images = images.drop('modificationTime', 'length','path')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [33]:
images.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+----------+
|             content|     label|
+--------------------+----------+
|[FF D8 FF E0 00 1...|Watermelon|
|[FF D8 FF E0 00 1...|Watermelon|
|[FF D8 FF E0 00 1...|Watermelon|
|[FF D8 FF E0 00 1...|Watermelon|
|[FF D8 FF E0 00 1...|Watermelon|
+--------------------+----------+
only showing top 5 rows

#### 5.2 Préparation du modèle

Je vais utiliser la technique du **transfert learning** pour extraire les features des images.<br />
J'ai choisi d'utiliser le modèle **MobileNetV2** pour sa rapidité d'exécution comparée <br />
à d'autres modèles comme *VGG16* par exemple.


Il existe une dernière couche qui sert à classer les images <br />
selon 1000 catégories que nous ne voulons pas utiliser.<br />
L'idée dans ce projet est de récupérer le **vecteur de caractéristiques <br />
de dimensions (1,1,1280)** qui servira, plus tard, au travers d'un moteur <br />
de classification à reconnaitre les différents fruits du jeu de données.

Comme d'autres modèles similaires, **MobileNetV2**, lorsqu'on l'utilise <br />
en incluant toutes ses couches, attend obligatoirement des images <br />
de dimension (224,224,3). Nos images étant toutes de dimension (100,100,3), <br />
nous devrons simplement les **redimensionner** avant de les confier au modèle.

<u>Dans l'odre</u> :
 1. Nous chargeons le modèle **MobileNetV2** avec les poids **précalculés** <br />
    issus d'**imagenet** et en spécifiant le format de nos images en entrée
 2. Nous créons un nouveau modèle avec:
  - <u>en entrée</u> : l'entrée du modèle MobileNetV2
  - <u>en sortie</u> : l'avant dernière couche du modèle MobileNetV2

In [17]:
model = MobileNetV2(weights='imagenet',
                    include_top=True,
                    input_shape=(224, 224, 3))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/mobilenet_v2/mobilenet_v2_weights_tf_dim_ordering_tf_kernels_1.0_224.h5

In [18]:
new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Affichage du résumé de notre nouveau modèle où nous constatons <br />
que <u>nous récupérons bien en sortie un vecteur de dimension (1, 1, 1280)</u> :

In [20]:
new_model.summary()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, 224, 224, 3  0           []                               
                                )]                                                                
                                                                                                  
 Conv1 (Conv2D)                 (None, 112, 112, 32  864         ['input_1[0][0]']                
                                )                                                                 
                                                                                                  
 bn_Conv1 (BatchNormalization)  (None, 112, 112, 32  128         ['Conv1[0][0]']                  
                                )                                                             

Tous les workeurs doivent pouvoir accéder au modèle ainsi qu'à ses poids. <br />
Une bonne pratique consiste à charger le modèle sur le driver puis à diffuser <br />
ensuite les poids aux différents workeurs.

<u>Mettons cela sous forme de fonction</u> :

In [19]:
brodcast_weights = sc.broadcast(new_model.get_weights())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [21]:
def model_fn():
    """
    Returns a MobileNetV2 model with top layer removed 
    and broadcasted pretrained weights.
    """
    model = MobileNetV2(weights='imagenet',
                        include_top=True,
                        input_shape=(224, 224, 3))
    for layer in model.layers:
        layer.trainable = False
    new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)
    new_model.set_weights(brodcast_weights.value)
    return new_model

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### 5.3 Définition du processus de chargement des images et application de leur featurisation à travers l'utilisation de pandas UDF

Ce notebook définit la logique par étapes, jusqu'à Pandas UDF.

<u>L'empilement des appels est la suivante</u> :

- Pandas UDF
  - featuriser une série d'images pd.Series
   - prétraiter une image

In [36]:
def preprocess(content):
    """
    Preprocesses raw image bytes for prediction.
    """
    # Resizes the image to a fixed size of 224x224 pixels.
    img = Image.open(io.BytesIO(content)).resize([224, 224])
    # Converts the image to a NumPy array using img_to_array.
    arr = img_to_array(img)
    return preprocess_input(arr)

def featurize_series(model, content_series):
    """
    Featurize a pd.Series of raw images using the input model.
    :return: a pd.Series of image features
    """
    # Calls the preprocess function for each image in the Series.
    # Stacks the preprocessed images into a single NumPy array.
    input = np.stack(content_series.map(preprocess))
    #Feeds the array into the provided model for prediction
    preds = model.predict(input)
    # For some layers, output features will be multi-dimensional tensors.
    # We flatten the feature tensors to vectors for easier storage in Spark DataFrames.
    output = [p.flatten() for p in preds]
    return pd.Series(output)

@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    '''
    This method is a Scalar Iterator pandas UDF wrapping our featurization function.
    The decorator specifies that this returns a Spark DataFrame column of type ArrayType(FloatType).

    :param content_series_iter: This argument is an iterator over batches of data, where each batch
                              is a pandas Series of image data.
    '''
    # With Scalar Iterator pandas UDFs, we can load the model once and then re-use it
    # for multiple data batches. This amortizes the overhead of loading big models.
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)

# User Defined Function
# Takes an array as input and converts it into a Dense Vector using Vectors.dense from the Spark MLlib library.
to_vector = udf(lambda a: Vectors.dense(a), VectorUDT())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### 5.4 Exécutions des actions d'extractions de features

Les Pandas UDF, sur de grands enregistrements (par exemple, de très grandes images), <br />
peuvent rencontrer des erreurs de type Out Of Memory (OOM).<br />
Si vous rencontrez de telles erreurs dans la cellule ci-dessous, <br />
essayez de réduire la taille du lot Arrow via 'maxRecordsPerBatch'

Je n'utiliserai pas cette commande dans ce projet <br />
et je laisse donc la commande en commentaire.

In [19]:
# spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1024")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Nous pouvons maintenant exécuter la featurisation sur l'ensemble de notre DataFrame Spark.<br />

Notre jeu de données de **Test** contient **22819 images**. <br />

<u>Je crée la colonne **vector** à l aide des fonctions ci-dessus et je supprime les variables **features** et **content**</u> :
    

In [37]:
features_df = images.withColumn("features",featurize_udf("content"))
features_df = features_df.withColumn('vectors',to_vector("features"))
features_df = features_df.drop('features','content')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [38]:
features_df.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+--------------------+
|     label|             vectors|
+----------+--------------------+
|Watermelon|[0.0,0.2339717894...|
|Watermelon|[0.08841577917337...|
|Watermelon|[0.0,0.4033161997...|
|Watermelon|[0.00337623083032...|
|Watermelon|[0.00501129589974...|
|Watermelon|[0.41546595096588...|
|Watermelon|[0.11047153174877...|
|Watermelon|[0.15326495468616...|
|Watermelon|[0.04279938340187...|
|Watermelon|[0.12482067942619...|
|Watermelon|[0.18327464163303...|
|Watermelon|[0.02352689206600...|
|Watermelon|[0.02950812317430...|
|Watermelon|[0.0,0.9346365928...|
|Watermelon|[0.18860186636447...|
|Watermelon|[1.14287447929382...|
|Watermelon|[0.60736495256423...|
|Watermelon|[1.24986505508422...|
|Watermelon|[0.00234450120478...|
|Watermelon|[0.01771630160510...|
+----------+--------------------+
only showing top 20 rows

#### 5.5 Réalisation d'une reduction de dimension PCA

L'analyse en composantes principales (PCA) est une technique de réduction de dimensionnalité couramment utilisée en apprentissage automatique, notamment pour la classification d'images. La PCA permet de représenter les données dans un espace de dimension inférieure tout en préservant autant que possible la variance des données d'origine. 

In [40]:
data = features_df[['label','vectors']]
pca = PCA(k=6, inputCol="vectors",outputCol="pca")
model = pca.fit(data)
pca_data = model.transform(data)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [41]:
pca_data=pca_data.withColumn('pca',col('pca').cast('string'))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### 6. Ecriture des resultats

In [42]:
pca_data.select(pca_data['label'],pca_data['pca']).write.mode("overwrite").parquet(PATH_Result)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### 7. Chargement des données enregistrées et validation du résultat

<u>On charge les données fraichement enregistrées dans un **DataFrame Pandas**</u> :

In [44]:
df = pd.read_parquet(PATH_Result, engine='pyarrow')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<u>On affiche les 5 premières lignes du DataFrame</u> :

In [45]:
df.head()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

        label                                                pca
0  Watermelon  [-2.4891792994210356,3.8066505945734748,-5.252...
1  Watermelon  [-2.6730127737098064,4.782834441916454,-5.4078...
2  Watermelon  [-2.179505215397753,3.5839903482152082,-5.6857...
3  Watermelon  [-1.8046526522991992,3.3591732109843133,-6.165...
4  Watermelon  [-2.865687257925557,5.4260489946068375,-5.0637...

<u>On valide que la dimension du vecteur de caractéristiques des images est bien de dimension 1280</u> :

In [25]:
df.loc[0,'features'].shape

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

(1280,)

In [26]:
df.shape

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

(22688, 3)

On a donc bien généraliser le processus en déployant notre solution <br />
sur un réel cluster de machines et en travaillant sur la totalité <br />
des 22688 images de notre dossier "Test".
On pourra donc bien anticiper une future augmentation de la charge de travail.

Il nous sera facile de faire face à une monté de la charge de travail <br /> en redimensionnant simplement notre cluster de machines <br />(horizontalement et/ou verticalement au besoin), les coûts augmenteront  <br /> en conséquence mais resteront nettement inférieurs aux coûts <br /> engendrés par l'achat de matériels ou par la location de serveurs dédiés.