# Project 2

### Imports

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.functions import unix_timestamp
from pyspark.sql.functions import max, col, unix_timestamp, lit

### Query 0 - Data Cleansing and Setup

In [3]:
spark = SparkSession.builder \
    .appName("Project") \
    .getOrCreate()

In [4]:

df_small = spark.read.csv("input/sorted_data_smaller.csv", header=False, inferSchema=True)

In [5]:
columns = [
    "medallion",
    "hack_license",
    "pickup_datetime",
    "dropoff_datetime",
    "trip_time_in_secs",
    "trip_distance",
    "pickup_longitude",
    "pickup_latitude",
    "dropoff_longitude",
    "dropoff_latitude",
    "payment_type",
    "fare_amount",
    "surcharge",
    "mta_tax",
    "tip_amount",
    "tolls_amount",
    "total_amount"
]
df_small = df_small.toDF(*columns)

In [5]:
df_small.show(5)  # See the first 5 rows
df_small.printSchema()

+--------------------+--------------------+-------------------+-------------------+-----------------+-------------+----------------+---------------+-----------------+----------------+------------+-----------+---------+-------+----------+------------+------------+
|           medallion|        hack_license|    pickup_datetime|   dropoff_datetime|trip_time_in_secs|trip_distance|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|payment_type|fare_amount|surcharge|mta_tax|tip_amount|tolls_amount|total_amount|
+--------------------+--------------------+-------------------+-------------------+-----------------+-------------+----------------+---------------+-----------------+----------------+------------+-----------+---------+-------+----------+------------+------------+
|5EE2C4D3BF57BDB45...|E96EF8F6E6122591F...|2013-01-01 00:00:09|2013-01-01 00:00:36|               26|          0.1|       -73.99221|      40.725124|       -73.991646|       40.726658|         CSH|        2.5|

In [6]:

# Remove rows with missing or 0.0 coordinates
df_clean = df_small.filter(
    (col("pickup_longitude").isNotNull()) & (col("pickup_longitude") != 0.0) &
    (col("pickup_latitude").isNotNull()) & (col("pickup_latitude") != 0.0) &
    (col("dropoff_longitude").isNotNull()) & (col("dropoff_longitude") != 0.0) &
    (col("dropoff_latitude").isNotNull()) & (col("dropoff_latitude") != 0.0)
)

# Remove rows with missing medallions or licenses
df_clean = df_clean.filter(
    (col("medallion").isNotNull()) & (col("medallion") != "") &
    (col("hack_license").isNotNull()) & (col("hack_license") != "")
)
# Tme Model - this supports time-based queries
df_clean = df_clean.withColumn("pickup_ts", unix_timestamp("pickup_datetime")) \
                   .withColumn("dropoff_ts", unix_timestamp("dropoff_datetime"))

In [7]:
print("Original row count:", df_small.count())
print("Cleaned row count:", df_clean.count())

Original row count: 14432092
Cleaned row count: 14186504


### Query 1: Frequent Routes

In [8]:
import math
from pyspark.sql.functions import col, udf, desc
from pyspark.sql.types import StringType

def to_cell(lat, lon):
    if not (40.5 <= lat <= 41.8 and -74.25 <= lon <= -73.7):
        return None
    lat0 = 41.474937
    lon0 = -74.913585
    meters_per_deg_lat = 111320
    meters_per_deg_lon = 40075000 * math.cos(math.radians(lat0)) / 360
    cell_x = int((lon - lon0) * meters_per_deg_lon / 500) + 1
    cell_y = int((lat0 - lat) * meters_per_deg_lat / 500) + 1
    if 1 <= cell_x <= 300 and 1 <= cell_y <= 300:
        return f"{cell_x}.{cell_y}"
    return None

to_cell_udf = udf(to_cell, StringType())

df_filtered = df_clean.withColumn("pickup_cell", to_cell_udf(col("pickup_latitude"), col("pickup_longitude"))) \
                      .withColumn("dropoff_cell", to_cell_udf(col("dropoff_latitude"), col("dropoff_longitude"))) \
                      .filter(col("pickup_cell").isNotNull() & col("dropoff_cell").isNotNull())

routes = df_filtered.groupBy("pickup_cell", "dropoff_cell").count()

top10_routes = routes.orderBy(desc("count")).limit(10)

top10_routes.select(
    col("pickup_cell").alias("start_cell"),
    col("dropoff_cell").alias("end_cell"),
    col("count").alias("Number_of_Rides")
).show(truncate=False)


+----------+--------+---------------+
|start_cell|end_cell|Number_of_Rides|
+----------+--------+---------------+
|155.160   |154.162 |2200           |
|154.162   |155.160 |2042           |
|156.159   |154.162 |2004           |
|157.161   |154.162 |1935           |
|154.162   |156.161 |1854           |
|154.162   |156.159 |1763           |
|154.162   |157.161 |1751           |
|158.159   |157.161 |1711           |
|155.160   |156.159 |1606           |
|154.162   |155.164 |1578           |
+----------+--------+---------------+



In [9]:
### PART 2


In [15]:
from datetime import timedelta
import time

# As the data isn't a stream meaning we don't have a Kafka or something set up, this is the next best thing: It starts sliding the window to imitate 
# the streaming data that is coming and updates the routes accordingly.

# Uncomment this line to achieve longer run time with full data (i stopped at 23 minutes)
# df_sample = df_clean


df_sample = df_clean.limit(10000)

df_pd = df_sample.withColumn("pickup_cell", to_cell_udf(col("pickup_latitude"), col("pickup_longitude"))) \
                 .withColumn("dropoff_cell", to_cell_udf(col("dropoff_latitude"), col("dropoff_longitude"))) \
                 .filter(col("pickup_cell").isNotNull() & col("dropoff_cell").isNotNull()) \
                 .orderBy("dropoff_datetime") \
                 .select("pickup_datetime", "dropoff_datetime", "pickup_cell", "dropoff_cell") \
                 .collect()

print("Collected sample into memory:", len(df_pd), "rows")

window_minutes = 30
results = []
prev_top10_keys = []

for i, row in enumerate(df_pd):
    now = row["dropoff_datetime"]
    pickup_time = row["pickup_datetime"]
    window_start = now - timedelta(minutes=window_minutes)

    j = i
    while j >= 0 and df_pd[j]["dropoff_datetime"] >= window_start:
        j -= 1
    window_rows = df_pd[j+1:i+1]

    route_counts = {}
    for r in window_rows:
        key = (r["pickup_cell"], r["dropoff_cell"])
        route_counts[key] = route_counts.get(key, 0) + 1

    top10 = sorted(route_counts.items(), key=lambda x: -x[1])[:10]
    top10_keys = [route for route, _ in top10]

    if top10_keys != prev_top10_keys:
        prev_top10_keys = top10_keys.copy()

        while len(top10) < 10:
            top10.append(((None, None), None))

        delay = round(time.time() - time.mktime(now.timetuple()), 3)

        row_out = [pickup_time, now]
        for (start, end), count in top10:
            row_out.extend([start, end, count])
        row_out.append(delay)
        results.append(row_out)

        if len(results) <= 5:
            print(f"\nUpdate #{len(results)}")
            print("Pickup:", pickup_time)
            print("Dropoff:", now)
            for idx, ((s, e), c) in enumerate(top10, 1):
                print(f"  Route {idx}: {s} → {e} — {c} rides")
            print("Delay:", delay, "seconds")

print("\nDone. Total updates triggered:", len(results))

if len(results) > 5:
    print("\nLast 5 updates:")
    for row in results[-5:]:
        print(f"\nPickup: {row[0]}, Dropoff: {row[1]}")
        for i in range(2, 32, 3):
            print(f"  Route {(i - 2)//3 + 1}: {row[i]} → {row[i+1]} — {row[i+2]} rides")
        print("Delay:", row[-1], "seconds")


Collected sample into memory: 9997 rows

Update #1
Pickup: 2013-01-01 00:00:09
Dropoff: 2013-01-01 00:00:36
  Route 1: 154.167 → 154.167 — 1 rides
  Route 2: None → None — None rides
  Route 3: None → None — None rides
  Route 4: None → None — None rides
  Route 5: None → None — None rides
  Route 6: None → None — None rides
  Route 7: None → None — None rides
  Route 8: None → None — None rides
  Route 9: None → None — None rides
  Route 10: None → None — None rides
Delay: 385920091.993 seconds

Update #2
Pickup: 2013-01-01 00:00:00
Dropoff: 2013-01-01 00:03:00
  Route 1: 154.167 → 154.167 — 1 rides
  Route 2: 151.172 → 158.152 — 1 rides
  Route 3: None → None — None rides
  Route 4: None → None — None rides
  Route 5: None → None — None rides
  Route 6: None → None — None rides
  Route 7: None → None — None rides
  Route 8: None → None — None rides
  Route 9: None → None — None rides
  Route 10: None → None — None rides
Delay: 385919947.993 seconds

Update #3
Pickup: 2013-01-01 00:02

### Query 2: Profitable areas

This query aims to identify areas that are currently most profitable for taxi drivers. The
profitability of an area is determined by dividing the area's profit by the number of empty
taxis in that area within the last 15 minutes. The profit originating from an area is computed
by calculating the median fare + tip for trips that started in the area and ended within the last
15 minutes. The number of empty taxis in an area is the sum of taxis with a drop-off location
less than 30 minutes ago and no following pickup.

#### Part 1
The result stream of the query must be

pickup_datetime, dropoff_datetime, profitable_cell_id_1,
empty_taxies_in_cell_id, median_profit_in_cell_id, profitability_of_cell,

Report only the 10 most profitable areas

In [8]:
import math
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def to_cell_250(lat, lon):
    if lat is None or lon is None:
        return None
    lat0 = 41.474937
    lon0 = -74.913585
    meters_per_deg_lat = 111320
    meters_per_deg_lon = 40075000 * math.cos(math.radians(lat0)) / 360
    cell_x = int((lon - lon0) * meters_per_deg_lon / 250) + 1
    cell_y = int((lat0 - lat) * meters_per_deg_lat / 250) + 1
    if 1 <= cell_x <= 600 and 1 <= cell_y <= 600:
        return f"{cell_x}.{cell_y}"
    return None

to_cell_250_udf = udf(to_cell_250, StringType())

df_enriched = df_clean.withColumn("pickup_cell", to_cell_250_udf(col("pickup_latitude"), col("pickup_longitude")))

In [9]:
df_enriched.show(5)  # See the first 5 rows

+--------------------+--------------------+-------------------+-------------------+-----------------+-------------+----------------+---------------+-----------------+----------------+------------+-----------+---------+-------+----------+------------+------------+----------+----------+-----------+
|           medallion|        hack_license|    pickup_datetime|   dropoff_datetime|trip_time_in_secs|trip_distance|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|payment_type|fare_amount|surcharge|mta_tax|tip_amount|tolls_amount|total_amount| pickup_ts|dropoff_ts|pickup_cell|
+--------------------+--------------------+-------------------+-------------------+-----------------+-------------+----------------+---------------+-----------------+----------------+------------+-----------+---------+-------+----------+------------+------------+----------+----------+-----------+
|5EE2C4D3BF57BDB45...|E96EF8F6E6122591F...|2013-01-01 00:00:09|2013-01-01 00:00:36|               26|     

In [10]:
df_profit = df_enriched.withColumn("profit", col("fare_amount") + col("tip_amount"))

In [11]:
df_profit.show(5)

+--------------------+--------------------+-------------------+-------------------+-----------------+-------------+----------------+---------------+-----------------+----------------+------------+-----------+---------+-------+----------+------------+------------+----------+----------+-----------+------+
|           medallion|        hack_license|    pickup_datetime|   dropoff_datetime|trip_time_in_secs|trip_distance|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|payment_type|fare_amount|surcharge|mta_tax|tip_amount|tolls_amount|total_amount| pickup_ts|dropoff_ts|pickup_cell|profit|
+--------------------+--------------------+-------------------+-------------------+-----------------+-------------+----------------+---------------+-----------------+----------------+------------+-----------+---------+-------+----------+------------+------------+----------+----------+-----------+------+
|5EE2C4D3BF57BDB45...|E96EF8F6E6122591F...|2013-01-01 00:00:09|2013-01-01 00:00:36|  

In [14]:
from datetime import timedelta
import time
from pyspark.sql.functions import expr
from pyspark.sql.functions import desc

# Collect a sample of rows ordered by dropoff_ts to simulate event-time progression
df_sample = df_profit.limit(10000) # lets take all the rows from the data since we already have a subset of the whole data.
rows = df_sample.orderBy("dropoff_ts").select("pickup_datetime", "dropoff_datetime", "dropoff_ts", "pickup_cell", "profit").collect()

results = []
prev_top10_keys = []

for i, row in enumerate(rows):
    # Use each row's drop-off time as the current event time (in Unix seconds)
    current_event_ts = row["dropoff_ts"]
    profit_window_start = current_event_ts - 15 * 60  # last 15 minutes
    empty_window_start = current_event_ts - 30 * 60   # last 30 minutes

    # Filter data within the respective windows
    df_profit_window = df_profit.filter(col("dropoff_ts") >= profit_window_start)
    df_empty_window = df_enriched.filter(col("dropoff_ts") >= empty_window_start)
    
    # Aggregate median profit per cell in the profit window
    median_profit_df = df_profit_window.groupBy("pickup_cell") \
        .agg(expr("percentile_approx(profit, 0.5)").alias("median_profit"))
    
    # Aggregate empty taxi counts per cell in the empty window (using dropoff_cell if available,
    # or you may assume pickup_cell==dropoff_cell for simplicity)
    # Here, we assume the same cell function applies for drop-off coordinates.
    df_empty_window = df_empty_window.withColumn("dropoff_cell", to_cell_250_udf(col("dropoff_latitude"), col("dropoff_longitude")))
    from pyspark.sql.functions import countDistinct
    empty_taxis_df = df_empty_window.groupBy("dropoff_cell") \
        .agg(countDistinct("medallion").alias("empty_taxis"))
    
    # Join on cell (assuming pickup_cell matches dropoff_cell for profit calculation)
    # what do we do if the cells dont match up?
    # what do we do if the pickup_cell is not the dropoff_cell? it way well not be the case?
    df_profitability = median_profit_df.join(
        empty_taxis_df,
        median_profit_df.pickup_cell == empty_taxis_df.dropoff_cell,
        "inner"
    ).select(
        median_profit_df.pickup_cell.alias("cell_id"),
        "median_profit",
        "empty_taxis"
    )
    
    # Compute profitability for each cell
    # i guess this is based on the equation that was given in the original task description
    df_profitability = df_profitability.withColumn("profitability", col("median_profit") / col("empty_taxis"))
    
    # Get the top 10 cells by profitability for this event
    top10_cells = df_profitability.orderBy(desc("profitability")).limit(10)

In [None]:
top10_cells.limit(10).collect()

Py4JJavaError: An error occurred while calling o550257.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 9 in stage 25.0 failed 1 times, most recent failure: Lost task 9.0 in stage 25.0 (TID 188) (10.10.53.106 executor driver): java.net.SocketException: Connection reset by peer: socket write error
	at java.net.SocketOutputStream.socketWrite0(Native Method)
	at java.net.SocketOutputStream.socketWrite(Unknown Source)
	at java.net.SocketOutputStream.write(Unknown Source)
	at java.io.BufferedOutputStream.flushBuffer(Unknown Source)
	at java.io.BufferedOutputStream.write(Unknown Source)
	at java.io.DataOutputStream.write(Unknown Source)
	at java.io.FilterOutputStream.write(Unknown Source)
	at org.apache.spark.api.python.PythonRDD$.write$1(PythonRDD.scala:310)
	at org.apache.spark.api.python.PythonRDD$.$anonfun$writeIteratorToStream$1(PythonRDD.scala:322)
	at org.apache.spark.api.python.PythonRDD$.$anonfun$writeIteratorToStream$1$adapted(PythonRDD.scala:322)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:322)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$PythonUDFWriterThread.writeIteratorToStream(PythonUDFRunner.scala:58)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:451)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:282)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: java.net.SocketException: Connection reset by peer: socket write error
	at java.net.SocketOutputStream.socketWrite0(Native Method)
	at java.net.SocketOutputStream.socketWrite(Unknown Source)
	at java.net.SocketOutputStream.write(Unknown Source)
	at java.io.BufferedOutputStream.flushBuffer(Unknown Source)
	at java.io.BufferedOutputStream.write(Unknown Source)
	at java.io.DataOutputStream.write(Unknown Source)
	at java.io.FilterOutputStream.write(Unknown Source)
	at org.apache.spark.api.python.PythonRDD$.write$1(PythonRDD.scala:310)
	at org.apache.spark.api.python.PythonRDD$.$anonfun$writeIteratorToStream$1(PythonRDD.scala:322)
	at org.apache.spark.api.python.PythonRDD$.$anonfun$writeIteratorToStream$1$adapted(PythonRDD.scala:322)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:322)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$PythonUDFWriterThread.writeIteratorToStream(PythonUDFRunner.scala:58)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:451)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:282)


#### Part 2

The resulting stream of the query must provide the 10 most profitable areas in the subsequent
format:

pickup_datetime, dropoff_datetime, profitable_cell_id_1,
empty_taxies_in_cell_id_1, median_profit_in_cell_id_1,
profitability_of_cell_1, ... , profitable_cell_id_10,
empty_taxies_in_cell_id_10, median_profit_in_cell_id_10,
profitability_of_cell_10, delay

With attribute names containing cell_id_1 corresponding to the most profitable cell and
attribute containing cell_id_10 corresponding to the 10th most profitable cell. If less than 10
cells can be identified within the last 30 min, then NULL is to be returned for all cells that
lack data. Query results must be updated whenever the 10 most profitable areas change. The
pickup_datetime dropoff_datetime in the output are the timestamps of the trip report that
triggered the change.

The attribute “delay” captures the time delay between reading the input event that triggered
the output and the time when the output is produced. Participants must determine the delay
using the current system time right after reading the input and right before writing the output.
This attribute will be used in the evaluation of the submission.

Note: We use the same numbering scheme as for query 1 but with a different resolution. In
query two we assume a cell size of 250m X 250m, i.e., the area to be considered spans from
cell 1.1 to cell 600.600

In [None]:
from datetime import timedelta
import time
from pyspark.sql.functions import expr, desc, col, countDistinct
from pyspark.sql import Row

# Sample limited data (already a subset), simulate event-time progression
df_sample = df_profit.limit(10000)
rows = df_sample.orderBy("dropoff_ts").select("pickup_datetime", "dropoff_datetime", "dropoff_ts", "pickup_cell", "profit").collect()

results = []
prev_top10_keys = []

for i, row in enumerate(rows):
    current_event_ts = row["dropoff_ts"]
    profit_window_start = current_event_ts - 15 * 60  # last 15 min
    empty_window_start = current_event_ts - 30 * 60   # last 30 min

    # Profit window: calculate median profit per pickup cell
    df_profit_window = df_profit.filter(col("dropoff_ts") >= profit_window_start)
    median_profit_df = df_profit_window.groupBy("pickup_cell") \
        .agg(expr("percentile_approx(profit, 0.5)").alias("median_profit"))

    # Empty taxi window: count distinct medallions per dropoff cell
    df_empty_window = df_enriched.filter(col("dropoff_ts") >= empty_window_start) \
        .withColumn("dropoff_cell", to_cell_250_udf(col("dropoff_latitude"), col("dropoff_longitude")))

    empty_taxis_df = df_empty_window.groupBy("dropoff_cell") \
        .agg(countDistinct("medallion").alias("empty_taxis"))

    # Join profit and empty counts on matching cells (pickup_cell == dropoff_cell)
    df_profitability = median_profit_df.join(
        empty_taxis_df,
        median_profit_df.pickup_cell == empty_taxis_df.dropoff_cell,
        "inner"
    ).select(
        median_profit_df.pickup_cell.alias("cell_id"),
        "median_profit",
        "empty_taxis"
    ).withColumn(
        "profitability", col("median_profit") / col("empty_taxis")
    )

    # Get top 10 profitable cells
    top10_cells = df_profitability.orderBy(desc("profitability")).limit(10)
    top10_list = top10_cells.collect()

    # Fill in remaining slots with None if fewer than 10 found
    while len(top10_list) < 10:
        top10_list.append(Row(cell_id=None, median_profit=None, empty_taxis=None, profitability=None))

    top10_keys = [cell.cell_id for cell in top10_list]

    if top10_keys != prev_top10_keys:
        prev_top10_keys = top10_keys.copy()
        delay = round(time.time() - current_event_ts, 3)

        result_row = {
            "pickup_datetime": row["pickup_datetime"],
            "dropoff_datetime": row["dropoff_datetime"]
        }

        for idx, cell in enumerate(top10_list, 1):
            result_row[f"profitable_cell_id_{idx}"] = cell.cell_id
            result_row[f"empty_taxies_in_cell_id_{idx}"] = cell.empty_taxis
            result_row[f"median_profit_in_cell_id_{idx}"] = cell.median_profit
            result_row[f"profitability_of_cell_{idx}"] = cell.profitability

        result_row["delay"] = delay
        results.append(result_row)

        # Display first few updates only for demonstration
        if len(results) <= 3:
            print(f"\nUpdate #{len(results)}")
            for key, value in result_row.items():
                print(f"{key}: {value}")
