# Fonctions

In [41]:
def recuperer_valeurs_dataset_en_local(chemin_echantillon_dataset):
    valeurs = []
    for nom_dossier_images_fruit in listdir(chemin_echantillon_dataset):
        chemin_dossier_images_fruit = chemin_echantillon_dataset + nom_dossier_images_fruit + "/"
        for nom_image_fruit in listdir(chemin_dossier_images_fruit):
            chemin_image_fruit = chemin_dossier_images_fruit + nom_image_fruit
            valeurs.append([chemin_image_fruit, nom_dossier_images_fruit])
    return valeurs

def creer_dataset_initial_en_local(chemin_echantillon_dataset):
    colonnes = ["chemin_image", "label"]
    valeurs = recuperer_valeurs_dataset_en_local(chemin_echantillon_dataset)
    return spark.createDataFrame(valeurs, colonnes)

def pretraiter(image):
    """
    Retourne l'image dont on a appliqué des prétraitements.
    
    
    Paramètre :
        - image : image à prétraiter
    """
    image_pretraitee = cv.cvtColor(image, cv.COLOR_BGR2GRAY)
    image_pretraitee = cv.equalizeHist(image_pretraitee)
    image_pretraitee = cv.GaussianBlur(image_pretraitee, (5,5), 3)
    image_pretraitee = cv.resize(image_pretraitee, dsize=(100,100))
    return image_pretraitee

def extraire_descripteurs(nom_fichier_image):
    """
    Retourne les descripteurs associés aux features d'une image à partir du nom de son fichier
    
    
    Paramètre :
        - nom_fichier_image : nom du fichier contenant l'image dont on veut tirer les descripteurs
    """
    image = cv.imread(nom_fichier_image)
    image_pretraitee = pretraiter(image)
    sift = cv.SIFT_create()
    keypoints, descripteurs = sift.detectAndCompute(image_pretraitee, None)
    
    # Conversion des descripteurs en list pour que pyspark l'accepte
    descripteurs = descripteurs.tolist()
    
    return descripteurs

# Chargement des images et extraction des descripteurs

In [42]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import StructType, ArrayType, FloatType
from pyspark.sql.functions import udf
from os import listdir
import cv2 as cv

# Initialisation du programme
#appName = "[OC/P8] - Déployer un modèle sur le cloud"
#master = "local"
#conf = SparkConf().setAppName(appName).setMaster(master)
#sc = SparkContext(conf=conf).getOrCreate()
sc = SparkContext.getOrCreate()
spark = SparkSession.builder.getOrCreate()
chemin_echantillon_dataset = "data/echantillon_dataset/"

df_images_fruits = creer_dataset_initial_en_local(chemin_echantillon_dataset)
udf_extraire_descripteurs = udf(extraire_descripteurs, ArrayType(ArrayType(FloatType())))

df_images_fruits = df_images_fruits.withColumn("descripteurs", udf_extraire_descripteurs("chemin_image"))
df_images_fruits.show(truncate=True)

+--------------------+------------------+--------------------+
|        chemin_image|             label|        descripteurs|
+--------------------+------------------+--------------------+
|data/echantillon_...|apple_granny_smith|[[1.0, 0.0, 0.0, ...|
|data/echantillon_...|apple_granny_smith|[[42.0, 30.0, 0.0...|
|data/echantillon_...|           avocado|[[54.0, 26.0, 0.0...|
|data/echantillon_...|           avocado|[[98.0, 84.0, 0.0...|
|data/echantillon_...|            banana|[[0.0, 0.0, 0.0, ...|
|data/echantillon_...|            banana|[[150.0, 22.0, 0....|
+--------------------+------------------+--------------------+



# Création du bag-of-features

## Création des visual words

In [84]:
from pyspark.sql.functions import explode

df_tous_descripteurs = df_images_fruits.select(explode(df_images_fruits.descripteurs).alias("descripteurs"))
df_tous_descripteurs.count()

86

In [86]:
from pyspark.ml.linalg import Vectors, VectorUDT

udf_convertir_en_vecteur = udf(lambda x: Vectors.dense(x), VectorUDT())
df_tous_descripteurs = df_tous_descripteurs.withColumn('descripteurs_as_vector', 
                                                       udf_convertir_en_vecteur(df_tous_descripteurs.descripteurs))

In [87]:
df_tous_descripteurs.show()

+--------------------+----------------------+
|        descripteurs|descripteurs_as_vector|
+--------------------+----------------------+
|[1.0, 0.0, 0.0, 0...|  [1.0,0.0,0.0,0.0,...|
|[48.0, 67.0, 0.0,...|  [48.0,67.0,0.0,0....|
|[38.0, 53.0, 0.0,...|  [38.0,53.0,0.0,0....|
|[105.0, 114.0, 0....|  [105.0,114.0,0.0,...|
|[2.0, 1.0, 0.0, 0...|  [2.0,1.0,0.0,0.0,...|
|[1.0, 10.0, 1.0, ...|  [1.0,10.0,1.0,0.0...|
|[13.0, 2.0, 0.0, ...|  [13.0,2.0,0.0,2.0...|
|[8.0, 2.0, 0.0, 1...|  [8.0,2.0,0.0,1.0,...|
|[0.0, 4.0, 2.0, 0...|  [0.0,4.0,2.0,0.0,...|
|[3.0, 19.0, 35.0,...|  [3.0,19.0,35.0,14...|
|[42.0, 30.0, 0.0,...|  [42.0,30.0,0.0,0....|
|[2.0, 8.0, 0.0, 0...|  [2.0,8.0,0.0,0.0,...|
|[23.0, 7.0, 0.0, ...|  [23.0,7.0,0.0,0.0...|
|[7.0, 2.0, 0.0, 1...|  [7.0,2.0,0.0,1.0,...|
|[3.0, 19.0, 34.0,...|  [3.0,19.0,34.0,15...|
|[54.0, 26.0, 0.0,...|  [54.0,26.0,0.0,0....|
|[39.0, 31.0, 0.0,...|  [39.0,31.0,0.0,0....|
|[95.0, 11.0, 0.0,...|  [95.0,11.0,0.0,0....|
|[2.0, 1.0, 84.0, ...|  [2.0,1.0,8

In [88]:
df_tous_descripteurs.printSchema()

root
 |-- descripteurs: array (nullable = true)
 |    |-- element: float (containsNull = true)
 |-- descripteurs_as_vector: vector (nullable = true)



In [89]:
import math

# Définition du nombre de clusters
nb_total_descripteurs = df_tous_descripteurs.count()
nb_clusters = int(round(math.sqrt(nb_total_descripteurs), 0))

In [90]:
nb_clusters

9

In [91]:
from pyspark.ml.clustering import KMeans

# Clustering
kmeans = KMeans(featuresCol="descripteurs_as_vector", k=nb_clusters, seed=0)
kmeans.fit(df_tous_descripteurs)

KMeansModel: uid=KMeans_c2e2ad4e0c66, k=9, distanceMeasure=euclidean, numFeatures=128

In [None]:
from pyspark.ml.feature import VectorAssembler

assemble=VectorAssembler(inputCols=['descripteurs'], outputCol='descripteurs_as_vector')
assembled_data=assemble.transform(data_customer)

In [None]:
# transform our preprocess data to vector for PCA model via udf fonction to transform array into vector
to_vector = F.udf(lambda x: Vectors.dense(x), VectorUDT())

# creating vector column
df_images = df_images.withColumn('preprocess_data_vector', to_vector(df_images.preprocess_data))

In [76]:
from pyspark.sql import Row
eDF = spark.createDataFrame([Row(a=1, intlist=[1,2,3], mapfield={"a": "b"})])
eDF.show()
#eDF.select(explode(eDF.intlist).alias("anInt")).collect()

+---+---------+--------+
|  a|  intlist|mapfield|
+---+---------+--------+
|  1|[1, 2, 3]|{a -> b}|
+---+---------+--------+



In [81]:

eDF.select(explode(eDF.intlist).alias("anInt")).show()

+-----+
|anInt|
+-----+
|    1|
|    2|
|    3|
+-----+



In [69]:
image_fruit = df_images_fruits.first()

In [73]:
image_fruit.descripteurs

[[1.0,
  0.0,
  0.0,
  0.0,
  0.0,
  24.0,
  106.0,
  3.0,
  165.0,
  64.0,
  0.0,
  0.0,
  0.0,
  16.0,
  39.0,
  17.0,
  165.0,
  144.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  1.0,
  34.0,
  26.0,
  1.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  11.0,
  65.0,
  58.0,
  0.0,
  148.0,
  12.0,
  0.0,
  0.0,
  30.0,
  50.0,
  13.0,
  8.0,
  165.0,
  93.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  5.0,
  29.0,
  21.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  9.0,
  75.0,
  35.0,
  8.0,
  0.0,
  122.0,
  1.0,
  0.0,
  10.0,
  77.0,
  11.0,
  0.0,
  33.0,
  165.0,
  23.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  40.0,
  7.0,
  4.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  1.0,
  0.0,
  0.0,
  38.0,
  83.0,
  2.0,
  0.0,
  1.0,
  103.0,
  0.0,
  0.0,
  15.0,
  31.0,
  0.0,
  0.0,
  80.0,
  94.0,
  2.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  48.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0,
  0.0],
 [48.0,
  67.0,
  0.0,
  0.0,
  14.0,
  36.0,
  0.0,


In [66]:
rdd_images_fruits = df_images_fruits.rdd

In [68]:
rdd_images_fruits.collect()

[Row(chemin_image='data/echantillon_dataset/apple_granny_smith/0_100.jpg', label='apple_granny_smith', descripteurs=[[1.0, 0.0, 0.0, 0.0, 0.0, 24.0, 106.0, 3.0, 165.0, 64.0, 0.0, 0.0, 0.0, 16.0, 39.0, 17.0, 165.0, 144.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 34.0, 26.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 11.0, 65.0, 58.0, 0.0, 148.0, 12.0, 0.0, 0.0, 30.0, 50.0, 13.0, 8.0, 165.0, 93.0, 0.0, 0.0, 0.0, 0.0, 0.0, 5.0, 29.0, 21.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 9.0, 75.0, 35.0, 8.0, 0.0, 122.0, 1.0, 0.0, 10.0, 77.0, 11.0, 0.0, 33.0, 165.0, 23.0, 0.0, 0.0, 0.0, 0.0, 0.0, 40.0, 7.0, 4.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 38.0, 83.0, 2.0, 0.0, 1.0, 103.0, 0.0, 0.0, 15.0, 31.0, 0.0, 0.0, 80.0, 94.0, 2.0, 0.0, 0.0, 0.0, 0.0, 0.0, 48.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0], [48.0, 67.0, 0.0, 0.0, 14.0, 36.0, 0.0, 0.0, 165.0, 88.0, 0.0, 0.0, 1.0, 0.0, 0.0, 3.0, 32.0, 2.0, 0.0, 0.0, 0.0, 0.0, 0.0, 5.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 44.0, 19.0, 0.0, 15.0,

In [74]:
def isoler_descripteurs(image_fruit):
    #image_fruit_descripteurs = image_fruit.descripteurs
    
    return Row(descripteurs=image_fruit.descripteurs)

In [59]:
df_images_fruits["descripteurs"].getItem(0)
test.show()

TypeError: 'Column' object is not callable

In [75]:
rdd_images_fruits = df_images_fruits.rdd
rdd_tous_descripteurs = rdd_images_fruits.map(isoler_descripteurs)
df_tous_descripteurs = spark.createDataFrame(rdd_tous_descripteurs)
df_tous_descripteurs.show()

+--------------------+
|        descripteurs|
+--------------------+
|[[1.0, 0.0, 0.0, ...|
|[[42.0, 30.0, 0.0...|
|[[54.0, 26.0, 0.0...|
|[[98.0, 84.0, 0.0...|
|[[0.0, 0.0, 0.0, ...|
|[[150.0, 22.0, 0....|
+--------------------+



In [50]:
des = df_images_fruits.collect()[0][2]
len(des[1])

128

In [51]:
des = df_images_fruits.collect()[1][2]
len(des[1])

128

In [52]:
import numpy as np

liste_descripteurs = df_images_fruits.collect()[0][2]
for i in range(1, df_images_fruits.count()):
    liste_descripteurs = np.vstack((liste_descripteurs, df_images_fruits.collect()[i][2]))

In [55]:
liste_descripteurs.shape

(86, 128)

In [None]:
def 

In [31]:
des = df_images_fruits.collect()[1][2]
len(des)

640

In [30]:
len(des)

1280

In [34]:
from pyspark.ml.linalg import Vectors
data = [(Vectors.dense([0.0, 0.0]), 2.0), (Vectors.dense([1.0, 1.0]), 2.0),
        (Vectors.dense([9.0, 8.0]), 2.0), (Vectors.dense([8.0, 9.0]), 2.0)]
df = spark.createDataFrame(data, ["features", "weighCol"])

In [36]:
df.show(truncate=True)

+---------+--------+
| features|weighCol|
+---------+--------+
|[0.0,0.0]|     2.0|
|[1.0,1.0]|     2.0|
|[9.0,8.0]|     2.0|
|[8.0,9.0]|     2.0|
+---------+--------+

