# Machine learning on house price prediction using the distributed computing system

In [None]:
# how install and integrate pyspark with jupyter:
# https://www.dataquest.io/blog/pyspark-installation-guide/

## Import required libraries

In [1]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors

from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor, DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

from pyspark.ml.feature import VectorAssembler, StringIndexer

import pyspark.sql.functions as F

import pandas as pd

In [2]:
sc = SparkContext(appName = "HousePrice")

In [3]:
sc.version

'3.0.1'

In [4]:
spark = SparkSession.Builder().getOrCreate() 

## Download and preprocessing

In [5]:
# to read in data from hadoop cluster.
df = spark.read.option('header', 'true').option('inferSchema', 'true').csv('hdfs://lena-master:8020/user/zyanm001/Mel_data.csv')

In [6]:
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- Rooms: integer (nullable = true)
 |-- Price: double (nullable = true)
 |-- SellerG: string (nullable = true)
 |-- Distance: double (nullable = true)
 |-- Bedroom2: double (nullable = true)
 |-- Bathroom: double (nullable = true)
 |-- Car: double (nullable = true)
 |-- Landsize: double (nullable = true)
 |-- Lattitude: double (nullable = true)
 |-- Longtitude: double (nullable = true)
 |-- Propertycount: double (nullable = true)
 |-- log_price: double (nullable = true)



In [7]:
#sparkDF.count() is an action that returns the number of rows in a DataFrame and 
# sparkDF.columns returns all columns in a list, python len() function returns the length of the list.

print((df.count(), len(df.columns)))

(24989, 13)


In [8]:
df.limit(1).toPandas()

Unnamed: 0,_c0,Rooms,Price,SellerG,Distance,Bedroom2,Bathroom,Car,Landsize,Lattitude,Longtitude,Propertycount,log_price
0,1,2,1480000.0,Biggin,2.5,2.0,1.0,1.0,202.0,-37.7996,144.9984,4019.0,14.207553


## Creating training and test datasets 

In [9]:
trainData, testData = df.randomSplit([0.7, 0.3], seed=30)

In [10]:
print((trainData.count(), len(trainData.columns)))
print((testData.count(), len(testData.columns)))

(17501, 13)
(7488, 13)


# Vectorisation via VectorAssembler and StringIndexer  

StringIndexer transform string column 'SellerG' to numerical values

In [11]:
stringIndex = StringIndexer(inputCol='SellerG', outputCol='SellerG_Indexed', handleInvalid='skip' )

In [12]:
df1 = stringIndex.fit(trainData).transform(trainData)
df1.show(1)

+---+-----+---------+-------+--------+--------+--------+---+--------+---------+----------+-------------+------------------+---------------+
|_c0|Rooms|    Price|SellerG|Distance|Bedroom2|Bathroom|Car|Landsize|Lattitude|Longtitude|Propertycount|         log_price|SellerG_Indexed|
+---+-----+---------+-------+--------+--------+--------+---+--------+---------+----------+-------------+------------------+---------------+
|  1|    2|1480000.0| Biggin|     2.5|     2.0|     1.0|1.0|   202.0| -37.7996|  144.9984|       4019.0|14.207552645740298|            9.0|
+---+-----+---------+-------+--------+--------+--------+---+--------+---------+----------+-------------+------------------+---------------+
only showing top 1 row



Vectorise features with Vector Assimbler

In [13]:
assembler = VectorAssembler(inputCols=['Rooms', 'Distance', 'Bedroom2', 'Bathroom', 'Car', 'Landsize',\
                                       'Lattitude', 'Longtitude', 'Propertycount', 'SellerG_Indexed'], outputCol='features')

In [14]:
df2 = assembler.transform(df1)
df2.show(1)

+---+-----+---------+-------+--------+--------+--------+---+--------+---------+----------+-------------+------------------+---------------+--------------------+
|_c0|Rooms|    Price|SellerG|Distance|Bedroom2|Bathroom|Car|Landsize|Lattitude|Longtitude|Propertycount|         log_price|SellerG_Indexed|            features|
+---+-----+---------+-------+--------+--------+--------+---+--------+---------+----------+-------------+------------------+---------------+--------------------+
|  1|    2|1480000.0| Biggin|     2.5|     2.0|     1.0|1.0|   202.0| -37.7996|  144.9984|       4019.0|14.207552645740298|            9.0|[2.0,2.5,2.0,1.0,...|
+---+-----+---------+-------+--------+--------+--------+---+--------+---------+----------+-------------+------------------+---------------+--------------------+
only showing top 1 row



## Build pipeline model to predict target variable

### Decide the parameters

In [15]:
from pyspark.sql.functions import countDistinct
df.select(countDistinct('SellerG')).show()

+-----------------------+
|count(DISTINCT SellerG)|
+-----------------------+
|                    337|
+-----------------------+



parameter 'maxBins' should not be smaller than discretization of Categorical features. Hence, we will set 'maxBins' to 337

In [16]:
rf = RandomForestRegressor(labelCol = "log_price", maxBins=337)

### Hyperparameter Tuning for pipeline model

How do we decide the optimum number of trees and maximum tree depth?

In [17]:
# Define the estimator as rf

# Define the evaluator: 
evaluator = RegressionEvaluator(labelCol="log_price", predictionCol="prediction", metricName='rmse')

# Specify hyperparameters : 
paramGrid = (ParamGridBuilder()\
            .addGrid(rf.maxDepth, [2, 4, 6, 10])\
            .addGrid(rf.numTrees, [10, 50, 100])\
            .build())

# Use the CrossValidator to perform cross-validation, evaluating each of the various models
cv = CrossValidator(estimator=rf,\
                   evaluator=evaluator,\
                   estimatorParamMaps=paramGrid,\
                   numFolds=5,\
                   seed=42)

# build the pipeline
cvPipe = Pipeline(stages=[stringIndex, assembler, cv])

# build the pipeline model
cvPipeModel = cvPipe.fit(trainData)

In [18]:
cvModel = cvPipeModel.stages[-1]

In [19]:
list(zip(cvModel.getEstimatorParamMaps(), cvModel.avgMetrics))

[({Param(parent='RandomForestRegressor_bbfcf6f85457', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.'): 2,
   Param(parent='RandomForestRegressor_bbfcf6f85457', name='numTrees', doc='Number of trees to train (>= 1).'): 10},
  0.40559867991986365),
 ({Param(parent='RandomForestRegressor_bbfcf6f85457', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.'): 2,
   Param(parent='RandomForestRegressor_bbfcf6f85457', name='numTrees', doc='Number of trees to train (>= 1).'): 50},
  0.39787033359321045),
 ({Param(parent='RandomForestRegressor_bbfcf6f85457', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.'): 2,
   Param(parent='RandomForestRegressor_bbfcf6f85457', name='numTrees', doc='Number of trees to train (>= 1).'): 100},
  0.39741596100

Results shown when 'maxDepth' was setted to 10 and 'numTrees' was setted to 100, we got the best rmse estimate as 0.265. 


## Model building for prediction and evaluation

In [25]:
rf.setNumTrees(100).setMaxDepth(10)

RandomForestRegressor_bbfcf6f85457

In [26]:
rf_pipe = Pipeline(stages=[stringIndex, assembler, rf])

In [27]:
rf_pipeModel = rf_pipe.fit(trainData)

In [28]:
predictions = rf_pipeModel.transform(testData)

In [29]:
predictions.select('features', 'log_price', 'prediction').show(2)

+--------------------+------------------+------------------+
|            features|         log_price|        prediction|
+--------------------+------------------+------------------+
|[2.0,2.5,2.0,1.0,...|13.849911984681606|13.848156722455006|
|[3.0,2.5,3.0,2.0,...|14.197365800433305|14.087477635086342|
+--------------------+------------------+------------------+
only showing top 2 rows



In [30]:
MSE = evaluator.setMetricName('mse').evaluate(predictions)
print(f"MSE is {MSE}")

MSE is 0.06776965161455895


In [31]:
RMSE = evaluator.setMetricName('rmse').evaluate(predictions)
print(f"RMSE is {RMSE}")

RMSE is 0.2603260486669725


In [32]:
predictions_trainData = rf_pipeModel.transform(trainData)

In [33]:
MSE_trainData = evaluator.setMetricName('mse').evaluate(predictions_trainData)
print(f"MSE is {MSE_trainData}")

MSE is 0.05003718662795921


In [34]:
RMSE_trainData = evaluator.setMetricName('rmse').evaluate(predictions_trainData)
print(f"RMSE is {RMSE_trainData}")

RMSE is 0.22368993412301594


In comparison, the pipeline model performance on training dataset and test dataset are close, which indicates the model correctly learns the pattern  instead of memorising the data(overfitting).

## Model optimising by eliminating insignificant features

### Feature importance 

In [35]:
rfModel = rf_pipeModel.stages[-1]
# print(rfModel.toDebugString)

It is possible to split on the same feature multiple times in a single decision tree but based on different values. Feature 9 appears frequently in single tree splitings, which can also be interpreted as decisions depending frequently on feature 9

In [36]:
featureimp = pd.DataFrame(list(zip(assembler.getInputCols(), rfModel.featureImportances)),\
                          columns=['feature', 'importance'])
featureimp.sort_values(by='importance', ascending=False)

Unnamed: 0,feature,importance
9,SellerG_Indexed,0.310629
0,Rooms,0.226974
1,Distance,0.125013
6,Lattitude,0.093284
7,Longtitude,0.07443
2,Bedroom2,0.069207
5,Landsize,0.046613
3,Bathroom,0.026246
8,Propertycount,0.019504
4,Car,0.008101


The feature importance could be extracted as above. Feature 9 scored highest importance, while the feature 'Car' contributes less than 1% of the prediction. We will remove this feature and evaluate model performance

In [37]:
assembler_new = VectorAssembler(inputCols=['Rooms', 'Distance', 'Bedroom2', 'Bathroom', 'Landsize',\
                                       'Lattitude', 'Longtitude', 'Propertycount', 'SellerG_Indexed'], outputCol='features')

In [38]:
rf_pipe_new = Pipeline(stages=[stringIndex, assembler_new, rf])
rf_pipeModel_new = rf_pipe_new.fit(trainData)
predictions_new = rf_pipeModel_new.transform(testData)

In [44]:
MSE_new = evaluator.setMetricName('mse').evaluate(predictions_new)
print(f"MSE is {MSE_new}")

MSE is 0.06938094178890876


In [45]:
RMSE_new = evaluator.setMetricName('rmse').evaluate(predictions_new)
print(f"RMSE is {RMSE_new}")

RMSE is 0.26340262297271977


In [46]:
predictions_new_trainData = rf_pipeModel_new.transform(trainData)

In [47]:
MSE_new_trainData = evaluator.setMetricName('mse').evaluate(predictions_new_trainData)
print(f"RMSE is {MSE_new_trainData}")

RMSE is 0.051875714571081984


In [48]:
RMSE_new_trainData = evaluator.setMetricName('rmse').evaluate(predictions_new_trainData)
print(f"RMSE is {RMSE_new_trainData}")

RMSE is 0.22776240816052587


Comparing with previous model, the RMSE has increase slightly in both training dataset and testing dataset. That is reasonable as new model has less feature for prediction. However, new model will consuming less computing resource with little decrease in performance by reducing feature numbers.

## Price prediction with decision tree regression model

### Hyperparameter tuning

In [49]:
# Define the estimator,
dt = DecisionTreeRegressor(labelCol='log_price', maxBins=337)

# Define the evaluator: 
dt_eval = RegressionEvaluator(labelCol="log_price", predictionCol="prediction", metricName='rmse')

# Specify hyperparameters : 
dtparamGrid = (ParamGridBuilder()\
            .addGrid(dt.maxDepth, [2, 4, 6, 10,15,20])\
            .build())

# Use the CrossValidator to perform cross-validation, evaluating each of the various models
dtcv = CrossValidator(estimator=dt,\
                   evaluator=dt_eval,\
                   estimatorParamMaps=dtparamGrid,\
                   numFolds=5,\
                   seed=42)

# build the pipeline
dtcvPipe = Pipeline(stages=[stringIndex, assembler_new, dtcv])

# build the pipeline model
dtcvPipeModel = dtcvPipe.fit(trainData)

In [50]:
dtcvModel = dtcvPipeModel.stages[-1]

In [51]:
list(zip(dtcvModel.getEstimatorParamMaps(), dtcvModel.avgMetrics))

[({Param(parent='DecisionTreeRegressor_34b796ac5db4', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.'): 2},
  0.4175162516468495),
 ({Param(parent='DecisionTreeRegressor_34b796ac5db4', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.'): 4},
  0.35579910077477717),
 ({Param(parent='DecisionTreeRegressor_34b796ac5db4', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.'): 6},
  0.3242670208071366),
 ({Param(parent='DecisionTreeRegressor_34b796ac5db4', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.'): 10},
  0.30436179951340964),
 ({Param(parent='DecisionTreeRegressor_34b796ac5db4', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth

When the 'maxDepth' was 10, we got the lowest(best) rsme value. 

In [52]:
dt.setMaxDepth(10)

DecisionTreeRegressor_34b796ac5db4

In [53]:
dtPipe = Pipeline(stages=[stringIndex, assembler_new, dt])

In [54]:
dtPipeModel = dtPipe.fit(trainData)

In [55]:
dtPrediction = dtPipeModel.transform(testData)

In [56]:
MSE_dtPrediction = dt_eval.setMetricName('mse').evaluate(dtPrediction)
print(f"MSE is {MSE_dtPrediction}")

MSE is 0.08580504024492315


In [57]:
RMSE_dtPrediction = dt_eval.setMetricName('rmse').evaluate(dtPrediction)
print(f"RMSE is {RMSE_dtPrediction}")

RMSE is 0.29292497374741394


In [58]:
dtPrediction_trainData = dtPipeModel.transform(trainData)

In [59]:
MSE_dtTrainData = dt_eval.setMetricName('mse').evaluate(dtPrediction_trainData)
print(f"MSE is {MSE_dtTrainData}")

MSE is 0.052858278048429005


In [60]:
RMSE_dtTrainData = dt_eval.setMetricName('rmse').evaluate(dtPrediction_trainData)
print(f"RMSE is {RMSE_dtTrainData}")

RMSE is 0.2299092822145922


the rmse of training dataset is lower than testing dataset. However, the difference is not big. The decision tree model have correctly learned the patterns from the data instead of memorising data.

Overall, random forest regression model performed better than decision tree regression model

In [70]:
spark.stop()