# Housing Prices - Spark ML

This notebook is a continuation of the Housing Prices prediction. In this case, we will be looking at how to build similar models from the main scitkit learn example, but in this case using Spark ML.

First, we extract the data:

In [1]:
spark

Intitializing Scala interpreter ...

Spark Web UI available at http://bea7d995f668:4040
SparkContext available as 'sc' (version = 2.4.4, master = local[*], app id = local-1570730255655)
SparkSession available as 'spark'


res0: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@385c9719


In [2]:
val data_key = "housing_data_raw.csv"

val df = spark.read
.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load(s"./$data_key")

data_key: String = housing_data_raw.csv
df: org.apache.spark.sql.DataFrame = [_c0: int, address: string ... 17 more fields]


Next, we will drop the same outliers as before.

In [3]:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

def drop_outliers(data: DataFrame) = {
    val drop = List(1618, 3405,10652, 954, 11136, 5103, 916, 10967, 7383, 1465, 8967, 8300, 4997)
    
    data.filter(not($"_c0".isin(drop:_*)))
}

val housing = drop_outliers(df)

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
drop_outliers: (data: org.apache.spark.sql.DataFrame)org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]
housing: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [_c0: int, address: string ... 17 more fields]


Then we will cleanup the `lastsolddate` column so that it is a numeric value.

In [4]:
val housing_dateint = housing.withColumn("lastsolddateint", unix_timestamp($"lastsolddate","MM/dd/yy"))

housing_dateint: org.apache.spark.sql.DataFrame = [_c0: int, address: string ... 18 more fields]


Then remove data that may not be helpful (at least initially) for building models.

In [5]:
def drop_geog(data: DataFrame, keep: List[String] = List()) = {
    val removeList = List("info","address","z_address","longitude","latitude","neighborhood",
                          "lastsolddate","zipcode","zpid","usecode", "zestimate","zindexvalue")
    .filter(!keep.contains(_))
    
    data.drop(removeList: _*)
}

drop_geog: (data: org.apache.spark.sql.DataFrame, keep: List[String])org.apache.spark.sql.DataFrame


In [6]:
val housing_dropgeo = drop_geog(housing_dateint)

housing_dropgeo: org.apache.spark.sql.DataFrame = [_c0: int, bathrooms: double ... 6 more fields]


Now we can move onto the Spark ML aspects. In order to train a model, the data needs to be formatted in a Spark DataFrame (which is not that much different conceptually from a Pandas DataFrame). But the features also need to be in a single column in the format of a Vector, which is essentially a list of all the features for that row. We can assemble the Vector with the following code, putting all the columns except the label, `lastsoldprice`, into the Vector. At the same time, we will split our data into a training and testing set.

In [7]:
import org.apache.spark.ml.feature.VectorAssembler

def train_test_split(data: DataFrame) = {
    
    val assembler = new VectorAssembler().
       setInputCols(data.drop("lastsoldprice").columns).
       setOutputCol("features")
    
    val Array(train, test) = data.randomSplit(Array(0.8, 0.2), seed = 30)

    (assembler.transform(train), assembler.transform(test))
}

import org.apache.spark.ml.feature.VectorAssembler
train_test_split: (data: org.apache.spark.sql.DataFrame)(org.apache.spark.sql.DataFrame, org.apache.spark.sql.DataFrame)


In [8]:
val (train, test) = train_test_split(housing_dropgeo)

train: org.apache.spark.sql.DataFrame = [_c0: int, bathrooms: double ... 7 more fields]
test: org.apache.spark.sql.DataFrame = [_c0: int, bathrooms: double ... 7 more fields]


Now, we will get to some actual machine learning. First, linear regression.

In [9]:
import org.apache.spark.ml.regression.LinearRegression

val lr = new LinearRegression()
    .setLabelCol("lastsoldprice")
    .setFeaturesCol("features")

val lrModel = lr.fit(train)
val predictions = lrModel.transform(test)

import org.apache.spark.ml.evaluation.RegressionEvaluator

val rmse = new RegressionEvaluator()
  .setLabelCol("lastsoldprice")
  .setPredictionCol("prediction")
  .setMetricName("rmse")

val r2 = new RegressionEvaluator()
  .setLabelCol("lastsoldprice")
  .setPredictionCol("prediction")
  .setMetricName("r2")

println("Root Mean Squared Error (RMSE) on test data = " + rmse.evaluate(predictions))
println("R^2 on test data = " + r2.evaluate(predictions))

Root Mean Squared Error (RMSE) on test data = 857356.2890199891
R^2 on test data = 0.31933500943383086


import org.apache.spark.ml.regression.LinearRegression
lr: org.apache.spark.ml.regression.LinearRegression = linReg_9aed1af51530
lrModel: org.apache.spark.ml.regression.LinearRegressionModel = linReg_9aed1af51530
predictions: org.apache.spark.sql.DataFrame = [_c0: int, bathrooms: double ... 8 more fields]
import org.apache.spark.ml.evaluation.RegressionEvaluator
rmse: org.apache.spark.ml.evaluation.RegressionEvaluator = regEval_441bb31d3542
r2: org.apache.spark.ml.evaluation.RegressionEvaluator = regEval_761a46688c27


With that, we will do the same again but with a reusable method.

In [10]:
import org.apache.spark.ml.Predictor
import org.apache.spark.ml.PredictionModel
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.regression.LinearRegression

def train_eval[R <: Predictor[Vector, R, M],
               M <: PredictionModel[Vector, M]](
    predictor: Predictor[Vector, R, M],
    paramMap: Array[ParamMap],
    train: DataFrame, 
    test: DataFrame) = {

    val cv = new CrossValidator()
      .setEstimator( predictor    
                    .setLabelCol("lastsoldprice")
                    .setFeaturesCol("features"))
      .setEvaluator(new RegressionEvaluator()
          .setLabelCol("lastsoldprice")
          .setPredictionCol("prediction")
          .setMetricName("rmse"))
      .setEstimatorParamMaps(paramMap)
      .setNumFolds(5)
      .setParallelism(2)

    val cvModel = cv.fit(train)
    val predictions = cvModel.transform(test)
    
    println("Root Mean Squared Error (RMSE) on test data = " + rmse.evaluate(predictions))
    println("R^2 on test data = " + r2.evaluate(predictions))

    val bestModel = cvModel.bestModel
    
    println(bestModel.extractParamMap)
    
    bestModel
}

val lr = new LinearRegression()

val lrParamMap = new ParamGridBuilder()
    .addGrid(lr.regParam, Array(10, 1, 0.1, 0.01, 0.001))
    .addGrid(lr.elasticNetParam, Array(0.0, 0.5, 1.0))
    .addGrid(lr.maxIter, Array(10000, 250000))
    .build()

train_eval(lr, lrParamMap, train, test)

Root Mean Squared Error (RMSE) on test data = 857348.5768801493
R^2 on test data = 0.31934725489128823
{
	linReg_359b4e462342-aggregationDepth: 2,
	linReg_359b4e462342-elasticNetParam: 1.0,
	linReg_359b4e462342-epsilon: 1.35,
	linReg_359b4e462342-featuresCol: features,
	linReg_359b4e462342-fitIntercept: true,
	linReg_359b4e462342-labelCol: lastsoldprice,
	linReg_359b4e462342-loss: squaredError,
	linReg_359b4e462342-maxIter: 10000,
	linReg_359b4e462342-predictionCol: prediction,
	linReg_359b4e462342-regParam: 10.0,
	linReg_359b4e462342-solver: auto,
	linReg_359b4e462342-standardization: true,
	linReg_359b4e462342-tol: 1.0E-6
}


import org.apache.spark.ml.Predictor
import org.apache.spark.ml.PredictionModel
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.regression.LinearRegression
train_eval: [R <: org.apache.spark.ml.Predictor[org.apache.spark.ml.linalg.Vector,R,M], M <: org.apache.spark.ml.PredictionModel[org.apache.spark.ml.linalg.Vector,M]](predictor: org.apache.spark.ml.Predictor[org.apache.spark.ml.linalg.Vector,R,M], paramMap: Array[org.apache.spark.ml.param.ParamMap], train: org.apache.spark.sql.DataFrame, test: org.apache.spark.sql.DataFrame)org.apache.spark.ml.Model[_]
lr: org.apache.spark.ml.regression.LinearRegression = linReg_359b4e462342
lrParamMap: Array[org.apache.s...

No we can try again with different machine learning algorithms.

In [11]:
import org.apache.spark.ml.regression.DecisionTreeRegressor

val decisionTree = new DecisionTreeRegressor()
val dtParamMap = new ParamGridBuilder().build()
train_eval(decisionTree, dtParamMap, train, test)

Root Mean Squared Error (RMSE) on test data = 759685.8395738212
R^2 on test data = 0.46558480196241925
{
	dtr_c14d3cc21045-cacheNodeIds: false,
	dtr_c14d3cc21045-checkpointInterval: 10,
	dtr_c14d3cc21045-featuresCol: features,
	dtr_c14d3cc21045-impurity: variance,
	dtr_c14d3cc21045-labelCol: lastsoldprice,
	dtr_c14d3cc21045-maxBins: 32,
	dtr_c14d3cc21045-maxDepth: 5,
	dtr_c14d3cc21045-maxMemoryInMB: 256,
	dtr_c14d3cc21045-minInfoGain: 0.0,
	dtr_c14d3cc21045-minInstancesPerNode: 1,
	dtr_c14d3cc21045-predictionCol: prediction,
	dtr_c14d3cc21045-seed: 926680331
}


import org.apache.spark.ml.regression.DecisionTreeRegressor
decisionTree: org.apache.spark.ml.regression.DecisionTreeRegressor = dtr_c14d3cc21045
dtParamMap: Array[org.apache.spark.ml.param.ParamMap] =
Array({

})
res3: org.apache.spark.ml.Model[_] = DecisionTreeRegressionModel (uid=dtr_c14d3cc21045) of depth 5 with 63 nodes


In [12]:
import org.apache.spark.ml.regression.RandomForestRegressor

val randomForest = new RandomForestRegressor()

val rfParamMap = new ParamGridBuilder()
    .addGrid(randomForest.maxBins, Array(4, 16, 32, 64))
    .addGrid(randomForest.numTrees, Array(1, 10, 100))
    .addGrid(randomForest.maxDepth, Array(2, 5, 10))
    .build()

train_eval(randomForest, rfParamMap, train, test)

Root Mean Squared Error (RMSE) on test data = 647133.830611256
R^2 on test data = 0.6122079099308858
{
	rfr_162d5da97e7e-cacheNodeIds: false,
	rfr_162d5da97e7e-checkpointInterval: 10,
	rfr_162d5da97e7e-featureSubsetStrategy: auto,
	rfr_162d5da97e7e-featuresCol: features,
	rfr_162d5da97e7e-impurity: variance,
	rfr_162d5da97e7e-labelCol: lastsoldprice,
	rfr_162d5da97e7e-maxBins: 64,
	rfr_162d5da97e7e-maxDepth: 10,
	rfr_162d5da97e7e-maxMemoryInMB: 256,
	rfr_162d5da97e7e-minInfoGain: 0.0,
	rfr_162d5da97e7e-minInstancesPerNode: 1,
	rfr_162d5da97e7e-numTrees: 100,
	rfr_162d5da97e7e-predictionCol: prediction,
	rfr_162d5da97e7e-seed: 235498149,
	rfr_162d5da97e7e-subsamplingRate: 1.0
}


import org.apache.spark.ml.regression.RandomForestRegressor
randomForest: org.apache.spark.ml.regression.RandomForestRegressor = rfr_162d5da97e7e
rfParamMap: Array[org.apache.spark.ml.param.ParamMap] =
Array({
	rfr_162d5da97e7e-maxBins: 4,
	rfr_162d5da97e7e-maxDepth: 2,
	rfr_162d5da97e7e-numTrees: 1
}, {
	rfr_162d5da97e7e-maxBins: 16,
	rfr_162d5da97e7e-maxDepth: 2,
	rfr_162d5da97e7e-numTrees: 1
}, {
	rfr_162d5da97e7e-maxBins: 32,
	rfr_162d5da97e7e-maxDepth: 2,
	rfr_162d5da97e7e-numTrees: 1
}, {
	rfr_162d5da97e7e-maxBins: 64,
	rfr_162d5da97e7e-maxDepth: 2,
	rfr_162d5da97e7e-numTrees: 1
}, {
	rfr_162d5da97e7e-maxBins: 4,
	rfr_162d5da97e7e-maxDepth: 2,
	rfr_162d5da97e7e-numTrees: 10
}, {
	rfr_162d5da97e7e-maxBins: 16,
	rfr_162d5da97e7e-maxDepth: 2,
	rfr_162d5da97e7e-numTrees: 10
}, {
	rfr_...

In [13]:
import org.apache.spark.ml.regression.GBTRegressor

val gradientBoost = new GBTRegressor()

val gbParamMap = new ParamGridBuilder()
    .addGrid(randomForest.maxBins, Array(16, 32))
    .addGrid(randomForest.numTrees, Array(5, 10, 100))
    .addGrid(randomForest.maxDepth, Array(5, 10))
    .addGrid(randomForest.minInfoGain, Array(0.0, 0.1, 0.5))
    .build()

train_eval(gradientBoost, gbParamMap, train, test)

Root Mean Squared Error (RMSE) on test data = 703037.6456894034
R^2 on test data = 0.5423137139558296
{
	gbtr_4a77c4a76717-cacheNodeIds: false,
	gbtr_4a77c4a76717-checkpointInterval: 10,
	gbtr_4a77c4a76717-featureSubsetStrategy: all,
	gbtr_4a77c4a76717-featuresCol: features,
	gbtr_4a77c4a76717-impurity: variance,
	gbtr_4a77c4a76717-labelCol: lastsoldprice,
	gbtr_4a77c4a76717-lossType: squared,
	gbtr_4a77c4a76717-maxBins: 32,
	gbtr_4a77c4a76717-maxDepth: 5,
	gbtr_4a77c4a76717-maxIter: 20,
	gbtr_4a77c4a76717-maxMemoryInMB: 256,
	gbtr_4a77c4a76717-minInfoGain: 0.0,
	gbtr_4a77c4a76717-minInstancesPerNode: 1,
	gbtr_4a77c4a76717-predictionCol: prediction,
	gbtr_4a77c4a76717-seed: -131597770,
	gbtr_4a77c4a76717-stepSize: 0.1,
	gbtr_4a77c4a76717-subsamplingRate: 1.0,
	gbtr_4a77c4a76717-validationTol: 0.01
}


import org.apache.spark.ml.regression.GBTRegressor
gradientBoost: org.apache.spark.ml.regression.GBTRegressor = gbtr_4a77c4a76717
gbParamMap: Array[org.apache.spark.ml.param.ParamMap] =
Array({
	rfr_162d5da97e7e-maxBins: 16,
	rfr_162d5da97e7e-maxDepth: 5,
	rfr_162d5da97e7e-minInfoGain: 0.0,
	rfr_162d5da97e7e-numTrees: 5
}, {
	rfr_162d5da97e7e-maxBins: 32,
	rfr_162d5da97e7e-maxDepth: 5,
	rfr_162d5da97e7e-minInfoGain: 0.0,
	rfr_162d5da97e7e-numTrees: 5
}, {
	rfr_162d5da97e7e-maxBins: 16,
	rfr_162d5da97e7e-maxDepth: 5,
	rfr_162d5da97e7e-minInfoGain: 0.1,
	rfr_162d5da97e7e-numTrees: 5
}, {
	rfr_162d5da97e7e-maxBins: 32,
	rfr_162d5da97e7e-maxDepth: 5,
	rfr_162d5da97e7e-minInfoGain: 0.1,
	rfr_162d5da97e7e-numTrees: 5
}, {
	rfr_162d5da97e7e-maxBins: 16,
	rfr_162d5da97e7e-maxDepth: 5,
	rfr_162d...

If we want better results, we will have to bring in the neighborhood data as we did in the scitket learn example.

In [14]:
val housing_neighborhood = drop_geog(housing_dateint, List("neighborhood"))

housing_neighborhood: org.apache.spark.sql.DataFrame = [_c0: int, bathrooms: double ... 7 more fields]


In [15]:
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.OneHotEncoderEstimator
import org.apache.spark.ml.feature.StringIndexer

val indexer = new StringIndexer().setInputCol("neighborhood").setOutputCol("neighborhoodIndex")

val encoder = new OneHotEncoderEstimator()
  .setInputCols(Array(indexer.getOutputCol))
  .setOutputCols(Array("neighborhoodVector"))

val pipeline = new Pipeline().setStages(Array(indexer, encoder))

val housingEncoded = pipeline.fit(housing_neighborhood).transform(housing_neighborhood)
.drop("neighborhoodIndex")
.drop("neighborhood")

housingEncoded.show

+---+---------+--------+------------+-------------+----------+---------+---------------+------------------+
|_c0|bathrooms|bedrooms|finishedsqft|lastsoldprice|totalrooms|yearbuilt|lastsolddateint|neighborhoodVector|
+---+---------+--------+------------+-------------+----------+---------+---------------+------------------+
|  2|      2.0|     2.0|      1043.0|    1300000.0|       4.0|   2007.0|     1455667200|    (70,[1],[1.0])|
|  5|      1.0|     1.0|       903.0|     750000.0|       3.0|   2004.0|     1455667200|    (70,[1],[1.0])|
|  7|      4.0|     3.0|      1425.0|    1495000.0|       6.0|   2003.0|     1455667200|    (70,[8],[1.0])|
|  9|      3.0|     3.0|      2231.0|    2700000.0|      10.0|   1927.0|     1455667200|    (70,[8],[1.0])|
| 11|      3.0|     3.0|      1300.0|    1530000.0|       4.0|   1900.0|     1455667200|    (70,[2],[1.0])|
| 12|      1.0|     2.0|      1250.0|     460000.0|       5.0|   1924.0|     1455667200|   (70,[34],[1.0])|
| 13|      1.0|     3.0|    

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.OneHotEncoderEstimator
import org.apache.spark.ml.feature.StringIndexer
indexer: org.apache.spark.ml.feature.StringIndexer = strIdx_2f7196545152
encoder: org.apache.spark.ml.feature.OneHotEncoderEstimator = oneHotEncoder_592c053827bf
pipeline: org.apache.spark.ml.Pipeline = pipeline_920d33d8cbee
housingEncoded: org.apache.spark.sql.DataFrame = [_c0: int, bathrooms: double ... 7 more fields]


Now we can create a new train and test set, and try again with the same algorithms.

In [16]:
val (train_neighborhood, test_neighborhood) = train_test_split(housingEncoded)

train_neighborhood: org.apache.spark.sql.DataFrame = [_c0: int, bathrooms: double ... 8 more fields]
test_neighborhood: org.apache.spark.sql.DataFrame = [_c0: int, bathrooms: double ... 8 more fields]


In [17]:
train_eval(lr, lrParamMap, train_neighborhood, test_neighborhood)

Root Mean Squared Error (RMSE) on test data = 754869.9632285038
R^2 on test data = 0.4723389619596349
{
	linReg_359b4e462342-aggregationDepth: 2,
	linReg_359b4e462342-elasticNetParam: 0.0,
	linReg_359b4e462342-epsilon: 1.35,
	linReg_359b4e462342-featuresCol: features,
	linReg_359b4e462342-fitIntercept: true,
	linReg_359b4e462342-labelCol: lastsoldprice,
	linReg_359b4e462342-loss: squaredError,
	linReg_359b4e462342-maxIter: 10000,
	linReg_359b4e462342-predictionCol: prediction,
	linReg_359b4e462342-regParam: 10.0,
	linReg_359b4e462342-solver: auto,
	linReg_359b4e462342-standardization: true,
	linReg_359b4e462342-tol: 1.0E-6
}


res7: org.apache.spark.ml.Model[_] = linReg_359b4e462342


In [18]:
train_eval(decisionTree, dtParamMap, train_neighborhood, test_neighborhood)

Root Mean Squared Error (RMSE) on test data = 722171.2606321493
R^2 on test data = 0.5170622654844328
{
	dtr_c14d3cc21045-cacheNodeIds: false,
	dtr_c14d3cc21045-checkpointInterval: 10,
	dtr_c14d3cc21045-featuresCol: features,
	dtr_c14d3cc21045-impurity: variance,
	dtr_c14d3cc21045-labelCol: lastsoldprice,
	dtr_c14d3cc21045-maxBins: 32,
	dtr_c14d3cc21045-maxDepth: 5,
	dtr_c14d3cc21045-maxMemoryInMB: 256,
	dtr_c14d3cc21045-minInfoGain: 0.0,
	dtr_c14d3cc21045-minInstancesPerNode: 1,
	dtr_c14d3cc21045-predictionCol: prediction,
	dtr_c14d3cc21045-seed: 926680331
}


res8: org.apache.spark.ml.Model[_] = DecisionTreeRegressionModel (uid=dtr_c14d3cc21045) of depth 5 with 63 nodes


In [19]:
train_eval(randomForest, rfParamMap, train_neighborhood, test_neighborhood)

Root Mean Squared Error (RMSE) on test data = 581188.983582857
R^2 on test data = 0.6872153115815951
{
	rfr_162d5da97e7e-cacheNodeIds: false,
	rfr_162d5da97e7e-checkpointInterval: 10,
	rfr_162d5da97e7e-featureSubsetStrategy: auto,
	rfr_162d5da97e7e-featuresCol: features,
	rfr_162d5da97e7e-impurity: variance,
	rfr_162d5da97e7e-labelCol: lastsoldprice,
	rfr_162d5da97e7e-maxBins: 64,
	rfr_162d5da97e7e-maxDepth: 10,
	rfr_162d5da97e7e-maxMemoryInMB: 256,
	rfr_162d5da97e7e-minInfoGain: 0.0,
	rfr_162d5da97e7e-minInstancesPerNode: 1,
	rfr_162d5da97e7e-numTrees: 100,
	rfr_162d5da97e7e-predictionCol: prediction,
	rfr_162d5da97e7e-seed: 235498149,
	rfr_162d5da97e7e-subsamplingRate: 1.0
}


res9: org.apache.spark.ml.Model[_] = RandomForestRegressionModel (uid=rfr_162d5da97e7e) with 100 trees


In [20]:
train_eval(gradientBoost, gbParamMap, train_neighborhood, test_neighborhood)

Root Mean Squared Error (RMSE) on test data = 636055.9695573623
R^2 on test data = 0.6253709908240936
{
	gbtr_4a77c4a76717-cacheNodeIds: false,
	gbtr_4a77c4a76717-checkpointInterval: 10,
	gbtr_4a77c4a76717-featureSubsetStrategy: all,
	gbtr_4a77c4a76717-featuresCol: features,
	gbtr_4a77c4a76717-impurity: variance,
	gbtr_4a77c4a76717-labelCol: lastsoldprice,
	gbtr_4a77c4a76717-lossType: squared,
	gbtr_4a77c4a76717-maxBins: 32,
	gbtr_4a77c4a76717-maxDepth: 5,
	gbtr_4a77c4a76717-maxIter: 20,
	gbtr_4a77c4a76717-maxMemoryInMB: 256,
	gbtr_4a77c4a76717-minInfoGain: 0.0,
	gbtr_4a77c4a76717-minInstancesPerNode: 1,
	gbtr_4a77c4a76717-predictionCol: prediction,
	gbtr_4a77c4a76717-seed: -131597770,
	gbtr_4a77c4a76717-stepSize: 0.1,
	gbtr_4a77c4a76717-subsamplingRate: 1.0,
	gbtr_4a77c4a76717-validationTol: 0.01
}


res10: org.apache.spark.ml.Model[_] = GBTRegressionModel (uid=gbtr_4a77c4a76717) with 20 trees
