# **Lab 8**
## Submitted by ME18B183- Shinde Shubham Sunil

**Getting started**

In [None]:
!sudo apt-get install openjdk-11-jdk

Reading package lists... Done
Building dependency tree       
Reading state information... Done
openjdk-11-jdk is already the newest version (11.0.14.1+1-0ubuntu1~18.04).
0 upgraded, 0 newly installed, 0 to remove and 39 not upgraded.


In [None]:
!pip install pyspark
!pip install -q findspark
!pip install pyarrow
try:
  # %tensorflow_version only exists in Colab.
  !pip install tf-estimator-nightly==2.8.0.dev2021122109
except Exception:
  pass



In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

In [None]:
from pyspark.sql.functions import col, pandas_udf, regexp_extract
import io

from tensorflow.keras.applications.imagenet_utils import decode_predictions
import pandas as pd
from pyspark.sql.functions import col, pandas_udf, PandasUDFType

import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import models, transforms
from PIL import Image

from __future__ import absolute_import, division, print_function, unicode_literals
import tensorflow as tf
import pathlib
import findspark
from pyspark.sql import SparkSession
import matplotlib.pyplot as plt 

findspark.init()
spark = SparkSession.builder.master("local[*]").getOrCreate()

**Loading CIFAR10 dataset**

In [24]:
data_dir = tf.keras.utils.get_file(origin='http://pjreddie.com/media/files/cifar.tgz',
                                         fname='cifar', untar=True)

print(data_dir)

Downloading data from http://pjreddie.com/media/files/cifar.tgz
/root/.keras/datasets/cifar


In [25]:
images = spark.read.format("binaryFile").option("recursiveFileLookup", "true").option("pathGlobFilter", "*.png").load(data_dir)
print(type(images))

<class 'pyspark.sql.dataframe.DataFrame'>


In [26]:
images.show(5)

+--------------------+-------------------+------+--------------------+
|                path|   modificationTime|length|             content|
+--------------------+-------------------+------+--------------------+
|file:/root/.keras...|2016-11-18 20:24:13|  3354|[89 50 4E 47 0D 0...|
|file:/root/.keras...|2016-11-18 20:24:13|  3352|[89 50 4E 47 0D 0...|
|file:/root/.keras...|2016-11-18 20:24:12|  3351|[89 50 4E 47 0D 0...|
|file:/root/.keras...|2016-11-18 20:24:12|  3349|[89 50 4E 47 0D 0...|
|file:/root/.keras...|2016-11-18 20:24:13|  3349|[89 50 4E 47 0D 0...|
+--------------------+-------------------+------+--------------------+
only showing top 5 rows



In [27]:
files=images.select('path').rdd.map(lambda x :x.path ).collect()
files[0:10]

['file:/root/.keras/datasets/cifar/test/4672_frog.png',
 'file:/root/.keras/datasets/cifar/test/8562_bird.png',
 'file:/root/.keras/datasets/cifar/train/10327_frog.png',
 'file:/root/.keras/datasets/cifar/train/23455_deer.png',
 'file:/root/.keras/datasets/cifar/train/38450_frog.png',
 'file:/root/.keras/datasets/cifar/train/29550_frog.png',
 'file:/root/.keras/datasets/cifar/train/31532_bird.png',
 'file:/root/.keras/datasets/cifar/train/18370_frog.png',
 'file:/root/.keras/datasets/cifar/test/6801_frog.png',
 'file:/root/.keras/datasets/cifar/train/14628_frog.png']

In [28]:
def extract_label(path_col):
  """Extract label from file path using built-in SQL functions."""
  return regexp_extract(path_col, "_([^/.]+)", 1)

def extract_size(content):
  """Extract image size from its raw content."""
  image = Image.open(io.BytesIO(content))
  return image.size

@pandas_udf("width: int, height: int")
def extract_size_udf(content_series):
  sizes = content_series.apply(extract_size)
  return pd.DataFrame(list(sizes))

df = images.select(
  col("path"),
  col("modificationTime"),
  extract_label(col("path")).alias("label"),
  extract_size_udf(col("content")).alias("size"),
  col("content"))


df.show(5)

+--------------------+-------------------+-----+--------+--------------------+
|                path|   modificationTime|label|    size|             content|
+--------------------+-------------------+-----+--------+--------------------+
|file:/root/.keras...|2016-11-18 20:24:13| frog|{32, 32}|[89 50 4E 47 0D 0...|
|file:/root/.keras...|2016-11-18 20:24:13| bird|{32, 32}|[89 50 4E 47 0D 0...|
|file:/root/.keras...|2016-11-18 20:24:12| frog|{32, 32}|[89 50 4E 47 0D 0...|
|file:/root/.keras...|2016-11-18 20:24:12| deer|{32, 32}|[89 50 4E 47 0D 0...|
|file:/root/.keras...|2016-11-18 20:24:13| frog|{32, 32}|[89 50 4E 47 0D 0...|
+--------------------+-------------------+-----+--------+--------------------+
only showing top 5 rows



In [31]:
class CIFAR10Dataset(Dataset):
  """
  Converts image contents into a PyTorch Dataset with standard cifar10 preprocessing.
  """
  def __init__(self, contents):
    self.contents = contents

  def __len__(self):
    return len(self.contents)

  def __getitem__(self, index):
    return self._preprocess(self.contents[index])

  def _preprocess(self, content):
    image = Image.open(io.BytesIO(content))

    transform = transforms.Compose([
      transforms.Resize(256),
      transforms.ToTensor(),
      transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
    ])
    return transform(image)

In [32]:
def cifar10_model_udf(model_clf):

  """
  Wraps an cifar10 model into a Pandas UDF that makes predictions.
  
  You might consider the following customizations for your own use case:
    - Tune DataLoader's batch_size and num_workers for better performance.
    - Use GPU for acceleration.
    - Change prediction types.
  """

  def predict(content_series_iter : pd.Series) -> pd.DataFrame:
    model = model_clf()
    model.eval()      
    for content_series in content_series_iter:                                                    #Iterates overall all Images 
      dataset = CIFAR10Dataset(list(content_series))                                           
      loader = DataLoader(dataset, batch_size=64) 
      with torch.no_grad():
        for image_batch in loader:
          predictions = model(image_batch).numpy()                                                # Predictions for all 1000 classes of Mobilenetv2 Training Dataset
          predicted_labels = [x[0] for x in decode_predictions(predictions, top=1)]                       
          yield pd.DataFrame(predicted_labels)
    
        
  return_type = "class: string, desc: string, score:float"
  return pandas_udf(return_type, PandasUDFType.SCALAR_ITER)(predict)   

**VGG16 Classification model**

In [33]:
VGG16_udf = cifar10_model_udf(lambda: models.vgg16(pretrained=True))



In [35]:
predictions = df.withColumn("prediction", VGG16_udf(col("content")))
predictions.select(col("label"),col("prediction")).show(20, truncate = False)

+-----+--------------------------------------+
|label|prediction                            |
+-----+--------------------------------------+
|frog |{n02130308, cheetah, 8.439269}        |
|bird |{n02002724, black_stork, 5.3798957}   |
|frog |{n01744401, rock_python, 6.444077}    |
|deer |{n02114712, red_wolf, 5.7382617}      |
|frog |{n02129165, lion, 7.53472}            |
|frog |{n02128925, jaguar, 5.393148}         |
|bird |{n01873310, platypus, 4.491262}       |
|frog |{n01688243, frilled_lizard, 6.8681555}|
|frog |{n02356798, fox_squirrel, 6.2243824}  |
|frog |{n02114712, red_wolf, 8.19412}        |
|deer |{n02356798, fox_squirrel, 9.399976}   |
|bird |{n07248320, book_jacket, 4.6766686}   |
|frog |{n02356798, fox_squirrel, 6.3912754}  |
|frog |{n03447721, gong, 6.701816}           |
|frog |{n02115913, dhole, 8.544827}          |
|frog |{n02356798, fox_squirrel, 5.97714}    |
|frog |{n02128385, leopard, 6.434851}        |
|frog |{n02115913, dhole, 7.0508432}         |
|frog |{n0235

**ResNet50 Classification model**

In [36]:
ResNet50_udf = cifar10_model_udf(lambda: models.resnet50(pretrained=True))



In [38]:
predictions = df.withColumn("prediction", ResNet50_udf(col("content")))
predictions.select(col("label"),col("prediction")).show(20, truncate = False)

+-----+--------------------------------------+
|label|prediction                            |
+-----+--------------------------------------+
|frog |{n02130308, cheetah, 10.426459}       |
|bird |{n01443537, goldfish, 6.834964}       |
|frog |{n01496331, electric_ray, 7.9641924}  |
|deer |{n04525038, velvet, 7.9794655}        |
|frog |{n01688243, frilled_lizard, 7.3185043}|
|frog |{n01496331, electric_ray, 7.7286572}  |
|bird |{n02356798, fox_squirrel, 5.2075057}  |
|frog |{n02356798, fox_squirrel, 8.57772}    |
|frog |{n01644900, tailed_frog, 7.5442767}   |
|frog |{n02129165, lion, 9.034145}           |
|deer |{n02115913, dhole, 10.55313}          |
|bird |{n02356798, fox_squirrel, 10.846921}  |
|frog |{n02356798, fox_squirrel, 9.478152}   |
|frog |{n01644900, tailed_frog, 10.755041}   |
|frog |{n02356798, fox_squirrel, 10.489262}  |
|frog |{n02356798, fox_squirrel, 8.230193}   |
|frog |{n02002724, black_stork, 6.467869}    |
|frog |{n02356798, fox_squirrel, 8.548313}   |
|frog |{n0212

**MobileNetV2 Classification model**

In [39]:
MobileNetV2_udf = cifar10_model_udf(lambda: models.mobilenet_v2(pretrained=True))



In [40]:
predictions = df.withColumn("prediction", MobileNetV2_udf(col("content")))
predictions.select(col("label"),col("prediction")).show(20, truncate = False)

+-----+---------------------------------------+
|label|prediction                             |
+-----+---------------------------------------+
|frog |{n02356798, fox_squirrel, 7.4729676}   |
|bird |{n02002724, black_stork, 8.031027}     |
|frog |{n01744401, rock_python, 9.125906}     |
|deer |{n02356798, fox_squirrel, 7.5615854}   |
|frog |{n02457408, three-toed_sloth, 8.139731}|
|frog |{n01756291, sidewinder, 6.910098}      |
|bird |{n02002724, black_stork, 6.929695}     |
|frog |{n03764736, milk_can, 9.065403}        |
|frog |{n01688243, frilled_lizard, 9.57504}   |
|frog |{n02119789, kit_fox, 10.592067}        |
|deer |{n02422106, hartebeest, 9.47988}       |
|bird |{n02606052, rock_beauty, 6.9106827}    |
|frog |{n02325366, wood_rabbit, 6.7301545}    |
|frog |{n01744401, rock_python, 7.9928703}    |
|frog |{n02356798, fox_squirrel, 10.709429}   |
|frog |{n02356798, fox_squirrel, 7.484116}    |
|frog |{n01744401, rock_python, 7.1383753}    |
|frog |{n02356798, fox_squirrel, 11.2227