# Project 2

## Setup

In [1]:
from pyspark.sql import SparkSession
import time
from delta import *
from pyspark.sql.functions import from_json, col, floor, window, concat_ws
from pyspark.sql.types import StructType, StringType, DoubleType, TimestampType

In [2]:
# Initialize SparkSession with the Kafka JAR
spark = SparkSession.builder \
    .appName("KafkaTaxiStream") \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.4.0") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()
print("Spark Session created successfully!")

Spark Session created successfully!


In [3]:
# Define Schema for Incoming Data
schema = StructType() \
    .add("medallion", StringType()) \
    .add("hack_license", StringType()) \
    .add("pickup_datetime", TimestampType()) \
    .add("dropoff_datetime", TimestampType()) \
    .add("trip_time_in_secs", DoubleType()) \
    .add("trip_distance", DoubleType()) \
    .add("pickup_longitude", DoubleType()) \
    .add("pickup_latitude", DoubleType()) \
    .add("dropoff_longitude", DoubleType()) \
    .add("dropoff_latitude", DoubleType()) \
    .add("payment_type", StringType()) \
    .add("fare_amount", DoubleType()) \
    .add("surcharge", DoubleType()) \
    .add("mta_tax", DoubleType()) \
    .add("tip_amount", DoubleType()) \
    .add("tolls_amount", DoubleType())

In [4]:
# Read Kafka stream
taxi_stream = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "taxi-trips") \
    .option("startingOffsets", "earliest") \
    .load()

# Parse JSON
parsed_taxi_stream = taxi_stream.selectExpr("CAST(value AS STRING) as json_str") \
    .select(from_json(col("json_str"), schema).alias("data")) \
    .select("data.*")

# Query 0: Data Cleansing and setup

In [5]:
# remove malformed and invalid data
cleaned_taxi_stream = parsed_taxi_stream \
    .filter("medallion IS NOT NULL AND hack_license IS NOT NULL") \
    .filter("pickup_longitude != 0.0 AND pickup_latitude != 0.0") \
    .filter("dropoff_longitude != 0.0 AND dropoff_latitude != 0.0")

In [6]:
query = cleaned_taxi_stream.writeStream \
    .outputMode("append") \
    .format("memory") \
    .queryName("taxi_trips_cleaned") \
    .start()

In [7]:
# View cleansed stream in notebook
time.sleep(1)
spark.sql("SELECT * FROM taxi_trips_cleaned").show(truncate=False)

+---------+------------+---------------+----------------+-----------------+-------------+----------------+---------------+-----------------+----------------+------------+-----------+---------+-------+----------+------------+
|medallion|hack_license|pickup_datetime|dropoff_datetime|trip_time_in_secs|trip_distance|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|payment_type|fare_amount|surcharge|mta_tax|tip_amount|tolls_amount|
+---------+------------+---------------+----------------+-----------------+-------------+----------------+---------------+-----------------+----------------+------------+-----------+---------+-------+----------+------------+
+---------+------------+---------------+----------------+-----------------+-------------+----------------+---------------+-----------------+----------------+------------+-----------+---------+-------+----------+------------+



# Query 1: Frequent Routes

## Part 1

In [8]:
from pyspark.sql.functions import udf

In [9]:
reference_lat = 41.474937
reference_lon = -74.913585
total_cells = 300

# Cell sizes from http://www.debs2015.org/call-grand-challenge.html
cell_size_lat_deg = 0.004491556  # 500m south
cell_size_lon_deg = 0.005986     # 500m east

In [10]:
# Main function to calculate how far the cell location is from the origin
def get_cell_id(lat, lon):
    if lat is None or lon is None:
        return None
    try:
        dx = int((lon - reference_lon) / cell_size_lon_deg) + 1 # how many cells east
        dy = int((reference_lat - lat) / cell_size_lat_deg) + 1 # how many cells south
        if 1 <= dx <= total_cells and 1 <= dy <= total_cells: # validate
            return f"{dx}.{dy}"
        else:
            return None
    except:
        return None

# create spark udf
get_cell_udf = udf(get_cell_id, StringType())

In [11]:
# Take cleaned taxi stream data and convert pickup and dropoff locations into start and end cell IDs
stream_with_cells = cleaned_taxi_stream \
    .withColumn("start_cell_id", get_cell_udf("pickup_latitude", "pickup_longitude")) \
    .withColumn("end_cell_id", get_cell_udf("dropoff_latitude", "dropoff_longitude")) \
    .filter("start_cell_id IS NOT NULL AND end_cell_id IS NOT NULL")

In [12]:
# Count the number of rides for each route in the last 30 minutes 
frequent_routes = stream_with_cells \
    .withWatermark("dropoff_datetime", "30 minutes") \
    .groupBy(
        window(col("dropoff_datetime"), "30 minutes"),
        col("start_cell_id"),
        col("end_cell_id")
    ) \
    .count() \
    .select("start_cell_id", "end_cell_id", "count") \
    .orderBy(col("count").desc())

In [13]:
# store results
frequent_routes.writeStream \
    .outputMode("complete") \
    .format("memory") \
    .queryName("top_routes") \
    .start()

<pyspark.sql.streaming.query.StreamingQuery at 0x7f6b8ade6ad0>

In [14]:
#time.sleep(15)
spark.sql("SELECT * FROM top_routes LIMIT 10").show(truncate=False)

+-------------+-----------+-----+
|start_cell_id|end_cell_id|count|
+-------------+-----------+-----+
+-------------+-----------+-----+



## Part 2

# Query 2: Profitable Areas

## Part 1

In [15]:
from pyspark.sql import functions as F
from pyspark.sql.functions import when
from pyspark.sql.functions import col, expr
from pyspark.sql.functions import current_timestamp
from pyspark.sql.functions import window, max as max_

In [16]:
# divide query 1 degrees by 2
cell_size_lat_deg = 0.002245778   # 250m south
cell_size_lon_deg = 0.002993      # 250m east
total_cells = 600

get_cell_udf = udf(get_cell_id, StringType())

In [17]:
from pyspark.sql.functions import col, expr, window

# Add profit and cell columns
taxi_stream_v2 = cleaned_taxi_stream \
    .withColumn("pickup_time", col("pickup_datetime")) \
    .withColumn("dropoff_time", col("dropoff_datetime")) \
    .withColumn("profit", col("fare_amount") + col("tip_amount")) \
    .withColumn("pickup_cell", get_cell_udf("pickup_latitude", "pickup_longitude")) \
    .withColumn("dropoff_cell", get_cell_udf("dropoff_latitude", "dropoff_longitude"))

In [20]:
from pyspark.sql.functions import col, expr, window, unix_timestamp

# Watermarked dropoffs and pickups
dropoffs = taxi_stream_v2 \
    .filter("dropoff_cell IS NOT NULL") \
    .withWatermark("dropoff_time", "30 minutes") \
    .selectExpr("medallion AS medallion_drop", "dropoff_time", "dropoff_cell")

pickups = taxi_stream_v2 \
    .filter("pickup_cell IS NOT NULL") \
    .withWatermark("pickup_time", "30 minutes") \
    .selectExpr("medallion AS medallion_pick", "pickup_time")


# Left join to simulate left_anti
joined_dropoffs = dropoffs.join(
    pickups,
    expr("""
        medallion_drop = medallion_pick AND
        pickup_time > dropoff_time AND
        pickup_time <= dropoff_time + interval 30 minutes
    """),
    "leftOuter"
)

In [21]:
# Writing dropoffs to memory
dropoffs_query = dropoffs.writeStream \
    .outputMode("append") \
    .format("memory") \
    .queryName("dropoffs") \
    .start()

# Writing pickups to memory
pickups_query = pickups.writeStream \
    .outputMode("append") \
    .format("memory") \
    .queryName("pickups") \
    .start()

# Print the results
joined_query = joined_dropoffs.writeStream \
    .outputMode("append") \
    .format("memory") \
    .queryName("joined") \
    .start()

In [23]:
recent_dropoffs = joined_dropoffs \
    .filter((unix_timestamp(current_timestamp()) - unix_timestamp("dropoff_time") <= 1800) & col("pickup_time").isNull()) \
    .select("dropoff_time", "dropoff_cell")

# Step 2: Group by dropoff cell and apply window to count the empty taxis
empty_taxi_count = recent_dropoffs \
    .groupBy(
        window("dropoff_time", "15 minutes", "1 minute"),  # 15-minute window with 1-minute slide
        col("dropoff_cell").alias("cell_id")
    ) \
    .agg(expr("count(*) AS empty_taxis"))

In [24]:
recent_dropoffs = recent_dropoffs.writeStream \
    .outputMode("append") \
    .format("memory") \
    .queryName("recent_dropoffs") \
    .start()

empty_taxi_count_query = empty_taxi_count.writeStream \
    .outputMode("append") \
    .format("memory") \
    .queryName("empty_taxi_count") \
    .start()

+------------+------------+
|dropoff_time|dropoff_cell|
+------------+------------+
+------------+------------+

+------+-------+-----------+
|window|cell_id|empty_taxis|
+------+-------+-----------+
+------+-------+-----------+



In [26]:
profit_stream = taxi_stream_v2 \
    .filter("pickup_cell IS NOT NULL") \
    .withWatermark("dropoff_time", "20 minutes") \
    .groupBy(
        window("dropoff_time", "15 minutes", "1 minute"),
        col("pickup_cell").alias("cell_id")
    ).agg(expr("percentile_approx(profit, 0.5) AS median_profit"))

# Join and FLATTEN the window for Delta
profitability_stream = profit_stream.join(
    empty_taxi_count,
    ["window", "cell_id"]
).withColumn(
    "profitability", col("median_profit") / col("empty_taxis")
).select(
    col("window.start").cast("timestamp").alias("pickup_datetime"),
    col("window.end").cast("timestamp").alias("dropoff_datetime"),
    "cell_id",
    "empty_taxis",
    "median_profit",
    "profitability"
)

def write_batch_to_delta(batch_df, batch_id):
    if batch_df.isEmpty():
        print(f"[Batch {batch_id}] â›” Skipped empty batch")
        return

    # Only write if DataFrame has records
    batch_df.select(
        col("window.start").alias("pickup_datetime"),
        col("window.end").alias("dropoff_datetime"),
        "cell_id",
        "empty_taxis",
        "median_profit",
        "profitability"
    ).write \
     .format("delta") \
     .mode("append") \
     .save("./output/profitable_areas_delta")


# Start the stream
query = profitability_stream.writeStream \
    .outputMode("append") \
    .option("checkpointLocation", "./checkpoints/profitable_areas") \
    .foreachBatch(write_batch_to_delta) \
    .start()

In [27]:
query.isActive  # Should return True
query.lastProgress  # Shows the most recent batch info (after one has run)
query.status  # Will tell you "Waiting for data" or "Processing new data"


{'message': 'Initializing sources',
 'isDataAvailable': False,
 'isTriggerActive': False}

In [28]:
spark.sql("""
    SELECT pickup_latitude, pickup_longitude
    FROM taxi_trips_cleaned
    WHERE pickup_latitude IS NOT NULL AND pickup_longitude IS NOT NULL
    LIMIT 5
""").show(truncate=False)


+---------------+----------------+
|pickup_latitude|pickup_longitude|
+---------------+----------------+
+---------------+----------------+



In [29]:
dropoffs.writeStream \
    .format("console") \
    .option("truncate", False) \
    .start()

pickups.writeStream \
    .format("console") \
    .option("truncate", False) \
    .start()

<pyspark.sql.streaming.query.StreamingQuery at 0x7f6c1d1e32d0>

In [30]:
df = spark.read.format("delta").load("./output/profitable_areas_delta")
df.orderBy("profitability", ascending=False).show()


AnalysisException: [PATH_NOT_FOUND] Path does not exist: output/profitable_areas_delta.

In [None]:
spark.sql("DROP TABLE IF EXISTS profitable_areas")
spark.sql("""
    CREATE TABLE profitable_areas
    USING DELTA
    LOCATION './output/profitable_areas_delta'
""")
spark.sql("SELECT * FROM profitable_areas ORDER BY profitability DESC LIMIT 10").show()


In [None]:
# # Create profit stream steps by calculating start cell, profit, apply 15-min watermark and compute median profit.
# profit_stream = cleaned_taxi_stream \
#     .withColumn("start_cell_id", get_cell_udf("pickup_latitude", "pickup_longitude")) \
#     .filter("start_cell_id IS NOT NULL") \
#     .withColumn("profit", when((col("fare_amount") >= 0) & (col("tip_amount") >= 0), col("fare_amount") + col("tip_amount"))) \
#     .withWatermark("dropoff_datetime", "15 minutes") \
#     .groupBy(window("dropoff_datetime", "15 minutes"), col("start_cell_id")) \
#     .agg(
#         expr("percentile_approx(profit, 0.5)").alias("median_profit"),
#         expr("first(pickup_datetime)").alias("pickup_datetime"),
#         expr("first(dropoff_datetime)").alias("dropoff_datetime")
#     ) \
#     .selectExpr("start_cell_id as cell_id", "median_profit", "window.end as profit_window_end", "pickup_datetime", "dropoff_datetime")

# profit_stream.writeStream \
#     .format("memory") \
#     .queryName("profit_stream") \
#     .outputMode("complete") \
#     .trigger(processingTime="1 seconds") \
#     .start()

In [None]:
# # Create empty_taxies_stream by saving pickup table 
# cleaned_taxi_stream \
#     .select("medallion", "pickup_datetime") \
#     .filter("medallion IS NOT NULL") \
#     .withWatermark("pickup_datetime", "30 minutes") \
#     .writeStream \
#     .format("memory") \
#     .queryName("pickup_table") \
#     .outputMode("append") \
#     .start()

# pickup_static = spark.read.table("pickup_table")

# # get all drop off in last 30 minutes
# dropoffs = cleaned_taxi_stream \
#     .withColumn("dropoff_cell_id", get_cell_udf("dropoff_latitude", "dropoff_longitude")) \
#     .filter("dropoff_cell_id IS NOT NULL") \
#     .select("medallion", "dropoff_datetime", "dropoff_cell_id") \
#     .withWatermark("dropoff_datetime", "30 minutes")

# # calculate empty taxies based on datetimes
# empty_taxis = dropoffs.alias("d").join(
#     pickup_static.alias("p"),
#     (col("d.medallion") == col("p.medallion")) & (col("p.pickup_datetime") > col("d.dropoff_datetime")),
#     how="left_anti"
# )

# # calculate empty taxes per cell
# empty_taxis_per_cell = empty_taxis \
#     .groupBy(window("dropoff_datetime", "30 minutes"), col("dropoff_cell_id")) \
#     .agg(expr("approx_count_distinct(medallion)").alias("empty_taxis")) \
#     .selectExpr("dropoff_cell_id as cell_id", "empty_taxis", "window.end as empty_window_end")

# #read to memory
# empty_taxis_per_cell.writeStream \
#     .format("memory") \
#     .queryName("empty_taxis_stream") \
#     .outputMode("complete") \
#     .trigger(processingTime="30 seconds") \
#     .start()

In [None]:
# from pyspark.sql.functions import col, expr, window

# # Prepare the pickup stream
# pickup_stream = cleaned_taxi_stream \
#     .select("medallion", "pickup_datetime") \
#     .filter("medallion IS NOT NULL") \
#     .withWatermark("pickup_datetime", "30 minutes")

# # Prepare the dropoff stream with cell ID
# dropoff_stream = cleaned_taxi_stream \
#     .withColumn("dropoff_cell_id", get_cell_udf("dropoff_latitude", "dropoff_longitude")) \
#     .filter("dropoff_cell_id IS NOT NULL") \
#     .select("medallion", "dropoff_datetime", "dropoff_cell_id") \
#     .withWatermark("dropoff_datetime", "30 minutes")

# # Perform stream-stream join to find matching pickups (future pickups after a dropoff)
# dropoff_with_future_pickups = dropoff_stream.alias("d").join(
#     pickup_stream.alias("p"),
#     (col("d.medallion") == col("p.medallion")) &
#     (col("p.pickup_datetime") > col("d.dropoff_datetime")) &
#     (col("p.pickup_datetime") <= expr("d.dropoff_datetime + interval 30 minutes")),
#     "inner"
# )

# # Find dropoffs without future pickups (anti-join)
# dropoff_stream_aliased = dropoff_stream.alias("d")
# pickup_stream_aliased = pickup_stream.alias("p")

# empty_taxis = dropoff_stream_aliased.join(
#     pickup_stream_aliased,
#     (col("d.medallion") == col("p.medallion")) &
#     (col("p.pickup_datetime") > col("d.dropoff_datetime")) &
#     (col("p.pickup_datetime") <= expr("d.dropoff_datetime + interval 30 minutes")),
#     how="leftOuter"
# ).filter(col("p.pickup_datetime").isNull())  # Unambiguous now



# # Count distinct empty taxis per dropoff cell in 30-minute windows
# empty_taxis_per_cell = empty_taxis \
#     .groupBy(
#         window("dropoff_datetime", "30 minutes"),
#         col("dropoff_cell_id")
#     ) \
#     .agg(expr("approx_count_distinct(medallion)").alias("empty_taxis")) \
#     .selectExpr(
#         "dropoff_cell_id as cell_id",
#         "empty_taxis",
#         "window.end as empty_window_end"
#     )

# # Write to memory for query
# empty_taxis_per_cell.writeStream \
#     .format("memory") \
#     .queryName("empty_taxis_stream") \
#     .outputMode("append") \
#     .trigger(processingTime="1 seconds") \
#     .start()


In [None]:
# # process the patches
# def process_batch_part1(_, __): # skip parameters
#     spark.sql("""
#         SELECT 
#             p.pickup_datetime, 
#             p.dropoff_datetime, 
#             p.cell_id AS profitable_cell_id,
#             e.empty_taxis AS empty_taxies_in_cell, 
#             p.median_profit AS median_profit_in_cell,
#         CASE 
#             WHEN e.empty_taxis = 0 THEN NULL 
#             ELSE p.median_profit / e.empty_taxis 
#             END AS profitability_of_cell
#         FROM profit_stream p
#         JOIN empty_taxis_stream e ON p.cell_id = e.cell_id

#     """).show(truncate=False, n=50)

# cleaned_taxi_stream.selectExpr("CAST(NULL AS STRING) as test") \
#     .writeStream \
#     .foreachBatch(process_batch_part1) \
#     .outputMode("update") \
#     .trigger(processingTime="10 seconds") \
#     .option("checkpointLocation", "/tmp/part1_checkpoint") \
#     .start()


## Part 2

In [None]:
# import
from pyspark.sql.functions import col
import time

In [None]:
def process_batch_profit(df, epoch_id):
    if df.isEmpty():
        return

    import time
    from pyspark.sql.functions import col

    # Get current time in ms
    output_time = time.time() * 1000

    # Get latest record for delay calculation
    latest_event = df.orderBy(col("processing_time").desc()).first()
    delay = output_time - latest_event["processing_time"]

    # Get top 10 profitable areas
    top_areas = spark.sql("""
        SELECT cell_id, empty_taxis, median_profit, profitability
        FROM profitable_areas
        ORDER BY profitability DESC
        LIMIT 10
    """).collect()

    # Construct result row
    result = {
        "pickup_datetime": latest_event["pickup_datetime"],
        "dropoff_datetime": latest_event["dropoff_datetime"],
        "delay": delay
    }

    for i in range(10):
        if i < len(top_areas):
            result[f"cell_id_{i+1}"] = top_areas[i]["cell_id"]
            result[f"empty_taxis_in_cell_{i+1}"] = top_areas[i]["empty_taxis"]
            result[f"median_profit_in_cell_{i+1}"] = top_areas[i]["median_profit"]
            result[f"profitability_of_cell_{i+1}"] = top_areas[i]["profitability"]
        else:
            result[f"cell_id_{i+1}"] = None
            result[f"empty_taxis_in_cell_{i+1}"] = None
            result[f"median_profit_in_cell_{i+1}"] = None
            result[f"profitability_of_cell_{i+1}"] = None

    # Convert to DataFrame and write to Delta Lake
    result_df = spark.createDataFrame([result])
    
    result_df.write \
        .format("delta") \
        .mode("overwrite") \
        .save("/mnt/output/top_profitable_areas_delta")


In [None]:
profitability_with_time.writeStream \
    .foreachBatch(process_batch_profit) \
    .outputMode("append") \
    .trigger(processingTime="1 second") \
    .start()


In [None]:
top10_df = spark.read.format("delta").load("/mnt/output/top_profitable_areas_delta")
top10_df.show(truncate=False)
