# Predicting House Prices using pyspark

📌 In this section, we will predict house prices using pyspark.

## Business Problem

📌 Here we need to predict the selling price of each house. Click on this <a href="https://www.kaggle.com/competitions/house-prices-advanced-regression-techniques/overview/description">link</a> to review the data set and variables.

# Create Session in Spark

In [1]:
!pip install findspark
import findspark
findspark.init("C:\spark")
import pyspark
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark import SparkContext
spark = SparkSession.builder \
    .master("local") \
    .appName("House_Price_Prediction_Regression") \
    .config("spark.sql.shuffle.partitions","2") \
    .getOrCreate()
sc = spark.sparkContext



In [2]:
sc

# Import Necessary Libraries

In [3]:
import pandas as pd
# pandas display settings: show every column and row, use a 500-character wide
# console, and render floats with four decimal places.
pd.options.display.max_columns = None
pd.options.display.max_rows = None
pd.options.display.width = 500
pd.options.display.float_format = lambda value: '%.4f' % value
import pyspark.sql.functions as F
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml.regression import GBTRegressor
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Import Dataset

In [4]:
# Read train.csv with a header row and let Spark infer the column types.
df = spark.read.csv("train.csv", header=True, inferSchema=True)
# Keep the loaded frame cached for the repeated queries below; as the cell's last
# expression, the returned DataFrame is also displayed.
df.persist()

DataFrame[Id: int, MSSubClass: int, MSZoning: string, LotFrontage: string, LotArea: int, Street: string, Alley: string, LotShape: string, LandContour: string, Utilities: string, LotConfig: string, LandSlope: string, Neighborhood: string, Condition1: string, Condition2: string, BldgType: string, HouseStyle: string, OverallQual: int, OverallCond: int, YearBuilt: int, YearRemodAdd: int, RoofStyle: string, RoofMatl: string, Exterior1st: string, Exterior2nd: string, MasVnrType: string, MasVnrArea: string, ExterQual: string, ExterCond: string, Foundation: string, BsmtQual: string, BsmtCond: string, BsmtExposure: string, BsmtFinType1: string, BsmtFinSF1: int, BsmtFinType2: string, BsmtFinSF2: int, BsmtUnfSF: int, TotalBsmtSF: int, Heating: string, HeatingQC: string, CentralAir: string, Electrical: string, 1stFlrSF: int, 2ndFlrSF: int, LowQualFinSF: int, GrLivArea: int, BsmtFullBath: int, BsmtHalfBath: int, FullBath: int, HalfBath: int, BedroomAbvGr: int, KitchenAbvGr: int, KitchenQual: string

In [5]:
df.limit(5).toPandas()

Unnamed: 0,Id,MSSubClass,MSZoning,LotFrontage,LotArea,Street,Alley,LotShape,LandContour,Utilities,LotConfig,LandSlope,Neighborhood,Condition1,Condition2,BldgType,HouseStyle,OverallQual,OverallCond,YearBuilt,YearRemodAdd,RoofStyle,RoofMatl,Exterior1st,Exterior2nd,MasVnrType,MasVnrArea,ExterQual,ExterCond,Foundation,BsmtQual,BsmtCond,BsmtExposure,BsmtFinType1,BsmtFinSF1,BsmtFinType2,BsmtFinSF2,BsmtUnfSF,TotalBsmtSF,Heating,HeatingQC,CentralAir,Electrical,1stFlrSF,2ndFlrSF,LowQualFinSF,GrLivArea,BsmtFullBath,BsmtHalfBath,FullBath,HalfBath,BedroomAbvGr,KitchenAbvGr,KitchenQual,TotRmsAbvGrd,Functional,Fireplaces,FireplaceQu,GarageType,GarageYrBlt,GarageFinish,GarageCars,GarageArea,GarageQual,GarageCond,PavedDrive,WoodDeckSF,OpenPorchSF,EnclosedPorch,3SsnPorch,ScreenPorch,PoolArea,PoolQC,Fence,MiscFeature,MiscVal,MoSold,YrSold,SaleType,SaleCondition,SalePrice
0,1,60,RL,65,8450,Pave,,Reg,Lvl,AllPub,Inside,Gtl,CollgCr,Norm,Norm,1Fam,2Story,7,5,2003,2003,Gable,CompShg,VinylSd,VinylSd,BrkFace,196,Gd,TA,PConc,Gd,TA,No,GLQ,706,Unf,0,150,856,GasA,Ex,Y,SBrkr,856,854,0,1710,1,0,2,1,3,1,Gd,8,Typ,0,,Attchd,2003,RFn,2,548,TA,TA,Y,0,61,0,0,0,0,,,,0,2,2008,WD,Normal,208500
1,2,20,RL,80,9600,Pave,,Reg,Lvl,AllPub,FR2,Gtl,Veenker,Feedr,Norm,1Fam,1Story,6,8,1976,1976,Gable,CompShg,MetalSd,MetalSd,,0,TA,TA,CBlock,Gd,TA,Gd,ALQ,978,Unf,0,284,1262,GasA,Ex,Y,SBrkr,1262,0,0,1262,0,1,2,0,3,1,TA,6,Typ,1,TA,Attchd,1976,RFn,2,460,TA,TA,Y,298,0,0,0,0,0,,,,0,5,2007,WD,Normal,181500
2,3,60,RL,68,11250,Pave,,IR1,Lvl,AllPub,Inside,Gtl,CollgCr,Norm,Norm,1Fam,2Story,7,5,2001,2002,Gable,CompShg,VinylSd,VinylSd,BrkFace,162,Gd,TA,PConc,Gd,TA,Mn,GLQ,486,Unf,0,434,920,GasA,Ex,Y,SBrkr,920,866,0,1786,1,0,2,1,3,1,Gd,6,Typ,1,TA,Attchd,2001,RFn,2,608,TA,TA,Y,0,42,0,0,0,0,,,,0,9,2008,WD,Normal,223500
3,4,70,RL,60,9550,Pave,,IR1,Lvl,AllPub,Corner,Gtl,Crawfor,Norm,Norm,1Fam,2Story,7,5,1915,1970,Gable,CompShg,Wd Sdng,Wd Shng,,0,TA,TA,BrkTil,TA,Gd,No,ALQ,216,Unf,0,540,756,GasA,Gd,Y,SBrkr,961,756,0,1717,1,0,1,0,3,1,Gd,7,Typ,1,Gd,Detchd,1998,Unf,3,642,TA,TA,Y,0,35,272,0,0,0,,,,0,2,2006,WD,Abnorml,140000
4,5,60,RL,84,14260,Pave,,IR1,Lvl,AllPub,FR2,Gtl,NoRidge,Norm,Norm,1Fam,2Story,8,5,2000,2000,Gable,CompShg,VinylSd,VinylSd,BrkFace,350,Gd,TA,PConc,Gd,TA,Av,GLQ,655,Unf,0,490,1145,GasA,Ex,Y,SBrkr,1145,1053,0,2198,1,0,2,1,4,1,Gd,9,Typ,1,TA,Attchd,2000,RFn,3,836,TA,TA,Y,192,84,0,0,0,0,,,,0,12,2008,WD,Normal,250000


In [6]:
df.printSchema()

root
 |-- Id: integer (nullable = true)
 |-- MSSubClass: integer (nullable = true)
 |-- MSZoning: string (nullable = true)
 |-- LotFrontage: string (nullable = true)
 |-- LotArea: integer (nullable = true)
 |-- Street: string (nullable = true)
 |-- Alley: string (nullable = true)
 |-- LotShape: string (nullable = true)
 |-- LandContour: string (nullable = true)
 |-- Utilities: string (nullable = true)
 |-- LotConfig: string (nullable = true)
 |-- LandSlope: string (nullable = true)
 |-- Neighborhood: string (nullable = true)
 |-- Condition1: string (nullable = true)
 |-- Condition2: string (nullable = true)
 |-- BldgType: string (nullable = true)
 |-- HouseStyle: string (nullable = true)
 |-- OverallQual: integer (nullable = true)
 |-- OverallCond: integer (nullable = true)
 |-- YearBuilt: integer (nullable = true)
 |-- YearRemodAdd: integer (nullable = true)
 |-- RoofStyle: string (nullable = true)
 |-- RoofMatl: string (nullable = true)
 |-- Exterior1st: string (nullable = true)
 |--

# Schema Check and Fixes

In [7]:
df.select("LotFrontage").filter("LotFrontage == 'NA'").count()

259

In [8]:
df.select("MasVnrArea").filter("MasVnrArea == 'NA'").count()

8

In [9]:
df.select("GarageYrBlt").filter("GarageYrBlt == 'NA'").count()

81

## Cast mis-inferred dtypes

In [10]:
df.selectExpr("AVG(LotFrontage)").show()
df.selectExpr("AVG(LotFrontage)").collect()[0]["avg(LotFrontage)"]

+-----------------+
| avg(LotFrontage)|
+-----------------+
|70.04995836802665|
+-----------------+



70.04995836802665

In [11]:
df.selectExpr("AVG(MasVnrArea)").show()
df.selectExpr("AVG(MasVnrArea)").collect()[0]["avg(MasVnrArea)"]

+------------------+
|   avg(MasVnrArea)|
+------------------+
|103.68526170798899|
+------------------+



103.68526170798899

In [12]:
df.selectExpr("AVG(GarageYrBlt)").show()
df.selectExpr("AVG(GarageYrBlt)").collect()[0]["avg(GarageYrBlt)"]

+------------------+
|  avg(GarageYrBlt)|
+------------------+
|1978.5061638868744|
+------------------+



1978.5061638868744

In [13]:
# Columns whose schema is string although they hold numbers: their missing
# values are stored as the literal text 'NA'.
na_encoded_numeric_cols = ["LotFrontage", "MasVnrArea", "GarageYrBlt"]

# Compute all three means in a single aggregation job instead of three separate ones.
_avg_row = df.selectExpr(
    *[f"AVG({name}) AS {name}" for name in na_encoded_numeric_cols]
).first()
avg_LotFrontage = _avg_row["LotFrontage"]
avg_MasVnrArea = _avg_row["MasVnrArea"]
avg_GarageYrBlt = _avg_row["GarageYrBlt"]

# NOTE(review): the means are taken over every row of df, i.e. before any
# train/test split, so the imputed values carry information from rows that may
# later be used for evaluation. Consider imputing inside the ML pipeline instead.

# Replace 'NA' with the column mean, then cast to int (the imputed mean loses
# its fractional part in the cast).
df1 = df
for _name, _mean in zip(
    na_encoded_numeric_cols,
    (avg_LotFrontage, avg_MasVnrArea, avg_GarageYrBlt),
):
    df1 = df1.withColumn(
        _name,
        F.when(F.col(_name) == "NA", _mean).otherwise(F.col(_name)).cast("int"),
    )

In [14]:
df.select("LotFrontage","MasVnrArea","GarageYrBlt").printSchema()

root
 |-- LotFrontage: string (nullable = true)
 |-- MasVnrArea: string (nullable = true)
 |-- GarageYrBlt: string (nullable = true)



In [15]:
df1.select("LotFrontage","MasVnrArea","GarageYrBlt").printSchema()

root
 |-- LotFrontage: integer (nullable = true)
 |-- MasVnrArea: integer (nullable = true)
 |-- GarageYrBlt: integer (nullable = true)



# Missing Value Analysis

In [16]:
# isnan() only detects floating-point NaN and is never true for SQL NULL, so on
# its own it reports zero for integer/string columns by construction.
# Count NULLs for every column, and additionally NaN for float/double columns.
_float_cols = {name for name, dtype in df1.dtypes if dtype in ("float", "double")}
df1.select([
    F.count(
        F.when(
            (F.col(c).isNull() | F.isnan(F.col(c))) if c in _float_cols else F.col(c).isNull(),
            c,
        )
    ).alias(c)
    for c in df1.columns
]).toPandas()

Unnamed: 0,Id,MSSubClass,MSZoning,LotFrontage,LotArea,Street,Alley,LotShape,LandContour,Utilities,LotConfig,LandSlope,Neighborhood,Condition1,Condition2,BldgType,HouseStyle,OverallQual,OverallCond,YearBuilt,YearRemodAdd,RoofStyle,RoofMatl,Exterior1st,Exterior2nd,MasVnrType,MasVnrArea,ExterQual,ExterCond,Foundation,BsmtQual,BsmtCond,BsmtExposure,BsmtFinType1,BsmtFinSF1,BsmtFinType2,BsmtFinSF2,BsmtUnfSF,TotalBsmtSF,Heating,HeatingQC,CentralAir,Electrical,1stFlrSF,2ndFlrSF,LowQualFinSF,GrLivArea,BsmtFullBath,BsmtHalfBath,FullBath,HalfBath,BedroomAbvGr,KitchenAbvGr,KitchenQual,TotRmsAbvGrd,Functional,Fireplaces,FireplaceQu,GarageType,GarageYrBlt,GarageFinish,GarageCars,GarageArea,GarageQual,GarageCond,PavedDrive,WoodDeckSF,OpenPorchSF,EnclosedPorch,3SsnPorch,ScreenPorch,PoolArea,PoolQC,Fence,MiscFeature,MiscVal,MoSold,YrSold,SaleType,SaleCondition,SalePrice
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0


In [17]:
def null_count(dataframe, col_name):
    """Return how many rows of ``dataframe`` have a missing value in ``col_name``.

    A value counts as missing when it is SQL NULL, the empty string, or the
    literal text "NA".
    """
    column = F.col(col_name)
    is_missing = column.isNull() | (column == "") | (column == "NA")
    return dataframe.select(col_name).where(is_missing).count()

In [18]:
null_count(df1, "Alley")

1369

In [19]:
def show_all_null(dataframe):
    """Print each column of ``dataframe`` that has missing values.

    For every column where ``null_count`` is positive, one line is printed with
    the column name, the missing count, and the percentage of all rows.
    """
    # The row count does not change inside the loop: compute it once instead of
    # launching a new count job for every reported column.
    total_rows = dataframe.count()
    for col_name in dataframe.columns:
        nc = null_count(dataframe, col_name)
        if nc > 0:
            print("{} ===> {} , Ratio: {:.2f}".format(col_name, nc, (nc / total_rows) * 100))

In [20]:
show_all_null(df1)

Alley ===> 1369 , Ratio: 93.77
MasVnrType ===> 8 , Ratio: 0.55
BsmtQual ===> 37 , Ratio: 2.53
BsmtCond ===> 37 , Ratio: 2.53
BsmtExposure ===> 38 , Ratio: 2.60
BsmtFinType1 ===> 37 , Ratio: 2.53
BsmtFinType2 ===> 38 , Ratio: 2.60
Electrical ===> 1 , Ratio: 0.07
FireplaceQu ===> 690 , Ratio: 47.26
GarageType ===> 81 , Ratio: 5.55
GarageFinish ===> 81 , Ratio: 5.55
GarageQual ===> 81 , Ratio: 5.55
GarageCond ===> 81 , Ratio: 5.55
PoolQC ===> 1453 , Ratio: 99.52
Fence ===> 1179 , Ratio: 80.75
MiscFeature ===> 1406 , Ratio: 96.30


# Analysis of Categorical and Numerical Variables

In [21]:
# Column roles. The two empty lists are populated later from the DataFrame schema.
categorical_cols, numerical_cols = [], []
label_col = ["SalePrice"]
# Columns excluded from modelling.
discarted_cols = [
    # Id has too many categories
    "Id",
    # Alley, FireplaceQu, PoolQC, Fence, MiscFeature have too many nulls
    "Alley", "FireplaceQu", "PoolQC", "Fence", "MiscFeature",
    # Utilities, Condition2, RoofMatl, Heating have weak (rare) categories and
    # are effectively binary, so they should be discarded
    "Utilities", "Condition2", "RoofMatl", "Heating",
]

In [22]:
def grab_cat_num_cols(dataframe, exclude=None):
    """Split the columns of ``dataframe`` into categorical and numerical names.

    A column is categorical when its dtype is "string"; every other dtype is
    treated as numerical.

    Parameters
    ----------
    dataframe : object exposing ``dtypes`` as a sequence of (name, dtype) pairs
    exclude : iterable of str, optional
        Column names to leave out of both lists. Defaults to the module-level
        ``label_col + discarted_cols``.

    Returns
    -------
    (list of str, list of str)
        Newly created ``(categorical, numerical)`` lists in schema order.
    """
    if exclude is None:
        exclude = label_col + discarted_cols
    excluded = set(exclude)

    # Build fresh lists on every call. Appending to the module-level lists made
    # the function non-idempotent: calling it twice listed every column twice.
    cat_names, num_names = [], []
    for name, dtype in dataframe.dtypes:
        if name in excluded:
            continue
        if dtype == "string":
            cat_names.append(name)
        else:
            num_names.append(name)
    return cat_names, num_names

In [23]:
categorical_cols, numerical_cols = grab_cat_num_cols(df1)

# Report the dataset dimensions and how many columns fell into each group.
summary_rows = (
    ("Observations", df1.count()),
    ("Variables", len(df1.columns)),
    ("Cat_cols", len(categorical_cols)),
    ("Num_cols", len(numerical_cols)),
)
for summary_label, summary_value in summary_rows:
    print(f"{summary_label}: {summary_value}")

Observations: 1460
Variables: 81
Cat_cols: 34
Num_cols: 36


In [24]:
# Sanity check: label, discarded, categorical and numerical columns together
# must add up to the number of columns in df1.
accounted_for = sum(
    len(group)
    for group in (label_col, discarted_cols, categorical_cols, numerical_cols)
)
print(
    "column check is True"
    if accounted_for == len(df1.columns)
    else "There is a problem for column check"
)

column check is True


In [25]:
def examine_categories(dataframe, cat_cols):
    """Print each column name in ``cat_cols`` followed by its value counts.

    The counts are shown with the most frequent value first.
    """
    for column_name in cat_cols:
        print(column_name)
        frequencies = dataframe.groupBy(column_name).count()
        frequencies.sort(F.col("count").desc()).show()

In [26]:
examine_categories(df1, categorical_cols)

MSZoning
+--------+-----+
|MSZoning|count|
+--------+-----+
|      RL| 1151|
|      RM|  218|
|      FV|   65|
|      RH|   16|
| C (all)|   10|
+--------+-----+

Street
+------+-----+
|Street|count|
+------+-----+
|  Pave| 1454|
|  Grvl|    6|
+------+-----+

LotShape
+--------+-----+
|LotShape|count|
+--------+-----+
|     Reg|  925|
|     IR1|  484|
|     IR2|   41|
|     IR3|   10|
+--------+-----+

LandContour
+-----------+-----+
|LandContour|count|
+-----------+-----+
|        Lvl| 1311|
|        Bnk|   63|
|        HLS|   50|
|        Low|   36|
+-----------+-----+

LotConfig
+---------+-----+
|LotConfig|count|
+---------+-----+
|   Inside| 1052|
|   Corner|  263|
|  CulDSac|   94|
|      FR2|   47|
|      FR3|    4|
+---------+-----+

LandSlope
+---------+-----+
|LandSlope|count|
+---------+-----+
|      Gtl| 1382|
|      Mod|   65|
|      Sev|   13|
+---------+-----+

Neighborhood
+------------+-----+
|Neighborhood|count|
+------------+-----+
|       NAmes|  225|
|     CollgCr

# Encoding and Scaling

In [27]:
def find_binary_cols(dataframe, cat_cols):
    """Select the string columns of ``cat_cols`` that have exactly two distinct values.

    Returns a DataFrame restricted to those columns (not a list of names);
    callers read the names from its ``columns`` attribute.
    """
    two_valued = []
    for name in cat_cols:
        single_column = dataframe.select(name)
        if single_column.dtypes[0][1] != "string":
            continue
        if single_column.distinct().count() == 2:
            two_valued.append(name)
    return dataframe.select(two_valued)

In [28]:
binary_cols = find_binary_cols(df1, categorical_cols)
print(binary_cols.columns)

['Street', 'CentralAir']


In [29]:
# Containers for the indexer stages and the column names feeding the encoder.
my_dict = {}
string_indexer_objs = []
string_indexer_output_names = []
ohe_input_names = []
ohe_output_names = []

# Names of the two-valued columns; these are indexed but not one-hot encoded.
binary_col_names = binary_cols.columns

for col_name in categorical_cols:
    indexed_name = col_name + "_indexed"
    # handleInvalid="skip": rows with labels the fitted indexer cannot map are dropped.
    indexer = StringIndexer(inputCol=col_name, outputCol=indexed_name, handleInvalid="skip")

    my_dict[col_name + "_index_obj"] = indexer
    string_indexer_objs.append(indexer)
    string_indexer_output_names.append(indexed_name)

    if col_name in binary_col_names:
        continue
    ohe_input_names.append(indexed_name)
    ohe_output_names.append(col_name + "_ohe")

In [30]:
# Indexed columns that are NOT one-hot encoded. Built with an order-preserving
# filter: a set difference yields its elements in arbitrary order that can
# differ between Python sessions, which would make the order of these columns
# in the assembled feature vector non-reproducible.
_ohe_inputs = set(ohe_input_names)
not_to_hot_coded = list(dict.fromkeys(
    name for name in string_indexer_output_names if name not in _ohe_inputs
))
print(not_to_hot_coded)

['CentralAir_indexed', 'Street_indexed']


In [31]:
# One-hot encode the indexed columns selected for encoding.
encoder = OneHotEncoder(inputCols=ohe_input_names, outputCols=ohe_output_names)
# Assemble numerical, index-only and one-hot columns into a single vector;
# handleInvalid="skip" drops rows the assembler cannot process.
assembler = VectorAssembler(
    inputCols=numerical_cols + not_to_hot_coded + ohe_output_names,
    outputCol="unscaled_features",
    handleInvalid="skip",
)
# Scale the assembled vector into the "features" column.
scaler = StandardScaler(inputCol="unscaled_features", outputCol="features")

# Create Model

In [32]:
# Split df1 into train/test parts with weights 0.8/0.2 and a fixed seed,
# then print the row count of each part.
splits = df1.randomSplit([0.8, 0.2], seed=123)
train_df, test_df = splits
print(train_df.count(), test_df.count())

1158 302


In [33]:
# create Estimator
estimator = GBTRegressor(labelCol=label_col[0])

In [34]:
pipeline_obj = Pipeline().setStages(string_indexer_objs + [encoder, assembler, scaler, estimator])

In [35]:
# train model
pipeline_model = pipeline_obj.fit(train_df)

In [36]:
# prediction
transform_df = pipeline_model.transform(test_df)
transform_df.select("SalePrice", "prediction").show(5)

+---------+------------------+
|SalePrice|        prediction|
+---------+------------------+
|   223500| 227670.8416893489|
|   307000| 266581.4885757278|
|   144000|129422.39017124374|
|   279500|215377.43985408114|
|   159000|157782.08509550107|
+---------+------------------+
only showing top 5 rows



In [37]:
# evaluate model
evaluator = RegressionEvaluator(labelCol=label_col[0], metricName="r2")
print("R2: ", evaluator.evaluate(transform_df))

R2:  0.8073009833127116


In [38]:
evaluator = RegressionEvaluator(labelCol=label_col[0], metricName="rmse")
print("RMSE: ", evaluator.evaluate(transform_df))

RMSE:  36023.41533276803


# Model Tuning

In [41]:
# Hyper-parameter grid: 3 depths x 2 bin counts x 2 iteration counts = 12 candidates.
paramGrid = (ParamGridBuilder()
             .addGrid(estimator.maxDepth, [2, 4, 6])
             .addGrid(estimator.maxBins, [20, 30])
             .addGrid(estimator.maxIter, [10, 20])
             .build())
# 3-fold cross-validation over the whole pipeline.
# seed: fixes the random fold assignment so that the selected model and the
#       reported metrics can be reproduced on a re-run.
# metricName: "rmse" is the evaluator's default; stated explicitly so the
#       selection criterion is visible.
cv = CrossValidator(estimator=pipeline_obj,
                    estimatorParamMaps=paramGrid,
                    evaluator=RegressionEvaluator(labelCol=label_col[0], metricName="rmse"),
                    numFolds=3,
                    seed=123)

In [42]:
cv_model = cv.fit(train_df)

In [43]:
# prediction
y_pred = cv_model.transform(test_df)
y_pred.select("SalePrice", "prediction").show(5)

+---------+------------------+
|SalePrice|        prediction|
+---------+------------------+
|   223500|208197.37335363426|
|   307000| 282379.7266599659|
|   144000| 132023.3079400909|
|   279500|202093.32893774222|
|   159000|146652.34796104007|
+---------+------------------+
only showing top 5 rows



In [44]:
# evaluate model tuning
evaluator = RegressionEvaluator(labelCol=label_col[0], metricName="r2")
print("R2: ", evaluator.evaluate(y_pred))

R2:  0.7965168700659936


In [45]:
evaluator = RegressionEvaluator(labelCol=label_col[0], metricName="rmse")
print("RMSE: ", evaluator.evaluate(y_pred))

RMSE:  37017.69226067356
