## 1. ETL images into a Delta table

---
* Use the [flowers dataset](https://www.tensorflow.org/datasets/catalog/tf_flowers) hosted under `dbfs:/databricks-datasets`.
* Use the binary file data source from Apache Spark to store images in a Spark table.
* Extract image metadata and store it together with the image data.
* Use Delta Lake to simplify data management.

In [1]:
import io
import numpy as np
import pandas as pd
from pyspark.sql.functions import col, pandas_udf, regexp_extract
from PIL import Image

### The flowers dataset

We use the [flowers dataset](https://www.tensorflow.org/datasets/catalog/tf_flowers) from the TensorFlow team as our example dataset,
which contains flower photos stored under five sub-directories, one per class.
It is hosted under Databricks Datasets for easy access.

In [2]:
%fs ls /databricks-datasets/flower_photos

### Load images into a DataFrame using the binary file data source.

Databricks Runtime 5.4 and above support the binary file data source, which reads binary files and converts each file into a single record that contains the raw content and metadata of the file.

In [3]:
images = spark.read.format("binaryFile") \
  .option("recursiveFileLookup", "true") \
  .option("pathGlobFilter", "*.jpg") \
  .load("/databricks-datasets/flower_photos")

### Expand the DataFrame with extra metadata columns.

We extract some frequently used metadata from the `images` DataFrame:
* labels, taken from the file paths,
* image sizes, read from the raw image content.

In [4]:
def extract_label(path_col):
  """Extract label from file path using built-in SQL functions."""
  return regexp_extract(path_col, "flower_photos/([^/]+)", 1)
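
To see what the pattern captures without starting Spark, the same regular expression can be tried on a single path with Python's `re` module. This is only a sketch: `extract_label_py` is a hypothetical helper, and the file name in `sample_path` is made up for illustration. It mirrors the documented behavior of `regexp_extract`, which returns an empty string when the pattern does not match.

```python
import re

# Same pattern as in extract_label. For an expression this simple, Python's
# re module and Spark's Java regex engine agree on the result.
LABEL_PATTERN = re.compile(r"flower_photos/([^/]+)")


def extract_label_py(path: str) -> str:
    """Hypothetical pure-Python analogue of extract_label for one path."""
    match = LABEL_PATTERN.search(path)
    # regexp_extract yields an empty string when there is no match.
    return match.group(1) if match else ""


# The file name below is invented for illustration only.
sample_path = "dbfs:/databricks-datasets/flower_photos/daisy/example.jpg"
label = extract_label_py(sample_path)

# A path outside flower_photos produces an empty label.
no_label = extract_label_py("dbfs:/some/other/dir/example.jpg")
```

The capture group `([^/]+)` stops at the next slash, so the label is the name of the sub-directory directly under `flower_photos`.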

In [5]:
def extract_size(content):
  """Extract image size from its raw content."""
  image = Image.open(io.BytesIO(content))
  return image.size
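
A quick local check of `extract_size` might look like the sketch below. It assumes Pillow is installed and builds a small synthetic JPEG in memory rather than reading a file from the dataset; the 64x48 dimensions and the fill color are arbitrary choices.

```python
import io

from PIL import Image


def extract_size(content):
    """Extract image size from its raw content (same logic as the cell above)."""
    image = Image.open(io.BytesIO(content))
    return image.size


# Encode a synthetic 64x48 RGB image as JPEG bytes, standing in for the
# `content` column of one row.
buffer = io.BytesIO()
Image.new("RGB", (64, 48), color=(200, 30, 30)).save(buffer, format="JPEG")
jpeg_bytes = buffer.getvalue()

# Pillow reports size as a (width, height) tuple.
size = extract_size(jpeg_bytes)
```

`Image.open` only needs to parse the header to report the size, so the full pixel data does not have to be decoded for this step.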

In [6]:
@pandas_udf("width: int, height: int")
def extract_size_udf(content_series):
  sizes = content_series.apply(extract_size)
  return pd.DataFrame(list(sizes))
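
The conversion inside the UDF can be inspected with pandas alone. The sketch below uses a hand-written Series of `(width, height)` tuples (values taken from the sample output in this section) as a stand-in for `content_series.apply(extract_size)`. The resulting DataFrame has integer column labels; according to the PySpark documentation, non-string labels are matched to the struct fields `width` and `height` by position, which is what this UDF appears to rely on.

```python
import pandas as pd

# Stand-in for the Series of (width, height) tuples produced by
# content_series.apply(extract_size).
sizes = pd.Series([(500, 441), (500, 333), (500, 290)])

# Each tuple becomes one row; the columns get integer labels 0 and 1.
frame = pd.DataFrame(list(sizes))
column_labels = list(frame.columns)

# Column 0 holds the widths, column 1 the heights.
widths = frame[0].tolist()
heights = frame[1].tolist()
```

Naming the columns explicitly, for example `pd.DataFrame(list(sizes), columns=["width", "height"])`, would make the mapping to the schema visible in the code instead of leaving it to column order.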

In [7]:
df = images.select(
  col("path"),
  col("modificationTime"),
  extract_label(col("path")).alias("label"),
  extract_size_udf(col("content")).alias("size"),
  col("content"))

In [19]:
df.limit(5).show()

+--------------------+-------------------+----------+----------+--------------------+
|                path|   modificationTime|     label|      size|             content|
+--------------------+-------------------+----------+----------+--------------------+
|dbfs:/databricks-...|2019-12-11 22:18:32|    tulips|[500, 441]|[FF D8 FF E0 00 1...|
|dbfs:/databricks-...|2019-12-11 22:18:00|sunflowers|[500, 333]|[FF D8 FF E0 00 1...|
|dbfs:/databricks-...|2019-12-11 22:18:52|    tulips|[500, 333]|[FF D8 FF E0 00 1...|
|dbfs:/databricks-...|2019-12-11 22:17:56|sunflowers|[500, 290]|[FF D8 FF E0 00 1...|
|dbfs:/databricks-...|2019-12-11 22:16:30|     daisy|[500, 322]|[FF D8 FF E0 00 1...|
+--------------------+-------------------+----------+----------+--------------------+



### Save as a Delta table.

In [14]:
%sql CREATE DATABASE IF NOT EXISTS ml_tmp


In [16]:
# Image data is already compressed, so we turn off Parquet compression.
spark.conf.set("spark.sql.parquet.compression.codec", "uncompressed")
df.write.format("delta").mode("overwrite").saveAsTable("ml_tmp.flowers")
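
The reasoning behind turning compression off can be illustrated with a rough, Spark-free sketch. It rests on two assumptions: pseudo-random bytes stand in for JPEG content, which is already entropy-coded, and `zlib` stands in for a Parquet codec simply because it ships with Python. The helper `compression_ratio` is hypothetical.

```python
import random
import zlib


def compression_ratio(data: bytes) -> float:
    """Return compressed size divided by original size (lower means more savings)."""
    return len(zlib.compress(data)) / len(data)


# Assumption: pseudo-random bytes approximate already-compressed image content.
rng = random.Random(0)
image_like = bytes(rng.getrandbits(8) for _ in range(100_000))

# Repetitive text approximates ordinary, highly compressible column data.
text_like = b"daisy,tulips,roses,sunflowers,dandelion\n" * 2_500

image_ratio = compression_ratio(image_like)
text_ratio = compression_ratio(text_like)
```

In this sketch the repetitive text shrinks to a small fraction of its size, while the image-like bytes stay at roughly their original size, so a second compression pass would cost CPU time without saving meaningful space.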

### Make SQL queries (optional).

In [17]:
%sql SELECT COUNT(*) FROM ml_tmp.flowers WHERE label = 'daisy'

count(1)
633


In [18]:
%sql SELECT label, COUNT(*) AS cnt FROM ml_tmp.flowers
  WHERE size.width >= 400 AND size.height >= 400
  GROUP BY label ORDER BY cnt

label,cnt
tulips,2
sunflowers,3
dandelion,6
daisy,6
roses,7
