In [1]:

# -------------------------------------------
# SETUP
# -------------------------------------------
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder \
    .appName("RideHailing_CaseStudy") \
    .config("spark.sql.shuffle.partitions", "8") \
    .getOrCreate()

# -------------------------------------------
# RAW DATA (as provided)
# -------------------------------------------
raw_drivers = [
    ("D001","Ramesh","35","Hyderabad","Car,Bike"),
    ("D002","Suresh","Forty","Bangalore","Auto"),
    ("D003","Anita",None,"Mumbai",["Car"]),
    ("D004","Kiran","29","Delhi","Car|Bike"),
    ("D005","", "42","Chennai",None)
]

raw_cities = [
    ("Hyderabad","South"),
    ("Bangalore","South"),
    ("Mumbai","West"),
    ("Delhi","North"),
    ("Chennai","South")
]

raw_trips = [
    ("T001","D001","Hyderabad","2024-01-05","Completed","450"),
    ("T002","D002","Bangalore","05/01/2024","Cancelled","0"),
    ("T003","D003","Mumbai","2024/01/06","Completed","620"),
    ("T004","D004","Delhi","invalid_date","Completed","540"),
    ("T005","D001","Hyderabad","2024-01-10","Completed","700"),
    ("T006","D005","Chennai","2024-01-12","Completed","350")
]

raw_activity = [
    ("D001","login,accept_trip,logout","{'device':'mobile'}",180),
    ("D002",["login","logout"],"device=laptop",60),
    ("D003","login|accept_trip",None,120),
    ("D004",None,"{'device':'tablet'}",90),
    ("D005","login","{'device':'mobile'}",30)
]


Part a - Data cleaning and structuring

1. Explicit Schemas

In [2]:

# -------------------------------------------
# EXPLICIT SCHEMAS
# -------------------------------------------
drivers_schema = StructType([
    StructField("driver_id", StringType(), False),
    StructField("driver_name", StringType(), True),
    StructField("age_raw", StringType(), True),
    StructField("home_city", StringType(), True),
    StructField("vehicle_raw", StringType(), True)
])

cities_schema = StructType([
    StructField("city", StringType(), False),
    StructField("region", StringType(), False)
])

trips_schema = StructType([
    StructField("trip_id", StringType(), False),
    StructField("driver_id", StringType(), False),
    StructField("city", StringType(), False),
    StructField("trip_date_raw", StringType(), True),
    StructField("status", StringType(), True),
    StructField("fare_raw", StringType(), True)
])

activity_schema = StructType([
    StructField("driver_id", StringType(), False),
    StructField("actions_raw", StringType(), True),
    StructField("metadata_raw", StringType(), True),
    StructField("active_seconds", IntegerType(), True)
])


drivers_raw_df  = spark.createDataFrame(raw_drivers, schema=drivers_schema)
cities_df       = spark.createDataFrame(raw_cities,  schema=cities_schema)
trips_raw_df    = spark.createDataFrame(raw_trips,   schema=trips_schema)
activity_raw_df = spark.createDataFrame(raw_activity, schema=activity_schema)


2. Normalization (Age, Fare, Dates, Arrays)

In [4]:

# -------------------------------------------
# HELPERS: normalizing arrays from messy strings
# -------------------------------------------
def normalize_list_col(col_str):

    cleaned = F.regexp_replace(col_str, r"[\[\]']", "")
    cleaned = F.regexp_replace(cleaned, r"\s+", "")
    cleaned = F.regexp_replace(cleaned, r"\|", ",")
    arr = F.split(cleaned, ",")

    arr = F.filter(F.transform(arr, lambda x: F.trim(x)), lambda x: (x.isNotNull()) & (x != ""))
    return F.when(col_str.isNull(), F.array().cast("array<string>")).otherwise(arr)

# -------------------------------------------
# CLEAN DRIVERS
# -------------------------------------------
drivers_df = (
    drivers_raw_df

    .withColumn(
        "age",
        F.coalesce(
            F.col("age_raw").cast("int"),
            F.when(F.lower(F.col("age_raw")) == "forty", F.lit(40)).otherwise(F.lit(None).cast("int"))
        )
    )

    .withColumn("vehicle_types", normalize_list_col(F.col("vehicle_raw")))

    .withColumn("driver_name", F.when(F.length(F.col("driver_name")) == 0, None).otherwise(F.col("driver_name")))
    .select("driver_id", "driver_name", "age", "home_city", "vehicle_types")
)


invalid_drivers_df = drivers_df.where(
    (F.col("driver_name").isNull()) | (F.col("age").isNull()) | (F.size(F.col("vehicle_types")) == 0)
)

# -------------------------------------------
# CLEAN TRIPS
# -------------------------------------------

trips_df = (
    trips_raw_df
    .withColumn("trip_date",
        F.coalesce(
            F.to_date("trip_date_raw", "yyyy-MM-dd"),
            F.to_date("trip_date_raw", "yyyy/MM/dd"),
            F.to_date("trip_date_raw", "dd/MM/yyyy")
        )
    )
    .withColumn("fare", F.col("fare_raw").cast("double"))
    .withColumn("is_completed", F.lower(F.col("status")) == F.lit("completed"))
    .select("trip_id", "driver_id", "city", "trip_date", "status", "is_completed", "fare")
)


invalid_trips_df = trips_df.where(F.col("trip_date").isNull() | F.col("fare").isNull() | (F.col("fare") < 0))

# -------------------------------------------
# CLEAN ACTIVITY
# -------------------------------------------

activity_intermediate_df = activity_raw_df.withColumn("actions", normalize_list_col(F.col("actions_raw")))


metadata_json_like = F.regexp_replace(F.col("metadata_raw"), r"'", '"')
metadata_kv_json = F.when(
    F.col("metadata_raw").rlike(r"^[A-Za-z0-9_]+\=[A-Za-z0-9_\-]+$"),
    F.concat(F.lit('{"'),
             F.split(F.col("metadata_raw"), "=").getItem(0),
             F.lit('":"'),
             F.split(F.col("metadata_raw"), "=").getItem(1),
             F.lit('"}'))
).otherwise(metadata_json_like)

metadata_schema = MapType(StringType(), StringType())

activity_df = (
    activity_intermediate_df
    .withColumn("metadata_json", metadata_kv_json)
    .withColumn("metadata", F.from_json(F.col("metadata_json"), metadata_schema))
    .select("driver_id", "actions", "metadata", "active_seconds")
)


Part: B - Data Integration (Joins)

In [11]:
# -------------------------------------------
# BROADCAST the small city dimension
# -------------------------------------------
cities_b = F.broadcast(cities_df)

# Remove orphan trips (drivers not in master): use INNER join with drivers
trips_plus_drivers_df = (
    trips_df.alias("t")
    .join(drivers_df.alias("d"), on=F.col("t.driver_id") == F.col("d.driver_id"), how="inner")
    .select(
        F.col("t.trip_id"),
        F.col("t.driver_id").alias("trip_driver_id"),
        F.col("t.city").alias("trip_city"),
        F.col("t.trip_date"),
        F.col("t.status"),
        F.col("t.is_completed"),
        F.col("t.fare"),
        F.col("d.driver_id").alias("master_driver_id"),
        F.col("d.driver_name"),
        F.col("d.age"),
        F.col("d.home_city"),
        F.col("d.vehicle_types")
    )
)

# Join with city lookup (broadcast)
trips_enriched_df = (
    trips_plus_drivers_df.alias("td")
    .join(cities_b.alias("c"), on=F.col("td.trip_city") == F.col("c.city"), how="left")
    .select(
        F.col("td.*"), # Select all columns from trips_plus_drivers_df (now aliased as td)
        F.col("c.city").alias("region_city"), # Alias city from cities_b
        F.col("c.region") # region is unique
    )
)

# Prove broadcast decision

trips_enriched_df.explain(True)


== Parsed Logical Plan ==
'Project [td.*, 'c.city AS region_city#86, 'c.region]
+- Join LeftOuter, (trip_city#84 = city#5)
   :- SubqueryAlias td
   :  +- Project [trip_id#7, driver_id#8 AS trip_driver_id#83, city#9 AS trip_city#84, trip_date#35, status#11, is_completed#37, fare#36, driver_id#0 AS master_driver_id#85, driver_name#34, age#30, home_city#3, vehicle_types#31]
   :     +- Join Inner, (driver_id#8 = driver_id#0)
   :        :- SubqueryAlias t
   :        :  +- Project [trip_id#7, driver_id#8, city#9, trip_date#35, status#11, is_completed#37, fare#36]
   :        :     +- Project [trip_id#7, driver_id#8, city#9, trip_date_raw#10, status#11, fare_raw#12, trip_date#35, fare#36, (lower(status#11) = completed) AS is_completed#37]
   :        :        +- Project [trip_id#7, driver_id#8, city#9, trip_date_raw#10, status#11, fare_raw#12, trip_date#35, cast(fare_raw#12 as double) AS fare#36]
   :        :           +- Project [trip_id#7, driver_id#8, city#9, trip_date_raw#10, status#

PART C - Analytics & Aggregations

In [12]:
completed_trips_df = trips_enriched_df.where(F.col("is_completed") & F.col("fare").isNotNull())

# The trips_enriched_df now has unique column names, so direct selection is possible.
# Using meaningful aliases for clarity in aggregations.

# 1) Total trips per city
total_trips_per_city_df = (
    completed_trips_df.groupBy("trip_city")
    .agg(F.countDistinct("trip_id").alias("total_completed_trips"))
)

# 2) Total revenue per city
total_revenue_per_city_df = (
    completed_trips_df.groupBy("trip_city")
    .agg(F.sum("fare").alias("total_revenue"))
)

# 3) Average fare per driver (over completed trips)
avg_fare_per_driver_df = (
    completed_trips_df.groupBy("master_driver_id", "driver_name")
    .agg(F.avg("fare").alias("avg_fare"))
)

# 4) Total completed trips per driver
completed_trips_per_driver_df = (
    completed_trips_df.groupBy("master_driver_id", "driver_name")
    .agg(F.countDistinct("trip_id").alias("completed_trips"))
)

# 5) Identify drivers with no completed trips
drivers_with_completed_df = completed_trips_df.select("master_driver_id").distinct()
drivers_no_completed_df = drivers_df.join(drivers_with_completed_df, on=F.col("driver_id") == F.col("master_driver_id"), how="left_anti")

PART D — Window Functions

In [17]:

from pyspark.sql import functions as F

# 1) Join trips with drivers, then SELECT canonical names
trips_plus_drivers_df = (
    trips_df.alias("t")
    .join(drivers_df.alias("d"),
          F.col("t.driver_id") == F.col("d.driver_id"),
          how="inner")
    .select(
        F.col("t.trip_id").alias("trip_id"),
        F.col("t.driver_id").alias("driver_id"),        # canonical
        F.col("d.driver_name").alias("driver_name"),    # canonical
        F.col("t.city").alias("city"),                  # canonical
        F.col("t.trip_date").alias("trip_date"),
        F.col("t.status").alias("status"),
        F.col("t.is_completed").alias("is_completed"),
        F.col("t.fare").alias("fare"),
        F.col("d.age").alias("age"),
        F.col("d.home_city").alias("home_city"),
        F.col("d.vehicle_types").alias("vehicle_types")
    )
)

# 2) Broadcast join with city master, keep canonical 'city' and add 'region'
trips_enriched_df = (
    trips_plus_drivers_df.alias("td")
    .join(F.broadcast(cities_df).alias("c"),
          F.col("td.city") == F.col("c.city"),
          how="left")
    .select("td.*", F.col("c.region"))  # keep all canonical cols + region
)

# 3) Completed trips view used by Part C/D
completed_trips_df = trips_enriched_df.where(
    F.col("is_completed") & F.col("fare").isNotNull()
)


revenue_per_driver_df = (
    completed_trips_df.groupBy("driver_id", "driver_name")
    .agg(F.sum("fare").alias("total_revenue"))
)



PART E — Performance Classification (Prefer built‑ins)

In [18]:

# Define thresholds (example): High >= 1000, Medium >= 500 else Low
classified_drivers_df = (
    revenue_per_driver_df
    .withColumn(
        "performance_level",
        F.when(F.col("total_revenue") >= 1000, F.lit("High"))
         .when(F.col("total_revenue") >= 500,  F.lit("Medium"))
         .otherwise(F.lit("Low"))
    )
)

# Justification:
# - Using built-ins keeps execution in Catalyst/ Tungsten, avoids Python UDF overhead, and enables better optimization.
# - A UDF is unnecessary for simple threshold logic and would introduce serialization costs.



In [20]:
#PART F



# -------------------------------------------
# Imports
# -------------------------------------------
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# -------------------------------------------
# (A) Ensure completed_trips_df exists with canonical columns
#     Required columns: trip_id, driver_id, driver_name, city, trip_date, is_completed, fare
# -------------------------------------------
# If you already have trips_enriched_df from earlier parts, keep this block.
# Else, rebuild minimal inputs from raw data for completeness.

try:
    # Quick schema check (skip rebuild if exists)
    _ = completed_trips_df.columns
except NameError:
    # Minimal rebuild using previously defined DataFrames: trips_df, drivers_df, cities_df
    # (Adjust if your variable names differ)
    trips_plus_drivers_df = (
        trips_df.alias("t")
        .join(drivers_df.alias("d"), F.col("t.driver_id") == F.col("d.driver_id"), "inner")
        .select(
            F.col("t.trip_id").alias("trip_id"),
            F.col("t.driver_id").alias("driver_id"),
            F.col("d.driver_name").alias("driver_name"),
            F.col("t.city").alias("city"),
            F.col("t.trip_date").alias("trip_date"),
            F.col("t.status").alias("status"),
            F.col("t.is_completed").alias("is_completed"),
            F.col("t.fare").alias("fare"),
            F.col("d.age").alias("age"),
            F.col("d.home_city").alias("home_city"),
            F.col("d.vehicle_types").alias("vehicle_types"),
        )
    )

    trips_enriched_df = (
        trips_plus_drivers_df.alias("td")
        .join(F.broadcast(cities_df).alias("c"), F.col("td.city") == F.col("c.city"), "left")
        .select("td.*", F.col("c.region"))
    )

    completed_trips_df = trips_enriched_df.where(
        F.col("is_completed") & F.col("fare").isNotNull()
    )

# -------------------------------------------
# (B) Part F metrics prerequisites
#     1) Total revenue per city
#     2) Revenue per driver within each city
#     3) Window spec for ranking within city
# -------------------------------------------
total_revenue_per_city_df = (
    completed_trips_df.groupBy("city")
    .agg(F.sum("fare").alias("total_revenue"))
)

revenue_per_driver_city_df = (
    completed_trips_df.groupBy("city", "driver_id", "driver_name")
    .agg(F.sum("fare").alias("city_revenue"))
)

# Window: partition by city, order by revenue desc, tie-break by driver_id
w_city = Window.partitionBy("city").orderBy(F.desc("city_revenue"), F.asc("driver_id"))

# -------------------------------------------
# (C) Part F — Sorting & Ordering
# -------------------------------------------
# 1) Sort cities by total revenue (descending)
cities_sorted_by_revenue_df = total_revenue_per_city_df.orderBy(F.desc("total_revenue"))

# 2) Sort drivers by revenue within each city
drivers_sorted_by_city_revenue_df = (
    revenue_per_driver_city_df
    .withColumn("rn", F.row_number().over(w_city))  # optional helper column
    .orderBy(F.asc("city"), F.desc("city_revenue"), F.asc("driver_id"))
).drop("rn")  # drop helper if not needed

# -------------------------------------------
# (D) Optional: show results
# -------------------------------------------
print("Cities sorted by total revenue:")
cities_sorted_by_revenue_df.show(truncate=False)

print("Drivers sorted by revenue within each city:")
drivers_sorted_by_city_revenue_df.show(truncate=False)


Cities sorted by total revenue:
+---------+-------------+
|city     |total_revenue|
+---------+-------------+
|Hyderabad|1150.0       |
|Mumbai   |620.0        |
|Delhi    |540.0        |
|Chennai  |350.0        |
+---------+-------------+

Drivers sorted by revenue within each city:
+---------+---------+-----------+------------+
|city     |driver_id|driver_name|city_revenue|
+---------+---------+-----------+------------+
|Chennai  |D005     |NULL       |350.0       |
|Delhi    |D004     |Kiran      |540.0       |
|Hyderabad|D001     |Ramesh     |1150.0      |
|Mumbai   |D003     |Anita      |620.0       |
+---------+---------+-----------+------------+



In [21]:
#PART G


# Drivers who completed trips
drivers_completed_df = completed_trips_df.select("driver_id").distinct()

# Drivers who were active (had 'login' action)
drivers_active_df = (
    activity_df.where(F.array_contains(F.col("actions"), F.lit("login")))
    .select("driver_id")
    .distinct()
)

# 1) Drivers who logged in but never completed trips
active_no_completed_df = drivers_active_df.exceptAll(drivers_completed_df)

# 2) Drivers who completed trips and were active
completed_and_active_df = drivers_completed_df.intersect(drivers_active_df)



In [24]:
#PART H



from pyspark.sql import functions as F
from pyspark.sql.window import Window

# -------------------------------------------
# (0) Ensure completed_trips_df exists with canonical columns
#     Required columns in completed_trips_df:
#     trip_id, driver_id, driver_name, city, trip_date, status, is_completed, fare, age, home_city, vehicle_types, region
# -------------------------------------------
try:
    _ = completed_trips_df.columns
except NameError:
    # Rebuild from earlier inputs (adjust names if your variables differ)
    trips_plus_drivers_df = (
        trips_df.alias("t")
        .join(drivers_df.alias("d"), F.col("t.driver_id") == F.col("d.driver_id"), "inner")
        .select(
            F.col("t.trip_id").alias("trip_id"),
            F.col("t.driver_id").alias("driver_id"),
            F.col("d.driver_name").alias("driver_name"),
            F.col("t.city").alias("city"),
            F.col("t.trip_date").alias("trip_date"),
            F.col("t.status").alias("status"),
            F.col("t.is_completed").alias("is_completed"),
            F.col("t.fare").alias("fare"),
            F.col("d.age").alias("age"),
            F.col("d.home_city").alias("home_city"),
            F.col("d.vehicle_types").alias("vehicle_types"),
        )
    )

    trips_enriched_df = (
        trips_plus_drivers_df.alias("td")
        .join(F.broadcast(cities_df).alias("c"), F.col("td.city") == F.col("c.city"), "left")
        .select("td.*", F.col("c.region"))
    )

    completed_trips_df = trips_enriched_df.where(
        F.col("is_completed") & F.col("fare").isNotNull()
    )

# -------------------------------------------
# (1) Build overall revenue per driver (prereq for ranking)
# -------------------------------------------
revenue_per_driver_df = (
    completed_trips_df
    .groupBy("driver_id", "driver_name")
    .agg(F.sum("fare").alias("total_revenue"))
)

# -------------------------------------------
# (2) Window rank: drivers by total revenue (overall)
# -------------------------------------------
w_overall = Window.orderBy(F.desc("total_revenue"), F.asc("driver_id"))
driver_rank_overall_df = revenue_per_driver_df.withColumn(
    "revenue_rank_overall", F.dense_rank().over(w_overall)
)

# -------------------------------------------
# (3) Cities sorted by total revenue (for your explain on sorting)
# -------------------------------------------
total_revenue_per_city_df = (
    completed_trips_df.groupBy("city")
    .agg(F.sum("fare").alias("total_revenue"))
)
cities_sorted_by_revenue_df = total_revenue_per_city_df.orderBy(F.desc("total_revenue"))

# -------------------------------------------
# (4) EXPLAIN requests
# -------------------------------------------
print("A) Join with city master (broadcast) — physical plan:")
trips_enriched_df.explain(True)

print("\nB) Window ranking — physical plan:")
driver_rank_overall_df.explain(True)

print("\nC) Sorting (global) — physical plan:")
cities_sorted_by_revenue_df.explain(True)  # Expect SortExec + Exchange (global orderBy triggers shuffle)





A) Join with city master (broadcast) — physical plan:
== Parsed Logical Plan ==
'Project [td.*, 'c.region]
+- Join LeftOuter, (city#189 = city#5)
   :- SubqueryAlias td
   :  +- Project [trip_id#7 AS trip_id#186, driver_id#8 AS driver_id#187, driver_name#34 AS driver_name#188, city#9 AS city#189, trip_date#35 AS trip_date#190, status#11 AS status#191, is_completed#37 AS is_completed#192, fare#36 AS fare#193, age#30 AS age#194, home_city#3 AS home_city#195, vehicle_types#31 AS vehicle_types#196]
   :     +- Join Inner, (driver_id#8 = driver_id#0)
   :        :- SubqueryAlias t
   :        :  +- Project [trip_id#7, driver_id#8, city#9, trip_date#35, status#11, is_completed#37, fare#36]
   :        :     +- Project [trip_id#7, driver_id#8, city#9, trip_date_raw#10, status#11, fare_raw#12, trip_date#35, fare#36, (lower(status#11) = completed) AS is_completed#37]
   :        :        +- Project [trip_id#7, driver_id#8, city#9, trip_date_raw#10, status#11, fare_raw#12, trip_date#35, cast(far

In [25]:

completed_by_city_df = completed_trips_df.repartition("city").cache()


total_revenue_per_city_df = (
    completed_by_city_df.groupBy("city").agg(F.sum("fare").alias("total_revenue"))
)
running_revenue_per_city_df = (
    completed_by_city_df.where(F.col("trip_date").isNotNull())
    .groupBy("city", "trip_date").agg(F.sum("fare").alias("daily_revenue"))
    .withColumn("running_revenue",
        F.sum("daily_revenue").over(Window.partitionBy("city").orderBy("trip_date").rowsBetween(Window.unboundedPreceding, Window.currentRow))
    )
)
