In [1]:
import org.apache.spark.sql._

// Entry point for all DataFrame work in this notebook; getOrCreate() returns an
// already-running session when one exists instead of building a second one.
val spark = SparkSession.
  builder().
  appName("Housing-Spark").
  getOrCreate()
      
import spark.implicits._

# Get the data

In [3]:
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

// Explicit schema for housing.csv: nine nullable float columns followed by the
// nullable string column ocean_proximity.
val customSchema = {
  val floatFields = Seq(
    "longitude", "latitude", "housing_median_age", "total_rooms", "total_bedrooms",
    "population", "households", "median_income", "median_house_value"
  ).map(name => StructField(name, FloatType, nullable = true))
  StructType(floatFields :+ StructField("ocean_proximity", StringType, nullable = true))
}

// Read the CSV (first line is the header) using the explicit schema above.
val housing = spark.read.
  option("header", "true").
  schema(customSchema).
  csv("../datasets/housing/housing.csv")

// Number of rows loaded.
housing.count()

20640

In [4]:
// Print the summary-statistics table (count, mean, stddev, ...) for the housing columns.
housing.describe().show()

+-------+-------------------+-----------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+---------------+
|summary|          longitude|         latitude|housing_median_age|       total_rooms|    total_bedrooms|        population|       households|     median_income|median_house_value|ocean_proximity|
+-------+-------------------+-----------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+---------------+
|  count|              20640|            20640|             20640|             20640|             20433|             20640|            20640|             20640|             20640|          20640|
|   mean|-119.56970444871473|35.63186143109965|28.639486434108527|2635.7630813953488| 537.8705525375618|1425.4767441860465|499.5396802325581|3.8706710030346416|206855.81690891474|           null|
| stddev|  2.0035317

In [5]:
// Confirm that the explicit schema was applied to the loaded DataFrame.
housing.printSchema()

root
 |-- longitude: float (nullable = true)
 |-- latitude: float (nullable = true)
 |-- housing_median_age: float (nullable = true)
 |-- total_rooms: float (nullable = true)
 |-- total_bedrooms: float (nullable = true)
 |-- population: float (nullable = true)
 |-- households: float (nullable = true)
 |-- median_income: float (nullable = true)
 |-- median_house_value: float (nullable = true)
 |-- ocean_proximity: string (nullable = true)



In [6]:
// Peek at the first five rows.
housing.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              

In [8]:
// Row count per ocean_proximity category, largest category first.
housing.
  groupBy($"ocean_proximity").
  count().
  orderBy($"count".desc).
  show()

+---------------+-----+
|ocean_proximity|count|
+---------------+-----+
|      <1H OCEAN| 9136|
|         INLAND| 6551|
|     NEAR OCEAN| 2658|
|       NEAR BAY| 2290|
|         ISLAND|    5|
+---------------+-----+



## Random Sampling 

In [7]:
// Plain 80/20 random split; the fixed seed keeps the split repeatable.
val Array(random_train_df, random_test_df) =
  housing.randomSplit(Array(0.8, 0.2), seed = 42L)
println(s"Random selection = ${random_train_df.count()}, ${random_test_df.count()}")

Random selection = 16549, 4091


In [10]:
// Bucket median_income into categories: ceil(income / 1.5), with everything
// that would be 5 or above collapsed into category 5.0.
val housing_income_cat = {
  val incomeBucket = ceil($"median_income" / 1.5)
  housing.withColumn("income_cat", when(incomeBucket < 5, incomeBucket).otherwise(5.0))
}
housing_income_cat.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+----------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|income_cat|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+----------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|       5.0|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|       5.0|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|       5.0|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|    

### Original proportion line

In [9]:
// Share of the whole dataset that falls into each income category.
val total = housing_income_cat.count()
val overall_distr = housing_income_cat.
  groupBy($"income_cat").
  count().
  withColumn("ratio", $"count" / total)

// Largest category first.
overall_distr.orderBy(desc("ratio")).show()

                                                                                +----------+-----+--------------------+
|income_cat|count|               ratio|
+----------+-----+--------------------+
|       3.0| 7236|  0.3505813953488372|
|       2.0| 6581|  0.3188468992248062|
|       4.0| 3639| 0.17630813953488372|
|       5.0| 2362| 0.11443798449612404|
|       1.0|  822|0.039825581395348836|
+----------+-----+--------------------+



In [10]:
// Same 80/20 seeded random split as before, this time on the frame that carries
// income_cat, so the random split can be compared against the stratified one.
val Array(random_train_temp, random_test_temp) =
  housing_income_cat.randomSplit(Array(0.8, 0.2), seed = 42L)
println(s"Random selection = ${random_train_temp.count()}, ${random_test_temp.count()}")

Random selection = 16549, 4091


##### Originally I thought I needed to provide the fractions by manually calculating each category's share as its original proportion * 0.8. Actually, Dataset.stat will figure it out for me: I only need to provide the sampling percentage per category. The following cell is left for reference only.

In [11]:
// Hand-computed fractions: each category's overall ratio scaled by 0.8.
// The middle field (the per-category count) is not needed here.
val manual_fractions = overall_distr.rdd.
  map { case Row(category: Double, _: Long, ratio: Double) => (category, ratio * 0.8) }.
  collectAsMap().
  toMap
manual_fractions

Map(5.0 -> 0.09155038759689924, 1.0 -> 0.03186046511627907, 2.0 -> 0.25507751937984496, 3.0 -> 0.2804651162790698, 4.0 -> 0.141046511627907)

## Tentative Stratified Sampling

In [12]:
// Map every distinct income_cat value to the same sampling fraction of 0.8.
val fractions = housing_income_cat.
  select($"income_cat").
  distinct().
  rdd.
  map { case Row(category: Double) => (category, 0.8) }.
  collectAsMap().
  toMap

// Stratified sample keyed on income_cat, seeded with 42 for repeatability.
val strat_train_temp = housing_income_cat.stat.sampleBy("income_cat", fractions, 42L)
val sample_count = strat_train_temp.count()
sample_count

16549

In [13]:
// Fraction of all rows that ended up in the stratified sample (toDouble avoids integer division).
sample_count.toDouble / total

0.8017926356589147

In [14]:
// The test split is whatever the stratified sample left behind: subtract the
// sampled rows at the RDD level and rebuild a DataFrame with the original schema.
val strat_test_temp = {
  val heldOutRows = housing_income_cat.rdd.subtract(strat_train_temp.rdd)
  spark.createDataFrame(heldOutRows, housing_income_cat.schema)
}
// Sanity check: the two splits together should account for every row.
strat_train_temp.count() + strat_test_temp.count()

20640

In [15]:
// income_cat was only a helper column for stratification; remove it from both splits.
val strat_train_set = strat_train_temp.drop("income_cat")
val strat_test_set = strat_test_temp.drop("income_cat")

# Use a function to compare proportions

In [16]:
/** Computes the share of rows in each `income_cat` category.
  *
  * @param df  DataFrame that contains an `income_cat` column
  * @param col name to give the resulting proportion column
  * @return one row per category with columns `income_cat` and `col`
  */
def income_cat_proportions(df: Dataset[Row], col: String) = {
  val rowCount = df.count()
  df.groupBy($"income_cat").
    count().
    withColumn(col, $"count" / rowCount).
    drop("count")
}

// Put the per-category proportions of the full data, the random split and the
// stratified split side by side, plus each split's percentage deviation from
// the full-data proportion.
val compare_props = {
  val overall = income_cat_proportions(housing_income_cat, "Overall")
  val random = income_cat_proportions(random_train_temp, "Random")
  val stratified = income_cat_proportions(strat_train_temp, "Stratified")
  overall.
    join(random, "income_cat").
    join(stratified, "income_cat").
    withColumn("Rand. %error", ($"Random" / $"Overall") * 100 - 100).
    withColumn("Strat. %error", ($"Stratified" / $"Overall") * 100 - 100)
}

compare_props.show()

+----------+--------------------+-------------------+-------------------+--------------------+--------------------+
|income_cat|             Overall|             Random|         Stratified|        Rand. %error|       Strat. %error|
+----------+--------------------+-------------------+-------------------+--------------------+--------------------+
|       1.0|0.039825581395348836|0.03963985739319596|0.04024412351199468| -0.4663434798583097|  1.0509378695340956|
|       4.0| 0.17630813953488372|0.17547888089914798|0.17614357362982658|-0.47034620064813737|-0.09333993625665471|
|       3.0|  0.3505813953488372|0.35180373436461415| 0.3515016013052148| 0.34866054844715677| 0.26247997429011605|
|       2.0|  0.3188468992248062| 0.3177231252643664| 0.3173605655930872| -0.3524493928502892|-0.46615903599271746|
|       5.0| 0.11443798449612404|0.11535440207867545|0.11475013595987672|  0.8007984294606842|  0.2727691029574544|
+----------+--------------------+-------------------+-------------------

In [17]:
// Re-check the column names and types of the raw housing DataFrame.
housing.printSchema()

root
 |-- longitude: float (nullable = true)
 |-- latitude: float (nullable = true)
 |-- housing_median_age: float (nullable = true)
 |-- total_rooms: float (nullable = true)
 |-- total_bedrooms: float (nullable = true)
 |-- population: float (nullable = true)
 |-- households: float (nullable = true)
 |-- median_income: float (nullable = true)
 |-- median_house_value: float (nullable = true)
 |-- ocean_proximity: string (nullable = true)



## Find out median using Spark

In [18]:
// Expose the DataFrame to Spark SQL under the temporary view name "df".
housing.createOrReplaceTempView("df")
// percentile_approx at 0.5 yields an approximate median of longitude.
spark.sql("select percentile_approx(longitude, 0.5) from df").show()

+------------------------------------------------------------------------+
|percentile_approx(CAST(longitude AS DOUBLE), CAST(0.5 AS DOUBLE), 10000)|
+------------------------------------------------------------------------+
|                                                     -118.48999786376953|
+------------------------------------------------------------------------+



## Tentatively apply Imputer, StringIndexer and OneHotEncoder

#### Spark does not require all fields to be numeric in order to apply Imputer, nor all fields to be categorical in order to apply StringIndexer and OneHotEncoder. Spark allows specifying the input columns and output columns.

In [21]:
// Remove the label, then separate the numeric predictors from the single
// categorical predictor.
val housing_value = strat_train_set.drop("median_house_value")
val housing_num = housing_value.drop("ocean_proximity")
val housing_cat = housing_value.select("ocean_proximity")

// Approximate median (0.5 quantile) of every numeric column, relative error 0.0001.
housing_num.stat.approxQuantile(housing_num.columns, Array(0.5), 0.0001)

Array(Array(-118.5199966430664), Array(34.27000045776367), Array(29.0), Array(2131.0), Array(435.0), Array(1164.0), Array(410.0), Array(3.53410005569458))

In [22]:
import org.apache.spark.ml.feature.Imputer

// Fill missing total_bedrooms values with the column median, writing the result
// to a new column so the original stays available for inspection.
val imputer = new Imputer().
  setStrategy("median").
  setInputCols(Array("total_bedrooms")).
  setOutputCols(Array("total_bedrooms_out"))

val imputerModel = imputer.fit(housing_num)
val X = imputerModel.transform(housing_num)
// Show rows that originally had no total_bedrooms to verify the imputed value.
X.filter($"total_bedrooms".isNull).show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|total_bedrooms_out|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -122.17|   37.75|              38.0|      992.0|          null|     732.0|     259.0|       1.6196|             435.0|
|  -122.28|   37.78|              29.0|     5154.0|          null|    3741.0|    1273.0|       2.5762|             435.0|
|  -122.24|   37.75|              45.0|      891.0|          null|     384.0|     146.0|       4.9489|             435.0|
|   -122.1|   37.69|              41.0|      746.0|          null|     387.0|     161.0|       3.9063|             435.0|
|  -122.14|   37.67|              37.0|     3342.0|          null|    1635.0|     557.0|       4.7933|             435.0|
+---------+--------+----

In [24]:
import org.apache.spark.ml.feature.StringIndexer

// Map each ocean_proximity string to a numeric index in op_index.
val indexer = new StringIndexer().
  setInputCol("ocean_proximity").
  setOutputCol("op_index")
val indexed = indexer.fit(housing_cat).transform(housing_cat)
// Distinct category rows, ordered by the index each category was assigned.
val op_cat_arr = indexed.
  distinct().
  select($"ocean_proximity").
  sort("op_index").
  collect()

#### OneHotEncoder turns the numeric index into a Spark Vector. By default Spark uses a condensed form in which the last category is dropped and represented by all zeros. We override this with setDropLast(false) so that every category gets its own position in the vector.

In [25]:
import org.apache.spark.ml.feature.OneHotEncoder

// One-hot encode op_index; setDropLast(false) keeps a slot for every category.
val encoder = new OneHotEncoder().
  setInputCol("op_index").
  setOutputCol("ocean_proximity_out").
  setDropLast(false)
val encoded = encoder.transform(indexed)
// One row per category, showing its index and its encoded vector.
encoded.distinct().show()

+---------------+--------+-------------------+
|ocean_proximity|op_index|ocean_proximity_out|
+---------------+--------+-------------------+
|      <1H OCEAN|     0.0|      (5,[0],[1.0])|
|         ISLAND|     4.0|      (5,[4],[1.0])|
|         INLAND|     1.0|      (5,[1],[1.0])|
|       NEAR BAY|     3.0|      (5,[3],[1.0])|
|     NEAR OCEAN|     2.0|      (5,[2],[1.0])|
+---------------+--------+-------------------+



In [26]:
// Schema of the full dataset including the derived income_cat column.
housing_income_cat.printSchema()

root
 |-- longitude: float (nullable = true)
 |-- latitude: float (nullable = true)
 |-- housing_median_age: float (nullable = true)
 |-- total_rooms: float (nullable = true)
 |-- total_bedrooms: float (nullable = true)
 |-- population: float (nullable = true)
 |-- households: float (nullable = true)
 |-- median_income: float (nullable = true)
 |-- median_house_value: float (nullable = true)
 |-- ocean_proximity: string (nullable = true)
 |-- income_cat: double (nullable = true)



## Pipeline combined with Imputer, StringIndexer and OneHotEncoder to fit and transform then add additional 3 fields

In [27]:
import org.apache.spark.ml.{Pipeline, PipelineModel}

// Chain the imputer, string indexer and one-hot encoder configured above.
val pipeline = new Pipeline().setStages(Array(imputer, indexer, encoder))
// The pipeline is fit on the whole dataset so the identical preprocessing does not
// have to be repeated separately for the test set. Note that this means the fitted
// stages (e.g. the imputation median) are learned from all rows, including rows
// that are later assigned to the test split.
val pipelineModel = pipeline.fit(housing_income_cat)
// Apply the fitted stages, then derive three ratio features. bedrooms_per_room
// uses the imputed total_bedrooms_out rather than the raw total_bedrooms.
val housing_temp = {
  val preprocessed = pipelineModel.transform(housing_income_cat)
  preprocessed.
    withColumn("room_per_household", $"total_rooms" / $"households").
    withColumn("population_per_household", $"population" / $"households").
    withColumn("bedrooms_per_room", $"total_bedrooms_out" / $"total_rooms")
}

housing_temp.printSchema()

root
 |-- longitude: float (nullable = true)
 |-- latitude: float (nullable = true)
 |-- housing_median_age: float (nullable = true)
 |-- total_rooms: float (nullable = true)
 |-- total_bedrooms: float (nullable = true)
 |-- population: float (nullable = true)
 |-- households: float (nullable = true)
 |-- median_income: float (nullable = true)
 |-- median_house_value: float (nullable = true)
 |-- ocean_proximity: string (nullable = true)
 |-- income_cat: double (nullable = true)
 |-- total_bedrooms_out: float (nullable = true)
 |-- op_index: double (nullable = true)
 |-- ocean_proximity_out: vector (nullable = true)
 |-- room_per_household: double (nullable = true)
 |-- population_per_household: double (nullable = true)
 |-- bedrooms_per_room: double (nullable = true)



In [25]:
// Inspect rows whose original total_bedrooms is null to check the imputed and
// derived columns; the second argument (false) turns off value truncation.
housing_temp.filter(isnull($"total_bedrooms")).show(5, false)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+----------+------------------+--------+-------------------+------------------+------------------------+-------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|income_cat|total_bedrooms_out|op_index|ocean_proximity_out|room_per_household|population_per_household|bedrooms_per_room  |
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+----------+------------------+--------+-------------------+------------------+------------------------+-------------------+
|-122.16  |37.77   |47.0              |1256.0     |null          |570.0     |218.0     |4.375        |161900.0          |NEAR BAY       |3.0       |435.0             |3.0     |(5,[3],[1.0])      |5.761467889908257 |2.

## Prepare data for StandardScaler that only accept Vector as input

### We use median_house_value as the label.  Features cover all numeric fields.  OP is a vector for the categorical data.   Income_cat is for splitting the data in a stratified way.   Manual manipulation only works for a limited number of features

In [28]:
import org.apache.spark.ml.linalg._

// Reshape housing_temp into (label, features, op, income_cat):
//   label      - median_house_value
//   features   - dense vector of the numeric predictors and the three ratio columns
//   op         - one-hot vector for ocean_proximity
//   income_cat - kept only so the data can be split in a stratified way later
// Column positions are looked up by name once on the driver instead of being
// hard-coded, so this keeps working if housing_temp gains or reorders columns,
// and a missing column fails immediately with a clear error.
val housing_temp2 = {
  val schema = housing_temp.schema
  val labelIdx = schema.fieldIndex("median_house_value")
  val opIdx = schema.fieldIndex("ocean_proximity_out")
  val incomeCatIdx = schema.fieldIndex("income_cat")
  // Float-typed predictors, in feature order; the imputed total_bedrooms_out
  // takes the place of the raw total_bedrooms column.
  val floatIdx = Array("longitude", "latitude", "housing_median_age", "total_rooms",
    "total_bedrooms_out", "population", "households", "median_income").map(schema.fieldIndex)
  // Double-typed ratio columns, appended after the float predictors.
  val doubleIdx = Array("room_per_household", "population_per_household",
    "bedrooms_per_room").map(schema.fieldIndex)

  housing_temp.rdd.map { r =>
    val numeric = floatIdx.map(i => r.getFloat(i).toDouble) ++ doubleIdx.map(i => r.getDouble(i))
    (r.getFloat(labelIdx), Vectors.dense(numeric), r.getAs[Vector](opIdx), r.getDouble(incomeCatIdx))
  }.toDF("label", "features", "op", "income_cat")
}

housing_temp2.show(5, false)

+--------+--------------------------------------------------------------------------------------------------------------------------------------------------+-------------+----------+
|label   |features                                                                                                                                          |op           |income_cat|
+--------+--------------------------------------------------------------------------------------------------------------------------------------------------+-------------+----------+
|452600.0|[-122.2300033569336,37.880001068115234,41.0,880.0,129.0,322.0,126.0,8.325200080871582,6.984126984126984,2.5555555555555554,0.14659090909090908]   |(5,[3],[1.0])|5.0       |
|358500.0|[-122.22000122070312,37.86000061035156,21.0,7099.0,1106.0,2401.0,1138.0,8.301400184631348,6.238137082601054,2.109841827768014,0.15579659106916466]|(5,[3],[1.0])|5.0       |
|352100.0|[-122.23999786376953,37.849998474121094,52.0,1467.0,190.0,496.0,177.0,7.257

In [29]:
// Row count after reshaping, for comparison with the original dataset size.
housing_temp2.count()

20640

## Apply StandardScaler to 'features' field with mean = true to center around zero

In [30]:
import org.apache.spark.ml.feature.StandardScaler

// Standardize the numeric feature vector: divide by the standard deviation
// (withStd) and subtract the mean (withMean) so features are centered on zero.
val scaler = new StandardScaler().
  setInputCol("features").
  setOutputCol("scaledFeatures").
  setWithStd(true).
  setWithMean(true)
val scalerModel = scaler.fit(housing_temp2)
val scaledData = scalerModel.transform(housing_temp2)
scaledData.show(5)

+--------+--------------------+-------------+----------+--------------------+
|   label|            features|           op|income_cat|      scaledFeatures|
+--------+--------------------+-------------+----------+--------------------+
|452600.0|[-122.23000335693...|(5,[3],[1.0])|       5.0|[-1.3278047216384...|
|358500.0|[-122.22000122070...|(5,[3],[1.0])|       5.0|[-1.3228124691993...|
|352100.0|[-122.23999786376...|(5,[3],[1.0])|       5.0|[-1.3327931661046...|
|341300.0|[-122.25,37.84999...|(5,[3],[1.0])|       4.0|[-1.3377854185437...|
|342200.0|[-122.25,37.84999...|(5,[3],[1.0])|       3.0|[-1.3377854185437...|
+--------+--------------------+-------------+----------+--------------------+
only showing top 5 rows



### Combine numeric vector and categorical vector into one feature vector to prepare for data for modeling

In [31]:
// Concatenate the scaled numeric vector with the one-hot category vector into a
// single dense "features" vector, keeping the label and income_cat alongside it.
// Columns are located by name (resolved once on the driver) rather than by
// hard-coded position, so the mapping does not silently break if scaledData's
// column order changes.
val combinedData = {
  val schema = scaledData.schema
  val labelIdx = schema.fieldIndex("label")
  val scaledIdx = schema.fieldIndex("scaledFeatures")
  val opIdx = schema.fieldIndex("op")
  val incomeCatIdx = schema.fieldIndex("income_cat")

  scaledData.rdd.map { r =>
    val merged = r.getAs[Vector](scaledIdx).toArray ++ r.getAs[Vector](opIdx).toArray
    (r.getFloat(labelIdx), new DenseVector(merged), r.getDouble(incomeCatIdx))
  }.toDF("label", "features", "income_cat")
}
combinedData.show(5, false)

+--------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------+
|label   |features                                                                                                                                                                                                                                        |income_cat|
+--------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------+
|452600.0|[-1.3278047216384026,1.0525232947285934,0.9821188656747666,-0.804799599801809,-0.9724529206674336,-0.9744049915469923,-0.977009185045236,2.3447089981016496,0.6285442264151613,-0.049595334664572825,-1.0

## Stratified Sampling

In [32]:
// Stratified 80% sample of the model-ready data, using the per-category
// fractions defined earlier and the same seed.
val strat_train_temp = combinedData.stat.sampleBy("income_cat", fractions, 42L)
// Everything not picked by the sample becomes the test split.
val strat_test_temp = {
  val remainingRows = combinedData.rdd.subtract(strat_train_temp.rdd)
  spark.createDataFrame(remainingRows, combinedData.schema)
}
strat_train_temp.count()

16549

In [33]:
// income_cat has served its purpose for stratification; strip it from both
// splits so only label and features remain.
val Seq(train_set, test_set) =
  Seq(strat_train_temp, strat_test_temp).map(_.drop("income_cat"))
train_set.show(5, false)

+--------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|label   |features                                                                                                                                                                                                                                           |
+--------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|452600.0|[-1.3278047216384026,1.0525232947285934,0.9821188656747666,-0.804799599801809,-0.9724529206674336,-0.9744049915469923,-0.977009185045236,2.3447089981016496,0.6285442264151613,-0.049595334664572825,-1.0299628811021717,0.0,0.0,

## Linear Regression Model Fit, Transform and Evaluate

In [34]:
import org.apache.spark.ml.regression._
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
import org.apache.spark.ml.evaluation._


// Linear regression limited to 10 optimizer iterations.
val lr = new LinearRegression().setMaxIter(10)

// Hyper-parameter grid:
//  - regParam: regularization strength (0 means no penalty on the coefficients)
//  - elasticNetParam: ElasticNet mixing parameter in [0, 1];
//    0 is a pure L2 penalty, 1 is a pure L1 penalty
val pgLr = new ParamGridBuilder().
  addGrid(lr.regParam, Array(1e-3, 1e-6, 1e-9)).
  addGrid(lr.elasticNetParam, Array(1e-2, 1e-3, 1e-4)).
  build()

// RMSE between the "label" and "prediction" columns.
val evaluator = new RegressionEvaluator().
  setMetricName("rmse").
  setLabelCol("label").
  setPredictionCol("prediction")

// 5-fold cross-validation over the grid, seeded for repeatable folds.
val cvLr = new CrossValidator().
  setEstimator(lr).
  setEvaluator(evaluator).
  setEstimatorParamMaps(pgLr).
  setNumFolds(5).
  setSeed(42L)

val lrModel = cvLr.fit(train_set)
println(s"${lrModel.bestModel.explainParams()}\n")

aggregationDepth: suggested depth for treeAggregate (>= 2) (default: 2)
elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty (default: 0.0, current: 0.001)
featuresCol: features column name (default: features)
fitIntercept: whether to fit an intercept term (default: true)
labelCol: label column name (default: label)
maxIter: maximum number of iterations (>= 0) (default: 100, current: 10)
predictionCol: prediction column name (default: prediction)
regParam: regularization parameter (>= 0) (default: 0.0, current: 1.0E-9)
solver: the solver algorithm for optimization. If this is not set or empty, default value is 'auto' (default: auto)
standardization: whether to standardize the training features before fitting the model (default: true)
tol: the convergence tolerance for iterative algorithms (>= 0) (default: 1.0E-6)
weightCol: weight column name. If this is not set or empty, we treat all instance

In [35]:
// Pair every parameter combination with its average cross-validation RMSE, best (lowest) first.
lrModel.getEstimatorParamMaps.
  zip(lrModel.avgMetrics).
  sortBy { case (_, avgRmse) => avgRmse }

Array(({
	linReg_92b756da9d32-elasticNetParam: 0.001,
	linReg_92b756da9d32-regParam: 1.0E-9
},69597.44343297838), ({
	linReg_92b756da9d32-elasticNetParam: 1.0E-4,
	linReg_92b756da9d32-regParam: 1.0E-9
},69597.44343297838), ({
	linReg_92b756da9d32-elasticNetParam: 0.01,
	linReg_92b756da9d32-regParam: 1.0E-9
},69597.4434329784), ({
	linReg_92b756da9d32-elasticNetParam: 1.0E-4,
	linReg_92b756da9d32-regParam: 1.0E-6
},69597.4434330068), ({
	linReg_92b756da9d32-elasticNetParam: 0.001,
	linReg_92b756da9d32-regParam: 1.0E-6
},69597.4434330071), ({
	linReg_92b756da9d32-elasticNetParam: 0.01,
	linReg_92b756da9d32-regParam: 1.0E-6
},69597.44343300952), ({
	linReg_92b756da9d32-elasticNetParam: 1.0E-4,
	linReg_92b756da9d32-regParam: 0.001...

In [36]:
// RMSE of the cross-validated linear model on the training split.
evaluator.evaluate(lrModel.transform(train_set))

69399.03027776726

In [37]:
// Score the held-out test split with the cross-validated linear model.
val lrPrediction = lrModel.transform(test_set)
lrPrediction.show(10)

+--------+--------------------+------------------+
|   label|            features|        prediction|
+--------+--------------------+------------------+
|117400.0|[0.68863496282266...|177420.43915487675|
|320000.0|[-1.1630946629040...|287508.88277954864|
|113900.0|[0.31928824975972...|140922.81431683444|
| 83100.0|[-1.3527583678881...| 94289.23368225707|
|187600.0|[0.19450859459237...|230204.37301735595|
| 91300.0|[-0.9484735036972...|139929.35129356917|
|110200.0|[0.65868906413382...|175891.94619083253|
|308700.0|[-1.2529361669434...|  296036.337390525|
|203800.0|[0.53390940896647...|170068.96368219485|
|500001.0|[0.60378571122235...| 277227.8018432234|
+--------+--------------------+------------------+
only showing top 10 rows



In [38]:
// RMSE of the linear model on the test split, for comparison with the training RMSE.
evaluator.evaluate(lrPrediction)

68279.11552227607

### The above is NOT a case of overfitting.  When a model is overfitting, the metric evaluated on the test set is much worse than the one on the training set.

## Decision Tree Regression Model Fit, Transform and Evaluate

In [67]:
// Decision tree regressor with a 512 MB histogram-aggregation budget and node-id caching.
val dtr = new DecisionTreeRegressor().
  setMaxMemoryInMB(512).
  setCacheNodeIds(true)

// Grid over bin count, tree depth and the minimum number of rows per node;
// minInstancesPerNode limits how far the tree can grow.
val pgDtr = new ParamGridBuilder().
  addGrid(dtr.maxBins, Array(8, 16, 32)).
  addGrid(dtr.maxDepth, Array(2, 3, 5, 8)).
  addGrid(dtr.minInstancesPerNode, Array(20, 50, 100)).
  build()

// 5-fold cross-validation scored with the RMSE evaluator defined earlier.
val cvDtr = new CrossValidator().
  setEstimator(dtr).
  setEvaluator(evaluator).
  setEstimatorParamMaps(pgDtr).
  setNumFolds(5).
  setSeed(42L)

val dtrModel = cvDtr.fit(train_set)
println(s"${dtrModel.bestModel.explainParams()}\n")

cacheNodeIds: If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. (default: false, current: true)
checkpointInterval: set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations (default: 10)
featuresCol: features column name (default: features)
impurity: Criterion used for information gain calculation (case-insensitive). Supported options: variance (default: variance)
labelCol: label column name (default: label)
maxBins: Max number of bins for discretizing continuous features.  Must be >=2 and >= number of categories for any categorical feature. (default: 32, current: 32)
maxDepth: Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. (default: 5, current: 8)
maxMemoryInMB: Maximum memory in MB allocated to histogram ag

In [68]:
// Pair every parameter combination with its average cross-validation RMSE, best (lowest) first.
dtrModel.getEstimatorParamMaps.
  zip(dtrModel.avgMetrics).
  sortBy { case (_, avgRmse) => avgRmse }

Array(({
	dtr_5f09103936d4-maxBins: 32,
	dtr_5f09103936d4-maxDepth: 8,
	dtr_5f09103936d4-minInstancesPerNode: 20
},61212.522915586065), ({
	dtr_5f09103936d4-maxBins: 16,
	dtr_5f09103936d4-maxDepth: 8,
	dtr_5f09103936d4-minInstancesPerNode: 20
},61721.49217717871), ({
	dtr_5f09103936d4-maxBins: 32,
	dtr_5f09103936d4-maxDepth: 8,
	dtr_5f09103936d4-minInstancesPerNode: 50
},61793.54800918797), ({
	dtr_5f09103936d4-maxBins: 16,
	dtr_5f09103936d4-maxDepth: 8,
	dtr_5f09103936d4-minInstancesPerNode: 50
},62156.30331940999), ({
	dtr_5f09103936d4-maxBins: 32,
	dtr_5f09103936d4-maxDepth: 8,
	dtr_5f09103936d4-minInstancesPerNode: 100
},62944.89901907229), ({
	dtr_5f09103936d4-maxBins: 16,
	dtr_5f09103936d4-maxDepth: 8,
	dtr_5f09103936d4...

In [69]:
// RMSE of the cross-validated decision tree on the training split.
evaluator.evaluate(dtrModel.transform(train_set))

56611.404566064

In [70]:
// Score the held-out test split with the cross-validated decision tree.
val dtrPrediction = dtrModel.transform(test_set)
dtrPrediction.show(10)

+--------+--------------------+------------------+
|   label|            features|        prediction|
+--------+--------------------+------------------+
|117400.0|[0.68863496282266...|147415.38461538462|
|320000.0|[-1.1630946629040...|         303134.68|
|113900.0|[0.31928824975972...| 89885.20408163265|
| 83100.0|[-1.3527583678881...| 86694.04761904762|
|187600.0|[0.19450859459237...| 202633.4976076555|
| 91300.0|[-0.9484735036972...|125391.00529100529|
|110200.0|[0.65868906413382...|147415.38461538462|
|308700.0|[-1.2529361669434...|235402.79695431472|
|203800.0|[0.53390940896647...|191588.51174934726|
|500001.0|[0.60378571122235...|          384011.5|
+--------+--------------------+------------------+
only showing top 10 rows



In [71]:
// RMSE of the decision tree on the test split, for comparison with the training RMSE.
evaluator.evaluate(dtrPrediction)

59475.62025137472

### The above seems to be slightly overfitting. There is no regParam or elasticNetParam to tune. Research shows it is common for decision tree models to be over-grown and to overfit.  Pruning can help alleviate overfitting.  I added a minInstancesPerNode constraint, which seems to reduce variance and overfitting.

## Random Forest Regression Model Fit, Transform and Evaluate 

In [73]:
// Random forest regressor with a 512 MB histogram-aggregation budget and node-id caching.
val rfr = new RandomForestRegressor().
  setMaxMemoryInMB(512).
  setCacheNodeIds(true)

// Grid over bin count and tree depth; minInstancesPerNode is held at 20.
val pgRfr = new ParamGridBuilder().
  addGrid(rfr.maxBins, Array(8, 16, 32)).
  addGrid(rfr.maxDepth, Array(5, 8, 12, 16)).
  addGrid(rfr.minInstancesPerNode, Array(20)).
  build()

// 5-fold cross-validation scored with the RMSE evaluator defined earlier.
val cvRfr = new CrossValidator().
  setEstimator(rfr).
  setEvaluator(evaluator).
  setEstimatorParamMaps(pgRfr).
  setNumFolds(5).
  setSeed(42L)
val rfrModel = cvRfr.fit(train_set)
println(s"${rfrModel.bestModel.explainParams()}\n")

cacheNodeIds: If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. (default: false, current: true)
checkpointInterval: set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations (default: 10)
featureSubsetStrategy: The number of features to consider for splits at each tree node. Supported options: auto, all, onethird, sqrt, log2, (0.0-1.0], [1-n]. (default: auto)
featuresCol: features column name (default: features)
impurity: Criterion used for information gain calculation (case-insensitive). Supported options: variance (default: variance)
labelCol: label column name (default: label)
maxBins: Max number of bins for discretizing continuous features.  Must be >=2 and >= number of categories for any categorical feature. (default: 32, current: 32)
maxDepth: Maximum depth of the tree

In [74]:
// Pair every parameter combination with its average cross-validation RMSE, best (lowest) first.
rfrModel.getEstimatorParamMaps.
  zip(rfrModel.avgMetrics).
  sortBy { case (_, avgRmse) => avgRmse }

Array(({
	rfr_add3f40f2ffa-maxBins: 32,
	rfr_add3f40f2ffa-maxDepth: 16,
	rfr_add3f40f2ffa-minInstancesPerNode: 20
},53995.93034139974), ({
	rfr_add3f40f2ffa-maxBins: 32,
	rfr_add3f40f2ffa-maxDepth: 12,
	rfr_add3f40f2ffa-minInstancesPerNode: 20
},54234.685943040604), ({
	rfr_add3f40f2ffa-maxBins: 16,
	rfr_add3f40f2ffa-maxDepth: 16,
	rfr_add3f40f2ffa-minInstancesPerNode: 20
},55517.5368158844), ({
	rfr_add3f40f2ffa-maxBins: 16,
	rfr_add3f40f2ffa-maxDepth: 12,
	rfr_add3f40f2ffa-minInstancesPerNode: 20
},55754.37328374521), ({
	rfr_add3f40f2ffa-maxBins: 32,
	rfr_add3f40f2ffa-maxDepth: 8,
	rfr_add3f40f2ffa-minInstancesPerNode: 20
},57540.076427766646), ({
	rfr_add3f40f2ffa-maxBins: 16,
	rfr_add3f40f2ffa-maxDepth: 8,
	rfr_add3f40f2...

In [75]:
// RMSE of the cross-validated random forest on the training split.
evaluator.evaluate(rfrModel.transform(train_set))

47342.5567613887

In [76]:
// Score the held-out test split with the cross-validated random forest.
val rfrPrediction = rfrModel.transform(test_set)
rfrPrediction.show(10)

+--------+--------------------+------------------+
|   label|            features|        prediction|
+--------+--------------------+------------------+
|117400.0|[0.68863496282266...|150101.23952855787|
|320000.0|[-1.1630946629040...|353019.82845839777|
|113900.0|[0.31928824975972...|105523.13078073513|
| 83100.0|[-1.3527583678881...| 90497.28232335419|
|187600.0|[0.19450859459237...| 209087.4955315608|
| 91300.0|[-0.9484735036972...|114915.31531508919|
|110200.0|[0.65868906413382...|121627.91963113724|
|308700.0|[-1.2529361669434...|235375.40572882886|
|203800.0|[0.53390940896647...|175043.05214427217|
|500001.0|[0.60378571122235...|392332.21383996226|
+--------+--------------------+------------------+
only showing top 10 rows



In [77]:
// RMSE of the random forest on the test split, for comparison with the training RMSE.
evaluator.evaluate(rfrPrediction)

52057.414977450644

### The above is still slightly overfitting, but it is better than before (with 10 numTrees and no minInstancesPerNode there was a 30K+ difference in the metric values). Increasing numTrees helps stabilize the metrics.

In [78]:
// Pull the per-feature importance scores out of the best random forest found
// by cross-validation; positions match the order of the "features" vector.
val imp_features_arr = {
  val bestForest = rfrModel.bestModel.asInstanceOf[RandomForestRegressionModel]
  bestForest.featureImportances.toArray
}
imp_features_arr

Array(0.060573039498378244, 0.04765862892710125, 0.038085786043434794, 0.006481938131746838, 0.006393222902016197, 0.007132552001144698, 0.005379025230251163, 0.3089598383060173, 0.054836496090242694, 0.10789750310754889, 0.1065564500282175, 0.006052499578144016, 0.23744360585703922, 0.004638894234662272, 0.0019105200640549137, 0.0)

In [80]:
val fns = housing_temp.schema.fieldNames
// Rebuild the feature names in the order the feature vector was assembled:
// the first 8 numeric columns, the 3 derived ratio columns (positions 14-16),
// then one name per one-hot ocean_proximity slot.
// op_cat_arr holds Rows, so each is unwrapped to its String value; this keeps the
// result an Array[(String, Double)] instead of widening the names to a generic
// element type that prints as "[INLAND]".
val opCategoryNames = op_cat_arr.map(_.getString(0))
((fns.take(8) ++ fns.slice(14, 17) ++ opCategoryNames) zip imp_features_arr).
  sortBy(_._2)(Ordering[Double].reverse)

Array((median_income,0.3089598383060173), ([INLAND],0.23744360585703922), (population_per_household,0.10789750310754889), (bedrooms_per_room,0.1065564500282175), (longitude,0.060573039498378244), (room_per_household,0.054836496090242694), (latitude,0.04765862892710125), (housing_median_age,0.038085786043434794), (population,0.007132552001144698), (total_rooms,0.006481938131746838), (total_bedrooms,0.006393222902016197), ([<1H OCEAN],0.006052499578144016), (households,0.005379025230251163), ([NEAR OCEAN],0.004638894234662272), ([NEAR BAY],0.0019105200640549137), ([ISLAND],0.0))