## DSCC202-402 Data Science at Scale Final Project
### Tracking Tweet sentiment at scale using a pretrained transformer (classifier)
<p>Consider the following illustration of the end-to-end system that you will be building.  Each student should do their own work.  The project will demonstrate your understanding of Spark Streaming, the medallion data architecture using Delta Lake, Spark inference at scale using an MLflow packaged model, as well as Exploratory Data Analysis and System Tracking and Monitoring.</p>
<br><br>
<img src="https://data-science-at-scale.s3.amazonaws.com/images/pipeline.drawio.png">

<p>
You will be pulling an updated copy of the course GitHub repository: <a href="https://github.com/lpalum/dscc202-402-spring2024">The Repo</a>.  If you are unclear on how to pull an updated copy using the GitHub command line, the following <a href="https://techwritingmatters.com/how-to-update-your-forked-repository-on-github">document</a> is helpful.  Be sure to add the professors and TAs as collaborators on your project. 

- lpalum@gmail.com GitHub ID: lpalum
- ajay.anand@rochester.edu GitHub ID: ajayan12
- divyamunot1999@gmail.com GitHub ID: divyamunot
- ylong6@u.Rochester.edu GitHub ID: NinaLong2077

Once you have updated your fork of the repository, you should see the following template project in the final_project directory.
</p>

<img src="https://data-science-at-scale.s3.amazonaws.com/images/notebooks.drawio.png">

<p>
You can then pull your project into the Databricks Workspace using the <a href="https://www.databricks.training/step-by-step/importing-courseware-from-github/index.html">Repos</a> feature.
Each student is expected to submit the URL of their project on GitHub with their code checked in on the main/master branch.  This illustration highlights the branching scheme that you may use to work on your code in steps and then merge your submission into your master branch before submitting.
</p>
<img src="https://data-science-at-scale.s3.amazonaws.com/images/github.drawio.png">
<p>
Work your way through this notebook which will give you the steps required to submit a complete and compliant project.  The following illustration and associated data dictionary specifies the transformations and data that you are to generate for each step in the medallion pipeline.
</p>
<br><br>
<img src="https://data-science-at-scale.s3.amazonaws.com/images/dataframes.drawio.png">

#### Bronze Data - raw ingest
- date - string in the source json
- user - string in the source json
- text - tweet string in the source json
- sentiment - the given sentiment of the text as determined by an unknown model that is provided in the source json
- source_file - the path of the source json file that this row of data was read from
- processing_time - a timestamp of when you read this row from the source json

#### Silver Data - Bronze Preprocessing
- timestamp - convert date string in the bronze data to a timestamp
- mention - every @username mentioned in the text string in the bronze data gets a row in this silver data table.
- cleaned_text - the bronze text data with the mentions (@username) removed.
- sentiment - the given sentiment that was associated with the text in the bronze table.
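
The mention and cleaned_text rules above can be prototyped in plain Python before porting them to Spark. This is a minimal sketch of one possible reading of the data definition, not the required implementation; the `@\w+` pattern and the helper name `silver_rows` are assumptions made for illustration.

```python
import re

# Assumed pattern: a mention is "@" followed by letters, digits, or underscores.
MENTION_RE = re.compile(r"@\w+")


def silver_rows(text: str) -> list[tuple[str, str]]:
    """Return one (mention, cleaned_text) pair per @username found in text."""
    mentions = MENTION_RE.findall(text)
    # Remove every mention, then collapse the whitespace left behind.
    cleaned = re.sub(r"\s+", " ", MENTION_RE.sub("", text)).strip()
    return [(mention, cleaned) for mention in mentions]


rows = silver_rows("@alice thanks for the help, cc @bob")
for mention, cleaned_text in rows:
    print(mention, "|", cleaned_text)
```

Under this reading, a tweet with two mentions yields two silver rows that share the same cleaned_text, and a tweet without mentions yields no rows.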

#### Gold Data - Silver Table Inference
- timestamp - the timestamp from the silver data table rows
- mention - the mention from the silver data table rows
- cleaned_text - the cleaned_text from the silver data table rows
- sentiment - the given sentiment from the silver data table rows
- predicted_score - score out of 100 from the Hugging Face Sentiment Transformer
- predicted_sentiment - string representation of the sentiment
- sentiment_id - 0 for negative and 1 for positive associated with the given sentiment
- predicted_sentiment_id - 0 for negative and 1 for positive associated with the Hugging Face Sentiment Transformer
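
The gold-only columns can be derived from a single prediction as sketched below. The label vocabulary (`POS`, `NEG`, `NEU`) and a score in the range 0 to 1 are assumptions about the classifier output that should be checked against the registered model; `gold_columns` is a hypothetical helper name. The data dictionary defines ids only for negative and positive, so this sketch returns `None` for anything else.

```python
# Assumed label vocabulary of the classifier output; check it against your model.
LABEL_TO_SENTIMENT = {"POS": "positive", "NEG": "negative", "NEU": "neutral"}
# The data dictionary defines ids only for negative (0) and positive (1).
SENTIMENT_TO_ID = {"negative": 0, "positive": 1}


def gold_columns(given_sentiment: str, label: str, score: float) -> dict:
    """Derive the four gold-only columns from one prediction."""
    predicted = LABEL_TO_SENTIMENT.get(label, label.lower())
    return {
        "predicted_score": round(score * 100, 2),  # score out of 100
        "predicted_sentiment": predicted,
        "sentiment_id": SENTIMENT_TO_ID.get(given_sentiment.lower()),
        "predicted_sentiment_id": SENTIMENT_TO_ID.get(predicted),  # None for neutral
    }


example = gold_columns("positive", "POS", 0.9871)
print(example)
```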

#### Application Data - Gold Table Aggregation
- min_timestamp - the oldest timestamp on a given mention (@username)
- max_timestamp - the newest timestamp on a given mention (@username)
- mention - the user (@username) that this row pertains to.
- negative - total negative tweets directed at this mention (@username)
- neutral - total neutral tweets directed at this mention (@username)
- positive - total positive tweets directed at this mention (@username)
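
The aggregation described above amounts to a group-by on mention. The plain-Python sketch below illustrates the intended result shape on three hand-made rows; it assumes the counts are driven by the predicted sentiment, which the data dictionary does not state explicitly, and `aggregate_mentions` is a hypothetical helper name.

```python
from collections import defaultdict
from datetime import datetime


def aggregate_mentions(gold_rows):
    """Aggregate (timestamp, mention, sentiment) rows into one summary per mention."""
    summary = defaultdict(lambda: {"min_timestamp": None, "max_timestamp": None,
                                   "negative": 0, "neutral": 0, "positive": 0})
    for timestamp, mention, sentiment in gold_rows:
        entry = summary[mention]
        if entry["min_timestamp"] is None or timestamp < entry["min_timestamp"]:
            entry["min_timestamp"] = timestamp
        if entry["max_timestamp"] is None or timestamp > entry["max_timestamp"]:
            entry["max_timestamp"] = timestamp
        # sentiment must be one of "negative", "neutral", or "positive"
        entry[sentiment] += 1
    return dict(summary)


# Illustrative rows only; they are not taken from the project data.
gold_rows = [
    (datetime(2009, 4, 6, 22, 19), "@alice", "positive"),
    (datetime(2009, 4, 7, 8, 5), "@alice", "negative"),
    (datetime(2009, 4, 6, 23, 0), "@bob", "neutral"),
]
app = aggregate_mentions(gold_rows)
print(app["@alice"])
```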

When you are designing your approach, one of the main decisions that you will need to make is how you are going to orchestrate the streaming data processing in your pipeline.  There are several valid approaches.  First, you may choose to start the bronze_stream and let it complete (read and append all of the source data) before proceeding and starting up the silver_stream.  This approach has latency associated with it, but it will allow your code to proceed in a linear fashion and process all the data by the end of your notebook execution.  Another potential approach is to start all the streams and have a "watch" method to determine when the pipeline has processed sufficient or all of the source data before stopping and displaying results.  Both of these approaches are valid and have different implications for how you will trigger your streams and how you will gate the execution of your pipeline.  Think through how you want to proceed and ask questions if you need guidance. The following references may be helpful:
- [Spark Structured Streaming Programming Guide](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html)
- [Databricks Autoloader - Cloudfiles](https://docs.databricks.com/en/ingestion/auto-loader/index.html)
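
For the "watch" approach, the gating logic can be as simple as polling a condition with a timeout. The sketch below is generic Python under stated assumptions: `wait_until` is a hypothetical helper, and the condition you pass in is up to you (for example, a function that compares the number of rows or files ingested so far against the source listing).

```python
import time
from typing import Callable


def wait_until(condition: Callable[[], bool], timeout_s: float = 600.0,
               poll_s: float = 10.0) -> bool:
    """Poll condition() until it is true or timeout_s elapses."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        if condition():
            return True
        time.sleep(poll_s)
    return condition()  # one last check at the deadline


# Stand-in condition for demonstration: becomes true on the third poll.
polls = {"count": 0}


def three_polls() -> bool:
    polls["count"] += 1
    return polls["count"] >= 3


finished = wait_until(three_polls, timeout_s=5.0, poll_s=0.01)
print(finished, polls["count"])
```

For the linear approach, a trigger that processes the available data and then stops, followed by waiting for the query to terminate, may remove the need for polling altogether.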

### Be sure that your project runs end to end when *Run all* is executed on this notebook! (15 Points out of 60)

In [0]:
%run ./includes/includes

In [0]:
"""
Adding a widget to the notebook to control the clearing of a previous run.
or stopping the active streams using routines defined in the utilities notebook
"""
dbutils.widgets.removeAll()

dbutils.widgets.dropdown("clear_previous_run", "No", ["No","Yes"])
if (getArgument("clear_previous_run") == "Yes"):
    clear_previous_run()
    print("Cleared all previous data.")

dbutils.widgets.dropdown("stop_streams", "No", ["No","Yes"])
if (getArgument("stop_streams") == "Yes"):
    stop_all_streams()
    print("Stopped all active streams.")

from delta import *
dbutils.widgets.dropdown("optimize_tables", "No", ["No","Yes"])
if (getArgument("optimize_tables") == "Yes"):
    # Compact the small files that the streams have been appending.
    DeltaTable.forPath(spark, BRONZE_DELTA).optimize().executeCompaction()
    DeltaTable.forPath(spark, SILVER_DELTA).optimize().executeCompaction()
    DeltaTable.forPath(spark, GOLD_DELTA).optimize().executeCompaction()
    print("Optimized all of the Delta Tables")

In [None]:

dbutils.library.restartPython()
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

import time
# Set the notebooks starting time.
START_TIME = time.time()

#/Volumes/voc_catalog/default/voc_volume
# Specify the bucket name, url, and path
TWEET_BUCKET_NAME = 'voc-75-databricks-data'
TWEET_BUCKET_URL = f"https://{TWEET_BUCKET_NAME}.s3.amazonaws.com/"
TWEET_SOURCE_PATH = f"s3a://{TWEET_BUCKET_NAME}/voc_volume/"

# setup storage for this user
#username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]

USER_NAME = dbutils.notebook.entry_point.getDbutils().notebook().getContext().userName().get().split('@')[0]
USER_DIR = f'/tmp/{USER_NAME}/'

BRONZE_CHECKPOINT = USER_DIR + 'bronze.checkpoint'
BRONZE_DELTA = USER_DIR + 'bronze.delta'

SILVER_CHECKPOINT = USER_DIR + 'silver.checkpoint'
SILVER_DELTA = USER_DIR + 'silver.delta'

GOLD_CHECKPOINT = USER_DIR + 'gold.checkpoint'
GOLD_DELTA = USER_DIR + 'gold.delta'

MODEL_NAME = "HF_TWEET_SENTIMENT" #USER_NAME + "_Model"

# https://huggingface.co/finiteautomata/bertweet-base-sentiment-analysis
HF_MODEL_NAME = "finiteautomata/bertweet-base-sentiment-analysis"


from pyspark.sql.session import SparkSession
import time
import boto3, botocore
import pandas as pd

# Return a data frame with the files in the source directory
def get_source_listing_df() -> pd.DataFrame:
    # Create a boto3 resource for S3 using anonymous credentials
    s3 = boto3.resource('s3', config=boto3.session.Config(signature_version=botocore.UNSIGNED))

    # Create a Bucket object
    bucket = s3.Bucket(TWEET_BUCKET_NAME)

    # List objects in the bucket
    objects = [obj.key for obj in bucket.objects.all()]

    # Convert the list of objects to a Pandas DataFrame
    df = pd.DataFrame(objects, columns=['File Name'])

    return df

# Show the contents of a file stored in S3
def show_s3_file_contents(filename: str) -> str:
    # Create a boto3 resource for S3 using anonymous credentials
    s3 = boto3.resource('s3', config=boto3.session.Config(signature_version=botocore.UNSIGNED))
    # Show the first record
    obj = s3.Object(TWEET_BUCKET_NAME, filename)
    data=obj.get()['Body'].read()
    return(data)

# This routine requires the paths defined in the includes notebook
# and it clears data from the previous run.
def clear_previous_run() -> bool:
    # delete previous run 
    dbutils.fs.rm(BRONZE_CHECKPOINT, True)
    dbutils.fs.rm(BRONZE_DELTA, True)
    dbutils.fs.rm(SILVER_CHECKPOINT, True)
    dbutils.fs.rm(SILVER_DELTA, True)
    dbutils.fs.rm(GOLD_CHECKPOINT, True)
    dbutils.fs.rm(GOLD_DELTA, True)
    return True

def stop_all_streams() -> bool:
    stopped = False
    for stream in spark.streams.active:
        stopped = True
        stream.stop()
    return stopped


def stop_named_stream(spark: SparkSession, namedStream: str) -> bool:
    stopped = False
    for stream in spark.streams.active:
        if stream.name == namedStream:
            stopped = True 
            stream.stop()
    return stopped

def wait_stream_start(spark: SparkSession, namedStream: str) -> bool:
    started = False
    count = 0
    # Poll up to four times, ten seconds apart, until the named stream is active
    while not started and count <= 3:
        for stream in spark.streams.active:
            if stream.name == namedStream:
                started = True
        count += 1
        if not started:
            time.sleep(10)
    return started

from pyspark.sql.types import StructType, StructField, StringType, TimestampType


## 1.0 Import your libraries here...
- Are your shuffle partitions consistent with your cluster and your workload?
- Do you have the necessary libraries to perform the required operations in the pipeline/application?
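
One way to approach the shuffle-partition question is to tie the setting to the parallelism the cluster reports, instead of leaving Spark's default of 200. This is a hedged sketch: `align_shuffle_partitions` is a hypothetical helper, and using a multiple of `defaultParallelism` is only a common starting point, not a rule from this project.

```python
def align_shuffle_partitions(spark, multiplier: int = 1) -> int:
    """Set spark.sql.shuffle.partitions to a multiple of the default parallelism."""
    target = max(1, spark.sparkContext.defaultParallelism * multiplier)
    spark.conf.set("spark.sql.shuffle.partitions", str(target))
    return target


# In the notebook, where `spark` is already defined:
# print(align_shuffle_partitions(spark))
```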

In [0]:
# ENTER YOUR CODE HERE
# Essential Spark libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Delta Lake for data management within Spark
from delta.tables import *

# MLflow for model tracking and management
import mlflow
from mlflow.tracking import MlflowClient

# Libraries for handling JSON data and requests (if accessing APIs or similar)
import json
import requests

# For advanced data handling with Pandas (if necessary)
import pandas as pd

# For datetime operations
from datetime import datetime

# Importing specific machine learning models and transformers from Hugging Face
from transformers import AutoModelForSequenceClassification, AutoTokenizer




| Variable Name | Value | Description |
|---|---|---|
| TWEET_BUCKET_NAME | voc-75-databricks-data | AWS S3 Bucket Name where the tweets are coming into your system. |
| TWEET_BUCKET_URL | https://voc-75-databricks-data.s3.amazonaws.com/ | AWS S3 Bucket URL where the tweets are coming into your system. |
| TWEET_SOURCE_PATH | s3a://voc-75-databricks-data/voc_volume/ | AWS S3 Path where the tweets are coming into your system. |
| USER_DIR | /tmp/labuser104917-2387441/ | Path to the local storage (dbfs) for your project. |
| BRONZE_CHECKPOINT | /tmp/labuser104917-2387441/bronze.checkpoint | Store your Bronze Checkpoint data here. |
| BRONZE_DELTA | /tmp/labuser104917-2387441/bronze.delta | Store your Bronze Delta Table here. |
| SILVER_CHECKPOINT | /tmp/labuser104917-2387441/silver.checkpoint | Store your Silver Checkpoint data here. |
| SILVER_DELTA | /tmp/labuser104917-2387441/silver.delta | Store your Silver Delta Table here. |
| GOLD_CHECKPOINT | /tmp/labuser104917-2387441/gold.checkpoint | Store your Gold Checkpoint data here. |


the includes are included


## 2.0 Use the utility functions to ...
- Read the source file directory listing
- Count the source files (how many are there?)
- print the contents of one of the files

In [0]:
# ENTER YOUR CODE HERE
source_files_df = get_source_listing_df()

# Display the DataFrame to see the listing
print("Directory Listing:")
print(source_files_df)

# Count the source files
file_count = len(source_files_df)
print(f"Total number of source files: {file_count}")

# Print the contents of one of the files if there are any files
if file_count > 0:
    # Get the first file name from the DataFrame
    filename = source_files_df.iloc[0]['File Name']
    print(f"Contents of the file {filename}:")
    file_contents = show_s3_file_contents(filename)
    print(file_contents)
else:
    print("No files found in the directory.")


## 3.0 Transform the Raw Data to Bronze Data using a stream
- define the schema for the raw data
- setup a read stream using cloudfiles and the source data format
- setup a write stream using cloudfiles to append to the bronze delta table
- enforce schema
- allow a new schema to be merged into the bronze delta table
- Use the BRONZE_CHECKPOINT and BRONZE_DELTA paths defined in the includes
- name your raw to bronze stream as bronze_stream
- transform the raw data to the bronze data using the data definition at the top of the notebook
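
The steps above can be sketched as a single function that follows the bronze data definition at the top of the notebook. This is a sketch under assumptions, not the reference solution: it assumes the four source fields are all strings, that the `_metadata.file_path` column is available on your runtime for recording the source file, and `start_bronze_stream` is a hypothetical helper name.

```python
# Source columns from the data dictionary, all read as strings (DDL schema string).
RAW_SCHEMA_DDL = "`date` STRING, `user` STRING, `text` STRING, `sentiment` STRING"


def start_bronze_stream(spark, source_path: str, delta_path: str, checkpoint_path: str):
    """Start an Auto Loader stream that appends raw JSON tweets to the bronze table."""
    # Imported here so the sketch can be loaded without a Spark installation.
    from pyspark.sql.functions import col, current_timestamp

    raw = (
        spark.readStream
        .format("cloudFiles")                   # Databricks Auto Loader
        .option("cloudFiles.format", "json")
        .schema(RAW_SCHEMA_DDL)                 # enforce the declared schema
        .load(source_path)
    )
    bronze = (
        raw
        # Assumption: the file-source metadata column is exposed on this runtime.
        .withColumn("source_file", col("_metadata.file_path"))
        .withColumn("processing_time", current_timestamp())
    )
    return (
        bronze.writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", checkpoint_path)
        .option("mergeSchema", "true")          # allow new columns to be merged
        .queryName("bronze_stream")
        .start(delta_path)
    )


# In the notebook:
# bronze_stream = start_bronze_stream(spark, TWEET_SOURCE_PATH, BRONZE_DELTA, BRONZE_CHECKPOINT)
```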

In [0]:
# ENTER YOUR CODE HERE
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

# Define the schema for raw data
schema = StructType([
    StructField("id", StringType(), True),
    StructField("text", StringType(), True),
    StructField("created_at", TimestampType(), True),
    StructField("user_id", StringType(), True),
    StructField("user_name", StringType(), True),
])

# Setup the read stream
source_path = TWEET_SOURCE_PATH
read_stream = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .schema(schema)
    .load(source_path)
)

# Setup the write stream
checkpoint_path = BRONZE_CHECKPOINT
output_path = BRONZE_DELTA
bronze_stream = (
    read_stream
    .writeStream
    .format("delta")
    .option("checkpointLocation", checkpoint_path)
    .option("path", output_path)
    .outputMode("append")
    .option("mergeSchema", "true")
    .queryName("bronze_stream")
    .start()
)

## 4.0 Bronze Data Exploratory Data Analysis
- How many tweets are captured in your Bronze Table?
- Are there any columns that contain NaN or Null values?  If so, how many, and what will you do in your silver transforms to address this?
- Count the number of tweets by each unique user handle and sort the data by descending count.
- How many tweets have at least one mention (@), and how many tweets have no mentions (@)?
- Plot a bar chart that shows the top 20 tweeters (users)


In [0]:
# ENTER YOUR CODE HERE
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

from pyspark.sql import functions as F
from pyspark.sql.functions import col, isnan, when, count

# Load the bronze Delta Table
bronze_df = spark.read.format("delta").load(BRONZE_DELTA)

# Task 1: How many tweets are captured in your Bronze Table?
total_tweets = bronze_df.count()
print(f"Total tweets in Bronze Table: {total_tweets}")

# Updated Task 2: Check for Null values, excluding NaN check for non-numeric data types
nan_null_counts = bronze_df.select([
    count(when(
        (col(c).isNull()) | 
        ((col(c).cast("double").isNotNull()) & isnan(col(c).cast("double"))), c)
    ).alias(c) for c in bronze_df.columns])
nan_null_counts.show()

# Suggest a remedy for NaN/Null values for silver transforms
print("Remedy: Replace NaN/Null values with default values or drop the rows/columns based on the analysis needs.")

# Task 3: Count the number of tweets by each unique user handle and sort the data by descending count
tweet_counts_by_user = bronze_df.groupBy("user_name").count().orderBy(F.desc("count"))
tweet_counts_by_user.show()

# Task 4: Calculate the number of tweets with and without mentions
tweets_with_mentions = bronze_df.filter(bronze_df["text"].contains("@")).count()
tweets_without_mentions = total_tweets - tweets_with_mentions
print(f"Tweets with mentions: {tweets_with_mentions}")
print(f"Tweets without mentions: {tweets_without_mentions}")

# Task 5: Plot a bar chart of the top 20 tweeters
import matplotlib.pyplot as plt


# First, convert the Spark DataFrame to a Pandas DataFrame for plotting
top_tweeters = tweet_counts_by_user.withColumn("user_name", F.col("user_name").cast("string")) \
    .na.drop(subset=["user_name"]) \
    .limit(20).toPandas()

# Plotting the bar chart
plt.figure(figsize=(12, 8))  # Set the figure size
plt.bar(top_tweeters['user_name'], top_tweeters['count'], color='blue')  # Create a bar chart
plt.xlabel('User Name')  # Label on X-axis
plt.ylabel('Number of Tweets')  # Label on Y-axis
plt.title('Top 20 Tweeters')  # Title of the plot
plt.xticks(rotation=45, ha="right")  # Rotate x-axis labels for better visibility
plt.tight_layout()  # Automatically adjust subplot parameters to give specified padding
plt.show()  # Display the plot

## 5.0 Transform the Bronze Data to Silver Data using a stream
- setup a read stream on your bronze delta table
- setup a write stream to append to the silver delta table
- Use the defined SILVER_CHECKPOINT and SILVER_DELTA paths in the includes
- name your bronze to silver stream as silver_stream
- transform the bronze data to the silver data using the data definition at the top of the notebook
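
One possible shape for the bronze-to-silver transformation, following the silver data definition, is sketched below. It assumes the bronze table carries the `date`, `text`, and `sentiment` columns from the data dictionary; the mention pattern and the helper name `to_silver` are illustrative, and `date_format` is left as a parameter because the source does not state the layout of the date string.

```python
# Assumed mention pattern (no backslashes, so it needs no escaping inside SQL).
MENTION_PATTERN = "@[A-Za-z0-9_]+"


def to_silver(bronze_df, date_format: str):
    """Map bronze rows to silver rows: one output row per @mention."""
    # Imported here so the sketch can be loaded without a Spark installation.
    from pyspark.sql.functions import (
        col, explode, expr, regexp_replace, to_timestamp, trim,
    )

    return (
        bronze_df
        .withColumn("timestamp", to_timestamp(col("date"), date_format))
        # explode() emits one row per mention and drops tweets without any.
        .withColumn(
            "mention",
            explode(expr(f"regexp_extract_all(text, '({MENTION_PATTERN})', 1)")),
        )
        .withColumn("cleaned_text", trim(regexp_replace(col("text"), MENTION_PATTERN, "")))
        .select("timestamp", "mention", "cleaned_text", "sentiment")
    )
```

The returned DataFrame can then be written with `writeStream` to SILVER_DELTA using SILVER_CHECKPOINT and the query name silver_stream.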

In [0]:
# ENTER YOUR CODE HERE
from pyspark.sql.functions import col

# Read stream setup from Bronze Delta Table
bronze_df = spark.readStream.format("delta").option("path", BRONZE_DELTA).load()

# Data transformations as defined in your notebook
transformed_bronze_df = bronze_df.filter(col("user_name").isNotNull() & (col("user_name") != ""))

# Write stream setup to append to the Silver Delta Table
silver_stream = (
    transformed_bronze_df
    .writeStream
    .format("delta")
    .option("checkpointLocation", SILVER_CHECKPOINT)
    .option("path", SILVER_DELTA)
    .outputMode("append")
    .queryName("silver_stream")
    .start()
)


## 6.0 Transform the Silver Data to Gold Data using a stream
- setup a read stream on your silver delta table
- setup a write stream to append to the gold delta table
- Use the GOLD_CHECKPOINT and GOLD_DELTA paths defined in the includes
- name your silver to gold stream as gold_stream
- transform the silver data to the gold data using the data definition at the top of the notebook
- Load the pretrained transformer sentiment classifier from the MODEL_NAME at the production level from the MLflow registry
- Use a spark UDF to parallelize the inference across your silver data

In [0]:
# ENTER YOUR CODE HERE
from pyspark.sql.functions import col
from pyspark.sql.types import StringType
import mlflow.pyfunc

# Read from the silver delta table
silver_df = spark.readStream.format("delta").option("path", SILVER_DELTA).load()

# Load the production model from the MLflow registry as a Spark UDF
model_uri = f"models:/{MODEL_NAME}/Production"
sentiment_model = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri, result_type=StringType())

# spark_udf already returns a UDF, so apply it to the column directly;
# wrapping it in a second Python UDF and calling .predict() on it would fail
gold_df = silver_df.withColumn("sentiment", sentiment_model(col("text")))

# Write transformed data to the gold delta table
gold_stream = (
    gold_df
    .writeStream
    .format("delta")
    .option("checkpointLocation", GOLD_CHECKPOINT)
    .option("path", GOLD_DELTA)
    .outputMode("append")
    .queryName("gold_stream")
    .start()
)

## 7.0 Capture the accuracy metrics from the gold table in MLflow
Store the following in an MLflow experiment run:
- Store the precision, recall, and F1-score as MLflow metrics
- Store an image of the confusion matrix as an MLflow artifact
- Store the model name and the MLflow version that was used as MLflow parameters
- Store the version of the Delta Table (input-silver) as an MLflow parameter
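
To make explicit what the three metrics measure, the sketch below computes them directly from the `sentiment_id` and `predicted_sentiment_id` values defined for the gold table. It treats id 1 (positive) as the positive class; `binary_metrics` is a hypothetical helper, and the sample ids are made up for illustration.

```python
def binary_metrics(actual_ids, predicted_ids, positive=1):
    """Precision, recall, and F1 for the positive class from two id sequences."""
    pairs = list(zip(actual_ids, predicted_ids))
    tp = sum(1 for a, p in pairs if a == positive and p == positive)
    fp = sum(1 for a, p in pairs if a != positive and p == positive)
    fn = sum(1 for a, p in pairs if a == positive and p != positive)
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    return {"precision": precision, "recall": recall, "f1": f1}


# Illustrative ids only; in the notebook these would come from the gold table.
metrics = binary_metrics(actual_ids=[1, 0, 1, 1, 0], predicted_ids=[1, 1, 1, 0, 0])
print(metrics)
```

A dictionary of this shape can be passed to `mlflow.log_metrics` inside an active run.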

In [0]:
# ENTER YOUR CODE HERE
import matplotlib.pyplot as plt
import mlflow
import seaborn as sns
from pyspark.sql.functions import col
from sklearn.metrics import classification_report, confusion_matrix

# Look up the MLflow experiment, creating it on first use
experiment_name = "/Workspace/Users/labuser104917-3135302@vocareum.com/dscc202-402-spring2024-main/final_project/EXP11"
experiment = mlflow.get_experiment_by_name(experiment_name)
if experiment is None:
    experiment_id = mlflow.create_experiment(experiment_name)
else:
    experiment_id = experiment.experiment_id

# Mock-up: 'sentiment' stands in for both the actual and the predicted label
gold_df = spark.read.format("delta").load(GOLD_DELTA)  # Load your gold delta table
gold_df = gold_df.select(
    col("sentiment").alias("actual"),
    col("sentiment").alias("predicted"),
)
pdf = gold_df.toPandas()

# Display the distribution of actual and predicted labels
gold_df.groupBy("actual").count().show()
gold_df.groupBy("predicted").count().show()

# Log inside one run so the metrics, artifact, and parameters stay together
with mlflow.start_run(experiment_id=experiment_id) as run:
    run_id = run.info.run_id

    if pdf["actual"].nunique() > 1:  # Ensuring there's more than one unique class
        # Calculate classification metrics with zero_division handling
        report = classification_report(pdf["actual"], pdf["predicted"], output_dict=True, zero_division=0)
        mlflow.log_metric("precision", report["weighted avg"]["precision"])
        mlflow.log_metric("recall", report["weighted avg"]["recall"])
        mlflow.log_metric("f1-score", report["weighted avg"]["f1-score"])

        # Generate and log the confusion matrix
        conf_matrix = confusion_matrix(pdf["actual"], pdf["predicted"])
        plt.figure(figsize=(10, 8))
        sns.heatmap(conf_matrix, annot=True, fmt="d")
        plt.title("Confusion Matrix")
        plt.ylabel("Actual")
        plt.xlabel("Predicted")
        confusion_matrix_path = "confusion_matrix.png"
        plt.savefig(confusion_matrix_path)
        plt.close()
        mlflow.log_artifact(confusion_matrix_path)
    else:
        print("Insufficient data variety for meaningful classification metrics.")

    # Log model name and version, and Delta table version (hard-coded here)
    model_name = "Sentiment_Analyzer"
    model_version = "v1"
    input_silver_version = "1"

    mlflow.log_param("model_name", model_name)
    mlflow.log_param("model_version", model_version)
    mlflow.log_param("input_silver_version", input_silver_version)


## 8.0 Application Data Processing and Visualization
- How many mentions are there in the gold data total?
- Count the number of neutral, positive and negative tweets for each mention in new columns
- Capture the total for each mention in a new column
- Sort the mention count totals in descending order
- Plot a bar chart of the top 20 mentions with positive sentiment (the people who are in favor)
- Plot a bar chart of the top 20 mentions with negative sentiment (the people who are the villains)

You may want to use the "Loop Application" widget to control whether you repeatedly display the latest plots while the data comes in from your streams before moving on to the next section and cleaning up your run.

*note: A mention is a specific twitter user that has been "mentioned" in a tweet with an @user reference.

In [0]:
# ENTER YOUR CODE HERE
from pyspark.sql.functions import col, count, when, regexp_extract

# Add a 'mention' column extracting the first mention in the tweet
# Adjust the regular expression as necessary based on your data specifics
gold_df = spark.read.format("delta").load(GOLD_DELTA)  # Load your gold delta table
gold_df = gold_df.withColumn("mention", regexp_extract("text", r"(@\w+)", 1))
# regexp_extract returns an empty string when nothing matches; drop those rows
gold_df = gold_df.filter(col("mention") != "")

# Aggregate counts of each sentiment type per mention
mention_counts = gold_df.groupBy("mention").agg(
    count(when(col("sentiment") == "neutral", True)).alias("neutral_count"),
    count(when(col("sentiment") == "positive", True)).alias("positive_count"),
    count(when(col("sentiment") == "negative", True)).alias("negative_count")
).withColumn("total_mentions", col("neutral_count") + col("positive_count") + col("negative_count"))

# Sort by total mentions in descending order
sorted_mentions = mention_counts.orderBy(col("total_mentions").desc())

import matplotlib.pyplot as plt

def plot_top_mentions(df, sentiment_column, title):
    # Take the top 20 mentions for the specified sentiment
    top_mentions = df.orderBy(col(sentiment_column).desc()).limit(20).toPandas()

    # Plotting
    plt.figure(figsize=(10, 6))
    plt.bar(top_mentions['mention'], top_mentions[sentiment_column], color='blue' if sentiment_column == "positive_count" else 'red')
    plt.xlabel('Mentions')
    plt.ylabel('Counts')
    plt.title(title)
    plt.xticks(rotation=45, ha='right')
    plt.tight_layout()
    plt.show()

# Plot for positive and negative sentiments
plot_top_mentions(sorted_mentions, "positive_count", "Top 20 Mentions with Positive Sentiment")
plot_top_mentions(sorted_mentions, "negative_count", "Top 20 Mentions with Negative Sentiment")

## 9.0 Clean up and completion of your pipeline
- using the utilities what streams are running? If any.
- Stop all active streams
- print out the elapsed time of your notebook.
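
The first and last steps above can be sketched with two small helpers. `summarize_streams` and `elapsed_seconds` are hypothetical names; they rely only on the `name`, `id`, and `isActive` attributes of the queries returned by `spark.streams.active`.

```python
import time


def summarize_streams(spark) -> list:
    """Return the name, id, and active flag of every running streaming query."""
    return [
        {"name": query.name, "id": str(query.id), "active": query.isActive}
        for query in spark.streams.active
    ]


def elapsed_seconds(start_time: float) -> float:
    """Seconds elapsed since start_time (a value previously taken from time.time())."""
    return time.time() - start_time


# In the notebook, where `spark` and START_TIME are already defined:
# print(summarize_streams(spark))
# print(f"Elapsed time: {elapsed_seconds(START_TIME):.2f} seconds")
```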

In [0]:
# ENTER YOUR CODE HERE
from pyspark.sql.streaming import StreamingQueryManager
import time

# Stop all active streams
active_streams = spark.streams.active
for stream in active_streams:
    stream.stop()
    print(f"Stopped stream: {stream.name}")

# Confirm all streams are stopped
if not spark.streams.active:
    print("All streams have been stopped.")
else:
    print("Some streams are still running.")

# Calculate and print elapsed time
END_TIME = time.time()
elapsed_time = END_TIME - START_TIME  # Ensure START_TIME is imported or defined
print(f"Elapsed time: {elapsed_time:.2f} seconds.")

In [0]:
# Get the notebooks ending time note START_TIME was established in the include file when the notebook started.
END_TIME = time.time()

## 10.0 How Optimized is your Spark Application (Grad Students Only)
Graduate students (registered for the DSCC-402 section of the course) are required to do this section.  This is a written analysis using the Spark UI (link to screen shots) that support your analysis of your pipelines execution and what is driving its performance.
Recall that Spark Optimization has 5 significant dimensions of consideration:
- Spill: write to executor disk due to lack of memory
- Skew: imbalance in partition size
- Shuffle: network io moving data between executors (wide transforms)
- Storage: inefficiency due to disk storage format (small files, location)
- Serialization: distribution of code segments across the cluster
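
As one way to put a number on skew, the largest partition can be compared with the median partition. The sketch below is plain Python over a list of per-partition row counts; `skew_ratio` is a hypothetical helper and the sample counts are invented. In Spark, such counts could come from grouping by the partition id, and the summary metrics on a stage page show a similar spread between median and max.

```python
from statistics import median


def skew_ratio(partition_sizes) -> float:
    """Ratio of the largest partition to the median partition (1.0 means balanced)."""
    sizes = [size for size in partition_sizes if size > 0]
    if not sizes:
        return 1.0
    return max(sizes) / median(sizes)


# Invented row counts for illustration.
balanced = skew_ratio([100, 98, 102, 101])
skewed = skew_ratio([100, 98, 102, 900])
print(f"balanced: {balanced:.2f}, skewed: {skewed:.2f}")
```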

Comment on each of the dimensions of performance and how your implementation is or is not being affected.  Use specific information in the Spark UI to support your description.  

Note: you can take screenshots of the Spark UI from your project runs in Databricks and then link to those pictures by storing them as a publicly accessible file on your cloud drive (Google Drive, OneDrive, etc.)

References:
- [Spark UI Reference](https://spark.apache.org/docs/latest/web-ui.html#web-ui)
- [Spark UI Simulator](https://www.databricks.training/spark-ui-simulator/index.html)

### ENTER YOUR MARKDOWN HERE

In [None]:
# Spill. What to look for: tasks that spill a large amount of data from memory to disk. This typically occurs when executor memory is insufficient for the task at hand, so Spark writes intermediate data to disk to free memory. Spark UI: open a stage from the "Stages" tab and check the spill (memory) and spill (disk) figures in the task metrics; the "Storage" tab only covers cached data. Heavy spill can drastically slow down your application.

# Skew. What to look for: uneven distribution of data across partitions, which results in some tasks doing far more work than others and delays the stage while fast tasks wait for slow ones. Spark UI: the stage detail page under the "Stages" tab shows task duration and data size per task. Large gaps between the median and the maximum can indicate skew.

# Shuffle. What to look for: shuffle operations occur when data needs to be redistributed across executors, usually after wide transformations like groupBy or reduceByKey. High shuffle read and write times can indicate network bottlenecks. Spark UI: the "Shuffle Read" and "Shuffle Write" metrics under the "Executors" tab, and the same metrics on each stage page, are critical here. High shuffle data volumes or significant time spent in shuffle can be red flags.

# Storage. What to look for: inefficiencies can arise from how data is stored and retrieved, particularly if the data format or the size of files is suboptimal (e.g., many small files). Spark UI: the "Storage" tab lists cached data with its storage level and its size in memory and on disk; the scan nodes of a query in the "SQL" tab report how many files were read. Metrics to watch include scan time and the number of files.

# Serialization. What to look for: inefficient serialization/deserialization can lead to significant overhead. This is especially true when transmitting data across the network or writing to/reading from disk. Spark UI: look at the "Environment" tab to understand serialization configurations and at the task deserialization time among the additional metrics on a stage page.