# Resilient Distributed Datasets

Resilient Distributed Datasets (RDDs) are distributed collections of immutable JVM objects that let us run computations very quickly; they are the cornerstone of Spark. As the name suggests, the dataset is distributed: it is split into chunks and sent to executor nodes, which is what makes the fast computation possible. RDDs also keep a log (the lineage) of every transformation applied to each chunk, so Spark can recompute a chunk if data is lost and we can trace what went wrong. It is important to understand that operations on RDDs run in parallel.

### Creating RDDs

There are two ways to create RDDs: parallelizing an in-memory collection, or reading from an external source such as a file.


In [None]:
%matplotlib inline
import matplotlib.pyplot as plt
plt.style.use('ggplot')

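# bokeh.charts was removed from Bokeh in 0.12.9 (it moved to the separate
# bkcharts package), so this import needs an older Bokeh release
import bokeh.charts as chrt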
from bokeh.io import output_notebook

output_notebook()

from pyspark.sql.session import SparkSession
import pyspark.sql.types as typ
import sql
from IPython.display import display
import pyspark.mllib.stat as st
import numpy as np
import pyspark.mllib.linalg as ln
import pyspark.mllib.feature as ft
import pyspark.mllib.regression as reg
from pyspark.mllib.classification \
    import LogisticRegressionWithLBFGS
import pyspark.mllib.evaluation as ev
from pyspark.mllib.tree import RandomForest
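# Note: the pyspark.ml imports below rebind ft, ev and reg,
# shadowing the pyspark.mllib aliases imported above
import pyspark.ml.feature as ft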
import pyspark.ml.classification as cl
from pyspark.ml import Pipeline
import pyspark.ml.evaluation as ev
from pyspark.ml import PipelineModel
import pyspark.ml.tuning as tune
import pyspark.sql.functions as func
import pyspark.ml.clustering as clus
import pyspark.ml.regression as reg

In [None]:
spark = SparkSession.builder.appName("Spark_2").getOrCreate()
sc = spark.sparkContext

In [None]:
# Parallelizing a list
data = sc.parallelize([('Amber', 22), ('Alfred', 23), ('Skye',4), ('Albert', 12), ('Amber', 9)])

In [None]:
# ...or by reading from a file (or another external source such as a database)
# The file for this example can be downloaded here:
# http://tomdrabas.com/data/VS14MORT.txt.gz
data_from_file = sc.textFile('./data/VS14MORT.DUSMCPUB', 4)

### Schema

RDDs are schema-less data structures, so a single RDD can hold elements of different types.

In [None]:
data_heterogenous = sc.parallelize([('Ferrari', 'fast'), {'Porsche': 100000}, ['Spain','visited', 4504]]).collect()
data_heterogenous

In [None]:
# You can access the objects as you normally would in Python
data_heterogenous[1]['Porsche']

### Reading from files

When we read from a text file, each line of the file becomes one element of the RDD.

In [None]:
data_from_file.take(1)

### User-defined functions

For longer transformations you can define a regular function instead of squeezing everything into a lambda expression. An example:

In [None]:
def extractInformation(row):
    import re
    import numpy as np

    selected_indices = [
         2,4,5,6,7,9,10,11,12,13,14,15,16,17,18,
         19,21,22,23,24,25,27,28,29,30,32,33,34,
         36,37,38,39,40,41,42,43,44,45,46,47,48,
         49,50,51,52,53,54,55,56,58,60,61,62,63,
         64,65,66,67,68,69,70,71,72,73,74,75,76,
         77,78,79,81,82,83,84,85,87,89
    ]

    '''
        Input record schema
        schema: n-m (o) -- xxx
            n - position from
            m - position to
            o - number of characters
            xxx - description
        1. 1-19 (19) -- reserved positions
        2. 20 (1) -- resident status
        3. 21-60 (40) -- reserved positions
        4. 61-62 (2) -- education code (1989 revision)
        5. 63 (1) -- education code (2003 revision)
        6. 64 (1) -- education reporting flag
        7. 65-66 (2) -- month of death
        8. 67-68 (2) -- reserved positions
        9. 69 (1) -- sex
        10. 70 (1) -- age: 1-years, 2-months, 4-days, 5-hours, 6-minutes, 9-not stated
        11. 71-73 (3) -- number of units (years, months etc)
        12. 74 (1) -- age substitution flag (if the age reported in positions 70-74 is calculated using dates of birth and death)
        13. 75-76 (2) -- age recoded into 52 categories
        14. 77-78 (2) -- age recoded into 27 categories
        15. 79-80 (2) -- age recoded into 12 categories
        16. 81-82 (2) -- infant age recoded into 22 categories
        17. 83 (1) -- place of death
        18. 84 (1) -- marital status
        19. 85 (1) -- day of the week of death
        20. 86-101 (16) -- reserved positions
        21. 102-105 (4) -- current year
        22. 106 (1) -- injury at work
        23. 107 (1) -- manner of death
        24. 108 (1) -- manner of disposition
        25. 109 (1) -- autopsy
        26. 110-143 (34) -- reserved positions
        27. 144 (1) -- activity code
        28. 145 (1) -- place of injury
        29. 146-149 (4) -- ICD code
        30. 150-152 (3) -- 358 cause recode
        31. 153 (1) -- reserved position
        32. 154-156 (3) -- 113 cause recode
        33. 157-159 (3) -- 130 infant cause recode
        34. 160-161 (2) -- 39 cause recode
        35. 162 (1) -- reserved position
        36. 163-164 (2) -- number of entity-axis conditions
        37-56. 165-304 (140) -- list of up to 20 conditions
        57. 305-340 (36) -- reserved positions
        58. 341-342 (2) -- number of record axis conditions
        59. 343 (1) -- reserved position
        60-79. 344-443 (100) -- record axis conditions
        80. 444 (1) -- reserved position
        81. 445-446 (2) -- race
        82. 447 (1) -- bridged race flag
        83. 448 (1) -- race imputation flag
        84. 449 (1) -- race recode (3 categories)
        85. 450 (1) -- race recode (5 categories)
        86. 451-483 (33) -- reserved positions
        87. 484-486 (3) -- Hispanic origin
        88. 487 (1) -- reserved
        89. 488 (1) -- Hispanic origin/race recode
     '''

    record_split = re\
        .compile(
            r'([\s]{19})([0-9]{1})([\s]{40})([0-9\s]{2})([0-9\s]{1})([0-9]{1})([0-9]{2})' + 
            r'([\s]{2})([FM]{1})([0-9]{1})([0-9]{3})([0-9\s]{1})([0-9]{2})([0-9]{2})' + 
            r'([0-9]{2})([0-9\s]{2})([0-9]{1})([SMWDU]{1})([0-9]{1})([\s]{16})([0-9]{4})' +
            r'([YNU]{1})([0-9\s]{1})([BCOU]{1})([YNU]{1})([\s]{34})([0-9\s]{1})([0-9\s]{1})' +
            r'([A-Z0-9\s]{4})([0-9]{3})([\s]{1})([0-9\s]{3})([0-9\s]{3})([0-9\s]{2})([\s]{1})' + 
            r'([0-9\s]{2})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})' + 
            r'([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})' + 
            r'([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})' + 
            r'([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})' + 
            r'([A-Z0-9\s]{7})([\s]{36})([A-Z0-9\s]{2})([\s]{1})([A-Z0-9\s]{5})([A-Z0-9\s]{5})' + 
            r'([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})' + 
            r'([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})' + 
            r'([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})' + 
            r'([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([\s]{1})([0-9\s]{2})([0-9\s]{1})' + 
            r'([0-9\s]{1})([0-9\s]{1})([0-9\s]{1})([\s]{33})([0-9\s]{3})([0-9\s]{1})([0-9\s]{1})')
    try:
        rs = np.array(record_split.split(row))[selected_indices]
    except Exception:
        # Rows that do not match the pattern become a row of '-99' placeholders
        rs = np.array(['-99'] * len(selected_indices))
    return rs
#     return record_split.split(row)

Now we can apply the extractInformation(...) method to every line with .map(...):

In [None]:
data_from_file_conv = data_from_file.map(extractInformation)
data_from_file_conv.map(lambda row: row).take(1)

### Transformations (not the 4th one)

#### .map(...)

This method applies a function to each element of the RDD and returns a new RDD with the results.

In [None]:
data_2014 = data_from_file_conv.map(lambda row: int(row[16]))
data_2014.take(10)

We can also return more than one column per element, for example as a tuple

In [None]:
data_2014_2 = data_from_file_conv.map(lambda row: (row[16], int(row[16])))
data_2014_2.take(10)

#### .filter(...)

The .filter(...) method lets us select the elements of the dataset that meet specific criteria.

In [None]:
data_filtered = data_from_file_conv.filter(lambda row: row[5] == 'F' and row[21] == '0')
data_filtered.count()

#### .flatMap(...)

The .flatMap(...) method works similarly to .map(...), but it flattens the results: every item of the iterable returned for a row becomes its own element of the new RDD.

In [None]:
data_2014_flat = data_from_file_conv.flatMap(lambda row: (row[16], int(row[16]) + 1))
data_2014_flat.take(10)
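
The difference between .map(...) and .flatMap(...) can be sketched without a cluster. The snippet below is a plain-Python illustration, not Spark code: the two toy rows and the `transform` helper are made up for the example, with the year placed at index 0 rather than 16.

```python
from itertools import chain

# Toy stand-in for an RDD: each row is a list of string fields (hypothetical data).
rows = [["2014", "F"], ["2014", "M"]]

def transform(row):
    # Same shape as the lambda above: one tuple of two values per row.
    return (row[0], int(row[0]) + 1)

# .map(...) keeps exactly one output element per input element.
mapped = [transform(row) for row in rows]

# .flatMap(...) concatenates the iterables returned for each element.
flat_mapped = list(chain.from_iterable(transform(row) for row in rows))

print(mapped)       # a list of tuples
print(flat_mapped)  # a single flat list of values
```

With .map(...) the result has as many elements as the input; with .flatMap(...) it has as many elements as all the returned iterables combined.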

#### .distinct()

This method returns the distinct values of the RDD; here we first select a single column with .map(...) and then collect its distinct values as a list.

In [None]:
distinct_gender = data_from_file_conv.map(lambda row: row[5]).distinct().collect()
distinct_gender

#### .sample(...)

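The .sample(...) method returns a random sample of the dataset. Its arguments are whether to sample with replacement, the fraction of data to return, and the seed for the random number generator.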

In [None]:
fraction = 0.1
data_sample = data_from_file_conv.sample(False, fraction, 666)

data_sample.take(1)

In [None]:
# Check that the sample holds roughly 10% of the records (the fraction is approximate)
print('Original dataset: {0}, sample: {1}'.format(data_from_file_conv.count(), data_sample.count()))

#### .leftOuterJoin(...)

Just like in the SQL world, this method joins two RDDs on the keys found in both datasets: it returns every record of the left RDD, with the matching records of the right RDD appended where the keys coincide.

In [None]:
rdd1 = sc.parallelize([('a', 1), ('b', 4), ('c',10)])
rdd2 = sc.parallelize([('a', 4), ('a', 1), ('b', '6'), ('d', 15)])

rdd3 = rdd1.leftOuterJoin(rdd2)
rdd3.take(5)

Had we used the .join(...) method instead, we would have got only the values for 'a' and 'b', since these are the only keys present in both RDDs.

In [None]:
rdd4 = rdd1.join(rdd2)
rdd4.collect()

Another useful method is .intersection(...), which returns the records that are equal in both RDDs.

In [None]:
rdd5 = rdd1.intersection(rdd2)
rdd5.collect()
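
To make the difference between the three operations concrete, here is a minimal plain-Python sketch that works on the same key-value pairs. The helper functions are hypothetical stand-ins, not Spark APIs, and Spark does not guarantee the order of the returned records.

```python
left = [('a', 1), ('b', 4), ('c', 10)]
right = [('a', 4), ('a', 1), ('b', '6'), ('d', 15)]

def inner_join(left, right):
    # Keep only keys present on both sides, one record per matching pair.
    return [(k, (v, w)) for k, v in left for rk, w in right if k == rk]

def left_outer_join(left, right):
    # Keep every left record; unmatched keys get None on the right.
    result = []
    for k, v in left:
        matches = [w for rk, w in right if rk == k]
        if matches:
            result.extend((k, (v, w)) for w in matches)
        else:
            result.append((k, (v, None)))
    return result

def intersection(left, right):
    # Whole records (key and value) must be equal on both sides.
    return sorted(set(left) & set(right))

print(left_outer_join(left, right))
print(inner_join(left, right))
print(intersection(left, right))
```

Note that the intersection compares complete records, so only ('a', 1) survives, while both joins match on the key alone.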

#### .repartition(...)

Repartitioning the dataset changes the number of partitions the data is split into.

In [None]:
rdd1 = rdd1.repartition(4)

len(rdd1.glom().collect())

### Actions

#### .take(...)

This method returns the first n rows, scanning as few partitions as it needs (often a single one).

In [None]:
data_first = data_from_file_conv.take(1)
data_first

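If you want somewhat randomized records, you can use the .takeSample(...) method instead. Its arguments are whether to sample with replacement, the number of records to return, and the seed.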

In [None]:
data_take_sampled = data_from_file_conv.takeSample(False, 1, 667)
data_take_sampled

#### .reduce(...)

Another action that processes your data: the .reduce(...) method reduces the elements of an RDD using a specified function.

In [None]:
rdd1.map(lambda row: row[1]).reduce(lambda x, y: x + y)

If the reducing function is not associative and commutative, you will sometimes get wrong results, depending on how your data is partitioned.

In [None]:
data_reduce = sc.parallelize([1, 2, .5, .1, 5, .2], 1)

If we reduce the data by dividing the current result by the next element, we would expect a value of 10

In [None]:
works = data_reduce.reduce(lambda x, y: x / y)
works

However, if we split the data into 3 partitions, the result is wrong.

In [None]:
data_reduce = sc.parallelize([1, 2, .5, .1, 5, .2], 3)
data_reduce.reduce(lambda x, y: x / y)
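
Why the result changes can be reproduced in plain Python. The sketch below assumes the data is cut into equally sized, contiguous partitions, that each partition is reduced on its own, and that the partial results are then reduced in order; `partitioned_reduce` is a hypothetical helper, and the real scheduling inside Spark may differ.

```python
from functools import reduce

def partitioned_reduce(values, num_partitions, func):
    # Assumption: contiguous, equally sized partitions.
    size = -(-len(values) // num_partitions)  # ceiling division
    partitions = [values[i:i + size] for i in range(0, len(values), size)]
    # Reduce inside each partition, then reduce the partial results.
    partials = [reduce(func, part) for part in partitions]
    return reduce(func, partials)

values = [1, 2, .5, .1, 5, .2]

def divide(x, y):
    return x / y

one_partition = partitioned_reduce(values, 1, divide)
three_partitions = partitioned_reduce(values, 3, divide)

print(one_partition)     # about 10
print(three_partitions)  # about 0.004
```

Division is neither associative nor commutative, so grouping the operands differently gives a different answer; addition would return the same value for any partitioning.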

The .reduceByKey(...) method works similarly to .reduce(...), but performs the reduction on a key-by-key basis.

In [None]:
data_key = sc.parallelize([('a', 4),('b', 3),('c', 2),('a', 8),('d', 2),('b', 1),('d', 3)],4)
data_key.reduceByKey(lambda x, y: x + y).collect()
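
A key-by-key reduction can be pictured as a dictionary of running results. This is a plain-Python sketch with a hypothetical `reduce_by_key` helper; it ignores partitioning and shuffling, which Spark handles for us.

```python
def reduce_by_key(pairs, func):
    # Fold the values of each key into a single running result.
    totals = {}
    for key, value in pairs:
        totals[key] = func(totals[key], value) if key in totals else value
    return totals

pairs = [('a', 4), ('b', 3), ('c', 2), ('a', 8), ('d', 2), ('b', 1), ('d', 3)]
totals = reduce_by_key(pairs, lambda x, y: x + y)

print(totals)
```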

#### .count()

The .count() method counts the number of elements in the RDD.

In [None]:
data_reduce.count()

It has the same effect as the snippet below, but does not require moving the data to the driver.

In [None]:
len(data_reduce.collect()) # Works, but pulls the whole dataset to the driver; prefer .count() above

If your dataset is in key-value form, you can use the .countByKey() method to get the counts of the distinct keys.

In [None]:
data_key.countByKey().items()

#### .saveAsTextFile(...)

As the name suggests, the .saveAsTextFile(...) method saves the RDD as text files, one file per partition.

In [None]:
data_key.saveAsTextFile('./data/data_key.txt')

To read it back you need to parse it again because, as before, every row is treated as a string.

In [None]:
def parseInput(row):
    import re
    
    pattern = re.compile(r'\(\'([a-z])\', ([0-9]+)\)')
    row_split = pattern.split(row)
    
    return (row_split[1], int(row_split[2]))
    
data_key_reread = sc \
    .textFile('./data/data_key.txt') \
    .map(parseInput)
data_key_reread.collect()
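
Since each saved line is the string representation of a Python tuple, an alternative to a hand-written regular expression is `ast.literal_eval` from the standard library. The sketch below is a suggestion rather than the approach used above; `parse_line` and the sample lines are made up for illustration.

```python
import ast

def parse_line(line):
    # Safely evaluate a literal such as "('a', 4)" back into a tuple.
    key, value = ast.literal_eval(line)
    return (key, int(value))

# Hypothetical lines, in the format a saved key-value RDD would contain.
lines = ["('a', 4)", "('b', 3)", "('a', 12)"]
parsed = [parse_line(line) for line in lines]

print(parsed)
```

This also copes with multi-digit values and keys longer than one character, and the function could be passed to .map(...) in place of parseInput.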

#### .foreach(...)

A method that applies the same function to each element of the RDD, one at a time. Unlike .map(...), it returns nothing and is used for its side effects.

In [None]:
def f(x): 
    print(x)

data_key.foreach(f)

# DataFrames

### Creating a DataFrame

Instead of reading from the file system, we will create a DataFrame by generating the data. We first create the RDD stringJSONRDD and then convert it into a DataFrame by reading it with spark.read.json.

In [None]:
# Generate JSON data
# This way we do not have to touch any files on the file system
stringJSONRDD = sc.parallelize((""" 
  { "id": "123",
    "name": "Katie",
    "age": 19,
    "eyeColor": "brown"
  }""",
   """{
    "id": "234",
    "name": "Michael",
    "age": 22,
    "eyeColor": "green"
  }""", 
  """{
    "id": "345",
    "name": "Simone",
    "age": 23,
    "eyeColor": "blue"
  }""")
)

In [None]:
# Create a DataFrame
swimmersJSON = spark.read.json(stringJSONRDD)

In [None]:
# Create a temporary view
swimmersJSON.createOrReplaceTempView("swimmersJSON")

In [None]:
# DataFrame API
swimmersJSON.show()

In [None]:
# SQL Query
spark.sql("select * from swimmersJSON").collect()

#### Inferring the schema using reflection

Note that Spark infers the schema using reflection: it determines the schema of the DataFrame by inspecting the JSON data.

In [None]:
# Print the schema
swimmersJSON.printSchema()

Note that Spark was able to infer the schema, as the .printSchema() output shows.

What if we want to specify the schema programmatically, that is, in code?

In [None]:
from pyspark.sql.types import *

# Generate our comma-separated data
# We do it this way so that we do not need to read a file from disk
stringCSVRDD = sc.parallelize([(123, 'Katie', 19, 'brown'), (234, 'Michael', 22, 'green'), (345, 'Simone', 23, 'blue')])

# Define the schema explicitly with StructType and StructField from pyspark.sql.types
schemaString = "id name age eyeColor"
schema = StructType([
    StructField("id", LongType(), True),    
    StructField("name", StringType(), True),
    StructField("age", LongType(), True),
    StructField("eyeColor", StringType(), True)
])

# Apply the schema to the RDD and create the DataFrame
swimmers = spark.createDataFrame(stringCSVRDD, schema)

# Create a temporary view from the DataFrame
swimmers.createOrReplaceTempView("swimmers")

In [None]:
# Print the schema
swimmers.printSchema()

#### Querying with SQL

With DataFrames, we can write our queries in Spark SQL

In [None]:
# Run the SQL query and return the data
spark.sql("select * from swimmers").show()

In [None]:
# Count the rows in SQL
spark.sql("select count(1) from swimmers").show()

In [None]:
# Query id and age for swimmers with age = 22 via the DataFrame API
swimmers.select("id", "age").filter("age = 22").show()

In [None]:
# The same query, written with column objects
swimmers.select(swimmers.id, swimmers.age).filter(swimmers.age == 22).show()

In [None]:
# In SQL
spark.sql("select id, age from swimmers where age = 22").show()

In [None]:
# Query name and eyeColor for swimmers whose eye color starts with the letter 'b'
spark.sql("select name, eyeColor from swimmers where eyeColor like 'b%'").show()

#### Querying with the DataFrame API

With DataFrames, we can also write our queries using the DataFrame API

In [None]:
# Show the values
swimmers.show()

In [None]:
# In Databricks, display(...) renders the DataFrame as a table; here it is IPython.display.display
display(swimmers)

In [None]:
# Count the rows
swimmers.count()

In [None]:
# id, age where age = 22
swimmers.select("id", "age").filter("age = 22").show()

In [None]:
# name, eyeColor where eyeColor like 'b%'
swimmers.select("name", "eyeColor").filter("eyeColor like 'b%'").show()

#### On-Time Flight Performance

We query flight departure delays by state and city by joining the departure-delay data with the airport codes (which identify the state and city).

#### DataFrame Queries

Let's analyze flight performance using DataFrames; first we build the DataFrames from the source datasets.

In [None]:
# Paths
flightPerfFilePath = "./data/departuredelays.csv"
airportsFilePath = "./data/airport-codes-na.txt"

# Airports dataset
airports = spark.read.csv(airportsFilePath, header='true', inferSchema='true', sep='\t')
airports.createOrReplaceTempView("airports")

# Departure Delays dataset
flightPerf = spark.read.csv(flightPerfFilePath, header='true')
flightPerf.createOrReplaceTempView("FlightPerformance")

# Cache the Departure Delays dataset
flightPerf.cache()

In [None]:
# Flight Delays by City and Origin Code (for Washington State)
spark.sql("select a.City, f.origin, sum(f.delay) as Delays from FlightPerformance f join airports a on a.IATA = f.origin where a.State = 'WA' group by a.City, f.origin order by sum(f.delay) desc").show()

In [None]:
# Flight Delays by State (for the US)
spark.sql("select a.State, sum(f.delay) as Delays from FlightPerformance f join airports a on a.IATA = f.origin where a.Country = 'USA' group by a.State ").show()

# Preparing Our Data for Modeling

#### Duplicates

How do we handle duplicates? An example:

In [None]:
df = spark.createDataFrame([
        (1, 144.5, 5.9, 33, 'M'),
        (2, 167.2, 5.4, 45, 'M'),
        (3, 124.1, 5.2, 23, 'F'),
        (4, 144.5, 5.9, 33, 'M'),
        (5, 133.2, 5.7, 54, 'F'),
        (3, 124.1, 5.2, 23, 'F'),
        (5, 129.2, 5.3, 42, 'M'),
    ], ['id', 'weight', 'height', 'age', 'gender'])

Are there any duplicates?

In [None]:
print('Count of rows: {0}'.format(df.count()))
print('Count of distinct rows: {0}'.format(df.distinct().count()))

The two counts differ, so the DataFrame contains exact duplicate rows. Let's remove them with the .dropDuplicates(...) method

In [None]:
df = df.dropDuplicates()
df.show()

Let's check whether any rows are identical apart from their id...

In [None]:
print('Count of ids: {0}'.format(df.count()))
print('Count of distinct ids: {0}'.format(df.select([c for c in df.columns if c != 'id']).distinct().count()))

There are still duplicates: some rows differ only in their id. Let's run .dropDuplicates(...) again, this time with the subset parameter

In [None]:
df = df.dropDuplicates(subset=[c for c in df.columns if c != 'id'])
df.show()
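
The effect of the subset parameter can be sketched in plain Python on the same rows. The `drop_duplicates` helper is hypothetical and keeps the first occurrence it sees, whereas Spark makes no promise about which of the duplicated rows survives.

```python
rows = [
    (1, 144.5, 5.9, 33, 'M'),
    (2, 167.2, 5.4, 45, 'M'),
    (3, 124.1, 5.2, 23, 'F'),
    (4, 144.5, 5.9, 33, 'M'),
    (5, 133.2, 5.7, 54, 'F'),
    (3, 124.1, 5.2, 23, 'F'),
    (5, 129.2, 5.3, 42, 'M'),
]

def drop_duplicates(rows, key=lambda row: row):
    # Keep the first row seen for every distinct key.
    seen = set()
    kept = []
    for row in rows:
        k = key(row)
        if k not in seen:
            seen.add(k)
            kept.append(row)
    return kept

# Exact duplicates: the whole row is the key.
exact = drop_duplicates(rows)
# "subset" duplicates: every column except the id (index 0) is the key.
ignoring_id = drop_duplicates(exact, key=lambda row: row[1:])

print(len(rows), len(exact), len(ignoring_id))
```

After both passes the id 5 still appears twice, on two genuinely different records.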

Now let's see how many unique IDs we have, using the .agg(...) method

In [None]:
import pyspark.sql.functions as fn

df.agg(
    fn.count('id').alias('count'),
    fn.countDistinct('id').alias('distinct')
).show()

The count and the distinct count differ, so let's give each row its own unique ID

In [None]:
df.withColumn('new_id', fn.monotonically_increasing_id()).show()

#### Missing values

An example like the one above, but with missing values:

In [None]:
df_miss = spark.createDataFrame([
        (1, 143.5, 5.6, 28,   'M',  100000),
        (2, 167.2, 5.4, 45,   'M',  None),
        (3, None , 5.2, None, None, None),
        (4, 144.5, 5.9, 33,   'M',  None),
        (5, 133.2, 5.7, 54,   'F',  None),
        (6, 124.1, 5.2, None, 'F',  None),
        (7, 129.2, 5.3, 42,   'M',  76000),
    ], ['id', 'weight', 'height', 'age', 'gender', 'income'])

Let's count the missing values in each row with the following code

In [None]:
df_miss.rdd.map(
    lambda row: (row['id'], sum([c is None for c in row]))
).collect()

The row with id 3 has the most missing values; let's look at which columns they are in

In [None]:
df_miss.where('id == 3').show()

What is the share of missing values in each column?

In [None]:
df_miss.agg(*[
    (1 - (fn.count(c) / fn.count('*'))).alias(c + '_missing')
    for c in df_miss.columns
]).show()
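
The aggregation relies on count(column) skipping nulls while count('*') counts every row, so one minus their ratio is the share of missing values. The same arithmetic in plain Python, as a sketch over the rows defined above:

```python
columns = ['id', 'weight', 'height', 'age', 'gender', 'income']
rows = [
    (1, 143.5, 5.6, 28, 'M', 100000),
    (2, 167.2, 5.4, 45, 'M', None),
    (3, None, 5.2, None, None, None),
    (4, 144.5, 5.9, 33, 'M', None),
    (5, 133.2, 5.7, 54, 'F', None),
    (6, 124.1, 5.2, None, 'F', None),
    (7, 129.2, 5.3, 42, 'M', 76000),
]

# Share of None values per column: missing count divided by row count.
missing_share = {
    name: sum(row[i] is None for row in rows) / len(rows)
    for i, name in enumerate(columns)
}

for name, share in missing_share.items():
    print('{0}: {1:.2%}'.format(name, share))
```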

Let's drop the 'income' column, since most of its values are missing

In [None]:
df_miss_no_income = df_miss.select([c for c in df_miss.columns if c != 'income'])
df_miss_no_income.show()

To drop rows with missing values we use the .dropna(...) method; thresh=3 keeps only the rows that have at least 3 non-null values

In [None]:
df_miss_no_income.dropna(thresh=3).show()

Let's impute the missing values instead: the column mean for the numeric columns and a 'missing' category for gender.

In [None]:
means = df_miss_no_income.agg(
    *[fn.mean(c).alias(c) for c in df_miss_no_income.columns if c != 'gender']
).toPandas().to_dict('records')[0]

means['gender'] = 'missing'

df_miss_no_income.fillna(means).show()
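
Mean imputation for a single column can be sketched in plain Python as follows. The list holds the age values from the example above; how Spark casts the filled value to the column type is not modelled here.

```python
from statistics import mean

ages = [28, 45, None, 33, 54, None, 42]

# The mean is computed from the observed values only.
observed = [a for a in ages if a is not None]
age_mean = mean(observed)

# Every missing entry is replaced with that mean.
filled = [age_mean if a is None else a for a in ages]

print(age_mean)
print(filled)
```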

#### Outliers

An example:

In [None]:
df_outliers = spark.createDataFrame([
        (1, 143.5, 5.3, 28),
        (2, 154.2, 5.5, 45),
        (3, 342.3, 5.1, 99),
        (4, 144.5, 5.5, 33),
        (5, 133.2, 5.4, 54),
        (6, 124.1, 5.1, 21),
        (7, 129.2, 5.3, 42),
    ], ['id', 'weight', 'height', 'age'])

Let's compute the lower and upper bounds for each feature, using 1.5 times the interquartile range (IQR) below the first quartile and above the third

In [None]:
cols = ['weight', 'height', 'age']
bounds = {}

for col in cols:
    quantiles = df_outliers.approxQuantile(col, [0.25, 0.75], 0.05)
    IQR = quantiles[1] - quantiles[0]
    bounds[col] = [quantiles[0] - 1.5 * IQR, quantiles[1] + 1.5 * IQR]

The bounds dictionary holds the lower and upper bounds for each feature

In [None]:
bounds
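
The IQR rule itself is easy to check by hand. The sketch below uses `statistics.quantiles` from the standard library with the inclusive method; this is an assumption made for the example, and because .approxQuantile(...) uses an approximation algorithm, the exact bounds Spark reports may differ.

```python
from statistics import quantiles

def iqr_bounds(values):
    # Quartiles with linear interpolation (inclusive method).
    q1, _, q3 = quantiles(values, n=4, method='inclusive')
    iqr = q3 - q1
    return q1 - 1.5 * iqr, q3 + 1.5 * iqr

weights = [143.5, 154.2, 342.3, 144.5, 133.2, 124.1, 129.2]
lower, upper = iqr_bounds(weights)

# Anything outside [lower, upper] is flagged as an outlier.
flagged = [w for w in weights if w < lower or w > upper]

print(lower, upper)
print(flagged)
```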

Let's flag the outliers

In [None]:
outliers = df_outliers.select(*['id'] + [
    (
        (df_outliers[c] < bounds[c][0]) | 
        (df_outliers[c] > bounds[c][1])
    ).alias(c + '_o') for c in cols
])
outliers.show()

We have two outliers in the weight feature and two in the age feature.

In [None]:
df_outliers = df_outliers.join(outliers, on='id')
df_outliers.filter('weight_o').select('id', 'weight').show()
df_outliers.filter('age_o').select('id', 'age').show()

#### Understanding our data

Let's load our data and convert it to a Spark DataFrame

In [None]:
import pyspark.sql.types as typ

In [None]:
fraud = sc.textFile('./data/ccFraud.csv.gz')
header = fraud.first()

fraud = fraud \
    .filter(lambda row: row != header) \
    .map(lambda row: [int(elem) for elem in row.split(',')])

We create a schema for our DataFrame

In [None]:
fields = [
    *[
        typ.StructField(h[1:-1], typ.IntegerType(), True)
        for h in header.split(',')
    ]
]

schema = typ.StructType(fields)

We create our DataFrame

In [None]:
fraud_df = spark.createDataFrame(fraud, schema)

Now that the DataFrame is created, let's explore it

In [None]:
fraud_df.printSchema()

For categorical columns we count the frequencies of their values using the .groupby(...) method.

In [None]:
fraud_df.groupby('gender').count().show()

For the truly numerical features we can use the .describe() method.

In [None]:
numerical = ['balance', 'numTrans', 'numIntlTrans']

In [None]:
desc = fraud_df.describe(numerical)
desc.show()

Here is how to check the skewness

In [None]:
fraud_df.agg({'balance': 'skewness'}).show()

#### Correlations
Calculating correlations in PySpark is very easy once your data is in DataFrame form.

In [None]:
fraud_df.corr('balance', 'numTrans')

We create a correlation matrix (only the upper triangle is computed, since the matrix is symmetric)

In [None]:
n_numerical = len(numerical)

corr = []

for i in range(0, n_numerical):
    temp = [None] * i
    
    for j in range(i, n_numerical):
        temp.append(fraud_df.corr(numerical[i], numerical[j]))
    corr.append(temp)
    
corr
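
For reference, this is what the upper-triangular matrix looks like on a tiny dataset. The sketch computes the Pearson coefficient by hand in plain Python; the three columns are hypothetical toy data, chosen so that the expected correlations are obvious.

```python
from math import sqrt

def pearson(xs, ys):
    # Pearson correlation: covariance divided by the product of the spreads.
    n = len(xs)
    mean_x, mean_y = sum(xs) / n, sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / sqrt(var_x * var_y)

# Hypothetical columns: y grows with x, z shrinks as x grows.
columns = {
    'x': [1.0, 2.0, 3.0, 4.0],
    'y': [2.0, 4.0, 6.0, 8.0],
    'z': [4.0, 3.0, 2.0, 1.0],
}
names = list(columns)

# Row i holds None for the i cells below the diagonal, as in the loop above.
corr_matrix = [
    [None] * i + [pearson(columns[names[i]], columns[names[j]])
                  for j in range(i, len(names))]
    for i in range(len(names))
]

for row in corr_matrix:
    print(row)
```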

#### Histograms

In [None]:
hists = fraud_df.select('balance').rdd.flatMap(lambda row: row).histogram(20)
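
The .histogram(20) call returns two lists: the bucket boundaries (one more than the number of buckets) and the counts per bucket. A plain-Python sketch of the same idea, with evenly spaced buckets between the minimum and the maximum, is shown below; the `histogram` helper and its sample values are illustrative, and it assumes the values are not all identical.

```python
def histogram(values, num_buckets):
    low, high = min(values), max(values)
    width = (high - low) / num_buckets
    # num_buckets + 1 boundaries, from the minimum to the maximum.
    edges = [low + i * width for i in range(num_buckets)] + [high]
    counts = [0] * num_buckets
    for v in values:
        # The last bucket is closed on the right so the maximum is counted.
        index = min(int((v - low) / width), num_buckets - 1)
        counts[index] += 1
    return edges, counts

edges, counts = histogram([0, 10, 20, 25, 30, 40], 4)

print(edges)
print(counts)
```

Because there is one more boundary than there are counts, a bar chart of the result uses all boundaries except the last one as the left edges of the bars.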

Let's plot it

In [None]:
data = {
    'bins': hists[0][:-1],
    'freq': hists[1]
}

fig = plt.figure(figsize=(12,9))
ax = fig.add_subplot(1, 1, 1)
ax.bar(data['bins'], data['freq'], width=2000)
ax.set_title('Histogram of \'balance\'')

plt.savefig('B05793_05_22.png', dpi=300)

Another way to plot it, using Bokeh

In [None]:
b_hist = chrt.Bar(data, values='freq', label='bins', title='Histogram of \'balance\'')
chrt.show(b_hist)

In [None]:
data_driver = {'obs': fraud_df.select('balance').rdd.flatMap(lambda row: row).collect()}

In [None]:
fig = plt.figure(figsize=(12,9))
ax = fig.add_subplot(1, 1, 1)

ax.hist(data_driver['obs'], bins=20)
ax.set_title('Histogram of \'balance\' using .hist()')


plt.savefig('B05793_05_24.png', dpi=300)

In [None]:
b_hist_driver = chrt.Histogram(data_driver, values='obs', title='Histogram of \'balance\' using .Histogram()', bins=20)
chrt.show(b_hist_driver)

#### Interactions between features

Let's sample our fraud dataset at 0.02%, using gender as the strata.

In [None]:
data_sample = fraud_df.sampleBy('gender', {1: 0.0002, 2: 0.0002}).select(numerical)

In [None]:
data_multi = dict([
    (elem, data_sample.select(elem).rdd.flatMap(lambda row: row).collect()) 
    for elem in numerical
])

sctr = chrt.Scatter(data_multi, x='balance', y='numTrans')

chrt.show(sctr)

# Intro to MLlib

In [None]:
labels = [
    ('INFANT_ALIVE_AT_REPORT', typ.StringType()),
    ('BIRTH_YEAR', typ.IntegerType()),
    ('BIRTH_MONTH', typ.IntegerType()),
    ('BIRTH_PLACE', typ.StringType()),
    ('MOTHER_AGE_YEARS', typ.IntegerType()),
    ('MOTHER_RACE_6CODE', typ.StringType()),
    ('MOTHER_EDUCATION', typ.StringType()),
    ('FATHER_COMBINED_AGE', typ.IntegerType()),
    ('FATHER_EDUCATION', typ.StringType()),
    ('MONTH_PRECARE_RECODE', typ.StringType()),
    ('CIG_BEFORE', typ.IntegerType()),
    ('CIG_1_TRI', typ.IntegerType()),
    ('CIG_2_TRI', typ.IntegerType()),
    ('CIG_3_TRI', typ.IntegerType()),
    ('MOTHER_HEIGHT_IN', typ.IntegerType()),
    ('MOTHER_BMI_RECODE', typ.IntegerType()),
    ('MOTHER_PRE_WEIGHT', typ.IntegerType()),
    ('MOTHER_DELIVERY_WEIGHT', typ.IntegerType()),
    ('MOTHER_WEIGHT_GAIN', typ.IntegerType()),
    ('DIABETES_PRE', typ.StringType()),
    ('DIABETES_GEST', typ.StringType()),
    ('HYP_TENS_PRE', typ.StringType()),
    ('HYP_TENS_GEST', typ.StringType()),
    ('PREV_BIRTH_PRETERM', typ.StringType()),
    ('NO_RISK', typ.StringType()),
    ('NO_INFECTIONS_REPORTED', typ.StringType()),
    ('LABOR_IND', typ.StringType()),
    ('LABOR_AUGM', typ.StringType()),
    ('STEROIDS', typ.StringType()),
    ('ANTIBIOTICS', typ.StringType()),
    ('ANESTHESIA', typ.StringType()),
    ('DELIV_METHOD_RECODE_COMB', typ.StringType()),
    ('ATTENDANT_BIRTH', typ.StringType()),
    ('APGAR_5', typ.IntegerType()),
    ('APGAR_5_RECODE', typ.StringType()),
    ('APGAR_10', typ.IntegerType()),
    ('APGAR_10_RECODE', typ.StringType()),
    ('INFANT_SEX', typ.StringType()),
    ('OBSTETRIC_GESTATION_WEEKS', typ.IntegerType()),
    ('INFANT_WEIGHT_GRAMS', typ.IntegerType()),
    ('INFANT_ASSIST_VENTI', typ.StringType()),
    ('INFANT_ASSIST_VENTI_6HRS', typ.StringType()),
    ('INFANT_NICU_ADMISSION', typ.StringType()),
    ('INFANT_SURFACANT', typ.StringType()),
    ('INFANT_ANTIBIOTICS', typ.StringType()),
    ('INFANT_SEIZURES', typ.StringType()),
    ('INFANT_NO_ABNORMALITIES', typ.StringType()),
    ('INFANT_ANCEPHALY', typ.StringType()),
    ('INFANT_MENINGOMYELOCELE', typ.StringType()),
    ('INFANT_LIMB_REDUCTION', typ.StringType()),
    ('INFANT_DOWN_SYNDROME', typ.StringType()),
    ('INFANT_SUSPECTED_CHROMOSOMAL_DISORDER', typ.StringType()),
    ('INFANT_NO_CONGENITAL_ANOMALIES_CHECKED', typ.StringType()),
    ('INFANT_BREASTFED', typ.StringType())
]

schema = typ.StructType([
        typ.StructField(e[0], e[1], False) for e in labels
    ])

Let's load the data

In [None]:
births = spark.read.csv('./data/births_train.csv.gz', 
                        header=True, 
                        schema=schema)

Recoding dictionary

In [None]:
recode_dictionary = {
    'YNU': {
        'Y': 1,
        'N': 0,
        'U': 0
    }
}

Our goal is to predict whether 'INFANT_ALIVE_AT_REPORT' is 1 or 0. Therefore, we will drop all the features that relate to the infant.

In [None]:
selected_features = [
    'INFANT_ALIVE_AT_REPORT', 
    'BIRTH_PLACE', 
    'MOTHER_AGE_YEARS', 
    'FATHER_COMBINED_AGE', 
    'CIG_BEFORE', 
    'CIG_1_TRI', 
    'CIG_2_TRI', 
    'CIG_3_TRI', 
    'MOTHER_HEIGHT_IN', 
    'MOTHER_PRE_WEIGHT', 
    'MOTHER_DELIVERY_WEIGHT', 
    'MOTHER_WEIGHT_GAIN', 
    'DIABETES_PRE', 
    'DIABETES_GEST', 
    'HYP_TENS_PRE', 
    'HYP_TENS_GEST', 
    'PREV_BIRTH_PRETERM'
]

births_trimmed = births.select(selected_features)

We specify the recoding methods.

In [None]:
import pyspark.sql.functions as func

def recode(col, key):        
    return recode_dictionary[key][col] 

def correct_cig(feat):
    return func \
        .when(func.col(feat) != 99, func.col(feat))\
        .otherwise(0)

rec_integer = func.udf(recode, typ.IntegerType())

We correct the features related to the number of cigarettes smoked (a value of 99 is replaced with 0).

In [None]:
births_transformed = births_trimmed \
    .withColumn('CIG_BEFORE', correct_cig('CIG_BEFORE'))\
    .withColumn('CIG_1_TRI', correct_cig('CIG_1_TRI'))\
    .withColumn('CIG_2_TRI', correct_cig('CIG_2_TRI'))\
    .withColumn('CIG_3_TRI', correct_cig('CIG_3_TRI'))

Let's find out which features are Yes/No/Unknown

In [None]:
cols = [(col.name, col.dataType) for col in births_trimmed.schema]

YNU_cols = []

for i, s in enumerate(cols):
    if s[1] == typ.StringType():
        dis = births.select(s[0]) \
            .distinct() \
            .rdd \
            .map(lambda row: row[0]) \
            .collect()

        if 'Y' in dis:
            YNU_cols.append(s[0])

DataFrames can transform features in bulk while selecting them.

In [None]:
births.select([
        'INFANT_NICU_ADMISSION', 
        rec_integer(
            'INFANT_NICU_ADMISSION', func.lit('YNU')
        ) \
        .alias('INFANT_NICU_ADMISSION_RECODE')]
     ).take(5)

We transform all the YNU_cols in one go using a list of transformations.

In [None]:
exprs_YNU = [
    rec_integer(x, func.lit('YNU')).alias(x) 
    if x in YNU_cols 
    else x 
    for x in births_transformed.columns
]

births_transformed = births_transformed.select(exprs_YNU)

Let's check the result

In [None]:
births_transformed.select(YNU_cols[-5:]).show(5)

#### Exploring the data

We will use the colStats(...) method to print the mean and standard deviation of each numeric column

In [None]:
numeric_cols = ['MOTHER_AGE_YEARS','FATHER_COMBINED_AGE',
                'CIG_BEFORE','CIG_1_TRI','CIG_2_TRI','CIG_3_TRI',
                'MOTHER_HEIGHT_IN','MOTHER_PRE_WEIGHT',
                'MOTHER_DELIVERY_WEIGHT','MOTHER_WEIGHT_GAIN'
               ]

numeric_rdd = births_transformed\
                       .select(numeric_cols)\
                       .rdd \
                       .map(lambda row: [e for e in row])

mllib_stats = st.Statistics.colStats(numeric_rdd)

for col, m, v in zip(numeric_cols, 
                     mllib_stats.mean(), 
                     mllib_stats.variance()):
    print('{0}: \t{1:.2f} \t {2:.2f}'.format(col, m, np.sqrt(v)))

For the categorical variables we will calculate the frequencies of their values.

In [None]:
categorical_cols = [e for e in births_transformed.columns 
                    if e not in numeric_cols]

categorical_rdd = births_transformed\
                       .select(categorical_cols)\
                       .rdd \
                       .map(lambda row: [e for e in row])
            
for i, col in enumerate(categorical_cols):
    agg = categorical_rdd \
        .groupBy(lambda row: row[i]) \
        .map(lambda row: (row[0], len(row[1])))
        
    print(col, sorted(agg.collect(), 
                      key=lambda el: el[1], 
                      reverse=True))

#### Correlations

How correlated are our features? We print the pairs whose correlation is above 0.5.

In [None]:
corrs = st.Statistics.corr(numeric_rdd)

for i, el in enumerate(corrs > 0.5):
    correlated = [
        (numeric_cols[j], corrs[i][j]) 
        for j, e in enumerate(el) 
        if e == 1.0 and j != i]
    
    if len(correlated) > 0:
        for e in correlated:
            print('{0}-to-{1}: {2:.2f}' \
                  .format(numeric_cols[i], e[0], e[1]))
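
As a rough illustration of what the 0.5 cutoff does, here is a plain-Python sketch that computes Pearson correlations for a few made-up columns and keeps only the strongly correlated pairs. The pearson helper and the toy values are assumptions of this sketch; unlike the loop above, it reports each pair once rather than in both directions.

```python
import math

def pearson(xs, ys):
    """Pearson correlation coefficient of two equally long sequences."""
    n = len(xs)
    mean_x, mean_y = sum(xs) / n, sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / math.sqrt(var_x * var_y)

# Hypothetical toy columns
columns = {
    'CIG_1_TRI': [0, 5, 10, 20, 0],
    'CIG_2_TRI': [0, 4, 10, 18, 0],
    'MOTHER_HEIGHT_IN': [64, 66, 62, 65, 63],
}

names = list(columns)
highly_correlated = [
    (a, b, pearson(columns[a], columns[b]))
    for i, a in enumerate(names)
    for b in names[i + 1:]
    if pearson(columns[a], columns[b]) > 0.5
]
for a, b, r in highly_correlated:
    print('{0}-to-{1}: {2:.2f}'.format(a, b, r))
```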


Let's drop the highly correlated features.

In [None]:
features_to_keep = [
    'INFANT_ALIVE_AT_REPORT', 
    'BIRTH_PLACE', 
    'MOTHER_AGE_YEARS', 
    'FATHER_COMBINED_AGE', 
    'CIG_1_TRI', 
    'MOTHER_HEIGHT_IN', 
    'MOTHER_PRE_WEIGHT', 
    'DIABETES_PRE', 
    'DIABETES_GEST', 
    'HYP_TENS_PRE', 
    'HYP_TENS_GEST', 
    'PREV_BIRTH_PRETERM'
]
births_transformed = births_transformed.select([e for e in features_to_keep])

#### Statistical tests

We'll use the chi-square test to check whether there are significant differences for the categorical variables.

In [None]:
for cat in categorical_cols[1:]:
    agg = births_transformed \
        .groupby('INFANT_ALIVE_AT_REPORT') \
        .pivot(cat) \
        .count()    

    agg_rdd = agg \
        .rdd\
        .map(lambda row: (row[1:])) \
        .flatMap(lambda row: 
                 [0 if e is None else e for e in row]) \
        .collect()

    row_length = len(agg.collect()[0]) - 1
    agg = ln.Matrices.dense(row_length, 2, agg_rdd)
    
    test = st.Statistics.chiSqTest(agg)
    print(cat, round(test.pValue, 4))
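
For intuition about what the test computes, here is a small plain-Python sketch of the Pearson chi-square test of independence on a 2x2 table. The counts and the chi_square_2x2 helper are hypothetical; the closed-form p-value only holds for one degree of freedom, so this is not a general replacement for chiSqTest.

```python
import math

def chi_square_2x2(table):
    """Pearson chi-square test of independence for a 2x2 table of counts.

    Returns (statistic, p_value); with 1 degree of freedom the p-value
    has the closed form erfc(sqrt(statistic / 2)).
    """
    row_totals = [sum(row) for row in table]
    col_totals = [sum(col) for col in zip(*table)]
    total = sum(row_totals)

    statistic = 0.0
    for i, row in enumerate(table):
        for j, observed in enumerate(row):
            expected = row_totals[i] * col_totals[j] / total
            statistic += (observed - expected) ** 2 / expected

    return statistic, math.erfc(math.sqrt(statistic / 2))

# Hypothetical counts: rows = INFANT_ALIVE_AT_REPORT (0, 1),
# columns = a binary feature such as DIABETES_PRE (0, 1)
table = [[90, 10],
         [60, 40]]
statistic, p_value = chi_square_2x2(table)
print(round(statistic, 2), round(p_value, 4))
```

A small p-value suggests the feature and the label are not independent, which is the reading applied to the p-values printed above.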

#### Creating the final dataset

We'll create an RDD of LabeledPoints, using the hashing trick to encode 'BIRTH_PLACE'.

In [None]:
hashing = ft.HashingTF(7)

births_hashed = births_transformed \
    .rdd \
    .map(lambda row: [
            list(hashing.transform(row[1]).toArray()) 
                if col == 'BIRTH_PLACE' 
                else row[i] 
            for i, col 
            in enumerate(features_to_keep)]) \
    .map(lambda row: [[e] if type(e) == int else e 
                      for e in row]) \
    .map(lambda row: [item for sublist in row 
                      for item in sublist]) \
    .map(lambda row: reg.LabeledPoint(
            row[0], 
            ln.Vectors.dense(row[1:]))
        )
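
The hashing trick itself fits in a few lines. The sketch below is plain Python and only illustrates the idea: zlib.crc32 is an assumption chosen because it is deterministic, not the hash function Spark uses, and hash_encode is a hypothetical helper.

```python
import zlib

def hash_encode(value, num_features=7):
    """Encode one categorical value as a fixed-length vector.

    zlib.crc32 is used only because it is deterministic across runs;
    it is an assumption of this sketch, not the hash Spark uses.
    """
    vector = [0.0] * num_features
    index = zlib.crc32(str(value).encode('utf-8')) % num_features
    vector[index] += 1.0
    return vector

# Hypothetical BIRTH_PLACE codes
for place in ['1', '3', '9']:
    print(place, hash_encode(place))
```

With only 7 slots, two different categories can land on the same index; that collision risk is the price of not having to build a vocabulary first.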

#### Splitting the dataset into training and test sets

In [None]:
births_train, births_test = births_hashed.randomSplit([0.6, 0.4])

#### Predicting infant survival

We'll use the logistic regression algorithm.

In [None]:
LR_Model = LogisticRegressionWithLBFGS \
    .train(births_train, iterations=10)

Let's use the new model to predict the classes for our test set.

In [None]:
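# BinaryClassificationMetrics expects (score, label) pairs,
# so the prediction goes first and the true label second
LR_results = (
        births_test.map(lambda row: row.label) \
        .zip(LR_Model \
             .predict(births_test\
                      .map(lambda row: row.features)))
    ).map(lambda row: (row[1] * 1.0, row[0]))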

Let's see how our model performed.

In [None]:
LR_evaluation = ev.BinaryClassificationMetrics(LR_results)

print('Area under PR: {0:.2f}' \
      .format(LR_evaluation.areaUnderPR))
print('Area under ROC: {0:.2f}' \
      .format(LR_evaluation.areaUnderROC))
LR_evaluation.unpersist()
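
To give the area under the ROC curve a concrete meaning, here is a plain-Python sketch that computes it as the fraction of (positive, negative) pairs in which the positive example gets the higher score. The pairs are made up and area_under_roc is a hypothetical helper; Spark arrives at its value differently, by integrating the curve.

```python
def area_under_roc(score_and_labels):
    """AUC as the probability that a random positive outranks a random negative.

    Ties between a positive and a negative count as half a win.
    """
    positives = [s for s, label in score_and_labels if label == 1]
    negatives = [s for s, label in score_and_labels if label == 0]

    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))

# Hypothetical (score, label) pairs
pairs = [(0.9, 1), (0.8, 1), (0.4, 0), (0.6, 1), (0.7, 0), (0.1, 0)]
auc = area_under_roc(pairs)
print('Area under ROC: {0:.2f}'.format(auc))
```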

#### Selecting the most predictive features

Spark MLlib lets us use a chi-square selector to keep only the most predictive features.

In [None]:
selector = ft.ChiSqSelector(4).fit(births_train)

topFeatures_train = (
        births_train.map(lambda row: row.label) \
        .zip(selector \
             .transform(births_train \
                        .map(lambda row: row.features)))
    ).map(lambda row: reg.LabeledPoint(row[0], row[1]))

topFeatures_test = (
        births_test.map(lambda row: row.label) \
        .zip(selector \
             .transform(births_test \
                        .map(lambda row: row.features)))
    ).map(lambda row: reg.LabeledPoint(row[0], row[1]))

#### Random Forest 

With the selected features, we can build a random forest model.

In [None]:
RF_model = RandomForest \
    .trainClassifier(data=topFeatures_train, 
                     numClasses=2, 
                     categoricalFeaturesInfo={}, 
                     numTrees=6,  
                     featureSubsetStrategy='all',
                     seed=666)

Let's evaluate how the model performed.

In [None]:
# BinaryClassificationMetrics expects (score, label) pairs
RF_results = (
        topFeatures_test.map(lambda row: row.label) \
        .zip(RF_model \
             .predict(topFeatures_test \
                      .map(lambda row: row.features)))
    ).map(lambda row: (row[1], row[0]))

RF_evaluation = ev.BinaryClassificationMetrics(RF_results)

print('Area under PR: {0:.2f}' \
      .format(RF_evaluation.areaUnderPR))
print('Area under ROC: {0:.2f}' \
      .format(RF_evaluation.areaUnderROC))
RF_evaluation.unpersist()

Let's see how logistic regression does with the reduced number of features.

In [None]:
LR_Model_2 = LogisticRegressionWithLBFGS \
    .train(topFeatures_train, iterations=10)

# BinaryClassificationMetrics expects (score, label) pairs
LR_results_2 = (
        topFeatures_test.map(lambda row: row.label) \
        .zip(LR_Model_2 \
             .predict(topFeatures_test \
                      .map(lambda row: row.features)))
    ).map(lambda row: (row[1] * 1.0, row[0]))

LR_evaluation_2 = ev.BinaryClassificationMetrics(LR_results_2)

print('Area under PR: {0:.2f}' \
      .format(LR_evaluation_2.areaUnderPR))
print('Area under ROC: {0:.2f}' \
      .format(LR_evaluation_2.areaUnderROC))
LR_evaluation_2.unpersist()

# Introduction to the ML package

In [None]:
labels = [
    ('INFANT_ALIVE_AT_REPORT', typ.IntegerType()),
    ('BIRTH_PLACE', typ.StringType()),
    ('MOTHER_AGE_YEARS', typ.IntegerType()),
    ('FATHER_COMBINED_AGE', typ.IntegerType()),
    ('CIG_BEFORE', typ.IntegerType()),
    ('CIG_1_TRI', typ.IntegerType()),
    ('CIG_2_TRI', typ.IntegerType()),
    ('CIG_3_TRI', typ.IntegerType()),
    ('MOTHER_HEIGHT_IN', typ.IntegerType()),
    ('MOTHER_PRE_WEIGHT', typ.IntegerType()),
    ('MOTHER_DELIVERY_WEIGHT', typ.IntegerType()),
    ('MOTHER_WEIGHT_GAIN', typ.IntegerType()),
    ('DIABETES_PRE', typ.IntegerType()),
    ('DIABETES_GEST', typ.IntegerType()),
    ('HYP_TENS_PRE', typ.IntegerType()),
    ('HYP_TENS_GEST', typ.IntegerType()),
    ('PREV_BIRTH_PRETERM', typ.IntegerType())
]

schema = typ.StructType([
    typ.StructField(e[0], e[1], False) for e in labels
])

births = spark.read.csv('./data/births_transformed.csv.gz', 
                        header=True, 
                        schema=schema)

Let's create the transformers. First, we cast 'BIRTH_PLACE' to an integer column.

In [None]:
births = births \
    .withColumn('BIRTH_PLACE_INT', 
                births['BIRTH_PLACE'] \
                    .cast(typ.IntegerType()))

Transformer 1: a one-hot encoder for the birth place.

In [None]:
encoder = ft.OneHotEncoder(
    inputCol='BIRTH_PLACE_INT', 
    outputCol='BIRTH_PLACE_VEC')

Let's create a single column that holds all the features.

In [None]:
featuresCreator = ft.VectorAssembler(
    inputCols=[
        col[0] 
        for col 
        in labels[2:]] + \
    [encoder.getOutputCol()], 
    outputCol='features'
)
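
What the encoder and the assembler do to a single row can be sketched in plain Python. Everything here is illustrative: one_hot and assemble are hypothetical helpers, the row is made up, and the sketch assumes four birth-place categories with the last one dropped (Spark's OneHotEncoder drops the last category by default).

```python
def one_hot(index, num_categories, drop_last=True):
    """One-hot encode a category index; optionally drop the last category."""
    size = num_categories - 1 if drop_last else num_categories
    vector = [0.0] * size
    if index < size:
        vector[index] = 1.0
    return vector

def assemble(row, numeric_cols, encoded):
    """Concatenate numeric columns and an encoded vector into one feature list."""
    return [float(row[c]) for c in numeric_cols] + encoded

# Hypothetical row; BIRTH_PLACE_INT assumed to take values 0..3
row = {'MOTHER_AGE_YEARS': 29, 'CIG_1_TRI': 0, 'BIRTH_PLACE_INT': 1}
features = assemble(row,
                    ['MOTHER_AGE_YEARS', 'CIG_1_TRI'],
                    one_hot(row['BIRTH_PLACE_INT'], num_categories=4))
print(features)
```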

#### Creating an estimator

Once again, we'll use logistic regression.

In [None]:
logistic = cl.LogisticRegression(
    maxIter=10, 
    regParam=0.01, 
    labelCol='INFANT_ALIVE_AT_REPORT')

#### Creating a pipeline

Now we chain the stages into a pipeline for our model.

In [None]:
pipeline = Pipeline(stages=[
        encoder, 
        featuresCreator, 
        logistic
    ])
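
The idea behind a pipeline is that each stage is fitted on the output of the previous one, and the fitted stages are then replayed in the same order on new data. The toy classes below (Scale, Threshold, ToyPipeline) are invented for this sketch and work on plain Python lists; they only mimic the fit/transform contract, not Spark's actual classes.

```python
class Scale:
    """Toy estimator: learns the maximum during fit, divides by it in transform."""
    def fit(self, data):
        self.max_ = max(data)
        return self

    def transform(self, data):
        return [x / self.max_ for x in data]

class Threshold:
    """Toy transformer with nothing to learn: flags values above a cutoff."""
    def __init__(self, cutoff):
        self.cutoff = cutoff

    def fit(self, data):
        return self

    def transform(self, data):
        return [1 if x > self.cutoff else 0 for x in data]

class ToyPipeline:
    """Fit each stage on the output of the previous one, then replay in order."""
    def __init__(self, stages):
        self.stages = stages

    def fit(self, data):
        for stage in self.stages:
            data = stage.fit(data).transform(data)
        return self

    def transform(self, data):
        for stage in self.stages:
            data = stage.transform(data)
        return data

toy = ToyPipeline([Scale(), Threshold(0.5)]).fit([2, 4, 8, 10])
flags = toy.transform([1, 6, 9])
print(flags)
```

Note that the scaling learned on the training list (a maximum of 10) is reused unchanged on the new data, which is the behavior we want from a fitted pipeline.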

#### Fitting the model

We'll use the .randomSplit(...) method to split the data into training and test sets.

In [None]:
births_train, births_test = births \
    .randomSplit([0.7, 0.3], seed=666)

Now we run the pipeline.

In [None]:
model = pipeline.fit(births_train)
test_model = model.transform(births_test)

What does the transformed test set look like?

In [None]:
test_model.take(1)

#### Evaluation

How did our model perform?

In [None]:
evaluator = ev.BinaryClassificationEvaluator(
    rawPredictionCol='probability', 
    labelCol='INFANT_ALIVE_AT_REPORT')

print(evaluator.evaluate(test_model, 
     {evaluator.metricName: 'areaUnderROC'}))
print(evaluator.evaluate(test_model, {evaluator.metricName: 'areaUnderPR'}))

#### Saving the model

Spark lets us save the pipeline definition for later use.

In [None]:
pipelinePath = './data/infant_oneHotEncoder_Logistic_Pipeline'
pipeline.write().overwrite().save(pipelinePath)

We can load it later and use it right away with the .fit(...) method.

In [None]:
loadedPipeline = Pipeline.load(pipelinePath)
loadedPipeline \
    .fit(births_train)\
    .transform(births_test)\
    .take(1)

We can also save the fitted model.

In [None]:
modelPath = './data/infant_oneHotEncoder_Logistic_PipelineModel'
model.write().overwrite().save(modelPath)

loadedPipelineModel = PipelineModel.load(modelPath)
test_loadedModel = loadedPipelineModel.transform(births_test)

#### Hyperparameter tuning

Grid search

First, we specify which parameter values to loop over:

In [None]:
logistic = cl.LogisticRegression(
    labelCol='INFANT_ALIVE_AT_REPORT')

grid = tune.ParamGridBuilder() \
    .addGrid(logistic.maxIter,  
             [2, 10, 50]) \
    .addGrid(logistic.regParam, 
             [0.01, 0.05, 0.3]) \
    .build()
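
A grid is just the Cartesian product of the candidate values. As a plain-Python sketch (the param_values dictionary and grid_sketch list are names made up here, not Spark objects), the same nine combinations can be enumerated like this:

```python
from itertools import product

# The same candidate values as in the grid above
param_values = {
    'maxIter': [2, 10, 50],
    'regParam': [0.01, 0.05, 0.3],
}

# One dict per combination, analogous to one param map per model
grid_sketch = [
    dict(zip(param_values, combo))
    for combo in product(*param_values.values())
]

print(len(grid_sketch))
print(grid_sketch[0])
```

Every extra parameter multiplies the number of models to train, so grids grow quickly.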

Next, we need a way to compare the models:

In [None]:
evaluator = ev.BinaryClassificationEvaluator(
    rawPredictionCol='probability', 
    labelCol='INFANT_ALIVE_AT_REPORT')

Then we set up the validation logic:

In [None]:
cv = tune.CrossValidator(
    estimator=logistic, 
    estimatorParamMaps=grid, 
    evaluator=evaluator
)
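
Cross-validation splits the training data into folds and, for each parameter combination, trains on all folds but one and validates on the remaining one. The sketch below shows only the index bookkeeping, in plain Python, with 3 folds (CrossValidator's default numFolds). The k_fold_indices helper is hypothetical, and it assigns rows by striding for readability, whereas Spark assigns them randomly.

```python
def k_fold_indices(n_rows, num_folds=3):
    """Yield (train_indices, validation_indices) for each fold."""
    indices = list(range(n_rows))
    for fold in range(num_folds):
        validation = indices[fold::num_folds]   # every num_folds-th row
        held_out = set(validation)
        train = [i for i in indices if i not in held_out]
        yield train, validation

folds = list(k_fold_indices(9, num_folds=3))
for train, validation in folds:
    print('train:', train, 'validation:', validation)
```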

We also build a transformation-only pipeline:

In [None]:
pipeline = Pipeline(stages=[encoder,featuresCreator])
data_transformer = pipeline.fit(births_train)

Now we can try to find the optimal combination of parameters for our model.

In [None]:
cvModel = cv.fit(data_transformer.transform(births_train))

The cvModel variable holds the best model found, so we can apply it to the test set and compare the results.

In [None]:
data_test = data_transformer \
    .transform(births_test)
results = cvModel.transform(data_test)

print(evaluator.evaluate(results, 
     {evaluator.metricName: 'areaUnderROC'}))
print(evaluator.evaluate(results, 
     {evaluator.metricName: 'areaUnderPR'}))

Which parameters does the best model have?

In [None]:
results = [
    (
        [
            {key.name: paramValue} 
            for key, paramValue 
            in zip(
                params.keys(), 
                params.values())
        ], metric
    ) 
    for params, metric 
    in zip(
        cvModel.getEstimatorParamMaps(), 
        cvModel.avgMetrics
    )
]

sorted(results, 
       key=lambda el: el[1], 
       reverse=True)[0]

#### Train-Validation splitting

We'll use ChiSqSelector to select the 5 best features, which limits the complexity of the model.

In [None]:
selector = ft.ChiSqSelector(
    numTopFeatures=5, 
    featuresCol=featuresCreator.getOutputCol(), 
    outputCol='selectedFeatures',
    labelCol='INFANT_ALIVE_AT_REPORT'
)

logistic = cl.LogisticRegression(
    labelCol='INFANT_ALIVE_AT_REPORT',
    featuresCol='selectedFeatures'
)

pipeline = Pipeline(stages=[encoder,featuresCreator,selector])
data_transformer = pipeline.fit(births_train)

The TrainValidationSplit object is created in the same way as the CrossValidator model.

In [None]:
tvs = tune.TrainValidationSplit(
    estimator=logistic, 
    estimatorParamMaps=grid, 
    evaluator=evaluator
)

Let's fit the data to our model and calculate the results.

In [None]:
tvsModel = tvs.fit(
    data_transformer \
        .transform(births_train)
)

data_test = data_transformer \
    .transform(births_test)
results = tvsModel.transform(data_test)

print(evaluator.evaluate(results, 
     {evaluator.metricName: 'areaUnderROC'}))
print(evaluator.evaluate(results, 
     {evaluator.metricName: 'areaUnderPR'}))

#### Just trying things out

Let's extract features for Natural Language Processing (NLP).

In [None]:
text_data = spark.createDataFrame([
    ['''Machine learning can be applied to a wide variety 
        of data types, such as vectors, text, images, and 
        structured data. This API adopts the DataFrame from 
        Spark SQL in order to support a variety of data types.'''],
    ['''DataFrame supports many basic and structured types; 
        see the Spark SQL datatype reference for a list of 
        supported types. In addition to the types listed in 
        the Spark SQL guide, DataFrame can use ML Vector types.'''],
    ['''A DataFrame can be created either implicitly or 
        explicitly from a regular RDD. See the code examples 
        below and the Spark SQL programming guide for examples.'''],
    ['''Columns in a DataFrame are named. The code examples 
        below use names such as "text," "features," and "label."''']
], ['input'])


First, we tokenize the text.

In [None]:
tokenizer = ft.RegexTokenizer(
    inputCol='input', 
    outputCol='input_arr', 
    pattern=r'\s+|[,."]')  # raw string avoids an invalid '\s' escape

What does the output look like?

In [None]:
tok = tokenizer \
    .transform(text_data) \
    .select('input_arr') 

tok.take(1)
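
To see what this split pattern does without starting Spark, here is a rough plain-Python equivalent. It assumes the tokenizer's default behavior (the pattern marks the gaps between tokens, text is lowercased, empty tokens are dropped); regex_tokenize is a hypothetical helper and Python's re module is not the regex engine Spark uses.

```python
import re

def regex_tokenize(text, pattern=r'\s+|[,."]'):
    """Split on the pattern, lowercase, and drop empty tokens."""
    return [tok for tok in re.split(pattern, text.lower()) if tok]

tokens = regex_tokenize('Machine learning can be applied to "text," images, and data.')
print(tokens)
```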

Let's use StopWordsRemover(...) to drop common stop words.

In [None]:
stopwords = ft.StopWordsRemover(
    inputCol=tokenizer.getOutputCol(), 
    outputCol='input_stop')

And the output?

In [None]:
stopwords.transform(tok).select('input_stop').take(1)

We use the NGram transformer and build a pipeline.

In [None]:
ngram = ft.NGram(n=2, 
    inputCol=stopwords.getOutputCol(), 
    outputCol="nGrams")

pipeline = Pipeline(stages=[tokenizer, stopwords, ngram])

With the pipeline in place, we proceed much as before.

In [None]:
data_ngram = pipeline \
    .fit(text_data) \
    .transform(text_data)
    
data_ngram.select('nGrams').take(1)
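
With n=2, each output element is a pair of consecutive tokens joined by a space. A minimal plain-Python sketch of that sliding window (the ngrams helper and the token list are made up for illustration):

```python
def ngrams(tokens, n=2):
    """Return the list of space-joined n-grams over consecutive tokens."""
    return [' '.join(tokens[i:i + n]) for i in range(len(tokens) - n + 1)]

bigrams = ngrams(['machine', 'learning', 'applied', 'wide', 'variety'])
print(bigrams)
```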

Now we have features we can use for NLP.

Next, let's discretize a continuous variable.

In [None]:
x = np.arange(0, 100)
x = x / 100.0 * np.pi * 4
y = x * np.sin(x / 1.764) + 20.1234

schema = typ.StructType([
    typ.StructField('continuous_var', 
                    typ.DoubleType(), 
                    False
   )
])

data = spark.createDataFrame([[float(e), ] for e in y], schema=schema)

We use QuantileDiscretizer to split our continuous variable into 5 buckets.

In [None]:
discretizer = ft.QuantileDiscretizer(
    numBuckets=5, 
    inputCol='continuous_var', 
    outputCol='discretized')

In [None]:
data_discretized = discretizer.fit(data).transform(data)

data_discretized \
    .groupby('discretized')\
    .mean('continuous_var')\
    .sort('discretized')\
    .collect()
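
Quantile discretization picks cut points so that each bucket receives roughly the same number of rows, then maps every value to the bucket it falls into. The plain-Python sketch below uses exact quantiles on ten made-up values; quantile_splits and bucketize are hypothetical helpers, and Spark computes approximate quantiles instead, so its cut points can differ slightly.

```python
from bisect import bisect_right

def quantile_splits(values, num_buckets):
    """Cut points taken at evenly spaced positions of the sorted values."""
    ordered = sorted(values)
    n = len(ordered)
    return [ordered[n * k // num_buckets] for k in range(1, num_buckets)]

def bucketize(value, splits):
    """Bucket index = number of cut points less than or equal to the value."""
    return bisect_right(splits, value)

values = [float(v) for v in range(1, 11)]     # hypothetical data: 1.0 .. 10.0
splits = quantile_splits(values, num_buckets=5)
buckets = [bucketize(v, splits) for v in values]
print(splits)
print(buckets)
```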

#### Standardizing continuous variables

First, we create a vector representation of our continuous variable.

In [None]:
vectorizer = ft.VectorAssembler(
    inputCols=['continuous_var'], 
    outputCol= 'continuous_vec')

Then we create a scaler and a pipeline.

In [None]:
normalizer = ft.StandardScaler(
    inputCol=vectorizer.getOutputCol(), 
    outputCol='normalized', 
    withMean=True,
    withStd=True
)

pipeline = Pipeline(stages=[vectorizer, normalizer])
data_standardized = pipeline.fit(data).transform(data)
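
With both withMean and withStd switched on, standardization amounts to a z-score: subtract the mean and divide by the standard deviation. A minimal plain-Python sketch on made-up values follows; the standardize helper is hypothetical and assumes the sample standard deviation (dividing by n - 1).

```python
import statistics

def standardize(values):
    """Subtract the mean and divide by the sample standard deviation."""
    mean = statistics.mean(values)
    std = statistics.stdev(values)  # sample std, divides by n - 1
    return [(v - mean) / std for v in values]

z = standardize([2.0, 4.0, 6.0, 8.0])
print([round(v, 3) for v in z])
```

After the transformation the values have mean 0 and unit standard deviation, which keeps features on comparable scales.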

#### Classification

Now we use RandomForestClassifier to model an infant's chances of survival.
First, we cast the label to DoubleType.

In [None]:
births = births.withColumn(
    'INFANT_ALIVE_AT_REPORT', 
    func.col('INFANT_ALIVE_AT_REPORT').cast(typ.DoubleType())
)

births_train, births_test = births \
    .randomSplit([0.7, 0.3], seed=666)

Let's build the model.

In [None]:
classifier = cl.RandomForestClassifier(
    numTrees=5, 
    maxDepth=5, 
    labelCol='INFANT_ALIVE_AT_REPORT')

pipeline = Pipeline(
    stages=[
        encoder,
        featuresCreator, 
        classifier])

model = pipeline.fit(births_train)
test = model.transform(births_test)

Let's see how the RandomForest model compares with the LogisticRegression one.

In [None]:
evaluator = ev.BinaryClassificationEvaluator(
    labelCol='INFANT_ALIVE_AT_REPORT')
print(evaluator.evaluate(test, 
    {evaluator.metricName: "areaUnderROC"}))
print(evaluator.evaluate(test, 
    {evaluator.metricName: "areaUnderPR"}))

Let's see how a single decision tree would do.

In [None]:
classifier = cl.DecisionTreeClassifier(
    maxDepth=5, 
    labelCol='INFANT_ALIVE_AT_REPORT')
pipeline = Pipeline(stages=[
        encoder,
        featuresCreator, 
        classifier]
)

model = pipeline.fit(births_train)
test = model.transform(births_test)

evaluator = ev.BinaryClassificationEvaluator(
    labelCol='INFANT_ALIVE_AT_REPORT')
print(evaluator.evaluate(test, 
     {evaluator.metricName: "areaUnderROC"}))
print(evaluator.evaluate(test, 
     {evaluator.metricName: "areaUnderPR"}))

#### Clustering

Now we'll use k-means to find similarities in the births data.

In [None]:
kmeans = clus.KMeans(k = 5, 
    featuresCol='features')

pipeline = Pipeline(stages=[
        encoder,
        featuresCreator, 
        kmeans]
)

model = pipeline.fit(births_train)

Having estimated the model, let's see whether we can find some differences between the clusters.

In [None]:
test = model.transform(births_test)

test \
    .groupBy('prediction') \
    .agg({
        '*': 'count', 
        'MOTHER_HEIGHT_IN': 'avg'
    }).collect()
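
For a feel of what k-means does under the hood, here is a one-dimensional sketch of Lloyd's algorithm in plain Python: assign each point to its nearest center, then move each center to the mean of its points. The heights, the starting centers and the kmeans_1d helper are all made up; Spark's KMeans works on feature vectors and chooses its own initial centers.

```python
def kmeans_1d(points, centers, iterations=10):
    """Lloyd's algorithm on scalars: assign to nearest center, then re-average."""
    for _ in range(iterations):
        clusters = [[] for _ in centers]
        for p in points:
            nearest = min(range(len(centers)), key=lambda i: abs(p - centers[i]))
            clusters[nearest].append(p)
        # Keep the old center when a cluster ends up empty
        centers = [sum(c) / len(c) if c else centers[i]
                   for i, c in enumerate(clusters)]
    return centers

# Hypothetical heights in inches, with two hand-picked starting centers
heights = [60, 61, 62, 70, 71, 72]
centers = kmeans_1d(heights, centers=[60.0, 72.0])
print(centers)
```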

In NLP, problems such as topic extraction rely on clustering to detect documents with similar topics. First, let's create our dataset.

In [None]:
text_data = spark.createDataFrame([
    ['''To make a computer do anything, you have to write a 
    computer program. To write a computer program, you have 
    to tell the computer, step by step, exactly what you want 
    it to do. The computer then "executes" the program, 
    following each step mechanically, to accomplish the end 
    goal. When you are telling the computer what to do, you 
    also get to choose how it's going to do it. That's where 
    computer algorithms come in. The algorithm is the basic 
    technique used to get the job done. Let's follow an 
    example to help get an understanding of the algorithm 
    concept.'''],
    ['''Laptop computers use batteries to run while not 
    connected to mains. When we overcharge or overheat 
    lithium ion batteries, the materials inside start to 
    break down and produce bubbles of oxygen, carbon dioxide, 
    and other gases. Pressure builds up, and the hot battery 
    swells from a rectangle into a pillow shape. Sometimes 
    the phone involved will operate afterwards. Other times 
    it will die. And occasionally—kapow! To see what's 
    happening inside the battery when it swells, the CLS team 
    used an x-ray technology called computed tomography.'''],
    ['''This technology describes a technique where touch 
    sensors can be placed around any side of a device 
    allowing for new input sources. The patent also notes 
    that physical buttons (such as the volume controls) could 
    be replaced by these embedded touch sensors. In essence 
    Apple could drop the current buttons and move towards 
    touch-enabled areas on the device for the existing UI. It 
    could also open up areas for new UI paradigms, such as 
    using the back of the smartphone for quick scrolling or 
    page turning.'''],
    ['''The National Park Service is a proud protector of 
    America’s lands. Preserving our land not only safeguards 
    the natural environment, but it also protects the 
    stories, cultures, and histories of our ancestors. As we 
    face the increasingly dire consequences of climate 
    change, it is imperative that we continue to expand 
    America’s protected lands under the oversight of the 
    National Park Service. Doing so combats climate change 
    and allows all American’s to visit, explore, and learn 
    from these treasured places for generations to come. It 
    is critical that President Obama acts swiftly to preserve 
    land that is at risk of external threats before the end 
    of his term as it has become blatantly clear that the 
    next administration will not hold the same value for our 
    environment over the next four years.'''],
    ['''The National Park Foundation, the official charitable 
    partner of the National Park Service, enriches America’s 
    national parks and programs through the support of 
    private citizens, park lovers, stewards of nature, 
    history enthusiasts, and wilderness adventurers. 
    Chartered by Congress in 1967, the Foundation grew out of 
    a legacy of park protection that began over a century 
    ago, when ordinary citizens took action to establish and 
    protect our national parks. Today, the National Park 
    Foundation carries on the tradition of early park 
    advocates, big thinkers, doers and dreamers—from John 
    Muir and Ansel Adams to President Theodore Roosevelt.'''],
    ['''Australia has over 500 national parks. Over 28 
    million hectares of land is designated as national 
    parkland, accounting for almost four per cent of 
    Australia's land areas. In addition, a further six per 
    cent of Australia is protected and includes state 
    forests, nature parks and conservation reserves.National 
    parks are usually large areas of land that are protected 
    because they have unspoilt landscapes and a diverse 
    number of native plants and animals. This means that 
    commercial activities such as farming are prohibited and 
    human activity is strictly monitored.''']
], ['documents'])

First, we'll once again use the RegexTokenizer and StopWordsRemover transformers.

In [None]:
tokenizer = ft.RegexTokenizer(
    inputCol='documents', 
    outputCol='input_arr', 
    pattern=r'\s+|[,."]')  # raw string avoids an invalid '\s' escape

stopwords = ft.StopWordsRemover(
    inputCol=tokenizer.getOutputCol(), 
    outputCol='input_stop')

Now we use CountVectorizer to turn each token list into a vector of term counts.

In [None]:
stringIndexer = ft.CountVectorizer(
    inputCol=stopwords.getOutputCol(), 
    outputCol="input_indexed")

tokenized = stopwords \
    .transform(
        tokenizer\
            .transform(text_data)
    )
    
stringIndexer \
    .fit(tokenized)\
    .transform(tokenized)\
    .select('input_indexed')\
    .take(2)
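
The count vectors are easier to read on a tiny corpus. In this plain-Python sketch, fit_vocabulary and count_vector are hypothetical helpers: the vocabulary is ordered by total frequency, ties are broken alphabetically (an assumption of the sketch), and each document becomes a sparse mapping from term index to count.

```python
from collections import Counter

def fit_vocabulary(documents):
    """Order terms by total corpus frequency (ties broken alphabetically)."""
    totals = Counter(term for doc in documents for term in doc)
    return [t for t, _ in sorted(totals.items(), key=lambda kv: (-kv[1], kv[0]))]

def count_vector(doc, vocabulary):
    """Sparse {term_index: count} representation of one tokenized document."""
    index = {term: i for i, term in enumerate(vocabulary)}
    counts = Counter(index[t] for t in doc if t in index)
    return dict(sorted(counts.items()))

docs = [['computer', 'program', 'computer'], ['battery', 'computer']]
vocabulary = fit_vocabulary(docs)
vectors = [count_vector(d, vocabulary) for d in docs]
print(vocabulary)
print(vectors)
```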

Now we use the Latent Dirichlet Allocation model to extract the topics.

In [None]:
clustering = clus.LDA(k=2, optimizer='online', featuresCol=stringIndexer.getOutputCol())

Let's put the pieces together.

In [None]:
pipeline = Pipeline(stages=[
        tokenizer, 
        stopwords,
        stringIndexer, 
        clustering]
)

Let's see whether we have properly uncovered the topics.

In [None]:
topics = pipeline \
    .fit(text_data) \
    .transform(text_data)

topics.select('topicDistribution').collect()

#### Regression

In this section we'll try to predict MOTHER_WEIGHT_GAIN.

In [None]:
features = ['MOTHER_AGE_YEARS','MOTHER_HEIGHT_IN',
            'MOTHER_PRE_WEIGHT','DIABETES_PRE',
            'DIABETES_GEST','HYP_TENS_PRE', 
            'HYP_TENS_GEST', 'PREV_BIRTH_PRETERM',
            'CIG_BEFORE','CIG_1_TRI', 'CIG_2_TRI', 
            'CIG_3_TRI'
           ]

First, we'll assemble the features and use ChiSqSelector to keep only the 6 most important ones.

In [None]:
featuresCreator = ft.VectorAssembler(
    inputCols=[col for col in features[1:]], 
    outputCol='features'
)

selector = ft.ChiSqSelector(
    numTopFeatures=6, 
    outputCol="selectedFeatures", 
    labelCol='MOTHER_WEIGHT_GAIN'
)

To predict the weight gain, we'll use a gradient boosted trees regressor.

In [None]:
regressor = reg.GBTRegressor(
    maxIter=15, 
    maxDepth=3,
    # train on the columns chosen by the selector
    featuresCol='selectedFeatures',
    labelCol='MOTHER_WEIGHT_GAIN')

We create a pipeline and fit it.

In [None]:
pipeline = Pipeline(stages=[
        featuresCreator, 
        selector,
        regressor])

weightGain = pipeline.fit(births_train)

How does our model perform?

In [None]:
evaluator = ev.RegressionEvaluator(
    predictionCol="prediction", 
    labelCol='MOTHER_WEIGHT_GAIN')

print(evaluator.evaluate(
     weightGain.transform(births_test), 
    {evaluator.metricName: 'r2'}))
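
The r2 metric is the coefficient of determination: one minus the ratio of the residual sum of squares to the total sum of squares. A minimal plain-Python sketch with made-up labels and predictions (r_squared is a hypothetical helper):

```python
def r_squared(labels, predictions):
    """Coefficient of determination: 1 - SS_residual / SS_total."""
    mean_label = sum(labels) / len(labels)
    ss_res = sum((y - p) ** 2 for y, p in zip(labels, predictions))
    ss_tot = sum((y - mean_label) ** 2 for y in labels)
    return 1 - ss_res / ss_tot

# Hypothetical weight gains and model predictions
labels = [20.0, 30.0, 40.0, 50.0]
predictions = [25.0, 30.0, 35.0, 50.0]
r2 = r_squared(labels, predictions)
print(round(r2, 2))
```

A value near 1 means the model explains most of the variance in the label; a value near 0 means it does little better than always predicting the mean.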