#### *This notebook was run on the Data Science 3.0 - Python 3 kernel on an ml.t3.large instance.*

## In this notebook...

We analyze the [Pushshift Reddit dataset](https://arxiv.org/pdf/2001.08435.pdf) used for the project, then filter the comments and submissions from the subreddits of interest. The filtered data is stored in your account's S3 bucket, and it is this filtered data that you will use for the project.

## Setup

In [2]:
# Setup - Run only once per Kernel App
%conda install openjdk -y

# install PySpark
%pip install pyspark==3.3.0

# restart kernel
from IPython.core.display import HTML
HTML("<script>Jupyter.notebook.kernel.restart()</script>")

Collecting package metadata (current_repodata.json): done
Solving environment: done


  current version: 23.3.1
  latest version: 24.3.0

Please update conda by running

    $ conda update -n base -c defaults conda

Or to minimize the number of packages updated during conda update use

     conda install conda=24.3.0



## Package Plan ##

  environment location: /opt/conda

  added / updated specs:
    - openjdk


The following packages will be downloaded:

    package                    |            build
    ---------------------------|-----------------
    ca-certificates-2024.3.11  |       h06a4308_0         127 KB
    certifi-2024.2.2           |  py310h06a4308_0         159 KB
    openjdk-11.0.13            |       h87a67e3_0       341.0 MB
    ------------------------------------------------------------
                                           Total:       341.3 MB

The following NEW packages will be INSTALLED:

  openjdk            pkgs/main/linux-64::openjdk-11.0.13-h87a67e

In [3]:
# Import pyspark and build Spark session
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("PySparkApp")
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.2")
    .config(
        "fs.s3a.aws.credentials.provider",
        "com.amazonaws.auth.ContainerCredentialsProvider",
    )
    .getOrCreate()
)

print(spark.version)



:: loading settings :: url = jar:file:/opt/conda/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-6cb713f7-1156-4a73-ad13-357d10fef07c;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;3.2.2 in central
	found com.amazonaws#aws-java-sdk-bundle;1.11.563 in central
:: resolution report :: resolve 341ms :: artifacts dl 21ms
	:: modules in use:
	com.amazonaws#aws-java-sdk-bundle;1.11.563 from central in [default]
	org.apache.hadoop#hadoop-aws;3.2.2 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
	---------------------------------------------

24/04/03 18:24:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


3.3.0


In [4]:
import sagemaker
sess = sagemaker.Session()
bucket = sess.default_bucket()
print(bucket)

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
sagemaker-us-east-1-655790771121


In [5]:
%%time
# Parquet files carry their own schema, so the CSV-style header option is not needed
comments = spark.read.parquet(
    "s3a://bigdatateaching/reddit-parquet/comments/year=2022/month={3,4,5,6,7,8}/*.parquet"
)
# comments.show()

24/04/03 05:32:25 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties


                                                                                

CPU times: user 235 ms, sys: 41.9 ms, total: 277 ms
Wall time: 1min 21s


In [6]:
%%time
# Parquet files carry their own schema, so the CSV-style header option is not needed
submissions = spark.read.parquet(
    "s3a://bigdatateaching/reddit-parquet/submissions/year=2022/month={3,4,5,6,7,8}/*.parquet"
)
# submissions.show()

                                                                                

24/04/03 05:34:21 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
CPU times: user 94.3 ms, sys: 26 ms, total: 120 ms
Wall time: 34.8 s
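Both reads above select the March to August 2022 partitions with a `{3,4,5,6,7,8}` glob. As a minimal sketch of an alternative, the same partition paths could be generated explicitly and passed to `spark.read.parquet`, which accepts several paths. The helper name `month_partition_paths` is hypothetical and not part of the original notebook.

```python
from typing import Iterable, List


def month_partition_paths(base: str, year: int, months: Iterable[int]) -> List[str]:
    """Return one glob path per month partition under `base` for the given year."""
    return [f"{base}/year={year}/month={m}/*.parquet" for m in sorted(set(months))]


# Hypothetical usage mirroring the comments read above (months 3 through 8).
comment_paths = month_partition_paths(
    "s3a://bigdatateaching/reddit-parquet/comments", 2022, range(3, 9)
)
print(len(comment_paths))
print(comment_paths[0])

# With an active SparkSession, the list could then be unpacked into the reader:
# comments = spark.read.parquet(*comment_paths)
```

Listing the months in Python makes it easy to change the date range in one place without editing the glob string by hand.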


In [13]:
%%time
print(f"shape of the submissions dataframe is {submissions.count():,}x{len(submissions.columns)}") 



shape of the submissions dataframe is 210,157,555x68
CPU times: user 615 ms, sys: 154 ms, total: 769 ms
Wall time: 6min 46s


                                                                                

In [14]:
%%time
print(f"shape of the comments dataframe is {comments.count():,}x{len(comments.columns)}") 



shape of the comments dataframe is 1,406,797,406x21
CPU times: user 1.16 s, sys: 296 ms, total: 1.45 s
Wall time: 11min 19s


                                                                                

## Preliminary data extraction, filtering and cleaning
In this section we:
- Extract the submissions and comments from the subreddits of interest.
- Run a preliminary examination of each subset, check for missing values, and drop irrelevant columns.
- Save the cleaned dataframes to the S3 bucket for later use.

In [7]:
submissions.createOrReplaceTempView("submissions")
comments.createOrReplaceTempView("comments")

## r/birthcontrol

In [8]:
import pyspark.sql.functions as f
from pyspark.sql.functions import isnan, when, count, col, lit

In [9]:
subreddit = 'birthcontrol'

In [10]:
sql_str=f"select * from submissions where subreddit='{subreddit}'"
submissions_filtered = spark.sql(sql_str)
print('Total submissions:', submissions_filtered.count())



Total submissions: 17695
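The query above interpolates the subreddit name straight into the SQL string. A small, hedged sketch of a guard is shown here: it builds the same statement but rejects names containing anything other than letters, digits, and underscores. The helper `subreddit_query` and the exact naming rule are assumptions made for illustration, not part of the original notebook.

```python
import re

# Assumption: subreddit names are short strings of letters, digits and underscores.
_SUBREDDIT_RE = re.compile(r"[A-Za-z0-9_]{1,21}")


def subreddit_query(table: str, subreddit: str) -> str:
    """Build the same SELECT used in the notebook, after validating the name."""
    if not _SUBREDDIT_RE.fullmatch(subreddit):
        raise ValueError(f"unexpected subreddit name: {subreddit!r}")
    return f"select * from {table} where subreddit='{subreddit}'"


print(subreddit_query("submissions", "birthcontrol"))

# With an active SparkSession and the temp views registered above:
# submissions_filtered = spark.sql(subreddit_query("submissions", "birthcontrol"))
```

The same helper could be reused for the `comments` view and for the other subreddits processed later.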


                                                                                

In [11]:
sql_str=f"select * from comments where subreddit='{subreddit}'"
comments_filtered = spark.sql(sql_str)
print('Total comments:', comments_filtered.count())



Total comments: 93161


                                                                                

In [12]:
submissions_filtered.cache()
comments_filtered.cache()

DataFrame[author: string, author_cakeday: boolean, author_flair_css_class: string, author_flair_text: string, body: string, can_gild: boolean, controversiality: bigint, created_utc: timestamp, distinguished: string, edited: string, gilded: bigint, id: string, is_submitter: boolean, link_id: string, parent_id: string, permalink: string, retrieved_on: timestamp, score: bigint, stickied: boolean, subreddit: string, subreddit_id: string]

#### Check for columns and filter the ones we need

In [13]:
submissions_filtered.printSchema()

root
 |-- adserver_click_url: string (nullable = true)
 |-- adserver_imp_pixel: string (nullable = true)
 |-- archived: boolean (nullable = true)
 |-- author: string (nullable = true)
 |-- author_cakeday: boolean (nullable = true)
 |-- author_flair_css_class: string (nullable = true)
 |-- author_flair_text: string (nullable = true)
 |-- author_id: string (nullable = true)
 |-- brand_safe: boolean (nullable = true)
 |-- contest_mode: boolean (nullable = true)
 |-- created_utc: timestamp (nullable = true)
 |-- crosspost_parent: string (nullable = true)
 |-- crosspost_parent_list: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- approved_at_utc: string (nullable = true)
 |    |    |-- approved_by: string (nullable = true)
 |    |    |-- archived: boolean (nullable = true)
 |    |    |-- author: string (nullable = true)
 |    |    |-- author_flair_css_class: string (nullable = true)
 |    |    |-- author_flair_text: string (nullable = true)
 |    |    

In [14]:
submissions_filtered = submissions_filtered.select(
    "id", "author", "created_utc", "selftext", "title", 
    "num_comments", "score", "is_self", "over_18", "distinguished"
)

In [15]:
submissions_filtered.show(5)

[Stage 10:>                                                         (0 + 1) / 1]

+------+--------------------+-------------------+--------------------+--------------------+------------+-----+-------+-------+-------------+
|    id|              author|        created_utc|            selftext|               title|num_comments|score|is_self|over_18|distinguished|
+------+--------------------+-------------------+--------------------+--------------------+------------+-----+-------+-------+-------------+
|x1dtdj|             teiteip|2022-08-30 08:44:24|Here's history of...|Late periods or p...|           1|    0|   true|  false|         null|
|x1ech4|  Other_Treacle_7229|2022-08-30 09:18:25|Hey!\nI've starte...|what's the best t...|           1|    0|   true|  false|         null|
|x1edoz|   firewalkwithme223|2022-08-30 09:20:39|I need a little b...| PMS but no bleeding|           3|    0|   true|  false|         null|
|x1eeg4|Small-Perspective538|2022-08-30 09:22:01|i started taking ...|               help!|           1|    0|   true|  false|         null|
|x1eg9w|     

                                                                                

In [16]:
comments_filtered.printSchema()

root
 |-- author: string (nullable = true)
 |-- author_cakeday: boolean (nullable = true)
 |-- author_flair_css_class: string (nullable = true)
 |-- author_flair_text: string (nullable = true)
 |-- body: string (nullable = true)
 |-- can_gild: boolean (nullable = true)
 |-- controversiality: long (nullable = true)
 |-- created_utc: timestamp (nullable = true)
 |-- distinguished: string (nullable = true)
 |-- edited: string (nullable = true)
 |-- gilded: long (nullable = true)
 |-- id: string (nullable = true)
 |-- is_submitter: boolean (nullable = true)
 |-- link_id: string (nullable = true)
 |-- parent_id: string (nullable = true)
 |-- permalink: string (nullable = true)
 |-- retrieved_on: timestamp (nullable = true)
 |-- score: long (nullable = true)
 |-- stickied: boolean (nullable = true)
 |-- subreddit: string (nullable = true)
 |-- subreddit_id: string (nullable = true)



In [17]:
comments_filtered = comments_filtered.select(
    "id", "author", "link_id", "parent_id", "created_utc", 
    "body", "score", "gilded", "distinguished", "controversiality"
)

In [18]:
comments_filtered.show(5)

[Stage 11:>                                                         (0 + 1) / 1]

+-------+---------------+---------+----------+-------------------+--------------------+-----+------+-------------+----------------+
|     id|         author|  link_id| parent_id|        created_utc|                body|score|gilded|distinguished|controversiality|
+-------+---------------+---------+----------+-------------------+--------------------+-----+------+-------------+----------------+
|i4brjwx|        asowona|t3_u0mhg4|t1_i46rqw5|2022-04-11 18:29:05|Quick question no...|    1|     0|         null|               0|
|i4bs2qt|  AutoModerator|t3_u1egcf| t3_u1egcf|2022-04-11 18:32:23|Welcome and pleas...|    1|     0|    moderator|               0|
|i4bsprd|Simon_Petrikov_|t3_u1egcf| t3_u1egcf|2022-04-11 18:48:35|take a drink of w...|    3|     0|         null|               0|
|i4bsq61|  AutoModerator|t3_u1elit| t3_u1elit|2022-04-11 18:48:56|Welcome and pleas...|    1|     0|    moderator|               0|
|i4btavl|  AutoModerator|t3_u1eplj| t3_u1eplj|2022-04-11 19:00:49|Welcome an
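The `link_id` and `parent_id` values shown above are Reddit "fullnames": a type prefix (`t1` for a comment, `t3` for a submission) followed by an underscore and the base-36 id. A minimal sketch of how these could be interpreted follows. The helper names are hypothetical, and the sample values are taken from the first two rows of the output above.

```python
from typing import Tuple

# Reddit fullname prefixes relevant to this dataset.
KIND_BY_PREFIX = {"t1": "comment", "t3": "submission"}


def split_fullname(fullname: str) -> Tuple[str, str]:
    """Split a fullname such as 't3_u0mhg4' into (kind, id)."""
    prefix, _, base36_id = fullname.partition("_")
    if prefix not in KIND_BY_PREFIX or not base36_id:
        raise ValueError(f"not a comment or submission fullname: {fullname!r}")
    return KIND_BY_PREFIX[prefix], base36_id


def is_top_level(link_id: str, parent_id: str) -> bool:
    """A comment replies directly to the submission when its parent is the link."""
    return link_id == parent_id


print(split_fullname("t3_u0mhg4"))
print(is_top_level("t3_u0mhg4", "t1_i46rqw5"))  # first row: a reply to another comment
print(is_top_level("t3_u1egcf", "t3_u1egcf"))   # second row: a top-level comment
```

In Spark, stripping the `t3_` prefix from `link_id` (for example with `f.regexp_replace`) would give a key that could be joined against the `id` column of the submissions dataframe.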

                                                                                

#### Check for missing values

In [19]:
from pyspark.sql import Row
from pyspark.sql.functions import *
# Check for missing data in Submissions
missing_counts = [Row(column_name=col_name, missing_count=submissions_filtered.select(sum(col(col_name).isNull().cast("int")).alias(col_name)).collect()[0][col_name]) for col_name in submissions_filtered.columns]
bc_subs_missing = spark.createDataFrame(missing_counts)

                                                                                

In [20]:
# Check for missing data in Comments
missing_counts = [Row(column_name=col_name, missing_count=comments_filtered.select(sum(col(col_name).isNull().cast("int")).alias(col_name)).collect()[0][col_name]) for col_name in comments_filtered.columns]
bc_comments_missing = spark.createDataFrame(missing_counts)

                                                                                

In [21]:
bc_subs_missing.show()

                                                                                

+-------------+-------------+
|  column_name|missing_count|
+-------------+-------------+
|           id|            0|
|       author|            0|
|  created_utc|            0|
|     selftext|            0|
|        title|            0|
| num_comments|            0|
|        score|            0|
|      is_self|            0|
|      over_18|            0|
|distinguished|        17694|
+-------------+-------------+



In [22]:
bc_comments_missing.show()

+----------------+-------------+
|     column_name|missing_count|
+----------------+-------------+
|              id|            0|
|          author|            0|
|         link_id|            0|
|       parent_id|            0|
|     created_utc|            0|
|            body|            0|
|           score|            0|
|          gilded|            0|
|   distinguished|        75071|
|controversiality|            0|
+----------------+-------------+
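The missing-value cells above launch one Spark job per column. As a hedged alternative, the sketch below builds a single SQL statement that counts the nulls of every column in one pass, in keeping with the notebook's use of `spark.sql`. The helper `null_count_sql` and the view name `bc_subs_view` are hypothetical.

```python
from typing import Iterable


def null_count_sql(columns: Iterable[str], view: str) -> str:
    """Build one SELECT that counts NULLs for every column in a single scan."""
    exprs = [
        f"SUM(CASE WHEN `{c}` IS NULL THEN 1 ELSE 0 END) AS `{c}`"
        for c in columns
    ]
    if not exprs:
        raise ValueError("at least one column is required")
    return f"SELECT {', '.join(exprs)} FROM {view}"


query = null_count_sql(["id", "distinguished"], "bc_subs_view")
print(query)

# With an active SparkSession, assuming the view is registered first:
# submissions_filtered.createOrReplaceTempView("bc_subs_view")
# spark.sql(null_count_sql(submissions_filtered.columns, "bc_subs_view")).show(vertical=True)
```

The result is a single row with one count per column rather than the two-column table shown above, so it would need reshaping if the `column_name` / `missing_count` layout is required.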



In [23]:
cols_to_drop = ['distinguished']

In [24]:
bc_subs = submissions_filtered.drop(*cols_to_drop)
bc_coms = comments_filtered.drop(*cols_to_drop)
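The `distinguished` column is dropped because it is null in almost every row (17,694 of 17,695 submissions and 75,071 of 93,161 comments). The sketch below shows one way such a decision could be made from the counts rather than by hand. The helper `mostly_missing` and the 50% threshold are assumptions chosen for illustration.

```python
from typing import Dict, List


def mostly_missing(missing: Dict[str, int], total_rows: int, threshold: float = 0.5) -> List[str]:
    """Return the columns whose share of missing values exceeds `threshold`."""
    if total_rows <= 0:
        return []
    return [name for name, n in missing.items() if n / total_rows > threshold]


# Missing counts copied from the r/birthcontrol tables above.
bc_subs_missing_counts = {"id": 0, "author": 0, "selftext": 0, "distinguished": 17694}
bc_coms_missing_counts = {"id": 0, "body": 0, "distinguished": 75071, "controversiality": 0}

print(mostly_missing(bc_subs_missing_counts, 17695))  # ['distinguished']
print(mostly_missing(bc_coms_missing_counts, 93161))  # ['distinguished']
```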

In [25]:
bc_subs.write.mode("overwrite").format("parquet").save("s3a://sagemaker-us-east-1-655790771121/bc_subs")

                                                                                

In [26]:
bc_coms.write.mode("overwrite").format("parquet").save("s3a://sagemaker-us-east-1-655790771121/bc_coms")
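The write calls above hard-code the bucket name, although the notebook already stores it in `bucket` (from `sess.default_bucket()`). A minimal sketch of building the output location from that variable follows. The helper `s3a_path` is hypothetical and `example-bucket` is a placeholder.

```python
def s3a_path(bucket: str, prefix: str) -> str:
    """Join a bucket name and a key prefix into an s3a:// directory URI."""
    return f"s3a://{bucket.strip('/')}/{prefix.strip('/')}/"


# "example-bucket" is a placeholder; in the notebook the value would come from
# bucket = sess.default_bucket().
print(s3a_path("example-bucket", "bc_coms"))

# With an active SparkSession and the dataframes defined above:
# bc_coms.write.mode("overwrite").parquet(s3a_path(bucket, "bc_coms"))
```

This keeps the notebook portable across accounts, since the default bucket name includes the account id.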

                                                                                

## r/abortion

In [9]:
from pyspark.sql import Row
from pyspark.sql.functions import *

In [8]:
subreddit = 'abortion'

In [10]:
sql_str=f"select * from submissions where subreddit='{subreddit}'"
submissions_filtered = spark.sql(sql_str)
print('Total submissions:', submissions_filtered.count())



Total submissions: 6958


                                                                                

In [11]:
sql_str=f"select * from comments where subreddit='{subreddit}'"
comments_filtered = spark.sql(sql_str)
print('Total comments:', comments_filtered.count())



Total comments: 46661


                                                                                

In [12]:
submissions_filtered.cache()
comments_filtered.cache()

DataFrame[author: string, author_cakeday: boolean, author_flair_css_class: string, author_flair_text: string, body: string, can_gild: boolean, controversiality: bigint, created_utc: timestamp, distinguished: string, edited: string, gilded: bigint, id: string, is_submitter: boolean, link_id: string, parent_id: string, permalink: string, retrieved_on: timestamp, score: bigint, stickied: boolean, subreddit: string, subreddit_id: string]

#### Check for missing values

In [10]:
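# Assumption: ab_subs is never defined in this notebook; it is taken to be the filtered r/abortion submissions
ab_subs = submissions_filtered
missing_counts = [Row(column_name=col_name, missing_count=ab_subs.select(sum(col(col_name).isNull().cast("int")).alias(col_name)).collect()[0][col_name]) for col_name in ab_subs.columns]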
ab_subs_missing = spark.createDataFrame(missing_counts)

                                                                                

In [11]:
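# Assumption: ab_coms is never defined in this notebook; it is taken to be the filtered r/abortion comments
ab_coms = comments_filtered
missing_counts = [Row(column_name=col_name, missing_count=ab_coms.select(sum(col(col_name).isNull().cast("int")).alias(col_name)).collect()[0][col_name]) for col_name in ab_coms.columns]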
ab_coms_missing = spark.createDataFrame(missing_counts)

                                                                                

In [12]:
ab_subs_missing.show()

[Stage 269:>                                                        (0 + 1) / 1]

+--------------------+-------------+
|         column_name|missing_count|
+--------------------+-------------+
|  adserver_click_url|         6958|
|  adserver_imp_pixel|         6958|
|            archived|            0|
|              author|            0|
|      author_cakeday|         6946|
|author_flair_css_...|         6958|
|   author_flair_text|         6954|
|           author_id|         6958|
|          brand_safe|         6958|
|        contest_mode|            0|
|         created_utc|            0|
|    crosspost_parent|         6958|
|crosspost_parent_...|         6958|
|    disable_comments|         6958|
|       distinguished|         6958|
|              domain|          250|
|     domain_override|         6958|
|              edited|            0|
|          embed_type|         6958|
|           embed_url|         6958|
+--------------------+-------------+
only showing top 20 rows



                                                                                

In [13]:
ab_coms_missing.show()

+--------------------+-------------+
|         column_name|missing_count|
+--------------------+-------------+
|              author|            0|
|      author_cakeday|        46529|
|author_flair_css_...|        46661|
|   author_flair_text|        39299|
|                body|            0|
|            can_gild|            0|
|    controversiality|            0|
|         created_utc|            0|
|       distinguished|        39461|
|              edited|            0|
|              gilded|            0|
|                  id|            0|
|        is_submitter|            0|
|             link_id|            0|
|           parent_id|            0|
|           permalink|            0|
|        retrieved_on|            0|
|               score|            0|
|            stickied|            0|
|           subreddit|            0|
+--------------------+-------------+
only showing top 20 rows



In [14]:
ab_subs = ab_subs.select(
    "id", "author", "created_utc", "selftext", "title", 
    "num_comments", "score", "is_self", "over_18"
)

In [15]:
ab_coms = ab_coms.select(
    "id", "author", "link_id", "parent_id", "created_utc", 
    "body", "score", "gilded", "controversiality"
)

In [16]:
ab_subs.show(5)

+------+-------------+-------------------+--------------------+--------------------+------------+-----+-------+-------+
|    id|       author|        created_utc|            selftext|               title|num_comments|score|is_self|over_18|
+------+-------------+-------------------+--------------------+--------------------+------------+-----+-------+-------+
|tusi6f|   wonderjedi|2022-04-02 20:33:20|So my bf and I ha...|no period, neg pr...|           5|    2|   true|  false|
|tusnzz|    [deleted]|2022-04-02 20:40:17|           [deleted]|Gf doesn't want m...|           6|    1|   true|  false|
|tuswqb|    [deleted]|2022-04-02 20:50:53|           [deleted]|    Abortion options|           1|    1|   true|  false|
|tuszhj|      Fuuuxkk|2022-04-02 20:54:17|I had a medical a...|6 weeks post medi...|           6|    1|   true|  false|
|vfn4am|vitabellaxoxo|2022-06-19 04:22:24|Can someone help ...|Free counseling r...|           3|    1|   true|  false|
+------+-------------+------------------

In [17]:
ab_coms.show(5)

+-------+-------------+---------+----------+-------------------+--------------------+-----+------+----------------+
|     id|       author|  link_id| parent_id|        created_utc|                body|score|gilded|controversiality|
+-------+-------------+---------+----------+-------------------+--------------------+-----+------+----------------+
|ifcvhth|    [deleted]|t3_vtzg8a| t3_vtzg8a|2022-07-08 15:50:19|           [removed]|    1|     0|               0|
|ifcvt64|AutoModerator|t3_vudvwg| t3_vudvwg|2022-07-08 15:52:20|Welcome to /r/abo...|    1|     0|               0|
|ifcw36a|  P0tatoes123|t3_vtxwpx|t1_ifcc0cj|2022-07-08 15:54:06|I felt kind of al...|    1|     0|               0|
|ifcw3gd|       gayice|t3_vubjdp| t3_vubjdp|2022-07-08 15:54:09|Give yourself the...|    2|     0|               0|
|ifcwdrs|     helppp94|t3_vucomu| t3_vucomu|2022-07-08 15:55:56|I 100% understand...|    3|     0|               0|
+-------+-------------+---------+----------+-------------------+--------

In [35]:
ab_subs.write.mode("overwrite").format("parquet").save("s3a://sagemaker-us-east-1-655790771121/ab_subs/")

                                                                                

In [36]:
ab_coms.write.mode("overwrite").format("parquet").save("s3a://sagemaker-us-east-1-655790771121/ab_coms/")

                                                                                

## r/politics 

In [9]:
subreddit = 'politics'

In [10]:
sql_str=f"select * from submissions where subreddit='{subreddit}'"
submissions_politics = spark.sql(sql_str)

In [11]:
sql_str=f"select * from comments where subreddit='{subreddit}'"
comments_politics = spark.sql(sql_str)

In [15]:
sql_str_subs = f"""
SELECT 
    id, 
    author, 
    created_utc, 
    selftext, 
    title, 
    num_comments, 
    score, 
    is_self, 
    over_18, 
    distinguished
FROM 
    submissions
WHERE 
    subreddit='{subreddit}'
"""

submissions_politics = spark.sql(sql_str_subs)
print('Total submissions:', submissions_politics.count())



Total submissions: 65716


                                                                                

In [16]:
sql_str_coms = f"""
SELECT 
    id, 
    author, 
    link_id, 
    parent_id, 
    created_utc, 
    body, 
    score, 
    gilded, 
    distinguished, 
    controversiality
FROM 
    comments
WHERE 
    subreddit='{subreddit}'
"""

comments_politics = spark.sql(sql_str_coms)
print('Total comments:', comments_politics.count())



Total comments: 6850734


                                                                                

In [12]:
submissions_politics.cache()
comments_politics.cache()

DataFrame[author: string, author_cakeday: boolean, author_flair_css_class: string, author_flair_text: string, body: string, can_gild: boolean, controversiality: bigint, created_utc: timestamp, distinguished: string, edited: string, gilded: bigint, id: string, is_submitter: boolean, link_id: string, parent_id: string, permalink: string, retrieved_on: timestamp, score: bigint, stickied: boolean, subreddit: string, subreddit_id: string]

In [13]:
submissions_politics.createOrReplaceTempView("submissions_politics")
comments_politics.createOrReplaceTempView("comments_politics")

### Create a term list to filter for posts related to reproductive rights

In [14]:
# Term List A1 - General Terms:
term_list_A1 = [
    'Abortion',
    'Reproductive Rights',
    'Pro-choice',
    'Pro-life',
    'Family Planning',
    'Birth Control',
    'Contraception',
    'Women’s Health',
    'Planned Parenthood',
    'Reproductive Health',
    'Pregnancy Termination',
    'Fetal Rights',
    'Body Autonomy',
    'Roe v. Wade',
    'Dobbs v. Jackson',
]

# Term List A2 - Legal and Political Terms:
term_list_A2 = [
    'Supreme Court',
    'Legislation',
    'Law',
    'Policy',
    'SCOTUS',
    'Legal Battle',
    'State Law',
    'Federal Law',
    'Amendment',
    'Judiciary',
    'Legal Case',
    'Legal Right',
    'Civil Liberties',
    'Advocacy',
    'Activism',
]

# Term List A3 - Combined Phrases from A1 and A2:
term_list_A3 = []
for i in term_list_A1:
    for j in term_list_A2:
        term_list_A3.append(i + ' ' + j)
        term_list_A3.append(j + ' ' + i)

# Term List B - Specific Terms and Acronyms:
term_list_B = [
    'FDA',
    'RU-486',
    'Mifepristone',
    'Misoprostol',
    'Plan B',
    'Emergency Contraception',
    'Defunding',
    'Hyde Amendment',
    'Gag Rule',
    'Title X',
    'Heartbeat Bill',
    'Gestation Law',
    'Parental Consent',
    'Abortion Clinic',
]

# Term List C - Organizations and Movements:
term_list_C = [
    'NARAL',
    'ACLU',
    'NOW',
    'National Right to Life',
    'Center for Reproductive Rights',
    'Guttmacher Institute',
    'March for Life',
    'Women’s March',
    'Operation Rescue',
    'Pro-Choice America',
    'Planned Parenthood Federation',
]
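Term List A3 is built by pairing every A1 term with every A2 term in both orders, so the 15 A1 terms and 15 A2 terms yield 15 × 15 × 2 = 450 phrases. The sketch below expresses the same nested loop with `itertools.product` on a two-by-two sample. The helper name `combine_terms` is hypothetical.

```python
from itertools import product
from typing import List, Sequence


def combine_terms(first: Sequence[str], second: Sequence[str]) -> List[str]:
    """Pair every term in `first` with every term in `second`, in both orders."""
    combined = []
    for a, b in product(first, second):
        combined.append(f"{a} {b}")
        combined.append(f"{b} {a}")
    return combined


sample = combine_terms(["Abortion", "Birth Control"], ["Law", "Policy"])
print(len(sample))   # 8
print(sample[:2])    # ['Abortion Law', 'Law Abortion']
```

`product` iterates in the same order as the nested `for` loops, so the resulting list matches the original construction.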

In [15]:
search_term_list = term_list_A3 + term_list_B + term_list_C
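Several search terms are short (`NOW`, `FDA`, `ACLU`), and a case-insensitive substring match such as `LIKE '%now%'` will also hit words like "know". The sketch below illustrates the difference in plain Python using a word-boundary regular expression. It is only an illustration of the matching behavior, not the filter the notebook applies, and `contains_term` is a hypothetical helper.

```python
import re


def contains_term(text: str, term: str) -> bool:
    """True when `term` occurs in `text` as a whole word or phrase, ignoring case."""
    pattern = r"(?<!\w)" + re.escape(term) + r"(?!\w)"
    return re.search(pattern, text, flags=re.IGNORECASE) is not None


# Substring matching, as LIKE '%now%' would behave on lowercased text.
print("now" in "I know the law".lower())               # True
# Whole-word matching rejects the accidental hit and keeps the real one.
print(contains_term("I know the law", "NOW"))          # False
print(contains_term("NOW endorsed the bill", "NOW"))   # True
```

In Spark, a comparable pattern could be tried with `Column.rlike`, which uses Java regular expressions. The escaping rules differ from Python's `re.escape`, so the pattern would need to be checked before use.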

In [17]:
sql_str=f"""
    select * 
    from submissions_politics 
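    where 
        -- the view is already restricted to r/politics (and may not keep the subreddit column),
        -- so no subreddit filter is applied here
        (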
            
            -- if any of the search terms are in the post title
            ( {" or ".join([f"LOWER(title) LIKE LOWER('%{search_term}%')" for search_term in search_term_list])})
            
            -- if any of the search terms are in the post body
            or ( {" or ".join([f"LOWER(selftext) LIKE LOWER('%{search_term}%')" for search_term in search_term_list])} )
        
            -- if a term from A1 AND a term from A2 both appear in the title
            or ( 
                ({" or ".join([f"LOWER(title) LIKE LOWER('%{search_term}%')" for search_term in term_list_A1])})
                and ({" or ".join([f"LOWER(title) LIKE LOWER('%{search_term}%')" for search_term in term_list_A2])})
            )
            
        );
    """

In [18]:
submissions_politics_filtered = spark.sql(sql_str)
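The query above assembles its `LIKE` conditions with inline list comprehensions. A hedged sketch of factoring that into a helper is shown here: it lowercases each term in Python and refuses terms containing a single quote, backslash, or a `LIKE` wildcard (`%`, `_`), since those would need escaping. The helper `like_any` is hypothetical, and none of the current terms trigger the check.

```python
from typing import Iterable

# Characters that would need escaping inside a quoted LIKE pattern.
_UNSAFE = set("'\\%_")


def like_any(column: str, terms: Iterable[str]) -> str:
    """Build '(LOWER(col) LIKE ... OR ...)' matching any of the terms."""
    clauses = []
    for term in terms:
        if _UNSAFE & set(term):
            raise ValueError(f"term needs escaping before use in LIKE: {term!r}")
        clauses.append(f"LOWER({column}) LIKE '%{term.lower()}%'")
    if not clauses:
        raise ValueError("at least one search term is required")
    return "(" + " OR ".join(clauses) + ")"


print(like_any("title", ["Plan B", "Title X"]))
# (LOWER(title) LIKE '%plan b%' OR LOWER(title) LIKE '%title x%')
```

With such a helper, the three conditions in the query could be written as `like_any("title", search_term_list)`, `like_any("selftext", search_term_list)`, and the conjunction of `like_any("title", term_list_A1)` with `like_any("title", term_list_A2)`.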

In [33]:
print('Submission count:', submissions_politics_filtered.count())



Submission count: 3147


                                                                                

In [20]:
submissions_politics_filtered.write.parquet("s3a://sagemaker-us-east-1-655790771121/politics_subs/")

                                                                                

In [25]:
sql_str = f"""
    select * 
    from comments_politics 
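    where 
        -- the view is already restricted to r/politics (and may not keep the subreddit column),
        -- so no subreddit filter is applied here
        (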
            -- if any of the search terms are in the comment body
            {" or ".join([f"LOWER(body) LIKE LOWER('%{search_term}%')" for search_term in search_term_list])}
        );
"""

In [26]:
comments_politics_filtered = spark.sql(sql_str)

In [28]:
# comments_politics_filtered.write.parquet("s3a://sagemaker-us-east-1-655790771121/politics_coms/")

In [20]:
pol_subs = spark.read.parquet("s3a://sagemaker-us-east-1-655790771121/politics_subs/")

In [21]:
pol_subs = pol_subs.select(
    "id", "author", "created_utc", "selftext", "title", 
    "num_comments", "score", "is_self", "over_18"
)

In [22]:
pol_subs.show(5)

+------+--------------------+-------------------+---------+--------------------+------------+-----+-------+-------+
|    id|              author|        created_utc| selftext|               title|num_comments|score|is_self|over_18|
+------+--------------------+-------------------+---------+--------------------+------------+-----+-------+-------+
|wcuk0s|         hafsayashfa|2022-07-31 17:51:20|         |Internet's world ...|           2|    1|  false|  false|
|vrh7e7|           [deleted]|2022-07-04 20:58:46|[deleted]|'He knows that th...|           1|    1|  false|  false|
|v0g6l1|Suspicious_County_24|2022-05-29 17:36:06|         |They admitted the...|           0|    1|  false|  false|
|tt4qhb|          AdCheap475|2022-03-31 17:19:11|         |Upcoming politica...|           0|    1|  false|  false|
|u355fk|       CapitalCourse|2022-04-14 00:54:03|         |Maryland Lawmaker...|         189| 4360|  false|  false|
+------+--------------------+-------------------+---------+-------------

                                                                                

In [24]:
pol_subs.write.mode("overwrite").format("parquet").save("s3a://sagemaker-us-east-1-655790771121/pol_subs/")

                                                                                