## Project Template

In [None]:
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from delta import *
from delta.tables import *

from pyspark.context import SparkContext
from pyspark.conf import SparkConf
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import col

# Session builder with the Delta Lake SQL extension and catalog enabled
builder = (
    SparkSession.builder
    .appName("ProjectII")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

# configure_spark_with_delta_pip() comes from the `delta` package imported above;
# it receives the builder and hands back a builder that getOrCreate() is called on.
spark = configure_spark_with_delta_pip(builder).getOrCreate()

Be sure to start the stream on Kafka!

In [2]:
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, IntegerType, FloatType

# TO MODIFY FOR YOUR SCHEMA
# (field name, Spark type class) pairs, listed in schema order.
_SCHEMA_FIELDS = [
    ("medallion", StringType),
    ("hack_license", StringType),
    ("vendor_id", StringType),
    ("rate_code", StringType),
    ("store_and_fwd_flag", StringType),
    ("pickup_datetime", TimestampType),
    ("dropoff_datetime", TimestampType),
    ("passenger_count", IntegerType),
    ("trip_time_in_secs", IntegerType),
    ("trip_distance", FloatType),
    ("pickup_longitude", FloatType),
    ("pickup_latitude", FloatType),
    ("dropoff_longitude", FloatType),
    ("dropoff_latitude", FloatType),
    ("timestamp", TimestampType),
]

# Every field is nullable (third StructField argument is True).
schema = StructType([
    StructField(field_name, field_type(), True)
    for field_name, field_type in _SCHEMA_FIELDS
])

In [None]:
kafka_server = "kafka1:9092"
from pyspark.sql.functions import col, from_json

lines = (spark.readStream                          # Get the DataStreamReader
  .format("kafka")                                 # Specify the source format as "kafka"
  .option("kafka.bootstrap.servers", kafka_server) # Configure the Kafka server name and port
  .option("subscribe", "stock")                    # Subscribe to the "stock" Kafka topic
  .option("startingOffsets", "earliest")           # The start point when a query is started
  .option("maxOffsetsPerTrigger", 100)             # Rate limit on max offsets per trigger interval
  .load()                                          # Load the DataFrame
)

# The Kafka source exposes the message payload in the binary `value` column, so it
# has to be decoded and parsed before "parsed_value.*" can be selected.
# NOTE(review): assumes every message value is a JSON object matching `schema` - confirm
# against the producer.
df = (lines
  .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))
  .select("parsed_value.*")
)


### The project starts here

In [3]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from delta import *
from delta.tables import *
from dotenv import load_dotenv
import os
import uuid
import time

from pyspark.sql import SparkSession
from delta import *
from delta.tables import *

# Spark session used by the project queries below.
# NOTE(review): getOrCreate() returns an already running session when there is one,
# so settings such as spark.jars.packages may not take effect if the template cell
# above has already started Spark in this kernel - confirm on a fresh kernel.
spark = (
    SparkSession.builder
    .appName("TaxiUtilizationDashboard")
    .config("spark.sql.streaming.metricsEnabled", "true")
    .config("spark.sql.execution.streaming.source.maxOffsetsPerTrigger", 200)
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.1.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.sql.catalogImplementation", "hive")
    .getOrCreate()
)

In [5]:
csv_file_path = "./split/"

# File-based streaming reader: CSV files with a header row, parsed with `schema`,
# at most one new file consumed per trigger.
csv_stream_reader = (
    spark.readStream
    .format("csv")
    .schema(schema)
    .option("header", "true")
    .option("maxFilesPerTrigger", 1)
)

stream_df = csv_stream_reader.load(csv_file_path)

In [6]:
import os
import time

# If the metastore table is created before any data exists, the stream fails because
# the table is registered with an empty schema - so wait for data first.
def create_table_if_exists(output_path, table_name, max_attempts=60, poll_interval=1):
    """Register ``table_name`` as a Delta table over ``output_path`` once data exists.

    Polls ``output_path`` until it contains at least one ``.parquet`` file and a
    non-empty ``_delta_log`` directory, then issues ``CREATE TABLE IF NOT EXISTS``
    pointing at that directory.

    Args:
        output_path: Directory the Delta sink writes to.
        table_name: Name of the metastore table to create.
        max_attempts: Number of polls before giving up (default 60).
        poll_interval: Seconds to sleep before each poll (default 1), so the
            default timeout is about 60 seconds.

    Returns:
        True if the CREATE TABLE statement ran successfully, False if no data
        appeared (or the statement kept failing) within ``max_attempts`` polls.
    """
    delta_log_path = os.path.join(output_path, "_delta_log")
    for _attempt in range(max_attempts):
        time.sleep(poll_interval)
        try:
            has_parquet = any(".parquet" in name for name in os.listdir(output_path))
            data_exists = has_parquet and len(os.listdir(delta_log_path)) > 0
        except OSError:
            # The sink has not created the directory (or its _delta_log) yet; poll again.
            continue
        if not data_exists:
            continue

        print("data exists")
        # The table must point at the data directory, not at a path named after the table.
        location = os.path.abspath(output_path)
        try:
            spark.sql(f"CREATE TABLE IF NOT EXISTS {table_name} USING DELTA LOCATION '{location}'")
        except Exception as exc:
            # Stay best-effort (keep polling) but report the failure instead of hiding it.
            print(f"could not create table {table_name}: {exc}")
            continue
        return True
    return False

In [7]:
from pyspark.sql.functions import col, window, when, expr, unix_timestamp


In [4]:
# Print the batch id, then the first 10 rows of each micro-batch
def show_first_10_values(batch_df, batch_id):
    """Print ``batch_id`` followed by the first 10 rows of ``batch_df``, untruncated.

    The (batch_df, batch_id) signature is the one the notebook passes to
    ``foreachBatch``.
    """
    header = f"Batch ID: {batch_id}"
    print(header)
    batch_df.show(n=10, truncate=False)

## [Query 1] Utilization over a window of 5, 10, and 15 minutes per taxi/driver. This can be computed by computing the idle time per taxi. How does it change? Is there an optimal window?

In [8]:
# Idle time in seconds, derived from the record timestamp and the drop-off time
event_ts_secs = unix_timestamp(col("timestamp"))
dropoff_ts_secs = unix_timestamp(col("dropoff_datetime"))
trip_times_missing = col("pickup_datetime").isNull() | col("dropoff_datetime").isNull()

# NOTE(review): when either trip time is missing, idle_time becomes the raw epoch
# seconds of `timestamp`, which will dominate any sum - confirm this is intended.
stream_df = stream_df.withColumn(
    "idle_time",
    when(trip_times_missing, event_ts_secs).otherwise(event_ts_secs - dropoff_ts_secs),
)

In [9]:
checkpoint_path = "./checkpoint"
output_path_utilization = "./streaming/question1/"
table_name_utilization = "taxi_utilization"

# Sliding-window lengths (in minutes) compared in Query 1
utilization_window_minutes = (5, 10, 15)

os.makedirs(output_path_utilization, exist_ok=True)

# Watermark on the record timestamp: rows arriving more than 20 minutes behind the
# latest event time seen are treated as too late and dropped from the aggregation.
query1_df = stream_df.withWatermark("timestamp", "20 minutes")


def idle_time_per_window(events_df, minutes):
    """Sum ``idle_time`` per medallion over sliding windows of ``minutes`` minutes.

    Windows slide every 5 seconds; the result column is named
    ``idle_time_<minutes>min``.
    """
    return (
        events_df
        .groupBy(col("medallion"), window(col("timestamp"), f"{minutes} minutes", "5 seconds"))
        .agg({"idle_time": "sum"})
        .withColumnRenamed("sum(idle_time)", f"idle_time_{minutes}min")
    )


def start_utilization_query(windowed_df, minutes):
    """Start the streaming query that persists and displays one window size.

    Calling foreachBatch replaces whatever sink format()/option("path") selected, so
    each micro-batch is appended to the Delta directory explicitly inside the
    callback; that directory is what create_table_if_exists() waits for.
    """
    sink_path = os.path.join(output_path_utilization, f"{minutes}min")

    def write_and_show(batch_df, batch_id):
        # Cache so the batch is computed once for the two actions below.
        batch_df.persist()
        try:
            batch_df.write.format("delta").mode("append").save(sink_path)
            show_first_10_values(batch_df, batch_id)
        finally:
            batch_df.unpersist()

    return (
        windowed_df.writeStream
        .outputMode("append")
        .foreachBatch(write_and_show)
        .option("checkpointLocation", checkpoint_path + f"/utilization_{minutes}min")
        .trigger(processingTime="5 seconds")
        .start()
    )


# One aggregation and one running query per window length
windowed_counts = {
    minutes: idle_time_per_window(query1_df, minutes)
    for minutes in utilization_window_minutes
}
utilization_queries = {
    minutes: start_utilization_query(agg_df, minutes)
    for minutes, agg_df in windowed_counts.items()
}

# Individual names kept for cells that refer to a specific window size
windowed_counts_5min, windowed_counts_10min, windowed_counts_15min = (
    windowed_counts[minutes] for minutes in utilization_window_minutes
)
query_5min, query_10min, query_15min = (
    utilization_queries[minutes] for minutes in utilization_window_minutes
)

# Create one table per window size once its data exists. Each window gets its own
# table name: with a shared name, CREATE TABLE IF NOT EXISTS would only ever
# register the first directory.
for minutes in utilization_window_minutes:
    create_table_if_exists(
        os.path.join(output_path_utilization, f"{minutes}min"),
        f"{table_name_utilization}_{minutes}min",
    )

## [Query 2] The average time it takes for a taxi to find its next fare (trip) per destination borough. This can be computed by finding the time difference, e.g. in seconds, between a trip's drop-off and the next trip's pick-up within a given unit of time.

In [10]:
!pip install geopandas 

In [3]:
import pandas as pd
import geopandas as gpd
from shapely.geometry import Point
from pyspark.sql.functions import col, when, unix_timestamp, lead, avg, pandas_udf
# Borough boundaries, read once from the GeoJSON file and used by get_borough()
boroughs_gdf = gpd.read_file('nyc-boroughs.geojson')

# Look up the borough for a coordinate pair using geopandas
def get_borough(longitude, latitude):
    """Return the ``borough`` of the first row whose geometry contains the point.

    Rows of ``boroughs_gdf`` are checked in order; "Unknown" is returned when no
    geometry contains the (longitude, latitude) point.
    """
    location = Point(longitude, latitude)
    containing_boroughs = (
        row['borough']
        for _, row in boroughs_gdf.iterrows()
        if row['geometry'].contains(location)
    )
    return next(containing_boroughs, "Unknown")

# Pandas UDF wrapper so get_borough() can be applied to Spark columns
@pandas_udf(StringType())
def get_borough_udf(longitude, latitude):
    """Map paired longitude/latitude values to borough names, one get_borough() call per pair."""
    borough_names = [get_borough(lon, lat) for lon, lat in zip(longitude, latitude)]
    return pd.Series(borough_names)

In [4]:
# Preview the first rows of the borough GeoDataFrame loaded from the GeoJSON file
boroughs_gdf.head()

In [12]:
# remember you can register another stream
checkpoint_path = "./checkpoint"
output_path_avg_fare_time = "./streaming/question2/"
table_name = "avg_time_taxi_utilization_per_borough"
# Query-specific alias, in line with the table_name_* variables of the other queries
table_name_avg_fare_time = table_name

# exist_ok avoids the separate exists() check and makes the cell safe to re-run
os.makedirs(checkpoint_path, exist_ok=True)
os.makedirs(output_path_avg_fare_time, exist_ok=True)

In [18]:
from pyspark.sql.functions import col, when, unix_timestamp, lead, avg, pandas_udf, expr, window
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, IntegerType, FloatType
from pyspark.sql import Window
import pandas as pd
import os
import time

# Drop-off side: tag every trip with its destination borough and watermark it on the
# drop-off time so late data is bounded.
query2_df = (
    stream_df
    .withColumn("destination_borough", get_borough_udf(col("dropoff_longitude"), col("dropoff_latitude")))
    .withWatermark("dropoff_datetime", "20 minutes")
)

# Pick-up side: the same trip stream, watermarked on the pick-up time.
# NOTE(review): this name was referenced but never defined; a 20-minute watermark is
# assumed to mirror the drop-off side - confirm.
csv_stream_with_watermark = stream_df.withWatermark("pickup_datetime", "20 minutes")

# Pair each drop-off with the later pick-ups of the same taxi within one hour.
# - Inner join: the former left outer join was immediately filtered down to matched rows.
# - The upper bound is written as a timestamp + interval comparison, the form shown
#   for time-range conditions in the stream-stream join documentation.
# NOTE(review): this matches every pick-up in the following hour, not only the first
# one, so the average below covers all such pairs - confirm this is acceptable.
joined_stream = query2_df.alias("a").join(
    csv_stream_with_watermark.alias("b"),
    (col("a.medallion") == col("b.medallion")) &
    (col("b.pickup_datetime") > col("a.dropoff_datetime")) &
    (col("b.pickup_datetime") < col("a.dropoff_datetime") + expr("INTERVAL 1 HOUR")),
    "inner"
).select(
    col("a.medallion"),
    col("a.destination_borough"),
    col("a.dropoff_datetime"),
    col("b.pickup_datetime").alias("next_pickup_datetime")
).withColumn(
    "time_to_next_fare",
    unix_timestamp(col("next_pickup_datetime")) - unix_timestamp(col("dropoff_datetime"))
).filter(
    col("time_to_next_fare").isNotNull() & (col("time_to_next_fare") > 0)
)

# Compute the average time to next fare per destination borough and 1-hour window
# (parenthesised so the chained calls form one expression).
# NOTE(review): a windowed aggregation after a stream-stream join may drop rows the
# global watermark considers late on older Spark versions - confirm.
avg_time_to_next_fare = (
    joined_stream
    .groupBy("destination_borough", window(col("dropoff_datetime"), "1 hour"))
    .agg(avg("time_to_next_fare").alias("avg_time_to_next_fare"))
)


def write_and_show_avg_fare_time(batch_df, batch_id):
    """Append the micro-batch to the Query 2 Delta directory, then show its first 10 rows."""
    # Cache so the batch is computed once for the two actions below.
    batch_df.persist()
    try:
        batch_df.write.format("delta").mode("append").save(output_path_avg_fare_time)
        show_first_10_values(batch_df, batch_id)
    finally:
        batch_df.unpersist()


# Append mode is used because the query contains a stream-stream join; the batch is
# also written to Delta so create_table_if_exists() has data to find.
query_avg_fare_time = (avg_time_to_next_fare.writeStream
    .outputMode("append")
    .foreachBatch(write_and_show_avg_fare_time)
    .option("checkpointLocation", checkpoint_path + "/avg_fare_time")
    .trigger(processingTime="5 minutes")
    .start())

# Create the table if data exists
create_table_if_exists(output_path_avg_fare_time, table_name)

## [Query 3] The number of trips that started and ended within the same borough in the last hour

In [14]:
from pyspark.sql.functions import col, when, unix_timestamp, pandas_udf, window, count

# Root directory for streaming checkpoints; the Query 3 writer appends "/same_borough" to it
checkpoint_path = "./checkpoint/"
# Directory used as the output path of the Query 3 stream
output_path_same_borough = "./streaming/question3"
# Table name passed to create_table_if_exists for Query 3
table_name_same_borough = "same_borough"

In [15]:
# Derive pickup and dropoff borough using the UDF
query3_df = stream_df.withColumn("pickup_borough", get_borough_udf(col("pickup_longitude"), col("pickup_latitude")))
query3_df = query3_df.withColumn("dropoff_borough", get_borough_udf(col("dropoff_longitude"), col("dropoff_latitude")))

# Add watermark to handle late data
query3_df = query3_df.withWatermark("pickup_datetime", "1 hour")

# Filter trips that started and ended in the same borough
same_borough_trips = query3_df.filter(col("pickup_borough") == col("dropoff_borough"))

# Windowed aggregation: trips per borough in 1-hour windows of the pick-up time
same_borough_count = same_borough_trips.groupBy(
    window(col("pickup_datetime"), "1 hour"),
    col("pickup_borough")
).agg(count("*").alias("trip_count"))


def write_and_show_same_borough(batch_df, batch_id):
    """Append the micro-batch to the Query 3 Delta directory, then show its first 10 rows."""
    # Cache so the batch is computed once for the two actions below.
    batch_df.persist()
    try:
        batch_df.write.format("delta").mode("append").save(output_path_same_borough)
        show_first_10_values(batch_df, batch_id)
    finally:
        batch_df.unpersist()


# Calling foreachBatch replaces any sink chosen with format()/option("path"), so the
# batch is persisted inside the callback instead. It is written as Delta because
# create_table_if_exists() waits for a _delta_log and registers the table USING DELTA.
query_same_borough = (same_borough_count.writeStream
    .outputMode("append")
    .foreachBatch(write_and_show_same_borough)
    .option("checkpointLocation", checkpoint_path + "/same_borough")
    .trigger(processingTime="1 minutes")
    .start())

# Create the table if data exists
create_table_if_exists(output_path_same_borough, table_name_same_borough)

In [37]:
import glob
import os

import pandas as pd

# Directory holding the Parquet part files of the 5-minute utilization output
parquet_dir = "./streaming/question1/5min"

# Part-file names contain a generated identifier that differs between runs, so the
# files are discovered instead of hard-coding one name.
parquet_file_paths = sorted(glob.glob(os.path.join(parquet_dir, "part-*.parquet")))
if not parquet_file_paths:
    raise FileNotFoundError(f"No Parquet part files found in {parquet_dir}")

# Single path kept for cells that still expect one
parquet_file_path = parquet_file_paths[0]

# Read all part files into one pandas DataFrame
df = pd.concat((pd.read_parquet(path) for path in parquet_file_paths), ignore_index=True)
df

## [Query 4] The number of trips that started in one borough and ended in another one in the last hour

In [17]:
from pyspark.sql.functions import col, when, unix_timestamp, pandas_udf, window, count

# Root directory for streaming checkpoints; the Query 4 writer appends "/different_borough" to it
checkpoint_path = "./checkpoint/"
# Directory used as the output path of the Query 4 stream
output_path_different_borough = "./streaming/question4"
# Table name passed to create_table_if_exists for Query 4
table_name_different_borough = "different_borough"

In [19]:
# Derive pickup and dropoff borough using the UDF
query4_df = stream_df.withColumn("pickup_borough", get_borough_udf(col("pickup_longitude"), col("pickup_latitude")))
query4_df = query4_df.withColumn("dropoff_borough", get_borough_udf(col("dropoff_longitude"), col("dropoff_latitude")))

# Add watermark to handle late data
query4_df = query4_df.withWatermark("pickup_datetime", "1 hour")

# Filter trips that started in one borough and ended in another
different_borough_trips = query4_df.filter(col("pickup_borough") != col("dropoff_borough"))

# Windowed aggregation: trips per (pickup, dropoff) borough pair in 1-hour windows
different_borough_count = different_borough_trips.groupBy(
    window(col("pickup_datetime"), "1 hour"),
    col("pickup_borough"),
    col("dropoff_borough")
).agg(count("*").alias("trip_count"))

# Write the stream in append mode as a Delta table: create_table_if_exists() waits
# for a _delta_log directory and registers the table USING DELTA, which plain
# Parquet output would never satisfy.
query_different_borough = (different_borough_count.writeStream
    .outputMode("append")  # Only finalized windows are emitted
    .format("delta")
    .option("path", output_path_different_borough)
    .option("checkpointLocation", checkpoint_path + "/different_borough")
    # .foreachBatch(show_first_10_values)
    .trigger(processingTime="1 minutes")
    .start())

# Create the table if data exists
create_table_if_exists(output_path_different_borough, table_name_different_borough)

# Wait for the termination signal for the query
# query_different_borough.awaitTermination()