In [0]:
import os
# Setting up the main output directory
output_dir = "/Workspace/Repos/mayowaaloko@gmail.com/fhvhv-trip-data-analysis"
visuals_dir = f"{output_dir}/visualizations"

# Creating subdirectories for different types of analysis
folders = [
    f"{visuals_dir}/01_cleaning",
    f"{visuals_dir}/02_temporal",
    f"{visuals_dir}/03_geographic",
    f"{visuals_dir}/04_equity",
    f"{visuals_dir}/05_environmental",
    f"{visuals_dir}/06_economic",
    f"{visuals_dir}/07_comparison"
]

for folder in folders:
    # The charts are written with plt.savefig, i.e. ordinary local file I/O on /Workspace,
    # so the folders must exist on that filesystem. dbutils.fs resolves scheme-less paths
    # against DBFS, which would create them somewhere savefig never looks.
    # exist_ok=True keeps this cell safe to re-run.
    os.makedirs(folder, exist_ok=True)
    print(f"✓ Created: {folder}")

print("\n Folder structure created successfully!")


In [0]:
# I'm importing all the tools I need for data processing and visualization
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import *
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
import plotly.graph_objects as go
from datetime import datetime
import numpy as np

# Setting up my visualization preferences
# 'seaborn-v0_8-darkgrid' is matplotlib's bundled copy of the seaborn dark-grid style
plt.style.use('seaborn-v0_8-darkgrid')
sns.set_palette("husl")

print("✅ Libraries loaded successfully")
print(f"Analysis started: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

In [0]:
# I'm loading all three months of data from my Google Drive-synced tables
df_jan = spark.table("workspace.google_drive.fhvhv_tripdata_2025_01")
df_feb = spark.table("workspace.google_drive.fhvhv_tripdata_2025_02")
df_mar = spark.table("workspace.google_drive.fhvhv_tripdata_2025_03")

# Combining all three months into one dataset.
# unionByName matches columns by NAME; plain union() matches by POSITION and would silently
# mix values up if any monthly table listed its columns in a different order. It raises if
# the months do not share the same column names, which beats a silent misalignment.
df_raw = df_jan.unionByName(df_feb).unionByName(df_mar)

# Counting once: every count() re-reads all three tables. This total is also kept
# so I can track how much data I remove during cleaning.
original_count = df_raw.count()

# Let me see what I'm working with
print("✅ Data loaded successfully")
print(f"Total records: {original_count:,}")
print(f"Total columns: {len(df_raw.columns)}")

In [0]:
# Inspecting column names and Spark types before any cleaning happens
schema_heading = "Column names and types:"
print(schema_heading)
df_raw.printSchema()

In [0]:
# Looking at a few sample records to understand the data.
# One constant drives both the label and the limit so they cannot disagree
# (the label previously said 5 while 20 rows were displayed).
N_PREVIEW_ROWS = 5
print(f"\nFirst {N_PREVIEW_ROWS} rows:")
display(df_raw.limit(N_PREVIEW_ROWS))

In [0]:
# Getting a summary of what I'm dealing with.
# df_raw has not changed since it was loaded, so the row count saved then (original_count)
# is reused instead of launching another full scan of all three tables.
print("Dataset dimensions:")
print(f"  Rows: {original_count:,}")
print(f"  Columns: {len(df_raw.columns)}")

# Showing all column names in a readable format
print(f"\nAll columns ({len(df_raw.columns)} total):")
for position, column_name in enumerate(df_raw.columns, 1):
    print(f"  {position:2d}. {column_name}")

In [0]:
# I need to know which columns have missing data and how much
# This helps me decide what to drop vs what to keep

# Counting nulls for every column in ONE aggregation. A filter + count() per column would
# launch a separate full scan of the three monthly tables for each column.
# F.count ignores nulls, so counting F.when(isNull, 1) counts exactly the null rows.
null_count_row = df_raw.agg(*[
    F.count(F.when(F.col(c).isNull(), 1)).alias(c) for c in df_raw.columns
]).first()

missing_data = []

for c in df_raw.columns:
    null_count = null_count_row[c]
    if null_count > 0:
        null_pct = (null_count / original_count) * 100 if original_count else 0.0
        missing_data.append({
            'column': c,
            'missing_count': null_count,
            'missing_pct': round(null_pct, 2)
        })

# Converting to pandas for easier visualization.
# Explicit columns keep the frame well-formed when no column has nulls; otherwise
# pd.DataFrame([]) has no 'missing_pct' column and sort_values raises KeyError.
missing_df = pd.DataFrame(
    missing_data, columns=['column', 'missing_count', 'missing_pct']
).sort_values('missing_pct', ascending=False)

print("Columns with missing values:")
print(missing_df.to_string(index=False))

In [0]:
# Creating a chart to see which columns have the most missing data

if len(missing_df) > 0:
    fig, ax = plt.subplots(figsize=(12, 8))
    
    # Only showing top 20 to keep it readable
    top_missing = missing_df.head(20)
    
    sns.barplot(data=top_missing, y='column', x='missing_pct', ax=ax, palette='Reds_r')
    ax.set_xlabel('Missing Percentage (%)', fontsize=12)
    ax.set_ylabel('Column Name', fontsize=12)
    ax.set_title('Top 20 Columns with Missing Data', fontsize=14, fontweight='bold')
    
    # Label offset and axis width scale with the largest bar. A fixed +0.5 offset put labels
    # outside the axes whenever every column was under ~0.5% missing. The labels use 2 decimals
    # because missing_pct is stored rounded to 2 decimals (1 decimal showed tiny values as 0.0%).
    max_pct = float(top_missing['missing_pct'].max())
    x_upper = max(max_pct * 1.15, 0.01)
    label_offset = x_upper * 0.01
    for i, v in enumerate(top_missing['missing_pct']):
        ax.text(v + label_offset, i, f'{v:.2f}%', va='center', fontsize=10)
    ax.set_xlim(0, x_upper)
    
    plt.tight_layout()
    plt.show()
else:
    print("✅ No missing data found!")

In [0]:
# Spark DataFrames are immutable, so this is not a copy: `df` is a second name for the same
# lazy plan. Every cleaning step below returns a NEW DataFrame and rebinds `df`, which leaves
# df_raw itself untouched.
df = df_raw

print("✅ Created working copy of data")
print("Starting data cleaning process...")

In [0]:
# Making sure all datetime columns are properly formatted
# These are critical for time-based analysis

datetime_cols = ['request_datetime', 'on_scene_datetime', 'pickup_datetime', 'dropoff_datetime']
present_datetime_cols = [c for c in datetime_cols if c in df.columns]

print("Cleaning datetime columns...")

for c in present_datetime_cols:
    # Converting to proper timestamp format
    df = df.withColumn(c, F.to_timestamp(F.col(c)))
    print(f"  ✓ {c} converted to timestamp")

# Checking for null values after conversion. All columns are checked in ONE Spark job
# instead of one full scan per column.
# NOTE: these counts also include values that were already null in the source, not only
# values that failed to parse, so "invalid" can overstate parse failures.
print("\nChecking for invalid dates:")
if present_datetime_cols:
    datetime_nulls = df.agg(*[
        F.count(F.when(F.col(c).isNull(), 1)).alias(c) for c in present_datetime_cols
    ]).first()
    for c in present_datetime_cols:
        null_count = datetime_nulls[c]
        if null_count > 0:
            print(f"  ⚠️  {c}: {null_count:,} invalid dates")
        else:
            print(f"  ✓ {c}: all valid")

In [0]:
# Removing records where dates don't make logical sense

print("Removing records with invalid date logic...")

# Issue 1: Pickup should be at or after request
pickup_after_request = F.col('pickup_datetime') >= F.col('request_datetime')
# Issue 2: Dropoff should be strictly after pickup
dropoff_after_pickup = F.col('dropoff_datetime') > F.col('pickup_datetime')
# Issue 3: All pickups should be in Q1 2025
pickup_in_q1 = (
    (F.col('pickup_datetime') >= '2025-01-01') &
    (F.col('pickup_datetime') < '2025-04-01')
)

# One pass computes how many rows survive each CUMULATIVE filter. Calling count() before and
# after every step re-executes the whole uncached lineage from the source tables each time.
# F.count(F.when(cond, 1)) counts rows where cond is TRUE, matching what filter(cond) keeps.
# Rows with a null timestamp fail the comparison and are removed too, like in filter().
date_stats = df.agg(
    F.count(F.lit(1)).alias('total'),
    F.count(F.when(pickup_after_request, 1)).alias('kept_1'),
    F.count(F.when(pickup_after_request & dropoff_after_pickup, 1)).alias('kept_2'),
    F.count(F.when(pickup_after_request & dropoff_after_pickup & pickup_in_q1, 1)).alias('kept_3'),
).first()

df = df.filter(pickup_after_request).filter(dropoff_after_pickup).filter(pickup_in_q1)

print(f"  Removed {date_stats['total'] - date_stats['kept_1']:,} records where pickup was before request")
print(f"  Removed {date_stats['kept_1'] - date_stats['kept_2']:,} records where dropoff was before/equal to pickup")
print(f"  Removed {date_stats['kept_2'] - date_stats['kept_3']:,} records outside Q1 2025")

print(f"\n✅ Records remaining: {date_stats['kept_3']:,}")

In [0]:
# Normalizing HVFHS license codes, then translating them into company names

print("Cleaning company license numbers...")

# Trim surrounding whitespace and force upper case so codes compare reliably
license_col = F.trim(F.col('hvfhs_license_num'))
df = df.withColumn('hvfhs_license_num', F.upper(license_col))

# Showing the distribution
print("\nCompany distribution:")
df.groupBy('hvfhs_license_num').count().orderBy(F.desc('count')).show()

# Official NYC TLC codes; anything not listed here falls through to 'Other'
license_to_company = {'HV0003': 'Uber', 'HV0005': 'Lyft'}

company_expr = None
for license_code, company in license_to_company.items():
    is_code = F.col('hvfhs_license_num') == license_code
    if company_expr is None:
        company_expr = F.when(is_code, company)
    else:
        company_expr = company_expr.when(is_code, company)

df = df.withColumn('company_name', company_expr.otherwise('Other'))

print("\nCompany names assigned:")
df.groupBy('company_name').count().orderBy(F.desc('count')).show()

In [0]:
# These are ingestion bookkeeping columns, not part of the NYC TLC trip records
# (the `_fivetran_synced` name points to the Fivetran connector that synced the tables).
# I'm dropping them because they don't help with the analysis.

print("Removing Databricks metadata columns...")

columns_to_drop = ['_line', '_fivetran_synced']
present_to_drop = [c for c in columns_to_drop if c in df.columns]

# A single drop for all present columns; skipped when there is nothing to drop
if present_to_drop:
    df = df.drop(*present_to_drop)
for c in present_to_drop:
    print(f"  ✓ Dropped {c}")

print(f"\nColumns remaining: {len(df.columns)}")
print(f"Records remaining: {df.count():,}")

In [0]:
# NYC has 263 official taxi zones (IDs 1-263)
# Any ID outside this range is treated as invalid.
# NOTE(review): presumably IDs above 263 are the zone lookup's "unknown / outside NYC"
# placeholders rather than corrupt values. TODO confirm against the TLC zone lookup table.

print("Validating location IDs...")

MIN_ZONE_ID, MAX_ZONE_ID = 1, 263

pu_col = F.col('pulocation_id')
do_col = F.col('dolocation_id')
valid_locations = pu_col.between(MIN_ZONE_ID, MAX_ZONE_ID) & do_col.between(MIN_ZONE_ID, MAX_ZONE_ID)

# Invalid pickup/dropoff counts and the surviving-row count in ONE pass,
# instead of five separate count() actions over the full lineage
location_stats = df.agg(
    F.count(F.lit(1)).alias('total'),
    F.count(F.when((pu_col < MIN_ZONE_ID) | (pu_col > MAX_ZONE_ID) | pu_col.isNull(), 1)).alias('invalid_pickup'),
    F.count(F.when((do_col < MIN_ZONE_ID) | (do_col > MAX_ZONE_ID) | do_col.isNull(), 1)).alias('invalid_dropoff'),
    F.count(F.when(valid_locations, 1)).alias('kept'),
).first()

print(f"  Invalid pickup locations: {location_stats['invalid_pickup']:,}")
print(f"  Invalid dropoff locations: {location_stats['invalid_dropoff']:,}")

# Removing records with invalid locations
df = df.filter(valid_locations)
removed = location_stats['total'] - location_stats['kept']

print(f"  Removed {removed:,} records with invalid locations")
print(f"\n✅ Records remaining: {location_stats['kept']:,}")

In [0]:
# Ranking the busiest pickup and dropoff zones before any further cleaning

# Top 20 zones per direction, pulled to pandas for plotting (pickups first, then dropoffs)
top_pickups, top_dropoffs = (
    df.groupBy(zone_col).count().orderBy(F.desc('count')).limit(20).toPandas()
    for zone_col in ('pulocation_id', 'dolocation_id')
)

# Side-by-side horizontal bar charts
fig, axes = plt.subplots(1, 2, figsize=(16, 6))

# (axis, data, id column, bar colour, x label, title)
zone_panels = (
    (axes[0], top_pickups, 'pulocation_id', 'steelblue', 'Number of Pickups', 'Top 20 Pickup Locations'),
    (axes[1], top_dropoffs, 'dolocation_id', 'coral', 'Number of Dropoffs', 'Top 20 Dropoff Locations'),
)

for panel_ax, zones, id_col, bar_colour, x_label, panel_title in zone_panels:
    positions = range(len(zones))
    panel_ax.barh(positions, zones['count'], color=bar_colour)
    panel_ax.set_yticks(positions)
    panel_ax.set_yticklabels([f"Zone {int(zone_id)}" for zone_id in zones[id_col]])
    panel_ax.set_xlabel(x_label)
    panel_ax.set_title(panel_title, fontsize=14, fontweight='bold')
    # Busiest zone at the top
    panel_ax.invert_yaxis()
    panel_ax.grid(True, alpha=0.3, axis='x')

plt.tight_layout()
plt.show()

In [0]:
# Removing trips with impossible distances

print("Cleaning trip distances...")

MAX_TRIP_MILES = 100  # trips longer than this are treated as likely errors

# Checking current distribution
print("\nTrip distance statistics BEFORE cleaning:")
df.select('trip_miles').summary('count', 'mean', 'min', 'max', '50%', '75%', '95%').show()

miles = F.col('trip_miles')
keep_distance = (miles > 0) & (miles <= MAX_TRIP_MILES)

# Every diagnostic count plus the surviving-row count in ONE pass over the data.
# Null distances are reported too: the filter below removes them silently.
distance_stats = df.agg(
    F.count(F.lit(1)).alias('total'),
    F.count(F.when(miles < 0, 1)).alias('negative'),
    F.count(F.when(miles == 0, 1)).alias('zero'),
    F.count(F.when(miles > MAX_TRIP_MILES, 1)).alias('extreme'),
    F.count(F.when(miles.isNull(), 1)).alias('missing'),
    F.count(F.when(keep_distance, 1)).alias('kept'),
).first()

# Issue 1: Negative distances (impossible)
print(f"  Negative distances: {distance_stats['negative']:,}")
# Issue 2: Zero distances (likely errors or cancellations)
print(f"  Zero distances: {distance_stats['zero']:,}")
# Issue 3: Extremely long trips (likely errors)
print(f"  Trips over {MAX_TRIP_MILES} miles: {distance_stats['extreme']:,}")
print(f"  Missing distances: {distance_stats['missing']:,}")

# Removing invalid distances
df = df.filter(keep_distance)
removed = distance_stats['total'] - distance_stats['kept']

print(f"\n  Removed {removed:,} records with invalid distances")

print("\nTrip distance statistics AFTER cleaning:")
df.select('trip_miles').summary('count', 'mean', 'min', 'max', '50%', '75%', '95%').show()

print(f"✅ Records remaining: {distance_stats['kept']:,}")

In [0]:
# Removing trips with impossible durations

print("Cleaning trip times...")

MAX_TRIP_SECONDS = 6 * 60 * 60  # 6 hours = 21600 seconds; longer trips are likely errors

# Checking current distribution
print("\nTrip time statistics BEFORE cleaning:")
df.select('trip_time').summary('count', 'mean', 'min', 'max', '50%', '75%', '95%').show()

trip_time = F.col('trip_time')
keep_time = (trip_time > 0) & (trip_time <= MAX_TRIP_SECONDS)

# Diagnostic counts and surviving-row count in ONE pass
# instead of four count() actions over the full lineage
time_stats = df.agg(
    F.count(F.lit(1)).alias('total'),
    F.count(F.when((trip_time <= 0) | trip_time.isNull(), 1)).alias('invalid'),
    F.count(F.when(trip_time > MAX_TRIP_SECONDS, 1)).alias('extreme'),
    F.count(F.when(keep_time, 1)).alias('kept'),
).first()

# Issue 1: Negative, zero or missing times (impossible)
print(f"  Zero or negative trip times: {time_stats['invalid']:,}")
# Issue 2: Extremely long trips
print(f"  Trips over 6 hours: {time_stats['extreme']:,}")

# Removing invalid trip times
df = df.filter(keep_time)
removed = time_stats['total'] - time_stats['kept']

print(f"\n  Removed {removed:,} records with invalid trip times")

print("\nTrip time statistics AFTER cleaning:")
df.select('trip_time').summary('count', 'mean', 'min', 'max', '50%', '75%', '95%').show()

print(f"✅ Records remaining: {time_stats['kept']:,}")

In [0]:
# Cleaning all monetary fields and checking for logical consistency

print("Cleaning fare components...")

fare_columns = ['base_passenger_fare', 'tolls', 'bcf', 'congestion_surcharge', 
                'sales_tax', 'tips', 'driver_pay', 'airport_fee', 'cbd_congestion_fee']
present_fare_columns = [c for c in fare_columns if c in df.columns]

has_base_fare = F.col('base_passenger_fare') > 0
has_driver_pay = F.col('driver_pay') > 0

# ONE pass gathers the per-column negative/null counts and the rows surviving each filter.
# Previously two scans per column plus before/after counts around every filter.
# The null-filling between the two filters never touches driver_pay, so the cumulative
# count for "base fare > 0 AND driver pay > 0" can be computed up front.
fare_stats = df.agg(
    F.count(F.lit(1)).alias('total'),
    F.count(F.when(has_base_fare, 1)).alias('kept_base'),
    F.count(F.when(has_base_fare & has_driver_pay, 1)).alias('kept_both'),
    *[F.count(F.when(F.col(c) < 0, 1)).alias(f'{c}__negative') for c in present_fare_columns],
    *[F.count(F.when(F.col(c).isNull(), 1)).alias(f'{c}__null') for c in present_fare_columns],
).first()

# Checking each fare component
# NOTE: negative values in the secondary fee columns are reported here but not removed.
print("\nFare component statistics:")
for c in present_fare_columns:
    negative_count = fare_stats[f'{c}__negative']
    null_count = fare_stats[f'{c}__null']
    if negative_count > 0 or null_count > 0:
        print(f"  {c}: {negative_count:,} negative | {null_count:,} null")

# Issue 1: Base fare must be positive (this is the core charge)
df = df.filter(has_base_fare)
removed = fare_stats['total'] - fare_stats['kept_base']
print(f"\n  Removed {removed:,} records with invalid base fare")

# Issue 2: Tips can be null (means $0 tip) so I'll fill those with 0
df = df.withColumn('tips', F.coalesce(F.col('tips'), F.lit(0)))

# Issue 3: Other fees that are null should also be 0
for c in ['tolls', 'bcf', 'congestion_surcharge', 'sales_tax', 'airport_fee', 'cbd_congestion_fee']:
    if c in df.columns:
        df = df.withColumn(c, F.coalesce(F.col(c), F.lit(0)))

# Issue 4: Driver pay must be positive
df = df.filter(has_driver_pay)
removed = fare_stats['kept_base'] - fare_stats['kept_both']
print(f"  Removed {removed:,} records with invalid driver pay")

print(f"\n✅ Records remaining: {fare_stats['kept_both']:,}")

In [0]:
# Creating distribution plots for key fare metrics

# Getting a ~0.1% sample for visualization (plotting every trip would crash the driver)
sample_df = df.sample(fraction=0.001, seed=42).select(
    'base_passenger_fare', 'tips', 'driver_pay', 'trip_miles'
).toPandas()

fig, axes = plt.subplots(2, 2, figsize=(15, 10))

# (axis, column, colour, x label, title, upper x bound of the visible window)
hist_specs = [
    (axes[0, 0], 'base_passenger_fare', 'steelblue', 'Base Passenger Fare ($)', 'Base Fare Distribution', 100),
    (axes[0, 1], 'tips', 'green', 'Tips ($)', 'Tips Distribution', 20),
    (axes[1, 0], 'driver_pay', 'coral', 'Driver Pay ($)', 'Driver Pay Distribution', 80),
    (axes[1, 1], 'trip_miles', 'purple', 'Trip Distance (miles)', 'Trip Distance Distribution', 30),
]

for ax, column, colour, xlabel, title, x_max in hist_specs:
    # Bin over the visible window only. Without `range`, the 50 bins span the full data range,
    # rare extreme values included, and set_xlim then shows just a few of those wide bins.
    # Values outside the window were off-screen anyway.
    ax.hist(sample_df[column], bins=50, range=(0, x_max), color=colour, edgecolor='black')
    ax.set_xlabel(xlabel)
    ax.set_ylabel('Frequency')
    ax.set_title(title, fontweight='bold')
    ax.set_xlim(0, x_max)
    ax.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

print("✅ Fare distributions look reasonable after cleaning")

In [0]:
# Validating all the yes/no flag columns

print("Cleaning flag fields...")

flag_columns = ['shared_request_flag', 'shared_match_flag', 'access_a_ride_flag', 
                'wav_request_flag', 'wav_match_flag']

for c in flag_columns:
    if c in df.columns:
        print(f"\n{c} distribution:")
        df.groupBy(c).count().orderBy('count', ascending=False).show()
        
        # These flags should only be Y, N, or missing, so I'm standardizing case and whitespace.
        # A null OR blank flag means the flag wasn't triggered, so both become 'N'. Previously a
        # whitespace-only value survived as '' and was neither Y nor N.
        standardized = F.upper(F.trim(F.col(c)))
        df = df.withColumn(
            c,
            F.when(standardized.isNull() | (standardized == ''), F.lit('N')).otherwise(standardized)
        )
        
        print(f"  ✓ Standardized {c}")

In [0]:
# Extracting useful time features from the pickup datetime

print("Creating time-based features...")

df = df.withColumn('pickup_hour', F.hour('pickup_datetime'))
df = df.withColumn('pickup_day_of_week', F.dayofweek('pickup_datetime'))  # 1=Sunday, 7=Saturday
df = df.withColumn('pickup_date', F.to_date('pickup_datetime'))
df = df.withColumn('pickup_month', F.month('pickup_datetime'))

# Creating a weekend flag (Saturday=7, Sunday=1)
df = df.withColumn('is_weekend', 
    F.when(F.col('pickup_day_of_week').isin([1, 7]), 'Y').otherwise('N')
)

# Creating day name for easier reading. List position + 1 equals the dayofweek number above
# (1=Sunday ... 7=Saturday). The generated when-chain has no otherwise, so unmatched values stay null.
DAY_NAMES = ['Sunday', 'Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday']
day_name_expr = F.when(F.col('pickup_day_of_week') == 1, DAY_NAMES[0])
for day_number, day_label in enumerate(DAY_NAMES[1:], start=2):
    day_name_expr = day_name_expr.when(F.col('pickup_day_of_week') == day_number, day_label)
df = df.withColumn('day_name', day_name_expr)

print("✅ Time features created:")
print("  - pickup_hour (0-23)")
print("  - pickup_day_of_week (1-7)")
print("  - pickup_date")
print("  - pickup_month (1-3)")
print("  - is_weekend (Y/N)")
print("  - day_name")

In [0]:
# Wait time is how long passengers waited between requesting and getting picked up
# This is a key equity metric - longer waits indicate poor service

print("Calculating wait time...")

df = df.withColumn('wait_time_seconds', 
    F.unix_timestamp('pickup_datetime') - F.unix_timestamp('request_datetime')
)

# Converting to minutes for easier interpretation
df = df.withColumn('wait_time_minutes', F.col('wait_time_seconds') / 60)

# Checking the distribution
print("\nWait time statistics:")
df.select('wait_time_minutes').summary('count', 'mean', 'min', 'max', '50%', '75%', '95%').show()

# Removing extreme outliers (waits over 2 hours are likely data errors or abandoned requests)
MAX_WAIT_MINUTES = 120
wait_ok = F.col('wait_time_minutes') <= MAX_WAIT_MINUTES

# Outlier count and surviving-row count in ONE pass instead of four count() actions
wait_stats = df.agg(
    F.count(F.lit(1)).alias('total'),
    F.count(F.when(F.col('wait_time_minutes') > MAX_WAIT_MINUTES, 1)).alias('extreme'),
    F.count(F.when(wait_ok, 1)).alias('kept'),
).first()
print(f"\nWaits over 2 hours (likely errors): {wait_stats['extreme']:,}")

df = df.filter(wait_ok)
removed = wait_stats['total'] - wait_stats['kept']

print(f"Removed {removed:,} records with extreme wait times")
print(f"✅ Records remaining: {wait_stats['kept']:,}")

In [0]:
# This metric helps identify pricing inequality across different areas

print("Calculating fare per mile...")

# trip_miles > 0 is guaranteed by the distance filter, so the division is safe
df = df.withColumn('fare_per_mile',
    F.col('base_passenger_fare') / F.col('trip_miles')
)

# Checking the distribution
print("\nFare per mile statistics:")
df.select('fare_per_mile').summary('count', 'mean', 'min', 'max', '50%', '75%', '95%').show()

# Removing extreme outliers above $100 per mile.
# NOTE: this is a judgement call, not a hard impossibility. Very short trips that hit a
# minimum fare can exceed $100/mile, so this threshold may also drop some genuine short trips.
MAX_FARE_PER_MILE = 100
fpm_ok = F.col('fare_per_mile') <= MAX_FARE_PER_MILE

# Outlier count and surviving-row count in ONE pass
fpm_stats = df.agg(
    F.count(F.lit(1)).alias('total'),
    F.count(F.when(F.col('fare_per_mile') > MAX_FARE_PER_MILE, 1)).alias('extreme'),
    F.count(F.when(fpm_ok, 1)).alias('kept'),
).first()
print(f"\nFare per mile over $100: {fpm_stats['extreme']:,}")

df = df.filter(fpm_ok)
removed = fpm_stats['total'] - fpm_stats['kept']

print(f"Removed {removed:,} records with extreme fare per mile")
print(f"Records remaining: {fpm_stats['kept']:,}")

In [0]:
# This shows what percentage of the total fare goes to drivers vs the platform
print("Calculating driver earnings percentage...")

# Total amount the passenger paid: base fare + every fee/surcharge + tips
# (tips are included deliberately for the 2025 raw files). coalesce treats a null component as 0.
df = df.withColumn('passenger_total_paid',
    F.col('base_passenger_fare') + 
    F.coalesce(F.col('tolls'), F.lit(0)) +
    F.coalesce(F.col('bcf'), F.lit(0)) +
    F.coalesce(F.col('congestion_surcharge'), F.lit(0)) +
    F.coalesce(F.col('sales_tax'), F.lit(0)) +
    F.coalesce(F.col('tips'), F.lit(0)) +
    F.coalesce(F.col('airport_fee'), F.lit(0)) +
    F.coalesce(F.col('cbd_congestion_fee'), F.lit(0))
)

# driver_pay ÷ total passenger paid = driver's share in percent, rounded to 2 decimals;
# null when the passenger total is not positive
df = df.withColumn('driver_pct',
    F.round(
        F.when(F.col('passenger_total_paid') > 0,
               (F.col('driver_pay') / F.col('passenger_total_paid') * 100)
        ).otherwise(None),
        2
    )
)


print("\nDriver earnings percentage statistics:")
df.select('driver_pct').summary('count', 'mean', 'min', 'max', '50%', '75%', '95%').display()

# mean() ignores nulls and returns None when there is nothing to average;
# formatting None with :.2f would raise, so that case is reported explicitly
avg_driver_pct = df.select(F.mean('driver_pct')).first()[0]
if avg_driver_pct is None:
    print("\nQ1 2025 average driver take-rate: n/a (no trips with a positive total fare)")
else:
    print(f"\nQ1 2025 average driver take-rate: {avg_driver_pct:.2f}%")


print("Driver percentage calculated")

In [0]:
# Using the EPA per-mile figure for an average passenger car: 0.411 kg CO₂ per mile
# This is for sustainability analysis

print("Calculating CO₂ emissions...")

CO2_PER_MILE = 0.411  # kg of CO₂ per mile (EPA standard)
LBS_PER_KG = 2.20462
# Annual CO₂ of a typical passenger vehicle, in metric tons, used for the vehicle equivalence
VEHICLE_TONS_CO2_PER_YEAR = 4.6

df = df.withColumn('co2_kg', F.round(F.col('trip_miles') * F.lit(CO2_PER_MILE), 3))

# Also converting to pounds for US audience
df = df.withColumn('co2_lbs', F.round(F.col('co2_kg') * LBS_PER_KG, 3))

print("\nCO₂ emissions statistics:")
df.select('co2_kg', 'co2_lbs').summary('count', 'mean', 'min', 'max', '50%', '75%', '95%').display()

# Calculating total Q1 emissions. F.sum returns null over zero rows; that case counts as 0 kg
# so the report below does not fail on None arithmetic.
total_co2_kg = df.agg(F.sum('co2_kg')).first()[0] or 0.0
total_co2_tons = total_co2_kg / 1000

print("\n🌍 TOTAL Q1 2025 EMISSIONS:")
print(f"   {total_co2_kg:,.0f} kg")
print(f"   {total_co2_tons:,.0f} metric tons")
print(f"   Equivalent to {total_co2_tons / VEHICLE_TONS_CO2_PER_YEAR:,.0f} passenger vehicles driven for a year")

print("\n✅ Emissions calculated")

In [0]:
# Checking if trip speeds make sense (helps catch remaining data errors)
print("Calculating average speed...")

# Speed = distance / time
# Trip time is in seconds → convert to hours for mph (trip_time > 0 is ensured by the trip-time filter)
df = df.withColumn('speed_mph',
    F.round(
        F.col('trip_miles') / (F.col('trip_time') / 3600), 
        2
    )
)

print("\nSpeed statistics:")
df.select('speed_mph').summary('count', 'mean', 'min', 'max', '50%', '75%', '95%').display()

# Plausible speed window: under 1 mph is stuck-in-traffic or an error, and over 80 mph in NYC is unrealistic
MIN_SPEED_MPH, MAX_SPEED_MPH = 1, 80
speed = F.col('speed_mph')
speed_ok = (speed >= MIN_SPEED_MPH) & (speed <= MAX_SPEED_MPH)

# Diagnostic counts and surviving-row count in ONE pass instead of four count() actions
speed_stats = df.agg(
    F.count(F.lit(1)).alias('total'),
    F.count(F.when(speed < MIN_SPEED_MPH, 1)).alias('very_slow'),
    F.count(F.when(speed > MAX_SPEED_MPH, 1)).alias('very_fast'),
    F.count(F.when(speed_ok, 1)).alias('kept'),
).first()

print(f"\nTrips under 1 mph (stuck in traffic/error): {speed_stats['very_slow']:,}")
print(f"Trips over 80 mph (likely errors): {speed_stats['very_fast']:,}")

# Removing implausible speeds
df = df.filter(speed_ok)
removed = speed_stats['total'] - speed_stats['kept']

print(f"\nRemoved {removed:,} records with unrealistic speeds")
print(f"Records remaining: {speed_stats['kept']:,}")

In [0]:
# Looking at how demand varies by hour of day
print("Analyzing hourly patterns...")

# Getting hourly trip counts and averages
hourly_data = df.groupBy('pickup_hour').agg(
    F.count('*').alias('trip_count'),
    F.avg('wait_time_minutes').alias('avg_wait'),
    F.avg('fare_per_mile').alias('avg_fare_per_mile'),
    F.avg('driver_pct').alias('avg_driver_take_rate')   # ← how much drivers actually keep by hour
).orderBy('pickup_hour').toPandas()

# Creating a FOUR-panel visualization
fig, axes = plt.subplots(4, 1, figsize=(16, 16))

# Panel 1: Trip volume by hour
axes[0].bar(hourly_data['pickup_hour'], hourly_data['trip_count'], color='steelblue', edgecolor='black')
axes[0].set_xlabel('Hour of Day', fontsize=12)
axes[0].set_ylabel('Number of Trips', fontsize=12)
axes[0].set_title('Hourly Trip Volume - Q1 2025', fontsize=14, fontweight='bold')
axes[0].set_xticks(range(0, 24))
axes[0].grid(True, alpha=0.3, axis='y')

# Panel 2: Average wait time by hour
axes[1].plot(hourly_data['pickup_hour'], hourly_data['avg_wait'], marker='o', color='coral', linewidth=2)
axes[1].set_xlabel('Hour of Day', fontsize=12)
axes[1].set_ylabel('Average Wait Time (minutes)', fontsize=12)
axes[1].set_title('Average Wait Time by Hour', fontsize=14, fontweight='bold')
axes[1].set_xticks(range(0, 24))
axes[1].grid(True, alpha=0.3)

# Panel 3: Average fare per mile by hour
axes[2].plot(hourly_data['pickup_hour'], hourly_data['avg_fare_per_mile'], marker='s', color='green', linewidth=2)
axes[2].set_xlabel('Hour of Day', fontsize=12)
axes[2].set_ylabel('Fare per Mile ($)', fontsize=12)
axes[2].set_title('Pricing by Hour', fontsize=14, fontweight='bold')
axes[2].set_xticks(range(0, 24))
axes[2].grid(True, alpha=0.3)

# Panel 4: Driver take-rate by hour (the money shot!)
axes[3].plot(hourly_data['pickup_hour'], hourly_data['avg_driver_take_rate'], 
             marker='D', color='purple', linewidth=3, markersize=6)
axes[3].set_xlabel('Hour of Day', fontsize=12)
axes[3].set_ylabel('Driver Take Rate (%)', fontsize=12)
axes[3].set_title('How Much Drivers Actually Keep by Hour', fontsize=14, fontweight='bold')
axes[3].set_xticks(range(0, 24))
# Keep the preferred 60–90% window, but widen it when any hourly average falls outside.
# A fixed ylim would silently clip those points off the chart.
take_rates = hourly_data['avg_driver_take_rate'].dropna()
y_low, y_high = 60, 90
if not take_rates.empty:
    y_low = min(y_low, float(take_rates.min()) - 2)
    y_high = max(y_high, float(take_rates.max()) + 2)
axes[3].set_ylim(y_low, y_high)
axes[3].grid(True, alpha=0.3)

plt.tight_layout()
display(fig)

# Reusing visuals_dir instead of repeating the absolute path, and making sure the folder
# exists on the local filesystem that savefig writes to
hourly_path = f"{visuals_dir}/02_temporal/hourly_patterns.png"
os.makedirs(os.path.dirname(hourly_path), exist_ok=True)
fig.savefig(
    hourly_path,
    dpi=300,
    bbox_inches='tight',
    facecolor='white'
)
plt.close(fig)

print("Hourly patterns visualized – including driver earnings percentage!")

In [0]:
# Comparing weekday vs weekend patterns
print("Analyzing day of week patterns...")

# Getting daily statistics, ordered Sunday (1) .. Saturday (7)
daily_data = df.groupBy('day_name', 'pickup_day_of_week').agg(
    F.count('*').alias('trip_count'),
    F.avg('wait_time_minutes').alias('avg_wait'),
    F.avg('base_passenger_fare').alias('avg_fare')
).orderBy('pickup_day_of_week').toPandas()

# Creating visualizations
fig, axes = plt.subplots(1, 3, figsize=(18, 6))

# (axis, metric column, bar colour, y label, title)
daily_panels = [
    (axes[0], 'trip_count', 'purple', 'Number of Trips', 'Trip Volume by Day'),
    (axes[1], 'avg_wait', 'orange', 'Average Wait Time (minutes)', 'Wait Times by Day'),
    (axes[2], 'avg_fare', 'teal', 'Average Fare ($)', 'Fares by Day'),
]

for ax, metric, colour, ylabel, title in daily_panels:
    ax.bar(daily_data['day_name'], daily_data[metric], color=colour, edgecolor='black')
    ax.set_xlabel('Day of Week', fontsize=12)
    ax.set_ylabel(ylabel, fontsize=12)
    ax.set_title(title, fontsize=14, fontweight='bold')
    ax.tick_params(axis='x', rotation=45)
    ax.grid(True, alpha=0.3, axis='y')

plt.tight_layout()
display(fig)

# Reusing visuals_dir instead of repeating the absolute path, and making sure the folder
# exists on the local filesystem that savefig writes to
daily_path = f"{visuals_dir}/02_temporal/daily_patterns.png"
os.makedirs(os.path.dirname(daily_path), exist_ok=True)
fig.savefig(
    daily_path,
    dpi=300,
    bbox_inches='tight',
    facecolor='white'
)
plt.close(fig)

print("Day of week patterns visualized")

In [0]:
# This is critical for the accessibility crisis investigation
print("Analyzing wheelchair accessibility...")

wav_requested = F.col('wav_request_flag') == 'Y'
wav_matched = F.col('wav_match_flag') == 'Y'

# Breakdown of the match-flag values
wav_stats = df.groupBy('wav_match_flag').count().toPandas()

# All headline counts in ONE pass instead of four separate count() jobs
wav_counts = df.agg(
    F.count(F.lit(1)).alias('total'),
    F.count(F.when(wav_matched, 1)).alias('matched'),
    F.count(F.when(wav_requested, 1)).alias('requested'),
    F.count(F.when(wav_requested & wav_matched, 1)).alias('requested_and_matched'),
).first()
total_trips = wav_counts['total']
wav_yes = wav_counts['matched']
wav_requests = wav_counts['requested']
wav_matches = wav_counts['requested_and_matched']

print("\nWheelchair accessible vehicle (WAV) matches:")
print(wav_stats)

# Guarded against an empty dataset
wav_pct = (wav_yes / total_trips) * 100 if total_trips > 0 else 0.0
request_pct = (wav_requests / total_trips) * 100 if total_trips > 0 else 0.0

print(f"\nACCESSIBILITY CRISIS:")
print(f" WAV-matched trips: {wav_yes:,}")
print(f" Percentage: {wav_pct:.3f}%")
print(f" This means only {wav_pct:.3f}% of rides accommodate wheelchair users")

# Comparing WAV requests vs matches
match_rate = (wav_matches / wav_requests) * 100 if wav_requests > 0 else 0

print(f"\n WAV requests: {wav_requests:,}")
print(f" WAV matches (when requested): {wav_matches:,}")
print(f" Match rate: {match_rate:.1f}%")

# Visualization
fig, axes = plt.subplots(1, 2, figsize=(15, 7))

# Left: overall WAV usage
axes[0].pie([total_trips - wav_yes, wav_yes],
            labels=['Not Accessible', 'Accessible'],
            autopct='%1.3f%%',
            colors=['#ff6b6b', '#51cf66'],
            explode=(0, 0.1),
            textprops={'fontsize': 13, 'weight': 'bold'},
            shadow=True,
            startangle=90)
axes[0].set_title('Wheelchair Accessibility Rate', fontsize=15, fontweight='bold')

# Right: fulfillment rate
if wav_requests > 0:
    axes[1].pie([wav_requests - wav_matches, wav_matches],
                labels=['Unfulfilled', 'Fulfilled'],
                autopct='%1.1f%%',
                colors=['#ff6b6b', '#51cf66'],
                explode=(0, 0.1),
                textprops={'fontsize': 13, 'weight': 'bold'},
                shadow=True,
                startangle=90)
else:
    axes[1].pie([1], colors=['lightgray'])
    axes[1].text(0, 0, 'No WAV\nrequests', ha='center', va='center', fontsize=14, weight='bold')

axes[1].set_title('WAV Request Fulfillment Rate', fontsize=15, fontweight='bold')

plt.tight_layout()
display(fig)

# Reusing visuals_dir instead of repeating the absolute path, and making sure the folder
# exists on the local filesystem that savefig writes to
accessibility_path = f"{visuals_dir}/04_equity/accessibility_crisis.png"
os.makedirs(os.path.dirname(accessibility_path), exist_ok=True)
fig.savefig(
    accessibility_path,
    dpi=300,
    bbox_inches='tight',
    facecolor='white'
)
plt.close(fig)

print("\nAccessibility analysis complete")



# -------------------------------------------------------------------
# Adding contextual visualization to clarify accessibility interpretation
# -------------------------------------------------------------------
print("\nAdding contextual accessibility chart...")

fig2, ax = plt.subplots(figsize=(8, 7))

# Bar chart showing:
# 1. Total trips
# 2. WAV requests
# 3. WAV matches
context_values = [total_trips, wav_requests, wav_matches]
ax.bar(['Total Trips', 'WAV Requests', 'WAV Matches'],
       context_values,
       color=['#4dabf7', '#ffa94d', '#51cf66'])

# Labeling bars with values
for i, value in enumerate(context_values):
    ax.text(i, value, f'{value:,}', ha='center', va='bottom', fontsize=12, fontweight='bold')

# Title and explanation
ax.set_title('Context: Comparing Total Trips vs WAV Requests vs Matches',
             fontsize=15, fontweight='bold')
ax.set_ylabel('Count', fontsize=13)

# Explanatory note built from the computed numbers (previously hard-coded "~9%" / "≈100%",
# which would go stale whenever the data or cleaning rules change). The "low demand" reading
# is only stated when the data actually shows near-complete fulfillment.
context_note = (
    f"WAV-matched trips: {wav_pct:.1f}% of all trips; "
    f"WAV requests: {request_pct:.2f}% of trips.\n"
)
if wav_requests == 0:
    context_note += "No WAV requests were recorded."
elif match_rate >= 95:
    context_note += (
        f"{match_rate:.1f}% of WAV requests are fulfilled, so the low request share\n"
        "reflects low demand, not unmet need."
    )
else:
    context_note += f"Only {match_rate:.1f}% of WAV requests are fulfilled."

ax.text(1, -max(context_values) * 0.05,
        context_note,
        ha='center', va='top', fontsize=12)


plt.tight_layout()
display(fig2)
plt.close(fig2)

print("Contextual visualization added.\n")


In [0]:
# Summarizing everything I've cleaned

print("\n" + "="*80)
print("FINAL CLEANING SUMMARY")
print("="*80)

final_count = df.count()
records_removed = original_count - final_count
pct_removed = (records_removed / original_count) * 100 if original_count else 0.0

# Derived columns added during cleaning. 'passenger_total_paid' is the column the
# driver-earnings step actually creates (this list used to say 'total_fare', which never existed).
new_features = [
    'company_name', 'pickup_hour', 'pickup_day_of_week', 'pickup_date', 
    'pickup_month', 'is_weekend', 'day_name', 'wait_time_seconds', 
    'wait_time_minutes', 'fare_per_mile', 'passenger_total_paid', 'driver_pct', 
    'co2_kg', 'co2_lbs', 'speed_mph'
]
# Cross-check against the real schema so the summary cannot silently list phantom columns
missing_features = [name for name in new_features if name not in df.columns]

# Companies actually present after cleaning (the license mapping can also yield 'Other')
companies = sorted(row['company_name'] for row in df.select('company_name').distinct().collect())

print("\nDATA CLEANING RESULTS:")
print(f"   Original records: {original_count:,}")
print(f"   Final records: {final_count:,}")
print(f"   Removed: {records_removed:,} ({pct_removed:.2f}%)")

print("\nCLEANED DATASET READY:")
print(f"   Columns: {len(df.columns)}")
print("   Date range: Q1 2025 (Jan-Mar)")
print(f"   Companies: {', '.join(companies)}")
print("   All datetime fields validated")
print("   All numeric fields validated")
print("   All flags standardized")
print(f"   New features created: {len(new_features)}")
if missing_features:
    print(f"   ⚠️  Listed features missing from df: {', '.join(missing_features)}")

print("\nNEW FEATURES ADDED:")
for i, feature in enumerate(new_features, 1):
    print(f"   {i:2d}. {feature}")

In [0]:
# Saving the cleaned dataset so I don't have to rerun cleaning every time

print("\n" + "="*80)
print("SAVING CLEANED DATA")
print("="*80)

# Delta Lake is Databricks' optimized storage format
# It's better than Parquet because it supports ACID transactions and time travel

table_name = "nyc_ridehail_q1_2025_clean"

print(f"\nSaving to Delta Lake table: {table_name}")

# Writing the cleaned data; overwriteSchema lets re-runs replace a table whose columns changed
(
    df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable(table_name)
)

print("✅ Data saved successfully!")

# Verifying the save by re-reading the table and comparing against the summary count
saved_count = spark.table(table_name).count()
print(f"\nVerification: {saved_count:,} records in saved table")

if saved_count == final_count:
    print("✓ Record count matches!")
else:
    print("⚠️  Warning: Record count mismatch")

print("\n" + "="*80)
print("PHASE 1: DATA CLEANING COMPLETE")
print("="*80)

In [0]:
# I'm creating a summary document of all the important findings from cleaning.
# Every value is computed from the cleaned `df` instead of being typed in by hand, so the
# summary cannot drift from the data when the cleaning rules or source tables change.

VEHICLE_TONS_CO2_PER_YEAR = 4.6  # per-vehicle annual CO2 (metric tons) used for the vehicle equivalence


def _pct(part, whole):
    """Return `part` as a percentage of `whole`, or 0.0 when `whole` is 0."""
    return (part / whole) * 100 if whole else 0.0


def _fmt(value, spec):
    """Format a Spark aggregate that may be None (an average over zero rows is null)."""
    return "n/a" if value is None else format(value, spec)


def _busiest_and_quietest(group_col, label):
    """Describe the groups of `group_col` with the most and fewest trips as '<label> - N trips'."""
    rows = sorted(df.groupBy(group_col).count().collect(), key=lambda r: r['count'])
    if not rows:
        return "n/a", "n/a"

    def describe(row):
        return f"{label(row[group_col])} - {row['count']:,} trips"

    return describe(rows[-1]), describe(rows[0])


# One pass over the cleaned data for every scalar metric in the report
stats = df.agg(
    F.count(F.lit(1)).alias('trips'),
    F.count(F.when(F.col('company_name') == 'Uber', 1)).alias('uber'),
    F.count(F.when(F.col('company_name') == 'Lyft', 1)).alias('lyft'),
    F.count(F.when(F.col('is_weekend') == 'Y', 1)).alias('weekend'),
    F.avg('wait_time_minutes').alias('avg_wait'),
    F.avg('trip_miles').alias('avg_miles'),
    F.avg('trip_time').alias('avg_trip_seconds'),
    F.avg('speed_mph').alias('avg_speed'),
    F.avg('base_passenger_fare').alias('avg_base_fare'),
    F.avg('fare_per_mile').alias('avg_fare_per_mile'),
    F.avg('driver_pay').alias('avg_driver_pay'),
    F.avg('driver_pct').alias('avg_driver_pct'),
    F.count(F.when(F.col('wav_match_flag') == 'Y', 1)).alias('wav_matched'),
    F.count(F.when(F.col('wav_request_flag') == 'Y', 1)).alias('wav_requested'),
    F.count(F.when((F.col('wav_request_flag') == 'Y') & (F.col('wav_match_flag') == 'Y'), 1)).alias('wav_fulfilled'),
    F.sum('co2_kg').alias('total_co2_kg'),
    F.avg('co2_kg').alias('avg_co2_kg'),
).first()

trips = stats['trips']
removed_total = original_count - trips
avg_trip_minutes = None if stats['avg_trip_seconds'] is None else stats['avg_trip_seconds'] / 60
total_co2_tons = (stats['total_co2_kg'] or 0) / 1000
fulfillment_pct = _pct(stats['wav_fulfilled'], stats['wav_requested'])

peak_hour, lowest_hour = _busiest_and_quietest('pickup_hour', lambda h: f"{h:02d}:00")
busiest_day, slowest_day = _busiest_and_quietest('day_name', str)

# The "low demand" interpretation is only stated when the data supports it
if stats['wav_requested'] == 0:
    request_fulfillment = "n/a (no WAV requests)"
    key_finding = "No WAV requests recorded in this period"
elif fulfillment_pct >= 95:
    request_fulfillment = f"{fulfillment_pct:.1f}% of WAV requests matched"
    key_finding = "Low accessibility rate reflects low DEMAND, not unmet need"
else:
    request_fulfillment = f"{fulfillment_pct:.1f}% of WAV requests matched"
    key_finding = f"{100 - fulfillment_pct:.1f}% of WAV requests went unmatched"

findings = {
    "Data Quality": {
        "Original Records": f"{original_count:,}",
        "Final Records": f"{trips:,}",
        "Records Removed": f"{removed_total:,} ({_pct(removed_total, original_count):.2f}%)",
        "Data Quality": f"{_pct(removed_total, original_count):.2f}% of raw records removed during cleaning"
    },
    
    "Market Share": {
        "Uber": f"{stats['uber']:,} trips ({_pct(stats['uber'], trips):.2f}%)",
        "Lyft": f"{stats['lyft']:,} trips ({_pct(stats['lyft'], trips):.2f}%)"
    },
    
    "Temporal Patterns": {
        "Peak Hour": peak_hour,
        "Lowest Hour": lowest_hour,
        "Busiest Day": busiest_day,
        "Slowest Day": slowest_day,
        "Weekend Percentage": f"{_pct(stats['weekend'], trips):.1f}%"
    },
    
    "Service Metrics": {
        "Average Wait Time": f"{_fmt(stats['avg_wait'], '.2f')} minutes",
        "Average Trip Distance": f"{_fmt(stats['avg_miles'], '.2f')} miles",
        "Average Trip Duration": f"{_fmt(avg_trip_minutes, '.1f')} minutes",
        "Average Speed": f"{_fmt(stats['avg_speed'], '.1f')} mph"
    },
    
    "Economic Metrics": {
        "Average Base Fare": f"${_fmt(stats['avg_base_fare'], '.2f')}",
        "Average Fare Per Mile": f"${_fmt(stats['avg_fare_per_mile'], '.2f')}",
        "Average Driver Pay": f"${_fmt(stats['avg_driver_pay'], '.2f')}",
        "Driver Take Rate": f"{_fmt(stats['avg_driver_pct'], '.2f')}% of total fare"
    },
    
    "Accessibility Crisis": {
        "WAV Match Rate": f"{_pct(stats['wav_matched'], trips):.2f}% of all trips",
        "WAV Requests": f"{stats['wav_requested']:,} ({_pct(stats['wav_requested'], trips):.2f}% of trips)",
        "Request Fulfillment": request_fulfillment,
        "Key Finding": key_finding
    },
    
    "Environmental Impact": {
        "Total Q1 CO2 Emissions": f"{total_co2_tons:,.0f} metric tons",
        "Average Per Trip": f"{_fmt(stats['avg_co2_kg'], '.2f')} kg CO2",
        "Equivalent To": f"{total_co2_tons / VEHICLE_TONS_CO2_PER_YEAR:,.0f} passenger vehicles for one year"
    }
}

print("\n" + "="*80)
print("KEY FINDINGS FROM PHASE 1: DATA CLEANING")
print("="*80)

for category, metrics in findings.items():
    print(f"\n{category.upper()}:")
    for key, value in metrics.items():
        print(f"  • {key}: {value}")

print("\n" + "="*80)