# MapReduce

MapReduce is a programming model for performing parallel processing on large data sets.  Although it is a powerful technique, its basics are relatively simple.  Imagine we have a collection of items we'd like to process somehow.  For instance, the items might be website logs, the texts of various books, image files, etc.  A basic version of the MapReduce algorithm consists of the following steps:

1. Use a mapper function to turn each item into zero or more key-value pairs.
2. Collect together all the pairs with identical keys.
3. Use a reducer function on each collection of grouped values to produce output values for the corresponding keys.

This is all sort of abstract, so let's look at a specific example.  There are few absolute rules of data science, but one of them is that your first MapReduce example has to involve counting words.

when you have a small data set this is simple to do:

In [2]:
import math, random, re, datetime
from collections import defaultdict, Counter
from functools import partial
from code_python3.naive_bayes import tokenize

def word_count_old(documents):
    """word count not using MapReduce"""
    return Counter(word
        for document in documents
        for word in tokenize(document))

With a massive data set, the set of documents is suddenly too big to fit on your computer.  If you can just fit this into the MapReduce model, you can use some "big data" infrastructure that your engineers have implemented.  First, we need a function that turns a document into a sequence of key-value pairs.  We'll want our output to be grouped by word, which means that the keys should be words.  And for each word, we'll just emit the value 1 to indicate that this pair corresponds to one occurence of the word:

In [3]:
def wc_mapper(document):
    """for each word in the document, emit (word,1)"""
    for word in tokenize(document):
        yield (word, 1)

Skipping the plumbing step 2 for the moment, imagine that for some word we've collected a list of the corresponding counts we emitted.  Then to produce the overall count for that word we just need:

In [4]:
def wc_reducer(word, counts):
    """sum up the counts for a word"""
    yield (word, sum(counts))

Returning to step 2, we now need to collect the results from `wc_mapper` and feed them to `wc_reducer`.  Let's think about how we would do this on just one computer:

In [5]:
def word_count(documents):
    """count the words in the input documents using MapReduce"""

    # place to store grouped values
    collector = defaultdict(list)

    for document in documents:
        for word, count in wc_mapper(document):
            collector[word].append(count)

    return [output
            for word, counts in collector.items()
            for output in wc_reducer(word, counts)]

Image that we have three documents `["data science", "big data", "science fiction"]`.  Then `wc_mapper` applied to the first document yieldxs the two pairs `("data", 1)` and `("science", 1)`.  After we've gone through all three documents, the collector contains:

In [6]:
documents = ["data science", "big data", "science fiction"]

wc_mapper_results = [result
                     for document in documents
                     for result in wc_mapper(document)]

print("wc_mapper results")
print(wc_mapper_results)
print()

print("word count results")
print(word_count(documents))
print()

wc_mapper results
[('data', 1), ('science', 1), ('big', 1), ('data', 1), ('science', 1), ('fiction', 1)]

word count results
[('data', 2), ('science', 2), ('big', 1), ('fiction', 1)]



With the right infrastructure, we can do the following:

- Have each machine run the mapper on its documents, producing lots of key-value pairs.
- Distribute those key-value pairs to a number of "reducing" machines, making sure that the pairs corresponding to any given key all end up on the same machine.
- Have each reducing machine group the pairs by key and then run the reducer on each set of values.
- Return each key-output pair.

What is amazing about this is that it scales horizontally.  If we double the number of machines, then our computation should run approximately twice as fast.  Each mapper machine will only need to do half as much work, and (assuming there are enough distinct keys to further distribute the reducer work) the same is true for the reducer machines.

If you think about it for a minute, all of the word-count specific code in the previous example is contained in the `wc_mapper` and `wc_reducer` functions.  This means that witha  couple of changes, we have a much more general framework:

In [7]:
def word_count(documents):
    """count the words in the input documents using MapReduce"""

    # place to store grouped values
    collector = defaultdict(list)

    for document in documents:
        for word, count in wc_mapper(document):
            collector[word].append(count)

    return [output
            for word, counts in collector.items()
            for output in wc_reducer(word, counts)]

def map_reduce(inputs, mapper, reducer):
    """runs MapReduce on the inputs using mapper and reducer"""
    collector = defaultdict(list)

    for input in inputs:
        for key, value in mapper(input):
            collector[key].append(value)

    return [output
            for key, values in collector.items()
            for output in reducer(key,values)]

And then we can count words simply by using:

In [8]:
print("word count using map_reduce function")
print(map_reduce(documents, wc_mapper, wc_reducer))
print()

word count using map_reduce function
[('data', 2), ('science', 2), ('big', 1), ('fiction', 1)]



This gives us the flexibility to solve a wide variety of problems.  Before we proceed, observe that `wc_reducer` is just summing the values corresponding to each key.  This kind of aggregation is common enough that it's worth abstracting it out:

In [11]:
def reduce_with(aggregation_fn, key, values):
    """reduces a key-values pair by applying aggregation_fn to the values"""
    yield (key, aggregation_fn(values))

def values_reducer(aggregation_fn):
    """turns a function (values -> output) into a reducer"""
    return partial(reduce_with, aggregation_fn)

sum_reducer = values_reducer(sum)
max_reducer = values_reducer(max)
min_reducer = values_reducer(min)
count_distinct_reducer = values_reducer(lambda values: len(set(values)))

Here's a way to analyze status updates.  Given:

In [12]:
status_updates = [
    {"id": 1,
     "username" : "joelgrus",
     "text" : "Is anyone interested in a data science book?",
     "created_at" : datetime.datetime(2013, 12, 21, 11, 47, 0),
     "liked_by" : ["data_guy", "data_gal", "bill"] },
    # add your own
]

Let's say we need to figure out which day of the week people talk the most about data science.  In order to find this, we'll just count how many data science updates there are on each day of the week.  This means we'll need to group by the day of week, so that's our key.  And if we emit a value of 1 for each update that contains "data science", we can simply get the total number using `sum`:

In [14]:
def data_science_day_mapper(status_update):
    """yields (day_of_week, 1) if status_update contains "data science" """
    if "data science" in status_update["text"].lower():
        day_of_week = status_update["created_at"].weekday()
        yield (day_of_week, 1)

data_science_days = map_reduce(status_updates,
                               data_science_day_mapper,
                               sum_reducer)

print("data science days")
print(data_science_days)
print()

data science days
[(5, 1)]



As a slightly more complicated example, imagine we need to find out for each user the most common word that she puts into her status updates.  There are three possible approaches that spring to mind for the `mapper`:

- Put the username in the key; put the words and counts in the values.
- Put the word in key; put the usernames and counts in the values.
- Put the username and word in the key; put the counts in the values.

If you think about it a bit more, we definitely want to group by `username` because we want to consider each person's words separately.  And we don't want to group by `word` since our reducer will need to see all the words for each person to find out which is the most popular.  This means that the first option is the right choice:

In [16]:
def words_per_user_mapper(status_update):
    user = status_update["username"]
    for word in tokenize(status_update["text"]):
        yield (user, (word, 1))

def most_popular_word_reducer(user, words_and_counts):
    """given a sequence of (word, count) pairs,
    return the word with the highest total count"""

    word_counts = Counter()
    for word, count in words_and_counts:
        word_counts[word] += count

    word, count = word_counts.most_common(1)[0]

    yield (user, (word, count))

user_words = map_reduce(status_updates,
                        words_per_user_mapper,
                        most_popular_word_reducer)

print("user words")
print(user_words)
print()

user words
[('joelgrus', ('in', 1))]



Or we could find out the number of distinct status-likers for each user:

In [17]:
def liker_mapper(status_update):
    user = status_update["username"]
    for liker in status_update["liked_by"]:
        yield (user, liker)

distinct_likers_per_user = map_reduce(status_updates,
                                      liker_mapper,
                                      count_distinct_reducer)

print("distinct likers")
print(distinct_likers_per_user)
print()

distinct likers
[('joelgrus', 3)]



Given a $ m x n $ matrix *A* and a $ n x k $ matrix *B*, we can multiply them  to form a $ m x k $ matrix *C*, where the element of C in row *i* and column *j* is given by:

$$ C_ij = A_i1B_1j + A_i2B_2j + ... + A_inB_nj $$

A natural way to reprent this kind of matrix is with a list of lists.  But large matrices are sometimes sparse, which means that most of their elements equal zero.  For large sparse matrices, a list of lists can be a very wasteful representation.  A more compact representation is a list of tuples `(name, i, j, value)` where `name` identifies the matrix, and where `i, j, value` indicates a location with nonzero value.

For example, a billion x billion matrix has a quintillion entries, which would not be easy to store on a computer.  But if there are only a few nonzero entries in each row, this alternativen representation is many orders of magnitude smaller.  Given this sort of representation, it turns out that we can use MapReduce to perform matrix multiplication in a distributed manner.

To motivate our algorithm, notice that each element is only used to compute the elements of the matrix iby row, and each second tuple element is only used to compute the elements of the matrix by column.  Our goal will be for each output of our reducer to be a single entry of the matrix, which means that we'll need our mapper to emit keys identifying a single entry of the matrix.  This suggests the following:

In [18]:
def matrix_multiply_mapper(m, element):
    """m is the common dimension (columns of A, rows of B)
    element is a tuple (matrix_name, i, j, value)"""
    matrix, i, j, value = element

    if matrix == "A":
        for column in range(m):
            # A_ij is the jth entry in the sum for each C_i_column
            yield((i, column), (j, value))
    else:
        for row in range(m):
            # B_ij is the ith entry in the sum for each C_row_j
            yield((row, j), (i, value))

def matrix_multiply_reducer(m, key, indexed_values):
    results_by_index = defaultdict(list)
    for index, value in indexed_values:
        results_by_index[index].append(value)

    # sum up all the products of the positions with two results
    sum_product = sum(results[0] * results[1]
                      for results in results_by_index.values()
                      if len(results) == 2)

    if sum_product != 0.0:
        yield (key, sum_product)

For example, if you had the two matrices

```python
A = [[3, 2, 0],
     [0, 0, 0]]

B = [[4, -1, 0],
     [10, 0, 0],
     [0, 0, 0]]
```

You could rewrite them as tuples:

In [21]:
entries = [("A", 0, 0, 3), ("A", 0, 1,  2),
       ("B", 0, 0, 4), ("B", 0, 1, -1), ("B", 1, 0, 10)]
mapper = partial(matrix_multiply_mapper, 3)
reducer = partial(matrix_multiply_reducer, 3)

print("map-reduce matrix multiplication")
print("entries:", entries)
print("result:", map_reduce(entries, mapper, reducer))

map-reduce matrix multiplication
entries: [('A', 0, 0, 3), ('A', 0, 1, 2), ('B', 0, 0, 4), ('B', 0, 1, -1), ('B', 1, 0, 10)]
result: [((0, 0), 32), ((0, 1), -3)]
