In [0]:
# ENTER YOUR CODE HERE

import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
import os
import sys
import json
import threading

#Pyspark.SQL

from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
from pyspark.sql.streaming import StreamingQueryListener


# Delta and MLflow
from delta.tables import DeltaTable
import mlflow
import mlflow.pyfunc

# Utilities
import time
import re
from datetime import datetime
from datetime import timedelta

#Hugging Face:
from transformers import pipeline

# ML-Flow Section 

from sklearn.metrics import (accuracy_score, precision_score, recall_score,
    f1_score, confusion_matrix, classification_report)

# Additional setup for date-time processing:
# set the Spark SQL session time zone to UTC. `spark` is not created in this
# notebook; presumably it is the session object provided by the Databricks
# runtime -- TODO confirm when running elsewhere.
spark.conf.set("spark.sql.session.timeZone", "UTC")





In [0]:
# Specify the raw tweet path (source directory that copy_new_test_files lists).
TWEET_SOURCE_PATH = f"dbfs:/FileStore/tables/raw_tweets/"


# User name from the notebook context, keeping only the part before '@'.
USER_NAME = dbutils.notebook.entry_point.getDbutils().notebook().getContext().userName().get().split('@')[0]
# Per-user working directory; every path below is built from it.
USER_DIR = f'/tmp/{USER_NAME}/'
# Directory that copy_new_test_files writes into and bronze_restart reads from by default.
TEST_PATH=USER_DIR + "temp_files/"
# Bronze layer: streaming checkpoint and Delta table locations.
BRONZE_CHECKPOINT = USER_DIR + 'bronze.checkpoint'
BRONZE_DELTA = USER_DIR + 'bronze.delta'

# Silver layer: streaming checkpoint and Delta table locations.
SILVER_CHECKPOINT = USER_DIR + 'silver.checkpoint'
SILVER_DELTA = USER_DIR + 'silver.delta'

# Gold layer: streaming checkpoint and Delta table locations.
GOLD_CHECKPOINT = USER_DIR + 'gold.checkpoint'
GOLD_DELTA = USER_DIR + 'gold.delta'

# Model name constant; not referenced by any function visible in this notebook
# (presumably an MLflow registry name used by the caller -- TODO confirm).
MODEL_NAME = "MuratAL_HF_Sentiment"
# Earlier alternatives: "HF_TWEET_SENTIMENT", USER_NAME + "_Model"

# Hugging Face model id passed to transformers.pipeline:
# https://huggingface.co/finiteautomata/bertweet-base-sentiment-analysis
HF_MODEL_NAME = "finiteautomata/bertweet-base-sentiment-analysis"

In [0]:

# Function to optimize Delta table if it exists
def optimize_table(path):
    """
    Compact the small files of the Delta table at `path`, if one exists there.

    The existence check uses DeltaTable.isDeltaTable so that `path` is resolved
    by Spark exactly like the DeltaTable.forPath call that follows. A local
    os.path.exists check looks at the driver's local filesystem instead, which
    is not where Spark reads and writes these table paths.

    Args:
        path (str): Path to the Delta table.

    Returns:
        None. The outcome is reported with a printed message.
    """
    if DeltaTable.isDeltaTable(spark, path):
        DeltaTable.forPath(spark, path).optimize().executeCompaction()
        print(f"Optimized Delta Table at {path}")
    else:
        print(f"Delta Table at {path} does not exist.")

# This routine requires the paths defined in the includes notebook
# and it clears data from the previous run.
def clear_previous_run() -> bool:
    """
    Recursively delete the checkpoint and Delta directories of all three layers.

    Returns:
        bool: Always True once every removal call has been issued.
    """
    previous_run_paths = (
        BRONZE_CHECKPOINT,
        BRONZE_DELTA,
        SILVER_CHECKPOINT,
        SILVER_DELTA,
        GOLD_CHECKPOINT,
        GOLD_DELTA,
    )
    # Second positional argument of dbutils.fs.rm is the recurse flag.
    for stale_path in previous_run_paths:
        dbutils.fs.rm(stale_path, True)
    return True

def stop_all_streams() -> bool:
    """
    Stop every streaming query that is currently active on the session.

    Returns:
        bool: True if at least one query was stopped, False if none were active.
    """
    running_queries = list(spark.streams.active)
    for running_query in running_queries:
        running_query.stop()
    return len(running_queries) > 0


def stop_named_stream(spark: SparkSession, namedStream: str) -> bool:
    """
    Stop every active streaming query whose name equals `namedStream`.

    Args:
        spark: Session whose `streams.active` list is inspected.
        namedStream: Query name to match exactly.

    Returns:
        bool: True if at least one matching query was stopped, otherwise False.
    """
    matching_queries = [
        candidate for candidate in spark.streams.active
        if candidate.name == namedStream
    ]
    for matching_query in matching_queries:
        matching_query.stop()
    return len(matching_queries) > 0

def wait_stream_start(spark: SparkSession, namedStream: str,
                      max_checks: int = 4, interval_sec: float = 10) -> bool:
    """
    Poll the session until a streaming query named `namedStream` is active.

    The previous version used `if` where a loop was intended, so it looked at
    the active streams exactly once and then always slept, even on success.
    This version really retries and returns as soon as the stream is seen.

    Args:
        spark: Session whose `streams.active` list is polled.
        namedStream: Query name to wait for.
        max_checks: Maximum number of polls (default 4, matching the original
            `count <= 3` bound).
        interval_sec: Seconds to sleep between two polls.

    Returns:
        bool: True if the stream was found within `max_checks` polls, else False.
    """
    for attempt in range(max_checks):
        for stream in spark.streams.active:
            if stream.name == namedStream:
                return True
        # No point sleeping after the final unsuccessful poll.
        if attempt < max_checks - 1:
            time.sleep(interval_sec)
    return False

# Function to wait for the Delta table to be ready
def wait_for_delta_table(path, timeout=30, check_interval=2):
    """
    Waits for a Delta table to contain at least one row before proceeding.

    Args:
        path (str): Path to the Delta table.
        timeout (int): Wait budget in seconds. Only the sleep time between
            checks is counted against it, not the time each check takes.
        check_interval (int): Seconds to sleep between two checks.

    Returns:
        bool: True if the table is readable and non-empty, False if the
        budget ran out first.
    """
    elapsed_time = 0
    while elapsed_time < timeout:
        try:
            # limit(1) means only one row has to be found; a full count() of
            # the table is not needed to know that it is non-empty.
            if spark.read.format("delta").load(path).limit(1).count() > 0:
                return True
        except Exception:
            # Deliberate best effort: a failed read is treated as "not ready
            # yet" and polling continues. Catching Exception rather than using
            # a bare except lets KeyboardInterrupt/SystemExit propagate, so the
            # wait can still be cancelled.
            pass
        time.sleep(check_interval)
        elapsed_time += check_interval
    return False

# Function to retrieve streaming statistics
def get_streaming_stats():
    """
    Retrieves streaming statistics such as elapsed time, input row count, and processing time.

    One row is produced per entry of `recentProgress` of every active query.

    Returns:
        pd.DataFrame: Columns `query`, `elapsed_time`, `input_rows`,
        `processing_time` and `memory_used`. The columns are present even
        when there is nothing to report.
    """
    columns = ["query", "elapsed_time", "input_rows", "processing_time", "memory_used"]
    data = []
    start_time = None  # Timestamp of the first progress entry seen, shared by all queries

    for q in spark.streams.active:
        for p in q.recentProgress or []:
            timestamp = datetime.strptime(p["timestamp"], "%Y-%m-%dT%H:%M:%S.%fZ")

            # Set the start time on the first iteration
            if start_time is None:
                start_time = timestamp

            elapsed_time = (timestamp - start_time).total_seconds()  # Convert to seconds

            # 'durationMs' or its 'addBatch' entry may be missing -> None
            processing_time = (p.get("durationMs") or {}).get("addBatch", None)

            # Spark reports state-store memory under stateOperators[*].memoryUsedBytes.
            # An explicit loop is used on purpose: the notebook star-imports
            # pyspark.sql.functions, which shadows the builtin sum().
            memory_used = 0
            for state_op in p.get("stateOperators") or []:
                memory_used += state_op.get("memoryUsedBytes", 0) or 0

            data.append({
                "query": q.name,
                "elapsed_time": elapsed_time,  # Seconds since the first progress entry seen
                "input_rows": p.get("numInputRows", 0),  # Default to 0 if missing
                "processing_time": processing_time,  # Could be None if not available
                "memory_used": memory_used  # Bytes; 0 for stateless queries
            })

    return pd.DataFrame(data, columns=columns)



In [0]:
#1) Feeding function for the Test folder:
def copy_new_test_files(num_files=50, verbose=True):
    """
    Copy the most recently modified `num_files` JSON files from
    TWEET_SOURCE_PATH to TEST_PATH.

    Each copy is renamed to `<timestamp>_<counter>.json` so that it arrives
    under a new file name. The timestamp has one-second resolution, so two
    calls within the same second produce the same target names.

    The listing is filtered and sorted in plain Python. Going through
    spark.createDataFrame started a Spark job for a short list and raised on
    an empty directory, which was then misreported as a listing error.

    Args:
        num_files (int): Maximum number of files to copy.
        verbose (bool): Print a summary line after copying.

    Returns:
        None
    """

    # Ensure test directory exists (don't delete it in streaming context)
    dbutils.fs.mkdirs(TEST_PATH)

    try:
        files = dbutils.fs.ls(TWEET_SOURCE_PATH)
    except Exception as e:
        print("❌ Error listing source path files:", str(e))
        return

    # Keep JSON files only, newest first, and take the first `num_files`
    json_files = [f for f in files if f.name.endswith(".json")]
    json_files.sort(key=lambda f: f.modificationTime, reverse=True)
    top_files = json_files[:num_files]

    if not top_files:
        print("⚠️ No .json files found in source directory.")
        return

    # Timestamp once, then increment counter for filenames
    base_ts = datetime.now().strftime("%Y%m%d%H%M%S")

    for i, f in enumerate(top_files):
        new_name = f"{base_ts}_{i:04d}.json"  # e.g., 20250503142530_0003.json
        target_path = TEST_PATH + new_name
        dbutils.fs.cp(f.path, target_path)

    if verbose:
        print(f"✅ Copied {len(top_files)} test files to: {TEST_PATH}")


#2) Keep feeding:
# import threading

def continuously_copy_test_files(interval_sec=10, total_rounds=5, num_files=20):
    """
    Call copy_new_test_files repeatedly, pausing between rounds.

    Args:
        interval_sec: Seconds to sleep after each round (including the last).
        total_rounds: Number of copy rounds to run.
        num_files: Passed to copy_new_test_files on every round; also the
            number printed in the status line.
    """
    rounds_done = 0
    for _round in range(total_rounds):
        copy_new_test_files(num_files=num_files)
        print(f"📁 Injected {num_files} files.")
        time.sleep(interval_sec)
        rounds_done += 1




   


In [0]:

# -----------------------------------------------------------
# 1. Load the model ONCE 
# --------------------------------------------------
   #Sentiment Model 
# hf_sentiment = pipeline("sentiment-analysis",
#                          model=HF_MODEL_NAME,
#                          return_all_scores=True)

def set_hf_sentiment_model_batch_size(batch_size: int):
    """
    Build a Hugging Face "sentiment-analysis" pipeline for HF_MODEL_NAME.

    Note that this returns the pipeline object itself, not a pandas UDF, and it
    does not assign any global. get_sentiment_udf reads a global named
    `hf_sentiment`, so the caller presumably has to bind the returned pipeline
    to that name -- TODO confirm against the calling notebook.

    Args:
        batch_size: Batch size handed to the pipeline.

    Returns:
        The pipeline, configured with return_all_scores=True.
    """
    pipeline_options = {
        "model": HF_MODEL_NAME,
        "return_all_scores": True,
        "batch_size": batch_size,
    }
    return pipeline("sentiment-analysis", **pipeline_options)




# -----------------------------------------------------------
# 2. UDF – NEUTRAL SCORES to POS-NEG 
@pandas_udf("struct<label:string, score:double, binary:int>")
def get_sentiment_udf(texts: pd.Series) -> pd.DataFrame:
    """
    Score a batch of texts with the global `hf_sentiment` pipeline.

    For every text the highest-scoring label and its score are kept, together
    with a binary flag: 1 for "POS", 0 for "NEG"; for "NEU" the flag is 1 only
    if the second-ranked label is "POS"; any other label maps to 0.
    Missing texts are scored as empty strings.
    """
    predictions = hf_sentiment(texts.fillna("").tolist())  # one list of {label, score} dicts per text

    rows = []
    for candidates in predictions:
        # Highest score first
        ranked = sorted(candidates, key=lambda item: -item["score"])
        best = ranked[0]
        top_label = best["label"]
        top_score = best["score"]

        if top_label == "NEU":
            # Neutral wins -> the runner-up decides the binary class
            is_positive = ranked[1]["label"] == "POS"
        else:
            # "POS" -> 1; "NEG" and any unexpected label -> 0
            is_positive = top_label == "POS"

        rows.append((top_label, top_score, 1 if is_positive else 0))

    return pd.DataFrame(rows, columns=["label", "score", "binary"])



# To extract multiple mentions:
@udf(ArrayType(StringType()))
def extract_mentions(text):
    """Return every "@word" token found in `text`; an empty list for empty or missing text."""
    return re.findall(r"@\w+", text) if text else []

In [0]:
#3) Stop Streams and Start BRONZE: 

def bronze_restart(source_path=TEST_PATH, delay_between_starts=5):
    """
    Stop all active streams, clear the three checkpoint directories and start
    the Bronze ingestion stream.

    JSON files under `source_path` are read with the cloudFiles source using a
    fixed four-string-column schema, tagged with the source file name and a
    processing timestamp, and appended to the Bronze Delta table.

    Args:
        source_path: Directory to ingest from (defaults to TEST_PATH).
        delay_between_starts: Accepted but not used by this function.

    Returns:
        The StreamingQuery started under the name "bronze_stream".
    """
    print("🔄 Stopping all active streams...")

    for running_query in spark.streams.active:
        running_query.stop()
    print("✅ All active streams stopped.")

    for checkpoint_dir in (BRONZE_CHECKPOINT, SILVER_CHECKPOINT, GOLD_CHECKPOINT):
        dbutils.fs.rm(checkpoint_dir, recurse=True)
    print("🧹 All checkpoint directories cleared.")

    # Every raw field is read as a nullable string
    bronze_schema = StructType([
        StructField(field_name, StringType(), True)
        for field_name in ("date", "sentiment", "text", "user")
    ])

    json_reader = (
        spark.readStream
            .format("cloudFiles")
            .schema(bronze_schema)
            .option("cloudFiles.format", "json")
    )
    raw_tweets = json_reader.load(source_path)
    bronze_stream = (
        raw_tweets
            .withColumn("source_file", input_file_name())
            .withColumn("processing_time", current_timestamp())
    )

    delta_writer = (
        bronze_stream.writeStream
            .format("delta")
            .outputMode("append")
            .option("checkpointLocation", BRONZE_CHECKPOINT)
            .option("mergeSchema", "true")
            .queryName("bronze_stream")
    )
    bronze_query = delta_writer.start(BRONZE_DELTA)

    print("🚀 Bronze stream restarted.")
    bronze_stream.printSchema()
    return bronze_query

In [0]:
# SILVER STREAM:
def silver_restart(delay_between_starts=5):
    """
    Start the Silver stream: Bronze Delta -> parsed, exploded, cleaned rows.

    For every Bronze row the `date` string is parsed into `timestamp`, one
    output row is produced per "@mention" found in `text` (rows without any
    mention produce no output, because explode of an empty array yields no
    rows), and the mentions are removed from the text to form `cleaned_text`.
    Rows whose cleaned text is blank are dropped.

    Timestamp parsing: the weekday token is removed before parsing and the
    pattern no longer contains `EEE`. Spark's datetime pattern rules since 3.0
    allow `E` for formatting only, not for parsing with to_timestamp; the
    weekday carries no information needed to build the timestamp.

    Args:
        delay_between_starts: Seconds to sleep after Bronze is ready and
            before the stream is defined.

    Returns:
        The StreamingQuery named "silver_stream", or None if the Bronze table
        did not become ready in time.
    """

    if not wait_for_delta_table(BRONZE_DELTA, timeout=60):
        print("⚠️ Bronze Delta not ready. Skipping Silver stream.")
        return

    time.sleep(delay_between_starts)

    # Presumed input shape: "Mon Apr 06 22:19:45 PDT 2009" (TODO confirm against data)
    #   drop the 3-letter zone   -> "Mon Apr 06 22:19:45 2009"
    #   drop the leading weekday -> "Apr 06 22:19:45 2009"
    date_without_zone = regexp_replace(col("date"), " [A-Z]{3} ", " ")
    date_without_weekday = regexp_replace(
        date_without_zone, "^(Mon|Tue|Wed|Thu|Fri|Sat|Sun) ", ""
    )

    silver_stream_df = (
        spark.readStream
            .format("delta")
            .load(BRONZE_DELTA)
            .withColumn("timestamp", to_timestamp(date_without_weekday, "MMM dd HH:mm:ss yyyy"))
            .withColumn("mention", explode(extract_mentions(col("text"))))
            .withColumn("cleaned_text", regexp_replace(col("text"), "@\\w+", ""))
            .filter(col("cleaned_text").isNotNull() & (length(trim(col("cleaned_text"))) > 0))
            .select("timestamp", "mention", "cleaned_text", "sentiment")
    )

    silver_query=(
        silver_stream_df.writeStream
            .format("delta")
            .outputMode("append")
            .option("checkpointLocation", SILVER_CHECKPOINT)
            .option("mergeSchema", "true")
            .queryName("silver_stream")
            .start(SILVER_DELTA)
    )
    print("🚀 Silver stream restarted.")
    silver_stream_df.printSchema()
    return silver_query

In [0]:
 
def gold_restart(delay_between_starts=5):
    """
    Start the Gold stream: Silver Delta -> rows enriched with model predictions.

    Non-blank `cleaned_text` rows are scored with get_sentiment_udf. The
    predicted label, score and binary id are flattened into their own columns,
    and the true `sentiment` label is mapped to `sentiment_id`
    (1 for 'positive', case-insensitive, otherwise 0).

    Args:
        delay_between_starts: Seconds to sleep after Silver is ready and
            before the stream is defined.

    Returns:
        The StreamingQuery named "gold_stream" (10-second processing-time
        trigger), or None if the Silver table did not become ready in time.
    """
    silver_ready = wait_for_delta_table(SILVER_DELTA, timeout=60)
    if not silver_ready:
        print("⚠️ Silver Delta not ready. Skipping Gold stream.")
        return

    time.sleep(delay_between_starts)

    silver_rows = spark.readStream.format("delta").load(SILVER_DELTA)
    has_text = col("cleaned_text").isNotNull() & (length(trim(col("cleaned_text"))) > 0)
    scored_rows = (
        silver_rows
            .filter(has_text)
            .withColumn("sentiment_result", get_sentiment_udf(col("cleaned_text")))
    )

    gold_df = (
        scored_rows
            # flatten the prediction struct
            .withColumn("predicted_sentiment", col("sentiment_result.label"))
            .withColumn("predicted_score", col("sentiment_result.score"))
            .withColumn("predicted_sentiment_id", col("sentiment_result.binary"))
            # true label -> ID
            .withColumn(
                "sentiment_id",
                expr("CASE WHEN lower(sentiment) = 'positive' THEN 1 ELSE 0 END"),
            )
            .drop("sentiment_result")
    )

    gold_writer = (
        gold_df.writeStream
            .format("delta")
            .outputMode("append")
            .option("checkpointLocation", GOLD_CHECKPOINT)
            .option("mergeSchema", "true")
            .queryName("gold_stream")
            .trigger(processingTime='10 seconds')
    )
    gold_query = gold_writer.start(GOLD_DELTA)

    print("🚀 Gold stream restarted.")
    gold_df.printSchema()
    return gold_query

In [0]:
def monitor_streams(interval_sec=10):
    """
    Sleep `interval_sec` seconds, then take one snapshot of every active stream.

    Streams that have not reported any progress yet are skipped.

    Returns:
        list[dict]: One dict per reporting stream with the keys
          - timestamp       (str, local wall-clock time of the snapshot)
          - query           (stream name, or "<unnamed>")
          - input_rows      (numInputRows of the last progress, default 0)
          - processing_time (addBatch duration in ms, or None)
    """
    # wait first so the snapshot reflects fresh progress
    time.sleep(interval_sec)

    snapshots = []
    for active_query in spark.streams.active:
        latest = active_query.lastProgress
        if not latest:
            continue

        snapshot = {
            "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "query": active_query.name or "<unnamed>",
            "input_rows": latest.get("numInputRows", 0),
            "processing_time": latest.get("durationMs", {}).get("addBatch", None),
        }
        snapshots.append(snapshot)
    return snapshots

In [0]:
def plot_stream_metrics(progress_log):
    """
    Plot input rows and processing time per stream from a progress log.

    Accepts the list of dicts produced by monitor_streams() and also the one
    produced by stream_until_idle(), which stores the duration under
    "processing_time_ms" instead of "processing_time".

    Args:
        progress_log: List of dicts with at least `timestamp`, `query`,
            `input_rows` and `processing_time` (or `processing_time_ms`).

    Returns:
        None. The figure is displayed inline and then closed.
    """
    import pandas as pd

    df = pd.DataFrame(progress_log)
    if df.empty:
        print("No data to plot yet.")
        return

    # Plotting libraries are only touched once there is something to draw.
    import matplotlib.pyplot as plt
    from IPython.display import display

    # turn on interactive mode
    plt.ion()

    # Harmonise the duration column name between the two log producers.
    if "processing_time" not in df.columns and "processing_time_ms" in df.columns:
        df = df.rename(columns={"processing_time_ms": "processing_time"})

    # parse timestamps
    df["timestamp"] = pd.to_datetime(df["timestamp"])
    # Durations may be None when a progress entry has no addBatch timing;
    # coerce so those points become NaN gaps instead of object values.
    df["processing_time"] = pd.to_numeric(df["processing_time"], errors="coerce")

    # two subplots: input_rows (top) and processing_time (bottom)
    fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(10, 8), sharex=True)

    for name, grp in df.groupby("query"):
        ax1.plot(grp["timestamp"], grp["input_rows"], label=name)
        ax2.plot(grp["timestamp"], grp["processing_time"], label=name)

    ax1.set_ylabel("Input Rows")
    ax1.set_title("Stream: Input Rows Over Time")
    ax1.legend()
    ax1.grid(True)

    ax2.set_ylabel("Processing Time (ms)")
    ax2.set_title("Stream: Processing Time Over Time")
    ax2.legend()
    ax2.grid(True)

    # rotate & label the shared x-axis on the bottom panel
    ax2.tick_params(axis="x", labelrotation=30)
    ax2.set_xlabel("Time")

    # show the figure, then close it so repeated calls do not accumulate figures
    display(fig)
    plt.close(fig)

In [0]:


def mlflow_run_calls(
    y_true, y_pred,
    model_name: str,
    silver_delta_path: str,
    run_name: str,
    extra_params: dict = None
):
    """
    Record one classification experiment as a single MLflow run.

    Logged content: accuracy plus weighted-average precision, recall and F1;
    a confusion-matrix heatmap saved as "conf_matrix.png" in the working
    directory and attached as an artifact; the model name, the MLflow version,
    any `extra_params`; and the latest version of the Silver Delta table.

    Args:
        y_true, y_pred: Array-like ground truth and predictions.
        model_name: Name of the model, logged as a parameter.
        silver_delta_path: Path of the Silver Delta table whose latest
            version is logged.
        run_name: Human-readable name of the MLflow run.
        extra_params: Optional additional parameters (e.g. batch size).
    """
    with mlflow.start_run(run_name=run_name):
        # 1) Metrics
        scores = classification_report(y_true, y_pred, output_dict=True)
        mlflow.log_metric("accuracy", scores["accuracy"])
        weighted_metric_keys = (
            ("precision", "precision"),
            ("recall", "recall"),
            ("f1_score", "f1-score"),
        )
        for metric_name, report_key in weighted_metric_keys:
            mlflow.log_metric(metric_name, scores["weighted avg"][report_key])

        # 2) Confusion matrix
        matrix = confusion_matrix(y_true, y_pred)
        fig, ax = plt.subplots(figsize=(6, 5))
        sns.heatmap(matrix, annot=True, fmt="d", cmap="Blues", ax=ax)
        ax.set_xlabel("Predicted")
        ax.set_ylabel("Actual")
        ax.set_title("Confusion Matrix")
        plt.tight_layout()
        image_path = "conf_matrix.png"
        fig.savefig(image_path)
        mlflow.log_artifact(image_path)
        plt.close(fig)

        # 3) Params
        mlflow.log_param("model_name", model_name)
        mlflow.log_param("mlflow_version", mlflow.__version__)

        for param_name, param_value in (extra_params or {}).items():
            mlflow.log_param(param_name, param_value)

        # 4) Silver table version (most recent history entry)
        latest_commit = DeltaTable.forPath(spark, silver_delta_path).history(1).collect()[0]
        mlflow.log_param("silver_table_version", latest_commit["version"])


In [0]:
def load_bronze_data():
    """Return a batch DataFrame over the Bronze Delta table."""
    delta_reader = spark.read.format("delta")
    return delta_reader.load(BRONZE_DELTA)

def load_silver_data():
    """Return a batch DataFrame over the Silver Delta table."""
    delta_reader = spark.read.format("delta")
    return delta_reader.load(SILVER_DELTA)

def load_gold_data():
    """Return a batch DataFrame over the Gold Delta table."""
    delta_reader = spark.read.format("delta")
    return delta_reader.load(GOLD_DELTA)


 **Benchmarking Hugging Face pipeline batch size:**
  
| Batch Size | Time (sec) | Tweets per Second |
|------------|------------|-------------------|
| 1          | 51.38      | 9.96              |
| 8          | 16.68      | 30.70             |
| 16         | 13.88      | 36.89             |
| 32         | 16.06      | 31.88             |
| 64         | 12.42      | 41.23             |
| 128        | 12.16      | 42.11             |


In [0]:
#  **Benchmarking Hugging Face pipeline batch size:**
  
# | Batch Size | Time (sec) | Tweets per Second |
# |------------|------------|-------------------|
# | 1          | 51.38      | 9.96              |
# | 8          | 16.68      | 30.70             |
# | 16         | 13.88      | 36.89             |
# | 32         | 16.06      | 31.88             |
# | 64         | 12.42      | 41.23             |
# | 128        | 12.16      | 42.11             |

In [0]:

def test_bencmark_hf_model():
    """
    Time the Hugging Face sentiment pipeline at several batch sizes.

    The same 512 identical sample texts are scored once per batch size; only
    the inference call is timed, not the pipeline construction.

    Returns:
        pd.DataFrame: One row per batch size with `batch_size`, `time_sec`
        and `tweets_per_sec`. The frame is also displayed.
    """
    sample_texts = 512 * ["I love this product!"]
    timings = []

    for batch_size in (1, 8, 16, 32, 64, 128):
        classifier = pipeline(
            "sentiment-analysis",
            model=HF_MODEL_NAME,
            return_all_scores=True,
            batch_size=batch_size,
        )
        started_at = time.time()
        classifier(sample_texts)
        seconds_taken = time.time() - started_at
        timings.append({
            "batch_size": batch_size,
            "time_sec": seconds_taken,
            "tweets_per_sec": len(sample_texts) / seconds_taken,
        })

    benchmark_df = pd.DataFrame(timings)
    display(benchmark_df)
    return benchmark_df


In [0]:
import time
from datetime import datetime

def stream_until_idle(query, idle_rounds_required=3, interval_sec=10, verbose=True):
    """
    Monitors a Spark StreamingQuery and stops it once it has been idle
    for `idle_rounds_required` consecutive polls.

    A poll counts as idle only when the last reported progress has 0 input
    rows AND the query status does not report data waiting to be processed.
    `lastProgress` keeps returning the previous report until a new one is
    published, so while a long micro-batch is running a stale zero-row report
    would otherwise be mistaken for idleness and the stream stopped mid-batch.

    Parameters:
        query:                StreamingQuery object (e.g., gold_query)
        idle_rounds_required: How many consecutive idle polls before stopping
        interval_sec:         Seconds to sleep between polls
        verbose:              Print status messages

    Returns:
        List of progress snapshots (dicts) with the keys `timestamp` (UTC),
        `input_rows`, `processing_time_ms` and `query`; one per poll that
        found a progress report.
    """
    # Function-scope import: the notebook imports only `datetime` itself.
    from datetime import timezone

    idle_rounds = 0
    log = []

    while query.isActive:
        prog = query.lastProgress

        if prog:
            # Same UTC wall-clock string as before, without the deprecated utcnow()
            timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
            num_rows = prog.get("numInputRows", 0)
            duration = prog.get("durationMs", {}).get("addBatch", None)

            log_entry = {
                "timestamp": timestamp,
                "input_rows": num_rows,
                "processing_time_ms": duration,
                "query": query.name or "<unnamed>"
            }
            log.append(log_entry)

            if verbose:
                print(f"🕒 {timestamp} | Rows: {num_rows} | Duration: {duration} ms")

            # Data waiting to be processed means the 0-row report is stale.
            status = getattr(query, "status", None) or {}
            data_pending = bool(status.get("isDataAvailable"))

            if num_rows == 0 and not data_pending:
                idle_rounds += 1
                if verbose:
                    print(f"⚠️ Idle round {idle_rounds}/{idle_rounds_required}")
            else:
                idle_rounds = 0  # reset idle counter on data

            if idle_rounds >= idle_rounds_required:
                if verbose:
                    print("✅ Idle threshold met. Stopping stream...")
                query.stop()
                break
        else:
            if verbose:
                print("⏳ Stream not yet reporting progress...")

        time.sleep(interval_sec)

    return log


In [0]:
# Final cell: prints a banner when it executes. Presumably a signal to a
# notebook that %run's this one that every definition above is loaded -- TODO confirm.
print('*********** UTILITES INSTALLED! GOOD TO GO!! *******')
