## DSCC202-402 Data Science at Scale Final Project
### Tracking Tweet sentiment at scale using a pretrained transformer (classifier)
<p>Consider the following illustration of the end-to-end system that you will be building.  Each student should do their own work.  The project will demonstrate your understanding of Spark Streaming, the medallion data architecture using Delta Lake, Spark inference at scale using an MLflow-packaged model, as well as Exploratory Data Analysis and System Tracking and Monitoring.</p>
<br><br>
<img src="https://data-science-at-scale.s3.amazonaws.com/images/pipeline.drawio.png">

<p>
You will be pulling an updated copy of the course GitHub repository: <a href="https://github.com/lpalum/dscc202-402-spring2024">The Repo</a>.  If you are unclear on how to pull an updated copy using the GitHub command line, the following <a href="https://techwritingmatters.com/how-to-update-your-forked-repository-on-github">document</a> is helpful.  Be sure to add the professors and TAs as collaborators on your project.

- lpalum@gmail.com GitHub ID: lpalum
- ajay.anand@rochester.edu GitHub ID: ajayan12
- divyamunot1999@gmail.com GitHub ID: divyamunot
- ylong6@u.Rochester.edu GitHub ID: NinaLong2077

Once you have updated your fork of the repository, you should see the following template project in the final_project directory.
</p>

<img src="https://data-science-at-scale.s3.amazonaws.com/images/notebooks.drawio.png">

<p>
You can then pull your project into the Databricks Workspace using the <a href="https://www.databricks.training/step-by-step/importing-courseware-from-github/index.html">Repos</a> feature.
Each student is expected to submit the URL of their project on GitHub with their code checked in on the main/master branch.  This illustration highlights the branching scheme that you may use to work on your code in steps and then merge your submission into your master branch before submitting.
</p>
<img src="https://data-science-at-scale.s3.amazonaws.com/images/github.drawio.png">
<p>
Work your way through this notebook, which gives you the steps required to submit a complete and compliant project.  The following illustration and associated data dictionary specify the transformations and data that you are to generate for each step in the medallion pipeline.
</p>
<br><br>
<img src="https://data-science-at-scale.s3.amazonaws.com/images/dataframes.drawio.png">

#### Bronze Data - raw ingest
- date - string in the source json
- user - string in the source json
- text - tweet string in the source json
- sentiment - the given sentiment of the text as determined by an unknown model that is provided in the source json
- source_file - the path of the source json file that this row of data was read from
- processing_time - a timestamp of when you read this row from the source json
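
As a rough illustration of the bronze record shape, the following plain-Python sketch enriches one raw JSON line with the two derived columns. The sample record, the file path, and the helper name `to_bronze_record` are made up for illustration; in the notebook this step is performed by the streaming read, not by a Python function.

```python
import json
from datetime import datetime, timezone

def to_bronze_record(json_line: str, source_file: str) -> dict:
    """Parse one raw JSON tweet and add the bronze bookkeeping columns."""
    raw = json.loads(json_line)
    return {
        "date": raw.get("date"),            # kept as a string in bronze
        "user": raw.get("user"),
        "text": raw.get("text"),
        "sentiment": raw.get("sentiment"),
        "source_file": source_file,         # file this row was read from
        "processing_time": datetime.now(timezone.utc),  # when the row was read
    }

# Hypothetical sample input, not taken from the course data
sample_line = (
    '{"date": "Mon Apr 06 22:19:45 PDT 2009", "user": "some_user", '
    '"text": "thanks @bob", "sentiment": "positive"}'
)
record = to_bronze_record(sample_line, "s3a://example-bucket/tweets/0.json")
print(sorted(record))
```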

#### Silver Data - Bronze Preprocessing
- timestamp - convert date string in the bronze data to a timestamp
- mention - every @username mentioned in the text string in the bronze data gets a row in this silver data table.
- cleaned_text - the bronze text data with the mentions (@username) removed.
- sentiment - the given sentiment that was associated with the text in the bronze table.
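
The silver transformation can be pictured on a single tweet with the plain-Python sketch below: each `@username` produces one output row, and every row carries the same cleaned text. The helper name `silver_rows` is hypothetical, and collapsing the leftover whitespace is an extra assumption that the data dictionary does not require.

```python
import re

MENTION_RE = re.compile(r"@(\w+)")

def silver_rows(text):
    """Return one (mention, cleaned_text) pair per @username in the tweet."""
    if text is None:
        return []
    mentions = MENTION_RE.findall(text)            # usernames without the '@'
    without_mentions = MENTION_RE.sub("", text)    # strip every @username
    # Assumption: tidy the whitespace left behind by the removed mentions
    cleaned = re.sub(r"\s+", " ", without_mentions).strip()
    return [(mention, cleaned) for mention in mentions]

print(silver_rows("@alice thanks, cc @bob_42!"))
```

A tweet without any mention yields no silver rows under this scheme, which matches the "one row per mention" definition above.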

#### Gold Data - Silver Table Inference
- timestamp - the timestamp from the silver data table rows
- mention - the mention from the silver data table rows
- cleaned_text - the cleaned_text from the silver data table rows
- sentiment - the given sentiment from the silver data table rows
- predicted_score - score out of 100 from the Hugging Face Sentiment Transformer
- predicted_sentiment - string representation of the sentiment
- sentiment_id - 0 for negative and 1 for positive associated with the given sentiment
- predicted_sentiment_id - 0 for negative and 1 for positive associated with the Hugging Face Sentiment Transformer
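
A minimal sketch of how the derived gold columns relate to the model output is shown below. It assumes the classifier returns a label such as `NEG` or `POS` together with a probability between 0 and 1; both the function name `gold_fields` and that output shape are assumptions, so check what your registered model actually returns.

```python
def gold_fields(given_sentiment: str, predicted_label: str, probability: float) -> dict:
    """Derive the gold columns from the given sentiment and one model prediction."""
    return {
        # Assumption: the model emits a 0-1 probability, rescaled to a score out of 100
        "predicted_score": round(probability * 100, 2),
        "predicted_sentiment": predicted_label,
        "sentiment_id": 0 if given_sentiment == "negative" else 1,
        # Anything that is not NEG is treated as positive in this sketch
        "predicted_sentiment_id": 0 if predicted_label == "NEG" else 1,
    }

example = gold_fields("negative", "NEG", 0.875)
print(example)
```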

#### Application Data - Gold Table Aggregation
- min_timestamp - the oldest timestamp on a given mention (@username)
- max_timestamp - the newest timestamp on a given mention (@username)
- mention - the user (@username) that this row pertains to.
- negative - total negative tweets directed at this mention (@username)
- neutral - total neutral tweets directed at this mention (@username)
- positive - total positive tweets directed at this mention (@username)

When you are designing your approach, one of the main decisions that you will need to make is how you are going to orchestrate the streaming data processing in your pipeline.  There are several valid approaches.  First, you may choose to start the bronze_stream and let it complete (read and append all of the source data) before proceeding and starting up the silver_stream.  This approach has latency associated with it, but it will allow your code to proceed in a linear fashion and process all the data by the end of your notebook execution.  Another potential approach is to start all the streams and have a "watch" method to determine when the pipeline has processed sufficient or all of the source data before stopping and displaying results.  Both of these approaches are valid and have different implications for how you will trigger your streams and how you will gate the execution of your pipeline.  Think through how you want to proceed and ask questions if you need guidance. The following references may be helpful:
- [Spark Structured Streaming Programming Guide](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html)
- [Databricks Autoloader - Cloudfiles](https://docs.databricks.com/en/ingestion/auto-loader/index.html)
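
One possible shape for the "watch" method mentioned above is a simple polling helper, sketched here in plain Python. The name `wait_until` and its parameters are hypothetical; the condition you pass in is up to you (for example, a callable that compares the row count of your gold table with an expected total).

```python
import time
from typing import Callable

def wait_until(condition: Callable[[], bool], timeout_s: float = 600.0, poll_s: float = 5.0) -> bool:
    """Poll `condition` until it returns True or `timeout_s` seconds elapse."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        if condition():
            return True
        time.sleep(poll_s)
    # One last check so a condition met right at the deadline still counts
    return condition()

# Stand-in condition for demonstration: becomes true on the third check
checks = {"count": 0}

def pipeline_caught_up() -> bool:
    checks["count"] += 1
    return checks["count"] >= 3

finished = wait_until(pipeline_caught_up, timeout_s=5.0, poll_s=0.01)
print("Pipeline caught up:", finished)
```

The timeout keeps *Run all* from hanging forever if a stream stalls; the caller decides what to do when the helper returns `False`.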

### Be sure that your project runs end to end when *Run all* is executed on this notebook! (15 Points out of 60)

In [0]:
%run ./includes/includes

In [0]:
"""
Adding a widget to the notebook to control the clearing of a previous run.
or stopping the active streams using routines defined in the utilities notebook
"""
dbutils.widgets.removeAll()

dbutils.widgets.dropdown("clear_previous_run", "No", ["No","Yes"])
if (getArgument("clear_previous_run") == "Yes"):
    clear_previous_run()
    print("Cleared all previous data.")

dbutils.widgets.dropdown("stop_streams", "No", ["No","Yes"])
if (getArgument("stop_streams") == "Yes"):
    stop_all_streams()
    print("Stopped all active streams.")

from delta.tables import DeltaTable

dbutils.widgets.dropdown("optimize_tables", "No", ["No","Yes"])
if (getArgument("optimize_tables") == "Yes"):
    # Compact the many small files produced by the streaming appends.
    for delta_path in (BRONZE_DELTA, SILVER_DELTA, GOLD_DELTA):
        DeltaTable.forPath(spark, delta_path).optimize().executeCompaction()
    print("Optimized all of the Delta Tables")

## 1.0 Import your libraries here...
- Are your shuffle partitions consistent with your cluster and your workload?
- Do you have the necessary libraries to perform the required operations in the pipeline/application?

In [0]:
import matplotlib.pyplot as plt
import mlflow.pyfunc
import pandas as pd
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import (
    col, collect_list, count, current_timestamp, from_unixtime, lit,
    pandas_udf, regexp_extract, regexp_replace, udf, unix_timestamp, when,
)
from pyspark.sql.types import (
    DoubleType, FloatType, IntegerType, StringType, StructField, StructType,
    TimestampType,
)
from sklearn.metrics import (
    ConfusionMatrixDisplay, confusion_matrix, f1_score, precision_score,
    recall_score,
)
from transformers import pipeline

# Use the legacy time parser so the tweet date pattern ("EEE MMM dd HH:mm:ss z yyyy") can be parsed
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

## 2.0 Use the utility functions to ...
- Read the source file directory listing
- Count the source files (how many are there?)
- print the contents of one of the files

In [0]:
# Directory containing the source files
source_directory = "s3a://voc-75-databricks-data/voc_volume/"

# Function to count the source files and print their contents
def read_source_files(directory):
    # List all files in the directory
    files = dbutils.fs.ls(directory)
    
    # Count the number of source files
    num_files = len(files)
    print("Number of source files:", num_files)
    
    # Print the contents of one of the files (assuming at least one file exists)
    if num_files > 0:
        # Selecting the first file for demonstration
        file_to_read = files[0].path
        df = spark.read.json(file_to_read)
        df.show()
# Call the function with the source directory
read_source_files(source_directory)


## 3.0 Transform the Raw Data to Bronze Data using a stream
- define the schema for the raw data
- setup a read stream using cloudfiles and the source data format
- setup a write stream using cloudfiles to append to the bronze delta table
- enforce schema
- allow a new schema to be merged into the bronze delta table
- Use the defined BRONZE_CHECKPOINT and BRONZE_DELTA paths defined in the includes
- name your raw to bronze stream as bronze_stream
- transform the raw data to the bronze data using the data definition at the top of the notebook

**1. define the schema for the raw data**

In [0]:
# Define the schema of the raw JSON records.
# source_file and processing_time are not part of the source JSON;
# they are added as derived columns when the stream is read.
raw_data_schema = StructType([
    StructField("date", StringType(), nullable=True),
    StructField("sentiment", StringType(), nullable=True),
    StructField("text", StringType(), nullable=True),
    StructField("user", StringType(), nullable=True),
])

**2. setup a read stream using cloudfiles and the source data format**

In [0]:
# setup a read stream using cloudfiles and the source data format
from pyspark.sql.functions import input_file_name

raw_stream = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .schema(raw_data_schema)
    .load(TWEET_SOURCE_PATH)
    # Record the path of the file each row came from, not the source directory
    .withColumn("source_file", input_file_name())
    .withColumn("processing_time", current_timestamp())
)

**3. setup a write stream using cloudfiles to append to the bronze delta table**

In [0]:
# setup a write stream to append to the bronze delta table
bronze_query = (raw_stream
      .writeStream
      .format("delta")
      .queryName("bronze_stream")  # stream name required by the assignment
      .option("checkpointLocation", BRONZE_CHECKPOINT)
      .option("mergeSchema", "true")
      .option("path", BRONZE_DELTA)
      .outputMode("append")
      .toTable("bronze_table"))



## 4.0 Bronze Data Exploratory Data Analysis
- How many tweets are captured in your Bronze Table?
- Are there any columns that contain NaN or Null values?  If so, how many, and what will you do in your silver transforms to address this?
- Count the number of tweets by each unique user handle and sort the data by descending count.
- How many tweets have at least one mention (@), and how many tweets have no mentions (@)?
- Plot a bar chart that shows the top 20 tweeters (users)


In [0]:

%sql
-- BRONZE_DELTA is a path variable; query the registered table by name
SELECT COUNT(*) AS tweet_count FROM bronze_table

In [0]:
%python
def count_nulls(df: DataFrame) -> DataFrame:
    exprs = [count(when(col(c).isNull(), c)).alias(c + '_nulls') for c in df.columns]
    null_counts_df = df.select(exprs)
    return null_counts_df

# Load the bronze data from the table written by the bronze stream
df_bronze = spark.read.table("bronze_table")

# Call the function to get null counts
null_counts = count_nulls(df_bronze)

# Show the result
null_counts.show()


In [0]:
from pyspark.sql.functions import col

tweet_counts = df_bronze.groupBy('user').count().withColumnRenamed('count', 'tweet_count')
tweet_counts.sort(col('tweet_count').desc()).show()

In [0]:
# Filter the DataFrame to count tweets with mentions
tweets_with_mentions = df_bronze.filter(col("text").like("%@%"))
tweet_count_with_mentions = tweets_with_mentions.count()

# Count the total number of tweets
total_tweet_count = df_bronze.count()

# Calculate the number of tweets without mentions
tweet_count_without_mentions = total_tweet_count - tweet_count_with_mentions

# Display the results
print("Tweets with mentions:", tweet_count_with_mentions)
print("Tweets without mentions:", tweet_count_without_mentions)

In [0]:
import matplotlib.pyplot as plt

# Get the count of tweets per user
user_counts = df_bronze.groupBy("user").count()

# Order by count in descending order and limit to top 20 users
top_20_users = user_counts.orderBy("count", ascending=False).limit(20).toPandas()

# Plotting
plt.figure(figsize=(10, 8))
plt.bar(top_20_users['user'], top_20_users['count'], color='skyblue')
plt.title('Top 20 Tweeters')
plt.xlabel('User Handle')
plt.ylabel('Number of Tweets')
plt.xticks(rotation=45, ha='right')
plt.tight_layout()
plt.show()



## 5.0 Transform the Bronze Data to Silver Data using a stream
- setup a read stream on your bronze delta table
- setup a write stream to append to the silver delta table
- Use the defined SILVER_CHECKPOINT and SILVER_DELTA paths in the includes
- name your bronze to silver stream as silver_stream
- transform the bronze data to the silver data using the data definition at the top of the notebook

In [0]:
bronze_stream = spark.readStream \
    .format("delta") \
    .table("bronze_table")

In [0]:
%sql
DESCRIBE HISTORY bronze_table;

In [0]:
from pyspark.sql.functions import udf, col, to_timestamp, regexp_replace, explode
from pyspark.sql.types import ArrayType, StringType
import re

# Define the UDF to extract mentions
def extract_mentions(text):
    # Null tweets have no mentions; re.findall would raise a TypeError on None
    if text is None:
        return []
    return re.findall(r"@(\w+)", text)  # Extract usernames without '@'

# Register the UDF
extract_mentions_udf = udf(extract_mentions, ArrayType(StringType()))

# Use the UDF in the DataFrame transformation
silver_stream = bronze_stream.select(
    # to_timestamp yields a TimestampType column (from_unixtime would yield a string)
    to_timestamp(col("date"), "EEE MMM dd HH:mm:ss z yyyy").alias("timestamp"),
    regexp_replace(col("text"), r"@\w+", "").alias("cleaned_text"),
    col("sentiment"),
    extract_mentions_udf(col("text")).alias("mentions")
# One output row per mention; the intermediate array column is then dropped
).withColumn("mention", explode(col("mentions"))).drop("mentions")





In [0]:
silver_stream_writer = silver_stream \
    .writeStream \
    .format("delta") \
    .queryName("silver_stream") \
    .outputMode("append") \
    .option("checkpointLocation", SILVER_CHECKPOINT) \
    .option("path", SILVER_DELTA) \
    .toTable("silver_str")


## 6.0 Transform the Silver Data to Gold Data using a stream
- setup a read stream on your silver delta table
- setup a write stream to append to the gold delta table
- Use the defined GOLD_CHECKPOINT and GOLD_DELTA paths defined in the includes
- name your silver to gold stream as gold_stream
- transform the silver data to the gold data using the data definition at the top of the notebook
- Load the pretrained transformer sentiment classifier from the MODEL_NAME at the production level from the MLflow registry
- Use a spark UDF to parallelize the inference across your silver data

In [0]:
# Setup a read stream on the silver Delta table
silver_stream = spark.readStream \
    .format("delta") \
    .table("silver_str")


In [0]:
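import mlflow.pyfunc

# Load the Production-stage model registered under MODEL_NAME (defined in the includes) as a Spark UDF
sentiment_analysis_udf = mlflow.pyfunc.spark_udf(spark, model_uri=f"models:/{MODEL_NAME}/Production")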

In [0]:
# Assuming sentiment_analysis_udf generates predictions.label and predictions.score columns
gold_stream = (
    silver_stream
    .withColumn("predictions", sentiment_analysis_udf(col("cleaned_text")))
    .select(
        col("timestamp"),
        col("mention"),
        col("cleaned_text"),
        col("sentiment"),
        col("predictions.label").alias("predicted_sentiment"),
        # Named predicted_score to match the data dictionary; rescale here if the
        # model returns a 0-1 probability rather than a score out of 100
        col("predictions.score").alias("predicted_score"),
        # Given sentiment: 0 for negative, 1 otherwise
        when(col("sentiment") == "negative", 0).otherwise(1).alias("sentiment_id"),
        # Predicted sentiment: 0 for NEG, 1 otherwise (NEU is counted with POS)
        when(col("predictions.label") == "NEG", 0).otherwise(1).alias("predicted_sentiment_id")
    )
)


In [0]:
# Setup a write stream to append to the Gold Delta table
# (ignoreDeletes is a Delta read option, so it is not set on the writer)
gold_query = gold_stream.writeStream \
    .format("delta") \
    .queryName("gold_stream") \
    .outputMode("append") \
    .option("checkpointLocation", GOLD_CHECKPOINT) \
    .option("path", GOLD_DELTA) \
    .toTable("gold_stream")


## 7.0 Capture the accuracy metrics from the gold table in MLflow
Store the following in an MLflow experiment run:
- Store the precision, recall, and F1-score as MLflow metrics
- Store an image of the confusion matrix as an MLflow artifact
- Store the model name and the MLflow version that was used as MLflow parameters
- Store the version of the Delta Table (input-silver) as an MLflow parameter
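
For reference, the three metrics can be worked out by hand from the counts in a binary confusion matrix. The sketch below does this in plain Python on a small made-up set of labels (1 = positive, 0 = negative); the function name `binary_metrics` is hypothetical, and in the notebook scikit-learn computes the same quantities.

```python
from typing import Sequence, Tuple

def binary_metrics(y_true: Sequence[int], y_pred: Sequence[int]) -> Tuple[float, float, float]:
    """Return (precision, recall, f1) for the positive class (label 1)."""
    pairs = list(zip(y_true, y_pred))
    tp = sum(1 for t, p in pairs if t == 1 and p == 1)  # true positives
    fp = sum(1 for t, p in pairs if t == 0 and p == 1)  # false positives
    fn = sum(1 for t, p in pairs if t == 1 and p == 0)  # false negatives
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    denominator = precision + recall
    f1 = 2 * precision * recall / denominator if denominator else 0.0
    return precision, recall, f1

# Made-up labels: 3 true positives, 1 false positive, 1 false negative, 1 true negative
demo_precision, demo_recall, demo_f1 = binary_metrics([1, 1, 1, 0, 0, 1], [1, 0, 1, 0, 1, 1])
print(demo_precision, demo_recall, demo_f1)
```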

In [0]:
# Read the gold Delta table
df = spark.read.format("delta").table("gold_stream")

# Bring only the two label columns to the driver for scikit-learn
pdf = df.select("sentiment_id", "predicted_sentiment_id").toPandas()


# Calculate metrics
precision = precision_score(pdf['sentiment_id'], pdf['predicted_sentiment_id'])
recall = recall_score(pdf['sentiment_id'], pdf['predicted_sentiment_id'])
f1 = f1_score(pdf['sentiment_id'], pdf['predicted_sentiment_id'])

# Generate confusion matrix
conf_matrix = confusion_matrix(pdf['sentiment_id'], pdf['predicted_sentiment_id'])

In [0]:
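from delta.tables import DeltaTable
from mlflow.tracking import MlflowClient

# Look up the registered model version in Production and the current version of the silver table
model_version = MlflowClient().get_latest_versions(MODEL_NAME, stages=["Production"])[0].version
silver_version = DeltaTable.forName(spark, "silver_str").history(1).select("version").first()[0]

# Start an MLflow run
with mlflow.start_run():
    # Log metrics
    mlflow.log_metric("Precision", precision)
    mlflow.log_metric("Recall", recall)
    mlflow.log_metric("F1 Score", f1)

    # Log parameters
    mlflow.log_param("Model Name", MODEL_NAME)
    mlflow.log_param("Model Version", model_version)
    mlflow.log_param("MLflow Version", mlflow.__version__)
    mlflow.log_param("Delta Table Version", silver_version)

    # Plot and save confusion matrix on a single figure
    fig, ax = plt.subplots(figsize=(10, 7))
    disp = ConfusionMatrixDisplay(conf_matrix, display_labels=['Negative', 'Positive'])
    disp.plot(ax=ax, values_format='d')
    ax.set_title('Confusion Matrix')
    fig.savefig("/tmp/confusion_matrix.png")
    
    # Log confusion matrix as an artifact
    mlflow.log_artifact("/tmp/confusion_matrix.png")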


## 8.0 Application Data Processing and Visualization
- How many mentions are there in the gold data total?
- Count the number of neutral, positive and negative tweets for each mention in new columns
- Capture the total for each mention in a new column
- Sort the mention count totals in descending order
- Plot a bar chart of the top 20 mentions with positive sentiment (the people who are in favor)
- Plot a bar chart of the top 20 mentions with negative sentiment (the people who are the villains)

You may want to use the "Loop Application" widget to control whether you repeatedly display the latest plots while the data comes in from your streams before moving on to the next section and cleaning up your run.

*note: A mention is a specific twitter user that has been "mentioned" in a tweet with an @user reference.

In [0]:
# Load the gold data table using Delta format
gold_data_final = spark.read.format("delta").table("gold_stream")

# Count the total number of unique mentions in the gold data
total_mentions = gold_data_final.select("mention").distinct().count()

print(f"Total number of unique mentions: {total_mentions}")


In [0]:
from pyspark.sql import functions as F

# Aggregate counts of negative, positive, and neutral sentiments for each mention.
# F.sum is used so that Python's built-in sum is not shadowed.
sentiment_counts = gold_data_final.groupBy("mention").agg(
    F.sum(F.when(F.col("predicted_sentiment") == "NEG", 1).otherwise(0)).alias("negative_count"),
    F.sum(F.when(F.col("predicted_sentiment") == "POS", 1).otherwise(0)).alias("positive_count"),
    F.sum(F.when(F.col("predicted_sentiment") == "NEU", 1).otherwise(0)).alias("neutral_count")
)

# Calculate the total count of sentiments for each mention
total_counts = sentiment_counts.withColumn(
    "total_mentions", 
    col("negative_count") + col("positive_count") + col("neutral_count")
)

# Display the DataFrame with the counts
display(total_counts)


In [0]:
# total_counts has the columns "mention", "negative_count", "positive_count", "neutral_count", and "total_mentions"
# Sort mentions by total counts in descending order
mentions_sorted_by_total = total_counts.orderBy(col("total_mentions"), ascending=False)

# Get the top 20 mentions with the highest positive sentiment counts
top_20_positive = mentions_sorted_by_total.orderBy(col("positive_count").desc()).limit(20)

# Get the top 20 mentions with the highest negative sentiment counts
top_20_negative = mentions_sorted_by_total.orderBy(col("negative_count").desc()).limit(20)

# Convert to Pandas DataFrames for visualization
positive_mentions_df = top_20_positive.toPandas()
negative_mentions_df = top_20_negative.toPandas()

# Create a bar chart for the top 20 mentions with positive sentiment
plt.figure(figsize=(10, 5))
plt.bar(positive_mentions_df['mention'], positive_mentions_df['positive_count'], color='green')
plt.title('Top 20 Mentions by Positive Sentiment')
plt.xlabel('Mentions')
plt.ylabel('Count of Positive Sentiments')
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

# Create a bar chart for the top 20 mentions with negative sentiment
plt.figure(figsize=(10, 5))
plt.bar(negative_mentions_df['mention'], negative_mentions_df['negative_count'], color='red')
plt.title('Top 20 Mentions by Negative Sentiment')
plt.xlabel('Mentions')
plt.ylabel('Count of Negative Sentiments')
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()



## 9.0 Clean up and completion of your pipeline
- using the utilities what streams are running? If any.
- Stop all active streams
- print out the elapsed time of your notebook.

In [0]:
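import time

# List any streams that are still running, then stop them
for stream in spark.streams.active:
    print(f"Active stream: {stream.name} ({stream.id})")

stop_all_streams()

# START_TIME was established in the include file when the notebook started
elapsed_time = time.time() - START_TIME
print("Elapsed time:", elapsed_time, "seconds")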

In [0]:
def delete_delta_and_checkpoints(delta_path, checkpoint_path):
    """Remove a Delta table directory and its streaming checkpoint."""
    for label, path in (("delta table", delta_path), ("checkpoint", checkpoint_path)):
        try:
            # dbutils.fs.rm works on DBFS and cloud storage paths, unlike shutil.rmtree
            dbutils.fs.rm(path, recurse=True)
            print(f"Deleted {label} at {path}")
        except Exception as e:
            print(f"Error deleting {label} at {path}: {e}")

# The paths are defined in the includes
delete_delta_and_checkpoints(BRONZE_DELTA, BRONZE_CHECKPOINT)
delete_delta_and_checkpoints(SILVER_DELTA, SILVER_CHECKPOINT)
delete_delta_and_checkpoints(GOLD_DELTA, GOLD_CHECKPOINT)


In [0]:
# Get the notebook's ending time. Note: START_TIME was established in the include file when the notebook started.
END_TIME = time.time()
print(f"Total notebook run time: {END_TIME - START_TIME:.1f} seconds")

## 10.0 How Optimized is your Spark Application (Grad Students Only)
Graduate students (registered for the DSCC-402 section of the course) are required to do this section.  This is a written analysis using the Spark UI (link to screenshots) that supports your analysis of your pipeline's execution and what is driving its performance.
Recall that Spark Optimization has 5 significant dimensions of consideration:
- Spill: write to executor disk due to lack of memory
- Skew: imbalance in partition size
- Shuffle: network io moving data between executors (wide transforms)
- Storage: inefficiency due to disk storage format (small files, location)
- Serialization: distribution of code segments across the cluster
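
To make the skew dimension concrete, one rough indicator is the ratio of the largest partition (or task) size to the median size, using the per-task summary metrics shown on a stage's detail page. The sketch below computes that ratio in plain Python; the function name `skew_ratio`, the sample sizes, and any threshold you apply to the result are assumptions, not an official Spark metric.

```python
from statistics import median

def skew_ratio(partition_sizes):
    """Ratio of the largest partition to the median partition (1.0 means balanced)."""
    if not partition_sizes:
        return 1.0
    mid = median(partition_sizes)
    if mid == 0:
        # Mostly empty partitions with one non-empty one is extreme skew
        return float("inf") if max(partition_sizes) > 0 else 1.0
    return max(partition_sizes) / mid

# Hypothetical row counts per partition
balanced = skew_ratio([100, 100, 100, 100])
skewed = skew_ratio([100, 100, 100, 700])
print(balanced, skewed)
```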

Comment on each of the dimensions of performance and how your implementation is or is not being affected.  Use specific information in the Spark UI to support your description.

Note: you can take screenshots of the Spark UI from your project runs in Databricks and then link to those pictures by storing them as publicly accessible files on your cloud drive (Google Drive, OneDrive, etc.)

References:
- [Spark UI Reference](https://spark.apache.org/docs/latest/web-ui.html#web-ui)
- [Spark UI Simulator](https://www.databricks.training/spark-ui-simulator/index.html)

"Question 6 presented a significant challenge for me, involving the creation of gold-level data from silver-level data within the medallion architecture. Now, I'm aiming to conduct Spark optimization analysis on this process.

For starters, I'll inspect the stage detail pages under the Spark UI's 'Stages' tab to identify instances where data spills to disk due to memory constraints. This will be evident in the 'Spill (Memory)' and 'Spill (Disk)' metrics. I'll also keep an eye on how the Twitter data is cached, particularly the cache hit ratio, which appears to be remarkably low.

Moving on, I'll address skew concerns by examining the 'Stages' tab. Any significant discrepancies in task execution times might signal data skew issues, possibly caused by certain tasks processing disproportionately larger volumes of data. Additionally, I'll check the data distribution across partitions to identify potential skew using the 'Storage' tab.

Shuffle operations are another critical aspect to consider, as they often pose a bottleneck. By scrutinizing the 'Stages' tab for metrics like 'Shuffle Read' and 'Shuffle Write,' I'll pinpoint stages with high shuffle read/write times, indicating their impact on overall job runtime.

Furthermore, I'll assess storage inefficiencies, such as small files or suboptimal data formats, using the 'Storage' tab. This analysis will help me identify performance-hindering issues caused by factors like numerous small files or uncompressed data formats.

Lastly, I'll pay close attention to serialization concerns, which could arise from inefficient data transfer between executors or suboptimal serialization formats. By examining the task-level metrics on each stage's detail page, such as 'Task Deserialization Time' and 'Result Serialization Time', I can pinpoint any bottlenecks resulting from serialization issues."
