# Subreddit Growth: How many unique subreddits were there at the beginning of 2018?

In [3]:
# Read the January 2018 comments from the HDFS cluster on orion11.
# This is the reservoir-sampled file (RES- prefix): Spark ran out of memory when reading the full data set.
df_2018_jan = spark.read.json('hdfs://orion11:23001/RES-RC_2018-01.zst')
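
The notebook does not say how the `RES-` files were produced. As a rough illustration only, the sketch below shows classic reservoir sampling (Algorithm R), which keeps a uniform random sample of `k` items from a stream without holding the whole stream in memory. The function name `reservoir_sample` and its parameters are assumptions made for this example, not part of the original pipeline.

```python
import random
from typing import Iterable, List, Optional, TypeVar

T = TypeVar("T")


def reservoir_sample(stream: Iterable[T], k: int, seed: Optional[int] = None) -> List[T]:
    """Return up to k items drawn uniformly at random from a stream of unknown length."""
    rng = random.Random(seed)
    reservoir: List[T] = []
    for i, item in enumerate(stream):
        if i < k:
            # Fill the reservoir with the first k items.
            reservoir.append(item)
        else:
            # Item i replaces a random slot with probability k / (i + 1).
            j = rng.randint(0, i)  # randint is inclusive on both ends
            if j < k:
                reservoir[j] = item
    return reservoir


# Hypothetical usage: sample 5 values from a stream of one million integers.
sample = reservoir_sample(range(1_000_000), k=5, seed=42)
print(sample)
```

Because every result below is computed on a sample of this kind, counts of distinct subreddits or authors are lower bounds on the true values.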

In [4]:
# Get schema
df_2018_jan.printSchema()

root
 |-- author: string (nullable = true)
 |-- author_cakeday: boolean (nullable = true)
 |-- author_flair_css_class: string (nullable = true)
 |-- author_flair_text: string (nullable = true)
 |-- body: string (nullable = true)
 |-- can_gild: boolean (nullable = true)
 |-- controversiality: long (nullable = true)
 |-- created_utc: long (nullable = true)
 |-- distinguished: string (nullable = true)
 |-- edited: string (nullable = true)
 |-- gilded: long (nullable = true)
 |-- id: string (nullable = true)
 |-- is_submitter: boolean (nullable = true)
 |-- link_id: string (nullable = true)
 |-- parent_id: string (nullable = true)
 |-- permalink: string (nullable = true)
 |-- retrieved_on: long (nullable = true)
 |-- score: long (nullable = true)
 |-- stickied: boolean (nullable = true)
 |-- subreddit: string (nullable = true)
 |-- subreddit_id: string (nullable = true)
 |-- subreddit_type: string (nullable = true)



In [5]:
df_2018_jan.createOrReplaceTempView("df_view_2018_jan")

# Subreddit Growth: How many unique subreddits were there at the beginning of 2018?
spark.sql("SELECT DISTINCT subreddit AS unique_subreddits FROM df_view_2018_jan").count()

50974
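
To make clear what `SELECT DISTINCT subreddit ... .count()` measures, here is a minimal pure-Python sketch of the same idea on a few hand-written JSON lines. The records and the helper `count_distinct` are hypothetical and exist only for illustration. Two details carry over to the Spark query: string comparison is case-sensitive, and a missing value is kept as one distinct entry of its own.

```python
import json

# Hypothetical JSON-lines records, shaped like the comment dump.
lines = [
    '{"author": "a", "subreddit": "python"}',
    '{"author": "b", "subreddit": "AskReddit"}',
    '{"author": "c", "subreddit": "python"}',
    '{"author": "d", "subreddit": "askreddit"}',
]


def count_distinct(json_lines, field):
    """Count the distinct values of one field across JSON-lines records."""
    seen = set()
    for line in json_lines:
        record = json.loads(line)
        # A missing field becomes None, which is counted once,
        # mirroring how DISTINCT keeps a single NULL row.
        seen.add(record.get(field))
    return len(seen)


print(count_distinct(lines, "subreddit"))
```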

In [6]:
df_2018_dec = spark.read.json('hdfs://orion11:23001/RES-RC_2018-12.zst')

In [7]:
df_2018_dec.printSchema()

root
 |-- archived: boolean (nullable = true)
 |-- author: string (nullable = true)
 |-- author_cakeday: boolean (nullable = true)
 |-- author_created_utc: long (nullable = true)
 |-- author_flair_background_color: string (nullable = true)
 |-- author_flair_css_class: string (nullable = true)
 |-- author_flair_richtext: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- a: string (nullable = true)
 |    |    |-- e: string (nullable = true)
 |    |    |-- t: string (nullable = true)
 |    |    |-- u: string (nullable = true)
 |-- author_flair_template_id: string (nullable = true)
 |-- author_flair_text: string (nullable = true)
 |-- author_flair_text_color: string (nullable = true)
 |-- author_flair_type: string (nullable = true)
 |-- author_fullname: string (nullable = true)
 |-- author_patreon_flair: boolean (nullable = true)
 |-- body: string (nullable = true)
 |-- can_gild: boolean (nullable = true)
 |-- can_mod_post: boolean (nullable = true)
 |-

In [8]:
df_2018_dec.createOrReplaceTempView("df_view_2018_dec")

# Subreddit Growth: How many unique subreddits were there at the end of 2018?
spark.sql("SELECT DISTINCT(subreddit) AS unique_subreddits FROM df_view_2018_dec").count()

56146

In [9]:
# Print output answer
print("The number of unique subreddits at the beginning of 2018: {}".format(Out[5]))
print("The number of unique subreddits at the end of 2018: {}".format(Out[8]))

The number of unique subreddits at the beginning of 2018: 50974
The number of unique subreddits at the end of 2018: 56146


# User Growth: How many active users does Reddit have now compared to the past?

In [5]:
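# Read data from the HDFS cluster on orion11.
# Compare the earliest month of Reddit comments (December 2005) with the last month in the data set (December 2020).
# Note: the DataFrame is named df_2005_jan, but the file it loads is RC_2005-12.
# The 2005 file is the full dump, while the 2020 file is a reservoir sample (RES- prefix).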
df_2005_jan = spark.read.json('hdfs://orion11:23001/RC_2005-12.bz2')
df_2020_dec = spark.read.json('hdfs://orion11:23001/RES-RC_2020-12.zst')

In [10]:
# Get 2005 schema
df_2005_jan.printSchema()

root
 |-- author: string (nullable = true)
 |-- author_flair_css_class: string (nullable = true)
 |-- author_flair_text: string (nullable = true)
 |-- body: string (nullable = true)
 |-- controversiality: long (nullable = true)
 |-- created_utc: long (nullable = true)
 |-- distinguished: string (nullable = true)
 |-- edited: boolean (nullable = true)
 |-- gilded: long (nullable = true)
 |-- id: string (nullable = true)
 |-- link_id: string (nullable = true)
 |-- parent_id: string (nullable = true)
 |-- retrieved_on: long (nullable = true)
 |-- score: long (nullable = true)
 |-- stickied: boolean (nullable = true)
 |-- subreddit: string (nullable = true)
 |-- subreddit_id: string (nullable = true)
 |-- ups: long (nullable = true)



In [13]:
# Get 2020 schema
df_2020_dec.printSchema()

root
 |-- all_awardings: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- award_sub_type: string (nullable = true)
 |    |    |-- award_type: string (nullable = true)
 |    |    |-- awardings_required_to_grant_benefits: long (nullable = true)
 |    |    |-- coin_price: long (nullable = true)
 |    |    |-- coin_reward: long (nullable = true)
 |    |    |-- count: long (nullable = true)
 |    |    |-- days_of_drip_extension: long (nullable = true)
 |    |    |-- days_of_premium: long (nullable = true)
 |    |    |-- description: string (nullable = true)
 |    |    |-- end_date: string (nullable = true)
 |    |    |-- giver_coin_reward: long (nullable = true)
 |    |    |-- icon_format: string (nullable = true)
 |    |    |-- icon_height: long (nullable = true)
 |    |    |-- icon_url: string (nullable = true)
 |    |    |-- icon_width: long (nullable = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- is_enabled: boolean (nullable = t

In [18]:
# December 2005 (the DataFrame is named df_2005_jan, but it holds RC_2005-12)
# Assumption: an active user is an author who posted at least one comment within the month being analyzed.
df_2005_jan.createOrReplaceTempView("df_view_2005_jan")

# User Growth: count the distinct authors in December 2005
active_user_2005_jan = spark.sql("SELECT DISTINCT author AS active_user FROM df_view_2005_jan").count()


In [None]:
# December 2020
# Assumption: an active user is an author who posted at least one comment within the month being analyzed.
df_2020_dec.createOrReplaceTempView("df_view_2020_dec")

# User Growth: count the distinct authors in December 2020
active_user_2020_dec = spark.sql("SELECT DISTINCT author AS active_user FROM df_view_2020_dec").count()


In [23]:
# Print the results
print("The number of active users in Dec 2005: {}".format(active_user_2005_jan))
print("The number of active users in Dec 2020: {}".format(active_user_2020_dec))
# Note: this divides by the Dec 2020 count, so it is the share of Dec 2020 users gained since Dec 2005, not growth relative to the 2005 baseline
print("Increase as a percentage of the Dec 2020 count: {}".format((active_user_2020_dec - active_user_2005_jan)/active_user_2020_dec * 100))

The number of active users in Dec 2005: 394
The number of active users in Dec 2020: 2906214
Increase as a percentage of the Dec 2020 count: 99.98644284281887
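
The percentage printed above divides the increase by the later count. Growth is more commonly reported relative to the earlier count, which gives a very different number here. The sketch below contrasts the two formulas using the counts reported above; the function names are hypothetical. Keep in mind that the 2005 count comes from a full dump and the 2020 count from a reservoir sample, so neither figure is a like-for-like comparison.

```python
def percent_growth(old, new):
    """Growth relative to the starting value: (new - old) / old * 100."""
    if old == 0:
        raise ValueError("growth is undefined when the starting value is 0")
    return (new - old) / old * 100


def increase_as_share_of_new(old, new):
    """Increase as a share of the ending value: (new - old) / new * 100."""
    if new == 0:
        raise ValueError("share is undefined when the ending value is 0")
    return (new - old) / new * 100


# Counts reported by the cells above.
old_users = 394
new_users = 2906214

print("Relative to the earlier count: {:.2f}%".format(percent_growth(old_users, new_users)))
print("As a share of the later count: {:.2f}%".format(increase_as_share_of_new(old_users, new_users)))
print("Ratio later/earlier: {:.1f}x".format(new_users / old_users))
```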


# Best Comment Award: Choose a particular day and determine what the most upvoted comment was. (Include the comment in your report, of course!)

In [48]:
# Find the maximum score on 2020-12-01
spark.sql("SELECT MAX(score) as max_score FROM (SELECT * FROM df_view_2020_dec WHERE from_unixtime(created_utc, 'MM dd yyyy') == '12 01 2020') AS t").show()


+---------+
|max_score|
+---------+
|    26280|
+---------+
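
One caveat about the date filter: `from_unixtime` formats the timestamp in the Spark session time zone, so which comments fall on "December 1" may shift if the session is not set to UTC. A hedged alternative is to compute explicit UTC bounds for the day and compare `created_utc` against them directly. The helper names below are assumptions made for this sketch.

```python
from datetime import datetime, timedelta, timezone


def day_bounds_utc(year, month, day):
    """Return (start, end) epoch seconds for a UTC calendar day, end exclusive."""
    start = datetime(year, month, day, tzinfo=timezone.utc)
    end = start + timedelta(days=1)
    return int(start.timestamp()), int(end.timestamp())


start_ts, end_ts = day_bounds_utc(2020, 12, 1)


def in_day(created_utc):
    """True if the epoch timestamp falls inside the chosen UTC day."""
    return start_ts <= created_utc < end_ts


print(start_ts, end_ts)
```

The two bounds could then be placed in a filter such as `WHERE created_utc >= <start> AND created_utc < <end>`, which does not depend on any time zone setting.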



In [59]:
# Get the most upvoted comment and author
most_upvoted_comment = spark.sql("SELECT author, body as most_upvoted_comment FROM df_view_2020_dec WHERE score == 26280 AND from_unixtime(created_utc, 'MM dd yyyy') == '12 01 2020'")
most_upvoted_comment.cache()
most_upvoted_comment.show()

+--------+--------------------+
|  author|most_upvoted_comment|
+--------+--------------------+
|ThatKiwi|I've been working...|
+--------+--------------------+



In [62]:
# Printing the whole comment
author_comment = most_upvoted_comment.collect()
print("author: ", author_comment[0][0])
print("comment: ", author_comment[0][1])

author:  ThatKiwi
comment:  I've been working from home and as my employer, like many others, are soul sucking demons not fit for this earth, they track you in just about every way possible because it's simply inconceivable to think people would just do the work they're paid to do, regardless of the setting they're in.

If I dont interact with the computer for more than five minutes it registers me as away, heavens forbid I have to poop or something. 

I got spoken to the very first week of WFH and told "you have to be available during work hours" by a supervisor who is seemingly never available but was blind to the irony I guess.

In any event, I found that if you navigate to a certain portion of our 1970s software you can endlessly type. I promptly went outside and found a rock which now holds down my space bar for me anytime I need to walk away for a minute. 

That rock is, in my eyes, now one of the most important objects in my world and it was free.

Edit: Fixed typos. I feel sham

# Top Comments: For the user you found in the previous question, find their five most-upvoted comments overall across the entire dataset. Do they post highly-upvoted comments often, or are they a “one hit wonder?”

In [64]:
# Get the top 5 upvoted comments from the author
top_five_upvoted_comment = spark.sql("SELECT author, body, score FROM df_view_2020_dec WHERE author == 'ThatKiwi' ORDER BY score DESC LIMIT 5")
top_five_upvoted_comment.show()

+--------+--------------------+-----+
|  author|                body|score|
+--------+--------------------+-----+
|ThatKiwi|I've been working...|26280|
+--------+--------------------+-----+



Apparently the user did not post any other comments in December 2020, so this looks like a "one hit wonder". However, the query only covers the December 2020 reservoir sample rather than the entire dataset, so the author may well have other comments that are not included here.
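
Answering the question across the entire dataset would mean pooling every month before ranking. The pure-Python sketch below shows that logic on made-up records: the months, the author name `example_user`, the scores, and the helper `top_comments` are all hypothetical.

```python
import heapq
from itertools import chain

# Hypothetical records grouped by month; real data would come from each monthly file.
comments_by_month = {
    "2020-10": [
        {"author": "example_user", "body": "first", "score": 12},
        {"author": "someone_else", "body": "other", "score": 900},
    ],
    "2020-11": [
        {"author": "example_user", "body": "second", "score": 340},
    ],
    "2020-12": [
        {"author": "example_user", "body": "third", "score": 26280},
        {"author": "example_user", "body": "fourth", "score": 3},
    ],
}


def top_comments(by_month, author, n=5):
    """Return the author's n highest-scoring comments across all months."""
    all_comments = chain.from_iterable(by_month.values())
    own = (c for c in all_comments if c["author"] == author)
    return heapq.nlargest(n, own, key=lambda c: c["score"])


for comment in top_comments(comments_by_month, "example_user"):
    print(comment["score"], comment["body"])
```

A large gap between the first and second scores would support the "one hit wonder" reading, while several high scores would argue against it.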

# Ban Hammer: Based on user activity, determine which subreddits have been recently banned.

In [None]:
# Load the November 2020 data
df_2020_nov = spark.read.json('hdfs://orion11:23001/RES-RC_2020-11.zst')
df_2020_nov.createOrReplaceTempView("df_view_2020_nov")

In [None]:
# Find subreddits with more than 100 comments in November 2020 that do not appear at all in December 2020.
# This does not guarantee that a subreddit was banned, because both months are reservoir samples.
recent_banned_subreddit = spark.sql("SELECT DISTINCT subreddit FROM (SELECT subreddit FROM (SELECT subreddit, COUNT(*) AS num_of_comments FROM df_view_2020_nov GROUP BY subreddit) AS t1 WHERE num_of_comments > 100 AND subreddit NOT IN (SELECT DISTINCT subreddit FROM df_view_2020_dec)) AS t3")

recent_banned_subreddit.cache()
recent_banned_subreddit.show()

In [8]:
recent_banned_subreddit.count()

3
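
The query above is essentially a thresholded set difference: subreddits that were busy in the earlier month and are absent from the later one. The sketch below restates that logic in plain Python on toy data; the subreddit names and the helper `vanished_subreddits` are hypothetical. It also drops missing values explicitly, since in SQL a `NOT IN` subquery that returns a NULL matches no rows at all, which is worth checking for in the real data.

```python
from collections import Counter


def vanished_subreddits(earlier, later, min_comments=100):
    """Subreddits with more than min_comments in `earlier` that never appear in `later`."""
    counts = Counter(name for name in earlier if name is not None)
    later_names = {name for name in later if name is not None}
    return sorted(
        name
        for name, n in counts.items()
        if n > min_comments and name not in later_names
    )


# Toy data: one subreddit column per month.
november = ["alpha"] * 150 + ["beta"] * 150 + ["gamma"] * 5 + [None]
december = ["alpha"] * 120 + [None]

print(vanished_subreddits(november, december))
```

Here `gamma` is also missing from the later month but falls below the activity threshold, so it is not reported; the threshold is what keeps low-traffic subreddits that simply missed the sample from being flagged.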

In [10]:
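# Count the worker executors in the cluster.
# getExecutorInfos() typically lists the driver as well, hence the -1.
sc = spark._jsc.sc()
n_workers = len([executor.host() for executor in sc.statusTracker().getExecutorInfos()]) - 1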

print(n_workers)

10
