# 2. Preprocesado

En este apartado se estudiarán las características con valores perdidos y se discriminarán aquellas cuyos valores permiten "NA's" de las que no. Para los valores perdidos reales, se evaluarán diferentes formas de imputación y se ejecutará la que se considere más conveniente.

Por otra parte, se procederá a la conversión de variables categóricas a dummies y a la transformación logarítmica de aquellas variables contínuas que no sigan una distribución normal y no cumplan la hipótesis de homocedasticidad. Por último se tipificarán las variables para conseguir un conjunto de datos lo más homogéneo posible e idóneo para el funcionamiento de los algoritmos.

## Inicialización de Spark y carga de librerías

In [1]:
import sys
import os

spark_path = "/Applications/spark-2.1.0-bin-hadoop2.7"
sys.path.append(spark_path + "/python")
sys.path.append(spark_path + "/python/lib/py4j-0.10.4-src.zip")

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").appName("Entrega") \
    .getOrCreate()

spark

<pyspark.sql.session.SparkSession at 0x10c93a9e8>

In [19]:
from pyspark.ml.feature import StringIndexer, IndexToString, VectorAssembler, OneHotEncoder, StandardScaler
from pyspark.ml.regression import RandomForestRegressor, LinearRegression, GBTRegressor
from pyspark.sql.functions import col, count, mean, sum as agg_sum
from pyspark.ml.evaluation import RegressionEvaluator as UserEval
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql.functions import lit, log1p, expm1, udf 
from pyspark.sql.types import IntegerType, DoubleType 
from pyspark.mllib.stat import Statistics 
from pyspark.mllib.linalg import Vectors
from pyspark.ml import Pipeline

import time
import pandas as pd
import numpy as np

seed = 2017

## Lectura de datos

Se importarán los datos preprocesados en el notebook anterior, y se aplicará un pipiline con las tranformaciones necesarias crear variables dummies, escalar los datos y pasarlos al formato adecuado para aplicar los modelos de ML.

In [5]:
path = "preprocessed"
datos = spark.read.csv(path, header=True, inferSchema=True, nullValue= 'NA')

## Creación del pipeline de transformaciones

Dado que los modelos no tabajan con variables categóricas, se hace necesaria su transformación a variables Dummies. Se trabajará con las librerías que proporciona el paquete ml de spark, implementándolas en un pipeline que permite concatenar operaciones sobre el conjunto de datos de forma más eficiente que realizando las transformaciones una a una

In [6]:
# División entre variables categóricas y numéricas
categ = [tupla[0] for tupla in datos.dtypes if tupla[1] == "string"]
no_categ = [tupla[0] for tupla in datos.dtypes if tupla[1] != "string"]

no_categ.remove('SalePrice')
no_categ.remove('Id')

# A cada columna categórica se le asocia un StringIndexer
StringIndex={c:StringIndexer(inputCol=c, outputCol=c+'_n') for c in categ}

# Codificación One Hot sobre  la salida StringIndex
encoders={c:OneHotEncoder(inputCol = c + '_n', outputCol = c + '_dummies') for c in categ}

# Union de las columnas en una de tipo vector (formato requerido por los modelos de ml).
assembler = VectorAssembler(inputCols=[c+'_dummies' for c in categ] + no_categ, outputCol="features")

# Escalado de los datos
standard_scaler = StandardScaler(withMean=True, withStd=True, inputCol='features', outputCol='scaledFeatures')


# Se crea un pipeline que permite concatenar los estimadores creados anteriormente
piplist=list(StringIndex.values())+list(encoders.values())+[assembler]+[standard_scaler]
pip=Pipeline(stages=piplist)

# Se entrena el transforamdor
pippre=pip.fit(datos)

In [7]:
# Separación del conjunto en train y test
test = datos.filter(datos.SalePrice.isNull())
train = datos.filter(datos.SalePrice.isNotNull())

print("El tamaño del conjunto de train es : ", train.select('Id').count())
print("El tamaño del conjunto de test es : ", test.select('Id').count())

El tamaño del conjunto de train es :  1460
El tamaño del conjunto de test es :  1459


In [9]:
# Aplicación del pipeline al conjunto de test
train_trans = pippre.transform(train)

#features contiene variables predictoras, label contiene variable objetivo.
train_trans = train_trans.select('features', train_trans.SalePrice.alias('label'),'Id')

train_trans.show(5)

+--------------------+------------------+---+
|            features|             label| Id|
+--------------------+------------------+---+
|(259,[0,4,5,7,10,...| 12.24769911637256|  1|
|(259,[0,4,5,7,10,...|12.109016442313738|  2|
|(259,[0,4,5,8,10,...|12.317171167298682|  3|
|(259,[0,4,5,8,10,...|11.849404844423074|  4|
|(259,[0,4,5,8,10,...|12.429220196836383|  5|
+--------------------+------------------+---+
only showing top 5 rows



In [8]:
# Aplicación del pipeline al conjunto de test
test_trans = pippre.transform(test)

#features contiene variables predictoras, label contiene variable objetivo.
test_trans = test_trans.select('features',test_trans.SalePrice.alias('label'),'Id')

test_trans.show(5)

+--------------------+-----+----+
|            features|label|  Id|
+--------------------+-----+----+
|(259,[3,4,5,7,10,...| null|1461|
|(259,[0,4,5,8,10,...| null|1462|
|(259,[0,4,5,8,10,...| null|1463|
|(259,[0,4,5,8,10,...| null|1464|
|(259,[0,4,5,8,11,...| null|1465|
+--------------------+-----+----+
only showing top 5 rows



## Separación de los datos de train_trans en train y validation

Como no se tiene información acerca de SalePrice en los datos de test, hay que hacer una división del conjunto de entrenamiento train_trans en validación y train. Esto nos permitirá ajustar los modelos y obtener una estimación del error cometido por éstos.

In [11]:
train, validation = train_trans.randomSplit([0.75, 0.25], seed=seed)

## Aplicación de modelos básicos

### Random Forest

In [20]:
# Instanciación del modelo
rf_1 = RandomForestRegressor(predictionCol='p_rf_1', seed=seed)

# Definición del grid
paramGrid = ParamGridBuilder() \
    .addGrid(rf_1.minInstancesPerNode, [5,10,20,30]) \
    .addGrid(rf_1.maxDepth, [3, 7, 11, 15]) \
    .addGrid(rf_1.numTrees, [10, 50, 100]) \
    .build()
    
# Instanciación del método de crosvalidación
crossval = CrossValidator(estimator=rf_1,
                          estimatorParamMaps=paramGrid,
                          evaluator=UserEval(predictionCol='p_rf_1'),
                          numFolds=5, 
                          seed=seed) 


t0 = time.time()
rf_1_cv_mod = crossval.fit(train)
tt = time.time() - t0
print('Tiempo ajuste de parámetros {} min.'.format(round(tt/60,2)))

# Tabla results
par_map = rf_1_cv_mod.getEstimatorParamMaps()
lpars = [{par.name: value for par, value in par_comb.items()} for par_comb in par_map]

pars_df = pd.DataFrame(lpars)
pars_df['score'] = rf_1_cv_mod.avgMetrics
pars_df.sort_values(by='score', ascending=False)

Tiempo ajuste de parámetros 17.93 min.


Unnamed: 0,maxDepth,minInstancesPerNode,numTrees,score
19,3,30,50,0.191417
18,3,20,50,0.191273
16,3,5,50,0.19091
3,3,30,10,0.190732
17,3,10,50,0.190613
35,3,30,100,0.190338
2,3,20,10,0.190188
34,3,20,100,0.190107
33,3,10,100,0.189865
32,3,5,100,0.189795


In [23]:
#EVALUAMOS MEDIANTE TEST.
val = rf_1_cv_mod.bestModel.transform(validation)
val.show(5)
evaluator = UserEval(predictionCol='p_rf_1')
modelos_eval = pd.DataFrame([('rf1',evaluator.evaluate(val))],columns=['Modelo','Score'])
modelos_eval

+--------------------+------------------+----+------------------+
|            features|             label|  Id|            p_rf_1|
+--------------------+------------------+----+------------------+
|(259,[0,4,5,7,10,...|11.863589378812122| 420|11.866650660403034|
|(259,[0,4,5,7,10,...|11.674202123168433| 932|11.717866417047006|
|(259,[0,4,5,7,10,...|  11.6617850286409|1201|11.630776128548042|
|(259,[0,4,5,7,10,...|11.884495917930655| 255|11.845339117544441|
|(259,[0,4,5,7,10,...|11.849404844423074| 675| 11.91621676772221|
+--------------------+------------------+----+------------------+
only showing top 5 rows



Unnamed: 0,Modelo,Score
0,rf1,0.146155


In [21]:
# Instanciación del modelo
gbt_1 = GBTRegressor(predictionCol='p_gbt_1',featuresCol='features', seed=seed)

# Definición del grid
paramGrid = ParamGridBuilder() \
    .addGrid(gbt_1.minInstancesPerNode , [20,25,30,40]) \
    .addGrid(gbt_1.maxDepth, [2,3,4]) \
    .addGrid(gbt_1.maxIter , [50,100]) \
    .build()
    
    
# Instanciación del método de crosvalidación
crossval = CrossValidator(estimator=gbt_1,
                          estimatorParamMaps=paramGrid,
                          evaluator=UserEval(predictionCol='p_gbt_1'),
                          numFolds=5, 
                          seed=seed) 

t0 = time.time()
gbt_cv_mod = crossval.fit(train)
tt = time.time() - t0
print('Tiempo ajuste de parámetros {} min.'.format(round(tt/60,2)))

# Tabla results
par_map = gbt_cv_mod.getEstimatorParamMaps()
lpars = [{par.name: value for par, value in par_comb.items()} for par_comb in par_map]
pars_df = pd.DataFrame(lpars)
pars_df['score'] = gbt_cv_mod.avgMetrics
pars_df.sort_values(by='score', ascending=False)

Tiempo ajuste de parámetros 45.7 min.


Unnamed: 0,maxDepth,maxIter,minInstancesPerNode,score
9,4,50,25,0.161667
8,4,50,20,0.161657
10,4,50,30,0.161155
11,4,50,40,0.159412
7,3,50,40,0.157668
2,2,50,30,0.157548
1,2,50,25,0.156495
5,3,50,25,0.156395
0,2,50,20,0.156332
21,4,100,25,0.156114


In [24]:
val=gbt_cv_mod.bestModel.transform(validation)
val.show(5)
evaluator=UserEval(predictionCol='p_gbt_1')
modelos_eval=pd.concat([modelos_eval,pd.DataFrame([('gbt',evaluator.evaluate(val))],columns=['Modelo','Score'])])
modelos_eval

+--------------------+------------------+----+------------------+
|            features|             label|  Id|           p_gbt_1|
+--------------------+------------------+----+------------------+
|(259,[0,4,5,7,10,...|11.863589378812122| 420|11.855857957423007|
|(259,[0,4,5,7,10,...|11.674202123168433| 932|11.761172816272868|
|(259,[0,4,5,7,10,...|  11.6617850286409|1201|11.565947976023734|
|(259,[0,4,5,7,10,...|11.884495917930655| 255|11.847304109845078|
|(259,[0,4,5,7,10,...|11.849404844423074| 675|11.880424866809443|
+--------------------+------------------+----+------------------+
only showing top 5 rows



Unnamed: 0,Modelo,Score
0,rf1,0.146155
0,gbt,0.150746
