# Table of Contents

* [Loading libraries, images and starting PySpark session](#chapter0)
* [Loading Model](#chapter1)
* [Preprocessing](#chapter2)
* [Processing](#chapter3)
* [Feature Reduction with PCA](#chapter4)
* [Saving data](#chapter5)

# Link to the presentation <a class="anchor" id='chapter0'></a>

#### https://docs.google.com/presentation/d/1qWBxta1_aGsXhI4qatkODA4zdZ3gJ7K7XKPResbZ2b4/edit?usp=sharing

# Loading libraries, images and starting PySpark session <a class="anchor" id='chapter0'></a>

In [1]:
import pandas as pd
import numpy as np
import io
import os
import random  # used by randomly_copy_files
import shutil  # used by randomly_copy_files
import tensorflow as tf
from PIL import Image
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras import Model
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split, udf
from pyspark.ml.feature import PCA
from pyspark.ml.functions import vector_to_array  # used before saving the reduced matrix
from pyspark.ml.linalg import Vectors, VectorUDT

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
1,application_1709631853089_0003,pyspark,idle,Link,Link,,✔



SparkSession available as 'spark'.



In [2]:
def randomly_copy_files(root_folder, num_files_to_copy=10, output_folder="Training-local"):
    
    """
    Randomly selects and copies a specified number of files from each subfolder within the given root folder
    to an output folder.

    Parameters:
    - root_folder (str): The root folder from which to start the search for files in subfolders.
    - num_files_to_copy (int, optional): The number of files to randomly select and copy from each subfolder. Default is 10.
    - output_folder (str, optional): The folder where the selected files will be copied. Default is "Training-local".
    """
    
    try:
        # os.walk silently yields nothing for a missing folder, so check explicitly
        if not os.path.isdir(root_folder):
            raise FileNotFoundError(root_folder)

        # Create the output folder if it doesn't exist
        os.makedirs(output_folder, exist_ok=True)

        # Walk through the root folder and its subfolders
        for category_folder, _, files in os.walk(root_folder):
            # Skip the root folder itself
            if category_folder == root_folder:
                continue

            # Use the subfolder name as the category (label) name
            category_name = os.path.basename(category_folder)

            # Create a matching folder for the category in the output directory
            category_output_folder = os.path.join(output_folder, category_name)
            os.makedirs(category_output_folder, exist_ok=True)

            # Randomly pick up to num_files_to_copy files from the category
            selected_files = random.sample(files, min(num_files_to_copy, len(files)))

            # Copy the selected files to the output folder
            for selected_file in selected_files:
                source_path = os.path.join(category_folder, selected_file)
                destination_path = os.path.join(category_output_folder, selected_file)
                shutil.copy(source_path, destination_path)
            print(f"Copied {len(selected_files)} random files from '{category_name}'.")

        print(f"Successfully created '{output_folder}' with up to {num_files_to_copy} random files from each subfolder.")

    except FileNotFoundError:
        print(f"The folder '{root_folder}' does not exist.")
    except PermissionError:
        print(f"Permission error accessing '{root_folder}'.")
    except Exception as e:
        print(f"An error occurred: {e}")

#root_folder_path = '/content/drive/MyDrive/fruits/fruits-360_dataset/fruits-360/Training'
#randomly_copy_files(root_folder_path, num_files_to_copy=10, output_folder="/content/drive/MyDrive/fruits/fruits-360_dataset/fruits-360/Train-set")


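`randomly_copy_files` draws a new random subset on every call. When the local subset needs to be reproducible (for example, to rebuild exactly the same training sample later), one option is a dedicated, seeded `random.Random` instance. The sketch below is a hypothetical helper, `sample_per_category`, that only selects files (no copying) so the choice can be inspected first; the seed, the sorting step and the one-level folder layout are assumptions, not part of the original workflow.

```python
import os
import random
import tempfile


def sample_per_category(root_folder, num_files=10, seed=42):
    """Hypothetical helper: pick up to num_files files per subfolder, reproducibly.

    Returns a dict mapping each category (subfolder name) to its selected file names.
    """
    rng = random.Random(seed)  # dedicated RNG so the global random state is untouched
    selection = {}
    # Sort categories and files so the result does not depend on os.listdir order
    for category in sorted(os.listdir(root_folder)):
        category_path = os.path.join(root_folder, category)
        if not os.path.isdir(category_path):
            continue
        files = sorted(
            f for f in os.listdir(category_path)
            if os.path.isfile(os.path.join(category_path, f))
        )
        selection[category] = rng.sample(files, min(num_files, len(files)))
    return selection


# Usage: build a tiny fake dataset with two categories and sample it twice
with tempfile.TemporaryDirectory() as root:
    for category, count in [("Apple", 15), ("Banana", 3)]:
        os.makedirs(os.path.join(root, category))
        for i in range(count):
            open(os.path.join(root, category, f"img_{i}.jpg"), "w").close()
    first = sample_per_category(root, num_files=10, seed=0)
    second = sample_per_category(root, num_files=10, seed=0)
    print({k: len(v) for k, v in first.items()})  # {'Apple': 10, 'Banana': 3}
```

With the same seed, both calls return identical selections; categories with fewer files than requested are taken whole.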
In [3]:
PATH = 's3://p8-data-fruits'
PATH_Data = PATH+'/Test'
PATH_Result = PATH+'/Results'
print('PATH:        '+\
      PATH+'\nPATH_Data:   '+\
      PATH_Data+'\nPATH_Result: '+PATH_Result)


PATH:        s3://p8-data-fruits
PATH_Data:   s3://p8-data-fruits/Test
PATH_Result: s3://p8-data-fruits/Results

In [4]:
images = spark.read.format("binaryFile") \
  .option("pathGlobFilter", "*.jpg") \
  .option("recursiveFileLookup", "true") \
  .load(PATH_Data)


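`pathGlobFilter` keeps only files whose *name* matches the glob, while `recursiveFileLookup` makes Spark descend into every category subfolder (and turns off partition discovery, which is why the label has to be parsed from the path afterwards). To check which keys a pattern would accept before launching a job, the filter can be approximated locally with Python's `fnmatch.fnmatchcase`. This is only an approximation under assumptions: Hadoop's glob syntax is close to, but not identical with, `fnmatch`, the match is assumed to be case-sensitive, and only the first key below comes from the dataset (the other two are made up).

```python
import posixpath
from fnmatch import fnmatchcase


def matches_glob_filter(path, pattern="*.jpg"):
    """Approximate pathGlobFilter: match the file *name* only, case-sensitively."""
    return fnmatchcase(posixpath.basename(path), pattern)


keys = [
    "s3://p8-data-fruits/Test/Watermelon/r_106_100.jpg",
    "s3://p8-data-fruits/Test/Watermelon/r_107_100.JPG",  # hypothetical: upper-case extension
    "s3://p8-data-fruits/Test/Watermelon/notes.txt",      # hypothetical: not an image
]
kept = [k for k in keys if matches_glob_filter(k)]
print(kept)  # only the lower-case .jpg key survives
```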
In [5]:
images.show(5)


+--------------------+-------------------+------+--------------------+
|                path|   modificationTime|length|             content|
+--------------------+-------------------+------+--------------------+
|s3://p8-data-frui...|2024-02-05 17:16:50|  7353|[FF D8 FF E0 00 1...|
|s3://p8-data-frui...|2024-02-05 17:17:17|  7350|[FF D8 FF E0 00 1...|
|s3://p8-data-frui...|2024-02-05 17:17:35|  7349|[FF D8 FF E0 00 1...|
|s3://p8-data-frui...|2024-02-05 17:16:38|  7348|[FF D8 FF E0 00 1...|
|s3://p8-data-frui...|2024-02-05 17:16:14|  7328|[FF D8 FF E0 00 1...|
+--------------------+-------------------+------+--------------------+
only showing top 5 rows

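The `content` column holds the raw file bytes. Every row above starts with `FF D8 FF`, the JPEG start-of-image marker (`FF D8`) followed by the next segment marker (`FF E0` is the JFIF APP0 segment). A cheap sanity check before decoding could look like this; `looks_like_jpeg` is a hypothetical helper, not part of the notebook.

```python
JPEG_MAGIC = b"\xff\xd8\xff"  # SOI marker (FF D8) followed by the next marker's FF byte


def looks_like_jpeg(content: bytes) -> bool:
    """Return True if the byte string starts with the JPEG signature."""
    return content[:3] == JPEG_MAGIC


# First bytes as displayed by images.show(5): FF D8 FF E0 00 ...
header = bytes.fromhex("FFD8FFE000")
print(looks_like_jpeg(header))           # True
print(looks_like_jpeg(b"\x89PNG\r\n"))   # False (PNG signature)
```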
In [6]:
images = images.withColumn('label', element_at(split(images['path'], '/'), -2))
images.printSchema()
images.select('path', 'label').show(5, False)

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)

+-------------------------------------------------+----------+
|path                                             |label     |
+-------------------------------------------------+----------+
|s3://p8-data-fruits/Test/Watermelon/r_106_100.jpg|Watermelon|
|s3://p8-data-fruits/Test/Watermelon/r_109_100.jpg|Watermelon|
|s3://p8-data-fruits/Test/Watermelon/r_108_100.jpg|Watermelon|
|s3://p8-data-fruits/Test/Watermelon/r_107_100.jpg|Watermelon|
|s3://p8-data-fruits/Test/Watermelon/r_95_100.jpg |Watermelon|
+-------------------------------------------------+----------+
only showing top 5 rows

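`element_at(split(path, '/'), -2)` splits the path on `/` and takes the second-to-last element (a negative index counts from the end), i.e. the name of the folder that contains the image. The same rule in plain Python, handy for checking paths before running the Spark job; `label_from_path` is a hypothetical name.

```python
def label_from_path(path: str) -> str:
    """Mirror element_at(split(path, '/'), -2): return the parent folder name."""
    parts = path.split("/")
    return parts[-2]


print(label_from_path("s3://p8-data-fruits/Test/Watermelon/r_106_100.jpg"))  # Watermelon
```

Folder names containing spaces, such as `Banana Red`, are kept intact because only `/` is used as the separator.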
# Loading Model <a class="anchor" id='chapter1'></a>

In [7]:
model = MobileNetV2(weights='imagenet',
                    include_top=True,
                    input_shape=(224, 224, 3))


Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/mobilenet_v2/mobilenet_v2_weights_tf_dim_ordering_tf_kernels_1.0_224.h5

In [8]:
new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)


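Taking `model.layers[-2]` drops only the final `predictions` Dense layer (the 1000 ImageNet classes). In Keras' MobileNetV2 at the default width, the penultimate layer is a global average pooling over the last 7×7×1280 feature map, so each image becomes a 1280-dimensional vector, which leaves room for the up-to-1200-component PCA tried later. The NumPy sketch below reproduces only that pooling step; the shapes are the assumption and the activations are random.

```python
import numpy as np

# Synthetic activations standing in for MobileNetV2's last feature map:
# a batch of 4 images, 7x7 spatial grid, 1280 channels (assumed shapes).
rng = np.random.default_rng(0)
feature_map = rng.random((4, 7, 7, 1280), dtype=np.float32)

# Global average pooling: average over the two spatial axes, keep the channels.
pooled = feature_map.mean(axis=(1, 2))

print(pooled.shape)  # (4, 1280)
```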
In [9]:
brodcast_weights = sc.broadcast(new_model.get_weights())


In [10]:
new_model.summary()


Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, 224, 224, 3  0           []                               
                                )]                                                                
                                                                                                  
 Conv1 (Conv2D)                 (None, 112, 112, 32  864         ['input_1[0][0]']                
                                )                                                                 
                                                                                                  
 bn_Conv1 (BatchNormalization)  (None, 112, 112, 32  128         ['Conv1[0][0]']                  
                                )                                                             

# Preprocessing <a class="anchor" id='chapter2'></a>

In [11]:
def model_fn():
    """
    Returns a MobileNetV2 model with the top (classification) layer removed,
    loaded with the broadcast pretrained weights.
    """
    # weights=None: the ImageNet weights come from the broadcast variable,
    # so there is no need to download them again on every executor
    model = MobileNetV2(weights=None,
                        include_top=True,
                        input_shape=(224, 224, 3))
    for layer in model.layers:
        layer.trainable = False
    new_model = Model(inputs=model.input,
                      outputs=model.layers[-2].output)
    new_model.set_weights(brodcast_weights.value)
    return new_model


In [12]:
def preprocess(content):
    """
    Preprocesses raw image bytes for prediction:
    decode, force 3 RGB channels, resize to 224x224 and scale for MobileNetV2.
    """
    img = Image.open(io.BytesIO(content)).convert('RGB').resize((224, 224))
    arr = img_to_array(img)
    return preprocess_input(arr)

def featurize_series(model, content_series):
    """
    Featurize a pd.Series of raw images using the input model.
    :return: a pd.Series of image features
    """
    batch = np.stack(content_series.map(preprocess))
    preds = model.predict(batch)
    # For some layers, output features will be multi-dimensional tensors.
    # We flatten the feature tensors to vectors for easier storage in Spark DataFrames.
    output = [p.flatten() for p in preds]
    return pd.Series(output)

@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    '''
    This method is a Scalar Iterator pandas UDF wrapping our featurization function.
    The decorator specifies that this returns a Spark DataFrame column of type ArrayType(FloatType).

    :param content_series_iter: This argument is an iterator over batches of data, where each batch
                              is a pandas Series of image data.
    '''
    # With Scalar Iterator pandas UDFs, we can load the model once and then re-use it
    # for multiple data batches.  This amortizes the overhead of loading big models.
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)


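`preprocess_input` from `mobilenet_v2` rescales pixel values from `[0, 255]` to `[-1, 1]` (x / 127.5 - 1), the input range MobileNetV2 expects. A NumPy equivalent, useful for checking what the UDF actually feeds the model; `scale_like_mobilenet_v2` is a hypothetical name.

```python
import numpy as np


def scale_like_mobilenet_v2(pixels: np.ndarray) -> np.ndarray:
    """Map pixels in [0, 255] to [-1, 1], like mobilenet_v2.preprocess_input."""
    return pixels.astype(np.float32) / 127.5 - 1.0


pixels = np.array([0, 127.5, 255], dtype=np.float32)
print(scale_like_mobilenet_v2(pixels))  # [-1.  0.  1.]
```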


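The point of the `SCALAR_ITER` pandas UDF is that `model_fn()` runs once per iterator (once per task), not once per batch. The control flow can be simulated without Spark; `load_model` and the integer batches below are hypothetical stand-ins that count how often the expensive step runs.

```python
from typing import Iterator, List

load_count = 0


def load_model():
    """Hypothetical stand-in for model_fn(): pretend this is expensive."""
    global load_count
    load_count += 1
    return lambda batch: [x * 2 for x in batch]  # toy 'model'


def featurize_iter(batches: Iterator[List[int]]) -> Iterator[List[int]]:
    """Same shape as featurize_udf: load once, then reuse for every batch."""
    model = load_model()
    for batch in batches:
        yield model(batch)


# Three batches from one partition: the model is loaded a single time.
results = list(featurize_iter(iter([[1, 2], [3], [4, 5, 6]])))
print(results, load_count)  # [[2, 4], [6], [8, 10, 12]] 1
```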
In [13]:
features_df = images.repartition(20).select(col("path"),
                                            col("label"),
                                            featurize_udf("content").alias("features")
                                           )


In [14]:
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1024")


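`spark.sql.execution.arrow.maxRecordsPerBatch` caps how many rows each pandas batch handed to `featurize_udf` contains, which bounds the memory used by `np.stack` and `model.predict` per call. Assuming the 22,688 images counted later (`df.shape`) are spread evenly over the 20 partitions created by `repartition(20)` (round-robin repartitioning gives nearly, not exactly, equal sizes), the arithmetic works out as follows; `batch_sizes` is a hypothetical helper.

```python
def batch_sizes(partition_rows: int, max_records: int = 1024) -> list:
    """Sizes of the Arrow batches one partition is cut into (last one may be smaller)."""
    full, rest = divmod(partition_rows, max_records)
    return [max_records] * full + ([rest] if rest else [])


total_rows, num_partitions = 22688, 20
# Even split (assumption): the first `extra` partitions get one more row.
base, extra = divmod(total_rows, num_partitions)
partition_rows = [base + 1 if i < extra else base for i in range(num_partitions)]

batches = [batch_sizes(n) for n in partition_rows]
print(partition_rows[0], batches[0])    # 1135 [1024, 111]
print(partition_rows[-1], batches[-1])  # 1134 [1024, 110]
print(sum(len(b) for b in batches))     # 40
```

Each partition therefore sends one full batch of 1024 images and one small remainder batch to the model.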
# Processing <a class="anchor" id='chapter3'></a>

In [15]:
features_df.repartition("label").write.partitionBy("label").mode("overwrite").parquet(PATH_Result)


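`partitionBy("label")` writes one Hive-style subdirectory per label (`label=<value>`) under `PATH_Result`, and the `label` column itself is not stored inside the Parquet files: readers rebuild it from the directory names, which is also why `label` comes back as the last column when the data is read again below. A small parser for such paths; `partition_values` is a hypothetical helper and the file name in the example is made up.

```python
from urllib.parse import unquote


def partition_values(path: str) -> dict:
    """Collect key=value pairs from Hive-style directory names in a path."""
    values = {}
    for segment in path.split("/")[:-1]:  # skip the file name itself
        if "=" in segment:
            key, value = segment.split("=", 1)
            values[key] = unquote(value)  # Spark percent-escapes some special characters
    return values


print(partition_values("s3://p8-data-fruits/Results/label=Banana Red/part-00000.snappy.parquet"))
# {'label': 'Banana Red'}
```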
In [16]:
df = pd.read_parquet(PATH_Result, engine='pyarrow')


In [17]:
df.shape


(22688, 3)

In [27]:
df.features.dtype


dtype('O')

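`dtype('O')` means each cell of `df.features` is a separate array object rather than a slice of one 2-D block. For NumPy-style work on the driver, the column can be stacked into a dense matrix. The tiny frame below is synthetic and stands in for the real 22,688-row result.

```python
import numpy as np
import pandas as pd

# Synthetic stand-in for the Parquet result: one array per row (object dtype)
features = pd.Series(
    [np.array([0.1, 0.2], dtype=np.float32),
     np.array([0.3, 0.4], dtype=np.float32),
     np.array([0.5, 0.6], dtype=np.float32)],
    dtype=object,
)
df_small = pd.DataFrame({"path": ["a.jpg", "b.jpg", "c.jpg"], "features": features})
print(df_small.features.dtype)  # object

# Stack the per-row arrays into a single (n_rows, n_features) matrix
matrix = np.stack(df_small["features"].to_numpy())
print(matrix.shape, matrix.dtype)  # (3, 2) float32
```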
In [31]:
spark_df = spark.read.parquet(PATH_Result)  # 'engine' is a pandas option; Spark uses its own Parquet reader

spark_df.show()


+--------------------+--------------------+----------+
|                path|            features|     label|
+--------------------+--------------------+----------+
|s3://p8-data-frui...|[0.22090243, 0.50...|Banana Red|
|s3://p8-data-frui...|[0.5853961, 0.108...|Banana Red|
|s3://p8-data-frui...|[0.90440553, 0.13...|Banana Red|
|s3://p8-data-frui...|[0.94255775, 0.01...|Banana Red|
|s3://p8-data-frui...|[0.34443653, 0.0,...|Banana Red|
|s3://p8-data-frui...|[0.8618154, 0.018...|Banana Red|
|s3://p8-data-frui...|[0.99609035, 0.16...|Banana Red|
|s3://p8-data-frui...|[0.34001258, 0.06...|Banana Red|
|s3://p8-data-frui...|[0.043630444, 0.1...|Banana Red|
|s3://p8-data-frui...|[0.018199764, 0.4...|Banana Red|
|s3://p8-data-frui...|[0.052573018, 0.2...|Banana Red|
|s3://p8-data-frui...|[0.5949341, 0.064...|Banana Red|
|s3://p8-data-frui...|[0.6204077, 0.054...|Banana Red|
|s3://p8-data-frui...|[0.7636681, 0.032...|Banana Red|
|s3://p8-data-frui...|[0.760086, 0.0182...|Banana Red|
|s3://p8-d

In [35]:
array_to_vector_udf = udf(lambda arr: Vectors.dense(arr), VectorUDT())

# Apply the UDF to create a new 'features_vector' column
spark_df = spark_df.withColumn('features_vector', array_to_vector_udf('features'))


# Feature Reduction with PCA <a class="anchor" id='chapter4'></a>

In [38]:
for n_components in range(1200, 0, -100):
    pca = PCA(k=n_components, inputCol='features_vector', outputCol='pcaFeatures')
    pca_model = pca.fit(spark_df)
    print(f"The explained variance is {pca_model.explainedVariance.toArray().sum()} for {n_components} components!")


The explained variance is 0.9996699331690085 for 1200 components!
The explained variance is 0.9986457305565695 for 1100 components!
The explained variance is 0.9969667692278852 for 1000 components!
The explained variance is 0.9945121193774749 for 900 components!
The explained variance is 0.9911238039334226 for 800 components!
The explained variance is 0.986538241766981 for 700 components!
The explained variance is 0.9803159302765977 for 600 components!
The explained variance is 0.9717506720978099 for 500 components!
The explained variance is 0.9594824434677158 for 400 components!
The explained variance is 0.9405877129088722 for 300 components!
The explained variance is 0.9073574507274882 for 200 components!
The explained variance is 0.8311113286300454 for 100 components!

# Saving data <a class="anchor" id='chapter5'></a>

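The explained-variance values printed above can drive the choice of `n_components`: take the smallest tested k that reaches a target. With the recorded numbers, a 90 % target gives 200 components, the value used below, while 95 % would need 400. `smallest_k` is a hypothetical helper.

```python
# Explained variance recorded by the PCA loop (k -> cumulative ratio)
explained = {
    1200: 0.9996699331690085, 1100: 0.9986457305565695, 1000: 0.9969667692278852,
    900: 0.9945121193774749, 800: 0.9911238039334226, 700: 0.986538241766981,
    600: 0.9803159302765977, 500: 0.9717506720978099, 400: 0.9594824434677158,
    300: 0.9405877129088722, 200: 0.9073574507274882, 100: 0.8311113286300454,
}


def smallest_k(explained_by_k: dict, target: float) -> int:
    """Smallest tested k whose explained variance is at least `target`."""
    candidates = [k for k, v in explained_by_k.items() if v >= target]
    if not candidates:
        raise ValueError(f"No tested k reaches {target}")
    return min(candidates)


print(smallest_k(explained, 0.90))  # 200
print(smallest_k(explained, 0.95))  # 400
print(smallest_k(explained, 0.99))  # 800
```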
In [41]:
n_components = 200
pca = PCA(k=n_components, inputCol='features_vector', outputCol='pcaFeatures')
pca_model = pca.fit(spark_df)
result_df = pca_model.transform(spark_df)

# Keep only the reduced features together with their label and source path
processed_df = result_df.select('pcaFeatures', 'label', 'path')
processed_df


DataFrame[pcaFeatures: vector, label: string, path: string]

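`pca_model.transform` maps each 1280-dimensional `features_vector` to a 200-dimensional `pcaFeatures` vector by multiplying it with the matrix of the top 200 principal components (`pca_model.pc`, one component per column). The shape bookkeeping in NumPy, with small synthetic sizes in place of the real ones:

```python
import numpy as np

rng = np.random.default_rng(1)
n_samples, n_features, k = 100, 32, 5  # stand-ins for 22688 rows, 1280 features, 200 components

X = rng.normal(size=(n_samples, n_features))
# Principal components = right singular vectors of the centred data
_, _, vt = np.linalg.svd(X - X.mean(axis=0), full_matrices=False)
pc = vt[:k].T                     # (n_features, k), one component per column

projected = X @ pc                # (n_samples, k): the reduced features
print(projected.shape)            # (100, 5)
print(np.allclose(pc.T @ pc, np.eye(k)))  # True: components are orthonormal
```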
In [81]:
df_matrix = processed_df.select(vector_to_array("pcaFeatures").alias("pcaFeatures"))
df_matrix


DataFrame[pcaFeatures: array<double>]

In [84]:
df_matrix.write.mode("overwrite").json('s3://p8-data-fruits/Reduced-matrix')

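`vector_to_array` turns the ML `vector` type into a plain `array<double>`, which serialises to JSON as an ordinary list rather than as the vector's internal struct. `DataFrameWriter.json` writes JSON Lines: one object per row, one row per line, spread over several `part-*` files. Reading such a part file back without Spark needs only the standard library; the two records below are made-up placeholders with 3 values instead of 200.

```python
import io
import json

# Made-up content of one part-*.json file (3 values per row instead of 200)
part_file = io.StringIO(
    '{"pcaFeatures":[1.5,-0.25,0.75]}\n'
    '{"pcaFeatures":[0.5,2.0,-1.0]}\n'
)

# One JSON object per non-empty line
rows = [json.loads(line)["pcaFeatures"] for line in part_file if line.strip()]
print(rows)  # [[1.5, -0.25, 0.75], [0.5, 2.0, -1.0]]
```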