# Create Spark object

In [30]:
# No environment path hacking required!
from pyspark.sql import SparkSession

# Build the session for this notebook (getOrCreate hands back an already
# running session if there is one); the app name labels it in the Spark UI.
spark = (
    SparkSession.builder
    .appName("HousingModelDocker")
    .getOrCreate()
)

# Packages

In [31]:
import numpy as np
import pyspark.sql.functions as F

# Create Dataframe

In [32]:
import os

# Show what is available in the working directory before loading anything.
# os.listdir returns entries in arbitrary order, so sort them to keep the
# displayed listing stable across re-runs and filesystems.
display(sorted(os.listdir('work')))

['housing_prices.csv', 'housing_prices.ipynb']

In [33]:
# header=True takes the column names from the first line of the file;
# inferSchema=True asks Spark to derive each column's type from the data
# instead of reading everything as strings.
data = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('work/housing_prices.csv')
)

# Print the inferred column names and types as a sanity check.
data.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



In [34]:
# Number of rows loaded from the CSV, before any cleansing is applied.
data.count()

20640

## Data Cleansing

### Counting NULL values in a single column

In [35]:
# when() without otherwise() yields NULL for rows that do not match, and
# count() skips NULLs, so this counts exactly the rows where the column is NULL.
null_marker = F.when(F.col('ocean_proximity').isNull(), 1)
data.select(F.count(null_marker).alias('ocean_proximity')).show()

+---------------+
|ocean_proximity|
+---------------+
|              0|
+---------------+



### Counting NULL values in all columns

In [36]:
# One aggregate expression per column: count the rows where that column is
# NULL and label the result with the column's own name.
per_column_nulls = [
    F.count(F.when(F.col(name).isNull(), 1)).alias(name)
    for name in data.columns
]

# A single pass over the data produces a one-row summary of NULL counts.
data.select(per_column_nulls).show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|        0|       0|                 0|          0|           207|         0|         0|            0|                 0|              0|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+



### Handling NULL values in the `total_bedrooms` column

In [37]:
# Discard (rather than impute) the rows that have no bedroom count; only
# `total_bedrooms` is inspected, other columns do not affect which rows go.
filtered_data = data.dropna(
    how='any',
    subset=['total_bedrooms'],
)

# Rows remaining after the drop.
filtered_data.count()

20433

# Create Features

In [39]:
from pyspark.ml.feature import VectorAssembler

# Numeric predictors fed to the model. longitude, latitude and
# ocean_proximity are not used as features; median_house_value is the label.
feature_columns = [
    'housing_median_age', 'total_rooms', 'total_bedrooms',
    'population', 'households', 'median_income',
]

# Pack the predictor columns, in the order listed above, into a single
# vector column named 'features'.
assemblers = VectorAssembler(outputCol='features', inputCols=feature_columns)

# Use the NULL-free frame: the original columns are kept and 'features' is appended.
transformed_data = assemblers.transform(filtered_data)
transformed_data.show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+--------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|            features|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+--------------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|[41.0,880.0,129.0...|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|[21.0,7099.0,1106...|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|[52.0,1467.0,190....|
|  -122.25|   37.85|              52.0|     12

# Create Model

In [43]:
from pyspark.ml.regression import LinearRegression

# Linear regression of median_house_value on the assembled feature vector.
lr = (
    LinearRegression()
    .setFeaturesCol('features')
    .setLabelCol('median_house_value')
)

# Fitted on every cleaned row; no hold-out split is made in this notebook.
model = lr.fit(transformed_data)

# Quick Test

In [46]:
# One hand-written observation; the values match the first row displayed
# from the training data, whose median_house_value is 452600.0.
single_record = [{
    'housing_median_age': 41.0, 'total_rooms': 880.0, 'total_bedrooms': 129.0,
    'population': 322.0, 'households': 126.0, 'median_income': 8.3252,
}]

# The displayed columns come out in alphabetical order, but the assembler
# selects its inputs by name, so the feature vector keeps the training order.
single_record_df = spark.createDataFrame(single_record)

new_data = assemblers.transform(single_record_df)

# Show the assembled input first, then the same row with its prediction.
for frame in (new_data, model.transform(new_data)):
    frame.show()

+----------+------------------+-------------+----------+--------------+-----------+--------------------+
|households|housing_median_age|median_income|population|total_bedrooms|total_rooms|            features|
+----------+------------------+-------------+----------+--------------+-----------+--------------------+
|     126.0|              41.0|       8.3252|     322.0|         129.0|      880.0|[41.0,880.0,129.0...|
+----------+------------------+-------------+----------+--------------+-----------+--------------------+

+----------+------------------+-------------+----------+--------------+-----------+--------------------+------------------+
|households|housing_median_age|median_income|population|total_bedrooms|total_rooms|            features|        prediction|
+----------+------------------+-------------+----------+--------------+-----------+--------------------+------------------+
|     126.0|              41.0|       8.3252|     322.0|         129.0|      880.0|[41.0,880.0,129.0..