In [78]:
!wget --no-cache -O init.py -q https://raw.githubusercontent.com/UDEA-Esp-Analitica-y-Ciencia-de-Datos/EACD-03-BIGDATA/master/init.py
import init; init.init(force_download=False); 
from IPython.display import Image

In [None]:
Image("local/imgs/udea-datascience.png")

In [None]:
#Instalación
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.0.1/spark-3.0.1-bin-hadoop3.2.tgz
!tar xf spark-3.0.1-bin-hadoop3.2.tgz 
!pip install -q findspark

#Variables de Entorno
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.1-bin-hadoop3.2"

#SparkContext
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark import SparkContext
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = SparkContext.getOrCreate()

#**RDD**



Un RDD (Dataset Distribuido Resiliente) es una colección de elementos que es tolerante a fallos y que es capaz de operar en paralelo.

Se considera como la principal abstracción de datos de Spark, definido desde la primera versión, y permite que los datos se puedan particionar y distribuir en los nodos del clúster.

Estas son las principales características de los RDD:
* Inmutable: Una vez creado NO puede ser modificado
* Tolerancia a fallas: Ya que los RDD están particionados y distribuidos en los nodos del clúster, si un nodo falla se puede recuperar los datos consultando otro nodo
* Evaluación perezosa: Las transformaciones realizada al RDD no se ejecutan de inmediato, se almacenan en un DAG (Grafo Acíclico dirigido) y se resuelven cuando sea necesario resolverlas



**Crear un RDD**

Existen dos formas de crear un RDD


1. Paralelizar una colección existente

Crear un RDD usando el método parallelize del SparkContext

In [None]:
num = [0, 1, 2, 3, 4, 5]
type(num)

In [None]:
numRdd=sc.parallelize(num)
type(numRdd)

2. Hacer referencia a un conjunto de datos en un sistema de almacenamiento externo

Crear un RDD usando el método textFile del SparkContext

In [None]:
textRdd = sc.textFile("local/data/animales.txt")
type(textRdd)

Al crear un RDD este es particionado y cada una de esas particiones se distribuye en el clúster.

Veamos cómo se crean esas particiones

In [None]:
numRdd.getNumPartitions()

Ese número de particiones a utilizar puede ser indicado por el usuario

In [None]:
numRddP4=sc.parallelize(num,4)

In [None]:
numRddP4.getNumPartitions()

**Evaluación perezosa**

La evaluación perezosa de los RDD se refiere a que las acciones no se ejecutan hasta que no sea estrictamente necesario. En su lugar se crea un Grafo Dirigido Asincrónico (DAG) que  se ejecutará cuando sea necesario.

Para comprobarlo creamos un RDD a partir de un archivo que no existe



In [None]:
textRdd = sc.textFile("local/data/archivo_no_existe.txt")

No se genera ningún error a pesar de que el archivo no existe.

Ahora apliquemos un collect() para obligar a que se ejecute el grafo

In [None]:
textRdd.collect()

Ahora si se genera el error de que el archivo a cargar no existe

Veamos otro ejemplo ahora leyendo un archivo que si existe

In [None]:
textRdd = sc.textFile("local/data/animales.txt")

Veamos el contenido del RDD creado

In [None]:
for line in textRdd:
  print(line)


Porqué no podemos ver el contenido del RDD?

La forma correcta sería convertir el RDD en una lista para que sea iterable y la pueda recorrer

In [None]:
for line in textRdd.collect():
  print(line)

Se puede ver que el archivo tiene 3 líneas.

Ahora abra el archivo, agregue una nueva línea y volvamos a imprimirlo

In [None]:
textRdd.collect()

**¿Que pasó?** Porque volvió a leer el archivo si ya lo había leído antes?

Vuelva a dejar el archivo con su contenido original

##**Operaciones**

Sobre un RDD se pueden ejecutar dos tipos de operaciones, a saber, transformaciones y acciones

En este enlace puede encontrar más información sobre cada una de las transformaciones y acciones presentadas en este Notebook

https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD

###**Transformaciones**
Las transformaciones se ejecutan sobre un RDD y generan un nuevo RDD


**map(func):** Entrega un nuevo conjunto de datos resultado de pasar cada elemento por la función indicada

In [None]:
rdd = sc.parallelize(["a", "b", "c"])
rdd.map(lambda x: (x, 1)).collect()

**filter(func):** Entrega un conjunto de datos con los elementos en los que la función retorne verdadero


In [None]:
rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd.filter(lambda x: x % 2 == 0).collect()

Vamos a construir una función y la usamos como argumento de filter

In [None]:
def par(x):
  if x % 2==0:
    return True
  else:
    return False

In [None]:
par(4)

In [None]:
rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd.filter(par).collect()

Y si el RDD contiene strings?

In [None]:
rdd = sc.parallelize(["La capital de Antioquia es Medellín", \
                      "La capital de Cundinamarca es Bogotá", \
                      "Bogotá tiene mas de 7 millones de habitantes", \
                      "Medellín tiene mas de 2 millones de habitantes"])

In [None]:

medellin=rdd.filter(lambda x: "Medellín" in x)
print("El número de regitros que contienen Medellín es",medellin.count())
medellin.collect()

**flatMap(func):** Funciona similar al map. Devuelve un nuevo RDD aplicando primero una función a todos los elementos de este RDD y luego compactando los resultados.

In [None]:
rdd = sc.parallelize([2, 3, 4])
rdd.flatMap(lambda x: (x,x*x)).collect()

**sample(withReplacement, fraction, seed):**
Muestrea una fracción de los datos, con o sin reemplazo, usando una semilla generadora de números aleatorios

In [None]:
rdd = sc.parallelize(range(100))
sample= rdd.sample(False, 0.1, 1)
sample.collect()

Qué pasa si no usamos la semilla?

In [None]:
sample= rdd.sample(False, 0.1)
sample.collect()

**union(otherDataset):** Entrega un nuevo conjunto de datos que incluye los datos actuales y los que se pasen como argumento



In [None]:
a=sc.parallelize([1, 3, 5, 7, 9])
b=sc.parallelize([0, 2, 4, 6, 8])
a.union(b).collect()


**intersection(otherDataset):** Entrega un nuevo conjunto de datos formado por la intersección entre el conjunto de datos actual y el que se pasa como argumento 


In [None]:
a=sc.parallelize([0, 2, 4, 6, 8])
b=sc.parallelize([6, 8, 10, 12, 14])
a.intersection(b).collect()

**distinct():** Entrega un nuevo conjunto de datos formado por los elementos diferentes del conjunto de datos actual


In [None]:
a=sc.parallelize([3, 1, 1, 0, 2, 3, 2, 0, 0, 1, 1])
b=a.distinct()
b.collect()

**groupByKey():** Recibe un conjunto tuplas (clave valor), y entrega un nuevo conjunto de tuplas (clave, Iterable[valores] )



In [None]:
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 2), ("b", 6), ("a", 3), ("c", 5)])
gbk=rdd.groupByKey()
gbk.collect()

Con la función mapValues podemos obtener información sobre el contenido de ese iterable de los valores.

Si le pasamos como argumento len nos dirá el tamaño del iterable

In [None]:
gbk.mapValues(len).collect()

Si le pasamos como argumento list nos entregará la lista de valores

In [None]:
gbk.mapValues(list).collect()

**reduceByKey(func):** Recibe un conjunto de tuplas (clave, valor) y devuelve un nuevo conjunto de tuplas con la clave y la reducción de los valores por clave según la función que se pase como argumento


In [None]:
from operator import add
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 2), ("b", 6), ("a", 3), ("c", 5)])
rdd.reduceByKey(add).collect()

Cuando trabajamos con datos numéricos podemos utilizar operadores predefinidos por Spark, como es el caso del operador add que se encarga de sumar los valores.


**sortByKey(type):** Entrega un conjunto de tuplas clave valor ordenados ascendente o descendentemente según se solicite


In [None]:
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 2), ("b", 6), ("a", 3), ("c", 5)])
rdd.sortByKey().collect()

Por defecto se ordena de manera ascendente, que también puede ser indicado enviando True como argumento.

Para ordenar descendentemente enviamos False como parámetro

In [None]:
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 2), ("b", 6), ("a", 3), ("c", 5)])
rdd.sortByKey(False).collect()

Y en caso que las claves puedan ser números y letras como se ordena?

In [None]:
rdd = sc.parallelize([('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)])
rdd.sortByKey(True).collect()

**join(otherDataset):** A partir de tuplas (K,V) y (K,W) entrega como resultado tuplas (K,(V,W))




In [None]:
x = sc.parallelize([("a", 1),("a", 2), ("b", 4)])
y = sc.parallelize([("a", 5), ("a", 6),("a", 7)])
x.join(y).collect()

**cogroup(otherDataset):** A partir de conjuntos de datos (K,V) y (K,W) entrega un conjunto de datos de tipo (K, Iterable<V>, Iterable<W>)


In [None]:
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2)])
z=x.cogroup(y)
z.mapValues(list).collect()

###**Acciones**

Las acciones se aplican sobre un conjunto de datos y devuelven un valor o un nuevo conjunto de datos	


**reduce(func):** Agrega los elementos de un conjunto de datos aplicando sobre ellos una función



In [None]:
from operator import add
rdd=sc.parallelize([1, 2, 3, 4, 5])
rdd.reduce(add)

Porqué ya no es necesario usar collect()?

**Collect():** Devuelve todos los elementos del conjunto de datos

**count():** Entrega el número de elementos disponibles en el conjunto de datos





In [None]:
sc.parallelize([1, 2, 3, 4, 5]).count()

**First():** Entrega el primer elemento del conjunto de datos



In [None]:
sc.parallelize([1, 2, 3, 4, 5]).first()

**Take(n):** Entrega los primeros n elementos del conjunto de datos



In [None]:
sc.parallelize([1, 2, 3, 4, 5]).take(3)

**TakeOrdered(n):** Entrega los primeros n elementos del conjunto de datos ordenados

In [None]:
sc.parallelize([5, 3, 1, 4, 2]).takeOrdered(3)

podemos utilizar un parámetro adicional que sería una función que realizaría el ordenamiento. takeOrdered(n,func)

**saveAsTextFile(path):** Almacena el conjunto de datos en un archivo de texto en la ruta path



In [None]:
x = sc.parallelize([("a", 1),("a", 2), ("b", 4)])
x.saveAsTextFile("local/data/ejemplo1")

**countByKey():** Devuelve un conjunto de datos representado por tuplas clave, valor. Donde el valor entregado será la suma de los elementos de la misma clave



In [None]:
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 2), ("b", 6), ("a", 3), ("c", 5)])
rdd.countByKey()

**Foreach(func):** Ejecuta la función indicada en cada elemento del conjunto de datos



In [None]:
rdd=sc.parallelize([1, 2, 3, 4, 5])

In [None]:
def impar(x):
  if x % 2 ==1:
    print("%d es impar" %x)

In [None]:
rdd.foreach(impar)

##**Persistencia**

Teniendo en cuenta que la operación de los RDD es perezosa, la persistencia permite almacenar los RDD después de la primera vez que se calculan	con el fin de que las operaciones futuras se realicen mucho mas rápido

Almacenar los datos en cache es clave para algoritmos iterativos

Para persistir un RDD se utilizan los métodos: cache (persistir en memoria), persist (permite seleccionar el tipo de persistencia)



Veamos un ejemplo de persistencia.
* Carguemos el archivo animales.txt
* Apliquemos una persistencia
* Veamos su contenido
* Modifiquemos el contenido del archivo animales.txt
* Veamos nuevamente el contenido del archivo

In [None]:
textRdd = sc.textFile("local/data/animales.txt")
textRdd.cache()
textRdd.collect()

Agreguemos una línea al archivo y volvamos a imprimirlo

In [None]:
textRdd.collect()

Porque **no se da cuenta** que el archivo se actualizó?

para persistir un RDD podemos utilizar la función cache() que realizará persistencia en memoria.

Si queremos aplicar otro tipo de persistencia podemos aplicar la función persist(type) donde podemos indicar que tipo de persistencia queremos aplicar
* MEMORY_ONLY: Es el nivel por defecto. Se almacenan en memoria como objetos Java

* MEMORY_AND_DISK: Almacena en memoria y en caso de que requiera mas espacio almacenará las particiones restantes en disco

* DISK_ONLY: Almacena solo en disco


Spark maneja automáticamente los recursos y dejará de persistir RDDs inactivos si requiere memoria. 

Adicionalmente es posible dejar de pesistir con la función unpersist()

Podemos consultar la persistencia de un RDD con la propiedad is_cached



La persistencia tiene grandes implicaciones en los tiempos de ejecución de un programa. Si se desea utilizar constantemente un mismo RDD, hacerlo persistente ayudará a que no sea necesario recalcular las transformaciones.

Veamos los tiempos de ejecución aplicando operaciones matemáticas sobre RDD.
Vamos a tomar los números del 0 al 1000, los elevamos al cuadrado y sumamos el resultado

In [None]:
rdd=sc.parallelize(range(1000))
rdd=rdd.map(lambda x: x*x)
%time print ("la suma del cuadrado de los números del 0 al 1000 es: ", rdd.sum())

Cada vez que va a mostrar el resultado debe volver a construir el rdd2.


In [None]:
%time print ("la suma del cuadrado de los números del 0 al 1000 es: ", rdd2.sum())

In [None]:
rdd2.is_cached

Ahora apliquemos persistencia

In [None]:
rdd2.cache()
rdd2.take(5)

In [None]:
rdd2.is_cached

In [None]:
%time print ("la suma del cuadrado de los números del 0 al 1000 es: ", rdd2.sum())

In [None]:
rdd2.unpersist()

In [None]:
rdd2.is_cached

##**Variables Compartidas**

Cuando paralelizamos un variable, esta es dividida en fragmentos y cada uno de esos fragmentos es repartido entre los nodos del clúster, esto significa que los nodos no tienen los mismos datos excepto por los fragmentos coincidentes en las réplicas.

Los resultados intermedios generados por las transformaciones realizadas por cada nodo no son compartidos a los demás nodos, esto quiere decir que son resultados locales que no se transmiten.

En algunos casos puede ser importante disponer de un método que envíe a todos los nodos algún dato crucial para que se tenga en cuenta durante los cálculos a realizar, es decir, utilizar variables compartidas. Para esto Spark dispone de dos formas de compartir variables: broadcast y acumuladores

###**Broadcast**

Los broadcast o variables difundidas son variables de solo lectura que se transmiten a todos los nodos del clúster. Esto será de gran ayuda cuando el cálculo realizado por cada nodo requiere tener en cuenta el valor de la variable a compartir.

Utilizando el método broadcast del SparkContext podemos difundir una variable

In [None]:
num= [1, 2, 3, 4, 5]
numBcast=sc.broadcast(num)
numBcast.value

###**Acumuladores**

A diferencia de los broadcast que solo se pueden leer, los acumuladores pueden ser operados, es decir que cada nodo puede modificar el valor en el que se inicializa el acumulador.

In [None]:
from operator import add
suma = sc.accumulator(0)
rdd= sc.parallelize([1,2,3,4,5])
rdd.foreach(lambda x: suma.add(x))

print("La suma es", suma.value)

Podemos utilizar simultáneamente variables compartidas broadcast y acumuladores

Teniendo un RDD que contiene los números del 0 al 10, vamos a contar (acumulador) cuantos de ellos son menores que n (broadcast)

In [None]:
rdd=sc.parallelize([0,1,2,3,4,5,6,7,8,9,10])
n=sc.broadcast(x)
an=sc.accumulator(0)
rdd.collect()

Creemos una función map que nos ayude en el proceso

In [None]:
def menorN(x) :
  global an
  if x < 5:
    an.add(1)
    return (x,"SI")
  else:
    return (x,"NO")

Con una función map recorremos la lista de valores del RDD para contar cuantos cumplen la condición y generamos una tupla de salida cuyo valor indica el cumplimiento de la condición

In [None]:
mn=rdd.map(menorN).collect()
print("En el RDD el total de números menores que n es:", an.value)


In [None]:
display(mn)

#**Ejemplos**

###**Palabras más frecuentes en una obra de literatura**

Cuáles son las 10 palabras más frecuentes de la obra de literatura Hamlet

In [None]:
!wget --no-cache -O local/data/hamlet.txt -q http://www.gutenberg.org/cache/epub/56454/pg56454.txt

In [None]:
hamlet = sc.textFile("local/data/hamlet.txt")
counts = hamlet.flatMap(lambda line: line.split(" ")) \
             .map(lambda word: (word, 1)) \
             .reduceByKey(lambda a, b: a + b)
top = counts.sortBy(lambda v: -v[1]).take(10)
for p in top:
	print (p[0] + ": %d" % p[1])


lo que más se repite es el caracter vacío. 

Otro problema es que palabras como; "De" y "de" son diferentes

Cómo lo solucionamos?

In [None]:
hamlet = sc.textFile("local/data/hamlet.txt")

counts = hamlet.flatMap(lambda line: line.split(" ")) \
      .filter(lambda x: x != '') \
      .map(lambda x: x.lower()) \
      .map(lambda word: (word, 1)) \
      .reduceByKey(lambda a, b: a + b)

top = counts.sortBy(lambda v: -v[1]).take(10)

for p in top:
	print (p[0] + ": %d" % p[1])

Cuáles son las diferencias entre los dos resultados obtenidos?

Y cuáles serán las palabras que menos se repiten?

In [None]:
top = counts.sortBy(lambda v: v[1]).take(10)

for p in top:
	print (p[0] + ": %d" % p[1])

###**Titanic**

El archivo titanic.csv recoge la información de los 2201 pasajeros del titanic. Para cada pasajero se dispone de la siguiente información.

* Clase: Tripulación, Primera, Segunda, Tercera
* Edad: Adulto, Niño
* Sexo: Hombre, Mujer
* Sobrevivió: Si, No

In [None]:
file = sc.textFile("local/data/titanic.csv")
file.take(5)

In [None]:
header = file.first()
data = file.filter(lambda row: row != header)
data.take(5) 

Usemos como clave la variable Clase para crear rdd que relacionen la clase con cada una de las demás variables

In [None]:
def rddEdad(a):
    data=a.split(";")
    return (data[0], data[1])

In [None]:
def rddSexo(a):
    data=a.split(";")
    return (data[0], data[2])

In [None]:
def rddSobrevivio(a):
    data=a.split(";")
    return (data[0], data[3])

In [None]:
edad=data.map(rddEdad)
edad.take(5)

In [None]:
sexo=data.map(rddSexo)
sexo.take(5)

In [None]:
sobrevivio=data.map(rddSobrevivio)
sobrevivio.take(5)

In [None]:
sobrevivio.distinct().collect()

**Cuántos pasajeros sobrevivieron por clase?**

In [None]:
from operator import add
sob=sobrevivio.map(lambda d: (d[0], 1 if d[1]=="Si" else 0))
num=sob.reduceByKey(add)
num.collect()

Y cómo contamos los que no sobrevivieron?

In [None]:
a=sobrevivio.countByValue().items()


In [None]:
list(a)[0]

In [None]:
list(a)[0][0]

In [None]:
list(a)[0][0][0]

In [None]:
a.items()

Calculemos el promedio de sobrevivientes por clase

In [None]:
sobrevivio.distinct().collect()

Creemos un RDD que permita contar los registros de cada clase y los registros de cada clase que sobrevivieron

In [None]:
contsob=sobrevivio.mapValues(lambda x: (1, 1 if x=="Si" else 0))
contsob.distinct().collect()

para calcular el promedio de sobrevivientes por clase primero sumamos todos los registros de cada clase y todos los sobrevivientes de cada clase

In [None]:
suma=contsob.reduceByKey(lambda x,y: (x[0] + y[0],x[1]+y[1]))
suma.collect()

Ahora calculemos el promedio

In [None]:
prom=suma.mapValues(lambda x: round(100*(x[1]/x[0]),1))
prom.collect()

**Qué pasó con los adultos de tercera clase?**

Primero creemos un solo RDD con la información necesaria

In [None]:
def rddTitanic(a):
    data=a.split(";")
    return (data[0], data[1], data[2], data[3])

In [None]:
titanic=data.map(rddTitanic)
titanic.take(5)

In [None]:
at=titanic.filter(lambda d: d[0] == "Tercera" and d[1]=="Adulto")
at.take(5)

In [None]:
at.count()

In [None]:
titanic.count()

In [None]:
ats=at.map(lambda d: (d[2], 1 if d[3]=="Si" else 0))
ats.take(5)

In [None]:
ats.distinct().collect()

In [None]:
num=ats.reduceByKey(add)
num.collect()


De los adultos de tercera clase sobrevivieron 76 mujeres y 75 hombres.

Pero cuál era el total de adultos de tercera clase?

In [None]:
hombres = sc.accumulator(0)
mujeres = sc.accumulator(0) 
ats.filter(lambda k: k[0]=="Hombre").foreach(lambda k: hombres.add(1))
ats.filter(lambda k: k[0]=="Mujer").foreach(lambda k: mujeres.add(1))

print("La cantidad de hombres adultos en el titanic es de ",hombres.value)
print("La cantidad de mujeres adultas en el titanic es de ",mujeres.value)


Que tal una forma más sencilla de resolverlo

In [None]:
a=ats.countByKey()

In [None]:
type(a)

**Al momento del rescate se cumplió con la premisa "Mujeres y niños primero"**

Calculemos las mujeres que sobrevivieron

In [None]:
titanic.take(2)

In [None]:
sobs=titanic.map(lambda x: (x[2], int (x[3]=="Si")))
sobs.distinct().collect()

In [None]:
num=sobs.reduceByKey(add)
num.collect()

Ahora calculemos los niños que sobrevivieron

In [None]:
titanic.map(lambda x: (x[1], int (x[3]=="Si"))).reduceByKey(add).collect()

Tenemos el total de mujeres que sobrevivieron y el total de niños que sobrevivieron. 

Cómo calculamos el total de mujeres y niños que sobrevivieron?

In [None]:
def rddMN(a):
    if (a[1]=="Ni�o" or a[2]=="Mujer"):
      return ("Mujeres y niños", int (a[3]=="Si"))
    else:
      return ("Otros", int (a[3]=="Si"))

In [None]:
mn=titanic.map(rddMN)
mn.distinct().collect()

In [None]:
mn.reduceByKey(add).collect()

Que se puede concluir con este resultado?

In [None]:
mn.countByKey().items()

Esto puede aclarar el resultado

In [None]:
a=mn.countByValue().items()

In [None]:
type(a)

#**Ejercicios**

Se dispone de un dataset que contiene información relacionada con el hurto a personas en Colombia, son mas de 100.000 casos de hurtos cometidos en el país en la última época

Cada registro presenta la siguiente información

* Departamento	
* Municipio	
* Día 	
* Hora 	
* Zona (urbana, rural)	
* Arma empleada	
* Movil agresor	
* Movil víctima	
* Edad víctima	
* Sexo víctima


Utilizando RDDs, resuelva a las siguientes inquietudes:

1. Top 10 de los municipios de Antioquia que presentan mayor y menor número de hurtos
2. Tipos de armas más utilizadas en zona urbana y rural
3. Promedio de edad de las víctimas por departamento
4. Tipo de vehículo más utilizado para los hurtos en los fines de semana
5. Promedio de casos de hurto por sexo para cada día de la semana
6. Municipio de Colombia que presenta mayor número de hurtos a mujeres mayores de 40 años