In [14]:
from pyspark.sql import SparkSession
import os 
# from pyspark.context import SparkContext
# # Get a reference to the Spark Session
# sc = SparkContext()
## spark = SparkSession(sc)

# multiprocessing
#tuple(multiprocessing.Pool(4).imap(print, (1, 2, 3)))

spark = SparkSession.builder\
        .master("local[*]")\
        .appName('PySpark_Tutorial')\
        .config("spark.driver.memory", "16g")\
        .getOrCreate()

In [15]:
import pandas as pd
from PIL import Image
import numpy as np
import io

from keras.preprocessing import image
from keras.applications.vgg16 import VGG16
from keras.applications.vgg16 import preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array

from pyspark.sql.functions import col, pandas_udf, PandasUDFType



In [16]:
# https://docs.databricks.com/_static/notebooks/deep-learning/deep-learning-transfer-learning-keras.html

Calculer les features en utilisant udf
Charger la dataframe

In [17]:
path = "Fruit-Images/"
images = spark.read.format("binaryFile") \
  .option("pathGlobFilter", "*.jpg") \
  .option("recursiveFileLookup", "true") \
  .load(path)

display(images.limit(5))

DataFrame[path: string, modificationTime: timestamp, length: bigint, content: binary]

In [18]:
images.printSchema()

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)



Préparation du modèle : 
Chargement du modèle sans la dernière couche

In [19]:
model = VGG16(weights=None, include_top=False)
model.summary()

Model: "vgg16"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_1 (InputLayer)         [(None, None, None, 3)]   0         
_________________________________________________________________
block1_conv1 (Conv2D)        (None, None, None, 64)    1792      
_________________________________________________________________
block1_conv2 (Conv2D)        (None, None, None, 64)    36928     
_________________________________________________________________
block1_pool (MaxPooling2D)   (None, None, None, 64)    0         
_________________________________________________________________
block2_conv1 (Conv2D)        (None, None, None, 128)   73856     
_________________________________________________________________
block2_conv2 (Conv2D)        (None, None, None, 128)   147584    
_________________________________________________________________
block2_pool (MaxPooling2D)   (None, None, None, 128)   0     

In [20]:
sc = spark.sparkContext
bc_model_weights = sc.broadcast(model.get_weights())

def model_fn():
    """
    Returns a VGG16 model with top layer removed and broadcasted pretrained weights.
    """
    model = VGG16(weights=None, include_top=False)
    model.set_weights(bc_model_weights.value)
    return model

In [21]:
def preprocess(content):
    """
    Preprocesses raw image bytes for prediction.
    """
    img = Image.open(io.BytesIO(content)).resize([224, 224])
    arr = img_to_array(img)
    return preprocess_input(arr)

def featurize_series(model, content_series):
    """
    Featurize a pd.Series of raw images using the input model.
    :return: a pd.Series of image features
    """
    input = np.stack(content_series.map(preprocess))
    preds = model.predict(input)
    # For some layers, output features will be multi-dimensional tensors.
    # We flatten the feature tensors to vectors for easier storage in Spark DataFrames.
    output = [p.flatten() for p in preds]
    return pd.Series(output)

In [22]:
@pandas_udf('array<Double>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    '''
    This method is a Scalar Iterator pandas UDF wrapping our featurization function.
    The decorator specifies that this returns a Spark DataFrame column of type ArrayType(FloatType).
  
    :param content_series_iter: This argument is an iterator over batches of data, where each batch
                              is a pandas Series of image data.
    '''
    # With Scalar Iterator pandas UDFs, we can load the model once and then re-use it
    # for multiple data batches.  This amortizes the overhead of loading big models.
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)



In [23]:
features_df = images.repartition(16).select(col("path"), featurize_udf("content").alias("features"))

In [24]:
features_df.printSchema()

root
 |-- path: string (nullable = true)
 |-- features: array (nullable = true)
 |    |-- element: double (containsNull = true)



In [25]:

features_df.show()

+--------------------+--------------------+
|                path|            features|
+--------------------+--------------------+
|file:/home/jo/Doc...|[0.12922628223896...|
|file:/home/jo/Doc...|[0.12837351858615...|
|file:/home/jo/Doc...|[0.11402831971645...|
|file:/home/jo/Doc...|[0.10962662845849...|
|file:/home/jo/Doc...|[0.13984775543212...|
|file:/home/jo/Doc...|[0.10361862182617...|
|file:/home/jo/Doc...|[0.10676220059394...|
|file:/home/jo/Doc...|[0.15082231163978...|
|file:/home/jo/Doc...|[0.12748508155345...|
|file:/home/jo/Doc...|[0.11399635672569...|
|file:/home/jo/Doc...|[0.13498100638389...|
+--------------------+--------------------+



In [26]:
# get the number of paritions
print(features_df.rdd.getNumPartitions())

16


In [27]:
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split, udf
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.feature import PCA

In [28]:
to_vector_udf = udf(lambda a: Vectors.dense(a), VectorUDT())

In [29]:
# data = features_df.select(to_vector_udf("features").alias("features_vec"))
data = features_df.select("features")

df = data.select(udf(Vectors.dense,VectorUDT())(*data.columns)).toDF("features")


In [30]:
from pyspark.ml.feature import VectorAssembler, StandardScaler, PCA


In [31]:
scaler = StandardScaler(
    inputCol = "features",
    outputCol = "scaledFeatures",
    withMean = True,
    withStd = True).fit(df)
df_scaled = scaler.transform(df)
df_scaled.show()

+--------------------+--------------------+
|            features|      scaledFeatures|
+--------------------+--------------------+
|[0.13498100638389...|[0.76108909324869...|
|[0.12748508155345...|[0.26311856886238...|
|[0.15082231163978...|[1.81346131731481...|
|[0.12922628223896...|[0.37879030224211...|
|[0.12837351858615...|[0.32213936550394...|
|[0.10962662845849...|[-0.9232571557601...|
|[0.10361862182617...|[-1.3223820510724...|
|[0.10676220059394...|[-1.1135473038130...|
|[0.11402831971645...|[-0.6308432704167...|
|[0.11399635672569...|[-0.6329666411322...|
|[0.13984775543212...|[1.08439777502265...|
+--------------------+--------------------+



In [32]:
pca = PCA(k = 20,
         inputCol = 'scaledFeatures',
         outputCol = 'pcaFeatures').fit(df_scaled)
print("Explained Variance ratio", pca.explainedVariance.toArray())
df_pca.show()

----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 40492)
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/home/jo/anaconda3/lib/python3.8/site-packages/py4j/java_gateway.py", line 1207, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/jo/anaconda3/lib/python3.8/site-packages/py4j/java_gateway.py", line 1033, in send_command
    response = connection.send_command(command)
  File "/home/jo/anaconda3/lib/python3.8/site-packages/py4j/java_gateway.py", line 1211, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while receiving
Traceback (most recent call last):
  File "/home/jo/anaconda3/lib/python3.8/socketserver.py", line 316, in _handle_request_noblock
 

Py4JError: An error occurred while calling o151.fit