## Devising the MapReduce solution

In [1]:
# Toy input for the map/reduce walkthrough: the integers 0 through 9.
L = list(range(10))

In [20]:
def mapper(fun, *iter):
    """Lazily apply ``fun`` to items drawn in lockstep from the given iterables.

    The i-th items of every iterable are passed together as positional
    arguments to ``fun``; iteration stops as soon as the shortest iterable
    is exhausted. Nothing is computed until the generator is consumed.

    Args:
        fun: Callable taking as many positional arguments as there are iterables.
        *iter: One or more iterables to walk in parallel.

    Yields:
        The result of ``fun`` for each tuple of aligned items.
    """
    # NOTE: inside this function the name ``iter`` hides the built-in iter().
    yield from (fun(*args) for args in zip(*iter))

# Map step: square every element of L, materialising the lazy generator into a list.
m = [squared for squared in mapper(lambda value: value ** 2, L)]
print(m)

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]


In [22]:
def reducer(fun, seq):
    """Fold ``seq`` from the left into a single value using ``fun``.

    Computes ``fun(fun(fun(s0, s1), s2), ...)``. A one-element input is
    returned as-is without calling ``fun``.

    The fold is iterative, so it works on inputs of any length (no recursion
    limit, no repeated list slicing) and accepts any iterable.

    Args:
        fun: Callable of two arguments ``(accumulated, item)``.
        seq: Non-empty iterable of values to combine.

    Returns:
        The accumulated result.

    Raises:
        TypeError: If ``seq`` is empty (same convention as functools.reduce
            without an initial value).
    """
    items = iter(seq)
    try:
        accumulated = next(items)
    except StopIteration:
        # There is no sensible result without at least one element.
        raise TypeError(
            'reducer() of empty sequence with no initial value') from None
    for item in items:
        accumulated = fun(accumulated, item)
    return accumulated

# Complete toy pipeline: square each element (map), then add the squares up (reduce).
L = list(range(10))
m = [*mapper(lambda value: value ** 2, L)]
r = reducer(lambda total, item: total + item, m)
print(r)

285


## Demonstrating counting words

In [25]:
from  urllib import request

# Download the plain-text book used for the word-count demonstration.
# NOTE(review): the saved output of this cell shows the mirror answering
# "HTTP Error 403: Forbidden" — confirm the URL is still reachable.
url = 'http://gutenberg.readingroo.ms/2/6/0/2600/2600.txt'
# The context manager closes the HTTP connection even if reading or decoding fails.
with request.urlopen(url) as response:
    # Skip the first 627 characters — presumably the Project Gutenberg preamble
    # before the title; verify the offset against the current file.
    text = response.read().decode('utf-8')[627:]

HTTPError: HTTP Error 403: Forbidden

In [5]:
# Show the first 37 characters of the downloaded text as a quick sanity check.
print(text[0:37])

WAR AND PEACE

By Leo Tolstoy/Tolstoi


In [6]:
# Tokenise on runs of whitespace; punctuation stays attached to the words.
words = text.split()
print('Number of words: %i' % (len(words),))

Number of words: 566218


In [7]:
import os
if os.name == "nt":
    #Safer multithreading on Windows
    from multiprocessing.dummy import Pool
else:
    #Multiprocessing on Linux,Mac
    from multiprocessing import Pool
    
from multiprocessing import cpu_count
from functools import partial

def remove_punctuation(text):
    """Return ``text`` with every '.', ',', '!', '?' and '"' character removed.

    Only these five characters are dropped; everything else (apostrophes,
    hyphens, semicolons, whitespace, ...) passes through unchanged.
    """
    dropped = ('.', ',', '!', '?', '"')
    return ''.join(filter(lambda ch: ch not in dropped, text))

def count_words(list_of_words, keywords):
    """Map step: emit a ``(keyword, 1)`` pair for every keyword occurrence.

    Each word is upper-cased and passed through ``remove_punctuation`` before
    being compared, so keywords are expected in upper case without the
    punctuation that helper removes.

    Args:
        list_of_words: Iterable of raw word strings.
        keywords: Iterable of keywords to look for; a keyword listed twice
            is emitted twice per match.

    Returns:
        A list of ``(keyword, 1)`` tuples in order of occurrence.
    """
    results = list()
    for word in list_of_words:
        # Normalise once per word rather than once per (word, keyword) pair.
        normalized = remove_punctuation(word.upper())
        for keyword in keywords:
            if keyword == normalized:
                results.append((keyword, 1))
    return results

def Partition(data, size):
    """Split ``data`` into consecutive slices of at most ``size`` items.

    The last slice holds the remainder and may be shorter. Empty input
    yields an empty list.

    Args:
        data: Any sliceable sequence with a length.
        size: Number of items per slice.

    Returns:
        A list of slices of ``data`` in their original order.
    """
    chunks = []
    for start in range(0, len(data), size):
        chunks.append(data[start:start + size])
    return chunks

def Distribute(function, data, cores):
    """Apply ``function`` to every element of ``data`` using a pool of workers.

    Args:
        function: Callable taking one element of ``data``.
        data: Iterable of work items (one call per item).
        cores: Number of workers to start in the pool.

    Returns:
        A list with one result per item, in the same order as ``data``.

    Raises:
        Whatever ``function`` raises for an item is propagated by ``pool.map``.
    """
    pool = Pool(cores)
    try:
        results = pool.map(function, data)
    finally:
        # Always release the workers, even when a task fails: close() stops
        # new submissions and join() waits for the workers to exit.
        pool.close()
        pool.join()
    return results

def Shuffle_Sort(L):
    """Shuffle step: regroup the mappers' output so equal keys sit together.

    Args:
        L: Iterable of sublists (one per mapper), each containing
            ``(key, value)`` pairs.

    Returns:
        A list of groups, one per distinct key, each group being the list of
        original pairs carrying that key. Groups follow the dictionary's
        iteration order; despite the name, no explicit sort is performed.
    """
    grouped = {}
    for mapper_output in L:
        for pair in mapper_output:
            # Unpacking also checks that every item really is a 2-element pair.
            key, _ = pair
            grouped.setdefault(key, []).append(pair)
    return list(grouped.values())

def Reduce(Mapping):
    """Reduce step: collapse one group of ``(key, value)`` pairs into a total.

    Args:
        Mapping: Non-empty sequence of ``(key, value)`` pairs; the key of the
            first pair is used as the label for the whole group.

    Returns:
        A ``(key, total)`` tuple where ``total`` is the sum of all values.
    """
    label = Mapping[0][0]
    total = sum(count for _, count in Mapping)
    return (label, total)

In [8]:
# Number of CPU cores reported by the operating system.
n = cpu_count()
print('You have %i cores available for MapReduce' % (n,))

You have 8 cores available for MapReduce


In [9]:
# Map stage: bind the keyword list so each worker only needs its chunk of words.
Map = partial(count_words, keywords=['WAR', 'PEACE', 'RUSSIA', 'NAPOLEON'])
# A chunk size of len(words)//n+1 keeps the number of chunks at or below n.
map_result = Distribute(Map, Partition(words, len(words)//n+1), n)
print('map_result is a list made of %i elements' % len(map_result))
# NOTE(review): the ']' after %s is a literal character, so the preview is
# printed with a doubled closing bracket.
print('Preview of one element: %s]' % map_result[0][:5])

map_result is a list made of 8 elements
Preview of one element: [('WAR', 1), ('PEACE', 1), ('WAR', 1), ('WAR', 1), ('RUSSIA', 1)]]


In [10]:
# Shuffle stage: regroup the per-chunk pairs so that each keyword gets its own list.
Shuffled = Shuffle_Sort(map_result)
print('Shuffled is a list made of %i elements' % len(Shuffled))
# NOTE(review): the ']' after %s is a literal character, so each preview is
# printed with a doubled closing bracket.
print('Preview of first element: %s]' % Shuffled[0][:5])
print('Preview of second element: %s]' % Shuffled[1][:5])

Shuffled is a list made of 4 elements
Preview of first element: [('RUSSIA', 1), ('RUSSIA', 1), ('RUSSIA', 1), ('RUSSIA', 1), ('RUSSIA', 1)]]
Preview of second element: [('PEACE', 1), ('PEACE', 1), ('PEACE', 1), ('PEACE', 1), ('PEACE', 1)]]


In [11]:
# Reduce stage: total each keyword group in parallel, one task per keyword.
result = Distribute(Reduce, Shuffled, n)
print('Emitted results are: %s' % (result,))

Emitted results are: [('RUSSIA', 156), ('PEACE', 111), ('NAPOLEON', 469), ('WAR', 288)]


In [27]:
import urllib.request
# Download a second book to show that the same pipeline is reusable.
# NOTE(review): the saved output of this cell shows a connection-refused
# URLError — confirm the URL is reachable from the execution environment.
url = "https://gutenberg.org/files/1661/1661-0.txt"
# The context manager closes the HTTP connection even if reading or decoding fails.
with urllib.request.urlopen(url) as response:
    # Skip the first 723 characters — presumably the Project Gutenberg preamble
    # before the title; verify the offset against the current file.
    text = response.read().decode('utf-8')[723:]
words = text.split()

URLError: <urlopen error [WinError 10061] Impossibile stabilire la connessione. Rifiuto persistente del computer di destinazione>

In [13]:
# Show the opening 65 characters of the text and the size of the word list.
print(text[0:65])
print('\nTotal words are %i' % (len(words),))

THE ADVENTURES OF SHERLOCK HOLMES

by

SIR ARTHUR CONAN DOYLE

Total words are 107431


In [14]:
# Whole pipeline in one expression: map -> shuffle -> reduce for a new keyword list.
Map = partial(count_words, keywords=['WATSON', 'ELEMENTARY'])
# The chunk size is len(words)//n + 1, matching the first pipeline: it can never
# be 0 (which would make range() inside Partition raise ValueError when there
# are fewer words than cores) and it never produces more than n chunks.
# The reduce stage runs on a single worker because only two small groups remain.
result = Distribute(
    Reduce,
    Shuffle_Sort(
        Distribute(Map, Partition(words, len(words)//n + 1), n)
    ),
    1,
)
print('Emitted results are: %s' % result)

Emitted results are: [('ELEMENTARY', 1), ('WATSON', 81)]
