Hadoop/MapReduce -- WordCount en Python (Hadoop Streaming)
===

* *30 min* | Última modificación: Noviembre 05, 2019

## Definición del problema

Se desea contar la frecuencia de ocurrencia de palabras en un conjunto de documentos. Debido a los requerimientos de diseño (gran volúmen de datos y tiempos rápidos de respuesta) se desea implementar una arquitectura Big Data. Se desea implementar la solución en **Python** y ejecutar Hadoop en **modo streamming**.

## Solución

A continuación se usa la opción streaming de Hadoop. Se el tutorial anterior como punto de inicio.

### Preparación

#### Inicio de la máquina virtual

Si usa linux o macOS puede pasar directamente al siguiente paso. Inicie la VM con:

```bash
vagrant up
```

y luego vaya a la carpeta de trabajo:

```
cd /vagrant
```


#### Ejecución del contendor de Docker

Si va a iniciar el contendor de Hadoop en la carpeta compartida con su máquina local use:

```
docker run --rm -it -v "$PWD":/datalake --name hadoop -p 50070:50070 -p 8088:8088 -p 8888:8888 -p 5000:5000 jdvelasq/hadoop:2.8.5-pseudo
```

Si desea iniciar la sesión en el `datalake` use:

```
docker run --rm -it -v datalake:/datalake --name hadoop -p 50070:50070 -p 8088:8088 -p 8888:8888 -p 5000:5000 jdvelasq/hadoop:2.8.5-pseudo
```


Si un contenedor ya se está ejecutando puede abrir un nuevo terminal con:

```
docker exec -it hadoop bash
```

#### Archivos de prueba

A continuación se generarán tres archivos de prueba para probar el sistema. Puede usar directamente comandos del sistema operativo en el Terminal y el editor de texto `pico` para crear los archivos.

In [1]:
## Se crea el directorio de entrada
!rm -rf input output
!mkdir input

In [2]:
%%writefile input/text0.txt
Analytics is the discovery, interpretation, and communication of meaningful patterns 
in data. Especially valuable in areas rich with recorded information, analytics relies 
on the simultaneous application of statistics, computer programming and operations research 
to quantify performance.

Organizations may apply analytics to business data to describe, predict, and improve business 
performance. Specifically, areas within analytics include predictive analytics, prescriptive 
analytics, enterprise decision management, descriptive analytics, cognitive analytics, Big 
Data Analytics, retail analytics, store assortment and stock-keeping unit optimization, 
marketing optimization and marketing mix modeling, web analytics, call analytics, speech 
analytics, sales force sizing and optimization, price and promotion modeling, predictive 
science, credit risk analysis, and fraud analytics. Since analytics can require extensive 
computation (see big data), the algorithms and software used for analytics harness the most 
current methods in computer science, statistics, and mathematics.

Writing input/text0.txt


In [3]:
%%writefile input/text1.txt
The field of data analysis. Analytics often involves studying past historical data to 
research potential trends, to analyze the effects of certain decisions or events, or to 
evaluate the performance of a given tool or scenario. The goal of analytics is to improve 
the business by gaining knowledge which can be used to make improvements or changes.

Writing input/text1.txt


In [4]:
%%writefile input/text2.txt
Data analytics (DA) is the process of examining data sets in order to draw conclusions 
about the information they contain, increasingly with the aid of specialized systems 
and software. Data analytics technologies and techniques are widely used in commercial 
industries to enable organizations to make more-informed business decisions and by 
scientists and researchers to verify or disprove scientific models, theories and 
hypotheses.

Writing input/text2.txt


### Prueba de la implementación fuera de Hadoop

Para realizar la implementación solicitada se usará Hadoop Streaming, el cual permite la implementación de aplicaciones MapReduce en cualquier lenguaje que soporte la creación de scripts. Para este caso se ejemplificará la implementación en Python. Hadoop streaming debe su nombre a que usa los streamings de Unix para la entrada y salida en texto (stdin, stdout y stderr respectivamente). El sistema lee y escribe líneas de texto una a una a los respectivos flujos de entrada y salida, tal como ocurre de forma estándar en las herramientas de la línea de comandos de Linux. 

La implementación requiere un programa para realizar el mapeo y otro para la reducción que se ejecutan independientemente; Hadoop se encarga de la coordinación entre ellos. El intercambio de información se hace a traves de texto, que es el lenguaje universal para intercambio de información. 

#### Paso 1

Se implementa la función map en Python y se guarda en el archivo `mapper.py`.

In [5]:
%%writefile mapper.py
#! /usr/bin/python3

##
## Esta es la funcion que mapea la entrada a parejas (clave, valor)
##
import sys
if __name__ == "__main__": 
    
    ##
    ## itera sobre cada linea de codigo recibida
    ## a traves del flujo de entrada
    ##
    for line in sys.stdin:
        
        ##
        ## genera las tuplas palabra \tabulador 1
        ## ya que es un conteo de palabras
        ##
        for word in line.split(): 
                   
            ##
            ## escribe al flujo estandar de salida
            ##
            sys.stdout.write("{}\t1\n".format(word))
            

Writing mapper.py


#### Paso 2

In [6]:
## El programa anterior se hace ejecutable
!chmod +x mapper.py 

In [7]:
## la salida de la función anterior es:
!cat ./input/text*.txt | ./mapper.py | head

Analytics	1
is	1
the	1
discovery,	1
interpretation,	1
and	1
communication	1
of	1
meaningful	1
patterns	1


### Paso 3

Se implementa la función `reduce` y se salva en el archivo `reducer.py`.

In [8]:
%%writefile reducer.py
#! /usr/bin/python3

import sys

##
## Esta funcion reduce los elementos que 
## tienen la misma clave
##

if __name__ == '__main__': 
  
    curkey = None
    total = 0
    
    ##
    ## cada linea de texto recibida es una 
    ## entrada clave \tabulador valor
    ##
    for line in sys.stdin:
        
        key, val = line.split("\t") 
        val = int(val)
        
        if key == curkey: 
            ##
            ## No se ha cambiado de clave. Aca se 
            ## acumulan los valores para la misma clave.
            ##
            total += val  
        else:
            ##
            ## Se cambio de clave. Se reinicia el
            ## acumulador.
            ##
            if curkey is not None:
                ##
                ## una vez se han reducido todos los elementos
                ## con la misma clave se imprime el resultado en
                ## el flujo de salida
                ##
                sys.stdout.write("{}\t{}\n".format(curkey, total)) 
            
            curkey = key
            total = val
            
    sys.stdout.write("{}\t{}\n".format(curkey, total)) 

Writing reducer.py


#### Paso 4

In [9]:
## El archivo se hace ejecutable
!chmod +x reducer.py

#### Paso 5

Se realiza la prueba de la implementación en el directorio actual antes de realizar la ejecución en el modo pseudo-distribuido

In [10]:
##
## La función sort hace que todos los elementos con 
## la misma clave queden en lineas consecutivas.
## Hace el papel del módulo Shuffle & Sort
##
!cat ./input/text*.txt | ./mapper.py | sort | ./reducer.py | head

(DA)	1
(see	1
Analytics	2
Analytics,	1
Big	1
Data	3
Especially	1
Organizations	1
Since	1
Specifically,	1


### Ejecución en Hadoop en modo pseudo-distribuido

Una vez se tienen las versiones anteriores funcionando, se puede proceder a ejecutar la tarea directamente en Hadoop. 

Verifique que Hadoop se esta ejecutando correctamente. Vaya a las direcciones:

* NameNode: http://127.0.0.1:50070/

* Yarn ResourceManager: http://127.0.0.1:8088/

#### Paso 1. Movimiento de los datos al HDFS

Se copian los archivos de entrada del sistema local (o del datalake) al HDFS.

In [11]:
!hadoop fs -mkdir input
!hadoop fs -copyFromLocal input/* input
!hadoop fs -ls input/*

-rw-r--r--   1 root supergroup       1093 2019-11-14 22:06 input/text0.txt
-rw-r--r--   1 root supergroup        352 2019-11-14 22:06 input/text1.txt
-rw-r--r--   1 root supergroup        440 2019-11-14 22:06 input/text2.txt


**Vaya a la pantalla de monitoreo del NameNode (http://127.0.0.1:50070/) y ubique los archivos en el HDFS.**

La gestión de archivos entre el sistema local y el HDFS se realiza mediante comandos similares a los del sistema operativo Unix en Terminal. A continuación se resumen los principales comandos.

* `hadoop fs -help`:  Imprime la ayuda en pantalla para todos los comandos.


**Comandos para la gestion de directorios y archivos.**


* `hadoop fs -ls <path>`


* `hadoop fs -mkdir <path>`


* `hadoop fs -rmdir <path>`


* `hadoop fs -cp <src> <dest>`


* `hadoop fs -mv <src> <dest>`


* `hadoop fs -rm <path>`


* `hadoop fs -cat <path>`


* `hadoop fs -head <path>`


* `hadoop fs -tail <path>`


* `hadoop fs -text <path>`. Imprime el arachivo en `<path>` y lo imprime en formato texto. Soporta archivos zip, TextRecordInputStream y Avro.


* `hadoop fs -stat <path>`: Imprime estadísticos de `<path>`.


**Transferencia de información entre el sistema local y el HDFS**.


* `hadoop fs -get <src> <localdest>` / `hadoop fs -copyToLocal <src> <localdest>`. Copia el contenido de `<src>` en el HDFS en `<localdest>` en el sistema local.


* `hadoop fs -put <localsrc> <dest>` / `hadoop fs -copyFromLocal <src> <localdest>`. Copia el contenido de `<localsrc>` en el sistema local a `<dest>` en el HDFS.


* `hadoop fs -count <path>`. Cuenta el número de directorios, archivos y bytes en `<path>`.


* `hadoop fs -appendToFile <localsrc> <dest>`: pega al final de `<dest>` el contenido de los archivos en `<localsrc>`.



#### Paso 2

Se ejecuta Hadoop Streaming en modo pseudo-distribuido. 

In [12]:
##
## Se ejecuta en Hadoop.
##   -files: archivos a copiar al hdfs
##   -input: archivo de entrada
##   -output: directorio de salida
##   -file: archivos a copiar de la maquina local al hdfs
##   -maper: programa que ejecuta el map
##   -reducer: programa que ejecuta la reducción
##
!hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar -files mapper.py,reducer.py  -input input  -output output -mapper mapper.py -reducer reducer.py

packageJobJar: [/tmp/hadoop-unjar8434641534785643469/] [] /tmp/streamjob2548476558488568105.jar tmpDir=null


Alternativamente, podria usar:

```
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
   -files mapper.py,reducer.py  \
   -input input \
   -output output \
   -mapper 'python3 $PWD/mapper.py' \
   -reducer 'python3 $PWD/reducer.py'

```

In [13]:
## contenido del directorio con los 
## resultados de la corrida
!hadoop fs -ls output

Found 2 items
-rw-r--r--   1 root supergroup          0 2019-11-14 22:07 output/_SUCCESS
-rw-r--r--   1 root supergroup       1649 2019-11-14 22:07 output/part-00000


In [14]:
## se visualiza el archivo con los
## resultados de la corrida
!hadoop fs -cat output/part-00000 | head

(DA)	1
(see	1
Analytics	2
Analytics,	1
Big	1
Data	3
Especially	1
Organizations	1
Since	1
Specifically,	1


#### Paso 3 (Movimiento de los resultados al sistema local)

In [15]:
!hadoop fs -copyToLocal output output
!ls output/*

output/_SUCCESS  output/part-00000


#### Paso 4 (Limpieza del hdfs)

In [16]:
## Se elimina el directorio de salida en el hdfs si existe
!hadoop fs -rm input/*
!hadoop fs -rm output/*
!hadoop fs -rmdir input output

Deleted input/text0.txt
Deleted input/text1.txt
Deleted input/text2.txt
Deleted output/_SUCCESS
Deleted output/part-00000


### Limpieza de la máquina local

In [17]:
!rm reducer.py mapper.py
!rm -rf input output

## Notas.

**Combiners.--** Los combiners son *reducers* que se ejecutan sobre los resultdos que produce cada mapper antes de pasar al modulo de suffle-&-sort, con el fin de reducir la carga computacional. Suelen ser identicos a los *reducers*. Una llamada típica sería:


     $HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
        -input input \
        -output output  \
        -mapper mapper.py \
        -reducer reducer.py \
        -combiner combiner.py


**Partitioners.--** Son rutinas que controlan como se enviar las parejas (clave, valor) a cada reducers, tal que elementos con la misma clave son enviados al mismo reducer. 

**Job Chain.--** Se refiere al encadenamiento de varias tareas cuando el cómputo que se desea realizar es muy complejo para que pueda realizarse en un MapReduce.