# Lab: Query Optimization - Solutions

**Objective**: Master Spark query optimization techniques to write efficient data processing pipelines.

**Learning Outcomes**:
- Understand how Catalyst optimizer consolidates filters
- Recognize when caching helps vs hurts performance
- Identify predicate pushdown in explain plans
- Compare columnar (Parquet) vs row-based (CSV) formats
- Apply optimization principles to avoid common anti-patterns

**Estimated Time**: 45 minutes

---

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import time

spark = SparkSession.builder \
    .appName("Lab-Query-Optimization") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.logLevel", "ERROR") \
    .getOrCreate()

sc = spark.sparkContext
sc.setLogLevel("ERROR")

print(f"🚀 Spark {spark.version} - Query Optimization Lab")
ui_url = spark.sparkContext.uiWebUrl
print(f"Spark UI: {ui_url}")
print("💡 In GitHub Codespaces: Check the 'PORTS' tab below for forwarded port 4040 to access Spark UI")

## Setup: Load IoT Sensor Data

We'll use IoT sensor readings to demonstrate query optimization techniques. This dataset contains sensor measurements from multiple buildings with various sensor types (temperature, humidity, motion, pressure).

In [None]:
# Load CSV data for initial examples
df = spark.read.csv("../Datasets/iot_sensor_readings.csv", header=True, inferSchema=True)

print("📊 IoT Sensor Dataset:")
print(f"Total records: {df.count():,}")
print("\nSchema:")
df.printSchema()
print("\nSample data:")
df.show(5, truncate=False)

## Part 1: Logical Optimizations

Spark's **Catalyst optimizer** automatically optimizes query plans. Let's explore how it handles multiple filter transformations.

### Example 1.1: Multiple Chained Filters

It's natural to write filters separately for readability. Let's see what Catalyst does with multiple chained filters.

In [None]:
# Multiple chained filters - might seem inefficient
limit_sensors_df = (df
    .filter(col("sensor_id") != "TEMP_001")
    .filter(col("sensor_id") != "TEMP_002") 
    .filter(col("sensor_id") != "HUMID_001")
    .filter(col("sensor_id") != "HUMID_002")
    .filter(col("sensor_id") != "MOTION_001")
    .filter(col("location") != "Building_A_Floor_1")
    .filter(col("location") != "Building_A_Floor_2")
    .filter(col("location") != "Building_B_Floor_1")
)

print("Multiple chained filters - Explain plan:")
print("="*80)
limit_sensors_df.explain(True)

**Key Observation**: Notice in the **Optimized Logical Plan** that Catalyst automatically consolidates all the filters into a single Filter operation with combined conditions!
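To make the idea of filter consolidation concrete, here is a toy pure-Python model. It is not Catalyst's implementation: the tuple-based plan representation and the names `combine_filters` and `chain` are invented for illustration. It only shows the general shape of the rewrite, in which nested filters collapse into one conjunction and repeated conditions are dropped.

```python
# Toy model of filter consolidation. This is NOT Catalyst's implementation;
# the plan representation and function names are invented for illustration.

def combine_filters(plan):
    """Collapse directly nested ("Filter", condition, child) nodes into one."""
    conditions = []
    node = plan
    # Walk down while the current node is a Filter, collecting conditions.
    while isinstance(node, tuple) and node[0] == "Filter":
        conditions.append(node[1])
        node = node[2]
    if not conditions:
        return plan
    # The innermost filter was applied first, so reverse to restore call order,
    # and drop repeated conditions while keeping the first occurrence.
    unique = list(dict.fromkeys(reversed(conditions)))
    return ("Filter", " AND ".join(unique), node)


def chain(source, *conditions):
    """Build the nested plan that chained .filter() calls would produce."""
    plan = source
    for condition in conditions:
        plan = ("Filter", condition, plan)
    return plan


chained = chain(
    ("Scan", "iot_sensor_readings"),
    "sensor_id != 'TEMP_001'",
    "sensor_id != 'TEMP_002'",
    "sensor_id != 'TEMP_001'",  # accidental duplicate
)
combined = combine_filters(chained)
print(combined)
```

The three nested filters come out as a single `Filter` node over the scan, with the duplicate condition listed once.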

### Example 1.2: Single Consolidated Filter

We could write it ourselves with a single filter. Let's compare the plans.

In [None]:
# Single consolidated filter - manually optimized
better_df = (df
    .filter(
        (col("sensor_id").isNotNull()) &
        (col("sensor_id") != "TEMP_001") &
        (col("sensor_id") != "TEMP_002") &
        (col("sensor_id") != "HUMID_001") &
        (col("sensor_id") != "HUMID_002") &
        (col("sensor_id") != "MOTION_001") &
        (col("location") != "Building_A_Floor_1") &
        (col("location") != "Building_A_Floor_2") &
        (col("location") != "Building_B_Floor_1")
    )
)

print("Single consolidated filter - Explain plan:")
print("="*80)
better_df.explain(True)

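**Key Observation**: The **Optimized Logical Plan** is nearly identical! The only addition here is the explicit `isNotNull` check, which Catalyst typically infers on its own from the `!=` comparisons. In practice you get an equivalent optimized query however you write the filters, so write for readability and let Catalyst handle optimization.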
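Comparing two long `explain(True)` dumps by eye is error-prone, so a small helper can do it for you. The sketch below assumes that `explain()` prints through Python's standard output (as PySpark's does) and that the section header is `== Optimized Logical Plan ==`. The helper names are hypothetical and not part of PySpark.

```python
import io
from contextlib import redirect_stdout

OPTIMIZED_HEADER = "== Optimized Logical Plan =="


def capture_explain(df):
    """Return the text that df.explain(True) prints to stdout."""
    buffer = io.StringIO()
    with redirect_stdout(buffer):
        df.explain(True)
    return buffer.getvalue()


def optimized_plan(explain_text):
    """Return only the lines under the Optimized Logical Plan header."""
    lines = []
    in_section = False
    for line in explain_text.splitlines():
        if line.startswith("== "):
            # A new section header starts; keep only the optimized one.
            in_section = line.strip() == OPTIMIZED_HEADER
            continue
        if in_section and line.strip():
            lines.append(line.rstrip())
    return "\n".join(lines)


def same_optimized_plan(df_a, df_b):
    """True when both DataFrames print the same optimized logical plan."""
    return optimized_plan(capture_explain(df_a)) == optimized_plan(capture_explain(df_b))
```

With the lab's DataFrames you could call `same_optimized_plan(limit_sensors_df, better_df)`. Treat a `False` result as a prompt to diff the two texts rather than as proof of a slower query, since the order of conditions can differ.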

### Example 1.3: Duplicate Filters

In complex queries, you might accidentally duplicate filter conditions. Let's see how Catalyst handles this.

In [None]:
# Duplicate filters - accidentally filtering same condition multiple times
duplicate_df = (df
    .filter(col("status") != "ERROR")
    .filter(col("status") != "ERROR")  # Duplicate
    .filter(col("status") != "ERROR")  # Duplicate
    .filter(col("status") != "ERROR")  # Duplicate
    .filter(col("status") != "ERROR")  # Duplicate
)

print("Duplicate filters - Explain plan:")
print("="*80)
duplicate_df.explain(True)

print("\n" + "="*80)
print("Result: Catalyst eliminates duplicates and creates a single filter!")

### Exercise 1.1: Filter Optimization Analysis

**Task**: Create two DataFrames:
1. One with 5 chained filters on different columns
2. One with the same conditions combined in a single filter

Use `explain()` to verify the optimized plans are identical, then confirm both produce the same count.

In [None]:
# Solution: Filter Optimization Exercise
print("🎯 Exercise 1.1: Filter Optimization Analysis\n")

# Approach 1: Chained filters
exercise_chained = (df
    .filter(col("status") == "NORMAL")
    .filter(col("unit") == "celsius")
    .filter(col("value") > 5)
    .filter(col("value") < 15)
    .filter(col("battery_level") > 70)
)

# Approach 2: Consolidated filter
exercise_consolidated = df.filter(
    (col("status") == "NORMAL") &
    (col("unit") == "celsius") &
    (col("value") > 5) &
    (col("value") < 15) &
    (col("battery_level") > 70)
)

# Show explain plans
print("Chained filters explain plan:")
exercise_chained.explain()

print("\n" + "="*80)
print("Consolidated filter explain plan:")
exercise_consolidated.explain()

# Verify same results
chained_count = exercise_chained.count()
consolidated_count = exercise_consolidated.count()

print("\n" + "="*80)
print(f"Chained approach: {chained_count:,} records")
print(f"Consolidated approach: {consolidated_count:,} records")

assert chained_count == consolidated_count, "Counts should match!"
print("\n✓ Exercise 1.1 completed! Both approaches produce identical optimized plans and results.")

## Part 2: Understanding Caching

By default, DataFrame data exists on a Spark cluster only while being processed during a query. You can explicitly persist a DataFrame using the **`cache()`** method.

### When to Cache (Best Practices)

✅ **DO cache when**:
- **Exploratory data analysis**: Running multiple different queries on the same dataset
- **Machine learning**: Iteratively training models on the same data
- **Iterative algorithms**: Reusing the same DataFrame multiple times

❌ **DON'T cache when**:
- The data is used only once (no benefit, wastes resources)
- Cluster memory is tight (cached data competes with the memory needed for task execution)
- **The cache would block query optimizations** (such as predicate pushdown, covered in Part 4)

⚠️ **Important**: Always call **`unpersist()`** when done with cached data to free up memory!
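One way to make sure `unpersist()` is never forgotten is to wrap the cache in a context manager. This is a minimal sketch, not a PySpark feature: the name `cached` is hypothetical, and it relies only on the `cache()` and `unpersist()` methods used in this lab.

```python
from contextlib import contextmanager


@contextmanager
def cached(df):
    """Cache df for the duration of a with-block, then release it."""
    cached_df = df.cache()  # lazy: the first action inside the block fills the cache
    try:
        yield cached_df
    finally:
        # Runs even if a query inside the block raises.
        cached_df.unpersist()


# Usage with the lab's DataFrame (commented out so the sketch runs without Spark):
# with cached(df.filter(col("status") == "NORMAL")) as analysis_df:
#     print(analysis_df.count())
#     print(analysis_df.select("sensor_id").distinct().count())
```

The `try`/`finally` is the point of the design: the cache is released on the way out of the block whether the queries succeed or fail.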

In [None]:
# Example: When caching makes sense
print("Example: Multiple operations on cached data\n")

# Cache for iterative analysis
analysis_df = df.filter(col("status") == "NORMAL").cache()

# First action triggers caching
print(f"Total NORMAL readings: {analysis_df.count():,}")

# Subsequent operations use cached data (faster)
print(f"Average value: {analysis_df.agg({'value': 'avg'}).collect()[0][0]:.2f}")
print(f"Distinct sensors: {analysis_df.select('sensor_id').distinct().count()}")
print(f"Distinct locations: {analysis_df.select('location').distinct().count()}")

# Always unpersist when done
analysis_df.unpersist()
print("\n✓ Cache cleared")

### Exercise 2.1: Caching Strategy Analysis

**Task**: Compare performance with and without caching for multiple queries on the same filtered dataset.

In [None]:
# Solution: Caching Strategy Exercise
print("🎯 Exercise 2.1: Caching Strategy Analysis\n")

# Test WITHOUT caching
print("Scenario 1: Without caching (recompute each time)")
base_df = df.filter(col("battery_level") > 50)

start = time.time()
query1 = base_df.count()
query2 = base_df.agg({'value': 'avg'}).collect()[0][0]
query3 = base_df.select('sensor_id').distinct().count()
time_without_cache = time.time() - start

print(f"Time: {time_without_cache:.4f}s")
print(f"Results: {query1:,} records, avg value: {query2:.2f}, {query3} sensors\n")

# Test WITH caching
print("Scenario 2: With caching (compute once, reuse)")
cached_base_df = df.filter(col("battery_level") > 50).cache()

start = time.time()
query1_cached = cached_base_df.count()
query2_cached = cached_base_df.agg({'value': 'avg'}).collect()[0][0]
query3_cached = cached_base_df.select('sensor_id').distinct().count()
time_with_cache = time.time() - start

print(f"Time: {time_with_cache:.4f}s")
print(f"Results: {query1_cached:,} records, avg value: {query2_cached:.2f}, {query3_cached} sensors\n")

# Compare
print("="*80)
if time_with_cache < time_without_cache:
    speedup = time_without_cache / time_with_cache
    print(f"✓ Caching provided {speedup:.2f}x speedup for multiple queries!")
else:
    print("Note: Speedup varies based on data size and number of reuses")

# Clean up
cached_base_df.unpersist()

assert query1 == query1_cached, "Results should match"
print("\n✓ Exercise 2.1 completed! Caching benefits multiple operations on same data.")

## Part 3: Predicate Pushdown with Parquet

**Predicate pushdown** is a critical optimization where Spark pushes filter operations down to the data source, reducing the amount of data that needs to be read into memory.

### How it Works:
- **Parquet format** stores column statistics (min/max values) for each row group
- Spark can skip reading entire row groups that don't match filter conditions
- This dramatically reduces I/O and improves query performance
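The row-group skipping described above can be modelled in a few lines of plain Python. This is a toy sketch only: the `RowGroup` class and the sample values are invented, and real Parquet readers store these statistics in file metadata rather than computing them on the fly.

```python
# Toy model of min/max row-group skipping. The RowGroup class and the
# sample values are invented; this is not how a real Parquet reader is built.
from dataclasses import dataclass
from typing import List


@dataclass
class RowGroup:
    values: List[float]

    # Real Parquet stores min/max in metadata; computed here for brevity.
    @property
    def min_value(self) -> float:
        return min(self.values)

    @property
    def max_value(self) -> float:
        return max(self.values)


def scan_greater_than(row_groups, threshold):
    """Return (matching values, row groups actually read) for value > threshold."""
    matches, groups_read = [], 0
    for group in row_groups:
        if group.max_value <= threshold:
            continue  # no value in this group can satisfy value > threshold
        groups_read += 1
        # Rows inside a group that was kept still need to be checked one by one.
        matches.extend(v for v in group.values if v > threshold)
    return matches, groups_read


row_groups = [
    RowGroup([3.0, 8.0, 12.0]),
    RowGroup([41.0, 55.0, 60.0]),
    RowGroup([15.0, 20.0, 49.0]),
]
matches, groups_read = scan_greater_than(row_groups, 50)
print(matches, groups_read)  # [55.0, 60.0] 1
```

Only one of the three groups is read, because the other two have a maximum at or below the threshold. The saving depends on how well the data is clustered: if every group's range straddles the threshold, nothing can be skipped.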

### What to Look for in Explain Plans:
- **`PushedFilters:`** Shows filters pushed to the data source
- **`FileScan parquet`** Shows the Parquet scan operation
- **A `Filter` operator above the scan** is still expected: pushdown skips whole row groups, and Spark re-checks the rows in the groups it does read
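If you capture the plan text (for example by redirecting standard output around `explain()`), a small regular expression can pull out the pushed filters for you. The sketch below is a hypothetical helper, not a Spark API. It assumes the `PushedFilters: [...]` format shown in this lab and uses a deliberately naive split.

```python
import re


def pushed_filters(plan_text):
    """Return the filters listed after the first 'PushedFilters:' (empty list if none)."""
    match = re.search(r"PushedFilters: \[([^\]]*)\]", plan_text)
    if match is None or not match.group(1).strip():
        return []
    # Naive split: assumes no filter value itself contains ", " or "]".
    return [item.strip() for item in match.group(1).split(", ")]


sample = "PushedFilters: [IsNotNull(status), EqualTo(status,NORMAL)]"
print(pushed_filters(sample))  # ['IsNotNull(status)', 'EqualTo(status,NORMAL)']
print(pushed_filters("Filter (status = NORMAL)"))  # []
```

Only the first scan in the text is inspected, so a plan with several scans (a join, for instance) would need `re.findall` instead.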

In [None]:
# Load Parquet data
parquet_df = spark.read.parquet("../Datasets/iot_sensor_readings.parquet")

print("📊 Loaded Parquet dataset")
print(f"Total records: {parquet_df.count():,}")

### Example 3.1: Parquet Predicate Pushdown

Let's apply a filter and examine the explain plan to see predicate pushdown in action.

In [None]:
# Filter Parquet data - observe predicate pushdown
filtered_parquet = parquet_df.filter(col("status") == "NORMAL")

print("Parquet with Filter - Explain Plan:")
print("="*80)
filtered_parquet.explain(True)

print("\n" + "="*80)
print("🔍 Look for 'PushedFilters: [IsNotNull(status), EqualTo(status,NORMAL)]' in the scan!")
print("This means the filter is handed to the Parquet reader, which can skip row groups DURING the read!")

### Example 3.2: CSV vs Parquet Comparison

Let's compare Parquet (which can skip row groups using column statistics) with CSV (which cannot) to see the difference.

In [None]:
# CSV stores no column statistics, so Spark cannot skip any part of the file
csv_df = spark.read.csv("../Datasets/iot_sensor_readings.csv", header=True, inferSchema=True)
filtered_csv = csv_df.filter(col("status") == "NORMAL")

print("CSV with Filter - Explain Plan:")
print("="*80)
filtered_csv.explain(True)

print("\n" + "="*80)
print("🔍 Notice: the CSV scan has no column statistics to skip data with!")
print("Even if your Spark version lists 'PushedFilters' here, the entire file is still read and parsed.")

### Example 3.3: Performance Comparison

Let's measure the actual performance difference between CSV and Parquet.
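Single-run timings are noisy: the first query often pays one-off costs such as JVM warm-up and file listing. A more robust approach is to take the median of several runs. The helper below is a sketch with a hypothetical name, `median_seconds`, and the default repeat counts are arbitrary.

```python
import statistics
import time


def median_seconds(action, repeats=5, warmup=1):
    """Run action() warmup + repeats times; return the median of the timed runs."""
    for _ in range(warmup):
        action()  # untimed: absorbs one-off costs such as JVM warm-up and file listing
    samples = []
    for _ in range(repeats):
        start = time.perf_counter()  # monotonic clock, better suited to intervals than time.time()
        action()
        samples.append(time.perf_counter() - start)
    return statistics.median(samples)


# Usage with the lab's DataFrames (commented out so the sketch runs without Spark):
# csv_seconds = median_seconds(lambda: csv_filtered.count())
# parquet_seconds = median_seconds(lambda: parquet_filtered.count())
```

Keep the action identical across formats (here, the same `count()` over the same filter) so that the only variable is the file format.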

In [None]:
# Performance test: CSV vs Parquet
print("⚡ Performance Comparison: CSV vs Parquet\n")

# Test CSV
csv_df_test = spark.read.csv("../Datasets/iot_sensor_readings.csv", header=True, inferSchema=True)
start = time.time()
csv_filtered = csv_df_test.filter(
    (col("status") == "NORMAL") & 
    (col("value") > 50) & 
    (col("battery_level") > 60)
)
csv_count = csv_filtered.count()
csv_time = time.time() - start

# Test Parquet
parquet_df_test = spark.read.parquet("../Datasets/iot_sensor_readings.parquet")
start = time.time()
parquet_filtered = parquet_df_test.filter(
    (col("status") == "NORMAL") & 
    (col("value") > 50) & 
    (col("battery_level") > 60)
)
parquet_count = parquet_filtered.count()
parquet_time = time.time() - start

# Results
print(f"CSV:     {csv_count:,} records in {csv_time:.4f}s")
print(f"Parquet: {parquet_count:,} records in {parquet_time:.4f}s")
print(f"\nSpeedup: {csv_time/parquet_time:.2f}x faster with Parquet")
print(f"Savings: {((csv_time - parquet_time)/csv_time * 100):.1f}% time reduction")

assert csv_count == parquet_count, "Both should return same results"
print("\n✓ Results verified - same data, better performance!")

### Exercise 3.1: Predicate Pushdown Analysis

**Task**: Create a complex filter with multiple conditions and analyze the predicate pushdown behavior for both CSV and Parquet formats.

In [None]:
# Solution: Predicate Pushdown Exercise
print("🎯 Exercise 3.1: Predicate Pushdown Analysis\n")

# Complex filter conditions
complex_filter = (
    (col("sensor_id").startswith("TEMP")) &
    (col("value") >= 5) &
    (col("value") <= 10) &
    (col("battery_level") > 75) &
    (col("location").contains("Building_B"))
)

# Test with Parquet
print("Parquet with complex filter:")
parquet_complex = parquet_df.filter(complex_filter)
parquet_complex.explain()

print("\n" + "="*80)

# Test with CSV
print("CSV with same complex filter:")
csv_complex = csv_df.filter(complex_filter)
csv_complex.explain()

print("\n" + "="*80)

# Count results
parquet_result = parquet_complex.count()
csv_result = csv_complex.count()

print(f"\nParquet result: {parquet_result:,} records")
print(f"CSV result: {csv_result:,} records")

assert parquet_result == csv_result, "Both should return same results"

print("\n✓ Exercise 3.1 completed!")
print("Key insight: check PushedFilters to see which range and string conditions Parquet pushed to the scan; CSV still reads the whole file.")

## Part 4: Preventing Predicate Pushdown (Anti-Pattern)

Caching data **before** filtering prevents predicate pushdown optimization. This is a common anti-pattern to avoid.

### Example 4.1: The Anti-Pattern

Let's see what happens when we cache before filtering.

In [None]:
# Anti-pattern: Cache BEFORE filtering
print("❌ Anti-Pattern: Caching before filtering\n")

# Load and cache immediately (before any filters)
cached_parquet = spark.read.parquet("../Datasets/iot_sensor_readings.parquet").cache()

# Force caching with an action
cached_count = cached_parquet.count()
print(f"Cached {cached_count:,} records (entire dataset)\n")

# Now filter the cached data
filtered_from_cache = cached_parquet.filter(col("status") == "NORMAL")

print("Explain plan for filter on cached data:")
print("="*80)
filtered_from_cache.explain(True)

print("\n" + "="*80)
print("🔍 Look for 'InMemoryTableScan' followed by 'Filter'!")
print("This means Spark had to:")
print("  1. Read the ENTIRE Parquet file")
print("  2. Cache ALL the data in memory")
print("  3. Scan the cached data and apply the filter")
print("\nPredicate pushdown is lost!")

### Example 4.2: The Correct Pattern

If you must cache, filter FIRST to cache only what you need.

In [None]:
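# Correct pattern: Filter BEFORE caching
print("✓ Correct Pattern: Filtering before caching\n")

# Release the full-dataset cache from Example 4.1 first; otherwise Spark may
# answer the read below from that cache instead of scanning Parquet with pushdown
cached_parquet.unpersist()

# Load, filter, then cache
parquet_filtered_first = (
    spark.read.parquet("../Datasets/iot_sensor_readings.parquet")
    .filter(col("status") == "NORMAL")  # Filter first!
    .cache()  # Then cache the smaller result
)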

filtered_count = parquet_filtered_first.count()
print(f"Cached only {filtered_count:,} filtered records (smaller dataset)\n")

print("Benefits:")
print("  1. Predicate pushdown worked during initial read")
print("  2. Less memory used (only caching filtered data)")
print("  3. Faster caching (less data to materialize)")
print(f"\nRow reduction: {((cached_count - filtered_count) / cached_count * 100):.1f}% fewer rows cached")

### Example 4.3: Performance Impact

Let's measure the performance difference between caching before vs after filtering.

In [None]:
# Performance comparison: Cache before vs after filtering
print("⚡ Performance Impact: Cache Before vs After Filtering\n")

# Clear any existing cache
spark.catalog.clearCache()

# Scenario 1: Cache before filtering (anti-pattern)
start = time.time()
df_cache_first = spark.read.parquet("../Datasets/iot_sensor_readings.parquet").cache()
df_cache_first.count()  # Materialize cache
result1 = df_cache_first.filter(
    (col("status") == "NORMAL") & (col("value") > 50)
).count()
time_cache_before = time.time() - start

print(f"Cache before filtering: {result1:,} results in {time_cache_before:.4f}s")

# Clear cache
df_cache_first.unpersist()
spark.catalog.clearCache()

# Scenario 2: Filter before caching (correct pattern)
start = time.time()
df_filter_first = (
    spark.read.parquet("../Datasets/iot_sensor_readings.parquet")
    .filter((col("status") == "NORMAL") & (col("value") > 50))
    .cache()
)
result2 = df_filter_first.count()  # Materialize cache
time_filter_before = time.time() - start

print(f"Filter before caching: {result2:,} results in {time_filter_before:.4f}s")

# Cleanup
df_filter_first.unpersist()
cached_parquet.unpersist()
parquet_filtered_first.unpersist()
spark.catalog.clearCache()

# Analysis
print("\n" + "="*80)
if time_filter_before < time_cache_before:
    improvement = ((time_cache_before - time_filter_before) / time_cache_before * 100)
    print(f"✓ Filtering first is {improvement:.1f}% faster!")
    print(f"Speedup: {time_cache_before/time_filter_before:.2f}x")
else:
    print("Note: Performance varies by data size and filter selectivity")

assert result1 == result2, "Both approaches should return same results"
print("\n✓ Performance comparison completed!")

### Exercise 4.1: Identify Anti-Patterns

**Task**: Review the explain plans for cached vs uncached filtered queries and identify the performance implications.
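When reviewing several plans side by side, it can help to reduce each one to a few yes/no facts. The helper below is a hypothetical sketch that works on captured `explain()` text. It looks only for the operator names mentioned in this lab (`InMemoryTableScan`, `FileScan`, `PushedFilters`), which may be spelled differently in other Spark versions or explain modes.

```python
import re


def summarize_plan(plan_text):
    """Report which scan-related markers appear in an explain() string."""
    return {
        # The query is served from a cached DataFrame.
        "reads_from_cache": "InMemoryTableScan" in plan_text,
        # A file-based scan appears somewhere in the plan.
        "scans_files": "FileScan" in plan_text,
        # At least one scan lists a non-empty PushedFilters entry.
        "lists_pushed_filters": re.search(r"PushedFilters: \[[^\]]+\]", plan_text) is not None,
    }
```

Note that a cached plan can report `scans_files` as true as well, because the plan that originally populated the cache is printed beneath the in-memory scan. For spotting the anti-pattern, the informative combination is `reads_from_cache` being true while `lists_pushed_filters` is false.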

In [None]:
# Solution: Anti-Pattern Identification Exercise
print("🎯 Exercise 4.1: Identify Anti-Patterns\n")

# Setup test scenarios
spark.catalog.clearCache()

# Scenario A: No caching (baseline with predicate pushdown)
scenario_a = (
    spark.read.parquet("../Datasets/iot_sensor_readings.parquet")
    .filter((col("sensor_id").startswith("HUMID")) & (col("value") > 40))
)

print("Scenario A: No caching (predicate pushdown enabled)")
scenario_a.explain()
print("\n" + "="*80 + "\n")

# Scenario B: Cache full dataset then filter (anti-pattern)
base_cached = spark.read.parquet("../Datasets/iot_sensor_readings.parquet").cache()
base_cached.count()  # Materialize
scenario_b = base_cached.filter(
    (col("sensor_id").startswith("HUMID")) & (col("value") > 40)
)

print("Scenario B: Cache before filter (anti-pattern)")
scenario_b.explain()
print("\n" + "="*80 + "\n")

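# Scenario C: Filter then cache (correct if multiple reuses needed)
# Release Scenario B's full-dataset cache first; otherwise Spark may build
# Scenario C on top of it instead of reading Parquet with pushdown
base_cached.unpersist()
scenario_c = (
    spark.read.parquet("../Datasets/iot_sensor_readings.parquet")
    .filter((col("sensor_id").startswith("HUMID")) & (col("value") > 40))
    .cache()
)
scenario_c.count()  # Materialize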

print("Scenario C: Filter before cache (correct pattern)")
scenario_c.explain()
print("\n" + "="*80 + "\n")

# Verify results
count_a = scenario_a.count()
count_b = scenario_b.count()
count_c = scenario_c.count()

print(f"Results:")
print(f"  Scenario A (no cache): {count_a:,} records")
print(f"  Scenario B (cache before): {count_b:,} records")
print(f"  Scenario C (filter before cache): {count_c:,} records")

# Cleanup
base_cached.unpersist()
scenario_c.unpersist()
spark.catalog.clearCache()

assert count_a == count_b == count_c, "All scenarios should return same results"

print("\n✓ Exercise 4.1 completed!")
print("\nKey Takeaways:")
print("  ✓ Scenario A: Best for single-use queries (predicate pushdown)")
print("  ❌ Scenario B: Anti-pattern (loses predicate pushdown, wastes memory)")
print("  ✓ Scenario C: Best for multiple reuses (predicate pushdown + caching only what's needed)")

## Summary: Query Optimization Best Practices

### Key Concepts Mastered:

1. **Catalyst Optimizer**
   - Automatically consolidates multiple filters
   - Eliminates duplicate conditions
   - Write for readability - Catalyst handles efficiency

2. **Caching Strategy**
   - ✅ Cache for iterative operations (EDA, ML training)
   - ❌ Don't cache for single-use queries
   - ✅ Always `unpersist()` when done
   - ✅ Filter BEFORE caching to reduce memory usage

3. **Predicate Pushdown**
   - Parquet enables pushdown via column statistics
   - Look for `PushedFilters` in explain plans
   - Speedup depends on data layout and filter selectivity; 2-5x is illustrative, so measure on your own data
   - CSV requires full file scan, then filtering

4. **Anti-Patterns to Avoid**
   - ❌ Caching before filtering (loses predicate pushdown)
   - ❌ Using CSV for large analytical queries (use Parquet)
   - ❌ Caching rarely-used data (wastes resources)

### Performance Impact Summary:

| **Optimization** | **Illustrative Impact (measure on your data)** | **When to Use** |
|-----------------|-----------|----------------|
| Parquet vs CSV | 2-5x faster | Analytical workloads |
| Predicate Pushdown | 3-10x faster | Filters on columns whose min/max statistics separate row groups well |
| Caching (correct) | 2-3x faster | Multiple operations on same data |
| Filter before Cache | 50-80% memory savings | When caching is necessary |

### Best Practices:
1. ✅ Use Parquet for analytical workloads
2. ✅ Apply filters as early as possible
3. ✅ Let Catalyst optimize - write readable code
4. ✅ Cache only when reusing data multiple times
5. ✅ Filter before caching
6. ✅ Always `unpersist()` cached DataFrames
7. ✅ Use `explain()` to verify optimizations
8. ✅ Monitor Spark UI for query performance

---

**Next Steps**: Apply these optimization techniques to your data pipelines. Use `explain()` regularly to verify that Spark is optimizing your queries as expected!

In [None]:
# Final cleanup
spark.catalog.clearCache()
spark.stop()

print("🎉 Lab: Query Optimization completed!")
print("\n✓ Learned: Catalyst optimization, caching strategies, predicate pushdown, and anti-patterns")
print("\n➡️  Next: Apply these techniques to optimize your Spark applications!")