In [1]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import *

In [2]:
# Load the home-sales CSV into a Spark DataFrame: the first row supplies the
# column names and the column types are inferred from the data.
# NOTE(review): `spark` is not defined anywhere in this notebook -- presumably
# the session object injected by the notebook environment; confirm.
sales = spark.read.csv(
    '/FileStore/tables/home_data.csv',
    header=True,
    inferSchema=True
)

In [3]:
# Quick look at the loaded data (show() prints the first 20 rows by default).
sales.show()

In [4]:
# Preview only the two columns used by the first model below:
# `sqft_living` (the feature) and `price` (the label).
sales.select("sqft_living", "price").show()

In [5]:
# Print the column names and their inferred types.
sales.printSchema()

In [6]:
# Same information as a list of (column name, type) pairs; displayed as the
# cell's output because it is the last expression.
sales.dtypes

In [7]:
# Mark `sales` for caching: it is the input to every assembler / split /
# pipeline in the cells below, so it is reused several times.
sales.cache()

In [8]:
# Pack the single `sqft_living` column into a vector column named `features`,
# which is the column name the LinearRegression in this notebook reads from.
sqft_living_assembler = VectorAssembler(
    inputCols=["sqft_living"],
    outputCol="features"
)

In [9]:
# Apply the assembler: same rows as `sales` plus the new `features` column.
sqft_living_df = sqft_living_assembler.transform(sales)

In [10]:
# 80/20 train/test split with a fixed seed for reproducibility.
# The seed is written as a plain int: the Python 2-only long literal `11L`
# is a SyntaxError on Python 3, and a plain int works on both.
(trainingData, testData) = sqft_living_df.randomSplit([0.8, 0.2], seed=11)

In [11]:
# Linear regression estimator: predict `price` from the `features` vector.
sqft_living_lr = LinearRegression(
    labelCol="price",
    featuresCol="features"
)

In [12]:
# Fit the single-feature linear regression on the training split only.
sqft_living_lr_model = sqft_living_lr.fit(trainingData)

In [13]:
# Score the held-out test split with the fitted model; the next cell reads
# the resulting `prediction` column.
sqft_living_predictions = sqft_living_lr_model.transform(testData)

In [14]:
# Eyeball predicted vs. actual prices on the test split.
sqft_living_predictions.select("prediction", "price").show()

In [15]:
# RMSE evaluator comparing the `prediction` column against the `price` label.
evaluator = RegressionEvaluator(
    labelCol="price",
    predictionCol="prediction",
    metricName="rmse"
)

In [16]:
# Test-set RMSE of the single-feature (sqft_living) model.
sqft_rmse = evaluator.evaluate(sqft_living_predictions)

In [17]:
# Report the single-feature model's RMSE on the test split.
print("Root Mean Squared Error (RMSE) on test data = %g" % (sqft_rmse,))

In [18]:
# Repeat the linear-regression experiment with six features instead of one.
my_features = ['bedrooms', 'bathrooms', 'sqft_living', 'sqft_lot', 'floors', 'zipcode']
sales.select(my_features).show()

# Assemble the chosen columns into the `features` vector column.
my_features_assembler = VectorAssembler(inputCols=my_features, outputCol="features")
my_features_df = my_features_assembler.transform(sales)

# 80/20 split with a fixed seed. Plain int seed: the Python 2-only long
# literal `11L` is a SyntaxError on Python 3.
(my_features_trainingData, my_features_testData) = my_features_df.randomSplit([0.8, 0.2], seed=11)

# Fit on the training split, score the test split.
my_features_lr = LinearRegression(labelCol="price", featuresCol="features")
my_features_model = my_features_lr.fit(my_features_trainingData)
my_features_predictions = my_features_model.transform(my_features_testData)
my_features_predictions.select("prediction", "price", "features").show(5)

# Mean test-set price, as context for the size of the RMSE. It is printed
# explicitly because a bare expression in the middle of a cell is never
# displayed. Value recorded from an earlier run: 544128.029162747
print(my_features_testData.select(mean('price').alias('price_value')).collect())

# Test-set RMSE of the multi-feature model.
evaluator = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(my_features_predictions)

In [19]:
# Report the RMSE currently held in `rmse`.
# Recorded from an earlier run: 273356 (previous 281234)
print("Root Mean Squared Error (RMSE) on test data = %g" % (rmse,))

Pipeline

In [21]:
# Same multi-feature linear regression, expressed as a Pipeline.
# The split is taken on the raw `sales` rows (not a pre-assembled frame)
# because the pipeline's first stage does the feature assembling.
# Plain int seed: the Python 2-only long literal `11L` is a SyntaxError on
# Python 3.
(my_features_trainingData, my_features_testData) = sales.randomSplit([0.8, 0.2], seed=11)
my_features_assembler = VectorAssembler(inputCols=my_features, outputCol="features")
my_features_lr = LinearRegression(labelCol="price", featuresCol="features")

# `Pipeline` is already imported in the notebook's first cell, so it is not
# re-imported here.
pipeline = Pipeline(stages=[my_features_assembler, my_features_lr])
pipelineModel = pipeline.fit(my_features_trainingData)
predictions = pipelineModel.transform(my_features_testData)
predictions.select("price", "prediction", "features").show()

# Reuses the RMSE `evaluator` defined in an earlier cell.
rmse = evaluator.evaluate(predictions)
# print(...) with a single argument is valid on both Python 2 and Python 3.
print("RMSE on our test set: %g" % rmse)

DecisionTreeRegressor

In [23]:
from pyspark.ml.regression import DecisionTreeRegressor

# Decision-tree regressor on the same `features` -> `price` setup, reusing
# the assembler, train/test split and RMSE evaluator from earlier cells.
dt = DecisionTreeRegressor(labelCol="price", featuresCol="features")
pipeline_dt = Pipeline(stages=[my_features_assembler, dt])
pipeline_dt_Model = pipeline_dt.fit(my_features_trainingData)
predictions_dt = pipeline_dt_Model.transform(my_features_testData)
predictions_dt.select("price", "prediction", "features").show()

rmse_dt = evaluator.evaluate(predictions_dt)
# print(...) with a single argument is valid on both Python 2 and Python 3;
# the bare `print "..."` statement is a SyntaxError on Python 3.
print("rmse_dt on our test set: %g" % rmse_dt)


RandomForestRegressor

In [25]:
from pyspark.ml.regression import RandomForestRegressor

# Random-forest regressor on the same `features` -> `price` setup, reusing
# the assembler, train/test split and RMSE evaluator from earlier cells.
rf = RandomForestRegressor(labelCol="price", featuresCol="features")
pipeline_rf = Pipeline(stages=[my_features_assembler, rf])
pipeline_rf_Model = pipeline_rf.fit(my_features_trainingData)
predictions_rf = pipeline_rf_Model.transform(my_features_testData)
predictions_rf.select("price", "prediction", "features").show()

rmse_rf = evaluator.evaluate(predictions_rf)
# print(...) with a single argument is valid on both Python 2 and Python 3;
# the bare `print "..."` statement is a SyntaxError on Python 3.
print("rmse_rf on our test set: %g" % rmse_rf)

Gradient-boosted trees (GBTs)

In [27]:
from pyspark.ml.regression import GBTRegressor

# Gradient-boosted-trees regressor on the same `features` -> `price` setup,
# reusing the assembler, train/test split and RMSE evaluator from earlier
# cells.
gbt = GBTRegressor(labelCol="price", featuresCol="features")
pipeline_gbt = Pipeline(stages=[my_features_assembler, gbt])
pipeline_gbt_Model = pipeline_gbt.fit(my_features_trainingData)
predictions_gbt = pipeline_gbt_Model.transform(my_features_testData)
predictions_gbt.select("price", "prediction", "features").show()

rmse_gbt = evaluator.evaluate(predictions_gbt)
# print(...) with a single argument is valid on both Python 2 and Python 3;
# the bare `print "..."` statement is a SyntaxError on Python 3.
print("rmse_gbt on our test set: %g" % rmse_gbt)

### RMSE
https://www.google.co.in/search?q=house+value+regression+rmse&rlz=1C5CHFA_enIN727IN727&source=lnms&tbm=isch&sa=X&ved=0ahUKEwir67KWhK_cAhWJiqYKHZjbBmcQ_AUICigB&biw=1440&bih=733#imgrc=197smqAHQdXblM:
  
https://spark.apache.org/docs/latest/ml-pipeline.html

https://www.kaggle.com/c/house-prices-advanced-regression-techniques