
# Project - PID Streaming - Tomáš Mlynář - mlynatom@fel.cvut.cz
---

<div  style="text-align: center; line-height: 0; padding-top: 20px;">
  <img src="https://raw.githubusercontent.com/animrichter/BDT_2023/master/data/assets/streaming.png" style="width: 1200px">
</div>

**TODO**

- feel free to just try it over Christmas; a consultation is available after Christmas if needed (though chat tools should be able to handle it)
- list your sources at the end of the report, for information on who uses what (e.g. Copilot, ChatGPT + version)
- soft deadline 15 January (ideally done by then) -> can be postponed after agreeing with Filas
- "over the course of a day" -> I need either a single day, or to split the data by day and interval
- a NULL vehicle type means a train!
- start by closing the stream and processing it as a batch (to inspect the data)
- later, ideally a stream running through the whole pipeline

## Assignment B
Analyze the delays of different types of vehicles (buses, trams, metro, trains) over the course of a whole day. Compare the average delay for each vehicle type and determine how it changes across 3-hour time intervals. Display the results in a chart. As a next step, identify the 3 places with the lowest average delay for each vehicle type and visualize them on a map.
Goal: Uncover patterns in the delays of different vehicle types and locate the key places where they occur.


## Data Fields

### 1. `geometry.coordinates`
- **Description**: Coordinates representing the location of the vehicle.

### 2. `properties.trip.vehicle_type.description_en`
- **Description**: Description of the vehicle type in English.
  - **Note**: If empty, it represents a train.

### 3. `properties.trip.agency_name.scheduled`
- **Description**: Agency name that currently operates the trip

### 4. `properties.trip.gtfs.trip_id`
- **Description**: Identifier of the trip in the GTFS Static feed.

### 5. `properties.trip.vehicle_registration_number`
- **Description**: Four-digit identifier of the vehicle in the system.

### 6. `properties.trip.gtfs.route_short_name`
- **Description**: Identification of the line used for the public.

### 7. `properties.last_position.delay.actual`
- **Description**: Current delay, in seconds.

### 8. `properties.last_position.origin_timestamp`
- **Description**: Time at which the position was sent from the vehicle (UTC).

### 9. `properties.last_position.shape_dist_traveled`
- **Description**: Number of kilometers traveled on the route.
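
The field names above are dotted paths into each nested, GeoJSON-like feature. The following is a minimal sketch in plain Python of how such a path resolves. The helper `get_path` and the sample record are hypothetical and invented for illustration; they are not taken from the feed.

```python
from functools import reduce


def get_path(record, dotted_path, default=None):
    """Follow a dotted path such as 'properties.trip.gtfs.trip_id' through nested dicts."""
    def step(node, key):
        # Stop descending as soon as a level is missing or is not a dict.
        return node.get(key) if isinstance(node, dict) else None

    value = reduce(step, dotted_path.split("."), record)
    return default if value is None else value


# Hypothetical sample feature; all values are invented for illustration.
sample = {
    "geometry": {"coordinates": [14.42, 50.08], "type": "Point"},
    "properties": {
        "last_position": {"delay": {"actual": 95}},
        "trip": {
            "gtfs": {"trip_id": "trip-1", "route_short_name": "22"},
            "vehicle_type": None,  # a missing vehicle type is treated as a train
        },
    },
}

delay_seconds = get_path(sample, "properties.last_position.delay.actual")
vehicle_type = get_path(
    sample, "properties.trip.vehicle_type.description_en", default="train"
)
```

The `default` argument mirrors the note on field 2: when the vehicle type description is missing, the record is treated as a train.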

## Kafka Connection settings

In [0]:
KAFKA_CLUSTER = "b-3-public.felkafkamsk.56s6v1.c2.kafka.eu-central-1.amazonaws.com:9196,b-2-public.felkafkamsk.56s6v1.c2.kafka.eu-central-1.amazonaws.com:9196,b-1-public.felkafkamsk.56s6v1.c2.kafka.eu-central-1.amazonaws.com:9196"

TOPIC = "fel-pid-topic"

#data schema for parsing raw data
SCHEMA = 'array<struct<geometry: struct<coordinates: array<double>, type: string>, properties: struct<last_position: struct<bearing: int, delay: struct<actual: int, last_stop_arrival: int, last_stop_departure: int>, is_canceled: string, last_stop: struct<arrival_time: string, departure_time: string, id: string, sequence: int>, next_stop: struct<arrival_time: string, departure_time: string, id: string, sequence: int>, origin_timestamp: string, shape_dist_traveled: string, speed: string, state_position: string, tracking: boolean>, trip: struct<agency_name: struct<real: string, scheduled: string>, cis: struct<line_id: string, trip_number: string>, gtfs: struct<route_id: string, route_short_name: string, route_type: int, trip_headsign: string, trip_id: string, trip_short_name: string>, origin_route_name: string, sequence_id: int, start_timestamp: string, vehicle_registration_number: string, vehicle_type: struct<description_cs: string, description_en: string, id: int>, wheelchair_accessible: boolean, air_conditioned: boolean, usb_chargers: boolean>>, type: string>>'

## Stream -> Batch processing

### Kafka Connection

In [0]:
#read data as a batch from kafka
raw = (
    spark
        .read #batch read; the streaming version below uses .readStream instead
        .format('kafka')
        .option('kafka.bootstrap.servers', KAFKA_CLUSTER)
        .option('subscribe', TOPIC)
        .option('startingOffsets', "earliest") #read from the earliest offset - the Kafka topic is set to retain everything
        .option('kafka.sasl.mechanism', 'SCRAM-SHA-512')
        .option('kafka.security.protocol', 'SASL_SSL')
        .option('kafka.sasl.jaas.config', 'kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="FELPIDUSER" password="dzs3c1vldy6np5";')
        .load()
)

#batch write: unlike writeStream, it needs no checkpoint location
(raw.write
   .format("delta")
   .mode("overwrite")
   .saveAsTable("fel_pid_topic_data_batch")
)

In [0]:
%sql
--view the data we got
select string(value), * from fel_pid_topic_data_batch order by `timestamp` desc

### Create Bronze Table

In [0]:
%python
from pyspark.sql.functions import from_json, col

## load data as dataframe
pid_data_df = spark.table("fel_pid_topic_data_batch")

## retype value column to string
pid_data_df = pid_data_df.withColumn("value", pid_data_df["value"].cast("string"))

## unwrap the data in value column (list of dictionaries) and have them on separate rows
pid_data_df = pid_data_df.withColumn("value", from_json(col("value"), SCHEMA))
pid_data_df = pid_data_df.selectExpr("explode(value) as value", "timestamp")

## now extract value into 3 columns - geometry, properties, type
pid_data_df = pid_data_df.withColumn("geometry", pid_data_df["value.geometry"])
pid_data_df = pid_data_df.withColumn("properties", pid_data_df["value.properties"])
pid_data_df = pid_data_df.withColumn("type", pid_data_df["value.type"])

## drop value column
pid_data_df = pid_data_df.drop("value")

#order to better see result
pid_data_df = pid_data_df.orderBy("timestamp")

#display the result -> only now something is computed
pid_data_df.display()

#save the bronze table
pid_data_df.write.format("delta").mode("overwrite").saveAsTable("fel_pid_topic_data_bronze")

### Create Silver Table

Table with flat structure
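
As a rough illustration of what "flat structure" means here, the sketch below flattens one nested feature into a single row in plain Python. The function `flatten_feature` and the sample record are hypothetical. One deliberate difference is labeled in the code: this sketch also maps an empty description to `"train"`, whereas the Spark code below only checks for null.

```python
from datetime import datetime


def flatten_feature(feature):
    """Turn one nested feature into a flat row with the silver-table column names."""
    # The coordinates are stored as [longitude, latitude].
    longitude, latitude = feature["geometry"]["coordinates"][:2]
    props = feature["properties"]
    vehicle = props["trip"].get("vehicle_type") or {}
    return {
        "latitude": latitude,
        "longitude": longitude,
        # Assumption: both a missing and an empty description mean "train".
        "vehicle_type": vehicle.get("description_en") or "train",
        "trip_id": props["trip"]["gtfs"]["trip_id"],
        "line_name": props["trip"]["gtfs"]["route_short_name"],
        "timestamp": datetime.fromisoformat(props["last_position"]["origin_timestamp"]),
        "current_delay": props["last_position"]["delay"]["actual"],
    }


# Hypothetical sample feature; all values are invented for illustration.
feature = {
    "geometry": {"coordinates": [14.42, 50.08], "type": "Point"},
    "properties": {
        "last_position": {
            "delay": {"actual": 95},
            "origin_timestamp": "2024-01-05T10:15:00+00:00",
        },
        "trip": {
            "gtfs": {"trip_id": "trip-1", "route_short_name": "22"},
            "vehicle_type": {"description_en": None},
        },
    },
}

row = flatten_feature(feature)
```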

In [0]:
## load bronze table
bronze_table = spark.table("fel_pid_topic_data_bronze")
bronze_table.display()

In [0]:
from pyspark.sql.functions import when, col, lit

silver_table = (bronze_table
                .withColumn("latitude", col("geometry.coordinates")[1])
                .withColumn("longitude", col("geometry.coordinates")[0])
                .withColumn("vehicle_type", when(col("properties.trip.vehicle_type.description_en").isNotNull(), col("properties.trip.vehicle_type.description_en")).otherwise(lit("train")))
                .withColumn("trip_id", col("properties.trip.gtfs.trip_id"))
                .withColumn("line_name", col("properties.trip.gtfs.route_short_name"))
                .withColumn("timestamp", col("properties.last_position.origin_timestamp").cast("timestamp"))
                .withColumn("current_delay", col("properties.last_position.delay.actual"))
                )

#drop geometry and properties
silver_table = silver_table.drop("geometry", "properties")

silver_table.display()

#save the silver table
silver_table.write.format("delta").mode("overwrite").saveAsTable("fel_pid_topic_data_silver")

### Gold Tables

#### Delays for each vehicle type - during a day

**Analyze the delays of different types of vehicles (buses, trams, metro, trains) over the course of a whole day. Compare the average delay for each vehicle type** and determine how it changes across 3-hour time intervals. Display the results in a chart.
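
The cells below average in two steps: first per trip, then per vehicle type. A minimal plain-Python sketch of why that differs from a single average over all rows is shown here. The function `avg_delay_by_vehicle` and the sample rows are hypothetical; a trip that reports its position more often would otherwise dominate the result.

```python
from collections import defaultdict
from statistics import mean


def avg_delay_by_vehicle(rows):
    """rows: iterable of (day, trip_id, vehicle_type, delay_seconds) tuples."""
    # Step 1: one average delay per (day, trip, vehicle type).
    per_trip = defaultdict(list)
    for day, trip_id, vehicle_type, delay in rows:
        per_trip[(day, trip_id, vehicle_type)].append(delay)

    # Step 2: average the per-trip averages for each (day, vehicle type).
    per_vehicle = defaultdict(list)
    for (day, _trip_id, vehicle_type), delays in per_trip.items():
        per_vehicle[(day, vehicle_type)].append(mean(delays))

    return {key: mean(values) for key, values in per_vehicle.items()}


# Hypothetical rows: trip t1 reports four positions, trip t2 only one.
rows = [
    ("2024-01-05", "t1", "tram", 60),
    ("2024-01-05", "t1", "tram", 60),
    ("2024-01-05", "t1", "tram", 60),
    ("2024-01-05", "t1", "tram", 60),
    ("2024-01-05", "t2", "tram", 0),
]

two_step = avg_delay_by_vehicle(rows)
single_step = mean(delay for _day, _trip, _type, delay in rows)
```

With these rows the two-step average weights both trips equally, while the single-step average is pulled toward the trip with more position reports.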

In [0]:
gold_table = spark.table("fel_pid_topic_data_silver")

## add new column denoting day of the trip
gold_table = gold_table.withColumn("day", col("timestamp").cast("date"))
gold_table.display()

In [0]:
from pyspark.sql.functions import avg

#each trip_id has several rows per day -> average current_delay per trip_id and day to get one average delay per trip
avg_gold_table = gold_table.groupBy("day", "trip_id", "vehicle_type").agg(avg("current_delay").alias("avg_delay")).orderBy("day", "trip_id")
avg_gold_table.display()

In [0]:
#average for each vehicle_type in a day
avg_gold_table_by_vehicle = avg_gold_table.groupBy("day", "vehicle_type").agg(avg("avg_delay").alias("avg_delay_by_vehicle")).orderBy("day", "vehicle_type")
avg_gold_table_by_vehicle.display()



Databricks visualization. Run in Databricks to view.

In [0]:
#average also over all days
avg_gold_table_by_all_days = avg_gold_table.groupBy("vehicle_type").agg(avg("avg_delay").alias("avg_delay_by_vehicle")).orderBy("vehicle_type")
avg_gold_table_by_all_days.display()

Databricks visualization. Run in Databricks to view.

#### Delays in 3-hour windows
Analyze the delays of different types of vehicles (buses, trams, metro, trains) over the course of a whole day. Compare the average delay for each vehicle type **and determine how it changes across 3-hour time intervals. Display the results in a chart.**
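
The bucketing used in the cells below maps an hour of the day to one of eight 3-hour windows by integer division. A small plain-Python sketch of the same arithmetic follows; the helper name `three_hour_window` is hypothetical.

```python
from datetime import datetime


def three_hour_window(ts):
    """Return (index, label) of the 3-hour window containing ts; index runs from 0 to 7."""
    index = ts.hour // 3  # 0 for 00:00-02:59, 1 for 03:00-05:59, ..., 7 for 21:00-23:59
    label = f"{index * 3}-{index * 3 + 2}"  # first and last full hour in the window
    return index, label


midnight = three_hour_window(datetime(2024, 1, 5, 0, 0))
afternoon = three_hour_window(datetime(2024, 1, 5, 14, 30))
late_evening = three_hour_window(datetime(2024, 1, 5, 23, 59))
```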

In [0]:
from pyspark.sql.functions import col, floor, hour

gold_table = spark.table("fel_pid_topic_data_silver")

## add new column denoting day of the trip
gold_table = gold_table.withColumn("day", col("timestamp").cast("date")).orderBy("timestamp")

## divide each day into eight 3-hour windows, indexed 0-7 from midnight to midnight
### extract hour from timestamp
gold_table = gold_table.withColumn("hour", hour(col("timestamp")))
### divide hour into 8 3-hour windows
gold_table = gold_table.withColumn("window", floor(col("hour") / 3))

gold_table.display()

In [0]:
from pyspark.sql.functions import avg

#each trip_id has several rows per day and window -> average current_delay per trip_id, day and window to get one average delay per trip and window
#a trip that spans several windows is counted in each of them, which is reasonable because the trip really did run during all of those windows
avg_gold_table_by_vehicle = gold_table.groupBy("day", "window", "vehicle_type", "trip_id").agg(avg("current_delay").alias("avg_delay")).orderBy("day", "window", "vehicle_type", "trip_id")

avg_gold_table_by_vehicle.display()


In [0]:
from pyspark.sql.functions import concat, lit, col
from pyspark.sql import functions as F

#now average the delay only by the windows and vehicle type
avg_gold_table_by_all_days = avg_gold_table_by_vehicle.groupBy("vehicle_type", "window").agg(avg("avg_delay").alias("avg_delay_by_vehicle")).orderBy("window", "vehicle_type")
#add column translating window into strings of hours of the day corresponding to the window (e.g. 0 -> "0-2")
avg_gold_table_by_all_days = avg_gold_table_by_all_days.withColumn("window_hours", F.format_string("%s-%s",  col("window") * 3,  col("window") * 3 + 2))

avg_gold_table_by_all_days.display()

Databricks visualization. Run in Databricks to view.

#### Min average delay places

As a next step, identify the 3 places with the lowest average delay for each vehicle type and visualize them on a map.
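
Raw GPS coordinates rarely repeat exactly, so a "place" needs some grouping rule. The sketch below, in plain Python, rounds coordinates before averaging and then keeps the lowest-delay places per vehicle type. The function `lowest_delay_places`, its parameters `k` and `decimals`, and the sample rows are all hypothetical.

```python
from collections import defaultdict
from statistics import mean


def lowest_delay_places(rows, k=3, decimals=4):
    """rows: iterable of (vehicle_type, latitude, longitude, delay_seconds) tuples."""
    # Group delays by vehicle type and rounded coordinates.
    delays = defaultdict(list)
    for vehicle_type, lat, lon, delay in rows:
        key = (vehicle_type, round(lat, decimals), round(lon, decimals))
        delays[key].append(delay)

    # Collect (average delay, latitude, longitude) per vehicle type.
    by_type = defaultdict(list)
    for (vehicle_type, lat, lon), values in delays.items():
        by_type[vehicle_type].append((mean(values), lat, lon))

    # Sorting tuples orders them by average delay first; keep the k smallest.
    return {vehicle_type: sorted(places)[:k] for vehicle_type, places in by_type.items()}


# Hypothetical rows; the first two bus positions fall into the same rounded place.
rows = [
    ("bus", 50.08001, 14.42001, 10),
    ("bus", 50.08004, 14.42004, 30),
    ("bus", 50.1, 14.5, 5),
    ("bus", 50.2, 14.6, 100),
    ("bus", 50.3, 14.7, 50),
    ("tram", 50.0, 14.4, 0),
]

top_places = lowest_delay_places(rows)
```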

In [0]:
gold_table = spark.table("fel_pid_topic_data_silver")
gold_table.display()

In [0]:
#count rows in gold_table

gold_table.count()

In [0]:
#count rows with unique latitude-longitude combination
gold_table.select("latitude", "longitude").distinct().count()

In [0]:
from pyspark.sql.functions import round, col
#round latitude and longitude to 4 decimal places and count distinct rows
gold_table.select(round(col("latitude"), 4).alias("latitude"), round(col("longitude"), 4).alias("longitude")).distinct().count()

In [0]:
## compute average delay for each vehicle type at each specific latitude and longitude
places_avg = gold_table.groupBy("vehicle_type", "latitude", "longitude").agg(avg("current_delay").alias("avg_delay")).orderBy("vehicle_type")

In [0]:
#3 places for each vehicle type with smallest avg_delay (in each vehicle type separately)
dfs = []
for v_type in places_avg.select("vehicle_type").distinct().collect():
    df = places_avg.where(col("vehicle_type") == v_type[0]).orderBy("avg_delay").limit(3)
    dfs.append(df)

#union dfs
three_places = dfs[0]
for df in dfs[1:]:
    three_places = three_places.union(df)

three_places.display()

Databricks visualization. Run in Databricks to view.

In [0]:
## now the same thing but with latitude and longitude rounded to 4 decimal places, which gives roughly 10 m precision
# round to 4 decimal places
places_avg = gold_table.withColumn("latitude", round(col("latitude"), 4))
places_avg = places_avg.withColumn("longitude", round(col("longitude"), 4))

places_avg = places_avg.groupBy("vehicle_type", "latitude", "longitude").agg(avg("current_delay").alias("avg_delay")).orderBy("vehicle_type")

#3 places for each vehicle type with smallest avg_delay (in each vehicle type separately)
dfs = []
for v_type in places_avg.select("vehicle_type").distinct().collect():
    df = places_avg.where(col("vehicle_type") == v_type[0]).orderBy("avg_delay").limit(3)
    dfs.append(df)

#union dfs
three_places = dfs[0]
for df in dfs[1:]:
    three_places = three_places.union(df)

three_places.display()

Databricks visualization. Run in Databricks to view.

## Full stream processing

In [0]:
#read data as a stream
raw = (
    spark
        .readStream
        .format('kafka')
        .option('kafka.bootstrap.servers', KAFKA_CLUSTER)
        .option('subscribe', TOPIC)
        .option('startingOffsets', "earliest")
        .option('kafka.sasl.mechanism', 'SCRAM-SHA-512')
        .option('kafka.security.protocol', 'SASL_SSL')
        .option('kafka.sasl.jaas.config', 'kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="FELPIDUSER" password="dzs3c1vldy6np5";')
        .load()
)

#alternatively, a plain spark.read reaches into Kafka and downloads whatever is currently available - the offset can be set (Kafka currently retains everything)

checkpoint = '/mnt/pid/checkpoint_file2.txt'

(raw.writeStream
   .format("delta")
   .outputMode("append")
   .option("checkpointLocation", checkpoint)
   .toTable("fel_pid_topic_data_stream")
)

### Bronze stream

In [0]:
%python
from pyspark.sql.functions import from_json, col

## load data as dataframe
pid_data_df = spark.readStream.format("delta").table("fel_pid_topic_data_stream")

## retype value column to string
pid_data_df = pid_data_df.withColumn("value", pid_data_df["value"].cast("string"))

## unwrap the data in value column (list of dictionaries) and have them on separate rows
pid_data_df = pid_data_df.withColumn("value", from_json(col("value"), SCHEMA))
pid_data_df = pid_data_df.selectExpr("explode(value) as value", "timestamp")

## now extract value into 3 columns - geometry, properties, type
pid_data_df = pid_data_df.withColumn("geometry", pid_data_df["value.geometry"])
pid_data_df = pid_data_df.withColumn("properties", pid_data_df["value.properties"])
pid_data_df = pid_data_df.withColumn("type", pid_data_df["value.type"])

## drop value column
pid_data_df = pid_data_df.drop("value")

#order to better see result
#pid_data_df = pid_data_df.orderBy("timestamp")

#display the result -> only now something is computed
#pid_data_df.display()

#save the bronze table
checkpoint_bronze = '/mnt/pid/checkpoint_bronze'
pid_data_df.writeStream.format("delta").outputMode("append").option("checkpointLocation", checkpoint_bronze).toTable("bronze_table_stream")

### Silver Stream

In [0]:
from pyspark.sql.functions import when, col, lit

## load bronze table stream
bronze_table = spark.readStream.format("delta").table("bronze_table_stream")

silver_table = (bronze_table
                .withColumn("latitude", col("geometry.coordinates")[1])
                .withColumn("longitude", col("geometry.coordinates")[0])
                .withColumn("vehicle_type", when(col("properties.trip.vehicle_type.description_en").isNotNull(), col("properties.trip.vehicle_type.description_en")).otherwise(lit("train")))
                .withColumn("trip_id", col("properties.trip.gtfs.trip_id"))
                .withColumn("line_name", col("properties.trip.gtfs.route_short_name"))
                .withColumn("timestamp", col("properties.last_position.origin_timestamp").cast("timestamp"))
                .withColumn("current_delay", col("properties.last_position.delay.actual"))
                )

#drop geometry and properties
silver_table = silver_table.drop("geometry", "properties")

#silver_table.display()

#save the silver table
checkpoint_silver = '/mnt/pid/checkpoint_silver2'
silver_table.writeStream.format("delta").outputMode("append").option("checkpointLocation", checkpoint_silver).toTable("silver_table_stream2")

### Gold Stream

#### Day analysis

In [0]:

from pyspark.sql.functions import avg

gold_table = spark.readStream.format("delta").table("silver_table_stream2")

## add new column denoting day of the trip
gold_table = gold_table.withColumn("day", col("timestamp").cast("date"))

#each trip_id has several rows per day -> average current_delay per trip_id and day to get one average delay per trip
avg_gold_table = gold_table.groupBy("day", "trip_id", "vehicle_type").agg(avg("current_delay").alias("avg_delay"))

#average for each vehicle_type in a day
avg_gold_table_by_vehicle = avg_gold_table.groupBy("day", "vehicle_type").agg(avg("avg_delay").alias("avg_delay_by_vehicle"))
avg_gold_table_by_vehicle.display()


Databricks visualization. Run in Databricks to view.

In [0]:
from pyspark.sql.functions import avg

gold_table = spark.readStream.format("delta").table("silver_table_stream2")

## add new column denoting day of the trip
gold_table = gold_table.withColumn("day", col("timestamp").cast("date"))

#each trip_id has several rows per day -> average current_delay per trip_id and day to get one average delay per trip
avg_gold_table = gold_table.groupBy("day", "trip_id", "vehicle_type").agg(avg("current_delay").alias("avg_delay"))
#average also over all days
avg_gold_table_by_all_days = avg_gold_table.groupBy("vehicle_type").agg(avg("avg_delay").alias("avg_delay_by_vehicle"))
avg_gold_table_by_all_days.display()

Databricks visualization. Run in Databricks to view.

#### 3 hour windows

In [0]:
from pyspark.sql.functions import col, floor, hour, avg

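#disable Spark's correctness check for chained stateful operators (two aggregations in a row);
#with the check off, rows that arrive late at the second aggregation can be silently dropped
spark.conf.set("spark.sql.streaming.statefulOperator.checkCorrectness.enabled", "false")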

gold_table = spark.readStream.format("delta").table("silver_table_stream2").withWatermark("timestamp", "10 minutes")

## add new column denoting day of the trip
gold_table = gold_table.withColumn("day", col("timestamp").cast("date"))

## divide each day into eight 3-hour windows, indexed 0-7 from midnight to midnight
### extract hour from timestamp
gold_table = gold_table.withColumn("hour", hour(col("timestamp")))
### divide hour into 8 3-hour windows
gold_table = gold_table.withColumn("window", floor(col("hour") / 3))

avg_gold_table_by_vehicle = gold_table.groupBy("day", "window", "vehicle_type", "trip_id").agg(avg("current_delay").alias("avg_delay"))
from pyspark.sql.functions import concat, lit, col
from pyspark.sql import functions as F

#now average the delay only by the windows and vehicle type
avg_gold_table_by_all_days = avg_gold_table_by_vehicle.groupBy("vehicle_type", "window").agg(avg("avg_delay").alias("avg_delay_by_vehicle"))
#add column translating window into strings of hours of the day corresponding to the window (e.g. 0 -> "0-2")
avg_gold_table_by_all_days = avg_gold_table_by_all_days.withColumn("window_hours", F.format_string("%s-%s",  col("window") * 3,  col("window") * 3 + 2))

# Display the result
display(avg_gold_table_by_all_days)

Databricks visualization. Run in Databricks to view.

#### Stream analysis of locations with minimum delays

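In [0]:
from pyspark.sql.functions import avg, col, row_number
from pyspark.sql.window import Window

gold_table = spark.readStream.format("delta").table("silver_table_stream2")
places_avg = gold_table.groupBy("vehicle_type", "latitude", "longitude").agg(avg("current_delay").alias("avg_delay"))

# Get the 3 places for each vehicle type with the smallest average delay
window_spec = Window.partitionBy("vehicle_type").orderBy("avg_delay")

#non-time-based window functions are not supported directly on a streaming DataFrame,
#so the ranking runs inside foreachBatch, where each micro-batch is a regular DataFrame
def save_three_places(batch_df, batch_id):
    places_ranked = batch_df.withColumn("rank", row_number().over(window_spec)).filter(col("rank") <= 3)
    #the table name is an assumption, chosen to match the naming used above
    places_ranked.write.format("delta").mode("overwrite").saveAsTable("gold_three_places_stream")

#the checkpoint path is an assumption following the pattern of the other checkpoints
checkpoint_places = '/mnt/pid/checkpoint_gold_places'
(places_avg.writeStream
   .outputMode("complete") #each micro-batch then holds the full aggregated result
   .option("checkpointLocation", checkpoint_places)
   .foreachBatch(save_three_places)
   .start()
)
#once the first micro-batch is written, view it with spark.table("gold_three_places_stream").display()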

Databricks visualization. Run in Databricks to view.