# Predicción del precio de viviendas con PySpark
Este notebook utiliza Apache Spark para construir un modelo de regresión que predice el precio de viviendas usando el dataset de Kaggle 'House Prices - Advanced Regression Techniques'.

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import Imputer, StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.regression import LinearRegression, RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
import pyspark.sql.functions as F
from math import sqrt
import os


In [None]:
spark = SparkSession.builder.appName("HousePricePrediction").getOrCreate()

# The Kaggle CSV files encode missing values as the literal string "NA".
# Spark's default nullValue is the empty string, so without this option "NA"
# stays as text: any numeric column containing "NA" is inferred as a string
# column, is never imputed, and train/test schemas can disagree (which breaks
# the Imputer on test data). Categorical "NA" values become null, which the
# StringIndexers (handleInvalid="keep") map to their own extra bucket.
csv_options = {"header": True, "inferSchema": True, "nullValue": "NA"}
train_df = spark.read.csv("data/train.csv", **csv_options)
test_df = spark.read.csv("data/test.csv", **csv_options)

train_df.printSchema()
train_df.select("SalePrice").describe().show()


In [None]:
from pyspark.sql.functions import col, count, when, isnan

# Column types for which NaN is a possible value; isnan is applied only to these.
# On other types (e.g. strings) isnan forces an implicit cast to double, which
# is meaningless there and may raise under ANSI SQL mode.
_FLOAT_TYPES = ("float", "double")


def _missing_count(name, dtype):
    """Return an aggregate expression counting missing values in column *name*.

    A value counts as missing when it is null, or NaN for floating-point columns.
    """
    is_missing = col(name).isNull()
    if dtype in _FLOAT_TYPES:
        is_missing = is_missing | isnan(col(name))
    return count(when(is_missing, name)).alias(name)


missing_values = train_df.select([_missing_count(c, t) for c, t in train_df.dtypes])
missing_values.show()


In [None]:
# Columns that must never be model inputs: the label and the row identifier
# (Id only numbers the rows, so using it as a feature just adds noise).
EXCLUDED_COLS = {"Id", "SalePrice"}
# Spark dtype names of the numeric types the Imputer can consume.
NUMERIC_TYPES = {"tinyint", "smallint", "int", "bigint", "float", "double"}

num_cols = [c for c, t in train_df.dtypes if t in NUMERIC_TYPES and c not in EXCLUDED_COLS]
cat_cols = [c for c, t in train_df.dtypes if t == "string" and c not in EXCLUDED_COLS]


In [None]:
# Numeric columns: fill nulls with the column median into "<col>_imp".
imputed_cols = [f"{c}_imp" for c in num_cols]
imputer = Imputer(inputCols=num_cols, outputCols=imputed_cols, strategy="median")

# Categorical columns: index to "<col>_idx" (unseen/null values go to an extra
# bucket because of handleInvalid="keep"), then one-hot encode to "<col>_ohe".
indexers = []
encoders = []
for c in cat_cols:
    indexers.append(StringIndexer(inputCol=c, outputCol=f"{c}_idx", handleInvalid="keep"))
    encoders.append(OneHotEncoder(inputCol=f"{c}_idx", outputCol=f"{c}_ohe"))

# Everything is concatenated into a single "features" vector for the regressors.
feature_cols = imputed_cols + [f"{c}_ohe" for c in cat_cols]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")


In [None]:
lr = LinearRegression(featuresCol="features", labelCol="SalePrice")

# Preprocessing stages run inside the pipeline, so they are re-fit on each
# CV training fold rather than on the full training set.
pipeline_lr = Pipeline(stages=indexers + encoders + [imputer, assembler, lr])

paramGrid_lr = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.01, 0.1]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5]) \
    .build()

# Shared RMSE evaluator (smaller is better); also used by later cells.
evaluator_rmse = RegressionEvaluator(labelCol="SalePrice", predictionCol="prediction", metricName="rmse")

# An explicit seed makes the fold assignment, and therefore the selected
# hyper-parameters, reproducible between sessions; the default seed is not
# guaranteed to be stable across Python processes.
cv_lr = CrossValidator(estimator=pipeline_lr,
                       estimatorParamMaps=paramGrid_lr,
                       evaluator=evaluator_rmse,
                       numFolds=5,
                       seed=42)

lr_model = cv_lr.fit(train_df)


In [None]:
# Fixed seed: the forest's bootstrap/feature sampling is random, and the
# default seed is not guaranteed to be stable across Python processes.
rf = RandomForestRegressor(featuresCol="features", labelCol="SalePrice", seed=42)

pipeline_rf = Pipeline(stages=indexers + encoders + [imputer, assembler, rf])

paramGrid_rf = ParamGridBuilder() \
    .addGrid(rf.numTrees, [20, 50]) \
    .addGrid(rf.maxDepth, [5, 10]) \
    .build()

# Same seed as the linear-regression CV so both models are compared on the
# same reproducible fold split.
cv_rf = CrossValidator(estimator=pipeline_rf,
                       estimatorParamMaps=paramGrid_rf,
                       evaluator=evaluator_rmse,
                       numFolds=5,
                       seed=42)

rf_model = cv_rf.fit(train_df)


In [None]:
# In-sample fit: the model has already seen these rows, so this RMSE is
# optimistic and must not be read as expected performance on new data.
predictions = rf_model.transform(train_df)
rmse = evaluator_rmse.evaluate(predictions)
print("RMSE (train, in-sample):", rmse)

# Out-of-sample estimate: avgMetrics holds the mean RMSE across CV folds for
# each parameter combination; RMSE is smaller-is-better, so the best
# combination is the minimum. This also compares the two trained models.
cv_rmse_lr = min(lr_model.avgMetrics)
cv_rmse_rf = min(rf_model.avgMetrics)
print("CV RMSE (LinearRegression):", cv_rmse_lr)
print("CV RMSE (RandomForest):", cv_rmse_rf)


In [None]:
from pyspark.ml.evaluation import Evaluator

class RmsleEvaluator(Evaluator):
    """Root Mean Squared Logarithmic Error evaluator for regression output.

    Computes ``sqrt(mean((log1p(target) - log1p(prediction)) ** 2))`` over a
    DataFrame, prints the value and returns it. Smaller is better.

    :param predictionCol: name of the column holding model predictions.
    :param targetCol: name of the column holding the true label.
    """

    def __init__(self, predictionCol='prediction', targetCol='SalePrice'):
        super().__init__()
        self.predictionCol = predictionCol
        self.targetCol = targetCol

    def _evaluate(self, dataset):
        """Compute, print and return the RMSLE of *dataset* (called by ``evaluate``)."""
        error = self.rmsle(dataset, self.predictionCol, self.targetCol)
        print("RMSLE:", error)
        return error

    def isLargerBetter(self):
        """RMSLE is an error metric: lower values are better."""
        return False

    @staticmethod
    def rmsle(dataset, predictionCol, targetCol):
        """Return the RMSLE between *targetCol* and *predictionCol* of *dataset*.

        Negative predictions are clipped to 0 before ``log1p``: Spark's
        ``log1p`` yields null for inputs <= -1 and ``avg`` ignores nulls, so
        unclipped negatives (possible with linear models) would silently be
        dropped from the metric instead of being penalised. Null predictions
        stay null, as before.

        :raises ValueError: if no row yields a non-null error term.
        """
        prediction = dataset[predictionCol]
        clipped_prediction = F.when(prediction < 0, 0.0).otherwise(prediction)
        squared_log_error = (F.log1p(dataset[targetCol]) - F.log1p(clipped_prediction)) ** 2
        mean_squared = dataset.select(F.avg(squared_log_error)).first()[0]
        if mean_squared is None:
            raise ValueError("Cannot compute RMSLE: no rows with a non-null error term.")
        return sqrt(mean_squared)

rmsle_eval = RmsleEvaluator(predictionCol='prediction', targetCol='SalePrice')
# evaluate() delegates to _evaluate(), which also prints the score.
rmsle = rmsle_eval.evaluate(dataset=predictions)


In [None]:
# Score the Kaggle test set with the cross-validated random forest and write
# the submission as an "Id,SalePrice" CSV without the pandas index.
final_predictions = rf_model.transform(test_df)
submission = final_predictions.select("Id", F.col("prediction").alias("SalePrice"))
submission.toPandas().to_csv("data/submission2.csv", index=False)
