Entrega APBD: Spark

> Master en Data Science y Big Data

> Alumno: Alvaro Racero Armario






# Inicializamos el entorno virtual

Instalacion java y spark

In [1]:
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz
!tar xf spark-3.5.1-bin-hadoop3.tgz
!pip install -q findspark

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.1-bin-hadoop3"

Instalamos drive para acceder a los archivos y guardar el resultado

In [3]:
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


Importamos e iniciamos spark

In [4]:
import findspark
findspark.init()

In [5]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").appName("Entrega_APBD") \
    .getOrCreate()
#Cliente
spark

In [6]:
spark.sparkContext.defaultParallelism
#Numero hilos ejecucion cluster (1 maquina de 2 hilos)

2

# Cargamos los archivos necesarios

Ruta donde tengo los archivos y donde voy a cargar mis resultados

In [38]:
input_path = '/content/drive/MyDrive/Practica_IPDB/data entrega/{}'
#mi output va a ser un csv con el id de la casa y el precio estimado de test
output_path= '/content/drive/MyDrive/Practica_IPDB/data entrega/output'

Cargamos el dataset

In [8]:
train = spark.read.csv(path=input_path.format('train.csv'), header=True, inferSchema=True, nullValue='NA')
# Header=true ya que tienen nombre las columnas, inferschema para que detecte
# los datos numericos, nullvalue="NA" para que entienda los NA como nulos
test = spark.read.csv(path=input_path.format('test.csv'), header=True, inferSchema=True, nullValue='NA')

En el siguiente chunk di un primer vistazo al dataset. Lo he convertido en comentario para que no sea interminable el archivo

In [9]:
#train.printSchema()
#len(train.columns) #Vemos que tenemos 80 variables más la variable objetivo
#train.show(10)

In [10]:
print(f'Observaciones en train: {train.count()}') #Tenemos 1460 observaciones en train
print(f'Observaciones en test: {test.count()}') #Tenemos 1459 observaciones en test


Observaciones en train: 1460
Observaciones en test: 1459


# Preprocesado

En las columnas donde NA no significa nulo vamos a ponerle el valor "otros" para que podamos usar esas variables correctamente

In [11]:
#columnas donde NA no significa Nulo (informacion en la documentacion del dataset):
col_cambiar=['Alley','BsmtQual', 'BsmtCond','BsmtExposure','BsmtFinType1','BsmtFinType2','FireplaceQu','GarageType',
             'GarageFinish','GarageQual','GarageCond','PoolQC','Fence','MiscFeature']

for col_name in col_cambiar:
    train = train.fillna("otros", subset=[col_name])

Ahora que nos hemos librado de los falsos nulos, vamos a ver que nulos quedan en el resto de variables y que hacer con ellos

In [12]:
#Importamos el siguiente paquete para trabajar con consultas SQL en los dataframe
import pyspark.sql.functions as F

#Creamos funcion para mostrar las columnas con al menos un valor nulo y su recuento
def mostrar_nulos(df):
    # Calculamos el recuento de nulos para todas las columnas
    null_counts = df.select([F.sum(F.col(c).isNull().cast("int")).alias(c) for c in df.columns]).first().asDict()
    # Filtramos solo las columnas que tienen al menos un valor nulo
    columns_with_nulls = [c for c, count in null_counts.items() if count > 0]
    # Hacemos la primera operacion pero solo para las columnas con nulos
    exprs = [F.sum(F.col(c).isNull().cast("int")).alias(c) for c in columns_with_nulls]
    return df.agg(*exprs).show()

mostrar_nulos(train)

+-----------+----------+----------+----------+-----------+
|LotFrontage|MasVnrType|MasVnrArea|Electrical|GarageYrBlt|
+-----------+----------+----------+----------+-----------+
|        259|         8|         8|         1|         81|
+-----------+----------+----------+----------+-----------+



In [13]:
#Hagamoslo tambien en test:
for col_name in col_cambiar:
    test = test.fillna("otros", subset=[col_name])

mostrar_nulos(test)

+--------+-----------+---------+-----------+-----------+----------+----------+----------+----------+---------+-----------+------------+------------+-----------+----------+-----------+----------+----------+--------+
|MSZoning|LotFrontage|Utilities|Exterior1st|Exterior2nd|MasVnrType|MasVnrArea|BsmtFinSF1|BsmtFinSF2|BsmtUnfSF|TotalBsmtSF|BsmtFullBath|BsmtHalfBath|KitchenQual|Functional|GarageYrBlt|GarageCars|GarageArea|SaleType|
+--------+-----------+---------+-----------+-----------+----------+----------+----------+----------+---------+-----------+------------+------------+-----------+----------+-----------+----------+----------+--------+
|       4|        227|        2|          1|          1|        16|        15|         1|         1|        1|          1|           2|           2|          1|         2|         78|         1|         1|       1|
+--------+-----------+---------+-----------+-----------+----------+----------+----------+----------+---------+-----------+------------+-----

Vemos que en train y en test hay un exceso e nulos en las variables LotFrontage y GarageYrBlt (más de un 5% de todos los casos) Por tanto vamos a borrar estas variables.

In [14]:
#En test y en train hay un exceso de nulos en las variables LotFrontage y GarageYrBlt
train=train.drop('LotFrontage','GarageYrBlt') #Borrada por exceso de nulos
test=test.drop('LotFrontage','GarageYrBlt') #Borradas por exceso de nulos

Una vez eliminadas estas variables, vamos a eliminar las observaciones que contengan valores nulos:



In [15]:
train = train.na.drop()
train.count() # Se borran 9 observaciones en train

1451

# Técnicas de machine learning

### Vectorizar variables

Es necesario vectorizar las variables para mejorar la eficiencia y para facilitar la manipulación en spark.

Antes de vectorizar los datos es necesario transformar las variables categóricas de texto a indice y seleccionar las variables con las que haremos nuestro vector de "features"


In [16]:
#Listado de variables categoricas que van a ser el input para ser transformadas en indices numericos
col_input=[]
#Listado de las nuevas variables creadas por StringIndexer
col_output = []
#Total de variables que vamos a usar para la regresion: van a ser las variables categoricas transformadas y las numericas
selected_features=[]

#Vemos que columnas son categoricas
for col_name, col_type in train.dtypes:
    if col_type == "string":
        # Añado la columna categorica a la lista y preparo mis columnas seleccionadas
        col_input.append(col_name)
        col_output.append(col_name+"_n")
        selected_features.append(col_name+"_n")
    else:
      #Al ser numericas aqui no hay que hacer nada
      selected_features.append(col_name)

selected_features.remove('Id') #eliminamos de la regresion una columna que solo es de identificacion
selected_features.remove('SalePrice') #Eliminamos la variable objetivo del futuro vector

#train.select(*col_input).show(5) #Columnas categoricas

In [17]:
len(selected_features) #Nos hemos quedado con 77 variables predictoras

77

Ya teniendo los listados podemos empezar a construir nuestros transformer y la pipeline, que nos permite ejecutar todos nuestros transformer de forma automática con una menor cantidad de líneas de código.

In [18]:
#Cargamo librerias necesarias
from pyspark.ml.feature import StringIndexer, IndexToString, VectorAssembler
from pyspark.ml import Pipeline

#Creamos el transformer de texto a indice
cat_indexer_model = StringIndexer(inputCols=col_input,outputCols=col_output)

#Creamos el transformer de las variables seleccionadas a vector
assembler = VectorAssembler(inputCols= selected_features,outputCol='features')

#Creamos el pipeline que tiene dos pasos: hacer la transformacion cat_indexer_model y luego el vector de "features"
pipeline = Pipeline(stages=[cat_indexer_model,assembler])

Hay que tener en cuenta que todavía no se ha ejecutado nada, se puede ver en el tiempo de ejecucion del chunk de arriba. Ahora vamos a ejecutar nuestro pipeline sobre el conjunto de datos limpio "train":

In [19]:
preprocessing_pl = pipeline.fit(train)

train_fin = preprocessing_pl.transform(train)
train_fin.show(3)

+---+----------+--------+-------+------+-----+--------+-----------+---------+---------+---------+------------+----------+----------+--------+----------+-----------+-----------+---------+------------+---------+--------+-----------+-----------+----------+----------+---------+---------+----------+--------+--------+------------+------------+----------+------------+----------+---------+-----------+-------+---------+----------+----------+--------+--------+------------+---------+------------+------------+--------+--------+------------+------------+-----------+------------+----------+----------+-----------+----------+------------+----------+----------+----------+----------+----------+----------+-----------+-------------+---------+-----------+--------+------+-----+-----------+-------+------+------+--------+-------------+---------+----------+--------+-------+----------+-------------+-----------+-----------+-----------+--------------+------------+------------+----------+------------+-----------+-

Para que sea más simple la tabla y facil de leer, vamos a quedarnos solamente con las columnas features y la variable objetivo, la cual comenzaremos a llamar label:

In [20]:
#dataset features y etiquetas
train_fin= train_fin.select(train_fin.features,train_fin.SalePrice.alias('label'))
train_fin.show(3)

+--------------------+------+
|            features| label|
+--------------------+------+
|(77,[0,2,10,14,15...|208500|
|(77,[0,2,8,10,11,...|181500|
|(77,[0,2,5,10,14,...|223500|
+--------------------+------+
only showing top 3 rows



En el siguiente chunk podemos ver la correlacion de diferentes variables con la variable objetivo "SalePrice". Esto se podría usar para hacer una selección de variables más a fondo pero vamos a obviarlo por ahora.

In [22]:
#Correlacion variables
from pyspark.sql.functions import corr
train_corr = preprocessing_pl.transform(train)
correlation = train_corr.select([corr('SalePrice', col).alias(col) for col in selected_features])

# Muestra la correlación
correlation.show()

+--------------------+--------------------+-------------------+--------------------+--------------------+-------------------+-------------------+--------------------+-------------------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+------------------+------------------+------------------+------------------+--------------------+--------------------+------------------+------------------+------------------+--------------------+-------------------+-------------------+-------------------+-----------------+--------------------+------------------+--------------------+--------------------+------------------+------------------+--------------------+--------------------+--------------------+--------------------+------------------+-------------------+--------------------+------------------+-------------------+--------------------+------------------+------------------+-------------

Reduccion de la dimensionalidad mediante el Analisis de las componentes principales

In [23]:
#Intento de Reduccion dimensionalidad mediante analisis componentes principales. Da un RMSE malo y no ha sido posible usarlo
#from pyspark.ml.feature import PCA
#pca = PCA(k=2, inputCol="features", outputCol="pca_features")
#model = pca.fit(train_fin)
#result = model.transform(train_fin)
#result.show(3)

Antes de comenzar la regresión, vamos a dividir el conjunto test en entrenamiento y validación, ya que tenemos que ver el rendimiento de los modelos sin que estos hayan sido usados en el entrenamiento:

In [24]:
#Dividimos el conjunto en 80% entrenamiento 20% validacion
(trainingData, testData) = train_fin.randomSplit([0.8, 0.2])

#Vamos a crear una funcion para evaluar nuestros modelos
from pyspark.ml.evaluation import RegressionEvaluator
def evaluador(val,modelo):
  #Creamos predicciones con el conjunto de datos test
  predictions = modelo.transform(val)
  eval = RegressionEvaluator(
      labelCol="label", predictionCol="prediction", metricName="rmse")
  rmse = eval.evaluate(predictions)
  print("RMSE on test data = %g" % rmse)

  eval = RegressionEvaluator(
      labelCol="label", predictionCol="prediction", metricName="r2")
  r2 = eval.evaluate(predictions)
  print("R2 on test data = %g" % r2)

#Creamos una funcion para obtener los mejores hiperparámetros de los modelos mediante validacion cruzada
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
def crossval(estimador,grid):
  # Evaluador para mejorar rmse
  eval = RegressionEvaluator(labelCol='label', predictionCol='prediction', metricName='rmse')
  crossval = CrossValidator(estimator=estimador,
                            estimatorParamMaps=grid,
                            evaluator=eval,
                            numFolds=4)  # número de divisiones en la validación cruzada
  # Entrenamos el modelo mediante validacion cruzada con el dataset train completo
  cv_model = crossval.fit(trainingData) ################train_fin
  # Encuentra el mejor modelo
  return cv_model

## Regresión lineal

Primero vamos a comenzar con un modelo sencillo, la regresión lineal clásica. Veamos que resultados da:

In [25]:
#Regresion lineal
from pyspark.ml.regression import LinearRegression

#Creamos el modelo
lr = LinearRegression(featuresCol = 'features', labelCol='label')

#Entrenamos el modelo
lr_model = lr.fit(trainingData)

#Usaremos la funcion evaluador ya creada para evaluar nuestro modelo sobre testData
evaluador(testData,lr_model)

RMSE on test data = 39858.3
R2 on test data = 0.797441


El resultado no es nada malo, pero tenemos que compararlo con otros modelos para tener una visión clara

Antes de pasar a otros modelos vamos a ajustar los hiperparámetros de la regresión lineal:

In [26]:
# mejores hiperparametros regresion lineal

param_grid = ParamGridBuilder() \
    .addGrid(lr.maxIter, [10, 20]) \
    .addGrid(lr.regParam, [0.0, 0.01, 0.1]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .build()

best_lr = crossval(lr,param_grid)
#hiperparámetros del mejor modelo
print("MaxIter:", best_lr.bestModel.getMaxIter())
print("RegParam:", best_lr.bestModel.getRegParam())
print("ElasticNetParam:", best_lr.bestModel.getElasticNetParam())

MaxIter: 10
RegParam: 0.01
ElasticNetParam: 0.5


Modelo de regresion lineal ajustado:

In [27]:
#lr = LinearRegression(featuresCol = 'features', labelCol='label', maxIter=10, regParam=0.01, elasticNetParam=0.5)
evaluador(testData,best_lr)

RMSE on test data = 38307.9
R2 on test data = 0.812894


Con el ajuste de hiperparámetros tenemos una ligera mejora del modelo. Veamos que tal con una regresión de random forest:

## Random forest (regresion)

In [28]:
# Random forest
from pyspark.ml.regression import RandomForestRegressor

#Hacemos los mismos pasos que el modelo anterior
rf = RandomForestRegressor(featuresCol="features",labelCol="label")
rf_model= rf.fit(trainingData)

#Usaremos la funcion evaluador ya creada para evaluar nuestro modelo
evaluador(testData,rf_model)

RMSE on test data = 37352.9
R2 on test data = 0.822107


El modelo Random Forest da mejores resultados a priori que el lineal, pero para tomar una decisión final tenemos que ajustar los hiperparámetros:

In [29]:
#Malla de hiperparámetros random forest regression
param_grid = ParamGridBuilder() \
    .addGrid(rf.maxDepth, [2, 5, 10]) \
    .addGrid(rf.maxBins, [25,30,40]) \
    .addGrid(rf.numTrees, [5, 20, 50]) \
    .build()

# Encuentra el mejor modelo
best_rf = crossval(rf,param_grid)

#Con la regresion random forest el modelo

# Muestra los hiperparámetros del mejor modelo
print("MaxDepth:", best_rf.bestModel.getMaxDepth())
print("MaxBins:", best_rf.bestModel.getMaxBins())
print("NumTrees:", best_rf.bestModel.getNumTrees)

MaxDepth: 10
MaxBins: 30
NumTrees: 50


In [30]:
#No necesito crearlo de nuevo porque ya lo tengo creado con la validacion cruzada
#rf = RandomForestRegressor(featuresCol="features",labelCol="label",maxDepth=10,maxBins=30,numTrees=20)
#best_rf= rf.fit(trainingData)
evaluador(testData,best_rf)


RMSE on test data = 32725.5
R2 on test data = 0.863452


En este modelo si vemos una clara mejoría al ajustar los hiperparámetros, siendo superior que el modelo de regresion lineal ajustado. Usaremos este modelo debido que es el que menor rmse nos devuelve.



> **RMS**



# Solucion al problema (predicciones sobre test)

Ya solo nos queda generar unas predicciones del precio de las viviendas del conjunto test para participar en el reto.

Debido a que ya tenemos construida la pipeline para preprocesar los datos, solamente necesitamos trabajar los valores nulos de test y crear un dataframe con el "ID" para reconocer las variables y su vector de "features"

In [31]:
#Hacemos todo el preprocesado a test tambien, creamos el vector características con su ID
test = test.na.drop()

preprocessing_pl = pipeline.fit(test)
test_fin = preprocessing_pl.transform(test)
test_fin= test_fin.select("Id",test_fin.features)
test_fin.show(3)

+----+--------------------+
|  Id|            features|
+----+--------------------+
|1461|(77,[0,1,2,11,15,...|
|1462|(77,[0,2,5,8,15,1...|
|1463|(77,[0,2,5,10,14,...|
+----+--------------------+
only showing top 3 rows



Crearemos las predicciones sobre nuestro mejor modelo y seleccionaremos mediante select el id y el valor de la casa que queremos predecir.

In [42]:
pred = best_rf.transform(test_fin)

pred = pred.select("Id","prediction")
pred.show(3)

+----+------------------+
|  Id|        prediction|
+----+------------------+
|1461|126688.06916058424|
|1462|148430.75902223002|
|1463|190921.94476800333|
+----+------------------+
only showing top 3 rows



Finalmente crearemos un archivo CSV y lo mandaremos al output que está establecido en las primeras lineas. Este archivo es el que habría que mandar a la competición.

In [43]:
#Guardamos los resultados
pred.write.csv(output_path,header=True,mode="overwrite")