In [3]:
# L'exécution de cette cellule démarre l'application Spark

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Import des librairies et fonctions

In [4]:
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras import Model
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split
from pyspark.sql.types import StructType, StructField, FloatType
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.linalg.distributed import RowMatrix
from pyspark.ml.functions import vector_to_array
import pyarrow.parquet as pq
import tensorflow as tf
from PIL import Image
import pandas as pd, numpy as np
import io, os, time
import boto3

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
# Read single parquet file from S3
def pd_read_s3_parquet(key, bucket, **args):
    s3_client = boto3.client('s3')
    obj = s3_client.get_object(Bucket=bucket, Key=key)
    return pd.read_parquet(io.BytesIO(obj['Body'].read()), **args)

# Read multiple parquets from a folder on S3 generated by spark
def pd_read_s3_multiple_parquets(filepath, bucket, s3=None, verbose=False, **args):
    s3_client = boto3.client('s3')
    s3 = boto3.resource('s3')
    s3_keys = [item.key for item in s3.Bucket(bucket).objects.filter(Prefix=filepath) if item.key.endswith('.parquet')]
    if not s3_keys:
        print('No parquet found in', bucket, filepath)
    elif verbose:
        print('Load parquets:')
        for p in s3_keys: 
            print(p)
    dfs = [pd_read_s3_parquet(key, bucket=bucket, **args) for key in s3_keys]
    return pd.concat(dfs, ignore_index=True)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
%%info

ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1676211120774_0001,pyspark,idle,Link,Link,✔


# Introduction

Ce notebook implémente le préprocessing des images du dataset fruit-360. On souhaite exposer les avantages d'utiliser le calcul distribué pour paraléliser ses différentes étapes. Pour cela nous utiliserons le framework Spark de calcul distribué avec lequel nous communiquerons via l'API pyspark. 
Le préprocessing comprend trois étapes :
- Le redimmensionnement des images au format 240 x 240 px
- L'extraction de features à l'aide du modèle MobileNetV2
- La réduction de dimension à l'aide d'une ACP

# Définition des chemins pour les images sources et les résultats

Nous accédons directement à nos **images sur S3** comme si elles étaient **stockées localement**. On a chargé le dataset complet dans le bucket S3 que l'on a rangé dans le dossier full_dataset.
Voici l'arborescence du serveur S3 :

project-8-data <br>
│   bootstrap-emr-sh   
│<br>
└───full_dataset (dossier contenant les images)<br>
│   │<br>
│   └───Apple_Braeburn<br>
│   │   │   0_100.jpg<br>
│   │   │   1_100.jpg<br>
│   │   │   ...<br>
│   │<br>
│   └───Apple_Crimson_Snow<br>
│   │   │   0_100.jpg<br>
│   │   │   1_100.jpg<br>
│   │   │   ...<br>
│   │<br>
│   ...<br>
│   │<br>
│   └───Watermelon<br>
│       │   0_100.jpg<br>
│       │   1_100.jpg<br>
│       │   ...<br>
│<br>
└───results (dossier contenant le résultats du préprocessing)<br>
│<br>
└───jupyter (dossier contenant les notebooks)<br>

Pour cette démonstration, on ne chargera qu'une partie du dataset complet (de 3 à 35 classes d'imagesce qui correspond à 600 à 23000 images) pour rester dans des temps de calculs raisonnables.

In [7]:
PATH = 's3://project-8-data'
folders = boto3.client('s3').list_objects(Bucket='project-8-data', Delimiter='/', Prefix='full_dataset/')
MULTIPLE_PATH_DATA = [PATH + "/" + common_prefix.get('Prefix') for common_prefix in folders.get('CommonPrefixes')]
PATH_Result = PATH+'/Results'

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

On commence par créer une session Spark en instanciant l'objet python SparkSession. Cet objet va permettre de communiquer avec le cluster manager pour la répartition des données sur les différents noeuds du cluster et l'allocation des ressources (cpu, ram).

In [8]:
# Création de la session Spark
spark = (SparkSession \
             .builder \
             .appName('P8') \
             .config("spark.sql.parquet.writeLegacyFormat", 'true') \
             .getOrCreate())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

On importe ensuites les images. l'entier n correspond aux nombres de classe de fruit que l'on veut importer. (il ya environ 600 images par classe de fruits) 

In [9]:
#Importation des images (0, 3, 15)
n = 3
images = spark.read \
            .format("binaryFile") \
            .option("pathGlobFilter", "*.jpg") \
            .option("recursiveFileLookup", "true") \
            .load(MULTIPLE_PATH_DATA[:n]) \

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [10]:
images = images.withColumn('label', element_at(split(images['path'], '/'),-2))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Explication de la distribution 

L’extraction de feature à l’aide d’un modèle tensorflow nécessite l’utilisation des poids de ce modèle. Donc si on souhaite paralléliser cette tâche sur plusieurs nœuds, chaque nœuds doit avoir accès à ces poids.
→ C’est là que le broadcasting entre en jeu. 

In [12]:
# Broadcasting des poids du modèle sur les différents noeuds du cluster

model = MobileNetV2(weights='imagenet', include_top=True, input_shape=(224, 224, 3))

new_model = Model(inputs=model.input, outputs=model.layers[-2].output)

brodcast_weights = sc.broadcast(new_model.get_weights())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/mobilenet_v2/mobilenet_v2_weights_tf_dim_ordering_tf_kernels_1.0_224.h5

In [13]:
def model_fn():
    """
    Returns a MobileNetV2 model with top layer removed 
    and broadcasted pretrained weights.
    """
    model = MobileNetV2(weights='imagenet',
                        include_top=True,
                        input_shape=(224, 224, 3))
    for layer in model.layers:
        layer.trainable = False
    new_model = Model(inputs=model.input, outputs=model.layers[-2].output)
    new_model.set_weights(brodcast_weights.value)
    return new_model

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Grâce au broadcasting des poids du modèle on peut maintenant définir une « user-defined-fonction » grâce au décorateur @pandas_udf
Cette fonction va permettre d’appliquer l’extraction de features à toutes les images du dataframe de façon distribuée.

In [14]:
def preprocess(content):
    """
    Preprocesses raw image bytes for prediction.
    """
    img = Image.open(io.BytesIO(content)).resize([224, 224])
    arr = img_to_array(img)
    return preprocess_input(arr)

def featurize_series(model, content_series):
    """
    Featurize a pd.Series of raw images using the input model.
    :return: a pd.Series of image features
    """
    input = np.stack(content_series.map(preprocess))
    preds = model.predict(input)
    # For some layers, output features will be multi-dimensional tensors.
    # We flatten the feature tensors to vectors for easier storage in Spark DataFrames.
    output = [p.flatten() for p in preds]
    return pd.Series(output)

@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    '''
    This method is a Scalar Iterator pandas UDF wrapping our featurization function.
    The decorator specifies that this returns a Spark DataFrame column of type ArrayType(FloatType).

    :param content_series_iter: This argument is an iterator over batches of data, where each batch
                              is a pandas Series of image data.
    '''
    # With Scalar Iterator pandas UDFs, we can load the model once and then re-use it
    # for multiple data batches.  This amortizes the overhead of loading big models.
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…



In [15]:
# Application de l'udf d'extraction des features à notre dataframe distribué
features_df = images.repartition(24).select(col("path"), col("label"), featurize_udf("content").alias("features"))
features_df.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+--------------+--------------------+
|                path|         label|            features|
+--------------------+--------------+--------------------+
|s3://project-8-da...|Apple_Golden_1|[0.0, 0.027829845...|
|s3://project-8-da...|Apple_Golden_1|[0.0, 0.12024237,...|
|s3://project-8-da...|Apple_Golden_1|[0.0, 0.14426453,...|
|s3://project-8-da...|Apple_Golden_1|[0.35117388, 0.06...|
|s3://project-8-da...|Apple_Golden_1|[0.0036005825, 0....|
+--------------------+--------------+--------------------+
only showing top 5 rows

La méthode implémentant l'analyse en composantes principales appartient à la classe RowMatrix. nous devant donc convertir la colonne features pour pouvoir lui appliquer cette méthode

In [16]:
# COnversion de la colonne features en RowMatrix
rdd = features_df.select(col("features")).rdd

rdd_vectorized = rdd.map(lambda x: Vectors.dense(x))

matrix = RowMatrix(rdd_vectorized)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

On peut maintenant lui appliquer l'ACP

In [17]:
# Application de l'ACP à nos features
n_comp = 4

pc = matrix.computePrincipalComponents(n_comp)

projected = matrix.multiply(pc)

collected = projected.rows.collect()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [82]:
#Création du schéma pour le dataframe
struct_fields = [StructField(str(i), FloatType(), True) for i in range(1, n_comp+1)]
schema = StructType(struct_fields)

#Formatage des donnée pour la création du dataframe
data = []
for l in collected:
    data.append((float(l[0]),float(l[1]),float(l[2]),float(l[3])))

#Création du dataframe
df = spark.createDataFrame(data, schema)
df.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------+----------+---------+----------+
|        1|         2|        3|         4|
+---------+----------+---------+----------+
|6.1351495| -6.020519|4.0145116|-1.9692829|
| 4.928417| 4.6934485|2.3818104|-4.5944924|
|6.9458036| -4.923555|6.6738276| -4.299764|
| 6.596392|-5.4613843|5.9482656| -5.157067|
| 6.412099|-2.3701758|2.6479127| -4.785862|
+---------+----------+---------+----------+
only showing top 5 rows

Il ne reste pluq qu'à sauvegarder les résultats

In [81]:
df.write.csv(PATH_Result, header = False, mode = "overwrite")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…