# Introducción a MapReduce



En este notebook vamos a ver la técnica MapReduce, que es una técnica ampliamente utilizada para tratar grandes cantidades de datos. Existen múltiples implementaciones de MapReduce, incluido el famoso Apache Hadoop. Vamos a ver el concepto de forma intuitiva y con algunos ejemplos.



```
# This is formatted as code
```

Comenzaremos con una tarea básica: dada una lista de cadenas de texto, buscar cual es la cadena más larga. Esto es bastante simple de hacer.

In [1]:
def find_longest_string(list_of_strings):
    longest_string = None
    longest_string_len = 0 
    for s in list_of_strings:
        if len(s) >= longest_string_len:
            longest_string_len = len(s)
            longest_string = s
    return longest_string

Para una pequeña lista esto funciona razonablemente rápido:

In [2]:
list_of_strings = ['abc', 'python', 'wxyz']

%time max_length = print(find_longest_string(list_of_strings))

python
CPU times: user 1.93 ms, sys: 0 ns, total: 1.93 ms
Wall time: 1.79 ms


Pero si en vez de 3 elementos tuviéramos 30.000.000?  

In [3]:
large_list_of_strings = list_of_strings*10000000
%time max_length = max(large_list_of_strings, key=len)


CPU times: user 1.37 s, sys: 4.33 ms, total: 1.37 s
Wall time: 1.37 s


El tiempo de respuesta es ya de algunos segundos para una operación muy simple.

Una forma de mejorar el tiempo de cálculo es usando una CPU más potente y más rápida. El escalado de su sistema mediante el uso de hardware mejor y más rápido se denomina **escalado vertical**. Pero está solución no funciona siempre o no es posible.

En vez de esa opción, se podría intentar un **escalado horizontal**, diseñando el código para que pueda ejecutarse en paralelo y ser más rápido cuando se añadan más procesadores o CPUs.



Para hacer eso, hay que dividir el código en componentes más pequeños y ejecutar cálculos en paralelo de la siguiente manera:

1.   dividir nuestros datos en fragmentos,
2.   ejecutar la función `find_longest_string` en cada fragmento en paralelo y
3.  encontrar la cadena más larga entre las salidas de todos los fragmentos.

El código de la función `find_longest_string` lo vamos a dividir en dos pasos:

1.   Calcular la longitud `len` de todas las cadenas
2.   Obtener el valor máximo `max`



In [4]:
%%time
# paso 1:
list_of_string_lens = [len(s) for s in large_list_of_strings]
list_of_string_lens = zip(large_list_of_strings, list_of_string_lens)

# paso 2:
max_len = max(list_of_string_lens, key=lambda t: t[1])
print(max_len)

('python', 6)
CPU times: user 5.56 s, sys: 455 ms, total: 6.01 s
Wall time: 6.02 s


Ahora el código se ejecuta bastante más lento que antes porque en lugar de realizar una sola pasada por todas las cadenas, hace 2:  una para calcular la longitud y otra para encontrar el valor máximo. 

Entonces el paso 2 no tiene como entrada la lista original de cadenas sino los datos preprocesados. El paso 1 es un mapeador (***map***) pues asigna un valor a otro valor y el paso dos es un reductor (***reduce***) porque obtiene una lista de valores y produce un valor único.

```
mapper = len

def reducer (p,c):
    if p[1] > c[1]:
        return p
    return c
```

Reescribiendo el código utilizando funciones incluidas en Python `map` y `reduce`, incluidas en la librería `functools`.

In [5]:
%%time

import functools


# paso 1
mapped = map ( len , large_list_of_strings )
mapped = zip(large_list_of_strings, mapped)

# paso 2

reduced = functools.reduce ( lambda x, y: x if x[1] > y[1] else y, mapped )

CPU times: user 5.79 s, sys: 8.33 ms, total: 5.8 s
Wall time: 5.8 s


Este código hace exactamente lo mismo pero es más genérico y paralelizable.

Para paralelizar vamos a dividir la entrada en trozos (chunks) de igual tamaño con la función `chunks`.

In [6]:
def chunks(l, n):
    n = max(1, n)
    return (l[i:i+n] for i in range(0, len(l), n))

In [7]:
%%time
data_chunks = chunks ( large_list_of_strings , 30)

# paso 1 

reduced_all = []

for chunk in data_chunks:
  mapped_chunk = map ( len , chunk )
  mapped_chunk = zip(chunk, mapped_chunk)

  reduced_chunk = functools.reduce ( lambda x, y: x if x[1] > y[1] else y, mapped_chunk )
  reduced_all.append (reduced_chunk)

# paso 2

reduced = functools.reduce ( lambda x, y: x if x[1] > y[1] else y,reduced_all)
print (reduced)

('python', 6)
CPU times: user 7.94 s, sys: 78.1 ms, total: 8.01 s
Wall time: 8.03 s


Refeactorizando para dejar el proceso con dos funciones obtenemos:

In [8]:
%%time
def chunks_mapper(chunk):
    mapped_chunk = map(len , chunk) 
    mapped_chunk = zip(chunk, mapped_chunk)
    return functools.reduce ( lambda x, y: x if x[1] > y[1] else y, mapped_chunk )



data_chunks = chunks ( large_list_of_strings , 30)

#paso 1:
mapped = map(chunks_mapper, data_chunks)

#paso 2
reduced = functools.reduce ( lambda x, y: x if x[1] > y[1] else y,mapped)

print (reduced)


('python', 6)
CPU times: user 7.2 s, sys: 16.4 ms, total: 7.21 s
Wall time: 7.22 s


A continuación vamos a intentar paralelizar el paso 1 usando el módulo `multiprocesing` con la función `pool.map` en vez de la función `map` normal.

In [9]:
%%time

import multiprocessing as mp 
pool = mp.Pool(16)


data_chunks = chunks ( large_list_of_strings , 16)

# paso 1 
mapped = pool.map(chunks_mapper, data_chunks)

# paso 2
reduced = functools.reduce ( lambda x, y: x if x[1] > y[1] else y,mapped)

print (reduced)

('python', 6)
CPU times: user 8.6 s, sys: 804 ms, total: 9.4 s
Wall time: 32.2 s


Como podéis ver la mejora no es importante (cuando no empeora los tiempos) aunque son por los problemas de paralelización del entorno y la exclusión mútua sobre la variable `mapped`.

En cualquier caso, se ha arquitecturado una solución usando las funciones `map`y `reduce` que puede ejecutarse en paralelo. Esta arquitectura tiene dos ventajas:



1.   Es escalable: si hay más datos se pueden añadir más unidades de procesamiento sin cambiar el código
2.   Es genérica: esta arquitectura permite una gran cantidad de tareas reemplazando las funciones `map` y `reduce`.

En ambos casos se supone que los datos son enormes y estáticos. Lo que implica que dividir en fragmentos cada vez es poco eficiente y redundante. Por lo que se supone que los datos se almacenan en fragmentos (o *shards*) 
de origen.




---


Ahora vamos a ver otro caso. Tenemos una texto relativamente largo que es la Declaración Universal de Derechos Humanos. 



In [10]:
import nltk
nltk.download('udhr')

[nltk_data] Downloading package udhr to /root/nltk_data...
[nltk_data]   Unzipping corpora/udhr.zip.


True

In [11]:
from nltk.corpus import udhr
import re

text_udhr = udhr.raw('Spanish-Latin1')
#text_udhr = udhr.raw('Catalan-Latin1')
text_udhr[:25]

'Declaración Universal de '

Realizando un limpieza de datos y convirtiéndolos a minúsculas para obtener un array de cadenas 

In [12]:
%%time

def clean_word ( word ):
  return re.sub(r'[^\w\s]','',word).lower()


clean_text = clean_word ( text_udhr)
tokens = clean_text.split ()
ls = find_longest_string (tokens)

print (ls, len(ls))



arbitrariamente 15
CPU times: user 2.06 ms, sys: 961 µs, total: 3.02 ms
Wall time: 8.77 ms


Y realizado con `map` y `reduce`.

In [None]:
%%time

# TODO
data_chunks = chunks ( tokens , 1)

#paso 1:
mapped = map(chunks_mapper, data_chunks)

#paso 2
reduced = functools.reduce ( lambda x, y: x if x[1] > y[1] else y,mapped)

print (reduced)



('arbitrariamente', 15)
CPU times: user 48.3 ms, sys: 9.96 ms, total: 58.3 ms
Wall time: 58.7 ms


MapReduce es una técnica esencial para procesar grandes cantidades de datos y que permite una gran cantidad de tareas como contar, buscar,  aprendizaje automático (supervisado y no supervisado), etc. 