In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xvzf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark
!pip install pyarrow
try:
  # %tensorflow_version only exists in Colab.
  !pip install --disable-pip-version-check install tf-nightly
except Exception:
  pass


spark-3.1.1-bin-hadoop3.2/
spark-3.1.1-bin-hadoop3.2/NOTICE
spark-3.1.1-bin-hadoop3.2/kubernetes/
spark-3.1.1-bin-hadoop3.2/kubernetes/tests/
spark-3.1.1-bin-hadoop3.2/kubernetes/tests/python_executable_check.py
spark-3.1.1-bin-hadoop3.2/kubernetes/tests/autoscale.py
spark-3.1.1-bin-hadoop3.2/kubernetes/tests/worker_memory_check.py
spark-3.1.1-bin-hadoop3.2/kubernetes/tests/py_container_checks.py
spark-3.1.1-bin-hadoop3.2/kubernetes/tests/decommissioning.py
spark-3.1.1-bin-hadoop3.2/kubernetes/tests/pyfiles.py
spark-3.1.1-bin-hadoop3.2/kubernetes/tests/decommissioning_cleanup.py
spark-3.1.1-bin-hadoop3.2/kubernetes/dockerfiles/
spark-3.1.1-bin-hadoop3.2/kubernetes/dockerfiles/spark/
spark-3.1.1-bin-hadoop3.2/kubernetes/dockerfiles/spark/decom.sh
spark-3.1.1-bin-hadoop3.2/kubernetes/dockerfiles/spark/entrypoint.sh
spark-3.1.1-bin-hadoop3.2/kubernetes/dockerfiles/spark/bindings/
spark-3.1.1-bin-hadoop3.2/kubernetes/dockerfiles/spark/bindings/R/
spark-3.1.1-bin-hadoop3.2/kubernetes/docker

In [2]:
import os
# JDK location of the openjdk-8-jdk-headless package installed by apt in the setup cell.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# The setup cell extracts the Spark tarball into the current working directory;
# resolve it from there instead of hard-coding Colab's /content path.
os.environ["SPARK_HOME"] = os.path.abspath("spark-3.1.1-bin-hadoop3.2")

In [3]:
import findspark
# findspark.init() has to run before pyspark is imported so the Spark install can be found.
findspark.init()
from pyspark.sql import SparkSession

# Local Spark session using every available core.
spark = (
  SparkSession.builder
  .master("local[*]")
  .getOrCreate()
)

In [4]:
from __future__ import absolute_import, division, print_function, unicode_literals

In [5]:
import tensorflow as tf

INFO:tensorflow:Enabling eager execution
INFO:tensorflow:Enabling v2 tensorshape
INFO:tensorflow:Enabling resource variables
INFO:tensorflow:Enabling tensor equality
INFO:tensorflow:Enabling control flow v2


# Data formatting

In [6]:
!wget https://www.cs.toronto.edu/%7Ekriz/cifar-10-python.tar.gz

--2021-04-17 16:57:55--  https://www.cs.toronto.edu/%7Ekriz/cifar-10-python.tar.gz
Resolving www.cs.toronto.edu (www.cs.toronto.edu)... 128.100.3.30
Connecting to www.cs.toronto.edu (www.cs.toronto.edu)|128.100.3.30|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 170498071 (163M) [application/x-gzip]
Saving to: ‘cifar-10-python.tar.gz.7’


2021-04-17 16:57:57 (95.7 MB/s) - ‘cifar-10-python.tar.gz.7’ saved [170498071/170498071]



In [7]:
# Unpack the CIFAR-10 python archive into ./cifar-10-batches-py/.
!tar xvf cifar-10-python.tar.gz

cifar-10-batches-py/
cifar-10-batches-py/data_batch_4
cifar-10-batches-py/readme.html
cifar-10-batches-py/test_batch
cifar-10-batches-py/data_batch_3
cifar-10-batches-py/batches.meta
cifar-10-batches-py/data_batch_2
cifar-10-batches-py/data_batch_5
cifar-10-batches-py/data_batch_1


In [8]:
# Sanity check: expect data_batch_1..5, test_batch and batches.meta.
!ls cifar-10-batches-py/

batches.meta  data_batch_2  data_batch_4  readme.html
data_batch_1  data_batch_3  data_batch_5  test_batch


In [9]:
def load_cifar10_batch(cifar10_dataset_folder_path, batch_id):
  """Load one pickled CIFAR-10 training batch.

  Args:
    cifar10_dataset_folder_path: directory holding the ``data_batch_<id>`` files.
    batch_id: suffix of the batch file to read (1-5 for the standard training set).

  Returns:
    ``(features, labels)``: features reshaped from flat rows into
    (N, 32, 32, 3) channel-last images, and the batch's ``labels`` entry unchanged.
  """
  import pickle

  batch_file = f"{cifar10_dataset_folder_path}/data_batch_{batch_id}"
  with open(batch_file, mode='rb') as handle:
    # latin1 decoding keeps Python-2-era pickles readable under Python 3.
    # NOTE(review): pickle.load can execute arbitrary code; only use on trusted files.
    raw = pickle.load(handle, encoding='latin1')

  flat = raw['data']
  # Each row stores the channels as 3 contiguous 32x32 planes; move channels last.
  images = flat.reshape((len(flat), 3, 32, 32)).transpose(0, 2, 3, 1)
  return images, raw['labels']

def load_label_names():
  """Return the CIFAR-10 class names; position i is the name of integer label i."""
  names = [
    'airplane', 'automobile', 'bird', 'cat', 'deer',
    'dog', 'frog', 'horse', 'ship', 'truck',
  ]
  return names

In [10]:
import numpy as np

# Stack the five CIFAR-10 training batches into one image array and one (N, 1) label array.
# Per-batch arrays are collected and concatenated once, instead of re-copying a
# growing array with vstack on every iteration.
train_feature_batches = []
train_label_batches = []
for batch_id in range(1, 6):
  features, labels = load_cifar10_batch('./cifar-10-batches-py', batch_id)
  train_feature_batches.append(features)
  # flat list of labels -> (N, 1) column
  train_label_batches.append(np.asarray(labels).reshape(-1, 1))

features_acc = np.concatenate(train_feature_batches, axis=0)
labels_acc = np.concatenate(train_label_batches, axis=0)
assert features_acc.shape[0] == labels_acc.shape[0]
# Older names kept in case other cells still refer to them.
features_p, labels_p = features_acc, labels_acc

In [11]:
# Class names indexed by integer label; used for the per-class reports below.
label_names = list(load_label_names())

In [12]:
# Sanity check: one label row per image before anything is written to disk.
assert features_acc.shape[0] == labels_acc.shape[0], "image/label count mismatch"
features_acc.shape, labels_acc.shape

((50000, 32, 32, 3), (50000, 1))

In [13]:
from PIL import Image as im
import os
import shutil
from tqdm.notebook import tqdm

def write_imagenet_format(features_acc, labels_acc, data_path):
  """Write images to disk in an ImageNet-style ``<root>/<class>/<index>.jpg`` layout.

  Args:
    features_acc: indexable collection of HxWx3 images accepted by PIL's
      ``fromarray`` in 'RGB' mode (e.g. a numpy array of shape (N, H, W, 3)).
    labels_acc: per-sample integer labels, either a flat list or an (N, 1) array;
      each label indexes into ``load_label_names()``.
    data_path: output root; it and one sub-folder per class are created if missing.
  """
  label_names = load_label_names()

  # makedirs creates data_path as well; exist_ok keeps the cell re-runnable.
  for label in label_names:
    os.makedirs(os.path.join(data_path, label), exist_ok=True)

  for i in tqdm(range(len(features_acc))):
    # np.squeeze handles both a scalar label and a length-1 row of an (N, 1) array;
    # int() turns the 0-d result into a plain list index.
    label = int(np.squeeze(labels_acc[i]))
    image = im.fromarray(features_acc[i], 'RGB')
    # NOTE(review): JPEG is lossy, so the 32x32 images pick up compression
    # artifacts; the Spark reader globs *.jpg, so the format is kept.
    image.save(os.path.join(data_path, label_names[label], str(i) + '.jpg'))

In [14]:
# Materialize the 50,000 training images as class-named folders of JPEGs.
train_path = 'c10_data/train_data'
write_imagenet_format(features_acc, labels_acc, data_path=train_path)

HBox(children=(FloatProgress(value=0.0, max=50000.0), HTML(value='')))




In [15]:
import pickle
# The test split is a single pickled file laid out like the training batches.
# NOTE(review): pickle.load can execute arbitrary code; only use on trusted files.
with open('./cifar-10-batches-py/test_batch', mode='rb') as file:
    batch = pickle.load(file, encoding='latin1')
test_labels = batch['labels']
test_features = (
    batch['data']
    .reshape((len(batch['data']), 3, 32, 32))
    .transpose(0, 2, 3, 1)  # channel-first planes -> channel-last images
)

In [16]:
# Same layout for the 10,000 test images, under a separate root.
test_path = 'c10_data/test_data'
write_imagenet_format(test_features, test_labels, data_path=test_path)

HBox(children=(FloatProgress(value=0.0, max=10000.0), HTML(value='')))




# Read into Spark

In [18]:
import io

from tensorflow.keras.applications.imagenet_utils import decode_predictions
import pandas as pd
from pyspark.sql.functions import col, pandas_udf, PandasUDFType
from pyspark.sql.functions import col, pandas_udf, regexp_extract
import pyspark.sql.functions as sqlf
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import models, transforms
from PIL import Image

In [19]:
# Load every .jpg under the training root (recursing into class folders) as raw bytes.
images = (
  spark.read.format("binaryFile")
  .option("recursiveFileLookup", "true")
  .option("pathGlobFilter", "*.jpg")
  .load('./c10_data/train_data')
)

In [20]:

def extract_label(path_col):
  """Extract label from file path using built-in SQL functions.

  The label is the name of the directory that directly contains the file
  (``.../<label>/<file>``), so any dataset root works, e.g. both
  ``c10_data/train_data`` and ``c10_data/test_data``.
  """
  # Anchor on the last two path components instead of a hard-coded root, which
  # would return "" for images outside that root.
  return regexp_extract(path_col, r"/([^/]+)/[^/]+$", 1)

def extract_size(content):
  """Decode an image from its raw bytes and return PIL's ``(width, height)``."""
  return Image.open(io.BytesIO(content)).size

@pandas_udf("width: int, height: int")
def extract_size_udf(content_series: pd.Series) -> pd.DataFrame:
  """Vectorized UDF: map a Series of raw image bytes to a ``(width, height)`` struct.

  The type hints let Spark infer a scalar (Series -> DataFrame) pandas UDF without
  the deprecated function-type argument.
  """
  sizes = [extract_size(content) for content in content_series]
  # Name the columns after the struct fields so Spark assigns them by name, and so
  # an empty batch still yields a frame with the two expected columns.
  return pd.DataFrame(sizes, columns=["width", "height"])

# Keep the raw bytes next to derived metadata: label from the folder name, size via the pandas UDF.
selected_columns = [
  col("path"),
  col("modificationTime"),
  extract_label(col("path")).alias("label"),
  extract_size_udf(col("content")).alias("size"),
  col("content"),
]
df = images.select(*selected_columns)


In [21]:
class ImageNetDataset(Dataset):
  """
  Converts image contents into a PyTorch Dataset with standard ImageNet preprocessing.

  Args:
    contents: indexable sequence of encoded image bytes, one per item.

  Each item is decoded, converted to RGB, resized (Resize(256)), center-cropped
  to 224x224, converted to a tensor and normalized with the ImageNet channel
  mean/std.
  """
  def __init__(self, contents):
    self.contents = contents
    # Built once per dataset instead of once per image.
    self._transform = transforms.Compose([
      transforms.Resize(256),
      transforms.CenterCrop(224),
      transforms.ToTensor(),
      transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])

  def __len__(self):
    return len(self.contents)

  def __getitem__(self, index):
    return self._preprocess(self.contents[index])

  def _preprocess(self, content):
    """
    Preprocesses the input image content using standard ImageNet normalization.

    See https://pytorch.org/docs/stable/torchvision/models.html.
    """
    # convert("RGB") guards against grayscale/RGBA/palette inputs, which would
    # otherwise not match the 3-channel Normalize above.
    image = Image.open(io.BytesIO(content)).convert("RGB")
    return self._transform(image)

In [22]:
def imagenet_model_udf(model_fn, batch_size=64):
  """
  Wraps an ImageNet model into a Pandas UDF that makes top-1 predictions.

  Args:
    model_fn: zero-argument callable returning a torch ImageNet classifier. It is
      called inside the UDF body, so the model is built where the UDF runs rather
      than being captured in the closure.
    batch_size: number of images per forward pass in the DataLoader.

  Returns:
    A pandas UDF mapping a binary image column to a struct
    ``class: string, desc: string`` (ImageNet class id and description of the
    top-1 prediction, as returned by ``decode_predictions``).

  You might consider the following customizations for your own use case:
    - Tune batch_size and the DataLoader's num_workers for better performance.
    - Use GPU for acceleration.
    - Change prediction types.
  """
  from typing import Iterator

  # Iterator[Series] -> Iterator[DataFrame] type hints make Spark infer an
  # iterator-of-batches UDF (replacing the deprecated PandasUDFType.SCALAR_ITER).
  def predict(content_series_iter: Iterator[pd.Series]) -> Iterator[pd.DataFrame]:
    model = model_fn()
    model.eval()
    for content_series in content_series_iter:
      dataset = ImageNetDataset(list(content_series))
      loader = DataLoader(dataset, batch_size=batch_size)
      with torch.no_grad():
        for image_batch in loader:
          logits = model(image_batch).numpy()
          # Each top-1 entry is (class_id, description, score); score is a raw
          # logit here, so only the first two fields are kept.
          top1 = [preds[0][:2] for preds in decode_predictions(logits, top=1)]
          yield pd.DataFrame(top1, columns=["class", "desc"])

  return_type = "class: string, desc: string"
  return pandas_udf(predict, returnType=return_type)

In [23]:
# Top-1 MobileNetV2 prediction per image, shown next to the true CIFAR-10 label.
mobilenet_v2_udf = imagenet_model_udf(lambda: models.mobilenet_v2(pretrained=True))
predictions = df.withColumn("prediction", mobilenet_v2_udf(col("content")))
prediction_mobil = predictions.select(
  col("label"),
  col("prediction.desc").alias("mobilenetv2 prediction"),
)
prediction_mobil.show(n=25, truncate=False)



+----------+----------------------+
|label     |mobilenetv2 prediction|
+----------+----------------------+
|frog      |rock_python           |
|bird      |pinwheel              |
|truck     |bearskin              |
|automobile|mousetrap             |
|truck     |oil_filter            |
|truck     |thresher              |
|frog      |jaguar                |
|truck     |moving_van            |
|airplane  |waffle_iron           |
|automobile|panpipe               |
|frog      |sidewinder            |
|truck     |airliner              |
|automobile|maraca                |
|truck     |thresher              |
|frog      |clog                  |
|truck     |thresher              |
|truck     |moving_van            |
|frog      |jersey                |
|truck     |thresher              |
|cat       |fire_screen           |
|truck     |thresher              |
|truck     |moving_van            |
|frog      |sidewinder            |
|truck     |tobacco_shop          |
|frog      |custard_apple   

In [24]:
# Pull a 2,500-row sample of predictions to the driver for per-class counting.
mobil_sample = prediction_mobil.limit(2500)
prediction_mobil_ser = mobil_sample.toPandas()

In [25]:
# For each true class, the most frequent MobileNetV2 ImageNet predictions in the sample.
top_num = 5
prediction_col = 'mobilenetv2 prediction'
for label_name in label_names:
  filt_rows = prediction_mobil_ser[prediction_mobil_ser['label'] == label_name]
  print(f"\n\n ####### Top {top_num} predictions for class {label_name} #######")
  final_rows = (
    filt_rows[prediction_col]
    .value_counts()
    .nlargest(top_num)
    .to_frame('counts')
  )
  print(final_rows)

# Predictions for different models

In [26]:
# ResNet-50: predict, sample 2,500 rows, then report the top predictions per true class.
resnet50_udf = imagenet_model_udf(lambda: models.resnet50(pretrained=True))
predictions = df.withColumn("prediction", resnet50_udf(col("content")))
predictions_resnet = predictions.select(
  col("label"),
  col("prediction.desc").alias("resnet50 prediction"),
)
ser_df = predictions_resnet.limit(2500).toPandas()

top_num = 2
for label_name in label_names:
  filt_rows = ser_df[ser_df['label'] == label_name]
  print(f"\n\n ####### Top {top_num} predictions for class {label_name} #######")
  final_rows = (
    filt_rows['resnet50 prediction']
    .value_counts()
    .nlargest(top_num)
    .to_frame('counts')
  )
  print(final_rows)

In [27]:
# VGG-19: predict, sample 2,500 rows, then report the top predictions per true class.
vgg19_udf = imagenet_model_udf(lambda: models.vgg19(pretrained=True))
predictions = df.withColumn("prediction", vgg19_udf(col("content")))
predictions_vgg19 = predictions.select(
  col("label"),
  col("prediction.desc").alias("vgg19 prediction"),
)
ser_df = predictions_vgg19.limit(2500).toPandas()

top_num = 2
for label_name in label_names:
  filt_rows = ser_df[ser_df['label'] == label_name]
  print(f"\n\n ####### Top {top_num} predictions for class {label_name} #######")
  final_rows = (
    filt_rows['vgg19 prediction']
    .value_counts()
    .nlargest(top_num)
    .to_frame('counts')
  )
  print(final_rows)





 ####### Top 2 predictions for class airplane #######
           counts
chain_saw       9
thresher        7


 ####### Top 2 predictions for class automobile #######
            counts
moving_van     197
chain_saw       52


 ####### Top 2 predictions for class bird #######
                  counts
fox_squirrel          21
three-toed_sloth       7


 ####### Top 2 predictions for class cat #######
                  counts
fox_squirrel          14
Japanese_spaniel      11


 ####### Top 2 predictions for class deer #######
              counts
fox_squirrel      15
patas              4


 ####### Top 2 predictions for class dog #######
                  counts
Japanese_spaniel      27
Dandie_Dinmont        14


 ####### Top 2 predictions for class frog #######
              counts
fox_squirrel     109
sidewinder        33


 ####### Top 2 predictions for class horse #######
          counts
sorrel        69
thresher      23


 ####### Top 2 predictions for class ship #######
          

In [28]:
# DenseNet-121: predict, sample 2,500 rows, then report the top predictions per true class.
densenet121_udf = imagenet_model_udf(lambda: models.densenet121(pretrained=True))
predictions = df.withColumn("prediction", densenet121_udf(col("content")))
predictions_densenet121 = predictions.select(
  col("label"),
  col("prediction.desc").alias("densenet121 prediction"),
)
ser_df = predictions_densenet121.limit(2500).toPandas()

top_num = 2
for label_name in label_names:
  filt_rows = ser_df[ser_df['label'] == label_name]
  print(f"\n\n ####### Top {top_num} predictions for class {label_name} #######")
  final_rows = (
    filt_rows['densenet121 prediction']
    .value_counts()
    .nlargest(top_num)
    .to_frame('counts')
  )
  print(final_rows)





 ####### Top 2 predictions for class airplane #######
             counts
rock_beauty       5
moving_van        5


 ####### Top 2 predictions for class automobile #######
            counts
moving_van     218
thresher        35


 ####### Top 2 predictions for class bird #######
              counts
fox_squirrel       7
limpkin            6


 ####### Top 2 predictions for class cat #######
               counts
fox_squirrel       12
affenpinscher       6


 ####### Top 2 predictions for class deer #######
              counts
fox_squirrel       9
sorrel             7


 ####### Top 2 predictions for class dog #######
                  counts
Japanese_spaniel      16
Dandie_Dinmont        11


 ####### Top 2 predictions for class frog #######
                  counts
fox_squirrel          98
three-toed_sloth      29


 ####### Top 2 predictions for class horse #######
                 counts
sorrel              111
Indian_elephant      32


 ####### Top 2 predictions for class ship 