## Chapter 23. MapReduce

**MapReduce** = programming model for performing **parallel processing** on large data sets = powerful technique whose basics are relatively simple. Imagine we have a collection of items we’d like to process somehow (website logs, texts of various books, image files, etc.). A basic version of the MapReduce algorithm consists of the following steps:
1. Use a **mapper/map function** to turn each item into 0 or more KV-pairs.
2. Collect together all pairs w/ identical keys.
3. Use a **reducer function** on each collection of grouped values to produce output values for the corresponding key.
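Here is a minimal sketch of the 3 steps on 1 machine. It is not the chapter's implementation, which uses a `defaultdict` further on. In this version, step 2 sorts the pairs by key and then groups adjacent ones w/ `itertools.groupby`. The names `run_steps`, `toy_mapper`, and `toy_reducer` are hypothetical:

```python
from itertools import groupby
from operator import itemgetter

def run_steps(items, mapper, reducer):
    """Hypothetical helper: map, group by key (via sorting), then reduce."""
    # Step 1: turn each item into 0 or more (key, value) pairs
    pairs = [kv for item in items for kv in mapper(item)]
    # Step 2: sort by key so identical keys are adjacent, then group them
    pairs.sort(key=itemgetter(0))
    results = []
    for key, group in groupby(pairs, key=itemgetter(0)):
        values = [v for _, v in group]
        # Step 3: reduce each key's grouped values to output
        results.extend(reducer(key, values))
    return results

def toy_mapper(text):
    """Hypothetical mapper: emit (word, 1) for every whitespace-separated word."""
    for word in text.split():
        yield (word, 1)

def toy_reducer(word, counts):
    """Hypothetical reducer: total the counts for one word."""
    yield (word, sum(counts))

print(run_steps(["a b", "b c", "b"], toy_mapper, toy_reducer))
# [('a', 1), ('b', 3), ('c', 1)]
```

Design note: sorting requires keys that can be compared w/ each other. Grouping via a dict (as done later in this chapter) only needs hashable keys.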

Specific example: There're few *absolute rules* of data science, but 1 = your 1st MapReduce example has to involve counting words. Our site has grown to millions of users = great for job security, but it makes routine analyses slightly more difficult. VP of Content wants to know what sorts of things people are talking about in status updates. As a 1st attempt, you decide to count the words that appear so that you can prepare a report on the most frequent ones. When you had a few hundred users this was simple to do:


```python
import re
from collections import Counter

# defined locally instead of `from naive_bayes import tokenize`
def tokenize(message):
    message = message.lower()                       # convert to lowercase
    all_words = re.findall("[a-z0-9']+", message)   # extract words
    return set(all_words)                           # remove duplicates

def word_count_old(docs):
    """Counts words NOT using MapReduce"""
    # note: tokenize() dedupes, so each word counts at most once per doc
    return Counter(word for doc in docs for word in tokenize(doc))
```

W/ millions of users, the set of documents (status updates) is suddenly too big to fit on your computer. If you can just fit this into the **MapReduce model**, you can use some “big data” infrastructure our engineers have implemented.

1st, we need a function that turns a document into a sequence of KV-pairs. Keys = words, b/c we want our output to be grouped by word. For each word, emit the value `1` to indicate this pair corresponds to 1 occurrence of the word:

```python
def wrd_cnt_mapper(doc):
    """For each word in the document, emit `(word,1)`"""
    for word in tokenize(doc):
        yield (word, 1)  # `yield` makes this function a generator of pairs
```

Skipping the “plumbing” step 2 for the moment, imagine that for some word, we’ve collected the list of counts that were emitted for it. Then, to produce the overall count for that word we just need:

```python
def wrd_cnt_reducer(word, counts):
    """Sum up the counts for a word"""
    yield (word, sum(counts))
```

Returning to step 2: we need to collect the results from `wrd_cnt_mapper()` + feed them to `wrd_cnt_reducer()`. Let’s think about how we would do this on just 1 computer:

```python
from collections import defaultdict

def word_count(docs):
    """Count the words in the list of input documents via MapReduce"""
    # store grouped values
    collector = defaultdict(list)

    for doc in docs:
        for word, count in wrd_cnt_mapper(doc):
            collector[word].append(count)

    return [output for word, counts in collector.items()
            for output in wrd_cnt_reducer(word, counts)]
```

Imagine we have 3 documents `["data science", "big data", "science fiction"]`. Then, `wrd_cnt_mapper` applied to the 1st document yields the 2 pairs `("data", 1)` + `("science", 1)`. After we’ve gone through all 3 documents, the collector maps each of the 4 distinct words to its list of counts, `{"data": [1, 1], "science": [1, 1], "big": [1], "fiction": [1]}`. Then `wrd_cnt_reducer` produces the count for each word:

```python
word_count(["data science", "big data", "science fiction"])
```

```
[('data', 2), ('science', 2), ('big', 1), ('fiction', 1)]
```
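To see the intermediate state described above, here is a sketch that stops after steps 1 + 2 and returns the collector instead of reducing it. `group_by_key` is a hypothetical name. `tokenize` + `wrd_cnt_mapper` are copied from above so the block runs on its own:

```python
import re
from collections import defaultdict

def tokenize(message):
    return set(re.findall("[a-z0-9']+", message.lower()))

def wrd_cnt_mapper(doc):
    for word in tokenize(doc):
        yield (word, 1)

def group_by_key(docs, mapper):
    """Hypothetical helper: run only steps 1 + 2 and return the grouped values."""
    collector = defaultdict(list)
    for doc in docs:
        for key, value in mapper(doc):
            collector[key].append(value)
    return dict(collector)

grouped = group_by_key(["data science", "big data", "science fiction"], wrd_cnt_mapper)
print(sorted(grouped.items()))
# [('big', [1]), ('data', [1, 1]), ('fiction', [1]), ('science', [1, 1])]
```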

### Why MapReduce?

Primary benefit of **MapReduce = allows us to distribute computations by moving the processing to the data**. Imagine we want to word-count across billions of docs.
Our original (non-MapReduce) approach requires the CPU doing the processing to actually have access to every doc = means docs all need to either live on that machine or else be transferred to it during processing. More important, it means that the machine can only process 1 document at a time.

***NOTE:*** Possibly it can process up to a few at a time if it has multiple cores + if the code is rewritten to take advantage of them. But even so, all the docs still have to get to that machine.

Imagine our billions of docs are scattered across 100 machines. W/ the right infrastructure (+ glossing over some details), we can do the following:

* Have each machine run the mapper on its own docs, producing lots of (K,V) pairs.
* Distribute those (K, V) pairs to a # of “reducing” machines, *making sure the pairs corresponding to any given key all end up on the same machine.*
* Have each reducing machine group pairs by key + then run the reducer on each set of values.
* Return each (key, output) pair.
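The 4 steps can be simulated in a single process to make the shuffle concrete. This is a sketch, not real cluster code. Each "machine" is just a list of docs. The hypothetical `partition` function hashes each key w/ `zlib.crc32` (1 common choice) to pick a reducer. That is what guarantees all pairs for a given key land on the same reducing machine:

```python
import re
import zlib
from collections import defaultdict

def tokenize(message):
    return set(re.findall("[a-z0-9']+", message.lower()))

def wrd_cnt_mapper(doc):
    for word in tokenize(doc):
        yield (word, 1)

def wrd_cnt_reducer(word, counts):
    yield (word, sum(counts))

def partition(key, num_reducers):
    """Hypothetical partitioner: the same key always maps to the same reducer."""
    return zlib.crc32(str(key).encode("utf-8")) % num_reducers

def simulate_cluster(machines, mapper, reducer, num_reducers=2):
    """machines: list of doc lists, 1 per simulated mapper machine."""
    # 1. each machine runs the mapper on its own docs
    mapped = [[kv for doc in docs for kv in mapper(doc)] for docs in machines]
    # 2. shuffle: send each pair to the reducer chosen by partition(key)
    inboxes = [defaultdict(list) for _ in range(num_reducers)]
    for pairs in mapped:
        for key, value in pairs:
            inboxes[partition(key, num_reducers)][key].append(value)
    # 3. each reducer runs on its grouped values; 4. return every (key, output)
    return [out for inbox in inboxes
                for key, values in inbox.items()
                for out in reducer(key, values)]

machines = [["data science", "big data"], ["science fiction"]]
print(sorted(simulate_cluster(machines, wrd_cnt_mapper, wrd_cnt_reducer)))
# [('big', 1), ('data', 2), ('fiction', 1), ('science', 2)]
```

However the docs are split across machines, + however many reducers are used, the output is the same. Only where the work happens changes.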

What is amazing about this = **it scales horizontally**. If we double the # of machines, then (ignoring certain fixed-costs of running a MapReduce system) our computation should run approximately 2x as fast. Each mapper machine will only need to do 1/2 as much work, + (assuming there're enough distinct keys to further distribute reducer work) the same is true for the reducer machines.

### MapReduce More Generally
If you think about it for a minute, all of the word-count-specific code in the previous example = contained in the `wrd_cnt_mapper` + `wrd_cnt_reducer` functions. This means that w/ a couple of changes, we have a much more general framework (that still runs on a single machine):

```python
docs = ["data science", "big data", "science fiction"]

def map_reduce(inputs, mapper, reducer):
    """Runs MapReduce on the inputs using the given mapper and reducer"""
    collector = defaultdict(list)

    for item in inputs:            # `item` avoids shadowing the built-in `input`
        for k, v in mapper(item):
            collector[k].append(v)

    return [output for k, vs in collector.items()
            for output in reducer(k, vs)]

# count words
word_counts = map_reduce(docs, wrd_cnt_mapper, wrd_cnt_reducer)
print(word_counts)
```

```
[('data', 2), ('science', 2), ('big', 1), ('fiction', 1)]
```


This gives us the flexibility to solve a wide variety of problems. Before we proceed, observe `wrd_cnt_reducer()` = just summing the values corresponding to each key. This kind of aggregation is common enough that it’s worth abstracting it out:

```python
from functools import partial

def reduce_values_using(aggregation_fn, k, vs):
    """Reduces a (key, values) pair by applying aggregation_fn to the values"""
    yield (k, aggregation_fn(vs))

def values_reducer(aggregation_fn):
    """Turns a function (values -> output) into a reducer that
    maps (key, values) -> (key, output)"""
    return partial(reduce_values_using, aggregation_fn)

sum_reducer = values_reducer(sum)
max_reducer = values_reducer(max)
min_reducer = values_reducer(min)
count_distinct_reducer = values_reducer(lambda values: len(set(values)))
```
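Here is a quick usage sketch of these reducers on hypothetical `(store, sale_amount)` records. `map_reduce` + the reducer helpers are repeated so the block stands alone. `sales` + `sales_mapper` are made up for illustration:

```python
from collections import defaultdict
from functools import partial

def map_reduce(inputs, mapper, reducer):
    collector = defaultdict(list)
    for item in inputs:
        for k, v in mapper(item):
            collector[k].append(v)
    return [out for k, vs in collector.items() for out in reducer(k, vs)]

def reduce_values_using(aggregation_fn, k, vs):
    yield (k, aggregation_fn(vs))

def values_reducer(aggregation_fn):
    return partial(reduce_values_using, aggregation_fn)

sum_reducer = values_reducer(sum)
max_reducer = values_reducer(max)
count_distinct_reducer = values_reducer(lambda values: len(set(values)))

# hypothetical (store, sale_amount) records
sales = [("north", 10), ("south", 5), ("north", 7), ("south", 5)]

def sales_mapper(record):
    store, amount = record
    yield (store, amount)   # group by store; the amount is the value

print(sorted(map_reduce(sales, sales_mapper, sum_reducer)))             # [('north', 17), ('south', 10)]
print(sorted(map_reduce(sales, sales_mapper, max_reducer)))             # [('north', 10), ('south', 5)]
print(sorted(map_reduce(sales, sales_mapper, count_distinct_reducer)))  # [('north', 2), ('south', 1)]
```

The mapper stays the same across all 3 runs. Only the 1-line aggregation changes.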

### Example: Analyzing Status Updates

Content VP was impressed w/ the word counts + asks what else you can learn from people’s status updates. You manage to extract a data set of status updates that look like:

```python
import datetime

status_updates = [{"id": 1,
                   "username": "joelgrus",
                   "text": "Is data anyone interested in a data science book, data baby?",
                   "created_at": datetime.datetime(2013, 12, 21, 11, 47, 0),
                   "liked_by": ["data_guy", "data_gal", "mike"]}]
```

Let’s say we need to figure out which day of the week people talk the most about data science. To find this, count how many "data science" updates there are on each day of the week = *group by the day of week* = our key. If we emit a value = 1 for each update that contains “data science,” we can simply get the total using `sum`:

```python
def ds_day_mapper(status_update):
    """Yields (DOW, 1) if status_update contains "data science" """
    if "data science" in status_update["text"].lower():
        dow = status_update["created_at"].weekday()   # Monday = 0 ... Sunday = 6
        yield (dow, 1)   # inside the `if`: updates w/o "data science" emit nothing

ds_days = map_reduce(status_updates, ds_day_mapper, sum_reducer)
print(ds_days)
```

```
[(5, 1)]
```
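Here is a sketch that checks the mapper w/ the `yield` inside the `if`, using a few hypothetical updates (texts + dates are invented; only `text` + `created_at` are used). Updates that don't mention data science emit nothing, so they never reach the reducer:

```python
import datetime
from collections import defaultdict

def map_reduce(inputs, mapper, reducer):
    collector = defaultdict(list)
    for item in inputs:
        for k, v in mapper(item):
            collector[k].append(v)
    return [out for k, vs in collector.items() for out in reducer(k, vs)]

def ds_day_mapper(status_update):
    if "data science" in status_update["text"].lower():
        yield (status_update["created_at"].weekday(), 1)

def sum_reducer(k, vs):
    yield (k, sum(vs))

# hypothetical updates
updates = [
    {"text": "Data Science is fun",  "created_at": datetime.datetime(2013, 12, 21)},  # Saturday
    {"text": "lunch time",           "created_at": datetime.datetime(2013, 12, 22)},  # Sunday, no match
    {"text": "more data science",    "created_at": datetime.datetime(2013, 12, 28)},  # Saturday
    {"text": "data science again",   "created_at": datetime.datetime(2013, 12, 23)},  # Monday
]
print(sorted(map_reduce(updates, ds_day_mapper, sum_reducer)))
# [(0, 1), (5, 2)]
```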


As a slightly more complicated example, imagine we need to find out, for each user, the
most common word they put in their status updates. There are 3 possible approaches that spring to mind for the mapper:

* Put username = key; put words + counts = values.
* Put word = key; put usernames + counts = values.
* Put username + word = key; put counts = values.

If you think about it a bit more, we definitely want to group by `username`, b/c we want to consider each person’s words separately. + we don’t want to group by `word`, since our `reducer` will need to see all the words for each person to find out which is the most popular. This means the *1st option is the right choice*:


```python
def words_per_user_mapper(status_update):
    """Emits (username, (word, 1)) for each word in the update"""
    user = status_update["username"]
    for word in tokenize(status_update["text"]):
        yield (user, (word, 1))

def most_popular_word_reducer(user, words_and_counts):
    """Given a sequence of (word, count) pairs,
    return the word with the highest total count"""
    word_counts = Counter()

    for word, count in words_and_counts:
        word_counts[word] += count

    word, count = word_counts.most_common(1)[0]

    yield (user, (word, count))

user_words = map_reduce(status_updates, words_per_user_mapper, most_popular_word_reducer)
print(user_words)
```

```
[('joelgrus', ('science', 1))]
```

(B/c `tokenize` dedupes words, every word in this single update has count 1, so which word wins here is an arbitrary tie-break that can differ between runs.)
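A sketch w/ several hypothetical updates per user (usernames + texts are made up) makes the reducer's job visible. Since `tokenize` returns a set, a word's count = # of that user's updates containing it. That is why "hadoop" counts only once for `bob`:

```python
import re
from collections import Counter, defaultdict

def tokenize(message):
    return set(re.findall("[a-z0-9']+", message.lower()))

def map_reduce(inputs, mapper, reducer):
    collector = defaultdict(list)
    for item in inputs:
        for k, v in mapper(item):
            collector[k].append(v)
    return [out for k, vs in collector.items() for out in reducer(k, vs)]

def words_per_user_mapper(status_update):
    user = status_update["username"]
    for word in tokenize(status_update["text"]):
        yield (user, (word, 1))

def most_popular_word_reducer(user, words_and_counts):
    word_counts = Counter()
    for word, count in words_and_counts:
        word_counts[word] += count
    word, count = word_counts.most_common(1)[0]
    yield (user, (word, count))

# hypothetical updates
updates = [
    {"username": "alice", "text": "big data is big"},
    {"username": "alice", "text": "data science rocks"},
    {"username": "bob",   "text": "hadoop hadoop spark"},
    {"username": "bob",   "text": "spark streaming"},
]
print(sorted(map_reduce(updates, words_per_user_mapper, most_popular_word_reducer)))
# [('alice', ('data', 2)), ('bob', ('spark', 2))]
```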


Or we could find out the number of distinct status-likers for each user:

```python
def liker_mapper(status_update):
    user = status_update["username"]
    for liker in status_update["liked_by"]:
        yield (user, liker)

dist_likers_per_user = map_reduce(status_updates, liker_mapper, count_distinct_reducer)
print(dist_likers_per_user)
```

```
[('joelgrus', 3)]
```


### Example: Matrix Multiplication
Recall from “Matrix Multiplication” that given an `m*n` matrix A + an `n*k` matrix B, we multiply them to form an `m*k` matrix C, where the element of C in row i + column j is given by:

$$C_{ij} = A_{i1}B_{1j} + A_{i2}B_{2j} + \cdots + A_{in}B_{nj}$$

As we’ve seen, a “natural” way to represent an `m*n` matrix is with a `list` of `list`s,
where the element `A_ij` = jth element of ith list.
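For reference, here is the formula applied directly to the list-of-lists representation, as a plain single-machine sketch (`dense_matrix_multiply` is a hypothetical name). It uses the same A + B as the MapReduce example in this section, so its result can be compared w/ that one:

```python
def dense_matrix_multiply(A, B):
    """C_ij = sum over t of A_it * B_tj, for an m*n A and an n*k B."""
    n = len(B)
    assert all(len(row) == n for row in A), "columns of A must equal rows of B"
    k = len(B[0])
    return [[sum(A[i][t] * B[t][j] for t in range(n)) for j in range(k)]
            for i in range(len(A))]

A = [[3, 2, 0],
     [0, 0, 0]]
B = [[4, -1, 0],
     [10, 0, 0],
     [0, 0, 0]]
print(dense_matrix_multiply(A, B))
# [[32, -3, 0], [0, 0, 0]]
```

Every 1 of the `m*k` output cells loops over all `n` terms, zeros included. That waste is what the sparse representation avoids.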

But large matrices are sometimes **sparse = most of their elements = 0**. For large sparse matrices, a list of lists can be a very wasteful representation. **A more compact representation = a list of tuples (name, i, j, value)**, where `name` identifies the matrix + (`i`, `j`) is the location of a non-zero `value`.

Ex: A billion × billion matrix has a quintillion entries, which would not be easy to store on a computer. But if there are only a few non-zero entries in each row, this alternative representation is many orders of magnitude smaller.
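Building the sparse representation from a list of lists is a one-liner. The helper name `to_entries` is hypothetical. Applied to the A + B used in this section, it produces exactly the 5 hand-written tuples in the MapReduce example, out of 15 total cells:

```python
def to_entries(name, matrix):
    """Hypothetical helper: list of lists -> [(name, i, j, value)] for non-zero values only."""
    return [(name, i, j, value)
            for i, row in enumerate(matrix)
            for j, value in enumerate(row)
            if value != 0]

A = [[3, 2, 0], [0, 0, 0]]
B = [[4, -1, 0], [10, 0, 0], [0, 0, 0]]
entries = to_entries("A", A) + to_entries("B", B)
print(entries)
# [('A', 0, 0, 3), ('A', 0, 1, 2), ('B', 0, 0, 4), ('B', 0, 1, -1), ('B', 1, 0, 10)]
```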

**Given this sort of representation, it turns out we can use MapReduce to perform matrix multiplication in a distributed manner.** To motivate our algorithm, notice each element `A_ij` is only used to compute the elements of C in row `i`, + each element `B_ij` is only used to compute the elements of C in column `j`.

**Goal = each output of our `reducer` = a single entry of C**, which means we’ll need our `mapper` to emit keys IDing a single entry of C. This suggests the following:

```python
from collections import defaultdict
from functools import partial

def mtx_multiply_mapper(m, element):
    """m must be at least the # of columns of B (loop for A's entries) and the
    # of rows of A (loop for B's entries); extra keys just sum to 0.
    element is a tuple (mtx_name, i, j, value)"""
    mtx, i, j, value = element

    if mtx == "A":
        # A_ij = jth term in the sum for each C_i_col, col = 0..m-1
        for col in range(m):
            # group with the other entries for C_i_col
            yield ((i, col), (j, value))
    else:
        # B_ij = ith term in the sum for each C_row_j, row = 0..m-1
        for row in range(m):
            # group with the other entries for C_row_j
            yield ((row, j), (i, value))

def mtx_multiply_reducer(m, key, indexed_values):
    # m is unused; it only mirrors the mapper's signature
    results_by_index = defaultdict(list)

    for index, value in indexed_values:
        results_by_index[index].append(value)

    # sum the products for every index that has 2 values (1 from A, 1 from B)
    sum_product = sum(results[0] * results[1]
                      for results in results_by_index.values()
                      if len(results) == 2)

    if sum_product != 0:
        yield (key, sum_product)

# matrices
A = [[3, 2, 0],
     [0, 0, 0]]
B = [[4, -1, 0],
     [10, 0, 0],
     [0, 0, 0]]

# the same matrices as sparse (name, i, j, value) tuples
entries = [("A", 0, 0, 3), ("A", 0, 1, 2),
           ("B", 0, 0, 4), ("B", 0, 1, -1), ("B", 1, 0, 10)]

mapper = partial(mtx_multiply_mapper, 3)
reducer = partial(mtx_multiply_reducer, 3)

map_reduce(entries, mapper, reducer)
```

```
[((0, 0), 32), ((0, 1), -3)]
```

This isn’t terribly interesting on such small matrices, but if you had millions of rows + millions of columns, it could help you a lot.
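The mapper above uses a single `m` for 2 different loops. Each entry of A needs 1 key per column of B, + each entry of B needs 1 key per row of A. A single `m` works only if it's at least both of those; extra keys just sum to 0 + get dropped. Below is a sketch that takes both dimensions explicitly (`mtx_multiply_mapper2`, `rows_a`, `cols_b` are hypothetical names). It is checked on a case where the common dimension (1) is smaller than the # of rows of A:

```python
from collections import defaultdict
from functools import partial

def map_reduce(inputs, mapper, reducer):
    collector = defaultdict(list)
    for item in inputs:
        for k, v in mapper(item):
            collector[k].append(v)
    return [out for k, vs in collector.items() for out in reducer(k, vs)]

def mtx_multiply_mapper2(rows_a, cols_b, element):
    """Hypothetical variant: emit each entry once per C entry whose sum it appears in."""
    mtx, i, j, value = element
    if mtx == "A":
        for col in range(cols_b):   # A_ij contributes to C_i0 .. C_i(cols_b-1)
            yield ((i, col), (j, value))
    else:
        for row in range(rows_a):   # B_ij contributes to C_0j .. C_(rows_a-1)j
            yield ((row, j), (i, value))

def mtx_multiply_reducer(key, indexed_values):
    results_by_index = defaultdict(list)
    for index, value in indexed_values:
        results_by_index[index].append(value)
    # only shared indices w/ both an A value + a B value contribute a product
    sum_product = sum(r[0] * r[1] for r in results_by_index.values() if len(r) == 2)
    if sum_product != 0:
        yield (key, sum_product)

# a 3*1 A = [[1], [2], [3]] times a 1*2 B = [[4, 5]]
entries = [("A", 0, 0, 1), ("A", 1, 0, 2), ("A", 2, 0, 3),
           ("B", 0, 0, 4), ("B", 0, 1, 5)]
mapper = partial(mtx_multiply_mapper2, 3, 2)
print(sorted(map_reduce(entries, mapper, mtx_multiply_reducer)))
# [((0, 0), 4), ((0, 1), 5), ((1, 0), 8), ((1, 1), 10), ((2, 0), 12), ((2, 1), 15)]
```

Suppose the common dimension (1) were passed as `m` to the single-`m` mapper in this case. It would emit keys only for column 0 (from A) + row 0 (from B), so the only output would be `((0, 0), 4)`.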

### An Aside: Combiners
1 thing you've probably noticed: many of our `mappers` seem to include a bunch of extra info. For example, when counting words, rather than emitting `(word, 1)` + summing over the values, we could've emitted `(word, None)` + just taken the `length`.

1 reason we didn’t do this = in the distributed setting, we sometimes want to use **combiners** to reduce the amount of data that has to be transferred around from machine to machine. If 1 of our mapper machines sees the word “data” 500 times, we can tell it to
combine the 500 instances of ("data", 1) into a single ("data", 500) before handing off to the reducing machine = results in a lot less data getting moved around, which can make our algorithm substantially faster still. B/c of the way we wrote our `reducer`, it would handle this combined data correctly. (If we’d written it using `len` it would not have.)
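Here is a single-process sketch of the combiner idea. Each simulated machine pre-sums its own pairs w/ a hypothetical `combine` function before "sending" them on. `sum_reducer` still gets the right totals. A reducer that takes `len` of the values (the `(word, None)` idea above) undercounts once pairs are combined. The machine layout, the docs, and the names `len_reducer` and `run` are made up:

```python
import re
from collections import Counter, defaultdict

def tokenize(message):
    return set(re.findall("[a-z0-9']+", message.lower()))

def wrd_cnt_mapper(doc):
    for word in tokenize(doc):
        yield (word, 1)

def combine(pairs):
    """Hypothetical combiner: pre-sum 1 machine's (word, count) pairs locally."""
    totals = Counter()
    for word, count in pairs:
        totals[word] += count
    return list(totals.items())

def sum_reducer(word, counts):
    yield (word, sum(counts))

def len_reducer(word, counts):   # only correct if every value is 1
    yield (word, len(counts))

# hypothetical docs on 2 mapper machines
machines = [["data science", "big data", "data data"], ["science fiction"]]

def run(reducer, use_combiner):
    collector = defaultdict(list)
    for docs in machines:
        pairs = [kv for doc in docs for kv in wrd_cnt_mapper(doc)]
        if use_combiner:
            pairs = combine(pairs)   # fewer pairs leave this machine
        for word, count in pairs:
            collector[word].append(count)
    return sorted(out for w, cs in collector.items() for out in reducer(w, cs))

print(run(sum_reducer, use_combiner=False))  # [('big', 1), ('data', 3), ('fiction', 1), ('science', 2)]
print(run(sum_reducer, use_combiner=True))   # [('big', 1), ('data', 3), ('fiction', 1), ('science', 2)]
print(run(len_reducer, use_combiner=True))   # [('big', 1), ('data', 1), ('fiction', 1), ('science', 2)]
```

On the 1st machine, the combiner shrinks 5 pairs (3 of them `("data", 1)`) down to 3.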

### For Further Exploration
* Most widely used MapReduce system = **Hadoop**, which itself merits many books. There are various commercial and noncommercial distributions and a huge ecosystem of Hadoop-related tools.
    * In order to use it, must set up your own cluster (or find someone to let you use theirs), which is not necessarily a task for the faint-hearted. Hadoop mappers + reducers are commonly written in Java, although there is a facility known as “Hadoop streaming” that allows you to write them in other languages (including Python).
* Amazon.com offers an Elastic MapReduce service that can programmatically create + destroy clusters, charging you only for the amount of time that you’re using them.
* mrjob = a Python package for interfacing with Hadoop (or Elastic MapReduce).
* Hadoop jobs = typically high-latency, which makes them a poor choice for “realtime” analytics. There are various “real-time” tools built on top of Hadoop, but there are also several alternative frameworks that are growing in popularity. 2 of the most popular = **Spark** and **Storm**.

All that said, by now it’s quite likely that the flavor of the day is some hot new
distributed framework that didn’t even exist when this book was written. You’ll have to
find that one yourself.