Hadoop/MapReduce -- WordCount en Python (Implementación eficiente)
===

**Juan David Velásquez Henao**  
jdvelasq@unal.edu.co   
Universidad Nacional de Colombia, Sede Medellín  
Facultad de Minas  
Medellín, Colombia

---

Haga click [aquí](https://github.com/jdvelasq/big-data-analytics/tree/master/) para acceder al repositorio online.

Haga click [aquí](http://nbviewer.jupyter.org/github/jdvelasq/big-data-analytics/tree/master/) para explorar el repositorio usando `nbviewer`.

---

# Definición del problema

Se desea contar la frecuencia de ocurrencia de palabras en un conjunto de documentos. Debido a los requerimientos de diseño (gran volúmen de datos y tiempos rápidos de respuesta) se desea implementar una arquitectura Big Data. Se desea implementar una **solución computacional eficiente** en **Python**.

A continuación se generarán tres archivos de prueba para probar el sistema.

In [32]:
## Se crea el directorio de entrada
!rm -rf input
!rm -rf output
!mkdir input

In [33]:
%%writefile input/text0.txt
Analytics is the discovery, interpretation, and communication of meaningful patterns 
in data. Especially valuable in areas rich with recorded information, analytics relies 
on the simultaneous application of statistics, computer programming and operations research 
to quantify performance.

Organizations may apply analytics to business data to describe, predict, and improve business 
performance. Specifically, areas within analytics include predictive analytics, prescriptive 
analytics, enterprise decision management, descriptive analytics, cognitive analytics, Big 
Data Analytics, retail analytics, store assortment and stock-keeping unit optimization, 
marketing optimization and marketing mix modeling, web analytics, call analytics, speech 
analytics, sales force sizing and optimization, price and promotion modeling, predictive 
science, credit risk analysis, and fraud analytics. Since analytics can require extensive 
computation (see big data), the algorithms and software used for analytics harness the most 
current methods in computer science, statistics, and mathematics.

Writing input/text0.txt


In [34]:
%%writefile input/text1.txt
The field of data analysis. Analytics often involves studying past historical data to 
research potential trends, to analyze the effects of certain decisions or events, or to 
evaluate the performance of a given tool or scenario. The goal of analytics is to improve 
the business by gaining knowledge which can be used to make improvements or changes.

Writing input/text1.txt


In [35]:
%%writefile input/text2.txt
Data analytics (DA) is the process of examining data sets in order to draw conclusions 
about the information they contain, increasingly with the aid of specialized systems 
and software. Data analytics technologies and techniques are widely used in commercial 
industries to enable organizations to make more-informed business decisions and by 
scientists and researchers to verify or disprove scientific models, theories and 
hypotheses.

Writing input/text2.txt


# Solución

A continuación se presenta una implementación eficiente en Python para el problema.

In [44]:
%%writefile mapper.py
#! /usr/bin/env python

##
## Esta es la función que mapea la entrada a parejas (clave, valor)
##
import sys


##
## Se usa una clase iterable para implementar el mapper.
##

class Mapper:
    
    def __init__(self, stream):
        ## 
        ## almacena el flujo de entrada como una
        ## variable del objeto
        ##
        self.stream = stream
    
    def emit(self, key, value):
        ##
        ## escribe al flujo estándar de salida
        ##
        sys.stdout.write("{}\t{}\n".format(key, value))
        
        
    def status(self, message):
        ##
        ## imprime un reporte en el flujo de error
        ## no se debe usar el stdout, ya que en este 
        ## unicamente deben ir las parejas (key, value)
        ##
        sys.stderr.write('reporter:status:{}\n'.format(message))

        
    def counter(self, counter, amount=1, group="ApplicationCounter"):
        ## 
        ## imprime el valor del contador
        ##
        sys.stderr.write('reporter:counter:{},{},{}\n'.format(group, counter, amount))
        
    def map(self):

        word_counter = 0
        
        for word in self:
            
            ##
            ## imprime un mensaje indicando la palabra procesada
            ##
            self.status('procesando ' + word)
            
            ##
            ## cuenta la cantidad de palabras procesadas
            ##
            word_counter += 1
            self.counter('num_words', amount=word_counter)

            ##
            ## por cada palabra del flujo de datos
            ## emite la pareja (word, 1)
            ##
            self.emit(key=word, value=1)
            
    def __iter__(self):
        ##
        ## itera sobre cada linea de código recibida
        ## a través del flujo de entrada
        ##
        for line in self.stream:
            ##
            ## itera sobre cada palabra de la línea
            ## (en los ciclos for, retorna las palabras
            ## una a una)
            ##
            for word in line.split():
                ##
                ## retorna la palabra siguiente en el
                ## ciclo for
                ##
                yield word
    

if __name__ == "__main__": 
    ##
    ## inicializa el objeto con el flujo de entrada
    ##
    mapper = Mapper(sys.stdin)
    
    ##
    ## ejecuta el mapper
    ##
    mapper.map()


Overwriting mapper.py


In [45]:
## El programa anterior se hace ejecutable
!chmod +x mapper.py 

In [46]:
## la salida de la función anterior es:
!cat ./input/text*.txt | ./mapper.py | head

reporter:status:procesando Analytics
reporter:counter:ApplicationCounter,num_words,1
reporter:status:procesando is
reporter:counter:ApplicationCounter,num_words,2
reporter:status:procesando the
reporter:counter:ApplicationCounter,num_words,3
reporter:status:procesando discovery,
reporter:counter:ApplicationCounter,num_words,4
reporter:status:procesando interpretation,
reporter:counter:ApplicationCounter,num_words,5
reporter:status:procesando and
reporter:counter:ApplicationCounter,num_words,6
reporter:status:procesando communication
reporter:counter:ApplicationCounter,num_words,7
reporter:status:procesando of
reporter:counter:ApplicationCounter,num_words,8
reporter:status:procesando meaningful
reporter:counter:ApplicationCounter,num_words,9
reporter:status:procesando patterns
reporter:counter:ApplicationCounter,num_words,10
reporter:status:procesando in
reporter:counter:ApplicationCounter,num_words,11
reporter:status:procesando data.
reporter:counter:ApplicationC

El reducer recibe las parejas (key, value) a través del flujo de salida. En los ejemplos anteriores, el reducer verifica si la clave cambia de un elemento al siguiente. Sin embargo, resulta más eficiente que se pueda iterar directamente sobre elementos consecutivos que tienen la misma clave. La función `groupby` de la librería `itertools` permite hacer esto. Dicha función recibe como argumentos los datos y una función que genera la clave para cada dato. Retorna una tupla con la clave y los elementos consecutivos que contienen la misma clave. El siguiente ejemplo permite clarificar su operación.

In [47]:
import itertools

## la letra es la clave y los números son los valores
data = [('A', 1), ('B', 10), ('A', 2), ('A', 3), ('A', 4) , ('B', 20)]

## retorna la parte correspondiente a la clave
def keyfun(x):
    k, v = x
    return k

## itera sobre la clave y los elementos que contiene 
## la misma clave
for key, group in itertools.groupby(data, keyfun):
    print(key)
    for g in group:
        print('   ', g)

A
    ('A', 1)
B
    ('B', 10)
A
    ('A', 2)
    ('A', 3)
    ('A', 4)
B
    ('B', 20)


A continuación se modifica el reducer para incoporar el uso de clases y de la función `groupby`.

In [48]:
%%writefile reducer.py
#!/usr/bin/env python

import sys
import itertools

class Reducer:
    
    def __init__(self, stream):
        self.stream = stream
        
    def emit(self, key, value):
        sys.stdout.write("{}\t{}\n".format(key, value)) 

    def reduce(self):
        ##
        ## Esta función reduce los elementos que 
        ## tienen la misma clave
        ##        
        for key, group in itertools.groupby(self, lambda x: x[0]):
            total = 0
            for _, val in group:
                total += val
            
            self.emit(key=key, value=total)

    def __iter__(self):
        
        for line in self.stream:
            ##
            ## Lee el stream de datos y lo parte 
            ## en (clave, valor)
            ##
            key, val = line.split("\t") 
            val = int(val)
            
            ##
            ## retorna la tupla (clave, valor)
            ## como el siguiente elemento del ciclo for
            ##
            yield (key, val)


if __name__ == '__main__': 
  
    reducer = Reducer(sys.stdin)
    reducer.reduce()


Overwriting reducer.py


In [49]:
!chmod +x reducer.py

In [50]:
## La función sort hace que todos los elementos con 
## la misma clave queden en lineas consecutivas.
## Hace el papel del módulo Shuffle & Sort
!cat ./input/text*.txt | ./mapper.py | sort | ./reducer.py | head

reporter:status:procesando Analytics
reporter:counter:ApplicationCounter,num_words,1
reporter:status:procesando is
reporter:counter:ApplicationCounter,num_words,2
reporter:status:procesando the
reporter:counter:ApplicationCounter,num_words,3
reporter:status:procesando discovery,
reporter:counter:ApplicationCounter,num_words,4
reporter:status:procesando interpretation,
reporter:counter:ApplicationCounter,num_words,5
reporter:status:procesando and
reporter:counter:ApplicationCounter,num_words,6
reporter:status:procesando communication
reporter:counter:ApplicationCounter,num_words,7
reporter:status:procesando of
reporter:counter:ApplicationCounter,num_words,8
reporter:status:procesando meaningful
reporter:counter:ApplicationCounter,num_words,9
reporter:status:procesando patterns
reporter:counter:ApplicationCounter,num_words,10
reporter:status:procesando in
reporter:counter:ApplicationCounter,num_words,11
reporter:status:procesando data.
reporter:counter:ApplicationC

(DA)	1
(see	1
Analytics	2
Analytics,	1
Big	1
Data	2
Especially	1
Organizations	1
Since	1
Specifically,	1


### Ejecución en Hadoop

Una vez se tienen las versiones anteriores funcionando, se puede proceder a ejecutar la tarea directamente en hadoop. 

In [51]:
## Primero se limpian los directorios.
!rm -rf output

In [52]:
##
## Se ejecuta en Hadoop.
##   -input: archivo de entrada
##   -output: directorio de salida
##   -maper: programa que ejecuta el map
##   -reducer: programa que ejecuta la reducción
##
!hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar -input input -output output  -mapper mapper.py -reducer reducer.py

2018-08-28 22:01:20,005 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2018-08-28 22:01:20,620 INFO impl.MetricsConfig: loaded properties from hadoop-metrics2.properties
2018-08-28 22:01:20,673 INFO impl.MetricsSystemImpl: Scheduled Metric snapshot period at 10 second(s).
2018-08-28 22:01:20,673 INFO impl.MetricsSystemImpl: JobTracker metrics system started
2018-08-28 22:01:20,688 WARN impl.MetricsSystemImpl: JobTracker metrics system already initialized!
2018-08-28 22:01:20,968 INFO mapred.FileInputFormat: Total input files to process : 3
2018-08-28 22:01:21,018 INFO mapreduce.JobSubmitter: number of splits:3
2018-08-28 22:01:21,167 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local645885619_0001
2018-08-28 22:01:21,169 INFO mapreduce.JobSubmitter: Executing with tokens: []
2018-08-28 22:01:21,265 INFO mapreduce.Job: The url to track the job: http://localhost:8080/
2018-08-28 22:01:21,266

2018-08-28 22:01:21,710 INFO mapred.MapTask: (EQUATOR) 0 kvi 26214396(104857584)
2018-08-28 22:01:21,711 INFO mapred.MapTask: mapreduce.task.io.sort.mb: 100
2018-08-28 22:01:21,711 INFO mapred.MapTask: soft limit at 83886080
2018-08-28 22:01:21,711 INFO mapred.MapTask: bufstart = 0; bufvoid = 104857600
2018-08-28 22:01:21,711 INFO mapred.MapTask: kvstart = 26214396; length = 6553600
2018-08-28 22:01:21,711 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
2018-08-28 22:01:21,716 INFO streaming.PipeMapRed: PipeMapRed exec [/Volumes/JetDrive/GitHub/big-data-analytics/./mapper.py]
2018-08-28 22:01:21,724 INFO streaming.PipeMapRed: R/W/S=1/0/0 in:NA [rec/s] out:NA [rec/s]
2018-08-28 22:01:21,765 INFO streaming.PipeMapRed: Records R/W=4/1
2018-08-28 22:01:21,768 INFO streaming.PipeMapRed: MRErrorThread done
2018-08-28 22:01:21,768 INFO streaming.PipeMapRed: mapRedFinished
2018-08-28 22:01:21,769 INFO mapred.LocalJobRunner: 
2018-08-28 22:01:2

2018-08-28 22:01:22,275 INFO mapreduce.Job: Job job_local645885619_0001 running in uber mode : false
2018-08-28 22:01:22,276 INFO mapreduce.Job:  map 100% reduce 100%
2018-08-28 22:01:22,277 INFO mapreduce.Job: Job job_local645885619_0001 completed successfully
2018-08-28 22:01:22,285 INFO mapreduce.Job: Counters: 31
	File System Counters
		FILE: Number of bytes read=721072
		FILE: Number of bytes written=2728941
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
	Map-Reduce Framework
		Map input records=24
		Map output records=252
		Map output bytes=2369
		Map output materialized bytes=2891
		Input split bytes=348
		Combine input records=0
		Combine output records=0
		Reduce input groups=159
		Reduce shuffle bytes=2891
		Reduce input records=252
		Reduce output records=159
		Spilled Records=504
		Shuffled Maps =3
		Failed Shuffles=0
		Merged Map outputs=3
		GC time elapsed (ms)=0
		Total committ

In [53]:
## contenido del directorio con los 
## resultados de la corrida
!ls output/

_SUCCESS   part-00000


In [54]:
## se visualiza el archivo con los
## resultados de la corrida
!cat output/part-00000 | head

(DA)	1
(see	1
Analytics	2
Analytics,	1
Big	1
Data	3
Especially	1
Organizations	1
Since	1
Specifically,	1


In [55]:
!rm reducer.py mapper.py
!rm -rf input output

---

Hadoop/MapReduce -- WordCount en Python (Implementación eficiente)
===

**Juan David Velásquez Henao**  
jdvelasq@unal.edu.co   
Universidad Nacional de Colombia, Sede Medellín  
Facultad de Minas  
Medellín, Colombia

---

Haga click [aquí](https://github.com/jdvelasq/big-data-analytics/tree/master/) para acceder al repositorio online.

Haga click [aquí](http://nbviewer.jupyter.org/github/jdvelasq/big-data-analytics/tree/master/) para explorar el repositorio usando `nbviewer`.