In [2]:
import time
from pathlib import Path
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, LongType, IntegerType, DoubleType, BooleanType, StructType, StructField, TimestampType
import os
import logging
import warnings

import urllib.request
import zipfile

# Suppress Python warnings
warnings.filterwarnings("ignore")

# Suppress Java/Spark logging
logging.getLogger("py4j").setLevel(logging.ERROR)
logging.getLogger("pyspark").setLevel(logging.ERROR)
logging.getLogger("org.apache.spark").setLevel(logging.ERROR)
logging.getLogger("org.apache.hadoop").setLevel(logging.ERROR)



# Set JAVA_HOME to the Homebrew OpenJDK installation
os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@17/libexec/openjdk.jdk/Contents/Home"
os.environ["PATH"] = os.environ["JAVA_HOME"] + "/bin:" + os.environ["PATH"]

# Java 17+ compatibility flags for Spark
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--conf spark.driver.extraJavaOptions='--add-opens java.base/javax.security.auth=ALL-UNNAMED "
    "--add-opens java.base/java.lang=ALL-UNNAMED "
    "--add-opens java.base/java.lang.invoke=ALL-UNNAMED "
    "--add-opens java.base/java.lang.reflect=ALL-UNNAMED "
    "--add-opens java.base/java.io=ALL-UNNAMED "
    "--add-opens java.base/java.net=ALL-UNNAMED "
    "--add-opens java.base/java.nio=ALL-UNNAMED "
    "--add-opens java.base/java.util=ALL-UNNAMED "
    "--add-opens java.base/java.util.concurrent=ALL-UNNAMED "
    "--add-opens java.base/java.util.concurrent.atomic=ALL-UNNAMED "
    "--add-opens java.base/sun.nio.ch=ALL-UNNAMED "
    "--add-opens java.base/sun.nio.cs=ALL-UNNAMED "
    "--add-opens java.base/sun.security.action=ALL-UNNAMED "
    "--add-opens java.base/sun.util.calendar=ALL-UNNAMED ' "
    "--conf spark.executor.extraJavaOptions='--add-opens java.base/javax.security.auth=ALL-UNNAMED "
    "--add-opens java.base/java.lang=ALL-UNNAMED "
    "--add-opens java.base/java.lang.invoke=ALL-UNNAMED "
    "--add-opens java.base/java.lang.reflect=ALL-UNNAMED "
    "--add-opens java.base/java.io=ALL-UNNAMED "
    "--add-opens java.base/java.net=ALL-UNNAMED "
    "--add-opens java.base/java.nio=ALL-UNNAMED "
    "--add-opens java.base/java.util=ALL-UNNAMED "
    "--add-opens java.base/java.util.concurrent=ALL-UNNAMED "
    "--add-opens java.base/java.util.concurrent.atomic=ALL-UNNAMED "
    "--add-opens java.base/sun.nio.ch=ALL-UNNAMED "
    "--add-opens java.base/sun.nio.cs=ALL-UNNAMED "
    "--add-opens java.base/sun.security.action=ALL-UNNAMED "
    "--add-opens java.base/sun.util.calendar=ALL-UNNAMED ' pyspark-shell"
)


In [3]:
# Verify Java
os.system("java -version")

openjdk version "17.0.17" 2025-10-21
OpenJDK Runtime Environment Homebrew (build 17.0.17+0)
OpenJDK 64-Bit Server VM Homebrew (build 17.0.17+0, mixed mode, sharing)


0

In [4]:
TARGET_DATE = "2025-09-10"

print("Initializing local Spark session...")
spark = SparkSession.builder \
    .appName("RedditEchoChamberLocal") \
    .master("local[*]") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .getOrCreate()
sc = spark.sparkContext
print(f"Spark session initialized. Running Spark version {sc.version}")

# Also suppress Spark context logs
sc.setLogLevel("ERROR")

Initializing local Spark session...


Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/11/05 23:02:18 WARN Utils: Your hostname, Akashs-MacBook-Air-9.local, resolves to a loopback address: 127.0.0.1; using 192.168.1.224 instead (on interface en0)
25/11/05 23:02:18 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/05 23:02:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Spark session initialized. Running Spark version 4.0.1


In [7]:
print(f"\n--- Loading and filtering Reddit data for {TARGET_DATE} ---")
data_dir = Path("./data_cleaned")
posts_path = str(data_dir / "*_posts.csv")
posts_df_string = spark.read.csv(posts_path, header=True, inferSchema=False)
posts_transformed_df = posts_df_string.withColumn(
    "timestamp", F.to_timestamp(F.from_unixtime(F.expr("try_cast(created_utc AS LONG)")))
).withColumn(
    "score", F.expr("try_cast(score AS INT)")
).withColumn(
    "num_comments", F.expr("try_cast(num_comments AS INT)")
)
reddit_with_date = posts_transformed_df.withColumn("date", F.to_date(F.col("timestamp")))
reddit_single_day_df = reddit_with_date.filter(F.col("date") == F.to_date(F.lit(TARGET_DATE)))
reddit_single_day_df.cache()

reddit_count = reddit_single_day_df.count()
if reddit_count == 0:
    print(f"Warning: No Reddit posts found for {TARGET_DATE}. The analysis will be empty.")
else:
    print(f"Found {reddit_count:,} Reddit posts for {TARGET_DATE}.")


--- Loading and filtering Reddit data for 2025-09-10 ---
Found 2,857 Reddit posts for 2025-09-10.


In [13]:
print(f"\n--- Acquiring, loading, and correctly parsing GDELT data for {TARGET_DATE} ---")
# (Download logic remains the same)
gdelt_date_str = TARGET_DATE.replace("-", "")
gdelt_dir = Path("./gdelt_data")
gdelt_zip_filename = f"{gdelt_date_str}.export.CSV.zip"
gdelt_csv_filename = f"{gdelt_date_str}.export.CSV"
gdelt_zip_path = gdelt_dir / gdelt_zip_filename
gdelt_csv_path = gdelt_dir / gdelt_csv_filename
gdelt_url = f"http://data.gdeltproject.org/events/{gdelt_zip_filename}"
os.makedirs(gdelt_dir, exist_ok=True)
if not gdelt_csv_path.exists():
    try:
        urllib.request.urlretrieve(gdelt_url, gdelt_zip_path)
        with zipfile.ZipFile(gdelt_zip_path, 'r') as zip_ref:
            zip_ref.extractall(gdelt_dir)
        os.remove(gdelt_zip_path)
    except Exception as e:
        print(f"❌ Failed to download GDELT data: {e}")

gdelt_schema = StructType([
    StructField("GLOBALEVENTID", StringType(), True), StructField("SQLDATE", StringType(), True),
    StructField("Actor1Name", StringType(), True), StructField("Actor2Name", StringType(), True),
    StructField("EventCode", StringType(), True), StructField("GoldsteinScale", DoubleType(), True),
    StructField("NumMentions", StringType(), True), StructField("NumSources", StringType(), True),
    StructField("NumArticles", StringType(), True), StructField("AvgTone", DoubleType(), True),
    StructField("SOURCEURL", StringType(), True)
])

gdelt_sample_df = None
if gdelt_csv_path.exists() and reddit_count > 0:
    # --- THE KEY FIX: Read without a schema and select by index ---
    gdelt_raw_df = spark.read.csv(str(gdelt_csv_path), sep='\\t', inferSchema=False)
    
    # GDELT column indices (0-based) from the official documentation
    # SQLDATE=1, Actor1Name=5, EventCode=26, GoldsteinScale=30, AvgTone=34
    gdelt_df = gdelt_raw_df.select(
        F.col("_c1").alias("SQLDATE"),
        F.col("_c5").alias("Actor1Name"),
        F.col("_c26").alias("EventCode"),
        F.col("_c30").cast("double").alias("GoldsteinScale"),
        F.col("_c34").cast("double").alias("AvgTone")
    )
    
    gdelt_sample_df = gdelt_df.limit(reddit_count)
    gdelt_sample_df.cache()
    print(f"GDELT data loaded, parsed, and sampled. Total events in sample: {gdelt_sample_df.count():,}")
else:
    print("GDELT data not loaded or no Reddit data.")


--- Acquiring, loading, and correctly parsing GDELT data for 2025-09-10 ---
GDELT data loaded, parsed, and sampled. Total events in sample: 2,857


In [15]:
if gdelt_sample_df and reddit_single_day_df.count() > 0:
    print(f"\n--- Deriving Comparative Insights for {TARGET_DATE} ---")
    
    # Insight 1: (No changes needed, the result is already insightful)
    print("\nInsight 1: Overall Sentiment Comparison (News vs. Reddit)")
    gdelt_avg_sentiment = gdelt_sample_df.agg(F.avg("AvgTone").alias("avg_news_sentiment")).first()['avg_news_sentiment']
    reddit_avg_score = reddit_single_day_df.agg(F.avg("score").alias("avg_reddit_score")).first()['avg_reddit_score']
    print(f"  - Average GDELT News Sentiment (AvgTone): {gdelt_avg_sentiment:.2f}")
    print(f"  - Average Reddit Post Score: {reddit_avg_score:.2f}")

    # Insight 2: Find Mentions of Top GDELT Actors in Reddit Titles (with case-insensitivity)
    print(f"\nInsight 2: Mentions of Top GDELT Actors in Reddit Titles")
    generic_actors = ["PROTESTER", "GOVERNMENT", "POLICE", "REBEL", "MILITARY", "CITIZEN", "MEDIA", "OPPOSITION", "GOV", "JUD"]
    top_actors = gdelt_sample_df.filter(
        F.col("Actor1Name").isNotNull() & (~F.col("Actor1Name").isin(generic_actors))
    ).groupBy("Actor1Name").count().orderBy(F.col("count").desc()).limit(5)
    top_actors_list = [row.Actor1Name for row in top_actors.collect()]
    
    if not top_actors_list:
        print("No significant non-generic actors found in the GDELT sample.")
    else:
        print(f"Top 5 Filtered GDELT Actors in sample: {top_actors_list}")
        mention_counts = []
        for actor in top_actors_list:
            # THE FIX: Use F.lower() for a case-insensitive search
            count = reddit_single_day_df.filter(F.locate(actor.lower(), F.lower(F.col("title"))) > 0).count()
            mention_counts.append((actor, count))
        print("Mentions of these actors in Reddit titles:")
        for actor, count in mention_counts:
            print(f"  - '{actor}': {count} mentions")

    # Insight 3: Compare Thematic Focus (with robust stop word removal)
    print(f"\nInsight 3: Thematic Focus Comparison (GDELT Events vs. Reddit Keywords)")
    print("  - Top 10 GDELT Event Types:")
    gdelt_sample_df.groupBy("EventCode").count().orderBy(F.col("count").desc()).show(10, truncate=False)
    
    # THE FIX: Use a standard list of stop words for a more reliable filter
    print("  - Top 10 Reddit Title Keywords (after robust stop word removal):")
    # Get the default stop words from the ML library
    from pyspark.ml.feature import StopWordsRemover
    stopwords = StopWordsRemover.loadDefaultStopWords("english")
    
    reddit_keywords = reddit_single_day_df.withColumn("word", F.explode(F.split(F.lower(F.col("title")), "[^a-zA-Z]"))) \
        .filter(~F.col("word").isin(stopwords)) \
        .filter(F.col("word") != "") \
        .groupBy("word").count().orderBy(F.col("count").desc())
    reddit_keywords.show(10, truncate=False)


--- Deriving Comparative Insights for 2025-09-10 ---

Insight 1: Overall Sentiment Comparison (News vs. Reddit)
  - Average GDELT News Sentiment (AvgTone): -2.83
  - Average Reddit Post Score: 253.32

Insight 2: Mentions of Top GDELT Actors in Reddit Titles
Top 5 Filtered GDELT Actors in sample: ['USA', 'ISR', 'QAT', 'BUS', 'GBR']
Mentions of these actors in Reddit titles:
  - 'USA': 33 mentions
  - 'ISR': 53 mentions
  - 'QAT': 17 mentions
  - 'BUS': 7 mentions
  - 'GBR': 0 mentions

Insight 3: Thematic Focus Comparison (GDELT Events vs. Reddit Keywords)
  - Top 10 GDELT Event Types:
+---------+-----+
|EventCode|count|
+---------+-----+
|010      |254  |
|042      |224  |
|020      |201  |
|043      |198  |
|051      |168  |
|040      |154  |
|190      |153  |
|111      |148  |
|173      |79   |
|046      |78   |
+---------+-----+
only showing top 10 rows
  - Top 10 Reddit Title Keywords (after robust stop word removal):
+--------+-----+
|word    |count|
+--------+-----+
|charlie |12

In [25]:
spark.stop()
print("\n✅ Spark session stopped. Script finished.")


✅ Spark session stopped. Script finished.


25/11/05 22:58:54 ERROR Executor: Exception in task 7.0 in stage 12.0 (TID 43): Block rdd_47_7 does not exist
