In [1]:
# Goal is to predict the House Price in King County, USA 
# Dataset can be obtained from: https://www.kaggle.com/harlfoxem/housesalesprediction
# Linear Regression is used to predict the house price

# Model performance is evaluated using RMSE and R-squared; several regression variants are fitted.
# The regularised variants applied are Ridge Regression and Lasso Regression.

# Steps in Modelling:
# 1. Clean the data/ check for null values
# 2. Transformation: one-hot encoding for categorical columns, VectorAssembler for features
# 3. Fit the model into Linear Regression 
# 4. Cross validate the model for best fit.
# 5. Compare the Performance between LinearRegression, Ridge Regression and Lasso Regression

# Which topics does this example cover? (delete those that are not applicable)
# 1. Linear Regression, Lasso Regression, Ridge Regression


In [2]:
from pyspark.sql import SQLContext
# NOTE(review): `sc` is not defined in this notebook; presumably it is the
# SparkContext injected by the hosting environment - confirm.
sqlContext = SQLContext(sc)

# Load the CSV into a Spark DataFrame, treating the first line as the header
# and letting Spark infer the column types.
housing_df = (
    sqlContext.read
    .format("com.databricks.spark.csv")
    .options(delimiter=',', header=True, inferSchema=True)
    .load('/FileStore/tables/kc_house_data.csv')
)

In [3]:
# Preview the first four rows.
housing_df.show(n=4)
# Cache the DataFrame so later cells can reuse it.
housing_df.cache()


In [4]:
# Number of records in the dataset.
housing_df.count()


In [5]:
# Summary statistics for the "price" and "sqft_living" columns.
housing_df.describe("price", "sqft_living").show()


In [6]:
# Numeric columns of interest for correlation analysis.
column_labels = [
    'price', 'sqft_living', 'sqft_lot', 'bedrooms', 'bathrooms',
    'floors', 'sqft_above', 'sqft_basement', 'yr_built', 'yr_renovated',
    'sqft_living15', 'sqft_lot15',
]
# Correlation between house price and living area.
housing_df.corr('price', 'sqft_living')


In [7]:
import numpy as np
from pyspark.mllib.stat import Statistics

In [8]:
# Find which features are highly correlated with price. There are many ways to
# compute correlations between columns; this one builds the full Pearson
# correlation matrix. Rows and columns of the matrix follow the order of
# `column_labels`, which is reused here so the two column lists cannot drift apart.
column_corr = Statistics.corr(
    housing_df.rdd.map(
        lambda row: np.array([row[name] for name in column_labels])),
    method='pearson')

In [9]:
# check for null values in data
from pyspark.sql.functions import isnull

# Drop every row (not column) that contains a null in any of its columns.
house_df_clean = housing_df.dropna(how='any')

# Equal counts mean that no row was dropped, i.e. the data has no null values.
housing_df.count() == house_df_clean.count()

In [10]:
# Feature transformation
## A skewed variable can be log-transformed to bring it closer to normal. Features derived from existing ones can also help explain or predict the response variable. Price is highly skewed, hence the log.
from pyspark.sql.functions import col, log
housing_df = housing_df.withColumn('log_price', log(housing_df['price']))

In [11]:
# Log-transform the lot size as well.
housing_df = housing_df.withColumn('log_sqft_lot', log(housing_df['sqft_lot']))


In [12]:
# Correlation between the untransformed price and lot size, as a baseline
# for the log-transformed pair.
housing_df.corr('price', 'sqft_lot')


In [13]:
# Correlation between the log-transformed price and lot size.
housing_df.corr('log_price', 'log_sqft_lot')


In [14]:
# Age of the house, measured from the year 2015.
from pyspark.sql.functions import lit

# lit() wraps a Python value as a literal Spark Column.
housing_df = housing_df.withColumn("age", lit(2015) - housing_df['yr_built'])

In [15]:
# Years since the last renovation, measured from the year 2015.
# NOTE(review): if yr_renovated is 0 for houses that were never renovated
# (confirm against the dataset), this column is 2015 for those rows.
housing_df = housing_df.withColumn("rennovate_age", lit(2015) - housing_df['yr_renovated'])


In [16]:
# Keep a second reference to the DataFrame as it stands here (derived columns
# included). This is an alias, not a copy; housing_df is rebound, not mutated,
# by the later cells.
housing_original_df = housing_df


In [17]:
# Numeric columns that are fed to the model as-is.
continuous_features = [
    'sqft_living',
    'bedrooms',
    'bathrooms',
    'floors',
    'log_sqft_lot',
    'age',
    'sqft_above',
    'sqft_living15',
    'sqft_lot15',
    'rennovate_age',
]

# Columns treated as categories and one-hot encoded before modelling.
categorical_features = ['zipcode', 'waterfront', 'grade', 'condition', 'view']

In [18]:
# Build datasets
def create_category_vars( dataset, field_name ):
  """Index and one-hot encode one categorical column.

  Adds two columns to ``dataset``: ``<field_name>Index`` (the StringIndexer
  output) and ``<field_name>Vec`` (the one-hot vector, last category dropped).

  Relies on ``StringIndexer`` and ``OneHotEncoder`` from ``pyspark.ml.feature``
  being imported in the notebook before the function is called.

  Args:
    dataset: Spark DataFrame containing the column ``field_name``.
    field_name: name of the categorical column to encode.

  Returns:
    A new DataFrame with the index and vector columns appended.
  """
  idx_col = field_name + "Index"
  col_vec = field_name + "Vec"

  indexer = StringIndexer( inputCol=field_name,
                           outputCol=idx_col )
  indexed = indexer.fit( dataset ).transform( dataset )

  encoder = OneHotEncoder( dropLast=True,
                           inputCol=idx_col,
                           outputCol=col_vec )

  # In Spark 3 OneHotEncoder is an Estimator and must be fitted before it can
  # transform; older releases expose it as a plain Transformer without fit().
  if hasattr( encoder, "fit" ):
    encoder = encoder.fit( indexed )

  return encoder.transform( indexed )

In [19]:
# One-hot encode all the categorical columns.
# Encoding converts the values of a column into numbers; Spark supplies
# OneHotEncoder for this.
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler, PolynomialExpansion, VectorIndexer

# The loop variable is deliberately not called `col`: that would rebind the
# `col` function imported from pyspark.sql.functions to a plain string.
for category_name in categorical_features:
  housing_df = create_category_vars( housing_df, category_name )

housing_df.cache()

In [20]:
# Input columns for the VectorAssembler: the continuous columns (values that
# cannot be categorised, such as the square footage of a house) followed by
# the one-hot vector column of every categorical feature.
# The "Vec" suffix is the output naming used by create_category_vars, so the
# list is derived from categorical_features instead of being typed out again.
featureCols = continuous_features + [name + 'Vec' for name in categorical_features]

In [21]:
# Combine all feature columns into a single "features" vector column.
assembler = VectorAssembler(
    inputCols=featureCols,
    outputCol="features",
)


In [22]:
# Add the assembled "features" column to the encoded dataset.
housing_train_df = assembler.transform( housing_df )


In [23]:
# The regression label is the log price rounded to four decimals.
# NOTE: this import rebinds the name `round` to the Spark column function,
# shadowing Python's builtin round() for the rest of the notebook.
from pyspark.sql.functions import round

housing_train_df = housing_train_df.withColumn(
    "label",
    round(housing_train_df['log_price'], 4),
)

In [24]:
# 70/30 train/test split with a fixed seed.
train_df, test_df = housing_train_df.randomSplit(weights=[0.7, 0.3], seed=30)


In [25]:
# Fit a plain linear regression on the training split.
from pyspark.ml.regression import LinearRegression
# regParam=0.0 applies no regularisation penalty.
linreg = LinearRegression(
    maxIter=500,
    regParam=0.0,
)
lm = linreg.fit(train_df)


In [26]:
# Intercept of the fitted linear model.
lm.intercept


In [27]:
# A bare expression is echoed only when it is the last statement of a cell,
# so the coefficients are printed explicitly.
print( lm.coefficients )

# make predictions for test data and evaluate
y_pred = lm.transform( test_df )


In [28]:
# Compare the label with the prediction for the first five test rows.
y_pred.select( 'features', 'label', 'prediction' ).show( 5 )


In [29]:
# The model predicts the log price; exponentiate to get the predicted price.
from pyspark.sql.functions import exp

y_pred = y_pred.withColumn("y_pred", exp(y_pred['prediction']))

In [30]:
# RMSE between the actual "price" and the back-transformed "y_pred".
from pyspark.ml.evaluation import RegressionEvaluator
rmse_evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="price",
    predictionCol="y_pred",
)
lm_rmse = rmse_evaluator.evaluate(y_pred)
lm_rmse

In [31]:
# R-squared between the actual "price" and the back-transformed "y_pred".
r2_evaluator = RegressionEvaluator(
    metricName="r2",
    labelCol="price",
    predictionCol="y_pred",
)
lm_r2 = r2_evaluator.evaluate(y_pred)
lm_r2

In [32]:
def get_r2_rmse( model, test_df ):
  """Score a fitted model on ``test_df`` against the actual price.

  The model's "prediction" column is exponentiated into "y_pred" and compared
  with the "price" column.

  Relies on ``exp`` and ``RegressionEvaluator`` being imported in the notebook.

  Args:
    model: fitted model exposing ``transform``.
    test_df: DataFrame with the columns the model needs plus "price".

  Returns:
    ``[r2, rmse]`` as a list, so callers can concatenate it onto a
    ``[name]`` list to form a (name, rsquared, rmse) row.
  """
  y_pred = model.transform( test_df )
  y_pred = y_pred.withColumn( "y_pred", exp( 'prediction' ) )
  rmse_evaluator = RegressionEvaluator(labelCol="price",
                              predictionCol="y_pred",
                              metricName="rmse" )
  r2_evaluator = RegressionEvaluator(labelCol="price",
                              predictionCol="y_pred",
                              metricName="r2" )
  return [ r2_evaluator.evaluate( y_pred ), rmse_evaluator.evaluate( y_pred ) ]


In [33]:
# Score the baseline linear model on the held-out split.
perf_params = get_r2_rmse(lm, test_df)

import pandas as pd

# Empty table that collects one row of metrics per model.
model_perf = pd.DataFrame(columns=['name', 'rsquared', 'rmse'])

In [34]:
# DataFrame.append was removed in pandas 2.0; assigning to the next integer
# label adds the row instead.
model_perf.loc[len(model_perf)] = ["Linear Regression"] + perf_params

In [35]:
# Show the performance table collected so far.
model_perf


In [36]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
# Unfitted estimator (despite the "Model" in its name); it is the estimator
# handed to CrossValidator and the owner of the params used in the grids.
lrModel = LinearRegression(maxIter=50)



In [37]:
# Ridge search space: three regularisation strengths with elasticNetParam
# fixed at 0.0.
paramGrid = (
    ParamGridBuilder()
    .addGrid(lrModel.regParam, [0.1, 0.01, 0.001])
    .addGrid(lrModel.elasticNetParam, [0.0])
    .build()
)

In [38]:
# Cross-validation metric: R-squared against the "label" column.
evaluator = RegressionEvaluator(labelCol="label", metricName="r2")

In [39]:
# Grid search over paramGrid with k-fold cross validation.
# The seed (same value as the train/test split) makes the fold assignment
# reproducible between runs.
crossval = CrossValidator(estimator=lrModel,
                        estimatorParamMaps=paramGrid,
                        evaluator=evaluator,
                        numFolds=2,  # use 3+ folds in practice
                        seed=30)

In [40]:
# Run the cross-validated grid search on the training split.
cvModel = crossval.fit( train_df )


In [41]:
# Score the best model of the ridge grid search on the held-out split.
ridge_perf = get_r2_rmse( cvModel.bestModel, test_df )


In [42]:
# DataFrame.append was removed in pandas 2.0; assigning to the next integer
# label adds the row instead.
model_perf.loc[len(model_perf)] = ["Ridge Regression"] + ridge_perf

model_perf

In [43]:
# Lasso regression - another method to avoid overfitting.
# With elasticNetParam = 1.0 the regParam is a pure L1 (lasso) penalty;
# with 0.0 it is an L2 (ridge) penalty.
paramGrid = ParamGridBuilder()                          \
  .addGrid(lrModel.regParam, [0.1, 0.01, 0.001])      \
  .addGrid(lrModel.elasticNetParam, [1.0])            \
  .build()

evaluator = RegressionEvaluator(
  metricName="r2",
  labelCol="label",
)

# The seed (same value as the train/test split) makes the fold assignment
# reproducible between runs.
crossval = CrossValidator(estimator=lrModel,
                        estimatorParamMaps=paramGrid,
                        evaluator=evaluator,
                        numFolds=2,  # use 3+ folds in practice
                        seed=30)

In [44]:
# Run the lasso grid search. This rebinds cvModel, so the ridge
# cross-validation result is no longer reachable under that name.
cvModel = crossval.fit( train_df )
# Score the best lasso model on the held-out split.
lasso_perf = get_r2_rmse( cvModel.bestModel, test_df )


In [45]:
# DataFrame.append was removed in pandas 2.0; assigning to the next integer
# label adds the row instead.
model_perf.loc[len(model_perf)] = ["Lasso Regression"] + lasso_perf

model_perf