# Big Data Analytics - Lab 03

## Setup

In [None]:
#####################################################
## This was no longer working as of 9/9/2025 | MCM ##
## I also tested runtime 2025.07             | MCM ##
#####################################################

In [None]:
# Do not change or modify this cell
# Need to install pyspark
# if pyspark is already installed, will print a message indicating requirement already satisfied
#! pip install pyspark >& /dev/null

In [None]:
# Create Spark Session
#from pyspark.sql import SparkSession
#spark = SparkSession.builder.appName('BDA-Lab-03').getOrCreate()

In [None]:
#########################################
## These updates added 9/10/2025 | MCM ##
#########################################

In [None]:
# Do not change or modify this cell
! pip install pyspark==3.5.1 delta-spark findspark

In [None]:
# Create Spark Session

import os
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

import pyspark
from delta import *
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from delta.pip_utils import configure_spark_with_delta_pip

builder = (
    pyspark.sql.SparkSession.builder.appName("BDA-Lab-03")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
)

spark = configure_spark_with_delta_pip(builder).getOrCreate()

## Unstructured Data Storage Formats

From this week's reading in *Essential PySpark for Scalable Data Analytics*:

> Unstructured data is any data that is not represented by a predefined data model and can be either human or machine-generated. For instance, unstructured data could be data stored in plain text documents, PDF documents, sensor data, log files, video files, images, audio files, social media feeds, and more.

Let's start by importing a directory of images, a form of unstructured data. We'll then convert this data to a structured format, a `DataFrame`, and write it to a data sink.

In [None]:
%%bash
if [[ ! -f images.zip ]]; then
   # download the data file from S3 and save it to the local environment
   wget -q https://syr-bda.s3.us-east-2.amazonaws.com/images.zip
   unzip images.zip
fi

Note that we connect to this entire directory of files as a single source.

In [None]:
images = spark.read.format('image')\
.load('./images/')

images.printSchema()

image_df = images.select('image.origin',
                         'image.height',
                         'image.width',
                         'image.nChannels',
                         'image.mode',
                         'image.data')

image_df.show()

In the above section, we did the following:
- Loaded a set of image files using Spark's `image` format, resulting in a `DataFrame`.
- Used `printSchema()`, which shows there is a nested column called `image` with the fields `origin`, `height`, `width`, `nChannels`, `mode`, and `data`.
- Unnested each of these inner fields into a top-level column.
- Displayed the new `DataFrame`, `image_df`.
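The unnesting step can be pictured with a plain-Python analogy (this is not Spark code): each row holds one nested record under the key `image`, and selecting `image.<field>` lifts that field into a top-level column. The helper `flatten_struct` and the sample row below are hypothetical, made up only for illustration.

```python
def flatten_struct(rows, struct_col, fields):
    """Return new rows whose keys are the chosen fields of row[struct_col]."""
    # Analogous to images.select('image.origin', 'image.height', ...):
    # each listed field of the nested record becomes its own top-level column.
    return [{field: row[struct_col][field] for field in fields} for row in rows]


# Hypothetical row shaped like the nested `image` column (values are made up).
rows = [
    {
        "image": {
            "origin": "file:/content/images/cat.png",
            "height": 2,
            "width": 3,
            "nChannels": 3,
            "mode": 16,
            "data": b"\x00" * 18,  # height * width * nChannels bytes
        }
    },
]

flat = flatten_struct(rows, "image", ["origin", "height", "width", "nChannels", "mode"])
print(flat)
```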

Next, we'll take `image_df`, the `DataFrame` built from the unstructured image data, and perform a moderately complex operation on it: finding the widest image(s). We'll time this operation so we can compare it with another storage format later.

Note that we have not covered Spark `DataFrames` yet. We'll go in depth on this topic next week.

In [None]:
%%timeit
from pyspark.sql.functions import max, lit
image_df.withColumn('max_width',
                    lit(image_df.agg(max('width')).first()[0]))\
                    .where('width == max_width')\
                    .show()

The `data` column is of binary type, which Spark's CSV writer does not support. Before we write the data to the data lake, we need to convert `data` to a base64-encoded string.

We'll use the `withColumn()` and `base64()` functions to convert the `data` column in `image_df` to a base64 string.

In [None]:
from pyspark.sql.functions import base64
image_df2 = image_df.withColumn('data', base64(image_df.data))
image_df2.printSchema()
image_df2.show()
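Why base64 helps can be checked with Python's standard `base64` module, used here as an analogy for Spark's `base64()` column function. Raw image bytes can take any value, including the comma, quote, and newline characters that CSV uses as structure; base64 text never does. The byte values below are hypothetical.

```python
import base64
import math

# Hypothetical raw bytes; real image data can contain any byte value, including
# 0x2C (','), 0x22 ('"') and 0x0A ('\n'), which CSV treats as delimiters, quotes and row breaks.
raw = bytes([0x2C, 0x0A, 0x22, 0xFF, 0x00, 0x10, 0x7F])

encoded = base64.b64encode(raw).decode("ascii")

# Base64 output only uses A-Z, a-z, 0-9, '+', '/' and '=' padding,
# so it is safe to store as an ordinary CSV string field.
BASE64_ALPHABET = set(
    "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/="
)
assert set(encoded) <= BASE64_ALPHABET

# Every 3 input bytes become 4 output characters (rounded up with padding),
# so the text column is roughly a third larger than the binary data.
assert len(encoded) == 4 * math.ceil(len(raw) / 3)

# Decoding restores the original bytes exactly.
assert base64.b64decode(encoded) == raw
print(len(raw), len(encoded))  # 7 12
```

When reading the CSV back into Spark, `pyspark.sql.functions.unbase64()` converts the string column back to binary.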

Write `image_df2` to a data lake.

In [None]:
image_df2.write.option('header', 'true')\
.mode('overwrite')\
.csv('./data-lake/images.csv')

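Note that `images.csv` is actually a directory containing multiple files. Spark writes one `part-*.csv` file per partition of the `DataFrame` (plus a `_SUCCESS` marker file), so the number of `.csv` files depends on how the data is partitioned, which in turn depends on the input data and your particular Spark configuration.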

In [None]:
! ls ./data-lake/images.csv/
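The "one file per partition" layout can be modeled in plain Python. This is a simplified sketch, not Spark's writer: the helpers `write_partitioned_csv` and `read_partitioned_csv` are hypothetical, round-robin assignment stands in for however the `DataFrame` happens to be partitioned, and real Spark file names also contain a unique job identifier. The point is that the directory as a whole is the dataset, and each part file carries its own header.

```python
import csv
import tempfile
from pathlib import Path


def write_partitioned_csv(rows, header, out_dir, num_partitions):
    """Write one part-*.csv file per non-empty partition (simplified model)."""
    out = Path(out_dir)
    out.mkdir(parents=True, exist_ok=True)
    # Round-robin assignment stands in for the DataFrame's real partitioning.
    partitions = [rows[i::num_partitions] for i in range(num_partitions)]
    for i, part in enumerate(partitions):
        if not part:
            continue  # this model skips empty partitions
        with open(out / f"part-{i:05d}.csv", "w", newline="") as fh:
            writer = csv.writer(fh)
            writer.writerow(header)  # every part file repeats the header
            writer.writerows(part)
    (out / "_SUCCESS").touch()  # marker file signalling the write finished


def read_partitioned_csv(out_dir):
    """Treat the whole directory as one dataset by concatenating every part file."""
    result = []
    for path in sorted(Path(out_dir).glob("part-*.csv")):
        with open(path, newline="") as fh:
            reader = csv.reader(fh)
            next(reader)  # skip the per-file header
            result.extend(reader)
    return result


rows = [["a.png", "10"], ["b.png", "20"], ["c.png", "30"]]
with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp) / "images.csv"
    write_partitioned_csv(rows, ["origin", "width"], target, num_partitions=2)
    part_files = sorted(p.name for p in target.iterdir())
    round_trip = read_partitioned_csv(target)

print(part_files)  # ['_SUCCESS', 'part-00000.csv', 'part-00001.csv']
```

In Spark itself, `image_df2.rdd.getNumPartitions()` reports the partition count, and calling `image_df2.coalesce(1)` before `.write` would produce a single part file, at the cost of funneling all the data through one task.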

## Structured Data Storage Formats

From this week's reading in *Essential PySpark for Scalable Data Analytics*:

> **Apache Parquet** is a binary, compressed, and columnar storage format that was designed to be efficient at data storage as well as query performance. Parquet is a first-class citizen of the Apache Spark framework, and Spark's in-memory storage format, called Tungsten, was designed to take full advantage of the Parquet format. Therefore, you will get the best performance and efficiency out of Spark when your data is stored in Parquet format.
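The benefit of a *columnar* layout can be sketched with a simplified model that counts how many stored values a query must read to find the maximum width. It ignores everything else Parquet does (compression, encodings, and per-column statistics); the three images and their blob placeholders are hypothetical.

```python
# Row-oriented: each record is stored together, like a line in a CSV file.
# Hypothetical images; the 'data' strings stand in for large base64 blobs.
row_layout = [
    ("a.png", 480, 640, "<large base64 blob>"),
    ("b.png", 600, 800, "<large base64 blob>"),
    ("c.png", 300, 800, "<large base64 blob>"),
]
COLUMNS = ("origin", "height", "width", "data")

# Column-oriented: the same values regrouped as one list per column.
column_layout = {name: [row[i] for row in row_layout] for i, name in enumerate(COLUMNS)}


def max_width_row_oriented(rows):
    """Scan whole records: every field of every row is read to reach 'width'."""
    width_idx = COLUMNS.index("width")
    values_read = 0
    best = None
    for record in rows:
        values_read += len(record)  # a row store reads the full record
        width = record[width_idx]
        best = width if best is None else max(best, width)
    return best, values_read


def max_width_column_oriented(columns):
    """Read only the 'width' column; 'origin', 'height' and 'data' are skipped."""
    widths = columns["width"]
    return max(widths), len(widths)


print(max_width_row_oriented(row_layout))        # (800, 12)
print(max_width_column_oriented(column_layout))  # (800, 3)
```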

In [None]:
image_df.write.parquet('./data-lake/images.parquet', mode = 'overwrite')
parquet_df = spark.read.parquet('./data-lake/images.parquet')
parquet_df.printSchema()
parquet_df.show()

For comparison, let's time the filtering operation from before on the parquet data.

In [None]:
%%timeit
from pyspark.sql.functions import max, lit
parquet_df.withColumn('max_width',
                      lit(parquet_df.agg(max('width')).first()[0]))\
                      .where('width == max_width')\
                      .show()

**Question**: With which format did the filtering operation run faster? Why?

**Answer:**

## Data Lakehouses and Delta Lake

[Delta Lake](https://docs.delta.io/latest/delta-intro.html) is an open source project that enables building a [Lakehouse architecture](https://www.cidrdb.org/cidr2021/papers/cidr2021_paper17.pdf) with Apache Spark. Delta Lake provides [ACID transactions](https://www.databricks.com/glossary/acid-transactions), scalable metadata handling, and unifies streaming and batch data processing on top of existing data lakes, such as S3 (Amazon), ADLS (Microsoft), GCS (Google), and HDFS (Hadoop).
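The ACID guarantee rests on the transaction log: a write becomes part of the table only when its numbered commit file appears in `_delta_log`, and two writers cannot both create the same version. Below is a minimal local-filesystem sketch of that idea. It is an illustration under stated assumptions, not Delta's code: the `try_commit` helper and the hard-link technique are hypothetical choices for this sketch (Delta delegates this step to storage-specific log store implementations), and the action contents are simplified.

```python
import json
import os
import tempfile
from pathlib import Path


def try_commit(log_dir, version, actions):
    """Publish commit `version` atomically; return False if that version already exists."""
    log_dir = Path(log_dir)
    target = log_dir / f"{version:020d}.json"  # zero-padded name, e.g. 00000000000000000000.json
    # 1. Write the complete commit to a private temporary file first.
    fd, tmp_path = tempfile.mkstemp(dir=log_dir, suffix=".tmp")
    with os.fdopen(fd, "w") as fh:
        for action in actions:
            fh.write(json.dumps(action) + "\n")  # one JSON action per line
    try:
        # 2. Hard-link it into place; this fails if the target exists, so a reader
        #    never sees a half-written commit and only one writer can claim a version.
        os.link(tmp_path, target)
        return True
    except FileExistsError:
        return False
    finally:
        os.remove(tmp_path)  # the temporary name is no longer needed either way


with tempfile.TemporaryDirectory() as workdir:
    log_dir = Path(workdir, "_delta_log")
    log_dir.mkdir()
    # Two hypothetical writers race to create version 0.
    writer_a = try_commit(log_dir, 0, [{"add": {"path": "part-a.parquet"}}])
    writer_b = try_commit(log_dir, 0, [{"add": {"path": "part-b.parquet"}}])
    winning_commit = (log_dir / f"{0:020d}.json").read_text()
    log_contents = sorted(p.name for p in log_dir.iterdir())

print(writer_a, writer_b)  # True False
print(log_contents)        # ['00000000000000000000.json']
```

In Delta's optimistic concurrency control, the losing writer would then check whether the winning commit conflicts with its own changes and, if not, retry at the next version number.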

### Setting Up a Delta Lake Session

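Delta Lake requires a version of PySpark that is compatible with it, a Spark package called `delta-spark`, and a Spark session configured to use Delta Lake. The setup cells at the top of this notebook already installed both packages and created such a session; the cell below repeats that configuration for reference. Because `getOrCreate()` returns the session that is already running, re-running it does not start a new one.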

In [None]:
import os
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

import pyspark
from delta import *
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from delta.pip_utils import configure_spark_with_delta_pip

builder = (
    pyspark.sql.SparkSession.builder.appName("MyApp")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
)

spark = configure_spark_with_delta_pip(builder).getOrCreate()

### Writing to a Delta Lake

We'll start by creating and saving a very simple data set, the sales data from the last lab.

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, FloatType

sales_data = [
    ['JAN', 'NY', 3.],
    ['JAN', 'PA', 1.],
    ['JAN', 'NJ', 2.],
    ['JAN', 'CT', 4.],
    ['FEB', 'PA', 1.],
    ['FEB', 'NJ', 1.],
    ['FEB', 'NY', 2.],
    ['FEB', 'VT', 1.],
    ['MAR', 'NJ', 2.],
    ['MAR', 'NY', 1.],
    ['MAR', 'VT', 2.],
    ['MAR', 'PA', 3.]
]

# define a schema for this data set
schema = StructType([
    StructField('month', StringType(), True),
    StructField('state', StringType(), True),
    StructField('sales', FloatType(), True)
])

sales = spark.createDataFrame(sales_data,
                           schema = schema)

sales.show()

Next, we'll save this data in *delta* format.

In [None]:
sales\
  .write.format('delta')\
  .mode('overwrite')\
  .save('./sales')

If we look at the contents of `./sales/`, we can see it is simply a collection of Parquet files plus a `_delta_log` directory containing the transaction log. We will inspect this log soon.

In [None]:
! ls ./sales/

### Reading from a Delta Lake

We can read the sales data in Delta format just as we would read a CSV file or any other format.

In [None]:
sales_delta = spark\
  .read.format('delta')\
  .load('./sales/')

sales_delta.show()

Let's add some data to the table. Specifically, we'll append sales data for the month of April.

In [None]:
# make sure to keep the same schema
april_sales = spark.createDataFrame(
 [['APR', 'NY', 2.0],
  ['APR', 'PA', 1.5],
  ['APR', 'CT', 4.2],
  ['APR', 'NJ', 2.4],
  ['APR', 'WZ', 7.0]],
 schema = schema
)

# make sure to change mode to append!
april_sales\
  .write.format('delta')\
  .mode('append')\
  .save('./sales')

We can see that more Parquet files have been added to the Delta table's directory.

In [None]:
! ls ./sales

Let's make sure the sales table contains the new April sales data.

In [None]:
sales_delta.show()

### Updating Data

Imagine a scenario in which you find out that past data was not recorded correctly. Traditional database management systems allow for simple table updates, but the nature of a data lake is such that data are distributed across many, many files and often many nodes. This is where Delta Lake improves over a traditional data lake architecture. We can make a change to the data, retain all previous records, and the metadata layer will keep track of which records belong in the current version of the table.
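How the metadata layer tracks "which records belong in the current version" can be sketched with a simplified model, assuming an update rewrites the affected Parquet file instead of editing it: the log records the old file as removed and the rewritten file as added, and each version is rebuilt by replaying commits. The file names and the `snapshot` helper are hypothetical.

```python
# Hypothetical file names; each commit records which data files it adds and removes.
log = [
    {"add": ["part-000.parquet"], "remove": []},                    # version 0: initial write (JAN-MAR)
    {"add": ["part-001.parquet"], "remove": []},                    # version 1: append (APR)
    {"add": ["part-002.parquet"], "remove": ["part-000.parquet"]},  # version 2: update rewrites part-000
]


def snapshot(log, version):
    """Replay commits 0..version and return the set of data files that make up that version."""
    live = set()
    for commit in log[: version + 1]:
        live -= set(commit["remove"])  # removed files drop out of the table
        live |= set(commit["add"])     # newly written files join it
    return live


all_files_on_disk = {f for commit in log for f in commit["add"]}

print(sorted(snapshot(log, 2)))  # ['part-001.parquet', 'part-002.parquet']
print(sorted(snapshot(log, 0)))  # ['part-000.parquet']
print(sorted(all_files_on_disk - snapshot(log, 2)))  # ['part-000.parquet']
```

Because the removed file is still on disk (until a `VACUUM` deletes it), older versions remain reconstructible, which is what makes time travel possible later in this lab.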

We found out that sales for Pennsylvania in the month of January were understated: instead of 1.0, they should be 1.5.

In [None]:
sales_delta_table = DeltaTable.forPath(spark, './sales')

sales_delta_table.update(
  condition = (f.col('month') == 'JAN') & (f.col('state') == 'PA'),
  set = { 'sales': f.lit(1.5) }
)

Let's ensure the update was made.

In [None]:
sales_delta.show()

**Activity**: Notice that we introduced a typo when we appended the April sales data: there is now a fifth state mislabeled "WZ" (this is supposed to be Washington). Correct this record so the state abbreviation reads "WA" instead. Everything else in the record should stay the same.

In [None]:
# your code here

In [None]:
# ensure the update was made
sales_delta.show()

### Inspecting the Change Log

The `history` of `sales_delta_table` (read from the transaction log) contains a record of every operation performed on the `sales` table, from the initial write operation to the updates we performed above.

In [None]:
sales_delta_table\
  .history()\
  .select('version', 'timestamp', 'operation', 'operationParameters')\
  .show(truncate = False)
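The `history()` output is derived from the commit files in `_delta_log`. Each commit is a zero-padded `<version>.json` file of newline-delimited JSON, where every line is one object whose single top-level key names the action (for example `commitInfo`, `protocol`, `metaData`, `add`, or `remove`). The hypothetical helper below reads those files directly with the standard library; it skips non-commit files such as checkpoints or `_last_checkpoint`.

```python
import json
from pathlib import Path


def summarize_delta_log(table_path):
    """Map each commit version to the action types recorded in its JSON log file."""
    summary = {}
    for commit_file in sorted(Path(table_path, "_delta_log").glob("*.json")):
        if not commit_file.stem.isdigit():
            continue  # only plain <version>.json files are commits
        version = int(commit_file.stem)  # file names are zero-padded version numbers
        actions = []
        with open(commit_file) as fh:
            for line in fh:
                if line.strip():
                    # each line is one JSON object whose single key names the action
                    actions.extend(json.loads(line).keys())
        summary[version] = actions
    return summary


# Usage in this notebook (assumes the ./sales table written above):
# for version, actions in summarize_delta_log("./sales").items():
#     print(version, actions)
```

After the update, you would typically expect that commit to contain `remove` actions for the rewritten files alongside `add` actions for their replacements.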

In [None]:
! ls ./sales

### Time Travel

Sometimes we need to view or analyze a previous version of the Delta table. For example, we may want to use the original version of the table, before we made any updates. We can do this by querying a specific version of the table. Note that `0` is the original version of the table, but we could do this for any version listed in the change log.

In [None]:
spark\
  .read.format('delta')\
  .option('versionAsOf', 0)\
  .load('./sales')\
  .show()

**Activity**: Instead of a specific version, we could *time travel* using a point in time as a condition. Inspect the change log and update the following code block so that the query will display the table in its original state (matching above), now using a date-time condition instead of a version condition.

Important notes from the Delta Lake [documentation](https://docs.delta.io/latest/delta-batch.html#-deltatimetravel):

> For timestamp_string, only date or timestamp strings are accepted. For example, "2019-01-01" and "2019-01-01T00:00:00.000Z"

>The timestamp of each version N depends on the timestamp of the log file corresponding to the version N in Delta table log. Hence, time travel by timestamp can break if you copy the entire Delta table directory to a new location. Time travel by version will be unaffected.
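Resolving a timestamp to a version can be modeled as picking the most recent version committed at or before that timestamp. This is a simplified sketch under stated assumptions: the helpers `parse_ts` and `version_as_of` are hypothetical, the commit times are made up, and every time is treated as UTC (Spark interprets timestamp strings in the session time zone). Note that a bare date such as `"2025-09-10"` means midnight at the start of that day, which can fall before the table's first commit.

```python
from datetime import datetime, timezone


def parse_ts(text):
    """Parse 'YYYY-MM-DD' or an ISO timestamp ending in 'Z'; naive values are treated as UTC."""
    ts = datetime.fromisoformat(text.replace("Z", "+00:00"))
    return ts if ts.tzinfo else ts.replace(tzinfo=timezone.utc)


def version_as_of(commit_times, timestamp_string):
    """Return the latest version whose commit time is at or before the timestamp."""
    target = parse_ts(timestamp_string)
    eligible = [version for version, t in enumerate(commit_times) if t <= target]
    if not eligible:
        raise ValueError(f"{timestamp_string} is before the first commit")
    return max(eligible)


# Hypothetical commit times, indexed by version number.
commit_times = [
    parse_ts("2025-09-10T14:00:05Z"),  # version 0
    parse_ts("2025-09-10T14:02:30Z"),  # version 1
    parse_ts("2025-09-10T14:05:00Z"),  # version 2
]

print(version_as_of(commit_times, "2025-09-10T14:01:00.000Z"))  # 0
print(version_as_of(commit_times, "2025-09-10T14:03:00Z"))      # 1
```

In this model, `version_as_of(commit_times, "2025-09-10")` raises a `ValueError`, because midnight precedes the first commit.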

In [None]:
timestamp_string = ''

spark\
  .read.format('delta')\
  .option('timestampAsOf', timestamp_string)\
  .load('./sales')\
  .show()

Time travelling, as we just did, does not change the Delta table itself; it only shows us a previous version of the table. If we want to fully *roll back* the Delta table, we use the RESTORE operation. The restore is itself recorded as a new version in the table's history.

In [None]:
sales_delta_table.restoreToVersion(0)

In [None]:
spark\
  .read.format('delta')\
  .load('./sales')\
  .show()

In [None]:
! ls ./sales

In [None]:
sales_delta_table\
  .history()\
  .select('version', 'timestamp', 'operation', 'operationParameters')\
  .show(truncate = False)
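The history above shows that RESTORE does not erase later versions; it appends a new commit. That can be sketched with the same simplified add/remove model of the log: the restore commit re-adds files the target version had and removes files written after it. The file names and the `snapshot`/`restore` helpers are hypothetical.

```python
def snapshot(log, version):
    """Replay commits 0..version and return the live data files at that version."""
    live = set()
    for commit in log[: version + 1]:
        live -= set(commit["remove"])
        live |= set(commit["add"])
    return live


def restore(log, target_version):
    """Append a commit that makes the live file set equal to snapshot(target_version)."""
    current = snapshot(log, len(log) - 1)
    target = snapshot(log, target_version)
    log.append({
        "add": sorted(target - current),     # files the old version had but the current one lacks
        "remove": sorted(current - target),  # files written after the target version
    })
    return len(log) - 1  # the restore itself gets a new version number


# Hypothetical log: initial write, append, then an update that rewrote part-000.
log = [
    {"add": ["part-000.parquet"], "remove": []},
    {"add": ["part-001.parquet"], "remove": []},
    {"add": ["part-002.parquet"], "remove": ["part-000.parquet"]},
]

new_version = restore(log, 0)
print(new_version)                        # 3
print(sorted(snapshot(log, new_version))) # ['part-000.parquet']
print(sorted(snapshot(log, 2)))           # ['part-001.parquet', 'part-002.parquet']
```

This also shows why a restore needs the older data files to still exist: the restore commit points back at them.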

### Merging

Now that we have rolled back (reverted) our sales data to its original state, we no longer have any sales data for April. Suppose we obtained data from another source which includes: the April sales data (without the typo this time); the missing sales data for Washington for the previous 3 months; the corrected record for Pennsylvania in January; and some, but not all, of the data we already have.

**How can we use this data to update our current Delta table?**

According to the Delta Lake [documentation](https://docs.delta.io/latest/delta-update.html#upsert-into-a-table-using-merge&language-python):

> You can upsert data from a source table, view, or DataFrame into a target Delta table by using the MERGE SQL operation. Delta Lake supports inserts, updates and deletes in MERGE, and it supports extended syntax beyond the SQL standards to facilitate advanced use cases.
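The upsert behavior described here can be sketched in plain Python on a few rows taken from this lab's data. The hypothetical `upsert` helper matches rows on `(month, state)`, inserts unmatched keys, updates matched keys whose values differ, and leaves identical rows alone; these branches correspond to the matched and not-matched clauses described in the Delta Lake merge documentation. It assumes each key appears at most once in the source (Delta's MERGE reports an error when several source rows match the same target row).

```python
def upsert(target_rows, source_rows, key_cols=("month", "state")):
    """Merge source rows into target rows keyed on key_cols; return (rows, inserted, updated)."""
    merged = {tuple(row[c] for c in key_cols): dict(row) for row in target_rows}
    inserted = updated = 0
    for row in source_rows:
        key = tuple(row[c] for c in key_cols)
        if key not in merged:        # no match in the target: insert
            merged[key] = dict(row)
            inserted += 1
        elif merged[key] != row:     # match with different values: update
            merged[key] = dict(row)
            updated += 1
        # match with identical values: leave the target row alone
    return list(merged.values()), inserted, updated


target = [
    {"month": "JAN", "state": "PA", "sales": 1.0},
    {"month": "JAN", "state": "NY", "sales": 3.0},
]
source = [
    {"month": "JAN", "state": "PA", "sales": 1.5},  # corrected value -> update
    {"month": "JAN", "state": "NY", "sales": 3.0},  # unchanged -> no-op
    {"month": "APR", "state": "WA", "sales": 7.0},  # new key -> insert
]

rows, inserted, updated = upsert(target, source)
print(inserted, updated)  # 1 1
```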

**Activity:** Read through the merge example provided in the Delta Lake documentation linked above, and adapt it to merge the following data update into the current version of our Delta table. Since we have a mix of new data and old data (but not all of the old data), this is more complex than a simple append. Specifically, we want to insert records we don't currently have, update existing records whose values have changed (the bad entry for Pennsylvania in January), and leave all other records alone.

In [None]:
sales_updates = spark.createDataFrame(
 [['APR', 'NY', 2.0],
  ['APR', 'PA', 1.5],
  ['APR', 'CT', 4.2],
  ['APR', 'NJ', 2.4],
  ['APR', 'WA', 7.0],
  ['JAN', 'WA', 7.0],
  ['FEB', 'WA', 7.0],
  ['MAR', 'WA', 7.0],
  ['JAN', 'PA', 1.5],
  ['JAN', 'CT', 4.0],
  ['FEB', 'PA', 1.0],
  ['FEB', 'NJ', 1.0],
  ['FEB', 'VT', 1.0]],
 schema = schema
)

In [None]:
# your code here

In [None]:
sales_delta\
  .orderBy('month', 'state')\
  .show()

In [None]:
sales_delta_table\
  .history()\
  .select('version', 'timestamp', 'operation', 'operationParameters')\
  .show(truncate = False)