# UC Berkeley School of Information | MIDS W251 Final Project

Project Team:
* Dhaval Bhatt
* Tuhin Mahumad
* James Gray


## Mining Reddit to gain insight into the Presidential Candidates (Oct 2007 - Aug 2015)

1. **Volume**: How many reddit posts (subreddit politics) are there each year (2007-2015) that reference 2016 presidential candidates in a politics context? 
2. **Keywords**: What are the top 15 keywords each year (2007-2015) in the reddit posts that include the candidates? This may give us insight into key themes or areas of interest.
3. **Part of Speech**: What are the top nouns and adjectives within the reddit posts for these candidates?
4. **Sentiment**: What is the sentiment of the reddit posts for these candidates? 

# Setup and modules for both Spark and Dask Implementations

In [11]:
# import Python libraries
import os
import sys
import re
import collections
import json
from operator import add
import nltk
from nltk.corpus import stopwords
import time
import pandas as pd
from bokeh.charts import Bar, Line, show
from bokeh.io import output_notebook
from bokeh.charts.attributes import cat
import dask
import dask.bag as db
import pandas as pd
import matplotlib

# 4 Node Softlayer cluster configuration 
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-7-oracle-cloudera/jre"
os.environ["SPARK_HOME"] = "/opt/cloudera/parcels/CDH/lib/spark"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
os.environ["PYSPARK_PYTHON"] = "/opt/anaconda/bin/python"
sys.path.insert(0, os.environ["PYLIB"] +"/py4j-0.8.2.1-src.zip")
sys.path.insert(0, os.environ["PYLIB"] +"/pyspark.zip")

In [12]:
# These variables are used for both the Spark and Dask implementation

# store the number of yearly posts
df_data = {}
post_list = []
year_list = []
candidate_list = []
candidates = []
candidate_keywords = {}
performance = []
candidate_adj = {}
candidate_nouns = {}

# the 'created_utc' JSON attribute stores when the reddit record was created.  This can be used for filtering
# http://www.epochconverter.com/
epochTime = {'2007': '1167609600', '2008' : '1199145600', '2009': '1230768000', '2010': '1262304000',
             '2011': '1293840000', '2012': '1325376000', '2013': '1356998400', '2014': '1388534400', 
             '2015': '1420070400', '2016': '1451606400'};

candidates = ['Donald Trump','Hillary Clinton', 'Ted Cruz', 'Bernie Sanders']

## Functions natural language processing using NLTK

### NLTK Data Cleansing

In [13]:
def no_stopwords(x):
    from nltk.corpus import stopwords 
    if x not in stopwords.words('english'):
        return x

is_word = lambda x: re.search("^[0-9a-zA-Z]+$", x) is not None

### NTLK Part of Speech Tagging

In [14]:
def parse(record):
    import nltk
    tokens = nltk.word_tokenize(record["body"])
    record["n_words"] = len(tokens)
    record["pos"] = nltk.pos_tag(tokens)
    return record

### NLTK: Extracting POS and filtering words

In [15]:
def get_NN(record):
    import re
    from nltk.corpus import stopwords
    all_pos = record["pos"]  #originally "pos"
    ret = []
    for pos in all_pos:
        if pos[1] == "NN" \
        and pos[0] not in stopwords.words('english') \
        and re.search("^[0-9a-zA-Z]+$", pos[0]) is not None:
            ret.append(pos[0])
    return ret

def get_ADJ(record):
    import re
    from nltk.corpus import stopwords
    all_pos = record["pos"]  #originally "pos"
    ret = []
    for pos in all_pos:
        if pos[1] == "VBG" \
        and pos[0] not in stopwords.words('english') \
        and re.search("^[0-9a-zA-Z]+$", pos[0]) is not None:
            ret.append(pos[0])
    return ret

# SPARK IMPLEMENTATION

## Initialize environment and SparkContext

In [6]:
from pyspark import SparkConf
from pyspark import SparkContext

conf = SparkConf()
conf.setMaster('yarn-client')
conf.setAppName('reddit-json')
sc = SparkContext(conf=conf)

## Load Reddit JSON files from HDFS

The 1TB Reddit dataset is composed of JSON files for each month from October 2007 through August 2015.  This routine loads each JSON file into a RDD so that we can perform analyses on each month.

In [7]:
lines = sc.textFile("/user/root/RC_2007*.json")  # Read the entire JSON data set into one RDD

start_time = time.time()
# load JSON file into RDD

data = lines.map(json.loads)

elapsed_time = time.time() - start_time
print "total data load time = " + str(elapsed_time)

total data load time = 0.000216960906982


## Reduce data to the "r/politics" subreddit

In [8]:
start_time = time.time()
politics = data.filter(lambda x: x['subreddit'] == 'politics')

#politics.persist()

#num_politics_posts = politics.count()
elapsed_time = (time.time() - start_time)/60
print "Politics subreddit filter time = " + str(elapsed_time) + " minutes"

#print "Total number of politics posts = " + str(num_politics_posts)

Politics subreddit filter time = 0.000358502070109 minutes


In [None]:
#%time politics.take(1)
# politics.saveAsTextFile('hdfs://node1.austin.com:8020/user/root/rdd')

## Query and Analyze Politics subreddits that include Presidential Candidates

In [None]:
for candidate in candidates:
    
    print "starting to filter politics RDD for candidate " + candidate
    candidateRDD = politics.filter(lambda d: candidate in d['body'])
    #num_posts = candidateRDD.count()
    #print "Total number of subreddit posts with " + candidate + " = "  + str(num_posts)
    # save RDD to memory
    #candidateRDD.persist()

    # calculate the number of subreddits for each year
    for year in epochTime:
        #print year
        nextyear = int(year) + 1
        #print nextyear
        #print epochTime[year]
        if int(year) <= 2015: 
            query = candidateRDD.filter(lambda d: d['created_utc'] > epochTime[year] and
                                        d['created_utc'] < epochTime[str(nextyear)])
            reddit_posts = query.count()
            post_list.append(reddit_posts)
            year_list.append(year)
            candidate_list.append(candidate)
            print "Total number of " + str(year)+ " subreddit posts with " + candidate + " = "  + str(reddit_posts)

    df_data = {'year': year_list, 'posts': post_list, 'candidate': candidate_list}

    # store year/posts data in DataFrame for visualization
    df_candidate = pd.DataFrame(df_data, columns=('year', 'posts', 'candidate'))
    df_candidate.sort_values(['year','candidate'], ascending=True, inplace=True)
    
    # NLTK analysis - parse the reddit body attribute
    candidateRDD2 = candidateRDD.map(parse)
    
    # get keyword counts
    words = candidateRDD.flatMap(lambda x: nltk.word_tokenize(x))
    print words.take(2)
    
    words2 = words.map(lambda x: x.lower()) # transform tokens to lowercase
    words3 = words2.filter(no_stopwords) # remove stopwords
    words4 = words3.filter(is_word) # remove tokens that are not words
    #keyword_counts = words4.map(lambda word: (word,1)).reduceByKey(lambda x, y: x+y).map(lambda x: (x[1], x[0])).sortByKey(False)
                                
    #keyword_counts = words4.map(lambda word: (word,1).reduceByKey(lambda x, y: x+y) \
    #                    .map(lambda x: (x[1], x[0])).sortByKey(False)
    
    # get nouns
    nouns = candidateRDD2.flatMap(get_NN)
    counts_nouns = nouns.map(lambda word: (word, 1))

    #get adjectives
    adjectives = candidateRDD2.flatMap(get_ADJ)
    counts_adj = adjectives.map(lambda word: (word, 1))
    
    # top words
    top_nouns = counts_nouns.countByKey()
    top_adjectives = counts_adj.countByKey()
    #print top_nouns

    # convert pos into dict for DataFrame
    top_nouns = dict(top_nouns)
    top_adjectives = dict(top_adjectives)
    
    candidate_nouns[candidate] = top_nouns
    candidate_adj[candidate] = top_adjectives
    
    # convert to a DataFrame for visualization
    #df_adj = pd.DataFrame(top_nouns)
    #df_noun = pd.DataFrame(top_adjectives)

#df_candidate

#print "Percentage of politics subreddit posts with Donald Trump: " + str(query.count()*100/movies.count()) 

starting to filter politics RDD for candidate Donald Trump


In [None]:
#candidate_nouns

## Visualize Subreddit Volume Trend for Candidates 

In [None]:
output_notebook()

def visualize_barchart (df):

    bar2 = Bar(df_candidate, values='posts', label='year', stack='candidate',
          title="Total subreddits by Presidential Candidate ", legend='top_right')
    show(bar2)
    
visualize_barchart(df_candidate)

## Question #2 - Determine the Top N keywords associated with subreddit posts that contain Donald Trump

In [None]:
def get_keywords(data):
    import nltk
    bodies = data.map(lambda x: (x['body'])) #pull out the body 
    words = bodies.map(nltk.word_tokenize) # create tokens
    #print words.collect()[:1]
    words2 = words.map(lambda x: x.lower()) # transform tokens to lowercase
    words3 = words2.filter(no_stopwords) # remove stopwords
    words4 = words3.filter(is_word) # remove tokens that are not words
    counts = words4.map(lambda word: (word,1)) \
                   .reduceByKey(lambda x, y: x+y)  
                   #.map(lambda x: (x[1], x[0])).sortByKey(False)
    return counts

trump_keywords = get_keywords(trump)
trump_keywords.take(1)

#start_time = time.time()
#values = counts.collect()
#elapsed_time = time.time() - start_time
#print str(elapsed_time)
#print len(values)

# sort the keywords in decending order
#sort = sorted(values, key=lambda x: x[1], reverse=True)
#sort[:20]

## Question #3 - Visualize Top N nouns in subreddit posts for Presidential Candidates

In [None]:
# iterate over 
for candidate in candidate_nouns.iterkeys():
    # get keywords dict for each candidate
    print "Top nouns in " + candidate + " posts"
    keywords = candidate_nouns[candidate]
    df_nouns = pd.DataFrame(keywords.items(), columns=['Noun', 'Count'])
    
    df_nouns.sort_values('Count', ascending=False, inplace=True)
    df_top_nouns = df_nouns.head(15)
    print df_top_nouns
    #display(df_top_nouns)
    
#df = pd.DataFrame(top_nouns.items(), columns=['Noun', 'Count'])
# sort the data by count descending
#df = df.sort_values('Count', ascending=False)
# reduce DF to the top 10 nouns
#df_top_10 = df.head(10)
# df_top_10

## Question #3 - Visualize Top N adjectives in subreddit posts for Presidential Candidates

In [None]:
#%matplotlib inline

# iterate over 
for candidate in candidate_adj.iterkeys():
    # get keywords dict for each candidate
    print "Top adjectives in " + candidate + " posts"
    keywords = candidate_adj[candidate]
    df_adj = pd.DataFrame(keywords.items(), columns=['Adj', 'Count'])
    
    df_adj.sort_values('Count', ascending=False, inplace=True)
    df_top_adj = df_adj.head(15)
    print df_top_adj
    
    #df_top_adj.plot(kind='bar', x=df_top_adj['Adj'])
    

#df_adj = pd.DataFrame(top_adjectives.items(), columns=['Adjective', 'Count'])
# sort the data by count descending
#df_adj = df_adj.sort_values('Count', ascending=False)
# reduce DF to the top 10 adjectives
#dfadj_top_10 = df_adj.head(10)
#dfadj_top_10

In [None]:
%matplotlib inline

In [None]:
# chart the top 10 nouns
df_top_10.plot(kind='bar', x=df_top_10['Noun'])

## Visualize keywords with Bokeh

In [None]:
from bokeh.charts import Bar, show
from bokeh.io import output_notebook
from bokeh.charts.attributes import cat

output_notebook()

In [None]:
p = Bar(df_top_nouns,
        label=cat(columns='Noun', sort=False),
        values='Count',
        title='Top N nouns in r/movies subreddit')

In [None]:
show(p)

# DASK DISTRIBUTED IMPLEMENTATION

This section implements the same functionality as the Spark code above but in pure Python using Dask and distributed.  The purpose is to evaluate the code similarity, differences and performance.

* Github: https://github.com/dask
* distributed: https://distributed.readthedocs.org/en/latest/
* hdfs3: http://hdfs3.readthedocs.org/en/latest/
* Dask: http://dask.pydata.org/en/latest/

## Dask and Distributed Scheduler setup

In [16]:
import dask
from hdfs3 import HDFileSystem  # Python interface to HDFS; http://hdfs3.readthedocs.org/en/latest/
from distributed import Executor, hdfs, progress 

# initialize a connection to the distributed executor
e = Executor('127.0.0.1:8786')
e

<Executor: scheduler=127.0.0.1:8786 workers=48 threads=48>

In [17]:
# https://distributed.readthedocs.org/en/latest/api.html#distributed.executor.Executor.restart
# restart the distributed executor
e.restart()

<Executor: scheduler=127.0.0.1:8786 workers=48 threads=48>

## Load Reddit JSON data from HDFS

In [18]:
# initialize Python connection to HDFS on headnode
hdfs_connection = HDFileSystem('node1.austin.com', port=8020)  #Softlayer headnode

# initialize a Dask bag from the HDFS data
lines = hdfs.read_text('/user/root/RC_20*.json', hdfs=hdfs_connection) 

# Execute read to create Bag
start_time = time.time()
jsonDask = lines.filter(None).map(json.loads)
jsonDask
elapsed_time = (time.time() - start_time)/60
print "JSON data load time = " + str(elapsed_time) + ' minutes'

Setting global dask scheduler to use distributed
JSON data load time = 0.00212588310242 minutes


## Reduce Reddit data to "Politics" subreddit

In [19]:
# filter out all of the posts in the 'politics' subreddit
start_time = time.time()
politicsDask = jsonDask.filter(lambda d: 'politics' in d['subreddit'])

#politicsDask = e.persist(jsonDask.filter(lambda d: 'politics' in d['subreddit']))

# save Bag to memory for fast look up
#politicsDask = e.persist(politicsDask)
progress(politicsDask)

# count the total number of subreddit posts
#num_politics_posts = politicsDask.count().compute() # Bags use the .compute() method to trigger computation
elapsed_time = (time.time() - start_time)/60
print "Politics filter time = " + str(elapsed_time) + " minutes"

#print "Number of politics subreddits = " + str(num_politics_posts)

Politics filter time = 0.0148902177811 minutes


In [None]:
#% time politicsDask.take(2)[1]

## Query and Analyze Politics subreddits that include Presidential Candidates

In [20]:
for candidate in candidates:

    print "starting to filter politics RDD for candidate " + candidate
    #query = politicsDask.filter(lambda d: candidate in d['body'])
    
    #candidateDask

    #candidateDask = e.persist(politicsDask.filter(lambda d: candidate in d['body']))  # original
    candidateDask = politicsDask.filter(lambda d: candidate in d['body'])  # original
    
    # execute computation using distributed
    # returns a Dask Future https://distributed.readthedocs.org/en/latest/api.html#future
    start_time = time.time()
    posts = candidateDask.count() # new version
    future = e.compute(posts)
    #future = e.compute(candidateDask) - working
    progress(future)
    #num_posts = candidateDask.count().compute() - working
    
    num_posts = future.result()  # new version
    
    elapsed_time = (time.time() - start_time)/60
    print "candidate filter time for " + candidate + '= ' + str(elapsed_time) + " minutes"
    print "Total number of subreddits that include " + candidate + " = " + str(num_posts)

    # save Bag to memory for fast look up
    candidateDask = e.persist(candidateDask)
    progress(candidateDask)

# calculate the number of subreddits for each year
    print "Calculating number of subreddits each year for " + candidate
    for year in epochTime:
        #print year
        nextyear = int(year) + 1
        #print nextyear
        #print epochTime[year]
        if int(year) <= 2015: 
            query = candidateDask.filter(lambda d: d['created_utc'] > epochTime[year] and
                                 d['created_utc'] < epochTime[str(nextyear)])
            
            
            #reddit_posts = query.count().compute()  # working
            subreddits = query.count()  #new
            result = e.compute(subreddits)                #new
            reddit_posts = result.result()   #new
            
            post_list.append(reddit_posts)
            year_list.append(year)
            candidate_list.append(candidate)
            #print "Total number of " + str(year)+ " subreddit posts with " + candidate + " = "  + str(reddit_posts)
        
    df_data = {'year': year_list, 'posts': post_list, 'candidate': candidate_list}

    # store year/posts data in DataFrame for visualization
    df_candidate = pd.DataFrame(df_data, columns=('year', 'posts', 'candidate'))
    df_candidate.sort_values(['year','candidate'], ascending=True, inplace=True)

    
# determine top keywords for each presidential candidate

    print "determining top keywords for " + candidate
    # pull out the body attribute data from the Dask bag
    bodies = candidateDask.pluck('body')
    # create tokens from the body records and concatentate the results
    words = bodies.map(nltk.word_tokenize).concat()
    # convert all text to lowercase to improve keyword analysis
    words2 = words.map(lambda x: x.lower())
    # remove common stop words to improve keyword analysis
    words3 = words2.filter(no_stopwords)
    # filterd out any tokens that are not words 
    words4 = words3.filter(is_word)
    # calculate the frequencies of each word (http://dask.pydata.org/en/latest/bag.html#dask.bag.core.Bag.frequencies)
    wordcounts = words4.frequencies()  # creates tuple of word, count

    # Execute computation
    start_time = time.time()
    future = e.compute(wordcounts)  #try
    #progress(future)
    #values = wordcounts.compute() # working
    values = future.result() # try
    elapsed_time = (time.time() - start_time)/60 
    print "time to compute keyword counts = " + str(elapsed_time) + " minutes"
    print ""

    # sort the bag returning a list of tuples
    sort = sorted(values, key=lambda x: x[1], reverse=True)
    topkeywords = sort[:15]
    topkeywords
    candidate_keywords[candidate] = topkeywords
    df_keywords = pd.DataFrame(topkeywords, columns=['Keyword', 'Count'])
    df_keywords

starting to filter politics RDD for candidate Donald Trump
candidate filter time for Donald Trump= 87.5447606683 minutes
Total number of subreddits that include Donald Trump = 7653
Calculating number of subreddits each year for Donald Trump
determining top keywords for Donald Trump
time to compute keyword counts = 0.393258547783 minutes

starting to filter politics RDD for candidate Hillary Clinton
candidate filter time for Hillary Clinton= 77.0863475362 minutes
Total number of subreddits that include Hillary Clinton = 16653
Calculating number of subreddits each year for Hillary Clinton
determining top keywords for Hillary Clinton
time to compute keyword counts = 0.618770515919 minutes

starting to filter politics RDD for candidate Ted Cruz
candidate filter time for Ted Cruz= 75.704826951 minutes
Total number of subreddits that include Ted Cruz = 11514
Calculating number of subreddits each year for Ted Cruz
determining top keywords for Ted Cruz
time to compute keyword counts = 0.384821

In [90]:
df_candidate.to_csv('candidateposts.csv')
df_keywords.to_csv('candidatekeywords.csv')


In [None]:
type(topkeywords)

## Question #1 (Volume) - Visualize/Analyze Posts by Presidential Candidates Over 2007-2015

In [23]:
output_notebook()

p = Bar(df_candidate,
        label=cat(columns='year', sort=True),
        #label =['candidates'],
        values='posts',
        title="Total Politics Subreddit Posts that include Donald Trump")

#lineChart = Line(df_candidate, ylabel='posts')

#line = Line(xyvalues, title="line", legend="top_left", ylabel='Languages')

bar2 = Bar(df_candidate, values='posts', label='year', stack='candidate',
          title="Total subreddits by Presidential Candidate ", legend='top_left')

#show(p)
show(bar2)

In [89]:
df_bernie = df_candidate[df_candidate.candidate == 'Bernie Sanders']
df_clinton = df_candidate[df_candidate.candidate == 'Hillary Clinton']
df_trump = df_candidate[df_candidate.candidate == 'Donald Trump']
df_cruz = df_candidate[df_candidate.candidate == 'Ted Cruz']
df = df_candidate[['posts','candidate', 'year']]
print df
from bokeh.io import gridplot
from bokeh.plotting import figure

line_bernie = Line(df_bernie, x='year', y='posts', ylabel='posts', title='Yearly Reddit Posts - Bernie Sanders',
                  width=200)
line_clinton = Line(df_clinton, x='year', y='posts', ylabel='posts', title='Yearly Reddit Posts - Hillary Clinton')
line_trump = Line(df_trump, x='year', y='posts', ylabel='posts', title='Yearly Reddit Posts - Donald Trump')
line_cruz = Line(df_cruz, x='year', y='posts', ylabel='posts', title='Yearly Reddit Posts - Ted Cruz')

#r = figure(plot_width=400, plot_height=400)
#r.multi_line = (df) 
#line = Line(xyvalues, title="line", legend="top_left")

p = gridplot([[lineChart_bernie, lineChart_clinton],[lineChart_trump, lineChart_cruz]])
show(p)
#show(r)

    posts        candidate  year
27      0   Bernie Sanders  2007
0       5     Donald Trump  2007
9     384  Hillary Clinton  2007
18      0         Ted Cruz  2007
30     52   Bernie Sanders  2008
3      24     Donald Trump  2008
12   2144  Hillary Clinton  2008
21      0         Ted Cruz  2008
31    155   Bernie Sanders  2009
4      27     Donald Trump  2009
13    347  Hillary Clinton  2009
22      0         Ted Cruz  2009
33    530   Bernie Sanders  2010
6      58     Donald Trump  2010
15    605  Hillary Clinton  2010
24      0         Ted Cruz  2010
32   1687   Bernie Sanders  2011
5    1264     Donald Trump  2011
14    914  Hillary Clinton  2011
23      1         Ted Cruz  2011
35   1152   Bernie Sanders  2012
8    1270     Donald Trump  2012
17   1569  Hillary Clinton  2012
26    106         Ted Cruz  2012
34   1770   Bernie Sanders  2013
7     432     Donald Trump  2013
16   1781  Hillary Clinton  2013
25   3584         Ted Cruz  2013
29   2230   Bernie Sanders  2014
2     216 

In [None]:
# [x['body'] for x in result.result()][:2]

## Question #2 - Visualize Top 15 Keywords for Presidential Candidates

In [22]:
candidate_keywords

for candidate in candidate_keywords.iterkeys():
    df = pd.DataFrame(candidate_keywords[candidate], columns=['Keyword', 'Count'])
    df_top = df.head(15)
    print "Top keywords for " + candidate
    print df_top #df.plot(table=True)
    

Top keywords for Donald Trump
       Keyword  Count
0        trump   9940
1       donald   8310
2       people   2962
3         like   2390
4        would   2352
5           gt   1995
6        think   1552
7          one   1359
8          get   1303
9        obama   1213
10        even   1134
11   president   1080
12  republican   1075
13       money   1074
14        make    996
Top keywords for Bernie Sanders
    Keyword  Count
0   sanders  28095
1    bernie  27549
2    people   9819
3     would   8774
4      like   8654
5        gt   7034
6     think   5422
7       get   4868
8       one   4764
9      vote   4653
10     even   4167
11    party   3957
12  hillary   3881
13    right   3625
14     http   3432
Top keywords for Hillary Clinton
      Keyword  Count
0     clinton  23659
1     hillary  21592
2          gt   8401
3       obama   8216
4       would   7796
5      people   6680
6        http   5955
7        like   5930
8   president   4685
9         one   4495
10      think   44