In [None]:
# Κελί 1: Δημιουργία του SparkSession
from pyspark.sql import SparkSession

spark_session = SparkSession.builder \
    .appName("VehiclesDataFromKafka") \
    .getOrCreate()


In [None]:
# Κελί 2: Ορισμός της δομής των JSON δεδομένων
from pyspark.sql.types import StructType, StringType, FloatType

json_schema = StructType() \
    .add("name", StringType()) \
    .add("orig", StringType()) \
    .add("dest", StringType()) \
    .add("time", StringType()) \
    .add("link", StringType()) \
    .add("position", FloatType()) \
    .add("spacing", FloatType()) \
    .add("speed", FloatType())


In [None]:
# Κελί 3: Διάβασμα των JSON δεδομένων από το Kafka
dataframe_kafka = spark_session \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "vehicle_positions") \
    .option("startingOffsets", "earliest") \
    .option("logLevel", "ERROR") \
    .load()


In [None]:
# Κελί 4: Μετατροπή των JSON δεδομένων σε DataFrame
from pyspark.sql.functions import from_json, col

json_to_dataframe = dataframe_kafka.selectExpr("CAST(value AS STRING)") \
    .select(from_json("value", json_schema).alias("data")) \
    .select(
        col("data.link").alias("link"),
        col("data.time").alias("time"),
        col("data.speed").alias("speed")
    )   


In [None]:
# Κελί 5: Ορισμός συνάρτησης για αποθήκευση των δεδομένων στη MongoDB
def save_to_mongodb(batch_df, epoch_id):
    batch_df.write \
        .format("mongo") \
        .mode("overwrite") \
        .option("database", "MyVehiclesData") \
        .option("collection", "vehiclesData") \
        .option("uri", "mongodb://localhost:27017") \
        .save()

In [None]:
# Κελί 6: Εκκίνηση της επεξεργασίας streaming και αποθήκευση των δεδομένων στη MongoDB
selected_data_stream = json_to_dataframe \
    .writeStream \
    .foreachBatch(save_to_mongodb) \
    .outputMode("append") \
    .start()

selected_data_stream.awaitTermination()

In [None]:
# Εμφάνιση του DataFrame
json_to_dataframe.show()
