# Import des librairies utiles

In [1]:
import pandas as pd
from PIL import Image
import numpy as np
import io


# Pyspark
import findspark
findspark.init()
import pyspark

# data handling
from pyspark.sql import SparkSession
from pyspark.sql.functions import element_at, split
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import input_file_name
from pyspark.sql.types import *


# ml tasks
from pyspark.ml.image import ImageSchema
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import PCA

#transformation
from pyspark.ml.linalg import Vectors, VectorUDT

import pyarrow

from contextlib import contextmanager
import time 
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)

from typing import Iterator

# core featurizer
import tensorflow as tf
from tensorflow.keras.applications.vgg16 import VGG16, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array, load_img
from tensorflow.keras.applications.resnet50 import ResNet50, preprocess_input
tf.__version__

import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'

print('pyspark ==', pyspark.__version__)

pyspark == 3.2.1+amzn.0.dev0


In [2]:
def timer(title):
    t0 = time.time()
    yield
    print("{} - done in {:.0f}s".format(title, time.time() - t0))

In [3]:
import boto3

# Clés d'accès
Nous mettons nos clés d'accès afin d'accéder au serveur S3 sur ce Notebook

In [4]:
acces_key ='AKIAVJOAOTX6XK4HWD4P'
secret_acces_key = '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'

# Connexion au serveur
Nous nous connectons au serveur S3, grâce à la librairie boto3. Le nom de notre région est eu-west-3.

In [5]:
s3 = boto3.resource(
    service_name='s3',
    region_name='eu-west-3',
    aws_access_key_id=acces_key,
    aws_secret_access_key=secret_acces_key
)

# Check des fichiers sur notre bucket S3

In [6]:
connexion = boto3.client('s3')
contents = connexion.list_objects(Bucket='saadzizi-p8')['Contents']
for file in contents:
    print(file['Key'])

Code_Spark/
Code_Spark/P8_V1.py
Code_Spark/P8_V2.py
Code_Spark/bootstrap-emr.sh
Code_Spark/bootstrap_emr_vf.sh
data/
data/Fruits_sample/apple_red_yellow_1/r0_123.jpg
data/Fruits_sample/apple_red_yellow_1/r0_163.jpg
data/Fruits_sample/apple_red_yellow_1/r0_3.jpg
data/Fruits_sample/apple_red_yellow_1/r0_83.jpg
data/Fruits_sample/apple_red_yellow_1/r1_255.jpg
data/Fruits_sample/apple_rotten_1/r0_199.jpg
data/Fruits_sample/apple_rotten_1/r0_3.jpg
data/Fruits_sample/apple_rotten_1/r0_39.jpg
data/Fruits_sample/apple_rotten_1/r1_315.jpg
data/Fruits_sample/apple_rotten_1/r1_319.jpg
data/Fruits_sample/carrot_1/r0_199.jpg
data/Fruits_sample/carrot_1/r0_3.jpg
data/Fruits_sample/carrot_1/r0_43.jpg
data/Fruits_sample/cucumber_1/r0_199.jpg
data/Fruits_sample/cucumber_1/r0_3.jpg
data/Fruits_sample/cucumber_1/r0_39.jpg
data/Fruits_sample/pear_1/r0_39.jpg
data/Fruits_sample/pear_1/r1_115.jpg
data/Fruits_sample/pear_1/r1_195.jpg
data/Fruits_sample/pear_1/r1_235.jpg
data/Fruits_sample/pear_1/r1_35.jpg
da

# SparkSession Creation - Configuration - Spark UI
Spark est un moyen de traiter les mégadonnées en parallèle.

In [7]:
spark = (SparkSession
            .builder.master('local[*]')
            .appName('P8')
            .config('spark.hadoop.fs.s3a.access.key', acces_key)
            .config('spark.hadoop.fs.s3a.secret.key', secret_acces_key) 
            .config('spark.hadoop.fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem') 
            .getOrCreate()
            )

In [8]:
spark

spark.sql.repl.eagerEval.enabled : active la configuration de l’évualuation rapide de Pyspark Dataframe dans le portable Jupyter.
spark.sql.repl.eagerEval.maxNumRows : Nombre de lignes à afficher.
spark.sql.execution.arrow.pyspark.enabled : Arrow est disponible comme optimisation lors de la conversion d’un Spark DataFrame en Pandas DataFrame en utilisant l’appel toPandas() et lors de la création d’un Spark DataFrame à partir d’un Pandas DataFrame avec createDataFrame(pandas_df). Pour utiliser Arrow lors de l’exécution de ces appels, les utilisateurs doivent d’abord définir la configuration Spark "spark.sql.execution.arrow.enabled" à "true". Ceci est désactivé par défaut.

In [9]:
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)
spark.conf.set('spark.sql.repl.eagerEval.maxNumRows', 5)
spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', True)

# Test de connexion à S3 sur une image

Interface WEB

Interface utilisateur Web : le port par défaut est 4040 ; afficher les informations utiles sur les applications.
Une étape de liste et le travail du planificateur
Un résumé des tailles RDD et de l’utilisation de la mémoire
Env informations
Renseignements sur le liquidateur

sc = spark.sparkContext
sc.setSystemProperty('com.amazonaws.services.s3.enableV4', 'true')
sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "s3.eu-west-1.amazonaws.com")


# Chemin d'accès aux images

In [10]:
path_s3 = 's3a://saadzizi-p8/data/Fruits_sample/**'
prod_s3 = 's3a://saadzizi-p8/resultats/'

# Configuration de la session Spark
L'objectif de cette configuration est de travailler avec notre Bucket AWS S3. Cette étape a nécessité une installation de différents packages, et l'objectif principal a été d'assurer et maintenir une bonne compatibilité de versions entre tous les logiciels installés : Pyspark, Java, Aws, Hadoop, sdk, bundle. Cette étape s'est avérée nécessaire pour la suite de notre analyse.

In [11]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.amazonaws:aws-java-sdk-bundle-1.11.1026,org.apache.hadoop:hadoop-aws:3.2.0 pyspark-shell'

# Chargement de la data
Nous choisissons le format Binaryfile pour cela 

In [12]:
data = spark.read.format("binaryFile").load(path_s3)

# Analyse de nos données

In [13]:
data.count()

24

In [14]:
data.printSchema()

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)



In [15]:
data.show()

+--------------------+-------------------+------+--------------------+
|                path|   modificationTime|length|             content|
+--------------------+-------------------+------+--------------------+
|s3a://saadzizi-p8...|2022-08-18 09:20:43| 56583|[FF D8 FF E0 00 1...|
|s3a://saadzizi-p8...|2022-08-18 09:20:43| 55913|[FF D8 FF E0 00 1...|
|s3a://saadzizi-p8...|2022-08-18 09:20:41| 55092|[FF D8 FF E0 00 1...|
|s3a://saadzizi-p8...|2022-08-18 09:20:43| 52683|[FF D8 FF E0 00 1...|
|s3a://saadzizi-p8...|2022-08-18 09:20:42| 45785|[FF D8 FF E0 00 1...|
|s3a://saadzizi-p8...|2022-08-18 09:20:42| 44958|[FF D8 FF E0 00 1...|
|s3a://saadzizi-p8...|2022-08-18 09:20:42| 43297|[FF D8 FF E0 00 1...|
|s3a://saadzizi-p8...|2022-08-18 09:20:42| 41126|[FF D8 FF E0 00 1...|
|s3a://saadzizi-p8...|2022-08-18 09:20:41| 38230|[FF D8 FF E0 00 1...|
|s3a://saadzizi-p8...|2022-08-18 09:20:40| 33513|[FF D8 FF E0 00 1...|
|s3a://saadzizi-p8...|2022-08-18 09:20:44| 31831|[FF D8 FF E0 00 1...|
|s3a:/

# Extraction Image Label 

In [16]:
image_extrac = data.withColumn('label', split(col('path'), '/').getItem(5))
image_extrac = image_extrac.select('path', 'content', 'label')
image_extrac.show()

+--------------------+--------------------+------------------+
|                path|             content|             label|
+--------------------+--------------------+------------------+
|s3a://saadzizi-p8...|[FF D8 FF E0 00 1...|        cucumber_1|
|s3a://saadzizi-p8...|[FF D8 FF E0 00 1...|        cucumber_1|
|s3a://saadzizi-p8...|[FF D8 FF E0 00 1...|            pear_1|
|s3a://saadzizi-p8...|[FF D8 FF E0 00 1...|        cucumber_1|
|s3a://saadzizi-p8...|[FF D8 FF E0 00 1...|            pear_1|
|s3a://saadzizi-p8...|[FF D8 FF E0 00 1...|            pear_1|
|s3a://saadzizi-p8...|[FF D8 FF E0 00 1...|            pear_1|
|s3a://saadzizi-p8...|[FF D8 FF E0 00 1...|            pear_1|
|s3a://saadzizi-p8...|[FF D8 FF E0 00 1...|   zucchini_dark_1|
|s3a://saadzizi-p8...|[FF D8 FF E0 00 1...|   zucchini_dark_1|
|s3a://saadzizi-p8...|[FF D8 FF E0 00 1...|          carrot_1|
|s3a://saadzizi-p8...|[FF D8 FF E0 00 1...|   zucchini_dark_1|
|s3a://saadzizi-p8...|[FF D8 FF E0 00 1...|          ca

# Définition du modèle

Nous utilisons ici une approche de featurization de nos images par transfer learning.

Cela consiste à extraire les features les plus pertinentes pour la classification de nos iamges en utilisant un modèle de deep learning pré-entraîné sur de la classification d'image, auquel on enlève la dernière couche - celle qui classifie - afin d'obtenir en sortie un tenseur des features les plus significatives de nos images.

Ce tenseur pourra ensuite être fourni à un autre modèle de classification, qui sera ainsi adapté à nos classes.

Nous utilisons ici un modèle ResNet50 pré-entraîné pour de la classification d'images, auquel nous retirons la dernière couche.

ResNet est caractérisé par l'emploi de connexions saute-couches. Un réseau de neurones résiduels (ResNet) est un réseau de neurones artificiels (artificial neuronal network) d'un type qui s'appuie sur des constructions connues à partir de cellules pyramidales du cortex cérébral.

In [17]:
model = ResNet50(include_top=False)
model.summary()

Model: "resnet50"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, None, None,  0           []                               
                                 3)]                                                              
                                                                                                  
 conv1_pad (ZeroPadding2D)      (None, None, None,   0           ['input_1[0][0]']                
                                3)                                                                
                                                                                                  
 conv1_conv (Conv2D)            (None, None, None,   9472        ['conv1_pad[0][0]']              
                                64)                                                        

# Diffusion du poids du modèle

In [18]:
bc_model_weights = spark.sparkContext.broadcast(model.get_weights())

# Définition de la fontion "modele reset 50"
Cette fonction est censée retourner un modèle ResNet50 avec couche supérieure enlevée et les poids pré-dessinés diffusés.

In [19]:
def model_fn():
  model = ResNet50(weights=None, include_top=False)
  model.set_weights(bc_model_weights.value)
  return model

On définit une fonction preprocess pour pré-traite les octets d’images brutes pour la prédiction, et une fonction pour featuriser les séries - L'objectif étant de featurizer une série d’images brutes utilisant le modèle d’entrée.
La fonction va retourner une série de caractéristiques d'image avec pour taille 224x224.

On définit ensuite notre fonction pour effectuer la featurization, en se basant sur le modèle choisi.

In [20]:
def preprocess(content):
  img = Image.open(io.BytesIO(content)).resize([224, 224])
  arr = img_to_array(img)
  return preprocess_input(arr)

def featurize_series(model, content_series):
  input = np.stack(content_series.map(preprocess))
  preds = model.predict(input)
  output = [p.flatten() for p in preds]
  return pd.Series(output)

Nous appliquons ensuite une méthode Scalar Iterator pandas UDF enveloppant notre fonction de featurization. Cette fonction renvoie une colonne Spark DataFrame de type ArrayType(FloatType).
  
paramètre content_series_iter : Cet argument est un itérateur sur des lots de données, où chaque lot est une série pandas de données d’image. Avec les FDU Scalar Iterator pandas, nous pouvons charger le modèle une fois et le réutiliser pour plusieurs lots de données. Cela amortit les frais généraux liés au chargement de grands modèles.

In [21]:
from typing import Iterator
@pandas_udf('array<float>')
def featurize_udf(content_series_iter:Iterator[pd.Series]) -> Iterator[pd.Series]:
  model = model_fn()
  for content_series in content_series_iter:
    yield featurize_series(model, content_series)

# Featurization

In [22]:
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 1024)

Nous pouvons maintenant exécuter la fonctionnalité sur l’ensemble de notre DataFrame Spark.
Cela peut prendre beaucoup de temps (si nous prenons toutes les images du dataset), car cela applique un grand modèle à l’ensemble de données complet. Nous avons pris 24 images pour tester cela sur le cloud.

In [23]:
df_features = image_extrac.select(col("path"), col("label"), featurize_udf("content").alias("features"))

In [24]:
df_features.show()

+--------------------+------------------+--------------------+
|                path|             label|            features|
+--------------------+------------------+--------------------+
|s3a://saadzizi-p8...|        cucumber_1|[0.0, 0.0, 0.0, 0...|
|s3a://saadzizi-p8...|        cucumber_1|[0.0, 0.0, 0.0, 0...|
|s3a://saadzizi-p8...|            pear_1|[0.0, 0.0, 0.0, 0...|
|s3a://saadzizi-p8...|        cucumber_1|[0.0, 0.0, 0.0, 0...|
|s3a://saadzizi-p8...|            pear_1|[0.0, 0.0, 0.0, 0...|
|s3a://saadzizi-p8...|            pear_1|[0.0, 0.0, 0.0, 0...|
|s3a://saadzizi-p8...|            pear_1|[0.0, 0.0, 0.0, 0...|
|s3a://saadzizi-p8...|            pear_1|[0.0, 0.0, 0.0, 0...|
|s3a://saadzizi-p8...|   zucchini_dark_1|[0.8232221, 0.0, ...|
|s3a://saadzizi-p8...|   zucchini_dark_1|[0.0, 0.0, 0.0, 0...|
|s3a://saadzizi-p8...|          carrot_1|[0.0, 0.0, 0.0, 0...|
|s3a://saadzizi-p8...|   zucchini_dark_1|[0.0, 0.0, 0.0, 0...|
|s3a://saadzizi-p8...|          carrot_1|[0.0, 0.0, 0.0

# PCA - Dimension reduction

In [25]:
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
import seaborn as sns
import matplotlib.pyplot as plt

On établit une dimension de réduction et pour cela, nous créons une fonction avec pour objectif d'appliquer le PCA à toutes les images pour réduire le nombre de fonctionnalités du modèle

Paramètres :
df(pyspark dataFrame) : dataframe avec toutes les informations de l’image, col_name : la colonne de base de données qui permet d’appliquer le PCA, n_components(int) : nombre de dimensions à conserver

In [26]:
def pca_transformation(df, col_name:str, n_components:int=10, variance_plot:bool=False):

    # Les données d’image sont converties en format vectoriel dense
    to_vector_udf = udf(lambda r: Vectors.dense(r), VectorUDT())
    df = df.withColumn('X_vectors', to_vector_udf(col_name))

    # On effectue le Fit de la classe PCA
    pca = PCA(k=n_components, inputCol='X_vectors', outputCol='X_vectors_pca')
    model_pca = pca.fit(df)

    # Transformation de la Feature
    df = model_pca.transform(df)

    if variance_plot == True:
        # Affichage de la variance expliquée du modèle 
        var = model_pca.explainedVariance.cumsum()
        plt.figure(figsize=(15, 10))
        sns.set_context(context='poster', font_scale=0.8)
        sns.lineplot(x=[i for i in range(n_components + 1)], y=np.insert(var,0,0)*100, color='deepskyblue')
        plt.xlabel('PCs')
        plt.ylabel('Variance (%)')
        plt.ylim(0,100)
        plt.xlim(left=0)
        plt.show()      

    return df

In [27]:
df_final = pca_transformation(df=df_features, col_name='features', n_components=10)

In [28]:
df_final.show()

+--------------------+------------------+--------------------+--------------------+--------------------+
|                path|             label|            features|           X_vectors|       X_vectors_pca|
+--------------------+------------------+--------------------+--------------------+--------------------+
|s3a://saadzizi-p8...|        cucumber_1|[0.0, 0.0, 0.0, 0...|[0.0,0.0,0.0,0.0,...|[-69.283115404706...|
|s3a://saadzizi-p8...|        cucumber_1|[0.0, 0.0, 0.0, 0...|[0.0,0.0,0.0,0.0,...|[-62.068997467950...|
|s3a://saadzizi-p8...|            pear_1|[0.0, 0.0, 0.0, 0...|[0.0,0.0,0.0,0.0,...|[47.3427662422467...|
|s3a://saadzizi-p8...|        cucumber_1|[0.0, 0.0, 0.0, 0...|[0.0,0.0,0.0,0.0,...|[-44.184379980105...|
|s3a://saadzizi-p8...|            pear_1|[0.0, 0.0, 0.0, 0...|[0.0,0.0,0.0,0.0,...|[96.2504710153588...|
|s3a://saadzizi-p8...|            pear_1|[0.0, 0.0, 0.0, 0...|[0.0,0.0,0.0,0.0,...|[66.3958970246775...|
|s3a://saadzizi-p8...|            pear_1|[0.0, 0.0, 0.0

# Export des résultats au bucket S3

In [29]:
df_final = df_final.select('path', 'label', 'X_vectors_pca')

In [30]:
df_final_pandas = df_final.toPandas()
df_final_pandas.to_csv(prod_s3 + 'results_S3_cloud.csv', index=False)

  Unsupported type in conversion to Arrow: VectorUDT
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.
