# PROJET 8 : DÉPLOYEZ UN MODÈLE DANS LE CLOUD

## 0 - COMMENTAIRE :

Il m'a été impossible d'installer correctement Pyspark sur mon environnement Windows 10 - Anaconda. Après de nombreux essais, et combinaisons de versions java/python/pyspark, j'ai opté pour l'utilisation de la plateforme Databricks. Le code ci-dessous a été exécuté sous cette plateforme ; la mise en page (dont ce commentaire) a, quant à elle, été effectuée sur Jupyter notebook.

L'environnement Databricks Community Édition impliquait un système de gestion de données particulier qui a notamment énormément compliqué le redimensionnement des images (les bibliothèques PIL ou CV2 ne pouvaient lire les images depuis la source, seul Spark le pouvait. J'ai pu résoudre ce problème (voir II-B) en adaptant un code trouvé dans un article sur le traitement des images avec Spark : https://godatadriven.com/blog/real-distributed-image-processing-with-apache-spark/

N'ayant pu installer Spark sur mon PC, il m'était impossible de lancer un script pyspark via, par exemple, un cluster EMR de Amazon Web Services. J'ai résolu ce problème en souscrivant à l'offre d'essai de Databricks AWS Édition. Ce dernier m'a permis de lier mes comptes Databricks et AWS et d'exécuter ainsi mon code depuis Databricks sur les données stockées sur mon bucket S3.

J'ai enregistré les étapes effectuées pour déployer mon programme de preprocessing avec un enregistreur d'actions utilisateur (l'enregistrement est présent dans les livrables sous le numéro 7)

## I - PRÉALABLES :

### A - Importation des librairies :

In [0]:
# Importation des librairies :
from typing import Iterator
import numpy as np
import pandas as pd
from PIL import Image
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import split, udf, col
from pyspark.sql.types import StructType, StructField, ArrayType, IntegerType
from pyspark.ml.image import ImageSchema
from pyspark.ml.feature import StandardScaler
from pyspark.ml.linalg import VectorUDT, Vectors
from pyspark.ml.feature import PCA

### B - Définition du répertoire :

#### 1 - Databricks :

#### 2 - AWS :

In [None]:
# Adresse du compartiment S3 :
path = "s3a://vivianorsprojet8/dataset/"

### C - Instanciation de Spark :

In [0]:
# Instanciation de Spark :
sc = SparkContext.getOrCreate()
spark = SparkSession.builder.getOrCreate()

## II - PREPROCESSING DES IMAGES :

### A - Chargement des images :

In [0]:
# Récupération des images :
df = spark.read.format("image").load(path + "*")
print((df.count(), len(df.columns)))
print(df.printSchema())
df.show(5)

(200, 1)
root
 |-- image: struct (nullable = true)
 |    |-- origin: string (nullable = true)
 |    |-- height: integer (nullable = true)
 |    |-- width: integer (nullable = true)
 |    |-- nChannels: integer (nullable = true)
 |    |-- mode: integer (nullable = true)
 |    |-- data: binary (nullable = true)

None
+--------------------+
|               image|
+--------------------+
|{dbfs:/FileStore/...|
|{dbfs:/FileStore/...|
|{dbfs:/FileStore/...|
|{dbfs:/FileStore/...|
|{dbfs:/FileStore/...|
+--------------------+
only showing top 5 rows



### B - Traitement et redimensionnement des images (problèmes mémoire) :

In [0]:
# Traitement et redimensionnement des images (à cause d'erreurs mémoire)
schema = StructType(df.select("image.*").schema.fields + [
    StructField("data_array", ArrayType(IntegerType()), True)
])

def preprocess_array_image(data):
    mode = 'RGB' 
    img = Image.frombytes(mode=mode, data=data.data, size=[100, 100])
    img = img.resize([10, 10])
    arr = np.asarray(img)
    arr = arr.reshape([10*10*3])
    return arr

def array_image(dataframe_batch_iterator: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    for dataframe_batch in dataframe_batch_iterator:
        dataframe_batch["data_array"] = dataframe_batch.apply(preprocess_array_image, axis=1)
        yield dataframe_batch

df = df.select("image.*").mapInPandas(array_image, schema)

df.show()

+--------------------+------+-----+---------+----+--------------------+--------------------+
|              origin|height|width|nChannels|mode|                data|          data_array|
+--------------------+------+-----+---------+----+--------------------+--------------------+
|dbfs:/FileStore/s...|   100|  100|        3|  16|[FF FF FE FF FF F...|[255, 255, 255, 2...|
|dbfs:/FileStore/s...|   100|  100|        3|  16|[FF FF FE FF FF F...|[255, 255, 255, 2...|
|dbfs:/FileStore/s...|   100|  100|        3|  16|[FF FF FE FF FF F...|[255, 255, 255, 2...|
|dbfs:/FileStore/s...|   100|  100|        3|  16|[FF FF FE FF FF F...|[255, 255, 255, 2...|
|dbfs:/FileStore/s...|   100|  100|        3|  16|[FF FF FF FF FF F...|[255, 255, 255, 2...|
|dbfs:/FileStore/s...|   100|  100|        3|  16|[FF FF FE FF FF F...|[255, 255, 255, 2...|
|dbfs:/FileStore/s...|   100|  100|        3|  16|[FD FE FF FD FF F...|[255, 255, 255, 2...|
|dbfs:/FileStore/s...|   100|  100|        3|  16|[FF FF FE FF FF F...

### C - Vectorisation :

In [0]:
# Vectorisation :
ImageSchema.imageFields
img2vec = udf(lambda l: Vectors.dense(l), VectorUDT())
df = df.withColumn("vectors", img2vec("data_array"))
df.show()

+--------------------+------+-----+---------+----+--------------------+--------------------+--------------------+
|              origin|height|width|nChannels|mode|                data|          data_array|             vectors|
+--------------------+------+-----+---------+----+--------------------+--------------------+--------------------+
|dbfs:/FileStore/s...|   100|  100|        3|  16|[FF FF FE FF FF F...|[255, 255, 255, 2...|[255.0,255.0,255....|
|dbfs:/FileStore/s...|   100|  100|        3|  16|[FF FF FE FF FF F...|[255, 255, 255, 2...|[255.0,255.0,255....|
|dbfs:/FileStore/s...|   100|  100|        3|  16|[FF FF FE FF FF F...|[255, 255, 255, 2...|[255.0,255.0,255....|
|dbfs:/FileStore/s...|   100|  100|        3|  16|[FF FF FE FF FF F...|[255, 255, 255, 2...|[255.0,255.0,255....|
|dbfs:/FileStore/s...|   100|  100|        3|  16|[FF FF FF FF FF F...|[255, 255, 255, 2...|[255.0,255.0,255....|
|dbfs:/FileStore/s...|   100|  100|        3|  16|[FF FF FE FF FF F...|[255, 255, 255, 2

### D - Standardisation :

In [0]:
# Standardisation :
scaler = StandardScaler(inputCol="vectors", outputCol="scaled_vectors", withMean=True, withStd=True)
model_std = scaler.fit(df)
df = model_std.transform(df)
df.show()

+--------------------+------+-----+---------+----+--------------------+--------------------+--------------------+--------------------+
|              origin|height|width|nChannels|mode|                data|          data_array|             vectors|      scaled_vectors|
+--------------------+------+-----+---------+----+--------------------+--------------------+--------------------+--------------------+
|dbfs:/FileStore/s...|   100|  100|        3|  16|[FF FF FE FF FF F...|[255, 255, 255, 2...|[255.0,255.0,255....|[0.07071067811898...|
|dbfs:/FileStore/s...|   100|  100|        3|  16|[FF FF FE FF FF F...|[255, 255, 255, 2...|[255.0,255.0,255....|[0.07071067811898...|
|dbfs:/FileStore/s...|   100|  100|        3|  16|[FF FF FE FF FF F...|[255, 255, 255, 2...|[255.0,255.0,255....|[0.07071067811898...|
|dbfs:/FileStore/s...|   100|  100|        3|  16|[FF FF FE FF FF F...|[255, 255, 255, 2...|[255.0,255.0,255....|[0.07071067811898...|
|dbfs:/FileStore/s...|   100|  100|        3|  16|[FF F

### E - Réduction de dimension (PCA) :

In [0]:
# Réduction de dimension :
pca = PCA(k=10, inputCol="scaled_vectors", outputCol="vectors_redux")
redux = pca.fit(df)
df = redux.transform(df)
df.show()

+--------------------+------+-----+---------+----+--------------------+--------------------+--------------------+--------------------+--------------------+
|              origin|height|width|nChannels|mode|                data|          data_array|             vectors|      scaled_vectors|       vectors_redux|
+--------------------+------+-----+---------+----+--------------------+--------------------+--------------------+--------------------+--------------------+
|dbfs:/FileStore/s...|   100|  100|        3|  16|[FF FF FE FF FF F...|[255, 255, 255, 2...|[255.0,255.0,255....|[0.07071067811898...|[13.6054451869003...|
|dbfs:/FileStore/s...|   100|  100|        3|  16|[FF FF FE FF FF F...|[255, 255, 255, 2...|[255.0,255.0,255....|[0.07071067811898...|[13.3463750962408...|
|dbfs:/FileStore/s...|   100|  100|        3|  16|[FF FF FE FF FF F...|[255, 255, 255, 2...|[255.0,255.0,255....|[0.07071067811898...|[14.2306277378380...|
|dbfs:/FileStore/s...|   100|  100|        3|  16|[FF FF FE FF F

### F - Finalisation du Dataframe :

In [0]:
# Réorganisation du Dataframe et ajout de la target :
df = df.select("origin","vectors_redux")
df = df.withColumn("target", split(col("origin"), "dataset/").getItem(1))
df = df.withColumn("target", split(col("target"), "/").getItem(0))
df.show()

+--------------------+--------------------+---------+
|              origin|       vectors_redux|   target|
+--------------------+--------------------+---------+
|dbfs:/FileStore/s...|[13.6054451869003...|Pineapple|
|dbfs:/FileStore/s...|[13.3463750962408...|Pineapple|
|dbfs:/FileStore/s...|[14.2306277378380...|Pineapple|
|dbfs:/FileStore/s...|[14.3587066936779...|Pineapple|
|dbfs:/FileStore/s...|[13.8885565638878...|Pineapple|
|dbfs:/FileStore/s...|[13.6723987628422...|Pineapple|
|dbfs:/FileStore/s...|[14.2132031079448...|Pineapple|
|dbfs:/FileStore/s...|[13.8688005540647...|Pineapple|
|dbfs:/FileStore/s...|[13.9802638320546...|Pineapple|
|dbfs:/FileStore/s...|[14.9124765199279...|Pineapple|
|dbfs:/FileStore/s...|[13.8943097883129...|Pineapple|
|dbfs:/FileStore/s...|[14.1397887641841...|Pineapple|
|dbfs:/FileStore/s...|[14.2733212182557...|Pineapple|
|dbfs:/FileStore/s...|[14.2099528354214...|Pineapple|
|dbfs:/FileStore/s...|[14.9796009051326...|Pineapple|
|dbfs:/FileStore/s...|[14.30

## III - EXPORT :

### A - Export aux formats json et parquet :

In [0]:
# Export :
resultats = "s3://vivianorsprojet8/résultats/"
df.write.parquet(résultats)
df.write.json(résultats)

### B - Réimport, assemblage et export au format csv :

In [1]:
# Importation de librairies
import pandas as pd
import glob

In [3]:
# Répertoires des fichiers json :
json_path = "C:/Users/7700k/Desktop/json/"

In [5]:
# Fonction de lecture des fichiers json partitionnés :
def readFiles(path):
    files = glob.glob(path + "*")
    dfs = [] # an empty list to store the data frames
    for file in files:
        data = pd.read_json(file, lines=True) # read data frame from json file
        dfs.append(data) # append the data frame to the list

    df = pd.concat(dfs, ignore_index=True) # concatenate all the data frames in the list.
    return df

In [8]:
# Lecture des fichiers json réunis :
csv_data = readFiles(json_path)
csv_data.head(5)

Unnamed: 0,origin,vectors_redux,target
0,s3a://vivianorsprojet8/dataset/Pineapple/116_1...,"{'type': 1, 'values': [11.155305263739486, -1....",Pineapple
1,s3a://vivianorsprojet8/dataset/Pineapple/119_1...,"{'type': 1, 'values': [10.836595752156528, -1....",Pineapple
2,s3a://vivianorsprojet8/dataset/Pineapple/112_1...,"{'type': 1, 'values': [11.929215624288979, -2....",Pineapple
3,s3a://vivianorsprojet8/dataset/Pineapple/149_1...,"{'type': 1, 'values': [11.681932915058056, -1....",Pineapple
4,s3a://vivianorsprojet8/dataset/Pineapple/121_1...,"{'type': 1, 'values': [11.19777385992577, -1.7...",Pineapple


In [9]:
# Export au format CSV :
csv_data.to_csv("Ors_Vivian_2_images_082022.csv")