In [None]:
# Social Media Sentiment Analysis - Speed Layer Implementation
## Setup and Dependencies
!pip install pyspark pymongo transformers torch nltk pymongo[srv]

In [None]:
"""
# Social Media Sentiment Analysis - Speed Layer Implementation

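This notebook implements the speed layer of a lambda architecture for social media
sentiment analysis. It loads Mastodon posts and YouTube comments from MongoDB into Spark,
cleans the text, scores sentiment with a pre-trained transformer model, and stores the
results in MongoDB. Spark Structured Streaming readers for the same collections are
defined in `create_streaming_sources`.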
"""

# Import required libraries
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timedelta
import json
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, explode, split, when, lit
from pyspark.sql.types import StringType, ArrayType, StructType, StructField, IntegerType, FloatType, TimestampType
import pymongo
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import torch
from nltk.tokenize import word_tokenize
import nltk

In [None]:
# Download necessary NLTK data
# Each call fetches one named NLTK resource.
nltk.download('punkt')
nltk.download('stopwords')
nltk.download('punkt_tab')

In [None]:
# # Mount Google Drive to save processed data and models
# from google.colab import drive
# drive.mount('/content/drive')

In [None]:
# Set project directory
# NOTE(review): the Google Drive mount cell in this notebook is commented out, so this
# path is presumably created on the runtime's local disk rather than in Drive — confirm.
PROJECT_DIR = '/content/drive/MyDrive/Colab Notebooks/Sentiment Analysis'
# exist_ok=True makes re-running this cell safe when the directory already exists.
os.makedirs(PROJECT_DIR, exist_ok=True)

In [None]:
## Initialize Spark Session with Streaming configurations
# The builder chain is wrapped in parentheses so each setting sits on its own line
# without backslash continuations. getOrCreate() reuses an already-running session.
spark = (
    SparkSession.builder
    .appName("SentimentAnalysisSpeedLayer")
    .config("spark.driver.memory", "12g")
    .config("spark.executor.memory", "4g")
    .config("spark.sql.streaming.schemaInference", "true")
    .getOrCreate()
)

print(f"Spark version: {spark.version}")

In [None]:
## MongoDB Connection
from urllib.parse import quote_plus

from google.colab import userdata

# Credentials are read from Colab's user secrets rather than written in the notebook.
# They are percent-escaped before being embedded in the URI: PyMongo rejects a
# connection string whose username or password contains reserved characters such as
# '@', ':', '/' or '%'.
username = quote_plus(userdata.get('mongodb_username'))
password = quote_plus(userdata.get('mongodb_pw'))
cluster_url = "cluster0.8ad48r1.mongodb.net"
MONGO_CONNECTION_STRING = f"mongodb+srv://{username}:{password}@{cluster_url}/?retryWrites=true&w=majority&appName=Cluster0"

def connect_to_mongodb():
    """Connect to MongoDB and return the database client.

    Returns:
        pymongo.MongoClient | None: A client that has answered a ``ping``, or
        ``None`` when the client could not be created or the server could not be
        reached. Failures are printed, not raised.
    """
    client = None
    try:
        client = pymongo.MongoClient(MONGO_CONNECTION_STRING)
        # MongoClient connects lazily, so constructing it proves nothing about
        # reachability or credentials. A ping forces one real round trip.
        client.admin.command("ping")
        print("Connected to MongoDB successfully!")
        return client
    except Exception as e:
        print(f"Failed to connect to MongoDB: {e}")
        # Release the half-initialised client so its background resources do not linger.
        if client is not None:
            client.close()
        return None

mongo_client = connect_to_mongodb()
# Create the new database for speed layer outputs
# connect_to_mongodb() returns None on failure; compare against None explicitly,
# matching the `db is not None` checks used in the rest of the notebook.
db = mongo_client["social_media_analytics_new"] if mongo_client is not None else None

In [None]:
# Sanity check: show which collections exist in the selected database.
# NOTE: `db` is None when the connection attempt failed, in which case this line
# raises AttributeError.
db.list_collection_names()

In [None]:
def load_data_from_mongodb():
    """Read the Mastodon and YouTube collections and wrap them in Spark DataFrames.

    Returns:
        tuple: ``(mastodon_df, youtube_df)``. An element is ``None`` when the
        database handle is unavailable or its collection yielded no documents.
    """

    def _nullable_schema(fields):
        # Every declared field is nullable.
        return StructType([StructField(name, dtype, True) for name, dtype in fields])

    mastodon_schema = _nullable_schema([
        ("tag", StringType()),
        ("text", StringType()),
        ("created_at", TimestampType()),
        ("post_url", StringType()),
    ])
    youtube_schema = _nullable_schema([
        ("tag", StringType()),
        ("text", StringType()),
        ("published_at", StringType()),
        ("video_id", StringType()),
        ("video_title", StringType()),
    ])

    # Pull every document of each collection onto the driver. `db` is compared with
    # None explicitly rather than tested for truthiness.
    mastodon_docs = list(db.mastodon_sentiment_data.find()) if db is not None else []
    youtube_docs = list(db.youtube_sentiment_data.find()) if db is not None else []

    # An empty result is reported as None instead of an empty DataFrame.
    mastodon_frame = None
    if mastodon_docs:
        mastodon_frame = spark.createDataFrame(mastodon_docs, schema=mastodon_schema)

    youtube_frame = None
    if youtube_docs:
        youtube_frame = spark.createDataFrame(youtube_docs, schema=youtube_schema)

    return mastodon_frame, youtube_frame

In [None]:
# Load the data
mastodon_df, youtube_df = load_data_from_mongodb()

# load_data_from_mongodb() signals "no data" with None, so test for None explicitly
# (as the rest of the notebook does) instead of relying on object truthiness.
if mastodon_df is not None:
    print(f"Loaded {mastodon_df.count()} Mastodon posts")
    mastodon_df.printSchema()
    mastodon_df.show(5, truncate=True)

if youtube_df is not None:
    print(f"Loaded {youtube_df.count()} YouTube comments")
    youtube_df.printSchema()
    youtube_df.show(5, truncate=True)

In [None]:
## Data Preprocessing
def preprocess_mastodon_data(df):
    """Add ``date`` and ``clean_content`` columns to Mastodon posts.

    Args:
        df: Spark DataFrame with ``created_at`` and ``text`` columns, or ``None``.

    Returns:
        The transformed DataFrame without rows whose cleaned text is null or
        empty, or ``None`` when ``df`` is ``None``.
    """
    if df is None:
        return None

    # clean_text is looked up when the lambda runs, not when the UDF is built.
    to_clean_text = udf(lambda raw: clean_text(raw), StringType())
    cleaned = col("clean_content")

    return (
        df
        # Calendar date derived from the post timestamp.
        .withColumn("date", col("created_at").cast("date"))
        # Strip markup, links, mentions and punctuation from the post body.
        .withColumn("clean_content", to_clean_text(col("text")))
        # Keep only rows that still have text after cleaning.
        .filter(cleaned.isNotNull() & (cleaned != ""))
    )

def preprocess_youtube_data(df):
    """Add ``date`` and ``clean_commentText`` columns to YouTube comments.

    Args:
        df: Spark DataFrame with ``published_at`` and ``text`` columns, or ``None``.

    Returns:
        The transformed DataFrame without rows whose cleaned comment is null or
        empty, or ``None`` when ``df`` is ``None``.
    """
    if df is None:
        return None

    # clean_text is looked up when the lambda runs, not when the UDF is built.
    to_clean_text = udf(lambda raw: clean_text(raw), StringType())
    cleaned = col("clean_commentText")

    return (
        df
        # published_at is cast to a calendar date.
        .withColumn("date", col("published_at").cast("date"))
        # Strip markup, links, mentions and punctuation from the comment.
        .withColumn("clean_commentText", to_clean_text(col("text")))
        # Keep only rows that still have text after cleaning.
        .filter(cleaned.isNotNull() & (cleaned != ""))
    )

def clean_text(text):
    """Clean text by removing unwanted characters, HTML tags, links, etc.

    Args:
        text: Raw post or comment body. Anything that is not a non-empty ``str``
            (``None``, ``""``, NaN, numbers) is treated as missing.

    Returns:
        str | None: The text with markup, URLs, mentions and punctuation removed
        and whitespace collapsed, or ``None`` when nothing is left.
    """
    if not isinstance(text, str) or not text:
        return None

    # Imported here so the function stays self-contained when Spark ships it
    # to workers as part of a UDF.
    import html
    import re

    # Line breaks and closing block-level tags become a space, so that
    # "<p>a</p><p>b</p>" does not collapse into "ab".
    text = re.sub(r'<\s*(?:br\s*/?|/\s*(?:p|div|li|h[1-6]))\s*>', ' ', text, flags=re.IGNORECASE)
    # Remaining (inline) tags are dropped without a separator, so a link that is split
    # across several <span> elements is rejoined before URL removal.
    text = re.sub(r'<[^>]*>', '', text)
    # Decode entities after tag removal; otherwise "&amp;" would survive as the word "amp".
    text = html.unescape(text)
    # Remove URLs. The scheme or the "www." prefix is required, so ordinary words
    # that merely start with "http" or "www" are kept.
    text = re.sub(r'(?:https?://|www\.)\S+', '', text, flags=re.IGNORECASE)
    # Remove mentions, including the federated "@user@instance.tld" form.
    text = re.sub(r'@\w+(?:@[\w.-]+)?', '', text)
    # Remove special characters. This also strips the '#' of hashtags while
    # keeping the tag text.
    text = re.sub(r'[^\w\s]', '', text)
    # Remove extra whitespace
    text = re.sub(r'\s+', ' ', text).strip()

    return text or None

In [None]:
# Process the data
# The preprocess functions return None when their input frame is None (no
# connection or an empty collection), so the previews are guarded.
processed_mastodon_df = preprocess_mastodon_data(mastodon_df)
if processed_mastodon_df is not None:
    processed_mastodon_df.show(5)

processed_youtube_df = preprocess_youtube_data(youtube_df)
if processed_youtube_df is not None:
    processed_youtube_df.show(5)

In [None]:
# Preview only the columns that later cells use. Missing sources are None, so
# compare against None explicitly, as the rest of the notebook does.
if processed_mastodon_df is not None:
    print("Processed Mastodon data:")
    processed_mastodon_df.select("tag", "clean_content", "date").show(5, truncate=True)

if processed_youtube_df is not None:
    print("Processed YouTube data:")
    processed_youtube_df.select("tag", "clean_commentText", "date").show(5, truncate=True)

In [None]:
## Sentiment Analysis Model
def load_sentiment_model():
    """Load the pre-trained sentiment checkpoint.

    Returns:
        tuple: ``(tokenizer, model)`` for the
        ``cardiffnlp/twitter-roberta-base-sentiment`` checkpoint.
    """
    checkpoint = "cardiffnlp/twitter-roberta-base-sentiment"
    # Tuple elements are evaluated left to right: tokenizer first, then model.
    return (
        AutoTokenizer.from_pretrained(checkpoint),
        AutoModelForSequenceClassification.from_pretrained(checkpoint),
    )

# Loaded once at notebook level; the scoring function uses them as default arguments.
tokenizer, model = load_sentiment_model()

In [None]:
def analyze_sentiment_transformers(text, tokenizer=tokenizer, model=model, max_length=512):
    """Classify one piece of text as negative, neutral or positive.

    Args:
        text: Text to score. Falsy values (``None``, ``""``) are not scored.
        tokenizer: Tokenizer callable; defaults to the notebook-level tokenizer.
        model: Sequence-classification model; defaults to the notebook-level model.
        max_length: Maximum number of tokens passed to the model.

    Returns:
        str | None: ``"negative"``, ``"neutral"`` or ``"positive"``, or ``None``
        for falsy input.
    """
    if not text:
        return None

    # Character-level pre-trim using a rough four-characters-per-token estimate.
    # The tokenizer's own truncation below enforces the real token limit.
    char_budget = max_length * 4
    text = text[:char_budget]

    encoded = tokenizer(text, return_tensors="pt", truncation=True, max_length=max_length)

    # Inference only, so gradient tracking is switched off.
    with torch.no_grad():
        logits = model(**encoded).logits
        class_scores = torch.nn.functional.softmax(logits, dim=1)
        predicted_id = torch.argmax(class_scores).item()

    # Class index to label mapping (specific to the model)
    label_by_id = {0: "negative", 1: "neutral", 2: "positive"}
    return label_by_id[predicted_id]

In [None]:
# Register UDFs for data processing
# Wraps the transformer scorer as a Spark UDF that returns the label as a string.
# The tokenizer and model are bound as default arguments of the wrapped function.
sentiment_transformers_udf = udf(analyze_sentiment_transformers, StringType())

In [None]:
# Both result names are bound up front. Later cells reference them unconditionally,
# and would otherwise raise NameError when a source frame is missing.
mastodon_with_sentiment = None
youtube_with_sentiment = None

# Apply sentiment analysis to Mastodon data
if processed_mastodon_df is not None:
    mastodon_with_sentiment = processed_mastodon_df.withColumn(
        "sentiment", sentiment_transformers_udf(col("clean_content"))
    )

    print("Mastodon data with sentiment:")
    mastodon_with_sentiment.select("clean_content", "sentiment").show(5, truncate=True)

# Apply sentiment analysis to YouTube data
if processed_youtube_df is not None:
    youtube_with_sentiment = processed_youtube_df.withColumn(
        "sentiment", sentiment_transformers_udf(col("clean_commentText"))
    )

    print("YouTube data with sentiment:")
    youtube_with_sentiment.select("clean_commentText", "sentiment").show(5, truncate=True)

In [None]:
# Show the first rows with every column, including the new `sentiment` column.
# Assumes the Mastodon sentiment frame was produced by the previous cell.
mastodon_with_sentiment.show(5)

In [None]:
## Create streaming sources from MongoDB for Mastodon and YouTube data
def create_streaming_sources():
    """Create streaming sources that continuously pull data from MongoDB collections.

    Returns:
        tuple: ``(mastodon_stream, youtube_stream)`` streaming DataFrames read
        through the ``mongodb`` Spark data source.

    Notes:
        - No schema is passed here, so the readers rely on the session's
          ``spark.sql.streaming.schemaInference`` setting.
        - The ``mongodb`` format presumably needs the MongoDB Spark Connector on
          the Spark classpath; no such package is configured in this notebook —
          confirm before calling.
        - NOTE(review): this reads from database ``social_media_analytics``, while
          the notebook's ``db`` handle points at ``social_media_analytics_new`` —
          confirm which one is intended.
    """

    def _stream_collection(collection_name):
        # A fresh options dict per reader, so nothing is shared or mutated in
        # between. The "mongodb" data source takes the connection string under
        # "connection.uri"; the bare "uri" key belongs to the legacy connector.
        mongo_options = {
            "connection.uri": MONGO_CONNECTION_STRING,
            "database": "social_media_analytics",
            "collection": collection_name,
        }
        return (
            spark.readStream
            .format("mongodb")
            .options(**mongo_options)
            .load()
        )

    # Stream Mastodon data
    mastodon_stream = _stream_collection("mastodon_sentiment_data")

    # Stream YouTube data
    youtube_stream = _stream_collection("youtube_sentiment_data")

    return mastodon_stream, youtube_stream

In [None]:
## Aggregate and Analyze Results
from pyspark.sql.functions import col, explode, to_date
import pandas as pd

def analyze_sentiment_by_platform(mastodon_df=None, youtube_df=None):
    """Count rows per sentiment label for each platform.

    Args:
        mastodon_df: Frame with a ``sentiment`` column, or ``None``.
        youtube_df: Frame with a ``sentiment`` column, or ``None``.

    Returns:
        list[dict]: One record per (platform, sentiment) pair with the keys
        ``platform``, ``sentiment`` and ``count``; Mastodon records come first.
        Empty when neither frame yields data.
    """
    # (display label, stored platform value, frame), processed in this order.
    sources = (
        ("Mastodon", "mastodon", mastodon_df),
        ("YouTube", "youtube", youtube_df),
    )

    records = []
    for label, platform, frame in sources:
        if frame is None:
            print(f"{label} DataFrame is None")
            continue

        print(f"Processing {label} sentiment distribution...")
        counts = frame.groupBy("sentiment").count().toPandas()
        if counts.empty:
            print(f"No {label} sentiment data found")
            continue

        # Tag each row with its platform and fix the key order of the records.
        counts['platform'] = platform
        records += counts[['platform', 'sentiment', 'count']].to_dict('records')

    return records


def analyze_sentiment_over_time(mastodon_df=None, youtube_df=None):
    """Count rows per platform, date and sentiment label.

    Args:
        mastodon_df: Frame with ``created_at`` and ``sentiment`` columns, or ``None``.
        youtube_df: Frame with ``published_at`` and ``sentiment`` columns, or ``None``.

    Returns:
        pandas.DataFrame: Columns ``platform``, ``date``, ``sentiment`` and
        ``count``, Mastodon rows first. An empty DataFrame when neither source
        yields data.
    """
    # (display label, stored platform value, timestamp column, frame)
    sources = (
        ("Mastodon", "mastodon", "created_at", mastodon_df),
        ("YouTube", "youtube", "published_at", youtube_df),
    )

    trend_frames = []
    for label, platform, timestamp_column, frame in sources:
        if frame is None:
            print(f"{label} DataFrame is None")
            continue

        print(f"Processing {label} sentiment over time...")
        # "date" is recomputed from the platform's timestamp column.
        dated = (
            frame
            .withColumn("date", to_date(col(timestamp_column)))
            .withColumn("platform", lit(platform))
        )
        trends = dated.groupBy("platform", "date", "sentiment").count().toPandas()

        if trends.empty:
            print(f"No {label} time trend data found")
        else:
            trend_frames.append(trends)

    # Combine results into a single DataFrame
    return pd.concat(trend_frames, ignore_index=True) if trend_frames else pd.DataFrame()

def analyze_sentiment_by_tag(mastodon_df=None):
    """Count sentiment labels for the 20 most frequent Mastodon tags.

    Args:
        mastodon_df: Frame with ``tag`` and ``sentiment`` columns, or ``None``.

    Returns:
        pandas.DataFrame | None: Columns ``platform``, ``tag``, ``sentiment`` and
        ``count``, restricted to the top tags. ``None`` when the input is
        ``None``, no tags exist, or the filtered result is empty.
    """
    if mastodon_df is None:
        print("Mastodon DataFrame is None")
        return None

    print("Processing Mastodon sentiment by tag...")
    tagged = mastodon_df.withColumn("platform", lit("mastodon"))
    counts_per_tag = tagged.groupBy("platform", "tag", "sentiment").count()

    # The 20 tags with the most rows, collected to the driver.
    most_common = (
        tagged.groupBy("tag")
        .count()
        .orderBy(col("count").desc())
        .limit(20)
        .toPandas()
    )
    if most_common.empty:
        print("No tags found in Mastodon data")
        return None

    wanted_tags = most_common["tag"].tolist()
    top_tag_counts = counts_per_tag.filter(col("tag").isin(wanted_tags)).toPandas()
    if top_tag_counts.empty:
        print("No sentiment data for top tags")
        return None

    return top_tag_counts

In [None]:
# Run the analyses
# platform_sentiment: list of {platform, sentiment, count} records.
platform_sentiment = analyze_sentiment_by_platform(mastodon_with_sentiment, youtube_with_sentiment)
# time_sentiment: pandas DataFrame of counts per platform, date and sentiment.
time_sentiment = analyze_sentiment_over_time(mastodon_with_sentiment, youtube_with_sentiment)
# tag_sentiment: pandas DataFrame for the top Mastodon tags, or None.
tag_sentiment = analyze_sentiment_by_tag(mastodon_with_sentiment)

In [None]:
from pyspark.sql.functions import col, to_timestamp, date_format, lit

# Bring both platforms to one column layout:
#   Platform, tag, time, date, clean_content, sentiment
# "time" is the timestamp formatted with the pattern "hh:mm:ss a".
# NOTE: this rebinds youtube_df / mastodon_df, which earlier held the raw frames
# loaded from MongoDB.
youtube_df = (
    youtube_with_sentiment
    .withColumn("time", date_format(to_timestamp("published_at"), "hh:mm:ss a"))
    .withColumn("Platform", lit("youtube"))
    # YouTube's cleaned-text column is renamed to match Mastodon's.
    .withColumnRenamed("clean_commentText", "clean_content")
    .select("Platform", "tag", "time", "date", "clean_content", "sentiment")
)

mastodon_df = (
    mastodon_with_sentiment
    .withColumn("time", date_format(to_timestamp("created_at"), "hh:mm:ss a"))
    .withColumn("Platform", lit("mastodon"))
    .select("Platform", "tag", "time", "date", "clean_content", "sentiment")
)

# Stack the two frames, matching columns by name, Mastodon rows first.
combined_spark_df = mastodon_df.unionByName(youtube_df)

# Collect to the driver as a pandas DataFrame for display or export.
combined_df = combined_spark_df.toPandas()


# Preview the final DataFrame
print(combined_df.head())

In [None]:
## Save Processed Data for Serving Layer
# Handles to the three output collections.
# NOTE: `db` is None when the MongoDB connection failed; subscripting it then raises TypeError.
batch_platform_sentiment = db['batch_platform_sentiment']
batch_tag_sentiment = db['batch_tag_sentiment']
combined_sentiment = db['combined_sentiment']

In [None]:
from datetime import datetime

def save_to_mongodb():
    """Save processed data back to MongoDB for the serving layer.

    Reads the notebook-level variables ``platform_sentiment`` (list of records),
    ``tag_sentiment`` (pandas DataFrame or ``None``) and ``combined_df`` (pandas
    DataFrame), and writes them to the collections ``batch_platform_sentiment``,
    ``batch_tag_sentiment`` and ``combined_sentiment``. Each collection is cleared
    before the new data is inserted.

    Returns:
        None. Any exception raised while saving is printed, not re-raised.
    """
    if db is None:
        print("MongoDB connection not available. Cannot save results.")
        return

    try:
        # Save platform sentiment distribution
        if platform_sentiment:
            print("Saving platform sentiment:", platform_sentiment)
            db.batch_platform_sentiment.delete_many({})
            db.batch_platform_sentiment.insert_one({
                "timestamp": datetime.now(),
                "data": platform_sentiment
            })
            print("✅ Saved platform sentiment distribution to MongoDB.")
        else:
            print("No platform sentiment data to save.")

        # Save tag sentiment analysis
        if tag_sentiment is not None and not tag_sentiment.empty:
            print("Saving tag sentiment:", tag_sentiment)
            tag_data = tag_sentiment.to_dict('records')
            db.batch_tag_sentiment.delete_many({})
            db.batch_tag_sentiment.insert_one({
                "timestamp": datetime.now(),
                "data": tag_data
            })
            print("✅ Saved tag sentiment analysis to MongoDB.")
        else:
            print("No tag sentiment data to save.")

        # Save combined sentiment data
        if combined_df is not None and not combined_df.empty:
            print("Saving combined sentiment data...")

            # Convert date to string to avoid BSON encoding error. The conversion is
            # done on a derived frame, so the notebook-level combined_df is not
            # mutated. The records are built before the collection is cleared, so
            # a conversion failure cannot leave it empty.
            combined_records = combined_df.assign(
                date=combined_df['date'].astype(str)
            ).to_dict("records")

            db.combined_sentiment.delete_many({})  # Clears old data
            db.combined_sentiment.insert_many(combined_records)

            print("✅ Combined sentiment data inserted into MongoDB.")
        else:
            print("No combined sentiment data to save.")

    except Exception as e:
        # Best-effort save: report the failure and let the notebook continue.
        print(f"❌ Error saving to MongoDB: {str(e)}")


In [None]:
# Save results
# Writes the platform, tag and combined results; failures are printed by the function itself.
save_to_mongodb()

In [None]:
## Cleanup and Close Connections

# Close MongoDB connection
# mongo_client is None when the connection attempt failed; compare against None
# explicitly, matching the checks used elsewhere in the notebook.
if mongo_client is not None:
    mongo_client.close()
    print("Closed MongoDB connection.")

# Stop Spark session
spark.stop()
print("Stopped Spark session.")

print("\nSpeed Layer processing completed successfully!")