# Machine Learning With Spark ML
In this lab assignment, you will complete a project by going through the following steps:
1. Get the data.
2. Discover the data to gain insights.
3. Prepare the data for Machine Learning algorithms.
4. Select a model and train it.
5. Fine-tune your model.
6. Present your solution.

As a dataset, we use the California Housing Prices dataset from the StatLib repository. This dataset is based on data from the 1990 California census. The dataset has the following columns:
1. `longitude`: a measure of how far west a house is (longitudes in California are negative, so a more negative value is farther west)
2. `latitude`: a measure of how far north a house is (a higher value is farther north)
3. `housing_median_age`: median age of a house within a block (a lower number is a newer building)
4. `total_rooms`: total number of rooms within a block
5. `total_bedrooms`: total number of bedrooms within a block
6. `population`: total number of people residing within a block
7. `households`: total number of households, a group of people residing within a home unit, for a block
8. `median_income`: median income for households within a block of houses
9. `median_house_value`: median house value for households within a block
10. `ocean_proximity`: location of the house relative to the ocean/sea

---
# 1. Get the data
Let's start the lab by loading the dataset. You can find the dataset at `data/housing.csv`. To infer column types automatically when reading the file, set `inferSchema` to true. Moreover, enable the `header` option to read the column names from the file.

In [1]:
// TODO: Replace <FILL IN> with appropriate code

val housing = spark.read.format("csv").option("header","true").option("inferSchema", "true").load("housing.csv")



housing = [longitude: double, latitude: double ... 8 more fields]



---
# 2. Discover the data to gain insights
Now it is time to take a look at the data. In this step we are going to look at the data in a few different ways:
* See the schema and dimension of the dataset
* Look at the data itself
* Statistical summary of the attributes
* Breakdown of the data by the categorical attribute variable
* Find the correlation among different attributes
* Make new attributes by combining existing attributes

## 2.1. Schema and dimension
Print the schema of the dataset.

In [3]:
// TODO: Replace <FILL IN> with appropriate code

housing.schema.printTreeString
// or
housing.printSchema

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



Print the number of records in the dataset.

In [4]:
// TODO: Replace <FILL IN> with appropriate code

housing.count()

20640

## 2.2. Look at the data
Print the first five records of the dataset.

In [5]:
// TODO: Replace <FILL IN> with appropriate code

housing.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              

Print the number of records with population more than 10000.

In [6]:
// TODO: Replace <FILL IN> with appropriate code

housing.filter(housing("population") > 10000).count()  // keep only blocks with more than 10,000 residents, then count them

23

## 2.3. Statistical summary
Print a summary of the table statistics for the attributes `housing_median_age`, `total_rooms`, `median_house_value`, and `population`. You can use the `describe` command.

In [7]:
// TODO: Replace <FILL IN> with appropriate code

housing.describe("housing_median_age","total_rooms","median_house_value","population").show()

+-------+------------------+------------------+------------------+------------------+
|summary|housing_median_age|       total_rooms|median_house_value|        population|
+-------+------------------+------------------+------------------+------------------+
|  count|             20640|             20640|             20640|             20640|
|   mean|28.639486434108527|2635.7630813953488|206855.81690891474|1425.4767441860465|
| stddev| 12.58555761211163|2181.6152515827944|115395.61587441359|  1132.46212176534|
|    min|               1.0|               2.0|           14999.0|               3.0|
|    max|              52.0|           39320.0|          500001.0|           35682.0|
+-------+------------------+------------------+------------------+------------------+



Print the maximum age (`housing_median_age`), the minimum number of rooms (`total_rooms`), and the average of house values (`median_house_value`).

In [8]:
// TODO: Replace <FILL IN> with appropriate code

import org.apache.spark.sql.functions._

housing.agg(max(housing("housing_median_age")), min(housing("total_rooms")), avg(housing("median_house_value"))).show()

// note: housing("some_column") can also be written as $"some_column" (requires spark.implicits._)

+-----------------------+----------------+-----------------------+
|max(housing_median_age)|min(total_rooms)|avg(median_house_value)|
+-----------------------+----------------+-----------------------+
|                   52.0|             2.0|     206855.81690891474|
+-----------------------+----------------+-----------------------+



## 2.4. Break down the data by the categorical attribute
Print the number of houses in different areas (`ocean_proximity`), and sort them in descending order.

In [9]:
// TODO: Replace <FILL IN> with appropriate code
housing.groupBy(housing("ocean_proximity")).count().sort($"count".desc).show() // group by ocean proximity, then sort by count in descending order

+---------------+-----+
|ocean_proximity|count|
+---------------+-----+
|      <1H OCEAN| 9136|
|         INLAND| 6551|
|     NEAR OCEAN| 2658|
|       NEAR BAY| 2290|
|         ISLAND|    5|
+---------------+-----+



Print the average value of the houses (`median_house_value`) in different areas (`ocean_proximity`), and call the new column `avg_value` when printing it.

In [10]:
// TODO: Replace <FILL IN> with appropriate code

housing.groupBy($"ocean_proximity").agg(mean($"median_house_value").alias("avg_value")).show

+---------------+------------------+
|ocean_proximity|         avg_value|
+---------------+------------------+
|         ISLAND|          380440.0|
|     NEAR OCEAN|249433.97742663656|
|       NEAR BAY|259212.31179039303|
|      <1H OCEAN|240084.28546409807|
|         INLAND|124805.39200122119|
+---------------+------------------+



Rewrite the above query in SQL.

In [11]:
// TODO: Replace <FILL IN> with appropriate code

housing.createOrReplaceTempView("df")
spark.sql("""
SELECT ocean_proximity, AVG(median_house_value) AS avg_value 
FROM df 
GROUP BY ocean_proximity""").show()

+---------------+------------------+
|ocean_proximity|         avg_value|
+---------------+------------------+
|         ISLAND|          380440.0|
|     NEAR OCEAN|249433.97742663656|
|       NEAR BAY|259212.31179039303|
|      <1H OCEAN|240084.28546409807|
|         INLAND|124805.39200122119|
+---------------+------------------+



## 2.5. Correlation among attributes
Print the correlation among the attributes `housing_median_age`, `total_rooms`, `median_house_value`, and `population`. To do so, first you need to put these attributes into one vector. Then, compute the standard correlation coefficient (Pearson) between every pair of attributes in this new vector. To make a vector of these attributes, you can use the `VectorAssembler` Transformer.

In [12]:
// TODO: Replace <FILL IN> with appropriate code

import org.apache.spark.ml.feature.VectorAssembler

val va = new VectorAssembler().setInputCols(Array("housing_median_age", "total_rooms", "population", "median_house_value")).setOutputCol("features")

val housingAttrs = va.transform(housing)

housingAttrs.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+--------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|            features|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+--------------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|[41.0,880.0,322.0...|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|[21.0,7099.0,2401...|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|[52.0,1467.0,496....|
|  -122.25|   37.85|              52.0|     12

va = vecAssembler_c947cae9940c
housingAttrs = [longitude: double, latitude: double ... 9 more fields]



In [13]:
// TODO: Replace <FILL IN> with appropriate code

import org.apache.spark.ml.linalg.Matrix
import org.apache.spark.ml.stat.Correlation
import org.apache.spark.sql.Row

val Row(coeff: Matrix) = Correlation.corr(housingAttrs, "features").head

println(s"The standard correlation coefficient:\n ${coeff}")

The standard correlation coefficient:
 1.0                   -0.36126220122231784  -0.2962442397735293   0.10562341249318154   
-0.36126220122231784  1.0                   0.8571259728659772    0.13415311380654338   
-0.2962442397735293   0.8571259728659772    1.0                   -0.02464967888891235  
0.10562341249318154   0.13415311380654338   -0.02464967888891235  1.0                   
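
The rows and columns of this matrix follow the order passed to `setInputCols` (`housing_median_age`, `total_rooms`, `population`, `median_house_value`), so, for example, 0.857 is the correlation between `total_rooms` and `population`. As a rough illustration of what each entry measures, here is a minimal plain-Scala sketch of the Pearson coefficient for two columns. The `PearsonSketch` object and its `pearson` helper are hypothetical names introduced for this illustration; this is not how Spark computes the matrix internally.

```scala
object PearsonSketch {
  // Pearson correlation: covariance of x and y divided by the product
  // of their standard deviations (the 1/n factors cancel out).
  def pearson(xs: Seq[Double], ys: Seq[Double]): Double = {
    require(xs.length == ys.length && xs.nonEmpty, "inputs must be non-empty and of equal length")
    val meanX = xs.sum / xs.length
    val meanY = ys.sum / ys.length
    val cov = xs.zip(ys).map { case (x, y) => (x - meanX) * (y - meanY) }.sum
    val sumSqX = xs.map(x => (x - meanX) * (x - meanX)).sum
    val sumSqY = ys.map(y => (y - meanY) * (y - meanY)).sum
    cov / math.sqrt(sumSqX * sumSqY)
  }
}
```

A value close to 1 or -1 indicates a strong linear relationship, while a value close to 0 indicates a weak one. Note that the result is undefined (NaN) if one of the columns is constant.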




## 2.6. Combine and make new attributes
Now, let's try out various attribute combinations. In the given dataset, the total number of rooms in a block is not very useful if we don't know how many households there are. What we really want is the number of rooms per household. Similarly, the total number of bedrooms by itself is not very useful, and we want to compare it to the number of rooms. The population per household also seems like an interesting attribute combination to look at. To do so, add the three new columns to the dataset as below. We will call the new dataset `housingExtra`.
```
rooms_per_household = total_rooms / households
bedrooms_per_room = total_bedrooms / total_rooms
population_per_household = population / households
```

In [14]:
// TODO: Replace <FILL IN> with appropriate code

val housingCol1 = housing.withColumn("rooms_per_household", housing.col("total_rooms") / housing.col("households"))
val housingCol2 = housingCol1.withColumn("bedrooms_per_room", housing.col("total_bedrooms") / housing.col("total_rooms"))
val housingExtra = housingCol2.withColumn("population_per_household", housing.col("population") / housing.col("households"))


housingExtra.select("rooms_per_household", "bedrooms_per_room", "population_per_household").show(5)

+-------------------+-------------------+------------------------+
|rooms_per_household|  bedrooms_per_room|population_per_household|
+-------------------+-------------------+------------------------+
|  6.984126984126984|0.14659090909090908|      2.5555555555555554|
|  6.238137082601054|0.15579659106916466|       2.109841827768014|
|  8.288135593220339|0.12951601908657123|      2.8022598870056497|
| 5.8173515981735155|0.18445839874411302|       2.547945205479452|
|  6.281853281853282| 0.1720958819913952|      2.1814671814671813|
+-------------------+-------------------+------------------------+
only showing top 5 rows
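
As a quick sanity check, the first row above can be reproduced by hand from the first record of the dataset (880 rooms, 129 bedrooms, 322 people, 126 households). The sketch below does this in plain Scala; the `Block` case class, its field names, and the `DerivedAttributesSketch` object are hypothetical and only serve this illustration.

```scala
// One census block, reduced to the four columns used by the new attributes.
final case class Block(totalRooms: Double, totalBedrooms: Double, population: Double, households: Double) {
  def roomsPerHousehold: Double = totalRooms / households
  def bedroomsPerRoom: Double = totalBedrooms / totalRooms
  def populationPerHousehold: Double = population / households
}

object DerivedAttributesSketch {
  // Values taken from the first record printed by housing.show(5).
  val first: Block = Block(totalRooms = 880.0, totalBedrooms = 129.0, population = 322.0, households = 126.0)
}
```

Evaluating `DerivedAttributesSketch.first.bedroomsPerRoom` divides 129 by 880, which agrees with the 0.1466 shown in the `bedrooms_per_room` column.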



housingCol1 = [longitude: double, latitude: double ... 9 more fields]
housingCol2 = [longitude: double, latitude: double ... 10 more fields]
housingExtra = [longitude: double, latitude: double ... 11 more fields]



---
# 3. Prepare the data for Machine Learning algorithms
Before going through the Machine Learning steps, let's first rename the label column from `median_house_value` to `label`.

In [15]:
// TODO: Replace <FILL IN> with appropriate code

val renamedHousing = housingExtra.withColumnRenamed("median_house_value", "label")
renamedHousing.show(20)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+--------+---------------+-------------------+-------------------+------------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|   label|ocean_proximity|rooms_per_household|  bedrooms_per_room|population_per_household|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+--------+---------------+-------------------+-------------------+------------------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|452600.0|       NEAR BAY|  6.984126984126984|0.14659090909090908|      2.5555555555555554|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|358500.0|       NEAR BAY|  6.238137082601054|0.15579659106916466|       2.109841827768014|
|  -122.24|   37.85|              5

renamedHousing = [longitude: double, latitude: double ... 11 more fields]



Now, we want to separate the numerical attributes from the categorical attribute (`ocean_proximity`) and keep their column names in two different lists. Moreover, since we don't want to apply the same transformations to the predictors (features) and the label, we should remove the label attribute from the list of predictors.

In [16]:
// label columns
val colLabel = "label"

// categorical columns
val colCat = "ocean_proximity"

// numerical columns
val colNum = renamedHousing.columns.filter(_ != colLabel).filter(_ != colCat)

colLabel = label
colCat = ocean_proximity
colNum = Array(longitude, latitude, housing_median_age, total_rooms, total_bedrooms, population, households, median_income, rooms_per_household, bedrooms_per_room, population_per_household)



## 3.1. Prepare continuous attributes
### Data cleaning
Most Machine Learning algorithms cannot work with missing features, so we should take care of them. As a first step, let's find the columns with missing values in the numerical attributes. To do so, we can print the number of missing values of each continuous attribute listed in `colNum`.

In [17]:
// TODO: Replace <FILL IN> with appropriate code

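for (c <- colNum) {
    val x = renamedHousing.select(c).na.drop.count()   // rows where the column is not null
    val y = renamedHousing.select(c).count()           // all rows
    println(c, ":", y-x)   // passing several arguments prints them as a tuple
}

// alternatively, count the null values directly with a filter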
println("\n")

for (c <- colNum) {
    val count = renamedHousing.select(c).filter(renamedHousing(c).isNull).count
    println(s"$c: $count")
}

(longitude,:,0)
(latitude,:,0)
(housing_median_age,:,0)
(total_rooms,:,0)
(total_bedrooms,:,207)
(population,:,0)
(households,:,0)
(median_income,:,0)
(rooms_per_household,:,0)
(bedrooms_per_room,:,207)
(population_per_household,:,0)


longitude: 0
latitude: 0
housing_median_age: 0
total_rooms: 0
total_bedrooms: 207
population: 0
households: 0
median_income: 0
rooms_per_household: 0
bedrooms_per_room: 207
population_per_household: 0


As we observed above, the `total_bedrooms` and `bedrooms_per_room` attributes have some missing values. One way to take care of missing values is to use the `Imputer` Estimator, which completes missing values in a dataset using either the mean or the median of the columns in which the missing values are located. To use it, you need to create an `Imputer` instance, specifying that you want to replace each attribute's missing values with the "median" of that attribute.

In [18]:
// TODO: Replace <FILL IN> with appropriate code

import org.apache.spark.ml.feature.Imputer

val imputer = new Imputer().setStrategy("median").setInputCols(Array("total_bedrooms", "bedrooms_per_room"))
  .setOutputCols(Array("total_bedrooms", "bedrooms_per_room"))                            
val imputedHousing = imputer.fit(renamedHousing).transform(renamedHousing)
imputedHousing.select("total_bedrooms", "bedrooms_per_room").show(5)

+--------------+-------------------+
|total_bedrooms|  bedrooms_per_room|
+--------------+-------------------+
|         129.0|0.14659090909090908|
|        1106.0|0.15579659106916466|
|         190.0|0.12951601908657123|
|         235.0|0.18445839874411302|
|         280.0| 0.1720958819913952|
+--------------+-------------------+
only showing top 5 rows



imputer = imputer_c3df6ebfeacc
imputedHousing = [longitude: double, latitude: double ... 11 more fields]
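
To make the idea of median imputation concrete, here is a minimal plain-Scala sketch on a single column, where missing entries are modeled as `None`. The `MedianImputeSketch` object and the `imputeMedian` helper are hypothetical names, and taking the lower middle element for an even number of values is an assumption of this sketch. Spark's `Imputer` may compute the median approximately, so its result can differ slightly from this exact version.

```scala
object MedianImputeSketch {
  // Replaces every missing value (None) with the median of the present values.
  // Assumption: for an even count, the lower of the two middle values is used.
  def imputeMedian(xs: Seq[Option[Double]]): Seq[Double] = {
    val present = xs.flatten.sorted
    require(present.nonEmpty, "at least one value must be present")
    val median = present((present.length - 1) / 2)
    xs.map(_.getOrElse(median))
  }
}
```

For instance, `MedianImputeSketch.imputeMedian(Seq(Some(1.0), None, Some(3.0), Some(2.0)))` fills the gap with 2.0, the median of the three present values.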



### Scaling
One of the most important transformations you need to apply to your data is feature scaling. With few exceptions, Machine Learning algorithms don't perform well when the input numerical attributes have very different scales. This is the case for the housing data: the total number of rooms ranges from 2 to 39,320, while the median incomes only range from 0 to 15. Note that scaling the label attribute is generally not required.

One way to get all attributes to have the same scale is to use standardization. Standardization first subtracts the mean from each value (so standardized values always have zero mean) and then divides by the standard deviation, so that the resulting distribution has unit variance. To do this, we can use the `StandardScaler` Estimator. To use `StandardScaler`, we again need to combine all the numerical attributes into one big vector of features using `VectorAssembler`, and then call `StandardScaler` on that vector.

In [19]:
// TODO: Replace <FILL IN> with appropriate code

import org.apache.spark.ml.feature.{VectorAssembler, StandardScaler}

val va = new VectorAssembler().setInputCols(colNum).setOutputCol("imputed_features")
val featuredHousing = va.transform(imputedHousing)

val scaler = new StandardScaler().setInputCol("imputed_features").setOutputCol("scaledFeatures").setWithStd(true).setWithMean(true)
val scaledHousing = scaler.fit(featuredHousing).transform(featuredHousing)

scaledHousing.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+--------+---------------+-------------------+-------------------+------------------------+--------------------+--------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|   label|ocean_proximity|rooms_per_household|  bedrooms_per_room|population_per_household|    imputed_features|      scaledFeatures|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+--------+---------------+-------------------+-------------------+------------------------+--------------------+--------------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|452600.0|       NEAR BAY|  6.984126984126984|0.14659090909090908|      2.5555555555555554|[-122.23,37.88,41...|[-1.3278030546902...|
|  -122.22|   37.86|              21.0|     7099.0|        1

va = vecAssembler_5d41adbc3c53
featuredHousing = [longitude: double, latitude: double ... 12 more fields]
scaler = stdScal_2ea92f6e98a4
scaledHousing = [longitude: double, latitude: double ... 13 more fields]
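
The scaling step can be checked by hand on a single column. The sketch below standardizes a sequence of values in plain Scala, using the sample standard deviation (dividing by n - 1); that choice is an assumption of this sketch, although it is consistent with the `stddev` reported by `describe` earlier. The `StandardizeSketch` object and its members are hypothetical names introduced for illustration.

```scala
object StandardizeSketch {
  // Subtracts the mean and divides by the sample standard deviation (n - 1).
  // Assumes at least two values that are not all identical.
  def standardize(xs: Seq[Double]): Seq[Double] = {
    require(xs.length > 1, "need at least two values")
    val mean = xs.sum / xs.length
    val variance = xs.map(x => (x - mean) * (x - mean)).sum / (xs.length - 1)
    val std = math.sqrt(variance)
    xs.map(x => (x - mean) / std)
  }

  // Worked check for the first record: housing_median_age = 41.0,
  // with the mean and stddev printed by describe() in section 2.3.
  val firstAgeScaled: Double = (41.0 - 28.639486434108527) / 12.58555761211163
}
```

`StandardizeSketch.firstAgeScaled` comes out at roughly 0.9821, which agrees with the third entry of the scaled feature vector printed for the first record later in the notebook.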



## 3.2. Prepare categorical attributes
After imputing and scaling the continuous attributes, we should take care of the categorical attributes. Let's first print the distinct values of the categorical attribute `ocean_proximity`.

In [160]:
// TODO: Replace <FILL IN> with appropriate code

renamedHousing.select(renamedHousing("ocean_proximity")).distinct.show

+---------------+
|ocean_proximity|
+---------------+
|         ISLAND|
|     NEAR OCEAN|
|       NEAR BAY|
|      <1H OCEAN|
|         INLAND|
+---------------+



### String indexer
Most Machine Learning algorithms prefer to work with numbers. So let's convert the categorical attribute `ocean_proximity` to numbers. To do so, we can use the `StringIndexer` that encodes a string column of labels to a column of label indices. The indices are in [0, numLabels), ordered by label frequencies, so the most frequent label gets index 0.

In [20]:
// TODO: Replace <FILL IN> with appropriate code

import org.apache.spark.ml.feature.StringIndexer

val indexer = new StringIndexer().setInputCol("ocean_proximity").setOutputCol("proximity_index")
val idxHousing = indexer.fit(renamedHousing).transform(renamedHousing)

idxHousing.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+--------+---------------+-------------------+-------------------+------------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|   label|ocean_proximity|rooms_per_household|  bedrooms_per_room|population_per_household|proximity_index|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+--------+---------------+-------------------+-------------------+------------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|452600.0|       NEAR BAY|  6.984126984126984|0.14659090909090908|      2.5555555555555554|            3.0|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|358500.0|       NEAR BAY|  6.238137082601054|0.155796591069164

indexer = strIdx_5aba0125e6cc
idxHousing = [longitude: double, latitude: double ... 12 more fields]



Now we can use this numerical data in any Machine Learning algorithm. You can look at the mapping that this encoder has learned using the `labels` method: "<1H OCEAN" is mapped to 0, "INLAND" is mapped to 1, etc.

In [21]:
indexer.fit(renamedHousing).labels

Array(<1H OCEAN, INLAND, NEAR OCEAN, NEAR BAY, ISLAND)
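
The order of this array matches the category counts printed in section 2.4: the most frequent label comes first. The following plain-Scala sketch mimics that frequency-based indexing on a small list of strings. The `StringIndexerSketch` object and `frequencyIndex` helper are hypothetical names, and breaking ties alphabetically is an assumption of this sketch rather than a statement about Spark's behavior in every version.

```scala
object StringIndexerSketch {
  // Assigns index 0 to the most frequent label, 1 to the next, and so on.
  // Assumption: labels with equal counts are ordered alphabetically.
  def frequencyIndex(values: Seq[String]): Map[String, Int] =
    values
      .groupBy(identity)
      .toSeq
      .map { case (label, occurrences) => (label, occurrences.size) }
      .sortBy { case (label, count) => (-count, label) }
      .map { case (label, _) => label }
      .zipWithIndex
      .toMap
}
```

For example, `StringIndexerSketch.frequencyIndex(Seq("b", "a", "b", "c", "a", "b"))` maps `b` to 0, `a` to 1, and `c` to 2.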

### One-hot encoding
Now, convert the label indices built in the last step into one-hot vectors. To do this, you can take advantage of the `OneHotEncoderEstimator` Estimator (renamed to `OneHotEncoder` in Spark 3.0).

In [22]:
// TODO: Replace <FILL IN> with appropriate code

import org.apache.spark.ml.feature.OneHotEncoderEstimator

val encoder = new OneHotEncoderEstimator().setInputCols(Array("proximity_index")).setOutputCols(Array("proximity_vec"))
val ohHousing = encoder.fit(idxHousing).transform(idxHousing)

ohHousing.show(1)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+--------+---------------+-------------------+-------------------+------------------------+---------------+-------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|   label|ocean_proximity|rooms_per_household|  bedrooms_per_room|population_per_household|proximity_index|proximity_vec|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+--------+---------------+-------------------+-------------------+------------------------+---------------+-------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|452600.0|       NEAR BAY|  6.984126984126984|0.14659090909090908|      2.5555555555555554|            3.0|(4,[3],[1.0])|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------

encoder = oneHotEncoder_914ce1946efe
ohHousing = [longitude: double, latitude: double ... 13 more fields]
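
The value `(4,[3],[1.0])` in `proximity_vec` is a sparse vector: it has length 4 and holds 1.0 at position 3. The length is 4 rather than 5 because the encoder drops the last category by default (`dropLast`), so the least frequent label (`ISLAND`, index 4) is encoded as all zeros. The plain-Scala sketch below reproduces this encoding as a dense array; `OneHotSketch` and `oneHot` are hypothetical names used only for illustration.

```scala
object OneHotSketch {
  // Encodes a category index as a dense one-hot array.
  // With dropLast = true the last category becomes the all-zeros vector.
  def oneHot(index: Int, numCategories: Int, dropLast: Boolean = true): Array[Double] = {
    require(index >= 0 && index < numCategories, "index out of range")
    val size = if (dropLast) numCategories - 1 else numCategories
    val vec = Array.fill(size)(0.0)
    if (index < size) vec(index) = 1.0
    vec
  }
}
```

With five categories, `OneHotSketch.oneHot(3, 5)` corresponds to `NEAR BAY` and `OneHotSketch.oneHot(4, 5)` to `ISLAND`.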



---
# 4. Pipeline
As you can see, there are many data transformation steps that need to be executed in the right order. For example, you called the `Imputer`, `VectorAssembler`, and `StandardScaler` one after another. Instead of calling each step by hand, we can use the `Pipeline` class to define a sequence of Transformers/Estimators and run them in order. A `Pipeline` is an `Estimator`: after its `fit()` method runs, it produces a `PipelineModel`, which is a `Transformer`.

Now, let's create a pipeline called `numPipeline` to call the numerical transformers you built above (`imputer`, `va`, and `scaler`) in the right order from left to right, as well as a pipeline called `catPipeline` to call the categorical transformers (`indexer` and `encoder`). Then, put these two pipelines `numPipeline` and `catPipeline` into one pipeline.

In [23]:
// TODO: Replace <FILL IN> with appropriate code

import org.apache.spark.ml.{Pipeline, PipelineModel, PipelineStage}

val numPipeline = new Pipeline().setStages(Array(imputer, va, scaler))
val catPipeline = new Pipeline().setStages(Array(indexer, encoder))
val pipeline = new Pipeline().setStages(Array(numPipeline, catPipeline))
val newHousing = pipeline.fit(renamedHousing).transform(renamedHousing)

newHousing.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+--------+---------------+-------------------+-------------------+------------------------+--------------------+--------------------+---------------+-------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|   label|ocean_proximity|rooms_per_household|  bedrooms_per_room|population_per_household|    imputed_features|      scaledFeatures|proximity_index|proximity_vec|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+--------+---------------+-------------------+-------------------+------------------------+--------------------+--------------------+---------------+-------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|452600.0|       NEAR BAY|  6.984126984126984|0.14659090909090908|      2.5555555555555554|[-122.23,37.8

numPipeline = pipeline_5efd819f8869
catPipeline = pipeline_65831ee3b073
pipeline = pipeline_04992102f117
newHousing = [longitude: double, latitude: double ... 15 more fields]
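
To illustrate why the order of stages matters, the following toy example mirrors the Estimator/Transformer idea in plain Scala on a single numeric column. Every name in it (`ToyPipeline`, `Center`, `ScaleByMaxAbs`, and the simplified `Estimator`, `Transformer`, and `Pipeline` types) is hypothetical and much simpler than the Spark classes of the same name; it only sketches the principle that each stage is fitted on the output of the stages before it.

```scala
object ToyPipeline {
  // A fitted stage: maps a dataset to a new dataset.
  trait Transformer { def transform(data: Seq[Double]): Seq[Double] }
  // An unfitted stage: learns its parameters from a dataset.
  trait Estimator { def fit(data: Seq[Double]): Transformer }

  // Learns the mean and subtracts it.
  object Center extends Estimator {
    def fit(data: Seq[Double]): Transformer = {
      val mean = data.sum / data.length
      new Transformer { def transform(d: Seq[Double]): Seq[Double] = d.map(_ - mean) }
    }
  }

  // Learns the largest absolute value and divides by it.
  object ScaleByMaxAbs extends Estimator {
    def fit(data: Seq[Double]): Transformer = {
      val maxAbs = data.map(x => math.abs(x)).max
      new Transformer { def transform(d: Seq[Double]): Seq[Double] = d.map(_ / maxAbs) }
    }
  }

  // Fits each stage on the data produced by the previously fitted stages.
  final class Pipeline(stages: Seq[Estimator]) extends Estimator {
    def fit(data: Seq[Double]): Transformer = {
      val (fitted, _) = stages.foldLeft((Vector.empty[Transformer], data)) {
        case ((models, current), stage) =>
          val model = stage.fit(current)
          (models :+ model, model.transform(current))
      }
      // The fitted pipeline applies all fitted stages in the same order.
      new Transformer {
        def transform(d: Seq[Double]): Seq[Double] =
          fitted.foldLeft(d)((acc, model) => model.transform(acc))
      }
    }
  }
}
```

Fitting `new ToyPipeline.Pipeline(Seq(ToyPipeline.Center, ToyPipeline.ScaleByMaxAbs))` on `Seq(2.0, 4.0, 6.0)` learns a mean of 4 and then a maximum absolute value of 2 on the centered data, which is why the scaler has to come after the centering step.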



Now, use `VectorAssembler` to put all attributes of the final dataset `newHousing` into a big vector, and call the new column `features`.

In [24]:
// TODO: Replace <FILL IN> with appropriate code

val va2 = new VectorAssembler().setInputCols(Array("scaledFeatures", "proximity_vec")).setOutputCol("features")
val dataset = va2.transform(newHousing).select("features", "label")

dataset.show(1, false)

+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+
|features                                                                                                                                                                                                                                |label   |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+
|[-1.3278030546902004,1.0525227849496404,0.9821188656747666,-0.804799599801809,-0.9724529206674336,-0.9744049915469923,-0.977009185045236,2.3447089561176147,0.6285442264151613,-1.149894671563979,-0.049595334664572825,0.0,0.0,0.0,1.0]|452600.0|
+-----------------------

va2 = vecAssembler_5b89666e9cf2
dataset = [features: vector, label: double]



---
# 5. Make a model
Here we are going to make four different regression models:
* Linear regression model
* Decision tree regression
* Random forest regression
* Gradient-boosted tree regression

Before training a Machine Learning model, let's first split the data into a training dataset (`trainSet`) with 80% of the data and a test dataset (`testSet`) with the remaining 20%.

In [25]:
// TODO: Replace <FILL IN> with appropriate code

val Array(trainSet, testSet) = dataset.randomSplit(Array[Double](0.8, 0.2), 18)

trainSet = [features: vector, label: double]
testSet = [features: vector, label: double]



## 5.1. Linear regression model
Now, train a Linear Regression model using the `LinearRegression` class. Then, print the coefficients and intercept of the model, as well as the summary of the model over the training set by calling the `summary` method.

In [26]:
// TODO: Replace <FILL IN> with appropriate code

import org.apache.spark.ml.regression.LinearRegression

// train the model
val lr = new LinearRegression().setMaxIter(10).setRegParam(0.3).setElasticNetParam(0.8).setLabelCol("label").setFeaturesCol("features")
val lrModel = lr.fit(trainSet)
val trainingSummary = lrModel.summary

println(s"Coefficients: ${lrModel.coefficients} Intercept: ${lrModel.intercept}")
println(s"RMSE: ${trainingSummary.rootMeanSquaredError}")

Coefficients: [-15608.050787777065,-12563.793678972797,14914.735338418312,9787.971027892929,13092.174914472274,-32044.72554182368,14098.864615585388,83258.12601013694,2830.188332018626,19733.842477244772,-914.7312259083376,17888.071781180028,-37052.75155671972,32329.640824988346,18595.815548960676] Intercept: 204169.62060568325
RMSE: 68665.04620577405


lr = linReg_3e6f6f111a3a
lrModel = linReg_3e6f6f111a3a
trainingSummary = org.apache.spark.ml.regression.LinearRegressionTrainingSummary@33fdfbb9


org.apache.spark.ml.regression.LinearRegressionTrainingSummary@33fdfbb9

Now, use `RegressionEvaluator` to measure the root-mean-square error (RMSE) of the model on the test dataset.

In [27]:
// TODO: Replace <FILL IN> with appropriate code

import org.apache.spark.ml.evaluation.RegressionEvaluator

// make predictions on the test data
val predictions = lrModel.transform(testSet)
predictions.select("prediction", "label", "features").show(5)

// select (prediction, true label) and compute test error.
val evaluator = new RegressionEvaluator().setLabelCol("label").setPredictionCol("prediction").setMetricName("rmse")
val rmse = evaluator.evaluate(predictions)
println(s"Root Mean Squared Error (RMSE) on test data = $rmse")

+------------------+-------+--------------------+
|        prediction|  label|            features|
+------------------+-------+--------------------+
|147632.00472499238|66900.0|[-2.3160579331997...|
|134669.48648116886|74600.0|[-2.3060755606895...|
| 175103.9735014104|67000.0|[-2.3010843744344...|
|  163072.459815291|81300.0|[-2.2960931881793...|
|155581.54840993893|68300.0|[-2.2960931881793...|
+------------------+-------+--------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 70214.35257522072


predictions = [features: vector, label: double ... 1 more field]
evaluator = regEval_a66fa7c94af7
rmse = 70214.35257522072


70214.35257522072
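
To make the metric concrete, here is a minimal plain-Scala sketch of what an RMSE computation does on a small in-memory list of (label, prediction) pairs. The object name `RmseSketch` and its `rmse` function are illustrative assumptions and are not part of Spark; in the lab itself `RegressionEvaluator` does this work on a DataFrame.

```scala
object RmseSketch {
  // Root-mean-square error over (label, prediction) pairs:
  // square each difference, average the squares, take the square root.
  def rmse(pairs: Seq[(Double, Double)]): Double = {
    require(pairs.nonEmpty, "need at least one (label, prediction) pair")
    val squaredErrors = pairs.map { case (label, prediction) =>
      val diff = label - prediction
      diff * diff
    }
    math.sqrt(squaredErrors.sum / pairs.size)
  }
}
```

Because the errors are squared before averaging, a few large mistakes weigh more heavily than many small ones, and the result is expressed in the same unit as the label (here, dollars of median house value).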

## 5.2. Decision tree regression
Repeat what you did for the Linear Regression model to build a Decision Tree model. Use the `DecisionTreeRegressor` to build the model and then measure its RMSE on the test dataset.

In [28]:
// TODO: Replace <FILL IN> with appropriate code

import org.apache.spark.ml.regression.DecisionTreeRegressor
import org.apache.spark.ml.evaluation.RegressionEvaluator

val dt = new DecisionTreeRegressor().setLabelCol("label").setFeaturesCol("features")

// train the model
val dtModel = dt.fit(trainSet)

// make predictions on the test data
val predictions = dtModel.transform(testSet)
predictions.select("prediction", "label", "features").show(5)

// select (prediction, true label) and compute test error
val evaluator = new RegressionEvaluator().setLabelCol("label").setPredictionCol("prediction").setMetricName("rmse")
val rmse = evaluator.evaluate(predictions)
println(s"Root Mean Squared Error (RMSE) on test data = $rmse")

+------------------+-------+--------------------+
|        prediction|  label|            features|
+------------------+-------+--------------------+
|144108.78665785998|66900.0|[-2.3160579331997...|
|130220.91503267974|74600.0|[-2.3060755606895...|
|144108.78665785998|67000.0|[-2.3010843744344...|
|144108.78665785998|81300.0|[-2.2960931881793...|
|130220.91503267974|68300.0|[-2.2960931881793...|
+------------------+-------+--------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 67392.90965263039


dt = dtr_824be247e9c1
dtModel = DecisionTreeRegressionModel (uid=dtr_824be247e9c1) of depth 5 with 63 nodes
predictions = [features: vector, label: double ... 1 more field]
evaluator = regEval_f122502776e5
rmse = 67392.90965263039


67392.90965263039

## 5.3. Random forest regression
Let's measure the test error of a Random Forest model. You can use the `RandomForestRegressor` to build a Random Forest model.

In [30]:
// TODO: Replace <FILL IN> with appropriate code

import org.apache.spark.ml.regression.RandomForestRegressor
import org.apache.spark.ml.evaluation.RegressionEvaluator

val rf = new RandomForestRegressor().setLabelCol("label").setFeaturesCol("features")

// train the model
val rfModel = rf.fit(trainSet)

// make predictions on the test data
val predictions = rfModel.transform(testSet)
predictions.select("prediction", "label", "features").show(5)

// select (prediction, true label) and compute test error
val evaluator = new RegressionEvaluator().setLabelCol("label").setPredictionCol("prediction").setMetricName("rmse")
val rmse = evaluator.evaluate(predictions)
println(s"Root Mean Squared Error (RMSE) on test data = $rmse")

+------------------+-------+--------------------+
|        prediction|  label|            features|
+------------------+-------+--------------------+
|155879.78918838827|66900.0|[-2.3160579331997...|
|150787.26734444703|74600.0|[-2.3060755606895...|
|146842.63420813056|67000.0|[-2.3010843744344...|
|150436.48746046683|81300.0|[-2.2960931881793...|
|149266.76631813834|68300.0|[-2.2960931881793...|
+------------------+-------+--------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 65752.90576315283


rf = rfr_03e457e9295b
rfModel = RandomForestRegressionModel (uid=rfr_03e457e9295b) with 20 trees
predictions = [features: vector, label: double ... 1 more field]
evaluator = regEval_799d5533a1a9
rmse = 65752.90576315283


65752.90576315283

## 5.4. Gradient-boosted tree regression
Finally, we want to build a Gradient-boosted Tree Regression model and measure its RMSE on the test data. Use the `GBTRegressor` to build the model.

In [184]:
// TODO: Replace <FILL IN> with appropriate code

import org.apache.spark.ml.regression.GBTRegressor
import org.apache.spark.ml.evaluation.RegressionEvaluator

val gb = new GBTRegressor().setLabelCol("label").setFeaturesCol("features")

// train the model
val gbModel = gb.fit(trainSet)

// make predictions on the test data
val predictions = gbModel.transform(testSet)
predictions.select("prediction", "label", "features").show(5)

// select (prediction, true label) and compute test error
val evaluator = new RegressionEvaluator().setLabelCol("label").setPredictionCol("prediction").setMetricName("rmse")
val rmse = evaluator.evaluate(predictions)
println(s"Root Mean Squared Error (RMSE) on test data = $rmse")

+-----------------+-------+--------------------+
|       prediction|  label|            features|
+-----------------+-------+--------------------+
|89260.46412775086|66900.0|[-2.3160579331997...|
|78687.96573911922|74600.0|[-2.3060755606895...|
|81600.82945881764|67000.0|[-2.3010843744344...|
|82788.50612024088|81300.0|[-2.2960931881793...|
| 85171.5914324368|68300.0|[-2.2960931881793...|
+-----------------+-------+--------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 55891.25344492628


gb = gbtr_5bb236616e06
gbModel = GBTRegressionModel (uid=gbtr_5bb236616e06) with 20 trees
predictions = [features: vector, label: double ... 1 more field]
evaluator = regEval_95f330fe9524
rmse = 55891.25344492628


55891.25344492628

---
# 6. Hyperparameter tuning
An important task in Machine Learning is model selection, or using data to find the best model or parameters for a given task. This is also called tuning. Tuning may be done for individual Estimators such as `LinearRegression`, or for entire Pipelines which include multiple algorithms, featurization, and other steps. Users can tune an entire Pipeline at once, rather than tuning each element in the Pipeline separately. MLlib supports model selection tools, such as `CrossValidator`. These tools require the following items:
* Estimator: algorithm or Pipeline to tune (`setEstimator`)
* Set of ParamMaps: parameters to choose from, sometimes called a "parameter grid" to search over (`setEstimatorParamMaps`)
* Evaluator: metric to measure how well a fitted Model does on held-out test data (`setEvaluator`)

`CrossValidator` begins by splitting the dataset into a set of folds, which are used as separate training and test datasets. For example with `k=3` folds, `CrossValidator` will generate 3 (training, test) dataset pairs, each of which uses 2/3 of the data for training and 1/3 for testing. To evaluate a particular `ParamMap`, `CrossValidator` computes the average evaluation metric for the 3 Models produced by fitting the Estimator on the 3 different (training, test) dataset pairs. After identifying the best `ParamMap`, `CrossValidator` finally re-fits the Estimator using the best ParamMap and the entire dataset.
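
The fold logic described above can be sketched in plain Scala on row indices alone. This is only an illustration under simplifying assumptions: the object `KFoldSketch` and its functions are hypothetical names, and rows are assigned to folds round-robin for determinism, whereas Spark's `CrossValidator` assigns them randomly.

```scala
object KFoldSketch {
  // Split the indices 0 until n into k folds by round-robin assignment.
  def folds(n: Int, k: Int): Seq[Seq[Int]] =
    (0 until k).map(f => (0 until n).filter(_ % k == f))

  // Build the k (training, validation) index pairs: each fold is held out once,
  // and the remaining k - 1 folds form the training set.
  def trainValidationPairs(n: Int, k: Int): Seq[(Seq[Int], Seq[Int])] = {
    val fs = folds(n, k)
    fs.indices.map { i =>
      val validation = fs(i)
      val training = fs.indices.filter(_ != i).flatMap(j => fs(j))
      (training, validation)
    }
  }

  // Average a per-fold metric over all k pairs, as is done for each candidate ParamMap.
  def averageMetric(n: Int, k: Int)(metric: (Seq[Int], Seq[Int]) => Double): Double = {
    val scores = trainValidationPairs(n, k).map { case (tr, va) => metric(tr, va) }
    scores.sum / scores.size
  }
}
```

With `n = 9` and `k = 3`, every pair has 6 training indices and 3 validation indices, matching the 2/3 and 1/3 proportions mentioned above.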

Below, use the `CrossValidator` to select the best Random Forest model. To do so, you need to define a grid of parameters. Let's say we want to search over different numbers of trees (1, 5, and 10) and different tree depths (5, 10, and 15).
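
As a rough illustration of what such a grid contains, the following plain-Scala sketch enumerates every combination of the two parameter lists. The `Candidate` case class and the `ParamGridSketch` object are hypothetical stand-ins for Spark's `ParamMap` and `ParamGridBuilder`, not their actual implementation.

```scala
object ParamGridSketch {
  // Hypothetical stand-in for one ParamMap: a (numTrees, maxDepth) combination.
  final case class Candidate(numTrees: Int, maxDepth: Int)

  // Cartesian product of the two value lists, one Candidate per combination.
  def grid(numTrees: Seq[Int], maxDepths: Seq[Int]): Seq[Candidate] =
    for {
      depth <- maxDepths
      trees <- numTrees
    } yield Candidate(trees, depth)

  // The grid described in the text: 3 tree counts x 3 depths.
  val candidates: Seq[Candidate] = grid(Seq(1, 5, 10), Seq(5, 10, 15))
}
```

The grid has 3 × 3 = 9 candidates. With 3 folds, cross-validation therefore fits 9 × 3 = 27 models before re-fitting the best candidate once more on the full training data, which is why larger grids quickly become expensive.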

In [185]:
// TODO: Replace <FILL IN> with appropriate code

import org.apache.spark.ml.tuning.ParamGridBuilder
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.tuning.CrossValidator

val paramGrid = new ParamGridBuilder().addGrid(rf.numTrees, Array(1, 5, 10)).addGrid(rf.maxDepth, Array(5, 10, 15)).build()
val evaluator = new RegressionEvaluator().setLabelCol("label").setPredictionCol("prediction").setMetricName("rmse")
val cv = new CrossValidator().setEstimator(rf).setEstimatorParamMaps(paramGrid).setEvaluator(evaluator).setNumFolds(3)
val cvModel = cv.fit(trainSet)

val predictions = cvModel.transform(testSet)
predictions.select("prediction", "label", "features").show(5)

val rmse = evaluator.evaluate(predictions)
println(s"Root Mean Squared Error (RMSE) on test data = $rmse")

+------------------+-------+--------------------+
|        prediction|  label|            features|
+------------------+-------+--------------------+
|101867.23076923078|66900.0|[-2.3160579331997...|
|102395.56410256411|74600.0|[-2.3060755606895...|
| 83554.69061201955|67000.0|[-2.3010843744344...|
| 83044.72319347318|81300.0|[-2.2960931881793...|
| 71755.07142857143|68300.0|[-2.2960931881793...|
+------------------+-------+--------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 52738.250832859354


paramGrid = 


Array({
	rfr_1f251db89a6a-maxDepth: 5,
	rfr_1f251db89a6a-numTrees: 1
}, {
	rfr_1f251db89a6a-maxDepth: 5,
	rfr_1f251db89a6a-numTrees: 5
}, {
	rfr_1f251db89a6a-maxDepth: 5,
	rfr_1f251db89a6a-numTrees: 10
}, {
	rfr_1f251db89a6a-maxDepth: 10,
	rfr_1f251db89a6a-numTrees: 1
}, {
	rfr_1f251db89a6a-maxDepth: 10,
	rfr_1f251db89a6a-numTrees: 5
}, {
	rfr_1f251db89a6a-maxDepth: 10,
	rfr_1f251db89a6a-numTrees: 10
}, {
	rfr_1f251db89a6a-maxDepth: 15,
	rfr_1f251db89a6a-numTrees: 1
}, {
	rfr_1f251db89a6a-maxDepth: 15,
	rfr_1f251db89a6a-numTrees: 5
}, {
	rfr_1f251db89a6a-maxDepth: 15,
	rfr_1f25...


---
# 7. Custom transformer
At the end of part two, we added extra columns to the `housing` dataset. Here, we are going to implement a Transformer to do the same task. The Transformer should take the names of two input columns, `inputCol1` and `inputCol2`, as well as the name of the output column, `outputCol`. It then computes `inputCol1` divided by `inputCol2` and adds the result as a new column to the dataset. The details of implementing a custom Transformer are explained [here](https://www.oreilly.com/learning/extend-spark-ml-for-your-own-modeltransformer-types). Please read it before starting the implementation.

First, define the given parameters of the Transformer and implement a method to validate their schemas (`StructType`).

In [31]:
import org.apache.spark.sql.types.{StructField, StructType, DoubleType}
import org.apache.spark.ml.param.{ParamMap, Param, Params}

trait MyParams extends Params {
    final val inputCol1 = new Param[String](this, "inputCol1","this is first input column")
    final val inputCol2 = new Param[String](this, "inputCol2","this is second input column")
    final val outputCol = new Param[String](this, "outputCol","this is the output column")
    
  protected def validateAndTransformSchema(schema: StructType): StructType = {
    val idx1 = schema.fieldIndex($(inputCol1))
    val field1 = schema.fields(idx1)
    if (field1.dataType != DoubleType) {
      throw new Exception(s"Input type ${field1.dataType} did not match input type DoubleType")
    }
      
    val idx2 = schema.fieldIndex($(inputCol2))
    val field2 = schema.fields(idx2)
    if (field2.dataType != DoubleType) {
      throw new Exception(s"Input type ${field2.dataType} did not match input type DoubleType")
    }
    
    schema.add(StructField($(outputCol), DoubleType, false)) 
  }
}


defined trait MyParams


Then, extend the class `Transformer`, implement its setter functions for the input and output columns, and call them `setInputCol1`, `setInputCol2`, and `setOutputCol`. Moreover, you need to override the methods `copy`, `transformSchema`, and `transform`. The details of what you need to cover in these methods are given [here](https://www.oreilly.com/learning/extend-spark-ml-for-your-own-modeltransformer-types).

In [33]:
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.{ParamMap, Param, Params}
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.functions.{col, udf}

class MyTransformer(override val uid: String) extends Transformer with MyParams {
    def this() = this(Identifiable.randomUID("configurablewordcount"))
    
    def setInputCol1(value: String) = set(inputCol1, value)
    
    def setInputCol2(value: String) = set(inputCol2, value)
    
    def setOutputCol(value: String) = set(outputCol, value)

    override def copy(extra: ParamMap) : MyTransformer = {
        defaultCopy(extra)
    }

    override def transformSchema(schema: StructType): StructType = {
        validateAndTransformSchema(schema)
    }
    
    override def transform(dataset: Dataset[_]): DataFrame = {
       dataset.withColumn($(outputCol), dataset($(inputCol1)) / dataset($(inputCol2)))
  }
}

defined class MyTransformer


Now, create an instance of `MyTransformer`, set the input columns to `total_rooms` and `households` and the output column to `rooms_per_household`, and run it over the `housing` dataset.

In [34]:
val myTransformer = new MyTransformer().setInputCol1("total_rooms").setInputCol2("households").setOutputCol("rooms_per_household")

val myDataset = myTransformer.transform(housing).select("rooms_per_household").show(5)

+-------------------+
|rooms_per_household|
+-------------------+
|  6.984126984126984|
|  6.238137082601054|
|  8.288135593220339|
| 5.8173515981735155|
|  6.281853281853282|
+-------------------+
only showing top 5 rows



myTransformer = configurablewordcount_817275e76c72


myDataset: Unit = ()


configurablewordcount_817275e76c72

---
# 8. Custom estimator (predictor)
Now it's time to look at a linear regression with gradient descent, implemented as a brand new Estimator. The complete code of the Estimator is given to you, so you do not need to implement anything. It is just a sample that shows how to build a custom Estimator.

The gradient descent update for linear regression is:
$$
w_{i+1} = w_{i} - \alpha_{i} \sum\limits_{j=1}^n (w_i^\top x_j - y_j)x_j
$$

where $i$ is the iteration number of the gradient descent algorithm, and $j$ identifies the observation. Here, $w$ represents an array of weights that is the same size as the array of features and provides a weight for each of the features when finally computing the label prediction in the form:

$$
prediction = w^\top \cdot\ x
$$

where $w$ is the final array of weights computed by the gradient descent, $x$ is the array of features of the observation point and $prediction$ is the label we predict should be associated to this observation.

The given `Helper` object implements the helper methods:
* `dot`: computes the dot product of two vectors; an overload multiplies a vector by a scalar
* `sumVectors`: adds two vectors element-wise
* `fillVector`: creates a vector of a given size with every element set to a given value

The Linear Regression class `LR` then implements the following:
* `calcRMSE`: computes the Root Mean Square Error of a given RDD of (label, prediction) tuples using the formula:
$$
rmse = \sqrt{\frac{\sum\limits_{i=1}^n (label_i - prediction_i)^2}{n}}
$$
* `gradientSummand`: computes the following formula:
$$
gs_{ij} = (w_i^\top x_j - y_j)x_j
$$
* the gradient, which `linregGradientDescent` computes by summing the summands over all observations:
$$
gradient = \sum\limits_{j=1}^n gs_{ij}
$$
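
The formulas above can be tried out without Spark. The following plain-Scala sketch works on in-memory arrays instead of RDDs; the object `GradientDescentSketch`, the `Vec` alias, and the helpers `scale`, `add`, and `fit` are names assumed for this illustration. It uses the step size $\alpha_i = \alpha / (n \sqrt{i + 1})$, which is the schedule used by `linregGradientDescent` in the code that follows.

```scala
object GradientDescentSketch {
  type Vec = Array[Double]

  def dot(a: Vec, b: Vec): Double = a.zip(b).map { case (x, y) => x * y }.sum
  def scale(v: Vec, s: Double): Vec = v.map(_ * s)
  def add(a: Vec, b: Vec): Vec = a.zip(b).map { case (x, y) => x + y }

  // gs_ij = (w_i^T x_j - y_j) x_j for a single observation (x_j, y_j).
  def gradientSummand(w: Vec, x: Vec, y: Double): Vec = scale(x, dot(w, x) - y)

  // Sum of the summands over all observations.
  def gradient(w: Vec, data: Seq[(Vec, Double)]): Vec =
    data.map { case (x, y) => gradientSummand(w, x, y) }.reduce((a, b) => add(a, b))

  // Start from w = 0 and apply numIters updates w <- w - alpha_i * gradient,
  // with alpha_i = alpha / (n * sqrt(i + 1)).
  def fit(data: Seq[(Vec, Double)], numIters: Int, alpha: Double = 1.0): Vec = {
    val n = data.size
    val d = data.head._1.length
    (0 until numIters).foldLeft(Array.fill(d)(0.0)) { (w, i) =>
      val alphaI = alpha / (n * math.sqrt(i + 1.0))
      add(w, scale(gradient(w, data), -alphaI))
    }
  }
}
```

For a single observation with feature `1.0` and label `2.0`, the first update already moves the weight from `0.0` to `2.0`, after which the gradient is zero. On real data the updates only behave well when the features are on comparable scales, which is one reason to standardize them beforehand.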

In [None]:
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.PredictorParams
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.util._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Row
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.linalg.Matrices
import org.apache.spark.mllib.evaluation.RegressionMetrics
import org.apache.spark.ml.{PredictionModel, Predictor}

case class Instance(label: Double, features: Vector)

object Helper extends Serializable {
  def dot(v1: Vector, v2: Vector): Double = {
    val m = Matrices.dense(1, v1.size, v1.toArray)
    m.multiply(v2).values(0)
  }

  def dot(v: Vector, s: Double): Vector = {
    val baseArray = v.toArray.map(vi => vi * s)
    Vectors.dense(baseArray)
  }

  def sumVectors(v1: Vector, v2: Vector): Vector = {
    val baseArray = ((v1.toArray) zip (v2.toArray)).map { case (val1, val2) => val1 + val2 }
    Vectors.dense(baseArray)
  }

  def fillVector(size: Int, fillVal: Double): Vector = Vectors.dense(Array.fill[Double](size)(fillVal));
}

In [None]:
class LR() extends Serializable {
  def calcRMSE(labelsAndPreds: RDD[(Double, Double)]): Double = {
    val regressionMetrics = new RegressionMetrics(labelsAndPreds)
    regressionMetrics.rootMeanSquaredError
  }
  
  def gradientSummand(weights: Vector, lp: Instance): Vector = {
    val mult = (Helper.dot(weights, lp.features) - lp.label)
    val seq = (0 to lp.features.size - 1).map(i => lp.features(i) * mult)
    return Vectors.dense(seq.toArray)
  }
  
  def linregGradientDescent(trainData: RDD[Instance], numIters: Int): (Vector, Array[Double]) = {
    val n = trainData.count()
    val d = trainData.take(1)(0).features.size
    var w = Helper.fillVector(d, 0)
    val alpha = 1.0
    val errorTrain = Array.fill[Double](numIters)(0.0)

    for (i <- 0 until numIters) {
      val labelsAndPredsTrain = trainData.map(lp => (lp.label, Helper.dot(w, lp.features)))
      errorTrain(i) = calcRMSE(labelsAndPredsTrain)

      val gradient = trainData.map(lp => gradientSummand(w, lp)).reduce((v1, v2) => Helper.sumVectors(v1, v2))
      val alpha_i = alpha / (n * scala.math.sqrt(i + 1))
      val wAux = Helper.dot(gradient, (-1) * alpha_i)
      w = Helper.sumVectors(w, wAux)
    }
    (w, errorTrain)
  }
}

In [None]:
abstract class MyLinearModel[FeaturesType, Model <: MyLinearModel[FeaturesType, Model]]
  extends PredictionModel[FeaturesType, Model] {
}

class MyLinearModelImpl(override val uid: String, val weights: Vector, val trainingError: Array[Double])
    extends MyLinearModel[Vector, MyLinearModelImpl] {

  override def copy(extra: ParamMap): MyLinearModelImpl = defaultCopy(extra)

  def predict(features: Vector): Double = {
    println("Predicting")
    val prediction = Helper.dot(weights, features)
    prediction
  }
}

In [None]:
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.PredictorParams
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.util._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions._

import org.apache.spark.sql.Row
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.linalg.Matrices
import org.apache.spark.mllib.evaluation.RegressionMetrics
import org.apache.spark.ml.{PredictionModel, Predictor}

abstract class MyLinearRegression[
    FeaturesType,
    Learner <: MyLinearRegression[FeaturesType, Learner, Model],
    Model <: MyLinearModel[FeaturesType, Model]]
  extends Predictor[FeaturesType, Learner, Model] {
}

class MyLinearRegressionImpl(override val uid: String)
    extends MyLinearRegression[Vector, MyLinearRegressionImpl, MyLinearModelImpl] {
  def this() = this(Identifiable.randomUID("linReg"))

  override def copy(extra: ParamMap): MyLinearRegressionImpl = defaultCopy(extra)
  
  def train(dataset: Dataset[_]): MyLinearModelImpl = {
    println("Training")

    val numIters = 10

    val instances: RDD[Instance] = dataset.select(
      col($(labelCol)), col($(featuresCol))).rdd.map {
        case Row(label: Double, features: Vector) =>
          Instance(label, features)
      }

    val (weights, trainingError) = new LR().linregGradientDescent(instances, numIters)

    new MyLinearModelImpl(uid, weights, trainingError)
  }
}

In [None]:
import org.apache.spark.ml.evaluation.RegressionEvaluator

val lr = new MyLinearRegressionImpl().setLabelCol("label").setFeaturesCol("features")
val model = lr.fit(trainSet)
// evaluate on the held-out test set, so the RMSE printed below really is a test error
val predictions = model.transform(testSet)
predictions.select("prediction", "label", "features").show(5)

// Select (prediction, true label) and compute test error.
val evaluator = new RegressionEvaluator().setLabelCol("label").setPredictionCol("prediction").setMetricName("rmse")
val rmse = evaluator.evaluate(predictions)
println(s"Root Mean Squared Error (RMSE) on test data = $rmse")

---
# 9. An End-to-End Classification Test
As the last step, you are given a dataset called `data/ccdefault.csv`. The dataset represents defaults of credit card clients. It has 30,000 cases and 24 different attributes. More details about the dataset are available at `data/ccdefault.txt`. In this task you should build three models, compare their results, and decide which one is the best solution. Here are the suggested steps:
1. Load the data.
2. Carry out some exploratory analyses (e.g., how various features and the target variable are distributed).
3. Train a model to predict the target variable (risk of `default`).
  - Employ three different models (logistic regression, decision tree, and random forest).
  - Compare the models' performances (e.g., AUC).
  - Defend your choice of best model (e.g., what are the strengths and weaknesses of each of these models?).
4. What more would you do with this data? Anything to help you devise a better solution?
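
Since AUC is suggested as the comparison metric, here is a small plain-Scala sketch of one way to read it: the probability that a randomly chosen defaulter receives a higher score than a randomly chosen non-defaulter. The object `AucSketch` is a hypothetical name, and the pairwise loop is quadratic, so it is only meant for tiny samples. In Spark, `BinaryClassificationEvaluator` with the `areaUnderROC` metric would be the usual tool.

```scala
object AucSketch {
  // AUC as the fraction of (positive, negative) pairs in which the positive
  // has the higher score; ties count as one half.
  def auc(scoresAndLabels: Seq[(Double, Int)]): Double = {
    val positives = scoresAndLabels.collect { case (s, 1) => s }
    val negatives = scoresAndLabels.collect { case (s, 0) => s }
    require(positives.nonEmpty && negatives.nonEmpty, "need both classes")
    val wins = for {
      p <- positives
      n <- negatives
    } yield if (p > n) 1.0 else if (p == n) 0.5 else 0.0
    wins.sum / (positives.size.toLong * negatives.size)
  }
}
```

A value of 0.5 corresponds to random ranking and 1.0 to perfect ranking. Unlike accuracy, this does not depend on a single decision threshold, which matters when the two classes are imbalanced.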

## 9.1. Loading the data

In [1]:
val ccdefault = spark.read.format("csv").option("header","true").option("inferSchema", "true").load("ccdefault.csv")
ccdefault.printSchema

root
 |-- ID: integer (nullable = true)
 |-- LIMIT_BAL: integer (nullable = true)
 |-- SEX: integer (nullable = true)
 |-- EDUCATION: integer (nullable = true)
 |-- MARRIAGE: integer (nullable = true)
 |-- AGE: integer (nullable = true)
 |-- PAY_0: integer (nullable = true)
 |-- PAY_2: integer (nullable = true)
 |-- PAY_3: integer (nullable = true)
 |-- PAY_4: integer (nullable = true)
 |-- PAY_5: integer (nullable = true)
 |-- PAY_6: integer (nullable = true)
 |-- BILL_AMT1: integer (nullable = true)
 |-- BILL_AMT2: integer (nullable = true)
 |-- BILL_AMT3: integer (nullable = true)
 |-- BILL_AMT4: integer (nullable = true)
 |-- BILL_AMT5: integer (nullable = true)
 |-- BILL_AMT6: integer (nullable = true)
 |-- PAY_AMT1: integer (nullable = true)
 |-- PAY_AMT2: integer (nullable = true)
 |-- PAY_AMT3: integer (nullable = true)
 |-- PAY_AMT4: integer (nullable = true)
 |-- PAY_AMT5: integer (nullable = true)
 |-- PAY_AMT6: integer (nullable = true)
 |-- DEFAULT: integer (nullable = tru

ccdefault = [ID: int, LIMIT_BAL: int ... 23 more fields]


[ID: int, LIMIT_BAL: int ... 23 more fields]

In [2]:
ccdefault.show(5)

+---+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+-------+
| ID|LIMIT_BAL|SEX|EDUCATION|MARRIAGE|AGE|PAY_0|PAY_2|PAY_3|PAY_4|PAY_5|PAY_6|BILL_AMT1|BILL_AMT2|BILL_AMT3|BILL_AMT4|BILL_AMT5|BILL_AMT6|PAY_AMT1|PAY_AMT2|PAY_AMT3|PAY_AMT4|PAY_AMT5|PAY_AMT6|DEFAULT|
+---+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+-------+
|  1|    20000|  2|        2|       1| 24|    2|    2|   -1|   -1|   -2|   -2|     3913|     3102|      689|        0|        0|        0|       0|     689|       0|       0|       0|       0|      1|
|  2|   120000|  2|        2|       2| 26|   -1|    2|    0|    0|    0|    2|     2682|     1725|     2682|     3272|     3455|     3261|       0|    1000|    1000|    1000|       0|    2000|    

## 9.2. Statistical summary

In [3]:
for (c <- ccdefault.columns) {
    ccdefault.describe(c).show
}

+-------+-----------------+
|summary|               ID|
+-------+-----------------+
|  count|            30000|
|   mean|          15000.5|
| stddev|8660.398374208891|
|    min|                1|
|    max|            30000|
+-------+-----------------+

+-------+------------------+
|summary|         LIMIT_BAL|
+-------+------------------+
|  count|             30000|
|   mean|167484.32266666667|
| stddev|129747.66156720246|
|    min|             10000|
|    max|           1000000|
+-------+------------------+

+-------+------------------+
|summary|               SEX|
+-------+------------------+
|  count|             30000|
|   mean|1.6037333333333332|
| stddev|0.4891291960902602|
|    min|                 1|
|    max|                 2|
+-------+------------------+

+-------+------------------+
|summary|         EDUCATION|
+-------+------------------+
|  count|             30000|
|   mean|1.8531333333333333|
| stddev|0.7903486597207269|
|    min|                 0|
|    max|           

## 9.3. Finding correlations among attributes

The given data has 6 features for each of "repayment status" (`PAY_X`), "amount of bill statement" (`BILL_AMTX`), and "amount of previous payment" (`PAY_AMTX`). Let's find the Pearson correlation among the 6 features within each of these groups. To do so, first put the attributes of a group into one vector. Then, compute the standard correlation coefficient (Pearson) between every pair of attributes in this new vector. To build the vector, we can use the `VectorAssembler` Transformer.

In [4]:
// correlation for PAY_X features

import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.types.DoubleType

val cols = Array("PAY_0", "PAY_2", "PAY_3", "PAY_4", "PAY_5", "PAY_6")
val va = new VectorAssembler().setInputCols(cols).setOutputCol("PAY_features")

val ccdefaultAttrs_1 = va.transform(ccdefault)

ccdefaultAttrs_1.show(5)

+---+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+-------+--------------------+
| ID|LIMIT_BAL|SEX|EDUCATION|MARRIAGE|AGE|PAY_0|PAY_2|PAY_3|PAY_4|PAY_5|PAY_6|BILL_AMT1|BILL_AMT2|BILL_AMT3|BILL_AMT4|BILL_AMT5|BILL_AMT6|PAY_AMT1|PAY_AMT2|PAY_AMT3|PAY_AMT4|PAY_AMT5|PAY_AMT6|DEFAULT|        PAY_features|
+---+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+-------+--------------------+
|  1|    20000|  2|        2|       1| 24|    2|    2|   -1|   -1|   -2|   -2|     3913|     3102|      689|        0|        0|        0|       0|     689|       0|       0|       0|       0|      1|[2.0,2.0,-1.0,-1....|
|  2|   120000|  2|        2|       2| 26|   -1|    2|    0|    0|    0|    2|     2682|     1725|     2682|    

cols = Array(PAY_0, PAY_2, PAY_3, PAY_4, PAY_5, PAY_6)
va = vecAssembler_0359af9f780f
ccdefaultAttrs_1 = [ID: int, LIMIT_BAL: int ... 24 more fields]


[ID: int, LIMIT_BAL: int ... 24 more fields]

In [5]:
import org.apache.spark.ml.linalg.Matrix
import org.apache.spark.ml.stat.Correlation
import org.apache.spark.sql.Row

val Row(coeff: Matrix) = Correlation.corr(ccdefaultAttrs_1, "PAY_features").head

println(s"The standard correlation coefficient:\n ${coeff}")

The standard correlation coefficient:
 1.0                 0.6721643825483117  0.5742450926204353  ... (6 total)
0.6721643825483117  1.0                 0.766551682934095   ...
0.5742450926204353  0.766551682934095   1.0                 ...
0.5388406268712332  0.6620671310239535  0.7773588733012698  ...
0.509426063665447   0.6227802453768725  0.6867745109947861  ...
0.4745530860641512  0.5755008617793068  0.6326835927184404  ...


coeff = 


1.0                 0.6721643825483117  0.5742450926204353  ... (6 total)
0.6721643825483117  1.0                 0.766551682934095   ...
0.5742450926204353  0.766551682934095   1.0                 ...
0.5388406268712332  0.6620671310239535  0.7773588733012698  ...
0.509426063665447   0.6227802453768725  0.6867745109947861  ...
0.4745530860641512  0.5755008617793068  0.6326835927184404  ...


In [6]:
// correlation for BILL_AMTX features

val cols = Array("BILL_AMT1", "BILL_AMT2", "BILL_AMT3", "BILL_AMT4", "BILL_AMT5", "BILL_AMT6")
val va = new VectorAssembler().setInputCols(cols).setOutputCol("BILL_AMT_features")

val ccdefaultAttrs_2 = va.transform(ccdefault)

ccdefaultAttrs_2.show(5)

+---+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+-------+--------------------+
| ID|LIMIT_BAL|SEX|EDUCATION|MARRIAGE|AGE|PAY_0|PAY_2|PAY_3|PAY_4|PAY_5|PAY_6|BILL_AMT1|BILL_AMT2|BILL_AMT3|BILL_AMT4|BILL_AMT5|BILL_AMT6|PAY_AMT1|PAY_AMT2|PAY_AMT3|PAY_AMT4|PAY_AMT5|PAY_AMT6|DEFAULT|   BILL_AMT_features|
+---+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+-------+--------------------+
|  1|    20000|  2|        2|       1| 24|    2|    2|   -1|   -1|   -2|   -2|     3913|     3102|      689|        0|        0|        0|       0|     689|       0|       0|       0|       0|      1|[3913.0,3102.0,68...|
|  2|   120000|  2|        2|       2| 26|   -1|    2|    0|    0|    0|    2|     2682|     1725|     2682|    

cols = Array(BILL_AMT1, BILL_AMT2, BILL_AMT3, BILL_AMT4, BILL_AMT5, BILL_AMT6)
va = vecAssembler_ea6227729c15
ccdefaultAttrs_2 = [ID: int, LIMIT_BAL: int ... 24 more fields]


[ID: int, LIMIT_BAL: int ... 24 more fields]

In [7]:

val Row(coeff: Matrix) = Correlation.corr(ccdefaultAttrs_2, "BILL_AMT_features").head

println(s"The standard correlation coefficient:\n ${coeff}")

The standard correlation coefficient:
 1.0                 0.9514836727518164  0.8922785291271811  ... (6 total)
0.9514836727518164  1.0                 0.9283262592714868  ...
0.8922785291271811  0.9283262592714868  1.0                 ...
0.8602721890293089  0.8924822912577247  0.9239694565909823  ...
0.8297786058329933  0.8597783072714432  0.8839096973620095  ...
0.8026501885528523  0.8315935591018226  0.8533200905940505  ...


coeff = 


1.0                 0.9514836727518164  0.8922785291271811  ... (6 total)
0.9514836727518164  1.0                 0.9283262592714868  ...
0.8922785291271811  0.9283262592714868  1.0                 ...
0.8602721890293089  0.8924822912577247  0.9239694565909823  ...
0.8297786058329933  0.8597783072714432  0.8839096973620095  ...
0.8026501885528523  0.8315935591018226  0.8533200905940505  ...


In [8]:
// correlation for PAY_AMTX features

val cols = Array("PAY_AMT1", "PAY_AMT2", "PAY_AMT3", "PAY_AMT4", "PAY_AMT5", "PAY_AMT6")
val va = new VectorAssembler().setInputCols(cols).setOutputCol("PAY_AMT_features")

val ccdefaultAttrs_3 = va.transform(ccdefault)

ccdefaultAttrs_3.show(5)

+---+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+-------+--------------------+
| ID|LIMIT_BAL|SEX|EDUCATION|MARRIAGE|AGE|PAY_0|PAY_2|PAY_3|PAY_4|PAY_5|PAY_6|BILL_AMT1|BILL_AMT2|BILL_AMT3|BILL_AMT4|BILL_AMT5|BILL_AMT6|PAY_AMT1|PAY_AMT2|PAY_AMT3|PAY_AMT4|PAY_AMT5|PAY_AMT6|DEFAULT|    PAY_AMT_features|
+---+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+-------+--------------------+
|  1|    20000|  2|        2|       1| 24|    2|    2|   -1|   -1|   -2|   -2|     3913|     3102|      689|        0|        0|        0|       0|     689|       0|       0|       0|       0|      1|     (6,[1],[689.0])|
|  2|   120000|  2|        2|       2| 26|   -1|    2|    0|    0|    0|    2|     2682|     1725|     2682|    

cols = Array(PAY_AMT1, PAY_AMT2, PAY_AMT3, PAY_AMT4, PAY_AMT5, PAY_AMT6)
va = vecAssembler_bf5aaafe9d3f
ccdefaultAttrs_3 = [ID: int, LIMIT_BAL: int ... 24 more fields]


[ID: int, LIMIT_BAL: int ... 24 more fields]

In [9]:
val Row(coeff: Matrix) = Correlation.corr(ccdefaultAttrs_3, "PAY_AMT_features").head

println(s"The standard correlation coefficient:\n ${coeff}")

The standard correlation coefficient:
 1.0                  0.2855755286868427   0.25219113895240175  ... (6 total)
0.2855755286868427   1.0                  0.24477045029284722  ...
0.25219113895240175  0.24477045029284722  1.0                  ...
0.19955793117068046  0.1801067436456241   0.216325091700843    ...
0.1484592750153428   0.18090775259416947  0.15921372030871106  ...
0.18573525544572722  0.15763391627233472  0.1627400332918339   ...




The Pearson correlation coefficient takes values between -1 and 1. A value of 1 implies that the features are highly (positively) correlated, -1 implies that they are negatively correlated (one decreases linearly as the other increases), and 0 implies no linear correlation at all.
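To make the meaning of these values concrete, here is a minimal plain-Scala sketch of the Pearson coefficient for two columns. The helper name `pearson` and the sample values are illustrative assumptions and are not part of the Spark API; `Correlation.corr` above computes the same quantity for every pair of columns in the feature vector.

```scala
// Pearson correlation of two equally long sequences (illustrative helper, not part of Spark).
def pearson(x: Seq[Double], y: Seq[Double]): Double = {
  require(x.length == y.length && x.nonEmpty, "inputs must be non-empty and of equal length")
  val meanX = x.sum / x.length
  val meanY = y.sum / y.length
  // Covariance and variances up to the same constant factor, which cancels in the ratio.
  val cov  = x.zip(y).map { case (a, b) => (a - meanX) * (b - meanY) }.sum
  val varX = x.map(a => (a - meanX) * (a - meanX)).sum
  val varY = y.map(b => (b - meanY) * (b - meanY)).sum
  // A constant column has zero variance, in which case the result is NaN.
  cov / math.sqrt(varX * varY)
}

val up   = pearson(Seq(1.0, 2.0, 3.0, 4.0), Seq(2.0, 4.0, 6.0, 8.0))   // values rise together
val down = pearson(Seq(1.0, 2.0, 3.0, 4.0), Seq(8.0, 6.0, 4.0, 2.0))   // one rises, one falls
val flat = pearson(Seq(1.0, 2.0, 3.0, 4.0), Seq(1.0, -1.0, -1.0, 1.0)) // no linear relation
println(s"up = $up, down = $down, flat = $flat")
```

The three calls correspond to the three cases described above: a coefficient near 1, near -1, and near 0.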


From the above Pearson correlation tests, the PAY_X features and the PAY_AMTX features are not strongly correlated among themselves. They may therefore be strong features for deciding whether the customer defaults or not.

Further, we found that the BILL_AMTX features are highly correlated with each other, meaning that all six BILL_AMTX features contain largely the same information. So, we could remove five of them and train the model using one feature only. However, BILL_AMTX conveys the history of payments of clients, which is needed in a real-world banking scenario. Hence, we do not remove these features either.

Furthermore, removing them improved the prediction accuracy of the model only very marginally, so we decided to keep the features as is.

# 9.3 Preparing the data for Machine Learning Algorithms

From the given data set, let's first rename the label column from "DEFAULT" to "label"

In [10]:
val renamedccdefault = ccdefault.withColumnRenamed("DEFAULT", "label")
ccdefault.show(1)

renamedccdefault.show(1)

+---+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+-------+
| ID|LIMIT_BAL|SEX|EDUCATION|MARRIAGE|AGE|PAY_0|PAY_2|PAY_3|PAY_4|PAY_5|PAY_6|BILL_AMT1|BILL_AMT2|BILL_AMT3|BILL_AMT4|BILL_AMT5|BILL_AMT6|PAY_AMT1|PAY_AMT2|PAY_AMT3|PAY_AMT4|PAY_AMT5|PAY_AMT6|DEFAULT|
+---+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+-------+
|  1|    20000|  2|        2|       1| 24|    2|    2|   -1|   -1|   -2|   -2|     3913|     3102|      689|        0|        0|        0|       0|     689|       0|       0|       0|       0|      1|
+---+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+----

renamedccdefault = [ID: int, LIMIT_BAL: int ... 23 more fields]


[ID: int, LIMIT_BAL: int ... 23 more fields]

From the above, we can see that there is no string-valued categorical data: every column is already numeric. Hence, we can now proceed with data cleaning.

Before data cleaning, let's separate the label from the features.

In [11]:
// label columns
val colLabel = "label"

// ID column
val colID = "ID"


// Attributes (features) columns
val featureCol = renamedccdefault.columns.filter(_ != colLabel).filter(_ != colID)

colLabel = label
colID = ID
featureCol = Array(LIMIT_BAL, SEX, EDUCATION, MARRIAGE, AGE, PAY_0, PAY_2, PAY_3, PAY_4, PAY_5, PAY_6, BILL_AMT1, BILL_AMT2, BILL_AMT3, BILL_AMT4, BILL_AMT5, BILL_AMT6, PAY_AMT1, PAY_AMT2, PAY_AMT3, PAY_AMT4, PAY_AMT5, PAY_AMT6)


Array(LIMIT_BAL, SEX, EDUCATION, MARRIAGE, AGE, PAY_0, PAY_2, PAY_3, PAY_4, PAY_5, PAY_6, BILL_AMT1, BILL_AMT2, BILL_AMT3, BILL_AMT4, BILL_AMT5, BILL_AMT6, PAY_AMT1, PAY_AMT2, PAY_AMT3, PAY_AMT4, PAY_AMT5, PAY_AMT6)

# Data Cleaning

Let's find the columns with missing values. To do so, we can print the number of missing values for each of the attributes listed in `featureCol`.

In [12]:
for (c <- featureCol) {
    val count = ccdefault.select(c).filter(ccdefault(c).isNull).count
    println(s"$c: $count")
}

LIMIT_BAL: 0
SEX: 0
EDUCATION: 0
MARRIAGE: 0
AGE: 0
PAY_0: 0
PAY_2: 0
PAY_3: 0
PAY_4: 0
PAY_5: 0
PAY_6: 0
BILL_AMT1: 0
BILL_AMT2: 0
BILL_AMT3: 0
BILL_AMT4: 0
BILL_AMT5: 0
BILL_AMT6: 0
PAY_AMT1: 0
PAY_AMT2: 0
PAY_AMT3: 0
PAY_AMT4: 0
PAY_AMT5: 0
PAY_AMT6: 0


From the above, we can see that none of the features has missing values.

# Scaling

# Numerical Data

---
The features `LIMIT_BAL`, `AGE`, `BILL_AMTX`, and `PAY_AMTX` are numerical and have to be scaled.

Generally, Machine Learning algorithms don't perform well when the input numerical attributes have very different scales. Here, we are not scaling the "label" attribute.

One way to get all attributes to have the same scale is to use standardization. In standardization, each value first has the mean subtracted (so standardized values always have zero mean) and is then divided by the standard deviation, so that the resulting distribution has unit variance. To do this, we can use the `StandardScaler` Estimator. Note that, by default, Spark's `StandardScaler` only divides by the standard deviation (`withStd` is true, `withMean` is false); call `setWithMean(true)` to also subtract the mean. To use `StandardScaler`, we again need to convert all the numerical attributes into a big vector of features using `VectorAssembler`, and then call `StandardScaler` on that vector.
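As a rough illustration of what standardization does to a single column, the following plain-Scala sketch may help. The helper `standardize`, its `withMean` flag, and the sample credit limits are assumptions made for this example; this is not Spark's implementation, although the flag is meant to mirror the `withMean` option of `StandardScaler`.

```scala
// Standardize one feature column: optionally subtract the mean, then divide by the
// sample standard deviation (n - 1 in the denominator). Illustrative helper only.
def standardize(values: Seq[Double], withMean: Boolean = false): Seq[Double] = {
  require(values.length > 1, "need at least two values")
  val mean = values.sum / values.length
  val variance = values.map(v => (v - mean) * (v - mean)).sum / (values.length - 1)
  val std = math.sqrt(variance)
  val shift = if (withMean) mean else 0.0
  // A constant column has std == 0; map it to 0.0 instead of dividing by zero.
  values.map(v => if (std == 0.0) 0.0 else (v - shift) / std)
}

val limits = Seq(20000.0, 120000.0, 90000.0, 50000.0)  // example credit limits
val scaledOnly = standardize(limits)                   // unit standard deviation, mean not removed
val centered = standardize(limits, withMean = true)    // zero mean and unit standard deviation
println(scaledOnly)
println(centered)
```

With `withMean = false`, zero entries stay zero, which keeps sparse feature vectors sparse; that is a common reason to leave centering off.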

In [13]:
// The attributes that are going to be scaled. These attributes are selected because
// the difference between their maximum and minimum values is very large.


val cols = Array("LIMIT_BAL", "AGE", "BILL_AMT1", "BILL_AMT2", "BILL_AMT3", "BILL_AMT4", "BILL_AMT5", 
                 "BILL_AMT6", "PAY_AMT1", "PAY_AMT2", "PAY_AMT3", "PAY_AMT4", "PAY_AMT5", "PAY_AMT6")

cols = Array(LIMIT_BAL, AGE, BILL_AMT1, BILL_AMT2, BILL_AMT3, BILL_AMT4, BILL_AMT5, BILL_AMT6, PAY_AMT1, PAY_AMT2, PAY_AMT3, PAY_AMT4, PAY_AMT5, PAY_AMT6)


Array(LIMIT_BAL, AGE, BILL_AMT1, BILL_AMT2, BILL_AMT3, BILL_AMT4, BILL_AMT5, BILL_AMT6, PAY_AMT1, PAY_AMT2, PAY_AMT3, PAY_AMT4, PAY_AMT5, PAY_AMT6)

In [14]:
import org.apache.spark.ml.feature.{VectorAssembler, StandardScaler}

val va = new VectorAssembler().setInputCols(cols).setOutputCol("vectored_features")
val vectoredccdefault = va.transform(renamedccdefault)

val scaler = new StandardScaler().setInputCol("vectored_features").setOutputCol("scaled_features")
val scaledccdefault = scaler.fit(vectoredccdefault).transform(vectoredccdefault)

scaledccdefault.show(1)

+---+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+-----+-----------------+--------------------+
| ID|LIMIT_BAL|SEX|EDUCATION|MARRIAGE|AGE|PAY_0|PAY_2|PAY_3|PAY_4|PAY_5|PAY_6|BILL_AMT1|BILL_AMT2|BILL_AMT3|BILL_AMT4|BILL_AMT5|BILL_AMT6|PAY_AMT1|PAY_AMT2|PAY_AMT3|PAY_AMT4|PAY_AMT5|PAY_AMT6|label|vectored_features|     scaled_features|
+---+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+-----+-----------------+--------------------+
|  1|    20000|  2|        2|       1| 24|    2|    2|   -1|   -1|   -2|   -2|     3913|     3102|      689|        0|        0|        0|       0|     689|       0|       0|       0|       0|    1|  (6,[1],[689.0])|(6,[1],[0.0299033...|
+---+---------+---+---------+--------+---+-----+

va = vecAssembler_deef9393d8a5
vectoredccdefault = [ID: int, LIMIT_BAL: int ... 24 more fields]
scaler = stdScal_11dc44e0c867
scaledccdefault = [ID: int, LIMIT_BAL: int ... 25 more fields]


[ID: int, LIMIT_BAL: int ... 25 more fields]

# Categorical Data

---
The features `SEX`, `EDUCATION`, `MARRIAGE`, and `PAY_X` are categorical, and `one-hot encoding` has to be used for them.

In [15]:
// Let's first look at a few distinct value combinations of the categorical attributes "SEX", "EDUCATION", "MARRIAGE", and "PAY_0".



renamedccdefault.select($"SEX", $"EDUCATION", $"MARRIAGE", $"PAY_0").distinct.show(5)

+---+---------+--------+-----+
|SEX|EDUCATION|MARRIAGE|PAY_0|
+---+---------+--------+-----+
|  2|        1|       1|    0|
|  2|        3|       2|    4|
|  1|        2|       1|    1|
|  1|        2|       2|    4|
|  1|        4|       2|   -2|
+---+---------+--------+-----+
only showing top 5 rows



---
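The categorical values are already stored as integer codes, so no string-to-number conversion is needed. The code below still runs `StringIndexer`, which maps every category (including the negative `PAY_X` codes such as -1 and -2) to a non-negative index, before applying `One-hot encoding`.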

---

### One-hot Encoding

---
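Before the Spark version, here is a small plain-Scala sketch of the idea behind one-hot encoding. The helper `oneHot` and the sample codes are assumptions for illustration only. Spark's encoders differ in detail: `StringIndexer` orders categories by frequency, and `OneHotEncoderEstimator` drops the last category by default.

```scala
// One-hot encode integer category codes (illustrative helper, not Spark's implementation).
def oneHot(codes: Seq[Int]): Seq[Seq[Double]] = {
  // Map each distinct code to a position; sorting also handles negative codes such as -2 or -1.
  val position: Map[Int, Int] = codes.distinct.sorted.zipWithIndex.toMap
  codes.map { code =>
    // Exactly one entry is 1.0: the one at the position of this row's category.
    Seq.tabulate(position.size)(i => if (i == position(code)) 1.0 else 0.0)
  }
}

val pay0 = Seq(2, -1, 0, -2, 2)  // example codes in the style of the PAY_0 column
val encoded = oneHot(pay0)
encoded.foreach(println)
```

Each row becomes a vector with one slot per distinct category, so no artificial ordering between categories is implied.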

In [16]:
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler, OneHotEncoder}
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{OneHotEncoderEstimator, OneHotEncoderModel}


//indexing columns
val stringColumns = Array("SEX", "EDUCATION", "MARRIAGE", "PAY_0", "PAY_2", "PAY_3", "PAY_4", "PAY_5", "PAY_6")
val index_transformers: Array[org.apache.spark.ml.PipelineStage] = stringColumns.map(
  cname => new StringIndexer()
    .setInputCol(cname)
    .setOutputCol(s"${cname}_index")
)

// Chain the indexers into a pipeline and apply it to the data
val index_pipeline = new Pipeline().setStages(index_transformers)
val index_model = index_pipeline.fit(renamedccdefault)
val ccdefault_indexed = index_model.transform(renamedccdefault)


//encoding columns
val indexColumns  = ccdefault_indexed.columns.filter(x => x contains "index")

val one_hot_encoders: Array[org.apache.spark.ml.PipelineStage] = indexColumns.map(
    cname => new OneHotEncoder()
     .setInputCol(cname)
     .setOutputCol(s"${cname}_vec"))

val encoder = new OneHotEncoderEstimator()
  .setInputCols(indexColumns)
  .setOutputCols(indexColumns map (name => s"${name}_vec"))


val one_hot_pipeline = new Pipeline().setStages(one_hot_encoders)
val ccdefault_encoded = encoder.fit(ccdefault_indexed).transform(ccdefault_indexed)

ccdefault_encoded.show(1)

+---+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+-----+---------+---------------+--------------+-----------+-----------+-----------+-----------+-----------+-----------+------------------+---------------+-------------------+---------------+---------------+---------------+-------------+---------------+---------------+
| ID|LIMIT_BAL|SEX|EDUCATION|MARRIAGE|AGE|PAY_0|PAY_2|PAY_3|PAY_4|PAY_5|PAY_6|BILL_AMT1|BILL_AMT2|BILL_AMT3|BILL_AMT4|BILL_AMT5|BILL_AMT6|PAY_AMT1|PAY_AMT2|PAY_AMT3|PAY_AMT4|PAY_AMT5|PAY_AMT6|label|SEX_index|EDUCATION_index|MARRIAGE_index|PAY_0_index|PAY_2_index|PAY_3_index|PAY_4_index|PAY_5_index|PAY_6_index|MARRIAGE_index_vec|PAY_5_index_vec|EDUCATION_index_vec|PAY_3_index_vec|PAY_6_index_vec|PAY_4_index_vec|SEX_index_vec|PAY_0_index_vec|PAY_2_index_vec|
+---+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+

stringColumns = Array(SEX, EDUCATION, MARRIAGE, PAY_0, PAY_2, PAY_3, PAY_4, PAY_5, PAY_6)
index_transformers = Array(strIdx_b3f458c69b17, strIdx_1c29868288f0, strIdx_90c2447374e0, strIdx_9e31df9d75db, strIdx_7427a339438d, strIdx_5a2b777ff858, strIdx_200186cca87c, strIdx_8681ba2de8c8, strIdx_3070bf5f9f31)
index_pipeline = pipeline_ed1498906969
index_model = pipeline_ed1498906969
ccdefault_indexed = [ID: int, LIMIT_BAL: int ... 32 more fields...




[ID: int, LIMIT_BAL: int ... 32 more fields...

# 9.4 Pipeline

There are many data transformation steps that need to be executed in the right order. For example, we called the `VectorAssembler` and the `StandardScaler` one after the other. However, we can use the `Pipeline` class to define a sequence of Transformers/Estimators and run them in order. A `Pipeline` is an `Estimator`; thus, after a Pipeline's `fit()` method runs, it produces a `PipelineModel`, which is a `Transformer`.

Now, let's create a pipeline called `numberPipeline` to call the numerical transformers built above (`va` and `scaler`) in the right order, a pipeline called `catPipeline` for the categorical stages (`index_pipeline` and `one_hot_pipeline`), and a pipeline called `finalPipeline` that runs both.

In [17]:
import org.apache.spark.ml.{Pipeline, PipelineModel, PipelineStage}

val numberPipeline = new Pipeline().setStages(Array(va, scaler))
val catPipeline = new Pipeline().setStages(Array(index_pipeline, one_hot_pipeline))
val finalPipeline = new Pipeline().setStages(Array(numberPipeline, catPipeline))
val new_ccdefault = finalPipeline.fit(renamedccdefault).transform(renamedccdefault)

new_ccdefault.show(1)

+---+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+-----+-----------------+--------------------+---------+---------------+--------------+-----------+-----------+-----------+-----------+-----------+-----------+-------------+-------------------+------------------+---------------+---------------+---------------+---------------+---------------+---------------+
| ID|LIMIT_BAL|SEX|EDUCATION|MARRIAGE|AGE|PAY_0|PAY_2|PAY_3|PAY_4|PAY_5|PAY_6|BILL_AMT1|BILL_AMT2|BILL_AMT3|BILL_AMT4|BILL_AMT5|BILL_AMT6|PAY_AMT1|PAY_AMT2|PAY_AMT3|PAY_AMT4|PAY_AMT5|PAY_AMT6|label|vectored_features|     scaled_features|SEX_index|EDUCATION_index|MARRIAGE_index|PAY_0_index|PAY_2_index|PAY_3_index|PAY_4_index|PAY_5_index|PAY_6_index|SEX_index_vec|EDUCATION_index_vec|MARRIAGE_index_vec|PAY_0_index_vec|PAY_2_index_vec|PAY_3_index_vec|PAY_4_index_vec|PAY_5_index_vec|PAY_6_index_vec|


numberPipeline = pipeline_7d2eab61093a
catPipeline = pipeline_929fc3076a3b
finalPipeline = pipeline_e0435be80390
new_ccdefault = [ID: int, LIMIT_BAL: int ... 43 more fields]


[ID: int, LIMIT_BAL: int ... 43 more fields]

---
Now, use `VectorAssembler` to put the scaled numerical features and the one-hot encoded categorical features of the final dataset `new_ccdefault` into a big vector, and call the new column `features`.

In [18]:
val va2 = new VectorAssembler()
        .setInputCols(Array("scaled_features", "SEX_index_vec", "EDUCATION_index_vec", "MARRIAGE_index_vec", "PAY_0_index_vec", 
                            "PAY_2_index_vec", "PAY_3_index_vec", "PAY_4_index_vec", "PAY_5_index_vec", "PAY_6_index_vec"))
        .setOutputCol("features")
val dataset = va2.transform(new_ccdefault).select("features", "label")

dataset.show(1, false)


+--------------------------------------------------------------------------------------------+-----+
|features                                                                                    |label|
+--------------------------------------------------------------------------------------------+-----+
|(74,[1,6,7,14,20,28,37,47,58,67],[0.029903384202815683,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])|1    |
+--------------------------------------------------------------------------------------------+-----+
only showing top 1 row



va2 = vecAssembler_534b07249ed5
dataset = [features: vector, label: int]


[features: vector, label: int]

# 9.5 Make a Model

---
Here we are going to make three different classification models:
* Logistic regression
* Decision tree classifier
* Random forest classifier

But, before giving the data to train a Machine Learning model, let's first split the data into training dataset (`trainSet`) with 80% of the whole data, and test dataset (`testSet`) with 20% of it.

In [19]:
val Array(trainSet, testSet) = dataset.randomSplit(Array[Double](0.8, 0.2), 18)

trainSet = [features: vector, label: int]
testSet = [features: vector, label: int]


[features: vector, label: int]

# 9.5.1 Logistic Regression model

In [32]:
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

//Note that the default metric for the BinaryClassificationEvaluator is areaUnderROC

val logisticRegression = new LogisticRegression()
    .setLabelCol("label")
    .setFeaturesCol("features")

val logisticRegressionModel = logisticRegression.fit(trainSet)

// make predictions on the test data
val predictions = logisticRegressionModel.transform(testSet)
predictions.select("prediction", "label", "features").show(5)

predictions.show(5)

val evaluator = new BinaryClassificationEvaluator()

// Evaluates predictions and returns a scalar metric areaUnderROC(larger is better).
val areaUnderROC = evaluator.evaluate(predictions)

+----------+-----+--------------------+
|prediction|label|            features|
+----------+-----+--------------------+
|       0.0|    1|(74,[0,1,2,3,4,5,...|
|       0.0|    0|(74,[0,1,2,3,4,5,...|
|       0.0|    0|(74,[0,1,2,3,4,5,...|
|       0.0|    1|(74,[0,1,2,3,4,5,...|
|       0.0|    0|(74,[0,1,2,3,4,5,...|
+----------+-----+--------------------+
only showing top 5 rows

+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|(74,[0,1,2,3,4,5,...|    1|[1.99150791554444...|[0.87990257635454...|       0.0|
|(74,[0,1,2,3,4,5,...|    0|[1.99411551496260...|[0.88017785903984...|       0.0|
|(74,[0,1,2,3,4,5,...|    0|[1.98874672511135...|[0.87961048401260...|       0.0|
|(74,[0,1,2,3,4,5,...|    1|[1.99097270897714...|[0.87984600741775...|       0.0|
|(74,[0,1,2,3,4,5,...|    0|[2.0009765046

logisticRegression = logreg_a6fc69979a65
logisticRegressionModel = logreg_a6fc69979a65
predictions = [features: vector, label: int ... 3 more fields]
evaluator = binEval_f3051788d630
areaUnderROC = 0.7570233045350765


0.7570233045350765

In [33]:
// Create ParamGrid for Cross Validation
import org.apache.spark.ml.tuning.{ ParamGridBuilder, CrossValidator }

val paramGrid = new ParamGridBuilder()
             .addGrid(logisticRegression.regParam, Array(0.01, 0.5, 2.0))
             .addGrid(logisticRegression.elasticNetParam, Array(0.0, 0.5, 1.0))
             .addGrid(logisticRegression.maxIter, Array(1, 5, 10))
             .build()

paramGrid = 


Array({
	logreg_a6fc69979a65-elasticNetParam: 0.0,
	logreg_a6fc69979a65-maxIter: 1,
	logreg_a6fc69979a65-regParam: 0.01
}, {
	logreg_a6fc69979a65-elasticNetParam: 0.5,
	logreg_a6fc69979a65-maxIter: 1,
	logreg_a6fc69979a65-regParam: 0.01
}, {
	logreg_a6fc69979a65-elasticNetParam: 1.0,
	logreg_a6fc69979a65-maxIter: 1,
	logreg_a6fc69979a65-regParam: 0.01
}, {
	logreg_a6fc69979a65-elasticNetParam: 0.0,
	logreg_a6fc69979a65-maxIter: 1,
	logreg_a6fc69979a65-regParam: 0.5
}, {
	logreg_a6fc69979a65-elasticNetParam: 0.5,
	logreg_a6fc69979a65-maxIter: 1,
	logreg_a6fc69979a65-regParam: 0.5
}, {
	logreg_a6fc69979a65-elasticNetParam: 1.0,
	logreg_a6fc69979a65-maxIter: 1,
	logre...


In [34]:
//Create 5-fold CrossValidator
val cv = new CrossValidator().setEstimator(logisticRegression).setEstimatorParamMaps(paramGrid)
                                .setEvaluator(evaluator).setNumFolds(5)


cv = cv_5fa26cf9981e


cv_5fa26cf9981e

In [35]:
//Run cross validations
val cvModel = cv.fit(trainSet)

//Use the test set to measure the performance of our model on new data
val predictions = cvModel.transform(testSet)

// cvModel uses the best model found from the Cross Validation
// Evaluate best model (the evaluator returns areaUnderROC, not classification accuracy)
val accuracy = evaluator.evaluate(predictions)

cvModel = cv_e79af6ddfea5
predictions = [features: vector, label: int ... 3 more fields]
accuracy = 0.7559659901670814


0.7559659901670814

# 9.5.2 Decision Tree

In [36]:
import org.apache.spark.ml.classification.DecisionTreeClassifier


//Create initial Decision Tree Model
val dt = new DecisionTreeClassifier().setLabelCol("label").setFeaturesCol("features") //we can set the maximum depth as well (.setMaxDepth(3))

//Train model with Training Data
val dtModel = dt.fit(trainSet)

dt = dtc_bbaeb19c3ad3
dtModel = DecisionTreeClassificationModel (uid=dtc_bbaeb19c3ad3) of depth 5 with 57 nodes


DecisionTreeClassificationModel (uid=dtc_bbaeb19c3ad3) of depth 5 with 57 nodes

In [37]:
//Make predictions on test data using the Transformer.transform() method.
val predictions = dtModel.transform(testSet)
predictions.printSchema()

root
 |-- features: vector (nullable = true)
 |-- label: integer (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



predictions = [features: vector, label: int ... 3 more fields]


[features: vector, label: int ... 3 more fields]

In [38]:
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

//Evaluate model
val evaluator = new BinaryClassificationEvaluator() 

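//Entropy and Gini impurity are the supported measures of impurity for Decision Trees. Gini is the default.
//Changing this value is simple: call dt.setImpurity("entropy") on the classifier before fitting.
//Note: the value computed below is areaUnderROC, not classification accuracy.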
val accuracy = evaluator.evaluate(predictions)

evaluator = binEval_82f75454dd06
accuracy = 0.3621843704597344


0.3621843704597344

In [39]:
// Create ParamGrid for Cross Validation
import org.apache.spark.ml.tuning.{ ParamGridBuilder, CrossValidator }

val paramGrid = new ParamGridBuilder()
             .addGrid(dt.maxDepth, Array(1, 2, 6, 10))
             .addGrid(dt.maxBins, Array(20, 40, 80))
             .build()


//Create 5-fold CrossValidator
val cv = new CrossValidator().setEstimator(dt).setEstimatorParamMaps(paramGrid).setEvaluator(evaluator).setNumFolds(5)



paramGrid = 


Array({
	dtc_bbaeb19c3ad3-maxBins: 20,
	dtc_bbaeb19c3ad3-maxDepth: 1
}, {
	dtc_bbaeb19c3ad3-maxBins: 40,
	dtc_bbaeb19c3ad3-maxDepth: 1
}, {
	dtc_bbaeb19c3ad3-maxBins: 80,
	dtc_bbaeb19c3ad3-maxDepth: 1
}, {
	dtc_bbaeb19c3ad3-maxBins: 20,
	dtc_bbaeb19c3ad3-maxDepth: 2
}, {
	dtc_bbaeb19c3ad3-maxBins: 40,
	dtc_bbaeb19c3ad3-maxDepth: 2
}, {
	dtc_bbaeb19c3ad3-maxBins: 80,
	dtc_bbaeb19c3ad3-maxDepth: 2
}, {
	dtc_bbaeb19c3ad3-maxBins: 20,
	dtc_bbaeb19c3ad3-maxDepth: 6
}, {
	dtc_bbaeb19c3ad3-maxBins: 40,
	dtc_bbaeb19c3ad3-maxDepth: 6
}, {
	dtc_bbaeb19c3ad3-maxBins: 80,
	dtc_bbaeb19c3ad3-maxDepth: 6
}, {
	dtc_bbaeb19c3ad3-maxBins: 20,
	dtc_bbaeb19c3ad3-maxDepth: 10
}, {
	dtc...


In [40]:
//Run cross validations
val cvModel = cv.fit(trainSet)

val predictions = cvModel.transform(testSet)
val accuracy = evaluator.evaluate(predictions)

cvModel = cv_5fa26cf9981e
predictions = [features: vector, label: int ... 3 more fields]
accuracy = 0.758248041199674


0.758248041199674

# 9.5.3 Random Forest

In [50]:
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator


// create the classifier and set parameters for training
val classifier = new RandomForestClassifier().setImpurity("gini").setMaxDepth(3).setNumTrees(20).setFeatureSubsetStrategy("auto").setSeed(5043)
// use the random forest classifier to train (fit) the model
val model = classifier.fit(trainSet)

// build a text description of the random forest trees (wrap it in println to display it)
model.toDebugString

// run the model on test features to get predictions
val predictions = model.transform(testSet)
// As you can see, the model transform produced new columns: rawPrediction, probability and prediction.
predictions.show(5)

// create an Evaluator for binary classification, which expects two input columns: rawPrediction and label.
val evaluator = new BinaryClassificationEvaluator().setLabelCol("label")
// Evaluates predictions and returns a scalar metric areaUnderROC (larger is better).
val accuracy = evaluator.evaluate(predictions)

+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|(74,[0,1,2,3,4,5,...|    1|[16.9688185536942...|[0.84844092768471...|       0.0|
|(74,[0,1,2,3,4,5,...|    0|[16.9688185536942...|[0.84844092768471...|       0.0|
|(74,[0,1,2,3,4,5,...|    0|[16.9688185536942...|[0.84844092768471...|       0.0|
|(74,[0,1,2,3,4,5,...|    1|[16.9688185536942...|[0.84844092768471...|       0.0|
|(74,[0,1,2,3,4,5,...|    0|[16.9688185536942...|[0.84844092768471...|       0.0|
+--------------------+-----+--------------------+--------------------+----------+
only showing top 5 rows



classifier = rfc_6dc7f5d9c74e
model = RandomForestClassificationModel (uid=rfc_6dc7f5d9c74e) with 20 trees
predictions = [features: vector, label: int ... 3 more fields]
evaluator = binEval_0f3b416cf659
accuracy = 0.7559659901670814


0.7559659901670814

In [51]:
// Create ParamGrid for Cross Validation
import org.apache.spark.ml.tuning.{ ParamGridBuilder, CrossValidator }

val paramGrid = new ParamGridBuilder()
             .addGrid(classifier.maxDepth, Array(2, 4, 6))
             .addGrid(classifier.maxBins, Array(20, 60))
             .addGrid(classifier.numTrees, Array(20, 30, 40))
             .build()

paramGrid = 


Array({
	rfc_6dc7f5d9c74e-maxBins: 20,
	rfc_6dc7f5d9c74e-maxDepth: 2,
	rfc_6dc7f5d9c74e-numTrees: 20
}, {
	rfc_6dc7f5d9c74e-maxBins: 20,
	rfc_6dc7f5d9c74e-maxDepth: 2,
	rfc_6dc7f5d9c74e-numTrees: 30
}, {
	rfc_6dc7f5d9c74e-maxBins: 20,
	rfc_6dc7f5d9c74e-maxDepth: 2,
	rfc_6dc7f5d9c74e-numTrees: 40
}, {
	rfc_6dc7f5d9c74e-maxBins: 60,
	rfc_6dc7f5d9c74e-maxDepth: 2,
	rfc_6dc7f5d9c74e-numTrees: 20
}, {
	rfc_6dc7f5d9c74e-maxBins: 60,
	rfc_6dc7f5d9c74e-maxDepth: 2,
	rfc_6dc7f5d9c74e-numTrees: 30
}, {
	rfc_6dc7f5d9c74e-maxBins: 60,
	rfc_6dc7f5d9c74e-maxDepth: 2,
	rfc_6dc7f5d9c74e-numTrees: 40
}, {
	rfc_6dc7f5d9c74e-maxBins: 20,
	rfc_6dc7f5d9c74e-maxDepth: 4,
	rfc_6dc7f5d9c7...


In [52]:
//Create 5-fold CrossValidator
val cv = new CrossValidator().setEstimator(classifier).setEstimatorParamMaps(paramGrid)
                                .setEvaluator(evaluator).setNumFolds(5)

cv = cv_2f083ba8d6b2


cv_2f083ba8d6b2

In [53]:
//Run cross validations
val cvModel = cv.fit(trainSet)

//Use the test set to measure the performance of our model on new data
val predictions = cvModel.transform(testSet)

// cvModel uses the best model found from the Cross Validation
// Evaluate best model (the evaluator returns areaUnderROC, not classification accuracy)
val accuracy = evaluator.evaluate(predictions)

cvModel = cv_2bbf56c18b3a
predictions = [features: vector, label: int ... 3 more fields]
accuracy = 0.7664776781081359


0.7664776781081359


---
### Choosing the best model to predict the risk of default

The observed area under the ROC curve for the different models is listed below:

* `Logistic Regression`: 0.757 without hyperparameter tuning, 0.756 with hyperparameter tuning
* `Decision Tree`: 0.362 without hyperparameter tuning, 0.758 with hyperparameter tuning
* `Random Forest`: 0.756 without hyperparameter tuning, 0.766 with hyperparameter tuning
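To help read these numbers, here is a minimal plain-Scala sketch of one common way to define the area under the ROC curve: the fraction of (positive, negative) pairs in which the positive example gets the higher score. The helper `areaUnderRoc` and the sample scores are illustrative assumptions; `BinaryClassificationEvaluator` computes the metric from the ROC curve itself, so its result may differ slightly.

```scala
// Area under the ROC curve as the fraction of (positive, negative) pairs that the
// scores rank correctly; ties count as half. Illustrative O(n^2) helper.
def areaUnderRoc(scores: Seq[Double], labels: Seq[Int]): Double = {
  require(scores.length == labels.length, "one label per score")
  val positives = scores.zip(labels).collect { case (s, 1) => s }
  val negatives = scores.zip(labels).collect { case (s, 0) => s }
  require(positives.nonEmpty && negatives.nonEmpty, "need both classes")
  val wins = for (p <- positives; n <- negatives)
    yield if (p > n) 1.0 else if (p == n) 0.5 else 0.0
  wins.sum / (positives.length.toLong * negatives.length)
}

// Hypothetical scores for the "default" class and the true labels.
val auc = areaUnderRoc(Seq(0.9, 0.8, 0.3, 0.1), Seq(1, 0, 1, 0))
println(auc)
```

Under this definition, 1.0 means every defaulter is scored above every non-defaulter, 0.5 corresponds to random ranking, and a value below 0.5 means the scores rank the two classes worse than random.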

From the above output, we can see that `Random Forest`, when used with appropriately tuned hyperparameters, gives the best performance in predicting the risk of default. This is because `Random Forest` constructs multiple decision trees during training, each of which can end up with a different depth and number of nodes. Then, for any test input, the decision is the label chosen by the largest number of trees. However, the main drawback is that it takes significantly longer to compute.
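The voting step can be sketched in a few lines of plain Scala. The helper `majorityVote` and the sample votes are assumptions for illustration; this shows a hard majority vote as described in the text, whereas library implementations may instead combine per-tree class probabilities.

```scala
// Hard majority vote over the labels predicted by individual trees (illustrative only).
def majorityVote(treePredictions: Seq[Int]): Int = {
  require(treePredictions.nonEmpty, "need at least one tree")
  // Count votes per label; break ties in favour of the smaller label for determinism.
  treePredictions.groupBy(identity)
    .map { case (label, votes) => (label, votes.size) }
    .toSeq
    .sortBy { case (label, count) => (-count, label) }
    .head._1
}

val votes = Seq(0, 1, 0, 0, 1)  // predictions of five hypothetical trees for one customer
val decision = majorityVote(votes)
println(decision)
```

Because each tree sees a different sample of rows and features, individual errors tend to cancel out in the vote, which is the intuition behind the better score.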

`Decision Tree` constructs only one tree which, with the default depth and number of nodes, may not perform as well. However, it takes less computational time than `Random Forest`.

In the case of `Logistic Regression`, we get performance comparable to that of `Random Forest`. Even though `Random Forest` is slightly better in performance, it takes much longer to compute, as discussed above. Thus, we can argue that logistic regression provides a good balance between performance and computational time.

PS:
(`Logistic Regression` is less likely to overfit than decision trees. This is because LR only finds a single linear decision boundary, whereas decision trees can find non-linear partitions as well.)
 

# Devising a better solution

* The BILL_AMTX features are highly correlated with each other, so instead of six features, we could remove five of them and use only one in our model. This would reduce the computational time.
* Further, we can check the contribution of the different input features to predicting the output label. Features that are not very useful for prediction can then be removed to further reduce computational time.
* To further reduce the computation time of logistic regression, we can try using mini-batch gradient descent with different batch sizes for training.
* We can also try more hyperparameter values, such as `number of trees`, `number of bins`, and `tree depth`, to improve the performance of the model.