## Spark Streaming, Storage Optimizations, and Delta Lake
### Learning Objectives
By the end of this notebook, you will be able to:
- Explain how Structured Streaming works internally
- Distinguish event time from processing time
- Understand column pruning, predicate pushdown, and partition pruning
- Read Spark physical execution plans
- Explain what Delta Lake adds on top of Parquet
- Use time travel, MERGE, and streaming writes with Delta Lake


## Part 1 – Structured Streaming

### Why Streaming?
In real systems, data does not arrive all at once. Logs, sensor readings, financial transactions, and user interactions are **continuous and unbounded**.

Structured Streaming lets Spark process this unbounded data with the same DataFrame API you already know from batch jobs.

Key mental model:

**A stream is an unbounded table to which rows are continuously appended.**
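
One way to picture this mental model is a toy simulation in plain Python. It is not how Spark's engine is implemented; it only illustrates the idea that each micro-batch appends rows to a conceptual input table and that the result is updated incrementally from the new rows alone. The names `process_micro_batch` and `running_counts` and the sample rows are made up for illustration.

```python
from collections import Counter

def process_micro_batch(running_counts, new_rows):
    """Fold one micro-batch into the running result.

    Only the newly arrived rows are touched; earlier rows are never re-read.
    """
    for row in new_rows:
        running_counts[row["passenger_count"]] += 1
    return running_counts

# Three micro-batches arriving over time (hypothetical data).
batches = [
    [{"passenger_count": 1}, {"passenger_count": 2}],
    [{"passenger_count": 1}],
    [{"passenger_count": 2}, {"passenger_count": 2}],
]

running_counts = Counter()
for batch_id, batch in enumerate(batches):
    process_micro_batch(running_counts, batch)
    print(batch_id, dict(running_counts))
```

After the last batch the running result equals what a batch query over all five rows would return, which is the guarantee the unbounded-table model is meant to convey.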


### Spark Session
We begin by creating a Spark session. This is identical to batch Spark.

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("StructuredStreamingLecture") \
    .getOrCreate()

### Defining a Schema
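For file-based streaming sources, Spark does not infer the schema by default (inference is controlled by `spark.sql.streaming.schemaInference`), so we define it explicitly. This also guarantees that every micro-batch is read with the same schema.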

In [None]:
schema = '''
pickup_datetime TIMESTAMP,
passenger_count INT,
fare_amount DOUBLE
'''

### Reading Streaming Data
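The file source simulates a stream by watching a directory and treating every new Parquet file that lands there as new input. It needs a directory on a filesystem Spark can list (local disk, HDFS, S3, and so on), not an HTTP URL to a single file, so download the dataset into the watched directory first. Column names in the schema must match the names stored in the files.

In [None]:
# Dataset: https://github.com/msfasha/307401-Big-Data/blob/main/20251/datasets/yellow_tripdata_2025-10.parquet
# Download it into the watched directory; '/data/ny_taxi_stream/' is an example path.
stream_df = spark.readStream \
    .schema(schema) \
    .parquet('/data/ny_taxi_stream/')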

### Event Time, Watermarks, and Windows
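**Event time** is when a record was generated (here, `pickup_datetime`); **processing time** is when Spark receives it. Real streaming data often arrives late and out of order, so the two can differ. Spark uses **watermarks** to limit how long it waits for late data and to bound the amount of state it keeps. Below, events are counted in 5-minute tumbling windows on event time, and Spark tolerates data that is up to 10 minutes behind the latest event time it has seen.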

In [None]:
from pyspark.sql.functions import col, window

agg_df = stream_df \
    .withWatermark('pickup_datetime', '10 minutes') \
    .groupBy(window(col('pickup_datetime'), '5 minutes')) \
    .count()
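
The interaction between windows and the watermark can be sketched in plain Python. This is a simplified model under stated assumptions, not Spark's implementation: it updates the watermark after every event, whereas Spark advances it between micro-batches. The helper names `window_start` and `run` and the sample timestamps are hypothetical.

```python
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=5)   # tumbling window length
DELAY = timedelta(minutes=10)   # watermark delay threshold
EPOCH = datetime(1970, 1, 1)

def window_start(event_time):
    """Start of the 5-minute tumbling window that contains event_time."""
    return event_time - (event_time - EPOCH) % WINDOW

def run(events):
    """Process events in arrival order; return (counts per window, dropped events)."""
    counts, dropped = {}, []
    max_event_time = None
    for event_time in events:
        start = window_start(event_time)
        # Watermark = latest event time seen so far minus the allowed delay.
        watermark = None if max_event_time is None else max_event_time - DELAY
        if watermark is not None and start + WINDOW <= watermark:
            dropped.append(event_time)          # window already closed: too late
        else:
            counts[start] = counts.get(start, 0) + 1
        if max_event_time is None or event_time > max_event_time:
            max_event_time = event_time
    return counts, dropped

def t(hour, minute):
    return datetime(2025, 10, 1, hour, minute)

# Arrival (processing) order differs from event-time order.
events = [t(12, 1), t(12, 3), t(12, 17), t(12, 4), t(12, 12)]
counts, dropped = run(events)
print(counts)
print(dropped)
```

The 12:04 event arrives after 12:17 has pushed the watermark to 12:07, so its window (12:00 to 12:05) is already closed and the event is dropped. The 12:12 event is also out of order, but its window (12:10 to 12:15) is still open, so it is counted.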

### Starting the Streaming Query
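A streaming DataFrame is only a query plan; nothing runs until `start()` is called on a `writeStream`. With the `append` output mode, each window's count is written once, after the watermark has passed the end of that window. The checkpoint location stores the query's progress and state so that it can resume after a restart.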

In [None]:
query = agg_df.writeStream \
    .format('console') \
    .outputMode('append') \
    .option('checkpointLocation', '/tmp/checkpoints/ny_taxi') \
    .start()
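
To make the timing of `append` output concrete, here is a small plain-Python sketch of when a windowed count becomes final. It is an illustration only, and `emit_finalized`, the state dictionary, and the timestamps are assumptions made for the example rather than Spark internals.

```python
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=5)

def emit_finalized(state, watermark):
    """Remove and return the windows whose end the watermark has passed."""
    ready = sorted(start for start in state if start + WINDOW <= watermark)
    return [(start, state.pop(start)) for start in ready]

# Per-window counts currently held in state (hypothetical values).
state = {
    datetime(2025, 10, 1, 12, 0): 2,
    datetime(2025, 10, 1, 12, 10): 1,
    datetime(2025, 10, 1, 12, 15): 1,
}

watermark = datetime(2025, 10, 1, 12, 7)
first = emit_finalized(state, watermark)    # the 12:00 window is final
second = emit_finalized(state, watermark)   # nothing new to emit
print(first, second, len(state))
```

Emitting a window also removes it from state, which is why a watermark keeps state size bounded: windows that can no longer change do not need to be remembered.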

## Part 2 – Why Spark Is Fast

Much of Spark's performance on analytical queries comes from **reading less data**, not only from computing faster.

We now examine column pruning, predicate pushdown, and partition pruning using Parquet.

### Reading Parquet Data
Parquet is a columnar format optimized for analytics.

In [None]:
taxi_df = spark.read.parquet('/data/ny_taxi_parquet/')

### Column Pruning and Predicate Pushdown
Spark only reads the columns we need and pushes filters into the Parquet reader.

In [None]:
filtered_df = taxi_df \
    .select('pickup_datetime', 'fare_amount', 'passenger_count') \
    .filter((col('fare_amount') > 20) & (col('passenger_count') == 2))
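
The two optimizations can be illustrated with a toy columnar file in plain Python. This is a conceptual sketch, not the Parquet format or Spark's reader: each "row group" stores its columns separately and keeps min/max statistics, which is the information that lets a reader skip data. The function names, the `tip_amount` column, and the sample values are invented for the example, and only the `fare_amount > 20` filter is modelled.

```python
def make_group(columns):
    """Build a toy row group: column chunks plus min/max stats per column."""
    stats = {name: {"min": min(vals), "max": max(vals)} for name, vals in columns.items()}
    return {"columns": columns, "stats": stats}

row_groups = [
    make_group({
        "pickup_datetime": ["2025-10-01 08:00", "2025-10-01 08:05"],
        "fare_amount": [7.5, 12.0],
        "passenger_count": [1, 2],
        "tip_amount": [1.0, 2.5],   # hypothetical column the query never asks for
    }),
    make_group({
        "pickup_datetime": ["2025-10-01 09:00", "2025-10-01 09:10"],
        "fare_amount": [25.0, 9.0],
        "passenger_count": [2, 1],
        "tip_amount": [5.0, 0.0],
    }),
]

def scan(row_groups, needed_columns, min_fare):
    """Return matching rows, the number of skipped groups, and values decoded."""
    rows, groups_skipped, values_read = [], 0, 0
    for group in row_groups:
        # Predicate pushdown: the stats prove that no row here can match.
        if group["stats"]["fare_amount"]["max"] <= min_fare:
            groups_skipped += 1
            continue
        # Column pruning: decode only the requested columns.
        selected = {name: group["columns"][name] for name in needed_columns}
        values_read += sum(len(values) for values in selected.values())
        for i, fare in enumerate(selected["fare_amount"]):
            if fare > min_fare:
                rows.append({name: selected[name][i] for name in needed_columns})
    return rows, groups_skipped, values_read

rows, groups_skipped, values_read = scan(
    row_groups, ["pickup_datetime", "fare_amount", "passenger_count"], min_fare=20
)
print(rows, groups_skipped, values_read)
```

Of the 16 stored values, only 6 are decoded: the first row group is skipped from its statistics alone, and `tip_amount` is never read.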

### Examining the Physical Plan
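Understanding the physical plan is essential for debugging performance issues. `explain(True)` prints the parsed, analyzed, and optimized logical plans followed by the physical plan. In the physical plan's scan node, look for `ReadSchema` (column pruning), `PushedFilters` (predicate pushdown), and `PartitionFilters` (partition pruning).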

In [None]:
filtered_df.explain(True)

### Partitioning Data
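Partitioning writes one directory per distinct value of the partition columns, which allows Spark to skip entire directories during reads. The partition columns must exist in the DataFrame, so we derive `pickup_year` and `pickup_month` from `pickup_datetime` first.

In [None]:
from pyspark.sql.functions import year, month

# Derive the partition columns from pickup_datetime before writing
taxi_df.withColumn('pickup_year', year('pickup_datetime')) \
    .withColumn('pickup_month', month('pickup_datetime')) \
    .write.mode('overwrite') \
    .partitionBy('pickup_year', 'pickup_month') \
    .parquet('/data/ny_taxi_partitioned/')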

### Partition Pruning in Action

In [None]:
spark.read.parquet('/data/ny_taxi_partitioned/') \
    .filter(col('pickup_year') == 2022) \
    .explain(True)
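
Partition pruning works because partition values are encoded in directory names (`column=value`), so a filter on a partition column can be answered from paths alone, before any file is opened. The sketch below mimics that in plain Python. The file names are hypothetical (real Spark output files have longer generated names), and `partition_values` and `prune` are illustrative helpers, not Spark APIs.

```python
def partition_values(path):
    """Parse Hive-style 'key=value' directory names from a file path."""
    values = {}
    for part in path.split("/"):
        if "=" in part:
            key, value = part.split("=", 1)
            values[key] = value
    return values

def prune(paths, column, wanted):
    """Keep only the files whose partition directory matches the filter."""
    return [p for p in paths if partition_values(p).get(column) == str(wanted)]

# Hypothetical listing of a partitioned dataset.
paths = [
    "/data/ny_taxi_partitioned/pickup_year=2021/pickup_month=12/part-0000.parquet",
    "/data/ny_taxi_partitioned/pickup_year=2022/pickup_month=1/part-0000.parquet",
    "/data/ny_taxi_partitioned/pickup_year=2022/pickup_month=2/part-0000.parquet",
]

selected = prune(paths, "pickup_year", 2022)
print(selected)
```

Only the two 2022 files remain; the 2021 directory is never read. This is why partitioning pays off only for columns that queries actually filter on.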

## Part 3 – Delta Lake

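Parquet alone does not support transactions, updates, or deletes. Delta Lake adds these capabilities by storing data as Parquet files alongside a transaction log that records every change to the table.

Note that the `delta` format is not built into open-source Spark: the `delta-spark` package must be available, and the session must be configured with `spark.sql.extensions` set to `io.delta.sql.DeltaSparkSessionExtension` and `spark.sql.catalog.spark_catalog` set to `org.apache.spark.sql.delta.catalog.DeltaCatalog`.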

### Writing a Delta Table

In [None]:
taxi_df.write \
    .format('delta') \
    .mode('overwrite') \
    .save('/data/delta/ny_taxi')

### Reading Delta Data

In [None]:
delta_df = spark.read \
    .format('delta') \
    .load('/data/delta/ny_taxi')

### Time Travel
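Every commit to a Delta table creates a new table version. The `versionAsOf` option reads the table as it was at a given version (`timestampAsOf` does the same by timestamp).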

In [None]:
old_df = spark.read \
    .format('delta') \
    .option('versionAsOf', 0) \
    .load('/data/delta/ny_taxi')
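
Time travel follows naturally from the transaction log. As a rough mental model (a toy in plain Python, not Delta Lake's actual log format or reader), each commit lists data files that were added or removed, and the table at version N is whatever remains after replaying commits 0 through N. The function `files_at_version` and the file names are hypothetical.

```python
def files_at_version(log, version):
    """Replay commits 0..version and return the set of live data files."""
    live = set()
    for commit in log[: version + 1]:
        for action, path in commit:
            if action == "add":
                live.add(path)
            elif action == "remove":
                live.discard(path)
    return live

# One list of actions per commit; the list index is the table version.
log = [
    [("add", "part-a.parquet"), ("add", "part-b.parquet")],     # version 0: initial write
    [("remove", "part-b.parquet"), ("add", "part-c.parquet")],  # version 1: one file rewritten
]

print(files_at_version(log, 0))
print(files_at_version(log, 1))
```

A "remove" in this model only marks the file as no longer part of the table. Reading version 0 still works as long as the old file is physically present, which is why old versions stop being readable once their files have been cleaned up.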

### MERGE (Upserts)
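Delta Lake supports SQL-style upserts using MERGE: rows that match the join condition are updated, and rows that do not match are inserted. The example assumes that both the table and the updates contain a unique `trip_id` key column.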

In [None]:
from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, '/data/delta/ny_taxi')
updates_df = spark.read.parquet('/data/ny_taxi_updates/')

In [None]:
delta_table.alias('t') \
    .merge(updates_df.alias('u'), 't.trip_id = u.trip_id') \
    .whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll() \
    .execute()
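
The semantics of this particular MERGE (update all columns when matched, insert when not matched) can be shown on plain Python dictionaries. This is a sketch of the logical result only; it says nothing about how Delta Lake executes the operation. `merge_upsert` and the sample rows are made up, and unlike this toy, a real MERGE reports an error when several source rows match the same target row instead of silently keeping the last one.

```python
def merge_upsert(target_rows, update_rows, key):
    """Upsert: matched keys are overwritten, unmatched keys are inserted."""
    merged = {row[key]: dict(row) for row in target_rows}
    for row in update_rows:
        merged[row[key]] = dict(row)
    return sorted(merged.values(), key=lambda r: r[key])

target = [
    {"trip_id": 1, "fare_amount": 10.0},
    {"trip_id": 2, "fare_amount": 22.5},
]
updates = [
    {"trip_id": 2, "fare_amount": 25.0},  # matches an existing row: update
    {"trip_id": 3, "fare_amount": 8.0},   # no match: insert
]

result = merge_upsert(target, updates, "trip_id")
print(result)
```

Trip 1 is untouched, trip 2 takes the new fare, and trip 3 is added, all as a single logical change to the table.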

### Streaming into Delta Lake
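A Delta table can also be the sink of a streaming query. Each micro-batch is committed to the table as a transaction, and the checkpoint lets the query resume where it left off after a failure. Combining streaming ingestion with transactional tables in this way is the basis of lakehouse architectures.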

In [None]:
stream_df.writeStream \
    .format('delta') \
    .outputMode('append') \
    .option('checkpointLocation', '/tmp/checkpoints/delta') \
    .start('/data/delta/ny_taxi_streaming')
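
Why a restart does not duplicate data can be sketched with a toy sink that remembers the last micro-batch it committed. This is a simplified illustration of idempotent writes under that assumption, not Delta Lake's actual protocol; `ToySink` and its fields are hypothetical.

```python
class ToySink:
    """Append-only table that remembers the last micro-batch it committed."""

    def __init__(self):
        self.rows = []
        self.last_batch_id = -1

    def commit(self, batch_id, rows):
        """Apply a micro-batch once; a replay of the same batch is ignored."""
        if batch_id <= self.last_batch_id:
            return False
        self.rows.extend(rows)
        self.last_batch_id = batch_id
        return True

sink = ToySink()
results = [
    sink.commit(0, ["trip-a", "trip-b"]),
    sink.commit(1, ["trip-c"]),
    sink.commit(1, ["trip-c"]),  # query restarted and replayed batch 1
]
print(results, sink.rows)
```

The replayed batch is recognized and skipped, so the table ends up with each row exactly once even though the batch was delivered twice.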