In [1]:
!pip install sodapy
import pandas as pd
from sodapy import Socrata
import os
from pyspark import SparkContext
from pyspark.sql import SQLContext
import cassandra
from cassandra.cluster import Cluster
import pandas as pd
import csv
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql import functions as F
from pyspark.sql import Window
from pyspark import SparkConf



In [2]:
def create_spark_session():
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    return spark
spark = create_spark_session()

In [None]:
client = Socrata('data.edmonton.ca',
                  '47asXuyWaxWsAxOt3EGvqq40r',
                  username='8fhzefz0bvnvu12f223cjk9e7',
                  password='245x2cx2xwx08mvusu7aml8f4n94bvdlvm6a31k6miu3c8cpz0',
                 timeout=30)
batch_size = 3500
batchs = 965
count = 0
for batch in range(batchs):
    offset = batch * batch_size
    results = client.get("qi6a-xuwt", limit=batch_size, offset=offset)
    results_df = pd.DataFrame.from_records(results)
    columns = ['account_number', 'suite', 'lot_size', 'assessed_value', 'assessment_year', 'garage', 'house_number', 'mill_class_1', 'neighborhood_name', 'street_name', 'year_built', 'zoning']
    df1 = pd.DataFrame(results_df, columns=columns)
    
    # Convert assessment year and value to numeric value
    df1["assessment_year"] = df1["assessment_year"].astype(str).astype(int)
    df1["assessed_value"] = df1["assessed_value"].astype(str).astype(float)
    df1["lot_size"] = df1["lot_size"].astype(str).astype(float)
    # Fill in missing data for house number with 0 & change its data type to integer
    df1['house_number'] = df1['house_number'].fillna(0)
    df1["house_number"] = df1["house_number"].astype(str).astype(int)
    df1['year_built'] = df1['year_built'].fillna(0)
    df1["year_built"] = df1["year_built"].astype(str).astype(int)
    
    # Define data type of each column to be converted to a pyspark dataframe
    houseSchema = StructType([
        StructField('account_number', StringType(), True),
        StructField('suite', StringType(), True),
        StructField("lot_size", FloatType(), True),
        StructField("assessed_value", FloatType(), True),
        StructField("assessment_year", IntegerType(), True),
        StructField("garage", StringType(), True),
        StructField("house_number", IntegerType(), True),
        StructField("mill_class_1", StringType(), True),
        StructField("neighborhood_name", StringType(), True),
        StructField("street_name", StringType(), True),
        StructField("year_built", IntegerType(), True),
        StructField("zoning", StringType(), True)
    ])
    sparkDF_house_value=spark.createDataFrame(df1, schema=houseSchema)    
    # Write the dataframe to a parquet file
    sparkDF_house_value.write.mode('append').parquet("output/houses.parquet")
    count = count + 1
    print(count)

In [3]:
houses = spark.read.parquet("output/houses.parquet")
houses.count()

3377500

## Prepare data:

* Gather necessary data to answer your questions
    * Handle categorical and missing data
    * Provide insight into the methods you chose and why you chose them
* Analyze, Model, and Visualize

* Provide a clear connection between your business questions and how the data answers them.

In [4]:
# Necessary imports
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sb
%matplotlib inline

#### Question 1: Does residential or commercial real estate have higher Return on Investment (ROI)?

In [5]:
houses.createOrReplaceTempView("houses_table")

In [None]:
# find out how many years the dataset has
spark.sql("SELECT distinct assessment_year FROM houses_table ORDER BY assessment_year").show()

In [None]:
# Other Residential = Multi-family (apartments & townhouses)
# Residential with suite = Individual townhouse/ condo/ apartment
final_df = spark.sql("SELECT 2012 AS year, mill_class_1, AVG(assessed_value) AS average_value FROM houses_table WHERE assessment_year = 2012 GROUP BY mill_class_1 \
        UNION ALL (SELECT 2020 AS year, mill_class_1, AVG(assessed_value) AS average_value FROM houses_table WHERE assessment_year = 2020 GROUP BY mill_class_1)")

In [None]:
final_df.createOrReplaceTempView("houses_data_table")

In [None]:
spark.sql("SELECT * from houses_data_table WHERE year = 2012").show()

In [None]:
ROI_df = spark.sql("SELECT a.mill_class_1, a.average_value AS 2012_value, b.average_value AS 2020_value, (b.average_value - a.average_value)/a.average_value * 100 AS ROI FROM (SELECT mill_class_1, year, average_value from houses_data_table WHERE year = 2012) a \
        JOIN (SELECT mill_class_1, year, average_value from houses_data_table WHERE year = 2020) b ON a.mill_class_1 = b.mill_class_1 \
        WHERE a.mill_class_1 = 'RESIDENTIAL' OR a.mill_class_1 = 'OTHER RESIDENTIAL' ORDER BY (b.average_value - a.average_value)/a.average_value * 100 desc")

In [None]:
ROI_df.createOrReplaceTempView("ROI_table")
final_ROI_df = spark.sql("SELECT ROI, CASE WHEN mill_class_1 = 'OTHER RESIDENTIAL' THEN 'Multi family' ELSE 'Single family' END AS property_type FROM ROI_table")

In [None]:
final_ROI_df = final_ROI_df.toPandas()
final_ROI_df= final_ROI_df.sort_values('ROI',ascending=False)
final_ROI_df.head()

In [None]:
# Return the Series having unique values
df_sorted_desc= final_ROI_df.sort_values('ROI',ascending=False)
df_sorted_desc.head()
plt.bar("property_type", "ROI", data=final_ROI_df)
# Labeling the axes
plt.xlabel('Property Type')
plt.ylabel('Return on Investment (%)')
plt.title("Return on Investment for Property Types (2012 - 2020)", size=14)
plt.savefig("bar_plot_ROI_by_property_type.png")
# Dsiplay the plot
plt.show()

#### Question 2: Is Multi Family a Riskier Investment than Single Family?

In [None]:
# Performance of Multi Family & Single Family over years
spark.sql("SELECT * from houses_table").show()

In [None]:
performance_df = spark.sql("SELECT AVG(assessed_value) AS avg_value, assessment_year, mill_class_1 from houses_table WHERE mill_class_1 = 'RESIDENTIAL' OR mill_class_1 = 'OTHER RESIDENTIAL' GROUP BY assessment_year, mill_class_1 ")

In [None]:
performance_df.createOrReplaceTempView("performance_table")
final_performance_df = spark.sql("SELECT avg_value, assessment_year, CASE WHEN mill_class_1 = 'OTHER RESIDENTIAL' THEN 'Multi family' ELSE 'Single family' END AS property_type FROM performance_table")
final_performance_df.createOrReplaceTempView("final_performance_table")
multi_family_performance = spark.sql("SELECT * FROM final_performance_table WHERE property_type='Multi family'")
single_family_performance = spark.sql("SELECT * FROM final_performance_table WHERE property_type='Single family'")

In [None]:
multi_family_performance.show()

In [None]:
# create data
multi_family_df = multi_family_performance.toPandas()
multi_family_df= multi_family_df.sort_values('assessment_year',ascending=True)

single_family_df = single_family_performance.toPandas()
single_family_df= single_family_df.sort_values('assessment_year',ascending=True)
# plot lines
plt.plot("assessment_year", "avg_value", data=multi_family_df, label = "Multi family")
plt.plot("assessment_year", "avg_value", data=single_family_df, label = "Single family")
# Labeling the axes
plt.xlabel('Assessment Year')
plt.ylabel('Average Value ($)')
plt.title("Average Value Over Years per Property Type in Edmonton", size=14)
plt.legend()
plt.savefig("line_chart_by_property_type.png")
plt.show()

#### Question 3: How well can we predict a property's value? What aspects correlate well to the value of a property?

In [None]:
# suite -> condo/ townhouse; lot_size; garage; street_name; year; mill_class_1 (multi family or single family); year_built, zoning
# https://towardsdatascience.com/logistic-regression-using-python-sklearn-numpy-mnist-handwriting-recognition-matplotlib-a6b31e2b166a

In [6]:
final_prediction_df = spark.sql("SELECT CASE WHEN mill_class_1 = 'OTHER RESIDENTIAL' THEN 'Multi family' ELSE 'Single family' END AS property_type, lot_size, garage, street_name, year_built, zoning, CASE WHEN suite NOT LIKE '%NaN' THEN 'Condominium' ELSE 'Detached House' END AS type, assessed_value FROM houses_table WHERE assessment_year = 2019")

In [7]:
final_prediction_df.show()

+-------------+--------+------+--------------------+----------+------+--------------+--------------+
|property_type|lot_size|garage|         street_name|year_built|zoning|          type|assessed_value|
+-------------+--------+------+--------------------+----------+------+--------------+--------------+
|Single family| 262.395|     Y|EAGLESON CRESCENT NW|      2019|   RMD|Detached House|      350000.0|
|Single family| 436.861|     Y|     CRAWFORD WAY SW|      2019|   RSL|Detached House|      508000.0|
|Single family| 277.888|     Y|    CARTMELL ROAD SW|      2019|   RF4|Detached House|      360000.0|
|Single family| 515.619|     N|     CHIVERS LOOP SW|         0|   RSL|Detached House|      157500.0|
| Multi family| 217.562|     N|ADMIRAL GIROUARD ...|         0|  RF5g|Detached House|       92500.0|
|Single family|   1.704|     N|                 NaN|      2017|   CB2|Detached House|       14500.0|
|Single family| 333.424|     N|        64 STREET NW|         0|   RMD|Detached House|      

In [8]:
# Data Exploration
final_prediction_df.cache()
final_prediction_df.printSchema()

root
 |-- property_type: string (nullable = false)
 |-- lot_size: float (nullable = true)
 |-- garage: string (nullable = true)
 |-- street_name: string (nullable = true)
 |-- year_built: integer (nullable = true)
 |-- zoning: string (nullable = true)
 |-- type: string (nullable = false)
 |-- assessed_value: float (nullable = true)



In [9]:
# Apply OneHotEncoder to property_type, garage, street_name, zoning, and type columns
# Apply StringIndexer to string columns first
# import required libraries
from pyspark.ml.feature import StringIndexer
property_type_indexer = StringIndexer(inputCol="property_type", outputCol="propertyTypeIndex")
#Fits a model to the input dataset with optional parameters.
final_prediction_df = property_type_indexer.fit(final_prediction_df).transform(final_prediction_df)
final_prediction_df.show()

+-------------+--------+------+--------------------+----------+------+--------------+--------------+-----------------+
|property_type|lot_size|garage|         street_name|year_built|zoning|          type|assessed_value|propertyTypeIndex|
+-------------+--------+------+--------------------+----------+------+--------------+--------------+-----------------+
|Single family| 262.395|     Y|EAGLESON CRESCENT NW|      2019|   RMD|Detached House|      350000.0|              0.0|
|Single family| 436.861|     Y|     CRAWFORD WAY SW|      2019|   RSL|Detached House|      508000.0|              0.0|
|Single family| 277.888|     Y|    CARTMELL ROAD SW|      2019|   RF4|Detached House|      360000.0|              0.0|
|Single family| 515.619|     N|     CHIVERS LOOP SW|         0|   RSL|Detached House|      157500.0|              0.0|
| Multi family| 217.562|     N|ADMIRAL GIROUARD ...|         0|  RF5g|Detached House|       92500.0|              1.0|
|Single family|   1.704|     N|                 

In [10]:
# Reference: https://towardsdatascience.com/building-a-linear-regression-with-pyspark-and-mllib-d065c3ba246a
# https://medium.com/@nutanbhogendrasharma/feature-transformer-vectorassembler-in-pyspark-ml-feature-part-3-b3c2c3c93ee9

In [11]:
# Apply OneHotEncoder to property_type, garage, street_name, zoning, and type columns
# Apply StringIndexer to string columns first
# import required libraries
from pyspark.ml.feature import StringIndexer
garage_indexer = StringIndexer(inputCol="garage", outputCol="garageIndex")
#Fits a model to the input dataset with optional parameters.
final_prediction_df = garage_indexer.fit(final_prediction_df).transform(final_prediction_df)
final_prediction_df.show()

+-------------+--------+------+--------------------+----------+------+--------------+--------------+-----------------+-----------+
|property_type|lot_size|garage|         street_name|year_built|zoning|          type|assessed_value|propertyTypeIndex|garageIndex|
+-------------+--------+------+--------------------+----------+------+--------------+--------------+-----------------+-----------+
|Single family| 262.395|     Y|EAGLESON CRESCENT NW|      2019|   RMD|Detached House|      350000.0|              0.0|        0.0|
|Single family| 436.861|     Y|     CRAWFORD WAY SW|      2019|   RSL|Detached House|      508000.0|              0.0|        0.0|
|Single family| 277.888|     Y|    CARTMELL ROAD SW|      2019|   RF4|Detached House|      360000.0|              0.0|        0.0|
|Single family| 515.619|     N|     CHIVERS LOOP SW|         0|   RSL|Detached House|      157500.0|              0.0|        1.0|
| Multi family| 217.562|     N|ADMIRAL GIROUARD ...|         0|  RF5g|Detached Hous

In [12]:
# Apply OneHotEncoder to property_type, garage, street_name, zoning, and type columns
# Apply StringIndexer to string columns first
# import required libraries
from pyspark.ml.feature import StringIndexer
street_name_indexer = StringIndexer(inputCol="street_name", outputCol="streetNameIndex")
#Fits a model to the input dataset with optional parameters.
final_prediction_df = street_name_indexer.fit(final_prediction_df).transform(final_prediction_df)
final_prediction_df.show()

+-------------+--------+------+--------------------+----------+------+--------------+--------------+-----------------+-----------+---------------+
|property_type|lot_size|garage|         street_name|year_built|zoning|          type|assessed_value|propertyTypeIndex|garageIndex|streetNameIndex|
+-------------+--------+------+--------------------+----------+------+--------------+--------------+-----------------+-----------+---------------+
|Single family| 262.395|     Y|EAGLESON CRESCENT NW|      2019|   RMD|Detached House|      350000.0|              0.0|        0.0|          599.0|
|Single family| 436.861|     Y|     CRAWFORD WAY SW|      2019|   RSL|Detached House|      508000.0|              0.0|        0.0|         1508.0|
|Single family| 277.888|     Y|    CARTMELL ROAD SW|      2019|   RF4|Detached House|      360000.0|              0.0|        0.0|         1194.0|
|Single family| 515.619|     N|     CHIVERS LOOP SW|         0|   RSL|Detached House|      157500.0|              0.0|

In [13]:
# Apply OneHotEncoder to property_type, garage, street_name, zoning, and type columns
# Apply StringIndexer to string columns first
# import required libraries
from pyspark.ml.feature import StringIndexer
zoning_indexer = StringIndexer(inputCol="zoning", outputCol="zoningIndex")
#Fits a model to the input dataset with optional parameters.
final_prediction_df = zoning_indexer.fit(final_prediction_df).transform(final_prediction_df)
final_prediction_df.show()

+-------------+--------+------+--------------------+----------+------+--------------+--------------+-----------------+-----------+---------------+-----------+
|property_type|lot_size|garage|         street_name|year_built|zoning|          type|assessed_value|propertyTypeIndex|garageIndex|streetNameIndex|zoningIndex|
+-------------+--------+------+--------------------+----------+------+--------------+--------------+-----------------+-----------+---------------+-----------+
|Single family| 262.395|     Y|EAGLESON CRESCENT NW|      2019|   RMD|Detached House|      350000.0|              0.0|        0.0|          599.0|       11.0|
|Single family| 436.861|     Y|     CRAWFORD WAY SW|      2019|   RSL|Detached House|      508000.0|              0.0|        0.0|         1508.0|        2.0|
|Single family| 277.888|     Y|    CARTMELL ROAD SW|      2019|   RF4|Detached House|      360000.0|              0.0|        0.0|         1194.0|        4.0|
|Single family| 515.619|     N|     CHIVERS LO

In [14]:
# Apply OneHotEncoder to property_type, garage, street_name, zoning, and type columns
# Apply StringIndexer to string columns first
# import required libraries
from pyspark.ml.feature import StringIndexer
type_indexer = StringIndexer(inputCol="type", outputCol="typeIndex")
#Fits a model to the input dataset with optional parameters.
final_prediction_df = type_indexer.fit(final_prediction_df).transform(final_prediction_df)
final_prediction_df.show()

+-------------+--------+------+--------------------+----------+------+--------------+--------------+-----------------+-----------+---------------+-----------+---------+
|property_type|lot_size|garage|         street_name|year_built|zoning|          type|assessed_value|propertyTypeIndex|garageIndex|streetNameIndex|zoningIndex|typeIndex|
+-------------+--------+------+--------------------+----------+------+--------------+--------------+-----------------+-----------+---------------+-----------+---------+
|Single family| 262.395|     Y|EAGLESON CRESCENT NW|      2019|   RMD|Detached House|      350000.0|              0.0|        0.0|          599.0|       11.0|      0.0|
|Single family| 436.861|     Y|     CRAWFORD WAY SW|      2019|   RSL|Detached House|      508000.0|              0.0|        0.0|         1508.0|        2.0|      0.0|
|Single family| 277.888|     Y|    CARTMELL ROAD SW|      2019|   RF4|Detached House|      360000.0|              0.0|        0.0|         1194.0|        4

In [15]:
# Now apply OneHotEncoder to string columns
from pyspark.ml.feature import OneHotEncoder

#onehotencoder to propertyTypeIndex
onehotencoder_property_type_vector = OneHotEncoder(inputCol="propertyTypeIndex", outputCol="property_type_vec")
final_prediction_df = onehotencoder_property_type_vector.transform(final_prediction_df)
final_prediction_df.show()

+-------------+--------+------+--------------------+----------+------+--------------+--------------+-----------------+-----------+---------------+-----------+---------+-----------------+
|property_type|lot_size|garage|         street_name|year_built|zoning|          type|assessed_value|propertyTypeIndex|garageIndex|streetNameIndex|zoningIndex|typeIndex|property_type_vec|
+-------------+--------+------+--------------------+----------+------+--------------+--------------+-----------------+-----------+---------------+-----------+---------+-----------------+
|Single family| 262.395|     Y|EAGLESON CRESCENT NW|      2019|   RMD|Detached House|      350000.0|              0.0|        0.0|          599.0|       11.0|      0.0|    (1,[0],[1.0])|
|Single family| 436.861|     Y|     CRAWFORD WAY SW|      2019|   RSL|Detached House|      508000.0|              0.0|        0.0|         1508.0|        2.0|      0.0|    (1,[0],[1.0])|
|Single family| 277.888|     Y|    CARTMELL ROAD SW|      2019|  

In [16]:
# Now apply OneHotEncoder to string columns
# garage, street_name, zoning, and type
from pyspark.ml.feature import OneHotEncoder

#onehotencoder to propertyTypeIndex
onehotencoder_garage_vector = OneHotEncoder(inputCol="garageIndex", outputCol="garage_vec")
final_prediction_df = onehotencoder_garage_vector.transform(final_prediction_df)
final_prediction_df.show()

# Now apply OneHotEncoder to string columns
from pyspark.ml.feature import OneHotEncoder

#onehotencoder to propertyTypeIndex
onehotencoder_street_name_vector = OneHotEncoder(inputCol="streetNameIndex", outputCol="street_name_vec")
final_prediction_df = onehotencoder_street_name_vector.transform(final_prediction_df)
final_prediction_df.show()

# Now apply OneHotEncoder to string columns
from pyspark.ml.feature import OneHotEncoder

#onehotencoder to propertyTypeIndex
onehotencoder_zoning_vector = OneHotEncoder(inputCol="zoningIndex", outputCol="zoning_vec")
final_prediction_df = onehotencoder_zoning_vector.transform(final_prediction_df)
final_prediction_df.show()


+-------------+--------+------+--------------------+----------+------+--------------+--------------+-----------------+-----------+---------------+-----------+---------+-----------------+-------------+
|property_type|lot_size|garage|         street_name|year_built|zoning|          type|assessed_value|propertyTypeIndex|garageIndex|streetNameIndex|zoningIndex|typeIndex|property_type_vec|   garage_vec|
+-------------+--------+------+--------------------+----------+------+--------------+--------------+-----------------+-----------+---------------+-----------+---------+-----------------+-------------+
|Single family| 262.395|     Y|EAGLESON CRESCENT NW|      2019|   RMD|Detached House|      350000.0|              0.0|        0.0|          599.0|       11.0|      0.0|    (1,[0],[1.0])|(1,[0],[1.0])|
|Single family| 436.861|     Y|     CRAWFORD WAY SW|      2019|   RSL|Detached House|      508000.0|              0.0|        0.0|         1508.0|        2.0|      0.0|    (1,[0],[1.0])|(1,[0],[1.

In [17]:
#onehotencoder to propertyTypeIndex
onehotencoder_type_vector = OneHotEncoder(inputCol="typeIndex", outputCol="type_vec")
final_prediction_df = onehotencoder_type_vector.transform(final_prediction_df)

# Feature transformer — VectorAssembler
from pyspark.ml.feature import VectorAssembler
final_prediction_df.columns

['property_type',
 'lot_size',
 'garage',
 'street_name',
 'year_built',
 'zoning',
 'type',
 'assessed_value',
 'propertyTypeIndex',
 'garageIndex',
 'streetNameIndex',
 'zoningIndex',
 'typeIndex',
 'property_type_vec',
 'garage_vec',
 'street_name_vec',
 'zoning_vec',
 'type_vec']

In [18]:
inputCols = [
    'lot_size',
    'year_built',
    'propertyTypeIndex',
    'garageIndex',
    'streetNameIndex',
    'zoningIndex',
    'typeIndex',
    'property_type_vec',
    'garage_vec',
    'street_name_vec',
    'zoning_vec',
    'type_vec'
]
outputCol = "features"
df_va = VectorAssembler(inputCols = inputCols, outputCol = outputCol)
final_prediction_df = df_va.setHandleInvalid("skip").transform(final_prediction_df)
final_prediction_df.select(['features']).toPandas().head(5)

Unnamed: 0,features
0,"(262.394989014, 2019.0, 0.0, 0.0, 599.0, 11.0,..."
1,"(436.860992432, 2019.0, 0.0, 0.0, 1508.0, 2.0,..."
2,"(277.888000488, 2019.0, 0.0, 0.0, 1194.0, 4.0,..."
3,"(515.619018555, 0.0, 0.0, 1.0, 1283.0, 2.0, 0...."
4,"(217.56199646, 0.0, 1.0, 1.0, 1556.0, 54.0, 0...."


In [19]:
new_df = final_prediction_df.select(['features','assessed_value'])
new_df.show()

+--------------------+--------------+
|            features|assessed_value|
+--------------------+--------------+
|(2954,[0,1,4,5,7,...|      350000.0|
|(2954,[0,1,4,5,7,...|      508000.0|
|(2954,[0,1,4,5,7,...|      360000.0|
|(2954,[0,3,4,5,7,...|      157500.0|
|(2954,[0,2,3,4,5,...|       92500.0|
|(2954,[0,1,3,5,7,...|       14500.0|
|(2954,[0,3,4,5,7,...|      144500.0|
|(2954,[0,1,4,5,7,...|      538000.0|
|(2954,[0,1,4,5,6,...|      327500.0|
|(2954,[0,3,4,5,7,...|      126000.0|
|(2954,[0,1,3,4,5,...|     1026500.0|
|(2954,[0,3,4,5,7,...|       85000.0|
|(2954,[0,1,4,5,7,...|      349000.0|
|(2954,[0,3,4,5,7,...|       38500.0|
|(2954,[0,3,4,5,7,...|      113000.0|
|(2954,[0,3,4,5,7,...|      168000.0|
|(2954,[0,1,3,4,5,...|      289500.0|
|(2954,[0,3,4,5,7,...|      123500.0|
|(2954,[0,3,4,5,7,...|      130000.0|
|(2954,[0,3,4,5,7,...|      164000.0|
+--------------------+--------------+
only showing top 20 rows



In [20]:
# Split data set into training and set datasets
splits = new_df.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]

In [21]:
train_df.count()

279074

In [22]:
test_df.count()

119805

In [None]:
# Apply Gradient-boosted tree regression algorithm
from pyspark.ml.regression import GBTRegressor
gbt = GBTRegressor(featuresCol = 'features', labelCol = 'assessed_value', maxIter=10,  maxBins=2846)
gbt_model = gbt.fit(train_df)
gbt_predictions = gbt_model.transform(test_df)
gbt_predictions.select('prediction', 'assessed_value', 'features').show(5)


In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
gbt_evaluator = RegressionEvaluator(labelCol="assessed_value", predictionCol="prediction", metricName="rmse")
rmse = gbt_evaluator.evaluate(gbt_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

In [None]:
print("Root Mean Squared Error (RMSE) on test data = %g" % int(rmse))

##### The Root Mean Squared Error of 7,508,308 is unacceptable. Although with almost 400000 data rows, the root mean squared error is still very high. Therefore, we are very close to conclude that the features we picked are not all important factors in determining the price of a home. Let's back it up through calculating the feature importance. 

In [None]:
 gbt_model.featureImportances