## Prepare Session

In [1]:
%%configure -f
{
    "conf": {
        "spark.pyspark.python": "python3",
        "spark.pyspark.virtualenv.enabled": "true",
        "spark.pyspark.virtualenv.type":"native",
        "spark.pyspark.virtualenv.bin.path":"/usr/bin/virtualenv"
    }
}

In [2]:
import pyspark.sql.functions as F

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1716248275962_0002,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Explore data

In [3]:
df_parquet = spark.read.parquet('s3://finalproject-nat-s3/submissions_parquet/*.parquet')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
#Show many rows to ensure correct parsing
df_parquet.show(100)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------+----------------+--------------------+-----+--------------------+--------------------+------------+
|    id|         created|              author|score|               title|            selftext|num_comments|
+------+----------------+--------------------+-----+--------------------+--------------------+------------+
|1fceq2|2013-05-30 11:28|           u/rlh1271|   59|24m My girlfriend...|Weve been dating ...|         124|
| cw6d6|2010-08-01 16:23|u/longdistance_throw|    4|What are some goo...|Hi friends youve ...|          12|
|1j2am7|2013-07-25 18:54|         u/[deleted]|    2|24m My wife 23f C...|Hi Reddit Been co...|           7|
|16k90k|2013-01-14 12:04|        u/RoboDinner|    1|Need gift advice ...|Ive been seeing a...|          13|
| yvqn2|2012-08-26 20:45|   u/MaddenInGeneral|    1|21m Help dealing ...|I need some help ...|           6|
| bn3ib|2010-04-06 07:21|         u/[deleted]|    2|Unhealthy relatio...|Note sorry but to...|          15|
|14a0go|2012-12-04 14:00|   

In [5]:
#See structure of the dataframe 
print('Total Columns: %d' % len(df_parquet.dtypes))
print('Total Rows: %d' % df_parquet.count())
df_parquet.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Total Columns: 7
Total Rows: 4357389
root
 |-- id: string (nullable = true)
 |-- created: string (nullable = true)
 |-- author: string (nullable = true)
 |-- score: string (nullable = true)
 |-- title: string (nullable = true)
 |-- selftext: string (nullable = true)
 |-- num_comments: string (nullable = true)

## Remove rows with missing "selftext"

For context, selftext refers to the body text of the submission


In [6]:
#It is necessary to clean the following:
#deleted, removed, empty/null 

filtered_data = df_parquet.filter(df_parquet['selftext'] != "deleted")
filtered_data = filtered_data.filter(filtered_data['selftext'] != "removed")
filtered_data = filtered_data.filter(filtered_data['selftext'] != "null")
filtered_data = filtered_data.filter(F.trim(F.col('selftext')) != "")

#See how many rows remain
print(df_parquet.count() - filtered_data.count(), "rows removed")
print(filtered_data.count(), "rows remaining")

#Visualize structure of values
filtered_data.show(10)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

2698497 rows removed
1658892 rows remaining
+------+----------------+--------------------+-----+--------------------+--------------------+------------+
|    id|         created|              author|score|               title|            selftext|num_comments|
+------+----------------+--------------------+-----+--------------------+--------------------+------------+
|1fceq2|2013-05-30 11:28|           u/rlh1271|   59|24m My girlfriend...|Weve been dating ...|         124|
| cw6d6|2010-08-01 16:23|u/longdistance_throw|    4|What are some goo...|Hi friends youve ...|          12|
|1j2am7|2013-07-25 18:54|         u/[deleted]|    2|24m My wife 23f C...|Hi Reddit Been co...|           7|
|16k90k|2013-01-14 12:04|        u/RoboDinner|    1|Need gift advice ...|Ive been seeing a...|          13|
| yvqn2|2012-08-26 20:45|   u/MaddenInGeneral|    1|21m Help dealing ...|I need some help ...|           6|
| bn3ib|2010-04-06 07:21|         u/[deleted]|    2|Unhealthy relatio...|Note sorry but to..

For some reason, null values aren't being captured with traditional methods, so I believe they are being deleted when I delete empty strings (as they appeared as empty strings in the csv and maybe the "null" is just a rendering thing here. 

Therefore, I'll test if they're indeed being deleted by comparing the df before and after cleaning

In [7]:
null_test1 = df_parquet.filter(df_parquet['id'] == "hthy4")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
null_test1.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----+----------------+-----------+-----+--------------------+--------+------------+
|   id|         created|     author|score|               title|selftext|num_comments|
+-----+----------------+-----------+-----+--------------------+--------+------------+
|hthy4|2011-06-07 00:22|u/[deleted]|    1|Right behind me o...|    null|           1|
+-----+----------------+-----------+-----+--------------------+--------+------------+

In [9]:
null_test2 = filtered_data.filter(filtered_data['id'] == "hthy4")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [10]:
null_test2.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---+-------+------+-----+-----+--------+------------+
| id|created|author|score|title|selftext|num_comments|
+---+-------+------+-----+-----+--------+------------+
+---+-------+------+-----+-----+--------+------------+

I could observe that in fact, "null" values in the selftext are already deleted

## Remove duplicates

In [11]:
# Drop duplicates based on the "selftext" column only
# https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.dropDuplicates.html

no_duplicates_df = filtered_data.dropDuplicates(['selftext'])

#See how many rows remain
print(filtered_data.count() - no_duplicates_df.count(), "rows removed")
print(no_duplicates_df.count(), "rows remaining")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

12485 rows removed
1646407 rows remaining

In [12]:
no_duplicates_df.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+----------------+--------------------+-----+--------------------+--------------------+------------+
|     id|         created|              author|score|               title|            selftext|num_comments|
+-------+----------------+--------------------+-----+--------------------+--------------------+------------+
|17zi1dd|2023-11-20 00:16|  u/ThrowRA867530910|    1|I 26M dont know w...|		So here is the ...|           3|
| 26qmxz|2014-05-28 18:20|       u/wetwilly850|    1|20m Problems with...|	I have a friend ...|           2|
| bzvord|2019-06-12 14:40|    u/waitingonfedex|    0|boyfriend doesnt ...|	Me 23 M and my p...|           0|
|169r4ld|2023-09-04 07:49|u/throwRAsltwaterbae|    1|28F32M  why would...|                 ...|           1|
|168k3h9|2023-09-02 20:37|u/throwRAuglyasal...|    1|28F32M  why would...|                 ...|           1|
| 4a4dcd|2016-03-12 10:19|    u/kabinotski0919|    0|Help me 23m my gi...|                 ...|           1|
| hb4dzu|2020-06-17

## Feature Engineering

In [32]:
# Combine title and self text into a single column
df = no_duplicates_df.withColumn("entire_text", F.lower(F.concat(F.col("title"), F.col("selftext"))))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [33]:
#I asked ChatGPT "How can I do this [df.select("title", "selftext", "entire_text").show(3)] 
#so it does not truncate the text?"" so I could see it complete and ensure the concatenation worked 

df.select("entire_text").show(3, truncate=False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [35]:
#Create column that contains only the year
#https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.year.html

df = df.withColumn("year", F.year(F.col("created")))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [36]:
df.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+----------------+--------------------+-----+--------------------+--------------------+------------+--------------------+----+
|     id|         created|              author|score|               title|            selftext|num_comments|         entire_text|year|
+-------+----------------+--------------------+-----+--------------------+--------------------+------------+--------------------+----+
|17zi1dd|2023-11-20 00:16|  u/ThrowRA867530910|    1|I 26M dont know w...|		So here is the ...|           3|i 26m dont know w...|2023|
| 26qmxz|2014-05-28 18:20|       u/wetwilly850|    1|20m Problems with...|	I have a friend ...|           2|20m problems with...|2014|
| bzvord|2019-06-12 14:40|    u/waitingonfedex|    0|boyfriend doesnt ...|	Me 23 M and my p...|           0|boyfriend doesnt ...|2019|
|169r4ld|2023-09-04 07:49|u/throwRAsltwaterbae|    1|28F32M  why would...|                 ...|           1|28f32m  why would...|2023|
|168k3h9|2023-09-02 20:37|u/throwRAuglyasal...|    1|28

## Upload prepared data into S3 

In [64]:
#Write cleaned data into new files
df_repartitioned = df.repartition(10)
df_repartitioned.write.parquet("s3://finalproject-nat-s3/prepared-data", mode = 'overwrite')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…