# Analysis of Reddit Comments on Climate Change
This notebook analyzes Reddit comments on climate change. Our team's goal is to: ...

SENG 550 Final Project
- Monmoy Maahdie
- Smitkumar Saraiya
- Farhan Ali
- Kai Ferrer

## 1. Import Libraries

In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from pyspark.sql import SparkSession
from pyspark.sql.functions import collect_list
# import pyspark
from collections import Counter
from pyspark.sql.functions import col, length, regexp_replace, udf, split, explode
from pyspark.sql.types import DoubleType, StringType, IntegerType
import spacy

In [None]:

from pyspark.ml.feature import Tokenizer, HashingTF, VectorAssembler, StopWordsRemover
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql import DataFrame



## 2. Create Spark Session

In [3]:
# Initialize spark session
spark = SparkSession.builder \
    .appName("Reddit Climate Change Comments") \
    .config("spark.executor.memory", "4g")  \
    .config("spark.driver.memory", "4g")  \
    .config("spark.executor.cores", "4") \
    .config("spark.num.executors", "4")  \
    .config("spark.sql.shuffle.partitions", "200")  \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/19 15:11:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## 3. Load Data

In [4]:
nlp = spacy.load('en_core_web_sm')
stopwords = nlp.Defaults.stop_words
len(stopwords)

326

In [5]:
nlp.Defaults.stop_words -= {"no", "not"}

In [6]:
# Create dataframe
df = spark.read.csv("the-reddit-climate-change-dataset-comments.csv", header=True, inferSchema=True)
df = df.repartition(100)  #  increase the number of partitions for large datasets - idk what to put 
# df.show(5, truncate=False) # checking the dataset by displaying first 5 rows
df_original = df # save original dataset

                                                                                

In [7]:
new_columns = [col_name.replace('.', '_') for col_name in df.columns]
df = df.toDF(*new_columns)
# df.show(5, truncate=False)

In [8]:
df_clean = df.dropna()
# df_clean.show(n=5, truncate=False)

In [9]:
df_clean = df_clean.drop("permalink")

# Separate records where sentiment and score can be cast to numbers
clean_df = df_clean.filter(
    col("sentiment").cast(DoubleType()).isNotNull() &
    col("score").cast(DoubleType()).isNotNull()
)

# Records where either sentiment or score contain non-numerical values
problematic_df = df_clean.filter(
    col("sentiment").cast(DoubleType()).isNull() |
    col("score").cast(DoubleType()).isNull()
)




In [10]:
# clean_df.filter((df_clean["type"] == "comment") & (df_clean["subreddit_name"] == "technology")).show(n=100, truncate=False) | YOU CAN MIDIFY THE SUBREDDIT NAME TO SEE CLIMATE CHANGE DISCUSSIONS ON DIFFERENT SUBREDDITS
# clean_df.show(n=10, truncate=False) # we want this data

In [11]:
# print(clean_df.count())

In [12]:
# problematic_df.filter(df_clean["type"] == "comment").show(n=5, truncate=False) # this we do not need

In [13]:
clean_df_1 = clean_df.filter(col("type") == "comment") # only comments exist in here
# clean_df_1.show(n=10, truncate=False)

In [14]:
# Load spacy model
nlp = spacy.load('en_core_web_sm')

# Define the preprocessing function
def preprocess(comment):
    doc = nlp(comment)
    processed_words_list = []
    for token in doc:
        if not token.is_punct and not token.like_url and not token.is_stop:
            processed_words_list.append(token.lemma_.strip().lower())
    return ' '.join(processed_words_list)

    
# Create a UDF from the function
preprocess_udf = udf(preprocess, StringType())


In [15]:

# Apply the UDF to create a new column
clean_df_2 = clean_df_1.withColumn('processed_body', preprocess_udf(col('body')))
clean_df_2.show(n=10, truncate=False)

[Stage 4:>                                                          (0 + 1) / 1]

+-------+-------+------------+--------------------+--------------+-----------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------+-----+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

                                                                                

In [16]:
def create_label(sentiment):
    if float(sentiment) < -0.05:
        return -1
    elif float(sentiment) >= -0.05 and float(sentiment) <= 0.05:
        return 0
    else:
        return 1

create_label_udf = udf(create_label, IntegerType())



In [17]:
clean_df_3 = clean_df_2.withColumn('label', create_label_udf(col('sentiment')))
# Count occurrences of each subreddit_name and order by count in descending order
subreddit_counts = clean_df_3.groupBy('subreddit_name') \
    .count() \
    .orderBy('count', ascending=False)

# Show the results



In [18]:
# print("Most common subreddits:")
# subreddit_counts.show(10, truncate=False)

In [19]:
# # If you need the total number of unique subreddits
# unique_subreddits = subreddit_counts.count()
# print(f"\nTotal number of unique subreddits: {unique_subreddits}")

In [20]:
# subreddit_counts.filter(subreddit_counts['subreddit_name'] == 'climate').show()
clean_df_3 = clean_df_3.withColumn("sentiment", col("sentiment").cast(DoubleType()))


In [21]:

# print("At tokenizer")
# # Tokenize comment text
# tokenizer = Tokenizer(inputCol="processed_body", outputCol="words")

# # Transform words into numerical features
# print("At hashingTF")
# hashingTF = HashingTF(inputCol="words", outputCol="features", numFeatures=10000)

# # Define the model
# print("At linear regression")
# lr = LinearRegression(featuresCol="features", labelCol="sentiment")

# # Create a pipeline
# print("At pipeline")
# pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

# # Split the data
# print("Data split")
# (train_data, test_data) = clean_df_3.randomSplit([0.01, 0.01])

# # Train the model
# print("model traiing")
# model = pipeline.fit(train_data)

# # Make predictions
# print("Predictions")
# predictions = model.transform(test_data)

# # Evaluate the model
# print("Mordel eval")
# evaluator = RegressionEvaluator(labelCol="sentiment", predictionCol="prediction", metricName="rmse")
# rmse = evaluator.evaluate(predictions)
# print("Root Mean Squared Error (RMSE) on test data =", rmse)


In [22]:
clean_df_3.show(3)
clean_df_3.persist()

[Stage 7:>                                                          (0 + 1) / 1]

+-------+-------+------------+--------------------+--------------+-----------+--------------------+---------+-----+--------------------+-----+
|   type|     id|subreddit_id|      subreddit_name|subreddit_nsfw|created_utc|                body|sentiment|score|      processed_body|label|
+-------+-------+------------+--------------------+--------------+-----------+--------------------+---------+-----+--------------------+-----+
|comment|ilzq6wi|      2cn1kk|confidentlyincorrect|         false| 1661601318|Climate change is...|  -0.9023|    3|climate change ci...|   -1|
|comment|ijh601l|       2qh4r|          conspiracy|         false| 1659984481|As oil production...|   0.9674|    0|oil production de...|    1|
|comment|ij23fxm|       2r73k|     politicaldebate|         false| 1659709788|Mc Donald's is a ...|  -0.3612|    1|mc donald fast fo...|   -1|
+-------+-------+------------+--------------------+--------------+-----------+--------------------+---------+-----+--------------------+-----+

                                                                                

DataFrame[type: string, id: string, subreddit_id: string, subreddit_name: string, subreddit_nsfw: string, created_utc: string, body: string, sentiment: double, score: string, processed_body: string, label: int]

In [23]:
spark.conf.set("spark.sql.files.maxPartitionBytes", "134217728")  # 128MB per partition


In [24]:
# Coalesce the DataFrame to a smaller number of partitions
clean_df_3 = clean_df_3.coalesce(10) 


In [25]:
clean_df_3.persist()

DataFrame[type: string, id: string, subreddit_id: string, subreddit_name: string, subreddit_nsfw: string, created_utc: string, body: string, sentiment: double, score: string, processed_body: string, label: int]

In [None]:
clean_df_3.write.saveAsTable("cleaned")


ERROR:root:KeyboardInterrupt while sending command.              (0 + 10) / 100]
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 



In [None]:
spark.sql("SELECT * FROM cleaned").show(3)


## Spark Warehouse

just using whatever Kai put down for spark warehousing 

In [None]:
spark.sql("CREATE DATABASE reddit_db") 

In [None]:
spark.sql("SHOW DATABASES").show() # check that reddit_db is in here

In [None]:
spark.sql("SHOW TABLES").show() # should be empty tables

In [None]:
spark.sql("DROP TABLE IF EXISTS reddit_db.comments")

In [None]:
spark.sql("""
CREATE TABLE IF NOT EXISTS reddit_db.comments (
    `type` STRING,
    `id` STRING,
    `subreddit_id` STRING,
    `subreddit_name` STRING,
    `subreddit_nsfw` STRING,
    `created_utc` STRING,
    `body` STRING,
    `sentiment` STRING,
    `score` STRING,
    `processed_body` STRING,
    `label` STRING
)
USING PARQUET
""")

In [None]:
spark.sql("USE reddit_db") # ensure youre on the right db
spark.sql("SHOW TABLES").show() # should be updated to have one table now

In [None]:
clean_df_3.show(5, truncate=False)

In [None]:
# align the columns - spark only accepts '_' but the dataset uses '.'
df_aligned = clean_df_3

In [None]:
df_aligned.printSchema() # double check

In [None]:
spark.sql("USE reddit_db") # ensure youre on the right db
spark.sql("SHOW TABLES").show()

In [None]:
df_aligned.show(n=5, truncate=False)

In [None]:
num_rows = df_aligned.count()
print(f"Number of rows: {num_rows}")


In [None]:
# Because we have a large dataset of 1mil+ entries, start by repartitioning the data
#df_aligned_repartitioned = df_aligned.repartition(20)  # The number can vary based on your machine, but we recommend keeping it from 20-30

In [None]:
# partition_sizes = df_aligned.rdd.glom().map(len).collect()
# print(partition_sizes)


In [None]:
df_coalesced = df_aligned.coalesce(10)


In [None]:
# Split DataFrame into smaller batches and insert into the table
df_coalesced = df_aligned.coalesce(10)  # Reduce to fewer partitions
batch_size = df_coalesced.count() // 10  # Divide into 10 batches

for i in range(10):
    df_batch = df_coalesced.limit(batch_size)
    df_batch.write.insertInto("reddit_db.comments", overwrite=False)


In [None]:
df_coalesced.createOrReplaceTempView("temp_df")  # Create a temporary view of the DataFrame

# Use Spark SQL to insert into the table
spark.sql("""
    INSERT OVERWRITE TABLE reddit_db.comments
    SELECT * FROM temp_df
""")



In [None]:
#df_coalesced.write.insertInto("reddit_db.comments", overwrite=True)
 # insert data from csv/df into spark table
#df_coalesced.write.mode("overwrite").parquet("/spark-warehouse/reddit_db.db/comments")


In [None]:
spark.sql("SELECT * FROM reddit_db.comments LIMIT 100").show() #validate the table

In [None]:
df_tokens = df_aligned.withColumn("words", split(col("body"), r"\s+"))
df_tokens = df_tokens.filter(df_tokens["words"].isNotNull())
df_tokens.show(5) #check if words column created

In [None]:
remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")

In [None]:
df_aligned_words = remover.transform(df_tokens)
df_aligned_words.show(100) 

In [None]:
# explode - helps so that each word appears in a separate row so we can count frequency
df_exploded = df_aligned_words.withColumn("word", explode(col("filtered_words")))
df_exploded.show(100)

In [None]:
df_word_count = df_exploded.groupBy("word").count().orderBy("count", ascending=False)
df_word_count.show(10)

In [None]:
spark.stop()