In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("comments") \
    .config("spark.executor.memory", "12g") \
    .config("spark.executor.cores", "4") \
    .config("spark.executor.instances", "4") \
    .config("spark.driver.memory", "8g") \
    .getOrCreate()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1716517022501_0002,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
import sparknlp
from sparknlp.base import *
from sparknlp.annotator import *
import matplotlib.pyplot as plt
import pandas as pd
from pyspark.sql.functions import col, udf, to_timestamp, desc, year, count, round, monotonically_increasing_id, concat, lit
from pyspark.sql.functions import from_unixtime, from_json, explode
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
import pyspark.sql.functions as F
import json

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Step 1
First, I read the DataFrame I previously created of posts containing 'abortion' in the title. I create a new column by adding 't3_' to the beginning of each post ID, which is how Reddit indicates which post a comment is responding to (the comment's `link_id`). I then put that column in its own DataFrame to make the inner join on matching comments easier.

In [6]:
# Posts previously filtered to titles containing 'abortion'.
abortion_posts = spark.read.parquet('s3://131313113finalproject/posts_w_comments/').persist()

# Comments reference their submission as "t3_" + <post id> (their link_id),
# so build the same key from each post id. distinct() guarantees every key
# appears once; otherwise the inner join against the comments would emit one
# copy of a comment per duplicate post row.
parent_id = (
    abortion_posts
    .select(F.concat(F.lit("t3_"), F.col("id")).alias("parent_id"))
    .distinct()
)
parent_id.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- parent_id: string (nullable = true)

# Step 2
Now I read the 2 GB zstd-compressed (.zst) file from my S3 bucket. Reading it as a text file creates a single-column DataFrame in which each row holds the entire raw data for one comment.

In [7]:
# reading in my zst file
z_file = 's3://131313113finalproject/Conservative_comments.zst'

data = spark.read.text(z_file)
# data_sample = data.sample(withReplacement=False, fraction=0.3)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [9]:
# Sanity check: spark.read.text yields a single string column named `value`,
# one row per line of the archive.
data.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- value: string (nullable = true)

# Step 3
Next, I use the PySpark SQL function `from_json`. This function parses a JSON string column of a DataFrame and maps the data onto the schema you provide.

In [12]:
# Only these fields are extracted from each JSON record; all are read as
# nullable strings (created_utc is converted to a time in a later cell).
comment_fields = [
    "body",
    "author",
    "created_utc",
    "score",
    "id",
    "link_id",
    "subreddit",
    "author_flair_text",
]
json_schema = StructType([StructField(field_name, StringType(), True) for field_name in comment_fields])

# Parse each raw line into a struct column named `mapped_comments`.
json_comments = data.select(from_json(col("value"), json_schema).alias("mapped_comments"))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Step 4
After the data is mapped, I perform an inner join with the previously made parent ID DataFrame, ensuring that only comments responding to the previously chosen posts make it into the dataset.

In [13]:
# Keep only comments whose link_id matches one of the selected post keys,
# then flatten the parsed struct back into top-level columns.
link_id_col = json_comments["mapped_comments"].getField("link_id")
filtered_comments = (
    json_comments
    .join(parent_id, on=link_id_col == parent_id["parent_id"], how="inner")
    .select("mapped_comments.*")
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Step 5
Writing the filtered comments to Parquet so they can be reloaded without re-reading the compressed archive.

In [14]:
# Snapshot the joined comments so later cells can reload them without
# re-scanning the compressed archive. mode("overwrite") makes the cell safe to
# re-run; the default save mode raises once the path already exists.
# NOTE(review): a relative path resolves against the cluster's default
# filesystem (likely HDFS, not S3) — confirm this location is intended.
filtered_comments.write.mode("overwrite").parquet("filtered_comments_full_flare.parquet")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [15]:
# Reload the filtered comments from the Parquet snapshot written above.
fcf_ft = spark.read.format("parquet").load("filtered_comments_full_flare.parquet")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [94]:
# Preview the first rows (long cell values are truncated).
fcf_ft.show(n=20, truncate=True)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+-------------------+-----------+-----+-------+--------+------------+-----------------+
|                body|             author|created_utc|score|     id| link_id|   subreddit|author_flair_text|
+--------------------+-------------------+-----------+-----+-------+--------+------------+-----------------+
|"Every definition...|          [deleted]| 1222118791|    1|c05jb53|t3_72v18|Conservative|             null|
|Do you take every...|           cldnails| 1222119964|    1|c05jbgv|t3_72v18|Conservative|             null|
|Your response dri...|          [deleted]| 1222120133|    3|c05jbih|t3_72v18|Conservative|             null|
|You are not makin...|           cldnails| 1222129073|    1|c05jdq2|t3_72v18|Conservative|             null|
|You're not making...|          [deleted]| 1222130061|    1|c05jdyy|t3_72v18|Conservative|             null|
|The woman decidin...|          [deleted]| 1261564782|    1|c0hml86|t3_ahsop|Conservative|             null|
|State's right, Fe.

In [90]:
# Keep only comments whose author had flair text set.
# The previous version read `filtered_comments_full` before any visible cell
# defined it (presumably leftover kernel state), so it failed on a fresh
# kernel; derive it from the reloaded comments instead.
# isNotNull() states the intent directly; comparing to the string 'null' only
# dropped NULLs as a side effect of SQL three-valued logic.
filtered_comments_full = fcf_ft.where(F.col('author_flair_text').isNotNull())
filtered_comments_full.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+-------------+-----------+-----+-------+--------+------------+--------------------+
|                body|       author|created_utc|score|     id| link_id|   subreddit|   author_flair_text|
+--------------------+-------------+-----------+-----+-------+--------+------------+--------------------+
|Well, considering...|    plato1123| 1269909833|   -5|c0n5bkf|t3_bjsrb|Conservative|            Moderate|
|Just throw in the...|contrarianism| 1269921488|    1|c0n5p1p|t3_bjsrb|Conservative|ideologues gonna ...|
|It makes you wond...|      DJWhamo| 1269941231|    5|c0n63sd|t3_bjsrb|Conservative|               Paleo|
|How can you be ag...|      DJWhamo| 1270364278|    4|c0ngf1g|t3_bjsrb|Conservative|               Paleo|
|Matter of opinion...|      DJWhamo| 1270396555|    0|c0ngu7p|t3_bjsrb|Conservative|               Paleo|
|Anti-rights vs. p...|      DJWhamo| 1270400865|    1|c0ngxr4|t3_bjsrb|Conservative|               Paleo|
|Here's the thing-...|      DJWhamo| 127042958

In [95]:
# created_utc is stored as a string of epoch seconds. Cast it to bigint,
# format it with from_unixtime, and parse that with to_timestamp so `time` is
# an actual TimestampType column. from_unixtime alone returns a formatted
# string, not a timestamp.
# Both steps use the session time zone; the sample output suggests UTC.
fcf_ft = fcf_ft.withColumn(
    "time",
    to_timestamp(from_unixtime(col("created_utc").cast("bigint"))),
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [96]:
# Spot-check the derived `time` column on a couple of rows.
fcf_ft.show(n=2)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+---------+-----------+-----+-------+--------+------------+-----------------+-------------------+
|                body|   author|created_utc|score|     id| link_id|   subreddit|author_flair_text|               time|
+--------------------+---------+-----------+-----+-------+--------+------------+-----------------+-------------------+
|"Every definition...|[deleted]| 1222118791|    1|c05jb53|t3_72v18|Conservative|             null|2008-09-22 21:26:31|
|Do you take every...| cldnails| 1222119964|    1|c05jbgv|t3_72v18|Conservative|             null|2008-09-22 21:46:04|
+--------------------+---------+-----------+-----+-------+--------+------------+-----------------+-------------------+
only showing top 2 rows

In [16]:
# Inspect the column layout and overall size of the filtered comment set.
fcf_ft.printSchema()
total_rows = fcf_ft.count()
print(f'Total Rows: {total_rows}')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- body: string (nullable = true)
 |-- author: string (nullable = true)
 |-- created_utc: string (nullable = true)
 |-- score: string (nullable = true)
 |-- id: string (nullable = true)
 |-- link_id: string (nullable = true)
 |-- subreddit: string (nullable = true)
 |-- author_flair_text: string (nullable = true)

Total Rows: 206756

In [98]:
# Publish the final comment set to S3, replacing any previous output there.
bucket = 's3://131313113finalproject/ffcf_ft'
fcf_ft.write.mode("overwrite").parquet(bucket)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…