## MapReduce

In this assignment, we will explore how to use some of the methods we have learned about, but with Big Data.

MapReduce is a programming model for performing parallel processing on Big Data. It is powerful, yet relatively simple.

There are two basic steps:
1. _Mapper_ - Turn each item into zero or more key-value pairs.
2. _Reducer_ - Group together all values that share a key, and produce output values from each group.
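Between the two steps there is an implicit grouping step, often called the shuffle, that collects the mapper's values by key. Here is a minimal single-machine sketch of all three phases. It makes a few illustrative choices that differ from the notebook: `mapper` and `reducer` are hypothetical names, words are split on whitespace instead of using a real tokenizer, and grouping is done by sorting plus `itertools.groupby`. The notebook code below uses a `defaultdict` for the grouping instead.

```python
from itertools import groupby
from operator import itemgetter

def mapper(document):
    # emit (word, 1) for every whitespace-separated word
    for word in document.split():
        yield (word, 1)

def reducer(word, counts):
    # combine all the values that share one key
    yield (word, sum(counts))

documents = ["data science", "big data"]

# 1. map: every input becomes zero or more key-value pairs
pairs = [pair for doc in documents for pair in mapper(doc)]

# 2. shuffle: sort by key so equal keys are adjacent, then group them
pairs.sort(key=itemgetter(0))
grouped = [(key, [value for _, value in group])
           for key, group in groupby(pairs, key=itemgetter(0))]

# 3. reduce: one reducer call per key
results = [out for key, values in grouped for out in reducer(key, values)]
print(results)  # [('big', 1), ('data', 2), ('science', 1)]
```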

In [1]:
# Some necessary code
from collections import defaultdict, Counter
import re, datetime
from functools import partial

def tokenize(message):
    message = message.lower()                       # convert to lowercase
    all_words = re.findall("[a-z0-9']+", message)   # extract the words
    return all_words                                # keep duplicates so repeated words are counted
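Here is a quick check of what `tokenize` returns. It folds case, drops punctuation, and keeps repeated words, which matters for counting. Wrapping the result in `set()` would keep only one copy of each word. The sample sentence is made up for illustration.

```python
import re

def tokenize(message):
    """Same tokenizer as above: lowercase, then extract runs of letters, digits, apostrophes."""
    message = message.lower()
    return re.findall("[a-z0-9']+", message)

words = tokenize("Data data SCIENCE, isn't it?")
print(words)       # ['data', 'data', 'science', "isn't", 'it']
print(set(words))  # a set keeps only one 'data' (its print order is arbitrary)
```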

In [2]:
# Old way of counting words
def word_count_old(documents):
    """word count not using MapReduce"""
    return Counter(word
                  for document in documents
                  for word in tokenize(document))

In [3]:
documents = ['data science','data wrangling','data mining','houghton mining','huskies number one','i love formula one','big data']
print(word_count_old(documents))

Counter({'data': 4, 'mining': 2, 'one': 2, 'science': 1, 'wrangling': 1, 'houghton': 1, 'huskies': 1, 'number': 1, 'i': 1, 'love': 1, 'formula': 1, 'big': 1})


This old way of counting words works fine for hundreds of documents, but not for millions. We need a way to distribute the documents to several different computers, have each of them count its share, and then return the counts to a central hub. That is what MapReduce does.
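We can simulate the divide-and-merge idea on one machine before introducing MapReduce itself. This sketch assumes three hypothetical workers and deals the documents out to them round-robin. Each chunk is counted separately, and the partial `Counter`s are added together at the end. This is not MapReduce yet, only the pattern that MapReduce generalizes.

```python
import re
from collections import Counter

def tokenize(message):
    return re.findall("[a-z0-9']+", message.lower())

def count_chunk(chunk):
    """What one (hypothetical) worker would do: count the words in its share."""
    return Counter(word for document in chunk for word in tokenize(document))

documents = ['data science', 'data wrangling', 'data mining', 'houghton mining',
             'huskies number one', 'i love formula one', 'big data']

num_workers = 3
# deal the documents out round-robin, one chunk per worker
chunks = [documents[i::num_workers] for i in range(num_workers)]

# each worker counts independently; the central hub adds the partial counts
partial_counts = [count_chunk(chunk) for chunk in chunks]
total = sum(partial_counts, Counter())
print(total['data'], total['mining'], total['one'])  # 4 2 2
```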

In [4]:
# The mapper function turns each document into key-value pairs
def wc_mapper(document):
    """for each word in document, emit (word,1)"""
    for word in tokenize(document):
        yield (word, 1)  # key-value pair; yield makes wc_mapper a generator
        
# The reducer function combines all the counts collected for one word
def wc_reducer(word, counts):
    """sum up the counts for a word"""
    yield (word, sum(counts))
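Because `wc_mapper` contains `yield`, calling it returns a generator. The pairs are produced lazily, one at a time, as the caller asks for them. Here is a small demonstration. The tokenizer is repeated so that the snippet runs on its own.

```python
import re

def tokenize(message):
    return re.findall("[a-z0-9']+", message.lower())

def wc_mapper(document):
    """for each word in document, emit (word, 1)"""
    for word in tokenize(document):
        yield (word, 1)

pairs = wc_mapper("data data science")
print(pairs)        # a generator object; no pair has been produced yet
print(next(pairs))  # ('data', 1), produced by running up to the first yield
print(list(pairs))  # [('data', 1), ('science', 1)], the remaining pairs
```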

This works when you have a parallel processing environment. Later we will learn about cloud computing and how to use MapReduce in the cloud. For now, let's simulate it on a single computer.

In [5]:
def word_count(documents):
    """count the words in the input documents using MapReduce"""
    
    # place to store grouped values
    collector = defaultdict(list)
    
    for document in documents:
        for word, count in wc_mapper(document):
            collector[word].append(count)
            
    # add a statement to print the collector here
            
    return [output
            # Python 3 uses items(); iteritems() exists only in legacy Python 2 code
            for word, counts in collector.items()
            for output in wc_reducer(word, counts)]

Create a list of documents whose words overlap across documents (use only about 5-6 distinct words in total). Use your word_count function on this list.

e.g., ["data science", "big data", "science fiction", "data mining"]

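Add a print statement to the function so you can see the values in collector after the mapper function has run.

What if a document has more than one occurrence of a word, e.g., "data data science"? A tokenizer that removes duplicates (for example, by returning `set(all_words)`) would count "data" only once in that document. Can you alter the tokenize function so that every occurrence is counted?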
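Here is one possible answer to these exercises. It uses the sample documents suggested above, with a repeated word added to one of them. The document list is only an example. The added `print` shows every key with its list of 1s before reduction. Because `tokenize` keeps duplicates, "data data mining" contributes two 1s to `data`.

```python
import re
from collections import defaultdict

def tokenize(message):
    return re.findall("[a-z0-9']+", message.lower())

def wc_mapper(document):
    for word in tokenize(document):
        yield (word, 1)

def wc_reducer(word, counts):
    yield (word, sum(counts))

def word_count(documents):
    collector = defaultdict(list)
    for document in documents:
        for word, count in wc_mapper(document):
            collector[word].append(count)
    print(dict(collector))  # the print statement the exercise asks for
    return [output
            for word, counts in collector.items()
            for output in wc_reducer(word, counts)]

result = word_count(["data science", "big data", "science fiction", "data data mining"])
# prints {'data': [1, 1, 1, 1], 'science': [1, 1], 'big': [1], 'fiction': [1], 'mining': [1]}
print(result)  # [('data', 4), ('science', 2), ('big', 1), ('fiction', 1), ('mining', 1)]
```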

In [6]:
# make a list of documents here
word_count(documents)

[('data', 4),
 ('science', 1),
 ('wrangling', 1),
 ('mining', 2),
 ('houghton', 1),
 ('huskies', 1),
 ('number', 1),
 ('one', 2),
 ('i', 1),
 ('love', 1),
 ('formula', 1),
 ('big', 1)]

Let's create a more general MapReduce function now. Hint: This function will look nearly identical to the word_count function above with some substitutions.

Rather than (word, count), it should be (key, value) to be more general.

In [7]:
def map_reduce(inputs, mapper, reducer):
    """runs MapReduce on inputs using functions mapper and reducer"""
    collector = defaultdict(list)
    
    # loop over the inputs, calling the mapper on each one
    for obj in inputs:
        for key, value in mapper(obj):
            collector[key].append(value)
    
    # call the reducer once per key, on all the values collected for it
    return [output
            for key, values in collector.items()
            for output in reducer(key, values)]

If all went well, you should be able to call the word count with the code below.

In [8]:
word_counts = map_reduce(documents, wc_mapper, wc_reducer)
print(word_counts)

[('data', 4), ('science', 1), ('wrangling', 1), ('mining', 2), ('houghton', 1), ('huskies', 1), ('number', 1), ('one', 2), ('i', 1), ('love', 1), ('formula', 1), ('big', 1)]
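Nothing in `map_reduce` is specific to words and counts. For example, we can group words by their length just by supplying a different pair of functions. The mapper and reducer names and the documents here are hypothetical and chosen for illustration.

```python
import re
from collections import defaultdict

def tokenize(message):
    return re.findall("[a-z0-9']+", message.lower())

def map_reduce(inputs, mapper, reducer):
    """runs MapReduce on inputs using functions mapper and reducer"""
    collector = defaultdict(list)
    for obj in inputs:
        for key, value in mapper(obj):
            collector[key].append(value)
    return [output
            for key, values in collector.items()
            for output in reducer(key, values)]

def length_mapper(document):
    """hypothetical mapper: key each word by its length"""
    for word in tokenize(document):
        yield (len(word), word)

def sorted_distinct_reducer(length, words):
    """hypothetical reducer: list the distinct words of each length"""
    yield (length, sorted(set(words)))

documents = ['data science', 'big data', 'data mining', 'text mining']
by_length = map_reduce(documents, length_mapper, sorted_distinct_reducer)
print(by_length)  # [(4, ['data', 'text']), (7, ['science']), (3, ['big']), (6, ['mining'])]
```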


Let's also create a more general reducer function, where we can change the aggregation that is used, e.g., sum, max, or min.

In [9]:
def reduce_values_using(aggregation_fn, key, values):
    """reduces a key-values pair by applying aggregation_fn"""
    yield (key, aggregation_fn(values))
    
def values_reducer(aggregation_fn):
    """turns a function (values->output) into a reducer that
    maps (key, values)->(key, output)"""
    return partial(reduce_values_using, aggregation_fn)

In [10]:
sum_reducer = values_reducer(sum)
max_reducer = values_reducer(max)
min_reducer = values_reducer(min)
count_distinct_reducer = values_reducer(lambda values: len(set(values)))
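Calling the reducers directly on a single key shows what `partial` is doing. `values_reducer(sum)` is `reduce_values_using` with `aggregation_fn` already filled in, so the resulting function takes only `(key, values)`. The key `"x"` and its values are made-up test data.

```python
from functools import partial

def reduce_values_using(aggregation_fn, key, values):
    """reduces a key-values pair by applying aggregation_fn"""
    yield (key, aggregation_fn(values))

def values_reducer(aggregation_fn):
    # partial fixes the first argument, leaving a function of (key, values)
    return partial(reduce_values_using, aggregation_fn)

sum_reducer = values_reducer(sum)
max_reducer = values_reducer(max)
count_distinct_reducer = values_reducer(lambda values: len(set(values)))

values = [3, 1, 3, 2]
print(list(sum_reducer("x", values)))             # [('x', 9)]
print(list(max_reducer("x", values)))             # [('x', 3)]
print(list(count_distinct_reducer("x", values)))  # [('x', 3)]
```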

Try doing the word count with your documents using each of the reducers above. Do you get the results you expected? What do you think is going on?

In [11]:
word_counts = map_reduce(documents, wc_mapper, sum_reducer)
print(word_counts)

word_counts = map_reduce(documents, wc_mapper, max_reducer)
print(word_counts)

word_counts = map_reduce(documents, wc_mapper, min_reducer)
print(word_counts)

word_counts = map_reduce(documents, wc_mapper, count_distinct_reducer)
print(word_counts)

[('data', 4), ('science', 1), ('wrangling', 1), ('mining', 2), ('houghton', 1), ('huskies', 1), ('number', 1), ('one', 2), ('i', 1), ('love', 1), ('formula', 1), ('big', 1)]
[('data', 1), ('science', 1), ('wrangling', 1), ('mining', 1), ('houghton', 1), ('huskies', 1), ('number', 1), ('one', 1), ('i', 1), ('love', 1), ('formula', 1), ('big', 1)]
[('data', 1), ('science', 1), ('wrangling', 1), ('mining', 1), ('houghton', 1), ('huskies', 1), ('number', 1), ('one', 1), ('i', 1), ('love', 1), ('formula', 1), ('big', 1)]
[('data', 1), ('science', 1), ('wrangling', 1), ('mining', 1), ('houghton', 1), ('huskies', 1), ('number', 1), ('one', 1), ('i', 1), ('love', 1), ('formula', 1), ('big', 1)]
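The surprising results come from the mapper, not the reducers. `wc_mapper` always emits the value 1, so the max, the min, and the number of distinct values of any list of 1s are all 1. The aggregations become informative once the values differ. In this sketch, a hypothetical `doc_index_mapper` emits the index of the document each word came from. `min` and `max` then give the first and last document containing a word, and the distinct count gives how many documents contain it.

```python
import re
from collections import defaultdict
from functools import partial

def tokenize(message):
    return re.findall("[a-z0-9']+", message.lower())

def map_reduce(inputs, mapper, reducer):
    collector = defaultdict(list)
    for obj in inputs:
        for key, value in mapper(obj):
            collector[key].append(value)
    return [output for key, values in collector.items() for output in reducer(key, values)]

def reduce_values_using(aggregation_fn, key, values):
    yield (key, aggregation_fn(values))

def values_reducer(aggregation_fn):
    return partial(reduce_values_using, aggregation_fn)

def doc_index_mapper(indexed_document):
    """hypothetical mapper: emit (word, index of the document it appears in)"""
    index, document = indexed_document
    for word in tokenize(document):
        yield (word, index)

documents = ['data science', 'data wrangling', 'data mining', 'houghton mining',
             'huskies number one', 'i love formula one', 'big data']
inputs = list(enumerate(documents))

first_seen = dict(map_reduce(inputs, doc_index_mapper, values_reducer(min)))
last_seen = dict(map_reduce(inputs, doc_index_mapper, values_reducer(max)))
num_docs = dict(map_reduce(inputs, doc_index_mapper, values_reducer(lambda v: len(set(v)))))
print(first_seen['data'], last_seen['data'], num_docs['data'])        # 0 6 4
print(first_seen['mining'], last_seen['mining'], num_docs['mining'])  # 2 3 2
```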


Let's explore more by looking at social network status updates.

In [12]:
status_updates = [
    {"id": 1, 
     "username" : "joelgrus", 
     "text" : "Is anyone interested in a data science book?",
     "created_at" : datetime.datetime(2013, 12, 21, 11, 47, 0),
     "liked_by" : ["data_guy", "data_gal", "bill"] },
    {"id": 2, 
     "username" : "sriharirao", 
     "text" : "what is cognitive science?",
     "created_at" : datetime.datetime(2015, 12, 26, 11, 47, 0),
     "liked_by" : ["meghana", "hari", "hari"] },
    {"id": 3, 
     "username" : "linkin", 
     "text" : "what is IBM Watson?",
     "created_at" : datetime.datetime(2016, 11, 19, 10, 34, 0),
     "liked_by" : ["data_guy", "niks", "prer"] },
    {"id": 4, 
     "username" : "staycalm", 
     "text" : "I love that data science book?",
     "created_at" : datetime.datetime(2009, 7, 16, 11, 49, 0),
     "liked_by" : ["srihari", "vivek", "bill"] }
    # add your own
]

Let's create a mapper that, together with sum_reducer, counts how many status updates mention "data science" on each day of the week (`weekday()` numbers Monday as 0 and Sunday as 6).

In [13]:
def data_science_day_mapper(status_update):
    """yields (day_of_week, 1) if status_update contains "data science" """
    if "data science" in status_update["text"].lower():
        day_of_week = status_update["created_at"].weekday()
        yield (day_of_week, 1)
        
data_science_days = map_reduce(status_updates,data_science_day_mapper,sum_reducer)
print(data_science_days)

[(5, 1), (3, 1)]
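The keys are weekday numbers rather than names. The following small sketch checks two of the dates and relabels the result. The `DAY_NAMES` list is our own. The standard `calendar.day_name` would also work, but it follows the current locale.

```python
import datetime

# datetime.weekday() numbers the days Monday=0 through Sunday=6
DAY_NAMES = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"]

print(datetime.datetime(2013, 12, 21, 11, 47, 0).weekday())  # 5 (the joelgrus update)
print(datetime.datetime(2009, 7, 16, 11, 49, 0).weekday())   # 3 (the staycalm update)

data_science_days = [(5, 1), (3, 1)]  # the output above
named = [(DAY_NAMES[day], count) for day, count in data_science_days]
print(named)  # [('Saturday', 1), ('Thursday', 1)]
```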


Let's imagine another task: we want to profile each user by the most common word in their status updates. There are three possible approaches. Which is right?
1. key is username, values are words and counts
2. key is word, values are usernames and counts
3. key is username and word, values are counts

Let's define a mapper and reducer for this task.

In [14]:
def words_per_user_mapper(status_update):
    user=status_update["username"]
    for word in tokenize(status_update["text"]):
        yield (user,(word,1))
            
def most_popular_word_reducer(user, words_and_counts):
    """given a sequence of (word, count) pairs, 
    return the word with the highest total count"""
    
    word_counts = Counter()
    for word, count in words_and_counts:
        word_counts[word] += count
        
    word, count = word_counts.most_common(1)[0]
    yield (user,(word,count))
    

user_words = map_reduce(status_updates,
                        words_per_user_mapper, 
                        most_popular_word_reducer)

print(user_words)

[('joelgrus', ('is', 1)), ('sriharirao', ('what', 1)), ('linkin', ('what', 1)), ('staycalm', ('i', 1))]
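For comparison, approach 3 keys on the (username, word) pair. Its reducer can only sum counts for one pair at a time, so picking each user's top word needs a second pass over the results. That is one reason approach 1, used above, fits the task in a single MapReduce job. The mapper name and the third status update below are made up for illustration.

```python
import re
from collections import defaultdict

def tokenize(message):
    return re.findall("[a-z0-9']+", message.lower())

def map_reduce(inputs, mapper, reducer):
    collector = defaultdict(list)
    for obj in inputs:
        for key, value in mapper(obj):
            collector[key].append(value)
    return [output for key, values in collector.items() for output in reducer(key, values)]

def user_word_mapper(status_update):
    """hypothetical approach-3 mapper: key is (username, word), value is 1"""
    user = status_update["username"]
    for word in tokenize(status_update["text"]):
        yield ((user, word), 1)

def sum_counts_reducer(key, counts):
    yield (key, sum(counts))

status_updates = [
    {"username": "joelgrus", "text": "Is anyone interested in a data science book?"},
    {"username": "staycalm", "text": "I love that data science book?"},
    {"username": "staycalm", "text": "data data everywhere"},  # made-up extra update
]

per_user_word = dict(map_reduce(status_updates, user_word_mapper, sum_counts_reducer))
print(per_user_word[("staycalm", "data")])  # 3

# second pass: keep the highest-count word for each user
best = {}
for (user, word), count in per_user_word.items():
    if user not in best or count > best[user][1]:
        best[user] = (word, count)
print(best["staycalm"])  # ('data', 3)
```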


Now create a mapper that finds the number of distinct users who liked each user's status updates.

In [15]:
def liker_mapper(status_update):
    """return (user,liker) pairs"""
    user=status_update["username"]
    for liker in status_update["liked_by"]:
        yield (user,liker)
    
distinct_likers_per_user = map_reduce(status_updates,
                                     liker_mapper,
                                     count_distinct_reducer)

print(distinct_likers_per_user)

[('joelgrus', 3), ('sriharirao', 2), ('linkin', 3), ('staycalm', 3)]


Let's end this lesson by defining a mapper and reducer for matrix multiplication. Let's assume an $m\times n$ matrix $A$, and an $n\times k$ matrix $B$.

$C_{ij} = \sum_{l=1}^n A_{il}B_{lj}$

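Assume the matrices

$$A = \begin{bmatrix} 3 & 2 & 0 \\ 0 & 0 & 0 \end{bmatrix}, \qquad B = \begin{bmatrix} 4 & -1 & 0 \\ 10 & 0 & 0 \\ 0 & 0 & 0 \end{bmatrix}$$

(so $m = 2$ and $n = k = 3$) are stored sparsely, as one (matrix, row, column, value) tuple per nonzero entry, in a common list: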

entries = [("A",0,0,3), ("A",0,1,2),
            ("B",0,0,4), ("B",0,1,-1), ("B",1,0,10)]
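Before distributing the work, it helps to know the expected answer. The following sketch evaluates the formula directly, without MapReduce. It also checks that the sparse `entries` list holds exactly the nonzero elements of $A$ and $B$.

```python
A = [[3, 2, 0],
     [0, 0, 0]]
B = [[4, -1, 0],
     [10, 0, 0],
     [0, 0, 0]]

m, n, k = len(A), len(B), len(B[0])  # A is m x n, B is n x k
# C_ij = sum over l of A_il * B_lj
C = [[sum(A[i][l] * B[l][j] for l in range(n)) for j in range(k)] for i in range(m)]
print(C)  # [[32, -3, 0], [0, 0, 0]]

# the sparse (matrix, row, col, value) list keeps only the nonzero entries
entries = [("A", i, j, v) for i, row in enumerate(A) for j, v in enumerate(row) if v != 0]
entries += [("B", i, j, v) for i, row in enumerate(B) for j, v in enumerate(row) if v != 0]
print(entries)  # [('A', 0, 0, 3), ('A', 0, 1, 2), ('B', 0, 0, 4), ('B', 0, 1, -1), ('B', 1, 0, 10)]
```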

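For each element $A_{ij}$, our mapper emits the key-value pair $((i, c), (j, A_{ij}))$ for every column $c$ of $C$. For each element $B_{ij}$, it emits $((r, j), (i, B_{ij}))$ for every row $r$ of $C$. The key is a position in $C$. The first part of the value is the shared index $l$ from the formula above, which lets the reducer pair each $A_{il}$ with the matching $B_{lj}$.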

In [16]:
def matrix_multiply_mapper(n, element):
    """n is used as both the number of rows and the number of columns of C,
    so it must be at least as large as each (3 is enough for the 2x3 example below);
    element is a tuple (matrix_name, i, j, value)"""
    matrix, i, j, value = element

    # if matrix is A then output the key-value pairs ((i,column), (j,value)) over all columns of C
    if matrix == 'A':
        for column in range(n):
            yield((i, column), (j,value))
    
    # else if matrix is B then output the key-value pairs ((row, j), (i, value)) over all rows of C
    else:
        for row in range(n):
            yield ((row,j),(i,value))
     
def matrix_multiply_reducer(n, key, indexed_values):
    """n is not used; key is a position (i, j) of C"""
    results_by_index = defaultdict(list)
    
    # group the values by the shared index l, the same way map_reduce's
    # collector groups values by key: each group holds A_il and/or B_lj
    for index, value in indexed_values:
        results_by_index[index].append(value)

    # multiply the pairs where both A_il and B_lj are present (missing entries are zero), then sum
    sum_product = sum(results[0] * results[1] for results in results_by_index.values() if len(results) == 2)
                      
    # yield (key, sum_product) only for nonzero entries of C
    if sum_product != 0:
        yield (key, sum_product)


Once your mapper and reducer are finished, try them out.

In [17]:
entries = [("A", 0, 0, 3), ("A", 0, 1,  2),
           ("B", 0, 0, 4), ("B", 0, 1, -1), ("B", 1, 0, 10)]
mapper = partial(matrix_multiply_mapper, 3) # what does partial do here?
reducer = partial(matrix_multiply_reducer, 3)
map_reduce(entries,mapper,reducer) # [((0, 0), 32), ((0, 1), -3)]

[((0, 0), 32), ((0, 1), -3)]
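The mapper above loops both branches over the same `n`. So `n` has to be at least the number of rows and the number of columns of $C$. Keys outside $C$ (such as row 2 here) only ever receive values from $B$ and are dropped by the reducer. The following variant passes $C$'s dimensions explicitly and avoids emitting those spare keys. The function and parameter names (`sized_matrix_mapper`, `num_rows_C`, `num_cols_C`) are hypothetical, and this is a sketch, not the notebook's required solution.

```python
from collections import defaultdict
from functools import partial

def map_reduce(inputs, mapper, reducer):
    collector = defaultdict(list)
    for obj in inputs:
        for key, value in mapper(obj):
            collector[key].append(value)
    return [output for key, values in collector.items() for output in reducer(key, values)]

def sized_matrix_mapper(num_rows_C, num_cols_C, element):
    """hypothetical variant: loop over C's actual dimensions instead of one shared n"""
    matrix, i, j, value = element
    if matrix == "A":
        for column in range(num_cols_C):  # A_ij contributes to C_(i, column) for every column
            yield ((i, column), (j, value))
    else:
        for row in range(num_rows_C):     # B_ij contributes to C_(row, j) for every row
            yield ((row, j), (i, value))

def sized_matrix_reducer(key, indexed_values):
    by_index = defaultdict(list)
    for index, value in indexed_values:
        by_index[index].append(value)
    # an index with two values has both A_il and B_lj; missing entries are zero
    total = sum(v[0] * v[1] for v in by_index.values() if len(v) == 2)
    if total != 0:
        yield (key, total)

entries = [("A", 0, 0, 3), ("A", 0, 1, 2),
           ("B", 0, 0, 4), ("B", 0, 1, -1), ("B", 1, 0, 10)]
mapper = partial(sized_matrix_mapper, 2, 3)  # C is 2 x 3
result = map_reduce(entries, mapper, sized_matrix_reducer)
print(result)  # [((0, 0), 32), ((0, 1), -3)]
```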

## Mappers of Mappers

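Now let's create a MapReduce implementation that outputs the most common words in a set of text files. We are reinventing the wheel a bit, but it raises a useful question: can we do MapReduce within a mapper? Certainly! Here each mapper handles one whole file.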

In [18]:
def file_mapper(filename):
    """Mapper that reads the lines of <filename> and emits a
    (word, 1) pair for every word in each line"""
    with open(filename) as f:   # the with block closes the file when done
        for line in f:
            for word in tokenize(line):
                yield (word, 1)
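Taking "MapReduce within a mapper" literally, a mapper can aggregate its own file with a `Counter` before emitting anything. It then sends one `(word, count)` pair per distinct word instead of one `(word, 1)` pair per occurrence. Frameworks such as Hadoop call this kind of local aggregation a combiner. The sketch below uses hypothetical names and two throwaway files. `sum_reducer` works unchanged because it adds whatever counts it receives.

```python
import os
import re
import tempfile
from collections import Counter, defaultdict

def tokenize(message):
    return re.findall("[a-z0-9']+", message.lower())

def map_reduce(inputs, mapper, reducer):
    collector = defaultdict(list)
    for obj in inputs:
        for key, value in mapper(obj):
            collector[key].append(value)
    return [output for key, values in collector.items() for output in reducer(key, values)]

def sum_reducer(key, values):
    yield (key, sum(values))

def counting_file_mapper(filename):
    """hypothetical variant: count words locally, then emit one (word, count) per distinct word"""
    with open(filename) as f:
        local_counts = Counter(word for line in f for word in tokenize(line))
    yield from local_counts.items()

# two small made-up files so the sketch runs anywhere
with tempfile.TemporaryDirectory() as tmp:
    paths = []
    for name, text in [("a.txt", "the cat\nthe hat\n"), ("b.txt", "the end\n")]:
        path = os.path.join(tmp, name)
        with open(path, "w") as f:
            f.write(text)
        paths.append(path)
    counts = dict(map_reduce(paths, counting_file_mapper, sum_reducer))

print(counts)  # {'the': 3, 'cat': 1, 'hat': 1, 'end': 1}
```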

Now run your mapper with the sum_reducer to see if it works.

In [19]:
filenames = ["data/genesis.txt",
            "data/Luke.txt",
            "data/Kings.txt"]
word_counts = map_reduce(filenames, file_mapper, sum_reducer)
print(type(word_counts))

<class 'list'>
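`map_reduce` returns an unordered list of `(word, count)` pairs, so ranking them needs one more step. There are two options, shown below on a few pairs taken from the sorted output in the next cell. You can use a full `sorted` with an explicit tie-breaker. Or, when only the top N words are needed, `heapq.nlargest` avoids sorting the whole list.

```python
import heapq

# a few (word, count) pairs in the same shape as word_counts
word_counts = [("them", 648), ("and", 7478), ("which", 648), ("the", 6041), ("of", 3330)]

# full sort: highest count first, ties broken alphabetically for a stable order
ranked = sorted(word_counts, key=lambda pair: (-pair[1], pair[0]))
print(ranked)  # [('and', 7478), ('the', 6041), ('of', 3330), ('them', 648), ('which', 648)]

# top-N only: heapq.nlargest avoids sorting the whole list
top3 = heapq.nlargest(3, word_counts, key=lambda pair: pair[1])
print(top3)  # [('and', 7478), ('the', 6041), ('of', 3330)]
```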


In [20]:
sorted(word_counts, key=lambda pair: pair[1], reverse=True)

[('and', 7478),
 ('the', 6041),
 ('of', 3330),
 ('he', 1769),
 ('to', 1634),
 ('in', 1419),
 ('that', 1360),
 ('unto', 1278),
 ('his', 1206),
 ('him', 1181),
 ('said', 979),
 ('i', 916),
 ('was', 861),
 ('a', 848),
 ('for', 831),
 ('they', 748),
 ('it', 714),
 ('with', 661),
 ('shall', 651),
 ('which', 648),
 ('them', 648),
 ('is', 634),
 ('all', 622),
 ('not', 612),
 ('thou', 606),
 ('be', 603),
 ('lord', 601),
 ('my', 578),
 ('thy', 557),
 ('thee', 517),
 ('me', 511),
 ('god', 450),
 ('son', 423),
 ('came', 412),
 ('were', 389),
 ('when', 369),
 ('will', 369),
 ('but', 368),
 ('this', 359),
 ('from', 325),
 ('have', 322),
 ('there', 319),
 ('had', 318),
 ('house', 317),
 ('as', 315),
 ('ye', 304),
 ('king', 302),
 ('man', 298),
 ('out', 286),
 ('her', 281),
 ('their', 276),
 ('upon', 275),
 ('father', 272),
 ('saying', 263),
 ('went', 260),
 ('up', 258),
 ('israel', 256),
 ('you', 254),
 ('she', 245),
 ('are', 241),
 ('on', 240),
 ('into', 234),
 ('land', 231),
 ('behold', 222),
 ('b