# Machine Learning With Spark ML
In this lab assignment, you will complete a project by going through the following steps:
1. Get the data.
2. Discover the data to gain insights.
3. Prepare the data for Machine Learning algorithms.
4. Select a model and train it.
5. Fine-tune your model.
6. Present your solution.

As a dataset, we use the California Housing Prices dataset from the StatLib repository. This dataset was based on data from the 1990 California census. The dataset has the following columns:
1. `longitude`: a measure of how far west a house is (longitudes in California are negative, so a lower value is farther west)
2. `latitude`: a measure of how far north a house is (a higher value is farther north)
3. `housing_median_age`: median age of a house within a block (a lower number is a newer building)
4. `total_rooms`: total number of rooms within a block
5. `total_bedrooms`: total number of bedrooms within a block
6. `population`: total number of people residing within a block
7. `households`: total number of households, a group of people residing within a home unit, for a block
8. `median_income`: median income for households within a block of houses
9. `median_house_value`: median house value for households within a block
10. `ocean_proximity`: location of the house w.r.t ocean/sea

---
# 1. Get the data
Let's start the lab by loading the dataset. You can find the dataset at `data/housing.csv`. To infer column types automatically when reading the file, set the `inferSchema` option to true. Also enable the `header` option to read the column names from the file.

In [1]:
// TODO: Replace <FILL IN> with appropriate code

val housing = spark.read.option("inferSchema",true).option("header",true).format("csv").load("data/housing.csv")

housing = [longitude: double, latitude: double ... 8 more fields]


[longitude: double, latitude: double ... 8 more fields]

---
# 2. Discover the data to gain insights
Now it is time to take a look at the data. In this step we are going to examine the data in a few different ways:
* See the schema and dimension of the dataset
* Look at the data itself
* Print a statistical summary of the attributes
* Break down the data by the categorical attribute
* Find the correlation among different attributes
* Make new attributes by combining existing attributes

## 2.1. Schema and dimension
Print the schema of the dataset

In [2]:
// TODO: Replace <FILL IN> with appropriate code

housing.printSchema

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



Print the number of records in the dataset.

In [3]:
// TODO: Replace <FILL IN> with appropriate code

housing.count()

20640

## 2.2. Look at the data
Print the first five records of the dataset.

In [4]:
// TODO: Replace <FILL IN> with appropriate code

housing.take(5)

0,1,2,3,4,5,6,7,8,9
-122.23,37.88,41.0,880.0,129.0,322.0,126.0,8.3252,452600.0,NEAR BAY
-122.22,37.86,21.0,7099.0,1106.0,2401.0,1138.0,8.3014,358500.0,NEAR BAY
-122.24,37.85,52.0,1467.0,190.0,496.0,177.0,7.2574,352100.0,NEAR BAY
-122.25,37.85,52.0,1274.0,235.0,558.0,219.0,5.6431,341300.0,NEAR BAY
-122.25,37.85,52.0,1627.0,280.0,565.0,259.0,3.8462,342200.0,NEAR BAY


Print the number of records with population more than 10000.

In [5]:
// TODO: Replace <FILL IN> with appropriate code

// show() lists the matching records; call count() instead to get only their number
housing.filter("population > 10000").show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -121.92|   37.53|               7.0|    28258.0|        3864.0|   12203.0|    3701.0|       8.4045|          451100.0|      <1H OCEAN|
|  -117.78|   34.03|               8.0|    32054.0|        5290.0|   15507.0|    5050.0|       6.0191|          253900.0|      <1H OCEAN|
|  -117.87|   34.04|               7.0|    27700.0|        4179.0|   15037.0|    4072.0|       6.6288|          339700.0|      <1H OCEAN|
|  -117.88|   33.96|              16.0|    19059.0|        3079.0|   10988.0|    3061.0|       5.5469|          265200.0|      <1H OCEAN|
|  -118.78|   34.16|              

## 2.3. Statistical summary
Print a summary of the table statistics for the attributes `housing_median_age`, `total_rooms`, `median_house_value`, and `population`. You can use the `describe` command.

In [6]:
// TODO: Replace <FILL IN> with appropriate code

housing.describe("housing_median_age","total_rooms","median_house_value","population").show()

+-------+------------------+------------------+------------------+------------------+
|summary|housing_median_age|       total_rooms|median_house_value|        population|
+-------+------------------+------------------+------------------+------------------+
|  count|             20640|             20640|             20640|             20640|
|   mean|28.639486434108527|2635.7630813953488|206855.81690891474|1425.4767441860465|
| stddev| 12.58555761211163|2181.6152515827944|115395.61587441359|  1132.46212176534|
|    min|               1.0|               2.0|           14999.0|               3.0|
|    max|              52.0|           39320.0|          500001.0|           35682.0|
+-------+------------------+------------------+------------------+------------------+



Print the maximum age (`housing_median_age`), the minimum number of rooms (`total_rooms`), and the average of house values (`median_house_value`).

In [7]:
// TODO: Replace <FILL IN> with appropriate code

import org.apache.spark.sql.functions._

housing.select(max("housing_median_age"),min("total_rooms"),avg("median_house_value")).show()

+-----------------------+----------------+-----------------------+
|max(housing_median_age)|min(total_rooms)|avg(median_house_value)|
+-----------------------+----------------+-----------------------+
|                   52.0|             2.0|     206855.81690891474|
+-----------------------+----------------+-----------------------+



## 2.4. Break down the data by categorical attribute
Print the number of houses in different areas (`ocean_proximity`), and sort them in descending order.

In [8]:
// TODO: Replace <FILL IN> with appropriate code

housing.groupBy("ocean_proximity").agg(count("households")).orderBy(desc("count(households)")).show()

+---------------+-----------------+
|ocean_proximity|count(households)|
+---------------+-----------------+
|      <1H OCEAN|             9136|
|         INLAND|             6551|
|     NEAR OCEAN|             2658|
|       NEAR BAY|             2290|
|         ISLAND|                5|
+---------------+-----------------+



Print the average value of the houses (`median_house_value`) in different areas (`ocean_proximity`), and call the new column `avg_value` when printing it.

In [9]:
// TODO: Replace <FILL IN> with appropriate code

housing.groupBy("ocean_proximity").agg(avg("median_house_value")).withColumnRenamed("avg(median_house_value)", "avg_value").show()

+---------------+------------------+
|ocean_proximity|         avg_value|
+---------------+------------------+
|         ISLAND|          380440.0|
|     NEAR OCEAN|249433.97742663656|
|       NEAR BAY|259212.31179039303|
|      <1H OCEAN|240084.28546409807|
|         INLAND|124805.39200122119|
+---------------+------------------+



Rewrite the above query in SQL.

In [10]:
// TODO: Replace <FILL IN> with appropriate code

housing.createOrReplaceTempView("df")
spark.sql("""select ocean_proximity, avg(median_house_value) as avg_value from df group by ocean_proximity""").show()

+---------------+------------------+
|ocean_proximity|         avg_value|
+---------------+------------------+
|         ISLAND|          380440.0|
|     NEAR OCEAN|249433.97742663656|
|       NEAR BAY|259212.31179039303|
|      <1H OCEAN|240084.28546409807|
|         INLAND|124805.39200122119|
+---------------+------------------+



## 2.5. Correlation among attributes
Print the correlation among the attributes `housing_median_age`, `total_rooms`, `median_house_value`, and `population`. To do so, first you need to put these attributes into one vector. Then, compute the standard correlation coefficient (Pearson) between every pair of attributes in this new vector. To make a vector of these attributes, you can use the `VectorAssembler` Transformer.

In [11]:
// TODO: Replace <FILL IN> with appropriate code

import org.apache.spark.ml.feature.VectorAssembler

val va = new VectorAssembler().setInputCols(Array("housing_median_age","total_rooms","median_house_value","population")).setOutputCol("attributes")

val housingAttrs = va.transform(housing)

housingAttrs.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+--------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|          attributes|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+--------------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|[41.0,880.0,45260...|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|[21.0,7099.0,3585...|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|[52.0,1467.0,3521...|
|  -122.25|   37.85|              52.0|     12

va = vecAssembler_8e0af4e0c709
housingAttrs = [longitude: double, latitude: double ... 9 more fields]


[longitude: double, latitude: double ... 9 more fields]

In [12]:
// TODO: Replace <FILL IN> with appropriate code

import org.apache.spark.ml.linalg.Matrix
import org.apache.spark.ml.stat.Correlation
import org.apache.spark.sql.Row

val Row(coeff: Matrix) = Correlation.corr(housingAttrs,"attributes").head

println(s"The standard correlation coefficient:\n ${coeff}")

The standard correlation coefficient:
 1.0                   -0.36126220122231784  0.10562341249318154   -0.2962442397735293   
-0.36126220122231784  1.0                   0.13415311380654338   0.8571259728659772    
0.10562341249318154   0.13415311380654338   1.0                   -0.02464967888891235  
-0.2962442397735293   0.8571259728659772    -0.02464967888891235  1.0                   


coeff = 


1.0                   -0.36126220122231784  0.10562341249318154   -0.2962442397735293   
-0.36126220122231784  1.0                   0.13415311380654338   0.8571259728659772    
0.10562341249318154   0.13415311380654338   1.0                   -0.02464967888891235  
-0.2962442397735293   0.8571259728659772    -0.02464967888891235  1.0                   
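
To make the numbers in the matrix above easier to interpret, here is a minimal sketch of what a Pearson coefficient computes for a single pair of attributes. It uses plain Scala collections rather than Spark; the function name `pearson` and the toy values are assumptions made for illustration and are not part of the lab code or the dataset.

```scala
// Pearson correlation coefficient of two equally long, non-constant sequences.
def pearson(xs: Seq[Double], ys: Seq[Double]): Double = {
  require(xs.nonEmpty && xs.length == ys.length, "inputs must be non-empty and equally long")
  val meanX = xs.sum / xs.length
  val meanY = ys.sum / ys.length
  // Sum of products of deviations (the covariance numerator)
  val cov = xs.zip(ys).map { case (x, y) => (x - meanX) * (y - meanY) }.sum
  // Sums of squared deviations for each variable
  val ssX = xs.map(x => (x - meanX) * (x - meanX)).sum
  val ssY = ys.map(y => (y - meanY) * (y - meanY)).sum
  cov / math.sqrt(ssX * ssY)
}

// Toy values, made up for illustration (not taken from housing.csv)
val toyRooms      = Seq(1.0, 2.0, 3.0, 4.0)
val toyPopulation = Seq(2.0, 4.0, 6.0, 8.0)
val toyAge        = Seq(8.0, 6.0, 4.0, 2.0)

println(pearson(toyRooms, toyPopulation)) // perfectly increasing linear relation
println(pearson(toyRooms, toyAge))        // perfectly decreasing linear relation
```

A coefficient close to 1 means the two attributes grow together, close to -1 means one shrinks as the other grows, and close to 0 means there is no linear relation. In the matrix above, for instance, `total_rooms` and `population` are strongly positively correlated.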

## 2.6. Combine and make new attributes
Now, let's try out various attribute combinations. In the given dataset, the total number of rooms in a block is not very useful if we don't know how many households there are. What we really want is the number of rooms per household. Similarly, the total number of bedrooms by itself is not very useful, and we want to compare it to the number of rooms. The population per household also seems like an interesting attribute combination to look at. To do so, add the three new columns below to the dataset. We will call the new dataset `housingExtra`.
```
rooms_per_household = total_rooms / households
bedrooms_per_room = total_bedrooms / total_rooms
population_per_household = population / households
```

In [13]:
// TODO: Replace <FILL IN> with appropriate code

val housingCol1 = housing.withColumn("rooms_per_household",housing("total_rooms")/housing("households"))
val housingCol2 = housingCol1.withColumn("bedrooms_per_room",housing("total_bedrooms") / housing("total_rooms"))
val housingExtra = housingCol2.withColumn("population_per_household",housing("population") / housing("households"))

housingExtra.select("rooms_per_household", "bedrooms_per_room", "population_per_household").show(5)

+-------------------+-------------------+------------------------+
|rooms_per_household|  bedrooms_per_room|population_per_household|
+-------------------+-------------------+------------------------+
|  6.984126984126984|0.14659090909090908|      2.5555555555555554|
|  6.238137082601054|0.15579659106916466|       2.109841827768014|
|  8.288135593220339|0.12951601908657123|      2.8022598870056497|
| 5.8173515981735155|0.18445839874411302|       2.547945205479452|
|  6.281853281853282| 0.1720958819913952|      2.1814671814671813|
+-------------------+-------------------+------------------------+
only showing top 5 rows



housingCol1 = [longitude: double, latitude: double ... 9 more fields]
housingCol2 = [longitude: double, latitude: double ... 10 more fields]
housingExtra = [longitude: double, latitude: double ... 11 more fields]


[longitude: double, latitude: double ... 11 more fields]
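
As a quick sanity check of the three formulas, the sketch below recomputes them by hand for the first record printed in section 2.2, using plain Scala instead of Spark. The case class `ToyBlock` and its field names are hypothetical helpers introduced only for this illustration.

```scala
// One block of houses, with only the fields needed for the derived ratios.
final case class ToyBlock(totalRooms: Double, totalBedrooms: Double,
                          population: Double, households: Double) {
  def roomsPerHousehold: Double      = totalRooms / households
  def bedroomsPerRoom: Double        = totalBedrooms / totalRooms
  def populationPerHousehold: Double = population / households
}

// Values copied from the first record printed in section 2.2
val firstBlock = ToyBlock(totalRooms = 880.0, totalBedrooms = 129.0,
                          population = 322.0, households = 126.0)

println(firstBlock.roomsPerHousehold)
println(firstBlock.bedroomsPerRoom)
println(firstBlock.populationPerHousehold)
```

The three printed values should agree with the first row of the `housingExtra` output above.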

---
# 3. Prepare the data for Machine Learning algorithms
Before going through the Machine Learning steps, let's first rename the label column from `median_house_value` to `label`.

In [14]:
// TODO: Replace <FILL IN> with appropriate code

val renamedHousing = housingExtra.withColumnRenamed("median_house_value","label")

renamedHousing = [longitude: double, latitude: double ... 11 more fields]


[longitude: double, latitude: double ... 11 more fields]

Now, we want to separate the numerical attributes from the categorical attribute (`ocean_proximity`) and keep their column names in two different lists. Moreover, since we don't want to apply the same transformations to the predictors (features) and the label, we should remove the label attribute from the list of predictors.

In [15]:
// label columns
val colLabel = "label"

// categorical columns
val colCat = "ocean_proximity"

// numerical columns
val colNum = renamedHousing.columns.filter(_ != colLabel).filter(_ != colCat)

colLabel = label
colCat = ocean_proximity
colNum = Array(longitude, latitude, housing_median_age, total_rooms, total_bedrooms, population, households, median_income, rooms_per_household, bedrooms_per_room, population_per_household)


[longitude, latitude, housing_median_age, total_rooms, total_bedrooms, population, households, median_income, rooms_per_household, bedrooms_per_room, population_per_household]

## 3.1. Prepare continuous attributes
### Data cleaning
Most Machine Learning algorithms cannot work with missing features, so we should take care of them. As a first step, let's find the columns with missing values among the numerical attributes. To do so, we can print the number of missing values of each continuous attribute listed in `colNum`.

In [16]:
// TODO: Replace <FILL IN> with appropriate code

for (c <- colNum) {
    renamedHousing.select(c).agg(sum(when(col(c).isNotNull,0).otherwise(1))).show()
    //renamedHousing.groupBy(c).agg(count("*").alias("numOfNull")).where(col(c).isNull).show()
}

+--------------------------------------------------------+
|sum(CASE WHEN (longitude IS NOT NULL) THEN 0 ELSE 1 END)|
+--------------------------------------------------------+
|                                                       0|
+--------------------------------------------------------+

+-------------------------------------------------------+
|sum(CASE WHEN (latitude IS NOT NULL) THEN 0 ELSE 1 END)|
+-------------------------------------------------------+
|                                                      0|
+-------------------------------------------------------+

+-----------------------------------------------------------------+
|sum(CASE WHEN (housing_median_age IS NOT NULL) THEN 0 ELSE 1 END)|
+-----------------------------------------------------------------+
|                                                                0|
+-----------------------------------------------------------------+

+----------------------------------------------------------+
|sum(CASE W

As we observed above, the `total_bedrooms` and `bedrooms_per_room` attributes have some missing values. One way to take care of missing values is to use the `Imputer` Estimator, which completes missing values in a dataset using either the mean or the median of the columns in which the missing values are located. To use it, you need to create an `Imputer` instance, specifying that you want to replace each attribute's missing values with the "median" of that attribute.

In [17]:
// TODO: Replace <FILL IN> with appropriate code

import org.apache.spark.ml.feature.Imputer

val imputer = new Imputer().setStrategy("median").setInputCols(Array("total_bedrooms","bedrooms_per_room")).setOutputCols(Array("total_bedrooms","bedrooms_per_room"))
val imputedHousing = imputer.fit(renamedHousing).transform(renamedHousing)

imputedHousing.select("total_bedrooms", "bedrooms_per_room").show(5)

+--------------+-------------------+
|total_bedrooms|  bedrooms_per_room|
+--------------+-------------------+
|         129.0|0.14659090909090908|
|        1106.0|0.15579659106916466|
|         190.0|0.12951601908657123|
|         235.0|0.18445839874411302|
|         280.0| 0.1720958819913952|
+--------------+-------------------+
only showing top 5 rows



imputer = imputer_cfd2b3eb0c01
imputedHousing = [longitude: double, latitude: double ... 11 more fields]


[longitude: double, latitude: double ... 11 more fields]
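
To illustrate the idea behind median imputation, here is a small sketch in plain Scala. It is not how Spark's `Imputer` is implemented (Spark may, for example, compute the median approximately on large data); the function name `imputeMedian`, the use of `Option` for missing entries, and the toy column are all assumptions made for this example.

```scala
// Replace missing entries (None) with the exact median of the observed values.
def imputeMedian(column: Seq[Option[Double]]): Seq[Double] = {
  val observed = column.flatten.sorted
  require(observed.nonEmpty, "cannot impute a column without observed values")
  val mid = observed.length / 2
  val median =
    if (observed.length % 2 == 1) observed(mid)     // odd count: middle element
    else (observed(mid - 1) + observed(mid)) / 2.0  // even count: mean of the two middle elements
  column.map(_.getOrElse(median))
}

// Toy column with one missing value (made up for illustration)
val toyBedrooms = Seq(Some(129.0), None, Some(190.0), Some(235.0))
println(imputeMedian(toyBedrooms)) // List(129.0, 190.0, 190.0, 235.0)
```

The median is often preferred over the mean for skewed attributes, because a few very large blocks do not pull it upward.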

### Scaling
One of the most important transformations you need to apply to your data is feature scaling. With few exceptions, Machine Learning algorithms don't perform well when the input numerical attributes have very different scales. This is the case for the housing data: the total number of rooms ranges from 2 to 39,320, while the median incomes only range from about 0.5 to 15. Note that scaling the label attribute is generally not required.

One way to get all attributes to have the same scale is to use standardization. Standardization first subtracts the mean from each value (so standardized values always have a zero mean), and then divides by the standard deviation so that the resulting distribution has unit variance. To do this, we can use the `StandardScaler` Estimator. To use `StandardScaler`, we again need to convert all the numerical attributes into one big vector of features using `VectorAssembler`, and then call `StandardScaler` on that vector. Note that the cell below calls `setWithMean(false)`, so it only divides by the standard deviation; pass `true` to also center the values.

In [18]:
// TODO: Replace <FILL IN> with appropriate code

import org.apache.spark.ml.feature.{VectorAssembler, StandardScaler}

val va = new VectorAssembler().setInputCols(colNum).setOutputCol("numericalFeatures")
val featuredHousing = va.transform(imputedHousing)

val scaler = new StandardScaler().setInputCol("numericalFeatures").setOutputCol("scaledFeatures").setWithStd(true).setWithMean(false)
val scaledHousing = scaler.fit(featuredHousing).transform(featuredHousing)

scaledHousing.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+--------+---------------+-------------------+-------------------+------------------------+--------------------+--------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|   label|ocean_proximity|rooms_per_household|  bedrooms_per_room|population_per_household|   numericalFeatures|      scaledFeatures|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+--------+---------------+-------------------+-------------------+------------------------+--------------------+--------------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|452600.0|       NEAR BAY|  6.984126984126984|0.14659090909090908|      2.5555555555555554|[-122.23,37.88,41...|[-61.007269596069...|
|  -122.22|   37.86|              21.0|     7099.0|        1

va = vecAssembler_1ca89bb51a73
featuredHousing = [longitude: double, latitude: double ... 12 more fields]
scaler = stdScal_97390b9bc023
scaledHousing = [longitude: double, latitude: double ... 13 more fields]


[longitude: double, latitude: double ... 13 more fields]
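
The effect of the two scaler flags can be sketched on a single attribute in plain Scala. This is only an illustration, not Spark's implementation: the function name `standardize` and the toy values are assumptions, and the sketch uses the sample standard deviation (dividing by n - 1), which to my understanding is also what `StandardScaler` uses.

```scala
// Standardize one attribute: optionally subtract the mean, then divide by the
// sample standard deviation (computed with n - 1 in the denominator).
def standardize(xs: Seq[Double], withMean: Boolean): Seq[Double] = {
  require(xs.length > 1, "need at least two values to compute a standard deviation")
  val mean = xs.sum / xs.length
  val std  = math.sqrt(xs.map(x => (x - mean) * (x - mean)).sum / (xs.length - 1))
  xs.map(x => (if (withMean) x - mean else x) / std)
}

// Toy attribute values (made up for illustration)
val toyIncomes = Seq(2.0, 4.0, 6.0)

println(standardize(toyIncomes, withMean = true))  // List(-1.0, 0.0, 1.0)
println(standardize(toyIncomes, withMean = false)) // List(1.0, 2.0, 3.0)
```

With `withMean = false` the values keep their sign and offset, which is why the scaled longitudes in the output above are still large negative numbers.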

## 3.2. Prepare categorical attributes
After imputing and scaling the continuous attributes, we should take care of the categorical attributes. Let's first print the distinct values of the categorical attribute `ocean_proximity` to see how many there are.

In [19]:
// TODO: Replace <FILL IN> with appropriate code

renamedHousing.select("ocean_proximity").distinct.show

+---------------+
|ocean_proximity|
+---------------+
|         ISLAND|
|     NEAR OCEAN|
|       NEAR BAY|
|      <1H OCEAN|
|         INLAND|
+---------------+



### String indexer
Most Machine Learning algorithms prefer to work with numbers. So let's convert the categorical attribute `ocean_proximity` to numbers. To do so, we can use the `StringIndexer` that encodes a string column of labels to a column of label indices. The indices are in [0, numLabels), ordered by label frequencies, so the most frequent label gets index 0.

In [20]:
// TODO: Replace <FILL IN> with appropriate code

import org.apache.spark.ml.feature.StringIndexer

val indexer = new StringIndexer().setInputCol("ocean_proximity").setOutputCol("indexed_ocean_proximity")
val idxHousing = indexer.fit(renamedHousing).transform(renamedHousing)

idxHousing.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+--------+---------------+-------------------+-------------------+------------------------+-----------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|   label|ocean_proximity|rooms_per_household|  bedrooms_per_room|population_per_household|indexed_ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+--------+---------------+-------------------+-------------------+------------------------+-----------------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|452600.0|       NEAR BAY|  6.984126984126984|0.14659090909090908|      2.5555555555555554|                    3.0|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|358500.0|       NEAR BAY|  6.2

indexer = strIdx_ecbe136af6fc
idxHousing = [longitude: double, latitude: double ... 12 more fields]


[longitude: double, latitude: double ... 12 more fields]

Now we can use this numerical data in any Machine Learning algorithm. You can look at the mapping that this encoder has learned using the `labels` method: "<1H OCEAN" is mapped to 0, "INLAND" is mapped to 1, etc.

In [21]:
indexer.fit(renamedHousing).labels

[<1H OCEAN, INLAND, NEAR OCEAN, NEAR BAY, ISLAND]
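
The frequency-based ordering described above can be sketched in plain Scala as follows. The function name `frequencyIndex` and the toy values are assumptions made for illustration; breaking ties alphabetically is also an assumption of this sketch and may differ from what a given Spark version does.

```scala
// Assign index 0 to the most frequent label, 1 to the next one, and so on.
// Ties are broken alphabetically (an assumption of this sketch).
def frequencyIndex(values: Seq[String]): Map[String, Int] =
  values.groupBy(identity).toSeq
    .sortBy { case (label, occurrences) => (-occurrences.size, label) }
    .map { case (label, _) => label }
    .zipWithIndex
    .toMap

// Toy column (made up for illustration)
val toyProximity = Seq("INLAND", "<1H OCEAN", "<1H OCEAN", "NEAR BAY", "INLAND", "<1H OCEAN")
val toyMapping   = frequencyIndex(toyProximity)

println(toyMapping("<1H OCEAN")) // 0, the most frequent label
println(toyMapping("INLAND"))    // 1
println(toyMapping("NEAR BAY"))  // 2
```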

### One-hot encoding
Now, convert the label indices built in the last step into one-hot vectors. To do this, you can take advantage of the `OneHotEncoderEstimator` Estimator (in Spark 3.0 and later, this class is named `OneHotEncoder`).

In [22]:
// TODO: Replace <FILL IN> with appropriate code

import org.apache.spark.ml.feature.OneHotEncoderEstimator

val encoder = new OneHotEncoderEstimator().setInputCols(Array("indexed_ocean_proximity")).setOutputCols(Array("encoded_ocean_proximity"))
val ohHousing = encoder.fit(idxHousing).transform(idxHousing)

ohHousing.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+--------+---------------+-------------------+-------------------+------------------------+-----------------------+-----------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|   label|ocean_proximity|rooms_per_household|  bedrooms_per_room|population_per_household|indexed_ocean_proximity|encoded_ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+--------+---------------+-------------------+-------------------+------------------------+-----------------------+-----------------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|452600.0|       NEAR BAY|  6.984126984126984|0.14659090909090908|      2.5555555555555554|                    3.0|          (4,[3],[1.0])|
|  -122.22|   37.86|              21

encoder = oneHotEncoder_b08f4a588f38
ohHousing = [longitude: double, latitude: double ... 13 more fields]


[longitude: double, latitude: double ... 13 more fields]
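
The encoded value `(4,[3],[1.0])` in the output above is a sparse vector of size 4 with a 1.0 at position 3. The size is 4 rather than 5 because the encoder drops the last category by default, so that category is represented by an all-zero vector. The following plain-Scala sketch mimics this behavior with dense vectors; the function name `oneHot` and its parameters are assumptions made for illustration.

```scala
// Dense one-hot vector for a category index. With dropLast = true the last
// category gets no position of its own and becomes the all-zero vector.
def oneHot(index: Int, numCategories: Int, dropLast: Boolean = true): Vector[Double] = {
  require(index >= 0 && index < numCategories, "index out of range")
  val size = if (dropLast) numCategories - 1 else numCategories
  Vector.tabulate(size)(i => if (i == index) 1.0 else 0.0)
}

println(oneHot(3, 5)) // Vector(0.0, 0.0, 0.0, 1.0), like NEAR BAY (index 3)
println(oneHot(4, 5)) // Vector(0.0, 0.0, 0.0, 0.0), like ISLAND (index 4, the last category)
```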

---
# 4. Pipeline
As you can see, there are many data transformation steps that need to be executed in the right order. For example, you called the `Imputer`, `VectorAssembler`, and `StandardScaler` from left to right. However, we can use the `Pipeline` class to define a sequence of Transformers/Estimators, and run them in order. A `Pipeline` is an `Estimator`, thus, after a Pipeline's `fit()` method runs, it produces a `PipelineModel`, which is a `Transformer`.

Now, let's create a pipeline called `numPipeline` to call the numerical transformers you built above (`imputer`, `va`, and `scaler`) in the right order from left to right, as well as a pipeline called `catPipeline` to call the categorical transformers (`indexer` and `encoder`). Then, put these two pipelines `numPipeline` and `catPipeline` into one pipeline.

In [23]:
// TODO: Replace <FILL IN> with appropriate code

import org.apache.spark.ml.{Pipeline, PipelineModel, PipelineStage}

val numPipeline = new Pipeline().setStages(Array(imputer,va,scaler))
val catPipeline = new Pipeline().setStages(Array(indexer,encoder))
val pipeline = new Pipeline().setStages(Array(numPipeline, catPipeline))
val newHousing = pipeline.fit(renamedHousing).transform(renamedHousing)

newHousing.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+--------+---------------+-------------------+-------------------+------------------------+--------------------+--------------------+-----------------------+-----------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|   label|ocean_proximity|rooms_per_household|  bedrooms_per_room|population_per_household|   numericalFeatures|      scaledFeatures|indexed_ocean_proximity|encoded_ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+--------+---------------+-------------------+-------------------+------------------------+--------------------+--------------------+-----------------------+-----------------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|452600.0|       NEAR BAY|  6.984126984126984|0.14

numPipeline = pipeline_eb72216c1cfc
catPipeline = pipeline_2f6b9bf3a724
pipeline = pipeline_373e3d1df1f6
newHousing = [longitude: double, latitude: double ... 15 more fields]


[longitude: double, latitude: double ... 15 more fields]
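
To see why the order of stages matters, here is a toy sketch of the fit-then-transform chaining idea in plain Scala. It is not Spark's `Pipeline` implementation: `ToyTransformer`, `ToyEstimator`, `Center`, `ScaleByMaxAbs`, and `fitPipeline` are hypothetical names introduced only for this illustration, and the data is a simple sequence of numbers instead of a DataFrame.

```scala
// Toy stand-ins for the Transformer and Estimator roles (not Spark classes).
trait ToyTransformer { def transform(data: Seq[Double]): Seq[Double] }
trait ToyEstimator   { def fit(data: Seq[Double]): ToyTransformer }

// Learns the mean during fit, then subtracts it.
object Center extends ToyEstimator {
  def fit(data: Seq[Double]): ToyTransformer = {
    val mean = data.sum / data.length
    new ToyTransformer { def transform(d: Seq[Double]): Seq[Double] = d.map(_ - mean) }
  }
}

// Learns the largest absolute value during fit, then divides by it.
object ScaleByMaxAbs extends ToyEstimator {
  def fit(data: Seq[Double]): ToyTransformer = {
    val maxAbs = data.map(x => math.abs(x)).max
    new ToyTransformer { def transform(d: Seq[Double]): Seq[Double] = d.map(_ / maxAbs) }
  }
}

// Fit each stage on the output of the stages before it, then chain the fitted models.
def fitPipeline(stages: Seq[ToyEstimator], data: Seq[Double]): ToyTransformer = {
  val (_, models) = stages.foldLeft((data, Vector.empty[ToyTransformer])) {
    case ((current, fitted), stage) =>
      val model = stage.fit(current)
      (model.transform(current), fitted :+ model)
  }
  new ToyTransformer {
    def transform(d: Seq[Double]): Seq[Double] = models.foldLeft(d)((acc, m) => m.transform(acc))
  }
}

val toyModel = fitPipeline(Seq(Center, ScaleByMaxAbs), Seq(2.0, 4.0, 6.0))
println(toyModel.transform(Seq(2.0, 4.0, 6.0))) // List(-1.0, 0.0, 1.0)
println(toyModel.transform(Seq(8.0)))           // List(2.0)
```

The second stage is fitted on the centered data, not on the raw data, which is the same reason the scaler in `numPipeline` has to come after the imputer and the assembler.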

Now, use `VectorAssembler` to put all attributes of the final dataset `newHousing` into a big vector, and call the new column `features`.

In [24]:
// TODO: Replace <FILL IN> with appropriate code

val va2 = new VectorAssembler().setInputCols(Array("scaledFeatures","encoded_ocean_proximity")).setOutputCol("features")
val dataset = va2.transform(newHousing).select("features", "label")

dataset.show(5)

+--------------------+--------+
|            features|   label|
+--------------------+--------+
|[-61.007269596069...|452600.0|
|[-61.002278409814...|358500.0|
|[-61.012260782324...|352100.0|
|[-61.017251968579...|341300.0|
|[-61.017251968579...|342200.0|
+--------------------+--------+
only showing top 5 rows



va2 = vecAssembler_8f978c6633ce
dataset = [features: vector, label: double]


[features: vector, label: double]

---
# 5. Make a model
Here we are going to make four different regression models:
* Linear regression model
* Decision tree regression
* Random forest regression
* Gradient-boosted tree regression

But before training a Machine Learning model, let's first split the data into a training dataset (`trainSet`) with 80% of the whole data and a test dataset (`testSet`) with the remaining 20%.

In [25]:
// TODO: Replace <FILL IN> with appropriate code

val Array(trainSet, testSet) = dataset.randomSplit(Array(0.8, 0.2))

trainSet = [features: vector, label: double]
testSet = [features: vector, label: double]


[features: vector, label: double]
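
Because the split above is random, the RMSE values reported in the following cells will vary from run to run unless a seed is fixed. The plain-Scala sketch below illustrates the idea of a reproducible split. It uses a different technique from Spark's `randomSplit` (it shuffles and cuts at an exact position, so the fractions are exact); the function name `trainTestSplit` and the toy rows are assumptions made for illustration.

```scala
import scala.util.Random

// Shuffle the rows with a seeded generator, then cut at the requested fraction.
def trainTestSplit[A](rows: Seq[A], trainFraction: Double, seed: Long): (Seq[A], Seq[A]) = {
  require(trainFraction >= 0.0 && trainFraction <= 1.0, "fraction must be in [0, 1]")
  val shuffled = new Random(seed).shuffle(rows)
  val cut = math.round(rows.length * trainFraction).toInt
  shuffled.splitAt(cut)
}

// Ten toy rows (made up for illustration)
val toyRows = (1 to 10).toSeq
val (toyTrain, toyTest) = trainTestSplit(toyRows, trainFraction = 0.8, seed = 42L)

println(s"train size = ${toyTrain.size}, test size = ${toyTest.size}")
```

Calling the function again with the same seed returns the same two subsets, which makes results comparable across runs.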

## 5.1. Linear regression model
Now, train a Linear Regression model using the `LinearRegression` class. Then, print the coefficients and intercept of the model, as well as the summary of the model over the training set by calling the `summary` method.

In [26]:
// TODO: Replace <FILL IN> with appropriate code

import org.apache.spark.ml.regression.LinearRegression

// train the model
val lr = new LinearRegression().setMaxIter(10).setRegParam(0.3).setElasticNetParam(0.8)
val lrModel = lr.fit(trainSet)
val trainingSummary = lrModel.summary

println(s"Coefficients: ${lrModel.coefficients}, Intercept: ${lrModel.intercept}")
println(s"RMSE: ${trainingSummary.rootMeanSquaredError}")

Coefficients: [-15556.506397732895,-12473.634154038255,15113.813744926687,8533.494106827868,13039.230784818687,-29805.91454292005,14083.1643892406,83530.39545687684,3943.7421796881536,19445.065502057052,-1050.0494313024044,18186.551818566506,-37627.691640125886,31270.937454777064,18379.168006061882], Intercept: -808157.4971628699
RMSE: 68664.97716947603


lr = linReg_39eeaf8d63cd
lrModel = linReg_39eeaf8d63cd
trainingSummary = org.apache.spark.ml.regression.LinearRegressionTrainingSummary@6ceecf92


org.apache.spark.ml.regression.LinearRegressionTrainingSummary@6ceecf92

Now, use `RegressionEvaluator` to measure the root-mean-square error (RMSE) of the model on the test dataset.

In [27]:
// TODO: Replace <FILL IN> with appropriate code

import org.apache.spark.ml.evaluation.RegressionEvaluator

// make predictions on the test data
val predictions = lrModel.transform(testSet)
predictions.select("prediction", "label", "features").show(5)

// select (prediction, true label) and compute test error.
val evaluator = new RegressionEvaluator().setMetricName("rmse")
val rmse = evaluator.evaluate(predictions)
println(s"Root Mean Squared Error (RMSE) on test data = $rmse")

+------------------+-------+--------------------+
|        prediction|  label|            features|
+------------------+-------+--------------------+
| 147259.7645971781|66900.0|[-61.995524474579...|
|164561.64518864825|68400.0|[-61.995524474579...|
|168478.51836781914|70000.0|[-61.985542102068...|
| 173984.0086774386|67000.0|[-61.980550915813...|
| 148994.2908594812|64600.0|[-61.980550915813...|
+------------------+-------+--------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 70368.95634488532


predictions = [features: vector, label: double ... 1 more field]
evaluator = regEval_fdc89edb8a91
rmse = 70368.95634488532


70368.95634488532
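
For reference, the RMSE metric itself is simple to compute by hand: it is the square root of the mean of the squared differences between predictions and labels. The sketch below shows this in plain Scala; the function name `rmseOf` and the toy numbers are assumptions made for illustration and are not taken from the model output.

```scala
// Root-mean-square error between predictions and true labels.
def rmseOf(predicted: Seq[Double], labels: Seq[Double]): Double = {
  require(predicted.nonEmpty && predicted.length == labels.length,
    "inputs must be non-empty and equally long")
  val squaredErrors = predicted.zip(labels).map { case (p, y) => (p - y) * (p - y) }
  math.sqrt(squaredErrors.sum / squaredErrors.length)
}

// Toy predictions that are each off by 10 (made up for illustration)
val toyPredicted = Seq(110.0, 90.0)
val toyLabels    = Seq(100.0, 100.0)

println(rmseOf(toyPredicted, toyLabels)) // 10.0
```

Since RMSE is expressed in the same unit as the label, a test RMSE of roughly 70,000 means the predictions are typically off by tens of thousands of dollars.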

## 5.2. Decision tree regression
Repeat what you did for the linear regression model to build a Decision Tree model. Use the `DecisionTreeRegressor` to make a model and then measure its RMSE on the test dataset.

In [28]:
// TODO: Replace <FILL IN> with appropriate code

import org.apache.spark.ml.regression.DecisionTreeRegressor
import org.apache.spark.ml.evaluation.RegressionEvaluator

val dt = new DecisionTreeRegressor().setLabelCol("label").setFeaturesCol("features")

// train the model
val dtModel = dt.fit(trainSet)

// make predictions on the test data
val predictions = dtModel.transform(testSet)
predictions.select("prediction", "label", "features").show(5)

// select (prediction, true label) and compute test error
val evaluator = new RegressionEvaluator().setMetricName("rmse")
val rmse = evaluator.evaluate(predictions)
println(s"Root Mean Squared Error (RMSE) on test data = $rmse")

+------------------+-------+--------------------+
|        prediction|  label|            features|
+------------------+-------+--------------------+
|138419.96238244514|66900.0|[-61.995524474579...|
|167078.95072308517|68400.0|[-61.995524474579...|
|146292.13857677902|70000.0|[-61.985542102068...|
|138419.96238244514|67000.0|[-61.980550915813...|
|138419.96238244514|64600.0|[-61.980550915813...|
+------------------+-------+--------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 69589.84468293576


dt = dtr_6b1f3dabdb3a
dtModel = DecisionTreeRegressionModel (uid=dtr_6b1f3dabdb3a) of depth 5 with 63 nodes
predictions = [features: vector, label: double ... 1 more field]
evaluator = regEval_dcd3ff4fd38c
rmse = 69589.84468293576


69589.84468293576

## 5.3. Random forest regression
Next, let's measure the test error of a Random Forest model. You can use `RandomForestRegressor` to build it.

In [29]:
// TODO: Replace <FILL IN> with appropriate code

import org.apache.spark.ml.regression.RandomForestRegressor
import org.apache.spark.ml.evaluation.RegressionEvaluator

val rf = new RandomForestRegressor().setNumTrees(10)

// train the model
val rfModel = rf.fit(trainSet)

// make predictions on the test data
val predictions = rfModel.transform(testSet)
predictions.select("prediction", "label", "features").show(5)

// select (prediction, true label) and compute test error
val evaluator = new RegressionEvaluator().setMetricName("rmse")
val rmse = evaluator.evaluate(predictions)
println(s"Root Mean Squared Error (RMSE) on test data = $rmse")

+------------------+-------+--------------------+
|        prediction|  label|            features|
+------------------+-------+--------------------+
| 166879.5319449747|66900.0|[-61.995524474579...|
|  171013.009593966|68400.0|[-61.995524474579...|
|173003.11859628579|70000.0|[-61.985542102068...|
|160469.74414201372|67000.0|[-61.980550915813...|
|160469.74414201372|64600.0|[-61.980550915813...|
+------------------+-------+--------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 67366.37099345516


rf = rfr_720cb0af7a3a
rfModel = RandomForestRegressionModel (uid=rfr_720cb0af7a3a) with 10 trees
predictions = [features: vector, label: double ... 1 more field]
evaluator = regEval_8f5e0261f62d
rmse = 67366.37099345516


67366.37099345516

## 5.4. Gradient-boosted tree regression
Finally, we want to build a Gradient-boosted Tree Regression model and measure its RMSE on the test data. Use `GBTRegressor` to build the model.

In [30]:
// TODO: Replace <FILL IN> with appropriate code

import org.apache.spark.ml.regression.GBTRegressor
import org.apache.spark.ml.evaluation.RegressionEvaluator

val gb = new GBTRegressor().setMaxIter(10).setFeatureSubsetStrategy("auto")

// train the model
val gbModel = gb.fit(trainSet)

// make predictions on the test data
val predictions = gbModel.transform(testSet)
predictions.select("prediction", "label", "features").show(5)

// select (prediction, true label) and compute test error
val evaluator = new RegressionEvaluator().setMetricName("rmse")
val rmse = evaluator.evaluate(predictions)
println(s"Root Mean Squared Error (RMSE) on test data = $rmse")

+------------------+-------+--------------------+
|        prediction|  label|            features|
+------------------+-------+--------------------+
|108889.41761252526|66900.0|[-61.995524474579...|
|144577.81726789603|68400.0|[-61.995524474579...|
| 84829.13112772178|70000.0|[-61.985542102068...|
|104290.17153440078|67000.0|[-61.980550915813...|
|  97491.8478314796|64600.0|[-61.980550915813...|
+------------------+-------+--------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 60653.91854580034


gb = gbtr_f630f363c76a
gbModel = GBTRegressionModel (uid=gbtr_f630f363c76a) with 10 trees
predictions = [features: vector, label: double ... 1 more field]
evaluator = regEval_155f5069b0e8
rmse = 60653.91854580034


60653.91854580034
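
To compare the four models side by side, the following sketch ranks the test RMSE values printed in this section. The numbers are copied from the cell outputs above; the names `testRmse`, `baseline`, and `reduction` are ours and only serve the example.

```scala
// Test-set RMSE values copied from the cell outputs of this section.
val testRmse = Seq(
  "Linear regression"      -> 70368.95634488532,
  "Decision tree"          -> 69589.84468293576,
  "Random forest"          -> 67366.37099345516,
  "Gradient-boosted trees" -> 60653.91854580034
)

// Use linear regression as the reference point for the relative reduction.
val baseline = testRmse.head._2
testRmse.sortBy(_._2).foreach { case (name, rmse) =>
  val reduction = 100.0 * (baseline - rmse) / baseline
  println(f"$name%-24s RMSE = $rmse%10.2f  (${reduction}%.1f%% below linear regression)")
}
```

All four models were trained with mostly default settings, so this ranking may change once the hyperparameters are tuned.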

In [31]:
println(gb.explainParams())

cacheNodeIds: If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. (default: false)
checkpointInterval: set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext (default: 10)
featureSubsetStrategy: The number of features to consider for splits at each tree node. Supported options: auto, all, onethird, sqrt, log2, (0.0-1.0], [1-n]. (default: all, current: auto)
featuresCol: features column name (default: features)
impurity: Criterion used for information gain calculation (case-insensitive). Supported options: variance (default: variance)
labelCol: label column name (default: label)
lossType: Loss function which GBT tries to minimize (case-insensitive). Supported options: squared, abso

---
# 6. Hyperparameter tuning
An important task in Machine Learning is model selection, or using data to find the best model or parameters for a given task. This is also called tuning. Tuning may be done for individual Estimators such as LinearRegression, or for entire Pipelines which include multiple algorithms, featurization, and other steps. Users can tune an entire Pipeline at once, rather than tuning each element in the Pipeline separately. MLlib supports model selection tools, such as `CrossValidator`. These tools require the following items:
* Estimator: algorithm or Pipeline to tune (`setEstimator`)
* Set of ParamMaps: parameters to choose from, sometimes called a "parameter grid" to search over (`setEstimatorParamMaps`)
* Evaluator: metric to measure how well a fitted Model does on held-out test data (`setEvaluator`)

`CrossValidator` begins by splitting the dataset into a set of folds, which are used as separate training and test datasets. For example with `k=3` folds, `CrossValidator` will generate 3 (training, test) dataset pairs, each of which uses 2/3 of the data for training and 1/3 for testing. To evaluate a particular `ParamMap`, `CrossValidator` computes the average evaluation metric for the 3 Models produced by fitting the Estimator on the 3 different (training, test) dataset pairs. After identifying the best `ParamMap`, `CrossValidator` finally re-fits the Estimator using the best ParamMap and the entire dataset.
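
The fold mechanics described above can be sketched in plain Scala. This is a simplified illustration and not Spark's implementation: it assigns rows to folds by index modulo `k`, whereas `CrossValidator` assigns them randomly. The helpers `kFoldPairs` and `averageMetric` are hypothetical names introduced for this example.

```scala
// Split row indices 0 until n into k folds; fold f is held out in pair f.
def kFoldPairs(n: Int, k: Int): Seq[(Seq[Int], Seq[Int])] = {
  require(k >= 2 && k <= n, "need 2 <= k <= n")
  val indices = 0 until n
  (0 until k).map { fold =>
    val (test, train) = indices.partition(_ % k == fold)
    (train, test)
  }
}

// Average a score over all (training, test) pairs. `score` stands in for
// "fit the Estimator on train, evaluate the fitted Model on test".
def averageMetric(pairs: Seq[(Seq[Int], Seq[Int])])(score: (Seq[Int], Seq[Int]) => Double): Double =
  pairs.map { case (train, test) => score(train, test) }.sum / pairs.size

val foldPairs = kFoldPairs(n = 9, k = 3)
foldPairs.foreach { case (train, test) =>
  println(s"train=${train.mkString(",")}  test=${test.mkString(",")}")
}
```

With `k = 3`, every row is used for testing exactly once and for training twice, which is why the averaged metric is a more stable estimate than a single split.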

Below, use `CrossValidator` to select the best Random Forest model. To do so, you need to define a grid of parameters. Let's search over different numbers of trees (1, 5, and 10) and different maximum tree depths (5, 10, and 15).

In [32]:
// TODO: Replace <FILL IN> with appropriate code

import org.apache.spark.ml.tuning.ParamGridBuilder
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.tuning.CrossValidator

val paramGrid = new ParamGridBuilder().addGrid(rf.numTrees,Array(1,5,10)).addGrid(rf.maxDepth,Array(5,10,15)).build()

val evaluator = new RegressionEvaluator().setLabelCol("label").setPredictionCol("prediction").setMetricName("rmse")
val cv = new CrossValidator().setEstimator(rf).setEstimatorParamMaps(paramGrid).setEvaluator(evaluator).setNumFolds(3)
val cvModel = cv.fit(trainSet)

val predictions = cvModel.transform(testSet)
predictions.select("prediction", "label", "features").show(5)

val rmse = evaluator.evaluate(predictions)
println(s"Root Mean Squared Error (RMSE) on test data = $rmse")

+-----------------+-------+--------------------+
|       prediction|  label|            features|
+-----------------+-------+--------------------+
|         109345.0|66900.0|[-61.995524474579...|
|         137495.0|68400.0|[-61.995524474579...|
|79485.14705882352|70000.0|[-61.985542102068...|
|79088.76427494074|67000.0|[-61.980550915813...|
|75913.45475113123|64600.0|[-61.980550915813...|
+-----------------+-------+--------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 53476.83867334043


paramGrid =

[{
	rfr_720cb0af7a3a-maxDepth: 5,
	rfr_720cb0af7a3a-numTrees: 1
}, {
	rfr_720cb0af7a3a-maxDepth: 5,
	rfr_720cb0af7a3a-numTrees: 5
}, {
	rfr_720cb0af7a3a-maxDepth: 5,
	rfr_720cb0af7a3a-numTrees: 10
}, {
	rfr_720cb0af7a3a-maxDepth: 10,
	rfr_720cb0af7a3a-numTrees: 1
}, {
	rfr_720cb0af7a3a-maxDepth: 10,
	rfr_720cb0af7a3a-numTrees: 5
}, {
	rfr_720cb0af7a3a-maxDepth: 10,
	rfr_720cb0af7a3a-numTrees: 10
}, {
	rfr_720cb0af7a3a-maxDepth: 15,
	rfr_720cb0af7a3a-numTrees: 1
}, {
	rfr_720cb0af7a3a-maxDepth: 15,
	rfr_720cb0af7a3a-numTrees: 5
}, {
	rfr_720cb0af7a3a-maxDepth: 15,
	rfr_720cb0af7a3a-numTrees: 10
}]
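
The printed grid is the Cartesian product of the candidate values. A small plain-Scala sketch, with plain `Map`s standing in for Spark's `ParamMap` (an assumption made to keep the example free of Spark), shows where the nine combinations come from and how much training the search implies.

```scala
// Candidate values from the grid above.
val numTreesValues = Seq(1, 5, 10)
val maxDepthValues = Seq(5, 10, 15)

// Every (maxDepth, numTrees) combination, in the order of the printed grid.
val grid = for {
  maxDepth <- maxDepthValues
  numTrees <- numTreesValues
} yield Map("maxDepth" -> maxDepth, "numTrees" -> numTrees)

val numFolds = 3
println(s"${grid.size} parameter combinations")
println(s"${grid.size * numFolds} fits during the search, plus one final refit on the best combination")
```

The cost grows multiplicatively with every parameter added to the grid, so it is usually worth starting with a coarse grid and refining it around the best combination.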

---
# 7. Custom transformer
At the end of part two, we added extra columns to the `housing` dataset. Here, we are going to implement a Transformer that does the same task. The Transformer should take the names of two input columns, `inputCol1` and `inputCol2`, as well as the name of an output column, `outputCol`. It then computes `inputCol1` divided by `inputCol2` and adds the result as a new column to the dataset. The details of implementing a custom Transformer are explained [here](https://www.oreilly.com/learning/extend-spark-ml-for-your-own-modeltransformer-types). Please read it before starting your implementation.

First, define the parameters of the Transformer and implement a method that validates the input schema (`StructType`) and appends the output column to it.

In [33]:
import org.apache.spark.sql.types.{StructField, StructType, DoubleType}
import org.apache.spark.ml.param.{ParamMap, Param, Params}

trait MyParams extends Params {
    final val inputCol1 = new Param[String](this, "inputCol1", "The input column1")
    final val inputCol2 = new Param[String](this, "inputCol2", "The input column2")
    final val outputCol = new Param[String](this, "outputCol", "The output column")
    
  protected def validateAndTransformSchema(schema: StructType): StructType = {
    // Check that both input columns are of type DoubleType
    val idx1 = schema.fieldIndex($(inputCol1))
    val field1 = schema.fields(idx1)
    if (field1.dataType != DoubleType) {
      throw new Exception(s"Input type ${field1.dataType} did not match input type DoubleType")
    }
    val idx2 = schema.fieldIndex($(inputCol2))
    val field2 = schema.fields(idx2)
    if (field2.dataType != DoubleType) {
      throw new Exception(s"Input type ${field2.dataType} did not match input type DoubleType")
    }
    // Add the return field
    schema.add(StructField($(outputCol), DoubleType, false))
  }
}


defined trait MyParams


Then, extend the class `Transformer` and implement setter functions for the input and output columns; call them `setInputCol1`, `setInputCol2`, and `setOutputCol`. Moreover, you need to override the methods `copy`, `transformSchema`, and `transform`. The details of what these methods need to cover are given [here](https://www.oreilly.com/learning/extend-spark-ml-for-your-own-modeltransformer-types).

In [34]:
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.{ParamMap, Param, Params}
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.functions.{col, udf}

class MyTransformer(override val uid: String) extends Transformer with MyParams {
    def this() = this(Identifiable.randomUID("configurablewordcount"))
    
    def setInputCol1(value: String) = set(inputCol1, value)
    
    def setInputCol2(value: String) = set(inputCol2, value)
    
    def setOutputCol(value: String) = set(outputCol, value)

    override def copy(extra: ParamMap) : MyTransformer = { 
        defaultCopy(extra)
    }
    
    override def transformSchema(schema: StructType):StructType={
        validateAndTransformSchema(schema)
    }
    
    override def transform(dataset: Dataset[_]): DataFrame = {
        // Same computation as the ratio columns added in part two, but with configurable column names
        dataset.withColumn($(outputCol), dataset($(inputCol1)) / dataset($(inputCol2)))
    }
}

defined class MyTransformer


Now, create an instance of `MyTransformer`, set the input columns to `total_rooms` and `households` and the output column to `rooms_per_household`, and run it over the `housing` dataset.

In [35]:
val myTransformer = new MyTransformer().setInputCol1("total_rooms").setInputCol2("households").setOutputCol("rooms_per_household")

val myDataset = myTransformer.transform(housing).select("rooms_per_household").show(5)

+-------------------+
|rooms_per_household|
+-------------------+
|  6.984126984126984|
|  6.238137082601054|
|  8.288135593220339|
| 5.8173515981735155|
|  6.281853281853282|
+-------------------+
only showing top 5 rows



myTransformer = configurablewordcount_2663b2044f89


myDataset: Unit = ()


configurablewordcount_2663b2044f89
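
To see the logic of the Transformer without any Spark machinery, here is a minimal sketch that models a row as a `Map` from column name to value. The `Record` alias, the `addRatioColumn` helper, and the sample values are all hypothetical; the values are illustrative and are not read from `data/housing.csv`.

```scala
type Record = Map[String, Double]

// Add `outputCol` = `inputCol1` / `inputCol2` to every row, failing fast on missing columns.
def addRatioColumn(rows: Seq[Record], inputCol1: String, inputCol2: String, outputCol: String): Seq[Record] =
  rows.map { row =>
    require(row.contains(inputCol1) && row.contains(inputCol2), s"missing $inputCol1 or $inputCol2")
    row + (outputCol -> row(inputCol1) / row(inputCol2))
  }

// Illustrative values only; they are not read from data/housing.csv.
val sampleRows: Seq[Record] = Seq(
  Map("total_rooms" -> 880.0, "households" -> 126.0),
  Map("total_rooms" -> 1000.0, "households" -> 250.0)
)

val withRatio = addRatioColumn(sampleRows, "total_rooms", "households", "rooms_per_household")
withRatio.foreach(row => println(row("rooms_per_household")))
```

The `require` call plays the role of `validateAndTransformSchema`: it rejects bad input before any value is computed.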

---
# 8. Custom estimator (predictor)
Now, it's time to implement your own linear regression with gradient descent algorithm as a brand new Estimator. The whole code of the Estimator is given to you, and you do not need to implement anything. It is just a sample that shows how to build a custom Estimator.

The gradient descent update for linear regression is:
$$
w_{i+1} = w_{i} - \alpha_{i} \sum\limits_{j=1}^n (w_i^\top x_j - y_j)x_j
$$

where $i$ is the iteration number of the gradient descent algorithm, and $j$ identifies the observation. Here, $w$ represents an array of weights that is the same size as the array of features and provides a weight for each of the features when finally computing the label prediction in the form:

$$
prediction = w^\top \cdot\ x
$$

where $w$ is the final array of weights computed by the gradient descent, $x$ is the array of features of the observation point and $prediction$ is the label we predict should be associated to this observation.
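
The update rule above is a gradient step on the squared-error loss. As a short derivation, assuming the conventional factor $\frac{1}{2}$ in the loss (the text above does not state the loss explicitly):

$$
L(w) = \frac{1}{2} \sum\limits_{j=1}^n (w^\top x_j - y_j)^2
\quad\Longrightarrow\quad
\nabla_w L(w) = \sum\limits_{j=1}^n (w^\top x_j - y_j)x_j
$$

Taking a step of size $\alpha_i$ against this gradient, evaluated at $w = w_i$, gives $w_{i+1} = w_i - \alpha_i \nabla_w L(w_i)$, which is exactly the update written above.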

The given `Helper` object implements the helper methods:
* `dot`: implements the dot product of two vectors and, as an overload, the product of a vector and a scalar
* `sumVectors`: implements addition of two vectors
* `fillVector`: creates a vector of a predefined size and initializes it with a predefined value

The methods of the Linear Regression class `LR` are:
* `calcRMSE`: computes the Root Mean Square Error of a given RDD of (label, prediction) tuples using the formula:
$$
rmse = \sqrt{\frac{\sum\limits_{j=1}^n (label_j - prediction_j)^2}{n}}
$$
* `gradientSummand`: computes the following formula:
$$
gs_{ij} = (w_i^\top x_j - y_j)x_j
$$
* `linregGradientDescent`: in each iteration, computes the following gradient and applies the update rule given earlier:
$$
gradient = \sum\limits_{j=1}^n gs_{ij}
$$
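
Before reading the Spark version, it may help to see the same algorithm on plain arrays. The sketch below is our own simplification, not the notebook's `LR` class: it works on a local `Seq` instead of an RDD and uses the decaying step size $\alpha_i = 1 / (n \sqrt{i + 1})$. The toy data is generated from $y = 2x$, an assumption made so that the expected weight is known.

```scala
// Dot product of two equally sized arrays.
def dot(a: Array[Double], b: Array[Double]): Double =
  a.zip(b).map { case (x, y) => x * y }.sum

// One term of the gradient: (w . x - y) * x
def gradientSummand(w: Array[Double], x: Array[Double], y: Double): Array[Double] = {
  val residual = dot(w, x) - y
  x.map(_ * residual)
}

// Returns the final weights and the training RMSE recorded before each update.
def gradientDescent(data: Seq[(Array[Double], Double)], numIters: Int): (Array[Double], Array[Double]) = {
  val n = data.size
  var w = Array.fill(data.head._1.length)(0.0)
  val errors = Array.fill(numIters)(0.0)
  for (i <- 0 until numIters) {
    errors(i) = math.sqrt(data.map { case (x, y) => math.pow(dot(w, x) - y, 2) }.sum / n)
    val gradient = data
      .map { case (x, y) => gradientSummand(w, x, y) }
      .reduce((g1, g2) => g1.zip(g2).map { case (a, b) => a + b })
    val alphaI = 1.0 / (n * math.sqrt(i + 1.0)) // step size shrinks as iterations progress
    w = w.zip(gradient).map { case (wi, gi) => wi - alphaI * gi }
  }
  (w, errors)
}

// Toy data generated from y = 2x (an assumption made for this example).
val toyData = Seq((Array(0.5), 1.0), (Array(1.0), 2.0), (Array(1.5), 3.0))
val (weights, trainingErrors) = gradientDescent(toyData, numIters = 50)
println(s"learned weight = ${weights(0)}, final training RMSE = ${trainingErrors.last}")
```

The feature values are deliberately small. With large unscaled features this step size can overshoot and diverge, which is one reason to scale the features before training.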

In [36]:
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.PredictorParams
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.util._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Row
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.linalg.Matrices
import org.apache.spark.mllib.evaluation.RegressionMetrics
import org.apache.spark.ml.{PredictionModel, Predictor}

case class Instance(label: Double, features: Vector)

object Helper extends Serializable {
  def dot(v1: Vector, v2: Vector): Double = {
    val m = Matrices.dense(1, v1.size, v1.toArray)
    m.multiply(v2).values(0)
  }

  def dot(v: Vector, s: Double): Vector = {
    val baseArray = v.toArray.map(vi => vi * s)
    Vectors.dense(baseArray)
  }

  def sumVectors(v1: Vector, v2: Vector): Vector = {
    val baseArray = ((v1.toArray) zip (v2.toArray)).map { case (val1, val2) => val1 + val2 }
    Vectors.dense(baseArray)
  }

  def fillVector(size: Int, fillVal: Double): Vector = Vectors.dense(Array.fill[Double](size)(fillVal));
}

defined class Instance
defined object Helper


In [37]:
// Repeat the imports in this cell so that the types resolve when the cell is compiled
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.mllib.evaluation.RegressionMetrics
import org.apache.spark.rdd.RDD

class LR() extends Serializable {
  // RMSE over (label, prediction) pairs; RMSE is symmetric, so the pair order does not matter
  def calcRMSE(labelsAndPreds: RDD[(Double, Double)]): Double = {
    val regressionMetrics = new RegressionMetrics(labelsAndPreds)
    regressionMetrics.rootMeanSquaredError
  }
  
  // One term of the gradient: (w . x - y) * x
  def gradientSummand(weights: Vector, lp: Instance): Vector = {
    val mult = (Helper.dot(weights, lp.features) - lp.label)
    val seq = (0 to lp.features.size - 1).map(i => lp.features(i) * mult)
    Vectors.dense(seq.toArray)
  }
  
  // Runs numIters gradient descent steps; returns the weights and the training RMSE per iteration
  def linregGradientDescent(trainData: RDD[Instance], numIters: Int): (Vector, Array[Double]) = {
    val n = trainData.count()
    val d = trainData.take(1)(0).features.size
    var w = Helper.fillVector(d, 0)
    val alpha = 1.0
    val errorTrain = Array.fill[Double](numIters)(0.0)

    for (i <- 0 until numIters) {
      // Record the training error of the current weights before updating them
      val labelsAndPredsTrain = trainData.map(lp => (lp.label, Helper.dot(w, lp.features)))
      errorTrain(i) = calcRMSE(labelsAndPredsTrain)

      // Sum the per-observation gradient terms, then step against the gradient
      val gradient = trainData.map(lp => gradientSummand(w, lp)).reduce((v1, v2) => Helper.sumVectors(v1, v2))
      val alpha_i = alpha / (n * scala.math.sqrt(i + 1))
      val wAux = Helper.dot(gradient, (-1) * alpha_i)
      w = Helper.sumVectors(w, wAux)
    }
    (w, errorTrain)
  }
}

In [38]:
// Repeat the imports in this cell so that the types resolve when the cell is compiled
import org.apache.spark.ml.PredictionModel
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.param.ParamMap

abstract class MyLinearModel[FeaturesType, Model <: MyLinearModel[FeaturesType, Model]]
  extends PredictionModel[FeaturesType, Model] {
}

class MyLinearModelImpl(override val uid: String, val weights: Vector, val trainingError: Array[Double])
    extends MyLinearModel[Vector, MyLinearModelImpl] {

  // defaultCopy requires a constructor that takes only `uid`, so copy explicitly instead
  override def copy(extra: ParamMap): MyLinearModelImpl =
    copyValues(new MyLinearModelImpl(uid, weights, trainingError), extra).setParent(parent)

  // The prediction is the dot product of the learned weights and the features
  def predict(features: Vector): Double = {
    println("Predicting")
    val prediction = Helper.dot(weights, features)
    prediction
  }
}

In [39]:
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.PredictorParams
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.util._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions._

import org.apache.spark.sql.Row
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.linalg.Matrices
import org.apache.spark.mllib.evaluation.RegressionMetrics
import org.apache.spark.ml.{PredictionModel, Predictor}

abstract class MyLinearRegression[
    FeaturesType,
    Learner <: MyLinearRegression[FeaturesType, Learner, Model],
    Model <: MyLinearModel[FeaturesType, Model]]
  extends Predictor[FeaturesType, Learner, Model] {
}

class MyLinearRegressionImpl(override val uid: String)
    extends MyLinearRegression[Vector, MyLinearRegressionImpl, MyLinearModelImpl] {
  def this() = this(Identifiable.randomUID("linReg"))

  override def copy(extra: ParamMap): MyLinearRegressionImpl = defaultCopy(extra)
  
  def train(dataset: Dataset[_]): MyLinearModelImpl = {
    println("Training")

    val numIters = 10

    val instances: RDD[Instance] = dataset.select(
      col($(labelCol)), col($(featuresCol))).rdd.map {
        case Row(label: Double, features: Vector) =>
          Instance(label, features)
      }

    val (weights, trainingError) = new LR().linregGradientDescent(instances, numIters)

    new MyLinearModelImpl(uid, weights, trainingError)
  }
}

In [40]:
import org.apache.spark.ml.evaluation.RegressionEvaluator

val lr = new MyLinearRegressionImpl().setLabelCol("label").setFeaturesCol("features")
val model = lr.fit(trainSet)
val predictions = model.transform(testSet)
predictions.select("prediction", "label", "features").show(5)

// Select (prediction, true label) and compute test error.
val evaluator = new RegressionEvaluator().setLabelCol("label").setPredictionCol("prediction").setMetricName("rmse")
val rmse = evaluator.evaluate(predictions)
println(s"Root Mean Squared Error (RMSE) on test data = $rmse")

---
# 9. An End-to-End Classification Test
As the last step, you are given a dataset called `data/ccdefault.csv`. The dataset represents defaults of credit card clients. It has 30,000 cases and 24 different attributes. More details about the dataset are available at `data/ccdefault.txt`. In this task you should build three models, compare their results, and decide which one is the best solution. Here are the suggested steps:
1. Load the data.
2. Carry out some exploratory analyses (e.g., how various features and the target variable are distributed).
3. Train a model to predict the target variable (risk of `default`).
  - Employ three different models (logistic regression, decision tree, and random forest).
  - Compare the models' performances (e.g., AUC).
  - Defend your choice of best model (e.g., what are the strengths and weaknesses of each of these models?).
4. What more would you do with this data? Anything to help you devise a better solution?
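
Since AUC is the suggested comparison metric, here is a minimal plain-Scala sketch of what it measures: the probability that a randomly chosen positive case receives a higher score than a randomly chosen negative case. The function `auc` and the sample scores are hypothetical and only illustrate the definition. In Spark, `BinaryClassificationEvaluator` with the `areaUnderROC` metric reports this quantity for a fitted model.

```scala
// AUC as the fraction of (positive, negative) pairs in which the positive
// has the higher score; ties count as one half. O(P * N), fine for a small example.
def auc(scoredLabels: Seq[(Double, Int)]): Double = {
  val positives = scoredLabels.collect { case (score, 1) => score }
  val negatives = scoredLabels.collect { case (score, 0) => score }
  require(positives.nonEmpty && negatives.nonEmpty, "need both classes")
  val wins = for (p <- positives; n <- negatives) yield {
    if (p > n) 1.0 else if (p == n) 0.5 else 0.0
  }
  wins.sum / (positives.size * negatives.size)
}

// Hypothetical (score, label) pairs; label 1 means default.
val scored = Seq((0.9, 1), (0.8, 0), (0.7, 1), (0.4, 0), (0.2, 0))
println(s"AUC = ${auc(scored)}")
```

An AUC of 0.5 corresponds to random ranking and 1.0 to perfect ranking. Unlike accuracy, it does not depend on a classification threshold.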

## 9.1 Load the file, explore the data
Load the data from the file `data/ccdefault.csv`.

In [41]:
val originalCC = spark.read.format("csv").option("inferSchema",true).option("header",true).load("data/ccdefault.csv")

originalCC = [ID: int, LIMIT_BAL: int ... 23 more fields]


[ID: int, LIMIT_BAL: int ... 23 more fields]

Print the schema and the number of records.

In [42]:
originalCC.printSchema
originalCC.count

root
 |-- ID: integer (nullable = true)
 |-- LIMIT_BAL: integer (nullable = true)
 |-- SEX: integer (nullable = true)
 |-- EDUCATION: integer (nullable = true)
 |-- MARRIAGE: integer (nullable = true)
 |-- AGE: integer (nullable = true)
 |-- PAY_0: integer (nullable = true)
 |-- PAY_2: integer (nullable = true)
 |-- PAY_3: integer (nullable = true)
 |-- PAY_4: integer (nullable = true)
 |-- PAY_5: integer (nullable = true)
 |-- PAY_6: integer (nullable = true)
 |-- BILL_AMT1: integer (nullable = true)
 |-- BILL_AMT2: integer (nullable = true)
 |-- BILL_AMT3: integer (nullable = true)
 |-- BILL_AMT4: integer (nullable = true)
 |-- BILL_AMT5: integer (nullable = true)
 |-- BILL_AMT6: integer (nullable = true)
 |-- PAY_AMT1: integer (nullable = true)
 |-- PAY_AMT2: integer (nullable = true)
 |-- PAY_AMT3: integer (nullable = true)
 |-- PAY_AMT4: integer (nullable = true)
 |-- PAY_AMT5: integer (nullable = true)
 |-- PAY_AMT6: integer (nullable = true)
 |-- DEFAULT: integer (nullable = tru

30000

Cache the data so we can read it efficiently from memory instead of reading it from disk every time we need it.  
Show the first 5 rows in order to get a glimpse of the data.

In [43]:
originalCC.cache()
originalCC.show(5)

+---+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+-------+
| ID|LIMIT_BAL|SEX|EDUCATION|MARRIAGE|AGE|PAY_0|PAY_2|PAY_3|PAY_4|PAY_5|PAY_6|BILL_AMT1|BILL_AMT2|BILL_AMT3|BILL_AMT4|BILL_AMT5|BILL_AMT6|PAY_AMT1|PAY_AMT2|PAY_AMT3|PAY_AMT4|PAY_AMT5|PAY_AMT6|DEFAULT|
+---+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+-------+
|  1|    20000|  2|        2|       1| 24|    2|    2|   -1|   -1|   -2|   -2|     3913|     3102|      689|        0|        0|        0|       0|     689|       0|       0|       0|       0|      1|
|  2|   120000|  2|        2|       2| 26|   -1|    2|    0|    0|    0|    2|     2682|     1725|     2682|     3272|     3455|     3261|       0|    1000|    1000|    1000|       0|    2000|    

Check whether any column contains NULL values.

In [44]:
import org.apache.spark.sql.functions._
for (c <- originalCC.columns){ originalCC.select(c).agg(sum(when(col(c).isNotNull,0).otherwise(1)).alias(s"$c: #ofNull")).show }

+-----------+
|ID: #ofNull|
+-----------+
|          0|
+-----------+

+------------------+
|LIMIT_BAL: #ofNull|
+------------------+
|                 0|
+------------------+

+------------+
|SEX: #ofNull|
+------------+
|           0|
+------------+

+------------------+
|EDUCATION: #ofNull|
+------------------+
|                 0|
+------------------+

+-----------------+
|MARRIAGE: #ofNull|
+-----------------+
|                0|
+-----------------+

+------------+
|AGE: #ofNull|
+------------+
|           0|
+------------+

+--------------+
|PAY_0: #ofNull|
+--------------+
|             0|
+--------------+

+--------------+
|PAY_2: #ofNull|
+--------------+
|             0|
+--------------+

+--------------+
|PAY_3: #ofNull|
+--------------+
|             0|
+--------------+

+--------------+
|PAY_4: #ofNull|
+--------------+
|             0|
+--------------+

+--------------+
|PAY_5: #ofNull|
+--------------+
|             0|
+--------------+

+--------------+
|PAY_6: #ofNull|


Summarize the `DEFAULT` column.

In [45]:
originalCC.describe("DEFAULT").show()
originalCC.select("DEFAULT").distinct().show()

+-------+-------------------+
|summary|            DEFAULT|
+-------+-------------------+
|  count|              30000|
|   mean|             0.2212|
| stddev|0.41506180569093254|
|    min|                  0|
|    max|                  1|
+-------+-------------------+

+-------+
|DEFAULT|
+-------+
|      1|
|      0|
+-------+



In [46]:
originalCC.filter("DEFAULT = 1").count()

6636
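
The two counts above show that the classes are imbalanced. A short arithmetic sketch (the variable names are ours) makes the consequence concrete: a model that always predicts "no default" already reaches a high accuracy, so accuracy alone is a weak yardstick for this dataset.

```scala
// Counts taken from the outputs above.
val total = 30000L
val defaults = 6636L

val defaultRate = defaults.toDouble / total       // matches the mean reported by describe()
val majorityBaselineAccuracy = 1.0 - defaultRate  // accuracy of always predicting "no default"

println(f"default rate = $defaultRate%.4f")
println(f"accuracy of always predicting 0 = $majorityBaselineAccuracy%.4f")
```

This is one reason to compare the models with a ranking metric such as AUC, or with precision and recall on the default class.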

## 9.2 Data preprocessing & feature engineering
From the explorations above, we conclude that all data is numerical (all `Int`) and that there are no NULL values.  
`featureCols`: array containing all features  
`catCols`: categorical features (treat a feature with fewer than 8 distinct values as categorical)  
`numCols`: numerical features  

We will drop the `ID` column as it is irrelevant for predicting `DEFAULT`.

In [47]:
val featureCols = originalCC.columns.filter( _ != "ID").filter( _ != "DEFAULT")

featureCols = Array(LIMIT_BAL, SEX, EDUCATION, MARRIAGE, AGE, PAY_0, PAY_2, PAY_3, PAY_4, PAY_5, PAY_6, BILL_AMT1, BILL_AMT2, BILL_AMT3, BILL_AMT4, BILL_AMT5, BILL_AMT6, PAY_AMT1, PAY_AMT2, PAY_AMT3, PAY_AMT4, PAY_AMT5, PAY_AMT6)


[LIMIT_BAL, SEX, EDUCATION, MARRIAGE, AGE, PAY_0, PAY_2, PAY_3, PAY_4, PAY_5, PAY_6, BILL_AMT1, BILL_AMT2, BILL_AMT3, BILL_AMT4, BILL_AMT5, BILL_AMT6, PAY_AMT1, PAY_AMT2, PAY_AMT3, PAY_AMT4, PAY_AMT5, PAY_AMT6]

In [48]:
val catCols = featureCols.filter(originalCC.select(_).distinct().count < 8)

catCols = Array(SEX, EDUCATION, MARRIAGE)


[SEX, EDUCATION, MARRIAGE]

In [49]:
val numCols = featureCols.filterNot(catCols.contains(_))

numCols = Array(LIMIT_BAL, AGE, PAY_0, PAY_2, PAY_3, PAY_4, PAY_5, PAY_6, BILL_AMT1, BILL_AMT2, BILL_AMT3, BILL_AMT4, BILL_AMT5, BILL_AMT6, PAY_AMT1, PAY_AMT2, PAY_AMT3, PAY_AMT4, PAY_AMT5, PAY_AMT6)


[LIMIT_BAL, AGE, PAY_0, PAY_2, PAY_3, PAY_4, PAY_5, PAY_6, BILL_AMT1, BILL_AMT2, BILL_AMT3, BILL_AMT4, BILL_AMT5, BILL_AMT6, PAY_AMT1, PAY_AMT2, PAY_AMT3, PAY_AMT4, PAY_AMT5, PAY_AMT6]

### 9.2.1 Prepare continuous features

Add a new column that assembles all numerical features into a vector `numericalFeatures`.  
Then declare a `StandardScaler` estimator.

In [50]:
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.feature.StandardScaler

val va = new VectorAssembler().setInputCols(numCols).setOutputCol("numericalFeatures")
val scaler = new StandardScaler().setInputCol("numericalFeatures").setOutputCol("scaledFeatures").setWithStd(true).setWithMean(false)


va = vecAssembler_17b2c36cd818
scaler = stdScal_009efd2f1170


stdScal_009efd2f1170
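
With `setWithStd(true)` and `setWithMean(false)`, each feature is divided by its standard deviation but not centered. The plain-Scala sketch below illustrates this for a single column. It assumes the corrected (n - 1) sample standard deviation and maps a constant column to zeros; the helper `scaleToUnitStd` and the sample values are hypothetical.

```scala
// Scale a column to unit standard deviation without centering it.
// Uses the corrected (n - 1) sample standard deviation.
def scaleToUnitStd(values: Seq[Double]): Seq[Double] = {
  require(values.size >= 2, "need at least two values")
  val mean = values.sum / values.size
  val variance = values.map(v => (v - mean) * (v - mean)).sum / (values.size - 1)
  val std = math.sqrt(variance)
  // A constant column has no spread; map it to zeros instead of dividing by zero.
  if (std == 0.0) values.map(_ => 0.0) else values.map(_ / std)
}

// Hypothetical credit limits, chosen only for illustration.
val limits = Seq(20000.0, 120000.0, 90000.0, 50000.0)
println(scaleToUnitStd(limits).mkString(", "))
```

Skipping the centering step keeps zero entries at zero, which preserves sparsity when the feature vectors are sparse.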

### 9.2.2 Prepare categorical features

In this dataset, we have ordinal variables like education and also nominal variables like sex. We will use One-Hot Encoding to convert all categorical variables into binary vectors. This encoding allows algorithms which expect continuous features, such as Logistic Regression, to use categorical features.  
`catCols` = Array(SEX, EDUCATION, MARRIAGE)
`encodedCatCols` = Array(encodedSEX, encodedEDUCATION, encodedMARRIAGE)
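As an illustration of what the encoder produces for one value, the sketch below encodes a category index as a binary vector. It assumes the "drop last" convention (the default of `OneHotEncoderEstimator`), where a column with k categories becomes k - 1 entries and the last category is the all-zeros vector. `OneHotSketch` and `encode` are hypothetical names, and the category counts in the usage note are only examples.

```scala
object OneHotSketch {
  // Encode a category index as a binary vector of length numCategories - 1.
  // The last category is represented by the all-zeros vector ("drop last").
  def encode(index: Int, numCategories: Int): Vector[Double] = {
    require(index >= 0 && index < numCategories, s"index $index out of range")
    Vector.tabulate(numCategories - 1)(i => if (i == index) 1.0 else 0.0)
  }
}
```

For a column with four categories, `OneHotSketch.encode(1, 4)` gives `Vector(0.0, 1.0, 0.0)` and `OneHotSketch.encode(3, 4)` gives the all-zeros vector of length 3.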

In [51]:
import org.apache.spark.ml.feature.OneHotEncoderEstimator

val encodedCatCols = Array("encodedSEX", "encodedEDUCATION", "encodedMARRIAGE")
val encoding = new OneHotEncoderEstimator().setInputCols(catCols).setOutputCols(encodedCatCols)
//val output2 = encoding.fit(scaledCC).transform(scaledCC)
val va2 = new VectorAssembler().setInputCols(encodedCatCols).setOutputCol("encodedFeatures")
//val output3 = va2.transform(output2)

//Put `scaledFeatures` and `encodedFeatures` into one vector `features`
val va3 = new VectorAssembler().setInputCols(Array("scaledFeatures","encodedFeatures")).setOutputCol("features")
//val output5 = va3.transform(output4)

encodedCatCols = Array(encodedSEX, encodedEDUCATION, encodedMARRIAGE)
encoding = oneHotEncoder_30e8d55a16c7
va2 = vecAssembler_140a2015083c
va3 = vecAssembler_08a29474b5ec


vecAssembler_08a29474b5ec

### 9.2.3 Pipeline
Since there are multiple estimators and transformations, we use a Pipeline to tie the stages together. This will simplify our code.

In [52]:
import org.apache.spark.ml.Pipeline

val pipeline = new Pipeline().setStages(Array(va,scaler,encoding,va2,va3))
val output = pipeline.fit(originalCC).transform(originalCC)
val processedCC = output.withColumn("label",output("DEFAULT").cast("double"))

pipeline = pipeline_542a3ef702bd
output = [ID: int, LIMIT_BAL: int ... 30 more fields]
processedCC = [ID: int, LIMIT_BAL: int ... 31 more fields]


[ID: int, LIMIT_BAL: int ... 31 more fields]

In [53]:
val dataset = processedCC.select("features","label")
dataset.show(5)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|(31,[0,1,2,3,4,5,...|  1.0|
|(31,[0,1,2,3,7,8,...|  1.0|
|(31,[0,1,8,9,10,1...|  0.0|
|(31,[0,1,8,9,10,1...|  0.0|
|(31,[0,1,2,4,8,9,...|  0.0|
+--------------------+-----+
only showing top 5 rows



dataset = [features: vector, label: double]


[features: vector, label: double]

In [54]:
dataset.select("features").head

[(31,[0,1,2,3,4,5,6,7,8,9,10,15,24,29],[0.15414535998894324,2.603628744963987,1.7796736791807803,1.6705842242124869,-0.8355143261989371,-0.8553305663148897,-1.7649331341008119,-1.7391491486204211,0.053139869207970765,0.0435834725779124,0.009935199510232218,0.029903384202815683,1.0,1.0])]

In [55]:
val Array(trainDataset, testDataset) = dataset.randomSplit(Array(0.8, 0.2))

trainDataset = [features: vector, label: double]
testDataset = [features: vector, label: double]


[features: vector, label: double]

In [56]:
trainDataset.count

23863

## 9.3 Make models
Here we are going to make three different models:
* Logistic regression
* Decision tree
* Random forest

Before we make any model, we divide the data into `trainDataset` and `testDataset` with a proportion of 0.8:0.2 (done above with `randomSplit`).

The attributes of the dataset used for all three models are the following:  
* Categorical features: `SEX`, `EDUCATION`, `MARRIAGE`   
* Continuous features: `LIMIT_BAL`, `AGE`, `PAY_0`, `PAY_2`, `PAY_3`, `PAY_4`, `PAY_5`, `PAY_6`, `BILL_AMT1`, `BILL_AMT2`, `BILL_AMT3`, `BILL_AMT4`, `BILL_AMT5`, `BILL_AMT6`, `PAY_AMT1`, `PAY_AMT2`, `PAY_AMT3`, `PAY_AMT4`, `PAY_AMT5`, `PAY_AMT6`
* Size of the `features` vector: 31
* Number of training rows: about 24000 (23863 in the run above, because `randomSplit` is random)
* Number of test rows: about 6000
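The training count recorded above (23863) is close to, but not exactly, 24000 because the split is random rather than exact. The plain-Scala sketch below mimics this behaviour under the assumption that each row is assigned to the training set independently with probability 0.8; `SplitSketch` is a hypothetical helper, not Spark's implementation. In Spark itself, passing a seed as the second argument of `randomSplit` makes the split reproducible.

```scala
import scala.util.Random

object SplitSketch {
  // Assign each row to the training set with probability trainFraction,
  // independently of the other rows. The same seed gives the same split.
  def randomSplit[A](rows: Seq[A], trainFraction: Double, seed: Long): (Vector[A], Vector[A]) = {
    val rng = new Random(seed)
    rows.toVector.partition(_ => rng.nextDouble() < trainFraction)
  }
}
```

With 30000 rows and a fraction of 0.8, the training size lands near 24000 but varies from seed to seed, which matches what the notebook shows.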

### 9.3.1 Logistic regression
We use `ParamGridBuilder` to tune the hyperparameters `maxIter`, `regParam`, and `elasticNetParam`, and `CrossValidator` with 3 folds to pick the best combination.
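The cost of this search grows quickly, so it helps to count the fits before running the cell. The sketch below builds the same grid as a plain Cartesian product and counts the model fits, assuming one fit per combination per fold plus one final refit of the best combination on the full training set. `GridSketch` and its methods are hypothetical names.

```scala
object GridSketch {
  // Cartesian product of the candidate values:
  // one tuple per (maxIter, regParam, elasticNetParam) combination.
  def grid(maxIters: Seq[Int],
           regParams: Seq[Double],
           elasticNets: Seq[Double]): Seq[(Int, Double, Double)] =
    for {
      m <- maxIters
      r <- regParams
      e <- elasticNets
    } yield (m, r, e)

  // Fits performed by k-fold cross-validation: one per combination per fold,
  // plus one final refit of the best combination on the full training set.
  def totalFits(numCombinations: Int, numFolds: Int): Int =
    numCombinations * numFolds + 1
}
```

For the grid used here (3 x 3 x 3 values, 3 folds) this gives 27 combinations and 82 fits, which explains why the cell takes a while to run.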

In [57]:
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.tuning.ParamGridBuilder
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.tuning.CrossValidator
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator

val lr = new LogisticRegression()//.setMaxIter(100).setRegParam(0.0001).setElasticNetParam(0.8)
val paramGrid = new ParamGridBuilder().addGrid(lr.maxIter,Array(10,100,1000))
                .addGrid(lr.regParam,Array(0.001,0.1,0.3))
                .addGrid(lr.elasticNetParam,Array(0.2,0.5,0.8))
                .build()
val evaluator = new BinaryClassificationEvaluator().setMetricName("areaUnderROC")//we will use Area Under ROC as the evaluation metric. This metric is applicable when one class (label) dominates the other. 
val cv = new CrossValidator().setEstimator(lr).setEstimatorParamMaps(paramGrid).setEvaluator(evaluator).setNumFolds(3)
val cvModel = cv.fit(trainDataset)

val Predictions = cvModel.transform(testDataset)
val AUC = evaluator.evaluate(Predictions)
println(s"AUC: $AUC")


val evaluator2 = new MulticlassClassificationEvaluator().setMetricName("accuracy")
val accuracy = evaluator2.evaluate(Predictions)
println(s"accuracy: $accuracy")

//val trainingSummary = lrModel.binarySummary
//println(s"threshold:${lrModel.getThreshold}")
//println(s"Coefficients: ${lrModel.coefficients} Intercept: ${lrModel.intercept}")
//println(s"areaUnderROC: ${trainingSummary.areaUnderROC}")
//println(s"accuracy: ${trainingSummary.accuracy}")
//println(s"weightedFMeasure: ${trainingSummary.weightedFMeasure}")
//println(s"precisionByLabel: ${trainingSummary.precisionByLabel.mkString(" and ")}")//use the mkString method to print and specify the separator between precision for each label.
//val objectiveHistory = trainingSummary.objectiveHistory
//println("objectiveHistory:")
//objectiveHistory.foreach(loss => println(loss)) //objective function (scaled loss + regularization) at each iteration.

AUC: 0.7291466302796468
accuracy: 0.8161968388463419


lr = logreg_a46f863c7dca
paramGrid = 


Array({
	logreg_a46f863c7dca-elasticNetParam: 0.2,
	logreg_a46f863c7dca-maxIter: 10,
	logreg_a46f863c7dca-regParam: 0.001
}, {
	logreg_a46f863c7dca-elasticNetParam: 0.2,
	logreg_a46f863c7dca-maxIter: 10,
	logreg_a46f863c7dca-regParam: 0.1
}, {
	logreg_a46f863c7dca-elasticNetParam: 0.2,
	logreg_a46f863c7dca-maxIter: 10,
	logreg_a46f863c7dca-regParam: 0.3
}, {
...



### 9.3.2 Decision tree
We use `ParamGridBuilder` to tune the hyperparameters `maxDepth` and `impurity`.
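The two `impurity` options, `gini` and `entropy`, measure how mixed the class labels in a tree node are. The following plain-Scala sketch computes both from per-class counts, using the textbook definitions with a base-2 logarithm for entropy. `ImpuritySketch` is a hypothetical helper and is not taken from Spark.

```scala
object ImpuritySketch {
  // Class proportions from raw class counts; empty classes are dropped.
  private def proportions(counts: Seq[Int]): Seq[Double] = {
    val total = counts.sum.toDouble
    require(total > 0, "need at least one sample")
    counts.filter(_ > 0).map(_ / total)
  }

  // Gini impurity: 1 - sum(p_i^2).
  def gini(counts: Seq[Int]): Double =
    1.0 - proportions(counts).map(p => p * p).sum

  // Entropy in bits: sum(-p_i * log2(p_i)).
  def entropy(counts: Seq[Int]): Double =
    proportions(counts).map(p => -p * math.log(p) / math.log(2)).sum
}
```

Both measures are 0 for a pure node and largest for an evenly mixed one, so a split is useful when it lowers the weighted impurity of the child nodes.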

In [58]:
import org.apache.spark.ml.classification.DecisionTreeClassifier
import org.apache.spark.ml.tuning.ParamGridBuilder
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.tuning.CrossValidator
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator

val dt = new DecisionTreeClassifier()//.setMaxDepth(30)
//val dtModel = dt.fit(trainDataset)
val evaluator = new BinaryClassificationEvaluator().setMetricName("areaUnderROC")
val paramGrid = new ParamGridBuilder().addGrid(dt.maxDepth,Array(10,20,30)).addGrid(dt.impurity,Array("entropy","gini")).build()
val cv = new CrossValidator().setEstimator(dt).setEstimatorParamMaps(paramGrid).setEvaluator(evaluator).setNumFolds(3)
val cvModel = cv.fit(trainDataset)

val Predictions = cvModel.transform(testDataset)
val AUC = evaluator.evaluate(Predictions)
println(s"AUC: $AUC")

val evaluator2 = new MulticlassClassificationEvaluator().setMetricName("accuracy")
val accuracy = evaluator2.evaluate(Predictions)
println(s"accuracy: $accuracy")

AUC: 0.6622839128593611
accuracy: 0.7471077073488676


dt = dtc_1dfc5dd7d417
evaluator = binEval_422684dcc23a
paramGrid = 


Array({
	dtc_1dfc5dd7d417-impurity: entropy,
	dtc_1dfc5dd7d417-maxDepth: 10
}, {
	dtc_1dfc5dd7d417-impurity: gini,
	dtc_1dfc5dd7d417-maxDepth: 10
}, {
	dtc_1dfc5dd7d417-impurity: entropy,
	dtc_1dfc5dd7d417-maxDepth: 20
}, {
	dtc_1dfc5dd7d417-impurity: gini,
	dt...


[{
	dtc_1dfc5dd7d417-impurity: entropy,
	dtc_1dfc5dd7d417-maxDepth: 10
}, {
	dtc_1dfc5dd7d417-impurity: gini,
	dtc_1dfc5dd7d417-maxDepth: 10
}, {
	dtc_1dfc5dd7d417-impurity: entropy,
	dtc_1dfc5dd7d417-maxDepth: 20
}, {
	dtc_1dfc5dd7d417-impurity: gini,
	dtc_1dfc5dd7d417-maxDepth: 20
}, {
	dtc_1dfc5dd7d417-impurity: entropy,
	dtc_1dfc5dd7d417-maxDepth: 30
}, {
	dtc_1dfc5dd7d417-impurity: gini,
	dtc_1dfc5dd7d417-maxDepth: 30
}]

### 9.3.3 Random forest 
We use `ParamGridBuilder` to tune the hyperparameters `maxDepth` and `numTrees`.

In [59]:
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.tuning.ParamGridBuilder
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.tuning.CrossValidator
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator

val rf = new RandomForestClassifier()//.setNumTrees(50).setMaxDepth(30)
//val rfModel = rf.fit(trainDataset)
val evaluator = new BinaryClassificationEvaluator().setMetricName("areaUnderROC")
val paramGrid = new ParamGridBuilder().addGrid(rf.maxDepth,Array(10,20,30)).addGrid(rf.numTrees,Array(25,50,100)).build()
val cv = new CrossValidator().setEstimator(rf).setEstimatorParamMaps(paramGrid).setEvaluator(evaluator).setNumFolds(3)
val cvModel = cv.fit(trainDataset)

val Predictions = cvModel.transform(testDataset)
val AUC = evaluator.evaluate(Predictions)
println(s"AUC: $AUC")

val evaluator2 = new MulticlassClassificationEvaluator().setMetricName("accuracy")
val accuracy = evaluator2.evaluate(Predictions)
println(s"accuracy: $accuracy")

AUC: 0.7877902727457997
accuracy: 0.8259736027374939


rf = rfc_af60664d385d
evaluator = binEval_b53427780732
paramGrid = 


Array({
	rfc_af60664d385d-maxDepth: 10,
	rfc_af60664d385d-numTrees: 25
}, {
	rfc_af60664d385d-maxDepth: 10,
	rfc_af60664d385d-numTrees: 50
}, {
	rfc_af60664d385d-maxDepth: 10,
	rfc_af60664d385d-numTrees: 100
}, {
	rfc_af60664d385d-maxDepth: 20,
	rfc_af60664d385...


[{
	rfc_af60664d385d-maxDepth: 10,
	rfc_af60664d385d-numTrees: 25
}, {
	rfc_af60664d385d-maxDepth: 10,
	rfc_af60664d385d-numTrees: 50
}, {
	rfc_af60664d385d-maxDepth: 10,
	rfc_af60664d385d-numTrees: 100
}, {
	rfc_af60664d385d-maxDepth: 20,
	rfc_af60664d385d-numTrees: 25
}, {
	rfc_af60664d385d-maxDepth: 20,
	rfc_af60664d385d-numTrees: 50
}, {
	rfc_af60664d385d-maxDepth: 20,
	rfc_af60664d385d-numTrees: 100
}, {
	rfc_af60664d385d-maxDepth: 30,
	rfc_af60664d385d-numTrees: 25
}, {
	rfc_af60664d385d-maxDepth: 30,
	rfc_af60664d385d-numTrees: 50
}, {
	rfc_af60664d385d-maxDepth: 30,
	rfc_af60664d385d-numTrees: 100
}]

## 9.4 Improvement
#### What more would you do with this data? Anything to help you devise a better solution?
- Treating `PAY_0`, `PAY_2` ... `PAY_6` as categorical features in addition to `SEX`, `EDUCATION`, `MARRIAGE`
- Adding interactions between numerical features
    * With degree 2, the total number of features is 334
    * With degree 3, the total number of features is 2358 
- Adding new features by interpreting existing features  
    * `Expense in one month` (`CON_1` ... `CON_5`) = `bill statement in one month` - `bill statement in previous month` + `payment in previous month`  
    * `Total expense in 5 months` (`TOTAL_CON`) = `CON_1` + `CON_2` + `CON_3` + `CON_4` + `CON_5`  
    * `Total payment in 5 months` (`TOTAL_PAY`) = `PAY_AMT1` + `PAY_AMT2` + `PAY_AMT3` + `PAY_AMT4` + `PAY_AMT5`  
    * `Repayment ability` = `Total expense in 5 months` / `Total payment in 5 months`  
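The feature counts for the polynomial interactions can be checked with a short calculation. Expanding n variables up to degree d yields C(n + d, d) - 1 terms (every monomial of degree 1 to d, without the constant). The sketch below assumes the 22 numerical columns used in this section and 59 one-hot entries; the 59 is not stated anywhere in the notebook and is inferred here from the recorded `features` size of 334. `PolySketch` is a hypothetical helper.

```scala
object PolySketch {
  // Binomial coefficient C(n, k), computed with BigInt to avoid overflow.
  def binomial(n: Int, k: Int): BigInt =
    (1 to k).foldLeft(BigInt(1))((acc, i) => acc * (n - k + i) / i)

  // Number of monomials of degree 1..degree in n variables:
  // C(n + degree, degree) - 1 (the constant term is excluded).
  def expandedSize(n: Int, degree: Int): BigInt =
    binomial(n + degree, degree) - 1
}
```

`PolySketch.expandedSize(22, 2)` is 275 and `PolySketch.expandedSize(22, 3)` is 2299; adding the 59 encoded entries gives 334 and 2358, so degree 3 makes the vector roughly seven times larger.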
    

In [1]:
import org.apache.spark.sql.functions._
val originalCC = spark.read.format("csv").option("inferSchema",true).option("header",true).load("data/ccdefault.csv")

originalCC = [ID: int, LIMIT_BAL: int ... 23 more fields]


[ID: int, LIMIT_BAL: int ... 23 more fields]

In [2]:
val addpay = originalCC.withColumn("PAY_0",when(col("PAY_0").lt(0),0).otherwise(col("PAY_0")))
val addpay2 = addpay.withColumn("PAY_2",when(col("PAY_2").lt(0),0).otherwise(col("PAY_2")))
val addpay3 = addpay2.withColumn("PAY_3",when(col("PAY_3").lt(0),0).otherwise(col("PAY_3")))
val addpay4 = addpay3.withColumn("PAY_4",when(col("PAY_4").lt(0),0).otherwise(col("PAY_4")))
val addpay5 = addpay4.withColumn("PAY_5",when(col("PAY_5").lt(0),0).otherwise(col("PAY_5")))
val addpay6 = addpay5.withColumn("PAY_6",when(col("PAY_6").lt(0),0).otherwise(col("PAY_6")))

addpay = [ID: int, LIMIT_BAL: int ... 23 more fields]
addpay2 = [ID: int, LIMIT_BAL: int ... 23 more fields]
addpay3 = [ID: int, LIMIT_BAL: int ... 23 more fields]
addpay4 = [ID: int, LIMIT_BAL: int ... 23 more fields]
addpay5 = [ID: int, LIMIT_BAL: int ... 23 more fields]
addpay6 = [ID: int, LIMIT_BAL: int ... 23 more fields]


[ID: int, LIMIT_BAL: int ... 23 more fields]

In [3]:
//
val credit1 = addpay6.withColumn("CON_1", $"BILL_AMT1" - $"BILL_AMT2" + $"PAY_AMT1") 
val credit2 = credit1.withColumn("CON_2", $"BILL_AMT2" - $"BILL_AMT3" + $"PAY_AMT2") 
val credit3 = credit2.withColumn("CON_3", $"BILL_AMT3" - $"BILL_AMT4" + $"PAY_AMT3") 
val credit4 = credit3.withColumn("CON_4", $"BILL_AMT4" - $"BILL_AMT5" + $"PAY_AMT4") 
val credit5 = credit4.withColumn("CON_5", $"BILL_AMT5" - $"BILL_AMT6" + $"PAY_AMT5") 
val credit6 = credit5.withColumn("TOTAL_CON", $"CON_1" + $"CON_2" + $"CON_3" + $"CON_4" + $"CON_5")
val credit7 = credit6.withColumn("TOTAL_PAY", $"PAY_AMT1" + $"PAY_AMT2" + $"PAY_AMT3" + $"PAY_AMT4" + $"PAY_AMT5")
val creditfinal = credit7.withColumn("REPAYMENT ABILITY", $"TOTAL_CON" / $"TOTAL_PAY")

credit1 = [ID: int, LIMIT_BAL: int ... 24 more fields]
credit2 = [ID: int, LIMIT_BAL: int ... 25 more fields]
credit3 = [ID: int, LIMIT_BAL: int ... 26 more fields]
credit4 = [ID: int, LIMIT_BAL: int ... 27 more fields]
credit5 = [ID: int, LIMIT_BAL: int ... 28 more fields]
credit6 = [ID: int, LIMIT_BAL: int ... 29 more fields]
credit7 = [ID: int, LIMIT_BAL: int ... 30 more fields]
creditfinal = [ID: int, LIMIT_BAL: int ... 31 more fields]


[ID: int, LIMIT_BAL: int ... 31 more fields]

### Data preprocessing & feature engineering
From the explorations above, we conclude that all original columns are numerical (all Int) and contain no NULL values.  
`featureCols`: Array containing all features  
`catCols`: categorical features (a feature with fewer than 12 distinct values is treated as categorical)   
`numCols`: numerical features  

We will drop the `ID` column as it is irrelevant for predicting `DEFAULT`.

In [4]:
val featureCols = creditfinal.columns.filter( _ != "ID").filter( _ != "DEFAULT")

featureCols = Array(LIMIT_BAL, SEX, EDUCATION, MARRIAGE, AGE, PAY_0, PAY_2, PAY_3, PAY_4, PAY_5, PAY_6, BILL_AMT1, BILL_AMT2, BILL_AMT3, BILL_AMT4, BILL_AMT5, BILL_AMT6, PAY_AMT1, PAY_AMT2, PAY_AMT3, PAY_AMT4, PAY_AMT5, PAY_AMT6, CON_1, CON_2, CON_3, CON_4, CON_5, TOTAL_CON, TOTAL_PAY, REPAYMENT ABILITY)


[LIMIT_BAL, SEX, EDUCATION, MARRIAGE, AGE, PAY_0, PAY_2, PAY_3, PAY_4, PAY_5, PAY_6, BILL_AMT1, BILL_AMT2, BILL_AMT3, BILL_AMT4, BILL_AMT5, BILL_AMT6, PAY_AMT1, PAY_AMT2, PAY_AMT3, PAY_AMT4, PAY_AMT5, PAY_AMT6, CON_1, CON_2, CON_3, CON_4, CON_5, TOTAL_CON, TOTAL_PAY, REPAYMENT ABILITY]

In [5]:
val catCols = featureCols.filter(creditfinal.select(_).distinct().count < 12)

catCols = Array(SEX, EDUCATION, MARRIAGE, PAY_0, PAY_2, PAY_3, PAY_4, PAY_5, PAY_6)


[SEX, EDUCATION, MARRIAGE, PAY_0, PAY_2, PAY_3, PAY_4, PAY_5, PAY_6]

In [6]:
val numCols = featureCols.diff(catCols) // every feature that is not categorical

numCols = Array(LIMIT_BAL, AGE, BILL_AMT1, BILL_AMT2, BILL_AMT3, BILL_AMT4, BILL_AMT5, BILL_AMT6, PAY_AMT1, PAY_AMT2, PAY_AMT3, PAY_AMT4, PAY_AMT5, PAY_AMT6, CON_1, CON_2, CON_3, CON_4, CON_5, TOTAL_CON, TOTAL_PAY, REPAYMENT ABILITY)


[LIMIT_BAL, AGE, BILL_AMT1, BILL_AMT2, BILL_AMT3, BILL_AMT4, BILL_AMT5, BILL_AMT6, PAY_AMT1, PAY_AMT2, PAY_AMT3, PAY_AMT4, PAY_AMT5, PAY_AMT6, CON_1, CON_2, CON_3, CON_4, CON_5, TOTAL_CON, TOTAL_PAY, REPAYMENT ABILITY]

### Prepare continuous features

Add a new column that assembles all numerical features into a vector `numericalFeatures`.  
Then declare a `StandardScaler` estimator, and an `Imputer` that fills missing `REPAYMENT ABILITY` values with the column median.
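The imputer is needed because `REPAYMENT ABILITY` is a ratio whose denominator `TOTAL_PAY` can be zero. As far as we understand Spark SQL's default (non-ANSI) behaviour, such a division yields null, and `Imputer` treats nulls as missing values. The plain-Scala sketch below mirrors that logic with `Option`. It uses an exact lower median, whereas Spark's `Imputer` may use an approximate one, and `ImputeSketch` is a hypothetical helper.

```scala
object ImputeSketch {
  // Ratio that is undefined (None) when the denominator is zero.
  def ratio(totalExpense: Double, totalPayment: Double): Option[Double] =
    if (totalPayment == 0.0) None else Some(totalExpense / totalPayment)

  // Exact median of the defined values (lower middle for even counts).
  def median(xs: Seq[Double]): Double = {
    require(xs.nonEmpty, "need at least one defined value")
    xs.sorted.apply((xs.size - 1) / 2)
  }

  // Replace every undefined entry with the median of the defined ones.
  def imputeMedian(xs: Seq[Option[Double]]): Seq[Double] = {
    val m = median(xs.flatten)
    xs.map(_.getOrElse(m))
  }
}
```

For example, the ratios `Some(2.0), None, Some(4.0), Some(1.0)` have median 2.0 among the defined values, so the undefined entry becomes 2.0.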

In [7]:
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.feature.StandardScaler
import org.apache.spark.ml.feature.Imputer

val imputer = new Imputer().setStrategy("median").setInputCols(Array("REPAYMENT ABILITY")).setOutputCols(Array("REPAYMENT ABILITY"))
val va = new VectorAssembler().setInputCols(numCols).setOutputCol("numericalFeatures")
val scaler = new StandardScaler().setInputCol("numericalFeatures").setOutputCol("scaledFeatures").setWithStd(true).setWithMean(false)

//val va2 = new VectorAssembler().setInputCols(catCols).setOutputCol("categoricalFeatures")
//val output = va.transform(originalCC)
//val output2 = va2.transform(output1)
//output2.printSchema

imputer = imputer_d574c60d28f7
va = vecAssembler_893c43863fe1
scaler = stdScal_f35d79b911d0


stdScal_f35d79b911d0

In [8]:
import org.apache.spark.ml.feature.PolynomialExpansion

val polyExpansion = new PolynomialExpansion()
  .setInputCol("scaledFeatures")
  .setOutputCol("polyScaledFeatures")
  .setDegree(2)


polyExpansion = poly_73cbb1d8ea8a


poly_73cbb1d8ea8a

### Prepare categorical features

In this dataset, we have ordinal variables like education and also nominal variables like sex. We will use One-Hot Encoding to convert all categorical variables into binary vectors. This encoding allows algorithms which expect continuous features, such as Logistic Regression, to use categorical features.  
`catCols` = Array(SEX, EDUCATION, MARRIAGE, PAY_0, PAY_2, PAY_3, PAY_4, PAY_5, PAY_6)  
`encodedCatCols` = Array(encodedSEX, encodedEDUCATION, encodedMARRIAGE, encodedPAY_0, encodedPAY_2, encodedPAY_3, encodedPAY_4, encodedPAY_5, encodedPAY_6)

In [9]:
import org.apache.spark.ml.feature.OneHotEncoderEstimator

val encodedCatCols = Array("encodedSEX", "encodedEDUCATION", "encodedMARRIAGE","encodedPAY_0","encodedPAY_2","encodedPAY_3","encodedPAY_4","encodedPAY_5","encodedPAY_6")
val encoding = new OneHotEncoderEstimator().setInputCols(catCols).setOutputCols(encodedCatCols)
//val output2 = encoding.fit(scaledCC).transform(scaledCC)
val va2 = new VectorAssembler().setInputCols(encodedCatCols).setOutputCol("encodedFeatures")
//val output3 = va2.transform(output2)

//Put `polyScaledFeatures` and `encodedFeatures` into one vector `features`
val va3 = new VectorAssembler().setInputCols(Array("polyScaledFeatures","encodedFeatures")).setOutputCol("features")
//val output5 = va3.transform(output4)

encodedCatCols = Array(encodedSEX, encodedEDUCATION, encodedMARRIAGE, encodedPAY_0, encodedPAY_2, encodedPAY_3, encodedPAY_4, encodedPAY_5, encodedPAY_6)
encoding = oneHotEncoder_232128280b62
va2 = vecAssembler_2162bd8a6eeb
va3 = vecAssembler_302a96643a06


vecAssembler_302a96643a06

### Pipeline
Since there are multiple estimators and transformations, we use a Pipeline to tie the stages together. This will simplify our code.

In [10]:
import org.apache.spark.ml.Pipeline

val pipeline = new Pipeline().setStages(Array(imputer,va,scaler,polyExpansion,encoding,va2,va3))
val output = pipeline.fit(creditfinal).transform(creditfinal)
val processedCC = output.withColumn("label",output("DEFAULT").cast("double"))

pipeline = pipeline_dd01b5e1b43f
output = [ID: int, LIMIT_BAL: int ... 45 more fields]
processedCC = [ID: int, LIMIT_BAL: int ... 46 more fields]


[ID: int, LIMIT_BAL: int ... 46 more fields]

In [11]:
val dataset = processedCC.select("features","label")
dataset.show(5)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|(334,[0,1,2,3,4,5...|  1.0|
|[0.92487215993365...|  1.0|
|[0.69365411995024...|  0.0|
|[0.38536339997235...|  0.0|
|[0.38536339997235...|  0.0|
+--------------------+-----+
only showing top 5 rows



dataset = [features: vector, label: double]


[features: vector, label: double]

In [12]:
dataset.select("features").head

[(334,[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,54,55,56,57,58,59,64,119,120,121,122,123,124,129,134,135,136,137,138,139,140,145,150,151,152,153,154,155,156,157,162,167,168,169,209,210,211,212,213,214,219,224,225,226,229,230,231,232,233,234,235,240,245,246,247,250,251,252,253,254,255,256,257,262,267,268,269,272,273,274,279,284,288,296,302,310,318,326],[0.15414535998894324,0.023760792006120905,2.603628744963987,0.4013372901700343,6.778882641602746,0.053139869207970765,0.008191264268828014,0.13835649097349934,0.0028238456994402394,0.0435834725779124,0.0067181900700905425,0.1134751820092024,0.002316020032419445,0.0018995190819496419,0.009935199510232218,0.001531464905066718,0.025867571031792727,5.279552025288353E-4,4.3301049541029455E-4,9.87081893081185E-5,0.029903384202815683,0.0046094679228307015,0.07785731068215292,0.0015890619254133244,0.001303293325390196,2.970960880861002E-4,8.942123867812065E-4,0.03740481386767184,0.005765778498951692,0.09738824858589797,0.0019876869166765

In [13]:
val Array(trainDataset, testDataset) = dataset.randomSplit(Array(0.8, 0.2))

trainDataset = [features: vector, label: double]
testDataset = [features: vector, label: double]


[features: vector, label: double]

In [14]:
trainDataset.count

23916

### 9.4.1 Logistic regression


In [74]:

// Quick test with fixed hyperparameters, without cross-validation

import org.apache.spark.ml.classification.LogisticRegression
//import org.apache.spark.ml.tuning.ParamGridBuilder
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
//import org.apache.spark.ml.tuning.CrossValidator
val lr = new LogisticRegression().setMaxIter(1000).setRegParam(0.001).setElasticNetParam(0.8)
val lrModel = lr.fit(trainDataset)

val predictions = lrModel.transform(testDataset)

val evaluator1 = new BinaryClassificationEvaluator().setMetricName("areaUnderROC")
val evaluator2 = new MulticlassClassificationEvaluator().setMetricName("accuracy")
val AUC = evaluator1.evaluate(predictions)
val accuracy = evaluator2.evaluate(predictions)
println(s"AUC: $AUC")
println(s"accuracy: $accuracy")

AUC: 0.7723273435808704
accuracy: 0.8176312247644684


lr = logreg_cf615b854591
lrModel = LogisticRegressionModel: uid = logreg_cf615b854591, numClasses = 2, numFeatures = 334
predictions = [features: vector, label: double ... 3 more fields]
evaluator1 = binEval_d847f712d341
evaluator2 = mcEval_97a2b31ae66f
AUC = 0.7723273435808704
accuracy = 0.8176312247644...


0.8176312247644684

In [75]:
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.tuning.ParamGridBuilder
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.tuning.CrossValidator
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator

val lr = new LogisticRegression()//.setMaxIter(100).setRegParam(0.0001).setElasticNetParam(0.8)
val paramGrid = new ParamGridBuilder().addGrid(lr.maxIter,Array(10,100,1000))
                .addGrid(lr.regParam,Array(0.001,0.1,0.3))
                .addGrid(lr.elasticNetParam,Array(0.2,0.5,0.8))
                .build()
val evaluator = new BinaryClassificationEvaluator().setMetricName("areaUnderROC")//we will use Area Under ROC as the evaluation metric. This metric is applicable when one class (label) dominates the other. 
val cv = new CrossValidator().setEstimator(lr).setEstimatorParamMaps(paramGrid).setEvaluator(evaluator).setNumFolds(3)
val cvModel = cv.fit(trainDataset)

val Predictions = cvModel.transform(testDataset)
val AUC = evaluator.evaluate(Predictions)
println(s"AUC: $AUC")

val evaluator2 = new MulticlassClassificationEvaluator().setMetricName("accuracy")
val accuracy = evaluator2.evaluate(Predictions)
println(s"accuracy: $accuracy")

//val trainingSummary = lrModel.binarySummary
//println(s"threshold:${lrModel.getThreshold}")
//println(s"Coefficients: ${lrModel.coefficients} Intercept: ${lrModel.intercept}")
//println(s"areaUnderROC: ${trainingSummary.areaUnderROC}")
//println(s"accuracy: ${trainingSummary.accuracy}")
//println(s"weightedFMeasure: ${trainingSummary.weightedFMeasure}")
//println(s"precisionByLabel: ${trainingSummary.precisionByLabel.mkString(" and ")}")//use the mkString method to print and specify the separator between precision for each label.
//val objectiveHistory = trainingSummary.objectiveHistory
//println("objectiveHistory:")
//objectiveHistory.foreach(loss => println(loss)) //objective function (scaled loss + regularization) at each iteration.

AUC: 0.772468516014658
accuracy: 0.816621803499327


lr = logreg_5ed8b99d4da5
paramGrid = 


Array({
	logreg_5ed8b99d4da5-elasticNetParam: 0.2,
	logreg_5ed8b99d4da5-maxIter: 10,
	logreg_5ed8b99d4da5-regParam: 0.001
}, {
	logreg_5ed8b99d4da5-elasticNetParam: 0.2,
	logreg_5ed8b99d4da5-maxIter: 10,
	logreg_5ed8b99d4da5-regParam: 0.1
}, {
	logreg_5ed8b99d4da5-elasticNetParam: 0.2,
	logreg_5ed8b99d4da5-maxIter: 10,
	logreg_5ed8b99d4da5-regParam: 0.3
}, {
...



### 9.4.2 Decision tree

In [76]:
import org.apache.spark.ml.classification.DecisionTreeClassifier
import org.apache.spark.ml.tuning.ParamGridBuilder
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.tuning.CrossValidator
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator

val dt = new DecisionTreeClassifier()//.setMaxDepth(30)
//val dtModel = dt.fit(trainDataset)
val evaluator = new BinaryClassificationEvaluator().setMetricName("areaUnderROC")
val paramGrid = new ParamGridBuilder().addGrid(dt.maxDepth,Array(10,20,30)).addGrid(dt.impurity,Array("entropy","gini")).build()
val cv = new CrossValidator().setEstimator(dt).setEstimatorParamMaps(paramGrid).setEvaluator(evaluator).setNumFolds(3)
val cvModel = cv.fit(trainDataset)

val Predictions = cvModel.transform(testDataset)
val AUC = evaluator.evaluate(Predictions)
println(s"AUC: $AUC")

val evaluator2 = new MulticlassClassificationEvaluator().setMetricName("accuracy")
val accuracy = evaluator2.evaluate(Predictions)
println(s"accuracy: $accuracy")


AUC: 0.6361871188323862
accuracy: 0.7269515477792732


dt = dtc_06faaf0e8b31
evaluator = binEval_63c32dcc2722
paramGrid = 


Array({
	dtc_06faaf0e8b31-impurity: entropy,
	dtc_06faaf0e8b31-maxDepth: 10
}, {
	dtc_06faaf0e8b31-impurity: gini,
	dtc_06faaf0e8b31-maxDepth: 10
}, {
	dtc_06faaf0e8b31-impurity: entropy,
	dtc_06faaf0e8b31-maxDepth: 20
}, {
	dtc_06faaf0e8b31-impurity: gini,
	dt...


[{
	dtc_06faaf0e8b31-impurity: entropy,
	dtc_06faaf0e8b31-maxDepth: 10
}, {
	dtc_06faaf0e8b31-impurity: gini,
	dtc_06faaf0e8b31-maxDepth: 10
}, {
	dtc_06faaf0e8b31-impurity: entropy,
	dtc_06faaf0e8b31-maxDepth: 20
}, {
	dtc_06faaf0e8b31-impurity: gini,
	dtc_06faaf0e8b31-maxDepth: 20
}, {
	dtc_06faaf0e8b31-impurity: entropy,
	dtc_06faaf0e8b31-maxDepth: 30
}, {
	dtc_06faaf0e8b31-impurity: gini,
	dtc_06faaf0e8b31-maxDepth: 30
}]

### 9.4.3 Random forest 

In [15]:
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.tuning.ParamGridBuilder
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.tuning.CrossValidator
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator

val rf = new RandomForestClassifier()//.setNumTrees(50).setMaxDepth(30)
val evaluator = new BinaryClassificationEvaluator().setMetricName("areaUnderROC")
val paramGrid = new ParamGridBuilder().addGrid(rf.maxDepth,Array(10,20,30)).addGrid(rf.numTrees,Array(25,50,100)).build()
val cv = new CrossValidator().setEstimator(rf).setEstimatorParamMaps(paramGrid).setEvaluator(evaluator).setNumFolds(3)
val cvModel = cv.fit(trainDataset)

val Predictions = cvModel.transform(testDataset)
val AUC = evaluator.evaluate(Predictions)
println(s"AUC: $AUC")

val evaluator2 = new MulticlassClassificationEvaluator().setMetricName("accuracy")
val accuracy = evaluator2.evaluate(Predictions)
println(s"accuracy: $accuracy")

AUC: 0.7711840121167566
accuracy: 0.8088428665351742


rf = rfc_bfc24d4b2734
evaluator = binEval_bc73c657cb14
paramGrid = 


Array({
	rfc_bfc24d4b2734-maxDepth: 10,
	rfc_bfc24d4b2734-numTrees: 25
}, {
	rfc_bfc24d4b2734-maxDepth: 10,
	rfc_bfc24d4b2734-numTrees: 50
}, {
	rfc_bfc24d4b2734-maxDepth: 10,
	rfc_bfc24d4b2734-numTrees: 100
}, {
	rfc_bfc24d4b2734-maxDepth: 20,
	rfc_bfc24d4b273...


[{
	rfc_bfc24d4b2734-maxDepth: 10,
	rfc_bfc24d4b2734-numTrees: 25
}, {
	rfc_bfc24d4b2734-maxDepth: 10,
	rfc_bfc24d4b2734-numTrees: 50
}, {
	rfc_bfc24d4b2734-maxDepth: 10,
	rfc_bfc24d4b2734-numTrees: 100
}, {
	rfc_bfc24d4b2734-maxDepth: 20,
	rfc_bfc24d4b2734-numTrees: 25
}, {
	rfc_bfc24d4b2734-maxDepth: 20,
	rfc_bfc24d4b2734-numTrees: 50
}, {
	rfc_bfc24d4b2734-maxDepth: 20,
	rfc_bfc24d4b2734-numTrees: 100
}, {
	rfc_bfc24d4b2734-maxDepth: 30,
	rfc_bfc24d4b2734-numTrees: 25
}, {
	rfc_bfc24d4b2734-maxDepth: 30,
	rfc_bfc24d4b2734-numTrees: 50
}, {
	rfc_bfc24d4b2734-maxDepth: 30,
	rfc_bfc24d4b2734-numTrees: 100
}]

## 9.5 Summary
Here is the summary for each model.
#### Logistic regression
The original logistic regression model predicts default with 0.729 AUC and 0.816 accuracy.       
The improved logistic regression model predicts default with 0.772 AUC and 0.818 accuracy.  
In this dataset, all features are expressed as numbers. However, we regard `SEX`, `EDUCATION`, `MARRIAGE`, `PAY_0`, `PAY_2`, `PAY_3`, `PAY_4`, `PAY_5`, `PAY_6` as categorical values because comparing their numerical values directly is meaningless. Because logistic regression expects continuous features, we used one-hot encoding to map those categorical features, represented as label indexes, to binary vectors.
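Since AUC is the main metric in this summary, a small sketch may help in reading the numbers. AUC can be interpreted as the probability that a randomly chosen defaulter receives a higher score than a randomly chosen non-defaulter, with ties counted as one half. The plain-Scala code below computes it by comparing every positive/negative pair. This is a quadratic-time illustration under that definition, not how `BinaryClassificationEvaluator` is implemented, and `AucSketch` is a hypothetical name.

```scala
object AucSketch {
  // AUC as the probability that a randomly chosen positive (label 1) is scored
  // above a randomly chosen negative (label 0); ties count as one half.
  def auc(scores: Seq[Double], labels: Seq[Int]): Double = {
    require(scores.size == labels.size, "scores and labels must align")
    val pairs = scores.zip(labels)
    val pos = pairs.collect { case (s, 1) => s }
    val neg = pairs.collect { case (s, 0) => s }
    require(pos.nonEmpty && neg.nonEmpty, "need both classes")
    val wins = for (p <- pos; n <- neg) yield {
      if (p > n) 1.0 else if (p == n) 0.5 else 0.0
    }
    wins.sum / (pos.size.toLong * neg.size)
  }
}
```

A value of 0.5 corresponds to random ranking and 1.0 to a perfect ranking, so the values around 0.77 reported here mean the model ranks a defaulter above a non-defaulter in roughly three out of four pairs.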

#### Decision tree
The original decision tree model predicts default with 0.662 AUC and 0.747 accuracy.      
The improved decision tree model predicts default with 0.636 AUC and 0.727 accuracy.      
A decision tree is a non-linear classifier. It partitions the feature space into half-spaces using axis-aligned linear decision boundaries. Unlike logistic regression, decision tree models handle both continuous and categorical features well, and they are very fast. However, decision trees are prone to overfitting, especially when the tree is very deep. 

#### Random forest  
The original random forest model predicts default with 0.788 AUC and 0.826 accuracy.          
The improved random forest model predicts default with 0.771 AUC and 0.809 accuracy, slightly below the original model.       
A random forest is a collection of decision trees. Random forests mitigate the overfitting problem of single decision trees because each tree is trained on a different sample of the data with a subset of the features. If we use many trees in our forest, eventually many or all of our features will have been included. This inclusion of many features helps limit the error due to bias and variance.
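The two sources of randomness mentioned above, different data samples and different feature subsets, can be sketched in a few lines of plain Scala. This is a simplified picture of classic bagging under our own assumptions (sampling rows with replacement, choosing k distinct features). Spark's `RandomForestClassifier` may differ in details, for example in how and when feature subsets are drawn, and `BaggingSketch` is a hypothetical helper.

```scala
import scala.util.Random

object BaggingSketch {
  // Draw n row indices with replacement (a bootstrap sample),
  // so some rows repeat and others are left out.
  def bootstrap(n: Int, rng: Random): Vector[Int] =
    Vector.fill(n)(rng.nextInt(n))

  // Pick k distinct feature indices out of p, returned in ascending order.
  def featureSubset(p: Int, k: Int, rng: Random): Vector[Int] =
    rng.shuffle((0 until p).toVector).take(k).sorted
}
```

Each tree would be trained on its own `bootstrap` sample and restricted to its own `featureSubset`, and the forest combines the predictions of all trees, so the errors of individual trees tend to average out.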