# Preprocessing et étape de réduction via ACP

## Réalisation en local

### Import des librairies

In [61]:
import pandas as pd
from PIL import Image
import numpy as np
import io
import os

import tensorflow as tf
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras import Model
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split
from pyspark.sql import SparkSession

from pyspark.ml.feature import PCA, StandardScaler
from pyspark.ml.linalg import Vectors, VectorUDT, DenseVector
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, FloatType

### Chemins d'accès en local
Je vais d'abord tester les différentes étapes avec un dossier Test_light, dans lequel j'ai placé un petit nombre d'image

In [62]:
PATH = os.getcwd()
PATH_Data = PATH+'/Test_light'
PATH_Result = PATH+'/Results'
print('PATH:        '+\
      PATH+'\nPATH_Data:   '+\
      PATH_Data+'\nPATH_Result: '+PATH_Result)

PATH:        /Users/manuelmartin67/Documents/Formation Data Scientist - Openclassrooms/Projet 8
PATH_Data:   /Users/manuelmartin67/Documents/Formation Data Scientist - Openclassrooms/Projet 8/Test_light
PATH_Result: /Users/manuelmartin67/Documents/Formation Data Scientist - Openclassrooms/Projet 8/Results


### Création de la SparkSession

In [63]:
spark = (SparkSession
             .builder
             .appName('P8')
             .master('local')
             .config("spark.sql.parquet.writeLegacyFormat", 'true')
             .getOrCreate()
)
sc = spark.sparkContext

In [64]:
spark

### Chargement et traitement des données

In [65]:
images = spark.read.format("binaryFile") \
  .option("pathGlobFilter", "*.jpg") \
  .option("recursiveFileLookup", "true") \
  .load(PATH_Data)

In [66]:
images = images.repartition(2).withColumn('label', element_at(split(images['path'], '/'),-2))
print(images.printSchema())
print(images.select('path','label').show(5,False))

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)

None
+----------------------------------------------------------------------------------------------------------------------------+--------------+
|path                                                                                                                        |label         |
+----------------------------------------------------------------------------------------------------------------------------+--------------+
|file:/Users/manuelmartin67/Documents/Formation Data Scientist - Openclassrooms/Projet 8/Test_light/Watermelon/125_100.jpg   |Watermelon    |
|file:/Users/manuelmartin67/Documents/Formation Data Scientist - Openclassrooms/Projet 8/Test_light/Pineapple Mini/3_100.jpg |Pineapple Mini|
|file:/Users/manuelmartin67/Documents/Formation Data Scientist - Openclassrooms/

### Création du modèle

In [67]:
model = MobileNetV2(weights='imagenet',
                    include_top=True,
                    input_shape=(224, 224, 3))

In [68]:
new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)

In [69]:
new_model.summary()

Model: "model_2"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_3 (InputLayer)        [(None, 224, 224, 3)]        0         []                            
                                                                                                  
 Conv1 (Conv2D)              (None, 112, 112, 32)         864       ['input_3[0][0]']             
                                                                                                  
 bn_Conv1 (BatchNormalizati  (None, 112, 112, 32)         128       ['Conv1[0][0]']               
 on)                                                                                              
                                                                                                  
 Conv1_relu (ReLU)           (None, 112, 112, 32)         0         ['bn_Conv1[0][0]']      

 block_3_expand_relu (ReLU)  (None, 56, 56, 144)          0         ['block_3_expand_BN[0][0]']   
                                                                                                  
 block_3_pad (ZeroPadding2D  (None, 57, 57, 144)          0         ['block_3_expand_relu[0][0]'] 
 )                                                                                                
                                                                                                  
 block_3_depthwise (Depthwi  (None, 28, 28, 144)          1296      ['block_3_pad[0][0]']         
 seConv2D)                                                                                        
                                                                                                  
 block_3_depthwise_BN (Batc  (None, 28, 28, 144)          576       ['block_3_depthwise[0][0]']   
 hNormalization)                                                                                  
          

 block_6_depthwise_BN (Batc  (None, 14, 14, 192)          768       ['block_6_depthwise[0][0]']   
 hNormalization)                                                                                  
                                                                                                  
 block_6_depthwise_relu (Re  (None, 14, 14, 192)          0         ['block_6_depthwise_BN[0][0]']
 LU)                                                                                              
                                                                                                  
 block_6_project (Conv2D)    (None, 14, 14, 64)           12288     ['block_6_depthwise_relu[0][0]
                                                                    ']                            
                                                                                                  
 block_6_project_BN (BatchN  (None, 14, 14, 64)           256       ['block_6_project[0][0]']     
 ormalizat

                                                                                                  
 block_9_add (Add)           (None, 14, 14, 64)           0         ['block_8_add[0][0]',         
                                                                     'block_9_project_BN[0][0]']  
                                                                                                  
 block_10_expand (Conv2D)    (None, 14, 14, 384)          24576     ['block_9_add[0][0]']         
                                                                                                  
 block_10_expand_BN (BatchN  (None, 14, 14, 384)          1536      ['block_10_expand[0][0]']     
 ormalization)                                                                                    
                                                                                                  
 block_10_expand_relu (ReLU  (None, 14, 14, 384)          0         ['block_10_expand_BN[0][0]']  
 )        

                                                                                                  
 block_13_expand_relu (ReLU  (None, 14, 14, 576)          0         ['block_13_expand_BN[0][0]']  
 )                                                                                                
                                                                                                  
 block_13_pad (ZeroPadding2  (None, 15, 15, 576)          0         ['block_13_expand_relu[0][0]']
 D)                                                                                               
                                                                                                  
 block_13_depthwise (Depthw  (None, 7, 7, 576)            5184      ['block_13_pad[0][0]']        
 iseConv2D)                                                                                       
                                                                                                  
 block_13_

 iseConv2D)                                                                                       
                                                                                                  
 block_16_depthwise_BN (Bat  (None, 7, 7, 960)            3840      ['block_16_depthwise[0][0]']  
 chNormalization)                                                                                 
                                                                                                  
 block_16_depthwise_relu (R  (None, 7, 7, 960)            0         ['block_16_depthwise_BN[0][0]'
 eLU)                                                               ]                             
                                                                                                  
 block_16_project (Conv2D)   (None, 7, 7, 320)            307200    ['block_16_depthwise_relu[0][0
                                                                    ]']                           
          

### Diffusion des poids du modèle sur les workers

In [70]:
brodcast_weights = sc.broadcast(new_model.get_weights())

In [71]:
def model_fn():
    """
    Returns a MobileNetV2 model with top layer removed 
    and broadcasted pretrained weights.
    """
    model = MobileNetV2(weights='imagenet',
                        include_top=True,
                        input_shape=(224, 224, 3))
    for layer in model.layers:
        layer.trainable = False
    new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)
    new_model.set_weights(brodcast_weights.value)
    return new_model

### Processus de chargement des images et application de leur featurisation à travers l'utilisation de pandas UDF

In [72]:
def preprocess(content):
    """
    Preprocesses raw image bytes for prediction.
    """
    img = Image.open(io.BytesIO(content)).resize([224, 224])
    arr = img_to_array(img)
    return preprocess_input(arr)

def featurize_series(model, content_series):
    """
    Featurize a pd.Series of raw images using the input model.
    :return: a pd.Series of image features
    """
    input = np.stack(content_series.map(preprocess))
    preds = model.predict(input)
    # For some layers, output features will be multi-dimensional tensors.
    # We flatten the feature tensors to vectors for easier storage in Spark DataFrames.
    output = [p.flatten() for p in preds]
    return pd.Series(output)

@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    '''
    This method is a Scalar Iterator pandas UDF wrapping our featurization function.
    The decorator specifies that this returns a Spark DataFrame column of type ArrayType(FloatType).

    :param content_series_iter: This argument is an iterator over batches of data, where each batch
                              is a pandas Series of image data.
    '''
    # With Scalar Iterator pandas UDFs, we can load the model once and then re-use it
    # for multiple data batches.  This amortizes the overhead of loading big models.
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)



### Exécution des actions d'extraction de features

In [73]:
# spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1024")

In [74]:
features_df = images.select(col("path"),col("label"),featurize_udf("content").alias("features"))

In [75]:
features_df.show(5)

[Stage 89:>                                                         (0 + 1) / 1]

+--------------------+--------------+--------------------+
|                path|         label|            features|
+--------------------+--------------+--------------------+
|file:/Users/manue...|    Watermelon|[0.047144607, 0.4...|
|file:/Users/manue...|Pineapple Mini|[0.0, 4.857954, 0...|
|file:/Users/manue...|    Watermelon|[0.15568992, 0.10...|
|file:/Users/manue...|    Watermelon|[0.09226661, 0.19...|
|file:/Users/manue...|    Watermelon|[0.0, 0.23220576,...|
+--------------------+--------------+--------------------+
only showing top 5 rows



                                                                                

### Mise en place d'une ACP

On récupère d'abord les features et on les transforme de liste de float en vecteurs :

In [76]:
list_to_vector_udf = udf(lambda l: DenseVector(l), VectorUDT())

features_df = features_df.withColumn("vector_features", list_to_vector_udf("features"))

On applique un standard scaling sur les données :

In [77]:
standardizer = StandardScaler(inputCol="vector_features", outputCol="scaled_features",
                              withStd=True, withMean=True)
model_std = standardizer.fit(features_df)
features_df = model_std.transform(features_df)

                                                                                

On définit ensuite une première fois une ACP en définissant k = nombre total de features = 1280 :

In [78]:
pca_temp = PCA(k=1280, inputCol="scaled_features", outputCol="temp_features")
model_temp = pca_temp.fit(features_df)

                                                                                

En utilisant la somme des variances expliquées, on récupère le nombre de features qui donne 100% du total de la variance expliquée :

In [79]:
explained_variance_ratio = model_temp.explainedVariance
total_variance = sum(explained_variance_ratio)
#required_variance = 0.99 * total_variance

num_components = 0
current_variance = 0.0
for variance in explained_variance_ratio:
    current_variance += variance
    num_components += 1
    if current_variance >= total_variance:
        break

print(num_components)

49


On refait maintenant une nouvelle ACP, en prenant cette fois ci k = 'nombre de features qui donne 100% du total de la variance expliquée', et on ajoute une nouvelle colonne "pca_features" :

In [80]:
pca = PCA(k=num_components, inputCol="scaled_features", outputCol="pca_features")
model = pca.fit(features_df)
features_df = model.transform(features_df)

                                                                                

On retransforme nos vecteurs en liste de float :

In [81]:
vector_to_list_udf = udf(lambda v: [float(x) for x in v], ArrayType(FloatType()))

features_df = features_df.withColumn("array_pca_features", vector_to_list_udf("pca_features"))

In [82]:
features_df.select(col("path"),col("label"),col("features"),col("array_pca_features").alias("pca_features")).show(5)



+--------------------+--------------+--------------------+--------------------+
|                path|         label|            features|        pca_features|
+--------------------+--------------+--------------------+--------------------+
|file:/Users/manue...|    Watermelon|[0.047144607, 0.4...|[-20.983297, 0.06...|
|file:/Users/manue...|Pineapple Mini|[0.0, 4.857954, 0...|[21.303204, -13.1...|
|file:/Users/manue...|    Watermelon|[0.15568992, 0.10...|[-23.680433, -5.6...|
|file:/Users/manue...|    Watermelon|[0.09226661, 0.19...|[-25.061506, 1.98...|
|file:/Users/manue...|    Watermelon|[0.0, 0.23220576,...|[-17.961191, 0.01...|
+--------------------+--------------+--------------------+--------------------+
only showing top 5 rows



                                                                                

Rappel du répertoire où seront inscrits les fichiers au format "parquet" contenant nos résultats, à savoir, un DataFrame contenant 4 colonnes</u> :
 1. Path des images
 2. Label de l'image
 3. Vecteur de caractéristiques de l'image avec 1280 features
 4. Vecteur de caractéristiques de l'image après réduction via ACP

In [83]:
print(PATH_Result)

/Users/manuelmartin67/Documents/Formation Data Scientist - Openclassrooms/Projet 8/Results


On exporte la liste au format "parquet" :

In [84]:
features_df.select(col("path"),col("label"),col("features"),col("array_pca_features").alias("pca_features")).write.mode("overwrite").parquet(PATH_Result)

                                                                                

### Chargement des données enregistrées et validation du résultat

In [85]:
df = pd.read_parquet(PATH_Result, engine='pyarrow')

In [86]:
df.head(5)

Unnamed: 0,path,label,features,pca_features
0,file:/Users/manuelmartin67/Documents/Formation...,Watermelon,"[0.047144607, 0.4858582, 0.057451103, 0.0, 2.3...","[-20.983297, 0.066809975, 0.572959, -3.6629622..."
1,file:/Users/manuelmartin67/Documents/Formation...,Pineapple Mini,"[0.0, 4.857954, 0.0, 0.0, 0.0, 0.0, 0.22289573...","[21.303204, -13.191279, 1.5589374, 2.4906194, ..."
2,file:/Users/manuelmartin67/Documents/Formation...,Watermelon,"[0.15568992, 0.10087934, 0.24235694, 0.0087293...","[-23.680433, -5.6297064, 15.834938, -21.896273..."
3,file:/Users/manuelmartin67/Documents/Formation...,Watermelon,"[0.09226661, 0.19952638, 0.10727616, 0.0, 2.23...","[-25.061506, 1.9876293, 5.297536, 11.438246, -..."
4,file:/Users/manuelmartin67/Documents/Formation...,Watermelon,"[0.0, 0.23220576, 0.5722912, 0.0, 2.1421719, 0...","[-17.961191, 0.015848497, 2.0389814, -4.005884..."


On vérifie que la colonne features contient bien le même nombre de features que précédemment :

In [87]:
df.loc[0,'features'].shape

(1280,)

On vérifie que la colonne pca_features contient bien le même nombre de features que précédemment :

In [88]:
df.loc[0,'pca_features'].shape

(49,)

In [89]:
df.shape

(16, 4)

In [90]:
np.set_printoptions(threshold=np.inf)
df.to_csv('features_reduction_ACP.csv',sep='\t',index=False)

## Réalisation sur AWS

### Démarrage de la session Spark

In [1]:
# L'exécution de cette cellule démarre l'application Spark

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
0,application_1688026780659_0001,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<u>Affichage des informations sur la session en cours et liens vers Spark UI</u> :

In [2]:
%%info

ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
0,application_1688026780659_0001,pyspark,idle,Link,Link,,✔


### Import des librairies

In [3]:
import pandas as pd
from PIL import Image
import numpy as np
import io
import os

import tensorflow as tf
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras import Model
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split
from pyspark.sql import SparkSession

from pyspark.ml.feature import PCA, StandardScaler
from pyspark.ml.linalg import Vectors, VectorUDT, DenseVector
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, FloatType

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Chemins d'accès S3
Pour des raisons de coût, et de temps de démonstration, je vais également tester les différentes étapes avec un dossier Test_light, dans lequel j'ai placé un petit nombre d'image

In [4]:
PATH = 's3://manuelmartin67-projet8'
PATH_Data = PATH+'/Test_light'
PATH_Result = PATH+'/Results'
print('PATH:        '+\
      PATH+'\nPATH_Data:   '+\
      PATH_Data+'\nPATH_Result: '+PATH_Result)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

PATH:        s3://manuelmartin67-projet8
PATH_Data:   s3://manuelmartin67-projet8/Test_light
PATH_Result: s3://manuelmartin67-projet8/Results

### Chargement et traitement des données

In [5]:
images = spark.read.format("binaryFile") \
  .option("pathGlobFilter", "*.jpg") \
  .option("recursiveFileLookup", "true") \
  .load(PATH_Data)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
images.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+-------------------+------+--------------------+
|                path|   modificationTime|length|             content|
+--------------------+-------------------+------+--------------------+
|s3://manuelmartin...|2023-06-29 08:05:32|  7353|[FF D8 FF E0 00 1...|
|s3://manuelmartin...|2023-06-29 08:05:33|  7350|[FF D8 FF E0 00 1...|
|s3://manuelmartin...|2023-06-29 08:05:33|  7349|[FF D8 FF E0 00 1...|
|s3://manuelmartin...|2023-06-29 08:05:33|  7348|[FF D8 FF E0 00 1...|
|s3://manuelmartin...|2023-06-29 08:05:44|  7328|[FF D8 FF E0 00 1...|
+--------------------+-------------------+------+--------------------+
only showing top 5 rows

In [7]:
images = images.withColumn('label', element_at(split(images['path'], '/'),-2))
print(images.printSchema())
print(images.select('path','label').show(5,False))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)

None
+---------------------------------------------------------------+----------+
|path                                                           |label     |
+---------------------------------------------------------------+----------+
|s3://manuelmartin67-projet8/Test_light/Watermelon/r_106_100.jpg|Watermelon|
|s3://manuelmartin67-projet8/Test_light/Watermelon/r_109_100.jpg|Watermelon|
|s3://manuelmartin67-projet8/Test_light/Watermelon/r_108_100.jpg|Watermelon|
|s3://manuelmartin67-projet8/Test_light/Watermelon/r_107_100.jpg|Watermelon|
|s3://manuelmartin67-projet8/Test_light/Watermelon/r_95_100.jpg |Watermelon|
+---------------------------------------------------------------+----------+
only showing top 5 rows

None

### Préparation du modèle

In [8]:
model = MobileNetV2(weights='imagenet',
                    include_top=True,
                    input_shape=(224, 224, 3))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/mobilenet_v2/mobilenet_v2_weights_tf_dim_ordering_tf_kernels_1.0_224.h5

In [9]:
new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [10]:
brodcast_weights = sc.broadcast(new_model.get_weights())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
new_model.summary()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, 224, 224, 3  0           []                               
                                )]                                                                
                                                                                                  
 Conv1 (Conv2D)                 (None, 112, 112, 32  864         ['input_1[0][0]']                
                                )                                                                 
                                                                                                  
 bn_Conv1 (BatchNormalization)  (None, 112, 112, 32  128         ['Conv1[0][0]']                  
                                )                                                             

In [12]:
def model_fn():
    """
    Returns a MobileNetV2 model with top layer removed 
    and broadcasted pretrained weights.
    """
    model = MobileNetV2(weights='imagenet',
                        include_top=True,
                        input_shape=(224, 224, 3))
    for layer in model.layers:
        layer.trainable = False
    new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)
    new_model.set_weights(brodcast_weights.value)
    return new_model

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Processus de chargement des images et application de leur featurisation à travers l'utilisation de pandas UDF

In [13]:
def preprocess(content):
    """
    Preprocesses raw image bytes for prediction.
    """
    img = Image.open(io.BytesIO(content)).resize([224, 224])
    arr = img_to_array(img)
    return preprocess_input(arr)

def featurize_series(model, content_series):
    """
    Featurize a pd.Series of raw images using the input model.
    :return: a pd.Series of image features
    """
    input = np.stack(content_series.map(preprocess))
    preds = model.predict(input)
    # For some layers, output features will be multi-dimensional tensors.
    # We flatten the feature tensors to vectors for easier storage in Spark DataFrames.
    output = [p.flatten() for p in preds]
    return pd.Series(output)

@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    '''
    This method is a Scalar Iterator pandas UDF wrapping our featurization function.
    The decorator specifies that this returns a Spark DataFrame column of type ArrayType(FloatType).

    :param content_series_iter: This argument is an iterator over batches of data, where each batch
                              is a pandas Series of image data.
    '''
    # With Scalar Iterator pandas UDFs, we can load the model once and then re-use it
    # for multiple data batches.  This amortizes the overhead of loading big models.
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…



### Exécutions des actions d'extractions de features

In [14]:
# spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1024")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [15]:
features_df = images.select(col("path"),col("label"),featurize_udf("content").alias("features"))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Mise en place d'une ACP

On récupère d'abord les features et on les transforme de liste de float en vecteurs :

In [16]:
list_to_vector_udf = udf(lambda l: DenseVector(l), VectorUDT())

features_df = features_df.withColumn("vector_features", list_to_vector_udf("features"))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

On applique un standard scaling sur les données :

In [17]:
standardizer = StandardScaler(inputCol="vector_features", outputCol="scaled_features",
                              withStd=True, withMean=True)
model_std = standardizer.fit(features_df)
features_df = model_std.transform(features_df)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

On définit ensuite une première fois une ACP en définissant k = nombre total de features = 1280 :

In [18]:
pca_temp = PCA(k=1280, inputCol="scaled_features", outputCol="temp_features")
model_temp = pca_temp.fit(features_df)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

En utilisant la somme des variances expliquées, on récupère le nombre de features qui donne 100% du total de la variance expliquée :

In [19]:
print(sum(explained_variance))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

An error was encountered:
name 'explained_variance' is not defined
Traceback (most recent call last):
NameError: name 'explained_variance' is not defined



In [20]:
explained_variance_ratio = model_temp.explainedVariance
total_variance = sum(explained_variance_ratio)
#required_variance = 0.99 * total_variance

num_components = 0
current_variance = 0.0
for variance in explained_variance_ratio:
    current_variance += variance
    num_components += 1
    if current_variance >= total_variance:
        break

print(num_components)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

322

On refait maintenant une nouvelle ACP, en prenant cette fois ci k = 'nombre de features qui donne 100% du total de la variance expliquée', et on ajoute une nouvelle colonne "pca_features" :

In [21]:
pca = PCA(k=num_components, inputCol="scaled_features", outputCol="pca_features")
model = pca.fit(features_df)
features_df = model.transform(features_df)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

On retransforme nos vecteurs en liste de float :

In [22]:
vector_to_list_udf = udf(lambda v: [float(x) for x in v], ArrayType(FloatType()))

features_df = features_df.withColumn("array_pca_features", vector_to_list_udf("pca_features"))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Rappel du répertoire où seront inscrits les fichiers au format "parquet" contenant nos résultats, à savoir, un DataFrame contenant 4 colonnes</u> :
 1. Path des images
 2. Label de l'image
 3. Vecteur de caractéristiques de l'image avec 1280 features
 4. Vecteur de caractéristiques de l'image après réduction via ACP

In [23]:
print(PATH_Result)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

s3://manuelmartin67-projet8/Results

On exporte la liste au format "parquet" :

In [24]:
features_df.select(col("path"),col("label"),col("features"),col("array_pca_features").alias("pca_features")).write.mode("overwrite").parquet(PATH_Result)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Chargement des données enregistrées et validation du résultat

In [25]:
df = pd.read_parquet(PATH_Result, engine='pyarrow')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [26]:
df.head()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

                                                path  ...                                       pca_features
0  s3://manuelmartin67-projet8/Test_light/Waterme...  ...  [-17.494122, -1.5239439, -0.49008507, 0.429141...
1  s3://manuelmartin67-projet8/Test_light/Waterme...  ...  [-20.55025, -1.5177195, 1.3590307, 0.9668844, ...
2  s3://manuelmartin67-projet8/Test_light/Waterme...  ...  [-19.595888, -0.053389765, 0.6405229, -2.18608...
3  s3://manuelmartin67-projet8/Test_light/Waterme...  ...  [-19.673742, 0.06817288, -0.15944004, -0.68616...
4  s3://manuelmartin67-projet8/Test_light/Waterme...  ...  [-21.86672, 2.4515252, 1.8881966, 0.25001448, ...

[5 rows x 4 columns]

In [27]:
df.loc[0,'features'].shape

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

(1280,)

In [28]:
df.loc[0,'pca_features'].shape

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

(322,)

In [29]:
df.shape

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

(320, 4)

In [30]:
np.set_printoptions(threshold=np.inf)
df.to_csv('s3://manuelmartin67-projet8/features_reduction_ACP.csv',sep='\t',index=False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…