# Partie cloud EMR

Ce notebook reprend à partir de la section 4.10 (exécution sur EMR).

### 4.10.1 Démarrage de la session Spark

In [None]:
# L'exécution de cette cellule démarre l'application Spark

<u>Affichage des informations sur la session en cours et liens vers Spark UI</u> :

In [None]:
%%info

### 4.10.2 Installation des packages

Les packages nécessaires ont été installé via l'étape de **bootstrap** à l'instanciation du serveur.

### 4.10.3 Import des librairies

In [None]:
# Pour lire les images qui auront été chargées au format binaire :
import io
# Pour la gestion des chemins de fichiers
import os

# Pour valider les résultats et les exporter en CSV
import pandas as pd
# Pour redimensionner les images
from PIL import Image
# Pour la manipulation d'arrays
import numpy as np

# Désactivation des messages de debugging de tensorflow.
# Doit être exécuté avant les imports de tensorflow.
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'

# Pour l'extraction de features des images
import tensorflow as tf
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras import Model

# Création et manipulation de dataframes Spark
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split
from pyspark.sql import SparkSession

# Pour convertion des features en vecteurs Spark
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
# Pour réalisation de la PCA sur les features
from pyspark.ml.feature import PCA

### 4.10.4 Définition des PATH pour charger les images et enregistrer les résultats

Nous accédons directement à nos **données sur S3** comme si elles étaient **stockées localement**.

In [None]:
# Root bucket (project-level)
S3_BUCKET = "s3://p11-fruits-710002907257-eu-west-3"

# Dataset root
PATH = f"{S3_BUCKET}/data/fruits-360"

# Input data
PATH_Data = f"{PATH}/Test"        # ou /Training selon l'étape

# Outputs
PATH_Result = f"{S3_BUCKET}/results"

print(
    "PATH:                 " + PATH +
    "\nPATH_Data:            " + PATH_Data +
    "\nPATH_Result:          " + PATH_Result
)


### 4.10.5 Traitement des données

#### 4.10.5.1 Chargement des données

In [None]:
images = spark.read.format("binaryFile") \
  .option("pathGlobFilter", "*.jpg") \
  .option("recursiveFileLookup", "true") \
  .load(PATH_Data)

Aperçu rapide des premières lignes pour vérifier que le chargement des images a bien fonctionné.


In [None]:
images.show(5)

<u>Je ne conserve que le **path** de l'image et j'ajoute <br />
    une colonne contenant les **labels** de chaque image</u> :

In [None]:
images = images.withColumn('label', element_at(split(images['path'], '/'),-2))
print(images.printSchema())
print(images.select('path','label').show(5,False))

#### 4.10.5.2 Préparation du modèle

In [None]:
model = MobileNetV2(weights='imagenet',
                    include_top=True,
                    input_shape=(224, 224, 3))

Construction du nouveau modèle qui sort les features depuis MobileNetV2.


In [None]:
new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)

Diffusion (broadcast) des poids du modèle aux exécutants Spark pour éviter des envois répétés.


In [None]:
broadcast_weights = sc.broadcast(new_model.get_weights())

Résumé du modèle pour valider l’architecture et les dimensions de sortie.


In [None]:
new_model.summary()

Fonction utilitaire qui reconstitue le modèle sur les workers à partir des poids diffusés.


In [None]:
def model_fn():
    """
    Returns a MobileNetV2 model with top layer removed 
    and broadcasted pretrained weights.
    """
    model = MobileNetV2(weights=None,
                        include_top=True,
                        input_shape=(224, 224, 3))
    for layer in model.layers:
        layer.trainable = False
    new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)
    new_model.set_weights(broadcast_weights.value)
    return new_model

#### 4.10.5.3 Définition du processus de chargement des images <br/> et application de leur featurisation à travers l'utilisation de pandas UDF

In [None]:
def preprocess(content):
    """
    Preprocesses raw image bytes for prediction.
    """
    img = Image.open(io.BytesIO(content)).convert('RGB').resize([224, 224])
    arr = img_to_array(img)
    return preprocess_input(arr)

def featurize_series(model, content_series, batch_size=32):
    """
    Featurize a pd.Series of raw images using the input model.
    :return: a pd.Series of image features
    """
    batch = np.stack(content_series.map(preprocess))
    preds = model(batch, training=False).numpy()
    # For some layers, output features will be multi-dimensional tensors.
    # We flatten the feature tensors to vectors for easier storage in Spark DataFrames.
    output = [p.flatten().astype(np.float32).tolist() for p in preds]
    return pd.Series(output)

@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    '''
    This method is a Scalar Iterator pandas UDF wrapping our featurization function.
    The decorator specifies that this returns a Spark DataFrame column of type ArrayType(FloatType).

    :param content_series_iter: This argument is an iterator over batches of data, where each batch
                              is a pandas Series of image data.
    '''
    # With Scalar Iterator pandas UDFs, we can load the model once and then re-use it
    # for multiple data batches.  This amortizes the overhead of loading big models.
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)


#### 4.10.5.4 Exécutions des actions d'extractions de features

In [None]:
 spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1024")

Extraction des features via la Pandas UDF et mise en forme du DataFrame résultant.


In [None]:
features_df = images.repartition(20).select(col("path"),
                                            col("label"),
                                            featurize_udf("content").alias("features")
                                           )

Vérification du schéma du DataFrame de features.


In [None]:
features_df.printSchema()

### 4.10.6 PCA (MLlib) après extraction de features

Même logique que la section locale : nous passons d'un vecteur 1280D 
à un espace plus compact (ex. `PCA_K=50`). Cela prépare la suite du projet (classification, indexation).

> **Paramétrage mémoire** : `PCA_SAMPLE_FRACTION` permet d'entraîner la PCA sur
> un sous-échantillon pour limiter la consommation mémoire.

---


In [None]:
list_to_vector_udf = udf(lambda l: Vectors.dense(l), VectorUDT())
features_df = features_df.withColumn('features', list_to_vector_udf('features'))

n_componants = 28

pca = PCA(k=n_componants, inputCol="features", outputCol="pcaFeatures")
model = pca.fit(features_df)
features_df = model.transform(features_df).select("path", "label", "features", "pcaFeatures")



Affichage du pourcentage de variance expliqué par la PCA.


In [None]:
print(f"{n_componants} composantes captent {sum(model.explainedVariance)*100:.2f} % de la variance.")

Vérification du chemin de sortie avant l’écriture.


In [None]:
print(PATH_Result)

Sauvegarde des features (et composantes PCA) au format Parquet.


In [None]:
features_df.write.mode("overwrite").parquet(PATH_Result)

### 4.10.6 Chargement des données enregistrées et validation du résultat

In [None]:
spark.read.parquet(PATH_Result).show(5, truncate=False)

# Optionnel: conversion en pandas sur un petit échantillon uniquement
sample_pdf = spark.read.parquet(PATH_Result).limit(10).toPandas()
sample_pdf.head()


Chargement local (Pandas) du Parquet pour contrôles rapides.


In [None]:
df = pd.read_parquet(PATH_Result, engine='pyarrow')
# Séparer les éléments de 'path' et ne garder que le dernier (nom du fichier)
df['path'] = df['path'].apply(lambda x: x.split('/')[-1])
# Renommer la colonne 'path' en 'filename'
df = df.rename(columns={'path': 'filename'})

Inspection d’un exemple de vecteur de features.


In [None]:
df['features'][0]

Extraction des valeurs numériques depuis les structures Spark.


In [None]:
df['features'] = df['features'].apply(lambda x: x['values'] if x is not None else None)
df['pcaFeatures'] = df['pcaFeatures'].apply(lambda x: x['values'] if x is not None else None)

Contrôle de la dimension du vecteur de features.


In [None]:
df.loc[0,'features'].shape

Contrôle de la dimension après PCA.


In [None]:
df.loc[0,'pcaFeatures'].shape

Aperçu des premières lignes du DataFrame Pandas.


In [None]:

df.head()

Ajustement de l’affichage NumPy pour éviter les retours à la ligne dans les vecteurs.


In [None]:

# np.printoptions pour éviter l'insertion de "\n" dans 'pcaFeatures' dans notre fichier csv
with np.printoptions(linewidth=10000):
    df[['filename', 'label', 'pcaFeatures']].to_csv(PATH_Result+'/'+'pcaFeatures.csv', index=False, sep='\t')

<u>On peut également constater la présence des fichiers <br />
    au format "**parquet**" sur le **serveur S3**</u> :

![Affichage des résultats sur S3](data/img/S3_Results.png)

## 4.11 Suivi de l'avancement des tâches avec le Serveur d'Historique Spark

Il est possible de voir l'avancement des tâches en cours <br />
avec le **serveur d'historique Spark**.

![Accès au serveur d'historique spark](data/img/EMR_serveur_historique_spark_acces.png)

**Il est également possible de revenir et d'étudier les tâches <br />
qui ont été réalisé, afin de debugger, optimiser les futurs <br />
tâches à réaliser.**

<u>Lorsque la commande "**features_df.write.mode("overwrite").parquet(PATH_Result)**" <br />
était en cours, nous pouvions observer son état d'avancement</u> :

![Progression execution script](data/img/EMR_jupyterhub_avancement.png)

<u>Le **serveur d'historique Spark** nous permet une vision beaucoup plus précise <br />
de l'exécution des différentes tâche sur les différentes machines du cluster</u> :

![Suivi des tâches spark](data/img/EMR_SHSpark_01.png)

On peut également constater que notre cluster de calcul a mis <br />
un tout petit peu **moins de 8 minutes** pour traiter les **22 688 images**.

![Temps de traitement](data/img/EMR_SHSpark_02.png)


## 4.12 Résiliation de l'instance EMR

Notre travail est maintenant terminé. <br />
Le cluster de machines EMR est **facturé à la demande**, <br />
et nous continuons d'être facturé même lorsque <br />
les machines sont au repos.<br />
Pour **optimiser la facturation**, il nous faut <br />
maintenant **résilier le cluster**.

<u>Je réalise cette commande depuis l'interface AWS</u> :

1. Commencez par **désactiver le tunnel ssh dans FoxyProxy** pour éviter des problèmes de **timeout**.
![Désactivation de FoxyProxy](data/img/EMR_foxyproxy_desactivation.png)
2. Cliquez sur "**Résilier**"
![Cliquez sur Résilier](data/img/EMR_resiliation_01.png)
3. Confirmez la résiliation
![Confirmez la résiliation](data/img/EMR_resiliation_02.png)
4. La résiliation prend environ **1 minute**
![Résiliation en cours](data/img/EMR_resiliation_03.png)
5. La résiliation est effectuée
![Résiliation terminée](data/img/EMR_resiliation_04.png)

**Checklist coût / cluster éphémère** :
- Créer le cluster uniquement pour les tests/démos.
- Vérifier que les jobs sont terminés (Spark UI).
- Sauvegarder les logs nécessaires dans S3.
- Résilier immédiatement le cluster après exécution.

## 4.13 Cloner le serveur EMR (si besoin)

Si nous devons de nouveau exécuter notre notebook dans les mêmes conditions, <br />
il nous suffit de **cloner notre cluster** et ainsi en obtenir une copie fonctionnelle <br />
sous 15/20 minutes, le temps de son instanciation.

<u>Pour cela deux solutions</u> :
1. <u>Depuis l'interface AWS</u> :
 1. Cliquez sur "**Cloner**"
   ![Cloner un cluster](data/img/EMR_cloner_01.png)
 2. Dans notre cas nous ne souhaitons pas inclure d'étapes
   ![Ne pas inclure d'étapes](data/img/EMR_cloner_02.png)
 3. La configuration du cluster est recréée à l’identique. <br />
    On peut revenir sur les différentes étapes si on souhaite apporter des modifications<br />
    Quand tout est prêt, cliquez sur "**Créer un cluster**"
  ![Vérification/Modification/Créer un cluster](data/img/EMR_cloner_03.png)
2. <u>En ligne de commande</u> (avec AWS CLI d'installé et de configuré et en s'assurant <br />
   de s'attribuer les droits nécessaires sur le compte AMI utilisé)
 1. Cliquez sur "**Exporter AWS CLI**"
 ![Exporter AWS CLI](data/img/EMR_cloner_cli_01.png)
 2. Copier/Coller la commande **depuis un terminal**
 ![Copier Coller Commande](data/img/EMR_cloner_cli_02.png)

## 4.14 Arborescence du serveur S3 à la fin du projet

<u>Pour information, voici **l'arborescence complète de mon bucket S3 p11-data** à la fin du projet</u> : <br />
*Par soucis de lisibilité, je ne liste pas les 131 sous dossiers du répertoire "Test"*

1. Results/_SUCCESS
1. Results/part-00000-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00001-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00002-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00003-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00004-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00005-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00006-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00007-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00008-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00009-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00010-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00011-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00012-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00013-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00014-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00015-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00016-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00017-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00018-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00019-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00020-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00021-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00022-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00023-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Test/
1. bootstrap-emr.sh
1. jupyter-s3-conf.json
1. jupyter/jovyan/.s3keep
1. jupyter/jovyan/P11_01_Notebook.ipynb
1. jupyter/jovyan/_metadata
1. jupyter/jovyan/e-5OTY4VKPDT21945FF6DN15E35/.aws-editors-workspace-metadata/
1. jupyter/jovyan/e-5OTY4VKPDT21945FF6DN15E35/.aws-editors-workspace-metadata/file-perm.sqlite
1. jupyter/jovyan/e-5OTY4VKPDT21945FF6DN15E35/.aws-editors-workspace-metadata/nbconvert/
1. jupyter/jovyan/e-5OTY4VKPDT21945FF6DN15E35/.aws-editors-workspace-metadata/nbconvert/templates/
1. jupyter/jovyan/e-5OTY4VKPDT21945FF6DN15E35/.aws-editors-workspace-metadata/nbconvert/templates/html/
1. jupyter/jovyan/e-5OTY4VKPDT21945FF6DN15E35/.aws-editors-workspace-metadata/nbconvert/templates/latex/
1. jupyter/jovyan/e-5OTY4VKPDT21945FF6DN15E35/.aws-editors-workspace-metadata/nbsignatures.db
1. jupyter/jovyan/e-5OTY4VKPDT21945FF6DN15E35/.aws-editors-workspace-metadata/notebook_secret
1. jupyter/jovyan/e-5OTY4VKPDT21945FF6DN15E35/.ipynb_checkpoints/
1. jupyter/jovyan/e-5OTY4VKPDT21945FF6DN15E35/.ipynb_checkpoints/Untitled-checkpoint.ipynb
1. jupyter/jovyan/e-5OTY4VKPDT21945FF6DN15E35/.ipynb_checkpoints/Untitled1-checkpoint.ipynb
1. jupyter/jovyan/e-5OTY4VKPDT21945FF6DN15E35/.ipynb_checkpoints/test3-checkpoint.ipynb
1. jupyter/jovyan/e-5OTY4VKPDT21945FF6DN15E35/Untitled.ipynb
1. jupyter/jovyan/e-5OTY4VKPDT21945FF6DN15E35/Untitled1.ipynb
1. jupyter/jovyan/e-5OTY4VKPDT21945FF6DN15E35/test3.ipynb


# 5. Conclusion

Nous avons réalisé ce projet **en deux temps** en tenant <br />
compte des contraintes qui nous ont été imposées.

Nous avons **dans un premier temps développé notre solution en local** <br />
sur une machine virtuelle dans un environnement Linux Ubuntu.

La <u>première phase</u> a consisté à **installer l'environnement de travail Spark**. <br />
**Spark** a un paramètre qui nous permet de travaillé en local et nous permet <br />
ainsi de **simuler du calcul partagé** en considérant <br />
**chaque cœur d'un processeur comme un worker indépendant**.<br />
Nous avons travaillé sur un plus **petit jeu de donnée**, l'idée était <br />
simplement de **valider le bon fonctionnement de la solution**.

Nous avons fait le choix de réaliser du **transfert learning** <br />
à partir du model **MobileNetV2**.<br />
Ce modèle a été retenu pour sa **légèreté** et sa **rapidité d'exécution** <br />
ainsi que pour la **faible dimension de son vecteur en sortie**.

Les résultats ont été enregistrés sur disque en plusieurs <br />
partitions au format "**parquet**".

<u>**La solution a parfaitement fonctionné en mode local**</u>.

La <u>deuxième phase</u> a consisté à créer un **réel cluster de calculs**. <br />
L'objectif était de pouvoir **anticiper une future augmentation de la charge de travail**.

Le meilleur choix retenu a été l'utilisation du prestataire de services **Amazon Web Services** <br />
qui nous permet de **louer à la demande de la puissance de calculs**, <br />
pour un **coût tout à fait acceptable**.<br />
Ce service se nomme **EC2** et se classe parmi les offres **Infrastructure As A Service** (IAAS).

Nous sommes allez plus loin en utilisant un service de plus <br />
haut niveau (**Plateforme As A Service** PAAS)<br />
en utilisant le service **EMR** qui nous permet d'un seul coup <br />
d'**instancier plusieurs serveur (un cluster)** sur lesquels <br />
nous avons pu demander l'installation et la configuration de plusieurs<br />
programmes et librairies nécessaires à notre projet comme **Spark**, <br />
**Hadoop**, **JupyterHub** ainsi que la librairie **TensorFlow**.

En plus d'être plus **rapide et efficace à mettre en place**, nous avons <br />
la **certitude du bon fonctionnement de la solution**, celle-ci ayant été <br />
préalablement validé par les ingénieurs d'Amazon.

Nous avons également pu installer, sans difficulté, **les packages <br />
nécessaires sur l'ensembles des machines du cluster**.

Enfin, avec très peu de modification, et plus simplement encore, <br />
nous avons pu **exécuter notre notebook comme nous l'avions fait localement**.<br />
Nous avons cette fois-ci exécuté le traitement sur **l'ensemble des images de notre dossier "Test"**.

Nous avons opté pour le service **Amazon S3** pour **stocker les données de notre projet**. <br />
S3 offre, pour un faible coût, toutes les conditions dont nous avons besoin pour stocker <br />
et exploiter de manière efficace nos données.<br />
L'espace alloué est potentiellement **illimité**, mais les coûts seront fonction de l'espace utilisé.

Il nous sera **facile de faire face à une monté de la charge de travail** en **redimensionnant** <br />
simplement notre cluster de machines (horizontalement et/ou verticalement au besoin), <br />
les coûts augmenteront en conséquence mais resteront nettement inférieurs aux coûts engendrés <br />
par l'achat de matériels ou par la location de serveurs dédiés.