# Environment prep

In [1]:
!pip install pyspark

Collecting py4j==0.10.9.7 (from pyspark)
  Downloading py4j-0.10.9.7-py2.py3-none-any.whl.metadata (1.5 kB)
Downloading py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
Installing collected packages: py4j
Successfully installed py4j-0.10.9.7


In [2]:
!pip install kafka-python

Collecting kafka-python
  Downloading kafka_python-2.1.4-py2.py3-none-any.whl.metadata (9.1 kB)
Downloading kafka_python-2.1.4-py2.py3-none-any.whl (276 kB)
Installing collected packages: kafka-python
Successfully installed kafka-python-2.1.4


In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, window, udf, count, rank, lit, when, struct, collect_list, median, current_timestamp
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, TimestampType
import math
import time

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("BDM_project_2(DEBS2015-TaxiChallenge)") \
    .getOrCreate()

print("Spark version:", spark.version)

Spark version: 3.5.3


In [4]:
# Define schema for taxi data
schema = StructType([
    StructField("medallion", StringType(), True),
    StructField("hack_license", StringType(), True),
    StructField("pickup_datetime", TimestampType(), True),
    StructField("dropoff_datetime", TimestampType(), True),
    StructField("trip_time_in_secs", IntegerType(), True),
    StructField("trip_distance", DoubleType(), True),
    StructField("pickup_longitude", DoubleType(), True),
    StructField("pickup_latitude", DoubleType(), True),
    StructField("dropoff_longitude", DoubleType(), True),
    StructField("dropoff_latitude", DoubleType(), True),
    StructField("payment_type", StringType(), True),
    StructField("fare_amount", DoubleType(), True),
    StructField("surcharge", DoubleType(), True),
    StructField("mta_tax", DoubleType(), True),
    StructField("tip_amount", DoubleType(), True),
    StructField("tolls_amount", DoubleType(), True)
])

# Data cleaning (Query 0)

In [5]:
data_path = "./sorted_data_1gb"
df = spark.read.csv(data_path, schema = schema, header = True)

cleaned_df = df.filter(
    (col("pickup_longitude").isNotNull()) &
    (col("pickup_latitude").isNotNull()) &
    (col("dropoff_longitude").isNotNull()) &
    (col("dropoff_latitude").isNotNull()) &
    (col("pickup_longitude") != 0) &
    (col("pickup_latitude") != 0) &
    (col("dropoff_longitude") != 0) &
    (col("dropoff_latitude") != 0) &
    (col("medallion").isNotNull()) &
    (col("hack_license").isNotNull()) &
    (col("trip_time_in_secs") > 0) &
    (col("trip_distance") > 0) &
    (col("fare_amount") > 0)
)

print("Query 0 - Sample of Cleaned Data:")
cleaned_df.show(5)

Query 0 - Sample of Cleaned Data:
+--------------------+--------------------+-------------------+-------------------+-----------------+-------------+----------------+---------------+-----------------+----------------+------------+-----------+---------+-------+----------+------------+
|           medallion|        hack_license|    pickup_datetime|   dropoff_datetime|trip_time_in_secs|trip_distance|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|payment_type|fare_amount|surcharge|mta_tax|tip_amount|tolls_amount|
+--------------------+--------------------+-------------------+-------------------+-----------------+-------------+----------------+---------------+-----------------+----------------+------------+-----------+---------+-------+----------+------------+
|945F1F65FAA293DA1...|D7421B620BD448E6B...|2013-02-17 16:13:00|2013-02-17 16:23:00|              600|         1.31|      -73.974167|      40.753681|       -73.990501|       40.751266|         CRD|        7.5|     

In [6]:
def get_grid_cell(lat, lon, cell_size_m):
    if lat is None or lon is None:
        return None
    origin_lat, origin_lon = 41.474937, -74.913585
    lat_per_cell = cell_size_m / 111000.0
    lon_per_cell = cell_size_m / (111000.0 * math.cos(math.radians(origin_lat)))
    lat_offset = math.floor((lat - origin_lat) / lat_per_cell) + 1
    lon_offset = math.floor((lon - origin_lon) / lon_per_cell) + 1
    return f"{lat_offset}.{lon_offset}"

grid_cell_500_udf = udf(lambda lat, lon: get_grid_cell(lat, lon, 500), StringType())
grid_cell_250_udf = udf(lambda lat, lon: get_grid_cell(lat, lon, 250), StringType())

# Grid transformation
df_grid = cleaned_df \
    .withColumn("pickup_grid_500", grid_cell_500_udf(col("pickup_latitude"), col("pickup_longitude"))) \
    .withColumn("dropoff_grid_500", grid_cell_500_udf(col("dropoff_latitude"), col("dropoff_longitude"))) \
    .withColumn("pickup_grid_250", grid_cell_250_udf(col("pickup_latitude"), col("pickup_longitude"))) \
    .withColumn("dropoff_grid_250", grid_cell_250_udf(col("dropoff_latitude"), col("dropoff_longitude"))) \
    .filter(col("pickup_grid_500").isNotNull() & col("dropoff_grid_500").isNotNull() &
            col("pickup_grid_250").isNotNull() & col("dropoff_grid_250").isNotNull())

print("Grid Cell Sample:")
df_grid.select("pickup_grid_500", "dropoff_grid_500", "pickup_grid_250", "dropoff_grid_250").show(5)

Grid Cell Sample:
+---------------+----------------+---------------+----------------+
|pickup_grid_500|dropoff_grid_500|pickup_grid_250|dropoff_grid_250|
+---------------+----------------+---------------+----------------+
|       -160.157|        -160.154|       -320.313|        -321.308|
|       -160.157|        -160.155|       -320.314|        -321.309|
|       -156.158|        -166.152|       -312.316|        -332.303|
|       -152.157|        -159.155|       -304.314|        -319.309|
|       -160.156|        -158.157|       -320.311|        -316.314|
+---------------+----------------+---------------+----------------+
only showing top 5 rows



# Query 1 - Part 1

In [8]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Define a fixed reference timestamp
reference_time = df_grid.agg(F.max("dropoff_datetime")).collect()[0][0]

# Filter the data for trips that occurred in the last 30 minutes relative to the reference time
time_limit = F.lit(reference_time) - F.expr("INTERVAL 30 MINUTES")

# Filter the DataFrame to include only trips completed in the last 30 minutes from the reference time
df_recent = df_grid.filter(F.col("dropoff_datetime") >= time_limit)

# Perform the groupBy aggregation to find frequent routes within the last 30 minutes
route_counts = df_recent \
    .groupBy("pickup_grid_500", "dropoff_grid_500") \
    .agg(F.count("*").alias("ride_count"))

# Rank routes by their frequency
window_spec = Window.orderBy(F.col("ride_count").desc())

# Apply the window function to rank the routes
top_10_routes_static = route_counts \
    .withColumn("rank", F.rank().over(window_spec)) \
    .filter(F.col("rank") <= 10)

# Select only the relevant columns for output: start cell, end cell, and number of rides
top_10_routes_static = top_10_routes_static.select(
    F.col("pickup_grid_500").alias("start_cell"),
    F.col("dropoff_grid_500").alias("end_cell"),
    "ride_count"
)

# Show the top 10 frequent routes
top_10_routes_static.show(10)

+----------+--------+----------+
|start_cell|end_cell|ride_count|
+----------+--------+----------+
|  -156.160|-159.154|         2|
|  -154.155|-153.157|         2|
|  -153.160|-157.158|         2|
|  -149.158|-156.155|         1|
|  -157.157|-153.156|         1|
|  -170.151|-157.160|         1|
|  -149.158|-148.159|         1|
|  -156.175|-165.153|         1|
|  -154.161|-162.156|         1|
|  -157.159|-157.157|         1|
+----------+--------+----------+
only showing top 10 rows



# Query 1 - Part 2

In [9]:
df_grid = df_grid.withColumn("event_time", col("dropoff_datetime"))

# Apply watermarking to track events dynamically
windowed_routes = df_grid \
    .withWatermark("event_time", "30 minutes") \
    .groupBy(window("event_time", "30 minutes"), "pickup_grid_500", "dropoff_grid_500") \
    .agg(count("*").alias("ride_count"))

# Rank routes dynamically within each 30-minute window
window_spec = Window.partitionBy("window").orderBy(col("ride_count").desc())

top_routes = windowed_routes \
    .withColumn("rank", rank().over(window_spec)) \
    .filter(col("rank") <= 10)

# Processing delay
top_routes = top_routes.withColumn("delay", (current_timestamp() - col("window.start")).cast("long"))

final_result = top_routes.select(
    col("window.start").alias("pickup_datetime"),
    col("window.end").alias("dropoff_datetime"),
    col("pickup_grid_500").alias("start_cell"),
    col("dropoff_grid_500").alias("end_cell"),
    col("delay")
)

final_result.show(10)

+-------------------+-------------------+----------+--------+---------+
|    pickup_datetime|   dropoff_datetime|start_cell|end_cell|    delay|
+-------------------+-------------------+----------+--------+---------+
|2013-01-01 02:00:00|2013-01-01 02:30:00|  -152.157|-150.157|386371750|
|2013-01-01 02:00:00|2013-01-01 02:30:00|  -157.158|-164.153|386371750|
|2013-01-01 02:00:00|2013-01-01 02:30:00|  -144.161|-145.161|386371750|
|2013-01-01 02:00:00|2013-01-01 02:30:00|  -151.157|-134.165|386371750|
|2013-01-01 02:00:00|2013-01-01 02:30:00|  -151.160|-159.157|386371750|
|2013-01-01 02:00:00|2013-01-01 02:30:00|  -155.161|-156.161|386371750|
|2013-01-01 02:00:00|2013-01-01 02:30:00|  -156.154|-165.154|386371750|
|2013-01-01 02:00:00|2013-01-01 02:30:00|  -156.158|-150.159|386371750|
|2013-01-01 02:00:00|2013-01-01 02:30:00|  -156.158|-167.159|386371750|
|2013-01-01 02:00:00|2013-01-01 02:30:00|  -157.155|-159.158|386371750|
+-------------------+-------------------+----------+--------+---

# Query 2 Part 1

In [11]:
from pyspark.sql import Window
from pyspark.sql.functions import col, median, count, when, rank

# Add profit column and 250m grid cells
profit_df = cleaned_df \
    .withColumn("profit", col("fare_amount") + col("tip_amount")) \
    .withColumn("pickup_grid_250", grid_cell_250_udf(col("pickup_latitude"), col("pickup_longitude"))) \
    .withColumn("dropoff_grid_250", grid_cell_250_udf(col("dropoff_latitude"), col("dropoff_longitude"))) \
    .filter(col("pickup_grid_250").isNotNull() & col("dropoff_grid_250").isNotNull())

median_profit = profit_df \
    .groupBy("pickup_grid_250") \
    .agg(median("profit").alias("median_profit"))

empty_taxis = profit_df \
    .groupBy("medallion", "dropoff_grid_250", "dropoff_datetime") \
    .agg(count("*").alias("trip_count")) \
    .groupBy("dropoff_grid_250") \
    .agg(count("medallion").alias("empty_taxis"))

profitability = median_profit \
    .join(empty_taxis, median_profit.pickup_grid_250 == empty_taxis.dropoff_grid_250, "left_outer") \
    .na.fill({"empty_taxis": 0}) \
    .withColumn("profitability",
                when(col("empty_taxis") > 0, col("median_profit") / col("empty_taxis"))
                .otherwise(col("median_profit"))) \
    .select(col("pickup_grid_250").alias("cell_id"), "empty_taxis", "median_profit", "profitability")

window_spec_profit = Window.orderBy(col("profitability").desc())
top_10_profit_static = profitability \
    .withColumn("rank", rank().over(window_spec_profit)) \
    .filter(col("rank") <= 10) \
    .select("cell_id", "empty_taxis", "median_profit", "profitability", "rank")

# Show results
print("Query 2 Part 1 - Top 10 Profitable Areas:")
top_10_profit_static.show(10)

Query 2 Part 1 - Top 10 Profitable Areas:
+--------+-----------+-------------+-------------+----+
| cell_id|empty_taxis|median_profit|profitability|rank|
+--------+-----------+-------------+-------------+----+
|-128.590|          0|       504.77|       504.77|   1|
|-386.228|          0|        300.0|        300.0|   2|
|-256.372|          0|        296.4|        296.4|   3|
|-173.480|          1|        288.0|        288.0|   4|
|-365.113|          1|        288.0|        288.0|   4|
| -387.75|          0|        273.0|        273.0|   6|
|-157.512|          0|        270.0|        270.0|   7|
|-332.283|          1|        260.0|        260.0|   8|
|-307.454|          0|        242.0|        242.0|   9|
|-186.456|          0|        241.2|        241.2|  10|
+--------+-----------+-------------+-------------+----+



# Query 2 Part 2

In [17]:
previous_top_10_profit = []

from pyspark.sql import Window
from pyspark.sql.functions import col, lit, when, rank, expr, current_timestamp, unix_timestamp, lead
from pyspark.sql import functions as F
from datetime import datetime  # ✅ Needed for delay calculation


def process_profitability_batch(batch_df, batch_id):
    global previous_top_10_profit
    if batch_df.isEmpty():
        return

    # Get latest timestamp
    latest_time = batch_df.agg(F.max("dropoff_datetime")).collect()[0][0]

    # Calculate profit
    profit_df = batch_df \
        .withColumn("profit", col("fare_amount") + col("tip_amount")) \
        .withColumn("pickup_grid_250", grid_cell_250_udf(col("pickup_latitude"), col("pickup_longitude"))) \
        .withColumn("dropoff_grid_250", grid_cell_250_udf(col("dropoff_latitude"), col("dropoff_longitude"))) \
        .filter(col("pickup_grid_250").isNotNull() & col("dropoff_grid_250").isNotNull())

    # Profit filter (last 15 mins)
    profit_df_filtered = profit_df.filter(col("pickup_datetime") >= (lit(latest_time) - expr("INTERVAL 15 MINUTES")))

    # Median profit
    median_profit = profit_df_filtered.groupBy("pickup_grid_250") \
        .agg(F.expr("percentile_approx(profit, 0.5)").alias("median_profit"))

    # Empty taxis = dropoffs in last 30 mins with no next pickup
    empty_df = profit_df \
        .filter(col("dropoff_datetime") >= (lit(latest_time) - expr("INTERVAL 30 MINUTES"))) \
        .withColumn("next_pickup", lead("pickup_datetime").over(Window.partitionBy("medallion").orderBy("dropoff_datetime"))) \
        .filter(col("next_pickup").isNull() | (col("next_pickup") > col("dropoff_datetime"))) \
        .groupBy("dropoff_grid_250") \
        .agg(F.countDistinct("medallion").alias("empty_taxis"))

    # Join profit + empty taxis → profitability
    profitability = median_profit \
        .join(empty_df, median_profit.pickup_grid_250 == empty_df.dropoff_grid_250, "left_outer") \
        .na.fill({"empty_taxis": 0}) \
        .withColumn("profitability", 
            when(col("empty_taxis") > 0, col("median_profit") / col("empty_taxis"))
            .otherwise(col("median_profit"))) \
        .select(col("pickup_grid_250").alias("cell_id"), "empty_taxis", "median_profit", "profitability")

    # Top 10 by profitability
    ranked = profitability.withColumn("rank", rank().over(Window.orderBy(col("profitability").desc()))) \
        .filter(col("rank") <= 10).orderBy("rank").collect()

    # Check if top 10 changed
    current_top_10 = [r["cell_id"] for r in ranked]
    if current_top_10 != previous_top_10_profit:
        previous_top_10_profit = current_top_10

        # Format output
        output = [str(latest_time)] * 2  # pickup_datetime, dropoff_datetime
        for i in range(10):
            if i < len(ranked):
                r = ranked[i]
                output.extend([
                    r["cell_id"],
                    str(r["empty_taxis"]),
                    str(round(r["median_profit"], 2)),
                    str(round(r["profitability"], 4))
                ])
            else:
                output.extend(["NULL", "NULL", "NULL", "NULL"])

        # Compute delay (simplified for demo)
        delay = int((datetime.now() - latest_time).total_seconds())
        output.append(str(delay))

        print("Updated Top 10 Profitable Areas:")
        print(",".join(output))


In [18]:
stream_df = spark.readStream \
    .option("header", False) \
    .schema(schema) \
    .csv("sorted_data_1gb/")


In [None]:
query2_stream = stream_df.writeStream \
    .foreachBatch(process_profitability_batch) \
    .outputMode("append") \
    .start()

query2_stream.awaitTermination()