In [1]:
# Load the population-vs-price sample CSV. The first row is treated as the
# header and column types are inferred from the file contents.
data = (
    spark.read
         .options(header="true", inferSchema="true")
         .csv("/databricks-datasets/samples/population-vs-price/data_geo.csv")
)
# Mark the frame for caching; kept as the cell's last expression so the cell
# still echoes the DataFrame.
data.cache()

In [2]:
# Render the freshly loaded DataFrame as a table to inspect its columns and values.
display(data)

2014 rank,City,State,State Code,2014 Population estimate,2015 median sales price
101,Birmingham,Alabama,AL,212247.0,162.9
125,Huntsville,Alabama,AL,188226.0,157.7
122,Mobile,Alabama,AL,194675.0,122.5
114,Montgomery,Alabama,AL,200481.0,129.0
64,Anchorage[19],Alaska,AK,301010.0,
78,Chandler,Arizona,AZ,254276.0,
86,Gilbert[20],Arizona,AZ,239277.0,
88,Glendale,Arizona,AZ,237517.0,
38,Mesa,Arizona,AZ,464704.0,
148,Peoria,Arizona,AZ,166934.0,


In [3]:
from pyspark.sql.functions import col

# Discard every row that contains a null in any column.
data = data.dropna()

# Build one aliased column expression per column, replacing the spaces in the
# original header names with underscores.
exprs = [
    col(name).alias(name.replace(' ','_'))
    for name in data.columns
]

In [4]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler

# Apply the underscore aliases, then keep only the two modelling columns:
# the population estimate (single feature) and the median sales price (label).
vdata = (
    data.select(*exprs)
        .selectExpr(
            "2014_Population_estimate as population",
            "2015_median_sales_price as label",
        )
)

# Single-stage pipeline: pack the "population" column into a "features" vector.
assembler = VectorAssembler(inputCols=["population"], outputCol="features")
stages = [assembler]
pipeline = Pipeline(stages=stages)

# Fit the pipeline on vdata and apply it to the same frame.
pipelineModel = pipeline.fit(vdata)
dataset = pipelineModel.transform(vdata)

# Columns used when previewing the assembled dataset.
selectedcols = ["features", "label"]

In [5]:
# Preview only the assembled feature vector and the label column.
display(dataset.select(selectedcols))

features,label
"List(1, 1, List(), List(212247.0))",162.9
"List(1, 1, List(), List(188226.0))",157.7
"List(1, 1, List(), List(194675.0))",122.5
"List(1, 1, List(), List(200481.0))",129.0
"List(1, 1, List(), List(1537058.0))",206.1
"List(1, 1, List(), List(527972.0))",178.1
"List(1, 1, List(), List(197706.0))",131.8
"List(1, 1, List(), List(346997.0))",685.7
"List(1, 1, List(), List(3928864.0))",434.7
"List(1, 1, List(), List(319504.0))",281.0


In [6]:
from pyspark.ml.regression import LinearRegression

# One estimator, fitted twice on the same dataset. Each fit overrides regParam
# through a param map: model A has no regularization, model B uses 100.0.
lr = LinearRegression()
no_reg = {lr.regParam: 0.0}
strong_reg = {lr.regParam: 100.0}
modelA = lr.fit(dataset, no_reg)
modelB = lr.fit(dataset, strong_reg)

In [7]:
# Score the dataset the model was fitted on with model A (regParam=0.0).
# The displayed result carries a "prediction" column next to the input columns.
predictionsA = modelA.transform(dataset)
display(predictionsA)

population,label,features,prediction
212247,162.9,"List(1, 1, List(), List(212247.0))",199.3167659584664
188226,157.7,"List(1, 1, List(), List(188226.0))",198.40882267887196
194675,122.5,"List(1, 1, List(), List(194675.0))",198.65258131548592
200481,129.0,"List(1, 1, List(), List(200481.0))",198.87203590444247
1537058,206.1,"List(1, 1, List(), List(1537058.0))",249.39183544694856
527972,178.1,"List(1, 1, List(), List(527972.0))",211.25050693302884
197706,131.8,"List(1, 1, List(), List(197706.0))",198.7671467407576
346997,685.7,"List(1, 1, List(), List(346997.0))",204.4100325554172
3928864,434.7,"List(1, 1, List(), List(3928864.0))",339.79707185649573
319504,281.0,"List(1, 1, List(), List(319504.0))",203.37085497805197


In [8]:
# Score the dataset the model was fitted on with model B (regParam=100.0).
# The displayed result carries a "prediction" column next to the input columns.
predictionsB = modelB.transform(dataset)
display(predictionsB)

population,label,features,prediction
212247,162.9,"List(1, 1, List(), List(212247.0))",204.43640360159205
188226,157.7,"List(1, 1, List(), List(188226.0))",203.91746594049368
194675,122.5,"List(1, 1, List(), List(194675.0))",204.05678690866415
200481,129.0,"List(1, 1, List(), List(200481.0))",204.18221682666663
1537058,206.1,"List(1, 1, List(), List(1537058.0))",233.05695735611485
527972,178.1,"List(1, 1, List(), List(527972.0))",211.25716847608865
197706,131.8,"List(1, 1, List(), List(197706.0))",204.12226711559933
346997,685.7,"List(1, 1, List(), List(346997.0))",207.34747515702293
3928864,434.7,"List(1, 1, List(), List(3928864.0))",284.72833704226645
319504,281.0,"List(1, 1, List(), List(319504.0))",206.7535301457171


In [9]:
from pyspark.ml.evaluation import RegressionEvaluator
# Root-mean-squared-error evaluator; the label/prediction column names are left
# at their defaults, which match the columns in predictionsA.
evaluator = RegressionEvaluator(metricName="rmse")
# RMSE of model A, measured on the same data the model was fitted on.
RMSE = evaluator.evaluate(predictionsA)
print("ModelA: Root Mean Squared Error = " + str(RMSE))

In [10]:
# RMSE of model B (regParam=100.0), measured with the same evaluator used for
# model A. predictionsB already holds modelB.transform(dataset) from the cell
# that displays model B's predictions, so it is reused here instead of being
# rebuilt, mirroring how the model A cell reuses predictionsA.
# Note: RMSE is overwritten here, so it now refers to model B.
RMSE = evaluator.evaluate(predictionsB)
print("ModelB: Root Mean Squared Error = " + str(RMSE))

In [11]:
import numpy as np
import matplotlib.pyplot as plt

# Bring both plotted columns to the driver with a single collect, so x and y
# are built from the same rows in the same order (one Spark job, not two).
rows = dataset.select("features", "label").collect()
x = [row.features[0] for row in rows]   # population: sole entry of the feature vector
y = [row.label for row in rows]         # median sales price

plt.style.use('classic')
# loglog() below is used only to switch both axes to log scale; a line width of
# 0 keeps the line it draws through the points invisible.
# NOTE: rcParams is global, so figures created later in the session also
# inherit this zero line width.
plt.rcParams['lines.linewidth'] = 0
fig, ax = plt.subplots()
ax.loglog(x, y)
plt.xlim(1.0e5, 1.0e7)
plt.ylim(5.0e1, 1.0e3)
ax.scatter(x, y, c="blue")
# Label the axes so the figure can be read on its own.
ax.set_xlabel("2014 population estimate")
ax.set_ylabel("2015 median sales price")

display(fig)


In [12]:
# Pass the fitted model together with the dataset to display(); the output
# rendered for this cell is a table of fitted values and residuals for model A.
display(modelA,dataset)

fitted values,residuals
199.3167659584664,-36.41676595846639
198.40882267887196,-40.70882267887194
198.65258131548592,-76.15258131548592
198.87203590444247,-69.87203590444247
211.25050693302884,-33.15050693302885
198.7671467407576,-66.96714674075758
204.4100325554172,481.2899674445829
203.37085497805197,77.62914502194806
209.63377749220228,66.16622250779773
243.495779319366,266.80422068063405


In [13]:
# pandas is imported here because the notebook never brings DataFrame into
# scope; the bare name DataFrame raised NameError on a fresh kernel.
import pandas as pd

# Collect the inputs and both models' predictions to the driver as plain lists.
pop = dataset.rdd.map(lambda p: (p.features[0])).collect()    # population (single feature)
price = dataset.rdd.map(lambda p: (p.label)).collect()        # observed median sales price
predA = predictionsA.select("prediction").rdd.map(lambda r: r[0]).collect()  # model A output
predB = predictionsB.select("prediction").rdd.map(lambda r: r[0]).collect()  # model B output

# Local pandas frame with one row per city: inputs next to both predictions.
# The four lists are collected separately and paired by position.
pydf = pd.DataFrame({'pop':pop,'price':price,'predA':predA, 'predB':predB})

In [14]:
# Echo the pandas frame: population, actual price, and both models' predictions.
pydf

Unnamed: 0,pop,price,predA,predB
0,212247.0,162.9,199.316766,204.436404
1,188226.0,157.7,198.408823,203.917466
2,194675.0,122.5,198.652581,204.056787
3,200481.0,129.0,198.872036,204.182217
4,1537058.0,206.1,249.391835,233.056957
5,527972.0,178.1,211.250507,211.257168
6,197706.0,131.8,198.767147,204.122267
7,346997.0,685.7,204.410033,207.347475
8,3928864.0,434.7,339.797072,284.728337
9,319504.0,281.0,203.370855,206.753530


In [15]:
# Redraw the log-log scatter of the raw data (x and y come from the earlier
# scatter-plot cell) and overlay each model's predictions against population.
fig, ax = plt.subplots()
ax.loglog(x, y)
ax.scatter(x, y)

# Same fixed viewing window as the first scatter plot.
ax.set_xlim(1.0e5, 1.0e7)
ax.set_ylim(5.0e1, 1.0e3)

# Model A is drawn in red, model B in green.
for preds, fmt in ((predA, '.r-'), (predB, '.g-')):
    ax.plot(pop, preds, fmt)

display(fig)