<a href="https://colab.research.google.com/github/sachins301/UTA-Distributed-Computing/blob/main/UTA_Spark_Processing.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Installing Spark and dependencies

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.5.2/spark-3.5.2-bin-hadoop3.tgz
!tar xf spark-3.5.2-bin-hadoop3.tgz
!pip install -q findspark

In [None]:
import os
# Point the process at the Java runtime and the unpacked Spark distribution.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.5.2-bin-hadoop3",
})

In [None]:
import findspark
# Must run before the `pyspark` imports in the next cell.
# NOTE(review): presumably resolves the install through the SPARK_HOME set in the previous cell — confirm.
findspark.init()


In [None]:
import json
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, ArrayType, BooleanType, IntegerType, TimestampType, DecimalType
from pyspark.sql.functions import col, explode, input_file_name, regexp_extract, concat, lit


In [None]:
# spark = SparkSession.builder.master("local[*]").getOrCreate()
# spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # Property used to format output tables better
# spark

In [None]:
# Extract the zipped JSON dump into its own directory so Spark can glob the files.
import shutil
shutil.unpack_archive(filename='/content/json_dumps.zip', extract_dir='/content/json_dumps/')

In [None]:
import functools
import time


def timing_decorator(func):
    """Wrap ``func`` so that every call prints how long it took.

    Args:
        func: Any callable.

    Returns:
        A wrapper that forwards all positional and keyword arguments to
        ``func``, prints the elapsed wall-clock time and returns ``func``'s
        result unchanged. Nothing is printed if ``func`` raises.
    """
    @functools.wraps(func)  # keep func.__name__ / __doc__ on the decorated function
    def wrapper(*args, **kwargs):
        # perf_counter is monotonic, so system clock adjustments cannot skew the measurement.
        start_time = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed_time = time.perf_counter() - start_time
        print(f"Function '{func.__name__}' took {elapsed_time:.4f} seconds to execute.")
        return result
    return wrapper

In [None]:
# Experiment 1 (disabled, kept for reference): plain read with schema inference.
# The decorator line is commented out together with the function; a live decorator
# followed only by comments is a SyntaxError and stops "Run All" at this cell.
# @timing_decorator
# def read_json():
#   df = spark.read.json("/content/json_dumps/*.json")
#   return df

# df = read_json()
# df.show(5)

Function 'read_json' took 144.7838 seconds to execute.
+--------------------+
|                Siri|
+--------------------+
|{1.3, http://www....|
|{1.3, http://www....|
|{1.3, http://www....|
|{1.3, http://www....|
|{1.3, http://www....|
+--------------------+
only showing top 5 rows



In [None]:
# Experiment 2 (disabled, kept for reference): same read followed by repartition(100).
# The decorator line is commented out together with the function; a live decorator
# followed only by comments is a SyntaxError and stops "Run All" at this cell.
# @timing_decorator
# def read_json():
#   df = spark.read.json("/content/json_dumps/*.json").repartition(100)
#   return df

# df = read_json()
# df.show(5)


Function 'read_json' took 144.8710 seconds to execute.
+--------------------+
|                Siri|
+--------------------+
|{1.3, http://www....|
|{1.3, http://www....|
|{1.3, http://www....|
|{1.3, http://www....|
|{1.3, http://www....|
+--------------------+
only showing top 5 rows



In [None]:
# Experiment 3 (disabled, kept for reference): adds the "wholeFile" reader option.
# The decorator line is commented out together with the function; a live decorator
# followed only by comments is a SyntaxError and stops "Run All" at this cell.
# @timing_decorator
# def read_json():
#   df = spark.read.option("wholeFile", True).json("/content/json_dumps/*.json").repartition(100)
#   return df

# df = read_json()
# df.show(5)

Function 'read_json' took 138.6985 seconds to execute.
+--------------------+
|                Siri|
+--------------------+
|{1.3, http://www....|
|{1.3, http://www....|
|{1.3, http://www....|
|{1.3, http://www....|
|{1.3, http://www....|
+--------------------+
only showing top 5 rows



In [None]:
# Experiment 4 (disabled, kept for reference): repartition(100) followed by coalesce(10).
# The decorator line is commented out together with the function; a live decorator
# followed only by comments is a SyntaxError and stops "Run All" at this cell.
# @timing_decorator
# def read_json():
#   df = spark.read \
#             .option("wholeFile", True) \
#             .json("/content/json_dumps/*.json").repartition(100) \
#             .coalesce(10)
#   return df

# df = read_json()
# df.show(5)

In [None]:
# spark.stop()
# Create (or reuse) a local Spark session that uses every available core, with explicit
# memory settings and a small shuffle-partition count.
spark = (
    SparkSession.builder
    .master("local[*]")
    .config("spark.executor.memory", "8g")
    .config("spark.driver.memory", "4g")
    .config("spark.sql.shuffle.partitions", "4")
    .getOrCreate()
)
spark

In [None]:
# Explicit schema for the nested SIRI vehicle-monitoring JSON, so Spark does not have to
# infer it. Every field is nullable. Built with the StructType.add() builder, innermost
# structs inline.
schema = StructType().add(
    "Siri",
    StructType()
    .add("@version", StringType(), True)
    .add("@xmlns", StringType(), True)
    .add("ResponseTimestamp", TimestampType(), True)
    .add(
        "VehicleMonitoringDelivery",
        StructType()
        .add("@version", StringType(), True)
        .add("ResponseTimestamp", TimestampType(), True)
        .add("ValidUntil", StringType(), True)
        .add(
            "VehicleActivity",
            StructType()
            .add("RecordedAtTime", TimestampType(), True)
            .add(
                # One array element per monitored vehicle journey.
                "MonitoredVehicleJourney",
                ArrayType(
                    StructType()
                    .add("LineRef", StringType(), True)
                    .add("DirectionRef", StringType(), True)
                    .add(
                        "FramedVehicleJourneyRef",
                        StructType()
                        .add("DataFrameRef", StringType(), True)
                        .add("DatedVehicleJourneyRef", StringType(), True),
                        True,
                    )
                    .add("PublishedLineName", StringType(), True)
                    .add("OriginRef", StringType(), True)
                    .add("DestinationRef", StringType(), True)
                    .add("Monitored", StringType(), True)
                    .add(
                        "VehicleLocation",
                        StructType()
                        .add("Longitude", DecimalType(17, 14), True)
                        .add("Latitude", DecimalType(17, 14), True),
                        True,
                    )
                    .add("ProgressRate", StringType(), True)
                    .add("CourseOfJourneyRef", StringType(), True)
                    .add("VehicleRef", StringType(), True)
                    .add(
                        "MonitoredCall",
                        StructType()
                        .add("StopPointRef", StringType(), True)
                        .add("VisitNumber", StringType(), True)
                        .add("VehicleAtStop", StringType(), True),
                        True,
                    )
                    .add(
                        "Extensions",
                        StructType()
                        .add("LastGPSFix", TimestampType(), True)
                        .add("Scheduled", StringType(), True)
                        .add("Bearing", DecimalType(5, 2), True)
                        .add("Speed", DecimalType(5, 2), True)
                        .add("DestinationName", StringType(), True),
                        True,
                    )
                ),
                True,
            ),
            True,
        ),
        True,
    ),
    True,
)

In [None]:
@timing_decorator
def read_json():
    """Build the DataFrame over the raw JSON dump files.

    Reads every ``*.json`` file under ``/content/json_dumps/`` with the explicit
    ``schema``, adds a ``FileId`` column holding the source file path, and
    repartitions the result into 100 partitions.

    Returns:
        The resulting Spark DataFrame (columns ``Siri`` and ``FileId``).

    NOTE(review): DataFrame construction is lazy, so the time printed by the
    decorator probably covers file listing and plan setup rather than a full
    read of the data — confirm before comparing timings.
    """
    # "multiLine" is the JSON reader option for one-document-per-file input. The earlier
    # "wholeFile" spelling is not a recognised option in Spark 3.x and was silently ignored.
    df = spark.read.option("multiLine", True) \
        .schema(schema) \
        .json("/content/json_dumps/*.json") \
        .withColumn("FileId", input_file_name()) \
        .repartition(100)
    return df

# Mark the DataFrame for caching; show(5) displays the first rows.
df = read_json().persist()
df.show(5)

Function 'read_json' took 87.2871 seconds to execute.
+--------------------+--------------------+
|                Siri|              FileId|
+--------------------+--------------------+
|{1.3, http://www....|file:///content/j...|
|{1.3, http://www....|file:///content/j...|
|{1.3, http://www....|file:///content/j...|
|{1.3, http://www....|file:///content/j...|
|{1.3, http://www....|file:///content/j...|
+--------------------+--------------------+
only showing top 5 rows



In [None]:
# Peek at the nested Extensions values without truncating the output.
df.select(
    col("Siri.VehicleMonitoringDelivery.VehicleActivity.MonitoredVehicleJourney.Extensions")
).show(n=5, truncate=False)

+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|

In [None]:
# First flattening pass: lift the envelope fields to top-level columns and explode the
# MonitoredVehicleJourney array so that each journey becomes its own row.
flattened_df = df.select(
    "FileId",
    *[
        col(source_path).alias(column_name)
        for source_path, column_name in [
            ("Siri.@version", "Siri_version"),
            ("Siri.@xmlns", "Siri_xmlns"),
            ("Siri.ResponseTimestamp", "Siri_ResponseTimestamp"),
            ("Siri.VehicleMonitoringDelivery.@version", "VehicleMonitoringDelivery_version"),
            ("Siri.VehicleMonitoringDelivery.ResponseTimestamp", "VehicleMonitoringDelivery_ResponseTimestamp"),
            ("Siri.VehicleMonitoringDelivery.ValidUntil", "VehicleMonitoringDelivery_ValidUntil"),
            ("Siri.VehicleMonitoringDelivery.VehicleActivity.RecordedAtTime", "RecordedAtTime"),
        ]
    ],
    explode(
        col("Siri.VehicleMonitoringDelivery.VehicleActivity.MonitoredVehicleJourney")
    ).alias("MonitoredVehicleJourney"),
)

In [None]:
# Second flattening pass: keep the envelope columns and promote every field of the
# exploded MonitoredVehicleJourney struct to its own top-level column.
flattened_df = flattened_df.select(
    "FileId",
    "Siri_version",
    "Siri_xmlns",
    "Siri_ResponseTimestamp",
    "VehicleMonitoringDelivery_version",
    "VehicleMonitoringDelivery_ResponseTimestamp",
    "VehicleMonitoringDelivery_ValidUntil",
    "RecordedAtTime",
    *[
        col(source_path).alias(column_name)
        for source_path, column_name in [
            ("MonitoredVehicleJourney.LineRef", "LineRef"),
            ("MonitoredVehicleJourney.DirectionRef", "DirectionRef"),
            ("MonitoredVehicleJourney.FramedVehicleJourneyRef.DataFrameRef", "DataFrameRef"),
            ("MonitoredVehicleJourney.FramedVehicleJourneyRef.DatedVehicleJourneyRef", "DatedVehicleJourneyRef"),
            ("MonitoredVehicleJourney.PublishedLineName", "PublishedLineName"),
            ("MonitoredVehicleJourney.OriginRef", "OriginRef"),
            ("MonitoredVehicleJourney.DestinationRef", "DestinationRef"),
            ("MonitoredVehicleJourney.Monitored", "Monitored"),
            ("MonitoredVehicleJourney.VehicleLocation.Longitude", "VehicleLocation_Longitude"),
            ("MonitoredVehicleJourney.VehicleLocation.Latitude", "VehicleLocation_Latitude"),
            ("MonitoredVehicleJourney.ProgressRate", "ProgressRate"),
            ("MonitoredVehicleJourney.CourseOfJourneyRef", "CourseOfJourneyRef"),
            ("MonitoredVehicleJourney.VehicleRef", "VehicleRef"),
            ("MonitoredVehicleJourney.MonitoredCall.StopPointRef", "MonitoredCall_StopPointRef"),
            ("MonitoredVehicleJourney.MonitoredCall.VisitNumber", "MonitoredCall_VisitNumber"),
            ("MonitoredVehicleJourney.MonitoredCall.VehicleAtStop", "MonitoredCall_VehicleAtStop"),
            ("MonitoredVehicleJourney.Extensions.LastGPSFix", "Extensions_LastGPSFix"),
            ("MonitoredVehicleJourney.Extensions.Scheduled", "Extensions_Scheduled"),
            ("MonitoredVehicleJourney.Extensions.Bearing", "Extensions_Bearing"),
            ("MonitoredVehicleJourney.Extensions.Speed", "Extensions_Speed"),
            ("MonitoredVehicleJourney.Extensions.DestinationName", "Extensions_DestinationName"),
        ]
    ],
)

In [None]:
# Reduce the full input path to the bare file name: the text after the last "/", minus
# the ".json" suffix. The suffix is optional in the pattern so that re-running this cell
# on an already-shortened FileId leaves it unchanged instead of replacing it with "".
flattened_df = flattened_df.withColumn("FileId", regexp_extract("FileId", r"([^/]+?)(?:\.json)?$", 1))
flattened_df.show(5)

+--------------+------------+--------------------+----------------------+---------------------------------+-------------------------------------------+------------------------------------+--------------------+-------+--------------+--------------------+----------------------+--------------------+---------+--------------+---------+-------------------------+------------------------+------------+------------------+----------+--------------------------+-------------------------+---------------------------+---------------------+--------------------+------------------+----------------+--------------------------+
|        FileId|Siri_version|          Siri_xmlns|Siri_ResponseTimestamp|VehicleMonitoringDelivery_version|VehicleMonitoringDelivery_ResponseTimestamp|VehicleMonitoringDelivery_ValidUntil|      RecordedAtTime|LineRef|  DirectionRef|        DataFrameRef|DatedVehicleJourneyRef|   PublishedLineName|OriginRef|DestinationRef|Monitored|VehicleLocation_Longitude|VehicleLocation_Latitude|Pro

## Exploratory Data Analysis

In [None]:
# Take the rows of one sample file and display them with a combined "lat, long" column.
testdf = flattened_df.where(col("FileId") == "455-1723569052")
testdf.select(
    "*",
    concat(col("VehicleLocation_Latitude"), lit(", "), col("VehicleLocation_Longitude")).alias("Lat_Long"),
).show(n=100, truncate=False)

+--------------+------------+---------------------------+--------------------------+---------------------------------+-------------------------------------------+------------------------------------+--------------------------+-------+--------------+-------------------------+----------------------+-----------------------+---------+--------------+---------+-------------------------+------------------------+------------+------------------+----------+--------------------------+-------------------------+---------------------------+-----------------------+--------------------+------------------+----------------+--------------------------+--------------------------------------+
|FileId        |Siri_version|Siri_xmlns                 |Siri_ResponseTimestamp    |VehicleMonitoringDelivery_version|VehicleMonitoringDelivery_ResponseTimestamp|VehicleMonitoringDelivery_ValidUntil|RecordedAtTime            |LineRef|DirectionRef  |DataFrameRef             |DatedVehicleJourneyRef|PublishedLineName     

In [None]:
# Duplicate check on the sample file: count rows per VehicleRef and list every
# VehicleRef that occurs more than once (an empty result means no duplicates).
(
    testdf
    .groupBy("VehicleRef")
    .count()
    .where(col("count") > 1)
    .show()
)


+----------+-----+
|VehicleRef|count|
+----------+-----+
+----------+-----+



In [None]:

# Follow a single vehicle (VehicleRef 14008) across all files, ordered by response time.
(
    flattened_df
    .where(col("VehicleRef") == "14008")
    .orderBy("Siri_ResponseTimestamp")
    .show(n=25, truncate=False)
)
# flattened_df.filter(col("Extensions_Scheduled") != False).show(5)

+--------------+------------+---------------------------+--------------------------+---------------------------------+-------------------------------------------+------------------------------------+--------------------------+-------+--------------+-------------------------+----------------------+-----------------------+---------+--------------+---------+-------------------------+------------------------+------------+------------------+----------+--------------------------+-------------------------+---------------------------+-----------------------+--------------------+------------------+----------------+--------------------------+
|FileId        |Siri_version|Siri_xmlns                 |Siri_ResponseTimestamp    |VehicleMonitoringDelivery_version|VehicleMonitoringDelivery_ResponseTimestamp|VehicleMonitoringDelivery_ValidUntil|RecordedAtTime            |LineRef|DirectionRef  |DataFrameRef             |DatedVehicleJourneyRef|PublishedLineName      |OriginRef|DestinationRef|Monitored|Ve

## Data Cleaning

In [None]:
# Keep only the columns needed downstream, shorten a few names, and add a combined
# "Latitude, Longitude" string column.
select_df = (
    flattened_df
    .select(
        "FileId",
        col("Siri_ResponseTimestamp").alias("ResponseTimestamp"),
        "LineRef",
        "DirectionRef",
        "DatedVehicleJourneyRef",
        "PublishedLineName",
        "OriginRef",
        "DestinationRef",
        col("VehicleLocation_Longitude").alias("Longitude"),
        col("VehicleLocation_Latitude").alias("Latitude"),
        "CourseOfJourneyRef",
        "VehicleRef",
        col("Extensions_LastGPSFix").alias("LastGPSFix"),
        col("Extensions_Bearing").alias("Bearing"),
        col("Extensions_Speed").alias("Speed"),
        col("Extensions_DestinationName").alias("DestinationName"),
    )
    .withColumn("LatLong", concat(col("Latitude"), lit(", "), col("Longitude")))
)


In [None]:
# Preview the first five rows of the cleaned DataFrame.
select_df.show(5)
# Disabled scratch checks kept for reference: full-row duplicate count, total row count,
# repartitioning to a single partition, and the current partition count.
# select_df.groupBy(select_df.columns)\
#     .count() \
#     .filter(col("count") > 1) \
#     .show()
# select_df.count()
# select_df = select_df.repartition(1)

# select_df.rdd.getNumPartitions()

+--------------+--------------------+-------+--------------+----------------------+--------------------+---------+--------------+-------------------+-----------------+------------------+----------+--------------------+-------+-----+--------------------+--------------------+
|        FileId|   ResponseTimestamp|LineRef|  DirectionRef|DatedVehicleJourneyRef|   PublishedLineName|OriginRef|DestinationRef|          Longitude|         Latitude|CourseOfJourneyRef|VehicleRef|          LastGPSFix|Bearing|Speed|     DestinationName|             LatLong|
+--------------+--------------------+-------+--------------+----------------------+--------------------+---------+--------------+-------------------+-----------------+------------------+----------+--------------------+-------+-----+--------------------+--------------------+
|455-1723568479|2024-08-13 17:01:...|    455| OGDEN VIA WSU|               5319029|U OF U/DAVIS COUN...|   127027|        623423|-111.83105200000000|40.75620800000000|        

## Spark Writes
When writing data, Spark will break down the DataFrame into partitions and distribute those partitions across the available executors. Each executor will handle multiple partitions, and within an executor, each partition is processed using a single thread. So, multithreading per se isn't explicitly used for writes in Spark; instead, Spark relies on task parallelism, where multiple partitions are written concurrently.

Parquet is a columnar storage file format optimized for analytical queries and efficient data storage. Parquet files are typically used to store large amounts of data; for efficient storage, Spark partitions are usually sized so that each Parquet file is between 100 MB and 1 GB.

In this use case the volume of data is small, so coalescing the data into a single partition is more efficient than keeping multiple partitions.
For reference, writing 100 Spark partitions takes about 25 seconds and produces 100 Parquet files of a few KB each, while writing a single Parquet file of about 1 MB takes about 2-3 seconds. Writing large volumes of data (generally TBs) from a single partition takes longer; in such cases the number of partitions is chosen so that each Parquet file is a few hundred MB.

In [None]:
# Print the column names, types and nullability of select_df before writing it out.
select_df.printSchema()

root
 |-- FileId: string (nullable = false)
 |-- ResponseTimestamp: timestamp (nullable = true)
 |-- LineRef: string (nullable = true)
 |-- DirectionRef: string (nullable = true)
 |-- DatedVehicleJourneyRef: string (nullable = true)
 |-- PublishedLineName: string (nullable = true)
 |-- OriginRef: string (nullable = true)
 |-- DestinationRef: string (nullable = true)
 |-- Longitude: decimal(17,14) (nullable = true)
 |-- Latitude: decimal(17,14) (nullable = true)
 |-- CourseOfJourneyRef: string (nullable = true)
 |-- VehicleRef: string (nullable = true)
 |-- LastGPSFix: timestamp (nullable = true)
 |-- Bearing: decimal(5,2) (nullable = true)
 |-- Speed: decimal(5,2) (nullable = true)
 |-- DestinationName: string (nullable = true)
 |-- LatLong: string (nullable = true)



Parquet uses run-length encoding to store data in a compressed format. To make the most of this technique, it is important to sort the data so that adjacent rows have minimal differences.

Example:
Without sorting
```
Row	UserID
1	1002
2	2001
3	1002         =====>    [1, 1002], [1, 2001], [1, 1002], [1, 2001], [1, 1002], [1, 2001]
4	2001
5	1002
6	2001
```
With Sorting
```
Row	UserID
1	1002
2	1002
3	1002        =====>    [3, 1002], [3, 2001]
4	2001
5	2001
6	2001
```

In [None]:
@timing_decorator
def write_parquet():
  """Write select_df as Parquet under /content/UtaVehicleRef, one directory per LineRef.

  The DataFrame is coalesced to a single partition first, and any existing output at
  the target path is overwritten.
  """
  single_partition_df = select_df.coalesce(1)
  writer = single_partition_df.write.partitionBy("LineRef").mode("overwrite")
  writer.parquet("/content/UtaVehicleRef")

write_parquet()


Function 'write_parquet' took 6.7596 seconds to execute.


## Duck DB Installation

Why DuckDB? The answer is OLAP + in-process / in-memory.

https://hannes.muehleisen.org/publications/SIGMOD2019-demo-duckdb.pdf

In [None]:
pip install duckdb



In [None]:
import duckdb

In [None]:
# Name appended to ":memory:" when the DuckDB connection is opened below.
db_name = "UtaVehicleRef"

In [None]:
# Connect to DuckDB
conn = duckdb.connect(database = ":memory:"+db_name, read_only = False)

# Drop the existing table if it exists so the cell can be re-run from scratch
conn.execute("DROP TABLE IF EXISTS VehicleRef")

# Create the table with the specified schema.
# LineRef is declared last. NOTE(review): presumably this lines up with the partition
# column that read_parquet appends for the LineRef=... directories — verify, since the
# INSERT below matches columns by position.
create_table_query = """
CREATE TABLE VehicleRef (
    FileId STRING NOT NULL,
    ResponseTimestamp TIMESTAMP,
    DirectionRef STRING,
    DatedVehicleJourneyRef STRING,
    PublishedLineName STRING,
    OriginRef STRING,
    DestinationRef STRING,
    Longitude DECIMAL(17, 14),
    Latitude DECIMAL(17, 14),
    CourseOfJourneyRef STRING,
    VehicleRef STRING,
    LastGPSFix TIMESTAMP,
    Bearing DECIMAL(5, 2),
    Speed DECIMAL(5, 2),
    DestinationName STRING,
    LatLong STRING,
    LineRef STRING
);
"""
conn.execute(create_table_query)

# Paths to the Parquet files, one glob per LineRef partition directory.
# These are relative paths, while the Spark write targeted /content/UtaVehicleRef, so
# this assumes the notebook's working directory is /content.
parquet_paths = [
    'UtaVehicleRef/LineRef=1/*.parquet',
    'UtaVehicleRef/LineRef=2/*.parquet',
    'UtaVehicleRef/LineRef=220/*.parquet',
    'UtaVehicleRef/LineRef=4/*.parquet',
    'UtaVehicleRef/LineRef=455/*.parquet'
]

# Read and insert data from each Parquet file path into the table
for path in parquet_paths:
    # Read data from Parquet
    read_query = f"SELECT * FROM read_parquet('{path}')"

    # Insert data into the table
    insert_query = f"INSERT INTO VehicleRef {read_query}"
    conn.execute(insert_query)


#verify
tables = conn.execute("SHOW TABLES").fetch_df()
print("Tables in database:")
print(tables)

# Check the schema of the newly created table.
# Stored under its own name so the Spark `schema` StructType defined earlier in the
# notebook is not overwritten (which would break a re-run of the read_json cell).
table_schema = conn.execute("DESCRIBE TABLE VehicleRef").fetch_df()
print("\nSchema of 'VehicleRef' table:")
print(table_schema)

# Fetch and display a sample of the data
result = conn.execute("SELECT * FROM VehicleRef LIMIT 10").fetch_df()
print("\nSample data from 'VehicleRef' table:")
print(result)

Tables in database:
         name
0  VehicleRef

Schema of 'VehicleRef' table:
               column_name     column_type null   key default extra
0                   FileId         VARCHAR  YES  None    None  None
1        ResponseTimestamp       TIMESTAMP  YES  None    None  None
2             DirectionRef         VARCHAR  YES  None    None  None
3   DatedVehicleJourneyRef         VARCHAR  YES  None    None  None
4        PublishedLineName         VARCHAR  YES  None    None  None
5                OriginRef         VARCHAR  YES  None    None  None
6           DestinationRef         VARCHAR  YES  None    None  None
7                Longitude  DECIMAL(17,14)  YES  None    None  None
8                 Latitude  DECIMAL(17,14)  YES  None    None  None
9       CourseOfJourneyRef         VARCHAR  YES  None    None  None
10              VehicleRef         VARCHAR  YES  None    None  None
11              LastGPSFix       TIMESTAMP  YES  None    None  None
12                 Bearing    DECIMAL

In [None]:
# Per (LineRef, DatedVehicleJourneyRef) trip: first/last GPS fix, straight-line distance in
# miles between the positions at those two fixes (spherical law of cosines, Earth radius
# 3959 mi), duration in hours, and the resulting average speed.
query = """
with start_end_time as (
  select LineRef, DatedVehicleJourneyRef, min(LastGPSFix) as start_time, max(LastGPSFix) as end_time
  from VehicleRef
  group by LineRef, DatedVehicleJourneyRef
),
trip_endpoints as (
  select se.LineRef, se.DatedVehicleJourneyRef, start_time, end_time, start.Latitude as start_lat, start.Longitude as start_long,
  e.Latitude as end_lat, e.Longitude as end_long,
    cos(radians(start.Latitude)) * cos(radians(e.Latitude)) *
    cos(radians(e.Longitude) - radians(start.Longitude)) +
    sin(radians(start.Latitude)) * sin(radians(e.Latitude)) as cos_angle
  from start_end_time se
  left join VehicleRef start on se.start_time = start.LastGPSFix and se.LineRef = start.LineRef and se.DatedVehicleJourneyRef = start.DatedVehicleJourneyRef
  left join VehicleRef e on se.end_time = e.LastGPSFix and se.LineRef = e.LineRef and se.DatedVehicleJourneyRef = e.DatedVehicleJourneyRef
),
trip_details as (
  select LineRef, DatedVehicleJourneyRef, start_time, end_time, start_lat, start_long, end_lat, end_long,
    -- Rounding can push cos_angle marginally outside [-1, 1] when both points coincide;
    -- clamp it so acos stays defined. A NULL cos_angle falls through and stays NULL.
    3959 * acos(
      case when cos_angle > 1 then 1.0 when cos_angle < -1 then -1.0 else cos_angle end
    ) AS distance_miles,
    extract(epoch from (end_time - start_time))/3600 as trip_duration
  from trip_endpoints
)
-- nullif: a trip with a single GPS fix has zero duration; report a NULL speed for it.
select distinct LineRef, DatedVehicleJourneyRef, start_time, end_time, distance_miles, trip_duration, distance_miles*1.0/nullif(trip_duration, 0) as average_speed from trip_details
"""
conn.execute(query).fetchdf()

Unnamed: 0,LineRef,DatedVehicleJourneyRef,start_time,end_time,distance_miles,trip_duration,average_speed
0,1,5308330,2024-08-13 10:11:41.317,2024-08-13 11:27:12.657,4.459877,1.258706,3.543225
1,1,5308398,2024-08-13 11:27:22.677,2024-08-13 12:25:03.943,4.482024,0.961463,4.661672
2,1,5308400,2024-08-13 11:58:14.120,2024-08-13 12:38:15.240,4.425166,0.666978,6.634652
3,1,5308382,2024-08-13 12:32:17.587,2024-08-13 12:38:17.243,0.970742,0.099904,9.716709
4,1,5308332,2024-08-13 10:40:32.677,2024-08-13 11:58:04.150,4.450243,1.292076,3.444258
...,...,...,...,...,...,...,...
127,455,5319057,2024-08-13 10:02:55.827,2024-08-13 12:38:08.780,31.541836,2.586931,12.192761
128,4,5307384,2024-08-13 09:14:40.390,2024-08-13 10:08:42.127,8.738532,0.900483,9.704277
129,455,5319024,2024-08-13 09:14:35.757,2024-08-13 11:02:47.483,27.837435,1.803257,15.437307
130,2,5308058,2024-08-13 11:42:26.560,2024-08-13 12:08:27.127,3.662370,0.433491,8.448551
