# Site Analysis

## Subreddit Growth

### How many unique subreddits were there at the beginning and end of 2018?

I interpret the beginning and end of 2018 to be the first and last months of 2018. So to answer this question, I will find the number of unique subreddits which had comments posted to them in Januray 2018 and the same number for December 2018 and compare these two numbers to analyze subreddit growth.

In [3]:
df_jan_18_res = spark.read.json('hdfs://orion11:13001/RES-RC_2018-01.zst')

In [4]:
df_jan_18_res.printSchema()

root
 |-- author: string (nullable = true)
 |-- author_cakeday: boolean (nullable = true)
 |-- author_flair_css_class: string (nullable = true)
 |-- author_flair_text: string (nullable = true)
 |-- body: string (nullable = true)
 |-- can_gild: boolean (nullable = true)
 |-- controversiality: long (nullable = true)
 |-- created_utc: long (nullable = true)
 |-- distinguished: string (nullable = true)
 |-- edited: string (nullable = true)
 |-- gilded: long (nullable = true)
 |-- id: string (nullable = true)
 |-- is_submitter: boolean (nullable = true)
 |-- link_id: string (nullable = true)
 |-- parent_id: string (nullable = true)
 |-- permalink: string (nullable = true)
 |-- retrieved_on: long (nullable = true)
 |-- score: long (nullable = true)
 |-- stickied: boolean (nullable = true)
 |-- subreddit: string (nullable = true)
 |-- subreddit_id: string (nullable = true)
 |-- subreddit_type: string (nullable = true)



In [5]:
# Returns the estimated cardinality by HyperLogLog++
from pyspark.sql.functions import approx_count_distinct

approx_count = df_jan_18_res.select(approx_count_distinct("subreddit_id"))

In [6]:
jan_approx_count_list = approx_count.collect()

In [7]:
jan_approx_count_value = jan_approx_count_list[0][0]

In [8]:
jan_approx_count_value

51805

In [9]:
df_dec_18_res = spark.read.json('hdfs://orion11:13001/RES-RC_2018-12.zst')

In [10]:
df_dec_18_res.printSchema()

root
 |-- archived: boolean (nullable = true)
 |-- author: string (nullable = true)
 |-- author_cakeday: boolean (nullable = true)
 |-- author_created_utc: long (nullable = true)
 |-- author_flair_background_color: string (nullable = true)
 |-- author_flair_css_class: string (nullable = true)
 |-- author_flair_richtext: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- a: string (nullable = true)
 |    |    |-- e: string (nullable = true)
 |    |    |-- t: string (nullable = true)
 |    |    |-- u: string (nullable = true)
 |-- author_flair_template_id: string (nullable = true)
 |-- author_flair_text: string (nullable = true)
 |-- author_flair_text_color: string (nullable = true)
 |-- author_flair_type: string (nullable = true)
 |-- author_fullname: string (nullable = true)
 |-- author_patreon_flair: boolean (nullable = true)
 |-- body: string (nullable = true)
 |-- can_gild: boolean (nullable = true)
 |-- can_mod_post: boolean (nullable = true)
 |-

In [11]:
dec_approx_count = df_dec_18_res.select(approx_count_distinct("subreddit_id"))

In [12]:
dec_approx_count_list = dec_approx_count.collect()

In [15]:
dec_approx_count_value = dec_approx_count_list[0][0]
dec_approx_count_value

56457

In [16]:
net_subreddit_growth = dec_approx_count_value - jan_approx_count_value
net_subreddit_growth

4652

In [19]:
percent_subreddit_growth = (net_subreddit_growth / jan_approx_count_value) * 100
percent_subreddit_growth

8.979828201911012

### Answer

The number of subreddits grew approximately 9% in 2018. There were approximately 51805 unique subreddits in the beginning of 2018 and approximately 56457 unique subreddits at the end of 2018 in the reservoir sampled data. This is an increase of 4652 subreddits or almost 9% growth in subreddits. While this answer is approximate because I used the sampled data and used the HyperLogLog++ algorithm which provides a cardinality estimate, I believe the 9% growth in the number of subreddits is a good estimate and is more accurate than the estimated number of unique subreddits because I used the sampled data.

## User Growth
### How many active users does Reddit have now compared to the past?

We can define active users as monthly active users. We can define "now" as the latest month of data which is December 2020. How do we define the past? Let's look at user growth over a decade so we can define the "the past" as December 2010 and look at the growth in monthly active users over 10 years. Let's define a monthly active user as a Reddit account which posted at least one comment in that month.

In [10]:
df_dec_10 = spark.read.json('hdfs://orion11:13001/RC_2010-12.bz2')

In [11]:
df_dec_10.printSchema()

root
 |-- archived: boolean (nullable = true)
 |-- author: string (nullable = true)
 |-- author_flair_css_class: string (nullable = true)
 |-- author_flair_text: string (nullable = true)
 |-- body: string (nullable = true)
 |-- controversiality: long (nullable = true)
 |-- created_utc: string (nullable = true)
 |-- distinguished: string (nullable = true)
 |-- downs: long (nullable = true)
 |-- edited: string (nullable = true)
 |-- gilded: long (nullable = true)
 |-- id: string (nullable = true)
 |-- link_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- parent_id: string (nullable = true)
 |-- retrieved_on: long (nullable = true)
 |-- score: long (nullable = true)
 |-- score_hidden: boolean (nullable = true)
 |-- subreddit: string (nullable = true)
 |-- subreddit_id: string (nullable = true)
 |-- ups: long (nullable = true)



In [12]:
# Redefine the schema because we are only interested in users

df_dec_10_author = df_dec_10.select('author')

In [13]:
# Removes duplicate users

df_dec_10_distinct_users = df_dec_10_author.distinct()

In [15]:
dec_10_users = df_dec_10_distinct_users.count()
dec_10_users

243810

In [7]:
# java.lang.InternalError: Frame requires too much memory for decoding 😱

df_dec_20 = spark.read.json('hdfs://orion11:13001/RC_2020-12.zst')

Py4JJavaError: An error occurred while calling o47.json.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 7.0 failed 4 times, most recent failure: Lost task 0.3 in stage 7.0 (TID 64) (10.0.1.26 executor 6): java.lang.InternalError: Frame requires too much memory for decoding
	at org.apache.hadoop.io.compress.zstd.ZStandardDecompressor.inflateBytesDirect(Native Method)
	at org.apache.hadoop.io.compress.zstd.ZStandardDecompressor.decompress(ZStandardDecompressor.java:187)
	at org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:111)
	at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:105)
	at java.base/java.io.InputStream.read(InputStream.java:205)
	at org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:191)
	at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:227)
	at org.apache.hadoop.util.LineReader.readLine(LineReader.java:185)
	at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.skipUtfByteOrderMark(LineRecordReader.java:158)
	at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:198)
	at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
	at org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.hasNext(HadoopFileLinesReader.scala:69)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:116)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:274)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:116)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator.isEmpty(Iterator.scala:387)
	at scala.collection.Iterator.isEmpty$(Iterator.scala:387)
	at scala.collection.AbstractIterator.isEmpty(Iterator.scala:1431)
	at scala.collection.TraversableOnce.reduceLeftOption(TraversableOnce.scala:249)
	at scala.collection.TraversableOnce.reduceLeftOption$(TraversableOnce.scala:248)
	at scala.collection.AbstractIterator.reduceLeftOption(Iterator.scala:1431)
	at scala.collection.TraversableOnce.reduceOption(TraversableOnce.scala:256)
	at scala.collection.TraversableOnce.reduceOption$(TraversableOnce.scala:256)
	at scala.collection.AbstractIterator.reduceOption(Iterator.scala:1431)
	at org.apache.spark.sql.catalyst.json.JsonInferSchema.$anonfun$infer$1(JsonInferSchema.scala:103)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:855)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:855)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2228)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2323)
	at org.apache.spark.sql.catalyst.json.JsonInferSchema.infer(JsonInferSchema.scala:116)
	at org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource$.$anonfun$inferFromDataset$5(JsonDataSource.scala:110)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
	at org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource$.inferFromDataset(JsonDataSource.scala:110)
	at org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource$.infer(JsonDataSource.scala:99)
	at org.apache.spark.sql.execution.datasources.json.JsonDataSource.inferSchema(JsonDataSource.scala:65)
	at org.apache.spark.sql.execution.datasources.json.JsonFileFormat.inferSchema(JsonFileFormat.scala:59)
	at org.apache.spark.sql.execution.datasources.DataSource.$anonfun$getOrInferFileFormatSchema$11(DataSource.scala:210)
	at scala.Option.orElse(Option.scala:447)
	at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:207)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:411)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:228)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:210)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:210)
	at org.apache.spark.sql.DataFrameReader.json(DataFrameReader.scala:361)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.InternalError: Frame requires too much memory for decoding
	at org.apache.hadoop.io.compress.zstd.ZStandardDecompressor.inflateBytesDirect(Native Method)
	at org.apache.hadoop.io.compress.zstd.ZStandardDecompressor.decompress(ZStandardDecompressor.java:187)
	at org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:111)
	at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:105)
	at java.base/java.io.InputStream.read(InputStream.java:205)
	at org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:191)
	at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:227)
	at org.apache.hadoop.util.LineReader.readLine(LineReader.java:185)
	at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.skipUtfByteOrderMark(LineRecordReader.java:158)
	at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:198)
	at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
	at org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.hasNext(HadoopFileLinesReader.scala:69)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:116)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:274)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:116)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator.isEmpty(Iterator.scala:387)
	at scala.collection.Iterator.isEmpty$(Iterator.scala:387)
	at scala.collection.AbstractIterator.isEmpty(Iterator.scala:1431)
	at scala.collection.TraversableOnce.reduceLeftOption(TraversableOnce.scala:249)
	at scala.collection.TraversableOnce.reduceLeftOption$(TraversableOnce.scala:248)
	at scala.collection.AbstractIterator.reduceLeftOption(Iterator.scala:1431)
	at scala.collection.TraversableOnce.reduceOption(TraversableOnce.scala:256)
	at scala.collection.TraversableOnce.reduceOption$(TraversableOnce.scala:256)
	at scala.collection.AbstractIterator.reduceOption(Iterator.scala:1431)
	at org.apache.spark.sql.catalyst.json.JsonInferSchema.$anonfun$infer$1(JsonInferSchema.scala:103)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:855)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:855)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more


In [1]:
# Let's try with the reservoir sample instead of the full month of data

df_dec_20 = spark.read.json('hdfs://orion11:13001/RES-RC_2020-12.zst').select('author')

In [4]:
df_dec_20_distinct_users = df_dec_20.distinct()

In [5]:
dec_20_users = df_dec_20_distinct_users.count()
dec_20_users

2906214

In [6]:
# Copy and pasted after running `hdfs dfs -ls /`

full_file_size = 19222994313
sample_file_size = 2878295402

In [8]:
dec_20_users_scaled = round(dec_20_users * (full_file_size / sample_file_size))
dec_20_users_scaled

19409452

In [16]:
net_user_growth = dec_20_users_scaled - dec_10_users
net_user_growth

19165642

In [17]:
percent_user_growth = (net_user_growth / dec_10_users) * 100
percent_user_growth

7860.89249825684

### Answer

Reddit grew in the number of monthly active users by approximately 7860% in the decade between December 2010 and December 2020. We are defining an active user as a user who posted a comment on Reddit in that month. In December 2010, 243,810 unique accounts posted a comment. In December 2020, approximately 19,409,452 unique accounts posted a comment. This number is an approximation because I used the resevoir sampled data and then multiplied the number of unique accounts commenting in that random sample by the file size of the full data divided by the file size of the sampled data. This, of course, is making several assumptions. One assumption is that the percentage of unique accoutns making comments in the sampled data out of all the comments is similar in the entire data set. This also assumes that the size of each row in the sampled data and the full data are relatively similar. A final assumption, would would still need to be made even if exact counts were found, is that each different Reddit account represents a unique user. Since Reddit is anonymous, it is possible for one user to make many accounts. Another limitation of this analysis is that there are comments which have deleted users. We are not able to count these users in our analysis. 

## Best Comment Award

### What was the most upvoted comment on Election Day 2008?

In [2]:
df_nov_08 = spark.read.json('hdfs://orion11:13001/RC_2008-11.bz2')

In [3]:
df_nov_08.count()

792310

In [4]:
df_nov_08.printSchema()

root
 |-- archived: boolean (nullable = true)
 |-- author: string (nullable = true)
 |-- author_flair_css_class: string (nullable = true)
 |-- author_flair_text: string (nullable = true)
 |-- body: string (nullable = true)
 |-- controversiality: long (nullable = true)
 |-- created_utc: string (nullable = true)
 |-- distinguished: string (nullable = true)
 |-- downs: long (nullable = true)
 |-- edited: string (nullable = true)
 |-- gilded: long (nullable = true)
 |-- id: string (nullable = true)
 |-- link_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- parent_id: string (nullable = true)
 |-- retrieved_on: long (nullable = true)
 |-- score: long (nullable = true)
 |-- score_hidden: boolean (nullable = true)
 |-- subreddit: string (nullable = true)
 |-- subreddit_id: string (nullable = true)
 |-- ups: long (nullable = true)



In [5]:
df_nov_08.first()

Row(archived=True, author='[deleted]', author_flair_css_class=None, author_flair_text=None, body='[deleted]', controversiality=0, created_utc='1225497606', distinguished=None, downs=0, edited='false', gilded=0, id='c064gt3', link_id='t3_7ajem', name='t1_c064gt3', parent_id='t3_7ajem', retrieved_on=1425896386, score=1, score_hidden=False, subreddit='reddit.com', subreddit_id='t5_6', ups=1)

In [6]:
from datetime import datetime
import time 

In [15]:
# Election Day was on November 4, 2008

start_datetime = datetime(2008, 11, 4)
end_datetime = datetime(2008, 11, 5)

# Convert datetime object to a unix timestamp string which is how timestamps are represented in the schema

start_datetime_unix = str(int(time.mktime(start_datetime.timetuple())))
end_datetime_unix = str(int(time.mktime(end_datetime.timetuple())))

In [16]:
df_nov_4_08 = df_nov_08.filter(df_nov_08['created_utc'] >= start_datetime_unix)
df_nov_4_08 = df_nov_4_08.filter(df_nov_08['created_utc'] < end_datetime_unix).cache()

In [17]:
df_nov_4_08.count()

37898

In [21]:
# Sort comments on election day in descending order by upvotes

df_nov_4_08 = df_nov_4_08.orderBy(df_nov_4_08["ups"].desc())

In [22]:
best_comment = df_nov_4_08.first()

In [23]:
best_comment

Row(archived=True, author='[deleted]', author_flair_css_class=None, author_flair_text=None, body="Dear Rest of The World\n\nWe didn't fuck it up\n\nSigned,\n\nAmerica", controversiality=0, created_utc='1225858444', distinguished=None, downs=0, edited='false', gilded=0, id='c066uum', link_id='t3_7beo2', name='t1_c066uum', parent_id='t3_7beo2', retrieved_on=1425897518, score=2108, score_hidden=False, subreddit='politics', subreddit_id='t5_2cneq', ups=2108)

In [29]:
top_3_nov_4_08 = df_nov_4_08.take(3)

In [30]:
top_3_nov_4_08

[Row(archived=True, author='[deleted]', author_flair_css_class=None, author_flair_text=None, body="Dear Rest of The World\n\nWe didn't fuck it up\n\nSigned,\n\nAmerica", controversiality=0, created_utc='1225858444', distinguished=None, downs=0, edited='false', gilded=0, id='c066uum', link_id='t3_7beo2', name='t1_c066uum', parent_id='t3_7beo2', retrieved_on=1425897518, score=2108, score_hidden=False, subreddit='politics', subreddit_id='t5_2cneq', ups=2108),
 Row(archived=True, author='rockus', author_flair_css_class=None, author_flair_text=None, body='Dear America,\n\nCongrats!\n\nRegards,\nRest of the World', controversiality=0, created_utc='1225858825', distinguished=None, downs=0, edited='false', gilded=0, id='c066v41', link_id='t3_7beo2', name='t1_c066v41', parent_id='t1_c066uum', retrieved_on=1425897522, score=1143, score_hidden=False, subreddit='politics', subreddit_id='t5_2cneq', ups=1143),
 Row(archived=True, author='Hukeshy', author_flair_css_class=None, author_flair_text=None,

### Answer:

The most upvoted comment on Election Day 2008 was:
> Dear Rest of The World
>
> We didn't fuck it up
>
> Signed,
>
> America

which had 2108 upvotes. However, this comment was made by a deleted user.

The most upvoted comment with a known user was:
> Dear America,
> 
> Congrats!
>
> Regards,
> Rest of the World

which had 1143 upvotes and was made by u/rockus (who is still active on Reddit).

The third most upvoted comment was: 
> Dear Canada
>
> It will clearly be viewable from Russia.

All three of these comments were made on the r/politics subreddit and they all follow a similar format of addressing the comment with "Dear".

Since the actual top comment was made by a deleted user, we will analyze u/rockus comments in the next section.


## Top Comments: See top_comments.ipynb in the GitHub repo

## Ban Hammer
### Based on user activity, which which subreddits have been recently banned?

Look for subreddits which go from having many comments to suddenly having none. Maybe in one month they have many comments, then in the next month they suddenly don't have any comments. The question asks about "recently banned". I will define this to mean banned in the last month of data which is December 2020. So I will look for a subreddit which had many comments in November 2020 then zero comments in December 2020. This would likely mean the subreddit was banned sometime in November. Alternatively, we could just look at December 2020 comments and try to find a subreddit which was banned in 2020 by using the timestamps. Before a certain time there were many comments to that subreddit then after that time there were none. This would mean the subreddit was likely banned. We can use the reservoir sample data for this question because a banned subreddit would likely have many comments before getting banned so it will likely have comments in the sample.

In [2]:
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([StructField('subreddit', StringType(), nullable=True)])

In [3]:
df_nov_20_res = spark.read.schema(schema).json('hdfs://orion11:13001/RES-RC_2020-11.zst')

In [4]:
print(df_nov_20_res.take(2))

[Row(subreddit='AssassinsCreedValhala'), Row(subreddit='worldnews')]


In [5]:
df_dec_20_res = spark.read.schema(schema).json('hdfs://orion11:13001/RES-RC_2020-12.zst')

In [6]:
df_nov_count = df_nov_20_res.groupBy('subreddit').count()
df_dec_count = df_dec_20_res.groupBy('subreddit').count()

In [7]:
df_nov_count_name = df_nov_count.withColumnRenamed('count', 'nov_count')
df_dec_count_name = df_dec_count.withColumnRenamed('count', 'dec_count')

In [15]:
# full outer join

df_join = df_nov_count_name.join(df_dec_count_name, on='subreddit', how='full')

In [16]:
df_join.printSchema()

root
 |-- subreddit: string (nullable = true)
 |-- nov_count: long (nullable = true)
 |-- dec_count: long (nullable = true)



In [12]:
print(df_join.take(2))

[Row(subreddit='anime', nov_count=14061, dec_count=16185), Row(subreddit='travel', nov_count=878, dec_count=909)]


In [20]:
df_filter = df_join.filter((df_join.nov_count >= 100) & (df_join.dec_count.isNull()))

In [21]:
banned_list = df_filter.collect()

In [22]:
banned_list

[Row(subreddit='csci040temp', nov_count=1890, dec_count=None),
 Row(subreddit='DoomerGvng', nov_count=153, dec_count=None),
 Row(subreddit='Vindicta', nov_count=108, dec_count=None),
 Row(subreddit='dadswhodidnotwantpets', nov_count=138, dec_count=None),
 Row(subreddit='ChiefsvsRavensLiveTv', nov_count=165, dec_count=None),
 Row(subreddit='RadicalChristianity', nov_count=190, dec_count=None),
 Row(subreddit='YoungPrettyHoes', nov_count=255, dec_count=None),
 Row(subreddit='megnuttleaks', nov_count=170, dec_count=None),
 Row(subreddit='donkybooties', nov_count=192, dec_count=None),
 Row(subreddit='hftyty', nov_count=3230, dec_count=None),
 Row(subreddit='ismos', nov_count=137, dec_count=None),
 Row(subreddit='FitnessGuidesSharing', nov_count=455, dec_count=None),
 Row(subreddit='CallMeCarson', nov_count=275, dec_count=None)]