# Projet 8: Déployer un modèle dans le cloud

# I/ Import des librairies et des packages
Les packages nécessaires ont été installés via l'étape de bootstrap lors de l'instanciation du serveur.

In [2]:
# librairies classiques
import io
import time
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from PIL import Image

# librairies pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import ArrayType, IntegerType, FloatType, StringType
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.feature import StandardScaler,PCA

#librairies tensorflow
import tensorflow as tf
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras import Model

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# II/ Configuration

In [6]:
%%info

ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
0,application_1676364475178_0001,pyspark,idle,Link,Link,,✔


In [7]:
# `spark` (the SparkSession) is not created in this notebook: it is provided by
# the PySpark kernel session. Keep a handle on its SparkContext, used later to
# broadcast the model weights to the executors.
sc = spark.sparkContext

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Définition des PATH pour charger les images et enregistrer les résultats

In [8]:
# Root S3 bucket, input images (Test set) and output directory for the results.
PATH = 's3://oc-projet8-fruits'
PATH_Data = PATH + '/Test'
PATH_Result = PATH + '/Resultats'
print('\n'.join(['PATH:        ' + PATH,
                 'PATH_Data:   ' + PATH_Data,
                 'PATH_Result: ' + PATH_Result]))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

PATH:        s3://oc-projet8-fruits
PATH_Data:   s3://oc-projet8-fruits/Test
PATH_Result: s3://oc-projet8-fruits/Resultats

# III/ Traitement des données

### Chargement des données

In [9]:
# Read every .jpg under PATH_Data (sub-directories included) as raw binary rows:
# one row per file with its path, modification time, length and content.
images = (spark.read
          .format("binaryFile")
          .option("pathGlobFilter", "*.jpg")
          .option("recursiveFileLookup", "true")
          .load(PATH_Data))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [10]:
# Preview the first rows: one row per .jpg file (path, modificationTime,
# length, raw binary content).
images.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+-------------------+------+--------------------+
|                path|   modificationTime|length|             content|
+--------------------+-------------------+------+--------------------+
|s3://oc-projet8-f...|2023-02-06 22:43:21|  7353|[FF D8 FF E0 00 1...|
|s3://oc-projet8-f...|2023-02-06 22:43:21|  7350|[FF D8 FF E0 00 1...|
|s3://oc-projet8-f...|2023-02-06 22:43:21|  7349|[FF D8 FF E0 00 1...|
|s3://oc-projet8-f...|2023-02-06 22:43:21|  7348|[FF D8 FF E0 00 1...|
|s3://oc-projet8-f...|2023-02-06 22:43:22|  7328|[FF D8 FF E0 00 1...|
+--------------------+-------------------+------+--------------------+
only showing top 5 rows

J'ajoute une colonne `label` contenant le label de chaque image, extrait du nom du dossier parent dans son chemin (`path`) :

In [11]:
# Label = name of the parent directory of each file
# (e.g. .../Test/Watermelon/r_106_100.jpg -> 'Watermelon').
images = images.withColumn('label', element_at(split(images['path'], '/'), -2))
# printSchema() and show() print by themselves and return None: wrapping them
# in print() only added spurious "None" lines to the cell output.
images.printSchema()
images.select('path', 'label').show(5, False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)

None
+----------------------------------------------------+----------+
|path                                                |label     |
+----------------------------------------------------+----------+
|s3://oc-projet8-fruits/Test/Watermelon/r_106_100.jpg|Watermelon|
|s3://oc-projet8-fruits/Test/Watermelon/r_109_100.jpg|Watermelon|
|s3://oc-projet8-fruits/Test/Watermelon/r_108_100.jpg|Watermelon|
|s3://oc-projet8-fruits/Test/Watermelon/r_107_100.jpg|Watermelon|
|s3://oc-projet8-fruits/Test/Watermelon/r_95_100.jpg |Watermelon|
+----------------------------------------------------+----------+
only showing top 5 rows

None

In [12]:
# Fetch the first row only once: every .first() call launches a separate Spark job.
first_image = images.first()
print(first_image.label)
# Decode the raw bytes to display the image (last expression of the cell).
Image.open(io.BytesIO(first_image.content))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Watermelon
<PIL.JpegImagePlugin.JpegImageFile image mode=RGB size=100x100 at 0x7FB0A03180D0>

# IV/ Analyse

## Préparation du modèle

In [13]:
# MobileNetV2 with ImageNet-pretrained weights, built once on the driver.
# include_top=True keeps the original classification head; the features are
# taken from the penultimate layer in the next cell.
model = MobileNetV2(weights='imagenet',
                    include_top=True,
                    input_shape=(224, 224, 3))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/mobilenet_v2/mobilenet_v2_weights_tf_dim_ordering_tf_kernels_1.0_224.h5

In [14]:
# Feature extractor: same input as `model`, output taken from the penultimate
# layer (model.layers[-2]), i.e. the final classification layer is dropped.
# NOTE(review): presumably the pooled feature vector feeding the ImageNet
# classifier — confirm with the summary printed below.
new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

> Tous les workers doivent pouvoir accéder au modèle ainsi qu'à ses poids. 
Une bonne pratique consiste à charger le modèle sur le driver puis à diffuser 
ensuite les poids aux différents workers.

In [15]:
# Ship the driver's feature-extractor weights to every executor once; model_fn()
# reads them back through `.value`. (The misspelled name is kept as-is because
# model_fn refers to it.)
brodcast_weights = sc.broadcast(new_model.get_weights())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [16]:
# Print the layer-by-layer architecture of the truncated feature extractor.
new_model.summary()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Model: "model"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            [(None, 224, 224, 3) 0                                            
__________________________________________________________________________________________________
Conv1 (Conv2D)                  (None, 112, 112, 32) 864         input_1[0][0]                    
__________________________________________________________________________________________________
bn_Conv1 (BatchNormalization)   (None, 112, 112, 32) 128         Conv1[0][0]                      
__________________________________________________________________________________________________
Conv1_relu (ReLU)               (None, 112, 112, 32) 0           bn_Conv1[0][0]                   
______________________________________________________________________________________________

In [17]:
def model_fn():
    """
    Build the feature-extraction model on a worker.

    Rebuilds the MobileNetV2 architecture (with its top, then truncated at the
    penultimate layer, exactly like `new_model` on the driver) and loads the
    weights broadcast from the driver through `brodcast_weights`.

    The architecture is created with ``weights=None``: the actual weights come
    from the broadcast variable, so workers no longer download the ImageNet
    checkpoint from the internet only to overwrite it right away (which is
    what ``weights='imagenet'`` did, defeating the purpose of the broadcast).

    :return: a Keras ``Model`` whose output is ``model.layers[-2].output``,
             with every layer frozen (``trainable = False``).
    """
    model = MobileNetV2(weights=None,
                        include_top=True,
                        input_shape=(224, 224, 3))
    # Inference only: freeze all layers.
    for layer in model.layers:
        layer.trainable = False
    new_model = Model(inputs=model.input,
                      outputs=model.layers[-2].output)
    # Same architecture as the driver's new_model, so the weight list lines up.
    new_model.set_weights(brodcast_weights.value)
    return new_model

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### Chargement des images et extraction de leurs features via une pandas UDF

In [18]:
def preprocess(content):
    """
    Decode raw image bytes and prepare them for MobileNetV2.

    :param content: raw bytes of an encoded image (the ``content`` column of
                    the ``binaryFile`` DataFrame).
    :return: a 224x224 RGB float array transformed by MobileNetV2's
             ``preprocess_input``.
    """
    # Force 3 channels: grayscale ('L'), palette ('P'), RGBA or CMYK images would
    # otherwise give 1- or 4-channel arrays that cannot be stacked with the RGB
    # ones nor fed to a model built with input_shape=(224, 224, 3).
    # For images that are already RGB, convert('RGB') leaves the pixels unchanged.
    img = Image.open(io.BytesIO(content)).convert('RGB').resize((224, 224))
    arr = img_to_array(img)
    return preprocess_input(arr)

def featurize_series(model, content_series):
    """
    Featurize a pd.Series of raw images using the input model.

    :param model: a model exposing ``predict`` (as returned by ``model_fn``).
    :param content_series: pd.Series of raw encoded image bytes.
    :return: a pd.Series with one flattened feature vector per input image,
             in the same order as ``content_series``.
    """
    # np.stack raises on an empty sequence: an empty batch must map to an empty
    # result instead of failing the whole Spark task.
    if len(content_series) == 0:
        return pd.Series([], dtype=object)
    batch = np.stack(content_series.map(preprocess))
    preds = model.predict(batch)
    # For some layers, output features will be multi-dimensional tensors.
    # We flatten the feature tensors to vectors for easier storage in Spark DataFrames.
    return pd.Series([p.flatten() for p in preds])

@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    """
    Scalar Iterator pandas UDF turning batches of raw image bytes into features.

    The decorator declares the resulting Spark column as ArrayType(FloatType).

    :param content_series_iter: iterator over batches of data, each batch being
                                a pandas Series of raw image bytes.
    :return: (generator) one pandas Series of feature vectors per input batch.
    """
    # The iterator form lets the (heavy) model be built a single time and then
    # reused for every batch of the iterator, amortizing its loading cost.
    extractor = model_fn()
    yield from (featurize_series(extractor, batch) for batch in content_series_iter)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…



#### Extraction des features (transformation paresseuse : le calcul n'est lancé qu'à la première action Spark)

In [19]:
# spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1024")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [20]:
# One row per image: its path, its label and its MobileNetV2 feature vector.
# The rows are spread over 24 partitions before the (expensive) UDF so that the
# inference work can be distributed.
features_df = (
    images
    .repartition(24)
    .select("path", "label", featurize_udf(col("content")).alias("features"))
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Réduction de dimension (PCA)

In [21]:
# Convert the array<float> features to Spark ML dense vectors, the input type
# expected by StandardScaler and PCA below.
Vect_udf = udf(lambda x: Vectors.dense(x), VectorUDT())
# features_df is consumed by several Spark jobs (StandardScaler.fit, PCA.fit,
# show, parquet write). Without caching, each of them would re-run the whole
# lineage, including the costly MobileNetV2 inference UDF.
features_df = features_df.withColumn('features_Vector', Vect_udf('features')).cache()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [22]:
# Standardization: center (withMean) and scale to unit variance (withStd) every
# feature dimension, so that no dimension weighs more in the PCA just because of
# its scale.
std_scaler = StandardScaler(inputCol='features_Vector', outputCol='features_Std',
                            withMean=True, withStd=True)
scaler = std_scaler.fit(features_df)
df_scaler = scaler.transform(features_df)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [23]:
# Fit a PCA keeping the first k principal components of the standardized features.
k = 20
pca = PCA(k=k, inputCol='features_Std', outputCol='features_Pca').fit(df_scaler)

# Project every image on these k components, keeping only rows with a projection.
df_pca = (pca.transform(df_scaler)
          .filter(col('features_Pca').isNotNull()))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [24]:
# Preview the successive representations: raw array, dense vector, standardized, PCA.
df_pca.select('features', 'features_Vector', 'features_Std', 'features_Pca').show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+--------------------+--------------------+--------------------+
|            features|     features_Vector|        features_Std|        features_Pca|
+--------------------+--------------------+--------------------+--------------------+
|[0.65066105, 0.23...|[0.65066105127334...|[0.44830505126157...|[-17.287628224922...|
|[0.03623737, 0.15...|[0.03623737022280...|[-0.6902510463981...|[-13.025205459359...|
|[0.015393682, 4.6...|[0.01539368182420...|[-0.7288753853613...|[-9.9118468454753...|
|[0.0, 4.519898, 0...|[0.0,4.5198979377...|[-0.7574006047611...|[-12.964930646664...|
|[0.0, 4.8245797, ...|[0.0,4.8245797157...|[-0.7574006047611...|[-6.2448406512363...|
+--------------------+--------------------+--------------------+--------------------+
only showing top 5 rows

In [25]:
# Only the feature columns (raw, vector, standardized, PCA), without path/label.
# NOTE(review): df_features is not used afterwards; df_pca is what gets written.
df_features = df_pca.select('features', 'features_Vector', 'features_Std', 'features_Pca')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# V/ Résultats (S3)

In [26]:
# Output location of the parquet results.
print(PATH_Result)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

s3://oc-projet8-fruits/Resultats

In [24]:
#features_df.write.mode("overwrite").parquet(PATH_Result)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [48]:
# Save the results (path, label and every feature representation) in parquet
# format, replacing any previous output under PATH_Result.
df_pca.write.mode("overwrite").parquet(PATH_Result)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…