# Data Design and Streaming

In [3]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.3.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 pyspark-shell'

from pymongo import MongoClient
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType
from pyspark.sql.functions import col, from_json, to_timestamp, coalesce, broadcast, expr, unix_timestamp
from pyspark.sql.functions import abs as spark_abs
import pandas as pd

hostip = "172.28.144.1"


## Database setup

In [5]:
db_client = MongoClient(hostip, 27017) 
db = db_client["fit3182_a2_db"]

def insert_data(file_name: str, db_name: str, collection_name: str, client: MongoClient) -> None:
    """
    Reads data from a CSV file and inserts it into a specified MongoDB collection.
    
    Parameters:
    ----------
    file_name (str): The path to the CSV file containing the data to be inserted.
        
    db_name (str) : The name of the MongoDB database where the collection resides.
        
    collection_name (str) : The name of the MongoDB collection to insert data into. 
        
    client (pymongo.MongoClient) : MongoDB client instance used to connect to the database.    
    """
    
    # Use pandas to parse the CSV file
    file_df = pd.read_csv(file_name)

    db = client[db_name]
    collection = db[collection_name]
    collection.drop()  # start from a clean collection on every run

    # Convert each row of the DataFrame into a dictionary (one document per row)
    documents = file_df.to_dict("records")

    if documents:  # insert_many rejects an empty list
        collection.insert_many(documents)



Insert the static data provided in the CSV files: camera and vehicle.

In [None]:
insert_data("camera.csv", "fit3182_a2_db", "camera", db_client)

insert_data("vehicle.csv", "fit3182_a2_db", "vehicle", db_client)


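Initialize two MongoDB collections: **no_match_records** and **violations**.

Each collection is indexed on the fields that later insertions, updates, or queries filter on. For example, **no_match_records** is indexed on car_plate, which is used to look up the unmatched records of a vehicle.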

In [None]:
collection = db["no_match_records"]
collection.drop()
collection.create_index([("car_plate", 1)])
collection.create_index([("event_id", 1)])  # DbWriterNoMatch upserts on event_id


collection = db["violations"]
collection.drop()
collection.create_index([("violation_id", 1)])
collection.create_index([("car_plate", 1)])

# Event Stream Processing

## Instantaneous Speed Violation Detection

Three Kafka consumer streams, **topic_stream_cam_a_df**, **topic_stream_cam_b_df**, and **topic_stream_cam_c_df**, listen to the topics on which a Kafka producer broadcasts camera events from Points A, B, and C. Each stream is joined with a static camera metadata DataFrame, **camera_df**, which provides the position and speed limit of the corresponding camera.

Once joined, the three streams are combined with a union and sent to a single MongoDB sink (**DbWriterSingle**). Since each event now carries both the vehicle's recorded speed and the camera's speed limit, the violation check is a direct comparison of the two: if the vehicle exceeds the speed limit, the event is stored in the **violations** collection as an instantaneous violation; otherwise, no violation record is written. No stream-to-stream join is needed for this step.
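A minimal plain-Python sketch of that comparison, assuming each event has already been enriched with its camera's speed_limit as described above. The function name `instantaneous_violation` and the sample values are hypothetical; the document fields mirror the record this notebook builds for instantaneous violations.

```python
from typing import Optional


def instantaneous_violation(event: dict) -> Optional[dict]:
    """Return a violation document if the reading exceeds the camera's limit, else None."""
    reading = event.get("speed_reading")
    limit = event.get("speed_limit")
    # Events without a reading or a limit cannot be judged, so they are skipped
    if reading is None or limit is None or reading <= limit:
        return None
    # A single camera is both the start and the end point of an instantaneous violation
    return {
        "violation_id": event["event_id"],
        "car_plate": event["car_plate"],
        "camera_id_start": event["camera_id"],
        "camera_id_end": event["camera_id"],
        "timestamp_start": event["timestamp"],
        "timestamp_end": event["timestamp"],
        "speed_reading": reading,
        "violation_type": "instantaneous",
    }


# Hypothetical enriched event (Kafka fields plus camera metadata)
event = {"event_id": "e1", "car_plate": "ABC123", "camera_id": 1,
         "timestamp": "2024-01-01T08:00:00", "speed_reading": 118.5, "speed_limit": 110}
print(instantaneous_violation(event)["violation_type"])  # instantaneous
```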

## Average Speed Violation Detection

To detect average speed violations, where a vehicle passes through multiple camera points, stream-to-stream joins are necessary to correlate events from different camera locations.

This is implemented using PySpark Structured Streaming's stream-stream join functionality:

* **topic_stream_cam_a_df** is joined with **topic_stream_cam_b_df**

* **topic_stream_cam_b_df** is joined with **topic_stream_cam_c_df**

Both joins match on car_plate and only within a time window: the downstream event must occur after the upstream event and no more than 2 minutes later.
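The join condition can be written as a plain-Python predicate over a single pair of events. This only illustrates the logic inside the `expr(...)` condition used below, not how Spark evaluates it; `is_segment_match` and the sample events are hypothetical.

```python
from datetime import datetime, timedelta

MAX_GAP = timedelta(minutes=2)  # upper bound of the time window in the join condition


def is_segment_match(upstream: dict, downstream: dict, max_gap: timedelta = MAX_GAP) -> bool:
    """Same plate, downstream strictly later, and at most max_gap after the upstream event."""
    return (
        upstream["car_plate"] == downstream["car_plate"]
        and downstream["event_time"] > upstream["event_time"]
        and downstream["event_time"] <= upstream["event_time"] + max_gap
    )


a = {"car_plate": "ABC123", "event_time": datetime(2024, 1, 1, 8, 0, 0)}
b = {"car_plate": "ABC123", "event_time": datetime(2024, 1, 1, 8, 1, 30)}
late_b = {"car_plate": "ABC123", "event_time": datetime(2024, 1, 1, 8, 2, 1)}
print(is_segment_match(a, b))       # True
print(is_segment_match(a, late_b))  # False, 2 min 1 s is outside the window
```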


### State Management with Watermarks

Because a stream-stream join has to buffer records in state while it waits for a matching record from the other stream, watermarking is used to bound how long data stays in that state.

A 10-minute watermark is set on each stream. For any given stream, an incoming event whose timestamp is more than 10 minutes earlier than the maximum event time seen so far is considered too late to match. Because a full outer join is used, a record that can no longer be matched is emitted with nulls on the other side of the join once the watermark guarantees that no partner can still arrive.
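A minimal single-stream model of the rule above, assuming the watermark is recomputed after every event. Spark actually advances it between micro-batches and only guarantees that data within the delay is kept, so this is an approximation; the class name `WatermarkTracker` is hypothetical.

```python
from datetime import datetime, timedelta
from typing import Optional


class WatermarkTracker:
    """Watermark = maximum event time seen so far minus a fixed delay."""

    def __init__(self, delay: timedelta) -> None:
        self.delay = delay
        self.max_event_time: Optional[datetime] = None

    @property
    def watermark(self) -> Optional[datetime]:
        if self.max_event_time is None:
            return None
        return self.max_event_time - self.delay

    def observe(self, event_time: datetime) -> bool:
        """Record an event; return True if it is on time, False if it is behind the watermark."""
        watermark = self.watermark
        on_time = watermark is None or event_time >= watermark
        if self.max_event_time is None or event_time > self.max_event_time:
            self.max_event_time = event_time
        return on_time


tracker = WatermarkTracker(timedelta(minutes=10))
events = [datetime(2024, 1, 1, 8, m) for m in (0, 20, 5, 15)]
print([tracker.observe(t) for t in events])  # [True, True, False, True]
```

The 08:05 event is late because, after 08:20 has been seen, the watermark sits at 08:10; the 08:15 event is still on time.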

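Unmatched records emitted from the stream-stream join are filtered into a separate stream and sent to a dedicated MongoDB sink (**DbWriterNoMatch**), which writes them to the **no_match_records** collection. As later events arrive, the single-event sink (**DbWriterSingle**) queries this collection for stored records with the same car plate from an adjacent camera. If one is found, the two records are reconciled: the average speed between them is computed, a violation is stored if it exceeds the speed limit, and the stored record is deleted from **no_match_records**.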
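A sketch of the reconciliation step for one stored record and one incoming event, assuming both are flattened into plain dicts (the stored MongoDB document actually nests camera_id, position and speed_limit under a camera field) and that camera IDs increase in the direction of travel. The function `reconcile` is hypothetical: it orders the pair so the lower camera is the start point and builds the ID from the start and end event IDs, the same ordering DbWriterJoin uses for its left and right events.

```python
from datetime import datetime
from typing import Optional


def reconcile(stored: dict, incoming: dict) -> Optional[dict]:
    """Pair a stored unmatched event with a new event from an adjacent camera."""
    # Only adjacent cameras (A-B or B-C) form a measured segment
    if abs(stored["camera_id"] - incoming["camera_id"]) != 1:
        return None
    start, end = sorted((stored, incoming), key=lambda e: e["camera_id"])
    return {
        "violation_id": start["event_id"] + end["event_id"],
        "camera_id_start": start["camera_id"],
        "camera_id_end": end["camera_id"],
        "speed_limit": end["speed_limit"],  # limit of the end camera
        "distance_km": abs(end["position"] - start["position"]),
        "seconds": abs((end["event_time"] - start["event_time"]).total_seconds()),
    }


# Hypothetical events: camera 2 is already stored, camera 1 arrives late
stored = {"event_id": "B7", "camera_id": 2, "position": 12.5, "speed_limit": 110,
          "event_time": datetime(2024, 1, 1, 8, 1, 30)}
incoming = {"event_id": "A3", "camera_id": 1, "position": 10.0, "speed_limit": 100,
            "event_time": datetime(2024, 1, 1, 8, 0, 0)}
print(reconcile(stored, incoming))
```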

If an event arrives after the watermark threshold, it does not take part in the stream-stream join, so it cannot be paired there for the average speed calculation. It is still processed by the single-event sink for the instantaneous speed check, since that logic is independent of the joins.

Matched records from the stream-stream join are routed into a dedicated stream that computes the average speed between the two camera points and sends the results to a MongoDB sink (**DbWriterJoin**). If the average speed exceeds the speed limit of the end camera, the record is stored in the **violations** collection; otherwise, the pair is only printed to the log.
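The average speed is the distance between the two cameras divided by the elapsed time in hours. A small sketch of that arithmetic, assuming positions are in kilometres and speed limits in km/h (as the distance_km column name suggests); the helper names are hypothetical.

```python
from typing import Optional


def average_speed_kmh(position_start_km: float, position_end_km: float,
                      seconds: float) -> Optional[float]:
    """Distance between two camera positions divided by the elapsed time in hours."""
    if seconds <= 0:
        return None  # no meaningful speed over a zero or negative interval
    distance_km = abs(position_end_km - position_start_km)
    return distance_km / (seconds / 3600)


def is_average_violation(avg_speed: Optional[float], speed_limit: float) -> bool:
    """A missing average speed is never treated as a violation."""
    return avg_speed is not None and avg_speed > speed_limit


# 2.5 km covered in 90 s is 100 km/h; the same distance in 60 s is 150 km/h
print(round(average_speed_kmh(10.0, 12.5, 90), 2))   # 100.0
print(round(average_speed_kmh(10.0, 12.5, 60), 2))   # 150.0
print(is_average_violation(average_speed_kmh(10.0, 12.5, 60), 110))  # True
```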


In [4]:
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('Spark Streaming Violation Detection')
    .getOrCreate()
)

# Define Kafka topics corresponding to the three camera event sources
topic_1 = 'Camera_A'
topic_2 = 'Camera_B'
topic_3 = 'Camera_C'



Create a Spark DataFrame containing the camera information, which is later joined with each camera event stream.

In [5]:
pd_camera = pd.read_csv("camera.csv")

camera_df = spark.createDataFrame(pd_camera)

camera_df = camera_df.select(
    col("camera_id").cast(IntegerType()).alias("camera_id"),
    col("position"),
    col("speed_limit")
).cache() # cache to avoid recomputing the static camera_df during repeated joins with the streaming data

camera_df.printSchema()



root
 |-- camera_id: integer (nullable = true)
 |-- position: double (nullable = true)
 |-- speed_limit: long (nullable = true)



In [6]:
# Define the expected JSON schema for the Kafka message payload
json_schema = StructType([
    StructField("event_id", StringType(), True),
    StructField("car_plate", StringType(), True),
    StructField("camera_id", IntegerType(), True),
    StructField("timestamp", StringType(), True),
    StructField("speed_reading", DoubleType(), True)
])

def create_consumer(topic):
    """
    Creates and returns a structured streaming DataFrame from a Kafka topic, joined with static camera metadata.
    
    Parameters:
        topic (str): The Kafka topic to subscribe to.

    Returns:
        pyspark.sql.DataFrame: A streaming DataFrame containing parsed, timestamped, and enriched event data.
    """
    topic_stream = (
        spark.readStream.format('kafka')
        .option('kafka.bootstrap.servers', f'{hostip}:9092')
        .option('subscribe', topic) 
        .load()
    )
    
    # Parse the Kafka message value (JSON), extract its fields, and add the camera metadata
    modified_stream = (
        topic_stream
        .select(
            from_json(col("value").cast("string"), json_schema).alias("data")
        )
        .select("data.*")
        # Watermarking needs a TimestampType column, so convert the string timestamp
        .withColumn("event_time", to_timestamp(col("timestamp")))
        # Rename so only one camera_id column remains after the join; cam_id is dropped below
        .withColumnRenamed("camera_id", "cam_id")
        .join(broadcast(camera_df), col("cam_id") == col("camera_id"))
        .drop("cam_id")
    )
    
    return modified_stream
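To make the expected message format concrete, here is a hypothetical payload for json_schema together with a plain-Python sketch of the parsing step. Like from_json, it ignores keys that are not in the schema and yields a missing value (None) for schema fields that are absent. The event ID and timestamp format shown are assumptions; the notebook does not state what the producer actually sends.

```python
import json

# Field names taken from json_schema; the sample values below are hypothetical
SCHEMA_FIELDS = ("event_id", "car_plate", "camera_id", "timestamp", "speed_reading")


def parse_payload(raw: bytes) -> dict:
    """Decode a Kafka message value and keep only the schema fields."""
    data = json.loads(raw.decode("utf-8"))
    return {field: data.get(field) for field in SCHEMA_FIELDS}


sample = json.dumps({
    "event_id": "evt-0001",
    "car_plate": "ABC123",
    "camera_id": 1,
    "timestamp": "2024-01-01T08:00:00",
    "speed_reading": 95.2,
    "extra": "ignored",
}).encode("utf-8")
print(parse_payload(sample))
```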

In [7]:
# Create structured streams for each of the three camera topics
topic_stream_cam_a_df = create_consumer(topic_1)
topic_stream_cam_b_df = create_consumer(topic_2)
topic_stream_cam_c_df = create_consumer(topic_3)

## Join between streams A and B

As previously mentioned, watermarking is essential for managing state during stream-stream joins. While increasing the watermark duration can help reduce data loss by allowing more late-arriving data to be matched, it also increases the state retention time, which leads to higher memory usage and makes the join operation more resource-intensive.
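A rough estimate of how long a buffered row stays in the join state, assuming both streams advance together and ignoring micro-batch granularity. With the condition used here, an upstream row at time t can still match downstream rows up to t + 2 minutes, so it can only be dropped once the watermark passes that bound. Since Spark by default uses the minimum watermark across inputs, "max event time" below refers to the slower stream. The helper `earliest_eviction` is hypothetical.

```python
from datetime import datetime, timedelta


def earliest_eviction(event_time: datetime, max_gap_after: timedelta,
                      watermark_delay: timedelta) -> datetime:
    """Max event time that must be reached before a buffered row can be dropped.

    The row is evictable once the watermark (max event time - delay) passes the
    latest time at which a partner row could still match it.
    """
    latest_partner_time = event_time + max_gap_after
    return latest_partner_time + watermark_delay


t = datetime(2024, 1, 1, 8, 0)
# Upstream (left) row: partners may arrive up to 2 minutes later
print(earliest_eviction(t, timedelta(minutes=2), timedelta(minutes=10)))  # 2024-01-01 08:12:00
# Downstream (right) row: partners are strictly earlier, so no extra gap
print(earliest_eviction(t, timedelta(0), timedelta(minutes=10)))          # 2024-01-01 08:10:00
# Doubling the watermark delay keeps the upstream row 10 minutes longer
print(earliest_eviction(t, timedelta(minutes=2), timedelta(minutes=20)))  # 2024-01-01 08:22:00
```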

In [None]:
# Apply watermarking to each input stream to manage state and control late data retention.
# Columns are renamed with prefixes ("left_" and "right_") to differentiate them after join
cam_a_watermarked = (
    topic_stream_cam_a_df.withWatermark("event_time", "10 minutes")
    .select([col(c).alias(f"left_{c}") for c in topic_stream_cam_a_df.columns])
) 

cam_b_watermarked = (
    topic_stream_cam_b_df.withWatermark("event_time", "10 minutes")
    .select([col(c).alias(f"right_{c}") for c in topic_stream_cam_b_df.columns])
) 


joined_stream_a_b = cam_a_watermarked.join(
    cam_b_watermarked,
    expr("""
        left_car_plate = right_car_plate AND
        right_event_time > left_event_time AND
        right_event_time <= left_event_time + interval 2 minutes
    """),
    "full_outer"
)

# Keep only rows matched on both sides, then compute the average speed between the two cameras
stream_with_avg_speed_a_b = (
    joined_stream_a_b
    .filter("left_car_plate IS NOT NULL AND right_car_plate IS NOT NULL")
    .withColumn("distance_km", spark_abs(col("left_position") - col("right_position")))
    .withColumn("time_diff_hrs", (unix_timestamp("right_event_time") - unix_timestamp("left_event_time")) / 3600)
    .withColumn("avg_speed", col("distance_km") / col("time_diff_hrs"))
)


# Extract unmatched records from the full outer join.
# These are partial records (vehicles observed at only one of the two camera points)
# that may still be matched later when more data arrives.
unmatched_records_a_b = joined_stream_a_b.filter(
    "left_car_plate IS NULL OR right_car_plate IS NULL"
).select(
    coalesce(col("left_event_id"), col("right_event_id")).alias("event_id"),
    coalesce(col("left_car_plate"), col("right_car_plate")).alias("car_plate"),
    coalesce(col("left_timestamp"), col("right_timestamp")).alias("timestamp"),
    coalesce(col("left_speed_reading"), col("right_speed_reading")).alias("speed_reading"),
    coalesce(col("left_event_time"), col("right_event_time")).alias("event_time"),
    coalesce(col("left_camera_id"), col("right_camera_id")).alias("camera_id"),
    coalesce(col("left_position"), col("right_position")).alias("position"),
    coalesce(col("left_speed_limit"), col("right_speed_limit")).alias("speed_limit"),
)
    
stream_with_avg_speed_a_b.printSchema()
unmatched_records_a_b.printSchema()
    

root
 |-- left_event_id: string (nullable = true)
 |-- left_batch_id: integer (nullable = true)
 |-- left_car_plate: string (nullable = true)
 |-- left_timestamp: string (nullable = true)
 |-- left_speed_reading: double (nullable = true)
 |-- left_event_time: timestamp (nullable = true)
 |-- left_camera_id: integer (nullable = true)
 |-- left_position: double (nullable = true)
 |-- left_speed_limit: long (nullable = true)
 |-- right_event_id: string (nullable = true)
 |-- right_batch_id: integer (nullable = true)
 |-- right_car_plate: string (nullable = true)
 |-- right_timestamp: string (nullable = true)
 |-- right_speed_reading: double (nullable = true)
 |-- right_event_time: timestamp (nullable = true)
 |-- right_camera_id: integer (nullable = true)
 |-- right_position: double (nullable = true)
 |-- right_speed_limit: long (nullable = true)
 |-- distance_km: double (nullable = true)
 |-- time_diff_hrs: double (nullable = true)
 |-- avg_speed: double (nullable = true)

root
 |-- even

## Join between streams B and C

In [None]:
# Apply watermarking to each input stream to manage state and control late data retention.
# Columns are renamed with prefixes ("left_" and "right_") to differentiate them after join
cam_b_watermarked = (
    topic_stream_cam_b_df.withWatermark("event_time", "10 minutes")
    .select([col(c).alias(f"left_{c}") for c in topic_stream_cam_b_df.columns])
) 

cam_c_watermarked = (
    topic_stream_cam_c_df.withWatermark("event_time", "10 minutes")
    .select([col(c).alias(f"right_{c}") for c in topic_stream_cam_c_df.columns])
) 

joined_stream_b_c = cam_b_watermarked.join(
    cam_c_watermarked,
    expr("""
        left_car_plate = right_car_plate AND
        right_event_time > left_event_time AND
        right_event_time <= left_event_time + interval 2 minutes
    """),
    "full_outer"
)


# Keep only rows matched on both sides, then compute the average speed between the two cameras
stream_with_avg_speed_b_c = (
    joined_stream_b_c
    .filter("left_car_plate IS NOT NULL AND right_car_plate IS NOT NULL")
    .withColumn("distance_km", spark_abs(col("left_position") - col("right_position")))
    .withColumn("time_diff_hrs", (unix_timestamp("right_event_time") - unix_timestamp("left_event_time")) / 3600)
    .withColumn("avg_speed", col("distance_km") / col("time_diff_hrs"))
)


# Extract unmatched records from the full outer join.
# These are partial records (vehicles observed at only one of the two camera points)
# that may still be matched later when more data arrives.
unmatched_records_b_c = joined_stream_b_c.filter(
    "left_car_plate IS NULL OR right_car_plate IS NULL"
).select(
    coalesce(col("left_event_id"), col("right_event_id")).alias("event_id"),
    coalesce(col("left_car_plate"), col("right_car_plate")).alias("car_plate"),
    coalesce(col("left_timestamp"), col("right_timestamp")).alias("timestamp"),
    coalesce(col("left_speed_reading"), col("right_speed_reading")).alias("speed_reading"),
    coalesce(col("left_event_time"), col("right_event_time")).alias("event_time"),
    coalesce(col("left_camera_id"), col("right_camera_id")).alias("camera_id"),
    coalesce(col("left_position"), col("right_position")).alias("position"),
    coalesce(col("left_speed_limit"), col("right_speed_limit")).alias("speed_limit"),
)
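The coalesce(...) selections above collapse each unmatched outer-join row onto a single set of unprefixed columns, since exactly one side is non-null. A plain-Python analogue of that step, using the hypothetical helper `collapse_sides`:

```python
from typing import Any, Dict, Iterable

# Unprefixed column names produced by the coalesce(...) selections
FIELDS = ("event_id", "car_plate", "timestamp", "speed_reading",
          "event_time", "camera_id", "position", "speed_limit")


def collapse_sides(row: Dict[str, Any], fields: Iterable[str] = FIELDS) -> Dict[str, Any]:
    """For each field, take the left_ value if present, otherwise the right_ value."""
    collapsed = {}
    for field in fields:
        left = row.get(f"left_{field}")
        collapsed[field] = left if left is not None else row.get(f"right_{field}")
    return collapsed


# Hypothetical outer-join row where only the right (downstream) side is present
row = {f"left_{f}": None for f in FIELDS}
row.update({"right_event_id": "evt-9", "right_car_plate": "ABC123", "right_camera_id": 2})
print(collapse_sides(row)["camera_id"])  # 2
```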
    

In [27]:
from pymongo import DeleteOne, ReplaceOne
from pymongo.errors import PyMongoError
import time

def safe_bulk_write(collection, operations, max_retries=3, delay=1):
    """
    Attempt a bulk write on a MongoDB collection, retrying a fixed number of times on failure.

    Args:
        collection: The MongoDB collection object to perform the bulk write on.
        operations: A list of write operations (e.g. ReplaceOne, DeleteOne).
        max_retries (int): Maximum number of attempts before giving up. Default is 3.
        delay (int or float): Delay in seconds between attempts. Default is 1 second.

    Returns:
        bool: True if the write succeeded (or there was nothing to write), False otherwise.
    """
    if not operations:
        return True
    for attempt in range(1, max_retries + 1):
        try:
            collection.bulk_write(operations)
            return True
        except PyMongoError as e:
            print(f"[Retry {attempt}/{max_retries}] Failed to write to '{collection.name}': {e}")
            if attempt < max_retries:
                time.sleep(delay)
    print(f"Giving up on writing {len(operations)} operations to '{collection.name}'")
    return False



class DbWriterJoin:
    
    """
    Processes the stream of successfully joined records and stores average speed violations.
    """
    def open(self, partition_id, epoch_id):
        self.client = MongoClient(host= hostip, port=27017)  
        self.db = self.client["fit3182_a2_db"]
        self.buffer = []  # write operations to bulk-write at the end of the partition
        self.drop_pair = []  # joined pairs that do not violate the speed limit, kept for logging

        return True

    def process(self, row):
        """
        Process a single row from the stream. If average speed exceeds the limit,
        prepare it for insertion as a violation; otherwise, store it to print it at the end of partition.

        """
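        row_dict = row.asDict()

        # avg_speed can be null (e.g. when time_diff_hrs is 0), so check it before comparing
        avg_speed = row_dict["avg_speed"]
        if avg_speed is not None and avg_speed > row_dict["right_speed_limit"]:
            violation_id = row_dict["left_event_id"] + row_dict["right_event_id"]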
    
            record = {
                "violation_id" : violation_id,
                "car_plate": row_dict["left_car_plate"],
                "camera_id_end": row_dict["right_camera_id"],
                "camera_id_start": row_dict["left_camera_id"],
                "timestamp_start": row_dict["left_timestamp"],
                "timestamp_end": row_dict["right_timestamp"],
                "speed_reading": row_dict["avg_speed"],
                "violation_type" : "average"
            }
        
            # Prepare upsert operation to avoid duplicate entries for the same violation
            self.buffer.append(ReplaceOne({"violation_id" : violation_id},record,upsert = True))

        else :
            # Record that did not exceed speed limit; store for logging
            self.drop_pair.append(row_dict)
        
        
    def close(self, error):
        if error is None  :
            
            # Write all buffered violations to MongoDB
            safe_bulk_write(self.db["violations"], self.buffer, max_retries=3, delay=1)
            
            # Extract and print left and right record contents
            if self.drop_pair :
                print(f'drop pairs : ')
                for pair in self.drop_pair :
                    left_dict = {k[5:] : v for k, v in pair.items() if k.startswith('left_')}
                    right_dict = {k[6:] : v for k, v in pair.items() if k.startswith('right_')}
                    print((left_dict,right_dict))
        self.client.close()


class DbWriterNoMatch:
    """
    A class to handle writing unmatched records from a streaming join to MongoDB.

    This writer processes records that did not match in a join (e.g., no corresponding left/right event)
    and stores them in a 'no_match_records' MongoDB collection.
    """
    def open(self, partition_id, epoch_id):
        self.client = MongoClient(host=hostip, port=27017)
        self.db = self.client["fit3182_a2_db"]
        self.buffer = []  # to store the operation for bulk write in end of partition
        return True

    def process(self, row):
        """
        Process a single unmatched record and prepare it for MongoDB upsert.

        """
        row_dict = row.asDict()
        
        record = {
            "event_id" : row_dict["event_id"],
            "car_plate": row_dict["car_plate"],
            "timestamp": row_dict["timestamp"],
            "speed_reading": row_dict["speed_reading"],
            "event_time" : row_dict["event_time"],
            "camera" : {
                "camera_id": row_dict["camera_id"],
                "position" : row_dict["position"],
                "speed_limit" : row_dict["speed_limit"],
            }
        }

        # Upsert on event_id to avoid duplicate entries for the same unmatched event
        self.buffer.append(ReplaceOne({"event_id" : row_dict["event_id"]}, record ,upsert = True))
     
    def close(self, error):
        if error is None :
    
            safe_bulk_write(self.db["no_match_records"], self.buffer, max_retries=3, delay=1)
    
        self.client.close()
        
class DbWriterSingle:
    """
    A class for processing the stream of single camera events.
    Aside from checking the instantaneous speed, it also reads unmatched records from the
    'no_match_records' MongoDB collection, attempts to pair them with the current event based on
    car_plate, and calculates the average speed to detect possible violations.

    """
    def open(self, partition_id, epoch_id):
        self.client = MongoClient(host=hostip, port=27017)
        self.db = self.client["fit3182_a2_db"]
        self.buffer_violation = [] # Buffer for violation upserts
        self.buffer_nomatch = [] # Buffer for deleting matched no-match records
        self.record_to_drop = [] # Buffer for dropped record pairs (not violations)
        return True

    def process(self, row):
        """
        Process a single event record: check it for an instantaneous speed violation and
        try to find a matching record in no_match_records.

        """
        
        row_dict = row.asDict()
        
        match_records = self.db.no_match_records.find({"car_plate": row_dict["car_plate"]})
 
        for record in match_records :
        
            # Only consider cameras that are adjacent
            if abs(record["camera"]["camera_id"] - row_dict["camera_id"]) == 1:
                t1 = record["event_time"]
                t2 = row_dict["event_time"]
                seconds = abs((t1 - t2).total_seconds())
                if seconds == 0:
                    continue  # no average speed over a zero interval; keep the stored record

                # Determine which event is the start point and which is the end point
                if record["camera"]["camera_id"] > row_dict["camera_id"]:
                    speed_limit = record["camera"]["speed_limit"]
                    camera_id_start = row_dict["camera_id"]
                    camera_id_end = record["camera"]["camera_id"]
                    timestamp_start = row_dict["timestamp"]
                    timestamp_end = record["timestamp"]
                    event_id_start = row_dict["event_id"]
                    event_id_end = record["event_id"]
                else:
                    speed_limit = row_dict["speed_limit"]
                    camera_id_start = record["camera"]["camera_id"]
                    camera_id_end = row_dict["camera_id"]
                    timestamp_start = record["timestamp"]
                    timestamp_end = row_dict["timestamp"]
                    event_id_start = record["event_id"]
                    event_id_end = row_dict["event_id"]

                position1 = record["camera"]["position"]
                position2 = row_dict["position"]
                distance = abs(position1 - position2)
                avg_speed = distance / (seconds / 3600)

                if avg_speed > speed_limit:
                    # Build the ID as start + end, matching DbWriterJoin's left + right ordering
                    violation_id = event_id_start + event_id_end
                                                       
                    violation_record = {
                        "violation_id" : violation_id,
                        "car_plate": row_dict["car_plate"],
                        "camera_id_start": camera_id_start,
                        "camera_id_end": camera_id_end,
                        "timestamp_start": timestamp_start,
                        "timestamp_end": timestamp_end,
                        "speed_reading": avg_speed,
                        "violation_type" : "average"
                    }

                    # Buffer for upsert to violations                      
                    self.buffer_violation.append(ReplaceOne({"violation_id" : violation_id},violation_record ,upsert = True))
                                                       
                else :
                    
                    # Collect the pair for logging, since it does not violate the speed limit
                    self.record_to_drop.append((record,row_dict))
                                                    
                # Remove the stored record from the collection, since it has now been matched
                self.buffer_nomatch.append(DeleteOne({"_id": record["_id"]}))
                  
        # Instantaneous speed check (independent of any matching above)
        if row_dict["speed_reading"] is not None and row_dict["speed_reading"] > row_dict["speed_limit"]:
            instant_record = {
                "violation_id" : row_dict["event_id"],
                "car_plate": row_dict["car_plate"],
                "camera_id_end": row_dict["camera_id"],
                "camera_id_start": row_dict["camera_id"],
                "timestamp_start": row_dict["timestamp"],
                "timestamp_end": row_dict["timestamp"],
                "speed_reading": row_dict["speed_reading"],
                "violation_type" : "instantaneous"
            }

            # Buffer for upsert to violations
            self.buffer_violation.append(ReplaceOne({"violation_id": row_dict["event_id"]}, instant_record, upsert=True))

                                      
    def close(self, error):
        if error is None :                                               
            safe_bulk_write(self.db["violations"], self.buffer_violation, max_retries=3, delay=1)
            safe_bulk_write(self.db["no_match_records"], self.buffer_nomatch, max_retries=3, delay=1)
            
            # Print dropped pairs (valid matches that didn't result in a violation)
            if self.record_to_drop :                            
                print(f'dropped pairs : ')
                for i,j in self.record_to_drop :
                    # Clean up and reformat before printing
                    i["camera_id"] = i["camera"]["camera_id"]
                    i.pop("camera")
                    i.pop("_id")
                    i.pop("event_time")
                    j.pop("position")
                    j.pop("speed_limit")
                    j.pop("event_time")
                    print((i,j))
                                                       
        self.client.close()
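`safe_bulk_write` above waits a fixed delay between attempts. A common alternative is exponential backoff, sketched below; this is not what the notebook does, and `bulk_write_with_backoff` is a hypothetical helper. Retrying the whole batch is reasonable here because the buffered ReplaceOne upserts and DeleteOne-by-_id operations are idempotent. To keep the sketch self-contained it retries on any exception by default (with PyMongo you would pass `retry_on=(PyMongoError,)`), and `FlakyCollection` is a stand-in for a real collection.

```python
import time


def bulk_write_with_backoff(collection, operations, max_retries=3,
                            base_delay=0.5, retry_on=(Exception,)):
    """Bulk write with exponentially growing waits; returns True on success."""
    if not operations:
        return True
    name = getattr(collection, "name", collection)
    for attempt in range(1, max_retries + 1):
        try:
            collection.bulk_write(operations)
            return True
        except retry_on as exc:
            print(f"[Attempt {attempt}/{max_retries}] write to '{name}' failed: {exc}")
            if attempt < max_retries:
                # Wait base_delay, 2 * base_delay, 4 * base_delay, ...
                time.sleep(base_delay * 2 ** (attempt - 1))
    return False


class FlakyCollection:
    """Hypothetical stand-in that fails a fixed number of times before succeeding."""

    name = "violations"

    def __init__(self, failures):
        self.failures = failures
        self.calls = 0

    def bulk_write(self, operations):
        self.calls += 1
        if self.calls <= self.failures:
            raise RuntimeError("transient failure")


flaky = FlakyCollection(failures=2)
print(bulk_write_with_backoff(flaky, ["op"], base_delay=0))  # True, after two failed attempts
```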


In [None]:
# Combine the average speed records from the two segments: A -> B and B -> C
stream_with_average = stream_with_avg_speed_a_b.union(stream_with_avg_speed_b_c)

# Combine all incoming camera event streams (Camera A, B, and C) into a single stream
union_cam_a_b_c = topic_stream_cam_a_df.union(topic_stream_cam_b_df).union(topic_stream_cam_c_df)

# Merge unmatched records from both segments A -> B and B -> C
no_match_record_union = unmatched_records_b_c.union(unmatched_records_a_b)

# Define a streaming write operation for unmatched records
no_match_record = (
    no_match_record_union
    .writeStream
    .outputMode('append')
    .foreach(DbWriterNoMatch())
)


# Define a streaming write for successfully joined records
join_writer = ( 
    stream_with_average
    .writeStream
    .outputMode('append')
    .foreach(DbWriterJoin())
)


# Define a streaming write for raw records directly from all three cameras
single_record_writer = (
    union_cam_a_b_c
    .writeStream
    .outputMode('append')
    .foreach(DbWriterSingle())
)



from pyspark.sql.utils import StreamingQueryException

queries = []
try:
    queries.append(no_match_record.start())
    queries.append(join_writer.start())
    queries.append(single_record_writer.start())

    # Block until any of the three queries stops or fails
    spark.streams.awaitAnyTermination()
except KeyboardInterrupt:
    print('Interrupted by CTRL-C. Stopping queries')
except StreamingQueryException as exc:
    print(exc)
finally:
    # Stop only the queries that were actually started
    for query in queries:
        query.stop()



