# MapReduce

MapReduce is a popular paradigm for distributed computing on big data. While it has been replaced by generalized dataflow systems like [Apache Spark](https://spark.apache.org/) and [Microsoft Dryad](https://www.microsoft.com/en-us/research/project/dryad/), it is a very simple concept that lends itself to distributed computing.

Here you will implement a simple version of MapReduce and use it to solve problems.

In [3]:
import multiprocessing
import time
from typing import TypeVar, Hashable, Dict, List, Callable, Iterable, Tuple, Optional
import collections
import tarfile
import gzip
import itertools
from urllib.parse import urlparse

import nltk
from nltk.corpus import gutenberg
from nltk.corpus import stopwords

from bs4 import BeautifulSoup

from testing.testing import test

def nltk_download_test(nltk_download):
    nltk_download()

@test
def nltk_download():
    nltk.download('stopwords')
    nltk.download('gutenberg')

[nltk_data] Downloading package stopwords to
[nltk_data]     C:\Users\Xindi\AppData\Roaming\nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package gutenberg to
[nltk_data]     C:\Users\Xindi\AppData\Roaming\nltk_data...
[nltk_data]   Package gutenberg is already up-to-date!
### TESTING nltk_download: PASSED 0/0
###



## MapReduce Implementation

MapReduce is conceptually very simple. Read the lecture notes, and optionally the [Wikipedia Entry](https://en.wikipedia.org/wiki/MapReduce) to understand it. Here's a summary:

1. The Map function is called on each element of a list of inputs, producing a list of dictionaries. Each input (in any format) is converted to a dictionary from a key to some intermediate value. Once you figure out what a good intermediate value is for the problem you're trying to solve, the Map and Reduce functions are usually obvious.
2. The Partition function (sometimes called Shuffle or Collate) converts the produced list-of-dictionaries into a dictionary-of-lists. Each key/value pair in the output dictionary is sometimes called a Partition.
3. The Reduce function converts each value in the dictionary-of-lists into a single value, making it a dictionary-of-values.
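
As a rough illustration of the three steps, the sketch below runs them by hand on two tiny strings. The names `docs`, `mapped`, `partitions`, and `reduced` are made up for this example, and counting characters is just one convenient choice of intermediate value.

```python
from collections import defaultdict

docs = ["ab", "bcb"]  # hypothetical toy input

# 1. Map: each input becomes a dictionary from key to intermediate value.
mapped = []
for doc in docs:
    counts = {}
    for ch in doc:
        counts[ch] = counts.get(ch, 0) + 1
    mapped.append(counts)

# 2. Partition: the list-of-dictionaries becomes a dictionary-of-lists.
partitions = defaultdict(list)
for d in mapped:
    for key, value in d.items():
        partitions[key].append(value)

# 3. Reduce: each list collapses into a single value.
reduced = {key: sum(values) for key, values in partitions.items()}

print(mapped)            # [{'a': 1, 'b': 1}, {'b': 2, 'c': 1}]
print(dict(partitions))  # {'a': [1], 'b': [1, 2], 'c': [1]}
print(reduced)           # {'a': 1, 'b': 3, 'c': 1}
```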

Now you have to complete this implementation of MapReduce by filling in the partition function. We have included type hints, but you don't need them to complete this; you can optionally read more about them [here](https://mypy.readthedocs.io/en/latest/cheat_sheet_py3.html).

In [4]:
# Type variables for typechecking:
# This is a way to denote the input and output types of mapreduce and various functions.
# We also describe this information in the mapreduce docstring below

TInput = TypeVar("TInput")             # Type of input
TKey = TypeVar("TKey", bound=Hashable) # Key type, must be hashable 
TIntermediate = TypeVar("TIntermediate") # Intermediate representation
TOutput = TypeVar("TOutput")           # The produced output type

def partition_test(partition):
    test.equal(partition([{"A": "A", "B": "B"}, {"B": "B", "C": "C"}]),
              {"A": ["A"], "B": ["B", "B"], "C": ["C"]})

@test
def partition(intermediates : Iterable[Dict[TKey, TIntermediate]]) -> Dict[TKey, List[TIntermediate]]:
        """Organize the mapped values by their key.

        args:
            intermediates : Iterable[Dict[TKey, TIntermediate]] -- a list of dictionaries produced by the mapper

        returns : Dict[TKey, List[TIntermediate]] -- the values in intermediates grouped by key 
        """
#         d={}
#         for i in l:
#             for k in i:
#                 if k not in d:
#                     d[k]=[i[k]]
#                 else:
#                     d[k].append(i[k])
        rv = collections.defaultdict(list)
        for i in intermediates:
            for k, v in i.items():
                # List-valued intermediates are merged into the partition element by element;
                # any other value is appended as a single element.
                if isinstance(v, list):
                    rv[k].extend(v)
                else:
                    rv[k].append(v)

        return rv

### TESTING partition: PASSED 1/1
###



Once you have done that, the MapReduce implementation is quite simple:

In [5]:
# This is a fake multiprocessing.Pool, used to provide single-thread execution
# of map. You get better error messages with this.
class FakePool():
    def map(self, lmbd, data, chunksize=None):
        return [lmbd(x) for x in data]

In [6]:
def mapreduce(mapper  : Callable[[TInput], Dict[TKey, TIntermediate]],
              reducer : Callable[[Tuple[TKey, List[TIntermediate]]], TOutput],
              data    : Iterable[TInput],
              chunksize_map=1, chunksize_reduce=8, pool=None) -> List[TOutput]:
    """ MapReduce. Map the data using mapper, partition it, and reduce each partition using the reducer.
    
    args:
        mapper  : Callable[[TInput], Dict[TKey, TIntermediate]]
                -- a function that takes in an input element and breaks it out into a dictionary from some key
                   to some intermediate value
        reducer : Callable[[Tuple[TKey, List[TIntermediate]]], TOutput]
                -- a function that takes in a tuple of the key and the list of intermediate values and converts
                   it to a single output value; this conversion should not rely on the key
        data    : Iterable[TInput] -- the input data to map over

    kwargs:
        chunksize_map    : int -- an internal parameter used for performance-tuning
        chunksize_reduce : int -- an internal parameter used for performance-tuning
        pool : multiprocessing.Pool -- the pool providing mapping

    returns : List[TOutput] -- the output of running the mapper and reducer on data.
    """
    if pool is None:
        pool = FakePool()
    
    # Isn't the implementation really simple?
    intermediates = pool.map(mapper, data, chunksize_map)
#     print(intermediates)
    partitions = partition(intermediates)
#     print(partitions)
    return pool.map(reducer, partitions.items(), chunksize_reduce)

Note that we're using the Python multiprocessing library to split the computation across the cores in your computer. There are [some limitations](https://codewithoutrules.com/2018/09/04/python-multiprocessing/) to this: in particular, you cannot use anonymous functions (i.e. `lambda` expressions). If your Python kernel crashes, you may need to reset it.
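
The restriction on `lambda` expressions comes from the fact that `multiprocessing` has to pickle the function it sends to worker processes. The small check below is only a sketch of that behaviour: `can_pickle` is a helper invented for this example, and the built-in `len` stands in for any function that can be looked up by name.

```python
import pickle

def can_pickle(obj) -> bool:
    """Return True if obj survives pickle.dumps (hypothetical helper for illustration)."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False

# Functions that can be found by name are pickled by reference.
print(can_pickle(len))          # True

# A lambda has no importable name, so pickling it fails.
print(can_pickle(lambda x: x))  # False
```

This is why the mappers and reducers in this notebook are written as top-level `def` functions.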

Great! Now let's apply this to a very simple example:

## Character Count

A classic example of a MapReduce application is counting the words in a corpus; here we count _characters_ instead. The map function converts each document into the count of each character that appears in it, and the reduce function adds up the counts for each character, producing a tuple as the output type. The keys are characters, the intermediate representation is the count in each string, and the output is a tuple of character and count.

Now, go through the code below and make sure you understand how to use MapReduce:

In [7]:
# import re
# import collections

def cc_map(data : str) -> collections.Counter:
    """ Read a string and convert it into character frequency dictionary

    args:
        data : str -- the input string
    
    returns : Counter[str, int] -- a dictionary of counts, from each character to the number of times it appears
    """
#     data=re.sub(r"[^abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ]", "", data)
    return collections.Counter(data)
 
def cc_reduce(t : Tuple[str, List[int]]):
    """ Find the total number of times a character appears in the input from a list of appearances in each file
    
    args:
        t : Tuple[key, frequencies] -- a tuple of
            key : str -- the character
            frequencies : List[int] -- the number of times it appears in each file
    
    returns: Tuple[str, int] -- a tuple of (character, total number of occurrences)
    """
    key, values = t
    return (key, sum(values))

In [8]:
def count_characters_test(count_characters):
    cc = count_characters([nltk.corpus.gutenberg.raw(f) for f in nltk.corpus.gutenberg.fileids()])
    cc.sort(key=lambda x: x[1], reverse=True)
    test.equal(cc[:8], [(' ', 2000723), ('e', 1108892), ('t', 800450), ('a', 698889), ('o', 662391), ('h', 635854), ('n', 607945), ('s', 540033)])    
    # The most common letter is the space character, followed by the letter 'e'.
    print(cc[:8])

@test
def count_characters(alos):
    """ Count the number of times each character appears in the input data
    
    args:
      alos : List[str] -- a list of strings
    """
    return mapreduce(cc_map, cc_reduce, alos)

[(' ', 2000723), ('e', 1108892), ('t', 800450), ('a', 698889), ('o', 662391), ('h', 635854), ('n', 607945), ('s', 540033)]
### TESTING count_characters: PASSED 1/1
###



Now let's kick it up a notch with the poster child use-case for MapReduce.

## Inverted Index

An inverted index is a mapping (`dict`) of elements to the list of locations at which the element occurs. When Google released their [2004 MapReduce paper](https://static.googleusercontent.com/media/research.google.com/en//archive/mapreduce-osdi04.pdf), this was one of the use-cases they cited.

The concept of an inverted index is central to any modern search engine. E.g., given a query "X, Y" for which we want to retrieve the documents containing the words X and/or Y, the engine only needs to look at the union/intersection of the documents in the inverted indices of X and Y (which have already been precomputed)!
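
To make the union/intersection idea concrete, here is a minimal sketch on a hand-written index. The index contents, the file names, and the helper `documents_containing` are all invented for illustration.

```python
# A hypothetical, hand-written inverted index: word -> list of (filename, position).
index = {
    "whale": [("moby", 3), ("moby", 17)],
    "sea": [("moby", 5), ("emma", 40)],
}

def documents_containing(word):
    """Return the set of filenames in which word occurs (illustrative helper)."""
    return {filename for filename, _ in index.get(word, [])}

# Documents containing both words: intersection of the two document sets.
both = documents_containing("whale") & documents_containing("sea")

# Documents containing either word: union of the two document sets.
either = documents_containing("whale") | documents_containing("sea")

print(both)            # {'moby'}
print(sorted(either))  # ['emma', 'moby']
```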

In our version of this, we will be constructing an inverted index for words in documents in the Project Gutenberg corpus.

### Specification

The types in this problem are:

 - The input type is `Tuple[str, List[str]]`, corresponding to the file name and a list of words
 - The key type is `str`, corresponding to each word
 - The intermediate representation type is `List[Tuple[str, int]]`, which is a list of (filename, position) pairs
 - The output type is `Tuple[str, TIntermediate]`, which is a tuple of word and the intermediate representations, with the added constraint that the intermediate representations are in sorted order.

(To make this easier, we've told you what the intermediate representation is -- when writing MapReduce problems, finding the correct intermediate representation is often the challenge.)

In the map step, your output should not include any stopwords in `nltk.corpus.stopwords.words('english')`. You do not need to lower-case the input or lemmatize words, etc.

In [9]:
def ii_map(data : Tuple[str, List[str]]) -> Dict[str, List[Tuple[str, int]]]:
    """ Inverted index map; compile a list of occurrences of each (non-stopword) word in a document.

    args:
        data : Tuple[filename, words] -- the input data, where:
            filename : str -- the filename to tag each word in the index with
            words : List[str] -- the list of words in the document, in the order they appear

    returns : Dict[str, List[Tuple[str, int]]] -- the mapped dictionary, where each word has a corresponding list
        of tuples, each with the filename and index at which it appears.
    """
    sw = set(stopwords.words('english'))
    filename, words = data
#     w1=[]
#     for word in words: 
#         if word not in sw:
#             w1.append(word)
    
    di={}
    for i,k in enumerate(words):
        if k not in di:
            di[k]=[(filename, i)]
        else:
            di[k].append((filename, i))
    di2={}
    for w in di:
        if w not in sw:
            di2[w]=di[w]
    return di2

def ii_reduce(t : Tuple[str, List[List[Tuple[str, int]]]]):
    """ Assemble and sort the inverted index.

    args:
        t : Tuple[key, List[occurrences]] -- a tuple of
            key : str -- the word 
            occurrences : List[Tuple[str, int]] -- the file name and word number in the file

    returns : Tuple[key, occurrences] -- a tuple of
            key : str -- the word 
            occurrences : List[Tuple[str, int]] -- the occurrences, flattened and sorted

    """
    key, occurrences = t
#     print("reduce:")
#     print(key)
#     print(occurrences)
    occurrence=(key,sorted(occurrences, key=lambda element: (element[0], element[1])))
    
    return occurrence

In [10]:
def inverted_index_test(inverted_index):
    tt = inverted_index([("@", list("ABCDEFGHIJKLMNOPQRSTUVWXYZ")), ("%", list("WXYZ"))])
    
    test.equal(tt[:4], [('A', [('@', 0)]), ('B', [('@', 1)]), ('C', [('@', 2)]), ('D', [('@', 3)])])
    test.equal(tt[-2], ('Y', [('%', 2), ('@', 24)]))
    test.equal(tt[-1], ('Z', [('%', 3), ('@', 25)]))

    books = ['austen-emma.txt', 'austen-persuasion.txt', 'austen-sense.txt']
    gutenberg = inverted_index([(f.replace(".txt", "").replace("austen-", ""), list(nltk.corpus.gutenberg.words(f))) for f in books])
    g = dict(gutenberg)

    # Check that you have all entries for some words:
    test.equal(len(g[","]), 27601)
    test.equal(len(g["I"]), 6306)
    test.equal(len(g["could"]), 1837)
    test.equal(len(g["Mr"]), 1587)
    test.equal(len(g["Emma"]), 866)

    # Check output for some words:
    test.equal(g["Austen"], [('emma', 4), ('persuasion', 4), ('sense', 6)])
    # If this works but the next test fails, then you aren't ordering the 
    test.equal(set(g["likelihood"]), set([('emma', 5715), ('emma', 110230), ('persuasion', 68360), ('sense', 83037), ('sense', 120041)]))
    test.equal(g["likelihood"], [('emma', 5715), ('emma', 110230), ('persuasion', 68360), ('sense', 83037), ('sense', 120041)])
    
    # Check which files the name "Emma" is mentioned in:
    test.equal(collections.Counter(v for v, _ in g["Emma"]), {"emma": 865, "persuasion": 1})

@test
def inverted_index(data):
    """ Build an inverted index of the (non-stopword) words in the input data
    
    args:
      data : List[Tuple[str, List[str]]] -- a list of (filename, words)
    """
    return mapreduce(ii_map, ii_reduce, data)

### TESTING inverted_index: PASSED 12/12
###



## Reverse Web Graph

The world-wide web can be thought of as a directed graph, where the edge A → B encodes a link in page A that points to page B. We've previously covered adjacency matrices in this course; we can very easily construct a row in the adjacency matrix for each page by parsing each webpage and recording the outgoing links we observe in it.

Now we want the *reverse* adjacency list. For each webpage, we want the list of webpages which point to it (i.e., incoming links). This is the reverse webgraph problem, and we can solve this with MapReduce:
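
As a single-process illustration of what "reverse" means here, the sketch below flips a small adjacency list by hand. The graph and the names `outgoing` and `incoming` are made up for this example.

```python
# Hypothetical forward adjacency list: page -> pages it links to.
outgoing = {
    "A": ["B", "D"],
    "B": ["C"],
    "C": [],
    "D": ["E"],
    "E": ["B"],
}

# Reverse adjacency list: page -> set of pages that link to it.
incoming = {page: set() for page in outgoing}
for source, targets in outgoing.items():
    for target in targets:
        incoming[target].add(source)

print(sorted(incoming["B"]))  # ['A', 'E']
print(incoming["A"])          # set()
```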

### Specification

The types in this problem are:

 - The input type is `Tuple[str, bytes]`, corresponding to the URL and the raw bytes of the website
 - The key type is `Tuple[str, str]`, corresponding to the URL `netloc` and `path`. We explain these parts in the Hints section.
 - The intermediate type is up to you!
 - The output type is `Tuple[key, Dict[incoming_netloc, incoming_path]]`, where:
     - `key` is a `Tuple[str, str]`, same as the key type.
     - `incoming_netloc` is a `str` which has the `netloc` of the incoming links, and
     - `incoming_path` is a `Set[str]` which is a set of `path`s following the associated `netloc`.
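
For orientation, the sketch below builds one output value of that shape from a flat list of source pages. It assumes the sources are already available as `(netloc, path)` pairs, which is only one possible intermediate representation; the variable names are illustrative.

```python
from collections import defaultdict

key = ("www.cs.byu.edu", "/byu.html")

# Hypothetical list of pages that link to `key`, as (netloc, path) pairs.
sources = [
    ("www.cs.byu.edu", "/courses/cs142/syllabus.html"),
    ("www.cs.byu.edu", "/courses/cs404/syllabus.html"),
    ("osm7.cs.byu.edu", "/cs240/cs240homepage.html"),
    ("www.cs.byu.edu", "/courses/cs142/syllabus.html"),  # duplicate link, collapsed by the set
]

# Group the paths by netloc; using a set discards repeated links.
grouped = defaultdict(set)
for netloc, path in sources:
    grouped[netloc].add(path)

output = (key, dict(grouped))
print(output)
```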

Details:

- When constructing the index, we do not care how many links point from A to B.
- You can choose any way to represent the data.
- You should use `BeautifulSoup` to parse the webpage.
    - Only search for URLs in tags like `<a href="...">`.
    - Only consider URLs ending in `.html`
- URLs can be relative or absolute; you should use [`urlparse`](https://docs.python.org/3/library/urllib.parse.html#urllib.parse.urlparse) to extract the `netloc` and `path` to determine which path this is.
    - If the URL is absolute (such as `http://cs.cornell.edu/Info/Courses/Current/CS415/CS414.html`), then `netloc` will not be blank. You can use the parsed `netloc` and `path` directly.
    - If the URL is relative (such as `/Info/Courses/Current/CS415/CS414.html` or `CS414.html`), then `netloc` will be blank. You can assume the `netloc` of the target is the same as the `netloc` of the source, and:
        - If the `path` has a leading `/`, then it is the complete path of the target.
        - If the `path` does not have a leading `/`, you should replace the final path segment (i.e. everything after the final `/`) with the contents of `path`. You do not need to consider parent paths (i.e. `../`).
- Not all target pages may appear in the source; this is alright.
- Ignore the `Some characters could not be decoded, and were replaced with REPLACEMENT CHARACTER.` message.
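
The snippet below shows what `urlparse` reports for the three URL forms mentioned above. It is only a quick look at `netloc` and `path`, not a full canonicalization.

```python
from urllib.parse import urlparse

absolute = urlparse("http://cs.cornell.edu/Info/Courses/Current/CS415/CS414.html")
rooted = urlparse("/Info/Courses/Current/CS415/CS414.html")
relative = urlparse("CS414.html")

# Absolute URL: netloc is filled in and path starts with "/".
print(absolute.netloc, absolute.path)

# Relative URLs: netloc is blank; only the path tells the two cases apart.
print(repr(rooted.netloc), rooted.path)
print(repr(relative.netloc), relative.path)
```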

The most fiddly part of this problem is in handling URLs; we provide you with `canonicalize_url` and some tests which you can optionally use.

In [11]:
from urllib.parse import urlparse
def canonicalize_url_test(canonicalize_url):
    source = ("source.edu", "/path/segment/leaf.html")
    test.equal(canonicalize_url(source, "https://a.edu/research/test.html"), ("a.edu", "/research/test.html"))
    test.equal(canonicalize_url(source, "http://a.edu/research/test.html"), ("a.edu", "/research/test.html"))
    test.equal(canonicalize_url(source, "/research/test.html"), ("source.edu", "/research/test.html"))
    test.equal(canonicalize_url(source, "research/test.html"), ("source.edu", "/path/segment/research/test.html"))
    test.equal(canonicalize_url(source, "test.html"), ("source.edu", "/path/segment/test.html"))
    # You should pretend .. is a folder name:
    test.equal(canonicalize_url(source, "../test.html"), ("source.edu", "/path/segment/../test.html"))
    test.equal(canonicalize_url(source, "test.pdf"), None)
    test.equal(canonicalize_url(('www.cs.evenbug.edu', '/evenbug.html'), "file:///evenbug.html"), ('www.cs.evenbug.edu', '/evenbug.html'))

@test
def canonicalize_url(source : Tuple[str, str], target_str : str) -> Optional[Tuple[str, str]]:
    """Given a source (netloc, path) and a target (netloc, path), construct the canonical target (netloc, path)

    args:
        source : Tuple[str, str] -- source (netloc, path)
        target_str : str -- target url, not canonical

    returns : Optional[Tuple[str, str]] -- canonical target (netloc, path) or None if the URL should be ignored
    """
    t=None
    o = urlparse(target_str)
    if o.scheme!="":
        netloc=o.netloc
        path=o.path
        t=(netloc, path)
    elif o.path.endswith('.html'):
        netloc=source[0]
        if o.path.startswith("/"):
            path=o.path
        else:
            # Replace only the final segment of the source path with the relative path
            path=source[1].rsplit("/", 1)[0]+"/"+o.path
        t=(netloc, path)
    if t!=None and t[0]=="":
        t=(source[0], path)
    if t!=None and t[1]=="":
        t=(netloc, source[1])
        
    return t

### TESTING canonicalize_url: PASSED 8/8
###



In [12]:
def rwg_map(datum):
    """populate the (target, source) dict for every hyperlink in the file.

    args:
        datum : Tuple[str, bytes] -- Tuple of URL and page content

    returns : Dict[target, source] -- where
        target : Tuple[str, str] -- the netloc, path of the destination webpage
        source : YourIntermediateType -- the source webpage, as your chosen type.
    """
#     print("run")
    url, html = datum
    o = urlparse(url)
    source=(o.netloc, o.path)
    soup = BeautifulSoup(html, 'html.parser')
    a=soup.find_all('a', href=True)
    dis={}
    for i in a:
        t=canonicalize_url(source, i['href'])
        # canonicalize_url returns None for links that should be ignored
        if t is not None:
            dis[t]=url
    return dis

def rwg_reduce(item):
    """Restructure the gathered backlinks

    args:
        item : Tuple[target, sources] -- where
            target : Tuple[str, str] -- is the (netloc, path) of the target webpage
            sources : List[YourIntermediateType] -- is the list of representations of source webpages

    returns: Tuple[key, Dict[incoming_netloc, incoming_path]] -- where
        key : Tuple[str, str] -- same as the key type
        incoming_netloc : str -- which has the `netloc` of the incoming links
        incoming_path : Set[str] -- which is a set of `path`s that follow the associated `netloc`
    """
    target, sources = item
    dic={}
    for i in sources:
        key, value=canonicalize_url(target, i)
        if key not in dic:
            dic[key]=[value]
        else:
            dic[key].append(value)
            
    for k, v in dic.items():
        dic[k]=set(v)
        
    return (target, dic)

In [13]:
def read_webpages():
    tar = tarfile.open("webgraph.tar.gz", "r:gz")
    data : List[Tuple[str, bytes]] = []
    for member in tar.getmembers():        
        num = member.name[:-5].split("_")[-1]
        try:
            if int(num) < 1000:
                f = tar.extractfile(member)
                if f is not None:
                    # The first line is the URL, all subsequent lines are the data:
                    website : str = next(f).decode("utf-8").strip()
                    content : bytes = f.read()
                    data.append((website, content))
        except ValueError:
            pass

    return data

def reverse_webgraph_test(reverse_webgraph):
    data = read_webpages()

    # Here we use a multiprocessing.Pool, which parallelizes execution over multiple cores.
#     with multiprocessing.Pool() as pool:
    pool = FakePool()
    rw = dict(reverse_webgraph(data, pool=pool))

    #l = [sorted(((k, len(v)) for k, v in rw.items() if len(v) > 1), key=lambda x: x[1])]
    v = {}
    for k in [
    ('www.cs.byu.edu', '/byu.html'),
    ('www.cs.berkeley.edu', '/~russell/aima.html'),
    ('uu-gna.mit.edu:8001', '/uu-gna/text/cc/index.html'),
    ('www.ncsa.uiuc.edu', '/General/Internet/WWW/HTMLPrimer.html'),
    ('www.cs.umass.edu', '/rcfdocs/newhome/index.html')]:
        v[k] = rw[k]
        
    print(v)
    
    
    
@test
def reverse_webgraph(data, pool=None):
    """ Build the reverse web graph: for each target page, collect the pages that link to it
    
    args:
      data : List[Tuple[str, bytes]] -- a list of (URL, page content)
      pool : multiprocessing.Pool -- optional pool to run on; mapreduce falls back to FakePool when None
    """
    return mapreduce(rwg_map, rwg_reduce, data, pool=pool)

{('www.cs.byu.edu', '/byu.html'): {'www.cs.byu.edu': {'/courses/cs531/syllabus.html', '/courses/cs142/syllabus.html', '/courses/cs578/syllabus.html', '/courses/cs678/syllabus.html', '/courses/cs404/syllabus.html'}, 'osm7.cs.byu.edu': {'/cs240/cs240homepage.html', '/cs327/cs327HomePage.html'}}, ('www.cs.berkeley.edu', '/~russell/aima.html'): {'cs.nyu.edu': {'/cs/dept_info/course_home_pages/fall96/G22.2561/index.html', '/cs/dept_info/course_home_pages/spr96/G22.2560/index.html'}, 'www.cs.wisc.edu': {'/~kunen/cs540.html', '/~dyer/cs540.html'}, 'www.cs.indiana.edu': {'/classes/b551/home.html'}}, ('uu-gna.mit.edu:8001', '/uu-gna/text/cc/index.html'): {'www.cslab.uky.edu': {'/~jurek/cs122/cs122.html', '/~jurek/cs420/cs420.html'}, 'www.cs.wisc.edu': {'/~cs564-1/cs564.html'}, 'www.cs.cornell.edu': {'/Info/Courses/Current/CS537/course.html'}}, ('www.ncsa.uiuc.edu', '/General/Internet/WWW/HTMLPrimer.html'): {'www.cs.rutgers.edu': {'/~murdocca/index.html', '/~dls/index.html'}, 'www.cs.ucsb.edu': 

## PageRank

MapReduce can also be used to perform the update step in any iterative computation, such as PageRank. In this assignment we're writing a pagerank that splits the pagerank algorithm into batches of columns.

We will try that on the same graphs you may have used back in Homework 2. For your reference, the simple graph is this:
```
    A -> B -> C
    |    ^
    v    |
    D -> E
``` 

The larger test graph is from Wikipedia.

You are allowed to consult online sources as long as all code remains your own. Bear in mind that a lot of online sources involve making PageRank efficient in matrices so large that no single row or column can be held in memory. 

### Algorithm

We have discussed the PageRank algorithm in class; here is some naive pseudocode for calculating it:

```python
nodes = [...] # A collection of nodes
edges = [...] # A collection of edges

pr = {...} # Uniform initialization of pagerank
d = 0.85   # Smoothing
for i in range(iters):
    update = {...} # Zeros, one for each node
    for source in nodes:
        # Count the number of outgoing edges from source
        num_outgoing = len(e for e in edges if e is (source -> *))
        
        # If this is a dead-end node:
        if num_outgoing == 0:
            for destination in nodes:
                update[destination] += pr[source]/len(nodes)
        # This node has outgoing connections:
        else:
            for destination in nodes:
                if (source -> destination) in edges:
                    update[destination] += pr[source]/num_outgoing

    # Update with smoothing:
    for node in nodes:
        pr[node] = update[node]*d + (1-d)/len(nodes)
```

Note that you should not actually use this algorithm for calculating this; that would be too slow. Instead, you should:

1. Observe that your function is given `num_outgoing`, the number of outgoing edges from each node, and `incoming`, the source nodes that have edges ending in your nodes.
2. Rewrite the above pseudocode to calculate the new pagerank for a single node using `incoming` rather than as shown in the pseudocode above.
3. Implement this using MapReduce (Additional hint: the reduce function is very simple! All the heavy lifting is done in the map function.)
4. Verify that your algorithm is numerically correct on the small, and then the large test case provided.
5. Time your algorithm and make sure it completes all tests in under 120s. The Optimization section has hints on this.

It may be a good idea to come to TA hours with pseudocode or ideas.
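
One way to read steps 1 and 2 is sketched below: a plain, single-process update that walks over `incoming` instead of over all edges. This is not the MapReduce version asked for in the assignment; the function name `pagerank_single_process` and the choice of 200 iterations are assumptions made for this example, and the graph is the simple five-node graph shown earlier.

```python
def pagerank_single_process(num_outgoing, incoming, d=0.85, iters=200):
    """Illustrative PageRank that computes each node's update from its incoming edges."""
    nodes = list(num_outgoing)
    n = len(nodes)
    pr = {node: 1.0 / n for node in nodes}  # uniform initialization
    dead_ends = [node for node in nodes if num_outgoing[node] == 0]

    for _ in range(iters):
        # Every dead-end node spreads its rank evenly over all nodes.
        dead_end_share = sum(pr[node] for node in dead_ends) / n
        # The comprehension reads the old pr; the name is rebound only afterwards.
        pr = {
            node: d * (sum(pr[src] / num_outgoing[src] for src in incoming[node])
                       + dead_end_share) + (1 - d) / n
            for node in nodes
        }
    return pr

num_outgoing = {"A": 2, "B": 1, "C": 0, "D": 1, "E": 1}
incoming = {"A": set(), "B": {"A", "E"}, "C": {"B"}, "D": {"A"}, "E": {"D"}}

pr = pagerank_single_process(num_outgoing, incoming)
for node in sorted(pr):
    print(node, round(pr[node], 4))
```

Computing the dead-end contribution once per iteration, rather than once per node, is the kind of precomputation that keeps the per-node work proportional to the number of incoming edges.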

### Optimization

One of the challenges is making your code run quickly. We've already arranged the code so you should be able to meet the time requirement. Also, we will be evaluating your code on only a few iterations instead of 100 iterations. Ensure that your function takes under 120s on a modern laptop.

Here are some hints on optimization:

1. While you have sufficient information to solve the problem with the arguments to `pr_map`, you can significantly improve the speed by precomputing a little information. Feel free to change `gen` in `pagerank` to pass precomputed information to the workers.
2. It may not be at all necessary to improve this, though, so submit your code once before trying to optimize it. 
3. We use the generator `gen` to bundle arguments to feed them to `mapreduce`. We are grouping the incoming nodes into chunks of `2**11 = 2048` nodes; we experimentally found that this balances the overhead of transferring data to the worker processes against the amount of work per chunk. You may change this. 
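
The chunking mentioned in the last hint can be pictured with the small sketch below. It uses the same `zip_longest` recipe from the `itertools` documentation; the name `chunks` and the chunk size of 3 are chosen only to keep the example readable. Note that the last chunk is padded with `None`, which has to be filtered out before processing.

```python
import itertools

def chunks(iterable, n):
    """Group an iterable into tuples of length n, padding the last one with None."""
    args = [iter(iterable)] * n
    return itertools.zip_longest(*args, fillvalue=None)

raw = list(chunks("ABCDEFG", 3))
print(raw)      # [('A', 'B', 'C'), ('D', 'E', 'F'), ('G', None, None)]

# Drop the padding before doing any work on a chunk.
cleaned = [[item for item in group if item is not None] for group in raw]
print(cleaned)  # [['A', 'B', 'C'], ['D', 'E', 'F'], ['G']]
```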


You will notice that this method is much slower than that in Homework 2. It took about 6s to run all tests using the original sparse matrix implementation on a single core; this will take about 60s to do the same on a 6-core machine. The main reasons for the slow-down are the setup time (in spawning new processes for the multiprocessing pool), the communication overhead (marshaling, sending data to each process, and unmarshaling it), and the use of the Python dictionary instead of heavily-optimized Numpy sparse matrix math.

### Specification

You should complete `pr_map` and `pr_reduce` (and optionally modify `pagerank`) to calculate the pagerank of nodes in the input graph, following the type signatures. 100 iterations of this should complete in under 120s. You may choose any intermediate representation (though a very simple one is sufficient to solve the problem).

In [14]:
# Utility function to read the edges:
def read_graph(basename="wikipedia_small"):
    nodes = []
    num_outgoing = {}
    incoming = {}
    with gzip.open(f"{basename}.nodes.gz", 'rt', encoding="utf-8", newline="") as f:
        for a in f:
            a = a.strip()
            nodes.append(a)
            num_outgoing[a] = 0
            incoming[a] = []

    with gzip.open(f"{basename}.graph.gz", 'rt', encoding="utf-8", newline="") as f:
        links = []
        for row in f:
            i, j = tuple(row.strip().split())
            num_outgoing[nodes[int(i)]] += 1
            incoming[nodes[int(j)]].append(nodes[int(i)])
    return num_outgoing, incoming

# Utility function to help group the input into chunks:
# https://docs.python.org/3/library/itertools.html
def grouper(iterable, n, fillvalue=None):
    "Collect data into fixed-length chunks or blocks"
    # grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx
    args = [iter(iterable)] * n
    return itertools.zip_longest(fillvalue=fillvalue, *args)

In [50]:
def pagerank_test(pagerank):
    G_num_outgoing = {"A": 2, "B": 1, "C": 0, "D": 1, "E": 1}
    G_incoming = { "A": set(), "B": set("AE"), "C": {"B"}, "D": {"A"}, "E": {"D"} }

    pr = pagerank(G_num_outgoing, G_incoming)
    test.true(abs(pr['A']-0.08510862387068166) < 1e-7)
    test.true(abs(pr['B']-0.28124676686965944) < 1e-7)
    test.true(abs(pr['C']-0.3241683757098922) < 1e-7)
    test.true(abs(pr['D']-0.12127978901572137) < 1e-7)
    test.true(abs(pr['E']-0.18819644453404483) < 1e-7)
    
    # Load the larger Wikipedia test graph:
    GW_num_outgoing, GW_incoming = read_graph()
    
    with multiprocessing.Pool() as pool:
        start_time = time.perf_counter()
        pr = pagerank(GW_num_outgoing, GW_incoming, pool=pool, iters=100)
        duration = time.perf_counter() - start_time
        
    test.equal(len(pr), 24166)

    # Numerical check
    test.true(abs(pr['United_States'] - 0.00275188705264) < 1e-5)
    test.true(abs(pr['2008']          - 0.00217342514773) < 1e-5)
    test.true(abs(pr['Canada']        - 0.00109896195215) < 1e-5)
    test.true(abs(pr['World_War_II']  - 0.00104913079624) < 1e-5)
    test.true(abs(pr['List_of_African_films'] - 0.00100713870383) < 1e-5)
    test.true(abs(pr['Europe']        - 0.000937690025073) < 1e-5)
    test.true(abs(pr['English_language'] - 0.000908144359626) < 1e-5)
    test.true(abs(pr['Geographic_coordinate_system'] - 0.000891711151403) < 1e-5)
    test.true(abs(pr['Latin']         - 0.000888662228804) < 1e-5)

    print(f"Completed pagerank in {round(duration, 1)}s")
    test.true(duration < 120)

In [32]:
# def pr_map(inp):
#     """PageRank map function
    
#     args:
#         inp: Tuple[entries, pr, num_outgoing, d,...] -- where
#             entries : List[Tuple[node, incoming]] -- a list of nodes to process, where
#                 node : str -- the node id
#                 incoming : Collection[str] -- the node ids that have an edge to this node
#             pr : Dict[str, float] -- from node id to pagerank at the start of the iteration
#             num_outgoing : Dict[str, int] -- from node id to the number of edges leaving that node
#             d : float -- the damping factor
#             ...additional entries to speed up your code
    
#     returns : Dict[str, Intermediate], where the key is the node id and Intermediate is whatever your
#             intermediate step representation is. Each entry in entries must be represented in your output
#     """
# #     entries, pr, num_outgoing, d, terminal_v = inp

# #     update={}
# #     pr1={}
# #     # We remove null entries created by grouper:
# #     for node, incoming_links in filter(lambda x: x, entries):
# #         update[node]=0
# #         if incoming_links != set():
# #             for i in incoming_links:
# #                 update[node]+=pr[i]/num_outgoing[i]+terminal_v
        
# # #     for k in out_zero:
# # #         for k1 in update.keys():
# # #             update[k1] += pr[k]/len(set(num_outgoing))


# #     for node in update.keys():
# # #         for k in out_zero:
# # #             update[node] += pr[k]/len(set(num_outgoing))
# #         pr1[node] = update[node]*d + (1-d)/len(set(num_outgoing))


#     entries, pr, num_outgoing, d, out_zero = inp

#     update={}
#     pr1={}
#     # We remove null entries created by grouper:
#     for node, incoming_links in filter(lambda x: x, entries):
#         update[node]=0
#         if incoming_links != set():
#             for i in incoming_links:
#                 update[node]+=pr[i]/num_outgoing[i]
        
#     for node in update.keys():
#         for k in out_zero:
#             update[node] += pr[k]/len(set(num_outgoing))
#             print("in", pr[k]/len(set(num_outgoing)))
#         print("update", node, update[node])
#         pr1[node] = update[node]*d + (1-d)/len(set(num_outgoing))
        
        
#     return pr1

# def pr_reduce(inp):
#     """PageRank reduce function
    
#     args:
#         inp : Tuple[str, List[Intermediate]]
        
#     returns: Tuple[str, float] -- the node id and its weight at the end of this iteration
#     """
#     node, list_of_reps = inp
#     return node, list_of_reps[0]

# @test
# def pagerank(num_outgoing, incoming, d=0.85, iters=100, pool=None):
#     """ Compute the PageRank score for each node in the network using a MapReduce version of the power method

#     args:
#         num_outgoing: Dict[str, int] -- the number of outgoing links from each node
#         incoming: Dict[str, Set[str]] -- the set of pages that link to each node

#     kwargs:
#         d : float -- damping factor (note that this is defined the same as homework 2)
#         iters : int -- number of iterations

#     returns: Dict[str, int] -- the importance score for each node
#     """
#     n = len(num_outgoing)
#     # Initialize the pagerank as a uniform distribution. You should not change this:
#     pr = {k : 1./n for k in num_outgoing.keys()}
#     destination=[]
#     d1={}
#     n_terminal = 0
#     for k, v in num_outgoing.items():
#         if v==0:
#             n_terminal=n_terminal+1
# #     terminal_v = (1./n)/n
#     out_zero = [k for k, v, in num_outgoing.items() if v==0]
#     for i in range(iters):
#         terminal_v = 0
#         for j in out_zero:
#             terminal_v=terminal_v+pr[j]/n
#         print("out", terminal_v)
#         gen = ((entries, pr, num_outgoing, d, out_zero) for entries in grouper(incoming.items(), 2**11))
#         pr = dict(mapreduce(pr_map, pr_reduce, gen, pool=None))
#     return pr

out 0.04
in 0.04
update A 0.04
in 0.04
update B 0.34
in 0.04
update C 0.24000000000000002
in 0.04
update D 0.14
in 0.04
update E 0.24000000000000002
out 0.0468
in 0.0468
update A 0.0468
in 0.0468
update B 0.3128
in 0.0468
update C 0.36580000000000007
in 0.0468
update D 0.07880000000000001
in 0.0468
update E 0.19580000000000003
out 0.06818600000000001
in 0.06818600000000001
update A 0.06818600000000001
in 0.06818600000000001
update B 0.29950600000000005
in 0.06818600000000001
update C 0.36406600000000006
in 0.06818600000000001
update D 0.10307600000000001
in 0.06818600000000001
update E 0.16516600000000004
out 0.06789122000000002
in 0.06789122000000002
update A 0.06789122000000002
in 0.06789122000000002
update B 0.28226137000000007
in 0.06789122000000002
update C 0.3524713200000001
in 0.06789122000000002
update D 0.11187027000000002
in 0.06789122000000002
update E 0.18550582000000004
out 0.06592012440000003
in 0.06592012440000003
update A 0.06592012440000003
in 0.06592012440000003
...
out 0.0648336751419785
in 0.0648336751419785
update A 0.0648336751419785
in 0.0648336751419785
update B 0.29558443161136433
in 0.0648336751419785
update C 0.34608044201163823
in 0.0648336751419785
update D 0.10738798707731938
in 0.0648336751419785
update E 0.18611346415769997

In [None]:
def pr_map(inp):
    """PageRank map function
    
    args:
        inp: Tuple[entries, pr, num_outgoing, d,...] -- where
            entries : List[Tuple[node, incoming]] -- a list of nodes to process, where
                node : str -- the node id
                incoming : Collection[str] -- the node ids that have an edge to this node
            pr : Dict[str, float] -- from node id to pagerank at the start of the iteration
            num_outgoing : Dict[str, int] -- from node id to the number of edges leaving that node
            d : float -- the damping factor
            ...additional entries to speed up your code
    
    returns : Dict[str, Intermediate], where the key is the node id and Intermediate is whatever your
            intermediate step representation is. Each entry in entries must be represented in your output
    """
    entries, pr, num_outgoing, d, terminal_v = inp
    n = len(num_outgoing)

    pr1 = {}
    # Skip the null entries created by grouper:
    for node, incoming_links in filter(None, entries):
        # Rank arriving over incoming edges, plus this node's share of the
        # rank held by nodes with no outgoing links (terminal_v)
        inflow = terminal_v + sum(pr[i] / num_outgoing[i] for i in incoming_links)
        pr1[node] = d * inflow + (1 - d) / n
    return pr1

def pr_reduce(inp):
    """PageRank reduce function
    
    args:
        inp : Tuple[str, List[Intermediate]]
        
    returns: Tuple[str, float] -- the node id and its weight at the end of this iteration
    """
    node, list_of_reps = inp
    
    return node, sum(list_of_reps)

@test
def pagerank(num_outgoing, incoming, d=0.85, iters=100, pool=None):
    """ Compute the PageRank score for each node in the network using a MapReduce version of the power method

    args:
        num_outgoing: Dict[str, int] -- the number of outgoing links from each node
        incoming: Dict[str, Set[str]] -- the set of pages that link to each node

    kwargs:
        d : float -- damping factor (note that this is defined the same as homework 2)
        iters : int -- number of iterations

    returns: Dict[str, float] -- the importance score for each node
    """
    n = len(num_outgoing)
    # Initialize the pagerank as a uniform distribution. You should not change this:
    pr = {k : 1./n for k in num_outgoing.keys()}
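    # Nodes with no outgoing links; their rank is spread evenly over all nodes
    out_zero = [k for k, v in num_outgoing.items() if v == 0]

    for _ in range(iters):
        # Share of the dangling-node rank that every node receives this iteration
        terminal_v = sum(pr[j] for j in out_zero) / n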
        gen = ((entries, pr, num_outgoing, d, terminal_v) for entries in grouper(incoming.items(), 2**11))
        pr = dict(mapreduce(pr_map, pr_reduce, gen, pool=pool))
    return pr
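
As a cross-check on the MapReduce version above, the same iteration can be written as a plain loop. The update that `pr_map` applies to each node $v$ appears to be

$$PR_{t+1}(v) = \frac{1-d}{n} + d\left(\sum_{u \in \mathrm{in}(v)} \frac{PR_t(u)}{L(u)} + \frac{1}{n}\sum_{z \in Z} PR_t(z)\right)$$

where $n$ is the number of nodes, $L(u)$ is `num_outgoing[u]`, and $Z$ is the set of nodes with no outgoing links. This notation is reconstructed from the code, not taken from the assignment text. The sketch below is a minimal single-process reference under that reading; `pagerank_reference` and the five-node toy graph are hypothetical, made up purely for illustration.

```python
from typing import Dict, Set


def pagerank_reference(num_outgoing: Dict[str, int],
                       incoming: Dict[str, Set[str]],
                       d: float = 0.85,
                       iters: int = 100) -> Dict[str, float]:
    """Single-process power iteration (hypothetical helper, not part of the assignment)."""
    n = len(num_outgoing)
    # Start from a uniform distribution, as the MapReduce version does
    pr = {node: 1.0 / n for node in num_outgoing}
    # Nodes with no outgoing links spread their rank evenly over every node
    dangling = [node for node, out in num_outgoing.items() if out == 0]

    for _ in range(iters):
        dangling_share = sum(pr[node] for node in dangling) / n
        # The comprehension reads the old `pr` throughout; `pr` is rebound afterwards
        pr = {
            node: (1 - d) / n + d * (
                dangling_share
                + sum(pr[src] / num_outgoing[src] for src in incoming.get(node, ()))
            )
            for node in num_outgoing
        }
    return pr


# Hypothetical toy graph: A->B, A->C, B->C, C->A, D->C, D->E; E has no outgoing links
toy_num_outgoing = {"A": 2, "B": 1, "C": 1, "D": 2, "E": 0}
toy_incoming = {"A": {"C"}, "B": {"A"}, "C": {"A", "B", "D"}, "D": set(), "E": {"D"}}

ranks = pagerank_reference(toy_num_outgoing, toy_incoming)
print(sorted(ranks.items(), key=lambda kv: -kv[1]))
```

Each iteration redistributes exactly the rank it started with, so the scores keep summing to 1. The same check is a quick sanity test for the output of `pagerank`.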