# ElasticNet Regression with PySpark

This notebook creates and measures a linear regression model using PySpark.

* Method: ElasticNet
* Dataset: [California Housing](http://www.dcc.fc.up.pt/~ltorgo/Regression/cal_housing.html)

## Imports

In [None]:
# Python core libs
from os import getlogin, path, environ

# Set SPARK_HOME
# environ["SPARK_HOME"] = "/home/students/spark-2.2.0"

# Findspark
import findspark
findspark.init()

# PySpark and PySpark SQL
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import FloatType
import pyspark.sql.functions as F

# PySpark MLlib
from pyspark.ml.linalg import DenseVector
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import StandardScaler

import matplotlib.pyplot as plt
%matplotlib inline

## Get Some Context

In [None]:
# Create a SparkContext and a SQLContext to use
sc = SparkContext(appName="Linear Regression with Spark")
sqlContext = SQLContext(sc)

## Load and Prepare the Data

In [None]:
# Data 
DATA_FILE = "/Users/robert.dempsey/Dev/daamlobd/data/CaliforniaHousing/cal_housing.data"
HEADER_FILE = "/Users/robert.dempsey/Dev/daamlobd/data/CaliforniaHousing/cal_housing.domain"

In [None]:
# Prepare the columns
headers = sc.textFile(HEADER_FILE)
cols = [col.split(":")[0] for col in list(headers.collect())]
cols

In [None]:
# Import the data
data = sqlContext.read.csv(DATA_FILE)

# Rename the columns
data = data.toDF('longitude', 'latitude', 'housingMedianAge','totalRooms', 'totalBedrooms',
                 'population', 'households', 'medianIncome', 'medianHouseValue')

# Show the top two rows
data.show(2)

In [None]:
# View the schema
data.printSchema()

In [None]:
# Create a helper function to convert the column types
# (a plain Python function, not a registered Spark UDF)
def convert_column_type(df, names, new_type):
    for name in names: 
        df = df.withColumn(name, df[name].cast(new_type))
    return df 

In [None]:
# Convert the columns to the correct types
data = convert_column_type(data, data.columns, FloatType())

# View the schema
data.printSchema()

In [None]:
# Describe the data - convert to Pandas dataframe to make it prettier
data_pd = data.describe().toPandas()
data_pd

**Note**: for many columns the difference between the min and max values is large, and the columns are on very different scales, so we'll need to standardize the data.

In [None]:
# Convert the dependent variable, medianHouseValue, to use units of 100000
data = data.withColumn("medianHouseValue", F.col("medianHouseValue")/100000)
data.take(2)

## Feature Engineering

Add additional features to the dataframe:
* Rooms per household: the average number of rooms per household in a block group
* Population per household: the average number of people per household in a block group
* Bedrooms per room: the fraction of rooms that are bedrooms in a block group
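
To make the three ratios concrete, here is a minimal pure-Python sketch for a single block group. The counts are hypothetical illustration values, not a row from the dataset, and `engineered_features` is a helper name introduced only for this example.

```python
def engineered_features(total_rooms, total_bedrooms, population, households):
    """Return the three ratio features for one block group."""
    return {
        "roomsPerHousehold": total_rooms / households,
        "populationPerHousehold": population / households,
        "bedroomsPerRoom": total_bedrooms / total_rooms,
    }

# Hypothetical block group: 1000 rooms, 200 bedrooms, 500 people, 250 households
example = engineered_features(1000, 200, 500, 250)
print(example)
```

The PySpark cell that follows computes the same ratios column-wise for every row of the dataframe.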

In [None]:
# Add the columns to the dataframe
data = data.withColumn("roomsPerHousehold", F.col("totalRooms")/F.col("households")) \
           .withColumn("populationPerHousehold", F.col("population")/F.col("households")) \
           .withColumn("bedroomsPerRoom", F.col("totalBedrooms")/F.col("totalRooms"))
        
# View the first row
data.first()

In [None]:
# Prepare the dataframe for analysis by reordering and selecting a subset of the columns
# Move our target variable to the first column to make it easy to extract
data = data.select("medianHouseValue",
                   "totalBedrooms",
                   "population", 
                   "households", 
                   "medianIncome", 
                   "roomsPerHousehold", 
                   "populationPerHousehold", 
                   "bedroomsPerRoom")
data.first()

## Standardization

In [None]:
# Split the data into features and a label (target)
# DenseVector: used to store arrays of values for use in PySpark
input_data = data.rdd.map(lambda x: (x[0], DenseVector(x[1:])))

# Replace the dataframe with the new dataframe
data = sqlContext.createDataFrame(input_data, ["label", "features"])

# Show the top row
data.first()

In [None]:
# Scale the data using the StandardScaler
# By default it scales each feature to unit standard deviation without centering
standardScaler = StandardScaler(inputCol="features", outputCol="features_scaled")

# Fit the scaler to the dataframe
scaler = standardScaler.fit(data)

# Transform the data in the dataframe with the scaler
scaled_df = scaler.transform(data)

# Inspect the result
scaled_df.take(2)
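
As a rough illustration of what the scaler does to a single feature, the sketch below divides each value by the column's sample standard deviation. This assumes the `StandardScaler` defaults (`withStd=True`, `withMean=False`), so values are rescaled but not centered. The function name and the input values are hypothetical.

```python
from statistics import stdev

def scale_to_unit_std(values):
    """Divide each value by the sample standard deviation (n - 1 denominator)."""
    s = stdev(values)
    return [v / s for v in values]

# Hypothetical feature column
incomes = [2.0, 4.0, 6.0]
scaled = scale_to_unit_std(incomes)

print(scaled)          # rescaled values, not centered around zero
print(stdev(scaled))   # standard deviation of the scaled column
```

After scaling, every feature has a standard deviation of one, so no single feature dominates the regularization penalty merely because of its units.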

## Fit a Linear Regression Model

In [None]:
# Split the data into train and test sets
# The seed gives us reproducibility of results
X_train, X_test = scaled_df.randomSplit([.8, .2], seed=1234)

Model parameters:
* maxIter: maximum number of iterations to run the optimization algorithm
* regParam: regularization strength; larger values penalize large coefficients more heavily
* elasticNetParam: the mix between the L1 and L2 penalties
  * 1 = L1 (LASSO)
  * 0 = L2 (Ridge)
  * Between 0 and 1 = ElasticNet
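
The sketch below shows how `regParam` and `elasticNetParam` combine into a single penalty term. It assumes the form given in the Spark MLlib documentation, `regParam * (elasticNetParam * ||w||_1 + (1 - elasticNetParam) / 2 * ||w||_2^2)`. The function name and the coefficient values are hypothetical.

```python
def elastic_net_penalty(coefficients, reg_param, elastic_net_param):
    """Penalty added to the loss for a given coefficient vector."""
    l1 = sum(abs(w) for w in coefficients)        # L1 norm (LASSO part)
    l2_squared = sum(w * w for w in coefficients)  # squared L2 norm (Ridge part)
    return reg_param * (
        elastic_net_param * l1
        + (1.0 - elastic_net_param) / 2.0 * l2_squared
    )

# Hypothetical coefficients with the settings used in this notebook
weights = [1.0, -2.0]
print(elastic_net_penalty(weights, reg_param=0.3, elastic_net_param=0.8))

# The two extremes of the mix
print(elastic_net_penalty(weights, reg_param=0.3, elastic_net_param=1.0))  # pure L1
print(elastic_net_penalty(weights, reg_param=0.3, elastic_net_param=0.0))  # pure L2
```

With `elasticNetParam=0.8` most of the penalty comes from the L1 term, which tends to push some coefficients to exactly zero.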

Below we train an elastic-net-regularized linear regression model.

In [None]:
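# Create an instance of a LinearRegression model
# Point featuresCol at the scaled features; the default would read the unscaled "features" column
lr = LinearRegression(featuresCol="features_scaled", labelCol="label",
                      maxIter=10, regParam=0.3, elasticNetParam=0.8)

# Fit the model to the training data
trained_model = lr.fit(X_train)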

In [None]:
# Intercept for the model
print('Estimated intercept coefficient: {}'.format(trained_model.intercept))

In [None]:
# Create a dataframe with the features and coefficients
feature_columns = ["totalBedrooms", "population", "households", "medianIncome", "roomsPerHousehold",
                   "populationPerHousehold", "bedroomsPerRoom"]
coefficients = [float(coef) for coef in trained_model.coefficients]

cols_coefs = list(zip(feature_columns, coefficients))

c_df = sqlContext.createDataFrame(cols_coefs, ["feature", "coefficient"])
c_df.show()

**Interpretation**: it appears there is a weak correlation between medianIncome and medianHouseValue

In [None]:
# Create a plot for medianIncome and medianHouseValue
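# Collect once so labels and features stay aligned row by row
rows = X_train.select('label', 'features').collect()
median_house_values = [row['label'] for row in rows]
# medianIncome is at index 3 of the features vector (index 4 is roomsPerHousehold)
median_incomes = [float(row['features'][3]) for row in rows]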

fig = plt.figure(figsize=(20,10))

plt.scatter(median_incomes, median_house_values)
plt.xlabel("Median Income")
plt.ylabel("Median House Value")
plt.title("Relationship between Median Income and Median House Value")

plt.show()

## Make Predictions

In [None]:
# Use the transform() method to predict labels for the test data
predicted = trained_model.transform(X_test)

# Extract the predictions and the known correct labels
predictions = predicted.select("prediction").rdd.map(lambda x: x[0])
labels = predicted.select("label").rdd.map(lambda x: x[0])

# Zip the predictions and labels into a list
predictions_and_labels = predictions.zip(labels).collect()

# Show the first five records (predicted value, actual value)
predictions_and_labels[:5]

In [None]:
# Show the predicted values alongside the features and labels
predicted.show()

In [None]:
X_test.show(2)

In [None]:
# Create a plot to compare actual median house values and the predicted median house values
y_pred = [prediction for prediction, _ in predictions_and_labels]
y_actual = [label for _, label in predictions_and_labels]

fig = plt.figure(figsize=(20,10))
plt.scatter(y_actual, y_pred)
plt.xlabel(r"Actual Median House Value: $Y_i$")
plt.ylabel(r"Predicted Median House Value: $\hat{Y}_i$")
plt.title(r"Actual vs. Predicted Median House Values: $Y_i$ vs. $\hat{Y}_i$")
plt.show()

## Model Evaluation

### Root Mean Squared Error

* An absolute measure of fit
* The distance, on average, of a data point from the fitted line, measured along a vertical line.
* Measured in the same units as the response variable
* Gives a relatively high weight to large errors; more useful when large errors are particularly undesirable
* Values closer to zero (0) are better
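
The points above can be sketched in plain Python. The example compares two hypothetical sets of predictions with the same mean absolute error to show how RMSE weights a single large error more heavily. The function names and values are illustrative only.

```python
from math import sqrt

def root_mean_squared_error(actual, predicted):
    """Square root of the mean of the squared errors."""
    squared_errors = [(a - p) ** 2 for a, p in zip(actual, predicted)]
    return sqrt(sum(squared_errors) / len(squared_errors))

def mean_absolute_error(actual, predicted):
    """Mean of the absolute errors, shown for comparison."""
    return sum(abs(a - p) for a, p in zip(actual, predicted)) / len(actual)

actual = [1.0, 2.0, 3.0, 4.0]
evenly_off = [2.0, 3.0, 4.0, 5.0]     # every prediction is off by 1
one_big_miss = [1.0, 2.0, 3.0, 8.0]   # one prediction is off by 4

print(mean_absolute_error(actual, evenly_off), root_mean_squared_error(actual, evenly_off))
print(mean_absolute_error(actual, one_big_miss), root_mean_squared_error(actual, one_big_miss))
```

Both prediction sets have the same mean absolute error, but the set with one large miss has the higher RMSE.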

In [None]:
# Note: trained_model.summary reports metrics computed on the training data
rmse = trained_model.summary.rootMeanSquaredError
print("Root Mean Squared Error: {}".format(rmse))

### Coefficient of Determination (R^2)

* The proportion of the variance in the response variable that the model explains from the features; a measure of how well the model is predicting.
* A score of 1 means a perfect prediction
* A score of 0 means the model always predicts the expected value of y, disregarding the input features
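
A minimal sketch of the calculation, assuming the usual definition `1 - SS_res / SS_tot`. The function name and the values are hypothetical; the two cases reproduce the scores of 1 and 0 described above.

```python
def r_squared(actual, predicted):
    """Coefficient of determination: 1 - (residual sum of squares / total sum of squares)."""
    mean_actual = sum(actual) / len(actual)
    ss_res = sum((a - p) ** 2 for a, p in zip(actual, predicted))
    ss_tot = sum((a - mean_actual) ** 2 for a in actual)
    return 1.0 - ss_res / ss_tot

actual = [1.0, 2.0, 3.0, 4.0]

perfect = list(actual)                                   # predictions match exactly
always_mean = [sum(actual) / len(actual)] * len(actual)  # ignores the features

print(r_squared(actual, perfect))
print(r_squared(actual, always_mean))
```

A model that does worse than always predicting the mean gets a negative score under this definition.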

In [None]:
r2 = trained_model.summary.r2
print("R^2 Score: %.2f" % r2)

## Results

Given the high RMSE and the mediocre R^2 score, the model needs improvement.

## Cleanup

In [None]:
sc.stop()