# MapReduce in Word Counting

In [22]:
import urllib, string, random
from nltk import word_tokenize
from collections import Counter
from operator import itemgetter
from collections import defaultdict
from functools import partial
from itertools import product
import numpy as np

### 0. Data: Alice's Adventures in Wonderland

In [2]:
# Download the plain-text book from Project Gutenberg and keep it as a list of lines;
# each line is treated as one "document" by the word counters below.
firehose = urllib.urlopen(url="http://www.gutenberg.org/cache/epub/11/pg11.txt")
try:
    documents = firehose.readlines()
finally:
    # Close the handle even if reading fails, so the connection is not left open
    # for the lifetime of the kernel.
    firehose.close()

### 1. Old Technique: Brute Force

In [3]:
def word_count_old(documents):
    """Count token occurrences across all documents in a single pass.

    documents: iterable of strings; each one is split with word_tokenize.
    Returns a Counter mapping token -> number of occurrences.
    """
    tally = Counter()
    for document in documents:
        # Counter.update with an iterable adds one per element seen.
        tally.update(word_tokenize(document))
    return tally

In [4]:
%%time
# Count every token, then replace the Counter with a list of (word, count)
# pairs ordered from the most to the least frequent word.
countOld = word_count_old(documents)
countOld = sorted([(word,count) for word,count in countOld.iteritems()], key=itemgetter(1), reverse=True)

CPU times: user 622 ms, sys: 10.5 ms, total: 633 ms
Wall time: 630 ms


In [5]:
# Ten most frequent tokens (the list was sorted by descending count above).
countOld[:10]

[(',', 2565),
 ('the', 1675),
 ("'", 1130),
 ('.', 1101),
 ('and', 824),
 ('to', 791),
 ('a', 669),
 ('of', 602),
 ('it', 525),
 ('she', 506)]

### 2. MapReduce

In [6]:
class MapReduceWC:
    """Word count written as a single-process map -> group -> reduce pipeline.

    documents: iterable of strings to count tokens in.
    tokenizer: optional callable turning one document into an iterable of
        tokens; when omitted, nltk's word_tokenize is used.
    """
    # NB: names with one leading underscore are module-private by convention
    #     and are skipped by `from module import *`.
    # NB: names with two leading underscores are name-mangled by the class
    #     (e.g. _MapReduceWC__wc_mapper); this avoids accidental clashes but
    #     does not make them inaccessible.

    def __init__(self, documents, tokenizer=None):
        self.documents = documents
        # Resolve the default lazily so a custom tokenizer never requires nltk.
        self._tokenizer = word_tokenize if tokenizer is None else tokenizer
        # Grouped mapper output of the most recent word_count() run.
        self.collector = defaultdict(list)

    def __wc_mapper(self, document):
        # for each word in the document, lazy-return (word,1).
        for word in self._tokenizer(document):
            yield (word, 1)

    def __wc_reducer(self, word, counts):
        # sum up the counts for a word.
        yield (word, sum(counts))

    def word_count(self):
        """Return a list of (word, total_count) pairs for self.documents.

        Each call starts from an empty collector, so calling it repeatedly
        gives the same totals instead of accumulating counts across calls.
        """
        collector = defaultdict(list)
        for document in self.documents:
            for word, count in self.__wc_mapper(document):
                collector[word].append(count)
        # Keep the grouped values available on the instance, as before.
        self.collector = collector
        # items() exists on both Python 2 and Python 3 (iteritems is 2-only).
        return [output
                for word, counts in collector.items()
                for output in self.__wc_reducer(word, counts)]
    

In [7]:
%%time
# Run the same count through the MapReduce-style class so its timing can be
# compared with the brute-force version.
mrwc = MapReduceWC(documents)
countNew = mrwc.word_count()

CPU times: user 577 ms, sys: 37.3 ms, total: 614 ms
Wall time: 588 ms


In [8]:
# Order the (word, count) pairs by descending count and show the top ten.
countNew = sorted(countNew, key=itemgetter(1), reverse=True)
countNew[:10]

[(',', 2565),
 ('the', 1675),
 ("'", 1130),
 ('.', 1101),
 ('and', 824),
 ('to', 791),
 ('a', 669),
 ('of', 602),
 ('it', 525),
 ('she', 506)]

### 3. Additional Benchmarking with the Brown Corpus

**NB: On a single machine there is no performance difference between the two approaches. The benefit of MapReduce is that the map and reduce steps can be distributed across machines.**

In [9]:
from nltk.corpus import brown
# Join the items of each Brown sentence with spaces so every sentence becomes
# one string "document" for the word counters above.
brownSents = [' '.join(sent) for sent in brown.sents()]

In [10]:
%%time
# OLD WORD COUNT METHOD
# Count, sort by descending frequency, and print the ten most common tokens.
countOld = word_count_old(brownSents)
countOld = sorted([(word,count) for word,count in countOld.iteritems()], key=itemgetter(1), reverse=True)
print countOld[:10]

[(u'the', 62713), (u',', 58336), (u'.', 50270), (u'of', 36080), (u'and', 27915), (u'to', 25732), (u'a', 21888), (u'in', 19540), (u'that', 10328), (u'is', 10100)]
CPU times: user 9.98 s, sys: 122 ms, total: 10.1 s
Wall time: 10 s


In [11]:
%%time
# NEW WORD COUNT METHOD
# Same steps as the old-method cell, but counting through MapReduceWC.
mrwc = MapReduceWC(brownSents)
countNew = mrwc.word_count()
countNew = sorted(countNew, key=itemgetter(1), reverse=True)
print countNew[:10]

[(u'the', 62713), (u',', 58336), (u'.', 50270), (u'of', 36080), (u'and', 27915), (u'to', 25732), (u'a', 21888), (u'in', 19540), (u'that', 10328), (u'is', 10100)]
CPU times: user 9.97 s, sys: 117 ms, total: 10.1 s
Wall time: 10 s


# MapReduce in General

In [12]:
# GENERALIZED MAPREDUCE
def map_reduce(inputs, mapper, reducer):
    """Run a single-process MapReduce job and return the reducer outputs.

    inputs:  iterable of input records.
    mapper:  callable(record) -> iterable of (key, value) pairs.
    reducer: callable(key, values) -> iterable of outputs, where values is
             the list of every value the mapper emitted for that key.

    Returns a flat list of everything the reducer yields, one key at a time.
    """
    # Shuffle step: group all mapped values by key.
    collector = defaultdict(list)
    for inpt in inputs:
        for key, value in mapper(inpt):
            collector[key].append(value)
    # items() works on both Python 2 and Python 3; iteritems() is Python-2-only.
    return [output
            for key, values in collector.items()
            for output in reducer(key, values)]

In [13]:
# GENERALIZED REDUCER
def reduce_values_using(aggregation_fn, key, values):
    """Generator reducer: yield a single (key, aggregation_fn(values)) pair.

    The aggregation only runs once the generator is iterated.
    """
    aggregated = aggregation_fn(values)
    yield key, aggregated
def values_reducer(aggregation_fn):
    """Build a reducer from a plain aggregation function.

    aggregation_fn maps a list of values to one output; the returned callable
    takes (key, values) and yields (key, aggregation_fn(values)).
    """
    reducer = partial(reduce_values_using, aggregation_fn)
    return reducer

In [14]:
# CREATING SPECIALIZED REDUCERS
sum_reducer = values_reducer(sum)
max_reducer = values_reducer(max)
min_reducer = values_reducer(min)
count_distinct_reducer = values_reducer(lambda values: len(set(values)))
# The original, misspelled name is kept as an alias so existing references still work.
count_distinct_reduer = count_distinct_reducer

# Application: Matrix Multiplication

**a. Matrix Representation**

* Instead of storing (sparse) matrices using list of lists, only store non-zero cells with their indices indicated (e.g. $("A", i, j, value)$ for $A_{ij}$ if $A_{ij}\neq 0$). 

**b. Efficient Computation Using Indices**

* For $C = A\cdot B$, a cell $C_{ik}$ of $C$ depends only on the entries $A_{ij}$ and $B_{jk}$, since $C_{ik} = \sum_j A_{ij}B_{jk}$.
* In the diagram below, $i$ and $k$ locate a cell in $C$ (i.e. $C_{ik}$), while $j$ — the column index of $A$ and the row index of $B$ — indicates that these two cells "meet" in the computation.
* Therefore, for a pair of element representations $("A", i, j, value1)$ and $("B", j, k, value2)$, we extract $(i,k)$ to locate the cell $C_{ik}$, and use $j$ to indicate that these two cells meet in the computation.
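* **NB:** In the code below, $j$ is called the "meet-index". The mapper's first argument `m` is not $j$ itself: it sets how far each element is fanned out across $C$'s rows or columns, so a single integer `m` must exceed every row and column index of $C$.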

In [15]:
#   A_ij   B_jk
#     ^^     ^^
#     ||_____||
#     |   |   |
#     |"meet" |
#     | index |
#     |_______|
#         |
#        C_ik

In [16]:
# MAPPER
def matrix_multiply_mapper(m, element):
    """Fan one sparse matrix entry out to every cell of C = A.B it can affect.

    m: fan-out bound for the free index of C (not the meet-index itself).
       - an int: used for both directions, so it must be at least the number
         of rows AND the number of columns of C; a smaller value silently
         drops cells, a larger one only emits keys that never find a partner.
       - a (n_rows, n_cols) tuple/list: the exact shape of C, which avoids
         emitting keys outside C.
    element: (matrix-name, row, col, value); the name "A" marks the left
       operand, any other name is treated as the right operand.

    Yields ((row_of_C, col_of_C), (meet_index, value)) pairs.
    """
    if isinstance(m, (tuple, list)):
        n_rows, n_cols = m
    else:
        n_rows = n_cols = m
    name, i, j, value = element
    if name == "A":
        # A_ij contributes to C_ik for every column k of C; its column j is the meet-index.
        for k in range(n_cols):
            yield ((i, k), (j, value))
    else:
        # B_ij contributes to C_kj for every row k of C; its row i is the meet-index.
        for k in range(n_rows):
            yield ((k, j), (i, value))

In [17]:
# REDUCER
def matrix_multiply_reducer(m, key, indexedValues):
    """Combine the values collected for one cell of C into its final value.

    m: accepted so the signature mirrors the mapper; not used in the body.
    key: index of the cell in C, e.g. (i, k).
    indexedValues: iterable of (meet-index, value) pairs emitted for this cell.

    Yields (key, value) once, and only when the value is non-zero.
    """
    by_meet_index = defaultdict(list)
    for meet_index, val in indexedValues:
        by_meet_index[meet_index].append(val)

    total = 0
    for pair in by_meet_index.values():
        # Only meet-indices that received exactly two values (one from each
        # matrix) contribute a product; unmatched ones are skipped.
        if len(pair) != 2:
            continue
        total = total + pair[0] * pair[1]

    if total != 0.0:
        yield (key, total)

In [18]:
# TEST ON TOY EXAMPLE
# Non-zero entries only: A_00=3, A_01=2; B_00=4, B_01=-1, B_10=10.
entries = [("A", 0, 0, 3), ("A", 0, 1, 2), ("B", 0, 0, 4), ("B", 0, 1, -1), ("B", 1, 0, 10)]
# 3 is bound as the first argument (m) of both the mapper and the reducer.
mapper = partial(matrix_multiply_mapper, 3)
reducer = partial(matrix_multiply_reducer, 3)
map_reduce(entries, mapper, reducer)
    # output in matrix rep. (cells with value 0 are not emitted by the reducer):
    # [[32,-3, 0],
    #  [ 0, 0, 0]]

[((0, 1), -3), ((0, 0), 32)]

### C. Comparing Brute-Force Matrix Multiplication with MapReduce Version

In [19]:
# MATRIX CONVERTER
#  [[..],[..],..] => [element0, element1, ..].
def matrix_convert(M, matrixName=None):
    """Convert a 2-D matrix into a list of its non-zero entries.

    M: a 2-D numpy array or a list of row lists.
    matrixName: label stored in every entry; when None, a random
        5-character name of upper-case letters and digits is generated.

    Returns [(matrixName, i, j, value), ...] in row-major order, skipping
    every cell whose value equals 0.
    """
    if matrixName is None: # randomly generate matrix name: len5 alphanumeric.
        alphabet = string.ascii_uppercase + string.digits
        matrixName = ''.join(random.choice(alphabet) for _ in range(5))
    # Iterating rows/cells (instead of reading M.shape) lets plain nested
    # lists work as well as numpy arrays.
    return [(matrixName, i, j, value)
            for i, row in enumerate(M)
            for j, value in enumerate(row)
            if value != 0]

In [20]:
# SPARSE MATRIX GENERATOR
def sparse_matrix(shape, prop=.1): # prop: proportion of 1's in distribution.
    """Generate a random 0/1 matrix in which each cell is 1 with probability prop.

    shape: (n_rows, n_cols).
    prop:  probability that a cell is 1; values <= 0 give an all-zero
           matrix and values >= 1 an all-one matrix.

    Returns a float numpy array of the given shape.
    Raises AssertionError if shape does not have exactly two dimensions.
    """
    assert len(shape)==2
    matrix = np.zeros(shape)
    for i in range(shape[0]):
        for j in range(shape[1]):
            # Draw each cell directly against prop. Sampling from a 10-slot
            # list built with int(prop*10) truncated the probability, e.g.
            # prop=.05 produced no ones at all and prop=.8 produced 8/9.
            if random.random() < prop:
                matrix[i, j] = 1
    return matrix

In [23]:
# Sanity check: build a 10x10 random matrix and list its non-zero cells
# as ('M', row, col, value) tuples.
M = sparse_matrix((10,10))
matrix_convert(M,matrixName='M')

[('M', 0, 9, 1.0),
 ('M', 1, 5, 1.0),
 ('M', 1, 6, 1.0),
 ('M', 1, 8, 1.0),
 ('M', 2, 7, 1.0),
 ('M', 3, 4, 1.0),
 ('M', 3, 5, 1.0),
 ('M', 4, 0, 1.0),
 ('M', 5, 7, 1.0),
 ('M', 5, 9, 1.0),
 ('M', 6, 6, 1.0),
 ('M', 7, 2, 1.0),
 ('M', 8, 0, 1.0),
 ('M', 9, 1, 1.0),
 ('M', 9, 2, 1.0),
 ('M', 9, 5, 1.0)]

In [24]:
# BRUTE-FORCE MATRIX MULTIPLICATION
def matrix_multiply(A,B):
    """Multiply two 2-D numpy arrays with explicit Python loops.

    This is the brute-force baseline, so the loops are deliberate.
    Returns a float array of shape (A.shape[0], B.shape[1]).
    Raises AssertionError when either argument is not an np.ndarray or
    when the inner dimensions do not match.
    """
    assert type(A)==type(B)==np.ndarray # type checking.
    assert A.shape[1]==B.shape[0] # dimension checking.

    def dot(row, col):
        # Sum of element-wise products of two equally long vectors.
        assert len(row)==len(col)
        acc = 0
        for a, b in zip(row, col):
            acc += a*b
        return acc

    n_rows, n_cols = A.shape[0], B.shape[1]
    C = np.zeros((n_rows, n_cols))
    for r, row in enumerate(A):
        for c in range(n_cols):
            C[r, c] = dot(row, B[:, c])
    return C

In [33]:
%%time
# Two random 0/1 matrices with compatible shapes: (100x500) . (500x100) -> 100x100.
A = sparse_matrix((100,500))
B = sparse_matrix((500,100))

CPU times: user 77.5 ms, sys: 2.26 ms, total: 79.8 ms
Wall time: 78.1 ms


In [34]:
%%time
# Brute-force product of the two random matrices; the cell's %%time reports its cost.
matrix_multiply(A,B)

CPU times: user 1.62 s, sys: 6.25 ms, total: 1.63 s
Wall time: 1.63 s


array([[ 4.,  5.,  6., ...,  7.,  6.,  9.],
       [ 5.,  4.,  2., ...,  4.,  5.,  9.],
       [ 5.,  5.,  7., ...,  7.,  8.,  5.],
       ..., 
       [ 4.,  5.,  2., ...,  9.,  2.,  2.],
       [ 4.,  7.,  3., ...,  3.,  2.,  6.],
       [ 3.,  5.,  4., ...,  4.,  2.,  0.]])

In [35]:
# Convert both dense matrices to lists of non-zero entries and concatenate
# them into the single input list consumed by map_reduce.
AConv = matrix_convert(A,matrixName='A')
BConv = matrix_convert(B,matrixName='B')
ABList = AConv+BConv
# 500 is the range each entry is fanned out over by the mapper; it is larger
# than both dimensions of the 100x100 product, so no cell of C is missed.
mapper = partial(matrix_multiply_mapper, 500)
reducer = partial(matrix_multiply_reducer, 500)

In [36]:
%%time
# The returned list is not stored or displayed; this cell only measures the run time.
map_reduce(ABList, mapper, reducer)
print "DONE"

DONE
CPU times: user 6.57 s, sys: 160 ms, total: 6.73 s
Wall time: 6.71 s
