# Déployez un modèle dans le cloud

# Contexte :

Nous sommes Data Scientist dans une très jeune start-up de l'AgriTech, nommée "Fruits!", qui cherche à proposer des solutions innovantes pour la récolte des fruits.
La volonté de l’entreprise est de préserver la biodiversité des fruits en permettant des traitements spécifiques pour chaque espèce de fruits en développant des robots cueilleurs intelligents.

La start-up "Fruits" souhaite dans un premier temps se faire connaître en mettant à disposition du grand public une application mobile qui permettrait aux utilisateurs de prendre en photo un fruit et d'obtenir des informations sur ce fruit.

Cette application permettrait de sensibiliser le grand public à la biodiversité des fruits et de mettre en place une première version du moteur de classification des images de fruits.

De plus, le développement de l’application mobile permettra de construire une première version de l'architecture Big Data nécessaire.

![Screenshot%202024-09-16%20115437.png](attachment:Screenshot%202024-09-16%20115437.png)

# Sommaire :

**1. Préambule**<br />
&emsp;1.1 Problématique<br />
&emsp;1.2 Objectifs dans ce projet<br />
&emsp;1.3 Déroulement des étapes du projet<br />
**2. Choix techniques généraux retenus**<br />
&emsp;2.1 Calcul distribué<br />
&emsp;2.2 Transfert Learning<br />
**3. Déploiement de la solution en local**<br />
&emsp;3.1 Environnement de travail<br />
&emsp;3.2 Installation de Spark<br />
&emsp;3.3 Installation des packages<br />
&emsp;3.4 Import des librairies<br />
&emsp;3.5 Définition des PATH pour charger les images et enregistrer les résultats<br />
&emsp;3.6 Création de la SparkSession<br />
&emsp;3.7 Traitement des données<br />
&emsp;&emsp;3.7.1 Chargement des données<br />
&emsp;&emsp;3.7.2 Préparation du modèle<br />
&emsp;&emsp;3.7.3 Définition du processus de chargement des images et application <br />
&emsp;&emsp;&emsp;&emsp;&emsp;de leur featurisation à travers l'utilisation de pandas UDF<br />
&emsp;&emsp;3.7.4 Exécution des actions d'extractions de features<br />
&emsp;3.8 Chargement des données enregistrées et validation du résultat<br />
**4. Déploiement de la solution sur le cloud**<br />
&emsp;4.1 Choix du prestataire cloud : AWS<br />
&emsp;4.2 Choix de la solution technique : EMR<br />
&emsp;4.3 Choix de la solution de stockage des données : Amazon S3<br />
&emsp;4.4 Configuration de l'environnement de travail<br />
&emsp;4.5 Upload de nos données sur S3<br />
&emsp;4.6 Configuration du serveur EMR<br />
&emsp;&emsp;4.6.1 Étape 1 : Logiciels et étapes<br />
&emsp;&emsp;&emsp;4.6.1.1 Configuration des logiciels<br />
&emsp;&emsp;&emsp;4.6.1.2 Modifier les paramètres du logiciel<br />
&emsp;&emsp;4.6.2 Étape 2 : Matériel<br />
&emsp;&emsp;4.6.3 Étape 3 : Paramètres de cluster généraux<br />
&emsp;&emsp;&emsp;4.6.3.1 Options générales<br />
&emsp;&emsp;&emsp;4.6.3.2 Actions d'amorçage<br />
&emsp;&emsp;4.6.4 Étape 4 : Sécurité<br />
&emsp;&emsp;&emsp;4.6.4.1 Options de sécurité<br />
&emsp;4.7 Instanciation du serveur<br />
&emsp;4.8 Création du tunnel SSH à l'instance EC2 (Maître)<br />
&emsp;&emsp;4.8.1 Création des autorisations sur les connexions entrantes<br />
&emsp;&emsp;4.8.2 Création du tunnel ssh vers le Driver<br />
&emsp;&emsp;4.8.3 Configuration de FoxyProxy<br />
&emsp;&emsp;4.8.4 Accès aux applications du serveur EMR via le tunnel ssh<br />
&emsp;4.9 Connexion au notebook JupyterHub<br />
&emsp;4.10 Exécution du code<br />
&emsp;&emsp;4.10.1 Démarrage de la session Spark<br />
&emsp;&emsp;4.10.2 Installation des packages<br />
&emsp;&emsp;4.10.3 Import des librairies<br />
&emsp;&emsp;4.10.4 Définition des PATH pour charger les images et enregistrer les résultats<br />
&emsp;&emsp;4.10.5 Traitement des données<br />
&emsp;&emsp;&emsp;4.10.5.1 Chargement des données<br />
&emsp;&emsp;&emsp;4.10.5.2 Préparation du modèle<br />
&emsp;&emsp;&emsp;4.10.5.3 Définition du processus de chargement des images<br />
&emsp;&emsp;&emsp;&emsp;&emsp;&emsp;&emsp;et application de leur featurisation à travers l'utilisation de pandas UDF<br />
&emsp;&emsp;&emsp;4.10.5.4 Exécutions des actions d'extractions de features<br />
&emsp;&emsp;4.10.6 Chargement des données enregistrées et validation du résultat<br />
&emsp;4.11 Suivi de l'avancement des tâches avec le Serveur d'Historique Spark<br />
&emsp;4.12 Résiliation de l'instance EMR<br />
&emsp;4.13 Cloner le serveur EMR (si besoin)<br />
&emsp;4.14 Arborescence du serveur S3 à la fin du projet<br />
**5. Conclusion**

# Sommaire :

**1. Préambule**<br />
&emsp;1.1 Problématique<br />
&emsp;1.2 Objectifs dans ce projet<br />
&emsp;1.3 Déroulement des étapes du projet<br />
**2. Choix techniques généraux retenus**<br />
&emsp;2.1 Calcul distribué<br />
&emsp;2.2 Transfert Learning<br />
**3. Déploiement de la solution en local**<br />
&emsp;3.1 Environnement de travail<br />
&emsp;3.2 Installation de Spark<br />
&emsp;3.3 Installation des packages<br />
&emsp;3.4 Import des librairies<br />
&emsp;3.5 Définition des PATH pour charger les images et enregistrer les résultats<br />
&emsp;3.6 Création de la SparkSession<br />
&emsp;3.7 Traitement des données<br />
&emsp;&emsp;3.7.1 Chargement des données<br />
&emsp;&emsp;3.7.2 Préparation du modèle<br />
&emsp;&emsp;3.7.3 Définition du processus de chargement des images et application <br />
&emsp;&emsp;&emsp;&emsp;&emsp;de leur featurisation à travers l'utilisation de pandas UDF<br />
&emsp;&emsp;3.7.4 Exécution des actions d'extractions de features<br />
&emsp;3.8 Chargement des données enregistrées et validation du résultat<br />
**4. Déploiement de la solution sur le cloud**<br />
&emsp;4.1 Choix du prestataire cloud : AWS<br />
&emsp;4.2 Choix de la solution technique : EMR<br />
&emsp;4.3 Choix de la solution de stockage des données : Amazon S3<br />
&emsp;4.4 Configuration de l'environnement de travail<br />
&emsp;4.5 Upload de nos données sur S3<br />
&emsp;4.6 Configuration du serveur EMR<br />
&emsp;&emsp;4.6.1 Étape 1 : Logiciels et étapes<br />
&emsp;&emsp;&emsp;4.6.1.1 Configuration des logiciels<br />
&emsp;&emsp;&emsp;4.6.1.2 Modifier les paramètres du logiciel<br />
&emsp;&emsp;4.6.2 Étape 2 : Matériel<br />
&emsp;&emsp;4.6.3 Étape 3 : Paramètres de cluster généraux<br />
&emsp;&emsp;&emsp;4.6.3.1 Options générales<br />
&emsp;&emsp;&emsp;4.6.3.2 Actions d'amorçage<br />
&emsp;&emsp;4.6.4 Étape 4 : Sécurité<br />
&emsp;&emsp;&emsp;4.6.4.1 Options de sécurité<br />
&emsp;4.7 Instanciation du serveur<br />
&emsp;4.8 Création du tunnel SSH à l'instance EC2 (Maître)<br />
&emsp;&emsp;4.8.1 Création des autorisations sur les connexions entrantes<br />
&emsp;&emsp;4.8.2 Création du tunnel ssh vers le Driver<br />
&emsp;&emsp;4.8.3 Configuration de FoxyProxy<br />
&emsp;&emsp;4.8.4 Accès aux applications du serveur EMR via le tunnel ssh<br />
&emsp;4.9 Connexion au notebook JupyterHub<br />
&emsp;4.10 Exécution du code<br />
&emsp;&emsp;4.10.1 Démarrage de la session Spark<br />
&emsp;&emsp;4.10.2 Installation des packages<br />
&emsp;&emsp;4.10.3 Import des librairies<br />
&emsp;&emsp;4.10.4 Définition des PATH pour charger les images et enregistrer les résultats<br />
&emsp;&emsp;4.10.5 Traitement des données<br />
&emsp;&emsp;&emsp;4.10.5.1 Chargement des données<br />
&emsp;&emsp;&emsp;4.10.5.2 Préparation du modèle<br />
&emsp;&emsp;&emsp;4.10.5.3 Définition du processus de chargement des images<br />
&emsp;&emsp;&emsp;&emsp;&emsp;&emsp;&emsp;et application de leur featurisation à travers l'utilisation de pandas UDF<br />
&emsp;&emsp;&emsp;4.10.5.4 Exécutions des actions d'extractions de features<br />
&emsp;&emsp;4.10.6 Chargement des données enregistrées et validation du résultat<br />
&emsp;4.11 Suivi de l'avancement des tâches avec le Serveur d'Historique Spark<br />
&emsp;4.12 Résiliation de l'instance EMR<br />
&emsp;4.13 Cloner le serveur EMR (si besoin)<br />
&emsp;4.14 Arborescence du serveur S3 à la fin du projet<br />
**5. Conclusion**

## 1.3 Déroulement des étapes du projet

Le projet va être réalisé en 2 temps, dans deux environnements différents. <br />
Nous allons dans un premier temps développer et exécuter notre code en local, <br />
en travaillant sur un nombre limité d'images à traiter.

Une fois les choix techniques validés, nous déploierons notre solution <br />
dans un environnement Big Data en mode distribué.

<u>Pour cette raison, ce projet sera divisé en 3 parties</u>:
1. Liste des choix techniques généraux retenus
2. Déploiement de la solution en local
3. Déploiement de la solution dans le cloud

# 2. Choix techniques généraux retenus

## 2.1 Calcul distribué

L’énoncé du projet nous impose de développer des scripts en **pyspark** <br />
afin de <u>prendre en compte l’augmentation très rapide du volume <br />
de donné après la livraison du projet</u>.

Pour comprendre rapidement et simplement ce qu’est **pyspark** <br />
et son principe de fonctionnement, nous vous conseillons de lire <br />
cet article : [PySpark : Tout savoir sur la librairie Python](https://datascientest.com/pyspark)

<u>Le début de l’article nous dit ceci </u>:<br />
« *Lorsque l’on parle de traitement de bases de données sur python, <br />
on pense immédiatement à la librairie pandas. Cependant, lorsqu’on a <br />
affaire à des bases de données trop massives, les calculs deviennent trop lents.<br />
Heureusement, il existe une autre librairie python, assez proche <br />
de pandas, qui permet de traiter des très grandes quantités de données : PySpark.<br />
Apache Spark est un framework open-source développé par l’AMPLab <br />
de UC Berkeley permettant de traiter des bases de données massives <br />
en utilisant le calcul distribué, technique qui consiste à exploiter <br />
plusieurs unités de calcul réparties en clusters au profit d’un seul <br />
projet afin de diviser le temps d’exécution d’une requête.<br />
Spark a été développé en Scala et est au meilleur de ses capacités <br />
dans son langage natif. Cependant, la librairie PySpark propose de <br />
l’utiliser avec le langage Python, en gardant des performances <br />
similaires à des implémentations en Scala.<br />
Pyspark est donc une bonne alternative à la librairie pandas lorsqu’on <br />
cherche à traiter des jeux de données trop volumineux qui entraînent <br />
des calculs trop chronophages.* »

Comme nous le constatons, **pySpark** est un moyen de communiquer <br />
avec **Spark** via le langage **Python**.<br />
**Spark**, quant à lui, est un outil qui permet de gérer et de coordonner <br />
l'exécution de tâches sur des données à travers un groupe d'ordinateurs. <br />
<u>Spark (ou Apache Spark) est un framework open source de calcul distribué <br />
in-memory pour le traitement et l'analyse de données massives</u>.

Un autre [article très intéressant et beaucoup plus complet pour <br />
comprendre le **fonctionnement de Spark**](https://www.veonum.com/apache-spark-pour-les-nuls/), ainsi que le rôle <br />
des **Spark Session** que nous utiliserons dans ce projet.

<u>Voici également un extrait</u>:

*Les applications Spark se composent d’un pilote (« driver process ») <br />
et de plusieurs exécuteurs (« executor processes »). Il peut être configuré <br />
pour être lui-même l’exécuteur (local mode) ou en utiliser autant que <br />
nécessaire pour traiter l’application, Spark prenant en charge la mise <br />
à l’échelle automatique par une configuration d’un nombre minimum <br />
et maximum d’exécuteurs.*

![Schéma de Spark] ![spark-schema.png](attachment:spark-schema.png)

*Le driver (parfois appelé « Spark Session ») distribue et planifie <br />
les tâches entre les différents exécuteurs qui les exécutent et permettent <br />
un traitement réparti. Il est le responsable de l’exécution du code <br />
sur les différentes machines.

Chaque exécuteur est un processus Java Virtual Machine (JVM) distinct <br />
dont il est possible de configurer le nombre de CPU et la quantité de <br />
mémoire qui lui est alloué. <br />
Une seule tâche peut traiter un fractionnement de données à la fois.*

Dans les deux environnements (Local et Cloud) nous utiliserons donc **Spark** <br />
et nous l’exploiterons à travers des scripts python grâce à **PySpark**.

Dans la <u>version locale</u> de notre script nous **simulerons <br />
le calcul distribué** afin de valider que notre solution fonctionne.<br />

## 2.2 Transfert Learning

L'énoncé du projet nous demande également de <br />
réaliser une première chaîne de traitement <br />
des données qui comprendra le preprocessing et <br />
une étape de réduction de dimension.

Il est également précisé qu'il n'est pas nécessaire <br />
d'entraîner un modèle pour le moment.

Nous décidons de partir sur une solution de **transfert learning**.

Simplement, le **transfert learning** consiste <br />
à utiliser la connaissance déjà acquise <br />
par un modèle entraîné (ici **MobileNetV2**) pour <br />
l'adapter à notre problématique.

Nous allons fournir au modèle nos images, et nous allons <br />
<u>récupérer l'avant dernière couche</u> du modèle.<br />
En effet la dernière couche de modèle est une couche softmax <br />
qui permet la classification des images ce que nous ne <br />
souhaitons pas dans ce projet.

L'avant dernière couche correspond à un **vecteur <br />
réduit** de dimension (1,1,1280).

Cela permettra de réaliser une première version du moteur <br />
pour la classification des images des fruits.

**MobileNetV2** a été retenu pour sa <u>rapidité d'exécution</u>, <br />
particulièrement adaptée pour le traitement d'un gros volume <br />
de données ainsi que la <u>faible dimensionnalité du vecteur <br />
de caractéristique en sortie</u> (1,1,1280)

# 3. Déploiement de la solution en local

Nous venons de valider le processus sur un jeu de données allégé en local où nous avons simulé un cluster de machines en répartissant la charge de travail sur différents cœurs de processeur au sein d'une même machine.

Nous allons maintenant généraliser le processus en déployant notre solution sur un réel cluster de machines et nous travaillerons désormais sur la totalité des 22819 images de notre dossier **sample_300_images**.

# 4. Déploiement de la solution sur le cloud

Maintenant que nous avons vérifié que notre solution fonctionne, <br />
il est temps de la <u>déployer à plus grande échelle sur un vrai cluster de machines</u>.

**Attention**, *je travaille sous Linux avec une version Ubuntu, <br />
les commandes décrites ci-dessous sont donc réalisées <br />
exclusivement dans cet environnement.*

<u>Plusieurs contraintes se posent</u> :
 1. Quel prestataire de Cloud choisir ?
 2. Quelles solutions de ce prestataire adopter ?
 3. Où stocker nos données ?
 4. Comment configurer nos outils dans ce nouvel environnement ?

## 4.1 Choix du prestataire cloud : AWS

Le prestataire le plus connu et qui offre à ce jour l'offre <br />
la plus large dans le cloud computing est **Amazon Web Services** (AWS).<br />
Certaines de leurs offres sont parfaitement adaptées à notre problématique <br />
et c'est la raison pour laquelle j'utiliserai leurs services.

L'objectif premier est de pouvoir, grâce à AWS, <u>louer de la puissance de calcul à la demande</u>. <br />
L'idée étant de pouvoir, quel que soit la charge de travail, <br />
obtenir suffisamment de puissance de calcul pour pouvoir traiter nos images, <br />
même si le volume de données venait à fortement augmenter.

De plus, la capacité d'utiliser cette puissance de calcul à la demande <br />
permet de diminuer drastiquement les coûts si l'on compare les coûts d'une location <br />
de serveur complet sur une durée fixe (1 mois, 1 année par exemple).

## 4.2 Choix de la solution technique : EMR

<u>Plusieurs solutions s'offre à nous</u> :
1. Solution **IAAS** (Infrastructure AS A Service)
 - Dans cette configuration **AWS** met à notre disposition des serveurs vierges <br />
   sur lequel nous avons un accès en administrateur, ils sont nommés **instance EC2**.<br />
   Pour faire simple, nous pouvons avec cette solution reproduire pratiquement <br />
   à l'identique la solution mis en œuvre en local sur notre machine.<br />
   <u>On installe nous-même l'intégralité des outils puis on exécute notre script</u> :
  - Installation de **Spark**, **Java** etc.
  - Installation de **Python** (via Anaconda par exemple)
  - Installation de **Jupyter Notebook**
  - Installation des **librairies complémentaires**
  - Il faudra bien évidement veiller à **implémenter les librairies 
    nécessaires à toutes les machines (workers) du cluster**
  - <u>Avantages</u> :
      - Liberté totale de mise en œuvre de la solution
      - Facilité de mise en œuvre à partir d'un modèle qui s'exécute en local sur une machine Linux
  - <u>Inconvénients</u> :
      - Cronophage
          - Nécessité d'installer et de configurer toute la solution
      - Possible problèmes techniques à l'installation des outils (des problématiques qui <br />
        n'existaient pas en local sur notre machine peuvent apparaitre sur le serveur EC2)
      - Solution non pérenne dans le temps, il faudra veiller à la mise à jour des outils <br />
        et éventuellement devoir réinstaller Spark, Java etc. 
2. Solution **PAAS** (Plateforme As A Service)
 - **AWS** fournit énormément de services différents, dans l'un de ceux-là <br />
   il existe une offre qui permet de louer des **instances EC2** <br />
   avec des applications préinstallées et configurées : il s'agit du **service EMR**.
 - **Spark** y sera déjà installé
 - Possibilité de demander l'installation de **Tensorflow** ainsi que **JupyterHub**
 - Possibilité d'indiquer des **packages complémentaires** à installer <br />
   à l'initialisation du serveur **sur l'ensemble des machines du cluster**.
 - <u>Avantages</u> :
     - Facilité de mise en œuvre
         - Il suffit de très peu de configuration pour obtenir <br />
           un environnement parfaitement fonctionnel
     - Rapidité de mise en œuvre
         - Une fois la première configuration réalisée, il est très facile <br />
           et très rapide de recréer des clusters à l'identique qui seront <br />
           disponibles presque instantanément (le temps d'instancier les <br />
           serveurs soit environ 15/20 minutes)
     - Solutions matérielless et logicielles optimisées par les ingénieurs d'AWS
         - On sait que les versions installées vont fonctionner <br />
           et que l'architecture proposée est optimisée
     - Stabilité de la solution
    - Solution évolutive
        Il est facile d’obtenir à chaque nouvelle instanciation une version à jour <br />
        de chaque package, en étant garanti de leur compatibilité avec le reste de l’environnement.
  - Plus sécurisé
	- Les éventuels patchs de sécurité seront automatiquement mis à jour <br />
      à chaque nouvelle instanciation du cluster EMR.
 - <u>Inconvénients</u> :
     - Peut-être un certain manque de liberté sur la version des packages disponibles ? <br />
       Même si je n'ai pas constaté ce problème.
   

Je retiens la solution **PAAS** en choisissant d'utiliser <br />
le service **EMR** d'Amazon Web Services.<br />
Je la trouve plus adaptée à notre problématique et permet <br />
une mise en œuvre qui soit à la fois plus rapide et <br />
plus efficace que la solution IAAS.

## 4.3 Choix de la solution de stockage des données : Amazon S3

<u>Amazon propose une solution très efficace pour la gestion du stockage des données</u> : **Amazon S3**. <br />
S3 pour Amazon Simple Storage Service.

Il pourrait être tentant de stocker nos données sur l'espace alloué par le serveur **EC2**, <br />
mais si nous ne prenons aucune mesure pour les sauvegarder ensuite sur un autre support, <br />
<u>les données seront perdues</u> lorsque le serveur sera résilié (on résilie le serveur lorsqu'on <br />
ne s'en sert pas pour des raisons de coût).<br />
De fait, si l'on décide d'utiliser l'espace disque du serveur EC2 il faudra imaginer <br />
une solution pour sauvegarder les données avant la résiliation du serveur.
De plus, nous serions exposés à certaines problématiques si nos données venaient à <br />
**saturer** l'espace disponible de nos serveurs (ralentissements, disfonctionnements).

<u>Utiliser **Amazon S3** permet de s'affranchir de toutes ces problématiques</u>. <br />
L'espace disque disponible est **illimité**, et il est **indépendant de nos serveurs EC2**. <br />
L'accès aux données est **très rapide** car nous restons dans l'environnement d'AWS <br />
et nous prenons soin de <u>choisir la même région pour nos serveurs **EC2** et **S3**</u>.

De plus, comme nous le verrons <u>il est possible d'accéder aux données sur **S3** <br />
    de la même manière que l'on **accède aux données sur un disque local**</u>.<br />
Nous utiliserons simplement un **PATH au format s3://...** .

## 4.4 Configuration de l'environnement de travail

La première étape est d'installer et de configurer [**AWS Cli**](https://aws.amazon.com/fr/cli/),<br />
il s'agit de l'**interface en ligne de commande d'AWS**.<br />
Elle nous permet d'**interagir avec les différents services d'AWS**, comme **S3** par exemple.

Pour pouvoir utiliser **AWS Cli**, il faut le configurer en créant préalablement <br />
un utilisateur à qui on donnera les autorisations dont nous aurons besoin.<br />
Dans ce projet il faut que l'utilisateur ait à minima un contrôle total sur le service S3.

<u>La gestion des utilisateurs et de leurs droits s'effectue via le service **AMI**</u> d'AWS.

Une fois l'utilisateur créé et ses autorisations configurées nous créons une **paire de clés** <br />
qui nous permettra de nous **connecter sans à avoir à devoir saisir systématiquement notre login/mot de passe**.<br />

Il faut également configurer l'**accès SSH** à nos futurs serveurs EC2. <br />
Ici aussi, via un système de clés qui nous dispense de devoir nous authentifier "à la main" à chaque connexion.

Toutes ses étapes de configuration sont parfaitement décrites <br />
dans le cours du projet: [Réalisez des calculs distribués sur des données massives / Découvrez Amazon Web Services](https://openclassrooms.com/fr/courses/4297166-realisez-des-calculs-distribues-sur-des-donnees-massives/4308686-decouvrez-amazon-web-services#/id/r-4355822)

## 4.5 Upload de nos données sur S3

Nos outils sont configurés. <br />
Il faut maintenant uploader nos données de travail sur Amazon S3.

Ici aussi les étapes sont décrites avec précision <br />
dans le cours [Réalisez des calculs distribués sur des données massives / Stockez des données sur S3](https://openclassrooms.com/fr/courses/4297166-realisez-des-calculs-distribues-sur-des-donnees-massives/4308691-stockez-des-donnees-sur-s3)

Je décide de n'uploader que les données contenues dans le dossier **Test** du [jeu de données du projet](https://www.kaggle.com/moltean/fruits/download)


La première étape consiste à **créer un bucket sur S3** <br />
dans lequel nous uploaderons les données du projet:
- **aws s3 mb s3://tkap9**

On vérifie que le bucket à bien été créé
- **aws s3 ls**
 - Si le nom du bucket s'affiche alors c'est qu'il a été correctement créé.

On copie ensuite le contenu du dossier "**Test**" <br />
dans un répertoire "**Test**" sur notre bucket "**tkap9**":
1. On se place à l'intérieur du répertoire **Test**
2. **aws sync . s3://tkap9/Test**

La commande **sync** est utile pour synchroniser deux répertoires.

<u>Nos données du projet sont maintenant disponibles sur Amazon S3</u>.

## 4.6 Configuration du serveur EMR

Une fois encore, le cours [Réalisez des calculs distribués sur des données massives / Déployez un cluster de calculs distribués](https://openclassrooms.com/fr/courses/4297166-realisez-des-calculs-distribues-sur-des-donnees-massives/4308696-deployez-un-cluster-de-calculs-distribues) <br /> détaille l'essentiel des étapes pour lancer un cluster avec **EMR**.

<u>Je détaillerai ici les étapes particulières qui nous permettent <br />
de configurer le serveur selon nos besoins</u> :

1. Cliquez sur Créer un cluster

![emr_creer_cluster.png](attachment:emr_creer_cluster.png)

2. Offre d'applications



### 4.6.1 Étape 1 : Logiciels et étapes

#### 4.6.1.1 Configuration des logiciels

<u>Sélectionnez les packages dont nous aurons besoin comme dans la capture d'écran</u> :
1. Nous sélectionnons la dernière version d'**EMR**, soit la version **6.7.0** au moment où je rédige ce document
2. Nous cochons bien évidement **Hadoop** et **Spark** qui seront préinstallés dans leur version la plus récente
3. Nous aurons également besoin de **TensorFlow** pour importer notre modèle et réaliser le **transfert learning**
4. Nous travaillerons enfin avec un **notebook Jupyter** via l'application **JupyterHub**<br />
 - Comme nous le verrons dans un instant nous allons <u>paramétrer l'application afin que les notebooks</u>, <br />
 - Comme le reste de nos données de travail, <u>soient enregistrés directement sur S3</u>.

#### 4.6.1.2 Modifier les paramètres du logiciel

<u>Paramétrez la persistance des notebooks créés et ouvert via JupyterHub</u> :
- On peut à cette étape effectuer des demandes de paramétrage particulières sur nos applications. <br />
  L'objectif est, comme pour le reste de nos données de travail, <br />
  d'éviter toutes les problématiques évoquées précédemment. <br />
  C'est l'objectif à cette étape, <u>nous allons enregistrer <br />
  et ouvrir les notebooks</u> non pas sur l'espace disque de  l'instance EC2 (comme <br />
  ce serait le cas dans la configuration par défaut de JupyterHub) mais <br />
  <u>directement sur **Amazon S3**</u>.
- <u>deux solutions sont possibles pour réaliser cela</u> :
 1. Créer un **fichier de configuration JSON** que l'on **upload sur S3** et on indique ensuite le chemin d’accès au fichier JSON
 2. Rentrez directement la configuration au format JSON
 
J'ai personnellement créé un fichier JSON lors de la création de ma première instance EMR, <br />
puis lorsqu'on décide de cloner notre serveur pour en recréer un facilement à l'identique, <br />
la configuration du fichier JSON se retrouve directement copié comme dans la capture ci-dessous.

<u>Voici le contenu de mon fichier JSON</u> :  [{"classification":"jupyter-s3-conf","properties":{"s3.persistence.bucket":"tkap9","s3.persistence.enabled":"true"}}]

![Capture%20d'%C3%A9cran%202024-11-05%20173335.png](attachment:Capture%20d'%C3%A9cran%202024-11-05%20173335.png)

### 4.6.2 Étape 2 : Matériel

A cette étape, laissez les choix par défaut. <br />
<u>L'important ici est la sélection de nos instances</u> :

1. je choisi les instances de type **M5** qui sont des **instances de type équilibrés**
2. je choisi le type **xlarge** qui est l'instance la **moins onéreuse disponible**
 [Plus d'informations sur les instances M5 Amazon EC2](https://aws.amazon.com/fr/ec2/instance-types/m5/)
3. Je sélectionne **1 instance Maître** (le driver) et **2 instances Principales** (les workeurs) <br />
   soit **un total de 3 instance EC2**.
![Choix du materiel] ![Capture d'écran 2024-10-29 152030.png](attachment:850726d0-6325-45a4-8cdd-51aef3272bca.png)

### 4.6.3 Étape 3 : Paramètres de cluster généraux

#### 4.6.3.1 Options générales
<u>La première chose à faire est de donner un nom au cluster</u> :<br />
*J'ai également décoché "Protection de la résiliation" pour des raisons pratiques.*
    
![Capture d'écran 2024-10-29 152129.png](attachment:454d1147-9f3f-4413-8953-82c588adb879.png)


#### 4.6.3.2 Actions d'amorçage

Nous allons à cette étape **choisir les packages manquants à installer** et qui <br />
nous serons utiles dans l'exécution de notre notebook.<br />
<u>L'avantage de réaliser cette étape maintenant est que les packages <br />
installés le seront sur l'ensemble des machines du cluster</u>.

La procédure pour créer le fichier **bootstrap** qui contient <br />
l'ensemble des instructions permettant d'installer tous <br />
les packages dont nous aurons besoin est expliqué dans <br />
le cours [Réalisez des calculs distribués sur des données massives / Bootstrapping](https://openclassrooms.com/fr/courses/4297166-realisez-des-calculs-distribues-sur-des-donnees-massives/4308696-deployez-un-cluster-de-calculs-distribues#/id/r-4356490)

Nous créons donc un fichier nommé "**bootstrap.sh**" que nous <u>uploadons <br />
sur S3</u>(je l’installe à la racine de mon **bucket "tkap9"**) et nous l'ajoutons <br />
comme indiqué dans la capture d'écran ci-dessous:
                                                                                    
![Capture d'écran 2024-11-05 173253.png](attachment:de0770e0-2420-49f0-949b-6e98774eb2a2.png)


Voici le contenu du fichier **bootstrap-emr.sh**<br />
Comme on peut le constater il s'agit simplement de commande "**pip install**" <br />
pour **installer les bibliothèques manquantes** comme réalisé en local.<br />
Une fois encore, <u>il est nécessaire de réaliser ces actions à cette étape</u> <br />
pour que <u>les packages soient installés sur l'ensemble des machines du cluster</u> <br />
et non pas uniquement sur le driver, comme cela serait le cas si nous exécutions <br />
ces commandes directement dans le notebook JupyterHub ou dans la console EMR (connecté au driver).
![Contenu du fichier bootstrap] ![Capture d'écran 2024-11-07 140000.png](attachment:dde93c0d-d5ef-408b-9b8b-87498ab380d0.png)

**setuptools** et **pip** sont mis à jour pour éviter une problématique <br />
avec l'installation du package **pyarrow**.<br />
**Pandas** a eu droit à une mise à jour majeur (1.3.0) il y a moins d'une semaine <br />
au moment de la rédaction de ce notebook, et la nouvelle version de **Pandas** <br />
nécessite une version plus récente de **Numpy** que la version installée par <br />
défaut (1.16.5) à l'initialisation des instances **EC2**. <u>Il ne semble pas <br />
possible d'imposer une autre version de Numpy que celle installé par <br />
défaut</u> même si on force l'installation d'une version récente de **Numpy** <br />
(en tout cas, ni simplement ni intuitivement).<br />
La mise à jour étant très récente <u>la version de **Numpy** n'est pas encore <br />
mise à jour sur **EC2**</u> mais on peut imaginer que ce sera le cas très rapidement <br />
et il ne sera plus nécessaire d'imposer une version spécifique de **Pandas**.<br />
En attendant, je demande <u>l'installation de l'avant dernière version de **Pandas (1.2.5)**</u>



### 4.6.4 Étape 4 : Sécurité

#### 4.6.4.1 Options de sécurité

A cette étape nous sélectionnons la **paire de clés EC2** créé précédemment. <br />
Elle nous permettra de se connecter en **ssh** à nos **instances EC2** <br />
sans avoir à entrer nos login/mot de passe.<br />
On laisse les autres paramètres par défaut. <br />
Et enfin, on clique sur "***Créer un cluster***"
 
![EMR Sécurité] ![Capture d'écran 2024-11-05 173356.png](attachment:9e47e82a-4f6c-4114-863d-56cbfc6e5f85.png)

![Capture d'écran 2024-11-05 173423.png](attachment:983e1b42-7057-4698-acb2-dff2dd590b36.png)

## 4.7 Instanciation du serveur

Il ne nous reste plus qu'à attendre que le serveur soit prêt. <br />
Cette étape peut prendre entre **15 et 20 minutes**.

<u>Plusieurs étapes s'enchaîne, on peut suivre l'avancé du statut du **cluster EMR**</u> :

![Instanciation étape 1] ![Capture d'écran 2024-11-07 170314.png](attachment:487edb15-420d-4021-80c2-3dd4572ebe7f.png)
![Instanciation étape 2] ![Capture d'écran 2024-11-07 170452.png](attachment:1ca77bf8-e8bc-4d1a-960c-2813b1206382.png)
![Instanciation étape 3] ![Capture d'écran 2024-11-07 171550.png](attachment:770c92a8-465c-4448-a4f2-a23cc48a14b6.png)

<u>Lorsque le statut affiche en vert: "**En attente**" cela signifie que l'instanciation <br />
s'est bien déroulée et que notre serveur est prêt à être utilisé</u>.

## 4.8 Création du tunnel SSH à l'instance EC2 (Maître)

### 4.8.1 Création des autorisations sur les connexions entrantes

<u>Nous souhaitons maintenant pouvoir accéder à nos applications</u> :
 - **JupyterHub** pour l'exécution de notre notebook
 - **Serveur d'historique Spark** pour le suivi de l'exécution <br />
   des tâches de notre script lorsqu'il sera lancé
 
Cependant, <u>ces applications ne sont accessibles que depuis le réseau local du driver</u>, <br />
et pour y accéder nous devons **créer un tunnel SSH vers le driver**.

Par défaut, ce driver se situe derrière un firewall qui bloque l'accès en SSH. <br />
<u>Pour ouvrir le port 22 qui correspond au port sur lequel écoute le serveur SSH, <br />
il faut modifier le **groupe de sécurité EC2 du driver**</u>.

Cette étape est décrite dans le cours [Réalisez des calculs distribués sur des données massives / Lancement d'une application à partir du driver](https://openclassrooms.com/fr/courses/4297166-realisez-des-calculs-distribues-sur-des-donnees-massives/4308696-deployez-un-cluster-de-calculs-distribues#/id/r-4356512): 

*Il faudra que l'on se connecte en SSH au driver de notre cluster. <br />
Par défaut, ce driver se situe derrière un firewall qui bloque l'accès en SSH. <br />
Pour ouvrir le port 22 qui correspond au port sur lequel écoute le serveur SSH, <br />
il faut modifier le groupe de sécurité EC2 du driver. Sur la page de la console <br />
consacrée à EC2, dans l'onglet "Réseau et sécurité", cliquez sur "Groupes de sécurité". <br />
Vous allez devoir modifier le groupe de sécurité d’ElasticMapReduce-Master. <br />
Dans l'onglet "Entrant", ajoutez une règle SSH dont la source est "N'importe où" <br />
(ou "Mon IP" si vous disposez d'une adresse IP fixe).*


<u>Une fois cette étape réalisée vous devriez avoir une configuration semblable à la mienne</u> :

![Configuration ssh terminée] ![Capture d'écran 2024-10-28 122404.png](attachment:71d8be02-1720-4190-af7a-0ffa1d508ec3.png)

### 4.8.2 Création du tunnel ssh vers le Driver

On peut maintenant établir le **tunnel SSH** vers le **Driver**. <br />
Pour cela on récupère les informations de connexion fournis par Amazon <br />
depuis la page du service EMR / Cluster / onglet Récapitulatif en <br />
cliquant sur "**Activer une connexion SSH**"

![Activer une connexion SSH] ![Capture d'écran 2024-10-28 122459.png](attachment:7f4c1ca6-2ddb-41e8-b23f-e9739a73d13d.png)

<u>On récupère ensuite la commande fournis par Amazon pour **établir le tunnel SSH**</u> :

![Récupérer la commande pour établir le tunnel ssh] ![Capture d'écran 2024-11-06 132540.png](attachment:8b109006-38f4-48f6-896f-b7335e6b38bc.png)

<u>Dans mon cas, la commande ne fonctionne pas tel</u> quel et j'ai du **l'adapter à ma configuration**. <br />
La **clé ssh** se situe dans un dossier "**.ssh**" elle-même située dans <br />
mon **répertoire personnel** dont le symbole est, sous Linux, identifié par un tilde "**~**".

Ayant suivi le cours [Réalisez des calculs distribués sur des données massives / Lancement d'une application à partir du driver](https://openclassrooms.com/fr/courses/4297166-realisez-des-calculs-distribues-sur-des-donnees-massives) <br />
j'ai choisi d'utiliser le port **5555** au lieu du **8157**, même si le choix n'est pas très important.<br />
    j'ai également rencontré un <u>problème de compatibilité</u> avec <br />
l'argument "**-N**" (liste des arguments et leur significations <br />
disponibles [ici](https://explainshell.com/explain?cmd=ssh+-L+-N+-f+-l+-D)) j'ai décidé de simplement le supprimer.

<u>Finalement, nous choisissons l'option avec PuTTY et nous suivons les étapes indiquées <br />
    mon tunnel ssh (seul l'URL change d'une instance à une autre)</u> : <br />
![Capture d'écran 2024-11-06 161426.png](attachment:a11739cf-aa32-4def-8459-b982ffa0bc05.png)


<u>On clique sur "**Accept**" pour valider la connexion et si <br />
    la connexion est établit on obtient le résultat suivant</u> :

![Création du tunnel SSH] ![Capture d'écran 2024-11-06 161527.png](attachment:1c9b9b4a-a5f5-469d-8e32-dc26c0aec2dc.png)

Nous avons **correctement établi le tunnel ssh avec le driver** sur le port "5555".

### 4.8.3 Configuration de FoxyProxy

Une dernière étape est nécessaire pour accéder à nos applications, <br />
en demandant à notre navigateur d'emprunter le tunnel ssh.<br />
J'utilise pour cela **FoxyProxy**.
[Une fois encore, vous pouvez utiliser le cours pour le configurer](https://openclassrooms.com/fr/courses/4297166-realisez-des-calculs-distribues-sur-des-donnees-massives/4308701-realisez-la-maintenance-dun-cluster#/id/r-4356554).

Sinon, ouvrez la configuration de **FoxyProxy** et <u>cliquez sur **Ajouter**</u> en haut à gauche <br />
puis renseigner les éléments comme dans la capture ci-dessous :

![Configuration FoxyProxy Etape 1] ![Capture d'écran 2024-11-06 170739.png](attachment:1afc96ce-4862-49d8-a25f-07dfef738843.png)

<u>On obtient le résultat ci-dessous</u> :

![Configuration FoxyProxy Etape 2] ![Capture d'écran 2024-11-06 172541.png](attachment:64c6e051-abee-4d99-b696-d86333419652.png)


### 4.8.4 Accès aux applications du serveur EMR via le tunnel ssh

<u>On active le **tunnel ssh** comme vu précédemment puis on demande <br />
à notre navigateur de l'utiliser avec **FoxyProxy**</u> :

![FoxyProxy activation] ![Capture d'écran 2024-11-07 181538.png](attachment:6bfe3833-e1fe-44e8-93a2-71f67c355984.png)

<u>On peut maintenant s'apercevoir que plusieurs applications nous sont accessibles</u> :


![Capture d'écran 2024-11-19 143745.png](attachment:d2193e62-f924-4a67-8ffb-90863f2bc811.png)


## 4.9 Connexion au notebook JupyterHub

Pour se connecter à **JupyterHub** en vue d'exécuter notre **notebook**, <br />
il faut commencer par <u>cliquer sur l'application **JupyterHub**</u> apparu <br />
depuis que nous avons configuré le **tunnel ssh** et **foxyproxy** sur <br />
notre navigateur (actualisez la page si ce n’est pas le cas).

On passe les éventuels avertissements de sécurité puis <br />
nous arrivons sur une page de connexion.
    
<u>On se connecte avec les informations par défaut</u> :
 - <u>login</u>: **jovyan**
 - <u>password</u>: **jupyter**
 
![Connexion à JupyterHub] ![Capture d'écran 2024-11-19 115607.png](attachment:315ff7df-1ef5-4ef9-80ff-0faf2b744a18.png)

Nous arrivons ensuite dans un dossier vierge de notebook.<br />
Il suffit d'en créer un en cliquant sur "**New**" en haut à droite.

Il est également possible d'en <u>uploader un directement dans notre **bucket S3**</u>.

Grace à la <u>**persistance** paramétrée à l'instanciation du cluster <br />
nous sommes actuellement dans l'arborescence de notre **bucket S3**</u>


Je décide d'**importer un notebook déjà rédigé en local directement <br />
sur S3** et je l'ouvre depuis **l'interface JupyterHub**.


# L'exécution de cette cellule démarre l'application Spark

In [1]:
# L'exécution de cette cellule démarre l'application Spark

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
0,application_1732016337502_0001,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
%%info

ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
0,application_1732016337502_0001,pyspark,idle,Link,Link,,✔


### 4.10.2 Installation des packages

Les packages nécessaires ont été installé via l'étape de **bootstrap** à l'instanciation du serveur.

### 4.10.3 Import des librairies


In [3]:
import pandas as pd
import numpy as np
import io
import os
import tensorflow as tf
from PIL import Image
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras import Model
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### 4.10.4 Définition des PATH pour charger les images et enregistrer les résultats

Nous accédons directement à nos **données sur S3** comme si elles étaient **stockées localement**.

In [4]:
PATH = 's3://tkap9'
PATH_Data = PATH+'/Test'
PATH_Result = PATH+'/Results'
print('PATH:        '+\
      PATH+'\nPATH_Data:   '+\
      PATH_Data+'\nPATH_Result: '+PATH_Result)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

PATH:        s3://tkap9
PATH_Data:   s3://tkap9/Test
PATH_Result: s3://tkap9/Results

In [5]:
PATH_Data

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

's3://tkap9/Test'

<p class="h4"; style="color:red; text-align:left; background-color:white">  4.10.5 Traitement des données </p>

#### 4.10.5.1 Chargement des données

In [6]:
images = spark.read.format("binaryFile") \
  .option("pathGlobFilter", "*.jpg") \
  .option("recursiveFileLookup", "true") \
  .load(PATH_Data)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
images.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+-------------------+------+--------------------+
|                path|   modificationTime|length|             content|
+--------------------+-------------------+------+--------------------+
|s3://tkap9/Test/W...|2024-11-07 15:48:40|  7353|[FF D8 FF E0 00 1...|
|s3://tkap9/Test/W...|2024-11-07 15:48:40|  7350|[FF D8 FF E0 00 1...|
|s3://tkap9/Test/W...|2024-11-07 15:48:40|  7349|[FF D8 FF E0 00 1...|
|s3://tkap9/Test/W...|2024-11-07 15:48:40|  7348|[FF D8 FF E0 00 1...|
|s3://tkap9/Test/W...|2024-11-07 15:48:41|  7328|[FF D8 FF E0 00 1...|
+--------------------+-------------------+------+--------------------+
only showing top 5 rows

<u>Je ne conserve que le **path** de l'image et j'ajoute <br />
    une colonne contenant les **labels** de chaque image</u> :

In [8]:
images = images.withColumn('label', element_at(split(images['path'], '/'),-2))
print(images.printSchema())
print(images.select('path','label').show(5,False))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)

None
+----------------------------------------+----------+
|path                                    |label     |
+----------------------------------------+----------+
|s3://tkap9/Test/Watermelon/r_106_100.jpg|Watermelon|
|s3://tkap9/Test/Watermelon/r_109_100.jpg|Watermelon|
|s3://tkap9/Test/Watermelon/r_108_100.jpg|Watermelon|
|s3://tkap9/Test/Watermelon/r_107_100.jpg|Watermelon|
|s3://tkap9/Test/Watermelon/r_95_100.jpg |Watermelon|
+----------------------------------------+----------+
only showing top 5 rows

None

#### 4.10.5.2 Préparation du modèle

In [9]:
model = MobileNetV2(weights='imagenet',
                    include_top=True,
                    input_shape=(224, 224, 3))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/mobilenet_v2/mobilenet_v2_weights_tf_dim_ordering_tf_kernels_1.0_224.h5

In [10]:
new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
brodcast_weights = sc.broadcast(new_model.get_weights())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [12]:
new_model.summary()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, 224, 224, 3  0           []                               
                                )]                                                                
                                                                                                  
 Conv1 (Conv2D)                 (None, 112, 112, 32  864         ['input_1[0][0]']                
                                )                                                                 
                                                                                                  
 bn_Conv1 (BatchNormalization)  (None, 112, 112, 32  128         ['Conv1[0][0]']                  
                                )                                                             

In [13]:
def model_fn():
    """
    Returns a MobileNetV2 model with top layer removed 
    and broadcasted pretrained weights.
    """
    model = MobileNetV2(weights='imagenet',
                        include_top=True,
                        input_shape=(224, 224, 3))
    for layer in model.layers:
        layer.trainable = False
    new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)
    new_model.set_weights(brodcast_weights.value)
    return new_model

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### 4.10.5.3 Définition du processus de chargement des images <br/> et application de leur featurisation à travers l'utilisation de pandas UDF

In [14]:
def preprocess(content):
    """
    Preprocesses raw image bytes for prediction.
    """
    img = Image.open(io.BytesIO(content)).resize([224, 224])
    arr = img_to_array(img)
    return preprocess_input(arr)

def featurize_series(model, content_series):
    """
    Featurize a pd.Series of raw images using the input model.
    :return: a pd.Series of image features
    """
    input = np.stack(content_series.map(preprocess))
    preds = model.predict(input)
    # For some layers, output features will be multi-dimensional tensors.
    # We flatten the feature tensors to vectors for easier storage in Spark DataFrames.
    output = [p.flatten() for p in preds]
    return pd.Series(output)

@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    '''
    This method is a Scalar Iterator pandas UDF wrapping our featurization function.
    The decorator specifies that this returns a Spark DataFrame column of type ArrayType(FloatType).

    :param content_series_iter: This argument is an iterator over batches of data, where each batch
                              is a pandas Series of image data.
    '''
    # With Scalar Iterator pandas UDFs, we can load the model once and then re-use it
    # for multiple data batches.  This amortizes the overhead of loading big models.
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…



#### 4.10.5.4 Exécutions des actions d'extractions de features

In [17]:
# spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1024")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [15]:
features_df = images.repartition(24).select(col("path"),
                                            col("label"),
                                            featurize_udf("content").alias("features")
                                           )

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [16]:
print(PATH_Result)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

s3://tkap9/Results

In [17]:
features_df.write.mode("overwrite").parquet(PATH_Result)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [19]:
features_df.head()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Row(path='s3://tkap9/Test/Watermelon/r_89_100.jpg', label='Watermelon', features=[0.1832752376794815, 0.06302006542682648, 0.0, 0.06413672119379044, 0.7891983985900879, 0.0, 0.681795060634613, 0.3774951100349426, 0.03228159248828888, 0.0, 0.6115354299545288, 0.10790547728538513, 0.06453823298215866, 0.1909630000591278, 0.33987513184547424, 0.0, 0.0, 0.0, 0.0, 0.5160492658615112, 0.008838692680001259, 0.0, 0.0, 0.0027974285185337067, 0.11359379440546036, 0.9916796088218689, 2.1383256912231445, 0.0, 0.0, 1.411964774131775, 0.020703762769699097, 0.05727219209074974, 0.21094225347042084, 0.610917329788208, 0.0, 0.07101571559906006, 0.37942996621131897, 2.655921220779419, 0.004604164510965347, 0.0, 0.12006523460149765, 0.0, 0.11573515087366104, 0.0, 0.0, 0.0, 0.8342514038085938, 0.06808987259864807, 1.6478095054626465, 0.3780951201915741, 0.0, 0.03379078209400177, 0.1279163956642151, 0.0, 0.019569367170333862, 1.9713212251663208, 0.03307284042239189, 0.6757906675338745, 0.0, 0.0170093160122

In [20]:
from pyspark.ml.feature import PCA
from pyspark.sql.functions import udf
from pyspark.ml.linalg import Vectors, VectorUDT

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [21]:
# Convert the features column to a dense vector
dense_vector = udf(lambda a: Vectors.dense(a), VectorUDT())
dense_df = features_df.select("path", "label", dense_vector("features").alias("dense_features"))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [22]:
# Apply PCA to the dense vector
pca = PCA(k=50, inputCol="dense_features", outputCol="pca_features")
model = pca.fit(dense_df)
result = model.transform(dense_df).select("path", "label", "pca_features")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [23]:
# Write the result after PCA to a parquet file
result.write.mode("overwrite").parquet(PATH_Result + "/pca_results")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [24]:
print(PATH_Result + "/pca_results")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

s3://tkap9/Results/pca_results

#### 4.10.6 Chargement des données enregistrées et validation du résultat

In [25]:
df = spark.read.parquet(PATH_Result)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [26]:
df_pca=spark.read.parquet(PATH_Result + "/pca_results")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [27]:
print(df.printSchema())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- path: string (nullable = true)
 |-- label: string (nullable = true)
 |-- features: array (nullable = true)
 |    |-- element: float (containsNull = true)

None

In [28]:
print(df_pca.printSchema())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- path: string (nullable = true)
 |-- label: string (nullable = true)
 |-- pca_features: vector (nullable = true)

None

In [29]:
# Sélectionner la colonne "pca_features" et la renommer en "features"
df_pca_features = df_pca.select(col("pca_features").alias("features"))

# Récupérer la première ligne du DataFrame
first_row = df_pca_features.first()

# Obtenir la dimension de la colonne "features"
dimension = len(first_row["features"])
print("Dimension après réduction PCA:", dimension)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Dimension apr?s r?duction PCA: 50

In [31]:
df.columns

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

['path', 'label', 'features']

In [32]:
df_pca.columns

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

['path', 'label', 'pca_features']

In [33]:

df.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

22688

In [34]:
df_pca.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

22688

In [35]:
df.head()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Row(path='s3://tkap9/Test/Pineapple Mini/274_100.jpg', label='Pineapple Mini', features=[0.0, 4.866369724273682, 0.004606354050338268, 0.0, 0.0, 0.0, 0.27068406343460083, 0.001849086955189705, 0.0, 0.0, 0.0, 0.0, 0.08412822335958481, 0.0339178740978241, 0.0, 0.0, 0.0, 0.0951492041349411, 0.0, 0.25383099913597107, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.016619060188531876, 0.5936169028282166, 0.0, 0.0, 0.0007172020268626511, 0.0, 2.531836986541748, 0.0, 0.4488779604434967, 0.1003536581993103, 0.04555368423461914, 0.6521918773651123, 0.0, 0.0, 0.0, 0.0, 0.0, 1.1140873432159424, 0.0, 0.4122297763824463, 0.0130351223051548, 0.6858490705490112, 0.7165094614028931, 0.0009431352373212576, 0.055232416838407516, 0.0, 0.0, 0.6336780190467834, 0.8137038946151733, 0.0, 0.024101033806800842, 0.5392674803733826, 0.0, 0.14285005629062653, 0.0, 0.0, 0.4082518219947815, 1.3634408712387085, 0.35161373019218445, 0.0, 0.12242394685745239, 0.0, 0.5851898789405823, 0.0, 1.1180251836776733, 0.0, 3.4697854518890

<u>On peut également constater la présence des fichiers <br />
    au format "**parquet**" sur le **serveur S3**</u> :

![Affichage des résultats sur S3] ![image.png](attachment:image.png)

## 4.11 Suivi de l'avancement des tâches avec le Serveur d'Historique Spark

Il est possible de voir l'avancement des tâches en cours <br />
avec le **serveur d'historique Spark**.

![Accès au serveur d'historique spark] ![image-2.png](attachment:image-2.png)

**Il est également possible de revenir et d'étudier les tâches <br />
qui ont été réalisé, afin de debugger, optimiser les futurs <br />
tâches à réaliser.**

<u>Lorsque la commande "**features_df.write.mode("overwrite").parquet(PATH_Result)**" <br />
était en cours, nous pouvions observer son état d'avancement</u> :

![Progression execution script](img/EMR_jupyterhub_avancement.png)

<u>Le **serveur d'historique Spark** nous permet une vision beaucoup plus précise <br />
de l'exécution des différentes tâche sur les différentes machines du cluster</u> :

![Suivi des tâches spark] ![image-3.png](attachment:image-3.png)

On peut également constater que notre cluster de calcul a mis <br />
un tout petit peu moins de 30 mins pour l'ensemble des images

![Temps de traitement] ![image.png](attachment:14b25f41-98a7-4af1-8662-31ec38a17a41.png)


## 4.12 Résiliation de l'instance EMR

Notre travail est maintenant terminé. <br />
Le cluster de machines EMR est **facturé à la demande**, <br />
et nous continuons d'être facturé même lorsque <br />
les machines sont au repos.<br />
Pour **optimiser la facturation**, il nous faut <br />
maintenant **résilier le cluster**.

<u>Je réalise cette commande depuis l'interface AWS</u> :

1. Commencez par **désactiver le tunnel ssh dans FoxyProxy** pour éviter des problèmes de **timeout*



2. Cliquez sur "**Résilier**"

3. Confirmez la résiliation

4. La résiliation prend environ **1 minute**

5. La résiliation est effectuée


## 4.13 Cloner le serveur EMR (si besoin)

Si nous devons de nouveau exécuter notre notebook dans les mêmes conditions, <br />
il nous suffit de **cloner notre cluster** et ainsi en obtenir une copie fonctionnelle <br />
sous 15/20 minutes, le temps de son instanciation.

<u>Pour cela deux solutions</u> :
1. <u>Depuis l'interface AWS</u> :
 1. Cliquez sur "**Cloner**"
   
 2. Dans notre cas nous ne souhaitons pas inclure d'étapes
  
 3. La configuration du cluster est recréée à l’identique. <br />
    On peut revenir sur les différentes étapes si on souhaite apporter des modifications<br />
    Quand tout est prêt, cliquez sur "**Créer un cluster**"

2. <u>En ligne de commande</u> (avec AWS CLI d'installé et de configuré et en s'assurant <br />
   de s'attribuer les droits nécessaires sur le compte AMI utilisé)
 1. Cliquez sur "**Exporter AWS CLI**"

 2. Copier/Coller la commande **depuis un terminal**


## 4.14 Arborescence du serveur S3 à la fin du projet

<u>Pour information, voici **l'arborescence complète de mon bucket S3 tkap9** à la fin du projet</u> : <br />
*Par soucis de lisibilité, je ne liste pas les 131 sous dossiers du répertoire "Test"* 

![Capture d'écran 2024-11-19 143320.png](attachment:72c4837e-f671-45c1-a407-a476a8a8202d.png)

# 5. Conclusion

Nous avons réalisé ce projet **en deux temps** en tenant <br />
compte des contraintes qui nous ont été imposées.

Nous avons **dans un premier temps développé notre solution en local** <br />
sur une machine virtuelle dans un environnement Linux Ubuntu.

La <u>première phase</u> a consisté à **installer l'environnement de travail Spark**. <br />
**Spark** a un paramètre qui nous permet de travaillé en local et nous permet <br />
ainsi de **simuler du calcul partagé** en considérant <br />
**chaque cœur d'un processeur comme un worker indépendant**.<br />
Nous avons travaillé sur un plus **petit jeu de donnée**, l'idée était <br />
simplement de **valider le bon fonctionnement de la solution**.

Nous avons fait le choix de réaliser du **transfert learning** <br />
à partir du model **MobileNetV2**.<br />
Ce modèle a été retenu pour sa **légèreté** et sa **rapidité d'exécution** <br />
ainsi que pour la **faible dimension de son vecteur en sortie**.

Les résultats ont été enregistrés sur disque en plusieurs <br />
partitions au format "**parquet**".

<u>**La solution a parfaitement fonctionné en mode local**</u>.

La <u>deuxième phase</u> a consisté à créer un **réel cluster de calculs**. <br />
L'objectif était de pouvoir **anticiper une future augmentation de la charge de travail**.

Le meilleur choix retenu a été l'utilisation du prestataire de services **Amazon Web Services** <br />
qui nous permet de **louer à la demande de la puissance de calculs**, <br />
pour un **coût tout à fait acceptable**.<br />
Ce service se nomme **EC2** et se classe parmi les offres **Infrastructure As A Service** (IAAS).

Nous sommes allez plus loin en utilisant un service de plus <br />
haut niveau (**Plateforme As A Service** PAAS)<br />
en utilisant le service **EMR** qui nous permet d'un seul coup <br />
d'**instancier plusieurs serveur (un cluster)** sur lesquels <br />
nous avons pu demander l'installation et la configuration de plusieurs<br />
programmes et librairies nécessaires à notre projet comme **Spark**, <br />
**Hadoop**, **JupyterHub** ainsi que la librairie **TensorFlow**.

En plus d'être plus **rapide et efficace à mettre en place**, nous avons <br />
la **certitude du bon fonctionnement de la solution**, celle-ci ayant été <br />
préalablement validé par les ingénieurs d'Amazon.

Nous avons également pu installer, sans difficulté, **les packages <br />
nécessaires sur l'ensembles des machines du cluster**.

Enfin, avec très peu de modification, et plus simplement encore, <br />
nous avons pu **exécuter notre notebook comme nous l'avions fait localement**.<br />
Nous avons cette fois-ci exécuté le traitement sur **l'ensemble des images de notre dossier "Test"**.

Nous avons opté pour le service **Amazon S3** pour **stocker les données de notre projet**. <br />
S3 offre, pour un faible coût, toutes les conditions dont nous avons besoin pour stocker <br />
et exploiter de manière efficace nos données.<br />
L'espace alloué est potentiellement **illimité**, mais les coûts seront fonction de l'espace utilisé.

Il nous sera **facile de faire face à une monté de la charge de travail** en **redimensionnant** <br />
simplement notre cluster de machines (horizontalement et/ou verticalement au besoin), <br />
les coûts augmenteront en conséquence mais resteront nettement inférieurs aux coûts engendrés <br />
par l'achat de matériels ou par la location de serveurs dédiés.