# Map Reduce

## Procesamiento de información con MAP-REDUCE

### Un ejemplo
Tenemos un log de la siguiente manera

    2012-01-01 09:08 BOG Libros 88.56 Discover
    2012-01-01 09:52 BGA Libros 62.41 Discover
    2012-01-01 10:08 MED Musica 93.37 Visa
    2012-01-01 10:58 MED Musica 395.93 Discover
    2012-01-01 14:38 BOG Musica 113.24 MasterCard
    2012-01-01 14:44 BGA Libros 290.5 Visa
    2012-01-01 16:26 MED Musica 246.12 Efectivo

Con el `dia`, `hora`, `ciudad`, `producto`, `importe` y `medio de pago` de compras realizadas en almacenes de una cierta cadena. Queremos obtener el total del importe de compras realizadas en cada ciudad. Una forma de hacerlo es la siguiente:

1) procesamos cada linea y, de cada una, generamos un par `(ciudad, importe)`, obteniendo los siguientes pares:

    (BOG, 88.56)
    (BGA, 62.41)
    (MED, 93.37)
    (MED, 395.93)
    (BOG, 113.24)
    (BGA, 290.5)
    (MED, 246.12)
    
2) agrupamos las tuplas generadas por el valor del primer componente:

    (BOG, [88.56, 113.24])
    (BGA, [62.41, 290.5])
    (MED, [93.37, 395.93, 246.12])
     
3) sumamos los elementos de cada lista para cada ciudad:

    (BOG, 201.8)
    (BGA, 352.91)
    (MED, 735.42)
    



### Tres fases

El primer paso en el ejemplo anterior se denomina **MAP** y, para cada registro de entrada, genera una tupla de formato `(clave, valor)`.

El segundo paso se denomina **SHUFFLE** y lo que hace es recopilar todas las tuplas generadas en el MAP anterior de cada `clave` y construir una lista con todos los valores generados. Es decir, una tupla de formato `(clave, lista_de_valores)` para cada clave.

El tercer paso se denomina **REDUCE** y, para cada clave, agrega los resultados de la lista generada en el SHUFFLE anterior.

Esta forma de procesar los datos constituye un **modelo de programación** llamado *MAP-REDUCE* y que está implementado por varios frameworks de programación y en varios lenguajes, de forma que el programador **solo implementa las funciones MAP y REDUCE** y el framework se encarga del shuffle.

### mr-job


Usaremos el framework [mr-job](https://pythonhosted.org/mrjob) para hacer nuestros programas map-reduce. El siguiente código implementa el proceso que acabamos de describir. Fíjate cómo sólo programamos las funciones `mapper` y `reducer`.

In [11]:
!pip install mrjob
!mkdir -p files



[33mYou are using pip version 9.0.1, however version 10.0.1 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.[0m


In [1]:
%%writefile files/mr-basico.py
from mrjob.job import MRJob
import os

class Compras(MRJob):

    def mapper(self, _, line):
        tokens = line.split()
        yield tokens[3], float(tokens[4])

    def reducer(self, key, values):
        yield key, sum(values)
        
if __name__ == '__main__':
    c = Compras()
    c.run()


Writing files/mr-basico.py


In [2]:
%%script python files/mr-basico.py -r inline --quiet
2012-01-01 09:08 BOG Libros 88.56 Discover
2012-01-01 09:52 BGA Libros 62.41 Discover
2012-01-01 10:08 MED Musica 93.37 Visa
2012-01-01 10:58 MED Musica 395.93 Discover
2012-01-01 14:38 BOG Musica 113.24 MasterCard
2012-01-01 14:44 BGA Libros 290.5 Visa
2012-01-01 16:26 MED Musica 246.12 Efectivo

"Musica"	848.66
"Libros"	441.47


_Prueba a eliminar el término_ `--quiet` _de la celda anterior y así verás los mensajes que emite **mr-job** durante la ejecución del programa_. <font style="color: red"/>**Tendrás que hacer esto si tus programas map-reduce no funcionan como esperado y así ver los mensajes de error del código**

### Otro ejemplo

En este ejemplo partimos de una tuplas (personaA, personaB) que representa que la personaA considera a la personaB como amiga. La relación no es simétrica. El siguiente programa cuenta cuantas amigos considera cada persona que tiene.

Fíjate que la entrada es en formato JSON y cómo usamos la librería `json` de Python para convertir la entrada JSON en una lista de valores.

In [3]:
%%writefile files/mr-amigos.py
from mrjob.job import MRJob
import json

class Amigos(MRJob):

    def mapper(self, _, line):
        record = json.loads(line)
        yield record[0], 1

    def reducer(self, key, values):
        yield key, sum(values)
        
if __name__ == '__main__':
    c = Amigos()
    c.run()


Writing files/mr-amigos.py


In [4]:
%%script python files/mr-amigos.py -r inline --quiet
["juan", "pepe"]
["juan", "sebastian"]
["raul", "pepe"]
["ana", "pepe"]
["juan", "ana"]
["ana", "pedro"]

"juan"	3
"ana"	2
"raul"	1


## Instrumentación y runners

Usamos la instrumentación para encontrar errores, entender nuestro código, etc. Por ahora usamos `print` para saber cuantas veces se llama a cada función. Fíjate como el `reducer` se llama una vez por cada clave distinta que generamos en el `mapper`. Cuando usemos varios procesos o máquinas tendremos que recurrir a otros tipos de instrumentación.

Fíjate también que `values` en la función `reducer` es un **generador** [[ref](http://www.jeffknupp.com/blog/2013/04/07/improve-your-python-yield-and-generators-explained/)], es decir, no contiene en sí una lista de valores, sino que cada vez que va devolviendo valores uno a uno hasta que se acaban. Típicamente esto sucede porque va obteniendo los valores que devuelve de un _stream_ de entrada como un fichero, o una conexión remota. Por esto sólo podemos iterar sobre todos los valores **una única vez**.

Como ahora en el `reducer` queremos obtenter ambos el número de valores que tenemos y la suma de los mismos, tenemos que modificar nuestro código.

In [5]:
%%writefile files/mr-basico-instrumentado.py
from mrjob.job import MRJob
from sys import stdin
import os

class Compras(MRJob):
    def mapper(self, _, line):
        tokens = line.split()
        print "mapper >>", line
        yield tokens[2], float(tokens[4])
        
    def reducer(self, key, values):
        n_values, sum_values = 0,0
        for i in values:
            n_values += 1
            sum_values += i
        print "reducer >>", key, "number of values", n_values
        yield key, sum_values
        
if __name__ == '__main__':
    c = Compras()
    c.run()

Writing files/mr-basico-instrumentado.py


In [13]:
%%script python files/mr-basico-instrumentado.py --quiet
2012-01-01 09:08 BOG Libros 88.56 Discover
2012-01-01 09:52 BGA Libros 62.41 Discover
2012-01-01 10:08 MED Musica 93.37 Visa
2012-01-01 10:58 MED Musica 395.93 Discover
2012-01-01 14:38 BOG Musica 113.24 MasterCard
2012-01-01 14:44 BGA Libros 290.5 Visa
2012-01-01 16:26 MED Musica 246.12 Efectivo

mapper >> 2012-01-01 09:08 BOG Libros 88.56 Discover
mapper >> 2012-01-01 09:52 BGA Libros 62.41 Discover
mapper >> 2012-01-01 10:08 MED Musica 93.37 Visa
mapper >> 2012-01-01 10:58 MED Musica 395.93 Discover
mapper >> 2012-01-01 14:38 BOG Musica 113.24 MasterCard
mapper >> 2012-01-01 14:44 BGA Libros 290.5 Visa
mapper >> 2012-01-01 16:26 MED Musica 246.12 Efectivo
reducer >> BGA number of values 2
reducer >> BOG number of values 2
reducer >> MED number of values 3
"BOG"	201.8
"BGA"	352.90999999999997
"MED"	735.4200000000001


### Local runner

Los programas en **mr-job** pueden ejecutarse en distintos **runners**. Fíjate en la opción `-r` cuando ejecutamos nuestro código. Los runners posibles son los siguientes:

- **inline**: todos los `mapper` y `reducer` corren en el mismo proceso. Esta opción es útil para empezar a desarrollar un código y verificarlo funcionalmente.

- **local**: cada `mapper` y `reducer` corren en distintos procesos independientes en la misma máquina. Con esta opción podemos hacer una primera simulación de nuetro código en un entorno distribuido.

- **hadoop**: nuestro código se ejecuta en un cluster Hadoop

Instrumentar código con un local runner o en Hadoop ya no es tan fácil porque cada ejecución de las funciones `mapper` y `reducer` se hace en procesos o en máquinas distintas. Si no tenemos un mecanismo para recoger y coordinar la instrumentación generada en los distintos procesos o máquinas perderemos al menos parte de ella.


In [14]:
%%script python files/mr-basico-instrumentado.py -r local --quiet
2012-01-01 09:08 BOG Libros 88.56 Discover
2012-01-01 09:52 BGA Libros 62.41 Discover
2012-01-01 10:08 MED Musica 93.37 Visa
2012-01-01 10:58 MED Musica 395.93 Discover
2012-01-01 14:38 BOG Musica 113.24 MasterCard
2012-01-01 14:44 BGA Libros 290.5 Visa
2012-01-01 16:26 MED Musica 246.12 Efectivo

Distintos frameworks usan distintos mecanismos para instrumentar el código distribuido. En este caso (mrjob/Hadoop) usamos los `counters`, y el framework asegura un estado global de los mismos de manera consistente. Ahora necesitamos mostrar los mensajes de salida del framework para ver el valor final de los contadores.

Usamos a partir de ahora el fichero `data/compras.txt` que tiene 30 registros con el mismo formato que el usado hasta ahora.

In [15]:
!cat data/compras.txt

2012-01-01 09:08 BOG Libros 88.56 Discover
2012-01-01 09:09 BGA Libros 337.71 Efectivo
2012-01-01 09:52 BGA Libros 450.33 MasterCard
2012-01-01 09:52 BGA Libros 62.41 Discover
2012-01-01 10:08 MED Musica 93.37 Visa
2012-01-01 10:22 BGA Musica 369.94 MasterCard
2012-01-01 10:58 MED Musica 119.12 Efectivo
2012-01-01 10:58 MED Musica 395.93 Discover
2012-01-01 11:01 MED Musica 476.82 Efectivo
2012-01-01 11:15 MED Musica 22.34 Visa
2012-01-01 11:31 BGA Musica 114.03 Efectivo
2012-01-01 11:36 BOG Musica 296.76 Discover
2012-01-01 11:52 BGA Musica 347.24 Visa
2012-01-01 12:01 MED Libros 154.86 Discover
2012-01-01 12:08 MED Libros 391.65 Visa
2012-01-01 12:19 BOG Libros 165.05 Efectivo
2012-01-01 12:19 BOG Libros 293.76 Discover
2012-01-01 12:48 MED Libros 212.47 MasterCard
2012-01-01 12:55 BOG Libros 352.38 Discover
2012-01-01 13:04 BOG Musica 303.96 MasterCard
2012-01-01 13:12 MED Musica 429.44 Efectivo
2012-01-01 13:44 BOG Libros 249.6 MasterCard
2012-01-01 14:20 BOG 

In [16]:
%%writefile files/mr-basico-instrumentado-counters.py
from mrjob.job import MRJob
from sys import stdin
import os

class Compras(MRJob):

    def mapper(self, _, line):
        tokens = line.split()
        self.increment_counter("llamadas al map", "numero", 1)
        yield tokens[2], float(tokens[4])
        
    def reducer(self, key, values):
        n_values, sum_values = 0,0
        for i in values:
            n_values += 1
            sum_values += i
        self.increment_counter("longitud valores reduce", key, n_values )
        yield key, sum_values
        
if __name__ == '__main__':
    c = Compras()
    c.run()


Overwriting files/mr-basico-instrumentado-counters.py


In [17]:
%%script python files/mr-basico-instrumentado-counters.py -r local data/compras.txt
--

"BOG"	2008.5699999999997
"BGA"	2044.9899999999998
"MED"	4634.3099999999995


No configs found; falling back on auto-configuration
No configs specified for local runner
Creating temp directory /tmp/mr-basico-instrumentado-counters.user.20180525.103614.549432
Running step 1 of 1...

Counters: 4
	llamadas al map
		numero=30
	longitud valores reduce
		BGA=9
		BOG=10
		MED=11

Streaming final output from /tmp/mr-basico-instrumentado-counters.user.20180525.103614.549432/output...
Removing temp directory /tmp/mr-basico-instrumentado-counters.user.20180525.103614.549432...


podemos también ejecutar el mismo código, capturar el `stderr` y quedarnos con lo que nos interesa 

In [18]:
%%script --err err python files/mr-basico-instrumentado-counters.py -r local data/compras.txt
--

"BOG"	2008.5699999999997
"BGA"	2044.9899999999998
"MED"	4634.3099999999995


In [19]:
def print_stderr(err):
    for line in err.split("\n"):
        if not any (x in line for x in ["configs", "writing", "creating", "python", 
                                        "sort", "Moving", "Streaming", "removing"]):
            print line
            
print_stderr(err)

Creating temp directory /tmp/mr-basico-instrumentado-counters.user.20180525.103619.530495
Running step 1 of 1...

Counters: 4
	llamadas al map
		numero=30
	longitud valores reduce
		BGA=9
		BOG=10
		MED=11

Removing temp directory /tmp/mr-basico-instrumentado-counters.user.20180525.103619.530495...



### Controlando el número de mappers y reducers
Ahora cambiamos la instrumentación para que cuente el número de llamadas de cada función por cada proceso. Puedes controlar el número de mappers y reducers al ejecutar tu programa como se indica abajo. Prueba a ejecutar el código con distintos números de mappers.

In [20]:
%%writefile files/mr-basico-instrumentado-counters-tasks.py
from mrjob.job import MRJob
from sys import stdin
import os

class Compras(MRJob):

    def mapper(self, _, line):
        tokens = line.split()
        self.increment_counter("map: proceso "+str(os.getpid()), "numero", 1)
        yield tokens[2], float(tokens[4])
        
    def reducer(self, key, values):
        n_values, sum_values = 0,0
        for i in values:
            n_values += 1
            sum_values += i
        self.increment_counter("longitud valores reduce: proceso "+str(os.getpid()), key, n_values )
        yield key, sum_values
        
        
if __name__ == '__main__':
    c = Compras()
    c.run()

Writing files/mr-basico-instrumentado-counters-tasks.py


_la linea entera que se ejecuta a continuación es la siguiente_

    python files/mr-basico-instrumentado-counters-tasks.py 
           -r local 
           --jobconf mapred.map.tasks=5 
           --jobconf mapred.reduce.tasks=1 
           data/compras.txt

In [21]:
%%script --err err python files/mr-basico-instrumentado-counters-tasks.py -r local --jobconf mapred.map.tasks=5 --jobconf mapred.reduce.tasks=1 data/compras.txt
--

"BOG"	2008.5699999999997
"BGA"	2044.9899999999998
"MED"	4634.3099999999995


In [22]:
print_stderr(err)

Creating temp directory /tmp/mr-basico-instrumentado-counters-tasks.user.20180525.103630.614323
Running step 1 of 1...

Counters: 7
	longitud valores reduce: proceso 886
		BGA=9
	longitud valores reduce: proceso 887
		BOG=10
	longitud valores reduce: proceso 892
		MED=11
	map: proceso 865
		numero=8
	map: proceso 866
		numero=8
	map: proceso 871
		numero=8
	map: proceso 872
		numero=6

Removing temp directory /tmp/mr-basico-instrumentado-counters-tasks.user.20180525.103630.614323...



## Combiners

los combiners permiten resumir los datos que emite cada proceso map antes de que lleguen al reduce y así reducir el tráfico de red que sale de los procesos map y que entra en los reducers. Los combiners se ejecutan en la misma máquina que el map, alimentándose directamente de la salida de éste. Típicamente un combiner tiene la misma implementación que el `reducer` si es que éste **es asociativo**. Si no, puede tener otra implementación particular. Fíjate en la relación entre el número de procesos map y el número de valores que le entran a cada reduce.

In [23]:
%%writefile files/mr-basico-combiners.py
from mrjob.job import MRJob
from sys import stdin
import os

class Compras(MRJob):

    def mapper(self, _, line):
        tokens = line.split()
        self.increment_counter("map: proceso "+str(os.getpid()), "numero", 1)
        yield tokens[2], float(tokens[4])
        
    def combiner(self, key, values):
        yield key, sum(values)
        
    def reducer(self, key, values):
        n_values, sum_values = 0,0
        for i in values:
            n_values += 1
            sum_values += i
        self.increment_counter("longitud valores reduce: proceso "+str(os.getpid()), key, n_values )
        yield key, sum_values
        
if __name__ == '__main__':
    c = Compras()
    c.run()

Writing files/mr-basico-combiners.py


In [24]:
%%script --err err python files/mr-basico-combiners.py -r local --jobconf mapred.map.tasks=4 data/compras.txt
--

"BOG"	2008.5699999999997
"BGA"	2044.9899999999998
"MED"	4634.3099999999995


In [25]:
print_stderr(err)

Creating temp directory /tmp/mr-basico-combiners.user.20180525.103636.565839
Running step 1 of 1...

Counters: 7
	longitud valores reduce: proceso 948
		BGA=4
	longitud valores reduce: proceso 949
		BOG=4
	longitud valores reduce: proceso 955
		MED=4
	map: proceso 907
		numero=8
	map: proceso 908
		numero=8
	map: proceso 921
		numero=8
	map: proceso 922
		numero=6

Removing temp directory /tmp/mr-basico-combiners.user.20180525.103636.565839...

