# 1. Configuration de Spark

In [1]:
AWSAccessKeyId='Ma clé ID secrète'
AWSSecretKey='Ma clé secrète'

In [2]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = "--packages=org.apache.hadoop:hadoop-aws:2.7.3 pyspark-shell"

In [3]:
import findspark
findspark.init()
from pyspark.sql import SparkSession

In [4]:
from pyspark import SparkContext
s = SparkContext()

On va charger toutes les libraires/modules/fonctions nécessaires pour la reduction de dimension et le lancement de PySpark.

In [5]:
import os
from datetime import datetime

import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras.applications.mobilenet import preprocess_input
from tensorflow.keras.models import load_model

from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, DoubleType
from pyspark.sql.functions import col, pandas_udf, lit

Lancement de la session PySpark et accès au bucket S3.

In [6]:
spark = SparkSession.builder.config("spark.python.worker.reuse", "False") \
                            .appName('dim_red') \
                            .getOrCreate()

spark.sparkContext.setSystemProperty("com.amazonaws.services.s3.enableV4", "true")
spark._jsc.hadoopConfiguration().set("fs.s3a.access.key", AWSAccessKeyId)
spark._jsc.hadoopConfiguration().set("fs.s3a.secret.key", AWSSecretKey)
spark._jsc.hadoopConfiguration().set("fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem")
spark._jsc.hadoopConfiguration().set("com.amazonaws.services.s3.enableV4", "true")
spark._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider","org.apache.hadoop.fs.s3a.BasicAWSCredentialsProvider")
spark._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "s3.eu-west-3.amazonaws.com")

# 2.  Lecture du S3 Bucket 

On va charger toutes les images du dossier Test du bucket S3.

In [7]:
# nom du S3 bucket

S3dir ='s3a://projet8-images'

In [8]:
df = spark.read.format('image').option("dropInvalid", True).load(S3dir + '/Test/*/*', recursive=True)

In [9]:
df.show(10)

+--------------------+
|               image|
+--------------------+
|{s3a://projet8-im...|
|{s3a://projet8-im...|
|{s3a://projet8-im...|
|{s3a://projet8-im...|
|{s3a://projet8-im...|
|{s3a://projet8-im...|
|{s3a://projet8-im...|
|{s3a://projet8-im...|
|{s3a://projet8-im...|
|{s3a://projet8-im...|
+--------------------+
only showing top 10 rows



# 3. Choix d'un modèle de Keras pour du transfert learning

Ici on fait le choix d'un mobileNetV2 car le modèle est assez léger (14MB)

In [10]:
from keras.applications import MobileNetV2
from keras.applications.mobilenet_v2 import preprocess_input

In [11]:
image_size = 96
img_shape = (image_size, image_size, 3)

mobilenet = MobileNetV2(input_shape=img_shape,
                    weights="imagenet")

On n'utilise ici pas tout le modèle, on ne va garder que le modèle jusqu'à l'avant dernière couche. La dernière couche étant une couche de classification avec 1000 classes, elle n'est pas apriori à notre problème ici.

In [12]:
from keras import Model
model = Model(inputs=mobilenet.input, outputs=mobilenet.layers[-2].output)

On sauvegarde le modèle pour le charger dans la fonction de réduction qui suit. 

In [13]:
model.save('models', save_format='tf')

INFO:tensorflow:Assets written to: models/assets


In [14]:
model.summary()

Model: "model"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            [(None, 96, 96, 3)]  0                                            
__________________________________________________________________________________________________
Conv1 (Conv2D)                  (None, 48, 48, 32)   864         input_1[0][0]                    
__________________________________________________________________________________________________
bn_Conv1 (BatchNormalization)   (None, 48, 48, 32)   128         Conv1[0][0]                      
__________________________________________________________________________________________________
Conv1_relu (ReLU)               (None, 48, 48, 32)   0           bn_Conv1[0][0]                   
______________________________________________________________________________________________

# 4. Fonction de réduction de dimension

Pour limiter les coûts, ici nous nous limiterons à la stratégie suivante :
    - réaliser un prétraitement des images avec preprocess_input
    - appliquer l'algorithme de Keras en supprimant la dernière couche de classification 

Ainsi à chaque image, on associe un vecteur de 1280.

In [15]:
def reduct_dim(width, height, nChannels, data):
    # transformation des images pour les traiter avec le model
    images = []

    for i in range(height.shape[0]):        
        x = np.ndarray(
               shape=(height[i], width[i], nChannels[i]),
               dtype=np.uint8,
               buffer=data[i],
               strides=(width[i] * nChannels[i], nChannels[i], 1))
        images.append(preprocess_input(x))

    # redimensionnement des images
    images = np.array(tf.image.resize(images, [image_size, image_size]))
    
    # chargement du modèle
    model = load_model('models')
    
    # prédiction du modèle pour récupérer les features de chaque image
    features = model.predict(images).reshape(len(width), 1280)
    
    return pd.Series(list(features))

On utilise pandas udf pour obtenir une fonction compatible avec PySpark

In [16]:
reduct_dim_udf = pandas_udf(reduct_dim, returnType=ArrayType(DoubleType()))

# 6. Etape finale : écriture au format csv la réduction de dimension

Pour nommer nos dossiers de test dans S3 durant les différents tests on utilise la date à laquelle on a réalisé les tests.

In [17]:
import datetime
today = datetime.datetime.now().strftime('%Y_ %m_ %d_ %H_ %M_ %S_')

On va ensuite appliquer notre fonction de réduction de dimension.

In [19]:
df.withColumn("dim_red", reduct_dim_udf(col("image.width"), col("image.height"),
                                     col("image.nChannels"),
                                     col("image.data"))) \
    .withColumn("dim_red_string", lit(col("dim_red").cast("string"))) \
    .select("image.origin", 'dim_red_string') \
    .repartition(5).write.csv(S3dir + '/results' +today)

On retrouve les fichiers au format csv au lien : 
    s3://projet8-images/results2021_ 05_ 16_ 09_ 07_ 45_/