## Map Reduce

In [1]:
from functools import reduce
from itertools import groupby

## Map, Reduce, Groupby Functions

In [2]:
help(map)

Help on class map in module builtins:

class map(object)
 |  map(func, *iterables) --> map object
 |  
 |  Make an iterator that computes the function using arguments from
 |  each of the iterables.  Stops when the shortest iterable is exhausted.
 |  
 |  Methods defined here:
 |  
 |  __getattribute__(self, name, /)
 |      Return getattr(self, name).
 |  
 |  __iter__(self, /)
 |      Implement iter(self).
 |  
 |  __next__(self, /)
 |      Implement next(self).
 |  
 |  __reduce__(...)
 |      Return state information for pickling.
 |  
 |  ----------------------------------------------------------------------
 |  Static methods defined here:
 |  
 |  __new__(*args, **kwargs) from builtins.type
 |      Create and return a new object.  See help(type) for accurate signature.



In [3]:
def my_sum(a):
    return a + a + a

In [4]:
my_sum(1)

3

In [5]:
[1,2,3,4]

[1, 2, 3, 4]

In [6]:
list(map(my_sum, [1,2,3,4,5,6,7]))

[3, 6, 9, 12, 15, 18, 21]
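
The help text above mentions two behaviours that the single-list call does not show: `map` can take several iterables and stops at the shortest one, and it returns a lazy iterator. A minimal sketch of both, where the helper `add` is a hypothetical name introduced only for this illustration:

```python
# Hypothetical two-argument helper, used only for this illustration.
def add(a, b):
    return a + b

# map takes one item from each iterable per call and stops at the shortest,
# so the 3 in the first list is never used.
pairwise = list(map(add, [1, 2, 3], [10, 20]))
print(pairwise)  # [11, 22]

# map is lazy: values are computed only when the iterator is consumed.
lazy = map(add, [1, 2], [3, 4])
first = next(lazy)   # computes add(1, 3)
rest = list(lazy)    # computes the remaining add(2, 4)
print(first, rest)   # 4 [6]
```

This laziness is why the notebook wraps `map(...)` in `list(...)` before displaying the result.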

In [7]:
help(groupby)

Help on class groupby in module itertools:

class groupby(builtins.object)
 |  groupby(iterable, key=None)
 |  
 |  make an iterator that returns consecutive keys and groups from the iterable
 |  
 |  iterable
 |    Elements to divide into groups according to the key function.
 |  key
 |    A function for computing the group category for each element.
 |    If the key function is not specified or is None, the element itself
 |    is used for grouping.
 |  
 |  Methods defined here:
 |  
 |  __getattribute__(self, name, /)
 |      Return getattr(self, name).
 |  
 |  __iter__(self, /)
 |      Implement iter(self).
 |  
 |  __next__(self, /)
 |      Implement next(self).
 |  
 |  __reduce__(...)
 |      Return state information for pickling.
 |  
 |  __setstate__(...)
 |      Set state information for unpickling.
 |  
 |  ----------------------------------------------------------------------
 |  Static methods defined here:
 |  
 |  __new__(*args, **kwargs) from builtins.type
 |      Cre

In [8]:
# groupby yields (key, group) pairs; each group is a lazy iterator over consecutive items
list(groupby([('a', 1), ('a', 2), ('b', 5)], key=lambda x: x[0]))

[('a', <itertools._grouper at 0x10a4748e0>),
 ('b', <itertools._grouper at 0x10a474550>)]
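
The output above shows opaque `_grouper` objects because each group is itself an iterator. The sketch below materializes the groups with `list` and also illustrates that `groupby` only groups consecutive elements, which is why the input usually needs sorting first. The names `pairs` and `first_item` are assumptions made for this example.

```python
from itertools import groupby

# Same data as above, but with the two 'a' pairs no longer adjacent.
pairs = [('a', 1), ('b', 5), ('a', 2)]

def first_item(pair):
    return pair[0]

# Without sorting, 'a' shows up as two separate groups.
unsorted_groups = [(k, list(g)) for k, g in groupby(pairs, key=first_item)]
print(unsorted_groups)
# [('a', [('a', 1)]), ('b', [('b', 5)]), ('a', [('a', 2)])]

# Sorting by the same key first puts equal keys next to each other.
ordered = sorted(pairs, key=first_item)
sorted_groups = [(k, list(g)) for k, g in groupby(ordered, key=first_item)]
print(sorted_groups)
# [('a', [('a', 1), ('a', 2)]), ('b', [('b', 5)])]
```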

In [9]:
help(reduce)

Help on built-in function reduce in module _functools:

reduce(...)
    reduce(function, sequence[, initial]) -> value
    
    Apply a function of two arguments cumulatively to the items of a sequence,
    from left to right, so as to reduce the sequence to a single value.
    For example, reduce(lambda x, y: x+y, [1, 2, 3, 4, 5]) calculates
    ((((1+2)+3)+4)+5).  If initial is present, it is placed before the items
    of the sequence in the calculation, and serves as a default when the
    sequence is empty.



In [10]:
reduce(lambda x, y: x + y, [1, 2, 3])

# How reduce works here:
# 1. [1, 2, 3]
# 2. [3, 3]    (1 + 2 = 3)
# 3. [6]       (3 + 3 = 6)
# 4. 6

6
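
The help text also describes the optional `initial` argument. A small sketch of its effect, using `operator.add` in place of the lambda (an equivalent choice made here for brevity):

```python
from functools import reduce
from operator import add

# With an initial value the calculation is ((0 + 1) + 2) + 3.
total = reduce(add, [1, 2, 3], 0)
print(total)  # 6

# For an empty sequence the initial value is returned as the default.
empty_total = reduce(add, [], 0)
print(empty_total)  # 0

# Without an initial value, reducing an empty sequence raises TypeError.
raised = False
try:
    reduce(add, [])
except TypeError:
    raised = True
print(raised)  # True
```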

## Mapping

In [11]:
words = ['Deer', 'Bear', 'River', 'Car',
         'Car', 'River', 'Deer', 'Car', 'Bear']

mapping = list(map(lambda x: (x, 1), words))
print(mapping)

[('Deer', 1), ('Bear', 1), ('River', 1), ('Car', 1), ('Car', 1), ('River', 1), ('Deer', 1), ('Car', 1), ('Bear', 1)]


## Shuffling

In [12]:
sorted_mapping = sorted(mapping)
print(sorted_mapping)

[('Bear', 1), ('Bear', 1), ('Car', 1), ('Car', 1), ('Car', 1), ('Deer', 1), ('Deer', 1), ('River', 1), ('River', 1)]


## Reducing

In [13]:
# ------ breakdown of 'Reducing' Phase ---------
grouper = groupby(sorted_mapping, lambda p: p[0])

results = []

# loop through each group
for group in grouper:
    # key that is in the group
    key = group[0] 
    print(key)
    
    # list of (key, value) pairs for this key
    key_value_pairs = list(group[1]) 
    print(key_value_pairs)
    
    # only the values for the current key
    values_for_this_key = [pair[1] for pair in key_value_pairs]
    print(values_for_this_key)
    
    # the 'reduce' step of MapReduce, for this key
    count_of_current_key = reduce(lambda x, y: x + y, values_for_this_key) 
    print(count_of_current_key)
    
    # put it to results
    results.append((key, count_of_current_key))

    # uncomment the break to process only the first group
    # break

Bear
[('Bear', 1), ('Bear', 1)]
[1, 1]
2
Car
[('Car', 1), ('Car', 1), ('Car', 1)]
[1, 1, 1]
3
Deer
[('Deer', 1), ('Deer', 1)]
[1, 1]
2
River
[('River', 1), ('River', 1)]
[1, 1]
2


In [14]:
results

[('Bear', 2), ('Car', 3), ('Deer', 2), ('River', 2)]

In [15]:
grouper = groupby(sorted_mapping, lambda p: p[0])

final = map(
    lambda l: (l[0], reduce(lambda x, y: x + y, map(lambda p: p[1], l[1]))), 
    grouper # <- yields (key, group) pairs: l[0] = 'Bear' and l[1] iterates over ('Bear', 1), ('Bear', 1)
)
print(list(final))

[('Bear', 2), ('Car', 3), ('Deer', 2), ('River', 2)]


In [16]:
results

[('Bear', 2), ('Car', 3), ('Deer', 2), ('River', 2)]
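
The nested lambdas in cell 15 can be hard to read. One possible restructuring of the same three phases is sketched below; the function name `word_count` and the use of `itemgetter` and `operator.add` are choices made for this illustration, not part of the notebook. `collections.Counter` is used only to cross-check the result.

```python
from collections import Counter
from functools import reduce
from itertools import groupby
from operator import add, itemgetter

def word_count(words):
    """Count words with explicit map, shuffle and reduce phases."""
    mapped = [(word, 1) for word in words]        # mapping
    shuffled = sorted(mapped, key=itemgetter(0))  # shuffling
    return [                                      # reducing
        (key, reduce(add, (count for _, count in group)))
        for key, group in groupby(shuffled, key=itemgetter(0))
    ]

words = ['Deer', 'Bear', 'River', 'Car',
         'Car', 'River', 'Deer', 'Car', 'Bear']
counts = word_count(words)
print(counts)  # [('Bear', 2), ('Car', 3), ('Deer', 2), ('River', 2)]

# Cross-check against the standard library's counter.
matches_counter = dict(counts) == dict(Counter(words))
print(matches_counter)  # True
```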

## Simple Map Reduce Illustration

The script below prints the output of each stage (mapping, shuffling and reducing) for a smaller dataset.

In [17]:
! python simple.py

Mapping: [('Deer', 1), ('Bear', 1), ('River', 1), ('Car', 1), ('Car', 1), ('River', 1), ('Deer', 1), ('Car', 1), ('Bear', 1)]
Shuffling: [('Bear', 1), ('Bear', 1), ('Car', 1), ('Car', 1), ('Car', 1), ('Deer', 1), ('Deer', 1), ('River', 1), ('River', 1)]
Reducing: [('Bear', 2), ('Car', 3), ('Deer', 2), ('River', 2)]


## Single Machine and Multiple Machines

This part processes a larger dataset, `pg2701.txt`. The task is to find the most common words in the book **Moby Dick; or The Whale**.

### Single Machine

If the code is written without the idea of Map Reduce, a single node has to do every task in the job:

- Read the file
- Find all the words
- Keep a count for each distinct word in memory
- Sort the words by number of occurrences, highest first
- Print the top N words
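
The steps above can be sketched in a few lines. This is not the contents of `single_machine.py`, which the notebook does not show; the function `top_words`, the regular expression, and the made-up sample sentence are all assumptions for illustration. To run it on the book, the text would be read from `pg2701.txt` instead.

```python
import re
from collections import Counter

def top_words(text, n):
    """Return the n most common words in text as (word, count) pairs."""
    # Find all the words (lowercased runs of letters; an assumed definition).
    words = re.findall(r"[a-z]+", text.lower())
    # Keep a count per distinct word in memory, then sort by count.
    return Counter(words).most_common(n)

# Made-up sample standing in for the contents of the file.
sample = "The whale and the sea and the ship."
top = top_words(sample, 2)
print(top)  # [('the', 3), ('and', 2)]
```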

In [18]:
%%time

! python single_machine.py

max: the = 14620
[["the", 14620], ["project", 91], ["gutenberg", 94], ["ebook", 10], ["of", 6732], ["moby", 89], ["dick", 89], ["or", 797], ["whale", 1233], ["by", 1226]]
CPU times: user 6.79 ms, sys: 8.12 ms, total: 14.9 ms
Wall time: 752 ms


### Multiple Machines with MapReduce Algorithm

The job is divided into separate tasks, and each task can be performed on **a separate node**. Nodes can share the results of their tasks over the network or through the disk.

- Mapping emits a key-value pair for each input item, for example `(word, 1)`
- Shuffling goes through the mapped pairs and sorts them, so that pairs with the same key end up next to each other
- Reducing collapses all the pairs that share a key into a single pair holding the key and the aggregated value
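
As a rough sketch of how the stages might hand results to each other through the disk, each function below reads its input from one file and writes its output to another. The scripts `mapping.py`, `shuffling.py` and `reducing.py` are not shown in the notebook, so the function bodies, the JSON file format and the file names here are assumptions, not the actual implementation.

```python
import json
import tempfile
from itertools import groupby
from pathlib import Path

def mapping(text, out_path):
    """Emit a [word, 1] pair per word and write the pairs to disk."""
    pairs = [[word, 1] for word in text.lower().split()]
    out_path.write_text(json.dumps(pairs))

def shuffling(in_path, out_path):
    """Read the mapped pairs, sort them by key, write them back to disk."""
    pairs = json.loads(in_path.read_text())
    out_path.write_text(json.dumps(sorted(pairs)))

def reducing(in_path):
    """Read the sorted pairs and sum the values for each key."""
    pairs = json.loads(in_path.read_text())
    return [(key, sum(count for _, count in group))
            for key, group in groupby(pairs, key=lambda p: p[0])]

# A temporary directory stands in for storage shared between nodes.
with tempfile.TemporaryDirectory() as tmp:
    mapped_file = Path(tmp) / "mapped.json"      # hypothetical file name
    shuffled_file = Path(tmp) / "shuffled.json"  # hypothetical file name
    mapping("deer bear river car car river deer car bear", mapped_file)
    shuffling(mapped_file, shuffled_file)
    stage_counts = reducing(shuffled_file)

print(stage_counts)  # [('bear', 2), ('car', 3), ('deer', 2), ('river', 2)]
```

Because each stage only depends on a file, the three functions could in principle run as separate processes or on separate nodes, as long as they can all reach the same storage.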

In [19]:
%%time

! python mapping.py

CPU times: user 6.35 ms, sys: 7.62 ms, total: 14 ms
Wall time: 697 ms


In [20]:
%%time

! python shuffling.py

CPU times: user 44.4 ms, sys: 19.3 ms, total: 63.7 ms
Wall time: 3.38 s


In [21]:
%%time

! python reducing.py

max: the = 14620
CPU times: user 4.18 ms, sys: 6.27 ms, total: 10.5 ms
Wall time: 613 ms
