# Notebook for the AWS Fruits Project, Open Classroom 8
# Load the saved model to transform the features with PCA

### To do:
- Make sure you are using the PySpark kernel

## Initialization

In [1]:
# Running this cell starts the Spark application

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
4,application_1691149654471_0005,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


<u>Display information about the current session and links to the Spark UI</u>:

In [16]:
%%info

ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
3,application_1691149654471_0004,pyspark,idle,Link,Link,
4,application_1691149654471_0005,pyspark,idle,Link,Link,✔


### Installing the packages

The required packages were installed through the **bootstrap** step when the server was instantiated:
- sudo python3 -m pip install -U setuptools
- sudo python3 -m pip install -U pip 
- sudo python3 -m pip install wheel
- sudo python3 -m pip install pillow 
- sudo python3 -m pip install pandas==1.2.5 
- sudo python3 -m pip install pyarrow 
- sudo python3 -m pip install boto3 
- sudo python3 -m pip install s3fs
- sudo python3 -m pip install fsspec


### Importing the libraries

In [17]:
import pandas as pd
import numpy as np
import io
import os
import tensorflow as tf
from PIL import Image
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras import Model
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split

### Defining the PATHs for loading the images and saving the results

We access our **data on S3** directly, as if it were **stored locally**.

In [18]:
PATH = 's3://oc8-data-naomigua'
PATH_Data = PATH+'/test_15'
PATH_Result = PATH+'/Result_15'
print('PATH:        '+\
      PATH+'\nPATH_Data:   '+\
      PATH_Data+'\nPATH_Result: '+PATH_Result)

PATH:        s3://oc8-data-naomigua
PATH_Data:   s3://oc8-data-naomigua/test_15
PATH_Result: s3://oc8-data-naomigua/Result_15

### Data processing

#### Loading the data

In [19]:
images = spark.read.format("binaryFile") \
  .option("pathGlobFilter", "*.jpg") \
  .option("recursiveFileLookup", "true") \
  .load(PATH_Data)

In [20]:
images.show(5)

+--------------------+-------------------+------+--------------------+
|                path|   modificationTime|length|             content|
+--------------------+-------------------+------+--------------------+
|s3://oc8-data-nao...|2023-08-04 12:06:56|  4742|[FF D8 FF E0 00 1...|
|s3://oc8-data-nao...|2023-08-04 12:06:56|  4490|[FF D8 FF E0 00 1...|
|s3://oc8-data-nao...|2023-08-04 12:06:56|  4405|[FF D8 FF E0 00 1...|
|s3://oc8-data-nao...|2023-08-04 12:06:56|  4352|[FF D8 FF E0 00 1...|
|s3://oc8-data-nao...|2023-08-04 12:06:56|  4156|[FF D8 FF E0 00 1...|
+--------------------+-------------------+------+--------------------+
only showing top 5 rows

<u>I add a column containing the **label** of each image, taken from the parent folder name in its **path**, and display only the path and the label</u>:

In [21]:
images = images.withColumn('label', element_at(split(images['path'], '/'),-2))
# printSchema() and show() print directly and return None, so they are not wrapped in print()
images.printSchema()
images.select('path','label').show(5,False)

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)

+---------------------------------------------------------+-------------+
|path                                                     |label        |
+---------------------------------------------------------+-------------+
|s3://oc8-data-naomigua/test_15/Blueberry/r_200_100.jpg   |Blueberry    |
|s3://oc8-data-naomigua/test_15/Blueberry/r_81_100.jpg    |Blueberry    |
|s3://oc8-data-naomigua/test_15/Tomato Maroon/30_100.jpg  |Tomato Maroon|
|s3://oc8-data-naomigua/test_15/Tomato Maroon/230_100.jpg |Tomato Maroon|
|s3://oc8-data-naomigua/test_15/Tomato Maroon/r_34_100.jpg|Tomato Maroon|
+---------------------------------------------------------+-------------+
only showing top 5 rows
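
The label extraction above relies on `element_at(split(path, '/'), -2)`, which takes the second-to-last segment of the path, i.e. the folder that contains the image. A minimal plain-Python sketch of the same logic, without Spark (the helper name `label_from_path` is hypothetical and only serves as an illustration):

```python
def label_from_path(path: str) -> str:
    """Return the second-to-last '/'-separated segment of the path (the parent folder)."""
    parts = path.split('/')
    return parts[-2]


# Path taken from the output shown above
example_path = 's3://oc8-data-naomigua/test_15/Tomato Maroon/30_100.jpg'
print(label_from_path(example_path))  # Tomato Maroon
```

This only works as long as every image sits directly inside a folder named after its class, which is the layout shown in the output above.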

#### Preparing the model

In [22]:
images.count()

10

In [8]:
model = MobileNetV2(weights='imagenet',
                    include_top=True,
                    input_shape=(224, 224, 3))

In [9]:
new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)

In [10]:
brodcast_weights = sc.broadcast(new_model.get_weights())

In [11]:
new_model.summary()

Model: "model"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            [(None, 224, 224, 3) 0                                            
__________________________________________________________________________________________________
Conv1 (Conv2D)                  (None, 112, 112, 32) 864         input_1[0][0]                    
__________________________________________________________________________________________________
bn_Conv1 (BatchNormalization)   (None, 112, 112, 32) 128         Conv1[0][0]                      
__________________________________________________________________________________________________
Conv1_relu (ReLU)               (None, 112, 112, 32) 0           bn_Conv1[0][0]                   
______________________________________________________________________________________________

In [12]:
def model_fn():
    """
    Returns a MobileNetV2 model with top layer removed 
    and broadcasted pretrained weights.
    """
    model = MobileNetV2(weights='imagenet',
                        include_top=True,
                        input_shape=(224, 224, 3))
    for layer in model.layers:
        layer.trainable = False
    new_model = Model(inputs=model.input,
                      outputs=model.layers[-2].output)
    new_model.set_weights(brodcast_weights.value)
    return new_model

####  Defining the image loading process <br/> and applying featurization through a pandas UDF

In [13]:
def preprocess(content):
    """
    Preprocesses raw image bytes for prediction.
    """
    img = Image.open(io.BytesIO(content)).resize([224, 224])
    arr = img_to_array(img)
    return preprocess_input(arr)

def featurize_series(model, content_series):
    """
    Featurize a pd.Series of raw images using the input model.
    :return: a pd.Series of image features
    """
    # Named `batch` rather than `input` to avoid shadowing the Python builtin
    batch = np.stack(content_series.map(preprocess))
    preds = model.predict(batch)
    # For some layers, output features will be multi-dimensional tensors.
    # We flatten the feature tensors to vectors for easier storage in Spark DataFrames.
    output = [p.flatten() for p in preds]
    return pd.Series(output)

@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    '''
    This method is a Scalar Iterator pandas UDF wrapping our featurization function.
    The decorator specifies that this returns a Spark DataFrame column of type ArrayType(FloatType).

    :param content_series_iter: This argument is an iterator over batches of data, where each batch
                              is a pandas Series of image data.
    '''
    # With Scalar Iterator pandas UDFs, we can load the model once and then re-use it
    # for multiple data batches.  This amortizes the overhead of loading big models.
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)

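
The point of the Scalar Iterator UDF above is that the model is built once per iterator and then reused for every batch. A minimal sketch of that pattern in plain Python, without Spark or TensorFlow: `load_model` is a hypothetical stand-in for `model_fn()` that counts how often it is called, and the dummy "model" simply returns the byte length of each item.

```python
calls = {'load_model': 0}


def load_model():
    """Hypothetical stand-in for model_fn(): records each call and returns a dummy model."""
    calls['load_model'] += 1
    return lambda batch: [len(item) for item in batch]


def featurize_batches(batch_iter):
    """Same shape as featurize_udf: load once, then yield one result per batch."""
    model = load_model()  # executed a single time for the whole iterator
    for batch in batch_iter:
        yield model(batch)


batches = [[b'ab', b'cde'], [b'f'], [b'ghij', b'kl']]
results = list(featurize_batches(iter(batches)))
print(results)              # [[2, 3], [1], [4, 2]]
print(calls['load_model'])  # 1
```

Loading the model inside the loop instead would repeat the expensive step for each batch, which is what this design avoids.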



####  Running the feature extraction actions

In [14]:
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1024")

In [23]:
features_df = images.repartition(24).select(col("path"),
                                            col("label"),
                                            featurize_udf("content").alias("features")
                                           )

In [24]:
features_df.count()

10
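
The setting `spark.sql.execution.arrow.maxRecordsPerBatch` caps how many rows each pandas batch handed to the UDF may contain. The arithmetic can be sketched as follows; this is an illustration of the cap only, not Spark's internal implementation, and the helper name `batch_sizes` is hypothetical.

```python
def batch_sizes(n_rows: int, max_records_per_batch: int = 1024) -> list:
    """Sizes of consecutive batches when n_rows are cut into chunks of at most max_records_per_batch."""
    full, rest = divmod(n_rows, max_records_per_batch)
    return [max_records_per_batch] * full + ([rest] if rest else [])


print(batch_sizes(10))    # [10]
print(batch_sizes(2500))  # [1024, 1024, 452]
```

With only 10 images, every non-empty partition fits in a single batch, so the limit of 1024 only starts to matter on larger datasets.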

In [25]:
print(PATH_Result)

s3://oc8-data-naomigua/Result_15

### PCA 

In [26]:
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors

In [27]:
dense_vectors_for_PCA=features_df.rdd.map(lambda x: (x['label'], Vectors.dense(x['features']),x['path'])).toDF(['label', 'features','path'])

In [28]:
dense_vectors_for_PCA

DataFrame[label: string, features: vector, path: string]

#### Load the PCA model

In [29]:
num_total_features = len(features_df.select('features').first()[0])
print(num_total_features)

1280

In [32]:
from pyspark.ml.feature import PCAModel

pca_model = PCAModel.load(PATH+'/pca_model')


#### Transform with the loaded PCA model

In [33]:
df_pca = pca_model.transform(dense_vectors_for_PCA)
df_pca

DataFrame[label: string, features: vector, path: string, pcaFeatures: vector]
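
Conceptually, the transform projects each 1280-dimensional feature vector onto the principal components stored in the saved model. A minimal NumPy sketch of that projection, using random data and a random orthonormal matrix as a stand-in for the loaded components (the dimensions 1280 and 41 come from the notebook outputs; everything else is hypothetical, and whether Spark centers the data before projecting is not checked here):

```python
import numpy as np

rng = np.random.default_rng(0)
n_features, k = 1280, 41  # input and output dimensions seen in the notebook outputs

# Hypothetical components matrix with orthonormal columns (stand-in for the loaded model)
pc, _ = np.linalg.qr(rng.normal(size=(n_features, k)))

# 10 fake feature vectors standing in for the MobileNetV2 features
features = rng.normal(size=(10, n_features))

# Each row is projected onto the k components
pca_features = features @ pc
print(pca_features.shape)  # (10, 41)
```

Because the components are loaded rather than refitted, new images are projected into the same reduced space as the data the model was fitted on.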

In [34]:
df_pca.count()

10

In [35]:
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, FloatType

vector_to_array = udf(lambda vector: vector.toArray().tolist(), ArrayType(FloatType()))
df_pca_array = df_pca.withColumn("pcaFeaturesArray", vector_to_array("pcaFeatures"))
df_pca_array = df_pca_array.drop("features", "pcaFeatures")
df_pca_array = df_pca_array.withColumnRenamed("pcaFeaturesArray", "features")
final_df = df_pca_array.select("path", "label","features")
final_df


DataFrame[path: string, label: string, features: array<float>]
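
Declaring the UDF output as `ArrayType(FloatType())` means the PCA values are stored as 32-bit floats, so some precision is lost compared with the 64-bit values held in the vector. A small standard-library sketch of that rounding (the helper `to_float32` is hypothetical):

```python
import struct


def to_float32(x: float) -> float:
    """Round-trip a Python float (64-bit) through a 32-bit float representation."""
    return struct.unpack('f', struct.pack('f', x))[0]


exact = to_float32(0.5)    # 0.5 is exactly representable in 32 bits
rounded = to_float32(0.1)  # 0.1 is not, so the stored value differs slightly
print(exact == 0.5)
print(abs(rounded - 0.1))
```

If full precision matters downstream, `ArrayType(DoubleType())` would be the option to consider; for PCA features fed to a classifier, 32 bits is usually enough.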

In [36]:
#features_df.write.mode("overwrite").parquet(PATH_Result)
final_df.write.mode("overwrite").parquet(PATH_Result)

### Loading the saved data and validating the result

In [37]:
#df = pd.read_parquet(PATH_Result, engine='pyarrow')
df_spark = spark.read.parquet(PATH_Result)

In [38]:
df = df_spark.toPandas()

In [39]:
pd.set_option('display.max_colwidth', 40)
print(df.head())
print(df.shape)
print(df.columns)

                                      path  ...                                 features
0  s3://oc8-data-naomigua/test_15/Tomat...  ...  [11.947848320007324, 5.5733852386474...
1  s3://oc8-data-naomigua/test_15/Tomat...  ...  [13.343982696533203, 3.9873738288879...
2  s3://oc8-data-naomigua/test_15/Tomat...  ...  [12.134568214416504, 4.0225086212158...
3  s3://oc8-data-naomigua/test_15/Tomat...  ...  [12.909977912902832, 2.7520880699157...
4  s3://oc8-data-naomigua/test_15/Pear ...  ...  [-0.1685662716627121, 6.787441730499...

[5 rows x 3 columns]
(10, 3)
Index(['path', 'label', 'features'], dtype='object')

In [40]:
#df.loc[0,'features'].shape
length_of_features_list = len(df.loc[0, 'features'])
print(length_of_features_list)

41
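
The check above only looks at the first row. A slightly stronger validation would confirm that every row has the same number of components. A minimal sketch, assuming the features are available as a sequence of lists (for example `df['features']`); the helper name and the sample data are hypothetical:

```python
def rows_with_wrong_length(feature_rows, expected_len):
    """Return the indices of rows whose feature vector does not have expected_len items."""
    return [i for i, feats in enumerate(feature_rows) if len(feats) != expected_len]


# Hypothetical data: two valid rows of 41 values and one truncated row
sample = [[0.0] * 41, [1.0] * 41, [2.0] * 40]
print(rows_with_wrong_length(sample, 41))  # [2]
```

In the notebook this could be called as `rows_with_wrong_length(df['features'], 41)`, where an empty list means all rows are consistent.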

In [41]:
final_df_single_partition = final_df.repartition(1)
from pyspark.sql.functions import concat_ws

# Convert the "features" column to a string
final_df_csv = final_df_single_partition.withColumn("features_str", concat_ws(",", "features"))
final_df_csv=final_df_csv.select("path", "label", "features_str")
final_df_csv.write.mode("overwrite").csv("s3://oc8-data-naomigua/final_features_test_15.csv", header=True, sep=",")

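
Note that Spark writes a directory of part files at the given path, not a single file named `final_features_test_15.csv`. Since the features are stored as one comma-joined string per row, whoever reads the CSV has to split that string back into numbers. A minimal standard-library sketch, using a made-up one-row CSV (the bucket name and values are hypothetical) and assuming the features field is quoted because it contains the separator:

```python
import csv
import io

# Hypothetical content of one part file written by the cell above
csv_text = (
    'path,label,features_str\n'
    's3://example-bucket/test/Blueberry/img.jpg,Blueberry,"1.5,-0.25,3.0"\n'
)

rows = []
for row in csv.DictReader(io.StringIO(csv_text)):
    # Split the comma-joined string back into a list of floats
    row['features'] = [float(v) for v in row['features_str'].split(',')]
    rows.append(row)

print(rows[0]['label'])     # Blueberry
print(rows[0]['features'])  # [1.5, -0.25, 3.0]
```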

## Don't forget: terminating the EMR instance

1. Start by leaving the PuTTY session: "exit"
2. Click "**Terminate**"
3. Confirm the termination. Termination takes about **1 minute**

## To rerun the notebook, we need to:

**clone our cluster**, which gives a working copy within 15 to 20 minutes, the time needed to instantiate it.

<u>There are two ways to do this</u>:
1. <u>From the AWS interface</u>:
 1. Click "**Clone**"
 2. In our case we do not want to include any steps
 3. The cluster configuration is recreated identically. <br />
