# D√©ployez un mod√®le dans le cloud


# Sommaire :


&emsp;4.10 Ex√©cution du code<br />
&emsp;&emsp;4.10.1 D√©marrage de la session Spark<br />
&emsp;&emsp;4.10.2 Installation des packages<br />
&emsp;&emsp;4.10.3 Import des librairies<br />
&emsp;&emsp;4.10.4 D√©finition des PATH pour charger les images et enregistrer les r√©sultats<br />
&emsp;&emsp;4.10.5 Traitement des donn√©es<br />
&emsp;&emsp;&emsp;4.10.5.1 Chargement des donn√©es<br />
&emsp;&emsp;&emsp;4.10.5.2 Pr√©paration du mod√®le<br />
&emsp;&emsp;&emsp;4.10.5.3 D√©finition du processus de chargement des images<br />
&emsp;&emsp;&emsp;&emsp;&emsp;&emsp;&emsp;et application de leur featurisation √† travers l'utilisation de pandas UDF<br />
&emsp;&emsp;&emsp;4.10.5.4 Ex√©cutions des actions d'extractions de features<br />
&emsp;&emsp;4.10.6 Chargement des donn√©es enregistr√©es et validation du r√©sultat<br />
&emsp;4.11 Suivi de l'avancement des t√¢ches avec le Serveur d'Historique Spark<br />
&emsp;4.12 R√©siliation de l'instance EMR<br />
&emsp;4.13 Cloner le serveur EMR (si besoin)<br />
&emsp;4.14 Arborescence du serveur S3 √† la fin du projet<br />
**5. Conclusion**

# Pr√©ambule

## Probl√©matique

La tr√®s jeune start-up de l'AgriTech, nomm√©e "**Fruits**!", <br />
cherche √† proposer des solutions innovantes pour la r√©colte des fruits.

La volont√© de l‚Äôentreprise est de pr√©server la biodiversit√© des fruits <br />
en permettant des traitements sp√©cifiques pour chaque esp√®ce de fruits <br />
en d√©veloppant des robots cueilleurs intelligents.

La start-up souhaite dans un premier temps se faire conna√Ætre en mettant <br />
√† disposition du grand public une application mobile qui permettrait aux <br />
utilisateurs de prendre en photo un fruit et d'obtenir des informations sur ce fruit.

Pour la start-up, cette application permettrait de sensibiliser le grand public <br /> 
√† la biodiversit√© des fruits et de mettre en place une premi√®re version du moteur <br />
de classification des images de fruits.

De plus, le d√©veloppement de l‚Äôapplication mobile permettra de construire <br />
une premi√®re version de l'architecture **Big Data** n√©cessaire.

## Objectifs dans ce projet

1. D√©velopper une premi√®re cha√Æne de traitement des donn√©es qui <br />
   comprendra le **preprocessing** et une √©tape de **r√©duction de dimension**.
2. Tenir compte du fait que <u>le volume de donn√©es va augmenter <br />
   tr√®s rapidement</u> apr√®s la livraison de ce projet, ce qui implique de:
 - D√©ployer le traitement des donn√©es dans un environnement **Big Data**
 - D√©velopper les scripts en **pyspark** pour effectuer du **calcul distribu√©**

## Pipeline g√©n√©ral du notebook
- Chargement des images depuis Amazon S3 dans un DataFrame Spark.
- Extraction des caract√©ristiques des images avec MobileNetV2 (sans la derni√®re couche).
- Transformation des caract√©ristiques en vecteurs normalis√©s.
- Application de la PCA pour r√©duire la dimensionnalit√©.
- Sauvegarde et chargement des r√©sultats pour analyse.


## Pourquoi y a-t-il deux DataFrames ?
Il y a deux DataFrames principaux utilis√©s dans le notebook :

- features_df (avant PCA) :

Contient les caract√©ristiques extraites des images apr√®s passage dans MobileNetV2.
Chaque image est repr√©sent√©e par un vecteur de caract√©ristiques (de grande dimension).
Ce DataFrame est sauvegard√© en Parquet avant d'appliquer la PCA.

- df_pca (apr√®s PCA) :

Contient la version r√©duite des caract√©ristiques apr√®s r√©duction de dimension avec PCA.
La dimension des vecteurs est r√©duite pour capturer 95 % de la variance des donn√©es.
Ce DataFrame est sauvegard√© dans un autre fichier Parquet apr√®s PCA.
üëâ Pourquoi deux DataFrames ?

- Avant PCA, les vecteurs de caract√©ristiques sont de haute dimension (probablement 1280 valeurs ou plus).
- Apr√®s PCA, les vecteurs sont compress√©s en un espace de plus petite dimension tout en conservant l'information pertinente.
- La PCA est une √©tape distincte, et on souhaite conserver √† la fois les caract√©ristiques originales et r√©duites pour analyse et comparaison.
- A noter, le DataFrame df est utilis√© pour charger les features extraites avant la PCA, mais sous Pandas et non PySpark. √âviter des requ√™tes lourdes √† PySpark et charger les features en local sur le notebook. Permettre une analyse rapide et interactive sans ex√©cuter de jobs Spark. Faciliter la visualisation et le traitement statistique avec Pandas. Sur un cluster EMR, cela optimise les performances et la flexibilit√© en combinant Spark et Pandas

### 4.10.1 D√©marrage de la session Spark

In [1]:
# L'ex√©cution de cette cellule d√©marre l'application Spark

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1626050279029_0001,pyspark,idle,Link,Link,‚úî


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),‚Ä¶

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),‚Ä¶

<u>Affichage des informations sur la session en cours et liens vers Spark UI</u> :

In [2]:
%%info

ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1626050279029_0001,pyspark,idle,Link,Link,‚úî


### 4.10.2 Installation des packages

Les packages n√©cessaires ont √©t√© install√© via l'√©tape de **bootstrap** √† l'instanciation du serveur.

### 4.10.3 Import des librairies

In [3]:
import pandas as pd
import numpy as np
import io
import os
import tensorflow as tf
from PIL import Image
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras import Model
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),‚Ä¶

### 4.10.4 D√©finition des PATH pour charger les images et enregistrer les r√©sultats

Nous acc√©dons directement √† nos **donn√©es sur S3** comme si elles √©taient **stock√©es localement**.

In [4]:
PATH = 's3://p8-data'
PATH_Data = PATH+'/Test'
PATH_Result = PATH+'/Results'
print('PATH:        '+\
      PATH+'\nPATH_Data:   '+\
      PATH_Data+'\nPATH_Result: '+PATH_Result)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),‚Ä¶

PATH:        s3://p8-data
PATH_Data:   s3://p8-data/Test
PATH_Result: s3://p8-data/Results

### 4.10.5 Traitement des donn√©es

#### 4.10.5.1 Chargement des donn√©es

In [5]:
images = spark.read.format("binaryFile") \
  .option("pathGlobFilter", "*.jpg") \
  .option("recursiveFileLookup", "true") \
  .load(PATH_Data)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),‚Ä¶

In [6]:
images.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),‚Ä¶

+--------------------+-------------------+------+--------------------+
|                path|   modificationTime|length|             content|
+--------------------+-------------------+------+--------------------+
|s3://p8-data/Test...|2021-07-03 09:00:08|  7353|[FF D8 FF E0 00 1...|
|s3://p8-data/Test...|2021-07-03 09:00:08|  7350|[FF D8 FF E0 00 1...|
|s3://p8-data/Test...|2021-07-03 09:00:08|  7349|[FF D8 FF E0 00 1...|
|s3://p8-data/Test...|2021-07-03 09:00:08|  7348|[FF D8 FF E0 00 1...|
|s3://p8-data/Test...|2021-07-03 09:00:09|  7328|[FF D8 FF E0 00 1...|
+--------------------+-------------------+------+--------------------+
only showing top 5 rows

<u>Je ne conserve que le **path** de l'image et j'ajoute <br />
    une colonne contenant les **labels** de chaque image</u> :

In [7]:
images = images.withColumn('label', element_at(split(images['path'], '/'),-2))
print(images.printSchema())
print(images.select('path','label').show(5,False))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),‚Ä¶

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)

None
+------------------------------------------+----------+
|path                                      |label     |
+------------------------------------------+----------+
|s3://p8-data/Test/Watermelon/r_106_100.jpg|Watermelon|
|s3://p8-data/Test/Watermelon/r_109_100.jpg|Watermelon|
|s3://p8-data/Test/Watermelon/r_108_100.jpg|Watermelon|
|s3://p8-data/Test/Watermelon/r_107_100.jpg|Watermelon|
|s3://p8-data/Test/Watermelon/r_95_100.jpg |Watermelon|
+------------------------------------------+----------+
only showing top 5 rows

None

#### 4.10.5.2 Pr√©paration du mod√®le

In [8]:
model = MobileNetV2(weights='imagenet',
                    include_top=True,
                    input_shape=(224, 224, 3))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),‚Ä¶

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/mobilenet_v2/mobilenet_v2_weights_tf_dim_ordering_tf_kernels_1.0_224.h5

In [9]:
new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),‚Ä¶

In [10]:
brodcast_weights = sc.broadcast(new_model.get_weights())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),‚Ä¶

In [11]:
new_model.summary()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),‚Ä¶

Model: "model"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            [(None, 224, 224, 3) 0                                            
__________________________________________________________________________________________________
Conv1 (Conv2D)                  (None, 112, 112, 32) 864         input_1[0][0]                    
__________________________________________________________________________________________________
bn_Conv1 (BatchNormalization)   (None, 112, 112, 32) 128         Conv1[0][0]                      
__________________________________________________________________________________________________
Conv1_relu (ReLU)               (None, 112, 112, 32) 0           bn_Conv1[0][0]                   
______________________________________________________________________________________________

In [12]:
def model_fn():
    """
    Returns a MobileNetV2 model with top layer removed 
    and broadcasted pretrained weights.
    """
    model = MobileNetV2(weights='imagenet',
                        include_top=True,
                        input_shape=(224, 224, 3))
    for layer in model.layers:
        layer.trainable = False
    new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)
    new_model.set_weights(brodcast_weights.value)
    return new_model

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),‚Ä¶

#### 4.10.5.3 D√©finition du processus de chargement des images <br/> et application de leur featurisation √† travers l'utilisation de pandas UDF et r√©duction de dimension PCA 

- On entra√Æne PCA sur les caract√©ristiques standardis√©es (features_scaled).
- On analyse la variance expliqu√©e cumul√©e.
- On choisit le nombre minimal de composantes qui expliquent 95 % de la variance.
- Pourquoi cette √©tape ?

Plut√¥t que de fixer arbitrairement le nombre de dimensions, on le d√©termine en fonction des donn√©es.

In [None]:
def nb_composante(dataframe, nb_comp=160):
    '''
    Recherche du nombre de composantes expliquant 95% de la variance
    :param dataframe: Le dataframe d'images
    :param nb_comp: Le nombre de composantes initiales pour l'ACP
    :return: Le nombre de composantes expliquant 95% de la variance totale
    '''

    # Initialisation de l'analyse en composantes principales (ACP)
    pca = PCA(k=nb_comp, inputCol="features_scaled", outputCol="features_pca")

    # Entra√Ænement du mod√®le PCA
    model_pca = pca.fit(dataframe)

    # Calcul de la variance expliqu√©e
    varexpl = model_pca.explainedVariance * 100



    # Calcul de la variance cumul√©e
    cumSumVar = varexpl.cumsum()


    # Recherche du nombre de composantes pour atteindre 95% de variance expliqu√©e
    limit = 95
    min_plans = np.argmax(cumSumVar >= limit) + 1
    # Retour du nombre de composantes n√©cessaires pour expliquer 95% de la variance
    return min_plans


In [13]:
def preprocess(content):
    """
    Preprocesses raw image bytes for prediction.
    """
    img = Image.open(io.BytesIO(content)).resize([224, 224])
    arr = img_to_array(img)
    return preprocess_input(arr)

def featurize_series(model, content_series):
    """
    Featurize a pd.Series of raw images using the input model.
    :return: a pd.Series of image features
    """
    input = np.stack(content_series.map(preprocess))
    preds = model.predict(input)
    # For some layers, output features will be multi-dimensional tensors.
    # We flatten the feature tensors to vectors for easier storage in Spark DataFrames.
    output = [p.flatten() for p in preds]
    return pd.Series(output)

@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    '''
    This method is a Scalar Iterator pandas UDF wrapping our featurization function.
    The decorator specifies that this returns a Spark DataFrame column of type ArrayType(FloatType).

    :param content_series_iter: This argument is an iterator over batches of data, where each batch
                              is a pandas Series of image data.
    '''
    # With Scalar Iterator pandas UDFs, we can load the model once and then re-use it
    # for multiple data batches.  This amortizes the overhead of loading big models.
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),‚Ä¶



In [None]:
from pyspark.ml.feature import PCA
from pyspark.sql.functions import udf
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.feature import StandardScaler

### Fonction de Pr√©traitement des Donn√©es pour PCA

Cette fonction pr√©pare les donn√©es pour une Analyse en Composantes Principales (PCA) en effectuant :
- La transformation des images en **vecteurs denses**.
- La **standardisation** des donn√©es (centrage et r√©duction). PCA est sensible aux √©chelles des variables, il faut donc s'assurer que chaque dimension a une variance comparable.

**Param√®tres :**
- `dataframe` : DataFrame PySpark contenant les images sous forme de features.

**Retourne :**
- Un DataFrame avec une colonne `features_scaled` pr√™te pour PCA.


In [None]:
def preprocess_pca(dataframe):
  '''
     Pr√©paration des donn√©es :
     - transformation en vecteur dense
     - standardisation
     param : dataframe : dataframe d'images
     return : dataframe avec features vecteur dense standardis√©
  '''

  # Pr√©paration des donn√©es - conversion des donn√©es images en vecteur dense
  transform_vecteur_dense = udf(lambda r: Vectors.dense(r), VectorUDT())
  dataframe = dataframe.withColumn('features_vectors', transform_vecteur_dense('features'))

  # Standardisation obligatoire pour PCA
  scaler_std = StandardScaler(inputCol="features_vectors", outputCol="features_scaled", withStd=True, withMean=True)
  model_std = scaler_std.fit(dataframe)
  # Mise √† l'√©chelle
  dataframe = model_std.transform(dataframe)

  return dataframe

#### 4.10.5.4 Ex√©cutions des actions d'extractions de features

In [14]:
# spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1024")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),‚Ä¶

### Chargement et Transformation des Images en Features

Cette √©tape effectue :
- **Repartitionnement** des donn√©es en 24 partitions pour une meilleure distribution du calcul.
- **S√©lection des colonnes** importantes (`path`, `label` et `features`).
- **Extraction des features** √† partir du contenu des images √† l‚Äôaide de la fonction `featurize_udf`.

**Colonnes du DataFrame r√©sultant :**
- `path` : Chemin de l'image.
- `label` : Label de l'image (classe associ√©e).
- `features` : Vecteur de caract√©ristiques extrait √† partir du contenu de l‚Äôimage.


In [15]:
features_df = images.repartition(24).select(col("path"),
                                            col("label"),
                                            featurize_udf("content").alias("features")
                                           )

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),‚Ä¶

In [16]:
print(PATH_Result)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),‚Ä¶

s3://p8-data/Results

In [17]:
# Sauvegarde des features extraites dans un fichier parquet
features_df.write.mode("overwrite").parquet(PATH_Result)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),‚Ä¶

In [None]:
# Pr√©-processing (vecteur dense, standardisation)
df_pca = preprocess_pca(features_df)

In [None]:
# Nombre de composante expliquant 95% de la variance
n_components = nb_composante(df_pca)

In [None]:
# Entrainement de l'algorithme
pca = PCA(k=n_components, inputCol='features_scaled', outputCol='features_pca')
model_pca = pca.fit(df_pca)

# Transformation des images sur les k premi√®res composantes
df_reduit = model_pca.transform(df_pca)

### Sauvegarde des R√©sultats du PCA au Format Parquet

Cette √©tape enregistre le DataFrame r√©sultant apr√®s l'Analyse en Composantes Principales (PCA) dans un fichier **Parquet** pour une utilisation ult√©rieure.

**D√©tails :**
- **Mode `overwrite`** : Remplace les fichiers existants si n√©cessaire.
- **Emplacement** : Les r√©sultats sont stock√©s dans `PATH_Result + "/pca_results"`.

Le format **Parquet** est choisi pour ses avantages en termes de **compression**, **stockage efficace** et **compatibilit√© avec Spark**.


In [None]:
# Sauvegarde des r√©sultats apr√®s PCA dans un fichier parquet
df_reduit.write.mode("overwrite").parquet(PATH_Result + "/pca_results")

In [None]:
# Print the path to the saved results
print(PATH_Result + "/pca_results")

### 4.10.6 Chargement des donn√©es enregistr√©es et validation du r√©sultat

In [18]:
# Chargement des donn√©es Parquet dans un DataFrame Pandas avec PyArrow
df = pd.read_parquet(PATH_Result, engine='pyarrow')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),‚Ä¶

In [None]:
# Chargement des r√©sultats du PCA depuis le fichier Parquet
df_pca=spark.read.parquet(PATH_Result + "/pca_results")

In [19]:
# Affichage des 5 premi√®res lignes du DataFrame df
df.head()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),‚Ä¶

                                           path  ...                                           features
0    s3://p8-data/Test/Watermelon/r_174_100.jpg  ...  [0.0059991637, 0.44703647, 0.0, 0.0, 3.3713572...
1  s3://p8-data/Test/Pineapple Mini/128_100.jpg  ...  [0.0146466885, 4.080593, 0.055877004, 0.0, 0.0...
2  s3://p8-data/Test/Pineapple Mini/137_100.jpg  ...  [0.0, 4.9659867, 0.0, 0.0, 0.0, 0.0, 0.5144821...
3      s3://p8-data/Test/Watermelon/275_100.jpg  ...  [0.22511952, 0.07235509, 0.0, 0.0, 1.690149, 0...
4      s3://p8-data/Test/Watermelon/271_100.jpg  ...  [0.3286234, 0.18830013, 0.0, 0.0, 1.9123534, 0...

[5 rows x 3 columns]

In [None]:
df_pca

In [None]:
# Affichage du sch√©ma du DataFrame df_pca
print(df_pca.printSchema())

In [20]:
# loc est utilis√© pour acc√©der √† un groupe de lignes et de colonnes par √©tiquettes. Ici, on affiche la premi√®re ligne
df.loc[0,'features'].shape

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),‚Ä¶

(1280,)

### Objectif : S√©lection et V√©rification des R√©sultats du PCA

Cette √©tape permet de manipuler et d‚Äôanalyser les r√©sultats de la r√©duction dimensionnelle effectu√©e par l‚ÄôAnalyse en Composantes Principales (PCA).

#### S√©lection et Renommage de la Colonne PCA
- On extrait uniquement la colonne contenant les features apr√®s PCA (`features_pca`).
- On la renomme en **`features`** pour faciliter son utilisation dans les √©tapes suivantes.

#### Extraction de la Premi√®re Ligne du DataFrame
- On r√©cup√®re une seule ligne du DataFrame (`first_row`), ce qui permet d‚Äôacc√©der directement aux valeurs.

#### V√©rification de la Dimension des Donn√©es
- On calcule la **dimension** de la colonne `features`, correspondant au **nombre de composantes principales retenues** apr√®s PCA.
- L'affichage de cette dimension permet de confirmer la r√©duction de la taille des donn√©es.

**But final** : V√©rifier que le PCA a bien r√©duit la dimensionnalit√© des features tout en conservant un maximum d‚Äôinformation. On r√©cup√®re une ligne pour voir combien de valeurs restent apr√®s PCA. Cela permet de confirmer que la r√©duction de dimension a bien √©t√© effectu√©e.


In [None]:
# S√©lectionner la colonne "pca_features" et la renommer en "features"
df_pca_features = df_pca.select(col("features_pca").alias("features"))

# R√©cup√©rer la premi√®re ligne du DataFrame
first_row = df_pca_features.first()

# Obtenir la dimension de la colonne "features"
dimension = len(first_row["features"])
print("Dimension apr√®s r√©duction PCA:", dimension)

In [None]:
# Affichage de la premi√®re ligne du DataFrame
df.columns

In [None]:
# Affichage du nombre total de lignes dans le DataFrame
df.count()

In [None]:
# Affichage du nombre total de lignes dans le DataFrame apr√®s PCA
df_pca.count()

In [21]:
df.shape

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),‚Ä¶

(22688, 3)

<u>On peut √©galement constater la pr√©sence des fichiers <br />
    au format "**parquet**" sur le **serveur S3**</u> :

![Affichage des r√©sultats sur S3](img/S3_Results.png)

## 4.11 Suivi de l'avancement des t√¢ches avec le Serveur d'Historique Spark

Il est possible de voir l'avancement des t√¢ches en cours <br />
avec le **serveur d'historique Spark**.

![Acc√®s au serveur d'historique spark](img/EMR_serveur_historique_spark_acces.png)

**Il est √©galement possible de revenir et d'√©tudier les t√¢ches <br />
qui ont √©t√© r√©alis√©, afin de debugger, optimiser les futurs <br />
t√¢ches √† r√©aliser.**

<u>Lorsque la commande "**features_df.write.mode("overwrite").parquet(PATH_Result)**" <br />
√©tait en cours, nous pouvions observer son √©tat d'avancement</u> :

![Progression execution script](img/EMR_jupyterhub_avancement.png)

<u>Le **serveur d'historique Spark** nous permet une vision beaucoup plus pr√©cise <br />
de l'ex√©cution des diff√©rentes t√¢che sur les diff√©rentes machines du cluster</u> :

![Suivi des t√¢ches spark](img/EMR_SHSpark_01.png)

On peut √©galement constater que notre cluster de calcul a mis <br />
un tout petit peu **moins de 8 minutes** pour traiter les **22 688 images**.

![Temps de traitement](img/EMR_SHSpark_02.png)


## 4.12 R√©siliation de l'instance EMR

Notre travail est maintenant termin√©. <br />
Le cluster de machines EMR est **factur√© √† la demande**, <br />
et nous continuons d'√™tre factur√© m√™me lorsque <br />
les machines sont au repos.<br />
Pour **optimiser la facturation**, il nous faut <br />
maintenant **r√©silier le cluster**.

<u>Je r√©alise cette commande depuis l'interface AWS</u> :

1. Commencez par **d√©sactiver le tunnel ssh dans FoxyProxy** pour √©viter des probl√®mes de **timeout**.
![D√©sactivation de FoxyProxy](img/EMR_foxyproxy_desactivation.png)
2. Cliquez sur "**R√©silier**"
![Cliquez sur R√©silier](img/EMR_resiliation_01.png)
3. Confirmez la r√©siliation
![Confirmez la r√©siliation](img/EMR_resiliation_02.png)
4. La r√©siliation prend environ **1 minute**
![R√©siliation en cours](img/EMR_resiliation_03.png)
5. La r√©siliation est effectu√©e
![R√©siliation termin√©e](img/EMR_resiliation_04.png)

## 4.13 Cloner le serveur EMR (si besoin)

Si nous devons de nouveau ex√©cuter notre notebook dans les m√™mes conditions, <br />
il nous suffit de **cloner notre cluster** et ainsi en obtenir une copie fonctionnelle <br />
sous 15/20 minutes, le temps de son instanciation.

<u>Pour cela deux solutions</u> :
1. <u>Depuis l'interface AWS</u> :
 1. Cliquez sur "**Cloner**"
   ![Cloner un cluster](img/EMR_cloner_01.png)
 2. Dans notre cas nous ne souhaitons pas inclure d'√©tapes
   ![Ne pas inclure d'√©tapes](img/EMR_cloner_02.png)
 3. La configuration du cluster est recr√©√©e √† l‚Äôidentique. <br />
    On peut revenir sur les diff√©rentes √©tapes si on souhaite apporter des modifications<br />
    Quand tout est pr√™t, cliquez sur "**Cr√©er un cluster**"
  ![V√©rification/Modification/Cr√©er un cluster](img/EMR_cloner_03.png)
2. <u>En ligne de commande</u> (avec AWS CLI d'install√© et de configur√© et en s'assurant <br />
   de s'attribuer les droits n√©cessaires sur le compte AMI utilis√©)
 1. Cliquez sur "**Exporter AWS CLI**"
 ![Exporter AWS CLI](img/EMR_cloner_cli_01.png)
 2. Copier/Coller la commande **depuis un terminal**
 ![Copier Coller Commande](img/EMR_cloner_cli_02.png)

## 4.14 Arborescence du serveur S3 √† la fin du projet

<u>Pour information, voici **l'arborescence compl√®te de mon bucket S3 p8-data** √† la fin du projet</u> : <br />
*Par soucis de lisibilit√©, je ne liste pas les 131 sous dossiers du r√©pertoire "Test"*

1. Results/_SUCCESS
1. Results/part-00000-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00001-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00002-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00003-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00004-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00005-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00006-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00007-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00008-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00009-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00010-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00011-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00012-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00013-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00014-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00015-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00016-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00017-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00018-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00019-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00020-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00021-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00022-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00023-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Test/
1. bootstrap-emr.sh
1. jupyter-s3-conf.json
1. jupyter/jovyan/.s3keep
1. jupyter/jovyan/P8_01_Notebook.ipynb
1. jupyter/jovyan/_metadata
1. jupyter/jovyan/e-5OTY4VKPDT21945FF6DN15E35/.aws-editors-workspace-metadata/
1. jupyter/jovyan/e-5OTY4VKPDT21945FF6DN15E35/.aws-editors-workspace-metadata/file-perm.sqlite
1. jupyter/jovyan/e-5OTY4VKPDT21945FF6DN15E35/.aws-editors-workspace-metadata/nbconvert/
1. jupyter/jovyan/e-5OTY4VKPDT21945FF6DN15E35/.aws-editors-workspace-metadata/nbconvert/templates/
1. jupyter/jovyan/e-5OTY4VKPDT21945FF6DN15E35/.aws-editors-workspace-metadata/nbconvert/templates/html/
1. jupyter/jovyan/e-5OTY4VKPDT21945FF6DN15E35/.aws-editors-workspace-metadata/nbconvert/templates/latex/
1. jupyter/jovyan/e-5OTY4VKPDT21945FF6DN15E35/.aws-editors-workspace-metadata/nbsignatures.db
1. jupyter/jovyan/e-5OTY4VKPDT21945FF6DN15E35/.aws-editors-workspace-metadata/notebook_secret
1. jupyter/jovyan/e-5OTY4VKPDT21945FF6DN15E35/.ipynb_checkpoints/
1. jupyter/jovyan/e-5OTY4VKPDT21945FF6DN15E35/.ipynb_checkpoints/Untitled-checkpoint.ipynb
1. jupyter/jovyan/e-5OTY4VKPDT21945FF6DN15E35/.ipynb_checkpoints/Untitled1-checkpoint.ipynb
1. jupyter/jovyan/e-5OTY4VKPDT21945FF6DN15E35/.ipynb_checkpoints/test3-checkpoint.ipynb
1. jupyter/jovyan/e-5OTY4VKPDT21945FF6DN15E35/Untitled.ipynb
1. jupyter/jovyan/e-5OTY4VKPDT21945FF6DN15E35/Untitled1.ipynb
1. jupyter/jovyan/e-5OTY4VKPDT21945FF6DN15E35/test3.ipynb

# 5. Conclusion

Nous avons r√©alis√© ce projet **en deux temps** en tenant <br />
compte des contraintes qui nous ont √©t√© impos√©es.

Nous avons **dans un premier temps d√©velopp√© notre solution en local** <br />
sur une machine virtuelle dans un environnement Linux Ubuntu.

La <u>premi√®re phase</u> a consist√© √† **installer l'environnement de travail Spark**. <br />
**Spark** a un param√®tre qui nous permet de travaill√© en local et nous permet <br />
ainsi de **simuler du calcul partag√©** en consid√©rant <br />
**chaque c≈ìur d'un processeur comme un worker ind√©pendant**.<br />
Nous avons travaill√© sur un plus **petit jeu de donn√©e**, l'id√©e √©tait <br />
simplement de **valider le bon fonctionnement de la solution**.

Nous avons fait le choix de r√©aliser du **transfert learning** <br />
√† partir du model **MobileNetV2**.<br />
Ce mod√®le a √©t√© retenu pour sa **l√©g√®ret√©** et sa **rapidit√© d'ex√©cution** <br />
ainsi que pour la **faible dimension de son vecteur en sortie**.

Les r√©sultats ont √©t√© enregistr√©s sur disque en plusieurs <br />
partitions au format "**parquet**".

<u>**La solution a parfaitement fonctionn√© en mode local**</u>.

La <u>deuxi√®me phase</u> a consist√© √† cr√©er un **r√©el cluster de calculs**. <br />
L'objectif √©tait de pouvoir **anticiper une future augmentation de la charge de travail**.

Le meilleur choix retenu a √©t√© l'utilisation du prestataire de services **Amazon Web Services** <br />
qui nous permet de **louer √† la demande de la puissance de calculs**, <br />
pour un **co√ªt tout √† fait acceptable**.<br />
Ce service se nomme **EC2** et se classe parmi les offres **Infrastructure As A Service** (IAAS).

Nous sommes allez plus loin en utilisant un service de plus <br />
haut niveau (**Plateforme As A Service** PAAS)<br />
en utilisant le service **EMR** qui nous permet d'un seul coup <br />
d'**instancier plusieurs serveur (un cluster)** sur lesquels <br />
nous avons pu demander l'installation et la configuration de plusieurs<br />
programmes et librairies n√©cessaires √† notre projet comme **Spark**, <br />
**Hadoop**, **JupyterHub** ainsi que la librairie **TensorFlow**.

En plus d'√™tre plus **rapide et efficace √† mettre en place**, nous avons <br />
la **certitude du bon fonctionnement de la solution**, celle-ci ayant √©t√© <br />
pr√©alablement valid√© par les ing√©nieurs d'Amazon.

Nous avons √©galement pu installer, sans difficult√©, **les packages <br />
n√©cessaires sur l'ensembles des machines du cluster**.

Enfin, avec tr√®s peu de modification, et plus simplement encore, <br />
nous avons pu **ex√©cuter notre notebook comme nous l'avions fait localement**.<br />
Nous avons cette fois-ci ex√©cut√© le traitement sur **l'ensemble des images de notre dossier "Test"**.

Nous avons opt√© pour le service **Amazon S3** pour **stocker les donn√©es de notre projet**. <br />
S3 offre, pour un faible co√ªt, toutes les conditions dont nous avons besoin pour stocker <br />
et exploiter de mani√®re efficace nos donn√©es.<br />
L'espace allou√© est potentiellement **illimit√©**, mais les co√ªts seront fonction de l'espace utilis√©.

Il nous sera **facile de faire face √† une mont√© de la charge de travail** en **redimensionnant** <br />
simplement notre cluster de machines (horizontalement et/ou verticalement au besoin), <br />
les co√ªts augmenteront en cons√©quence mais resteront nettement inf√©rieurs aux co√ªts engendr√©s <br />
par l'achat de mat√©riels ou par la location de serveurs d√©di√©s.