In [None]:
#start Spark Session

from pyspark.sql import SparkSession

# Assemble the session configuration step by step, then create (or reuse)
# the session for this notebook.
_session_builder = SparkSession.builder.appName("Fusion-Spark-Streaming")
_session_builder = _session_builder.config("spark.sql.shuffle.partitions", 32)
# NOTE(review): "spark.streaming.backpressure.enabled" is a setting of the
# legacy DStream API; confirm whether it has any effect on the Structured
# Streaming query built later in this notebook.
_session_builder = _session_builder.config(
    "spark.streaming.backpressure.enabled", "true"
)
spark = _session_builder.getOrCreate()

# Keep driver output readable: only warnings and errors are logged.
spark.sparkContext.setLogLevel("WARN")

In [None]:
#Import Modules

from pyspark.sql.functions import udf, col
from pyspark.sql.types import ArrayType, StringType, IntegerType
from src.preprocessing.lemma_tokenizer import lemma_tokenize
from src.utils.tech_mapper import map_technology
from src.streaming.microbatch_stream import (
    LatencyTracker,
    log_streaming_metrics
)

In [None]:
# Streaming Source (Flume/HDFS tail)
#
# Streaming source aligned with the Flume → HDFS pipeline. The same data can
# be followed from a shell with:
#   hadoop fs -tail -f hdfs://192.168.1.165:9000/user/flume/ml/FlumeData.*

input_path = "hdfs://192.168.1.165:9000/user/flume/ml/"

# File-based streaming sources refuse to start without a schema unless schema
# inference is explicitly enabled; without this setting `.load()` below raises
# "Schema must be specified when creating a streaming source DataFrame".
# Inference needs at least one JSON file to already exist under `input_path`
# when the stream is defined.
# TODO: for production use, replace inference with an explicit `.schema(...)`
# so the schema stays stable across restarts.
spark.conf.set("spark.sql.streaming.schemaInference", "true")

stream_df = (
    spark.readStream
    .format("json")
    # Pick up at most one new file per micro-batch.
    .option("maxFilesPerTrigger", 1)
    .load(input_path)
)


In [None]:
# Micro-batch interval (Δt = 5 sec)

MICRO_BATCH_INTERVAL = 5  # seconds

# Wrap the project helpers as Spark UDFs, declaring the Spark type each one
# is expected to return.
token_udf = udf(lemma_tokenize, returnType=ArrayType(StringType()))
tech_udf = udf(map_technology, returnType=IntegerType())

# Both derived columns are computed from the raw "text" field of each record:
# first the token list, then the technology id.
tokenized_stream = stream_df.withColumn("tokens", token_udf(col("text")))
processed_stream = tokenized_stream.withColumn("tech_id", tech_udf(col("text")))


In [None]:
# Per-batch processing and metrics (used as the foreachBatch sink)
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, to_json
from pyspark.sql.types import ArrayType, MapType, StructType


def _csv_compatible(batch_df: DataFrame) -> DataFrame:
    """
    Return `batch_df` with every array/map/struct column JSON-encoded.

    The CSV writer only accepts atomic column types, so complex columns
    (such as the `tokens` array) are serialized to JSON strings; all other
    columns are passed through unchanged and column order is preserved.
    """
    projected = []
    for field in batch_df.schema.fields:
        # Backticks keep names containing dots from being parsed as
        # nested-field access.
        column = col(f"`{field.name}`")
        if isinstance(field.dataType, (ArrayType, MapType, StructType)):
            column = to_json(column).alias(field.name)
        projected.append(column)
    return batch_df.select(*projected)


def process_microbatch(batch_df: DataFrame, batch_id: int):
    """
    Implements B_k processing and metric tracking.

    Counts the rows of the micro-batch, appends it as CSV to HDFS, and
    prints the metrics returned by `log_streaming_metrics`.

    Args:
        batch_df: The micro-batch DataFrame handed over by `foreachBatch`.
        batch_id: Identifier of the micro-batch, used in the printed line.

    Raises:
        Any exception raised by the count or the write is propagated after
        the cached batch has been released.
    """

    tracker = LatencyTracker()
    tracker.start()

    # The batch is used by two actions (count + write). Caching it avoids
    # evaluating the upstream UDFs twice for every micro-batch.
    batch_df.persist()
    try:
        batch_size = batch_df.count()

        # --- Write to HDFS ---
        output_path = "hdfs://192.168.1.165:9000/user/flume/output/"

        (
            # Complex columns must be flattened to strings first; writing an
            # array column straight to CSV raises an AnalysisException.
            _csv_compatible(batch_df)
            .write
            .mode("append")
            .csv(output_path, header=True)
        )
    finally:
        # Always release the cached blocks, even when the write fails.
        batch_df.unpersist()

    tracker.stop()

    metrics = log_streaming_metrics(
        batch_size=batch_size,
        batch_interval=MICRO_BATCH_INTERVAL,
        latency=tracker.latency()
    )

    print(f"[Batch {batch_id}] Metrics:", metrics)


In [None]:
# Start Streaming Query

# NOTE(review): no `checkpointLocation` is configured, so progress is
# presumably tracked in a temporary location and a restarted query would
# re-read the input files already appended to the output; confirm and
# consider setting a durable checkpoint path.
query = (
    processed_stream.writeStream
    .foreachBatch(process_microbatch)
    .trigger(processingTime=f"{MICRO_BATCH_INTERVAL} seconds")
    .start()
)

try:
    # Blocks this cell until the query terminates or fails.
    query.awaitTermination()
finally:
    # Interrupting the kernel only aborts the wait above; the query itself
    # keeps running on the driver. Stopping it here prevents a re-run of this
    # cell from starting a second query that appends to the same output.
    query.stop()
