## Pyspark Initiation and Setting

In [1]:
# Import SparkConf class into program
from pyspark import SparkConf

# local[*]: run Spark in local mode with as many working processors as logical cores on your machine
# If we want Spark to run locally with 'k' worker threads, we can specify as "local[k]".
master = "local[*]"
# The `appName` field is a name to be shown on the Spark cluster UI page
app_name = "Assignment 2"
# Setup configuration parameters for Spark
spark_conf = SparkConf().setMaster(master).setAppName(app_name)

### SparkContext and SparkSession

In [2]:
# Import SparkContext and SparkSession classes
from pyspark import SparkContext # Spark
from pyspark.sql import SparkSession # Spark SQL

# Using SparkSession
spark = SparkSession.builder.config(conf=spark_conf).getOrCreate()
sc = spark.sparkContext
sc.setLogLevel('ERROR')

### Importing Dataset

In [3]:
# Optimisation step - include schema definitions first before reading in data
from pyspark.sql.types import *

schema = StructType([
    StructField('ID', IntegerType(), True),
    StructField('Type', StringType(), True),
    StructField('Rooms', IntegerType(), True), 
    StructField('Year', IntegerType(), True), 
    StructField('Suburb', StringType(), True),
    StructField('Address', StringType(), True),
    StructField('Price', DoubleType(), True), 
    StructField('Method', StringType(), True), 
    StructField('SellerG', StringType(), True), 
    StructField('Distance', IntegerType(), True),
    StructField('Landsize', DoubleType(), True), 
    StructField('CouncilArea', StringType(), True), 
    StructField('Regionname', StringType(), True),
    StructField('Month', IntegerType(), True), 
    StructField('Day', IntegerType(), True), 
    StructField('MedianPrice', DoubleType(), True), 
    StructField('AuctionResult', IntegerType(), True),
    StructField('StreetSuffix', StringType(), True), 
    StructField('binnedDistance', IntegerType(), True)
])

file = "Dataset_for_Model_1_Price_Prediction.csv"

df = spark.read.csv(file,  
    header=True,
    schema=schema
)
df.show(1)

+---+----+-----+----+----------+-------------+--------+------+-------+--------+--------+--------------------+--------------------+-----+---+-----------+-------------+------------+--------------+
| ID|Type|Rooms|Year|    Suburb|      Address|   Price|Method|SellerG|Distance|Landsize|         CouncilArea|          Regionname|Month|Day|MedianPrice|AuctionResult|StreetSuffix|binnedDistance|
+---+----+-----+----+----------+-------------+--------+------+-------+--------+--------+--------------------+--------------------+-----+---+-----------+-------------+------------+--------------+
|  0|   t|    3|2018|point cook|23 Tribeca Dr|535000.0|    SA|  Point|      14|    78.0|wyndham city council|western metropolitan|    2| 17|   592000.0|            0|          Dr|             2|
+---+----+-----+----+----------+-------------+--------+------+-------+--------+--------+--------------------+--------------------+-----+---+-----------+-------------+------------+--------------+
only showing top 1 row



In [4]:
# Drop the address column, it will not be useful for learning from
df.drop('Address')

DataFrame[ID: int, Type: string, Rooms: int, Year: int, Suburb: string, Price: double, Method: string, SellerG: string, Distance: int, Landsize: double, CouncilArea: string, Regionname: string, Month: int, Day: int, MedianPrice: double, AuctionResult: int, StreetSuffix: string, binnedDistance: int]

Check the result:

In [5]:
# Print the schema in tree format 
df.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Type: string (nullable = true)
 |-- Rooms: integer (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Suburb: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- Price: double (nullable = true)
 |-- Method: string (nullable = true)
 |-- SellerG: string (nullable = true)
 |-- Distance: integer (nullable = true)
 |-- Landsize: double (nullable = true)
 |-- CouncilArea: string (nullable = true)
 |-- Regionname: string (nullable = true)
 |-- Month: integer (nullable = true)
 |-- Day: integer (nullable = true)
 |-- MedianPrice: double (nullable = true)
 |-- AuctionResult: integer (nullable = true)
 |-- StreetSuffix: string (nullable = true)
 |-- binnedDistance: integer (nullable = true)



### Remove nulls
This dataset has nulls reamining in it because it will be used for two different models, and we believe that different features will be of varying usefulness for each one (basically we did't want to delete data on account of a column that may not even be used in the end).

In [6]:
from pyspark.sql.functions import isnan, when, count, col
nulls = df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).collect()

In [7]:
# Look at the missing values across the columns
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).toPandas().head()

Unnamed: 0,ID,Type,Rooms,Year,Suburb,Address,Price,Method,SellerG,Distance,Landsize,CouncilArea,Regionname,Month,Day,MedianPrice,AuctionResult,StreetSuffix,binnedDistance
0,0,0,0,0,0,0,854,0,0,0,0,0,0,0,0,267,0,0,0


In [8]:
# Drop the nulls
df = df.na.drop()

Check the result:

In [9]:
# Checking for the result after dropping null values
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).toPandas().head()

Unnamed: 0,ID,Type,Rooms,Year,Suburb,Address,Price,Method,SellerG,Distance,Landsize,CouncilArea,Regionname,Month,Day,MedianPrice,AuctionResult,StreetSuffix,binnedDistance
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0


In [10]:
# Importing the required libraries

from pyspark import SparkConf # Spark
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression

#### Features must be defined as numeric or categoric so they can be treated appropriately by the data pipeline.

In [11]:
categoryInputCols = ['Type', "Suburb", "Method", "SellerG", "CouncilArea", "Regionname", "StreetSuffix"]
numericInputCols = ["Rooms", "Year", "Distance", "Landsize", "Month", "Day", "MedianPrice", "AuctionResult"]

# The response var
numericOutputCol = 'Price'

categoryCols = categoryInputCols
numericCols = numericInputCols+[numericOutputCol]

#### Use the string indexer to convert the categoric variables into indicies

In [12]:
# Defining the output columns

outputCols=[f'{x}_index' for x in categoryInputCols]
inputIndexer = StringIndexer(inputCols=categoryCols, outputCols=outputCols).setHandleInvalid("keep")

Check result:

In [13]:
print(categoryCols)
print(outputCols)

# apply indexing to categoric data
df_indexed = inputIndexer.fit(df).transform(df)

# show encoded result
df_indexed.select(outputCols).show(2)

['Type', 'Suburb', 'Method', 'SellerG', 'CouncilArea', 'Regionname', 'StreetSuffix']
['Type_index', 'Suburb_index', 'Method_index', 'SellerG_index', 'CouncilArea_index', 'Regionname_index', 'StreetSuffix_index']
+----------+------------+------------+-------------+-----------------+----------------+------------------+
|Type_index|Suburb_index|Method_index|SellerG_index|CouncilArea_index|Regionname_index|StreetSuffix_index|
+----------+------------+------------+-------------+-----------------+----------------+------------------+
|       2.0|        84.0|         6.0|         96.0|             19.0|             2.0|               4.0|
|       2.0|        84.0|         0.0|         96.0|             19.0|             2.0|              13.0|
+----------+------------+------------+-------------+-----------------+----------------+------------------+
only showing top 2 rows



#### One hot encode the categoric data, to generate binary vectors for each category value in every categoric feature.

In [14]:

outputCols_OHE = [f'{x}_vec' for x in categoryInputCols]

#Define OneHotEncoder with the appropriate columns
encoder = OneHotEncoder(inputCols=inputCols_OHE,
                        outputCols=outputCols_OHE)

Check result:

In [15]:
model = encoder.fit(df_indexed)
# Call fit and transform to get the encoded results
df_encoded = model.transform(df_indexed)
# Display the output columns
df_encoded.select(outputCols_OHE).show(2)

+-------------+----------------+-------------+----------------+---------------+--------------+----------------+
|     Type_vec|      Suburb_vec|   Method_vec|     SellerG_vec|CouncilArea_vec|Regionname_vec|StreetSuffix_vec|
+-------------+----------------+-------------+----------------+---------------+--------------+----------------+
|(3,[2],[1.0])|(324,[84],[1.0])|(9,[6],[1.0])|(363,[96],[1.0])|(33,[19],[1.0])| (8,[2],[1.0])|  (22,[4],[1.0])|
|(3,[2],[1.0])|(324,[84],[1.0])|(9,[0],[1.0])|(363,[96],[1.0])|(33,[19],[1.0])| (8,[2],[1.0])| (22,[13],[1.0])|
+-------------+----------------+-------------+----------------+---------------+--------------+----------------+
only showing top 2 rows



Create the input columns

In [16]:
# inputCols are all the encoded columns from OHE plus numerical columns
inputCols=outputCols_OHE
assemblerInputs = outputCols_OHE + numericInputCols

# Define the assembler with appropriate input and output columns
assembler = VectorAssembler(inputCols = assemblerInputs, outputCol="features")

Check results:

In [17]:
# use the asseembler transform() to get encoded results
df_assembled = assembler.transform(df_encoded)

# Display the output
df_assembled.select('features').show()

+--------------------+
|            features|
+--------------------+
|(770,[2,87,333,43...|
|(770,[2,87,327,43...|
|(770,[2,87,329,39...|
|(770,[0,40,327,36...|
|(770,[0,40,327,33...|
|(770,[0,40,327,33...|
|(770,[0,40,328,33...|
|(770,[0,40,327,34...|
|(770,[0,40,327,33...|
|(770,[0,40,329,33...|
|(770,[0,40,327,33...|
|(770,[0,40,327,33...|
|(770,[0,40,327,33...|
|(770,[0,40,327,33...|
|(770,[0,40,329,33...|
|(770,[0,40,327,33...|
|(770,[0,40,327,33...|
|(770,[0,40,330,33...|
|(770,[0,40,329,33...|
|(770,[0,40,330,33...|
+--------------------+
only showing top 20 rows



### Pipeline

A Pipeline chains multiple Transformers and Estimators together to specify an ML workflow.

In [18]:
# Pipelines and PipelineModels help to ensure that training and test data go through identical feature processing steps.
stages = [inputIndexer,encoder,assembler]

pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(df)
df_pipeline = pipelineModel.transform(df)



### Split the data into training (70%) and testing (30%) datesets

In [19]:
# Divide data into train sets and test sets. 
# Seed is the value used to make the same data three times later
train, test = df_pipeline.randomSplit([0.7, 0.3], seed = 2020)
print("Training Dataset Count: " + str(train.count()))
print("Test Dataset Count: " + str(test.count()))

Training Dataset Count: 22166
Test Dataset Count: 9328


### Decision Tree 

Decision trees are easy to interpret, handle categorical features, extend to the multiclass classification setting, do not require feature scaling, and are able to capture non-linearities and feature interactions.

In [22]:
from pyspark.ml.regression import DecisionTreeRegressor

# Extracts the number of nodes in the decision tree and the tree depth in the model and stores it in dt.
dt = DecisionTreeRegressor(featuresCol = 'features', labelCol = 'Price')
dtModel = dt.fit(train)


In [23]:
dtPredictions = dtModel.transform(test)
dtPredictions.select('features','Price','prediction').show(10)

+--------------------+---------+------------------+
|            features|    Price|        prediction|
+--------------------+---------+------------------+
|(770,[0,40,327,33...|1315000.0| 832024.4106776181|
|(770,[0,40,327,33...| 711000.0| 832024.4106776181|
|(770,[0,40,327,33...|1051000.0|1242608.2911714772|
|(770,[0,40,327,33...| 590069.0|1242608.2911714772|
|(770,[0,40,327,33...| 573000.0| 1029331.357495069|
|(770,[0,40,330,33...| 545000.0| 1029331.357495069|
|(770,[0,40,328,39...|1225000.0| 1029331.357495069|
|(770,[0,40,327,34...| 887500.0| 1029331.357495069|
|(770,[0,3,327,341...| 707500.0| 832024.4106776181|
|(770,[0,3,328,363...| 801000.0| 832024.4106776181|
+--------------------+---------+------------------+
only showing top 10 rows



### Model Evaluation

Now that we have built a model, we need to evaluate how well it performs. In spark.ml there are classification, regression, clustering, and ranking evaluators. Given that this is a regression problem, we will use rootmean- square error (RMSE) and R2 (pronounced “R-squared”) to evaluate our model’s performance.

#### RMSE
Evaluator for Regression, which expects input columns prediction, label and an optional weight column. RMSE is a metric that ranges from zero to infinity. The closer it is to zero, the better.

In [24]:
from pyspark.ml.evaluation import RegressionEvaluator
regressionEvaluator = RegressionEvaluator(
predictionCol="prediction",
labelCol="Price",
metricName="rmse")
rmse = regressionEvaluator.evaluate(dtPredictions)
print(f"RMSE is {rmse:.1f}")

RMSE is 347450.4


#### R2 (R-Squared)

In [25]:
# Calculating the R-squared value
r2 = regressionEvaluator.setMetricName("r2").evaluate(dtPredictions)
print(f"R2 is {r2}")

R2 is 0.7188016401964463


R squared at 0.72 indicates that in our model, approximate 72% of the variability in “Price” can be explained using the model.

### Cross Validation and Hyperparameter Tuning

In [26]:
# Defining the pipeline estimator
pipeline = Pipeline(stages = [inputIndexer,encoder,assembler, dt])

In [27]:
from pyspark.ml.tuning import ParamGridBuilder
# Specifying the hyperparameters and their respective values using the ParamGridBuilder
paramGrid = (ParamGridBuilder()
.addGrid(dt.maxDepth, [2, 4, 6])
.addGrid(dt.maxBins, [10, 20, 40, 100]) 
# maxBins determines the number of bins into which your continuous features are discretized, or split
.build())

In [28]:
# Defining how to evaluate each of the models to determine which one perfroms best
# Using RegressionEvaluator and RMSE as the metric
evaluator = RegressionEvaluator(labelCol="Price",
predictionCol="prediction",
metricName="rmse")

We will perform the k-fold cross-validation using the CrossValidator, which accepts
an estimator, evaluator, and estimatorParamMaps so that it knows which model to
use, how to evaluate the model, and which hyperparameters to set for the model. We
also set the number of folds we want to split the data into (numFolds=3).

In [29]:
# Cross validator
from pyspark.ml.tuning import CrossValidator
cv = CrossValidator(estimator=dt,
evaluator=evaluator,
estimatorParamMaps=paramGrid,
numFolds=3,
parallelism = 4)


In [30]:
# Fitting the cvModel to train data set
cvModel = cv.fit(train)
print(cvModel)

CrossValidatorModel_f343a9de7b27


In [31]:
bestModel = cvModel.bestModel

In [32]:
# Testing the model on test data set
dtPredictions = bestModel.transform(test)
dtPredictions.select('features','Price','prediction').show(10)

+--------------------+---------+------------------+
|            features|    Price|        prediction|
+--------------------+---------+------------------+
|(770,[0,40,327,33...|1315000.0| 838275.8362731795|
|(770,[0,40,327,33...| 711000.0| 838275.8362731795|
|(770,[0,40,327,33...|1051000.0|1289384.8909001958|
|(770,[0,40,327,33...| 590069.0|1289384.8909001958|
|(770,[0,40,327,33...| 573000.0| 893368.1394891945|
|(770,[0,40,330,33...| 545000.0| 893368.1394891945|
|(770,[0,40,328,39...|1225000.0| 893368.1394891945|
|(770,[0,40,327,34...| 887500.0| 893368.1394891945|
|(770,[0,3,327,341...| 707500.0| 838275.8362731795|
|(770,[0,3,328,363...| 801000.0| 838275.8362731795|
+--------------------+---------+------------------+
only showing top 10 rows



In [33]:
# Calculating the R-squared value
r2 = regressionEvaluator.setMetricName("r2").evaluate(dtPredictions)
print(f"R2 is {r2}")

R2 is 0.741979570250014


### Random Forest

Random forests are ensembles of decision trees. Random forests combine many decision trees in order to reduce the risk of overfitting. The spark.ml implementation supports random forests for binary and multiclass classification and for regression, using both continuous and categorical features.

Random forests truly demonstrate the power of distributed machine learning with
Spark, as each tree can be built independently of the other trees

In [34]:
from pyspark.ml.regression import RandomForestRegressor

rf = RandomForestRegressor(labelCol="Price", featuresCol="features")
rfModel = rf.fit(train)

In [35]:
rfPredictions = rfModel.transform(test)
rfPredictions.select('features','Price','prediction').show(2)

+--------------------+---------+-----------------+
|            features|    Price|       prediction|
+--------------------+---------+-----------------+
|(770,[0,40,327,33...|1315000.0|945188.6778585704|
|(770,[0,40,327,33...| 711000.0|921398.0813243687|
+--------------------+---------+-----------------+
only showing top 2 rows



### Model Evaluation

In [36]:
from pyspark.ml.evaluation import RegressionEvaluator
regressionEvaluator = RegressionEvaluator(
predictionCol="prediction",
labelCol="Price",
metricName="rmse")
rmse = regressionEvaluator.evaluate(rfPredictions)
print(f"RMSE is {rmse:.1f}")

RMSE is 341010.3


In [37]:
r2 = regressionEvaluator.setMetricName("r2").evaluate(rfPredictions)
print(f"R2 is {r2}")

R2 is 0.7291293179977745


R squared at 0.73 indicates that in our model, approximate 73% of the variability in “Price” can be explained using the model.

### Cross Validation and Hyperparameter Tuning

In [38]:
#stages = [inputIndexer,encoder,assembler, rf]
pipeline = Pipeline(stages = [inputIndexer,encoder,assembler, rf])

In [39]:
from pyspark.ml.tuning import ParamGridBuilder
paramGrid = (ParamGridBuilder()
.addGrid(rf.maxDepth, [2, 4, 6])
.addGrid(rf.numTrees, [10, 100])
.build())

In [40]:
evaluator = RegressionEvaluator(labelCol="Price",
predictionCol="prediction",
metricName="rmse")

In [41]:
# Cross validator
from pyspark.ml.tuning import CrossValidator
cv = CrossValidator(estimator=rf,
evaluator=evaluator,
estimatorParamMaps=paramGrid,
numFolds=3,
parallelism = 4)

In [42]:
cvModel = cv.fit(train)
print(cvModel)

CrossValidatorModel_b362523087b7


In [43]:
bestModel = cvModel.bestModel

In [44]:
# Testing the model on test data set
rfPredictions = bestModel.transform(test)
rfPredictions.select('features','Price','prediction').show(10)

+--------------------+---------+------------------+
|            features|    Price|        prediction|
+--------------------+---------+------------------+
|(770,[0,40,327,33...|1315000.0| 960263.5144604783|
|(770,[0,40,327,33...| 711000.0| 958035.6614290236|
|(770,[0,40,327,33...|1051000.0|1192403.0944232033|
|(770,[0,40,327,33...| 590069.0|1192403.0944232033|
|(770,[0,40,327,33...| 573000.0|1103173.6755854357|
|(770,[0,40,330,33...| 545000.0|1103173.6755854357|
|(770,[0,40,328,39...|1225000.0|1131312.6823927257|
|(770,[0,40,327,34...| 887500.0|1131312.6823927257|
|(770,[0,3,327,341...| 707500.0| 796535.8046203809|
|(770,[0,3,328,363...| 801000.0| 805351.2148404751|
+--------------------+---------+------------------+
only showing top 10 rows



In [45]:
r2 = regressionEvaluator.setMetricName("r2").evaluate(rfPredictions)
print(f"R2 is {r2}")

R2 is 0.7556648858805033


### Gradient-boosted tree regression

Gradient-Boosted Trees (GBTs) are ensembles of decision trees. GBTs iteratively train decision trees in order to minimize a loss function. The spark.ml implementation supports GBTs for binary classification and for regression, using both continuous and categorical features.

In [46]:
from pyspark.ml.regression import GBTRegressor
# Create an initial model using the train set.
gbt = GBTRegressor(featuresCol = 'features', labelCol = 'Price', maxIter=10)
gbtModel = gbt.fit(train)

In [47]:
gbtPredictions = gbtModel.transform(test)
gbtPredictions.select('features','Price','prediction').show(2)

+--------------------+---------+-----------------+
|            features|    Price|       prediction|
+--------------------+---------+-----------------+
|(770,[0,40,327,33...|1315000.0|818475.2469973344|
|(770,[0,40,327,33...| 711000.0|818475.2469973344|
+--------------------+---------+-----------------+
only showing top 2 rows



### Model Evaluation

In [48]:
from pyspark.ml.evaluation import RegressionEvaluator
regressionEvaluator = RegressionEvaluator(
predictionCol="prediction",
labelCol="Price",
metricName="rmse")
rmse = regressionEvaluator.evaluate(gbtPredictions)
print(f"RMSE is {rmse:.1f}")

RMSE is 316104.4


In [49]:
r2 = regressionEvaluator.setMetricName("r2").evaluate(gbtPredictions)
print(f"R2 is {r2}")

R2 is 0.7672507750229816


R squared at 0.77 indicates that in our model, approximate 77% of the variability in “Price” can be explained using the model.

### Cross Validation and Hyperparameter Tuning

In [50]:
pipeline = Pipeline(stages = [inputIndexer,encoder,assembler, gbt])

In [51]:
from pyspark.ml.tuning import ParamGridBuilder
paramGrid = (ParamGridBuilder()
.addGrid(gbt.maxDepth, [2, 4, 6])
.addGrid(gbt.maxBins, [10, 20, 40, 100])
.build())

In [52]:
evaluator = RegressionEvaluator(labelCol="Price",
predictionCol="prediction",
metricName="rmse")

In [53]:
# Cross validator
from pyspark.ml.tuning import CrossValidator
cv = CrossValidator(estimator=gbt,
evaluator=evaluator,
estimatorParamMaps=paramGrid,
numFolds=3,
parallelism = 4)

In [54]:
cvModel = cv.fit(train)
print(cvModel)

CrossValidatorModel_dfbc81c7e48b


In [55]:
bestModel = cvModel.bestModel

In [None]:
# Testing the model on test data set
gbtPredictions = bestModel.transform(test)
gbtPredictions.select('features','Price','prediction').show(10)

In [57]:
r2 = regressionEvaluator.setMetricName("r2").evaluate(gbtPredictions)
print(f"R2 is {r2}")

R2 is 0.7719249211850375


Cross validation and hyperparameter tuning does make difference in determining which model performs best. the two models such as decision tree, random forest predictive performance was improved noticeably, where as for the gradient-boosted tree model the result was not changed
