# Préambule

Ce notebook a été exporté depuis Databricks. Il regroupe le code du traitement distribué des données, de l'import des images depuis Google Cloud Storage jusqu'à l'enregistrement des features et des prédictions.

# Bibliothèques


In [0]:
# Bases
import io
from typing import Iterator

import numpy as np
import pandas as pd
from PIL import Image

# Tensorflow
import tensorflow as tf
from tensorflow.keras.applications.vgg16 import VGG16, preprocess_input
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from tensorflow.keras.models import Model
from tensorflow.keras import Input

# Pyspark SQL
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, udf
from pyspark.sql.types import StringType

# Pyspark ML
from pyspark.ml.linalg import Vectors, VectorUDT, DenseVector
from pyspark.ml.feature import StandardScaler, PCA, StringIndexer
from pyspark.ml.functions import vector_to_array
from pyspark.ml.classification import RandomForestClassifier

# Configs

In [0]:
# Root folder holding the fruits-360 images (searched recursively below).
INPUT_FILE_PATH = "/FileStore/tables/fruits-360-original-size/"
# Number of principal components kept by the PCA step.
N_COMPONENTS = 50

# Limit each Arrow record batch (the unit handed to pandas UDFs such as
# featurize_udf) to 1024 rows.
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1024")

# Import data

In [0]:
# Load every .jpg under INPUT_FILE_PATH (sub-folders included) as raw bytes;
# the binaryFile source provides, among others, the `path` and `content`
# columns used further down.
images = (
    spark.read.format("binaryFile")
    .option("pathGlobFilter", "*.jpg")
    .option("recursiveFileLookup", "true")
    .load(INPUT_FILE_PATH)
)
n_images = images.count()
print(f"Number of images: {n_images}")

# Import pretrained model

VGG-16 est un réseau de neurones convolutif à 16 couches. Il a été pré-entraîné sur plus d'un million d'images du jeu de données ImageNet. Le modèle pré-entraîné classe les images parmi 1000 catégories (objets et animaux).

In [None]:
# Driver-side VGG16 (Keras defaults: ImageNet weights, classifier included),
# truncated at its penultimate layer.
# NOTE(review): featurize_udf does not use this instance; it builds its own copy
# through model_fn(). `model` is rebound to the random forest further down.
pretrain_model = VGG16(input_tensor=Input(shape=(224, 224, 3)))
penultimate = pretrain_model.layers[-2].output
model = Model(inputs=pretrain_model.inputs, outputs=penultimate)

# Functions

In [0]:
@udf(StringType())
def get_label(x):
  """
  Spark UDF: derive an image's label from its file path.

  Keeps the name of the file's parent folder, cut at the first underscore,
  e.g. '.../apple_6/r0_0.jpg' -> 'apple'.
  """
  parent_dir = x.split('/')[-2]
  label, *_ = parent_dir.split('_')
  return label

def model_fn():
  """
  Build a VGG16 feature extractor with the final classification layer removed.

  VGG16 is instantiated with the Keras defaults (ImageNet weights, top layers
  included) on a 224x224x3 input. It is then wrapped in a new Model whose
  output is ``layers[-2]``, the layer feeding the 1000-way classifier, so
  ``predict`` returns one feature vector per image (4096 values for VGG16)
  instead of class probabilities.

  No Spark broadcast is involved. Each call builds a fresh model and loads
  the weights itself (Keras typically downloads them once and caches them
  locally).

  :return: tf.keras Model mapping (n, 224, 224, 3) inputs to per-image features
  """
  backbone = VGG16(input_tensor=Input(shape=(224, 224, 3)))
  # Only the last (prediction) layer is dropped: we stop at the one before it.
  penultimate = backbone.layers[-2].output
  return Model(inputs=backbone.inputs, outputs=penultimate)

def preprocess(content):
  """
  Turn raw encoded image bytes into a VGG16-ready array.

  The image is decoded with PIL and forced to RGB, so grayscale, palette or
  RGBA files still yield 3 channels and can be stacked with the others. It is
  then resized to 224x224 and passed through VGG16's ``preprocess_input``.

  :param content: raw bytes of an encoded image file (e.g. JPEG)
  :return: numpy float array of shape (224, 224, 3)
  """
  # Close the decoder's file handle once the converted copy exists.
  with Image.open(io.BytesIO(content)) as img:
    rgb = img.convert("RGB").resize((224, 224))
  return preprocess_input(img_to_array(rgb))

def featurize_series(model, content_series):
  """
  Featurize a pd.Series of raw images using the input model.

  :param model: Keras model applied to the stacked output of ``preprocess``
  :param content_series: pd.Series of raw image bytes
  :return: pd.Series holding one flattened feature array per input image,
           in input order and with the same length as ``content_series``
  """
  # np.stack refuses an empty sequence, so return an empty result directly.
  if content_series.empty:
    return pd.Series([], dtype=object)
  batch = np.stack(content_series.map(preprocess))
  preds = model.predict(batch)
  return pd.Series([p.flatten() for p in preds])

@pandas_udf('array<float>')
def featurize_udf(content_series_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
  '''
  Scalar Iterator pandas UDF wrapping our featurization function.

  The ``Iterator[pd.Series] -> Iterator[pd.Series]`` type hints select the
  iterator flavour of pandas UDF. This is the Spark 3.0+ API that replaces
  the deprecated ``PandasUDFType.SCALAR_ITER`` argument. The declared return
  type makes the result a Spark DataFrame column of type ArrayType(FloatType).

  Parameters
  ----------
      content_series_iter:
        Iterator over batches of data, where each batch is a pandas Series of image data.

  Yields
  ------
      One pandas Series of feature arrays per input batch, in the same order.
  '''
  # Build the model once per iterator rather than once per batch:
  # model_fn() instantiates VGG16 and loads its weights.
  model = model_fn()
  for content_series in content_series_iter:
    yield featurize_series(model, content_series)

# Feature extraction

In [0]:
# Split the dataset into 16 partitions and extract VGG16 features from each image.
# cache() keeps the extracted features: every later action (show, the
# StringIndexer/StandardScaler/PCA/RandomForest fits, the parquet writes) would
# otherwise rerun VGG16 inference over all images.
features_df = (
    images.repartition(16)
    .select(col("path"), featurize_udf("content").alias("features"))
    .cache()
)

features = features_df.select('features').first()['features']
print(f"Number of extract features per image: {len(features)}")

features_df.show(5)

# Retrieve labels and encode them

In [0]:
# Derive each image's label from its path with the get_label UDF
# (parent folder name, cut at the first '_') instead of duplicating its logic.
features_df = features_df.withColumn('label', get_label(features_df['path']))

# Map each distinct label string to a numeric index for the Spark ML classifier.
indexer = StringIndexer(inputCol="label", outputCol="labelIndex")
features_df = indexer.fit(features_df).transform(features_df)

features_df.show(5)

# Dimension reduction

In [0]:
# Convert the array<float> features into Spark ML dense vectors, the input type
# StandardScaler and PCA work on.
def _to_dense_vector(values):
  return Vectors.dense(values)

array_to_vector = udf(_to_dense_vector, VectorUDT())
features_df = features_df.withColumn('vectors', array_to_vector('features'))

# Standardize: center every feature and scale it to unit standard deviation.
standardizer = StandardScaler(
    inputCol="vectors", outputCol="scaled_vectors", withStd=True, withMean=True
)
scaler_model = standardizer.fit(features_df)
features_df = scaler_model.transform(features_df)

# Project the standardized vectors onto the first N_COMPONENTS principal components.
pca = PCA(k=N_COMPONENTS, inputCol='scaled_vectors', outputCol='pca_vectors')
pca_model = pca.fit(features_df)
features_df = pca_model.transform(features_df)

# Save features (done in the "Saving data" section below)

# Classification

## Training

In [0]:
# Random forest with Spark ML defaults, trained on the PCA-reduced features.
# NOTE: this rebinds `model`, which earlier held the VGG16 Keras model.
# NOTE(review): training uses the whole dataset and the forest is later applied
# to the same rows; there is no train/test split.
rf = RandomForestClassifier(labelCol='labelIndex', featuresCol='pca_vectors')
model = rf.fit(features_df)

## Predict

In [0]:
# Score every image with the fitted forest and keep only its path and outputs.
predictions_df = (
    model.transform(features_df)
    .select(col('path'), col('probability'), col('prediction'))
)

predictions_df.show(5)

# Saving data

In [None]:
# Turn the PCA vectors into a plain array column, then keep only the columns to persist.
with_pca_array = features_df.withColumn('pca_features', vector_to_array('pca_vectors'))
features_df = with_pca_array.select(
    col('path'), col('label'), col('features'), col('pca_features')
)

features_df.show(5)

In [0]:
# Persist predictions, features and the fitted random forest under one base directory.
OUTPUT_DIR = "dbfs:/ml/tmp/fruits-360-original-size-features"

# Each DataFrame is written to the folder named after its content.
predictions_df.write.mode("overwrite").parquet(f"{OUTPUT_DIR}/predictions")
features_df.write.mode("overwrite").parquet(f"{OUTPUT_DIR}/features")
# A plain model.save() fails when the target already exists. overwrite() lets
# the notebook be rerun, consistently with the parquet writes above.
model.write().overwrite().save(f"{OUTPUT_DIR}/model")