In [1]:
# Les packages suivants doivent être installés dans le cas d'utilisation d'AWS
#sc.install_pypi_package("pip==20.2.4")
#sc.install_pypi_package("opencv-python")
#sc.install_pypi_package("resize-image")
#sc.install_pypi_package("boto3")
#sc.install_pypi_package("pandas")
#sc.install_pypi_package("scikit-learn")
#sc.install_pypi_package("Pillow==7.0.0")
#sc.install_pypi_package("pyquickhelper")
#sc.list_packages()

In [2]:
##################
### PARAMETRES ###
##################


LOCAL = True # Serveur local (True) - Cloud (False)
LOC_EXP = True # Export de RDD en fichier texte (uniquement si LOCAL=True)
FOLDER = 'data/fruits_360_v3b/' # Chemin du jeu de données (images de fruits)
MIN_PARTITION = 18 # Nombre minimum de partitions (initialisation)
OPENCV = 'sift' # Méthode de calcul des descripteurs ('sift' ou 'orb')
CLS = True # Lancement du processus de classification (True)
OPT = False # Fonctions supplémentaires (True)
rs_ = 42 # Random state

# Format des titres
def title(title,level):
    if level == 1:
        print('\n'+'='*len(title),'\n'+title,'\n'+'='*len(title)+'\n')
    elif level == 2:
        print('\n'+title,'\n'+'='*len(title)+'\n')
    elif level == 3:
        print(title,'\n'+'-'*len(title)+'\n')

        
#############################
### IMPORT DES LIBRAIRIES ###
#############################


title("> > > > Import des librairies < < < <",1)

# Librairies système
import io
from io import StringIO
import os
import sys
import ast
from pyquickhelper.filehelper import remove_folder
import time
from time import strftime, gmtime

# Traitement des données
import pandas as pd
import numpy as np
from sklearn import preprocessing

# Traitement des images
from PIL import Image
from resizeimage import resizeimage
import cv2

# Librairies Pyspark
from pyspark import SparkContext
from pyspark.ml.feature import PCA
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import StringIndexer
from pyspark.sql import Row, SparkSession
from pyspark.sql.types import StructType, StructField, LongType
from pyspark.sql.functions import udf
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.mllib.clustering import KMeans
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Librairies S3
import boto.s3
import boto3


################################
### DEFINITION DES FONCTIONS ###
################################


title("> > > > Définition des fonctions < < < <",1)

list_elapsed = []
list_elapsed_mem = [] # Mémorisation d'un résultat
list_ope = []

def time_calc(ope='dernière operation', elapsed_mem='', print_tot=0, print_all=0):
    
    elapsed = round((time.time() - t),2)
    
    if ope != 'dernière operation':
        print('Durée de l\'opération %r:' %ope, round(elapsed,2),'s')
    else:
        print('Durée de l\'opération:', round(elapsed,2),'s')
    
    list_elapsed.extend([elapsed])
    list_elapsed_mem.extend([elapsed_mem])
    list_ope.extend([ope])
    tot_duration = sum(list_elapsed)
    df_time = pd.DataFrame({'Opération': list_ope, 'Durée': list_elapsed, 'Estimation': list_elapsed_mem})

    if print_tot == 1:
        print('Durée totale de traitement:',strftime('%H', gmtime(tot_duration)),'h',
              strftime('%M', gmtime(tot_duration)),'m',strftime('%S', gmtime(tot_duration)),'s')
    
    if print_all == 1:
        print('')
        print('Durée des opérations')
        print(len('Durée des opérations')*'-')
        print(df_time)  


def clean(folder):
    if os.path.exists(folder):
        return remove_folder(folder)
    else:
        return []


def part_dim(rdd):
    print('\nNombre de partitions:',rdd.getNumPartitions())
    print('Dimension:',rdd.count())


def get_path(data_set):
    """ Identification des chemins d'accès aux répertoires d'images.
    
    - Le paramètre data_set est une chaîne de caractères qui indique le nom du répertoire
    contenant les sous-répertoires des images à traiter. Il prend les valeurs '/Training/' ou '/Test/'.    
    - La variable globale LOCAL indique la localisation des fichiers (local ou S3).    
    - La variable globale FOLDER indique le chemin d'accès au répertoire principal contenant
    les répertoires '/Training/' et '/Test/'.    
    - La fonction renvoie, sous la forme d'un chaîne de caractère, les chemins d'accès à l'ensemble
    des sous-répertoires contenant les images.
    """
    
    title("Identification des chemins d'accès aux répertoires d'images",1)
    
    image_path = ''
    
    list_dir = []
    list_file = []
    
    if LOCAL:
        image_folder = FOLDER + data_set
        for root, directories, files in os.walk(image_folder):
            list_file.append(len(files))
            
            for file in directories:
                # Pour chaque chemin de sous-répertoire de fruits
                # On récupère le dernier élément du chemin
                sub_folder = os.path.join(root, file).split('/')[-1]
                image_path = image_path + image_folder + sub_folder + ','            
                list_dir.append(sub_folder)           
    
        del list_file[0]
        df_dir_files = pd.DataFrame()
        df_dir_files['Catégorie'] = list_dir
        df_dir_files['Nombre d\'images'] = list_file
        print('Nombre d\'images par catégorie (sous-répertoire):\n\n',df_dir_files.head(),'\n')
        print('Nombre total d\'images:',sum(list_file),'\n')
                
    else:
        image_folder = 's3://oc-ds-p8/' + FOLDER + data_set
        conn = boto.s3.connect_to_region('eu-west-1')
        bucket = conn.get_bucket('oc-ds-p8')
        folders = bucket.list(prefix = FOLDER + data_set, delimiter='/')
        for folder in folders:
            sub_folder = folder.name.split('/')[-2]
            image_path = image_path + image_folder + folder.name.split('/')[-2] + '/' + ','

    # On supprime la virgule de fin (dernier caractère)
    image_path = image_path[0:-1]
    # Liste des chemins des sous-répertoires de fruits (catégories)
    cat_path = image_path.split(',')
    # Nombre de catégories
    nb_cat = len(cat_path)
    
    dataset_path = FOLDER + data_set
    print('dataset_path =',dataset_path)
    print('\nimage_path =',image_path)    
    
    print('\nNombre de catégories de fruits:',nb_cat)
    print('\n2 premières catégories:',cat_path[0:2])
    print('2 dernières catégories:',cat_path[-2:])
    print('')
    
    return nb_cat, image_path, dataset_path


def get_descriptors(image):
    """ La fonction prend en entrée le fichier binaire d'une image et procède à son traitement.
    
    - Transformation du fichier binaire.    
    - Traitement.
    - Calcul des descripteurs avec openCV sur chaque image.    
    - La fonction renvoie le nom de l'image comme clé et la liste des descripteurs comme valeur.
    """ 
    
    try:
        # Transformation du fichier binaire en image
        name, img = image
        image_img = Image.open(io.BytesIO(img))
        
        cat_id = name.split('/')[-2]
        image_id = name.split('/')[-2] + '_' + name.split('/')[-1]
        
        print('image_id:',image_id)
        
        # Traitement de l'image
        fill_color=(255, 255, 255, 0)
        x, y = image_img.size
        
        if x < y:
            size_x = y
            size_y = y
        else:
            size_x = x
            size_y = x
        
        new_im = Image.new('RGB', (size_x, size_y), fill_color)
        new_im.paste(image_img, (int((size_x - x) / 2), int((size_y - y) / 2)))
        img_cropped = new_im
        
        # Redimension de l'image
        width = 100
        img_rescaled = resizeimage.resize_cover(img_cropped, [width, width])
        # Choix du format (original ou traité)
        np_img = np.array(image_img)
        #np_img = np.array(img_rescaled)
        
        # Calcul des descripteurs
        if OPENCV == 'sift':
            sift = cv2.SIFT_create()
            keypoints_sift, desc = sift.detectAndCompute(np_img, None)
        elif OPENCV == 'orb':
            orb = cv2.ORB_create(nfeatures=100)
            keypoints_orb, desc = orb.detectAndCompute(np_img, None)
        
        # On supprime les clés dont les descripteurs sont vides
        # On construit un tableau que l'on remplit avec l'identifiant de l'image en cours de traitement
        # Dans le cas où le traitement n'a produit aucun descripteur...
        if desc is None:
            # Dimensions du tableau: 1 ligne
            ima = np.full(1, image_id)
            ima_cat = np.full(1, cat_id)
        # Si le traitement a produit au moins 1 descripteur...
        else:
            # Dimensions du tableau: autant de lignes que de descripteurs
            ima = np.full(desc.shape[0], image_id)
            ima_cat = np.full(desc.shape[0], cat_id)
         
    except:
        ima = np.full(1, "error")
        desc = None
        ima_cat = np.full(1, "error")
    
    return ima_cat, ima, desc


def get_images_descriptors(dataset_path):
    
    """ Cette fonction charge toutes les images et récupère les descripteurs via la fonction get_descriptors. 
   
    - La fonction utilise la chaîne donnée en entrée (image_path) pour charger les images avec binaryFiles.
    - Les données des images sont passées à la fonction get_descriptors qui renvoie les descripteurs par image.
    - On obtient un RDD avec les catégories, images et descripteurs (on supprime les clés sans descripteur).
    - La fonction renvoie la liste collectée des images et le RDD des descripteurs (non collecté).    
    """
    
    title("Calcul des descripteurs",1)

    title("Chargement des images (rdd_images)",2)
    
    if LOCAL == True:
        sdf_images = spark.read.format("binaryFile") \
                               .option("pathGlobFilter", "*.jpg") \
                               .option("recursiveFileLookup", "true") \
                               .load(dataset_path) \
                               .select("path","content")

        rdd_images = sdf_images.rdd

    elif LOCAL == False:
        
        rdd_images = sc.binaryFiles(image_path, minPartitions = MIN_PARTITION)
        rdd_images = rdd_images.repartition(MIN_PARTITION)
 
    print(rdd_images)
    part_dim(rdd_images)
        
    if LOCAL and LOC_EXP:
        clean("rdd/rdd_images.txt")
        rdd_images.saveAsTextFile(os.path.abspath("rdd/rdd_images.txt"))

    title("Catégories / Images / Descripteurs (rdd_cat_ima_desc)",2) 
    # On utilise la fonction get_descriptors() pour extraire les descripteurs
    # On récupères des tuples avec la catégorie, l'image et les descripteurs
    rdd_cat_ima_desc = rdd_images.map(lambda img: get_descriptors(img))    
    print(rdd_cat_ima_desc)
    
    # On supprime les clés où les descripteurs sont vides
    # On mémorise le résultat (cache)
    title("Catégories / Images / Descripteurs (rdd_cat_ima_desc_f)",2)
    
    rdd_cat_ima_desc_f = rdd_cat_ima_desc.filter(lambda x: x[2] is not None).cache()
    
    print(rdd_cat_ima_desc_f)
    
    if LOCAL and LOC_EXP:
        clean("rdd/rdd_cat_ima_desc_f.txt")
        rdd_cat_ima_desc_f.saveAsTextFile(os.path.abspath("rdd/rdd_cat_ima_desc_f.txt"))
    
    # Mise à plat des catégories des images
    title("Catégories (rdd_cat)",2)
    rdd_cat = rdd_cat_ima_desc_f.flatMap(lambda x: x[0])
    print(rdd_cat)
       
    # Mise à plat des identifiants des images
    title("Identifiants des images (rdd_ima)",2)
    rdd_ima = rdd_cat_ima_desc_f.flatMap(lambda x: x[1])
    print(rdd_ima)
    
    # Mise à plat des descripteurs
    title("Descripteurs (rdd_desc)",2)
    rdd_desc = rdd_cat_ima_desc_f.flatMap(lambda x: x[2])
    print(rdd_desc)    
    part_dim(rdd_desc)
    
    # On collecte les catégories des images
    title("Collecte des catégories d\'images (list_cat)",2)
    list_cat = rdd_cat.collect()
    print("3 premières occurences:",list_cat[0:3])
    
    # On collecte les identifiants des images
    title("Collecte des identifiants des images (list_ima)",2)
    list_ima = rdd_ima.collect()
    print("3 premières occurences:",list_ima[0:3])
        
    # On construit un DataFrame Pandas avec les listes des catégories et identifiants des images
    #title("Identifiants des images et des catégories (df_ima_cat)",2)
    df_ima_cat = pd.DataFrame({'ima': list_ima, 'cat': list_cat,})  
    print('df_ima_cat:',df_ima_cat.shape)
    df_ima_cat = df_ima_cat.drop_duplicates()
    print('df_ima_cat (sans dup):',df_ima_cat.shape)
    #print('3 premières occurences:\n\n',df_ima_cat.head(3))
    
    # On exporte le DataFrame Pandas
    if LOCAL:
        df_ima_cat.to_csv('temp/df_ima_cat.csv', index=False)
        df_ima_cat = pd.read_csv('temp/df_ima_cat.csv',encoding='utf-8',low_memory=False)
    else:
        csv_buffer = StringIO()
        df_ima_cat.to_csv(csv_buffer)
        s3_resource = boto3.resource('s3')
        # upload
        s3_resource.Object('oc-ds-p8', 'temp/df_ima_cat.csv').put(Body=csv_buffer.getvalue(), ACL='public-read')
        # download
        s3_resource.Object('oc-ds-p8', 'temp/df_ima_cat.csv').download_file(f'/tmp/df_ima_cat.csv')
        df_ima_cat = pd.read_csv('/tmp/df_ima_cat.csv',encoding='utf-8',low_memory=False)  
        df_ima_cat = df_ima_cat[['ima','cat']]
   
    sdf_ima_cat = spark.createDataFrame(df_ima_cat)
    title("Identifiants des images et des catégories (sdf_ima_cat)",2)
    sdf_ima_cat.printSchema()
    sdf_ima_cat.limit(3).show()
    
    return rdd_desc, list_ima, sdf_ima_cat, df_ima_cat


def kmeans_train(rdd_desc, nb_cat):
    """ Classification non supervisée des descripteurs avec l'algorithme K-Means. """
    
    title("Classification non supervisée des descripteurs avec K-Means",1)
    
    title("Modèle K-Means (km_model)",2)
    # Détermination du nombre de clusters
    nb_clusters = nb_cat * 10
    # Entraînement du modèle    
    km_model = KMeans.train(rdd_desc, nb_clusters, maxIterations=500, initializationMode="random")    
    print(km_model)
    print('\nNombre de clusters:',nb_clusters,'\n')
    
    return nb_clusters, km_model


def kmeans_pred(km_model, rdd_desc):
    """ Prédictions des descripteurs avec K-Means."""
    
    title("Prédictions des descripteurs avec K-Means",1)    
    
    title("Prédictions (rdd_km_pred)",2)
    # Prédiction    
    rdd_km_pred = km_model.predict(rdd_desc)    
    print(rdd_km_pred)    
    part_dim(rdd_km_pred)   
    title("Collecte des prédictions (list_km_pred)",2)
    # Collecte des prédictions
    list_km_pred = rdd_km_pred.collect()
    print(list_km_pred[0:10],'\n')
    
    return list_km_pred


def bow_creation(list_ima, list_km_pred):
    """ Création d'un bag of words pour les images.
    
    - Encodage des identifiants d'image dans la variable 'id'.
    - Parallélisation les données dans un DataFrame spark converti en RDD.
    - On applique un map et reduceByKey pour obtenir la liste des clusters pour chaque image.
    - Historisation: on compte le nombre de clusters par image (CountVectorizer).
    """
    
    title("Création du bag of words",1)
    
    title("Encodage des identifiants d'images et concatenation avec les prédictions (clusters K-Means)",2)
    
    title("Encodage des identifiants d'images (sdf_ima_label)",3)
    
    # Encodage des identifiants des images
    le = preprocessing.LabelEncoder()
    label_ima = le.fit_transform(list_ima)
    
    df_ima_label = pd.DataFrame()
    df_ima_label['IMA'] = list_ima
    df_ima_label['image_id'] = label_ima
    df_ima_label['prediction'] = list_km_pred
    
    # On exporte le DataFrame Pandas
    if LOCAL:
        df_ima_label.to_csv(os.path.abspath("temp/df_ima_label.csv"), index=False)
        df_ima_label = pd.read_csv('temp/df_ima_label.csv',encoding='utf-8',low_memory=False)
    else:
        csv_buffer = StringIO()
        df_ima_label.to_csv(csv_buffer)
        s3_resource = boto3.resource('s3')        
        # upload
        s3_resource.Object('oc-ds-p8', "temp/df_ima_label.csv") \
                   .put(Body=csv_buffer.getvalue(), ACL='public-read')     
        # download
        s3_resource.Object('oc-ds-p8', 'temp/df_ima_label.csv').download_file(f'/tmp/df_ima_label.csv')
        df_ima_label = pd.read_csv('/tmp/df_ima_label.csv',encoding='utf-8',low_memory=False)
    
    sdf_ima_label = spark.createDataFrame(df_ima_label)
    sdf_ima_label.limit(10).show()
    
    # Concatenation des identifiants des images (numériques) et des prédictions (clusters K-Means)
    numpy_arr = np.concatenate((np.array(label_ima).reshape(-1,1), 
                                np.array(list_km_pred).reshape(-1,1)), axis=1)
    
    # On construit un DataFrame Pandas avec comme variables les identifiants des images et les prédictions
    # L'identifiant unique du DataFrame représente les descripteurs (1 image a plusieurs descripteurs)
    df_ima_pred = pd.DataFrame(numpy_arr, columns=['id', 'prediction'])
    
    title("Prédictions (clusters K-Means) par image (sdf_ima_pred)",3)
    # On convertit le DataFrame Pandas en DataFrame Spark
    sdf_ima_pred = spark.createDataFrame(df_ima_pred)
    sdf_ima_pred.printSchema()
    sdf_ima_pred.limit(10).show()
    
    title("Liste des clusters par image (Map + reduceByKey)",2)
    
    title("Clusters par image (rdd_words)",3)
    # On liste les clusters pour chaque image  
    rdd_ima_pred = sdf_ima_pred.rdd.map(lambda x:x)
    rdd_words = rdd_ima_pred.reduceByKey(lambda a,b: str(a) + ',' + str(b)) 
    print(rdd_words)    
    part_dim(rdd_words)
    
    if LOCAL and LOC_EXP:
        clean("rdd/rdd_words.txt")
        rdd_words.saveAsTextFile(os.path.abspath("rdd/rdd_words.txt"))
    
    title("\nListe de 'words' par image (sdf_worcds)",3) 
    # On construit une liste de 'words' à partir de la liste des clusters (string)    
    sdf_words = rdd_words.map(lambda tupl_words: (tupl_words[0], str(tupl_words[1]).split(','))) \
                         .toDF(['image_id','words'])
    
    sdf_words.printSchema()
    sdf_words.limit(10).show()
    
    title("Création du bag of words à partir des listes de 'words' associées aux images (CountVectorizer)",2)

    title("Bag of words (sdf_bow)",3)
    # Création d'un vecteur de 'words' par image    
    vectorizer = CountVectorizer(inputCol="words", outputCol="bag_of_words")    
    vectorizer_transformer = vectorizer.fit(sdf_words)
    sdf_bow = vectorizer_transformer.transform(sdf_words).select('image_id', 'bag_of_words')
    sdf_bow.printSchema()
    sdf_bow.limit(10).show()
         
    return sdf_bow, sdf_ima_label, df_ima_label


def bow_save(sdf_bow, list_ima, name):
    """ Sauvegarde du bag of words en local ou sur S3. """
    
    title("Sauvegarde du bag of words",1)

    # On transforme le bag of words (DataFrame Spark) en DataFrame Pandas
    df_bow = sdf_bow.toPandas()
    
    title("Bag of words (df_bow)",2)
    print(df_bow.head(),'\n')
    
    # On exporte le DataFrame Pandas
    if LOCAL:
        df_bow.to_csv('out/'+name, index=False)
    else:
        csv_buffer = StringIO()
        df_bow.to_csv(csv_buffer)
        s3_resource = boto3.resource('s3')
        s3_resource.Object('oc-ds-p8', 'out/'+name).put(Body=csv_buffer.getvalue(), ACL='public-read')
        
    return df_bow


def bow_transform(df_bow, nb_clusters, name, list_ima, df_ima_cat, df_ima_label):
    """ Transformation du bag of words pour affichage en mode tableau. """
    
    if LOCAL:
        bag_of_words = pd.read_csv('out/df_bow_'+name,encoding='utf-8',low_memory=False)
    else:
        s3_resource = boto3.resource('s3')
        s3_resource.Object('oc-ds-p8', 'out/df_bow_'+name).download_file(f'/tmp/df_bow_'+name)
        bag_of_words = pd.read_csv('/tmp/df_bow_'+name,encoding='utf-8',low_memory=False)

    list_image_id = bag_of_words['image_id']
    
    x0 = bag_of_words['bag_of_words']
    # eval: transformation string en tuple
    x01 = x0.transform(lambda x: eval(x.replace(str(nb_clusters)+',','').replace('(','').replace(')','')))
    x01df = pd.DataFrame(x01)

    visual_words = list(range(nb_clusters))
    list_vw_val_all = []
    
    # Pour chaque image
    for i in list(range(x01.shape[0])):
        x01 = x01df.iloc[i,0]

        list_vw_val = []
        ind = 0

        # Pour chaque visual word
        for vw in visual_words:
            # Si le visual word existe dans le tuple de l'image
            if vw in x01[0]:
                # On stocke sa valeur dans la liste des valeurs de visual words
                list_vw_val.append(x01[1][ind])
                # On incrémente l'indice de localisation des valeurs
                ind = ind + 1
            # Si le visual word n'est pas présent pour l'image
            else:
                # On ajoute 0 à la liste concernant ce visual word
                list_vw_val.append(0)

        list_vw_val_all.append(list_vw_val)

    bow = pd.DataFrame(list_vw_val_all)
    bow["image_id"] = list_image_id
    
    del df_ima_label['prediction']
    df_ima_label = df_ima_label.drop_duplicates()
    
    bow = bow.merge(df_ima_label, left_on = 'image_id', right_on = 'image_id')

    list1 = ['image_id','IMA']
    list2 = bow.columns.tolist()[:-2]
    list1.extend(list2)
    bow = bow[list1]

    bow = df_ima_cat.merge(bow, left_on='ima', right_on='IMA')
    del bow['image_id']
    del bow['IMA']
        
    title("Bag of words",2)
    print(bow.iloc[0:5,0:8],'\n')
    print('Dimensions du jeu de données:',bow.shape,'\n')
    
    # On exporte le DataFrame Pandas
    if LOCAL:
        bow.to_csv('out/bow_'+name, index=False)
    else:
        csv_buffer = StringIO()
        bow.to_csv(csv_buffer)
        s3_resource = boto3.resource('s3')
        s3_resource.Object('oc-ds-p8', 'out/bow_'+name).put(Body=csv_buffer.getvalue(), ACL='public-read')


def bow_reduce(sdf_bow, nb_clusters, sdf_ima_cat, data_set_name, sdf_ima_label):
    """ Réduction des dimensions du bag of words avec la méthode ACP. """
    
    title('Réduction de dimension PCA',1)
    
    pca_dim = int(nb_clusters-(nb_clusters*0.3))
    pca = PCA(k=pca_dim, inputCol="bag_of_words", outputCol="features")
    model = pca.fit(sdf_bow)
    sdf_features = model.transform(sdf_bow).select("features")
    
    title("Résultats de la PCA (sdf_features)",2)
    sdf_features.printSchema()
    #sdf_features.show(truncate=False)
    sdf_features.limit(10).show()
    
    # Fonction de création d'un index dans un DataFrame Spark    
    def with_column_index(sdf):
        new_schema = StructType(sdf.schema.fields + [StructField("ColumnIndex", LongType(), False),])
        return sdf.rdd.zipWithIndex().map(lambda row: row[0] + (row[1],)).toDF(schema=new_schema)

    df1_ci = with_column_index(sdf_bow)
    df2_ci = with_column_index(sdf_features)
    sdf_ima_features = df1_ci.join(df2_ci, df1_ci.ColumnIndex == df2_ci.ColumnIndex, 'inner').drop("ColumnIndex")
    title("Jointure entre les ids des images et les features (sdf_ima_features)",2)
    sdf_ima_features.printSchema()
    sdf_ima_features.limit(10).show()    

    sdf_ima_label_2 = sdf_ima_label.drop('prediction')  
    sdf_ima_lab_features = sdf_ima_label_2.join(sdf_ima_features, "image_id")  
    
    sdf_ima_cat_features = sdf_ima_lab_features.join(sdf_ima_cat, "IMA")    
    sdf_cat_features = sdf_ima_cat_features.drop('IMA')
    sdf_cat_features = sdf_ima_cat_features.drop('image_id')
    sdf_cat_features = sdf_ima_cat_features.drop('bag_of_words')    
    
    sdf_cat_features = sdf_cat_features.distinct()
    
    title("Jointure entre les catégories et les features (sdf_cat_features)",2)
    sdf_cat_features.printSchema()
    sdf_cat_features.limit(10).show()

    # Encodage de la variable catégorie
    indexer = StringIndexer(inputCol="cat", outputCol="label").fit(sdf_cat_features)
    sdf_lab_features = indexer.transform(sdf_cat_features)
    sdf_lab_features = sdf_lab_features.drop("cat")    
    sdf_lab_features = sdf_lab_features.select("label","features")
    
    title("Encodage de la variable catégories (sdf_lab_features)",2)
    sdf_lab_features.printSchema()
    sdf_lab_features.limit(10).show()
    
    df_lab_features = sdf_lab_features.toPandas()
    df_lab_features = df_lab_features.drop_duplicates()

    # On exporte le DataFrame Pandas
    if LOCAL:
        df_lab_features.to_csv(os.path.abspath("temp/df_lab_features_0.csv"), index=False)
        df_lab_features = pd.read_csv('temp/df_lab_features_0.csv',encoding='utf-8',low_memory=False)
    else:
        csv_buffer = StringIO()
        df_lab_features.to_csv(csv_buffer)
        s3_resource = boto3.resource('s3')        
        # upload
        s3_resource.Object('oc-ds-p8', "temp/df_lab_features_0.csv") \
                   .put(Body=csv_buffer.getvalue(), ACL='public-read')      
        # download
        s3_resource.Object('oc-ds-p8', 'temp/df_lab_features_0.csv').download_file(f'/tmp/df_lab_features_0.csv')
        df_lab_features = pd.read_csv('/tmp/df_lab_features_0.csv',encoding='utf-8',low_memory=False)
    
    df_lab_features = df_lab_features[['label','features']]

    feat_list_all = []    
    for i in list(range(df_lab_features.shape[0])):
        for feat_str in df_lab_features.iloc[i,1:]:
            feat_list = ast.literal_eval(feat_str)
            feat_list_all.append(feat_list)

    df_lab_features_feat = pd.DataFrame(feat_list_all)     
    df_lab_features_lab = df_lab_features[['label']]

    df_lab_features = df_lab_features_lab.merge(df_lab_features_feat, left_index=True, right_index=True)

    # On exporte le DataFrame Pandas
    if LOCAL:
        df_lab_features.to_csv(os.path.abspath("out/df_lab_features_" + data_set_name + ".csv"), index=False)
    else:
        csv_buffer = StringIO()
        df_lab_features.to_csv(csv_buffer)
        s3_resource = boto3.resource('s3')
        s3_resource.Object('oc-ds-p8', "out/df_lab_features_" + data_set_name + ".csv") \
        .put(Body=csv_buffer.getvalue(), ACL='public-read')

    title("Bag of words après réduction de dimension (df_lab_features)",2)
    print(df_lab_features.iloc[0:9,0:7])
    print('\nDimensions du nouveau jeu de données avec les étiquettes (df_lab_features):',df_lab_features.shape)
        
    if OPT == True:
        
        # df_features
        df_features = sdf_features.toPandas()
            
        # On exporte le DataFrame Pandas
        if LOCAL:
            df_features.to_csv(os.path.abspath("temp/df_features.csv"), index=False)
            df_features = pd.read_csv('temp/df_features.csv',encoding='utf-8',low_memory=False)
        else:
            csv_buffer = StringIO()
            df_features.to_csv(csv_buffer)
            s3_resource = boto3.resource('s3')
            # upload
            s3_resource.Object('oc-ds-p8', "temp/df_features.csv").put(Body=csv_buffer.getvalue(), ACL='public-read')
            # download
            s3_resource.Object('oc-ds-p8', 'temp/df_features.csv').download_file(f'/tmp/df_features.csv')
            df_features = pd.read_csv('/tmp/df_features.csv',encoding='utf-8',low_memory=False)
        
        df_features = df_features[['features']]
        
        feat_list_all = []    
        for i in list(range(df_features.shape[0])):
            for feat_str in df_features.iloc[i,:]:
                feat_list = ast.literal_eval(feat_str)
                feat_list_all.append(feat_list)

        title("\nBag of words après réduction de dimension (df_features)",2)
        df_features = pd.DataFrame(feat_list_all)
        print(df_features.iloc[0:5,0:5])
        print('\nDimension du nouveau jeu de données (df_features):',df_features.shape,'\n')
    
    return sdf_lab_features, sdf_features, pca_dim


def rf_classification(sdf_lab_features):
    
    title('Classification RF',1)
    
    rf = RandomForestClassifier(numTrees=1000, maxDepth=10, labelCol="label", seed=42)
    # Valeur max pour maxDepth = 30
    rf_model = rf.fit(sdf_lab_features)
    
    return rf_model
 

def rf_prediction(sdf_features, sdf_lab_features, rf_model):
    
    title('Prédictions RF',1)
    
    test_lab_pred = rf_model.transform(sdf_features).select("features", "prediction")
    title('Prédictions RF (test_lab_pred)',2)
    print(type(test_lab_pred))
    print(test_lab_pred)
    
    test_lab_pred_2 = rf_model.transform(sdf_lab_features)
    predictionAndLabels = test_lab_pred_2.select("prediction", "label")
    title('Prédictions (predictionAndLabels)',2)
    print(type(predictionAndLabels))
    print(predictionAndLabels)
    predictionAndLabels.limit(10).show()
    
    predictionAndLabels_2 = test_lab_pred_2.select("prediction", "label", "features")
    predictionAndLabels_2.limit(10).show()
    
    evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
    accuracy = round(evaluator.evaluate(predictionAndLabels),2)
    print("Test set accuracy (RF) = " + str(accuracy))
    
    return test_lab_pred

  
def mlp_classification(sdf_lab_features, pca_dim, nb_cat):
    
    title('Classification MLP',1)
    
    layers = [pca_dim, pca_dim, pca_dim, nb_cat]
    
    mlp = MultilayerPerceptronClassifier(layers=layers, seed=rs_)
    mlp.setMaxIter(200)
    mlp.setBlockSize(128)
    mlp_model = mlp.fit(sdf_lab_features)
    mlp_model.setFeaturesCol("features")    
    
    print("Modèle:\n\n",mlp_model,'\n')
    
    return mlp_model


def mlp_prediction(sdf_features, sdf_lab_features, mlp_model):
    
    title('Prédictions MLP',1)
    
    if OPT == True:
        test_pred = mlp_model.predict(sdf_features.head().features)
        test_pred_raw = mlp_model.predictRaw(sdf_features.head().features)
        test_pred_prob = mlp_model.predictProbability(sdf_features.head().features)

    test_lab_pred = mlp_model.transform(sdf_features).select("features", "prediction")
    
    title('Prédictions (test_lab_pred)',2)
    print(test_lab_pred)
    test_lab_pred.limit(10).show()
    
    test_lab_pred_2 = mlp_model.transform(sdf_lab_features)
    predictionAndLabels = test_lab_pred_2.select("prediction", "label")  
    
    predictionAndLabels_2 = test_lab_pred_2.select("prediction", "label", "features")
    title('Prédictions (predictionAndLabels_2)',2)
    print(predictionAndLabels_2)
    predictionAndLabels_2.limit(10).show()
    
    evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
    accuracy = round(evaluator.evaluate(predictionAndLabels),2)
    print("Test set accuracy (MLP) = " + str(accuracy))
    
    return test_lab_pred
    
    
def cls_eval(sdf_ima_cat, test_lab_pred, sdf_bow, sdf_ima_label):
    
    title('Evaluation',1)

    # Fonction de création d'un index dans un DataFrame Spark    
    def with_column_index(sdf):
        new_schema = StructType(sdf.schema.fields + [StructField("ColumnIndex", LongType(), False),])
        return sdf.rdd.zipWithIndex().map(lambda row: row[0] + (row[1],)).toDF(schema=new_schema)

    df1_ci = with_column_index(sdf_bow)
    df2_ci = with_column_index(test_lab_pred)
    sdf_ima_features = df1_ci.join(df2_ci, df1_ci.ColumnIndex == df2_ci.ColumnIndex, 'inner').drop("ColumnIndex")
    
    title('Jointure entre les identifiants des images et les features (sdf_ima_features)',2)
    sdf_ima_features.printSchema()
    sdf_ima_features.limit(10).show()    

    sdf_ima_label_2 = sdf_ima_label.drop('prediction') 
    sdf_ima_lab_features = sdf_ima_label_2.join(sdf_ima_features, "image_id")  
    
    sdf_ima_cat_features = sdf_ima_lab_features.join(sdf_ima_cat, "IMA")    
    sdf_cat_features = sdf_ima_cat_features.drop('IMA')
    sdf_cat_features = sdf_ima_cat_features.drop('image_id')
    sdf_cat_features = sdf_ima_cat_features.drop('bag_of_words')    
    
    sdf_cat_features = sdf_cat_features.distinct()
    
    title("Jointure entre les catégories et les features (sdf_cat_features)",2)
    sdf_cat_features.printSchema()
    sdf_cat_features.limit(10).show()

    # Encodage de la variable catégorie
    indexer = StringIndexer(inputCol="cat", outputCol="label").fit(sdf_cat_features)
    sdf_lab_features = indexer.transform(sdf_cat_features)  
    sdf_lab_features = sdf_lab_features.select("ima","cat","label","prediction","features")
    
    title('Encodage de la variable catégories (sdf_lab_features)',2)
    sdf_lab_features.printSchema()
    sdf_lab_features.limit(10).show()
    
    # DataFrame Spark --> DataFrame Pandas
    df_lab_features = sdf_lab_features.toPandas() 
    df_lab_features['occur'] = pd.Series(1, range(df_lab_features.shape[0])) # On ajoute une variable contenant
                                                                             # uniquement la valeur 1
                                                           # afin de pouvoir réaliser une transformation pivot

    df_mlp_2 = pd.pivot_table(df_lab_features, values='occur',index=['label'], \
                              columns=['prediction'], aggfunc=np.sum)

    df_mlp_2.fillna(0, inplace=True) # On remplace les valeurs nulles par 0
    df_mlp_2 = round(100*(df_mlp_2.T / df_mlp_2.T.sum()).T) # On calcule les % de prédictions
    
    title('Catégories réelles (label) vs Prédictions (prediction)',2)
    
    print(df_mlp_2)


    
##############################
### LANCEMENT DU PROGRAMME ###
##############################


spark = SparkSession(sc)


title("> > > > TRAITEMENT DU JEU DE DONNES D'ENTRAINEMENT < < < <",1)  
t = time.time()

# Récupération des images
nb_cat, image_path, dataset_path = get_path('Training/')
time_calc('Récupération des images','',0,0)
t = time.time()
print('')

# Extraction des descripteurs des images
rdd_desc, list_ima, sdf_ima_cat, df_ima_cat = get_images_descriptors(dataset_path)
time_calc('Extraction des descripteurs des images','',0,0)
t = time.time()
print('')

# Clustering K-Means
nb_clusters, km_model = kmeans_train(rdd_desc, nb_cat)
time_calc('Clustering K-Means','',0,0)
t = time.time()
print('')
list_km_pred = kmeans_pred(km_model, rdd_desc)
time_calc('Prédiction K-Means','',0,0)
t = time.time()
print('')

# Création du bag of words
sdf_bow, sdf_ima_label, df_ima_label = bow_creation(list_ima, list_km_pred)
df_bow = bow_save(sdf_bow, list_ima, 'df_bow_train.csv')
bow_transform(df_bow, nb_clusters, 'train.csv', list_ima, df_ima_cat, df_ima_label)  
time_calc('Création du bag of words','',0,0)
t = time.time()
print('')

# Réduction de dimension
sdf_lab_features ,sdf_features, pca_dim = bow_reduce(sdf_bow, nb_clusters,
                                                     sdf_ima_cat, "train", sdf_ima_label)   
print('')
time_calc('Réduction de dimension','',0,0)
t = time.time()
print('')

if CLS:
    
    if LOCAL:
        # Classification MLP
        mlp_model = mlp_classification(sdf_lab_features, pca_dim, nb_cat)
    else:
        # Classification RF
        rf_model = rf_classification(sdf_lab_features)

    time_calc('Classification','',0,0)
    t = time.time()
    print('')


title("> > > > TRAITEMENT DU JEU DE DONNEES DE TEST < < < <",1)
t = time.time()

# Récupération des images
nb_cat, image_path, dataset_path = get_path('Test/')
time_calc('Récupération des images - Test','',0,0)
t = time.time()
print('')

# Extraction des descripteurs des images
rdd_desc, list_ima, sdf_ima_cat, df_ima_cat = get_images_descriptors(dataset_path)
time_calc('Extraction des descripteurs des images - Test','',0,0)
t = time.time()
print('')

#nb_clusters, km_model = kmeans_train(rdd_desc, nb_cat)
#time_calc('Test - Clustering K-Means','',1,0)
#t = time.time()
#print('')
# Prédictions K-Means
list_km_pred = kmeans_pred(km_model, rdd_desc)
time_calc('Prédiction K-Means - Test','',0,0)
t = time.time()
print('')

# Création du bag of words
sdf_bow, sdf_ima_label, df_ima_label = bow_creation(list_ima, list_km_pred)
df_bow = bow_save(sdf_bow, list_ima, 'df_bow_test.csv') 
bow_transform(df_bow, nb_clusters, 'test.csv', list_ima, df_ima_cat, df_ima_label)

time_calc('Création du bag of words - Test','',0,0)
t = time.time()

# Réduction de dimension
sdf_lab_features, sdf_features, pca_dim = bow_reduce(sdf_bow, nb_clusters,
                                                     sdf_ima_cat, "test", sdf_ima_label)    
print('')
time_calc('Réduction de dimension - Test','',0,0)
t = time.time()
print('')

if CLS:
    
    if LOCAL:
        # Prédictions MLP
        test_lab_pred = mlp_prediction(sdf_features, sdf_lab_features, mlp_model)
    else:
        # Prédictions RF
        test_lab_pred = rf_prediction(sdf_features, sdf_lab_features, rf_model)
    
    print('')
    time_calc('Prédiction - Test','',0,0)
    t = time.time()
    print('')
    
    # Evaluation
    cls_eval(sdf_ima_cat, test_lab_pred, sdf_bow, sdf_ima_label)
    print('')
    time_calc('Evaluation - Test','',0,0)
    t = time.time()

print('')    
time_calc('Fin des traitements','',1,1)

title("> > > > Traitements finalisés < < < <",1)


> > > > Import des librairies < < < < 


> > > > Définition des fonctions < < < < 


> > > > TRAITEMENT DU JEU DE DONNES D'ENTRAINEMENT < < < < 


Identification des chemins d'accès aux répertoires d'images 

Nombre d'images par catégorie (sous-répertoire):

    Catégorie  Nombre d'images
0       Corn               50
1  Raspberry               50
2     Orange               50 

Nombre total d'images: 150 

dataset_path = data/fruits_360_v3b/Training/

image_path = data/fruits_360_v3b/Training/Corn,data/fruits_360_v3b/Training/Raspberry,data/fruits_360_v3b/Training/Orange

Nombre de catégories de fruits: 3

2 premières catégories: ['data/fruits_360_v3b/Training/Corn', 'data/fruits_360_v3b/Training/Raspberry']
2 dernières catégories: ['data/fruits_360_v3b/Training/Raspberry', 'data/fruits_360_v3b/Training/Orange']

Durée de l'opération 'Récupération des images': 0.01 s


Calcul des descripteurs 


Chargement des images (rdd_images) 

MapPartitionsRDD[4] at javaToPython at NativeMethodA

+--------------------+
|            features|
+--------------------+
|[28.6589691872513...|
|[30.6704889986520...|
|[26.6684862365683...|
|[31.6841436746050...|
|[29.2510349188184...|
|[33.9143218745699...|
|[35.3151410157159...|
|[27.0750731114220...|
|[27.7020165980217...|
|[31.7533006032619...|
+--------------------+


Jointure entre les ids des images et les features (sdf_ima_features) 

root
 |-- image_id: long (nullable = true)
 |-- bag_of_words: vector (nullable = true)
 |-- features: vector (nullable = true)

+--------+--------------------+--------------------+
|image_id|        bag_of_words|            features|
+--------+--------------------+--------------------+
|      76|(30,[0,1,4,7,11,1...|[-2.1029243806689...|
|      60|(30,[0,1,4,7,8,14...|[-1.3391962404690...|
|      48|(30,[0,1,2,4,8,9,...|[0.77967717300582...|
|     144|(30,[0,1,2,3,4,5,...|[29.1377046696358...|
|      40|(30,[0,1,2,4,5,8,...|[1.31246907909725...|
|     110|(30,[0,1,2,3,4,5,...|[28.6589691872513...|


root
 |-- id: long (nullable = true)
 |-- prediction: long (nullable = true)

+---+----------+
| id|prediction|
+---+----------+
| 55|         3|
| 55|         3|
| 55|         3|
| 55|        25|
| 55|         7|
| 55|        19|
| 55|        11|
| 55|         0|
| 55|        27|
| 55|         1|
+---+----------+


Liste des clusters par image (Map + reduceByKey) 

Clusters par image (rdd_words) 
------------------------------

PythonRDD[702] at RDD at PythonRDD.scala:53

Nombre de partitions: 2
Dimension: 75

Liste de 'words' par image (sdf_worcds) 
----------------------------------------

root
 |-- image_id: long (nullable = true)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)

+--------+--------------------+
|image_id|               words|
+--------+--------------------+
|      54|[3, 3, 3, 25, 21,...|
|      56|[13, 3, 3, 25, 21...|
|      60|[19, 25, 7, 27, 3...|
|      66|[19, 25, 25, 7, 2...|
|      62|[19, 19, 25, 15, ...|
|      58|[19, 3

Test set accuracy (MLP) = 0.72

Durée de l'opération 'Prédiction - Test': 18.71 s


Evaluation 


Jointure entre les identifiants des images et les features (sdf_ima_features) 

root
 |-- image_id: long (nullable = true)
 |-- bag_of_words: vector (nullable = true)
 |-- features: vector (nullable = true)
 |-- prediction: double (nullable = false)

+--------+--------------------+--------------------+----------+
|image_id|        bag_of_words|            features|prediction|
+--------+--------------------+--------------------+----------+
|       6|(30,[0,1,3,4,5,6,...|[1.88671564762636...|       0.0|
|      10|(30,[0,1,3,6,7,9,...|[0.39603563099539...|       1.0|
|       5|(30,[0,1,2,3,4,7,...|[2.14251332966443...|       1.0|
|      30|(30,[0,1,2,3,6,9,...|[-0.6024618689648...|       1.0|
|      41|(30,[0,1,2,3,6,8,...|[0.67371931039150...|       1.0|
|      54|(30,[0,1,2,3,4,5,...|[30.0510914065737...|       2.0|
|      26|(30,[0,1,2,6,9,14...|[-1.5597208874028...|       1.0|
|      68|(