## DSCC202-402 Data Science at Scale Final Project
### Tracking Tweet sentiment at scale using a pretrained transformer (classifier)
<p>Consider the following illustration of the end-to-end system that you will be building.  Each student should do their own work.  The project will demonstrate your understanding of Spark Streaming, the medallion data architecture using Delta Lake, Spark inference at scale using an MLflow packaged model, as well as Exploratory Data Analysis and System Tracking and Monitoring.</p>
<br><br>
<img src="https://data-science-at-scale.s3.amazonaws.com/images/pipeline.drawio.png">

<p>
You will be pulling an updated copy of the course GitHub repository: <a href="https://github.com/lpalum/dscc202-402-spring2024">The Repo</a>.  If you are unclear on how to pull an updated copy using the GitHub command line, the following <a href="https://techwritingmatters.com/how-to-update-your-forked-repository-on-github">document</a> is helpful.  Be sure to add the professors and TAs as collaborators on your project. 

- lpalum@gmail.com GitHub ID: lpalum
- ajay.anand@rochester.edu GitHub ID: ajayan12
- divyamunot1999@gmail.com GitHub ID: divyamunot
- ylong6@u.Rochester.edu GitHub ID: NinaLong2077

Once you have updated your fork of the repository, you should see the following template project that is resident in the final_project directory.
</p>

<img src="https://data-science-at-scale.s3.amazonaws.com/images/notebooks.drawio.png">

<p>
You can then pull your project into the Databricks Workspace using the <a href="https://www.databricks.training/step-by-step/importing-courseware-from-github/index.html">Repos</a> feature.
Each student is expected to submit the URL of their project on GitHub with their code checked in on the main/master branch.  This illustration highlights the branching scheme that you may use to work on your code in steps and then merge your submission into your master branch before submitting.
</p>
<img src="https://data-science-at-scale.s3.amazonaws.com/images/github.drawio.png">
<p>
Work your way through this notebook which will give you the steps required to submit a complete and compliant project.  The following illustration and associated data dictionary specifies the transformations and data that you are to generate for each step in the medallion pipeline.
</p>
<br><br>
<img src="https://data-science-at-scale.s3.amazonaws.com/images/dataframes.drawio.png">

#### Bronze Data - raw ingest
- date - string in the source json
- user - string in the source json
- text - tweet string in the source json
- sentiment - the given sentiment of the text as determined by an unknown model that is provided in the source json
- source_file - the path of the source json file that this row of data was read from
- processing_time - a timestamp of when you read this row from the source json

#### Silver Data - Bronze Preprocessing
- timestamp - convert date string in the bronze data to a timestamp
- mention - every @username mentioned in the text string in the bronze data gets a row in this silver data table.
- cleaned_text - the bronze text data with the mentions (@username) removed.
- sentiment - the given sentiment that was associated with the text in the bronze table.
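The silver rules above can be sketched in plain Python before they are expressed as Spark column operations. This is a minimal illustration, not the required implementation: it assumes a mention is `@` followed by word characters, and the helper names (`extract_mentions`, `clean_text`, `to_silver_rows`) are hypothetical.

```python
import re

# Assumption: a mention is "@" followed by letters, digits, or underscores.
MENTION_PATTERN = re.compile(r"@\w+")

def extract_mentions(text):
    """Return every @username found in the tweet text, in order of appearance."""
    return MENTION_PATTERN.findall(text)

def clean_text(text):
    """Remove mentions and collapse the whitespace they leave behind."""
    without_mentions = MENTION_PATTERN.sub("", text)
    return " ".join(without_mentions.split())

def to_silver_rows(timestamp, text, sentiment):
    """Produce one silver row per mention; a tweet with no mention yields no rows."""
    cleaned = clean_text(text)
    return [
        {"timestamp": timestamp, "mention": mention,
         "cleaned_text": cleaned, "sentiment": sentiment}
        for mention in extract_mentions(text)
    ]

rows = to_silver_rows("2009-04-06 22:19:45", "@alice thanks, say hi to @bob!", "positive")
for row in rows:
    print(row)
```

Note that a pattern this simple also matches the tail of an e-mail address such as `a@b.com`, so a stricter pattern may be worth considering if the data contains them.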

#### Gold Data - Silver Table Inference
- timestamp - the timestamp from the silver data table rows
- mention - the mention from the silver data table rows
- cleaned_text - the cleaned_text from the silver data table rows
- sentiment - the given sentiment from the silver data table rows
- predicted_score - score out of 100 from the Hugging Face Sentiment Transformer
- predicted_sentiment - string representation of the sentiment
- sentiment_id - 0 for negative and 1 for positive associated with the given sentiment
- predicted_sentiment_id - 0 for negative and 1 for positive associated with the Hugging Face Sentiment Transformer
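As a rough sketch of how the two id columns and the score might be derived, the helpers below map a label to 0 or 1 and rescale a score to the 0 to 100 range. Several things here are assumptions rather than facts about the data or the model: that positive labels start with "pos" and negative labels with "neg" in any letter case, that any other label (such as neutral) has no binary id, and that the model reports its score as a probability between 0 and 1. The function names are hypothetical.

```python
def sentiment_to_id(label):
    """Map a sentiment label to 1 (positive) or 0 (negative).

    Assumption: labels such as "positive"/"POS" and "negative"/"NEG" are used.
    Any other label (for example neutral) returns None so the caller can decide
    how to treat it.
    """
    normalized = label.strip().lower()
    if normalized.startswith("pos"):
        return 1
    if normalized.startswith("neg"):
        return 0
    return None

def score_out_of_100(probability):
    """Rescale a score assumed to lie in [0, 1] to a score out of 100."""
    return round(probability * 100.0, 2)

print(sentiment_to_id("positive"), sentiment_to_id("NEG"), sentiment_to_id("neutral"))
print(score_out_of_100(0.9876))
```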

#### Application Data - Gold Table Aggregation
- min_timestamp - the oldest timestamp on a given mention (@username)
- max_timestamp - the newest timestamp on a given mention (@username)
- mention - the user (@username) that this row pertains to.
- negative - total negative tweets directed at this mention (@username)
- neutral - total neutral tweets directed at this mention (@username)
- positive - total positive tweets directed at this mention (@username)
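The aggregation described above amounts to a group-by on the mention. The following plain-Python sketch shows the intended result on three illustrative rows; it assumes the predicted labels are `POS`, `NEG`, and `NEU` and that timestamps are ISO-formatted strings (which sort chronologically). The function name `aggregate_mentions` and the sample rows are made up for the example.

```python
# Assumption: the classifier emits these three label strings.
LABEL_TO_COLUMN = {"NEG": "negative", "NEU": "neutral", "POS": "positive"}

def aggregate_mentions(gold_rows):
    """Fold gold rows into one summary row per mention, busiest mention first."""
    summary = {}
    for row in gold_rows:
        entry = summary.setdefault(row["mention"], {
            "mention": row["mention"],
            "min_timestamp": row["timestamp"],
            "max_timestamp": row["timestamp"],
            "negative": 0, "neutral": 0, "positive": 0,
        })
        entry["min_timestamp"] = min(entry["min_timestamp"], row["timestamp"])
        entry["max_timestamp"] = max(entry["max_timestamp"], row["timestamp"])
        entry[LABEL_TO_COLUMN[row["predicted_sentiment"]]] += 1
    return sorted(
        summary.values(),
        key=lambda e: e["negative"] + e["neutral"] + e["positive"],
        reverse=True,
    )

sample_rows = [
    {"timestamp": "2009-04-06 22:19:45", "mention": "@alice", "predicted_sentiment": "POS"},
    {"timestamp": "2009-04-07 08:00:00", "mention": "@alice", "predicted_sentiment": "NEG"},
    {"timestamp": "2009-04-05 12:30:00", "mention": "@bob", "predicted_sentiment": "NEU"},
]
application_rows = aggregate_mentions(sample_rows)
for row in application_rows:
    print(row)
```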

When you are designing your approach, one of the main decisions that you will need to make is how you are going to orchestrate the streaming data processing in your pipeline.  There are several valid approaches.  First, you may choose to start the bronze_stream and let it complete (read and append all of the source data) before proceeding and starting up the silver_stream.  This approach has latency associated with it, but it will allow your code to proceed in a linear fashion and process all the data by the end of your notebook execution.  Another potential approach is to start all the streams and have a "watch" method to determine when the pipeline has processed sufficient or all of the source data before stopping and displaying results.  Both of these approaches are valid and have different implications for how you will trigger your streams and how you will gate the execution of your pipeline.  Think through how you want to proceed and ask questions if you need guidance. The following references may be helpful:
- [Spark Structured Streaming Programming Guide](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html)
- [Databricks Autoloader - Cloudfiles](https://docs.databricks.com/en/ingestion/auto-loader/index.html)
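One way to picture the "watch" approach is a small polling helper that blocks until a condition holds or a timeout expires. The sketch below is generic Python and makes no claim about how the course utilities work; `wait_until` and its parameters are hypothetical names, and the condition you pass in (for example, a check that a Delta table has reached an expected row count) is up to you.

```python
import time

def wait_until(condition, timeout_seconds=600.0, poll_seconds=5.0):
    """Poll condition() until it returns True or the timeout expires.

    Returns True if the condition was met, False if the timeout was reached.
    """
    deadline = time.monotonic() + timeout_seconds
    while True:
        if condition():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll_seconds)

# Illustrative condition that becomes true on the third check.
checks = {"count": 0}

def ready_after_three_checks():
    checks["count"] += 1
    return checks["count"] >= 3

print(wait_until(ready_after_three_checks, timeout_seconds=5, poll_seconds=0))
```

In the notebook, the condition would typically wrap a cheap check on the stream or its output table, and the stream would be stopped once `wait_until` returns.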

### Be sure that your project runs end to end when *Run all* is executed on this notebook! (15 Points out of 60)

In [0]:
!pip install pyspark

### Variables' Name Update
I renamed the checkpoint and table path variables as shown below. After many repeated runs, the logs appeared to have been merged several times: reading the tweets back returned duplicates, pushing the row count above 200k. To keep the checkpoints and tables separate from those earlier runs, I introduced the following additional variables.

>>>

BRONZE_CHECKPOINT_2 = USER_DIR + 'bronze2.checkpoint' \
BRONZE_DELTA_2 = USER_DIR + 'bronze2.delta'

SILVER_CHECKPOINT_2 = USER_DIR + 'silver2.checkpoint' \
SILVER_DELTA_2 = USER_DIR + 'silver2.delta'

GOLD_CHECKPOINT_2 = USER_DIR + 'gold2.checkpoint' \
GOLD_DELTA_2 = USER_DIR + 'gold2.delta'


In [0]:
%run ./includes/includes


In [0]:
"""
Add widgets to the notebook to control clearing a previous run
and stopping the active streams, using routines defined in the utilities notebook.
"""
dbutils.widgets.removeAll()

dbutils.widgets.dropdown("clear_previous_run", "No", ["No","Yes"])
if (getArgument("clear_previous_run") == "Yes"):
    clear_previous_run()
    print("Cleared all previous data.")

dbutils.widgets.dropdown("stop_streams", "No", ["No","Yes"])
if (getArgument("stop_streams") == "Yes"):
    stop_all_streams()
    print("Stopped all active streams.")

from delta import *
dbutils.widgets.dropdown("optimize_tables", "No", ["No","Yes"])
if (getArgument("optimize_tables") == "Yes"):
    # Suck up those small files that we have been appending.
    DeltaTable.forPath(spark, BRONZE_DELTA).optimize().executeCompaction()
    # Suck up those small files that we have been appending.
    DeltaTable.forPath(spark, SILVER_DELTA).optimize().executeCompaction()
    # Suck up those small files that we have been appending.
    DeltaTable.forPath(spark, GOLD_DELTA).optimize().executeCompaction()
    print("Optimized all of the Delta Tables")

## 1.0 Import your libraries here...
- Are your shuffle partitions consistent with your cluster and your workload?
- Do you have the necessary libraries to perform the required operations in the pipeline/application?

In [0]:
import time
import mlflow
import mlflow.pyfunc
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
from delta.tables import *
from pyspark.sql.types import StructType, StructField, FloatType, StringType, IntegerType, TimestampType
from pyspark.sql.functions import current_timestamp, col, isnan, when, count, desc, expr, explode, split, regexp_replace, udf
from sklearn.metrics import confusion_matrix, precision_score, recall_score, f1_score





## 2.0 Use the utility functions to ...
- Read the source file directory listing
- Count the source files (how many are there?)
- print the contents of one of the files

In [0]:
# 1. Read the Source File Directory Listing
df = get_source_listing_df()
print(df)

# 2. Count the Source Files
file_count = len(df)  # One row per source file; the column header is not counted as a row
print(f"Number of source files: {file_count}")

# 3. Print the Contents of One of the Files
if file_count > 0:
    filename = df.iloc[0]['File Name']  # First file in the listing
    print(f"\nContents of {filename}:")
    file_contents = show_s3_file_contents(filename)
    print(file_contents.decode('utf-8'))  # Assuming the file content is in bytes
else:
    print("No files found in the source directory.")

## 3.0 Transform the Raw Data to Bronze Data using a stream
- define the schema for the raw data
- setup a read stream using cloudfiles and the source data format
- setup a write stream using cloudfiles to append to the bronze delta table
- enforce schema
- allow a new schema to be merged into the bronze delta table
- Use the defined BRONZE_CHECKPOINT and BRONZE_DELTA paths defined in the includes
- name your raw to bronze stream as bronze_stream
- transform the raw data to the bronze data using the data definition at the top of the notebook

In [0]:
# 1. Define the Schema for Raw Data

# Define the schema corresponding to the JSON structure of the source data
schema  = StructType([
    StructField("date", StringType(), True),
    StructField("user", StringType(), True),
    StructField("text", StringType(), True),
    StructField("sentiment", StringType(), True),
    StructField("source_file", StringType(), True)
])


In [0]:
# 2. Setup a Read Stream for Source Data

# Setup the read stream with whole dataset
raw_data = (
    spark.readStream
    .format("cloudFiles")  # Using cloudFiles format for compatibility with cloud storage
    .option("cloudFiles.format", "json")
    .schema(schema)  # Enforce the schema
    .load(TWEET_SOURCE_PATH)
)


In [0]:
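# 3. Transform Raw Data to Bronze Data
from pyspark.sql.functions import input_file_name

# Add the source file path and the processing timestamp
bronze_data = (raw_data
               .withColumn("source_file", input_file_name())
               .withColumn("processing_time", current_timestamp()))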


In [0]:
# 4. Set up a Write Stream to the Bronze Delta Table

# Write the transformed data to a Delta table
query = (
    bronze_data.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", BRONZE_CHECKPOINT_2)
    .option("mergeSchema", "true")  # Allow a new schema to be merged into the bronze table
    .option("path", BRONZE_DELTA_2)
    .trigger(processingTime='1 minute')  # Trigger the processing every minute
    .queryName("bronze_stream")  # Named based on the variable requirement
    .start()
)


## 4.0 Bronze Data Exploratory Data Analysis
- How many tweets are captured in your Bronze Table?
- Are there any columns that contain NaN or Null values?  If so, how many, and what will you do in your silver transforms to address this?
- Count the number of tweets by each unique user handle and sort the data by descending count.
- How many tweets have at least one mention (@), and how many tweets have no mentions (@)?
- Plot a bar chart that shows the top 20 tweeters (users)


In [0]:
# Load the Bronze Delta Table
bronze_df = spark.read.format("delta").load(BRONZE_DELTA_2)

In [0]:
# Task 1: Count Tweets in Bronze Table

# Count the number of tweets
tweet_count = bronze_df.count()
print(f"Total number of tweets: {tweet_count}")


In [0]:
# Task 2: Identify Null or NaN Values

# NaN only exists for floating-point columns, so apply isnan() to those alone
# and check every column for nulls. This leaves bronze_df itself unchanged.
float_columns = {f.name for f in bronze_df.schema.fields if f.dataType.typeName() in ("float", "double")}

null_counts = bronze_df.select([
    count(when((isnan(c) | col(c).isNull()) if c in float_columns else col(c).isNull(), c)).alias(c)
    for c in bronze_df.columns
])
null_counts.show()


In [0]:
# Task 3: Count Tweets by Each Unique User


# Assuming 'user' is the column with the user's Twitter handle
tweets_by_user = bronze_df.groupBy("user").count().orderBy(desc("count"))
tweets_by_user.show()


In [0]:
# Task 4: Count Tweets with Mentions


# Count tweets with and without mentions
tweets_with_mentions = bronze_df.filter(col("text").contains("@")).count()
tweets_without_mentions = tweet_count - tweets_with_mentions

print(f"Tweets with mentions: {tweets_with_mentions}")
print(f"Tweets without mentions: {tweets_without_mentions}")



In [0]:
# Task 5: Plot a Bar Chart of the Top 20 Tweeters

# Using display in Databricks to show the top 20 tweeters
display(tweets_by_user.limit(20))

# Alternatively, using matplotlib to plot
import matplotlib.pyplot as plt

top_tweeters = tweets_by_user.limit(20).toPandas()  # Convert to Pandas for plotting

plt.figure(figsize=(10, 8))
plt.bar(top_tweeters['user'], top_tweeters['count'], color='skyblue')
plt.xlabel('User')
plt.ylabel('Number of Tweets')
plt.title('Top 20 Tweeters')
plt.xticks(rotation=45)
plt.show()




## 5.0 Transform the Bronze Data to Silver Data using a stream
- setup a read stream on your bronze delta table
- setup a write stream to append to the silver delta table
- Use the defined SILVER_CHECKPOINT and SILVER_DELTA paths in the includes
- name your bronze to silver stream as silver_stream
- transform the bronze data to the silver data using the data definition at the top of the notebook


#### Silver Data - Bronze Preprocessing
- timestamp - convert date string in the bronze data to a timestamp
- mention - every @username mentioned in the text string in the bronze data gets a row in this silver data table.
- cleaned_text - the bronze text data with the mentions (@username) removed.
- sentiment - the given sentiment that was associated with the text in the bronze table.


### Variable Name Change
- mention -> potential_mention


In [0]:
# 1.  Setup the read stream for Bronze data
bronze_df = (
    spark.readStream
    .format("delta")
    .load(BRONZE_DELTA_2)
)

In [0]:
# 2. Transform the data
silver_data = bronze_df.select(
    # Convert date string to timestamp
    col("date").cast("timestamp").alias("timestamp"),
    # Explode mentions into separate rows
    explode(split(col("text"), " ")).alias("potential_mention"),
    # Remove mentions from text
    regexp_replace(col("text"), "@\\w+", "").alias("cleaned_text"),
    # Carry over sentiment
    col("sentiment")
).filter(col("potential_mention").startswith("@"))  # Only keep rows where there is a mention


In [0]:
# Write the transformed data to the Silver Delta table
silver_query = (
    silver_data.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", SILVER_CHECKPOINT_2) # updated with checkpoint2
    .option("path", SILVER_DELTA_2 ) # updated with delta2
    .queryName("silver_stream")  # Name the stream
    .trigger(processingTime='1 minute')  # Trigger the processing every minute
    .start()
)


## 6.0 Transform the Silver Data to Gold Data using a stream
- setup a read stream on your silver delta table
- setup a write stream to append to the gold delta table
- Use the defined GOLD_CHECKPOINT and GOLD_DELTA paths defined in the includes
- name your silver to gold stream as gold_stream
- transform the silver data to the gold data using the data definition at the top of the notebook
- Load the pretrained transformer sentiment classifier from the MODEL_NAME at the production level from the MLflow registry
- Use a spark UDF to parallelize the inference across your silver data

#### Gold Data - Silver Table Inference
- timestamp - the timestamp from the silver data table rows
- mention - the mention from the silver data table rows
- cleaned_text - the cleaned_text from the silver data table rows
- sentiment - the given sentiment from the silver data table rows
- predicted_score - score out of 100 from the Hugging Face Sentiment Transformer
- predicted_sentiment - string representation of the sentiment
- sentiment_id - 0 for negative and 1 for positive associated with the given sentiment
- predicted_sentiment_id - 0 for negative and 1 for positive associated with the Hugging Face Sentiment Transformer

In [0]:
# Load the model from the MLflow Registry as a Spark UDF
sentiment_model = mlflow.pyfunc.spark_udf(spark, model_uri = f"models:/{MODEL_NAME}/production")

In [0]:
# 1. Read from the Silver Delta Table / Checking silver_df shows well

silver_df = spark.readStream.format("delta").load(SILVER_DELTA_2)
# display(silver_df)

In [0]:
#### Test with the mock data

from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp
import random

# Start Spark session
spark = SparkSession.builder.appName("TestDeltaData").getOrCreate()

# Sample data
data = [
    {"date": "2024-05-10", "user": "user1", "text": "I love this product!", "sentiment": "pos", "source_file": "file1.json"},
    {"date": "2024-05-10", "user": "user2", "text": "Not what I expected.", "sentiment": "neg", "source_file": "file2.json"},
    {"date": "2024-05-10", "user": "user3", "text": "Pretty average, nothing special.", "sentiment": "neu", "source_file": "file3.json"},
    {"date": "2024-05-10", "user": "user4", "text": "Worst experience ever!", "sentiment": "neg", "source_file": "file4.json"},
    {"date": "2024-05-10", "user": "user5", "text": "This is amazing!", "sentiment": "pos", "source_file": "file5.json"}
]

# Create DataFrame
tschema = StructType([
    StructField("date", StringType(), True),
    StructField("user", StringType(), True),
    StructField("text", StringType(), True),
    StructField("sentiment", StringType(), True),
    StructField("source_file", StringType(), True)
])

df = spark.createDataFrame(data, schema=tschema)

# Add processing timestamp to mimic bronze layer processing
df = df.withColumn("processing_time", current_timestamp())

# Define path for the test Delta table
test_silver_delta_path = "/tmp/test_silver_delta_table"

# Write the DataFrame to a Delta table
df.write.format("delta").mode("overwrite").save(test_silver_delta_path)



In [0]:
#### Test of the inference UDF on the mock data (this test ran successfully)

from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType, StringType, StructType, StructField
import mlflow.pyfunc

# Define the schema for the output of your UDF
schema = StructType([
    StructField("score", FloatType(), False),
    StructField("label", StringType(), False)
])

# Define the UDF function with the model loading inside it
def predict_sentiment(text):
    # Load the ML model inside the UDF function to avoid scope issues
    model = mlflow.pyfunc.load_model(model_uri=f"models:/{MODEL_NAME}/production")
    # Predict using the loaded model
    prediction = model.predict([text])
    # Assuming prediction structure where the model returns a DataFrame with 'score' and 'label'
    return (float(prediction['score'].iloc[0]), prediction['label'].iloc[0])

# Register the UDF with Spark, using the schema
predict_sentiment_udf = udf(predict_sentiment, schema)

# Usage in the query
test_query = (
    spark.readStream
    .format("delta")
    .load(test_silver_delta_path)
    .withColumn("prediction_results", predict_sentiment_udf(col("text")))
    .withColumn("predicted_score", col("prediction_results.score"))
    .withColumn("predicted_sentiment", col("prediction_results.label"))
    .writeStream
    .format("delta")
    .option("checkpointLocation", "/tmp/test_gold_checkpoint")
    .outputMode("append")
    .start("/tmp/test_gold_delta_table")
)


In [0]:
#### Transform test query to gold delta streaming
#### Read & Write stream have been merged

# Define the schema for the output of your UDF
schema = StructType([
    StructField("score", FloatType(), False),
    StructField("label", StringType(), False)
])

# Cache for the loaded model so it is reused across rows instead of being reloaded for every row
_sentiment_model = None

# Define the UDF function with the model loading inside it
def predict_sentiment(text):
    global _sentiment_model
    # Load the ML model lazily inside the UDF function to avoid scope issues
    if _sentiment_model is None:
        _sentiment_model = mlflow.pyfunc.load_model(model_uri=f"models:/{MODEL_NAME}/production")
    # Predict using the loaded model
    prediction = _sentiment_model.predict([text])
    # Assuming prediction structure where the model returns a DataFrame with 'score' and 'label'
    return (float(prediction['score'].iloc[0]), prediction['label'].iloc[0])

# Register the UDF with Spark, using the schema
predict_sentiment_udf = udf(predict_sentiment, schema)



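from pyspark.sql.functions import lower

# Usage in the query
gold_query = (
    spark.readStream
    .format("delta")
    .load(SILVER_DELTA_2)
    .withColumn("prediction_results", predict_sentiment_udf(col("cleaned_text")))
    .withColumn("predicted_score", col("prediction_results.score"))
    .withColumn("predicted_sentiment", col("prediction_results.label"))
    # Binary ids used for the accuracy metrics in section 7.0: 1 for positive, 0 otherwise.
    # Assumes positive labels start with "pos" in any letter case (e.g. "positive", "POS").
    .withColumn("sentiment_id", when(lower(col("sentiment")).startswith("pos"), 1).otherwise(0))
    .withColumn("predicted_sentiment_id", when(lower(col("predicted_sentiment")).startswith("pos"), 1).otherwise(0))
    .writeStream
    .format("delta")
    .option("checkpointLocation", GOLD_CHECKPOINT_2)
    .outputMode("append")
    .queryName("gold_stream")
    .trigger(once=True) # Trigger adjusted within "1 minute", "continuous" or "once"
    .start(GOLD_DELTA_2)
)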


# gold_query.awaitTermination(300) 
# gold_query.stop()

In [0]:
# Check that the Delta tables and checkpoints have been created

dbutils.fs.ls("/tmp/labuser104917-3007337/")

In [0]:
# Optimizing checkpoint directories 
# RESULT : TRUE

# dbutils.fs.rm(USER_DIR + "/gold2.delta", True)
# dbutils.fs.rm(USER_DIR + "/gold2.checkpoint", True)

In [0]:
# # Create table for Gold Delta

# %sql
# DROP TABLE IF EXISTS gold;
# CREATE TABLE gold (
# date TIMESTAMP,
# potential_mention STRING,
# sentiment STRING,
# cleaned_text STRING,
# user STRING,
# source_file_name STRING,
# processing_time_stamp TIMESTAMP
# )
# USING delta
# LOCATION '/tmp/labuser104917-3007337/gold2.delta'

In [0]:
gold_data = spark.read.format("delta").load(GOLD_DELTA_2)
display(gold_data)

## 7.0 Capture the accuracy metrics from the gold table in MLflow
Store the following in an MLflow experiment run:
- Store the precision, recall, and F1-score as MLflow metrics
- Store an image of the confusion matrix as an MLflow artifact
- Store the model name and the MLflow version that was used as MLflow parameters
- Store the version of the Delta Table (input-silver) as an MLflow parameter
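For reference, the three metrics can be computed by hand from the counts of true positives, false positives, and false negatives. The sketch below does this in plain Python on a small made-up example; it is meant only to make the definitions concrete, and the function name `binary_metrics` is hypothetical. In the notebook itself, the scikit-learn functions can be used directly.

```python
def binary_metrics(actual, predicted, positive=1):
    """Compute precision, recall, and F1 for the given positive class."""
    pairs = list(zip(actual, predicted))
    tp = sum(1 for a, p in pairs if a == positive and p == positive)
    fp = sum(1 for a, p in pairs if a != positive and p == positive)
    fn = sum(1 for a, p in pairs if a == positive and p != positive)
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    f1 = (2 * precision * recall / (precision + recall)) if (precision + recall) else 0.0
    return {"precision": precision, "recall": recall, "f1": f1}

# Illustrative ids only: 1 = positive, 0 = negative.
example_actual = [1, 1, 0, 0, 1]
example_predicted = [1, 0, 0, 1, 1]
metrics = binary_metrics(example_actual, example_predicted)
print(metrics)
```

Here there are two true positives, one false positive, and one false negative, so precision, recall, and F1 all come out to 2/3.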

In [0]:
# Extract actual and predicted sentiment IDs in a single pass so the two series stay row-aligned
sentiment_ids = gold_data.select('sentiment_id', 'predicted_sentiment_id').toPandas()
actual_sentiments = sentiment_ids['sentiment_id']
predicted_sentiments = sentiment_ids['predicted_sentiment_id']

# Compute the confusion matrix
confusion_mtx = confusion_matrix(actual_sentiments, predicted_sentiments)

# Create and configure the heatmap for the confusion matrix
plt.figure(figsize=(5, 5)) 
sns.heatmap(confusion_mtx, annot=True, fmt="d", cmap="Blues")
plt.xlabel("Predicted Sentiment IDs")
plt.ylabel("Actual Sentiment IDs")
plt.title("Sentiment Analysis Confusion Matrix")
plt.savefig("updated_confusion_matrix.png")

plt.show()
plt.close()

In [0]:
# Calculate performance metrics
recall_score_value = recall_score(actual_sentiments, predicted_sentiments)
precision_score_value = precision_score(actual_sentiments, predicted_sentiments)
f1_score_value = f1_score(actual_sentiments, predicted_sentiments)

# Retrieve the latest version of the silver Delta table from its transaction history
latest_silver_version = (
    DeltaTable.forPath(spark, SILVER_DELTA_2).history(1).select("version").first()["version"]
)

# Start an MLflow run to log parameters, metrics, and artifacts
with mlflow.start_run():
    mlflow.log_param("MLflow version used", mlflow.__version__)
    mlflow.log_param("Input Silver Table Version", latest_silver_version)
    mlflow.log_param("Name of the Sentiment Model", HF_MODEL_NAME)
    mlflow.log_metric("Precision Score", precision_score_value)
    mlflow.log_metric("Recall Score", recall_score_value)
    mlflow.log_metric("F1 Score", f1_score_value)
    mlflow.log_artifact("updated_confusion_matrix.png")

    

## 8.0 Application Data Processing and Visualization
- How many mentions are there in the gold data total?
- Count the number of neutral, positive and negative tweets for each mention in new columns
- Capture the total for each mention in a new column
- Sort the mention count totals in descending order
- Plot a bar chart of the top 20 mentions with positive sentiment (the people who are in favor)
- Plot a bar chart of the top 20 mentions with negative sentiment (the people who are the villains)

You may want to use the "Loop Application" widget to control whether you repeatedly display the latest plots while the data comes in from your streams before moving on to the next section and cleaning up your run.

*note: A mention is a specific twitter user that has been "mentioned" in a tweet with an @user reference.

#### Gold Data - Silver Table Inference
- timestamp - the timestamp from the silver data table rows
- mention - the mention from the silver data table rows
- cleaned_text - the cleaned_text from the silver data table rows
- sentiment - the given sentiment from the silver data table rows
- predicted_score - score out of 100 from the Hugging Face Sentiment Transformer
- predicted_sentiment - string representation of the sentiment
- sentiment_id - 0 for negative and 1 for positive associated with the given sentiment
- predicted_sentiment_id - 0 for negative and 1 for positive associated with the Hugging Face Sentiment Transformer


#### Application Data - Gold Table Aggregation
- min_timestamp - the oldest timestamp on a given mention (@username)
- max_timestamp - the newest timestamp on a given mention (@username)
- mention - the user (@username) that this row pertains to.
- negative - total negative tweets directed at this mention (@username)
- neutral - total neutral tweets directed at this mention (@username)
- positive - total positive tweets directed at this mention (@username)


In [0]:
# Filtering rows that contain mentions
filtered_mentions = gold_data.filter(gold_data["mentioned_users"] != "")

# Transforming mentions into an array of words
mentions_array = filtered_mentions.select(split(col("mentioned_users"), ",").alias("mentioned_users"))

# Counting all occurrences of each mention
total_mention_count = mentions_array.select(explode(col("mentioned_users")).alias("mentioned_users")).count()

# Output the total count of mentions
print("Total Count of All Mentions (Including Repeats):", total_mention_count)

# Aggregate data by mentions to count sentiment classifications
aggregated_mentions = (gold_data
                       .filter(gold_data["mentioned_users"] != "")
                       .groupBy('mentioned_users')
                       .agg(
                           sum(when(col('predicted_sentiment') == 'POS', 1).otherwise(0)).alias('count_positive'),
                           sum(when(col('predicted_sentiment') == 'NEG', 1).otherwise(0)).alias('count_negative'),
                           sum(when(col('predicted_sentiment') == 'NEU', 1).otherwise(0)).alias('count_neutral'),
                           count("*").alias("total_mentions"),
                           min('timestamp').alias('earliest_mention'), 
                           max('timestamp').alias('latest_mention')
                        )
                       .orderBy(desc('total_mentions'))
                      )


In [0]:
# Display the aggregated DataFrame
display(aggregated_mentions)


In [0]:

# Selecting and visualizing the top 20 positive mentions
top_positive_mentions = (aggregated_mentions
                         .orderBy(desc("count_positive"))
                         .limit(20)
                         )



In [0]:
# Convert Spark DataFrame to Pandas for plotting
pandas_positive_mentions = top_positive_mentions.toPandas()

# Create a bar chart for positive mentions
positive_mentions_fig = px.bar(pandas_positive_mentions, x='mentioned_users', y='count_positive', title='Top 20 Positive Mentions')
positive_mentions_fig.show()


In [0]:
# Selecting and visualizing the top 20 negative mentions
top_negative_mentions = (aggregated_mentions
                         .orderBy(desc("count_negative"))
                         .limit(20)
                         )


In [0]:
# Convert Spark DataFrame to Pandas for plotting
pandas_negative_mentions = top_negative_mentions.toPandas()

# Create a bar chart for negative mentions
negative_mentions_fig = px.bar(pandas_negative_mentions, x='mentioned_users', y='count_negative', title='Top 20 Negative Mentions')
negative_mentions_fig.show()

## 9.0 Clean up and completion of your pipeline
- using the utilities what streams are running? If any.
- Stop all active streams
- print out the elapsed time of your notebook.

In [0]:
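# Get the notebook's ending time. Note: START_TIME was established in the include file when the notebook started.
stop_all_streams()  # Stop any streams that are still active
END_TIME = time.time()
print(f"Elapsed time: {END_TIME - START_TIME:.1f} seconds")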

## 10.0 How Optimized is your Spark Application (Grad Students Only)
Graduate students (registered for the DSCC-402 section of the course) are required to do this section.  This is a written analysis using the Spark UI (link to screenshots) that supports your analysis of your pipeline's execution and what is driving its performance.
Recall that Spark Optimization has 5 significant dimensions of consideration:
- Spill: write to executor disk due to lack of memory
- Skew: imbalance in partition size
- Shuffle: network io moving data between executors (wide transforms)
- Storage: inefficiency due to disk storage format (small files, location)
- Serialization: distribution of code segments across the cluster

Comment on each of the dimensions of performance and how your implementation is or is not being affected.  Use specific information in the Spark UI to support your description.  

Note: you can take screenshots of the Spark UI from your project runs in Databricks and then link to those pictures by storing them as a publicly accessible file on your cloud drive (Google Drive, OneDrive, etc.)
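When commenting on skew, it can help to turn the partition or task sizes read off a stage in the Spark UI into a single number. The sketch below computes the ratio of the largest size to the median size; this ratio is only a rough heuristic chosen for illustration, not a Spark-defined metric, and both the function name `skew_ratio` and the sample sizes are made up.

```python
from statistics import median

def skew_ratio(partition_sizes):
    """Ratio of the largest partition to the median partition.

    A value near 1.0 suggests balanced partitions; a much larger value
    suggests that one partition dominates the stage.
    """
    sizes = [size for size in partition_sizes if size > 0]
    if not sizes:
        return 0.0
    return max(sizes) / median(sizes)

balanced = skew_ratio([100, 100, 100, 100])
skewed = skew_ratio([100, 110, 90, 105, 900])
print(f"balanced: {balanced:.2f}, skewed: {skewed:.2f}")
```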

References:
- [Spark UI Reference](https://spark.apache.org/docs/latest/web-ui.html#web-ui)
- [Spark UI Simulator](https://www.databricks.training/spark-ui-simulator/index.html)

## Optimization Logs of Spark Application

I've found that running the code to create the Gold Delta table produces constant errors. At first, I assumed I needed more time to make a stream, but looking deeper into the log, I realized that the executors might need adjustment.

df = spark.read.format("image").load("/Workspace/Repos/labuser104917-3007337@vocareum.com/dscc402/final_project/Assets/error_no_space.png")



Therefore, I have been exploring several ways to optimize the process, and a collaborative effort may still be needed to reach a solution.
<br>

### 1. Manage Executor Logs
I've looked for tasks that significantly **spill** data to disk. This typically happens when memory cannot hold all the data required for operations like joins or aggregations.



- Cluster Management: With more nodes, cluster management becomes more complex. Issues such as network partitions or synchronization become more prominent. 






### 3. Minor Optimizations
- Checkpoint Directory Maintenance: Ensure the checkpoint directory is not accumulating excessive metadata or checkpoint files that consume disk space. Because the stream was not starting, I removed the checkpoint data and cleaned out the old checkpoints.
- Managing Temporary Files: Unnecessary data and logs that consume disk space, including temporary files, caches, and old application logs, should be cleared. However, my attempt returned FALSE, indicating that the cleanup was not fully successful.
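Before and after a cleanup like the ones above, it may be useful to check how much local disk space is actually free. The sketch below uses only the Python standard library and reports free space for the temporary directory of whichever machine runs it; in a notebook that is the driver, so it says nothing about executor disks or DBFS storage. The function name `free_space_gib` is hypothetical.

```python
import shutil
import tempfile

def free_space_gib(path=None):
    """Return the free space, in GiB, on the filesystem that holds `path`.

    Defaults to the local temporary directory of the machine running this code.
    """
    target = path or tempfile.gettempdir()
    usage = shutil.disk_usage(target)
    return usage.free / (1024 ** 3)

print(f"Free space under {tempfile.gettempdir()}: {free_space_gib():.1f} GiB")
```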

In [0]:
# Optimizing checkpoint directories 
# RESULT: TRUE

# dbutils.fs.rm(USER_DIR + "/gold2.delta", True)
# dbutils.fs.rm(USER_DIR + "/gold2.checkpoint", True)

In [0]:
# Try to remove temporary files to free up disk space
# RESULT: FALSE 

# dbutils.fs.rm('/path/to/temp/files', recurse=True)