## DSCC202-402 Data Science at Scale Final Project
### Tracking Tweet sentiment at scale using a pretrained transformer (classifier)
<p>Consider the following illustration of the end-to-end system that you will be building.  Each student should do their own work.  The project will demonstrate your understanding of Spark Streaming, the medallion data architecture using Delta Lake, Spark Inference at Scale using an MLflow-packaged model, as well as Exploratory Data Analysis and System Tracking and Monitoring.</p>
<br><br>
<img src="https://data-science-at-scale.s3.amazonaws.com/images/pipeline.drawio.png">

<p>
You will be pulling an updated copy of the course GitHub repository: <a href="https://github.com/lpalum/dscc202-402-spring2024">The Repo</a>.  If you are unclear on how to pull an updated copy using the GitHub command line, the following <a href="https://techwritingmatters.com/how-to-update-your-forked-repository-on-github">document</a> is helpful.  Be sure to add the professors and TAs as collaborators on your project. 

- lpalum@gmail.com GitHub ID: lpalum
- ajay.anand@rochester.edu GitHub ID: ajayan12
- divyamunot1999@gmail.com GitHub ID: divyamunot
- ylong6@u.Rochester.edu GitHub ID: NinaLong2077

Once you have updated your fork of the repository, you should see the following template project in the final_project directory.
</p>

<img src="https://data-science-at-scale.s3.amazonaws.com/images/notebooks.drawio.png">

<p>
You can then pull your project into the Databricks Workspace using the <a href="https://www.databricks.training/step-by-step/importing-courseware-from-github/index.html">Repos</a> feature.
Each student is expected to submit the URL of their project on GitHub with their code checked in on the main/master branch.  This illustration highlights the branching scheme that you may use to work on your code in steps and then merge your submission into your master branch before submitting.
</p>
<img src="https://data-science-at-scale.s3.amazonaws.com/images/github.drawio.png">
<p>
Work your way through this notebook which will give you the steps required to submit a complete and compliant project.  The following illustration and associated data dictionary specifies the transformations and data that you are to generate for each step in the medallion pipeline.
</p>
<br><br>
<img src="https://data-science-at-scale.s3.amazonaws.com/images/dataframes.drawio.png">

#### Bronze Data - raw ingest
- date - string in the source json
- user - string in the source json
- text - tweet string in the source json
- sentiment - the given sentiment of the text as determined by an unknown model that is provided in the source json
- source_file - the path of the source json file that this row of data was read from
- processing_time - a timestamp of when you read this row from the source json

#### Silver Data - Bronze Preprocessing
- timestamp - convert date string in the bronze data to a timestamp
- mention - every @username mentioned in the text string in the bronze data gets a row in this silver data table.
- cleaned_text - the bronze text data with the mentions (@username) removed.
- sentiment - the given sentiment that was associated with the text in the bronze table.

#### Gold Data - Silver Table Inference
- timestamp - the timestamp from the silver data table rows
- mention - the mention from the silver data table rows
- cleaned_text - the cleaned_text from the silver data table rows
- sentiment - the given sentiment from the silver data table rows
- predicted_score - score out of 100 from the Hugging Face Sentiment Transformer
- predicted_sentiment - string representation of the sentiment
- sentiment_id - 0 for negative and 1 for positive associated with the given sentiment
- predicted_sentiment_id - 0 for negative and 1 for positive associated with the Hugging Face Sentiment Transformer

#### Application Data - Gold Table Aggregation
- min_timestamp - the oldest timestamp on a given mention (@username)
- max_timestamp - the newest timestamp on a given mention (@username)
- mention - the user (@username) that this row pertains to.
- negative - total negative tweets directed at this mention (@username)
- neutral - total neutral tweets directed at this mention (@username)
- positive - total positive tweets directed at this mention (@username)

When you are designing your approach, one of the main decisions that you will need to make is how you are going to orchestrate the streaming data processing in your pipeline.  There are several valid approaches.  First, you may choose to start the bronze_stream and let it complete (read and append all of the source data) before proceeding and starting up the silver_stream.  This approach has latency associated with it, but it will allow your code to proceed in a linear fashion and process all the data by the end of your notebook execution.  Another potential approach is to start all the streams and have a "watch" method to determine when the pipeline has processed sufficient or all of the source data before stopping and displaying results.  Both of these approaches are valid and have different implications on how you will trigger your streams and how you will gate the execution of your pipeline.  Think through how you want to proceed and ask questions if you need guidance. The following references may be helpful:
- [Spark Structured Streaming Programming Guide](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html)
- [Databricks Autoloader - Cloudfiles](https://docs.databricks.com/en/ingestion/auto-loader/index.html)
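For the "watch" approach, one option is to poll each query's progress and stop once several consecutive polls report no new input rows. Below is a minimal, stdlib-only sketch under that assumption. The function name `wait_until_idle` and its thresholds are hypothetical. The progress source is passed in as a callable so the logic can be tested without a cluster. In PySpark, `StreamingQuery.lastProgress` returns the most recent progress as a dict, or `None` before the first micro-batch, and that dict includes `numInputRows`. For the linear approach, an alternative in Spark 3.3 and later is `.trigger(availableNow=True)` on the write stream followed by `query.awaitTermination()`.

```python
import time
from typing import Callable, Optional


def wait_until_idle(
    get_num_input_rows: Callable[[], Optional[int]],
    idle_polls: int = 3,
    poll_interval_s: float = 10.0,
    timeout_s: float = 1800.0,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Hypothetical "watch" helper: poll a progress source until `idle_polls`
    consecutive polls report zero new input rows.

    Returns True if the stream went idle, False if `timeout_s` elapsed first.
    """
    consecutive_idle = 0
    waited = 0.0
    while waited < timeout_s:
        rows = get_num_input_rows()
        if rows == 0:
            consecutive_idle += 1
            if consecutive_idle >= idle_polls:
                return True
        else:
            # None (no micro-batch reported yet) or new rows: the stream is still busy.
            consecutive_idle = 0
        sleep(poll_interval_s)
        waited += poll_interval_s
    return False
```

In the notebook, a call might look like `wait_until_idle(lambda: (bronze_stream.lastProgress or {}).get("numInputRows"))`. Apply it to each stream in pipeline order (bronze, then silver, then gold), because a downstream stream can look idle while its upstream is still writing. The idle check assumes that Spark keeps reporting progress with zero input rows while a query has no new data. By default these idle updates are governed by `spark.sql.streaming.noDataProgressEventInterval`. Verify the behaviour on your cluster before relying on it.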

### Be sure that your project runs end to end when *Run all* is executed on this notebook! (15 Points out of 60)

In [0]:
%run ./includes/includes

In [0]:
"""
Add widgets to the notebook that control clearing a previous run and stopping
the active streams (using routines defined in the utilities notebook), and
optimizing the Delta tables.
"""
dbutils.widgets.removeAll()

dbutils.widgets.dropdown("clear_previous_run", "No", ["No","Yes"])
if (getArgument("clear_previous_run") == "Yes"):
    clear_previous_run()
    print("Cleared all previous data.")

dbutils.widgets.dropdown("stop_streams", "No", ["No","Yes"])
if (getArgument("stop_streams") == "Yes"):
    stop_all_streams()
    print("Stopped all active streams.")

from delta import *
dbutils.widgets.dropdown("optimize_tables", "No", ["No","Yes"])
if (getArgument("optimize_tables") == "Yes"):
    # Compact the many small files that the streaming appends have produced.
    for table_path in (BRONZE_DELTA, SILVER_DELTA, GOLD_DELTA):
        DeltaTable.forPath(spark, table_path).optimize().executeCompaction()
    print("Optimized all of the Delta Tables")

## 1.0 Import your libraries here...
- Are your shuffle partitions consistent with your cluster and your workload?
- Do you have the necessary libraries to perform the required operations in the pipeline/application?

In [0]:
# Basic imports for Spark
from pyspark.sql import SparkSession, DataFrame  # DataFrame is used in type hints in section 5.0
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Imports for Delta Lake
from delta.tables import *
import delta

# MLflow for managing the machine learning model
import mlflow
import mlflow.pyfunc
from mlflow.tracking import MlflowClient

# Hugging Face Transformers for sentiment analysis
from transformers import AutoModelForSequenceClassification, AutoTokenizer

# Import for data visualization (optional, for EDA)
import matplotlib.pyplot as plt
import seaborn as sns

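# Imports for handling dates and times
from datetime import datetime
import time  # used to compute the notebook's elapsed time in section 9.0

# Utilities
import os
import pandas as pd  # used by the pandas UDF in section 6.0
import torch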


Setting the shuffle partitions

In [0]:
# Initialize or get the existing Spark session
spark = SparkSession.builder \
    .appName("Tweet Sentiment Analysis") \
    .config("spark.sql.shuffle.partitions", 8) \
    .getOrCreate()

# Check and set the shuffle partitions
print("Initial shuffle partitions:", spark.conf.get("spark.sql.shuffle.partitions"))
spark.conf.set("spark.sql.shuffle.partitions", 8)
print("Updated shuffle partitions:", spark.conf.get("spark.sql.shuffle.partitions"))

# Verify imports
print("Libraries imported successfully.")
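Spark's default for `spark.sql.shuffle.partitions` is 200, which is usually far more than a small dataset on a small cluster needs. The sketch below shows one common rule of thumb. It is an assumption, not an official formula. Use at least one partition per core, and use enough partitions that each holds no more than roughly 128 MiB of shuffle data. The function name and the 128 MiB target are illustrative.

```python
import math


def suggest_shuffle_partitions(
    total_cores: int,
    shuffle_bytes: int,
    target_partition_bytes: int = 128 * 1024 * 1024,
) -> int:
    """Hypothetical rule-of-thumb helper for spark.sql.shuffle.partitions.

    Uses at least one partition per core (so no core sits idle) and enough
    partitions that each holds about `target_partition_bytes` of shuffle data
    or less (so individual tasks are less likely to spill).
    """
    if total_cores < 1:
        raise ValueError("total_cores must be positive")
    by_size = math.ceil(shuffle_bytes / target_partition_bytes)
    # Take the larger bound; written without max(), which
    # `from pyspark.sql.functions import *` shadows in this notebook.
    return total_cores if total_cores >= by_size else by_size
```

On clusters where it is accessible, `spark.sparkContext.defaultParallelism` typically reflects the total number of executor cores. The shuffle size can be estimated from the "Shuffle Write" figures in the Spark UI.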

## 2.0 Use the utility functions to ...
- Read the source file directory listing
- Count the source files (how many are there?)
- Print the contents of one of the files

In [0]:
# Start a Spark Session
spark = SparkSession.builder \
    .appName("Read S3 Data") \
    .getOrCreate()

# Define the path to your S3 bucket
s3_path = "s3a://voc-75-databricks-data/voc_volume/"

# Read the list of files in the directory using binaryFile format
file_list_df = spark.read.format("binaryFile").load(s3_path)

In [0]:
# List the files in the source directory
file_list = dbutils.fs.ls(TWEET_SOURCE_PATH)
file_list

In [0]:
# Count the number of files
number_of_files = len(file_list)
print(f"Number of files in the source directory: {number_of_files}")

# Read and print the contents of the first file, if it exists
if number_of_files > 0:
    # Take the path of the first file
    first_file_path = file_list[0].path
    # Assuming the files are in text format and small enough to be collected
    file_contents = spark.read.text(first_file_path).collect()
    
    # Print the contents 
    for line in file_contents:
        print(line.value)

In [0]:
# Get the path of the first file
first_file_path = file_list_df.select("path").first()[0]
print("Path of the first file:", first_file_path)

# Read and print the contents of the first file
first_file_df = spark.read.json(first_file_path)
print("Contents of the first file:")
first_file_df.show(truncate=False)

## 3.0 Transform the Raw Data to Bronze Data using a stream
- define the schema for the raw data
- set up a read stream using cloudFiles (Auto Loader) and the source data format
- set up a write stream to append to the bronze delta table
- enforce schema
- allow a new schema to be merged into the bronze delta table
- use the BRONZE_CHECKPOINT and BRONZE_DELTA paths defined in the includes
- name your raw to bronze stream bronze_stream
- transform the raw data to the bronze data using the data definition at the top of the notebook

In [0]:
# Define the schema of the raw source JSON. source_file and processing_time are
# not in the source files; they are added in the bronze transform below.
schema = StructType([
    StructField("date", StringType(), True),
    StructField("user", StringType(), True),
    StructField("text", StringType(), True),
    StructField("sentiment", StringType(), True)
])

In [0]:
# Paths hard-coded for this workspace; these override the values defined in the includes
TWEET_SOURCE_PATH = "s3a://voc-75-databricks-data/voc_volume/"
BRONZE_CHECKPOINT = "/tmp/labuser104917-3013268/bronze.checkpoint"
BRONZE_DELTA = "/tmp/labuser104917-3013268/bronze.delta"
SILVER_CHECKPOINT = "/tmp/labuser104917-3013268/silver.checkpoint"
SILVER_DELTA = "/tmp/labuser104917-3013268/silver.delta"
GOLD_CHECKPOINT = "/tmp/labuser104917-3013268/gold.checkpoint"
GOLD_DELTA = "/tmp/labuser104917-3013268/gold.delta"

In [0]:
# Setup the read stream
raw_stream = spark.readStream \
    .format("cloudFiles") \
    .option("cloudFiles.format", "json") \
    .schema(schema) \
    .load(TWEET_SOURCE_PATH)

In [0]:
# Transform the raw data to bronze format
bronze_data = raw_stream.select(
    col("date").alias("date"),
    col("user").alias("user"),
    col("text").alias("text"),
    col("sentiment").alias("sentiment"),
    input_file_name().alias("source_file"),
    current_timestamp().alias("processing_time")
)

In [0]:
# Set up the write stream; mergeSchema lets a new source schema be merged into the bronze table
bronze_stream = bronze_data.writeStream \
    .format("delta") \
    .option("checkpointLocation", BRONZE_CHECKPOINT) \
    .option("path", BRONZE_DELTA) \
    .option("mergeSchema", "true") \
    .outputMode("append") \
    .queryName("bronze_stream") \
    .start()

In [0]:
def print_table_head(path_name, path, n=5):
    try:
        df = spark.read.format("delta").load(path)
        count = df.count()
        if count == 0:
            print(f"{path_name} is empty.")
        else:
            print(f"{path_name} is not empty. Row count: {count}")
            df.show(n, truncate=False)
    except Exception as e:
        print(f"Error reading Delta table {path_name} at {path}: {e}")

# Print the head of the Bronze Delta table
print_table_head("Bronze Delta", BRONZE_DELTA, n=5)

## 4.0 Bronze Data Exploratory Data Analysis
- How many tweets are captured in your Bronze Table?
- Are there any columns that contain NaN or null values?  If so, how many, and what will you do in your silver transforms to address this?
- Count the number of tweets by each unique user handle and sort the data by descending count.
- How many tweets have at least one mention (@), and how many tweets have no mentions (@)?
- Plot a bar chart that shows the top 20 tweeters (users)


In [0]:
# Read the Bronze table (BRONZE_DELTA is defined above)
bronze_df = spark.read.format("delta").load(BRONZE_DELTA)

# 1. How many tweets are captured in the Bronze Table?
total_tweets = bronze_df.count()
print(f"Total number of tweets captured in the Bronze Table: {total_tweets}")

In [0]:
# 2. Check for columns containing NaN or Null values
nan_null_stats = bronze_df.select([count(when(col(c).isNull(), c)).alias(c) for c in bronze_df.columns])
print("NaN/Null values in each column:")
nan_null_stats.show()

In [0]:
# 3. Count tweets by each unique user handle and sort by descending count
user_tweet_counts = bronze_df.groupBy("user").count().orderBy(desc("count"))
top_tweeters = user_tweet_counts.limit(20)
top_tweeters_pd = top_tweeters.toPandas()

print("Top 20 Tweeters (Users) by Tweet Count:")
print(top_tweeters_pd)

In [0]:
# 4. How many tweets have at least one mention (@)? How many tweets have no mentions (@)?
# Match "@" followed by word characters, the same pattern the silver transform removes,
# so a bare "@" (e.g. "meet me @ 5pm") is not counted as a mention.
tweets_with_mention = bronze_df.filter(col("text").rlike(r"@\w+")).count()
tweets_without_mention = total_tweets - tweets_with_mention

print(f"Number of tweets with at least one mention: {tweets_with_mention}")
print(f"Number of tweets with no mentions: {tweets_without_mention}")
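How a tweet gets counted depends on the predicate. The SQL test `text LIKE '%@%'` counts any tweet that contains an `@` character, including a bare `@` or an e-mail address. The silver transform strips tokens that match `@\w+`. The stdlib sketch below contrasts the two predicates on made-up sample strings. The stricter `(?<!\w)@\w+` variant also skips e-mail addresses. It is an optional refinement and is not used elsewhere in this notebook. Java regular expressions, which Spark's `rlike` uses, support the same lookbehind syntax.

```python
import re

# Pattern the silver transform strips: "@" followed by one or more word characters.
MENTION_RE = re.compile(r"@\w+")
# Optional stricter variant (an assumption, not used in this notebook): the "@"
# must not follow a word character, which skips e-mail addresses.
STRICT_MENTION_RE = re.compile(r"(?<!\w)@\w+")


def has_at_sign(text: str) -> bool:
    """Same test as the SQL predicate text LIKE '%@%'."""
    return "@" in text


def has_mention(text: str, pattern: re.Pattern = MENTION_RE) -> bool:
    """Same test as col("text").rlike(pattern): at least one @handle-like token."""
    return pattern.search(text) is not None


# Made-up sample tweets for illustration only.
sample_tweets = [
    "@alice great game last night",
    "meet me @ 5pm",
    "no mentions here",
    "email bob@example.com for tickets",
]
```

On these samples, three tweets contain an `@`, two match `@\w+` (the e-mail address still matches), and only one matches the stricter pattern.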

In [0]:
# 5. Plot a bar chart showing the top 20 tweeters (users)
plt.figure(figsize=(12, 8))
plt.bar(top_tweeters_pd['user'], top_tweeters_pd['count'], color='blue')
plt.xlabel('User Handles')
plt.ylabel('Number of Tweets')
plt.title('Top 20 Tweeters')
plt.xticks(rotation=45)
plt.tight_layout()  # Adjust layout to not cut off labels
plt.show()

## 5.0 Transform the Bronze Data to Silver Data using a stream
- set up a read stream on your bronze delta table
- set up a write stream to append to the silver delta table
- Use the defined SILVER_CHECKPOINT and SILVER_DELTA paths in the includes
- name your bronze to silver stream as silver_stream
- transform the bronze data to the silver data using the data definition at the top of the notebook

In [0]:
# Initialize a Spark session
spark = SparkSession.builder \
    .appName("Silver Data Preprocessing") \
    .getOrCreate()

# Set legacy time parser policy
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")
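The LEGACY parser policy matters when the `date` strings follow Java `SimpleDateFormat` conventions. Suppose they look like `Mon Apr 06 22:19:45 PDT 2009`; this is an assumption about the data that you should check against a few bronze rows. Then the Spark conversion would be `to_timestamp(col("date"), "EEE MMM dd HH:mm:ss zzz yyyy")`. The stdlib sketch below parses the same shape in plain Python, which is handy for checking the pattern against sample values. The function name is hypothetical. Unlike Spark, it drops the time-zone token rather than applying it.

```python
from datetime import datetime


def parse_tweet_date(date_str: str) -> datetime:
    """Hypothetical check for dates shaped like 'Mon Apr 06 22:19:45 PDT 2009'.

    Assumed token order: weekday, month, day, time, zone, year. The zone token
    is dropped because strptime's %Z only understands a few zone names.
    """
    parts = date_str.split()
    if len(parts) != 6:
        raise ValueError(f"unexpected date format: {date_str!r}")
    without_zone = " ".join(parts[:4] + parts[5:])
    return datetime.strptime(without_zone, "%a %b %d %H:%M:%S %Y")
```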

In [0]:
# Load the Bronze Delta table as a streaming DataFrame
bronze_stream_df = spark.readStream.format("delta").load(BRONZE_DELTA)

# Function to clean mentions from Bronze data
def clean_bronze(df: DataFrame) -> DataFrame:
    """
    Cleans the text column in the Bronze DataFrame by removing mentions (@username).
    
    Args:
        df (DataFrame): The input DataFrame representing the Bronze layer.

    Returns:
        DataFrame: A DataFrame with a cleaned `cleaned_text` column.
    """
    mention_pattern = r'@\w+'
    return df.withColumn("cleaned_text", regexp_replace(col("text"), mention_pattern, ""))

# Apply cleaning function
bronze_clean_df = clean_bronze(bronze_stream_df)

In [0]:
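# Transform the Bronze data to Silver format
silver_df = bronze_clean_df.select(
    # Convert the date string to a timestamp (relies on the LEGACY parser policy set above).
    # The pattern assumes dates shaped like 'Mon Apr 06 22:19:45 PDT 2009'; adjust it if your data differs.
    to_timestamp(col('date'), 'EEE MMM dd HH:mm:ss zzz yyyy').alias('timestamp'),
    col('text').contains('@').cast('integer').alias('mention'),
    col('cleaned_text'),
    col('sentiment')
)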
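The data dictionary calls for one silver row per @username, while the cell above stores a 0/1 flag in `mention`. The stdlib sketch below shows the per-mention semantics; the function name is hypothetical. In Spark, the same shape could be produced by exploding the array returned by the `regexp_extract_all` SQL function, which is available since Spark 3.1. Switching to that shape would also require updating the `mention == 1` filter and the per-mention grouping in section 8.0.

```python
import re
from typing import Dict, List

MENTION_RE = re.compile(r"@\w+")


def to_silver_rows(text: str, sentiment: str) -> List[Dict[str, str]]:
    """Hypothetical per-mention transform: one output row per @username.

    Tweets without mentions produce no rows, which is what an (inner) explode does.
    """
    mentions = MENTION_RE.findall(text)
    # Remove the mentions, then collapse the extra whitespace they leave behind.
    cleaned = " ".join(MENTION_RE.sub("", text).split())
    return [{"mention": m, "cleaned_text": cleaned, "sentiment": sentiment} for m in mentions]
```

For example, `to_silver_rows("@alice thanks @bob_99 for the tip", "positive")` yields two rows, one for `@alice` and one for `@bob_99`. Both carry the cleaned text `thanks for the tip`.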


In [0]:
# Write Silver data to the Delta table as a streaming DataFrame
silver_stream = silver_df.writeStream \
    .format("delta") \
    .option("checkpointLocation", SILVER_CHECKPOINT) \
    .option("path", SILVER_DELTA) \
    .outputMode("append") \
    .queryName("silver_stream") \
    .start()


In [0]:
# Verify the Silver Delta table
silver_df = spark.read.format("delta").load(SILVER_DELTA)
row_count = silver_df.count()
print(f"Silver Delta Row count: {row_count}")
silver_df.show(5, truncate=False)

## 6.0 Transform the Silver Data to Gold Data using a stream
- set up a read stream on your silver delta table
- set up a write stream to append to the gold delta table
- Use the defined GOLD_CHECKPOINT and GOLD_DELTA paths defined in the includes
- name your silver to gold stream as gold_stream
- transform the silver data to the gold data using the data definition at the top of the notebook
- Load the pretrained transformer sentiment classifier from the MODEL_NAME at the production level from the MLflow registry
- Use a spark UDF to parallelize the inference across your silver data

In [0]:
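# Compact the small files in the bronze table. The table is written by path,
# so reference it through BRONZE_DELTA rather than by a table name.
DeltaTable.forPath(spark, BRONZE_DELTA).optimize().executeCompaction()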

In [0]:
@pandas_udf("score: int, label: string")
def perform_model_inference(s: pd.Series) -> pd.DataFrame:
    predictions = loaded_model.predict(s.tolist())
    return pd.DataFrame({
        "score": predictions["score"].map(lambda x: int(x * 100)).tolist(),
        "label": predictions["label"].tolist(),
    })
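For reference, the per-row mapping performed by the pandas UDF and the following `withColumn` calls can be written as a plain function. It assumes, as the gold cell does, that the model returns a probability-like `score` in [0, 1] and a label where `"POS"` means positive. Any other label, such as a neutral one, maps to 0. The function name is hypothetical.

```python
from typing import Tuple

# Label string treated as positive, matching the gold cell's `== "POS"` check.
# Assumption: the registered model emits labels such as "POS", "NEG" and "NEU".
POSITIVE_LABEL = "POS"


def to_gold_fields(score: float, label: str, given_sentiment: str) -> Tuple[int, str, int, int]:
    """Hypothetical per-row version of the gold mapping. Returns
    (predicted_score, predicted_sentiment, sentiment_id, predicted_sentiment_id)."""
    predicted_score = int(score * 100)  # int() truncates toward zero, as in the pandas UDF
    sentiment_id = 1 if given_sentiment == "positive" else 0
    predicted_sentiment_id = 1 if label == POSITIVE_LABEL else 0
    return predicted_score, label, sentiment_id, predicted_sentiment_id
```

Because `int()` truncates, a score of 0.29 becomes 28: in floating point, `0.29 * 100` is slightly below 29. If rounding matters, `(predictions["score"] * 100).round().astype(int)` inside the UDF avoids this. The built-in `round` is shadowed by `from pyspark.sql.functions import *` in this notebook.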

In [0]:
# Load the Production-stage version of the registered model, as this section requires
model_uri = f"models:/{MODEL_NAME}/Production"
loaded_model = mlflow.pyfunc.load_model(model_uri)

In [0]:
silver_df = spark.readStream.format("delta").load(SILVER_DELTA)
gold_df = (silver_df
    .withColumn("sentiment_analysis", perform_model_inference(col("cleaned_text")))
    .withColumn("predicted_score", col("sentiment_analysis.score"))  
    .withColumn("predicted_sentiment", col("sentiment_analysis.label"))
    .withColumn("sentiment_id", when(col("sentiment") == "positive", 1).otherwise(0))
    .withColumn("predicted_sentiment_id", when(col("predicted_sentiment") == "POS", 1).otherwise(0))
    .select(
        "timestamp",
        "mention",
        "cleaned_text",
        "sentiment",
        "predicted_score",
        "predicted_sentiment",
        "sentiment_id",
        "predicted_sentiment_id"
    ))


In [0]:
gold_stream = (
    gold_df
    .writeStream
    .format("delta")
    .option("checkpointLocation", GOLD_CHECKPOINT)
    .outputMode("append")
    .queryName("gold_stream")
    .start(GOLD_DELTA)
)

In [0]:
# Load the Gold Delta table written by gold_stream (GOLD_DELTA is defined above)
gold_df = spark.read.format("delta").load(GOLD_DELTA)

# Show the first row of the DataFrame
print(gold_df.head())

# Print the schema to understand the data structure
gold_df.printSchema()

# Register the DataFrame as a temp view for SQL queries
gold_df.createOrReplaceTempView("gold_data")


## 7.0 Capture the accuracy metrics from the gold table in MLflow
Store the following in an MLflow experiment run:
- Store the precision, recall, and F1-score as MLflow metrics
- Store an image of the confusion matrix as an MLflow artifact
- Store the model name and the MLflow version that was used as MLflow parameters
- Store the version of the Delta Table (input-silver) as an MLflow parameter

In [0]:
# Load the gold data from the Delta table
gold_data = spark.read.format("delta").load(GOLD_DELTA)

# Cast the necessary columns to DoubleType
gold_data = gold_data.withColumn("sentiment_id", col("sentiment_id").cast(DoubleType()))
gold_data = gold_data.withColumn("predicted_sentiment_id", col("predicted_sentiment_id").cast(DoubleType()))

# Set up the evaluator
evaluator = MulticlassClassificationEvaluator(
    labelCol="sentiment_id",
    predictionCol="predicted_sentiment_id"
)

# Evaluate the metrics
accuracy = evaluator.setMetricName("accuracy").evaluate(gold_data)
precision = evaluator.setMetricName("weightedPrecision").evaluate(gold_data)
recall = evaluator.setMetricName("weightedRecall").evaluate(gold_data)
f1 = evaluator.setMetricName("f1").evaluate(gold_data)

# Output the metrics
print(f"Accuracy: {accuracy}")
print(f"Precision: {precision}")
print(f"Recall: {recall}")
print(f"F1 Score: {f1}")

# Optionally, log metrics to MLflow
with mlflow.start_run():
    mlflow.log_metric("Accuracy", accuracy)
    mlflow.log_metric("Precision", precision)
    mlflow.log_metric("Recall", recall)
    mlflow.log_metric("F1 Score", f1)
    # Ensure MLflow tracking server or client is properly set up if using this in a real project
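As a cross-check on the evaluator's numbers, the sketch below computes accuracy and support-weighted precision, recall and F1 from plain lists. Each class's score is weighted by that class's share of the true labels. This matches the definitions behind the `weightedPrecision`, `weightedRecall` and `f1` metrics of Spark's `MulticlassClassificationEvaluator`. The sketch follows those definitions; it does not reimplement Spark's internals. The function name is hypothetical.

```python
from collections import Counter
from typing import Dict, List


def weighted_metrics(labels: List[int], preds: List[int]) -> Dict[str, float]:
    """Hypothetical cross-check: accuracy plus support-weighted precision,
    recall and F1. Built-in sum/max are avoided because
    `from pyspark.sql.functions import *` shadows them in this notebook."""
    n = len(labels)
    support = Counter(labels)    # number of true labels per class
    predicted = Counter(preds)   # number of predictions per class
    correct = Counter(y for y, p in zip(labels, preds) if y == p)
    w_prec = w_rec = w_f1 = 0.0
    n_correct = 0
    for cls, cls_count in support.items():
        tp = correct[cls]
        n_correct += tp
        precision = tp / predicted[cls] if predicted[cls] else 0.0
        recall = tp / cls_count
        f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
        weight = cls_count / n
        w_prec += weight * precision
        w_rec += weight * recall
        w_f1 += weight * f1
    return {"accuracy": n_correct / n, "precision": w_prec, "recall": w_rec, "f1": w_f1}
```

Weighted recall always equals accuracy, so identical "Accuracy" and "Recall" values above are expected, not a bug. For a small gold table, the inputs could come from `gold_data.select("sentiment_id", "predicted_sentiment_id").collect()`.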


In [0]:

# Create a confusion matrix: rows are actual labels, columns are predicted labels
confusion_matrix = gold_data.crosstab("sentiment_id", "predicted_sentiment_id")

# Convert to a Pandas DataFrame for plotting
confusion_matrix_pandas = confusion_matrix.toPandas()
confusion_matrix_pandas.set_index('sentiment_id_predicted_sentiment_id', inplace=True)

# Plot the confusion matrix (y axis = actual, x axis = predicted)
plt.figure(figsize=(10, 8))
sns.heatmap(confusion_matrix_pandas, annot=True, fmt="d", cmap='Blues')
plt.title('Confusion Matrix')
plt.xlabel('Predicted Labels')
plt.ylabel('Actual Labels')

# Save the plot in the same cell that drew it, before the notebook closes the figure
confusion_matrix_path = '/dbfs/tmp/confusion_matrix.png'
plt.savefig(confusion_matrix_path)
plt.close()
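`DataFrame.crosstab(col1, col2)` puts the distinct values of `col1` on the rows and those of `col2` on the columns, and it names the first column `col1_col2`. The argument order therefore decides which heatmap axis is "actual". The stdlib sketch below builds the same layout from plain lists as a quick sanity check; the function name is hypothetical.

```python
from collections import Counter
from typing import List, Sequence


def confusion_matrix_rows(actual: Sequence[int], predicted: Sequence[int],
                          classes: Sequence[int] = (0, 1)) -> List[List[int]]:
    """Hypothetical check: rows are actual classes, columns are predicted classes,
    matching crosstab("sentiment_id", "predicted_sentiment_id")."""
    counts = Counter(zip(actual, predicted))
    return [[counts[(a, p)] for p in classes] for a in classes]
```

For actual labels `[1, 1, 1, 0]` and predictions `[1, 1, 0, 0]`, the result is `[[1, 0], [1, 2]]`. The single misclassified positive sits in the actual=1, predicted=0 cell.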

In [0]:
# Log metrics and confusion matrix in MLflow
with mlflow.start_run():
    mlflow.log_metrics({'Accuracy': accuracy, 'Precision': precision, 'Recall': recall, 'F1 Score': f1})
    # The second argument of log_artifact is a directory inside the run, so omit it to store the file at the root
    mlflow.log_artifact(confusion_matrix_path)

    # Log model details
    mlflow.log_param("Model Name", MODEL_NAME)
    mlflow.log_param("MLflow Version", mlflow.__version__)

    # Log the version of the input Silver Delta Table
    delta_table = DeltaTable.forPath(spark, SILVER_DELTA)
    delta_version = delta_table.history().select('version').orderBy(desc("timestamp")).limit(1).collect()[0]['version']
    mlflow.log_param("Delta Table Version", str(delta_version))


## 8.0 Application Data Processing and Visualization
- How many mentions are there in the gold data total?
- Count the number of neutral, positive and negative tweets for each mention in new columns
- Capture the total for each mention in a new column
- Sort the mention count totals in descending order
- Plot a bar chart of the top 20 mentions with positive sentiment (the people who are in favor)
- Plot a bar chart of the top 20 mentions with negative sentiment (the people who are the villains)

You may want to use the "Loop Application" widget to control whether you repeatedly display the latest plots while the data comes in from your streams, before moving on to the next section and cleaning up your run.

*note: A mention is a specific twitter user that has been "mentioned" in a tweet with an @user reference.

In [0]:
num = gold_data.filter(gold_data['mention']==1).count()
print(f"Total number of mentions: {num}")

In [0]:
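# Flag each tweet's given sentiment as positive, neutral or negative
df = gold_data.withColumn('positive', when(col('sentiment') == 'positive', 1).otherwise(0)) \
    .withColumn('neutral', when(col('sentiment') == 'neutral', 1).otherwise(0)) \
    .withColumn('negative', when(col('sentiment') == 'negative', 1).otherwise(0))

# Count each sentiment per mention and capture the total in its own column
result_df = df.groupBy("mention") \
    .agg(sum("positive").alias("positive_count"),
         sum("neutral").alias("neutral_count"),
         sum("negative").alias("negative_count"),
         count("mention").alias("total_mentions"))

# Sort the mention totals in descending order
result_df = result_df.orderBy(col("total_mentions").desc())
result_df.show()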
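The application table in the data dictionary has per-mention negative, neutral and positive counts plus a total. The stdlib sketch below performs that aggregation on `(mention, sentiment)` pairs. It assumes the sentiment strings are lowercase `negative`, `neutral` and `positive`, as the cell above does for the labels it checks. The function name is hypothetical. In Spark, an alternative to one conditional `sum` per label is `groupBy("mention").pivot("sentiment", ["negative", "neutral", "positive"]).count()` followed by `na.fill(0)`.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple

# Assumed sentiment strings; rows with any other value are ignored.
LABELS = ("negative", "neutral", "positive")


def mention_sentiment_counts(rows: Iterable[Tuple[str, str]]) -> List[Dict[str, object]]:
    """Hypothetical aggregation: one record per mention with negative/neutral/
    positive counts and a total, sorted by total descending."""
    counts: Dict[str, Dict[str, int]] = defaultdict(lambda: dict.fromkeys(LABELS, 0))
    for mention, sentiment in rows:
        if sentiment in LABELS:
            counts[mention][sentiment] += 1
    result = [
        # Total written without sum(), which the pyspark star import shadows here.
        {"mention": m, **c, "total": c["negative"] + c["neutral"] + c["positive"]}
        for m, c in counts.items()
    ]
    # Highest totals first; ties broken by mention name so the order is deterministic.
    result.sort(key=lambda r: (-r["total"], r["mention"]))
    return result
```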

In [0]:

# Convert to Pandas DataFrame for plotting
result_pd = result_df.toPandas()

# Top 20 positive mentions; pass figsize to pandas so the bars are drawn on a single figure
top_positive = result_pd.nlargest(20, 'positive_count')
top_positive.plot.bar(x='mention', y='positive_count', color='green', figsize=(10, 8))
plt.title('Top 20 Mentions with Positive Sentiment')
plt.xlabel('Mention')
plt.ylabel('Positive Count')
plt.show()

# Top 20 negative mentions
top_negative = result_pd.nlargest(20, 'negative_count')
top_negative.plot.bar(x='mention', y='negative_count', color='red', figsize=(10, 8))
plt.title('Top 20 Mentions with Negative Sentiment')
plt.xlabel('Mention')
plt.ylabel('Negative Count')
plt.show()

## 9.0 Clean up and completion of your pipeline
- Using the utilities, which streams are running, if any?
- Stop all active streams
- print out the elapsed time of your notebook.

In [0]:
# Get all active streams
active_streams = spark.streams.active

# Check if any streams are running
if len(active_streams) > 0:
    print("There are active streams running:")
    for stream in active_streams:
        print(stream.name)
else:
    print("No active streams running")

In [0]:
stop_all_streams()

In [0]:

END_TIME = time.time()
TOTAL_TIME = END_TIME - START_TIME
TOTAL_TIME_MINUTES = TOTAL_TIME / 60  # Convert seconds to minutes

print(f'It took {TOTAL_TIME_MINUTES:.2f} minutes to run the whole notebook.')


## 10.0 How Optimized is your Spark Application (Grad Students Only)
Graduate students (registered for the DSCC-402 section of the course) are required to do this section.  This is a written analysis using the Spark UI (link to screen shots) that support your analysis of your pipelines execution and what is driving its performance.
Recall that Spark optimization has 5 significant dimensions of consideration:
- Spill: write to executor disk due to lack of memory
- Skew: imbalance in partition size
- Shuffle: network io moving data between executors (wide transforms)
- Storage: inefficiency due to disk storage format (small files, location)
- Serialization: distribution of code segments across the cluster

Comment on each of the dimensions of performance and how your implementation is or is not being affected.  Use specific information in the Spark UI to support your description.  
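For the skew discussion, the Spark UI stage page summarizes each task metric as min, 25th percentile, median, 75th percentile and max. One simple way to turn that into a number is the ratio of the maximum to the median. This is an illustrative heuristic, not a Spark-defined metric. A ratio near 1 means tasks are balanced, and a ratio of several times or more points to skew. The function name is hypothetical.

```python
from statistics import median
from typing import Sequence


def skew_ratio(task_values: Sequence[float]) -> float:
    """Hypothetical skew indicator: max / median of a per-task metric such as
    duration, input size or shuffle read size, e.g. read off the Spark UI."""
    if len(task_values) == 0:
        raise ValueError("need at least one task value")
    mid = median(task_values)
    largest = sorted(task_values)[-1]  # avoids max(), shadowed by the pyspark star import
    if mid == 0:
        return 1.0 if largest == 0 else float("inf")
    return largest / mid
```

For example, task durations of 10, 11, 9 and 10 seconds give a ratio of 1.1 (balanced). Durations of 1, 1, 1 and 20 seconds give 20.0, a clear sign that one partition is much larger than the rest.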

Note: you can take screenshots of the Spark UI from your project runs in Databricks and then link to those pictures by storing them as a publicly accessible file on your cloud drive (Google, OneDrive, etc.)

References:
- [Spark UI Reference](https://spark.apache.org/docs/latest/web-ui.html#web-ui)
- [Spark UI Simulator](https://www.databricks.training/spark-ui-simulator/index.html)

During my project
- Spill: Spill happens when there is insufficient memory (RAM), forcing the executor to write data to disk. Instances of spill appear in the stage details as Spill (Memory) and Spill (Disk); these columns do not show up if there is no spill. With spark.sql.shuffle.partitions set to 8 in section 1.0, the stage details in the screenshots below show no spill columns, so my notebook has no spill-related issues.

https://drive.google.com/file/d/1IurENoYcgvKd1WDvPu_SXs7xY79-yEmm/view?usp=drive_link

https://drive.google.com/file/d/1DmW1Zd_jDHMJ89X7l1uTYoYekv6AXE27/view?usp=drive_link

https://drive.google.com/file/d/1KUDuICLDJUAy4butV3wmYUQinGThyu0Z/view?usp=drive_link

- Skew: Skew is an imbalance in partition size. It shows up as large variations in task duration and in the amount of data processed per task: if certain tasks take much longer than others, or process much more or much less data, the stage is skewed. The task metrics in the screenshots below show no such imbalance, so there is no skew in my notebook.

https://drive.google.com/file/d/18HqEhHPZmkdA33kVwbw6wPa5ZTxcVJfg/view?usp=drive_link

https://drive.google.com/file/d/1sn1PwE7P9SMN-b4k5F7jwJWFbuV5q7nC/view?usp=drive_link

- Shuffle: No significant shuffle problems are evident in my notebook. Most tasks have small shuffle write sizes (many between 142.0 B and 2.9 KiB). These small amounts mean the shuffles move very little data between executors, which is typically a good sign for performance.

https://drive.google.com/file/d/1IurENoYcgvKd1WDvPu_SXs7xY79-yEmm/view?usp=drive_link

- Storage: In my notebook, there does not appear to be a significant storage problem with disk space or performance, based on the disk and cache metrics. The 29.4 MiB of disk usage is very small relative to the total disk capacity (463.9 GiB), so disk space consumption is not an issue.

https://drive.google.com/file/d/1s75R1pk6m2Q1EZsdYzt5R8e7M8RIzvlW/view?usp=drive_link

- Serialization: In my project, serialization overhead appears in the gold stream step, where I use a pandas UDF to perform sentiment analysis. Spark's engine cannot optimize the code inside a UDF, and moving the rows between the JVM and the Python worker that runs the model takes a long time. I first tried a standard UDF, but it did not work. A pandas UDF is still better than a standard UDF, because it processes rows in batches using the vectorization provided by pandas and Apache Arrow.

https://drive.google.com/file/d/1soqSk3vPLxXe3eEUIEzAr84o0SouHGnF/view?usp=drive_link
