In [1]:
#%install_ext https://raw.github.com/cpcloud/ipython-autotime/master/autotime.py
%load_ext autotime

# Homework 2

In this homework, we are going to play with Twitter data.

The data is represented as rows of [JSON](https://en.wikipedia.org/wiki/JSON#Example) strings.
It consists of [tweets](https://dev.twitter.com/overview/api/tweets), [messages](https://dev.twitter.com/streaming/overview/messages-types), and a small amount of broken data (cannot be parsed as JSON).

For this homework, we will only focus on tweets and ignore all other messages.

# UPDATES

## Announcement

**We changed the test files size and the corresponding file paths.**

To avoid long waiting queues, we decided to limit the input file sizes for Playground submissions. Please read the following files to get the input file paths:

* 1GB test: `../Data/hw2-files-1gb.txt`
* 5GB test: `../Data/hw2-files-5gb.txt`
* 20GB test: `../Data/hw2-files-20gb.txt`

**We updated the json parsing section of this notebook.**

Python's built-in json library is too slow. In our experiments, 70% of the total running time was spent on parsing tweets. Therefore we recommend using [ujson](https://pypi.python.org/pypi/ujson) instead of json. According to our tests, it is at least 15x faster than the built-in json library.


## Important Reminders

1. The tokenizer in this notebook contains UTF-8 characters. So the first line of your `.py` source code must be `# -*- coding: utf-8 -*-` to define its encoding. Learn more about this topic [here](https://www.python.org/dev/peps/pep-0263/).
2. The input files (the tweets) contain UTF-8 characters. So you have to correctly encode your input with some function like `lambda text: text.encode('utf-8')`.
3. `../Data/hw2-files-<param>` may contain multiple lines, one line for one input file. You can use a single textFile call to read multiple files: `sc.textFile(','.join(files))`.
4. The input file paths in `../Data/hw2-files-<param>` contain trailing whitespace (newlines etc.), which may confuse HDFS if not removed.
5. Your program will be killed if it cannot finish in 5 minutes. The running time of last 100 submissions (yours and others) can be checked at the "View last 100 jobs" tab. For your information, here is the running time of our solution:
   * 1GB test:  53 seconds,
   * 5GB test:  60 seconds,
   * 20GB test: 114 seconds.
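
Reminders 3 and 4 can be sketched in plain Python as follows. This is only an illustration: the helper name `read_file_list`, the temporary list file, and the `/data/part-*.txt` paths are made up for the example and are not part of the homework files.

```python
import os
import tempfile


def read_file_list(list_path):
    """Return the input paths named in list_path, one per non-empty line."""
    with open(list_path) as f:
        # strip() removes the trailing newline and any stray spaces
        return [line.strip() for line in f if line.strip()]


# Hypothetical list file, written to a temp directory for illustration only
demo_dir = tempfile.mkdtemp()
demo_list = os.path.join(demo_dir, "hw2-files-demo.txt")
with open(demo_list, "w") as f:
    f.write("/data/part-0.txt \n/data/part-1.txt\n\n")

files = read_file_list(demo_list)

# A single comma-separated string is the form passed to sc.textFile(...)
joined = ",".join(files)
```

In the notebook, `joined` would be the argument to `sc.textFile`, so that all input files end up in one RDD.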





## Tweets

A tweet consists of many data fields. [Here is an example](https://gist.github.com/arapat/03d02c9b327e6ff3f6c3c5c602eeaf8b). You can learn all about them in the Twitter API doc. We are going to briefly introduce only the data fields that will be used in this homework.

* `created_at`: Posted time of this tweet (time zone is included)
* `id_str`: Tweet ID - we recommend using `id_str` rather than `id` as the Tweet ID, because `id` is an integer and may cause overflow problems.
* `text`: Tweet content
* `user`: A JSON object for information about the author of the tweet
    * `id_str`: User ID
    * `name`: User name (may contain spaces)
    * `screen_name`: User screen name (no spaces)
* `retweeted_status`: A JSON object for information about the retweeted tweet (i.e. this tweet is not original but retweets some other tweet)
    * All data fields of a tweet except `retweeted_status`
* `entities`: A JSON object for all entities in this tweet
    * `hashtags`: An array for all the hashtags that are mentioned in this tweet
    * `urls`: An array for all the URLs that are mentioned in this tweet
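
As a rough illustration of how these fields nest, the sketch below builds a made-up tweet that contains only the fields listed above, serializes it to one JSON line, and reads the fields back. All values are invented, and the standard `json` module is used only to keep the example dependency-free.

```python
import json

# Made-up tweet containing only the fields described above
sample_tweet = {
    "created_at": "Mon Feb 15 01:00:00 +0000 2016",
    "id_str": "1001",
    "text": "RT @someone: hello #demo",
    "user": {"id_str": "42", "name": "Some User", "screen_name": "someuser"},
    "retweeted_status": {"id_str": "1000", "text": "hello #demo"},
    "entities": {"hashtags": [{"text": "demo"}], "urls": []},
}

# One tweet per line, as in the input files
line = json.dumps(sample_tweet)
tweet = json.loads(line)

user_id = tweet["user"]["id_str"]          # author ID, kept as a string
is_retweet = "retweeted_status" in tweet   # present only for retweets
hashtags = [h["text"] for h in tweet["entities"]["hashtags"]]
```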


## Data source

All tweets are collected using the [Twitter Streaming API](https://dev.twitter.com/streaming/overview).


## Users partition

Besides the original tweets, we will provide you with a Pickle file, which contains a partition over 452,743 Twitter users. It contains a Python dictionary `{user_id: partition_id}`. The users are partitioned into 7 groups.

# Part 0: Load data to a RDD

The tweets data is stored on AWS S3. We have in total a little over 1 TB of tweets. We provide 10 MB of tweets for your local development. For the testing and grading on the homework server, we will use different data.

## Testing on the homework server
In the Playground, we provide three different input sizes to test your program: 1 GB, 5 GB, and 20 GB. To test them, read the file lists from `../Data/hw2-files-1gb.txt`, `../Data/hw2-files-5gb.txt`, and `../Data/hw2-files-20gb.txt`, respectively.

For final submission, make sure to read files list from `../Data/hw2-files-final.txt`. Otherwise your program will receive no points.

## Local test

For local testing, read files list from `../Data/hw2-files.txt`.
Now let's see how many lines there are in the input files.

1. Make an RDD from the list of files in `hw2-files.txt`.
2. Mark the RDD to be cached (so that the data is kept in memory after the next operation loads it).
3. Call the `print_count` method to print the number of lines in all these files.

It should print
```
Number of elements: 2193
```

In [8]:
def print_count(rdd):
    print 'Number of elements:', rdd.count()


time: 1.37 ms


1) 1.87 s original. <br> 
2) .144 s  <br> 

In [10]:
with open('data/hw2-files.txt') as f:
    files = [line.strip() for line in f]
    print files
rawRDD = sc.textFile(','.join(files)).cache()
print_count(rawRDD)

 ['../Data/hw2-input.txt']
Number of elements:

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/Users/mgalarny/Documents/youtube/Data/hw2-input.txt
	at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:285)
	at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
	at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313)
	at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:199)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
	at scala.Option.getOrElse(Option.scala:120)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
	at scala.Option.getOrElse(Option.scala:120)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
	at org.apache.spark.api.python.PythonRDD.getPartitions(PythonRDD.scala:58)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
	at scala.Option.getOrElse(Option.scala:120)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:927)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:926)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:405)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
	at py4j.Gateway.invoke(Gateway.java:259)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:209)
	at java.lang.Thread.run(Thread.java:745)


time: 101 ms


# Part 1: Parse JSON strings to JSON objects

Python has built-in support for JSON.

**UPDATE:** Python's built-in json library is too slow. In our experiments, 70% of the total running time was spent on parsing tweets. Therefore we recommend using [ujson](https://pypi.python.org/pypi/ujson) instead of json. According to our tests, it is at least 15x faster than the built-in json library.

In [None]:
import ujson

json_example = '''
{
    "id": 1,
    "name": "A green door",
    "price": 12.50,
    "tags": ["home", "green"]
}
'''

json_obj = ujson.loads(json_example)
json_obj

## Broken tweets and irrelevant messages

The data of this assignment may contain broken tweets (invalid JSON strings). So make sure that your code is robust for such cases.

In addition, some lines in the input file might not be tweets, but messages that the Twitter server sent to the developer (such as [limit notices](https://dev.twitter.com/streaming/overview/messages-types#limit_notices)). Your program should also ignore these messages.

*Hint:* [Catch the ValueError](http://stackoverflow.com/questions/11294535/verify-if-a-string-is-json-in-python)
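
A minimal sketch of the hint, assuming that a line counts as a tweet when it parses to a JSON object with both `text` and `user` fields. The function name `classify_line` and the three sample lines are invented for illustration, and the standard `json` module stands in for `ujson` so that the example has no extra dependency.

```python
import json


def classify_line(raw):
    """Return 'tweet', 'message', or 'broken' for one input line."""
    try:
        obj = json.loads(raw)
    except ValueError:  # json.JSONDecodeError is a subclass of ValueError
        return "broken"
    # Assumption: a tweet is a JSON object with both 'text' and 'user'
    if isinstance(obj, dict) and "text" in obj and "user" in obj:
        return "tweet"
    return "message"


lines = [
    '{"text": "hello", "user": {"id_str": "42"}}',  # made-up tweet
    '{"limit": {"track": 1234}}',                   # made-up limit notice
    '{"text": "truncated',                          # broken JSON
]
kinds = [classify_line(raw) for raw in lines]
```

Only lines classified as `tweet` would be kept for the rest of the homework.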


(1) Parse raw JSON tweets to obtain valid JSON objects. From all valid tweets, construct a pair RDD of `(user_id, text)`, where `user_id` is the `id_str` data field of the `user` dictionary (read [Tweets](#Tweets) section above), `text` is the `text` data field.

starting time: 10.9 ms <br>


In [None]:
import ujson

def safe_parse(raw_string):
    # Return a sentinel pair for lines that are not valid JSON
    try:
        parsed = ujson.loads(raw_string)
    except ValueError:
        return '0', 'INVALID'

    # Keep only JSON objects that have both 'user' and 'text' (i.e. tweets)
    if isinstance(parsed, dict) and 'text' in parsed and 'user' in parsed:
        return parsed['user']['id_str'].encode('utf-8'), parsed['text'].encode('utf-8')
    return '0', 'INVALID'

# Drop the sentinel pairs, then cache the (user_id, text) pair RDD
twtRDD = rawRDD.map(safe_parse).filter(lambda x: x != ('0', 'INVALID')).cache()

(2) Count the number of different users in all valid tweets (hint: [the `distinct()` method](https://spark.apache.org/docs/latest/programming-guide.html#transformations)).

It should print
```
The number of unique users is: 2083
```

In [None]:
def print_users_count(count):
    print 'The number of unique users is:', count

starting time: 87.9 ms <br>

In [None]:
count = twtRDD.map(lambda x:x[0]).distinct().count()
print_users_count(count)

# Part 2: Number of posts from each user partition

Load the Pickle file `../Data/users-partition.pickle`. You will get a dictionary `{user_id: partition_id}` that represents a partition over 452,743 Twitter users. The users are partitioned into 7 groups, and the partition ID is an integer between 0 and 6. For example, if the dictionary is loaded into a variable named `partition`, the partition ID of the user `59458445` is `partition["59458445"]`.

Note that the user partition we provide doesn't cover all users that appear in the input data.

(1) Load the pickle file.

original timing: 2.51s

In [None]:
import pickle

with open('../Data/users-partition.pickle', 'rb') as pickle_file:
    partitionFile = pickle.load(pickle_file)
partition_bc = sc.broadcast(partitionFile)

(2) Count the number of posts from each user partition

Count the number of posts from group 0, 1, ..., 6, plus the number of posts from users who are not in any partition. Assign users who are not in any partition to the group 7.

Put the results of this step into a pair RDD `(group_id, count)` that is sorted by key.
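
The counting rule can be illustrated without Spark. In the sketch below, the small `partition` dictionary and the `tweets` list are toy stand-ins for the pickle file and the `(user_id, text)` pairs, and `group_of` is a hypothetical helper name.

```python
from collections import Counter

# Toy stand-ins for the pickle dictionary and the (user_id, text) pairs
partition = {"59458445": 0, "111": 3, "222": 3}
tweets = [
    ("59458445", "a"),
    ("111", "b"),
    ("222", "c"),
    ("999", "d"),  # this user is not in the partition dictionary
    ("111", "e"),
]


def group_of(user_id, partition, default_group=7):
    """Return the user's group, or group 7 for users in no partition."""
    return partition.get(user_id, default_group)


# One count per tweet (not per user), keyed by group ID
counts = Counter(group_of(uid, partition) for uid, _ in tweets)

# (group_id, count) pairs sorted by key
sorted_counts = sorted(counts.items())
```

Here `sorted_counts` is `[(0, 1), (3, 3), (7, 1)]`: user `111` posted twice, so group 3 gets three posts in total.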

original timing: 677 ms <br>


In [None]:
def getPartitionID(user):
    # Users missing from the partition dictionary go to group 7
    return partition_bc.value.get(user, 7)

# Count tweets per group and sort by group ID: [(group_id, count), ...]
counts = sorted(twtRDD.map(lambda x: getPartitionID(x[0])).countByValue().items())

(3) Print the post count using the `print_post_count` function we provided.

It should print

```
Group 0 posted 81 tweets
Group 1 posted 199 tweets
Group 2 posted 45 tweets
Group 3 posted 313 tweets
Group 4 posted 86 tweets
Group 5 posted 221 tweets
Group 6 posted 400 tweets
Group 7 posted 798 tweets
```

In [None]:
def print_post_count(counts):
    for group_id, count in counts:
        print 'Group %d posted %d tweets' % (group_id, count)

In [None]:
print_post_count(counts)

# Part 3:  Tokens that are relatively popular in each user partition

In this step, we are going to find tokens that are relatively popular in each user partition.

We define the number of mentions of a token $t$ in a specific user partition $k$ as the number of users from the user partition $k$ that ever mentioned the token $t$ in their tweets. Note that even if some users might mention a token $t$ multiple times or in multiple tweets, a user will contribute at most 1 to the counter of the token $t$.

Please make sure that the number of mentions of a token is equal to the number of users who mentioned this token but NOT the number of tweets that mentioned this token.

Let $N_t^k$ be the number of mentions of the token $t$ in the user partition $k$. Let $N_t^{all} = \sum_{i=0}^7 N_t^{i}$ be the number of total mentions of the token $t$.

We define the relative popularity of a token $t$ in a user partition $k$ as the log ratio between $N_t^k$ and $N_t^{all}$, i.e.

\begin{equation}
p_t^k = \log \frac{N_t^k}{N_t^{all}}.
\end{equation}


You can compute the relative popularity by calling the function `get_rel_popularity`, which uses a base-2 logarithm.
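
The definition of "mentions" is easy to get wrong, so here is a small worked example in plain Python. The `(group, user, tokens)` triples are toy data, and `rel_popularity` is a hypothetical helper that assumes a base-2 logarithm.

```python
from math import log

# Toy data: (group_id, user_id, tokens of one tweet)
tweets = [
    (0, "u1", ["trump", "trump", "rt"]),
    (0, "u1", ["trump"]),  # same user again: must not be counted twice
    (0, "u2", ["rt"]),
    (1, "u3", ["trump", "rt"]),
]

# Each (group, user, token) triple is kept once, so a user contributes
# at most 1 to the counter of a token
seen = set((g, u, t) for g, u, toks in tweets for t in toks)

# N_t^k: number of users in group k who mentioned token t
n_k = {}
for g, u, t in seen:
    n_k[(g, t)] = n_k.get((g, t), 0) + 1

# N_t^all: sum of N_t^k over all groups
n_all = {}
for (g, t), n in n_k.items():
    n_all[t] = n_all.get(t, 0) + n


def rel_popularity(n_group, n_total):
    """Log ratio of group mentions to total mentions (base 2 assumed)."""
    return log(float(n_group) / n_total, 2)


p_trump_0 = rel_popularity(n_k[(0, "trump")], n_all["trump"])
```

User `u1` mentions `trump` three times across two tweets but counts once, so $N_{trump}^0 = 1$, $N_{trump}^{all} = 2$, and the relative popularity is $\log_2(1/2) = -1$.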

(0) Load the tweet tokenizer.

In [None]:
# %load happyfuntokenizing.py
#!/usr/bin/env python

"""
This code implements a basic, Twitter-aware tokenizer.

A tokenizer is a function that splits a string of text into words. In
Python terms, we map string and unicode objects into lists of unicode
objects.

There is not a single right way to do tokenizing. The best method
depends on the application.  This tokenizer is designed to be flexible
and thus easy to adapt to new domains and tasks.  The basic logic is
this:

1. The tuple regex_strings defines a list of regular expression
   strings.

2. The regex_strings strings are put, in order, into a compiled
   regular expression object called word_re.

3. The tokenization is done by word_re.findall(s), where s is the
   user-supplied string, inside the tokenize() method of the class
   Tokenizer.

4. When instantiating Tokenizer objects, there is a single option:
   preserve_case.  By default, it is set to False, so the tokenizer
   will downcase everything except for emoticons. Set it to True to
   keep the original case.

The __main__ method illustrates by tokenizing a few examples.

I've also included a Tokenizer method tokenize_random_tweet(). If the
twitter library is installed (http://code.google.com/p/python-twitter/)
and Twitter is cooperating, then it should tokenize a random
English-language tweet.

"""

######################################################################

import re
import htmlentitydefs

######################################################################
# The following strings are components in the regular expression
# that is used for tokenizing. It's important that phone_number
# appears first in the final regex (since it can contain whitespace).
# It also could matter that tags comes after emoticons, due to the
# possibility of having text like
#
#     <:| and some text >:)
#
# Most importantly, the final element should always be last, since it
# does a last ditch whitespace-based tokenization of whatever is left.

# This particular element is used in a couple ways, so we define it
# with a name:
emoticon_string = r"""
    (?:
      [<>]?
      [:;=8]                     # eyes
      [\-o\*\']?                 # optional nose
      [\)\]\(\[dDpP/\:\}\{@\|\\] # mouth      
      |
      [\)\]\(\[dDpP/\:\}\{@\|\\] # mouth
      [\-o\*\']?                 # optional nose
      [:;=8]                     # eyes
      [<>]?
    )"""

# The components of the tokenizer:
regex_strings = (
    # Phone numbers:
    r"""
    (?:
      (?:            # (international)
        \+?[01]
        [\-\s.]*
      )?            
      (?:            # (area code)
        [\(]?
        \d{3}
        [\-\s.\)]*
      )?    
      \d{3}          # exchange
      [\-\s.]*   
      \d{4}          # base
    )"""
    ,
    # URLs:
    r"""http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+"""
    ,
    # Emoticons:
    emoticon_string
    ,    
    # HTML tags:
     r"""<[^>]+>"""
    ,
    # Twitter username:
    r"""(?:@[\w_]+)"""
    ,
    # Twitter hashtags:
    r"""(?:\#+[\w_]+[\w\'_\-]*[\w_]+)"""
    ,
    # Remaining word types:
    r"""
    (?:[a-z][a-z'\-_]+[a-z])       # Words with apostrophes or dashes.
    |
    (?:[+\-]?\d+[,/.:-]\d+[+\-]?)  # Numbers, including fractions, decimals.
    |
    (?:[\w_]+)                     # Words without apostrophes or dashes.
    |
    (?:\.(?:\s*\.){1,})            # Ellipsis dots. 
    |
    (?:\S)                         # Everything else that isn't whitespace.
    """
    )

######################################################################
# This is the core tokenizing regex:
    
word_re = re.compile(r"""(%s)""" % "|".join(regex_strings), re.VERBOSE | re.I | re.UNICODE)

# The emoticon string gets its own regex so that we can preserve case for them as needed:
emoticon_re = re.compile(regex_strings[1], re.VERBOSE | re.I | re.UNICODE)

# These are for regularizing HTML entities to Unicode:
html_entity_digit_re = re.compile(r"&#\d+;")
html_entity_alpha_re = re.compile(r"&\w+;")
amp = "&amp;"

######################################################################

class Tokenizer:
    def __init__(self, preserve_case=False):
        self.preserve_case = preserve_case

    def tokenize(self, s):
        """
        Argument: s -- any string or unicode object
        Value: a tokenized list of strings; concatenating this list returns the original string if preserve_case=False
        """        
        # Try to ensure unicode:
        try:
            s = unicode(s)
        except UnicodeDecodeError:
            s = str(s).encode('string_escape')
            s = unicode(s)
        # Fix HTML character entities:
        s = self.__html2unicode(s)
        # Tokenize:
        words = word_re.findall(s)
        # Possibly alter the case, but avoid changing emoticons like :D into :d:
        if not self.preserve_case:            
            words = map((lambda x : x if emoticon_re.search(x) else x.lower()), words)
        return words

    def tokenize_random_tweet(self):
        """
        If the twitter library is installed and a twitter connection
        can be established, then tokenize a random tweet.
        """
        try:
            import twitter
        except ImportError:
            print "Apologies. The random tweet functionality requires the Python twitter library: http://code.google.com/p/python-twitter/"
        from random import shuffle
        api = twitter.Api()
        tweets = api.GetPublicTimeline()
        if tweets:
            for tweet in tweets:
                if tweet.user.lang == 'en':            
                    return self.tokenize(tweet.text)
        else:
            raise Exception("Apologies. I couldn't get Twitter to give me a public English-language tweet. Perhaps try again")

    def __html2unicode(self, s):
        """
        Internal method that seeks to replace all the HTML entities in
        s with their corresponding unicode characters.
        """
        # First the digits:
        ents = set(html_entity_digit_re.findall(s))
        if len(ents) > 0:
            for ent in ents:
                entnum = ent[2:-1]
                try:
                    entnum = int(entnum)
                    s = s.replace(ent, unichr(entnum))	
                except:
                    pass
        # Now the alpha versions:
        ents = set(html_entity_alpha_re.findall(s))
        ents = filter((lambda x : x != amp), ents)
        for ent in ents:
            entname = ent[1:-1]
            try:            
                s = s.replace(ent, unichr(htmlentitydefs.name2codepoint[entname]))
            except:
                pass                    
            s = s.replace(amp, " and ")
        return s

In [None]:
from math import log

tok = Tokenizer(preserve_case=False)

def get_rel_popularity(c_k, c_all):
    return log(1.0 * c_k / c_all) / log(2)


def print_tokens(tokens, gid = None):
    group_name = "overall"
    if gid is not None:
        group_name = "group %d" % gid
    print '=' * 5 + ' ' + group_name + ' ' + '=' * 5
    for t, n in tokens:
        print "%s\t%.4f" % (t, n)
    print

(1) Tokenize the tweets using the tokenizer we provided above named `tok`. Count the number of mentions for each token regardless of specific user group.

Call `print_count` function to show how many different tokens we have.

It should print
```
Number of elements: 8979
```

original time: 1.15s

In [None]:
# Get the unique tokens of each user:
# flatMap emits one (user_id, token) pair per token occurrence,
# distinct() then keeps each token once per user
# format: (user_id, token)
tokenRDD0 = twtRDD.flatMap(lambda x: [(x[0], z) for z in tok.tokenize(x[1])]).distinct()

# format: ((group, token), 1)
k_RDD1 = tokenRDD0.map(lambda x: ((getPartitionID(x[0]), x[1]), 1))

# format: ((group, token), c_k)
k_RDD2 = k_RDD1.reduceByKey(lambda x, y: x + y).cache()

# format: (token, c_all)
tokenRDD1 = k_RDD2.map(lambda x: (x[0][1], x[1])).reduceByKey(lambda x, y: x + y)

print_count(tokenRDD1)

(2) Tokens that are mentioned by too few users are usually not very interesting. So we want to only keep tokens that are mentioned by at least 100 users. Please filter out tokens that don't meet this requirement.

Call `print_count` function to show how many different tokens we have after the filtering.

Call `print_tokens` function to show top 20 most frequent tokens.

It should print
```
Number of elements: 52
===== overall =====
:	1386.0000
rt	1237.0000
.	865.0000
\	745.0000
the	621.0000
trump	595.0000
x80	545.0000
xe2	543.0000
to	499.0000
,	489.0000
xa6	457.0000
a	403.0000
is	376.0000
in	296.0000
'	294.0000
of	292.0000
and	287.0000
for	280.0000
!	269.0000
?	210.0000
```

original time: 138 ms

In [None]:
# Keep only tokens mentioned by at least 100 users
tokenRDD2 = tokenRDD1.filter(lambda x: x[1] >= 100).cache()

#print number of elements
print_count(tokenRDD2)

#Top 20 tokens by popularity
print_tokens(tokenRDD2.top(20, lambda x:x[1]))

(3) For all tokens that are mentioned by at least 100 users, compute their relative popularity in each user group. Then print the top 10 tokens with the highest relative popularity in each user group. If two tokens have the same relative popularity, break the tie by printing the alphabetically smaller one first.

**Hint:** Let the relative popularity of a token $t$ be $p$. The order of the items will be satisfied by sorting them using (-p, t) as the key.

It should print
```
===== group 0 =====
...	-3.5648
at	-3.5983
hillary	-4.0484
bernie	-4.1430
not	-4.2479
he	-4.2574
i	-4.2854
s	-4.3309
are	-4.3646
in	-4.4021

===== group 1 =====
#demdebate	-2.4391
-	-2.6202
clinton	-2.7174
&	-2.7472
amp	-2.7472
;	-2.7980
sanders	-2.8745
?	-2.9069
in	-2.9615
if	-2.9861

===== group 2 =====
are	-4.6865
and	-4.7055
bernie	-4.7279
at	-4.7682
sanders	-4.9449
in	-5.0395
donald	-5.0531
a	-5.0697
#demdebate	-5.1396
that	-5.1599

===== group 3 =====
#demdebate	-1.3847
bernie	-1.8535
sanders	-2.1793
of	-2.2356
t	-2.2675
clinton	-2.4179
hillary	-2.4203
the	-2.4330
xa6	-2.4962
that	-2.5160

===== group 4 =====
hillary	-3.8074
sanders	-3.9449
of	-4.0199
what	-4.0875
clinton	-4.0959
at	-4.1832
in	-4.2095
a	-4.2623
on	-4.2854
'	-4.2928

===== group 5 =====
cruz	-2.3344
he	-2.6724
will	-2.7705
are	-2.7796
the	-2.8522
is	-2.8822
that	-2.9119
this	-2.9542
for	-2.9594
of	-2.9804

===== group 6 =====
@realdonaldtrump	-1.1520
cruz	-1.4657
n	-1.4877
!	-1.5479
not	-1.8904
xa6	-1.9172
xe2	-1.9973
/	-2.0238
x80	-2.0240
it	-2.0506

===== group 7 =====
donald	-0.6471
...	-0.7922
sanders	-1.0380
what	-1.1178
trump	-1.1293
bernie	-1.2044
you	-1.2099
-	-1.2253
if	-1.2602
clinton	-1.2681
```
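
The ordering suggested in the hint above can be checked on a few made-up `(token, score)` pairs. The scores below are invented for illustration; only the sort key `(-p, t)` comes from the hint.

```python
# Made-up (token, relative popularity) pairs with one tie at -2.5
scores = [("cruz", -2.5), ("are", -2.5), ("he", -2.0), ("will", -3.0)]

# Sort by descending popularity, then alphabetically by token
ranked = sorted(scores, key=lambda item: (-item[1], item[0]))

top2 = ranked[:2]
```

Here `ranked` starts with `he`, and the tie between `are` and `cruz` is resolved in favor of `are`. On an RDD, the same key function can be passed to `takeOrdered(n, key=...)`.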

In [None]:
def print_popularity(k):
    '''Print the top 10 tokens by relative popularity for group k'''
    # Filter by group and reformat to (token, c_k)
    popRDD1 = k_RDD2.filter(lambda x: x[0][0] == k).map(lambda x: (x[0][1], x[1]))

    # Inner join with the filtered overall counts to get (token, (c_k, c_all))
    popRDD2 = popRDD1.join(tokenRDD2)

    # Compute the score and return (token, score)
    popRDD3 = popRDD2.map(lambda x: (x[0], get_rel_popularity(x[1][0], x[1][1])))

    # Top 10 by score; ties go to the alphabetically smaller token
    pop_list = popRDD3.takeOrdered(10, key=lambda x: (-x[1], x[0]))

    # Print the group header and one "token<TAB>score" line per token
    print_tokens(pop_list, k)

original time: 893 ms

In [None]:
for i in xrange(8):
    print_popularity(i)

(4) (optional, not for grading) The users partition is generated by a machine learning algorithm that tries to group the users by their political preferences. Three of the user groups show support for Bernie Sanders, Ted Cruz, and Donald Trump.

If your program looks okay on the local test data, you can try it on the larger input by submitting your program to the homework server. Observe the output of your program on the larger input files. Can you guess the partition IDs of the three groups mentioned above based on your output?

original time: 2.6ms

In [None]:
# Change the values of the following three items to your guesses
users_support = [
    (-1, "Bernie Sanders"),
    (-1, "Ted Cruz"),
    (-1, "Donald Trump")
]

for gid, candidate in users_support:
    print "Users from group %d are most likely to support %s." % (gid, candidate)