## 1. <font color="7400b8">Set up Spark Session and import necessary packages</font>

### 1.1 <font color="006d77">Create a Spark Session</font>

In [1]:
# Import Spark Session from pyspark
from pyspark.sql import SparkSession

# Create a Spark Session
spark = SparkSession.builder.appName("oc-project-08").getOrCreate()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
2,application_1646598025477_0003,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


* Ensure that Spark Session has been created

In [2]:
spark

<pyspark.sql.session.SparkSession object at 0x7f509a9f6a90>

* Ensure that a SparkContext has been created

In [3]:
sc

<SparkContext master=yarn appName=livy-session-2>

### 1.2 <font color="006d77">Install missing Packages</font>

* Install **img2vec-pytorch** to preprocess images and extract features using pre-trained models

In [5]:
sc.install_pypi_package("img2vec-pytorch")

Collecting img2vec-pytorch
  Using cached img2vec_pytorch-1.0.1-py3-none-any.whl (6.9 kB)
Collecting torch
  Using cached torch-1.10.2-cp36-cp36m-manylinux1_x86_64.whl (881.9 MB)
Collecting torchvision
  Using cached torchvision-0.11.2-cp36-cp36m-manylinux1_x86_64.whl (23.3 MB)
Collecting typing-extensions
  Using cached typing_extensions-4.1.1-py3-none-any.whl (26 kB)
Collecting dataclasses
  Using cached dataclasses-0.8-py3-none-any.whl (19 kB)
Collecting pillow!=8.3.0,>=5.3.0
  Using cached Pillow-8.4.0-cp36-cp36m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.1 MB)
Collecting torch
  Using cached torch-1.10.1-cp36-cp36m-manylinux1_x86_64.whl (881.9 MB)
Installing collected packages: typing-extensions, dataclasses, torch, pillow, torchvision, img2vec-pytorch
Successfully installed dataclasses-0.8 img2vec-pytorch-1.0.1 pillow-8.4.0 torch-1.10.1 torchvision-0.11.2 typing-extensions-4.1.1

### 1.3 <font color="006d77">Import necessary packages</font>

In [6]:
# Import Img2Vec to extract features
from img2vec_pytorch import Img2Vec

# Import necessary tools from pyspark
from pyspark.sql.functions import udf, col, split
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.feature import PCA

# Import other needed packages
import numpy as np
from PIL import Image

* Check the libraries that are now available on the cluster

In [7]:
sc.list_packages()

Package                    Version
-------------------------- -------
beautifulsoup4             4.8.1
boto                       2.49.0
dataclasses                0.8
img2vec-pytorch            1.0.1
jmespath                   0.9.4
lxml                       4.4.2
mysqlclient                1.4.6
nltk                       3.4.5
nose                       1.3.4
numpy                      1.19.5
pandas                     1.1.5
Pillow                     8.4.0
pip                        21.3.1
py-dateutil                2.2
python-dateutil            2.8.2
python36-sagemaker-pyspark 1.2.6
pytz                       2019.3
PyYAML                     3.11
setuptools                 59.6.0
six                        1.13.0
soupsieve                  1.9.5
torch                      1.10.1
torchvision                0.11.2
typing_extensions          4.1.1
wheel                      0.37.1
windmill                   1.6

## 2. <font color="7400b8">Load data, extract features and conduct a PCA</font>

### 2.1 <font color="006d77">Set the S3 location of the training data</font>

In [8]:
s3_training_data_url = "s3://oc-project-08-data-v2/Training/*"

### 2.2 <font color="006d77">Create a Spark DataFrame with images</font>

In [9]:
image_df = spark.read.format("image").load(s3_training_data_url)
image_df = image_df.withColumn('fruit_label', split(col('image.origin'), '/').getItem(4))
image_df = image_df.select('image.origin', 'image.data','fruit_label')
image_df.cache()
image_df.show(truncate=True)

+--------------------+--------------------+--------------+
|              origin|                data|   fruit_label|
+--------------------+--------------------+--------------+
|s3://oc-project-0...|[FF FC FF FF FD F...|     Raspberry|
|s3://oc-project-0...|[FF FF FC FF FF F...|     Raspberry|
|s3://oc-project-0...|[FF FF FF FF FF F...|Pineapple_Mini|
|s3://oc-project-0...|[FB FF FE FB FF F...|     Raspberry|
|s3://oc-project-0...|[FA FF FC FA FF F...|     Raspberry|
|s3://oc-project-0...|[FF FF FB FF FF F...|     Raspberry|
|s3://oc-project-0...|[FF FF FF FF FF F...|Pineapple_Mini|
|s3://oc-project-0...|[FD FF FE FF FF F...|     Raspberry|
|s3://oc-project-0...|[FF FE FD FF FF F...|     Raspberry|
|s3://oc-project-0...|[FB FF FE FB FF F...|     Raspberry|
|s3://oc-project-0...|[FA FF FE F9 FE F...|     Raspberry|
|s3://oc-project-0...|[FF FF FC FF FF F...|     Raspberry|
|s3://oc-project-0...|[FD FF FB FB FF F...|     Raspberry|
|s3://oc-project-0...|[F8 FF FC FA FF F...|     Raspberr

In [10]:
print(f'Total Columns: {len(image_df.dtypes)}')
image_df.printSchema()

Total Columns: 3
root
 |-- origin: string (nullable = true)
 |-- data: binary (nullable = true)
 |-- fruit_label: string (nullable = true)
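
The `fruit_label` column in section 2.2 is obtained by splitting `image.origin` on `/` and keeping item 4. The plain-Python sketch below reproduces that indexing on a single path. It assumes the objects are laid out as `Training/<label>/<file>`; the file name is hypothetical and only the bucket and the `Training` prefix come from the notebook.

```python
# Hypothetical origin value: the file name is made up for illustration.
origin = "s3://oc-project-08-data-v2/Training/Raspberry/example_image.jpg"

parts = origin.split("/")
# "s3://" yields an empty string at index 1, so the layout is:
# 0: "s3:", 1: "", 2: bucket, 3: "Training", 4: label, 5: file name
fruit_label = parts[4]

print(parts)
print(fruit_label)
```

The index is tied to the directory depth, so it would need adjusting if the data were moved under a deeper or shallower prefix.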

### 2.3 <font color="006d77">Extract features from each image</font>

* Instantiate Img2Vec to use a pre-trained model for feature extraction

In [11]:
# Instantiate Img2Vec with a pre-trained ResNet-18 on CPU (512 features from the avgpool layer)
img2vec = Img2Vec(cuda=False, model='resnet-18', layer='avgpool', layer_output_size=512)

Downloading: "https://download.pytorch.org/models/resnet18-f37072fd.pth" to /var/lib/livy/.cache/torch/hub/checkpoints/resnet18-f37072fd.pth
100.0%

* Create a UDF to extract features from pictures

In [12]:
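@udf("array<float>")
def extract_features(image_binary):
    # Spark stores image bytes row-wise in BGR order; the images are assumed
    # to be 100x100 with 3 channels. Reverse the last axis to get RGB.
    pixels = np.array(image_binary).reshape(100, 100, 3)[:, :, ::-1]
    img = Image.fromarray(pixels, 'RGB')

    # Extract the feature vector (NumPy array) from the Pillow image
    vec = img2vec.get_vec(img, tensor=False)

    # Return a plain Python list so Spark can serialize it as array<float>
    return vec.tolist()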
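
The UDF above reshapes the raw bytes and reverses the last axis because Spark's image data source stores pixels row-wise in BGR channel order. The NumPy sketch below shows that conversion on a tiny 2x2 image rather than a 100x100 one. The helper name `bgr_bytes_to_rgb` is hypothetical and is not part of the notebook.

```python
import numpy as np

def bgr_bytes_to_rgb(raw, height, width):
    """Turn flat row-major BGR bytes into a (height, width, 3) RGB array."""
    bgr = np.frombuffer(bytes(raw), dtype=np.uint8).reshape(height, width, 3)
    # Reverse the channel axis (BGR -> RGB) and copy so the result is contiguous.
    return np.ascontiguousarray(bgr[:, :, ::-1])

# A 2x2 image whose pixels are all pure blue: B=255, G=0, R=0 in BGR order.
raw = bytearray([255, 0, 0] * 4)
rgb = bgr_bytes_to_rgb(raw, height=2, width=2)

print(rgb.shape)
print(rgb[0, 0].tolist())  # blue is now the last channel
```

Spark's image struct also carries `height`, `width` and `nChannels` fields, which could be passed to the UDF instead of hard-coding 100, 100 and 3 if the images were not all the same size.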

* Create a UDF to convert extracted features to vectors

In [13]:
list_to_vector_convertor = udf(lambda X: Vectors.dense(X), VectorUDT())

* Extract features

In [14]:
features_df = image_df.select(col("origin"), col("fruit_label"), extract_features(col("data")).alias("features"))

* Convert extracted features to vector

In [15]:
features_df = features_df.select(col("origin"), col("fruit_label"), list_to_vector_convertor(col("features")).alias("features"))

In [16]:
features_df.printSchema()

root
 |-- origin: string (nullable = true)
 |-- fruit_label: string (nullable = true)
 |-- features: vector (nullable = true)

In [17]:
features_df.show()

+--------------------+--------------+--------------------+
|              origin|   fruit_label|            features|
+--------------------+--------------+--------------------+
|s3://oc-project-0...|     Raspberry|[1.89547717571258...|
|s3://oc-project-0...|     Raspberry|[1.91805100440979...|
|s3://oc-project-0...|Pineapple_Mini|[1.43753802776336...|
|s3://oc-project-0...|     Raspberry|[1.94135844707489...|
|s3://oc-project-0...|     Raspberry|[1.43012189865112...|
|s3://oc-project-0...|     Raspberry|[1.97930002212524...|
|s3://oc-project-0...|Pineapple_Mini|[2.56533122062683...|
|s3://oc-project-0...|     Raspberry|[0.85269927978515...|
|s3://oc-project-0...|     Raspberry|[1.57502710819244...|
|s3://oc-project-0...|     Raspberry|[2.03965735435485...|
|s3://oc-project-0...|     Raspberry|[1.44332206249237...|
|s3://oc-project-0...|     Raspberry|[1.78123617172241...|
|s3://oc-project-0...|     Raspberry|[1.84725892543792...|
|s3://oc-project-0...|     Raspberry|[1.73818552494049..

### 2.4 <font color="006d77">Dimensionality reduction using PCA</font>

In [18]:
pca = PCA(k=10, inputCol="features", outputCol="pca")

In [19]:
pca_model = pca.fit(features_df)

In [20]:
pca_df = pca_model.transform(features_df)

In [21]:
pca_df.select("origin","fruit_label","pca").show()

+--------------------+--------------+--------------------+
|              origin|   fruit_label|                 pca|
+--------------------+--------------+--------------------+
|s3://oc-project-0...|     Raspberry|[0.83938632517928...|
|s3://oc-project-0...|     Raspberry|[1.51890357198909...|
|s3://oc-project-0...|Pineapple_Mini|[5.46140825657934...|
|s3://oc-project-0...|     Raspberry|[1.70523477449207...|
|s3://oc-project-0...|     Raspberry|[1.97242142102054...|
|s3://oc-project-0...|     Raspberry|[1.81949561730243...|
|s3://oc-project-0...|Pineapple_Mini|[4.34348116308045...|
|s3://oc-project-0...|     Raspberry|[2.69360202161039...|
|s3://oc-project-0...|     Raspberry|[1.31938850854930...|
|s3://oc-project-0...|     Raspberry|[1.15301992213103...|
|s3://oc-project-0...|     Raspberry|[1.78302998257043...|
|s3://oc-project-0...|     Raspberry|[1.54436818387076...|
|s3://oc-project-0...|     Raspberry|[1.62521060515476...|
|s3://oc-project-0...|     Raspberry|[1.26411323099643..
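
To make the PCA step in section 2.4 more concrete, here is a NumPy illustration of projecting feature vectors onto their top `k` principal components. It is a sketch of the general idea, not Spark's implementation: the function name `pca_project` and the random toy data are assumptions, and this version centers the data before projecting, which may differ from what `PCAModel.transform` does.

```python
import numpy as np

def pca_project(X, k):
    """Return (projected data, explained-variance ratios) for the top k components."""
    X = np.asarray(X, dtype=float)
    # Covariance between columns (np.cov centers the data internally).
    cov = np.cov(X, rowvar=False)
    # eigh handles symmetric matrices and returns eigenvalues in ascending order.
    eigvals, eigvecs = np.linalg.eigh(cov)
    order = np.argsort(eigvals)[::-1][:k]   # indices of the k largest eigenvalues
    components = eigvecs[:, order]          # shape (n_features, k)
    ratios = eigvals[order] / eigvals.sum()
    # Project the centered data onto the selected components.
    projected = (X - X.mean(axis=0)) @ components
    return projected, ratios

rng = np.random.default_rng(0)
# Toy stand-in for the 512-dimensional features: 200 samples, 20 dimensions.
X = rng.normal(size=(200, 20))
projected, ratios = pca_project(X, k=10)

print(projected.shape)
print(ratios.sum())
```

On the fitted Spark model, `pca_model.explainedVariance` holds the corresponding ratios, which can help judge whether `k=10` retains enough of the variance.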

## 3. <font color="7400b8">Save DataFrame after PCA</font>

* Cast the pca column from vector to string so that it can be written as CSV

In [27]:
pca_df = pca_df.withColumn("pca",pca_df.pca.cast('string'))
pca_df = pca_df.select("origin", "fruit_label", "pca")

In [30]:
pca_df

DataFrame[origin: string, fruit_label: string, pca: string]

In [33]:
pca_df.repartition(1).write.csv("s3://oc-project-08-data-v2/Images_extracted_features/pca_df.csv")
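
Because the `pca` column is saved as a string, it has to be parsed back into numbers when the file is read again. The standard-library sketch below shows one way to do that. It assumes the vector was serialized as a bracketed, comma-separated list and that the CSV writer quoted it; the sample line and its values are hypothetical.

```python
import csv
import io
import json

# Hypothetical CSV line: origin, fruit_label, pca (vector cast to string).
# The vector string contains commas, so it is expected to be quoted.
line = 's3://bucket/Training/Raspberry/img.jpg,Raspberry,"[0.839,1.518,-0.25]"\n'

for origin, fruit_label, pca_str in csv.reader(io.StringIO(line)):
    # "[a,b,c]" is valid JSON, so json.loads turns it back into a list of floats.
    pca_values = json.loads(pca_str)
    print(fruit_label, pca_values)
```

Note that `write.csv` creates a directory at the given path containing part files (a single one after `repartition(1)`), and that no header row is written unless the `header` option is set.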
