In [40]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, explode, round as spark_round
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, ArrayType
from pyspark.sql import functions as F
from pyspark.sql.types import TimestampType

In [15]:
# Obtain the Spark session for the PavementEye streaming job
# (getOrCreate returns the existing session if this kernel already has one).
spark = (
    SparkSession.builder
    .appName("PavementEye Stream")
    .getOrCreate()
)

In [16]:
# Kafka connection parameters.
# The defaults address the broker from inside the Docker network ("kafka:9092") and the
# placeholder topic "test"; both can be overridden through environment variables
# (KAFKA_BOOTSTRAP_SERVERS, KAFKA_TOPIC) without editing the notebook.
kafka_bootstrap_servers = os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "kafka:9092")
kafka_topic = os.environ.get("KAFKA_TOPIC", "test")  # placeholder topic; can be changed later

In [17]:
# Open a streaming read on the configured Kafka topic.
kafka_stream_df = (
    spark.readStream
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": kafka_bootstrap_servers,
        "subscribe": kafka_topic,
    })
    .load()
)

In [18]:
# Cast the raw Kafka `value` column to a string so the JSON text of each message is readable.
parse_kafka_stream = kafka_stream_df.select(col("value").cast("string").alias("json_value"))

In [22]:
# Schema of one incoming JSON message: longitude/latitude, a time string, and a list of
# detections. Each detection has a label, a confidence score and bounding-box
# coordinates x1, x2, y1, y2 (all doubles).
detection_struct = StructType(
    [StructField("label", StringType()), StructField("confidence", DoubleType())]
    + [StructField(coord, DoubleType()) for coord in ("x1", "x2", "y1", "y2")]
)

schema = StructType([
    StructField("lon", DoubleType()),
    StructField("lat", DoubleType()),
    StructField("time", StringType()),  # cast to a timestamp later in the pipeline
    StructField("labels", ArrayType(detection_struct)),
])

In [23]:
# Parse each JSON string into a single struct column named "data" that follows `schema`.
json_df = parse_kafka_stream.select(F.from_json("json_value", schema).alias("data"))

In [24]:
# Flatten the messages: keep position and time, emit one row per detected object
# (explode over `labels`), then promote the detection's fields to top-level columns
# in schema order: label, confidence, x1, x2, y1, y2.
exploded_df = (
    json_df
    .select("data.lon", "data.lat", "data.time", F.explode("data.labels").alias("label_struct"))
    .select("lon", "lat", "time", "label_struct.*")
)

In [31]:
# Discard any row in which at least one column is null.
df_no_nulls = exploded_df.dropna(how="any")

In [None]:
# Convert 'time' from string to timestamp so it can drive the watermark.
# The null-drop in the previous cell ran on the raw string, so a non-empty but
# unparseable time becomes null only here; remove those rows explicitly.
# A new name is used (instead of rebinding df_no_nulls) so re-running this cell is safe.
df_timestamped = (
    df_no_nulls
    .withColumn("time", F.col("time").cast(TimestampType()))
    .filter(F.col("time").isNotNull())
)

# Deduplicate repeated detections (same label, bounding box and position).
dedup_watermark_delay = "5 minutes"
dedup_keys = ["label", "x1", "y1", "x2", "y2", "lon", "lat"]

df_watermarked = df_timestamped.withWatermark("time", dedup_watermark_delay)

if hasattr(df_watermarked, "dropDuplicatesWithinWatermark"):
    # Spark >= 3.5: drops duplicates only within the watermark delay and evicts
    # old state, which matches the "last 5 minutes" intent with bounded state.
    cleaned_df = df_watermarked.dropDuplicatesWithinWatermark(dedup_keys)
else:
    # Older Spark: dedup state can only be evicted by the watermark when the
    # event-time column is part of the key. Without it, state would grow forever.
    # Trade-off: only duplicates sharing the exact same timestamp are dropped.
    cleaned_df = df_watermarked.dropDuplicates(dedup_keys + ["time"])

In [34]:
# Keep only rows with plausible geometry:
# - bounding box: strictly positive width (x1 < x2) and height (y1 < y2);
# - longitude within [-180, 180] and latitude within [-90, 90] (bounds inclusive).
has_valid_bbox = (col("x1") < col("x2")) & (col("y1") < col("y2"))
has_valid_lon = col("lon").between(-180, 180)
has_valid_lat = col("lat").between(-90, 90)

df_valid_coords = cleaned_df.filter(has_valid_bbox & has_valid_lon & has_valid_lat)

In [None]:
# Debug sink: print every raw Kafka message (as a JSON string) to the driver's console,
# which in this Docker setup is visible in the container logs whenever a request is made.
# truncate=False is needed because the console sink otherwise shortens each value to
# 20 characters, hiding most of the JSON payload.
# The query handle is kept so the stream can be inspected or stopped later
# (raw_console_query.stop()).
console_await_timeout = None  # seconds to wait; None blocks this cell until the query stops

raw_console_query = (
    parse_kafka_stream.writeStream
    .outputMode("append")
    .format("console")
    .option("truncate", False)
    .start()
)
raw_console_query.awaitTermination(console_await_timeout)

In [None]:
# Calling .show() on a streaming DataFrame (parse_kafka_stream) raises AnalysisException:
# streaming sources must be executed via writeStream.start().
# For a quick peek at the topic, do a one-off *batch* read of the same topic over all
# currently available offsets and apply the same string cast.
kafka_preview_df = (
    spark.read
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers)
    .option("subscribe", kafka_topic)
    .option("startingOffsets", "earliest")
    .option("endingOffsets", "latest")
    .load()
    .selectExpr("CAST(value AS STRING) AS json_value")
)
kafka_preview_df.show(5, truncate=False)