# Overview
We are going to run MapReduce jobs on the Wikipedia dataset.  The
dataset is available (pre-chunked) on
[s3](s3://thedataincubator-course/mrdata/simple/).

To solve this, you will need to run mrjob on AWS EMR.  The data is stored on
S3.  For development, we highly recommend you download a single chunk onto your
computer:

```
wget https://s3.amazonaws.com/thedataincubator-course/mrdata/simple/part-00026.xml.bz2
```

Then take a sample that is small enough for mrjob to process in a few
seconds.  Your development cycle should be:

  1. Get your job to work locally on the sampled chunk.  This will greatly
  speed up your development.
  2. Get your job to work locally on the full chunk you downloaded.
  3. Get your job to work on EMR for all of Simple English.
  4. Get your job to work on EMR for all of English Wikipedia.
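
One way to produce the small sample is to copy the first few thousand lines of the downloaded chunk into a new compressed file. The sketch below uses only the standard library; the function name `sample_chunk` and the default of 10,000 lines are illustrative choices, not part of the assignment.

```python
import bz2


def sample_chunk(src_path, dst_path, max_lines=10000):
    """Copy at most max_lines lines from one bz2 file into another."""
    written = 0
    src = bz2.BZ2File(src_path, "r")
    dst = bz2.BZ2File(dst_path, "w")
    try:
        for line in src:
            if written >= max_lines:
                break
            dst.write(line)
            written += 1
    finally:
        # close both files even if reading fails part-way through
        src.close()
        dst.close()
    return written
```

For example, `sample_chunk("part-00026.xml.bz2", "sample.xml.bz2")` would write a sample next to the downloaded chunk (the output name is hypothetical). A sample cut at an arbitrary line may end in the middle of a `<page>` element, so XML-aware jobs should tolerate an incomplete final page.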

By default, mrjob (when run on EMR) only uploads the mrjob python file and no
supporting libraries.

  1. You can always import members of the standard library as they come with
  any python distribution.
  2. EMR comes with `numpy`, `lxml` and a couple of other python libraries. Be
     warned though, they are probably different versions than you have.
  3. If you wish to include code from other local python files, [tar them
     up](https://pythonhosted.org/mrjob/guides/setup-cookbook.html#running-a-makefile-inside-your-source-dir)
     to upload them to EMR.

*Warning*: EMR has rather old software installed (e.g. `python 2.6` instead of
2.7, `numpy 1.4` instead of 1.9).  Make sure your code runs on these older
libraries before uploading jobs.

If you want to structure your mrjob code well, you will want to have
multiple mrjobs in a single module.  As a matter of good style, we recommend
that you write each separate mapreduce as its own class.  Then write a wrapper
module that defines the logic for combining steps.  You can combine multiple
steps by overriding the [steps
method](https://pythonhosted.org/mrjob/guides/writing-mrjobs.html#multi-step-jobs).

Here are some helpful articles on how mrjob works and how to pass parameters to
your script:
  - [How mrjob is
    run](https://pythonhosted.org/mrjob/guides/concepts.html#how-your-program-is-run)
  - [Adding passthrough
  options](https://pythonhosted.org/mrjob/job.html#mrjob.job.MRJob.add_passthrough_option)
  - [An example of someone solving similar
  problems](http://arunxjacob.blogspot.com/2013/11/hadoop-streaming-with-mrjob.html)

Finally, if you find yourself processing a lot of special cases, you are
probably doing it wrong.  For example, the mapreduce jobs for
`Top100WordsSimpleWikipediaPlain`, `Top100WordsSimpleWikipediaText`, and
`Top100WordsSimpleWikipediaNoMetaData` are less than 150 lines of code
(including generous blank lines and boilerplate code).

## top100_words_simple_plain
Return a list of the top 100 words in article text (in no particular order).
You will need to write this as two MapReduce jobs:

1. The first job is similar to standard wordcount but with a few tweaks.
   The data provided for Wikipedia is in `*.xml.bz2` format.  mrjob will
   automatically decompress bz2.  We'll deal with the xml in the next question.
   For now, just treat it as text.  A few hints:
   - To split the words, use the regular expression `\w+`.
   - Words are not case sensitive: i.e. "The" and "the" refer to the same
     word.  You can use `str.lower()` to get a single case-insensitive
     canonical version of the data.

2. The second job will take a collection of pairs `(word, count)` and filter
   for only the highest 100.  A few notes:
   - To make the job more reusable make the job find the largest `n` words
     where `n` is a parameter obtained via
     [`get_jobconf_value`](https://pythonhosted.org/mrjob/utils-compat.html).
   - We have to keep track of at most the `n` most popular words.  As long as
     `n` is small, e.g. 100, we can keep track of the *running largest n* in
     memory with a priority queue. We suggest taking a look at `heapq`, part of
     the Python standard library, for this.  You may be asked about this data
     structure in an interview, so it is good to get practice with it now.
   - To obtain the largest `n`, we first need to obtain the largest `n`
     elements per chunk from the mapper, output them to the same key
     (reducer), and then collect the largest `n` of those in the reducer
     (**Question:** why does this guarantee that we have found the largest
     `n` over the entire set?).  Given that we are using a priority queue, we
     will need to first initialize it, then add to it for each record, and
     finally output the top `n` after seeing every record.  For mappers,
     notice that these three phases correspond nicely to these three
     functions:
        - `mapper_init`
        - `mapper`
        - `mapper_final`
     There are similar functions in the reducer.  Also, while the run method
     to launch the mapreduce job is a classmethod:
        ``` 
          if __name__ == '__main__': MRWordCount.run() 
        ```
     actual objects are instantiated on the map and reduce nodes.  More
     precisely, a separate instance of the job class is created for each map
     task and for each reduce task.  This means that the three mapper
     functions can pass state through `self`, e.g. `self.heap`.  Remember
     that to pass state between the map and reduce phases, you will have to
     use `yield` in the mapper and read each line in the reducer.
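
The two jobs described above can be tried out without Hadoop. The following is a minimal plain-Python sketch, not an mrjob job: `count_words`, `TopN` and `top_n_two_stage` are hypothetical names, and each dictionary in `chunks` stands in for the `(word, count)` records seen by one mapper. It assumes, as in the second job, that every word appears in exactly one record because the first job has already summed its counts.

```python
import heapq
import re

WORD_RE = re.compile(r"\w+")


def count_words(lines):
    """First job in miniature: case-insensitive word counts."""
    counts = {}
    for line in lines:
        for word in WORD_RE.findall(line):
            word = word.lower()
            counts[word] = counts.get(word, 0) + 1
    return counts


class TopN(object):
    """Running largest-n, split into the same three phases as
    mapper_init / mapper / mapper_final."""

    def __init__(self, n):            # plays the role of mapper_init
        self.n = n
        self.heap = []                # min-heap holding at most n pairs

    def add(self, count, word):       # plays the role of mapper
        item = (count, word)
        if len(self.heap) < self.n:
            heapq.heappush(self.heap, item)
        elif item > self.heap[0]:
            # replace the smallest kept pair with the new, larger one
            heapq.heapreplace(self.heap, item)

    def final(self):                  # plays the role of mapper_final
        return sorted(self.heap, reverse=True)


def top_n_two_stage(chunks, n):
    """Take the top n of each chunk, then the top n of the survivors."""
    merged = TopN(n)                  # the single reducer
    for chunk in chunks:              # each chunk stands in for one mapper
        local = TopN(n)
        for word, count in chunk.items():
            local.add(count, word)
        for count, word in local.final():
            merged.add(count, word)
    return merged.final()
```

Because the heap never grows beyond `n` entries, memory use stays bounded no matter how many records a mapper sees.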

In [None]:
from mrjob.job import MRJob
from mrjob.step import MRStep
from mrjob.compat import get_jobconf_value
import bz2
import re
import heapq


WORD_RE = re.compile(r"[\w]+")
class FirstJob(MRJob):

    #Job 1
    def mapper_get_words (self,_,line):
        # emit (word, 1) for every lower-cased word on the line
        for word in WORD_RE.findall(line):
            yield (word.lower(),1)
    def combiner_count_words(self, word, counts):
        yield (word, sum(counts))
            
    def reducer_count_words(self, word, counts):
        yield None, (sum(counts), word)
    
    # Job 2
    def init_get_100(self):
        self.counts = []

    def mapper_gettop100(self,_, word_count_pairs):
        heapq.heappush(self.counts, word_count_pairs)


    def mapper_final_top100 (self):        
        largest = heapq.nlargest(100,self.counts)
        for count,word in largest:
            yield ('heap',(count,word))

    def reducer_init(self):
        self.top100list = []


    def reducer_get_top100(self,key,top100):
        for word_count in top100:
            heapq.heappush(self.top100list, (word_count[0],word_count[1]))
    
    def reducer_final(self):
        largest = heapq.nlargest(100,self.top100list)
        words = [(word,  int(count)) for count,word in largest]
        yield (None, words)      


        
    def steps(self):
            return [
                MRStep(mapper=self.mapper_get_words,
                   combiner=self.combiner_count_words,
                   reducer=self.reducer_count_words),
                MRStep(mapper_init = self.init_get_100,
                       mapper = self.mapper_gettop100,
                      mapper_final = self.mapper_final_top100,
                       reducer_init = self.reducer_init,
                       reducer = self.reducer_get_top100,
                       reducer_final = self.reducer_final)
                    ]


if __name__ =="__main__":
    FirstJob.run()

## top100_words_simple_text
Notice that the words `page` and `text` make it into the top 100 words in the
previous problem.  These are not common English words!  If you look at the xml
formatting, you'll realize that these are xml tags.  Parse the files so that
tags like `<page></page>` are not counted, and neither are words outside the
`<text></text>` tag.

*Hints*:
1. Both `xml.etree.ElementTree` from the Python stdlib and `lxml.etree` parse
   xml. `lxml` is significantly faster though.

2. In order to parse the text, we will have to accumulate a `<page></page>`
   worth of data and then parse the resulting Wikipedia format string.

3. Don't forget that the Wikipedia format can have multiple revisions but you
   only want the latest one.
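
Hints 2 and 3 can be sketched in plain Python as follows. This is a minimal illustration, not the mrjob solution: `iter_pages` and `latest_text` are hypothetical helper names, and the code assumes each `<revision>` carries a `<timestamp>` child in ISO 8601 form (so that string comparison orders revisions by time) and that the `<page>` elements have no XML namespace.

```python
import xml.etree.ElementTree as ET


def iter_pages(lines):
    """Group consecutive lines into complete <page>...</page> strings."""
    buffer = []
    inside = False
    for line in lines:
        if "<page>" in line:
            buffer = []               # start a fresh page, dropping any partial one
            inside = True
        if inside:
            buffer.append(line)
        if "</page>" in line and inside:
            inside = False
            yield "\n".join(buffer)


def latest_text(page_xml):
    """Return the text of the most recent revision of a page, or ''."""
    page = ET.fromstring(page_xml)
    revisions = page.findall("revision")
    if not revisions:
        return ""
    # Assumption: ISO 8601 timestamps, so the largest string is the latest.
    latest = max(revisions, key=lambda rev: rev.findtext("timestamp", ""))
    return latest.findtext("text", "") or ""
```

A page whose closing tag never arrives (for example at the end of a truncated sample) is simply never yielded.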


In [None]:
from mrjob.job import MRJob
from mrjob.step import MRStep
import re
import heapq
import Queue
import xml.etree.ElementTree as ET

# match any line (or accumulated page) containing the opening / closing page tag
START_PAGE = re.compile(r'.*<page>.*')
END_PAGE = re.compile(r'.*</page>.*')
WORD_RE = re.compile(r"[\w]+")
class SecondJob(MRJob):

    def mapper_get_page_init(self):
        self.pages = Queue.Queue()
        self.queue= Queue.Queue()

    def mapper_get_page(self,_,line):
        self.queue.put(line)
        if END_PAGE.match(line):
            page = ''
            while not self.queue.empty():
                page+=self.queue.get()
            if START_PAGE.match(page):
                yield ('page',page)

    def reducer_get_page(self,key,pages):
        for page in pages:
            yield ('page',page)

    # Job 2
    def mapper_parser(self,key,page):
        tree = ET.fromstring(page.encode('utf-8'))
        text_tag = [(x.tag,x.text) for x in tree.getiterator()]
        for tag, text in text_tag:
            if tag=="text":
                if text:
                    for word in WORD_RE.findall(text):
                        yield(word.lower(),1)

    def combiner_count_words(self, word, counts):
        yield (word, sum(counts))
            
    def reducer_count_words(self, word, counts):
        yield None, (sum(counts), word)
    
    # Job 3
    
    def init_get_100(self):
        self.counts = []

    def mapper_gettop100(self,_, word_count_pairs):
        heapq.heappush(self.counts, word_count_pairs)


    def mapper_final_top100 (self):        
        largest = heapq.nlargest(100,self.counts)
        for count,word in largest:
            yield ('heap',(count,word))

    def reducer_init(self):
        self.top100list = []


    def reducer_get_top100(self,key,top100):
        for word_count in top100:
            heapq.heappush(self.top100list, (word_count[0],word_count[1]))
    
    def reducer_final(self):
        largest = heapq.nlargest(100,self.top100list)
        words = [(word,  int(count)) for count,word in largest]
        yield (None, words)   

    
        
    def steps(self):
            return [
                MRStep(mapper_init=self.mapper_get_page_init,
                    mapper = self.mapper_get_page,
                   reducer=self.reducer_get_page),
                MRStep(mapper = self.mapper_parser,
                       combiner = self.combiner_count_words,
                       reducer = self.reducer_count_words),
                MRStep(mapper_init = self.init_get_100,
                       mapper = self.mapper_gettop100,
                       mapper_final = self.mapper_final_top100,
                       reducer_init = self.reducer_init,
                       reducer = self.reducer_get_top100,
                       reducer_final = self.reducer_final),
                    ]


if __name__ =="__main__":
    SecondJob.run()

## top100_words_simple_no_metadata
Finally, notice that 'www' and 'http' make it into the list of top 100 words in
the previous problem.  These are also not common English words!  They clearly
come from the URLs in hyperlinks.  Looking at the format of [Wikipedia
links](http://en.wikipedia.org/wiki/Help:Wiki_markup#Links_and_URLs) and
[citations](http://en.wikipedia.org/wiki/Help:Wiki_markup#References_and_citing_sources),
you'll notice that they tend to appear within single and double brackets and
curly braces.
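
One simple way to act on this observation is to delete every bracketed or braced span before counting words. The sketch below is a rough regex-based approach under that assumption, with hypothetical names `MARKUP_RE`, `strip_markup` and `words_without_markup`; it is not a full wiki-markup parser.

```python
import re

WORD_RE = re.compile(r"\w+")
# Innermost {{...}}, [[...]] and [...] spans (no nested brackets inside).
MARKUP_RE = re.compile(r"\{\{[^{}]*\}\}|\[\[[^\[\]]*\]\]|\[[^\[\]]*\]")


def strip_markup(text):
    """Remove bracketed and braced spans, innermost first, until none remain."""
    previous = None
    while previous != text:
        previous = text
        text = MARKUP_RE.sub(" ", text)
    return text


def words_without_markup(text):
    """Lower-cased words of the text that lie outside links and templates."""
    return [w.lower() for w in WORD_RE.findall(strip_markup(text))]
```

Note the trade-off: this also throws away link labels such as "the city" in `[[Paris|the city]]`, which are ordinary prose. A more careful version would keep the label and drop only the target.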

In [None]:
# Third Job
from mrjob.job import MRJob
from mrjob.step import MRStep
import re
import heapq
import Queue
import xml.etree.ElementTree as ET
import mwparserfromhell

# match any line (or accumulated page) containing the opening / closing page tag
START_PAGE = re.compile(r'.*<page>.*')
END_PAGE = re.compile(r'.*</page>.*')
WORD_RE = re.compile(r"[\w]+")
class ThirdJob(MRJob):

    def mapper_get_page_init(self):
        self.pages = Queue.Queue()
        self.queue= Queue.Queue()

    def mapper_get_page(self,_,line):
        self.queue.put(line)
        if END_PAGE.match(line):
            page = ''
            while not self.queue.empty():
                page+=self.queue.get()
            if START_PAGE.match(page):
                yield ('page',page)

    def reducer_get_page(self,key,pages):
        for page in pages:
            yield ('page',page)

    # Job 2
    def mapper_parser(self,key,page):
        tree = ET.fromstring(page.encode('utf-8'))
        text_tag = [(x.tag,x.text) for x in tree.getiterator()]
        for tag, text in text_tag:
            if tag=="text":
                if text:
                    pure_text = re.sub(r'\([^)]*\)*|\<[^>]*\>*|\[[^]]*\]*|\{[^}]*\}*', '', text)
                    for word in WORD_RE.findall(pure_text):
                        yield(word.lower(),1)

    def combiner_count_words(self, word, counts):
        yield (word, sum(counts))
            
    def reducer_count_words(self, word, counts):
        yield None, (sum(counts), word)
    
    # Job 3
    
    def init_get_100(self):
        self.counts = []

    def mapper_gettop100(self,_, word_count_pairs):
        heapq.heappush(self.counts, word_count_pairs)


    def mapper_final_top100 (self):        
        largest = heapq.nlargest(100,self.counts)
        for count,word in largest:
            yield ('heap',(count,word))

    def reducer_init(self):
        self.top100list = []


    def reducer_get_top100(self,key,top100):
        for word_count in top100:
            heapq.heappush(self.top100list, (word_count[0],word_count[1]))
    
    def reducer_final(self):
        largest = heapq.nlargest(100,self.top100list)
        words = [(word,  int(count)) for count,word in largest]
        yield (None, words)  

    
        
    def steps(self):
            return [
                MRStep(mapper_init=self.mapper_get_page_init,
                    mapper = self.mapper_get_page,
                   reducer=self.reducer_get_page),
                MRStep(mapper = self.mapper_parser,
                       combiner = self.combiner_count_words,
                       reducer = self.reducer_count_words),
                MRStep(mapper_init = self.init_get_100,
                       mapper = self.mapper_gettop100,
                       mapper_final = self.mapper_final_top100,
                       reducer_init = self.reducer_init,
                       reducer = self.reducer_get_top100,
                       reducer_final = self.reducer_final),
                    ]


if __name__ =="__main__":
    ThirdJob.run()

## wikipedia_entropy
The [Shannon
entropy](https://en.wikipedia.org/wiki/Entropy_(information_theory)) of a
discrete random variable with probability mass function p(x) is: 

    $$ H(X) = - \sum p(x) \log_2 p(x) $$ 

You can think of the Shannon entropy as the number of bits needed to store
things if we had perfect compression.  It is also closely tied to the notion of
entropy from physics.
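
As a quick sanity check of the formula (a worked example, not part of the assignment), a fair coin has probability 1/2 for each outcome, so

    $$ H = -\left(\tfrac{1}{2} \log_2 \tfrac{1}{2} + \tfrac{1}{2} \log_2 \tfrac{1}{2}\right) = 1 \text{ bit} $$

while a coin that lands heads 90% of the time has

    $$ H = -\left(0.9 \log_2 0.9 + 0.1 \log_2 0.1\right) \approx 0.469 \text{ bits} $$

The more predictable source needs fewer bits per outcome.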

You'll be estimating the Shannon entropy of Simple English and Thai based on
their Wikipedias. Do this with n-grams of characters, by first calculating the
entropy of a single n-gram and then dividing by n to get the per-character
entropy. Use n-grams of size 1, 2, 3, 4, 5, 10 and 15.  How should our
per-character entropy estimates vary with n?  How should they vary with the
size of the corpus? How much data would we need to get reasonable entropy
estimates for each n?
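
For small inputs the estimate can be computed directly in memory, which is handy for checking a MapReduce implementation. The sketch below (the function name `per_char_entropy` is hypothetical) uses the count form of the entropy: with n-gram counts $c_i$ and total $N = \sum c_i$, substituting $p_i = c_i / N$ gives $H = \log_2 N - \frac{1}{N} \sum c_i \log_2 c_i$, so only the counts and their total are needed.

```python
import math


def per_char_entropy(text, n):
    """Estimate entropy per character from the character n-grams of text."""
    counts = {}
    for i in range(len(text) - n + 1):
        gram = text[i:i + n]
        counts[gram] = counts.get(gram, 0) + 1
    total = sum(counts.values())
    if total == 0:
        return 0.0                    # text shorter than n: no n-grams at all
    # H = log2(N) - (1/N) * sum(c * log2(c)), the same as -sum(p * log2(p))
    weighted = sum(c * math.log(c, 2) for c in counts.values())
    return (math.log(total, 2) - weighted / total) / n
```

Running it on the same text with increasing `n`, and on texts of different lengths, is a cheap way to explore the questions above before launching jobs on the full data.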

The data you need is available at:
    - https://s3.amazonaws.com/thedataincubator-course/mrdata/simple/part-000*
    - https://s3.amazonaws.com/thedataincubator-course/mrdata/thai/part-000*

In [None]:
from mrjob.job import MRJob
from mrjob.step import MRStep
import re
import heapq
import Queue
import xml.etree.ElementTree as ET
import mwparserfromhell
import math

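# match any line (or accumulated page) containing the opening / closing page tag
START_PAGE = re.compile(r'.*<page>.*')
END_PAGE = re.compile(r'.*</page>.*')
WORD_RE = re.compile(r"[\w]+")
# the Thai Unicode block is U+0E00 to U+0E7F; a unicode literal is needed on Python 2
THAI_RE = re.compile(u'[\u0E00-\u0E7F]+')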
class FourthJob(MRJob):

    def mapper_get_page_init(self):
        self.pages = Queue.Queue()
        self.queue= Queue.Queue()

    def mapper_get_page(self,_,line):
        self.queue.put(line)
        if END_PAGE.match(line):
            page = ''
            while not self.queue.empty():
                page+=self.queue.get()
            if START_PAGE.match(page):
                yield ('page',page)

    def reducer_get_page(self,key,pages):
        for page in pages:
            yield ('page',page)

    # Job 2
    def mapper_parser_init(self):
        self.n=1


    def mapper_parser(self,key,page):
        tree = ET.fromstring(page.encode('utf-8'))
        text_tag = [(x.tag,x.text) for x in tree.getiterator()]
        #n_list = [1,2,3,4,5,10,15]
        #n_list = [1,2]
        for tag, text in text_tag:
            if tag=="text":
                if text:
                    wikicode = mwparserfromhell.parse(text)
                    pure_text = " ".join(" ".join(fragment.value.split()) for fragment in wikicode.filter_text())
                    pure_text_ws = " ".join(pure_text.split())
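                    # n-gram size is hard-coded to 10 here and in reducer_entropy;
                    # stop at len - 10 so no shorter fragments are emitted at the end
                    for n_gram in [pure_text_ws[i:i+10] for i in range(len(pure_text_ws)-10+1)]:
                        yield (n_gram,1)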

    
    def combiner_count_words(self, word, counts):
        yield (word, sum(counts))
            
    def reducer_count_words(self, word, counts):
        item = sum(counts)
        p1 = item*math.log(item,2)
        yield word,(p1,item)

    # Job 3
    def mapper_entropy(self,key,vals):
        p1,count = vals
        yield None,(p1,count)

    def reducer_entropy(self,_,vals):
        total = 0
        p1_sum = 0
        for val in vals:
            p1,count = val
            total+=count
            p1_sum+=p1
        entropy = (math.log(total,2)-p1_sum/total)/10
        yield None,entropy

    

    def steps(self):
            return [
                MRStep(mapper_init=self.mapper_get_page_init,
                    mapper = self.mapper_get_page,
                   reducer=self.reducer_get_page),
                MRStep(mapper_init = self.mapper_parser_init,
                       mapper = self.mapper_parser,
                       combiner = self.combiner_count_words,
                       reducer = self.reducer_count_words),
                    MRStep(
                        mapper = self.mapper_entropy,
                        reducer = self.reducer_entropy),
                    ]


if __name__ =="__main__":
    FourthJob.run()

## link_stats_simple
Let's look at some summary statistics on the number of unique links on a page
to other Wikipedia articles.  Return the number of articles (count), average
number of links, standard deviation, and the 5%, 25%, median, 75%, and 95%
quantiles.

1. Notice that the library `mwparserfromhell` supports the method
   `filter_wikilinks()`.
2. You will need to compute these statistics in a way that requires O(1)
   memory.  You should be able to compute the first few (i.e. non-quantile)
   statistics exactly by looking at the first few moments of a distribution.
   The quantile quantities can be accurately estimated by using reservoir
   sampling with a large reservoir.
3. If a page links to the same article multiple times, count that link only
   once.  This keeps our results from becoming too skewed.
4. Don't forget that some (a surprisingly large number of) links have unicode!
   Make sure you treat them correctly.
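
Hint 2 can be sketched as a small accumulator that keeps only a count, two running sums and a fixed-size sample. This is a minimal single-process illustration with a hypothetical class name `RunningStats`; the reservoir uses the classic "Algorithm R" scheme, and the quantile lookup is a simple nearest-rank rule chosen for brevity.

```python
import math
import random


class RunningStats(object):
    """Count, mean, standard deviation and a fixed-size sample of a stream."""

    def __init__(self, reservoir_size, rng=None):
        self.k = reservoir_size
        self.rng = rng or random.Random()
        self.count = 0
        self.total = 0.0              # first moment: sum of x
        self.total_sq = 0.0           # second moment: sum of x * x
        self.reservoir = []

    def add(self, x):
        self.count += 1
        self.total += x
        self.total_sq += x * x
        if len(self.reservoir) < self.k:
            self.reservoir.append(x)
        else:
            # Algorithm R: keep the new item with probability k / count
            j = self.rng.randint(0, self.count - 1)
            if j < self.k:
                self.reservoir[j] = x

    def mean(self):
        return self.total / self.count

    def std(self):
        # population standard deviation from the first two moments
        variance = self.total_sq / self.count - self.mean() ** 2
        return math.sqrt(max(variance, 0.0))

    def quantile(self, q):
        ordered = sorted(self.reservoir)
        index = min(int(q * len(ordered)), len(ordered) - 1)
        return ordered[index]
```

The count and the two sums from different mappers can simply be added together in the reducer. Combining reservoirs is subtler, since a fair merge has to account for how many items each reservoir represents.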

In [None]:
from mrjob.job import MRJob
from mrjob.step import MRStep
import re
import heapq
WORD_RE = re.compile(r"\w+")
START_RE = re.compile('.*<page>.*')
END_RE = re.compile('.*</page>.*')
import xml.etree.ElementTree as ET
import Queue
import mwparserfromhell
import random
import math
import numpy as np

class q5(MRJob):
                  
    def mapper_getpage_init(self):
        self.queue = Queue.Queue()

    def mapper_getpage(self,_,line):
        self.queue.put(line)
        if END_RE.match(line):
            # empty the queue
            page = ''
            while not self.queue.empty():
                page += self.queue.get()
            if START_RE.match(page):
                yield ('page',page)
    
    def reducer_getpage(self,key,pages):
        pagecount = 0
        for page in pages:
            pagecount +=1
            yield ('page',(page,pagecount))
    

        
    def mapper_parser(self,key,val):        
        sum_number_links = 0
        reservoir,c = [],0
        k = 5000
        page, pagecount = val
        tree = ET.fromstring(page.encode('utf-8','ignore'))
        tagtext = [(x.tag, x.text) for x in tree.getiterator()]
        for tag,text in tagtext:
            if tag == 'text':
                if text:
                    wikilinks = mwparserfromhell.parse(text).filter_wikilinks()
                    wikilinks = [ links.encode('utf-8','ignore') for links in wikilinks]
                    x = len(set(wikilinks))
                    sum_number_links += x
                    if c < k:
                        reservoir.append(x)
                    else:
                        # c items seen so far: the new one takes a slot with probability k/(c+1)
                        r=random.randint(0,c)
                        if r<k:reservoir[r] = x
                    c+=1
        yield None,(sum_number_links,pagecount,reservoir)
      
    def reducer_compute(self,_, vals):   
            sum_links = 0
            sum_sq = 0
            page_count_list = []
            reservoir_list = []
            for val in vals:
                link,pagecount,reservoir = val
                sum_links+=link
                sum_sq+=link**2
                page_count_list.append(pagecount)
                reservoir_list.append(reservoir)
            total_page = max(page_count_list)   
            counts = [ link_count for item in reservoir_list for link_count in item]
            count_list = sorted(counts)
            mean = sum_links*1.0/total_page
            # multiply by 1.0 to avoid integer division on Python 2
            std= math.sqrt(sum_sq*1.0/total_page-mean**2)
            yield ('page_count',int(total_page))
            yield ('mean',mean)
            yield ('std',std)        
            yield ('5%',count_list[int(round(len(count_list)*0.05)-1)])
            yield ('25%',count_list[int(round(len(count_list)*0.25)-1)])
            yield ('median',count_list[int(round(len(count_list)*0.5)-1)])
            yield ('75%',count_list[int(round(len(count_list)*0.75)-1)])
            yield ('95%',count_list[int(round(len(count_list)*0.95)-1)])
            #yield None,counts




    def steps(self):
        return [
            MRStep(mapper_init = self.mapper_getpage_init,
                    mapper = self.mapper_getpage,
                    reducer = self.reducer_getpage),
           MRStep(
                   mapper = self.mapper_parser,
                   reducer=self.reducer_compute)
                ]

if __name__ == '__main__':
    q5.run()

## link_stats_english (run on AWS)
The same thing but for all of English Wikipedia.  This is the real test of how
well your algorithm scales!  The data is also located on
[s3](s3://thedataincubator-course/mrdata/english/).

Running command: `python q5.py -r emr s3://thedataincubator-course/mrdata/simple/ --output-dir=s3://thedataincubator-fellow/xiaojun-wc/q6/`

## double_link_stats_simple
Instead of analyzing single links, let's look at double links.  That is, pages
A and C that are connected through many pages B where there is a link
`A -> B -> C` or `C -> B -> A`. Find the list of all pairs `(A, C)` (you can
use alphabetical ordering to break symmetry) that have the 100 "most"
connections (see below for the definition of "most").  This should give us a
notion that the articles `A` and `C` refer to tightly related concepts.

1. This is essentially a matrix multiplication problem.  If the adjacency
   matrix is denoted $M$ (where $M_{ij}$ represents the link between $i$ and
   $j$), we are looking for the highest 100 elements of the matrix $M M$.

2. Notice that a lot of Category pages (denoted "Category:.*") have a high link
   count and will rank very highly according to this metric.  Wikipedia also
   has `Talk:` pages, `Help:` pages, and static resource `File:` pages.  All
   such "non-content" pages (and there might be more than just these), and
   links to them, should first be filtered out in this analysis.

3. Some pages have more links than others.  If we just counted the number of
   double links between pages, we will end up seeing a list of articles with
   many links, rather than concepts that are tightly connected.

   1. One strategy is to weight each link as $\frac{1}{n}$ where $n$ is the
      number of links on the page.  This way, an article has to spread its
      "influence" over all $n$ of its links.  However, this can throw off the
      results if $n$ is small.

   2. Instead, try weighting each link as $\frac{1}{n+10}$ where 10 sets the
      "scale" in terms of number of links above which a page becomes
      "significant".  The number 10 was somewhat arbitrarily chosen but seems
      to give reasonably relevant results.

   3. This means that our "count" for a pair A,C will be the product of the
      two link weights along each path between them, summed up over all their
      shared connections.

4. Again, if there are multiple links from a page to another, have it only
   count for 1.  This keeps our results from becoming skewed by a single page
   that references the same page multiple times.
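
On a toy graph the weighting scheme above can be worked through in memory. The sketch below is a plain-Python illustration with a hypothetical function name `double_link_scores`; it assumes `links` maps each page title to the set of titles it links to, so repeated links are already collapsed and non-content pages are already removed.

```python
from collections import defaultdict


def double_link_scores(links, scale=10.0):
    """Return {(A, C): score} with A < C alphabetically.

    The score sums weight(A->B) * weight(B->C) over every intermediate
    page B, counting paths in either direction.
    """
    # every link leaving a page with n unique links has weight 1 / (n + scale)
    weight = dict((page, 1.0 / (len(targets) + scale))
                  for page, targets in links.items())

    # incoming[B] lists (A, weight of A->B) for every page A linking to B
    incoming = defaultdict(list)
    for a, targets in links.items():
        for b in targets:
            incoming[b].append((a, weight[a]))

    scores = defaultdict(float)
    for b, sources in incoming.items():
        for c in links.get(b, ()):
            for a, w_ab in sources:
                if a != c:
                    # alphabetical order breaks the (A, C) / (C, A) symmetry
                    pair = (a, c) if a < c else (c, a)
                    scores[pair] += w_ab * weight[b]
    return dict(scores)
```

Grouping by the intermediate page `B` is the same idea as keying the matrix-multiplication reducer on the shared index: all `A -> B` and `B -> C` entries meet in one place, and their products are then summed per pair.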

In [None]:
from mrjob.job import MRJob
from mrjob.step import MRStep
import re
import heapq
import xml.etree.ElementTree as ET
import Queue
import mwparserfromhell
import random
import math



WORD_RE = re.compile(r"\w+")
START_RE = re.compile('.*<page>.*')
END_RE = re.compile('.*</page>.*')
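# namespace prefixes are capitalized in titles (e.g. "Category:"), so match case-insensitively
nocontent = re.compile(r'(user:.*)|(user\stalk:)|(template:.*)|(category:.*)|(talk:.*)|(help:.*)|(file:.*)|(special:.*)|(user_talk:)', re.IGNORECASE)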

class q7(MRJob):
                  
    def mapper_getpage_init(self):
        self.queue = Queue.Queue()

    def mapper_getpage(self,_,line):
        self.queue.put(line)
        if END_RE.match(line):
            # empty the queue
            page = ''
            while not self.queue.empty():
                page += self.queue.get()
            if START_RE.match(page):
                yield ('page',page)
    
    def reducer_getpage(self,key,pages):
        for page in pages:
            yield ('page',page)
    

        
    def mapper_parser(self,key,page):
        tree = ET.fromstring(page.encode('utf-8','ignore'))
        title = None
        for elem in tree.getiterator():
            if elem.tag == "title":
                # keep the title only when this is a content page
                if elem.text and not nocontent.match(elem.text):
                    title = elem.text
            elif elem.tag == 'text' and elem.text and title is not None:
                wikilinks = mwparserfromhell.parse(elem.text).filter_wikilinks()
                # unique link targets as unicode, with non-content pages removed
                targets = set(unicode(link.title) for link in wikilinks)
                targets = [t for t in targets if not nocontent.match(t)]
                # every link on a page with n unique links gets weight 1/(n+10)
                weight = 1.0/(len(targets)+10.0)
                for target in targets:
                    if target != title:
                        yield "M1",(title, target, weight)
                        yield "M2",(title, target, weight)
        
        
    def mapper_emit(self,key,val):
        row,col,v = val
        if key == "M1":
            yield col,(row,v)
        elif key=="M2":
            yield row,((col,v),)
            
            
    def multiply_values(self,j,values):
        brow=[]
        acol=[]
        for val in values:
            if len(val)==1:
                brow.append(val[0])
            else:
                acol.append(val)
                
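        for (bcol,bval) in brow:
            for (arow,aval) in acol:
                # skip A == C and order each pair alphabetically so that
                # A -> B -> C and C -> B -> A add up under the same key
                if arow != bcol:
                    yield (tuple(sorted((arow,bcol))),aval*bval)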
    
    def reducer_sum(self,key,vals):
        yield key,sum(vals)
        
        
        
    def heap_init(self):
        self.h = []
        
    def mapper_add_to_head(self,key,val):
        heapq.heappush(self.h,(val,key))
        
    def mapper_pop_top_100(self):
        largest = heapq.nlargest(100,self.h)
        for count,word in largest:
            yield ('heap',(count,word))   
            
    def reducer_heap_init(self):
        self.h = []
        
    def reducer_heap_count_words(self, key,word_counts):
        for count,word in word_counts:
            heapq.heappush(self.h, (count,word))
    
    def reducer_pop_top_100(self):
        largest = heapq.nlargest(100,self.h)
        words = [(word,  count) for count,word in largest]
        yield (None, words) 
        
        
    
    def steps(self):
        return [
            MRStep(mapper_init = self.mapper_getpage_init,
                    mapper = self.mapper_getpage,
                    reducer = self.reducer_getpage
                  ),
           MRStep(
                   mapper = self.mapper_parser,
                 ),
            MRStep(
                   mapper = self.mapper_emit,
                   reducer = self.multiply_values
                  ),
            MRStep(reducer = self.reducer_sum),
            MRStep(mapper_init = self.heap_init,
                   mapper = self.mapper_add_to_head,
                   mapper_final = self.mapper_pop_top_100,
                   reducer_init = self.reducer_heap_init,
                   reducer = self.reducer_heap_count_words,
                   reducer_final = self.reducer_pop_top_100),
            #MRStep(reducer = self.reducer_tuple)
                ]

if __name__ == '__main__':
    q7.run()