# <center> <font color="#8333ff"> Projet8 : Déployez un modèle dans le cloud </font></center>

## Problématique : 

Vous êtes Data Scientist dans une très jeune start-up de l'AgriTech, nommée  **"Fruits!"**, qui cherche à proposer des 
solutions innovantes pour la récolte des fruits.
Cette entreprise souhaite dans un premier temps se faire connaître en mettant à disposition du grand public une application
mobile qui permettrait aux utilisateurs de prendre en photo un fruit et d'obtenir des informations sur ce fruit.
Le développement de l’application mobile permettra de construire **une première version de l'architecture Big Data 
nécessaire.**

## Mission :
Vous êtes donc chargé de développer dans un environnement Big Data une première chaîne de traitement des données qui 
comprendra le preprocessing et une étape de réduction de dimension.

## <font color="blue">Sommaire</font>

1. [Point de départ : la session Spark](#section_1)   
2. [Chargement et Lecture des données](#section_2)              
3. [Utilisation d'un modèle pré-entrainé ResNet50 ](#section_3) </br>
4. [Préprocessing et création des features](#section_4) </br>
5. [Réduction de dimension](#section_5) </br>
6. [Sauvegarde des données au format paquet ](#section_6) </br>


Dans ce notebook, nous allons manipuler un faible jeu de données, de 10 images seulement, pour tester si nos codes fonctionnent,
puis nous augmenteront le nombre d'images. 

Commençons par télécharger nos images à l'aide de awscli.<br>
Nous allons utilisé la commande suivante pour créer un dossier dans notre s3 : **aws s3 mb "s3://projet8-fruits-s3v2"** <br>
Puis nous chargerons (et mettons à jour en maintenant) s3 avec le dossier contenant nos images en local en utilisant la 
commande suivante :<br>
    **aws s3 sync "C:\Users\laila\images" "s3://projet8-fruits-s3v2"** <br>


In [1]:
%%info

## <font color="red" id="section_1">1.  Point de départ : la session Spark </font>

On importe aussi la librairie nécessaire pour notre projet.

In [2]:
from pyspark.sql.functions import *

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1667647930120_0001,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
#Pyspark
from pyspark import SparkContext, SparkConf

from pyspark.ml.feature import StandardScaler, StringIndexer

from pyspark.sql import Window
from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, IntegerType, FloatType, StringType
from pyspark.sql.functions import split, col, udf
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.functions import element_at
from pyspark.sql.functions import monotonically_increasing_id, row_number

from pyspark.ml.linalg import Vectors, VectorUDT

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## <font color="red" id="section_2">2. Chargement et Lecture des données </font>

Nous allons charger et lire les données, contenues dans s3, dans un spark dataframe que nous nommerons data_img.

In [4]:
#Lecture des données images 
data_img = spark.read.format("binaryFile") \
  .option("pathGlobFilter", "*.jpg") \
  .option("recursiveFileLookup", "true") \
  .load("s3a://projet8-fruits-s3v2/fruits/")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Nous allons maintenant exécuter cette commande en montrant cette dataframe.

In [5]:
data_img.show(10)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+-------------------+------+--------------------+
|                path|   modificationTime|length|             content|
+--------------------+-------------------+------+--------------------+
|s3a://projet8-fru...|2022-11-05 11:10:07|  6867|[FF D8 FF E0 00 1...|
|s3a://projet8-fru...|2022-11-05 11:10:07|  6855|[FF D8 FF E0 00 1...|
|s3a://projet8-fru...|2022-11-05 11:10:07|  6852|[FF D8 FF E0 00 1...|
|s3a://projet8-fru...|2022-11-05 11:10:07|  6831|[FF D8 FF E0 00 1...|
|s3a://projet8-fru...|2022-11-05 11:10:07|  6828|[FF D8 FF E0 00 1...|
|s3a://projet8-fru...|2022-11-05 11:10:07|  5688|[FF D8 FF E0 00 1...|
|s3a://projet8-fru...|2022-11-05 11:10:07|  5684|[FF D8 FF E0 00 1...|
|s3a://projet8-fru...|2022-11-05 11:10:07|  5675|[FF D8 FF E0 00 1...|
|s3a://projet8-fru...|2022-11-05 11:10:07|  5648|[FF D8 FF E0 00 1...|
|s3a://projet8-fru...|2022-11-05 11:10:07|  5615|[FF D8 FF E0 00 1...|
+--------------------+-------------------+------+--------------------+
only s

In [6]:
#On ajoute une nouvelle colonne 'fruit' à l'aide du nom du fruit présent dans 'path'
data_img = data_img.withColumn('fruit', element_at(split(data_img['path'], '/'),-2))

#On affiche le schema de nos manipulations
print(data_img.printSchema())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- fruit: string (nullable = true)

None

In [7]:
#On ne sélectionne que "path","fruit","content" dans notre spark dataframe
data_img = data_img.select('path','fruit','content')
data_img.show(10)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+----------+--------------------+
|                path|     fruit|             content|
+--------------------+----------+--------------------+
|s3a://projet8-fru...|Watermelon|[FF D8 FF E0 00 1...|
|s3a://projet8-fru...|Watermelon|[FF D8 FF E0 00 1...|
|s3a://projet8-fru...|Watermelon|[FF D8 FF E0 00 1...|
|s3a://projet8-fru...|Watermelon|[FF D8 FF E0 00 1...|
|s3a://projet8-fru...|Watermelon|[FF D8 FF E0 00 1...|
|s3a://projet8-fru...|Strawberry|[FF D8 FF E0 00 1...|
|s3a://projet8-fru...|Strawberry|[FF D8 FF E0 00 1...|
|s3a://projet8-fru...|Strawberry|[FF D8 FF E0 00 1...|
|s3a://projet8-fru...|Strawberry|[FF D8 FF E0 00 1...|
|s3a://projet8-fru...|Strawberry|[FF D8 FF E0 00 1...|
+--------------------+----------+--------------------+
only showing top 10 rows

In [8]:
#On vérifie le nombre d'images
data_img.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

61

In [9]:
from PIL import Image
import io

#collect permet simplement de lancer l’évaluation effective des transformations en amont.
collect = data_img.select('content').collect()

#image située dans la première ligne, première colonne
image1 = collect[0][0]
image_1 =Image.open(io.BytesIO(image1 ))
image_1


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<PIL.JpegImagePlugin.JpegImageFile image mode=RGB size=100x100 at 0x7F8A827B0990>

In [10]:
#Taille de l'image 1
image_1.size

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

(100, 100)

## <font color="red" id="section_3"> 3. Utilisation d'un modèle pré-entrainé MobileNetV2 </font>

Nous allons utiliséun algotirhme de transfer learning, mais aussi modèle pré-entrainé 
sur une multitude d'images puisque nous ne devons pas, dans ce projet, entrainé un modèle.
Prenons le modèle CNN Réseau de neurones MobileNetV2.

Les modèles de type MobileNet sont conçus pour fonctionner sur des appareils à fortes contraintes en termes de 
performances tels que les smartphones. MobileNetV2 est un modèle très efficace pour la détection et la segmentation 
d'objets.

MobileNetV2 est un réseau neuronal convolutif. 
Cette fonction renvoie un modèle de classification d'images Keras, éventuellement chargé avec des poids pré-formés sur ImageNet.
Vous pouvez charger une version pré-entraînée du réseau formée sur plus d'un million d'images à partir 
de la base de données ImageNet [1] . Le réseau pré-entraîné peut classer les images en 1000 catégories d'objets,
telles que les fruits, le clavier, la souris, le crayon et de nombreux animaux. En conséquence, le réseau a appris des
représentations de caractéristiques riches pour une large gamme d'images. Le réseau a une taille d'entrée d'image
de 224 par 224. 



In [11]:
import tensorflow as tf
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input

#'imagenet' : pré-formation sur ImageNet
#pooling='avg' : la mise en commun moyenne globale sera appliquée à la sortie du dernier bloc convolutionnel,
#et donc la sortie du modèle sera un tenseur 2D.
model = MobileNetV2(weights='imagenet',
                    include_top=True,
                    input_shape=(224, 224, 3))


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/mobilenet_v2/mobilenet_v2_weights_tf_dim_ordering_tf_kernels_1.0_224.h5

In [13]:
from tensorflow.keras import Model

modelv2 = Model(inputs=model.input,
                  outputs=model.layers[-2].output)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [14]:
# On vérifie que la couche supérieure est supprimée
modelv2.summary() 

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Model: "model"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            [(None, 224, 224, 3) 0                                            
__________________________________________________________________________________________________
Conv1 (Conv2D)                  (None, 112, 112, 32) 864         input_1[0][0]                    
__________________________________________________________________________________________________
bn_Conv1 (BatchNormalization)   (None, 112, 112, 32) 128         Conv1[0][0]                      
__________________________________________________________________________________________________
Conv1_relu (ReLU)               (None, 112, 112, 32) 0           bn_Conv1[0][0]                   
______________________________________________________________________________________________

In [15]:
# distribution et transformation des données en leur attribuant un poids issu du modèle Resnet50
bc_model_weights = sc.broadcast(modelv2.get_weights())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [16]:
#Fonction qui applique le modèle MobileNetV2 aux données
def model_fn():
    """ 
    Retourne un modèle MobileNetV2 sans la couche supérieure et avec diffusion des poids pré-entraînés
    """
    model = MobileNetV2(weights='imagenet',
                        include_top=True,
                        input_shape=(224, 224, 3))
    for layer in model.layers:
        layer.trainable = False
    modelv2 = Model(inputs=model.input,
                  outputs=model.layers[-2].output)
    modelv2.set_weights(bc_model_weights.value)
    return modelv2

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## <font color="red" id="section_4"> 4. Preprocessing et création des features </font>

Nous allons redimensionner les images en taille 224x224 car cela est nécessaire pour appliquer le modèle de transfer 
learning.

Je me suis inspirée des scripts présent dans :
    "https://docs.databricks.com/machine-learning/preprocess-data/transfer-learning-tensorflow.html"

In [17]:
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras.applications.resnet50 import preprocess_input

def preprocess(content):
    '''
    - Retourne des images prétraités pour la prédiction
    '''
    image = (Image.open(io.BytesIO(content))).resize([224, 224])
    array = img_to_array(image)
    return preprocess_input(array)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [18]:
import numpy as np
import pandas as pd
from pyspark.sql.functions import  col, pandas_udf, PandasUDFType
from pyspark.sql.types import ArrayType, IntegerType, FloatType, StringType

def featurize_series(model, content_series):
    '''
    - Extraction des features d'un pd.Series d'images brutes à l'aide du modèle d'entrée
    - Retourne un pd.Série de features des images
    '''
    input = np.stack(content_series.map(preprocess))
    preds = model.predict(input)
    # Pour certaines couches, les entités de sortie seront des tenseurs multidimensionnels.
    # Nous aplatissons les tenseurs de caractéristiques en vecteurs pour un stockage plus facile dans Spark DataFrames.
    output = [p.flatten() for p in preds]
    return pd.Series(output)

#On applique le modèle dans un pandas UDF
@pandas_udf('array<float>', functionType=PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    '''
    Cette méthode est une UDF Scalar Iterator pandas enveloppant la fonction d'extraction des features.
    - Prend en entrée "content_series_iter", un itérateur sur des lots de données, où chaque lot est une
      série de pandas de données d'image.
    - Retourne une colonne Spark DataFrame de type ArrayType(FloatType).
    '''
    # Avec Scalar Iterator pandas UDFs, nous pouvons charger le modèle une fois puis le réutiliser
    # pour plusieurs lots de données. Cela amortit les frais généraux liés au chargement de gros modèles.
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…



In [57]:
# Les UDF de Pandas sur de grands enregistrements (par exemple, de très grandes images) peuvent rencontrer des erreurs de type Out Of Memory (OOM).
# Si vous rencontrez de telles erreurs dans la cellule ci-dessous, essayez de réduire la taille du lot Arrow via `maxRecordsPerBatch
#spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1024")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [19]:
#Regardons le nombre de partitions de images
data_img.rdd.getNumPartitions()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

2

In [20]:
#On garde les colonnes 'path', 'fruit','features' que l'on récupère à l'issue du préprocessing
data_features = data_img.repartition(2).select(col('path'),col('fruit'),featurize_udf("content").alias("features"))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [21]:
#On affiche les 10 premières lignes
data_features.show(10)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+--------------+--------------------+
|                path|         fruit|            features|
+--------------------+--------------+--------------------+
|s3a://projet8-fru...|          Kiwi|[0.67154574, 0.19...|
|s3a://projet8-fru...|Apple Braeburn|[0.024972016, 0.9...|
|s3a://projet8-fru...|    Strawberry|[0.7662081, 0.601...|
|s3a://projet8-fru...|    Watermelon|[0.54513586, 0.57...|
|s3a://projet8-fru...|         Cocos|[1.0977285, 0.278...|
|s3a://projet8-fru...|    Strawberry|[0.68121576, 0.73...|
|s3a://projet8-fru...|          Kiwi|[0.7037509, 0.229...|
|s3a://projet8-fru...|         Lemon|[0.38271257, 0.26...|
|s3a://projet8-fru...|         Cocos|[1.9467353, 0.331...|
|s3a://projet8-fru...|    Watermelon|[0.4738176, 0.062...|
+--------------------+--------------+--------------------+
only showing top 10 rows

## <font color="red" id="section_5">5. Reduction de dimensions </font>

## ACP

L'ACP forme un modèle pour projeter des vecteurs dans un espace dimensionnel inférieur
des k composants principaux principaux

Commençons par convertir les features en vecteurs de features.

In [22]:
from pyspark.ml.linalg import Vectors, VectorUDT

# Conversion au format vecteur dense
ud_f = udf(lambda x: Vectors.dense(x), VectorUDT())
data_vect = data_features.withColumn('features_vect', ud_f('features'))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

On applique la transformation PCA pour réduire les dimensions de notre base de données.

In [23]:
from pyspark.ml.feature import StandardScaler, PCA

#On standardise les features
scaler = StandardScaler(inputCol="features_vect")
scaler.setOutputCol("features_std")
data_fit=scaler.fit(data_vect)
data_sc = data_fit.transform(data_vect)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [24]:
data_sc.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+--------------+--------------------+--------------------+--------------------+
|                path|         fruit|            features|       features_vect|        features_std|
+--------------------+--------------+--------------------+--------------------+--------------------+
|s3a://projet8-fru...|          Kiwi|[0.67154574, 0.19...|[0.67154574394226...|[1.19819261001848...|
|s3a://projet8-fru...|Apple Braeburn|[0.024972016, 0.9...|[0.02497201599180...|[0.04455584044505...|
|s3a://projet8-fru...|    Strawberry|[0.7662081, 0.601...|[0.76620811223983...|[1.36709212455513...|
|s3a://projet8-fru...|    Watermelon|[0.54513586, 0.57...|[0.54513585567474...|[0.97264819205188...|
|s3a://projet8-fru...|         Cocos|[1.0977285, 0.278...|[1.09772849082946...|[1.95860099983255...|
|s3a://projet8-fru...|    Strawberry|[0.68121576, 0.73...|[0.68121576309204...|[1.21544615616708...|
|s3a://projet8-fru...|          Kiwi|[0.7037509, 0.229...|[0.70375090837478...|[1.255654056

In [33]:
# Transformation PCA sur les k composantes principales à générer

pca = PCA(k=16,inputCol='features_std',outputCol='features_pca')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [34]:
#On entraine et transforme les données standardisées pour obtenir les données après transformation PCA
model_pca=pca.fit(data_sc)
data_pca = model_pca.transform(data_sc)
data_pca = data_pca.filter(data_pca.features_pca.isNotNull())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [35]:
#On affiche les 10 premières lignes de data_pca
data_pca.select(col('path'),col('fruit'),col('features'),col('features_vect'),col('features_std'),col('features_pca')).show(10)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+--------------+--------------------+--------------------+--------------------+--------------------+
|                path|         fruit|            features|       features_vect|        features_std|        features_pca|
+--------------------+--------------+--------------------+--------------------+--------------------+--------------------+
|s3a://projet8-fru...|          Kiwi|[0.67154574, 0.19...|[0.67154574394226...|[1.19819261001848...|[-13.998540821516...|
|s3a://projet8-fru...|Apple Braeburn|[0.024972016, 0.9...|[0.02497201599180...|[0.04455584044505...|[-3.1367429564872...|
|s3a://projet8-fru...|    Strawberry|[0.7662081, 0.601...|[0.76620811223983...|[1.36709212455513...|[-2.0658984368732...|
|s3a://projet8-fru...|    Watermelon|[0.54513586, 0.57...|[0.54513585567474...|[0.97264819205188...|[5.10726363301637...|
|s3a://projet8-fru...|         Cocos|[1.0977285, 0.278...|[1.09772849082946...|[1.95860099983255...|[5.08421000370752...|
|s3a://projet8-fru...|  

## Variance expliquée

In [36]:
variance = np.round(100*model_pca.explainedVariance.toArray(),2).cumsum()
variance

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

array([13.52, 24.77, 34.16, 42.54, 49.58, 55.85, 62.02, 67.2 , 72.  ,
       76.04, 79.85, 81.12, 82.28, 83.39, 84.34, 85.21])

## <font color="red" id="section_6">6. Sauvegarde des données au format paquet </font>

In [37]:
data_final = data_pca.select(col('path'),col('fruit'), col('features_pca'))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [38]:
data_final.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+--------------+--------------------+
|                path|         fruit|        features_pca|
+--------------------+--------------+--------------------+
|s3a://projet8-fru...|          Kiwi|[-13.998540821516...|
|s3a://projet8-fru...|Apple Braeburn|[-3.1367429564872...|
|s3a://projet8-fru...|    Strawberry|[-2.0658984368732...|
|s3a://projet8-fru...|    Watermelon|[5.10726363301637...|
|s3a://projet8-fru...|         Cocos|[5.08421000370752...|
|s3a://projet8-fru...|    Strawberry|[-1.4127839634763...|
|s3a://projet8-fru...|          Kiwi|[-13.123450934157...|
|s3a://projet8-fru...|         Lemon|[-10.912686464067...|
|s3a://projet8-fru...|         Cocos|[4.37252930719185...|
|s3a://projet8-fru...|    Watermelon|[2.55219826478972...|
|s3a://projet8-fru...|          Kiwi|[-9.5551930693832...|
|s3a://projet8-fru...|         Cocos|[3.73664935091293...|
|s3a://projet8-fru...|    Strawberry|[-2.2263926715440...|
|s3a://projet8-fru...|Apple Braeburn|[-3.2606053321817..

On enregistre les résultats de l'ACP (après modélisation) au format parquet.

In [39]:
data_final.write.mode("overwrite").parquet('s3a://projet8-fruits-s3v2/resultat/')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…