## DSCC202-402 Data Science at Scale Final Project
### Tracking Tweet sentiment at scale using a pretrained transformer (classifier)
<p>Consider the following illustration of the end-to-end system that you will be building.  Each student should do their own work.  The project will demonstrate your understanding of Spark Streaming, the medallion data architecture using Delta Lake, Spark inference at scale using an MLflow-packaged model, as well as Exploratory Data Analysis and System Tracking and Monitoring.</p>
<br><br>
<img src="https://data-science-at-scale.s3.amazonaws.com/images/pipeline.drawio.png">

<p>
You will be pulling an updated copy of the course GitHub repository: <a href="https://github.com/lpalum/dscc202-402-spring2024">The Repo</a>.  If you are unclear on how to pull an updated copy using the GitHub command line, the following <a href="https://techwritingmatters.com/how-to-update-your-forked-repository-on-github">document</a> is helpful.  Be sure to add the professors and TAs as collaborators on your project.

- lpalum@gmail.com GitHub ID: lpalum
- ajay.anand@rochester.edu GitHub ID: ajayan12
- divyamunot1999@gmail.com GitHub ID: divyamunot
- ylong6@u.Rochester.edu GitHub ID: NinaLong2077

Once you have updated your fork of the repository, you should see the following template project in the final_project directory.
</p>

<img src="https://data-science-at-scale.s3.amazonaws.com/images/notebooks.drawio.png">

<p>
You can then pull your project into the Databricks Workspace using the <a href="https://www.databricks.training/step-by-step/importing-courseware-from-github/index.html">Repos</a> feature.
Each student is expected to submit the URL of their project on GitHub with their code checked in on the main/master branch.  This illustration highlights the branching scheme that you may use to work on your code in steps and then merge your submission into your master branch before submitting.
</p>
<img src="https://data-science-at-scale.s3.amazonaws.com/images/github.drawio.png">
<p>
Work your way through this notebook, which gives you the steps required to submit a complete and compliant project.  The following illustration and associated data dictionary specify the transformations and data that you are to generate for each step in the medallion pipeline.
</p>
<br><br>
<img src="https://data-science-at-scale.s3.amazonaws.com/images/dataframes.drawio.png">

#### Bronze Data - raw ingest
- date - string in the source json
- user - string in the source json
- text - tweet string in the source json
- sentiment - the given sentiment of the text as determined by an unknown model that is provided in the source json
- source_file - the path of the source json file that this row of data was read from
- processing_time - a timestamp of when you read this row from the source json

#### Silver Data - Bronze Preprocessing
- timestamp - convert date string in the bronze data to a timestamp
- mention - every @username mentioned in the text string in the bronze data gets a row in this silver data table.
- cleaned_text - the bronze text data with the mentions (@username) removed.
- sentiment - the given sentiment that was associated with the text in the bronze table.
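To make the one-row-per-mention rule concrete, here is a minimal pure-Python sketch (not Spark code) of how a single bronze row could fan out into silver rows. The helper name `to_silver_rows` and the pattern `@[A-Za-z0-9_]+` are illustrative assumptions, and the timestamp column is left out for brevity. The data dictionary does not say what happens to tweets without a mention, so this sketch simply yields no rows for them.

```python
import re

# Assumed mention pattern: "@" followed by letters, digits, or underscores.
MENTION = re.compile(r"@([A-Za-z0-9_]+)")

def to_silver_rows(text, sentiment):
    """Return one (mention, cleaned_text, sentiment) tuple per @username in text."""
    # Remove every mention, then collapse the leftover whitespace.
    cleaned = " ".join(MENTION.sub("", text).split())
    return [(m, cleaned, sentiment) for m in MENTION.findall(text)]

rows = to_silver_rows("@alice thanks! cc @bob_42", "positive")
for row in rows:
    print(row)
```

A tweet that mentions two users therefore contributes two silver rows that share the same cleaned_text and sentiment.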

#### Gold Data - Silver Table Inference
- timestamp - the timestamp from the silver data table rows
- mention - the mention from the silver data table rows
- cleaned_text - the cleaned_text from the silver data table rows
- sentiment - the given sentiment from the silver data table rows
- predicted_score - score out of 100 from the Hugging Face Sentiment Transformer
- predicted_sentiment - string representation of the sentiment
- sentiment_id - 0 for negative and 1 for positive associated with the given sentiment
- predicted_sentiment_id - 0 for negative and 1 for positive associated with the Hugging Face Sentiment Transformer
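As a rough illustration of how the derived gold columns relate to the model output, the following pure-Python sketch maps one silver row and one prediction to the gold fields. It assumes the model returns a label such as `"NEG"` or `"POS"` and a probability in [0, 1]; both are assumptions about the model output, and `to_gold_fields` is a hypothetical helper name. Treating every non-negative label as 1 is also a choice made for this sketch.

```python
def to_gold_fields(sentiment, label, score):
    """Derive the gold-table columns from one silver row and one model output."""
    return {
        "sentiment": sentiment,
        "predicted_sentiment": label,
        # Assumed: the model reports a probability in [0, 1]; scale it to 0-100.
        "predicted_score": round(score * 100, 2),
        "sentiment_id": 0 if sentiment == "negative" else 1,
        "predicted_sentiment_id": 0 if label == "NEG" else 1,
    }

print(to_gold_fields("negative", "NEG", 0.9731))
```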

#### Application Data - Gold Table Aggregation
- min_timestamp - the oldest timestamp on a given mention (@username)
- max_timestamp - the newest timestamp on a given mention (@username)
- mention - the user (@username) that this row pertains to.
- negative - total negative tweets directed at this mention (@username)
- neutral - total neutral tweets directed at this mention (@username)
- positive - total positive tweets directed at this mention (@username)
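The aggregation described above can be sketched in plain Python to show the expected shape of the application table. The sample rows are made up for illustration, and `aggregate_mentions` is a hypothetical helper; in the notebook the same result would come from a Spark group-by on the gold table.

```python
from collections import defaultdict

def aggregate_mentions(gold_rows):
    """Fold (timestamp, mention, predicted_sentiment) rows into per-mention totals."""
    totals = defaultdict(lambda: {"min_timestamp": None, "max_timestamp": None,
                                  "negative": 0, "neutral": 0, "positive": 0})
    for ts, mention, sentiment in gold_rows:
        row = totals[mention]
        row["min_timestamp"] = ts if row["min_timestamp"] is None else min(row["min_timestamp"], ts)
        row["max_timestamp"] = ts if row["max_timestamp"] is None else max(row["max_timestamp"], ts)
        row[sentiment] += 1
    return dict(totals)

# Made-up sample rows; ISO date strings compare correctly as text.
rows = [
    ("2009-04-06", "alice", "positive"),
    ("2009-04-08", "alice", "negative"),
    ("2009-04-07", "bob", "neutral"),
]
print(aggregate_mentions(rows))
```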

When you are designing your approach, one of the main decisions that you will need to make is how you are going to orchestrate the streaming data processing in your pipeline.  There are several valid approaches.  First, you may choose to start the bronze_stream and let it complete (read and append all of the source data) before proceeding and starting up the silver_stream.  This approach has latency associated with it, but it will allow your code to proceed in a linear fashion and process all the data by the end of your notebook execution.  Another potential approach is to start all the streams and have a "watch" method to determine when the pipeline has processed sufficient or all of the source data before stopping and displaying results.  Both of these approaches are valid and have different implications for how you will trigger your streams and how you will gate the execution of your pipeline.  Think through how you want to proceed and ask questions if you need guidance. The following references may be helpful:
- [Spark Structured Streaming Programming Guide](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html)
- [Databricks Autoloader - Cloudfiles](https://docs.databricks.com/en/ingestion/auto-loader/index.html)
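One possible shape for the "watch" approach is a small polling helper with a timeout, sketched below. The names `wait_until` and `fake_pipeline_done` are hypothetical; in a real pipeline the condition would be something like a check of a Delta table's row count against an expected total. For the linear approach, a stream written with `.trigger(availableNow=True)` and followed by `awaitTermination()` is another option on recent Spark versions, since it processes the data available at start and then stops.

```python
import time

def wait_until(condition, timeout_s=3600.0, poll_s=30.0, sleep=time.sleep):
    """Poll condition() until it returns True or timeout_s elapses.

    Returns True if the condition was met, False on timeout.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        if condition():
            return True
        if time.monotonic() >= deadline:
            return False
        sleep(poll_s)

# Example with a stand-in condition that succeeds on the third poll.
polls = {"n": 0}

def fake_pipeline_done():
    polls["n"] += 1
    return polls["n"] >= 3

print(wait_until(fake_pipeline_done, timeout_s=5, poll_s=0.01))
```

The timeout keeps a stalled stream from blocking *Run all* indefinitely, which an unbounded `while` loop cannot guarantee.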

### Be sure that your project runs end to end when *Run all* is executed on this notebook! (15 Points out of 60)

In [0]:
%run ./includes/includes

In [0]:
"""
Add widgets to the notebook that control clearing a previous run
or stopping the active streams, using routines defined in the utilities notebook.
"""
dbutils.widgets.removeAll()

dbutils.widgets.dropdown("clear_previous_run", "No", ["No","Yes"])
if (getArgument("clear_previous_run") == "Yes"):
    clear_previous_run()
    print("Cleared all previous data.")

dbutils.widgets.dropdown("stop_streams", "No", ["No","Yes"])
if (getArgument("stop_streams") == "Yes"):
    stop_all_streams()
    print("Stopped all active streams.")

from delta import *
dbutils.widgets.dropdown("optimize_tables", "No", ["No","Yes"])
if (getArgument("optimize_tables") == "Yes"):
    # Compact the small files that the streaming appends have accumulated in each table.
    DeltaTable.forPath(spark, BRONZE_DELTA).optimize().executeCompaction()
    DeltaTable.forPath(spark, SILVER_DELTA).optimize().executeCompaction()
    DeltaTable.forPath(spark, GOLD_DELTA).optimize().executeCompaction()
    print("Optimized all of the Delta Tables")

## 1.0 Import your libraries here...
- Are your shuffle partitions consistent with your cluster and your workload?
- Do you have the necessary libraries to perform the required operations in the pipeline/application?
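On the shuffle-partition question: Spark's default for `spark.sql.shuffle.partitions` is 200, which is often more than a small cluster needs. A common rule of thumb, used here as an assumption rather than a fixed rule, is a small multiple of the available cores. The helper name `suggest_shuffle_partitions` is hypothetical.

```python
def suggest_shuffle_partitions(total_cores, factor=2, minimum=1):
    """Rule-of-thumb partition count: a small multiple of the available cores."""
    return max(minimum, total_cores * factor)

print(suggest_shuffle_partitions(8))

# In the notebook this could be applied as (spark is the active SparkSession):
# cores = spark.sparkContext.defaultParallelism
# spark.conf.set("spark.sql.shuffle.partitions", suggest_shuffle_partitions(cores))
```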

In [0]:
from datetime import datetime as dt
from datetime import timedelta
import json
from pyspark.sql.types import StructType, StructField, StringType, TimestampType,  IntegerType
from pyspark.sql.functions import input_file_name, current_timestamp
import time
from pyspark.sql.functions import col, regexp_extract, regexp_replace,lit, count, coalesce, to_timestamp, pandas_udf, PandasUDFType, date_format
import mlflow.pyfunc
import pandas as pd
import mlflow
import mlflow.sklearn
from sklearn.metrics import precision_score, recall_score, f1_score, confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns
from pyspark.sql import functions as F
from pyspark.sql.functions import when

log4j = spark._jvm.org.apache.log4j
log4j.LogManager.getLogger("org.apache.spark.api.python.PythonGatewayServer").setLevel(log4j.Level.ERROR)
log4j.LogManager.getLogger("py4j").setLevel(log4j.Level.ERROR)


## 2.0 Use the utility functions to ...
- Read the source file directory listing
- Count the source files (how many are there?)
- print the contents of one of the files

In [0]:
# cached_entries = dbutils.fs.ls(TWEET_SOURCE_PATH)
# # Read the source file directory listing
# display(cached_entries[:10])
# total_entries = len(cached_entries)
# 
# print(f"Total entries (files and directories): {total_entries}")
# # print the contents of one of the files
# bdf=spark.read.format('json').option("inferSchema","True").load(TWEET_SOURCE_PATH+'1.json')
# display(bdf)
# #print schema
# bdf.printSchema()

#### * Read the source file directory listing
#### * Count the source files (how many are there?)

In [0]:
metadata_table_path = "/tmp/labuser104917-3025809/metadataraw.delta"

# Define the schema
schema = StructType([
    StructField("total_files", IntegerType(), True)
])

# Check if the metadata table exists and create it if not
if not spark.catalog.tableExists("metadata"):
    empty_df = spark.createDataFrame([], schema)
    empty_df.write.format("delta").option("path", metadata_table_path).saveAsTable("metadata")

def update_metadata_table(source_path, table_path):
    entries = dbutils.fs.ls(source_path)
    total_entries = len(entries)

    # Create a DataFrame with the correct schema. Note the [(total_entries,)] which ensures it's a list of tuples.
    new_data = spark.createDataFrame([(total_entries,)], schema)
    
    # Overwrite the existing metadata table with the new data
    new_data.write.format("delta").mode("overwrite").option("path", table_path).saveAsTable("metadata")

# Update the metadata table
#update_metadata_table(TWEET_SOURCE_PATH, metadata_table_path)


In [0]:

# Read and display the metadata
metadata_df = spark.read.format("delta").load(metadata_table_path)
total_entries = metadata_df.select("total_files").collect()[0][0] - 1

# Print total_entries
print(f"****************************The total number of entries is: {total_entries}************************************")

#### * print the contents of one of the files

In [0]:
# print the contents of one of the files
bdf=spark.read.format('json').option("inferSchema","True").load(TWEET_SOURCE_PATH+'1.json')
display(bdf)
#print schema
bdf.printSchema()

## 3.0 Transform the Raw Data to Bronze Data using a stream
- define the schema for the raw data
- setup a read stream using cloudfiles and the source data format
- setup a write stream using cloudfiles to append to the bronze delta table
- enforce schema
- allow a new schema to be merged into the bronze delta table
- Use the defined BRONZE_CHECKPOINT and BRONZE_DELTA paths defined in the includes
- name your raw to bronze stream as bronze_stream
- transform the raw data to the bronze data using the data definition at the top of the notebook

In [0]:
# dbutils.fs.rm(BRONZE_DELTA, recurse=True)
# dbutils.fs.rm(BRONZE_CHECKPOINT, recurse=True)
# dbutils.fs.rm(SILVER_DELTA, recurse=True)
# dbutils.fs.rm(SILVER_CHECKPOINT, recurse=True)
# dbutils.fs.rm(GOLD_DELTA, recurse=True)
# dbutils.fs.rm(GOLD_CHECKPOINT, recurse=True)


In [0]:
schema = StructType([ # schema definition
    StructField("date", StringType(), True),
    StructField("sentiment", StringType(), True),
    StructField("text", StringType(), True),
    StructField("user", StringType(), True)
])

In [0]:
# Read the JSON data using the defined schema
bronze_raw_df = (
    spark
    .readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("maxFilesPerTrigger", 5)
    .schema(schema)
    .load(TWEET_SOURCE_PATH)
)

## Bronze Data - raw ingest<br>
date - string in the source json<br>
user - string in the source json<br>
text - tweet string in the source json<br>
sentiment - the given sentiment of the text as determined by an unknown model that is provided in the source json<br>
source_file - the path of the source json file that this row of data was read from<br>
processing_time - a timestamp of when you read this row from the source json<br>


In [0]:
# Adding source_file and processing_time + columns in raw data
bronze_df = bronze_raw_df.withColumn("source_file", input_file_name()) \
                     .withColumn("processing_time", current_timestamp())


In [0]:
# Bronze stream - write
bronze_stream = (
    bronze_df
    .writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", BRONZE_CHECKPOINT)
    .option("mergeSchema", "true")
    .queryName("bronze_stream")
    .start(BRONZE_DELTA)
)


The next functions check whether a stream has initialized its Delta table and report how many rows the table currently holds. They are reused for all three streams.

In [0]:
def is_delta_table_initialized(table_path):
    try:
        # Attempt to load the Delta table
        spark.read.format("delta").load(table_path)
        return True
    except Exception as e:
        print(f"Delta table not available yet: {e}")
        return False

# Function to get the current row count of the Delta table
def get_row_count(table_path):
    try:
        return spark.read.format("delta").load(table_path).count()
    except Exception:
        return 0  # Return 0 if unable to read the table

Loop to monitor the stream and stop it based on row count

In [0]:
# Loop to wait for the Delta table to be initialized
while not is_delta_table_initialized(BRONZE_DELTA):
    print("Waiting for Delta table to be initialized...")
    time.sleep(5)  # Check every 5 seconds


max_rows = total_entries
while bronze_stream.isActive:
    current_row_count = get_row_count(BRONZE_DELTA)
    print(current_row_count)
    if current_row_count < max_rows:
        time.sleep(300)  # Sleep for 5 minutes before checking again
        # Re-read the count after sleeping; otherwise the check below reuses the stale value
        current_row_count = get_row_count(BRONZE_DELTA)
        if current_row_count >= max_rows:
            bronze_stream.stop()
            print(f"Stream stopped after reaching {current_row_count} rows.")
        else:
            print(f"Just {current_row_count} rows.")
    else:
        bronze_stream.stop()
        print("You have the rows there already!")
        break


In [0]:
bronze_stream.awaitTermination()
display(dbutils.fs.ls(BRONZE_DELTA))

In [0]:
df = spark.read.format('delta').load(BRONZE_DELTA) # reading bronze delta table in different ways
display(df)

In [0]:
%sql
SELECT * FROM bronze_delta LIMIT 500;


## 4.0 Bronze Data Exploratory Data Analysis
- How many tweets are captured in your Bronze Table?
- Are there any columns that contain NaN or Null values?  If so, how many, and what will you do in your silver transforms to address this?
- Count the number of tweets by each unique user handle and sort the data by descending count.
- How many tweets have at least one mention (@), and how many tweets have no mentions (@)?
- Plot a bar chart that shows the top 20 tweeters (users)


In [0]:
# How many tweets are captured in your Bronze Table?
total_tweets = spark.sql("SELECT count(*) as total_count from bronze_delta").collect()[0]["total_count"]
print("Total number of tweets in Bronze Table:", total_tweets)

In [0]:
# Are there any columns that contain NaN or Null values? If so, how many, and what will you do in your silver transforms to address this?

result = spark.sql("""
  SELECT
    SUM(CASE WHEN `date` IS NULL THEN 1 ELSE 0 END) AS null_count_date,
    SUM(CASE WHEN sentiment IS NULL THEN 1 ELSE 0 END) AS null_count_sentiment,
    SUM(CASE WHEN text IS NULL THEN 1 ELSE 0 END) AS null_count_text,
    SUM(CASE WHEN user IS NULL THEN 1 ELSE 0 END) AS null_count_user,
    SUM(CASE WHEN source_file IS NULL THEN 1 ELSE 0 END) AS null_count_source_file,
    SUM(CASE WHEN processing_time IS NULL THEN 1 ELSE 0 END) AS null_count_processing_time
  FROM
    bronze_delta
""")

result.show()



The query shows no null values in any column, so the data is clean in this respect.
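The null-count query repeats the same expression for each column, so it can also be generated from a column list. This is a small sketch, with `null_count_sql` as a hypothetical helper name. Note that it only checks for NULL; the bronze columns are strings and timestamps, for which NaN does not apply.

```python
def null_count_sql(table, columns):
    """Build a query that counts NULLs per column of the given table."""
    counts = ",\n    ".join(
        f"SUM(CASE WHEN `{c}` IS NULL THEN 1 ELSE 0 END) AS null_count_{c}"
        for c in columns
    )
    return f"SELECT\n    {counts}\nFROM {table}"

query = null_count_sql("bronze_delta", ["date", "user"])
print(query)
# In the notebook: spark.sql(query).show()
```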

In [0]:
# Count the number of tweets by each unique user handle and sort the data by descending count.
users_tweets = spark.sql("""
  SELECT user, COUNT(*) AS tweet_count
  FROM bronze_delta
  GROUP BY user
  ORDER BY tweet_count DESC
""")

users_tweets.show()

In [0]:
# How many tweets have at least one mention (@), and how many tweets have no mentions (@)?
result = spark.sql("""
  SELECT 
    SUM(CASE WHEN text LIKE '%@%' THEN 1 ELSE 0 END) AS tweets_with_mention,
    SUM(CASE WHEN text NOT LIKE '%@%' THEN 1 ELSE 0 END) AS tweets_without_mention
  FROM bronze_delta
""")

result.show()

In [0]:
# Plot a bar chart that shows the top 20 tweeters (users)

# Get the top 20 users
top_users = users_tweets.limit(20).toPandas()

# Create a bar chart
plt.bar(top_users['user'], top_users['tweet_count'])
plt.xlabel('User')
plt.ylabel('Tweet Count')
plt.title('Top 20 Tweeters')

# Rotate x-axis labels for better readability (optional)
plt.xticks(rotation=90)

# Display the chart
plt.show()

## 5.0 Transform the Bronze Data to Silver Data using a stream
- setup a read stream on your bronze delta table
- setup a write stream to append to the silver delta table
- Use the defined SILVER_CHECKPOINT and SILVER_DELTA paths in the includes
- name your bronze to silver stream as silver_stream
- transform the bronze data to the silver data using the data definition at the top of the notebook

In [0]:
bronze_stream_df = (
    spark
    .readStream
    .format("delta")
    .option("maxFilesPerTrigger", 5)
    .load(BRONZE_DELTA)
)


### Silver Data - Bronze Preprocessing<br>
timestamp - convert date string in the bronze data to a timestamp<br>
mention - every @username mentioned in the text string in the bronze data gets a row in this silver data table.<br>
cleaned_text - the bronze text data with the mentions (@username) removed.<br>
sentiment - the given sentiment that was associated with the text in the bronze table.<br>


In [0]:
# In case any null values appear, compute the mode of each column to use for imputation
mode_date_df = spark.sql("""
    SELECT date
    FROM bronze_delta
    GROUP BY date
    ORDER BY COUNT(*) DESC
    LIMIT 1
""")

mode_date = mode_date_df.first()["date"]

mode_date_str = mode_date
print("Mode Date:", mode_date_str)


mode_user_df = spark.sql("""
    SELECT user
    FROM bronze_delta
    GROUP BY user
    ORDER BY COUNT(*) DESC
    LIMIT 1
""")

mode_user = mode_user_df.first()["user"]

mode_user_str = mode_user
print("Mode user:", mode_user_str)

mode_sentiment_df = spark.sql("""
    SELECT sentiment
    FROM bronze_delta
    GROUP BY sentiment
    ORDER BY COUNT(*) DESC
    LIMIT 1
""")

mode_sentiment = mode_sentiment_df.first()["sentiment"]

mode_sentiment_str = mode_sentiment
print("Mode sentiment:", mode_sentiment_str)


In [0]:
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")
silver_df = bronze_stream_df.withColumn("date", coalesce(col("date"), lit(mode_date)))
original_format = "EEE MMM dd HH:mm:ss zzz yyyy"

silver_df = silver_df.withColumn("timestamp", to_timestamp(col("date"), original_format))
desired_format = "yyyy-MM-dd"
silver_df = silver_df.withColumn("timestamp", date_format(col("timestamp"), desired_format))

silver_df = silver_df.withColumn("mention", regexp_extract(col("text"), "@([A-Za-z0-9_]+)", 1))
silver_df = silver_df.withColumn("cleaned_text", regexp_replace(col("text"), "@[A-Za-z0-9_]+", ""))
silver_df = silver_df.withColumn("cleaned_text", regexp_replace(col("cleaned_text"), "��", ""))
silver_df = silver_df.withColumn("cleaned_text", regexp_replace(col("cleaned_text"), "&[a-zA-Z]+;", "")) 
silver_df = silver_df.withColumn("cleaned_text", regexp_replace(col("cleaned_text"), "\\p{C}", ""))
silver_df = silver_df.withColumn("sentiment", col("sentiment"))
silver_df = silver_df.select("timestamp", "mention", "cleaned_text", "sentiment")
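Note that `regexp_extract` returns only the first match, so a tweet that mentions several users yields a single mention, and a tweet with no mention yields an empty string. If one row per mention is wanted, a possible alternative is to collect all matches and explode them. The sketch below assumes a Spark DataFrame with a `text` column and Spark 3.1 or later for the SQL function `regexp_extract_all`; `explode_mentions` is a hypothetical helper and has not been run against this pipeline.

```python
MENTION_PATTERN = "@([A-Za-z0-9_]+)"  # same character class the cell above uses

def explode_mentions(df, text_col="text"):
    """Return df with one row per @username found in text_col.

    Assumes df is a Spark DataFrame and Spark >= 3.1 (for regexp_extract_all).
    """
    # Imported lazily so this sketch can be loaded without a Spark installation.
    from pyspark.sql import functions as F

    mentions = F.expr(f"regexp_extract_all({text_col}, '{MENTION_PATTERN}', 1)")
    # explode drops rows whose array is empty, i.e. tweets with no mention;
    # F.explode_outer would keep them with a null mention instead.
    return df.withColumn("mention", F.explode(mentions))
```

It would be applied before the final `select`, for example `explode_mentions(bronze_stream_df)`, in place of the `regexp_extract` step.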

I decided to repartition silver_df to the same number of partitions that the bronze Delta table had. For some reason silver_df arrived as one large partition, which caused a lot of trouble.

In [0]:

repartitioned_df = silver_df.repartition(6500)
silver_stream = (
    repartitioned_df
    .writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", SILVER_CHECKPOINT)
    .option("mergeSchema", "true")
    .trigger(processingTime='2 minutes')
    .queryName("silver_stream")
    .start(SILVER_DELTA)
)

In [0]:

# Loop to wait for the Delta table to be initialized
while not is_delta_table_initialized(SILVER_DELTA):
    print("Waiting for Delta table to be initialized...")
    time.sleep(5)  # Check every 5 seconds

# Loop to monitor the stream and stop it based on row count
max_rows = total_entries
while silver_stream.isActive:
    current_row_count = get_row_count(SILVER_DELTA)
    print(current_row_count)
    if current_row_count < max_rows:
        time.sleep(600)  # Sleep for 10 minutes before checking again
        current_row_count = get_row_count(SILVER_DELTA)
        if current_row_count >= max_rows:
            silver_stream.stop()
            print(f"Stream stopped after reaching {current_row_count} rows.")
    else:
        silver_stream.stop()
        print(f"You have the rows there already!!.")
        break



In [0]:
silver_stream.awaitTermination()
display(dbutils.fs.ls(SILVER_DELTA))

In [0]:
df = spark.read.format('delta').load(SILVER_DELTA)
display(df)

## 6.0 Transform the Silver Data to Gold Data using a stream
- setup a read stream on your silver delta table
- setup a write stream to append to the gold delta table
- Use the defined GOLD_CHECKPOINT and GOLD_DELTA paths defined in the includes
- name your silver to gold stream as gold_stream
- transform the silver data to the gold data using the data definition at the top of the notebook
- Load the pretrained transformer sentiment classifier from the MODEL_NAME at the production level from the MLflow registry
- Use a spark UDF to parallelize the inference across your silver data

I experimented with adjusting the `maxFilesPerTrigger` parameter several times to determine the optimal timing for processing. Based on the partitioning of the gold stream, a setting of 500 appeared to be appropriate.
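The effect of `maxFilesPerTrigger` can be estimated with simple arithmetic: on a Delta source it caps how many new files each micro-batch reads, so the number of micro-batches is roughly the file count divided by the cap, rounded up. The file count of 6500 below is only an illustrative figure, and `estimate_batches` is a hypothetical helper.

```python
import math

def estimate_batches(total_files, max_files_per_trigger):
    """Number of micro-batches needed to read total_files at the given cap."""
    return math.ceil(total_files / max_files_per_trigger)

# Compare a small cap with a large one for the same illustrative file count.
for cap in (5, 500):
    print(cap, estimate_batches(6500, cap))
```

Fewer, larger micro-batches reduce per-batch scheduling overhead, at the cost of more work and memory per batch.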


In [0]:
silver_stream_df = (
    spark
    .readStream
    .format("delta")
    .option("maxFilesPerTrigger", 500)
    .load(SILVER_DELTA)
)


Loading the registered model as a Spark UDF to predict the sentiment and its probability.

In [0]:
model_uri=f"models:/{MODEL_NAME}/production"
predict_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)


### Gold Data - Silver Table Inference

timestamp - the timestamp from the silver data table rows<br>
mention - the mention from the silver data table rows<br>
cleaned_text - the cleaned_text from the silver data table rows<br>
sentiment - the given sentiment from the silver data table rows<br>
predicted_score - score out of 100 from the Hugging Face Sentiment Transformer<br>
predicted_sentiment - string representation of the sentiment<br>
sentiment_id - 0 for negative and 1 for positive associated with the given sentiment<br>
predicted_sentiment_id - 0 for negative and 1 for positive associated with the Hugging Face Sentiment Transformer


In [0]:
limited_df = silver_stream_df.limit(10)

Initially, I attempted to wrap the model in a custom Pandas UDF, but the process was notably slow. Consequently, I dropped the custom UDF and applied the MLflow `predict_udf` directly.


In [0]:
from pyspark.sql.functions import pandas_udf, PandasUDFType
import pandas as pd
from pyspark.sql.functions import col, struct
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


# @pandas_udf("predicted_sentiment string, predicted_score float", PandasUDFType.SCALAR)
# def classify_text_udf(texts: pd.Series) -> pd.DataFrame:
#     # Simulated model prediction (replace with actual model)
#     logger.info(f"Processing the following texts: {texts}")
#     texts = texts.tolist()
#     predictions = transformer_model.predict(texts)
#     # Ensure the conversion to string to match the expected schema
#     result_df = pd.DataFrame({
#         "predicted_sentiment": predictions['label'],
#         "predicted_score":  predictions['score']
#     })
#     return result_df



In [0]:
#Apply the UDF to the streaming DataFrame
#streaming_df = silver_stream_df.withColumn("predictions", classify_text_udf(col("cleaned_text")))

# using predict_udf directly
streaming_df = silver_stream_df.withColumn("predictions", predict_udf(col("cleaned_text")))
# Flatten the predictions into separate columns
streaming_df = streaming_df.withColumn("predicted_sentiment", col("predictions")['label'])
streaming_df = streaming_df.withColumn("predicted_score", col("predictions")['score'])
# You could perform additional transformations here as needed
streaming_df = streaming_df.withColumn(
    "sentiment_id", 
    when(col("sentiment") == 'negative', 0).otherwise(1)
).withColumn(
    "predicted_sentiment_id", 
    when(col("predicted_sentiment") == 'NEG', 0).otherwise(1)
)
 
# # Define the streaming query to output to a Delta Lake
gold_stream = (streaming_df
    .repartition(1000)
    .writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", GOLD_CHECKPOINT)  # Ensures fault tolerance
    .option("mergeSchema", "true")
    .queryName("gold_stream")
    .start(GOLD_DELTA)
)



In [0]:

# Loop to wait for the Delta table to be initialized
while not is_delta_table_initialized(GOLD_DELTA):
    print("Waiting for Delta table to be initialized...")
    time.sleep(60)  # Check every minute

# Loop to monitor the stream and stop it based on row count
max_rows = total_entries
while gold_stream.isActive:
    current_row_count = get_row_count(GOLD_DELTA)
    print("current_row_count", current_row_count)
    if current_row_count < max_rows:
        time.sleep(300)  # Sleep for 5 minutes before checking again
        current_row_count = get_row_count(GOLD_DELTA)
        if current_row_count >= max_rows:
            gold_stream.stop()
            print(f"Stream stopped after reaching {current_row_count} rows.")
    else:
        gold_stream.stop()
        print(f"You have the rows there already!!.")
        break

In [0]:
gold_df = spark.read.format('delta').load(GOLD_DELTA)
display(gold_df)

In [0]:
num_rows = gold_df.count()
print(f"Number of rows in DataFrame: {num_rows}")


## 7.0 Capture the accuracy metrics from the gold table in MLflow
Store the following in an MLflow experiment run:
- Store the precision, recall, and F1-score as MLflow metrics
- Store an image of the confusion matrix as an MLflow artifact
- Store the model name and the MLflow version that was used as MLflow parameters
- Store the version of the Delta Table (input-silver) as an MLflow parameter

In [0]:
# Assuming gold_df is your DataFrame
# Convert Spark DataFrame to Pandas DataFrame if it's not already a Pandas DataFrame
gold_pd = gold_df.toPandas()

# Prepare data
y_true = gold_pd['sentiment_id']
y_pred = gold_pd['predicted_sentiment_id']

# Calculate metrics
precision = precision_score(y_true, y_pred, average='binary')
recall = recall_score(y_true, y_pred, average='binary')
f1 = f1_score(y_true, y_pred, average='binary')

print("precision", precision)
print("recall", recall)
print("f1", f1)

# Generate and save confusion matrix
plt.figure(figsize=(10,7))
cf_matrix = confusion_matrix(y_true, y_pred)
sns.heatmap(cf_matrix, annot=True, fmt='g')
plt.title('Confusion Matrix')
plt.ylabel('Actual')
plt.xlabel('Predicted')
# Save before plt.show(): showing first can clear the figure and leave a blank image on disk
plt.savefig("/tmp/confusion_matrix.png")
plt.show()
plt.close()

# Start an MLflow run
with mlflow.start_run():
    # Log metrics
    mlflow.log_metric("Precision", precision)
    mlflow.log_metric("Recall", recall)
    mlflow.log_metric("F1 Score", f1)

    # Log the confusion matrix artifact
    mlflow.log_artifact("/tmp/confusion_matrix.png")

    # Log parameters
    mlflow.log_param("Model Name", MODEL_NAME)
    mlflow.log_param("MLflow Version", mlflow.__version__)

    # Log the version of the Delta Table if using Delta Lake
    delta_table_version = spark.sql("DESCRIBE HISTORY silver_delta").collect()[0]['version']
    mlflow.log_param("Input Table Version", delta_table_version)
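For reference, the three logged metrics follow directly from the confusion-matrix counts. The sketch below computes them by hand for the positive class (label 1), which is also the class that `average='binary'` scores by default in scikit-learn. The counts are made up, `binary_metrics` is a hypothetical helper, and returning 0.0 for an empty denominator is a choice made for this sketch.

```python
def binary_metrics(tp, fp, fn):
    """Precision, recall, and F1 for the positive class from confusion counts."""
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    denom = precision + recall
    f1 = 2 * precision * recall / denom if denom else 0.0
    return precision, recall, f1

# Made-up counts: 8 true positives, 2 false positives, 4 false negatives.
p, r, f1 = binary_metrics(tp=8, fp=2, fn=4)
print(f"precision={p:.3f} recall={r:.3f} f1={f1:.3f}")
```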


## 8.0 Application Data Processing and Visualization
- How many mentions are there in the gold data total?
- Count the number of neutral, positive and negative tweets for each mention in new columns
- Capture the total for each mention in a new column
- Sort the mention count totals in descending order
- Plot a bar chart of the top 20 mentions with positive sentiment (the people who are in favor)
- Plot a bar chart of the top 20 mentions with negative sentiment (the people who are the villains)

You may want to use the "Loop Application" widget to control whether you repeatedly display the latest plots while the data comes in from your streams before moving on to the next section and cleaning up your run.

*Note: A mention is a specific Twitter user that has been "mentioned" in a tweet with an @user reference.

Application Data - Gold Table Aggregation

min_timestamp - the oldest timestamp on a given mention (@username)

max_timestamp - the newest timestamp on a given mention (@username)

mention - the user (@username) that this row pertains to.

negative - total negative tweets directed at this mention (@username)

neutral - total neutral tweets directed at this mention (@username)

positive - total positive tweets directed at this mention (@username)

In [0]:
# Aggregate data by mention and pivot on predicted sentiment to count occurrences
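mention_sentiments = gold_df.groupBy("mention").pivot("predicted_sentiment", ["POS", "NEG", "NEU"]).count()
mention_sentiments = mention_sentiments.fillna(0)
# Rename the pivoted label columns to the names in the application data dictionary;
# the plotting calls below look up 'positive' and 'negative'
mention_sentiments = (mention_sentiments.withColumnRenamed("POS", "positive")
                      .withColumnRenamed("NEG", "negative").withColumnRenamed("NEU", "neutral"))
# Calculate total mentions for each user
mention_sentiments = mention_sentiments.withColumn("total", F.col("positive") + F.col("negative") + F.col("neutral"))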

# Get the min and max timestamps for each mention
time_bounds = gold_df.groupBy("mention").agg(
    F.min("timestamp").alias("min_timestamp"),
    F.max("timestamp").alias("max_timestamp")
)

# Join the data to include timestamps
full_mention_data = mention_sentiments.join(time_bounds, "mention")

# Sort by total mentions in descending order
sorted_mention_data = full_mention_data.orderBy(F.desc("total"))

# Convert to Pandas DataFrame for plotting
mention_pd = sorted_mention_data.toPandas()

# Function to plot top mentions by sentiment
def plot_top_mentions(df, sentiment, color, title):
    top_mentions = df.nlargest(20, sentiment)
    plt.figure(figsize=(10, 8))
    plt.bar(top_mentions['mention'], top_mentions[sentiment], color=color)
    plt.xlabel('Mentions')
    plt.ylabel(f'Count of {sentiment.capitalize()} Tweets')
    plt.title(title)
    plt.xticks(rotation=90)
    plt.tight_layout()
    plt.show()

# Widget to control the update of plots
dbutils.widgets.dropdown("update_plots", "No", ["Yes", "No"])
update = dbutils.widgets.get("update_plots")

if update == "Yes":
    # Plot for top 20 mentions with positive sentiment
    plot_top_mentions(mention_pd, 'positive', 'green', 'Top 20 Mentions by Positive Sentiment')
    
    # Plot for top 20 mentions with negative sentiment
    plot_top_mentions(mention_pd, 'negative', 'red', 'Top 20 Mentions by Negative Sentiment')



In [0]:
display(mention_sentiments)

## 9.0 Clean up and completion of your pipeline
- Using the utilities, which streams are running, if any?
- Stop all active streams
- print out the elapsed time of your notebook.

In [0]:
active_streams = spark.streams.active

# Print out details of each active stream
if active_streams:
    for stream in active_streams:
        print(f"Stream Name: {stream.name}, Stream ID: {stream.id}, Is Active: {stream.isActive}")
else:
    print("No active streams.")

In [0]:
for stream in spark.streams.active:
    stream.stop()
    print(f"Stopped stream: {stream.name}")

In [0]:
# Get the notebook's ending time. Note: START_TIME was established in the include file when the notebook started.
END_TIME = time.time()
elapsed_time = END_TIME - START_TIME  
print(f"Elapsed Time: {elapsed_time:.2f} seconds")

## 10.0 How Optimized is your Spark Application (Grad Students Only)
Graduate students (registered for the DSCC-402 section of the course) are required to do this section.  This is a written analysis, supported by the Spark UI (link to screenshots), of your pipeline's execution and what is driving its performance.
Recall that Spark optimization has 5 significant dimensions of consideration:
- Spill: write to executor disk due to lack of memory
- Skew: imbalance in partition size
- Shuffle: network io moving data between executors (wide transforms)
- Storage: inefficiency due to disk storage format (small files, location)
- Serialization: distribution of code segments across the cluster

Comment on each of the dimensions of performance and how your implementation is or is not being affected.  Use specific information in the Spark UI to support your description.

Note: you can take screenshots of the Spark UI from your project runs in Databricks and then link to those pictures by storing them as a publicly accessible file on your cloud drive (Google Drive, OneDrive, etc.)
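For the skew dimension, one simple way to put a number on partition imbalance is the ratio of the largest partition to the median partition. This is a hedged sketch: the metric and the helper name `skew_ratio` are illustrative choices, not a Spark-defined measure, and the sample counts are made up. The function expects a non-empty list.

```python
from statistics import median

def skew_ratio(partition_row_counts):
    """Largest partition size divided by the median partition size."""
    mid = median(partition_row_counts)
    return max(partition_row_counts) / mid if mid else float("inf")

# Made-up row counts for five partitions; the last one is far larger than the rest.
print(skew_ratio([100, 110, 95, 105, 900]))

# Per-partition row counts can be collected in the notebook with, for example:
# df.groupBy(F.spark_partition_id()).count()
```

A ratio close to 1 suggests evenly sized partitions, while a large ratio points to a few oversized partitions that are worth checking in the Spark UI task durations.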

References:
- [Spark UI Reference](https://spark.apache.org/docs/latest/web-ui.html#web-ui)
- [Spark UI Simulator](https://www.databricks.training/spark-ui-simulator/index.html)

### ENTER YOUR MARKDOWN HERE