In [5]:
# Cell 1: Initialize Spark Session
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import matplotlib.pyplot as plt
import seaborn as sns

# Create Spark session with connection to your cluster
spark = SparkSession.builder \
    .appName("NYC Taxi Analysis - Day 11") \
    .master("spark://spark-master:7077") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .getOrCreate()

# Set log level to reduce verbose output
spark.sparkContext.setLogLevel("WARN")

print("✅ Spark Session Created Successfully!")
print(f"🚀 Spark Version: {spark.version}")
print(f"🖥 Master: {spark.sparkContext.master}")
print(f"📊 Available Cores: {spark.sparkContext.defaultParallelism}")
print(f"🆔 Application ID: {spark.sparkContext.applicationId}")

# Check connection to cluster
print(f"\n🔗 Cluster Connection Status:")
print(f"   Workers connected: Check Spark UI at http://localhost:8080")

✅ Spark Session Created Successfully!
🚀 Spark Version: 3.5.0
🖥 Master: spark://spark-master:7077
📊 Available Cores: 2
🆔 Application ID: app-20250626013602-0001

🔗 Cluster Connection Status:
   Workers connected: Check Spark UI at http://localhost:8080


In [None]:
# Cell 2: Load and Explore Data
print("📂 Loading NYC Taxi Data...")

# Load data with schema inference
taxi_df = spark.read.csv("/home/jovyan/data/nyc_taxi_data.csv",
                        header=True,
                        inferSchema=True)

print(f"📊 Dataset Shape: {taxi_df.count()} rows x {len(taxi_df.columns)} columns")

print("\n🔍 Schema Information:")
taxi_df.printSchema()

print("\n📋 Sample Data (First 5 rows):")
taxi_df.show(5, truncate=False)

print("\n📈 Column Names:")
for i, column in enumerate(taxi_df.columns):
    print(f"  {i+1:2d}. {column}")

print(f"\n🎯 Success! Loaded {taxi_df.count():,} taxi trip records into Spark DataFrame")

📂 Loading NYC Taxi Data...


In [None]:
# It is taking more than 20 minutes to run previous cell.

In [1]:
try:
    spark.stop()
    print("✅ Stopped previous Spark session")
except:
    print("No previous session to stop")

No previous session to stop


In [2]:
# Check if our data file is accessible
import os
import pandas as pd

print("🔍 Checking file accessibility...")
print(f"Current directory: {os.getcwd()}")

# List files in data directory
try:
    files = os.listdir('/home/jovyan/data/')
    print(f"Files in /home/jovyan/data/: {files}")
    
    # Check file size
    if 'nyc_taxi_data.csv' in files:
        file_path = '/home/jovyan/data/nyc_taxi_data.csv'
        file_size = os.path.getsize(file_path)
        print(f"✅ File found: {file_path}")
        print(f"📊 File size: {file_size:,} bytes ({file_size/1024/1024:.1f} MB)")
        
        # Quick check with pandas
        df_test = pd.read_csv(file_path, nrows=5)
        print(f"✅ Pandas can read it: {len(df_test)} rows, {len(df_test.columns)} columns")
        print("First few rows:")
        print(df_test.head())
    else:
        print("❌ nyc_taxi_data.csv not found!")
        
except Exception as e:
    print(f"❌ Error checking files: {e}")

🔍 Checking file accessibility...
Current directory: /home/jovyan
Files in /home/jovyan/data/: ['generate_sample_data.py', 'nyc_taxi_data.csv']
✅ File found: /home/jovyan/data/nyc_taxi_data.csv
📊 File size: 1,253,689 bytes (1.2 MB)
✅ Pandas can read it: 5 rows, 18 columns
First few rows:
   vendor_id tpep_pickup_datetime tpep_dropoff_datetime  passenger_count  \
0          1  2024-01-06 09:42:36   2024-01-06 09:44:36                2   
1          1  2024-01-02 10:42:58   2024-01-02 10:48:58                2   
2          2  2024-01-22 19:21:59   2024-01-22 19:59:59                1   
3          2  2024-01-05 23:05:00   2024-01-05 23:11:00                2   
4          2  2024-01-04 19:48:31   2024-01-04 20:24:31                1   

   trip_distance  pickup_longitude  pickup_latitude  rate_code_id  \
0          14.84        -73.827535        40.698978             1   
1          13.03        -73.984409        40.751071             1   
2           8.51        -73.966543        40.823

In [3]:
# Cell 1: Simple Spark Session (Troubleshooting Version)
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

print("🚀 Creating Spark session with simpler config...")

# Create Spark session with local mode first (for troubleshooting)
spark = SparkSession.builder \
    .appName("NYC Taxi Analysis - Troubleshooting") \
    .master("local[2]") \
    .config("spark.sql.adaptive.enabled", "false") \
    .config("spark.driver.memory", "1g") \
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")  # Reduce noise

print("✅ Spark Session Created!")
print(f"🚀 Spark Version: {spark.version}")
print(f"🖥 Master: {spark.sparkContext.master}")
print(f"📊 Available Cores: {spark.sparkContext.defaultParallelism}")

🚀 Creating Spark session with simpler config...
✅ Spark Session Created!
🚀 Spark Version: 3.5.0
🖥 Master: local[2]
📊 Available Cores: 2


In [4]:
# Cell 2: Simple Data Loading Test
print("📂 Loading data with local Spark...")

try:
    # Very simple loading
    taxi_df = spark.read \
        .option("header", "true") \
        .csv("/home/jovyan/data/nyc_taxi_data.csv")
    
    print("✅ Data loaded!")
    print(f"Row count: {taxi_df.count()}")
    print(f"Columns: {len(taxi_df.columns)}")
    
    # Show first 3 rows
    taxi_df.show(3)
    
except Exception as e:
    print(f"❌ Error: {e}")
    import traceback
    traceback.print_exc()

📂 Loading data with local Spark...
✅ Data loaded!
Row count: 10000
Columns: 18
+---------+--------------------+---------------------+---------------+-------------+----------------+---------------+------------+------------------+-----------------+----------------+------------+-----------+-----+-------+----------+------------+------------+
|vendor_id|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|pickup_longitude|pickup_latitude|rate_code_id|store_and_fwd_flag|dropoff_longitude|dropoff_latitude|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|total_amount|
+---------+--------------------+---------------------+---------------+-------------+----------------+---------------+------------+------------------+-----------------+----------------+------------+-----------+-----+-------+----------+------------+------------+
|        1| 2024-01-06 09:42:36|  2024-01-06 09:44:36|              2|        14.84|      -73.827535|      40.698978|           1|        

In [5]:
# Cell 3: Data Quality Analysis
print("🔍 Data Quality Assessment:")

# Check for missing values
print("\n❓ Missing Values per Column:")
missing_counts = taxi_df.select([
    count(when(col(c).isNull(), c)).alias(c) 
    for c in taxi_df.columns
])
missing_counts.show()

# Basic statistics for numerical columns
print("\n📈 Numerical Columns Statistics:")
taxi_df.describe().show()

# Check data ranges
print("\n🗓 Date Range:")
taxi_df.select(
    min("tpep_pickup_datetime").alias("earliest_pickup"),
    max("tpep_pickup_datetime").alias("latest_pickup"),
    count("*").alias("total_trips")
).show()

# Payment type distribution
print("\n💳 Payment Type Distribution:")
taxi_df.groupBy("payment_type").count().orderBy("payment_type").show()

# Passenger count distribution
print("\n👥 Passenger Count Distribution:")
taxi_df.groupBy("passenger_count").count().orderBy("passenger_count").show()

🔍 Data Quality Assessment:

❓ Missing Values per Column:
+---------+--------------------+---------------------+---------------+-------------+----------------+---------------+------------+------------------+-----------------+----------------+------------+-----------+-----+-------+----------+------------+------------+
|vendor_id|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|pickup_longitude|pickup_latitude|rate_code_id|store_and_fwd_flag|dropoff_longitude|dropoff_latitude|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|total_amount|
+---------+--------------------+---------------------+---------------+-------------+----------------+---------------+------------+------------------+-----------------+----------------+------------+-----------+-----+-------+----------+------------+------------+
|        0|                   0|                    0|              0|            0|               0|              0|           0|                 0|           

In [6]:
# Cell 4: Data Transformations and Feature Engineering
print("🧹 Data Cleaning and Feature Engineering...")

# First, let's properly cast the datetime columns
from pyspark.sql.functions import to_timestamp

# Convert string datetime to proper timestamp type
taxi_df_typed = taxi_df.withColumn(
    "pickup_datetime", to_timestamp("tpep_pickup_datetime", "yyyy-MM-dd HH:mm:ss")
).withColumn(
    "dropoff_datetime", to_timestamp("tpep_dropoff_datetime", "yyyy-MM-dd HH:mm:ss")
)

# Clean the data (remove invalid trips)
cleaned_taxi_df = taxi_df_typed.filter(
    (col("trip_distance") > 0) &
    (col("fare_amount") > 0) &
    (col("total_amount") > 0) &
    (col("passenger_count") > 0) &
    (col("passenger_count") <= 6) &
    (col("trip_distance") < 50) &
    (col("fare_amount") < 200) &
    (col("total_amount") < 300)
)

print(f"📊 Original records: {taxi_df.count():,}")
print(f"📊 After cleaning: {cleaned_taxi_df.count():,}")
print(f"📊 Removed: {taxi_df.count() - cleaned_taxi_df.count():,} records")

# Feature engineering
enriched_taxi_df = cleaned_taxi_df.withColumn(
    "pickup_hour", hour("pickup_datetime")
).withColumn(
    "pickup_day_of_week", dayofweek("pickup_datetime") 
).withColumn(
    "trip_duration_minutes",
    (unix_timestamp("dropoff_datetime") - unix_timestamp("pickup_datetime")) / 60
).withColumn(
    "tip_percentage",
    when(col("fare_amount") > 0, (col("tip_amount") / col("fare_amount")) * 100).otherwise(0)
).withColumn(
    "fare_per_mile",
    when(col("trip_distance") > 0, col("fare_amount") / col("trip_distance")).otherwise(0)
)

# Final filtering
final_taxi_df = enriched_taxi_df.filter(
    (col("trip_duration_minutes") > 1) &
    (col("trip_duration_minutes") < 120) &
    (col("fare_per_mile") > 0) &
    (col("fare_per_mile") < 50)
)

print(f"📊 Final dataset: {final_taxi_df.count():,} records")

# Cache for better performance
final_taxi_df.cache()
print("💾 DataFrame cached for better performance")

print("\n✅ Data transformation completed!")
print("🔍 New columns: pickup_hour, pickup_day_of_week, trip_duration_minutes, tip_percentage, fare_per_mile")

🧹 Data Cleaning and Feature Engineering...
📊 Original records: 10,000
📊 After cleaning: 9,573
📊 Removed: 427 records
📊 Final dataset: 9,410 records
💾 DataFrame cached for better performance

✅ Data transformation completed!
🔍 New columns: pickup_hour, pickup_day_of_week, trip_duration_minutes, tip_percentage, fare_per_mile


In [7]:
# Cell 5: Business Analytics with Spark SQL
print("🎯 Business Analytics with Spark SQL")

# Register DataFrame as SQL table
final_taxi_df.createOrReplaceTempView("taxi_trips")

# Analysis 1: Peak hours analysis
print("\n⏰ Peak Hours Analysis:")
peak_hours = spark.sql("""
    SELECT 
        pickup_hour,
        COUNT(*) as trip_count,
        ROUND(AVG(trip_distance), 2) as avg_distance,
        ROUND(AVG(total_amount), 2) as avg_fare,
        ROUND(AVG(tip_percentage), 2) as avg_tip_pct
    FROM taxi_trips 
    GROUP BY pickup_hour 
    ORDER BY pickup_hour
""")
peak_hours.show(24)

# Analysis 2: Day of week patterns  
print("\n📅 Day of Week Analysis:")
dow_analysis = spark.sql("""
    SELECT 
        CASE pickup_day_of_week
            WHEN 1 THEN 'Sunday'
            WHEN 2 THEN 'Monday' 
            WHEN 3 THEN 'Tuesday'
            WHEN 4 THEN 'Wednesday'
            WHEN 5 THEN 'Thursday'
            WHEN 6 THEN 'Friday'
            WHEN 7 THEN 'Saturday'
        END as day_name,
        COUNT(*) as trip_count,
        ROUND(AVG(trip_distance), 2) as avg_distance,
        ROUND(AVG(total_amount), 2) as avg_total
    FROM taxi_trips 
    GROUP BY pickup_day_of_week 
    ORDER BY pickup_day_of_week
""")
dow_analysis.show()

# Analysis 3: Distance categories
print("\n📏 Distance Analysis:")
distance_analysis = spark.sql("""
    SELECT 
        CASE 
            WHEN trip_distance <= 1 THEN 'Short (≤1 mi)'
            WHEN trip_distance <= 5 THEN 'Medium (1-5 mi)' 
            WHEN trip_distance <= 10 THEN 'Long (5-10 mi)'
            ELSE 'Very Long (>10 mi)'
        END as distance_category,
        COUNT(*) as trip_count,
        ROUND(AVG(trip_duration_minutes), 1) as avg_duration_min,
        ROUND(AVG(total_amount), 2) as avg_fare
    FROM taxi_trips
    GROUP BY 
        CASE 
            WHEN trip_distance <= 1 THEN 'Short (≤1 mi)'
            WHEN trip_distance <= 5 THEN 'Medium (1-5 mi)' 
            WHEN trip_distance <= 10 THEN 'Long (5-10 mi)'
            ELSE 'Very Long (>10 mi)'
        END
    ORDER BY avg_duration_min
""")
distance_analysis.show()

print("\n🎯 Key Insights Generated!")
print("📊 You've successfully processed 10,000+ records with Spark!")

🎯 Business Analytics with Spark SQL

⏰ Peak Hours Analysis:
+-----------+----------+------------+--------+-----------+
|pickup_hour|trip_count|avg_distance|avg_fare|avg_tip_pct|
+-----------+----------+------------+--------+-----------+
|          0|       395|        10.7|   49.01|       9.37|
|          1|       387|        10.5|   48.54|       9.15|
|          2|       377|        10.8|   49.25|       9.37|
|          3|       400|       10.29|   48.55|       9.88|
|          4|       396|       10.69|   51.03|       9.04|
|          5|       370|        10.5|   48.17|       8.91|
|          6|       414|       10.78|   50.11|       9.29|
|          7|       370|       10.01|   47.67|       9.41|
|          8|       392|       10.96|   51.03|       9.65|
|          9|       403|       10.59|   49.35|       9.07|
|         10|       384|       10.81|    49.8|      10.12|
|         11|       402|       10.46|    49.1|       8.91|
|         12|       380|       10.77|   49.39|       8.

In [8]:
# Geographic hot spots analysis
print("\n🗺 Geographic Analysis:")

# Popular pickup locations (rounded by coordinate)
pickup_hotspots = spark.sql("""
    SELECT
        ROUND(pickup_longitude, 3) AS pickup_lon_rounded,
        ROUND(pickup_latitude, 3) AS pickup_lat_rounded,
        COUNT(*) AS pickup_count,
        AVG(trip_distance) AS avg_trip_distance,
        AVG(total_amount) AS avg_fare
    FROM taxi_trips
    WHERE pickup_longitude BETWEEN -74.05 AND -73.75
      AND pickup_latitude BETWEEN 40.65 AND 40.85
    GROUP BY pickup_lon_rounded, pickup_lat_rounded
    HAVING pickup_count >= 100
    ORDER BY pickup_count DESC
    LIMIT 20
""")

print("🔝 Top Pickup Locations:")
pickup_hotspots.show()

# Airport trips analysis
airport_trips = spark.sql("""
    SELECT
        'JFK Airport' AS location,
        COUNT(*) AS trip_count,
        AVG(trip_distance) AS avg_distance,
        AVG(total_amount) AS avg_fare,
        AVG(trip_duration_minutes) AS avg_duration
    FROM taxi_trips
    WHERE (pickup_longitude BETWEEN -73.79 AND -73.76 AND pickup_latitude BETWEEN 40.64 AND 40.66)
       OR (dropoff_longitude BETWEEN -73.79 AND -73.76 AND dropoff_latitude BETWEEN 40.64 AND 40.66)
    UNION ALL
    SELECT
        'LGA Airport' AS location,
        COUNT(*) AS trip_count,
        AVG(trip_distance) AS avg_distance,
        AVG(total_amount) AS avg_fare,
        AVG(trip_duration_minutes) AS avg_duration
    FROM taxi_trips
    WHERE (pickup_longitude BETWEEN -73.89 AND -73.85 AND pickup_latitude BETWEEN 40.76 AND 40.78)
       OR (dropoff_longitude BETWEEN -73.89 AND -73.85 AND dropoff_latitude BETWEEN 40.76 AND 40.78)
""")

print("\n✈ Airport Trip Analysis:")
airport_trips.show()



🗺 Geographic Analysis:
🔝 Top Pickup Locations:
+------------------+------------------+------------+-----------------+--------+
|pickup_lon_rounded|pickup_lat_rounded|pickup_count|avg_trip_distance|avg_fare|
+------------------+------------------+------------+-----------------+--------+
+------------------+------------------+------------+-----------------+--------+


✈ Airport Trip Analysis:
+-----------+----------+------------------+------------------+------------------+
|   location|trip_count|      avg_distance|          avg_fare|      avg_duration|
+-----------+----------+------------------+------------------+------------------+
|JFK Airport|        72|10.072083333333335|46.365555555555574|            28.625|
|LGA Airport|       221|10.614615384615378|49.093755656108605|30.176470588235293|
+-----------+----------+------------------+------------------+------------------+



In [9]:
# Performance optimization techniques
print("🚀 Performance Optimization Techniques")

# 1. Partitioning analysis
print(f"\n📦 Current Partitions: {final_taxi_df.rdd.getNumPartitions()}")

# Check partition sizes
partition_info = spark.sql("""
    SELECT spark_partition_id() AS partition_id, COUNT(*) AS records_per_partition
    FROM taxi_trips
    GROUP BY spark_partition_id()
    ORDER BY spark_partition_id()
""")

print("📊 Records per Partition:")
partition_info.show()

# 2. Repartitioning for better performance
print("\n🔄 Optimizing Partitions...")
# Repartition based on pickup_hour for time-based analysis
optimized_df = final_taxi_df.repartition(8, "pickup_hour").cache()

print(f"📦 New Partitions: {optimized_df.rdd.getNumPartitions()}")

# 3. Broadcast join optimization example
print("\n📡 Broadcast Join Example:")

# Create a small lookup table for payment types
payment_types = spark.createDataFrame(
    [
        (1, "Credit Card"),
        (2, "Cash"),
        (3, "No Charge"),
        (4, "Dispute"),
        (5, "Unknown"),
        (6, "Voided Trip")
    ],
    ["payment_type", "payment_method"]
)

from pyspark.sql.functions import broadcast
enriched_df = optimized_df.join(
    broadcast(payment_types),
    on="payment_type",
    how="left"
)

print("✅ Payment method mapping applied with broadcast join")

# 4. Aggregation with window functions
print("\n📈 Advanced Window Functions:")

from pyspark.sql.functions import avg, row_number, desc
from pyspark.sql.window import Window

# Calculate running averages and rankings
window_spec = Window.partitionBy("pickup_hour").orderBy("tpep_pickup_datetime")

windowed_analysis = (
    optimized_df
    .withColumn(
        "running_avg_fare",
        avg("total_amount").over(window_spec.rowsBetween(-100, 0))
    )
    .withColumn(
        "trip_rank_in_hour",
        row_number().over(Window.partitionBy("pickup_hour").orderBy(desc("total_amount")))
    )
)

print("✅ Window functions applied for running averages and rankings")


🚀 Performance Optimization Techniques

📦 Current Partitions: 1
📊 Records per Partition:
+------------+---------------------+
|partition_id|records_per_partition|
+------------+---------------------+
|           0|                 9410|
+------------+---------------------+


🔄 Optimizing Partitions...
📦 New Partitions: 8

📡 Broadcast Join Example:
✅ Payment method mapping applied with broadcast join

📈 Advanced Window Functions:
✅ Window functions applied for running averages and rankings
