<center> <h1>Trabajo final para la asignatura APBD</h1> </center>
<center> <h3>Máster en Data Science y Big Data. CFP-Universidad de Sevilla</h3> </center>
<p style="text-align: center;">Autor: Inmaculada Perea Fernández</p>
<p style="text-align: center;">(Julio 2017)</p>


# Tabla de contenidos

1. [Descripción del problema](#descripcion)

2. [Inicio sesión Spark](#inicio)

3. [Lectura de datos y exploración](#exploracion)

4. [Preprocesado](#preprocesado)

    4.1. [Valores perdidos (missings)](#missings)
    
    4.2. [Conversión a tipo numérico](#numerico)
    
    4.3. [Escalado](#escalado)
    
    4.4. [Convertir features en vector](#features)
    
5. [Selección de variables](#seleccion)

6. [Modelos](#modelos)

    6.1. [Ajuste de parámetros](#ajuste)
    
    6.2. [Evaluación](#evaluacion)
       
7. [Cierre sesión spark](#cierre)


<a id='descripcion'></a>

# Descripción del problema

El problema elegido para el trabajo es uno de los datasets de aprendizaje de kaggle, en el que el objetivo es predecir el precio de venta de viviendas a partir de sus características

Para más información sobre el problema ir a https://www.kaggle.com/c/house-prices-advanced-regression-techniques

El objetivo de este trabajo es practicar con las diferentes herramientas que *Spark* nos proporciona para el procesamiento distribuido de grandes cantidades de datos, y para la aplicación de técnicas de machine learning. La obtención de predicciones óptimas es un objetivo secundario aunque se valorará positivamente.

Las APIs de Spark más importantes para este proyecto serán la API de DataFrames y la API de ML para DataFrames. También tenemos a nuestra disposición la API RDDs para procesamiento a bajo nivel, y la API MLlib para RDDs.

# Carga de librerías

In [35]:
import sys

from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql import Row

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Se importa sum como agg_sum para evitar colisión de nombres
from pyspark.sql.functions import lit, col, count, sum as agg_sum
from pyspark.sql.functions import expr, udf, regexp_extract
from pyspark.sql.functions import desc, avg, round as col_round


from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, IndexToString, VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

import pandas as pd
import matplotlib.pyplot as plt

# Establecimiento Semilla

Establecemos la semilla que usaremos para toda la práctica

In [36]:
global_seed = 123456789

<a id="inicio"></a>

# Inicio sesión spark

Creamos la sesión spark como punto de entrada para poder trabajar con la API de DataFrame

In [37]:
# Ruta de la carpeta local Spark.
spark_path = 'C:/Users/inpf/spark-2.1.0-bin-hadoop2.7'

sys.path.append(spark_path + '/python')
sys.path.append(spark_path + '/python/lib/py4j-0.10.4-src.zip')

spark = SparkSession.builder.master("local[*]").appName("evaluacion_APBD_InmaPerea").getOrCreate()

spark

<pyspark.sql.session.SparkSession at 0x44c2d51be0>

Comprobamos el número de cores asignados

In [38]:
spark.sparkContext.defaultParallelism

4

<a id="exploracion"></a>

# 3. Lectura de datos

Para explorar uniremos ambos datasets train y test. Por tanto cargaremos ambos datasets para su unión y posterior exploración.

In [94]:
path = './data/'

train = spark.read.csv(path +'train.csv', header=True, inferSchema=True, nullValue="NA").cache()
test  = spark.read.csv(path +'test.csv', header=True, inferSchema=True, nullValue="NA").cache()

Comprobaremos si ambos datasets tienen las mismas dimensiones y esquema antes de unirlos

In [163]:
print('Numero columnas para cada conjunto de datos ')
print('- train = {}'.format(len(train.schema.fields)))
print('- test  = {}'.format(len(test.schema.fields)))

Numero columnas para cada conjunto de datos 
- train = 81
- test  = 81


Observamos que tienen dimensiones distintas, esto es debido a que en el conjunto *test* no existe la columna correspondiente a la variable respuesta *SalePrice*. Vamos a añadir dicha columna al conjunto *test*.

Consultamos el tipo de dato de la columna *SalePrice* para replicarlo en el conjunto test.

In [153]:
train.schema['SalePrice']

StructField(SalePrice,IntegerType,true)

Creamos la columna en el conjunto *test* con el mismo tipo que en el conjunto *train*

In [152]:
test = test.withColumn('SalePrice', lit(None).cast(IntegerType()))

Ahora comprobaremos los esquemas de ambos dataframes, el orden de las columnas y su tipo

In [158]:
equal=True

for test_field, train_field in zip (test.schema.fields, train.schema.fields): 
    if(train_field != test_field):
        equal=False
        
if equal==True:
    print("Ambos dataframes tienen el mismo esquema de datos") 
else:
    print("Warning: Dataframes diferentes")   

Ambos dataframes tienen el mismo esquema de datos


In [164]:
full = train.union(test)

print("Número de filas de cada dataframe:")
print("- train = {}".format(train.count()))
print("- test  = {}".format(test.count()))
print("- full  = {}".format(full.count()))

Número de filas de cada dataframe:
- train = 1460
- test  = 1459
- full  = 2919


# 4. Exploración de los datos

El esquema del dataframe union de train + test

In [165]:
full.printSchema()

root
 |-- Id: integer (nullable = true)
 |-- MSSubClass: integer (nullable = true)
 |-- MSZoning: string (nullable = true)
 |-- LotFrontage: integer (nullable = true)
 |-- LotArea: integer (nullable = true)
 |-- Street: string (nullable = true)
 |-- Alley: string (nullable = true)
 |-- LotShape: string (nullable = true)
 |-- LandContour: string (nullable = true)
 |-- Utilities: string (nullable = true)
 |-- LotConfig: string (nullable = true)
 |-- LandSlope: string (nullable = true)
 |-- Neighborhood: string (nullable = true)
 |-- Condition1: string (nullable = true)
 |-- Condition2: string (nullable = true)
 |-- BldgType: string (nullable = true)
 |-- HouseStyle: string (nullable = true)
 |-- OverallQual: integer (nullable = true)
 |-- OverallCond: integer (nullable = true)
 |-- YearBuilt: integer (nullable = true)
 |-- YearRemodAdd: integer (nullable = true)
 |-- RoofStyle: string (nullable = true)
 |-- RoofMatl: string (nullable = true)
 |-- Exterior1st: string (nullable = true)
 |-

In [166]:
num_columns=len(full.columns)
print("Número de variables= {}".format(num_columns))

Número de variables= 81


In [167]:
step=7
for index in range(0, num_columns, step):
    full.select(cols[index:index+step]).show(5) 

+---+----------+--------+-----------+-------+------+-----+
| Id|MSSubClass|MSZoning|LotFrontage|LotArea|Street|Alley|
+---+----------+--------+-----------+-------+------+-----+
|  1|        60|      RL|         65|   8450|  Pave| null|
|  2|        20|      RL|         80|   9600|  Pave| null|
|  3|        60|      RL|         68|  11250|  Pave| null|
|  4|        70|      RL|         60|   9550|  Pave| null|
|  5|        60|      RL|         84|  14260|  Pave| null|
+---+----------+--------+-----------+-------+------+-----+
only showing top 5 rows

+--------+-----------+---------+---------+---------+------------+----------+
|LotShape|LandContour|Utilities|LotConfig|LandSlope|Neighborhood|Condition1|
+--------+-----------+---------+---------+---------+------------+----------+
|     Reg|        Lvl|   AllPub|   Inside|      Gtl|     CollgCr|      Norm|
|     Reg|        Lvl|   AllPub|      FR2|      Gtl|     Veenker|     Feedr|
|     IR1|        Lvl|   AllPub|   Inside|      Gtl|     Co

In [168]:
step=5
for index in range(0, num_columns, step):
    full.describe(cols[index:index+step]).show()

+-------+-----------------+------------------+--------+------------------+-----------------+
|summary|               Id|        MSSubClass|MSZoning|       LotFrontage|          LotArea|
+-------+-----------------+------------------+--------+------------------+-----------------+
|  count|             2919|              2919|    2915|              2433|             2919|
|   mean|           1460.0|  57.1377183967112|    null| 69.30579531442663|10168.11408016444|
| stddev|842.7870430897713|42.517627829150314|    null|23.344904706927394|7886.996359105535|
|    min|                1|                20| C (all)|                21|             1300|
|    max|             2919|               190|      RM|               313|           215245|
+-------+-----------------+------------------+--------+------------------+-----------------+

+-------+------+-----+--------+-----------+---------+
|summary|Street|Alley|LotShape|LandContour|Utilities|
+-------+------+-----+--------+-----------+---------+


<a id="missings"></a>

## 4.1 Valores perdidos (missings)

Hay que indicar que los NA son los missings (nullValue="NA")

NA se ha usado tambien para rellenar los valores de las variables que no aplica, por ejemplo casas que no tienen garaje se ha completado con NA los metros cuadrados del garaje

In [190]:
def count_nulls(c):
    return agg_sum(col(c).isNull().cast(IntegerType())).alias(c)

exprs = [count_nulls(c) for c in full.columns]

count_NA = full.agg(*exprs).first().asDict()

sorted([(v, k) for k, v in count_NA.items() if v > 0], reverse=True)

In [218]:
variables_NA = [k for k, v in count_NA.items() if v>0]
print(len(variables_NA))
variables_NA

35


['GarageType',
 'SaleType',
 'SalePrice',
 'GarageFinish',
 'Fence',
 'PoolQC',
 'Electrical',
 'GarageArea',
 'BsmtCond',
 'MasVnrArea',
 'MiscFeature',
 'GarageCars',
 'Exterior1st',
 'GarageQual',
 'TotalBsmtSF',
 'BsmtFinSF1',
 'BsmtHalfBath',
 'Exterior2nd',
 'BsmtFinSF2',
 'FireplaceQu',
 'BsmtUnfSF',
 'LotFrontage',
 'BsmtFinType2',
 'KitchenQual',
 'Functional',
 'BsmtFinType1',
 'BsmtExposure',
 'BsmtFullBath',
 'Alley',
 'MasVnrType',
 'GarageCond',
 'Utilities',
 'BsmtQual',
 'MSZoning',
 'GarageYrBlt']

In [213]:
allowed_NA=['Alley',
            'BsmtQual',
            'BsmtCond',
            'BsmtExposure',
            'BsmtFinType1',
            'BsmtFinType2',
            'FireplaceQu',
            'GarageType',
            'GarageFinish',
            'GarageQual',
            'GarageCond',
            'PoolQC',
            'Fence',
            'MiscFeature']

print('Número de variables con categoría NA permitida = {}'.format(len(allowed_NA))

14

In [217]:
missings = [v for v in variables_NA if v not in allowed_NA]
print('Número de variables con valores perdidos = {} '.format(len(missings))
missings

21


['SaleType',
 'SalePrice',
 'Electrical',
 'GarageArea',
 'MasVnrArea',
 'GarageCars',
 'Exterior1st',
 'TotalBsmtSF',
 'BsmtFinSF1',
 'BsmtHalfBath',
 'Exterior2nd',
 'BsmtFinSF2',
 'BsmtUnfSF',
 'LotFrontage',
 'KitchenQual',
 'Functional',
 'BsmtFullBath',
 'MasVnrType',
 'Utilities',
 'MSZoning',
 'GarageYrBlt']

<a id="preprocesado"></a>

# 5. Preprocesado

## 5.1 Imputación de valores perdidos

<a id="numerico"></a>

## 5.2 Conversión a tipo numérico

<a id="escalado"></a>

## 5.3 Escalado

<a id="features"></a>

## 5.4 Convertir features en vector

<a id="seleccion"></a>

# 6. Selección de variables 

(correlaciones, importancia de variables)

<a id="modelos"></a>

# 7. Modelos

<a id="ajuste"></a>

## 7.1 Ajuste de parámetros

<a id="evaluacion"></a>

## 7.2 Evaluación

La evaluación del modelo final debe hacerse con un dataset diferente al de entrenamiento. 

Se recomienda automatizar procesos mediante el uso de pipelines y creación de funciones para evitar repetir código.

La métrica a usar para la evaluación de soluciones es *RMSLE*. Esta métrica no se encuentra dentro de las que ofrece el objeto *RegressionEvaluator* por lo que debemos definir nuestro propio evaluador.

In [9]:
'''
Código original tomado de:
https://gist.github.com/pvalienteverde

- Se ha añadido la fórmula rmsle para que trabaje con la API DataFrames.
'''

from pyspark.ml.evaluation import Evaluator
from math import sqrt
from operator import add
from pyspark.sql.functions import avg
from pyspark.sql.functions import log1p

class MyEvaluator(Evaluator):
    '''
    When a userID is predicted when it is not already trained (all userID  data is used on validation 
    group and none of them to train), prediction is nan,  so RegressionEvaluator returns Nan.
    To solve this we must change RegressionEvaluator by MiValidacion
    '''
    def __init__(self,predictionCol='prediction', targetCol='rating'):        
        super(MyEvaluator, self).__init__()
        self.predictionCol=predictionCol
        self.targetCol=targetCol
        
    def _evaluate(self, dataset):       
        error=self.rmse(dataset,self.predictionCol,self.targetCol)
        print ("Error: {}".format(error))
        return error
    
    def isLargerBetter(self):
        return False
    
    @staticmethod
    def rmse(dataset, predictionCol, targetCol):
        return sqrt(dataset.select(avg((dataset[targetCol] - dataset[predictionCol]) ** 2)).first()[0])
    
    @staticmethod
    def rmsle(dataset, predictionCol, targetCol):
        return sqrt(dataset.select(avg((log1p(dataset[targetCol]) - log1p(dataset[predictionCol])) ** 2)).first()[0])             

In [10]:
evaluator = MyEvaluator()

<a id="cierre"></a>

# 8. Cierre sesión spark

Liberamos recursos

In [34]:
spark.stop()