# I) Initialisation of the environment

## 1) Libraries

In [1]:
###################################
########## Libraries ##############
###################################
import io
import os
from typing import Iterator

import numpy as np
import pandas as pd
from PIL import Image


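# The two lines below hide every CUDA device from TensorFlow (this machine has no GPU)
# and filter out TensorFlow's C++ INFO and WARNING messages, which prevents tons of warnings.
# They are set before tensorflow is imported so that they take effect.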
os.environ['CUDA_VISIBLE_DEVICES'] = '-1'
os.environ['TF_CPP_MIN_LOG_LEVEL']="2"

import tensorflow as tf
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras import Model
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split
from pyspark.sql import SparkSession

from pyspark.ml.functions import array_to_vector

from pyspark.ml import Pipeline
from pyspark.ml.feature import PCA, StandardScaler


## 2) Initialisation of the Spark session

In [2]:
##############################################
##### Initialisation of Spark session ########
##############################################

# Build (or reuse) the Spark session shared by the whole notebook.
# The legacy Parquet write format is enabled, presumably for compatibility
# with external Parquet readers.
spark = (
    SparkSession.builder
    .master('local')
    .appName('P8')
    .config("spark.sql.parquet.writeLegacyFormat", 'true')
    .getOrCreate()
)

# Keep a handle on the SparkContext (needed later for sc.broadcast)
# and reduce its log verbosity.
sc = spark.sparkContext
sc.setLogLevel("WARN")

# Last expression: display the session summary in the notebook.
spark


23/02/20 13:54:18 WARN Utils: Your hostname, clement-VirtualBox resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
23/02/20 13:54:18 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/02/20 13:54:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## 3) Definition of the data paths

In [3]:
###################################
##### PATH ########################
###################################


# Root folder holding both the input images and the outputs. It defaults to the
# original local layout but can be overridden with the P8_DATA_ROOT environment
# variable, so the notebook can run on another machine without editing code.
DATA_ROOT = os.environ.get("P8_DATA_ROOT", "/home/clement/Documents/P8_data")

# Input images (one sub-folder per fruit label).
PATH_Data = os.path.join(DATA_ROOT, "fruits", "fruits-360_dataset", "fruits-360", "Training")
# Output Parquet dataset with the computed features.
PATH_Result = os.path.join(DATA_ROOT, "test_local", "Results")
# Saved fitted StandardScaler + PCA pipeline.
PATH_Pipeline = os.path.join(DATA_ROOT, "test_local", "pipeline_trained")

print('\nPATH_Data:   ' + PATH_Data +
      '\nPATH_Result: ' + PATH_Result +
      '\nPATH_Pipeline:   ' + PATH_Pipeline)



PATH_Data:   /home/clement/Documents/P8_data/fruits/fruits-360_dataset/fruits-360/Training
PATH_Result: /home/clement/Documents/P8_data/test_local/Results
PATH_Pipeline:   /home/clement/Documents/P8_data/test_local/pipeline_trained


## 4) Initialisation of the model

In [4]:

###########################################
########## Initialisation of the model ####
###########################################
model = MobileNetV2(weights='imagenet',
                        include_top=True,
                        input_shape=(224, 224, 3))
for layer in model.layers:
    layer.trainable = False
new_model = Model(inputs=model.input,
                    outputs=model.layers[-2].output)
brodcast_weights = sc.broadcast(new_model.get_weights())
new_model.set_weights(brodcast_weights.value)


## 5) Functions to load and preprocess images and generate their features

In [5]:

###################################
########## Functions ##############
###################################


def preprocess(content):
    """
    Preprocesses raw image bytes for prediction.

    :param content: raw bytes of an encoded image file (here, the ``content``
        column produced by Spark's binaryFile reader).
    :return: an array of shape (224, 224, 3) passed through MobileNetV2's
        ``preprocess_input``.
    """
    # The context manager releases the decoder/buffer as soon as the pixels
    # have been copied into a new in-memory image.
    with Image.open(io.BytesIO(content)) as raw:
        # Force 3 channels: grayscale, palette or RGBA images would otherwise
        # give 1- or 4-channel arrays, which MobileNetV2 rejects and which make
        # np.stack fail on a mixed batch.
        img = raw.convert("RGB").resize((224, 224))
    arr = img_to_array(img)
    return preprocess_input(arr)


def featurize_series(model, content_series):
    """
    Featurize a pd.Series of raw images using the input model.

    :param model: a Keras model whose ``predict`` accepts a stacked batch of
        preprocessed images.
    :param content_series: pd.Series of raw image bytes.
    :return: a pd.Series of image features: one flattened 1-D array per input
        image, in input order (empty Series for an empty batch).
    """
    if len(content_series) == 0:
        # np.stack raises on an empty sequence; an empty batch has no features.
        return pd.Series([], dtype=object)

    batch = np.stack(content_series.map(preprocess))
    # verbose=0 silences Keras' progress output, otherwise emitted on every call.
    preds = model.predict(batch, verbose=0)
    # For some layers, output features will be multi-dimensional tensors.
    # We flatten the feature tensors to vectors for easier storage in Spark DataFrames.
    return pd.Series([p.flatten() for p in preds])


def model_fn():
    """
    Returns a MobileNetV2 model with top layer removed
    and broadcasted pretrained weights.

    The architecture is built with ``weights=None``. Loading the ImageNet
    weights here would be wasted work, because they are immediately replaced
    by the weights broadcast from the driver (``brodcast_weights``).
    """
    model = MobileNetV2(weights=None,
                        include_top=True,
                        input_shape=(224, 224, 3))
    # Mirror the driver-side construction exactly (including freezing), so
    # that set_weights receives the weight list in the same layout that
    # get_weights produced on the driver.
    for layer in model.layers:
        layer.trainable = False
    # Expose the penultimate layer's output as the feature vector.
    new_model = Model(inputs=model.input,
                      outputs=model.layers[-2].output)
    new_model.set_weights(brodcast_weights.value)
    return new_model


@pandas_udf('array<float>')
def featurize_udf(content_series_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    '''
    This method is a Scalar Iterator pandas UDF wrapping our featurization function.
    The decorator specifies that this returns a Spark DataFrame column of type ArrayType(FloatType).
    The iterator-of-Series variant is selected through the type hints, which
    replaces the deprecated ``PandasUDFType.SCALAR_ITER`` flag.

    :param content_series_iter: This argument is an iterator over batches of data, where each batch
                              is a pandas Series of image data.
    :return: an iterator yielding one pandas Series of feature arrays per input batch.
    '''
    # With Scalar Iterator pandas UDFs, we can load the model once and then re-use it
    # for multiple data batches.  This amortizes the overhead of loading big models.
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)




# II) Feature extraction and PCA

## 1) Loading images, preprocessing and feature extraction

In [6]:
###################################
##### Loading images ##############
###################################
# Read every .jpg under PATH_Data (recursively) as raw bytes: one row per file,
# with path, modificationTime, length and content columns.
images = (spark.read.format("binaryFile")
          .option("pathGlobFilter", "*.jpg")
          .option("recursiveFileLookup", "true")
          .load(PATH_Data))

# The label is the name of the parent folder,
# e.g. .../Training/Raspberry/176_100.jpg -> "Raspberry".
images = images.withColumn('label', element_at(split(images['path'], '/'), -2))

# printSchema() and show() print by themselves and return None; wrapping them
# in print() only added a stray "None" line to the output.
images.printSchema()
images.select('path', 'label').show(5, truncate=False)

# Featurize over 20 partitions, then convert the array<float> column to an ML
# vector, which StandardScaler and PCA require.
# The result is cached because the pipeline fit reads it more than once
# (scaling, then PCA); without the cache the CNN features would be recomputed.
features_df = (images
               .repartition(20)
               .select(col("path"),
                       col("label"),
                       featurize_udf("content").alias("features"))
               .withColumn("features", array_to_vector("features"))
               .cache())

features_df.printSchema()



                                                                                

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)

None
+-------------------------------------------------------------------------------------------------------------+--------------+
|path                                                                                                         |label         |
+-------------------------------------------------------------------------------------------------------------+--------------+
|file:/home/clement/Documents/P8_data/fruits/fruits-360_dataset/fruits-360/Training/Raspberry/176_100.jpg     |Raspberry     |
|file:/home/clement/Documents/P8_data/fruits/fruits-360_dataset/fruits-360/Training/Raspberry/179_100.jpg     |Raspberry     |
|file:/home/clement/Documents/P8_data/fruits/fruits-360_dataset/fruits-360/Training/Pineapple Mini/170_100.jpg|Pineapple Mini|
|file:/home/clement/Document

## 2) PCA: creating and fitting a pipeline

In [10]:
###################################
##### Standard Scaling + PCA ######
###################################

# Initialisation of the two stages of the pipeline
# Number of principal components kept by the PCA stage.
N_COMPONENTS = 200

# Initialisation of the two stages of the pipeline:
# 1) scale each feature to unit standard deviation (StandardScaler defaults),
# 2) project the scaled vectors onto the first N_COMPONENTS principal components.
Scalerizer = StandardScaler(inputCol="features", outputCol="Scaled_features")
PCA_model = PCA(k=N_COMPONENTS, inputCol="Scaled_features", outputCol="pca_features")

# Creation of the pipeline
pipeline = Pipeline(stages=[Scalerizer, PCA_model])

# Fitting of the pipeline (reads the cached features_df)
pipeline_model = pipeline.fit(features_df)

# Total explained variance of the retained components, read from the fitted
# PCA stage; the message uses N_COMPONENTS so it cannot drift from k.
explained_variance = pipeline_model.stages[-1].explainedVariance.sum()
print(f"Variance explained by the {N_COMPONENTS} first components: ", explained_variance)



23/02/17 11:12:09 WARN MemoryStore: Not enough space to cache rdd_31_12 in memory! (computed 34.2 MiB so far)
23/02/17 11:12:09 WARN BlockManager: Persisting block rdd_31_12 to disk instead.
23/02/17 11:12:09 WARN MemoryStore: Not enough space to cache rdd_31_12 in memory! (computed 34.2 MiB so far)




23/02/17 11:13:08 WARN MemoryStore: Not enough space to cache rdd_31_13 in memory! (computed 34.0 MiB so far)
23/02/17 11:13:08 WARN BlockManager: Persisting block rdd_31_13 to disk instead.
23/02/17 11:13:08 WARN MemoryStore: Not enough space to cache rdd_31_13 in memory! (computed 34.0 MiB so far)




23/02/17 11:14:06 WARN MemoryStore: Not enough space to cache rdd_31_14 in memory! (computed 34.0 MiB so far)
23/02/17 11:14:06 WARN BlockManager: Persisting block rdd_31_14 to disk instead.
23/02/17 11:14:07 WARN MemoryStore: Not enough space to cache rdd_31_14 in memory! (computed 34.0 MiB so far)




23/02/17 11:15:11 WARN MemoryStore: Not enough space to cache rdd_31_15 in memory! (computed 33.7 MiB so far)
23/02/17 11:15:11 WARN BlockManager: Persisting block rdd_31_15 to disk instead.
23/02/17 11:15:12 WARN MemoryStore: Not enough space to cache rdd_31_15 in memory! (computed 33.7 MiB so far)




23/02/17 11:16:16 WARN MemoryStore: Not enough space to cache rdd_31_16 in memory! (computed 33.6 MiB so far)
23/02/17 11:16:16 WARN BlockManager: Persisting block rdd_31_16 to disk instead.
23/02/17 11:16:17 WARN MemoryStore: Not enough space to cache rdd_31_16 in memory! (computed 33.6 MiB so far)




23/02/17 11:17:21 WARN MemoryStore: Not enough space to cache rdd_31_17 in memory! (computed 33.6 MiB so far)
23/02/17 11:17:21 WARN BlockManager: Persisting block rdd_31_17 to disk instead.
23/02/17 11:17:22 WARN MemoryStore: Not enough space to cache rdd_31_17 in memory! (computed 33.6 MiB so far)




23/02/17 11:18:23 WARN MemoryStore: Not enough space to cache rdd_31_18 in memory! (computed 33.8 MiB so far)
23/02/17 11:18:23 WARN BlockManager: Persisting block rdd_31_18 to disk instead.
23/02/17 11:18:23 WARN MemoryStore: Not enough space to cache rdd_31_18 in memory! (computed 33.8 MiB so far)




23/02/17 11:19:27 WARN MemoryStore: Not enough space to cache rdd_31_19 in memory! (computed 33.8 MiB so far)
23/02/17 11:19:27 WARN BlockManager: Persisting block rdd_31_19 to disk instead.
23/02/17 11:19:27 WARN MemoryStore: Not enough space to cache rdd_31_19 in memory! (computed 33.8 MiB so far)




23/02/17 11:19:37 WARN MemoryStore: Not enough space to cache rdd_31_12 in memory! (computed 34.2 MiB so far)
23/02/17 11:19:37 WARN MemoryStore: Not enough space to cache rdd_31_13 in memory! (computed 34.0 MiB so far)




23/02/17 11:19:38 WARN MemoryStore: Not enough space to cache rdd_31_14 in memory! (computed 34.0 MiB so far)




23/02/17 11:19:38 WARN MemoryStore: Not enough space to cache rdd_31_15 in memory! (computed 33.7 MiB so far)




23/02/17 11:19:38 WARN MemoryStore: Not enough space to cache rdd_31_16 in memory! (computed 33.6 MiB so far)




23/02/17 11:19:39 WARN MemoryStore: Not enough space to cache rdd_31_17 in memory! (computed 33.6 MiB so far)




23/02/17 11:19:39 WARN MemoryStore: Not enough space to cache rdd_31_18 in memory! (computed 33.8 MiB so far)




23/02/17 11:19:39 WARN MemoryStore: Not enough space to cache rdd_31_19 in memory! (computed 33.8 MiB so far)




23/02/17 11:20:16 WARN MemoryStore: Not enough space to cache rdd_31_12 in memory! (computed 34.2 MiB so far)




23/02/17 11:20:20 WARN MemoryStore: Not enough space to cache rdd_31_13 in memory! (computed 34.0 MiB so far)
23/02/17 11:20:23 WARN MemoryStore: Not enough space to cache rdd_31_14 in memory! (computed 34.0 MiB so far)




23/02/17 11:20:26 WARN MemoryStore: Not enough space to cache rdd_31_15 in memory! (computed 33.7 MiB so far)




23/02/17 11:20:29 WARN MemoryStore: Not enough space to cache rdd_31_16 in memory! (computed 33.6 MiB so far)




23/02/17 11:20:33 WARN MemoryStore: Not enough space to cache rdd_31_17 in memory! (computed 33.6 MiB so far)




23/02/17 11:20:36 WARN MemoryStore: Not enough space to cache rdd_31_18 in memory! (computed 33.8 MiB so far)
23/02/17 11:20:39 WARN MemoryStore: Not enough space to cache rdd_31_19 in memory! (computed 33.8 MiB so far)


                                                                                

23/02/17 11:20:43 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
23/02/17 11:20:43 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK
0.7980893660716227


In [11]:
####################################################
##### Applying of the pipeline : transform data ####
####################################################

# Append the "Scaled_features" and "pca_features" columns produced by the
# fitted pipeline. The guard keeps the cell idempotent: re-running it on an
# already-transformed DataFrame would fail, because the pipeline's output
# columns would already exist.
if "pca_features" not in features_df.columns:
    features_df = pipeline_model.transform(features_df)

# Show only the compact columns; the raw 'features' vectors are far too wide.
features_df.select("path", "label", "pca_features").show(5)


## 3) Exporting the data and the pipeline

In [None]:
###################################
##### Export output ###############
###################################

# Export the dataframe (parquet format)
# Persist the transformed DataFrame as Parquet, replacing any previous run.
(features_df
 .write
 .mode("overwrite")
 .parquet(PATH_Result))

# Persist the fitted pipeline (scaling + PCA), replacing any previous copy,
# so it can be reloaded later (e.g. with PipelineModel.load).
pipeline_writer = pipeline_model.write()
pipeline_writer.overwrite()
pipeline_writer.save(PATH_Pipeline)