## DSCC202-402 Data Science at Scale Final Project
### Tracking Tweet sentiment at scale using a pretrained transformer (classifier)
<p>Consider the following illustration of the end-to-end system that you will be building.  Each student should do their own work.  The project will demonstrate your understanding of Spark Streaming, the medallion data architecture using Delta Lake, Spark Inference at Scale using an MLflow packaged model, as well as Exploratory Data Analysis and System Tracking and Monitoring.</p>
<br><br>
<img src="https://data-science-at-scale.s3.amazonaws.com/images/pipeline.drawio.png">

<p>
You will be pulling an updated copy of the course GitHub repository: <a href="https://github.com/lpalum/dscc202-402-spring2024">The Repo</a>.  If you are unclear on how to pull an updated copy using the GitHub command line, the following <a href="https://techwritingmatters.com/how-to-update-your-forked-repository-on-github">document</a> is helpful.  Be sure to add the professors and TAs as collaborators on your project. 

- lpalum@gmail.com GitHub ID: lpalum
- ajay.anand@rochester.edu GitHub ID: ajayan12
- divyamunot1999@gmail.com GitHub ID: divyamunot
- ylong6@u.Rochester.edu GitHub ID: NinaLong2077

Once you have updated your fork of the repository, you should see the following template project in the final_project directory.
</p>

<img src="https://data-science-at-scale.s3.amazonaws.com/images/notebooks.drawio.png">

<p>
You can then pull your project into the Databricks Workspace using the <a href="https://www.databricks.training/step-by-step/importing-courseware-from-github/index.html">Repos</a> feature.
Each student is expected to submit the URL of their project on GitHub with their code checked in on the main/master branch.  This illustration highlights the branching scheme that you may use to work on your code in steps and then merge your submission into your master branch before submitting.
</p>
<img src="https://data-science-at-scale.s3.amazonaws.com/images/github.drawio.png">
<p>
Work your way through this notebook, which gives you the steps required to submit a complete and compliant project.  The following illustration and associated data dictionary specify the transformations and data that you are to generate for each step in the medallion pipeline.
</p>
<br><br>
<img src="https://data-science-at-scale.s3.amazonaws.com/images/dataframes.drawio.png">

#### Bronze Data - raw ingest
- date - string in the source json
- user - string in the source json
- text - tweet string in the source json
- sentiment - the given sentiment of the text as determined by an unknown model that is provided in the source json
- source_file - the path of the source json file that this row of data was read from
- processing_time - a timestamp of when you read this row from the source json

#### Silver Data - Bronze Preprocessing
- timestamp - convert date string in the bronze data to a timestamp
- mention - every @username mentioned in the text string in the bronze data gets a row in this silver data table.
- cleaned_text - the bronze text data with the mentions (@username) removed.
- sentiment - the given sentiment that was associated with the text in the bronze table.
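
The mention and cleaned_text definitions above can be illustrated with a small plain-Python sketch. It is not the notebook's Spark transform: the `split_mentions` helper and the `@(\w+)` pattern are assumptions made for illustration, and the pattern treats a handle as a run of letters, digits, and underscores, so trailing punctuation is not kept as part of the mention.

```python
import re

# Assumed handle pattern: '@' followed by letters, digits, or underscores.
MENTION_RE = re.compile(r"@(\w+)")

def split_mentions(text):
    """Return (mentions, cleaned_text) for one tweet string."""
    if text is None:
        # Null text yields no mentions and an empty cleaned string.
        return [], ""
    mentions = MENTION_RE.findall(text)
    # Remove the handles, then collapse the whitespace they leave behind.
    cleaned = " ".join(MENTION_RE.sub("", text).split())
    return mentions, cleaned

mentions, cleaned = split_mentions("@alice thanks for the ride with @bob_99!")
print(mentions)  # one silver row would be produced per entry in this list
print(cleaned)
```

Each entry in the returned list corresponds to one silver row, all sharing the same cleaned_text.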

#### Gold Data - Silver Table Inference
- timestamp - the timestamp from the silver data table rows
- mention - the mention from the silver data table rows
- cleaned_text - the cleaned_text from the silver data table rows
- sentiment - the given sentiment from the silver data table rows
- predicted_score - score out of 100 from the Hugging Face Sentiment Transformer
- predicted_sentiment - string representation of the sentiment
- sentiment_id - 0 for negative and 1 for positive associated with the given sentiment
- predicted_sentiment_id - 0 for negative and 1 for positive associated with the Hugging Face Sentiment Transformer
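
As a rough illustration of the gold columns, the sketch below maps one model prediction to predicted_sentiment, predicted_score, and predicted_sentiment_id. It assumes the model returns the labels `NEG`, `NEU`, and `POS` (the labels this notebook's gold transform checks for) and a score between 0 and 1; both the `to_gold_fields` helper and the 0-to-1 score range are assumptions, so verify them against the actual model output.

```python
# Assumed label set; NEU has no id in the data dictionary, so it maps to None here.
LABEL_TO_ID = {"NEG": 0, "POS": 1}

def to_gold_fields(label, score):
    """Build the three prediction columns for one gold row."""
    return {
        "predicted_sentiment": label,
        # Rescale an assumed 0-1 probability to a score out of 100.
        "predicted_score": round(score * 100.0, 2),
        "predicted_sentiment_id": LABEL_TO_ID.get(label),
    }

print(to_gold_fields("POS", 0.5))
print(to_gold_fields("NEU", 0.25))
```

How neutral predictions are encoded is a design choice; whatever value is used, it should be handled consistently when the accuracy metrics are computed.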

#### Application Data - Gold Table Aggregation
- min_timestamp - the oldest timestamp on a given mention (@username)
- max_timestamp - the newest timestamp on a given mention (@username)
- mention - the user (@username) that this row pertains to.
- negative - total negative tweets directed at this mention (@username)
- neutral - total neutral tweets directed at this mention (@username)
- positive - total positive tweets directed at this mention (@username)

When you are designing your approach, one of the main decisions that you will need to make is how you are going to orchestrate the streaming data processing in your pipeline.  There are several valid approaches.  First, you may choose to start the bronze_stream and let it complete (read and append all of the source data) before proceeding and starting up the silver_stream.  This approach has latency associated with it, but it will allow your code to proceed in a linear fashion and process all the data by the end of your notebook execution.  Another potential approach is to start all the streams and have a "watch" method to determine when the pipeline has processed sufficient or all of the source data before stopping and displaying results.  Both of these approaches are valid and have different implications for how you will trigger your streams and how you will gate the execution of your pipeline.  Think through how you want to proceed and ask questions if you need guidance. The following references may be helpful:
- [Spark Structured Streaming Programming Guide](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html)
- [Databricks Autoloader - Cloudfiles](https://docs.databricks.com/en/ingestion/auto-loader/index.html)

### Be sure that your project runs end to end when *Run all* is executed on this notebook! (15 Points out of 60)

In [0]:
%pip install pyspark==3.1.2

In [0]:
%run ./includes/includes

In [0]:
"""
Adding a widget to the notebook to control the clearing of a previous run.
or stopping the active streams using routines defined in the utilities notebook
"""
dbutils.widgets.removeAll()

dbutils.widgets.dropdown("clear_previous_run", "No", ["No","Yes"])
if (getArgument("clear_previous_run") == "Yes"):
    clear_previous_run()
    print("Cleared all previous data.")

dbutils.widgets.dropdown("stop_streams", "No", ["No","Yes"])
if (getArgument("stop_streams") == "Yes"):
    stop_all_streams()
    print("Stopped all active streams.")

from delta import *
dbutils.widgets.dropdown("optimize_tables", "No", ["No","Yes"])
if (getArgument("optimize_tables") == "Yes"):
    # Compact the small files produced by the streaming appends in each table.
    for table_path in (BRONZE_DELTA, SILVER_DELTA, GOLD_DELTA):
        DeltaTable.forPath(spark, table_path).optimize().executeCompaction()
    print("Optimized all of the Delta Tables")

## 1.0 Import your libraries here...
- Are your shuffle partitions consistent with your cluster and your workload?
- Do you have the necessary libraries to perform the required operations in the pipeline/application?

In [0]:
# Core PySpark imports
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, when, input_file_name, current_timestamp, desc, udf, to_timestamp, expr, explode, count
from pyspark.sql.types import IntegerType, StringType, StructType, StructField, DoubleType, ArrayType

# Data visualization
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix

# Machine Learning
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# MLflow for model management
import mlflow
import mlflow.spark

# Delta Lake
from delta.tables import DeltaTable

## 2.0 Use the utility functions to ...
- Read the source file directory listing
- Count the source files (how many are there?)
- print the contents of one of the files

In [0]:
# Read the source file directory listing
source_files_df = get_source_listing_df()
display(source_files_df.head(10))

In [0]:
# Count the source files and report the result
file_count = source_files_df.count()
print(f"Number of source files: {file_count}")

In [0]:
show_s3_file_contents('voc_volume/0.json')

## 3.0 Transform the Raw Data to Bronze Data using a stream
- define the schema for the raw data
- setup a read stream using cloudfiles and the source data format
- setup a write stream using cloudfiles to append to the bronze delta table
- enforce schema
- allow a new schema to be merged into the bronze delta table
- Use the BRONZE_CHECKPOINT and BRONZE_DELTA paths defined in the includes
- name your raw to bronze stream as bronze_stream
- transform the raw data to the bronze data using the data definition at the top of the notebook

In [0]:
# Initialize Spark session with necessary configurations
spark = SparkSession.builder.appName("StreamToBronze").getOrCreate()

# Define the schema according to the expected raw data format
raw_data_schema = StructType([
    StructField("date", StringType(), True),
    StructField("user", StringType(), True),
    StructField("text", StringType(), True),
    StructField("sentiment", StringType(), True)
])

# Set up a read stream using CloudFiles with the schema and options configured
df_stream = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.maxFilesPerTrigger", 1000)  
    .schema(raw_data_schema)
    .load(TWEET_SOURCE_PATH)
    .withColumn("source_file", input_file_name())
    .withColumn("processing_time", current_timestamp())
)

# Define paths for Delta tables and checkpoint locations
bronze_table_path = BRONZE_DELTA
checkpoint_path = BRONZE_CHECKPOINT

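# Set up a write stream to append transformed data to the Bronze Delta table
bronze_stream = (
    df_stream.writeStream
    .format("delta")
    .outputMode("append")
    .queryName("bronze_stream")  # Name the stream as required by the assignment
    .option("mergeSchema", "true")  # Allow a new schema to be merged into the bronze table
    .option("checkpointLocation", checkpoint_path)
    .option("path", bronze_table_path)
    .trigger(processingTime="1 second")  # Start a micro-batch every second; the stream runs until it is stopped
    .start()
)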

## 4.0 Bronze Data Exploratory Data Analysis
- How many tweets are captured in your Bronze Table?
- Are there any columns that contain NaN or Null values?  If so, how many, and what will you do in your silver transforms to address this?
- Count the number of tweets by each unique user handle and sort the data by descending count.
- How many tweets have at least one mention (@), and how many tweets have no mentions (@)?
- Plot a bar chart that shows the top 20 tweeters (users)


How many tweets are captured in your Bronze Table?

In [0]:
# Load the Bronze data
bronze_df = spark.read.format("delta").load(BRONZE_DELTA)

# Count the number of tweets
total_tweets = bronze_df.count()
print(f"Total number of tweets: {total_tweets}")

Are there any columns that contain NaN or Null values? If so, how many, and what will you do in your silver transforms to address this?

In [0]:
bronze_df = spark.read.format("delta").load(BRONZE_DELTA)
# Check for nulls in each column
null_counts = {column: bronze_df.filter(col(column).isNull()).count() for column in bronze_df.columns}
print("Null counts by column:", null_counts)

# Decide what to do with null values for your Silver table transforms
# Example: Fill nulls for 'text' column with a default string
bronze_df = bronze_df.na.fill({"text": "No text available"})


Count the number of tweets by each unique user handle and sort the data by descending count.

In [0]:
bronze_df = spark.read.format("delta").load(BRONZE_DELTA)
# Count the number of tweets by each user, sort, and cache the result for multiple actions
user_tweet_counts = bronze_df.groupBy("user").count().orderBy(desc("count"))
user_tweet_counts.cache()
user_tweet_counts.show()

How many tweets have at least one mention (@), and how many tweets have no mentions (@)?

In [0]:
# The bronze table accumulates many small files from the streaming appends.
# Compact it with OPTIMIZE before querying to improve read performance.
spark.sql(f"OPTIMIZE delta.`{BRONZE_DELTA}`")

# Combine adding column and counting in one step
mention_counts = bronze_df.select(expr("case when text like '%@%' then 1 else 0 end as has_mention")).groupBy("has_mention").count()
mention_counts.show()

Plot a bar chart that shows the top 20 tweeters (users)

In [0]:
# Collect only the necessary top 20 tweeters data 
user_tweet_counts = bronze_df.groupBy("user").count().orderBy(desc("count"))
top_tweeters = user_tweet_counts.limit(20).toPandas()

plt.figure(figsize=(10, 6))
plt.bar(top_tweeters['user'], top_tweeters['count'], color='blue')
plt.xlabel('User Handle')
plt.ylabel('Number of Tweets')
plt.title('Top 20 Tweeters')
plt.xticks(rotation=45, ha='right')
plt.tight_layout()  # Adjust layout to make room for label rotation
plt.show()

## 5.0 Transform the Bronze Data to Silver Data using a stream
- setup a read stream on your bronze delta table
- setup a write stream to append to the silver delta table
- Use the defined SILVER_CHECKPOINT and SILVER_DELTA paths in the includes
- name your bronze to silver stream as silver_stream
- transform the bronze data to the silver data using the data definition at the top of the notebook

In [0]:
# Read from the Bronze Delta table as a stream
bronze_df = spark.readStream.format("delta").load(BRONZE_DELTA)

# UDF to extract mentions starting with '@' from text
extract_mentions = udf(lambda text: [word[1:] for word in text.split() if word.startswith('@') and len(word) > 1], ArrayType(StringType()))

# UDF to extract cleaned text
extract_cleaned_text = udf(lambda text: ' '.join([word for word in text.split() if not word.startswith('@')]))

# Keep only the rows that have at least one mention
# Add the 'mention_array' and 'cleaned_text' columns using the UDFs
bronze_to_silver = (bronze_df
    .select("date", "text", "sentiment")
    .filter(col("text").like("%@%"))
    .withColumn("mention_array", extract_mentions(col("text")))
    .withColumn("cleaned_text", extract_cleaned_text(col("text")))
)

# Explode the mention array column to get the row for each mention
bronze_to_silver = (bronze_to_silver
    .select("*", explode("mention_array").alias("mention"))
)

# Adding timestamp column using date column
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")
bronze_to_silver = bronze_to_silver.withColumn("timestamp", to_timestamp(col("date"), "EEE MMM dd HH:mm:ss z yyyy"))

# Dropping the redundant columns
bronze_to_silver = bronze_to_silver.drop("date", "text", "mention_array")

In [0]:
# Write the transformed data to the Silver Delta table
silver_stream = (
    bronze_to_silver
    .writeStream
    .format("delta")
    .outputMode("append")
    .queryName("silver_stream")  # Name the stream as required by the assignment
    .option("mergeSchema", "true")  
    .option("checkpointLocation", SILVER_CHECKPOINT)
    .start(SILVER_DELTA)
)


In [0]:
# Load the Silver data
silver_df = spark.read.format("delta").load(SILVER_DELTA)

# Count the rows in the silver table (one row per mention)
total_tweets = silver_df.count()
print(f"Total number of silver rows: {total_tweets}")

In [0]:
# Group by 'mention' and count each group
mention_counts = silver_df.groupBy("mention").count()

# Assign a name to the aggregated DataFrame
mention_counts_per_mention = mention_counts.withColumnRenamed("count", "total_mentions")

# Sort the DataFrame by 'total_mentions' in descending order
sorted_mentions = mention_counts_per_mention.orderBy(col("total_mentions").desc())

# Show the results
sorted_mentions.show()

## 6.0 Transform the Silver Data to Gold Data using a stream
- setup a read stream on your silver delta table
- setup a write stream to append to the gold delta table
- Use the GOLD_CHECKPOINT and GOLD_DELTA paths defined in the includes
- name your silver to gold stream as gold_stream
- transform the silver data to the gold data using the data definition at the top of the notebook
- Load the pretrained transformer sentiment classifier from the MODEL_NAME at the production level from the MLflow registry
- Use a spark UDF to parallelize the inference across your silver data

In [0]:
from pyspark.sql.functions import col, when 
import mlflow
# Load the production-stage sentiment model from the MLflow registry as a Spark UDF
sentiment_analysis_udf = mlflow.pyfunc.spark_udf(spark, f"models:/{MODEL_NAME}/Production")

# Setup a read stream on your silver delta table
silver_df = (
    spark.readStream
    .format("delta")
    .option("ignoreChanges", "true")
    .load(SILVER_DELTA)
)

#repartitioned_silver_df = silver_df.repartition(20)
# Use a spark UDF to parallelize the inference across your silver data
gold_df = silver_df.withColumn("predictions", sentiment_analysis_udf(col("cleaned_text"))) \
    .select(
        col("timestamp"),
        col("mention"),
        col("cleaned_text"),
        col("sentiment"),
        col("predictions.label").alias("predicted_sentiment"),
        col("predictions.score").alias("predicted_score"),
        when(col("sentiment") == "negative", 0).otherwise(1).alias("sentiment_id"),
        when(col("predictions.label") == "NEG", 0)
        .when(col("predictions.label") == "POS", 1)
        .when(col("predictions.label") == "NEU", -1)
        .alias("predicted_sentiment_id")
    ) 

# Setup a write stream to append to the gold delta table
gold_stream = gold_df.writeStream \
    .format("delta") \
    .outputMode("append") \
    .queryName("gold_stream") \
    .option("checkpointLocation", GOLD_CHECKPOINT) \
    .start(GOLD_DELTA)

In [0]:
# Load the Gold data
gold_df = spark.read.format("delta").load(GOLD_DELTA)

# Count the rows in the gold table
total_tweets = gold_df.count()
print(f"Total number of gold rows: {total_tweets}")

## 7.0 Capture the accuracy metrics from the gold table in MLflow
Store the following in an MLflow experiment run:
- Store the precision, recall, and F1-score as MLflow metrics
- Store an image of the confusion matrix as an MLflow artifact
- Store the model name and the MLflow version that was used as MLflow parameters
- Store the version of the Delta Table (input-silver) as an MLflow parameter

Store the precision, recall, and F1-score as MLflow metrics

In [0]:
# Cast the integer columns to double
gold_df = gold_df.withColumn("sentiment_id", col("sentiment_id").cast(DoubleType())) \
                 .withColumn("predicted_sentiment_id", col("predicted_sentiment_id").cast(DoubleType()))

# Now you can use this adjusted DataFrame for your evaluations

# Setup evaluator
evaluator = MulticlassClassificationEvaluator(
    labelCol="sentiment_id", 
    predictionCol="predicted_sentiment_id",
    metricName="accuracy"
)

# Compute accuracy, precision, recall, and F1-score
accuracy = evaluator.evaluate(gold_df)
evaluator.setMetricName("weightedPrecision")
precision = evaluator.evaluate(gold_df)
evaluator.setMetricName("weightedRecall")
recall = evaluator.evaluate(gold_df)
evaluator.setMetricName("f1")
f1_score = evaluator.evaluate(gold_df)

# Log metrics to MLflow
mlflow.log_metric("accuracy", accuracy)
mlflow.log_metric("precision", precision)
mlflow.log_metric("recall", recall)
mlflow.log_metric("f1_score", f1_score)
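
For reference, the sketch below shows how precision, recall, and F1 are derived from true and predicted label ids for a single positive class. It is a plain-Python illustration with made-up labels; the `binary_metrics` helper is hypothetical. The evaluator used above reports weighted averages across classes, so its numbers will generally differ from these single-class values.

```python
def binary_metrics(y_true, y_pred, positive=1):
    """Precision, recall, and F1 for the given positive class."""
    pairs = list(zip(y_true, y_pred))
    tp = sum(1 for t, p in pairs if t == positive and p == positive)
    fp = sum(1 for t, p in pairs if t != positive and p == positive)
    fn = sum(1 for t, p in pairs if t == positive and p != positive)
    # Guard against empty denominators.
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    f1 = (2 * precision * recall / (precision + recall)) if (precision + recall) else 0.0
    return {"precision": precision, "recall": recall, "f1": f1}

# Made-up labels: 2 true positives, 1 false positive, 1 false negative.
metrics = binary_metrics([1, 1, 0, 0, 1], [1, 0, 0, 1, 1])
print(metrics)
```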


Store an image of the confusion matrix as an MLflow artifact

In [0]:
# Convert Spark DataFrame to Pandas DataFrame for plotting
pdf = gold_df.select("sentiment_id", "predicted_sentiment_id").toPandas()
conf_matrix = confusion_matrix(pdf["sentiment_id"], pdf["predicted_sentiment_id"])

plt.figure(figsize=(10, 7))
ax = sns.heatmap(conf_matrix, annot=True, fmt='d', cmap="gist_rainbow_r", cbar=True)
ax.set_title('Confusion Matrix')
ax.set_xlabel('Predicted Labels')
ax.set_ylabel('Actual Labels') 
plt.savefig("confusion_matrix.png")
mlflow.log_artifact("confusion_matrix.png")
plt.show()

Store the model name and the MLflow version that was used as MLflow parameters

In [0]:
mlflow.log_param("Model Name", MODEL_NAME)
mlflow.log_param("Model Version", "1")


Store the version of the Delta Table (input-silver) as an MLflow parameter

In [0]:
# Log the registered model name and the underlying Hugging Face model
mlflow.log_param("model_name", "HF_TWEET_SENTIMENT")
mlflow.log_param("hf_model_name", "finiteautomata/bertweet-base-sentiment-analysis")

# Retrieve and log the version of the Silver Delta table
silver_table = DeltaTable.forPath(spark, SILVER_DELTA)
version = silver_table.history(1).select("version").collect()[0][0]
mlflow.log_param("silver_delta_version", version)


## 8.0 Application Data Processing and Visualization
- How many mentions are there in the gold data total?
- Count the number of neutral, positive and negative tweets for each mention in new columns
- Capture the total for each mention in a new column
- Sort the mention count totals in descending order
- Plot a bar chart of the top 20 mentions with positive sentiment (the people who are in favor)
- Plot a bar chart of the top 20 mentions with negative sentiment (the people who are the villains)

You may want to use the "Loop Application" widget to control whether you repeatedly display the latest plots while the data comes in from your streams before moving on to the next section and cleaning up your run.

*note: A mention is a specific twitter user that has been "mentioned" in a tweet with an @user reference.

1. Count of Total Mentions in the Gold Data

In [0]:
# Count total mentions in the gold data
total_mentions_count = gold_df.select(count("mention")).collect()[0][0]
print(f"Total mentions in the gold data: {total_mentions_count}")

2 and 3. Count of Neutral, Positive, and Negative Tweets and the Total for Each Mention

In [0]:
# Aggregate counts of each sentiment type for each mention
sentiment_counts_per_mention = gold_df.groupBy("mention").agg(
    count(when(col("sentiment") == "positive", True)).alias("positive_count"),
    count(when(col("sentiment") == "negative", True)).alias("negative_count"),
    count(when(col("sentiment") == "neutral", True)).alias("neutral_count")
)
sentiment_counts_per_mention = sentiment_counts_per_mention.withColumn(
    "total_mentions", col("positive_count") + col("negative_count") + col("neutral_count")
)
sentiment_counts_per_mention.show()


4. Sort the Mention Count Totals in Descending Order

In [0]:
# Sort mentions by the total mentions in descending order
sorted_mentions = sentiment_counts_per_mention.orderBy(col("total_mentions").desc())
sorted_mentions.show()

5. Plot a Bar Chart of the Top 20 Mentions with Positive Sentiment

In [0]:
# Fetch top 20 mentions for positive sentiment and convert to pandas for plotting
top_positive_mentions = sorted_mentions.orderBy(col("positive_count").desc()).limit(20).toPandas()

# Plotting the top 20 mentions with positive sentiment
plt.figure(figsize=(12, 8))
plt.bar(top_positive_mentions['mention'], top_positive_mentions['positive_count'], color='green')
plt.xlabel('Mention')
plt.ylabel('Count of Positive Mentions')
plt.title('Top 20 Mentions with Positive Sentiment')
plt.xticks(rotation=45, ha='right')
plt.tight_layout()
plt.show()

6. Plot a Bar Chart of the Top 20 Mentions with Negative Sentiment

In [0]:
# Fetch top 20 mentions for negative sentiment and convert to pandas for plotting
top_negative_mentions = sorted_mentions.orderBy(col("negative_count").desc()).limit(20).toPandas()

# Plotting the top 20 mentions with negative sentiment
plt.figure(figsize=(12, 8))
plt.bar(top_negative_mentions['mention'], top_negative_mentions['negative_count'], color='red')
plt.xlabel('Mention')
plt.ylabel('Count of Negative Mentions')
plt.title('Top 20 Mentions with Negative Sentiment')
plt.xticks(rotation=45, ha='right')
plt.tight_layout()
plt.show()

## 9.0 Clean up and completion of your pipeline
- Using the utilities, which streams are running, if any?
- Stop all active streams
- print out the elapsed time of your notebook.

In [0]:
for stream in spark.streams.active:
    print(stream.name, 'is running')
stop_all_streams()

In [0]:
# Get the notebook's ending time. Note: START_TIME was established in the include file when the notebook started.
END_TIME = time.time()

In [0]:
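# Print the elapsed time of the notebook run (assumes START_TIME is a time.time() value from the includes)
print(f"Elapsed time: {END_TIME - START_TIME:.1f} seconds")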

## 10.0 How Optimized is your Spark Application (Grad Students Only)
Graduate students (registered for the DSCC-402 section of the course) are required to do this section.  This is a written analysis using the Spark UI (link to screenshots) that supports your analysis of your pipeline's execution and what is driving its performance.
Recall that Spark Optimization has 5 significant dimensions of consideration:
- Spill: write to executor disk due to lack of memory
- Skew: imbalance in partition size
- Shuffle: network io moving data between executors (wide transforms)
- Storage: inefficiency due to disk storage format (small files, location)
- Serialization: distribution of code segments across the cluster

Comment on each of the dimensions of performance and how your implementation is or is not being affected.  Use specific information in the Spark UI to support your description.  

Note: you can take screenshots of the Spark UI from your project runs in Databricks and then link to those pictures by storing them as a publicly accessible file on your cloud drive (Google Drive, OneDrive, etc.)

References:
- [Spark UI Reference](https://spark.apache.org/docs/latest/web-ui.html#web-ui)
- [Spark UI Simulator](https://www.databricks.training/spark-ui-simulator/index.html)

#### Spill Assessment:

Observation: We monitored the Spark UI for signs of spill and found none. Partition sizes are consistent across jobs and the source files are small; in the stages we inspected, shuffle read and write sizes average around 1562 KiB.

Implication: The absence of oversized files, which could otherwise lead to disproportionate loading or processing burdens, significantly mitigates any performance issues that could arise from disk spill. This ensures a smoother operational flow and enhances the overall system efficiency.

#### Data Skew Analysis:

Observation: There is a notable absence of data skew in the execution of jobs. The evenness in file sizes across the system contributes significantly to this equilibrium. The utilization of Adaptive Query Execution (AQE) has been instrumental in optimizing partition management, which in turn helps in alleviating any potential skew that might have occurred otherwise.

Implication: Thanks to the balanced distribution of data across the system, we effectively ensure that no single node or process becomes a performance bottleneck due to uneven loads. This uniformity is crucial for maintaining high performance and operational consistency across the board.

#### Shuffle Operations:

Observation: Shuffle operations are unavoidable in this pipeline because it uses wide transformations such as groupBy, orderBy, and aggregations. (The explode used in the silver transform is a narrow transformation and does not by itself trigger a shuffle.) Shuffle read and write sizes are consistent at about 1562 KiB in the stages we inspected.

Optimization Strategy: To enhance performance, we have configured the number of shuffle partitions to align as a multiple of the number of processor cores available. This strategic alignment optimizes the movement of data across partitions, which is essential for minimizing potential delays associated with shuffle operations.

Implication: Although shuffle operations are an inherent aspect of the processes we are undertaking, our strategic approach to configuring shuffle partitions effectively reduces the time overhead linked to these necessary operations.
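
The partition setting described above could be applied along the following lines. This is a sketch under stated assumptions: `shuffle_partitions_for` is a hypothetical helper, the factor of 2 is an arbitrary illustrative choice, and the commented Spark calls would run only inside the notebook where `spark` is defined.

```python
def shuffle_partitions_for(total_cores, factor=2):
    """Return a shuffle partition count that is a multiple of the core count."""
    if total_cores < 1 or factor < 1:
        raise ValueError("total_cores and factor must be positive")
    return total_cores * factor

# Example with an assumed 8-core cluster.
print(shuffle_partitions_for(8))

# In the notebook (not executed here):
# cores = spark.sparkContext.defaultParallelism
# spark.conf.set("spark.sql.shuffle.partitions", shuffle_partitions_for(cores))
```

Setting the value once near the library imports keeps it in effect for every stream and batch query that follows.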

#### Storage Optimization:

Observation: The initial storage of JSON files in the S3 bucket is suboptimal due to the presence of numerous small files. This initial setup results in less efficient read times, with the initial data read averaging 86.6 MiB and the data written to IO Cache being approximately 31.2 MiB.

Further Steps: Following the initial data read, we observe that data is stored more efficiently in the structured Bronze, Silver, and Gold Delta tables. This observation suggests that improvements should predominantly focus on the initial data ingestion phase. Strategies such as consolidating files prior to processing or modifying the reading mechanisms could offer significant enhancements in efficiency.

#### Serialization Efficiency:

Observation: Serialization, particularly of the ML model for sentiment analysis implemented as a UDF, is the most time-consuming operation. This is especially evident in the time metrics observed, where certain operations related to the UDF took upwards of 74 ms.

Alternative Considerations: Although it is not feasible to replace this particular UDF, optimizing other UDFs, especially those used for extracting mentions in the Silver table, could present substantial performance benefits. Furthermore, exploring the possibility of utilizing built-in Spark functions that perform similar tasks might reduce our reliance on custom UDFs. Custom UDFs are generally known to be less efficient and more resource-intensive compared to built-in functions, and this shift could potentially enhance operational efficiency and reduce processing times significantly.