# Understanding Climate Change Discourse on Reddit: A Distributed Analysis of Public Themes, Sentiment, and Recommendations

### Candidate numbers: 39884, 48099, 49308, 50250

## Notebook Overview

This notebook performs topic modeling on the dataset using the Latent Dirichlet Allocation (LDA) algorithm. We chose LDA for its performance and, more importantly, its interpretability.

The goal is to uncover dominant themes, identify how these topics evolve over time, and visualize meaningful patterns in public discourse. The first step is to predict the topic to which each comment belongs. To make the topics easier to interpret, we prompt the llama3-70b-8192 model through the Groq API to generate a short, clean label for each topic. The predicted topic and its label are then saved as columns in the main dataframe.

The main dataframe `df_tf` is then saved as a parquet file in a bucket so that it can be imported for the sentiment analysis and visualisation part of this section.

## Cluster Setup and Initialization Actions

We used Google Cloud Dataproc to create a scalable cluster with the following settings:

#### Create the bucket
```gsutil mb gs://st446-gp-sm```

#### Upload the initialization script
```gsutil cp my_actions.sh gs://st446-gp-sm```

#### Create the Dataproc cluster
```gcloud dataproc clusters create st446-cluster-gp-sm \
  --enable-component-gateway \
  --public-ip-address \
  --region europe-west1 \
  --bucket=st446-gp-sm \
  --master-machine-type n2-standard-2 \
  --master-boot-disk-size 100 \
  --num-workers 2 \
  --worker-machine-type n2-standard-2 \
  --worker-boot-disk-size 200 \
  --image-version 2.2-debian12 \
  --optional-components JUPYTER \
  --metadata 'PIP_PACKAGES=scikit-learn nltk pandas numpy' \
  --initialization-actions 'gs://st446-gp-sm/my_actions.sh' \
  --project=capstone-data-1-wto```

In [1]:
# Install the extra packages needed by this notebook
!pip install gensim
!pip install groq

# Import libraries used in this notebook
# Standard library
import hashlib
import os
import random
import re
import string
import zipfile
from datetime import datetime
from time import time

# Third-party
import groq
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from gensim.corpora import Dictionary
from gensim.models.coherencemodel import CoherenceModel
from nltk.corpus import stopwords
from nltk.stem.wordnet import WordNetLemmatizer
from nltk.tokenize import sent_tokenize, word_tokenize

# PySpark
import pyspark.sql.functions as sql_f
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.clustering import LDA
from pyspark.ml.feature import CountVectorizer, IDF, StopWordsRemover, Tokenizer
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, desc, lower, monotonically_increasing_id, rand,
    regexp_replace, row_number, udf, year,
)
from pyspark.sql.types import (
    ArrayType, DoubleType, IntegerType, StringType, StructField, StructType,
)
from pyspark.sql.window import Window


# Data Loading, Preprocessing, and Cleaning

In [2]:
# Download the Kaggle dataset zip
!curl -L -o climate.zip \
    "https://www.kaggle.com/api/v1/datasets/download/pavellexyr/the-reddit-climate-change-dataset"

# Unzip it (this will extract all files, including the comments CSV)
!unzip -o climate.zip

# Remove any old copy in HDFS and put the comments file there
!hadoop fs -rm -f /the-reddit-climate-change-dataset-comments.csv
!hadoop fs -put the-reddit-climate-change-dataset-comments.csv /

# Verify upload
!hadoop fs -ls /

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
100 1536M  100 1536M    0     0  40.1M      0  0:00:38  0:00:38 --:--:-- 41.2M
Archive:  climate.zip
  inflating: the-reddit-climate-change-dataset-comments.csv  
  inflating: the-reddit-climate-change-dataset-posts.csv  
Found 4 items
-rw-r--r--   2 root hadoop 4111000325 2025-05-05 08:49 /the-reddit-climate-change-dataset-comments.csv
drwxrwxrwt   - hdfs hadoop          0 2025-05-05 08:43 /tmp
drwxrwxrwt   - hdfs hadoop          0 2025-05-05 08:46 /user
drwxrwxrwt   - hdfs hadoop          0 2025-05-05 08:41 /var


In [3]:
# Point to the new HDFS path (edit according to your cluster name)
comments_path = "hdfs://st446-cluster-gp-sm-m:8020/the-reddit-climate-change-dataset-comments.csv"

In [4]:
# Define the schema for the comments file
schema = StructType([
    StructField("type",           StringType(), True),
    StructField("id",             StringType(), True),
    StructField("subreddit.id",   StringType(), True),
    StructField("subreddit.name", StringType(), True),
    StructField("subreddit.nsfw", StringType(), True),
    StructField("created_utc",    StringType(), True),
    StructField("permalink",      StringType(), True),
    StructField("body",           StringType(), True),
    StructField("sentiment",      DoubleType(), True),
    StructField("score",          IntegerType(),True)
])

df = spark.read \
    .option("header", "true") \
    .option("multiLine", "true") \
    .option("escape", "\"") \
    .schema(schema) \
    .csv(comments_path)

df = df.repartition(8)
df.printSchema()
df.show(5)

root
 |-- type: string (nullable = true)
 |-- id: string (nullable = true)
 |-- subreddit.id: string (nullable = true)
 |-- subreddit.name: string (nullable = true)
 |-- subreddit.nsfw: string (nullable = true)
 |-- created_utc: string (nullable = true)
 |-- permalink: string (nullable = true)
 |-- body: string (nullable = true)
 |-- sentiment: double (nullable = true)
 |-- score: integer (nullable = true)



                                                                                

+-------+-------+------------+-----------------+--------------+-----------+--------------------+--------------------+---------+-----+
|   type|     id|subreddit.id|   subreddit.name|subreddit.nsfw|created_utc|           permalink|                body|sentiment|score|
+-------+-------+------------+-----------------+--------------+-----------+--------------------+--------------------+---------+-----+
|comment|hv6kbym|       2qh4r|       conspiracy|         false| 1643748232|https://old.reddi...|What they’ll prob...|    -0.34|    2|
|comment|dz5bszu|       2qh3l|             news|         false| 1526585977|https://old.reddi...|I think climate c...|   0.8113|    1|
|comment|fm20ywf|       2qhsa|interestingasfuck|         false| 1585688617|https://old.reddi...|You're right that...|   0.9311|    2|
|comment|h10axzg|       vkedk|         themotte|         false| 1623138758|https://old.reddi...|Epistemic status:...|  -0.9927|   53|
|comment|f29z91o|       2qh1s|        economics|         false
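
Because `created_utc` is read as a string of Unix epoch seconds (for example `1643748232` in the first row of the preview), any analysis of topics over time first needs a conversion to a calendar date. The following is a minimal pure-Python sketch of that conversion. The helper name `epoch_to_year` is hypothetical and not part of the notebook, and the sketch assumes the values are whole seconds in UTC.

```python
from datetime import datetime, timezone

def epoch_to_year(created_utc: str) -> int:
    """Convert a Unix-epoch string (seconds, UTC) to a calendar year."""
    ts = datetime.fromtimestamp(int(created_utc), tz=timezone.utc)
    return ts.year

# created_utc values taken from the first two rows of the preview
print(epoch_to_year("1643748232"))
print(epoch_to_year("1526585977"))
```

In Spark, the same idea would typically be expressed by casting the column to a numeric type and converting it to a timestamp before applying the imported `year` function.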

In [5]:
custom_stopwords = set([
    "lt", "gt", "ref", "quot", "cite", "br", "amp", "https", "http", "urlhttps", "urlhttp", 
    "file", "image", "jpg", "png", "gif", "svg", "thumb", "px", "category", "url", "external", 
    "link", "source", "web", "cite", "reference", "reflist", "main", "article", "seealso", 
    "further", "infobox", "template", "navbox", "redirect", "harvnb", "isbn", "doi", "pmid", 
    "ssrn", "jstor", "bibcode", "arxiv", "ol", "hdl", "wikidata", "wiki", "math", "sup", "sub", 
    "nbsp", "equation", "displaystyle", "begin", "end", "left", "right", "sqrt", "frac", "sum", 
    "prod", "int", "lim", "rightarrow", "infty", "alpha", "beta", "gamma", "delta", "epsilon", 
    "zeta", "eta", "theta", "iota", "kappa", "lambda", "mu", "nu", "xi", "omicron", "pi", "rho", 
    "sigma", "tau", "upsilon", "phi", "chi", "psi", "omega", "mathrm", "mathbb", "mathcal", 
    "mathbf", "cdots", "ldots", "vdots", "ddots", "forall", "exists", "in", "ni", "subset", 
    "subseteq", "supset", "supseteq", "emptyset", "cap", "cup", "setminus", "not", "times", 
    "div", "cdot", "pm", "mp", "oplus", "otimes", "odot", "leq", "geq", "neq", "approx", 
    "aligncenter", "fontsize", "alignright", "alignleft", "textalign", "bold", "italic", 
    "underline", "strikethrough", "lineheight", "padding", "margin", "width", "height", "float", 
    "clear", "border", "background", "color", "font", "family", "size", "weight", "style", 
    "decoration", "verticalalign", "textindent", "pre-wrap", "nowrap", "valign", "bgcolor", 
    "style", "class", "id", "width", "height", "align", "border", "cellpadding", "cellspacing", 
    "colspan", "rowspan", "nowrap", "target", "rel", "hreflang", "title", "alt", "src", "dir", 
    "lang", "type", "name", "value", "readonly", "multiple", "onclick", "onmousedown", 
    "onmouseup", "onmouseover", "onmouseout", "onload", "onunload", "onsubmit", "onreset", 
    "onfocus", "onblur", "onkeydown", "onkeyup", "onkeypress", "onerror", "infobox", "caption", 
    "cite", "dmy", "mdy", "date", "archive", "www", "com", "org", "access", "ndash", "sfn", "dts", "vauthors", 
    "mvar", "ipaslink", "ipa", "iii", "ibn", "first", "last", "also", "html", "use", "publisher", "year", "one", 
    "page", "new", "trek", "ipablink", "similar", "usual", "two", "abbr", "used", "est", "ibm", "first1",
    "first2", "last1", "last2", "pdf", "der", "ted", "get", "even", "isn", "going", "like", "people", "even",
    "still", "doesn", "much", "make", "many", "made", "don", "did"
])

In [6]:
# Draw a 20% sample of the raw comments (due to memory constraints)
sample_df = df.sample(withReplacement=False, fraction=0.20, seed=42)
print(f"Sample size: {sample_df.count()} comments")

# Clean text on the sample, remove symbols, links and make everything lower case
df_clean = (
    sample_df
      .withColumn("body_clean", lower(col("body")))
      .withColumn("body_clean", regexp_replace("body_clean", r"http\S+", ""))  
      .withColumn("body_clean", regexp_replace("body_clean", r"[^a-z\s]", " "))  
      .withColumn("body_clean", regexp_replace("body_clean", r"\s+", " "))     
)

# Tokenize
tokenizer = Tokenizer(inputCol="body_clean", outputCol="tokens")
df_tokens = tokenizer.transform(df_clean)

# Remove stop words (default English list plus the custom list)
default_stops = StopWordsRemover.loadDefaultStopWords("english")
combined_stops = list(set(default_stops) | custom_stopwords)

remover = StopWordsRemover(
    inputCol="tokens",
    outputCol="filtered",
    stopWords=combined_stops
)
df_no_stop = remover.transform(df_tokens)

# Filter out very short tokens (words of 1 or 2 characters like 'a' and 'is')
min_len = udf(lambda toks: [t for t in toks if len(t) > 2], ArrayType(StringType()))
df_final = df_no_stop.withColumn("final_tokens", min_len(col("filtered")))

df_final.select("body_clean", "final_tokens").show(5)

                                                                                

Sample size: 920435 comments



+--------------------+--------------------+
|          body_clean|        final_tokens|
+--------------------+--------------------+
|warren is fine an...|[warren, fine, ec...|
| gt just not enth...|[enthusiastic, cl...|
|first of all we n...|[need, rid, prete...|
| gt each one of y...|[points, clearly,...|
| gt debating cont...|[debating, conten...|
+--------------------+--------------------+
only showing top 5 rows
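
To sanity-check the cleaning rules on a single comment without starting a Spark job, the same regular expressions can be applied with Python's `re` module. This is a minimal sketch under assumptions: the function name `clean_comment`, the tiny `DEMO_STOPS` set, and the example comment are hypothetical, and `str.split()` only approximates Spark's `Tokenizer`.

```python
import re

# Hypothetical, tiny stop-word set used only for this illustration
DEMO_STOPS = {"gt", "isn", "the", "see"}

def clean_comment(body: str, stop_words: set) -> list:
    """Apply the notebook's cleaning steps to one comment string."""
    text = body.lower()                      # lower-case everything
    text = re.sub(r"http\S+", "", text)      # drop links
    text = re.sub(r"[^a-z\s]", " ", text)    # keep letters and whitespace only
    text = re.sub(r"\s+", " ", text)         # collapse repeated whitespace
    tokens = text.split()                    # whitespace tokenization
    tokens = [t for t in tokens if t not in stop_words]
    return [t for t in tokens if len(t) > 2]  # drop 1- and 2-character tokens

example = "&gt; Climate change isn't a hoax! See https://example.com/report"
print(clean_comment(example, DEMO_STOPS))
```

Note how the HTML entity `&gt;` turns into the token `gt` once punctuation is stripped, which is why entries such as `gt`, `lt`, and `amp` appear in the custom stop-word list.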



                                                                                

In [7]:
# Vectorize on the sample to get the features
cv = CountVectorizer(
    inputCol="final_tokens",
    outputCol="rawFeatures",
    minDF=50,
    maxDF=0.8
)
cv_model = cv.fit(df_final)
df_tf = cv_model.transform(df_final)

                                                                                

In [8]:
df_tf.select("final_tokens", "rawFeatures").show(5)

vocab_sample = cv_model.vocabulary
print(f"Sample vocab size: {len(vocab_sample)}")
print("First 20 sample-vocab entries:", vocab_sample[:20])

                                                                                

+--------------------+--------------------+
|        final_tokens|         rawFeatures|
+--------------------+--------------------+
|[warren, fine, ec...|(26883,[2,5,11,18...|
|[enthusiastic, cl...|(26883,[11,20,25,...|
|[need, rid, prete...|(26883,[4,6,7,12,...|
|[points, clearly,...|(26883,[1,7,10,22...|
|[debating, conten...|(26883,[12,39,48,...|
+--------------------+--------------------+
only showing top 5 rows

Sample vocab size: 26883
First 20 sample-vocab entries: ['think', 'world', 'years', 'time', 'know', 'way', 'need', 'want', 'global', 'things', 'really', 'see', 'say', 'good', 'well', 'science', 'believe', 'point', 'trump', 'actually']
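
The `minDF=50` and `maxDF=0.8` settings prune the vocabulary by document frequency: a term is kept only if it appears in at least 50 comments and in no more than 80% of them. The sketch below mimics that filtering on a toy corpus in plain Python and then builds a sparse count mapping comparable to one `rawFeatures` entry. It is an illustration under assumptions, not Spark's implementation: the names `build_vocab` and `to_sparse_counts` are hypothetical, and the thresholds are scaled down to fit five toy documents.

```python
from collections import Counter

def build_vocab(docs, min_df, max_df_fraction):
    """Keep terms whose document frequency lies within the given bounds,
    ordered by total corpus frequency (most frequent first)."""
    n_docs = len(docs)
    term_freq = Counter()
    doc_freq = Counter()
    for tokens in docs:
        term_freq.update(tokens)
        doc_freq.update(set(tokens))  # count each term once per document
    kept = [t for t in term_freq
            if min_df <= doc_freq[t] <= max_df_fraction * n_docs]
    return sorted(kept, key=lambda t: (-term_freq[t], t))

def to_sparse_counts(tokens, vocab):
    """Map a token list to {vocabulary index: count}."""
    index = {t: i for i, t in enumerate(vocab)}
    counts = Counter(index[t] for t in tokens if t in index)
    return dict(sorted(counts.items()))

docs = [
    ["climate", "science", "data"],
    ["climate", "policy", "vote"],
    ["climate", "science", "ice"],
    ["climate", "ice", "ice"],
    ["climate", "policy"],
]
toy_vocab = build_vocab(docs, min_df=2, max_df_fraction=0.8)
print(toy_vocab)
print(to_sparse_counts(docs[3], toy_vocab))
```

In this toy corpus, `climate` occurs in every document and is dropped by the upper bound, while `data` and `vote` occur only once and are dropped by the lower bound.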


# Topic Modelling

## LDA

We use the `LDA` class from PySpark's MLlib to identify latent topics. As prepared above, the comments are tokenized, stop words are removed, and a `CountVectorizer` produces the term-frequency vectors that LDA takes as input. The model is trained with 10 topics (`k = 10`) for 20 iterations.

In [9]:
# Train LDA
k = 10
max_iters = 20

lda = LDA(
    k=k,
    maxIter=max_iters,
    featuresCol="rawFeatures",
    seed=123,          
)

# Fit on vectorized data
lda_model = lda.fit(df_tf)

                                                                                

In [10]:
# Describe the top 10 terms for each of the k=10 topics
topics = lda_model.describeTopics(10)
topics.show(truncate=False)

# Retrieve vocabulary from the CountVectorizer
vocab = cv_model.vocabulary

# Map each topic to its actual words and weights
topic_words = topics.rdd.map(
    lambda row: (
        row['topic'],
        [vocab[i] for i in row['termIndices']],
        row['termWeights']
    )
).collect()

print("\nIdentified Topics:")
for topic, words, weights in topic_words:
    print(f"Topic {topic}:")
    for w, wt in zip(words, weights):
        print(f"  {w}: {wt:.4f}")
    print()

+-----+-----------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|topic|termIndices                                    |termWeights                                                                                                                                                                                                                       |
+-----+-----------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|0    |[188, 249, 20, 18, 27, 463, 108, 436, 22, 201] |[0.007001323950551166, 0.005934168170880602, 0.0051651659933804415, 0.004944025210571986, 0.0046




Identified Topics:
Topic 0:
  bernie: 0.0070
  national: 0.0059
  said: 0.0052
  trump: 0.0049
  government: 0.0046
  bot: 0.0046
  china: 0.0041
  security: 0.0041
  energy: 0.0035
  states: 0.0034

Topic 1:
  trump: 0.0109
  think: 0.0047
  white: 0.0035
  years: 0.0032
  said: 0.0031
  world: 0.0030
  believe: 0.0030
  time: 0.0028
  anti: 0.0028
  know: 0.0027

Topic 2:
  science: 0.0097
  know: 0.0072
  think: 0.0063
  real: 0.0051
  scientists: 0.0045
  things: 0.0044
  world: 0.0043
  way: 0.0042
  say: 0.0041
  scientific: 0.0039

Topic 3:
  warming: 0.0103
  global: 0.0100
  years: 0.0081
  earth: 0.0072
  ice: 0.0063
  temperature: 0.0058
  data: 0.0047
  carbon: 0.0045
  time: 0.0042
  emissions: 0.0038

Topic 4:
  time: 0.0052
  world: 0.0040
  really: 0.0040
  think: 0.0036
  good: 0.0034
  way: 0.0034
  things: 0.0033
  well: 0.0032
  work: 0.0032
  lot: 0.0030

Topic 5:
  think: 0.0062
  party: 0.0054
  want: 0.0045
  government: 0.0040
  vote: 0.0040
  political: 0.004

                                                                                

## LLM (Groq llama3-70b-8192) representation model
To make the topics easier to interpret, we prompt the llama3-70b-8192 model through the Groq API to generate a short, clean label for each topic from its top keywords.

In [11]:
keywords_per_topic = {topic: words for topic, words, _ in topic_words}

In [12]:
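import os
import groq

# Read the Groq API key from the environment instead of hard-coding it
# (set the GROQ_API_KEY environment variable before running this cell)
GROQ_API_KEY = os.environ["GROQ_API_KEY"]
client = groq.Groq(api_key=GROQ_API_KEY)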

# Define prompt template
prompt_template = """
I have a topic described by the following keywords:
{keywords}

Based on these keywords, generate a short and descriptive topic label of at most 5 words.
Make sure the output follows this format:
topic: <topic label>
"""

In [13]:
# Label function
def get_groq_label(keywords):
    prompt = prompt_template.format(keywords=", ".join(keywords))
    response = client.chat.completions.create(
        model="llama3-70b-8192",
        messages=[
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": prompt}
        ]
    )
    raw_text = response.choices[0].message.content.strip()
    match = re.search(r"topic:\s*(.+)", raw_text, re.IGNORECASE)
    return match.group(1).strip() if match else raw_text
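
The label extraction inside `get_groq_label` can be checked offline, without calling the API, by isolating the regular expression step. The sketch below does that. The function name `parse_label` and the three response strings are hypothetical examples, not actual model output.

```python
import re

def parse_label(raw_text: str) -> str:
    """Extract the text after 'topic:'; fall back to the raw text if absent."""
    raw_text = raw_text.strip()
    match = re.search(r"topic:\s*(.+)", raw_text, re.IGNORECASE)
    return match.group(1).strip() if match else raw_text

# Response that follows the requested format (case-insensitive match)
print(parse_label("Topic: Global Warming Trends"))
# Response with extra text before the label line
print(parse_label("Here is the label.\ntopic: Nuclear Energy Policy\n"))
# Response that ignores the format: the raw text is returned unchanged
print(parse_label("Global Warming Trends"))
```

Because `.` does not match a newline, only the remainder of the `topic:` line is captured, so trailing commentary on later lines is not included in the label.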

In [14]:
lda_topic_labels = {}

print("=== GROQ Labeled Topics ===")
for topic_id, words, _ in topic_words:
    label = get_groq_label(words[:10])
    lda_topic_labels[topic_id] = label
    print(f"Topic {topic_id}: {label}")
    print("Keywords:", ", ".join(words[:10]))
    print()

=== GROQ Labeled Topics ===
Topic 0: US-China National Security
Keywords: bernie, national, said, trump, government, bot, china, security, energy, states

Topic 1: Trump's Anti-World Beliefs
Keywords: trump, think, white, years, said, world, believe, time, anti, know

Topic 2: How Scientists View the World
Keywords: science, know, think, real, scientists, things, world, way, say, scientific

Topic 3: Global Warming Trends
Keywords: warming, global, years, earth, ice, temperature, data, carbon, time, emissions

Topic 4: Thinking the World Works
Keywords: time, world, really, think, good, way, things, well, work, lot

Topic 5: Expressing Political Party Views
Keywords: think, party, want, government, vote, political, things, trump, say, way

Topic 6: Understanding the World's Reality
Keywords: think, world, believe, know, years, really, way, time, things, say

Topic 7: Biden's Nuclear Energy Shift
Keywords: nuclear, biden, energy, power, solar, wind, coal, joe, plants, fossil

Topic 8: G

## Comment on LDA Topics and LLM Labeling

As shown above, the LDA model groups many comments into coherent themes, and the LLM-generated labels make those themes easier to read at a glance.

Notably:
- **Topics 0, 1, and 7** are clearly tied to political and energy policy debates (e.g., US-China security, Trump’s worldview, Biden’s nuclear stance).
- **Topics 3 and 8** align with global environmental concerns (climate change, water scarcity).
- **Topics 2, 4, and 6** are dominated by general conversational terms (think, know, world, things); their labels point to broader discussion of science and how people understand the world, so they are less sharply defined than the others.
- **Topic 5** captures political party alignment discussions.

## Predict topic for each document

In [15]:
# Compute the topic distribution for each comment in the dataframe
df_tf = lda_model.transform(df_tf)

# UDF to extract dominant topic
def dominant_topic(topicDist):
    return int(topicDist.argmax())

dominant_topic_udf = udf(dominant_topic, IntegerType())

df_tf = df_tf.withColumn("predictedTopic", dominant_topic_udf("topicDistribution"))

# topic: (words, weights) dictionary
topics50 = lda_model.describeTopics(50)
vocab = cv_model.vocabulary

topic_full = topics50.rdd.map(lambda row: (
    row['topic'],
    [vocab[i] for i in row['termIndices']],
    row['termWeights']
)).collect()

topic_full_dict = {t: (words, weights) for t, words, weights in topic_full}
topic_full_b = sc.broadcast(topic_full_dict)

# UDF to compute top words per comment
def doc_top_words(td, raw, num=10):
    dom = int(td.argmax())
    if dom not in topic_full_b.value:
        return []
    cand_words, cand_wts = topic_full_b.value[dom]

    counts = {i: v for i, v in zip(raw.indices, raw.values)} if raw else {}

    dom_prob = float(td[dom])
    scores = {}
    for w, wt in zip(cand_words, cand_wts):
        try:
            idx = vocab.index(w)
            freq = counts.get(idx, 0)
        except ValueError:
            freq = 0
        noise = 1 + 0.05 * random.random()
        scores[w] = freq * wt * dom_prob * noise

    sorted_ = sorted(scores.items(), key=lambda x: x[1], reverse=True)
    if all(s == 0 for _, s in sorted_):
        return random.sample(cand_words, min(num, len(cand_words)))
    return [w for w, _ in sorted_[:num]]

doc_top_udf = udf(doc_top_words, ArrayType(StringType()))

# Obtain top Words in document
df_tf = df_tf.withColumn("docTopWords", doc_top_udf("topicDistribution", "rawFeatures"))
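
The `doc_top_words` UDF ranks the dominant topic's candidate words by (count in the comment) × (topic-word weight) × (dominant-topic probability), with a small random factor to break ties. A deterministic sketch of the same ranking on plain Python data, useful for checking the logic outside Spark, might look like the following. The function name `rank_topic_words` and the toy inputs are hypothetical, and the random noise and random fallback of the UDF are deliberately left out.

```python
def rank_topic_words(topic_dist, word_counts, topic_words, num=10):
    """Return (dominant topic id, its words ranked by relevance to one comment).

    topic_dist  : list of topic probabilities for the comment
    word_counts : {word: count in the comment}
    topic_words : {topic id: [(word, weight), ...]}
    """
    # Index of the most probable topic
    dom = max(range(len(topic_dist)), key=topic_dist.__getitem__)
    dom_prob = topic_dist[dom]
    scores = {
        word: word_counts.get(word, 0) * weight * dom_prob
        for word, weight in topic_words.get(dom, [])
    }
    # Highest score first; ties broken alphabetically for reproducibility
    ranked = sorted(scores.items(), key=lambda kv: (-kv[1], kv[0]))
    return dom, [w for w, s in ranked[:num] if s > 0]

# Toy inputs: weights borrowed from the Topic 3 listing, counts invented
toy_topic_words = {1: [("warming", 0.0103), ("global", 0.0100), ("ice", 0.0063)]}
toy_dist = [0.1, 0.7, 0.2]
toy_counts = {"ice": 3, "global": 1, "trump": 2}
print(rank_topic_words(toy_dist, toy_counts, toy_topic_words))
```

Here `ice` outranks `global` because its higher count in the comment outweighs its lower topic weight, and `warming` is omitted because it does not occur in the comment.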

In [16]:
topic_label_b = sc.broadcast(lda_topic_labels)

# UDF to get the topic label using predictedTopic
def map_topic_label(topic_id):
    return topic_label_b.value.get(topic_id, "Unknown")

topic_label_udf = udf(map_topic_label, StringType())

df_tf = df_tf.withColumn("PredictedTopicName", topic_label_udf("predictedTopic"))

In [18]:
result_lda = df_tf.selectExpr(
    "body_clean",
    "`subreddit.name` AS subreddit",
    "predictedTopic",
    "docTopWords",
    "PredictedTopicName"
).orderBy(rand()).limit(15)

result_lda.show()

25/05/05 09:14:47 WARN DAGScheduler: Broadcasting large task binary with size 2.7 MiB

+--------------------+--------------+--------------+--------------------+--------------------+
|          body_clean|     subreddit|predictedTopic|         docTopWords|  PredictedTopicName|
+--------------------+--------------+--------------+--------------------+--------------------+
|both and brics ma...|    conspiracy|             0|[china, global, p...|US-China National...|
|personally i thin...|         meirl|             8|[water, world, th...|Global Water War ...|
|i m a geologist b...|      politics|             3|[earth, think, st...|Global Warming Tr...|
|it gets trickier ...|   libertarian|             5|[party, every, th...|Expressing Politi...|
|i think it is tot...|    truereddit|             2|[think, believe, ...|How Scientists Vi...|
|are you really no...|     dankmemes|             2|[really, well, sh...|How Scientists Vi...|
|looks like all th...| unitedkingdom|             4|[good, thing, day...|Thinking the Worl...|
|i think the key w...|onguardforthee|             



# Saving df_tf

In [19]:
# Define your output path (change bucket name accordingly)
output_path = "gs://st446-gp-sm/processed_data/df_tf_after_topics"

# Save DataFrame as Parquet
df_tf.write.mode("overwrite").parquet(output_path)

25/05/05 09:22:33 WARN DAGScheduler: Broadcasting large task binary with size 3.0 MiB
                                                                                