In [1]:
import pandas as pd
import json

In [2]:
from pyspark.sql.functions import col

# December

In [3]:
december_raw_comments = spark.read.json("s3a://historical-reddit-comments/Dec_2016/*")

In [4]:
december_raw_comments.cache()

DataFrame[author: string, author_flair_css_class: string, author_flair_text: string, body: string, controversiality: bigint, created_utc: bigint, distinguished: string, edited: string, gilded: bigint, id: string, link_id: string, parent_id: string, retrieved_on: bigint, score: bigint, stickied: boolean, subreddit: string, subreddit_id: string]

In [5]:
december_raw_comments.count()

72942967

In [6]:
december_raw_comments.printSchema()

root
 |-- author: string (nullable = true)
 |-- author_flair_css_class: string (nullable = true)
 |-- author_flair_text: string (nullable = true)
 |-- body: string (nullable = true)
 |-- controversiality: long (nullable = true)
 |-- created_utc: long (nullable = true)
 |-- distinguished: string (nullable = true)
 |-- edited: string (nullable = true)
 |-- gilded: long (nullable = true)
 |-- id: string (nullable = true)
 |-- link_id: string (nullable = true)
 |-- parent_id: string (nullable = true)
 |-- retrieved_on: long (nullable = true)
 |-- score: long (nullable = true)
 |-- stickied: boolean (nullable = true)
 |-- subreddit: string (nullable = true)
 |-- subreddit_id: string (nullable = true)



In [7]:
december_clean_comments = december_raw_comments.filter(december_raw_comments['body'] != '[deleted]')
december_clean_comments.cache()
december_clean_comments.count()

69895797

In [8]:
december_top_subreddits = december_clean_comments.groupBy(december_clean_comments['subreddit']).count().sort(col("count").desc())

In [9]:
december_top_subreddits.show()

+--------------------+-------+
|           subreddit|  count|
+--------------------+-------+
|           AskReddit|4855257|
|            politics|1717930|
|          The_Donald|1212355|
|                 nfl| 984445|
|RocketLeagueExchange| 807611|
|           worldnews| 723430|
|                 nba| 625852|
|     leagueoflegends| 596877|
|               funny| 592208|
|                news| 589182|
|                 CFB| 544883|
|              videos| 538078|
|                pics| 523558|
|           Overwatch| 513436|
|        pcmasterrace| 483865|
|              gaming| 465520|
|       todayilearned| 454935|
|       pokemontrades| 438278|
|       SquaredCircle| 414952|
|               DotA2| 405002|
+--------------------+-------+
only showing top 20 rows



In [10]:
dec_10 = december_top_subreddits.limit(10).toPandas()

In [11]:
december_comment_user = december_clean_comments.selectExpr('author',
                                                           'created_utc', 
                                                           'subreddit', 
                                                           'body')


#december_comment_user.write.parquet("s3a://historical-reddit-comments/historical-parquet/december_user_comments")


In [12]:
december_subreddits = december_clean_comments.selectExpr('subreddit_id',
                                                         'subreddit')

#december_subreddits.write.parquet("s3a://historical-reddit-comments/historical-parquet/december_subreddits")

# January

In [13]:
january_raw_comments = spark.read.json("s3a://historical-reddit-comments/Jan_2017/*")

In [14]:
january_raw_comments.cache()

DataFrame[author: string, author_flair_css_class: string, author_flair_text: string, body: string, controversiality: bigint, created_utc: bigint, distinguished: string, edited: string, gilded: bigint, id: string, link_id: string, parent_id: string, retrieved_on: bigint, score: bigint, stickied: boolean, subreddit: string, subreddit_id: string]

In [15]:
january_raw_comments.count()

78946585

In [16]:
january_clean_comments = january_raw_comments.filter(january_raw_comments['body'] != '[deleted]')
january_clean_comments.cache()
january_clean_comments.count()

75673703

In [17]:
january_top_subreddits = january_clean_comments.groupBy(january_clean_comments['subreddit']).count().sort(col("count").desc())

In [18]:
jan_10 = january_top_subreddits.limit(10).toPandas()

In [19]:
january_comment_user = january_clean_comments.selectExpr('author',
                                                         'created_utc', 
                                                         'subreddit', 
                                                         'body')




In [20]:
january_comment_user.show()

+------------------+-----------+-------------------+--------------------+
|            author|created_utc|          subreddit|                body|
+------------------+-----------+-------------------+--------------------+
|     captnkaposzta| 1483228800|                 de|Beileid? Kiwi Fer...|
|       CampyJejuni| 1483228800|    TwoXChromosomes|Wrong subreddit m...|
|     Luigimario280| 1483228800|BikiniBottomTwitter|              Karma!|
|MRA-automatron-2kb| 1483228800|              MGTOW|Oh... thanks for ...|
|            Godcon| 1483228800|       minipainting|Do you buy the mi...|
|           fendaar| 1483228800|      tipofmytongue|              Rule 7|
|        Fleetthrow| 1483228800|                gis|Can I ask where y...|
|         music4mic| 1483228800|          SeattleWA|They may just pul...|
|        FatalTouch| 1483228800|              Games|Its truly a great...|
|       youngchinox| 1483228800|              funny|Hue... hue... she...|
|            stew22| 1483228800|      

In [21]:
#january_comment_user.write.parquet("s3a://historical-reddit-comments/historical-parquet/january_user_comments1")

In [22]:
january_subreddits = january_clean_comments.selectExpr('subreddit_id',
                                                       'subreddit')



In [23]:
#january_subreddits.write.parquet("s3a://historical-reddit-comments/historical-parquet/january_subreddits")

In [24]:
january_subreddits.show()

+------------+-------------------+
|subreddit_id|          subreddit|
+------------+-------------------+
|     t5_22i0|                 de|
|    t5_2r2jt|    TwoXChromosomes|
|    t5_3deqz|BikiniBottomTwitter|
|    t5_2sjgc|              MGTOW|
|    t5_2scss|       minipainting|
|    t5_2r4oc|      tipofmytongue|
|    t5_2qmpb|                gis|
|    t5_2vbli|          SeattleWA|
|    t5_2qhwp|              Games|
|    t5_2qh33|              funny|
|    t5_2ror6|       StudentLoans|
|    t5_2sgoq|         streetwear|
|    t5_2qm35|            Romania|
|    t5_2qo4s|                nba|
|    t5_2qh3v|             bestof|
|    t5_2qyz1|    ChineseLanguage|
|    t5_2r4oc|      tipofmytongue|
|    t5_2qiel|             hockey|
|    t5_2qhsa|  interestingasfuck|
|    t5_3c35m|        Shadowverse|
+------------+-------------------+
only showing top 20 rows



## new data

In [25]:
new = spark.read.json("s3a://reddit-comments-a/2017/03/08/*/*")

In [26]:
new.cache()

DataFrame[approved: boolean, author: string, author_cakeday: boolean, author_flair_css_class: string, author_flair_text: string, body: string, created_utc: bigint, id: string, link_id: string, name: string, parent_id: string, removed: boolean, spam: boolean, stickied: boolean, subreddit: string, subreddit_id: string, subreddit_name_prefixed: string, subreddit_type: string, url: string]

In [27]:
new.count()

392764

In [28]:
new.printSchema()

root
 |-- approved: boolean (nullable = true)
 |-- author: string (nullable = true)
 |-- author_cakeday: boolean (nullable = true)
 |-- author_flair_css_class: string (nullable = true)
 |-- author_flair_text: string (nullable = true)
 |-- body: string (nullable = true)
 |-- created_utc: long (nullable = true)
 |-- id: string (nullable = true)
 |-- link_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- parent_id: string (nullable = true)
 |-- removed: boolean (nullable = true)
 |-- spam: boolean (nullable = true)
 |-- stickied: boolean (nullable = true)
 |-- subreddit: string (nullable = true)
 |-- subreddit_id: string (nullable = true)
 |-- subreddit_name_prefixed: string (nullable = true)
 |-- subreddit_type: string (nullable = true)
 |-- url: string (nullable = true)



In [29]:
clean_new = new.filter(new['body'] != '[deleted]')

In [30]:
clean_new = new.filter(new['body'] != '[removed]')

In [31]:
clean_new.cache()

DataFrame[approved: boolean, author: string, author_cakeday: boolean, author_flair_css_class: string, author_flair_text: string, body: string, created_utc: bigint, id: string, link_id: string, name: string, parent_id: string, removed: boolean, spam: boolean, stickied: boolean, subreddit: string, subreddit_id: string, subreddit_name_prefixed: string, subreddit_type: string, url: string]

In [32]:
clean_new.count()

387767

In [39]:
comments = sc.textFile("s3a://reddit-comments-a/2017/03/08/*/*")

In [40]:
jsonrdd = comments.flatMap(lambda x: json.loads(x))\
                  .map(lambda comment: (comment['subreddit'], 1))\
                  .reduceByKey(lambda a, b: a+b)\
                  .sortBy(lambda a: a[1], ascending=False).toDF()

In [42]:
subreddit_pandas = jsonrdd.toPandas()

In [43]:
subreddit_pandas.columns

Index([u'_1', u'_2'], dtype='object')

In [44]:
subreddit_pandas.columns = ['Subreddit', 'Comments']

In [45]:
subreddit_pandas.head(10)

Unnamed: 0,Subreddit,Comments
0,AskReddit,23477
1,politics,7774
2,soccer,6314
3,The_Donald,5996
4,pics,5058
5,nfl,4465
6,worldnews,4088
7,leagueoflegends,3484
8,NintendoSwitch,3387
9,Overwatch,3380
