# CS4650 Topic 9: Map Reduce

There are several ways to run Map-Reduce projects.  One of the interesting ways is using a Jupyter Notebook.  Notebooks provide a really nice way to experiment, try things, then document the results.  In fact, this lecture was written using a Notebook, and there were a lot of iterations as I was experimenting with the code, adjusting the mappers and reducers. 

Another reason for iterations was that, as I looked at the results, I thought of new questions to ask, new calculations to explore.
  
Usually when you iterate in a Notebook, you go back to a Cell and change things a bit, then rerun.  I didn't do that much; rather, I duplicated Cells, so that you could see the progression of my experiments.  Sometimes this meant that I was subtly changing function names, so there wouldn't be collisions.

## MRJob

There is a very useful package called MRJob.  I refer to it as 'Mister Job', but it really means 'Map-Reduce Job'.

With MRJob, you write one function that is the Mapper, another which is the Reducer (and you can also write a Combiner).  You can then run this on a dataset on your computer, right within Notebook.  Or you can connect to the web and run the job on a cluster of computers.  The local machine is usually running sample data, used to debug the code, while the cluster is used for the production run.
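
To make the roles of the Mapper and Reducer concrete before bringing in mrjob, here is a minimal plain-Python sketch of the same flow, using the word count that is developed later in this lecture.  The helper `run_local` is hypothetical (it is not part of mrjob); it only imitates the grouping step that a real framework performs between the map and reduce phases.

```python
from collections import defaultdict

def mapper(line):
    """Emit a (word, 1) pair for every whitespace-separated token."""
    for word in line.split():
        yield word, 1

def reducer(key, values):
    """Collapse all of the counts for one key into a single total."""
    yield key, sum(values)

def run_local(lines):
    """Hypothetical helper: map, group by key (the 'shuffle'), then reduce."""
    groups = defaultdict(list)
    for line in lines:
        for key, value in mapper(line):
            groups[key].append(value)
    results = {}
    for key in sorted(groups):
        for out_key, out_value in reducer(key, groups[key]):
            results[out_key] = out_value
    return results

counts = run_local(["the cat saw the dog", "the dog ran"])
print(counts)
```

The mapper never sees more than one line, and the reducer never sees more than one key.  That independence is what allows a framework to spread the work across many machines.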

To install mrjob, activate the environment to be used, then in the Terminal enter the following:

```
conda install -c conda-forge mrjob
```

The '-c conda-forge' option tells conda which channel to search for the mrjob package.  Conda has a default channel with many packages, but mrjob is not in it, so we point conda at the conda-forge channel instead.

We will see how to use mrjob in just a bit.

## Word Count

For our first example, let's write a program to scan through a large text document, finding all words in that document, then counting how many times each of those words was used.

You may want to create a new directory and a new environment for this project, since we are going to be downloading extra files, and you don't want to clutter up your workspace by having files from different projects all in the same place.

In any event, you will want to create a new Jupyter Notebook.  I've called mine _word-count_.

Let's talk about the source of data for our project:

There is a website, _www.gutenberg.org_, that collects public-domain books, providing these for free.  They've got over 60,000 books.  The books are available in a number of formats; we will use the UTF-8 text format.

These are text documents, so each is just the words.  The old ASCII format is not sufficient: even though all English words can be expressed in ASCII, words from other languages use characters other than A-Z.  Hence, Unicode is used for these books, and UTF-8 is a convenient way to represent Unicode characters.

I've chosen 3 books to use:

1. Alice's Adventures in Wonderland

2. The Adventures of Sherlock Holmes

3. The Count of Monte Cristo

Feel free to experiment on your own, to download different books!

To download _Alice's Adventures in Wonderland_, browse to

```
https://www.gutenberg.org/files/11/11-0.txt
```

This will send the text to your browser.  You can then save the page to a file.  Save it to the file _alice.txt_, in your current project folder.

To download _The Adventures of Sherlock Holmes_, browse to the following, then save the results to the file _sherlock.txt_.

```
https://www.gutenberg.org/files/1661/1661-0.txt
```

Finally, to download _The Count of Monte Cristo_, browse to the following, then save the results to the file _montecristo.txt_.

```
https://www.gutenberg.org/cache/epub/1184/pg1184.txt
```
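
If you would rather fetch the files from code than save them from the browser, the following is a minimal sketch using only the standard library.  The URLs and filenames are the ones listed above; the helpers `download_book` and `download_all` are hypothetical names introduced for illustration, and the sketch assumes the site serves these files to a plain scripted request.

```python
import urllib.request

# URLs and target filenames taken from the instructions above.
BOOKS = {
    "alice.txt": "https://www.gutenberg.org/files/11/11-0.txt",
    "sherlock.txt": "https://www.gutenberg.org/files/1661/1661-0.txt",
    "montecristo.txt": "https://www.gutenberg.org/cache/epub/1184/pg1184.txt",
}

def download_book(url, filename):
    """Hypothetical helper: fetch url and save the raw bytes to filename."""
    with urllib.request.urlopen(url) as response:
        data = response.read()
    with open(filename, "wb") as f:
        f.write(data)
    return len(data)

def download_all(books=BOOKS):
    """Download every book in the mapping into the current folder."""
    for filename, url in books.items():
        size = download_book(url, filename)
        print(f"saved {filename} ({size} bytes)")

# Uncomment to actually download (requires network access):
# download_all()
```

The bytes are written unchanged, so the files stay in the UTF-8 encoding that the site provides.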

# Writing the Mapper and Reducer

We are now going to write some Python code, which will be the mapper and the reducer for our MapReduce job.

However, this has to be in a separate file; it cannot be in the Notebook itself.

Why?  Well, if we are going to run the MapReduce on a cluster of computers, then the code for the Mapper and Reducer has to be farmed out to a bunch of computers.

The system could just take the code from one Cell of the Notebook, but maybe the code would extend beyond just one Cell.  And recall that we can define variables and functions in one Cell, but then use these values in other Cells.  So how would the system know which Cells to grab code from for each of the other machines?

Consequently, the code has to be in a separate, stand-alone file.

However, there is an interesting trick that can be employed: You can have a Cell in the Notebook _create the Python file_.  In this way, the Notebook is self-contained.  If the Notebook is shared, the Notebook itself will create that Python file, then use it for the MapReduce job.

There is a special feature of Notebook which allows a Cell to create a file from the contents of the Cell.  Notebook has several features like this, which are called _magic_ commands.

In this case, we will start the Cell (a Code Cell) with the following line:

```
%%file word_count.py
```

This magic line indicates that the contents of the cell (following this magic line) are to be written to the named file.  Later we will see mrjob use this file.

Note that if we re-run this Cell, that file will get re-written.  That is OK!

There is a second trick, which is commonly used, appearing at the end of this Cell.  Here is that code:

```
if __name__ == '__main__':
    WordFrequency.run()
```

That 'WordFrequency' is the class that we will be defining in this Cell.  The above code checks whether the file is being run directly as a script (rather than being imported by another program), and if so, it runs our MRJob subclass.  Later we will discuss what happens if we are executing this on a cluster.  For now, though, we will simply add this to the bottom of the Cell.

The following Cell is the complete implementation of the first version of our mapper/reducer!

In [None]:
%%file word_count.py

from mrjob.job import MRJob

class WordFrequency(MRJob):

    def mapper(self, _, line):
        thelist = line.split()
        for x in thelist:
            yield x, 1

    def reducer(self, key, values):
        yield key, sum(values)


if __name__ == '__main__':
    WordFrequency.run()

If you are running this Notebook on your computer, be sure to run that Cell, so that the file will be created.

_Usually when 'running' a Notebook, you run each of the Cells in order._

We will now have a second Cell which runs that newly created Python file:

&nbsp;&nbsp;&nbsp;&nbsp;_Note: the following cell works on my Windows computer.  But on my Mac, the line looks a little different:_

&nbsp;&nbsp;&nbsp;&nbsp;_!python word_count.py -r local alice.txt_

In [None]:
!python word_count.py alice.txt

Be sure to run that Cell as well.

Congratulations, you just ran a MapReduce job!

You can experiment with this a bit, changing it to run sherlock.txt or montecristo.txt.
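
The raw output is a long, unsorted list, so it helps to load it back into Python.  The sketch below assumes mrjob's default output format, in which each line holds a JSON-encoded key, a tab, and a JSON-encoded value.  The function names and the sample lines are made up for illustration.

```python
import json

def parse_output(lines):
    """Turn output lines of the form <json key> TAB <json value> into pairs."""
    for line in lines:
        line = line.rstrip("\n")
        if not line:
            continue
        key_text, value_text = line.split("\t", 1)
        yield json.loads(key_text), json.loads(value_text)

def top_words(lines, n=3):
    """Return the n most frequent (word, count) pairs, largest count first."""
    pairs = list(parse_output(lines))
    return sorted(pairs, key=lambda kv: kv[1], reverse=True)[:n]

# Made-up sample lines in the assumed output format.
sample = ['"alice"\t5\n', '"the"\t12\n', '"rabbit"\t3\n', '"and"\t9\n']
print(top_words(sample))
```

To try this on real results, you could redirect the job's output to a file (for example `!python word_count.py alice.txt > counts.txt`, where `counts.txt` is a hypothetical filename) and then pass `open("counts.txt")` to `top_words`.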

## Analyzing the Results

This code ran fine, and we have some results.  

Later we will look at other ways to present the results.

But first we need to do some clean-up.

As I was looking through the results, I found some unfortunate issues with the data:

* If a word is capitalized some places, and not others, that would count as two separate words.  But we would like to ignore the capitalization, we want them to count as one word.

* There are some punctuation characters that are cluttering up the data.  I see, for example, that 'reply', 'reply,', and 'reply.' are in there, being counted as 3 separate words.

* Some compound words include hyphens.  These are a little problematic -- if we simply throw the hyphens away, then the two words are joined into one word, which will look a little odd.  If we convert the hyphens to spaces, the words will be split into two separate words.

* There are also Unicode characters appearing in the words.  These show up as '\u' followed by 4 hexadecimal characters (which are 0-9 and A-F).

* The start of the file has some extra characters and so on, which aren't actually part of the document.

The problems here are specific to this dataset and this project.  For other situations, the data cleanup phase may not be required, or may have different issues.  The general concept is that we may need to do some cleanup of the data.

And there are a number of options:

* We can ignore the problem.

* We can discard entries that are incorrect.

* We can try to fix the problems.

Let's try to fix at least some of these problems!

## Capitalization

The capitalization problem is pretty easy to fix.  In the mapper, we are looking at one line of the input.  For this version of the mapper, we have split the line into individual _tokens_.  Each of these tokens is a string, and Python strings know how to generate a lowercase version of themselves.  Thus, we will simply convert all tokens to lowercase, so that 'box' and 'Box' (and 'bOX') are all considered the same word.  This will also convert 'Fred' to 'fred', but we can live with that.

We can edit the mapper function as follows:

```
    def mapper(self, _, line):
        thelist = line.split()
        for x in thelist:
            y = x.lower()
            yield y, 1
```

You can go back a couple of Cells and make this change to the mapper.  Then rerun that Cell and also run the Cell that calls Python to see the results.

## Punctuation

The next issue deals with punctuation characters, such as period and comma.  These are included in some tokens, and will confuse our system.  We have a couple of options here:

* We can check to see if the token only has alpha characters, a-z.  If the token has any other characters, we can ignore it.  The following code would do this:

```
    def mapper(self, _, line):
        thelist = line.split()
        for x in thelist:
            y = x.lower()
            if y.isalpha():
                yield y, 1
```

* We can eliminate all characters from the token that are not letters.  This will remove all of the punctuation, but it might lead to other issues.  For example, consider a non-ASCII Unicode character such as \u201c.  Sometimes these are fancy quote characters, so removing them would be fine.  In other cases, they might be a letter with a diacritical mark, and removing them would remove the letter.  If we are going to use this approach, we can use regular expressions to filter out the non-letters, as follows:

In [None]:
%%file word_count.py
import re

from mrjob.job import MRJob

PUNC_RE = re.compile(r"[^a-z]")

class WordFrequency(MRJob):

    def mapper(self, _, line):
        thelist = line.split()
        for x in thelist:
            y = x.lower()
            z = PUNC_RE.sub('', y)
            # Skip tokens that had no letters at all (pure punctuation or digits).
            if z:
                yield z, 1

    def reducer(self, key, values):
        yield key, sum(values)


if __name__ == '__main__':
    WordFrequency.run()

For convenience, I've replicated the Cell to actually run the file:

In [None]:
!python word_count.py alice.txt

## Compound Words, Unicode

Since we are adopting the plan of removing non-alphabetic characters, we have removed hyphens, so compound words are now just run together, and we have removed unicode characters, which may have removed important letters.

If these were important issues, we could write other 'rules' to handle these characters.

For example, if we wanted to replace hyphens with spaces, so the compound words would simply become different words, we could replace '-' with ' ', before we do the split of the line.

For the unicode example, we could search for critical unicode characters, then replace them with the roughly equivalent ASCII character.  This can be done after the split, but should be done before we remove the punctuation.
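
As a rough sketch of such rules, the tokenizer below replaces hyphens with spaces before the split, then maps accented letters to their unaccented ASCII form before the non-letters are removed.  It uses the standard `unicodedata` module; the helper names `to_ascii` and `tokenize` are hypothetical, and this is only one possible way to do the replacement.

```python
import re
import unicodedata

NON_LETTER_RE = re.compile(r"[^a-z]")

def to_ascii(token):
    """Strip accents from letters; drop characters with no ASCII equivalent."""
    decomposed = unicodedata.normalize("NFKD", token)
    return decomposed.encode("ascii", "ignore").decode("ascii")

def tokenize(line):
    """Hypothetical helper: split a line into cleaned, lowercase words."""
    words = []
    # Hyphens become spaces first, so compound words split into their parts.
    for token in line.replace("-", " ").split():
        cleaned = NON_LETTER_RE.sub("", to_ascii(token.lower()))
        if cleaned:
            words.append(cleaned)
    return words

print(tokenize("The Rabbit-Hole, \u201ccaf\u00e9\u201d!"))
```

This handles letters that decompose into a base letter plus an accent, such as \u00e9.  Letters without such a decomposition are simply dropped, so a real project might need an explicit replacement table for them.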

## Introductory Matter

If you look at the text files we downloaded, you will see that the first several lines of the file are not actually part of the book, but rather provide an introduction to the book.  This lists the author, date of publication, and so on.

This material ends at a line starting with "*** START ".

We can filter out this portion of the file by declaring an instance variable for the class and initializing it to false.  As the mapper scans each line, it checks whether the line starts with "*** START"; if so, it sets the flag to true.  While the flag is still false, the mapper does not yield any data.

There is a problem with this, however.  When we distribute the job across a cluster of computers, each processor will have a subset of the data, a portion of the file.  It is only the processor that is processing the very first part of the data that should do this check.  All of the other processors _should not_ do this check, because they will never see the "*** START", so they will never 'turn on'.

Consequently, for our purposes, we will ignore this, and so the introductory matter will also be added to the word counts.
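
If the introductory matter does turn out to matter, one way around the distribution problem is to trim each file once, in a single process, before the MapReduce job ever sees it.  The sketch below is a hypothetical pre-processing helper, not part of mrjob.  The "*** START" marker comes from the files described above; the "*** END" marker is an assumption about the file layout that you should check against your own downloads.

```python
def strip_front_matter(lines, start_marker="*** START", end_marker="*** END"):
    """Yield only the lines between the start-marker and end-marker lines."""
    in_body = False
    for line in lines:
        if not in_body:
            # Everything up to and including the start marker is skipped.
            if line.startswith(start_marker):
                in_body = True
            continue
        if line.startswith(end_marker):
            break
        yield line

# Made-up sample standing in for a downloaded book.
sample = [
    "Title: Example Book\n",
    "*** START OF THE EBOOK ***\n",
    "Chapter 1\n",
    "Down the rabbit hole.\n",
    "*** END OF THE EBOOK ***\n",
    "License text\n",
]
body = list(strip_front_matter(sample))
print(body)
```

The trimmed lines could be written to a new file (say `alice_body.txt`, a hypothetical name), and the job run on that file instead.  Because the trimming happens before the data is split up, it does not matter which processor receives which portion.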

# In-Class Exercise

Suppose we want to do a different calculation:  Now we want to count the number of words of each length, for lengths greater than 3.  So in other words, how many 4-letter words, how many 5-letter words, and so on.  In addition, we want to keep an example of one of the words of that length.  So the output might say:

```
    For 4 letter words, there were 125 of them, and one was "stop"
    For 5 letter words, there were 1264 of them, and one was "right"
```

The actual output would be closer to something like this:

```
    4, "stop", 125
    5, "right", 1264
```

What would the mapper look like, and what would the reducer look like, for this application?

?

?

?

## The Mapper

In the previous example, for each word the mapper found, it output the word, along with a count of 1, because it had just seen one instance of that word.  The word was the key, and the count was the value.  Because the word was the key, the shuffle, combine, and reduce gathered all of the data for each word (the key), then output a value.

For our new problem, the key will actually be the _length_ of the word, because we want to aggregate and count things based on this length.  But we then need to output _two_ values, the word and the count.

So in the yield, we want to send three things: length, word, and count.  But MapReduce is set to only have two things, a key and a value.

To solve this, we will use the length as the key, but for the value we will make an object that contains two values.  The object will have an example word and it will have a count.  Since we will be communicating these values through streams, as text, we need an object that can be serialized as JSON.  If we use a Python dictionary, the object will be automatically serialized as JSON, and at the other end, automatically reconstructed as a dictionary.

The output code for the mapper will therefore look like this:

```
                yield length, {"example":word, "count":1}
```

The mapper returns two things, the key (which is the length), and a dictionary with two values, an example word and the count.
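
To see what this serialization amounts to, here is a small sketch of the round trip using the standard `json` module.  It assumes the text form is a JSON key and a JSON value separated by a tab; `encode_pair` and `decode_pair` are hypothetical helpers that only imitate what the framework does for us.

```python
import json

def encode_pair(key, value):
    """Serialize a key/value pair as two JSON documents joined by a tab."""
    return json.dumps(key) + "\t" + json.dumps(value)

def decode_pair(line):
    """Rebuild the key and the value from one encoded line."""
    key_text, value_text = line.split("\t", 1)
    return json.loads(key_text), json.loads(value_text)

line = encode_pair(4, {"example": "stop", "count": 1})
print(line)

key, value = decode_pair(line)
print(key, value["example"], value["count"])

# A tuple also survives the trip, but it comes back as a list.
print(decode_pair(encode_pair(4, ("stop", 1))))
```

The dictionary comes back with the same field names, which keeps the reducer readable.  A tuple would work too, but it returns as a list and the fields would have to be accessed by position.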

In the previous example, the reducer received keys, which were the words, and a list of counts.  It simply summed up all of the counts, then emitted the key (word) and the total count.

For the new problem, the reducer receives a key, which is the length of the word, and a list of dictionaries.  The reducer will take one of the example words (probably either the first or the last), and it will sum up the counts.  It will then return a key, which is the same length it received as input, and a value, which is a dictionary holding an example word and the total count for that length.
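
The reducer logic just described can be tried on its own in plain Python before it goes into a job.  In this sketch, `merge_entries` is a hypothetical stand-in for the body of the reducer, and the input dictionaries are made-up values of the kind the mapper would emit for 4-letter words.

```python
def merge_entries(values):
    """Keep the last example word seen and add up all of the counts."""
    example = ""
    count = 0
    for entry in values:
        example = entry["example"]
        count += entry["count"]
    return {"example": example, "count": count}

# Made-up mapper output for the key 4.
mapped = [
    {"example": "stop", "count": 1},
    {"example": "down", "count": 1},
    {"example": "what", "count": 1},
]
total = merge_entries(mapped)
print(total)

# Merging a partial result with the remaining entry gives the same answer.
partial = merge_entries(mapped[:2])
print(merge_entries([partial, mapped[2]]))
```

The result has the same shape as each input value, so partial results can be merged again without changing the final counts.  That property is what would allow the same logic to be reused as a Combiner.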

We will change the name of the file, since we are computing a different result.

In [None]:
%%file word_length_count.py
import re

from mrjob.job import MRJob

PUNC_RE = re.compile(r"[^a-z]")

class WordLengthFrequency(MRJob):

    def mapper(self, _, line):
        thelist = line.split()
        for x in thelist:
            y = x.lower()
            z = re.sub(PUNC_RE, '', y)
            w = len(z)
            if w > 3:
                yield w, {"example":z, "count":1}

    def reducer(self, key, values):
        example = ''
        count = 0
        for x in values:
            example = x["example"]
            count += x["count"]
        if len(example) > 0:
            yield key, {"example":example, "count":count}


if __name__ == '__main__':
    WordLengthFrequency.run()

In [None]:
!python word_length_count.py alice.txt

Look carefully at the results of this run.  It looks like many of the longest words are actually related to the website from which the books were downloaded.  Also note that the counts for these word lengths are very small, usually 1 or 2.  It looks like we should have spent the effort to remove the introductory matter from the books!

# Weather Data

We will look at one additional example of using MapReduce in a Notebook.  The problem we will solve came from our Hadoop textbook, from Chapter 2.  This example looks through weather data, looking for the highest temperature for each year.

In one appendix of the book, they gave instructions for downloading two years' worth of data, which we can use as our sample data.  This is the weather data for 1901 and 1902.

The code we will use here is a little different from the example code in the book, primarily because we are using Python instead of Java.  However, we are also using mrjob, which interfaces to Hadoop for us, rather than using Hadoop directly.

We could have used Hadoop in this class.  I did download Hadoop to my computer, and was setting it up.  But I realized it would take a lot more effort and a lot more disk space.  So we are using Notebooks.  The concepts you learn will apply whichever approach you use in the future.

In [None]:
%%file max_temperature.py
import re

from mrjob.job import MRJob

QUALITY_RE = re.compile(r"[01459]")

class MaxTemperature(MRJob):

    def mapper(self, _, line):
        val = line.strip()
        (year, temp, q) = (val[15:19], val[87:92], val[92:93])
        if (temp != "+9999" and re.match(QUALITY_RE, q)):
            yield year, int(temp)

    def reducer(self, key, values):
        yield key, max(values)

if __name__ == '__main__':
    MaxTemperature.run()

In [None]:
!python max_temperature.py --no-bootstrap-mrjob 1901 1902
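
The slices in the mapper pull fixed-width fields out of each record.  The sketch below shows the same parsing on a synthetic record, so that the offsets can be checked by eye.  It assumes the layout used in the textbook example: an 8-character date (YYYYMMDD) starting at offset 15, a signed 5-character temperature in tenths of a degree Celsius at offset 87, and a 1-character quality code at offset 92.  The function `parse_record` and the sample record are made up for illustration.

```python
import re

QUALITY_RE = re.compile(r"[01459]")

def parse_record(line):
    """Return (year, month, temp_tenths), or None if the reading is unusable."""
    val = line.strip()
    year, month = val[15:19], val[19:21]
    temp, quality = val[87:92], val[92:93]
    # "+9999" marks a missing reading; the quality code must be an accepted one.
    if temp == "+9999" or not QUALITY_RE.match(quality):
        return None
    return year, month, int(temp)

# Synthetic record: filler zeros everywhere except the fields we read.
record = "0" * 15 + "19011025" + "0" * 64 + "-0078" + "1"
print(parse_record(record))

missing = "0" * 15 + "19011025" + "0" * 64 + "+9999" + "1"
print(parse_record(missing))
```

Under the stated assumption about units, the value -78 corresponds to -7.8 degrees Celsius, so the numbers reported by the job would need to be divided by 10 to read as degrees.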

Now let's try to get these values for each month rather than just one number per year.

In [None]:
%%file max_monthly_temperature.py
import re

from mrjob.job import MRJob

QUALITY_RE = re.compile(r"[01459]")

monthName = ["Jan-", "Feb-", "Mar-", "Apr-", "May-", "Jun-",
             "Jul-", "Aug-", "Sep-", "Oct-", "Nov-", "Dec-"]

class MaxMonthlyTemperature(MRJob):
    
    def mapper(self, _, line):
        val = line.strip()
        # The date is YYYYMMDD starting at offset 15, so the two-digit month is val[19:21].
        (year, month, temp, q) = (val[15:19], val[19:21], val[87:92], val[92:93])
        if (temp != "+9999" and re.match(QUALITY_RE, q)):
            yield monthName[int(month) - 1] + str(year), int(temp)

    def reducer(self, key, values):
        yield key, max(values)

if __name__ == '__main__':
    MaxMonthlyTemperature.run()

In [None]:
!python max_monthly_temperature.py 1901 1902