# PySpark Data Processing Pipeline
## NYC Taxi Trip Data Analysis

**Dataset**: NYC Yellow Taxi Trip Data (2023)

**Size**: 1GB+ Parquet files

**Source**: NYC TLC via AWS Open Data Registry

This notebook demonstrates:
1. Data Processing Pipeline with optimizations
2. Performance Analysis using .explain() and Spark UI
3. Actions vs Transformations demonstration
4. Machine Learning with MLlib

## Setup: Initialize Spark Session

In [None]:
from pyspark.sql import SparkSession
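# NOTE: this wildcard import replaces Python built-ins such as sum, max, min and round
# with their Spark column equivalents; the cells below rely on the Spark versions.
from pyspark.sql.functions import *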
from pyspark.sql.types import *
from pyspark.sql.window import Window
import time

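# Initialize Spark Session. The SQL settings below restate defaults of recent Spark
# releases (AQE has been on by default since Spark 3.2); only the memory sizes are raised.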
spark = SparkSession.builder \
    .appName("NYC_Taxi_Analysis") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .config("spark.sql.shuffle.partitions", "200") \
    .config("spark.driver.memory", "8g") \
    .config("spark.executor.memory", "8g") \
    .config("spark.sql.parquet.filterPushdown", "true") \
    .config("spark.sql.parquet.mergeSchema", "false") \
    .getOrCreate()

print(f"Spark Version: {spark.version}")
print(f"Spark UI: {spark.sparkContext.uiWebUrl}")

## 1. Data Processing Pipeline (55%)

### Load NYC Taxi Trip Data

The `2023-*` glob below matches every 2023 monthly file of the NYC Yellow Taxi Trip data; the analysis is then restricted to January-March by Filter 2.
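
Because a `2023-*` glob matches all twelve monthly files, one option is to list the Q1 files explicitly. A minimal standalone sketch, assuming the `yellow_tripdata_YYYY-MM.parquet` naming seen in the path below; `monthly_paths` and the `data/` directory are hypothetical, not part of PySpark or the dataset:

```python
def monthly_paths(prefix: str, year: int, months: range) -> list[str]:
    """Build one path per month, e.g. <prefix>yellow_tripdata_2023-01.parquet."""
    return [f"{prefix}yellow_tripdata_{year}-{m:02d}.parquet" for m in months]

# Hypothetical local directory; substitute the real location of the files.
q1_paths = monthly_paths("data/", 2023, range(1, 4))
print(q1_paths)
```

`spark.read.parquet(*q1_paths)` accepts several paths, so the list can be passed straight to the reader.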

In [None]:
# Load data from AWS S3 (public dataset)
# Alternative: Download locally from https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page

# For local execution, download files and update paths:
# base_path = "yellow_tripdata_2023-*.parquet"

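# For AWS S3: the s3:// scheme is resolved on Amazon EMR; open-source Spark typically
# needs the hadoop-aws package and an s3a:// path instead. Credentials may also be required.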
base_path = "s3://nyc-tlc/trip data/yellow_tripdata_2023-*.parquet"

# Read Parquet files
print("Loading NYC Taxi Trip Data...")
start_time = time.time()

df_raw = spark.read \
    .format("parquet") \
    .load(base_path)

load_time = time.time() - start_time
print(f"Reader set up in {load_time:.2f} seconds (lazy evaluation: only Parquet metadata has been read so far)")

# Display schema
print("\nDataset Schema:")
df_raw.printSchema()

In [None]:
# Get basic statistics (this triggers computation)
print("\nBasic Statistics:")
print(f"Total Records: {df_raw.count():,}")
print(f"Number of Partitions: {df_raw.rdd.getNumPartitions()}")

# Show sample data
print("\nSample Data:")
df_raw.show(5, truncate=False)

### Transformation 1: Filter Operations (Early filtering for optimization)

**Optimization Strategy**: Apply filters early in the pipeline to reduce data volume
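
The filter below keeps a row only when every comparison is true. In Spark SQL a comparison against a null value evaluates to null, so rows with missing values are dropped as well. A standalone plain-Python sketch of the same rule on hand-made rows (the sample values are invented for illustration, and `is_valid_trip` is a hypothetical helper):

```python
def is_valid_trip(row: dict) -> bool:
    """Mirror the validity bounds; a missing (None) value fails the check."""
    fare = row.get("fare_amount")
    dist = row.get("trip_distance")
    pax = row.get("passenger_count")
    if fare is None or dist is None or pax is None:
        return False  # Spark: null comparison -> null -> row filtered out
    return 0 < fare < 500 and 0 < dist < 100 and 0 < pax <= 6

sample_rows = [
    {"fare_amount": 12.5, "trip_distance": 2.1, "passenger_count": 1},    # kept
    {"fare_amount": -3.0, "trip_distance": 1.0, "passenger_count": 1},    # negative fare
    {"fare_amount": 40.0, "trip_distance": 150.0, "passenger_count": 2},  # distance too large
    {"fare_amount": 9.0, "trip_distance": 1.2, "passenger_count": None},  # missing count
]
kept = [r for r in sample_rows if is_valid_trip(r)]
print(len(kept), "of", len(sample_rows), "rows kept")
```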

In [None]:
# Filter 1: Remove invalid trips (fare amount, trip distance, passenger count)
# This is applied EARLY to reduce data volume throughout the pipeline
df_filtered = df_raw.filter(
    (col("fare_amount") > 0) & 
    (col("fare_amount") < 500) &  # Remove outliers
    (col("trip_distance") > 0) & 
    (col("trip_distance") < 100) &  # Remove unrealistic distances
    (col("passenger_count") > 0) & 
    (col("passenger_count") <= 6)
)

# Filter 2: Focus on specific time period and payment types
df_filtered = df_filtered.filter(
    (year(col("tpep_pickup_datetime")) == 2023) &  # Guard against records dated outside 2023
    (month(col("tpep_pickup_datetime")).between(1, 3)) &  # Q1 (January-March)
    (col("payment_type").isin([1, 2]))  # Credit card (1) or Cash (2)
)

print("Filters applied (lazy evaluation):")
print("- Valid fare amounts (0-500)")
print("- Valid trip distances (0-100 miles)")
print("- Valid passenger counts (1-6)")
print("- Q1 2023 trips only")
print("- Credit card or cash payments only")

### Transformation 2: Column Transformations with withColumn

Create new features for analysis
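
Two of the derived columns depend on conventions that are easy to get wrong: Spark's `dayofweek` numbers days from 1 (Sunday) to 7 (Saturday), and `between` is inclusive at both ends. A standalone plain-Python sketch of the same logic as the cell below; `spark_dayofweek` and `time_of_day` are illustrative helpers, not PySpark functions:

```python
from datetime import datetime

def spark_dayofweek(ts: datetime) -> int:
    """Return 1 (Sunday) .. 7 (Saturday), matching Spark's dayofweek."""
    return ts.isoweekday() % 7 + 1  # isoweekday: Monday=1 .. Sunday=7

def time_of_day(hour: int) -> str:
    """Bucket an hour (0-23) with inclusive bounds, like Column.between."""
    if 6 <= hour <= 11:
        return "Morning"
    if 12 <= hour <= 17:
        return "Afternoon"
    if 18 <= hour <= 21:
        return "Evening"
    return "Night"

pickup = datetime(2023, 1, 1, 22, 15)  # a Sunday, 22:15
dow = spark_dayofweek(pickup)
is_weekend = 1 if dow in (1, 7) else 0
print(dow, is_weekend, time_of_day(pickup.hour))
```

Note that hours 22-23 and 0-5 all fall through to "Night", exactly as the final `.otherwise("Night")` does.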

In [None]:
# Add calculated columns
df_transformed = df_filtered \
    .withColumn("trip_duration_minutes", 
                (unix_timestamp("tpep_dropoff_datetime") - 
                 unix_timestamp("tpep_pickup_datetime")) / 60) \
    .withColumn("speed_mph", 
                when(col("trip_duration_minutes") > 0, 
                     col("trip_distance") / (col("trip_duration_minutes") / 60))
                .otherwise(0)) \
    .withColumn("fare_per_mile", 
                when(col("trip_distance") > 0, 
                     col("fare_amount") / col("trip_distance"))
                .otherwise(0)) \
    .withColumn("tip_percentage", 
                when(col("fare_amount") > 0, 
                     (col("tip_amount") / col("fare_amount")) * 100)
                .otherwise(0)) \
    .withColumn("pickup_hour", hour("tpep_pickup_datetime")) \
    .withColumn("pickup_day_of_week", dayofweek("tpep_pickup_datetime")) \
    .withColumn("is_weekend", 
                when(col("pickup_day_of_week").isin([1, 7]), 1).otherwise(0)) \
    .withColumn("time_of_day",
                when(col("pickup_hour").between(6, 11), "Morning")
                .when(col("pickup_hour").between(12, 17), "Afternoon")
                .when(col("pickup_hour").between(18, 21), "Evening")
                .otherwise("Night"))

# Filter out invalid calculated values
df_transformed = df_transformed.filter(
    (col("trip_duration_minutes") > 0) & 
    (col("trip_duration_minutes") < 120) &  # Less than 2 hours
    (col("speed_mph") < 80)  # Realistic speeds
)

print("New columns created:")
print("- trip_duration_minutes")
print("- speed_mph")
print("- fare_per_mile")
print("- tip_percentage")
print("- pickup_hour, pickup_day_of_week, is_weekend")
print("- time_of_day (Morning/Afternoon/Evening/Night)")

### Transformation 3: GroupBy with Aggregations

In [None]:
# Aggregation 1: Statistics by pickup location and time of day
location_stats = df_transformed.groupBy("PULocationID", "time_of_day") \
    .agg(
        count("*").alias("total_trips"),
        avg("fare_amount").alias("avg_fare"),
        avg("trip_distance").alias("avg_distance"),
        avg("trip_duration_minutes").alias("avg_duration"),
        avg("tip_percentage").alias("avg_tip_pct"),
        sum("total_amount").alias("total_revenue")
    ) \
    .orderBy(desc("total_trips"))

print("\nAggregation 1: Location and Time of Day Statistics")
location_stats.show(10)

# Aggregation 2: Daily statistics
daily_stats = df_transformed.groupBy(
    date_format("tpep_pickup_datetime", "yyyy-MM-dd").alias("date")
) \
    .agg(
        count("*").alias("total_trips"),
        avg("fare_amount").alias("avg_fare"),
        sum("total_amount").alias("daily_revenue"),
        avg("speed_mph").alias("avg_speed")
    ) \
    .orderBy("date")

print("\nAggregation 2: Daily Statistics")
daily_stats.show(10)

### Transformation 4: Join Operation

Aggregate trips per pickup zone and per drop-off zone, then join the two summaries on the zone ID to compare pickups with drop-offs
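
A broadcast hash join ships the smaller table to every executor as a hash map, so the other side can be probed row by row without being shuffled. A minimal standalone sketch of that idea in plain Python with invented counts; it illustrates the mechanism, not Spark's actual implementation:

```python
# Pickup counts per zone ID (made-up numbers).
pickup_summary = [(132, 500), (161, 420), (236, 390)]
# The "broadcast" side, held as a hash map keyed by zone ID.
dropoff_summary = {132: 300, 161: 450, 7: 80}

location_comparison = []
for zone_id, pickup_count in pickup_summary:
    dropoff_count = dropoff_summary.get(zone_id)
    if dropoff_count is None:
        continue  # inner join: zones missing on either side are dropped
    location_comparison.append({
        "LocationID": zone_id,
        "pickup_count": pickup_count,
        "dropoff_count": dropoff_count,
        "net_flow": pickup_count - dropoff_count,
    })

for row in location_comparison:
    print(row)
```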

In [None]:
# Build per-zone summaries for pickups and drop-offs
# (zone names could additionally be joined in from taxi_zone_lookup.csv)
pickup_location_summary = df_transformed.groupBy("PULocationID") \
    .agg(
        count("*").alias("pickup_count"),
        avg("fare_amount").alias("avg_pickup_fare")
    )

dropoff_location_summary = df_transformed.groupBy("DOLocationID") \
    .agg(
        count("*").alias("dropoff_count"),
        avg("fare_amount").alias("avg_dropoff_fare")
    )

# Join pickup and dropoff location statistics
# Using broadcast join for small table optimization
from pyspark.sql.functions import broadcast

location_comparison = pickup_location_summary.join(
    broadcast(dropoff_location_summary),
    pickup_location_summary.PULocationID == dropoff_location_summary.DOLocationID,
    "inner"
) \
    .select(
        pickup_location_summary.PULocationID.alias("LocationID"),
        "pickup_count",
        "dropoff_count",
        "avg_pickup_fare",
        "avg_dropoff_fare"
    ) \
    .withColumn("net_flow", col("pickup_count") - col("dropoff_count")) \
    .orderBy(desc("pickup_count"))

print("\nJoin Result: Pickup vs Dropoff Location Analysis")
location_comparison.show(15)

print("\nJoin Type: Broadcast join used for optimization")
print("Drop-off summary broadcast to all executors, so the join itself needs no shuffle")

### SQL Queries (2+ queries required)
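
The conditional counts in Query 1 use the `SUM(CASE WHEN ...)` pattern. The sketch below runs that pattern on a five-row in-memory table with Python's built-in `sqlite3` module, purely so it can be tried without a Spark session. The rows are invented, and SQLite's dialect differs from Spark SQL in other respects:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE taxi_trips (pickup_hour INTEGER, payment_type INTEGER, fare_amount REAL)"
)
conn.executemany(
    "INSERT INTO taxi_trips VALUES (?, ?, ?)",
    [(8, 1, 10.0), (8, 2, 20.0), (8, 1, 30.0), (17, 1, 12.0), (17, 2, 8.0)],
)

query = """
SELECT pickup_hour,
       COUNT(*) AS total_trips,
       AVG(fare_amount) AS avg_fare,
       SUM(CASE WHEN payment_type = 1 THEN 1 ELSE 0 END) AS credit_card_trips,
       SUM(CASE WHEN payment_type = 2 THEN 1 ELSE 0 END) AS cash_trips
FROM taxi_trips
GROUP BY pickup_hour
ORDER BY total_trips DESC
"""
result = conn.execute(query).fetchall()
for row in result:
    print(row)
conn.close()
```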

In [None]:
# Register DataFrame as a temporary view for SQL queries
df_transformed.createOrReplaceTempView("taxi_trips")

print("Registered 'taxi_trips' temporary view for SQL queries")

In [None]:
# SQL Query 1: Top 10 busiest hours with payment type breakdown
query1 = """
SELECT 
    pickup_hour,
    COUNT(*) as total_trips,
    AVG(fare_amount) as avg_fare,
    AVG(trip_distance) as avg_distance,
    SUM(CASE WHEN payment_type = 1 THEN 1 ELSE 0 END) as credit_card_trips,
    SUM(CASE WHEN payment_type = 2 THEN 1 ELSE 0 END) as cash_trips,
    AVG(tip_percentage) as avg_tip_pct
FROM taxi_trips
GROUP BY pickup_hour
ORDER BY total_trips DESC
LIMIT 10
"""

result1 = spark.sql(query1)
print("\nSQL Query 1: Top 10 Busiest Hours")
result1.show()

In [None]:
# SQL Query 2: Weekend vs Weekday comparison
query2 = """
SELECT 
    CASE WHEN is_weekend = 1 THEN 'Weekend' ELSE 'Weekday' END as day_type,
    COUNT(*) as total_trips,
    ROUND(AVG(fare_amount), 2) as avg_fare,
    ROUND(AVG(trip_distance), 2) as avg_distance,
    ROUND(AVG(trip_duration_minutes), 2) as avg_duration,
    ROUND(AVG(speed_mph), 2) as avg_speed,
    ROUND(AVG(tip_percentage), 2) as avg_tip_pct,
    ROUND(SUM(total_amount), 2) as total_revenue
FROM taxi_trips
GROUP BY is_weekend
ORDER BY day_type
"""

result2 = spark.sql(query2)
print("\nSQL Query 2: Weekend vs Weekday Analysis")
result2.show()

In [None]:
# SQL Query 3: Route analysis - most profitable routes
query3 = """
SELECT 
    PULocationID as pickup_location,
    DOLocationID as dropoff_location,
    COUNT(*) as trip_count,
    ROUND(AVG(fare_amount), 2) as avg_fare,
    ROUND(AVG(trip_distance), 2) as avg_distance,
    ROUND(SUM(total_amount), 2) as total_revenue,
    ROUND(AVG(fare_per_mile), 2) as avg_fare_per_mile
FROM taxi_trips
WHERE PULocationID != DOLocationID
GROUP BY PULocationID, DOLocationID
HAVING trip_count > 100
ORDER BY total_revenue DESC
LIMIT 20
"""

result3 = spark.sql(query3)
print("\nSQL Query 3: Most Profitable Routes (>100 trips)")
result3.show()

### Write Results to Destination

Write processed data to Parquet format with partitioning for optimization
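
`partitionBy` writes one directory per combination of partition values that occurs in the data, using `column=value` path segments. A standalone sketch of the layout for the two columns used here, and of how a filter on a partition column narrows the set of directories to read (pure Python, no files are touched):

```python
from itertools import product

is_weekend_values = [0, 1]
time_of_day_values = ["Morning", "Afternoon", "Evening", "Night"]

# At most one directory per (is_weekend, time_of_day) combination.
partition_dirs = [
    f"is_weekend={w}/time_of_day={t}"
    for w, t in product(is_weekend_values, time_of_day_values)
]

# Partition pruning: a query with is_weekend = 1 only needs these directories.
weekend_dirs = [d for d in partition_dirs if d.startswith("is_weekend=1/")]

print(len(partition_dirs), "directories in total")
print(weekend_dirs)
```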

In [None]:
# Write main transformed dataset (partitioned by is_weekend and time_of_day for efficient queries)
output_path = "./output/processed_taxi_data"

print(f"\nWriting processed data to: {output_path}")
df_transformed.write \
    .mode("overwrite") \
    .partitionBy("is_weekend", "time_of_day") \
    .parquet(output_path)

print("✓ Processed data written successfully")

# Write aggregated results
location_stats.write \
    .mode("overwrite") \
    .parquet("./output/location_stats")

daily_stats.write \
    .mode("overwrite") \
    .parquet("./output/daily_stats")

print("✓ Aggregated results written successfully")

## 2. Performance Analysis (20%)

### Query Execution Plan Analysis

In [None]:
# Analyze the execution plan for our main transformation pipeline
print("="*80)
print("PHYSICAL EXECUTION PLAN ANALYSIS")
print("="*80)

# mode="extended" prints the parsed, analyzed and optimized logical plans plus the physical plan
df_transformed.explain(mode="extended")

In [None]:
# Analyze query plan for aggregation
print("\n" + "="*80)
print("AGGREGATION QUERY PLAN")
print("="*80)

location_stats.explain(mode="formatted")

In [None]:
# Analyze join operation
print("\n" + "="*80)
print("JOIN OPERATION PLAN (with Broadcast)")
print("="*80)

location_comparison.explain(mode="extended")

### Performance Analysis Summary

**Key Optimizations Implemented:**

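1. **Early Filtering (Predicate Pushdown)**
   - Applied filters immediately after loading data
   - Spark pushes simple comparisons (e.g. `fare_amount > 0`) down to the Parquet file reader; predicates that wrap a column in a function, such as `month(...)`, are evaluated after the scan
   - Reduces data read from disk by skipping row groups whose min/max statistics cannot match
   - Observed in execution plan: `PushedFilters` section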

2. **Columnar Storage (Parquet)**
   - Used Parquet format which stores data by column
   - Only reads required columns (column pruning)
   - Compression reduces I/O
   - Observed in plan: `ReadSchema` shows only needed columns

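3. **Broadcast Join**
   - Broadcast the drop-off location summary when joining it with the pickup location summary (both are small: one row per zone)
   - Avoids a shuffle for the join itself (the upstream `groupBy` aggregations still shuffle)
   - Copies the broadcast table to all executors
   - Observed in plan: `BroadcastHashJoin` instead of `SortMergeJoin`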

4. **Partitioning Strategy**
   - Output data partitioned by is_weekend and time_of_day (up to 2 x 4 = 8 directories)
   - Future queries that filter on these columns read only the matching directories
   - Partition pruning eliminates unnecessary file reads

5. **Adaptive Query Execution (AQE)**
   - Enabled AQE to re-optimize the plan at runtime using shuffle statistics
   - Coalesces small shuffle partitions
   - Converts sort-merge joins to broadcast joins when one side turns out to be small enough
   - Splits skewed partitions in sort-merge joins
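
Predicate pushdown works because Parquet stores minimum and maximum values per column for each row group, which lets a reader discard a whole group when its range cannot satisfy the filter. A simplified standalone sketch in plain Python with invented statistics (real readers handle many more cases; `may_contain` is a hypothetical helper):

```python
# (row_group_id, min fare_amount, max fare_amount) with invented statistics
row_groups = [(0, -5.0, 0.0), (1, 2.5, 80.0), (2, 510.0, 900.0), (3, 0.0, 499.0)]

def may_contain(min_v: float, max_v: float, low: float, high: float) -> bool:
    """True if some value v in [min_v, max_v] could satisfy low < v < high."""
    return max_v > low and min_v < high

# Predicate from the pipeline: 0 < fare_amount < 500
to_read = [gid for gid, mn, mx in row_groups if may_contain(mn, mx, 0, 500)]
print("row groups to read:", to_read)
```

Row groups 0 and 2 are skipped without being decoded, since none of their values can fall inside the requested range.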

**Performance Bottlenecks Identified:**

1. **Initial Data Load**: Reading from S3/disk is I/O bound
   - Mitigation: Parquet format with compression
   - Mitigation: Column pruning reduces data read

2. **Shuffle Operations**: GroupBy operations require shuffling
   - Mitigation: Shuffle partitions kept at 200 (the Spark default), with AQE coalescing small partitions at runtime
   - Mitigation: Pre-filter data to reduce shuffle volume

3. **Recomputation Across Actions**: each aggregation, SQL query, and write re-runs the scan, filters, and column derivations behind `df_transformed`
   - Mitigation: Persist intermediate results if reused
   - Mitigation: Use caching for repeated actions
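
A `groupBy` shuffles rows so that all rows sharing a key end up in the same partition, where they can be aggregated locally. A toy standalone sketch of that routing in plain Python; `NUM_PARTITIONS` and the sample rows are invented, and a simple modulo stands in for Spark's own hash function:

```python
from collections import defaultdict

NUM_PARTITIONS = 4  # stands in for spark.sql.shuffle.partitions
# (PULocationID, fare_amount) with invented values
trips = [(132, 52.0), (161, 14.5), (132, 70.0), (236, 9.0), (161, 11.0)]

# Route each row by key, so equal keys always land in the same partition.
partitions = defaultdict(list)
for zone_id, fare in trips:
    partitions[zone_id % NUM_PARTITIONS].append((zone_id, fare))

# Aggregate within each partition; no key is split across partitions.
trip_counts = {}
for rows in partitions.values():
    for zone_id, _ in rows:
        trip_counts[zone_id] = trip_counts.get(zone_id, 0) + 1

print(dict(partitions))
print(trip_counts)
```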

### Caching Optimization (Bonus)

Demonstrate how caching improves performance for repeated actions
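
Without caching, each action re-runs the full lineage of transformations; with caching, the first action materializes the result and later actions reuse it. A toy standalone model of that behaviour in plain Python, where `ToyFrame` and `run_three_actions` are hypothetical stand-ins and not Spark APIs:

```python
class ToyFrame:
    """Toy stand-in for a DataFrame that counts lineage recomputations."""

    def __init__(self, source):
        self.source = source
        self.use_cache = False
        self.cached_rows = None
        self.computations = 0

    def cache(self):
        self.use_cache = True  # lazy, like DataFrame.cache(): nothing runs yet
        return self

    def _materialize(self):
        if self.cached_rows is not None:
            return self.cached_rows
        self.computations += 1  # the "expensive" lineage runs here
        rows = [x * 2 for x in self.source if x > 0]
        if self.use_cache:
            self.cached_rows = rows
        return rows

    def count(self):
        return len(self._materialize())

    def first(self):
        return self._materialize()[0]


def run_three_actions(frame):
    """Trigger three actions and report how often the lineage was computed."""
    frame.count()
    frame.first()
    frame.count()
    return frame.computations


uncached_runs = run_three_actions(ToyFrame([1, -2, 3]))
cached_runs = run_three_actions(ToyFrame([1, -2, 3]).cache())
print(uncached_runs, cached_runs)
```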

In [None]:
# Test 1: Without caching
print("Test 1: Without Caching")
print("-" * 50)

# Create a subset for testing
test_df = df_transformed.sample(fraction=0.1, seed=42)

# First action (count)
start = time.time()
count1 = test_df.count()
time1 = time.time() - start
print(f"First count: {count1:,} records in {time1:.2f} seconds")

# Second action (different aggregation)
start = time.time()
avg_fare = test_df.agg(avg("fare_amount")).collect()[0][0]
time2 = time.time() - start
print(f"Average fare: ${avg_fare:.2f} in {time2:.2f} seconds")

# Third action
start = time.time()
max_distance = test_df.agg(max("trip_distance")).collect()[0][0]
time3 = time.time() - start
print(f"Max distance: {max_distance:.2f} miles in {time3:.2f} seconds")

total_time_uncached = time1 + time2 + time3
print(f"\nTotal time without caching: {total_time_uncached:.2f} seconds")

In [None]:
# Test 2: With caching
print("\n" + "="*50)
print("Test 2: With Caching")
print("-" * 50)

# Cache the DataFrame
test_df_cached = test_df.cache()

# First action (count) - will cache the data
start = time.time()
count1 = test_df_cached.count()
time1 = time.time() - start
print(f"First count (caching): {count1:,} records in {time1:.2f} seconds")

# Second action (will use cached data)
start = time.time()
avg_fare = test_df_cached.agg(avg("fare_amount")).collect()[0][0]
time2 = time.time() - start
print(f"Average fare (cached): ${avg_fare:.2f} in {time2:.2f} seconds")

# Third action (will use cached data)
start = time.time()
max_distance = test_df_cached.agg(max("trip_distance")).collect()[0][0]
time3 = time.time() - start
print(f"Max distance (cached): {max_distance:.2f} miles in {time3:.2f} seconds")

total_time_cached = time1 + time2 + time3
print(f"\nTotal time with caching: {total_time_cached:.2f} seconds")

# Calculate speedup (indicative only: Test 1 ran first, so its time may include warm-up cost)
speedup = total_time_uncached / total_time_cached
print(f"\n{'='*50}")
print(f"CACHING SPEEDUP: {speedup:.2f}x faster")
print(f"Time saved: {total_time_uncached - total_time_cached:.2f} seconds")
print(f"{'='*50}")

# Unpersist to free memory
test_df_cached.unpersist()

## 3. Actions vs Transformations (10%)

### Lazy Evaluation Demonstration
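
Python generators give a rough analogy for lazy evaluation: composing them builds a pipeline, and nothing runs until a consumer asks for results. A small standalone sketch (an analogy only; Spark additionally optimizes the whole plan before running it):

```python
log = []

def read(rows):
    for r in rows:
        log.append(f"read {r}")  # records when a row is actually processed
        yield r

def keep_large(rows, threshold):
    return (r for r in rows if r > threshold)

def double(rows):
    return (r * 2 for r in rows)

# "Transformations": this only wires the pipeline together.
lazy_pipeline = double(keep_large(read([100, 200, 150, 300, 250]), 150))
steps_before_action = len(log)

# "Action": consuming the pipeline finally pulls rows through it.
result = list(lazy_pipeline)
print(steps_before_action, len(log), result)
```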

In [None]:
print("="*80)
print("TRANSFORMATIONS vs ACTIONS DEMONSTRATION")
print("="*80)

# Create a simple DataFrame
data = [(1, "Alice", 100), (2, "Bob", 200), (3, "Charlie", 150), 
        (4, "David", 300), (5, "Eve", 250)]
schema = ["id", "name", "amount"]
demo_df = spark.createDataFrame(data, schema)

print("\nOriginal DataFrame:")
demo_df.show()

In [None]:
# TRANSFORMATIONS (Lazy - not executed immediately)
print("\n" + "-"*80)
print("TRANSFORMATIONS (Lazy Evaluation)")
print("-"*80)
print("\nApplying transformations...\n")

# Transformation 1: filter
print("1. Applying filter (amount > 150)...")
start = time.time()
filtered_df = demo_df.filter(col("amount") > 150)
elapsed = time.time() - start
print(f"   Completed in {elapsed*1000:.4f} ms (NO COMPUTATION - just builds plan)")

# Transformation 2: select
print("\n2. Applying select (name, amount)...")
start = time.time()
selected_df = filtered_df.select("name", "amount")
elapsed = time.time() - start
print(f"   Completed in {elapsed*1000:.4f} ms (NO COMPUTATION - just builds plan)")

# Transformation 3: withColumn
print("\n3. Applying withColumn (double_amount)...")
start = time.time()
transformed_df = selected_df.withColumn("double_amount", col("amount") * 2)
elapsed = time.time() - start
print(f"   Completed in {elapsed*1000:.4f} ms (NO COMPUTATION - just builds plan)")

print("\n✓ All transformations applied (but not executed!)")
print("  The DataFrame is NOT computed yet - Spark is just building an execution plan.")

In [None]:
# ACTIONS (Eager - trigger execution)
print("\n" + "="*80)
print("ACTIONS (Eager Evaluation - Triggers Computation)")
print("="*80)

# Action 1: count()
print("\n1. Executing ACTION: count()")
start = time.time()
count_result = transformed_df.count()
elapsed = time.time() - start
print(f"   Result: {count_result} rows")
print(f"   Time: {elapsed*1000:.2f} ms (ACTUAL COMPUTATION PERFORMED)")

# Action 2: show()
print("\n2. Executing ACTION: show()")
start = time.time()
transformed_df.show()
elapsed = time.time() - start
print(f"   Time: {elapsed*1000:.2f} ms (ACTUAL COMPUTATION PERFORMED)")

# Action 3: collect()
print("\n3. Executing ACTION: collect()")
start = time.time()
result_list = transformed_df.collect()
elapsed = time.time() - start
print(f"   Collected {len(result_list)} rows to driver")
print(f"   Time: {elapsed*1000:.2f} ms (ACTUAL COMPUTATION PERFORMED)")

# Action 4: first()
print("\n4. Executing ACTION: first()")
start = time.time()
first_row = transformed_df.first()
elapsed = time.time() - start
print(f"   First row: {first_row}")
print(f"   Time: {elapsed*1000:.2f} ms (ACTUAL COMPUTATION PERFORMED)")

In [None]:
# Summary comparison
print("\n" + "="*80)
print("SUMMARY: Key Differences")
print("="*80)

print("""
TRANSFORMATIONS (Lazy):
├── filter(), select(), withColumn(), groupBy(), join(), etc.
├── Return a new DataFrame (logical plan)
├── No computation happens immediately
├── Very fast (only extends the logical plan)
└── Multiple transformations are optimized together

ACTIONS (Eager):
├── count(), show(), collect(), first(), take(), write.parquet(), etc.
├── Trigger actual computation
├── Execute the entire transformation pipeline
├── Return results to driver or write to storage
└── This is when Spark optimizes and runs the job

WHY LAZY EVALUATION?
├── Spark can optimize the entire query plan
├── Avoid unnecessary computations
├── Predicate pushdown and column pruning
└── Better performance through optimization
""")

## 4. Machine Learning with MLlib (15%)

### Predicting Trip Duration using Random Forest Regressor
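
The models in this section are scored with RMSE, MAE and R² in the evaluation cell further down. For reference, a standalone plain-Python sketch of how those three metrics are computed from actual and predicted durations; the numbers are invented and `regression_metrics` is an illustrative helper, not an MLlib function:

```python
import math

def regression_metrics(actual, predicted):
    """Return (rmse, mae, r2) for two equal-length sequences."""
    n = len(actual)
    errors = [a - p for a, p in zip(actual, predicted)]
    ss_res = math.fsum(e * e for e in errors)           # residual sum of squares
    mae = math.fsum(math.fabs(e) for e in errors) / n   # mean absolute error
    mean_actual = math.fsum(actual) / n
    ss_tot = math.fsum((a - mean_actual) ** 2 for a in actual)
    r2 = 1 - ss_res / ss_tot
    return math.sqrt(ss_res / n), mae, r2

actual_minutes = [10.0, 20.0, 30.0, 40.0]
predicted_minutes = [12.0, 18.0, 33.0, 37.0]
rmse, mae, r2 = regression_metrics(actual_minutes, predicted_minutes)
print(f"RMSE={rmse:.2f}  MAE={mae:.2f}  R2={r2:.4f}")
```

RMSE penalizes large errors more heavily than MAE, which is why the two are reported side by side.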

In [None]:
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.regression import RandomForestRegressor, LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline

print("="*80)
print("MACHINE LEARNING: Trip Duration Prediction")
print("="*80)

In [None]:
# Prepare data for ML
# Select features and target variable
ml_df = df_transformed.select(
    "trip_distance",
    "fare_amount",
    "passenger_count",
    "PULocationID",
    "DOLocationID",
    "pickup_hour",
    "pickup_day_of_week",
    "is_weekend",
    "trip_duration_minutes"  # Target variable
).filter(
    (col("trip_duration_minutes") > 1) & 
    (col("trip_duration_minutes") < 60)
).sample(fraction=0.1, seed=42)  # Sample for faster training

print(f"\nML Dataset size: {ml_df.count():,} records")
print("\nFeatures used:")
for col_name in ml_df.columns[:-1]:
    print(f"  - {col_name}")
print(f"\nTarget variable: trip_duration_minutes")

In [None]:
# Split data into training and test sets
train_df, test_df = ml_df.randomSplit([0.8, 0.2], seed=42)

print(f"\nTraining set: {train_df.count():,} records")
print(f"Test set: {test_df.count():,} records")

In [None]:
# Create feature vector (location IDs are categorical zone codes, used here as plain numbers)
feature_columns = [
    "trip_distance", "fare_amount", "passenger_count",
    "PULocationID", "DOLocationID", "pickup_hour",
    "pickup_day_of_week", "is_weekend"
]

# Build ML Pipeline
assembler = VectorAssembler(
    inputCols=feature_columns,
    outputCol="features_raw"
)

scaler = StandardScaler(
    inputCol="features_raw",
    outputCol="features",
    withStd=True,
    withMean=True
)

# Random Forest Regressor
rf = RandomForestRegressor(
    featuresCol="features",
    labelCol="trip_duration_minutes",
    predictionCol="predicted_duration",
    numTrees=50,
    maxDepth=10,
    seed=42
)

# Create pipeline
pipeline = Pipeline(stages=[assembler, scaler, rf])

print("\nML Pipeline created:")
print("  1. VectorAssembler: Combine features into vector")
print("  2. StandardScaler: Standardize features (zero mean, unit variance)")
print("  3. RandomForestRegressor: Train model (50 trees, max depth 10)")

In [None]:
# Train the model
print("\nTraining Random Forest model...")
start = time.time()

model = pipeline.fit(train_df)

training_time = time.time() - start
print(f"✓ Model trained in {training_time:.2f} seconds")

In [None]:
# Make predictions on test set
print("\nMaking predictions on test set...")
predictions = model.transform(test_df)

# Show sample predictions
print("\nSample Predictions:")
predictions.select(
    "trip_distance",
    "fare_amount",
    "pickup_hour",
    "trip_duration_minutes",
    "predicted_duration"
).show(10)

In [None]:
# Evaluate the model
print("\n" + "="*80)
print("MODEL EVALUATION")
print("="*80)

# RMSE
rmse_evaluator = RegressionEvaluator(
    labelCol="trip_duration_minutes",
    predictionCol="predicted_duration",
    metricName="rmse"
)
rmse = rmse_evaluator.evaluate(predictions)

# R2
r2_evaluator = RegressionEvaluator(
    labelCol="trip_duration_minutes",
    predictionCol="predicted_duration",
    metricName="r2"
)
r2 = r2_evaluator.evaluate(predictions)

# MAE
mae_evaluator = RegressionEvaluator(
    labelCol="trip_duration_minutes",
    predictionCol="predicted_duration",
    metricName="mae"
)
mae = mae_evaluator.evaluate(predictions)

print(f"\nModel Performance:")
print(f"  Root Mean Squared Error (RMSE): {rmse:.2f} minutes")
print(f"  Mean Absolute Error (MAE): {mae:.2f} minutes")
print(f"  R-squared (R2): {r2:.4f}")

# Feature importance
rf_model = model.stages[-1]
feature_importance = rf_model.featureImportances

print("\nFeature Importance:")
for i, col_name in enumerate(feature_columns):
    print(f"  {col_name:25s}: {feature_importance[i]:.4f}")

In [None]:
# Additional model: Linear Regression for comparison
print("\n" + "="*80)
print("COMPARISON MODEL: Linear Regression")
print("="*80)

lr = LinearRegression(
    featuresCol="features",
    labelCol="trip_duration_minutes",
    predictionCol="predicted_duration"
)

lr_pipeline = Pipeline(stages=[assembler, scaler, lr])

print("\nTraining Linear Regression model...")
start = time.time()
lr_model = lr_pipeline.fit(train_df)
lr_training_time = time.time() - start
print(f"✓ Model trained in {lr_training_time:.2f} seconds")

# Make predictions
lr_predictions = lr_model.transform(test_df)

# Evaluate
lr_rmse = rmse_evaluator.evaluate(lr_predictions)
lr_r2 = r2_evaluator.evaluate(lr_predictions)
lr_mae = mae_evaluator.evaluate(lr_predictions)

print(f"\nLinear Regression Performance:")
print(f"  RMSE: {lr_rmse:.2f} minutes")
print(f"  MAE: {lr_mae:.2f} minutes")
print(f"  R2: {lr_r2:.4f}")

print("\n" + "="*80)
print("MODEL COMPARISON")
print("="*80)
print(f"\nRandom Forest vs Linear Regression:")
print(f"  Training Time: {training_time:.2f}s vs {lr_training_time:.2f}s")
print(f"  RMSE: {rmse:.2f} vs {lr_rmse:.2f} ({'RF wins' if rmse < lr_rmse else 'LR wins'})")
print(f"  R2: {r2:.4f} vs {lr_r2:.4f} ({'RF wins' if r2 > lr_r2 else 'LR wins'})")

## Summary and Conclusions

In [None]:
print("\n" + "="*80)
print("PROJECT SUMMARY")
print("="*80)

print("""
✓ Data Processing Pipeline (55%)
  - Loaded 1GB+ NYC Taxi Trip data in Parquet format
  - Applied multiple filter operations with early filtering optimization
  - Performed join operation using broadcast optimization
  - Used groupBy with complex aggregations
  - Created new columns using withColumn transformations
  - Executed 3 SQL queries on the data
  - Wrote results to partitioned Parquet files

✓ Performance Analysis (20%)
  - Analyzed physical execution plans using .explain()
  - Identified optimization strategies:
    • Predicate pushdown for early filtering
    • Column pruning with Parquet
    • Broadcast join for small tables
    • Partition pruning with partitioned output
    • Adaptive Query Execution (AQE)
  - Measured the effect of caching on repeated actions

✓ Actions vs Transformations (10%)
  - Demonstrated lazy evaluation with transformations
  - Showed eager execution with actions
  - Explained benefits of lazy evaluation

✓ Machine Learning (15%)
  - Built trip duration prediction models
  - Compared Random Forest vs Linear Regression
  - Evaluated using RMSE, MAE, and R²
  - Analyzed feature importance
""")

print("="*80)
print("Assignment completed successfully!")
print("="*80)

In [None]:
# Clean up
spark.stop()
print("\nSpark session stopped.")