In [1]:
# L'exécution de cette cellule démarre l'application Spark

In [2]:
%%info

UsageError: Cell magic `%%info` not found.


# Importation des librairies utiles

In [None]:
import pandas as pd
import numpy as np
import io
import os
import tensorflow as tf
from PIL import Image
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras import Model
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, lit, col, pandas_udf, PandasUDFType, element_at, split

# Chargement des données

In [None]:
# Root location, plus the input-image and output-results prefixes derived from it.
PATH = 's3://p8-data'
PATH_Data = PATH + '/Test'
PATH_Result = PATH + '/Results'

# Echo the three locations, one per line, with aligned labels.
print('PATH:        ' + PATH,
      'PATH_Data:   ' + PATH_Data,
      'PATH_Result: ' + PATH_Result,
      sep='\n')

In [None]:
# Read every *.jpg found under PATH_Data (sub-directories included) through
# Spark's "binaryFile" data source.
images = (
    spark.read
    .format("binaryFile")
    .option("pathGlobFilter", "*.jpg")
    .option("recursiveFileLookup", "true")
    .load(PATH_Data)
)

In [None]:
# Preview the first 5 rows of the loaded DataFrame.
images.show(5)

In [None]:
# Keep the labels: the label is the second-to-last segment of the file path when
# split on '/', i.e. the name of the directory that directly contains the image.
images = images.withColumn('label', element_at(split(images['path'], '/'), -2))
# printSchema() and show() print by themselves and return None, so they are called
# directly; wrapping them in print() only appended a stray "None" line to the output.
images.printSchema()
images.select('path', 'label').show(5, False)

# Modélisation

In [None]:
# Full MobileNetV2 (classification head included) with ImageNet weights,
# expecting 224x224 inputs with 3 channels.
model = MobileNetV2(
    weights='imagenet',
    include_top=True,
    input_shape=(224, 224, 3),
)

In [None]:
# Feature extractor: same input as `model`, but the output is taken from the
# second-to-last layer, so the final layer of `model` is left out.
new_model = Model(
    inputs=model.input,
    outputs=model.layers[-2].output,
)

In [None]:
# Broadcast the feature extractor's weights so that model_fn() can read them
# back through `brodcast_weights.value`.
# NOTE(review): `sc` is not defined anywhere in this notebook; presumably the
# SparkContext injected by the Spark kernel - confirm.
brodcast_weights = sc.broadcast(new_model.get_weights())

In [None]:
# Print the summary of the truncated model.
new_model.summary()

In [None]:
def model_fn():
    """
    Build the MobileNetV2 feature extractor and load the broadcast weights into it.

    The architecture is instantiated with ``weights=None``: every weight of the
    returned model is overwritten by ``brodcast_weights.value`` right below, so
    loading the ImageNet checkpoint first was redundant work on each call.

    :return: a Keras ``Model`` taking (224, 224, 3) inputs and returning the
             output of MobileNetV2's second-to-last layer.
    """
    model = MobileNetV2(weights=None,
                        include_top=True,
                        input_shape=(224, 224, 3))
    # The model is only used for prediction: freeze every layer.
    for layer in model.layers:
        layer.trainable = False
    # Drop the final layer by taking the output of the second-to-last one.
    new_model = Model(inputs=model.input,
                      outputs=model.layers[-2].output)
    new_model.set_weights(brodcast_weights.value)
    return new_model

In [None]:
def preprocess(content):
    """
    Decode raw image bytes into an array ready for MobileNetV2.

    :param content: encoded image file contents (bytes).
    :return: the array returned by ``preprocess_input`` for the image once
             converted to RGB and resized to 224x224.
    """
    # Force 3-channel RGB before resizing: grayscale, palette, CMYK or RGBA files
    # would otherwise produce arrays with a different channel count, which can
    # neither be stacked with RGB ones nor fed to a (224, 224, 3) model.
    img = Image.open(io.BytesIO(content)).convert('RGB').resize([224, 224])
    arr = img_to_array(img)
    return preprocess_input(arr)

def featurize_series(model, content_series):
    """
    Run the model over a batch of raw images and return one flat vector per image.

    :param model: object exposing ``predict`` on a stacked batch array
    :param content_series: pd.Series of raw image contents
    :return: a pd.Series holding one flattened feature array per input element
    """
    # Preprocess each element, then stack the results into a single batch array.
    batch = np.stack(content_series.map(preprocess))
    # A prediction may be a multi-dimensional tensor; each one is flattened to a
    # plain vector so it can be stored in a Spark DataFrame column.
    flat_features = []
    for prediction in model.predict(batch):
        flat_features.append(prediction.flatten())
    return pd.Series(flat_features)

@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    '''
    Scalar Iterator pandas UDF around featurize_series.
    The decorator declares the returned column type as 'array<float>'.

    :param content_series_iter: iterator over batches of data, each batch being
                              a pandas Series of image data.
    '''
    # The model is built once per call of this generator and then shared by all
    # the batches it receives, which spreads the model-loading cost over them.
    extractor = model_fn()
    yield from (featurize_series(extractor, batch) for batch in content_series_iter)

# Extraction des features

In [3]:
# spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1024")

In [None]:
# Repartition the images into 24 partitions and compute one feature vector per
# image, keeping the path and the label next to it.
# (The closing parenthesis of select() was missing, making the cell a SyntaxError.)
features_df = images.repartition(24).select(col("path"),
                                            col("label"),
                                            featurize_udf("content").alias("features"))

In [None]:
# Number of rows in features_df.
features_df.count()

In [None]:
# Preview the extracted features.
features_df.show()

# Réduction de dimensions

In [None]:
from pyspark.ml.feature import PCA
from pyspark.ml.feature import StandardScaler
from pyspark.ml.linalg import Vectors, VectorUDT, DenseVector

def preprocess(dataframe):
    '''
    Prepare the extracted features for PCA.

    Steps:
    - the 'features' array column is converted to a dense vector column
      ('features_vectors');
    - that column is standardised (centred, scaled to unit standard deviation)
      into 'features_scaled'.

    NOTE(review): this definition reuses the name of the image-level
    preprocess(content) declared earlier in the notebook and replaces it in the
    global namespace - consider renaming one of the two.

    :param dataframe: Spark DataFrame with a 'features' array column.
    :return: the DataFrame with 'features_vectors' and 'features_scaled' added.
    '''
    # Array -> dense vector conversion.
    to_dense_vector = udf(lambda values: Vectors.dense(values), VectorUDT())
    with_vectors = dataframe.withColumn('features_vectors', to_dense_vector('features'))

    # Fit the scaler on the vectors, then apply it to the same data.
    scaler = StandardScaler(inputCol="features_vectors",
                            outputCol="features_scaled",
                            withStd=True,
                            withMean=True)
    return scaler.fit(with_vectors).transform(with_vectors)

In [None]:
# Recherche du nombre de composante expliquant 95% de la variance
def nb_composante(dataframe, nb_comp=100, threshold=0.95):
    """
    Find how many principal components are needed to reach `threshold` of the variance.

    A PCA with `nb_comp` components is fitted on the "features_scaled" column and
    its explained-variance ratios are accumulated.

    Fixes over the previous version: the search no longer assumes exactly 100
    components (it raised IndexError when nb_comp < 100), it stops at the first
    component reaching the threshold instead of always returning the last loop
    index, and it reports a component count rather than a 0-based index.

    :param dataframe: Spark DataFrame holding a "features_scaled" vector column.
    :param nb_comp: number of components fitted, i.e. the upper bound of the search.
    :param threshold: cumulative explained-variance ratio to reach (default 0.95).
    :return: the smallest number of components whose cumulative explained variance
             is >= threshold, or the number of fitted components if the
             threshold is never reached.
    """
    pca = PCA(k=nb_comp,
              inputCol="features_scaled",
              outputCol="features_pca")
    model_pca = pca.fit(dataframe)

    # Cumulative explained variance, computed once.
    cumulative = np.cumsum(model_pca.explainedVariance.toArray())
    # Indices (0-based) of the components at which the threshold is met.
    reached = np.flatnonzero(cumulative >= threshold)

    if reached.size == 0:
        # Threshold not reachable with the fitted components: keep them all.
        explained = float(cumulative[-1]) if cumulative.size else 0.0
        print("Les {} composantes calculées n'expliquent que {:.1%} de la variance totale "
              "(seuil de {:.0%} non atteint)".format(cumulative.size, explained, threshold))
        return int(cumulative.size)

    # +1 converts the 0-based index into a number of components.
    nb = int(reached[0]) + 1
    print("{} composantes principales expliquent au moins {:.0%} de la variance totale".format(nb, threshold))
    return nb

In [None]:
# Pre-processing (dense vector conversion, standardisation)
df_preprocess = preprocess(features_df)

In [None]:
# Number of components explaining 95% of the variance
nombre_cp = nb_composante(df_preprocess)

In [None]:
# Dimensionality reduction with PCA
# Fit a PCA with nombre_cp components on the standardised features;
# the projected vectors will go to the 'vectors_pca' column.
pca = PCA(k=nombre_cp, inputCol='features_scaled', outputCol='vectors_pca')
action_pca = pca.fit(df_preprocess)

In [None]:
# Project the images onto the first k principal components
df_final= action_pca.transform(df_preprocess)

In [None]:
# Preview the DataFrame returned by the PCA transform.
df_final.show()

In [None]:
# Show where the results are about to be written.
print(PATH_Result)

In [None]:
# Write the features as Parquet under PATH_Result, replacing any existing output.
# NOTE(review): this saves features_df (path, label, features), not df_final,
# so the PCA output computed above is not persisted - confirm this is intended.
features_df.write.mode("overwrite").parquet(PATH_Result)

# Validation des résultats

In [None]:
# Read the Parquet output back into a pandas DataFrame with the pyarrow engine.
df = pd.read_parquet(PATH_Result, engine='pyarrow')

In [None]:
# First rows of the reloaded results.
df.head()

In [None]:
# Shape of the 'features' value stored in the row labelled 0.
df.loc[0,'features'].shape

In [None]:
# (rows, columns) of the reloaded results.
df.shape