# House Price Prediction with PySpark

Here, the hous price prediction task is solved by using the big data framework Apache Spark instead of scikit-learn. The house price data set is rather small, so using spark for this task is not efficient at all. But this model scales much better to bigger data sets, than a model using scikit-learn.

In [1]:
import numpy as np

import os

from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType, IntegerType
from pyspark.sql.functions import log, exp, lit
from pyspark.ml.feature import Imputer, VectorAssembler, StandardScaler
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

import pyspark.sql.functions as F

import pyspark.ml.tuning as tune

In [2]:
spark = SparkSession.builder.getOrCreate()

## Dataset

In [3]:
# get train and test data set
data_loc = './data'

train_data_base = spark.read.csv(os.path.join(data_loc,'train.csv'), inferSchema=True, header=True, nullValue='NA')
test_data_base = spark.read.csv(os.path.join(data_loc,'test.csv'), inferSchema=True, header=True, nullValue='NA')

## Preprocessing

In [4]:
# combine train and test set for the first preprocessing steps
test_data_base = test_data_base.withColumn('SalesPrice', lit(0))
test_data_base = test_data_base.withColumn('TainOrTest', lit('test'))

train_data_base = train_data_base.withColumn('TrainOrTest', lit('train'))

all_data_base = train_data_base.union(test_data_base)

In [5]:
# replace some categorical values with numerical ones

# define replacements
def binmap1(feature):
    _map = {'Grvl':0, 'Pave':1}
    return _map.get(feature, np.nan)

def binmap2(feature):
    _map= {'N':0, 'Y':1}
    return _map.get(feature, np.nan)

# create user defined functions
udf_binmap1 = F.udf(binmap1, IntegerType())
udf_binmap2 = F.udf(binmap2, IntegerType())

# aplly the udf to the whole set
all_data_base = all_data_base.withColumn('Street', udf_binmap1('Street'))
all_data_base = all_data_base.withColumn('CentralAir', udf_binmap2('CentralAir'))

In [6]:
def map1(feature):
    _map = {'Reg':0, 'IR1': 1, 'IR2':2, 'IR3':3}
    return _map.get(feature, np.nan)
    
def map2(feature):
    _map = {'Po':0, 'Fa':1, 'TA':2, 'Gd': 3, 'Ex':4}
    return _map.get(feature, np.nan)
    
def map3(feature):
    _map = {'Gtl':0, 'Mod':1, 'Sev':2}
    return _map.get(feature, np.nan)
    
def map4(feature):
    _map = {'NA':0, 'Po':1, 'Fa':2, 'TA':3, 'Gd': 4, 'Ex':5}
    return _map.get(feature, np.nan)
    
def map5(feature):
    _map = {'NA':0, 'No':1, 'Mn':2, 'Av':3, 'Gd': 4}
    return _map.get(feature, np.nan)
    
def map6(feature):
    _map = {'NA':0, 'Unf':1, 'LwQ':2, 'Rec':3, 'BLQ': 4, 'ALQ':5, 'GLQ':6}
    return _map.get(feature, np.nan)

def map7(feature):
    _map = {'NA':0, 'Unf':1, 'RFn':2, 'Fin':3}
    return _map.get(feature, np.nan)
    
def map8(feature):
    _map = {'N':0, 'P':1, 'Y':2}
    return _map.get(feature, np.nan)

def map9(feature):
    _map = {'NA':0, 'Fa':1, 'TA':2, 'Gd':3, 'Ex':4}
    return _map.get(feature, np.nan)

udf_map1 = F.udf(map1, IntegerType())
udf_map2 = F.udf(map2, IntegerType())
udf_map3 = F.udf(map3, IntegerType())
udf_map4 = F.udf(map4, IntegerType())
udf_map5 = F.udf(map5, IntegerType())
udf_map6 = F.udf(map6, IntegerType())
udf_map7 = F.udf(map7, IntegerType())
udf_map8 = F.udf(map8, IntegerType())
udf_map9 = F.udf(map9, IntegerType())


all_data_base = all_data_base.withColumn('LotShape', udf_map1('LotShape'))
all_data_base = all_data_base.withColumn('HeatingQC', udf_map2('HeatingQC'))
all_data_base = all_data_base.withColumn('KitchenQual', udf_map2('KitchenQual'))
all_data_base = all_data_base.withColumn('LandSlope', udf_map3('LandSlope'))
all_data_base = all_data_base.withColumn('ExterQual', udf_map2('ExterQual'))
all_data_base = all_data_base.withColumn('ExterCond', udf_map2('ExterCond'))
all_data_base = all_data_base.withColumn('BsmtQual', udf_map4('BsmtQual'))
all_data_base = all_data_base.withColumn('BsmtCond', udf_map4('BsmtCond'))
all_data_base = all_data_base.withColumn('BsmtExposure', udf_map5('BsmtExposure'))
all_data_base = all_data_base.withColumn('BsmtFinType1', udf_map6('BsmtFinType1'))
all_data_base = all_data_base.withColumn('BsmtFinType2', udf_map6('BsmtFinType2'))
all_data_base = all_data_base.withColumn('FireplaceQu', udf_map4('FireplaceQu'))
all_data_base = all_data_base.withColumn('GarageFinish', udf_map7('GarageFinish'))
all_data_base = all_data_base.withColumn('GarageQual', udf_map4('GarageQual'))
all_data_base = all_data_base.withColumn('GarageCond', udf_map4('GarageCond'))
all_data_base = all_data_base.withColumn('PavedDrive', udf_map8('PavedDrive'))
all_data_base = all_data_base.withColumn('PoolQC', udf_map9('PoolQC'))

In [7]:
# find all numerical columns
num_features = [col_name for col_name, dtype in all_data_base.dtypes if dtype == "int"]

# remove SalePrice, which is the target, and the Id column from the list of features
num_features.remove("SalePrice")
num_features.remove("Id")

In [8]:
# cast all numerical features to double (necessary for imputation)
for feat in num_features:
    all_data_base = all_data_base.withColumn(feat, all_data_base[feat].cast(DoubleType()))

In [9]:
# split validation data
train, val = all_data_base.where(all_data_base.TrainOrTest == 'train').randomSplit([.7, .3])

# get the test set
test = all_data_base.where(all_data_base.TrainOrTest == 'test')

In [10]:
# log-scale the SalePrice
train = train.withColumn("SalePriceLog", log("SalePrice"))
val = val.withColumn("SalePriceLog", log("SalePrice"))

## ML Pipeline

In [11]:
# names of the features afer imputation
num_features_imp = [feat+"_imp" for feat in num_features]

In [12]:
# set up the ML pipeline

# imputation of missing values
imputer = Imputer(inputCols=num_features, outputCols=num_features_imp)

# assembler to collect all the features
vec_assembler = VectorAssembler(inputCols=num_features_imp, outputCol="features")

# scaling of the fetures
scaler = StandardScaler(inputCol="features", outputCol="features_scaled", withStd=True, withMean=True)

# ridge regression
regression = LinearRegression(featuresCol="features_scaled", labelCol="SalePriceLog", elasticNetParam=0)

# pipeline, combining all the steps
pipe = Pipeline(stages=[imputer, vec_assembler, scaler, regression])

In [13]:
# create a parameter gird for hyperparameter tuning
grid = tune.ParamGridBuilder()
grid = grid.addGrid(regression.regParam, [0.001, 0.01, 0.1, 1, 10, 50, 100, 500, 1000])
grid = grid.addGrid(imputer.strategy, ["mean", "median"])
grid = grid.build()

In [14]:
# define the evaluation criteria
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="SalePriceLog", metricName='rmse')

# create the CrossValidator
cv = tune.CrossValidator(estimator=pipe, estimatorParamMaps=grid, evaluator=evaluator)

# fit cross validation models
cv_models = cv.fit(train)

# extract the best model
bestPipeline = cv_models.bestModel

## Evaluate the Model

In [15]:
# predict and evaluate the validation set
val_prediction = bestPipeline.transform(val)
evaluator.evaluate(val_prediction)

0.14618590836430886

In [16]:
# predict the test set
test_prediction = bestPipeline.transform(test)

In [17]:
# create the submission file 
submission = test_prediction.select("Id", "prediction")
submission = submission.withColumn("SalePrice", exp("prediction"))
submission = submission.drop("prediction")
submission.write.csv(os.path.join(data_loc, "submission_spark"), header=True)

The submission scored 0.14394 on the kaggle public leader board.