# Population vs. Median Home Prices
#### *Linear Regression with Single Variable*

*Note: this notebook requires Spark 2.0+.*

In [3]:
%scala if (org.apache.spark.BuildInfo.sparkBranch < "2.0") sys.error("Attach this notebook to a cluster running Spark 2.0+")

### Load and parse the data

In [5]:
# Use the Spark CSV datasource with options specifying:
#  - First line of file is a header
#  - Automatically infer the schema of the data
# Note: Spark 2.0+ provides the `spark` session (instead of `sqlContext`)
# and a built-in "csv" source, so the external spark-csv package name is not needed.
data = spark.read.format("csv")\
  .option("header", "true")\
  .option("inferSchema", "true")\
  .load("/databricks-datasets/samples/population-vs-price/data_geo.csv")
data.cache()  # Cache data for faster reuse
data.count()

In [6]:
display(data)

2014 rank,City,State,State Code,2014 Population estimate,2015 median sales price
101,Birmingham,Alabama,AL,212247.0,162.9
125,Huntsville,Alabama,AL,188226.0,157.7
122,Mobile,Alabama,AL,194675.0,122.5
114,Montgomery,Alabama,AL,200481.0,129.0
64,Anchorage[19],Alaska,AK,301010.0,
78,Chandler,Arizona,AZ,254276.0,
86,Gilbert[20],Arizona,AZ,239277.0,
88,Glendale,Arizona,AZ,237517.0,
38,Mesa,Arizona,AZ,464704.0,
148,Peoria,Arizona,AZ,166934.0,


In [7]:
data.printSchema()

In [8]:
data = data.dropna()  # drop rows with missing values
data.count()
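
As a rough illustration of what `dropna()` does with its default settings (drop any row that has at least one missing value), here is a minimal pure-Python sketch. The row dictionaries and the `drop_incomplete` helper are hypothetical stand-ins for the Spark DataFrame; the values come from the table shown above.

```python
# Hypothetical stand-in rows: None marks a missing median sales price.
rows = [
    {"City": "Birmingham", "population": 212247.0, "price": 162.9},
    {"City": "Anchorage[19]", "population": 301010.0, "price": None},
    {"City": "Mobile", "population": 194675.0, "price": 122.5},
    {"City": "Chandler", "population": 254276.0, "price": None},
]

def drop_incomplete(records):
    """Keep only records in which every field has a value."""
    return [r for r in records if all(v is not None for v in r.values())]

clean = drop_incomplete(rows)
print(len(rows), "->", len(clean))
print([r["City"] for r in clean])
```

This is why the second `data.count()` can be smaller than the first: cities without a 2015 median sales price are removed before modeling.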

In [9]:
# Register the DataFrame as a temporary view so it can be queried with SQL.
# Note: `createOrReplaceTempView` replaces `registerTempTable`, which is deprecated as of Spark 2.0.
data.createOrReplaceTempView("data_geo")

In [10]:
%sql select City, `State Code`, `2014 Population estimate`/1000 as `2014 Pop estimate`, `2015 median sales price` from data_geo

City,State Code,2014 Pop estimate,2015 median sales price
Birmingham,AL,212.247,162.9
Huntsville,AL,188.226,157.7
Mobile,AL,194.675,122.5
Montgomery,AL,200.481,129.0
Phoenix,AZ,1537.058,206.1
Tucson,AZ,527.972,178.1
Little Rock,AR,197.706,131.8
Anaheim,CA,346.997,685.7
Los Angeles,CA,3928.864,434.7
Riverside,CA,319.504,281.0


## Limit data to Population vs. Price
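(for our ML example)

We also use `VectorAssembler` to pack the population column into a single `features` vector column, which is the input format that Spark ML estimators expect.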
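
Conceptually, `VectorAssembler` concatenates the values of the listed input columns into one vector per row. The pure-Python sketch below mimics that idea with plain lists; `assemble` is a hypothetical helper, not part of the Spark API, and it ignores details such as sparse vectors and null handling.

```python
def assemble(records, input_cols, output_col="features"):
    """Return copies of the records with the input columns packed into one list."""
    assembled = []
    for rec in records:
        new_rec = dict(rec)  # copy so the input records are left untouched
        new_rec[output_col] = [float(rec[c]) for c in input_cols]
        assembled.append(new_rec)
    return assembled

records = [
    {"2014 Population estimate": 212247, "label": 162.9},
    {"2014 Population estimate": 188226, "label": 157.7},
]
out = assemble(records, ["2014 Population estimate"])
for rec in out:
    print(rec["features"], rec["label"])
```

With a single input column, each feature vector has length one, which matches the single-variable regression used in this notebook.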

In [12]:
# Create a DataFrame with just the columns needed for linear regression
df = spark.sql("select `2014 Population estimate`, `2015 median sales price` as label from data_geo")
display(df)


2014 Population estimate,label
212247,162.9
188226,157.7
194675,122.5
200481,129.0
1537058,206.1
527972,178.1
197706,131.8
346997,685.7
3928864,434.7
319504,281.0


In [13]:
from pyspark.ml.feature import VectorAssembler

# Combine the single input column into a vector column named "features"
assembler = VectorAssembler(
    inputCols=["2014 Population estimate"],
    outputCol="features")
output = assembler.transform(df)
display(output.select("features", "label"))

features,label
"List(1, 1, List(), List(212247.0))",162.9
"List(1, 1, List(), List(188226.0))",157.7
"List(1, 1, List(), List(194675.0))",122.5
"List(1, 1, List(), List(200481.0))",129.0
"List(1, 1, List(), List(1537058.0))",206.1
"List(1, 1, List(), List(527972.0))",178.1
"List(1, 1, List(), List(197706.0))",131.8
"List(1, 1, List(), List(346997.0))",685.7
"List(1, 1, List(), List(3928864.0))",434.7
"List(1, 1, List(), List(319504.0))",281.0


## Linear Regression

**Goal**
* Predict y = 2015 Median Housing Price
* Using feature x = 2014 Population Estimate

**References**
* [MLlib LinearRegression user guide](http://spark.apache.org/docs/latest/ml-classification-regression.html#linear-regression)
* [PySpark LinearRegression API](http://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.regression.LinearRegression)

In [15]:
# Import LinearRegression class
from pyspark.ml.regression import LinearRegression

# Define LinearRegression algorithm
lr = LinearRegression()

# Fit 2 models, using different regularization parameters
modelA = lr.fit(output, {lr.regParam:0.0})
modelB = lr.fit(output, {lr.regParam:100.0})
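
To make the role of `regParam` more concrete, here is a minimal pure-Python sketch of single-variable linear regression with an L2 penalty on the slope. It assumes the objective `(1/2n) * sum((y - b - w*x)^2) + (reg_param/2) * w^2` with an unpenalized intercept, which has the closed form `w = cov(x, y) / (var(x) + reg_param)`. This is an illustration only: Spark's `LinearRegression` standardizes features by default and uses its own solvers, so its numbers will not match this sketch. The function `fit_line` and the toy data are hypothetical.

```python
def fit_line(xs, ys, reg_param=0.0):
    """Fit y ~ intercept + slope * x with an L2 penalty on the slope."""
    n = len(xs)
    x_mean = sum(xs) / n
    y_mean = sum(ys) / n
    # Population covariance and variance (divide by n, not n - 1).
    cov_xy = sum((x - x_mean) * (y - y_mean) for x, y in zip(xs, ys)) / n
    var_x = sum((x - x_mean) ** 2 for x in xs) / n
    slope = cov_xy / (var_x + reg_param)
    intercept = y_mean - slope * x_mean
    return intercept, slope

xs = [1.0, 2.0, 3.0, 4.0]
ys = [3.0, 5.0, 7.0, 9.0]  # toy data lying exactly on y = 1 + 2x

intercept_a, slope_a = fit_line(xs, ys, reg_param=0.0)   # no regularization
intercept_b, slope_b = fit_line(xs, ys, reg_param=1.25)  # penalized slope
print(intercept_a, slope_a)
print(intercept_b, slope_b)
```

In this sketch a larger penalty shrinks the slope toward zero and moves the intercept toward the mean of the labels, which is the qualitative difference to look for between `modelA` and `modelB`.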

In [16]:
print(">>>> ModelA intercept: %r, coefficient: %r" % (modelA.intercept, modelA.coefficients[0]))

In [17]:
print(">>>> ModelB intercept: %r, coefficient: %r" % (modelB.intercept, modelB.coefficients[0]))

## Make predictions

Calling `transform()` on a fitted model returns a new DataFrame with an added `prediction` column.
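
For a single-feature linear model, each predicted value is just `intercept + coefficient * x`. The sketch below mimics that on plain Python data; the intercept and coefficient values are made up for illustration and are not the fitted values of `modelA` or `modelB`.

```python
# Hypothetical model parameters, chosen only for illustration.
intercept = 100.0
coefficient = 0.0001

# Population values taken from the first rows of the data set.
populations = [212247.0, 188226.0, 194675.0]

def add_predictions(features, intercept, coefficient):
    """Pair each feature value with its linear prediction."""
    return [(x, intercept + coefficient * x) for x in features]

predicted = add_predictions(populations, intercept, coefficient)
for x, y_hat in predicted:
    print(x, round(y_hat, 4))
```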

In [19]:
# Make predictions
predictionsA = modelA.transform(output)
display(predictionsA)

## Evaluate the Model
#### Predicted vs. True label

In [21]:
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(metricName="rmse")
RMSE = evaluator.evaluate(predictionsA)
print("ModelA: Root Mean Squared Error = " + str(RMSE))

In [22]:
predictionsB = modelB.transform(output)
RMSE = evaluator.evaluate(predictionsB)
print("ModelB: Root Mean Squared Error = " + str(RMSE))
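
The `rmse` metric is the square root of the mean squared difference between label and prediction. A minimal pure-Python sketch, using made-up label and prediction values rather than this notebook's results (the `rmse` helper is hypothetical, not the Spark evaluator):

```python
import math

def rmse(labels, predictions):
    """Root mean squared error between two equal-length sequences."""
    if len(labels) != len(predictions):
        raise ValueError("labels and predictions must have the same length")
    squared_errors = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))

labels = [3.0, 5.0, 7.0, 9.0]
predictions = [2.0, 5.0, 8.0, 9.0]
error = rmse(labels, predictions)
print(error)
```

Because RMSE is expressed in the same units as the label, a lower value means the predictions sit closer to the observed median sales prices.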

## Linear Regression Plots

In [24]:
from pandas import DataFrame
from ggplot import *

# Collect the feature, the label, and both models' predictions to the driver
pop = output.rdd.map(lambda p: p.features[0]).collect()
price = output.rdd.map(lambda p: p.label).collect()
predA = predictionsA.select("prediction").rdd.map(lambda r: r[0]).collect()
predB = predictionsB.select("prediction").rdd.map(lambda r: r[0]).collect()

# Build a pandas DataFrame for plotting
pydf = DataFrame({'pop': pop, 'price': price, 'predA': predA, 'predB': predB})

## View the Python Pandas DataFrame (pydf)

In [26]:
pydf

## ggplot figure
Now that we have the Python pandas DataFrame (`pydf`), use ggplot to display the scatterplot and the two regression models.

In [28]:
p = ggplot(pydf, aes('pop','price')) + \
    geom_point(color='blue') + \
    geom_line(pydf, aes('pop','predA'), color='red') + \
    geom_line(pydf, aes('pop','predB'), color='green') + \
    scale_x_log10() + scale_y_log10()
display(p)