# Import modules

In [1]:
seed = 42 # random state set for reproducibility

# general modules
import numpy as np
import pandas as pd
import os
import PIL

# spark modules
# java 1.8.0_301
# spark 3.1.2
# hadoop 3.2.0
import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf

# PCA ?
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
from pyspark.ml.feature import StringIndexer, StandardScaler
from pyspark.ml.feature import PCA

# modules for featurization with transfer learning
import io
from typing import Iterator

from PIL import Image
import tensorflow as tf
from tensorflow.keras.applications.resnet50 import ResNet50, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from pyspark.sql.functions import col, element_at, pandas_udf, PandasUDFType, split


import boto3
import s3fs

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
3,application_1634031888152_0004,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Start and set up session

In [2]:
# Reuse the session Livy already started ("SparkSession available as 'spark'"),
# or create one when running elsewhere. The master is inferred by the cluster.
spark = SparkSession.builder.appName('p8').getOrCreate()
sc = spark.sparkContext

# S3A settings are written to the live Hadoop configuration (as the endpoint
# already was) so they take effect even when getOrCreate() returned a session
# that was started before this cell ran.
hadoop_conf = sc._jsc.hadoopConfiguration()
hadoop_conf.set('fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem')

# Never hard-code AWS credentials in a notebook. Explicit keys are only set when
# both are present in the environment; otherwise S3A uses its configured
# credential provider chain (on EMR, typically the cluster's instance role).
aws_access_key = os.environ.get('AWS_ACCESS_KEY_ID')
aws_secret_key = os.environ.get('AWS_SECRET_ACCESS_KEY')
if aws_access_key and aws_secret_key:
    hadoop_conf.set('fs.s3a.access.key', aws_access_key)
    hadoop_conf.set('fs.s3a.secret.key', aws_secret_key)

# Enable AWS Signature V4 signing for S3 requests.
# NOTE(review): PySpark documents setSystemProperty as something to call before
# the SparkContext is created; here it only updates the driver JVM's system
# properties — confirm executors do not need it as well.
sc.setSystemProperty('com.amazonaws.services.s3.enableV4', 'true')
hadoop_conf.set("fs.s3a.endpoint", "s3.us-east-2.amazonaws.com")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
# boto3 S3 client bound to the bucket's region.
# Connection check (optional) — list the input prefix:
#   listing = client.list_objects(Bucket='s3-p8', Prefix='input_data_light/')
#   for obj in listing["Contents"]:
#       print(obj["Key"])
client = boto3.client(service_name='s3', region_name='us-east-2')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
# Smoke test: distribute the integers 1..4 and count them on the cluster (expect 4).
nums = sc.parallelize(list(range(1, 5)))
nums.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

4

In [5]:
# Print the Spark configuration for checking. Values whose key looks like a
# credential are masked so secrets never end up in the saved notebook output.
# A single setting can also be read with spark.conf.get("<key>").
SENSITIVE_CONF_MARKERS = ('secret', 'password', 'token', 'access.key')


def _masked_conf(key, value):
    """Return (key, value), replacing the value with '*****' when the key looks like a credential."""
    if any(marker in key.lower() for marker in SENSITIVE_CONF_MARKERS):
        return key, '*****'
    return key, value


# `configurations` keeps the raw (unmasked) pairs; only the printed view is masked.
configurations = spark.sparkContext.getConf().getAll()
for conf in configurations:
    print(_masked_conf(*conf))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

('spark.driver.host', 'ip-172-31-9-47.us-east-2.compute.internal')
('spark.eventLog.enabled', 'true')
('spark.jars', 'file:/usr/lib/livy/rsc-jars/livy-api-0.7.1-incubating.jar,file:/usr/lib/livy/rsc-jars/livy-rsc-0.7.1-incubating.jar,file:/usr/lib/livy/rsc-jars/livy-thriftserver-session-0.7.1-incubating.jar,file:/usr/lib/livy/rsc-jars/netty-all-4.1.17.Final.jar,file:/usr/lib/livy/repl_2.12-jars/commons-codec-1.9.jar,file:/usr/lib/livy/repl_2.12-jars/livy-core_2.12-0.7.1-incubating.jar,file:/usr/lib/livy/repl_2.12-jars/livy-repl_2.12-0.7.1-incubating.jar')
('spark.driver.appUIAddress', 'http://ip-172-31-9-47.us-east-2.compute.internal:4040')
('spark.sql.parquet.output.committer.class', 'com.amazon.emr.committer.EmrOptimizedSparkSqlParquetOutputCommitter')
('spark.blacklist.decommissioning.timeout', '1h')
('spark.yarn.appMasterEnv.SPARK_PUBLIC_DNS', '$(hostname -f)')
('spark.ui.proxyBase', '/proxy/application_1634031888152_0004')
('spark.sql.emr.internal.extensions', 'com.amazonaws.emr.s

# Load images

In [7]:
# Read every .jpg under the input prefix (sub-folders included) with the
# binaryFile source: one row per file, with its `path` and raw bytes in `content`.
s3_url = "s3a://s3-p8/input_data_light/*"
image_reader = (
    spark.read.format("binaryFile")
    .option("recursiveFileLookup", "true")
    .option("pathGlobFilter", "*.jpg")
)
images = image_reader.load(s3_url)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
# Extract the fruit name from the path: the label is the folder that directly
# contains the image, i.e. the second-to-last '/'-separated segment.
# A fixed index depends on the depth of the storage prefix (index 8 looks like a
# leftover from a local C:/Users/... run); for
# s3a://s3-p8/input_data_light/<fruit>/<file>.jpg it would return null.
# NOTE(review): assumes each image sits directly in a folder named after its
# fruit — confirm against the bucket layout.
images = (
    images
    .withColumn('label', element_at(split(col('path'), '/'), -2))
    .select('path', 'content', 'label')
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Featurization and transfer learning with TensorFlow

The featurization code below is adapted from the Databricks transfer-learning featurization guide: https://docs.databricks.com/applications/machine-learning/preprocess-data/transfer-learning-tensorflow.html

In [9]:
# ResNet50 backbone with ImageNet weights and no classification head
# (include_top=False). Run model.summary() to verify the top layers are gone.
model = ResNet50(weights='imagenet', include_top=False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [10]:
# Send the driver's pretrained weights to the executors as a broadcast variable.
bc_model_weights = sc.broadcast(model.get_weights())


def model_fn():
    """
    Build a headless ResNet50 (include_top=False) and load the broadcast weights into it.

    The architecture is created with weights=None so no weight download is
    triggered where this runs; the values come from `bc_model_weights` instead.
    """
    resnet = ResNet50(include_top=False, weights=None)
    resnet.set_weights(bc_model_weights.value)
    return resnet

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
def preprocess(content, size=(224, 224)):
  """
  Preprocesses raw image bytes for prediction.

  :param content: encoded image bytes (one value of the binaryFile `content` column).
  :param size: (width, height) the image is resized to; defaults to ResNet50's 224x224.
  :return: float array of the image with 3 channels (channels-last under the
           Keras default), passed through ResNet50's `preprocess_input`.
  """
  # Context manager closes the decoded source image once the resized copy exists.
  with Image.open(io.BytesIO(content)) as img:
    # Force 3 channels: grayscale, palette, CMYK or RGBA files would otherwise
    # produce arrays with a different channel count, which breaks np.stack on
    # mixed batches and does not match ResNet50's 3-channel input.
    rgb = img.convert('RGB').resize(tuple(size))
  arr = img_to_array(rgb)
  return preprocess_input(arr)

def featurize_series(model, content_series):
  """
  Featurize a pd.Series of raw images using the input model.

  :param model: Keras model whose `predict` maps a batch of preprocessed images to features.
  :param content_series: pd.Series of encoded image bytes.
  :return: a pd.Series with one flattened feature vector per input image, in input order
           (an empty Series for an empty input).
  """
  if content_series.empty:
    # np.stack raises on an empty sequence; an empty batch simply has no features.
    return pd.Series([], dtype=object)
  batch = np.stack(content_series.map(preprocess))
  preds = model.predict(batch)
  # For some layers, output features will be multi-dimensional tensors.
  # We flatten the feature tensors to vectors for easier storage in Spark DataFrames.
  return pd.Series([p.flatten() for p in preds])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [12]:
@pandas_udf('array<float>')
def featurize_udf(content_series_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
  '''
  Scalar Iterator pandas UDF wrapping our featurization function.
  It returns a Spark DataFrame column of type ArrayType(FloatType).

  The Iterator[pd.Series] -> Iterator[pd.Series] type hints let Spark 3.x infer
  the scalar-iterator variant; they replace the PandasUDFType.SCALAR_ITER
  argument, which is deprecated in Spark 3.x.

  :param content_series_iter: This argument is an iterator over batches of data, where each batch
                              is a pandas Series of image data.
  '''
  # With Scalar Iterator pandas UDFs, we can load the model once and then re-use it
  # for multiple data batches.  This amortizes the overhead of loading big models.
  model = model_fn()
  for content_series in content_series_iter:
    yield featurize_series(model, content_series)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…



In [13]:
# Pandas UDFs over large records (e.g. whole images) can hit Out Of Memory errors.
# If the featurization below fails that way, lower this Arrow batch size.
ARROW_MAX_RECORDS_PER_BATCH = 1024
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", str(ARROW_MAX_RECORDS_PER_BATCH))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [14]:
# Define featurization over the whole image DataFrame. Spark evaluates it
# lazily, so the UDF only runs when a later action needs the rows.
features_df = images.select(
    col("path"),
    col("label"),
    featurize_udf("content").alias("features"),
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# PCA

In [15]:
# Convert the features from array<float> to ml DenseVector (the vector type
# StandardScaler and PCA take as input) with a Python user defined function.
list_to_vector_udf = udf(lambda l: Vectors.dense(l), VectorUDT())

# cache(): StandardScaler.fit, PCA.fit and the final toPandas() all scan this
# DataFrame; without it, every scan re-runs the ResNet50 featurization UDF.
features_df = features_df.select(
    col("path"),
    col("label"),
    list_to_vector_udf(features_df["features"]).alias("features"),
).cache()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [16]:
# PCA depends on feature scale: center every feature on zero mean and scale it
# to unit standard deviation before fitting it.
standardizer = StandardScaler(
    inputCol='features',
    outputCol='feats_scaled',
    withMean=True,
    withStd=True,
)
std = standardizer.fit(features_df)
features_df_scaled = std.transform(features_df)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [17]:
# Number of principal components kept.
PCA_K = 7

# Fit PCA on the standardized features, then project every image onto the
# first PCA_K components (output column "pca").
pca = PCA(inputCol="feats_scaled", outputCol="pca", k=PCA_K)
modelpca = pca.fit(features_df_scaled)
transformed = modelpca.transform(features_df_scaled)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [18]:
# Collect the projected data to the driver as a pandas DataFrame and write it
# to S3 (not locally) as CSV; pandas presumably resolves the s3a:// URL through
# fsspec/s3fs (s3fs is imported at the top of the notebook).
# toPandas() brings every row to the driver — fine for the light dataset, but
# it may not scale to the full one.
OUTPUT_CSV_URL = "s3a://s3-p8/output_data.csv"
pd_transformed = transformed.select("path", "label", "pca").toPandas()
# NOTE(review): the `pca` column holds DenseVector objects, so each CSV cell is
# the vector's string form (e.g. "[0.1,0.2,...]"); consider one column per component.
pd_transformed.to_csv(OUTPUT_CSV_URL, index=False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…