# Pig: Índice invertido

Partiendo del dataset de posts utilizado anteriormente, vamos a calcular un índice invertido.

Este notebook se ejecutará dentro del cluster de Hadoop creado usando Docker durante las clases del máster. En concreto, se usarán las siguientes imágenes:
* accaminero/namenode01
* swapnillinux/cloudera-hadoop-yarnmaster
* swapnillinux/cloudera-hadoop-datanode

Se puede construir el cluster Hadoop y lanzar el entorno Jupyter usando los siguientes scripts incluídos en la práctica:
* docker/build.sh
* docker/jupyter.sh

## 1. Preparación del entorno de ejecución

Creamos una carpeta local para almacenar los ficheros de trabajo

In [1]:
! rm -fr pig-indiceinvertido
! mkdir -p pig-indiceinvertido
import os
os.chdir("pig-indiceinvertido/")
! pwd

/media/notebooks/pig-indiceinvertido


Instalación de depencias:
* Instalamos dos2unix para limpiar el fichero y convertirlo de formato DOS a Unix
* Instalamos pig para ejecutar los correspondientes comandos

In [4]:
! yum install -y dos2unix pig hbase

Loaded plugins: fastestmirror, ovl
Repodata is over 2 weeks old. Install yum-cron? Or run: yum makecache fast
base                                                     | 3.6 kB     00:00     
cloudera-cdh                                             |  951 B     00:00     
epel/x86_64/metalink                                     |  24 kB     00:00     
epel                                                     | 4.7 kB     00:00     
http://ftp.cica.es/epel/7/x86_64/repodata/repomd.xml: [Errno -1] repomd.xml does not match metalink for epel
Trying other mirror.
epel                                                     | 4.7 kB     00:00     
extras                                                   | 3.4 kB     00:00     
updates                                                  | 3.4 kB     00:00     
(1/5): extras/7/x86_64/primary_db                          | 166 kB   00:00     
(2/5): epel/x86_64/group_gz                                | 266 kB   00:01     
(3/5): epel/x86_64/updateinfo  

Copiamos los ficheros de datos al directorio de trabajo

In [2]:
! cp ../dataset/forum_node.tsv.gz ../dataset/forum1.tsv .
! ls -lh

total 38M
-rw-r--r-- 1 root root 1.8K Feb 18 10:03 forum1.tsv
-rwxr-xr-x 1 root root  38M Feb 18 10:03 forum_node.tsv.gz


Descomprimimos el primer fichero y lo limpiamos

In [3]:
! gzip -d forum_node.tsv.gz && dos2unix -f forum_node.tsv

dos2unix: converting file forum_node.tsv to Unix format ...


Creamos el directorio de usuario en Hadoop si no existiera

In [4]:
! hadoop fs -rm -r /user/$(whoami)/pig-indiceinvertido
! hadoop fs -rm -r /user/$(whoami)/inverted_index
! hadoop fs -mkdir -p /user/$(whoami)

rm: `/user/root/pig-indiceinvertido': No such file or directory
rm: `/user/root/inverted_index': No such file or directory


Copiamos los ficheros a Hadoop y al directorio local

In [5]:
! hadoop fs -put -p forum_node.tsv

In [6]:
! hadoop fs -put forum1.tsv

In [7]:
! hadoop fs -ls

Found 2 items
-rw-r--r--   3 root supergroup       1774 2018-02-18 10:03 forum1.tsv
-rwxr-xr-x   3 root root        120109135 2018-02-18 10:03 forum_node.tsv


## 2. Creación del fichero PIG a ejecutar

In [8]:
%%writefile students-inverted-index.pig

/* 1.Carga el fichero de los posts forum_node.tsv, utilizando una extension de Piggybank para poder quitar la cabecera,
en vez de usar directamente el PigStorage. */
REGISTER /usr/lib/pig/piggybank.jar;
DEFINE StringToInt InvokeForInt('java.lang.Integer.valueOf', 'String');

data =
    load 'forum_node.tsv'
    using org.apache.pig.piggybank.storage.CSVExcelStorage('\t', 'YES_MULTILINE', 'NOCHANGE', 'SKIP_INPUT_HEADER')
    as (pid:chararray, title:chararray, tagnames:chararray,
        author_id:chararray,body:chararray,
        node_type:chararray, parent_id:chararray,
        abs_parent_id:chararray,added_at:chararray,
        score:chararray, state_string:chararray, last_edited_id:chararray,
        last_activity_by_id:chararray, last_activity_at:chararray,
        active_revision_id:chararray, extra:chararray,
        extra_ref_id:chararray, extra_count:chararray, marked:chararray);

/* 2.Limpiamos el fichero quitando los saltos de linea, expresiones html y la expresión regular que se proponia en el ejercicio. */
cleandata = foreach data generate
    REPLACE(pid, '[a-zA-Z]+', '') as post_id,
    LOWER(REPLACE(REPLACE(REPLACE(REPLACE(REPLACE(body, '\\\\n\\\\r', ''), '\\\\r', ''), '\\\\n', ''), '<*>', ''), '[^a-zA-Z0-9\'\\s]+', ' ')) AS clean_body;

/* 3.Filtramos los datos de post_id que no son numericos. */
cleandata_filtered = filter cleandata by org.apache.pig.piggybank.evaluation.IsNumeric(post_id);

/* 4.Creamos tuplas separando el body por espacios y convirtiendo el post_id en un numerico a través de una función custom, para evitar problemas que sufrimos con el cast de String a Integer. */
words_data = FOREACH cleandata_filtered GENERATE StringToInt(post_id) as post_id_int:int, FLATTEN(TOKENIZE(clean_body)) as word;
words_data_filtered = filter words_data by SIZE(word) > 0;

/* 5.Agrupamos por palabra */
word_groups = GROUP words_data_filtered BY word;

/* 6.Por cada grupo de palabras, hacemos un distinct para los post_id, eliminando los duplicados, contamos el número de post en que aparece (despues de quitar los duplicados) y generamos una fila con el índice. */
index = FOREACH word_groups {
    pairs = DISTINCT $1.$0;
    cnt = COUNT(pairs);
    GENERATE $0 as word, pairs as index_bag, cnt as count;
};

/* 7.Como se pide que el indice lleve el post_id ordenador, ordenamos la bag resultante de los posts por su id. */
sorted_index = foreach index {
    sorted_bag = order index_bag by $0;
    generate word, sorted_bag, count;
}

/* 8. Lo guardamos en un fichero. */
STORE sorted_index INTO 'inverted_index';





Writing students-inverted-index.pig


## 3. Ejecución del fichero PIG en modo local

In [9]:
! pig -f students-inverted-index.pig -x local

log4j:WARN No appenders could be found for logger (org.apache.hadoop.util.Shell).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
2018-02-18 10:04:18,343 [main] INFO  org.apache.pig.Main - Apache Pig version 0.12.0-cdh5.9.0 (rUnversioned directory) compiled Oct 21 2016, 01:17:18
2018-02-18 10:04:18,344 [main] INFO  org.apache.pig.Main - Logging error messages to: /media/notebooks/pig-indiceinvertido/pig_1518948258309.log
2018-02-18 10:04:18,369 [main] INFO  org.apache.hadoop.conf.Configuration.deprecation - user.name is deprecated. Instead, use mapreduce.job.user.name
2018-02-18 10:04:18,781 [main] INFO  org.apache.pig.impl.util.Utils - Default bootup file /root/.pigbootup not found
2018-02-18 10:04:18,887 [main] INFO  org.apache.hadoop.conf.Configuration.deprecation - fs.default.name is deprecated. Instead, use fs.defaultFS
2018-02-18 10:04:18,888 [main] INFO  org.apache.hadoop.conf.Configurati

2018-02-18 10:04:22,801 [JobControl] INFO  org.apache.hadoop.mapreduce.Job - The url to track the job: http://localhost:8080/
2018-02-18 10:04:22,803 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - HadoopJobId: job_local811689810_0001
2018-02-18 10:04:22,803 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Processing aliases 1-1,cleandata,cleandata_filtered,data,index,pairs,sorted_bag,sorted_index,word_groups,words_data,words_data_filtered
2018-02-18 10:04:22,803 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - detailed locations: M: data[8,4],cleandata[-1,-1],cleandata_filtered[25,21],words_data[28,13],words_data_filtered[29,22],word_groups[32,14] C:  R: index[35,8],1-1[36,21],pairs[36,12],1-1[36,21],pairs[36,12],sorted_index[42,15],sorted_bag[43,17]
2018-02-18 10:04:22,808 [Thread-5] INFO  org.apache.hadoop.mapred.LocalJobRunner - OutputCommitter set

2018-02-18 10:04:41,438 [LocalJobRunner Map Task Executor #0] INFO  org.apache.hadoop.mapred.MapTask - (EQUATOR) 0 kvi 26214396(104857584)
2018-02-18 10:04:41,439 [LocalJobRunner Map Task Executor #0] INFO  org.apache.hadoop.mapred.MapTask - mapreduce.task.io.sort.mb: 100
2018-02-18 10:04:41,439 [LocalJobRunner Map Task Executor #0] INFO  org.apache.hadoop.mapred.MapTask - soft limit at 83886080
2018-02-18 10:04:41,439 [LocalJobRunner Map Task Executor #0] INFO  org.apache.hadoop.mapred.MapTask - bufstart = 0; bufvoid = 104857600
2018-02-18 10:04:41,439 [LocalJobRunner Map Task Executor #0] INFO  org.apache.hadoop.mapred.MapTask - kvstart = 26214396; length = 6553600
2018-02-18 10:04:41,440 [LocalJobRunner Map Task Executor #0] INFO  org.apache.hadoop.mapred.MapTask - Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
2018-02-18 10:04:41,470 [LocalJobRunner Map Task Executor #0] WARN  org.apache.pig.data.SchemaTupleBackend - SchemaTupleBackend has already bee

2018-02-18 10:05:13,212 [LocalJobRunner Map Task Executor #0] INFO  org.apache.hadoop.mapred.MapTask - Finished spill 1
2018-02-18 10:05:13,214 [LocalJobRunner Map Task Executor #0] INFO  org.apache.hadoop.mapred.Merger - Merging 2 sorted segments
2018-02-18 10:05:13,216 [LocalJobRunner Map Task Executor #0] INFO  org.apache.hadoop.mapred.Merger - Down to the last merge-pass, with 2 segments left of total size: 68256169 bytes
2018-02-18 10:05:14,057 [communication thread] INFO  org.apache.hadoop.mapred.LocalJobRunner - map > sort > 
2018-02-18 10:05:15,210 [LocalJobRunner Map Task Executor #0] INFO  org.apache.hadoop.mapred.Task - Task:attempt_local811689810_0001_m_000002_0 is done. And is in the process of committing
2018-02-18 10:05:15,212 [LocalJobRunner Map Task Executor #0] INFO  org.apache.hadoop.mapred.LocalJobRunner - map > sort
2018-02-18 10:05:15,213 [LocalJobRunner Map Task Executor #0] INFO  org.apache.hadoop.mapred.Task - Task 'attempt_local811689810_0001_m_000002_0' done.

2018-02-18 10:05:26,841 [localfetcher#1] INFO  org.apache.hadoop.mapreduce.task.reduce.InMemoryMapOutput - Read 60032392 bytes from map-output for attempt_local811689810_0001_m_000000_0
2018-02-18 10:05:26,842 [localfetcher#1] INFO  org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl - closeInMemoryFile -> map-output of size: 60032392, inMemoryMapOutputs.size() -> 4, commitMemory -> 181213592, usedMemory ->241245984
2018-02-18 10:05:26,843 [EventFetcher for fetching Map Completion Events] INFO  org.apache.hadoop.mapreduce.task.reduce.EventFetcher - EventFetcher is interrupted.. Returning
2018-02-18 10:05:26,844 [pool-5-thread-1] INFO  org.apache.hadoop.mapred.LocalJobRunner - 4 / 4 copied.
2018-02-18 10:05:26,846 [pool-5-thread-1] INFO  org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl - finalMerge called with 4 in-memory map-outputs and 0 on-disk map-outputs
2018-02-18 10:05:26,981 [pool-5-thread-1] INFO  org.apache.hadoop.mapred.Merger - Merging 4 sorted segments
2018-02-

In [11]:
! tail -60000 ./inverted_index/part-r-00000  | head -10

googlecode	{(79),(10617),(28844),(45630),(1006011),(1034790),(3001942),(5001673),(5002485),(6001946),(6002641),(6002651),(6002660),(6002690),(6003146),(6003978),(6004231),(6004298),(6005746),(6006014),(6007830),(6010307),(6015242),(6016801),(6022761),(6025168),(6027118),(7006210)}	28
googlemail	{(66818),(66848),(67057),(6005844),(6006822),(6008744)}	6
googlemale	{(10011506)}	1
googleplex	{(2009842)}	1
googleplus	{(6002796)}	1
googolplex	{(2009965)}	1
goosebumps	{(5002139)}	1
gorchynski	{(12002258),(12002259),(12002270),(12002296),(12002300),(12002345),(12002440)}	7
gordeychuk	{(12003403)}	1
gorilla834	{(6019443),(6019538)}	2
tail: write error: Broken pipe


## 4. Ejecución del fichero PIG en Hadoop

In [12]:
! pig -f students-inverted-index.pig

log4j:WARN No appenders could be found for logger (org.apache.hadoop.util.Shell).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
2018-02-18 10:07:32,413 [main] INFO  org.apache.pig.Main - Apache Pig version 0.12.0-cdh5.9.0 (rUnversioned directory) compiled Oct 21 2016, 01:17:18
2018-02-18 10:07:32,416 [main] INFO  org.apache.pig.Main - Logging error messages to: /media/notebooks/pig-indiceinvertido/pig_1518948452352.log
2018-02-18 10:07:34,140 [main] INFO  org.apache.pig.impl.util.Utils - Default bootup file /root/.pigbootup not found
2018-02-18 10:07:34,408 [main] INFO  org.apache.hadoop.conf.Configuration.deprecation - mapred.job.tracker is deprecated. Instead, use mapreduce.jobtracker.address
2018-02-18 10:07:34,408 [main] INFO  org.apache.hadoop.conf.Configuration.deprecation - fs.default.name is deprecated. Instead, use fs.defaultFS
2018-02-18 10:07:34,408 [main] INFO  org.apache.pig.backe

2018-02-18 10:07:47,365 [JobControl] INFO  org.apache.hadoop.mapreduce.Job - The url to track the job: http://yarnmaster:8088/proxy/application_1518944122177_0002/
2018-02-18 10:07:47,365 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - HadoopJobId: job_1518944122177_0002
2018-02-18 10:07:47,365 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Processing aliases 1-1,cleandata,cleandata_filtered,data,index,pairs,sorted_bag,sorted_index,word_groups,words_data,words_data_filtered
2018-02-18 10:07:47,366 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - detailed locations: M: data[8,4],cleandata[-1,-1],cleandata_filtered[25,21],words_data[28,13],words_data_filtered[29,22],word_groups[32,14] C:  R: index[35,8],1-1[36,21],pairs[36,12],1-1[36,21],pairs[36,12],sorted_index[42,15],sorted_bag[43,17]
2018-02-18 10:07:47,533 [main] INFO  org.apache.pig.backend.hadoo

In [13]:
! hadoop fs -ls inverted_index/

Found 2 items
-rw-r--r--   3 root supergroup          0 2018-02-18 10:11 inverted_index/_SUCCESS
-rw-r--r--   3 root supergroup   86936662 2018-02-18 10:11 inverted_index/part-r-00000


In [15]:
! hadoop fs -cat inverted_index/part-r-00000 | tail -60000 | head -10

googlecode	{(79),(10617),(28844),(45630),(1006011),(1034790),(3001942),(5001673),(5002485),(6001946),(6002641),(6002651),(6002660),(6002690),(6003146),(6003978),(6004231),(6004298),(6005746),(6006014),(6007830),(6010307),(6015242),(6016801),(6022761),(6025168),(6027118),(7006210)}	28
googlemail	{(66818),(66848),(67057),(6005844),(6006822),(6008744)}	6
googlemale	{(10011506)}	1
googleplex	{(2009842)}	1
googleplus	{(6002796)}	1
googolplex	{(2009965)}	1
goosebumps	{(5002139)}	1
gorchynski	{(12002258),(12002259),(12002270),(12002296),(12002300),(12002345),(12002440)}	7
gordeychuk	{(12003403)}	1
gorilla834	{(6019443),(6019538)}	2
tail: write error: Broken pipe
tail: write error
