# BIG DATA AND HIGH-PERFORMANCE COMPUTING: AN INTRODUCTION FOR THE STATISTICIAN



Click [here](http://nbviewer.jupyter.org/format/slides/github/jdvelasq/big-data-y-comp-alto-desempeno/blob/master/Slides.ipynb?flush_cache=true#/) to view this presentation in nbviewer.

<img src="img/firma-social-media.jpg" alt="" width="500" >

<img src="img/evo-big-data.jpg" alt="" width="800" >

<img src="img/servicios-web.jpg" alt="" width="800" >

# Apache Hadoop & Map/Reduce

<img src="img/map-reduce.jpg" alt="" width="800" >

<img src="img/mr-word-count.jpg" alt="" width="800" >
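
The map, shuffle/sort and reduce phases in the figure can be sketched in plain Python. This is only an in-memory simulation that assumes the whole input fits in one process. The function names `map_phase`, `shuffle_phase` and `reduce_phase` are hypothetical and are not part of Hadoop.

```python
from collections import defaultdict
from typing import Dict, Iterable, Iterator, List, Tuple


def map_phase(lines: Iterable[str]) -> Iterator[Tuple[str, int]]:
    """Emit a (word, 1) pair for every word of every line."""
    for line in lines:
        for word in line.split():
            yield word, 1


def shuffle_phase(pairs: Iterable[Tuple[str, int]]) -> Dict[str, List[int]]:
    """Group the emitted values by key and order the keys, as the framework does."""
    groups: Dict[str, List[int]] = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return dict(sorted(groups.items()))


def reduce_phase(groups: Dict[str, List[int]]) -> Dict[str, int]:
    """Collapse the list of values of each key into a single total."""
    return {key: sum(values) for key, values in groups.items()}


lines = ["A B C A", "A D D A", "A K M C"]  # contents of input/text0.txt
counts = reduce_phase(shuffle_phase(map_phase(lines)))
print(counts)  # {'A': 5, 'B': 1, 'C': 2, 'D': 2, 'K': 1, 'M': 1}
```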

# General concepts

## Execution modes


* Local (standalone) mode: Hadoop runs as a single Java process on the local machine, for learning and development. The local file system takes the place of HDFS.


* `=====>`  Pseudo-distributed mode: each Hadoop daemon runs as a separate process on the local machine, for development. HDFS is reachable as a service.


* Fully distributed mode on a cluster (production environment).



## Application development


* Programming directly in Java (Hadoop MapReduce API).


* `=====>` Programming in an interpreted language (Hadoop-streaming).



# Operating systems

* Microsoft Windows: difficult.


* Mac OS: straightforward installation.


* Ubuntu Linux: straightforward installation.



# Word-count example

The example uses Hadoop-streaming.

## Data

In [93]:
%%writefile input/text0.txt
A B C A
A D D A
A K M C

Overwriting input/text0.txt


In [94]:
%%writefile input/text1.txt
B A C Y B
U O Y Y A
A B I T

Overwriting input/text1.txt


In [95]:
%%writefile input/text2.txt
A C D A
A K B 
A N H I D A

Overwriting input/text2.txt


## mapper.R

In [96]:
%%writefile mapper.R
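#! /usr/bin/env Rscript

input <- file('stdin', 'r')
while(TRUE) {
    row <- readLines(input, n=1)        # read one line from stdin
    if( length(row) == 0 ){             # no more input
        break
    }
    words <- strsplit(row, " ")[[1]]    # split on single spaces (may yield empty tokens)
    for(word in words){
        if(word != '')                  # skip empty tokens
            write(cat(word,'\t1',sep=''), "")   # emit "word<TAB>1" on its own line
    }
}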

Overwriting mapper.R
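
Hadoop-streaming only requires the mapper to read lines from standard input and write `key<TAB>value` lines to standard output, so any language can be used. The following is a sketch of the core of an equivalent mapper in Python. The function name `run_mapper` is hypothetical. It takes the input and output streams as parameters so that it can be tried without a pipe.

```python
import io
from typing import Iterable, TextIO


def run_mapper(lines: Iterable[str], out: TextIO) -> None:
    """Write one 'word<TAB>1' record per non-empty token of every input line."""
    for line in lines:
        for word in line.rstrip("\n").split(" "):
            if word != "":  # same filter as mapper.R: skip empty tokens
                out.write(f"{word}\t1\n")


# A streaming script would call run_mapper(sys.stdin, sys.stdout).
# Here the function is exercised on the contents of input/text2.txt instead.
buffer = io.StringIO()
run_mapper(["A C D A\n", "A K B \n", "A N H I D A\n"], buffer)
print(buffer.getvalue(), end="")
```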


In [119]:
## Make the program above executable
!chmod +x mapper.R

## Check the mapper on the local files
!cat ./input/text*.txt | ./mapper.R | head

A	1
B	1
C	1
A	1
A	1
D	1
D	1
A	1
A	1
K	1
Error in cat(x, file = file, sep = c(rep.int(sep, ncolumns - 1), "\n"),  : 
  ignoring SIGPIPE signal
Calls: write -> cat
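Execution halted

The error is expected. `head` exits after printing ten lines, so the mapper's next write goes to a closed pipe and R reports the SIGPIPE signal.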


## reducer.R

In [98]:
%%writefile reducer.R
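#!/usr/bin/env Rscript

curkey <- NULL                          # key currently being accumulated
total <- 0                              # running total for curkey
input <- file('stdin', 'r')
while(TRUE) {
    row <- readLines(input, n=1)
    if( length(row) == 0 ){
        break
    }
    x <- strsplit(row, "\t")[[1]]       # "word<TAB>count" -> c(word, count)
    key <- x[1]
    value <- strtoi(x[2])
    if(!is.null(curkey) && key == curkey){
        total <- total + value          # input is sorted: same key as previous line
    }
    else{
        if( !is.null(curkey) ) {        # key changed: emit the finished key
            write(cat(curkey,'\t', total), "")   # cat() adds spaces: "A \t 13"
        }
        curkey <- key
        total <- value
    }
}
write(cat(curkey,'\t', total), "")      # emit the last key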

Overwriting reducer.R
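
The reducer relies on the input arriving sorted by key, which Hadoop (or `sort` in the local test) guarantees, so all the records of one word are consecutive. The same idea can be expressed in Python with `itertools.groupby`. This is a sketch, and `run_reducer` is a hypothetical name. Unlike reducer.R, it writes `key<TAB>total` without extra spaces.

```python
import io
from itertools import groupby
from typing import Iterable, TextIO


def run_reducer(lines: Iterable[str], out: TextIO) -> None:
    """Sum the values of consecutive 'key<TAB>value' lines that share a key.

    Assumes the input is already sorted by key.
    """
    pairs = (line.rstrip("\n").split("\t", 1) for line in lines if line.strip())
    for key, group in groupby(pairs, key=lambda kv: kv[0]):
        total = sum(int(value) for _, value in group)
        out.write(f"{key}\t{total}\n")


mapped = ["B\t1", "A\t1", "A\t1", "C\t1", "A\t1"]  # hypothetical mapper output
buffer = io.StringIO()
run_reducer(sorted(mapped), buffer)  # sorted() plays the role of `sort`
print(buffer.getvalue(), end="")     # prints A<TAB>3, B<TAB>1, C<TAB>1 on separate lines
```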


In [99]:
!chmod +x reducer.R
!cat ./input/text*.txt | ./mapper.R | sort | ./reducer.R 

In readLines(input, n = 1) : incomplete final line found on 'stdin'
A 	 13
B 	 5
C 	 4
D 	 4
H 	 1
I 	 2
K 	 2
M 	 1
N 	 1
O 	 1
T 	 1
U 	 1
Y 	 3


## Running on Hadoop

In [115]:
!hadoop fs -mkdir /user
!hadoop fs -mkdir /user/jdvelasq
!hadoop fs -mkdir /user/jdvelasq/input
!hadoop fs -copyFromLocal  input/* /user/jdvelasq/input
!hadoop fs -ls /user/jdvelasq/input/*

-rw-r--r--   1 jdvelasq supergroup         25 2018-11-08 23:36 /user/jdvelasq/input/text0.txt
-rw-r--r--   1 jdvelasq supergroup         29 2018-11-08 23:36 /user/jdvelasq/input/text1.txt
-rw-r--r--   1 jdvelasq supergroup         28 2018-11-08 23:36 /user/jdvelasq/input/text2.txt


In [116]:
## -files ships mapper.R and reducer.R to the nodes that run the tasks
!hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
-files mapper.R,reducer.R -input input -output output -mapper mapper.R -reducer reducer.R

In [117]:
!hadoop fs -ls /user/jdvelasq/output

Found 2 items
-rw-r--r--   1 jdvelasq supergroup          0 2018-11-08 23:37 /user/jdvelasq/output/_SUCCESS
-rw-r--r--   1 jdvelasq supergroup         79 2018-11-08 23:37 /user/jdvelasq/output/part-00000


In [118]:
!hadoop fs -cat /user/jdvelasq/output/part-00000

A 	 13
B 	 5
C 	 4
D 	 4
H 	 1
I 	 2
K 	 2
M 	 1
N 	 1
O 	 1
T 	 1
U 	 1
Y 	 3


The output directory can also be browsed in the HDFS NameNode web interface:

http://localhost:50070/explorer.html#/user/jdvelasq/output

## Typical problems that can be programmed with Map/Reduce

* Maximum or minimum value per key.


* Sorting the file.


* Sums and averages per key.


* Getting the N most ... records.


* Associating records by key (inverted index):


    0   A, B                  A  0, 1
    1   A, B       ======>    B  0, 1, 2  
    2   B, C                  C  2
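
Several of these patterns fit the same mapper/reducer skeleton. The in-memory sketch below uses a hypothetical helper, `map_reduce`, which is not a Hadoop API. It builds the inverted index of the diagram above and computes an average per key (the measurement data are hypothetical). For the average, the mapper emits `(value, 1)` pairs and the reducer adds sums and counts separately, because an average of partial averages is not, in general, the overall average.

```python
from collections import defaultdict
from typing import Any, Callable, Dict, Iterable, Iterator, List, Tuple

Pair = Tuple[Any, Any]


def map_reduce(records: Iterable[Any],
               mapper: Callable[[Any], Iterator[Pair]],
               reducer: Callable[[Any, List[Any]], Any]) -> Dict[Any, Any]:
    """Apply mapper to every record, group values by key, reduce each group."""
    groups: Dict[Any, List[Any]] = defaultdict(list)
    for record in records:
        for key, value in mapper(record):
            groups[key].append(value)
    return {key: reducer(key, groups[key]) for key in sorted(groups)}


# Inverted index: record id -> list of keys becomes key -> list of record ids.
rows = [(0, ["A", "B"]), (1, ["A", "B"]), (2, ["B", "C"])]


def index_mapper(row):
    row_id, keys = row
    for key in keys:
        yield key, row_id


index = map_reduce(rows, index_mapper, lambda key, ids: sorted(ids))
print(index)  # {'A': [0, 1], 'B': [0, 1, 2], 'C': [2]}

# Average per key: carry (sum, count) so that partial results can be combined.
measurements = [("x", 4.0), ("y", 1.0), ("x", 6.0), ("x", 2.0)]  # hypothetical data


def average_mapper(item):
    key, value = item
    yield key, (value, 1)


def average_reducer(key, partials):
    total = sum(s for s, _ in partials)
    count = sum(c for _, c in partials)
    return total / count


print(map_reduce(measurements, average_mapper, average_reducer))  # {'x': 4.0, 'y': 1.0}
```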
    
    
**Most problems cannot be solved with a single Map/Reduce job**

<img src="img/mr-jobs.jpg" alt="" width="800" >
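
A common way around this limitation is to chain jobs, using the output of one job as the input of the next. The plain-Python sketch below builds a top-N query from two jobs, with hypothetical function names. The first job counts words. The second re-keys each `(word, count)` pair by count, so that sorting by key orders the words by frequency.

```python
from collections import Counter
from typing import Dict, Iterable, List, Tuple


def job1_word_count(lines: Iterable[str]) -> Dict[str, int]:
    """First job: classic word count (map: (word, 1); reduce: sum)."""
    counts: Counter = Counter()
    for line in lines:
        counts.update(line.split())
    return dict(counts)


def job2_top_n(counts: Dict[str, int], n: int) -> List[Tuple[str, int]]:
    """Second job: map (word, count) -> (-count, word) and sort by the new key.

    Negating the count makes an ascending sort return the largest counts first;
    ties are broken alphabetically by word.
    """
    rekeyed = sorted((-count, word) for word, count in counts.items())
    return [(word, -neg) for neg, word in rekeyed[:n]]


lines = ["A B C A", "A D D A", "A K M C",        # input/text0.txt
         "B A C Y B", "U O Y Y A", "A B I T",    # input/text1.txt
         "A C D A", "A K B ", "A N H I D A"]     # input/text2.txt
print(job2_top_n(job1_word_count(lines), 3))  # [('A', 13), ('B', 5), ('C', 4)]
```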

# Apache Pig

## WordCount in Pig

Pig is normally used through its command-line shell. The research group wrote a magic that makes it possible to work with Apache Pig from Jupyter.

In [1]:
%load_ext bigdata
%pig_init

In [5]:
%%pig
lines = LOAD 'input/text*.txt' AS (line:CHARARRAY);

-- generates a relation named words with one word per record
words = FOREACH lines GENERATE FLATTEN(TOKENIZE(line)) AS word;

-- groups the records that contain the same word
grouped = GROUP words BY word;

-- counts the occurrences in each group
wordcount = FOREACH grouped GENERATE group, COUNT(words);

-- keeps at most 15 records (without ORDER BY, which ones is not guaranteed)
s = LIMIT wordcount 15;

-- stores the result in the HDFS directory output-pig (DUMP s; would print it instead)
STORE s INTO 'output-pig';

In [6]:
!hadoop fs -ls output-pig/

Found 2 items
-rw-r--r--   1 jdvelasq supergroup          0 2018-11-09 07:22 output-pig/_SUCCESS
-rw-r--r--   1 jdvelasq supergroup         53 2018-11-09 07:22 output-pig/part-r-00000


In [7]:
!hadoop fs -cat /user/jdvelasq/output-pig/part-r-00000

A	13
B	5
C	4
D	4
H	1
I	2
K	2
M	1
N	1
O	1
T	1
U	1
Y	3
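
Each Pig statement defines a new relation from the previous one. The same dataflow can be mirrored with Python lists and dicts, as an illustration only (this is not how Pig executes the script). `GROUP` produces, for each word, a bag with all its occurrences, which `COUNT` then measures. `str.split()` stands in for `TOKENIZE`, which also treats some punctuation characters as delimiters. For this input the two behave the same.

```python
from typing import Dict, List, Tuple

# lines = LOAD ...: one record per line (contents of input/text0.txt)
lines: List[str] = ["A B C A", "A D D A", "A K M C"]

# words = FOREACH lines GENERATE FLATTEN(TOKENIZE(line)) AS word;
words: List[str] = [word for line in lines for word in line.split()]

# grouped = GROUP words BY word;  -> (group, bag of matching records)
grouped: Dict[str, List[str]] = {}
for word in words:
    grouped.setdefault(word, []).append(word)

# wordcount = FOREACH grouped GENERATE group, COUNT(words);
wordcount: List[Tuple[str, int]] = [(group, len(bag)) for group, bag in grouped.items()]

# s = LIMIT wordcount 15;  (Pig does not guarantee which records LIMIT keeps)
s = wordcount[:15]
print(sorted(s))  # [('A', 5), ('B', 1), ('C', 2), ('D', 2), ('K', 1), ('M', 1)]
```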


<img src="img/hdfs-pig-1.jpg" alt="" width="900" >

<img src="img/hdfs-pig-2.jpg" alt="" width="900" >

<img src="img/hdfs-pig-3.jpg" alt="" width="900" >

## Apache Pig examples



http://nbviewer.jupyter.org/github/jdvelasq/AGD-03-Pig/blob/master/02-basics.ipynb
    
    
http://nbviewer.jupyter.org/github/jdvelasq/AGD-03-Pig/blob/master/04-tipos-de-datos.ipynb



# Apache Hive

## WordCount

In [8]:
%hive_init

Hive initialized!


In [9]:
%%hive
DROP TABLE IF EXISTS docs;
DROP TABLE IF EXISTS word_counts;

CREATE TABLE docs (line STRING);

LOAD DATA LOCAL INPATH 
    'input/text*.txt' 
OVERWRITE INTO TABLE docs;

CREATE TABLE word_counts 
AS
    SELECT word, count(1) AS count 
    FROM
        (SELECT explode(split(line, '\\s')) AS word FROM docs) w
GROUP BY 
    word
ORDER BY 
    word;

SELECT * FROM word_counts LIMIT 10;

	7
A	13
B	5
C	4
D	4
H	1
I	2
K	2
M	1
N	1
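
The first row of the result has an empty key. It most likely comes from the split pattern. `split(line, '\\s')` cuts at every single whitespace character, so a trailing space (as in `A K B ` in text2.txt) or a whitespace-only line yields empty strings. The behaviour can be reproduced with Python's `re.split`, used here only as an analogy for Hive's regular-expression split. Filtering out the empty tokens, for instance with `WHERE word <> ''` in HiveQL, removes that row.

```python
import re
from collections import Counter

line = "A K B "                     # second line of input/text2.txt (trailing space)
print(re.split(r"\s", line))        # ['A', 'K', 'B', '']
print(re.split(r"\s", " "))         # ['', ''] for a whitespace-only line

# Counting the tokens of both lines produces an empty key, as in the Hive result.
tokens = re.split(r"\s", line) + re.split(r"\s", " ")
print(Counter(tokens)[""])          # 3

# Dropping empty tokens (the role of WHERE word <> '') removes that key.
clean = [t for t in tokens if t != ""]
print(clean)                        # ['A', 'K', 'B']
```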


## Features of Apache Hive


* The language (HiveQL) is very similar to SQL and easy to learn.


* Tables are stored directly as text files in the file system (HDFS).


* The main differences from other database systems are presented next.

In [11]:
%%writefile tbl0.csv
1,D,4,2016-06-25,a:e:c:d,bb#10:dd#20:cc#40
2,C,4,2015-05-14,a:c,dd#40:bb#20:cc#10
3,D,6,2014-12-26,b:d,aa#10:bb#40
4,D,5,2016-06-25,a:c:e:b:d,bb#40:dd#20:aa#10:cc#30
5,C,7,2016-05-23,d:e:c,cc#10:aa#20
6,A,2,2018-06-14,a:d,aa#20
7,B,3,2014-12-22,a:e:d,cc#20
8,C,6,2015-08-20,d:a:c:e,cc#10:aa#20
9,B,3,2017-12-08,b:a:c:e,cc#40:dd#10:aa#30:bb#20
10,B,7,2015-07-28,c:d:e:a:b,aa#10:dd#40:cc#30

Writing tbl0.csv


In [12]:
%%hive
DROP TABLE IF EXISTS tbl0;
CREATE TABLE tbl0 (
    c1 INT,
    c2 STRING,
    c3 INT,
    c4 DATE,
    c5 ARRAY<CHAR(1)>, 
    c6 MAP<STRING, INT>
)
ROW FORMAT DELIMITED 
FIELDS TERMINATED BY ','
COLLECTION ITEMS TERMINATED BY ':'
MAP KEYS TERMINATED BY '#'
LINES TERMINATED BY '\n';

LOAD DATA LOCAL INPATH 'tbl0.csv' INTO TABLE tbl0;

SELECT * FROM tbl0;

1	D	4	2016-06-25	["a","e","c","d"]	{"bb":10,"dd":20,"cc":40}
2	C	4	2015-05-14	["a","c"]	{"dd":40,"bb":20,"cc":10}
3	D	6	2014-12-26	["b","d"]	{"aa":10,"bb":40}
4	D	5	2016-06-25	["a","c","e","b","d"]	{"bb":40,"dd":20,"aa":10,"cc":30}
5	C	7	2016-05-23	["d","e","c"]	{"cc":10,"aa":20}
6	A	2	2018-06-14	["a","d"]	{"aa":20}
7	B	3	2014-12-22	["a","e","d"]	{"cc":20}
8	C	6	2015-08-20	["d","a","c","e"]	{"cc":10,"aa":20}
9	B	3	2017-12-08	["b","a","c","e"]	{"cc":40,"dd":10,"aa":30,"bb":20}
10	B	7	2015-07-28	["c","d","e","a","b"]	{"aa":10,"dd":40,"cc":30}
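
The `ROW FORMAT DELIMITED` clause tells Hive how to cut each text line into typed columns. `,` separates fields, `:` separates the items of a collection, and `#` separates a map key from its value. The function below is a hypothetical sketch of that parsing for the schema of `tbl0`. It is not Hive's SerDe, only an illustration of what the delimiters mean.

```python
from datetime import date
from typing import Dict, List, Tuple

Row = Tuple[int, str, int, date, List[str], Dict[str, int]]


def parse_tbl0_line(line: str) -> Row:
    """Split one line of tbl0.csv using the delimiters declared in the Hive DDL."""
    c1, c2, c3, c4, c5, c6 = line.rstrip("\n").split(",")  # FIELDS TERMINATED BY ','
    items = c5.split(":")                                   # COLLECTION ITEMS TERMINATED BY ':'
    mapping: Dict[str, int] = {}
    for entry in c6.split(":"):                             # map entries are collection items
        key, value = entry.split("#")                       # MAP KEYS TERMINATED BY '#'
        mapping[key] = int(value)
    return int(c1), c2, int(c3), date.fromisoformat(c4), items, mapping


row = parse_tbl0_line("1,D,4,2016-06-25,a:e:c:d,bb#10:dd#20:cc#40")
print(row)
# (1, 'D', 4, datetime.date(2016, 6, 25), ['a', 'e', 'c', 'd'], {'bb': 10, 'dd': 20, 'cc': 40})
```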


<img src="img/hdfs-hive-1.jpg" alt="" width="900" >

# Spark

In [1]:
import findspark
findspark.init()

from pyspark import SparkConf, SparkContext
from operator import add

APP_NAME = "My First Spark Application"

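def tokenize(text):
    return text.split()                     # split on runs of whitespace, no empty tokens

def main(sc): 
    text = sc.textFile('input/text*.txt')   # RDD with one element per line
    words = text.flatMap(tokenize)          # one element per word
    wc = words.map(lambda x: (x,1))         # (word, 1) pairs
    counts = wc.reduceByKey(add)            # sum the 1s of each word
    counts.saveAsTextFile("output-spark")   # one part-NNNNN file per partition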

if __name__ == "__main__":
    conf = SparkConf().setAppName(APP_NAME) 
    conf = conf.setMaster("local[*]")
    sc = SparkContext(conf=conf)
    main(sc)

In [2]:
!hadoop fs -ls output-spark

Found 4 items
-rw-r--r--   1 jdvelasq supergroup          0 2018-11-09 08:39 output-spark/_SUCCESS
-rw-r--r--   1 jdvelasq supergroup         45 2018-11-09 08:39 output-spark/part-00000
-rw-r--r--   1 jdvelasq supergroup         37 2018-11-09 08:39 output-spark/part-00001
-rw-r--r--   1 jdvelasq supergroup         36 2018-11-09 08:39 output-spark/part-00002


In [6]:
!hadoop fs -cat /user/jdvelasq/output-spark/part-00000

('B', 5)
('C', 4)
('D', 4)
('T', 1)
('N', 1)
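
Each `part-NNNNN` file holds only the keys of one partition, which is why `part-00000` lists just five words. To check the complete result, the pipeline (`flatMap`, `map`, `reduceByKey`) can be imitated on ordinary Python lists. This is a sketch. The three part files in the listing suggest one partition per input file, which is assumed here. The per-partition pre-aggregation mirrors how `reduceByKey` combines values locally before the shuffle.

```python
from collections import defaultdict
from functools import reduce
from operator import add
from typing import Dict, List


def tokenize(text: str) -> List[str]:
    return text.split()


# Assumed partitioning: one partition per input file.
partitions = [
    ["A B C A", "A D D A", "A K M C"],       # input/text0.txt
    ["B A C Y B", "U O Y Y A", "A B I T"],   # input/text1.txt
    ["A C D A", "A K B ", "A N H I D A"],    # input/text2.txt
]


def combine_partition(lines: List[str]) -> Dict[str, int]:
    """flatMap + map + local reduce: partial counts inside one partition."""
    partial: Dict[str, int] = defaultdict(int)
    for word in (w for line in lines for w in tokenize(line)):  # flatMap
        partial[word] = add(partial[word], 1)                    # (word, 1), then add
    return partial


def merge(left: Dict[str, int], right: Dict[str, int]) -> Dict[str, int]:
    """Shuffle + reduce: add the partial counts of the same key."""
    merged = dict(left)
    for key, value in right.items():
        merged[key] = add(merged.get(key, 0), value)
    return merged


counts = reduce(merge, (combine_partition(p) for p in partitions), {})
print(sorted(counts.items()))
```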


In [7]:
## in this example an arbitrary function (len) is passed to `map`
from operator import add
rdd = sc.textFile('input/text*.txt')
rdd = rdd.map(len)              # length of each line
print(rdd.collect())
total = rdd.reduce(add)         # reduce returns a number, not an RDD
total

[7, 7, 7, 1, 9, 9, 7, 1, 7, 6, 11, 1]


73
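
`reduce` first combines the elements inside each partition and then combines the partial results. The function passed to it must therefore be associative and commutative, as `add` is. The plain-Python sketch below uses the line lengths printed above, grouped by input file as an assumption about the partitioning. `add` gives 73 either way, while subtraction gives results that depend on the partitioning.

```python
from functools import reduce
from operator import add, sub

lengths = [7, 7, 7, 1, 9, 9, 7, 1, 7, 6, 11, 1]            # rdd.map(len).collect()
partitions = [lengths[0:4], lengths[4:8], lengths[8:12]]    # assumed: one partition per file


def partitioned_reduce(func, parts):
    """Reduce each partition locally, then reduce the partial results."""
    return reduce(func, [reduce(func, part) for part in parts])


print(reduce(add, lengths), partitioned_reduce(add, partitions))  # 73 73
print(reduce(sub, lengths), partitioned_reduce(sub, partitions))  # -59 11
```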

In [None]:
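from pyspark.sql import SparkSession
## `spark` is not defined above; getOrCreate() reuses the running SparkContext
spark = SparkSession.builder.getOrCreate()

query = """
SELECT DISTINCT
    _c5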
FROM 
    csv.`datos.csv`
ORDER BY
    _c5
"""

spark.sql(query).write.save('temp', format="csv")
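
When a CSV file is read without a header, as with the `datos.csv` file queried above, Spark names its columns `_c0`, `_c1`, ..., so `_c5` is the sixth column. The contents of `datos.csv` are not shown here. The sketch below uses a small hypothetical CSV string and only Python's `csv` module to show what the query computes.

```python
import csv
import io

# Hypothetical stand-in for datos.csv (no header row, six columns).
raw = "1,a,x,10,p,red\n2,b,y,20,q,blue\n3,c,x,30,p,red\n4,d,z,40,r,green\n"

rows = list(csv.reader(io.StringIO(raw)))
# Spark's default names for headerless CSV columns: _c0, _c1, ...
columns = [f"_c{i}" for i in range(len(rows[0]))]
records = [dict(zip(columns, row)) for row in rows]

# SELECT DISTINCT _c5 ... ORDER BY _c5
result = sorted({record["_c5"] for record in records})
print(result)  # ['blue', 'green', 'red']
```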

In [5]:
!hadoop fs -ls /user/jdvelasq

Found 4 items
drwxr-xr-x   - jdvelasq supergroup          0 2018-11-08 23:36 /user/jdvelasq/input
drwxr-xr-x   - jdvelasq supergroup          0 2018-11-08 23:37 /user/jdvelasq/output
drwxr-xr-x   - jdvelasq supergroup          0 2018-11-09 07:22 /user/jdvelasq/output-pig
drwxr-xr-x   - jdvelasq supergroup          0 2018-11-09 08:39 /user/jdvelasq/output-spark


In [4]:
!hadoop fs -rm /user/jdvelasq/output-pig/*
!hadoop fs -rmdir /user/jdvelasq/output-pig

rm: `/user/jdvelasq/output-pig/*': No such file or directory


In [134]:
!hadoop fs -mkdir /user/jdvelasq/output-pig

<img src="img/big-data-analytics.jpg" alt="" width="800" >
