# Procesamiento distribuido de datos con *map-reduce*

<hr>

(Este documento es por el momento un borrador de trabajo. No hay inconveniente en difundirlo, pero la condición de borrador se ha de tener en cuenta al emitir valoraciones sobre el mismo, y sobre todo se agradecen sugerencias de todo tipo, en mi dirección de email, *cpareja@ucm.es*.)

## 1. Las piezas básicas: map, reduce, yields, labels

### 1.1 Breve recordatorio de *map*

Queremos calcular los $n$ primeros términos de la sucesión
$a_i=\frac{i^2}{i+1}$, es decir:

$$\sum_{i=0}^{n-1} a_i=\frac{i^2}{i+1}$$

Por ejemplo, para los cinco primeros términos, tendríamos la lista siguiente:

$$[\frac{0}{1}, \frac{1^2}{2}, \frac{2^2}{3}, \frac{3^2}{4}, \frac{4^2}{5}]$$

Podemos hacerlo con la notación intensional o con la función `map`:

In [13]:
def lista_de_terminos(n):
    return [i**2/(i+1) for i in range(n)]

print(lista_de_terminos(5))

def termino(i):
    return i**2/(i+1)

def lista_de_terminos(n):
    return list(map(termino, range(n)))

print(lista_de_terminos(5))

[0.0, 0.5, 1.3333333333333333, 2.25, 3.2]
[0.0, 0.5, 1.3333333333333333, 2.25, 3.2]


### 1.2. Breve recordatorio de *reduce*

Si quisiéramos ahora sumar todos los elementos de una lista, bastaría con *insertar* la operación *suma* entre sus elementos. Esto se puede hacer con la función `reduce`:

In [14]:
from functools import reduce

def suma(a, b):
    return a+b

def sumatorio(lista):
    return reduce(suma, lista)

mis_terminos = lista_de_terminos(5)
mi_sumatorio = sumatorio(mis_terminos)

print(mi_sumatorio)

7.283333333333333


### 1.3. Primera versión de la técnica *map-reduce*

La técnica de map-reduce se basa en resolver un problema mediante la combinación sucesiva de estas dos operaciones, `map` y `reduce`.

**Ejemplo 1.** Por ejemplo, vamos a calcular el siguiente sumatorio mediante *map-reduce*:

$$\sum_{i=0}^{n-1} a_i=\frac{i^2}{i+1}$$

In [22]:
def termino(i):
    return i**2/(i+1)

def suma(a, b):
    return a + b

def formula_sumatorio(n):
    numbers = [i for i in range(1, n+1)]
    terminos = list(map(termino, numbers))
    sumatorio_final = reduce(suma, terminos)
    return terminos, sumatorio_final

terminos, sumatorio = formula_sumatorio(5)
print(terminos)
print(sumatorio)
    


[0.5, 1.3333333333333333, 2.25, 3.2, 4.166666666666667]
11.45


In [23]:
#EJEMPLO a la manera del PROFESOR:
def termino(i):
    return i**2/(i+1)

def suma(a, b):
    return a+b

def sumatorio(n):
    terminos = list(map(termino, range(1, n+1)))
    
    return reduce(suma, terminos)
    
print(sumatorio(5))

11.45


Observa que la clave ha sido encontrar las funciones mapeadora

$$\lambda i \rightarrow a_i=\frac{i^2}{i+1}$$

y la función reductora:

$$\lambda a, b \rightarrow a + b$$

**Ejemplo 2.** Otra situación parecida:
    
$$\prod_{i=1}^{n} a_i= \frac{i^3+1}{i^2+1}$$

In [None]:
def termino(i):
    return (i**3 + 1)/(i**2+1)

def producto(a, b):
    return a * b

def productor_final(n):
    numeros = [i for i in range(1, n+1)]
    print(numeros)
    mapeador = list(map(termino, numeros))
    print(mapeador)
    reductor = reduce(producto, mapeador)
    return reductor


productor_final(5)
 
   

[1, 2, 3, 4, 5]
[1.0, 1.8, 2.8, 3.823529411764706, 4.846153846153846]


93.38823529411765

In [None]:
#Ejemplo a la manera del profesor:

def termino(i):
    return (i**3+1)/(i**2+1)

def prod(a, b):
    return a*b

def productorio(n):
    terminos = map(termino, range(1, n+1))
    return reduce(prod, terminos)
    
print(productorio(5))

93.38823529411765


Observa que la clave ha sido encontrar las funciones mapeadora

$$\lambda i \rightarrow a_i=\frac{i^3+1}{i^2+1}$$

y la función reductora:

$$\lambda a, b \rightarrow a * b$$

### 1.4. Repaso de la instrucción *yield*

La instrucción *return* en una función, devuelve un valor y termina la ejecución de la función. En cambio, una función con *yield* devuelve un generador, y cada vez que se ejecuta una instrucción *yield*, se añade ese valor al generador y la función no termina, hasta que se encuentra un *return*.

Si la función no tuviera return, aparentemente no termina. Pero en realidad, cada valor generado no es evaluado impacientemente, sino "ofrecido", y no se calcula de manera efectiva hasta ser demandado. Ésa es la gracia de los generadores.

In [25]:
def natural_numbers():
    i = 0
    while True:
        yield i
        i = i+1
    
nats = natural_numbers()

for _ in range(5):
    print(next(nats))
    
print(".........")

for _ in range(5):
    print(next(nats))

0
1
2
3
4
.........
5
6
7
8
9


### 1.5. Las etiquetas o claves

Empiezo poniendo un ejemplo muy básico para avanzar otro paso. Deseamos contabilizar los caracteres de un string mediante la técnica map-reduce:

Con frecuencia, conviene separar la entrada de datos en paquetes distintos. Estos paquetes se forman en el *map* y se reciben en *reduce*, ya separados.

Imaginemos que deseamos contabilizar

La instrucción *return* en una función, devuelve un valor y termina la ejecución de la función. En cambio, una función con *yield* devuelve un generador, y cada vez que se ejecuta una instrucción *yield*, se añade ese valor al generador y la función no termina, hasta que se encuentra un *return*.

Si la función no tuviera return, aparentemente no termina. Pero en realidad, cada valor generado no es evaluado impacientemente, sino "ofrecido", y no se calcula de manera efectiva hasta ser demandado. Ésa es la gracia de los generadores.

In [26]:
frase = "Muchos años después, frente al pelotón de fusilamiento..."
frase = "Muchos..."

unos = list(map(lambda letra: 1, frase))
print(unos)

total = reduce(suma, list(unos))
print(total)

[1, 1, 1, 1, 1, 1, 1, 1, 1]
9


Con frecuencia, conviene separar la entrada de datos en paquetes distintos. Estos paquetes se forman en el *map* y se reciben en *reduce*, ya separados.

Imaginemos que deseamos contabilizar por separado las letras y el resto de los caracteres. En lugar de un 1 siempre, podríamos generar ("letra", 1) o ("otros", 1), según el caso. De este modo, podemos luego sumarlos por separado, según sea su clave "letra" u "otros".

In [27]:
frase = "Muchos años después, frente al pelotón de fusilamiento..."

def clase(caracter):
    if caracter.isalpha():
        return "letra", 1
    else:
        return "otros", 1
    
claves_unos = list(map(clase, frase))
print(claves_unos)

[('letra', 1), ('letra', 1), ('letra', 1), ('letra', 1), ('letra', 1), ('letra', 1), ('otros', 1), ('letra', 1), ('letra', 1), ('letra', 1), ('letra', 1), ('otros', 1), ('letra', 1), ('letra', 1), ('letra', 1), ('letra', 1), ('letra', 1), ('letra', 1), ('letra', 1), ('otros', 1), ('otros', 1), ('letra', 1), ('letra', 1), ('letra', 1), ('letra', 1), ('letra', 1), ('letra', 1), ('otros', 1), ('letra', 1), ('letra', 1), ('otros', 1), ('letra', 1), ('letra', 1), ('letra', 1), ('letra', 1), ('letra', 1), ('letra', 1), ('letra', 1), ('otros', 1), ('letra', 1), ('letra', 1), ('otros', 1), ('letra', 1), ('letra', 1), ('letra', 1), ('letra', 1), ('letra', 1), ('letra', 1), ('letra', 1), ('letra', 1), ('letra', 1), ('letra', 1), ('letra', 1), ('letra', 1), ('otros', 1), ('otros', 1), ('otros', 1)]


In [None]:
# el cuerpo de esta función no es importante ahora. Lo esencial es lo que hace:

def separar(lista_de_pares):
    claves = {k for k, v in lista_de_pares}
    return [(k, [v for (clave, v) in lista_de_pares if k ==clave]) for k in claves]

lista_de_pares = separar(claves_unos)
print(separar(claves_unos))

{'letra', 'otros'}
{'letra', 'otros'}
[('letra', [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]), ('otros', [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1])]


In [9]:
def reduce_por_claves(lista_de_pares):
    return [(key, sum(valores)) for (key, valores) in lista_de_pares]
            
print(reduce_por_claves(lista_de_pares))

[('otros', 11), ('letra', 46)]


## 2. Ejemplos uniendo todas las piezas básicas


### 2.1 Técnica map-reduce

La resolución de problemas mediante map-reduce, opera así:

1. Toma una lista de elementos para ser procesados. Esos elementos son las líneas de un archivo de texto
2. Por cada elemento un mapper genera una colección de pares clave, valor
3. El sistema se encarga de agrupar esos resultados por claves; de esto no tenemos que preocuparnos
4. La función reduce genera los procesamientos de cada par (clave, lista_de_valores)

### 2.2 Primer ejemplo completo

Veamos cada ítem por separado, en un ejemplo concreto: el cálculo de letras y otros caracteres.

In [29]:
# 1. Veamos el archivo concreto que va a ser procesado:

! type ".\one_hundred_years.txt"

Many years later,
as he faced the firing squad,
Colonel Aureliano Buendia was to remember
that distant afternoon
when his father took him to discover ice.


In [30]:
# 2 y 4. Definición del mapper y el reducer. Lo demás, no hace falta verlo en este momento

! type ".\char_classifier.py"

from mrjob.job import MRJob

class MRWordCount(MRJob):

    def mapper(self, _, linea):
        for caracter in linea:
            if caracter.isalpha():
                yield "letra", 1
            else:
                yield "otros", 1
                
    def reducer(self, key, valores):
        yield key, sum(valores)

if __name__ == '__main__':
    MRWordCount.run()


In [31]:
# Ejecución de este código sobre el archivo de datos:

! python ".\char_classifier.py" ".\one_hundred_years.txt" -q

"letra"	126
"otros"	24


**Observaciones**

1. Es necesario tener instalada la librería *mrjob*. Si no la tienes, abre la consola de anaconda y ejecuta
   lo siguiente:
   
       > pip install mrjob
       
2. El mapper procesa cada línea.

3. No hace falta que defonamos los agrupamientos por etiquetas; esto lo realiza la clase.

4. El reducer procesa cada grupo de valores por cada etiqueta.

5. El funcionamiento de una orden o un comando en el sistema operativo se puede realizar desde el notebook mediante "!".

6. En la ejecución, la opción `-q` significa *quiet*, es decir,
   deseamos que se omitan los mensajes explicativos y advertencias.
   Para ver su efecto, ejecuta la orden anterior suprimiendo esta opción.

In [40]:
def mapper():
    frase= 'Many years later,'
    return list(map(lambda letra: (letra, 1), frase))

def reducer(lista: list):
    return [(key, sum(valores)) for (key, valores) in lista]
        
    
a=mapper()
print(a)
print([(key, sum(valores)) for (key, valores) in a])


[('M', 1), ('a', 1), ('n', 1), ('y', 1), (' ', 1), ('y', 1), ('e', 1), ('a', 1), ('r', 1), ('s', 1), (' ', 1), ('l', 1), ('a', 1), ('t', 1), ('e', 1), ('r', 1), (',', 1)]


TypeError: 'int' object is not iterable

### 2.3 Un segundo ejemplo, típico

Queremos contabilizar cuántas veces aparece el carácter `a` en un texto, cuántas, el `b`, etc.

**Diseño del mapper.** Para cada elemento (una línea) de la lista de entrada, devolverá una lista de pares, clave-valor, que serán cada carácter de la línea y un 1:

    "Many years later,"  →  [("M", 1), ("a", 1), ("n", 1), ("y", 1), ..., ("," , 1)]
    "as he faced the firing squad,"   → [("a", 1), ("s", 1), (" ", 1), ..., ("," ,1)]
    ...

He aquí una posible versión del mapper:

    def mapper(self, _, linea):
        for caracter in linea:
            yield caracter, 1

**Diseño del reducer.** En realidad, el reducer toma todos los valores de cada clave y los suma:

    def reducer(self, key, valores):
        yield key, sum(valores)
        
He aquí nuestro programa:

In [13]:
# Nuestro programa, contador de caracteres:

! type ".\char_count.py"

from mrjob.job import MRJob

class MRCharCount(MRJob):

    def mapper(self, _, linea):
        for caracter in linea:
            yield caracter, 1
                
    def reducer(self, key, valores):
        yield key, sum(valores)

if __name__ == '__main__':
    MRCharCount.run()


In [14]:
#Y ahora vemos el programa en acción sobre el archivo de datos:

! python ".\char_count.py" ".\one_hundred_years.txt" -q

" "	21
","	2
"."	1
"A"	1
"B"	1
"C"	1
"M"	1
"a"	13
"b"	1
"c"	3
"d"	5
"e"	16
"f"	4
"g"	1
"h"	7
"i"	9
"k"	1
"l"	4
"m"	3
"n"	9
"o"	10
"q"	1
"r"	9
"s"	7
"t"	11
"u"	3
"v"	1
"w"	2
"y"	2


### 2.4. Dos ejercicios básicos (resueltos)

**Ejercicio 1** ¿Sabrías diseñar tú un programa que cuenta cuántas líneas hay en un archivo?
(Para una línea dada, la función mapper debería generar un 1, con una etiqueta cualquiera.)

**Ejercicio 2** Hemos diseñado un contador de letras. ¿Sabrías diseñar tú un contador de palabras?
(Para una línea dada, la función mapper debería generar, cada palabra acompañada con un 1.)

In [15]:
#Vemos el contador de líneas en acción sobre el archivo de datos:

! python ".\lines_count.py" ".\one_hundred_years.txt" -q

"lineas"	5


In [16]:
#Y ahora vemos el contador de palabras en acción sobre el archivo de datos:

! python ".\word_count.py" ".\one_hundred_years.txt" -q

"Aureliano"	1
"Buendia"	1
"Colonel"	1
"Many"	1
"afternoon"	1
"as"	1
"discover"	1
"distant"	1
"faced"	1
"father"	1
"firing"	1
"he"	1
"him"	1
"his"	1
"ice."	1
"later,"	1
"remember"	1
"squad,"	1
"that"	1
"the"	1
"to"	2
"took"	1
"was"	1
"when"	1
"years"	1


En efecto, cada palabra aparece una sola vez. En esta versión no se han suprimido los signos de puntuación. Es fácil, con el método `strip(";,.:")`.

Las soluciones pueden verse en la carpeta de archivos `.\01_basic_files`.

### 2.5. Un ejemplo más realista

Tenemos un archivo de datos con información sobre las causas de muerte en cada país del mundo según la causa del deceso.

Mejor es que lo veas por ti mismo, abriendo el archivo,
aunque te proporciono una imagen a continuación del fragmento:

<center>
    <img src=".\causas_de_muerte_fragmento_usa.png" width="750">
</center>

He aquí unas cuantas líneas de dicho archivo. Observa que los campos están separados por comas. 

In [17]:
! type ".\annual-number-of-deaths-by-cause.csv"

"Entity,Code,Year,Number of executions (Amnesty International),Deaths - Road injuries - Sex: Both - Age: All Ages (Number),Deaths - Cirrhosis and other chronic liver diseases - Sex: Both - Age: All Ages (Number),Deaths - Digestive diseases - Sex: Both - Age: All Ages (Number),Deaths - Tuberculosis - Sex: Both - Age: All Ages (Number),Deaths - HIV/AIDS - Sex: Both - Age: All Ages (Number),Deaths - Diarrheal diseases - Sex: Both - Age: All Ages (Number),Deaths - Intestinal infectious diseases - Sex: Both - Age: All Ages (Number),Deaths - Lower respiratory infections - Sex: Both - Age: All Ages (Number),Deaths - Meningitis - Sex: Both - Age: All Ages (Number),Deaths - Drowning - Sex: Both - Age: All Ages (Number),Deaths - Alzheimer disease and other dementias - Sex: Both - Age: All Ages (Number),Deaths - Parkinson disease - Sex: Both - Age: All Ages (Number),Deaths - Alcohol use disorders - Sex: Both - Age: All Ages (Number),Deaths - Drug use disorders - Sex: Both - Age: All Ages (Number)

Mejor es que lo veas por ti mismo, abriendo el archivo,
aunque te proporciono una imagen a continuación del fragmento:

<center>
    <img src=".\causas_de_muerte_fragmento_usa.png" width="750">
</center>

Los campos están separados por comas. La  primera línea es la cabecera.

**Ejercicio**

Queremos saber cuántas muertes ha habido en Estados Unidos (la abreviatura es USA, en el segundo campo de la tabla) debidas a ejecuciones.

**Solución**

Observamos en este archivo que las columnas stán separads por comas; que la segunda columna es el código del país, y que la cuarta representa  el número de muertes que nos da Amnistía Internacional).

O sea, para cada línea, si su segunda columna es el código "USA", debemos que capturar el valor de la cuarta. Por lo tanto, nuestro mapper podría ser el siguiente:

    def mapper(self, _, linea):
        campos = linea.split(";")
        passcodigo = campos[1]
        num_ejecs = int(campos[3])
        if passcodigo == "USA":
            yield "ejecs", num_ejecs
            
Al ejecutarlo, obtenemos fallos, porque algunos valores de la cuarta columna no son números: son datos vacíos, missing, seguramente de países que no ofrecen esta información o en los que no existe la pena de muerte. Para evitar este problema, podemos sustituir la función `int` por una a nuestra medida `to_int`:

    def to_int(cadena):
        try:
            return int(cadena)
        except:
            return 0
            
Eso es todo.

In [3]:
# Veamos nuestro programa en fucionamiento:

! python ".\deadh_cause.py" ".\annual-number-of-deaths-by-cause.csv" -q

"ejecs"	385


In [4]:
# Veamos ahora el código:

! type ".\deadh_cause.py"

from mrjob.job import MRJob

def to_int(cadena):
    try:
        return int(cadena)
    except:
        return 0

class MRCauseOfDeadth(MRJob):

    def mapper(self, _, linea):
        campos = linea.split(";")
        passcodigo = campos[1]
        num_ejecs = to_int(campos[3])
        if passcodigo == "USA":
            yield "ejecs", num_ejecs
               
    def reducer(self, key, valores):
        yield key, sum(valores)

if __name__ == '__main__':
    MRCauseOfDeadth.run()


### 2.6. Parámetros en la línea de comandos

¿Podría servir este programa pra calcular las muertes por ejecución de cualquier país?

Para hacer esto, se tendría que pasar un parámetro que indicara el país. Como se trata de un programa de consola, el parámetro deberá indicarse en la línea de comandos. He aquí el programa en funcionamiento:

In [20]:
! python ".\deadh_cause_country.py" --country=USA ".\annual-number-of-deaths-by-cause.csv" -q

"USA"	385


In [21]:
! python ".\deadh_cause_country.py" --country=ESP ".\annual-number-of-deaths-by-cause.csv" -q

"ESP"	0


In [22]:
#Veamos el código:

! type ".\deadh_cause_country.py"

from mrjob.job import MRJob

def to_int(cadena):
    try:
        return int(cadena)
    except:
        return 0
        
class MRCauseOfDeadth(MRJob):

    def configure_args(self):
        super(MRCauseOfDeadth, self).configure_args()
        self.add_passthru_arg(
            '--country', default='Spain',
            help="Indica el cÃ³digo del paÃ­s.")

    def mapper(self, _, linea):
        campos = linea.split(",")
        cod_pais= campos[1]
        if cod_pais == self.options.country:
                num_ejecs = to_int(campos[3])
                yield cod_pais, num_ejecs
               
    def reducer(self, key, valores):
        yield key, sum(valores)

if __name__ == '__main__':
    MRCauseOfDeadth.run()


Parece que nos hemos sacado de la manga los detalles. En realidad, no es de la manga, sino de la documentación de `mrjob` y de la clase `MRJob` y de las múltiples referencias que pueden encontrarse en Internet. Ejs.:

    https://mrjob.readthedocs.io/en/latest/guides/writing-mrjobs.html
    https://stackoverflow.com/questions/66734113/is-it-possible-to-pass-arguments-to-mr-job   
    
En particular, los detalles técnicos sobre el paso de opciones (como `--country=ESP`) puede completarse con el apartado *Defining command line options* de la primera referencia.

### 2.7. Propiedades matemáticas asumidas

Podemos calcular la suma de una lista de varias maneras, alterando el orden y agrupamiento
de los sumandos:
    
    [1, 2, 3, 4, 5] -> [1, 2, 3], [4, 5] -> [6, 9] -> 15
    [1, 2, 3, 4, 5] -> [1, 2], [3, 4, 5] -> [3, 12] -> 15
    ...

Siempre obtenemos el mismo resultado, porque la suma es conmutativa y asociativa.

Hagamos lo mismo con la media:
    
    [1, 2, 3, 4, 5] -> [1, 2, 3], [4, 5] -> [2, 4.5] -> 3.25
    [1, 2, 3, 4, 5] -> [1, 2], [3, 4, 5] -> [1.5, 4] -> 2.75

Se obtienen resultados distintos agrupando en distintos órdenes porque la media no es asociativa.

El modelo *map-reduce* está pensado para procesar grandes volúmenes de datos en un orden arbitrario y agrupando los resultados de forma arbitraria, según vayan siendo generados los paquetes de trabajo por distintas máquinas o procesadores. Esto puede no percibirse con datos pequeños o cuando el procesamiento es en un solo ordenador, pero conviene saber que la función `reduce` tiene que estar basada en una operación binaria asociativa y conmutativa. Y la media no lo es.

Pongamos ahora que deseamos calcular la media de una gran cangtidad de números. Ponemos una solución inicial errónea, aunque se trata de un error muy frecuente y que pasa desapercibido con cantidades pequeñas de datos, y luego comentamos una solución correcta.

**Nota.** Esta solución está explicada y con ejemplos en la carpeta aparte siguiente:
`ej3 - media - postprocesamiento`