# Project_09: Run processing in a Big Data environment on the Cloud

# Step 2: Migrate your processing pipeline to the cloud
---

## Importing libraries

In [2]:
# start spark session

In [1]:
import pandas as pd
from PIL import Image
import numpy as np
import io
import os

import tensorflow as tf
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras import Model
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split, udf

from pyspark.ml.feature import PCA
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
0,application_1738920115233_0001,pyspark,idle,Link,Link,,✔


SparkSession available as 'spark'.


---
## Defining the paths:

In [3]:
PATH = 's3://jlegal-bucket'
PATH_Data = PATH+'/Test'
PATH_Result = PATH+'/Results'
print('PATH:        '+\
      PATH+'\nPATH_Data:   '+\
      PATH_Data+'\nPATH_Result: '+PATH_Result)

PATH:        s3://jlegal-bucket
PATH_Data:   s3://jlegal-bucket/Test
PATH_Result: s3://jlegal-bucket/Results

---
---
## 1.2 - Loading the data

The images are loaded in binary format, which offers <br />
more flexibility in how the images are preprocessed.

Before loading the images, we specify that we only want to load <br />
files whose extension is **jpg**.

We also ask Spark to load every matching object found <br />
in the sub-folders of the given folder.

In [4]:
images = spark.read.format("binaryFile") \
  .option("pathGlobFilter", "*.jpg") \
  .option("recursiveFileLookup", "true") \
  .load(PATH_Data)
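
To make the effect of these two options concrete, here is a minimal local-filesystem analogue using `pathlib` rather than Spark: `rglob("*.jpg")` descends into every sub-folder and keeps only the names matching the pattern. The folder and file names (`Banana`, `notes.txt`) and the helper `list_jpg_recursively` are hypothetical; this is only a sketch of the selection logic, not of how Spark reads from S3.

```python
import tempfile
from pathlib import Path


def list_jpg_recursively(root):
    """Return every *.jpg file under root, at any depth, sorted by path."""
    return sorted(p for p in Path(root).rglob("*.jpg") if p.is_file())


# Build a small throwaway folder tree: one sub-folder per label.
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "Apple Braeburn").mkdir()
    (root / "Apple Braeburn" / "r_4_100.jpg").write_bytes(b"")
    (root / "Apple Braeburn" / "notes.txt").write_bytes(b"")  # filtered out
    (root / "Banana").mkdir()
    (root / "Banana" / "12_100.jpg").write_bytes(b"")
    found = [p.relative_to(root).as_posix() for p in list_jpg_recursively(root)]

print(found)
```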

<u>Displaying the first 5 images, with</u>:
 - the path of the image
 - the date and time of its last modification
 - its length
 - its content encoded as hexadecimal values

<u>I keep only the image **path** and add a column containing the **label** of each image</u>:

In [5]:
images = images.withColumn('label', element_at(split(images['path'], '/'),-2))
# printSchema() and show() print directly and return None, so no print() wrapper is needed
images.printSchema()
images.select('path','label').show(5,False)

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)

+----------------------------------------------------+--------------+
|path                                                |label         |
+----------------------------------------------------+--------------+
|s3://jlegal-bucket/Test/Apple Braeburn/r_326_100.jpg|Apple Braeburn|
|s3://jlegal-bucket/Test/Apple Braeburn/r_4_100.jpg  |Apple Braeburn|
|s3://jlegal-bucket/Test/Apple Braeburn/r_8_100.jpg  |Apple Braeburn|
|s3://jlegal-bucket/Test/Apple Braeburn/r_324_100.jpg|Apple Braeburn|
|s3://jlegal-bucket/Test/Apple Braeburn/r_327_100.jpg|Apple Braeburn|
+----------------------------------------------------+--------------+
only showing top 5 rows
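
`element_at(split(path, '/'), -2)` takes the second-to-last segment of the path, that is, the folder that contains the image. A plain-Python equivalent, given only as an illustration (`label_from_path` is a hypothetical helper, not part of the notebook):

```python
def label_from_path(path):
    """Return the name of the folder that directly contains the file."""
    # split('/') yields [..., 'Test', '<label>', '<file>.jpg'];
    # index -2 is therefore the label folder.
    return path.split("/")[-2]


example_path = "s3://jlegal-bucket/Test/Apple Braeburn/r_326_100.jpg"
example_label = label_from_path(example_path)
print(example_label)
```

This relies on the data being organised as one folder per label; a file stored directly at the root of `Test` would get `Test` as its label.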

---
---
## 1.3 - Preparing the model:

### 1.3.1 - Loading the model: **MobileNetV2**

In [6]:
model = MobileNetV2(weights='imagenet',
                    include_top=True,
                    input_shape=(224, 224, 3))

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/mobilenet_v2/mobilenet_v2_weights_tf_dim_ordering_tf_kernels_1.0_224.h5
[1m14536120/14536120[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 0us/step

---
### 1.3.2 - Creating our model without the last layer:

In [7]:
new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)

Displaying the summary of our new model, where we can see <br />
that <u>the output is indeed a vector of dimension 1280 (output shape `(None, 1280)`)</u>:

In [8]:
new_model.summary()

Model: "functional_1"
┏━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━┓
┃ Layer (type)        ┃ Output Shape      ┃ Param # ┃ Connected to         ┃
┡━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━┩
│ input_layer         │ (None, 224, 224,  │       0 │ -                    │
│ (InputLayer)        │ 3)                │         │                      │
├─────────────────────┼───────────────────┼─────────┼──────────────────────┤
│ Conv1 (Conv2D)      │ (None, 112, 112,  │     864 │ input_layer[0][0]    │
│                     │ 32)               │         │                      │
├─────────────────────┼───────────────────┼─────────┼──────────────────────┤
│ bn_Conv1            │ (None, 112, 112,  │     128 │ Conv1[0][0]          │
│ (BatchNormalizatio… │ 32)               │         │                      │
├─────────────────────┼───────────────────┼─────────┼──────────────────────┤
│ Conv1_relu (ReLU)   │ (None, 112, 112,  │       0 │ 

All workers must be able to access the model and its weights. <br />
A good practice is to load the model on the driver and then broadcast <br />
the weights to the workers.

In [9]:
brodcast_weights = sc.broadcast(new_model.get_weights())

<u>Let's wrap this in a function</u>:

In [10]:
def model_fn():
    """
    Returns a MobileNetV2 model with top layer removed 
    and broadcasted pretrained weights.
    """
    model = MobileNetV2(weights='imagenet',
                        include_top=True,
                        input_shape=(224, 224, 3))
    for layer in model.layers:
        layer.trainable = False
    new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)
    new_model.set_weights(brodcast_weights.value)
    return new_model

---
---
## 1.4 - Featurization:

<u>The calls are nested as follows</u>:

- Pandas UDF
  - featurize a pd.Series of images
    - preprocess a single image

### 1.4.1 - Functions:

In [11]:
def preprocess(content):
    """
    Preprocesses raw image bytes for prediction.
    """
    img = Image.open(io.BytesIO(content)).resize([224, 224])
    arr = img_to_array(img)
    return preprocess_input(arr)

def featurize_series(model, content_series):
    """
    Featurize a pd.Series of raw images using the input model.
    :return: a pd.Series of image features
    """
    input = np.stack(content_series.map(preprocess))
    preds = model.predict(input)
    # For some layers, output features will be multi-dimensional tensors.
    # We flatten the feature tensors to vectors for easier storage in Spark DataFrames.
    output = [p.flatten() for p in preds]
    return pd.Series(output)

@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    '''
    This method is a Scalar Iterator pandas UDF wrapping our featurization function.
    The decorator specifies that this returns a Spark DataFrame column of type ArrayType(FloatType).

    :param content_series_iter: This argument is an iterator over batches of data, where each batch
                              is a pandas Series of image data.
    '''
    # With Scalar Iterator pandas UDFs, we can load the model once and then re-use it
    # for multiple data batches.  This amortizes the overhead of loading big models.
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)
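
The benefit of the Scalar Iterator form can be illustrated without Spark. In the sketch below, `load_model` is a hypothetical stand-in for `model_fn()` that simply counts its calls, and the "model" just returns the byte length of each item. The only point is that the loader runs once per iterator, however many batches follow.

```python
load_count = 0


def load_model():
    """Hypothetical stand-in for model_fn(): counts how often it is called."""
    global load_count
    load_count += 1
    # The fake "model" maps each item of a batch to its length in bytes.
    return lambda batch: [len(item) for item in batch]


def featurize_batches(batch_iter):
    """Same shape as featurize_udf: load once, then process every batch."""
    model = load_model()  # executed a single time per iterator
    for batch in batch_iter:
        yield model(batch)


batches = [[b"ab", b"cde"], [b"f"], [b"ghij", b"kl"]]
results = list(featurize_batches(iter(batches)))
print(results, load_count)
```

With a plain scalar UDF, the equivalent of `load_model()` would sit inside the per-batch function and run once per batch instead.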


---
### 1.4.2 - Running the feature-extraction actions

On large records (for example, very large images), Pandas UDFs <br />
can run into Out Of Memory (OOM) errors.<br />

**Command to work around the OOM problem:**  
*(To be tested: is this command still needed in the cloud?)*

In [12]:
#spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1024")
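
A rough order-of-magnitude estimate helps explain why this setting matters. Assuming each image becomes a 224 x 224 x 3 `float32` array (as `preprocess` produces) and that `np.stack` materializes a whole Arrow batch at once, the input tensor alone grows linearly with the batch size. The figure of 10,000 records is Spark's documented default for `spark.sql.execution.arrow.maxRecordsPerBatch`. The calculation ignores the raw bytes, the model activations, and Python overhead, so it is a lower bound rather than a measurement.

```python
BYTES_PER_FLOAT32 = 4
HEIGHT, WIDTH, CHANNELS = 224, 224, 3


def batch_input_bytes(records_per_batch):
    """Size in bytes of the stacked float32 input tensor for one batch."""
    return records_per_batch * HEIGHT * WIDTH * CHANNELS * BYTES_PER_FLOAT32


# Compare the default batch size with the reduced value from the cell above.
for records in (10_000, 1_024):
    gib = batch_input_bytes(records) / 2**30
    print(f"{records:>6} records per batch -> {gib:.2f} GiB of input tensor")
```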

In [13]:
features_df = images.repartition(20).select(col("path"),
                                            col("label"),
                                            featurize_udf("content").alias("features")
                                           )

<u>Reminder of the PATH where the "**parquet**" files containing our results <br />
will be written, namely a DataFrame with 3 columns</u>:
 1. Path of the image
 2. Label of the image
 3. Feature vector of the image

In [14]:
print(PATH_Result)

s3://jlegal-bucket/Results

<u>Saving the processed data in "**parquet**" format</u>:

In [15]:
features_df.write.mode("overwrite").parquet(PATH_Result)

---
### 1.4.3 - Checking the results:

<u>We load the freshly saved data into a **Pandas DataFrame**</u>:

In [16]:
df = pd.read_parquet(PATH_Result, engine='pyarrow')

<u>We display the first 5 rows of the DataFrame</u>:

In [17]:
df.head()

                                                path  ...                                           features
0  s3://jlegal-bucket/Test/Apple Braeburn/r_35_10...  ...  [0.639037, 0.036415372, 0.0, 0.0, 0.0, 0.86569...
1  s3://jlegal-bucket/Test/Apple Braeburn/r_33_10...  ...  [0.4867081, 0.0, 0.0, 0.0, 0.0, 0.59428054, 0....
2  s3://jlegal-bucket/Test/Apple Braeburn/r_63_10...  ...  [0.91799986, 0.25001913, 0.0, 0.0, 0.0, 0.4715...
3  s3://jlegal-bucket/Test/Apple Braeburn/r_65_10...  ...  [0.64750093, 0.21974614, 0.0, 0.0, 0.0, 0.5579...
4  s3://jlegal-bucket/Test/Apple Braeburn/r_77_10...  ...  [1.6727263, 0.09295292, 0.0, 0.0, 0.36180583, ...

[5 rows x 3 columns]

In [18]:
df.shape

(164, 3)

<u>We check that the image feature vector does have dimension 1280</u>:

In [19]:
df.loc[0,'features'].shape

(1280,)
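
Checking a single row does not guarantee that every row has the same dimension. Below is a minimal sketch of a check over all vectors, using a hypothetical stand-in list in place of `df['features']` (`feature_lengths` is not part of the notebook):

```python
def feature_lengths(feature_vectors):
    """Return the set of distinct lengths found among the feature vectors."""
    return {len(vec) for vec in feature_vectors}


# Hypothetical stand-in for df['features']: three vectors of length 1280.
sample_features = [[0.0] * 1280 for _ in range(3)]
lengths = feature_lengths(sample_features)
assert lengths == {1280}, f"unexpected feature dimensions: {lengths}"
print(lengths)
```

On the real data, the same helper could be called as `feature_lengths(df['features'])`, since iterating over a pandas Series yields one array per row.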

---
---
## 1.5 - Creating the Pipeline:

To be more efficient, I integrate the whole process into a pipeline:  
**featurization** + **vectorization** + **PCA**

In [20]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler

`VectorAssembler` is used for production, as it is more efficient within a pipeline.

In [21]:
features_df.printSchema()

root
 |-- path: string (nullable = true)
 |-- label: string (nullable = true)
 |-- features: array (nullable = true)
 |    |-- element: float (containsNull = true)

As before, the "features" column must be converted to a dense vector.

In [24]:
# array to dense vector
from pyspark.ml.linalg import Vectors, VectorUDT

to_vector_udf = udf(lambda x: Vectors.dense(x), VectorUDT())

In [25]:
features_df = features_df.withColumn("features_vec", to_vector_udf(col("features")))

In [26]:
features_df.printSchema()

root
 |-- path: string (nullable = true)
 |-- label: string (nullable = true)
 |-- features: array (nullable = true)
 |    |-- element: float (containsNull = true)
 |-- features_vec: vector (nullable = true)

In [27]:
# assemble the dense vector column into a single input column for PCA
vector_assembler = VectorAssembler(inputCols=["features_vec"], outputCol="features_vec_final")

In [28]:
# PCA
k_optimal = 186
pca = PCA(k=k_optimal, inputCol="features_vec_final", outputCol="pca_features")
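
The notebook does not show how `k_optimal = 186` was obtained. One common approach, assumed here purely for illustration, is to take the smallest `k` whose cumulative explained variance reaches a chosen threshold. The helper `smallest_k`, the toy variance ratios, and the thresholds below are all hypothetical; with Spark, the ratios would typically come from the `explainedVariance` attribute of a fitted `PCAModel`.

```python
from itertools import accumulate


def smallest_k(explained_variance, threshold):
    """Smallest number of components whose cumulative variance >= threshold."""
    for k, cumulative in enumerate(accumulate(explained_variance), start=1):
        if cumulative >= threshold:
            return k
    # Threshold never reached: keep every component.
    return len(explained_variance)


# Toy explained-variance ratios, sorted in decreasing order and summing to 1.
toy_ratios = [0.5, 0.25, 0.125, 0.125]
k_for_75_percent = smallest_k(toy_ratios, 0.75)
k_for_80_percent = smallest_k(toy_ratios, 0.80)
print(k_for_75_percent, k_for_80_percent)
```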

In [29]:
# pipeline
pipeline = Pipeline(stages=[vector_assembler, pca])

Testing the pipeline:

In [30]:
# fit 
pipeline_model = pipeline.fit(features_df)

In [31]:
# transform
df_pca = pipeline_model.transform(features_df)

In [32]:
df_pca.printSchema()

root
 |-- path: string (nullable = true)
 |-- label: string (nullable = true)
 |-- features: array (nullable = true)
 |    |-- element: float (containsNull = true)
 |-- features_vec: vector (nullable = true)
 |-- features_vec_final: vector (nullable = true)
 |-- pca_features: vector (nullable = true)

In [33]:
df_pca.show(5)

+--------------------+--------------+--------------------+--------------------+--------------------+--------------------+
|                path|         label|            features|        features_vec|  features_vec_final|        pca_features|
+--------------------+--------------+--------------------+--------------------+--------------------+--------------------+
|s3://jlegal-bucke...|Apple Braeburn|[0.639037, 0.0364...|[0.63903701305389...|(1280,[0,1,5,6,7,...|[-2.8049857102105...|
|s3://jlegal-bucke...|Apple Braeburn|[0.4867081, 0.0, ...|[0.48670810461044...|(1280,[0,5,6,8,9,...|[-4.2871749218263...|
|s3://jlegal-bucke...|Apple Braeburn|[0.91799986, 0.25...|[0.91799986362457...|(1280,[0,1,5,6,7,...|[-3.4103309966119...|
|s3://jlegal-bucke...|Apple Braeburn|[0.64750093, 0.21...|[0.64750093221664...|(1280,[0,1,5,6,7,...|[-3.6720595651866...|
|s3://jlegal-bucke...|Apple Braeburn|[1.6727263, 0.092...|[1.67272627353668...|(1280,[0,1,4,5,6,...|[-3.4438274100437...|
+--------------------+--

Saving as "parquet" files:

In [34]:
df_pca.write.mode("overwrite").parquet(PATH_Result)

The fitted pipeline can be saved and reloaded later:

In [35]:
# save pipeline
pipeline_model.save(PATH +"/model/model_pca")

Reload it with:

In [None]:
from pyspark.ml import PipelineModel
pipeline_model_loaded = PipelineModel.load(PATH + "/model/model_pca")