In [None]:
import json
import os
import threading
import time
from kafka import KafkaConsumer
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, window, avg, approx_count_distinct, unix_timestamp, lit
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, DoubleType


# Directory (relative to the current working directory) where consumed Kafka
# records are persisted; both the consumer and the Spark job build their file
# path from it.
DATA_DIR = 'kafka_data'
# Create the directory up front; exist_ok=True makes re-running the cell safe.
os.makedirs(DATA_DIR, exist_ok=True)

def consume_and_store():
    """Consume the ``vehicle_positions`` topic and append each record to disk.

    Connects to the Kafka broker at ``localhost:9092`` as part of the
    ``vehicle_consumer_group`` group, starting from the earliest offset when
    the group has no committed position. Each message value is decoded from
    UTF-8 JSON and written back out as one JSON document per line to
    ``<DATA_DIR>/data.json`` (opened in append mode).

    The function returns only when iteration over the consumer ends or an
    exception propagates; the consumer is closed in either case.
    """
    consumer = KafkaConsumer(
        'vehicle_positions',
        bootstrap_servers='localhost:9092',
        auto_offset_reset='earliest',
        group_id='vehicle_consumer_group',
        value_deserializer=lambda m: json.loads(m.decode('utf-8'))
    )

    file_path = os.path.join(DATA_DIR, 'data.json')
    try:
        with open(file_path, 'a', encoding='utf-8') as file:
            for message in consumer:
                file.write(json.dumps(message.value) + '\n')
                # Push the record to disk immediately. Without this the data
                # sits in Python's file buffer, so a reader that opens the
                # file while this loop is still running may find it empty or
                # ending in a line cut off mid-record.
                file.flush()
    finally:
        # Release the broker connection even if the loop is interrupted.
        consumer.close()

def process_with_spark():
    """Aggregate the stored vehicle records with Spark and save them to MongoDB.

    Reads ``<DATA_DIR>/data.json`` (one JSON document per line) using an
    explicit schema, prints the schema and rows, then:

    * groups the records into 1-second windows on ``time`` per ``link`` and
      computes ``vcount`` (approximate distinct ``vehicle_name`` count) and
      ``vspeed`` (average ``speed``); the output ``time`` column is the window
      start expressed as seconds elapsed since the earliest record;
    * appends the aggregated rows to ``vehicle.vehicle_position``;
    * appends the unmodified input rows to ``vehicle.raw_data``.

    If the data file does not exist, or contains no row with a non-null
    ``time``, a message is printed and nothing is written.
    """
    mongo_uri = "mongodb://localhost:27017/vehicle.vehicle_position"

    # The 10.x connector reads ``spark.mongodb.{read,write}.connection.uri``;
    # the older ``spark.mongodb.input.uri`` / ``output.uri`` keys belong to
    # the 3.x connector.
    spark = SparkSession.builder \
        .appName("MongoDB Spark Connector") \
        .config("spark.mongodb.read.connection.uri", mongo_uri) \
        .config("spark.mongodb.write.connection.uri", mongo_uri) \
        .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:10.4.0") \
        .getOrCreate()

    schema = StructType([
        StructField("vehicle_name", StringType(), True),
        StructField("origin", StringType(), True),
        StructField("destination", StringType(), True),
        StructField("time", TimestampType(), True),
        StructField("link", StringType(), True),
        StructField("position", StructType([
            StructField("x", DoubleType(), True),
            StructField("y", DoubleType(), True)
        ]), True),
        StructField("spacing", DoubleType(), True),
        StructField("speed", DoubleType(), True)
    ])

    file_path = os.path.join(DATA_DIR, 'data.json')

    if not os.path.exists(file_path):
        print(f"No data file at {file_path}; nothing to process.")
        return

    df = spark.read.json(file_path, schema=schema)

    df.printSchema()
    df.show(truncate=False)

    # Earliest timestamp in the data; used as the zero point for the
    # elapsed-seconds axis of the aggregated output.
    min_time = df.selectExpr("min(time) as min_time").collect()[0]["min_time"]
    if min_time is None:
        # Empty file (or no parsable timestamps): there is no reference time
        # to measure from, so there is nothing meaningful to aggregate.
        print(f"No timestamped records in {file_path}; nothing to process.")
        return

    origin_unix = unix_timestamp(lit(min_time))

    # Sort while the column is still called ``elapsed_seconds`` and only then
    # rename it to ``time`` in the final projection.
    result_df = df \
        .groupBy(window(col("time"), "1 second"), col("link")) \
        .agg(
            approx_count_distinct("vehicle_name").alias("vcount"),  # vehicles seen
            avg("speed").alias("vspeed")                            # average speed
        ) \
        .withColumn("elapsed_seconds", unix_timestamp(col("window.start")) - origin_unix) \
        .orderBy("elapsed_seconds", "link") \
        .select(
            col("elapsed_seconds").alias("time"),
            col("link"),
            col("vcount"),
            col("vspeed")
        )

    result_df.show(truncate=False)

    # Aggregated rows -> vehicle.vehicle_position
    result_df.write \
        .format("mongodb") \
        .mode("append") \
        .option("connection.uri", mongo_uri) \
        .option("database", "vehicle") \
        .option("collection", "vehicle_position") \
        .save()

    # Raw input rows -> vehicle.raw_data
    df.write \
        .format("mongodb") \
        .mode("append") \
        .option("connection.uri", mongo_uri) \
        .option("database", "vehicle") \
        .option("collection", "raw_data") \
        .save()
# Entry point: run the Kafka consumer in the background, give it a head start,
# then run the one-shot Spark job over whatever has been stored so far.
if __name__ == "__main__":
    
    # daemon=True: the thread does not keep the interpreter alive on its own.
    consumer_thread = threading.Thread(target=consume_and_store, daemon=True)
    consumer_thread.start()
    # Fixed 10-second wait before processing, so the consumer has time to
    # store some records first.
    time.sleep(10)
    
    process_with_spark()
    # Wait for the consumer thread to finish.
    # NOTE(review): consume_and_store iterates the consumer with no timeout
    # configured in this file, so this join presumably blocks until the
    # process is interrupted — confirm that is intended.
    consumer_thread.join()


root
 |-- vehicle_name: string (nullable = true)
 |-- origin: string (nullable = true)
 |-- destination: string (nullable = true)
 |-- time: timestamp (nullable = true)
 |-- link: string (nullable = true)
 |-- position: struct (nullable = true)
 |    |-- x: double (nullable = true)
 |    |-- y: double (nullable = true)
 |-- spacing: double (nullable = true)
 |-- speed: double (nullable = true)

+------------+------+-----------+--------------------------+--------+--------------------------+-------+------------------+
|vehicle_name|origin|destination|time                      |link    |position                  |spacing|speed             |
+------------+------+-----------+--------------------------+--------+--------------------------+-------+------------------+
|0           |S4    |N1         |2024-08-24 12:01:15.219788|S4_to_I4|{0.0, -1.0}               |-1.0   |30.0              |
|0           |S4    |N1         |2024-08-24 12:01:15.220793|S4_to_I4|{150.0, -1.0}             |-1.0   |30