# Full Pipeline - Few Images

Assume that we are using the PySpark kernel

## Read files recursively

In [101]:
import tensorflow as tf
import tensorflow_hub as hub

import requests
from PIL import Image
from io import BytesIO

#import matplotlib.pyplot as plt
import numpy as np

from pyspark.sql.types import ArrayType, FloatType
from pyspark.ml.linalg import DenseVector, VectorUDT, DenseMatrix

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
import pandas

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

An error was encountered:
this version of pandas is incompatible with numpy < 1.17.3
your numpy version is 1.16.5.
Please upgrade numpy to >= 1.17.3 to use this pandas version
Traceback (most recent call last):
  File "/usr/local/lib64/python3.7/site-packages/pandas/__init__.py", line 22, in <module>
    from pandas.compat import (
  File "/usr/local/lib64/python3.7/site-packages/pandas/compat/__init__.py", line 15, in <module>
    from pandas.compat.numpy import (
  File "/usr/local/lib64/python3.7/site-packages/pandas/compat/numpy/__init__.py", line 27, in <module>
    f"this version of pandas is incompatible with numpy < {_min_numpy_ver}\n"
ImportError: this version of pandas is incompatible with numpy < 1.17.3
your numpy version is 1.16.5.
Please upgrade numpy to >= 1.17.3 to use this pandas version



In [4]:
import matplotlib.pyplot as plt

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

An error was encountered:
Matplotlib requires numpy>=1.17; you have 1.16.5
Traceback (most recent call last):
  File "/usr/local/lib64/python3.7/site-packages/matplotlib/__init__.py", line 208, in <module>
    _check_versions()
  File "/usr/local/lib64/python3.7/site-packages/matplotlib/__init__.py", line 204, in _check_versions
    raise ImportError(f"Matplotlib requires {modname}>={minver}; "
ImportError: Matplotlib requires numpy>=1.17; you have 1.16.5



In [5]:
# https://sparkbyexamples.com/spark/spark-read-binary-file-into-dataframe/
df = spark.read.format("image").option("recursiveFileLookup", True).load("s3://multimedia-commons/data/images/000/")
#df = spark.read.format("image").option("recursiveFileLookup", True).load("s3://multimedia-commons/data/images/00*")
#df = spark.read.format("image").option("recursiveFileLookup", True).load("s3://multimedia-commons/data/images/{00*,01*}")
#df = spark.read.format("image").option("recursiveFileLookup", True).load("s3://multimedia-commons/data/images/{00*,01*}").persist()


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
df.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- image: struct (nullable = true)
 |    |-- origin: string (nullable = true)
 |    |-- height: integer (nullable = true)
 |    |-- width: integer (nullable = true)
 |    |-- nChannels: integer (nullable = true)
 |    |-- mode: integer (nullable = true)
 |    |-- data: binary (nullable = true)

In [7]:
print(f'Number of images found = {df.count()}')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Number of images found = 5

## Convert image to NDArray
*Runs quickly for large amounts of data*

In [8]:
# https://stackoverflow.com/a/69215982/11262633
import pyspark.sql.functions as F
from pyspark.ml.image import ImageSchema
from pyspark.ml.linalg import DenseVector, VectorUDT

@F.udf(returnType=VectorUDT())
def img2vec(image):
    try:
        image_np = DenseVector(ImageSchema.toNDArray(image).flatten())
    except:
        image_np = None 
    return image_np

print(f'Image fields = {ImageSchema.imageFields}')
df_new = df.withColumn('vecs',img2vec('image')).persist()
df_new.show()
#df_new.select('vecs').first().asDict().keys()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Image fields = ['origin', 'height', 'width', 'nChannels', 'mode', 'data']
+--------------------+--------------------+
|               image|                vecs|
+--------------------+--------------------+
|{s3://multimedia-...|[31.0,30.0,34.0,1...|
|{s3://multimedia-...|[255.0,231.0,195....|
|{s3://multimedia-...|[48.0,50.0,51.0,3...|
|{s3://multimedia-...|[0.0,0.0,0.0,0.0,...|
|{s3://multimedia-...|[0.0,0.0,0.0,0.0,...|
+--------------------+--------------------+

## List images where the conversion failed

*Q:  Slow for large amounts of data.  Why?*

In [9]:
df_null = df_new.where(df_new.vecs.isNull()).select('image.origin')
df_null.show(truncate=False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------+
|origin|
+------+
+------+

## Set up ML

In [10]:
#@title Helper functions for loading image (hidden)

original_image_cache = {}

def preprocess_image(image):
  image = np.array(image)
  # reshape into shape [batch_size, height, width, num_channels]
  img_reshaped = tf.reshape(image, [1, image.shape[0], image.shape[1], image.shape[2]])
  # Use `convert_image_dtype` to convert to floats in the [0,1] range.
  image = tf.image.convert_image_dtype(img_reshaped, tf.float32)
  return image

def load_image_from_url(img_url):
  """Returns an image with shape [1, height, width, num_channels]."""
  user_agent = {'User-agent': 'Colab Sample (https://tensorflow.org)'}
  response = requests.get(img_url, headers=user_agent)
  image = Image.open(BytesIO(response.content))
  image = preprocess_image(image)
  return image

def load_image(image_url, image_size=256, dynamic_size=False, max_dynamic_size=512):
  """Loads and preprocesses images."""
  # Cache image file locally.
  if image_url in original_image_cache:
    img = original_image_cache[image_url]
  elif image_url.startswith('https://'):
    img = load_image_from_url(image_url)
  else:
    fd = tf.io.gfile.GFile(image_url, 'rb')
    img = preprocess_image(Image.open(fd))
  original_image_cache[image_url] = img
  # Load and convert to float32 numpy array, add batch dimension, and normalize to range [0, 1].
  img_raw = img
  if tf.reduce_max(img) > 1.0:
    img = img / 255.
  if len(img.shape) == 3:
    img = tf.stack([img, img, img], axis=-1)
  if not dynamic_size:
    img = tf.image.resize_with_pad(img, image_size, image_size)
  elif img.shape[1] > max_dynamic_size or img.shape[2] > max_dynamic_size:
    img = tf.image.resize_with_pad(img, max_dynamic_size, max_dynamic_size)
  return img, img_raw

def show_image(image, title=''):
  image_size = image.shape[1]
  w = (image_size * 6) // 320
  plt.figure(figsize=(w, w))
  plt.imshow(image[0], aspect='equal')
  plt.axis('off')
  plt.title(title)
  plt.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
def to_img(row):
    row_dict = row.asDict()  
    image_dict = row_dict['image'].asDict()
    image_data = image_dict['data']
    h = image_dict['height']
    w = image_dict['width']
    c = image_dict['nChannels']
    img_b = bytes(image_data)
    # https://stackoverflow.com/a/50026948/11262633
    img_pil = Image.frombytes('RGB', (h,w), img_b, 'raw')
    return img_pil, c

def img_to_t(image):
    return preprocess_image(image)

def to_img_t(row):
    image, c = to_img(row)
    return img_to_t(image)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Hardcoded values for TensorFlow Hub

In [12]:
model_name = "efficientnetv2-s"
model_handle = 'https://tfhub.dev/google/imagenet/efficientnet_v2_imagenet1k_s/classification/2'
image_size = 224
dynamic_size = False
labels_file = "https://storage.googleapis.com/download.tensorflow.org/data/ImageNetLabels.txt"

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Get classes

In [13]:
#download labels and creates a maps
downloaded_file = tf.keras.utils.get_file("labels.txt", origin=labels_file)

classes = []

with open(downloaded_file) as f:
  labels = f.readlines()
  classes = [l.strip() for l in labels]
print(f'Downloaded {len(classes)} classes')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Downloaded 1001 classes

In [14]:
classifier = hub.load(model_handle)

input_shape = (1, image_size, image_size, 3) #image.shape
warmup_input = tf.random.uniform(input_shape, 0, 1.0)
warmup_logits = classifier(warmup_input).numpy()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Test locally

In [15]:
rows = df.take(10)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [16]:
for idx, row in enumerate(rows):
    image, c = to_img(row)
    print(idx, image.size, c)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

0 (333, 500) 3
1 (375, 500) 3
2 (333, 500) 3
3 (500, 293) 3
4 (260, 500) 3

In [17]:
# TODO Use a custom function, not a loop
for idx, row in enumerate(rows):
    
    row_dict = row.image.asDict()
    
    print(row_dict['origin'])
    
    image = to_img_t(row)
    
    # Run model on image
    probabilities = tf.nn.softmax(classifier(image)).numpy()

    top_5 = tf.argsort(probabilities, axis=-1, direction="DESCENDING")[0][:5].numpy()
    np_classes = np.array(classes)

    # Some models include an additional 'background' class in the predictions, so
    # we must account for this when reading the class labels.
    includes_background_class = probabilities.shape[1] == 1001

    for i, item in enumerate(top_5):
      class_index = item if includes_background_class else item + 1
      line = f'({i+1}) {class_index:4} - {classes[class_index]}: {probabilities[0][top_5][i]}'
      print(line)

    #show_image(image, '')
    
    if idx > 2:
        break

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

s3://multimedia-commons/data/images/000/9b1/0009b10615497f91833b17d86281.jpg
(1)  905 - window screen: 0.42160648107528687
(2)  906 - window shade: 0.2104795128107071
(3)  670 - mosquito net: 0.02144552953541279
(4)  795 - shower curtain: 0.014206805266439915
(5)  754 - radiator: 0.010209481231868267
s3://multimedia-commons/data/images/000/4bb/0004bb7bbb2676da9b22642423e90.jpg
(1)  905 - window screen: 0.7016111612319946
(2)  906 - window shade: 0.042685624212026596
(3)  754 - radiator: 0.010777434334158897
(4)  800 - sliding door: 0.003164273453876376
(5)  557 - fire screen: 0.0031357111874967813
s3://multimedia-commons/data/images/000/3d1/0003d13b530ab3dba22749272de6c.jpg
(1)  905 - window screen: 0.48613327741622925
(2)  906 - window shade: 0.020183252170681953
(3)  816 - spider web: 0.013462544418871403
(4)  592 - handkerchief: 0.01244745310395956
(5)  133 - American egret: 0.01186402142047882
s3://multimedia-commons/data/images/000/5e5/0005e54d1faf7b25ccaec519387a.jpg
(1)  906 - w

Demonstrate the ability to call Python functions within Spark

In [18]:
rdd = df_new.rdd.mapPartitions(lambda iter: [row.image.origin for row in iter])
for x in rdd.collect():
    print(x)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

s3://multimedia-commons/data/images/000/9b1/0009b10615497f91833b17d86281.jpg
s3://multimedia-commons/data/images/000/4bb/0004bb7bbb2676da9b22642423e90.jpg
s3://multimedia-commons/data/images/000/3d1/0003d13b530ab3dba22749272de6c.jpg
s3://multimedia-commons/data/images/000/5e5/0005e54d1faf7b25ccaec519387a.jpg
s3://multimedia-commons/data/images/000/24a/00024a73d1a4c32fb29732d56a2.jpg

In [19]:
rdd = df_new.rdd.mapPartitions(lambda iter: [to_img_t(row).shape for row in iter])
for x in rdd.collect():
    print(x)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

(1, 500, 333, 3)
(1, 500, 375, 3)
(1, 500, 333, 3)
(1, 293, 500, 3)
(1, 500, 260, 3)

## Build call to ML within Spark

A simple UDF

In [21]:
_udf = F.udf(lambda image: image.origin)  # Works
df_show = df_new.withColumn('t',_udf(df_new['image']))
for x in df_show.select('t').collect():
    print(x)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Row(t='s3://multimedia-commons/data/images/000/9b1/0009b10615497f91833b17d86281.jpg')
Row(t='s3://multimedia-commons/data/images/000/4bb/0004bb7bbb2676da9b22642423e90.jpg')
Row(t='s3://multimedia-commons/data/images/000/3d1/0003d13b530ab3dba22749272de6c.jpg')
Row(t='s3://multimedia-commons/data/images/000/5e5/0005e54d1faf7b25ccaec519387a.jpg')
Row(t='s3://multimedia-commons/data/images/000/24a/00024a73d1a4c32fb29732d56a2.jpg')

In [22]:
def to_img_np(image):
    image_data = image.data
    h = image.height
    w = image.width
    c = image.nChannels
    img_b = bytes(image_data)
    # https://stackoverflow.com/a/50026948/11262633
    img_pil = Image.frombytes('RGB', (h,w), img_b, 'raw')
    img_np = np.asarray(img_pil)
    return img_np

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [27]:
_udf = F.udf(lambda image: DenseVector(to_img_np(image).shape), VectorUDT()) 
df_show = df_new.withColumn('img_np',_udf(df_new['image']))
for x in df_show.select('img_np').collect():
    print(x)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Row(img_np=DenseVector([500.0, 333.0, 3.0]))
Row(img_np=DenseVector([500.0, 375.0, 3.0]))
Row(img_np=DenseVector([500.0, 333.0, 3.0]))
Row(img_np=DenseVector([293.0, 500.0, 3.0]))
Row(img_np=DenseVector([500.0, 260.0, 3.0]))

## Add ML

*Q:  How can I debug the UDF - write entries to a log file?  E.g. the data type of ```result_stx```*

*Q:  How can I convert a multi-dimensional NDArray to a structure such as a DenseMatrix?.  Use https://stackoverflow.com/a/48333361/11262633?*

In [104]:
top_5 = tf.argsort(probabilities, axis=-1, direction="DESCENDING")[0][:5].numpy() # Shape (5,)
#return top_5
top_5_prob = probabilities[0][top_5.astype(np.int32)]
result_stx = np.stack([top_5, top_5_prob])
result_stx.tolist()
Array(result_stx.shape[0], result_stx.shape[1], result_stx.flatten())

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[[905.0, 132.0, 977.0, 904.0, 753.0], [0.12334741652011871, 0.07040168344974518, 0.048677053302526474, 0.03721398115158081, 0.033004648983478546]]

In [106]:
from pyspark.sql.functions import struct

def eval_image(data_tuple):
  classifier = hub.load(model_handle)
  w = data_tuple[0]
  h = data_tuple[1]
  c = data_tuple[2]
  vecs = data_tuple[3]
  # reshape into shape [batch_size, height, width, num_channels]
  img_reshaped = tf.reshape(vecs, [1, h, w, c])
  # Use `convert_image_dtype` to convert to floats in the [0,1] range.
  image = tf.image.convert_image_dtype(img_reshaped, tf.float32)
  # Run model on image
  results = classifier(image)
  probabilities = tf.nn.softmax(results).numpy()
  top_5 = tf.argsort(probabilities, axis=-1, direction="DESCENDING")[0][:5].numpy() # Shape (5,)
  #return top_5
  top_5_prob = probabilities[0][top_5.astype(np.int32)]
  result_stx = np.stack([top_5, top_5_prob])
  return result_stx.tolist()

@F.udf(returnType=ArrayType(FloatType()))
def _udf(image):
  result_stx = eval_image(image)
  return result_stx #DenseVector(result_stx)

#https://stackoverflow.com/questions/44067861/pyspark-add-a-new-column-with-a-tuple-created-from-columns
df_new2 = df_new.withColumn('c',struct(df_new.image.width, df_new.image.height, df_new.image.nChannels, df_new.vecs))

df_show = df_new2.withColumn('ml_result',_udf(df_new2['c']))

ml_result = df_show.select('ml_result').collect()
for x in ml_result:
    print(x)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Row(ml_result=[None, None])
Row(ml_result=[None, None])
Row(ml_result=[None, None])
Row(ml_result=[None, None])
Row(ml_result=[None, None])