# Starting the Spark session

<u>Display information about the current session and links to the Spark UI</u>:

In [2]:
# Running this cell starts the Spark application

Starting Spark application


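| ID | YARN Application ID | Kind | State | Spark UI | Driver log | User | Current session? |
|----|---------------------|------|-------|----------|------------|------|------------------|
| 0 | application_1739197324002_0001 | pyspark | idle | Link | Link | | ✔ |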


SparkSession available as 'spark'.


# Package installation

The required packages were installed by a **bootstrap** step when the server was instantiated.
The Python version and the installed modules are listed below.

In [3]:
import subprocess
import sys
print("Python version:\n", sys.version)

# pip's internal API (pip._internal) is not meant to be called from code;
# running pip as a subprocess of the current interpreter is the supported way
pip_list = subprocess.run([sys.executable, "-m", "pip", "list"],
                          capture_output=True, text=True)
print("PIP LIST:\n", pip_list.stdout)

Python version:
 3.9.20 (main, Dec 11 2024, 00:00:00) 
[GCC 11.4.1 20230605 (Red Hat 11.4.1-2)]
Package                      Version
---------------------------- -----------
absl-py                      1.3.0
aiobotocore                  2.19.0
aiohappyeyeballs             2.4.6
aiohttp                      3.11.12
aioitertools                 0.12.0
aiosignal                    1.3.2
appdirs                      1.4.4
astor                        0.8.1
astunparse                   1.6.3
async-timeout                5.0.1
attrs                        20.3.0
aws-cfn-bootstrap            2.0
awscli                       2.17.18
awscrt                       0.19.19
Babel                        2.9.1
beautifulsoup4               4.9.3
bleach                       3.3.1
boto                         2.49.0
boto3                        1.36.16
botocore                     1.36.3
cachetools                   4.1.1
certifi                      2024.12.14
cffi                         1.14.5
cha

# Importing the libraries

In [6]:
import pandas as pd
import numpy as np
import io
import os
from PIL import Image

import tensorflow as tf
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras import Model
from pyspark.sql.functions import col, pandas_udf, udf, PandasUDFType, element_at, split

from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.feature import StandardScaler

# Defining the paths used to load the images and save the results

We access our **data on S3** directly, as if it were **stored locally**.

In [7]:
PATH = 's3://ocp9-fr-data'
PATH_Data = PATH+'/Test'
PATH_Result = PATH+'/Results'
print('PATH:        '+\
      PATH+'\nPATH_Data:   '+\
      PATH_Data+'\nPATH_Result: '+PATH_Result)

PATH:        s3://ocp9-fr-data
PATH_Data:   s3://ocp9-fr-data/Test
PATH_Result: s3://ocp9-fr-data/Results

# Data processing

## Loading the data
> The data is stored in the S3 bucket ocp9-fr-data, in the Test directory<br>
> It is loaded in binary format (`binaryFile`), which leaves more flexibility for preprocessing<br>
> Only the jpg files are kept, searched recursively through the whole directory tree
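The combination of `pathGlobFilter` and `recursiveFileLookup` can be pictured as a file-name filter applied to every file found under the prefix. The plain-Python sketch below only illustrates that idea: the file listing is hypothetical, and the matching is case-sensitive, so `8_100.JPG` is skipped; whether Spark's filter treats letter case the same way is worth checking against the real bucket.

```python
from fnmatch import fnmatchcase
from posixpath import basename


def select_jpg(paths, pattern="*.jpg"):
    """Keep the paths whose file name (last path component) matches the glob pattern."""
    return [p for p in paths if fnmatchcase(basename(p), pattern)]


# Hypothetical listing of a recursively scanned prefix
listing = [
    "s3://ocp9-fr-data/Test/Tangelo/14_100.jpg",
    "s3://ocp9-fr-data/Test/Tangelo/README.txt",
    "s3://ocp9-fr-data/Test/Tomato 4/7_100.jpg",
    "s3://ocp9-fr-data/Test/Tomato 4/8_100.JPG",
]
print(select_jpg(listing))
```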


In [8]:
images = spark.read.format("binaryFile") \
  .option("pathGlobFilter", "*.jpg") \
  .option("recursiveFileLookup", "true") \
  .load(PATH_Data)

## Displaying the raw data
> We can see each image's path, its modification date and time, its size in bytes, and its content, displayed as hexadecimal bytes

In [9]:
images.show(5)

+--------------------+-------------------+------+--------------------+
|                path|   modificationTime|length|             content|
+--------------------+-------------------+------+--------------------+
|s3://ocp9-fr-data...|2025-02-07 13:40:33|  5061|[FF D8 FF E0 00 1...|
|s3://ocp9-fr-data...|2025-02-07 13:40:33|  5058|[FF D8 FF E0 00 1...|
|s3://ocp9-fr-data...|2025-02-07 13:40:33|  5056|[FF D8 FF E0 00 1...|
|s3://ocp9-fr-data...|2025-02-07 13:40:33|  5048|[FF D8 FF E0 00 1...|
|s3://ocp9-fr-data...|2025-02-07 13:40:33|  5048|[FF D8 FF E0 00 1...|
+--------------------+-------------------+------+--------------------+
only showing top 5 rows
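Every `content` value above starts with `FF D8 FF`: `FF D8` is the JPEG start-of-image marker, and the next segment marker begins with `FF` as well. A quick sanity check on raw bytes can be sketched as follows; the helper name is hypothetical, and looking at three bytes only hints at the format, it does not validate the whole file.

```python
JPEG_SOI = b"\xff\xd8\xff"  # FF D8 (start of image) followed by the FF of the next marker


def looks_like_jpeg(content: bytes) -> bool:
    """Return True when the bytes begin like a JPEG file."""
    return content[:3] == JPEG_SOI


header = bytes.fromhex("FF D8 FF E0 00")  # leading bytes as displayed by images.show()
print(looks_like_jpeg(header), looks_like_jpeg(b"\x89PNG\r\n\x1a\n"))
```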

## Displaying the path and labels
> We use the parent directory of each image (the second-to-last component of its path) as its label
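The labeling rule `element_at(split(path, '/'), -2)` can be checked in plain Python. Spark SQL's `element_at` is 1-based but accepts negative indices counted from the end, so `-2` selects the same element as Python's `[-2]`. The helper below is a hypothetical illustration, not part of the pipeline.

```python
def label_from_path(path: str) -> str:
    """Return the parent directory name, like element_at(split(path, '/'), -2) in Spark SQL."""
    # Python's negative index -2 is the second-to-last element, as in element_at
    return path.split("/")[-2]


print(label_from_path("s3://ocp9-fr-data/Test/Tangelo/14_100.jpg"))
```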

In [10]:
images = images.withColumn('label', element_at(split(images['path'], '/'), -2))
# printSchema() and show() print by themselves and return None, so they are not wrapped in print()
images.printSchema()
images.select('path', 'label').show(5, False)

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)

+-----------------------------------------+-------+
|path                                     |label  |
+-----------------------------------------+-------+
|s3://ocp9-fr-data/Test/Tangelo/14_100.jpg|Tangelo|
|s3://ocp9-fr-data/Test/Tangelo/17_100.jpg|Tangelo|
|s3://ocp9-fr-data/Test/Tangelo/20_100.jpg|Tangelo|
|s3://ocp9-fr-data/Test/Tangelo/15_100.jpg|Tangelo|
|s3://ocp9-fr-data/Test/Tangelo/19_100.jpg|Tangelo|
+-----------------------------------------+-------+
only showing top 5 rows

# Loading the pre-trained model

In [11]:
model = MobileNetV2(weights='imagenet',
                    include_top=True,
                    input_shape=(224, 224, 3))

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/mobilenet_v2/mobilenet_v2_weights_tf_dim_ordering_tf_kernels_1.0_224.h5

In [12]:
new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)
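With `include_top=True`, Keras' MobileNetV2 ends with a global average pooling layer followed by the 1000-class `predictions` Dense layer, so `model.layers[-2]` should be the pooling layer and the truncated model should emit one 1280-value vector per image (worth confirming at the bottom of `new_model.summary()`, which is cut off below). Global average pooling itself is simple. Here is a NumPy sketch on a random 7×7×1280 feature map, the shape the last convolutional block should produce for 224×224 inputs:

```python
import numpy as np


def global_average_pool(feature_map: np.ndarray) -> np.ndarray:
    """Average an (H, W, C) feature map over its two spatial axes, giving a (C,) vector."""
    return feature_map.mean(axis=(0, 1))


# Random stand-in for the last convolutional block's output on one image
fmap = np.random.default_rng(0).random((7, 7, 1280), dtype=np.float32)
vec = global_average_pool(fmap)
print(vec.shape)
```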

## Broadcasting the model weights

In [13]:
brodcast_weights = sc.broadcast(new_model.get_weights())
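`sc.broadcast` ships the list of NumPy arrays returned by `get_weights()` once per executor rather than once per task. Its size can be estimated by summing `nbytes`. The sketch below checks the arithmetic on arrays shaped like the first two layers of the summary (`Conv1`: a 3×3×3×32 kernel without bias, 864 parameters; `bn_Conv1`: four 32-element vectors, 128 parameters), assuming float32 weights. `payload_bytes` is a hypothetical helper.

```python
import numpy as np


def payload_bytes(weights) -> int:
    """Total size in bytes of a list of arrays such as Model.get_weights() returns."""
    return sum(w.nbytes for w in weights)


# Arrays shaped like Conv1 (kernel only) and bn_Conv1 (gamma, beta, moving mean, moving variance)
conv1 = [np.zeros((3, 3, 3, 32), dtype=np.float32)]
bn_conv1 = [np.zeros(32, dtype=np.float32) for _ in range(4)]
print(payload_bytes(conv1), payload_bytes(bn_conv1))
```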

In [14]:
new_model.summary()

Model: "functional_1"
┏━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━┓
┃ Layer (type)        ┃ Output Shape      ┃ Param # ┃ Connected to         ┃
┡━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━┩
│ input_layer         │ (None, 224, 224,  │       0 │ -                    │
│ (InputLayer)        │ 3)                │         │                      │
├─────────────────────┼───────────────────┼─────────┼──────────────────────┤
│ Conv1 (Conv2D)      │ (None, 112, 112,  │     864 │ input_layer[0][0]    │
│                     │ 32)               │         │                      │
├─────────────────────┼───────────────────┼─────────┼──────────────────────┤
│ bn_Conv1            │ (None, 112, 112,  │     128 │ Conv1[0][0]          │
│ (BatchNormalizatio… │ 32)               │         │                      │
├─────────────────────┼───────────────────┼─────────┼──────────────────────┤
│ Conv1_relu (ReLU)   │ (None, 112, 112,  │       0 │ 

> Rebuilding the model on the executors from the broadcast weights

In [15]:
def model_fn():
    """
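    Returns a MobileNetV2 model with its top (classification) layer removed,
    loaded with the broadcast pretrained weights.
    """
    # weights=None: the ImageNet weights come from the broadcast variable below,
    # so there is no need to download them again on every executor
    model = MobileNetV2(weights=None,
                        include_top=True,
                        input_shape=(224, 224, 3))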
    for layer in model.layers:
        layer.trainable = False
    new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)
    new_model.set_weights(brodcast_weights.value)
    return new_model
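`model_fn` rebuilds the whole network, so it should run as rarely as possible. The Scalar Iterator pandas UDF defined in the next cell calls it once per iterator (one per task) rather than once per batch. The pattern can be sketched in plain Python; `load_model` and the toy "model" are hypothetical stand-ins that only count how often loading happens.

```python
from typing import Callable, Iterator, List

load_count = 0


def load_model() -> Callable[[List[bytes]], List[int]]:
    """Hypothetical stand-in for model_fn(): expensive, so each call is counted."""
    global load_count
    load_count += 1
    return lambda batch: [len(item) for item in batch]  # toy "features": byte lengths


def featurize_iter(batches: Iterator[List[bytes]]) -> Iterator[List[int]]:
    model = load_model()  # loaded once for the whole iterator...
    for batch in batches:
        yield model(batch)  # ...then reused for every batch


results = list(featurize_iter(iter([[b"ab", b"c"], [b"def"]])))
print(results, load_count)
```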

# Defining the image-loading process <br> and applying featurization through a pandas UDF

In [16]:
def preprocess(content):
    """
    Preprocesses raw image bytes for prediction.
    """
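    # convert("RGB") guards against grayscale or RGBA images, which would not fit the (224, 224, 3) input
    img = Image.open(io.BytesIO(content)).convert("RGB").resize((224, 224))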
    arr = img_to_array(img)
    return preprocess_input(arr)

def featurize_series(model, content_series):
    """
    Featurize a pd.Series of raw images using the input model.
    :return: a pd.Series of image features
    """
    # "batch" rather than "input", which would shadow the Python builtin
    batch = np.stack(content_series.map(preprocess))
    preds = model.predict(batch, verbose=0)
    # For some layers, output features will be multi-dimensional tensors.
    # We flatten the feature tensors to vectors for easier storage in Spark DataFrames.
    output = [p.flatten() for p in preds]
    return pd.Series(output)

from typing import Iterator

@pandas_udf('array<float>')
def featurize_udf(content_series_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    '''
    Scalar Iterator pandas UDF wrapping our featurization function.
    The Iterator[pd.Series] -> Iterator[pd.Series] type hints select the Scalar Iterator
    variant (Spark 3.0+ style, replacing the deprecated PandasUDFType.SCALAR_ITER), and
    'array<float>' makes the result a Spark DataFrame column of type ArrayType(FloatType).

    :param content_series_iter: an iterator over batches of data, where each batch
                              is a pandas Series of raw image bytes.
    '''
    # With Scalar Iterator pandas UDFs, we can load the model once and then re-use it
    # for multiple data batches.  This amortizes the overhead of loading big models.
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)
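For MobileNetV2, Keras' `preprocess_input` uses the "tf" scaling mode, which maps pixel values from [0, 255] to [-1, 1]. The NumPy sketch below reproduces only that scaling (it is a hypothetical helper, not the library function) so that the input range `preprocess` hands to the model is explicit.

```python
import numpy as np


def mobilenet_v2_scale(pixels: np.ndarray) -> np.ndarray:
    """Map pixel values in [0, 255] to [-1, 1] as float32 ("tf"-style scaling)."""
    return pixels.astype(np.float32) / 127.5 - 1.0


out = mobilenet_v2_scale(np.array([0, 127.5, 255]))
print(out)
```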

## Feature extraction

> Limit the number of records per Arrow batch to reduce memory usage
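To see why the cap matters: `featurize_series` stacks every image of an Arrow batch into one float32 tensor before calling `predict`. A back-of-the-envelope sketch of that tensor's size, assuming 224×224×3 float32 inputs and ignoring the raw bytes and the model's activations (`batch_input_bytes` is a hypothetical helper):

```python
def batch_input_bytes(records: int, height: int = 224, width: int = 224,
                      channels: int = 3, bytes_per_value: int = 4) -> int:
    """Bytes used by one stacked float32 input tensor of shape (records, height, width, channels)."""
    return records * height * width * channels * bytes_per_value


print(batch_input_bytes(1), batch_input_bytes(1024) / 2**20)
```

At the 1,024-record cap, the stacked input alone comes to about 588 MiB per batch.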

In [17]:
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1024")

In [18]:
features_df = images.repartition(24).select(col("path"),
                                            col("label"),
                                            featurize_udf("content").alias("features")
                                           )
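`repartition(24)` spreads the rows roughly evenly. With the 300 images counted at the end of the notebook, each partition should hold about 12 or 13 rows, well under the 1,024-record Arrow limit, so each task should pass a single small batch to the UDF and call `model_fn` once. The ideal even split can be computed as follows (a hypothetical helper describing an idealized split, not Spark's exact row placement):

```python
def balanced_partition_sizes(n_rows: int, n_partitions: int) -> list:
    """Sizes of an ideal even split of n_rows rows over n_partitions partitions."""
    q, r = divmod(n_rows, n_partitions)
    return [q + 1] * r + [q] * (n_partitions - r)


sizes = balanced_partition_sizes(300, 24)
print(min(sizes), max(sizes), sum(sizes))
```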

In [19]:
print(PATH_Result)

s3://ocp9-fr-data/Results

# PCA 

## Step 1: convert the features from array to VectorUDT

In [20]:
conv2Vec = udf(lambda vs: Vectors.dense(vs), VectorUDT())
features_df = features_df.withColumn('vectorizedFeatures', conv2Vec(features_df['features']))
# printSchema() prints by itself and returns None, so it is not wrapped in print()
features_df.printSchema()

root
 |-- path: string (nullable = true)
 |-- label: string (nullable = true)
 |-- features: array (nullable = true)
 |    |-- element: float (containsNull = true)
 |-- vectorizedFeatures: vector (nullable = true)

## Step 2: run the PCA
> We build a Pipeline that runs a StandardScaler (centering only: withMean=True, withStd=False) and then the PCA
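Conceptually, the pipeline subtracts each feature column's mean without rescaling it, then projects every row onto the two directions of largest variance. A NumPy sketch of the same idea on random data with this dataset's row count and feature width, using an SVD of the centered matrix; it is an illustration rather than Spark's implementation, and Spark's components may differ from these in sign.

```python
import numpy as np


def center_and_project(X: np.ndarray, k: int = 2) -> np.ndarray:
    """Center the columns of X, then project onto the top-k principal axes."""
    Xc = X - X.mean(axis=0)  # like StandardScaler(withMean=True, withStd=False)
    # Rows of Vt are the principal axes, sorted by decreasing singular value
    _, _, Vt = np.linalg.svd(Xc, full_matrices=False)
    return Xc @ Vt[:k].T


rng = np.random.default_rng(42)
X = rng.normal(size=(300, 1280))  # random stand-in for the 1280-value image features
Z = center_and_project(X, k=2)
print(Z.shape)
```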

In [21]:
scaler = StandardScaler().setInputCol("vectorizedFeatures").setOutputCol("scaledFeatures") \
                         .setWithStd(False).setWithMean(True)
pca_features = PCA(k=2).setInputCol("scaledFeatures").setOutputCol("pcaFeatures")
pipeline = Pipeline(stages=[scaler, pca_features])
pcaModel = pipeline.fit(features_df)
features_df = pcaModel.transform(features_df)
features_df.printSchema()

root
 |-- path: string (nullable = true)
 |-- label: string (nullable = true)
 |-- features: array (nullable = true)
 |    |-- element: float (containsNull = true)
 |-- vectorizedFeatures: vector (nullable = true)
 |-- scaledFeatures: vector (nullable = true)
 |-- pcaFeatures: vector (nullable = true)

# Saving the data in CSV format

In [32]:
features_df.select(col("path"),col("label"),col("pcaFeatures")).toPandas().to_csv(PATH_Result+"/Sortie_Pipeline.csv")
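Writing the `pcaFeatures` column through `toPandas().to_csv()` stores each vector as text. Assuming pandas serializes each `DenseVector` with `str()`, as a bracketed, comma-separated string such as `[x,y]`, reading the file back needs a small parser. The sample CSV below is hypothetical (its values are copied from the `df.head()` output further down), and `parse_vector` is a hypothetical helper.

```python
import csv
import io


def parse_vector(text: str) -> list:
    """Turn a string such as '[5.42,2.89]' or '[5.42, 2.89]' into a list of floats."""
    return [float(v) for v in text.strip().strip("[]").split(",")]


# Hypothetical file content: pandas writes the index as an unnamed first column
sample = (
    ",path,label,pcaFeatures\n"
    '1,s3://ocp9-fr-data/Test/Tangelo/10_100.jpg,Tangelo,"[5.42032315964,2.899499244312785]"\n'
)
rows = list(csv.DictReader(io.StringIO(sample)))
print(rows[0]["label"], parse_vector(rows[0]["pcaFeatures"]))
```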

# Checking that the output data is consistent

In [33]:
df = features_df.select(col("path"),col("label"),col("pcaFeatures")).toPandas()

In [34]:
df.shape

(300, 3)

In [35]:
df.head()

                                                path  ...                                pcaFeatures
0      s3://ocp9-fr-data/Test/Tomato Heart/2_100.jpg  ...  [-0.13212825757922453, 4.880532664700052]
1          s3://ocp9-fr-data/Test/Tangelo/10_100.jpg  ...         [5.42032315964, 2.899499244312785]
2  s3://ocp9-fr-data/Test/Tomato not Ripened/25_1...  ...    [11.352458808635163, 7.531871909516764]
3     s3://ocp9-fr-data/Test/Tomato Heart/15_100.jpg  ...   [-0.5482383898331199, 6.131384896541967]
4          s3://ocp9-fr-data/Test/Tomato 4/7_100.jpg  ...  [-3.1473057326550005, -9.153188831181625]

[5 rows x 3 columns]

> Checking the dimensions of the PCA output

In [36]:
df.loc[0,'pcaFeatures'].shape

(2,)