In [None]:
import multiprocessing
import glob
import time
import operator
import itertools
import collections
import string
import numpy as np
from scipy import spatial
from scipy.spatial import distance
import nltk
from nltk import corpus
from nltk.corpus import gutenberg
from nltk.corpus import stopwords
from bs4 import BeautifulSoup, SoupStrainer

## Introduction
In this notebook, you will learn about MapReduce, a popular paradigm for distributed computing on big data. Although you will not be working with actual "big data" (all the processing will happen on your computer with relatively small datasets), you will learn how to solve problems with the MapReduce programming model by writing your own map and reduce functions.

### Grading
- MR1: Inverted Index (11pts)
- MR2: Reverse Web Graph (11pts)
- MR3: K-Means Clustering (11pts)

### Importing functions from unsupervised notebook
You'll need your code from the unsupervised.ipynb notebook for the K-Means problem (MR3) in this notebook. Using the menu in Jupyter, you can export code from your unsupervised notebook as a Python script: 
1. Click File -> Download as -> Python (.py)
2. Save file (unsupervised.py) in the same directory as this notebook 
3. (optional) Remove all test code (i.e., the lines between the AUTOLAB_IGNORE markers) from the script so that it loads faster



In [None]:
# AUTOLAB_IGNORE_START
from unsupervised import KMeans
# AUTOLAB_IGNORE_STOP

# MapReduce using Python Multiprocessing

Autolab apparently does not work well with the typical MapReduce libraries available for Python, so for this assignment we provide a minimal MapReduce framework implemented with the `multiprocessing` library. Our code is built on top of this [PyMOTW](http://www.doughellmann.com/PyMOTW/) page: https://pymotw.com/3/multiprocessing/mapreduce.html. Please read through the code below and then the example (MR0) to understand what happens under the hood of this framework and how to use it.

In [None]:
# Copyright (c) 2016 All rights reserved.

# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:

# 1. Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.

# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation and/or
# other materials provided with the distribution.

# 3. Neither the name of the copyright holder nor the names of its contributors
# may be used to endorse or promote products derived from this software without
# specific prior written permission.

# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
# ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
# ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

def init_worker_context(context_to_set):
    """Called once in each worker process at startup; sets the global `context`, which all mappers running
    in that worker can then read.
    """
    global context
    # context can be accessed from the mapper function of every worker
    context = context_to_set

class MapReduceJob(object):
    
    def __init__(self, map_func, reduce_func, num_workers=None, worker_context=None):
        """ Initialize a MapReduce job with the map and reduce functions, the number of worker processes and the
        context shared across mappers.
        Args:
            map_func:       function to map inputs to intermediate data. Takes as argument one chunk (a slice of the
                              inputs) and returns a list of (key, value) tuples to be reduced.
            reduce_func:    function to reduce partitioned intermediate data to final output. Takes as argument a
                              single (key, list of values) tuple, where the key is as produced by map_func and the
                              list holds all values associated with that key.
            num_workers:    int, the number of worker processes to create in the pool. Defaults to the number of
                              CPUs on the current host.
            worker_context: any application-specific type; stores data that should be read-accessible from all
                              workers but should NOT be written while the MR job is in progress.
        Attributes to set:
            map_func:     function to map inputs to intermediate data. (same description as above)
            reduce_func:  function to reduce intermediate data to final output. (same description as above)
            pool:         multiprocessing.Pool object with num_workers worker processes. Each worker process
                            initializes the given worker context before running any map jobs.
                            (See init_worker_context function above.)
        """
        self.map_func = map_func
        self.reduce_func = reduce_func
        if worker_context is not None:
            self.pool = multiprocessing.Pool(num_workers, initializer=init_worker_context, initargs=(worker_context,))
        else:
            self.pool = multiprocessing.Pool(num_workers)
    
    def partition(self, mapped_values):
        """Organize the mapped values by their key.
        Args:
            mapped_values: output key-value pairs from mappers
        Outputs:
            iterable:      an unsorted sequence (a dict_items view) of (key, list of values) tuples.
        """
        partitioned_data = collections.defaultdict(list)
        for key, value in mapped_values:
            partitioned_data[key].append(value)
        return partitioned_data.items()

    def __call__(self, inputs, chunksize=1):
        """Process the inputs through the map and reduce functions given.
        Args:
            inputs:       (array-like) contains the input data to be processed.
            chunksize=1 : the portion of the input data to hand to each worker; can be used to tune performance 
                            during the mapping phase.
        Outputs:
            reduced_values: list of outputs from reduce functions.
        """
        # partition inputs into consecutive chunks of at most chunksize elements
        # (appending len(inputs) as the final boundary also handles empty inputs without an IndexError)
        indices = list(range(0, len(inputs), chunksize)) + [len(inputs)]
        inputs_split = [inputs[start:end] for start, end in zip(indices[:-1], indices[1:])]
        # map
        map_responses = self.pool.map(self.map_func, inputs_split)
        # partition by key
        partitioned_data = self.partition(itertools.chain(*map_responses))
        # reduce
        reduced_values = self.pool.map(self.reduce_func, partitioned_data)
        return reduced_values

A quick note: more details about `worker_context` are given in MR3. For MR0, MR1 and MR2, it should be left as `None`.
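The following sketch shows the data flow of `MapReduceJob.__call__` without a process pool. It runs in a single process and follows the same steps: split the inputs into chunks, map each chunk, group the emitted values by key, and reduce each group. The function names `run_mapreduce_sequentially`, `lines_to_words` and `sum_counts` are hypothetical and not part of the provided framework. The mapper also reads in-memory lines instead of files.

```python
import collections
import itertools


def run_mapreduce_sequentially(map_func, reduce_func, inputs, chunksize=1):
    """Hypothetical single-process stand-in for MapReduceJob.__call__ (no multiprocessing)."""
    # Split the inputs into consecutive chunks of at most `chunksize` elements.
    chunks = [inputs[i:i + chunksize] for i in range(0, len(inputs), chunksize)]
    # Map phase: each call returns a list of (key, value) pairs for one chunk.
    map_responses = [map_func(chunk) for chunk in chunks]
    # Partition phase: group all values emitted under the same key.
    partitioned = collections.defaultdict(list)
    for key, value in itertools.chain(*map_responses):
        partitioned[key].append(value)
    # Reduce phase: one call per (key, list of values) item.
    return [reduce_func(item) for item in partitioned.items()]


def lines_to_words(lines):
    # Mapper for word count over in-memory lines: emit (word, 1) per token.
    return [(token, 1) for line in lines for token in line.split()]


def sum_counts(item):
    # Reducer: item is (word, [1, 1, ...]).
    word, counts = item
    return (word, sum(counts))


counts = run_mapreduce_sequentially(lines_to_words, sum_counts,
                                    ["1 2 3", "2 3 4", "3 4 5", "6 0 1"], chunksize=1)
print(sorted(counts, key=lambda pair: (-pair[1], pair[0])))
# [('3', 3), ('1', 2), ('2', 2), ('4', 2), ('0', 1), ('5', 1), ('6', 1)]
```

Each reducer call sees every value for its key, so the final counts do not depend on how the inputs were chunked. Chunking only changes how the map work is divided among workers.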

## MR0: Word Count (Example)
A classic example of a MapReduce application is counting the words in a corpus. A corpus consists of several documents, which can be processed in parallel. The word counts from each of these documents can then be aggregated to produce the total count of each word in the corpus.

### MapReduce Implementation Overview
The map function processes each line of a file, emitting < word, 1> for each word encountered. The reduce function adds together all values for the same word and emits a < word, total count> pair.

Now, go through the code below and make sure you understand how to use our implementation of the MapReduce framework given above.

In [None]:
def file_to_words(filenames):
    """Read a list of files and return a list of (word, 1) pairs, one per token.
    """
    output = []
    for filename in filenames:
        print(multiprocessing.current_process().name, 'reading', filename)
        with open(filename, 'r') as f:
            for line in f:
                for token in line.strip().split():
                    output.append( (token, 1) )
    print(multiprocessing.current_process().name, 'outputting', output)
    return output

def count_words(item):
    """Convert the partitioned data for a word to a
    tuple containing the word and the number of occurrences.
    """
    print(multiprocessing.current_process().name, 'reducing', item)
    word, occurrences = item
    return (word, sum(occurrences))

# AUTOLAB_IGNORE_START
input_files = glob.glob('mapreduce/wordcount/*.rst')
print(input_files)
wordcount_job = MapReduceJob(file_to_words, count_words, num_workers=2)
print('Word-Count Map-Reduce job initialized.')
time.sleep(1)
word_counts = wordcount_job(input_files, chunksize=1)
print('Word-Count Map-Reduce job completed successfully.')
word_counts.sort(key=operator.itemgetter(1))
word_counts.reverse()
print(word_counts)
# AUTOLAB_IGNORE_STOP

Running the above code yields the following output:

```
['mapreduce/wordcount/file1.rst', 'mapreduce/wordcount/file2.rst', 'mapreduce/wordcount/file3.rst', 'mapreduce/wordcount/file4.rst']
Word-Count Map-Reduce job initialized.
ForkPoolWorker-3 reading mapreduce/wordcount/file2.rst
ForkPoolWorker-4 reading mapreduce/wordcount/file1.rst
ForkPoolWorker-4 outputting [('1', 1), ('2', 1), ('3', 1)]
ForkPoolWorker-3 outputting [('2', 1), ('3', 1), ('4', 1)]
ForkPoolWorker-4 reading mapreduce/wordcount/file3.rst
ForkPoolWorker-3 reading mapreduce/wordcount/file4.rst
ForkPoolWorker-4 outputting [('3', 1), ('4', 1), ('5', 1)]
ForkPoolWorker-3 outputting [('6', 1), ('0', 1), ('1', 1)]
ForkPoolWorker-3 reducing ('2', [1, 1])
ForkPoolWorker-4 reducing ('1', [1, 1])
ForkPoolWorker-4 reducing ('4', [1, 1])
ForkPoolWorker-3 reducing ('3', [1, 1, 1])
ForkPoolWorker-3 reducing ('6', [1])
ForkPoolWorker-4 reducing ('5', [1])
ForkPoolWorker-3 reducing ('0', [1])
Word-Count Map-Reduce job completed successfully.
[('3', 3), ('4', 2), ('2', 2), ('1', 2), ('0', 1), ('6', 1), ('5', 1)]
```

## MR1: Inverted Index (11pts)

An inverted index is a data structure that stores a mapping from elements to the list of locations at which each element occurs. In NLP, for example, given a corpus of documents, the elements are usually words. The locations are document IDs (any unique identifier) and, optionally, the position of the word in the document. Thus, a possible entry in the inverted index could be:

*hello : (2, 1), (6, 4), (8, 0), (8, 3)*

and it is interpreted as "the word *hello* is present at position 1 in document ID 2, at position 4 in document ID 6, and at positions 0 and 3 in document ID 8" (positions are 0-based).

**Application:** The concept of an inverted index is central to any modern search engine. For example, consider a query "X Y" asking for documents that contain the words X and/or Y. The engine only needs to take the intersection (for "and") or the union (for "or") of the document lists in the precomputed inverted-index entries of X and Y!
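The query step can be illustrated with a small sketch. It assumes each index entry is a list of (document ID, position) locations, as in the example entry above. An "and" query intersects the sets of document IDs, and an "or" query takes their union. The toy index and the helper `docs_for` are hypothetical.

```python
def docs_for(index, word):
    """Set of document IDs that appear in the index entry for `word` (empty if the word is absent)."""
    return {doc_id for doc_id, _position in index.get(word, [])}


# Hypothetical toy inverted index: word -> list of (document ID, position).
index = {
    "hello": [(2, 1), (6, 4), (8, 0), (8, 3)],
    "world": [(6, 5), (7, 2), (8, 1)],
}

both = docs_for(index, "hello") & docs_for(index, "world")    # "hello" AND "world"
either = docs_for(index, "hello") | docs_for(index, "world")  # "hello" OR "world"
print(sorted(both))    # [6, 8]
print(sorted(either))  # [2, 6, 7, 8]
```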

### MapReduce Implementation Overview
The map function parses each document and emits a sequence of < word, (document ID, position) > pairs. The reduce function accepts all pairs for a given word, sorts the corresponding (document ID, position) locations, and emits a < word, list((document ID, position)) > pair. The set of all output pairs forms a simple inverted index. This index records which documents contain each word and also where the word occurs within them.
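The map and reduce steps above can be sketched on in-memory documents. The input is a dict from a hypothetical document ID to its text, rather than Gutenberg files, and there is no stop-word filtering. The functions `toy_invindex_map` and `toy_invindex_reduce` are illustrative names. Positions are 0-based indices into each document's token list.

```python
import collections


def toy_invindex_map(docs):
    """Emit (word, (document ID, position)) for every token of every document."""
    pairs = []
    for doc_id, text in docs.items():
        for position, word in enumerate(text.split()):
            pairs.append((word, (doc_id, position)))
    return pairs


def toy_invindex_reduce(item):
    """Sort a word's locations by document ID, then by position (tuple ordering)."""
    word, locations = item
    return (word, sorted(locations))


docs = {"doc-b": "hello world hello", "doc-a": "world says hello"}
grouped = collections.defaultdict(list)   # the partition step normally done by the framework
for word, location in toy_invindex_map(docs):
    grouped[word].append(location)
index = dict(toy_invindex_reduce(item) for item in grouped.items())
print(index["hello"])  # [('doc-a', 2), ('doc-b', 0), ('doc-b', 2)]
```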

### Specifications
- Words from a document titled `filename` in the Gutenberg corpus can be obtained using `nltk.corpus.gutenberg.words(filename)`.
- For every word that is not a stop word, emit < word, (document ID, position) >. Use the stop list from NLTK, `nltk.corpus.stopwords.words('english')`.
- The words themselves should NOT be processed in any way, e.g., no lemmatization, lowercasing, etc.
- The document ID is simply the name of the file without its extension. E.g., the document ID for `austen-emma.txt` is `austen-emma`.
- The inverted index for a given word should be sorted first by document ID and then by the position of the word within the document.
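Three details of the specification can be checked in isolation:

- **Document ID.** The ID is the filename without its extension. This sketch uses `os.path.splitext`, which is one reasonable choice.
- **Stop words.** The small stop set below is a hypothetical stand-in for `nltk.corpus.stopwords.words('english')`, which is all lowercase. Since words must not be lowercased, a capitalized "The" is not filtered.
- **Sort order.** Python's tuple ordering compares the document ID first and the position second.

The sketch assumes that positions index the full word list of a document, stop words included. Under that assumption, stop words are only left out of the emitted pairs.

```python
import os

# Document ID: filename without its extension.
print(os.path.splitext("austen-emma.txt")[0])  # austen-emma

# Hypothetical stand-in for the NLTK English stop list (which is all lowercase).
stop_words = {"the", "a", "of"}
tokens = ["The", "history", "of", "the", "world"]
# Assumption: positions count every token, including the stop words that are skipped.
kept = [(word, pos) for pos, word in enumerate(tokens) if word not in stop_words]
print(kept)  # [('The', 0), ('history', 1), ('world', 4)]

# Tuples sort by document ID first, then by position (integer comparison, so 9 < 10).
locations = [("b", 3), ("a", 10), ("a", 9)]
print(sorted(locations))  # [('a', 9), ('a', 10), ('b', 3)]
```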

In [None]:
def invindex_map(filenames):
    """Mapper: Read a list of filenames and generate (word, (document ID, position)) for every non-stop word in each file.
    Args:
        filenames : list(str) : Filenames (from NLTK gutenberg corpus) to be processed.
    Outputs:
        list((str, (str, int))) : list of (word, location) tuples, where the location is in turn a tuple of the
                                    document ID (str) and the position of the word within the document (int).
    """
    pass

def invindex_reduce(item):
    """Reducer: Given the list of locations of a word, concatenate them into a list 
    (sorted as per specifications).
    Args:
        item : (str, list((str, int))) :  first element is the word; second element is the list of (document ID, position)
    Outputs:
        (str, list((str, int))) :   first element is the word; 
                            second element is the list of (document ID, position)
    """
    pass

# AUTOLAB_IGNORE_START
input_files = nltk.corpus.gutenberg.fileids()
invindex_job = MapReduceJob(invindex_map, invindex_reduce, num_workers=4)
print('Inverted-Index Map-Reduce job initialized.')
time.sleep(1)
invindex = invindex_job(input_files)
print('Inverted-Index Map-Reduce job completed successfully.')
invindex.sort(key=operator.itemgetter(0))
for word, locations in invindex[:10]:
    print(word, len(locations))
# AUTOLAB_IGNORE_STOP

Our implementation yields:
```
Inverted-Index Map-Reduce job initialized.
Inverted-Index Map-Reduce job completed successfully.
! 5730
!!!" 1
!" 1719
!"' 3
!") 4
!"-- 47
!"?' 1
!' 332
!'" 4
!') 1
```

## MR2: Reverse Web Graph (11pts)
One way to represent a directed graph such as the web is a directed adjacency list, which maps each node to the list of nodes it points to (i.e., its out-neighbors). For the web graph, this is fairly easy to construct, because the out-neighbors (outgoing links) of a webpage can be found by parsing the webpage.

However, suppose that we want the *reverse* adjacency list. That is, for each webpage, we want the list of webpages which point to it (i.e., incoming links). This is the reverse webgraph construction problem, for which an outline of the MapReduce solution is provided below.

### MapReduce Implementation Overview
The map function outputs < target ID, source ID > pairs for each link to a target URL found in a page named "source". The reduce function concatenates the list of all source URLs associated with a given target URL and emits the pair: < target ID, list(source IDs) >.
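The inversion described above can be sketched on a hypothetical three-page graph. The graph is held in a dict from page name to its list of out-links, instead of being read from HTML files. The map, partition and reduce steps are written inline in one function; `reverse_graph` is a hypothetical name.

```python
import collections


def reverse_graph(adjacency):
    """Invert a directed adjacency list: page -> sorted list of pages that link to it."""
    # Map: emit (target, source) for each outgoing edge.
    pairs = [(target, source) for source, targets in adjacency.items() for target in targets]
    # Partition: group sources by target.
    incoming = collections.defaultdict(list)
    for target, source in pairs:
        incoming[target].append(source)
    # Reduce: sort each target's list of sources.
    return {target: sorted(sources) for target, sources in incoming.items()}


# Hypothetical three-page web graph (out-links per page).
out_links = {"a.html": ["b.html", "c.html"], "b.html": ["c.html"], "c.html": ["a.html"]}
print(reverse_graph(out_links))
# {'b.html': ['a.html'], 'c.html': ['a.html', 'b.html'], 'a.html': ['c.html']}
```

A page with no incoming links does not appear as a key, because no mapper ever emits it as a target.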

### Specifications
- Both target and source pages should be HTML pages (i.e., `.html` files). Other links (e.g., to `.ps` files) can be ignored.
- A webpage (source or target) is identified by its complete URL. E.g., `http://cs.cornell.edu/Info/Courses/Current/CS415/CS414.html` is a valid webpage ID.
- The URL of a webpage is given on the first line of its file.
- When only a partial (relative) URL is available (as is common in `href` attributes), construct the complete URL from the hierarchical structure of the source page's URL. For example, the file `page_0.html` (whose URL is `http://cs.cornell.edu/Info/Courses/Current/CS415/CS414.html`) contains a link to `cs415.html` on line 10. Hence, your code should output the pair `(http://cs.cornell.edu/Info/Courses/Current/CS415/cs415.html, http://cs.cornell.edu/Info/Courses/Current/CS415/CS414.html)`.
- In the final output, the list of source webpage IDs for each target webpage ID must be sorted alphabetically.
- Finally, note that a webpage that appears as a target may never appear as a source, because the dataset comes from a limited crawl of these universities' websites.
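The relative-URL rule can be handled with the standard library's `urllib.parse.urljoin`, which resolves a link against the source page's URL. The sketch below shows one possible approach, not necessarily the reference one. The following choices are assumptions:

- the helper name `resolve_link`;
- dropping `#fragment` suffixes with `urldefrag`;
- a case-sensitive `.html` check on the URL path.

Extracting the `href` values themselves from each page, for example with BeautifulSoup (which this notebook already imports), is not shown.

```python
from urllib.parse import urldefrag, urljoin, urlparse


def resolve_link(source_url, href):
    """Hypothetical helper: absolute URL for `href` found on `source_url`, or None if it is not an .html page."""
    # Resolve relative links against the source URL, then drop any '#...' fragment (an assumption).
    absolute, _fragment = urldefrag(urljoin(source_url, href))
    # Keep only links whose path ends in .html (case-sensitive check, an assumption).
    return absolute if urlparse(absolute).path.endswith(".html") else None


source = "http://cs.cornell.edu/Info/Courses/Current/CS415/CS414.html"
print(resolve_link(source, "cs415.html"))
# http://cs.cornell.edu/Info/Courses/Current/CS415/cs415.html
print(resolve_link(source, "../index.html"))
# http://cs.cornell.edu/Info/Courses/Current/index.html
print(resolve_link(source, "notes.ps"))
# None
```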

Now, implement the map and reduce functions below.

In [None]:
def webgraph_map(filenames):
    """Mapper: Read a list of filenames (corresponding to HTML documents) and generate 
    (target ID, source ID) for every hyperlink in the file.
    Args:
        filenames : list(str) : Filenames to be processed.
    Outputs:
        list((str, str)) : list of (target ID, source ID) tuples satisfying the specifications.
    """
    pass

def webgraph_reduce(item):
    """Reducer: Given a target webpage and the list of webpages linking to it, sort the list of sources.
    Args:
        item : (str, list(str)) :  first element is the webpage; 
                                    second element is the list of webpages linking to it
    Outputs:
        (str, list(str)) :   first element is the webpage; 
                                second element is the list of webpages linking to it, sorted
                                in the specified order.
    """
    pass

# AUTOLAB_IGNORE_START
input_files = glob.glob('mapreduce/webgraph/*.html')
webgraph_job = MapReduceJob(webgraph_map, webgraph_reduce, num_workers=4)
print('Reverse-Webgraph Map-Reduce job initialized.')
time.sleep(1)
webgraph = webgraph_job(input_files)
print('Reverse-Webgraph Map-Reduce job completed successfully.')
webgraph = sorted(webgraph, key=lambda x: -len(x[1]))
for target, sources in webgraph[:10]:
    print(target, len(sources))
# AUTOLAB_IGNORE_STOP

Our code yields the following output:
```
Reverse-Webgraph Map-Reduce job initialized.
Reverse-Webgraph Map-Reduce job completed successfully.
http://www.cs.wisc.edu/~bestor/bestor.html 44
http://www.gatech.edu/TechHome.html 39
http://www.cc.gatech.edu/gvu/gvutop.html 36
http://cs.nyu.edu/cs/courantnyu.html 33
http://karna.cs.umd.edu:3264/people/minker.html 30
http://www.cs.uiuc.edu/CS_INFO_SERVER/DEPT_INFO/CS_FACULTY/FAC_HTMLS/Faculty_Index.html 30
http://www.cs.bu.edu/faculty/best/Home.html 26
http://www.cs.wisc.edu/~cs302/cs302.html 26
http://www.cs.wisc.edu/~cs302/Consultants/consultants.html 26
http://www.ncsa.uiuc.edu/General/Internet/WWW/HTMLPrimer.html 25
```

## MR3: K-Means Clustering (11pts)
In this section, you will implement the K-means algorithm from the unsupervised.ipynb notebook in the MapReduce framework. The outline of the algorithm is given below.

### Algorithm
```
cluster_centers <- initialize clusters using the K-means++ algorithm
for i=1..T
    cluster_centers <- output of the MapReduceJob which recomputes the new cluster_centers from the current cluster_centers
end
```

### MapReduce Implementation Overview
The map function outputs < cluster ID, point > pairs for each point based on the distance of the point from the current cluster centers. The reduce function collects the list of all points belonging to the same cluster and recomputes the cluster center: < cluster ID, recomputed cluster center >.
A key difference from the previous MapReduce problems above is that the K-Means algorithm requires the information about the current cluster centers to be shared across all mappers. This is done by passing the `worker_context` variable when initializing the MapReduceJob.
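The following is a minimal single-process sketch of one K-means iteration in the map/partition/reduce shape described above. The names `assign_chunk`, `recompute_center` and `one_iteration` are hypothetical. Here the current centers are passed as an explicit argument. In `MapReduceJob`, they would instead be supplied once as `worker_context` and read by the mapper through the global `context`.

The sketch lets a cluster that receives no points keep its previous center. That choice is an assumption: in the framework, such a cluster simply produces no reducer output.

```python
import collections

import numpy as np
from scipy.spatial.distance import cdist


def assign_chunk(points, centers):
    """Map step for one chunk: emit (cluster ID, point) using the read-only current centers."""
    # cdist returns a (len(points), k) matrix of Euclidean distances.
    nearest = np.argmin(cdist(points, centers), axis=1)
    return [(int(cluster_id), point) for cluster_id, point in zip(nearest, points)]


def recompute_center(item):
    """Reduce step: the new center is the mean of the points assigned to the cluster."""
    cluster_id, cluster_points = item
    return (cluster_id, np.mean(cluster_points, axis=0))


def one_iteration(data, centers, chunksize):
    """Hypothetical sequential stand-in for one MapReduceJob; `centers` plays the role of worker_context."""
    grouped = collections.defaultdict(list)
    for start in range(0, len(data), chunksize):
        for cluster_id, point in assign_chunk(data[start:start + chunksize], centers):
            grouped[cluster_id].append(point)
    new_centers = centers.copy()  # assumption: a cluster that receives no points keeps its old center
    for cluster_id, center in map(recompute_center, grouped.items()):
        new_centers[cluster_id] = center
    return new_centers


data = np.array([[0.0, 0.0], [0.0, 2.0], [10.0, 0.0], [10.0, 2.0]])
centers = np.array([[1.0, 1.0], [9.0, 1.0]])
print(one_iteration(data, centers, chunksize=2))  # new centers: [0, 1] and [10, 1]
```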

### Specifications
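- Use `scipy.spatial.distance.cdist(XA, XB)` to compute distances between points. It takes two 2-D arrays (one point per row) and returns the matrix of pairwise distances. A single point `p` must therefore be passed as a 1*d array (e.g., `p.reshape(1, -1)`).
- When the distances of a point from two cluster centers are tied, assign the point to the cluster with the lower ID.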
- The cluster center is recomputed by taking a mean of all points that belong to the cluster.
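The two distance-related specifications can be checked on a tiny, made-up example. `cdist` requires 2-D inputs. `np.argmin` returns the first index among equal minima, which implements the lower-ID tie-breaking rule.

```python
import numpy as np
from scipy.spatial.distance import cdist

centers = np.array([[0.0, 0.0], [2.0, 0.0], [5.0, 5.0]])
point = np.array([1.0, 0.0])            # equidistant (distance 1) from centers 0 and 1

# cdist expects two 2-D arrays, so reshape the single point to shape (1, d).
distances = cdist(point.reshape(1, -1), centers)
print(distances.shape)                  # (1, 3)

# np.argmin returns the first index among tied minima, i.e. the lower cluster ID.
print(int(np.argmin(distances[0])))     # 0
```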

In [None]:
def kmeans_map(points):
    """Mapper: Read a list of points and assign each of them to a cluster.
    Args:
        points : (np.array) : N*d array of N d-dimensional points
    Outputs:
        list((int, np.array)) : list of tuples, each holding the cluster ID assigned to a point and that point
    """
    pass

def kmeans_reduce(item):
    """Reducer: Given the list of points belonging to a single cluster, compute the new cluster center.
    Args:
        item : (int, list(np.array)) :  first element is the cluster id (0,...,k-1); 
                                        second element is the list of d-dimensional points assigned to this cluster
    Outputs:
        (int, np.array) :   first element is the cluster id (0,...,k-1); 
                            second element is the recomputed cluster center, the mean of all points in this cluster
    """
    pass

def kmeans_train(data, initial_centers, num_iters, chunksize=1, num_workers=None):
    """Learns the K-means clustering of the given data, starting from the given initial cluster centers,
    by creating `num_iters` sequential MapReduceJobs.
    Inputs:
        data: np.array: N*d array of points, where each row is a data point
        initial_centers: np.array: k*d array of points, where each row is a center
        num_iters: int: number of iterations (also the number of MapReduceJobs you will create)
        chunksize: int: number of data points per mapper
        num_workers: int: number of workers in each MapReduceJob
    Outputs:
        (array-like): N*1 array of cluster assignments
    """
    pass

# AUTOLAB_IGNORE_START
# load data
k = 25
data = np.loadtxt("mapreduce/kmeans/faces_all.txt")
# initial centers using KMeans++ algorithm
initial_centers = KMeans().init_centers(data,k)
# final cluster assignment using map reduce
print('K-means Map-Reduce job initialized.')
cluster_assignment = kmeans_train(data, initial_centers, num_iters=10, chunksize=len(data) // 20, num_workers=4)
print('K-means Map-Reduce job completed successfully.')
print(cluster_assignment)
# AUTOLAB_IGNORE_STOP

Sanity-check your code against your K-means implementation from the previous homework (unsupervised.ipynb): with the same initial centers and the same number of iterations, do you get identical cluster assignments?

## For Fun
Try varying `num_workers` from 1 to 4 (a typical number of cores in a machine, although yours may have more or fewer) and then beyond that. How does the running time change? Can you explain why?