# Collecting and analyzing the timing profile of code
* The goal of this notebook is to help you with HW3  
* This code is available from the class notes on Canvas  
* You can use this code for debugging, but when you submit, use the original nbgrader notebook provided.

## Timer: Time Measurement Class

In [1]:
from time import time
import datetime

import collections

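class Timer:
    def __init__(self):
        self.start()
        
    def start(self):
        """ Start (or restart) the timer and clear all recorded timestamps """
        # Store the start time under its own name so that it does not
        # overwrite the start() method (which would make timer.start() fail)
        self.start_time = time()
        self.timestamps = collections.OrderedDict()
        
    def record(self, key):
        """ Record a timestamp with the label `key` """
        self.timestamps[key] = time()
        
    def print_interval(self):
        """ Print the time elapsed between consecutive recorded timestamps, in order """
        key_times = list(self.timestamps.items())
        for i in range(len(key_times)):
            # The first interval is measured from the start of the timer
            prev = self.start_time if i == 0 else key_times[i - 1][1]
            
            print(f'{key_times[i][0]}: {key_times[i][1] - prev:12.2f} seconds')
            
timer = Timer()
timer.start()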
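The interval logic inside `print_interval` can be checked on its own. The sketch below uses a hypothetical standalone helper, `intervals`, that takes a start time and an ordered list of `(label, timestamp)` pairs; it uses made-up timestamps instead of `time()` so the result is deterministic.

```python
from typing import List, Tuple

def intervals(start: float, stamps: List[Tuple[str, float]]) -> List[Tuple[str, float]]:
    """Return (label, seconds since the previous timestamp) for each step.

    Hypothetical helper mirroring Timer.print_interval: the first step is
    measured from `start`, every later step from the step before it.
    """
    result = []
    prev = start
    for label, t in stamps:
        result.append((label, t - prev))
        prev = t
    return result

# Made-up timestamps, in seconds
steps = intervals(100.0, [("set up sc", 106.0), ("read data", 158.0), ("parse", 158.5)])
for label, dt in steps:
    print(f"{label}: {dt:12.2f} seconds")
```

Because each interval starts where the previous one ended, the intervals add up to the total elapsed time, so no part of the code goes unmeasured.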

In [2]:
_version = '10gb'

In [3]:
from pyspark import SparkContext, SparkConf
from pyspark.rdd import RDD

def set_spark_config(leader_name=None, app_name="cse255 spark"):
    import os
    from pyspark import SparkContext, SparkConf

    def get_local_ip():
        import socket
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        s.connect(("8.8.8.8", 80))
        ip = s.getsockname()[0]
        s.close()
        return ip

    if leader_name is not None:
        # Connect to the leader's Spark cluster
        os.environ['SPARK_LOCAL_IP'] = "" #driver_host
        driver_host = get_local_ip()

        conf = SparkConf()
        conf.setMaster(f'spark://spark-master.{leader_name}.svc.cluster.local:7077')
        conf.set("spark.blockManager.port", "50002")
        conf.set("spark.driver.bindAddress", driver_host)
        conf.set("spark.driver.host", driver_host)
        conf.set("spark.driver.port", "50500")
        conf.set("spark.authenticate", "false")
    else:
        conf = SparkConf()
        
#     conf.set("spark.cores.max", 4)
    conf.set("spark.executor.memory", "20g")
    conf.setAppName(app_name)
    sc = SparkContext(conf=conf)

    return sc

In [4]:
# Pass leader_name=None to use a local cluster while developing in the notebook
sc = set_spark_config(leader_name="l1qiao")

# Test whether cluster resources are available
sc.parallelize(['Test', 'resources']).collect()

timer.record("set up sc")

In [5]:
file_path = f'file:///datasets/cs255-sp22-a00-public/public/hw2-files-{_version}.txt'

# YOUR CODE HERE
raw_rdd = sc.textFile(file_path).cache()
num_tweets = raw_rdd.count()
# YOUR CODE ENDS

timer.record("read data")

In [6]:
import json

def safe_parse(raw_json):
    """
    Input is a String
    Output is a JSON object if the tweet is valid and None if not valid
    """

    # YOUR CODE HERE
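    try:
        json_obj = json.loads(raw_json) # Raises ValueError if the line is not valid JSON
        json_obj["created_at"] # Raises KeyError/TypeError if the object is not a tweet
        return json_obj
    except (ValueError, KeyError, TypeError):
        return None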
    # YOUR CODE ENDS
    
# Remember to construct an RDD of (user_id, text) here.

# YOUR CODE HERE
raw_rdd_json = raw_rdd.map(safe_parse).filter(lambda p: p)
user_text_rdd = raw_rdd_json.map(lambda p: (p['user']['id_str'], p['text']))
# YOUR CODE ENDS

timer.record("safe parse rdd")
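The parse, filter, and map steps above can be tried without Spark. This sketch runs the same pipeline over a few hypothetical sample lines with plain Python `map` and list comprehensions, and catches only the exceptions that malformed or non-tweet lines raise instead of using a bare `except`.

```python
import json

def safe_parse(raw_json):
    """Return the parsed tweet, or None if the line is not a valid tweet."""
    try:
        json_obj = json.loads(raw_json)  # ValueError on malformed JSON
        json_obj["created_at"]            # KeyError/TypeError if not a tweet
        return json_obj
    except (ValueError, KeyError, TypeError):
        return None

# Hypothetical sample lines standing in for the dataset
lines = [
    '{"created_at": "Mon", "user": {"id_str": "42"}, "text": "hello"}',
    '{"delete": {"status": {"id": 1}}}',  # valid JSON, but not a tweet
    '{"created_at": "Mon", "user"',       # truncated JSON
]

# Plain-Python equivalent of raw_rdd.map(safe_parse).filter(...).map(...)
tweets = [t for t in map(safe_parse, lines) if t is not None]
user_text = [(t["user"]["id_str"], t["text"]) for t in tweets]
print(user_text)  # [('42', 'hello')]
```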

In [7]:
# YOUR CODE HERE
user_rdd = user_text_rdd.map(lambda user_text: user_text[0])
num_unique_users = user_rdd.distinct().count()
# YOUR CODE ENDS

timer.record("count unique users")

In [8]:
import pickle

# Load the mapping user_id -> partition number directly from the file
with open("./users-partition.pickle", "rb") as f:
    partition = pickle.load(f)
## Broadcasting `partition` with sc.broadcast would help you here

len(partition)

452743

In [9]:
# YOUR CODE HERE
from operator import add
partition_count_rdd = user_text_rdd.map(lambda user_text: (partition.get(user_text[0], 7), 1)).reduceByKey(add).sortByKey()
counts_per_part = partition_count_rdd.collect()
# YOUR CODE ENDS

timer.record("count tweets per user partition")
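The per-partition count maps each tweet to its author's partition (unknown users fall into group 7), sums the ones, and sorts by partition. A plain-Python sketch of the same computation on hypothetical toy data, with `Counter` playing the role of `reduceByKey(add)`:

```python
from collections import Counter

# Hypothetical inputs: user -> partition mapping and (user_id, text) pairs
partition = {"u1": 0, "u2": 1, "u3": 1}
user_text = [("u1", "a"), ("u2", "b"), ("u3", "c"), ("u3", "d"), ("u9", "e")]

# Users missing from `partition` go to the default group 7,
# mirroring partition.get(user_id, 7) in the notebook
counts = Counter(partition.get(user, 7) for user, _ in user_text)

# Equivalent of .reduceByKey(add).sortByKey().collect()
counts_per_part = sorted(counts.items())
print(counts_per_part)  # [(0, 1), (1, 3), (7, 1)]
```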

In [10]:
#!/usr/bin/env python

"""
This code implements a basic, Twitter-aware tokenizer.

A tokenizer is a function that splits a string of text into words. In
Python terms, we map string and unicode objects into lists of unicode
objects.

There is not a single right way to do tokenizing. The best method
depends on the application.  This tokenizer is designed to be flexible
and thus easy to adapt to new domains and tasks.  The basic logic is
this:

1. The tuple regex_strings defines a list of regular expression
   strings.

2. The regex_strings strings are put, in order, into a compiled
   regular expression object called word_re.

3. The tokenization is done by word_re.findall(s), where s is the
   user-supplied string, inside the tokenize() method of the class
   Tokenizer.

4. When instantiating Tokenizer objects, there is a single option:
   preserve_case.  By default, it is set to False, in which case the
   tokenizer downcases everything except for emoticons. If it is set
   to True, the original case is kept.

The __main__ method illustrates by tokenizing a few examples.

I've also included a Tokenizer method tokenize_random_tweet(). If the
twitter library is installed (http://code.google.com/p/python-twitter/)
and Twitter is cooperating, then it should tokenize a random
English-language tweet.


Julaiti Alafate:
  I modified the regex strings to extract URLs in tweets.
"""

__author__ = "Christopher Potts"
__copyright__ = "Copyright 2011, Christopher Potts"
__credits__ = []
__license__ = "Creative Commons Attribution-NonCommercial-ShareAlike 3.0 Unported License: http://creativecommons.org/licenses/by-nc-sa/3.0/"
__version__ = "1.0"
__maintainer__ = "Christopher Potts"
__email__ = "See the author's website"

######################################################################

import re
from html import entities 

######################################################################
# The following strings are components in the regular expression
# that is used for tokenizing. It's important that phone_number
# appears first in the final regex (since it can contain whitespace).
# It also could matter that tags comes after emoticons, due to the
# possibility of having text like
#
#     <:| and some text >:)
#
# Most importantly, the final element should always be last, since it
# does a last ditch whitespace-based tokenization of whatever is left.

# This particular element is used in a couple ways, so we define it
# with a name:
emoticon_string = r"""
    (?:
      [<>]?
      [:;=8]                     # eyes
      [\-o\*\']?                 # optional nose
      [\)\]\(\[dDpP/\:\}\{@\|\\] # mouth      
      |
      [\)\]\(\[dDpP/\:\}\{@\|\\] # mouth
      [\-o\*\']?                 # optional nose
      [:;=8]                     # eyes
      [<>]?
    )"""

# The components of the tokenizer:
regex_strings = (
    # Phone numbers:
    r"""
    (?:
      (?:            # (international)
        \+?[01]
        [\-\s.]*
      )?            
      (?:            # (area code)
        [\(]?
        \d{3}
        [\-\s.\)]*
      )?    
      \d{3}          # exchange
      [\-\s.]*   
      \d{4}          # base
    )"""
    ,
    # URLs:
    r"""http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+"""
    ,
    # Emoticons:
    emoticon_string
    ,    
    # HTML tags:
     r"""<[^>]+>"""
    ,
    # Twitter username:
    r"""(?:@[\w_]+)"""
    ,
    # Twitter hashtags:
    r"""(?:\#+[\w_]+[\w\'_\-]*[\w_]+)"""
    ,
    # Remaining word types:
    r"""
    (?:[a-z][a-z'\-_]+[a-z])       # Words with apostrophes or dashes.
    |
    (?:[+\-]?\d+[,/.:-]\d+[+\-]?)  # Numbers, including fractions, decimals.
    |
    (?:[\w_]+)                     # Words without apostrophes or dashes.
    |
    (?:\.(?:\s*\.){1,})            # Ellipsis dots. 
    |
    (?:\S)                         # Everything else that isn't whitespace.
    """
    )

######################################################################
# This is the core tokenizing regex:
    
word_re = re.compile(r"""(%s)""" % "|".join(regex_strings), re.VERBOSE | re.I | re.UNICODE)

# The emoticon string gets its own regex so that we can preserve case for them as needed:
emoticon_re = re.compile(emoticon_string, re.VERBOSE | re.I | re.UNICODE)

# These are for regularizing HTML entities to Unicode:
html_entity_digit_re = re.compile(r"&#\d+;")
html_entity_alpha_re = re.compile(r"&\w+;")
amp = "&amp;"

######################################################################

class Tokenizer:
    def __init__(self, preserve_case=False):
        self.preserve_case = preserve_case

    def tokenize(self, s):
        """
        Argument: s -- any string (bytes are decoded as UTF-8)
        Value: a list of token strings; whitespace between tokens is dropped,
        so concatenating the list does not in general reproduce the original string
        """
        # Ensure we are working with a str:
        if isinstance(s, bytes):
            s = s.decode('utf-8', errors='replace')
        else:
            s = str(s)
        # Fix HTML character entities:
        s = self.__html2unicode(s)
        # Tokenize:
        words = word_re.findall(s)
        # Possibly alter the case, but avoid changing emoticons like :D into :d:
        if not self.preserve_case:
            words = list(map((lambda x : x if emoticon_re.search(x) else x.lower()), words))
        return words

    def tokenize_random_tweet(self):
        """
        If the twitter library is installed and a twitter connection
        can be established, then tokenize a random tweet.
        """
        try:
            import twitter
        except ImportError:
            print("Apologies. The random tweet functionality requires the Python twitter library: http://code.google.com/p/python-twitter/")
            return None
        api = twitter.Api()
        tweets = api.GetPublicTimeline()
        if tweets:
            for tweet in tweets:
                if tweet.user.lang == 'en':
                    return self.tokenize(tweet.text)
        else:
            raise Exception("Apologies. I couldn't get Twitter to give me a public English-language tweet. Perhaps try again")

    def __html2unicode(self, s):
        """
        Internal method that seeks to replace all the HTML entities in
        s with their corresponding unicode characters.
        """
        # First the digits (e.g. &#38;):
        ents = set(html_entity_digit_re.findall(s))
        for ent in ents:
            entnum = ent[2:-1]
            try:
                s = s.replace(ent, chr(int(entnum)))
            except (ValueError, OverflowError):
                pass
        # Now the alpha versions (e.g. &lt;), except &amp;:
        ents = set(html_entity_alpha_re.findall(s))
        ents = filter((lambda x : x != amp), ents)
        for ent in ents:
            entname = ent[1:-1]
            try:
                s = s.replace(ent, chr(entities.name2codepoint[entname]))
            except KeyError:
                pass
        # Finally replace &amp; with the word "and":
        s = s.replace(amp, " and ")
        return s
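Two design points of the tokenizer show up on a toy scale. First, the alternatives in `word_re` are tried left to right at each position, so their order decides which one wins, which is why the phone-number pattern comes first. Second, lowercasing skips any token that matches the emoticon pattern. The patterns below are simplified, hypothetical stand-ins, not the notebook's `regex_strings`.

```python
import re

# Simplified, hypothetical components (not the notebook's regex_strings)
phone = r"\d{3}[\-\s.]*\d{4}"  # may contain whitespace, e.g. "555 1234"
word = r"\w+"
other = r"\S"

def tokenize(parts, s):
    # Same construction as word_re: alternatives joined in order inside one group
    regex = re.compile(r"(%s)" % "|".join(parts), re.VERBOSE | re.I | re.UNICODE)
    return regex.findall(s)

text = "call 555 1234 now"
print(tokenize((phone, word, other), text))  # ['call', '555 1234', 'now']
print(tokenize((word, phone, other), text))  # ['call', '555', '1234', 'now']

# Case folding that leaves emoticons alone, as in Tokenizer.tokenize
emoticon_re = re.compile(r"[:;=8][\-o\*\']?[\)\]\(\[dDpP/\:\}\{@\|\\]", re.I)
tokens = ["Great", ":D", "DAY"]
lowered = [t if emoticon_re.search(t) else t.lower() for t in tokens]
print(lowered)  # ['great', ':D', 'day']
```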

In [11]:
from math import log

tok = Tokenizer(preserve_case=False)

def get_rel_popularity(c_k, c_all):
    '''
    Compute the relative popularity of a token.
    
    Args:
        c_k: the number of mentions in the user partition k.
        c_all: the number of all mentions.
        
    Return:
        The relative popularity of the token. It is at most 0, since c_k <= c_all and the log of a ratio in (0, 1] is non-positive.
    '''
    
    return log(1.0 * c_k / c_all) / log(2)
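The relative popularity is $\log_2(c_k / c_{all})$: it equals $0$ when every mention of the token comes from partition $k$ and drops by 1 each time that share halves. A quick check with made-up counts, using `math.log2`, which gives the same value as dividing by `log(2)`:

```python
from math import log2

def get_rel_popularity(c_k, c_all):
    """Relative popularity log2(c_k / c_all); never positive since c_k <= c_all."""
    return log2(c_k / c_all)

print(get_rel_popularity(25, 100))   # a quarter of all mentions -> -2.0
print(get_rel_popularity(100, 100))  # every mention in partition k -> 0.0
```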

In [12]:
# YOUR CODE HERE
user_token_rdd = user_text_rdd \
    .map(lambda x: (x[0], set(tok.tokenize(x[1])))) \
    .groupByKey() \
    .flatMap(lambda x: [(x[0], token) for token in set.union(*x[1])])
token_count_rdd = user_token_rdd.map(lambda user_token: (user_token[1], 1)).reduceByKey(add) 
num_of_tokens = token_count_rdd.count()
# YOUR CODE ENDS

timer.record("count all unique tokens")
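The token count above counts each token once per user, not once per tweet: the tokens of all of a user's tweets are merged into one set before counting. The sketch below shows this in plain Python, with a hypothetical `simple_tokenize` (lowercase whitespace split) standing in for `Tokenizer.tokenize`. In Spark, merging the per-user sets with `reduceByKey(lambda a, b: a | b)` instead of `groupByKey` lets the union start on the map side before the shuffle; that is a suggestion, not something measured here.

```python
from collections import Counter, defaultdict

def simple_tokenize(text):
    # Hypothetical stand-in for Tokenizer.tokenize
    return text.lower().split()

user_text = [("u1", "Go Team go"), ("u1", "team spirit"), ("u2", "go home")]

# Union of each user's tokens, like groupByKey + set.union in the notebook
tokens_per_user = defaultdict(set)
for user, text in user_text:
    tokens_per_user[user] |= set(simple_tokenize(text))

# Number of distinct users who mention each token
token_user_count = Counter(tok for toks in tokens_per_user.values() for tok in toks)
print(sorted(token_user_count.items()))
# [('go', 2), ('home', 1), ('spirit', 1), ('team', 1)]
```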

In [13]:
# YOUR CODE HERE
user_token_rdd_freq = token_count_rdd.filter(lambda tok_count: tok_count[1] >= 100)
num_freq_tokens = user_token_rdd_freq.count()
top20 = user_token_rdd_freq.sortBy(lambda tok_count: tok_count[1], ascending=False).take(20)
# YOUR CODE ENDS

timer.record("count overall most popular tokens")
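Only the 20 largest counts are needed, so a full sort is more work than necessary. PySpark's `RDD.takeOrdered(20, key=lambda tc: -tc[1])` returns the top elements without the shuffle that `sortBy` performs, which may save time here. The plain-Python sketch below shows the same filter-then-top-k idea with `heapq.nlargest` on hypothetical counts, using a threshold of 100 as in the notebook.

```python
import heapq

# Hypothetical (token, user_count) pairs
token_counts = [("the", 500), ("rare", 3), ("game", 150), ("lol", 220), ("meh", 99)]

# Keep tokens mentioned by at least 100 users
freq = [(t, c) for t, c in token_counts if c >= 100]

# Top-k by count without sorting every token
top2 = heapq.nlargest(2, freq, key=lambda tc: tc[1])
print(len(freq), top2)  # 3 [('the', 500), ('lol', 220)]
```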

In [14]:
# YOUR CODE HERE
# user_token_rdd_freq: [(token, count), ...] where count >= 100
user_token_dict_freq = user_token_rdd_freq.collectAsMap()
timer.record('user_token_dict_freq')

popular_10_in_each_group = []
for k in range(8):
    # user_token_rdd: [(user, token), ...]
    # k_group_token: [(user, token), ...] where the count of token >= 100 AND user is in group k
    
    k_group_token = user_token_rdd \
        .filter(lambda user_token: user_token[1] in user_token_dict_freq) \
        .filter(lambda user_token: partition.get(user_token[0], 7) == k)
    timer.record(f'iteration {k} k_group_token')

    # k_group_token_count: [(token, count), ...] where count is number of mentions of token in group k
    k_group_token_count = k_group_token \
        .map(lambda user_token: (user_token[1], 1)) \
        .reduceByKey(lambda x, y: x + y) 
    timer.record(f'iteration {k} k_group_token_count')
    
    # Get the total number of mentions of the token, regardless of group, from user_token_dict_freq[token]
    top10 = k_group_token_count \
        .map(lambda token_count: (token_count[0], get_rel_popularity(token_count[1], user_token_dict_freq[token_count[0]]))) \
        .sortBy(lambda token_rel: (-token_rel[1], token_rel[0])) \
        .take(10)
    timer.record(f'end iteration {k} top10')
                
    popular_10_in_each_group.append(top10)
# YOUR CODE ENDS
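The loop above launches separate Spark jobs for each of the eight groups, and the timings below show each `top10` step taking about a minute, likely because `user_token_rdd` is recomputed every time (caching it before the loop should help). One possible loop-free alternative is to key each pair by `(group, token)` and count once; in Spark that would be a single `map` plus `reduceByKey`. The sketch shows that logic in plain Python on toy data; `top_per_group` is a hypothetical helper, not part of the assignment.

```python
from collections import Counter, defaultdict
from math import log2

def top_per_group(user_tokens, partition, total_counts, n=10, default_group=7):
    """Top-n tokens by relative popularity in every group, in one pass.

    user_tokens: (user, token) pairs, one per distinct pair
    partition: dict user -> group; missing users go to default_group
    total_counts: dict token -> number of users mentioning it (frequent tokens only)
    """
    # One count per (group, token) instead of one pass per group
    group_token_counts = Counter(
        (partition.get(user, default_group), token)
        for user, token in user_tokens
        if token in total_counts
    )
    by_group = defaultdict(list)
    for (group, token), c_k in group_token_counts.items():
        by_group[group].append((token, log2(c_k / total_counts[token])))
    # Same ordering as the notebook: highest popularity first, ties broken by token
    return {g: sorted(rows, key=lambda tr: (-tr[1], tr[0]))[:n]
            for g, rows in by_group.items()}

# Hypothetical toy data
user_tokens = [("u1", "a"), ("u1", "b"), ("u2", "a"), ("u3", "a"), ("u3", "b")]
partition = {"u1": 0, "u2": 1}
total_counts = {"a": 3, "b": 2}
result = top_per_group(user_tokens, partition, total_counts)
print(result[0])  # 'b' (-1.0) ranks above 'a' (log2(1/3))
```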

In [15]:
timer.print_interval()

set up sc:         6.32 seconds
read data:        51.98 seconds
safe parse rdd:         0.01 seconds
count unique users:        89.39 seconds
count tweets per user partition:       100.91 seconds
count all unique tokens:       140.34 seconds
count overall most popular tokens:        45.07 seconds
user_token_dict_freq:        11.12 seconds
iteration 0 k_group_token:         0.00 seconds
iteration 0 k_group_token_count:         4.25 seconds
end iteration 0 top10:        58.54 seconds
iteration 1 k_group_token:         0.00 seconds
iteration 1 k_group_token_count:         4.52 seconds
end iteration 1 top10:        62.99 seconds
iteration 2 k_group_token:         0.00 seconds
iteration 2 k_group_token_count:         4.55 seconds
end iteration 2 top10:        59.49 seconds
iteration 3 k_group_token:         0.00 seconds
iteration 3 k_group_token_count:         4.50 seconds
end iteration 3 top10:        63.93 seconds
iteration 4 k_group_token:         0.00 seconds
iteration 4 k_group_token_c
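Several steps above report almost no time (`safe parse rdd`, every `k_group_token`) because Spark transformations such as `map` and `filter` are lazy: the work is done, and timed, in the next action such as `count`, `collect`, or `take`. Python's built-in `map` is lazy in a similar way, which this small analogy uses; it is not Spark code.

```python
calls = 0

def parse(line):
    # Count how many times the function actually runs
    global calls
    calls += 1
    return line.upper()

lines = ["a", "b", "c"]
lazy = map(parse, lines)  # like an RDD transformation: nothing runs yet
print(calls)              # 0
result = list(lazy)       # like an action (count, collect, take): work happens now
print(calls, result)      # 3 ['A', 'B', 'C']
```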

### Takeaways
* Use a timer to understand which parts of your code consume the most time.
* Record sequential timestamps rather than pairing `start` and `stop` calls: each interval begins where the previous one ended, so every part of the code is covered.
* When the same RDD is computed over and over, add `.cache()` so Spark stores the results instead of recomputing them.
* Avoid loops: ideally, the code has no Python loops that launch a separate Spark job on every iteration.
* Running time varies by $\pm 5\%$ even when your process runs alone. Your code will be run only once during evaluation, so its running time may be better or worse than the timing you measured. To estimate the range of running times, run your code several times.
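Running the same code several times and reporting the spread is easy to automate. This is a minimal sketch; `time_runs` is a hypothetical helper and the workload is a stand-in for a notebook cell. It uses `time.perf_counter`, which is intended for measuring short intervals.

```python
import statistics
import time

def time_runs(fn, repeats=5):
    """Run fn() several times; return (min, mean, max) wall-clock seconds."""
    durations = []
    for _ in range(repeats):
        t0 = time.perf_counter()
        fn()
        durations.append(time.perf_counter() - t0)
    return min(durations), statistics.mean(durations), max(durations)

# Hypothetical workload standing in for a notebook cell
lo, mean, hi = time_runs(lambda: sum(i * i for i in range(100_000)), repeats=3)
print(f"min {lo:.4f}s  mean {mean:.4f}s  max {hi:.4f}s")
```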