# Predicting Boston Housing Prices

## Getting Started

Our target with this project is to evaluate the performance and predictive power of a model that has been trained and tested on data collected from homes in suburbs of Boston, Massachusetts. A model trained on this data that is seen as a good fit could then be used to make certain predictions about a home — in particular, its monetary value. This model would prove to be invaluable for someone like a real estate agent who could make use of such information on a daily basis.

The dataset for this project originates from the UCI Machine Learning Repository. The Boston housing data was collected in 1978 and each of the 506 entries represent aggregated data about 14 features for homes from various suburbs in Boston, Massachusetts. For the purposes of this project, the following preprocessing steps have been made to the dataset:

  - 16 data points have an 'MEDV' value of 50.0. These data points likely contain missing or censored values and have been removed.
  - 1 data point has an 'RM' value of 8.78. This data point can be considered an outlier and has been removed.
  - The features 'RM', 'LSTAT', 'PTRATIO', and 'MEDV' are essential. The remaining non-relevant features have been excluded.
  - The feature 'MEDV' has been multiplicatively scaled to account for 35 years of market inflation.

### Importing libraries

In otder to download dependencies directly into the notebook we need to make some basic translation from the usual SBT way. 

Basically, if our dependency is 
```
libraryDependencies += "org.jupyter-scala" %% "spark" % "0.4.2"
``` 
in SBT, then in Notebook format it would be 
```
import $ivy.`org.jupyter-scala::spark:0.4.2`
```

The main differences are:
 
  - %% is replaced with ::
  - % is replaced with :
  - We enclose the dependency name in backticks (`)
  - We prepend the dependency name with $ivy.
  
### Using Jupyter Scala Spark session

In order to use Spark in a notebook, we must only use SparkSessions from the org.jupyter-scala::spark library becayse they are aware of the kernel.

In [1]:
import $ivy.`org.jupyter-scala::spark:0.4.2`

[32mimport [39m[36m$ivy.$                               [39m

Let's load the data and import a few of the libraries we'll need:

In [2]:
import $exclude.`org.slf4j:slf4j-log4j12`, $ivy.`org.slf4j:slf4j-nop:1.7.21` // for cleaner logs
import $ivy.`org.apache.spark::spark-core:2.2.0`
import $ivy.`org.apache.spark:spark-sql_2.11:2.2.0`
import $ivy.`org.apache.spark::spark-mllib:2.2.0`

import org.apache.spark._
import org.apache.spark.sql._
import jupyter.spark.session._

val sparkSession = JupyterSparkSession.builder()
  .jupyter()
  .master("local")
  .appName("housing")
  .getOrCreate()

// import sparkSession.sqlContext.implicits._

def getData(path: String) =
    sparkSession.read
        .option("header", "true")
        .option("inferSchema", "true")
        .format("csv")
        .load(path)

val housingData = getData("../resources/housing.csv")

log4j:WARN No appenders could be found for logger (io.netty.util.internal.logging.InternalLoggerFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.


[32mimport [39m[36m$exclude.$                        , $ivy.$                            // for cleaner logs
[39m
[32mimport [39m[36m$ivy.$                                   
[39m
[32mimport [39m[36m$ivy.$                                      
[39m
[32mimport [39m[36m$ivy.$                                    

[39m
[32mimport [39m[36morg.apache.spark._
[39m
[32mimport [39m[36morg.apache.spark.sql._
[39m
[32mimport [39m[36mjupyter.spark.session._

[39m
[36msparkSession[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mSparkSession[39m = org.apache.spark.sql.SparkSession@25a9a694
defined [32mfunction[39m [36mgetData[39m
[36mhousingData[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mpackage[39m.[32mDataFrame[39m = [RM: double, LSTAT: double ... 2 more fields]

In [3]:
housingData.printSchema()

root
 |-- RM: double (nullable = true)
 |-- LSTAT: double (nullable = true)
 |-- PTRATIO: double (nullable = true)
 |-- MEDV: double (nullable = true)



In [5]:
val prices = housingData.select("MEDV")
val features = housingData.drop("MEDV")

println(s"Boston housing dataset has ${housingData.count()} data points with ${features.columns.length} variables each.")

Boston housing dataset has 489 data points with 3 variables each.


[36mprices[39m: [32mDataFrame[39m = [MEDV: double]
[36mfeatures[39m: [32mDataFrame[39m = [RM: double, LSTAT: double ... 1 more field]

## Data Exploration

Since the main goal of this project is to construct a working model which has the capability of predicting the value of houses, we will need to separate the dataset into **features** and the **target variable**. The features, 'RM', 'LSTAT', and 'PTRATIO', give us quantitative information about each data point. The **target variable**, 'MEDV', will be the variable we seek to predict. These are stored in `features` and `prices`, respectively.

### Calculating Statistics

In order to get familiarized with the data, we'll calculate a set of descriptive statistics. These statistics will be extremely important later on to analyze various prediction results from the constructed model.

In particular, we will calculate the minimum, maximum, mean, and standard deviation of 'MEDV', which is stored in `prices`.

In [6]:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Row

// Compute descriptive stats.

val Row(minValue: Double, maxValue: Double, meanValue: Double, stdValue: Double) = prices.agg(min("MEDV"), max("MEDV"), mean("MEDV"), stddev("MEDV")).head

val formatter = java.text.NumberFormat.getIntegerInstance

// Show the calculated statistics
println("Statistics for the Boston housing dataset: \n")
println(s"Minimum price: ${formatter.format(minValue)}")
println(s"Maximum price: ${formatter.format(maxValue)}")
println(s"Mean price: ${formatter.format(meanValue)}")
println(s"Standard deviation of prices: ${formatter.format(stdValue)}")


Statistics for the Boston housing dataset: 

Minimum price: 105,000
Maximum price: 1,024,800
Mean price: 454,343
Standard deviation of prices: 165,340


[32mimport [39m[36morg.apache.spark.sql.functions._
[39m
[32mimport [39m[36morg.apache.spark.sql.Row

// Compute descriptive stats.

[39m
[36mminValue[39m: [32mDouble[39m = [32m105000.0[39m
[36mmaxValue[39m: [32mDouble[39m = [32m1024800.0[39m
[36mmeanValue[39m: [32mDouble[39m = [32m454342.9447852761[39m
[36mstdValue[39m: [32mDouble[39m = [32m165340.2776526678[39m
[36mformatter[39m: [32mjava[39m.[32mtext[39m.[32mNumberFormat[39m = java.text.DecimalFormat@674dc

### Feature Observation

As a reminder, we are using three features from the Boston housing dataset: 'RM', 'LSTAT', and 'PTRATIO'. For each data point (neighborhood):

 - 'RM' is the average number of rooms among homes in the neighborhood.
 - 'LSTAT' is the percentage of homeowners in the neighborhood considered "lower class" (working poor).
 - 'PTRATIO' is the ratio of students to teachers in primary and secondary schools in the neighborhood.
 
Just by *intuition* I would think that LSTAT and PTRATIO have a bigger impact on our target feature, MEDV, because they define the "goodness" and "appeal" of a neighborhood.

More specifically, I'd say LSTAT is negative correlated with MEDV. This is, when LSTAT increases, MEDV actually decreases, because an increment of "lower class" citizens in a particular neighborhood would most likely affect its status among the Boston inhabitants.

I also think that PTRATIO is negative correlated with MEDV. So, for instance, it is sensible to think that a neighborhood with a 10 students per teacher provides a better education to its residents' children than one where every teacher has to attend 30 kids (although this assumption could perfectly be subject of a different problem :)).

Finally, I would say that RM is positively correlated with MEDV, which means that an increase in RM also increases MEDV. Although, unfortunatelly, we do not have access to the area of the house in square meters/feet, it's sensible to assume that a house with more bedrooms is also a bigger house, hence its worth would increase.

In [7]:
import $ivy.`org.vegas-viz::vegas:0.3.8`
import $ivy.`org.vegas-viz::vegas-spark:0.3.8`

import vegas._
import vegas.sparkExt._

def scatterPlot(x: String, y: String, title: String): Unit = { 
    Vegas(title, width=400, height=200)
    .withDataFrame(housingData.select(x, y))
    .mark(Point)
    .encodeX(x, Quantitative)
    .encodeY(y, Quantitative)
    .show
}

[32mimport [39m[36m$ivy.$                           
[39m
[32mimport [39m[36m$ivy.$                                 

[39m
[32mimport [39m[36mvegas._
[39m
[32mimport [39m[36mvegas.sparkExt._[39m

In [23]:
scatterPlot("RM", "MEDV", "Prices vs. RM")

defined [32mfunction[39m [36mscatterPlot[39m

In [24]:
scatterPlot("LSTAT", "MEDV", "Prices vs. LSTAT")

In [25]:
scatterPlot("PTRATIO", "MEDV", "Prices vs. PTRATIO")

It seems that RM and prices are, in fact, positively correlated. Also, LSTAT are definitely negatively correlated. However, the points in the last plot are very scattered, so it seems that PTRATIO and prices share a very low correlation factor (if any).

## Developing a Model

Let's now develop the tools and techniques necessary for a model to make a prediction. Being able to make accurate evaluations of each model's performance through the use of these tools and techniques helps to greatly reinforce the confidence in our predictions.

### Defining a Performance Metric
It is difficult to measure the quality of a given model without quantifying its performance over training and testing. This is typically done using some type of performance metric, whether it is through calculating some type of error, the goodness of fit, or some other useful measurement. For this project, we will be calculating the [*coefficient of determination*](http://stattrek.com/statistics/dictionary.aspx?definition=coefficient_of_determination), R<sup>2</sup>, to quantify our model's performance. The coefficient of determination for a model is a useful statistic in regression analysis, as it often describes how "good" that model is at making predictions. 

The values for R<sup>2</sup> range from 0 to 1, which captures the percentage of squared correlation between the predicted and actual values of the **target variable**. A model with an R<sup>2</sup> of 0 is no better than a model that always predicts the *mean* of the target variable, whereas a model with an R<sup>2</sup> of 1 perfectly predicts the target variable. Any value between 0 and 1 indicates what percentage of the target variable, using this model, can be explained by the **features**. _A model can be given a negative R<sup>2</sup> as well, which indicates that the model is **arbitrarily worse** than one that always predicts the mean of the target variable._

### Shuffle and Split Data
Let's now take the Boston housing dataset and split the data into training and testing subsets. Typically, the data is also shuffled into a random order when creating the training and testing subsets to remove any bias in the ordering of the dataset.

In [28]:
val Array(trainingData, testData) = housingData.withColumnRenamed("MEDV", "label").randomSplit(Array(0.8, 0.2), seed = 42)

[36mtrainingData[39m: [32mDataset[39m[[32mRow[39m] = [RM: double, LSTAT: double ... 2 more fields]
[36mtestData[39m: [32mDataset[39m[[32mRow[39m] = [RM: double, LSTAT: double ... 2 more fields]

### Benefit of Training and Testing

Splitting our data into training and testing chunks allow us to separate a portion of it for measuring the performance of our model in an unseen dataset. This way we can detect if our model is underfitting (its performance on the training set is poor) or overfitting to the training set (its performance on the test set is considerably worse than in the training set).

### Selecting a Model

We'll use a simple model called Decision Tree, which is handy for both regression and classification.

In [30]:
import org.apache.spark.ml.regression.DecisionTreeRegressor

val regressor = new DecisionTreeRegressor()

[32mimport [39m[36morg.apache.spark.ml.regression.DecisionTreeRegressor

[39m
[36mregressor[39m: [32mml[39m.[32mregression[39m.[32mDecisionTreeRegressor[39m = dtr_eb04825c8fee


## Evaluating Model Performance
In this final section of the project, we will construct a model and make a prediction on the client's feature set using an optimized model using grid search.

###  Grid Search

One of the hardest task in machine learning is selecting a good model, and after that, tuning it to achieve its optimal state! Given that some algorithms may have more knobs to tune, the search space is remarkably complex to apply a guess-and-check method. So, in order to automate this task, we can define a grid of parameters we would like to explore, and finally we pick the combination that provides the best results. An example of a grid search for a decision tree might be:

{'max_depth': (1, 2, 5, 10), 'impurity': ('variance')}

This means that our grid search algorithm will train a DecisionTree with parameters max_depth=1 and impurity=variance, and then a DecisionTree with max_depth=2 and impurity=variance and so on until it has explored all the possible parameter combinations.

In [33]:
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}

val paramGrid = new ParamGridBuilder()
    .addGrid(regressor.impurity, Array("variance"))
    .addGrid(regressor.maxDepth, Array(1, 2, 5, 10))
    .build()

[32mimport [39m[36morg.apache.spark.ml.evaluation.RegressionEvaluator
[39m
[32mimport [39m[36morg.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}

[39m
[36mparamGrid[39m: [32mArray[39m[[32mml[39m.[32mparam[39m.[32mParamMap[39m] = [33mArray[39m(
  {
	dtr_eb04825c8fee-impurity: variance,
	dtr_eb04825c8fee-maxDepth: 1
},
  {
	dtr_eb04825c8fee-impurity: variance,
	dtr_eb04825c8fee-maxDepth: 2
},
  {
	dtr_eb04825c8fee-impurity: variance,
	dtr_eb04825c8fee-maxDepth: 5
[33m...[39m

### Fitting a Model

In [34]:
import org.apache.spark.ml.feature.VectorAssembler

val featureColumnsNames = features.columns.toArray

val assembler = new VectorAssembler()
    .setInputCols(featureColumnsNames)
    .setOutputCol("features")

import org.apache.spark.ml.Pipeline
  
val pipeline = new Pipeline().setStages(Array(assembler, regressor))

import org.apache.spark.ml.evaluation.RegressionEvaluator
                                        
val trainValidationSplit = new TrainValidationSplit()
    .setEstimator(pipeline)
    .setEvaluator(new RegressionEvaluator())
    .setEstimatorParamMaps(paramGrid)
    .setTrainRatio(0.8)

[32mimport [39m[36morg.apache.spark.ml.feature.VectorAssembler

[39m
[36mfeatureColumnsNames[39m: [32mArray[39m[[32mString[39m] = [33mArray[39m([32m"RM"[39m, [32m"LSTAT"[39m, [32m"PTRATIO"[39m)
[36massembler[39m: [32mVectorAssembler[39m = vecAssembler_ff1b2d383d74
[32mimport [39m[36morg.apache.spark.ml.Pipeline
  
[39m
[36mpipeline[39m: [32mml[39m.[32mPipeline[39m = pipeline_f51410372687
[32mimport [39m[36morg.apache.spark.ml.evaluation.RegressionEvaluator
                                        
[39m
[36mtrainValidationSplit[39m: [32mTrainValidationSplit[39m = tvs_81cd173cc0a3

### Making Predictions

Once a model has been trained on a given set of data, it can now be used to make predictions on new sets of input data. In the case of a *decision tree regressor*, the model has learned *what the best questions to ask about the input data are*, and can respond with a prediction for the **target variable**.

In [35]:
val model = trainValidationSplit.fit(trainingData)
val results = model.transform(testData)

[36mmodel[39m: [32mml[39m.[32mtuning[39m.[32mTrainValidationSplitModel[39m = tvs_81cd173cc0a3
[36mresults[39m: [32mDataFrame[39m = [RM: double, LSTAT: double ... 4 more fields]

In [37]:
import org.apache.spark.mllib.evaluation.RegressionMetrics

results.printSchema()

import sparkSession.implicits._

val predictionAndObservations = results.select(results("prediction"), results("label")).as[(Double, Double)].rdd
val metrics = new RegressionMetrics(predictionAndObservations)
  
println(metrics.r2)

root
 |-- RM: double (nullable = true)
 |-- LSTAT: double (nullable = true)
 |-- PTRATIO: double (nullable = true)
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- prediction: double (nullable = true)

0.831346213577439


[32mimport [39m[36morg.apache.spark.mllib.evaluation.RegressionMetrics

[39m
[32mimport [39m[36msparkSession.implicits._
[39m
[36mpredictionAndObservations[39m: [32mrdd[39m.[32mRDD[39m[([32mDouble[39m, [32mDouble[39m)] = MapPartitionsRDD[353] at rdd at cmd36.sc:6
[36mmetrics[39m: [32mRegressionMetrics[39m = org.apache.spark.mllib.evaluation.RegressionMetrics@6f2f52b2

We can see that our model achieved an R<sup>2</sup> score of 0.8313, which means that roughly the 83.13% of the variation in the housing prices are explained by our selected features.

### Predicting Selling Prices
Let's imagine for a second that we are a real estate agent in the Boston area looking to use this model to help price homes owned by our clients that they wish to sell. We have collected the following information from three of our clients:

| Feature | Client 1 | Client 2 | Client 3 |
| :---: | :---: | :---: | :---: |
| Total number of rooms in home | 5 rooms | 4 rooms | 8 rooms |
| Neighborhood poverty level (as %) | 17% | 32% | 3% |
| Student-teacher ratio of nearby schools | 15-to-1 | 22-to-1 | 12-to-1 |

* What price would we recommend each client sell his/her home at? 
* Do these prices seem reasonable given the values for the respective features? 

In [38]:
// Produce a matrix for client data
val clientData = Array(Row(5, 17, 15), // Client 1
                       Row(4, 32, 22), // Client 2
                       Row(8, 3, 12))  // Client 3

[36mclientData[39m: [32mArray[39m[[32mRow[39m] = [33mArray[39m([5,17,15], [4,32,22], [8,3,12])

In [38]:
model.transform(clientData)

cmd38.sc:1: type mismatch;
 found   : Array[org.apache.spark.sql.Row]
 required: org.apache.spark.sql.Dataset[_]
val res38 = model.transform(clientData)
                            ^

: 