# Lección IV

#### _<font color=blue>  Objetivo de la lección: </font>_

<font color=blue>
  Manejo Masivo de Datos: Pyspark. 
</font>

## Introducción

**Apache Spark** se está convirtiendo rápidamente en una de las mejores plataformas de análisis de datos de código abierto.

Con la mejora continua de **Apache Spark**, especialmente el motor **SQL** y el surgimiento de proyectos relacionados,  estamos comenzando a obtener la funcionalidad de análisis de datos que teníamos en configuraciones de una sola máquina utilizando RDBMS y bibliotecas de análisis de datos como **Pandas**.

**Pandas**, una herramienta de análisis de datos para el lenguaje de programación Python, es actualmente la herramienta de análisis de datos abierta más popular y madura. La biblioteca está altamente optimizada para el rendimiento, con rutas de código críticas escritas en Cython o C.

### Pandas vs Spark

Hemos usado **Pandas** para ordenar  nuestros datos de manera directa, facil y eficiente. Pandas nos ha ayudado a limpiar, transformar, manipular y analizar los datos. El punto de introducir **Spark**, es que se vuelve mejor que  **Pandas** cuando la cantidad de datos no es _grande_ sino _gigante_.

Hay una gran diferencia entre la cantidad de  datos grande (_Large_)  y la cantidad de datos gigante (_Big_). Y aunque estas medidas relativas algunos autores llama _big data_ a datos que ocupan mas de 100 GB.

Los numeros exactos de cuando usar uno o el otro varian dependiendo del tipo de version de software y hardware con que se cuenta. Aunque algunas pruebas concluyen que conjunto de datos con menos de 10 millones de filas (<5 GB de archivo) no se debe analizar con Spark, mientras que otras sugieren usar Spark solo cuando los datos sobrepasan los 200 GB. 

### ¿Por qué usar PySpark?

Al utilizar Spark, la mayoría de los ingenieros de datos recomienda desarrollarlo en Scala (que es el lenguaje Spark "nativo") o en Python a través de la API completa de PySpark.

Hemos visto que Python es flexible, robusto, fácil de aprender y se beneficia de la gran cantidad de bibliotecas que hay. Para nosotros Python es el lenguaje perfecto para la creación de prototipos en los campos de Big Data / Machine Learning.

Al igual que en las lecciones pasadas implementamos un proyecto concreto  de ciencia de datos.

 ##  Accidentes cerebrovasculares: El conjunto de datos
 
 Spark es un proyecto de código abierto de Apache. También es el motor de análisis más utilizado para el big data y el aprendizaje automático.

Esta lección se centrará en un inicio rápido para desarrollar un algoritmo de predicción con Spark.

Elegí el conjunto de datos "Healthcare Dataset Stroke Data" para trabajar con kaggle.com, la comunidad de científicos de datos y aprendizaje automático más grande del mundo.

## Accidentes cerebrovasculares

Según la Organización Mundial de la Salud, la cardiopatía isquémica y el accidente cerebrovascular representan las mayores causas de muerte en el mundo.

Información del sitio oficial: http://www.who.int/news-room/fact-sheets/detail/the-top-10-causes-of-death

Lo que debemos hacer es predecir la probabilidad de accidente cerebrovascular utilizando la información dada de los pacientes. Es un problema de clasificación, en el que intentaremos predecir la probabilidad de que una observación pertenezca a una categoría (en nuestro caso, la probabilidad de sufrir un accidente cerebrovascular).

Como hemos visto existe una amplia gamma de para resolver los problemas de clasificación. Nos restringimos al algoritmo de Árbol de decisiones.


### Configurando Spark y cargando los datos

Para correr en **Pyspark** usamos uno de los
__[contenedores de docker](https://github.com/jupyter/docker-stacks)__ para Spark y Python. 

_Docker es una herramienta diseñada para facilitar la creación, implementación y ejecución de aplicaciones mediante el uso de contenedores._ 

In [78]:
from pyspark.sql import SparkSession
import pyspark.sql as sparksql
spark = SparkSession.builder.appName('stroke').getOrCreate()
entrena = spark.read.csv('../data/train_2v.csv', inferSchema=True,header=True)

### Exploración de datos
La primera operación que se realiza después de importar datos es obtener información sobre su aspecto. Se puede hacer con los siguientes comandos:

_df.printSchema()_

_df.describe()_

In [79]:
entrena.printSchema()

root
 |-- id: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: double (nullable = true)
 |-- hypertension: integer (nullable = true)
 |-- heart_disease: integer (nullable = true)
 |-- ever_married: string (nullable = true)
 |-- work_type: string (nullable = true)
 |-- Residence_type: string (nullable = true)
 |-- avg_glucose_level: double (nullable = true)
 |-- bmi: double (nullable = true)
 |-- smoking_status: string (nullable = true)
 |-- stroke: integer (nullable = true)



In [80]:
entrena.describe()

DataFrame[summary: string, id: string, gender: string, age: string, hypertension: string, heart_disease: string, ever_married: string, work_type: string, Residence_type: string, avg_glucose_level: string, bmi: string, smoking_status: string, stroke: string]

In [81]:
entrena.groupBy('stroke').count().show()

+------+-----+
|stroke|count|
+------+-----+
|     1|  783|
|     0|42617|
+------+-----+



Como se puede ver en esta observación. Este es un conjunto de datos desequilibrado, donde el número de observaciones que pertenecen a una clase es significativamente menor que las que pertenecen a las otras clases. En este caso, el modelo predictivo podría ser parcial e inexacto. Existen diferentes estrategias para manejar los conjuntos de datos desequilibrados. Para encontrar más información sobre el conjunto de datos desequilibrado:

https://www.analyticsvidhya.com/blog/2017/03/imbalanced-classification-problem/

Aquí tenemos mediciones clínicas (por ejemplo, hipertensión, enfermedad cardíaca, edad, antecedentes familiares de la enfermedad) para varios pacientes, así como información sobre si cada paciente ha tenido un accidente cerebrovascular. En la práctica, queremos que este método prediga con precisión el riesgo de accidente cerebrovascular para futuros pacientes en función de sus mediciones clínicas.

<img style="float: center;" src="../images/kaggle-table.png">

## Spark SQL
Spark SQL es un módulo de Spark para el procesamiento de datos estructurados. Internamente, Spark SQL utiliza esta información adicional para realizar optimizaciones adicionales. 

### SQL
Un uso de Spark SQL es ejecutar consultas SQL. Spark SQL también se puede utilizar para leer datos de una instalación existente de Hive. Para obtener más información sobre cómo configurar esta función, consulte la sección Tablas de Hive. Al ejecutar SQL desde otro lenguaje de programación, los resultados se devolverán como un conjunto de datos / marco de datos. También puede interactuar con la interfaz SQL utilizando la línea de comandos o sobre JDBC / ODBC.

## Spark SQL

### Dataframes

Un DataFrame es un Dataset organizado en columnas con nombre. Es similar al Dataframe de **Pandas**. Conceptualmente es equivalente a una tabla en una base de datos relacional o un marco de datos en R / Python, pero con optimizaciones más ricas. Los DataFrames se pueden construir a partir de una amplia gama de fuentes, tales como: archivos de datos estructurados, tablas en Hive, bases de datos externas o RDD existentes. 

_**PySpark** y **Pandas** refieren su estructura de datos como 'DataFrames' pero son diferentes plataformas en tiempo de ejecución._

### Análisis

Realizar un breve análisis utilizando operaciones básicas. Es posible hacerlo de varias maneras:

Los DataFrames proporcionan un lenguaje específico del dominio para la manipulación de datos estructurados, el acceso a las columnas de un DataFrame puede ser por atributo o por indexación para ejecutar consultas SQL mediante programación y devolver el resultado como un DataFrame

Por ejemplo, para ver qué tipo de trabajo tiene más casos de accidentes cerebrovasculares, podemos hacer lo siguiente:

In [82]:
# crear un DataFrame como una vista temporal
entrena.createOrReplaceTempView('table')

### Análisis Exploratorio: Consultas SQL

En spark podemos hecer nuestros analisis exploratorios de los datos mediante consultas SQL  (expresiones formales en el lenguaje SQL).

 SQL es un lenguaje de administración de bases de datos para bases de datos relacionales.

In [83]:
spark.sql("SELECT work_type, count(work_type) as work_type_count \
           FROM table WHERE stroke == 1 GROUP BY work_type \
           ORDER BY work_type_count DESC").show()

+-------------+---------------+
|    work_type|work_type_count|
+-------------+---------------+
|      Private|            441|
|Self-employed|            251|
|     Govt_job|             89|
|     children|              2|
+-------------+---------------+



Parece que la ocupación privada es el tipo de trabajo más peligroso en este conjunto de datos.

Averigüemos quiénes participaron en esta medición clínica. 

In [84]:
spark.sql("SELECT gender, count(gender) as count_gender,\
           count(gender)*100/sum(count(gender)) over() as percent \
           FROM table GROUP BY gender").show()

+------+------------+-------------------+
|gender|count_gender|            percent|
+------+------------+-------------------+
|Female|       25665|  59.13594470046083|
| Other|          11|0.02534562211981567|
|  Male|       17724|  40.83870967741935|
+------+------------+-------------------+



 El 59% de todas las personas son mujeres y solo el 40% son hombres que participaron en la investigación de accidentes cerebrovasculares:

A partir de esta información, existe la posibilidad de recuperar información sobre la cantidad de mujeres / hombres que tienen un ataque (cerebrovascular). La siguiente funcion nos devuelve el porcentaje de un genero dado

In [85]:
def porcentaje_ataque(genero='Male'):
    genero = "'"+genero+"'"    
    spark.sql("SELECT gender, count(gender),\
           (COUNT(gender) * 100.0) /(SELECT count(gender) \
            FROM table WHERE gender == "+genero+") as percentage \
            FROM table WHERE stroke = '1' \
            and gender = "+genero+" GROUP BY gender").show()
    return    

In [86]:
porcentaje_ataque('Male')
porcentaje_ataque('Female')

+------+-------------+----------------+
|gender|count(gender)|      percentage|
+------+-------------+----------------+
|  Male|          352|1.98600767321146|
+------+-------------+----------------+

+------+-------------+----------------+
|gender|count(gender)|      percentage|
+------+-------------+----------------+
|Female|          431|1.67932982661212|
+------+-------------+----------------+



1,68% Mujeres y casi 2% Hombres han tenido un accidente cerebrovascular.


También podemos ver si la edad influye en el accidente cerebrovascular y cuál es el riesgo por edad.

In [87]:
spark.sql("SELECT age, count(age) as age_count \
          FROM table WHERE stroke == 1 GROUP BY age \
          ORDER BY age_count DESC").show()

+----+---------+
| age|age_count|
+----+---------+
|79.0|       70|
|78.0|       57|
|80.0|       49|
|81.0|       43|
|82.0|       36|
|70.0|       25|
|74.0|       24|
|77.0|       24|
|76.0|       24|
|67.0|       23|
|75.0|       23|
|72.0|       21|
|69.0|       20|
|59.0|       20|
|68.0|       20|
|71.0|       19|
|57.0|       19|
|63.0|       18|
|65.0|       18|
|66.0|       17|
+----+---------+
only showing top 20 rows



Podemos usar la operación de filtro para calcular el número de casos de accidente cerebrovascular para personas después de 50 años.

In [88]:
entrena.filter((entrena['stroke'] == 1) & (entrena['age'] > '50')).count()

708

Como podemos ver, la edad es un factor de riesgo importante para desarrollar un derrame cerebral.

### Limpieza de Datos
El siguiente paso de la exploración es tratar con los valores categóricos y los valores faltantes. Hay valores faltantes para smoking_status y bmi.

Rellenamos smoking_status con un valor de 'Sin información'

In [89]:
entrena_f = entrena.na.fill('No Info', subset=['smoking_status'])

 Rellenamos el parámetro bmi con el valor medio.

In [90]:
from pyspark.sql.functions import mean
promedio = entrena_f.select(mean(entrena_f['bmi'])).collect()
promedio_bmi = promedio[0][0]
entrena_f = entrena_f.na.fill(promedio_bmi,['bmi'])

La mayoría de los algoritmos ML no pueden trabajar directamente con datos categóricos. La codificación permite que los algoritmos que esperan que las características continuas usen características categóricas.

StringIndexer -> OneHotEncoder -> VectorAssembler


In [91]:
from pyspark.ml.feature import (VectorAssembler,OneHotEncoder,
                                StringIndexer)

In [92]:
indexador_genero = StringIndexer(inputCol='gender', outputCol='genderIndex')
codificador_genero = OneHotEncoder(inputCol='genderIndex', outputCol='genderVec')

In [106]:
indexador_casado = StringIndexer(inputCol='ever_married', outputCol='ever_marriedIndex') 
codificador_casado = OneHotEncoder(inputCol='ever_marriedIndex', outputCol='ever_marriedVec')

In [107]:
indexador_tipo_trabajo = StringIndexer(inputCol='work_type', outputCol='work_typeIndex')
codificador_tipo_trabajo = OneHotEncoder (inputCol='work_typeIndex', outputCol='work_typeVec')

In [108]:
indexador_residencia = StringIndexer(inputCol='Residence_type', outputCol='Residence_typeIndex') 
codificador_residencia = OneHotEncoder (inputCol='Residence_typeIndex', outputCol='Residence_typeVec')

In [109]:
indexador_status_fumador = StringIndexer(inputCol='smoking_status', outputCol='smoking_statusIndex')
codificador_status_fumador = OneHotEncoder (inputCol='smoking_statusIndex', outputCol='smoking_statusVec')

No es necesario saber cuántas categorías hay de antemano  en una característica, la combinación de StringIndexer y OneHotEncoder se encargan de ello.

El siguiente paso es crear un ensamblador, que combine una lista dada de columnas en una columna de un solo vector para entrenar el modelo ML. Usamos las columnas de vectores, que obtuvimos después de OneHotEncoding.

In [110]:
ensamblador = VectorAssembler(inputCols=['genderVec',
 'age',
 'hypertension',
 'heart_disease',
 'ever_marriedVec',
 'work_typeVec',
 'Residence_typeVec',
 'avg_glucose_level',
 'bmi',
 'smoking_statusVec'],outputCol='features')

Luego crearemos un objeto DecisionTree. Para hacer esto necesitamos importar DecisionTreeClassifier.

In [111]:
from pyspark.ml.classification import DecisionTreeClassifier
dtc = DecisionTreeClassifier(labelCol='stroke',featuresCol='features')

Hasta ahora tenemos una tarea compleja que contiene un montón de etapas, que deben realizarse para procesar los datos. Para envolver todo eso, Spark ML representa un flujo de trabajo como Pipeline, que consiste en una secuencia de PipelineStages que se ejecutarán en un orden específico.

In [119]:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[indexador_genero, indexador_casado,\
                            indexador_tipo_trabajo, indexador_residencia, 
                            indexador_status_fumador,\
                            codificador_genero, codificador_casado, \
                            codificador_tipo_trabajo, codificador_residencia, \
                            codificador_status_fumador , \
                            assembler, dtc \
                           ])

El siguiente paso es dividir el conjunto de datos para entrenar y probar.

In [113]:
entrena_data,prueba_data = entrena_f.randomSplit([0.7,0.3])

Lo que voy a hacer ahora es ajustar el modelo. Para esto usaré el pipeline que fue creado y train_data

In [114]:
modelo = pipeline.fit(entrena_data)

Después de eso transforma los test_data.

In [117]:
dtc_predicciones = modelo.transform(prueba_data)

Es ahora, el momento de evaluar un modelo

In [118]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Select (prediction, true label) and compute test error
acc_evaluator = MulticlassClassificationEvaluator(labelCol="stroke", predictionCol="prediction",\
                                                  metricName="accuracy")
dtc_acc = acc_evaluator.evaluate(dtc_predicciones)
print('A Decision Tree algorithm had an accuracy of: {0:2.2f}%'.format(dtc_acc*100))

A Decision Tree algorithm had an accuracy of: 98.04%


Un algoritmo de Árbol de Decisión tenía una precisión de: 98.04%

Como se definió al principio, el modelo predictivo de un conjunto de datos desequilibrado podría ser con una precisión engañosa.

### Conclusiones
Hemos discutido las ventajas y desventajas de usar Pyspark o Pandas.  Hemos trabajado el ducto de ciencias de datos para el conjunto de datos para el caso de accidentes cerebrovasculares