## Project Template

In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from delta import *
from delta.tables import *
import os
import uuid
import time

builder = SparkSession.builder.appName("Streaming2_practice") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.sql.streaming.statefulOperator.checkCorrectness.enabled", "false") \
    .config("spark.jars.packages", 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0') \
    .config("spark.sql.repl.eagerEval.enabled", True) \
    .config("spark.sql.adaptive.enabled", "false") \
    .config("spark.sql.debug.maxToStringFields", "10000")

spark = configure_spark_with_delta_pip(builder).getOrCreate() # type: ignore


In [2]:
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, FloatType

schema = StructType(
      [
        StructField("hack_license", StringType(), False),
        StructField("pick_up_location", StringType(), False),
        StructField("drop_off_location", StringType(), False),
        StructField("pick_up_time", TimestampType(), False),
        StructField("drop_off_time", TimestampType(), False),
        StructField("timestamp", TimestampType(), False),
      ]
    )

In [8]:
brokers = os.getenv('KAFKA_ADVERTISED_LISTENERS')
protocol = os.getenv('KAFKA_LISTENER_SECURITY_PROTOCOL_MAP') 

lines = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", brokers)
    .option("kafka.security.protocol", protocol)
    .option("subscribe", "stock")
    .option("startingOffsets", "earliest")
    .option("maxOffsetsPerTrigger", 100)
    .load()
)

parsed_lines = lines.select(F.from_json(lines.value.cast("string"), schema).alias("data")).select("data.*")
parsed_lines.writeStream.format("console").start().awaitTermination(5)


False

## [Query 1] Utilization over a window of 5, 10, and 15 minutes per taxi/driver. This can be computed by computing the idle time per taxi. How does it change? Is there an optimal window?

In [1]:
from datetime import datetime

checkpoint_path = "./checkpoint" 
output_path = "output/"
watermark_duration = "30 minutes"


# Set up queries for each window duration
queries = []
windows = [5, 10, 15]
for duration in windows:
    window_duration = f"{duration} minutes"

    df = parsed_lines.withWatermark("pick_up_time", watermark_duration)\
        .withColumn("active_time", F.unix_timestamp("drop_off_time") - F.unix_timestamp("pick_up_time"));

    df.writeStream.format("console").start().awaitTermination(5)


    df_grouped = df.groupBy(
        F.col("hack_license"), 
        F.window(F.col("pick_up_time"), window_duration)
    ).agg(
        F.sum("active_time").alias("total_active_time"),
        F.min("pick_up_time").alias("first_pick_up_time"),
        F.max("drop_off_time").alias("last_drop_off_time")
    )

    df_grouped.writeStream.format("console").start().awaitTermination(5)


    # Calculate the idle time
    idle_time_df = df_grouped.withColumn("idle_time", \
                                         (F.unix_timestamp(F.col("last_drop_off_time")) - F.unix_timestamp(F.col("first_pick_up_time"))) - F.col("total_active_time"))

    idle_time_df.writeStream.format("console").start().awaitTermination(5)


    query = idle_time_df.writeStream \
            .outputMode("append") \
            .format("delta") \
            .trigger(processingTime="120 seconds") \
            .option("checkpointLocation", f"{checkpoint_path}/duration_{duration}_min") \
            .option("path", f"{output_path}/duration_{duration}_min") \
            .option("mergeSchema", "true") \
            .start()
    queries.append(query)


NameError: name 'parsed_lines' is not defined

In [7]:
for query in queries:
    query.stop()

In [6]:
import matplotlib.pyplot as plt
import pandas as pd

checkpoint_path = "./checkpoint" 
output_path = "output/"
windows = [5, 10, 15]

# Assuming you have a function to read delta tables into a dataframe
def read_delta_table(path):
    return spark.read.format("delta").load(path).toPandas()

# Collect data from different window outputs
data_frames = {duration: read_delta_table(f"{output_path}/duration_{duration}_min") for duration in windows}
print(data_frames)

# for duration, df in data_frames.items():
#     df['window_size'] = f"{duration}min"  # Add window size as a string, e.g., '5min'
#     df['window_start'] = pd.to_datetime(df['window'].str.extract(r'\((.*),')[0])  # Optional: Extract start of window for any further temporal analysis

# # Combine all data frames into one
# combined_df = pd.concat(data_frames.values(), ignore_index=True)

# # Drop the 'window' column if no longer needed
# combined_df.drop('window', axis=1, inplace=True)

# # Pivot the DataFrame
# pivot_df = combined_df.pivot_table(index='hack_licence', columns='window_size', values='idle_time', aggfunc='mean')
# print(pivot_df)

# # Optional: Reset the index if you want 'hack_licence' as a regular column for plotting
# pivot_df.reset_index(inplace=True)

# # Example of plotting
# import matplotlib.pyplot as plt

# pivot_df.set_index('hack_licence').plot(kind='bar', figsize=(10, 7))
# plt.title('Average Idle Time per Window Size')
# plt.xlabel('Driver')
# plt.ylabel('Average Idle Time (seconds)')
# plt.xticks(rotation=0)
# plt.legend(title='Window size')
# plt.show()

AnalysisException: [PATH_NOT_FOUND] Path does not exist: output/duration_5_min.

## [Query 2] The average time it takes for a taxi to find its next fare(trip) per destination borough. This can be computed by finding the time difference, e.g. in seconds, between the trip's drop off and the next trip's pick up within a given unit of time

In [None]:
# remember you can register another stream


## [Query 3] The number of trips that started and ended within the same borough in the last hour

In [None]:
# remember you can register another stream


## [Query 4] The number of trips that started in one borough and ended in another one in the last hour