In [53]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lower, trim
import os

def create_spark_session(app_name="RedditDataProcessor"):
    """Create a Spark session connected to the cluster"""
    spark = SparkSession \
        .builder \
        .master("spark://g37-master:7077") \
        .appName(app_name) \
        .config("spark.dynamicAllocation.enabled", True) \
        .config("spark.dynamicAllocation.shuffleTracking.enabled", True) \
        .config("spark.shuffle.service.enabled", False) \
        .config("spark.dynamicAllocation.executorIdleTimeout", "30s") \
        .config("spark.executor.cores", 3) \
        .config("spark.executor.memory", "6g") \
        .config("spark.driver.memory", "6g") \
        .config("spark.default.parallelism", 18) \
        .config("spark.sql.shuffle.partitions", 18) \
        .config("spark.memory.fraction", 0.8) \
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
        .config("spark.kryoserializer.buffer.max", "1g") \
        .getOrCreate()
    
    return spark

In [54]:
filenames = ["corpus-webis-tldr-17.json", "reddit_100k.json", "reddit_200k.json", "reddit_500k.json", "reddit_50k.json", "reddit_filtered.parquet"]

In [55]:
spark = create_spark_session()

In [57]:
folderpath = "hdfs://192.168.2.156:9000/data/reddit/"
filename = filenames[0]
source_hdfs_path = folderpath + filename

In [58]:
# Read data from source HDFS (read-only operation)
print(f"Reading Reddit data from source: {source_hdfs_path}")
df = spark.read.json(source_hdfs_path)

# Show data sample and schema
print("Original Reddit data sample:")
df.show(1, truncate=False)

print("Data Schema:")
df.printSchema()

Reading Reddit data from source: hdfs://192.168.2.156:9000/data/reddit/corpus-webis-tldr-17.json


                                                                                

Original Reddit data sample:


[Stage 1:>                                                          (0 + 1) / 1]

+----------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

                                                                                

In [59]:
record_count = df.count()
print(f"Total records: {record_count}")



Total records: 3848330


                                                                                

In [61]:
df = df.drop("body", "normalizedBody", "title")

In [62]:
df = df.fillna({"subreddit": "unknown", "author": "unnamed"})

In [64]:
df = df.withColumn("content", lower(trim(col("content")))) \
        .withColumn("subreddit", lower(trim(col("subreddit")))) \
        .withColumn("summary", lower(trim(col("summary"))))

In [65]:
df = df.filter(col("content_len") >= 5)

In [66]:
df.printSchema()
df.show(5)

root
 |-- author: string (nullable = false)
 |-- content: string (nullable = true)
 |-- content_len: long (nullable = true)
 |-- id: string (nullable = true)
 |-- subreddit: string (nullable = false)
 |-- subreddit_id: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- summary_len: long (nullable = true)

+----------------+--------------------+-----------+-------+-----------+------------+--------------------+-----------+
|          author|             content|content_len|     id|  subreddit|subreddit_id|             summary|summary_len|
+----------------+--------------------+-----------+-------+-----------+------------+--------------------+-----------+
|raysofdarkmatter|i think it should...|        178|c69al3r|       math|    t5_2qh0n|shifting seasonal...|          8|
|         Stork13|art is about the ...|        148|c6a9nxd|      funny|    t5_2qh33|personal opinions...|          4|
|   Cloud_dreamer|ask me what i thi...|         76|c6acx4l|borderlands|    t5_2r8cd|i

In [67]:
df.write.mode("overwrite").parquet("hdfs:///data/reddit/clean_corpus.parquet")

                                                                                

In [69]:
df = spark.read.parquet("hdfs:///data/reddit/clean_corpus.parquet")

df.show(1, truncate=False)

[Stage 10:>                                                         (0 + 1) / 1]

+-----------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

                                                                                

In [70]:
df.printSchema()

root
 |-- author: string (nullable = true)
 |-- content: string (nullable = true)
 |-- content_len: long (nullable = true)
 |-- id: string (nullable = true)
 |-- subreddit: string (nullable = true)
 |-- subreddit_id: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- summary_len: long (nullable = true)



In [71]:
spark.stop()