In [None]:
import findspark
# Point findspark at the local Spark installation. The raw-string prefix keeps the
# backslashes of the Windows path literal instead of relying on "\S" and "\s" not
# being recognised escape sequences (which newer Python versions warn about).
findspark.init(r'C:\Spark\spark-2.3.0-bin-hadoop2.7')
import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
from pyspark import SparkConf,SparkContext
from pyspark.sql.functions import col, countDistinct,avg,mean,stddev, year, month, dayofmonth, when,sum,count
from pyspark.sql.types import DoubleType
from pyspark.sql import SQLContext
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor,LinearRegression
from pyspark.ml.feature import VectorIndexer,VectorAssembler,StringIndexer,Imputer
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.stat import Correlation
from pyspark.mllib.evaluation import RegressionMetrics
from pyspark.ml.regression import GeneralizedLinearRegression
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder,TrainValidationSplit

import pandas as pd
import numpy as np


import seaborn as sns
# Seaborn's current colour palette, kept under a short name.
color = sns.color_palette()
# IPython magic (notebook-only, not plain Python): draw matplotlib figures inline.
%matplotlib inline

In [None]:
# Session handle used by every read below. getOrCreate() hands back the session
# that is already active, if any, instead of building a second one.
spark = SparkSession.builder.getOrCreate()

### Reading the Data files

In [None]:
# Folder holding the Zillow CSV exports. Kept in one place so the notebook can be
# pointed at another location by editing a single line.
DATA_DIR = "C:\\Users\\moniy\\Desktop\\Zillow\\Data"

# Both files carry a header row; column types are inferred from the data.
# train_2016: the transactions file, properties_2016: the property attributes file.
train_2016 = spark.read.csv(DATA_DIR + "\\train_2016_v2.csv", header=True, inferSchema=True)
properties_2016 = spark.read.csv(DATA_DIR + "\\property_2016.csv", header=True, inferSchema=True)

#### Count the null values in each column

In [None]:
# One aggregate per column: the isNull() flag is cast to int and added up, which
# gives the number of nulls in that column. `sum` is pyspark.sql.functions.sum
# (imported at the top), not the Python builtin.
null_totals = [
    sum(col(name).isNull().cast("int")).alias(name)
    for name in properties_2016.columns
]
properties_2016.select(*null_totals).show()

Fill the null (NaN) values with -1

In [None]:
# Replace nulls with the sentinel value -1.
# NOTE(review): this rebinds `properties_2016` itself, and the same name is joined
# again in the "Data Preprocessing" section before nulls are counted there. When
# the cells run top to bottom, that later count no longer sees the nulls filled
# here — confirm the intended execution order.
properties_2016 = properties_2016.na.fill(-1)

In [None]:
# Second fill pass, this time with a boolean value.
# NOTE(review): presumably this targets boolean columns that the -1 fill does not
# touch — confirm against the DataFrameNaFunctions.fill documentation.
properties_2016 = properties_2016.na.fill(True)

### Merging Data

We merge the two files (transactions and properties) because,
to get the relevant features, we need to train on properties that have been sold.

In [None]:
# Inner-join the sold transactions onto their property rows. Joining on the column
# name (as the later preprocessing join does) keeps a single `parcelid` column;
# a column-equality condition would leave two identically named `parcelid`
# columns in the result, making any later reference to `parcelid` ambiguous.
joinedData = properties_2016.join(train_2016, "parcelid")

Using StringIndexer to change the categorical columns into numeric indices

In [None]:
# Fit one StringIndexer per categorical column on the joined data. Only the fitted
# models are kept; no transform is applied in this cell.
taxdelinquency_indexer = StringIndexer(inputCol="taxdelinquencyflag", outputCol="tDF_IX")
sI_taxdelinquencyflag = taxdelinquency_indexer.fit(joinedData)

zoning_indexer = StringIndexer(inputCol="propertyzoningdesc", outputCol="pZD_IX")
sI_propertyzoningdesc = zoning_indexer.fit(joinedData)

In [None]:
# Per-column null count over `X` (isNull() flag cast to int, then summed).
# NOTE(review): `X` is not assigned anywhere above this cell; it is first created
# in the "Data Preprocessing" part of the notebook, so this cell only works after
# that part has been run.
x_null_totals = [
    sum(col(name).isNull().cast("int")).alias(name)
    for name in X.columns
]
X.select(*x_null_totals).show()

### Benchmark Model

We will set this benchmark model and try to beat this prediction 

We create a vector assembler to create a features set to be fed to the model

In [None]:
# Assemble every column of `X` except the regression target into one vector column.
# The target has to stay out of the feature vector, otherwise the model is handed
# the very value it is supposed to predict. Both the raw label name and its
# imputed copy are excluded; names that are not present in `X` have no effect.
# NOTE(review): `X` is first assigned further down, in the preprocessing part of
# the notebook — this cell needs that part to have been run.
LABEL_COLUMNS = ('logerror', 'logerror_out')
benchmark_feature_cols = [c for c in X.columns if c not in LABEL_COLUMNS]
vectorAssembler_features = VectorAssembler(inputCols=benchmark_feature_cols, outputCol="features")

Creating a model Random Forest Regressor

In [None]:
# Benchmark random forest: reads the assembler's output column and predicts
# `logerror`. A fixed seed makes the forest's random choices repeatable, so the
# benchmark score does not drift between runs.
rf = RandomForestRegressor(
    featuresCol=vectorAssembler_features.getOutputCol(),
    labelCol='logerror',
    seed=42,
)

Pipelining the stages for the model training

In [None]:
# Two-stage benchmark pipeline: assemble the feature vector, then fit the forest.
benchmark_stages = [vectorAssembler_features, rf]
pipeline = Pipeline(stages=benchmark_stages)

In [None]:
# Fit the benchmark pipeline.
# NOTE(review): `train` is not assigned above this cell in notebook order; it is
# created (and later re-assigned) in the sections below — confirm which `train`
# this cell is meant to see.
model = pipeline.fit(train)

Model is created from the train features

Now we do the prediction on test set

In [None]:
# Apply the fitted benchmark pipeline to `test`.
# NOTE(review): `test` is not assigned above this cell in notebook order — it is
# only created further down.
prediction = model.transform(test)

In [None]:
# RMSE of the benchmark predictions against the `logerror` label.
evaluator = RegressionEvaluator(
    labelCol="logerror",
    predictionCol="prediction",
    metricName="rmse",
)
rmse = evaluator.evaluate(prediction)
print(rmse)

In [None]:
# Switch the same evaluator to MSE (the setter returns the evaluator itself) and
# score the benchmark predictions again.
mse = evaluator.setMetricName("mse").evaluate(prediction)
print(mse)

### Data Preprocessing

We cast the date column in train_2016 as date

In [None]:
# Replace `transactiondate` with the same column cast to Spark's date type.
transaction_date = col('transactiondate').cast('date')
train_2016 = train_2016.withColumn('transactiondate', transaction_date)

In [None]:
# Quick look at the first two converted dates.
train_2016.select(col('transactiondate')).show(n=2)

In [None]:
# Join property attributes with transactions on the shared `parcelid` column.
# No join type is given, so Spark's default is used.
train = properties_2016.join(train_2016, on="parcelid")

In [None]:
# Derive the month number of each transaction from its date.
transaction_month = month(col('transactiondate'))
train = train.withColumn('transaction_month', transaction_month)

### Missing Value Treatment

Get the total number of records

In [None]:
# Total number of joined records; used below as the denominator when turning
# null counts into missing ratios.
rows = train.count()

#### Printing the columns and its null count

In [None]:
# Single-row frame with, for every column of `train`, the number of null values
# (isNull() flag cast to int and summed with pyspark's `sum`).
train_null_totals = [
    sum(col(name).isNull().cast("int")).alias(name)
    for name in train.columns
]
null_count = train.select(*train_null_totals)

In [None]:
# Turn each per-column null count into a fraction of the total row count, keeping
# the column names and their order. Note that re-running this cell divides again.
ratio_exprs = [
    (col(str(f.name)) / rows).alias(str(f.name))
    for f in null_count.schema.fields
]
null_count = null_count.select(*ratio_exprs)

#### Dropping columns with missing ratio greater than 50%

In [None]:
# `null_count` is the single-row result of the aggregate above, now holding one
# missing ratio per column, so it can be pulled to the driver and thresholded in
# plain Python: keep the names whose ratio is above 50%.
missing_ratio = null_count.first().asDict()
col_todrop = [
    name for name, ratio in missing_ratio.items()
    if ratio is not None and ratio > 0.5
]  ## all columns with a missing ratio greater than 50%

In [None]:
# Remove every column whose missing ratio exceeded the 50% threshold.
train = train.drop(*col_todrop)

In [None]:
# Additional columns removed by hand.
# NOTE(review): the reason for picking these seven is not stated in the notebook.
manually_dropped = [
    'calculatedbathnbr',
    'fullbathcnt',
    'finishedsquarefeet12',
    'rawcensustractandblock',
    'censustractandblock',
    'landtaxvaluedollarcnt',
    'taxvaluedollarcnt',
]
train = train.drop(*manually_dropped)

Casting integer columns as Double

In [None]:
 # Names of all columns whose Spark dtype string starts with 'int'.
 # NOTE(review): a dtype string such as 'bigint' does not start with 'int', so a
 # column of that type would not be picked up — confirm none are expected.
 int_columnList = [name for name, dtype in train.dtypes if dtype.startswith('int')]

In [None]:
# Cast every column named in int_columnList to double in one projection; all
# other columns pass through unchanged and the column order is preserved.
int_column_names = set(int_columnList)
train = train.select(*[
    col(name).cast(DoubleType()).alias(name) if name in int_column_names else col(name)
    for name in train.columns
])

#### Impute the remaining null values with the medians

In [None]:
# Columns to impute. Each result is written to a new column named "<name>_out".
# NOTE(review): the list includes the target `logerror` itself — confirm that
# imputing the label is intended.
impute_inputs = [
    'bathroomcnt', 'bedroomcnt', 'buildingqualitytypeid', 'calculatedfinishedsquarefeet',
    'fips', 'heatingorsystemtypeid', 'latitude', 'longitude', 'lotsizesquarefeet',
    'propertylandusetypeid', 'regionidcity', 'regionidcounty', 'regionidzip', 'roomcnt',
    'unitcnt', 'yearbuilt', 'structuretaxvaluedollarcnt', 'assessmentyear', 'taxamount',
    'logerror', 'transaction_month',
]
impute_outputs = [name + '_out' for name in impute_inputs]

imputer = Imputer(
    inputCols=impute_inputs,
    outputCols=impute_outputs,
    strategy='median',  # substitute missing values with the column median
)

In [None]:
# Fit the imputer on `train`, then apply it to the same frame, which adds the
# "*_out" columns configured on the imputer.
# Note that this rebinds the notebook-wide name `model`.
model = imputer.fit(train)
train_int_cleaned = model.transform(train)

#### Dropping columns from which data has been extracted

In [None]:
# The imputed copies ("*_out") replace the originals, so drop exactly the columns
# the imputer was configured to read.
imputed_source_columns = imputer.getInputCols()
train_int_cleaned = train_int_cleaned.drop(*imputed_source_columns)

In [None]:
# Fit one StringIndexer per remaining categorical column. Only the fitted models
# are kept; no transform is applied in this cell.
# The land-use indexer gets its own output column name: "tDF_IX" is the name used
# by the taxdelinquencyflag indexer earlier in the notebook, and two indexers
# writing to the same output column cannot be applied to the same frame.
sI_propertycountylandusecode = StringIndexer(inputCol="propertycountylandusecode", outputCol="pCLUC_IX").fit(train_int_cleaned)
sI_propertyzoningdesc = StringIndexer(inputCol="propertyzoningdesc", outputCol="pZD_IX").fit(train_int_cleaned)

Dropping the categorical columns

In [None]:
# Leave out the transaction date and the two categorical source columns.
excluded_columns = ['transactiondate', 'propertyzoningdesc', 'propertycountylandusecode']
X = train_int_cleaned.drop(*excluded_columns)

In [None]:
# Replace any nulls that are still left with the sentinel value -1.
X = X.na.fill(-1)

#### Removing the outliers from the logerror column (rows outside the range are dropped)

In [None]:
# Keep only rows whose imputed log error lies strictly inside (-0.417, 0.418);
# rows outside that band are removed, not capped.
above_lower = col("logerror_out") > -0.417
below_upper = col("logerror_out") < 0.418
X_cleaned = X.where(above_lower & below_upper)

In [None]:
# NOTE(review): attribute access on a DataFrame yields a Column expression, not
# data, so `test` is a Column here. The name is rebound to a DataFrame by the
# train/test split later in the notebook — confirm whether this cell is needed.
test = X_cleaned.logerror_out

In [None]:
# Drop the `parcelid` column so it is no longer carried into the steps below.
X_cleaned = X_cleaned.drop('parcelid') 

### Feature Engineering

We create a new feature, New_LivingAreaProp, obtained by dividing the finished living area by the lot size

In [None]:
# Share of the lot that is finished living area.
# NOTE(review): a zero lot size would presumably produce a null here — confirm
# that cannot occur after imputation, since this column is used as a feature.
living_area_ratio = col('calculatedfinishedsquarefeet_out') / col('lotsizesquarefeet_out')
X_cleaned = X_cleaned.withColumn('New_LivingAreaProp', living_area_ratio)

Another feature: the number of properties in a zip code (and, likewise, in a city)

In [None]:
# Number of rows per zip code, exposed as `zip_count`.
zip_row_count = count(col('regionidzip_out')).alias('zip_count')
region = X_cleaned.groupBy('regionidzip_out').agg(zip_row_count)

In [None]:
# Number of rows per city, exposed as `city_count`.
city_row_count = count(col('regionidcity_out')).alias('city_count')
city = X_cleaned.groupBy('regionidcity_out').agg(city_row_count)

In [None]:
# Attach `zip_count` to every row through its zip code.
X_cleaned = X_cleaned.join(region, on="regionidzip_out")

In [None]:
# Attach `city_count` to every row through its city id.
X_cleaned = X_cleaned.join(city, on="regionidcity_out")

### Model Selection

We create the vector assembler to get the feature set to be fed to the models

In [None]:
# Columns that make up the model's feature vector: the imputed attributes plus
# the three engineered features. The label `logerror_out` is deliberately absent.
model_feature_cols = [
    'regionidcity_out',
    'regionidzip_out',
    'propertylandusetypeid_out',
    'longitude_out',
    'yearbuilt_out',
    'fips_out',
    'roomcnt_out',
    'lotsizesquarefeet_out',
    'buildingqualitytypeid_out',
    'latitude_out',
    'regionidcounty_out',
    'structuretaxvaluedollarcnt_out',
    'bedroomcnt_out',
    'heatingorsystemtypeid_out',
    'bathroomcnt_out',
    'taxamount_out',
    'assessmentyear_out',
    'calculatedfinishedsquarefeet_out',
    'transaction_month_out',
    'unitcnt_out',
    'New_LivingAreaProp',
    'zip_count',
    'city_count',
]
vectorAssembler_features = VectorAssembler(inputCols=model_feature_cols, outputCol="features")

In [None]:
# Add the assembled `features` vector column to the cleaned data.
X_training = vectorAssembler_features.transform(X_cleaned)

#### Split the data into train and testing data

In [None]:
# 80/20 train/test split. The fixed seed makes the split repeatable, so the
# scores reported for the different models are computed on the same rows from
# one run of the notebook to the next.
train, test = X_training.randomSplit([.8, .2], seed=42)

For each of the following models we will:

1. create the model,
2. cross-validate it,
3. select the best model,
4. get the root mean squared error, and
5. finally run the prediction on the test data.

#### Linear Regression

In [None]:
# Linear regression on the assembled features, predicting the imputed log error.
lr = LinearRegression(maxIter=1000, labelCol="logerror_out")
pipeline = Pipeline(stages=[lr])

# 2 x 2 grid over regularisation strength and elastic-net mixing.
paramGrid = (
    ParamGridBuilder()
    .addGrid(lr.regParam, [0.01, 0.3])
    .addGrid(lr.elasticNetParam, [0.3, 0.1])
    .build()
)

# The evaluator's metric is left at its default.
modelEvaluator = RegressionEvaluator(labelCol="logerror_out")

# 10-fold cross-validation. The fixed seed makes the assignment of rows to folds
# repeatable, so the selected hyper-parameters do not change between runs.
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=modelEvaluator,
                          numFolds=10,
                          seed=42)

In [None]:
# Cross-validate on the training split only. Fitting on `X_training` (the full
# assembled frame) would also train on the rows that went into `test`, so the
# test RMSE computed afterwards would not measure out-of-sample error.
cvModel = crossval.fit(train)

In [None]:
# The model CrossValidator selected as best across the parameter grid.
bestModel = cvModel.bestModel

In [None]:
# The last (and only) pipeline stage is the fitted linear regression; its
# `summary` holds the training metrics printed below.
fitted_lr = cvModel.bestModel.stages[-1]
trainingSummary = fitted_lr.summary

In [None]:
# Training-set fit quality of the selected linear model.
train_rmse = trainingSummary.rootMeanSquaredError
train_r2 = trainingSummary.r2
print("RMSE: %f" % train_rmse)
print("r2: %f" % train_r2)

##### Prediction Accuracy

In [None]:
# Score the `test` split with the selected model and print the evaluator's metric.
# Note that this rebinds the notebook-wide name `rmse`.
predictions = bestModel.transform(test)
rmse = modelEvaluator.evaluate(predictions)
print(rmse)

#### Generalized Linear Regression

In [None]:
 # Generalized linear model with a Gaussian family and identity link, predicting
 # the imputed log error.
 glr = GeneralizedLinearRegression(
     family="gaussian",
     link="identity",
     maxIter=1000,
     labelCol='logerror_out',
 )

In [None]:
# Single-stage pipeline around the GLR, tuned over two regularisation strengths.
pipeline = Pipeline(stages=[glr])
paramGrid = ParamGridBuilder().addGrid(glr.regParam, [0.3, 0.01]).build()

# The evaluator's metric is left at its default.
modelEvaluator = RegressionEvaluator(labelCol="logerror_out")

# 10-fold cross-validation. The fixed seed makes the assignment of rows to folds
# repeatable between runs.
crossval1 = CrossValidator(estimator=pipeline,
                           estimatorParamMaps=paramGrid,
                           evaluator=modelEvaluator,
                           numFolds=10,
                           seed=42)

In [None]:
# Cross-validate on the training split only. Fitting on `X_training` (the full
# assembled frame) would also train on the rows that went into `test`, so the
# test RMSE computed afterwards would not measure out-of-sample error.
cvModel1 = crossval1.fit(train)

In [None]:
# The GLR pipeline model CrossValidator selected as best.
bestModel_glm = cvModel1.bestModel

In [None]:
# Training summary of the last pipeline stage, i.e. the fitted GLR.
trainingSummary_glm = bestModel_glm.stages[-1].summary

In [None]:
# Diagnostics read from the fitted GLR's training summary.
print("Coefficient Standard Errors: " + str(trainingSummary_glm.coefficientStandardErrors))
print("P Values: " + str(trainingSummary_glm.pValues))
print("AIC: " + str(trainingSummary_glm.aic))

##### Prediction Accuracy

In [None]:
# Score the `test` split with the selected GLR and print the evaluator's metric.
predictions_glm = bestModel_glm.transform(test)
rmse_glm = modelEvaluator.evaluate(predictions_glm)
print(rmse_glm)

#### Decision Tree Regression

In [None]:
# Single regression tree on the assembled features, predicting the imputed log error.
dt = DecisionTreeRegressor(
    featuresCol="features",
    labelCol='logerror_out',
)

In [None]:
# Single-stage pipeline around the tree, tuned over a shallow and a deep maxDepth.
pipeline = Pipeline(stages=[dt])
paramGrid = ParamGridBuilder().addGrid(dt.maxDepth, [2, 20]).build()

# The evaluator's metric is left at its default.
modelEvaluator = RegressionEvaluator(labelCol="logerror_out")

# 10-fold cross-validation. The fixed seed makes the assignment of rows to folds
# repeatable between runs.
crossval2 = CrossValidator(estimator=pipeline,
                           estimatorParamMaps=paramGrid,
                           evaluator=modelEvaluator,
                           numFolds=10,
                           seed=42)

In [None]:
# Cross-validate on the training split only. Fitting on `X_training` (the full
# assembled frame) would also train on the rows that went into `test`, so the
# test RMSE computed afterwards would not measure out-of-sample error.
cvModel2 = crossval2.fit(train)

In [None]:
# The decision-tree pipeline model CrossValidator selected as best.
bestModel_dt = cvModel2.bestModel

In [None]:
# Apply the selected decision tree to the `test` split.
predictions_dt = bestModel_dt.transform(test)

#### Prediction Accuracy

In [None]:
# Evaluate the decision-tree predictions with the evaluator's metric and print it.
rmse_dt = modelEvaluator.evaluate(predictions_dt)
print(rmse_dt)

#### Random Forest Regression

In [None]:
# Random forest on the assembled features, predicting the imputed log error.
# A fixed seed makes the forest's random choices repeatable, so its score does
# not drift between runs. Note that this rebinds the name `rf` used by the
# benchmark model earlier in the notebook.
rf = RandomForestRegressor(
    featuresCol="features",
    labelCol='logerror_out',
    seed=42,
)

In [None]:
# Single-stage pipeline around the forest. The grid is built without any
# addGrid() call, so no hyper-parameters are varied; cross-validation is used
# here for scoring only.
pipeline = Pipeline(stages=[rf])
paramGrid = ParamGridBuilder().build()

# The evaluator's metric is left at its default.
modelEvaluator = RegressionEvaluator(labelCol="logerror_out")

# 10-fold cross-validation. The fixed seed makes the assignment of rows to folds
# repeatable between runs.
crossval3 = CrossValidator(estimator=pipeline,
                           estimatorParamMaps=paramGrid,
                           evaluator=modelEvaluator,
                           numFolds=10,
                           seed=42)

In [None]:
# Cross-validate on the training split only. Fitting on `X_training` (the full
# assembled frame) would also train on the rows that went into `test`, so the
# test RMSE computed afterwards would not measure out-of-sample error.
cvModel3 = crossval3.fit(train)

In [None]:
# The random-forest pipeline model returned by cross-validation.
bestModel_rf = cvModel3.bestModel

In [None]:
# Apply the random forest to the `test` split.
predictions_rf = bestModel_rf.transform(test)

#### Prediction Accuracy

In [None]:
# Evaluate the random-forest predictions with the evaluator's metric and print it.
rmse_rf = modelEvaluator.evaluate(predictions_rf)
print(rmse_rf)

#### Gradient Boosted Tree Regression

In [None]:
# Gradient-boosted trees predicting the imputed log error. The features column
# is spelled out to match the other tree models; "features" is also the default.
gbt = GBTRegressor(
    featuresCol="features",
    labelCol='logerror_out',
)

In [None]:
# Single-stage pipeline around the GBT model. The grid is built without any
# addGrid() call, so no hyper-parameters are varied; cross-validation is used
# here for scoring only.
pipeline = Pipeline(stages=[gbt])
paramGrid = ParamGridBuilder().build()

# The evaluator's metric is left at its default.
modelEvaluator = RegressionEvaluator(labelCol="logerror_out")

# 10-fold cross-validation. The fixed seed makes the assignment of rows to folds
# repeatable between runs.
crossval4 = CrossValidator(estimator=pipeline,
                           estimatorParamMaps=paramGrid,
                           evaluator=modelEvaluator,
                           numFolds=10,
                           seed=42)

In [None]:
# Cross-validate on the training split only. Fitting on `X_training` (the full
# assembled frame) would also train on the rows that went into `test`, so the
# test RMSE computed afterwards would not measure out-of-sample error.
cvModel4 = crossval4.fit(train)

In [None]:
# The GBT pipeline model returned by cross-validation.
bestModel_gbt = cvModel4.bestModel

In [None]:
# Apply the GBT model to the `test` split.
predictions_gbt = bestModel_gbt.transform(test)

In [None]:
# Evaluate the GBT predictions with the evaluator's metric.
rmse_gbt = modelEvaluator.evaluate(predictions_gbt)

##### Prediction Accuracy

In [None]:
# Show the GBT score computed in the previous cell.
print(rmse_gbt)