This notebook's steps:
    
   * Create a Spark session
    
   * Import the images to process (test set), as binary files, into a Spark DataFrame
    
   * Label images by fruit name, extracted from the image path 
    
   * Enhance images by tweaking color, sharpness, contrast and brightness
    
   * Extract a 2048-feature array by transfer learning, using the Keras ResNet50 CNN
    
   * Apply the PCA dimension reduction previously fitted on the training set
    
   * Store the path, label and PCA-reduced feature array, partitioned by label, as a Parquet file on S3 

In [1]:
# Sanity check that the notebook kernel is up and running
print("Welcome to my EMR Notebook!")

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
10,application_1631354174370_0011,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Welcome to my EMR Notebook!

# Install dependencies

In [2]:
# Pin every notebook-scoped dependency so re-runs get the same environment
# (pillow is pinned to the version previously resolved on this cluster).
sc.install_pypi_package('pandas==1.2.5')
sc.install_pypi_package('pillow==8.3.2')
sc.install_pypi_package('pyarrow==2')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Collecting pandas==1.2.5
  Using cached https://files.pythonhosted.org/packages/e6/0a/90da8840e044c329a0271fb0244ff40a68a2615bc360c296a3dc5e326ab6/pandas-1.2.5-cp37-cp37m-manylinux_2_5_x86_64.manylinux1_x86_64.whl
Collecting python-dateutil>=2.7.3 (from pandas==1.2.5)
  Using cached https://files.pythonhosted.org/packages/36/7a/87837f39d0296e723bb9b62bbb257d0355c7f6128853c78955f57342a56d/python_dateutil-2.8.2-py2.py3-none-any.whl
Installing collected packages: python-dateutil, pandas
Successfully installed pandas-1.2.5 python-dateutil-2.8.2

Collecting pillow
  Using cached https://files.pythonhosted.org/packages/d6/28/827b9cac687e086110eb133ab7e4f36ab4b35a1e1654c6329840ce045354/Pillow-8.3.2-cp37-cp37m-manylinux_2_5_x86_64.manylinux1_x86_64.whl
Installing collected packages: pillow
Successfully installed pillow-8.3.2

Collecting pyarrow==2
  Using cached https://files.pythonhosted.org/packages/c8/58/d07e7ee8b0cffe509f9e5a3742e09636a4a58b2113d193166615b934846f/pyarrow-2.0.0-cp37-cp37m-m

# Imports

In [3]:
import pandas as pd

from PIL import Image

from PIL import ImageEnhance

import numpy as np

import io

import os

from pyspark.sql import SparkSession

from pyspark.sql.functions import col, pandas_udf, udf, PandasUDFType, size

from pyspark import SparkContext, SparkConf

import tensorflow as tf

from tensorflow.keras.models import Model

from tensorflow.keras.applications.resnet50 import ResNet50, preprocess_input

from tensorflow.keras.preprocessing.image import img_to_array

from pyspark.ml import PipelineModel

from pyspark.ml.functions import vector_to_array

from pyspark.ml.linalg import Vectors, VectorUDT

from pyspark.ml.feature import StandardScaler, PCA

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
# Constants
# Input: folder of images to featurize (read recursively below)
LOAD_PATH = 's3a://fruits-images-to-proceed/Test/'
# Fitted reduction pipeline (loaded as a PipelineModel)
MODEL_PATH = 's3a://pca-reduction-model/PCA reduction.model'
# Output: Parquet dataset of path / label / reduced features
SAVE_PATH = 's3a://fruits-images-proceded/Test_featured-reducted.parquet'

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Enable pyArrow

In [5]:
# Use Apache Arrow for columnar data transfers between the JVM and Python
spark.conf.set(key='spark.sql.execution.arrow.pyspark.enabled', value='true')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Load images from storage

In [6]:
# Read every .jpg below LOAD_PATH (sub-folders included) as raw bytes
images = spark.read.format('binaryFile').options(
    pathGlobFilter='*.jpg',
    recursiveFileLookup='true',
).load(LOAD_PATH)

images.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)

In [7]:
# Total number of images found under LOAD_PATH.
# The message names the actual source folder instead of a hard-coded
# "train set" (this notebook processes whatever LOAD_PATH points to).
total_number = images.count()
print('Total number of images in {} : {}'.format(LOAD_PATH, total_number))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Total number of images in train set 22688

## Retrieve labels from image path

In [8]:
# Offset of the first character after LOAD_PATH in each image path, i.e. the
# start of the "<label>/<file name>" part. Trailing slashes are normalised so
# the offset is the same whether or not LOAD_PATH ends with '/'.
path_offset = len(LOAD_PATH.rstrip('/')) + 1

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [9]:
# Get only the fruit name from the image path
from pyspark.sql.functions import udf
from pyspark.sql import types


def extract_label(s):
    '''
    Return the label (fruit name) encoded in an image path.

    The label is the part of the path between the first ``path_offset``
    characters (the LOAD_PATH prefix) and the last '/'. For example, with
    LOAD_PATH = 's3a://fruits-images-to-proceed/Test/',
    's3a://fruits-images-to-proceed/Test/Watermelon/img.jpg' -> 'Watermelon'.

    :param s: full image path as read by the binaryFile source, or None.
    :return: the label string, or None when the path is null or the file sits
             directly under LOAD_PATH (no label directory).
    '''
    if s is None:
        # Spark passes SQL NULLs as None; propagate instead of failing the task.
        return None
    last = s[path_offset:]
    slash = last.rfind('/')
    if slash < 0:
        # No label directory: rfind returns -1 and last[:-1] would silently
        # return a truncated file name as the label.
        return None
    return last[:slash]


# Registered after the function is defined, so no lambda indirection is needed.
col_label = udf(extract_label, types.StringType())

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [10]:
# Add the fruit-name label derived from each image path
images = images.withColumn('label', col_label(col('path')))
images.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)

In [11]:
# Preview the extracted labels
images.select(col('label')).show(n=20, truncate=False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+
|label     |
+----------+
|Watermelon|
|Watermelon|
|Watermelon|
|Watermelon|
|Watermelon|
|Watermelon|
|Watermelon|
|Watermelon|
|Watermelon|
|Watermelon|
|Watermelon|
|Watermelon|
|Watermelon|
|Watermelon|
|Watermelon|
|Watermelon|
|Watermelon|
|Watermelon|
|Watermelon|
|Watermelon|
+----------+
only showing top 20 rows

In [12]:
# Number of images per label
print('By label images count :')
images.groupBy(col('label')).count().show(n=20)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

By label images count :
+-----------------+-----+
|            label|count|
+-----------------+-----+
|     Pear Forelle|  234|
|     Cantaloupe 1|  164|
|           Orange|  160|
|       Clementine|  166|
|              Fig|  234|
|      Onion White|  146|
|    Pepper Orange|  234|
| Strawberry Wedge|  246|
|      Cauliflower|  234|
|         Beetroot|  150|
|     Potato White|  150|
|  Grapefruit Pink|  166|
|Tomato Cherry Red|  164|
|    Grape White 4|  158|
|     Cantaloupe 2|  164|
|        Mango Red|  142|
|       Nut Forest|  218|
|    Passion Fruit|  166|
|       Grape Blue|  328|
| Onion Red Peeled|  155|
+-----------------+-----+
only showing top 20 rows

# Image enhancement

In [13]:
# Enhance image
def enhance(img,
            color = 1.25,
            sharpness = 4.5,
            contrast = 1.25,
            brigthness= 1.5):
    '''
    Successively adjust color, sharpness, contrast and brightness of an image.

    Each step is applied to the output of the previous one.

    :param img: PIL image to enhance.
    :param color: color-balance factor (1.0 keeps the image unchanged).
    :param sharpness: sharpness factor (1.0 keeps the image unchanged).
    :param contrast: contrast factor (1.0 keeps the image unchanged).
    :param brigthness: brightness factor (1.0 keeps the image unchanged);
                       the misspelled keyword is kept for backward compatibility.
    :return: a new, enhanced PIL image.

    NOTE(review): the reduction pipeline loaded from MODEL_PATH was fitted on
    training features; make sure those were computed with this same
    enhancement (or refit it), otherwise train and test features differ.
    '''
    # ImageEnhance.<X>(img).enhance(f) returns a NEW image, so every result
    # must be reassigned for the enhancements to accumulate.
    steps = ((ImageEnhance.Color, color),
             (ImageEnhance.Sharpness, sharpness),
             (ImageEnhance.Contrast, contrast),
             (ImageEnhance.Brightness, brigthness))
    for enhancer_cls, factor in steps:
        img = enhancer_cls(img).enhance(factor)
    return img

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Transfer learning (ResNet50)

In [14]:
def model_fn():
    '''
    Build the feature extractor used for transfer learning.

    A full ResNet50 is instantiated (with Keras' default pretrained weights)
    and re-wrapped so the network stops at its second-to-last layer, i.e. the
    final classification layer is dropped.

    :return: a Keras Model mapping the ResNet50 inputs to the output of its
             penultimate layer.
    '''
    base = ResNet50()
    penultimate = base.layers[-2]
    return Model(inputs=base.inputs, outputs=penultimate.output)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [15]:
def preprocess(content):
    '''
    Preprocess raw image bytes for ResNet50 prediction.

    Steps: decode, convert to RGB, resize to 224x224, enhance, convert to an
    array, then apply ResNet50's own input preprocessing.

    :param content: raw encoded image bytes (the binaryFile ``content`` column).
    :return: float array of shape (224, 224, 3) with Keras' default
             channels_last data format.
    '''
    # Force 3 RGB channels: grayscale, palette, RGBA or CMYK inputs would
    # otherwise produce 1- or 4-channel arrays that ResNet50 cannot consume
    # and that np.stack cannot batch together with RGB images.
    img = Image.open(io.BytesIO(content)).convert('RGB').resize((224, 224))
    img = enhance(img)
    arr = img_to_array(img)
    return preprocess_input(arr)


def featurize_series(model, content_series):
    '''
    Featurize a pd.Series of raw images using the input model.

    :param model: Keras model applied to the preprocessed batch (in this
                  notebook, the truncated ResNet50 built by model_fn).
    :param content_series: pd.Series of raw encoded image bytes.
    :return: pd.Series holding one flattened feature vector per image, in the
             same order as ``content_series``.
    '''
    if content_series.empty:
        # np.stack raises on an empty sequence; an empty batch maps to an
        # empty result.
        return pd.Series([], dtype=object)
    # Preprocess every image, then stack them into one float32 batch tensor.
    batch = tf.convert_to_tensor(np.stack(content_series.map(preprocess)),
                                 dtype=tf.float32)
    preds = model.predict(batch)
    # For some layers, output features are multi-dimensional tensors; flatten
    # them to vectors for easier storage in Spark DataFrames.
    return pd.Series([p.flatten() for p in preds])


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [16]:
from typing import Iterator

@pandas_udf('array<float>')
def featurize_udf(content_series_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    '''
    Scalar Iterator pandas UDF turning raw image bytes into feature arrays.

    The decorator declares the Spark return type ArrayType(FloatType).

    :param content_series_iter: iterator over batches of data, each batch being
                                a pandas Series of raw image bytes.
    :return: iterator yielding one pandas Series of features per input batch.
    '''
    # The model is built once per iterator rather than once per batch, which
    # amortizes the cost of loading a large network over all batches.
    extractor = model_fn()
    for batch in content_series_iter:
        yield featurize_series(extractor, batch)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Apply featurization to the DataFrame of images

In [17]:
# Cap each Arrow batch at 512 records so fewer images are decoded and fed to
# the model at once, helping avoid Out Of Memory (OOM) errors on the workers.
spark.conf.set(key='spark.sql.execution.arrow.maxRecordsPerBatch', value='512')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [18]:
# Transfer learning: one ResNet50 feature array per image
images = images.withColumn('features', featurize_udf(col('content')))
images.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)
 |-- features: array (nullable = true)
 |    |-- element: float (containsNull = true)

# Standardization and PCA reduction

In [19]:
# The reduction pipeline works on ML Vector columns (presumably reading
# 'Vect_features'), so convert each float array into a DenseVector.
list_to_vector_udf = udf(lambda values: Vectors.dense(list(map(float, values))),
                         VectorUDT())
images = images.withColumn('Vect_features', list_to_vector_udf(col('features')))
images.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)
 |-- features: array (nullable = true)
 |    |-- element: float (containsNull = true)
 |-- Vect_features: vector (nullable = true)

In [20]:
# Load the reduction PipelineModel previously fitted on the training set
reduction = PipelineModel.read().load(MODEL_PATH)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [21]:
# Apply the fitted reduction pipeline (adds its output columns to images)
images = reduction.transform(dataset=images)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Vector to array

In [22]:
# Turn the PCA DenseVector into a plain array<double> column for storage
images = images.withColumn('feat_array', vector_to_array(col('PCA_features')))
images.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)
 |-- features: array (nullable = true)
 |    |-- element: float (containsNull = true)
 |-- Vect_features: vector (nullable = true)
 |-- Scaled_features: vector (nullable = true)
 |-- PCA_features: vector (nullable = true)
 |-- feat_array: array (nullable = false)
 |    |-- element: double (containsNull = false)

# Save to storage

In [23]:
# Keep path, label and reduced features; one Parquet sub-directory per label
(images
    .select('path', 'label', 'feat_array')
    .write
    .parquet(SAVE_PATH, mode='overwrite', partitionBy='label'))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# End Spark session

In [24]:
# Stop the Spark session (and its SparkContext), ending this Spark application
spark.stop()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…