# <a style='color:blue'>Run a processing job in a Big Data environment on the Cloud</a>

- <a href="#C11">Starting Spark</a>
- <a href="#C12">Package installation</a>
- <a href="#C13">Importing libraries</a>
- <a href="#C14">Defining paths</a>
- <a href="#C15">Feature extraction with the MobileNetV2 model</a>
    - <a href="#C151">Loading the data - conversion to binary</a>
    - <a href="#C152">Preparing the model</a>
    - <a href="#C153">Defining the image loading process and applying featurization with a Pandas UDF</a>
    - <a href="#C154">Running the feature extraction</a>
- <a href="#C16">Loading the saved data and validating the results</a>
- <a href="#C17">PCA</a>
    - <a href="#C171">Determining the number of components needed to explain 80% of the variance</a>
    - <a href="#C172">Applying PCA and saving the data to S3</a>

# <a name="C11" a style='color:blue'>Starting Spark</a>

In [1]:
# Before starting, make sure the pyspark kernel is selected
# Running this cell starts the Spark application

<u>Information about the current session and links to the Spark UI</u>:

In [2]:
%%info

ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1626050279029_0001,pyspark,idle,Link,Link,✔


# <a name="C12" a style='color:blue'>Package installation</a>

The required packages were installed by the bootstrap step when the server was instantiated.
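
Before importing anything, it can help to confirm that the bootstrap step really made the packages visible to the Python environment running the notebook. The following is a minimal sketch using only the standard library. The helper name `missing_packages` and the list of package names are illustrative assumptions based on the imports used later, not taken from the bootstrap script, and the check only covers the node where the cell runs.

```python
import importlib.util


def missing_packages(names):
    """Return the subset of `names` that the import system cannot find."""
    return [name for name in names if importlib.util.find_spec(name) is None]


# Hypothetical list, based on the imports used later in this notebook.
required = ["pandas", "PIL", "numpy", "tensorflow", "pyarrow", "matplotlib"]
print("Missing packages:", missing_packages(required))
```

An empty list suggests the imports in the next section should succeed on this node.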

# <a name="C13" a style='color:blue'>Importing libraries</a>

In [3]:
import pandas as pd
from PIL import Image
import numpy as np
import io
import os
import tensorflow as tf
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras import Model
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split
from pyspark.sql import SparkSession
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
import matplotlib.pyplot as plt

# <a name="C14" a style='color:blue'>Defining paths</a>

We access our data on S3 directly, as if it were stored locally.

In [4]:
PATH = 's3://p9-data-vincent'
PATH_Data = PATH+'/Test'
PATH_Result = PATH+'/Results_Test'
PATH_Result_ACP = PATH+'/Results_Test_ACP'
print(f'PATH:        {PATH}\n'
      f'PATH_Data:   {PATH_Data}\n'
      f'PATH_Result: {PATH_Result}\n'
      f'PATH_Result_ACP: {PATH_Result_ACP}')

PATH:        s3://p8-data
PATH_Data:   s3://p8-data/Test
PATH_Result: s3://p8-data/Results

# <a name="C15" a style='color:blue'>Feature extraction with the MobileNetV2 model</a>

# <a name="C151" a style='color:blue'>Loading the data - conversion to binary</a>


In [5]:
images = spark.read.format("binaryFile") \
  .option("pathGlobFilter", "*.jpg") \
  .option("recursiveFileLookup", "true") \
  .load(PATH_Data)
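
To make the two reader options more concrete, here is a rough local analogue in plain Python. It assumes that `pathGlobFilter` is matched against the file name only and that `recursiveFileLookup` makes the reader descend into every subfolder. The function `select_files` and the sample listing are hypothetical and only mimic that selection on a list of strings.

```python
from fnmatch import fnmatchcase
from posixpath import basename


def select_files(paths, pattern="*.jpg"):
    """Keep the paths whose file name (last path component) matches the glob pattern."""
    return [p for p in paths if fnmatchcase(basename(p), pattern)]


# Hypothetical listing of a bucket laid out as <root>/<label>/<file>.
listing = [
    "s3://bucket/Test/Watermelon/r_106_100.jpg",
    "s3://bucket/Test/Watermelon/notes.txt",
    "s3://bucket/Test/Pineapple Mini/128_100.jpg",
    "s3://bucket/Test/README.md",
]
print(select_files(listing))
```

Only the two `.jpg` entries are kept, whatever their depth in the folder tree.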

In [6]:
images.show(5)

+--------------------+-------------------+------+--------------------+
|                path|   modificationTime|length|             content|
+--------------------+-------------------+------+--------------------+
|s3://p8-data/Test...|2021-07-03 09:00:08|  7353|[FF D8 FF E0 00 1...|
|s3://p8-data/Test...|2021-07-03 09:00:08|  7350|[FF D8 FF E0 00 1...|
|s3://p8-data/Test...|2021-07-03 09:00:08|  7349|[FF D8 FF E0 00 1...|
|s3://p8-data/Test...|2021-07-03 09:00:08|  7348|[FF D8 FF E0 00 1...|
|s3://p8-data/Test...|2021-07-03 09:00:09|  7328|[FF D8 FF E0 00 1...|
+--------------------+-------------------+------+--------------------+
only showing top 5 rows

<u>I add a column containing each image's **label** (the name of its parent folder) <br />
    and display it alongside the image **path**</u>:

In [7]:
images = images.withColumn('label', element_at(split(images['path'], '/'),-2))
# printSchema() and show() print directly and return None,
# so they are not wrapped in print()
images.printSchema()
images.select('path','label').show(5,False)

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)

+------------------------------------------+----------+
|path                                      |label     |
+------------------------------------------+----------+
|s3://p8-data/Test/Watermelon/r_106_100.jpg|Watermelon|
|s3://p8-data/Test/Watermelon/r_109_100.jpg|Watermelon|
|s3://p8-data/Test/Watermelon/r_108_100.jpg|Watermelon|
|s3://p8-data/Test/Watermelon/r_107_100.jpg|Watermelon|
|s3://p8-data/Test/Watermelon/r_95_100.jpg |Watermelon|
+------------------------------------------+----------+
only showing top 5 rows
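
The label comes from the second-to-last element of the path once it is split on `/`. The same logic can be sketched in plain Python, which makes it easy to check on a single path. The helper name `label_from_path` is an assumption introduced for illustration.

```python
def label_from_path(path):
    """Return the name of the folder that directly contains the file."""
    # Mirrors element_at(split(path, '/'), -2): the second-to-last '/'-separated piece.
    return path.split("/")[-2]


print(label_from_path("s3://p8-data/Test/Watermelon/r_106_100.jpg"))
```

Folder names containing spaces, such as `Pineapple Mini`, are returned unchanged because the split is done on `/` only.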

# <a name="C152" a style='color:blue'>Preparing the model</a>

Transfer learning approach:

- Pre-trained MobileNetV2 model (fast to run, well suited to processing large volumes of data, low-dimensional output feature vector)
- No retraining
- Output taken from the second-to-last layer (a 1280-dimensional feature vector)

In [8]:
model = MobileNetV2(weights='imagenet',
                    include_top=True,
                    input_shape=(224, 224, 3))

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/mobilenet_v2/mobilenet_v2_weights_tf_dim_ordering_tf_kernels_1.0_224.h5

In [9]:
new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)

In [10]:
brodcast_weights = sc.broadcast(new_model.get_weights())

In [11]:
new_model.summary()

Model: "model"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            [(None, 224, 224, 3) 0                                            
__________________________________________________________________________________________________
Conv1 (Conv2D)                  (None, 112, 112, 32) 864         input_1[0][0]                    
__________________________________________________________________________________________________
bn_Conv1 (BatchNormalization)   (None, 112, 112, 32) 128         Conv1[0][0]                      
__________________________________________________________________________________________________
Conv1_relu (ReLU)               (None, 112, 112, 32) 0           bn_Conv1[0][0]                   
______________________________________________________________________________________________

In [12]:
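def model_fn():
    """
    Returns a MobileNetV2 model truncated at its second-to-last layer
    (classification layer removed), loaded with the broadcast pretrained weights.
    """
    model = MobileNetV2(weights='imagenet',
                        include_top=True,
                        input_shape=(224, 224, 3))
    # Freeze every layer: the model is used for inference only
    for layer in model.layers:
        layer.trainable = False
    # Cut the network just before the final classification layer
    new_model = Model(inputs=model.input,
                      outputs=model.layers[-2].output)
    # Load the weights broadcast by the driver
    new_model.set_weights(brodcast_weights.value)
    return new_model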

# <a name="C153" a style='color:blue'>Defining the image loading process and applying featurization with a Pandas UDF</a>

In [13]:
def preprocess(content):
    """
    Preprocesses raw image bytes for prediction.
    """
    img = Image.open(io.BytesIO(content)).resize([224, 224])
    arr = img_to_array(img)
    return preprocess_input(arr)

def featurize_series(model, content_series):
    """
    Featurize a pd.Series of raw images using the input model.
    :return: a pd.Series of image features
    """
    batch = np.stack(content_series.map(preprocess))
    preds = model.predict(batch)
    # For some layers, output features will be multi-dimensional tensors.
    # We flatten the feature tensors to vectors for easier storage in Spark DataFrames.
    output = [p.flatten() for p in preds]
    return pd.Series(output)

@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    '''
    This method is a Scalar Iterator pandas UDF wrapping our featurization function.
    The decorator specifies that this returns a Spark DataFrame column of type ArrayType(FloatType).

    :param content_series_iter: This argument is an iterator over batches of data, where each batch
                              is a pandas Series of image data.
    '''
    # With Scalar Iterator pandas UDFs, we can load the model once and then re-use it
    # for multiple data batches.  This amortizes the overhead of loading big models.
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)
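
The point of the Scalar Iterator form is that the expensive setup runs once per iterator and every batch then reuses it. The pattern can be sketched without Spark or TensorFlow using an ordinary generator. Everything here (`featurize_batches`, `load_dummy_model`, the call counter) is a hypothetical stand-in, not part of the notebook's pipeline.

```python
def featurize_batches(batches, load_model):
    """Yield one list of outputs per batch, loading the model only once."""
    model = load_model()  # paid once per iterator, not once per batch
    for batch in batches:
        yield [model(item) for item in batch]


calls = {"n": 0}  # counts how many times the "model" is loaded


def load_dummy_model():
    """Stand-in for an expensive model load."""
    calls["n"] += 1
    return lambda x: x * 2


results = list(featurize_batches([[1, 2], [3], [4, 5, 6]], load_dummy_model))
print(results, "model loads:", calls["n"])
```

Three batches are processed, yet the loader is called a single time, which is the saving the comment in `featurize_udf` describes.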


# <a name="C154" a style='color:blue'>Running the feature extraction</a>

In [14]:
# spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1024")

In [15]:
features_df = images.repartition(24).select(col("path"),
                                            col("label"),
                                            featurize_udf("content").alias("features")
                                           )

In [16]:
print(PATH_Result)

s3://p8-data/Results

In [17]:
features_df.write.mode("overwrite").parquet(PATH_Result)

# <a name="C16" a style='color:blue'>Loading the saved data and validating the results</a>

In [18]:
df = pd.read_parquet(PATH_Result, engine='pyarrow')

In [19]:
df.head()

                                           path  ...                                           features
0    s3://p8-data/Test/Watermelon/r_174_100.jpg  ...  [0.0059991637, 0.44703647, 0.0, 0.0, 3.3713572...
1  s3://p8-data/Test/Pineapple Mini/128_100.jpg  ...  [0.0146466885, 4.080593, 0.055877004, 0.0, 0.0...
2  s3://p8-data/Test/Pineapple Mini/137_100.jpg  ...  [0.0, 4.9659867, 0.0, 0.0, 0.0, 0.0, 0.5144821...
3      s3://p8-data/Test/Watermelon/275_100.jpg  ...  [0.22511952, 0.07235509, 0.0, 0.0, 1.690149, 0...
4      s3://p8-data/Test/Watermelon/271_100.jpg  ...  [0.3286234, 0.18830013, 0.0, 0.0, 1.9123534, 0...

[5 rows x 3 columns]

In [20]:
df.loc[0,'features'].shape

(1280,)

In [21]:
df.shape

(22688, 3)
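
The shape check above looks at row 0 only. A slightly broader validation would confirm that every feature vector has the expected length. The sketch below works on any sequence of vectors; the helper name `check_feature_lengths` and the toy data are assumptions made for illustration.

```python
def check_feature_lengths(vectors, expected_dim=1280):
    """Return the indices of the vectors whose length differs from expected_dim."""
    return [i for i, v in enumerate(vectors) if len(v) != expected_dim]


# Toy data standing in for df['features']: two valid vectors and one truncated one.
toy = [[0.0] * 1280, [0.1] * 1280, [0.2] * 10]
print(check_feature_lengths(toy))
```

On the real data, `check_feature_lengths(df['features'])` would be expected to return an empty list if the extraction ran cleanly.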

# <a name="C17" a style='color:blue'>PCA</a>

# <a name="C171" a style='color:blue'>Determining the number of components needed to explain 80% of the variance</a>

In [None]:
df['features'] = df['features'].apply(lambda x: Vectors.dense(x))

In [None]:
df

In [None]:
# Convert the Pandas DataFrame to a Spark DataFrame
df_spark = spark.createDataFrame(df)

In [None]:
pca = PCA(k=100, inputCol="features", outputCol="pca_features")
#pca = PCA(inputCol="features", outputCol="pca_features")

In [None]:
pca_model = pca.fit(df_spark)

In [None]:
df_pca = pca_model.transform(df_spark)

In [None]:
explained_variance = pca_model.explainedVariance

In [None]:
# Compute the cumulative explained variance
cumulative_variance = [sum(explained_variance[:i+1]) for i in range(len(explained_variance))]

In [None]:
# Plot the curve
plt.plot(range(1, len(explained_variance) + 1), cumulative_variance, marker='o')
plt.xlabel('Number of principal components (k)')
plt.ylabel('Cumulative explained variance')
plt.title('Cumulative explained variance as a function of k')
plt.grid(True)
plt.show()

In [None]:
%matplot plt

In [None]:
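# Find the first value of k at which the cumulative variance reaches 0.8
target_variance = 0.8
for k, cum_var in enumerate(cumulative_variance):
    if cum_var >= target_variance:
        print(f"Cumulative variance of {target_variance} reached at k = {k+1}")
        break
else:
    # The loop ended without a break: the target was never reached
    print(f"Cumulative variance of {target_variance} not reached with {len(cumulative_variance)} components")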
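
The selection rule can be checked by hand on a small example. The sketch below uses made-up explained-variance ratios rather than the values from `pca_model`, and the function name `components_for_variance` is an assumption introduced for illustration.

```python
from itertools import accumulate


def components_for_variance(ratios, target=0.8):
    """Return the smallest k whose cumulative explained variance reaches target, or None."""
    for k, cum_var in enumerate(accumulate(ratios), start=1):
        if cum_var >= target:
            return k
    return None


# Made-up explained-variance ratios, for illustration only.
toy_ratios = [0.5, 0.2, 0.15, 0.1, 0.05]
print(components_for_variance(toy_ratios))
```

With these toy values the running totals are 0.5, 0.7 and 0.85, so three components are needed. Returning `None` when the target is never reached makes that case explicit instead of silently keeping the last index.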

# <a name="C172" a style='color:blue'>Applying PCA and saving the data to S3</a>

In [None]:
pca = PCA(k=k+1, inputCol="features", outputCol="pca_features")
pca_model = pca.fit(df_spark)
df_pca = pca_model.transform(df_spark)

In [None]:
# Display the Spark DataFrame after PCA
df_pca.show()

In [None]:
print(PATH_Result_ACP)

In [None]:
# Save the data to S3 in Parquet format
df_pca.write.mode("overwrite").parquet(PATH_Result_ACP)