# Exercise 11
Deadline: Tuesday, May 17 (end of day) 

##### MapReduce
MapReduce is a programming model and an associated implementation for processing and generating big data sets with a parallel, distributed algorithm on a cluster. 
A MapReduce program is composed of a *map* procedure, which performs filtering and sorting, and a *reduce* method, which performs a summary operation.

![alt text](https://miro.medium.com/max/1400/1*DOdiH_em5zWcZivW7ZCYxw.png "mapreduce")

#### A basic MapReduce example:
Suppose we have a list of strings and want to identify the longest one. The following Python function returns the longest string by simply iterating over the entire list.

In [1]:
def find_longest_string(list_of_strings):
    longest_string = None
    longest_string_len = 0 
    for s in list_of_strings:
        if len(s) > longest_string_len:
            longest_string_len = len(s)
            longest_string = s
    return longest_string

In [2]:
strings = ['find', 'longest', 'word', 'in', 'list']
%time print(find_longest_string(strings)) # the %time magic command prints the time it takes to run the statement
# the reported times are 0 ns because a list of five strings is processed faster than the timer can resolve

longest
CPU times: total: 0 ns
Wall time: 0 ns


What happens if we feed a list of 50 million strings?

In [7]:
strings = ['find', 'longest', 'word', 'in', 'list'] * 10**7
%time print(find_longest_string(strings))

longest
CPU times: total: 578 ms
Wall time: 1.32 s


This is a problem, as the computation takes too long. We can break our code into smaller components and try to execute them in parallel. The idea is the following:
1. break data into many chunks
2. execute our function on every chunk in parallel
3. find the longest string among the outputs of all chunks  

More specifically, we compute the `len` of each string, pair every string with its length, and then select the pair with the `max` length.

In [8]:
%%time
# prints the time it takes to run the cell

# step 1:
list_of_lengths = [len(s) for s in strings]
list_of_lengths = zip(strings, list_of_lengths)

# step 2:
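max_length = max(list_of_lengths, key=lambda x: x[1]) # the anonymous key function tells max to compare only the element at index 1 (the length)
print(max_length)
# this likely takes longer than the plain loop above because it first builds a list of 50 million lengths and then calls the lambda once per (string, length) pair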

('longest', 7)
CPU times: total: 1.34 s
Wall time: 3.95 s


We can reformulate this idea by introducing a *mapper* and a *reducer*. The mapper is just the `len` function, whereas the reducer takes two (string, length) tuples and returns the one with the larger length.

In [9]:
mapper = len

def reducer(p, c):
    return p if p[1] > c[1] else c

##### Short Theory 

Mapping: consists of applying a transformation function to an iterable to produce a new iterable. Items in the new iterable are produced by calling the transformation function on each item in the original iterable.

Filtering: consists of applying a predicate or Boolean-valued function to an iterable to generate a new iterable. Items in the new iterable are produced by filtering out any items in the original iterable that make the predicate function return false.

Reducing: consists of applying a reduction function to an iterable to produce a single cumulative value.
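
As a small illustration of the three operations, the sketch below applies them to a made-up list of words. The list and the variable names are assumptions for this example only and are not part of the exercise.

```python
from functools import reduce

words = ['map', 'filter', 'reduce', 'in', 'python']  # made-up sample data

# mapping: transform every item (here: word -> length)
lengths = list(map(len, words))

# filtering: keep only the items for which the predicate is true
long_words = list(filter(lambda w: len(w) > 3, words))

# reducing: fold the iterable into a single cumulative value (here: total length)
total_length = reduce(lambda acc, n: acc + n, lengths)

print(lengths)       # [3, 6, 6, 2, 6]
print(long_words)    # ['filter', 'reduce', 'python']
print(total_length)  # 23
```

In Python 3, `map` and `filter` return lazy iterators, so they are wrapped in `list` here to display the results.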

We can now rewrite our code in the MapReduce style using Python's built-in `map` function and `reduce` from the `functools` module:

In [11]:
%%time
from functools import reduce

# step 1:

# each string is 1:1 mapped to its length via the map function:
# in Python 3, map returns a lazy iterator that yields the mapper's value for each element of the strings list
mapped = map(mapper, strings)

# uncomment this to take a look at the first 5 values produced by the map operation above
# (note that list() consumes the iterator, so `mapped` has to be re-created afterwards):
# print(list(mapped)[:5])

# associate the strings to their length:
mapped = zip(strings, mapped)

# print(list(mapped)[:5]) 

# step 2:
# we get the pair (string, length) where length is maximum:
# the reducer function is cumulatively applied to the mapped data
reduced = reduce(reducer, mapped)

print(reduced)

('longest', 7)
CPU times: total: 1.8 s
Wall time: 3.96 s


At this point the code does exactly the same thing as before (hence the similar timing) since we have not parallelized anything yet. Let's split our `strings` into chunks of nearly equal size using a helper function. We then iterate over the chunks, store the longest word of each chunk, and finally reduce over these results to find the longest word in the entire list.

In [12]:
def splitter(a, n_chunks):
    '''
    Splits the list "a" in "n_chunks" chunks.
    '''
    k, m = divmod(len(a), n_chunks)
    return [a[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(n_chunks)]

#### What does divmod() do?

For integers, the return value is the same as (a // b, a % b).

For floating point numbers the return value is (q, a % b), where q is usually math.floor(a / b) which is the whole part of the quotient.
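
To make the role of `divmod` in the helper more concrete, here is a minimal sketch on a made-up list of 10 integers. The `splitter` logic is repeated only so that the example runs on its own.

```python
def splitter(a, n_chunks):
    # same logic as the helper above, repeated so the example is self-contained
    k, m = divmod(len(a), n_chunks)
    return [a[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(n_chunks)]

print(divmod(10, 3))   # (3, 1): quotient 3, remainder 1
print(divmod(7.5, 2))  # (3.0, 1.5)

# 10 items in 3 chunks: k = 3 items per chunk, and the m = 1 leftover item
# is assigned to the first chunk
chunks = splitter(list(range(10)), 3)
print(chunks)          # [[0, 1, 2, 3], [4, 5, 6], [7, 8, 9]]
```

The `min(i, m)` terms shift the slice boundaries so that the first `m` chunks each receive one extra item, which keeps the chunk sizes within one item of each other.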

In [13]:
data_chunks = splitter(strings, 50)
print(f'Each of the {len(data_chunks)} chunks has {len(data_chunks[0])} strings.')

Each of the 50 chunks has 1000000 strings.


In [14]:
%%time

# step 1:
reduced_all = []

# for each chunk of data we perform the map and reduce operations
for chunk in data_chunks:
    mapped_chunk = map(mapper, chunk)
    mapped_chunk = zip(chunk, mapped_chunk)
    
    reduced_chunk = reduce(reducer, mapped_chunk)
    # store the result of the reduce operation 
    reduced_all.append(reduced_chunk)

# step 2:
# reduce over all the chunks
global_reduced = reduce(reducer, reduced_all)
print(global_reduced)

# the time does not really decrease because the chunks are still processed one after another in a single process

('longest', 7)
CPU times: total: 1.64 s
Wall time: 3.97 s


We can unify the `map` and `reduce` in the for loop in a single function. This will help us when we run the code in parallel.

In [15]:
%%time

def chunks_mapper(chunk):
    mapped_chunk = map(mapper, chunk)
    mapped_chunk = zip(chunk, mapped_chunk)
    return reduce(reducer, mapped_chunk)

# step 1:
mapped = map(chunks_mapper, data_chunks)  # the chunks mapper is applied to each of the data chunks

# step2:
global_reduced = reduce(reducer, mapped)
print(global_reduced)

('longest', 7)
CPU times: total: 1.36 s
Wall time: 3.9 s


The final ingredient is to parallelize the `for` loop (step 1) with the `multiprocessing` module, using `pool.map` instead of the regular `map` function.
***NOTE: this part might not work on Mac/Windows with Python >3.8!***

In [None]:
%%time
from multiprocessing import Pool

# create a Pool object
# the processes parameter defines how many worker processes (not threads) the job is split across
pool = Pool(processes=32)

# we want a separate process for each chunk of data
# processes == number of chunks
data_chunks = splitter(strings, 32)

mapper = len
# step 1: now runs in parallel --> use pool.map instead of just map
mapped = pool.map(chunks_mapper, data_chunks)

# step 2:
global_reduced = reduce(reducer, mapped)
pool.close()
print(global_reduced)
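
If the cell above fails on Mac or Windows, one commonly suggested workaround is to move the code into a plain script and guard the entry point with `if __name__ == '__main__':`. On these platforms the worker processes are typically started fresh rather than forked, so they must be able to import the functions they run. The following is a sketch under that assumption, not a tested replacement for the cell above; `longest_string_parallel` and `n_processes` are hypothetical names introduced for illustration.

```python
from functools import reduce
from multiprocessing import Pool


def splitter(a, n_chunks):
    """Split the list a into n_chunks chunks of nearly equal size."""
    k, m = divmod(len(a), n_chunks)
    return [a[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(n_chunks)]


def reducer(p, c):
    """Keep the (string, length) pair with the larger length."""
    return p if p[1] > c[1] else c


def chunks_mapper(chunk):
    """Map every string in the chunk to (string, length) and reduce to the longest."""
    return reduce(reducer, zip(chunk, map(len, chunk)))


def longest_string_parallel(strings, n_processes=4):
    """Hypothetical helper: run the map step in a pool of worker processes."""
    data_chunks = splitter(strings, n_processes)
    # the context manager shuts the pool down when the block is left
    with Pool(processes=n_processes) as pool:
        mapped = pool.map(chunks_mapper, data_chunks)
    return reduce(reducer, mapped)


if __name__ == '__main__':
    # smaller made-up input than in the notebook, just to keep the sketch quick
    strings = ['find', 'longest', 'word', 'in', 'list'] * 1000
    print(longest_string_parallel(strings))
```

Saved to a file and run with `python`, this should print `('longest', 7)`. Whether it is faster than the sequential version depends on the number of cores and on the cost of sending the chunks to the workers.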

---
Suppose we have a very large collection of articles or tweets and we would like to find the 10 most frequently used words in the entire collection. We are going to use MapReduce on a dataset provided through sklearn.

In [1]:
from sklearn.datasets import fetch_20newsgroups
news = fetch_20newsgroups()

extend = 1
data = news.data * extend
print('Number of articles:', len(data))

Number of articles: 11314


We need to do some preliminary cleaning of the dataset: clean the words (strip punctuation and convert to lowercase), then remove stop words and non-alphabetic tokens.

In [7]:
import re
# depending on your scikit-learn version, you might need to import stop_words instead of _stop_words
from sklearn.feature_extraction import _stop_words

def clean_word(word):
    return re.sub(r'[^\w\s]','',word).lower()

def word_not_in_stopwords(word):
    return word not in _stop_words.ENGLISH_STOP_WORDS and word and word.isalpha()
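
To see what the two helpers do to individual tokens, here is a minimal sketch on a made-up sentence. To keep it independent of scikit-learn, it uses a tiny stand-in stop-word set (`SAMPLE_STOP_WORDS`) and a correspondingly renamed filter function; both are assumptions for this example only.

```python
import re

# tiny stand-in for ENGLISH_STOP_WORDS, assumed for this example only
SAMPLE_STOP_WORDS = {'the', 'is', 'a', 'to'}

def clean_word(word):
    # drop everything that is neither a word character nor whitespace, then lowercase
    return re.sub(r'[^\w\s]', '', word).lower()

def word_not_in_sample_stopwords(word):
    # keep non-empty, purely alphabetic tokens that are not stop words
    return bool(word) and word not in SAMPLE_STOP_WORDS and word.isalpha()

tokens = "The subject is: don't reply to 42 e-mails!".split()
cleaned = list(map(clean_word, tokens))
kept = list(filter(word_not_in_sample_stopwords, cleaned))

print(cleaned)  # ['the', 'subject', 'is', 'dont', 'reply', 'to', '42', 'emails']
print(kept)     # ['subject', 'dont', 'reply', 'emails']
```

Because the apostrophe is removed rather than treated as a separator, `don't` becomes `dont`, which is why such a token can show up in the word counts.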

The following function finds the top 10 words in the entire collection. We use a `Counter` object to keep track of individual words and their counts through its `update` method. Individual tokens in the text have to be cleaned and filtered for `ENGLISH_STOP_WORDS`, so we apply the two functions given above together with the built-in `map` and `filter` routines.

* We fill in the function below: every word in the text is mapped to its 'cleaned' version obtained through the function `clean_word`, and the result is filtered with `word_not_in_stopwords`. Then we use the `find_top_words` function to compute the 10 most common words in the articles of the dataset.

In [10]:
from collections import Counter

def find_top_words(data):
    cnt = Counter() # the counter object
    for text in data: # loop over the articles in the dataset
        tokens_in_text = text.split() # split the text into whitespace-separated tokens

        # map every token to its cleaned version using clean_word()
        tokens_in_text = list(map(clean_word, tokens_in_text))
        
        # filter the tokens for word_not_in_stopwords() using filter()
        tokens_in_text = filter(word_not_in_stopwords, tokens_in_text)
        
        # update the counter object with the cleaned texts
        cnt.update(tokens_in_text)
    
    # return the 10 most common words in the texts in the counter object
    return cnt.most_common(10)

In [4]:
find_top_words(data)

[('subject', 12252),
 ('lines', 11824),
 ('organization', 11185),
 ('writes', 7836),
 ('article', 6754),
 ('people', 5832),
 ('dont', 5813),
 ('like', 5757),
 ('just', 5579),
 ('university', 5544)]

* We implement the `mapper`, `reducer` and `chunk_mapper` functions for this specific problem.

In [4]:
# We want this function to accept some text to:
# - split it into words
# - to clean the words
# - return a counter object created out of these cleaned words (already implemented below)

from functools import reduce


def mapper(text):
    text = text.split()
    tokens_in_text = map(clean_word, text)
    tokens_in_text = filter(word_not_in_stopwords, tokens_in_text)
    
    # print(Counter(tokens_in_text))
    return Counter(tokens_in_text)

# We want this function to update the counter object cnt1 with a second counter object cnt2
def reducer(cnt1, cnt2):
    cnt1.update(cnt2)
    return cnt1

# It has to map the data in the chunk with the mapper function and reduce them
# (the reduce operation consists in the cumulative update performed by the reducer)
def chunk_mapper(chunk):
    mapped = map(mapper, chunk)
    reduced = reduce(reducer, mapped)
    return reduced
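
The cumulative update performed by the reducer can be illustrated on a few made-up counters, standing in for what the mapper would return for three short articles. The counts below are invented for illustration.

```python
from collections import Counter
from functools import reduce

def reducer(cnt1, cnt2):
    # same reducer as above: add the counts of cnt2 to cnt1 in place
    cnt1.update(cnt2)
    return cnt1

# made-up per-article counters, as the mapper would return them
counters = [
    Counter({'space': 2, 'nasa': 1}),
    Counter({'space': 1, 'orbit': 2}),
    Counter({'nasa': 4}),
]

total = reduce(reducer, counters)
print(total.most_common(2))  # [('nasa', 5), ('space', 3)]
```

Note that `update` modifies `cnt1` in place, so the first counter in the list ends up holding the totals. This avoids creating a new `Counter` at every step of the reduction.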


* We split the dataset into 16 individual chunks through the `splitter` function (see above) and create a `Pool` object with 1 process per chunk.

In [8]:
from multiprocessing import Pool
data_chunks = splitter(data, 16)

# pool = Pool(processes=16)  # left commented out here, so the cells below use the built-in map

* We implement the MapReduce scheme (see the example above) using the functions we have just defined and display the 5 most common words. The parallel `pool.map` call is left commented out in the cell below, which therefore uses the built-in `map`.

In [9]:
%%time

# step 1:
# parallel version: mapped = pool.map(chunk_mapper, data_chunks)
mapped = map(chunk_mapper, data_chunks)

# step 2:
global_reduced = reduce(reducer, mapped)

# print the 5 most common words

print(*global_reduced.most_common(5))

('subject', 12252) ('lines', 11824) ('organization', 11185) ('writes', 7836) ('article', 6754)
Wall time: 10.2 s


---

We change the `extend` variable in the cell where we load the data from 1 to 5. What do we observe in the timings of the parallelized and non-parallelized cases? How do both timings scale with the `extend` factor?

In [8]:
# timings scaling with the extend factor

from sklearn.datasets import fetch_20newsgroups
news = fetch_20newsgroups()

extend = 5
data = news.data * extend

In [11]:
%%time

# non-parallelized approach

mapped = map(mapper, data)

reduced = reduce(reducer, mapped)

print(reduced.most_common(5))

[('subject', 61260), ('lines', 59120), ('organization', 55925), ('writes', 39180), ('article', 33770)]
Wall time: 49.6 s


In [12]:
%%time

# chunked approach (the built-in map is used here, so the chunks are still processed sequentially)

# generate chunks

data_chunks = splitter(data, 16 * extend)

mapped = map(chunk_mapper, data_chunks)

# step 2:
# reduced = reduce(reducer, mapped)
global_reduced = reduce(reducer, mapped)

# print global

print(global_reduced.most_common(5))

[('subject', 61260), ('lines', 59120), ('organization', 55925), ('writes', 39180), ('article', 33770)]
Wall time: 49.1 s


##### Results

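By increasing the data by a factor of 5, the calculation time to generate the reduced counter object increases by a factor of about 4.8 for the chunked version (from 10.2 s to 49.1 s), i.e. roughly linearly with the `extend` factor. The non-chunked run takes 49.6 s for `extend = 5`, which is practically the same, as both cells use the built-in `map` and therefore run sequentially.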

---

Using the MapReduce paradigm, we find the word with the largest number of consonants in the text contained in the `some.txt` file. Contracted forms like "I'm" or "don't", and genitive forms like "hunter's", shall be considered as different words from their non-contracted, or non-genitive, versions.

In [18]:
# We find the word with the largest number of consonants in the text

from collections import Counter
from functools import reduce

# read the first line of the file; the context manager closes the file afterwards
with open('data/some.txt') as raw:
    some = raw.readlines()[0]

cnt = Counter()

# define functions that clean strings, count consonants and reduce mapping


def string_cleaner(string):
    trash = "()?'-,"
    for t in trash:
        string = string.replace(t, " ")

    string = string.replace('  ',' ')
    
    return string.split() 

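def cons_counter(word):
    # count the letters case-insensitively so that uppercase vowels are not mistaken for consonants
    letters = Counter(word.lower())
    # drop the vowels; the remaining counts belong to the consonants
    del letters['a'], letters['e'], letters['i'], letters['o'], letters['u']

    return (sum(letters.values()), word)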

def reducer(word1, word2):
    return word1 if word1[0] > word2[0] else word2

# map and reduce a single chunk (this function could be passed to pool.map for parallel execution)

def chunk_mapper(chunk):
    mapped = map(cons_counter, chunk)
    reduced = reduce(reducer, mapped)
    return reduced

In [19]:
# call the functions

some = string_cleaner(some)
chunks = splitter(some, 30)

# cons_counter('elsa')

mapped = map(chunk_mapper, chunks)

global_reduced = reduce(reducer, mapped)

print(('The word with the most consonants is `{}` which contains {} consonants').format(global_reduced[1], global_reduced[0]))

The word with the most consonants is `Searchlights` which contains 9 consonants
