<a href="https://colab.research.google.com/github/jayashalakshani/Mastodon-Youtube-Sentiment-Trend-Analyzer/blob/main/Speed_Layer_(Real_Time_Processing_with_Spark_).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Social Media Sentiment Analysis - Speed Layer Implementation
## Setup and Dependencies
!pip install pyspark pymongo transformers torch nltk pymongo[srv]

Collecting pymongo
  Downloading pymongo-4.12.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (22 kB)
Collecting dnspython<3.0.0,>=1.16.0 (from pymongo)
  Downloading dnspython-2.7.0-py3-none-any.whl.metadata (5.8 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch)
  Downloading nvidia_cudnn_cu12-9.1.0.70-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cublas-cu12==12.4.5.8 (from torch)
  Downloading nvidia_cublas_cu12-12.4.5.8-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)

In [None]:
"""
# Social Media Sentiment Analysis - Speed Layer Implementation

This script implements the speed layer for a lambda architecture that processes
social media data in real-time using Spark Structured Streaming. It analyzes sentiment
from Mastodon posts and YouTube comments, and stores the results in MongoDB.
"""

# Import required libraries
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timedelta
import json
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, explode, split, when, lit
from pyspark.sql.types import StringType, ArrayType, StructType, StructField, IntegerType, FloatType, TimestampType
import pymongo
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import torch
from nltk.tokenize import word_tokenize
import nltk

In [None]:
# Download necessary NLTK data (tokenizer models and the stopword corpus).
# NOTE(review): word_tokenize is imported above, but no visible cell calls it and the
# stopword list is never loaded, so these downloads appear unused. Confirm before removing.
nltk.download('punkt')
nltk.download('stopwords')
nltk.download('punkt_tab')

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.
[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.
[nltk_data] Downloading package punkt_tab to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt_tab.zip.


True

In [None]:
# Colab only: attach Google Drive under /content/drive so files written to
# PROJECT_DIR survive the end of the runtime.
from google.colab import drive

drive.mount(mountpoint='/content/drive')

Mounted at /content/drive


In [None]:
# Project directory on the mounted Drive; created if it does not exist yet.
# NOTE(review): no later cell in this notebook reads PROJECT_DIR.
PROJECT_DIR = os.path.join('/content/drive/MyDrive', 'Colab Notebooks', 'Sentiment Analysis')
os.makedirs(name=PROJECT_DIR, exist_ok=True)

In [None]:
## Initialize Spark Session with Streaming configurations
# NOTE(review): if this runs in local mode (no master is set), spark.executor.memory
# presumably has no effect. spark.sql.streaming.schemaInference only applies to
# file-based streaming sources. The MongoDB Spark connector package is not configured here.
_SPARK_SETTINGS = {
    "spark.driver.memory": "12g",
    "spark.executor.memory": "4g",
    "spark.sql.streaming.schemaInference": "true",
}

_builder = SparkSession.builder.appName("SentimentAnalysisSpeedLayer")
for _key, _value in _SPARK_SETTINGS.items():
    _builder = _builder.config(_key, _value)
spark = _builder.getOrCreate()

print(f"Spark version: {spark.version}")

Spark version: 3.5.5


In [None]:
## MongoDB Connection
from urllib.parse import quote_plus

from google.colab import userdata

# Credentials come from Colab's secret store so they never appear in the notebook.
username = userdata.get('mongodb_username')
password = userdata.get('mongodb_pw')
cluster_url = "cluster0.8ad48r1.mongodb.net"
# pymongo requires the user name and password to be percent-escaped when they are
# embedded in a URI. Without escaping, characters such as '@', ':' or '/' break parsing.
MONGO_CONNECTION_STRING = (
    f"mongodb+srv://{quote_plus(username)}:{quote_plus(password)}@{cluster_url}/"
    "?retryWrites=true&w=majority&appName=Cluster0"
)

def connect_to_mongodb(connection_string=None):
    """Connect to MongoDB and return the client, or None if the server is unreachable.

    Args:
        connection_string: MongoDB URI to connect to. Defaults to the notebook's
            MONGO_CONNECTION_STRING.

    Returns:
        pymongo.MongoClient on success, otherwise None (the error is printed).
    """
    uri = MONGO_CONNECTION_STRING if connection_string is None else connection_string
    client = None
    try:
        client = pymongo.MongoClient(uri)
        # MongoClient connects lazily, so constructing it proves nothing. Force a
        # server round-trip so auth/network failures surface here instead of in a
        # later cell.
        client.admin.command("ping")
        print("Connected to MongoDB successfully!")
        return client
    except Exception as e:
        print(f"Failed to connect to MongoDB: {e}")
        if client is not None:
            client.close()  # release background monitor threads/sockets
        return None

mongo_client = connect_to_mongodb()

# Database handle used both to read the input collections and to write the
# speed-layer outputs. It stays None when the connection failed, so later cells
# can guard on it.
db = None
if mongo_client:
    db = mongo_client["social_media_analytics_new"]

Connected to MongoDB successfully!


In [None]:
# Sanity check: list the collections available in the target database.
# NOTE: raises AttributeError if the MongoDB connection failed (db is None).
db.list_collection_names()

['batch_tag_sentiment',
 'youtube_tags_data',
 'mastodon_sentiment_data',
 'mastodon_tags_data',
 'youtube_unique_tag',
 'combined_sentiment',
 'youtube_sentiment_data',
 'mastodon_unique_tag',
 'batch_platform_sentiment']

In [None]:
def load_data_from_mongodb():
    """Load the Mastodon and YouTube sentiment collections into PySpark DataFrames.

    Reads every document from ``db.mastodon_sentiment_data`` and
    ``db.youtube_sentiment_data``, fetching only the fields named in the
    corresponding schema.

    Returns:
        tuple: ``(mastodon_df, youtube_df)``. Each element is a Spark DataFrame,
        or None when the database handle is unavailable or the collection is empty.
    """
    # Create schema for Mastodon data
    mastodon_schema = StructType([
        StructField("tag", StringType(), True),
        StructField("text", StringType(), True),
        StructField("created_at", TimestampType(), True),
        StructField("post_url", StringType(), True)
    ])

    # Create schema for YouTube data (published_at is kept as a string)
    youtube_schema = StructType([
        StructField("tag", StringType(), True),
        StructField("text", StringType(), True),
        StructField("published_at", StringType(), True),
        StructField("video_id", StringType(), True),
        StructField("video_title", StringType(), True)
    ])

    if db is None:
        return None, None

    def _fetch(collection, schema):
        """Read one collection into a DataFrame with ``schema``, or None if it is empty."""
        # Project only the schema's fields and drop _id, so unused fields and the
        # ObjectId are never transferred from the server or handed to Spark.
        projection = {field.name: 1 for field in schema.fields}
        projection["_id"] = 0
        docs = list(collection.find({}, projection))
        return spark.createDataFrame(docs, schema=schema) if docs else None

    mastodon_df = _fetch(db.mastodon_sentiment_data, mastodon_schema)
    youtube_df = _fetch(db.youtube_sentiment_data, youtube_schema)
    return mastodon_df, youtube_df

In [None]:
# Load both collections, then show the row count, schema and a sample for each one
# that is present.
mastodon_df, youtube_df = load_data_from_mongodb()

for frame, noun in ((mastodon_df, "Mastodon posts"), (youtube_df, "YouTube comments")):
    if frame is not None:
        print(f"Loaded {frame.count()} {noun}")
        frame.printSchema()
        frame.show(5, truncate=True)

Loaded 3 Mastodon posts
root
 |-- tag: string (nullable = true)
 |-- text: string (nullable = true)
 |-- created_at: timestamp (nullable = true)
 |-- post_url: string (nullable = true)

+--------------------+--------------------+--------------------+--------------------+
|                 tag|                text|          created_at|            post_url|
+--------------------+--------------------+--------------------+--------------------+
|#nottheproblembut...|couple decades ag...| 2025-05-03 05:09:03|https://mastodon....|
|#nottheproblembut...|   short fuel enough|2025-05-03 05:25:...|https://mastodon....|
|#nottheproblembut...|sagethank outlook...| 2025-05-03 06:28:38|https://mastodon....|
+--------------------+--------------------+--------------------+--------------------+

Loaded 260 YouTube comments
root
 |-- tag: string (nullable = true)
 |-- text: string (nullable = true)
 |-- published_at: string (nullable = true)
 |-- video_id: string (nullable = true)
 |-- video_title: strin

In [None]:
## Data Preprocessing
def preprocess_mastodon_data(df):
    """Add ``date`` and ``clean_content`` columns to Mastodon posts.

    Args:
        df: Spark DataFrame with ``created_at`` and ``text`` columns, or None.

    Returns:
        The DataFrame with ``date`` (``created_at`` cast to date) and
        ``clean_content`` (``text`` passed through clean_text). Rows whose cleaned
        text is null or empty are dropped. Returns None when ``df`` is None.
    """
    if df is None:
        return None

    # The lambda looks clean_text up at call time, so a redefinition is picked up.
    cleaner = udf(lambda raw: clean_text(raw), StringType())

    with_date = df.withColumn("date", col("created_at").cast("date"))
    with_clean = with_date.withColumn("clean_content", cleaner(col("text")))

    has_text = col("clean_content").isNotNull() & (col("clean_content") != "")
    return with_clean.filter(has_text)

def preprocess_youtube_data(df):
    """Add ``date`` and ``clean_commentText`` columns to YouTube comments.

    Args:
        df: Spark DataFrame with ``published_at`` and ``text`` columns, or None.

    Returns:
        The DataFrame with ``date`` (the ``published_at`` string cast to date) and
        ``clean_commentText`` (``text`` passed through clean_text). Rows whose
        cleaned text is null or empty are dropped. Returns None when ``df`` is None.
    """
    if df is None:
        return None

    # The lambda looks clean_text up at call time, so a redefinition is picked up.
    cleaner = udf(lambda raw: clean_text(raw), StringType())

    with_date = df.withColumn("date", col("published_at").cast("date"))
    with_clean = with_date.withColumn("clean_commentText", cleaner(col("text")))

    has_text = col("clean_commentText").isNotNull() & (col("clean_commentText") != "")
    return with_clean.filter(has_text)

def clean_text(text):
    """Normalise social-media text for sentiment scoring.

    Strips HTML tags, decodes HTML entities, removes URLs, @mentions, '#' symbols
    (keeping the hashtag word) and all remaining punctuation, and collapses whitespace.

    Args:
        text: Raw text, possibly containing HTML markup, or None.

    Returns:
        The cleaned string, or None when the input is empty/None or nothing is
        left after cleaning.
    """
    if not text:
        return None

    # Imported locally so the function stays self-contained when shipped to Spark workers.
    import html
    import re

    # Replace tags with a space (not '') so words in adjacent elements,
    # e.g. "<p>a</p><p>b</p>", are not glued together.
    text = re.sub(r'<.*?>', ' ', text)
    # Decode entities (&amp;, &#39;, ...) after stripping tags, so escaped angle
    # brackets in the text are not mistaken for markup. Without this step they
    # would survive as junk words such as "amp" or "39".
    text = html.unescape(text)
    # Remove URLs ('http' also covers 'https')
    text = re.sub(r'http\S+|www\S+', '', text)
    # Remove mentions
    text = re.sub(r'@\w+', '', text)
    # Remove hashtag symbols but keep the text
    text = re.sub(r'#', '', text)
    # Remove special characters
    text = re.sub(r'[^\w\s]', '', text)
    # Remove extra whitespace
    text = re.sub(r'\s+', ' ', text).strip()

    return text if text else None

In [None]:
# Process the data; each preprocessing helper returns None when its input is None.
processed_mastodon_df = preprocess_mastodon_data(mastodon_df)
processed_youtube_df = preprocess_youtube_data(youtube_df)

# Preview each processed frame that exists (calling .show() on None would raise).
for _processed in (processed_mastodon_df, processed_youtube_df):
    if _processed is not None:
        _processed.show(5)

+--------------------+--------------------+--------------------+--------------------+----------+--------------------+
|                 tag|                text|          created_at|            post_url|      date|       clean_content|
+--------------------+--------------------+--------------------+--------------------+----------+--------------------+
|#nottheproblembut...|couple decades ag...| 2025-05-03 05:09:03|https://mastodon....|2025-05-03|couple decades ag...|
|#nottheproblembut...|   short fuel enough|2025-05-03 05:25:...|https://mastodon....|2025-05-03|   short fuel enough|
|#nottheproblembut...|sagethank outlook...| 2025-05-03 06:28:38|https://mastodon....|2025-05-03|sagethank outlook...|
+--------------------+--------------------+--------------------+--------------------+----------+--------------------+

+---------+--------------------+--------------------+-----------+--------------------+----------+--------------------+
|      tag|                text|        published_at| 

In [None]:
# Show tag, cleaned text and date for each processed platform that has data.
_previews = (
    ("Processed Mastodon data:", processed_mastodon_df, "clean_content"),
    ("Processed YouTube data:", processed_youtube_df, "clean_commentText"),
)
for heading, frame, text_col in _previews:
    if frame is not None:
        print(heading)
        frame.select("tag", text_col, "date").show(5, truncate=True)

Processed Mastodon data:
+--------------------+--------------------+----------+
|                 tag|       clean_content|      date|
+--------------------+--------------------+----------+
|#nottheproblembut...|couple decades ag...|2025-05-03|
|#nottheproblembut...|   short fuel enough|2025-05-03|
|#nottheproblembut...|sagethank outlook...|2025-05-03|
+--------------------+--------------------+----------+

Processed YouTube data:
+---------+--------------------+----------+
|      tag|   clean_commentText|      date|
+---------+--------------------+----------+
|new music|actually wanted s...|2025-05-03|
|new music|need part two sho...|2025-05-03|
|new music|lisa love song wo...|2025-05-03|
|new music|                lisa|2025-05-03|
|new music|                like|2025-05-03|
+---------+--------------------+----------+
only showing top 5 rows



In [None]:
## Sentiment Analysis Model
def load_sentiment_model(model_name="cardiffnlp/twitter-roberta-base-sentiment"):
    """Load a pre-trained sequence-classification model and its tokenizer.

    Args:
        model_name: Hugging Face Hub id (or local path) of the model. Defaults to
            the Twitter RoBERTa sentiment model this notebook was built around.
            analyze_sentiment_transformers maps class ids 0/1/2 to
            negative/neutral/positive, so a different model must use the same
            label order.

    Returns:
        tuple: ``(tokenizer, model)``.
    """
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForSequenceClassification.from_pretrained(model_name)
    # Inference only: make sure dropout and similar layers are disabled.
    model.eval()
    return tokenizer, model

tokenizer, model = load_sentiment_model()

config.json:   0%|          | 0.00/747 [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/899k [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/150 [00:00<?, ?B/s]

Xet Storage is enabled for this repo, but the 'hf_xet' package is not installed. Falling back to regular HTTP download. For better performance, install the package with: `pip install huggingface_hub[hf_xet]` or `pip install hf_xet`


pytorch_model.bin:   0%|          | 0.00/499M [00:00<?, ?B/s]

Xet Storage is enabled for this repo, but the 'hf_xet' package is not installed. Falling back to regular HTTP download. For better performance, install the package with: `pip install huggingface_hub[hf_xet]` or `pip install hf_xet`


model.safetensors:   0%|          | 0.00/499M [00:00<?, ?B/s]

In [None]:
def analyze_sentiment_transformers(text, tokenizer=tokenizer, model=model, max_length=512):
    """Classify ``text`` as "negative", "neutral" or "positive".

    Args:
        text: Input string. Empty or None input returns None.
        tokenizer: Tokenizer called on the text (defaults to the notebook's global).
        model: Sequence-classification model (defaults to the notebook's global).
        max_length: Token limit passed to the tokenizer. The text is also
            pre-trimmed to ``max_length * 4`` characters.

    Returns:
        The label of the highest-scoring class, or None for empty input.

    NOTE(review): the defaults bind the global tokenizer/model when the function
    is defined. When this function is wrapped in a Spark UDF, the model is likely
    serialized with the UDF, and inference runs one row at a time. A pandas UDF
    with batched inference would likely scale better.
    """
    if not text:
        return None

    char_budget = max_length * 4  # rough characters-per-token estimate
    snippet = text[:char_budget] if len(text) > char_budget else text

    encoded = tokenizer(snippet, return_tensors="pt", truncation=True, max_length=max_length)

    with torch.no_grad():
        logits = model(**encoded).logits
        probabilities = torch.nn.functional.softmax(logits, dim=1)
        best_class = torch.argmax(probabilities).item()

    # Class-id order specific to cardiffnlp/twitter-roberta-base-sentiment
    labels = {0: "negative", 1: "neutral", 2: "positive"}
    return labels[best_class]

In [None]:
# Wrap the row-level classifier as a Spark UDF returning the label string.
sentiment_transformers_udf = udf(f=analyze_sentiment_transformers, returnType=StringType())

In [None]:
# Results stay None when there is no input frame, so downstream cells do not hit a
# NameError on an undefined variable.
mastodon_with_sentiment = None
youtube_with_sentiment = None

# The transformer UDF is expensive, and every later aggregation/export is a separate
# Spark action. .cache() stores the scored rows so inference runs once per row
# instead of once per action.

# Apply sentiment analysis to Mastodon data
if processed_mastodon_df is not None:
    mastodon_with_sentiment = processed_mastodon_df.withColumn(
        "sentiment", sentiment_transformers_udf(col("clean_content"))
    ).cache()

    print("Mastodon data with sentiment:")
    mastodon_with_sentiment.select("clean_content", "sentiment").show(5, truncate=True)

# Apply sentiment analysis to YouTube data
if processed_youtube_df is not None:
    youtube_with_sentiment = processed_youtube_df.withColumn(
        "sentiment", sentiment_transformers_udf(col("clean_commentText"))
    ).cache()

    print("YouTube data with sentiment:")
    youtube_with_sentiment.select("clean_commentText", "sentiment").show(5, truncate=True)

Mastodon data with sentiment:
+--------------------+---------+
|       clean_content|sentiment|
+--------------------+---------+
|couple decades ag...|  neutral|
|   short fuel enough|  neutral|
|sagethank outlook...|  neutral|
+--------------------+---------+

YouTube data with sentiment:
+--------------------+---------+
|   clean_commentText|sentiment|
+--------------------+---------+
|actually wanted s...| positive|
|need part two sho...|  neutral|
|lisa love song wo...| positive|
|                lisa|  neutral|
|                like|  neutral|
+--------------------+---------+
only showing top 5 rows



In [None]:
# Full-row preview of the scored Mastodon posts (skipped when there is no Mastodon data).
if mastodon_with_sentiment is not None:
    mastodon_with_sentiment.show(5)

+--------------------+--------------------+--------------------+--------------------+----------+--------------------+---------+
|                 tag|                text|          created_at|            post_url|      date|       clean_content|sentiment|
+--------------------+--------------------+--------------------+--------------------+----------+--------------------+---------+
|#nottheproblembut...|couple decades ag...| 2025-05-03 05:09:03|https://mastodon....|2025-05-03|couple decades ag...|  neutral|
|#nottheproblembut...|   short fuel enough|2025-05-03 05:25:...|https://mastodon....|2025-05-03|   short fuel enough|  neutral|
|#nottheproblembut...|sagethank outlook...| 2025-05-03 06:28:38|https://mastodon....|2025-05-03|sagethank outlook...|  neutral|
+--------------------+--------------------+--------------------+--------------------+----------+--------------------+---------+



In [None]:
## Create streaming sources from MongoDB for Mastodon and YouTube data
def create_streaming_sources(database="social_media_analytics", mastodon_schema=None, youtube_schema=None):
    """Create streaming DataFrames over the Mastodon and YouTube MongoDB collections.

    Uses the MongoDB Spark Connector source registered as ``"mongodb"`` (connector
    10.x). Its read options are ``connection.uri``, ``database`` and ``collection``;
    the older ``uri`` key is not recognised by this source.

    NOTE(review): nothing visible in this notebook calls this function. The Spark
    session is also built without the connector package (e.g. via
    ``spark.jars.packages``); confirm it is on the classpath before use.

    Args:
        database: Source database. Defaults to the original hard-coded name.
            TODO confirm: the batch loader reads the ``social_media_analytics_new``
            database instead.
        mastodon_schema: Optional StructType applied to the Mastodon stream.
        youtube_schema: Optional StructType applied to the YouTube stream. The
            connector may require an explicit schema for streaming reads.

    Returns:
        tuple: ``(mastodon_stream, youtube_stream)``, unstarted streaming DataFrames.
    """

    def _stream(collection, schema):
        """Build a streaming reader for one collection (fresh options for each call)."""
        reader = (
            spark.readStream
            .format("mongodb")
            .option("connection.uri", MONGO_CONNECTION_STRING)
            .option("database", database)
            .option("collection", collection)
        )
        if schema is not None:
            reader = reader.schema(schema)
        return reader.load()

    mastodon_stream = _stream("mastodon_sentiment_data", mastodon_schema)
    youtube_stream = _stream("youtube_sentiment_data", youtube_schema)
    return mastodon_stream, youtube_stream

In [None]:
## Aggregate and Analyze Results
from pyspark.sql.functions import col, explode, to_date
import pandas as pd

def analyze_sentiment_by_platform(mastodon_df=None, youtube_df=None):
    """Count rows per sentiment label for each platform.

    Args:
        mastodon_df: DataFrame with a ``sentiment`` column, or None.
        youtube_df: DataFrame with a ``sentiment`` column, or None.

    Returns:
        list[dict]: ``{"platform", "sentiment", "count"}`` records, with Mastodon
        rows first and then YouTube. Empty when neither input yields data.
    """
    sources = (
        ("Mastodon", "mastodon", mastodon_df),
        ("YouTube", "youtube", youtube_df),
    )
    records = []
    for display_name, platform, frame in sources:
        if frame is None:
            print(f"{display_name} DataFrame is None")
            continue

        print(f"Processing {display_name} sentiment distribution...")
        counts = frame.groupBy("sentiment").count().toPandas()
        if counts.empty:
            print(f"No {display_name} sentiment data found")
            continue

        counts["platform"] = platform
        records += counts[["platform", "sentiment", "count"]].to_dict("records")
    return records


def analyze_sentiment_over_time(mastodon_df=None, youtube_df=None):
    """Count rows per (platform, date, sentiment).

    The date is derived from ``created_at`` for Mastodon and from ``published_at``
    for YouTube.

    Args:
        mastodon_df: Spark DataFrame with ``created_at`` and ``sentiment``, or None.
        youtube_df: Spark DataFrame with ``published_at`` and ``sentiment``, or None.

    Returns:
        pandas.DataFrame with columns ``platform, date, sentiment, count``
        (Mastodon rows first), or an empty DataFrame if there is no data.
    """
    sources = (
        ("Mastodon", "mastodon", mastodon_df, "created_at"),
        ("YouTube", "youtube", youtube_df, "published_at"),
    )
    frames = []
    for display_name, platform, frame, time_column in sources:
        if frame is None:
            print(f"{display_name} DataFrame is None")
            continue

        print(f"Processing {display_name} sentiment over time...")
        trend = (
            frame.withColumn("date", to_date(col(time_column)))
            .withColumn("platform", lit(platform))
            .groupBy("platform", "date", "sentiment")
            .count()
            .toPandas()
        )
        if trend.empty:
            print(f"No {display_name} time trend data found")
        else:
            frames.append(trend)

    return pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()

def analyze_sentiment_by_tag(mastodon_df=None, youtube_df=None, top_n=20):
    """Sentiment counts for each platform's most frequent tags.

    Args:
        mastodon_df: Spark DataFrame with ``tag`` and ``sentiment`` columns, or None.
        youtube_df: Optional Spark DataFrame with the same columns. It defaults to
            None, so existing single-argument calls analyse only Mastodon, as before.
        top_n: Number of most frequent tags kept per platform (previously fixed at 20).

    Returns:
        pandas.DataFrame with columns ``platform, tag, sentiment, count`` for the
        top tags of every platform provided, or None if nothing was found.
    """
    if mastodon_df is None:
        print("Mastodon DataFrame is None")

    sources = [("Mastodon", "mastodon", mastodon_df)]
    if youtube_df is not None:
        sources.append(("YouTube", "youtube", youtube_df))

    results = []
    for display_name, platform, frame in sources:
        if frame is None:
            continue

        print(f"Processing {display_name} sentiment by tag...")
        # Most common tags on this platform
        top_tags = (
            frame.groupBy("tag").count()
            .orderBy(col("count").desc())
            .limit(top_n)
            .toPandas()
        )
        if top_tags.empty:
            print(f"No tags found in {display_name} data")
            continue

        # Filtering before grouping gives the same counts as grouping then filtering,
        # but aggregates fewer rows.
        top_tag_sentiment = (
            frame.filter(col("tag").isin(top_tags["tag"].tolist()))
            .withColumn("platform", lit(platform))
            .groupBy("platform", "tag", "sentiment")
            .count()
            .toPandas()
        )
        if top_tag_sentiment.empty:
            print(f"No sentiment data for top {display_name} tags")
            continue
        results.append(top_tag_sentiment)

    if not results:
        return None
    return pd.concat(results, ignore_index=True)

In [None]:
# Run the analyses
platform_sentiment = analyze_sentiment_by_platform(mastodon_with_sentiment, youtube_with_sentiment)
time_sentiment = analyze_sentiment_over_time(mastodon_with_sentiment, youtube_with_sentiment)
tag_sentiment = analyze_sentiment_by_tag(mastodon_with_sentiment)

Processing Mastodon sentiment distribution...
Processing YouTube sentiment distribution...
Processing Mastodon sentiment over time...
Processing YouTube sentiment over time...
Processing Mastodon sentiment by tag...


In [None]:
from pyspark.sql.functions import col, to_timestamp, date_format, lit
# Align both platforms on one schema and stack them (Mastodon rows first).
# Distinct names keep the raw `mastodon_df` / `youtube_df` loaded earlier intact, so
# re-running the preprocessing cells still works.
# NOTE(review): "Platform" is capitalised here but "platform" elsewhere. It is kept
# as-is because the combined_sentiment consumers may rely on it.
COMBINED_COLUMNS = ["Platform", "tag", "time", "date", "clean_content", "sentiment"]
platform_frames = []

if mastodon_with_sentiment is not None:
    platform_frames.append(
        mastodon_with_sentiment
        .withColumn("time", date_format(to_timestamp("created_at"), "hh:mm:ss a"))  # 12-hour clock
        .withColumn("Platform", lit("mastodon"))
        .select(*COMBINED_COLUMNS)
    )

if youtube_with_sentiment is not None:
    platform_frames.append(
        youtube_with_sentiment
        .withColumn("time", date_format(to_timestamp("published_at"), "hh:mm:ss a"))
        .withColumn("Platform", lit("youtube"))
        .withColumnRenamed("clean_commentText", "clean_content")
        .select(*COMBINED_COLUMNS)
    )

# Combine the available Spark DataFrames
combined_spark_df = None
for platform_frame in platform_frames:
    combined_spark_df = (
        platform_frame if combined_spark_df is None
        else combined_spark_df.unionByName(platform_frame)
    )

# Convert to Pandas for display or export (an empty frame if no platform had data)
combined_df = (
    combined_spark_df.toPandas()
    if combined_spark_df is not None
    else pd.DataFrame(columns=COMBINED_COLUMNS)
)

# Preview the final DataFrame
combined_df.head()

   Platform                                tag         time        date  \
0  mastodon  #nottheproblembutsignoftheproblem  05:09:03 AM  2025-05-03   
1  mastodon  #nottheproblembutsignoftheproblem  05:25:17 AM  2025-05-03   
2  mastodon  #nottheproblembutsignoftheproblem  06:28:38 AM  2025-05-03   
3   youtube                          new music  06:37:29 AM  2025-05-03   
4   youtube                          new music  06:37:15 AM  2025-05-03   

                                       clean_content sentiment  
0  couple decades ago burning across hays plains ...   neutral  
1                                  short fuel enough   neutral  
2                             sagethank outlook life   neutral  
3  actually wanted song longer enjoy watching fol...  positive  
4               need part two short remind mrs smith   neutral  


In [None]:
## Save Processed Data for Serving Layer
# Collection handles for the serving-layer outputs.
# NOTE(review): save_to_mongodb() reaches these collections via `db.<name>`, so
# these globals are not used by the visible code.
batch_platform_sentiment, batch_tag_sentiment, combined_sentiment = (
    db[collection_name]
    for collection_name in ("batch_platform_sentiment", "batch_tag_sentiment", "combined_sentiment")
)

In [None]:
from datetime import datetime

def save_to_mongodb():
    """Save the processed results to MongoDB for the serving layer.

    Reads the notebook globals ``platform_sentiment`` (list of dicts),
    ``tag_sentiment`` (pandas DataFrame or None) and ``combined_df`` (pandas
    DataFrame). Writes them to the ``batch_platform_sentiment``,
    ``batch_tag_sentiment`` and ``combined_sentiment`` collections. Each collection
    is cleared before the new data is inserted, so it holds only the latest run.

    Best-effort: any exception is printed, not raised. The global ``combined_df``
    is not modified.
    """
    if db is None:
        print("MongoDB connection not available. Cannot save results.")
        return

    try:
        # Save platform sentiment distribution.
        # NOTE: delete-then-insert is not atomic; a failed insert leaves the collection empty.
        if platform_sentiment:
            print("Saving platform sentiment:", platform_sentiment)
            db.batch_platform_sentiment.delete_many({})
            db.batch_platform_sentiment.insert_one({
                "timestamp": datetime.now(),
                "data": platform_sentiment
            })
            print("✅ Saved platform sentiment distribution to MongoDB.")
        else:
            print("No platform sentiment data to save.")

        # Save tag sentiment analysis
        if tag_sentiment is not None and not tag_sentiment.empty:
            print("Saving tag sentiment:", tag_sentiment)
            tag_data = tag_sentiment.to_dict('records')
            db.batch_tag_sentiment.delete_many({})
            db.batch_tag_sentiment.insert_one({
                "timestamp": datetime.now(),
                "data": tag_data
            })
            print("✅ Saved tag sentiment analysis to MongoDB.")
        else:
            print("No tag sentiment data to save.")

        # Save combined sentiment data
        if combined_df is not None and not combined_df.empty:
            print("Saving combined sentiment data...")

            # Work on a copy so the global combined_df seen by other cells is unchanged.
            records_df = combined_df.copy()
            # BSON cannot encode datetime.date, so dates are stored as strings.
            # Missing dates stay null instead of becoming the string "None".
            is_missing = records_df['date'].isna()
            date_text = records_df['date'].astype(str)
            records_df['date'] = [
                None if missing else text for missing, text in zip(is_missing, date_text)
            ]

            db.combined_sentiment.delete_many({})  # Clears old data
            db.combined_sentiment.insert_many(records_df.to_dict("records"))

            print("✅ Combined sentiment data inserted into MongoDB.")
        else:
            print("No combined sentiment data to save.")

    except Exception as e:
        print(f"❌ Error saving to MongoDB: {str(e)}")


In [None]:
# Save results: write the platform, tag and combined sentiment outputs to MongoDB.
# save_to_mongodb prints errors instead of raising them.
save_to_mongodb()

Saving platform sentiment: [{'platform': 'mastodon', 'sentiment': 'neutral', 'count': 3}, {'platform': 'youtube', 'sentiment': 'positive', 'count': 79}, {'platform': 'youtube', 'sentiment': 'neutral', 'count': 149}, {'platform': 'youtube', 'sentiment': 'negative', 'count': 32}]
✅ Saved platform sentiment distribution to MongoDB.
Saving tag sentiment:    platform                                tag sentiment  count
0  mastodon  #nottheproblembutsignoftheproblem   neutral      3
✅ Saved tag sentiment analysis to MongoDB.
Saving combined sentiment data...
✅ Combined sentiment data inserted into MongoDB.


In [None]:
## Cleanup and Close Connections

# Release the MongoDB client first; mongo_client is None when the connection failed.
if mongo_client is not None:
    mongo_client.close()
    print("Closed MongoDB connection.")

# Then shut down Spark. No Spark DataFrame in this notebook is usable after this.
spark.stop()
print("Stopped Spark session.")

print("\nSpeed Layer processing completed successfully!")

Closed MongoDB connection.
Stopped Spark session.

Speed Layer processing completed successfully!
