<font color=gray>*Se informa que el presente documento es propiedad de Synergic Partners, S.L.U. Asimismo, su contenido tiene carácter confidencial o reservado. Este documento no puede ser reproducido, en su totalidad o parcialmente, ni mostrado a terceros, ni utilizarse para otros propósitos que los que han originado su entrega, sin el permiso previo de Synergic Partners. Synergic Partners no podrá ser considerada responsable de eventuales errores u omisiones en la edición del documento. Por la presente, usted queda notificado que cualquier forma de reproducción, copia y divulgación del presente documento y/o la información contenida en el mismo están estrictamente prohibidas.*</font>

---

<font color=gray>Mayo 2017 | Tecnologías Big Data</font>

# <font color=#003d5c>Apache Spark 1.6</font>

## <font color=#003d5c>Spark Core: RDDs</font>

---

## <font color=#003d5c>Resilient Distributed Dataset (RDD)</font>

Los **RDDs** son la abstracción de datos atómica de Apache Spark. Consisten en colecciones de objetos particionados y  distribuidos en un cluster. Los RDDs nos permiten escribir programas en términos de operaciones, *transformaciones*, y *acciones*, sobre datos distribuidos.

#### <font color=#003d5c>Características</font>

- **In-memory**. Los datos de un RDD se almacenan en memoria, tanto (tamaño y tiempo) como sea posible.
- **Inmutable (read-only)**. Un RDD no cambia una vez creado. Al ser modificado usando transformaciones se genera un nuevo RDD.
- **Lazy evaluated**. Los datos de un RDD no están disponibles hasta que se ejecuta una acción sobre ellos
- **Procesamiento de datos en paralelo**
- **Cacheables**. permite persistencia de los datos, como la memoria (preferiblemente, por defecto) o el disco (más lento)
- **Tipados**. Los valores de un RDD tienen tipos (ejemplos: RDD[(Int, String)], RDD[Long], etc.)
- **Particionados**. Datos distribuidos en los nodos del cluster

#### <font color=#003d5c>Transformaciones</font>

<font color=gray>Operación</font> | <font color=gray>Descripción</font>
------------ | -------------
**map(func)** | Devuelve un nuevo RDD, aplicando a cada elemento del original la función ‘func’
**filter(func)**| Devuelve un nuevo RDD, seleccionando únicamente aquellos elementos que devuelven true a la función ‘func’
**flatMap(func)**| Parecida a map, pero cada elemento de entrada puede ser mapeado a 0 o más elementos de salida (‘func’ devolverá entonces una secuencia en vez de un simple elemento)
**distinct()**| Devuelve un nuevo RDD que contiene los elementos distintos del RDD original
**groupByKey()**| Cuando se llama sobre un RDD formado por pares (K,V), devuelve un nuevo RDD con pares (K, Seq[V])
**reduceByKey(func)**| Cuando se llama sobre un RDD formado por pares (K,V), devuelve un nuevo RDD con pares (K, V) en el que los para cada clave (K) son agregados en base a la función de agregación ‘func’
**sortByKey([ascending])**| Cuando se llama sobre un RDD formado por pares (K,V) donde K implementa ‘Ordered’, devuelve un nuevo RDD con pares (K, V) ordenados por la clave (K) en orden ascendente o descendente según argumento
**join(otherDataset)**| Cuando se llama sobre RDDs formados por pares (K, V) y (K, W), devuelve un nuevo RDD con pares (K, (V, W)) con todas las parejas de elementos para cada clave (K)

#### <font color=#003d5c>Acciones</font>

<font color=gray>Operación</font> | <font color=gray>Descripción</font>
------------ | -------------
**reduce(func)**| Agrega los elementos del RDD utilizando las función ‘func’ (toma 2 argumentos y devuelve 1). La función debe ser conmutativa y asociativa para poder ser calculado correctamente en paralelo
**collect()**| Devuelve todos los elementos del RDD en un array (útil tras un filtro u otra operación que devuelve un subconjunto suficientemente pequeño de datos). Operación costosa.
**count()**| Devuelve el número de elementos en el RDD
**take(n)**| Devuelve un array con los ‘n’ primeros elementos del RDD
**first()**| Devuelve el primer elemento del RDD (similar a take(1))
**countByKey()**| (solo para RDDs con elementos de tipo (K, V)). Devuelve un objeto ‘Map’ de pares (K, Int) con el conteo de cada clave (K)
    
#### <font color=#003d5c>Persistencia</font>

<font color=gray>Operación</font> | <font color=gray>Descripción</font>
----------|------------
**cache()**| Persistencia de RDD con el nivel de almacenamiento por defecto (MEMORY_ONLY).
**persist()**| Permite establecer el nivel de almacenamiento para la persistencia de RDD. Sólo puede ser utilizado si el RDD no tiene previamente asignado un nivel de almacenamiento.  
**getStorageLevel()**| Permite consultar el nivel de almacenamiento de un RDD
**unpersist()**|Elimina la persistencia de RDD y remueve todos los bloques de este en memoria.

---

### <font color=#003d5c>Crear un RDDs</font>

Existen tres formas básicas para crear un RDD:

#### <font color=#003d5c>1. Paralelizando colecciones existentes</font>

<div class="alert alert-warning" role="alert">
  <strong>Nota:</strong> Para crear un RDD en Apache Spark es necesario instanciar el `SparkContext`. En nuestro caso no es necesario, porque el entorno interactivo del Jupyter instacia el objeto `sc` automáticamente.
</div>

In [None]:
data = [1, 2, 3, 4, 5] # lista de enteros

# paralelizar 'data' utilizando SparkContext --> RDD

In [None]:
print "Datos paralelizados en RDD:", rdd.collect() # recoger datos mediante accion 'collect()' --> list
print type(rdd), "\n"

#### <font color=#003d5c>2. Transformando RDDs ya existentes</font>

In [None]:
# Obtener un RDD incrementando en una unidad los elementos de otro RDD


In [None]:
print "RDD con datos incrementados (x+1): ", rdd_inc.collect()

#### <font color=#003d5c>3. Leyendo datos desde ficheros en HDFS u otro sistema de almacenamiento soportado por Hadoop</font>

Se utiliza el método `textFile()` del `SparkContext`. Por defecto, el método lee un fichero de texto desde HDFS, o del sistema local de ficheros (si está disponible en todos los nodos)

In [None]:
import os

filename = 'Baby_Names__Beginning_2007.csv'
directory = os.path.join('/tmp/spark')
filepath = os.path.join(directory,filename)

In [None]:
# Para leer desde HDFS del cluster


In [None]:
# Inspeccionamos el primer elemento del RDD

##### <font color=#666666>Ejercicio 1. Crear RDD</font>

In [None]:
tuplas =[(1,4),(2,5),(3,6)]

# Completa el código necesario para crear un RDD a partir de la lista de tuplas

# Escribe tu código aquí

In [None]:
# mostramos por pantalla el contenido del RDD (hay que evitar hacerlo con grandes datasets)

---

### <font color=#003d5c>Operaciones RDDs</font>

In [None]:
tuplas = [(1,4),(2,5),(3,6)]
rdd = sc.parallelize(tuplas)

Una vez creado el RDD podemos realizar operaciones sobre el mismo, enlazando transformaciones y acciones. Por ejemplo:

#### `map()`

In [None]:
help(rdd.map)

Elevamos al cuadrado cada elemento del RDD.

<div class="alert alert-warning" role="alert">
  <strong>Nota</strong> En Python la palabra clave `lambda` define una función anónima. 
</div>

Además de funciónes anónimas, las transformaciones aceptan funciones regulares de Python. Por ejemplo:

In [None]:
def suma (x):
    return sum(x)

rdd.map(suma).collect() # La función map() aplica la función suma a cada fila del RDD

#### `flatmap()`

In [None]:
help(rdd.flatMap)

In [None]:
# Transformación para obtener una sola columna


La transformación `flatMap()` aplica una función sobre cada elemento del RDD y devuelve una secuencia de valores. Como las transformaciones en Spark son *lazy evaluated*, esta no se ejecuta hasta que se realiza un acción (en este caso la operación `collect()`). El resultado obtenido es una secuencia de valores.

#### `filter()`

Filtrar valores numericos menores que uno dado (0.5) [0.05,0.2,0.3,0.7,0.9] --> [0.05,0.2,0.3]

In [None]:
rdd = sc.parallelize([0.05,0.2,0.3,0.7,0.9])

#### `take()`

Obtener los dos primeros elementos de un RDD.

In [None]:
rdd = sc.parallelize([0.05,0.2,0.3,0.7,0.9])

Existen otras operaciones que permiten extraer datos de un RDD, p.e.: `first()`

In [None]:
rdd = sc.parallelize([0.05,0.2,0.3,0.7,0.9])

In [None]:
print "¿es igual take(1) y first()?", rdd.take(1) == rdd.first()

<div class="alert alert-danger" role="alert">
  <strong>!</strong> ¿Por qué son diferentes?
</div>

#### `reduce()`

Sumar todos los elementos de una lista de numeros, agregandolos mediante la funcion 'add'

In [None]:
from operator import add # es necesario importar la funcion 'add' (suma 2 numeros)

In [None]:
help(add)

In [None]:
rdd = sc.parallelize([0.05,0.2,0.3,0.7,0.9])

#### `takeOrdered()`

Obtener los dos primeros elementos de un RDD tras ser ordenados ascendenemente/descendentemente

In [None]:
rdd = sc.parallelize([1,5,2,3,4])

In [None]:
print "Los dos elementos mayores son:", max2
print "Los dos elementos menores son:", min2

#### `cache()`

In [None]:
rdd = sc.parallelize([1,2,3]).cache()

In [None]:
# Storage Level


In [None]:
# Unpersist


##### <font color=#666666>Ejercicio 2. Operaciones RDDs</font>

Crear un RDD a partir de una lista de enteros, y contar el número de elementos pares. Mostrar por pantalla los resultados.

In [None]:
data = [1, 2, 3, 4, 5] # lista de enteros

In [None]:
# Escribe tu solución aquí 

##### <font color=#666666>Ejercicio 3. Operaciónes `map()` y `collect()`</font>

Re-escalar un vector de valores numericos `[0.05,0.2,0.3,0.7,0.9] --> [5,20,30,70,90]`

In [None]:
# Escribe tu solución aquí

---

### <font color=#003d5c>Pair RDD</font>

Son un tipo especial de RDD, donde cada elemento es un par clave-valor. Las claves y valores pueden ser de cualquier tipo.

- Se utilizan para algoritmos de tipo MapReduce
- Tienen asociadas funciones extras como: `sort`, `join`, `count`, etc.
- Se pueden crear a partir de funciones: `map`, `flatmap`, `keyBy`, etc.

In [None]:
personas = ['Michael, ', 'Andy, 30', 'Justin, 19']

rdd = sc.parallelize(personas)
personasRDD = (rdd
        .map(lambda x: x.split(","))
        .map(lambda x: [x[0], int(x[1].strip()) if x[1].strip() else None])
        .keyBy(lambda x: x[0])
        .mapValues(lambda x: x[1])
        )
personasRDD.collect()

In [None]:
personasRDD.lookup('Andy')

---

## <font color=#666666>Ejercicio propuesto. WordCount con RDD</font>

En este ejercicio vamos a implementar una aplicación que calcula la frecuencia de palabras del texto de una de las obras más importantes de la literatura universal. Para ello vamos a seguir los siguientes pasos:

1. Cargar los datos
2. Normalizar el texto
3. Extraer palabras
4. Contar las palabras


##### 1. Cargar los datos

El texto se lee desde un fichero almacenado en HDFS para crear un RDD.

In [None]:
# Para leer los datos desde HDFS
filename = 'quijote.txt'
rddQuijote = sc.textFile('/tmp/spark/' + filename) # crear RDD

In [None]:
print type(rddQuijote)

*Ejercicio*: Muestra las primeras cinco lineas del texto usando la variable rddQuijote

In [None]:
# Escribe tu solución aquí

##### 2. Normalizar

A continuación se realizan las operaciones necesarias para estandarizar el texto. estas tareas incluyen:

- eliminar signos de puntuación, 
- símbolos foráneos,
- espacios en blanco extras,
- convertir el texto en minísculas, 
- etc.

In [None]:
# sp: preserve
import re
from unicodedata import normalize

def norm(text):
    text = normalize('NFKD', text).encode('ASCII', 'ignore')
    return re.sub(r'[^a-zA-Z0-9\s]',"",text).lower().strip()

Obtenga un `rddQuijoteStd` que contenga el texto normalizado a partir del RDD original. Además es necesario eliminar líneas en blanco extras.

In [None]:
# Escribe tu solución aquí

##### 3. Extraer palabras de lineas

Una vez se obtiene un RDD normalizado y sin lineas en blanco, tenemos que extraer las palabras del texto. 

In [None]:
# Escribe tu solución

##### 4. Eliminar *stop words*

Ahora vamos a eliminar las palabras vacias (*stop words*), p.e.: que, un, de, etc.

In [None]:
# sp: preserve
stopWordsFile = 'stopwords.txt' 
stopWords = sc.textFile('/tmp/spark/' + stopWordsFile)
stopWords = sc.broadcast(stopWords.collect())

In [None]:
# Escribe tu solución

##### 5. Contar palabras

A partir del RDD obtenido, ejecuta las transformaciones y acciones necesarias para obtener las palabras y sus frecuencias.

*Hint: Crear un RDD donde cada elemento es un par (key,value), y luego aplicar la acción `reduceByKey()`*

In [None]:
# Escribe tu solución

##### 6. Palabras más comunes

Muestra las 10 palabras más comunes en el texto.

*Hint: Utilizar la acción `takeOrdered()` en lugar de `collect()`*

In [None]:
# Escribe tu solución

Copyright © 2017 – Synergic Partners