# Lección IV

#### _<font color=blue>  Objetivo de la lección: </font>_

<font color=blue>
  Manejo Masivo de Datos: Pyspark. 
</font>

## Introducción

**Apache Spark** se está convirtiendo rápidamente en una de las mejores plataformas de análisis de datos de código abierto.

Con la mejora continua de **Apache Spark**, especialmente el motor **SQL** y el surgimiento de proyectos relacionados,  estamos comenzando a obtener la funcionalidad de análisis de datos que teníamos en configuraciones de una sola máquina utilizando RDBMS y bibliotecas de análisis de datos como **Pandas**.

**Pandas**, una herramienta de análisis de datos para el lenguaje de programación Python, es actualmente la herramienta de análisis de datos abierta más popular y madura. La biblioteca está altamente optimizada para el rendimiento, con rutas de código críticas escritas en Cython o C.

### Pandas vs Spark

Hemos usado **Pandas** para ordenar  nuestros datos de manera directa, facil y eficiente. Pandas nos ha ayudado a limpiar, transformar, manipular y analizar los datos. El punto de introducir Spark, es que este se vuelve mejor que **Pandas** cuando la cantidad de datos no es _grande_ sino _gigante_.

Hay una gran diferencia entre la cantidad de  datos grande (_Large_)  y la cantidad de datos gigante (_Big_). Y aunque estas medidas relativas algunos autores llama _big data_ a datos que ocupan mas de 100 GB.

Los numeros exactos de cuando usar uno o el otro varian dependiendo del tipo de version de software y hardware con que se cuenta. Aunque algunas pruebas concluyen que conjunto de datos con menos de 10 millones de filas (<5 GB de archivo) no se debe analizar con Spark, mientras que otras sugieren usar Spark solo cuando los datos sobrepasan los 200 GB. 

### ¿Por qué usar PySpark en un cuaderno Jupyter?

Al utilizar Spark, la mayoría de los ingenieros de datos recomienda desarrollarlo en Scala (que es el lenguaje Spark "nativo") o en Python a través de la API completa de PySpark.

Hemos visto que Python es flexible, robusto, fácil de aprender y se beneficia de la gran cantidad de bibliotecas que hay. Para nosotros Python es el lenguaje perfecto para la creación de prototipos en los campos de Big Data / Machine Learning.

Al igual que en las lecciones pasadas implementamos un proyecto concreto  de ciencia de datos.

 ##  Accidentes cerebrovasculares: El conjunto de datos
 
 Spark es un proyecto de código abierto de Apache. También es el motor de análisis más utilizado para el big data y el aprendizaje automático.

Esta publicación se centrará en un inicio rápido para desarrollar un algoritmo de predicción con Spark.

Elegí el conjunto de datos "Healthcare Dataset Stroke Data" para trabajar con kaggle.com, la comunidad de científicos de datos y aprendizaje automático más grande del mundo.

## Contenido:

Según la Organización Mundial de la Salud, la cardiopatía isquémica y el accidente cerebrovascular representan las mayores causas de muerte en el mundo.

Información del sitio oficial: http://www.who.int/news-room/fact-sheets/detail/the-top-10-causes-of-death

Lo que debemos hacer es predecir la probabilidad de accidente cerebrovascular utilizando la información dada de los pacientes. Es un problema de clasificación, en el que intentaremos predecir la probabilidad de que una observación pertenezca a una categoría (en nuestro caso, la probabilidad de sufrir un accidente cerebrovascular).

Como hemos visto existe una amplia gamma de para resolver los problemas de clasificación. Nos restringimos al algoritmo de Árbol de decisiones.


### Configurando Spark y cargando los datos

Para correr en **Pyspark** usamos uno de los
__[contenedores de docker](https://github.com/jupyter/docker-stacks)__ para Spark y Python. 

_Docker es una herramienta diseñada para facilitar la creación, implementación y ejecución de aplicaciones mediante el uso de contenedores._ 

In [2]:
from pyspark.sql import SparkSession
import pyspark.sql as sparksql
spark = SparkSession.builder.appName('stroke').getOrCreate()
entrena = spark.read.csv('../data/train_2v.csv', inferSchema=True,header=True)

### Exploración de datos
La primera operación que se realiza después de importar datos es obtener información sobre su aspecto. Se puede hacer con los siguientes comandos:

_df.printSchema()_

_df.describe()_

In [3]:
entrena.printSchema()

root
 |-- id: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: double (nullable = true)
 |-- hypertension: integer (nullable = true)
 |-- heart_disease: integer (nullable = true)
 |-- ever_married: string (nullable = true)
 |-- work_type: string (nullable = true)
 |-- Residence_type: string (nullable = true)
 |-- avg_glucose_level: double (nullable = true)
 |-- bmi: double (nullable = true)
 |-- smoking_status: string (nullable = true)
 |-- stroke: integer (nullable = true)



In [4]:
entrena.describe()

DataFrame[summary: string, id: string, gender: string, age: string, hypertension: string, heart_disease: string, ever_married: string, work_type: string, Residence_type: string, avg_glucose_level: string, bmi: string, smoking_status: string, stroke: string]

In [5]:
entrena.groupBy('stroke').count().show()

+------+-----+
|stroke|count|
+------+-----+
|     1|  783|
|     0|42617|
+------+-----+



Como se puede ver en esta observación. Este es un conjunto de datos desequilibrado, donde el número de observaciones que pertenecen a una clase es significativamente menor que las que pertenecen a las otras clases. En este caso, el modelo predictivo podría ser parcial e inexacto. Existen diferentes estrategias para manejar los conjuntos de datos desequilibrados, por lo tanto, está fuera del alcance de esta publicación, en cambio me centraré en Spark. Para encontrar más información sobre el conjunto de datos desequilibrado:

https://www.analyticsvidhya.com/blog/2017/03/imbalanced-classification-problem/

Aquí tenemos mediciones clínicas (por ejemplo, hipertensión, enfermedad cardíaca, edad, antecedentes familiares de la enfermedad) para varios pacientes, así como información sobre si cada paciente ha tenido un accidente cerebrovascular. En la práctica, queremos que este método prediga con precisión el riesgo de accidente cerebrovascular para futuros pacientes en función de sus mediciones clínicas.

<img style="float: center;" src="../images/kaggle-table.png">

### Análisis

Realizar un breve análisis utilizando operaciones básicas. Es posible hacerlo de varias maneras:

Los DataFrames proporcionan un lenguaje específico del dominio para la manipulación de datos estructurados, el acceso a las columnas de un DataFrame puede ser por atributo o por indexación para ejecutar consultas SQL mediante programación y devolver el resultado como un DataFrame
Por ejemplo, para ver qué tipo de trabajo tiene más casos de accidente cerebrovascular, podemos hacer lo siguiente:

In [17]:
# crear un DataFrame como una vista temporal
entrena.createOrReplaceTempView('table')

## Spark SQL
Spark SQL es un módulo de Spark para el procesamiento de datos estructurados. Internamente, Spark SQL utiliza esta información adicional para realizar optimizaciones adicionales. 

### SQL
Un uso de Spark SQL es ejecutar consultas SQL. Spark SQL también se puede utilizar para leer datos de una instalación existente de Hive. Para obtener más información sobre cómo configurar esta función, consulte la sección Tablas de Hive. Al ejecutar SQL desde otro lenguaje de programación, los resultados se devolverán como un conjunto de datos / marco de datos. También puede interactuar con la interfaz SQL utilizando la línea de comandos o sobre JDBC / ODBC.

### Dataframes

Un DataFrame es un Dataset organizado en columnas con nombre. Es similar al Dataframe de **Pandas**. Conceptualmente es equivalente a una tabla en una base de datos relacional o un marco de datos en R / Python, pero con optimizaciones más ricas bajo el capó. Los DataFrames se pueden construir a partir de una amplia gama de fuentes, tales como: archivos de datos estructurados, tablas en Hive, bases de datos externas o RDD existentes. 

_**PySpark** y **Pandas** refieren su estructura de datos como 'DataFrames' pero son diferentes plataformas en tiempo de ejecución._

### Análisis Exploratorio: Consultas SQL

En spark podemos hecer nuestros analisis exploratorios de los datos mediante consultas SQL  (expresiones formales en el lenguaje SQL).

 SQL es un lenguaje de administración de bases de datos para bases de datos relacionales.

In [19]:
spark.sql("SELECT work_type, count(work_type) as work_type_count \
           FROM table WHERE stroke == 1 GROUP BY work_type \
           ORDER BY work_type_count DESC").show()

+-------------+---------------+
|    work_type|work_type_count|
+-------------+---------------+
|      Private|            441|
|Self-employed|            251|
|     Govt_job|             89|
|     children|              2|
+-------------+---------------+



Parece que la ocupación privada es el tipo de trabajo más peligroso en este conjunto de datos.

Averigüemos quiénes participaron en esta medición clínica.

In [20]:
spark.sql("SELECT gender, count(gender) as count_gender,\
           count(gender)*100/sum(count(gender)) over() as percent \
           FROM table GROUP BY gender").show()

+------+------------+-------------------+
|gender|count_gender|            percent|
+------+------------+-------------------+
|Female|       25665|  59.13594470046083|
| Other|          11|0.02534562211981567|
|  Male|       17724|  40.83870967741935|
+------+------------+-------------------+



In [10]:
sc.stop()
from pyspark import SparkContext
sc = SparkContext("local", "count app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
counts = words.count()
print("Number of elements in RDD -> %i" % (counts))

NameError: name 'sc' is not defined

In [12]:
sc.stop()
from pyspark import SparkContext
sc = SparkContext("local", "Collect app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
coll = words.collect()
print("Elements in RDD -> %s" % (coll))

Elements in RDD -> ['scala', 'java', 'hadoop', 'spark', 'akka', 'spark vs hadoop', 'pyspark', 'pyspark and spark']


In [15]:
sc.stop()
from pyspark import SparkContext
sc = SparkContext("local", "Filter app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
words_filter = words.filter(lambda x: 'spark' in x)
filtered = words_filter.collect()
print("Fitered RDD -> %s" % (filtered))

Fitered RDD -> ['spark', 'spark vs hadoop', 'pyspark', 'pyspark and spark']


In [17]:
sc.stop()
from pyspark import SparkContext 
sc = SparkContext("local", "Cache app") 
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
) 
words.cache() 
caching = words.persist().is_cached 
print("Words got chached > %s" % (caching))

Words got chached > True


In [20]:
sc.stop()
from pyspark import SparkContext 
sc = SparkContext("local", "Broadcast app") 
words_new = sc.broadcast(["scala", "java", "hadoop", "spark", "akka"]) 
data = words_new.value 
print("Stored data -> %s" % (data) )
elem = words_new.value[2] 
print("Printing a particular element in RDD -> %s" % (elem))

Stored data -> ['scala', 'java', 'hadoop', 'spark', 'akka']
Printing a particular element in RDD -> hadoop


In [22]:
sc.stop()
from pyspark import SparkContext 
sc = SparkContext("local", "Accumulator app") 
num = sc.accumulator(10) 
def f(x): 
   global num 
   num+=x 
rdd = sc.parallelize([20,30,40,50]) 
rdd.foreach(f) 
final = num.value 
print("Accumulated value is -> %i" % (final))

Accumulated value is -> 150


En Apache Spark, puede cargar sus archivos usando sc.addFile (sc es su SparkContext predeterminado) y obtener la ruta de acceso de un trabajador que usa SparkFiles.get. Por lo tanto, SparkFiles resuelve las rutas a los archivos agregados a través de SparkContext.addFile ().

SparkFiles contiene los siguientes métodos de clase:

obtener (nombre de archivo)
getrootdirectory ()
Vamos a entenderlos en detalle.

obtener (nombre de archivo)
Especifica la ruta del archivo que se agrega a través de SparkContext.addFile ().

getrootdirectory ()
Especifica la ruta al directorio raíz, que contiene el archivo que se agrega a través del SparkContext.addFile ().

In [30]:
sc.stop()
from pyspark import SparkContext
from pyspark import SparkFiles
finddistance = "/home/hadoop/examples_pyspark/finddistance.R"
finddistancename = "finddistance.R"
sc = SparkContext("local", "SparkFile App")
sc.addFile(finddistance)
print("Absolute Path -> %s" % SparkFiles.get(finddistancename))

Py4JJavaError: An error occurred while calling o368.addFile.
: java.io.FileNotFoundException: File file:/home/hadoop/examples_pyspark/finddistance.R does not exist
	at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:611)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:824)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:601)
	at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:421)
	at org.apache.spark.SparkContext.addFile(SparkContext.scala:1544)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


In [28]:
sc.stop()
from pyspark import SparkConf, SparkContext
conf = SparkConf().setAppName("PySpark App").setMaster("local[*]")
sc = SparkContext(conf = conf)



Este punto de referencia comparará el rendimiento de esos marcos en tareas comunes de análisis de datos:
    Complejo donde las cláusulas
    Ordenar un conjunto de datos
    Unirse a un conjunto de datos
    Auto se une
    Agrupando los datos
    
Las pruebas se realizarán en el conjunto de datos de la placa de licencia de OpenData de Holanda (que se encuentra aquí) que tiene ~ 13.5 millones de registros, pero usé herramientas de línea de comando para dividirlo en archivos con 100k, 1m, 5m, 10m y ~ 13.5 líneas. Los tamaños de archivo correspondientes son: 49 MB, 480 MB, 2,4 GB, 4,7 GB y 6,5 GB en consecuencia. Para realizar la prueba de combinaciones, hice este pequeño conjunto de datos con ~ 100 filas.

El software subyacente es Python 3.5.3 / pandas 0.19.2 y Scala 2.10 / Spark 1.6.2 en una máquina con 32GB de RAM y 8 CPU. El modo local de Spark se realizó en la misma máquina con 32 GB de RAM y 8 CPU, mientras que el modo distribuido se realizó en el modo de hilado-cliente utilizando los blocs de notas de zeppelin en una configuración en 3 máquinas con las mismas especificaciones. En el modo local de Spark utilicé el sistema de archivos normal y en el modo distribuido utilicé HDFS.

Puedes encontrar el código Scala para Spark aquí. (* el código no es un archivo de Scala adecuado, pero puede copiarse simplemente en las celdas del cuaderno Zeppelin). El Código para Pandas (en python) está aquí.
Prueba 1: Complejo donde se encuentran las cláusulas.
Esperanzas de heredar:

Este es un caso en el que Spark debería brillar, ya que puede leer simultáneamente un conjunto de datos de varias máquinas.

Consulta: