In [84]:
import os
import pandas as pd
import numpy as np

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, SQLContext

from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql.functions import udf, col

from pyspark.ml.regression import LinearRegression
from pyspark.mllib.evaluation import RegressionMetrics

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, CrossValidatorModel
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.evaluation import RegressionEvaluator
import seaborn as sns
import matplotlib.pyplot as plt

In [85]:
spark = SparkSession.builder.master("local[2]").appName("Linear-Regression-California-Housing").getOrCreate()

In [86]:
spark


In [87]:
sc = spark.sparkContext
sc

In [88]:
sqlContext = SQLContext(spark.sparkContext)
sqlContext



<pyspark.sql.context.SQLContext at 0x1c15a01fad0>

In [89]:
HOUSING_DATA = 'housing.csv'


In [90]:
data=pd.read_csv(HOUSING_DATA)
data.head()

Unnamed: 0,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity
0,-122.23,37.88,41.0,880.0,129.0,322.0,126.0,8.3252,452600.0,NEAR BAY
1,-122.22,37.86,21.0,7099.0,1106.0,2401.0,1138.0,8.3014,358500.0,NEAR BAY
2,-122.24,37.85,52.0,1467.0,190.0,496.0,177.0,7.2574,352100.0,NEAR BAY
3,-122.25,37.85,52.0,1274.0,235.0,558.0,219.0,5.6431,341300.0,NEAR BAY
4,-122.25,37.85,52.0,1627.0,280.0,565.0,259.0,3.8462,342200.0,NEAR BAY


In [91]:
schema=StructType([
    StructField("long",FloatType(),nullable=True),
    StructField("lat",FloatType(),nullable=True),
    StructField("medage",FloatType(),nullable=True),
    StructField("totalrooms",FloatType(),nullable=True),
    StructField("totbdrms", FloatType(), nullable=True),
    StructField("pop", FloatType(), nullable=True),
    StructField("houshlds", FloatType(), nullable=True),
    StructField("medinc", FloatType(), nullable=True),
    StructField("medhv", FloatType(), nullable=True)]
)

In [92]:
house_def=spark.read.csv(path=HOUSING_DATA,schema=schema).cache()

In [93]:
house_def.take(5)

[Row(long=None, lat=None, medage=None, totalrooms=None, totbdrms=None, pop=None, houshlds=None, medinc=None, medhv=None),
 Row(long=-122.2300033569336, lat=37.880001068115234, medage=41.0, totalrooms=880.0, totbdrms=129.0, pop=322.0, houshlds=126.0, medinc=8.325200080871582, medhv=452600.0),
 Row(long=-122.22000122070312, lat=37.86000061035156, medage=21.0, totalrooms=7099.0, totbdrms=1106.0, pop=2401.0, houshlds=1138.0, medinc=8.301400184631348, medhv=358500.0),
 Row(long=-122.23999786376953, lat=37.849998474121094, medage=52.0, totalrooms=1467.0, totbdrms=190.0, pop=496.0, houshlds=177.0, medinc=7.257400035858154, medhv=352100.0),
 Row(long=-122.25, lat=37.849998474121094, medage=52.0, totalrooms=1274.0, totbdrms=235.0, pop=558.0, houshlds=219.0, medinc=5.643099784851074, medhv=341300.0)]

In [94]:
house_def.show(15)

+-------+-----+------+----------+--------+------+--------+------+--------+
|   long|  lat|medage|totalrooms|totbdrms|   pop|houshlds|medinc|   medhv|
+-------+-----+------+----------+--------+------+--------+------+--------+
|   null| null|  null|      null|    null|  null|    null|  null|    null|
|-122.23|37.88|  41.0|     880.0|   129.0| 322.0|   126.0|8.3252|452600.0|
|-122.22|37.86|  21.0|    7099.0|  1106.0|2401.0|  1138.0|8.3014|358500.0|
|-122.24|37.85|  52.0|    1467.0|   190.0| 496.0|   177.0|7.2574|352100.0|
|-122.25|37.85|  52.0|    1274.0|   235.0| 558.0|   219.0|5.6431|341300.0|
|-122.25|37.85|  52.0|    1627.0|   280.0| 565.0|   259.0|3.8462|342200.0|
|-122.25|37.85|  52.0|     919.0|   213.0| 413.0|   193.0|4.0368|269700.0|
|-122.25|37.84|  52.0|    2535.0|   489.0|1094.0|   514.0|3.6591|299200.0|
|-122.25|37.84|  52.0|    3104.0|   687.0|1157.0|   647.0|  3.12|241400.0|
|-122.26|37.84|  42.0|    2555.0|   665.0|1206.0|   595.0|2.0804|226700.0|
|-122.25|37.84|  52.0|   

In [95]:
house_def.columns

['long',
 'lat',
 'medage',
 'totalrooms',
 'totbdrms',
 'pop',
 'houshlds',
 'medinc',
 'medhv']

In [82]:
house_def=house_def.fillna()

AttributeError: 'function' object has no attribute 'fillna'

In [96]:
house_def.printSchema()

root
 |-- long: float (nullable = true)
 |-- lat: float (nullable = true)
 |-- medage: float (nullable = true)
 |-- totalrooms: float (nullable = true)
 |-- totbdrms: float (nullable = true)
 |-- pop: float (nullable = true)
 |-- houshlds: float (nullable = true)
 |-- medinc: float (nullable = true)
 |-- medhv: float (nullable = true)



In [30]:
house_def.select("pop","houshlds").where("medage>51").show(10)

+------+--------+
|   pop|houshlds|
+------+--------+
| 496.0|   177.0|
| 558.0|   219.0|
| 565.0|   259.0|
| 413.0|   193.0|
|1094.0|   514.0|
|1157.0|   647.0|
|1551.0|   714.0|
| 910.0|   402.0|
|1504.0|   734.0|
|1098.0|   468.0|
+------+--------+
only showing top 10 rows



In [35]:
(house_def.describe().select("summary", 
                    F.round("medage",4).alias("medage"),
                    F.round("totalrooms", 4).alias("totalrooms"),
                    F.round("totbdrms", 4).alias("totbdrms"),
                    F.round("pop", 4).alias("pop"),
                    F.round("houshlds", 4).alias("houshlds"),
                    F.round("medinc", 4).alias("medinc"),
                    F.round("medhv", 4).alias("medhv"))
                    .show())

+-------+-------+----------+--------+---------+--------+-------+-----------+
|summary| medage|totalrooms|totbdrms|      pop|houshlds| medinc|      medhv|
+-------+-------+----------+--------+---------+--------+-------+-----------+
|  count|20640.0|   20640.0| 20433.0|  20640.0| 20640.0|20640.0|    20640.0|
|   mean|28.6395| 2635.7631|537.8706|1425.4767|499.5397| 3.8707|206855.8169|
| stddev|12.5856| 2181.6153|421.3851|1132.4621|382.3298| 1.8998|115395.6159|
|    min|    1.0|       2.0|     1.0|      3.0|     1.0| 0.4999|    14999.0|
|    max|   52.0|   39320.0|  6445.0|  35682.0|  6082.0|15.0001|   500001.0|
+-------+-------+----------+--------+---------+--------+-------+-----------+



In [38]:
house_def=house_def.withColumn("medhv",col("medhv")/100000)
house_def.show(5)

+-------+-----+------+----------+--------+------+--------+------+--------------------+
|   long|  lat|medage|totalrooms|totbdrms|   pop|houshlds|medinc|               medhv|
+-------+-----+------+----------+--------+------+--------+------+--------------------+
|   null| null|  null|      null|    null|  null|    null|  null|                null|
|-122.23|37.88|  41.0|     880.0|   129.0| 322.0|   126.0|8.3252|4.525999999999999...|
|-122.22|37.86|  21.0|    7099.0|  1106.0|2401.0|  1138.0|8.3014|3.584999999999999...|
|-122.24|37.85|  52.0|    1467.0|   190.0| 496.0|   177.0|7.2574|3.520999999999999...|
|-122.25|37.85|  52.0|    1274.0|   235.0| 558.0|   219.0|5.6431|           3.413E-10|
+-------+-----+------+----------+--------+------+--------+------+--------------------+
only showing top 5 rows



In [40]:
house_def = (house_def.withColumn("rmsperhh", F.round(col("totalrooms")/col("houshlds"), 2))
                       .withColumn("popperhh", F.round(col("pop")/col("houshlds"), 2))
                       .withColumn("bdrmsperrm", F.round(col("totbdrms")/col("totalrooms"), 2)))

In [41]:
house_def.show(10)

+-------+-----+------+----------+--------+------+--------+------+--------------------+--------+--------+----------+
|   long|  lat|medage|totalrooms|totbdrms|   pop|houshlds|medinc|               medhv|rmsperhh|popperhh|bdrmsperrm|
+-------+-----+------+----------+--------+------+--------+------+--------------------+--------+--------+----------+
|   null| null|  null|      null|    null|  null|    null|  null|                null|    null|    null|      null|
|-122.23|37.88|  41.0|     880.0|   129.0| 322.0|   126.0|8.3252|4.525999999999999...|    6.98|    2.56|      0.15|
|-122.22|37.86|  21.0|    7099.0|  1106.0|2401.0|  1138.0|8.3014|3.584999999999999...|    6.24|    2.11|      0.16|
|-122.24|37.85|  52.0|    1467.0|   190.0| 496.0|   177.0|7.2574|3.520999999999999...|    8.29|     2.8|      0.13|
|-122.25|37.85|  52.0|    1274.0|   235.0| 558.0|   219.0|5.6431|           3.413E-10|    5.82|    2.55|      0.18|
|-122.25|37.85|  52.0|    1627.0|   280.0| 565.0|   259.0|3.8462|       

In [45]:
house_def = house_def.select("medhv", 
                              "totbdrms", 
                              "pop", 
                              "houshlds", 
                              "medinc", 
                              "rmsperhh", 
                              "popperhh", 
                              "bdrmsperrm")

In [52]:
featureCols = ["totbdrms", "pop", "houshlds", "medinc", "rmsperhh", "popperhh", "bdrmsperrm"]


In [49]:
assembler = VectorAssembler(inputCols=featureCols, outputCol="features") 


In [50]:
assembled_df = assembler.transform(house_def)


In [None]:
assembled_df.show(10, truncate=False)


In [None]:
standardScaler = StandardScaler(inputCol="features", outputCol="features_scaled")
scaled_df = standardScaler.fit(assembled_df).transform(assembled_df)
scaled_df.select("features", "features_scaled").show(10, truncate=False)

In [None]:
train_data, test_data = scaled_df.randomSplit([.8,.2], seed=rnd_seed)


In [None]:
lr = (LinearRegression(featuresCol='features_scaled', labelCol="medhv", predictionCol='predmedhv', 
                               maxIter=10, regParam=0.3, elasticNetParam=0.8, standardization=False))

In [None]:
linearModel = lr.fit(train_data)

In [None]:
linearModel.coefficients

In [None]:
linearModel.intercept

In [None]:
print("RMSE: {0}".format(linearModel.summary.rootMeanSquaredError))


In [None]:
print("MAE: {0}".format(linearModel.summary.meanAbsoluteError))
