# Déployez un modèle dans le cloud

## 1. Démarrage de la session Spark

In [1]:
# Running this cell starts the Spark application

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1706185631653_0001,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
%%info

ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1706185631653_0001,pyspark,idle,Link,Link,✔


## 2. Import des librairies

In [4]:
import pandas as pd
from PIL import Image
import numpy as np
import io
from typing import Iterator
import tensorflow as tf
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras import Model
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split, udf
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.types import ArrayType, FloatType
from pyspark.ml.feature import PCA
import pyarrow.parquet as pq
import s3fs
import boto3
import pyarrow

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## 3. Définition des PATH pour charger les images et enregistrer les résultats


In [5]:
# Root of the project's S3 bucket; the input and output locations are built
# by appending a sub-path to it.
PATH = 's3://camilleb-projet8'
PATH_Data = PATH + '/donneesimages'
PATH_Result = PATH + '/Results'

# Echo the three locations so the run output records where data is read from
# and written to.
print(''.join(['PATH:        ', PATH,
               '\nPATH_Data:   ', PATH_Data,
               '\nPATH_Result: ', PATH_Result]))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

PATH:        s3://camilleb-projet8
PATH_Data:   s3://camilleb-projet8/donneesimages
PATH_Result: s3://camilleb-projet8/Results

In [6]:
# Read every *.jpg found under PATH_Data (sub-directories included) with
# Spark's "binaryFile" source.
binary_reader = spark.read.format("binaryFile")
images = (
    binary_reader
    .option("pathGlobFilter", "*.jpg")
    .option("recursiveFileLookup", "true")
    .load(PATH_Data)
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
# Display the first 5 rows of the images DataFrame
images.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+-------------------+------+--------------------+
|                path|   modificationTime|length|             content|
+--------------------+-------------------+------+--------------------+
|s3://camilleb-pro...|2024-01-23 10:37:48|  6328|[FF D8 FF E0 00 1...|
|s3://camilleb-pro...|2024-01-23 10:37:39|  6322|[FF D8 FF E0 00 1...|
|s3://camilleb-pro...|2024-01-23 10:37:38|  6308|[FF D8 FF E0 00 1...|
|s3://camilleb-pro...|2024-01-23 10:38:36|  6304|[FF D8 FF E0 00 1...|
|s3://camilleb-pro...|2024-01-23 10:37:35|  6300|[FF D8 FF E0 00 1...|
+--------------------+-------------------+------+--------------------+
only showing top 5 rows

In [8]:
# Add a 'label' column taken from the second-to-last segment of the
# '/'-separated path, i.e. the name of the directory holding the file.
images = images.withColumn('label', element_at(split(images['path'], '/'), -2))

# Show path and label side by side without truncating the values.
images.select('path', 'label').show(5, truncate=False)

# printSchema() prints the schema itself and returns None, so it is called
# directly; wrapping it in print() would emit a stray "None" line.
images.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------------------------------------------------+--------------+
|path                                                          |label         |
+--------------------------------------------------------------+--------------+
|s3://camilleb-projet8/donneesimages/Apple Golden 1/114_100.jpg|Apple Golden 1|
|s3://camilleb-projet8/donneesimages/Apple Golden 1/103_100.jpg|Apple Golden 1|
|s3://camilleb-projet8/donneesimages/Apple Golden 1/101_100.jpg|Apple Golden 1|
|s3://camilleb-projet8/donneesimages/Apple Golden 1/96_100.jpg |Apple Golden 1|
|s3://camilleb-projet8/donneesimages/Apple Golden 1/100_100.jpg|Apple Golden 1|
+--------------------------------------------------------------+--------------+
only showing top 5 rows

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)

None

In [9]:
# Check the number of images (779 expected):
images.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

779

## 4. Création du modèle

In [10]:
# Build the full MobileNetV2 network (top layers included) with the
# ImageNet-pretrained weights, for 224x224x3 inputs.
model = MobileNetV2(weights='imagenet',
                    include_top=True,
                    input_shape=(224, 224, 3))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/mobilenet_v2/mobilenet_v2_weights_tf_dim_ordering_tf_kernels_1.0_224.h5

In [11]:
# Derive a model that shares the same input but outputs the second-to-last
# layer's result instead of the final layer's.
new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [12]:
# Print the layer-by-layer summary of the truncated model.
new_model.summary()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Model: "model"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            [(None, 224, 224, 3) 0                                            
__________________________________________________________________________________________________
Conv1 (Conv2D)                  (None, 112, 112, 32) 864         input_1[0][0]                    
__________________________________________________________________________________________________
bn_Conv1 (BatchNormalization)   (None, 112, 112, 32) 128         Conv1[0][0]                      
__________________________________________________________________________________________________
Conv1_relu (ReLU)               (None, 112, 112, 32) 0           bn_Conv1[0][0]                   
______________________________________________________________________________________________

In [13]:
# Broadcast the truncated model's weights through the SparkContext (`sc`);
# model_fn() reads them back via `brodcast_weights.value`.
# NOTE(review): the name is misspelled ("brodcast"); it is kept as-is because
# model_fn() references it.
brodcast_weights = sc.broadcast(new_model.get_weights())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [14]:
def model_fn():
    """
    Build the MobileNetV2 feature extractor and load the broadcast weights.

    The network is instantiated with ``weights=None``: every weight of the
    returned model is overwritten by ``set_weights`` below, so loading the
    ImageNet weights here again would be wasted work on each call.

    Returns:
        A Keras ``Model`` that takes the MobileNetV2 input and outputs the
        second-to-last layer's result, with all layers frozen and weights
        taken from ``brodcast_weights.value``.
    """
    model = MobileNetV2(weights=None,
                        include_top=True,
                        input_shape=(224, 224, 3))
    # The model is only used for inference, so every layer is frozen.
    for layer in model.layers:
        layer.trainable = False
    new_model = Model(inputs=model.input,
                      outputs=model.layers[-2].output)
    # The broadcast list was produced by get_weights() on a model built the
    # same way, so it lines up with this model's weights.
    new_model.set_weights(brodcast_weights.value)
    return new_model

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [15]:
def preprocess(content):
    """
    Preprocess raw image bytes for prediction.

    Args:
        content: Encoded image file content as bytes.

    Returns:
        The array produced by ``preprocess_input`` for the image, after
        conversion to RGB and resizing to 224x224.
    """
    # Force three colour channels: a grayscale, palette or RGBA image would
    # otherwise produce an array whose channel count differs from the
    # (224, 224, 3) input shape the model is built with.
    img = Image.open(io.BytesIO(content)).convert('RGB').resize([224, 224])
    arr = img_to_array(img)
    return preprocess_input(arr)

def featurize_series(model, content_series):
    """
    Run the given model over a pd.Series of raw images.

    Each element of ``content_series`` goes through ``preprocess``, the results
    are stacked into one batch, and ``model.predict`` is called once on it.

    :return: a pd.Series holding one flattened feature vector per image
    """
    batch = np.stack(content_series.map(preprocess))
    predictions = model.predict(batch)
    # Depending on the output layer, a prediction may be a multi-dimensional
    # tensor; flatten each one so it can be stored as a plain vector in a
    # Spark DataFrame column.
    flattened = []
    for prediction in predictions:
        flattened.append(prediction.flatten())
    return pd.Series(flattened)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [16]:
@pandas_udf('array<float>')
def featurize_udf(content_series_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    """
    Compute image features for an iterator of batches with a model loaded once.

    The UDF kind (iterator of Series to iterator of Series) is inferred by
    Spark from the type hints, so no explicit ``PandasUDFType`` is passed;
    that form is deprecated in Spark 3 and only triggers a warning.

    Args:
        content_series_iter (Iterator[pandas.Series]): Iterator over batches,
            each batch being a pandas Series of raw image contents.

    Yields:
        pandas.Series: The features computed for each batch of images.
    """
    # With an iterator-style pandas UDF the model is built once and reused for
    # every batch, which amortises the cost of loading it.
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…



## 5. Extraction des features

In [17]:
# Featurize every image with the pandas UDF, spreading the work over 20
# partitions. The result is cached because the following cells run several
# actions on it (show, count, PCA fit/transform, write); without caching, each
# action would re-run the model inference from scratch.
features_df = (
    images.repartition(20)
    .select(
        col("path"),
        col("label"),
        featurize_udf("content").alias("features"),
    )
    .cache()
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [18]:
# Display the first 5 rows of the resulting DataFrame:
features_df.show(5, truncate=True)

# Check the number of images (779 expected):
print(f"Nombre d'images : {features_df.count()}")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+------------------+--------------------+
|                path|             label|            features|
+--------------------+------------------+--------------------+
|s3://camilleb-pro...|    Apple Golden 1|[0.0, 0.026634023...|
|s3://camilleb-pro...|Apple Crimson Snow|[0.0, 0.0, 0.0, 0...|
|s3://camilleb-pro...|Apple Crimson Snow|[0.03941424, 0.0,...|
|s3://camilleb-pro...|Apple Crimson Snow|[0.0, 0.0, 0.0, 0...|
|s3://camilleb-pro...|Apple Crimson Snow|[0.010371416, 0.0...|
+--------------------+------------------+--------------------+
only showing top 5 rows

Nombre d'images : 779

## 6. Réduction de dimensions
A l'aide d'une PCA avec 59 composantes.

Le nombre de 59 composantes a été défini lors du test local : il permet d'atteindre 95 % de la variance expliquée.

La colonne features étant de type array, nous aurons besoin pour cette étape de la transformer en type vecteur.

In [19]:
# Wrap each 'features' array in a dense ML vector and store it in a new
# 'features_vector' column, which is the column the PCA reads.
features_to_vector_udf = udf(lambda arr: Vectors.dense(arr), VectorUDT())
features_df = features_df.withColumn(
    "features_vector",
    features_to_vector_udf("features"),
)

# Fit a PCA keeping 59 principal components (the count chosen to reach 95% of
# explained variance), then project the features onto them.
pca = PCA(
    k=59,
    inputCol="features_vector",
    outputCol="vectorized_components_pca_features",
).fit(features_df)
features_df = pca.transform(features_df)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [20]:
# Display the first 5 rows:
features_df.show(5, truncate=True)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+------------------+--------------------+--------------------+----------------------------------+
|                path|             label|            features|     features_vector|vectorized_components_pca_features|
+--------------------+------------------+--------------------+--------------------+----------------------------------+
|s3://camilleb-pro...|    Apple Golden 1|[0.0, 0.026634023...|[0.0,0.0266340225...|              [7.18137546892192...|
|s3://camilleb-pro...|Apple Crimson Snow|[0.0, 0.0, 0.0, 0...|[0.0,0.0,0.0,0.0,...|              [-4.7984835453983...|
|s3://camilleb-pro...|    Apple Golden 1|[0.0, 0.010257924...|[0.0,0.0102579239...|              [7.39648465460412...|
|s3://camilleb-pro...|    Apple Golden 1|[0.041660447, 0.0...|[0.04166044667363...|              [5.89570597208540...|
|s3://camilleb-pro...|    Apple Braeburn|[0.5408363, 0.207...|[0.54083627462387...|              [-10.445465956631...|
+--------------------+------------------+-------

In [21]:
# Turn the PCA component vectors back into plain arrays of floats.
vector_to_array_udf = udf(
    lambda vec: vec.toArray().tolist(),
    ArrayType(FloatType()),
)

# Store the converted values in a new 'pca_features' column.
features_df = features_df.withColumn(
    "pca_features",
    vector_to_array_udf("vectorized_components_pca_features"),
)

# Final DataFrame: only the path, the label and the reduced features.
final_df = features_df.select("path", "label", "pca_features")
final_df.show(5)
final_df.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+------------------+--------------------+
|                path|             label|        pca_features|
+--------------------+------------------+--------------------+
|s3://camilleb-pro...|Apple Crimson Snow|[-5.513868, 1.751...|
|s3://camilleb-pro...|Apple Crimson Snow|[-3.6820664, 2.13...|
|s3://camilleb-pro...|Apple Crimson Snow|[-10.743139, -4.1...|
|s3://camilleb-pro...|Apple Crimson Snow|[-9.877423, -6.95...|
|s3://camilleb-pro...|Apple Crimson Snow|[-7.8800607, -5.7...|
+--------------------+------------------+--------------------+
only showing top 5 rows

root
 |-- path: string (nullable = true)
 |-- label: string (nullable = true)
 |-- pca_features: array (nullable = true)
 |    |-- element: float (containsNull = true)

In [22]:
# Save the data as Parquet under PATH_Result, overwriting any existing output:
final_df.write.mode("overwrite").parquet(PATH_Result)
print(f'Données enregistrées dans : {PATH_Result}')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Données enregistrées dans : s3://camilleb-projet8/Results

## 7. <a id='toc7_'></a>[Validation des résultats](#toc0_)
### 7.1. <a id='toc7_1_'></a>[Chargement des données](#toc0_)

In [23]:
# Load the data from PATH_Result into a pandas DataFrame:
df = pd.read_parquet(PATH_Result, engine='pyarrow')

print(f'Dimension de df : {df.shape}')
df.head()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Dimension de df : (779, 3)
                                                path  ...                                       pca_features
0  s3://camilleb-projet8/donneesimages/Apple Gold...  ...  [7.88964, -0.3597631, 2.2718263, 6.529913, -3....
1  s3://camilleb-projet8/donneesimages/Apple Crim...  ...  [-6.3116193, 0.05037647, 0.4182736, -3.408764,...
2  s3://camilleb-projet8/donneesimages/Apple Crim...  ...  [-4.2573357, 1.65644, -0.42834908, -7.437944, ...
3  s3://camilleb-projet8/donneesimages/Apple Crim...  ...  [-9.877423, -6.9510164, 0.1374369, 5.4345984, ...
4  s3://camilleb-projet8/donneesimages/Apple Crim...  ...  [-7.8800607, -5.7731857, 0.19645865, 4.6762943...

[5 rows x 3 columns]

In [24]:
# Check the shape of the pca_features value on the first row:
print(f"Dimension des pca_features : {df.loc[0, 'pca_features'].shape}")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Dimension des pca_features : (59,)

### 7.2. <a id='toc7_2_'></a>[Création d'une colonne par composante](#toc0_)

In [25]:
# Stack the per-row PCA arrays into a single (n_rows, n_components) matrix in
# one pass instead of running one Series.apply per component. Requires at
# least one row.
# The cast to float64 is meant to keep the column dtype that element-wise
# Series.apply yields for these values, so the saved CSV is unchanged.
# NOTE(review): confirm on the pandas version in use.
pca_matrix = np.vstack(df['pca_features'].to_numpy()).astype(np.float64)

# Take the number of components from the data rather than hard-coding it, so
# this cell stays in sync with the `k` used for the PCA.
nbr_composantes = pca_matrix.shape[1]

# One Series per component, named pca_feature_1 .. pca_feature_N and aligned
# on df's index so the concatenation below matches rows correctly.
columns = [
    pd.Series(pca_matrix[:, i], index=df.index, name=f'pca_feature_{i+1}')
    for i in range(nbr_composantes)
]

# Append the component columns to df:
df = pd.concat([df] + columns, axis=1)

# Drop the original array column now that it has been expanded:
df = df.drop('pca_features', axis=1)

# Display:
print(f"Dimension de df : {df.shape}")
df.head()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Dimension de df : (779, 61)
                                                path  ... pca_feature_59
0  s3://camilleb-projet8/donneesimages/Apple Gold...  ...       0.783105
1  s3://camilleb-projet8/donneesimages/Apple Crim...  ...      -0.045493
2  s3://camilleb-projet8/donneesimages/Apple Crim...  ...       0.206825
3  s3://camilleb-projet8/donneesimages/Apple Crim...  ...       1.014006
4  s3://camilleb-projet8/donneesimages/Apple Crim...  ...       0.210727

[5 rows x 61 columns]

In [26]:
# Number of rows per label:
df['label'].value_counts()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Apple Braeburn        164
Apricot               164
Apple Golden 1        160
Apple Crimson Snow    148
Avocado               143
Name: label, dtype: int64

### 7.3. <a id='toc7_3_'></a>[Sauvegarde des résultats](#toc0_)
Sauvegarde du DataFrame au format CSV dans le bucket s3

In [27]:
# Save the DataFrame as a CSV file under PATH_Result, without the index column
df.to_csv(PATH_Result + '/df_results_cloud.csv', index=False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [28]:
# Check the saved file by reading it back (this rebinds `df` to the reloaded data):
df = pd.read_csv(PATH_Result + '/df_results_cloud.csv')

# Display the first 5 rows:
df.head()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

                                                path  ... pca_feature_59
0  s3://camilleb-projet8/donneesimages/Apple Gold...  ...       0.783105
1  s3://camilleb-projet8/donneesimages/Apple Crim...  ...      -0.045493
2  s3://camilleb-projet8/donneesimages/Apple Crim...  ...       0.206825
3  s3://camilleb-projet8/donneesimages/Apple Crim...  ...       1.014006
4  s3://camilleb-projet8/donneesimages/Apple Crim...  ...       0.210727

[5 rows x 61 columns]