In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, DoubleType
from pyspark.sql.functions import col, hour, to_date, count, desc, sum, avg, round

In [2]:
# Start the Spark session used by every later cell (getOrCreate reuses an
# already-running session instead of creating a second one).
spark = (
    SparkSession.builder
    .appName("UberOlaDataAnalysis")
    .getOrCreate()
)

In [3]:
# Explicit schema for the trip CSV, so Spark does not infer column types.
# Column order matches the file's columns; every field is declared nullable.
schema_columns = [
    ("Trip_ID", StringType()),
    ("Pickup_DateTime", TimestampType()),
    ("Drop_DateTime", TimestampType()),
    ("Pickup_Location", StringType()),
    ("Drop_Location", StringType()),
    ("City", StringType()),
    ("Fare_Amount", DoubleType()),
    ("Payment_Type", StringType()),
]
schema = StructType(
    [StructField(field_name, field_type, True) for field_name, field_type in schema_columns]
)

In [4]:
# Load the raw trips using the explicit `schema`; the first CSV line is a header row.
# NOTE(review): under Spark's default PERMISSIVE parse mode, values that do not match
# the schema (e.g. timestamps in an unexpected format) become null instead of raising
# — worth checking null counts after loading.
df = (
    spark.read
    .option("header", True)
    .schema(schema)
    .csv("uber_ola_data.csv")
)

In [5]:
# Echo the schema Spark applied to the loaded frame, as a quick sanity check.
schema_title = "Schema of the dataset:"
print(schema_title)
df.printSchema()

Schema of the dataset:
root
 |-- Trip_ID: string (nullable = true)
 |-- Pickup_DateTime: timestamp (nullable = true)
 |-- Drop_DateTime: timestamp (nullable = true)
 |-- Pickup_Location: string (nullable = true)
 |-- Drop_Location: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Fare_Amount: double (nullable = true)
 |-- Payment_Type: string (nullable = true)



In [6]:
# Derive the pickup calendar date and hour of day for the time-based analysis.
# withColumn replaces a column that already exists, so re-running this cell is safe.
# NOTE(review): to_date/hour evaluate in the Spark session time zone — confirm it
# matches the time zone the pickup timestamps were recorded in.
pickup_ts = col("Pickup_DateTime")
df = (
    df
    .withColumn("Pickup_Date", to_date(pickup_ts))
    .withColumn("Pickup_Hour", hour(pickup_ts))
)

In [7]:
# Keep only trips with a strictly positive fare. Rows whose Fare_Amount is null are
# dropped too: `null > 0` evaluates to null, which the filter treats as not matching.
positive_fare = col("Fare_Amount") > 0
df_clean = df.where(positive_fare)

In [8]:
# Exploratory Queries
# Top 10 busiest pickup locations, by number of trips (count("Trip_ID") skips null IDs).
# Ties on Trip_Count are broken alphabetically by location. Without a tie-breaker, the
# order of equal counts, and which tied locations survive the limit(10) cut, can vary
# from run to run.
top_pickup_locations = (
    df_clean.groupBy("Pickup_Location")
    .agg(count("Trip_ID").alias("Trip_Count"))
    .orderBy(desc("Trip_Count"), "Pickup_Location")
    .limit(10)
)

In [9]:
# Top 10 busiest drop locations, by number of trips (count("Trip_ID") skips null IDs).
# Ties on Trip_Count are broken alphabetically by location, so the ranking and the
# limit(10) cut are deterministic across runs.
top_drop_locations = (
    df_clean.groupBy("Drop_Location")
    .agg(count("Trip_ID").alias("Trip_Count"))
    .orderBy(desc("Trip_Count"), "Drop_Location")
    .limit(10)
)

In [10]:
# Aggregations
# Per-city ride count, total revenue and average fare. Each result is sorted by its
# own metric, highest first. `sum`, `avg` and `round` are the pyspark.sql.functions
# imports, not the Python built-ins.
by_city = df_clean.groupBy("City")

total_rides_per_city = by_city.agg(
    count("Trip_ID").alias("Total_Rides")
).orderBy(col("Total_Rides").desc())

total_revenue_per_city = by_city.agg(
    round(sum("Fare_Amount"), 2).alias("Total_Revenue")
).orderBy(col("Total_Revenue").desc())

avg_fare_per_city = by_city.agg(
    round(avg("Fare_Amount"), 2).alias("Avg_Fare")
).orderBy(col("Avg_Fare").desc())

In [11]:
# Time-Based Analysis
# Number of trips per pickup hour of day, listed in ascending hour order.
trips_by_hour = (
    df_clean.groupBy("Pickup_Hour")
    .agg(count("Trip_ID").alias("Trip_Count"))
    .orderBy(col("Pickup_Hour").asc())
)

In [12]:
# Filter peak hours
# Inclusive whole-hour windows (Column.between includes both bounds):
# morning keeps hours 8, 9, 10 (08:00-10:59); evening keeps 18-21 (18:00-21:59).
MORNING_PEAK_HOURS = (8, 10)
EVENING_PEAK_HOURS = (18, 21)

peak_morning = trips_by_hour.where(col("Pickup_Hour").between(*MORNING_PEAK_HOURS))
peak_evening = trips_by_hour.where(col("Pickup_Hour").between(*EVENING_PEAK_HOURS))

In [13]:
# Payment Insights
# Trip count per payment type and each type's percentage share of the counted trips.
payment_counts = df_clean.groupBy("Payment_Type").agg(count("Trip_ID").alias("Count"))

# The denominator counts exactly what the numerator counts: count("Trip_ID"), which
# skips null Trip_IDs (Trip_ID is declared nullable). df_clean.count() would also count
# rows with a null Trip_ID, so the shares could fall short of 100%.
total_trips = df_clean.agg(count("Trip_ID")).first()[0]

# If no trip has a Trip_ID, every Count is 0. Dividing by 1 then yields 0% shares
# instead of a 0/0 division, which raises when Spark's ANSI mode is enabled.
share_denominator = total_trips if total_trips > 0 else 1

payment_share = (
    payment_counts
    .withColumn("Share_Percent", round((col("Count") / share_denominator) * 100, 2))
    # Most-used payment type first; the name tie-breaker keeps the order stable.
    .orderBy(desc("Count"), "Payment_Type")
)

In [14]:
# Show outputs
# Each entry: (heading printed first, DataFrame to display, keyword args for .show()).
# NOTE(review): the peak filters keep whole hours 8-10 and 18-21 inclusive (through
# 10:59 and 21:59). Confirm the "(8-10 AM)" / "(6-9 PM)" labels describe the intended windows.
report_sections = [
    ("Top 10 busiest pickup locations by trip count:", top_pickup_locations, {"truncate": False}),
    ("Top 10 busiest drop locations by trip count:", top_drop_locations, {"truncate": False}),
    ("Total rides per city:", total_rides_per_city, {"truncate": False}),
    ("Total revenue per city:", total_revenue_per_city, {"truncate": False}),
    ("Average fare per trip per city:", avg_fare_per_city, {"truncate": False}),
    # n=24: one row per hour of the day (show() defaults to 20 rows).
    ("Trip count by hour:", trips_by_hour, {"n": 24}),
    ("Peak morning ride hours (8-10 AM):", peak_morning, {}),
    ("Peak evening ride hours (6-9 PM):", peak_evening, {}),
    ("Payment type share across all trips:", payment_share, {"truncate": False}),
]

for heading, frame, show_options in report_sections:
    print(heading)
    frame.show(**show_options)

Top 10 busiest pickup locations by trip count:
+---------------+----------+
|Pickup_Location|Trip_Count|
+---------------+----------+
|Kukatpally     |139       |
|OMR            |136       |
|HSR Layout     |135       |
|Lajpat Nagar   |127       |
|Kondapur       |126       |
|Baner          |124       |
|Colaba         |124       |
|Anna Nagar     |122       |
|Begumpet       |122       |
|Banashankari   |121       |
+---------------+----------+

Top 10 busiest drop locations by trip count:
+-----------------+----------+
|Drop_Location    |Trip_Count|
+-----------------+----------+
|Golconda Fort    |140       |
|Express Avenue   |130       |
|UB City Mall     |129       |
|Charminar        |128       |
|Lumbini Park     |125       |
|ITC Gardenia     |124       |
|AIIMS            |123       |
|Stadium          |123       |
|Commercial Street|122       |
|IISc Campus      |119       |
+-----------------+----------+

Total rides per city:
+---------+-----------+
|City     |Total_Rid

In [17]:
# Save output DataFrames to CSV files for export.
# Each result goes to its own folder, output_<name>, as CSV with a header row;
# mode('overwrite') replaces a folder left by a previous run.
output_dfs = dict(
    top_pickup_locations=top_pickup_locations,
    top_drop_locations=top_drop_locations,
    total_rides_per_city=total_rides_per_city,
    total_revenue_per_city=total_revenue_per_city,
    avg_fare_per_city=avg_fare_per_city,
    trips_by_hour=trips_by_hour,
    peak_morning_hours=peak_morning,
    peak_evening_hours=peak_evening,
    payment_share=payment_share,
)

for name in output_dfs:
    # A single partition means Spark writes one part file per output folder.
    csv_writer = (
        output_dfs[name]
        .coalesce(1)
        .write
        .mode('overwrite')
        .option('header', 'true')
    )
    csv_writer.csv(f'output_{name}')
    print(f"Saved '{name}' output to folder: output_{name}")


Saved 'top_pickup_locations' output to folder: output_top_pickup_locations
Saved 'top_drop_locations' output to folder: output_top_drop_locations
Saved 'total_rides_per_city' output to folder: output_total_rides_per_city
Saved 'total_revenue_per_city' output to folder: output_total_revenue_per_city
Saved 'avg_fare_per_city' output to folder: output_avg_fare_per_city
Saved 'trips_by_hour' output to folder: output_trips_by_hour
Saved 'peak_morning_hours' output to folder: output_peak_morning_hours
Saved 'peak_evening_hours' output to folder: output_peak_evening_hours
Saved 'payment_share' output to folder: output_payment_share
