# NYC Taxi Data Analysis with PySpark

This notebook analyzes NYC TLC Trip Record Data at scale using **Apache Spark**.

**Data Source**: [NYC Taxi and Limousine Commission](https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page)

**Technologies Used**:
- **PySpark**: Distributed data processing
- **Spark SQL**: Large-scale SQL analytics
- **MLlib**: Machine learning at scale

**Analysis Includes**:
- Scalable data processing and ETL
- Distributed analytics and aggregations
- Advanced SQL queries on big data
- Performance optimization techniques

In [None]:
# Import required libraries
import sys
import os
sys.path.append('../scripts')

from pyspark.sql import SparkSession
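# Note: this star import shadows Python built-ins such as min, max, sum and round,
# and exposes col(); avoid reusing these names for variables in later cells.
from pyspark.sql.functions import *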
from pyspark.sql.types import *
from pyspark.sql.window import Window
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import numpy as np
from datetime import datetime, timedelta

# Import our Spark setup utilities
from spark_setup import (
    create_spark_session,
    configure_spark_for_taxi_data,
    print_spark_ui_info,
    cache_dataframe,
    create_temp_views
)

# Configure matplotlib
plt.style.use('seaborn-v0_8-whitegrid')
sns.set_palette('viridis')

print("🚀 NYC Taxi Big Data Analysis with PySpark")
print("=" * 60)

In [None]:
# Initialize Spark Session
print("🔧 Initializing Spark Session...")

# Create Spark session with optimized configuration
spark = create_spark_session(
    app_name="NYC_Taxi_Analytics",
    memory="6g",  # Adjust based on your system
    cores="*"
)

# Configure for taxi data analysis
configure_spark_for_taxi_data(spark)

# Print Spark UI information
print_spark_ui_info(spark)

print("\n✅ Spark initialization complete!")

## 1. Data Loading and Initial Processing

In [None]:
# Access NYC TLC data from Spark tables with nw_taxi namespace
print("📂 Accessing NYC TLC Trip Data from Spark Tables...")

# List the tables available in the nw_taxi database
try:
    available_tables = spark.sql("SHOW TABLES IN nw_taxi").collect()
    print("Tables in nw_taxi:")
    for table in available_tables:
        print(f"  - {table.tableName}")
except Exception as e:
    print(f"Note: could not list tables in nw_taxi: {e}")

# Try to access trip data from Spark table with namespace
try:
    print("\nAccessing trip data from Spark table...")
    trips_df = spark.sql("SELECT * FROM nw_taxi.trips")
    trip_count = spark.sql("SELECT COUNT(*) as count FROM nw_taxi.trips").collect()[0]['count']
    print(f"✅ Accessed trip data: {trip_count:,} trips")
except Exception as e:
    print(f"❌ Could not access 'nw_taxi.trips' table: {e}")
    print("Creating an empty placeholder DataFrame with the expected schema...")
    # Fallback: use an empty DataFrame with the expected schema if the table doesn't exist
    from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType
    sample_schema = StructType([
        StructField("trip_id", IntegerType(), True),
        StructField("trip_type", StringType(), True),
        StructField("pickup_datetime", TimestampType(), True),
        StructField("dropoff_datetime", TimestampType(), True),
        StructField("pickup_location_id", IntegerType(), True),
        StructField("dropoff_location_id", IntegerType(), True),
        StructField("fare_amount", DoubleType(), True),
        StructField("total_amount", DoubleType(), True),
        StructField("tip_amount", DoubleType(), True),
        StructField("trip_distance", DoubleType(), True),
        StructField("payment_type", IntegerType(), True)
    ])
    trips_df = spark.createDataFrame([], sample_schema)
    trips_df.createOrReplaceTempView("trips")
    trip_count = 0

# Try to access zone lookup data from Spark table with namespace
try:
    print("Accessing zone lookup data from Spark table...")
    zones_df = spark.sql("SELECT * FROM nw_taxi.zones")
    zone_count = spark.sql("SELECT COUNT(*) as count FROM nw_taxi.zones").collect()[0]['count']
    print(f"✅ Accessed zone data: {zone_count:,} zones")
except Exception as e:
    print(f"❌ Could not access 'nw_taxi.zones' table: {e}")
    print("Creating temporary zones table...")
    # Fallback: create sample zones if table doesn't exist
    from pyspark.sql.types import StructType, StructField, StringType, IntegerType
    zones_schema = StructType([
        StructField("location_id", IntegerType(), True),
        StructField("Borough", StringType(), True),
        StructField("Zone", StringType(), True),
        StructField("service_zone", StringType(), True)
    ])
    zones_df = spark.createDataFrame([], zones_schema)
    zones_df.createOrReplaceTempView("zones")
    zone_count = 0

# Alternative: Check if data exists in different table names with namespace
if trip_count == 0:
    print("\n🔍 Searching for alternative table names...")
    alternative_tables = ['nw_taxi.taxi_trips', 'nw_taxi.nyc_trips', 'nw_taxi.trip_data', 'nw_taxi.yellow_trips', 'nw_taxi.green_trips']
    
    for table_name in alternative_tables:
        try:
            test_df = spark.sql(f"SELECT COUNT(*) as count FROM {table_name}")
            count = test_df.collect()[0]['count']
            if count > 0:
                print(f"✅ Found data in table '{table_name}': {count:,} records")
                trips_df = spark.sql(f"SELECT * FROM {table_name}")
                trips_df.createOrReplaceTempView("trips")
                trip_count = count
                break
        except Exception:  # table missing or unreadable; try the next candidate
            continue

if trip_count > 0:
    # Cache the DataFrames for better performance
    trips_df = cache_dataframe(trips_df, "MEMORY_AND_DISK")
    zones_df = cache_dataframe(zones_df, "MEMORY_ONLY")
    print("💾 Data cached for optimized performance")
else:
    print("⚠️ No trip data found in Spark tables. Please ensure data is loaded into Spark first.")
    print("   You can load data using: python scripts/spark_data_processor.py")

In [None]:
# Examine data schema and structure
print("📋 Trip Data Schema:")
trips_df.printSchema()

print("\n📋 Zone Data Schema:")
zones_df.printSchema()

# Show sample data
print("\n📊 Sample Trip Data:")
trips_df.show(5, truncate=False)

print("\n📍 Sample Zone Data:")
zones_df.show(5, truncate=False)

In [None]:
# Data quality assessment using Spark
print("🔍 Data Quality Assessment")
print("=" * 40)

# Basic statistics
total_trips = trips_df.count()
date_range = trips_df.select(
    min("pickup_datetime").alias("min_date"),
    max("pickup_datetime").alias("max_date")
).collect()[0]

print(f"📊 Dataset Overview:")
print(f"   Total trips: {total_trips:,}")
print(f"   Date range: {date_range['min_date']} to {date_range['max_date']}")

# Trip type distribution
trip_type_dist = trips_df.groupBy("trip_type").count().orderBy(desc("count"))
print(f"\n🚖 Trip Type Distribution:")
trip_type_dist.show()

# Data quality checks
print("\n🔍 Data Quality Checks:")

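# Count nulls in critical columns in a single pass over the data.
# The loop variable must not be named `col`: that would shadow the Spark col()
# function, which later cells rely on.
critical_columns = ["pickup_datetime", "dropoff_datetime", "pickup_location_id",
                    "dropoff_location_id", "trip_distance", "fare_amount", "total_amount"]

# sum() here is pyspark.sql.functions.sum (from the star import)
null_counts = trips_df.select(
    [sum(col(c).isNull().cast("int")).alias(c) for c in critical_columns]
).collect()[0]

for column_name in critical_columns:
    null_count = null_counts[column_name] or 0  # sum() is NULL on an empty DataFrame
    null_pct = (null_count / total_trips) * 100 if total_trips else 0.0
    print(f"   {column_name}: {null_count:,} nulls ({null_pct:.2f}%)")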

# Basic trip statistics
trip_stats = trips_df.select(
    avg("trip_distance").alias("avg_distance"),
    avg("fare_amount").alias("avg_fare"),
    avg("total_amount").alias("avg_total"),
    avg("tip_amount").alias("avg_tip")
).collect()[0]

print(f"\n📈 Trip Statistics:")
print(f"   Average distance: {trip_stats['avg_distance']:.2f} miles")
print(f"   Average fare: ${trip_stats['avg_fare']:.2f}")
print(f"   Average total: ${trip_stats['avg_total']:.2f}")
print(f"   Average tip: ${trip_stats['avg_tip']:.2f}")

## 2. Data Engineering - Feature Creation
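The derived columns in this section follow simple per-row rules. Below is a standalone plain-Python sketch that mirrors those rules for a single trip. It is not part of the Spark pipeline, and the function names `time_period`, `derive_features` and `is_valid_trip` are hypothetical. It lets you check the arithmetic and filter bounds on one trip before applying them at scale:

```python
from datetime import datetime


def time_period(hour: int) -> str:
    """Mirror of the time_period CASE WHEN used in the notebook (hour is 0-23)."""
    if 7 <= hour <= 9:
        return "Morning Rush"
    if 17 <= hour <= 19:
        return "Evening Rush"
    if 10 <= hour <= 16:
        return "Daytime"
    if 20 <= hour <= 23:
        return "Evening"
    return "Late Night/Early Morning"


def derive_features(pickup: datetime, dropoff: datetime,
                    distance: float, fare: float, tip: float) -> dict:
    """Per-trip versions of the derived Spark columns; zero denominators yield 0."""
    # Spark's unix_timestamp works in whole seconds; total_seconds() keeps fractions.
    duration_min = (dropoff - pickup).total_seconds() / 60
    speed_mph = distance / (duration_min / 60) if duration_min > 0 else 0.0
    tip_pct = tip / fare * 100 if fare > 0 else 0.0
    fare_per_mile = fare / distance if distance > 0 else 0.0
    return {
        "trip_duration_minutes": duration_min,
        "trip_speed_mph": speed_mph,
        "tip_percentage": tip_pct,
        "fare_per_mile": fare_per_mile,
        "time_period": time_period(pickup.hour),
    }


def is_valid_trip(features: dict, distance: float, fare: float, total: float) -> bool:
    """Same bounds as the valid_trips filter (null location checks omitted)."""
    return (
        features["trip_duration_minutes"] > 0
        and 0 < distance < 100
        and fare > 0
        and total > 0
        and 1 <= features["trip_speed_mph"] <= 80
    )


features = derive_features(datetime(2024, 1, 15, 8, 0), datetime(2024, 1, 15, 8, 30),
                           distance=5.0, fare=20.0, tip=4.0)
print(features["trip_speed_mph"], features["time_period"])  # 10.0 Morning Rush
```

With the thresholds in one place, edge cases such as zero-minute or zero-fare trips are easy to unit-test.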

In [None]:
# Create derived features using Spark SQL functions
print("🔧 Creating derived features...")

# Add time-based features
trips_enriched = trips_df.withColumn("pickup_hour", hour("pickup_datetime")) \
    .withColumn("pickup_day_of_week", dayofweek("pickup_datetime")) \
    .withColumn("pickup_month", month("pickup_datetime")) \
    .withColumn("pickup_year", year("pickup_datetime")) \
    .withColumn("pickup_date", to_date("pickup_datetime")) \
    .withColumn("is_weekend", when(dayofweek("pickup_datetime").isin([1, 7]), True).otherwise(False))

# Add trip duration in minutes
trips_enriched = trips_enriched.withColumn(
    "trip_duration_minutes",
    (unix_timestamp("dropoff_datetime") - unix_timestamp("pickup_datetime")) / 60
)

# Add trip speed (mph)
trips_enriched = trips_enriched.withColumn(
    "trip_speed_mph",
    when(col("trip_duration_minutes") > 0, 
         col("trip_distance") / (col("trip_duration_minutes") / 60))
    .otherwise(0)
)

# Add tip percentage
trips_enriched = trips_enriched.withColumn(
    "tip_percentage",
    when(col("fare_amount") > 0, 
         (col("tip_amount") / col("fare_amount")) * 100)
    .otherwise(0)
)

# Add fare per mile
trips_enriched = trips_enriched.withColumn(
    "fare_per_mile",
    when(col("trip_distance") > 0, 
         col("fare_amount") / col("trip_distance"))
    .otherwise(0)
)

# Add time period classification
trips_enriched = trips_enriched.withColumn(
    "time_period",
    when((col("pickup_hour") >= 7) & (col("pickup_hour") <= 9), "Morning Rush")
    .when((col("pickup_hour") >= 17) & (col("pickup_hour") <= 19), "Evening Rush")
    .when((col("pickup_hour") >= 10) & (col("pickup_hour") <= 16), "Daytime")
    .when((col("pickup_hour") >= 20) & (col("pickup_hour") <= 23), "Evening")
    .otherwise("Late Night/Early Morning")
)

# Add day name
trips_enriched = trips_enriched.withColumn(
    "day_name",
    when(col("pickup_day_of_week") == 1, "Sunday")
    .when(col("pickup_day_of_week") == 2, "Monday")
    .when(col("pickup_day_of_week") == 3, "Tuesday")
    .when(col("pickup_day_of_week") == 4, "Wednesday")
    .when(col("pickup_day_of_week") == 5, "Thursday")
    .when(col("pickup_day_of_week") == 6, "Friday")
    .when(col("pickup_day_of_week") == 7, "Saturday")
    .otherwise("Unknown")
)

# Add payment method description
trips_enriched = trips_enriched.withColumn(
    "payment_method",
    when(col("payment_type") == 1, "Credit Card")
    .when(col("payment_type") == 2, "Cash")
    .when(col("payment_type") == 3, "No Charge")
    .when(col("payment_type") == 4, "Dispute")
    .when(col("payment_type") == 5, "Unknown")
    .when(col("payment_type") == 6, "Voided Trip")
    .otherwise("Other")
)

# Filter for valid trips
valid_trips = trips_enriched.filter(
    (col("pickup_datetime") < col("dropoff_datetime")) &
    (col("trip_distance") > 0) &
    (col("trip_distance") < 100) &
    (col("fare_amount") > 0) &
    (col("total_amount") > 0) &
    (col("pickup_location_id").isNotNull()) &
    (col("dropoff_location_id").isNotNull()) &
    (col("trip_speed_mph") >= 1) &
    (col("trip_speed_mph") <= 80)
)

# Cache the enriched dataset
valid_trips = cache_dataframe(valid_trips, "MEMORY_AND_DISK")

print(f"✅ Feature engineering complete")
print(f"📊 Valid trips after filtering: {valid_trips.count():,}")
print(f"🔧 Added {len(trips_enriched.columns) - len(trips_df.columns)} new features")

In [None]:
# Register session-scoped temp views for the SQL analysis below.
# Temp views cannot carry a database prefix such as "nw_taxi.", so the
# enriched data is exposed under distinct, unqualified names.
valid_trips.createOrReplaceTempView("trips_enriched")
zones_df.createOrReplaceTempView("zones_lookup")
print("📊 Created temp views: trips_enriched, zones_lookup")

# Build a joined view with pickup and dropoff zone information.
# Zone columns are renamed before each join so no column name is ambiguous.
pickup_zones = zones_df.select(
    col("location_id").alias("pu_location_id"),
    col("Borough").alias("pickup_borough"),
    col("Zone").alias("pickup_zone")
)
dropoff_zones = zones_df.select(
    col("location_id").alias("do_location_id"),
    col("Borough").alias("dropoff_borough"),
    col("Zone").alias("dropoff_zone")
)

trips_with_zones = valid_trips \
    .join(pickup_zones, col("pickup_location_id") == col("pu_location_id"), "left") \
    .join(dropoff_zones, col("dropoff_location_id") == col("do_location_id"), "left") \
    .drop("pu_location_id", "do_location_id")

trips_with_zones.createOrReplaceTempView("trips_with_zones")
print("📊 Created temp view: trips_with_zones")

# Show sample of enriched data
print("\n📊 Sample enriched data:")
valid_trips.select(
    "pickup_datetime", "trip_type", "pickup_hour", "day_name", 
    "time_period", "trip_distance", "trip_duration_minutes", 
    "trip_speed_mph", "fare_amount", "tip_percentage", "payment_method"
).show(5)

## 3. Temporal Analysis with Spark SQL
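Spark's `dayofweek()` numbers days from 1 (Sunday) to 7 (Saturday). That is why `is_weekend` tests for 1 and 7, and why the daily queries re-sort day names so the week starts on Monday. The plain-Python sketch below shows how that numbering lines up with Python's `isoweekday()`. The helper and constant names are hypothetical:

```python
from datetime import date

# Spark's dayofweek(): 1 = Sunday ... 7 = Saturday
SPARK_DAY_NAMES = {1: "Sunday", 2: "Monday", 3: "Tuesday", 4: "Wednesday",
                   5: "Thursday", 6: "Friday", 7: "Saturday"}
# Monday-first order used for the daily tables and charts
WEEK_ORDER = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"]


def spark_dayofweek(d: date) -> int:
    """Convert Python's ISO weekday (Monday = 1 ... Sunday = 7) to Spark's numbering."""
    return d.isoweekday() % 7 + 1


def is_weekend(d: date) -> bool:
    """Same test as the is_weekend column: Spark day 1 (Sunday) or 7 (Saturday)."""
    return spark_dayofweek(d) in (1, 7)


def sort_days(day_names):
    """Order day names Monday-first, like the CASE expression in the ORDER BY."""
    return sorted(day_names, key=WEEK_ORDER.index)


print(SPARK_DAY_NAMES[spark_dayofweek(date(2024, 1, 15))])  # Monday
```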

In [None]:
# Temporal analysis using Spark SQL
print("⏰ Temporal Demand Analysis")
print("=" * 40)

# Hourly demand patterns
hourly_demand = spark.sql("""
    SELECT 
        pickup_hour,
        trip_type,
        COUNT(*) as trip_count,
        AVG(fare_amount) as avg_fare,
        AVG(trip_distance) as avg_distance,
        AVG(tip_percentage) as avg_tip_pct,
        AVG(trip_speed_mph) as avg_speed
    FROM trips_enriched
    GROUP BY pickup_hour, trip_type
    ORDER BY pickup_hour, trip_type
""")

print("📊 Hourly Demand Pattern:")
hourly_demand.show(48)  # up to 24 hours x 2 taxi types

# Daily patterns
daily_patterns = spark.sql("""
    SELECT 
        day_name,
        trip_type,
        COUNT(*) as trip_count,
        AVG(fare_amount) as avg_fare,
        AVG(trip_distance) as avg_distance,
        SUM(total_amount) as total_revenue
    FROM trips_enriched
    GROUP BY day_name, trip_type
    ORDER BY 
        CASE day_name
            WHEN 'Monday' THEN 1
            WHEN 'Tuesday' THEN 2
            WHEN 'Wednesday' THEN 3
            WHEN 'Thursday' THEN 4
            WHEN 'Friday' THEN 5
            WHEN 'Saturday' THEN 6
            WHEN 'Sunday' THEN 7
        END,
        trip_type
""")

print("\n📅 Daily Demand Pattern:")
daily_patterns.show()

# Peak hours identification
peak_hours = spark.sql("""
    SELECT 
        pickup_hour,
        SUM(CASE WHEN trip_type = 'yellow' THEN 1 ELSE 0 END) as yellow_trips,
        SUM(CASE WHEN trip_type = 'green' THEN 1 ELSE 0 END) as green_trips,
        COUNT(*) as total_trips,
        AVG(fare_amount) as avg_fare
    FROM trips_enriched
    GROUP BY pickup_hour
    ORDER BY total_trips DESC
    LIMIT 10
""")

print("\n🕐 Top 10 Peak Hours:")
peak_hours.show()

In [None]:
# Convert Spark DataFrames to Pandas for visualization
print("📊 Creating temporal visualizations...")

# Convert hourly data to Pandas for plotting
hourly_pandas = hourly_demand.toPandas()
daily_pandas = daily_patterns.toPandas()

# Create visualizations
fig, axes = plt.subplots(2, 2, figsize=(16, 10))

# Hourly trip volume by trip type
for trip_type in hourly_pandas['trip_type'].unique():
    data = hourly_pandas[hourly_pandas['trip_type'] == trip_type]
    axes[0, 0].plot(data['pickup_hour'], data['trip_count'], 
                   marker='o', linewidth=2, label=f'{trip_type.title()} Taxi')

axes[0, 0].set_xlabel('Hour of Day')
axes[0, 0].set_ylabel('Number of Trips')
axes[0, 0].set_title('Hourly Trip Volume by Taxi Type')
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)

# Daily trip volume
daily_pivot = daily_pandas.pivot(index='day_name', columns='trip_type', values='trip_count')
day_order = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']
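# Map colors by column name: pivot() sorts columns alphabetically (green, yellow)
daily_pivot.reindex(day_order).plot(kind='bar', ax=axes[0, 1],
                                    color=[{'yellow': 'gold', 'green': 'green'}.get(t, 'gray') for t in daily_pivot.columns])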
axes[0, 1].set_xlabel('Day of Week')
axes[0, 1].set_ylabel('Number of Trips')
axes[0, 1].set_title('Daily Trip Volume by Taxi Type')
axes[0, 1].tick_params(axis='x', rotation=45)
axes[0, 1].legend(title='Taxi Type')

# Hourly average speed
for trip_type in hourly_pandas['trip_type'].unique():
    data = hourly_pandas[hourly_pandas['trip_type'] == trip_type]
    axes[1, 0].plot(data['pickup_hour'], data['avg_speed'], 
                   marker='s', linewidth=2, label=f'{trip_type.title()} Taxi')

axes[1, 0].set_xlabel('Hour of Day')
axes[1, 0].set_ylabel('Average Speed (mph)')
axes[1, 0].set_title('Average Trip Speed by Hour')
axes[1, 0].legend()
axes[1, 0].grid(True, alpha=0.3)

# Daily revenue
daily_revenue = daily_pandas.pivot(index='day_name', columns='trip_type', values='total_revenue')
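# Map colors by column name: pivot() sorts columns alphabetically (green, yellow)
daily_revenue.reindex(day_order).plot(kind='bar', ax=axes[1, 1],
                                      color=[{'yellow': 'gold', 'green': 'green'}.get(t, 'gray') for t in daily_revenue.columns])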
axes[1, 1].set_xlabel('Day of Week')
axes[1, 1].set_ylabel('Total Revenue ($)')
axes[1, 1].set_title('Daily Revenue by Taxi Type')
axes[1, 1].tick_params(axis='x', rotation=45)
axes[1, 1].legend(title='Taxi Type')

plt.tight_layout()
plt.show()

print("✅ Temporal analysis visualizations complete")

## 4. Geographic Analysis with Distributed Processing
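The airport query classifies each trip with case-sensitive `LIKE` patterns. Because the pickup test is evaluated first, a trip between two airports counts as "From Airport", and a NULL zone simply fails its test. The plain-Python mirror of that CASE expression below assumes zone names are plain strings (or `None`); the helper names and sample zone names are illustrative:

```python
from typing import Optional

# Substrings matched by the LIKE '%...%' patterns in the airport query
AIRPORT_MARKERS = ("Airport", "JFK", "LaGuardia", "LGA", "Newark")


def is_airport_zone(zone: Optional[str]) -> bool:
    """Case-sensitive substring test; None behaves like SQL NULL (never matches)."""
    return zone is not None and any(marker in zone for marker in AIRPORT_MARKERS)


def trip_category(pickup_zone: Optional[str], dropoff_zone: Optional[str]) -> str:
    """Pickup is checked before dropoff, matching the order of the WHEN clauses."""
    if is_airport_zone(pickup_zone):
        return "From Airport"
    if is_airport_zone(dropoff_zone):
        return "To Airport"
    return "Non-Airport"


print(trip_category("Midtown Center", "LaGuardia Airport"))  # To Airport
```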

In [None]:
# Geographic analysis using Spark SQL
print("🗺️ Geographic Analysis")
print("=" * 40)

# Borough-level analysis
borough_stats = spark.sql("""
    SELECT 
        pickup_borough,
        trip_type,
        COUNT(*) as trip_count,
        AVG(fare_amount) as avg_fare,
        AVG(trip_distance) as avg_distance,
        AVG(tip_percentage) as avg_tip_pct,
        SUM(total_amount) as total_revenue,
        AVG(trip_speed_mph) as avg_speed
    FROM trips_with_zones
    WHERE pickup_borough IS NOT NULL
    GROUP BY pickup_borough, trip_type
    ORDER BY trip_count DESC
""")

print("🏙️ Borough Statistics:")
borough_stats.show(truncate=False)

# Top pickup zones
top_pickup_zones = spark.sql("""
    SELECT 
        pickup_zone,
        pickup_borough,
        COUNT(*) as trip_count,
        AVG(fare_amount) as avg_fare,
        SUM(total_amount) as total_revenue,
        AVG(trip_distance) as avg_distance
    FROM trips_with_zones
    WHERE pickup_zone IS NOT NULL
    GROUP BY pickup_zone, pickup_borough
    ORDER BY trip_count DESC
    LIMIT 20
""")

print("\n🏆 Top 20 Pickup Zones:")
top_pickup_zones.show(truncate=False)

# Borough-to-borough flow analysis
borough_flows = spark.sql("""
    SELECT 
        pickup_borough,
        dropoff_borough,
        COUNT(*) as trip_count,
        AVG(fare_amount) as avg_fare,
        AVG(trip_distance) as avg_distance,
        AVG(trip_duration_minutes) as avg_duration,
        SUM(total_amount) as total_revenue
    FROM trips_with_zones
    WHERE pickup_borough IS NOT NULL AND dropoff_borough IS NOT NULL
    GROUP BY pickup_borough, dropoff_borough
    ORDER BY trip_count DESC
    LIMIT 15
""")

print("\n🔄 Top Borough-to-Borough Flows:")
borough_flows.show(truncate=False)

In [None]:
# Airport analysis
airport_analysis = spark.sql("""
    SELECT 
        CASE 
            WHEN pickup_zone LIKE '%Airport%' OR pickup_zone LIKE '%JFK%' 
                 OR pickup_zone LIKE '%LaGuardia%' OR pickup_zone LIKE '%LGA%'
                 OR pickup_zone LIKE '%Newark%' THEN 'From Airport'
            WHEN dropoff_zone LIKE '%Airport%' OR dropoff_zone LIKE '%JFK%' 
                 OR dropoff_zone LIKE '%LaGuardia%' OR dropoff_zone LIKE '%LGA%'
                 OR dropoff_zone LIKE '%Newark%' THEN 'To Airport'
            ELSE 'Non-Airport'
        END as trip_category,
        trip_type,
        COUNT(*) as trip_count,
        AVG(fare_amount) as avg_fare,
        AVG(total_amount) as avg_total,
        AVG(trip_distance) as avg_distance,
        AVG(trip_duration_minutes) as avg_duration,
        AVG(tip_percentage) as avg_tip_pct
    FROM trips_with_zones
    GROUP BY trip_category, trip_type
    ORDER BY trip_count DESC
""")

print("\n✈️ Airport vs Non-Airport Trips:")
airport_analysis.show(truncate=False)

# Geographic visualization data
borough_viz_data = borough_stats.toPandas()
zone_viz_data = top_pickup_zones.limit(10).toPandas()

# Create geographic visualizations
fig, axes = plt.subplots(2, 2, figsize=(16, 10))

# Borough trip distribution
borough_trips = borough_viz_data.groupby('pickup_borough')['trip_count'].sum().sort_values(ascending=False)
borough_trips.plot(kind='bar', ax=axes[0, 0], color='steelblue')
axes[0, 0].set_xlabel('Borough')
axes[0, 0].set_ylabel('Number of Trips')
axes[0, 0].set_title('Trip Volume by Pickup Borough')
axes[0, 0].tick_params(axis='x', rotation=45)

# Borough revenue distribution
borough_revenue = borough_viz_data.groupby('pickup_borough')['total_revenue'].sum().sort_values(ascending=False)
borough_revenue.plot(kind='bar', ax=axes[0, 1], color='green')
axes[0, 1].set_xlabel('Borough')
axes[0, 1].set_ylabel('Total Revenue ($)')
axes[0, 1].set_title('Revenue by Pickup Borough')
axes[0, 1].tick_params(axis='x', rotation=45)

# Top pickup zones
zone_labels = [f"{zone[:20]}..." if len(zone) > 20 else zone for zone in zone_viz_data['pickup_zone']]
axes[1, 0].barh(range(len(zone_viz_data)), zone_viz_data['trip_count'], color='orange')
axes[1, 0].set_yticks(range(len(zone_viz_data)))
axes[1, 0].set_yticklabels(zone_labels, fontsize=8)
axes[1, 0].set_xlabel('Number of Trips')
axes[1, 0].set_title('Top 10 Pickup Zones')

# Trip type distribution by borough
borough_trip_type = borough_viz_data.pivot(index='pickup_borough', columns='trip_type', values='trip_count').fillna(0)
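# Map colors by column name: pivot() sorts columns alphabetically (green, yellow)
borough_trip_type.plot(kind='bar', ax=axes[1, 1], stacked=True,
                       color=[{'yellow': 'gold', 'green': 'green'}.get(t, 'gray') for t in borough_trip_type.columns])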
axes[1, 1].set_xlabel('Borough')
axes[1, 1].set_ylabel('Number of Trips')
axes[1, 1].set_title('Trip Type Distribution by Borough')
axes[1, 1].tick_params(axis='x', rotation=45)
axes[1, 1].legend(title='Taxi Type')

plt.tight_layout()
plt.show()

print("✅ Geographic analysis visualizations complete")

## 5. Payment and Pricing Analysis at Scale
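The distance bands in this section's queries use inclusive upper bounds (`trip_distance <= 1`, `<= 2`, and so on), and the first matching `WHEN` wins. The minimal plain-Python sketch below does the same bucketing with `bisect` from the standard library and shows where boundary values land. The constant and function names are hypothetical:

```python
from bisect import bisect_left

# Inclusive upper bounds of each band, matching `trip_distance <= N` in the CASE
BAND_UPPER_BOUNDS = [1, 2, 5, 10, 20]
BAND_LABELS = ["0-1 miles", "1-2 miles", "2-5 miles", "5-10 miles", "10-20 miles", "20+ miles"]


def distance_band(miles: float) -> str:
    """bisect_left finds the first bound >= miles, so a value equal to a bound
    falls into the lower band, just like the first matching WHEN."""
    return BAND_LABELS[bisect_left(BAND_UPPER_BOUNDS, miles)]


def band_sort_key(label: str) -> int:
    """Position used to order bands from shortest to longest, as in the ORDER BY CASE."""
    return BAND_LABELS.index(label)


print(distance_band(2.0), distance_band(2.01))  # 1-2 miles 2-5 miles
```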

In [None]:
# Payment and pricing analysis using Spark SQL
print("💳 Payment and Pricing Analysis")
print("=" * 40)

# Payment method analysis
payment_stats = spark.sql("""
    SELECT 
        payment_method,
        trip_type,
        COUNT(*) as trip_count,
        ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (PARTITION BY trip_type), 2) as pct_of_type,
        AVG(fare_amount) as avg_fare,
        AVG(tip_amount) as avg_tip,
        AVG(tip_percentage) as avg_tip_pct,
        PERCENTILE_APPROX(tip_percentage, 0.5) as median_tip_pct,
        SUM(total_amount) as total_revenue
    FROM trips_enriched
    GROUP BY payment_method, trip_type
    ORDER BY trip_type, trip_count DESC
""")

print("💳 Payment Method Analysis:")
payment_stats.show(truncate=False)

# Fare analysis by distance bands
distance_analysis = spark.sql("""
    SELECT 
        CASE 
            WHEN trip_distance <= 1 THEN '0-1 miles'
            WHEN trip_distance <= 2 THEN '1-2 miles'
            WHEN trip_distance <= 5 THEN '2-5 miles'
            WHEN trip_distance <= 10 THEN '5-10 miles'
            WHEN trip_distance <= 20 THEN '10-20 miles'
            ELSE '20+ miles'
        END as distance_band,
        trip_type,
        COUNT(*) as trip_count,
        AVG(trip_distance) as avg_distance,
        AVG(fare_amount) as avg_fare,
        AVG(fare_per_mile) as avg_fare_per_mile,
        AVG(total_amount) as avg_total,
        AVG(trip_duration_minutes) as avg_duration,
        AVG(trip_speed_mph) as avg_speed,
        PERCENTILE_APPROX(fare_per_mile, 0.5) as median_fare_per_mile
    FROM trips_enriched
    GROUP BY distance_band, trip_type
    ORDER BY trip_type, 
             CASE distance_band
                 WHEN '0-1 miles' THEN 1
                 WHEN '1-2 miles' THEN 2
                 WHEN '2-5 miles' THEN 3
                 WHEN '5-10 miles' THEN 4
                 WHEN '10-20 miles' THEN 5
                 ELSE 6
             END
""")

print("\n📏 Fare Analysis by Distance Bands:")
distance_analysis.show(truncate=False)

# Tip analysis by time period
tip_by_time = spark.sql("""
    SELECT 
        time_period,
        trip_type,
        COUNT(*) as trip_count,
        AVG(tip_amount) as avg_tip_amount,
        AVG(tip_percentage) as avg_tip_pct,
        PERCENTILE_APPROX(tip_percentage, 0.5) as median_tip_pct,
        PERCENTILE_APPROX(tip_percentage, 0.95) as p95_tip_pct
    FROM trips_enriched
    WHERE payment_method = 'Credit Card'  -- TLC data records credit card tips only; cash tips are not captured
    GROUP BY time_period, trip_type
    ORDER BY avg_tip_pct DESC
""")

print("\n💰 Tip Analysis by Time Period (Credit Card Only):")
tip_by_time.show(truncate=False)

In [None]:
# Create payment and pricing visualizations
print("📊 Creating payment and pricing visualizations...")

# Convert to Pandas for visualization
payment_pandas = payment_stats.toPandas()
distance_pandas = distance_analysis.toPandas()
tip_pandas = tip_by_time.toPandas()

# Create visualizations
fig, axes = plt.subplots(2, 3, figsize=(18, 10))

# Payment method distribution (overall)
payment_dist = payment_pandas.groupby('payment_method')['trip_count'].sum().sort_values(ascending=False)
axes[0, 0].pie(payment_dist.values, labels=payment_dist.index, autopct='%1.1f%%', startangle=90)
axes[0, 0].set_title('Payment Method Distribution')

# Average tip by payment method (TLC data does not record cash tips, so cash shows ~0%)
tip_by_payment = payment_pandas.groupby('payment_method')['avg_tip_pct'].mean().sort_values(ascending=False)
tip_by_payment.plot(kind='bar', ax=axes[0, 1], color='gold')
axes[0, 1].set_xlabel('Payment Method')
axes[0, 1].set_ylabel('Average Tip Percentage (%)')
axes[0, 1].set_title('Average Tip % by Payment Method')
axes[0, 1].tick_params(axis='x', rotation=45)

# Fare per mile by distance band
distance_viz = distance_pandas[distance_pandas['trip_type'] == 'yellow']
axes[0, 2].bar(distance_viz['distance_band'], distance_viz['avg_fare_per_mile'], color='steelblue')
axes[0, 2].set_xlabel('Distance Band')
axes[0, 2].set_ylabel('Average Fare per Mile ($)')
axes[0, 2].set_title('Fare per Mile by Distance (Yellow Taxis)')
axes[0, 2].tick_params(axis='x', rotation=45)

# Trip speed by distance band
for trip_type in distance_pandas['trip_type'].unique():
    data = distance_pandas[distance_pandas['trip_type'] == trip_type]
    axes[1, 0].plot(data['distance_band'], data['avg_speed'], 
                   marker='o', linewidth=2, label=f'{trip_type.title()} Taxi')

axes[1, 0].set_xlabel('Distance Band')
axes[1, 0].set_ylabel('Average Speed (mph)')
axes[1, 0].set_title('Average Speed by Distance Band')
axes[1, 0].tick_params(axis='x', rotation=45)
axes[1, 0].legend()
axes[1, 0].grid(True, alpha=0.3)

# Tips by time period
tip_viz = tip_pandas[tip_pandas['trip_type'] == 'yellow']
axes[1, 1].bar(tip_viz['time_period'], tip_viz['avg_tip_pct'], color='green')
axes[1, 1].set_xlabel('Time Period')
axes[1, 1].set_ylabel('Average Tip Percentage (%)')
axes[1, 1].set_title('Tips by Time Period (Yellow Taxis)')
axes[1, 1].tick_params(axis='x', rotation=45)

# Revenue by trip type and payment method
revenue_by_payment = payment_pandas.pivot_table(
    values='total_revenue', 
    index='payment_method', 
    columns='trip_type', 
    aggfunc='sum',
    fill_value=0
)
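# Map colors by column name: pivot_table() sorts columns alphabetically (green, yellow)
revenue_by_payment.plot(kind='bar', ax=axes[1, 2],
                        color=[{'yellow': 'gold', 'green': 'green'}.get(t, 'gray') for t in revenue_by_payment.columns])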
axes[1, 2].set_xlabel('Payment Method')
axes[1, 2].set_ylabel('Total Revenue ($)')
axes[1, 2].set_title('Revenue by Payment Method and Trip Type')
axes[1, 2].tick_params(axis='x', rotation=45)
axes[1, 2].legend(title='Taxi Type')

plt.tight_layout()
plt.show()

print("✅ Payment and pricing analysis visualizations complete")

## 6. Advanced Analytics with Spark SQL
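The monthly-trend query computes growth from `LAG(total_trips)` over a window partitioned by `trip_type` and ordered by year and month. As a result, the first month of each taxi type has no previous value. Below is a plain-Python sketch of the same calculation; the function name is hypothetical. Run it in a fresh interpreter, because the notebook's `pyspark.sql.functions` star import shadows the built-in `round`:

```python
def month_over_month(rows):
    """rows: (year, month, trip_type, total_trips) tuples, one per month and type.

    Mirrors LAG(total_trips) OVER (PARTITION BY trip_type ORDER BY year, month).
    Returns (year, month, trip_type, trips, prev_month_trips, growth_pct) tuples.
    """
    results = []
    previous = {}  # trip_type -> total_trips of the previous row in that partition
    for year, month, trip_type, trips in sorted(rows, key=lambda r: (r[2], r[0], r[1])):
        prev = previous.get(trip_type)  # None for the first month, like LAG's NULL
        # Also None when prev == 0; Spark's behaviour there depends on ANSI mode.
        growth = round((trips - prev) * 100.0 / prev, 2) if prev else None
        results.append((year, month, trip_type, trips, prev, growth))
        previous[trip_type] = trips
    return results


rows = [(2024, 1, "yellow", 110), (2023, 12, "yellow", 100), (2024, 1, "green", 50)]
for row in month_over_month(rows):
    print(row)
```

Ordering by `(year, month)` rather than by month alone keeps December followed by January in sequence across a year boundary.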

In [None]:
# Advanced analytics using window functions and complex aggregations
print("🔬 Advanced Analytics with Spark SQL")
print("=" * 40)

# Monthly trends with growth calculation
monthly_trends = spark.sql("""
    WITH monthly_stats AS (
        SELECT 
            pickup_year,
            pickup_month,
            trip_type,
            COUNT(*) as total_trips,
            SUM(total_amount) as total_revenue,
            AVG(fare_amount) as avg_fare,
            AVG(trip_distance) as avg_distance
        FROM trips_enriched
        GROUP BY pickup_year, pickup_month, trip_type
    )
    SELECT 
        pickup_year,
        pickup_month,
        trip_type,
        total_trips,
        total_revenue,
        avg_fare,
        avg_distance,
        LAG(total_trips) OVER (
            PARTITION BY trip_type 
            ORDER BY pickup_year, pickup_month
        ) as prev_month_trips,
        ROUND(
            (total_trips - LAG(total_trips) OVER (
                PARTITION BY trip_type 
                ORDER BY pickup_year, pickup_month
            )) * 100.0 / LAG(total_trips) OVER (
                PARTITION BY trip_type 
                ORDER BY pickup_year, pickup_month
            ), 2
        ) as month_over_month_growth_pct
    FROM monthly_stats
    ORDER BY pickup_year, pickup_month, trip_type
""")

print("📈 Monthly Trends with Growth:")
monthly_trends.show(truncate=False)

# Percentile analysis for fare distribution
fare_percentiles = spark.sql("""
    SELECT 
        trip_type,
        COUNT(*) as total_trips,
        ROUND(PERCENTILE_APPROX(fare_amount, 0.1), 2) as p10_fare,
        ROUND(PERCENTILE_APPROX(fare_amount, 0.25), 2) as p25_fare,
        ROUND(PERCENTILE_APPROX(fare_amount, 0.5), 2) as median_fare,
        ROUND(PERCENTILE_APPROX(fare_amount, 0.75), 2) as p75_fare,
        ROUND(PERCENTILE_APPROX(fare_amount, 0.9), 2) as p90_fare,
        ROUND(PERCENTILE_APPROX(fare_amount, 0.95), 2) as p95_fare,
        ROUND(PERCENTILE_APPROX(fare_amount, 0.99), 2) as p99_fare,
        ROUND(AVG(fare_amount), 2) as mean_fare,
        ROUND(STDDEV(fare_amount), 2) as stddev_fare
    FROM trips_enriched
    GROUP BY trip_type
    ORDER BY trip_type
""")

print("\n📊 Fare Distribution Percentiles:")
fare_percentiles.show(truncate=False)

# Rush hour vs non-rush hour comparison
rush_hour_analysis = spark.sql("""
    SELECT 
        CASE 
            WHEN pickup_hour IN (7, 8, 9, 17, 18, 19) AND NOT is_weekend THEN 'Rush Hour'
            ELSE 'Non-Rush Hour'
        END as period_type,
        trip_type,
        COUNT(*) as trip_count,
        AVG(fare_amount) as avg_fare,
        AVG(trip_distance) as avg_distance,
        AVG(trip_duration_minutes) as avg_duration,
        AVG(trip_speed_mph) as avg_speed,
        AVG(tip_percentage) as avg_tip_pct,
        ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (PARTITION BY trip_type), 2) as pct_of_total
    FROM trips_enriched
    GROUP BY period_type, trip_type
    ORDER BY trip_type, period_type
""")

print("\n🚦 Rush Hour vs Non-Rush Hour Analysis:")
rush_hour_analysis.show(truncate=False)

In [None]:
# Performance optimization analysis
print("⚡ Performance and Efficiency Analysis")
print("=" * 40)

# Speed analysis by borough and hour
speed_analysis = spark.sql("""
    SELECT 
        pickup_hour,
        pickup_borough,
        trip_type,
        COUNT(*) as trip_count,
        ROUND(AVG(trip_speed_mph), 2) as avg_speed,
        ROUND(PERCENTILE_APPROX(trip_speed_mph, 0.5), 2) as median_speed,
        ROUND(PERCENTILE_APPROX(trip_speed_mph, 0.25), 2) as q1_speed,
        ROUND(PERCENTILE_APPROX(trip_speed_mph, 0.75), 2) as q3_speed,
        ROUND(AVG(trip_duration_minutes), 2) as avg_duration
    FROM trips_with_zones
    WHERE pickup_borough IS NOT NULL 
      AND trip_speed_mph BETWEEN 1 AND 60
    GROUP BY pickup_hour, pickup_borough, trip_type
    HAVING COUNT(*) >= 100
    ORDER BY pickup_borough, pickup_hour, trip_type
""")

print("🏃 Speed Analysis by Borough and Hour:")
speed_analysis.filter(col("pickup_borough") == "Manhattan").show(20, truncate=False)

# Efficiency ranking by zone
zone_efficiency = spark.sql("""
    WITH zone_metrics AS (
        SELECT 
            pickup_zone,
            pickup_borough,
            COUNT(*) as trip_count,
            AVG(trip_speed_mph) as avg_speed,
            AVG(fare_per_mile) as avg_fare_per_mile,
            AVG(tip_percentage) as avg_tip_pct,
            SUM(total_amount) as total_revenue
        FROM trips_with_zones
        WHERE pickup_zone IS NOT NULL
          AND trip_speed_mph BETWEEN 1 AND 60
        GROUP BY pickup_zone, pickup_borough
        HAVING COUNT(*) >= 500
    )
    SELECT 
        pickup_zone,
        pickup_borough,
        trip_count,
        ROUND(avg_speed, 2) as avg_speed_mph,
        ROUND(avg_fare_per_mile, 2) as avg_fare_per_mile,
        ROUND(avg_tip_pct, 2) as avg_tip_pct,
        ROUND(total_revenue, 2) as total_revenue,
        ROW_NUMBER() OVER (ORDER BY avg_speed DESC) as speed_rank,
        ROW_NUMBER() OVER (ORDER BY avg_fare_per_mile DESC) as fare_rank,
        ROW_NUMBER() OVER (ORDER BY total_revenue DESC) as revenue_rank
    FROM zone_metrics
    ORDER BY avg_speed DESC
    LIMIT 20
""")

print("\n🏆 Top 20 Zones by Average Speed (min. 500 trips):")
zone_efficiency.show(truncate=False)
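Each `ROW_NUMBER() OVER (ORDER BY …)` produces its own 1..n ranking over the same rows. One zone can therefore rank 1st by speed and much lower by revenue. `ROW_NUMBER` also gives tied rows different, arbitrarily ordered numbers, whereas `RANK` gives them the same number. The sketch below implements both behaviours in plain Python. The zone names and values are hypothetical.

```python
def row_number(rows, key):
    """Like ROW_NUMBER() OVER (ORDER BY key DESC): consecutive ranks;
    ties keep input order here (Spark makes no such guarantee)."""
    ordered = sorted(rows, key=lambda r: r[key], reverse=True)  # stable sort
    return {r["zone"]: i for i, r in enumerate(ordered, start=1)}


def rank(rows, key):
    """Like RANK() OVER (ORDER BY key DESC): tied values share a rank."""
    ordered = sorted(rows, key=lambda r: r[key], reverse=True)
    ranks = {}
    for i, r in enumerate(ordered, start=1):
        prev = ordered[i - 2] if i > 1 else None
        if prev is not None and r[key] == prev[key]:
            ranks[r["zone"]] = ranks[prev["zone"]]  # tie: reuse previous rank
        else:
            ranks[r["zone"]] = i
    return ranks


# Hypothetical zone metrics
zones = [
    {"zone": "Zone A", "avg_speed": 14.2, "total_revenue": 5000.0},
    {"zone": "Zone B", "avg_speed": 18.9, "total_revenue": 1200.0},
    {"zone": "Zone C", "avg_speed": 14.2, "total_revenue": 9800.0},
]
print(row_number(zones, "avg_speed"), rank(zones, "avg_speed"))
print(row_number(zones, "total_revenue"))
```

If tied zones should share a position in the leaderboard, you could swap `ROW_NUMBER()` for `RANK()` or `DENSE_RANK()` in the SQL.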

## 7. Business Intelligence Summary Dashboard

In [None]:
# Generate comprehensive business intelligence summary
print("📈 Business Intelligence Dashboard")
print("=" * 60)

# Executive summary metrics
exec_summary = spark.sql("""
    SELECT 
        'Overall Performance' as metric_category,
        COUNT(*) as total_trips,
        ROUND(SUM(total_amount), 2) as total_revenue,
        ROUND(AVG(fare_amount), 2) as avg_fare,
        ROUND(AVG(trip_distance), 2) as avg_distance,
        ROUND(AVG(tip_percentage), 2) as avg_tip_pct,
        COUNT(DISTINCT pickup_location_id) as unique_pickup_zones,
        COUNT(DISTINCT dropoff_location_id) as unique_dropoff_zones,
        ROUND(AVG(trip_speed_mph), 2) as avg_speed_mph
    FROM nw_taxi.trips
""")

print("📊 EXECUTIVE SUMMARY:")
exec_summary.show(truncate=False)

# Market share analysis
market_share = spark.sql("""
    SELECT 
        trip_type,
        COUNT(*) as trip_count,
        ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (), 2) as trip_market_share_pct,
        ROUND(SUM(total_amount), 2) as total_revenue,
        ROUND(SUM(total_amount) * 100.0 / SUM(SUM(total_amount)) OVER (), 2) as revenue_market_share_pct,
        ROUND(AVG(fare_amount), 2) as avg_fare,
        ROUND(AVG(trip_distance), 2) as avg_distance,
        ROUND(AVG(tip_percentage), 2) as avg_tip_pct
    FROM nw_taxi.trips
    GROUP BY trip_type
    ORDER BY trip_count DESC
""")

print("\n🚖 MARKET SHARE ANALYSIS:")
market_share.show(truncate=False)
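`SUM(COUNT(*)) OVER ()` and `SUM(SUM(total_amount)) OVER ()` apply a window to per-group aggregates. The inner aggregate runs once per `trip_type`. The empty `OVER ()` then sums those results across all groups, which yields the grand total used as the denominator. A plain-Python sketch with hypothetical per-type totals:

```python
def market_shares(group_totals):
    """Each group's percentage of the grand total, mirroring
    ROUND(x * 100.0 / SUM(x) OVER (), 2) applied after GROUP BY trip_type."""
    grand_total = sum(group_totals.values())  # the empty OVER () window
    return {k: round(v * 100.0 / grand_total, 2) for k, v in group_totals.items()}


# Hypothetical per-type totals, for illustration only
trip_counts = {"yellow": 870, "green": 130}
revenue = {"yellow": 16000.0, "green": 4000.0}

print(market_shares(trip_counts))
print(market_shares(revenue))
```

Rounding each share independently means the shares need not add up to exactly 100. For example, three equal groups each round to 33.33.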

# Peak performance indicators
peak_indicators = spark.sql("""
    WITH hourly_performance AS (
        SELECT 
            pickup_hour,
            COUNT(*) as trips,
            SUM(total_amount) as revenue
        FROM nw_taxi.trips
        GROUP BY pickup_hour
    ),
    daily_performance AS (
        SELECT 
            day_name,
            COUNT(*) as trips,
            SUM(total_amount) as revenue
        FROM nw_taxi.trips
        GROUP BY day_name
    )
    SELECT 
        'Peak Performance' as category,
        (SELECT pickup_hour FROM hourly_performance ORDER BY trips DESC LIMIT 1) as peak_hour,
        (SELECT MAX(trips) FROM hourly_performance) as peak_hour_trips,
        (SELECT day_name FROM daily_performance ORDER BY trips DESC LIMIT 1) as peak_day,
        (SELECT MAX(trips) FROM daily_performance) as peak_day_trips
""")

print("\n🕐 PEAK PERFORMANCE INDICATORS:")
peak_indicators.show(truncate=False)
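Each scalar subquery picks the top row by trip count. If two hours (or days) tie, `ORDER BY trips DESC LIMIT 1` may return either one, so the reported peak can differ between runs. In SQL, adding a secondary sort key such as `ORDER BY trips DESC, pickup_hour` makes the choice deterministic. The sketch below applies the same idea in plain Python, breaking ties by the smaller key (an arbitrary choice). The hourly counts are hypothetical.

```python
def peak(counts):
    """Return (key, count) with the highest count; ties go to the smallest key,
    like ORDER BY trips DESC, pickup_hour ASC LIMIT 1."""
    if not counts:
        raise ValueError("no data")
    best = min(counts, key=lambda k: (-counts[k], k))
    return best, counts[best]


# Hypothetical trips per pickup hour; 18:00 and 19:00 are tied
hourly_trips = {8: 1200, 18: 1500, 19: 1500, 23: 400}
print(peak(hourly_trips))
```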

# Revenue insights
revenue_insights = spark.sql("""
    WITH payment_revenue AS (
        SELECT 
            payment_method,
            SUM(total_amount) as revenue,
            AVG(tip_percentage) as avg_tip_pct
        FROM nw_taxi.trips
        GROUP BY payment_method
    )
    SELECT 
        payment_method,
        ROUND(revenue, 2) as total_revenue,
        ROUND(revenue * 100.0 / SUM(revenue) OVER (), 2) as revenue_share_pct,
        ROUND(avg_tip_pct, 2) as avg_tip_pct
    FROM payment_revenue
    ORDER BY revenue DESC
""")

print("\n💰 REVENUE BY PAYMENT METHOD:")
revenue_insights.show(truncate=False)

# Performance benchmarks
benchmarks = spark.sql("""
    SELECT 
        'Performance Benchmarks' as category,
        ROUND(PERCENTILE_APPROX(trip_speed_mph, 0.5), 2) as median_speed_mph,
        ROUND(PERCENTILE_APPROX(fare_per_mile, 0.5), 2) as median_fare_per_mile,
        ROUND(PERCENTILE_APPROX(tip_percentage, 0.5), 2) as median_tip_pct,
        ROUND(PERCENTILE_APPROX(trip_duration_minutes, 0.5), 2) as median_duration_min,
        COUNT(DISTINCT CASE WHEN pickup_hour BETWEEN 7 AND 9 OR pickup_hour BETWEEN 17 AND 19 THEN pickup_date END) as rush_hour_days,
        COUNT(DISTINCT pickup_date) as total_days
    FROM nw_taxi.trips
""")

print("\n⚡ PERFORMANCE BENCHMARKS:")
benchmarks.show(truncate=False)

print("\n" + "=" * 60)
print("✅ NYC Taxi Big Data Analysis Complete!")
print("🌐 Spark UI: Check the provided URL for detailed job execution metrics")
print("📊 All analytics performed using distributed Spark processing")

## 8. Cleanup and Session Management

In [None]:
# Optional: Save processed data for future analysis
print("💾 Saving processed data for future use...")

# Save enriched dataset (partitioned by trip_type for optimal access)
try:
    output_path = "../data/processed/nyc_taxi_enriched_spark"
    valid_trips.write.mode("overwrite").partitionBy("trip_type").parquet(output_path)
    print(f"✅ Enriched data saved to: {output_path}")
except Exception as e:
    print(f"⚠️ Could not save data: {e}")
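`partitionBy("trip_type")` writes one Hive-style subdirectory per distinct value, named `trip_type=<value>`. The partition column is encoded in the directory name rather than stored inside the Parquet files. When the data is read back with a filter, for example `spark.read.parquet(output_path).filter(col("trip_type") == "green")`, Spark can skip directories that cannot match. The sketch below models only the resulting layout and that pruning step. The helper names are hypothetical, and Spark's escaping of special characters in partition values is not modeled.

```python
from pathlib import PurePosixPath


def partition_dirs(output_path, column, values):
    """Subdirectories that partitionBy(column) would create under output_path."""
    return [str(PurePosixPath(output_path) / f"{column}={v}") for v in sorted(set(values))]


def prune(dirs, column, wanted):
    """Keep only directories that can satisfy column == wanted (partition pruning)."""
    target = f"{column}={wanted}"
    return [d for d in dirs if PurePosixPath(d).name == target]


dirs = partition_dirs("../data/processed/nyc_taxi_enriched_spark", "trip_type",
                      ["yellow", "green", "yellow"])
print(dirs)
print(prune(dirs, "trip_type", "green"))
```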

# Collect the small summary results once: every collect() launches a Spark job,
# so avoid calling it repeatedly on the same DataFrame
summary_row = exec_summary.collect()[0]
peak_data = peak_indicators.collect()[0]
market_data = market_share.collect()

# Unpersist cached DataFrames to free memory
valid_trips.unpersist()
trips_df.unpersist()
zones_df.unpersist()

print("🧹 Cache cleared")

# Print final statistics
print("\n📊 Final Analysis Statistics:")
print(f"   - Analyzed {summary_row['total_trips']:,} taxi trips")
print(f"   - Total revenue processed: ${summary_row['total_revenue']:,.2f}")
print("   - Used distributed processing across multiple cores")
print("   - Leveraged Spark SQL for complex analytics")

print("\n🎯 Key Insights Discovered:")
print(f"   - Peak demand hour: {peak_data['peak_hour']}:00 ({peak_data['peak_hour_trips']:,} trips)")
print(f"   - Peak demand day: {peak_data['peak_day']} ({peak_data['peak_day_trips']:,} trips)")

for row in market_data:
    print(f"   - {row['trip_type'].title()} taxis: {row['trip_market_share_pct']:.1f}% market share")

# Keep Spark session running for interactive use
print("\n🚀 Spark session is still active for additional analysis")
print("💡 Use spark.stop() when you're completely done")

## Key Advantages of PySpark for NYC Taxi Analysis

### 🚀 **Scalability Benefits**
- **Distributed Processing**: Handle datasets too large for single-machine memory
- **Horizontal Scaling**: Add more cores/machines as data grows
- **Lazy Evaluation**: Transformations are recorded but not executed until an action (such as `show()` or `collect()`) runs, so Spark can optimize the whole plan first

### ⚡ **Performance Optimizations**
- **In-Memory Caching**: Keep frequently accessed data in RAM
- **Columnar Storage**: Parquet's column-oriented layout lets Spark read only the columns a query uses
- **Catalyst Optimizer**: Automatic SQL query optimization
- **Code Generation**: Whole-stage code generation compiles query stages into JVM bytecode at runtime
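To illustrate why a columnar layout helps: a query that averages one field only needs that field's values, not every field of every record. The toy comparison below uses plain Python. The "values read" counter is an illustration of the idea, not a Parquet measurement, and the rows are hypothetical.

```python
# Hypothetical trip records
rows = [
    {"trip_distance": 2.1, "fare_amount": 11.0, "tip_amount": 2.0, "payment_method": "card"},
    {"trip_distance": 5.4, "fare_amount": 20.0, "tip_amount": 4.0, "payment_method": "card"},
    {"trip_distance": 1.2, "fare_amount": 8.0, "tip_amount": 0.0, "payment_method": "cash"},
]


def to_columns(records):
    """Pivot row-oriented records into one list per column."""
    return {name: [r[name] for r in records] for name in records[0]}


def avg_fare_row_store(records):
    values_read = 0
    total = 0.0
    for r in records:
        values_read += len(r)  # a row store reads whole records
        total += r["fare_amount"]
    return total / len(records), values_read


def avg_fare_column_store(columns):
    fares = columns["fare_amount"]  # a column store reads only this column
    return sum(fares) / len(fares), len(fares)


print(avg_fare_row_store(rows))
print(avg_fare_column_store(to_columns(rows)))
```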

### 🔧 **Advanced Analytics Capabilities**
- **Window Functions**: Complex time-series and ranking operations
- **Built-in ML**: MLlib for machine learning at scale
- **Streaming**: Structured Streaming for incremental, near-real-time processing
- **Graph Processing**: GraphX for network analysis (Scala/Java API; from Python, the separate GraphFrames package)

### 📊 **Big Data Integration**
- **Multiple Formats**: Parquet, JSON, CSV, and Delta Lake (via the `delta-spark` package)
- **Database Connectivity**: JDBC connections to data warehouses
- **Storage Integration**: HDFS, plus Amazon S3, Azure Storage, and Google Cloud Storage through Hadoop connectors
- **Kafka Integration**: Streaming ingestion from Kafka topics via Spark's Kafka connector package

### 💼 **Production Ready**
- **Fault Tolerance**: Lost partitions are recomputed from lineage after node failures
- **Resource Management**: Dynamic resource allocation
- **Monitoring**: Rich metrics and debugging tools
- **Security**: Encryption, authentication, and authorization

---

**Next Steps**: 
- Scale to full historical dataset (years of data)
- Implement machine learning models for demand prediction
- Set up streaming analytics for real-time insights
- Deploy on cloud clusters for enterprise-scale analysis