## House Price Prediction using Linear Regression Model (PySpark & MLlib)
This notebook uses linear regression to predict house prices from a data set of property sales in the city of Sacramento.
The data file has attributes such as street, city, zip, state, beds, baths, sq__ft, type, sale_date, price, latitude, and longitude.

In [2]:
from pyspark.sql.types import *
from pyspark.sql import Row

## Data Loading
We download the file with the wget command. The file has been uploaded to Dropbox for easy access.

In [4]:
%sh
wget 'https://www.dropbox.com/s/a6l14d0ngrk7x2i/Sacramentorealestatetransactions.csv?dl=0' -O houses_data.csv

houses_data = spark.read\
  .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
  .option('header', 'true')\
  .option('inferSchema', 'true')\
  .load("file:/databricks/driver/houses_data.csv")
#houses = sqlContext.read.format('csv').options(header='true', inferSchema='true').load('/FileStore/tables/Sacramentorealestatetransactions.csv')

Here, we take a look at the first few raw lines of the CSV file.

In [6]:
rdd = sc.textFile('file:/databricks/driver/houses_data.csv')
rdd.take(5)


The following step maps over the RDD and splits each comma-delimited line into a list of field values. Note that a plain split on ',' assumes that no field contains a comma of its own.

In [8]:
rdd = rdd.map(lambda line: line.split(","))
rdd.take(2)
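
Splitting on every comma is adequate only when no field contains a comma itself. As a minimal sketch, using Python's standard `csv` module on made-up sample lines (not rows from the Sacramento file), the two approaches differ on a quoted field as follows:

```python
import csv

# Hypothetical sample lines; the second has a comma inside a quoted field.
sample_lines = [
    '1 EXAMPLE ST,SACRAMENTO,95838',
    '"51 EXAMPLE CT, UNIT 2",SACRAMENTO,95823',
]

# Naive split: breaks the quoted street into two pieces.
naive = [line.split(",") for line in sample_lines]

# csv.reader honours the quoting and keeps the street intact.
parsed = list(csv.reader(sample_lines))

print(len(naive[1]))   # 4 fields
print(len(parsed[1]))  # 3 fields
```

If the real file turned out to contain quoted commas, passing `csv.reader` to the RDD's `mapPartitions` should apply the same parsing in Spark, although that is an assumption we have not needed to test here.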

Next, we use the 'filter' transformation to remove the header line from the RDD.

In [10]:
header = rdd.first()
rdd = rdd.filter(lambda line:line != header)
rdd.take(2)

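Converting the RDD to a DataFrame. Every column is still a string at this point, because the values come straight from the split lines.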

In [12]:
df = rdd.map(lambda line: Row(street = line[0], city = line[1], zip=line[2], beds=line[4], baths=line[5], sqft=line[6], price=line[9])).toDF()
df.show()

In [13]:
df.toPandas().head()

In cell 15, we group the data by the number of beds and count the houses in each group. We notice that there are 108 houses with '0' beds, which is of course not possible.

In [15]:
df.groupBy("beds").count().show()

In [16]:
df.describe(['baths', 'beds','price','sqft']).show()

Importing necessary modules for Linear Regression

In [18]:
import pyspark.mllib
import pyspark.mllib.regression
from pyspark.mllib.regression import LabeledPoint
from pyspark.sql.functions import *

We start by creating a dataframe df that has only the subset of features we're interested in. We're going to predict the price of the house from the number of baths, beds, and square feet.

In [20]:
df = df.select('price','baths','beds','sqft')

Here, we drop every row in which any of the features used for prediction (baths, beds, sqft) is zero or negative, since such values are suspicious.

In [22]:
df = df[df.baths > 0]
df = df[df.beds > 0]
df = df[df.sqft > 0]
df.describe(['baths','beds','price','sqft']).show()

## Labeled Points
MLlib expects the training data as LabeledPoints. A LabeledPoint pairs the response value (the label, here the price) with a vector of predictors. We can call 'map' on the DataFrame's underlying RDD in order to return an RDD of LabeledPoints.

In [24]:
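# Pass the predictors as one flat list of floats; [line[1:]] would nest the row slice inside another list.
temp = df.rdd.map(lambda line: LabeledPoint(float(line[0]), [float(x) for x in line[1:]]))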
temp.take(5)

## Data Scaling
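We train with Stochastic Gradient Descent, and the square footage of a house is much larger in magnitude than its number of bedrooms and bathrooms. Gradient descent behaves poorly when features live on such different scales, so we use the StandardScaler to bring the features to a comparable scale. With its default settings (withMean=False, withStd=True), the scaler divides each feature by its standard deviation without centering it.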

In [26]:
from pyspark.mllib.util import MLUtils
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.feature import StandardScaler

In [27]:
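# Build an explicit dense vector of floats from the feature columns (baths, beds, sqft).
features = df.rdd.map(lambda row: Vectors.dense([float(x) for x in row[1:]]))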
features.take(5)

In [28]:
standardizer = StandardScaler()
model = standardizer.fit(features)
features_transform = model.transform(features)
features_transform.take(5)
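
To make the effect of scaling concrete, here is a minimal plain-Python sketch (not Spark code) of the same idea: dividing each feature column by its sample standard deviation, which should correspond to what `StandardScaler()` does with its default settings. The three rows are made-up values, and `scale_columns` is a hypothetical helper.

```python
import statistics

# Made-up (baths, beds, sqft) rows, for illustration only.
rows = [
    (1.0, 2.0, 800.0),
    (2.0, 3.0, 1200.0),
    (3.0, 4.0, 1600.0),
]

def scale_columns(data):
    """Divide every column by its sample standard deviation (no centering)."""
    columns = list(zip(*data))
    stds = [statistics.stdev(col) for col in columns]
    return [tuple(x / s for x, s in zip(row, stds)) for row in data]

scaled = scale_columns(rows)
for row in scaled:
    print(row)
```

After scaling, the sqft column is on the same order of magnitude as beds and baths, so a single step size can suit all three weights.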

## Labels with features
The labels (prices) are still in the DataFrame, while the scaled features are in the new RDD we created.
We map over the DataFrame's RDD to extract the price, which is the element at index 0 of each row.

In [30]:
lab = df.rdd.map(lambda row: row[0])
lab.take(5)

In [31]:
transformedData = lab.zip(features_transform)
transformedData.take(5)

In [32]:
# row[1] is already a feature vector, so pass it directly instead of wrapping it in a list.
transformedData = transformedData.map(lambda row: LabeledPoint(row[0], row[1]))
transformedData.take(5)

Splitting the data into training (80%) and testing (20%) sets

In [34]:
trainingData, testingData = transformedData.randomSplit([.8,.2],seed=1234)
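
`randomSplit` assigns each record to a split at random according to the weights, so the resulting sizes are only approximately 80% and 20%. The plain-Python sketch below mimics that behaviour on a list; `random_split` is a hypothetical helper, not part of Spark, and it will not reproduce Spark's exact assignment for the same seed.

```python
import random

def random_split(items, train_fraction, seed):
    """Assign each item independently to the train or test list."""
    rng = random.Random(seed)
    train, test = [], []
    for item in items:
        (train if rng.random() < train_fraction else test).append(item)
    return train, test

data = list(range(1000))
train, test = random_split(data, 0.8, seed=1234)
print(len(train), len(test))  # roughly 800 and 200; exact sizes depend on the seed
```

Fixing the seed makes the split repeatable, which is why the cell above passes `seed=1234`.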

We import linear regression with stochastic gradient descent and train a model. The arguments are the training data, the number of iterations (1000), and the step size (0.2). By default, 'train' does not fit an intercept.

In [36]:
from pyspark.mllib.regression import LinearRegressionWithSGD
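# Keyword arguments make the meaning of 1000 (iterations) and 0.2 (step size) explicit.
linearModel = LinearRegressionWithSGD.train(trainingData, iterations=1000, step=0.2)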
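
For intuition about what the iteration count and the step size control, the following plain-Python sketch fits a linear model without an intercept by full-batch gradient descent on the mean squared error. It is a simplified stand-in, not MLlib's implementation (which, among other things, shrinks the step size as the iterations progress); `fit_linear_gd` and the toy data are hypothetical.

```python
def fit_linear_gd(points, iterations, step):
    """Fit weights w that minimise mean((w . x - y)^2) / 2, with no intercept."""
    n_features = len(points[0][1])
    weights = [0.0] * n_features
    for _ in range(iterations):
        gradient = [0.0] * n_features
        for label, features in points:
            error = sum(w * x for w, x in zip(weights, features)) - label
            for j, x in enumerate(features):
                gradient[j] += error * x / len(points)
        # Move against the gradient; the step size sets how far.
        weights = [w - step * g for w, g in zip(weights, gradient)]
    return weights

# Toy (label, features) pairs generated from y = 2*x1 + 3*x2.
toy = [(2.0, [1.0, 0.0]), (3.0, [0.0, 1.0]), (5.0, [1.0, 1.0]), (7.0, [2.0, 1.0])]
weights = fit_linear_gd(toy, iterations=1000, step=0.2)
print([round(w, 3) for w in weights])  # should be close to [2.0, 3.0]
```

Too large a step makes the updates diverge, while too small a step needs many more iterations to get close to the solution.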

Now we can read the learned parameters, such as the weights (coefficients) and the intercept, from the model.

In [38]:
linearModel.weights

We show the first 10 points from the test set. We can then make a prediction on one of these points using our model.

In [40]:
testingData.take(10)

## Prediction on one data point from the test dataset
The output is the predicted price of the house in USD.

In [42]:
linearModel.predict([1.49297445326,3.52055958053,1.73535287287])
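
For a linear model, `predict` amounts to a dot product of the weight vector with the feature vector, plus the intercept. A minimal sketch with made-up weights (not the values learned above), where `predict_price` is a hypothetical helper:

```python
def predict_price(weights, intercept, features):
    """Return the dot product of weights and features, plus the intercept."""
    return sum(w * x for w, x in zip(weights, features)) + intercept

# Made-up weights for (baths, beds, sqft) in scaled units.
example_weights = [10000.0, 20000.0, 50000.0]
example_features = [1.5, 3.5, 1.75]
print(predict_price(example_weights, 0.0, example_features))  # 172500.0
```

Because the model was trained on scaled features, any new data point has to be scaled with the same fitted scaler before it is passed to `predict`.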