# Word Count en Java - HADOOP
## Definición del problema
Se desea contar la frecuencia de ocurrencia de palabras en un conjunto de documentos. Debido a los requerimientos de diseño (gran volúmen de datos y tiempos rápidos de respuesta) se desea implementar una arquitectura Big Data. **Se desea probar el código en la maquina local. El código debe ser escrito en Java.**

A continuación se generarán tres archivos de prieba para probar el sistema

* Preparación del directorio de trabajo

In [31]:
!pwd

/mnt/e/Notas de clase/Ciencia de los grandes datos


In [32]:
!rm -rf input output*
!mkdir input

In [33]:
%%writefile input/text0.txt
Analytics is the discovery, interpretation, and communication of meaningful patterns 
in data. Especially valuable in areas rich with recorded information, analytics relies 
on the simultaneous application of statistics, computer programming and operations research 
to quantify performance.

Organizations may apply analytics to business data to describe, predict, and improve business 
performance. Specifically, areas within analytics include predictive analytics, prescriptive 
analytics, enterprise decision management, descriptive analytics, cognitive analytics, Big 
Data Analytics, retail analytics, store assortment and stock-keeping unit optimization, 
marketing optimization and marketing mix modeling, web analytics, call analytics, speech 
analytics, sales force sizing and optimization, price and promotion modeling, predictive 
science, credit risk analysis, and fraud analytics. Since analytics can require extensive 
computation (see big data), the algorithms and software used for analytics harness the most 
current methods in computer science, statistics, and mathematics.

Writing input/text0.txt


In [34]:
%%writefile input/text1.txt
The field of data analysis. Analytics often involves studying past historical data to 
research potential trends, to analyze the effects of certain decisions or events, or to 
evaluate the performance of a given tool or scenario. The goal of analytics is to improve 
the business by gaining knowledge which can be used to make improvements or changes.

Writing input/text1.txt


In [35]:
%%writefile input/text2.txt
Data analytics (DA) is the process of examining data sets in order to draw conclusions 
about the information they contain, increasingly with the aid of specialized systems 
and software. Data analytics technologies and techniques are widely used in commercial 
industries to enable organizations to make more-informed business decisions and by 
scientists and researchers to verify or disprove scientific models, theories and 
hypotheses.

Writing input/text2.txt


## Implementación de una solución Map-Reduce implementada en Hadoop y Java
Para esto, solo se requiere crear las funciones para mapear (mapper) y para reducer (reduce)

In [7]:
%%writefile WordCount.java

import java.io.IOException;

/*
 * Esta clase permite separar una frase (texto)
 * en las palabras que lo conforman. La lista
 * resultante puede ser iterada en un ciclo for
 */
import java.util.StringTokenizer;

/*
 *
 * Librerias requeridas para ejecutar Hadoop
 *
 */
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/*
 * Esta clase implementa el mapper y el reducer
 */
public class WordCount {
  
  /* 
   * Esta es la clase que se encargará de hacer el mapeo de las
   * palabras. Para ello necesita que tenga una funcion que se
   * llame "map"
   */
  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{
       
    // Implenteacion de un entero optimizado para hadoop
    private final static IntWritable one = new IntWritable(1);

    /* 
     * en esta variable se guarda cada palabra leida        
     * del flujo de entrada
     */     
    private Text word = new Text();

    /* 
     * Este es el mapper. Para cada palabra 
     * leída, emite el par <word, 1>
     */
    public void map(Object key,       // Clave
                    Text value,       // La linea de texto
                    Context context   // Aplicación que se esta ejecutando
                    ) throws IOException, InterruptedException {
                              
      // Convierte la línea de texto en una lista de strings
      StringTokenizer itr = new StringTokenizer(value.toString());
                              
      // Ejecuta el ciclo para cada palabra 
      // de la lista de strings
      while (itr.hasMoreTokens()) {
        // obtiene la palabra
        word.set(itr.nextToken());

        // escribe la pareja <word, 1> 
        // al flujo de salida
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
           
    // Clase para imprimir un entero al flujo de salida       
    private IntWritable result = new IntWritable();

    // Esta función es llamada para reducir 
    // una lista de valores que tienen la misma clave
    public void reduce(Text key,                      // clave
                       Iterable<IntWritable> values,  // lista de valores
                       Context context                // Aplicación que se esta ejecutando
                       ) throws IOException, InterruptedException {
        
      // itera sobre la lista de valores, sumandolos
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
        
      // escribe la pareja <word, valor> al flujo
      // de salida
      context.write(key, result);
    }
  }

    
  /*
   * Se crea la aplicación en Hadoop y se ejecuta
   */
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    
    /*
     * El job corresponde a la aplicacion
     */
    Job job = Job.getInstance(conf, "word count");
      
    /*
     * La clase que contiene el mapper y el reducer
     */
    job.setJarByClass(WordCount.class);
      
    /* 
     * Clase que implementa el mapper  
     */ 
    job.setMapperClass(TokenizerMapper.class);
      
    /*
     * El combiner es un reducer que se coloca a la salida
     * del mapper para agilizar el computo
     */
    job.setCombinerClass(IntSumReducer.class);
    
    /*
     * Clase que implementa el reducer
     */
    job.setReducerClass(IntSumReducer.class);
      
    /*
     * Salida
     */
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    
    /*
     * Formatos de entrada y salida
     */
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
   
    // resultado de la ejecución.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

Writing WordCount.java


* Luego lo siguiente que se debe hacer es compilar el programa, de manera que se pueda ejecutar, para esto utilizamos el mismo hadoop, pasandole primero como clase el compilador de java "javac" y la clase a compilar. *(Si les lanza un error de **bad substitution** hacer caso omiso de esto, sin embargo investigar por qué lo lanza)*

In [8]:
!hadoop com.sun.tools.javac.Main WordCount.java

/home/wm/hadoop-3.1.1/libexec/hadoop-functions.sh: line 2358: HADOOP_COM.SUN.TOOLS.JAVAC.MAIN_USER: bad substitution
/home/wm/hadoop-3.1.1/libexec/hadoop-functions.sh: line 2453: HADOOP_COM.SUN.TOOLS.JAVAC.MAIN_OPTS: bad substitution


* Generar el jar con las clases compiladas adentro para que se puedan ejecutar

In [9]:
!jar cf wc.jar WordCount*.class

* Verificar la creación del jar

In [10]:
!ls *.jar

wc.jar


* Luego de generado el .jar ya s puede enviar al sistema hadoop para que sea ejecutado. **NOTA**: En este momento, el sistema hadoop está ejecutando en modo *standalone* es decir en un solo hilo, como en modo de prueba, sin embargo una ejecucion en un clustes real sería identico, el cambio viene en la configuración de hadoop

In [17]:
!echo ${HADOOP_HOME}/etc/hadoop/core-site.xml
!echo "-----------------------------------------------------------------------"
!cat $HADOOP_HOME/etc/hadoop/core-site.xml
!echo "\n"
!echo "\n"
!echo ${HADOOP_HOME}/etc/hadoop/hdfs-site.xml
!echo "-----------------------------------------------------------------------"
!cat $HADOOP_HOME/etc/hadoop/hdfs-site.xml

/home/wm/hadoop-3.1.1/etc/hadoop/core-site.xml
-----------------------------------------------------------------------
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
  Licensed under the Apache License, Version 2.0 (the "License");
  you may not use this file except in compliance with the License.
  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License. See accompanying LICENSE file.
-->

<!-- Put site-specific property overrides in this file. -->

<configuration>
</configuration>




/home/wm/hadoop-3.1.1/etc/hadoop/hdfs-site.xml
--------------------------------------------------------

* ***POR FIN!!!! :)*** vamos a ejecutar nuestro contador de palabras en el sistema hadoop bajo la metodología Map-Reduce :D

In [18]:
## ejecuta el proceso para los archivos 
## de texto en el directorio input
## Se puede usar la opción --loglevel {OFF, FATAL, ERROR, WARN, DEBUG, INFO, TRACE, ALL} asi:
##
##         !$HADOOP_HOME/bin/hadoop --loglevel ERROR jar wc.jar WordCount input output 
##
## o modificar el archivo .bash_profile con la variable de entorno
##
##   export HADOOP_ROOT_LOGGER="ERROR,console"
##
!$HADOOP_HOME/bin/hadoop jar wc.jar WordCount input output

* Revisamos la carpeta output que es la salida de la ejecución

In [21]:
!ls output/

_SUCCESS  part-r-00000


In [22]:
!cat output/part-r-00000 | head

(DA)	1
(see	1
Analytics	2
Analytics,	1
Big	1
Data	3
Especially	1
Organizations	1
Since	1
Specifically,	1


## Ejercicios
1. Ejecutar el conteo de palabras unicamente con el archivo text0.txt

In [23]:
!hadoop jar wc.jar WordCount input/text0.txt output.1

In [24]:
!ls output.1/

_SUCCESS  part-r-00000


In [25]:
!cat output.1/part-r-00000 | head

(see	1
Analytics	1
Analytics,	1
Big	1
Data	1
Especially	1
Organizations	1
Since	1
Specifically,	1
algorithms	1


2. Mejorar el código para que logre organizar bien las palabras (limpiarlas)

In [42]:

## se limpia el directoroio de trabajo## se l 
!rm WordCount*.* *.jar
!rm -rf output*

In [43]:
!ls

 LICENSE		 README.md	       jdk-8u181-linux-x64.tar.gz
'Notas de clase.ipynb'	 hadoop-3.1.1.tar.gz
 Notas_de_clase.md	 input


* Modificar el codigo de la clase del mapper para que limpie las palabras

In [44]:
%%writefile WordCount.java

import java.io.IOException;

/*
 * Esta clase permite separar una frase (texto)
 * en las palabras que lo conforman. La lista
 * resultante puede ser iterada en un ciclo for
 */
import java.util.StringTokenizer;

/*
 *
 * Librerias requeridas para ejecutar Hadoop
 *
 */
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/*
 * Esta clase implementa el mapper y el reducer
 */
public class WordCount {
  
  /* 
   * Esta es la clase que se encargará de hacer el mapeo de las
   * palabras. Para ello necesita que tenga una funcion que se
   * llame "map"
   */
  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{
       
    // Implenteacion de un entero optimizado para hadoop
    private final static IntWritable one = new IntWritable(1);

    /* 
     * en esta variable se guarda cada palabra leida        
     * del flujo de entrada
     */     
    private Text word = new Text();

    /* 
     * Este es el mapper. Para cada palabra 
     * leída, emite el par <word, 1>
     */
    public void map(Object key,       // Clave
                    Text value,       // La linea de texto
                    Context context   // Aplicación que se esta ejecutando
                    ) throws IOException, InterruptedException {
                              
      // Convierte la línea de texto en una lista de strings
      StringTokenizer itr = new StringTokenizer(value.toString());
                              
      // Ejecuta el ciclo para cada palabra 
      // de la lista de strings
      while (itr.hasMoreTokens()) {
          
          
        // NOTA: AGREGADA PARTE PARA LIMPIAR LAS PALABRAS
        // obtiene la palabra
        word.set(itr.nextToken().toLowerCase().replaceAll("[^a-zA-Z0-9]",""));

          
        // escribe la pareja <word, 1> 
        // al flujo de salida
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
           
    // Clase para imprimir un entero al flujo de salida       
    private IntWritable result = new IntWritable();

    // Esta función es llamada para reducir 
    // una lista de valores que tienen la misma clave
    public void reduce(Text key,                      // clave
                       Iterable<IntWritable> values,  // lista de valores
                       Context context                // Aplicación que se esta ejecutando
                       ) throws IOException, InterruptedException {
        
      // itera sobre la lista de valores, sumandolos
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
        
      // escribe la pareja <word, valor> al flujo
      // de salida
      context.write(key, result);
    }
  }

    
  /*
   * Se crea la aplicación en Hadoop y se ejecuta
   */
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    
    /*
     * El job corresponde a la aplicacion
     */
    Job job = Job.getInstance(conf, "word count");
      
    /*
     * La clase que contiene el mapper y el reducer
     */
    job.setJarByClass(WordCount.class);
      
    /* 
     * Clase que implementa el mapper  
     */ 
    job.setMapperClass(TokenizerMapper.class);
      
    /*
     * El combiner es un reducer que se coloca a la salida
     * del mapper para agilizar el computo
     */
    job.setCombinerClass(IntSumReducer.class);
    
    /*
     * Clase que implementa el reducer
     */
    job.setReducerClass(IntSumReducer.class);
      
    /*
     * Salida
     */
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    
    /*
     * Formatos de entrada y salida
     */
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
   
    // resultado de la ejecución.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

Writing WordCount.java


* Compilar y motar en un jar el codigo modificado

In [45]:
!hadoop com.sun.tools.javac.Main WordCount.java
!jar cf wc.jar WordCount*.class

/home/wm/hadoop-3.1.1/libexec/hadoop-functions.sh: line 2358: HADOOP_COM.SUN.TOOLS.JAVAC.MAIN_USER: bad substitution
/home/wm/hadoop-3.1.1/libexec/hadoop-functions.sh: line 2453: HADOOP_COM.SUN.TOOLS.JAVAC.MAIN_OPTS: bad substitution


* Ejecutar de nuevo el sistema hadoop

In [46]:
!hadoop jar wc.jar WordCount input output

In [47]:

## se limpia el directoroio de trabajo## se l 
!rm WordCount*.* *.jar
!rm -rf input output*