# Predicting Housing Prices with Apache Spark (Scala) - ML

#### Machine Learning Workflows
1) Performing ETL and analyzing historical data to extract the significant features and the label; <br>
2) Training, testing and evaluating the results of ML algorithms to build a model; <br>
3) Using the model in production with new data to make predictions; <br>
4) Model monitoring and model updating with new data

#### Dataset
>These spatial data contain 20,640 observations on housing prices with 9 economic covariates. It appeared in Pace and Barry (1997), "Sparse Spatial Autoregressions", Statistics and Probability Letters. Submitted by Kelley Pace (kpace@unix1.sncc.lsu.edu). [9/Nov/99] (536 kbytes) 


# 1. Import libraries

In [1]:
import org.apache.spark._
import org.apache.spark.ml._
import org.apache.spark.ml.feature._
import org.apache.spark.ml.regression._
import org.apache.spark.ml.evaluation._
import org.apache.spark.ml.tuning._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.ml.Pipeline
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

Intitializing Scala interpreter ...

Spark Web UI available at http://192.168.1.104:4042
SparkContext available as 'sc' (version = 2.4.5, master = local[*], app id = local-1591566543549)
SparkSession available as 'spark'



# 2. Read Data

In [2]:
val file = "../dataset/housing.csv"

file: String = ../dataset/housing.csv


In [3]:
var df = spark.read.format("csv").option("inferSchema", "true").option("header", true).load(file)

df: org.apache.spark.sql.DataFrame = [longitude: double, latitude: double ... 8 more fields]


In [4]:
df.show(4)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
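+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+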

In [5]:
df.describe("housing_median_age","total_rooms","population","median_house_value").show

+-------+------------------+------------------+------------------+------------------+
|summary|housing_median_age|       total_rooms|        population|median_house_value|
+-------+------------------+------------------+------------------+------------------+
|  count|             20640|             20640|             20640|             20640|
|   mean|28.639486434108527|2635.7630813953488|1425.4767441860465|206855.81690891474|
| stddev| 12.58555761211163|2181.6152515827944|  1132.46212176534|115395.61587441359|
|    min|               1.0|               2.0|               3.0|           14999.0|
|    max|              52.0|           39320.0|           35682.0|          500001.0|
+-------+------------------+------------------+------------------+------------------+



# 3. Create new columns

In [6]:
df = df.withColumn("rooms_per_house", col("total_rooms")/col("households"))
df = df.withColumn("pop_per_house", col("population")/col("households"))
df = df.withColumn("bedrooms_per_room", col("total_bedrooms")/col("total_rooms"))

df: org.apache.spark.sql.DataFrame = [longitude: double, latitude: double ... 11 more fields]
df: org.apache.spark.sql.DataFrame = [longitude: double, latitude: double ... 11 more fields]
df: org.apache.spark.sql.DataFrame = [longitude: double, latitude: double ... 11 more fields]


## 3.1. Drop columns

In [7]:
// "totalbedrooms" matches no column, so drop() ignores it and total_bedrooms stays in df.
df = df.drop("total_rooms", "households", "population", "totalbedrooms")

df: org.apache.spark.sql.DataFrame = [longitude: double, latitude: double ... 8 more fields]


In [8]:
df.show(4)

+---------+--------+------------------+--------------+-------------+------------------+---------------+------------------+------------------+-------------------+
|longitude|latitude|housing_median_age|total_bedrooms|median_income|median_house_value|ocean_proximity|   rooms_per_house|     pop_per_house|  bedrooms_per_room|
+---------+--------+------------------+--------------+-------------+------------------+---------------+------------------+------------------+-------------------+
|  -122.23|   37.88|              41.0|         129.0|       8.3252|          452600.0|       NEAR BAY| 6.984126984126984|2.5555555555555554|0.14659090909090908|
|  -122.22|   37.86|              21.0|        1106.0|       8.3014|          358500.0|       NEAR BAY| 6.238137082601054| 2.109841827768014|0.15579659106916466|
|  -122.24|   37.85|              52.0|         190.0|       7.2574|          352100.0|       NEAR BAY| 8.288135593220339|2.8022598870056497|0.12951601908657123|
|  -122.25|   37.85|        

## 3.2. Convert ocean_proximity to a numeric index

In [9]:
val indexer = new StringIndexer().setInputCol("ocean_proximity")
                                 .setOutputCol("ocean_proximity_in")

indexer: org.apache.spark.ml.feature.StringIndexer = strIdx_6fcfb93923a4


In [10]:
var new_df = indexer.fit(df).transform(df)

new_df: org.apache.spark.sql.DataFrame = [longitude: double, latitude: double ... 9 more fields]
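With its default settings, `StringIndexer` orders labels by descending frequency, so the most common category receives index 0.0. The plain-Scala sketch below imitates that idea without Spark. The sample values, the alphabetical tie-break, and the names `categories`, `labelsByFrequency`, `labelToIndex`, and `indexed` are assumptions made for illustration only; they are not the housing data or Spark's implementation.

```scala
// Plain-Scala sketch of frequency-based label indexing (no Spark required).
// The sample values are made up for illustration; they are not the housing data.
val categories = Seq("INLAND", "NEAR BAY", "INLAND", "<1H OCEAN", "<1H OCEAN", "<1H OCEAN")

// Count occurrences, then order by descending frequency
// (ties are broken alphabetically in this sketch).
val labelsByFrequency: Seq[String] = categories
  .groupBy(identity)
  .map { case (label, occurrences) => (label, occurrences.size) }
  .toSeq
  .sortBy { case (label, count) => (-count, label) }
  .map(_._1)

// The most frequent label gets index 0.0, the next one 1.0, and so on.
val labelToIndex: Map[String, Double] =
  labelsByFrequency.zipWithIndex.map { case (label, i) => (label, i.toDouble) }.toMap

// Replace every category with its index.
val indexed: Seq[Double] = categories.map(labelToIndex)
println(labelToIndex)
println(indexed)
```

The resulting numbers only encode how often each label occurs; they carry no natural ordering of the categories themselves.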


In [11]:
new_df=new_df.drop("ocean_proximity")

new_df: org.apache.spark.sql.DataFrame = [longitude: double, latitude: double ... 8 more fields]


In [12]:
new_df.show(4)

+---------+--------+------------------+--------------+-------------+------------------+------------------+------------------+-------------------+------------------+
|longitude|latitude|housing_median_age|total_bedrooms|median_income|median_house_value|   rooms_per_house|     pop_per_house|  bedrooms_per_room|ocean_proximity_in|
+---------+--------+------------------+--------------+-------------+------------------+------------------+------------------+-------------------+------------------+
|  -122.23|   37.88|              41.0|         129.0|       8.3252|          452600.0| 6.984126984126984|2.5555555555555554|0.14659090909090908|               3.0|
|  -122.22|   37.86|              21.0|        1106.0|       8.3014|          358500.0| 6.238137082601054| 2.109841827768014|0.15579659106916466|               3.0|
|  -122.24|   37.85|              52.0|         190.0|       7.2574|          352100.0| 8.288135593220339|2.8022598870056497|0.12951601908657123|               3.0|
|  -122.25

# 4. Create Temp View

In [13]:
new_df.cache()
new_df.createOrReplaceTempView("house")

# 5. Correlation

In [14]:
new_df.select(corr("median_house_value", "median_income")).show()

+---------------------------------------+
|corr(median_house_value, median_income)|
+---------------------------------------+
|                     0.6880752079585578|
+---------------------------------------+



In [15]:
new_df.select(corr("median_house_value", "bedrooms_per_room")).show()

+-------------------------------------------+
|corr(median_house_value, bedrooms_per_room)|
+-------------------------------------------+
|                       -0.25588014941949866|
+-------------------------------------------+



In [16]:
new_df.select(corr("median_house_value", "pop_per_house")).show()

+---------------------------------------+
|corr(median_house_value, pop_per_house)|
+---------------------------------------+
|                   -0.02373741295613...|
+---------------------------------------+



In [17]:
new_df.select(corr("median_house_value", "ocean_proximity_in")).show()

+--------------------------------------------+
|corr(median_house_value, ocean_proximity_in)|
+--------------------------------------------+
|                        0.021732204251456527|
+--------------------------------------------+
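For readers who want to see what `corr` computes, here is a minimal plain-Scala sketch of the Pearson correlation coefficient. The helper name `pearson` is hypothetical, and the sample uses only the first four rows printed by `df.show(4)`, so its result will not match the full-dataset value of 0.688 reported above.

```scala
// Plain-Scala sketch of the Pearson correlation coefficient (no Spark required).
def pearson(xs: Seq[Double], ys: Seq[Double]): Double = {
  require(xs.length == ys.length && xs.nonEmpty, "inputs must be non-empty and the same length")
  val meanX = xs.sum / xs.length
  val meanY = ys.sum / ys.length
  // Sum of products of deviations from the means (covariance numerator).
  val cov = xs.zip(ys).map { case (x, y) => (x - meanX) * (y - meanY) }.sum
  // Sums of squared deviations (variance numerators).
  val varX = xs.map(x => math.pow(x - meanX, 2)).sum
  val varY = ys.map(y => math.pow(y - meanY, 2)).sum
  cov / math.sqrt(varX * varY)
}

// median_income and median_house_value for the first four rows shown by df.show(4).
val income = Seq(8.3252, 8.3014, 7.2574, 5.6431)
val houseValue = Seq(452600.0, 358500.0, 352100.0, 341300.0)
println(pearson(income, houseValue))
```

Note that `ocean_proximity_in` is an index assigned by `StringIndexer`, so its correlation with the label reflects the index order rather than a measured quantity and should be read with caution.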



# 6. Split data into train and test

In [18]:
val Array(trainingData, testData) = new_df.randomSplit(Array(0.8, 0.2), 1234)

trainingData: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [longitude: double, latitude: double ... 8 more fields]
testData: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [longitude: double, latitude: double ... 8 more fields]


# 7. Feature Extraction and Pipelines

In [19]:
new_df.columns

res9: Array[String] = Array(longitude, latitude, housing_median_age, total_bedrooms, median_income, median_house_value, rooms_per_house, pop_per_house, bedrooms_per_room, ocean_proximity_in)


## 7.1. Features columns

In [20]:
val featureCols = Array("housing_median_age", "median_income", "rooms_per_house", "pop_per_house",
                        "bedrooms_per_room", "longitude", "latitude", "ocean_proximity_in")

featureCols: Array[String] = Array(housing_median_age, median_income, rooms_per_house, pop_per_house, bedrooms_per_room, longitude, latitude, ocean_proximity_in)


## 7.2. Create VectorAssembler

In [21]:
val assembler = new VectorAssembler().setHandleInvalid("skip")
                                     .setInputCols(featureCols)
                                     .setOutputCol("rawfeatures")

assembler: org.apache.spark.ml.feature.VectorAssembler = vecAssembler_73f60601072f


## 7.3. Create a StandardScaler to standardize features

In [22]:
val scaler = new StandardScaler().setInputCol("rawfeatures")
                                 .setOutputCol("features")

scaler: org.apache.spark.ml.feature.StandardScaler = stdScal_697c75bec51a
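Because no options are set on the scaler, it runs with Spark's defaults: each feature is divided by its corrected sample standard deviation and is not centered (`withStd = true`, `withMean = false`). The plain-Scala sketch below shows that arithmetic for a single column. The helper names `sampleStd` and `scaleToUnitStd` are hypothetical, and mapping a constant column to 0.0 is a choice made for this sketch.

```scala
// Plain-Scala sketch of scaling one feature column to unit standard deviation.
// Assumption: this mirrors StandardScaler's defaults (withStd = true, withMean = false).
def sampleStd(xs: Seq[Double]): Double = {
  require(xs.length > 1, "need at least two values")
  val mean = xs.sum / xs.length
  // Corrected (n - 1) sample standard deviation.
  math.sqrt(xs.map(x => math.pow(x - mean, 2)).sum / (xs.length - 1))
}

def scaleToUnitStd(xs: Seq[Double]): Seq[Double] = {
  val std = sampleStd(xs)
  // A constant column has std 0; map it to 0.0 rather than dividing by zero.
  if (std == 0.0) xs.map(_ => 0.0) else xs.map(_ / std)
}

// housing_median_age for the first four rows shown by df.show(4).
val age = Seq(41.0, 21.0, 52.0, 52.0)
val scaledAge = scaleToUnitStd(age)
println(scaledAge)
```

After scaling, the column has a sample standard deviation of 1 while its mean is merely rescaled, not shifted to zero.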


## 7.4. Random Forest Regressor model

In [23]:
val rf = new RandomForestRegressor().setLabelCol("median_house_value")
                                    .setFeaturesCol("features")

rf: org.apache.spark.ml.regression.RandomForestRegressor = rfr_b5c5b43f6af0


## 7.5. Create pipeline

In [24]:
val steps = Array(assembler, scaler, rf)

steps: Array[org.apache.spark.ml.PipelineStage with org.apache.spark.ml.util.DefaultParamsWritable{def copy(extra: org.apache.spark.ml.param.ParamMap): org.apache.spark.ml.PipelineStage with org.apache.spark.ml.util.DefaultParamsWritable{def copy(extra: org.apache.spark.ml.param.ParamMap): org.apache.spark.ml.PipelineStage with org.apache.spark.ml.util.DefaultParamsWritable}}] = Array(vecAssembler_73f60601072f, stdScal_697c75bec51a, rfr_b5c5b43f6af0)


In [25]:
val pipeline = new Pipeline().setStages(steps)

pipeline: org.apache.spark.ml.Pipeline = pipeline_ecd2f534f225


# 8. Train the model
- Cross-validation
- Hyperparameter tuning

## 8.1. Parameters

In [26]:
val paramGrid = new ParamGridBuilder().addGrid(rf.maxBins, Array(50, 100))
                                      .addGrid(rf.maxDepth, Array(7, 10, 20))
                                      .addGrid(rf.numTrees, Array(20, 40))
                                      .build()


paramGrid: Array[org.apache.spark.ml.param.ParamMap] =
Array({
	rfr_b5c5b43f6af0-maxBins: 50,
	rfr_b5c5b43f6af0-maxDepth: 7,
	rfr_b5c5b43f6af0-numTrees: 20
}, {
	rfr_b5c5b43f6af0-maxBins: 50,
	rfr_b5c5b43f6af0-maxDepth: 7,
	rfr_b5c5b43f6af0-numTrees: 40
}, {
	rfr_b5c5b43f6af0-maxBins: 50,
	rfr_b5c5b43f6af0-maxDepth: 10,
	rfr_b5c5b43f6af0-numTrees: 20
}, {
	rfr_b5c5b43f6af0-maxBins: 50,
	rfr_b5c5b43f6af0-maxDepth: 10,
	rfr_b5c5b43f6af0-numTrees: 40
}, {
	rfr_b5c5b43f6af0-maxBins: 50,
	rfr_b5c5b43f6af0-maxDepth: 20,
	rfr_b5c5b43f6af0-numTrees: 20
}, {
	rfr_b5c5b43f6af0-maxBins: 50,
	rfr_b5c5b43f6af0-maxDepth: 20,
	rfr_b5c5b43f6af0-numTrees: 40
}, {
	rfr_b5c5b43f6af0-maxBins: 100,
	rfr_b5c5b43f6af0-maxDepth: 7,
	rfr_b5c5b43f6af0-numTrees: 20
}, {
	rfr_b5c5b43f6af0-maxBins: 100,
	rfr_b5c5b4...
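The grid is the Cartesian product of the three value lists, which is why the output above starts with `maxBins` 50, `maxDepth` 7 and then varies `numTrees` first. The plain-Scala sketch below enumerates the same combinations and estimates how many pipeline fits the search costs with the three folds configured in section 8.3. The names `combinations`, `fitsDuringSearch`, and `totalFits` are illustrative, and the final "+ 1" assumes the best combination is refitted once on the full training set, as `CrossValidator` does.

```scala
// Plain-Scala sketch: enumerate the same hyperparameter combinations as the grid above.
val maxBinsValues  = Seq(50, 100)
val maxDepthValues = Seq(7, 10, 20)
val numTreesValues = Seq(20, 40)
val numFolds = 3

// Cartesian product of the three value lists.
val combinations = for {
  bins  <- maxBinsValues
  depth <- maxDepthValues
  trees <- numTreesValues
} yield (bins, depth, trees)

// Each combination is fitted once per fold; the winner is then refitted on all training data.
val fitsDuringSearch = combinations.size * numFolds
val totalFits = fitsDuringSearch + 1

combinations.foreach { case (bins, depth, trees) =>
  println(s"maxBins=$bins, maxDepth=$depth, numTrees=$trees")
}
println(s"${combinations.size} combinations, $totalFits pipeline fits in total")
```

With 12 combinations and 3 folds, the search trains 36 pipelines plus one final refit, so adding values to any list multiplies the training cost.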

## 8.2. Evaluator

In [27]:
val evaluator = new RegressionEvaluator().setLabelCol("median_house_value")
                                         .setPredictionCol("prediction")
                                         .setMetricName("rmse")

evaluator: org.apache.spark.ml.evaluation.RegressionEvaluator = regEval_fc5902f1f3d6


## 8.3. Cross validation

In [28]:
val crossvalidator = new CrossValidator().setEstimator(pipeline)
                                         .setEvaluator(evaluator)
                                         .setEstimatorParamMaps(paramGrid)
                                         .setNumFolds(3)

crossvalidator: org.apache.spark.ml.tuning.CrossValidator = cv_919a457a47f7
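To make the role of `setNumFolds(3)` concrete, here is a small plain-Scala sketch of k-fold splitting. The helper `kFold` is hypothetical and assigns rows to folds round-robin by position, which is a simplification: Spark assigns rows to folds randomly.

```scala
// Plain-Scala sketch of k-fold splitting (no Spark required).
// Assumption: rows are assigned to folds round-robin by position,
// whereas Spark's CrossValidator assigns rows to folds randomly.
def kFold[A](rows: Seq[A], k: Int): Seq[(Seq[A], Seq[A])] = {
  require(k >= 2 && rows.length >= k, "need k >= 2 and at least k rows")
  val indexed = rows.zipWithIndex
  (0 until k).map { fold =>
    // Rows whose position falls in this fold are held out for validation.
    val (validation, training) = indexed.partition { case (_, i) => i % k == fold }
    (training.map(_._1), validation.map(_._1))
  }
}

val sampleRows = (1 to 9).toList
kFold(sampleRows, 3).zipWithIndex.foreach { case ((train, validation), fold) =>
  println(s"fold $fold: train=$train, validation=$validation")
}
```

Every row is used for validation exactly once, and each parameter combination is scored by averaging the metric over the folds.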


## 8.4. Fit Model

In [29]:
val pipelineModel = crossvalidator.fit(trainingData)

pipelineModel: org.apache.spark.ml.tuning.CrossValidatorModel = cv_919a457a47f7


## 8.5. Feature Importance

In [30]:
val featureImportances = pipelineModel.bestModel
                                      .asInstanceOf[PipelineModel]
                                      .stages(2)
                                      .asInstanceOf[RandomForestRegressionModel]
                                      .featureImportances

assembler.getInputCols.zip(featureImportances.toArray)
                      .sortBy(-_._2)
                      .foreach { case (feat, imp) => println(s"feature: $feat, importance: $imp") }

feature: median_income, importance: 0.3632059158120435
feature: pop_per_house, importance: 0.11961049174330007
feature: longitude, importance: 0.10312807196865144
feature: bedrooms_per_room, importance: 0.0987422354638934
feature: latitude, importance: 0.09767896347515746
feature: ocean_proximity_in, importance: 0.08726479259333791
feature: rooms_per_house, importance: 0.06824258298883883
feature: housing_median_age, importance: 0.06212694595477756


featureImportances: org.apache.spark.ml.linalg.Vector = (8,[0,1,2,3,4,5,6,7],[0.06212694595477756,0.3632059158120435,0.06824258298883883,0.11961049174330007,0.0987422354638934,0.10312807196865144,0.09767896347515746,0.08726479259333791])


## 8.6. Best Parameters

In [31]:
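// RMSE is a lower-is-better metric (evaluator.isLargerBetter is false),
// so the best combination is the one with the smallest average metric.
// The output recorded below came from maxBy, i.e. the highest-RMSE combination.
val bestEstimatorParamMap = pipelineModel.getEstimatorParamMaps
                                         .zip(pipelineModel.avgMetrics)
                                         .minBy(_._2)
                                         ._1
println(s"Best params:\n$bestEstimatorParamMap")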

Best params:
{
	rfr_b5c5b43f6af0-maxBins: 50,
	rfr_b5c5b43f6af0-maxDepth: 7,
	rfr_b5c5b43f6af0-numTrees: 40
}


bestEstimatorParamMap: org.apache.spark.ml.param.ParamMap =
{
	rfr_b5c5b43f6af0-maxBins: 50,
	rfr_b5c5b43f6af0-maxDepth: 7,
	rfr_b5c5b43f6af0-numTrees: 40
}


# 9. Prediction and model evaluation

In [32]:
val predictions = pipelineModel.transform(testData)

predictions: org.apache.spark.sql.DataFrame = [longitude: double, latitude: double ... 11 more fields]


In [33]:
predictions.select("prediction", "median_house_value").show(5)

+-----------------+------------------+
|       prediction|median_house_value|
+-----------------+------------------+
|         126910.0|           94600.0|
|         78556.25|           85800.0|
|97337.77777777778|          103600.0|
|112033.6111111111|           90100.0|
|85403.92857142857|           82800.0|
+-----------------+------------------+
only showing top 5 rows



In [34]:
val predictions_error = predictions.withColumn("error", col("prediction")-col("median_house_value"))
predictions_error.select("prediction", "median_house_value", "error").show

+-----------------+------------------+------------------+
|       prediction|median_house_value|             error|
+-----------------+------------------+------------------+
|         126910.0|           94600.0|           32310.0|
|         78556.25|           85800.0|          -7243.75|
|97337.77777777778|          103600.0|-6262.222222222219|
|112033.6111111111|           90100.0|21933.611111111095|
|85403.92857142857|           82800.0| 2603.928571428565|
|80827.41239316239|           81300.0|-472.5876068376092|
|76196.41666666666|           62500.0|13696.416666666657|
|         128485.0|          109400.0|           19085.0|
|83765.16666666666|           76900.0| 6865.166666666657|
|         87844.75|           74100.0|          13744.75|
|97743.91666666666|           80500.0|17243.916666666657|
|97232.46323529413|           66800.0|30432.463235294126|
|       117778.125|           74100.0|         43678.125|
|         134427.5|           55000.0|           79427.5|
|         1310

predictions_error: org.apache.spark.sql.DataFrame = [longitude: double, latitude: double ... 12 more fields]


In [35]:
predictions_error.describe("prediction", "median_house_value", "error").show

+-------+------------------+------------------+------------------+
|summary|        prediction|median_house_value|             error|
+-------+------------------+------------------+------------------+
|  count|              4127|              4127|              4127|
|   mean|206842.43086360718|205955.60649382116| 886.8243697859892|
| stddev|102159.15633129653| 115007.9470486048| 46584.00835777262|
|    min|           48767.5|           26900.0|-331935.5416666667|
|    max|          500001.0|          500001.0|         294377.71|
+-------+------------------+------------------+------------------+



## 9.1. MAE

In [36]:
val maevaluator = new RegressionEvaluator().setLabelCol("median_house_value")
                                           .setMetricName("mae")
val mae = maevaluator.evaluate(predictions)

maevaluator: org.apache.spark.ml.evaluation.RegressionEvaluator = regEval_46f9c13b9c67
mae: Double = 30740.695947490512


## 9.2. RMSE

In [37]:
val evaluator = new RegressionEvaluator().setLabelCol("median_house_value")
                                         .setMetricName("rmse")
val rmse = evaluator.evaluate(predictions)

evaluator: org.apache.spark.ml.evaluation.RegressionEvaluator = regEval_3b8c63a30d26
rmse: Double = 46586.80574528031


# 10. Save model

In [38]:
pipelineModel.write.overwrite().save("modeldir")

In [39]:
val sameModel = CrossValidatorModel.load("modeldir")

sameModel: org.apache.spark.ml.tuning.CrossValidatorModel = cv_919a457a47f7
