In [None]:
from __future__ import print_function
%matplotlib inline
import matplotlib.pylab as plt
import sys, os, glob
import numpy as np

plt.rcParams['figure.figsize'] = (10,6)
plt.rcParams['font.size'] = 18
plt.style.use('fivethirtyeight')

# Analyzing the Gutenberg Books Corpus

In this notebook, we will use the cleaned, pre-processed data that we created in the [pre-processing part](gutenberg-preprocessing-SOLUTIONS.ipynb). As a reminder, we ended up with an RDD of `(gid, text)` tuples that has been cleaned and we stored it on HDFS at `/user/<YOUR_USERNAME>/gutenberg/cleaned_rdd`. 

We have two goals in this notebook:

1. generate data that will enable us to create something like the [Google Ngram Viewer](https://books.google.com/ngrams) but for the Gutenberg book corpus. 
2. Use Spark's machine learning library to perform language classification on the English and German book corpus

## Setting up Python and Spark

These steps are identical to those used in the previous notebook so we have omitted the lengthy explanations -- if you need to check what any of this is doing, have a look at the pre-processing notebook. }

In [None]:
import findspark, os
findspark.init()

import pyspark
from pyspark import SparkConf, SparkContext

In [None]:
# put the number of executors and cores into variables so we can refer to it later
num_execs = 20
exec_cores = 4

In [None]:
# initializing the SparkConf
os.environ['SPARK_DRIVER_MEMORY'] = '4g'
os.environ['SPARK_CONF_DIR'] = '%s/../../spark_config'%os.getcwd()
conf = (SparkConf()
            .set('spark.executor.memory', '8g')
            .set('spark.executor.instances', str(num_execs))
            .set('spark.executor.cores', str(exec_cores))
            .set('spark.storage.memoryFraction', 0.3)
            .set('spark.shuffle.memoryFraction', 0.5)
            .set('spark.yarn.executor.memoryOverhead', 3072)
            .set('spark.yarn.am.memory', '8g')
            .set('spark.yarn.am.cores', 4)
            .set('spark.executorEnv.PYTHONPATH', 
                 '/cluster/apps/spark/spark-current/python:/cluster/apps/spark/spark-current/python/lib/py4j-0.8.2.1-src.zip')
            .set('spark.executorEnv.PATH', os.environ['PATH']))

In [None]:
sc = SparkContext(master = 'yarn-client', conf = conf)

If this works successfully, you can check the [YARN application scheduler](http://hadoop.hpc-net.ethz.ch:8088/cluster) and you should see your app listed there. Clicking on the "Application Master" link will bring up the familiar Spark Web UI. 

Note that we specified a custom `SPARK_CONF_DIR`: this way we can override default spark settings. In particular in this case we override the logging options to keep the console a bit more quiet. We still want to see the warnings and the outputs from the machine learning library, however. Here is the log configuration file that accomplishes this: 

In [None]:
!cat ../../spark_config/log4j.properties

## Load the data from HDFS

In [None]:
# TODO: load cleaned_rdd from your directory on HDFS
cleaned_rdd = sc.pickleFile('/user/YOUR USERNAME/gutenberg/cleaned_rdd').setName('cleaned_rdd').cache()

A quick inspection of the data to remind ourselves what it looks like: 

In [None]:
gid, text = cleaned_rdd.first()
print(gid, text[10000:10500]) # somewhere in the middle of the book

### Load in the metadata dictionary and broadcast it

In [None]:
from cPickle import load

with open('{home}/gutenberg_metadata.dump'.format(home=os.environ['HOME']), 'r') as f :
    meta_dict = load(f)

In [None]:
# TODO: create meta_b by broadcasting meta_dict
meta_b = sc.broadcast(meta_dict)

Now, our `cleaned_rdd` contains `gid`'s as keys and text as values and if we want some other piece of metadata, we can just access it via the lookup table, for example `meta_b.value[gid][meta_name]`. 

### Histogram of book publication years
Now we're ready to start asking some questions of the data. To begin with, lets do a simple histogram of the year distribution of the books. Since we don't have original publication dates, we just use the simple formula: 

$year = max\left((year_{birth} + year_{death})/2, year_{birth} + offset\right)$, 

where $offset$ is a number drawn from a gaussian centered on 40 with $\sigma = 5$ years. This means that we assume most people write their books around age 40. ;)

The function `publication_year` is provided for you and you should use it to *transform* the `year_rdd` into an RDD of publication years. 

Remember that we created a broadcast variable at the beginning of the notebook, which can be used to efficiently look up metadata.

In [None]:
import numpy as np
def publication_year(meta) : 
    """Returns the publication year for the given metadata dictionary"""
    
    birth_year = int(meta['birth_year'])
    if meta['death_year'] is None : 
        year = birth_year + np.random.normal(40,5)
    else :
        death_year = int(meta['death_year'])
        year = max((birth_year + death_year) / 2.0, birth_year+np.random.normal(40,5))

    return min(int(year),2015)

In [None]:
# TODO: map cleaned_rdd to contain just the publication years by using the publication_year function above
year_rdd = cleaned_rdd.<FILL IN>
                       
year_rdd.count()

In [None]:
year_rdd.first()

The histogram function actually already exists in the Spark API (but it didn't use to!). However, for fun we will write our own. Calculating the histogram can be split up into two parts:

2. calculate the bin that the value maps to and create an RDD of (`bin`, 1) pair
3. do a `reduceByKey` where we just add up all the values belonging to each bin. 

The last step is the simplest for us to execute, but the most complicated in terms of what happens behind the scenes. The reduction happens in two parts - first, it creates a list of `(key, count)` pairs locally by iterating through all the elements of the partition and creating a hash table. Then, these pairs are communicated to other partitions and added up. 

In [None]:
# define a helper function to determine which bin a value falls into
from bisect import bisect_right
def get_bin(bin_edges, value) : 
    """Returns which bin, specified by `bin_edges`, the `value` falls into."""
    return bisect_right(bin_edges, value) - 1

In [None]:
# TODO: fill in the bits and pieces of the histogram function

def histogram(rdd, nbins = 100, min_val=None, max_val=None) :
    """
    Calculate a histogram of the data, given the number of bins and the data range. 
    
    Arguments: 
        
        rdd: the data RDD
    
    Optional Keywords:
        
        nbins: number of bins (default 100)
        
        min_val: minimum value to consider (default, min value of the input data)
        
        max_val: maximum value to consider (default, max value of the input data)
    
    Returns: 
        
        tuple of bins and array of counts
        
    """
    # if either min_val or max_val are missing, get them from the data
    if min_val is None : 
        min_val = rdd.<FILL IN> # FILL IN --> look at the Spark API!
    if max_val is None : 
        max_val = rdd.<FILL IN> # FILL IN --> look at the Spark API!
        
    # create the array of bin edges
    bin_edges = np.linspace(min_val,max_val,nbins+1)
    
    # create an RDD where each data value is mapped into a tuple (bin, 1) that will 
    # be used for counting up the bin values (use the get_bin function defined above to generate the bin)
    binned_rdd = rdd.<FILL IN> # FILL IN 
    
    # reduce binned_rdd and collect the values into summed_bins 
    summed_bins = binned_rdd.<FILL IN>.collect() # FILL IN 
    
    # This is a sparse result -- turn into a dense vector for plotting: 
    res_full = np.zeros(nbins)
    overflow = 0
    for item in summed_bins : 
        if item[0] > len(res_full)-1 or item[0] < 0: 
            continue # ignore
        else: res_full[item[0]] = item[1]
            
    return .5*(bin_edges[:-1]+bin_edges[1:]), res_full

In [None]:
%time bins, vals = histogram(year_rdd, min_val=1500, max_val=2015)

Finally, here is the histogram of publication years:

In [None]:
plt.plot(bins, vals)
plt.xlim(1500,2015)
plt.xlabel('year')
plt.ylabel('Number of books')
plt.semilogy()
pass

### Reducing network traffic by using `mapPartition`

This kind of operation is a good candidate for first computing a partial result on each partition and then following with a reduce step. 

In our `histogram` function above, the `reduceByKey` step requires a reshuffling of the data which means spending potentially a lot of time in network communication and other overhead associated with creating distributed hash tables for all the keys. 

However, since our binning function effectively already hashes the values anyway, we can avoid this by instructing first each partition to calculate its local histogram and then simply adding up the histograms in the end. In this way, we are basically combinging the `map` operation with a local `reduce` operation, and since the number of bins is always relatively small, we end up sending a trivial amount of data across the network. 

Here is where our knowledge of generators comes in handy, because the method [`mapPartitions`](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.mapPartitions) *requires* a generator function. 

The key here is to define a histogram function that calculates the histogram on each partition locally. `mapPartitions` gives us an *iterator* over the data; we need to extract the data out of this iterator and calculate the local histograms *for the data in each partition*. Below, this is accomplished using the `bin_partition` function.

Finally, we define a new `histogram_partition` function that uses `bin_partition` to compute the local histograms and a simple addition in a `reduce` operation to sum up the histograms in the end. 

In [None]:
def bin_partition(iterator, nbins,  min_val, max_val) : 
    """
    Perform the binning of data contained in an iterator
    
    Arguments: 
        
        iterator: the data iterable 
        
        nbins: number of bins
        
        min_val, max_val: min and max values to consider
    
    yields the local histogram
    """
    from bisect import bisect_right
    
    bin_edges = np.linspace(min_val,max_val,nbins+1)
    
    histogram = np.zeros(len(bin_edges)-1)
    
    for item in iterator : # iterating over all the items in the partition
        try : 
            ind = get_bin(bin_edges,item)
            histogram[ind] += 1
        except IndexError : 
            pass
        
    yield histogram

def histogram_partition(rdd, nbins = 100, min_val=None, max_val=None) :
    """
    Calculate a histogram of the data by using partition methods
    
    Arguments: 
    
        rdd: the data
        
    Optional Keywords:
    
        nbins: number of bins (default 100)
        
        min_val: minimum value to consider (default, min value of the input data)
        
        max_val: maximum value to consider (default, max value of the input data)
    
    Returns: 
        
        tuple of bins and array of counts
    """
    # if either min_val or max_val are missing, get them from the data
    if min_val is None : 
        min_val = rdd.<FILL IN> # FILL IN
    if max_val is None : 
        max_val = rdd.<FILL IN> # FILL IN
        
    bin_edges = np.linspace(min_val,max_val,nbins+1)
    
    result = (rdd.mapPartitions(lambda iterator: bin_partition(iterator, nbins, min_val, max_val))
                 .reduce(lambda a,b: a+b))
    
    return .5*(bin_edges[:-1]+bin_edges[1:]), result

In [None]:
%time bins, vals = histogram_partition(year_rdd, min_val=1500, max_val=2015)

The difference here doesn't look dramatic because the total amount of data is rather small, but have a look at the Spark Web UI and you will see that the second implementation didn't do any shuffle writing. If you have a large number of keys, the intermediate shuffles that need to take place can have a substantial impact on performance. With a bigger data set, this difference could potentially matter quite a lot!

### Inspecting the metadata some more

Lets do a couple more checks and practice using the Spark API. 

#### How many unique authors are there in the dataset? 

1. make `author_rdd` that is composed of a string `"last_name, first_name"` (use the broadcast variable `meta_b` to get the data for each `gid`)
2. keep only the unique author strings (*hint*: look at the Spark API to find an appropriate method)
3. count the number of elements remaining

**note**: use `meta_b.value` to access the actual metadata dictionary, i.e. to get the metadata for `gid=101`:

In [None]:
meta_b.value[101]

In [None]:
# TODO: map cleaned_rdd to contain the string "last_name, first_name" 
author_rdd = cleaned_rdd.<FILL IN>

In [None]:
# TODO: use RDD methods to obtain the distinct author strings and count them
n_authors = (author_rdd.<FILL IN>)
print("Number of distinct authors: %s " % n_authors)

In [None]:
assert(n_authors == 10192)

#### Most-represented authors in the corpus: 

1. use the `author_rdd` from above
2. use the pattern `(key, 1)` to set up an RDD that can be passed to `reduceByKey`
3. run `reduceByKey`, yielding an RDD composed of `(author, count)` tuples
4. sort by descending order of number of books per author and print out the top 10 (try using `takeOrdered`)

In [None]:
# TODO: generate a list of authors, reverse-sorted by the number of books they have in the corpus
(author_rdd.<FILL IN>
           .<FILL IN>
           .<FILL IN>)

Finally, lets do the same thing per language, just to get an idea of how much data there is: 

In [None]:
# FILL IN 
lang_rdd = (cleaned_rdd.map(<FILL IN>)
                       .<FILL IN>
                       .<FILL IN>).collect()

## How many unique words were used in English in these 500+ years? 

We could have done the above metadata gymnastics without ever invoking a distributed processing framework by simply extracting the years from the metadata -- nevertheless we used the metadata to have a closer look at some of the RDD methods. However, the text body of each data element is where the bulk of the data volume lies. 

To construct a corpus wide vocabulary, we have to deconstruct each document into a list of words and then extract the unique words from the entire data set. If our dataset fits into memory of a single machine, this is a simple `set` operation. But what if it doesn't? 

We'll assume this is the case and instead of converting each `gid,text` pair into a `gid,list_of_words` pair, we will simply construct one global RDD of words. Here we aren't necessarily interested in preserving the provenance of words, but just finding the unique words in the whole corpus, so we drop the metadata altogether. 

The steps are as follows:

1. map the entire RDD of text into an RDD of single words (use flatMap -- this returns a different number of elements than it takes in)
2. use the `distinct` method of the resulting RDD to transform it into an RDD with only unique words

As a reminder, here's an illustration of how `flatMap` differs from `map`:

![flatMap](../figs/flatMap_example.svg)

*Hint:* In python, splitting a string into a set of words separated by spaces is easy: 

In [None]:
line = 'splitting a string is super simple'
line.split()

Make an RDD `distinct_rdd` which holds the *unique English* words. Consider the steps this will require:

* use `cleaned_rdd` but keep only books in the english language (make sure you use the broadcast metadata variable!)
* convert each document into individual words
* retain only the unique words

Which RDD methods can you use to achieve these three steps? (note that this will be a pretty expensive operation so it might take some time...)

In [None]:
# TODO: create distinct_rdd by filtering for english books and using RDD methods to generate an RDD of distinct words
distinct_rdd = (cleaned_rdd.filter(lambda (gid, text): meta_b.value[gid]['lang'] == 'en')
                           .flatMap(lambda (gid, text): text.split())
                           .distinct())
nwords = distinct_rdd.count()
print("Number of unique English words: ", nwords)

In [None]:
assert(nwords == 3688479)

Note that not all of these are actual words, this is just how many character sequences separated by spaces we found. The pre-processing steps are less than perfect so there is some garbage in there. We will trim this down to just the most commonly-used ones later and that will get rid of most of the nonsense. 

**Bonus question**: can you write this by using `mapPartitions` to first make sets of words unique to each partition? It's a bit faster...

*hint*: use the python [set](https://docs.python.org/2/library/stdtypes.html#set) to find unique words in each partition

In [None]:
# FILL IN 
def partition_set(iterator): 
    words = set()
    
    # iterate through the data
    for text in iterator : 
        # use the "update" method of the words set to add a new set composed of all the words
        # from the "text" item. Remember that "text" is just a string, so can use the split() method
        words.<FILL IN> # FILL IN
    
    # now we have the unique words of the partition --> yield them into the RDD
    for word in words: 
        yield word
        

In [None]:
res = (cleaned_rdd.filter(lambda (gid, text): meta_b.value[gid]['lang'] == 'en')
            .values()
            .mapPartitions(partition_set)
            .distinct())
res.count()

## What are the most common words? 

A "MapReduce" tutorial has to include a word counting example -- it's basically the equivalent of a "Hello World!" example for a programming tutorial!

So, lets count the occurences of all the words across the entire corpus. This is a fairly straightforward operation, but it exposes some very common patterns that can be useful for many tasks. To simplify this a bit, we'll use only the English-language corpus for the moment.

Here are the steps we need to take:

0. keep only the english language books (use a filter)
1. `flatMap` each document into (`word, count`) pairs, but only for words that are not in the `stop_words` set (try with a list comprehension!)
2. call `reduceByKey` to sum up all the `count`s for each word
3. finally sort it in descending order to see the most common words first

The first part here (filtering and `flatMap`) is much like what we did before, but with a twist: for each word, check that it is *not* a member of the `stop_words` set. "Stop words" include common words like "a, the, he" etc.  

In [None]:
from pickle import load
stop_words = load(open('./stop_words.dump')).union(['gutenbergtm', 'gutenberg', 'electronic', 'foundation', 'license', 'copyright', 'donation', 'donations'])

In [None]:
# TODO: create english_rdd which contains only the english books by filtering on the metadata
english_rdd = cleaned_rdd.<FILL IN>.cache()

In [None]:
# TODO: use flatMap to extract the words from each document's text using the english_rdd we made above
words_rdd = (english_rdd.<FILL IN>
                        .setName('words_rdd')
                        .cache())

Now that we have our "flattened" data set, do the counting by first mapping each word in `words_rdd` into a `(word, 1)` tuple, and then using `reduceByKey` to calculate the word frequencies. At the end of this step, use the `sortBy` method to sort the word counts in descending order. 

In [None]:
# TODO: do the word count!
word_count = (words_rdd.<FILL IN>
                       .setName('word_count')
                       .cache())

In [None]:
assert(word_count.take(5) == [('said', 4339532), ('man', 2775185), ('time', 2688315), ('little', 2458328), ('like', 2342552)])

In [None]:
# fifty most common words (excluding stop words)
word_count.take(50)

## Reduces, Shuffles, and Partitioning
During a `reduceByKey`, or any other reduction for that matter, data must be shuffled around the cluster and combined. Other common RDD methods like `join`, `sortByKey` etc. also typically require lots of data shuffling. By default, this is done in an intelligent way by first reducing values locally on each partition, and then combining the results of the partitions. Still, as is the case here, for common keys, every partition will have to send its results to others. This can result in a lot of temporary file IO if the data that needs to be communicated can't all be held in memory on all of the executors. 

One way around this is to pre-partition the data ahead of time so that the same keys land on the same partition by design. This results in much less data needing to be shipped around the network and can improve the performance. Of course, at the cost of an expensive initial shuffle that takes place during the partitioning step! But if many "by key" have to be done on the same data, it might be worth it. 

Lets have a look at these concepts by performing the word count in a few different ways. 

Now we will create a re-partitioned `words_rdd` using the default partitioning of `partitionBy` (just a hash function). You may also specify your own partitioning function. 

In [None]:
num_partitions = words_rdd.getNumPartitions()

In [None]:
par = (words_rdd.map(lambda word: (word,1))
                .partitionBy(num_partitions)
                .cache())
par.count() # call count to compute and cache the data

Since both datasets are cached in memory, we can compare the time it takes the reduce step to complete:

In [None]:
%%time
(words_rdd.map(lambda word: (word,1))
          .reduceByKey(lambda a,b: a+b)
          .count())

In [None]:
%%time
par.reduceByKey(lambda a,b: a+b).count()

Quite a speedup, but at the cost of an expensive initial shuffle. If the "by key" operation is only done once, then this is not worth it -- but if it's done repeatedly  (i.e. frequent joins) then it may be beneficial. However, we can also expect the difference to depend on the nature of the dataset. If you inspect the Spark UI, you can see that the first `reduceByKey` (i.e. one done on `word_rdd`) shuffled ~390 Mb of data, while the second `reduceByKey` (i.e. done on `par`) only shuffled ~50 Mb of data. This dataset is still pretty small, but when the shuffles are in the Gb range, the differences can be substantial. 

If you need to do lookups of individual keys, this becomes even more dramatic: 

In [None]:
%time x = words_rdd.lookup('environment')

In [None]:
%time x = par.lookup('environment')

# Computing word frequency vs. time

Now we have all the components to build a simple tool that visualizes the relative word frequency as a function of time in the Gutenberg corpus. For inspiration, see the [Google Ngram viewer](https://books.google.com/ngrams).

### Converting documents into vectors

To make quantitative analysis of the corpus possible, we will convert each document into a vector that represents the word counts for each word appearing in the document. 

This will look something like this. Imagine we have have a corpus consisting of two "documents"

    document 1: "a dog bit me"
    document 2: "i bit the dog back"
    
Then our corpus vocabulary (of 1-grams) is

    ["a", "dog", "bit", "me", "i", "the", "back"]
    
Since this is an array and each word in the array has a unique index, we can "encode" the two documents using this index mapping. Our corpus now looks like this: 

    document 1: [1, 1, 1, 1, 0, 0, 0]
    document 2: [0, 1, 1, 0, 1, 1, 1]
    
In order for the vector indices to remain consistent across the whole corpus, the first step is to build a corpus-wide lookup table, a `word --> index` mapping. 


### Generating word counts 

Once each document is converted to a vector, doing the word counts for sub-groups of documents is a simple vector addition operation. In our case, we will reduce the vectors by year, yielding an RDD that will have the total number of occurrences of each word in every year. From there, it is trivial to look up the desired word and plot the relative frequency vs. year. 

## Create the vocabulary lookup table
Create a look-up table of words by attaching a unique index to each word. The Spark API provides a [zipWithIndex](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.zipWithIndex) method that makes this easy. 

Above, we created the `word_count` RDD that already contains the unique words and their associated counts. To reduce the size of the lookup table (and the size of the vectors), we will restrict ourselves to using only the first 100k words. 

You can either create the `(word, index)` pairs in the RDD and collect the top 100,000 as a dictionary using `collectAsMap`, or you can collect the top 100,000 words from `word_count` RDD and turn them into a dictionary locally. 

These are the steps we need to take to make the vocabulary lookup from an RDD:

1. use `zipWithIndex` on the keys of `word_count` to generate a unique index for each word -- we don't care about the counts anymore, so we can get rid of the values and just work with the keys
2. use filter to retain only the first 100000 words 
3. finally, use [collectAsMap](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.collectAsMap) to return the resulting RDD to the driver as a dictionary. 

In [None]:
# TODO: create word_lookup, a dictionary mapping each of the top 100,000 words to an integer
word_lookup = <FILL IN>

Make a `word_lookup` into a broadcast variable so we can use it on all the workers:

In [None]:
word_lookup_b = sc.broadcast(word_lookup)

This dictionary is approximately 6 Mb in size - without a broadcast, it would get sent over the network to each task, resulting in a lot of network traffic! As a broadcast variable, it gets sent only *once* to each *executor*, i.e. it's transferred only 20 times (if you are using the defaults, otherwise however many executors you have). 

## Vectorize the documents

Now that we have a vocabulary lookup table, we can use this to turn each document into a vector. 

This is done by counting up the occurrences of all words in the document that are also in the global vocabulary. 

The function `vectorize_doc` below accomplishes this by using a dictionary to keep track of the local word count. Once the counting is done we use the counts to create a sparse vector that represents the document. A sparse vector consists of two arrays, one representing the *locations* of the non-zero values, and the other the values themselves. 

To return to our contrived example from above, we had 

    document 1: "a dog bit me"
    document 2: "i bit the dog back"
    
which turned into 

    document 1: [1, 1, 1, 1, 0, 0, 0]
    document 2: [0, 1, 1, 0, 1, 1, 1]

with a vocabulary of 

    ["a", "dog", "bit", "me", "i", "the", "back"]
    
As sparse vectors, these two documents could be represented with two arrays: 

    document 1: indices = [0,1,2,3]; values = [1, 1, 1, 1]
    document 2: indices = [1,2,4,5,6]; values = [1, 1, 1, 1, 1]

We use Spark's own `SparseVector` data type, for which we must specify a size (total number of features), a (sorted) array of indices, and an array of values. This means that we start to save space if sparsity is > 50%. Note that the `SparseVector` provides some nice higher-level methods, but it does not allow simple operations like addition. If a lot of vector arithmetic is needed, you should use the scipy sparse types instead. 

In the next cell, we define two functions: 

* **`extract_ngrams`** converts a sequence of words or characters into a sequence of n-grams (here we are just using single worde, i.e. 1-grams so we'll postpone talking about ngrams until later)

* **`vectorize_doc`** converts a document into a sparse vector by using `extract_ngrams` to tokenize it and a vocabulary mapping to turn each word into a vector component

In [None]:
from pyspark.mllib.linalg import SparseVector

def extract_ngrams(tokens, ngram_range=[1,1], select_ngrams = None, ngram_type='word'):
    """
    Turn tokens into a sequence of n-grams 

    **Inputs**:

    *tokens*: a list of tokens

    **Optional Keywords**:

    *ngram_range*: a tuple with min, max ngram ngram_range
    
    *select_ngrams*: the vocabulary to use
    
    *ngram_type*: whether to produce word or character ngrams

    **Output**

    Generator yielding a list of ngrams in the desired range
    generated from the input list of tokens

    """
    join_str = "" if ngram_type=='character' else " "
    
    # handle token n-grams
    min_n, max_n = ngram_range
    n_tokens = len(tokens)
    
    for n in xrange(min_n, min(max_n + 1, n_tokens + 1)):
        for i in xrange(n_tokens - n + 1):
            if n == 1: 
                res = tokens[i]
            else : 
                res = join_str.join(tokens[i: i+n])
           
            # if we are using a lookup vocabulary, check for membership here
            if select_ngrams is not None : 
                if res in select_ngrams: 
                    yield res
            else : 
                yield res
            

def vectorize_doc(doc, vocab, ngram_range = [1,1], ngram_type='word') : 
    """
    Returns a vector representation of `doc` given the reference 
    vocabulary `vocab` after tokenizing it with `tokenizer`
    
    Arguments: 
        
        doc: a sequence of tokens (words or characters)
        
        vocab: the vocabulary mapping
        
    Keywords:
    
        ngram_range: the range of ngrams to process
        
        ngram_type: whether to produce character or word ngrams; default is 
        
    Returns:
    
        a sparse vector representation of the document given the vocabulary mapping
    """
    from collections import defaultdict
    from scipy.sparse import csr_matrix 
        
    # count all the word occurences 
    data = np.zeros(len(vocab))
    
    for ngram in extract_ngrams(doc, ngram_range, vocab, ngram_type) : 
         data[vocab[ngram]] += 1
            
    # only keep the nonzero indices for the sparse representation
    indices = data.nonzero()[0]
    values = data[indices]
    
    return SparseVector(len(vocab), indices, values)

Using these functions to vectorize our two-sentence test corpus: 

In [None]:
import string
s1 = "a dog bit me"
s2 = "i bit the dog back"
vocab = ["a", "dog", "bit", "me", "i", "the", "back"]
vocab_dict = {word:ind for ind, word in enumerate(vocab)}

In [None]:
print(s1)
print(vectorize_doc(s1.split(), vocab_dict))
print(s2)
print(vectorize_doc(s2.split(), vocab_dict))

Now we have all the components we need to create an RDD of english-language books vectorized using the most common 100k words. All we need to do is to use `mapValues` to map the text of each document in `english_rdd` into a vector using `vectorize_doc` and our broadcast vocabulary lookup table `word_lookup_b`. 

In [None]:
# FILL IN 
vector_rdd = <FILL IN>.cache()

In [None]:
vector_rdd.first()

In [None]:
# make sure the transformation can be carried out for all elements by using count
vector_rdd.count()

## Perform the aggregation

We now have the entire Gutenberg English book corpus in the form of sparse vectors encoding the most used 100k words. 

To get the yearly sums, we will turn the metadata of each document into its publication year (i.e. the key will be the year, the value is the vector) and then do an aggregation by year. 

We will use the powerful [`treeAggregate`](http://spark.apache.org/docs/latest/api/python/pyspark.html?highlight=values#pyspark.RDD.treeAggregate) method, which requires that we specify three different components:

1. the starting aggregate
2. a function that adds a new value to the aggregate 
3. a function that adds together two aggregates

The way `treeAggregate` works is that it performs the reduction in a tree pattern in order to minimize the strain on the driver. In a "normal" reduction, the workers send their results to the driver, which is tasked with putting it all together -- however, if these partial results are large (as is potentially the case here) then the driver can run into memory issues. Furthermore, most of the cluster is sitting idle while the driver performs the aggregation. `treeAggregate` fixes this by performing partial aggregations on the workers and only sending the final stages to the driver. See this [blog post](https://databricks.com/blog/2014/09/22/spark-1-1-mllib-performance-improvements.html) for a bit more description of this method. 

The aggregation methods are powerful because the "aggregate" can be any object -- we can write a class that gets passed around to do the aggregation, for example. Aggregation methods are more general reduction methods because they allow you to change the type of the variables -- in our case here, we are converting the data `(year, vector)` tuples into a dictionary of arrays. 

Below, we will use an instance of a dictionary as the aggregation object and define two functions that will do the actual aggregation. 

In [None]:
def add_vector(d, data_tuple) : 
    """Add a new vector to the aggregation dictionary
    
    The vectors in the aggregation dictionary are dense since for most years we can expect that 
    this will be the case anyway. Note that we use 32-bit floats to save a bit on memory. 
    
    Arguments: 
        d: the aggregation dictionary
        
        data_tuple: the (year, vec) tuple
        
    Returns: 
        the updated aggregation dictionary 
    """
    # expand the data tuple
    year, vec = data_tuple
    
    if year in d : 
        
        d[year][vec.indices] += vec.values
    else :
        # this is the first time we've encountered this year --> make an empty vector 
        new_vec = np.zeros(vec.size, dtype=np.float32)
        
        # now put in the contents of the current vector
        new_vec[vec.indices] = vec.values
        
        # create the year in the dictionary
        d[year] = new_vec
        
    return d

def add_dicts(d1, d2) : 
    """Add two dictionaries together
    
    Arguments: 
        d1: first dictionary
        
        d2: second dictionary
        
    Returns: 
        merged dictionaries
    """
    
    # iterate through all the items in the second dictionary
    for year, vec in d2.iteritems() : 
        # if this year is also in d1, add the vectors together
        if year in d1 : 
            d1[year] += vec
        # if not, create a new year entry in d1
        else : 
            d1[year] = vec
    return d1

Now we create an RDD of `(year, vec)` pairs using the `publication_year` function we defined at the top:

In [None]:
year_vec = vector_rdd.map(lambda (gid, vec): (publication_year(meta_b.value[gid]), vec))

Before we perform the aggregation, we can do one final bit of optimization. Passing around dictionaries full of large arrays can get expensive very quickly. The memory footprint of our partial results will depend on how heterogeneous the years on each partition or group of partitions are: if most of the data on a partition is for the same key (year in this case) then the dictionary we create on that partition will only contain a handful of vectors.  We can control this by first partitioning the RDD in a way that groups data with the same keys onto the same partitions. 

Spark provides a `partitionBy` method that does exactly this -- by default, it uses a hash function to map the keys to partitions, but you can also pass a custom partitioner if you want. If you look at the Spark UI after executing the next cell, you'll see that the partition step caused some shuffling of data, but that the aggregation itself ran very quickly and with minimal data movement. 

In [None]:
n_partitions = year_vec.getNumPartitions()

In [None]:
# TODO: use an empty dictionary and the two functions defined above as arguments to the treeAggregate method
year_sums = (year_vec.partitionBy(n_partitions)
                     .<FILL IN>)

Note that `year_sums` is a single "value", in this case our aggregate dictionary containing years as keys and vectors representing cummulative word counts as values. 

In [None]:
list(year_sums.iteritems())[0:10]

## Gutenberg Project N-gram viewer

Lets plot some results!

Below we define a plotting function and then make a plot of some interesting examples -- feel free to change the word list...

In [None]:
def plot_usage_frequency(words, year_data, word_lookup, plot_range = [1500,2015]) : 
    years = sorted(year_data.keys())
    tot_count = np.array([year_data[year].sum() for year in years])
    
    if type(words) is not str : 
        n_words = len(words) 
    else : 
        n_words = 1
        words = [words]
        
    for i, word in enumerate(words) : 
        word_ind = word_lookup[word]
        w_count = np.array([year_data[year][word_ind] for year in years])
        
        plt.plot(years, smooth(w_count/(tot_count-w_count)),label=word, color = plt.cm.Set1(1.*i/n_words))
    
    plt.xlim(plot_range)
    plt.xlabel('year')
    plt.ylabel('relative frequency')
    plt.legend(loc='upper left', fontsize = 'small')
    
    
def smooth(x,window_len=11,window='hanning'):
        if x.ndim != 1:
                raise ValueError, "smooth only accepts 1 dimension arrays."
        if x.size < window_len:
                raise ValueError, "Input vector needs to be bigger than window size."
        if window_len<3:
                return x
        if not window in ['flat', 'hanning', 'hamming', 'bartlett', 'blackman']:
                raise ValueError, "Window is on of 'flat', 'hanning', 'hamming', 'bartlett', 'blackman'"
        s=np.r_[2*x[0]-x[window_len-1::-1],x,2*x[-1]-x[-1:-window_len:-1]]
        if window == 'flat': #moving average
                w=np.ones(window_len,'d')
        else:  
                w=eval('np.'+window+'(window_len)')
        y=np.convolve(w/w.sum(),s,mode='same')
        return y[window_len:-window_len+1]

Here are just some illustrative examples -- feel free to try your own...

In [None]:
plot_usage_frequency(['giveth', 'environment', 'machine'], year_sums, word_lookup, [1300, 2015])
plt.ylim(0,0.001);

If you are very motivated, you can adapt the workflow above to work with higher-order n-grams and allow for the lookup of phrases (i.e. "world war") instead of just single words. To do this, you have to create a new `word_lookup` table and regenerate the vectors. Since single words (i.e. one-grams) will dominate, it might make sense to build separate list of top N-grams (top two-grams, top three-grams) and then merge them together into a vocabulary map. Beware that the size of the data will increase quickly for N > 1!

In [None]:
sc.stop()

To continue the exploration of the Gutenberg books corpus, you can move on to the [language classification notebook](part3-lang-classification-EMPTY.ipynb).