# Project 2

## Setup

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, floor, window, concat_ws
from pyspark.sql.types import StructType, StringType, DoubleType, TimestampType

In [2]:
# Initialize SparkSession with the Kafka JAR
spark = SparkSession.builder \
    .appName("KafkaTaxiStream") \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.4.0") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()
print("Spark Session created successfully!")

Spark Session created successfully!


In [3]:
json_schema = StructType() \
    .add("0", StringType()) \
    .add("1", StringType()) \
    .add("2", TimestampType()) \
    .add("3", TimestampType()) \
    .add("4", DoubleType()) \
    .add("5", DoubleType()) \
    .add("6", DoubleType()) \
    .add("7", DoubleType()) \
    .add("8", DoubleType()) \
    .add("9", DoubleType()) \
    .add("10", StringType()) \
    .add("11", DoubleType()) \
    .add("12", DoubleType()) \
    .add("13", DoubleType()) \
    .add("14", DoubleType()) \
    .add("15", DoubleType()) \
    .add("16", DoubleType())

# Define Schema for Incoming Data
schema = StructType() \
    .add("medallion", StringType()) \
    .add("hack_license", StringType()) \
    .add("pickup_datetime", TimestampType()) \
    .add("dropoff_datetime", TimestampType()) \
    .add("trip_time_in_secs", DoubleType()) \
    .add("trip_distance", DoubleType()) \
    .add("pickup_longitude", DoubleType()) \
    .add("pickup_latitude", DoubleType()) \
    .add("dropoff_longitude", DoubleType()) \
    .add("dropoff_latitude", DoubleType()) \
    .add("payment_type", StringType()) \
    .add("fare_amount", DoubleType()) \
    .add("surcharge", DoubleType()) \
    .add("mta_tax", DoubleType()) \
    .add("tip_amount", DoubleType()) \
    .add("tolls_amount", DoubleType()) \
    .add("total_amount", DoubleType())

In [4]:
raw_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "taxi-trips") \
    .option("startingOffsets", "earliest") \
    .load()


parsed_taxi_stream = raw_df.selectExpr("CAST(value AS STRING) as json_data") \
    .select(from_json("json_data", json_schema).alias("data")) \
    .select("data.*") \
    .withColumnRenamed("0", "medallion") \
    .withColumnRenamed("1", "hack_license") \
    .withColumnRenamed("2", "pickup_datetime") \
    .withColumnRenamed("3", "dropoff_datetime") \
    .withColumnRenamed("4", "trip_time_in_secs") \
    .withColumnRenamed("5", "trip_distance") \
    .withColumnRenamed("6", "pickup_longitude") \
    .withColumnRenamed("7", "pickup_latitude") \
    .withColumnRenamed("8", "dropoff_longitude") \
    .withColumnRenamed("9", "dropoff_latitude") \
    .withColumnRenamed("10", "payment_type") \
    .withColumnRenamed("11", "fare_amount") \
    .withColumnRenamed("12", "surcharge") \
    .withColumnRenamed("13", "mta_tax") \
    .withColumnRenamed("14", "tip_amount") \
    .withColumnRenamed("15", "tolls_amount") \
    .withColumnRenamed("16", "total_amount")

# Query 0: Data Cleansing and setup

In [5]:
# remove malformed and invalid data
cleaned_taxi_stream = parsed_taxi_stream \
    .filter("medallion IS NOT NULL AND hack_license IS NOT NULL") \
    .filter("pickup_longitude != 0.0 AND pickup_latitude != 0.0") \
    .filter("dropoff_longitude != 0.0 AND dropoff_latitude != 0.0")

In [6]:
query = cleaned_taxi_stream.writeStream \
    .outputMode("append") \
    .format("memory") \
    .queryName("taxi_trips_cleaned") \
    .start()

In [28]:
# View cleansed stream in notebook
spark.sql("SELECT * FROM taxi_trips_cleaned").show(truncate=False)

+--------------------------------+--------------------------------+-------------------+-------------------+-----------------+-------------+----------------+---------------+-----------------+----------------+------------+-----------+---------+-------+----------+------------+------------+
|medallion                       |hack_license                    |pickup_datetime    |dropoff_datetime   |trip_time_in_secs|trip_distance|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|payment_type|fare_amount|surcharge|mta_tax|tip_amount|tolls_amount|total_amount|
+--------------------------------+--------------------------------+-------------------+-------------------+-----------------+-------------+----------------+---------------+-----------------+----------------+------------+-----------+---------+-------+----------+------------+------------+
|5EE2C4D3BF57BDB455E74B03B89E43A7|E96EF8F6E6122591F9465376043B946D|2013-01-01 00:00:09|2013-01-01 00:00:36|26.0             |0.1        

# Query 1: Frequent Routes

## Part 1

In [8]:
from pyspark.sql.functions import udf

In [9]:
reference_lat = 41.474937
reference_lon = -74.913585
total_cells = 300

# Cell sizes from http://www.debs2015.org/call-grand-challenge.html
cell_size_lat_deg = 0.004491556  # 500m south
cell_size_lon_deg = 0.005986     # 500m east

In [10]:
# Main function to calculate how far the cell location is from the origin
def get_cell_id(lat, lon):
    if lat is None or lon is None:
        return None
    try:
        dx = int((lon - reference_lon) / cell_size_lon_deg) + 1 # how many cells east
        dy = int((reference_lat - lat) / cell_size_lat_deg) + 1 # how many cells south
        if 1 <= dx <= total_cells and 1 <= dy <= total_cells: # validate
            return f"{dx}.{dy}"
        else:
            return None
    except:
        return None

# create spark udf
get_cell_udf = udf(get_cell_id, StringType())

In [11]:
# Take cleaned taxi stream data and convert pickup and dropoff locations into start and end cell IDs
stream_with_cells = cleaned_taxi_stream \
    .withColumn("start_cell_id", get_cell_udf("pickup_latitude", "pickup_longitude")) \
    .withColumn("end_cell_id", get_cell_udf("dropoff_latitude", "dropoff_longitude")) \
    .filter("start_cell_id IS NOT NULL AND end_cell_id IS NOT NULL")

In [12]:
# Count the number of rides for each route in the last 30 minutes 
frequent_routes = stream_with_cells \
    .withWatermark("dropoff_datetime", "30 minutes") \
    .groupBy(
        window(col("dropoff_datetime"), "30 minutes"),
        col("start_cell_id"),
        col("end_cell_id")
    ) \
    .count() \
    .select("start_cell_id", "end_cell_id", "count") \
    .orderBy(col("count").desc())

In [13]:
# store results
frequent_routes.writeStream \
    .outputMode("complete") \
    .format("memory") \
    .queryName("top_routes") \
    .start()

<pyspark.sql.streaming.query.StreamingQuery at 0x7f12bbdb9250>

In [27]:
spark.sql("SELECT * FROM top_routes LIMIT 10").show(truncate=False)

+-------------+-----------+-----+
|start_cell_id|end_cell_id|count|
+-------------+-----------+-----+
|156.163      |155.162    |5    |
|155.161      |155.161    |5    |
|155.159      |155.159    |4    |
|153.162      |154.161    |3    |
|155.167      |156.166    |3    |
|160.157      |161.156    |3    |
|164.160      |164.160    |3    |
|152.168      |152.167    |3    |
|157.159      |158.160    |3    |
|155.169      |155.169    |3    |
+-------------+-----------+-----+



## Part 2

In [15]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [16]:
# Function to calculate delay
def process_batch(df, epoch_id):
    # Current time for delay calculation
    output_time = time.time() * 1000  # Convert to milliseconds
    
    if df.isEmpty():
        return
    
    # Get the latest event that triggered the update
    latest_event = df.orderBy(col("processing_time").desc()).first()
    
    # Calculate delay
    delay = output_time - latest_event["processing_time"]
    
    # Get current top 10 routes
    top_routes = spark.sql("""
        SELECT start_cell_id, end_cell_id, count
        FROM frequent_routes
        ORDER BY count DESC
        LIMIT 10
    """).collect()
    
    # Prepare the result with the 10 most frequent routes
    result = {
        "pickup_datetime": latest_event["pickup_datetime"],
        "dropoff_datetime": latest_event["dropoff_datetime"],
        "delay": delay
    }
    
    # Add all 10 routes (or NULL if fewer than 10 available)
    for i in range(10):
        if i < len(top_routes):
            result[f"start_cell_id_{i+1}"] = top_routes[i]["start_cell_id"]
            result[f"end_cell_id_{i+1}"] = top_routes[i]["end_cell_id"]
        else:
            result[f"start_cell_id_{i+1}"] = None
            result[f"end_cell_id_{i+1}"] = None
    
    # Create a DataFrame with the result
    result_df = spark.createDataFrame([result])
    
    # Append to results table
    result_df.write.mode("append").saveAsTable("top_routes_results")

# Add processing time column for delay calculation - fixed syntax
cleaned_taxi_stream = cleaned_taxi_stream.withColumn(
    "processing_time", unix_timestamp() * 1000
)

# Process stream with cell IDs
stream_with_cells = cleaned_taxi_stream \
    .withColumn("start_cell_id", get_cell_udf("pickup_latitude", "pickup_longitude")) \
    .withColumn("end_cell_id", get_cell_udf("dropoff_latitude", "dropoff_longitude")) \
    .filter("start_cell_id IS NOT NULL AND end_cell_id IS NOT NULL")

# Calculate frequent routes - store in memory table
frequent_routes_query = stream_with_cells \
    .withWatermark("dropoff_datetime", "30 minutes") \
    .groupBy(
        window(col("dropoff_datetime"), "30 minutes"),
        col("start_cell_id"),
        col("end_cell_id")
    ) \
    .count() \
    .select(
        col("start_cell_id"),
        col("end_cell_id"),
        col("count")
    ) \
    .writeStream \
    .outputMode("complete") \
    .format("memory") \
    .queryName("frequent_routes_new") \
    .start()

# Create a trigger to monitor changes in the stream
# This will trigger the output whenever any new data comes in that could affect the top 10
change_monitor = stream_with_cells \
    .withWatermark("dropoff_datetime", "30 minutes") \
    .writeStream \
    .foreachBatch(process_batch) \
    .outputMode("update") \
    .trigger(processingTime="1 second") \
    .start()

In [17]:
# Query to view the results
spark.sql("""
    SELECT 
        pickup_datetime, dropoff_datetime,
        start_cell_id_1, end_cell_id_1,
        start_cell_id_2, end_cell_id_2,
        start_cell_id_3, end_cell_id_3,
        start_cell_id_4, end_cell_id_4,
        start_cell_id_5, end_cell_id_5,
        start_cell_id_6, end_cell_id_6,
        start_cell_id_7, end_cell_id_7,
        start_cell_id_8, end_cell_id_8,
        start_cell_id_9, end_cell_id_9,
        start_cell_id_10, end_cell_id_10,
        delay
    FROM top_routes_results
    ORDER BY dropoff_datetime DESC
    LIMIT 10
""").show(truncate=False)

AnalysisException: [TABLE_OR_VIEW_NOT_FOUND] The table or view `top_routes_results` cannot be found. Verify the spelling and correctness of the schema and catalog.
If you did not qualify the name with a schema, verify the current_schema() output, or qualify the name with the correct schema and catalog.
To tolerate the error on drop use DROP VIEW IF EXISTS or DROP TABLE IF EXISTS.; line 15 pos 9;
'GlobalLimit 10
+- 'LocalLimit 10
   +- 'Sort ['dropoff_datetime DESC NULLS LAST], true
      +- 'Project ['pickup_datetime, 'dropoff_datetime, 'start_cell_id_1, 'end_cell_id_1, 'start_cell_id_2, 'end_cell_id_2, 'start_cell_id_3, 'end_cell_id_3, 'start_cell_id_4, 'end_cell_id_4, 'start_cell_id_5, 'end_cell_id_5, 'start_cell_id_6, 'end_cell_id_6, 'start_cell_id_7, 'end_cell_id_7, 'start_cell_id_8, 'end_cell_id_8, 'start_cell_id_9, 'end_cell_id_9, 'start_cell_id_10, 'end_cell_id_10, 'delay]
         +- 'UnresolvedRelation [top_routes_results], [], false


## Part 2

# Query 2: Profitable Areas

## Part 1

In [18]:
from pyspark.sql import functions as F
from pyspark.sql.functions import when
from pyspark.sql.functions import col, expr
from pyspark.sql.functions import current_timestamp
from pyspark.sql.functions import window, max as max_

In [19]:
# divide query 1 degrees by 2
cell_size_lat_deg = 0.002245778   # 250m south
cell_size_lon_deg = 0.002993      # 250m east
total_cells = 600

get_cell_udf = udf(get_cell_id, StringType())

In [20]:
profit_stream = cleaned_taxi_stream \
    .withColumn("start_cell_id", get_cell_udf("pickup_latitude", "pickup_longitude")) \
    .filter("start_cell_id IS NOT NULL") \
    .withColumn("profit", when((col("fare_amount") >= 0) & (col("tip_amount") >= 0), col("fare_amount") + col("tip_amount"))) \
    .withWatermark("dropoff_datetime", "15 minutes") \
    .groupBy(window("dropoff_datetime", "15 minutes"), col("start_cell_id")) \
    .agg(
        expr("percentile_approx(profit, 0.5)").alias("median_profit"),
        max_("pickup_datetime").alias("pickup_datetime"),
        max_("dropoff_datetime").alias("dropoff_datetime")
    ) \
    .selectExpr("start_cell_id as cell_id", "median_profit", "window.end as profit_window_end", "pickup_datetime", "dropoff_datetime")

# Write to Delta with partitioning by cell_id
profit_stream.writeStream \
    .format("delta") \
    .queryName("profit_stream") \
    .outputMode("append") \
    .partitionBy("cell_id") \
    .trigger(processingTime="30 seconds") \
    .option("checkpointLocation", "./checkpoints/profit_stream") \
    .option("path", "./output/profitable_areas_delta") \
    .start()

<pyspark.sql.streaming.query.StreamingQuery at 0x7f133fce9490>

In [21]:
# Create empty_taxies_stream by saving pickup table 
cleaned_taxi_stream \
    .select("medallion", "pickup_datetime") \
    .filter("medallion IS NOT NULL") \
    .withWatermark("pickup_datetime", "30 minutes") \
    .writeStream \
    .format("memory") \
    .queryName("pickup_table") \
    .outputMode("append") \
    .start()

pickup_static = spark.read.table("pickup_table")

# get all drop off in last 30 minutes
dropoffs = cleaned_taxi_stream \
    .withColumn("dropoff_cell_id", get_cell_udf("dropoff_latitude", "dropoff_longitude")) \
    .filter("dropoff_cell_id IS NOT NULL") \
    .select("medallion", "dropoff_datetime", "dropoff_cell_id") \
    .withWatermark("dropoff_datetime", "30 minutes")

# Calculate empty taxis based on datetimes
empty_taxis = dropoffs.alias("d").join(
    pickup_static.alias("p"),
    (col("d.medallion") == col("p.medallion")) & (col("p.pickup_datetime") > col("d.dropoff_datetime")),
    how="left_anti"
)

# Calculate empty taxis per cell
empty_taxis_per_cell = empty_taxis \
    .groupBy(window("dropoff_datetime", "30 minutes"), col("dropoff_cell_id")) \
    .agg(expr("approx_count_distinct(medallion)").alias("empty_taxis")) \
    .selectExpr("dropoff_cell_id as cell_id", "empty_taxis", "window.end as empty_window_end")

# Write empty taxis to memory
empty_taxis_per_cell.writeStream \
    .format("memory") \
    .queryName("empty_taxis_stream") \
    .outputMode("complete") \
    .trigger(processingTime="30 seconds") \
    .start()


<pyspark.sql.streaming.query.StreamingQuery at 0x7f133fcefe10>

In [22]:
# Join together and process batches
def process_batch(batch_df, batch_id):
    result_df = spark.sql("""
        SELECT 
            p.pickup_datetime, p.dropoff_datetime, p.cell_id AS profitable_cell_id,
            e.empty_taxis AS empty_taxies_in_cell, 
            p.median_profit AS median_profit_in_cell,
            CASE 
                WHEN e.empty_taxis = 0 THEN NULL 
                ELSE p.median_profit / e.empty_taxis 
            END AS profitability_of_cell
        FROM profit_stream p
        JOIN empty_taxis_stream e ON p.cell_id = e.cell_id
    """)
    result_df.show(truncate=False, n=50)

    # Write the results to Delta or Parquet
    result_df.write \
        .format("parquet") \
        .mode("append") \
        .save("./output/profitable_areas_parquet")

# Write processed batch to Delta or Parquet
cleaned_taxi_stream.selectExpr("CAST(NULL AS STRING) as test") \
    .writeStream \
    .foreachBatch(process_batch) \
    .outputMode("update") \
    .trigger(processingTime="10 seconds") \
    .start()

<pyspark.sql.streaming.query.StreamingQuery at 0x7f133fce8d10>

In [23]:
# # Read the Delta table that was written
# df = spark.read.format("delta").load("./output/profitable_areas_delta")

# # Show the top rows of the Delta data
# df.show(truncate=False)

# Read the Parquet file
df = spark.read.parquet("./output/profitable_areas_parquet")

# Show the data
df.show(truncate=False)

+-------------------+-------------------+------------------+--------------------+---------------------+---------------------+
|pickup_datetime    |dropoff_datetime   |profitable_cell_id|empty_taxies_in_cell|median_profit_in_cell|profitability_of_cell|
+-------------------+-------------------+------------------+--------------------+---------------------+---------------------+
|2013-01-01 00:18:00|2013-01-01 00:22:00|316.324           |2                   |7.0                  |3.5                  |
|2013-01-01 00:26:00|2013-01-01 00:29:00|321.314           |24                  |10.5                 |0.4375               |
|2013-01-01 00:14:00|2013-01-01 00:27:00|314.304           |10                  |13.0                 |1.3                  |
|2013-01-01 00:12:39|2013-01-01 00:14:33|312.310           |15                  |5.5                  |0.36666666666666664  |
|2013-01-01 00:19:00|2013-01-01 00:28:00|303.344           |3                   |7.5                  |2.5            

## Part 2

In [24]:
# import
from pyspark.sql.functions import col
import time

In [25]:
def process_batch_profit(df, epoch_id):
    if df.isEmpty():
        return

    import time
    from pyspark.sql.functions import col

    # Get current time in ms
    output_time = time.time() * 1000

    # Get latest record for delay calculation
    latest_event = df.orderBy(col("processing_time").desc()).first()
    delay = output_time - latest_event["processing_time"]

    # Get top 10 profitable areas
    top_areas = spark.sql("""
        SELECT cell_id, empty_taxis, median_profit, profitability
        FROM profitable_areas
        ORDER BY profitability DESC
        LIMIT 10
    """).collect()

    # Construct result row
    result = {
        "pickup_datetime": latest_event["pickup_datetime"],
        "dropoff_datetime": latest_event["dropoff_datetime"],
        "delay": delay
    }

    for i in range(10):
        if i < len(top_areas):
            result[f"cell_id_{i+1}"] = top_areas[i]["cell_id"]
            result[f"empty_taxis_in_cell_{i+1}"] = top_areas[i]["empty_taxis"]
            result[f"median_profit_in_cell_{i+1}"] = top_areas[i]["median_profit"]
            result[f"profitability_of_cell_{i+1}"] = top_areas[i]["profitability"]
        else:
            result[f"cell_id_{i+1}"] = None
            result[f"empty_taxis_in_cell_{i+1}"] = None
            result[f"median_profit_in_cell_{i+1}"] = None
            result[f"profitability_of_cell_{i+1}"] = None

    # Convert to DataFrame and write to Delta Lake
    result_df = spark.createDataFrame([result])
    
    result_df.write \
        .format("delta") \
        .mode("overwrite") \
        .save("./output/top_profitable_areas_delta")

In [26]:
profitability_with_time.writeStream \
    .foreachBatch(process_batch_profit) \
    .outputMode("append") \
    .trigger(processingTime="5 second") \
    .start()


NameError: name 'profitability_with_time' is not defined

In [None]:
top10_df = spark.read.format("delta").load("./output/top_profitable_areas_delta")
top10_df.show(truncate=False)