# 1. Deploying the solution to the cloud

## 1.1 Starting the Spark session

In [22]:
# Running this cell starts the Spark application


<u>Displaying information about the current session and links to the Spark UI</u>:

In [23]:
%%info

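| ID | YARN Application ID | Kind | State | Spark UI | Driver log | Current session? |
|----|--------------------------------|---------|-------|----------|------------|------------------|
| 8 | application_1677685681226_0009 | pyspark | idle | Link | Link | ✔ |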


## 1.2 Installation des packages

The required packages were installed by the **bootstrap** step when the server was instantiated.

## 1.3 Importing the libraries

In [24]:
import pandas as pd
import numpy as np
import io
import os
import tensorflow as tf
from PIL import Image
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras import Model
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split
from time import time

from pyspark.ml.feature import StandardScaler, PCA
# https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.PCA.html
# https://spark.apache.org/docs/latest/ml-features.html#pca
from pyspark.ml.functions import array_to_vector, vector_to_array


## 1.4 Defining the paths for loading the images and saving the results

We access our **data on S3** directly, as if it were **stored locally**.

In [25]:
PATH = 's3://oc-p8-data/data'
print("PATH :", PATH)

PATH_Data = PATH + '/Test'
PATH_Result = PATH + '/Results'

print('PATH:        ' +
      PATH + '\nPATH_Data:   ' +
      PATH_Data + '\nPATH_Result: ' + PATH_Result)


PATH : s3://oc-p8-data/data
PATH:        s3://oc-p8-data/data
PATH_Data:   s3://oc-p8-data/data/Test
PATH_Result: s3://oc-p8-data/data/Results

In [26]:
t0 = time()


## 1.5 Data processing

### 1.5.1 Loading the data

In [27]:
images = spark.read.format("binaryFile").option("pathGlobFilter", "*.jpg").option("recursiveFileLookup", "true").load(PATH_Data)


In [28]:
images.show(5)


+--------------------+-------------------+------+--------------------+
|                path|   modificationTime|length|             content|
+--------------------+-------------------+------+--------------------+
|s3://oc-p8-data/d...|2023-02-28 16:01:17|  7353|[FF D8 FF E0 00 1...|
|s3://oc-p8-data/d...|2023-02-28 16:01:18|  7350|[FF D8 FF E0 00 1...|
|s3://oc-p8-data/d...|2023-02-28 16:01:17|  7349|[FF D8 FF E0 00 1...|
|s3://oc-p8-data/d...|2023-02-28 16:01:18|  7348|[FF D8 FF E0 00 1...|
|s3://oc-p8-data/d...|2023-02-28 16:01:18|  7328|[FF D8 FF E0 00 1...|
+--------------------+-------------------+------+--------------------+
only showing top 5 rows

<u>I add a column containing the **label** of each image, taken from the parent folder in its **path**, <br />
    and display only these two columns</u>:

In [29]:
images = images.withColumn('label', element_at(split(images['path'], '/'), -2))
# printSchema() and show() print their output themselves and return None,
# so they are not wrapped in print()
images.printSchema()
images.select('path', 'label').show(5, False)

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)

+--------------------------------------------------+----------+
|path                                              |label     |
+--------------------------------------------------+----------+
|s3://oc-p8-data/data/Test/Watermelon/r_106_100.jpg|Watermelon|
|s3://oc-p8-data/data/Test/Watermelon/r_109_100.jpg|Watermelon|
|s3://oc-p8-data/data/Test/Watermelon/r_108_100.jpg|Watermelon|
|s3://oc-p8-data/data/Test/Watermelon/r_107_100.jpg|Watermelon|
|s3://oc-p8-data/data/Test/Watermelon/r_95_100.jpg |Watermelon|
+--------------------------------------------------+----------+
only showing top 5 rows
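
The label is simply the name of the folder that contains the image. As a minimal plain-Python sketch of what `element_at(split(path, '/'), -2)` computes for a single row (the helper name `label_from_path` is hypothetical and not part of the notebook):

```python
def label_from_path(path: str) -> str:
    """Return the name of the folder that directly contains the file.

    Hypothetical helper mirroring element_at(split(path, '/'), -2):
    split the path on '/' and take the second-to-last element.
    """
    parts = path.split('/')
    if len(parts) < 2:
        raise ValueError(f"path has no parent folder: {path!r}")
    return parts[-2]


# Path taken from the output shown above
print(label_from_path('s3://oc-p8-data/data/Test/Watermelon/r_106_100.jpg'))
```

This only works because the dataset stores each class in its own folder; a different layout would need a different rule.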

### 1.5.2 Preparing the model

In [30]:
model = MobileNetV2(weights='imagenet',
                    include_top=True,
                    input_shape=(224, 224, 3))


In [31]:
new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)


In [32]:
brodcast_weights = sc.broadcast(new_model.get_weights())


In [33]:
new_model.summary()


Model: "model_1"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_2 (InputLayer)            [(None, 224, 224, 3) 0                                            
__________________________________________________________________________________________________
Conv1 (Conv2D)                  (None, 112, 112, 32) 864         input_2[0][0]                    
__________________________________________________________________________________________________
bn_Conv1 (BatchNormalization)   (None, 112, 112, 32) 128         Conv1[0][0]                      
__________________________________________________________________________________________________
Conv1_relu (ReLU)               (None, 112, 112, 32) 0           bn_Conv1[0][0]                   
____________________________________________________________________________________________

In [34]:
def model_fn():
    """
    Returns a MobileNetV2 model with top layer removed 
    and broadcasted pretrained weights.
    """
    model = MobileNetV2(weights='imagenet',
                        include_top=True,
                        input_shape=(224, 224, 3))
    for layer in model.layers:
        layer.trainable = False
    new_model = Model(inputs=model.input,
                      outputs=model.layers[-2].output)
    new_model.set_weights(brodcast_weights.value)
    return new_model


### 1.5.3 Defining the image loading process <br/> and applying featurization with a pandas UDF

In [35]:
def preprocess(content):
    """
    Preprocesses raw image bytes for prediction.
    """
    img = Image.open(io.BytesIO(content)).resize([224, 224])
    arr = img_to_array(img)
    return preprocess_input(arr)


def featurize_series(model, content_series):
    """
    Featurize a pd.Series of raw images using the input model.
    :return: a pd.Series of image features
    """
    batch = np.stack(content_series.map(preprocess))
    preds = model.predict(batch)
    # For some layers, output features will be multi-dimensional tensors.
    # We flatten the feature tensors to vectors for easier storage in Spark DataFrames.
    output = [p.flatten() for p in preds]
    return pd.Series(output)


@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    '''
    This method is a Scalar Iterator pandas UDF wrapping our featurization function.
    The decorator specifies that this returns a Spark DataFrame column of type ArrayType(FloatType).

    :param content_series_iter: This argument is an iterator over batches of data, where each batch
                              is a pandas Series of image data.
    '''
    # With Scalar Iterator pandas UDFs, we can load the model once and then re-use it
    # for multiple data batches.  This amortizes the overhead of loading big models.
    model = model_fn()
    #model.set_weights(brodcast_weights.value)
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)
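
The benefit of the Scalar Iterator form can be illustrated without Spark or TensorFlow. The sketch below is only a stand-in: `load_model` and `featurize_batches` are hypothetical names, and the fake "model" merely returns the byte length of each item. It shows that setup code placed before the loop runs once per iterator rather than once per batch.

```python
from typing import Callable, Iterable, Iterator, List

load_count = 0  # counts how many times the (fake) model is loaded


def load_model() -> Callable[[List[bytes]], List[int]]:
    """Stand-in for an expensive model load (hypothetical, no TensorFlow)."""
    global load_count
    load_count += 1
    # The fake "model" maps each item of a batch to its length in bytes
    return lambda batch: [len(item) for item in batch]


def featurize_batches(batches: Iterable[List[bytes]]) -> Iterator[List[int]]:
    """Generator with the same shape as a Scalar Iterator pandas UDF."""
    model = load_model()  # runs once, when the first batch is requested
    for batch in batches:
        yield model(batch)  # the same model object is reused for every batch


batches = [[b'ab', b'cde'], [b'f'], [b'ghij', b'']]
results = list(featurize_batches(batches))
print(results)     # [[2, 3], [1], [4, 0]]
print(load_count)  # 1
```

In the notebook, the iterator yields pandas Series of image bytes and the loaded object is the truncated MobileNetV2 returned by `model_fn`.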


In [None]:
model = model_fn()
brodcast_weights = sc.broadcast(model.get_weights())

### 1.5.4 Running the feature extraction

In [36]:
# spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1024")


In [37]:
print(PATH_Result)


s3://oc-p8-data/data/Results

In [39]:
print("generating features")
features_df = images.repartition(24).select(col("path"),
                                            col("label"),
                                            featurize_udf("content").alias("features")
                                            )


generating features

## 1.6 PCA

In [38]:
def scale_features(df_features):
    """
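    Standardize the features of the DataFrame with a StandardScaler.

    :param df_features: (pyspark.sql.dataframe.DataFrame) must contain an array column named 'features'
    :return: the input DataFrame with 'features' converted to a vector and an added 'scaled_features' column
    :rtype: pyspark.sql.dataframe.DataFrame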
    """

    # transform array to vector
    df_features = df_features.withColumn('features', array_to_vector('features'))
    
    print("Scaling features")
    # scale data
    scaler = StandardScaler(
        inputCol = 'features',
        outputCol = 'scaled_features',
        withMean = True,
        withStd = True
    ).fit(df_features)
    df_features_scaled = scaler.transform(df_features)

    return df_features_scaled

def get_pca(df_features_scaled, n_components=40):
    """
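    Fit a PCA model on the scaled features.

    :param df_features_scaled: (pyspark.sql.dataframe.DataFrame) must contain a 'scaled_features' vector column
    :param n_components: (int) number of principal components to keep
    :return: the fitted PCA model
    :rtype: pyspark.ml.feature.PCAModel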
    """
    print("Fitting PCA")
    pca = PCA(
        k = n_components,  # number of components kept; the MobileNetV2 feature vectors have 1280 dimensions
        inputCol = 'scaled_features',
        outputCol = 'pca_features'
    ).fit(df_features_scaled)

    return pca

def get_pca_features(pca_model, df_features_scaled):
    """
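    Project the scaled features onto the principal components.

    :param pca_model: (pyspark.ml.feature.PCAModel) fitted PCA model
    :param df_features_scaled: (pyspark.sql.dataframe.DataFrame) must contain a 'scaled_features' vector column
    :return: DataFrame with 'features' and 'pca_features' as array columns
    :rtype: pyspark.sql.dataframe.DataFrame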
    """
    df_features_pca = pca_model.transform(df_features_scaled)

    # drop scaled data
    # transform vector to array for saving
    df_features_pca = (df_features_pca
                       .drop("scaled_features")
                       .withColumn('features', vector_to_array('features'))
                       .withColumn('pca_features', vector_to_array('pca_features')))

    return df_features_pca


def main_pca(df_features):
    """

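    Scale the features, fit a PCA model and return the projected features.

    :param df_features: (pyspark.sql.dataframe.DataFrame) must contain an array column named 'features'
    :return: DataFrame with an added 'pca_features' column
    :rtype: pyspark.sql.dataframe.DataFrame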
    """
    # Scaling features
    print("1-Scaling features")
    df_features_scaled = scale_features(df_features)

    # Creating pca model fitted on scaled features
    print("2-Creating pca model fitted on scaled features")
    pca = get_pca(df_features_scaled)

    # Getting pca features
    print("3-Getting pca features")
    df_features_pca = get_pca_features(pca, df_features_scaled)

    return df_features_pca
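
As a rough illustration of the scaling step, the sketch below standardizes each column of a tiny made-up dataset in plain Python. It assumes the behaviour documented for Spark's `StandardScaler` with `withMean=True` and `withStd=True`: centering on the column mean and dividing by the corrected sample standard deviation, with zero-variance columns mapped to 0.0. The function name `standardize_columns` and the sample values are hypothetical.

```python
from statistics import mean, stdev
from typing import List


def standardize_columns(rows: List[List[float]]) -> List[List[float]]:
    """Center each column on its mean and scale it to unit sample std."""
    columns = list(zip(*rows))            # transpose rows into columns
    means = [mean(c) for c in columns]
    stds = [stdev(c) for c in columns]    # sample std (divides by n - 1)
    return [
        # A constant column has std 0: emit 0.0 instead of dividing by zero
        [(v - m) / s if s > 0 else 0.0 for v, m, s in zip(row, means, stds)]
        for row in rows
    ]


# Made-up data: two features on very different scales, plus a constant one
rows = [[1.0, 10.0, 5.0],
        [2.0, 20.0, 5.0],
        [3.0, 30.0, 5.0]]
scaled = standardize_columns(rows)
for row in scaled:
    print(row)
```

After scaling, the first two features contribute equally to the PCA even though their raw ranges differ by a factor of ten.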



In [40]:
print("creating PCA")
features_df_pca = main_pca(features_df)


creating PCA
1-Scaling features
Scaling features
2-Creating pca model fitted on scaled features
Fitting PCA
3-Getting pca features
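
The notebook fixes the number of components at 40 without showing how much variance they retain. One possible check, sketched below under assumptions, is to look at the cumulative explained variance. A fitted `PCAModel` exposes the per-component proportions through its `explainedVariance` attribute; here the helper `components_for_variance` is a hypothetical name and the ratios are made-up values, not results from this run.

```python
from itertools import accumulate
from typing import Optional, Sequence


def components_for_variance(ratios: Sequence[float],
                            threshold: float = 0.95) -> Optional[int]:
    """Return the smallest k whose cumulative explained variance reaches
    the threshold, or None if the fitted components never reach it."""
    if not 0 < threshold <= 1:
        raise ValueError("threshold must be in (0, 1]")
    for k, cumulative in enumerate(accumulate(ratios), start=1):
        if cumulative >= threshold:
            return k
    return None


# Made-up proportions; with Spark they could come from
# pca.explainedVariance.toArray() on the model returned by get_pca
ratios = [0.5, 0.25, 0.125, 0.0625]
print(components_for_variance(ratios, 0.8))   # 3
print(components_for_variance(ratios, 0.99))  # None
```

A `None` result means the model was fitted with too few components to reach the target, so `n_components` would have to be increased before drawing a conclusion.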

<u>Saving the processed data in "**parquet**" format</u>:

In [41]:
features_df.show(2)
features_df_pca.show(2)

features_df_pca.write.mode("overwrite").parquet(PATH_Result)


+--------------------+--------------+--------------------+
|                path|         label|            features|
+--------------------+--------------+--------------------+
|s3://oc-p8-data/d...|    Watermelon|[0.8537659, 0.450...|
|s3://oc-p8-data/d...|Pineapple Mini|[0.0, 4.498071, 0...|
+--------------------+--------------+--------------------+
only showing top 2 rows

+--------------------+--------------+--------------------+--------------------+
|                path|         label|            features|        pca_features|
+--------------------+--------------+--------------------+--------------------+
|s3://oc-p8-data/d...|    Watermelon|[0.84154003858566...|[-13.578741245307...|
|s3://oc-p8-data/d...|Pineapple Mini|[0.0, 4.333771705...|[-5.8529876532093...|
+--------------------+--------------+--------------------+--------------------+
only showing top 2 rows

In [42]:
print("Computing time : {} seconds".format(time() - t0))


Computing time : 2216.4246039390564 seconds