# CMU 10-405/10-605 auto-graded notebook

Before you turn this assignment in, make sure everything runs as expected. First, **restart the kernel** (in the menubar, select Kernel$\rightarrow$Restart) and then **run all cells** (in the menubar, select Cell$\rightarrow$Run All).

Make sure you fill in any place that says `YOUR CODE HERE` or "YOUR ANSWER HERE."

---

#CMU 10405/10605 Machine Learning with Large Datasets

## Homework 2: ML Pipeline & Linear Regression

In [4]:
# Who did you collaborate with on this assignment? 
# if no one, collaborators should contain an empty string,
# else list your collaborators below

collaborators = [""]

In [5]:
try:
    collaborators
except:
    raise AssertionError("you did not list your collaborators, if any")   

In [6]:
# YOU CAN MOST LIKELY IGNORE THIS CELL. This is only of use for running this notebook locally.

# THIS CELL DOES NOT NEED TO BE RUN ON DATABRICKS. 
# Note that Databricks already creates a SparkContext for you, so this cell can be skipped.
# import findspark
# findspark.init()
# import pyspark
# from pyspark.sql import SQLContext
# sc = pyspark.SparkContext(appName="hw")
# sqlContext = SQLContext(sc)

# print("spark context started")

#I. Power Plant Machine Learning Pipeline Application
This section is an end-to-end exercise of performing Extract-Transform-Load and Exploratory Data Analysis on a real-world dataset, and then applying several different machine learning algorithms to solve a supervised regression problem on the dataset.

** This excercise covers: **
* *Part 1: Business Understanding*
* *Part 2: Load Your Data*
* *Part 3: Explore Your Data*
* *Part 4: Visualize Your Data*
* *Part 5: Data Preparation*
* *Part 6: Data Modeling*
* *Part 7: Tuning and Evaluation*

*Our goal is to accurately predict power output given a set of environmental readings from various sensors in a natural gas-fired power generation plant.*


** Background **

Power generation is a complex process, and understanding and predicting power output is an important element in managing a plant and its connection to the power grid. The operators of a regional power grid create predictions of power demand based on historical information and environmental factors (e.g., temperature). They then compare the predictions against available resources (e.g., coal, natural gas, nuclear, solar, wind, hydro power plants). Power generation technologies such as solar and wind are highly dependent on environmental conditions, and all generation technologies are subject to planned and unplanned maintenance.

Here is an real-world example of predicted demand (on two time scales), actual demand, and available resources from the California power grid: http://www.caiso.com/Pages/TodaysOutlook.aspx

![](http://content.caiso.com/outlook/SP/ems_small.gif)

The challenge for a power grid operator is how to handle a shortfall in available resources versus actual demand. There are three solutions to  a power shortfall: build more base load power plants (this process can take many years to decades of planning and construction), buy and import power from other regional power grids (this choice can be very expensive and is limited by the power transmission interconnects between grids and the excess power available from other grids), or turn on small [Peaker or Peaking Power Plants](https://en.wikipedia.org/wiki/Peaking_power_plant). Because grid operators need to respond quickly to a power shortfall to avoid a power outage, grid operators rely on a combination of the last two choices. In this exercise, we'll focus on the last choice.

** The Business Problem **

Because they supply power only occasionally, the power supplied by a peaker power plant commands a much higher price per kilowatt hour than power from a power grid's base power plants. A peaker plant may operate many hours a day, or it may operate only a few hours per year, depending on the condition of the region's electrical grid. Because of the cost of building an efficient power plant, if a peaker plant is only going to be run for a short or highly variable time it does not make economic sense to make it as efficient as a base load power plant. In addition, the equipment and fuels used in base load plants are often unsuitable for use in peaker plants because the fluctuating conditions would severely strain the equipment.

The power output of a peaker power plant varies depending on environmental conditions, so the business problem is _predicting the power output of a peaker power plant as a function of the environmental conditions_ -- since this would enable the grid operator to make economic tradeoffs about the number of peaker plants to turn on (or whether to buy expensive power from another grid).

Given this business problem, we need to first perform Exploratory Data Analysis to understand the data and then translate the business problem (predicting power output as a function of envionmental conditions) into a Machine Learning task.  In this instance, the ML task is regression since the label (or target) we are trying to predict is numeric. We will use an [Apache Spark ML Pipeline](https://spark.apache.org/docs/1.6.2/api/python/pyspark.ml.html#pyspark-ml-package) to perform the regression.

The real-world data we are using in this notebook consists of 9,568 data points, each with 4 environmental attributes collected from a Combined Cycle Power Plant over 6 years (2006-2011), and is provided by the University of California, Irvine at [UCI Machine Learning Repository Combined Cycle Power Plant Data Set](https://archive.ics.uci.edu/ml/datasets/Combined+Cycle+Power+Plant). You can find more details about the dataset on the UCI page, including the following background publications:
* Pinar Tüfekci, [Prediction of full load electrical power output of a base load operated combined cycle power plant using machine learning methods](http://www.journals.elsevier.com/international-journal-of-electrical-power-and-energy-systems/), International Journal of Electrical Power & Energy Systems, Volume 60, September 2014, Pages 126-140, ISSN 0142-0615.
* Heysem Kaya, Pinar Tüfekci and Fikret S. Gürgen: [Local and Global Learning Methods for Predicting Power of a Combined Gas & Steam Turbine](http://www.cmpe.boun.edu.tr/~kaya/kaya2012gasturbine.pdf), Proceedings of the International Conference on Emerging Trends in Computer and Electronics Engineering ICETCEE 2012, pp. 13-18 (Mar. 2012, Dubai).

**To Do**: Read the documentation and examples for [Spark Machine Learning Pipeline](https://spark.apache.org/docs/1.6.2/ml-guide.html#main-concepts-in-pipelines).

## Part 1: Business Understanding
The first step in any machine learning task is to understand the business need.

As described in the overview we are trying to predict power output given a set of readings from various sensors in a gas-fired power generation plant.

The problem is a regression problem since the label (or target) we are trying to predict is numeric.

## Part 2: Extract-Transform-Load (ETL) Your Data

Now that we understand what we are trying to do, the first step is to load our data into a format we can query and use.  This is known as ETL or "Extract-Transform-Load".  We will load our file from Amazon S3.

Note: Alternatively we could upload our data using "Databricks Menu > Tables > Create Table", assuming we had the raw files on our local computer.

Our data is available on Amazon s3 at the following path:

```
dbfs:/databricks-datasets/power-plant/data
```

**To Do:** Let's start by printing a sample of the data.

We'll use the built-in Databricks functions for exploring the Databricks filesystem (DBFS)

Use `display(dbutils.fs.ls("/databricks-datasets/power-plant/data"))` to list the files in the directory

In [10]:
# Uncomment the following code to run it in databricks

display(dbutils.fs.ls("/databricks-datasets/power-plant/data"))

Next, use the `dbutils.fs.head` command to look at the first 65,536 bytes of the first file in the directory.

Use `print dbutils.fs.head("/databricks-datasets/power-plant/data/Sheet1.tsv")` to list the files in the directory

In [12]:
print (dbutils.fs.head("/databricks-datasets/power-plant/data/Sheet1.tsv"))

`dbutils.fs` has its own help facility, which we can use to see the various available functions.

In [14]:
dbutils.fs.help()

### Exercise 2(a)

Now, let's use PySpark instead to print the first 5 lines of the data.

*Hint*: First create an RDD from the data by using [`sc.textFile("dbfs:/databricks-datasets/power-plant/data")`](https://spark.apache.org/docs/1.6.2/api/python/pyspark.html#pyspark.SparkContext.textFile) to read the data into an RDD.

*Hint*: Then figure out how to use the RDD [`take()`](https://spark.apache.org/docs/1.6.2/api/python/pyspark.html#pyspark.RDD.take) method to extract the first 5 lines of the RDD and print each line.

In [16]:
url = "https://raw.githubusercontent.com/10605/data/master/hw2/Sheet"

from pyspark import SparkFiles
sc.addFile(url+"1.tsv"); sc.addFile(url+"2.tsv"); sc.addFile(url+"3.tsv"); sc.addFile(url+"4.tsv"); sc.addFile(url+"5.tsv")

rawTextRdd = sc.textFile("file://" + SparkFiles.getRootDirectory(), 8)

# Print the first five lines
print(rawTextRdd.take(5))

From our initial exploration of a sample of the data, we can make several observations for the ETL process:
  - The data is a set of .tsv (Tab Seperated Values) files (i.e., each row of the data is separated using tabs)
  - There is a header row, which is the name of the columns
  - It looks like the type of the data in each column is consistent (i.e., each column is of type double)

Our schema definition from UCI appears below:
- AT = Atmospheric Temperature in C
- V = Exhaust Vacuum Speed
- AP = Atmospheric Pressure
- RH = Relative Humidity
- PE = Power Output.  This is the value we are trying to predict given the measurements above.

We are ready to create a DataFrame from the TSV data. Spark does not have a native method for performing this operation, however we can use [spark-csv](https://spark-packages.org/package/databricks/spark-csv), a third-party package from [SparkPackages](https://spark-packages.org/). The documentation and source code for [spark-csv](https://spark-packages.org/package/databricks/spark-csv) can be found on [GitHub](https://github.com/databricks/spark-csv). The Python API can be found [here](https://github.com/databricks/spark-csv#python-api).

(**Note**: In Spark 2.0, the CSV package is built into the DataFrame API.)

To use the [spark-csv](https://spark-packages.org/package/databricks/spark-csv) package, we use the [sqlContext.read.format()](https://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader.format) method to specify the input data source format: `'com.databricks.spark.csv'`

We can provide the [spark-csv](https://spark-packages.org/package/databricks/spark-csv) package with options using the [options()](https://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader.options) method. The available options are listed in the GitHub documentation [here](https://github.com/databricks/spark-csv#features).

We will use the following three options:
- `delimiter='\t'` because our data is tab delimited
- `header='true'` because our data has a header row
- `inferschema='true'` because we believe that all of the data is double values, so the package can dynamically infer the type of each column. *Note that this will require two pass over the data.*


The last component of creating the DataFrame is to specify the location of the data source using the [load()](https://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader.load) method: `"/databricks-datasets/power-plant/data"`

Putting everything together, we will use an operation of the following form:

  `sqlContext.read.format().options().load()`

### Exercise 2(b)

**To Do:** Create a DataFrame from the data.

*Hint:* Use the above template and fill in each of the methods.

In [18]:
powerPlantDF = sqlContext.read.format('com.databricks.spark.csv').options(delimiter='\t', header='true', inferschema='true').load("file://" + SparkFiles.getRootDirectory())

In [19]:
# TEST
from nose.tools import assert_equal, assert_true
expected = set([(s, 'double') for s in ('AP', 'AT', 'PE', 'RH', 'V')])
assert_equal(expected, set(powerPlantDF.dtypes), "Incorrect schema for powerPlantDF")

Check the names and types of the columns using the [dtypes](https://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html#pyspark.sql.DataFrame.dtypes) method.

In [21]:
print (powerPlantDF.dtypes)

We can examine the data using the display() method.

In [23]:
display(powerPlantDF)

### Exercise 2(c)

Instead of having [spark-csv](https://spark-packages.org/package/databricks/spark-csv) infer the types of the columns, we can specify the schema as a [DataType](https://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html#pyspark.sql.types.DataType), which is a list of [StructField](https://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html#pyspark.sql.types.StructType).
You can find a list of types in the [pyspark.sql.types](https://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html#module-pyspark.sql.types) module. For our data, we will use [DoubleType()](https://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html#pyspark.sql.types.DoubleType).
For example, to specify that a column's name and type, we use: `StructField(`_name_`,` _type_`, True)`. (The third parameter, `True`, signifies that the column is nullable.)

Create a custom schema for the power plant data.

In [25]:
from pyspark.sql.types import *

# Custom Schema for Power Plant
customSchema = StructType([ \
    StructField("AT", DoubleType()), \
    StructField("V", DoubleType()), \
    StructField("AP", DoubleType()), \
    StructField("RH", DoubleType()), \
    StructField("PE", DoubleType()), \
                          ])


In [26]:
# TEST
assert_equal(set([f.name for f in customSchema.fields]), set(['AT', 'V', 'AP', 'RH', 'PE']), 'Incorrect column names in schema.')
assert_equal(set([f.dataType for f in customSchema.fields]), set([DoubleType(), DoubleType(), DoubleType(), DoubleType(), DoubleType()]), 'Incorrect column types in schema.')

### Exercise 2(d)

Now, let's use the schema to read the data. To do this, we will modify the earlier `sqlContext.read.format` step. We can specify the schema by:
- Adding `schema = customSchema` to the load method (use a comma and add it after the file name)
- Removing the `inferschema='true'`option because we are explicitly specifying the schema

In [28]:
# TODO: Uncomment the line and use the schema you created above to load the data again.
altPowerPlantDF = sqlContext.read.format('com.databricks.spark.csv').options(delimiter='\t', header='true').load("file://" + SparkFiles.getRootDirectory(), schema=customSchema)

In [29]:
# TEST
expected = set([(s, 'double') for s in ('AP', 'AT', 'PE', 'RH', 'V')])
assert_equal(expected, set(altPowerPlantDF.dtypes), "Incorrect schema for powerPlantDF")

Note that no Spark jobs are launched this time. That is because we specified the schema, so the [spark-csv](https://spark-packages.org/package/databricks/spark-csv) package does not have to read the data to infer the schema. We can use the [dtypes](https://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html#pyspark.sql.DataFrame.dtypes) method to examine the names and types of the columns. They should be identical to the names and types of the columns that were earlier inferred from the data.

When you run the following cell, data would not be read.

In [31]:
print (altPowerPlantDF.dtypes)

Now we can examine the data using the display() method. *Note that this operation will cause the data to be read and the DataFrame will be created.*

In [33]:
display(altPowerPlantDF)

## Part 3: Explore Your Data
Now that your data is loaded, the next step is to explore it and perform some basic analysis and visualizations.

This is a step that you should always perform **before** trying to fit a model to the data, as this step will often lead to important insights about your data.

First, let's register our DataFrame as an SQL table named `power_plant`.  Because you may run this lab multiple times, we'll take the precaution of removing any existing tables first.

We can delete any existing `power_plant` SQL table using the SQL command: `DROP TABLE IF EXISTS power_plant` (we also need to to delete any Hive data associated with the table, which we can do with a Databricks file system operation).

Once any prior table is removed, we can register our DataFrame as a SQL table using [sqlContext.registerDataFrameAsTable()](https://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html#pyspark.sql.SQLContext.registerDataFrameAsTable).

### 3(a)

**ToDo:** Execute the prepared code in the following cell.

In [36]:
# uncomment to delete hive data in databricks

sqlContext.sql("DROP TABLE IF EXISTS power_plant")
sqlContext.registerDataFrameAsTable(powerPlantDF, "power_plant")

# Now that our DataFrame exists as a SQL table, we can explore it using SQL commands.

To execute SQL in a cell, we use the `%sql` operator. The following cell is an example of using SQL to query the rows of the SQL table.

**IMPORTANT**: uncomment the code in this part to run in databricks, COMMENT OUT ALL THE UNNECESSARY CODE IN THIS PART to submit to gradescope.

**NOTE**: `%sql` is a Databricks-only command. It calls `sqlContext.sql()` and passes the results to the Databricks-only `display()` function. These two statements are equivalent:

`%sql SELECT * FROM power_plant`

`display(sqlContext.sql("SELECT * FROM power_plant"))`

### 3(b)

**ToDo**: Execute the prepared code in the following cell.

In [38]:
%sql
SELECT * FROM power_plant

### 3(c)

Use the SQL `desc` command to describe the schema, by executing the following cell.

In [40]:
%sql desc power_plant

**Schema Definition**

Once again, here's our schema definition:

- AT = Atmospheric Temperature in C
- V = Exhaust Vacuum Speed
- AP = Atmospheric Pressure
- RH = Relative Humidity
- PE = Power Output

PE is our label or target. This is the value we are trying to predict given the measurements.

*Reference [UCI Machine Learning Repository Combined Cycle Power Plant Data Set](https://archive.ics.uci.edu/ml/datasets/Combined+Cycle+Power+Plant)*

Let's perform some basic statistical analyses of all the columns.

We can get the DataFrame associated with a SQL table by using the [sqlContext.table()](https://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader.table) method and passing in the name of the SQL table. Then, we can use the DataFrame [describe()](https://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html#pyspark.sql.DataFrame.describe) method with no arguments to compute some basic statistics for each column like count, mean, max, min and standard deviation.

In [43]:
# uncomment to run in databricks
# comment to submit to gradescope

# df = sqlContext.table("power_plant")
# display(df.describe())

##Part 4: Visualize Your Data

To understand our data, we will look for correlations between features and the label.  This can be important when choosing a model.  E.g., if features and a label are linearly correlated, a linear model like Linear Regression can do well; if the relationship is very non-linear, more complex models such as Decision Trees can be better. We can use Databrick's built in visualization to view each of our predictors in relation to the label column as a scatter plot to see the correlation between the predictors and the label.

### Exercise 4(a)

** Add figures to the following: **
Let's see if there is a corellation between Temperature and Power Output. We can use a dataframe consisting of only the Temperature (AT) and Power (PE) columns, and then use a scatter plot with Temperature on the X axis and Power on the Y axis to visualize the relationship (if any) between Temperature and Power.


Perform the following steps:

- Run the following cell
- Click on the drop down next to the "Bar chart" icon and select "Scatter" to turn the table into a Scatter plot

<img src="http://spark-mooc.github.io/web-assets/images/cs110x/change-plot-type-scatter.png" style="border: 1px solid #999999"/>

- Click on "Plot Options..."
- In the Values box, click on "Temperature" and drag it before "Power", as shown below:

<img src="http://spark-mooc.github.io/web-assets/images/cs110x/customize-plot-scatter.png" style="border: 1px solid #999999"/>

- Apply your changes by clicking the Apply button
- Increase the size of the graph by clicking and dragging the size control

In [45]:
from pyspark.sql.functions import col
display(powerPlantDF.select(powerPlantDF.AT.alias('Power'),powerPlantDF.PE.alias('Temperature')))

It looks like there is strong linear correlation between Temperature and Power Output.

** ASIDE: A quick physics lesson**: This correlation is to be expected as the second law of thermodynamics puts a fundamental limit on the [thermal efficiency](https://en.wikipedia.org/wiki/Thermal_efficiency) of all heat-based engines. The limiting factors are:
 - The temperature at which the heat enters the engine \\( T_{H} \\)
 - The temperature of the environment into which the engine exhausts its waste heat \\( T_C \\)

Our temperature measurements are the temperature of the environment. From [Carnot's theorem](https://en.wikipedia.org/wiki/Carnot%27s_theorem_%28thermodynamics%29), no heat engine working between these two temperatures can exceed the Carnot Cycle efficiency:
\\[ n_{th} \le 1 - \frac{T_C}{T_H}  \\]

Note that as the environmental temperature increases, the efficiency decreases -- _this is the effect that we see in the above graph._

### Exercise 4(b)

Use DataFrame to create a scatter plot of Power(PE) as a function of ExhaustVacuum (V).
Name the y-axis "Power" and the x-axis "ExhaustVacuum"

In [48]:
display(powerPlantDF.select(powerPlantDF.PE.alias('Power'),powerPlantDF.V.alias('ExhaustVacuum')))

Let's continue exploring the relationships (if any) between the variables and Power Output.

### Exercise 4(c)

Use DataFrame to create a scatter plot of Power(PE) as a function of Pressure (AP).
Name the y-axis "Power" and the x-axis "Pressure"

In [50]:
display(powerPlantDF.select(powerPlantDF.PE.alias('Power'),powerPlantDF.AP.alias('Pressure')))

### Exercise 4(d)

Use DataFrame to create a scatter plot of Power(PE) as a function of Humidity (RH).
Name the y-axis "Power" and the x-axis "Humidity"

In [52]:
display(powerPlantDF.select(powerPlantDF.PE.alias('Power'),powerPlantDF.RH.alias('Humidity')))

##Part 5: Data Preparation

The next step is to prepare the data for machine learning. Since all of this data is numeric and consistent this is a simple and straightforward task.

The goal is to use machine learning to determine a function that yields the output power as a function of a set of predictor features. The first step in building our ML pipeline is to convert the predictor features from DataFrame columns to Feature Vectors using the [pyspark.ml.feature.VectorAssembler()](https://spark.apache.org/docs/1.6.2/api/python/pyspark.ml.html#pyspark.ml.feature.VectorAssembler) method.

The VectorAssembler is a transformer that combines a given list of columns into a single vector column. It is useful for combining raw features and features generated by different feature transformers into a single feature vector, in order to train ML models like logistic regression and decision trees. VectorAssembler takes a list of input column names (each is a string) and the name of the output column (as a string).

### Exercise 5(a)

- Read the Spark documentation and useage examples for [VectorAssembler](https://spark.apache.org/docs/1.6.2/ml-features.html#vectorassembler)
- Convert the `power_plant` SQL table into a DataFrame named `dataset`
- Set the vectorizer's input columns to a list of the four columns of the input DataFrame: `["AT", "V", "AP", "RH"]`
- Set the vectorizer's output column name to `"features"`

In [54]:
from pyspark.ml.feature import VectorAssembler

datasetDF = sqlContext.sql("SELECT * FROM power_plant")

vectorizer = VectorAssembler()
vectorizer.setInputCols(["AT","V","AP","RH"])
vectorizer.setOutputCol("features")

In [55]:
# TEST
assert_equal(set(vectorizer.getInputCols()), {"AT", "V", "AP", "RH"}, "Incorrect vectorizer input columns")
assert_equal(vectorizer.getOutputCol(), "features", "Incorrect vectorizer output column")

##Part 6: Data Modeling
Now let's model our data to predict what the power output will be given a set of sensor readings

Our first model will be based on simple linear regression since we saw some linear patterns in our data based on the scatter plots during the exploration stage.

We need a way of evaluating how well our linear regression model predicts power output as a function of input parameters. We can do this by splitting up our initial data set into a _Training Set_ used to train our model and a _Test Set_ used to evaluate the model's performance in giving predictions. We can use a DataFrame's [randomSplit()](https://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html#pyspark.sql.DataFrame.randomSplit) method to split our dataset. The method takes a list of weights and an optional random seed. The seed is used to initialize the random number generator used by the splitting function. 
However, in this part we just slice the dataframe by index and we will use randomSplit() in later parts.

### Exercise 6(a)

Use [monotonically_increasing_id()](https://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.functions.monotonically_increasing_id) method to create index for `datasetDF` and divide up it into a trainingSetDF (80% of the input DataFrame) and a testSetDF (20% of the input DataFrame). Then cache each DataFrame in memory to maximize performance.

In [57]:
# We'll hold out 20% of our data for testing and leave 80% for training
import pyspark.sql.functions as f
indexDF = datasetDF.withColumn('index', f.monotonically_increasing_id())
split100 = indexDF.count() #indexDF.randomSplit([1.0], 123456)
split20 = int(split100*0.2) #indexDF.randomSplit([0.2], 123456)
split80 = int(split100*0.8)


split20DF = indexDF.sort(['index']).limit(split20).drop('index')
split80DF = indexDF.sort(['index'], ascending=False).limit(split80).drop('index')

# Let's cache these datasets for performance
testSetDF = split20DF.cache()
trainingSetDF = split80DF.cache()

In [58]:
# TEST
assert_equal(trainingSetDF.count(), 38272, "Incorrect size for training data set")
assert_equal(testSetDF.count(), 9568, "Incorrect size for test data set")

Next we'll create a Linear Regression Model and use the built in help to identify how to train it. See API details for [Linear Regression](https://spark.apache.org/docs/1.6.2/api/python/pyspark.ml.html#pyspark.ml.regression.LinearRegression) in the ML guide.

### Exercise 6(b)

- Read the documentation and examples for [Linear Regression](https://spark.apache.org/docs/1.6.2/ml-classification-regression.html#linear-regression)
- Run the next cell

In [60]:
# ***** LINEAR REGRESSION MODEL ****

from pyspark.ml.regression import LinearRegression
from pyspark.ml.regression import LinearRegressionModel
from pyspark.ml import Pipeline

# Let's initialize our linear regression learner
lr = LinearRegression()

# We use explain params to dump the parameters we can use
print(lr.explainParams())

The cell below is based on the [Spark ML Pipeline API for Linear Regression](https://spark.apache.org/docs/1.6.2/api/python/pyspark.ml.html#pyspark.ml.regression.LinearRegression).

The first step is to set the parameters for the method:
- Set the name of the prediction column to "Predicted_PE"
- Set the name of the label column to "PE"
- Set the maximum number of iterations to 100
- Set the regularization parameter to 0.1

Next, we create the [ML Pipeline](https://spark.apache.org/docs/1.6.2/api/python/pyspark.ml.html#pyspark.ml.Pipeline) and set the stages to the Vectorizer and Linear Regression learner we created earlier.

Finally, we create a model by training on `trainingSetDF`.

### Exercise 6(c)

- Read the [Linear Regression](https://spark.apache.org/docs/1.6.2/api/python/pyspark.ml.html#pyspark.ml.regression.LinearRegression) documentation
- Run the next cell, and be sure you understand what's going on.

In [62]:
# Now we set the parameters for the method
lr.setPredictionCol("Predicted_PE")\
  .setLabelCol("PE")\
  .setMaxIter(100)\
  .setRegParam(0.1)


# We will use the new spark.ml pipeline API. If you have worked with scikit-learn this will be very familiar.
lrPipeline = Pipeline()

lrPipeline.setStages([vectorizer, lr])

# Let's first train on the entire dataset to see what we get
lrModel = lrPipeline.fit(trainingSetDF)

From the Wikipedia article on [Linear Regression](https://en.wikipedia.org/wiki/Linear_regression):
> In statistics, linear regression is an approach for modeling the relationship between a scalar dependent variable \\( y \\) and one or more explanatory variables (or independent variables) denoted \\(X\\). In linear regression, the relationships are modeled using linear predictor functions whose unknown model parameters are estimated from the data. Such models are called linear models.

Linear regression has many practical uses. Most applications fall into one of the following two broad categories:
  - If the goal is prediction, or forecasting, or error reduction, linear regression can be used to fit a predictive model to an observed data set of \\(y\\) and \\(X\\) values. After developing such a model, if an additional value of \\(X\\) is then given without its accompanying value of \\(y\\), the fitted model can be used to make a prediction of the value of \\(y\\).
  - Given a variable \\(y\\) and a number of variables \\( X_1 \\), ..., \\( X_p \\) that may be related to \\(y\\), linear regression analysis can be applied to quantify the strength of the relationship between \\(y\\) and the \\( X_j\\), to assess which \\( X_j \\) may have no relationship with \\(y\\) at all, and to identify which subsets of the \\( X_j \\) contain redundant information about \\(y\\).

We are interested in both uses, as we would like to predict power output as a function of the input variables, and we would like to know which input variables are weakly or strongly correlated with power output.

Since Linear Regression is simply a Line of best fit over the data that minimizes the square of the error, given multiple input dimensions we can express each predictor as a line function of the form:

\\[ y = a + b x_1 + b x_2 + b x_i ... \\]

where \\(a\\) is the intercept and the \\(b\\) are the coefficients.

To express the coefficients of that line we can retrieve the Estimator stage from the PipelineModel and express the weights and the intercept for the function.

### Exercise 6(d)

Run the next cell. Ensure that you understand what's going on.

In [64]:
# The intercept is as follows:
intercept = lrModel.stages[1].intercept

# The coefficents (i.e., weights) are as follows:
weights = lrModel.stages[1].coefficients

# Create a list of the column names (without PE)
featuresNoLabel = [col for col in datasetDF.columns if col != "PE"]

# Merge the weights and labels
# Sort the coefficients from greatest absolute weight most to the least absolute weight
coefficents = sorted(zip(weights, featuresNoLabel), key=lambda tup: abs(tup[0]), reverse=True)

equation = "y = {intercept}".format(intercept=intercept)
variables = []
for x in coefficents:
    weight = abs(x[0])
    name = x[1]
    symbol = "+" if (x[0] > 0) else "-"
    equation += (" {} ({} * {})".format(symbol, weight, name))

# Finally here is our equation
print("Linear Regression Equation: " + equation)

Recall **Part 4: Visualize Your Data** when we visualized each predictor against Power Output using a Scatter Plot, does the final equation seems logical given those visualizations?

### Exercise 6(e)

Now let's see what our predictions look like given this model. We apply our Linear Regression model to the 20% of the data that we split from the input dataset. The output of the model will be a predicted Power Output column named "Predicted_PE".

- Run the next cell
- Scroll through the resulting table and notice how the values in the Power Output (PE) column compare to the corresponding values in the predicted Power Output (Predicted_PE) column

In [66]:
# Apply our LR model to the test data and predict power output
predictionsAndLabelsDF = lrModel.transform(testSetDF).select("AT", "V", "AP", "RH", "PE", "Predicted_PE")

display(predictionsAndLabelsDF)

From a visual inspection of the predictions, we can see that they are close to the actual values.

However, we would like a scientific measure of how well the Linear Regression model is performing in accurately predicting values. To perform this measurement, we can use an evaluation metric such as [Root Mean Squared Error](https://en.wikipedia.org/wiki/Root-mean-square_deviation) (RMSE) to validate our Linear Regression model.

RSME is defined as follows: \\( RMSE = \sqrt{\frac{\sum_{i = 1}^{n} (x_i - y_i)^2}{n}}\\) where \\(y_i\\) is the observed value and \\(x_i\\) is the predicted value

RMSE is a frequently used measure of the differences between values predicted by a model or an estimator and the values actually observed. The lower the RMSE, the better our model.

Spark ML Pipeline provides several regression analysis metrics, including [RegressionEvaluator()](https://spark.apache.org/docs/1.6.2/api/python/pyspark.ml.html#pyspark.ml.evaluation.RegressionEvaluator).

After we create an instance of [RegressionEvaluator](https://spark.apache.org/docs/1.6.2/api/python/pyspark.ml.html#pyspark.ml.evaluation.RegressionEvaluator), we set the label column name to "PE" and set the prediction column name to "Predicted_PE". We then invoke the evaluator on the predictions.

### Exercise 6(f)

Run the next cell and ensure that you understand what's going on.

In [68]:
# Now let's compute an evaluation metric for our test dataset
from pyspark.ml.evaluation import RegressionEvaluator

# Create an RMSE evaluator using the label and predicted columns
regEval = RegressionEvaluator(predictionCol="Predicted_PE", labelCol="PE", metricName="rmse")

# Run the evaluator on the DataFrame
rmse = regEval.evaluate(predictionsAndLabelsDF)

print("Root Mean Squared Error: %.2f" % rmse)

Another useful statistical evaluation metric is the coefficient of determination, denoted \\(R^2\\) or \\(r^2\\) and pronounced "R squared". It is a number that indicates the proportion of the variance in the dependent variable that is predictable from the independent variable and it provides a measure of how well observed outcomes are replicated by the model, based on the proportion of total variation of outcomes explained by the model. The coefficient of determination ranges from 0 to 1 (closer to 1), and the higher the value, the better our model.

To compute \\(r^2\\), we invoke the evaluator with  `regEval.metricName: "r2"`

### Exercise 6(g)

Run the next cell and ensure that you understand what's going on.

In [70]:
# Now let's compute another evaluation metric for our test dataset
r2 = regEval.evaluate(predictionsAndLabelsDF, {regEval.metricName: "r2"})

print("r2: {0:.2f}".format(r2))

Generally, assuming a Gaussian distribution of errors, a good model will have 68% of predictions within 1 RMSE and 95% within 2 RMSE of the actual value (see http://statweb.stanford.edu/~susan/courses/s60/split/node60.html).

Let's examine the predictions and see if the RMSE meets this criteria.

We create a new DataFrame using [selectExpr()](https://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html#pyspark.sql.DataFrame.selectExpr) to project a set of SQL expressions, and register the DataFrame as a SQL table using [registerTempTable()](https://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html#pyspark.sql.DataFrame.registerTempTable).

### Exercise 6(h)

Run the next cell and ensure that you understand what's going on.

In [72]:
# First we remove the table if it already exists
sqlContext.sql("DROP TABLE IF EXISTS Power_Plant_RMSE_Evaluation")

# Next we calculate the residual error and divide it by the RMSE
predictionsAndLabelsDF.selectExpr("PE", "Predicted_PE", "PE - Predicted_PE Residual_Error", "(PE - Predicted_PE) / {} Within_RSME".format(rmse)).registerTempTable("Power_Plant_RMSE_Evaluation")

We can use SQL to explore the `Power_Plant_RMSE_Evaluation` table. First let's look at at the table using a SQL SELECT statement.

### Exercise 6(i)

Run the next cell and ensure that you understand what's going on.

In [74]:
# %sql SELECT * from Power_Plant_RMSE_Evaluation

Now we can display the RMSE as a Histogram.

### Exercise 6(j)

Perform the following steps:

- Run the following cell
- Click on the drop down next to the "Bar chart" icon a select "Histogram" to turn the table into a Histogram plot

<img src="http://spark-mooc.github.io/web-assets/images/cs110x/change-plot-type-histogram.png" style="border: 1px solid #999999"/>


- Click on "Plot Options..."
- In the "All Fields:" box, click on "&lt;id&gt;" and drag it into the "Keys:" box
- Change the "Aggregation" to "COUNT"

<img src="http://spark-mooc.github.io/web-assets/images/cs110x/customize-plot-histogram.png" style="border: 1px solid #999999"/>

- Apply your changes by clicking the Apply button
- Increase the size of the graph by clicking and dragging the size control

Notice that the histogram clearly shows that the RMSE is centered around 0 with the vast majority of the error within 2 RMSEs.

**IMPORTANT**: UNCOMMENT TO RUN IN DATABRICKS, COMMENT OUT SQL TO SUBMIT TO GRADESCOPE

In [76]:
# %sql -- Now we can display the RMSE as a Histogram
# SELECT Within_RSME  from Power_Plant_RMSE_Evaluation

Using a complex SQL SELECT statement, we can count the number of predictions within + or - 1.0 and + or - 2.0 and then display the results as a pie chart.

### Exercise 6(k)

Perform the following steps:

  - Run the following cell
  - Click on the drop down next to the "Bar chart" icon a select "Pie" to turn the table into a Pie Chart plot
  - Increase the size of the graph by clicking and dragging the size control

In [78]:
# %sql SELECT case when Within_RSME <= 1.0 AND Within_RSME >= -1.0 then 1
#             when  Within_RSME <= 2.0 AND Within_RSME >= -2.0 then 2 else 3
#        end RSME_Multiple, COUNT(*) AS count
# FROM Power_Plant_RMSE_Evaluation
# GROUP BY case when Within_RSME <= 1.0 AND Within_RSME >= -1.0 then 1  when  Within_RSME <= 2.0 AND Within_RSME >= -2.0 then 2 else 3 end

From the pie chart, we can see that 68% of our test data predictions are within 1 RMSE of the actual values, and 97% (68% + 29%) of our test data predictions are within 2 RMSE. So the model is pretty decent. Let's see if we can tune the model to improve it further.

##Part 7: Tuning and Evaluation

Now that we have a model with all of the data let's try to make a better model by tuning over several parameters. The process of tuning a model is known as [Model Selection](https://spark.apache.org/docs/1.6.2/api/python/pyspark.ml.html#module-pyspark.ml.tuning) or [Hyperparameter Tuning](https://spark.apache.org/docs/1.6.2/api/python/pyspark.ml.html#module-pyspark.ml.tuning), and Spark ML Pipeline makes the tuning process very simple and easy.

An important task in ML is model selection, or using data to find the best model or parameters for a given task. This is also called tuning. Tuning may be done for individual Estimators such as [LinearRegression](https://spark.apache.org/docs/1.6.2/ml-classification-regression.html#linear-regression), or for entire Pipelines which include multiple algorithms, featurization, and other steps. Users can tune an entire Pipeline at once, rather than tuning each element in the Pipeline separately.

Spark ML Pipeline supports model selection using tools such as [CrossValidator](https://spark.apache.org/docs/1.6.2/api/python/pyspark.ml.html#pyspark.ml.tuning.CrossValidator), which requires the following items:
  - [Estimator](https://spark.apache.org/docs/1.6.2/api/python/pyspark.ml.html#pyspark.ml.Estimator): algorithm or Pipeline to tune
  - [Set of ParamMaps](https://spark.apache.org/docs/1.6.2/api/python/pyspark.ml.html#pyspark.ml.tuning.ParamGridBuilder): parameters to choose from, sometimes called a _parameter grid_ to search over
  - [Evaluator](https://spark.apache.org/docs/1.6.2/api/python/pyspark.ml.html#pyspark.ml.evaluation.Evaluator): metric to measure how well a fitted Model does on held-out test data

At a high level, model selection tools such as [CrossValidator](https://spark.apache.org/docs/1.6.2/api/python/pyspark.ml.html#pyspark.ml.tuning.CrossValidator) work as follows:
  - They split the input data into separate training and test datasets.
  - For each (training, test) pair, they iterate through the set of ParamMaps:
    - For each [ParamMap](https://spark.apache.org/docs/1.6.2/api/python/pyspark.ml.html#pyspark.ml.tuning.ParamGridBuilder), they fit the [Estimator](https://spark.apache.org/docs/1.6.2/api/python/pyspark.ml.html#pyspark.ml.Estimator) using those parameters, get the fitted Model, and evaluate the Model's performance using the [Evaluator](https://spark.apache.org/docs/1.6.2/api/python/pyspark.ml.html#pyspark.ml.evaluation.Evaluator).
  - They select the Model produced by the best-performing set of parameters.

The [Evaluator](https://spark.apache.org/docs/1.6.2/api/python/pyspark.ml.html#pyspark.ml.evaluation.Evaluator) can be a [RegressionEvaluator](https://spark.apache.org/docs/1.6.2/api/python/pyspark.ml.html#pyspark.ml.evaluation.RegressionEvaluator) for regression problems. To help construct the parameter grid, users can use the [ParamGridBuilder](https://spark.apache.org/docs/1.6.2/api/python/pyspark.ml.html#pyspark.ml.tuning.ParamGridBuilder) utility.

Note that cross-validation over a grid of parameters is expensive. For example, in the next cell, the parameter grid has 10 values for [lr.regParam](https://spark.apache.org/docs/1.6.2/api/python/pyspark.ml.html#pyspark.ml.regression.LinearRegression.regParam), and [CrossValidator](https://spark.apache.org/docs/1.6.2/api/python/pyspark.ml.html#pyspark.ml.tuning.CrossValidator) uses 3 folds. This multiplies out to (10 x 3) = 30 different models being trained. In realistic settings, it can be common to try many more parameters (e.g., multiple values for multiple parameters) and use more folds (_k_ = 3 and _k_ = 10 are common). In other words, using [CrossValidator](https://spark.apache.org/docs/1.6.2/api/python/pyspark.ml.html#pyspark.ml.tuning.CrossValidator) can be very expensive. However, it is also a well-established method for choosing parameters which is more statistically sound than heuristic hand-tuning.

We perform the following steps:
  - Create a [CrossValidator](https://spark.apache.org/docs/1.6.2/api/python/pyspark.ml.html#pyspark.ml.tuning.CrossValidator) using the Pipeline and [RegressionEvaluator](https://spark.apache.org/docs/1.6.2/api/python/pyspark.ml.html#pyspark.ml.evaluation.RegressionEvaluator) that we created earlier, and set the number of folds to 3
  - Create a list of 10 regularization parameters
  - Use [ParamGridBuilder](https://spark.apache.org/docs/1.6.2/api/python/pyspark.ml.html#pyspark.ml.tuning.ParamGridBuilder) to build a parameter grid with the regularization parameters and add the grid to the [CrossValidator](https://spark.apache.org/docs/1.6.2/api/python/pyspark.ml.html#pyspark.ml.tuning.CrossValidator)
  - Run the [CrossValidator](https://spark.apache.org/docs/1.6.2/api/python/pyspark.ml.html#pyspark.ml.tuning.CrossValidator) to find the parameters that yield the best model (i.e., lowest RMSE) and return the best model.

### Exercise 7(a)

Run the next cell. _Note that it will take some time to run the [CrossValidator](https://spark.apache.org/docs/1.6.2/api/python/pyspark.ml.html#pyspark.ml.tuning.CrossValidator) as it will run almost 200 Spark jobs_

In [81]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# We can reuse the RegressionEvaluator, regEval, to judge the model based on the best Root Mean Squared Error
# Let's create our CrossValidator with 3 fold cross validation
crossval = CrossValidator(estimator=lrPipeline, evaluator=regEval, numFolds=3)

# Let's tune over our regularization parameter from 0.01 to 0.10
regParam = [x / 100.0 for x in range(1, 11)]

# We'll create a paramter grid using the ParamGridBuilder, and add the grid to the CrossValidator
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, regParam)
             .build())
crossval.setEstimatorParamMaps(paramGrid)

# Now let's find and return the best model
cvModel = crossval.fit(trainingSetDF).bestModel

Now that we have tuned our Linear Regression model, let's see what the new RMSE and \\(r^2\\) values are versus our intial model.

### Exercise 7(b)

Complete and run the next cell.

In [83]:
# Now let's use cvModel to compute an evaluation metric for our test dataset: testSetDF
# Run the previously created RMSE evaluator, regEval, on the predictionsAndLabelsDF DataFrame
# Now let's compute the r2 evaluation metric for our test dataset

predictionsAndLabelsDF = cvModel.transform(testSetDF).select("AT", "V", "AP", "RH", "PE", "Predicted_PE")
rmseNew = regEval.evaluate(predictionsAndLabelsDF)
r2New = regEval.evaluate(predictionsAndLabelsDF, {regEval.metricName: "r2"})

print("Original Root Mean Squared Error: {0:2.2f}".format(rmse))
print("New Root Mean Squared Error: {0:2.2f}".format(rmseNew))
print("Old r2: {0:2.2f}".format(r2))
print("New r2: {0:2.2f}".format(r2New))


In [84]:
# TEST
assert_equal(round(rmse, 2), 4.56, "Incorrect value for rmse")
assert_equal(round(rmseNew, 2), 4.56, "Incorrect value for rmseNew")
assert_equal(round(r2, 2), 0.93, "Incorrect value for r2")
assert_equal(round(r2New, 2), 0.93, "Incorrect value for r2New")

So our initial untuned and tuned linear regression models are identical. Let's look at the regularization parameter that the [CrossValidator](https://spark.apache.org/docs/1.6.2/api/python/pyspark.ml.html#pyspark.ml.tuning.CrossValidator) has selected.

Recall that the orginal regularization parameter we used was 0.01.

**NOTE**: The ML Python API currently doesn't provide a way to query the regularization parameter, so we cheat, by "reaching through" to the JVM version of the API.

In [86]:
print("Regularization parameter of the best model: {0:.2f}".format(cvModel.stages[-1]._java_obj.parent().getRegParam()))

Given that the only linearly correlated variable is Temperature, it makes sense try another Machine Learning method such as [Decision Tree](https://en.wikipedia.org/wiki/Decision_tree_learning) to handle non-linear data and see if we can improve our model.

[Decision Tree Learning](https://en.wikipedia.org/wiki/Decision_tree_learning) uses a [Decision Tree](https://en.wikipedia.org/wiki/Decision_tree) as a predictive model which maps observations about an item to conclusions about the item's target value. It is one of the predictive modelling approaches used in statistics, data mining and machine learning. Decision trees where the target variable can take continuous values (typically real numbers) are called regression trees.

Spark ML Pipeline provides [DecisionTreeRegressor()](https://spark.apache.org/docs/1.6.2/api/python/pyspark.ml.html#pyspark.ml.regression.DecisionTreeRegressor) as an implementation of [Decision Tree Learning](https://en.wikipedia.org/wiki/Decision_tree_learning).

The cell below is based on the [Spark ML Pipeline API for Decision Tree Regressor](https://spark.apache.org/docs/1.6.2/api/python/pyspark.ml.html#pyspark.ml.regression.DecisionTreeRegressor).

### Exercise 7(c)

- Read the [Decision Tree Regressor](https://spark.apache.org/docs/1.6.2/api/python/pyspark.ml.html#pyspark.ml.regression.DecisionTreeRegressor) documentation
- In the next cell, create a [DecisionTreeRegressor()](https://spark.apache.org/docs/1.6.2/api/python/pyspark.ml.html#pyspark.ml.regression.DecisionTreeRegressor)

- The next step is to set the parameters for the method (we do this for you):
  - Set the name of the prediction column to "Predicted_PE"
  - Set the name of the features column to "features"
  - Set the maximum number of bins to 100

- Create the [ML Pipeline](https://spark.apache.org/docs/1.6.2/api/python/pyspark.ml.html#pyspark.ml.Pipeline) and set the stages to the Vectorizer we created earlier and [DecisionTreeRegressor()](https://spark.apache.org/docs/1.6.2/api/python/pyspark.ml.html#pyspark.ml.regression.DecisionTreeRegressor) learner we just created.

In [88]:
from pyspark.ml.regression import DecisionTreeRegressor

dt = DecisionTreeRegressor()
dt.setLabelCol("PE")\
  .setPredictionCol("Predicted_PE")\
  .setFeaturesCol("features")\
  .setMaxBins(100)
dtPipeline = Pipeline()
dtPipeline.setStages([vectorizer, dt])

In [89]:
# TEST
assert_equal(dtPipeline.getStages()[0].__class__.__name__, 'VectorAssembler', "Incorrect pipeline stage 0")
assert_equal(dtPipeline.getStages()[1].__class__.__name__, 'DecisionTreeRegressor', "Incorrect pipeline stage 0")

Instead of guessing what parameters to use, we will use [Model Selection](https://spark.apache.org/docs/1.6.2/api/python/pyspark.ml.html#module-pyspark.ml.tuning) or [Hyperparameter Tuning](https://spark.apache.org/docs/1.6.2/api/python/pyspark.ml.html#module-pyspark.ml.tuning) to create the best model.

We can reuse the exiting [CrossValidator](https://spark.apache.org/docs/1.6.2/api/python/pyspark.ml.html#pyspark.ml.tuning.CrossValidator) by replacing the Estimator with our new `dtPipeline` (the number of folds remains 3).

### Exercise 7(d)

- Use [ParamGridBuilder](https://spark.apache.org/docs/1.6.2/api/python/pyspark.ml.html#pyspark.ml.tuning.ParamGridBuilder) to build a parameter grid with the parameter `dt.maxDepth` and a list of the values 2 and 3, and add the grid to the [CrossValidator](https://spark.apache.org/docs/1.6.2/api/python/pyspark.ml.html#pyspark.ml.tuning.CrossValidator)
- Run the [CrossValidator](https://spark.apache.org/docs/1.6.2/api/python/pyspark.ml.html#pyspark.ml.tuning.CrossValidator) to find the parameters that yield the best model (i.e. lowest RMSE) and return the best model.

_Note that it will take some time to run the [CrossValidator](https://spark.apache.org/docs/1.6.2/api/python/pyspark.ml.html#pyspark.ml.tuning.CrossValidator) as it will run almost 50 Spark jobs_

In [91]:
# Let's just reuse our CrossValidator with the new dtPipeline,  RegressionEvaluator regEval, and 3 fold cross validation
crossval.setEstimator(dtPipeline)

# Let's tune over our dt.maxDepth parameter on the values 2 and 3, create a paramter grid using the ParamGridBuilder
# Add the grid to the CrossValidator
# Now let's find and return the best model

paramGrid = ParamGridBuilder().addGrid(dt.maxDepth, [2,3]).build()
crossval.setEstimatorParamMaps(paramGrid)
dtModel = crossval.fit(trainingSetDF).bestModel

In [92]:
# TEST
assert_equal(dtModel.stages[0].__class__.__name__, 'VectorAssembler', "Incorrect pipeline stage 0")
assert_equal(dtModel.stages[1].__class__.__name__, 'DecisionTreeRegressionModel', "Incorrect pipeline stage 0")

### Exercise 7(e)

Now let's see how our tuned DecisionTreeRegressor model's RMSE and \\(r^2\\) values compare to our tuned LinearRegression model.

Complete and run the next cell.

In [94]:
# Now let's use dtModel to compute an evaluation metric for our test dataset: testSetDF
# Run the previously created RMSE evaluator, regEval, on the predictionsAndLabelsDF DataFrame
# Now let's compute the r2 evaluation metric for our test dataset

predictionsAndLabelsDF = dtModel.transform(testSetDF).select("AT", "V", "AP", "RH", "PE", "Predicted_PE")
rmseDT = regEval.evaluate(predictionsAndLabelsDF)
r2DT = regEval.evaluate(predictionsAndLabelsDF, {regEval.metricName: "r2"})

print("LR Root Mean Squared Error: {0:.2f}".format(rmseNew))
print("DT Root Mean Squared Error: {0:.2f}".format(rmseDT))
print("LR r2: {0:.2f}".format(r2New))
print("DT r2: {0:.2f}".format(r2DT))

In [95]:
# TEST
assert_true(abs(round(rmseDT, 2) - 5.13) < 0.1, "Incorrect value for rmseDT")
assert_equal(round(r2DT, 2), 0.91, "Incorrect value for r2DT")

The line below will pull the Decision Tree model from the Pipeline and display it as an if-then-else string. Again, we have to "reach through" to the JVM API to make this one work.

**ToDo**: Run the next cell

In [97]:
# print (dtModel.stages[-1]._java_obj.toDebugString())

So our DecisionTree has slightly worse RMSE than our LinearRegression model (LR: 4.56 vs DT: 5.13). Maybe we can try an [Ensemble Learning](https://en.wikipedia.org/wiki/Ensemble_learning) method such as [Gradient-Boosted Decision Trees](https://en.wikipedia.org/wiki/Gradient_boosting) to see if we can strengthen our model by using an ensemble of weaker trees with weighting to reduce the error in our model.

[Random forests](https://en.wikipedia.org/wiki/Random_forest) or random decision tree forests are an ensemble learning method for regression that operate by constructing a multitude of decision trees at training time and outputting the class that is the mean prediction (regression) of the individual trees. Random decision forests correct for decision trees' habit of overfitting to their training set.

Spark ML Pipeline provides [RandomForestRegressor()](https://spark.apache.org/docs/1.6.2/api/python/pyspark.ml.html#pyspark.ml.regression.RandomForestRegressor) as an implementation of [Random forests](https://en.wikipedia.org/wiki/Random_forest).

The cell below is based on the [Spark ML Pipeline API for Random Forest Regressor](https://spark.apache.org/docs/1.6.2/api/python/pyspark.ml.html#pyspark.ml.regression.RandomForestRegressor).

### Exercise 7(f)

- Read the [Random Forest Regressor](https://spark.apache.org/docs/1.6.2/api/python/pyspark.ml.html#pyspark.ml.regression.RandomForestRegressor) documentation
- In the next cell, create a [RandomForestRegressor()](https://spark.apache.org/docs/1.6.2/api/python/pyspark.ml.html#pyspark.ml.regression.RandomForestRegressor)
- The next step is to set the parameters for the method (we do this for you):
  - Set the name of the prediction column to "Predicted_PE"
  - Set the name of the features column to "features"
  - Set the random number generator seed to 100088121L
  - Set the maximum depth to 8
  - Set the number of trees to 30
- Create the [ML Pipeline](https://spark.apache.org/docs/1.6.2/api/python/pyspark.ml.html#pyspark.ml.Pipeline) and set the stages to the Vectorizer we created earlier and [RandomForestRegressor()](https://spark.apache.org/docs/1.6.2/api/python/pyspark.ml.html#pyspark.ml.regression.RandomForestRegressor) learner we just created.

In [99]:
from pyspark.ml.regression import RandomForestRegressor

rf = RandomForestRegressor()
rf.setLabelCol("PE")\
  .setPredictionCol("Predicted_PE")\
  .setFeaturesCol("features")\
  .setSeed(100088121)\
  .setMaxDepth(8)\
  .setNumTrees(30)
rfPipeline = Pipeline()
rfPipeline.setStages([vectorizer, rf])

In [100]:
# TEST
assert_equal(rfPipeline.getStages()[0].__class__.__name__, 'VectorAssembler', "Stage 0 of pipeline is not correct")
assert_equal(rfPipeline.getStages()[1].__class__.__name__, 'RandomForestRegressor', "Stage 1 of pipeline is not correct")

As with Decision Trees, instead guessing what parameters to use, we will use [Model Selection](https://spark.apache.org/docs/1.6.2/api/python/pyspark.ml.html#module-pyspark.ml.tuning) or [Hyperparameter Tuning](https://spark.apache.org/docs/1.6.2/api/python/pyspark.ml.html#module-pyspark.ml.tuning) to create the best model.

We can reuse the existing [CrossValidator](https://spark.apache.org/docs/1.6.2/api/python/pyspark.ml.html#pyspark.ml.tuning.CrossValidator) by replacing the Estimator with our new `rfPipeline` (the number of folds remains 3).

### Exercise 7(g)

- Use [ParamGridBuilder](https://spark.apache.org/docs/1.6.2/api/python/pyspark.ml.html#pyspark.ml.tuning.ParamGridBuilder) to build a parameter grid with the parameter `rf.maxBins` and a list of the values 50 and 100, and add the grid to the [CrossValidator](https://spark.apache.org/docs/1.6.2/api/python/pyspark.ml.html#pyspark.ml.tuning.CrossValidator)
- Run the [CrossValidator](https://spark.apache.org/docs/1.6.2/api/python/pyspark.ml.html#pyspark.ml.tuning.CrossValidator) to find the parameters that yield the best model (i.e., lowest RMSE) and return the best model.

_Note that it will take some time to run the [CrossValidator](https://spark.apache.org/docs/1.6.2/api/python/pyspark.ml.html#pyspark.ml.tuning.CrossValidator) as it will run almost 100 Spark jobs, and each job takes longer to run than the prior CrossValidator runs._

In [102]:
# Let's just reuse our CrossValidator with the new rfPipeline,  RegressionEvaluator regEval, and 3 fold cross validation
crossval.setEstimator(rfPipeline)

# Let's tune over our rf.maxBins parameter on the values 50 and 100, create a parameter grid using the ParamGridBuilder
# Add the grid to the CrossValidator
# Now let's find and return the best model

paramGrid = ParamGridBuilder().addGrid(rf.maxBins, [50, 100]).build()
crossval.setEstimatorParamMaps(paramGrid)
rfModel = crossval.fit(trainingSetDF).bestModel

In [103]:
# TEST
assert_equal(rfModel.stages[0].__class__.__name__, 'VectorAssembler', 'rfModel has incorrect stage 0')
assert_equal(rfModel.stages[1].__class__.__name__, 'RandomForestRegressionModel', 'rfModel has incorrect stage 1')

### Exercise 7(h)

Now let's see how our tuned RandomForestRegressor model's RMSE and \\(r^2\\) values compare to our tuned LinearRegression and tuned DecisionTreeRegressor models.

Complete and run the next cell.

In [105]:
# Now let's use rfModel to compute an evaluation metric for our test dataset: testSetDF
# Run the previously created RMSE evaluator, regEval, on the predictionsAndLabelsDF DataFrame
# Now let's compute the r2 evaluation metric for our test dataset

predictionsAndLabelsDF = rfModel.transform(testSetDF).select("AT", "V", "AP", "RH", "PE", "Predicted_PE")
rmseRF = regEval.evaluate(predictionsAndLabelsDF)
r2RF = regEval.evaluate(predictionsAndLabelsDF, {regEval.metricName: "r2"})

print("LR Root Mean Squared Error: {0:.2f}".format(rmseNew))
print("DT Root Mean Squared Error: {0:.2f}".format(rmseDT))
print("RF Root Mean Squared Error: {0:.2f}".format(rmseRF))
print("LR r2: {0:.2f}".format(r2New))
print("DT r2: {0:.2f}".format(r2DT))
print("RF r2: {0:.2f}".format(r2RF))

In [106]:
# TEST
assert_true(abs(round(rmseRF, 2) - 3.35) < 0.05, "Incorrect value for rmseDT")
assert_equal(round(r2RF, 2), 0.96, "Incorrect value for r2RF")

Note that the `r2` values are similar for all three. However, the RMSE for the Random Forest model is better.

The line below will pull the Random Forest model from the Pipeline and display it as an if-then-else string.

**ToDo**: Run the next cell

In [109]:
print (rfModel.stages[-1]._java_obj.toDebugString())

### Conclusion

Wow! So our best model is in fact our Random Forest tree model which uses an ensemble of 30 Trees with a depth of 8 to construct a better model than the single decision tree.

# II. Linear Regression

This section covers a common supervised learning pipeline, using a subset of the [Million Song Dataset](http://labrosa.ee.columbia.edu/millionsong/) from the [UCI Machine Learning Repository](https://archive.ics.uci.edu/ml/datasets/YearPredictionMSD). Our goal is to train a linear regression model to predict the release year of a song given a set of audio features.

## This section will cover:
*  *Part 1:* Read and parse the initial dataset
  * *Visualization 1:* Features
  * *Visualization 2:* Shifting labels

*  *Part 2:* Create and evaluate a baseline model
  * *Visualization 3:* Predicted vs. actual

*  *Part 3:* Train (via gradient descent) and evaluate a linear regression model
  * *Visualization 4:* Training error

*  *Part 4:* Train using SparkML and tune hyperparameters via grid search
  * *Visualization 5:* Best model's predictions
  * *Visualization 6:* Hyperparameter heat map

*  *Part 5:* Add interactions between features

> Note that, for reference, you can look up the details of:
> * the relevant Spark methods in [Spark's RDD Python API](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD) and [Spark's DataFrame Python API](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame)
> * the relevant NumPy methods in the [NumPy Reference](http://docs.scipy.org/doc/numpy/reference/index.html)

## Part 1: Read and parse the initial dataset

### (1a) Load and check the data

The raw data is currently stored in text file.  We will start by storing this raw data in as a DataFrame, with each element of the DataFrame representing a data point as a comma-delimited string. Each string starts with the label (a year) followed by numerical audio features. Use the DataFrame [count method](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.count) to check how many data points we have.  Then use the [take method](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.take) to create and print out a list of the first 5 data points in their initial string format.

In [115]:
from pyspark import SparkFiles
url = 'https://raw.githubusercontent.com/10605/data/master/hw2/millionsong.txt'
sc.addFile(url)
raw_data_df = sqlContext.read.load("file://" + SparkFiles.get("millionsong.txt"), 'text')

In [116]:
num_points = raw_data_df.count()
print (num_points)
sample_points = raw_data_df.take(5)
print (sample_points)

In [117]:
# TEST Load and check the data (1a)
# from nose.tools import assert_equal, assert_true
assert_equal(num_points, 6724, 'incorrect value for num_points')
assert_equal(len(sample_points), 5, 'incorrect length for sample_points')

### (1b) Using `LabeledPoint`

In MLlib, labeled training instances are stored using the [LabeledPoint](https://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#pyspark.mllib.regression.LabeledPoint) object.  Write the `parse_points` function that takes, as input, a DataFrame of comma-separated strings. We'll pass it the `raw_data_df` DataFrame.

It should parse each row in the DataFrame into individual elements, using Spark's [select](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.select) and [split](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.split) methods.

For example, split `"2001.0,0.884,0.610,0.600,0.474,0.247,0.357,0.344,0.33,0.600,0.425,0.60,0.419"` into `['2001.0', '0.884', '0.610', '0.600', '0.474', '0.247', '0.357', '0.344', '0.33', '0.600', '0.425', '0.60', '0.419']`.

The first value in the resulting list (`2001.0` in the example, above) is the label. The remaining values (`0.884`, `0.610`, etc., in the example) are the features.

After splitting each row, map it to a `LabeledPoint`. You'll have to step down to an RDD (using `.rdd`) or use a DataFrame user-defined function to convert to the `LabeledPoint` object. (See **Hint**, below.) If you step down to an RDD, you'll have to use [toDF()](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.toDF) to convert back to a DataFrame.

Use this new `parse_points` function to parse `raw_data_df`.  Then print out the features and label for the first training point, using the `features` and `label` attributes. Finally, calculate the number of features for this dataset.

## Hint: Running Arbitrary Lambdas on a DataFrame

To solve this problem, you need a way to run your `parse_points` function on a DataFrame. There are two ways to do this, which we will illustrate with an extremely simple example.

Suppose you have a DataFrame consisting of a first name and a last name, and you want to add a unique [SHA-256](https://en.wikipedia.org/wiki/Secure_Hash_Algorithm) hash to each row.

```
df = sqlContext.createDataFrame([("John", "Smith"), ("Ravi", "Singh"), ("Julia", "Jones")], ("first_name", "last_name"))
```

Here's a simple function to calculate such a hash, using Python's built-in `hashlib` library:

```
def make_hash(first_name, last_name):
    import hashlib
    m = hashlib.sha256()
    # Join the first name and last name by a blank and hash the resulting
    # string.
    full_name = ' '.join((first_name, last_name))
    m.update(full_name)
    return m.hexdigest()
```

Okay, that's great. But, how do we use it on our DataFrame? We can use a UDF:

```
from pyspark.sql.functions import udf
u_make_hash = udf(make_hash)
df2 = df.select(df['*'], u_make_hash(df['first_name'], df['last_name']))
# could run df2.show() here to prove it works
```

Or we can step down to an RDD, use a lambda to call `make_hash` and have the lambda return a `Row` object, which Spark can use to ["infer" a new DataFrame](http://spark.apache.org/docs/latest/sql-programming-guide.html#inferring-the-schema-using-reflection).

```
from pyspark.sql import Row
def make_hash_from_row(row):
    hash = make_hash(row[0], row[1])
    return Row(first_name=row[0], last_name=row[1], hash=hash)

df2 = (df.rdd
         .map(lambda row: make_hash_from_row(row))
         .toDF())
```

These methods are roughly equivalent. You'll need to do something similar to convert _your_ `raw_data_df` DataFrame into a new DataFrame of `LabeledPoint` objects.

In [119]:
from pyspark.mllib.regression import LabeledPoint
import numpy as np

# Here is a sample raw data point:
# '2001.0,0.884,0.610,0.600,0.474,0.247,0.357,0.344,0.33,0.600,0.425,0.60,0.419'
# In this raw data point, 2001.0 is the label, and the remaining values are features

In [120]:
from pyspark.sql import functions as sql_functions

def parse_points(df):
    """Converts a DataFrame of comma separated unicode strings into a DataFrame of `LabeledPoints`.

    Args:
        df: DataFrame where each row is a comma separated unicode string. The first element in the string
            is the label and the remaining elements are the features.

    Returns:
        DataFrame: Each row is converted into a `LabeledPoint`, which consists of a label and
            features. To convert an RDD to a DataFrame, simply call toDF().
    """
    def makeLabel(row):
      comp = row['value'].split(',')
      label = float(comp[0])
      feat = [float(c) for c in comp[1:]]
      return LabeledPoint(label, feat)
    df2 = df.rdd.map(lambda row: makeLabel(row)).toDF()
    return df2

parsed_points_df = parse_points(raw_data_df)
first_point_features = parsed_points_df.first()['features']
first_point_label = parsed_points_df.first()['label']
print (first_point_features, first_point_label)

d = len(first_point_features)
print (d)

In [121]:
# TEST Using LabeledPoint (1b)
assert_true(isinstance(first_point_label, float), 'label must be a float')
expectedX0 = [0.8841,0.6105,0.6005,0.4747,0.2472,0.3573,0.3441,0.3396,0.6009,0.4257,0.6049,0.4192]
assert_true(np.allclose(expectedX0, first_point_features, 1e-4, 1e-4),
                'incorrect features for firstPointFeatures')

### Visualization 1: Features

First we will load and setup the visualization library. Then we will look at the raw features for 50 data points by generating a heatmap that visualizes each feature on a grey-scale and shows the variation of each feature across the 50 sample data points.  The features are all between 0 and 1, with values closer to 1 represented via darker shades of grey.

In [123]:
import matplotlib.pyplot as plt
import matplotlib.cm as cm

# takeSample(withReplacement, num, [seed]) randomly selects num elements from the dataset with/without replacement, and has an
# optional seed parameter that one can set for reproducible results

data_values = (parsed_points_df
               .rdd
               .map(lambda lp: lp.features.toArray())
               .takeSample(False, 50, 47))

# You can uncomment the line below to see randomly selected features.  These will be randomly
# selected each time you run the cell because there is no set seed.  Note that you should run
# this cell with the line commented out when answering the lab quiz questions.
# data_values = (parsedPointsDF
#                .rdd
#                .map(lambda lp: lp.features.toArray())
#                .takeSample(False, 50))

def prepare_plot(xticks, yticks, figsize=(10.5, 6), hideLabels=False, gridColor='#999999',
                 gridWidth=1.0):
    """Template for generating the plot layout."""
    plt.close()
    fig, ax = plt.subplots(figsize=figsize, facecolor='white', edgecolor='white')
    ax.axes.tick_params(labelcolor='#999999', labelsize='10')
    for axis, ticks in [(ax.get_xaxis(), xticks), (ax.get_yaxis(), yticks)]:
        axis.set_ticks_position('none')
        axis.set_ticks(ticks)
        axis.label.set_color('#999999')
        if hideLabels: axis.set_ticklabels([])
    plt.grid(color=gridColor, linewidth=gridWidth, linestyle='-')
    map(lambda position: ax.spines[position].set_visible(False), ['bottom', 'top', 'left', 'right'])
    return fig, ax

# generate layout and plot
fig, ax = prepare_plot(np.arange(.5, 11, 1), np.arange(.5, 49, 1), figsize=(8,7), hideLabels=True,
                       gridColor='#eeeeee', gridWidth=1.1)
image = plt.imshow(data_values,interpolation='nearest', aspect='auto', cmap=cm.Greys)
for x, y, s in zip(np.arange(-.125, 12, 1), np.repeat(-.75, 12), [str(x) for x in range(12)]):
    plt.text(x, y, s, color='#999999', size='10')
plt.text(4.7, -3, 'Feature', color='#999999', size='11'), ax.set_ylabel('Observation')
display(fig)

### (1c) Find the range

Now let's examine the labels to find the range of song years.  To do this, find the smallest and largest labels in the `parsed_points_df`.

We will use the min and max functions that are native to the DataFrames, and thus can be optimized using Spark's Catalyst Optimizer and Project Tungsten (don't worry about the technical details). This code will run faster than simply using the native min and max functions in Python. Use [selectExpr](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.selectExpr) to retrieve the min and max label values.

In [125]:
content_stats = (parsed_points_df
                 .selectExpr('label'))
min_year = content_stats.selectExpr('min(label)').first()['min(label)']
max_year = content_stats.selectExpr('max(label)').first()['max(label)']

print (min_year, max_year)

In [126]:
# TEST Find the range (1c)
assert_equal(len(parsed_points_df.first().features), 12,
                  'unexpected number of features in sample point')
sum_feat_two = parsed_points_df.rdd.map(lambda lp: lp.features[2]).sum()
assert_true(np.allclose(sum_feat_two, 3158.96224351), 'parsedPointsDF has unexpected values')
year_range = max_year - min_year
assert_true(year_range == 89, 'incorrect range for minYear to maxYear')

### (1d) Shift labels

As we just saw, the labels are years in the 1900s and 2000s.  In learning problems, it is often natural to shift labels such that they start from zero.  Starting with `parsed_points_df`, create a new DataFrame in which the labels are shifted such that smallest label equals zero (hint: use `select`). After, use [withColumnRenamed](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.withColumnRenamed) to rename the appropriate columns to `features` and `label`.

In [128]:
# TODO: Uncomment the following lines and replace <FILL IN> with appropriate code

parsed_data_df = parsed_points_df.select(parsed_points_df.features, parsed_points_df.label-min_year)
parsed_data_df = parsed_data_df.withColumnRenamed('(label - '+str(min_year)+')', 'label')
# View the first point

print ('\n{0}'.format(parsed_data_df.first()))

In [129]:
# TEST Shift labels (1d)
old_sample_features = parsed_points_df.first().features
new_sample_features = parsed_data_df.first().features
assert_true(np.allclose(old_sample_features, new_sample_features),
                'new features do not match old features')
sum_feat_two = parsed_data_df.rdd.map(lambda lp: lp.features[2]).sum()
assert_true(np.allclose(sum_feat_two, 3158.96224351), 'parsed_data_df has unexpected values')
min_year_new = parsed_data_df.groupBy().min('label').first()[0]
max_year_new = parsed_data_df.groupBy().max('label').first()[0]
assert_true(min_year_new == 0, 'incorrect min year in shifted data')
assert_true(max_year_new == 89, 'incorrect max year in shifted data')

### Visualization 2: Shifting labels

We will look at the labels before and after shifting them.  Both scatter plots below visualize tuples storing:

* a label value and
* the number of training points with this label.

The first scatter plot uses the initial labels, while the second one uses the shifted labels.  Note that the two plots look the same except for the labels on the x-axis.

In [131]:
# get data for plot
old_data = (parsed_points_df
             .rdd
             .map(lambda lp: (lp.label, 1))
             .reduceByKey(lambda x, y: x + y)
             .collect())
x, y = zip(*old_data)

# generate layout and plot data
fig, ax = prepare_plot(np.arange(1920, 2050, 20), np.arange(0, 150, 20))
plt.scatter(x, y, s=14**2, c='#d6ebf2', edgecolors='#8cbfd0', alpha=0.75)
ax.set_xlabel('Year'), ax.set_ylabel('Count')
display(fig)

In [132]:
# get data for plot
new_data = (parsed_data_df
             .rdd
             .map(lambda lp: (lp.label, 1))
             .reduceByKey(lambda x, y: x + y)
             .collect())
x, y = zip(*new_data)

# generate layout and plot data
fig, ax = prepare_plot(np.arange(0, 120, 20), np.arange(0, 120, 20))
plt.scatter(x, y, s=14**2, c='#d6ebf2', edgecolors='#8cbfd0', alpha=0.75)
ax.set_xlabel('Year (shifted)'), ax.set_ylabel('Count')
display(fig)
pass

### (1e) Training, validation, and test sets

We're almost done parsing our dataset, and our final task involves spliting the dataset into training, validation and test sets. Use the [randomSplit method](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.randomSplit) with the specified weights and seed to create DataFrames storing each of these datasets. Next, cache each of these DataFrames, as we will be accessing them multiple times in the remainder of this lab. Finally, compute the size of each dataset and verify that the sum of their sizes equals the value computed in Part (1a).

In [134]:
# TODO: Uncomment the following lines and replace <FILL IN> with appropriate code
weights = [.8, .1, .1]
seed = 42

parsed_train_data_df, parsed_val_data_df, parsed_test_data_df = parsed_data_df.randomSplit(weights, seed)
parsed_train_data_df.cache()
parsed_val_data_df.cache()
parsed_test_data_df.cache()
n_train = parsed_train_data_df.count()
n_val = parsed_val_data_df.count()
n_test = parsed_test_data_df.count()

print (n_train, n_val, n_test, n_train + n_val + n_test)
print (parsed_data_df.count())

In [135]:
# TEST Training, validation, and test sets (1e)
assert_equal(len(parsed_train_data_df.first().features), 12,
                  'parsed_train_data_df has wrong number of features')
sum_feat_two = (parsed_train_data_df
                 .rdd
                 .map(lambda lp: lp.features[2])
                 .sum())
sum_feat_three = (parsed_val_data_df
                  .rdd
                  .map(lambda lp: lp.features[3])
                  .reduce(lambda x, y: x + y))
sum_feat_four = (parsed_test_data_df
                  .rdd
                  .map(lambda lp: lp.features[4])
                  .reduce(lambda x, y: x + y))
assert_true(np.allclose([sum_feat_two, sum_feat_three, sum_feat_four],
                            2526.87757656, 297.340394298, 184.235876654),
                'parsed Train, Val, Test data has unexpected values')
assert_true(n_train + n_val + n_test == 6724, 'unexpected Train, Val, Test data set size')
assert_equal(n_train, 5405, 'unexpected value for nTrain')
assert_equal(n_val, 644, 'unexpected value for nVal')
assert_equal(n_test, 675, 'unexpected value for nTest')

## Part 2: Create and evaluate a baseline model

### (2a) Average label

A very simple yet natural baseline model is one where we always make the same prediction independent of the given data point, using the average label in the training set as the constant prediction value.  Compute this value, which is the average (shifted) song year for the training set.  Use `selectExpr` and `first()` from the [DataFrame API](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame).

In [138]:
average_train_year = (parsed_train_data_df
                        .selectExpr('avg(label)').first()['avg(label)'])


print(sum(parsed_train_data_df.rdd.map(lambda x:x.label).collect()), parsed_train_data_df.count())
print (average_train_year)

In [139]:
# TEST Average label (2a)
assert_true(np.allclose(average_train_year, 53.68140610545791),
                'incorrect value for average_train_year')

### (2b) Root mean squared error

We naturally would like to see how well this naive baseline performs.  We will use root mean squared error ([RMSE](http://en.wikipedia.org/wiki/Root-mean-square_deviation)) for evaluation purposes.  Using [Regression Evaluator](http://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.evaluation.RegressionEvaluator),  compute the RMSE given a dataset of _(prediction, label)_ tuples.

In [141]:
from pyspark.ml.evaluation import RegressionEvaluator

preds_and_labels = [(1., 3.), (2., 1.), (2., 2.)]
preds_and_labels_df = sqlContext.createDataFrame(preds_and_labels, ["prediction", "label"])


evaluator = RegressionEvaluator(predictionCol='prediction', labelCol='label')

def calc_RMSE(dataset):
    """Calculates the root mean squared error for an dataset of (prediction, label) tuples.

    Args:
        dataset (DataFrame of (float, float)): A `DataFrame` consisting of (prediction, label) tuples.

    Returns:
        float: The square root of the mean of the squared errors.
    """
    
    return evaluator.evaluate(dataset)

example_rmse = calc_RMSE(preds_and_labels_df)
print (example_rmse)

In [142]:
# TEST Root mean squared error (2b)
assert_true(np.allclose(example_rmse, 1.29099444874), 'incorrect value for exampleRMSE')

### (2c) Training, validation and test RMSE

Now let's calculate the training, validation and test RMSE of our baseline model. To do this, first create DataFrames of _(prediction, label)_ tuples for each dataset, and then call `calc_RMSE()`. Note that each RMSE can be interpreted as the average prediction error for the given dataset (in terms of number of years). You can use [createDataFrame](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.SQLContext.createDataFrame) to make a DataFrame with the column names of "prediction" and "label" from an RDD.

In [144]:
preds_and_labels_train = parsed_train_data_df.rdd.map(lambda x:(average_train_year, x.label))
preds_and_labels_train_df = sqlContext.createDataFrame(preds_and_labels_train, ["prediction", "label"])
rmse_train_base = calc_RMSE(preds_and_labels_train_df)

preds_and_labels_val = parsed_val_data_df.rdd.map(lambda x:(average_train_year, x.label))
preds_and_labels_val_df = sqlContext.createDataFrame(preds_and_labels_val, ["prediction", "label"])
rmse_val_base = calc_RMSE(preds_and_labels_val_df)

preds_and_labels_test = parsed_test_data_df.rdd.map(lambda x:(average_train_year ,x.label))
preds_and_labels_test_df = sqlContext.createDataFrame(preds_and_labels_test, ["prediction", "label"])
rmse_test_base = calc_RMSE(preds_and_labels_test_df)

print ('Baseline Train RMSE = {0:.3f}'.format(rmse_train_base))
print ('Baseline Validation RMSE = {0:.3f}'.format(rmse_val_base))
print ('Baseline Test RMSE = {0:.3f}'.format(rmse_test_base))

In [145]:
# TEST Training, validation and test RMSE (2c)
assert_true(np.allclose([rmse_train_base, rmse_val_base, rmse_test_base],
                            [21.477279181895707, 21.22695358728026, 21.127323734084705]), 'incorrect RMSE values')

### Visualization 3: Predicted vs. actual

We will visualize predictions on the validation dataset. The scatter plots below visualize tuples storing i) the predicted value and ii) true label.  The first scatter plot represents the ideal situation where the predicted value exactly equals the true label, while the second plot uses the baseline predictor (i.e., `average_train_year`) for all predicted values.  Further note that the points in the scatter plots are color-coded, ranging from light yellow when the true and predicted values are equal to bright red when they drastically differ.

In [147]:
from matplotlib.colors import ListedColormap, Normalize
from matplotlib.cm import get_cmap
cmap = get_cmap('YlOrRd')
norm = Normalize()

def squared_error(lp):
    """Calculates the squared error for a single prediction."""
    label, prediction = lp
    return float((label - prediction)**2)

actual = np.asarray(parsed_val_data_df
                    .select('label')
                    .collect())
error = np.asarray(parsed_val_data_df
                   .rdd
                   .map(lambda lp: (lp.label, lp.label))
                   .map(lambda lp: squared_error(lp))
                   .collect())
clrs = cmap(np.asarray(norm(error)))[:,0:3]

fig, ax = prepare_plot(np.arange(0, 100, 20), np.arange(0, 100, 20))
plt.scatter(actual, actual, s=14**2, c=clrs, edgecolors='#888888', alpha=0.75, linewidths=0.5)
ax.set_xlabel('Predicted'), ax.set_ylabel('Actual')
display(fig)

In [148]:
def squared_error(lp):
    """Calculates the squared error for a single prediction."""
    label, prediction = lp
    return float((label - prediction)**2)

predictions = np.asarray(parsed_val_data_df
                         .rdd
                         .map(lambda lp: average_train_year)
                         .collect())
error = np.asarray(parsed_val_data_df
                   .rdd
                   .map(lambda lp: (lp.label, average_train_year))
                   .map(lambda lp: squared_error(lp))
                   .collect())
norm = Normalize()
clrs = cmap(np.asarray(norm(error)))[:,0:3]

fig, ax = prepare_plot(np.arange(53.0, 55.0, 0.5), np.arange(0, 100, 20))
ax.set_xlim(53, 55)
plt.scatter(predictions, actual, s=14**2, c=clrs, edgecolors='#888888', alpha=0.75, linewidths=0.3)
ax.set_xlabel('Predicted'), ax.set_ylabel('Actual')
display(fig)

## Part 3: Train (via gradient descent) and evaluate a linear regression model

### (3a) Gradient summand

Now let's see if we can do better via linear regression, training a model via gradient descent (we'll omit the intercept for now). Recall that the gradient descent update for linear regression is:
\\[ \scriptsize \mathbf{w}_{i+1} = \mathbf{w}_i - \alpha_i \sum_j (\mathbf{w}_i^\top\mathbf{x}_j  - y_j) \mathbf{x}_j \,.\\]
where \\( \scriptsize i \\) is the iteration number of the gradient descent algorithm, and \\( \scriptsize j \\) identifies the observation.

First, implement a function that computes the summand for this update, i.e., the summand equals \\( \scriptsize (\mathbf{w}^\top \mathbf{x} - y) \mathbf{x} \, ,\\) and test out this function on two examples.  Use the `DenseVector` [dot](http://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#pyspark.mllib.linalg.DenseVector.dot) method.

In [151]:
from pyspark.mllib.linalg import DenseVector

In [152]:
def gradient_summand(weights, lp):
    """Calculates the gradient summand for a given weight and `LabeledPoint`.

    Note:
        `DenseVector` behaves similarly to a `numpy.ndarray` and they can be used interchangably
        within this function.  For example, they both implement the `dot` method.

    Args:
        weights (DenseVector): An array of model weights (betas).
        lp (LabeledPoint): The `LabeledPoint` for a single observation.

    Returns:
        DenseVector: An array of values the same length as `weights`.  The gradient summand.
    """
    
    return (weights.dot(lp.features)-lp.label)*lp.features

example_w = DenseVector([1, 1, 1])
example_lp = LabeledPoint(2.0, [3, 1, 4])
summand_one = gradient_summand(example_w, example_lp)
print (summand_one)

example_w = DenseVector([.24, 1.2, -1.4])
example_lp = LabeledPoint(3.0, [-1.4, 4.2, 2.1])
summand_two = gradient_summand(example_w, example_lp)
print (summand_two)


In [153]:
# TEST Gradient summand (3a)
assert_true(np.allclose(summand_one, [18., 6., 24.]), 'incorrect value for summand_one')

### (3b) Use weights to make predictions

Next, implement a `get_labeled_predictions` function that takes in weights and an observation's `LabeledPoint` and returns a _(prediction, label)_ tuple.  Note that we can predict by computing the dot product between weights and an observation's features.

In [155]:
def get_labeled_prediction(weights, observation):
    """Calculates predictions and returns a (prediction, label) tuple.

    Note:
        The labels should remain unchanged as we'll use this information to calculate prediction
        error later.

    Args:
        weights (np.ndarray): An array with one weight for each features in `trainData`.
        observation (LabeledPoint): A `LabeledPoint` that contain the correct label and the
            features for the data point.

    Returns:
        tuple: A (prediction, label) tuple. Convert the return type of the label and prediction to a float.
    """
    return (float(weights.dot(observation.features)), float(observation.label))#<FILL IN>

weights = np.array([1.0, 1.5])
prediction_example = sc.parallelize([LabeledPoint(2, np.array([1.0, .5])),
                                     LabeledPoint(1.5, np.array([.5, .5]))])
preds_and_labels_example = prediction_example.map(lambda lp: get_labeled_prediction(weights, lp))
print (preds_and_labels_example.collect())

In [156]:
# TEST Use weights to make predictions (3b)
assert_true(isinstance(preds_and_labels_example.first()[0], float), 'prediction must be a float')
assert_equal(preds_and_labels_example.collect(), [(1.75, 2.0), (1.25, 1.5)],
                  'incorrect definition for getLabeledPredictions')

### (3c) Gradient descent

Next, implement a gradient descent function for linear regression and test out this function on an example.

In [158]:
def linreg_gradient_descent(train_data, num_iters):
    """Calculates the weights and error for a linear regression model trained with gradient descent.

    Note:
        `DenseVector` behaves similarly to a `numpy.ndarray` and they can be used interchangably
        within this function.  For example, they both implement the `dot` method.

    Args:
        train_data (RDD of LabeledPoint): The labeled data for use in training the model.
        num_iters (int): The number of iterations of gradient descent to perform.

    Returns:
        (np.ndarray, np.ndarray): A tuple of (weights, training errors).  Weights will be the
            final weights (one weight per feature) for the model, and training errors will contain
            an error (RMSE) for each iteration of the algorithm.
    """
    # The length of the training data
    n = train_data.count()
    # The number of features in the training data
    d = len(train_data.first().features)
    w = np.zeros(d) 
    alpha = 1.0
    # We will compute and store the training error after each iteration
    error_train = np.zeros(num_iters)
    for i in range(num_iters):
        # Use get_labeled_prediction from (3b) with trainData to obtain an RDD of (label, prediction)
        # tuples.  Note that the weights all equal 0 for the first iteration, so the predictions will
        # have large errors to start.
        preds_and_labels_train = train_data.map(lambda lp: get_labeled_prediction(w, lp))
    
        preds_and_labels_train_df = sqlContext.createDataFrame(preds_and_labels_train, ["prediction", "label"])
        error_train[i] = calc_RMSE(preds_and_labels_train_df)

        # Calculate the `gradient`.  Make use of the `gradient_summand` function you wrote in (3a).
        # Note that `gradient` should be a `DenseVector` of length `d`.
        gradient = train_data.map(lambda x: gradient_summand(w, x)).sum()

        # Update the weights
        alpha_i = alpha / (n * np.sqrt(i+1))
        w -= alpha_i*gradient
    return w, error_train

# create a toy dataset with n = 10, d = 3, and then run 5 iterations of gradient descent
# note: the resulting model will not be useful; the goal here is to verify that
# linreg_gradient_descent is working properly
example_n = 10
example_d = 3
example_data = (sc
                 .parallelize(parsed_train_data_df.take(example_n))
                 .map(lambda lp: LabeledPoint(lp.label, lp.features[0:example_d])))
print (example_data.take(2))
example_num_iters = 5
example_weights, example_error_train = linreg_gradient_descent(example_data, example_num_iters)
print (example_weights)


In [159]:
# TEST Gradient descent (3c)

expected_output = [ 3.6706757 , 28.77277769, 17.91957382]
assert_true(np.allclose(example_weights, expected_output), 'value of example_weights is incorrect')
expected_error = [38.71433843 ,32.45595388, 30.22343598, 29.10818648, 28.46448674]
assert_true(np.allclose(example_error_train, expected_error),
                'value of exampleErrorTrain is incorrect')

### (3d) Train the model

Now let's train a linear regression model on all of our training data and evaluate its accuracy on the validation set.  Note that the test set will not be used here.  If we evaluated the model on the test set, we would bias our final results.

We've already done much of the required work: we computed the number of features in Part (1b); we created the training and validation datasets and computed their sizes in Part (1e); and, we wrote a function to compute RMSE in Part (2b).

In [161]:
num_iters = 50
weights_LR0, error_train_LR0 = linreg_gradient_descent(parsed_train_data_df.rdd, num_iters)

preds_and_labels = (parsed_val_data_df
                      .rdd
                      .map(lambda lp: get_labeled_prediction(weights_LR0, lp)))
preds_and_labels_df = sqlContext.createDataFrame(preds_and_labels, ["prediction", "label"])
rmse_val_LR0 = calc_RMSE(preds_and_labels_df)

print ('Validation RMSE:\n\tBaseline = {0:.3f}\n\tLR0 = {1:.3f}'.format(rmse_val_base,
                                                                       rmse_val_LR0))

In [162]:
# TEST Train the model (3d)
expected_output =[22.49753756, 20.37597243, -0.27251763,  8.31521579,  5.88853769, -4.82949119,
                   15.60187764,  3.97275651,  9.93959836,  6.06173598, 10.9900427,   3.72333163]
assert_true(np.allclose(weights_LR0, expected_output), 'incorrect value for weights_LR0')

### Visualization 4: Training error

We will look at the log of the training error as a function of iteration. The first scatter plot visualizes the logarithm of the training error for all 50 iterations.  The second plot shows the training error itself, focusing on the final 44 iterations.

In [164]:
norm = Normalize()
clrs = cmap(np.asarray(norm(np.log(error_train_LR0))))[:,0:3]

fig, ax = prepare_plot(np.arange(0, 60, 10), np.arange(2, 6, 1))
ax.set_ylim(2, 6)
plt.scatter(range(0, num_iters), np.log(error_train_LR0), s=14**2, c=clrs, edgecolors='#888888', alpha=0.75)
ax.set_xlabel('Iteration'), ax.set_ylabel(r'$\log_e(errorTrainLR0)$')
display(fig)

In [165]:
norm = Normalize()
clrs = cmap(np.asarray(norm(error_train_LR0[6:])))[:,0:3]

fig, ax = prepare_plot(np.arange(0, 60, 10), np.arange(17, 22, 1))
ax.set_ylim(17.8, 21.2)
plt.scatter(range(0, num_iters-6), error_train_LR0[6:], s=14**2, c=clrs, edgecolors='#888888', alpha=0.75)
ax.set_xticklabels(map(str, range(6, 66, 10)))
ax.set_xlabel('Iteration'), ax.set_ylabel(r'Training Error')
display(fig)

## Part 4: Train using SparkML and perform grid search

### (4a) `LinearRegression`
We're already doing better than the baseline model, but let's see if we can do better by adding an intercept, using regularization, and (based on the previous visualization) training for more iterations. MLlib's [LinearRegressionWithSGD](https://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#pyspark.mllib.regression.LinearRegressionWithSGD) essentially applys smae idea that we used in Part (3b), albeit using stochastic gradient approximation and more efficiently with various additional functionality including an intercept in the model and also allowing L1 or L2 regularization.

First use [LinearRegressionWithSGD](https://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#pyspark.mllib.regression.LinearRegressionWithSGD) to train a model with L2 regularization and with an intercept. This method returns a LinearRegressionModel. Next, use the model's weights (ex model.weights) and intercept (ex model.intercept) attributes to print out the model's parameters

In [168]:
from pyspark.mllib.regression import LinearRegressionWithSGD
# Values to use when training the linear regression model
numIters = 500  # iterations
alpha = 1.0  # step
miniBatchFrac = 1.0  # miniBatchFraction
reg = 1e-1  # regParam
regType = 'l2'  # regType
useIntercept = True  # intercept

In [169]:
first_model = LinearRegressionWithSGD.train(data=parsed_train_data_df.rdd.map(lambda lp: LabeledPoint(lp.label, lp.features)), 
                                           iterations=numIters, 
                                           step=alpha, 
                                           miniBatchFraction=miniBatchFrac, 
                                           initialWeights=None, 
                                           regParam=reg, 
                                           regType=regType, 
                                           intercept=useIntercept
                                           )

coeffs_LR1 = first_model.weights
intercept_LR1 = first_model.intercept
print (coeffs_LR1, intercept_LR1)

In [170]:
# TEST LinearRegression (4a)
expected_intercept = 13.272917647663501
expected_weights= [15.90811601077196,14.141747071778694,0.6361803544493579,6.1325094710269825,3.974387960587376,-2.5115097734889575,10.55122835853148,3.108716057088987,7.247819807528351,4.703102469082578,7.760457489026197,3.0765704673716043]

assert_true(np.allclose(intercept_LR1, expected_intercept), 'incorrect value for intercept_LR1')
assert_true(np.allclose(coeffs_LR1, expected_weights), 'incorrect value for weights_LR1')

### (4b) Predict

Now use the [LinearRegressionModel.predict()](http://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#pyspark.mllib.regression.LinearRegressionModel.predict) method to make a prediction on a sample point. Pass the features from a LabeledPoint into the predict() method

In [172]:
samplePoint = parsed_train_data_df.take(1)[0]
print(samplePoint)
sample_prediction = first_model.predict(samplePoint.features)
print (sample_prediction)

In [173]:
# TEST Predict (4b)
assert_true(np.allclose(sample_prediction, 39.82206386491484),
                'incorrect value for sample_prediction')

### (4c) Evaluate RMSE

Next evaluate the accuracy of this model on the validation set.  First, map [LinearRegressionModel.predict()](http://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#pyspark.mllib.regression.LinearRegressionModel.predict) method on features in parsed_val_data_df to get labelsAndPreds. Create a dataframe based on that and then use the `calc_RMSE()` function from Part (2b).

In [175]:
labels_and_preds = parsed_val_data_df.rdd.map(lambda lp: (lp.label, float(first_model.predict(lp.features))))
labels_and_preds_df = sqlContext.createDataFrame(labels_and_preds, ["label", "prediction"])
rmse_val_LR1 = calc_RMSE(labels_and_preds_df)

print(rmse_val_LR1)

In [176]:
# TEST Evaluate RMSE (4c)
assert_true(np.allclose(rmse_val_LR1, 19.49452264747249), 'incorrect value for rmseValLR1')

### (4d) Grid search

We're already outperforming the baseline on the validation set by almost 2 years on average, but let's see if we can do better. Perform grid search to find a good regularization parameter.  Try `regParam` values `1e-10`, `1e-5`, and `1.0`.

In [178]:
best_RMSE = rmse_val_LR1
best_reg_param = reg
best_model = first_model

num_iters = 500  # iterations
alpha = 1.0 # step
miniBatchFrac = 1.0

for reg in [1e-10, 1e-5, 1.0]:
    model = LinearRegressionWithSGD.train(data=parsed_train_data_df.rdd.map(lambda lp: LabeledPoint(lp.label, lp.features)), 
                                           iterations=num_iters, 
                                           step=alpha, 
                                           miniBatchFraction=miniBatchFrac, 
                                           initialWeights=None, 
                                           regParam=reg, 
                                           regType=regType, 
                                           intercept=useIntercept
                                           )
    
    labels_and_preds = parsed_val_data_df.rdd.map(lambda lp: (lp.label, float(model.predict(lp.features))))
    
    labels_and_preds_df = sqlContext.createDataFrame(labels_and_preds, ["label", "prediction"])
    rmse_val_grid = calc_RMSE(labels_and_preds_df)
    print (rmse_val_grid)

    if rmse_val_grid < best_RMSE:
        best_RMSE = rmse_val_grid
        best_reg_param = reg
        best_model = model

rmse_val_LR_grid = best_RMSE

print (('Validation RMSE:\n\tBaseline = {0:.3f}\n\tLR0 = {1:.3f}\n\tLR1 = {2:.3f}\n' +
       '\tLRGrid = {3:.3f}').format(rmse_val_base, rmse_val_LR0, rmse_val_LR1, rmse_val_LR_grid))

In [179]:
# TEST Grid search (4d)
assert_true(np.allclose(17.19165402610348, rmse_val_LR_grid), 'incorrect value for rmseValLRGrid')

### Visualization 5: Best model's predictions

Next, we create a visualization similar to 'Visualization 3: Predicted vs. actual' from Part 2 using the predictions from the best model from Part (4d) on the validation dataset.  Specifically, we create a color-coded scatter plot visualizing tuples storing i) the predicted value from this model and ii) true label.

In [181]:
predictions = np.asarray(parsed_val_data_df.rdd
                         .map(lambda lp: best_model.predict(lp.features))
                         .collect())
actual = np.asarray(parsed_val_data_df.rdd
                    .map(lambda lp: lp.label)
                    .collect())
error = np.asarray(parsed_val_data_df.rdd
                   .map(lambda lp: (lp.label, best_model.predict(lp.features)))
                   .map( lambda lp: squared_error(lp))
                   .collect())

norm = Normalize()
clrs = cmap(np.asarray(norm(error)))[:,0:3]

fig, ax = prepare_plot(np.arange(0, 120, 20), np.arange(0, 120, 20))
ax.set_xlim(15, 82), ax.set_ylim(-5, 105)
plt.scatter(predictions, actual, s=14**2, c=clrs, edgecolors='#888888', alpha=0.75, linewidths=.5)
ax.set_xlabel('Predicted'), ax.set_ylabel(r'Actual')
display(fig) 
pass

### Visualization 6: Hyperparameter heat map

Next, we perform a visualization of hyperparameter search using a larger set of hyperparameters (with precomputed results).  Specifically, we create a heat map where the brighter colors correspond to lower RMSE values.  The first plot has a large area with brighter colors.  In order to differentiate within the bright region, we generate a second plot corresponding to the hyperparameters found within that region.

In [183]:
from matplotlib.colors import LinearSegmentedColormap

# Saved parameters and results, to save the time required to run 36 models
numItersParams = [10, 50, 100, 250, 500, 1000]
regParams = [1e-8, 1e-6, 1e-4, 1e-2, 1e-1, 1]
rmseVal = np.array([[  20.36769649,   20.36770128,   20.36818057,   20.41795354,  21.09778437,  301.54258421],
                    [  19.04948826,   19.0495    ,   19.05067418,   19.16517726,  19.97967727,   23.80077467],
                    [  18.40149024,   18.40150998,   18.40348326,   18.59457491,  19.82155716,   23.80077467],
                    [  17.5609346 ,   17.56096749,   17.56425511,   17.88442127,  19.71577117,   23.80077467],
                    [  17.0171705 ,   17.01721288,   17.02145207,   17.44510574,  19.69124734,   23.80077467],
                    [  16.58074813,   16.58079874,   16.58586512,   17.11466904,  19.6860931 ,   23.80077467]])

numRows, numCols = len(numItersParams), len(regParams)
rmseVal = np.array(rmseVal)
rmseVal.shape = (numRows, numCols)

fig, ax = prepare_plot(np.arange(0, numCols, 1), np.arange(0, numRows, 1), figsize=(8, 7), hideLabels=True,
                      gridWidth=0.)
ax.set_xticklabels(regParams), ax.set_yticklabels(numItersParams)
ax.set_xlabel('Regularization Parameter'), ax.set_ylabel('Number of Iterations')

colors = LinearSegmentedColormap.from_list('blue', ['#0022ff', '#000055'], gamma=.2)
image = plt.imshow(rmseVal,interpolation='nearest', aspect='auto',
                    cmap = colors)
display(fig) 

In [184]:
# Zoom into the bottom left
numItersParamsZoom, regParamsZoom = numItersParams[-3:], regParams[:4]
rmseValZoom = rmseVal[-3:, :4]

numRows, numCols = len(numItersParamsZoom), len(regParamsZoom)

fig, ax = prepare_plot(np.arange(0, numCols, 1), np.arange(0, numRows, 1), figsize=(8, 7), hideLabels=True,
                      gridWidth=0.)
ax.set_xticklabels(regParamsZoom), ax.set_yticklabels(numItersParamsZoom)
ax.set_xlabel('Regularization Parameter'), ax.set_ylabel('Number of Iterations')

colors = LinearSegmentedColormap.from_list('blue', ['#0022ff', '#000055'], gamma=.2)
image = plt.imshow(rmseValZoom,interpolation='nearest', aspect='auto',
                    cmap = colors)
display(fig) 
pass

## Part 5: Add interactions between features

### (5a) Add 2-way interactions

So far, we've used the features as they were provided.  Now, we will add features that capture the two-way interactions between our existing features.  Write a function `two_way_interactions` that takes in a `LabeledPoint` and generates a new `LabeledPoint` that contains the old features and the two-way interactions between them.

> Note:
> * A dataset with three features would have nine ( \\( \scriptsize 3^2 \\) ) two-way interactions.
> * You might want to use [itertools.product](https://docs.python.org/2/library/itertools.html#itertools.product) to generate tuples for each of the possible 2-way interactions.
> * Remember that you can combine two `DenseVector` or `ndarray` objects using [np.hstack](http://docs.scipy.org/doc/numpy/reference/generated/numpy.hstack.html#numpy.hstack).

In [187]:
import itertools

def two_way_interactions(lp):
    """Creates a new `LabeledPoint` that includes two-way interactions.

    Note:
        For features [x, y] the two-way interactions would be [x^2, x*y, y*x, y^2] and these
        would be appended to the original [x, y] feature list.

    Args:
        lp (LabeledPoint): The label and features for this observation.

    Returns:
        LabeledPoint: The new `LabeledPoint` should have the same label as `lp`.  Its features
            should include the features from `lp` followed by the two-way interaction features.

    """
    
    arr = [e[0]*e[1] for e in itertools.product(lp.features, lp.features)]
    new_features = np.append(lp.features, arr)
    return  LabeledPoint(lp.label, new_features)

# Transform the existing train, validation, and test sets to include two-way interactions.
# Remember to convert them back to DataFrames at the end.
train_data_interact_df = parsed_train_data_df.rdd.map(lambda lp: two_way_interactions(lp)).toDF()
val_data_interact_df = parsed_val_data_df.rdd.map(lambda lp: two_way_interactions(lp)).toDF()
test_data_interact_df = parsed_test_data_df.rdd.map(lambda lp: two_way_interactions(lp)).toDF()

In [188]:
# TEST Add two-way interactions (5a)
two_way_example = two_way_interactions(LabeledPoint(0.0, [2, 3]))

assert_true(np.allclose(sorted(two_way_example.features),
                            sorted([2.0, 3.0, 4.0, 6.0, 6.0, 9.0])),
                'incorrect features generatedBy two_way_interactions')

two_way_point = two_way_interactions(LabeledPoint(1.0, [1, 2, 3]))

assert_true(np.allclose(sorted(two_way_point.features),
                            sorted([1.0,2.0,3.0,1.0,2.0,3.0,2.0,4.0,6.0,3.0,6.0,9.0])),
                'incorrect features generated by twoWayInteractions')


assert_equal(two_way_point.label, 1.0, 'incorrect label generated by two_way_interactions')


### (5b) Build interaction model

Now, let's build the new model.  We've done this several times now.  To implement this for the new features, we need to change a few variable names.

 > Note:
 > * Remember that we should build our model from the training data and evaluate it on the validation data.
 > * You should re-run your hyperparameter search after changing features, as using the best hyperparameters from your prior model will not necessary lead to the best model.
 > * For this exercise, we have already preset the hyperparameters to reasonable values.

In [190]:
numIters = 500
alpha = 1.0
miniBatchFrac = 1.0
reg = 1e-10

model_interact = LinearRegressionWithSGD.train(data=train_data_interact_df.rdd.map(lambda lp: LabeledPoint(lp.label, lp.features)), 
                                           iterations=num_iters, 
                                           step=alpha, 
                                           miniBatchFraction=miniBatchFrac, 
                                           initialWeights=None, 
                                           regParam=reg,
                                           regType=regType, 
                                           intercept=useIntercept)

labels_and_preds_interact = val_data_interact_df.rdd.map(lambda lp: (lp.label, float(model_interact.predict(lp.features))))
labels_and_preds_interact_df = sqlContext.createDataFrame(labels_and_preds_interact, ["label", "prediction"])
rmse_val_interact = calc_RMSE(labels_and_preds_interact_df)

print (('Validation RMSE:\n\tBaseline = {0:.3f}\n\tLR0 = {1:.3f}\n\tLR1 = {2:.3f}\n\tLRGrid = ' +
       '{3:.3f}\n\tLRInteract = {4:.3f}').format(rmse_val_base, rmse_val_LR0, rmse_val_LR1,
                                                 rmse_val_LR_grid, rmse_val_interact))

In [191]:
# TEST Build interaction model (5b)
assert_true(np.allclose(rmse_val_interact, 16.197614458395268), 'incorrect value for rmse_val_interact')

### (5c) Evaluate interaction model on test data

Our next step is to evaluate the new model on the test dataset.  Note that we haven't used the test set to evaluate any of our models.  Because of this, our evaluation provides us with an unbiased estimate for how our model will perform on new data.  If we had changed our model based on viewing its performance on the test set, our estimate of RMSE would likely be overly optimistic.

We'll also print the RMSE for both the baseline model and our new model.  With this information, we can see how much better our model performs than the baseline model.

In [193]:
labels_and_preds_interact_test = test_data_interact_df.rdd.map(lambda lp: (lp.label, float(model_interact.predict(lp.features))))
labels_and_preds_interact_test_df = sqlContext.createDataFrame(labels_and_preds_interact_test, ["label", "prediction"])
rmse_test_interact = calc_RMSE(labels_and_preds_interact_test_df)

print (('Test RMSE:\n\tBaseline = {0:.3f}\n\tLRInteract = {1:.3f}'
       .format(rmse_test_base, rmse_test_interact)))

In [194]:
# TEST Evaluate interaction model on test data (5c)
assert_true(np.allclose(rmse_test_interact, 16.605307423246618),
                'incorrect value for rmse_test_interact')