In [19]:
# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType, FloatType

import nltk
from nltk.sentiment import SentimentIntensityAnalyzer
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from nltk.stem import WordNetLemmatizer
import re

# Ensure required NLTK resources are downloaded
nltk.download("vader_lexicon")
nltk.download("stopwords")
nltk.download("punkt")
nltk.download("wordnet")

[nltk_data] Downloading package vader_lexicon to
[nltk_data]     /home/ubuntu/nltk_data...
[nltk_data]   Package vader_lexicon is already up-to-date!
[nltk_data] Downloading package stopwords to /home/ubuntu/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package punkt to /home/ubuntu/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package wordnet to /home/ubuntu/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


True

In [14]:
spark = SparkSession.builder \
    .appName("RedditSentimentAnalysis") \
    .master("spark://192.168.2.46:7077") \
    .config("spark.dynamicAllocation.enabled", True)\
    .config("spark.shuffle.service.enabled", False)\
    .config("spark.dynamicAllocation.executorIdleTimeout","30s")\
    .config("spark.executor.cores",2)\
    .config("spark.cores.max", 4) \
    .config("spark.driver.port",9999)\
    .config("spark.blockManager.port",10005)\
    .getOrCreate()

sc = spark.sparkContext
print("Spark Session Created")

Spark Session Created


In [15]:
df = spark.read.json("hdfs://192.168.2.46:9000/data/corpus-webis-tldr-17.json")

In [16]:
# Select only necessary columns
df = df.select("author", "normalizedBody", "subreddit")

In [18]:
# Step 1: Remove bots comments
import pandas as pd
bot_df = pd.read_csv("/home/ubuntu/botlist.csv")
bot_list = bot_df["AAbot"].dropna().unique().tolist()
print("Sample bot names:", bot_list[:10])  # Show the first 10 bot usernames
# Remove bot-generated comments based on the bot list
df_filtered = df.filter(~col("author").isin(bot_list))
df_filtered.show(5)  # Show filtered results

FileNotFoundError: [Errno 2] No such file or directory: '/home/ubuntu/botlist.csv'

In [70]:
#Remove Null or Empty Text
df = df.filter(col("normalizedBody").isNotNull()).filter(col("normalizedBody") != "")
df.show(5)

+----------------+--------------------+-----------+
|          author|      normalizedBody|  subreddit|
+----------------+--------------------+-----------+
|raysofdarkmatter|I think it should...|       math|
|         Stork13|Art is about the ...|      funny|
|   Cloud_dreamer|Ask me what I thi...|Borderlands|
|   NightlyReaper|In Mechwarrior On...|   gamingpc|
|  NuffZetPand0ra|You are talking a...|     Diablo|
+----------------+--------------------+-----------+
only showing top 5 rows



In [71]:
# Step 2: Text Cleaning (Remove URLs, Special Characters, Markdown)
def clean_text(text):
    if text is None:
        return ""
    text = text.lower()  # Convert to lowercase
    text = re.sub(r"http\S+|www\S+", "", text)  # Remove URLs
    text = re.sub(r"\[.*?\]|\(.*?\)", "", text)  # Remove markdown links
    text = re.sub(r"[^A-Za-z0-9.,!? ]+", "", text)  # Remove special characters
    return text.strip()

# Register UDF
clean_text_udf = udf(clean_text, StringType())

# Apply cleaning function
df = df.withColumn("clean_text", clean_text_udf(col("normalizedBody"))).drop("normalizedBody")

df.show(5)

[Stage 3:>                                                          (0 + 1) / 1]

+----------------+-----------+--------------------+
|          author|  subreddit|          clean_text|
+----------------+-----------+--------------------+
|raysofdarkmatter|       math|i think it should...|
|         Stork13|      funny|art is about the ...|
|   Cloud_dreamer|Borderlands|ask me what i thi...|
|   NightlyReaper|   gamingpc|in mechwarrior on...|
|  NuffZetPand0ra|     Diablo|you are talking a...|
+----------------+-----------+--------------------+
only showing top 5 rows



                                                                                

In [72]:
# Step 3: Remove Stopwords
stop_words = set(stopwords.words("english"))

def remove_stopwords(text):
    return " ".join([word for word in text.split() if word not in stop_words])

# Register UDF
remove_stopwords_udf = udf(remove_stopwords, StringType())

# Apply stopword removal
df = df.withColumn("clean_text", remove_stopwords_udf(col("clean_text")))

df.show(5)

[Stage 4:>                                                          (0 + 1) / 1]

+----------------+-----------+--------------------+
|          author|  subreddit|          clean_text|
+----------------+-----------+--------------------+
|raysofdarkmatter|       math|think fixed eithe...|
|         Stork13|      funny|art hardest thing...|
|   Cloud_dreamer|Borderlands|ask think wall st...|
|   NightlyReaper|   gamingpc|mechwarrior onlin...|
|  NuffZetPand0ra|     Diablo|talking charsi im...|
+----------------+-----------+--------------------+
only showing top 5 rows



                                                                                

In [75]:
# Step 4: Lemmatization
lemmatizer = WordNetLemmatizer()

def lemmatize_text(text):
    return " ".join([lemmatizer.lemmatize(word) for word in text.split()])

# Register UDF
lemmatize_udf = udf(lemmatize_text, StringType())

# Apply lemmatization
df = df.withColumn("clean_text", lemmatize_udf(col("clean_text")))


In [59]:
# Step 5: Tokenization
def tokenize_text(text):
    return " ".join(word_tokenize(text))

# Register UDF
tokenize_udf = udf(tokenize_text, StringType())

# Apply tokenization
df = df.withColumn("clean_text", tokenize_udf(col("clean_text")))

In [12]:
# Stop Spark
spark.stop()